From c6062874bd73656c17af611fb17eba0239c2fb5e Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Tue, 23 Jan 2024 14:06:16 -0600 Subject: [PATCH] Implement NStore interface from NLib --- scripts/db.ts | 2 +- src/controllers/api/accounts.ts | 8 ++-- src/controllers/api/admin.ts | 4 +- src/controllers/api/blocks.ts | 2 +- src/controllers/api/bookmarks.ts | 2 +- src/controllers/api/instance.ts | 2 +- src/controllers/api/notifications.ts | 2 +- src/controllers/api/pleroma.ts | 6 +-- src/controllers/api/search.ts | 4 +- src/controllers/api/timelines.ts | 2 +- src/controllers/nostr/relay-info.ts | 2 +- src/controllers/nostr/relay.ts | 2 +- src/db/users.ts | 2 +- src/deps.ts | 4 +- src/pipeline.ts | 16 ++++---- src/queries.ts | 18 ++++----- src/stats.ts | 2 +- src/storages/events-db.test.ts | 44 ++++++++++----------- src/storages/events-db.ts | 15 +++----- src/storages/hydrate.ts | 6 +-- src/storages/memorelay.test.ts | 4 +- src/storages/memorelay.ts | 39 ++++++++++--------- src/storages/optimizer.ts | 57 +++++++++++++--------------- src/storages/pool-store.ts | 44 +++++++++++++-------- src/storages/reqmeister.ts | 33 ++++++++-------- src/storages/search-store.ts | 35 ++++++++--------- src/storages/types.ts | 34 ----------------- src/utils/abort.ts | 6 +++ src/utils/api.ts | 2 +- src/views.ts | 8 ++-- src/views/mastodon/relationships.ts | 2 +- src/views/mastodon/statuses.ts | 2 +- 32 files changed, 193 insertions(+), 218 deletions(-) delete mode 100644 src/storages/types.ts create mode 100644 src/utils/abort.ts diff --git a/scripts/db.ts b/scripts/db.ts index 7893fd5..f15e67a 100644 --- a/scripts/db.ts +++ b/scripts/db.ts @@ -39,6 +39,6 @@ async function usersToEvents() { created_at: Math.floor(new Date(row.inserted_at).getTime() / 1000), }); - await eventsDB.add(event); + await eventsDB.event(event); } } diff --git a/src/controllers/api/accounts.ts b/src/controllers/api/accounts.ts index f7904af..4525a0f 100644 --- a/src/controllers/api/accounts.ts +++ b/src/controllers/api/accounts.ts @@ -134,7 +134,7 @@ const accountStatusesController: AppController = async (c) => { const { pinned, limit, exclude_replies, tagged } = accountStatusesQuerySchema.parse(c.req.query()); if (pinned) { - const [pinEvent] = await eventsDB.filter([{ kinds: [10001], authors: [pubkey], limit: 1 }]); + const [pinEvent] = await eventsDB.query([{ kinds: [10001], authors: [pubkey], limit: 1 }]); if (pinEvent) { const pinnedEventIds = getTagSet(pinEvent.tags, 'e'); return renderStatuses(c, [...pinnedEventIds].reverse()); @@ -156,7 +156,7 @@ const accountStatusesController: AppController = async (c) => { filter['#t'] = [tagged]; } - let events = await eventsDB.filter([filter]); + let events = await eventsDB.query([filter]); if (exclude_replies) { events = events.filter((event) => !findReplyTag(event.tags)); @@ -293,7 +293,7 @@ const favouritesController: AppController = async (c) => { const pubkey = c.get('pubkey')!; const params = paginationSchema.parse(c.req.query()); - const events7 = await eventsDB.filter( + const events7 = await eventsDB.query( [{ kinds: [7], authors: [pubkey], ...params }], { signal: AbortSignal.timeout(1000) }, ); @@ -302,7 +302,7 @@ const favouritesController: AppController = async (c) => { .map((event) => event.tags.find((tag) => tag[0] === 'e')?.[1]) .filter((id): id is string => !!id); - const events1 = await eventsDB.filter( + const events1 = await eventsDB.query( [{ kinds: [1], ids, relations: ['author', 'event_stats', 'author_stats'] }], { signal: AbortSignal.timeout(1000), diff --git a/src/controllers/api/admin.ts b/src/controllers/api/admin.ts index 32a9733..77c1ed3 100644 --- a/src/controllers/api/admin.ts +++ b/src/controllers/api/admin.ts @@ -39,9 +39,9 @@ const adminAccountsController: AppController = async (c) => { const { since, until, limit } = paginationSchema.parse(c.req.query()); - const events = await eventsDB.filter([{ kinds: [30361], authors: [Conf.pubkey], since, until, limit }]); + const events = await eventsDB.query([{ kinds: [30361], authors: [Conf.pubkey], since, until, limit }]); const pubkeys = events.map((event) => event.tags.find(([name]) => name === 'd')?.[1]!); - const authors = await eventsDB.filter([{ kinds: [0], authors: pubkeys }]); + const authors = await eventsDB.query([{ kinds: [0], authors: pubkeys }]); for (const event of events) { const d = event.tags.find(([name]) => name === 'd')?.[1]; diff --git a/src/controllers/api/blocks.ts b/src/controllers/api/blocks.ts index 8422915..c2085b1 100644 --- a/src/controllers/api/blocks.ts +++ b/src/controllers/api/blocks.ts @@ -7,7 +7,7 @@ import { renderAccounts } from '@/views.ts'; const blocksController: AppController = async (c) => { const pubkey = c.get('pubkey')!; - const [event10000] = await eventsDB.filter([ + const [event10000] = await eventsDB.query([ { kinds: [10000], authors: [pubkey], limit: 1 }, ]); diff --git a/src/controllers/api/bookmarks.ts b/src/controllers/api/bookmarks.ts index 5722d97..bf383ac 100644 --- a/src/controllers/api/bookmarks.ts +++ b/src/controllers/api/bookmarks.ts @@ -7,7 +7,7 @@ import { renderStatuses } from '@/views.ts'; const bookmarksController: AppController = async (c) => { const pubkey = c.get('pubkey')!; - const [event10003] = await eventsDB.filter([ + const [event10003] = await eventsDB.query([ { kinds: [10003], authors: [pubkey], limit: 1 }, ]); diff --git a/src/controllers/api/instance.ts b/src/controllers/api/instance.ts index 8e0630b..6a668bf 100644 --- a/src/controllers/api/instance.ts +++ b/src/controllers/api/instance.ts @@ -6,7 +6,7 @@ import { eventsDB } from '@/storages.ts'; const instanceController: AppController = async (c) => { const { host, protocol } = Conf.url; - const [event] = await eventsDB.filter([{ kinds: [0], authors: [Conf.pubkey], limit: 1 }]); + const [event] = await eventsDB.query([{ kinds: [0], authors: [Conf.pubkey], limit: 1 }]); const meta = jsonServerMetaSchema.parse(event?.content); /** Protocol to use for WebSocket URLs, depending on the protocol of the `LOCAL_DOMAIN`. */ diff --git a/src/controllers/api/notifications.ts b/src/controllers/api/notifications.ts index c618a6a..0a0745a 100644 --- a/src/controllers/api/notifications.ts +++ b/src/controllers/api/notifications.ts @@ -7,7 +7,7 @@ const notificationsController: AppController = async (c) => { const pubkey = c.get('pubkey')!; const { since, until } = paginationSchema.parse(c.req.query()); - const events = await eventsDB.filter( + const events = await eventsDB.query( [{ kinds: [1], '#p': [pubkey], since, until }], { signal: AbortSignal.timeout(3000) }, ); diff --git a/src/controllers/api/pleroma.ts b/src/controllers/api/pleroma.ts index b9656da..9a5b087 100644 --- a/src/controllers/api/pleroma.ts +++ b/src/controllers/api/pleroma.ts @@ -8,7 +8,7 @@ import { createAdminEvent } from '@/utils/api.ts'; import { jsonSchema } from '@/schema.ts'; const frontendConfigController: AppController = async (c) => { - const [event] = await eventsDB.filter([{ + const [event] = await eventsDB.query([{ kinds: [30078], authors: [Conf.pubkey], '#d': ['pub.ditto.pleroma.config'], @@ -36,7 +36,7 @@ const frontendConfigController: AppController = async (c) => { const configController: AppController = async (c) => { const { pubkey } = Conf; - const [event] = await eventsDB.filter([{ + const [event] = await eventsDB.query([{ kinds: [30078], authors: [pubkey], '#d': ['pub.ditto.pleroma.config'], @@ -54,7 +54,7 @@ const configController: AppController = async (c) => { const updateConfigController: AppController = async (c) => { const { pubkey } = Conf; - const [event] = await eventsDB.filter([{ + const [event] = await eventsDB.query([{ kinds: [30078], authors: [pubkey], '#d': ['pub.ditto.pleroma.config'], diff --git a/src/controllers/api/search.ts b/src/controllers/api/search.ts index 87da478..30a0811 100644 --- a/src/controllers/api/search.ts +++ b/src/controllers/api/search.ts @@ -78,7 +78,7 @@ function searchEvents({ q, type, limit, account_id }: SearchQuery, signal: Abort filter.authors = [account_id]; } - return searchStore.filter([filter], { signal }); + return searchStore.query([filter], { signal }); } /** Get event kinds to search from `type` query param. */ @@ -96,7 +96,7 @@ function typeToKinds(type: SearchQuery['type']): number[] { /** Resolve a searched value into an event, if applicable. */ async function lookupEvent(query: SearchQuery, signal: AbortSignal): Promise { const filters = await getLookupFilters(query, signal); - const [event] = await searchStore.filter(filters, { limit: 1, signal }); + const [event] = await searchStore.query(filters, { limit: 1, signal }); return event; } diff --git a/src/controllers/api/timelines.ts b/src/controllers/api/timelines.ts index 35579dd..f76a3ac 100644 --- a/src/controllers/api/timelines.ts +++ b/src/controllers/api/timelines.ts @@ -32,7 +32,7 @@ const hashtagTimelineController: AppController = (c) => { /** Render statuses for timelines. */ async function renderStatuses(c: AppContext, filters: DittoFilter[], signal = AbortSignal.timeout(1000)) { - const events = await eventsDB.filter( + const events = await eventsDB.query( filters.map((filter) => ({ ...filter, relations: ['author', 'event_stats', 'author_stats'] })), { signal }, ); diff --git a/src/controllers/nostr/relay-info.ts b/src/controllers/nostr/relay-info.ts index 60c2f8b..ed3e8b6 100644 --- a/src/controllers/nostr/relay-info.ts +++ b/src/controllers/nostr/relay-info.ts @@ -4,7 +4,7 @@ import { jsonServerMetaSchema } from '@/schemas/nostr.ts'; import { eventsDB } from '@/storages.ts'; const relayInfoController: AppController = async (c) => { - const [event] = await eventsDB.filter([{ kinds: [0], authors: [Conf.pubkey], limit: 1 }]); + const [event] = await eventsDB.query([{ kinds: [0], authors: [Conf.pubkey], limit: 1 }]); const meta = jsonServerMetaSchema.parse(event?.content); return c.json({ diff --git a/src/controllers/nostr/relay.ts b/src/controllers/nostr/relay.ts index bab09dc..5361c79 100644 --- a/src/controllers/nostr/relay.ts +++ b/src/controllers/nostr/relay.ts @@ -63,7 +63,7 @@ function connectStream(socket: WebSocket) { async function handleReq([_, subId, ...rest]: ClientREQ): Promise { const filters = prepareFilters(rest); - for (const event of await eventsDB.filter(filters, { limit: FILTER_LIMIT })) { + for (const event of await eventsDB.query(filters, { limit: FILTER_LIMIT })) { send(['EVENT', subId, event]); } diff --git a/src/db/users.ts b/src/db/users.ts index 4b6f9e8..f57472a 100644 --- a/src/db/users.ts +++ b/src/db/users.ts @@ -65,7 +65,7 @@ async function findUser(user: Partial): Promise { } } - const [event] = await eventsDB.filter([filter]); + const [event] = await eventsDB.query([filter]); if (event) { return { diff --git a/src/deps.ts b/src/deps.ts index 8b2f47d..6827815 100644 --- a/src/deps.ts +++ b/src/deps.ts @@ -91,6 +91,8 @@ export { NIP05, type NostrEvent, type NostrFilter, -} from 'https://gitlab.com/soapbox-pub/nlib/-/raw/5d711597f3b2a163817cc1fb0f1f3ce8cede7cf7/mod.ts'; + type NStore, + type NStoreOpts, +} from 'https://gitlab.com/soapbox-pub/nlib/-/raw/057ecc6e2ce813db6e2279288fbfd08c5b53cc0c/mod.ts'; export type * as TypeFest from 'npm:type-fest@^4.3.0'; diff --git a/src/pipeline.ts b/src/pipeline.ts index 4804a9d..9f30589 100644 --- a/src/pipeline.ts +++ b/src/pipeline.ts @@ -47,14 +47,14 @@ async function handleEvent(event: DittoEvent): Promise { /** Encounter the event, and return whether it has already been encountered. */ async function encounterEvent(event: NostrEvent): Promise { const preexisting = (await memorelay.count([{ ids: [event.id] }])) > 0; - memorelay.add(event); - reqmeister.add(event); + memorelay.event(event); + reqmeister.event(event); return preexisting; } /** Hydrate the event with the user, if applicable. */ async function hydrateEvent(event: DittoEvent): Promise { - const [user] = await eventsDB.filter([{ kinds: [30361], authors: [Conf.pubkey], limit: 1 }]); + const [user] = await eventsDB.query([{ kinds: [30361], authors: [Conf.pubkey], limit: 1 }]); event.user = user; } @@ -79,7 +79,7 @@ async function storeEvent(event: DittoEvent, opts: StoreEventOpts = {}): Promise return Promise.reject(new RelayError('blocked', 'event was deleted')); } else { await Promise.all([ - eventsDB.add(event).catch(debug), + eventsDB.event(event).catch(debug), updateStats(event).catch(debug), ]); } @@ -94,15 +94,15 @@ async function processDeletions(event: NostrEvent): Promise { const ids = getTagSet(event.tags, 'e'); if (event.pubkey === Conf.pubkey) { - await eventsDB.deleteFilters([{ ids: [...ids] }]); + await eventsDB.remove([{ ids: [...ids] }]); } else { - const events = await eventsDB.filter([{ + const events = await eventsDB.query([{ ids: [...ids], authors: [event.pubkey], }]); const deleteIds = events.map(({ id }) => id); - await eventsDB.deleteFilters([{ ids: deleteIds }]); + await eventsDB.remove([{ ids: deleteIds }]); } } } @@ -227,7 +227,7 @@ function broadcast(event: DittoEvent) { if (!event.user || !isFresh(event)) return; if (event.kind === 5) { - client.add(event); + client.event(event); } } diff --git a/src/queries.ts b/src/queries.ts index 885cf44..585d026 100644 --- a/src/queries.ts +++ b/src/queries.ts @@ -25,7 +25,7 @@ const getEvent = async ( const { kind, relations, signal = AbortSignal.timeout(1000) } = opts; const microfilter: IdMicrofilter = { ids: [id] }; - const [memoryEvent] = await memorelay.filter([microfilter], opts) as DittoEvent[]; + const [memoryEvent] = await memorelay.query([microfilter], opts) as DittoEvent[]; if (memoryEvent && !relations) { debug(`getEvent: ${id.slice(0, 8)} found in memory`); @@ -37,13 +37,13 @@ const getEvent = async ( filter.kinds = [kind]; } - const dbEvent = await eventsDB.filter([filter], { limit: 1, signal }) + const dbEvent = await eventsDB.query([filter], { limit: 1, signal }) .then(([event]) => event); // TODO: make this DRY-er. if (dbEvent && !dbEvent.author) { - const [author] = await memorelay.filter([{ kinds: [0], authors: [dbEvent.pubkey] }], opts); + const [author] = await memorelay.query([{ kinds: [0], authors: [dbEvent.pubkey] }], opts); dbEvent.author = author; } @@ -53,7 +53,7 @@ const getEvent = async ( } if (memoryEvent && !memoryEvent.author) { - const [author] = await memorelay.filter([{ kinds: [0], authors: [memoryEvent.pubkey] }], opts); + const [author] = await memorelay.query([{ kinds: [0], authors: [memoryEvent.pubkey] }], opts); memoryEvent.author = author; } @@ -77,13 +77,13 @@ const getAuthor = async (pubkey: string, opts: GetEventOpts = {}): Promise event); @@ -96,7 +96,7 @@ const getAuthor = async (pubkey: string, opts: GetEventOpts = {}): Promise => { - const [event] = await eventsDB.filter([{ authors: [pubkey], kinds: [3], limit: 1 }], { limit: 1, signal }); + const [event] = await eventsDB.query([{ authors: [pubkey], kinds: [3], limit: 1 }], { limit: 1, signal }); return event; }; @@ -132,7 +132,7 @@ async function getAncestors(event: NostrEvent, result: NostrEvent[] = []): Promi } function getDescendants(eventId: string, signal = AbortSignal.timeout(2000)): Promise { - return eventsDB.filter( + return eventsDB.query( [{ kinds: [1], '#e': [eventId], relations: ['author', 'event_stats', 'author_stats'] }], { limit: 200, signal }, ); @@ -140,7 +140,7 @@ function getDescendants(eventId: string, signal = AbortSignal.timeout(2000)): Pr /** Returns whether the pubkey is followed by a local user. */ async function isLocallyFollowed(pubkey: string): Promise { - const [event] = await eventsDB.filter([{ kinds: [3], '#p': [pubkey], local: true, limit: 1 }], { limit: 1 }); + const [event] = await eventsDB.query([{ kinds: [3], '#p': [pubkey], local: true, limit: 1 }], { limit: 1 }); return Boolean(event); } diff --git a/src/stats.ts b/src/stats.ts index df7f63c..aebb705 100644 --- a/src/stats.ts +++ b/src/stats.ts @@ -126,7 +126,7 @@ function eventStatsQuery(diffs: EventStatDiff[]) { /** Get the last version of the event, if any. */ async function maybeGetPrev(event: NostrEvent): Promise { - const [prev] = await eventsDB.filter([ + const [prev] = await eventsDB.query([ { kinds: [event.kind], authors: [event.pubkey], limit: 1 }, ]); diff --git a/src/storages/events-db.test.ts b/src/storages/events-db.test.ts index 9b56171..5b09af3 100644 --- a/src/storages/events-db.test.ts +++ b/src/storages/events-db.test.ts @@ -11,36 +11,36 @@ const eventsDB = new EventsDB(db); Deno.test('count filters', async () => { assertEquals(await eventsDB.count([{ kinds: [1] }]), 0); - await eventsDB.add(event1); + await eventsDB.event(event1); assertEquals(await eventsDB.count([{ kinds: [1] }]), 1); }); Deno.test('insert and filter events', async () => { - await eventsDB.add(event1); + await eventsDB.event(event1); - assertEquals(await eventsDB.filter([{ kinds: [1] }]), [event1]); - assertEquals(await eventsDB.filter([{ kinds: [3] }]), []); - assertEquals(await eventsDB.filter([{ since: 1691091000 }]), [event1]); - assertEquals(await eventsDB.filter([{ until: 1691091000 }]), []); + assertEquals(await eventsDB.query([{ kinds: [1] }]), [event1]); + assertEquals(await eventsDB.query([{ kinds: [3] }]), []); + assertEquals(await eventsDB.query([{ since: 1691091000 }]), [event1]); + assertEquals(await eventsDB.query([{ until: 1691091000 }]), []); assertEquals( - await eventsDB.filter([{ '#proxy': ['https://gleasonator.com/objects/8f6fac53-4f66-4c6e-ac7d-92e5e78c3e79'] }]), + await eventsDB.query([{ '#proxy': ['https://gleasonator.com/objects/8f6fac53-4f66-4c6e-ac7d-92e5e78c3e79'] }]), [event1], ); }); Deno.test('delete events', async () => { - await eventsDB.add(event1); - assertEquals(await eventsDB.filter([{ kinds: [1] }]), [event1]); - await eventsDB.deleteFilters([{ kinds: [1] }]); - assertEquals(await eventsDB.filter([{ kinds: [1] }]), []); + await eventsDB.event(event1); + assertEquals(await eventsDB.query([{ kinds: [1] }]), [event1]); + await eventsDB.remove([{ kinds: [1] }]); + assertEquals(await eventsDB.query([{ kinds: [1] }]), []); }); Deno.test('query events with local filter', async () => { - await eventsDB.add(event1); + await eventsDB.event(event1); - assertEquals(await eventsDB.filter([{}]), [event1]); - assertEquals(await eventsDB.filter([{ local: true }]), []); - assertEquals(await eventsDB.filter([{ local: false }]), [event1]); + assertEquals(await eventsDB.query([{}]), [event1]); + assertEquals(await eventsDB.query([{ local: true }]), []); + assertEquals(await eventsDB.query([{ local: false }]), [event1]); const userEvent = await buildUserEvent({ username: 'alex', @@ -48,20 +48,20 @@ Deno.test('query events with local filter', async () => { inserted_at: new Date(), admin: false, }); - await eventsDB.add(userEvent); + await eventsDB.event(userEvent); - assertEquals(await eventsDB.filter([{ kinds: [1], local: true }]), [event1]); - assertEquals(await eventsDB.filter([{ kinds: [1], local: false }]), []); + assertEquals(await eventsDB.query([{ kinds: [1], local: true }]), [event1]); + assertEquals(await eventsDB.query([{ kinds: [1], local: false }]), []); }); Deno.test('inserting replaceable events', async () => { assertEquals(await eventsDB.count([{ kinds: [0], authors: [event0.pubkey] }]), 0); - await eventsDB.add(event0); - await assertRejects(() => eventsDB.add(event0)); + await eventsDB.event(event0); + await assertRejects(() => eventsDB.event(event0)); assertEquals(await eventsDB.count([{ kinds: [0], authors: [event0.pubkey] }]), 1); const changeEvent = { ...event0, id: '123', created_at: event0.created_at + 1 }; - await eventsDB.add(changeEvent); - assertEquals(await eventsDB.filter([{ kinds: [0] }]), [changeEvent]); + await eventsDB.event(changeEvent); + assertEquals(await eventsDB.query([{ kinds: [0] }]), [changeEvent]); }); diff --git a/src/storages/events-db.ts b/src/storages/events-db.ts index 2413af9..96f4c1b 100644 --- a/src/storages/events-db.ts +++ b/src/storages/events-db.ts @@ -1,6 +1,6 @@ import { Conf } from '@/config.ts'; import { type DittoDB } from '@/db.ts'; -import { Debug, Kysely, type NostrEvent, type SelectQueryBuilder } from '@/deps.ts'; +import { Debug, Kysely, type NostrEvent, type NStore, type NStoreOpts, type SelectQueryBuilder } from '@/deps.ts'; import { cleanEvent } from '@/events.ts'; import { normalizeFilters } from '@/filter.ts'; import { DittoEvent } from '@/interfaces/DittoEvent.ts'; @@ -9,8 +9,6 @@ import { isDittoInternalKind, isParameterizedReplaceableKind, isReplaceableKind import { jsonMetaContentSchema } from '@/schemas/nostr.ts'; import { isNostrId, isURL } from '@/utils.ts'; -import { type EventStore, type GetEventsOpts } from './types.ts'; - /** Function to decide whether or not to index a tag. */ type TagCondition = ({ event, count, value }: { event: DittoEvent; @@ -56,19 +54,16 @@ type EventQuery = SelectQueryBuilder; /** SQLite database storage adapter for Nostr events. */ -class EventsDB implements EventStore { +class EventsDB implements NStore { #db: Kysely; #debug = Debug('ditto:db:events'); - /** NIPs supported by this storage method. */ - supportedNips = [1, 45, 50]; - constructor(db: Kysely) { this.#db = db; } /** Insert an event (and its tags) into the database. */ - async add(event: NostrEvent): Promise { + async event(event: NostrEvent): Promise { event = cleanEvent(event); this.#debug('EVENT', JSON.stringify(event)); @@ -268,7 +263,7 @@ class EventsDB implements EventStore { } /** Get events for filters from the database. */ - async filter(filters: DittoFilter[], opts: GetEventsOpts = {}): Promise { + async query(filters: DittoFilter[], opts: NStoreOpts = {}): Promise { filters = normalizeFilters(filters); // Improves performance of `{ kinds: [0], authors: ['...'] }` queries. if (opts.signal?.aborted) return Promise.resolve([]); @@ -341,7 +336,7 @@ class EventsDB implements EventStore { } /** Delete events based on filters from the database. */ - async deleteFilters(filters: DittoFilter[]): Promise { + async remove(filters: DittoFilter[]): Promise { if (!filters.length) return Promise.resolve(); this.#debug('DELETE', JSON.stringify(filters)); diff --git a/src/storages/hydrate.ts b/src/storages/hydrate.ts index 1ce0368..db172f3 100644 --- a/src/storages/hydrate.ts +++ b/src/storages/hydrate.ts @@ -1,11 +1,11 @@ +import { type NStore } from '@/deps.ts'; import { type DittoEvent } from '@/interfaces/DittoEvent.ts'; import { type DittoFilter } from '@/interfaces/DittoFilter.ts'; -import { type EventStore } from '@/storages/types.ts'; interface HydrateEventOpts { events: DittoEvent[]; filters: DittoFilter[]; - storage: EventStore; + storage: NStore; signal?: AbortSignal; } @@ -15,7 +15,7 @@ async function hydrateEvents(opts: HydrateEventOpts): Promise { if (filters.some((filter) => filter.relations?.includes('author'))) { const pubkeys = new Set([...events].map((event) => event.pubkey)); - const authors = await storage.filter([{ kinds: [0], authors: [...pubkeys] }], { signal }); + const authors = await storage.query([{ kinds: [0], authors: [...pubkeys] }], { signal }); for (const event of events) { event.author = authors.find((author) => author.pubkey === event.pubkey); diff --git a/src/storages/memorelay.test.ts b/src/storages/memorelay.test.ts index 536c6eb..ddde6f8 100644 --- a/src/storages/memorelay.test.ts +++ b/src/storages/memorelay.test.ts @@ -13,10 +13,10 @@ const memorelay = new Memorelay({ Deno.test('memorelay', async () => { assertEquals(await memorelay.count([{ ids: [event1.id] }]), 0); - await memorelay.add(event1); + await memorelay.event(event1); assertEquals(await memorelay.count([{ ids: [event1.id] }]), 1); - const result = await memorelay.filter([{ ids: [event1.id] }]); + const result = await memorelay.query([{ ids: [event1.id] }]); assertEquals(result[0], event1); }); diff --git a/src/storages/memorelay.ts b/src/storages/memorelay.ts index 7a9d263..2d67173 100644 --- a/src/storages/memorelay.ts +++ b/src/storages/memorelay.ts @@ -1,16 +1,21 @@ -import { Debug, LRUCache, matchFilter, type NostrEvent, type NostrFilter, NSet } from '@/deps.ts'; +import { + Debug, + LRUCache, + matchFilter, + type NostrEvent, + type NostrFilter, + NSet, + type NStore, + type NStoreOpts, +} from '@/deps.ts'; import { normalizeFilters } from '@/filter.ts'; - -import { type EventStore, type GetEventsOpts } from './types.ts'; +import { abortError } from '@/utils/abort.ts'; /** In-memory data store for events. */ -class Memorelay implements EventStore { +class Memorelay implements NStore { #debug = Debug('ditto:memorelay'); #cache: LRUCache; - /** NIPs supported by this storage method. */ - supportedNips = [1, 45]; - constructor(...args: ConstructorParameters>) { this.#cache = new LRUCache(...args); } @@ -25,13 +30,12 @@ class Memorelay implements EventStore { } /** Get events from memory. */ - filter(filters: NostrFilter[], opts: GetEventsOpts = {}): Promise { + query(filters: NostrFilter[], opts: NStoreOpts = {}): Promise { + if (opts.signal?.aborted) return Promise.reject(abortError()); + filters = normalizeFilters(filters); - - if (opts.signal?.aborted) return Promise.resolve([]); - if (!filters.length) return Promise.resolve([]); - this.#debug('REQ', JSON.stringify(filters)); + if (!filters.length) return Promise.resolve([]); /** Event results to return. */ const results = new NSet(); @@ -90,20 +94,21 @@ class Memorelay implements EventStore { } /** Insert an event into memory. */ - add(event: NostrEvent): Promise { + event(event: NostrEvent, opts: NStoreOpts = {}): Promise { + if (opts.signal?.aborted) return Promise.reject(abortError()); this.#cache.set(event.id, event); return Promise.resolve(); } /** Count events in memory for the filters. */ - async count(filters: NostrFilter[]): Promise { - const events = await this.filter(filters); + async count(filters: NostrFilter[], opts?: NStoreOpts): Promise { + const events = await this.query(filters, opts); return events.length; } /** Delete events from memory. */ - async deleteFilters(filters: NostrFilter[]): Promise { - for (const event of await this.filter(filters)) { + async remove(filters: NostrFilter[], opts: NStoreOpts): Promise { + for (const event of await this.query(filters, opts)) { this.#cache.delete(event.id); } return Promise.resolve(); diff --git a/src/storages/optimizer.ts b/src/storages/optimizer.ts index 81d63e0..f3ac342 100644 --- a/src/storages/optimizer.ts +++ b/src/storages/optimizer.ts @@ -1,24 +1,21 @@ -import { Debug, NSet } from '@/deps.ts'; +import { Debug, NSet, type NStore, type NStoreOpts } from '@/deps.ts'; import { normalizeFilters } from '@/filter.ts'; import { type DittoEvent } from '@/interfaces/DittoEvent.ts'; import { type DittoFilter } from '@/interfaces/DittoFilter.ts'; - -import { type EventStore, type GetEventsOpts, type StoreEventOpts } from './types.ts'; +import { abortError } from '@/utils/abort.ts'; interface OptimizerOpts { - db: EventStore; - cache: EventStore; - client: EventStore; + db: NStore; + cache: NStore; + client: NStore; } -class Optimizer implements EventStore { +class Optimizer implements NStore { #debug = Debug('ditto:optimizer'); - #db: EventStore; - #cache: EventStore; - #client: EventStore; - - supportedNips = [1]; + #db: NStore; + #cache: NStore; + #client: NStore; constructor(opts: OptimizerOpts) { this.#db = opts.db; @@ -26,25 +23,23 @@ class Optimizer implements EventStore { this.#client = opts.client; } - async add(event: DittoEvent, opts?: StoreEventOpts | undefined): Promise { + async event(event: DittoEvent, opts?: NStoreOpts | undefined): Promise { + if (opts?.signal?.aborted) return Promise.reject(abortError()); + await Promise.all([ - this.#db.add(event, opts), - this.#cache.add(event, opts), + this.#db.event(event, opts), + this.#cache.event(event, opts), ]); } - async filter( - filters: DittoFilter[], - opts: GetEventsOpts | undefined = {}, - ): Promise { - this.#debug('REQ', JSON.stringify(filters)); + async query(filters: DittoFilter[], opts: NStoreOpts = {}): Promise { + if (opts?.signal?.aborted) return Promise.reject(abortError()); - const { limit = Infinity } = opts; filters = normalizeFilters(filters); - - if (opts?.signal?.aborted) return Promise.resolve([]); + this.#debug('REQ', JSON.stringify(filters)); if (!filters.length) return Promise.resolve([]); + const { limit = Infinity } = opts; const results = new NSet(); // Filters with IDs are immutable, so we can take them straight from the cache if we have them. @@ -53,7 +48,7 @@ class Optimizer implements EventStore { if (filter.ids) { this.#debug(`Filter[${i}] is an IDs filter; querying cache...`); const ids = new Set(filter.ids); - for (const event of await this.#cache.filter([filter], opts)) { + for (const event of await this.#cache.query([filter], opts)) { ids.delete(event.id); results.add(event); if (results.size >= limit) return getResults(); @@ -67,7 +62,7 @@ class Optimizer implements EventStore { // Query the database for events. this.#debug('Querying database...'); - for (const dbEvent of await this.#db.filter(filters, opts)) { + for (const dbEvent of await this.#db.query(filters, opts)) { results.add(dbEvent); if (results.size >= limit) return getResults(); } @@ -80,14 +75,14 @@ class Optimizer implements EventStore { // Query the cache again. this.#debug('Querying cache...'); - for (const cacheEvent of await this.#cache.filter(filters, opts)) { + for (const cacheEvent of await this.#cache.query(filters, opts)) { results.add(cacheEvent); if (results.size >= limit) return getResults(); } // Finally, query the client. this.#debug('Querying client...'); - for (const clientEvent of await this.#client.filter(filters, opts)) { + for (const clientEvent of await this.#client.query(filters, opts)) { results.add(clientEvent); if (results.size >= limit) return getResults(); } @@ -100,12 +95,12 @@ class Optimizer implements EventStore { return getResults(); } - countEvents(_filters: DittoFilter[]): Promise { - throw new Error('COUNT not implemented.'); + count(_filters: DittoFilter[]): Promise { + return Promise.reject(new Error('COUNT not implemented.')); } - deleteEvents(_filters: DittoFilter[]): Promise { - throw new Error('DELETE not implemented.'); + remove(_filters: DittoFilter[]): Promise { + return Promise.reject(new Error('DELETE not implemented.')); } } diff --git a/src/storages/pool-store.ts b/src/storages/pool-store.ts index 001778c..5b5f2ef 100644 --- a/src/storages/pool-store.ts +++ b/src/storages/pool-store.ts @@ -1,7 +1,16 @@ -import { Debug, matchFilters, type NostrEvent, type NostrFilter, NSet, type RelayPoolWorker } from '@/deps.ts'; +import { + Debug, + matchFilters, + type NostrEvent, + type NostrFilter, + NSet, + type NStore, + type NStoreOpts, + type RelayPoolWorker, +} from '@/deps.ts'; import { cleanEvent } from '@/events.ts'; import { normalizeFilters } from '@/filter.ts'; -import { type EventStore, type GetEventsOpts, type StoreEventOpts } from '@/storages/types.ts'; +import { abortError } from '@/utils/abort.ts'; interface PoolStoreOpts { pool: InstanceType; @@ -11,7 +20,7 @@ interface PoolStoreOpts { }; } -class PoolStore implements EventStore { +class PoolStore implements NStore { #debug = Debug('ditto:client'); #pool: InstanceType; #relays: WebSocket['url'][]; @@ -19,31 +28,31 @@ class PoolStore implements EventStore { handleEvent(event: NostrEvent): Promise; }; - supportedNips = [1]; - constructor(opts: PoolStoreOpts) { this.#pool = opts.pool; this.#relays = opts.relays; this.#publisher = opts.publisher; } - add(event: NostrEvent, opts: StoreEventOpts = {}): Promise { + event(event: NostrEvent, opts: NStoreOpts = {}): Promise { + if (opts.signal?.aborted) return Promise.reject(abortError()); const { relays = this.#relays } = opts; + event = cleanEvent(event); this.#debug('EVENT', event); + this.#pool.publish(event, relays); return Promise.resolve(); } - filter(filters: NostrFilter[], opts: GetEventsOpts = {}): Promise { - filters = normalizeFilters(filters); + query(filters: NostrFilter[], opts: NStoreOpts = {}): Promise { + if (opts.signal?.aborted) return Promise.reject(abortError()); - if (opts.signal?.aborted) return Promise.resolve([]); + filters = normalizeFilters(filters); + this.#debug('REQ', JSON.stringify(filters)); if (!filters.length) return Promise.resolve([]); - this.#debug('REQ', JSON.stringify(filters)); - - return new Promise((resolve) => { + return new Promise((resolve, reject) => { const results = new NSet(); const unsub = this.#pool.subscribe( @@ -74,10 +83,13 @@ class PoolStore implements EventStore { }, ); - opts.signal?.addEventListener('abort', () => { + const onAbort = () => { unsub(); - resolve([...results]); - }); + reject(abortError()); + opts.signal?.removeEventListener('abort', onAbort); + }; + + opts.signal?.addEventListener('abort', onAbort); }); } @@ -85,7 +97,7 @@ class PoolStore implements EventStore { return Promise.reject(new Error('COUNT not implemented')); } - deleteFilters() { + remove() { return Promise.reject(new Error('Cannot delete events from relays. Create a kind 5 event instead.')); } } diff --git a/src/storages/reqmeister.ts b/src/storages/reqmeister.ts index d7d5953..18cd9b1 100644 --- a/src/storages/reqmeister.ts +++ b/src/storages/reqmeister.ts @@ -1,10 +1,10 @@ -import { Debug, EventEmitter, type NostrEvent, type NostrFilter } from '@/deps.ts'; +import { Debug, EventEmitter, type NostrEvent, type NostrFilter, type NStore, type NStoreOpts } from '@/deps.ts'; import { eventToMicroFilter, getFilterId, isMicrofilter, type MicroFilter } from '@/filter.ts'; -import { type EventStore, GetEventsOpts } from '@/storages/types.ts'; import { Time } from '@/utils/time.ts'; +import { abortError } from '@/utils/abort.ts'; interface ReqmeisterOpts { - client: EventStore; + client: NStore; delay?: number; timeout?: number; } @@ -17,7 +17,7 @@ interface ReqmeisterReqOpts { type ReqmeisterQueueItem = [string, MicroFilter, WebSocket['url'][]]; /** Batches requests to Nostr relays using microfilters. */ -class Reqmeister extends EventEmitter<{ [filterId: string]: (event: NostrEvent) => any }> implements EventStore { +class Reqmeister extends EventEmitter<{ [filterId: string]: (event: NostrEvent) => any }> implements NStore { #debug = Debug('ditto:reqmeister'); #opts: ReqmeisterOpts; @@ -25,8 +25,6 @@ class Reqmeister extends EventEmitter<{ [filterId: string]: (event: NostrEvent) #promise!: Promise; #resolve!: () => void; - supportedNips = []; - constructor(opts: ReqmeisterOpts) { super(); this.#opts = opts; @@ -66,11 +64,10 @@ class Reqmeister extends EventEmitter<{ [filterId: string]: (event: NostrEvent) if (wantedAuthors.size) filters.push({ kinds: [0], authors: [...wantedAuthors] }); if (filters.length) { - this.#debug('REQ', JSON.stringify(filters)); - const events = await client.filter(filters, { signal: AbortSignal.timeout(timeout) }); + const events = await client.query(filters, { signal: AbortSignal.timeout(timeout) }); for (const event of events) { - this.add(event); + this.event(event); } } @@ -85,7 +82,7 @@ class Reqmeister extends EventEmitter<{ [filterId: string]: (event: NostrEvent) } = opts; if (signal.aborted) { - return Promise.reject(new DOMException('Aborted', 'AbortError')); + return Promise.reject(abortError()); } const filterId = getFilterId(filter); @@ -109,7 +106,7 @@ class Reqmeister extends EventEmitter<{ [filterId: string]: (event: NostrEvent) }); } - add(event: NostrEvent): Promise { + event(event: NostrEvent): Promise { const filterId = getFilterId(eventToMicroFilter(event)); this.#queue = this.#queue.filter(([id]) => id !== filterId); this.emit(filterId, event); @@ -121,13 +118,15 @@ class Reqmeister extends EventEmitter<{ [filterId: string]: (event: NostrEvent) return this.#queue.some(([id]) => id === filterId); } - filter(filters: NostrFilter[], opts?: GetEventsOpts | undefined): Promise { - if (opts?.signal?.aborted) return Promise.resolve([]); + query(filters: NostrFilter[], opts?: NStoreOpts): Promise { + if (opts?.signal?.aborted) return Promise.reject(abortError()); + + this.#debug('REQ', JSON.stringify(filters)); if (!filters.length) return Promise.resolve([]); const promises = filters.reduce[]>((result, filter) => { if (isMicrofilter(filter)) { - result.push(this.req(filter) as Promise); + result.push(this.req(filter, opts)); } return result; }, []); @@ -136,11 +135,11 @@ class Reqmeister extends EventEmitter<{ [filterId: string]: (event: NostrEvent) } count(_filters: NostrFilter[]): Promise { - throw new Error('COUNT not implemented.'); + return Promise.reject(new Error('COUNT not implemented.')); } - deleteFilters(_filters: NostrFilter[]): Promise { - throw new Error('DELETE not implemented.'); + remove(_filters: NostrFilter[]): Promise { + return Promise.reject(new Error('DELETE not implemented.')); } } diff --git a/src/storages/search-store.ts b/src/storages/search-store.ts index 54cf48b..b8109f5 100644 --- a/src/storages/search-store.ts +++ b/src/storages/search-store.ts @@ -1,27 +1,25 @@ import { NiceRelay } from 'https://gitlab.com/soapbox-pub/nostr-machina/-/raw/5f4fb59c90c092e5aa59c01e6556a4bec264c167/mod.ts'; -import { Debug, type NostrEvent, type NostrFilter, NSet } from '@/deps.ts'; +import { Debug, type NostrEvent, type NostrFilter, NSet, type NStore, type NStoreOpts } from '@/deps.ts'; import { normalizeFilters } from '@/filter.ts'; import { type DittoEvent } from '@/interfaces/DittoEvent.ts'; import { type DittoFilter } from '@/interfaces/DittoFilter.ts'; import { hydrateEvents } from '@/storages/hydrate.ts'; -import { type EventStore, type GetEventsOpts, type StoreEventOpts } from '@/storages/types.ts'; +import { abortError } from '@/utils/abort.ts'; interface SearchStoreOpts { relay: string | undefined; - fallback: EventStore; - hydrator?: EventStore; + fallback: NStore; + hydrator?: NStore; } -class SearchStore implements EventStore { +class SearchStore implements NStore { #debug = Debug('ditto:storages:search'); - #fallback: EventStore; - #hydrator: EventStore; + #fallback: NStore; + #hydrator: NStore; #relay: NiceRelay | undefined; - supportedNips = [50]; - constructor(opts: SearchStoreOpts) { this.#fallback = opts.fallback; this.#hydrator = opts.hydrator ?? this; @@ -31,17 +29,14 @@ class SearchStore implements EventStore { } } - add(_event: NostrEvent, _opts?: StoreEventOpts | undefined): Promise { - throw new Error('EVENT not implemented.'); + event(_event: NostrEvent, _opts?: NStoreOpts): Promise { + return Promise.reject(new Error('EVENT not implemented.')); } - async filter( - filters: DittoFilter[], - opts?: GetEventsOpts | undefined, - ): Promise { + async query(filters: DittoFilter[], opts?: NStoreOpts): Promise { filters = normalizeFilters(filters); - if (opts?.signal?.aborted) return Promise.resolve([]); + if (opts?.signal?.aborted) return Promise.reject(abortError()); if (!filters.length) return Promise.resolve([]); this.#debug('REQ', JSON.stringify(filters)); @@ -70,16 +65,16 @@ class SearchStore implements EventStore { return hydrateEvents({ events: [...events], filters, storage: this.#hydrator, signal: opts?.signal }); } else { this.#debug(`Searching for "${query}" locally...`); - return this.#fallback.filter(filters, opts); + return this.#fallback.query(filters, opts); } } count(_filters: NostrFilter[]): Promise { - throw new Error('COUNT not implemented.'); + return Promise.reject(new Error('COUNT not implemented.')); } - deleteFilters(_filters: NostrFilter[]): Promise { - throw new Error('DELETE not implemented.'); + remove(_filters: NostrFilter[]): Promise { + return Promise.reject(new Error('DELETE not implemented.')); } } diff --git a/src/storages/types.ts b/src/storages/types.ts deleted file mode 100644 index f1b1883..0000000 --- a/src/storages/types.ts +++ /dev/null @@ -1,34 +0,0 @@ -import { type DittoEvent } from '@/interfaces/DittoEvent.ts'; -import { type DittoFilter } from '@/interfaces/DittoFilter.ts'; - -/** Additional options to apply to the whole subscription. */ -interface GetEventsOpts { - /** Signal to abort the request. */ - signal?: AbortSignal; - /** Event limit for the whole subscription. */ - limit?: number; - /** Relays to use, if applicable. */ - relays?: WebSocket['url'][]; -} - -/** Options when storing an event. */ -interface StoreEventOpts { - /** Relays to use, if applicable. */ - relays?: WebSocket['url'][]; -} - -/** Storage interface for Nostr events. */ -interface EventStore { - /** Indicates NIPs supported by this data store, similar to NIP-11. For example, `50` would indicate support for `search` filters. */ - supportedNips: readonly number[]; - /** Add an event to the store. */ - add(event: DittoEvent, opts?: StoreEventOpts): Promise; - /** Get events from filters. */ - filter(filters: DittoFilter[], opts?: GetEventsOpts): Promise; - /** Get the number of events from filters. */ - count?(filters: DittoFilter[]): Promise; - /** Delete events from filters. */ - deleteFilters?(filters: DittoFilter[]): Promise; -} - -export type { EventStore, GetEventsOpts, StoreEventOpts }; diff --git a/src/utils/abort.ts b/src/utils/abort.ts new file mode 100644 index 0000000..147e43c --- /dev/null +++ b/src/utils/abort.ts @@ -0,0 +1,6 @@ +/** Creates an `AbortError` object matching the Fetch API. */ +function abortError() { + return new DOMException('The signal has been aborted', 'AbortError'); +} + +export { abortError }; diff --git a/src/utils/api.ts b/src/utils/api.ts index 1768bb3..498bbab 100644 --- a/src/utils/api.ts +++ b/src/utils/api.ts @@ -51,7 +51,7 @@ async function updateEvent( fn: (prev: NostrEvent | undefined) => E, c: AppContext, ): Promise { - const [prev] = await eventsDB.filter([filter], { limit: 1 }); + const [prev] = await eventsDB.query([filter], { limit: 1 }); return createEvent(fn(prev), c); } diff --git a/src/views.ts b/src/views.ts index 19267a9..d631da4 100644 --- a/src/views.ts +++ b/src/views.ts @@ -11,14 +11,14 @@ async function renderEventAccounts(c: AppContext, filters: NostrFilter[], signal return c.json([]); } - const events = await eventsDB.filter(filters, { signal }); + const events = await eventsDB.query(filters, { signal }); const pubkeys = new Set(events.map(({ pubkey }) => pubkey)); if (!pubkeys.size) { return c.json([]); } - const authors = await eventsDB.filter( + const authors = await eventsDB.query( [{ kinds: [0], authors: [...pubkeys], relations: ['author_stats'] }], { signal }, ); @@ -33,7 +33,7 @@ async function renderEventAccounts(c: AppContext, filters: NostrFilter[], signal async function renderAccounts(c: AppContext, authors: string[], signal = AbortSignal.timeout(1000)) { const { since, until, limit } = paginationSchema.parse(c.req.query()); - const events = await eventsDB.filter( + const events = await eventsDB.query( [{ kinds: [0], authors, relations: ['author_stats'], since, until, limit }], { signal }, ); @@ -53,7 +53,7 @@ async function renderStatuses(c: AppContext, ids: string[], signal = AbortSignal const { limit } = paginationSchema.parse(c.req.query()); - const events = await eventsDB.filter( + const events = await eventsDB.query( [{ kinds: [1], ids, relations: ['author', 'event_stats', 'author_stats'], limit }], { signal }, ); diff --git a/src/views/mastodon/relationships.ts b/src/views/mastodon/relationships.ts index 9f20024..983b134 100644 --- a/src/views/mastodon/relationships.ts +++ b/src/views/mastodon/relationships.ts @@ -2,7 +2,7 @@ import { eventsDB } from '@/storages.ts'; import { hasTag } from '@/tags.ts'; async function renderRelationship(sourcePubkey: string, targetPubkey: string) { - const events = await eventsDB.filter([ + const events = await eventsDB.query([ { kinds: [3], authors: [sourcePubkey], limit: 1 }, { kinds: [3], authors: [targetPubkey], limit: 1 }, { kinds: [10000], authors: [sourcePubkey], limit: 1 }, diff --git a/src/views/mastodon/statuses.ts b/src/views/mastodon/statuses.ts index e582a00..172a2b6 100644 --- a/src/views/mastodon/statuses.ts +++ b/src/views/mastodon/statuses.ts @@ -36,7 +36,7 @@ async function renderStatus(event: DittoEvent, viewerPubkey?: string) { Promise.all(mentionedPubkeys.map(toMention)), firstUrl ? unfurlCardCached(firstUrl) : null, viewerPubkey - ? await eventsDB.filter([ + ? await eventsDB.query([ { kinds: [6], '#e': [event.id], authors: [viewerPubkey], limit: 1 }, { kinds: [7], '#e': [event.id], authors: [viewerPubkey], limit: 1 }, { kinds: [9734], '#e': [event.id], authors: [viewerPubkey], limit: 1 },