diff --git a/src/controllers/api/streaming.ts b/src/controllers/api/streaming.ts index 427c350..552ea3b 100644 --- a/src/controllers/api/streaming.ts +++ b/src/controllers/api/streaming.ts @@ -1,4 +1,4 @@ -import { NostrFilter } from '@nostrify/nostrify'; +import { NostrEvent, NostrFilter } from '@nostrify/nostrify'; import Debug from '@soapbox/stickynotes/debug'; import { z } from 'zod'; @@ -11,6 +11,7 @@ import { hydrateEvents } from '@/storages/hydrate.ts'; import { Storages } from '@/storages.ts'; import { bech32ToPubkey } from '@/utils.ts'; import { renderReblog, renderStatus } from '@/views/mastodon/statuses.ts'; +import { renderNotification } from '@/views/mastodon/notifications.ts'; const debug = Debug('ditto:streaming'); @@ -52,6 +53,11 @@ const streamingController: AppController = async (c) => { const { socket, response } = Deno.upgradeWebSocket(c.req.raw, { protocol: token, idleTimeout: 30 }); + const store = await Storages.db(); + const pubsub = await Storages.pubsub(); + + const policy = pubkey ? new MuteListPolicy(pubkey, await Storages.admin()) : undefined; + function send(name: string, payload: object) { if (socket.readyState === WebSocket.OPEN) { debug('send', name, JSON.stringify(payload)); @@ -63,52 +69,54 @@ const streamingController: AppController = async (c) => { } } - socket.onopen = async () => { - if (!stream) return; - - const filter = await topicToFilter(stream, c.req.query(), pubkey); - if (!filter) return; - + async function sub(type: string, filters: NostrFilter[], render: (event: NostrEvent) => Promise) { try { - const db = await Storages.db(); - const pubsub = await Storages.pubsub(); - - for await (const msg of pubsub.req([filter], { signal: controller.signal })) { + for await (const msg of pubsub.req(filters, { signal: controller.signal })) { if (msg[0] === 'EVENT') { const event = msg[2]; - if (pubkey) { - const policy = new MuteListPolicy(pubkey, await Storages.admin()); + if (policy) { const [, , ok] = await policy.call(event); if (!ok) { continue; } } - await hydrateEvents({ - events: [event], - store: db, - signal: AbortSignal.timeout(1000), - }); + await hydrateEvents({ events: [event], store, signal: AbortSignal.timeout(1000) }); - if (event.kind === 1) { - const status = await renderStatus(event, { viewerPubkey: pubkey }); - if (status) { - send('update', status); - } - } + const result = await render(event); - if (event.kind === 6) { - const status = await renderReblog(event, { viewerPubkey: pubkey }); - if (status) { - send('update', status); - } + if (result) { + send(type, result); } } } } catch (e) { debug('streaming error:', e); } + } + + socket.onopen = async () => { + if (!stream) return; + const topicFilter = await topicToFilter(stream, c.req.query(), pubkey); + + if (topicFilter) { + sub('update', [topicFilter], async (event) => { + if (event.kind === 1) { + return await renderStatus(event, { viewerPubkey: pubkey }); + } + if (event.kind === 6) { + return await renderReblog(event, { viewerPubkey: pubkey }); + } + }); + } + + if (['user', 'user:notification'].includes(stream) && pubkey) { + sub('notification', [{ '#p': [pubkey] }], async (event) => { + return await renderNotification(event, { viewerPubkey: pubkey }); + }); + return; + } }; socket.onclose = () => { diff --git a/src/views/mastodon/notifications.ts b/src/views/mastodon/notifications.ts index e11d45a..8f2a8a6 100644 --- a/src/views/mastodon/notifications.ts +++ b/src/views/mastodon/notifications.ts @@ -1,8 +1,10 @@ +import { NostrEvent } from '@nostrify/nostrify'; + +import { Conf } from '@/config.ts'; import { DittoEvent } from '@/interfaces/DittoEvent.ts'; import { nostrDate } from '@/utils.ts'; import { accountFromPubkey, renderAccount } from '@/views/mastodon/accounts.ts'; import { renderStatus } from '@/views/mastodon/statuses.ts'; -import { NostrEvent } from '@nostrify/nostrify'; interface RenderNotificationOpts { viewerPubkey: string; @@ -27,7 +29,7 @@ function renderNotification(event: DittoEvent, opts: RenderNotificationOpts) { return renderReaction(event, opts); } - if (event.kind === 30360) { + if (event.kind === 30360 && event.pubkey === Conf.pubkey) { return renderNameGrant(event); } } @@ -49,7 +51,7 @@ async function renderReblog(event: DittoEvent, opts: RenderNotificationOpts) { if (event.repost?.kind !== 1) return; const status = await renderStatus(event.repost, opts); if (!status) return; - const account = event.author ? await renderAccount(event.author) : accountFromPubkey(event.pubkey); + const account = event.author ? await renderAccount(event.author) : await accountFromPubkey(event.pubkey); return { id: notificationId(event), @@ -64,7 +66,7 @@ async function renderFavourite(event: DittoEvent, opts: RenderNotificationOpts) if (event.reacted?.kind !== 1) return; const status = await renderStatus(event.reacted, opts); if (!status) return; - const account = event.author ? await renderAccount(event.author) : accountFromPubkey(event.pubkey); + const account = event.author ? await renderAccount(event.author) : await accountFromPubkey(event.pubkey); return { id: notificationId(event), @@ -79,7 +81,7 @@ async function renderReaction(event: DittoEvent, opts: RenderNotificationOpts) { if (event.reacted?.kind !== 1) return; const status = await renderStatus(event.reacted, opts); if (!status) return; - const account = event.author ? await renderAccount(event.author) : accountFromPubkey(event.pubkey); + const account = event.author ? await renderAccount(event.author) : await accountFromPubkey(event.pubkey); return { id: notificationId(event),