diff --git a/deno.json b/deno.json index b4e834a..6d611c0 100644 --- a/deno.json +++ b/deno.json @@ -19,7 +19,7 @@ "@db/sqlite": "jsr:@db/sqlite@^0.11.1", "@isaacs/ttlcache": "npm:@isaacs/ttlcache@^1.4.1", "@noble/secp256k1": "npm:@noble/secp256k1@^2.0.0", - "@nostrify/nostrify": "jsr:@nostrify/nostrify@^0.15.0", + "@nostrify/nostrify": "jsr:@nostrify/nostrify@^0.17.1", "@sentry/deno": "https://deno.land/x/sentry@7.112.2/index.mjs", "@soapbox/kysely-deno-sqlite": "jsr:@soapbox/kysely-deno-sqlite@^2.1.0", "@soapbox/stickynotes": "jsr:@soapbox/stickynotes@^0.4.0", diff --git a/src/filter.ts b/src/filter.ts index 59b0298..fd698c4 100644 --- a/src/filter.ts +++ b/src/filter.ts @@ -1,9 +1,8 @@ import { NostrEvent, NostrFilter, NSchema as n } from '@nostrify/nostrify'; import stringifyStable from 'fast-stable-stringify'; +import { getFilterLimit } from 'nostr-tools'; import { z } from 'zod'; -import { isReplaceableKind } from '@/kinds.ts'; - /** Microfilter to get one specific event by ID. */ type IdMicrofilter = { ids: [NostrEvent['id']] }; /** Microfilter to get an author. */ @@ -50,22 +49,6 @@ function isMicrofilter(filter: NostrFilter): filter is MicroFilter { return microFilterSchema.safeParse(filter).success; } -/** Calculate the intrinsic limit of a filter. */ -function getFilterLimit(filter: NostrFilter): number { - if (filter.ids && !filter.ids.length) return 0; - if (filter.kinds && !filter.kinds.length) return 0; - if (filter.authors && !filter.authors.length) return 0; - - return Math.min( - Math.max(0, filter.limit ?? Infinity), - filter.ids?.length ?? Infinity, - filter.authors?.length && - filter.kinds?.every((kind) => isReplaceableKind(kind)) - ? filter.authors.length * filter.kinds.length - : Infinity, - ); -} - /** Returns true if the filter could potentially return any stored events at all. */ function canFilter(filter: NostrFilter): boolean { return getFilterLimit(filter) > 0; diff --git a/src/pipeline.ts b/src/pipeline.ts index b193617..f28b886 100644 --- a/src/pipeline.ts +++ b/src/pipeline.ts @@ -59,7 +59,7 @@ async function handleEvent(event: DittoEvent, signal: AbortSignal): Promise { } /** Queue related events to fetch. */ -async function fetchRelatedEvents(event: DittoEvent, signal: AbortSignal) { - if (!event.user) { - Storages.reqmeister.req({ kinds: [0], authors: [event.pubkey] }, { signal }).catch(() => {}); +async function fetchRelatedEvents(event: DittoEvent) { + if (!event.author) { + const signal = AbortSignal.timeout(3000); + Storages.reqmeister.query([{ kinds: [0], authors: [event.pubkey] }], { signal }) + .then((events) => events.forEach((event) => handleEvent(event, signal))) + .catch(() => {}); } - for (const [name, id, relay] of event.tags) { + for (const [name, id] of event.tags) { if (name === 'e') { const { count } = await Storages.cache.count([{ ids: [id] }]); if (!count) { - Storages.reqmeister.req({ ids: [id] }, { relays: [relay] }).catch(() => {}); + const signal = AbortSignal.timeout(3000); + Storages.reqmeister.query([{ ids: [id] }], { signal }) + .then((events) => events.forEach((event) => handleEvent(event, signal))) + .catch(() => {}); } } } diff --git a/src/storages.ts b/src/storages.ts index 21fe9d5..a51cbd1 100644 --- a/src/storages.ts +++ b/src/storages.ts @@ -1,7 +1,6 @@ import { NCache } from '@nostrify/nostrify'; import { Conf } from '@/config.ts'; import { db } from '@/db.ts'; -import * as pipeline from '@/pipeline.ts'; import { activeRelays, pool } from '@/pool.ts'; import { EventsDB } from '@/storages/events-db.ts'; import { Optimizer } from '@/storages/optimizer.ts'; @@ -52,7 +51,6 @@ export class Storages { this._client = new PoolStore({ pool, relays: activeRelays, - publisher: pipeline, }); } return this._client; diff --git a/src/storages/pool-store.ts b/src/storages/pool-store.ts index 953720b..9f45205 100644 --- a/src/storages/pool-store.ts +++ b/src/storages/pool-store.ts @@ -1,34 +1,35 @@ -import { NostrEvent, NostrFilter, NSet, NStore } from '@nostrify/nostrify'; +import { + NostrEvent, + NostrFilter, + NostrRelayCLOSED, + NostrRelayEOSE, + NostrRelayEVENT, + NRelay, + NSet, +} from '@nostrify/nostrify'; +import { Machina } from '@nostrify/nostrify/utils'; import Debug from '@soapbox/stickynotes/debug'; import { RelayPoolWorker } from 'nostr-relaypool'; -import { matchFilters } from 'nostr-tools'; +import { getFilterLimit, matchFilters } from 'nostr-tools'; -import { normalizeFilters } from '@/filter.ts'; +import { Conf } from '@/config.ts'; import { purifyEvent } from '@/storages/hydrate.ts'; import { abortError } from '@/utils/abort.ts'; import { getRelays } from '@/utils/outbox.ts'; -import { Conf } from '@/config.ts'; interface PoolStoreOpts { pool: InstanceType; relays: WebSocket['url'][]; - publisher: { - handleEvent(event: NostrEvent, signal: AbortSignal): Promise; - }; } -class PoolStore implements NStore { - #debug = Debug('ditto:client'); - #pool: InstanceType; - #relays: WebSocket['url'][]; - #publisher: { - handleEvent(event: NostrEvent, signal: AbortSignal): Promise; - }; +class PoolStore implements NRelay { + private debug = Debug('ditto:client'); + private pool: InstanceType; + private relays: WebSocket['url'][]; constructor(opts: PoolStoreOpts) { - this.#pool = opts.pool; - this.#relays = opts.relays; - this.#publisher = opts.publisher; + this.pool = opts.pool; + this.relays = opts.relays; } async event(event: NostrEvent, opts: { signal?: AbortSignal } = {}): Promise { @@ -40,58 +41,61 @@ class PoolStore implements NStore { const relays = [...relaySet].slice(0, 4); event = purifyEvent(event); - this.#debug('EVENT', event, relays); + this.debug('EVENT', event, relays); - this.#pool.publish(event, relays); + this.pool.publish(event, relays); return Promise.resolve(); } - query(filters: NostrFilter[], opts: { signal?: AbortSignal; limit?: number } = {}): Promise { - if (opts.signal?.aborted) return Promise.reject(abortError()); + async *req( + filters: NostrFilter[], + opts: { signal?: AbortSignal; limit?: number } = {}, + ): AsyncIterable { + this.debug('REQ', JSON.stringify(filters)); - filters = normalizeFilters(filters); - this.#debug('REQ', JSON.stringify(filters)); - if (!filters.length) return Promise.resolve([]); + const uuid = crypto.randomUUID(); + const machina = new Machina(opts.signal); - return new Promise((resolve, reject) => { - const results = new NSet(); + const unsub = this.pool.subscribe( + filters, + this.relays, + (event: NostrEvent | null) => { + if (event && matchFilters(filters, event)) { + machina.push(['EVENT', uuid, purifyEvent(event)]); + } + }, + undefined, + () => { + machina.push(['EOSE', uuid]); + }, + ); - const unsub = this.#pool.subscribe( - filters, - this.#relays, - (event: NostrEvent | null) => { - if (event && matchFilters(filters, event)) { - this.#publisher.handleEvent(event, AbortSignal.timeout(1000)).catch(() => {}); - results.add({ - id: event.id, - kind: event.kind, - pubkey: event.pubkey, - content: event.content, - tags: event.tags, - created_at: event.created_at, - sig: event.sig, - }); - } - if (typeof opts.limit === 'number' && results.size >= opts.limit) { - unsub(); - resolve([...results]); - } - }, - undefined, - () => { - unsub(); - resolve([...results]); - }, - ); + try { + for await (const msg of machina) { + yield msg; + } + } finally { + unsub(); + } + } - const onAbort = () => { - unsub(); - reject(abortError()); - opts.signal?.removeEventListener('abort', onAbort); - }; + async query(filters: NostrFilter[], opts: { signal?: AbortSignal; limit?: number } = {}): Promise { + const events = new NSet(); - opts.signal?.addEventListener('abort', onAbort); - }); + const limit = filters.reduce((result, filter) => result + getFilterLimit(filter), 0); + if (limit === 0) return []; + + for await (const msg of this.req(filters, opts)) { + if (msg[0] === 'EOSE') break; + if (msg[0] === 'EVENT') events.add(msg[2]); + if (msg[0] === 'CLOSED') throw new Error('Subscription closed'); + + if (events.size >= limit) { + break; + } + } + + return [...events]; } } diff --git a/src/storages/reqmeister.ts b/src/storages/reqmeister.ts index eede200..e3833d3 100644 --- a/src/storages/reqmeister.ts +++ b/src/storages/reqmeister.ts @@ -82,7 +82,7 @@ class Reqmeister extends EventEmitter<{ [filterId: string]: (event: NostrEvent) this.#perform(); } - req(filter: MicroFilter, opts: ReqmeisterReqOpts = {}): Promise { + private fetch(filter: MicroFilter, opts: ReqmeisterReqOpts = {}): Promise { const { relays = [], signal = AbortSignal.timeout(this.#opts.timeout ?? 1000), @@ -120,12 +120,7 @@ class Reqmeister extends EventEmitter<{ [filterId: string]: (event: NostrEvent) return Promise.resolve(); } - isWanted(event: NostrEvent): boolean { - const filterId = getFilterId(eventToMicroFilter(event)); - return this.#queue.some(([id]) => id === filterId); - } - - query(filters: NostrFilter[], opts?: { signal?: AbortSignal }): Promise { + async query(filters: NostrFilter[], opts?: { signal?: AbortSignal }): Promise { if (opts?.signal?.aborted) return Promise.reject(abortError()); this.#debug('REQ', JSON.stringify(filters)); @@ -133,12 +128,16 @@ class Reqmeister extends EventEmitter<{ [filterId: string]: (event: NostrEvent) const promises = filters.reduce[]>((result, filter) => { if (isMicrofilter(filter)) { - result.push(this.req(filter, opts)); + result.push(this.fetch(filter, opts)); } return result; }, []); - return Promise.all(promises); + const results = await Promise.allSettled(promises); + + return results + .filter((result): result is PromiseFulfilledResult => result.status === 'fulfilled') + .map((result) => result.value); } }