diff --git a/deno.json b/deno.json index 0225510..12fbb57 100644 --- a/deno.json +++ b/deno.json @@ -24,6 +24,7 @@ "@sentry/deno": "https://deno.land/x/sentry@7.112.2/index.mjs", "@soapbox/kysely-deno-sqlite": "jsr:@soapbox/kysely-deno-sqlite@^2.1.0", "@soapbox/stickynotes": "jsr:@soapbox/stickynotes@^0.4.0", + "@std/assert": "jsr:@std/assert@^0.225.1", "@std/cli": "jsr:@std/cli@^0.223.0", "@std/crypto": "jsr:@std/crypto@^0.224.0", "@std/dotenv": "jsr:@std/dotenv@^0.224.0", diff --git a/src/db.ts b/src/db.ts deleted file mode 100644 index 1a5f06d..0000000 --- a/src/db.ts +++ /dev/null @@ -1,41 +0,0 @@ -import fs from 'node:fs/promises'; -import path from 'node:path'; - -import { FileMigrationProvider, Migrator } from 'kysely'; - -import { DittoDB } from '@/db/DittoDB.ts'; - -const db = await DittoDB.getInstance(); - -const migrator = new Migrator({ - db, - provider: new FileMigrationProvider({ - fs, - path, - migrationFolder: new URL(import.meta.resolve('./db/migrations')).pathname, - }), -}); - -/** Migrate the database to the latest version. */ -async function migrate() { - console.info('Running migrations...'); - const results = await migrator.migrateToLatest(); - - if (results.error) { - console.error(results.error); - Deno.exit(1); - } else { - if (!results.results?.length) { - console.info('Everything up-to-date.'); - } else { - console.info('Migrations finished!'); - for (const { migrationName, status } of results.results!) { - console.info(` - ${migrationName}: ${status}`); - } - } - } -} - -await migrate(); - -export { db }; diff --git a/src/db/DittoDB.ts b/src/db/DittoDB.ts index abe068b..9c3b280 100644 --- a/src/db/DittoDB.ts +++ b/src/db/DittoDB.ts @@ -1,4 +1,7 @@ -import { Kysely } from 'kysely'; +import fs from 'node:fs/promises'; +import path from 'node:path'; + +import { FileMigrationProvider, Kysely, Migrator } from 'kysely'; import { Conf } from '@/config.ts'; import { DittoPostgres } from '@/db/adapters/DittoPostgres.ts'; @@ -6,17 +9,63 @@ import { DittoSQLite } from '@/db/adapters/DittoSQLite.ts'; import { DittoTables } from '@/db/DittoTables.ts'; export class DittoDB { + private static kysely: Promise> | undefined; + static getInstance(): Promise> { + if (!this.kysely) { + this.kysely = this._getInstance(); + } + return this.kysely; + } + + static async _getInstance(): Promise> { const { databaseUrl } = Conf; + let kysely: Kysely; + switch (databaseUrl.protocol) { case 'sqlite:': - return DittoSQLite.getInstance(); + kysely = await DittoSQLite.getInstance(); + break; case 'postgres:': case 'postgresql:': - return DittoPostgres.getInstance(); + kysely = await DittoPostgres.getInstance(); + break; default: throw new Error('Unsupported database URL.'); } + + await this.migrate(kysely); + + return kysely; + } + + /** Migrate the database to the latest version. */ + private static async migrate(kysely: Kysely) { + const migrator = new Migrator({ + db: kysely, + provider: new FileMigrationProvider({ + fs, + path, + migrationFolder: new URL(import.meta.resolve('../db/migrations')).pathname, + }), + }); + + console.info('Running migrations...'); + const results = await migrator.migrateToLatest(); + + if (results.error) { + console.error(results.error); + Deno.exit(1); + } else { + if (!results.results?.length) { + console.info('Everything up-to-date.'); + } else { + console.info('Migrations finished!'); + for (const { migrationName, status } of results.results!) { + console.info(` - ${migrationName}: ${status}`); + } + } + } } } diff --git a/src/db/unattached-media.ts b/src/db/unattached-media.ts index 960abe8..708e2f9 100644 --- a/src/db/unattached-media.ts +++ b/src/db/unattached-media.ts @@ -1,6 +1,6 @@ import uuid62 from 'uuid62'; -import { db } from '@/db.ts'; +import { DittoDB } from '@/db/DittoDB.ts'; import { type MediaData } from '@/schemas/nostr.ts'; interface UnattachedMedia { @@ -19,7 +19,8 @@ async function insertUnattachedMedia(media: Omit { if (!urls.length) return; - await db.deleteFrom('unattached_media') + const kysely = await DittoDB.getInstance(); + await kysely.deleteFrom('unattached_media') .where('pubkey', '=', pubkey) .where('url', 'in', urls) .execute(); diff --git a/src/pipeline.ts b/src/pipeline.ts index 9742b49..48e5c38 100644 --- a/src/pipeline.ts +++ b/src/pipeline.ts @@ -5,7 +5,7 @@ import Debug from '@soapbox/stickynotes/debug'; import { sql } from 'kysely'; import { Conf } from '@/config.ts'; -import { db } from '@/db.ts'; +import { DittoDB } from '@/db/DittoDB.ts'; import { deleteAttachedMedia } from '@/db/unattached-media.ts'; import { DittoEvent } from '@/interfaces/DittoEvent.ts'; import { isEphemeralKind } from '@/kinds.ts'; @@ -91,7 +91,8 @@ async function encounterEvent(event: NostrEvent, signal: AbortSignal): Promise { await hydrateEvents({ events: [event], store: await Storages.db(), signal }); - const domain = await db + const kysely = await DittoDB.getInstance(); + const domain = await kysely .selectFrom('pubkey_domains') .select('domain') .where('pubkey', '=', event.pubkey) @@ -140,6 +141,7 @@ async function parseMetadata(event: NostrEvent, signal: AbortSignal): Promise pubkey_domains.last_updated_at - `.execute(db); + `.execute(kysely); } catch (_e) { // do nothing } diff --git a/src/stats.ts b/src/stats.ts index bff2aeb..f8efe16 100644 --- a/src/stats.ts +++ b/src/stats.ts @@ -2,7 +2,7 @@ import { NKinds, NostrEvent } from '@nostrify/nostrify'; import Debug from '@soapbox/stickynotes/debug'; import { InsertQueryBuilder } from 'kysely'; -import { db } from '@/db.ts'; +import { DittoDB } from '@/db/DittoDB.ts'; import { DittoTables } from '@/db/DittoTables.ts'; import { Storages } from '@/storages.ts'; import { findReplyTag } from '@/tags.ts'; @@ -25,7 +25,7 @@ async function updateStats(event: NostrEvent) { if (event.kind === 3) { prev = await getPrevEvent(event); if (!prev || event.created_at >= prev.created_at) { - queries.push(updateFollowingCountQuery(event)); + queries.push(await updateFollowingCountQuery(event)); } } @@ -37,8 +37,8 @@ async function updateStats(event: NostrEvent) { debug(JSON.stringify({ id: event.id, pubkey: event.pubkey, kind: event.kind, tags: event.tags, statDiffs })); } - if (pubkeyDiffs.length) queries.push(authorStatsQuery(pubkeyDiffs)); - if (eventDiffs.length) queries.push(eventStatsQuery(eventDiffs)); + if (pubkeyDiffs.length) queries.push(await authorStatsQuery(pubkeyDiffs)); + if (eventDiffs.length) queries.push(await eventStatsQuery(eventDiffs)); if (queries.length) { await Promise.all(queries.map((query) => query.execute())); @@ -102,7 +102,7 @@ async function getStatsDiff(event: NostrEvent, prev: NostrEvent | undefined): Pr } /** Create an author stats query from the list of diffs. */ -function authorStatsQuery(diffs: AuthorStatDiff[]) { +async function authorStatsQuery(diffs: AuthorStatDiff[]) { const values: DittoTables['author_stats'][] = diffs.map(([_, pubkey, stat, diff]) => { const row: DittoTables['author_stats'] = { pubkey, @@ -114,7 +114,8 @@ function authorStatsQuery(diffs: AuthorStatDiff[]) { return row; }); - return db.insertInto('author_stats') + const kysely = await DittoDB.getInstance(); + return kysely.insertInto('author_stats') .values(values) .onConflict((oc) => oc @@ -128,7 +129,7 @@ function authorStatsQuery(diffs: AuthorStatDiff[]) { } /** Create an event stats query from the list of diffs. */ -function eventStatsQuery(diffs: EventStatDiff[]) { +async function eventStatsQuery(diffs: EventStatDiff[]) { const values: DittoTables['event_stats'][] = diffs.map(([_, event_id, stat, diff]) => { const row: DittoTables['event_stats'] = { event_id, @@ -140,7 +141,8 @@ function eventStatsQuery(diffs: EventStatDiff[]) { return row; }); - return db.insertInto('event_stats') + const kysely = await DittoDB.getInstance(); + return kysely.insertInto('event_stats') .values(values) .onConflict((oc) => oc @@ -167,14 +169,15 @@ async function getPrevEvent(event: NostrEvent): Promise } /** Set the following count to the total number of unique "p" tags in the follow list. */ -function updateFollowingCountQuery({ pubkey, tags }: NostrEvent) { +async function updateFollowingCountQuery({ pubkey, tags }: NostrEvent) { const following_count = new Set( tags .filter(([name]) => name === 'p') .map(([_, value]) => value), ).size; - return db.insertInto('author_stats') + const kysely = await DittoDB.getInstance(); + return kysely.insertInto('author_stats') .values({ pubkey, following_count, diff --git a/src/storages.ts b/src/storages.ts index cbda925..f591e11 100644 --- a/src/storages.ts +++ b/src/storages.ts @@ -1,7 +1,8 @@ // deno-lint-ignore-file require-await import { NCache } from '@nostrify/nostrify'; + import { Conf } from '@/config.ts'; -import { db } from '@/db.ts'; +import { DittoDB } from '@/db/DittoDB.ts'; import { EventsDB } from '@/storages/events-db.ts'; import { Optimizer } from '@/storages/optimizer.ts'; import { PoolStore } from '@/storages/pool-store.ts'; @@ -24,7 +25,10 @@ export class Storages { /** SQLite database to store events this Ditto server cares about. */ public static async db(): Promise { if (!this._db) { - this._db = Promise.resolve(new EventsDB(db)); + this._db = (async () => { + const kysely = await DittoDB.getInstance(); + return new EventsDB(kysely); + })(); } return this._db; } diff --git a/src/storages/events-db.test.ts b/src/storages/events-db.test.ts index dd92c1b..d935e6b 100644 --- a/src/storages/events-db.test.ts +++ b/src/storages/events-db.test.ts @@ -1,12 +1,14 @@ -import { db } from '@/db.ts'; -import { assertEquals, assertRejects } from '@/deps-test.ts'; +import { assertEquals, assertRejects } from '@std/assert'; + +import { DittoDB } from '@/db/DittoDB.ts'; import event0 from '~/fixtures/events/event-0.json' with { type: 'json' }; import event1 from '~/fixtures/events/event-1.json' with { type: 'json' }; import { EventsDB } from '@/storages/events-db.ts'; -const eventsDB = new EventsDB(db); +const kysely = await DittoDB.getInstance(); +const eventsDB = new EventsDB(kysely); Deno.test('count filters', async () => { assertEquals((await eventsDB.count([{ kinds: [1] }])).count, 0); @@ -34,7 +36,7 @@ Deno.test('query events with domain search filter', async () => { assertEquals(await eventsDB.query([{ search: 'domain:localhost:8000' }]), []); assertEquals(await eventsDB.query([{ search: '' }]), [event1]); - await db + await kysely .insertInto('pubkey_domains') .values({ pubkey: event1.pubkey, domain: 'localhost:8000', last_updated_at: event1.created_at }) .execute(); diff --git a/src/storages/hydrate.ts b/src/storages/hydrate.ts index 3109ac6..8d2d302 100644 --- a/src/storages/hydrate.ts +++ b/src/storages/hydrate.ts @@ -1,7 +1,7 @@ import { NostrEvent, NStore } from '@nostrify/nostrify'; import { matchFilter } from 'nostr-tools'; -import { db } from '@/db.ts'; +import { DittoDB } from '@/db/DittoDB.ts'; import { type DittoEvent } from '@/interfaces/DittoEvent.ts'; import { DittoTables } from '@/db/DittoTables.ts'; import { Conf } from '@/config.ts'; @@ -239,7 +239,7 @@ function gatherReportedProfiles({ events, store, signal }: HydrateOpts): Promise } /** Collect author stats from the events. */ -function gatherAuthorStats(events: DittoEvent[]): Promise { +async function gatherAuthorStats(events: DittoEvent[]): Promise { const pubkeys = new Set( events .filter((event) => event.kind === 0) @@ -250,7 +250,8 @@ function gatherAuthorStats(events: DittoEvent[]): Promise { +async function gatherEventStats(events: DittoEvent[]): Promise { const ids = new Set( events .filter((event) => event.kind === 1) @@ -269,7 +270,8 @@ function gatherEventStats(events: DittoEvent[]): Promise