Implement the DenoSqliteDriver correctly

This commit is contained in:
Alex Gleason 2023-08-06 22:21:46 -05:00
parent 7c2f290775
commit 781ca741dd
No known key found for this signature in database
GPG Key ID: 7211D1F99744FBB7
2 changed files with 93 additions and 36 deletions

View File

@ -1,17 +1,17 @@
export { export {
CompiledQuery,
type DatabaseConnection,
type DatabaseIntrospector, type DatabaseIntrospector,
type Dialect, type Dialect,
type DialectAdapter, type DialectAdapter,
type Driver, type Driver,
Kysely, Kysely,
type QueryCompiler, type QueryCompiler,
type QueryResult,
SqliteAdapter, SqliteAdapter,
type SqliteDatabase,
type SqliteDialectConfig, type SqliteDialectConfig,
SqliteDriver,
SqliteIntrospector, SqliteIntrospector,
SqliteQueryCompiler, SqliteQueryCompiler,
type SqliteStatement,
} from 'npm:kysely@^0.25.0'; } from 'npm:kysely@^0.25.0';
export type { DB as DenoSqlite } from 'https://deno.land/x/sqlite@v3.7.3/mod.ts'; export type { DB as DenoSqlite } from 'https://deno.land/x/sqlite@v3.7.3/mod.ts';

View File

@ -1,50 +1,107 @@
import { type DenoSqlite, type SqliteDatabase, SqliteDriver, type SqliteStatement } from '../deps.ts'; import { CompiledQuery, type DatabaseConnection, type DenoSqlite, type Driver, type QueryResult } from '../deps.ts';
import type { DenoSqliteDialectConfig } from './deno-sqlite-dialect-config.ts'; import type { DenoSqliteDialectConfig } from './deno-sqlite-dialect-config.ts';
class DenoSqliteDriver extends SqliteDriver { class DenoSqliteDriver implements Driver {
readonly #config: DenoSqliteDialectConfig;
readonly #connectionMutex = new ConnectionMutex();
#db?: DenoSqlite;
#connection?: DatabaseConnection;
constructor(config: DenoSqliteDialectConfig) { constructor(config: DenoSqliteDialectConfig) {
super({ this.#config = Object.freeze({ ...config });
...config, }
database: async () =>
new DenoSqliteDatabase( async init(): Promise<void> {
typeof config.database === 'function' ? await config.database() : config.database, this.#db = typeof this.#config.database === 'function' ? await this.#config.database() : this.#config.database;
),
}); this.#connection = new DenoSqliteConnection(this.#db);
if (this.#config.onCreateConnection) {
await this.#config.onCreateConnection(this.#connection);
}
}
async acquireConnection(): Promise<DatabaseConnection> {
// SQLite only has one single connection. We use a mutex here to wait
// until the single connection has been released.
await this.#connectionMutex.lock();
return this.#connection!;
}
async beginTransaction(connection: DatabaseConnection): Promise<void> {
await connection.executeQuery(CompiledQuery.raw('begin'));
}
async commitTransaction(connection: DatabaseConnection): Promise<void> {
await connection.executeQuery(CompiledQuery.raw('commit'));
}
async rollbackTransaction(connection: DatabaseConnection): Promise<void> {
await connection.executeQuery(CompiledQuery.raw('rollback'));
}
// deno-lint-ignore require-await
async releaseConnection(): Promise<void> {
this.#connectionMutex.unlock();
}
// deno-lint-ignore require-await
async destroy(): Promise<void> {
this.#db?.close();
} }
} }
/** HACK: This is an adapter class. */ class DenoSqliteConnection implements DatabaseConnection {
class DenoSqliteDatabase implements SqliteDatabase { readonly #db: DenoSqlite;
#db: DenoSqlite;
constructor(db: DenoSqlite) { constructor(db: DenoSqlite) {
this.#db = db; this.#db = db;
} }
close(): void { executeQuery<O>({ sql, parameters }: CompiledQuery): Promise<QueryResult<O>> {
this.#db.close(); const query = this.#db.prepareQuery(sql);
const rows = query.allEntries(parameters as any);
const { changes, lastInsertRowId } = this.#db;
query.finalize();
return Promise.resolve({
rows: rows as O[],
numAffectedRows: BigInt(changes),
insertId: BigInt(lastInsertRowId),
});
} }
prepare(sql: string): SqliteStatement { // deno-lint-ignore require-yield
const query = this.#db.prepareQuery(sql); async *streamQuery<R>(): AsyncIterableIterator<QueryResult<R>> {
return { throw new Error('Sqlite driver doesn\'t support streaming');
// HACK: implement an actual driver to fix this. }
reader: true, }
all: (parameters: ReadonlyArray<unknown>) => {
const result = query.allEntries(parameters as any); class ConnectionMutex {
query.finalize(); #promise?: Promise<void>;
return result; #resolve?: () => void;
},
run: (parameters: ReadonlyArray<unknown>) => { async lock(): Promise<void> {
query.execute(parameters as any); while (this.#promise) {
query.finalize(); await this.#promise;
return { }
changes: this.#db.changes,
lastInsertRowid: this.#db.lastInsertRowId, this.#promise = new Promise((resolve) => {
}; this.#resolve = resolve;
}, });
}; }
unlock(): void {
const resolve = this.#resolve;
this.#promise = undefined;
this.#resolve = undefined;
resolve?.();
} }
} }