diff --git a/src/controllers/nostr/relay.ts b/src/controllers/nostr/relay.ts index 2921aaa..c0e905b 100644 --- a/src/controllers/nostr/relay.ts +++ b/src/controllers/nostr/relay.ts @@ -11,6 +11,7 @@ import { clientMsgSchema, type ClientREQ, } from '@/schemas/nostr.ts'; +import { purifyEvent } from '@/storages/hydrate.ts'; import { Sub } from '@/subs.ts'; import type { AppController } from '@/app.ts'; @@ -70,7 +71,7 @@ function connectStream(socket: WebSocket) { send(['EOSE', subId]); for await (const event of Sub.sub(socket, subId, filters)) { - send(['EVENT', subId, event]); + send(['EVENT', subId, purifyEvent(event)]); } } diff --git a/src/storages/hydrate.test.ts b/src/storages/hydrate.test.ts index d9acfa1..de69008 100644 --- a/src/storages/hydrate.test.ts +++ b/src/storages/hydrate.test.ts @@ -35,22 +35,13 @@ Deno.test('hydrateEvents(): author --- WITHOUT stats', async () => { assertEquals((event1copy as DittoEvent).author, undefined, "Event hasn't been hydrated yet"); - const controller = new AbortController(); - const timeoutId = setTimeout(() => controller.abort(), 1000); - await hydrateEvents({ events: [event1copy], storage: db, - signal: controller.signal, }); const expectedEvent = { ...event1copy, author: event0copy }; assertEquals(event1copy, expectedEvent); - - await db.remove([{ kinds: [0, 1] }]); - assertEquals(await db.query([{ kinds: [0, 1] }]), []); - - clearTimeout(timeoutId); }); Deno.test('hydrateEvents(): repost --- WITHOUT stats', async () => { @@ -67,16 +58,12 @@ Deno.test('hydrateEvents(): repost --- WITHOUT stats', async () => { await db.event(event1repostedCopy); await db.event(event6copy); - assertEquals((event6copy as DittoEvent).author, undefined, "Event hasn't been hydrated author yet"); - assertEquals((event6copy as DittoEvent).repost, undefined, "Event hasn't been hydrated repost yet"); - - const controller = new AbortController(); - const timeoutId = setTimeout(() => controller.abort(), 1000); + assertEquals((event6copy as DittoEvent).author, undefined, "Event hasn't hydrated author yet"); + assertEquals((event6copy as DittoEvent).repost, undefined, "Event hasn't hydrated repost yet"); await hydrateEvents({ events: [event6copy], storage: db, - signal: controller.signal, }); const expectedEvent6 = { @@ -85,11 +72,6 @@ Deno.test('hydrateEvents(): repost --- WITHOUT stats', async () => { repost: { ...event1repostedCopy, author: event0madePostCopy }, }; assertEquals(event6copy, expectedEvent6); - - await db.remove([{ kinds: [0, 1, 6] }]); - assertEquals(await db.query([{ kinds: [0, 1, 6] }]), []); - - clearTimeout(timeoutId); }); Deno.test('hydrateEvents(): quote repost --- WITHOUT stats', async () => { @@ -106,13 +88,9 @@ Deno.test('hydrateEvents(): quote repost --- WITHOUT stats', async () => { await db.event(event1quoteRepostCopy); await db.event(event1willBeQuoteRepostedCopy); - const controller = new AbortController(); - const timeoutId = setTimeout(() => controller.abort(), 1000); - await hydrateEvents({ events: [event1quoteRepostCopy], storage: db, - signal: controller.signal, }); const expectedEvent1quoteRepost = { @@ -122,11 +100,6 @@ Deno.test('hydrateEvents(): quote repost --- WITHOUT stats', async () => { }; assertEquals(event1quoteRepostCopy, expectedEvent1quoteRepost); - - await db.remove([{ kinds: [0, 1] }]); - assertEquals(await db.query([{ kinds: [0, 1] }]), []); - - clearTimeout(timeoutId); }); Deno.test('hydrateEvents(): repost of quote repost --- WITHOUT stats', async () => { @@ -143,16 +116,12 @@ Deno.test('hydrateEvents(): repost of quote repost --- WITHOUT stats', async () await db.event(event1quoteCopy); await db.event(event6copy); - assertEquals((event6copy as DittoEvent).author, undefined, "Event hasn't been hydrated author yet"); - assertEquals((event6copy as DittoEvent).repost, undefined, "Event hasn't been hydrated repost yet"); - - const controller = new AbortController(); - const timeoutId = setTimeout(() => controller.abort(), 1000); + assertEquals((event6copy as DittoEvent).author, undefined, "Event hasn't hydrated author yet"); + assertEquals((event6copy as DittoEvent).repost, undefined, "Event hasn't hydrated repost yet"); await hydrateEvents({ events: [event6copy], storage: db, - signal: controller.signal, }); const expectedEvent6 = { @@ -161,9 +130,4 @@ Deno.test('hydrateEvents(): repost of quote repost --- WITHOUT stats', async () repost: { ...event1quoteCopy, author: event0copy, quote_repost: { author: event0copy, ...event1copy } }, }; assertEquals(event6copy, expectedEvent6); - - await db.remove([{ kinds: [0, 1, 6] }]); - assertEquals(await db.query([{ kinds: [0, 1, 6] }]), []); - - clearTimeout(timeoutId); }); diff --git a/src/storages/hydrate.ts b/src/storages/hydrate.ts index f65641f..619b798 100644 --- a/src/storages/hydrate.ts +++ b/src/storages/hydrate.ts @@ -1,137 +1,185 @@ import { NostrEvent, NStore } from '@nostrify/nostrify'; -import { db } from '@/db.ts'; -import { type DittoEvent } from '@/interfaces/DittoEvent.ts'; -interface HydrateEventOpts { +import { db } from '@/db.ts'; +import { matchFilter } from '@/deps.ts'; +import { type DittoEvent } from '@/interfaces/DittoEvent.ts'; +import { DittoTables } from '@/db/DittoTables.ts'; +import { Conf } from '@/config.ts'; + +interface HydrateOpts { events: DittoEvent[]; storage: NStore; signal?: AbortSignal; } /** Hydrate events using the provided storage. */ -async function hydrateEvents(opts: HydrateEventOpts): Promise { +async function hydrateEvents(opts: HydrateOpts): Promise { const { events, storage, signal } = opts; if (!events.length) { return events; } - const allEventsMap: Map = new Map(events.map((event) => { - return [event.id, structuredClone(event)]; - })); + const cache = [...events]; - const childrenEventsIds = (events.map((event) => { - if (event.kind === 1) return event.tags.find(([name]) => name === 'q')?.[1]; // possible quote repost - if (event.kind === 6) return event.tags.find(([name]) => name === 'e')?.[1]; // possible repost - return; - }).filter(Boolean)) as string[]; + for (const event of await gatherReposts({ events: cache, storage, signal })) { + cache.push(event); + } - if (childrenEventsIds.length > 0) { - const childrenEvents = await storage.query([{ ids: childrenEventsIds }], { signal }); - childrenEvents.forEach((event) => { - allEventsMap.set(event.id, structuredClone(event)); - }); + for (const event of await gatherQuotes({ events: cache, storage, signal })) { + cache.push(event); + } - if (childrenEvents.length > 0) { - const grandChildrenEventsIds = (childrenEvents.map((event) => { - if (event.kind === 1) return event.tags.find(([name]) => name === 'q')?.[1]; // possible quote repost - return; - }).filter(Boolean)) as string[]; - if (grandChildrenEventsIds.length > 0) { - const grandChildrenEvents = await storage.query([{ ids: grandChildrenEventsIds }], { signal }); - grandChildrenEvents.forEach((event) => { - allEventsMap.set(event.id, structuredClone(event)); - }); + for (const event of await gatherAuthors({ events: cache, storage, signal })) { + cache.push(event); + } + + for (const event of await gatherUsers({ events: cache, storage, signal })) { + cache.push(event); + } + + const stats = { + authors: await gatherAuthorStats(cache), + events: await gatherEventStats(cache), + }; + + // Dedupe events. + const results = [...new Map(cache.map((event) => [event.id, event])).values()]; + + // First connect all the events to each-other, then connect the connected events to the original list. + assembleEvents(results, results, stats); + assembleEvents(events, results, stats); + + return events; +} + +/** Connect the events in list `b` to the DittoEvent fields in list `a`. */ +function assembleEvents( + a: DittoEvent[], + b: DittoEvent[], + stats: { authors: DittoTables['author_stats'][]; events: DittoTables['event_stats'][] }, +): DittoEvent[] { + const admin = Conf.pubkey; + + for (const event of a) { + event.author = b.find((e) => matchFilter({ kinds: [0], authors: [event.pubkey] }, e)); + event.user = b.find((e) => matchFilter({ kinds: [30361], authors: [admin], '#d': [event.pubkey] }, e)); + + if (event.kind === 6) { + const id = event.tags.find(([name]) => name === 'e')?.[1]; + if (id) { + event.repost = b.find((e) => matchFilter({ kinds: [1], ids: [id] }, e)); } } - } - await hydrateAuthors({ events: [...allEventsMap.values()], storage, signal }); - await hydrateAuthorStats([...allEventsMap.values()].filter((e) => e.kind === 0)); - await hydrateEventStats([...allEventsMap.values()].filter((e) => e.kind === 1)); - - events.forEach((event) => { - const correspondingEvent = allEventsMap.get(event.id); - if (correspondingEvent?.author) event.author = correspondingEvent.author; - if (correspondingEvent?.author_stats) event.author_stats = correspondingEvent.author_stats; - if (correspondingEvent?.event_stats) event.event_stats = correspondingEvent.event_stats; if (event.kind === 1) { - const quoteId = event.tags.find(([name]) => name === 'q')?.[1]; - if (quoteId) { - event.quote_repost = allEventsMap.get(quoteId); - } - } else if (event.kind === 6) { - const repostedId = event.tags.find(([name]) => name === 'e')?.[1]; - if (repostedId) { - const repostedEvent = allEventsMap.get(repostedId); - if (repostedEvent && repostedEvent.tags.find(([name]) => name === 'q')?.[1]) { // The repost is a repost of a quote repost - const postBeingQuoteRepostedId = repostedEvent.tags.find(([name]) => name === 'q')?.[1]; - event.repost = { - quote_repost: allEventsMap.get(postBeingQuoteRepostedId!), - ...allEventsMap.get(repostedId)!, - }; - } else { // The repost is a repost of a normal post - event.repost = allEventsMap.get(repostedId); - } + const id = event.tags.find(([name]) => name === 'q')?.[1]; + if (id) { + event.quote_repost = b.find((e) => matchFilter({ kinds: [1], ids: [id] }, e)); } } - }); - return events; -} -async function hydrateAuthors(opts: Omit): Promise { - const { events, storage, signal } = opts; - - const pubkeys = new Set([...events].map((event) => event.pubkey)); - const authors = await storage.query([{ kinds: [0], authors: [...pubkeys], limit: pubkeys.size }], { signal }); - - for (const event of events) { - event.author = authors.find((author) => author.pubkey === event.pubkey); + event.author_stats = stats.authors.find((stats) => stats.pubkey === event.pubkey); + event.event_stats = stats.events.find((stats) => stats.event_id === event.id); } - return events; + return a; } -async function hydrateAuthorStats(events: DittoEvent[]): Promise { - const results = await db +/** Collect reposts from the events. */ +function gatherReposts({ events, storage, signal }: HydrateOpts): Promise { + const ids = new Set(); + + for (const event of events) { + if (event.kind === 6) { + const id = event.tags.find(([name]) => name === 'e')?.[1]; + if (id) { + ids.add(id); + } + } + } + + return storage.query( + [{ ids: [...ids], limit: ids.size }], + { signal }, + ); +} + +/** Collect quotes from the events. */ +function gatherQuotes({ events, storage, signal }: HydrateOpts): Promise { + const ids = new Set(); + + for (const event of events) { + if (event.kind === 1) { + const id = event.tags.find(([name]) => name === 'q')?.[1]; + if (id) { + ids.add(id); + } + } + } + + return storage.query( + [{ ids: [...ids], limit: ids.size }], + { signal }, + ); +} + +/** Collect authors from the events. */ +function gatherAuthors({ events, storage, signal }: HydrateOpts): Promise { + const pubkeys = new Set(events.map((event) => event.pubkey)); + + return storage.query( + [{ kinds: [0], authors: [...pubkeys], limit: pubkeys.size }], + { signal }, + ); +} + +/** Collect users from the events. */ +function gatherUsers({ events, storage, signal }: HydrateOpts): Promise { + const pubkeys = new Set(events.map((event) => event.pubkey)); + + return storage.query( + [{ kinds: [30361], authors: [Conf.pubkey], '#d': [...pubkeys], limit: pubkeys.size }], + { signal }, + ); +} + +/** Collect author stats from the events. */ +function gatherAuthorStats(events: DittoEvent[]): Promise { + const pubkeys = new Set( + events + .filter((event) => event.kind === 0) + .map((event) => event.pubkey), + ); + + if (!pubkeys.size) { + return Promise.resolve([]); + } + + return db .selectFrom('author_stats') .selectAll() - .where('pubkey', 'in', events.map((event) => event.pubkey)) + .where('pubkey', 'in', [...pubkeys]) .execute(); - - for (const event of events) { - const stat = results.find((result) => result.pubkey === event.pubkey); - if (stat) { - event.author_stats = { - followers_count: Math.max(stat.followers_count, 0) || 0, - following_count: Math.max(stat.following_count, 0) || 0, - notes_count: Math.max(stat.notes_count, 0) || 0, - }; - } - } - - return events; } -async function hydrateEventStats(events: DittoEvent[]): Promise { - const results = await db - .selectFrom('event_stats') - .selectAll() - .where('event_id', 'in', events.map((event) => event.id)) - .execute(); +/** Collect event stats from the events. */ +function gatherEventStats(events: DittoEvent[]): Promise { + const ids = new Set( + events + .filter((event) => event.kind === 1) + .map((event) => event.id), + ); - for (const event of events) { - const stat = results.find((result) => result.event_id === event.id); - if (stat) { - event.event_stats = { - replies_count: Math.max(stat.replies_count, 0) || 0, - reposts_count: Math.max(stat.reposts_count, 0) || 0, - reactions_count: Math.max(stat.reactions_count, 0) || 0, - }; - } + if (!ids.size) { + return Promise.resolve([]); } - return events; + return db + .selectFrom('event_stats') + .selectAll() + .where('event_id', 'in', [...ids]) + .execute(); } /** Return a normalized event without any non-standard keys. */