From b626d75262cc74626f4644a949d598bb253c7d41 Mon Sep 17 00:00:00 2001 From: "P. Reis" Date: Tue, 14 May 2024 14:22:37 -0300 Subject: [PATCH 1/4] fix(streaming): posts from blocked users does not show up in global tab --- src/controllers/api/streaming.ts | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/src/controllers/api/streaming.ts b/src/controllers/api/streaming.ts index 8d22d5c..fbe825b 100644 --- a/src/controllers/api/streaming.ts +++ b/src/controllers/api/streaming.ts @@ -9,6 +9,7 @@ import { bech32ToPubkey } from '@/utils.ts'; import { renderReblog, renderStatus } from '@/views/mastodon/statuses.ts'; import { hydrateEvents } from '@/storages/hydrate.ts'; import { Storages } from '@/storages.ts'; +import { getTagSet } from '@/tags.ts'; const debug = Debug('ditto:streaming'); @@ -33,11 +34,12 @@ const streamSchema = z.enum([ type Stream = z.infer; -const streamingController: AppController = (c) => { +const streamingController: AppController = async (c) => { const upgrade = c.req.header('upgrade'); const token = c.req.header('sec-websocket-protocol'); const stream = streamSchema.optional().catch(undefined).parse(c.req.query('stream')); const controller = new AbortController(); + const signal = c.req.raw.signal; if (upgrade?.toLowerCase() !== 'websocket') { return c.text('Please use websocket protocol', 400); @@ -61,6 +63,16 @@ const streamingController: AppController = (c) => { } } + const mutedUsersSet = new Set(); + if (pubkey) { + const [mutedUsers] = await Storages.admin.query([{ authors: [pubkey], kinds: [10000], limit: 1 }], { signal }); + if (mutedUsers) { + for (const pubkey of getTagSet(mutedUsers.tags, 'p')) { + mutedUsersSet.add(pubkey); + } + } + } + socket.onopen = async () => { if (!stream) return; @@ -72,6 +84,10 @@ const streamingController: AppController = (c) => { if (msg[0] === 'EVENT') { const event = msg[2]; + if (mutedUsersSet.has(event.pubkey)) { + continue; + } + await hydrateEvents({ events: [event], storage: Storages.admin, From 4d342dff4a48ba68493dbce9de27a5bcdbc2dd28 Mon Sep 17 00:00:00 2001 From: "P. Reis" Date: Tue, 14 May 2024 21:14:00 -0300 Subject: [PATCH 2/4] fix(streaming): move get muted users logic before upgrading connection to web socket --- src/controllers/api/streaming.ts | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/src/controllers/api/streaming.ts b/src/controllers/api/streaming.ts index fbe825b..310b529 100644 --- a/src/controllers/api/streaming.ts +++ b/src/controllers/api/streaming.ts @@ -50,6 +50,16 @@ const streamingController: AppController = async (c) => { return c.json({ error: 'Invalid access token' }, 401); } + const mutedUsersSet = new Set(); + if (pubkey) { + const [mutedUsers] = await Storages.admin.query([{ authors: [pubkey], kinds: [10000], limit: 1 }], { signal }); + if (mutedUsers) { + for (const pubkey of getTagSet(mutedUsers.tags, 'p')) { + mutedUsersSet.add(pubkey); + } + } + } + const { socket, response } = Deno.upgradeWebSocket(c.req.raw, { protocol: token }); function send(name: string, payload: object) { @@ -63,16 +73,6 @@ const streamingController: AppController = async (c) => { } } - const mutedUsersSet = new Set(); - if (pubkey) { - const [mutedUsers] = await Storages.admin.query([{ authors: [pubkey], kinds: [10000], limit: 1 }], { signal }); - if (mutedUsers) { - for (const pubkey of getTagSet(mutedUsers.tags, 'p')) { - mutedUsersSet.add(pubkey); - } - } - } - socket.onopen = async () => { if (!stream) return; From 0383726663886534b2020acd1c97dd6d9a95c157 Mon Sep 17 00:00:00 2001 From: "P. Reis" Date: Tue, 14 May 2024 21:44:19 -0300 Subject: [PATCH 3/4] fix(streaming): use policy instead of hand coding --- src/controllers/api/streaming.ts | 23 ++++++++--------------- 1 file changed, 8 insertions(+), 15 deletions(-) diff --git a/src/controllers/api/streaming.ts b/src/controllers/api/streaming.ts index 310b529..8fa23ea 100644 --- a/src/controllers/api/streaming.ts +++ b/src/controllers/api/streaming.ts @@ -9,7 +9,7 @@ import { bech32ToPubkey } from '@/utils.ts'; import { renderReblog, renderStatus } from '@/views/mastodon/statuses.ts'; import { hydrateEvents } from '@/storages/hydrate.ts'; import { Storages } from '@/storages.ts'; -import { getTagSet } from '@/tags.ts'; +import { MuteListPolicy } from '@/policies/MuteListPolicy.ts'; const debug = Debug('ditto:streaming'); @@ -34,12 +34,11 @@ const streamSchema = z.enum([ type Stream = z.infer; -const streamingController: AppController = async (c) => { +const streamingController: AppController = (c) => { const upgrade = c.req.header('upgrade'); const token = c.req.header('sec-websocket-protocol'); const stream = streamSchema.optional().catch(undefined).parse(c.req.query('stream')); const controller = new AbortController(); - const signal = c.req.raw.signal; if (upgrade?.toLowerCase() !== 'websocket') { return c.text('Please use websocket protocol', 400); @@ -50,16 +49,6 @@ const streamingController: AppController = async (c) => { return c.json({ error: 'Invalid access token' }, 401); } - const mutedUsersSet = new Set(); - if (pubkey) { - const [mutedUsers] = await Storages.admin.query([{ authors: [pubkey], kinds: [10000], limit: 1 }], { signal }); - if (mutedUsers) { - for (const pubkey of getTagSet(mutedUsers.tags, 'p')) { - mutedUsersSet.add(pubkey); - } - } - } - const { socket, response } = Deno.upgradeWebSocket(c.req.raw, { protocol: token }); function send(name: string, payload: object) { @@ -84,8 +73,12 @@ const streamingController: AppController = async (c) => { if (msg[0] === 'EVENT') { const event = msg[2]; - if (mutedUsersSet.has(event.pubkey)) { - continue; + if (pubkey) { + const policy = new MuteListPolicy(pubkey, Storages.admin); + const ok = await policy.call(event); + if (ok[2] === false) { + continue; + } } await hydrateEvents({ From a1326dedcc303d1ddb1d36b6e91c0fff41d7dde1 Mon Sep 17 00:00:00 2001 From: "P. Reis" Date: Tue, 14 May 2024 21:53:50 -0300 Subject: [PATCH 4/4] fix(streaming): async storage --- src/controllers/api/streaming.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/controllers/api/streaming.ts b/src/controllers/api/streaming.ts index f1e2b4e..04cfbbc 100644 --- a/src/controllers/api/streaming.ts +++ b/src/controllers/api/streaming.ts @@ -77,7 +77,7 @@ const streamingController: AppController = (c) => { const event = msg[2]; if (pubkey) { - const policy = new MuteListPolicy(pubkey, Storages.admin); + const policy = new MuteListPolicy(pubkey, await Storages.admin()); const ok = await policy.call(event); if (ok[2] === false) { continue;