diff --git a/src/controllers/api/streaming.ts b/src/controllers/api/streaming.ts index 8d22d5c..fbe825b 100644 --- a/src/controllers/api/streaming.ts +++ b/src/controllers/api/streaming.ts @@ -9,6 +9,7 @@ import { bech32ToPubkey } from '@/utils.ts'; import { renderReblog, renderStatus } from '@/views/mastodon/statuses.ts'; import { hydrateEvents } from '@/storages/hydrate.ts'; import { Storages } from '@/storages.ts'; +import { getTagSet } from '@/tags.ts'; const debug = Debug('ditto:streaming'); @@ -33,11 +34,12 @@ const streamSchema = z.enum([ type Stream = z.infer; -const streamingController: AppController = (c) => { +const streamingController: AppController = async (c) => { const upgrade = c.req.header('upgrade'); const token = c.req.header('sec-websocket-protocol'); const stream = streamSchema.optional().catch(undefined).parse(c.req.query('stream')); const controller = new AbortController(); + const signal = c.req.raw.signal; if (upgrade?.toLowerCase() !== 'websocket') { return c.text('Please use websocket protocol', 400); @@ -61,6 +63,16 @@ const streamingController: AppController = (c) => { } } + const mutedUsersSet = new Set(); + if (pubkey) { + const [mutedUsers] = await Storages.admin.query([{ authors: [pubkey], kinds: [10000], limit: 1 }], { signal }); + if (mutedUsers) { + for (const pubkey of getTagSet(mutedUsers.tags, 'p')) { + mutedUsersSet.add(pubkey); + } + } + } + socket.onopen = async () => { if (!stream) return; @@ -72,6 +84,10 @@ const streamingController: AppController = (c) => { if (msg[0] === 'EVENT') { const event = msg[2]; + if (mutedUsersSet.has(event.pubkey)) { + continue; + } + await hydrateEvents({ events: [event], storage: Storages.admin,