Rework Kysely db to be async

This commit is contained in:
Alex Gleason 2024-05-14 18:44:42 -05:00
parent 68b5887ed0
commit a4226a963f
No known key found for this signature in database
GPG Key ID: 7211D1F99744FBB7
9 changed files with 106 additions and 79 deletions

View File

@ -24,6 +24,7 @@
"@sentry/deno": "https://deno.land/x/sentry@7.112.2/index.mjs",
"@soapbox/kysely-deno-sqlite": "jsr:@soapbox/kysely-deno-sqlite@^2.1.0",
"@soapbox/stickynotes": "jsr:@soapbox/stickynotes@^0.4.0",
"@std/assert": "jsr:@std/assert@^0.225.1",
"@std/cli": "jsr:@std/cli@^0.223.0",
"@std/crypto": "jsr:@std/crypto@^0.224.0",
"@std/dotenv": "jsr:@std/dotenv@^0.224.0",

View File

@ -1,41 +0,0 @@
import fs from 'node:fs/promises';
import path from 'node:path';
import { FileMigrationProvider, Migrator } from 'kysely';
import { DittoDB } from '@/db/DittoDB.ts';
const db = await DittoDB.getInstance();
const migrator = new Migrator({
db,
provider: new FileMigrationProvider({
fs,
path,
migrationFolder: new URL(import.meta.resolve('./db/migrations')).pathname,
}),
});
/** Migrate the database to the latest version. */
async function migrate() {
console.info('Running migrations...');
const results = await migrator.migrateToLatest();
if (results.error) {
console.error(results.error);
Deno.exit(1);
} else {
if (!results.results?.length) {
console.info('Everything up-to-date.');
} else {
console.info('Migrations finished!');
for (const { migrationName, status } of results.results!) {
console.info(` - ${migrationName}: ${status}`);
}
}
}
}
await migrate();
export { db };

View File

@ -1,4 +1,7 @@
import { Kysely } from 'kysely';
import fs from 'node:fs/promises';
import path from 'node:path';
import { FileMigrationProvider, Kysely, Migrator } from 'kysely';
import { Conf } from '@/config.ts';
import { DittoPostgres } from '@/db/adapters/DittoPostgres.ts';
@ -6,17 +9,63 @@ import { DittoSQLite } from '@/db/adapters/DittoSQLite.ts';
import { DittoTables } from '@/db/DittoTables.ts';
export class DittoDB {
private static kysely: Promise<Kysely<DittoTables>> | undefined;
static getInstance(): Promise<Kysely<DittoTables>> {
if (!this.kysely) {
this.kysely = this._getInstance();
}
return this.kysely;
}
static async _getInstance(): Promise<Kysely<DittoTables>> {
const { databaseUrl } = Conf;
let kysely: Kysely<DittoTables>;
switch (databaseUrl.protocol) {
case 'sqlite:':
return DittoSQLite.getInstance();
kysely = await DittoSQLite.getInstance();
break;
case 'postgres:':
case 'postgresql:':
return DittoPostgres.getInstance();
kysely = await DittoPostgres.getInstance();
break;
default:
throw new Error('Unsupported database URL.');
}
await this.migrate(kysely);
return kysely;
}
/** Migrate the database to the latest version. */
private static async migrate(kysely: Kysely<DittoTables>) {
const migrator = new Migrator({
db: kysely,
provider: new FileMigrationProvider({
fs,
path,
migrationFolder: new URL(import.meta.resolve('../db/migrations')).pathname,
}),
});
console.info('Running migrations...');
const results = await migrator.migrateToLatest();
if (results.error) {
console.error(results.error);
Deno.exit(1);
} else {
if (!results.results?.length) {
console.info('Everything up-to-date.');
} else {
console.info('Migrations finished!');
for (const { migrationName, status } of results.results!) {
console.info(` - ${migrationName}: ${status}`);
}
}
}
}
}

View File

