Reqmeister: improve API and fetching logic (untested)

This commit is contained in:
Alex Gleason 2024-05-03 13:33:50 -05:00
parent 6b20104327
commit e9c5ef89ff
No known key found for this signature in database
GPG Key ID: 7211D1F99744FBB7
2 changed files with 17 additions and 16 deletions

View File

@ -59,7 +59,7 @@ async function handleEvent(event: DittoEvent, signal: AbortSignal): Promise<void
processDeletions(event, signal), processDeletions(event, signal),
DVM.event(event), DVM.event(event),
trackHashtags(event), trackHashtags(event),
fetchRelatedEvents(event, signal), fetchRelatedEvents(event),
processMedia(event), processMedia(event),
payZap(event, signal), payZap(event, signal),
streamOut(event), streamOut(event),
@ -182,19 +182,21 @@ async function trackHashtags(event: NostrEvent): Promise<void> {
} }
/** Queue related events to fetch. */ /** Queue related events to fetch. */
async function fetchRelatedEvents(event: DittoEvent, signal: AbortSignal) { async function fetchRelatedEvents(event: DittoEvent) {
if (!event.author) { if (!event.author) {
Storages.reqmeister.req({ kinds: [0], authors: [event.pubkey] }, { signal }) const signal = AbortSignal.timeout(3000);
.then((event) => handleEvent(event, AbortSignal.timeout(1000))) Storages.reqmeister.query([{ kinds: [0], authors: [event.pubkey] }], { signal })
.then((events) => events.forEach((event) => handleEvent(event, signal)))
.catch(() => {}); .catch(() => {});
} }
for (const [name, id, relay] of event.tags) { for (const [name, id] of event.tags) {
if (name === 'e') { if (name === 'e') {
const { count } = await Storages.cache.count([{ ids: [id] }]); const { count } = await Storages.cache.count([{ ids: [id] }]);
if (!count) { if (!count) {
Storages.reqmeister.req({ ids: [id] }, { relays: [relay] }) const signal = AbortSignal.timeout(3000);
.then((event) => handleEvent(event, AbortSignal.timeout(1000))) Storages.reqmeister.query([{ ids: [id] }], { signal })
.then((events) => events.forEach((event) => handleEvent(event, signal)))
.catch(() => {}); .catch(() => {});
} }
} }

View File

@ -82,7 +82,7 @@ class Reqmeister extends EventEmitter<{ [filterId: string]: (event: NostrEvent)
this.#perform(); this.#perform();
} }
req(filter: MicroFilter, opts: ReqmeisterReqOpts = {}): Promise<NostrEvent> { private fetch(filter: MicroFilter, opts: ReqmeisterReqOpts = {}): Promise<NostrEvent> {
const { const {
relays = [], relays = [],
signal = AbortSignal.timeout(this.#opts.timeout ?? 1000), signal = AbortSignal.timeout(this.#opts.timeout ?? 1000),
@ -120,12 +120,7 @@ class Reqmeister extends EventEmitter<{ [filterId: string]: (event: NostrEvent)
return Promise.resolve(); return Promise.resolve();
} }
isWanted(event: NostrEvent): boolean { async query(filters: NostrFilter[], opts?: { signal?: AbortSignal }): Promise<NostrEvent[]> {
const filterId = getFilterId(eventToMicroFilter(event));
return this.#queue.some(([id]) => id === filterId);
}
query(filters: NostrFilter[], opts?: { signal?: AbortSignal }): Promise<NostrEvent[]> {
if (opts?.signal?.aborted) return Promise.reject(abortError()); if (opts?.signal?.aborted) return Promise.reject(abortError());
this.#debug('REQ', JSON.stringify(filters)); this.#debug('REQ', JSON.stringify(filters));
@ -133,12 +128,16 @@ class Reqmeister extends EventEmitter<{ [filterId: string]: (event: NostrEvent)
const promises = filters.reduce<Promise<NostrEvent>[]>((result, filter) => { const promises = filters.reduce<Promise<NostrEvent>[]>((result, filter) => {
if (isMicrofilter(filter)) { if (isMicrofilter(filter)) {
result.push(this.req(filter, opts)); result.push(this.fetch(filter, opts));
} }
return result; return result;
}, []); }, []);
return Promise.all(promises); const results = await Promise.allSettled(promises);
return results
.filter((result): result is PromiseFulfilledResult<NostrEvent> => result.status === 'fulfilled')
.map((result) => result.value);
} }
} }