Merge branch 'hydrate-again' into 'main'
Rewrite hydrateEvents again See merge request soapbox-pub/ditto!180
This commit is contained in:
commit
2bffb667c4
|
@ -11,6 +11,7 @@ import {
|
||||||
clientMsgSchema,
|
clientMsgSchema,
|
||||||
type ClientREQ,
|
type ClientREQ,
|
||||||
} from '@/schemas/nostr.ts';
|
} from '@/schemas/nostr.ts';
|
||||||
|
import { purifyEvent } from '@/storages/hydrate.ts';
|
||||||
import { Sub } from '@/subs.ts';
|
import { Sub } from '@/subs.ts';
|
||||||
|
|
||||||
import type { AppController } from '@/app.ts';
|
import type { AppController } from '@/app.ts';
|
||||||
|
@ -70,7 +71,7 @@ function connectStream(socket: WebSocket) {
|
||||||
send(['EOSE', subId]);
|
send(['EOSE', subId]);
|
||||||
|
|
||||||
for await (const event of Sub.sub(socket, subId, filters)) {
|
for await (const event of Sub.sub(socket, subId, filters)) {
|
||||||
send(['EVENT', subId, event]);
|
send(['EVENT', subId, purifyEvent(event)]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -35,22 +35,13 @@ Deno.test('hydrateEvents(): author --- WITHOUT stats', async () => {
|
||||||
|
|
||||||
assertEquals((event1copy as DittoEvent).author, undefined, "Event hasn't been hydrated yet");
|
assertEquals((event1copy as DittoEvent).author, undefined, "Event hasn't been hydrated yet");
|
||||||
|
|
||||||
const controller = new AbortController();
|
|
||||||
const timeoutId = setTimeout(() => controller.abort(), 1000);
|
|
||||||
|
|
||||||
await hydrateEvents({
|
await hydrateEvents({
|
||||||
events: [event1copy],
|
events: [event1copy],
|
||||||
storage: db,
|
storage: db,
|
||||||
signal: controller.signal,
|
|
||||||
});
|
});
|
||||||
|
|
||||||
const expectedEvent = { ...event1copy, author: event0copy };
|
const expectedEvent = { ...event1copy, author: event0copy };
|
||||||
assertEquals(event1copy, expectedEvent);
|
assertEquals(event1copy, expectedEvent);
|
||||||
|
|
||||||
await db.remove([{ kinds: [0, 1] }]);
|
|
||||||
assertEquals(await db.query([{ kinds: [0, 1] }]), []);
|
|
||||||
|
|
||||||
clearTimeout(timeoutId);
|
|
||||||
});
|
});
|
||||||
|
|
||||||
Deno.test('hydrateEvents(): repost --- WITHOUT stats', async () => {
|
Deno.test('hydrateEvents(): repost --- WITHOUT stats', async () => {
|
||||||
|
@ -67,16 +58,12 @@ Deno.test('hydrateEvents(): repost --- WITHOUT stats', async () => {
|
||||||
await db.event(event1repostedCopy);
|
await db.event(event1repostedCopy);
|
||||||
await db.event(event6copy);
|
await db.event(event6copy);
|
||||||
|
|
||||||
assertEquals((event6copy as DittoEvent).author, undefined, "Event hasn't been hydrated author yet");
|
assertEquals((event6copy as DittoEvent).author, undefined, "Event hasn't hydrated author yet");
|
||||||
assertEquals((event6copy as DittoEvent).repost, undefined, "Event hasn't been hydrated repost yet");
|
assertEquals((event6copy as DittoEvent).repost, undefined, "Event hasn't hydrated repost yet");
|
||||||
|
|
||||||
const controller = new AbortController();
|
|
||||||
const timeoutId = setTimeout(() => controller.abort(), 1000);
|
|
||||||
|
|
||||||
await hydrateEvents({
|
await hydrateEvents({
|
||||||
events: [event6copy],
|
events: [event6copy],
|
||||||
storage: db,
|
storage: db,
|
||||||
signal: controller.signal,
|
|
||||||
});
|
});
|
||||||
|
|
||||||
const expectedEvent6 = {
|
const expectedEvent6 = {
|
||||||
|
@ -85,11 +72,6 @@ Deno.test('hydrateEvents(): repost --- WITHOUT stats', async () => {
|
||||||
repost: { ...event1repostedCopy, author: event0madePostCopy },
|
repost: { ...event1repostedCopy, author: event0madePostCopy },
|
||||||
};
|
};
|
||||||
assertEquals(event6copy, expectedEvent6);
|
assertEquals(event6copy, expectedEvent6);
|
||||||
|
|
||||||
await db.remove([{ kinds: [0, 1, 6] }]);
|
|
||||||
assertEquals(await db.query([{ kinds: [0, 1, 6] }]), []);
|
|
||||||
|
|
||||||
clearTimeout(timeoutId);
|
|
||||||
});
|
});
|
||||||
|
|
||||||
Deno.test('hydrateEvents(): quote repost --- WITHOUT stats', async () => {
|
Deno.test('hydrateEvents(): quote repost --- WITHOUT stats', async () => {
|
||||||
|
@ -106,13 +88,9 @@ Deno.test('hydrateEvents(): quote repost --- WITHOUT stats', async () => {
|
||||||
await db.event(event1quoteRepostCopy);
|
await db.event(event1quoteRepostCopy);
|
||||||
await db.event(event1willBeQuoteRepostedCopy);
|
await db.event(event1willBeQuoteRepostedCopy);
|
||||||
|
|
||||||
const controller = new AbortController();
|
|
||||||
const timeoutId = setTimeout(() => controller.abort(), 1000);
|
|
||||||
|
|
||||||
await hydrateEvents({
|
await hydrateEvents({
|
||||||
events: [event1quoteRepostCopy],
|
events: [event1quoteRepostCopy],
|
||||||
storage: db,
|
storage: db,
|
||||||
signal: controller.signal,
|
|
||||||
});
|
});
|
||||||
|
|
||||||
const expectedEvent1quoteRepost = {
|
const expectedEvent1quoteRepost = {
|
||||||
|
@ -122,11 +100,6 @@ Deno.test('hydrateEvents(): quote repost --- WITHOUT stats', async () => {
|
||||||
};
|
};
|
||||||
|
|
||||||
assertEquals(event1quoteRepostCopy, expectedEvent1quoteRepost);
|
assertEquals(event1quoteRepostCopy, expectedEvent1quoteRepost);
|
||||||
|
|
||||||
await db.remove([{ kinds: [0, 1] }]);
|
|
||||||
assertEquals(await db.query([{ kinds: [0, 1] }]), []);
|
|
||||||
|
|
||||||
clearTimeout(timeoutId);
|
|
||||||
});
|
});
|
||||||
|
|
||||||
Deno.test('hydrateEvents(): repost of quote repost --- WITHOUT stats', async () => {
|
Deno.test('hydrateEvents(): repost of quote repost --- WITHOUT stats', async () => {
|
||||||
|
@ -143,16 +116,12 @@ Deno.test('hydrateEvents(): repost of quote repost --- WITHOUT stats', async ()
|
||||||
await db.event(event1quoteCopy);
|
await db.event(event1quoteCopy);
|
||||||
await db.event(event6copy);
|
await db.event(event6copy);
|
||||||
|
|
||||||
assertEquals((event6copy as DittoEvent).author, undefined, "Event hasn't been hydrated author yet");
|
assertEquals((event6copy as DittoEvent).author, undefined, "Event hasn't hydrated author yet");
|
||||||
assertEquals((event6copy as DittoEvent).repost, undefined, "Event hasn't been hydrated repost yet");
|
assertEquals((event6copy as DittoEvent).repost, undefined, "Event hasn't hydrated repost yet");
|
||||||
|
|
||||||
const controller = new AbortController();
|
|
||||||
const timeoutId = setTimeout(() => controller.abort(), 1000);
|
|
||||||
|
|
||||||
await hydrateEvents({
|
await hydrateEvents({
|
||||||
events: [event6copy],
|
events: [event6copy],
|
||||||
storage: db,
|
storage: db,
|
||||||
signal: controller.signal,
|
|
||||||
});
|
});
|
||||||
|
|
||||||
const expectedEvent6 = {
|
const expectedEvent6 = {
|
||||||
|
@ -161,9 +130,4 @@ Deno.test('hydrateEvents(): repost of quote repost --- WITHOUT stats', async ()
|
||||||
repost: { ...event1quoteCopy, author: event0copy, quote_repost: { author: event0copy, ...event1copy } },
|
repost: { ...event1quoteCopy, author: event0copy, quote_repost: { author: event0copy, ...event1copy } },
|
||||||
};
|
};
|
||||||
assertEquals(event6copy, expectedEvent6);
|
assertEquals(event6copy, expectedEvent6);
|
||||||
|
|
||||||
await db.remove([{ kinds: [0, 1, 6] }]);
|
|
||||||
assertEquals(await db.query([{ kinds: [0, 1, 6] }]), []);
|
|
||||||
|
|
||||||
clearTimeout(timeoutId);
|
|
||||||
});
|
});
|
||||||
|
|
|
@ -1,137 +1,185 @@
|
||||||
import { NostrEvent, NStore } from '@nostrify/nostrify';
|
import { NostrEvent, NStore } from '@nostrify/nostrify';
|
||||||
import { db } from '@/db.ts';
|
|
||||||
import { type DittoEvent } from '@/interfaces/DittoEvent.ts';
|
|
||||||
|
|
||||||
interface HydrateEventOpts {
|
import { db } from '@/db.ts';
|
||||||
|
import { matchFilter } from '@/deps.ts';
|
||||||
|
import { type DittoEvent } from '@/interfaces/DittoEvent.ts';
|
||||||
|
import { DittoTables } from '@/db/DittoTables.ts';
|
||||||
|
import { Conf } from '@/config.ts';
|
||||||
|
|
||||||
|
interface HydrateOpts {
|
||||||
events: DittoEvent[];
|
events: DittoEvent[];
|
||||||
storage: NStore;
|
storage: NStore;
|
||||||
signal?: AbortSignal;
|
signal?: AbortSignal;
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Hydrate events using the provided storage. */
|
/** Hydrate events using the provided storage. */
|
||||||
async function hydrateEvents(opts: HydrateEventOpts): Promise<DittoEvent[]> {
|
async function hydrateEvents(opts: HydrateOpts): Promise<DittoEvent[]> {
|
||||||
const { events, storage, signal } = opts;
|
const { events, storage, signal } = opts;
|
||||||
|
|
||||||
if (!events.length) {
|
if (!events.length) {
|
||||||
return events;
|
return events;
|
||||||
}
|
}
|
||||||
|
|
||||||
const allEventsMap: Map<string, DittoEvent> = new Map(events.map((event) => {
|
const cache = [...events];
|
||||||
return [event.id, structuredClone(event)];
|
|
||||||
}));
|
|
||||||
|
|
||||||
const childrenEventsIds = (events.map((event) => {
|
for (const event of await gatherReposts({ events: cache, storage, signal })) {
|
||||||
if (event.kind === 1) return event.tags.find(([name]) => name === 'q')?.[1]; // possible quote repost
|
cache.push(event);
|
||||||
if (event.kind === 6) return event.tags.find(([name]) => name === 'e')?.[1]; // possible repost
|
}
|
||||||
return;
|
|
||||||
}).filter(Boolean)) as string[];
|
|
||||||
|
|
||||||
if (childrenEventsIds.length > 0) {
|
for (const event of await gatherQuotes({ events: cache, storage, signal })) {
|
||||||
const childrenEvents = await storage.query([{ ids: childrenEventsIds }], { signal });
|
cache.push(event);
|
||||||
childrenEvents.forEach((event) => {
|
}
|
||||||
allEventsMap.set(event.id, structuredClone(event));
|
|
||||||
});
|
|
||||||
|
|
||||||
if (childrenEvents.length > 0) {
|
for (const event of await gatherAuthors({ events: cache, storage, signal })) {
|
||||||
const grandChildrenEventsIds = (childrenEvents.map((event) => {
|
cache.push(event);
|
||||||
if (event.kind === 1) return event.tags.find(([name]) => name === 'q')?.[1]; // possible quote repost
|
}
|
||||||
return;
|
|
||||||
}).filter(Boolean)) as string[];
|
for (const event of await gatherUsers({ events: cache, storage, signal })) {
|
||||||
if (grandChildrenEventsIds.length > 0) {
|
cache.push(event);
|
||||||
const grandChildrenEvents = await storage.query([{ ids: grandChildrenEventsIds }], { signal });
|
}
|
||||||
grandChildrenEvents.forEach((event) => {
|
|
||||||
allEventsMap.set(event.id, structuredClone(event));
|
const stats = {
|
||||||
});
|
authors: await gatherAuthorStats(cache),
|
||||||
|
events: await gatherEventStats(cache),
|
||||||
|
};
|
||||||
|
|
||||||
|
// Dedupe events.
|
||||||
|
const results = [...new Map(cache.map((event) => [event.id, event])).values()];
|
||||||
|
|
||||||
|
// First connect all the events to each-other, then connect the connected events to the original list.
|
||||||
|
assembleEvents(results, results, stats);
|
||||||
|
assembleEvents(events, results, stats);
|
||||||
|
|
||||||
|
return events;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Connect the events in list `b` to the DittoEvent fields in list `a`. */
|
||||||
|
function assembleEvents(
|
||||||
|
a: DittoEvent[],
|
||||||
|
b: DittoEvent[],
|
||||||
|
stats: { authors: DittoTables['author_stats'][]; events: DittoTables['event_stats'][] },
|
||||||
|
): DittoEvent[] {
|
||||||
|
const admin = Conf.pubkey;
|
||||||
|
|
||||||
|
for (const event of a) {
|
||||||
|
event.author = b.find((e) => matchFilter({ kinds: [0], authors: [event.pubkey] }, e));
|
||||||
|
event.user = b.find((e) => matchFilter({ kinds: [30361], authors: [admin], '#d': [event.pubkey] }, e));
|
||||||
|
|
||||||
|
if (event.kind === 6) {
|
||||||
|
const id = event.tags.find(([name]) => name === 'e')?.[1];
|
||||||
|
if (id) {
|
||||||
|
event.repost = b.find((e) => matchFilter({ kinds: [1], ids: [id] }, e));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
await hydrateAuthors({ events: [...allEventsMap.values()], storage, signal });
|
|
||||||
await hydrateAuthorStats([...allEventsMap.values()].filter((e) => e.kind === 0));
|
|
||||||
await hydrateEventStats([...allEventsMap.values()].filter((e) => e.kind === 1));
|
|
||||||
|
|
||||||
events.forEach((event) => {
|
|
||||||
const correspondingEvent = allEventsMap.get(event.id);
|
|
||||||
if (correspondingEvent?.author) event.author = correspondingEvent.author;
|
|
||||||
if (correspondingEvent?.author_stats) event.author_stats = correspondingEvent.author_stats;
|
|
||||||
if (correspondingEvent?.event_stats) event.event_stats = correspondingEvent.event_stats;
|
|
||||||
|
|
||||||
if (event.kind === 1) {
|
if (event.kind === 1) {
|
||||||
const quoteId = event.tags.find(([name]) => name === 'q')?.[1];
|
const id = event.tags.find(([name]) => name === 'q')?.[1];
|
||||||
if (quoteId) {
|
if (id) {
|
||||||
event.quote_repost = allEventsMap.get(quoteId);
|
event.quote_repost = b.find((e) => matchFilter({ kinds: [1], ids: [id] }, e));
|
||||||
}
|
|
||||||
} else if (event.kind === 6) {
|
|
||||||
const repostedId = event.tags.find(([name]) => name === 'e')?.[1];
|
|
||||||
if (repostedId) {
|
|
||||||
const repostedEvent = allEventsMap.get(repostedId);
|
|
||||||
if (repostedEvent && repostedEvent.tags.find(([name]) => name === 'q')?.[1]) { // The repost is a repost of a quote repost
|
|
||||||
const postBeingQuoteRepostedId = repostedEvent.tags.find(([name]) => name === 'q')?.[1];
|
|
||||||
event.repost = {
|
|
||||||
quote_repost: allEventsMap.get(postBeingQuoteRepostedId!),
|
|
||||||
...allEventsMap.get(repostedId)!,
|
|
||||||
};
|
|
||||||
} else { // The repost is a repost of a normal post
|
|
||||||
event.repost = allEventsMap.get(repostedId);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
|
||||||
return events;
|
|
||||||
}
|
|
||||||
|
|
||||||
async function hydrateAuthors(opts: Omit<HydrateEventOpts, 'relations'>): Promise<DittoEvent[]> {
|
event.author_stats = stats.authors.find((stats) => stats.pubkey === event.pubkey);
|
||||||
const { events, storage, signal } = opts;
|
event.event_stats = stats.events.find((stats) => stats.event_id === event.id);
|
||||||
|
|
||||||
const pubkeys = new Set([...events].map((event) => event.pubkey));
|
|
||||||
const authors = await storage.query([{ kinds: [0], authors: [...pubkeys], limit: pubkeys.size }], { signal });
|
|
||||||
|
|
||||||
for (const event of events) {
|
|
||||||
event.author = authors.find((author) => author.pubkey === event.pubkey);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return events;
|
return a;
|
||||||
}
|
}
|
||||||
|
|
||||||
async function hydrateAuthorStats(events: DittoEvent[]): Promise<DittoEvent[]> {
|
/** Collect reposts from the events. */
|
||||||
const results = await db
|
function gatherReposts({ events, storage, signal }: HydrateOpts): Promise<DittoEvent[]> {
|
||||||
|
const ids = new Set<string>();
|
||||||
|
|
||||||
|
for (const event of events) {
|
||||||
|
if (event.kind === 6) {
|
||||||
|
const id = event.tags.find(([name]) => name === 'e')?.[1];
|
||||||
|
if (id) {
|
||||||
|
ids.add(id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return storage.query(
|
||||||
|
[{ ids: [...ids], limit: ids.size }],
|
||||||
|
{ signal },
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Collect quotes from the events. */
|
||||||
|
function gatherQuotes({ events, storage, signal }: HydrateOpts): Promise<DittoEvent[]> {
|
||||||
|
const ids = new Set<string>();
|
||||||
|
|
||||||
|
for (const event of events) {
|
||||||
|
if (event.kind === 1) {
|
||||||
|
const id = event.tags.find(([name]) => name === 'q')?.[1];
|
||||||
|
if (id) {
|
||||||
|
ids.add(id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return storage.query(
|
||||||
|
[{ ids: [...ids], limit: ids.size }],
|
||||||
|
{ signal },
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Collect authors from the events. */
|
||||||
|
function gatherAuthors({ events, storage, signal }: HydrateOpts): Promise<DittoEvent[]> {
|
||||||
|
const pubkeys = new Set(events.map((event) => event.pubkey));
|
||||||
|
|
||||||
|
return storage.query(
|
||||||
|
[{ kinds: [0], authors: [...pubkeys], limit: pubkeys.size }],
|
||||||
|
{ signal },
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Collect users from the events. */
|
||||||
|
function gatherUsers({ events, storage, signal }: HydrateOpts): Promise<DittoEvent[]> {
|
||||||
|
const pubkeys = new Set(events.map((event) => event.pubkey));
|
||||||
|
|
||||||
|
return storage.query(
|
||||||
|
[{ kinds: [30361], authors: [Conf.pubkey], '#d': [...pubkeys], limit: pubkeys.size }],
|
||||||
|
{ signal },
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Collect author stats from the events. */
|
||||||
|
function gatherAuthorStats(events: DittoEvent[]): Promise<DittoTables['author_stats'][]> {
|
||||||
|
const pubkeys = new Set<string>(
|
||||||
|
events
|
||||||
|
.filter((event) => event.kind === 0)
|
||||||
|
.map((event) => event.pubkey),
|
||||||
|
);
|
||||||
|
|
||||||
|
if (!pubkeys.size) {
|
||||||
|
return Promise.resolve([]);
|
||||||
|
}
|
||||||
|
|
||||||
|
return db
|
||||||
.selectFrom('author_stats')
|
.selectFrom('author_stats')
|
||||||
.selectAll()
|
.selectAll()
|
||||||
.where('pubkey', 'in', events.map((event) => event.pubkey))
|
.where('pubkey', 'in', [...pubkeys])
|
||||||
.execute();
|
.execute();
|
||||||
|
|
||||||
for (const event of events) {
|
|
||||||
const stat = results.find((result) => result.pubkey === event.pubkey);
|
|
||||||
if (stat) {
|
|
||||||
event.author_stats = {
|
|
||||||
followers_count: Math.max(stat.followers_count, 0) || 0,
|
|
||||||
following_count: Math.max(stat.following_count, 0) || 0,
|
|
||||||
notes_count: Math.max(stat.notes_count, 0) || 0,
|
|
||||||
};
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return events;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async function hydrateEventStats(events: DittoEvent[]): Promise<DittoEvent[]> {
|
/** Collect event stats from the events. */
|
||||||
const results = await db
|
function gatherEventStats(events: DittoEvent[]): Promise<DittoTables['event_stats'][]> {
|
||||||
.selectFrom('event_stats')
|
const ids = new Set<string>(
|
||||||
.selectAll()
|
events
|
||||||
.where('event_id', 'in', events.map((event) => event.id))
|
.filter((event) => event.kind === 1)
|
||||||
.execute();
|
.map((event) => event.id),
|
||||||
|
);
|
||||||
|
|
||||||
for (const event of events) {
|
if (!ids.size) {
|
||||||
const stat = results.find((result) => result.event_id === event.id);
|
return Promise.resolve([]);
|
||||||
if (stat) {
|
|
||||||
event.event_stats = {
|
|
||||||
replies_count: Math.max(stat.replies_count, 0) || 0,
|
|
||||||
reposts_count: Math.max(stat.reposts_count, 0) || 0,
|
|
||||||
reactions_count: Math.max(stat.reactions_count, 0) || 0,
|
|
||||||
};
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return events;
|
return db
|
||||||
|
.selectFrom('event_stats')
|
||||||
|
.selectAll()
|
||||||
|
.where('event_id', 'in', [...ids])
|
||||||
|
.execute();
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Return a normalized event without any non-standard keys. */
|
/** Return a normalized event without any non-standard keys. */
|
||||||
|
|
Loading…
Reference in New Issue