@ -1,6 +1,6 @@
import uuid62 from 'uuid62';
import { db } from '@/db.ts';
import { DittoDB } from '@/db/DittoDB.ts';
import { type MediaData } from '@/schemas/nostr.ts';
interface UnattachedMedia {
@ -19,7 +19,8 @@ async function insertUnattachedMedia(media: Omit<UnattachedMedia, 'id' | 'upload
...media,
};
await db.insertInto('unattached_media')
const kysely = await DittoDB.getInstance();
await kysely.insertInto('unattached_media')
.values({ ...result, data: JSON.stringify(media.data) })
.execute();
@ -27,8 +28,9 @@ async function insertUnattachedMedia(media: Omit<UnattachedMedia, 'id' | 'upload
}
/** Select query for unattached media. */
function selectUnattachedMediaQuery() {
return db.selectFrom('unattached_media')
async function selectUnattachedMediaQuery() {
const kysely = await DittoDB.getInstance();
return kysely.selectFrom('unattached_media')
.select([
'unattached_media.id',
'unattached_media.pubkey',
@ -39,25 +41,27 @@ function selectUnattachedMediaQuery() {
}
/** Find attachments that exist but aren't attached to any events. */
function getUnattachedMedia(until: Date) {
return selectUnattachedMediaQuery()
async function getUnattachedMedia(until: Date) {
const query = await selectUnattachedMediaQuery();
return query
.leftJoin('tags', 'unattached_media.url', 'tags.value')
.where('uploaded_at', '<', until.getTime())
.execute();
}
/** Delete unattached media by URL. */
function deleteUnattachedMediaByUrl(url: string) {
return db.deleteFrom('unattached_media')
async function deleteUnattachedMediaByUrl(url: string) {
const kysely = await DittoDB.getInstance();
return kysely.deleteFrom('unattached_media')
.where('url', '=', url)
.execute();
}
/** Get unattached media by IDs. */
// deno-lint-ignore require-await
async function getUnattachedMediaByIds(ids: string[]) {
if (!ids.length) return [];
return selectUnattachedMediaQuery()
const query = await selectUnattachedMediaQuery();
return query
.where('id', 'in', ids)
.execute();
}
@ -65,7 +69,8 @@ async function getUnattachedMediaByIds(ids: string[]) {
/** Delete rows as an event with media is being created. */
async function deleteAttachedMedia(pubkey: string, urls: string[]): Promise<void> {
if (!urls.length) return;
await db.deleteFrom('unattached_media')
const kysely = await DittoDB.getInstance();
await kysely.deleteFrom('unattached_media')
.where('pubkey', '=', pubkey)
.where('url', 'in', urls)
.execute();

View File

@ -5,7 +5,7 @@ import Debug from '@soapbox/stickynotes/debug';
import { sql } from 'kysely';
import { Conf } from '@/config.ts';
import { db } from '@/db.ts';
import { DittoDB } from '@/db/DittoDB.ts';
import { deleteAttachedMedia } from '@/db/unattached-media.ts';
import { DittoEvent } from '@/interfaces/DittoEvent.ts';
import { isEphemeralKind } from '@/kinds.ts';
@ -91,7 +91,8 @@ async function encounterEvent(event: NostrEvent, signal: AbortSignal): Promise<b
async function hydrateEvent(event: DittoEvent, signal: AbortSignal): Promise<void> {
await hydrateEvents({ events: [event], store: await Storages.db(), signal });
const domain = await db
const kysely = await DittoDB.getInstance();
const domain = await kysely
.selectFrom('pubkey_domains')
.select('domain')
.where('pubkey', '=', event.pubkey)
@ -140,6 +141,7 @@ async function parseMetadata(event: NostrEvent, signal: AbortSignal): Promise<vo
// Track pubkey domain.
try {
const kysely = await DittoDB.getInstance();
const { domain } = parseNip05(nip05);
await sql`
@ -149,7 +151,7 @@ async function parseMetadata(event: NostrEvent, signal: AbortSignal): Promise<vo
domain = excluded.domain,
last_updated_at = excluded.last_updated_at
WHERE excluded.last_updated_at > pubkey_domains.last_updated_at
`.execute(db);
`.execute(kysely);
} catch (_e) {
// do nothing
}

View File

@ -2,7 +2,7 @@ import { NKinds, NostrEvent } from '@nostrify/nostrify';
import Debug from '@soapbox/stickynotes/debug';
import { InsertQueryBuilder } from 'kysely';
import { db } from '@/db.ts';
import { DittoDB } from '@/db/DittoDB.ts';
import { DittoTables } from '@/db/DittoTables.ts';
import { Storages } from '@/storages.ts';
import { findReplyTag } from '@/tags.ts';
@ -25,7 +25,7 @@ async function updateStats(event: NostrEvent) {
if (event.kind === 3) {
prev = await getPrevEvent(event);
if (!prev || event.created_at >= prev.created_at) {
queries.push(updateFollowingCountQuery(event));
queries.push(await updateFollowingCountQuery(event));
}
}
@ -37,8 +37,8 @@ async function updateStats(event: NostrEvent) {
debug(JSON.stringify({ id: event.id, pubkey: event.pubkey, kind: event.kind, tags: event.tags, statDiffs }));
}
if (pubkeyDiffs.length) queries.push(authorStatsQuery(pubkeyDiffs));
if (eventDiffs.length) queries.push(eventStatsQuery(eventDiffs));
if (pubkeyDiffs.length) queries.push(await authorStatsQuery(pubkeyDiffs));
if (eventDiffs.length) queries.push(await eventStatsQuery(eventDiffs));
if (queries.length) {
await Promise.all(queries.map((query) => query.execute()));
@ -102,7 +102,7 @@ async function getStatsDiff(event: NostrEvent, prev: NostrEvent | undefined): Pr
}
/** Create an author stats query from the list of diffs. */
function authorStatsQuery(diffs: AuthorStatDiff[]) {
async function authorStatsQuery(diffs: AuthorStatDiff[]) {
const values: DittoTables['author_stats'][] = diffs.map(([_, pubkey, stat, diff]) => {
const row: DittoTables['author_stats'] = {
pubkey,
@ -114,7 +114,8 @@ function authorStatsQuery(diffs: AuthorStatDiff[]) {
return row;
});
return db.insertInto('author_stats')
const kysely = await DittoDB.getInstance();
return kysely.insertInto('author_stats')
.values(values)
.onConflict((oc) =>
oc
@ -128,7 +129,7 @@ function authorStatsQuery(diffs: AuthorStatDiff[]) {
}
/** Create an event stats query from the list of diffs. */
function eventStatsQuery(diffs: EventStatDiff[]) {
async function eventStatsQuery(diffs: EventStatDiff[]) {
const values: DittoTables['event_stats'][] = diffs.map(([_, event_id, stat, diff]) => {
const row: DittoTables['event_stats'] = {
event_id,
@ -140,7 +141,8 @@ function eventStatsQuery(diffs: EventStatDiff[]) {
return row;
});
return db.insertInto('event_stats')
const kysely = await DittoDB.getInstance();
return kysely.insertInto('event_stats')
.values(values)
.onConflict((oc) =>
oc
@ -167,14 +169,15 @@ async function getPrevEvent(event: NostrEvent): Promise<NostrEvent | undefined>
}
/** Set the following count to the total number of unique "p" tags in the follow list. */
function updateFollowingCountQuery({ pubkey, tags }: NostrEvent) {
async function updateFollowingCountQuery({ pubkey, tags }: NostrEvent) {
const following_count = new Set(
tags
.filter(([name]) => name === 'p')
.map(([_, value]) => value),
).size;
return db.insertInto('author_stats')
const kysely = await DittoDB.getInstance();
return kysely.insertInto('author_stats')
.values({
pubkey,
following_count,

View File

@ -1,7 +1,8 @@
// deno-lint-ignore-file require-await
import { NCache } from '@nostrify/nostrify';
import { Conf } from '@/config.ts';
import { db } from '@/db.ts';
import { DittoDB } from '@/db/DittoDB.ts';
import { EventsDB } from '@/storages/events-db.ts';
import { Optimizer } from '@/storages/optimizer.ts';
import { PoolStore } from '@/storages/pool-store.ts';
@ -24,7 +25,10 @@ export class Storages {
/** SQLite database to store events this Ditto server cares about. */
public static async db(): Promise<EventsDB> {
if (!this._db) {
this._db = Promise.resolve(new EventsDB(db));
this._db = (async () => {
const kysely = await DittoDB.getInstance();
return new EventsDB(kysely);
})();
}
return this._db;
}

View File

@ -1,12 +1,14 @@
import { db } from '@/db.ts';
import { assertEquals, assertRejects } from '@/deps-test.ts';
import { assertEquals, assertRejects } from '@std/assert';
import { DittoDB } from '@/db/DittoDB.ts';
import event0 from '~/fixtures/events/event-0.json' with { type: 'json' };
import event1 from '~/fixtures/events/event-1.json' with { type: 'json' };
import { EventsDB } from '@/storages/events-db.ts';
const eventsDB = new EventsDB(db);
const kysely = await DittoDB.getInstance();
const eventsDB = new EventsDB(kysely);
Deno.test('count filters', async () => {
assertEquals((await eventsDB.count([{ kinds: [1] }])).count, 0);
@ -34,7 +36,7 @@ Deno.test('query events with domain search filter', async () => {
assertEquals(await eventsDB.query([{ search: 'domain:localhost:8000' }]), []);
assertEquals(await eventsDB.query([{ search: '' }]), [event1]);
await db
await kysely
.insertInto('pubkey_domains')
.values({ pubkey: event1.pubkey, domain: 'localhost:8000', last_updated_at: event1.created_at })
.execute();

View File

@ -1,7 +1,7 @@
import { NostrEvent, NStore } from '@nostrify/nostrify';
import { matchFilter } from 'nostr-tools';
import { db } from '@/db.ts';
import { DittoDB } from '@/db/DittoDB.ts';
import { type DittoEvent } from '@/interfaces/DittoEvent.ts';
import { DittoTables } from '@/db/DittoTables.ts';
import { Conf } from '@/config.ts';
@ -239,7 +239,7 @@ function gatherReportedProfiles({ events, store, signal }: HydrateOpts): Promise
}
/** Collect author stats from the events. */
function gatherAuthorStats(events: DittoEvent[]): Promise<DittoTables['author_stats'][]> {
async function gatherAuthorStats(events: DittoEvent[]): Promise<DittoTables['author_stats'][]> {
const pubkeys = new Set<string>(
events
.filter((event) => event.kind === 0)
@ -250,7 +250,8 @@ function gatherAuthorStats(events: DittoEvent[]): Promise<DittoTables['author_st
return Promise.resolve([]);
}
return db
const kysely = await DittoDB.getInstance();
return kysely
.selectFrom('author_stats')
.selectAll()
.where('pubkey', 'in', [...pubkeys])
@ -258,7 +259,7 @@ function gatherAuthorStats(events: DittoEvent[]): Promise<DittoTables['author_st
}
/** Collect event stats from the events. */
function gatherEventStats(events: DittoEvent[]): Promise<DittoTables['event_stats'][]> {
async function gatherEventStats(events: DittoEvent[]): Promise<DittoTables['event_stats'][]> {
const ids = new Set<string>(
events
.filter((event) => event.kind === 1)
@ -269,7 +270,8 @@ function gatherEventStats(events: DittoEvent[]): Promise<DittoTables['event_stat
return Promise.resolve([]);
}
return db
const kysely = await DittoDB.getInstance();
return kysely
.selectFrom('event_stats')
.selectAll()
.where('event_id', 'in', [...ids])