From e9c5ef89ff984d33312cef7861723f35d9e5222f Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Fri, 3 May 2024 13:33:50 -0500 Subject: [PATCH] Reqmeister: improve API and fetching logic (untested) --- src/pipeline.ts | 16 +++++++++------- src/storages/reqmeister.ts | 17 ++++++++--------- 2 files changed, 17 insertions(+), 16 deletions(-) diff --git a/src/pipeline.ts b/src/pipeline.ts index a3420fe..f28b886 100644 --- a/src/pipeline.ts +++ b/src/pipeline.ts @@ -59,7 +59,7 @@ async function handleEvent(event: DittoEvent, signal: AbortSignal): Promise { } /** Queue related events to fetch. */ -async function fetchRelatedEvents(event: DittoEvent, signal: AbortSignal) { +async function fetchRelatedEvents(event: DittoEvent) { if (!event.author) { - Storages.reqmeister.req({ kinds: [0], authors: [event.pubkey] }, { signal }) - .then((event) => handleEvent(event, AbortSignal.timeout(1000))) + const signal = AbortSignal.timeout(3000); + Storages.reqmeister.query([{ kinds: [0], authors: [event.pubkey] }], { signal }) + .then((events) => events.forEach((event) => handleEvent(event, signal))) .catch(() => {}); } - for (const [name, id, relay] of event.tags) { + for (const [name, id] of event.tags) { if (name === 'e') { const { count } = await Storages.cache.count([{ ids: [id] }]); if (!count) { - Storages.reqmeister.req({ ids: [id] }, { relays: [relay] }) - .then((event) => handleEvent(event, AbortSignal.timeout(1000))) + const signal = AbortSignal.timeout(3000); + Storages.reqmeister.query([{ ids: [id] }], { signal }) + .then((events) => events.forEach((event) => handleEvent(event, signal))) .catch(() => {}); } } diff --git a/src/storages/reqmeister.ts b/src/storages/reqmeister.ts index eede200..e3833d3 100644 --- a/src/storages/reqmeister.ts +++ b/src/storages/reqmeister.ts @@ -82,7 +82,7 @@ class Reqmeister extends EventEmitter<{ [filterId: string]: (event: NostrEvent) this.#perform(); } - req(filter: MicroFilter, opts: ReqmeisterReqOpts = {}): Promise { + private fetch(filter: MicroFilter, opts: ReqmeisterReqOpts = {}): Promise { const { relays = [], signal = AbortSignal.timeout(this.#opts.timeout ?? 1000), @@ -120,12 +120,7 @@ class Reqmeister extends EventEmitter<{ [filterId: string]: (event: NostrEvent) return Promise.resolve(); } - isWanted(event: NostrEvent): boolean { - const filterId = getFilterId(eventToMicroFilter(event)); - return this.#queue.some(([id]) => id === filterId); - } - - query(filters: NostrFilter[], opts?: { signal?: AbortSignal }): Promise { + async query(filters: NostrFilter[], opts?: { signal?: AbortSignal }): Promise { if (opts?.signal?.aborted) return Promise.reject(abortError()); this.#debug('REQ', JSON.stringify(filters)); @@ -133,12 +128,16 @@ class Reqmeister extends EventEmitter<{ [filterId: string]: (event: NostrEvent) const promises = filters.reduce[]>((result, filter) => { if (isMicrofilter(filter)) { - result.push(this.req(filter, opts)); + result.push(this.fetch(filter, opts)); } return result; }, []); - return Promise.all(promises); + const results = await Promise.allSettled(promises); + + return results + .filter((result): result is PromiseFulfilledResult => result.status === 'fulfilled') + .map((result) => result.value); } }