Merge branch 'stats-race' into 'main'

stats: fix race conditions (on Postgres)

See merge request soapbox-pub/ditto!311
This commit is contained in:
Alex Gleason 2024-05-25 01:21:41 +00:00
commit 6062378b7e
10 changed files with 50 additions and 51 deletions

View File

@ -61,27 +61,6 @@ class Conf {
static get externalDomain() { static get externalDomain() {
return Deno.env.get('NOSTR_EXTERNAL') || Conf.localDomain; return Deno.env.get('NOSTR_EXTERNAL') || Conf.localDomain;
} }
/** Path to the main SQLite database which stores users, events, and more. */
static get dbPath() {
if (Deno.env.get('DATABASE_URL') === 'sqlite://:memory:') {
return ':memory:';
}
const { host, pathname } = Conf.databaseUrl;
if (!pathname) return '';
// Get relative path.
if (host === '') {
return pathname;
} else if (host === '.') {
return pathname;
} else if (host) {
return host + pathname;
}
return '';
}
/** /**
* Heroku-style database URL. This is used in production to connect to the * Heroku-style database URL. This is used in production to connect to the
* database. * database.
@ -92,9 +71,24 @@ class Conf {
* protocol://username:password@host:port/database_name * protocol://username:password@host:port/database_name
* ``` * ```
*/ */
static get databaseUrl(): url.UrlWithStringQuery { static get databaseUrl(): string {
return url.parse(Deno.env.get('DATABASE_URL') ?? 'sqlite://data/db.sqlite3'); return Deno.env.get('DATABASE_URL') ?? 'sqlite://data/db.sqlite3';
} }
static db = {
get url(): url.UrlWithStringQuery {
return url.parse(Deno.env.get('DATABASE_URL') ?? 'sqlite://data/db.sqlite3');
},
get dialect(): 'sqlite' | 'postgres' | undefined {
switch (Conf.db.url.protocol) {
case 'sqlite:':
return 'sqlite';
case 'postgres:':
case 'postgresql:':
return 'postgres';
}
return undefined;
},
};
/** Character limit to enforce for posts made through Mastodon API. */ /** Character limit to enforce for posts made through Mastodon API. */
static get postCharLimit() { static get postCharLimit() {
return Number(Deno.env.get('POST_CHAR_LIMIT') || 5000); return Number(Deno.env.get('POST_CHAR_LIMIT') || 5000);

View File

@ -19,16 +19,13 @@ export class DittoDB {
} }
static async _getInstance(): Promise<Kysely<DittoTables>> { static async _getInstance(): Promise<Kysely<DittoTables>> {
const { databaseUrl } = Conf;
let kysely: Kysely<DittoTables>; let kysely: Kysely<DittoTables>;
switch (databaseUrl.protocol) { switch (Conf.db.dialect) {
case 'sqlite:': case 'sqlite':
kysely = await DittoSQLite.getInstance(); kysely = await DittoSQLite.getInstance();
break; break;
case 'postgres:': case 'postgres':
case 'postgresql:':
kysely = await DittoPostgres.getInstance(); kysely = await DittoPostgres.getInstance();
break; break;
default: default:

View File

@ -19,7 +19,7 @@ export class DittoPostgres {
// @ts-ignore mismatched kysely versions probably // @ts-ignore mismatched kysely versions probably
createDriver() { createDriver() {
return new PostgreSQLDriver( return new PostgreSQLDriver(
{ connectionString: Deno.env.get('DATABASE_URL') }, { connectionString: Conf.databaseUrl },
Conf.pg.poolSize, Conf.pg.poolSize,
); );
}, },

View File

@ -36,11 +36,11 @@ export class DittoSQLite {
/** Get the relative or absolute path based on the `DATABASE_URL`. */ /** Get the relative or absolute path based on the `DATABASE_URL`. */
static get path() { static get path() {
if (Deno.env.get('DATABASE_URL') === 'sqlite://:memory:') { if (Conf.databaseUrl === 'sqlite://:memory:') {
return ':memory:'; return ':memory:';
} }
const { host, pathname } = Conf.databaseUrl; const { host, pathname } = Conf.db.url;
if (!pathname) return ''; if (!pathname) return '';

View File

@ -3,7 +3,7 @@ import { Kysely, sql } from 'kysely';
import { Conf } from '@/config.ts'; import { Conf } from '@/config.ts';
export async function up(db: Kysely<any>): Promise<void> { export async function up(db: Kysely<any>): Promise<void> {
if (Conf.databaseUrl.protocol === 'sqlite:') { if (Conf.db.dialect === 'sqlite') {
await sql`CREATE VIRTUAL TABLE events_fts USING fts5(id, content)`.execute(db); await sql`CREATE VIRTUAL TABLE events_fts USING fts5(id, content)`.execute(db);
} }
} }

View File

@ -7,7 +7,7 @@ export async function up(db: Kysely<any>): Promise<void> {
await db.schema.alterTable('tags').renameTo('nostr_tags').execute(); await db.schema.alterTable('tags').renameTo('nostr_tags').execute();
await db.schema.alterTable('nostr_tags').renameColumn('tag', 'name').execute(); await db.schema.alterTable('nostr_tags').renameColumn('tag', 'name').execute();
if (Conf.databaseUrl.protocol === 'sqlite:') { if (Conf.db.dialect === 'sqlite') {
await db.schema.dropTable('events_fts').execute(); await db.schema.dropTable('events_fts').execute();
await sql`CREATE VIRTUAL TABLE nostr_fts5 USING fts5(event_id, content)`.execute(db); await sql`CREATE VIRTUAL TABLE nostr_fts5 USING fts5(event_id, content)`.execute(db);
} }
@ -18,7 +18,7 @@ export async function down(db: Kysely<any>): Promise<void> {
await db.schema.alterTable('nostr_tags').renameTo('tags').execute(); await db.schema.alterTable('nostr_tags').renameTo('tags').execute();
await db.schema.alterTable('tags').renameColumn('name', 'tag').execute(); await db.schema.alterTable('tags').renameColumn('name', 'tag').execute();
if (Conf.databaseUrl.protocol === 'sqlite:') { if (Conf.db.dialect === 'sqlite') {
await db.schema.dropTable('nostr_fts5').execute(); await db.schema.dropTable('nostr_fts5').execute();
await sql`CREATE VIRTUAL TABLE events_fts USING fts5(id, content)`.execute(db); await sql`CREATE VIRTUAL TABLE events_fts USING fts5(id, content)`.execute(db);
} }

View File

@ -3,7 +3,7 @@ import { Kysely, sql } from 'kysely';
import { Conf } from '@/config.ts'; import { Conf } from '@/config.ts';
export async function up(db: Kysely<any>): Promise<void> { export async function up(db: Kysely<any>): Promise<void> {
if (['postgres:', 'postgresql:'].includes(Conf.databaseUrl.protocol!)) { if (Conf.db.dialect === 'postgres') {
await db.schema.createTable('nostr_pgfts') await db.schema.createTable('nostr_pgfts')
.ifNotExists() .ifNotExists()
.addColumn('event_id', 'text', (c) => c.primaryKey().references('nostr_events.id').onDelete('cascade')) .addColumn('event_id', 'text', (c) => c.primaryKey().references('nostr_events.id').onDelete('cascade'))
@ -13,7 +13,7 @@ export async function up(db: Kysely<any>): Promise<void> {
} }
export async function down(db: Kysely<any>): Promise<void> { export async function down(db: Kysely<any>): Promise<void> {
if (['postgres:', 'postgresql:'].includes(Conf.databaseUrl.protocol!)) { if (Conf.db.dialect === 'postgres') {
await db.schema.dropTable('nostr_pgfts').ifExists().execute(); await db.schema.dropTable('nostr_pgfts').ifExists().execute();
} }
} }

View File

@ -3,7 +3,7 @@ import { Kysely } from 'kysely';
import { Conf } from '@/config.ts'; import { Conf } from '@/config.ts';
export async function up(db: Kysely<any>): Promise<void> { export async function up(db: Kysely<any>): Promise<void> {
if (['postgres:', 'postgresql:'].includes(Conf.databaseUrl.protocol!)) { if (Conf.db.dialect === 'postgres') {
await db.schema await db.schema
.createIndex('nostr_pgfts_gin_search_vec') .createIndex('nostr_pgfts_gin_search_vec')
.ifNotExists() .ifNotExists()
@ -15,7 +15,7 @@ export async function up(db: Kysely<any>): Promise<void> {
} }
export async function down(db: Kysely<any>): Promise<void> { export async function down(db: Kysely<any>): Promise<void> {
if (['postgres:', 'postgresql:'].includes(Conf.databaseUrl.protocol!)) { if (Conf.db.dialect === 'postgres') {
await db.schema.dropIndex('nostr_pgfts_gin_search_vec').ifExists().execute(); await db.schema.dropIndex('nostr_pgfts_gin_search_vec').ifExists().execute();
} }
} }

View File

@ -42,17 +42,8 @@ class EventsDB implements NStore {
}; };
constructor(private kysely: Kysely<DittoTables>) { constructor(private kysely: Kysely<DittoTables>) {
let fts: 'sqlite' | 'postgres' | undefined;
if (Conf.databaseUrl.protocol === 'sqlite:') {
fts = 'sqlite';
}
if (['postgres:', 'postgresql:'].includes(Conf.databaseUrl.protocol!)) {
fts = 'postgres';
}
this.store = new NDatabase(kysely, { this.store = new NDatabase(kysely, {
fts, fts: Conf.db.dialect,
indexTags: EventsDB.indexTags, indexTags: EventsDB.indexTags,
searchText: EventsDB.searchText, searchText: EventsDB.searchText,
}); });

View File

@ -6,6 +6,7 @@ import { SetRequired } from 'type-fest';
import { DittoTables } from '@/db/DittoTables.ts'; import { DittoTables } from '@/db/DittoTables.ts';
import { getTagSet } from '@/utils/tags.ts'; import { getTagSet } from '@/utils/tags.ts';
import { Conf } from '@/config.ts';
interface UpdateStatsOpts { interface UpdateStatsOpts {
kysely: Kysely<DittoTables>; kysely: Kysely<DittoTables>;
@ -153,8 +154,16 @@ export async function updateAuthorStats(
notes_count: 0, notes_count: 0,
}; };
const prev = await getAuthorStats(kysely, pubkey); let query = kysely
.selectFrom('author_stats')
.selectAll()
.where('pubkey', '=', pubkey);
if (Conf.db.dialect === 'postgres') {
query = query.forUpdate();
}
const prev = await query.executeTakeFirst();
const stats = fn(prev ?? empty); const stats = fn(prev ?? empty);
if (prev) { if (prev) {
@ -195,8 +204,16 @@ export async function updateEventStats(
reactions: '{}', reactions: '{}',
}; };
const prev = await getEventStats(kysely, eventId); let query = kysely
.selectFrom('event_stats')
.selectAll()
.where('event_id', '=', eventId);
if (Conf.db.dialect === 'postgres') {
query = query.forUpdate();
}
const prev = await query.executeTakeFirst();
const stats = fn(prev ?? empty); const stats = fn(prev ?? empty);
if (prev) { if (prev) {