From f19629600dee459253e3aaa8031c48713096b368 Mon Sep 17 00:00:00 2001 From: Siddharth Singh Date: Mon, 20 May 2024 00:05:03 +0530 Subject: [PATCH] rewrite trendsworker with kysely and deno cron, format changes --- src/controllers/api/trends.ts | 13 +-- src/db/DittoTables.ts | 8 +- src/db/migrations/021_create_trends.ts | 40 ++++---- src/workers/trends.test.ts | 9 +- src/workers/trends.worker.ts | 124 +++++++++++-------------- 5 files changed, 90 insertions(+), 104 deletions(-) diff --git a/src/controllers/api/trends.ts b/src/controllers/api/trends.ts index cd4d0f2..dc8e6f3 100644 --- a/src/controllers/api/trends.ts +++ b/src/controllers/api/trends.ts @@ -5,12 +5,13 @@ import { Conf } from '@/config.ts'; import { Time } from '@/utils.ts'; import { stripTime } from '@/utils/time.ts'; import { TrendsWorker } from '@/workers/trends.ts'; +import { Context } from 'hono'; -await TrendsWorker.open('data/trends.sqlite3'); +await TrendsWorker.setupCleanupJob(); const limitSchema = z.coerce.number().catch(10).transform((value) => Math.min(Math.max(value, 0), 20)); -const trendingTagsController: AppController = async (c) => { +const trendingTagsController: AppController = async (c: Context) => { const limit = limitSchema.parse(c.req.query('limit')); if (limit < 1) return c.json([]); @@ -26,9 +27,9 @@ const trendingTagsController: AppController = async (c) => { }); return c.json( - await Promise.all(tags.map(async ({ name, uses, accounts }) => ({ - name, - url: Conf.local(`/tags/${name}`), + await Promise.all(tags.map(async ({ tag, uses, accounts }) => ({ + tag, + url: Conf.local(`/tags/${tag}`), history: [ // Use the full 24h query for the current day. Then use `offset: 1` to adjust for this below. // This result is more accurate than what Mastodon returns. @@ -38,7 +39,7 @@ const trendingTagsController: AppController = async (c) => { uses: String(uses), }, ...(await TrendsWorker.getTagHistory({ - tag: name, + tag, since: lastWeek, until: now, limit: 6, diff --git a/src/db/DittoTables.ts b/src/db/DittoTables.ts index abbf570..d2a42ca 100644 --- a/src/db/DittoTables.ts +++ b/src/db/DittoTables.ts @@ -10,10 +10,10 @@ export interface DittoTables { } interface TagUsageRow { - tag: string, - pubkey8: string, - inserted_at: number -}; + tag: string; + pubkey8: string; + inserted_at: number; +} interface AuthorStatsRow { pubkey: string; diff --git a/src/db/migrations/021_create_trends.ts b/src/db/migrations/021_create_trends.ts index ed3dbc7..cabbf28 100644 --- a/src/db/migrations/021_create_trends.ts +++ b/src/db/migrations/021_create_trends.ts @@ -1,28 +1,28 @@ import { Kysely, sql } from 'kysely'; export async function up(db: Kysely): Promise { - await db.transaction().execute(async trx => { - await trx.schema - .createTable('trends_tag_usages') - .ifNotExists() - .addColumn('tag', 'text', c => c.notNull().modifyEnd(sql`collate nocase`)) - .addColumn('pubkey8', 'text', c => c.notNull()) - .addColumn('inserted_at', 'integer', c => c.notNull()) - .execute(); + await db.transaction().execute(async (trx) => { + await trx.schema + .createTable('trends_tag_usages') + .ifNotExists() + .addColumn('tag', 'text', (c) => c.notNull().modifyEnd(sql`collate nocase`)) + .addColumn('pubkey8', 'text', (c) => c.notNull()) + .addColumn('inserted_at', 'integer', (c) => c.notNull()) + .execute(); - await trx.schema - .createIndex('trends_idx_time_tag') - .ifNotExists() - .on('trends_tag_usages') - .column('inserted_at') - .column('tag') - .execute(); - }) + await trx.schema + .createIndex('trends_idx_time_tag') + .ifNotExists() + .on('trends_tag_usages') + .column('inserted_at') + .column('tag') + .execute(); + }); } export async function down(db: Kysely): Promise { - await db.transaction().execute(async trx => { - await trx.schema.dropIndex('trends_idx_time_tag').ifExists().execute(); - await trx.schema.dropTable('trends_tag_usages').ifExists().execute(); - }) + await db.transaction().execute(async (trx) => { + await trx.schema.dropIndex('trends_idx_time_tag').ifExists().execute(); + await trx.schema.dropTable('trends_tag_usages').ifExists().execute(); + }); } diff --git a/src/workers/trends.test.ts b/src/workers/trends.test.ts index ca1646e..4c89048 100644 --- a/src/workers/trends.test.ts +++ b/src/workers/trends.test.ts @@ -1,9 +1,6 @@ import { assertEquals } from '@std/assert'; - import { TrendsWorker } from './trends.ts'; -await TrendsWorker.open(':memory:'); - const p8 = (pubkey8: string) => `${pubkey8}00000000000000000000000000000000000000000000000000000000`; Deno.test('getTrendingTags', async () => { @@ -19,9 +16,9 @@ Deno.test('getTrendingTags', async () => { }); const expected = [ - { name: 'ditto', accounts: 3, uses: 3 }, - { name: 'hello', accounts: 2, uses: 3 }, - { name: 'yolo', accounts: 1, uses: 1 }, + { tag: 'ditto', accounts: 3, uses: 3 }, + { tag: 'hello', accounts: 2, uses: 3 }, + { tag: 'yolo', accounts: 1, uses: 1 }, ]; assertEquals(result, expected); diff --git a/src/workers/trends.worker.ts b/src/workers/trends.worker.ts index 33fd1a1..b9f5f80 100644 --- a/src/workers/trends.worker.ts +++ b/src/workers/trends.worker.ts @@ -1,9 +1,10 @@ import { NSchema } from '@nostrify/nostrify'; import * as Comlink from 'comlink'; -import { Sqlite } from '@/deps.ts'; import { hashtagSchema } from '@/schema.ts'; import { generateDateRange, Time } from '@/utils/time.ts'; +import { DittoDB } from '@/db/DittoDB.ts'; +import { sql } from 'kysely'; interface GetTrendingTagsOpts { since: Date; @@ -20,73 +21,57 @@ interface GetTagHistoryOpts { offset?: number; } -let db: Sqlite; +const kysely = await DittoDB.getInstance(); export const TrendsWorker = { - open(path: string) { - db = new Sqlite(path); - - db.execute(` - CREATE TABLE IF NOT EXISTS tag_usages ( - tag TEXT NOT NULL COLLATE NOCASE, - pubkey8 TEXT NOT NULL, - inserted_at INTEGER NOT NULL - ); - - CREATE INDEX IF NOT EXISTS idx_time_tag ON tag_usages(inserted_at, tag); - `); - - const cleanup = () => { - console.info('Cleaning up old tag usages...'); + setupCleanupJob() { + Deno.cron('cleanup tag usages older than a week', { hour: { every: 1 } }, async () => { const lastWeek = new Date(new Date().getTime() - Time.days(7)); - this.cleanupTagUsages(lastWeek); - }; - - setInterval(cleanup, Time.hours(1)); - cleanup(); + await this.cleanupTagUsages(lastWeek); + }); }, /** Gets the most used hashtags between the date range. */ - getTrendingTags({ since, until, limit = 10, threshold = 3 }: GetTrendingTagsOpts) { - return db.query( - ` - SELECT tag, COUNT(DISTINCT pubkey8), COUNT(*) - FROM tag_usages - WHERE inserted_at >= ? AND inserted_at < ? - GROUP BY tag - HAVING COUNT(DISTINCT pubkey8) >= ? - ORDER BY COUNT(DISTINCT pubkey8) - DESC LIMIT ?; - `, - [since, until, threshold, limit], - ).map((row) => ({ - name: row[0], - accounts: Number(row[1]), - uses: Number(row[2]), - })); + async getTrendingTags({ since, until, limit = 10, threshold = 3 }: GetTrendingTagsOpts): Promise<{ + tag: string; + accounts: number; + uses: number; + }[]> { + return await kysely.selectFrom('trends_tag_usages') + .select(({ fn }) => [ + 'tag', + fn.agg('count', ['pubkey8']).distinct().as('accounts'), + fn.countAll().as('uses'), + ]) + .where('inserted_at', '>=', since.valueOf()) + .where('inserted_at', '<', until.valueOf()) + .groupBy('tag') + .having((c) => c(c.fn.agg('count', ['pubkey8']).distinct(), '>=', threshold)) + .orderBy((c) => c.fn.agg('count', ['pubkey8']).distinct(), 'desc') + .limit(limit) + .execute(); }, /** * Gets the tag usage count for a specific tag. * It returns an array with counts for each date between the range. */ - getTagHistory({ tag, since, until, limit = 7, offset = 0 }: GetTagHistoryOpts) { - const result = db.query( - ` - SELECT date(inserted_at), COUNT(DISTINCT pubkey8), COUNT(*) - FROM tag_usages - WHERE tag = ? AND inserted_at >= ? AND inserted_at < ? - GROUP BY date(inserted_at) - ORDER BY date(inserted_at) DESC - LIMIT ? - OFFSET ?; - `, - [tag, since, until, limit, offset], - ).map((row) => ({ - day: new Date(row[0]), - accounts: Number(row[1]), - uses: Number(row[2]), - })); + async getTagHistory({ tag, since, until, limit = 7, offset = 0 }: GetTagHistoryOpts) { + const result = await kysely + .selectFrom('trends_tag_usages') + .select(({ fn }) => [ + 'inserted_at', + fn.agg('count', ['pubkey8']).distinct().as('accounts'), + fn.countAll().as('uses'), + ]) + .where('tag', '=', tag) + .where('inserted_at', '>=', since.valueOf()) + .where('inserted_at', '<', until.valueOf()) + .groupBy(sql`inserted_at`) + .orderBy(sql`inserted_at`, 'desc') + .limit(limit) + .offset(offset) + .execute(); /** Full date range between `since` and `until`. */ const dateRange = generateDateRange( @@ -96,26 +81,29 @@ export const TrendsWorker = { // Fill in missing dates with 0 usages. return dateRange.map((day) => { - const data = result.find((item) => item.day.getTime() === day.getTime()); - return data || { day, accounts: 0, uses: 0 }; + const data = result.find((item) => new Date(item.inserted_at).getTime() === day.getTime()); + if (data) { + return { ...data, day: new Date(data.inserted_at) }; + } + return { day, accounts: 0, uses: 0 }; }); }, - addTagUsages(pubkey: string, hashtags: string[], date = new Date()): void { + async addTagUsages(pubkey: string, hashtags: string[], inserted_at = new Date().valueOf()): Promise { const pubkey8 = NSchema.id().parse(pubkey).substring(0, 8); const tags = hashtagSchema.array().min(1).parse(hashtags); - db.query( - 'INSERT INTO tag_usages (tag, pubkey8, inserted_at) VALUES ' + tags.map(() => '(?, ?, ?)').join(', '), - tags.map((tag) => [tag, pubkey8, date]).flat(), - ); + await kysely + .insertInto('trends_tag_usages') + .values(tags.map((tag) => ({ tag, pubkey8, inserted_at }))) + .execute(); }, - cleanupTagUsages(until: Date): void { - db.query( - 'DELETE FROM tag_usages WHERE inserted_at < ?', - [until], - ); + async cleanupTagUsages(until: Date): Promise { + await kysely + .deleteFrom('trends_tag_usages') + .where('inserted_at', '<', until.valueOf()) + .execute(); }, };