Rewrite hydrateEvents

This commit is contained in:
Alex Gleason 2024-04-23 19:31:40 -05:00
parent cbf1e8f280
commit 0aab3eb775
No known key found for this signature in database
GPG Key ID: 7211D1F99744FBB7
3 changed files with 130 additions and 103 deletions

View File

@ -11,6 +11,7 @@ import {
clientMsgSchema, clientMsgSchema,
type ClientREQ, type ClientREQ,
} from '@/schemas/nostr.ts'; } from '@/schemas/nostr.ts';
import { purifyEvent } from '@/storages/hydrate.ts';
import { Sub } from '@/subs.ts'; import { Sub } from '@/subs.ts';
import type { AppController } from '@/app.ts'; import type { AppController } from '@/app.ts';
@ -70,7 +71,7 @@ function connectStream(socket: WebSocket) {
send(['EOSE', subId]); send(['EOSE', subId]);
for await (const event of Sub.sub(socket, subId, filters)) { for await (const event of Sub.sub(socket, subId, filters)) {
send(['EVENT', subId, event]); send(['EVENT', subId, purifyEvent(event)]);
} }
} }

View File

@ -67,8 +67,8 @@ Deno.test('hydrateEvents(): repost --- WITHOUT stats', async () => {
await db.event(event1repostedCopy); await db.event(event1repostedCopy);
await db.event(event6copy); await db.event(event6copy);
assertEquals((event6copy as DittoEvent).author, undefined, "Event hasn't been hydrated author yet"); assertEquals((event6copy as DittoEvent).author, undefined, "Event hasn't hydrated author yet");
assertEquals((event6copy as DittoEvent).repost, undefined, "Event hasn't been hydrated repost yet"); assertEquals((event6copy as DittoEvent).repost, undefined, "Event hasn't hydrated repost yet");
const controller = new AbortController(); const controller = new AbortController();
const timeoutId = setTimeout(() => controller.abort(), 1000); const timeoutId = setTimeout(() => controller.abort(), 1000);
@ -143,8 +143,8 @@ Deno.test('hydrateEvents(): repost of quote repost --- WITHOUT stats', async ()
await db.event(event1quoteCopy); await db.event(event1quoteCopy);
await db.event(event6copy); await db.event(event6copy);
assertEquals((event6copy as DittoEvent).author, undefined, "Event hasn't been hydrated author yet"); assertEquals((event6copy as DittoEvent).author, undefined, "Event hasn't hydrated author yet");
assertEquals((event6copy as DittoEvent).repost, undefined, "Event hasn't been hydrated repost yet"); assertEquals((event6copy as DittoEvent).repost, undefined, "Event hasn't hydrated repost yet");
const controller = new AbortController(); const controller = new AbortController();
const timeoutId = setTimeout(() => controller.abort(), 1000); const timeoutId = setTimeout(() => controller.abort(), 1000);

View File

@ -1,137 +1,163 @@
import { NostrEvent, NStore } from '@nostrify/nostrify'; import { NostrEvent, NStore } from '@nostrify/nostrify';
import { db } from '@/db.ts'; import { db } from '@/db.ts';
import { type DittoEvent } from '@/interfaces/DittoEvent.ts'; import { type DittoEvent } from '@/interfaces/DittoEvent.ts';
import { DittoTables } from '@/db/DittoTables.ts';
import { Conf } from '@/config.ts';
interface HydrateEventOpts { interface HydrateOpts {
events: DittoEvent[]; events: DittoEvent[];
storage: NStore; storage: NStore;
signal?: AbortSignal; signal?: AbortSignal;
} }
/** Hydrate events using the provided storage. */ /** Hydrate events using the provided storage. */
async function hydrateEvents(opts: HydrateEventOpts): Promise<DittoEvent[]> { async function hydrateEvents(opts: HydrateOpts): Promise<DittoEvent[]> {
const { events, storage, signal } = opts; const { events, storage, signal } = opts;
if (!events.length) { if (!events.length) {
return events; return events;
} }
const allEventsMap: Map<string, DittoEvent> = new Map(events.map((event) => { const cache = [...events];
return [event.id, structuredClone(event)];
}));
const childrenEventsIds = (events.map((event) => { for (const event of await gatherReposts({ events, storage, signal })) {
if (event.kind === 1) return event.tags.find(([name]) => name === 'q')?.[1]; // possible quote repost cache.push(event);
if (event.kind === 6) return event.tags.find(([name]) => name === 'e')?.[1]; // possible repost
return;
}).filter(Boolean)) as string[];
if (childrenEventsIds.length > 0) {
const childrenEvents = await storage.query([{ ids: childrenEventsIds }], { signal });
childrenEvents.forEach((event) => {
allEventsMap.set(event.id, structuredClone(event));
});
if (childrenEvents.length > 0) {
const grandChildrenEventsIds = (childrenEvents.map((event) => {
if (event.kind === 1) return event.tags.find(([name]) => name === 'q')?.[1]; // possible quote repost
return;
}).filter(Boolean)) as string[];
if (grandChildrenEventsIds.length > 0) {
const grandChildrenEvents = await storage.query([{ ids: grandChildrenEventsIds }], { signal });
grandChildrenEvents.forEach((event) => {
allEventsMap.set(event.id, structuredClone(event));
});
} }
}
}
await hydrateAuthors({ events: [...allEventsMap.values()], storage, signal });
await hydrateAuthorStats([...allEventsMap.values()].filter((e) => e.kind === 0));
await hydrateEventStats([...allEventsMap.values()].filter((e) => e.kind === 1));
events.forEach((event) => { for (const event of await gatherQuotes({ events, storage, signal })) {
const correspondingEvent = allEventsMap.get(event.id); cache.push(event);
if (correspondingEvent?.author) event.author = correspondingEvent.author; }
if (correspondingEvent?.author_stats) event.author_stats = correspondingEvent.author_stats;
if (correspondingEvent?.event_stats) event.event_stats = correspondingEvent.event_stats; for (const event of await gatherAuthors({ events, storage, signal })) {
cache.push(event);
}
for (const event of await gatherUsers({ events, storage, signal })) {
cache.push(event);
}
const stats = {
authors: await gatherAuthorStats(cache),
events: await gatherEventStats(cache),
};
assembleEvents(cache, cache, stats);
assembleEvents(events, cache, stats);
return events;
}
function assembleEvents(
a: DittoEvent[],
b: DittoEvent[],
stats: { authors: DittoTables['author_stats'][]; events: DittoTables['event_stats'][] },
): DittoEvent[] {
const admin = Conf.pubkey;
for (const event of a) {
if (event.kind === 6) {
const id = event.tags.find(([name]) => name === 'e')?.[1];
event.repost = b.find((e) => e.kind === 1 && id === e.id);
}
if (event.kind === 1) { if (event.kind === 1) {
const quoteId = event.tags.find(([name]) => name === 'q')?.[1]; const id = event.tags.find(([name]) => name === 'q')?.[1];
if (quoteId) { event.quote_repost = b.find((e) => e.kind === 1 && id === e.id);
event.quote_repost = allEventsMap.get(quoteId);
} }
} else if (event.kind === 6) {
const repostedId = event.tags.find(([name]) => name === 'e')?.[1]; event.author = b.find((e) => e.kind === 0 && e.pubkey === event.pubkey);
if (repostedId) { event.author_stats = stats.authors.find((stats) => stats.pubkey === event.pubkey);
const repostedEvent = allEventsMap.get(repostedId); event.event_stats = stats.events.find((stats) => stats.event_id === event.id);
if (repostedEvent && repostedEvent.tags.find(([name]) => name === 'q')?.[1]) { // The repost is a repost of a quote repost
const postBeingQuoteRepostedId = repostedEvent.tags.find(([name]) => name === 'q')?.[1]; event.user = b.find((e) =>
event.repost = { e.kind === 30361 && e.pubkey === admin && e.tags.find(([name]) => name === 'd')?.[1] === event.pubkey
quote_repost: allEventsMap.get(postBeingQuoteRepostedId!), );
...allEventsMap.get(repostedId)!,
};
} else { // The repost is a repost of a normal post
event.repost = allEventsMap.get(repostedId);
} }
}
} return a;
});
return events;
} }
async function hydrateAuthors(opts: Omit<HydrateEventOpts, 'relations'>): Promise<DittoEvent[]> { function gatherReposts({ events, storage, signal }: HydrateOpts): Promise<DittoEvent[]> {
const { events, storage, signal } = opts; const ids = new Set<string>();
const pubkeys = new Set([...events].map((event) => event.pubkey));
const authors = await storage.query([{ kinds: [0], authors: [...pubkeys], limit: pubkeys.size }], { signal });
for (const event of events) { for (const event of events) {
event.author = authors.find((author) => author.pubkey === event.pubkey); if (event.kind === 6) {
const id = event.tags.find(([name]) => name === 'e')?.[1];
if (id) {
ids.add(id);
}
}
} }
return events; return storage.query(
[{ ids: [...ids], limit: ids.size }],
{ signal },
);
} }
async function hydrateAuthorStats(events: DittoEvent[]): Promise<DittoEvent[]> { function gatherQuotes({ events, storage, signal }: HydrateOpts): Promise<DittoEvent[]> {
const results = await db const ids = new Set<string>();
for (const event of events) {
if (event.kind === 1) {
const id = event.tags.find(([name]) => name === 'q')?.[1];
if (id) {
ids.add(id);
}
}
}
return storage.query(
[{ ids: [...ids], limit: ids.size }],
{ signal },
);
}
function gatherAuthors({ events, storage, signal }: HydrateOpts): Promise<DittoEvent[]> {
const pubkeys = new Set(events.map((event) => event.pubkey));
return storage.query(
[{ kinds: [0], authors: [...pubkeys], limit: pubkeys.size }],
{ signal },
);
}
function gatherUsers({ events, storage, signal }: HydrateOpts): Promise<DittoEvent[]> {
const pubkeys = new Set(events.map((event) => event.pubkey));
return storage.query(
[{ kinds: [30361], authors: [Conf.pubkey], '#d': [...pubkeys], limit: pubkeys.size }],
{ signal },
);
}
function gatherAuthorStats(events: DittoEvent[]): Promise<DittoTables['author_stats'][]> {
const pubkeys = new Set<string>(
events
.filter((event) => event.kind === 0)
.map((event) => event.pubkey),
);
return db
.selectFrom('author_stats') .selectFrom('author_stats')
.selectAll() .selectAll()
.where('pubkey', 'in', events.map((event) => event.pubkey)) .where('pubkey', 'in', [...pubkeys])
.execute(); .execute();
for (const event of events) {
const stat = results.find((result) => result.pubkey === event.pubkey);
if (stat) {
event.author_stats = {
followers_count: Math.max(stat.followers_count, 0) || 0,
following_count: Math.max(stat.following_count, 0) || 0,
notes_count: Math.max(stat.notes_count, 0) || 0,
};
}
}
return events;
} }
async function hydrateEventStats(events: DittoEvent[]): Promise<DittoEvent[]> { function gatherEventStats(events: DittoEvent[]): Promise<DittoTables['event_stats'][]> {
const results = await db const ids = new Set<string>(
events
.filter((event) => event.kind === 1)
.map((event) => event.id),
);
return db
.selectFrom('event_stats') .selectFrom('event_stats')
.selectAll() .selectAll()
.where('event_id', 'in', events.map((event) => event.id)) .where('event_id', 'in', [...ids])
.execute(); .execute();
for (const event of events) {
const stat = results.find((result) => result.event_id === event.id);
if (stat) {
event.event_stats = {
replies_count: Math.max(stat.replies_count, 0) || 0,
reposts_count: Math.max(stat.reposts_count, 0) || 0,
reactions_count: Math.max(stat.reactions_count, 0) || 0,
};
}
}
return events;
} }
/** Return a normalized event without any non-standard keys. */ /** Return a normalized event without any non-standard keys. */