Make Trends use SQLite again
This commit is contained in:
parent
4d21dd05a8
commit
b2a5ff3eaf
|
@ -14,7 +14,7 @@
|
||||||
"stats:recompute": "deno run -A scripts/stats-recompute.ts",
|
"stats:recompute": "deno run -A scripts/stats-recompute.ts",
|
||||||
"soapbox": "curl -O https://dl.soapbox.pub/main/soapbox.zip && mkdir -p public && mv soapbox.zip public/ && cd public/ && unzip soapbox.zip && rm soapbox.zip"
|
"soapbox": "curl -O https://dl.soapbox.pub/main/soapbox.zip && mkdir -p public && mv soapbox.zip public/ && cd public/ && unzip soapbox.zip && rm soapbox.zip"
|
||||||
},
|
},
|
||||||
"unstable": ["ffi", "kv", "worker-options"],
|
"unstable": ["cron", "ffi", "kv", "worker-options"],
|
||||||
"exclude": ["./public"],
|
"exclude": ["./public"],
|
||||||
"imports": {
|
"imports": {
|
||||||
"@/": "./src/",
|
"@/": "./src/",
|
||||||
|
@ -37,7 +37,6 @@
|
||||||
"@std/streams": "jsr:@std/streams@^0.223.0",
|
"@std/streams": "jsr:@std/streams@^0.223.0",
|
||||||
"comlink": "npm:comlink@^4.4.1",
|
"comlink": "npm:comlink@^4.4.1",
|
||||||
"deno-safe-fetch/load": "https://gitlab.com/soapbox-pub/deno-safe-fetch/-/raw/v1.0.0/load.ts",
|
"deno-safe-fetch/load": "https://gitlab.com/soapbox-pub/deno-safe-fetch/-/raw/v1.0.0/load.ts",
|
||||||
"deno-sqlite": "https://raw.githubusercontent.com/alexgleason/deno-sqlite/325f66d8c395e7f6f5ee78ebfa42a0eeea4a942b/mod.ts",
|
|
||||||
"entities": "npm:entities@^4.5.0",
|
"entities": "npm:entities@^4.5.0",
|
||||||
"fast-stable-stringify": "npm:fast-stable-stringify@^1.0.0",
|
"fast-stable-stringify": "npm:fast-stable-stringify@^1.0.0",
|
||||||
"formdata-helper": "npm:formdata-helper@^0.3.0",
|
"formdata-helper": "npm:formdata-helper@^0.3.0",
|
||||||
|
|
|
@ -7,7 +7,7 @@ import { stripTime } from '@/utils/time.ts';
|
||||||
import { TrendsWorker } from '@/workers/trends.ts';
|
import { TrendsWorker } from '@/workers/trends.ts';
|
||||||
import { Context } from 'hono';
|
import { Context } from 'hono';
|
||||||
|
|
||||||
await TrendsWorker.setupCleanupJob();
|
await TrendsWorker.open('data/trends.sqlite3');
|
||||||
|
|
||||||
const limitSchema = z.coerce.number().catch(10).transform((value) => Math.min(Math.max(value, 0), 20));
|
const limitSchema = z.coerce.number().catch(10).transform((value) => Math.min(Math.max(value, 0), 20));
|
||||||
|
|
||||||
|
|
|
@ -6,13 +6,6 @@ export interface DittoTables {
|
||||||
author_stats: AuthorStatsRow;
|
author_stats: AuthorStatsRow;
|
||||||
event_stats: EventStatsRow;
|
event_stats: EventStatsRow;
|
||||||
pubkey_domains: PubkeyDomainRow;
|
pubkey_domains: PubkeyDomainRow;
|
||||||
trends_tag_usages: TagUsageRow;
|
|
||||||
}
|
|
||||||
|
|
||||||
interface TagUsageRow {
|
|
||||||
tag: string;
|
|
||||||
pubkey8: string;
|
|
||||||
inserted_at: number;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
interface AuthorStatsRow {
|
interface AuthorStatsRow {
|
||||||
|
|
|
@ -1,28 +0,0 @@
|
||||||
import { Kysely, sql } from 'kysely';
|
|
||||||
|
|
||||||
export async function up(db: Kysely<any>): Promise<void> {
|
|
||||||
await db.transaction().execute(async (trx) => {
|
|
||||||
await trx.schema
|
|
||||||
.createTable('trends_tag_usages')
|
|
||||||
.ifNotExists()
|
|
||||||
.addColumn('tag', 'text', (c) => c.notNull().modifyEnd(sql`collate nocase`))
|
|
||||||
.addColumn('pubkey8', 'text', (c) => c.notNull())
|
|
||||||
.addColumn('inserted_at', 'integer', (c) => c.notNull())
|
|
||||||
.execute();
|
|
||||||
|
|
||||||
await trx.schema
|
|
||||||
.createIndex('trends_idx_time_tag')
|
|
||||||
.ifNotExists()
|
|
||||||
.on('trends_tag_usages')
|
|
||||||
.column('inserted_at')
|
|
||||||
.column('tag')
|
|
||||||
.execute();
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
export async function down(db: Kysely<any>): Promise<void> {
|
|
||||||
await db.transaction().execute(async (trx) => {
|
|
||||||
await trx.schema.dropIndex('trends_idx_time_tag').ifExists().execute();
|
|
||||||
await trx.schema.dropTable('trends_tag_usages').ifExists().execute();
|
|
||||||
});
|
|
||||||
}
|
|
|
@ -1,6 +1,8 @@
|
||||||
import { assertEquals } from '@std/assert';
|
import { assertEquals } from '@std/assert';
|
||||||
import { TrendsWorker } from './trends.ts';
|
import { TrendsWorker } from './trends.ts';
|
||||||
|
|
||||||
|
await TrendsWorker.open(':memory:');
|
||||||
|
|
||||||
const p8 = (pubkey8: string) => `${pubkey8}00000000000000000000000000000000000000000000000000000000`;
|
const p8 = (pubkey8: string) => `${pubkey8}00000000000000000000000000000000000000000000000000000000`;
|
||||||
|
|
||||||
Deno.test('getTrendingTags', async () => {
|
Deno.test('getTrendingTags', async () => {
|
||||||
|
|
|
@ -1,11 +1,12 @@
|
||||||
|
/// <reference lib="webworker" />
|
||||||
|
import { Database as Sqlite } from '@db/sqlite';
|
||||||
import { NSchema } from '@nostrify/nostrify';
|
import { NSchema } from '@nostrify/nostrify';
|
||||||
|
import { DenoSqlite3Dialect } from '@soapbox/kysely-deno-sqlite';
|
||||||
|
import { Kysely, sql } from 'kysely';
|
||||||
import * as Comlink from 'comlink';
|
import * as Comlink from 'comlink';
|
||||||
import { DB as Sqlite } from 'deno-sqlite';
|
|
||||||
|
|
||||||
import { hashtagSchema } from '@/schema.ts';
|
import { hashtagSchema } from '@/schema.ts';
|
||||||
import { generateDateRange, Time } from '@/utils/time.ts';
|
import { generateDateRange, Time } from '@/utils/time.ts';
|
||||||
import { DittoDB } from '@/db/DittoDB.ts';
|
|
||||||
import { sql } from 'kysely';
|
|
||||||
|
|
||||||
interface GetTrendingTagsOpts {
|
interface GetTrendingTagsOpts {
|
||||||
since: Date;
|
since: Date;
|
||||||
|
@ -22,10 +23,40 @@ interface GetTagHistoryOpts {
|
||||||
offset?: number;
|
offset?: number;
|
||||||
}
|
}
|
||||||
|
|
||||||
const kysely = await DittoDB.getInstance();
|
interface TagsDB {
|
||||||
|
tag_usages: {
|
||||||
|
tag: string;
|
||||||
|
pubkey8: string;
|
||||||
|
inserted_at: number;
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
let kysely: Kysely<TagsDB>;
|
||||||
|
|
||||||
export const TrendsWorker = {
|
export const TrendsWorker = {
|
||||||
setupCleanupJob() {
|
async open(path: string) {
|
||||||
|
kysely = new Kysely({
|
||||||
|
dialect: new DenoSqlite3Dialect({
|
||||||
|
database: new Sqlite(path),
|
||||||
|
}),
|
||||||
|
});
|
||||||
|
|
||||||
|
await kysely.schema
|
||||||
|
.createTable('tag_usages')
|
||||||
|
.ifNotExists()
|
||||||
|
.addColumn('tag', 'text', (c) => c.notNull().modifyEnd(sql`collate nocase`))
|
||||||
|
.addColumn('pubkey8', 'text', (c) => c.notNull())
|
||||||
|
.addColumn('inserted_at', 'integer', (c) => c.notNull())
|
||||||
|
.execute();
|
||||||
|
|
||||||
|
await kysely.schema
|
||||||
|
.createIndex('idx_time_tag')
|
||||||
|
.ifNotExists()
|
||||||
|
.on('tag_usages')
|
||||||
|
.column('inserted_at')
|
||||||
|
.column('tag')
|
||||||
|
.execute();
|
||||||
|
|
||||||
Deno.cron('cleanup tag usages older than a week', { hour: { every: 1 } }, async () => {
|
Deno.cron('cleanup tag usages older than a week', { hour: { every: 1 } }, async () => {
|
||||||
const lastWeek = new Date(new Date().getTime() - Time.days(7));
|
const lastWeek = new Date(new Date().getTime() - Time.days(7));
|
||||||
await this.cleanupTagUsages(lastWeek);
|
await this.cleanupTagUsages(lastWeek);
|
||||||
|
@ -38,7 +69,7 @@ export const TrendsWorker = {
|
||||||
accounts: number;
|
accounts: number;
|
||||||
uses: number;
|
uses: number;
|
||||||
}[]> {
|
}[]> {
|
||||||
return await kysely.selectFrom('trends_tag_usages')
|
return await kysely.selectFrom('tag_usages')
|
||||||
.select(({ fn }) => [
|
.select(({ fn }) => [
|
||||||
'tag',
|
'tag',
|
||||||
fn.agg<number>('count', ['pubkey8']).distinct().as('accounts'),
|
fn.agg<number>('count', ['pubkey8']).distinct().as('accounts'),
|
||||||
|
@ -59,7 +90,7 @@ export const TrendsWorker = {
|
||||||
*/
|
*/
|
||||||
async getTagHistory({ tag, since, until, limit = 7, offset = 0 }: GetTagHistoryOpts) {
|
async getTagHistory({ tag, since, until, limit = 7, offset = 0 }: GetTagHistoryOpts) {
|
||||||
const result = await kysely
|
const result = await kysely
|
||||||
.selectFrom('trends_tag_usages')
|
.selectFrom('tag_usages')
|
||||||
.select(({ fn }) => [
|
.select(({ fn }) => [
|
||||||
'inserted_at',
|
'inserted_at',
|
||||||
fn.agg<number>('count', ['pubkey8']).distinct().as('accounts'),
|
fn.agg<number>('count', ['pubkey8']).distinct().as('accounts'),
|
||||||
|
@ -95,14 +126,14 @@ export const TrendsWorker = {
|
||||||
const tags = hashtagSchema.array().min(1).parse(hashtags);
|
const tags = hashtagSchema.array().min(1).parse(hashtags);
|
||||||
|
|
||||||
await kysely
|
await kysely
|
||||||
.insertInto('trends_tag_usages')
|
.insertInto('tag_usages')
|
||||||
.values(tags.map((tag) => ({ tag, pubkey8, inserted_at })))
|
.values(tags.map((tag) => ({ tag, pubkey8, inserted_at })))
|
||||||
.execute();
|
.execute();
|
||||||
},
|
},
|
||||||
|
|
||||||
async cleanupTagUsages(until: Date): Promise<void> {
|
async cleanupTagUsages(until: Date): Promise<void> {
|
||||||
await kysely
|
await kysely
|
||||||
.deleteFrom('trends_tag_usages')
|
.deleteFrom('tag_usages')
|
||||||
.where('inserted_at', '<', until.valueOf())
|
.where('inserted_at', '<', until.valueOf())
|
||||||
.execute();
|
.execute();
|
||||||
},
|
},
|
||||||
|
|
Loading…
Reference in New Issue