diff --git a/deno.json b/deno.json index 0774fd9..d04e960 100644 --- a/deno.json +++ b/deno.json @@ -14,7 +14,9 @@ "imports": { "@/": "./src/", "@nostrify/nostrify": "jsr:@nostrify/nostrify@^0.9.7", - "~/fixtures/": "./fixtures/" + "~/fixtures/": "./fixtures/", + "kysely": "npm:kysely@^0.26.3", + "kysely_deno_postgres": "https://deno.land/x/kysely_deno_postgres@v0.4.0/mod.ts" }, "lint": { "include": ["src/", "scripts/"], diff --git a/src/db.ts b/src/db.ts index 152b198..7125f13 100644 --- a/src/db.ts +++ b/src/db.ts @@ -1,93 +1,10 @@ import fs from 'node:fs/promises'; import path from 'node:path'; -import { FileMigrationProvider, Kysely, Migrator, PolySqliteDialect } from '@/deps.ts'; -import { Conf } from '@/config.ts'; -import { setPragma } from '@/pragma.ts'; -import SqliteWorker from '@/workers/sqlite.ts'; +import { DittoDB } from '@/db/DittoDB.ts'; +import { FileMigrationProvider, Migrator } from '@/deps.ts'; -interface DittoDB { - events: EventRow; - events_fts: EventFTSRow; - tags: TagRow; - relays: RelayRow; - unattached_media: UnattachedMediaRow; - author_stats: AuthorStatsRow; - event_stats: EventStatsRow; - pubkey_domains: PubkeyDomainRow; -} - -interface AuthorStatsRow { - pubkey: string; - followers_count: number; - following_count: number; - notes_count: number; -} - -interface EventStatsRow { - event_id: string; - replies_count: number; - reposts_count: number; - reactions_count: number; -} - -interface EventRow { - id: string; - kind: number; - pubkey: string; - content: string; - created_at: number; - tags: string; - sig: string; - deleted_at: number | null; -} - -interface EventFTSRow { - id: string; - content: string; -} - -interface TagRow { - tag: string; - value: string; - event_id: string; -} - -interface RelayRow { - url: string; - domain: string; - active: boolean; -} - -interface UnattachedMediaRow { - id: string; - pubkey: string; - url: string; - data: string; - uploaded_at: Date; -} - -interface PubkeyDomainRow { - pubkey: string; - domain: string; - last_updated_at: number; -} - -const sqliteWorker = new SqliteWorker(); -await sqliteWorker.open(Conf.dbPath); - -const db = new Kysely({ - dialect: new PolySqliteDialect({ - database: sqliteWorker, - }), -}); - -// Set PRAGMA values. -await Promise.all([ - setPragma(db, 'synchronous', 'normal'), - setPragma(db, 'temp_store', 'memory'), - setPragma(db, 'mmap_size', Conf.sqlite.mmapSize), -]); +const db = await DittoDB.getInstance(); const migrator = new Migrator({ db, @@ -120,4 +37,4 @@ async function migrate() { await migrate(); -export { type AuthorStatsRow, db, type DittoDB, type EventRow, type EventStatsRow, type TagRow }; +export { db }; diff --git a/src/db/DittoDB.ts b/src/db/DittoDB.ts new file mode 100644 index 0000000..e27f92f --- /dev/null +++ b/src/db/DittoDB.ts @@ -0,0 +1,20 @@ +import { Conf } from '@/config.ts'; +import { DittoPostgres } from '@/db/adapters/DittoPostgres.ts'; +import { DittoSQLite } from '@/db/adapters/DittoSQLite.ts'; +import { DittoTables } from '@/db/DittoTables.ts'; +import { Kysely } from '@/deps.ts'; + +export class DittoDB { + static getInstance(): Promise> { + const { databaseUrl } = Conf; + + switch (databaseUrl.protocol) { + case 'sqlite:': + return DittoSQLite.getInstance(); + case 'postgres:': + return DittoPostgres.getInstance(); + default: + throw new Error('Unsupported database URL.'); + } + } +} diff --git a/src/db/DittoTables.ts b/src/db/DittoTables.ts new file mode 100644 index 0000000..79fec5d --- /dev/null +++ b/src/db/DittoTables.ts @@ -0,0 +1,66 @@ +export interface DittoTables { + events: EventRow; + events_fts: EventFTSRow; + tags: TagRow; + relays: RelayRow; + unattached_media: UnattachedMediaRow; + author_stats: AuthorStatsRow; + event_stats: EventStatsRow; + pubkey_domains: PubkeyDomainRow; +} + +interface AuthorStatsRow { + pubkey: string; + followers_count: number; + following_count: number; + notes_count: number; +} + +interface EventStatsRow { + event_id: string; + replies_count: number; + reposts_count: number; + reactions_count: number; +} + +interface EventRow { + id: string; + kind: number; + pubkey: string; + content: string; + created_at: number; + tags: string; + sig: string; + deleted_at: number | null; +} + +interface EventFTSRow { + id: string; + content: string; +} + +interface TagRow { + tag: string; + value: string; + event_id: string; +} + +interface RelayRow { + url: string; + domain: string; + active: boolean; +} + +interface UnattachedMediaRow { + id: string; + pubkey: string; + url: string; + data: string; + uploaded_at: number; +} + +interface PubkeyDomainRow { + pubkey: string; + domain: string; + last_updated_at: number; +} diff --git a/src/db/adapters/DittoPostgres.ts b/src/db/adapters/DittoPostgres.ts new file mode 100644 index 0000000..f8a5112 --- /dev/null +++ b/src/db/adapters/DittoPostgres.ts @@ -0,0 +1,35 @@ +import { Kysely, PostgresAdapter, PostgresIntrospector, PostgresQueryCompiler } from 'kysely'; +import { PostgreSQLDriver } from 'kysely_deno_postgres'; + +import { DittoTables } from '@/db/DittoTables.ts'; + +export class DittoPostgres { + static db: Kysely | undefined; + + // deno-lint-ignore require-await + static async getInstance(): Promise> { + if (!this.db) { + this.db = new Kysely({ + dialect: { + createAdapter() { + return new PostgresAdapter(); + }, + // @ts-ignore mismatched kysely versions probably + createDriver() { + return new PostgreSQLDriver({ + connectionString: Deno.env.get('DATABASE_URL'), + }); + }, + createIntrospector(db: Kysely) { + return new PostgresIntrospector(db); + }, + createQueryCompiler() { + return new PostgresQueryCompiler(); + }, + }, + }); + } + + return this.db; + } +} diff --git a/src/db/adapters/DittoSQLite.ts b/src/db/adapters/DittoSQLite.ts new file mode 100644 index 0000000..c91407a --- /dev/null +++ b/src/db/adapters/DittoSQLite.ts @@ -0,0 +1,54 @@ +import { Conf } from '@/config.ts'; +import { DittoTables } from '@/db/DittoTables.ts'; +import { Kysely, PolySqliteDialect, sql } from '@/deps.ts'; +import SqliteWorker from '@/workers/sqlite.ts'; + +export class DittoSQLite { + static db: Kysely | undefined; + + static async getInstance(): Promise> { + if (!this.db) { + const sqliteWorker = new SqliteWorker(); + await sqliteWorker.open(this.path); + + this.db = new Kysely({ + dialect: new PolySqliteDialect({ + database: sqliteWorker, + }), + }); + + // Set PRAGMA values. + await Promise.all([ + sql`PRAGMA synchronous = normal`.execute(this.db), + sql`PRAGMA temp_store = memory`.execute(this.db), + sql`PRAGMA foreign_keys = ON`.execute(this.db), + sql`PRAGMA auto_vacuum = FULL`.execute(this.db), + sql`PRAGMA journal_mode = WAL`.execute(this.db), + sql.raw(`PRAGMA mmap_size = ${Conf.sqlite.mmapSize}`).execute(this.db), + ]); + } + return this.db; + } + + /** Get the relative or absolute path based on the `DATABASE_URL`. */ + static get path() { + if (Deno.env.get('DATABASE_URL') === 'sqlite://:memory:') { + return ':memory:'; + } + + const { host, pathname } = Conf.databaseUrl; + + if (!pathname) return ''; + + // Get relative path. + if (host === '') { + return pathname; + } else if (host === '.') { + return pathname; + } else if (host) { + return host + pathname; + } + + return ''; + } +} diff --git a/src/db/migrations/000_create_events.ts b/src/db/migrations/000_create_events.ts index 8422071..158551b 100644 --- a/src/db/migrations/000_create_events.ts +++ b/src/db/migrations/000_create_events.ts @@ -1,4 +1,4 @@ -import { Kysely, sql } from '@/deps.ts'; +import { Kysely } from '@/deps.ts'; export async function up(db: Kysely): Promise { await db.schema @@ -21,13 +21,6 @@ export async function up(db: Kysely): Promise { .addColumn('event_id', 'text', (col) => col.notNull()) .execute(); - await db.schema - .createTable('users') - .addColumn('pubkey', 'text', (col) => col.primaryKey()) - .addColumn('username', 'text', (col) => col.notNull().unique()) - .addColumn('inserted_at', 'datetime', (col) => col.notNull().defaultTo(sql`CURRENT_TIMESTAMP`)) - .execute(); - await db.schema .createIndex('idx_events_kind') .on('events') diff --git a/src/db/migrations/002_events_fts.ts b/src/db/migrations/002_events_fts.ts index c0341c1..9324195 100644 --- a/src/db/migrations/002_events_fts.ts +++ b/src/db/migrations/002_events_fts.ts @@ -1,9 +1,12 @@ +import { Conf } from '@/config.ts'; import { Kysely, sql } from '@/deps.ts'; export async function up(db: Kysely): Promise { - await sql`CREATE VIRTUAL TABLE events_fts USING fts5(id, content)`.execute(db); + if (Conf.databaseUrl.protocol === 'sqlite:') { + await sql`CREATE VIRTUAL TABLE events_fts USING fts5(id, content)`.execute(db); + } } export async function down(db: Kysely): Promise { - await db.schema.dropTable('events_fts').execute(); + await db.schema.dropTable('events_fts').ifExists().execute(); } diff --git a/src/db/migrations/003_events_admin.ts b/src/db/migrations/003_events_admin.ts index e481a59..8469fc2 100644 --- a/src/db/migrations/003_events_admin.ts +++ b/src/db/migrations/003_events_admin.ts @@ -1,10 +1,6 @@ import { Kysely } from '@/deps.ts'; -export async function up(db: Kysely): Promise { - await db.schema - .alterTable('users') - .addColumn('admin', 'boolean', (col) => col.defaultTo(false)) - .execute(); +export async function up(_db: Kysely): Promise { } export async function down(db: Kysely): Promise { diff --git a/src/db/migrations/004_add_user_indexes.ts b/src/db/migrations/004_add_user_indexes.ts index e77693b..929181c 100644 --- a/src/db/migrations/004_add_user_indexes.ts +++ b/src/db/migrations/004_add_user_indexes.ts @@ -1,17 +1,6 @@ import { Kysely } from '@/deps.ts'; -export async function up(db: Kysely): Promise { - await db.schema - .createIndex('idx_users_pubkey') - .on('users') - .column('pubkey') - .execute(); - - await db.schema - .createIndex('idx_users_username') - .on('users') - .column('username') - .execute(); +export async function up(_db: Kysely): Promise { } export async function down(db: Kysely): Promise { diff --git a/src/db/migrations/006_pragma.ts b/src/db/migrations/006_pragma.ts index e9dbad1..2639e81 100644 --- a/src/db/migrations/006_pragma.ts +++ b/src/db/migrations/006_pragma.ts @@ -1,12 +1,7 @@ -import { Kysely, sql } from '@/deps.ts'; +import { Kysely } from '@/deps.ts'; -export async function up(db: Kysely): Promise { - await sql`PRAGMA foreign_keys = ON`.execute(db); - await sql`PRAGMA auto_vacuum = FULL`.execute(db); - await sql`VACUUM`.execute(db); +export async function up(_db: Kysely): Promise { } -export async function down(db: Kysely): Promise { - await sql`PRAGMA foreign_keys = OFF`.execute(db); - await sql`PRAGMA auto_vacuum = NONE`.execute(db); +export async function down(_db: Kysely): Promise { } diff --git a/src/db/migrations/007_unattached_media.ts b/src/db/migrations/007_unattached_media.ts index a2b36a2..1887111 100644 --- a/src/db/migrations/007_unattached_media.ts +++ b/src/db/migrations/007_unattached_media.ts @@ -1,4 +1,4 @@ -import { Kysely, sql } from '@/deps.ts'; +import { Kysely } from '@/deps.ts'; export async function up(db: Kysely): Promise { await db.schema @@ -7,7 +7,7 @@ export async function up(db: Kysely): Promise { .addColumn('pubkey', 'text', (c) => c.notNull()) .addColumn('url', 'text', (c) => c.notNull()) .addColumn('data', 'text', (c) => c.notNull()) - .addColumn('uploaded_at', 'datetime', (c) => c.notNull().defaultTo(sql`CURRENT_TIMESTAMP`)) + .addColumn('uploaded_at', 'bigint', (c) => c.notNull()) .execute(); await db.schema diff --git a/src/db/migrations/008_wal.ts b/src/db/migrations/008_wal.ts index 7f96226..2639e81 100644 --- a/src/db/migrations/008_wal.ts +++ b/src/db/migrations/008_wal.ts @@ -1,9 +1,7 @@ -import { Kysely, sql } from '@/deps.ts'; +import { Kysely } from '@/deps.ts'; -export async function up(db: Kysely): Promise { - await sql`PRAGMA journal_mode = WAL`.execute(db); +export async function up(_db: Kysely): Promise { } -export async function down(db: Kysely): Promise { - await sql`PRAGMA journal_mode = DELETE`.execute(db); +export async function down(_db: Kysely): Promise { } diff --git a/src/db/migrations/010_drop_users.ts b/src/db/migrations/010_drop_users.ts index 9649b64..6cd83c0 100644 --- a/src/db/migrations/010_drop_users.ts +++ b/src/db/migrations/010_drop_users.ts @@ -1,7 +1,7 @@ import { Kysely } from '@/deps.ts'; export async function up(db: Kysely): Promise { - await db.schema.dropTable('users').execute(); + await db.schema.dropTable('users').ifExists().execute(); } export async function down(_db: Kysely): Promise { diff --git a/src/db/unattached-media.ts b/src/db/unattached-media.ts index ae9d882..3761947 100644 --- a/src/db/unattached-media.ts +++ b/src/db/unattached-media.ts @@ -7,14 +7,14 @@ interface UnattachedMedia { pubkey: string; url: string; data: MediaData; - uploaded_at: Date; + uploaded_at: number; } /** Add unattached media into the database. */ async function insertUnattachedMedia(media: Omit) { const result = { id: uuid62.v4(), - uploaded_at: new Date(), + uploaded_at: Date.now(), ...media, }; @@ -41,7 +41,7 @@ function selectUnattachedMediaQuery() { function getUnattachedMedia(until: Date) { return selectUnattachedMediaQuery() .leftJoin('tags', 'unattached_media.url', 'tags.value') - .where('uploaded_at', '<', until) + .where('uploaded_at', '<', until.getTime()) .execute(); } diff --git a/src/pragma.ts b/src/pragma.ts deleted file mode 100644 index f5aa8e4..0000000 --- a/src/pragma.ts +++ /dev/null @@ -1,15 +0,0 @@ -import { type Kysely, sql } from '@/deps.ts'; - -/** Set the PRAGMA and then read back its value to confirm. */ -function setPragma(db: Kysely, pragma: string, value: string | number) { - return sql.raw(`PRAGMA ${pragma} = ${value}`).execute(db); -} - -/** Get value of PRAGMA from the database. */ -async function getPragma(db: Kysely, pragma: string) { - const result = await sql.raw(`PRAGMA ${pragma}`).execute(db); - const row = result.rows[0] as Record | undefined; - return row?.[pragma]; -} - -export { getPragma, setPragma }; diff --git a/src/stats.ts b/src/stats.ts index 884778e..48cc41b 100644 --- a/src/stats.ts +++ b/src/stats.ts @@ -1,11 +1,12 @@ import { NostrEvent } from '@nostrify/nostrify'; -import { type AuthorStatsRow, db, type DittoDB, type EventStatsRow } from '@/db.ts'; +import { db } from '@/db.ts'; +import { DittoTables } from '@/db/DittoTables.ts'; import { Debug, type InsertQueryBuilder } from '@/deps.ts'; import { eventsDB } from '@/storages.ts'; import { findReplyTag } from '@/tags.ts'; -type AuthorStat = keyof Omit; -type EventStat = keyof Omit; +type AuthorStat = keyof Omit; +type EventStat = keyof Omit; type AuthorStatDiff = ['author_stats', pubkey: string, stat: AuthorStat, diff: number]; type EventStatDiff = ['event_stats', eventId: string, stat: EventStat, diff: number]; @@ -16,7 +17,7 @@ const debug = Debug('ditto:stats'); /** Store stats for the event in LMDB. */ async function updateStats(event: NostrEvent) { let prev: NostrEvent | undefined; - const queries: InsertQueryBuilder[] = []; + const queries: InsertQueryBuilder[] = []; // Kind 3 is a special case - replace the count with the new list. if (event.kind === 3) { @@ -99,8 +100,8 @@ async function getStatsDiff(event: NostrEvent, prev: NostrEvent | undefined): Pr /** Create an author stats query from the list of diffs. */ function authorStatsQuery(diffs: AuthorStatDiff[]) { - const values: AuthorStatsRow[] = diffs.map(([_, pubkey, stat, diff]) => { - const row: AuthorStatsRow = { + const values: DittoTables['author_stats'][] = diffs.map(([_, pubkey, stat, diff]) => { + const row: DittoTables['author_stats'] = { pubkey, followers_count: 0, following_count: 0, @@ -125,8 +126,8 @@ function authorStatsQuery(diffs: AuthorStatDiff[]) { /** Create an event stats query from the list of diffs. */ function eventStatsQuery(diffs: EventStatDiff[]) { - const values: EventStatsRow[] = diffs.map(([_, event_id, stat, diff]) => { - const row: EventStatsRow = { + const values: DittoTables['event_stats'][] = diffs.map(([_, event_id, stat, diff]) => { + const row: DittoTables['event_stats'] = { event_id, replies_count: 0, reposts_count: 0, diff --git a/src/storages/events-db.ts b/src/storages/events-db.ts index 3359064..6d80f70 100644 --- a/src/storages/events-db.ts +++ b/src/storages/events-db.ts @@ -1,6 +1,6 @@ import { NIP50, NostrEvent, NostrFilter, NStore } from '@nostrify/nostrify'; import { Conf } from '@/config.ts'; -import { type DittoDB } from '@/db.ts'; +import { DittoTables } from '@/db/DittoTables.ts'; import { Debug, Kysely, type SelectQueryBuilder } from '@/deps.ts'; import { normalizeFilters } from '@/filter.ts'; import { DittoEvent } from '@/interfaces/DittoEvent.ts'; @@ -33,7 +33,7 @@ const tagConditions: Record = { 'role': ({ event, count }) => event.kind === 30361 && count === 0, }; -type EventQuery = SelectQueryBuilder; + #db: Kysely; #debug = Debug('ditto:db:events'); + private protocol = Conf.databaseUrl.protocol; - constructor(db: Kysely) { + constructor(db: Kysely) { this.#db = db; } @@ -82,8 +83,10 @@ class EventsDB implements NStore { .execute(); } + const protocol = this.protocol; /** Add search data to the FTS table. */ async function indexSearch() { + if (protocol !== 'sqlite:') return; const searchContent = buildSearchContent(event); if (!searchContent) return; await trx.insertInto('events_fts') @@ -143,7 +146,7 @@ class EventsDB implements NStore { } /** Build the query for a filter. */ - getFilterQuery(db: Kysely, filter: NostrFilter): EventQuery { + getFilterQuery(db: Kysely, filter: NostrFilter): EventQuery { let query = db .selectFrom('events') .select([ @@ -194,7 +197,7 @@ class EventsDB implements NStore { } } - if (filter.search) { + if (filter.search && this.protocol === 'sqlite:') { query = query .innerJoin('events_fts', 'events_fts.id', 'events.id') .where('events_fts.content', 'match', JSON.stringify(filter.search)); @@ -315,7 +318,7 @@ class EventsDB implements NStore { } /** Delete events from each table. Should be run in a transaction! */ - async deleteEventsTrx(db: Kysely, filters: NostrFilter[]) { + async deleteEventsTrx(db: Kysely, filters: NostrFilter[]) { if (!filters.length) return Promise.resolve(); this.#debug('DELETE', JSON.stringify(filters)); diff --git a/src/workers/trends.worker.ts b/src/workers/trends.worker.ts index fd9e8fe..df06fbb 100644 --- a/src/workers/trends.worker.ts +++ b/src/workers/trends.worker.ts @@ -28,7 +28,7 @@ export const TrendsWorker = { CREATE TABLE IF NOT EXISTS tag_usages ( tag TEXT NOT NULL COLLATE NOCASE, pubkey8 TEXT NOT NULL, - inserted_at DATETIME NOT NULL + inserted_at INTEGER NOT NULL ); CREATE INDEX IF NOT EXISTS idx_time_tag ON tag_usages(inserted_at, tag);