From ccfdbfeb8db82ae9ed533d7dda44d306dcc2bfb9 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Fri, 29 Dec 2023 13:22:51 -0600 Subject: [PATCH] Rework client as EventStore --- src/client.ts | 22 ++++++++++++++++++---- src/db/events.ts | 15 +++++++-------- src/pipeline.ts | 6 +++--- src/pool.ts | 12 ++---------- src/reqmeister.ts | 4 ++-- src/store.ts | 12 ++++++++++-- 6 files changed, 42 insertions(+), 29 deletions(-) diff --git a/src/client.ts b/src/client.ts index 85e430a..fbcfaa5 100644 --- a/src/client.ts +++ b/src/client.ts @@ -1,13 +1,12 @@ import { Debug, type Event, type Filter, matchFilters } from '@/deps.ts'; import * as pipeline from '@/pipeline.ts'; import { activeRelays, pool } from '@/pool.ts'; - -import type { GetFiltersOpts } from '@/filter.ts'; +import { type EventStore, type GetEventsOpts, type StoreEventOpts } from '@/store.ts'; const debug = Debug('ditto:client'); /** Get events from a NIP-01 filter. */ -function getFilters(filters: Filter[], opts: GetFiltersOpts = {}): Promise[]> { +function getEvents(filters: Filter[], opts: GetEventsOpts = {}): Promise[]> { if (opts.signal?.aborted) return Promise.resolve([]); if (!filters.length) return Promise.resolve([]); debug('REQ', JSON.stringify(filters)); @@ -50,4 +49,19 @@ function getFilters(filters: Filter[], opts: GetFiltersOpts }); } -export { getFilters }; +/** Publish an event to the given relays, or the entire pool. */ +function storeEvent(event: Event, opts: StoreEventOpts = {}): Promise { + const { relays = activeRelays } = opts; + debug('EVENT', event); + pool.publish(event, relays); + return Promise.resolve(); +} + +const client: EventStore = { + getEvents, + storeEvent, + countEvents: () => Promise.reject(new Error('COUNT not implemented')), + deleteEvents: () => Promise.reject(new Error('Cannot delete events from relays. Create a kind 5 event instead.')), +}; + +export { client }; diff --git a/src/db/events.ts b/src/db/events.ts index fa051eb..d977368 100644 --- a/src/db/events.ts +++ b/src/db/events.ts @@ -3,8 +3,7 @@ import { Debug, type Event, type SelectQueryBuilder } from '@/deps.ts'; import { type DittoFilter } from '@/filter.ts'; import { isParameterizedReplaceableKind } from '@/kinds.ts'; import { jsonMetaContentSchema } from '@/schemas/nostr.ts'; -import { type DittoEvent, EventStore, type GetEventsOpts } from '@/store.ts'; -import { EventData } from '@/types.ts'; +import { type DittoEvent, EventStore, type GetEventsOpts, type StoreEventOpts } from '@/store.ts'; import { isNostrId, isURL } from '@/utils.ts'; const debug = Debug('ditto:db:events'); @@ -12,7 +11,7 @@ const debug = Debug('ditto:db:events'); /** Function to decide whether or not to index a tag. */ type TagCondition = ({ event, count, value }: { event: Event; - data: EventData; + opts: StoreEventOpts; count: number; value: string; }) => boolean; @@ -21,7 +20,7 @@ type TagCondition = ({ event, count, value }: { const tagConditions: Record = { 'd': ({ event, count }) => count === 0 && isParameterizedReplaceableKind(event.kind), 'e': ({ count, value }) => count < 15 && isNostrId(value), - 'media': ({ count, value, data }) => (data.user || count < 4) && isURL(value), + 'media': ({ count, value, opts }) => (opts.data?.user || count < 4) && isURL(value), 'p': ({ event, count, value }) => (count < 15 || event.kind === 3) && isNostrId(value), 'proxy': ({ count, value }) => count === 0 && isURL(value), 'q': ({ event, count, value }) => count === 0 && event.kind === 1 && isNostrId(value), @@ -29,7 +28,7 @@ const tagConditions: Record = { }; /** Insert an event (and its tags) into the database. */ -function storeEvent(event: Event, data: EventData): Promise { +function storeEvent(event: Event, opts: StoreEventOpts = {}): Promise { debug('EVENT', JSON.stringify(event)); return db.transaction().execute(async (trx) => { @@ -51,7 +50,7 @@ function storeEvent(event: Event, data: EventData): Promise { /** Index event tags depending on the conditions defined above. */ async function indexTags() { - const tags = filterIndexableTags(event, data); + const tags = filterIndexableTags(event, opts); const rows = tags.map(([tag, value]) => ({ event_id: event.id, tag, value })); if (!tags.length) return; @@ -302,7 +301,7 @@ async function countEvents(filters: DittoFilter[]): Promise } /** Return only the tags that should be indexed. */ -function filterIndexableTags(event: Event, data: EventData): string[][] { +function filterIndexableTags(event: Event, opts: StoreEventOpts): string[][] { const tagCounts: Record = {}; function getCount(name: string) { @@ -316,7 +315,7 @@ function filterIndexableTags(event: Event, data: EventData): string[][] { function checkCondition(name: string, value: string, condition: TagCondition) { return condition({ event, - data, + opts, count: getCount(name), value, }); diff --git a/src/pipeline.ts b/src/pipeline.ts index 2ad3c97..76984fd 100644 --- a/src/pipeline.ts +++ b/src/pipeline.ts @@ -1,3 +1,4 @@ +import { client } from '@/client.ts'; import { Conf } from '@/config.ts'; import { eventsDB } from '@/db/events.ts'; import { memorelay } from '@/db/memorelay.ts'; @@ -6,7 +7,6 @@ import { deleteAttachedMedia } from '@/db/unattached-media.ts'; import { findUser } from '@/db/users.ts'; import { Debug, type Event } from '@/deps.ts'; import { isEphemeralKind } from '@/kinds.ts'; -import { publish } from '@/pool.ts'; import { isLocallyFollowed } from '@/queries.ts'; import { reqmeister } from '@/reqmeister.ts'; import { updateStats } from '@/stats.ts'; @@ -78,7 +78,7 @@ async function storeEvent(event: Event, data: EventData, opts: StoreEventOpts = return Promise.reject(new RelayError('blocked', 'event was deleted')); } else { await Promise.all([ - eventsDB.storeEvent(event, data).catch(debug), + eventsDB.storeEvent(event, { data }).catch(debug), updateStats(event).catch(debug), ]); } @@ -176,7 +176,7 @@ function broadcast(event: Event, data: EventData) { if (!data.user || !isFresh(event)) return; if (event.kind === 5) { - publish(event); + client.storeEvent(event); } } diff --git a/src/pool.ts b/src/pool.ts index b92f4eb..e3b3082 100644 --- a/src/pool.ts +++ b/src/pool.ts @@ -1,7 +1,5 @@ import { getActiveRelays } from '@/db/relays.ts'; -import { Debug, type Event, RelayPoolWorker } from '@/deps.ts'; - -const debug = Debug('ditto:pool'); +import { RelayPoolWorker } from '@/deps.ts'; const activeRelays = await getActiveRelays(); @@ -17,10 +15,4 @@ const pool = new RelayPoolWorker(worker, activeRelays, { logErrorsAndNotices: false, }); -/** Publish an event to the given relays, or the entire pool. */ -function publish(event: Event, relays: string[] = activeRelays) { - debug('publish', event); - return pool.publish(event, relays); -} - -export { activeRelays, pool, publish }; +export { activeRelays, pool }; diff --git a/src/reqmeister.ts b/src/reqmeister.ts index cff8cb9..19cce8d 100644 --- a/src/reqmeister.ts +++ b/src/reqmeister.ts @@ -1,4 +1,4 @@ -import * as client from '@/client.ts'; +import { client } from '@/client.ts'; import { Debug, type Event, EventEmitter, type Filter } from '@/deps.ts'; import { AuthorMicrofilter, eventToMicroFilter, getFilterId, IdMicrofilter, type MicroFilter } from '@/filter.ts'; import { Time } from '@/utils/time.ts'; @@ -64,7 +64,7 @@ class Reqmeister extends EventEmitter<{ [filterId: string]: (event: Event) => an if (filters.length) { debug('REQ', JSON.stringify(filters)); - const events = await client.getFilters(filters, { signal: AbortSignal.timeout(timeout) }); + const events = await client.getEvents(filters, { signal: AbortSignal.timeout(timeout) }); for (const event of events) { this.encounter(event); diff --git a/src/store.ts b/src/store.ts index a41f849..5567406 100644 --- a/src/store.ts +++ b/src/store.ts @@ -13,6 +13,14 @@ interface GetEventsOpts { relays?: WebSocket['url'][]; } +/** Options when storing an event. */ +interface StoreEventOpts { + /** Event data to store. */ + data?: EventData; + /** Relays to use, if applicable. */ + relays?: WebSocket['url'][]; +} + type AuthorStats = Omit; type EventStats = Omit; @@ -26,7 +34,7 @@ interface DittoEvent extends Event { /** Storage interface for Nostr events. */ interface EventStore { /** Add an event to the store. */ - storeEvent(event: Event, data?: EventData): Promise; + storeEvent(event: Event, opts?: StoreEventOpts): Promise; /** Get events from filters. */ getEvents(filters: DittoFilter[], opts?: GetEventsOpts): Promise[]>; /** Get the number of events from filters. */ @@ -35,4 +43,4 @@ interface EventStore { deleteEvents(filters: DittoFilter[]): Promise; } -export type { DittoEvent, EventStore, GetEventsOpts }; +export type { DittoEvent, EventStore, GetEventsOpts, StoreEventOpts };