From e6c8d1dad964145acb84b21f874b2fe388713aa1 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Fri, 29 Dec 2023 13:12:16 -0600 Subject: [PATCH 1/4] Add an EventStore interface, refactor eventsDB --- fixtures/events/55920b75.json | 15 -------- src/controllers/api/accounts.ts | 8 ++--- src/controllers/api/notifications.ts | 4 +-- src/controllers/api/pleroma.ts | 4 +-- src/controllers/api/search.ts | 6 ++-- src/controllers/api/timelines.ts | 4 +-- src/controllers/nostr/relay.ts | 4 +-- src/db/events.test.ts | 49 +++++++++++++------------- src/db/events.ts | 51 ++++++++++++++-------------- src/pipeline.ts | 10 +++--- src/queries.ts | 17 +++++----- src/stats.ts | 4 +-- src/store.ts | 38 +++++++++++++++++++++ src/views.ts | 4 +-- src/views/mastodon/statuses.ts | 10 ++---- 15 files changed, 124 insertions(+), 104 deletions(-) delete mode 100644 fixtures/events/55920b75.json create mode 100644 src/store.ts diff --git a/fixtures/events/55920b75.json b/fixtures/events/55920b75.json deleted file mode 100644 index f902786..0000000 --- a/fixtures/events/55920b75.json +++ /dev/null @@ -1,15 +0,0 @@ -{ - "kind": 1, - "content": "I'm vegan btw", - "tags": [ - [ - "proxy", - "https://gleasonator.com/objects/8f6fac53-4f66-4c6e-ac7d-92e5e78c3e79", - "activitypub" - ] - ], - "pubkey": "79c2cae114ea28a981e7559b4fe7854a473521a8d22a66bbab9fa248eb820ff6", - "created_at": 1691091365, - "id": "55920b758b9c7b17854b6e3d44e6a02a83d1cb49e1227e75a30426dea94d4cb2", - "sig": "a72f12c08f18e85d98fb92ae89e2fe63e48b8864c5e10fbdd5335f3c9f936397a6b0a7350efe251f8168b1601d7012d4a6d0ee6eec958067cf22a14f5a5ea579" -} \ No newline at end of file diff --git a/src/controllers/api/accounts.ts b/src/controllers/api/accounts.ts index 4ce92d6..ddd8170 100644 --- a/src/controllers/api/accounts.ts +++ b/src/controllers/api/accounts.ts @@ -1,6 +1,6 @@ import { type AppController } from '@/app.ts'; import { Conf } from '@/config.ts'; -import * as eventsDB from '@/db/events.ts'; +import { eventsDB } from '@/db/events.ts'; import { insertUser } from '@/db/users.ts'; import { findReplyTag, nip19, z } from '@/deps.ts'; import { type DittoFilter } from '@/filter.ts'; @@ -151,7 +151,7 @@ const accountStatusesController: AppController = async (c) => { filter['#t'] = [tagged]; } - let events = await eventsDB.getFilters([filter]); + let events = await eventsDB.getEvents([filter]); if (exclude_replies) { events = events.filter((event) => !findReplyTag(event)); @@ -256,7 +256,7 @@ const favouritesController: AppController = async (c) => { const pubkey = c.get('pubkey')!; const params = paginationSchema.parse(c.req.query()); - const events7 = await eventsDB.getFilters( + const events7 = await eventsDB.getEvents( [{ kinds: [7], authors: [pubkey], ...params }], { signal: AbortSignal.timeout(1000) }, ); @@ -265,7 +265,7 @@ const favouritesController: AppController = async (c) => { .map((event) => event.tags.find((tag) => tag[0] === 'e')?.[1]) .filter((id): id is string => !!id); - const events1 = await eventsDB.getFilters( + const events1 = await eventsDB.getEvents( [{ kinds: [1], ids, relations: ['author', 'event_stats', 'author_stats'] }], { signal: AbortSignal.timeout(1000), diff --git a/src/controllers/api/notifications.ts b/src/controllers/api/notifications.ts index 1e99129..881f9ac 100644 --- a/src/controllers/api/notifications.ts +++ b/src/controllers/api/notifications.ts @@ -1,5 +1,5 @@ import { type AppController } from '@/app.ts'; -import * as eventsDB from '@/db/events.ts'; +import { eventsDB } from '@/db/events.ts'; import { paginated, paginationSchema } from '@/utils/web.ts'; import { renderNotification } from '@/views/mastodon/notifications.ts'; @@ -7,7 +7,7 @@ const notificationsController: AppController = async (c) => { const pubkey = c.get('pubkey')!; const { since, until } = paginationSchema.parse(c.req.query()); - const events = await eventsDB.getFilters( + const events = await eventsDB.getEvents( [{ kinds: [1], '#p': [pubkey], since, until }], { signal: AbortSignal.timeout(3000) }, ); diff --git a/src/controllers/api/pleroma.ts b/src/controllers/api/pleroma.ts index e69b27e..450be07 100644 --- a/src/controllers/api/pleroma.ts +++ b/src/controllers/api/pleroma.ts @@ -1,12 +1,12 @@ import { type AppController } from '@/app.ts'; -import * as eventsDB from '@/db/events.ts'; +import { eventsDB } from '@/db/events.ts'; import { z } from '@/deps.ts'; import { configSchema, elixirTupleSchema } from '@/schemas/pleroma-api.ts'; import { createAdminEvent } from '@/utils/web.ts'; import { Conf } from '@/config.ts'; const frontendConfigController: AppController = async (c) => { - const [event] = await eventsDB.getFilters([{ + const [event] = await eventsDB.getEvents([{ kinds: [30078], authors: [Conf.pubkey], '#d': ['pub.ditto.frontendConfig'], diff --git a/src/controllers/api/search.ts b/src/controllers/api/search.ts index 2e91c43..a514d66 100644 --- a/src/controllers/api/search.ts +++ b/src/controllers/api/search.ts @@ -1,5 +1,5 @@ import { AppController } from '@/app.ts'; -import * as eventsDB from '@/db/events.ts'; +import { eventsDB } from '@/db/events.ts'; import { type Event, nip19, z } from '@/deps.ts'; import { type DittoFilter } from '@/filter.ts'; import { booleanParamSchema } from '@/schema.ts'; @@ -76,7 +76,7 @@ function searchEvents({ q, type, limit, account_id }: SearchQuery): Promise { const filters = await getLookupFilters(query); - const [event] = await eventsDB.getFilters(filters, { limit: 1, signal }); + const [event] = await eventsDB.getEvents(filters, { limit: 1, signal }); return event; } diff --git a/src/controllers/api/timelines.ts b/src/controllers/api/timelines.ts index da4ab37..2a20fdd 100644 --- a/src/controllers/api/timelines.ts +++ b/src/controllers/api/timelines.ts @@ -1,4 +1,4 @@ -import * as eventsDB from '@/db/events.ts'; +import { eventsDB } from '@/db/events.ts'; import { z } from '@/deps.ts'; import { type DittoFilter } from '@/filter.ts'; import { getFeedPubkeys } from '@/queries.ts'; @@ -33,7 +33,7 @@ const hashtagTimelineController: AppController = (c) => { /** Render statuses for timelines. */ async function renderStatuses(c: AppContext, filters: DittoFilter<1>[], signal = AbortSignal.timeout(1000)) { - const events = await eventsDB.getFilters( + const events = await eventsDB.getEvents( filters.map((filter) => ({ ...filter, relations: ['author', 'event_stats', 'author_stats'] })), { signal }, ); diff --git a/src/controllers/nostr/relay.ts b/src/controllers/nostr/relay.ts index f4b32b3..fe1cc31 100644 --- a/src/controllers/nostr/relay.ts +++ b/src/controllers/nostr/relay.ts @@ -1,5 +1,5 @@ import { relayInfoController } from '@/controllers/nostr/relay-info.ts'; -import * as eventsDB from '@/db/events.ts'; +import { eventsDB } from '@/db/events.ts'; import * as pipeline from '@/pipeline.ts'; import { jsonSchema } from '@/schema.ts'; import { @@ -63,7 +63,7 @@ function connectStream(socket: WebSocket) { async function handleReq([_, subId, ...rest]: ClientREQ): Promise { const filters = prepareFilters(rest); - for (const event of await eventsDB.getFilters(filters, { limit: FILTER_LIMIT })) { + for (const event of await eventsDB.getEvents(filters, { limit: FILTER_LIMIT })) { send(['EVENT', subId, event]); } diff --git a/src/db/events.test.ts b/src/db/events.test.ts index 297813a..5afa85c 100644 --- a/src/db/events.test.ts +++ b/src/db/events.test.ts @@ -1,49 +1,50 @@ -import event55920b75 from '~/fixtures/events/55920b75.json' assert { type: 'json' }; import { assertEquals } from '@/deps-test.ts'; - -import { countFilters, deleteFilters, getFilters, insertEvent } from './events.ts'; import { insertUser } from '@/db/users.ts'; +import event1 from '~/fixtures/events/event-1.json' assert { type: 'json' }; + +import { eventsDB as db } from './events.ts'; + Deno.test('count filters', async () => { - assertEquals(await countFilters([{ kinds: [1] }]), 0); - await insertEvent(event55920b75, { user: undefined }); - assertEquals(await countFilters([{ kinds: [1] }]), 1); + assertEquals(await db.countEvents([{ kinds: [1] }]), 0); + await db.storeEvent(event1, { user: undefined }); + assertEquals(await db.countEvents([{ kinds: [1] }]), 1); }); Deno.test('insert and filter events', async () => { - await insertEvent(event55920b75, { user: undefined }); + await db.storeEvent(event1, { user: undefined }); - assertEquals(await getFilters([{ kinds: [1] }]), [event55920b75]); - assertEquals(await getFilters([{ kinds: [3] }]), []); - assertEquals(await getFilters([{ since: 1691091000 }]), [event55920b75]); - assertEquals(await getFilters([{ until: 1691091000 }]), []); + assertEquals(await db.getEvents([{ kinds: [1] }]), [event1]); + assertEquals(await db.getEvents([{ kinds: [3] }]), []); + assertEquals(await db.getEvents([{ since: 1691091000 }]), [event1]); + assertEquals(await db.getEvents([{ until: 1691091000 }]), []); assertEquals( - await getFilters([{ '#proxy': ['https://gleasonator.com/objects/8f6fac53-4f66-4c6e-ac7d-92e5e78c3e79'] }]), - [event55920b75], + await db.getEvents([{ '#proxy': ['https://gleasonator.com/objects/8f6fac53-4f66-4c6e-ac7d-92e5e78c3e79'] }]), + [event1], ); }); Deno.test('delete events', async () => { - await insertEvent(event55920b75, { user: undefined }); - assertEquals(await getFilters([{ kinds: [1] }]), [event55920b75]); - await deleteFilters([{ kinds: [1] }]); - assertEquals(await getFilters([{ kinds: [1] }]), []); + await db.storeEvent(event1, { user: undefined }); + assertEquals(await db.getEvents([{ kinds: [1] }]), [event1]); + await db.deleteEvents([{ kinds: [1] }]); + assertEquals(await db.getEvents([{ kinds: [1] }]), []); }); Deno.test('query events with local filter', async () => { - await insertEvent(event55920b75, { user: undefined }); + await db.storeEvent(event1, { user: undefined }); - assertEquals(await getFilters([{}]), [event55920b75]); - assertEquals(await getFilters([{ local: true }]), []); - assertEquals(await getFilters([{ local: false }]), [event55920b75]); + assertEquals(await db.getEvents([{}]), [event1]); + assertEquals(await db.getEvents([{ local: true }]), []); + assertEquals(await db.getEvents([{ local: false }]), [event1]); await insertUser({ username: 'alex', - pubkey: event55920b75.pubkey, + pubkey: event1.pubkey, inserted_at: new Date(), admin: 0, }); - assertEquals(await getFilters([{ local: true }]), [event55920b75]); - assertEquals(await getFilters([{ local: false }]), []); + assertEquals(await db.getEvents([{ local: true }]), [event1]); + assertEquals(await db.getEvents([{ local: false }]), []); }); diff --git a/src/db/events.ts b/src/db/events.ts index 91368fe..fa051eb 100644 --- a/src/db/events.ts +++ b/src/db/events.ts @@ -1,12 +1,12 @@ import { db, type DittoDB } from '@/db.ts'; import { Debug, type Event, type SelectQueryBuilder } from '@/deps.ts'; +import { type DittoFilter } from '@/filter.ts'; import { isParameterizedReplaceableKind } from '@/kinds.ts'; import { jsonMetaContentSchema } from '@/schemas/nostr.ts'; +import { type DittoEvent, EventStore, type GetEventsOpts } from '@/store.ts'; import { EventData } from '@/types.ts'; import { isNostrId, isURL } from '@/utils.ts'; -import type { DittoFilter, GetFiltersOpts } from '@/filter.ts'; - const debug = Debug('ditto:db:events'); /** Function to decide whether or not to index a tag. */ @@ -29,8 +29,8 @@ const tagConditions: Record = { }; /** Insert an event (and its tags) into the database. */ -function insertEvent(event: Event, data: EventData): Promise { - debug('insertEvent', JSON.stringify(event)); +function storeEvent(event: Event, data: EventData): Promise { + debug('EVENT', JSON.stringify(event)); return db.transaction().execute(async (trx) => { /** Insert the event into the database. */ @@ -207,29 +207,20 @@ function getFilterQuery(filter: DittoFilter): EventQuery { } /** Combine filter queries into a single union query. */ -function getFiltersQuery(filters: DittoFilter[]) { +function getEventsQuery(filters: DittoFilter[]) { return filters .map((filter) => db.selectFrom(() => getFilterQuery(filter).as('events')).selectAll()) .reduce((result, query) => result.unionAll(query)); } -type AuthorStats = Omit; -type EventStats = Omit; - -interface DittoEvent extends Event { - author?: DittoEvent<0>; - author_stats?: AuthorStats; - event_stats?: EventStats; -} - /** Get events for filters from the database. */ -async function getFilters( +async function getEvents( filters: DittoFilter[], - opts: GetFiltersOpts = {}, + opts: GetEventsOpts = {}, ): Promise[]> { if (!filters.length) return Promise.resolve([]); debug('REQ', JSON.stringify(filters)); - let query = getFiltersQuery(filters); + let query = getEventsQuery(filters); if (typeof opts.limit === 'number') { query = query.limit(opts.limit); @@ -279,12 +270,12 @@ async function getFilters( } /** Delete events based on filters from the database. */ -function deleteFilters(filters: DittoFilter[]) { - if (!filters.length) return Promise.resolve([]); - debug('deleteFilters', JSON.stringify(filters)); +async function deleteEvents(filters: DittoFilter[]): Promise { + if (!filters.length) return Promise.resolve(); + debug('DELETE', JSON.stringify(filters)); - return db.transaction().execute(async (trx) => { - const query = getFiltersQuery(filters).clearSelect().select('id'); + await db.transaction().execute(async (trx) => { + const query = getEventsQuery(filters).clearSelect().select('id'); await trx.deleteFrom('events_fts') .where('id', 'in', () => query) @@ -297,10 +288,10 @@ function deleteFilters(filters: DittoFilter[]) { } /** Get number of events that would be returned by filters. */ -async function countFilters(filters: DittoFilter[]): Promise { +async function countEvents(filters: DittoFilter[]): Promise { if (!filters.length) return Promise.resolve(0); - debug('countFilters', JSON.stringify(filters)); - const query = getFiltersQuery(filters); + debug('COUNT', JSON.stringify(filters)); + const query = getEventsQuery(filters); const [{ count }] = await query .clearSelect() @@ -362,4 +353,12 @@ function buildUserSearchContent(event: Event<0>): string { return [name, nip05, about].filter(Boolean).join('\n'); } -export { countFilters, deleteFilters, type DittoEvent, getFilters, insertEvent }; +/** SQLite database storage adapter for Nostr events. */ +const eventsDB: EventStore = { + storeEvent, + getEvents, + countEvents, + deleteEvents, +}; + +export { eventsDB }; diff --git a/src/pipeline.ts b/src/pipeline.ts index d9c03ba..2ad3c97 100644 --- a/src/pipeline.ts +++ b/src/pipeline.ts @@ -1,5 +1,5 @@ import { Conf } from '@/config.ts'; -import * as eventsDB from '@/db/events.ts'; +import { eventsDB } from '@/db/events.ts'; import { memorelay } from '@/db/memorelay.ts'; import { addRelays } from '@/db/relays.ts'; import { deleteAttachedMedia } from '@/db/unattached-media.ts'; @@ -69,7 +69,7 @@ async function storeEvent(event: Event, data: EventData, opts: StoreEventOpts = const { force = false } = opts; if (force || data.user || isAdminEvent(event) || await isLocallyFollowed(event.pubkey)) { - const [deletion] = await eventsDB.getFilters( + const [deletion] = await eventsDB.getEvents( [{ kinds: [5], authors: [event.pubkey], '#e': [event.id], limit: 1 }], { limit: 1, signal: AbortSignal.timeout(Time.seconds(1)) }, ); @@ -78,7 +78,7 @@ async function storeEvent(event: Event, data: EventData, opts: StoreEventOpts = return Promise.reject(new RelayError('blocked', 'event was deleted')); } else { await Promise.all([ - eventsDB.insertEvent(event, data).catch(debug), + eventsDB.storeEvent(event, data).catch(debug), updateStats(event).catch(debug), ]); } @@ -91,13 +91,13 @@ async function storeEvent(event: Event, data: EventData, opts: StoreEventOpts = async function processDeletions(event: Event): Promise { if (event.kind === 5) { const ids = getTagSet(event.tags, 'e'); - const events = await eventsDB.getFilters([{ ids: [...ids] }]); + const events = await eventsDB.getEvents([{ ids: [...ids] }]); const deleteIds = events .filter(({ pubkey, id }) => pubkey === event.pubkey && ids.has(id)) .map((event) => event.id); - await eventsDB.deleteFilters([{ ids: deleteIds }]); + await eventsDB.deleteEvents([{ ids: deleteIds }]); } } diff --git a/src/queries.ts b/src/queries.ts index 0f08877..3049144 100644 --- a/src/queries.ts +++ b/src/queries.ts @@ -1,8 +1,9 @@ -import * as eventsDB from '@/db/events.ts'; +import { eventsDB } from '@/db/events.ts'; +import { memorelay } from '@/db/memorelay.ts'; import { type Event, findReplyTag } from '@/deps.ts'; import { type AuthorMicrofilter, type DittoFilter, type IdMicrofilter, type Relation } from '@/filter.ts'; import { reqmeister } from '@/reqmeister.ts'; -import { memorelay } from '@/db/memorelay.ts'; +import { type DittoEvent } from '@/store.ts'; interface GetEventOpts { /** Signal to abort the request. */ @@ -21,7 +22,7 @@ const getEvent = async ( const { kind, relations, signal = AbortSignal.timeout(1000) } = opts; const microfilter: IdMicrofilter = { ids: [id] }; - const [memoryEvent] = await memorelay.getFilters([microfilter], opts) as eventsDB.DittoEvent[]; + const [memoryEvent] = await memorelay.getFilters([microfilter], opts) as DittoEvent[]; if (memoryEvent && !relations) { return memoryEvent; @@ -32,7 +33,7 @@ const getEvent = async ( filter.kinds = [kind]; } - const dbEvent = await eventsDB.getFilters([filter], { limit: 1, signal }) + const dbEvent = await eventsDB.getEvents([filter], { limit: 1, signal }) .then(([event]) => event); // TODO: make this DRY-er. @@ -65,7 +66,7 @@ const getAuthor = async (pubkey: string, opts: GetEventOpts<0> = {}): Promise event); @@ -78,7 +79,7 @@ const getAuthor = async (pubkey: string, opts: GetEventOpts<0> = {}): Promise | undefined> => { - const [event] = await eventsDB.getFilters([{ authors: [pubkey], kinds: [3], limit: 1 }], { limit: 1, signal }); + const [event] = await eventsDB.getEvents([{ authors: [pubkey], kinds: [3], limit: 1 }], { limit: 1, signal }); return event; }; @@ -117,7 +118,7 @@ async function getAncestors(event: Event<1>, result = [] as Event<1>[]): Promise } function getDescendants(eventId: string, signal = AbortSignal.timeout(2000)): Promise[]> { - return eventsDB.getFilters( + return eventsDB.getEvents( [{ kinds: [1], '#e': [eventId], relations: ['author', 'event_stats', 'author_stats'] }], { limit: 200, signal }, ); @@ -125,7 +126,7 @@ function getDescendants(eventId: string, signal = AbortSignal.timeout(2000)): Pr /** Returns whether the pubkey is followed by a local user. */ async function isLocallyFollowed(pubkey: string): Promise { - const [event] = await eventsDB.getFilters([{ kinds: [3], '#p': [pubkey], local: true, limit: 1 }], { limit: 1 }); + const [event] = await eventsDB.getEvents([{ kinds: [3], '#p': [pubkey], local: true, limit: 1 }], { limit: 1 }); return Boolean(event); } diff --git a/src/stats.ts b/src/stats.ts index b462021..e09791f 100644 --- a/src/stats.ts +++ b/src/stats.ts @@ -1,5 +1,5 @@ import { type AuthorStatsRow, db, type DittoDB, type EventStatsRow } from '@/db.ts'; -import * as eventsDB from '@/db/events.ts'; +import { eventsDB } from '@/db/events.ts'; import { Debug, type Event, findReplyTag, type InsertQueryBuilder } from '@/deps.ts'; type AuthorStat = keyof Omit; @@ -125,7 +125,7 @@ function eventStatsQuery(diffs: EventStatDiff[]) { /** Get the last version of the event, if any. */ async function maybeGetPrev(event: Event): Promise> { - const [prev] = await eventsDB.getFilters([ + const [prev] = await eventsDB.getEvents([ { kinds: [event.kind], authors: [event.pubkey], limit: 1 }, ]); diff --git a/src/store.ts b/src/store.ts new file mode 100644 index 0000000..a41f849 --- /dev/null +++ b/src/store.ts @@ -0,0 +1,38 @@ +import { type DittoDB } from '@/db.ts'; +import { type Event } from '@/deps.ts'; +import { type DittoFilter } from '@/filter.ts'; +import { type EventData } from '@/types.ts'; + +/** Additional options to apply to the whole subscription. */ +interface GetEventsOpts { + /** Signal to abort the request. */ + signal?: AbortSignal; + /** Event limit for the whole subscription. */ + limit?: number; + /** Relays to use, if applicable. */ + relays?: WebSocket['url'][]; +} + +type AuthorStats = Omit; +type EventStats = Omit; + +/** Internal Event representation used by Ditto, including extra keys. */ +interface DittoEvent extends Event { + author?: DittoEvent<0>; + author_stats?: AuthorStats; + event_stats?: EventStats; +} + +/** Storage interface for Nostr events. */ +interface EventStore { + /** Add an event to the store. */ + storeEvent(event: Event, data?: EventData): Promise; + /** Get events from filters. */ + getEvents(filters: DittoFilter[], opts?: GetEventsOpts): Promise[]>; + /** Get the number of events from filters. */ + countEvents(filters: DittoFilter[]): Promise; + /** Delete events from filters. */ + deleteEvents(filters: DittoFilter[]): Promise; +} + +export type { DittoEvent, EventStore, GetEventsOpts }; diff --git a/src/views.ts b/src/views.ts index a111456..95a998e 100644 --- a/src/views.ts +++ b/src/views.ts @@ -1,5 +1,5 @@ import { AppContext } from '@/app.ts'; -import * as eventsDB from '@/db/events.ts'; +import { eventsDB } from '@/db/events.ts'; import { type Filter } from '@/deps.ts'; import { getAuthor } from '@/queries.ts'; import { renderAccount } from '@/views/mastodon/accounts.ts'; @@ -7,7 +7,7 @@ import { paginated } from '@/utils/web.ts'; /** Render account objects for the author of each event. */ async function renderEventAccounts(c: AppContext, filters: Filter[]) { - const events = await eventsDB.getFilters(filters); + const events = await eventsDB.getEvents(filters); const pubkeys = new Set(events.map(({ pubkey }) => pubkey)); if (!pubkeys.size) { diff --git a/src/views/mastodon/statuses.ts b/src/views/mastodon/statuses.ts index 17b1677..2363ef9 100644 --- a/src/views/mastodon/statuses.ts +++ b/src/views/mastodon/statuses.ts @@ -1,7 +1,7 @@ import { isCWTag } from 'https://gitlab.com/soapbox-pub/mostr/-/raw/c67064aee5ade5e01597c6d23e22e53c628ef0e2/src/nostr/tags.ts'; import { Conf } from '@/config.ts'; -import * as eventsDB from '@/db/events.ts'; +import { eventsDB } from '@/db/events.ts'; import { findReplyTag, nip19 } from '@/deps.ts'; import { getMediaLinks, parseNoteContent } from '@/note.ts'; import { getAuthor } from '@/queries.ts'; @@ -33,12 +33,8 @@ async function renderStatus(event: eventsDB.DittoEvent<1>, viewerPubkey?: string .all([ Promise.all(mentionedPubkeys.map(toMention)), firstUrl ? unfurlCardCached(firstUrl) : null, - viewerPubkey - ? eventsDB.getFilters([{ kinds: [6], '#e': [event.id], authors: [viewerPubkey] }], { limit: 1 }) - : [], - viewerPubkey - ? eventsDB.getFilters([{ kinds: [7], '#e': [event.id], authors: [viewerPubkey] }], { limit: 1 }) - : [], + viewerPubkey ? eventsDB.getEvents([{ kinds: [6], '#e': [event.id], authors: [viewerPubkey] }], { limit: 1 }) : [], + viewerPubkey ? eventsDB.getEvents([{ kinds: [7], '#e': [event.id], authors: [viewerPubkey] }], { limit: 1 }) : [], ]); const content = buildInlineRecipients(mentions) + html; From ccfdbfeb8db82ae9ed533d7dda44d306dcc2bfb9 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Fri, 29 Dec 2023 13:22:51 -0600 Subject: [PATCH 2/4] Rework client as EventStore --- src/client.ts | 22 ++++++++++++++++++---- src/db/events.ts | 15 +++++++-------- src/pipeline.ts | 6 +++--- src/pool.ts | 12 ++---------- src/reqmeister.ts | 4 ++-- src/store.ts | 12 ++++++++++-- 6 files changed, 42 insertions(+), 29 deletions(-) diff --git a/src/client.ts b/src/client.ts index 85e430a..fbcfaa5 100644 --- a/src/client.ts +++ b/src/client.ts @@ -1,13 +1,12 @@ import { Debug, type Event, type Filter, matchFilters } from '@/deps.ts'; import * as pipeline from '@/pipeline.ts'; import { activeRelays, pool } from '@/pool.ts'; - -import type { GetFiltersOpts } from '@/filter.ts'; +import { type EventStore, type GetEventsOpts, type StoreEventOpts } from '@/store.ts'; const debug = Debug('ditto:client'); /** Get events from a NIP-01 filter. */ -function getFilters(filters: Filter[], opts: GetFiltersOpts = {}): Promise[]> { +function getEvents(filters: Filter[], opts: GetEventsOpts = {}): Promise[]> { if (opts.signal?.aborted) return Promise.resolve([]); if (!filters.length) return Promise.resolve([]); debug('REQ', JSON.stringify(filters)); @@ -50,4 +49,19 @@ function getFilters(filters: Filter[], opts: GetFiltersOpts }); } -export { getFilters }; +/** Publish an event to the given relays, or the entire pool. */ +function storeEvent(event: Event, opts: StoreEventOpts = {}): Promise { + const { relays = activeRelays } = opts; + debug('EVENT', event); + pool.publish(event, relays); + return Promise.resolve(); +} + +const client: EventStore = { + getEvents, + storeEvent, + countEvents: () => Promise.reject(new Error('COUNT not implemented')), + deleteEvents: () => Promise.reject(new Error('Cannot delete events from relays. Create a kind 5 event instead.')), +}; + +export { client }; diff --git a/src/db/events.ts b/src/db/events.ts index fa051eb..d977368 100644 --- a/src/db/events.ts +++ b/src/db/events.ts @@ -3,8 +3,7 @@ import { Debug, type Event, type SelectQueryBuilder } from '@/deps.ts'; import { type DittoFilter } from '@/filter.ts'; import { isParameterizedReplaceableKind } from '@/kinds.ts'; import { jsonMetaContentSchema } from '@/schemas/nostr.ts'; -import { type DittoEvent, EventStore, type GetEventsOpts } from '@/store.ts'; -import { EventData } from '@/types.ts'; +import { type DittoEvent, EventStore, type GetEventsOpts, type StoreEventOpts } from '@/store.ts'; import { isNostrId, isURL } from '@/utils.ts'; const debug = Debug('ditto:db:events'); @@ -12,7 +11,7 @@ const debug = Debug('ditto:db:events'); /** Function to decide whether or not to index a tag. */ type TagCondition = ({ event, count, value }: { event: Event; - data: EventData; + opts: StoreEventOpts; count: number; value: string; }) => boolean; @@ -21,7 +20,7 @@ type TagCondition = ({ event, count, value }: { const tagConditions: Record = { 'd': ({ event, count }) => count === 0 && isParameterizedReplaceableKind(event.kind), 'e': ({ count, value }) => count < 15 && isNostrId(value), - 'media': ({ count, value, data }) => (data.user || count < 4) && isURL(value), + 'media': ({ count, value, opts }) => (opts.data?.user || count < 4) && isURL(value), 'p': ({ event, count, value }) => (count < 15 || event.kind === 3) && isNostrId(value), 'proxy': ({ count, value }) => count === 0 && isURL(value), 'q': ({ event, count, value }) => count === 0 && event.kind === 1 && isNostrId(value), @@ -29,7 +28,7 @@ const tagConditions: Record = { }; /** Insert an event (and its tags) into the database. */ -function storeEvent(event: Event, data: EventData): Promise { +function storeEvent(event: Event, opts: StoreEventOpts = {}): Promise { debug('EVENT', JSON.stringify(event)); return db.transaction().execute(async (trx) => { @@ -51,7 +50,7 @@ function storeEvent(event: Event, data: EventData): Promise { /** Index event tags depending on the conditions defined above. */ async function indexTags() { - const tags = filterIndexableTags(event, data); + const tags = filterIndexableTags(event, opts); const rows = tags.map(([tag, value]) => ({ event_id: event.id, tag, value })); if (!tags.length) return; @@ -302,7 +301,7 @@ async function countEvents(filters: DittoFilter[]): Promise } /** Return only the tags that should be indexed. */ -function filterIndexableTags(event: Event, data: EventData): string[][] { +function filterIndexableTags(event: Event, opts: StoreEventOpts): string[][] { const tagCounts: Record = {}; function getCount(name: string) { @@ -316,7 +315,7 @@ function filterIndexableTags(event: Event, data: EventData): string[][] { function checkCondition(name: string, value: string, condition: TagCondition) { return condition({ event, - data, + opts, count: getCount(name), value, }); diff --git a/src/pipeline.ts b/src/pipeline.ts index 2ad3c97..76984fd 100644 --- a/src/pipeline.ts +++ b/src/pipeline.ts @@ -1,3 +1,4 @@ +import { client } from '@/client.ts'; import { Conf } from '@/config.ts'; import { eventsDB } from '@/db/events.ts'; import { memorelay } from '@/db/memorelay.ts'; @@ -6,7 +7,6 @@ import { deleteAttachedMedia } from '@/db/unattached-media.ts'; import { findUser } from '@/db/users.ts'; import { Debug, type Event } from '@/deps.ts'; import { isEphemeralKind } from '@/kinds.ts'; -import { publish } from '@/pool.ts'; import { isLocallyFollowed } from '@/queries.ts'; import { reqmeister } from '@/reqmeister.ts'; import { updateStats } from '@/stats.ts'; @@ -78,7 +78,7 @@ async function storeEvent(event: Event, data: EventData, opts: StoreEventOpts = return Promise.reject(new RelayError('blocked', 'event was deleted')); } else { await Promise.all([ - eventsDB.storeEvent(event, data).catch(debug), + eventsDB.storeEvent(event, { data }).catch(debug), updateStats(event).catch(debug), ]); } @@ -176,7 +176,7 @@ function broadcast(event: Event, data: EventData) { if (!data.user || !isFresh(event)) return; if (event.kind === 5) { - publish(event); + client.storeEvent(event); } } diff --git a/src/pool.ts b/src/pool.ts index b92f4eb..e3b3082 100644 --- a/src/pool.ts +++ b/src/pool.ts @@ -1,7 +1,5 @@ import { getActiveRelays } from '@/db/relays.ts'; -import { Debug, type Event, RelayPoolWorker } from '@/deps.ts'; - -const debug = Debug('ditto:pool'); +import { RelayPoolWorker } from '@/deps.ts'; const activeRelays = await getActiveRelays(); @@ -17,10 +15,4 @@ const pool = new RelayPoolWorker(worker, activeRelays, { logErrorsAndNotices: false, }); -/** Publish an event to the given relays, or the entire pool. */ -function publish(event: Event, relays: string[] = activeRelays) { - debug('publish', event); - return pool.publish(event, relays); -} - -export { activeRelays, pool, publish }; +export { activeRelays, pool }; diff --git a/src/reqmeister.ts b/src/reqmeister.ts index cff8cb9..19cce8d 100644 --- a/src/reqmeister.ts +++ b/src/reqmeister.ts @@ -1,4 +1,4 @@ -import * as client from '@/client.ts'; +import { client } from '@/client.ts'; import { Debug, type Event, EventEmitter, type Filter } from '@/deps.ts'; import { AuthorMicrofilter, eventToMicroFilter, getFilterId, IdMicrofilter, type MicroFilter } from '@/filter.ts'; import { Time } from '@/utils/time.ts'; @@ -64,7 +64,7 @@ class Reqmeister extends EventEmitter<{ [filterId: string]: (event: Event) => an if (filters.length) { debug('REQ', JSON.stringify(filters)); - const events = await client.getFilters(filters, { signal: AbortSignal.timeout(timeout) }); + const events = await client.getEvents(filters, { signal: AbortSignal.timeout(timeout) }); for (const event of events) { this.encounter(event); diff --git a/src/store.ts b/src/store.ts index a41f849..5567406 100644 --- a/src/store.ts +++ b/src/store.ts @@ -13,6 +13,14 @@ interface GetEventsOpts { relays?: WebSocket['url'][]; } +/** Options when storing an event. */ +interface StoreEventOpts { + /** Event data to store. */ + data?: EventData; + /** Relays to use, if applicable. */ + relays?: WebSocket['url'][]; +} + type AuthorStats = Omit; type EventStats = Omit; @@ -26,7 +34,7 @@ interface DittoEvent extends Event { /** Storage interface for Nostr events. */ interface EventStore { /** Add an event to the store. */ - storeEvent(event: Event, data?: EventData): Promise; + storeEvent(event: Event, opts?: StoreEventOpts): Promise; /** Get events from filters. */ getEvents(filters: DittoFilter[], opts?: GetEventsOpts): Promise[]>; /** Get the number of events from filters. */ @@ -35,4 +43,4 @@ interface EventStore { deleteEvents(filters: DittoFilter[]): Promise; } -export type { DittoEvent, EventStore, GetEventsOpts }; +export type { DittoEvent, EventStore, GetEventsOpts, StoreEventOpts }; From 56373c4ce326473fb9560100f606ea9f4df41a9f Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Fri, 29 Dec 2023 13:35:57 -0600 Subject: [PATCH 3/4] Refactor memorelay as an EventStore --- src/db/events.ts | 2 +- src/db/memorelay.test.ts | 10 ++++----- src/db/memorelay.ts | 44 ++++++++++++++++++++-------------------- src/filter.ts | 11 ---------- src/pipeline.ts | 10 ++++----- src/queries.ts | 8 ++++---- 6 files changed, 36 insertions(+), 49 deletions(-) diff --git a/src/db/events.ts b/src/db/events.ts index d977368..96cf845 100644 --- a/src/db/events.ts +++ b/src/db/events.ts @@ -354,8 +354,8 @@ function buildUserSearchContent(event: Event<0>): string { /** SQLite database storage adapter for Nostr events. */ const eventsDB: EventStore = { - storeEvent, getEvents, + storeEvent, countEvents, deleteEvents, }; diff --git a/src/db/memorelay.test.ts b/src/db/memorelay.test.ts index b0125cf..27c545d 100644 --- a/src/db/memorelay.test.ts +++ b/src/db/memorelay.test.ts @@ -5,14 +5,12 @@ import event1 from '~/fixtures/events/event-1.json' assert { type: 'json' }; import { memorelay } from './memorelay.ts'; Deno.test('memorelay', async () => { - assertEquals(memorelay.hasEvent(event1), false); - assertEquals(memorelay.hasEventById(event1.id), false); + assertEquals(await memorelay.countEvents([{ ids: [event1.id] }]), 0); - memorelay.insertEvent(event1); + await memorelay.storeEvent(event1); - assertEquals(memorelay.hasEvent(event1), true); - assertEquals(memorelay.hasEventById(event1.id), true); + assertEquals(await memorelay.countEvents([{ ids: [event1.id] }]), 1); - const result = await memorelay.getFilters([{ ids: [event1.id] }]); + const result = await memorelay.getEvents([{ ids: [event1.id] }]); assertEquals(result[0], event1); }); diff --git a/src/db/memorelay.ts b/src/db/memorelay.ts index 4f6022e..57714a0 100644 --- a/src/db/memorelay.ts +++ b/src/db/memorelay.ts @@ -1,5 +1,6 @@ import { Debug, type Event, type Filter, LRUCache } from '@/deps.ts'; -import { getFilterId, type GetFiltersOpts, getMicroFilters, isMicrofilter } from '@/filter.ts'; +import { getFilterId, getMicroFilters, isMicrofilter } from '@/filter.ts'; +import { type EventStore, type GetEventsOpts } from '@/store.ts'; const debug = Debug('ditto:memorelay'); @@ -10,7 +11,7 @@ const events = new LRUCache({ }); /** Get events from memory. */ -function getFilters(filters: Filter[], opts: GetFiltersOpts = {}): Promise[]> { +function getEvents(filters: Filter[], opts: GetEventsOpts = {}): Promise[]> { if (opts.signal?.aborted) return Promise.resolve([]); if (!filters.length) return Promise.resolve([]); debug('REQ', JSON.stringify(filters)); @@ -30,7 +31,7 @@ function getFilters(filters: Filter[], opts: GetFiltersOpts } /** Insert an event into memory. */ -function insertEvent(event: Event): void { +function storeEvent(event: Event): Promise { for (const microfilter of getMicroFilters(event)) { const filterId = getFilterId(microfilter); const existing = events.get(filterId); @@ -38,32 +39,31 @@ function insertEvent(event: Event): void { events.set(filterId, event); } } + return Promise.resolve(); } -/** Check if an event is in memory. */ -function hasEvent(event: Event): boolean { - for (const microfilter of getMicroFilters(event)) { - const filterId = getFilterId(microfilter); - const existing = events.get(filterId); - if (existing) { - return true; +/** Count events in memory for the filters. */ +async function countEvents(filters: Filter[]): Promise { + const events = await getEvents(filters); + return events.length; +} + +/** Delete events from memory. */ +function deleteEvents(filters: Filter[]): Promise { + for (const filter of filters) { + if (isMicrofilter(filter)) { + events.delete(getFilterId(filter)); } } - return false; -} - -/** Check if an event is in memory by ID. */ -function hasEventById(eventId: string): boolean { - const filterId = getFilterId({ ids: [eventId] }); - return events.has(filterId); + return Promise.resolve(); } /** In-memory data store for events using microfilters. */ -const memorelay = { - getFilters, - insertEvent, - hasEvent, - hasEventById, +const memorelay: EventStore = { + getEvents, + storeEvent, + countEvents, + deleteEvents, }; export { memorelay }; diff --git a/src/filter.ts b/src/filter.ts index 926e360..430a8d3 100644 --- a/src/filter.ts +++ b/src/filter.ts @@ -21,16 +21,6 @@ type AuthorMicrofilter = { kinds: [0]; authors: [Event['pubkey']] }; /** Filter to get one specific event. */ type MicroFilter = IdMicrofilter | AuthorMicrofilter; -/** Additional options to apply to the whole subscription. */ -interface GetFiltersOpts { - /** Signal to abort the request. */ - signal?: AbortSignal; - /** Event limit for the whole subscription. */ - limit?: number; - /** Relays to use, if applicable. */ - relays?: WebSocket['url'][]; -} - function matchDittoFilter(filter: DittoFilter, event: Event, data: EventData): boolean { if (filter.local && !(data.user || event.pubkey === Conf.pubkey)) { return false; @@ -97,7 +87,6 @@ export { type DittoFilter, eventToMicroFilter, getFilterId, - type GetFiltersOpts, getMicroFilters, type IdMicrofilter, isMicrofilter, diff --git a/src/pipeline.ts b/src/pipeline.ts index 76984fd..f4e0abe 100644 --- a/src/pipeline.ts +++ b/src/pipeline.ts @@ -26,7 +26,7 @@ const debug = Debug('ditto:pipeline'); async function handleEvent(event: Event): Promise { if (!(await verifySignatureWorker(event))) return; const wanted = reqmeister.isWanted(event); - if (encounterEvent(event)) return; + if (await encounterEvent(event)) return; debug(`Event<${event.kind}> ${event.id}`); const data = await getEventData(event); @@ -43,9 +43,9 @@ async function handleEvent(event: Event): Promise { } /** Encounter the event, and return whether it has already been encountered. */ -function encounterEvent(event: Event): boolean { - const preexisting = memorelay.hasEvent(event); - memorelay.insertEvent(event); +async function encounterEvent(event: Event): Promise { + const preexisting = (await memorelay.countEvents([{ ids: [event.id] }])) > 0; + memorelay.storeEvent(event); reqmeister.encounter(event); return preexisting; } @@ -142,7 +142,7 @@ function fetchRelatedEvents(event: Event, data: EventData) { reqmeister.req({ kinds: [0], authors: [event.pubkey] }).catch(() => {}); } for (const [name, id, relay] of event.tags) { - if (name === 'e' && !memorelay.hasEventById(id)) { + if (name === 'e' && !memorelay.countEvents([{ ids: [id] }])) { reqmeister.req({ ids: [id] }, { relays: [relay] }).catch(() => {}); } } diff --git a/src/queries.ts b/src/queries.ts index 3049144..92500d3 100644 --- a/src/queries.ts +++ b/src/queries.ts @@ -22,7 +22,7 @@ const getEvent = async ( const { kind, relations, signal = AbortSignal.timeout(1000) } = opts; const microfilter: IdMicrofilter = { ids: [id] }; - const [memoryEvent] = await memorelay.getFilters([microfilter], opts) as DittoEvent[]; + const [memoryEvent] = await memorelay.getEvents([microfilter], opts) as DittoEvent[]; if (memoryEvent && !relations) { return memoryEvent; @@ -39,14 +39,14 @@ const getEvent = async ( // TODO: make this DRY-er. if (dbEvent && !dbEvent.author) { - const [author] = await memorelay.getFilters([{ kinds: [0], authors: [dbEvent.pubkey] }], opts); + const [author] = await memorelay.getEvents([{ kinds: [0], authors: [dbEvent.pubkey] }], opts); dbEvent.author = author; } if (dbEvent) return dbEvent; if (memoryEvent && !memoryEvent.author) { - const [author] = await memorelay.getFilters([{ kinds: [0], authors: [memoryEvent.pubkey] }], opts); + const [author] = await memorelay.getEvents([{ kinds: [0], authors: [memoryEvent.pubkey] }], opts); memoryEvent.author = author; } @@ -60,7 +60,7 @@ const getAuthor = async (pubkey: string, opts: GetEventOpts<0> = {}): Promise Date: Fri, 29 Dec 2023 13:53:09 -0600 Subject: [PATCH 4/4] events.test: fix type error --- src/db/events.test.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/db/events.test.ts b/src/db/events.test.ts index 5afa85c..ab689d5 100644 --- a/src/db/events.test.ts +++ b/src/db/events.test.ts @@ -7,12 +7,12 @@ import { eventsDB as db } from './events.ts'; Deno.test('count filters', async () => { assertEquals(await db.countEvents([{ kinds: [1] }]), 0); - await db.storeEvent(event1, { user: undefined }); + await db.storeEvent(event1); assertEquals(await db.countEvents([{ kinds: [1] }]), 1); }); Deno.test('insert and filter events', async () => { - await db.storeEvent(event1, { user: undefined }); + await db.storeEvent(event1); assertEquals(await db.getEvents([{ kinds: [1] }]), [event1]); assertEquals(await db.getEvents([{ kinds: [3] }]), []); @@ -25,14 +25,14 @@ Deno.test('insert and filter events', async () => { }); Deno.test('delete events', async () => { - await db.storeEvent(event1, { user: undefined }); + await db.storeEvent(event1); assertEquals(await db.getEvents([{ kinds: [1] }]), [event1]); await db.deleteEvents([{ kinds: [1] }]); assertEquals(await db.getEvents([{ kinds: [1] }]), []); }); Deno.test('query events with local filter', async () => { - await db.storeEvent(event1, { user: undefined }); + await db.storeEvent(event1); assertEquals(await db.getEvents([{}]), [event1]); assertEquals(await db.getEvents([{ local: true }]), []);