Merge branch 'postgres' into 'main'

Support Postgres

See merge request soapbox-pub/ditto!167
This commit is contained in:
Alex Gleason 2024-04-20 02:13:08 +00:00
commit aeaa236518
19 changed files with 222 additions and 165 deletions

View File

@ -14,7 +14,9 @@
"imports": {
"@/": "./src/",
"@nostrify/nostrify": "jsr:@nostrify/nostrify@^0.9.7",
"~/fixtures/": "./fixtures/"
"~/fixtures/": "./fixtures/",
"kysely": "npm:kysely@^0.26.3",
"kysely_deno_postgres": "https://deno.land/x/kysely_deno_postgres@v0.4.0/mod.ts"
},
"lint": {
"include": ["src/", "scripts/"],

View File

@ -1,93 +1,10 @@
import fs from 'node:fs/promises';
import path from 'node:path';
import { FileMigrationProvider, Kysely, Migrator, PolySqliteDialect } from '@/deps.ts';
import { Conf } from '@/config.ts';
import { setPragma } from '@/pragma.ts';
import SqliteWorker from '@/workers/sqlite.ts';
import { DittoDB } from '@/db/DittoDB.ts';
import { FileMigrationProvider, Migrator } from '@/deps.ts';
interface DittoDB {
events: EventRow;
events_fts: EventFTSRow;
tags: TagRow;
relays: RelayRow;
unattached_media: UnattachedMediaRow;
author_stats: AuthorStatsRow;
event_stats: EventStatsRow;
pubkey_domains: PubkeyDomainRow;
}
interface AuthorStatsRow {
pubkey: string;
followers_count: number;
following_count: number;
notes_count: number;
}
interface EventStatsRow {
event_id: string;
replies_count: number;
reposts_count: number;
reactions_count: number;
}
interface EventRow {
id: string;
kind: number;
pubkey: string;
content: string;
created_at: number;
tags: string;
sig: string;
deleted_at: number | null;
}
interface EventFTSRow {
id: string;
content: string;
}
interface TagRow {
tag: string;
value: string;
event_id: string;
}
interface RelayRow {
url: string;
domain: string;
active: boolean;
}
interface UnattachedMediaRow {
id: string;
pubkey: string;
url: string;
data: string;
uploaded_at: Date;
}
interface PubkeyDomainRow {
pubkey: string;
domain: string;
last_updated_at: number;
}
const sqliteWorker = new SqliteWorker();
await sqliteWorker.open(Conf.dbPath);
const db = new Kysely<DittoDB>({
dialect: new PolySqliteDialect({
database: sqliteWorker,
}),
});
// Set PRAGMA values.
await Promise.all([
setPragma(db, 'synchronous', 'normal'),
setPragma(db, 'temp_store', 'memory'),
setPragma(db, 'mmap_size', Conf.sqlite.mmapSize),
]);
const db = await DittoDB.getInstance();
const migrator = new Migrator({
db,
@ -120,4 +37,4 @@ async function migrate() {
await migrate();
export { type AuthorStatsRow, db, type DittoDB, type EventRow, type EventStatsRow, type TagRow };
export { db };

20
src/db/DittoDB.ts Normal file
View File

@ -0,0 +1,20 @@
import { Conf } from '@/config.ts';
import { DittoPostgres } from '@/db/adapters/DittoPostgres.ts';
import { DittoSQLite } from '@/db/adapters/DittoSQLite.ts';
import { DittoTables } from '@/db/DittoTables.ts';
import { Kysely } from '@/deps.ts';
export class DittoDB {
static getInstance(): Promise<Kysely<DittoTables>> {
const { databaseUrl } = Conf;
switch (databaseUrl.protocol) {
case 'sqlite:':
return DittoSQLite.getInstance();
case 'postgres:':
return DittoPostgres.getInstance();
default:
throw new Error('Unsupported database URL.');
}
}
}

66
src/db/DittoTables.ts Normal file
View File

@ -0,0 +1,66 @@
export interface DittoTables {
events: EventRow;
events_fts: EventFTSRow;
tags: TagRow;
relays: RelayRow;
unattached_media: UnattachedMediaRow;
author_stats: AuthorStatsRow;
event_stats: EventStatsRow;
pubkey_domains: PubkeyDomainRow;
}
interface AuthorStatsRow {
pubkey: string;
followers_count: number;
following_count: number;
notes_count: number;
}
interface EventStatsRow {
event_id: string;
replies_count: number;
reposts_count: number;
reactions_count: number;
}
interface EventRow {
id: string;
kind: number;
pubkey: string;
content: string;
created_at: number;
tags: string;
sig: string;
deleted_at: number | null;
}
interface EventFTSRow {
id: string;
content: string;
}
interface TagRow {
tag: string;
value: string;
event_id: string;
}
interface RelayRow {
url: string;
domain: string;
active: boolean;
}
interface UnattachedMediaRow {
id: string;
pubkey: string;
url: string;
data: string;
uploaded_at: number;
}
interface PubkeyDomainRow {
pubkey: string;
domain: string;
last_updated_at: number;
}

View File

@ -0,0 +1,35 @@
import { Kysely, PostgresAdapter, PostgresIntrospector, PostgresQueryCompiler } from 'kysely';
import { PostgreSQLDriver } from 'kysely_deno_postgres';
import { DittoTables } from '@/db/DittoTables.ts';
export class DittoPostgres {
static db: Kysely<DittoTables> | undefined;
// deno-lint-ignore require-await
static async getInstance(): Promise<Kysely<DittoTables>> {
if (!this.db) {
this.db = new Kysely({
dialect: {
createAdapter() {
return new PostgresAdapter();
},
// @ts-ignore mismatched kysely versions probably
createDriver() {
return new PostgreSQLDriver({
connectionString: Deno.env.get('DATABASE_URL'),
});
},
createIntrospector(db: Kysely<unknown>) {
return new PostgresIntrospector(db);
},
createQueryCompiler() {
return new PostgresQueryCompiler();
},
},
});
}
return this.db;
}
}

View File

@ -0,0 +1,54 @@
import { Conf } from '@/config.ts';
import { DittoTables } from '@/db/DittoTables.ts';
import { Kysely, PolySqliteDialect, sql } from '@/deps.ts';
import SqliteWorker from '@/workers/sqlite.ts';
export class DittoSQLite {
static db: Kysely<DittoTables> | undefined;
static async getInstance(): Promise<Kysely<DittoTables>> {
if (!this.db) {
const sqliteWorker = new SqliteWorker();
await sqliteWorker.open(this.path);
this.db = new Kysely<DittoTables>({
dialect: new PolySqliteDialect({
database: sqliteWorker,
}),
});
// Set PRAGMA values.
await Promise.all([
sql`PRAGMA synchronous = normal`.execute(this.db),
sql`PRAGMA temp_store = memory`.execute(this.db),
sql`PRAGMA foreign_keys = ON`.execute(this.db),
sql`PRAGMA auto_vacuum = FULL`.execute(this.db),
sql`PRAGMA journal_mode = WAL`.execute(this.db),
sql.raw(`PRAGMA mmap_size = ${Conf.sqlite.mmapSize}`).execute(this.db),
]);
}
return this.db;
}
/** Get the relative or absolute path based on the `DATABASE_URL`. */
static get path() {
if (Deno.env.get('DATABASE_URL') === 'sqlite://:memory:') {
return ':memory:';
}
const { host, pathname } = Conf.databaseUrl;
if (!pathname) return '';
// Get relative path.
if (host === '') {
return pathname;
} else if (host === '.') {
return pathname;
} else if (host) {
return host + pathname;
}
return '';
}
}

View File

@ -1,4 +1,4 @@
import { Kysely, sql } from '@/deps.ts';
import { Kysely } from '@/deps.ts';
export async function up(db: Kysely<any>): Promise<void> {
await db.schema
@ -21,13 +21,6 @@ export async function up(db: Kysely<any>): Promise<void> {
.addColumn('event_id', 'text', (col) => col.notNull())
.execute();
await db.schema
.createTable('users')
.addColumn('pubkey', 'text', (col) => col.primaryKey())
.addColumn('username', 'text', (col) => col.notNull().unique())
.addColumn('inserted_at', 'datetime', (col) => col.notNull().defaultTo(sql`CURRENT_TIMESTAMP`))
.execute();
await db.schema
.createIndex('idx_events_kind')
.on('events')

View File

@ -1,9 +1,12 @@
import { Conf } from '@/config.ts';
import { Kysely, sql } from '@/deps.ts';
export async function up(db: Kysely<any>): Promise<void> {
if (Conf.databaseUrl.protocol === 'sqlite:') {
await sql`CREATE VIRTUAL TABLE events_fts USING fts5(id, content)`.execute(db);
}
}
export async function down(db: Kysely<any>): Promise<void> {
await db.schema.dropTable('events_fts').execute();
await db.schema.dropTable('events_fts').ifExists().execute();
}

View File

@ -1,10 +1,6 @@
import { Kysely } from '@/deps.ts';
export async function up(db: Kysely<any>): Promise<void> {
await db.schema
.alterTable('users')
.addColumn('admin', 'boolean', (col) => col.defaultTo(false))
.execute();
export async function up(_db: Kysely<any>): Promise<void> {
}
export async function down(db: Kysely<any>): Promise<void> {

View File

@ -1,17 +1,6 @@
import { Kysely } from '@/deps.ts';
export async function up(db: Kysely<any>): Promise<void> {
await db.schema
.createIndex('idx_users_pubkey')
.on('users')
.column('pubkey')
.execute();
await db.schema
.createIndex('idx_users_username')
.on('users')
.column('username')
.execute();
export async function up(_db: Kysely<any>): Promise<void> {
}
export async function down(db: Kysely<any>): Promise<void> {

View File

@ -1,12 +1,7 @@
import { Kysely, sql } from '@/deps.ts';
import { Kysely } from '@/deps.ts';
export async function up(db: Kysely<any>): Promise<void> {
await sql`PRAGMA foreign_keys = ON`.execute(db);
await sql`PRAGMA auto_vacuum = FULL`.execute(db);
await sql`VACUUM`.execute(db);
export async function up(_db: Kysely<any>): Promise<void> {
}
export async function down(db: Kysely<any>): Promise<void> {
await sql`PRAGMA foreign_keys = OFF`.execute(db);
await sql`PRAGMA auto_vacuum = NONE`.execute(db);
export async function down(_db: Kysely<any>): Promise<void> {
}

View File

@ -1,4 +1,4 @@
import { Kysely, sql } from '@/deps.ts';
import { Kysely } from '@/deps.ts';
export async function up(db: Kysely<any>): Promise<void> {
await db.schema
@ -7,7 +7,7 @@ export async function up(db: Kysely<any>): Promise<void> {
.addColumn('pubkey', 'text', (c) => c.notNull())
.addColumn('url', 'text', (c) => c.notNull())
.addColumn('data', 'text', (c) => c.notNull())
.addColumn('uploaded_at', 'datetime', (c) => c.notNull().defaultTo(sql`CURRENT_TIMESTAMP`))
.addColumn('uploaded_at', 'bigint', (c) => c.notNull())
.execute();
await db.schema

View File

@ -1,9 +1,7 @@
import { Kysely, sql } from '@/deps.ts';
import { Kysely } from '@/deps.ts';
export async function up(db: Kysely<any>): Promise<void> {
await sql`PRAGMA journal_mode = WAL`.execute(db);
export async function up(_db: Kysely<any>): Promise<void> {
}
export async function down(db: Kysely<any>): Promise<void> {
await sql`PRAGMA journal_mode = DELETE`.execute(db);
export async function down(_db: Kysely<any>): Promise<void> {
}

View File

@ -1,7 +1,7 @@
import { Kysely } from '@/deps.ts';
export async function up(db: Kysely<any>): Promise<void> {
await db.schema.dropTable('users').execute();
await db.schema.dropTable('users').ifExists().execute();
}
export async function down(_db: Kysely<any>): Promise<void> {

View File

@ -7,14 +7,14 @@ interface UnattachedMedia {
pubkey: string;
url: string;
data: MediaData;
uploaded_at: Date;
uploaded_at: number;
}
/** Add unattached media into the database. */
async function insertUnattachedMedia(media: Omit<UnattachedMedia, 'id' | 'uploaded_at'>) {
const result = {
id: uuid62.v4(),
uploaded_at: new Date(),
uploaded_at: Date.now(),
...media,
};
@ -41,7 +41,7 @@ function selectUnattachedMediaQuery() {
function getUnattachedMedia(until: Date) {
return selectUnattachedMediaQuery()
.leftJoin('tags', 'unattached_media.url', 'tags.value')
.where('uploaded_at', '<', until)
.where('uploaded_at', '<', until.getTime())
.execute();
}

View File

@ -1,15 +0,0 @@
import { type Kysely, sql } from '@/deps.ts';
/** Set the PRAGMA and then read back its value to confirm. */
function setPragma(db: Kysely<any>, pragma: string, value: string | number) {
return sql.raw(`PRAGMA ${pragma} = ${value}`).execute(db);
}
/** Get value of PRAGMA from the database. */
async function getPragma(db: Kysely<any>, pragma: string) {
const result = await sql.raw(`PRAGMA ${pragma}`).execute(db);
const row = result.rows[0] as Record<string, unknown> | undefined;
return row?.[pragma];
}
export { getPragma, setPragma };

View File

@ -1,11 +1,12 @@
import { NostrEvent } from '@nostrify/nostrify';
import { type AuthorStatsRow, db, type DittoDB, type EventStatsRow } from '@/db.ts';
import { db } from '@/db.ts';
import { DittoTables } from '@/db/DittoTables.ts';
import { Debug, type InsertQueryBuilder } from '@/deps.ts';
import { eventsDB } from '@/storages.ts';
import { findReplyTag } from '@/tags.ts';
type AuthorStat = keyof Omit<AuthorStatsRow, 'pubkey'>;
type EventStat = keyof Omit<EventStatsRow, 'event_id'>;
type AuthorStat = keyof Omit<DittoTables['author_stats'], 'pubkey'>;
type EventStat = keyof Omit<DittoTables['event_stats'], 'event_id'>;
type AuthorStatDiff = ['author_stats', pubkey: string, stat: AuthorStat, diff: number];
type EventStatDiff = ['event_stats', eventId: string, stat: EventStat, diff: number];
@ -16,7 +17,7 @@ const debug = Debug('ditto:stats');
/** Store stats for the event in LMDB. */
async function updateStats(event: NostrEvent) {
let prev: NostrEvent | undefined;
const queries: InsertQueryBuilder<DittoDB, any, unknown>[] = [];
const queries: InsertQueryBuilder<DittoTables, any, unknown>[] = [];
// Kind 3 is a special case - replace the count with the new list.
if (event.kind === 3) {
@ -99,8 +100,8 @@ async function getStatsDiff(event: NostrEvent, prev: NostrEvent | undefined): Pr
/** Create an author stats query from the list of diffs. */
function authorStatsQuery(diffs: AuthorStatDiff[]) {
const values: AuthorStatsRow[] = diffs.map(([_, pubkey, stat, diff]) => {
const row: AuthorStatsRow = {
const values: DittoTables['author_stats'][] = diffs.map(([_, pubkey, stat, diff]) => {
const row: DittoTables['author_stats'] = {
pubkey,
followers_count: 0,
following_count: 0,
@ -125,8 +126,8 @@ function authorStatsQuery(diffs: AuthorStatDiff[]) {
/** Create an event stats query from the list of diffs. */
function eventStatsQuery(diffs: EventStatDiff[]) {
const values: EventStatsRow[] = diffs.map(([_, event_id, stat, diff]) => {
const row: EventStatsRow = {
const values: DittoTables['event_stats'][] = diffs.map(([_, event_id, stat, diff]) => {
const row: DittoTables['event_stats'] = {
event_id,
replies_count: 0,
reposts_count: 0,

View File

@ -1,6 +1,6 @@
import { NIP50, NostrEvent, NostrFilter, NStore } from '@nostrify/nostrify';
import { Conf } from '@/config.ts';
import { type DittoDB } from '@/db.ts';
import { DittoTables } from '@/db/DittoTables.ts';
import { Debug, Kysely, type SelectQueryBuilder } from '@/deps.ts';
import { normalizeFilters } from '@/filter.ts';
import { DittoEvent } from '@/interfaces/DittoEvent.ts';
@ -33,7 +33,7 @@ const tagConditions: Record<string, TagCondition> = {
'role': ({ event, count }) => event.kind === 30361 && count === 0,
};
type EventQuery = SelectQueryBuilder<DittoDB, 'events', {
type EventQuery = SelectQueryBuilder<DittoTables, 'events', {
id: string;
tags: string;
kind: number;
@ -58,10 +58,11 @@ type EventQuery = SelectQueryBuilder<DittoDB, 'events', {
/** SQLite database storage adapter for Nostr events. */
class EventsDB implements NStore {
#db: Kysely<DittoDB>;
#db: Kysely<DittoTables>;
#debug = Debug('ditto:db:events');
private protocol = Conf.databaseUrl.protocol;
constructor(db: Kysely<DittoDB>) {
constructor(db: Kysely<DittoTables>) {
this.#db = db;
}
@ -82,8 +83,10 @@ class EventsDB implements NStore {
.execute();
}
const protocol = this.protocol;
/** Add search data to the FTS table. */
async function indexSearch() {
if (protocol !== 'sqlite:') return;
const searchContent = buildSearchContent(event);
if (!searchContent) return;
await trx.insertInto('events_fts')
@ -143,7 +146,7 @@ class EventsDB implements NStore {
}
/** Build the query for a filter. */
getFilterQuery(db: Kysely<DittoDB>, filter: NostrFilter): EventQuery {
getFilterQuery(db: Kysely<DittoTables>, filter: NostrFilter): EventQuery {
let query = db
.selectFrom('events')
.select([
@ -194,7 +197,7 @@ class EventsDB implements NStore {
}
}
if (filter.search) {
if (filter.search && this.protocol === 'sqlite:') {
query = query
.innerJoin('events_fts', 'events_fts.id', 'events.id')
.where('events_fts.content', 'match', JSON.stringify(filter.search));
@ -315,7 +318,7 @@ class EventsDB implements NStore {
}
/** Delete events from each table. Should be run in a transaction! */
async deleteEventsTrx(db: Kysely<DittoDB>, filters: NostrFilter[]) {
async deleteEventsTrx(db: Kysely<DittoTables>, filters: NostrFilter[]) {
if (!filters.length) return Promise.resolve();
this.#debug('DELETE', JSON.stringify(filters));

View File

@ -28,7 +28,7 @@ export const TrendsWorker = {
CREATE TABLE IF NOT EXISTS tag_usages (
tag TEXT NOT NULL COLLATE NOCASE,
pubkey8 TEXT NOT NULL,
inserted_at DATETIME NOT NULL
inserted_at INTEGER NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_time_tag ON tag_usages(inserted_at, tag);