pipeline: pass signals all the way down

This commit is contained in:
Alex Gleason 2024-01-23 14:35:35 -06:00
parent c6062874bd
commit 77f2e2d940
No known key found for this signature in database
GPG Key ID: 7211D1F99744FBB7
3 changed files with 30 additions and 25 deletions

View File

@ -27,28 +27,28 @@ async function handleEvent(event: DittoEvent): Promise<void> {
const signal = AbortSignal.timeout(5000); const signal = AbortSignal.timeout(5000);
if (!(await verifySignatureWorker(event))) return; if (!(await verifySignatureWorker(event))) return;
const wanted = reqmeister.isWanted(event); const wanted = reqmeister.isWanted(event);
if (await encounterEvent(event)) return; if (await encounterEvent(event, signal)) return;
debug(`NostrEvent<${event.kind}> ${event.id}`); debug(`NostrEvent<${event.kind}> ${event.id}`);
await hydrateEvent(event); await hydrateEvent(event);
await Promise.all([ await Promise.all([
storeEvent(event, { force: wanted }), storeEvent(event, { force: wanted, signal }),
processDeletions(event), processDeletions(event, signal),
trackRelays(event), trackRelays(event),
trackHashtags(event), trackHashtags(event),
fetchRelatedEvents(event, signal), fetchRelatedEvents(event, signal),
processMedia(event), processMedia(event),
payZap(event, signal), payZap(event, signal),
streamOut(event), streamOut(event),
broadcast(event), broadcast(event, signal),
]); ]);
} }
/** Encounter the event, and return whether it has already been encountered. */ /** Encounter the event, and return whether it has already been encountered. */
async function encounterEvent(event: NostrEvent): Promise<boolean> { async function encounterEvent(event: NostrEvent, signal: AbortSignal): Promise<boolean> {
const preexisting = (await memorelay.count([{ ids: [event.id] }])) > 0; const preexisting = (await memorelay.count([{ ids: [event.id] }])) > 0;
memorelay.event(event); memorelay.event(event, { signal });
reqmeister.event(event); reqmeister.event(event, { signal });
return preexisting; return preexisting;
} }
@ -62,24 +62,26 @@ async function hydrateEvent(event: DittoEvent): Promise<void> {
const isAdminEvent = ({ pubkey }: NostrEvent): boolean => pubkey === Conf.pubkey; const isAdminEvent = ({ pubkey }: NostrEvent): boolean => pubkey === Conf.pubkey;
interface StoreEventOpts { interface StoreEventOpts {
force?: boolean; force: boolean;
signal: AbortSignal;
} }
/** Maybe store the event, if eligible. */ /** Maybe store the event, if eligible. */
async function storeEvent(event: DittoEvent, opts: StoreEventOpts = {}): Promise<void> { async function storeEvent(event: DittoEvent, opts: StoreEventOpts): Promise<void> {
if (isEphemeralKind(event.kind)) return; if (isEphemeralKind(event.kind)) return;
const { force = false } = opts; const { force = false, signal } = opts;
if (force || event.user || isAdminEvent(event) || await isLocallyFollowed(event.pubkey)) { if (force || event.user || isAdminEvent(event) || await isLocallyFollowed(event.pubkey)) {
const isDeleted = await eventsDB.count( const isDeleted = await eventsDB.count(
[{ kinds: [5], authors: [Conf.pubkey, event.pubkey], '#e': [event.id], limit: 1 }], [{ kinds: [5], authors: [Conf.pubkey, event.pubkey], '#e': [event.id], limit: 1 }],
opts,
) > 0; ) > 0;
if (isDeleted) { if (isDeleted) {
return Promise.reject(new RelayError('blocked', 'event was deleted')); return Promise.reject(new RelayError('blocked', 'event was deleted'));
} else { } else {
await Promise.all([ await Promise.all([
eventsDB.event(event).catch(debug), eventsDB.event(event, { signal }).catch(debug),
updateStats(event).catch(debug), updateStats(event).catch(debug),
]); ]);
} }
@ -89,20 +91,20 @@ async function storeEvent(event: DittoEvent, opts: StoreEventOpts = {}): Promise
} }
/** Query to-be-deleted events, ensure their pubkey matches, then delete them from the database. */ /** Query to-be-deleted events, ensure their pubkey matches, then delete them from the database. */
async function processDeletions(event: NostrEvent): Promise<void> { async function processDeletions(event: NostrEvent, signal: AbortSignal): Promise<void> {
if (event.kind === 5) { if (event.kind === 5) {
const ids = getTagSet(event.tags, 'e'); const ids = getTagSet(event.tags, 'e');
if (event.pubkey === Conf.pubkey) { if (event.pubkey === Conf.pubkey) {
await eventsDB.remove([{ ids: [...ids] }]); await eventsDB.remove([{ ids: [...ids] }], { signal });
} else { } else {
const events = await eventsDB.query([{ const events = await eventsDB.query(
ids: [...ids], [{ ids: [...ids], authors: [event.pubkey] }],
authors: [event.pubkey], { signal },
}]); );
const deleteIds = events.map(({ id }) => id); const deleteIds = events.map(({ id }) => id);
await eventsDB.remove([{ ids: deleteIds }]); await eventsDB.remove([{ ids: deleteIds }], { signal });
} }
} }
} }
@ -148,7 +150,7 @@ function fetchRelatedEvents(event: DittoEvent, signal: AbortSignal) {
reqmeister.req({ kinds: [0], authors: [event.pubkey] }, { signal }).catch(() => {}); reqmeister.req({ kinds: [0], authors: [event.pubkey] }, { signal }).catch(() => {});
} }
for (const [name, id, relay] of event.tags) { for (const [name, id, relay] of event.tags) {
if (name === 'e' && !memorelay.count([{ ids: [id] }])) { if (name === 'e' && !memorelay.count([{ ids: [id] }], { signal })) {
reqmeister.req({ ids: [id] }, { relays: [relay] }).catch(() => {}); reqmeister.req({ ids: [id] }, { relays: [relay] }).catch(() => {});
} }
} }
@ -223,11 +225,11 @@ function streamOut(event: NostrEvent) {
* Publish the event to other relays. * Publish the event to other relays.
* This should only be done in certain circumstances, like mentioning a user or publishing deletions. * This should only be done in certain circumstances, like mentioning a user or publishing deletions.
*/ */
function broadcast(event: DittoEvent) { function broadcast(event: DittoEvent, signal: AbortSignal) {
if (!event.user || !isFresh(event)) return; if (!event.user || !isFresh(event)) return;
if (event.kind === 5) { if (event.kind === 5) {
client.event(event); client.event(event, { signal });
} }
} }

View File

@ -8,6 +8,7 @@ import { type DittoFilter } from '@/interfaces/DittoFilter.ts';
import { isDittoInternalKind, isParameterizedReplaceableKind, isReplaceableKind } from '@/kinds.ts'; import { isDittoInternalKind, isParameterizedReplaceableKind, isReplaceableKind } from '@/kinds.ts';
import { jsonMetaContentSchema } from '@/schemas/nostr.ts'; import { jsonMetaContentSchema } from '@/schemas/nostr.ts';
import { isNostrId, isURL } from '@/utils.ts'; import { isNostrId, isURL } from '@/utils.ts';
import { abortError } from '@/utils/abort.ts';
/** Function to decide whether or not to index a tag. */ /** Function to decide whether or not to index a tag. */
type TagCondition = ({ event, count, value }: { type TagCondition = ({ event, count, value }: {
@ -63,7 +64,7 @@ class EventsDB implements NStore {
} }
/** Insert an event (and its tags) into the database. */ /** Insert an event (and its tags) into the database. */
async event(event: NostrEvent): Promise<void> { async event(event: NostrEvent, _opts?: NStoreOpts): Promise<void> {
event = cleanEvent(event); event = cleanEvent(event);
this.#debug('EVENT', JSON.stringify(event)); this.#debug('EVENT', JSON.stringify(event));
@ -336,7 +337,7 @@ class EventsDB implements NStore {
} }
/** Delete events based on filters from the database. */ /** Delete events based on filters from the database. */
async remove(filters: DittoFilter[]): Promise<void> { async remove(filters: DittoFilter[], _opts?: NStoreOpts): Promise<void> {
if (!filters.length) return Promise.resolve(); if (!filters.length) return Promise.resolve();
this.#debug('DELETE', JSON.stringify(filters)); this.#debug('DELETE', JSON.stringify(filters));
@ -344,8 +345,10 @@ class EventsDB implements NStore {
} }
/** Get number of events that would be returned by filters. */ /** Get number of events that would be returned by filters. */
async count(filters: DittoFilter[]): Promise<number> { async count(filters: DittoFilter[], opts: NStoreOpts = {}): Promise<number> {
if (opts.signal?.aborted) return Promise.reject(abortError());
if (!filters.length) return Promise.resolve(0); if (!filters.length) return Promise.resolve(0);
this.#debug('COUNT', JSON.stringify(filters)); this.#debug('COUNT', JSON.stringify(filters));
const query = this.getEventsQuery(filters); const query = this.getEventsQuery(filters);

View File

@ -106,7 +106,7 @@ class Reqmeister extends EventEmitter<{ [filterId: string]: (event: NostrEvent)
}); });
} }
event(event: NostrEvent): Promise<void> { event(event: NostrEvent, _opts?: NStoreOpts): Promise<void> {
const filterId = getFilterId(eventToMicroFilter(event)); const filterId = getFilterId(eventToMicroFilter(event));
this.#queue = this.#queue.filter(([id]) => id !== filterId); this.#queue = this.#queue.filter(([id]) => id !== filterId);
this.emit(filterId, event); this.emit(filterId, event);