From 08da26b12adc5e130fb6f64058d50aae7c92ad70 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Thu, 28 Dec 2023 13:41:04 -0600 Subject: [PATCH] reqmeister: accept `signal` --- src/pipeline.ts | 2 +- src/queries.ts | 4 ++-- src/reqmeister.ts | 19 +++++++++++++++---- 3 files changed, 18 insertions(+), 7 deletions(-) diff --git a/src/pipeline.ts b/src/pipeline.ts index 3d065c0..577aaae 100644 --- a/src/pipeline.ts +++ b/src/pipeline.ts @@ -144,7 +144,7 @@ function fetchRelatedEvents(event: Event, data: EventData) { } for (const [name, id, relay] of event.tags) { if (name === 'e' && !memorelay.hasEventById(id)) { - reqmeister.req({ ids: [id] }, [relay]).catch(() => {}); + reqmeister.req({ ids: [id] }, { relays: [relay] }).catch(() => {}); } } } diff --git a/src/queries.ts b/src/queries.ts index 7a9a7dd..5f03b6e 100644 --- a/src/queries.ts +++ b/src/queries.ts @@ -52,7 +52,7 @@ const getEvent = async ( if (memoryEvent) return memoryEvent; - return await reqmeister.req(microfilter).catch(() => undefined) as Event | undefined; + return await reqmeister.req(microfilter, opts).catch(() => undefined) as Event | undefined; }; /** Get a Nostr `set_medatadata` event for a user's pubkey. */ @@ -74,7 +74,7 @@ const getAuthor = async (pubkey: string, opts: GetEventOpts<0> = {}): Promise undefined); + return reqmeister.req(microfilter, opts).catch(() => undefined); }; /** Get users the given pubkey follows. */ diff --git a/src/reqmeister.ts b/src/reqmeister.ts index 6019758..cff8cb9 100644 --- a/src/reqmeister.ts +++ b/src/reqmeister.ts @@ -10,6 +10,11 @@ interface ReqmeisterOpts { timeout?: number; } +interface ReqmeisterReqOpts { + relays?: WebSocket['url'][]; + signal?: AbortSignal; +} + type ReqmeisterQueueItem = [string, MicroFilter, WebSocket['url'][]]; /** Batches requests to Nostr relays using microfilters. */ @@ -70,15 +75,21 @@ class Reqmeister extends EventEmitter<{ [filterId: string]: (event: Event) => an this.#perform(); } - req(filter: IdMicrofilter, relays?: WebSocket['url'][]): Promise; - req(filter: AuthorMicrofilter, relays?: WebSocket['url'][]): Promise>; - req(filter: MicroFilter, relays?: WebSocket['url'][]): Promise; - req(filter: MicroFilter, relays: WebSocket['url'][] = []): Promise { + req(filter: IdMicrofilter, opts?: ReqmeisterReqOpts): Promise; + req(filter: AuthorMicrofilter, opts?: ReqmeisterReqOpts): Promise>; + req(filter: MicroFilter, opts?: ReqmeisterReqOpts): Promise; + req(filter: MicroFilter, opts: ReqmeisterReqOpts = {}): Promise { + const { relays = [], signal } = opts; + if (signal?.aborted) return Promise.reject(new DOMException('Aborted', 'AbortError')); + const filterId = getFilterId(filter); + this.#queue.push([filterId, filter, relays]); + return new Promise((resolve, reject) => { this.once(filterId, resolve); this.#promise.finally(() => setTimeout(reject, 0)); + signal?.addEventListener('abort', () => reject(new DOMException('Aborted', 'AbortError')), { once: true }); }); }