From 705e8e7c312a7e1041697447af3527e22f8e899d Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Fri, 3 May 2024 13:23:00 -0500 Subject: [PATCH] PoolStore: implement NRelay --- deno.json | 2 +- src/pipeline.ts | 10 ++- src/storages.ts | 2 - src/storages/pool-store.ts | 124 ++++++++++++++++++++----------------- 4 files changed, 75 insertions(+), 63 deletions(-) diff --git a/deno.json b/deno.json index b4e834a..6d611c0 100644 --- a/deno.json +++ b/deno.json @@ -19,7 +19,7 @@ "@db/sqlite": "jsr:@db/sqlite@^0.11.1", "@isaacs/ttlcache": "npm:@isaacs/ttlcache@^1.4.1", "@noble/secp256k1": "npm:@noble/secp256k1@^2.0.0", - "@nostrify/nostrify": "jsr:@nostrify/nostrify@^0.15.0", + "@nostrify/nostrify": "jsr:@nostrify/nostrify@^0.17.1", "@sentry/deno": "https://deno.land/x/sentry@7.112.2/index.mjs", "@soapbox/kysely-deno-sqlite": "jsr:@soapbox/kysely-deno-sqlite@^2.1.0", "@soapbox/stickynotes": "jsr:@soapbox/stickynotes@^0.4.0", diff --git a/src/pipeline.ts b/src/pipeline.ts index b193617..a3420fe 100644 --- a/src/pipeline.ts +++ b/src/pipeline.ts @@ -183,15 +183,19 @@ async function trackHashtags(event: NostrEvent): Promise { /** Queue related events to fetch. */ async function fetchRelatedEvents(event: DittoEvent, signal: AbortSignal) { - if (!event.user) { - Storages.reqmeister.req({ kinds: [0], authors: [event.pubkey] }, { signal }).catch(() => {}); + if (!event.author) { + Storages.reqmeister.req({ kinds: [0], authors: [event.pubkey] }, { signal }) + .then((event) => handleEvent(event, AbortSignal.timeout(1000))) + .catch(() => {}); } for (const [name, id, relay] of event.tags) { if (name === 'e') { const { count } = await Storages.cache.count([{ ids: [id] }]); if (!count) { - Storages.reqmeister.req({ ids: [id] }, { relays: [relay] }).catch(() => {}); + Storages.reqmeister.req({ ids: [id] }, { relays: [relay] }) + .then((event) => handleEvent(event, AbortSignal.timeout(1000))) + .catch(() => {}); } } } diff --git a/src/storages.ts b/src/storages.ts index 21fe9d5..a51cbd1 100644 --- a/src/storages.ts +++ b/src/storages.ts @@ -1,7 +1,6 @@ import { NCache } from '@nostrify/nostrify'; import { Conf } from '@/config.ts'; import { db } from '@/db.ts'; -import * as pipeline from '@/pipeline.ts'; import { activeRelays, pool } from '@/pool.ts'; import { EventsDB } from '@/storages/events-db.ts'; import { Optimizer } from '@/storages/optimizer.ts'; @@ -52,7 +51,6 @@ export class Storages { this._client = new PoolStore({ pool, relays: activeRelays, - publisher: pipeline, }); } return this._client; diff --git a/src/storages/pool-store.ts b/src/storages/pool-store.ts index 953720b..73a10e2 100644 --- a/src/storages/pool-store.ts +++ b/src/storages/pool-store.ts @@ -1,7 +1,16 @@ -import { NostrEvent, NostrFilter, NSet, NStore } from '@nostrify/nostrify'; +import { + NostrEvent, + NostrFilter, + NostrRelayCLOSED, + NostrRelayEOSE, + NostrRelayEVENT, + NRelay, + NSet, +} from '@nostrify/nostrify'; +import { Machina } from '@nostrify/nostrify/utils'; import Debug from '@soapbox/stickynotes/debug'; import { RelayPoolWorker } from 'nostr-relaypool'; -import { matchFilters } from 'nostr-tools'; +import { getFilterLimit, matchFilters } from 'nostr-tools'; import { normalizeFilters } from '@/filter.ts'; import { purifyEvent } from '@/storages/hydrate.ts'; @@ -12,23 +21,16 @@ import { Conf } from '@/config.ts'; interface PoolStoreOpts { pool: InstanceType; relays: WebSocket['url'][]; - publisher: { - handleEvent(event: NostrEvent, signal: AbortSignal): Promise; - }; } -class PoolStore implements NStore { - #debug = Debug('ditto:client'); - #pool: InstanceType; - #relays: WebSocket['url'][]; - #publisher: { - handleEvent(event: NostrEvent, signal: AbortSignal): Promise; - }; +class PoolStore implements NRelay { + private debug = Debug('ditto:client'); + private pool: InstanceType; + private relays: WebSocket['url'][]; constructor(opts: PoolStoreOpts) { - this.#pool = opts.pool; - this.#relays = opts.relays; - this.#publisher = opts.publisher; + this.pool = opts.pool; + this.relays = opts.relays; } async event(event: NostrEvent, opts: { signal?: AbortSignal } = {}): Promise { @@ -40,58 +42,66 @@ class PoolStore implements NStore { const relays = [...relaySet].slice(0, 4); event = purifyEvent(event); - this.#debug('EVENT', event, relays); + this.debug('EVENT', event, relays); - this.#pool.publish(event, relays); + this.pool.publish(event, relays); return Promise.resolve(); } - query(filters: NostrFilter[], opts: { signal?: AbortSignal; limit?: number } = {}): Promise { - if (opts.signal?.aborted) return Promise.reject(abortError()); + async *req( + filters: NostrFilter[], + opts: { signal?: AbortSignal; limit?: number } = {}, + ): AsyncIterable { + if (opts.signal?.aborted) return; filters = normalizeFilters(filters); - this.#debug('REQ', JSON.stringify(filters)); - if (!filters.length) return Promise.resolve([]); + if (!filters.length) return; - return new Promise((resolve, reject) => { - const results = new NSet(); + this.debug('REQ', JSON.stringify(filters)); - const unsub = this.#pool.subscribe( - filters, - this.#relays, - (event: NostrEvent | null) => { - if (event && matchFilters(filters, event)) { - this.#publisher.handleEvent(event, AbortSignal.timeout(1000)).catch(() => {}); - results.add({ - id: event.id, - kind: event.kind, - pubkey: event.pubkey, - content: event.content, - tags: event.tags, - created_at: event.created_at, - sig: event.sig, - }); - } - if (typeof opts.limit === 'number' && results.size >= opts.limit) { - unsub(); - resolve([...results]); - } - }, - undefined, - () => { - unsub(); - resolve([...results]); - }, - ); + const uuid = crypto.randomUUID(); + const machina = new Machina(opts.signal); - const onAbort = () => { - unsub(); - reject(abortError()); - opts.signal?.removeEventListener('abort', onAbort); - }; + const unsub = this.pool.subscribe( + filters, + this.relays, + (event: NostrEvent | null) => { + if (event && matchFilters(filters, event)) { + machina.push(['EVENT', uuid, purifyEvent(event)]); + } + }, + undefined, + () => { + machina.push(['EOSE', uuid]); + }, + ); - opts.signal?.addEventListener('abort', onAbort); - }); + try { + for await (const msg of machina) { + yield msg; + } + } finally { + unsub(); + } + } + + async query(filters: NostrFilter[], opts: { signal?: AbortSignal; limit?: number } = {}): Promise { + const events = new NSet(); + + const limit = filters.reduce((result, filter) => result + getFilterLimit(filter), 0); + if (limit === 0) return []; + + for await (const msg of this.req(filters, opts)) { + if (msg[0] === 'EOSE') break; + if (msg[0] === 'EVENT') events.add(msg[2]); + if (msg[0] === 'CLOSED') throw new Error('Subscription closed'); + + if (events.size >= limit) { + break; + } + } + + return [...events]; } }