From 24c405b9b9d2c5e631cd0a493386ad734dbcb8a0 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Wed, 3 Jan 2024 12:54:07 -0600 Subject: [PATCH 01/23] Add Memorelay storages module --- src/storages/memorelay.ts | 63 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 63 insertions(+) create mode 100644 src/storages/memorelay.ts diff --git a/src/storages/memorelay.ts b/src/storages/memorelay.ts new file mode 100644 index 0000000..0ab7193 --- /dev/null +++ b/src/storages/memorelay.ts @@ -0,0 +1,63 @@ +import { Debug, type Event, type Filter, LRUCache } from '@/deps.ts'; +import { getFilterId, getMicroFilters, isMicrofilter } from '@/filter.ts'; +import { type EventStore, type GetEventsOpts } from '@/store.ts'; + +/** In-memory data store for events using microfilters. */ +class Memorelay implements EventStore { + #debug = Debug('ditto:memorelay'); + #events: LRUCache; + + constructor(...args: ConstructorParameters>) { + this.#events = new LRUCache(...args); + } + + /** Get events from memory. */ + getEvents(filters: Filter[], opts: GetEventsOpts = {}): Promise[]> { + if (opts.signal?.aborted) return Promise.resolve([]); + if (!filters.length) return Promise.resolve([]); + this.#debug('REQ', JSON.stringify(filters)); + + const results: Event[] = []; + + for (const filter of filters) { + if (isMicrofilter(filter)) { + const event = this.#events.get(getFilterId(filter)); + if (event) { + results.push(event as Event); + } + } + } + + return Promise.resolve(results); + } + + /** Insert an event into memory. */ + storeEvent(event: Event): Promise { + for (const microfilter of getMicroFilters(event)) { + const filterId = getFilterId(microfilter); + const existing = this.#events.get(filterId); + if (!existing || event.created_at > existing.created_at) { + this.#events.set(filterId, event); + } + } + return Promise.resolve(); + } + + /** Count events in memory for the filters. */ + async countEvents(filters: Filter[]): Promise { + const events = await this.getEvents(filters); + return events.length; + } + + /** Delete events from memory. */ + deleteEvents(filters: Filter[]): Promise { + for (const filter of filters) { + if (isMicrofilter(filter)) { + this.#events.delete(getFilterId(filter)); + } + } + return Promise.resolve(); + } +} + +export { Memorelay }; \ No newline at end of file From a1dad3a0c58fbdf875b2faf9d5fb9a79d35192dd Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Wed, 3 Jan 2024 12:54:38 -0600 Subject: [PATCH 02/23] Memorelay: #events -> #cache --- src/storages/memorelay.ts | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/storages/memorelay.ts b/src/storages/memorelay.ts index 0ab7193..df24108 100644 --- a/src/storages/memorelay.ts +++ b/src/storages/memorelay.ts @@ -5,10 +5,10 @@ import { type EventStore, type GetEventsOpts } from '@/store.ts'; /** In-memory data store for events using microfilters. */ class Memorelay implements EventStore { #debug = Debug('ditto:memorelay'); - #events: LRUCache; + #cache: LRUCache; constructor(...args: ConstructorParameters>) { - this.#events = new LRUCache(...args); + this.#cache = new LRUCache(...args); } /** Get events from memory. */ @@ -21,7 +21,7 @@ class Memorelay implements EventStore { for (const filter of filters) { if (isMicrofilter(filter)) { - const event = this.#events.get(getFilterId(filter)); + const event = this.#cache.get(getFilterId(filter)); if (event) { results.push(event as Event); } @@ -35,9 +35,9 @@ class Memorelay implements EventStore { storeEvent(event: Event): Promise { for (const microfilter of getMicroFilters(event)) { const filterId = getFilterId(microfilter); - const existing = this.#events.get(filterId); + const existing = this.#cache.get(filterId); if (!existing || event.created_at > existing.created_at) { - this.#events.set(filterId, event); + this.#cache.set(filterId, event); } } return Promise.resolve(); @@ -53,11 +53,11 @@ class Memorelay implements EventStore { deleteEvents(filters: Filter[]): Promise { for (const filter of filters) { if (isMicrofilter(filter)) { - this.#events.delete(getFilterId(filter)); + this.#cache.delete(getFilterId(filter)); } } return Promise.resolve(); } } -export { Memorelay }; \ No newline at end of file +export { Memorelay }; From f667ba3c69d08efdf2a1247741b87baaaa37b863 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Wed, 3 Jan 2024 13:54:29 -0600 Subject: [PATCH 03/23] Memorelay: reimplement by just looping events with matchFilters --- src/db/memorelay.ts | 65 ++------------------------ src/deps.ts | 1 + src/{db => storages}/memorelay.test.ts | 8 +++- src/storages/memorelay.ts | 50 +++++++++++++------- 4 files changed, 44 insertions(+), 80 deletions(-) rename src/{db => storages}/memorelay.test.ts (71%) diff --git a/src/db/memorelay.ts b/src/db/memorelay.ts index 57714a0..3550346 100644 --- a/src/db/memorelay.ts +++ b/src/db/memorelay.ts @@ -1,69 +1,10 @@ -import { Debug, type Event, type Filter, LRUCache } from '@/deps.ts'; -import { getFilterId, getMicroFilters, isMicrofilter } from '@/filter.ts'; -import { type EventStore, type GetEventsOpts } from '@/store.ts'; +import { Memorelay } from '@/storages/memorelay.ts'; -const debug = Debug('ditto:memorelay'); - -const events = new LRUCache({ +/** In-memory data store for events using microfilters. */ +const memorelay = new Memorelay({ max: 3000, maxEntrySize: 5000, sizeCalculation: (event) => JSON.stringify(event).length, }); -/** Get events from memory. */ -function getEvents(filters: Filter[], opts: GetEventsOpts = {}): Promise[]> { - if (opts.signal?.aborted) return Promise.resolve([]); - if (!filters.length) return Promise.resolve([]); - debug('REQ', JSON.stringify(filters)); - - const results: Event[] = []; - - for (const filter of filters) { - if (isMicrofilter(filter)) { - const event = events.get(getFilterId(filter)); - if (event) { - results.push(event as Event); - } - } - } - - return Promise.resolve(results); -} - -/** Insert an event into memory. */ -function storeEvent(event: Event): Promise { - for (const microfilter of getMicroFilters(event)) { - const filterId = getFilterId(microfilter); - const existing = events.get(filterId); - if (!existing || event.created_at > existing.created_at) { - events.set(filterId, event); - } - } - return Promise.resolve(); -} - -/** Count events in memory for the filters. */ -async function countEvents(filters: Filter[]): Promise { - const events = await getEvents(filters); - return events.length; -} - -/** Delete events from memory. */ -function deleteEvents(filters: Filter[]): Promise { - for (const filter of filters) { - if (isMicrofilter(filter)) { - events.delete(getFilterId(filter)); - } - } - return Promise.resolve(); -} - -/** In-memory data store for events using microfilters. */ -const memorelay: EventStore = { - getEvents, - storeEvent, - countEvents, - deleteEvents, -}; - export { memorelay }; diff --git a/src/deps.ts b/src/deps.ts index 3e763a7..164af3a 100644 --- a/src/deps.ts +++ b/src/deps.ts @@ -18,6 +18,7 @@ export { getEventHash, getPublicKey, getSignature, + matchFilter, matchFilters, nip04, nip05, diff --git a/src/db/memorelay.test.ts b/src/storages/memorelay.test.ts similarity index 71% rename from src/db/memorelay.test.ts rename to src/storages/memorelay.test.ts index 27c545d..fbf3ba3 100644 --- a/src/db/memorelay.test.ts +++ b/src/storages/memorelay.test.ts @@ -2,7 +2,13 @@ import { assertEquals } from '@/deps-test.ts'; import event1 from '~/fixtures/events/event-1.json' assert { type: 'json' }; -import { memorelay } from './memorelay.ts'; +import { Memorelay } from './memorelay.ts'; + +const memorelay = new Memorelay({ + max: 3000, + maxEntrySize: 5000, + sizeCalculation: (event) => JSON.stringify(event).length, +}); Deno.test('memorelay', async () => { assertEquals(await memorelay.countEvents([{ ids: [event1.id] }]), 0); diff --git a/src/storages/memorelay.ts b/src/storages/memorelay.ts index df24108..76328dc 100644 --- a/src/storages/memorelay.ts +++ b/src/storages/memorelay.ts @@ -1,8 +1,7 @@ -import { Debug, type Event, type Filter, LRUCache } from '@/deps.ts'; -import { getFilterId, getMicroFilters, isMicrofilter } from '@/filter.ts'; +import { Debug, type Event, type Filter, LRUCache, matchFilter, matchFilters } from '@/deps.ts'; import { type EventStore, type GetEventsOpts } from '@/store.ts'; -/** In-memory data store for events using microfilters. */ +/** In-memory data store for events. */ class Memorelay implements EventStore { #debug = Debug('ditto:memorelay'); #cache: LRUCache; @@ -11,6 +10,15 @@ class Memorelay implements EventStore { this.#cache = new LRUCache(...args); } + /** Iterate stored events. */ + *#events(): Generator { + for (const event of this.#cache.values()) { + if (event && !(event instanceof Promise)) { + yield event; + } + } + } + /** Get events from memory. */ getEvents(filters: Filter[], opts: GetEventsOpts = {}): Promise[]> { if (opts.signal?.aborted) return Promise.resolve([]); @@ -18,13 +26,27 @@ class Memorelay implements EventStore { this.#debug('REQ', JSON.stringify(filters)); const results: Event[] = []; + const usages: number[] = []; - for (const filter of filters) { - if (isMicrofilter(filter)) { - const event = this.#cache.get(getFilterId(filter)); - if (event) { + for (const event of this.#events()) { + let index = 0; + + for (const filter of filters) { + const limit = filter.limit ?? Infinity; + const usage = usages[index] ?? 0; + + if (usage >= limit) { + continue; + } else if (matchFilter(filter, event)) { results.push(event as Event); + usages[index] = usage + 1; } + + index++; + } + + if (filters.every((filter, index) => usages[index] >= (filter.limit ?? Infinity))) { + break; } } @@ -33,13 +55,7 @@ class Memorelay implements EventStore { /** Insert an event into memory. */ storeEvent(event: Event): Promise { - for (const microfilter of getMicroFilters(event)) { - const filterId = getFilterId(microfilter); - const existing = this.#cache.get(filterId); - if (!existing || event.created_at > existing.created_at) { - this.#cache.set(filterId, event); - } - } + this.#cache.set(event.id, event); return Promise.resolve(); } @@ -51,9 +67,9 @@ class Memorelay implements EventStore { /** Delete events from memory. */ deleteEvents(filters: Filter[]): Promise { - for (const filter of filters) { - if (isMicrofilter(filter)) { - this.#cache.delete(getFilterId(filter)); + for (const event of this.#events()) { + if (matchFilters(filters, event)) { + this.#cache.delete(event.id); } } return Promise.resolve(); From cfe44876af60d8fcc41339332402fd66582f88be Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Wed, 3 Jan 2024 14:06:13 -0600 Subject: [PATCH 04/23] Memorelay: add supportedNips property --- src/storages/memorelay.ts | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/storages/memorelay.ts b/src/storages/memorelay.ts index 76328dc..71c68c6 100644 --- a/src/storages/memorelay.ts +++ b/src/storages/memorelay.ts @@ -10,6 +10,11 @@ class Memorelay implements EventStore { this.#cache = new LRUCache(...args); } + /** NIPs supported by this storage method. */ + get supportedNips(): number[] { + return [1]; + } + /** Iterate stored events. */ *#events(): Generator { for (const event of this.#cache.values()) { From b5c23cf23f21f2f1a854b5ab2202dd461e1b3e3e Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Wed, 3 Jan 2024 14:45:56 -0600 Subject: [PATCH 05/23] Move EventsDB into storages --- src/db/events.test.ts | 64 ----- src/db/events.ts | 410 +-------------------------------- src/storages/events-db.test.ts | 67 ++++++ src/storages/events-db.ts | 410 +++++++++++++++++++++++++++++++++ 4 files changed, 480 insertions(+), 471 deletions(-) delete mode 100644 src/db/events.test.ts create mode 100644 src/storages/events-db.test.ts create mode 100644 src/storages/events-db.ts diff --git a/src/db/events.test.ts b/src/db/events.test.ts deleted file mode 100644 index 7a5537e..0000000 --- a/src/db/events.test.ts +++ /dev/null @@ -1,64 +0,0 @@ -import { assertEquals, assertRejects } from '@/deps-test.ts'; -import { buildUserEvent } from '@/db/users.ts'; - -import event0 from '~/fixtures/events/event-0.json' assert { type: 'json' }; -import event1 from '~/fixtures/events/event-1.json' assert { type: 'json' }; - -import { eventsDB as db } from './events.ts'; - -Deno.test('count filters', async () => { - assertEquals(await db.countEvents([{ kinds: [1] }]), 0); - await db.storeEvent(event1); - assertEquals(await db.countEvents([{ kinds: [1] }]), 1); -}); - -Deno.test('insert and filter events', async () => { - await db.storeEvent(event1); - - assertEquals(await db.getEvents([{ kinds: [1] }]), [event1]); - assertEquals(await db.getEvents([{ kinds: [3] }]), []); - assertEquals(await db.getEvents([{ since: 1691091000 }]), [event1]); - assertEquals(await db.getEvents([{ until: 1691091000 }]), []); - assertEquals( - await db.getEvents([{ '#proxy': ['https://gleasonator.com/objects/8f6fac53-4f66-4c6e-ac7d-92e5e78c3e79'] }]), - [event1], - ); -}); - -Deno.test('delete events', async () => { - await db.storeEvent(event1); - assertEquals(await db.getEvents([{ kinds: [1] }]), [event1]); - await db.deleteEvents([{ kinds: [1] }]); - assertEquals(await db.getEvents([{ kinds: [1] }]), []); -}); - -Deno.test('query events with local filter', async () => { - await db.storeEvent(event1); - - assertEquals(await db.getEvents([{}]), [event1]); - assertEquals(await db.getEvents([{ local: true }]), []); - assertEquals(await db.getEvents([{ local: false }]), [event1]); - - const userEvent = await buildUserEvent({ - username: 'alex', - pubkey: event1.pubkey, - inserted_at: new Date(), - admin: false, - }); - await db.storeEvent(userEvent); - - assertEquals(await db.getEvents([{ kinds: [1], local: true }]), [event1]); - assertEquals(await db.getEvents([{ kinds: [1], local: false }]), []); -}); - -Deno.test('inserting replaceable events', async () => { - assertEquals(await db.countEvents([{ kinds: [0], authors: [event0.pubkey] }]), 0); - - await db.storeEvent(event0); - await assertRejects(() => db.storeEvent(event0)); - assertEquals(await db.countEvents([{ kinds: [0], authors: [event0.pubkey] }]), 1); - - const changeEvent = { ...event0, id: '123', created_at: event0.created_at + 1 }; - await db.storeEvent(changeEvent); - assertEquals(await db.getEvents([{ kinds: [0] }]), [changeEvent]); -}); diff --git a/src/db/events.ts b/src/db/events.ts index 7764a8d..7b06cdc 100644 --- a/src/db/events.ts +++ b/src/db/events.ts @@ -1,410 +1,6 @@ -import { Conf } from '@/config.ts'; -import { db, type DittoDB } from '@/db.ts'; -import { Debug, type Event, Kysely, type SelectQueryBuilder } from '@/deps.ts'; -import { type DittoFilter } from '@/filter.ts'; -import { isDittoInternalKind, isParameterizedReplaceableKind, isReplaceableKind } from '@/kinds.ts'; -import { jsonMetaContentSchema } from '@/schemas/nostr.ts'; -import { type DittoEvent, EventStore, type GetEventsOpts, type StoreEventOpts } from '@/store.ts'; -import { isNostrId, isURL } from '@/utils.ts'; +import { db } from '@/db.ts'; +import { EventsDB } from '@/storages/events-db.ts'; -const debug = Debug('ditto:db:events'); - -/** Function to decide whether or not to index a tag. */ -type TagCondition = ({ event, count, value }: { - event: Event; - opts: StoreEventOpts; - count: number; - value: string; -}) => boolean; - -/** Conditions for when to index certain tags. */ -const tagConditions: Record = { - 'd': ({ event, count }) => count === 0 && isParameterizedReplaceableKind(event.kind), - 'e': ({ event, count, value, opts }) => ((opts.data?.user && event.kind === 10003) || count < 15) && isNostrId(value), - 'media': ({ count, value, opts }) => (opts.data?.user || count < 4) && isURL(value), - 'p': ({ event, count, value }) => (count < 15 || event.kind === 3) && isNostrId(value), - 'proxy': ({ count, value }) => count === 0 && isURL(value), - 'q': ({ event, count, value }) => count === 0 && event.kind === 1 && isNostrId(value), - 't': ({ count, value }) => count < 5 && value.length < 50, - 'name': ({ event, count }) => event.kind === 30361 && count === 0, - 'role': ({ event, count }) => event.kind === 30361 && count === 0, -}; - -/** Insert an event (and its tags) into the database. */ -async function storeEvent(event: Event, opts: StoreEventOpts = {}): Promise { - debug('EVENT', JSON.stringify(event)); - - if (isDittoInternalKind(event.kind) && event.pubkey !== Conf.pubkey) { - throw new Error('Internal events can only be stored by the server keypair'); - } - - return await db.transaction().execute(async (trx) => { - /** Insert the event into the database. */ - async function addEvent() { - await trx.insertInto('events') - .values({ ...event, tags: JSON.stringify(event.tags) }) - .execute(); - } - - /** Add search data to the FTS table. */ - async function indexSearch() { - const searchContent = buildSearchContent(event); - if (!searchContent) return; - await trx.insertInto('events_fts') - .values({ id: event.id, content: searchContent.substring(0, 1000) }) - .execute(); - } - - /** Index event tags depending on the conditions defined above. */ - async function indexTags() { - const tags = filterIndexableTags(event, opts); - const rows = tags.map(([tag, value]) => ({ event_id: event.id, tag, value })); - - if (!tags.length) return; - await trx.insertInto('tags') - .values(rows) - .execute(); - } - - if (isReplaceableKind(event.kind)) { - const prevEvents = await getFilterQuery(trx, { kinds: [event.kind], authors: [event.pubkey] }).execute(); - for (const prevEvent of prevEvents) { - if (prevEvent.created_at >= event.created_at) { - throw new Error('Cannot replace an event with an older event'); - } - } - await deleteEventsTrx(trx, [{ kinds: [event.kind], authors: [event.pubkey] }]); - } - - if (isParameterizedReplaceableKind(event.kind)) { - const d = event.tags.find(([tag]) => tag === 'd')?.[1]; - if (d) { - const prevEvents = await getFilterQuery(trx, { kinds: [event.kind], authors: [event.pubkey], '#d': [d] }) - .execute(); - for (const prevEvent of prevEvents) { - if (prevEvent.created_at >= event.created_at) { - throw new Error('Cannot replace an event with an older event'); - } - } - await deleteEventsTrx(trx, [{ kinds: [event.kind], authors: [event.pubkey], '#d': [d] }]); - } - } - - // Run the queries. - await Promise.all([ - addEvent(), - indexTags(), - indexSearch(), - ]); - }).catch((error) => { - // Don't throw for duplicate events. - if (error.message.includes('UNIQUE constraint failed')) { - return; - } else { - throw error; - } - }); -} - -type EventQuery = SelectQueryBuilder; - -/** Build the query for a filter. */ -function getFilterQuery(db: Kysely, filter: DittoFilter): EventQuery { - let query = db - .selectFrom('events') - .select([ - 'events.id', - 'events.kind', - 'events.pubkey', - 'events.content', - 'events.tags', - 'events.created_at', - 'events.sig', - ]) - .orderBy('events.created_at', 'desc'); - - for (const [key, value] of Object.entries(filter)) { - if (value === undefined) continue; - - switch (key as keyof DittoFilter) { - case 'ids': - query = query.where('events.id', 'in', filter.ids!); - break; - case 'kinds': - query = query.where('events.kind', 'in', filter.kinds!); - break; - case 'authors': - query = query.where('events.pubkey', 'in', filter.authors!); - break; - case 'since': - query = query.where('events.created_at', '>=', filter.since!); - break; - case 'until': - query = query.where('events.created_at', '<=', filter.until!); - break; - case 'limit': - query = query.limit(filter.limit!); - break; - } - - if (key.startsWith('#')) { - const tag = key.replace(/^#/, ''); - const value = filter[key as `#${string}`] as string[]; - query = query - .leftJoin('tags', 'tags.event_id', 'events.id') - .where('tags.tag', '=', tag) - .where('tags.value', 'in', value); - } - } - - if (typeof filter.local === 'boolean') { - query = query - .leftJoin(usersQuery, (join) => join.onRef('users.d_tag', '=', 'events.pubkey')) - .where('users.d_tag', filter.local ? 'is not' : 'is', null); - } - - if (filter.relations?.includes('author')) { - query = query - .leftJoin( - (eb) => - eb - .selectFrom('events') - .selectAll() - .where('kind', '=', 0) - .groupBy('pubkey') - .as('authors'), - (join) => join.onRef('authors.pubkey', '=', 'events.pubkey'), - ) - .select([ - 'authors.id as author_id', - 'authors.kind as author_kind', - 'authors.pubkey as author_pubkey', - 'authors.content as author_content', - 'authors.tags as author_tags', - 'authors.created_at as author_created_at', - 'authors.sig as author_sig', - ]); - } - - if (filter.relations?.includes('author_stats')) { - query = query - .leftJoin('author_stats', 'author_stats.pubkey', 'events.pubkey') - .select((eb) => [ - eb.fn.coalesce('author_stats.followers_count', eb.val(0)).as('author_stats_followers_count'), - eb.fn.coalesce('author_stats.following_count', eb.val(0)).as('author_stats_following_count'), - eb.fn.coalesce('author_stats.notes_count', eb.val(0)).as('author_stats_notes_count'), - ]); - } - - if (filter.relations?.includes('event_stats')) { - query = query - .leftJoin('event_stats', 'event_stats.event_id', 'events.id') - .select((eb) => [ - eb.fn.coalesce('event_stats.replies_count', eb.val(0)).as('stats_replies_count'), - eb.fn.coalesce('event_stats.reposts_count', eb.val(0)).as('stats_reposts_count'), - eb.fn.coalesce('event_stats.reactions_count', eb.val(0)).as('stats_reactions_count'), - ]); - } - - if (filter.search) { - query = query - .innerJoin('events_fts', 'events_fts.id', 'events.id') - .where('events_fts.content', 'match', JSON.stringify(filter.search)); - } - - return query; -} - -/** Combine filter queries into a single union query. */ -function getEventsQuery(filters: DittoFilter[]) { - return filters - .map((filter) => db.selectFrom(() => getFilterQuery(db, filter).as('events')).selectAll()) - .reduce((result, query) => result.unionAll(query)); -} - -/** Query to get user events, joined by tags. */ -function usersQuery() { - return getFilterQuery(db, { kinds: [30361], authors: [Conf.pubkey] }) - .leftJoin('tags', 'tags.event_id', 'events.id') - .where('tags.tag', '=', 'd') - .select('tags.value as d_tag') - .as('users'); -} - -/** Get events for filters from the database. */ -async function getEvents( - filters: DittoFilter[], - opts: GetEventsOpts = {}, -): Promise[]> { - if (opts.signal?.aborted) return Promise.resolve([]); - if (!filters.length) return Promise.resolve([]); - debug('REQ', JSON.stringify(filters)); - let query = getEventsQuery(filters); - - if (typeof opts.limit === 'number') { - query = query.limit(opts.limit); - } - - return (await query.execute()).map((row) => { - const event: DittoEvent = { - id: row.id, - kind: row.kind as K, - pubkey: row.pubkey, - content: row.content, - created_at: row.created_at, - tags: JSON.parse(row.tags), - sig: row.sig, - }; - - if (row.author_id) { - event.author = { - id: row.author_id, - kind: row.author_kind! as 0, - pubkey: row.author_pubkey!, - content: row.author_content!, - created_at: row.author_created_at!, - tags: JSON.parse(row.author_tags!), - sig: row.author_sig!, - }; - } - - if (typeof row.author_stats_followers_count === 'number') { - event.author_stats = { - followers_count: row.author_stats_followers_count, - following_count: row.author_stats_following_count!, - notes_count: row.author_stats_notes_count!, - }; - } - - if (typeof row.stats_replies_count === 'number') { - event.event_stats = { - replies_count: row.stats_replies_count, - reposts_count: row.stats_reposts_count!, - reactions_count: row.stats_reactions_count!, - }; - } - - return event; - }); -} - -/** Delete events from each table. Should be run in a transaction! */ -async function deleteEventsTrx(db: Kysely, filters: DittoFilter[]) { - if (!filters.length) return Promise.resolve(); - debug('DELETE', JSON.stringify(filters)); - - const query = getEventsQuery(filters).clearSelect().select('id'); - - await db.deleteFrom('events_fts') - .where('id', 'in', () => query) - .execute(); - - return db.deleteFrom('events') - .where('id', 'in', () => query) - .execute(); -} - -/** Delete events based on filters from the database. */ -async function deleteEvents(filters: DittoFilter[]): Promise { - if (!filters.length) return Promise.resolve(); - debug('DELETE', JSON.stringify(filters)); - - await db.transaction().execute((trx) => deleteEventsTrx(trx, filters)); -} - -/** Get number of events that would be returned by filters. */ -async function countEvents(filters: DittoFilter[]): Promise { - if (!filters.length) return Promise.resolve(0); - debug('COUNT', JSON.stringify(filters)); - const query = getEventsQuery(filters); - - const [{ count }] = await query - .clearSelect() - .select((eb) => eb.fn.count('id').as('count')) - .execute(); - - return Number(count); -} - -/** Return only the tags that should be indexed. */ -function filterIndexableTags(event: Event, opts: StoreEventOpts): string[][] { - const tagCounts: Record = {}; - - function getCount(name: string) { - return tagCounts[name] || 0; - } - - function incrementCount(name: string) { - tagCounts[name] = getCount(name) + 1; - } - - function checkCondition(name: string, value: string, condition: TagCondition) { - return condition({ - event, - opts, - count: getCount(name), - value, - }); - } - - return event.tags.reduce((results, tag) => { - const [name, value] = tag; - const condition = tagConditions[name] as TagCondition | undefined; - - if (value && condition && value.length < 200 && checkCondition(name, value, condition)) { - results.push(tag); - } - - incrementCount(name); - return results; - }, []); -} - -/** Build a search index from the event. */ -function buildSearchContent(event: Event): string { - switch (event.kind) { - case 0: - return buildUserSearchContent(event as Event<0>); - case 1: - return event.content; - default: - return ''; - } -} - -/** Build search content for a user. */ -function buildUserSearchContent(event: Event<0>): string { - const { name, nip05, about } = jsonMetaContentSchema.parse(event.content); - return [name, nip05, about].filter(Boolean).join('\n'); -} - -/** SQLite database storage adapter for Nostr events. */ -const eventsDB: EventStore = { - getEvents, - storeEvent, - countEvents, - deleteEvents, -}; +const eventsDB = new EventsDB(db); export { eventsDB }; diff --git a/src/storages/events-db.test.ts b/src/storages/events-db.test.ts new file mode 100644 index 0000000..349f82b --- /dev/null +++ b/src/storages/events-db.test.ts @@ -0,0 +1,67 @@ +import { db } from '@/db.ts'; +import { buildUserEvent } from '@/db/users.ts'; +import { assertEquals, assertRejects } from '@/deps-test.ts'; + +import event0 from '~/fixtures/events/event-0.json' assert { type: 'json' }; +import event1 from '~/fixtures/events/event-1.json' assert { type: 'json' }; + +import { EventsDB } from './events-db.ts'; + +const eventsDB = new EventsDB(db); + +Deno.test('count filters', async () => { + assertEquals(await eventsDB.countEvents([{ kinds: [1] }]), 0); + await eventsDB.storeEvent(event1); + assertEquals(await eventsDB.countEvents([{ kinds: [1] }]), 1); +}); + +Deno.test('insert and filter events', async () => { + await eventsDB.storeEvent(event1); + + assertEquals(await eventsDB.getEvents([{ kinds: [1] }]), [event1]); + assertEquals(await eventsDB.getEvents([{ kinds: [3] }]), []); + assertEquals(await eventsDB.getEvents([{ since: 1691091000 }]), [event1]); + assertEquals(await eventsDB.getEvents([{ until: 1691091000 }]), []); + assertEquals( + await eventsDB.getEvents([{ '#proxy': ['https://gleasonator.com/objects/8f6fac53-4f66-4c6e-ac7d-92e5e78c3e79'] }]), + [event1], + ); +}); + +Deno.test('delete events', async () => { + await eventsDB.storeEvent(event1); + assertEquals(await eventsDB.getEvents([{ kinds: [1] }]), [event1]); + await eventsDB.deleteEvents([{ kinds: [1] }]); + assertEquals(await eventsDB.getEvents([{ kinds: [1] }]), []); +}); + +Deno.test('query events with local filter', async () => { + await eventsDB.storeEvent(event1); + + assertEquals(await eventsDB.getEvents([{}]), [event1]); + assertEquals(await eventsDB.getEvents([{ local: true }]), []); + assertEquals(await eventsDB.getEvents([{ local: false }]), [event1]); + + const userEvent = await buildUserEvent({ + username: 'alex', + pubkey: event1.pubkey, + inserted_at: new Date(), + admin: false, + }); + await eventsDB.storeEvent(userEvent); + + assertEquals(await eventsDB.getEvents([{ kinds: [1], local: true }]), [event1]); + assertEquals(await eventsDB.getEvents([{ kinds: [1], local: false }]), []); +}); + +Deno.test('inserting replaceable events', async () => { + assertEquals(await eventsDB.countEvents([{ kinds: [0], authors: [event0.pubkey] }]), 0); + + await eventsDB.storeEvent(event0); + await assertRejects(() => eventsDB.storeEvent(event0)); + assertEquals(await eventsDB.countEvents([{ kinds: [0], authors: [event0.pubkey] }]), 1); + + const changeEvent = { ...event0, id: '123', created_at: event0.created_at + 1 }; + await eventsDB.storeEvent(changeEvent); + assertEquals(await eventsDB.getEvents([{ kinds: [0] }]), [changeEvent]); +}); diff --git a/src/storages/events-db.ts b/src/storages/events-db.ts new file mode 100644 index 0000000..9f5391d --- /dev/null +++ b/src/storages/events-db.ts @@ -0,0 +1,410 @@ +import { Conf } from '@/config.ts'; +import { type DittoDB } from '@/db.ts'; +import { Debug, type Event, Kysely, type SelectQueryBuilder } from '@/deps.ts'; +import { type DittoFilter } from '@/filter.ts'; +import { isDittoInternalKind, isParameterizedReplaceableKind, isReplaceableKind } from '@/kinds.ts'; +import { jsonMetaContentSchema } from '@/schemas/nostr.ts'; +import { type DittoEvent, EventStore, type GetEventsOpts, type StoreEventOpts } from '@/store.ts'; +import { isNostrId, isURL } from '@/utils.ts'; + +/** Function to decide whether or not to index a tag. */ +type TagCondition = ({ event, count, value }: { + event: Event; + opts: StoreEventOpts; + count: number; + value: string; +}) => boolean; + +/** Conditions for when to index certain tags. */ +const tagConditions: Record = { + 'd': ({ event, count }) => count === 0 && isParameterizedReplaceableKind(event.kind), + 'e': ({ event, count, value, opts }) => ((opts.data?.user && event.kind === 10003) || count < 15) && isNostrId(value), + 'media': ({ count, value, opts }) => (opts.data?.user || count < 4) && isURL(value), + 'p': ({ event, count, value }) => (count < 15 || event.kind === 3) && isNostrId(value), + 'proxy': ({ count, value }) => count === 0 && isURL(value), + 'q': ({ event, count, value }) => count === 0 && event.kind === 1 && isNostrId(value), + 't': ({ count, value }) => count < 5 && value.length < 50, + 'name': ({ event, count }) => event.kind === 30361 && count === 0, + 'role': ({ event, count }) => event.kind === 30361 && count === 0, +}; + +type EventQuery = SelectQueryBuilder; + +/** SQLite database storage adapter for Nostr events. */ +class EventsDB implements EventStore { + #db: Kysely; + #debug = Debug('ditto:db:events'); + + constructor(db: Kysely) { + this.#db = db; + } + + /** Insert an event (and its tags) into the database. */ + async storeEvent(event: Event, opts: StoreEventOpts = {}): Promise { + this.#debug('EVENT', JSON.stringify(event)); + + if (isDittoInternalKind(event.kind) && event.pubkey !== Conf.pubkey) { + throw new Error('Internal events can only be stored by the server keypair'); + } + + return await this.#db.transaction().execute(async (trx) => { + /** Insert the event into the database. */ + async function addEvent() { + await trx.insertInto('events') + .values({ ...event, tags: JSON.stringify(event.tags) }) + .execute(); + } + + /** Add search data to the FTS table. */ + async function indexSearch() { + const searchContent = buildSearchContent(event); + if (!searchContent) return; + await trx.insertInto('events_fts') + .values({ id: event.id, content: searchContent.substring(0, 1000) }) + .execute(); + } + + /** Index event tags depending on the conditions defined above. */ + async function indexTags() { + const tags = filterIndexableTags(event, opts); + const rows = tags.map(([tag, value]) => ({ event_id: event.id, tag, value })); + + if (!tags.length) return; + await trx.insertInto('tags') + .values(rows) + .execute(); + } + + if (isReplaceableKind(event.kind)) { + const prevEvents = await this.getFilterQuery(trx, { kinds: [event.kind], authors: [event.pubkey] }).execute(); + for (const prevEvent of prevEvents) { + if (prevEvent.created_at >= event.created_at) { + throw new Error('Cannot replace an event with an older event'); + } + } + await this.deleteEventsTrx(trx, [{ kinds: [event.kind], authors: [event.pubkey] }]); + } + + if (isParameterizedReplaceableKind(event.kind)) { + const d = event.tags.find(([tag]) => tag === 'd')?.[1]; + if (d) { + const prevEvents = await this.getFilterQuery(trx, { kinds: [event.kind], authors: [event.pubkey], '#d': [d] }) + .execute(); + for (const prevEvent of prevEvents) { + if (prevEvent.created_at >= event.created_at) { + throw new Error('Cannot replace an event with an older event'); + } + } + await this.deleteEventsTrx(trx, [{ kinds: [event.kind], authors: [event.pubkey], '#d': [d] }]); + } + } + + // Run the queries. + await Promise.all([ + addEvent(), + indexTags(), + indexSearch(), + ]); + }).catch((error) => { + // Don't throw for duplicate events. + if (error.message.includes('UNIQUE constraint failed')) { + return; + } else { + throw error; + } + }); + } + + /** Build the query for a filter. */ + getFilterQuery(db: Kysely, filter: DittoFilter): EventQuery { + let query = db + .selectFrom('events') + .select([ + 'events.id', + 'events.kind', + 'events.pubkey', + 'events.content', + 'events.tags', + 'events.created_at', + 'events.sig', + ]) + .orderBy('events.created_at', 'desc'); + + for (const [key, value] of Object.entries(filter)) { + if (value === undefined) continue; + + switch (key as keyof DittoFilter) { + case 'ids': + query = query.where('events.id', 'in', filter.ids!); + break; + case 'kinds': + query = query.where('events.kind', 'in', filter.kinds!); + break; + case 'authors': + query = query.where('events.pubkey', 'in', filter.authors!); + break; + case 'since': + query = query.where('events.created_at', '>=', filter.since!); + break; + case 'until': + query = query.where('events.created_at', '<=', filter.until!); + break; + case 'limit': + query = query.limit(filter.limit!); + break; + } + + if (key.startsWith('#')) { + const tag = key.replace(/^#/, ''); + const value = filter[key as `#${string}`] as string[]; + query = query + .leftJoin('tags', 'tags.event_id', 'events.id') + .where('tags.tag', '=', tag) + .where('tags.value', 'in', value); + } + } + + if (typeof filter.local === 'boolean') { + query = query + .leftJoin(this.usersQuery, (join) => join.onRef('users.d_tag', '=', 'events.pubkey')) + .where('users.d_tag', filter.local ? 'is not' : 'is', null); + } + + if (filter.relations?.includes('author')) { + query = query + .leftJoin( + (eb) => + eb + .selectFrom('events') + .selectAll() + .where('kind', '=', 0) + .groupBy('pubkey') + .as('authors'), + (join) => join.onRef('authors.pubkey', '=', 'events.pubkey'), + ) + .select([ + 'authors.id as author_id', + 'authors.kind as author_kind', + 'authors.pubkey as author_pubkey', + 'authors.content as author_content', + 'authors.tags as author_tags', + 'authors.created_at as author_created_at', + 'authors.sig as author_sig', + ]); + } + + if (filter.relations?.includes('author_stats')) { + query = query + .leftJoin('author_stats', 'author_stats.pubkey', 'events.pubkey') + .select((eb) => [ + eb.fn.coalesce('author_stats.followers_count', eb.val(0)).as('author_stats_followers_count'), + eb.fn.coalesce('author_stats.following_count', eb.val(0)).as('author_stats_following_count'), + eb.fn.coalesce('author_stats.notes_count', eb.val(0)).as('author_stats_notes_count'), + ]); + } + + if (filter.relations?.includes('event_stats')) { + query = query + .leftJoin('event_stats', 'event_stats.event_id', 'events.id') + .select((eb) => [ + eb.fn.coalesce('event_stats.replies_count', eb.val(0)).as('stats_replies_count'), + eb.fn.coalesce('event_stats.reposts_count', eb.val(0)).as('stats_reposts_count'), + eb.fn.coalesce('event_stats.reactions_count', eb.val(0)).as('stats_reactions_count'), + ]); + } + + if (filter.search) { + query = query + .innerJoin('events_fts', 'events_fts.id', 'events.id') + .where('events_fts.content', 'match', JSON.stringify(filter.search)); + } + + return query; + } + + /** Combine filter queries into a single union query. */ + getEventsQuery(filters: DittoFilter[]) { + return filters + .map((filter) => this.#db.selectFrom(() => this.getFilterQuery(this.#db, filter).as('events')).selectAll()) + .reduce((result, query) => result.unionAll(query)); + } + + /** Query to get user events, joined by tags. */ + usersQuery() { + return this.getFilterQuery(this.#db, { kinds: [30361], authors: [Conf.pubkey] }) + .leftJoin('tags', 'tags.event_id', 'events.id') + .where('tags.tag', '=', 'd') + .select('tags.value as d_tag') + .as('users'); + } + + /** Get events for filters from the database. */ + async getEvents( + filters: DittoFilter[], + opts: GetEventsOpts = {}, + ): Promise[]> { + if (opts.signal?.aborted) return Promise.resolve([]); + if (!filters.length) return Promise.resolve([]); + this.#debug('REQ', JSON.stringify(filters)); + let query = this.getEventsQuery(filters); + + if (typeof opts.limit === 'number') { + query = query.limit(opts.limit); + } + + return (await query.execute()).map((row) => { + const event: DittoEvent = { + id: row.id, + kind: row.kind as K, + pubkey: row.pubkey, + content: row.content, + created_at: row.created_at, + tags: JSON.parse(row.tags), + sig: row.sig, + }; + + if (row.author_id) { + event.author = { + id: row.author_id, + kind: row.author_kind! as 0, + pubkey: row.author_pubkey!, + content: row.author_content!, + created_at: row.author_created_at!, + tags: JSON.parse(row.author_tags!), + sig: row.author_sig!, + }; + } + + if (typeof row.author_stats_followers_count === 'number') { + event.author_stats = { + followers_count: row.author_stats_followers_count, + following_count: row.author_stats_following_count!, + notes_count: row.author_stats_notes_count!, + }; + } + + if (typeof row.stats_replies_count === 'number') { + event.event_stats = { + replies_count: row.stats_replies_count, + reposts_count: row.stats_reposts_count!, + reactions_count: row.stats_reactions_count!, + }; + } + + return event; + }); + } + + /** Delete events from each table. Should be run in a transaction! */ + async deleteEventsTrx(db: Kysely, filters: DittoFilter[]) { + if (!filters.length) return Promise.resolve(); + this.#debug('DELETE', JSON.stringify(filters)); + + const query = this.getEventsQuery(filters).clearSelect().select('id'); + + await db.deleteFrom('events_fts') + .where('id', 'in', () => query) + .execute(); + + return db.deleteFrom('events') + .where('id', 'in', () => query) + .execute(); + } + + /** Delete events based on filters from the database. */ + async deleteEvents(filters: DittoFilter[]): Promise { + if (!filters.length) return Promise.resolve(); + this.#debug('DELETE', JSON.stringify(filters)); + + await this.#db.transaction().execute((trx) => this.deleteEventsTrx(trx, filters)); + } + + /** Get number of events that would be returned by filters. */ + async countEvents(filters: DittoFilter[]): Promise { + if (!filters.length) return Promise.resolve(0); + this.#debug('COUNT', JSON.stringify(filters)); + const query = this.getEventsQuery(filters); + + const [{ count }] = await query + .clearSelect() + .select((eb) => eb.fn.count('id').as('count')) + .execute(); + + return Number(count); + } +} + +/** Return only the tags that should be indexed. */ +function filterIndexableTags(event: Event, opts: StoreEventOpts): string[][] { + const tagCounts: Record = {}; + + function getCount(name: string) { + return tagCounts[name] || 0; + } + + function incrementCount(name: string) { + tagCounts[name] = getCount(name) + 1; + } + + function checkCondition(name: string, value: string, condition: TagCondition) { + return condition({ + event, + opts, + count: getCount(name), + value, + }); + } + + return event.tags.reduce((results, tag) => { + const [name, value] = tag; + const condition = tagConditions[name] as TagCondition | undefined; + + if (value && condition && value.length < 200 && checkCondition(name, value, condition)) { + results.push(tag); + } + + incrementCount(name); + return results; + }, []); +} + +/** Build a search index from the event. */ +function buildSearchContent(event: Event): string { + switch (event.kind) { + case 0: + return buildUserSearchContent(event as Event<0>); + case 1: + return event.content; + default: + return ''; + } +} + +/** Build search content for a user. */ +function buildUserSearchContent(event: Event<0>): string { + const { name, nip05, about } = jsonMetaContentSchema.parse(event.content); + return [name, nip05, about].filter(Boolean).join('\n'); +} + +export { EventsDB }; From 80e6147927823cd91d1a83490e4477ea9198590f Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Wed, 3 Jan 2024 15:28:03 -0600 Subject: [PATCH 06/23] Make reqmeister a storage --- src/reqmeister.ts | 39 +++++++++++++++++++++++++++++++++++++-- 1 file changed, 37 insertions(+), 2 deletions(-) diff --git a/src/reqmeister.ts b/src/reqmeister.ts index 27a97da..2196e39 100644 --- a/src/reqmeister.ts +++ b/src/reqmeister.ts @@ -1,6 +1,14 @@ import { client } from '@/client.ts'; import { Debug, type Event, EventEmitter, type Filter } from '@/deps.ts'; -import { AuthorMicrofilter, eventToMicroFilter, getFilterId, IdMicrofilter, type MicroFilter } from '@/filter.ts'; +import { + AuthorMicrofilter, + eventToMicroFilter, + getFilterId, + IdMicrofilter, + isMicrofilter, + type MicroFilter, +} from '@/filter.ts'; +import { type EventStore, GetEventsOpts } from '@/store.ts'; import { Time } from '@/utils/time.ts'; const debug = Debug('ditto:reqmeister'); @@ -18,7 +26,7 @@ interface ReqmeisterReqOpts { type ReqmeisterQueueItem = [string, MicroFilter, WebSocket['url'][]]; /** Batches requests to Nostr relays using microfilters. */ -class Reqmeister extends EventEmitter<{ [filterId: string]: (event: Event) => any }> { +class Reqmeister extends EventEmitter<{ [filterId: string]: (event: Event) => any }> implements EventStore { #opts: ReqmeisterOpts; #queue: ReqmeisterQueueItem[] = []; #promise!: Promise; @@ -119,6 +127,33 @@ class Reqmeister extends EventEmitter<{ [filterId: string]: (event: Event) => an const filterId = getFilterId(eventToMicroFilter(event)); return this.#queue.some(([id]) => id === filterId); } + + getEvents(filters: Filter[], opts?: GetEventsOpts | undefined): Promise[]> { + if (opts?.signal?.aborted) return Promise.resolve([]); + if (!filters.length) return Promise.resolve([]); + + const promises = filters.reduce>[]>((result, filter) => { + if (isMicrofilter(filter)) { + result.push(this.req(filter) as Promise>); + } + return result; + }, []); + + return Promise.all(promises); + } + + storeEvent(event: Event): Promise { + this.encounter(event); + return Promise.resolve(); + } + + countEvents(_filters: Filter[]): Promise { + throw new Error('COUNT not implemented.'); + } + + deleteEvents(_filters: Filter[]): Promise { + throw new Error('DELETE not implemented.'); + } } const reqmeister = new Reqmeister({ From 5bffffe07b6d268f7cb7f6336b869b1382bf519c Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Wed, 3 Jan 2024 16:35:40 -0600 Subject: [PATCH 07/23] Add a function to calculate the intrinsic limit of a filter --- src/filter.test.ts | 12 +++++++++++- src/filter.ts | 24 ++++++++++++++++++++++++ src/storages/memorelay.ts | 6 ++++-- 3 files changed, 39 insertions(+), 3 deletions(-) diff --git a/src/filter.test.ts b/src/filter.test.ts index efc00d7..b1ea872 100644 --- a/src/filter.test.ts +++ b/src/filter.test.ts @@ -4,7 +4,7 @@ import { assertEquals } from '@/deps-test.ts'; import event0 from '~/fixtures/events/event-0.json' assert { type: 'json' }; import event1 from '~/fixtures/events/event-1.json' assert { type: 'json' }; -import { eventToMicroFilter, getFilterId, getMicroFilters, isMicrofilter } from './filter.ts'; +import { eventToMicroFilter, getFilterId, getFilterLimit, getMicroFilters, isMicrofilter } from './filter.ts'; Deno.test('getMicroFilters', () => { const event = event0 as Event<0>; @@ -35,3 +35,13 @@ Deno.test('getFilterId', () => { '{"authors":["79c2cae114ea28a981e7559b4fe7854a473521a8d22a66bbab9fa248eb820ff6"],"kinds":[0]}', ); }); + +Deno.test('getFilterLimit', () => { + assertEquals(getFilterLimit({ ids: [event0.id] }), 1); + assertEquals(getFilterLimit({ ids: [event0.id], limit: 2 }), 1); + assertEquals(getFilterLimit({ ids: [event0.id], limit: 0 }), 0); + assertEquals(getFilterLimit({ ids: [event0.id], limit: -1 }), 0); + assertEquals(getFilterLimit({ kinds: [0], authors: [event0.pubkey] }), 1); + assertEquals(getFilterLimit({ kinds: [1], authors: [event0.pubkey] }), Infinity); + assertEquals(getFilterLimit({}), Infinity); +}); diff --git a/src/filter.ts b/src/filter.ts index 430a8d3..b98d44e 100644 --- a/src/filter.ts +++ b/src/filter.ts @@ -2,6 +2,7 @@ import { Conf } from '@/config.ts'; import { type Event, type Filter, matchFilters, stringifyStable, z } from '@/deps.ts'; import { nostrIdSchema } from '@/schemas/nostr.ts'; import { type EventData } from '@/types.ts'; +import { isReplaceableKind } from '@/kinds.ts'; /** Additional properties that may be added by Ditto to events. */ type Relation = 'author' | 'author_stats' | 'event_stats'; @@ -82,11 +83,34 @@ function isMicrofilter(filter: Filter): filter is MicroFilter { return microFilterSchema.safeParse(filter).success; } +/** Calculate the intrinsic limit of a filter. */ +function getFilterLimit(filter: Filter): number { + if (filter.ids && !filter.ids.length) return 0; + if (filter.kinds && !filter.kinds.length) return 0; + if (filter.authors && !filter.authors.length) return 0; + + return Math.min( + Math.max(0, filter.limit ?? Infinity), + filter.ids?.length ?? Infinity, + filter.authors?.length && + filter.kinds?.every((kind) => isReplaceableKind(kind)) + ? filter.authors.length * filter.kinds.length + : Infinity, + ); +} + +/** Returns true if the filter could potentially return any stored events at all. */ +function canFilter(filter: Filter): boolean { + return getFilterLimit(filter) > 0; +} + export { type AuthorMicrofilter, + canFilter, type DittoFilter, eventToMicroFilter, getFilterId, + getFilterLimit, getMicroFilters, type IdMicrofilter, isMicrofilter, diff --git a/src/storages/memorelay.ts b/src/storages/memorelay.ts index 71c68c6..66c2c17 100644 --- a/src/storages/memorelay.ts +++ b/src/storages/memorelay.ts @@ -1,4 +1,5 @@ import { Debug, type Event, type Filter, LRUCache, matchFilter, matchFilters } from '@/deps.ts'; +import { canFilter, getFilterLimit } from '@/filter.ts'; import { type EventStore, type GetEventsOpts } from '@/store.ts'; /** In-memory data store for events. */ @@ -27,6 +28,7 @@ class Memorelay implements EventStore { /** Get events from memory. */ getEvents(filters: Filter[], opts: GetEventsOpts = {}): Promise[]> { if (opts.signal?.aborted) return Promise.resolve([]); + filters = filters.filter(canFilter); if (!filters.length) return Promise.resolve([]); this.#debug('REQ', JSON.stringify(filters)); @@ -37,7 +39,7 @@ class Memorelay implements EventStore { let index = 0; for (const filter of filters) { - const limit = filter.limit ?? Infinity; + const limit = getFilterLimit(filter); const usage = usages[index] ?? 0; if (usage >= limit) { @@ -50,7 +52,7 @@ class Memorelay implements EventStore { index++; } - if (filters.every((filter, index) => usages[index] >= (filter.limit ?? Infinity))) { + if (filters.every((filter, index) => usages[index] >= getFilterLimit(filter))) { break; } } From c235fa612313ac02977ed6dcb607da73e2631de8 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Wed, 3 Jan 2024 18:15:28 -0600 Subject: [PATCH 08/23] Memorelay: normalize filters --- src/filter.ts | 12 ++++++++++++ src/storages/memorelay.ts | 8 ++++---- 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/src/filter.ts b/src/filter.ts index b98d44e..f59d6c1 100644 --- a/src/filter.ts +++ b/src/filter.ts @@ -104,6 +104,17 @@ function canFilter(filter: Filter): boolean { return getFilterLimit(filter) > 0; } +/** Normalize the `limit` of each filter, and remove filters that can't produce any events. */ +function normalizeFilters(filters: F[]): F[] { + return filters.reduce((acc, filter) => { + const limit = getFilterLimit(filter); + if (limit > 0) { + acc.push(limit === Infinity ? filter : { ...filter, limit }); + } + return acc; + }, []); +} + export { type AuthorMicrofilter, canFilter, @@ -116,5 +127,6 @@ export { isMicrofilter, matchDittoFilters, type MicroFilter, + normalizeFilters, type Relation, }; diff --git a/src/storages/memorelay.ts b/src/storages/memorelay.ts index 66c2c17..ec744ea 100644 --- a/src/storages/memorelay.ts +++ b/src/storages/memorelay.ts @@ -1,5 +1,5 @@ import { Debug, type Event, type Filter, LRUCache, matchFilter, matchFilters } from '@/deps.ts'; -import { canFilter, getFilterLimit } from '@/filter.ts'; +import { normalizeFilters } from '@/filter.ts'; import { type EventStore, type GetEventsOpts } from '@/store.ts'; /** In-memory data store for events. */ @@ -28,7 +28,7 @@ class Memorelay implements EventStore { /** Get events from memory. */ getEvents(filters: Filter[], opts: GetEventsOpts = {}): Promise[]> { if (opts.signal?.aborted) return Promise.resolve([]); - filters = filters.filter(canFilter); + filters = normalizeFilters(filters); if (!filters.length) return Promise.resolve([]); this.#debug('REQ', JSON.stringify(filters)); @@ -39,7 +39,7 @@ class Memorelay implements EventStore { let index = 0; for (const filter of filters) { - const limit = getFilterLimit(filter); + const limit = filter.limit ?? Infinity; const usage = usages[index] ?? 0; if (usage >= limit) { @@ -52,7 +52,7 @@ class Memorelay implements EventStore { index++; } - if (filters.every((filter, index) => usages[index] >= getFilterLimit(filter))) { + if (filters.every((filter, index) => filter.limit && (usages[index] >= filter.limit))) { break; } } From 48ce1ba6c9319a09f8d0942fe130c79e1da84295 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Wed, 3 Jan 2024 18:48:11 -0600 Subject: [PATCH 09/23] Memorelay: do some premature optimizations --- src/storages/memorelay.ts | 60 ++++++++++++++++++++++++++++----------- 1 file changed, 44 insertions(+), 16 deletions(-) diff --git a/src/storages/memorelay.ts b/src/storages/memorelay.ts index ec744ea..b0f7104 100644 --- a/src/storages/memorelay.ts +++ b/src/storages/memorelay.ts @@ -1,4 +1,4 @@ -import { Debug, type Event, type Filter, LRUCache, matchFilter, matchFilters } from '@/deps.ts'; +import { Debug, type Event, type Filter, LRUCache, matchFilter } from '@/deps.ts'; import { normalizeFilters } from '@/filter.ts'; import { type EventStore, type GetEventsOpts } from '@/store.ts'; @@ -27,32 +27,62 @@ class Memorelay implements EventStore { /** Get events from memory. */ getEvents(filters: Filter[], opts: GetEventsOpts = {}): Promise[]> { - if (opts.signal?.aborted) return Promise.resolve([]); filters = normalizeFilters(filters); + + if (opts.signal?.aborted) return Promise.resolve([]); if (!filters.length) return Promise.resolve([]); + this.#debug('REQ', JSON.stringify(filters)); + /** Event results to return. */ const results: Event[] = []; - const usages: number[] = []; + /** Number of times an event has been added to results for each filter. */ + const filterUsages: number[] = []; + + /** Check if all filters have been satisfied. */ + function checkSatisfied() { + return results.length >= (opts.limit ?? Infinity) || + filters.every((filter, index) => filter.limit && (filterUsages[index] >= filter.limit)); + } + + // Optimize for filters with IDs. + filters.forEach((filter, index) => { + if (filter.ids) { + for (const id of filter.ids) { + const event = this.#cache.get(id); + if (event && matchFilter(filter, event)) { + results.push(event as Event); + } + } + filterUsages[index] = Infinity; + } + }); + + // Return early if all filters are satisfied. + if (checkSatisfied()) { + return Promise.resolve(results); + } + + // Seek through all events in memory. for (const event of this.#events()) { - let index = 0; - - for (const filter of filters) { + filters.forEach((filter, index) => { const limit = filter.limit ?? Infinity; - const usage = usages[index] ?? 0; + const usage = filterUsages[index] ?? 0; if (usage >= limit) { - continue; + return; } else if (matchFilter(filter, event)) { results.push(event as Event); - usages[index] = usage + 1; + this.#cache.get(event.id); + filterUsages[index] = usage + 1; } index++; - } + }); - if (filters.every((filter, index) => filter.limit && (usages[index] >= filter.limit))) { + // Check after each event if we can return. + if (checkSatisfied()) { break; } } @@ -73,11 +103,9 @@ class Memorelay implements EventStore { } /** Delete events from memory. */ - deleteEvents(filters: Filter[]): Promise { - for (const event of this.#events()) { - if (matchFilters(filters, event)) { - this.#cache.delete(event.id); - } + async deleteEvents(filters: Filter[]): Promise { + for (const event of await this.getEvents(filters)) { + this.#cache.delete(event.id); } return Promise.resolve(); } From 939eeae25afda5bb5dfa36aff3863f82f94ea717 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Wed, 3 Jan 2024 20:22:02 -0600 Subject: [PATCH 10/23] Add Optimizer storage with EventSet --- src/storages/optimizer.ts | 93 ++++++++++++++++++++++++++++++ src/utils/event-set.test.ts | 109 ++++++++++++++++++++++++++++++++++++ src/utils/event-set.ts | 77 +++++++++++++++++++++++++ 3 files changed, 279 insertions(+) create mode 100644 src/storages/optimizer.ts create mode 100644 src/utils/event-set.test.ts create mode 100644 src/utils/event-set.ts diff --git a/src/storages/optimizer.ts b/src/storages/optimizer.ts new file mode 100644 index 0000000..4b4ed9e --- /dev/null +++ b/src/storages/optimizer.ts @@ -0,0 +1,93 @@ +import { type DittoEvent, type EventStore, type GetEventsOpts, type StoreEventOpts } from '@/store.ts'; +import { type DittoFilter, normalizeFilters } from '@/filter.ts'; +import { EventSet } from '@/utils/event-set.ts'; + +interface OptimizerOpts { + db: EventStore; + cache: EventStore; + client: EventStore; +} + +class Optimizer implements EventStore { + #db: EventStore; + #cache: EventStore; + #client: EventStore; + + constructor(opts: OptimizerOpts) { + this.#db = opts.db; + this.#cache = opts.cache; + this.#client = opts.client; + } + + async storeEvent(event: DittoEvent, opts?: StoreEventOpts | undefined): Promise { + await Promise.all([ + this.#db.storeEvent(event, opts), + this.#cache.storeEvent(event, opts), + ]); + } + + async getEvents( + filters: DittoFilter[], + opts: GetEventsOpts | undefined = {}, + ): Promise[]> { + const { limit = Infinity } = opts; + filters = normalizeFilters(filters); + + if (opts?.signal?.aborted) return Promise.resolve([]); + if (!filters.length) return Promise.resolve([]); + + const results = new EventSet>(); + + // Filters with IDs are immutable, so we can take them straight from the cache if we have them. + for (let i = 0; i < filters.length; i++) { + const filter = filters[i]; + if (filter.ids) { + const ids = new Set(filter.ids); + for (const event of await this.#cache.getEvents([filter], opts)) { + ids.delete(event.id); + results.add(event); + if (results.size >= limit) return getResults(); + } + filters[i] = { ...filter, ids: [...ids] }; + } + } + + filters = normalizeFilters(filters); + if (!filters.length) return getResults(); + + // Query the database for events. + for (const dbEvent of await this.#db.getEvents(filters, opts)) { + results.add(dbEvent); + if (results.size >= limit) return getResults(); + } + + // Query the cache again. + for (const cacheEvent of await this.#cache.getEvents(filters, opts)) { + results.add(cacheEvent); + if (results.size >= limit) return getResults(); + } + + // Finally, query the client. + for (const clientEvent of await this.#client.getEvents(filters, opts)) { + results.add(clientEvent); + if (results.size >= limit) return getResults(); + } + + /** Get return type from map. */ + function getResults() { + return [...results.values()]; + } + + return getResults(); + } + + countEvents(_filters: DittoFilter[]): Promise { + throw new Error('COUNT not implemented.'); + } + + deleteEvents(_filters: DittoFilter[]): Promise { + throw new Error('DELETE not implemented.'); + } +} + +export { Optimizer }; diff --git a/src/utils/event-set.test.ts b/src/utils/event-set.test.ts new file mode 100644 index 0000000..b6e26b9 --- /dev/null +++ b/src/utils/event-set.test.ts @@ -0,0 +1,109 @@ +import { assertEquals } from '@/deps-test.ts'; + +import { EventSet } from './event-set.ts'; + +Deno.test('EventSet', () => { + const set = new EventSet(); + assertEquals(set.size, 0); + + const event = { id: '1', kind: 0, pubkey: 'abc', content: '', created_at: 0, sig: '', tags: [] }; + set.add(event); + assertEquals(set.size, 1); + assertEquals(set.has(event), true); + + set.add(event); + assertEquals(set.size, 1); + assertEquals(set.has(event), true); + + set.delete(event); + assertEquals(set.size, 0); + assertEquals(set.has(event), false); + + set.delete(event); + assertEquals(set.size, 0); + assertEquals(set.has(event), false); + + set.add(event); + assertEquals(set.size, 1); + assertEquals(set.has(event), true); + + set.clear(); + assertEquals(set.size, 0); + assertEquals(set.has(event), false); +}); + +Deno.test('EventSet.add (replaceable)', () => { + const event0 = { id: '1', kind: 0, pubkey: 'abc', content: '', created_at: 0, sig: '', tags: [] }; + const event1 = { id: '2', kind: 0, pubkey: 'abc', content: '', created_at: 1, sig: '', tags: [] }; + const event2 = { id: '3', kind: 0, pubkey: 'abc', content: '', created_at: 2, sig: '', tags: [] }; + + const set = new EventSet(); + set.add(event0); + assertEquals(set.size, 1); + assertEquals(set.has(event0), true); + + set.add(event1); + assertEquals(set.size, 1); + assertEquals(set.has(event0), false); + assertEquals(set.has(event1), true); + + set.add(event2); + assertEquals(set.size, 1); + assertEquals(set.has(event0), false); + assertEquals(set.has(event1), false); + assertEquals(set.has(event2), true); +}); + +Deno.test('EventSet.add (parameterized)', () => { + const event0 = { id: '1', kind: 30000, pubkey: 'abc', content: '', created_at: 0, sig: '', tags: [['d', 'a']] }; + const event1 = { id: '2', kind: 30000, pubkey: 'abc', content: '', created_at: 1, sig: '', tags: [['d', 'a']] }; + const event2 = { id: '3', kind: 30000, pubkey: 'abc', content: '', created_at: 2, sig: '', tags: [['d', 'a']] }; + + const set = new EventSet(); + set.add(event0); + assertEquals(set.size, 1); + assertEquals(set.has(event0), true); + + set.add(event1); + assertEquals(set.size, 1); + assertEquals(set.has(event0), false); + assertEquals(set.has(event1), true); + + set.add(event2); + assertEquals(set.size, 1); + assertEquals(set.has(event0), false); + assertEquals(set.has(event1), false); + assertEquals(set.has(event2), true); +}); + +Deno.test('EventSet.eventReplaces', () => { + const event0 = { id: '1', kind: 0, pubkey: 'abc', content: '', created_at: 0, sig: '', tags: [] }; + const event1 = { id: '2', kind: 0, pubkey: 'abc', content: '', created_at: 1, sig: '', tags: [] }; + const event2 = { id: '3', kind: 0, pubkey: 'abc', content: '', created_at: 2, sig: '', tags: [] }; + const event3 = { id: '4', kind: 0, pubkey: 'def', content: '', created_at: 0, sig: '', tags: [] }; + + assertEquals(EventSet.eventReplaces(event1, event0), true); + assertEquals(EventSet.eventReplaces(event2, event0), true); + assertEquals(EventSet.eventReplaces(event2, event1), true); + + assertEquals(EventSet.eventReplaces(event0, event1), false); + assertEquals(EventSet.eventReplaces(event0, event2), false); + assertEquals(EventSet.eventReplaces(event1, event2), false); + + assertEquals(EventSet.eventReplaces(event3, event1), false); + assertEquals(EventSet.eventReplaces(event1, event3), false); +}); + +Deno.test('EventSet.eventReplaces (parameterized)', () => { + const event0 = { id: '1', kind: 30000, pubkey: 'abc', content: '', created_at: 0, sig: '', tags: [['d', 'a']] }; + const event1 = { id: '2', kind: 30000, pubkey: 'abc', content: '', created_at: 1, sig: '', tags: [['d', 'a']] }; + const event2 = { id: '3', kind: 30000, pubkey: 'abc', content: '', created_at: 2, sig: '', tags: [['d', 'a']] }; + + assertEquals(EventSet.eventReplaces(event1, event0), true); + assertEquals(EventSet.eventReplaces(event2, event0), true); + assertEquals(EventSet.eventReplaces(event2, event1), true); + + assertEquals(EventSet.eventReplaces(event0, event1), false); + assertEquals(EventSet.eventReplaces(event0, event2), false); + assertEquals(EventSet.eventReplaces(event1, event2), false); +}); diff --git a/src/utils/event-set.ts b/src/utils/event-set.ts new file mode 100644 index 0000000..92ac417 --- /dev/null +++ b/src/utils/event-set.ts @@ -0,0 +1,77 @@ +import { type Event } from '@/deps.ts'; +import { isParameterizedReplaceableKind, isReplaceableKind } from '@/kinds.ts'; + +/** In-memory store for Nostr events with replaceable event functionality. */ +class EventSet implements Set { + #map = new Map(); + + get size() { + return this.#map.size; + } + + add(event: E): this { + if (isReplaceableKind(event.kind) || isParameterizedReplaceableKind(event.kind)) { + for (const e of this.values()) { + if (EventSet.eventReplaces(event, e)) { + this.delete(e); + } + } + } + this.#map.set(event.id, event); + return this; + } + + clear(): void { + this.#map.clear(); + } + + delete(event: E): boolean { + return this.#map.delete(event.id); + } + + forEach(callbackfn: (event: E, key: E, set: Set) => void, thisArg?: any): void { + return this.#map.forEach((event, _id) => callbackfn(event, event, this), thisArg); + } + + has(event: E): boolean { + return this.#map.has(event.id); + } + + *entries(): IterableIterator<[E, E]> { + for (const event of this.#map.values()) { + yield [event, event]; + } + } + + keys(): IterableIterator { + return this.#map.values(); + } + + values(): IterableIterator { + return this.#map.values(); + } + + [Symbol.iterator](): IterableIterator { + return this.#map.values(); + } + + [Symbol.toStringTag]: string = 'EventSet'; + + /** Returns true if both events are replaceable, belong to the same pubkey (and `d` tag, for parameterized events), and the first event is newer than the second one. */ + static eventReplaces(event: Event, event2: Event): boolean { + if (isReplaceableKind(event.kind)) { + return event.kind === event2.kind && event.pubkey === event2.pubkey && event.created_at > event2.created_at; + } else if (isParameterizedReplaceableKind(event.kind)) { + const d = event.tags.find(([name]) => name === 'd')?.[1] || ''; + const d2 = event2.tags.find(([name]) => name === 'd')?.[1] || ''; + + return event.kind === event2.kind && + event.pubkey === event2.pubkey && + d === d2 && + event.created_at > event2.created_at; + } + return false; + } +} + +export { EventSet }; From 384bb729b4c3e053e3cae99695e9aa7c3a7cafc5 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Wed, 3 Jan 2024 20:59:46 -0600 Subject: [PATCH 11/23] EventsDB: fix `this` binding of usersQuery --- src/storages/events-db.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/storages/events-db.ts b/src/storages/events-db.ts index 9f5391d..4fcaf49 100644 --- a/src/storages/events-db.ts +++ b/src/storages/events-db.ts @@ -187,7 +187,7 @@ class EventsDB implements EventStore { if (typeof filter.local === 'boolean') { query = query - .leftJoin(this.usersQuery, (join) => join.onRef('users.d_tag', '=', 'events.pubkey')) + .leftJoin(() => this.usersQuery(), (join) => join.onRef('users.d_tag', '=', 'events.pubkey')) .where('users.d_tag', filter.local ? 'is not' : 'is', null); } From 8ab0fefbf2e9f7bfc04a58e1545efd5420fd0c89 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Wed, 3 Jan 2024 21:19:44 -0600 Subject: [PATCH 12/23] Memorelay: use EventSet data structure --- src/storages/memorelay.ts | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/src/storages/memorelay.ts b/src/storages/memorelay.ts index b0f7104..fab00c0 100644 --- a/src/storages/memorelay.ts +++ b/src/storages/memorelay.ts @@ -1,6 +1,7 @@ import { Debug, type Event, type Filter, LRUCache, matchFilter } from '@/deps.ts'; import { normalizeFilters } from '@/filter.ts'; import { type EventStore, type GetEventsOpts } from '@/store.ts'; +import { EventSet } from '@/utils/event-set.ts'; /** In-memory data store for events. */ class Memorelay implements EventStore { @@ -35,14 +36,14 @@ class Memorelay implements EventStore { this.#debug('REQ', JSON.stringify(filters)); /** Event results to return. */ - const results: Event[] = []; + const results = new EventSet>(); /** Number of times an event has been added to results for each filter. */ const filterUsages: number[] = []; /** Check if all filters have been satisfied. */ function checkSatisfied() { - return results.length >= (opts.limit ?? Infinity) || + return results.size >= (opts.limit ?? Infinity) || filters.every((filter, index) => filter.limit && (filterUsages[index] >= filter.limit)); } @@ -52,7 +53,7 @@ class Memorelay implements EventStore { for (const id of filter.ids) { const event = this.#cache.get(id); if (event && matchFilter(filter, event)) { - results.push(event as Event); + results.add(event as Event); } } filterUsages[index] = Infinity; @@ -61,7 +62,7 @@ class Memorelay implements EventStore { // Return early if all filters are satisfied. if (checkSatisfied()) { - return Promise.resolve(results); + return Promise.resolve([...results]); } // Seek through all events in memory. @@ -73,7 +74,7 @@ class Memorelay implements EventStore { if (usage >= limit) { return; } else if (matchFilter(filter, event)) { - results.push(event as Event); + results.add(event as Event); this.#cache.get(event.id); filterUsages[index] = usage + 1; } @@ -87,7 +88,7 @@ class Memorelay implements EventStore { } } - return Promise.resolve(results); + return Promise.resolve([...results]); } /** Insert an event into memory. */ From a4bc951eee09a25f8c4f6d79b2df748a4ff72dbd Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Wed, 3 Jan 2024 21:39:54 -0600 Subject: [PATCH 13/23] Add `supportedNips` to all storages --- src/client.ts | 1 + src/reqmeister.ts | 2 ++ src/storages/events-db.ts | 3 +++ src/storages/memorelay.ts | 8 +++----- src/storages/optimizer.ts | 2 ++ src/store.ts | 2 ++ 6 files changed, 13 insertions(+), 5 deletions(-) diff --git a/src/client.ts b/src/client.ts index 511cbc7..75b216f 100644 --- a/src/client.ts +++ b/src/client.ts @@ -59,6 +59,7 @@ function storeEvent(event: Event, opts: StoreEventOpts = {}): Promise { } const client: EventStore = { + supportedNips: [1], getEvents, storeEvent, countEvents: () => Promise.reject(new Error('COUNT not implemented')), diff --git a/src/reqmeister.ts b/src/reqmeister.ts index 2196e39..8cc7e0d 100644 --- a/src/reqmeister.ts +++ b/src/reqmeister.ts @@ -32,6 +32,8 @@ class Reqmeister extends EventEmitter<{ [filterId: string]: (event: Event) => an #promise!: Promise; #resolve!: () => void; + supportedNips = []; + constructor(opts: ReqmeisterOpts = {}) { super(); this.#opts = opts; diff --git a/src/storages/events-db.ts b/src/storages/events-db.ts index 4fcaf49..c5536da 100644 --- a/src/storages/events-db.ts +++ b/src/storages/events-db.ts @@ -56,6 +56,9 @@ class EventsDB implements EventStore { #db: Kysely; #debug = Debug('ditto:db:events'); + /** NIPs supported by this storage method. */ + supportedNips = [1, 45, 50]; + constructor(db: Kysely) { this.#db = db; } diff --git a/src/storages/memorelay.ts b/src/storages/memorelay.ts index fab00c0..85f4826 100644 --- a/src/storages/memorelay.ts +++ b/src/storages/memorelay.ts @@ -8,15 +8,13 @@ class Memorelay implements EventStore { #debug = Debug('ditto:memorelay'); #cache: LRUCache; + /** NIPs supported by this storage method. */ + supportedNips = [1, 45]; + constructor(...args: ConstructorParameters>) { this.#cache = new LRUCache(...args); } - /** NIPs supported by this storage method. */ - get supportedNips(): number[] { - return [1]; - } - /** Iterate stored events. */ *#events(): Generator { for (const event of this.#cache.values()) { diff --git a/src/storages/optimizer.ts b/src/storages/optimizer.ts index 4b4ed9e..f70fa7f 100644 --- a/src/storages/optimizer.ts +++ b/src/storages/optimizer.ts @@ -13,6 +13,8 @@ class Optimizer implements EventStore { #cache: EventStore; #client: EventStore; + supportedNips = [1]; + constructor(opts: OptimizerOpts) { this.#db = opts.db; this.#cache = opts.cache; diff --git a/src/store.ts b/src/store.ts index 5567406..c89e9cc 100644 --- a/src/store.ts +++ b/src/store.ts @@ -33,6 +33,8 @@ interface DittoEvent extends Event { /** Storage interface for Nostr events. */ interface EventStore { + /** Indicates NIPs supported by this data store, similar to NIP-11. For example, `50` would indicate support for `search` filters. */ + supportedNips: readonly number[]; /** Add an event to the store. */ storeEvent(event: Event, opts?: StoreEventOpts): Promise; /** Get events from filters. */ From b3e4b794391c50f789e6ba2685611eb30d193928 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Wed, 3 Jan 2024 21:55:53 -0600 Subject: [PATCH 14/23] Delete db/events.ts, db/memorelay.ts, move to @/storages.ts --- scripts/db.ts | 2 +- src/controllers/api/accounts.ts | 2 +- src/controllers/api/blocks.ts | 2 +- src/controllers/api/bookmarks.ts | 2 +- src/controllers/api/notifications.ts | 2 +- src/controllers/api/pleroma.ts | 2 +- src/controllers/api/search.ts | 2 +- src/controllers/api/timelines.ts | 2 +- src/controllers/nostr/relay.ts | 2 +- src/db/events.ts | 6 ------ src/db/memorelay.ts | 10 ---------- src/db/users.ts | 2 +- src/pipeline.ts | 3 +-- src/queries.ts | 3 +-- src/stats.ts | 2 +- src/storages.ts | 11 +++++++++++ src/utils/api.ts | 2 +- src/views.ts | 2 +- src/views/mastodon/relationships.ts | 2 +- src/views/mastodon/statuses.ts | 4 ++-- 20 files changed, 29 insertions(+), 36 deletions(-) delete mode 100644 src/db/events.ts delete mode 100644 src/db/memorelay.ts create mode 100644 src/storages.ts diff --git a/scripts/db.ts b/scripts/db.ts index cdc5174..cbaca91 100644 --- a/scripts/db.ts +++ b/scripts/db.ts @@ -1,8 +1,8 @@ import { Conf } from '@/config.ts'; import { db } from '@/db.ts'; -import { eventsDB } from '@/db/events.ts'; import { type Kysely } from '@/deps.ts'; import { signAdminEvent } from '@/sign.ts'; +import { eventsDB } from '@/storages.ts'; interface DB { users: { diff --git a/src/controllers/api/accounts.ts b/src/controllers/api/accounts.ts index 8316ede..34518cd 100644 --- a/src/controllers/api/accounts.ts +++ b/src/controllers/api/accounts.ts @@ -1,12 +1,12 @@ import { type AppController } from '@/app.ts'; import { Conf } from '@/config.ts'; -import { eventsDB } from '@/db/events.ts'; import { insertUser } from '@/db/users.ts'; import { findReplyTag, nip19, z } from '@/deps.ts'; import { type DittoFilter } from '@/filter.ts'; import { getAuthor, getFollowedPubkeys } from '@/queries.ts'; import { booleanParamSchema, fileSchema } from '@/schema.ts'; import { jsonMetaContentSchema } from '@/schemas/nostr.ts'; +import { eventsDB } from '@/storages.ts'; import { addTag, deleteTag, getTagSet } from '@/tags.ts'; import { uploadFile } from '@/upload.ts'; import { lookupAccount, nostrNow } from '@/utils.ts'; diff --git a/src/controllers/api/blocks.ts b/src/controllers/api/blocks.ts index 2ff4f3e..b45e1c6 100644 --- a/src/controllers/api/blocks.ts +++ b/src/controllers/api/blocks.ts @@ -1,5 +1,5 @@ import { type AppController } from '@/app.ts'; -import { eventsDB } from '@/db/events.ts'; +import { eventsDB } from '@/storages.ts'; import { getTagSet } from '@/tags.ts'; import { renderAccounts } from '@/views.ts'; diff --git a/src/controllers/api/bookmarks.ts b/src/controllers/api/bookmarks.ts index 78edff0..81cd011 100644 --- a/src/controllers/api/bookmarks.ts +++ b/src/controllers/api/bookmarks.ts @@ -1,5 +1,5 @@ import { type AppController } from '@/app.ts'; -import { eventsDB } from '@/db/events.ts'; +import { eventsDB } from '@/storages.ts'; import { getTagSet } from '@/tags.ts'; import { renderStatuses } from '@/views.ts'; diff --git a/src/controllers/api/notifications.ts b/src/controllers/api/notifications.ts index f50f891..43c974a 100644 --- a/src/controllers/api/notifications.ts +++ b/src/controllers/api/notifications.ts @@ -1,5 +1,5 @@ import { type AppController } from '@/app.ts'; -import { eventsDB } from '@/db/events.ts'; +import { eventsDB } from '@/storages.ts'; import { paginated, paginationSchema } from '@/utils/api.ts'; import { renderNotification } from '@/views/mastodon/notifications.ts'; diff --git a/src/controllers/api/pleroma.ts b/src/controllers/api/pleroma.ts index 3b84227..04370e4 100644 --- a/src/controllers/api/pleroma.ts +++ b/src/controllers/api/pleroma.ts @@ -1,7 +1,7 @@ import { type AppController } from '@/app.ts'; -import { eventsDB } from '@/db/events.ts'; import { z } from '@/deps.ts'; import { configSchema, elixirTupleSchema } from '@/schemas/pleroma-api.ts'; +import { eventsDB } from '@/storages.ts'; import { createAdminEvent } from '@/utils/api.ts'; import { Conf } from '@/config.ts'; diff --git a/src/controllers/api/search.ts b/src/controllers/api/search.ts index a514d66..1f34b07 100644 --- a/src/controllers/api/search.ts +++ b/src/controllers/api/search.ts @@ -1,9 +1,9 @@ import { AppController } from '@/app.ts'; -import { eventsDB } from '@/db/events.ts'; import { type Event, nip19, z } from '@/deps.ts'; import { type DittoFilter } from '@/filter.ts'; import { booleanParamSchema } from '@/schema.ts'; import { nostrIdSchema } from '@/schemas/nostr.ts'; +import { eventsDB } from '@/storages.ts'; import { dedupeEvents } from '@/utils.ts'; import { lookupNip05Cached } from '@/utils/nip05.ts'; import { renderAccount } from '@/views/mastodon/accounts.ts'; diff --git a/src/controllers/api/timelines.ts b/src/controllers/api/timelines.ts index 6d93280..47496a0 100644 --- a/src/controllers/api/timelines.ts +++ b/src/controllers/api/timelines.ts @@ -1,4 +1,4 @@ -import { eventsDB } from '@/db/events.ts'; +import { eventsDB } from '@/storages.ts'; import { z } from '@/deps.ts'; import { type DittoFilter } from '@/filter.ts'; import { getFeedPubkeys } from '@/queries.ts'; diff --git a/src/controllers/nostr/relay.ts b/src/controllers/nostr/relay.ts index a1f4de0..f1c3511 100644 --- a/src/controllers/nostr/relay.ts +++ b/src/controllers/nostr/relay.ts @@ -1,5 +1,5 @@ import { relayInfoController } from '@/controllers/nostr/relay-info.ts'; -import { eventsDB } from '@/db/events.ts'; +import { eventsDB } from '@/storages.ts'; import * as pipeline from '@/pipeline.ts'; import { jsonSchema } from '@/schema.ts'; import { diff --git a/src/db/events.ts b/src/db/events.ts deleted file mode 100644 index 7b06cdc..0000000 --- a/src/db/events.ts +++ /dev/null @@ -1,6 +0,0 @@ -import { db } from '@/db.ts'; -import { EventsDB } from '@/storages/events-db.ts'; - -const eventsDB = new EventsDB(db); - -export { eventsDB }; diff --git a/src/db/memorelay.ts b/src/db/memorelay.ts deleted file mode 100644 index 3550346..0000000 --- a/src/db/memorelay.ts +++ /dev/null @@ -1,10 +0,0 @@ -import { Memorelay } from '@/storages/memorelay.ts'; - -/** In-memory data store for events using microfilters. */ -const memorelay = new Memorelay({ - max: 3000, - maxEntrySize: 5000, - sizeCalculation: (event) => JSON.stringify(event).length, -}); - -export { memorelay }; diff --git a/src/db/users.ts b/src/db/users.ts index 6b7f9ef..62340e1 100644 --- a/src/db/users.ts +++ b/src/db/users.ts @@ -1,8 +1,8 @@ import { Conf } from '@/config.ts'; import { Debug, type Filter } from '@/deps.ts'; -import { eventsDB } from '@/db/events.ts'; import * as pipeline from '@/pipeline.ts'; import { signAdminEvent } from '@/sign.ts'; +import { eventsDB } from '@/storages.ts'; const debug = Debug('ditto:users'); diff --git a/src/pipeline.ts b/src/pipeline.ts index 69ae941..a237b72 100644 --- a/src/pipeline.ts +++ b/src/pipeline.ts @@ -1,7 +1,5 @@ import { client } from '@/client.ts'; import { Conf } from '@/config.ts'; -import { eventsDB } from '@/db/events.ts'; -import { memorelay } from '@/db/memorelay.ts'; import { addRelays } from '@/db/relays.ts'; import { deleteAttachedMedia } from '@/db/unattached-media.ts'; import { findUser } from '@/db/users.ts'; @@ -10,6 +8,7 @@ import { isEphemeralKind } from '@/kinds.ts'; import { isLocallyFollowed } from '@/queries.ts'; import { reqmeister } from '@/reqmeister.ts'; import { updateStats } from '@/stats.ts'; +import { eventsDB, memorelay } from '@/storages.ts'; import { Sub } from '@/subs.ts'; import { getTagSet } from '@/tags.ts'; import { type EventData } from '@/types.ts'; diff --git a/src/queries.ts b/src/queries.ts index cfeeb36..29c07a7 100644 --- a/src/queries.ts +++ b/src/queries.ts @@ -1,5 +1,4 @@ -import { eventsDB } from '@/db/events.ts'; -import { memorelay } from '@/db/memorelay.ts'; +import { eventsDB, memorelay } from '@/storages.ts'; import { Debug, type Event, findReplyTag } from '@/deps.ts'; import { type AuthorMicrofilter, type DittoFilter, type IdMicrofilter, type Relation } from '@/filter.ts'; import { reqmeister } from '@/reqmeister.ts'; diff --git a/src/stats.ts b/src/stats.ts index e09791f..fd08cf5 100644 --- a/src/stats.ts +++ b/src/stats.ts @@ -1,6 +1,6 @@ import { type AuthorStatsRow, db, type DittoDB, type EventStatsRow } from '@/db.ts'; -import { eventsDB } from '@/db/events.ts'; import { Debug, type Event, findReplyTag, type InsertQueryBuilder } from '@/deps.ts'; +import { eventsDB } from '@/storages.ts'; type AuthorStat = keyof Omit; type EventStat = keyof Omit; diff --git a/src/storages.ts b/src/storages.ts new file mode 100644 index 0000000..15aadc6 --- /dev/null +++ b/src/storages.ts @@ -0,0 +1,11 @@ +import { db } from '@/db.ts'; +import { EventsDB } from '@/storages/events-db.ts'; +import { Memorelay } from '@/storages/memorelay.ts'; + +/** SQLite database to store events this Ditto server cares about. */ +const eventsDB = new EventsDB(db); + +/** In-memory data store for cached events. */ +const memorelay = new Memorelay({ max: 3000 }); + +export { eventsDB, memorelay }; diff --git a/src/utils/api.ts b/src/utils/api.ts index 9a3eaf0..dbedc6c 100644 --- a/src/utils/api.ts +++ b/src/utils/api.ts @@ -13,8 +13,8 @@ import { } from '@/deps.ts'; import * as pipeline from '@/pipeline.ts'; import { signAdminEvent, signEvent } from '@/sign.ts'; +import { eventsDB } from '@/storages.ts'; import { nostrNow } from '@/utils.ts'; -import { eventsDB } from '@/db/events.ts'; const debug = Debug('ditto:api'); diff --git a/src/views.ts b/src/views.ts index c90f26d..9ade600 100644 --- a/src/views.ts +++ b/src/views.ts @@ -1,6 +1,6 @@ import { AppContext } from '@/app.ts'; -import { eventsDB } from '@/db/events.ts'; import { type Filter } from '@/deps.ts'; +import { eventsDB } from '@/storages.ts'; import { renderAccount } from '@/views/mastodon/accounts.ts'; import { renderStatus } from '@/views/mastodon/statuses.ts'; import { paginated, paginationSchema } from '@/utils/api.ts'; diff --git a/src/views/mastodon/relationships.ts b/src/views/mastodon/relationships.ts index ca2778b..43ed7b8 100644 --- a/src/views/mastodon/relationships.ts +++ b/src/views/mastodon/relationships.ts @@ -1,4 +1,4 @@ -import { eventsDB } from '@/db/events.ts'; +import { eventsDB } from '@/storages.ts'; import { hasTag } from '@/tags.ts'; async function renderRelationship(sourcePubkey: string, targetPubkey: string) { diff --git a/src/views/mastodon/statuses.ts b/src/views/mastodon/statuses.ts index a951e0a..c0b65c2 100644 --- a/src/views/mastodon/statuses.ts +++ b/src/views/mastodon/statuses.ts @@ -1,12 +1,12 @@ import { isCWTag } from 'https://gitlab.com/soapbox-pub/mostr/-/raw/c67064aee5ade5e01597c6d23e22e53c628ef0e2/src/nostr/tags.ts'; import { Conf } from '@/config.ts'; -import { eventsDB } from '@/db/events.ts'; import { findReplyTag, nip19 } from '@/deps.ts'; import { getMediaLinks, parseNoteContent } from '@/note.ts'; import { getAuthor } from '@/queries.ts'; import { jsonMediaDataSchema } from '@/schemas/nostr.ts'; -import { DittoEvent } from '@/store.ts'; +import { eventsDB } from '@/storages.ts'; +import { type DittoEvent } from '@/store.ts'; import { nostrDate } from '@/utils.ts'; import { unfurlCardCached } from '@/utils/unfurl.ts'; import { accountFromPubkey, renderAccount } from '@/views/mastodon/accounts.ts'; From 96dd6356615ab00a271983b5c9b5306065991c86 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Wed, 3 Jan 2024 22:02:34 -0600 Subject: [PATCH 15/23] @/store.ts -> @/storages/types.ts --- src/client.ts | 2 +- src/queries.ts | 2 +- src/reqmeister.ts | 2 +- src/storages/events-db.ts | 3 ++- src/storages/memorelay.ts | 3 ++- src/storages/optimizer.ts | 3 ++- src/{store.ts => storages/types.ts} | 0 src/views/mastodon/accounts.ts | 2 +- src/views/mastodon/statuses.ts | 2 +- 9 files changed, 11 insertions(+), 8 deletions(-) rename src/{store.ts => storages/types.ts} (100%) diff --git a/src/client.ts b/src/client.ts index 75b216f..993242f 100644 --- a/src/client.ts +++ b/src/client.ts @@ -1,7 +1,7 @@ import { Debug, type Event, type Filter, matchFilters } from '@/deps.ts'; import * as pipeline from '@/pipeline.ts'; import { activeRelays, pool } from '@/pool.ts'; -import { type EventStore, type GetEventsOpts, type StoreEventOpts } from '@/store.ts'; +import { type EventStore, type GetEventsOpts, type StoreEventOpts } from '@/storages/types.ts'; const debug = Debug('ditto:client'); diff --git a/src/queries.ts b/src/queries.ts index 29c07a7..56a8b67 100644 --- a/src/queries.ts +++ b/src/queries.ts @@ -2,7 +2,7 @@ import { eventsDB, memorelay } from '@/storages.ts'; import { Debug, type Event, findReplyTag } from '@/deps.ts'; import { type AuthorMicrofilter, type DittoFilter, type IdMicrofilter, type Relation } from '@/filter.ts'; import { reqmeister } from '@/reqmeister.ts'; -import { type DittoEvent } from '@/store.ts'; +import { type DittoEvent } from '@/storages/types.ts'; import { getTagSet } from '@/tags.ts'; const debug = Debug('ditto:queries'); diff --git a/src/reqmeister.ts b/src/reqmeister.ts index 8cc7e0d..48b1370 100644 --- a/src/reqmeister.ts +++ b/src/reqmeister.ts @@ -8,7 +8,7 @@ import { isMicrofilter, type MicroFilter, } from '@/filter.ts'; -import { type EventStore, GetEventsOpts } from '@/store.ts'; +import { type EventStore, GetEventsOpts } from '@/storages/types.ts'; import { Time } from '@/utils/time.ts'; const debug = Debug('ditto:reqmeister'); diff --git a/src/storages/events-db.ts b/src/storages/events-db.ts index c5536da..143f4f6 100644 --- a/src/storages/events-db.ts +++ b/src/storages/events-db.ts @@ -4,9 +4,10 @@ import { Debug, type Event, Kysely, type SelectQueryBuilder } from '@/deps.ts'; import { type DittoFilter } from '@/filter.ts'; import { isDittoInternalKind, isParameterizedReplaceableKind, isReplaceableKind } from '@/kinds.ts'; import { jsonMetaContentSchema } from '@/schemas/nostr.ts'; -import { type DittoEvent, EventStore, type GetEventsOpts, type StoreEventOpts } from '@/store.ts'; import { isNostrId, isURL } from '@/utils.ts'; +import { type DittoEvent, EventStore, type GetEventsOpts, type StoreEventOpts } from './types.ts'; + /** Function to decide whether or not to index a tag. */ type TagCondition = ({ event, count, value }: { event: Event; diff --git a/src/storages/memorelay.ts b/src/storages/memorelay.ts index 85f4826..bfa2a32 100644 --- a/src/storages/memorelay.ts +++ b/src/storages/memorelay.ts @@ -1,8 +1,9 @@ import { Debug, type Event, type Filter, LRUCache, matchFilter } from '@/deps.ts'; import { normalizeFilters } from '@/filter.ts'; -import { type EventStore, type GetEventsOpts } from '@/store.ts'; import { EventSet } from '@/utils/event-set.ts'; +import { type EventStore, type GetEventsOpts } from './types.ts'; + /** In-memory data store for events. */ class Memorelay implements EventStore { #debug = Debug('ditto:memorelay'); diff --git a/src/storages/optimizer.ts b/src/storages/optimizer.ts index f70fa7f..5bd4af8 100644 --- a/src/storages/optimizer.ts +++ b/src/storages/optimizer.ts @@ -1,7 +1,8 @@ -import { type DittoEvent, type EventStore, type GetEventsOpts, type StoreEventOpts } from '@/store.ts'; import { type DittoFilter, normalizeFilters } from '@/filter.ts'; import { EventSet } from '@/utils/event-set.ts'; +import { type DittoEvent, type EventStore, type GetEventsOpts, type StoreEventOpts } from './types.ts'; + interface OptimizerOpts { db: EventStore; cache: EventStore; diff --git a/src/store.ts b/src/storages/types.ts similarity index 100% rename from src/store.ts rename to src/storages/types.ts diff --git a/src/views/mastodon/accounts.ts b/src/views/mastodon/accounts.ts index eae3dd7..620a1b5 100644 --- a/src/views/mastodon/accounts.ts +++ b/src/views/mastodon/accounts.ts @@ -2,8 +2,8 @@ import { Conf } from '@/config.ts'; import { findUser } from '@/db/users.ts'; import { lodash, nip19, type UnsignedEvent } from '@/deps.ts'; import { jsonMetaContentSchema } from '@/schemas/nostr.ts'; +import { type DittoEvent } from '@/storages/types.ts'; import { verifyNip05Cached } from '@/utils/nip05.ts'; -import { type DittoEvent } from '@/store.ts'; import { Nip05, nostrDate, nostrNow, parseNip05 } from '@/utils.ts'; import { renderEmojis } from '@/views/mastodon/emojis.ts'; diff --git a/src/views/mastodon/statuses.ts b/src/views/mastodon/statuses.ts index c0b65c2..9e45f49 100644 --- a/src/views/mastodon/statuses.ts +++ b/src/views/mastodon/statuses.ts @@ -6,7 +6,7 @@ import { getMediaLinks, parseNoteContent } from '@/note.ts'; import { getAuthor } from '@/queries.ts'; import { jsonMediaDataSchema } from '@/schemas/nostr.ts'; import { eventsDB } from '@/storages.ts'; -import { type DittoEvent } from '@/store.ts'; +import { type DittoEvent } from '@/storages/types.ts'; import { nostrDate } from '@/utils.ts'; import { unfurlCardCached } from '@/utils/unfurl.ts'; import { accountFromPubkey, renderAccount } from '@/views/mastodon/accounts.ts'; From b71124cc30bbecb550e9454e367cf629479c8e80 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Wed, 3 Jan 2024 22:15:20 -0600 Subject: [PATCH 16/23] EventSet: event2 -> target --- src/utils/event-set.ts | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/utils/event-set.ts b/src/utils/event-set.ts index 92ac417..3fd06f0 100644 --- a/src/utils/event-set.ts +++ b/src/utils/event-set.ts @@ -57,18 +57,18 @@ class EventSet implements Set { [Symbol.toStringTag]: string = 'EventSet'; - /** Returns true if both events are replaceable, belong to the same pubkey (and `d` tag, for parameterized events), and the first event is newer than the second one. */ - static eventReplaces(event: Event, event2: Event): boolean { + /** Returns true if both events are replaceable, belong to the same kind and pubkey (and `d` tag, for parameterized events), and the first event is newer than the second one. */ + static eventReplaces(event: Event, target: Event): boolean { if (isReplaceableKind(event.kind)) { - return event.kind === event2.kind && event.pubkey === event2.pubkey && event.created_at > event2.created_at; + return event.kind === target.kind && event.pubkey === target.pubkey && event.created_at > target.created_at; } else if (isParameterizedReplaceableKind(event.kind)) { const d = event.tags.find(([name]) => name === 'd')?.[1] || ''; - const d2 = event2.tags.find(([name]) => name === 'd')?.[1] || ''; + const d2 = target.tags.find(([name]) => name === 'd')?.[1] || ''; - return event.kind === event2.kind && - event.pubkey === event2.pubkey && + return event.kind === target.kind && + event.pubkey === target.pubkey && d === d2 && - event.created_at > event2.created_at; + event.created_at > target.created_at; } return false; } From 9c0a16ebcd246b77e587d48e3b91321f1acba186 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Wed, 3 Jan 2024 22:20:30 -0600 Subject: [PATCH 17/23] client: use EventSet, normalizeFilters --- src/client.ts | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/src/client.ts b/src/client.ts index 993242f..34d4544 100644 --- a/src/client.ts +++ b/src/client.ts @@ -1,18 +1,23 @@ import { Debug, type Event, type Filter, matchFilters } from '@/deps.ts'; +import { normalizeFilters } from '@/filter.ts'; import * as pipeline from '@/pipeline.ts'; import { activeRelays, pool } from '@/pool.ts'; import { type EventStore, type GetEventsOpts, type StoreEventOpts } from '@/storages/types.ts'; +import { EventSet } from '@/utils/event-set.ts'; const debug = Debug('ditto:client'); /** Get events from a NIP-01 filter. */ function getEvents(filters: Filter[], opts: GetEventsOpts = {}): Promise[]> { + filters = normalizeFilters(filters); + if (opts.signal?.aborted) return Promise.resolve([]); if (!filters.length) return Promise.resolve([]); + debug('REQ', JSON.stringify(filters)); return new Promise((resolve) => { - const results: Event[] = []; + const results = new EventSet>(); const unsub = pool.subscribe( filters, @@ -20,9 +25,9 @@ function getEvents(filters: Filter[], opts: GetEventsOpts = (event: Event | null) => { if (event && matchFilters(filters, event)) { pipeline.handleEvent(event).catch(() => {}); - results.push({ + results.add({ id: event.id, - kind: event.kind, + kind: event.kind as K, pubkey: event.pubkey, content: event.content, tags: event.tags, @@ -30,21 +35,21 @@ function getEvents(filters: Filter[], opts: GetEventsOpts = sig: event.sig, }); } - if (typeof opts.limit === 'number' && results.length >= opts.limit) { + if (typeof opts.limit === 'number' && results.size >= opts.limit) { unsub(); - resolve(results as Event[]); + resolve([...results]); } }, undefined, () => { unsub(); - resolve(results as Event[]); + resolve([...results]); }, ); opts.signal?.addEventListener('abort', () => { unsub(); - resolve(results as Event[]); + resolve([...results]); }); }); } From ee7347df1091d713c8ae6e87d756049e80aacdf3 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Thu, 4 Jan 2024 00:07:45 -0600 Subject: [PATCH 18/23] SearchStore works! --- src/config.ts | 4 +++ src/controllers/api/search.ts | 6 ++-- src/storages.ts | 10 +++++- src/storages/search-store.ts | 67 +++++++++++++++++++++++++++++++++++ 4 files changed, 83 insertions(+), 4 deletions(-) create mode 100644 src/storages/search-store.ts diff --git a/src/config.ts b/src/config.ts index 246bf5a..8c2626a 100644 --- a/src/config.ts +++ b/src/config.ts @@ -42,6 +42,10 @@ const Conf = { const { protocol, host } = Conf.url; return `${protocol === 'https:' ? 'wss:' : 'ws:'}//${host}/relay`; }, + /** Relay to use for NIP-50 `search` queries. */ + get searchRelay() { + return Deno.env.get('SEARCH_RELAY'); + }, /** Origin of the Ditto server, including the protocol and port. */ get localDomain() { return Deno.env.get('LOCAL_DOMAIN') || 'http://localhost:8000'; diff --git a/src/controllers/api/search.ts b/src/controllers/api/search.ts index 1f34b07..2b640d5 100644 --- a/src/controllers/api/search.ts +++ b/src/controllers/api/search.ts @@ -3,7 +3,7 @@ import { type Event, nip19, z } from '@/deps.ts'; import { type DittoFilter } from '@/filter.ts'; import { booleanParamSchema } from '@/schema.ts'; import { nostrIdSchema } from '@/schemas/nostr.ts'; -import { eventsDB } from '@/storages.ts'; +import { searchStore } from '@/storages.ts'; import { dedupeEvents } from '@/utils.ts'; import { lookupNip05Cached } from '@/utils/nip05.ts'; import { renderAccount } from '@/views/mastodon/accounts.ts'; @@ -76,7 +76,7 @@ function searchEvents({ q, type, limit, account_id }: SearchQuery): Promise { const filters = await getLookupFilters(query); - const [event] = await eventsDB.getEvents(filters, { limit: 1, signal }); + const [event] = await searchStore.getEvents(filters, { limit: 1, signal }); return event; } diff --git a/src/storages.ts b/src/storages.ts index 15aadc6..045a362 100644 --- a/src/storages.ts +++ b/src/storages.ts @@ -1,6 +1,8 @@ +import { Conf } from '@/config.ts'; import { db } from '@/db.ts'; import { EventsDB } from '@/storages/events-db.ts'; import { Memorelay } from '@/storages/memorelay.ts'; +import { SearchStore } from '@/storages/search-store.ts'; /** SQLite database to store events this Ditto server cares about. */ const eventsDB = new EventsDB(db); @@ -8,4 +10,10 @@ const eventsDB = new EventsDB(db); /** In-memory data store for cached events. */ const memorelay = new Memorelay({ max: 3000 }); -export { eventsDB, memorelay }; +/** Storage to use for remote search. */ +const searchStore = new SearchStore({ + relay: Conf.searchRelay, + fallback: eventsDB, +}); + +export { eventsDB, memorelay, searchStore }; diff --git a/src/storages/search-store.ts b/src/storages/search-store.ts new file mode 100644 index 0000000..7c58a5f --- /dev/null +++ b/src/storages/search-store.ts @@ -0,0 +1,67 @@ +import { NiceRelay } from 'https://gitlab.com/soapbox-pub/nostr-machina/-/raw/5f4fb59c90c092e5aa59c01e6556a4bec264c167/mod.ts'; + +import { Debug, type Event, type Filter } from '@/deps.ts'; +import { type DittoFilter } from '@/filter.ts'; +import { type DittoEvent, type EventStore, type GetEventsOpts, type StoreEventOpts } from '@/storages/types.ts'; +import { EventSet } from '@/utils/event-set.ts'; + +interface SearchStoreOpts { + relay: string | undefined; + fallback: EventStore; +} + +class SearchStore implements EventStore { + #debug = Debug('ditto:storages:search'); + + #fallback: EventStore; + #relay: NiceRelay | undefined; + + supportedNips = [50]; + + constructor(opts: SearchStoreOpts) { + this.#fallback = opts.fallback; + + if (opts.relay) { + this.#relay = new NiceRelay(opts.relay); + } + } + + storeEvent(_event: Event, _opts?: StoreEventOpts | undefined): Promise { + throw new Error('EVENT not implemented.'); + } + + async getEvents( + filters: DittoFilter[], + opts?: GetEventsOpts | undefined, + ): Promise[]> { + this.#debug('REQ', JSON.stringify(filters)); + + if (this.#relay) { + this.#debug(`Searching for "${filters[0]?.search}" at ${this.#relay.socket.url}...`); + + const sub = this.#relay.req(filters, opts); + sub.eoseSignal.onabort = () => sub.close(); + const events = new EventSet>(); + + for await (const event of sub) { + this.#debug('EVENT', JSON.stringify(event)); + events.add(event); + } + + return [...events]; + } else { + this.#debug(`Searching for "${filters[0]?.search}" locally...`); + return this.#fallback.getEvents(filters, opts); + } + } + + countEvents(_filters: Filter[]): Promise { + throw new Error('COUNT not implemented.'); + } + + deleteEvents(_filters: Filter[]): Promise { + throw new Error('DELETE not implemented.'); + } +} + +export { SearchStore }; From 2595f1aadfee517fc10ccb8ad7d37b762201e826 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Thu, 4 Jan 2024 00:20:22 -0600 Subject: [PATCH 19/23] SearchStorage: try fetching authors --- src/storages/search-store.ts | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/src/storages/search-store.ts b/src/storages/search-store.ts index 7c58a5f..e957946 100644 --- a/src/storages/search-store.ts +++ b/src/storages/search-store.ts @@ -1,7 +1,7 @@ import { NiceRelay } from 'https://gitlab.com/soapbox-pub/nostr-machina/-/raw/5f4fb59c90c092e5aa59c01e6556a4bec264c167/mod.ts'; import { Debug, type Event, type Filter } from '@/deps.ts'; -import { type DittoFilter } from '@/filter.ts'; +import { type DittoFilter, normalizeFilters } from '@/filter.ts'; import { type DittoEvent, type EventStore, type GetEventsOpts, type StoreEventOpts } from '@/storages/types.ts'; import { EventSet } from '@/utils/event-set.ts'; @@ -34,23 +34,32 @@ class SearchStore implements EventStore { filters: DittoFilter[], opts?: GetEventsOpts | undefined, ): Promise[]> { + filters = normalizeFilters(filters); this.#debug('REQ', JSON.stringify(filters)); + const query = filters[0]?.search; if (this.#relay) { - this.#debug(`Searching for "${filters[0]?.search}" at ${this.#relay.socket.url}...`); + this.#debug(`Searching for "${query}" at ${this.#relay.socket.url}...`); const sub = this.#relay.req(filters, opts); sub.eoseSignal.onabort = () => sub.close(); const events = new EventSet>(); for await (const event of sub) { - this.#debug('EVENT', JSON.stringify(event)); events.add(event); } + if (filters[0]?.relations?.includes('author')) { + const authorIds = new Set([...events].map((event) => event.pubkey)); + const authors = await this.getEvents([{ kinds: [0], authors: [...authorIds] }], opts); + for (const event of events) { + event.author = authors.find((author) => author.id === event.pubkey); + } + } + return [...events]; } else { - this.#debug(`Searching for "${filters[0]?.search}" locally...`); + this.#debug(`Searching for "${query}" locally...`); return this.#fallback.getEvents(filters, opts); } } From 6d80b43335c8ba5a8c4c347d79a9f8b90b4ea9aa Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Thu, 4 Jan 2024 00:23:37 -0600 Subject: [PATCH 20/23] SearchStore: bail early for empty filters --- src/storages/search-store.ts | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/storages/search-store.ts b/src/storages/search-store.ts index e957946..c5a0a3c 100644 --- a/src/storages/search-store.ts +++ b/src/storages/search-store.ts @@ -35,6 +35,10 @@ class SearchStore implements EventStore { opts?: GetEventsOpts | undefined, ): Promise[]> { filters = normalizeFilters(filters); + + if (opts?.signal?.aborted) return Promise.resolve([]); + if (!filters.length) return Promise.resolve([]); + this.#debug('REQ', JSON.stringify(filters)); const query = filters[0]?.search; From d170eb6d8e934f3f36ead9d54a8c73968e5018b9 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Thu, 4 Jan 2024 00:52:55 -0600 Subject: [PATCH 21/23] SearchStorage: make author relations work --- src/storages.ts | 13 +++++++++++-- src/storages/optimizer.ts | 15 +++++++++++++++ src/storages/search-store.ts | 2 +- 3 files changed, 27 insertions(+), 3 deletions(-) diff --git a/src/storages.ts b/src/storages.ts index 045a362..db00b51 100644 --- a/src/storages.ts +++ b/src/storages.ts @@ -2,7 +2,9 @@ import { Conf } from '@/config.ts'; import { db } from '@/db.ts'; import { EventsDB } from '@/storages/events-db.ts'; import { Memorelay } from '@/storages/memorelay.ts'; +import { Optimizer } from '@/storages/optimizer.ts'; import { SearchStore } from '@/storages/search-store.ts'; +import { reqmeister } from '@/reqmeister.ts'; /** SQLite database to store events this Ditto server cares about. */ const eventsDB = new EventsDB(db); @@ -10,10 +12,17 @@ const eventsDB = new EventsDB(db); /** In-memory data store for cached events. */ const memorelay = new Memorelay({ max: 3000 }); +/** Main Ditto storage adapter */ +const optimizer = new Optimizer({ + db: eventsDB, + cache: memorelay, + client: reqmeister, +}); + /** Storage to use for remote search. */ const searchStore = new SearchStore({ relay: Conf.searchRelay, - fallback: eventsDB, + fallback: optimizer, }); -export { eventsDB, memorelay, searchStore }; +export { eventsDB, memorelay, optimizer, searchStore }; diff --git a/src/storages/optimizer.ts b/src/storages/optimizer.ts index 5bd4af8..e0d18fb 100644 --- a/src/storages/optimizer.ts +++ b/src/storages/optimizer.ts @@ -1,3 +1,4 @@ +import { Debug } from '@/deps.ts'; import { type DittoFilter, normalizeFilters } from '@/filter.ts'; import { EventSet } from '@/utils/event-set.ts'; @@ -10,6 +11,8 @@ interface OptimizerOpts { } class Optimizer implements EventStore { + #debug = Debug('ditto:optimizer'); + #db: EventStore; #cache: EventStore; #client: EventStore; @@ -33,6 +36,8 @@ class Optimizer implements EventStore { filters: DittoFilter[], opts: GetEventsOpts | undefined = {}, ): Promise[]> { + this.#debug('REQ', JSON.stringify(filters)); + const { limit = Infinity } = opts; filters = normalizeFilters(filters); @@ -45,6 +50,7 @@ class Optimizer implements EventStore { for (let i = 0; i < filters.length; i++) { const filter = filters[i]; if (filter.ids) { + this.#debug(`Filter[${i}] is an IDs filter; querying cache...`); const ids = new Set(filter.ids); for (const event of await this.#cache.getEvents([filter], opts)) { ids.delete(event.id); @@ -59,18 +65,27 @@ class Optimizer implements EventStore { if (!filters.length) return getResults(); // Query the database for events. + this.#debug('Querying database...'); for (const dbEvent of await this.#db.getEvents(filters, opts)) { results.add(dbEvent); if (results.size >= limit) return getResults(); } + // We already searched the DB, so stop if this is a search filter. + if (filters.some((filter) => typeof filter.search === 'string')) { + this.#debug(`Bailing early for search filter: "${filters[0]?.search}"`); + return getResults(); + } + // Query the cache again. + this.#debug('Querying cache...'); for (const cacheEvent of await this.#cache.getEvents(filters, opts)) { results.add(cacheEvent); if (results.size >= limit) return getResults(); } // Finally, query the client. + this.#debug('Querying client...'); for (const clientEvent of await this.#client.getEvents(filters, opts)) { results.add(clientEvent); if (results.size >= limit) return getResults(); diff --git a/src/storages/search-store.ts b/src/storages/search-store.ts index c5a0a3c..7dd2de8 100644 --- a/src/storages/search-store.ts +++ b/src/storages/search-store.ts @@ -57,7 +57,7 @@ class SearchStore implements EventStore { const authorIds = new Set([...events].map((event) => event.pubkey)); const authors = await this.getEvents([{ kinds: [0], authors: [...authorIds] }], opts); for (const event of events) { - event.author = authors.find((author) => author.id === event.pubkey); + event.author = authors.find((author) => author.pubkey === event.pubkey); } } From 412f71599ade7a94d6e5b9a58a5337deb7e1290b Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Thu, 4 Jan 2024 01:44:56 -0600 Subject: [PATCH 22/23] Add hydrator module to hydrate relationships on events --- src/storages/hydrate.ts | 26 ++++++++++++++++++++++++++ src/storages/search-store.ts | 14 +++++--------- 2 files changed, 31 insertions(+), 9 deletions(-) create mode 100644 src/storages/hydrate.ts diff --git a/src/storages/hydrate.ts b/src/storages/hydrate.ts new file mode 100644 index 0000000..3edca14 --- /dev/null +++ b/src/storages/hydrate.ts @@ -0,0 +1,26 @@ +import { type DittoFilter } from '@/filter.ts'; +import { type DittoEvent, type EventStore } from '@/storages/types.ts'; + +interface HydrateEventOpts { + events: DittoEvent[]; + filters: DittoFilter[]; + storage: EventStore; +} + +/** Hydrate event relationships using the provided storage. */ +async function hydrateEvents(opts: HydrateEventOpts): Promise[]> { + const { events, filters, storage } = opts; + + if (filters.some((filter) => filter.relations?.includes('author'))) { + const pubkeys = new Set([...events].map((event) => event.pubkey)); + const authors = await storage.getEvents([{ kinds: [0], authors: [...pubkeys] }]); + + for (const event of events) { + event.author = authors.find((author) => author.pubkey === event.pubkey); + } + } + + return events; +} + +export { hydrateEvents }; diff --git a/src/storages/search-store.ts b/src/storages/search-store.ts index 7dd2de8..a289734 100644 --- a/src/storages/search-store.ts +++ b/src/storages/search-store.ts @@ -2,24 +2,28 @@ import { NiceRelay } from 'https://gitlab.com/soapbox-pub/nostr-machina/-/raw/5f import { Debug, type Event, type Filter } from '@/deps.ts'; import { type DittoFilter, normalizeFilters } from '@/filter.ts'; +import { hydrateEvents } from '@/storages/hydrate.ts'; import { type DittoEvent, type EventStore, type GetEventsOpts, type StoreEventOpts } from '@/storages/types.ts'; import { EventSet } from '@/utils/event-set.ts'; interface SearchStoreOpts { relay: string | undefined; fallback: EventStore; + hydrator?: EventStore; } class SearchStore implements EventStore { #debug = Debug('ditto:storages:search'); #fallback: EventStore; + #hydrator: EventStore; #relay: NiceRelay | undefined; supportedNips = [50]; constructor(opts: SearchStoreOpts) { this.#fallback = opts.fallback; + this.#hydrator = opts.hydrator ?? this; if (opts.relay) { this.#relay = new NiceRelay(opts.relay); @@ -53,15 +57,7 @@ class SearchStore implements EventStore { events.add(event); } - if (filters[0]?.relations?.includes('author')) { - const authorIds = new Set([...events].map((event) => event.pubkey)); - const authors = await this.getEvents([{ kinds: [0], authors: [...authorIds] }], opts); - for (const event of events) { - event.author = authors.find((author) => author.pubkey === event.pubkey); - } - } - - return [...events]; + return hydrateEvents({ events: [...events], filters, storage: this.#hydrator }); } else { this.#debug(`Searching for "${query}" locally...`); return this.#fallback.getEvents(filters, opts); From 89af83c660e21e65b4f0b5922b5515c774f99d11 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Thu, 4 Jan 2024 02:09:23 -0600 Subject: [PATCH 23/23] search: fix abort signals --- src/controllers/api/search.ts | 12 +++++++----- src/storages/hydrate.ts | 5 +++-- src/storages/search-store.ts | 13 +++++++++++-- 3 files changed, 21 insertions(+), 9 deletions(-) diff --git a/src/controllers/api/search.ts b/src/controllers/api/search.ts index 2b640d5..d18694c 100644 --- a/src/controllers/api/search.ts +++ b/src/controllers/api/search.ts @@ -30,9 +30,11 @@ const searchController: AppController = async (c) => { return c.json({ error: 'Bad request', schema: result.error }, 422); } + const signal = AbortSignal.timeout(1000); + const [event, events] = await Promise.all([ - lookupEvent(result.data), - searchEvents(result.data), + lookupEvent(result.data, signal), + searchEvents(result.data, signal), ]); if (event) { @@ -62,7 +64,7 @@ const searchController: AppController = async (c) => { }; /** Get events for the search params. */ -function searchEvents({ q, type, limit, account_id }: SearchQuery): Promise { +function searchEvents({ q, type, limit, account_id }: SearchQuery, signal: AbortSignal): Promise { if (type === 'hashtags') return Promise.resolve([]); const filter: DittoFilter = { @@ -76,7 +78,7 @@ function searchEvents({ q, type, limit, account_id }: SearchQuery): Promise { +async function lookupEvent(query: SearchQuery, signal: AbortSignal): Promise { const filters = await getLookupFilters(query); const [event] = await searchStore.getEvents(filters, { limit: 1, signal }); return event; diff --git a/src/storages/hydrate.ts b/src/storages/hydrate.ts index 3edca14..4d8edcb 100644 --- a/src/storages/hydrate.ts +++ b/src/storages/hydrate.ts @@ -5,15 +5,16 @@ interface HydrateEventOpts { events: DittoEvent[]; filters: DittoFilter[]; storage: EventStore; + signal?: AbortSignal; } /** Hydrate event relationships using the provided storage. */ async function hydrateEvents(opts: HydrateEventOpts): Promise[]> { - const { events, filters, storage } = opts; + const { events, filters, storage, signal } = opts; if (filters.some((filter) => filter.relations?.includes('author'))) { const pubkeys = new Set([...events].map((event) => event.pubkey)); - const authors = await storage.getEvents([{ kinds: [0], authors: [...pubkeys] }]); + const authors = await storage.getEvents([{ kinds: [0], authors: [...pubkeys] }], { signal }); for (const event of events) { event.author = authors.find((author) => author.pubkey === event.pubkey); diff --git a/src/storages/search-store.ts b/src/storages/search-store.ts index a289734..30e6ba1 100644 --- a/src/storages/search-store.ts +++ b/src/storages/search-store.ts @@ -50,14 +50,23 @@ class SearchStore implements EventStore { this.#debug(`Searching for "${query}" at ${this.#relay.socket.url}...`); const sub = this.#relay.req(filters, opts); - sub.eoseSignal.onabort = () => sub.close(); + + const close = () => { + sub.close(); + opts?.signal?.removeEventListener('abort', close); + sub.eoseSignal.removeEventListener('abort', close); + }; + + opts?.signal?.addEventListener('abort', close, { once: true }); + sub.eoseSignal.addEventListener('abort', close, { once: true }); + const events = new EventSet>(); for await (const event of sub) { events.add(event); } - return hydrateEvents({ events: [...events], filters, storage: this.#hydrator }); + return hydrateEvents({ events: [...events], filters, storage: this.#hydrator, signal: opts?.signal }); } else { this.#debug(`Searching for "${query}" locally...`); return this.#fallback.getEvents(filters, opts);