diff --git a/src/client.ts b/src/client.ts deleted file mode 100644 index 34d4544..0000000 --- a/src/client.ts +++ /dev/null @@ -1,74 +0,0 @@ -import { Debug, type Event, type Filter, matchFilters } from '@/deps.ts'; -import { normalizeFilters } from '@/filter.ts'; -import * as pipeline from '@/pipeline.ts'; -import { activeRelays, pool } from '@/pool.ts'; -import { type EventStore, type GetEventsOpts, type StoreEventOpts } from '@/storages/types.ts'; -import { EventSet } from '@/utils/event-set.ts'; - -const debug = Debug('ditto:client'); - -/** Get events from a NIP-01 filter. */ -function getEvents(filters: Filter[], opts: GetEventsOpts = {}): Promise[]> { - filters = normalizeFilters(filters); - - if (opts.signal?.aborted) return Promise.resolve([]); - if (!filters.length) return Promise.resolve([]); - - debug('REQ', JSON.stringify(filters)); - - return new Promise((resolve) => { - const results = new EventSet>(); - - const unsub = pool.subscribe( - filters, - opts.relays ?? activeRelays, - (event: Event | null) => { - if (event && matchFilters(filters, event)) { - pipeline.handleEvent(event).catch(() => {}); - results.add({ - id: event.id, - kind: event.kind as K, - pubkey: event.pubkey, - content: event.content, - tags: event.tags, - created_at: event.created_at, - sig: event.sig, - }); - } - if (typeof opts.limit === 'number' && results.size >= opts.limit) { - unsub(); - resolve([...results]); - } - }, - undefined, - () => { - unsub(); - resolve([...results]); - }, - ); - - opts.signal?.addEventListener('abort', () => { - unsub(); - resolve([...results]); - }); - }); -} - -/** Publish an event to the given relays, or the entire pool. */ -function storeEvent(event: Event, opts: StoreEventOpts = {}): Promise { - const { relays = activeRelays } = opts; - const debug = Debug('ditto:client:publish'); - debug('EVENT', event); - pool.publish(event, relays); - return Promise.resolve(); -} - -const client: EventStore = { - supportedNips: [1], - getEvents, - storeEvent, - countEvents: () => Promise.reject(new Error('COUNT not implemented')), - deleteEvents: () => Promise.reject(new Error('Cannot delete events from relays. Create a kind 5 event instead.')), -}; - -export { client }; diff --git a/src/pipeline.ts b/src/pipeline.ts index 4237652..de0d310 100644 --- a/src/pipeline.ts +++ b/src/pipeline.ts @@ -1,4 +1,3 @@ -import { client } from '@/client.ts'; import { Conf } from '@/config.ts'; import { addRelays } from '@/db/relays.ts'; import { deleteAttachedMedia } from '@/db/unattached-media.ts'; @@ -7,7 +6,7 @@ import { Debug, type Event } from '@/deps.ts'; import { isEphemeralKind } from '@/kinds.ts'; import { isLocallyFollowed } from '@/queries.ts'; import { updateStats } from '@/stats.ts'; -import { eventsDB, memorelay, reqmeister } from '@/storages.ts'; +import { client, eventsDB, memorelay, reqmeister } from '@/storages.ts'; import { Sub } from '@/subs.ts'; import { getTagSet } from '@/tags.ts'; import { type EventData } from '@/types.ts'; diff --git a/src/storages.ts b/src/storages.ts index 5cc4ecb..0388518 100644 --- a/src/storages.ts +++ b/src/storages.ts @@ -1,13 +1,22 @@ -import { client } from '@/client.ts'; import { Conf } from '@/config.ts'; import { db } from '@/db.ts'; +import * as pipeline from '@/pipeline.ts'; +import { activeRelays, pool } from '@/pool.ts'; import { EventsDB } from '@/storages/events-db.ts'; import { Memorelay } from '@/storages/memorelay.ts'; import { Optimizer } from '@/storages/optimizer.ts'; +import { PoolStore } from '@/storages/pool-store.ts'; import { Reqmeister } from '@/storages/reqmeister.ts'; import { SearchStore } from '@/storages/search-store.ts'; import { Time } from '@/utils/time.ts'; +/** Relay pool storage. */ +const client = new PoolStore({ + pool, + relays: activeRelays, + publisher: pipeline, +}); + /** SQLite database to store events this Ditto server cares about. */ const eventsDB = new EventsDB(db); @@ -34,4 +43,4 @@ const searchStore = new SearchStore({ fallback: optimizer, }); -export { eventsDB, memorelay, optimizer, reqmeister, searchStore }; +export { client, eventsDB, memorelay, optimizer, reqmeister, searchStore }; diff --git a/src/storages/pool-store.ts b/src/storages/pool-store.ts new file mode 100644 index 0000000..eddeeb8 --- /dev/null +++ b/src/storages/pool-store.ts @@ -0,0 +1,92 @@ +import { Debug, type Event, type Filter, matchFilters, type RelayPoolWorker } from '@/deps.ts'; +import { normalizeFilters } from '@/filter.ts'; +import { type EventStore, type GetEventsOpts, type StoreEventOpts } from '@/storages/types.ts'; +import { EventSet } from '@/utils/event-set.ts'; + +interface PoolStoreOpts { + pool: InstanceType; + relays: WebSocket['url'][]; + publisher: { + handleEvent(event: Event): Promise; + }; +} + +class PoolStore implements EventStore { + #debug = Debug('ditto:client'); + #pool: InstanceType; + #relays: WebSocket['url'][]; + #publisher: { + handleEvent(event: Event): Promise; + }; + + supportedNips = [1]; + + constructor(opts: PoolStoreOpts) { + this.#pool = opts.pool; + this.#relays = opts.relays; + this.#publisher = opts.publisher; + } + + storeEvent(event: Event, opts: StoreEventOpts = {}): Promise { + const { relays = this.#relays } = opts; + this.#debug('EVENT', event); + this.#pool.publish(event, relays); + return Promise.resolve(); + } + + getEvents(filters: Filter[], opts: GetEventsOpts = {}): Promise[]> { + filters = normalizeFilters(filters); + + if (opts.signal?.aborted) return Promise.resolve([]); + if (!filters.length) return Promise.resolve([]); + + this.#debug('REQ', JSON.stringify(filters)); + + return new Promise((resolve) => { + const results = new EventSet>(); + + const unsub = this.#pool.subscribe( + filters, + opts.relays ?? this.#relays, + (event: Event | null) => { + if (event && matchFilters(filters, event)) { + this.#publisher.handleEvent(event).catch(() => {}); + results.add({ + id: event.id, + kind: event.kind as K, + pubkey: event.pubkey, + content: event.content, + tags: event.tags, + created_at: event.created_at, + sig: event.sig, + }); + } + if (typeof opts.limit === 'number' && results.size >= opts.limit) { + unsub(); + resolve([...results]); + } + }, + undefined, + () => { + unsub(); + resolve([...results]); + }, + ); + + opts.signal?.addEventListener('abort', () => { + unsub(); + resolve([...results]); + }); + }); + } + + countEvents() { + return Promise.reject(new Error('COUNT not implemented')); + } + + deleteEvents() { + return Promise.reject(new Error('Cannot delete events from relays. Create a kind 5 event instead.')); + } +} + +export { PoolStore };