diff --git a/src/pipeline.ts b/src/pipeline.ts index 9f30589..80b686e 100644 --- a/src/pipeline.ts +++ b/src/pipeline.ts @@ -27,28 +27,28 @@ async function handleEvent(event: DittoEvent): Promise { const signal = AbortSignal.timeout(5000); if (!(await verifySignatureWorker(event))) return; const wanted = reqmeister.isWanted(event); - if (await encounterEvent(event)) return; + if (await encounterEvent(event, signal)) return; debug(`NostrEvent<${event.kind}> ${event.id}`); await hydrateEvent(event); await Promise.all([ - storeEvent(event, { force: wanted }), - processDeletions(event), + storeEvent(event, { force: wanted, signal }), + processDeletions(event, signal), trackRelays(event), trackHashtags(event), fetchRelatedEvents(event, signal), processMedia(event), payZap(event, signal), streamOut(event), - broadcast(event), + broadcast(event, signal), ]); } /** Encounter the event, and return whether it has already been encountered. */ -async function encounterEvent(event: NostrEvent): Promise { +async function encounterEvent(event: NostrEvent, signal: AbortSignal): Promise { const preexisting = (await memorelay.count([{ ids: [event.id] }])) > 0; - memorelay.event(event); - reqmeister.event(event); + memorelay.event(event, { signal }); + reqmeister.event(event, { signal }); return preexisting; } @@ -62,24 +62,26 @@ async function hydrateEvent(event: DittoEvent): Promise { const isAdminEvent = ({ pubkey }: NostrEvent): boolean => pubkey === Conf.pubkey; interface StoreEventOpts { - force?: boolean; + force: boolean; + signal: AbortSignal; } /** Maybe store the event, if eligible. */ -async function storeEvent(event: DittoEvent, opts: StoreEventOpts = {}): Promise { +async function storeEvent(event: DittoEvent, opts: StoreEventOpts): Promise { if (isEphemeralKind(event.kind)) return; - const { force = false } = opts; + const { force = false, signal } = opts; if (force || event.user || isAdminEvent(event) || await isLocallyFollowed(event.pubkey)) { const isDeleted = await eventsDB.count( [{ kinds: [5], authors: [Conf.pubkey, event.pubkey], '#e': [event.id], limit: 1 }], + opts, ) > 0; if (isDeleted) { return Promise.reject(new RelayError('blocked', 'event was deleted')); } else { await Promise.all([ - eventsDB.event(event).catch(debug), + eventsDB.event(event, { signal }).catch(debug), updateStats(event).catch(debug), ]); } @@ -89,20 +91,20 @@ async function storeEvent(event: DittoEvent, opts: StoreEventOpts = {}): Promise } /** Query to-be-deleted events, ensure their pubkey matches, then delete them from the database. */ -async function processDeletions(event: NostrEvent): Promise { +async function processDeletions(event: NostrEvent, signal: AbortSignal): Promise { if (event.kind === 5) { const ids = getTagSet(event.tags, 'e'); if (event.pubkey === Conf.pubkey) { - await eventsDB.remove([{ ids: [...ids] }]); + await eventsDB.remove([{ ids: [...ids] }], { signal }); } else { - const events = await eventsDB.query([{ - ids: [...ids], - authors: [event.pubkey], - }]); + const events = await eventsDB.query( + [{ ids: [...ids], authors: [event.pubkey] }], + { signal }, + ); const deleteIds = events.map(({ id }) => id); - await eventsDB.remove([{ ids: deleteIds }]); + await eventsDB.remove([{ ids: deleteIds }], { signal }); } } } @@ -148,7 +150,7 @@ function fetchRelatedEvents(event: DittoEvent, signal: AbortSignal) { reqmeister.req({ kinds: [0], authors: [event.pubkey] }, { signal }).catch(() => {}); } for (const [name, id, relay] of event.tags) { - if (name === 'e' && !memorelay.count([{ ids: [id] }])) { + if (name === 'e' && !memorelay.count([{ ids: [id] }], { signal })) { reqmeister.req({ ids: [id] }, { relays: [relay] }).catch(() => {}); } } @@ -223,11 +225,11 @@ function streamOut(event: NostrEvent) { * Publish the event to other relays. * This should only be done in certain circumstances, like mentioning a user or publishing deletions. */ -function broadcast(event: DittoEvent) { +function broadcast(event: DittoEvent, signal: AbortSignal) { if (!event.user || !isFresh(event)) return; if (event.kind === 5) { - client.event(event); + client.event(event, { signal }); } } diff --git a/src/storages/events-db.ts b/src/storages/events-db.ts index 96f4c1b..c85841d 100644 --- a/src/storages/events-db.ts +++ b/src/storages/events-db.ts @@ -8,6 +8,7 @@ import { type DittoFilter } from '@/interfaces/DittoFilter.ts'; import { isDittoInternalKind, isParameterizedReplaceableKind, isReplaceableKind } from '@/kinds.ts'; import { jsonMetaContentSchema } from '@/schemas/nostr.ts'; import { isNostrId, isURL } from '@/utils.ts'; +import { abortError } from '@/utils/abort.ts'; /** Function to decide whether or not to index a tag. */ type TagCondition = ({ event, count, value }: { @@ -63,7 +64,7 @@ class EventsDB implements NStore { } /** Insert an event (and its tags) into the database. */ - async event(event: NostrEvent): Promise { + async event(event: NostrEvent, _opts?: NStoreOpts): Promise { event = cleanEvent(event); this.#debug('EVENT', JSON.stringify(event)); @@ -336,7 +337,7 @@ class EventsDB implements NStore { } /** Delete events based on filters from the database. */ - async remove(filters: DittoFilter[]): Promise { + async remove(filters: DittoFilter[], _opts?: NStoreOpts): Promise { if (!filters.length) return Promise.resolve(); this.#debug('DELETE', JSON.stringify(filters)); @@ -344,8 +345,10 @@ class EventsDB implements NStore { } /** Get number of events that would be returned by filters. */ - async count(filters: DittoFilter[]): Promise { + async count(filters: DittoFilter[], opts: NStoreOpts = {}): Promise { + if (opts.signal?.aborted) return Promise.reject(abortError()); if (!filters.length) return Promise.resolve(0); + this.#debug('COUNT', JSON.stringify(filters)); const query = this.getEventsQuery(filters); diff --git a/src/storages/reqmeister.ts b/src/storages/reqmeister.ts index 18cd9b1..f43de71 100644 --- a/src/storages/reqmeister.ts +++ b/src/storages/reqmeister.ts @@ -106,7 +106,7 @@ class Reqmeister extends EventEmitter<{ [filterId: string]: (event: NostrEvent) }); } - event(event: NostrEvent): Promise { + event(event: NostrEvent, _opts?: NStoreOpts): Promise { const filterId = getFilterId(eventToMicroFilter(event)); this.#queue = this.#queue.filter(([id]) => id !== filterId); this.emit(filterId, event);