Support streaming notifications

This commit is contained in:
Alex Gleason 2024-06-09 15:31:14 -05:00
parent 2245263011
commit 42fac52e9e
No known key found for this signature in database
GPG Key ID: 7211D1F99744FBB7
2 changed files with 44 additions and 34 deletions

View File

@ -1,4 +1,4 @@
import { NostrFilter } from '@nostrify/nostrify'; import { NostrEvent, NostrFilter } from '@nostrify/nostrify';
import Debug from '@soapbox/stickynotes/debug'; import Debug from '@soapbox/stickynotes/debug';
import { z } from 'zod'; import { z } from 'zod';
@ -11,6 +11,7 @@ import { hydrateEvents } from '@/storages/hydrate.ts';
import { Storages } from '@/storages.ts'; import { Storages } from '@/storages.ts';
import { bech32ToPubkey } from '@/utils.ts'; import { bech32ToPubkey } from '@/utils.ts';
import { renderReblog, renderStatus } from '@/views/mastodon/statuses.ts'; import { renderReblog, renderStatus } from '@/views/mastodon/statuses.ts';
import { renderNotification } from '@/views/mastodon/notifications.ts';
const debug = Debug('ditto:streaming'); const debug = Debug('ditto:streaming');
@ -52,6 +53,11 @@ const streamingController: AppController = async (c) => {
const { socket, response } = Deno.upgradeWebSocket(c.req.raw, { protocol: token, idleTimeout: 30 }); const { socket, response } = Deno.upgradeWebSocket(c.req.raw, { protocol: token, idleTimeout: 30 });
const store = await Storages.db();
const pubsub = await Storages.pubsub();
const policy = pubkey ? new MuteListPolicy(pubkey, await Storages.admin()) : undefined;
function send(name: string, payload: object) { function send(name: string, payload: object) {
if (socket.readyState === WebSocket.OPEN) { if (socket.readyState === WebSocket.OPEN) {
debug('send', name, JSON.stringify(payload)); debug('send', name, JSON.stringify(payload));
@ -63,52 +69,54 @@ const streamingController: AppController = async (c) => {
} }
} }
socket.onopen = async () => { async function sub(type: string, filters: NostrFilter[], render: (event: NostrEvent) => Promise<unknown>) {
if (!stream) return;
const filter = await topicToFilter(stream, c.req.query(), pubkey);
if (!filter) return;
try { try {
const db = await Storages.db(); for await (const msg of pubsub.req(filters, { signal: controller.signal })) {
const pubsub = await Storages.pubsub();
for await (const msg of pubsub.req([filter], { signal: controller.signal })) {
if (msg[0] === 'EVENT') { if (msg[0] === 'EVENT') {
const event = msg[2]; const event = msg[2];
if (pubkey) { if (policy) {
const policy = new MuteListPolicy(pubkey, await Storages.admin());
const [, , ok] = await policy.call(event); const [, , ok] = await policy.call(event);
if (!ok) { if (!ok) {
continue; continue;
} }
} }
await hydrateEvents({ await hydrateEvents({ events: [event], store, signal: AbortSignal.timeout(1000) });
events: [event],
store: db,
signal: AbortSignal.timeout(1000),
});
if (event.kind === 1) { const result = await render(event);
const status = await renderStatus(event, { viewerPubkey: pubkey });
if (status) {
send('update', status);
}
}
if (event.kind === 6) { if (result) {
const status = await renderReblog(event, { viewerPubkey: pubkey }); send(type, result);
if (status) {
send('update', status);
}
} }
} }
} }
} catch (e) { } catch (e) {
debug('streaming error:', e); debug('streaming error:', e);
} }
}
socket.onopen = async () => {
if (!stream) return;
const topicFilter = await topicToFilter(stream, c.req.query(), pubkey);
if (topicFilter) {
sub('update', [topicFilter], async (event) => {
if (event.kind === 1) {
return await renderStatus(event, { viewerPubkey: pubkey });
}
if (event.kind === 6) {
return await renderReblog(event, { viewerPubkey: pubkey });
}
});
}
if (['user', 'user:notification'].includes(stream) && pubkey) {
sub('notification', [{ '#p': [pubkey] }], async (event) => {
return await renderNotification(event, { viewerPubkey: pubkey });
});
return;
}
}; };
socket.onclose = () => { socket.onclose = () => {

View File

@ -1,8 +1,10 @@
import { NostrEvent } from '@nostrify/nostrify';
import { Conf } from '@/config.ts';
import { DittoEvent } from '@/interfaces/DittoEvent.ts'; import { DittoEvent } from '@/interfaces/DittoEvent.ts';
import { nostrDate } from '@/utils.ts'; import { nostrDate } from '@/utils.ts';
import { accountFromPubkey, renderAccount } from '@/views/mastodon/accounts.ts'; import { accountFromPubkey, renderAccount } from '@/views/mastodon/accounts.ts';
import { renderStatus } from '@/views/mastodon/statuses.ts'; import { renderStatus } from '@/views/mastodon/statuses.ts';
import { NostrEvent } from '@nostrify/nostrify';
interface RenderNotificationOpts { interface RenderNotificationOpts {
viewerPubkey: string; viewerPubkey: string;
@ -27,7 +29,7 @@ function renderNotification(event: DittoEvent, opts: RenderNotificationOpts) {
return renderReaction(event, opts); return renderReaction(event, opts);
} }
if (event.kind === 30360) { if (event.kind === 30360 && event.pubkey === Conf.pubkey) {
return renderNameGrant(event); return renderNameGrant(event);
} }
} }
@ -49,7 +51,7 @@ async function renderReblog(event: DittoEvent, opts: RenderNotificationOpts) {
if (event.repost?.kind !== 1) return; if (event.repost?.kind !== 1) return;
const status = await renderStatus(event.repost, opts); const status = await renderStatus(event.repost, opts);
if (!status) return; if (!status) return;
const account = event.author ? await renderAccount(event.author) : accountFromPubkey(event.pubkey); const account = event.author ? await renderAccount(event.author) : await accountFromPubkey(event.pubkey);
return { return {
id: notificationId(event), id: notificationId(event),
@ -64,7 +66,7 @@ async function renderFavourite(event: DittoEvent, opts: RenderNotificationOpts)
if (event.reacted?.kind !== 1) return; if (event.reacted?.kind !== 1) return;
const status = await renderStatus(event.reacted, opts); const status = await renderStatus(event.reacted, opts);
if (!status) return; if (!status) return;
const account = event.author ? await renderAccount(event.author) : accountFromPubkey(event.pubkey); const account = event.author ? await renderAccount(event.author) : await accountFromPubkey(event.pubkey);
return { return {
id: notificationId(event), id: notificationId(event),
@ -79,7 +81,7 @@ async function renderReaction(event: DittoEvent, opts: RenderNotificationOpts) {
if (event.reacted?.kind !== 1) return; if (event.reacted?.kind !== 1) return;
const status = await renderStatus(event.reacted, opts); const status = await renderStatus(event.reacted, opts);
if (!status) return; if (!status) return;
const account = event.author ? await renderAccount(event.author) : accountFromPubkey(event.pubkey); const account = event.author ? await renderAccount(event.author) : await accountFromPubkey(event.pubkey);
return { return {
id: notificationId(event), id: notificationId(event),