From 83a7b1f2318bf16bf523d4a61ca5a0402a0e2643 Mon Sep 17 00:00:00 2001 From: Siddharth Singh Date: Fri, 17 May 2024 14:46:20 +0530 Subject: [PATCH 1/7] create tags_usages kysely table --- src/db/DittoTables.ts | 7 +++++++ src/db/migrations/021_create_trends.ts | 28 ++++++++++++++++++++++++++ 2 files changed, 35 insertions(+) create mode 100644 src/db/migrations/021_create_trends.ts diff --git a/src/db/DittoTables.ts b/src/db/DittoTables.ts index 42d39ea..abbf570 100644 --- a/src/db/DittoTables.ts +++ b/src/db/DittoTables.ts @@ -6,8 +6,15 @@ export interface DittoTables { author_stats: AuthorStatsRow; event_stats: EventStatsRow; pubkey_domains: PubkeyDomainRow; + trends_tag_usages: TagUsageRow; } +interface TagUsageRow { + tag: string, + pubkey8: string, + inserted_at: number +}; + interface AuthorStatsRow { pubkey: string; followers_count: number; diff --git a/src/db/migrations/021_create_trends.ts b/src/db/migrations/021_create_trends.ts new file mode 100644 index 0000000..ed3dbc7 --- /dev/null +++ b/src/db/migrations/021_create_trends.ts @@ -0,0 +1,28 @@ +import { Kysely, sql } from 'kysely'; + +export async function up(db: Kysely): Promise { + await db.transaction().execute(async trx => { + await trx.schema + .createTable('trends_tag_usages') + .ifNotExists() + .addColumn('tag', 'text', c => c.notNull().modifyEnd(sql`collate nocase`)) + .addColumn('pubkey8', 'text', c => c.notNull()) + .addColumn('inserted_at', 'integer', c => c.notNull()) + .execute(); + + await trx.schema + .createIndex('trends_idx_time_tag') + .ifNotExists() + .on('trends_tag_usages') + .column('inserted_at') + .column('tag') + .execute(); + }) +} + +export async function down(db: Kysely): Promise { + await db.transaction().execute(async trx => { + await trx.schema.dropIndex('trends_idx_time_tag').ifExists().execute(); + await trx.schema.dropTable('trends_tag_usages').ifExists().execute(); + }) +} From f19629600dee459253e3aaa8031c48713096b368 Mon Sep 17 00:00:00 2001 From: Siddharth Singh Date: Mon, 20 May 2024 00:05:03 +0530 Subject: [PATCH 2/7] rewrite trendsworker with kysely and deno cron, format changes --- src/controllers/api/trends.ts | 13 +-- src/db/DittoTables.ts | 8 +- src/db/migrations/021_create_trends.ts | 40 ++++---- src/workers/trends.test.ts | 9 +- src/workers/trends.worker.ts | 124 +++++++++++-------------- 5 files changed, 90 insertions(+), 104 deletions(-) diff --git a/src/controllers/api/trends.ts b/src/controllers/api/trends.ts index cd4d0f2..dc8e6f3 100644 --- a/src/controllers/api/trends.ts +++ b/src/controllers/api/trends.ts @@ -5,12 +5,13 @@ import { Conf } from '@/config.ts'; import { Time } from '@/utils.ts'; import { stripTime } from '@/utils/time.ts'; import { TrendsWorker } from '@/workers/trends.ts'; +import { Context } from 'hono'; -await TrendsWorker.open('data/trends.sqlite3'); +await TrendsWorker.setupCleanupJob(); const limitSchema = z.coerce.number().catch(10).transform((value) => Math.min(Math.max(value, 0), 20)); -const trendingTagsController: AppController = async (c) => { +const trendingTagsController: AppController = async (c: Context) => { const limit = limitSchema.parse(c.req.query('limit')); if (limit < 1) return c.json([]); @@ -26,9 +27,9 @@ const trendingTagsController: AppController = async (c) => { }); return c.json( - await Promise.all(tags.map(async ({ name, uses, accounts }) => ({ - name, - url: Conf.local(`/tags/${name}`), + await Promise.all(tags.map(async ({ tag, uses, accounts }) => ({ + tag, + url: Conf.local(`/tags/${tag}`), history: [ // Use the full 24h query for the current day. Then use `offset: 1` to adjust for this below. // This result is more accurate than what Mastodon returns. @@ -38,7 +39,7 @@ const trendingTagsController: AppController = async (c) => { uses: String(uses), }, ...(await TrendsWorker.getTagHistory({ - tag: name, + tag, since: lastWeek, until: now, limit: 6, diff --git a/src/db/DittoTables.ts b/src/db/DittoTables.ts index abbf570..d2a42ca 100644 --- a/src/db/DittoTables.ts +++ b/src/db/DittoTables.ts @@ -10,10 +10,10 @@ export interface DittoTables { } interface TagUsageRow { - tag: string, - pubkey8: string, - inserted_at: number -}; + tag: string; + pubkey8: string; + inserted_at: number; +} interface AuthorStatsRow { pubkey: string; diff --git a/src/db/migrations/021_create_trends.ts b/src/db/migrations/021_create_trends.ts index ed3dbc7..cabbf28 100644 --- a/src/db/migrations/021_create_trends.ts +++ b/src/db/migrations/021_create_trends.ts @@ -1,28 +1,28 @@ import { Kysely, sql } from 'kysely'; export async function up(db: Kysely): Promise { - await db.transaction().execute(async trx => { - await trx.schema - .createTable('trends_tag_usages') - .ifNotExists() - .addColumn('tag', 'text', c => c.notNull().modifyEnd(sql`collate nocase`)) - .addColumn('pubkey8', 'text', c => c.notNull()) - .addColumn('inserted_at', 'integer', c => c.notNull()) - .execute(); + await db.transaction().execute(async (trx) => { + await trx.schema + .createTable('trends_tag_usages') + .ifNotExists() + .addColumn('tag', 'text', (c) => c.notNull().modifyEnd(sql`collate nocase`)) + .addColumn('pubkey8', 'text', (c) => c.notNull()) + .addColumn('inserted_at', 'integer', (c) => c.notNull()) + .execute(); - await trx.schema - .createIndex('trends_idx_time_tag') - .ifNotExists() - .on('trends_tag_usages') - .column('inserted_at') - .column('tag') - .execute(); - }) + await trx.schema + .createIndex('trends_idx_time_tag') + .ifNotExists() + .on('trends_tag_usages') + .column('inserted_at') + .column('tag') + .execute(); + }); } export async function down(db: Kysely): Promise { - await db.transaction().execute(async trx => { - await trx.schema.dropIndex('trends_idx_time_tag').ifExists().execute(); - await trx.schema.dropTable('trends_tag_usages').ifExists().execute(); - }) + await db.transaction().execute(async (trx) => { + await trx.schema.dropIndex('trends_idx_time_tag').ifExists().execute(); + await trx.schema.dropTable('trends_tag_usages').ifExists().execute(); + }); } diff --git a/src/workers/trends.test.ts b/src/workers/trends.test.ts index ca1646e..4c89048 100644 --- a/src/workers/trends.test.ts +++ b/src/workers/trends.test.ts @@ -1,9 +1,6 @@ import { assertEquals } from '@std/assert'; - import { TrendsWorker } from './trends.ts'; -await TrendsWorker.open(':memory:'); - const p8 = (pubkey8: string) => `${pubkey8}00000000000000000000000000000000000000000000000000000000`; Deno.test('getTrendingTags', async () => { @@ -19,9 +16,9 @@ Deno.test('getTrendingTags', async () => { }); const expected = [ - { name: 'ditto', accounts: 3, uses: 3 }, - { name: 'hello', accounts: 2, uses: 3 }, - { name: 'yolo', accounts: 1, uses: 1 }, + { tag: 'ditto', accounts: 3, uses: 3 }, + { tag: 'hello', accounts: 2, uses: 3 }, + { tag: 'yolo', accounts: 1, uses: 1 }, ]; assertEquals(result, expected); diff --git a/src/workers/trends.worker.ts b/src/workers/trends.worker.ts index 33fd1a1..b9f5f80 100644 --- a/src/workers/trends.worker.ts +++ b/src/workers/trends.worker.ts @@ -1,9 +1,10 @@ import { NSchema } from '@nostrify/nostrify'; import * as Comlink from 'comlink'; -import { Sqlite } from '@/deps.ts'; import { hashtagSchema } from '@/schema.ts'; import { generateDateRange, Time } from '@/utils/time.ts'; +import { DittoDB } from '@/db/DittoDB.ts'; +import { sql } from 'kysely'; interface GetTrendingTagsOpts { since: Date; @@ -20,73 +21,57 @@ interface GetTagHistoryOpts { offset?: number; } -let db: Sqlite; +const kysely = await DittoDB.getInstance(); export const TrendsWorker = { - open(path: string) { - db = new Sqlite(path); - - db.execute(` - CREATE TABLE IF NOT EXISTS tag_usages ( - tag TEXT NOT NULL COLLATE NOCASE, - pubkey8 TEXT NOT NULL, - inserted_at INTEGER NOT NULL - ); - - CREATE INDEX IF NOT EXISTS idx_time_tag ON tag_usages(inserted_at, tag); - `); - - const cleanup = () => { - console.info('Cleaning up old tag usages...'); + setupCleanupJob() { + Deno.cron('cleanup tag usages older than a week', { hour: { every: 1 } }, async () => { const lastWeek = new Date(new Date().getTime() - Time.days(7)); - this.cleanupTagUsages(lastWeek); - }; - - setInterval(cleanup, Time.hours(1)); - cleanup(); + await this.cleanupTagUsages(lastWeek); + }); }, /** Gets the most used hashtags between the date range. */ - getTrendingTags({ since, until, limit = 10, threshold = 3 }: GetTrendingTagsOpts) { - return db.query( - ` - SELECT tag, COUNT(DISTINCT pubkey8), COUNT(*) - FROM tag_usages - WHERE inserted_at >= ? AND inserted_at < ? - GROUP BY tag - HAVING COUNT(DISTINCT pubkey8) >= ? - ORDER BY COUNT(DISTINCT pubkey8) - DESC LIMIT ?; - `, - [since, until, threshold, limit], - ).map((row) => ({ - name: row[0], - accounts: Number(row[1]), - uses: Number(row[2]), - })); + async getTrendingTags({ since, until, limit = 10, threshold = 3 }: GetTrendingTagsOpts): Promise<{ + tag: string; + accounts: number; + uses: number; + }[]> { + return await kysely.selectFrom('trends_tag_usages') + .select(({ fn }) => [ + 'tag', + fn.agg('count', ['pubkey8']).distinct().as('accounts'), + fn.countAll().as('uses'), + ]) + .where('inserted_at', '>=', since.valueOf()) + .where('inserted_at', '<', until.valueOf()) + .groupBy('tag') + .having((c) => c(c.fn.agg('count', ['pubkey8']).distinct(), '>=', threshold)) + .orderBy((c) => c.fn.agg('count', ['pubkey8']).distinct(), 'desc') + .limit(limit) + .execute(); }, /** * Gets the tag usage count for a specific tag. * It returns an array with counts for each date between the range. */ - getTagHistory({ tag, since, until, limit = 7, offset = 0 }: GetTagHistoryOpts) { - const result = db.query( - ` - SELECT date(inserted_at), COUNT(DISTINCT pubkey8), COUNT(*) - FROM tag_usages - WHERE tag = ? AND inserted_at >= ? AND inserted_at < ? - GROUP BY date(inserted_at) - ORDER BY date(inserted_at) DESC - LIMIT ? - OFFSET ?; - `, - [tag, since, until, limit, offset], - ).map((row) => ({ - day: new Date(row[0]), - accounts: Number(row[1]), - uses: Number(row[2]), - })); + async getTagHistory({ tag, since, until, limit = 7, offset = 0 }: GetTagHistoryOpts) { + const result = await kysely + .selectFrom('trends_tag_usages') + .select(({ fn }) => [ + 'inserted_at', + fn.agg('count', ['pubkey8']).distinct().as('accounts'), + fn.countAll().as('uses'), + ]) + .where('tag', '=', tag) + .where('inserted_at', '>=', since.valueOf()) + .where('inserted_at', '<', until.valueOf()) + .groupBy(sql`inserted_at`) + .orderBy(sql`inserted_at`, 'desc') + .limit(limit) + .offset(offset) + .execute(); /** Full date range between `since` and `until`. */ const dateRange = generateDateRange( @@ -96,26 +81,29 @@ export const TrendsWorker = { // Fill in missing dates with 0 usages. return dateRange.map((day) => { - const data = result.find((item) => item.day.getTime() === day.getTime()); - return data || { day, accounts: 0, uses: 0 }; + const data = result.find((item) => new Date(item.inserted_at).getTime() === day.getTime()); + if (data) { + return { ...data, day: new Date(data.inserted_at) }; + } + return { day, accounts: 0, uses: 0 }; }); }, - addTagUsages(pubkey: string, hashtags: string[], date = new Date()): void { + async addTagUsages(pubkey: string, hashtags: string[], inserted_at = new Date().valueOf()): Promise { const pubkey8 = NSchema.id().parse(pubkey).substring(0, 8); const tags = hashtagSchema.array().min(1).parse(hashtags); - db.query( - 'INSERT INTO tag_usages (tag, pubkey8, inserted_at) VALUES ' + tags.map(() => '(?, ?, ?)').join(', '), - tags.map((tag) => [tag, pubkey8, date]).flat(), - ); + await kysely + .insertInto('trends_tag_usages') + .values(tags.map((tag) => ({ tag, pubkey8, inserted_at }))) + .execute(); }, - cleanupTagUsages(until: Date): void { - db.query( - 'DELETE FROM tag_usages WHERE inserted_at < ?', - [until], - ); + async cleanupTagUsages(until: Date): Promise { + await kysely + .deleteFrom('trends_tag_usages') + .where('inserted_at', '<', until.valueOf()) + .execute(); }, }; From 949697d80a000a99fe0eac8f4bee85ab20940755 Mon Sep 17 00:00:00 2001 From: Siddharth Singh Date: Mon, 20 May 2024 00:11:29 +0530 Subject: [PATCH 3/7] fix type error caused by trends refactor --- src/pipeline.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/pipeline.ts b/src/pipeline.ts index 15d495e..9c1ce12 100644 --- a/src/pipeline.ts +++ b/src/pipeline.ts @@ -177,7 +177,7 @@ async function trackHashtags(event: NostrEvent): Promise { try { debug('tracking tags:', JSON.stringify(tags)); - await TrendsWorker.addTagUsages(event.pubkey, tags, date); + await TrendsWorker.addTagUsages(event.pubkey, tags, date.valueOf()); } catch (_e) { // do nothing } @@ -192,7 +192,7 @@ async function fetchRelatedEvents(event: DittoEvent) { const signal = AbortSignal.timeout(3000); reqmeister.query([{ kinds: [0], authors: [event.pubkey] }], { signal }) .then((events) => Promise.allSettled(events.map((event) => handleEvent(event, signal)))) - .catch(() => {}); + .catch(() => { }); } for (const [name, id] of event.tags) { @@ -202,7 +202,7 @@ async function fetchRelatedEvents(event: DittoEvent) { const signal = AbortSignal.timeout(3000); reqmeister.query([{ ids: [id] }], { signal }) .then((events) => Promise.allSettled(events.map((event) => handleEvent(event, signal)))) - .catch(() => {}); + .catch(() => { }); } } } From 29eb87dab321c151375d70a31bfc38a692a9a341 Mon Sep 17 00:00:00 2001 From: Siddharth Singh Date: Mon, 20 May 2024 00:14:29 +0530 Subject: [PATCH 4/7] format --- src/pipeline.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/pipeline.ts b/src/pipeline.ts index 9c1ce12..0672c88 100644 --- a/src/pipeline.ts +++ b/src/pipeline.ts @@ -192,7 +192,7 @@ async function fetchRelatedEvents(event: DittoEvent) { const signal = AbortSignal.timeout(3000); reqmeister.query([{ kinds: [0], authors: [event.pubkey] }], { signal }) .then((events) => Promise.allSettled(events.map((event) => handleEvent(event, signal)))) - .catch(() => { }); + .catch(() => {}); } for (const [name, id] of event.tags) { @@ -202,7 +202,7 @@ async function fetchRelatedEvents(event: DittoEvent) { const signal = AbortSignal.timeout(3000); reqmeister.query([{ ids: [id] }], { signal }) .then((events) => Promise.allSettled(events.map((event) => handleEvent(event, signal)))) - .catch(() => { }); + .catch(() => {}); } } } From b2a5ff3eaf4310b464ca8cb8925dc95b2f0e833a Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Tue, 21 May 2024 17:12:33 -0500 Subject: [PATCH 5/7] Make Trends use SQLite again --- deno.json | 3 +- src/controllers/api/trends.ts | 2 +- src/db/DittoTables.ts | 7 ---- src/db/migrations/021_create_trends.ts | 28 --------------- src/workers/trends.test.ts | 2 ++ src/workers/trends.worker.ts | 49 +++++++++++++++++++++----- 6 files changed, 44 insertions(+), 47 deletions(-) delete mode 100644 src/db/migrations/021_create_trends.ts diff --git a/deno.json b/deno.json index 75ea333..e54bb7a 100644 --- a/deno.json +++ b/deno.json @@ -14,7 +14,7 @@ "stats:recompute": "deno run -A scripts/stats-recompute.ts", "soapbox": "curl -O https://dl.soapbox.pub/main/soapbox.zip && mkdir -p public && mv soapbox.zip public/ && cd public/ && unzip soapbox.zip && rm soapbox.zip" }, - "unstable": ["ffi", "kv", "worker-options"], + "unstable": ["cron", "ffi", "kv", "worker-options"], "exclude": ["./public"], "imports": { "@/": "./src/", @@ -37,7 +37,6 @@ "@std/streams": "jsr:@std/streams@^0.223.0", "comlink": "npm:comlink@^4.4.1", "deno-safe-fetch/load": "https://gitlab.com/soapbox-pub/deno-safe-fetch/-/raw/v1.0.0/load.ts", - "deno-sqlite": "https://raw.githubusercontent.com/alexgleason/deno-sqlite/325f66d8c395e7f6f5ee78ebfa42a0eeea4a942b/mod.ts", "entities": "npm:entities@^4.5.0", "fast-stable-stringify": "npm:fast-stable-stringify@^1.0.0", "formdata-helper": "npm:formdata-helper@^0.3.0", diff --git a/src/controllers/api/trends.ts b/src/controllers/api/trends.ts index dc8e6f3..b53e3e2 100644 --- a/src/controllers/api/trends.ts +++ b/src/controllers/api/trends.ts @@ -7,7 +7,7 @@ import { stripTime } from '@/utils/time.ts'; import { TrendsWorker } from '@/workers/trends.ts'; import { Context } from 'hono'; -await TrendsWorker.setupCleanupJob(); +await TrendsWorker.open('data/trends.sqlite3'); const limitSchema = z.coerce.number().catch(10).transform((value) => Math.min(Math.max(value, 0), 20)); diff --git a/src/db/DittoTables.ts b/src/db/DittoTables.ts index d2a42ca..42d39ea 100644 --- a/src/db/DittoTables.ts +++ b/src/db/DittoTables.ts @@ -6,13 +6,6 @@ export interface DittoTables { author_stats: AuthorStatsRow; event_stats: EventStatsRow; pubkey_domains: PubkeyDomainRow; - trends_tag_usages: TagUsageRow; -} - -interface TagUsageRow { - tag: string; - pubkey8: string; - inserted_at: number; } interface AuthorStatsRow { diff --git a/src/db/migrations/021_create_trends.ts b/src/db/migrations/021_create_trends.ts deleted file mode 100644 index cabbf28..0000000 --- a/src/db/migrations/021_create_trends.ts +++ /dev/null @@ -1,28 +0,0 @@ -import { Kysely, sql } from 'kysely'; - -export async function up(db: Kysely): Promise { - await db.transaction().execute(async (trx) => { - await trx.schema - .createTable('trends_tag_usages') - .ifNotExists() - .addColumn('tag', 'text', (c) => c.notNull().modifyEnd(sql`collate nocase`)) - .addColumn('pubkey8', 'text', (c) => c.notNull()) - .addColumn('inserted_at', 'integer', (c) => c.notNull()) - .execute(); - - await trx.schema - .createIndex('trends_idx_time_tag') - .ifNotExists() - .on('trends_tag_usages') - .column('inserted_at') - .column('tag') - .execute(); - }); -} - -export async function down(db: Kysely): Promise { - await db.transaction().execute(async (trx) => { - await trx.schema.dropIndex('trends_idx_time_tag').ifExists().execute(); - await trx.schema.dropTable('trends_tag_usages').ifExists().execute(); - }); -} diff --git a/src/workers/trends.test.ts b/src/workers/trends.test.ts index 4c89048..85e9631 100644 --- a/src/workers/trends.test.ts +++ b/src/workers/trends.test.ts @@ -1,6 +1,8 @@ import { assertEquals } from '@std/assert'; import { TrendsWorker } from './trends.ts'; +await TrendsWorker.open(':memory:'); + const p8 = (pubkey8: string) => `${pubkey8}00000000000000000000000000000000000000000000000000000000`; Deno.test('getTrendingTags', async () => { diff --git a/src/workers/trends.worker.ts b/src/workers/trends.worker.ts index 5a93fa3..1f27382 100644 --- a/src/workers/trends.worker.ts +++ b/src/workers/trends.worker.ts @@ -1,11 +1,12 @@ +/// +import { Database as Sqlite } from '@db/sqlite'; import { NSchema } from '@nostrify/nostrify'; +import { DenoSqlite3Dialect } from '@soapbox/kysely-deno-sqlite'; +import { Kysely, sql } from 'kysely'; import * as Comlink from 'comlink'; -import { DB as Sqlite } from 'deno-sqlite'; import { hashtagSchema } from '@/schema.ts'; import { generateDateRange, Time } from '@/utils/time.ts'; -import { DittoDB } from '@/db/DittoDB.ts'; -import { sql } from 'kysely'; interface GetTrendingTagsOpts { since: Date; @@ -22,10 +23,40 @@ interface GetTagHistoryOpts { offset?: number; } -const kysely = await DittoDB.getInstance(); +interface TagsDB { + tag_usages: { + tag: string; + pubkey8: string; + inserted_at: number; + }; +} + +let kysely: Kysely; export const TrendsWorker = { - setupCleanupJob() { + async open(path: string) { + kysely = new Kysely({ + dialect: new DenoSqlite3Dialect({ + database: new Sqlite(path), + }), + }); + + await kysely.schema + .createTable('tag_usages') + .ifNotExists() + .addColumn('tag', 'text', (c) => c.notNull().modifyEnd(sql`collate nocase`)) + .addColumn('pubkey8', 'text', (c) => c.notNull()) + .addColumn('inserted_at', 'integer', (c) => c.notNull()) + .execute(); + + await kysely.schema + .createIndex('idx_time_tag') + .ifNotExists() + .on('tag_usages') + .column('inserted_at') + .column('tag') + .execute(); + Deno.cron('cleanup tag usages older than a week', { hour: { every: 1 } }, async () => { const lastWeek = new Date(new Date().getTime() - Time.days(7)); await this.cleanupTagUsages(lastWeek); @@ -38,7 +69,7 @@ export const TrendsWorker = { accounts: number; uses: number; }[]> { - return await kysely.selectFrom('trends_tag_usages') + return await kysely.selectFrom('tag_usages') .select(({ fn }) => [ 'tag', fn.agg('count', ['pubkey8']).distinct().as('accounts'), @@ -59,7 +90,7 @@ export const TrendsWorker = { */ async getTagHistory({ tag, since, until, limit = 7, offset = 0 }: GetTagHistoryOpts) { const result = await kysely - .selectFrom('trends_tag_usages') + .selectFrom('tag_usages') .select(({ fn }) => [ 'inserted_at', fn.agg('count', ['pubkey8']).distinct().as('accounts'), @@ -95,14 +126,14 @@ export const TrendsWorker = { const tags = hashtagSchema.array().min(1).parse(hashtags); await kysely - .insertInto('trends_tag_usages') + .insertInto('tag_usages') .values(tags.map((tag) => ({ tag, pubkey8, inserted_at }))) .execute(); }, async cleanupTagUsages(until: Date): Promise { await kysely - .deleteFrom('trends_tag_usages') + .deleteFrom('tag_usages') .where('inserted_at', '<', until.valueOf()) .execute(); }, From 58222537967b90a11c91c731f3cd9a40e5f54b9e Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Tue, 21 May 2024 17:57:59 -0500 Subject: [PATCH 6/7] Fix trends --- src/controllers/api/trends.ts | 5 ++--- src/pipeline.ts | 2 +- src/workers/trends.test.ts | 1 + src/workers/trends.worker.ts | 38 +++++++++++++++++++++-------------- 4 files changed, 27 insertions(+), 19 deletions(-) diff --git a/src/controllers/api/trends.ts b/src/controllers/api/trends.ts index b53e3e2..1a47d8c 100644 --- a/src/controllers/api/trends.ts +++ b/src/controllers/api/trends.ts @@ -5,13 +5,12 @@ import { Conf } from '@/config.ts'; import { Time } from '@/utils.ts'; import { stripTime } from '@/utils/time.ts'; import { TrendsWorker } from '@/workers/trends.ts'; -import { Context } from 'hono'; await TrendsWorker.open('data/trends.sqlite3'); const limitSchema = z.coerce.number().catch(10).transform((value) => Math.min(Math.max(value, 0), 20)); -const trendingTagsController: AppController = async (c: Context) => { +const trendingTagsController: AppController = async (c) => { const limit = limitSchema.parse(c.req.query('limit')); if (limit < 1) return c.json([]); @@ -28,7 +27,7 @@ const trendingTagsController: AppController = async (c: Context) => { return c.json( await Promise.all(tags.map(async ({ tag, uses, accounts }) => ({ - tag, + name: tag, url: Conf.local(`/tags/${tag}`), history: [ // Use the full 24h query for the current day. Then use `offset: 1` to adjust for this below. diff --git a/src/pipeline.ts b/src/pipeline.ts index 36ec54e..bfb0577 100644 --- a/src/pipeline.ts +++ b/src/pipeline.ts @@ -177,7 +177,7 @@ async function trackHashtags(event: NostrEvent): Promise { try { debug('tracking tags:', JSON.stringify(tags)); - await TrendsWorker.addTagUsages(event.pubkey, tags, date.valueOf()); + await TrendsWorker.addTagUsages(event.pubkey, tags, date); } catch (_e) { // do nothing } diff --git a/src/workers/trends.test.ts b/src/workers/trends.test.ts index 85e9631..da33820 100644 --- a/src/workers/trends.test.ts +++ b/src/workers/trends.test.ts @@ -1,4 +1,5 @@ import { assertEquals } from '@std/assert'; + import { TrendsWorker } from './trends.ts'; await TrendsWorker.open(':memory:'); diff --git a/src/workers/trends.worker.ts b/src/workers/trends.worker.ts index 1f27382..90d1b57 100644 --- a/src/workers/trends.worker.ts +++ b/src/workers/trends.worker.ts @@ -27,7 +27,7 @@ interface TagsDB { tag_usages: { tag: string; pubkey8: string; - inserted_at: number; + inserted_at: Date; }; } @@ -41,6 +41,13 @@ export const TrendsWorker = { }), }); + await sql`PRAGMA synchronous = normal`.execute(kysely); + await sql`PRAGMA temp_store = memory`.execute(kysely); + await sql`PRAGMA foreign_keys = ON`.execute(kysely); + await sql`PRAGMA auto_vacuum = FULL`.execute(kysely); + await sql`PRAGMA journal_mode = WAL`.execute(kysely); + await sql`PRAGMA mmap_size = 50000000`.execute(kysely); + await kysely.schema .createTable('tag_usages') .ifNotExists() @@ -64,19 +71,19 @@ export const TrendsWorker = { }, /** Gets the most used hashtags between the date range. */ - async getTrendingTags({ since, until, limit = 10, threshold = 3 }: GetTrendingTagsOpts): Promise<{ + getTrendingTags({ since, until, limit = 10, threshold = 3 }: GetTrendingTagsOpts): Promise<{ tag: string; accounts: number; uses: number; }[]> { - return await kysely.selectFrom('tag_usages') + return kysely.selectFrom('tag_usages') .select(({ fn }) => [ 'tag', fn.agg('count', ['pubkey8']).distinct().as('accounts'), fn.countAll().as('uses'), ]) - .where('inserted_at', '>=', since.valueOf()) - .where('inserted_at', '<', until.valueOf()) + .where('inserted_at', '>=', since) + .where('inserted_at', '<', until) .groupBy('tag') .having((c) => c(c.fn.agg('count', ['pubkey8']).distinct(), '>=', threshold)) .orderBy((c) => c.fn.agg('count', ['pubkey8']).distinct(), 'desc') @@ -92,15 +99,15 @@ export const TrendsWorker = { const result = await kysely .selectFrom('tag_usages') .select(({ fn }) => [ - 'inserted_at', + sql`date(inserted_at)`.as('day'), fn.agg('count', ['pubkey8']).distinct().as('accounts'), fn.countAll().as('uses'), ]) .where('tag', '=', tag) - .where('inserted_at', '>=', since.valueOf()) - .where('inserted_at', '<', until.valueOf()) - .groupBy(sql`inserted_at`) - .orderBy(sql`inserted_at`, 'desc') + .where('inserted_at', '>=', since) + .where('inserted_at', '<', until) + .groupBy(sql`date(inserted_at)`) + .orderBy(sql`date(inserted_at)`, 'desc') .limit(limit) .offset(offset) .execute(); @@ -113,15 +120,16 @@ export const TrendsWorker = { // Fill in missing dates with 0 usages. return dateRange.map((day) => { - const data = result.find((item) => new Date(item.inserted_at).getTime() === day.getTime()); + const data = result.find((item) => new Date(item.day).getTime() === day.getTime()); if (data) { - return { ...data, day: new Date(data.inserted_at) }; + return { ...data, day: new Date(data.day) }; + } else { + return { day, accounts: 0, uses: 0 }; } - return { day, accounts: 0, uses: 0 }; }); }, - async addTagUsages(pubkey: string, hashtags: string[], inserted_at = new Date().valueOf()): Promise { + async addTagUsages(pubkey: string, hashtags: string[], inserted_at = new Date()): Promise { const pubkey8 = NSchema.id().parse(pubkey).substring(0, 8); const tags = hashtagSchema.array().min(1).parse(hashtags); @@ -134,7 +142,7 @@ export const TrendsWorker = { async cleanupTagUsages(until: Date): Promise { await kysely .deleteFrom('tag_usages') - .where('inserted_at', '<', until.valueOf()) + .where('inserted_at', '<', until) .execute(); }, }; From 80344e3c5f99584ec72edc5a7343465dbdf480a4 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Tue, 21 May 2024 18:15:17 -0500 Subject: [PATCH 7/7] Fix trends cache --- src/app.ts | 8 ++--- src/controllers/api/trends.ts | 68 +++++++++++++++++++---------------- 2 files changed, 40 insertions(+), 36 deletions(-) diff --git a/src/app.ts b/src/app.ts index be2ce84..9b82bee 100644 --- a/src/app.ts +++ b/src/app.ts @@ -194,12 +194,8 @@ app.get('/api/v2/search', searchController); app.get('/api/pleroma/frontend_configurations', frontendConfigController); -app.get( - '/api/v1/trends/tags', - cacheMiddleware({ cacheName: 'web', expires: Time.minutes(15) }), - trendingTagsController, -); -app.get('/api/v1/trends', cacheMiddleware({ cacheName: 'web', expires: Time.minutes(15) }), trendingTagsController); +app.get('/api/v1/trends/tags', trendingTagsController); +app.get('/api/v1/trends', trendingTagsController); app.get('/api/v1/suggestions', suggestionsV1Controller); app.get('/api/v2/suggestions', suggestionsV2Controller); diff --git a/src/controllers/api/trends.ts b/src/controllers/api/trends.ts index 1a47d8c..64b3019 100644 --- a/src/controllers/api/trends.ts +++ b/src/controllers/api/trends.ts @@ -10,10 +10,20 @@ await TrendsWorker.open('data/trends.sqlite3'); const limitSchema = z.coerce.number().catch(10).transform((value) => Math.min(Math.max(value, 0), 20)); +let cache = getTrends(); + +Deno.cron('update trends cache', { minute: { every: 15 } }, async () => { + const trends = await getTrends(); + cache = Promise.resolve(trends); +}); + const trendingTagsController: AppController = async (c) => { const limit = limitSchema.parse(c.req.query('limit')); - if (limit < 1) return c.json([]); + const trends = await cache; + return c.json(trends.slice(0, limit)); +}; +async function getTrends() { const now = new Date(); const yesterday = new Date(now.getTime() - Time.days(1)); const lastWeek = new Date(now.getTime() - Time.days(7)); @@ -22,36 +32,34 @@ const trendingTagsController: AppController = async (c) => { const tags = await TrendsWorker.getTrendingTags({ since: yesterday, until: now, - limit, + limit: 20, }); - return c.json( - await Promise.all(tags.map(async ({ tag, uses, accounts }) => ({ - name: tag, - url: Conf.local(`/tags/${tag}`), - history: [ - // Use the full 24h query for the current day. Then use `offset: 1` to adjust for this below. - // This result is more accurate than what Mastodon returns. - { - day: String(Math.floor(stripTime(now).getTime() / 1000)), - accounts: String(accounts), - uses: String(uses), - }, - ...(await TrendsWorker.getTagHistory({ - tag, - since: lastWeek, - until: now, - limit: 6, - offset: 1, - })).map((history) => ({ - // For some reason, Mastodon wants these to be strings... oh well. - day: String(Math.floor(history.day.getTime() / 1000)), - accounts: String(history.accounts), - uses: String(history.uses), - })), - ], - }))), - ); -}; + return Promise.all(tags.map(async ({ tag, uses, accounts }) => ({ + name: tag, + url: Conf.local(`/tags/${tag}`), + history: [ + // Use the full 24h query for the current day. Then use `offset: 1` to adjust for this below. + // This result is more accurate than what Mastodon returns. + { + day: String(Math.floor(stripTime(now).getTime() / 1000)), + accounts: String(accounts), + uses: String(uses), + }, + ...(await TrendsWorker.getTagHistory({ + tag, + since: lastWeek, + until: now, + limit: 6, + offset: 1, + })).map((history) => ({ + // For some reason, Mastodon wants these to be strings... oh well. + day: String(Math.floor(history.day.getTime() / 1000)), + accounts: String(history.accounts), + uses: String(history.uses), + })), + ], + }))); +} export { trendingTagsController };