Rewrite SqliteWorker with Comlink

This commit is contained in:
Alex Gleason 2023-12-03 14:58:35 -06:00
parent 455752e656
commit a4e7c241d0
No known key found for this signature in database
GPG Key ID: 7211D1F99744FBB7
4 changed files with 38 additions and 72 deletions

View File

@ -4,7 +4,7 @@ import path from 'node:path';
import { FileMigrationProvider, Kysely, Migrator, PolySqliteDialect } from '@/deps.ts';
import { Conf } from '@/config.ts';
import { getPragma, setPragma } from '@/pragma.ts';
import { sqliteWorker } from '@/workers.ts';
import SqliteWorker from '@/workers/sqlite.ts';
interface DittoDB {
events: EventRow;
@ -57,8 +57,8 @@ interface UnattachedMediaRow {
uploaded_at: Date;
}
await sqliteWorker.ready;
await sqliteWorker.open();
const sqliteWorker = new SqliteWorker();
await sqliteWorker.open(Conf.dbPath);
const db = new Kysely<DittoDB>({
dialect: new PolySqliteDialect({

View File

@ -1,6 +0,0 @@
import { Conf } from '@/config.ts';
import SqliteWorker from '@/workers/sqlite.ts';
const sqliteWorker = new SqliteWorker(Conf.dbPath);
export { sqliteWorker };

View File

@ -1,15 +1,18 @@
import { Comlink } from '@/deps.ts';
import type { SqliteWorker as _SqliteWorker } from './sqlite.worker.ts';
import type { CompiledQuery, QueryResult } from '@/deps.ts';
class SqliteWorker {
#path: string;
#worker: Worker;
ready: Promise<void>;
#client: ReturnType<typeof Comlink.wrap<typeof _SqliteWorker>>;
#ready: Promise<void>;
constructor(path: string) {
this.#path = path;
constructor() {
this.#worker = new Worker(new URL('./sqlite.worker.ts', import.meta.url).href, { type: 'module' });
this.#client = Comlink.wrap<typeof _SqliteWorker>(this.#worker);
this.ready = new Promise<void>((resolve) => {
this.#ready = new Promise<void>((resolve) => {
const handleEvent = (event: MessageEvent) => {
if (event.data[0] === 'ready') {
this.#worker.removeEventListener('message', handleEvent);
@ -20,37 +23,18 @@ class SqliteWorker {
});
}
async open(): Promise<void> {
await this.ready;
return this.#call(['open', [this.#path]]);
async open(path: string): Promise<void> {
await this.#ready;
return this.#client.open(path);
}
async executeQuery<R>({ sql, parameters }: CompiledQuery): Promise<QueryResult<R>> {
await this.ready;
return this.#call(['query', [sql, parameters]]);
async executeQuery<R>(query: CompiledQuery): Promise<QueryResult<R>> {
await this.#ready;
return this.#client.executeQuery(query) as Promise<QueryResult<R>>;
}
#call<T>(msg: [string, unknown[]]): Promise<T> {
const id = crypto.randomUUID();
this.#worker.postMessage([id, msg]);
// TODO: use a hashmap instead of an event listener for better performance.
return new Promise((resolve) => {
const handleEvent = (event: MessageEvent<[string, T]>) => {
const [_id, result] = event.data;
if (_id === id) {
this.#worker.removeEventListener('message', handleEvent);
resolve(result);
}
};
this.#worker.addEventListener('message', handleEvent);
});
}
// deno-lint-ignore require-await
async destroy() {
this.#worker.terminate();
destroy(): Promise<void> {
return this.#client.destroy();
}
}

View File

@ -1,38 +1,26 @@
/// <reference lib="webworker" />
import { DenoSqlite3 } from '@/deps.ts';
import { Comlink, type CompiledQuery, DenoSqlite3, type QueryResult } from '@/deps.ts';
let db: DenoSqlite3;
let db: DenoSqlite3 | undefined;
type Msg =
| ['open', [string]]
| ['query', [string, unknown[]]];
export const SqliteWorker = {
open(path: string): void {
db = new DenoSqlite3(path);
},
executeQuery<R>({ sql, parameters }: CompiledQuery): QueryResult<R> {
if (!db) throw new Error('Database not open');
return {
rows: db.prepare(sql).all(...parameters as any[]) as R[],
numAffectedRows: BigInt(db.changes),
insertId: BigInt(db.lastInsertRowId),
};
},
destroy() {
db?.close();
},
};
function call([cmd, args]: Msg) {
switch (cmd) {
case 'open':
return handleOpen(args[0]);
case 'query':
return handleQuery(args[0], args[1]);
}
}
function handleOpen(path: string): void {
db = new DenoSqlite3(path);
}
function handleQuery(sql: string, params: any[] = []) {
return {
rows: db.prepare(sql).all(...params),
numAffectedRows: BigInt(db.changes),
insertId: BigInt(db.lastInsertRowId),
};
}
self.addEventListener('message', (event: MessageEvent<[string, Msg]>) => {
const [id, msg] = event.data;
const result = call(msg);
self.postMessage([id, result]);
});
Comlink.expose(SqliteWorker);
self.postMessage(['ready']);