diff --git a/src/kinds.ts b/src/kinds.ts deleted file mode 100644 index 7953837..0000000 --- a/src/kinds.ts +++ /dev/null @@ -1,46 +0,0 @@ -/** Events are **regular**, which means they're all expected to be stored by relays. */ -function isRegularKind(kind: number) { - return (1000 <= kind && kind < 10000) || [1, 2, 4, 5, 6, 7, 8, 16, 40, 41, 42, 43, 44].includes(kind); -} - -/** Events are **replaceable**, which means that, for each combination of `pubkey` and `kind`, only the latest event is expected to (SHOULD) be stored by relays, older versions are expected to be discarded. */ -function isReplaceableKind(kind: number) { - return (10000 <= kind && kind < 20000) || [0, 3].includes(kind); -} - -/** Events are **ephemeral**, which means they are not expected to be stored by relays. */ -function isEphemeralKind(kind: number) { - return 20000 <= kind && kind < 30000; -} - -/** Events are **parameterized replaceable**, which means that, for each combination of `pubkey`, `kind` and the `d` tag, only the latest event is expected to be stored by relays, older versions are expected to be discarded. */ -function isParameterizedReplaceableKind(kind: number) { - return 30000 <= kind && kind < 40000; -} - -/** These events are only valid if published by the server keypair. */ -function isDittoInternalKind(kind: number) { - return kind === 30361; -} - -/** Classification of the event kind. */ -type KindClassification = 'regular' | 'replaceable' | 'ephemeral' | 'parameterized' | 'unknown'; - -/** Determine the classification of this kind of event if known, or `unknown`. */ -function classifyKind(kind: number): KindClassification { - if (isRegularKind(kind)) return 'regular'; - if (isReplaceableKind(kind)) return 'replaceable'; - if (isEphemeralKind(kind)) return 'ephemeral'; - if (isParameterizedReplaceableKind(kind)) return 'parameterized'; - return 'unknown'; -} - -export { - classifyKind, - isDittoInternalKind, - isEphemeralKind, - isParameterizedReplaceableKind, - isRegularKind, - isReplaceableKind, - type KindClassification, -}; diff --git a/src/pipeline.ts b/src/pipeline.ts index 48e5c38..ec14179 100644 --- a/src/pipeline.ts +++ b/src/pipeline.ts @@ -1,4 +1,4 @@ -import { NostrEvent, NPolicy, NSchema as n } from '@nostrify/nostrify'; +import { NKinds, NostrEvent, NPolicy, NSchema as n } from '@nostrify/nostrify'; import { LNURL } from '@nostrify/nostrify/ln'; import { PipePolicy } from '@nostrify/nostrify/policies'; import Debug from '@soapbox/stickynotes/debug'; @@ -8,7 +8,6 @@ import { Conf } from '@/config.ts'; import { DittoDB } from '@/db/DittoDB.ts'; import { deleteAttachedMedia } from '@/db/unattached-media.ts'; import { DittoEvent } from '@/interfaces/DittoEvent.ts'; -import { isEphemeralKind } from '@/kinds.ts'; import { DVM } from '@/pipeline/DVM.ts'; import { RelayError } from '@/RelayError.ts'; import { updateStats } from '@/stats.ts'; @@ -103,7 +102,7 @@ async function hydrateEvent(event: DittoEvent, signal: AbortSignal): Promise { - if (isEphemeralKind(event.kind)) return; + if (NKinds.ephemeral(event.kind)) return; const store = await Storages.db(); const [deletion] = await store.query( diff --git a/src/storages/events-db.ts b/src/storages/events-db.ts index 19caf5a..22101ef 100644 --- a/src/storages/events-db.ts +++ b/src/storages/events-db.ts @@ -1,13 +1,13 @@ -import { NIP50, NostrEvent, NostrFilter, NSchema as n, NStore } from '@nostrify/nostrify'; -import Debug from '@soapbox/stickynotes/debug'; -import { Kysely, type SelectQueryBuilder } from 'kysely'; -import { sortEvents } from 'nostr-tools'; +// deno-lint-ignore-file require-await + +import { NDatabase, NIP50, NKinds, NostrEvent, NostrFilter, NSchema as n, NStore } from '@nostrify/nostrify'; +import { Stickynotes } from '@soapbox/stickynotes'; +import { Kysely } from 'kysely'; import { Conf } from '@/config.ts'; import { DittoTables } from '@/db/DittoTables.ts'; import { normalizeFilters } from '@/filter.ts'; import { DittoEvent } from '@/interfaces/DittoEvent.ts'; -import { isDittoInternalKind, isParameterizedReplaceableKind, isReplaceableKind } from '@/kinds.ts'; import { purifyEvent } from '@/storages/hydrate.ts'; import { isNostrId, isURL } from '@/utils.ts'; import { abortError } from '@/utils/abort.ts'; @@ -19,218 +19,131 @@ type TagCondition = ({ event, count, value }: { value: string; }) => boolean; -/** Conditions for when to index certain tags. */ -const tagConditions: Record = { - 'd': ({ event, count }) => count === 0 && isParameterizedReplaceableKind(event.kind), - 'e': ({ event, count, value }) => ((event.user && event.kind === 10003) || count < 15) && isNostrId(value), - 'L': ({ event, count }) => event.kind === 1985 || count === 0, - 'l': ({ event, count }) => event.kind === 1985 || count === 0, - 'media': ({ event, count, value }) => (event.user || count < 4) && isURL(value), - 'P': ({ count, value }) => count === 0 && isNostrId(value), - 'p': ({ event, count, value }) => (count < 15 || event.kind === 3) && isNostrId(value), - 'proxy': ({ count, value }) => count === 0 && isURL(value), - 'q': ({ event, count, value }) => count === 0 && event.kind === 1 && isNostrId(value), - 't': ({ count, value }) => count < 5 && value.length < 50, - 'name': ({ event, count }) => event.kind === 30361 && count === 0, - 'role': ({ event, count }) => event.kind === 30361 && count === 0, -}; - -type EventQuery = SelectQueryBuilder; - /** SQLite database storage adapter for Nostr events. */ class EventsDB implements NStore { - #db: Kysely; - #debug = Debug('ditto:db:events'); - private protocol = Conf.databaseUrl.protocol; + private store: NDatabase; + private console = new Stickynotes('ditto:db:events'); - constructor(db: Kysely) { - this.#db = db; + /** Conditions for when to index certain tags. */ + static tagConditions: Record = { + 'd': ({ event, count }) => count === 0 && NKinds.parameterizedReplaceable(event.kind), + 'e': ({ event, count, value }) => ((event.user && event.kind === 10003) || count < 15) && isNostrId(value), + 'L': ({ event, count }) => event.kind === 1985 || count === 0, + 'l': ({ event, count }) => event.kind === 1985 || count === 0, + 'media': ({ event, count, value }) => (event.user || count < 4) && isURL(value), + 'P': ({ count, value }) => count === 0 && isNostrId(value), + 'p': ({ event, count, value }) => (count < 15 || event.kind === 3) && isNostrId(value), + 'proxy': ({ count, value }) => count === 0 && isURL(value), + 'q': ({ event, count, value }) => count === 0 && event.kind === 1 && isNostrId(value), + 't': ({ count, value }) => count < 5 && value.length < 50, + 'name': ({ event, count }) => event.kind === 30361 && count === 0, + 'role': ({ event, count }) => event.kind === 30361 && count === 0, + }; + + constructor(private kysely: Kysely) { + this.store = new NDatabase(kysely, { + fts5: Conf.databaseUrl.protocol === 'sqlite:', + indexTags: EventsDB.indexTags, + searchText: EventsDB.searchText, + }); } /** Insert an event (and its tags) into the database. */ async event(event: NostrEvent, _opts?: { signal?: AbortSignal }): Promise { event = purifyEvent(event); - this.#debug('EVENT', JSON.stringify(event)); - - if (isDittoInternalKind(event.kind) && event.pubkey !== Conf.pubkey) { - throw new Error('Internal events can only be stored by the server keypair'); - } - - return await this.#db.transaction().execute(async (trx) => { - /** Insert the event into the database. */ - async function addEvent() { - await trx.insertInto('nostr_events') - .values({ ...event, tags: JSON.stringify(event.tags) }) - .execute(); - } - - const protocol = this.protocol; - /** Add search data to the FTS table. */ - async function indexSearch() { - if (protocol !== 'sqlite:') return; - const searchContent = buildSearchContent(event); - if (!searchContent) return; - await trx.insertInto('nostr_fts5') - .values({ event_id: event.id, content: searchContent.substring(0, 1000) }) - .execute(); - } - - /** Index event tags depending on the conditions defined above. */ - async function indexTags() { - const tags = filterIndexableTags(event); - const rows = tags.map(([name, value]) => ({ event_id: event.id, name, value })); - - if (!tags.length) return; - await trx.insertInto('nostr_tags') - .values(rows) - .execute(); - } - - if (isReplaceableKind(event.kind)) { - const prevEvents = await this.getFilterQuery(trx, { kinds: [event.kind], authors: [event.pubkey] }).execute(); - for (const prevEvent of prevEvents) { - if (prevEvent.created_at >= event.created_at) { - throw new Error('Cannot replace an event with an older event'); - } - } - await this.deleteEventsTrx(trx, [{ kinds: [event.kind], authors: [event.pubkey] }]); - } - - if (isParameterizedReplaceableKind(event.kind)) { - const d = event.tags.find(([tag]) => tag === 'd')?.[1]; - if (d) { - const prevEvents = await this.getFilterQuery(trx, { kinds: [event.kind], authors: [event.pubkey], '#d': [d] }) - .execute(); - for (const prevEvent of prevEvents) { - if (prevEvent.created_at >= event.created_at) { - throw new Error('Cannot replace an event with an older event'); - } - } - await this.deleteEventsTrx(trx, [{ kinds: [event.kind], authors: [event.pubkey], '#d': [d] }]); - } - } - - // Run the queries. - await Promise.all([ - addEvent(), - indexTags(), - indexSearch(), - ]); - }).catch((error) => { - // Don't throw for duplicate events. - if (error.message.includes('UNIQUE constraint failed')) { - return; - } else { - throw error; - } - }); + this.console.debug('EVENT', JSON.stringify(event)); + return this.store.event(event); } - /** Build the query for a filter. */ - getFilterQuery(db: Kysely, filter: NostrFilter): EventQuery { - let query = db - .selectFrom('nostr_events') - .select([ - 'nostr_events.id', - 'nostr_events.kind', - 'nostr_events.pubkey', - 'nostr_events.content', - 'nostr_events.tags', - 'nostr_events.created_at', - 'nostr_events.sig', - ]) - .where('nostr_events.deleted_at', 'is', null); + /** Get events for filters from the database. */ + async query(filters: NostrFilter[], opts: { signal?: AbortSignal; limit?: number } = {}): Promise { + filters = await this.expandFilters(filters); - /** Whether we are querying for replaceable events by author. */ - const isAddrQuery = filter.authors && - filter.kinds && - filter.kinds.every((kind) => isReplaceableKind(kind) || isParameterizedReplaceableKind(kind)); + if (opts.signal?.aborted) return Promise.resolve([]); + if (!filters.length) return Promise.resolve([]); - // Avoid ORDER BY when querying for replaceable events by author. - if (!isAddrQuery) { - query = query.orderBy('nostr_events.created_at', 'desc'); - } + this.console.debug('REQ', JSON.stringify(filters)); - for (const [key, value] of Object.entries(filter)) { - if (value === undefined) continue; - - switch (key as keyof NostrFilter) { - case 'ids': - query = query.where('nostr_events.id', 'in', filter.ids!); - break; - case 'kinds': - query = query.where('nostr_events.kind', 'in', filter.kinds!); - break; - case 'authors': - query = query.where('nostr_events.pubkey', 'in', filter.authors!); - break; - case 'since': - query = query.where('nostr_events.created_at', '>=', filter.since!); - break; - case 'until': - query = query.where('nostr_events.created_at', '<=', filter.until!); - break; - case 'limit': - query = query.limit(filter.limit!); - break; - } - } - - const joinedQuery = query.leftJoin('nostr_tags', 'nostr_tags.event_id', 'nostr_events.id'); - - for (const [key, value] of Object.entries(filter)) { - if (key.startsWith('#') && Array.isArray(value)) { - const name = key.replace(/^#/, ''); - query = joinedQuery - .where('nostr_tags.name', '=', name) - .where('nostr_tags.value', 'in', value); - } - } - - if (filter.search && this.protocol === 'sqlite:') { - query = query - .innerJoin('nostr_fts5', 'nostr_fts5.event_id', 'nostr_events.id') - .where('nostr_fts5.content', 'match', JSON.stringify(filter.search)); - } - - return query; + return this.store.query(filters, opts); } - /** Combine filter queries into a single union query. */ - getEventsQuery(filters: NostrFilter[]) { - return filters - .map((filter) => this.#db.selectFrom(() => this.getFilterQuery(this.#db, filter).as('events')).selectAll()) - .reduce((result, query) => result.unionAll(query)); + /** Delete events based on filters from the database. */ + async remove(filters: NostrFilter[], _opts?: { signal?: AbortSignal }): Promise { + if (!filters.length) return Promise.resolve(); + this.console.debug('DELETE', JSON.stringify(filters)); + + return this.store.remove(filters); } - /** Query to get user events, joined by tags. */ - usersQuery() { - return this.getFilterQuery(this.#db, { kinds: [30361], authors: [Conf.pubkey] }) - .leftJoin('nostr_tags', 'nostr_tags.event_id', 'nostr_events.id') - .where('nostr_tags.name', '=', 'd') - .select('nostr_tags.value as d_tag') - .as('users'); + /** Get number of events that would be returned by filters. */ + async count( + filters: NostrFilter[], + opts: { signal?: AbortSignal } = {}, + ): Promise<{ count: number; approximate: boolean }> { + if (opts.signal?.aborted) return Promise.reject(abortError()); + if (!filters.length) return Promise.resolve({ count: 0, approximate: false }); + + this.console.debug('COUNT', JSON.stringify(filters)); + + return this.store.count(filters); + } + + /** Return only the tags that should be indexed. */ + static indexTags(event: DittoEvent): string[][] { + const tagCounts: Record = {}; + + function getCount(name: string) { + return tagCounts[name] || 0; + } + + function incrementCount(name: string) { + tagCounts[name] = getCount(name) + 1; + } + + function checkCondition(name: string, value: string, condition: TagCondition) { + return condition({ + event, + count: getCount(name), + value, + }); + } + + return event.tags.reduce((results, tag) => { + const [name, value] = tag; + const condition = EventsDB.tagConditions[name] as TagCondition | undefined; + + if (value && condition && value.length < 200 && checkCondition(name, value, condition)) { + results.push(tag); + } + + incrementCount(name); + return results; + }, []); + } + + /** Build a search index from the event. */ + static searchText(event: NostrEvent): string { + switch (event.kind) { + case 0: + return EventsDB.buildUserSearchContent(event); + case 1: + return event.content; + case 30009: + return EventsDB.buildTagsSearchContent(event.tags.filter(([t]) => t !== 'alt')); + default: + return ''; + } + } + + /** Build search content for a user. */ + static buildUserSearchContent(event: NostrEvent): string { + const { name, nip05, about } = n.json().pipe(n.metadata()).catch({}).parse(event.content); + return [name, nip05, about].filter(Boolean).join('\n'); + } + + /** Build search content from tag values. */ + static buildTagsSearchContent(tags: string[][]): string { + return tags.map(([_tag, value]) => value).join('\n'); } /** Converts filters to more performant, simpler filters that are better for SQLite. */ @@ -244,7 +157,7 @@ class EventsDB implements NStore { ) as { key: 'domain'; value: string } | undefined)?.value; if (domain) { - const query = this.#db + const query = this.kysely .selectFrom('pubkey_domains') .select('pubkey') .where('domain', '=', domain); @@ -268,166 +181,6 @@ class EventsDB implements NStore { return normalizeFilters(filters); // Improves performance of `{ kinds: [0], authors: ['...'] }` queries. } - - /** Get events for filters from the database. */ - async query(filters: NostrFilter[], opts: { signal?: AbortSignal; limit?: number } = {}): Promise { - filters = await this.expandFilters(filters); - - if (opts.signal?.aborted) return Promise.resolve([]); - if (!filters.length) return Promise.resolve([]); - - this.#debug('REQ', JSON.stringify(filters)); - let query = this.getEventsQuery(filters); - - if (typeof opts.limit === 'number') { - query = query.limit(opts.limit); - } - - const events = (await query.execute()).map((row) => { - const event: DittoEvent = { - id: row.id, - kind: row.kind, - pubkey: row.pubkey, - content: row.content, - created_at: row.created_at, - tags: JSON.parse(row.tags), - sig: row.sig, - }; - - if (row.author_id) { - event.author = { - id: row.author_id, - kind: row.author_kind! as 0, - pubkey: row.author_pubkey!, - content: row.author_content!, - created_at: row.author_created_at!, - tags: JSON.parse(row.author_tags!), - sig: row.author_sig!, - }; - } - - if (typeof row.author_stats_followers_count === 'number') { - event.author_stats = { - followers_count: row.author_stats_followers_count, - following_count: row.author_stats_following_count!, - notes_count: row.author_stats_notes_count!, - }; - } - - if (typeof row.stats_replies_count === 'number') { - event.event_stats = { - replies_count: row.stats_replies_count, - reposts_count: row.stats_reposts_count!, - reactions_count: row.stats_reactions_count!, - }; - } - - return event; - }); - - return sortEvents(events); - } - - /** Delete events from each table. Should be run in a transaction! */ - async deleteEventsTrx(db: Kysely, filters: NostrFilter[]) { - if (!filters.length) return Promise.resolve(); - this.#debug('DELETE', JSON.stringify(filters)); - - const query = this.getEventsQuery(filters).clearSelect().select('id'); - - return await db.updateTable('nostr_events') - .where('id', 'in', () => query) - .set({ deleted_at: Math.floor(Date.now() / 1000) }) - .execute(); - } - - /** Delete events based on filters from the database. */ - async remove(filters: NostrFilter[], _opts?: { signal?: AbortSignal }): Promise { - if (!filters.length) return Promise.resolve(); - this.#debug('DELETE', JSON.stringify(filters)); - - await this.#db.transaction().execute((trx) => this.deleteEventsTrx(trx, filters)); - } - - /** Get number of events that would be returned by filters. */ - async count( - filters: NostrFilter[], - opts: { signal?: AbortSignal } = {}, - ): Promise<{ count: number; approximate: boolean }> { - if (opts.signal?.aborted) return Promise.reject(abortError()); - if (!filters.length) return Promise.resolve({ count: 0, approximate: false }); - - this.#debug('COUNT', JSON.stringify(filters)); - const query = this.getEventsQuery(filters); - - const [{ count }] = await query - .clearSelect() - .select((eb) => eb.fn.count('id').as('count')) - .execute(); - - return { - count: Number(count), - approximate: false, - }; - } -} - -/** Return only the tags that should be indexed. */ -function filterIndexableTags(event: DittoEvent): string[][] { - const tagCounts: Record = {}; - - function getCount(name: string) { - return tagCounts[name] || 0; - } - - function incrementCount(name: string) { - tagCounts[name] = getCount(name) + 1; - } - - function checkCondition(name: string, value: string, condition: TagCondition) { - return condition({ - event, - count: getCount(name), - value, - }); - } - - return event.tags.reduce((results, tag) => { - const [name, value] = tag; - const condition = tagConditions[name] as TagCondition | undefined; - - if (value && condition && value.length < 200 && checkCondition(name, value, condition)) { - results.push(tag); - } - - incrementCount(name); - return results; - }, []); -} - -/** Build a search index from the event. */ -function buildSearchContent(event: NostrEvent): string { - switch (event.kind) { - case 0: - return buildUserSearchContent(event); - case 1: - return event.content; - case 30009: - return buildTagsSearchContent(event.tags.filter(([t]) => t !== 'alt')); - default: - return ''; - } -} - -/** Build search content for a user. */ -function buildUserSearchContent(event: NostrEvent): string { - const { name, nip05, about } = n.json().pipe(n.metadata()).catch({}).parse(event.content); - return [name, nip05, about].filter(Boolean).join('\n'); -} - -/** Build search content from tag values. */ -function buildTagsSearchContent(tags: string[][]): string { - return tags.map(([_tag, value]) => value).join('\n'); } export { EventsDB };