From d569dfd5b571c8a15218398c5bb953d7b5defcf4 Mon Sep 17 00:00:00 2001
From: Alex Gleason
Date: Mon, 4 Dec 2023 16:33:02 -0600
Subject: [PATCH] Add TrendsWorker for tracking/querying trending tags with a Web Worker

---
NOTE(review): this patch was recovered from a whitespace-collapsed copy.
Line breaks, blank context lines and indentation were reconstructed and
checked against every hunk header's line counts; generic type arguments
(`Promise<void>`, `Comlink.wrap<typeof _TrendsWorker>`) that had been
stripped were restored. Indentation of context lines is presumed (2-space
TypeScript, 4-space deno.json task entries) -- if a hunk is rejected, retry
with `git apply --ignore-whitespace`. The author e-mail address is missing
from the From: header and must be supplied before using `git am`.

 deno.json                                   |  2 +-
 src/controllers/api/trends.ts               | 60 +++++++++++----------
 src/pipeline.ts                             |  6 +--
 src/trends.test.ts                          | 32 -----------
 src/workers/trends.test.ts                  | 30 +++++++++++
 src/workers/trends.ts                       | 19 +++++++
 src/{trends.ts => workers/trends.worker.ts} | 41 +++++++-------
 7 files changed, 104 insertions(+), 86 deletions(-)
 delete mode 100644 src/trends.test.ts
 create mode 100644 src/workers/trends.test.ts
 create mode 100644 src/workers/trends.ts
 rename src/{trends.ts => workers/trends.worker.ts} (86%)

diff --git a/deno.json b/deno.json
index f4d2d05..8d07861 100644
--- a/deno.json
+++ b/deno.json
@@ -5,7 +5,7 @@
     "start": "deno run -A --unstable src/server.ts",
     "dev": "deno run -A --unstable --watch src/server.ts",
     "debug": "deno run -A --unstable --inspect src/server.ts",
-    "test": "DB_PATH=\":memory:\" deno test -A --unstable src",
+    "test": "DB_PATH=\":memory:\" deno test -A --unstable",
     "check": "deno check src/server.ts",
     "relays:sync": "deno run -A --unstable scripts/relays.ts sync"
   },
diff --git a/src/controllers/api/trends.ts b/src/controllers/api/trends.ts
index 7fc98e3..7970300 100644
--- a/src/controllers/api/trends.ts
+++ b/src/controllers/api/trends.ts
@@ -1,13 +1,15 @@
 import { type AppController } from '@/app.ts';
 import { Conf } from '@/config.ts';
 import { z } from '@/deps.ts';
-import { trends } from '@/trends.ts';
 import { Time } from '@/utils.ts';
 import { stripTime } from '@/utils/time.ts';
+import { TrendsWorker } from '@/workers/trends.ts';
+
+await TrendsWorker.open('data/trends.sqlite3');
 
 const limitSchema = z.coerce.number().catch(10).transform((value) => Math.min(Math.max(value, 0), 20));
 
-const trendingTagsController: AppController = (c) => {
+const trendingTagsController: AppController = async (c) => {
   const limit = limitSchema.parse(c.req.query('limit'));
   if (limit < 1) return c.json([]);
 
@@ -16,37 +18,39 @@ const trendingTagsController: AppController = (c) => {
   const lastWeek = new Date(now.getTime() - Time.days(7));
 
   /** Most used hashtags within the past 24h. */
-  const tags = trends.getTrendingTags({
+  const tags = await TrendsWorker.getTrendingTags({
     since: yesterday,
     until: now,
     limit,
   });
 
-  return c.json(tags.map(({ name, uses, accounts }) => ({
-    name,
-    url: Conf.local(`/tags/${name}`),
-    history: [
-      // Use the full 24h query for the current day. Then use `offset: 1` to adjust for this below.
-      // This result is more accurate than what Mastodon returns.
-      {
-        day: String(Math.floor(stripTime(now).getTime() / 1000)),
-        accounts: String(accounts),
-        uses: String(uses),
-      },
-      ...trends.getTagHistory({
-        tag: name,
-        since: lastWeek,
-        until: now,
-        limit: 6,
-        offset: 1,
-      }).map((history) => ({
-        // For some reason, Mastodon wants these to be strings... oh well.
-        day: String(Math.floor(history.day.getTime() / 1000)),
-        accounts: String(history.accounts),
-        uses: String(history.uses),
-      })),
-    ],
-  })));
+  return c.json(
+    await Promise.all(tags.map(async ({ name, uses, accounts }) => ({
+      name,
+      url: Conf.local(`/tags/${name}`),
+      history: [
+        // Use the full 24h query for the current day. Then use `offset: 1` to adjust for this below.
+        // This result is more accurate than what Mastodon returns.
+        {
+          day: String(Math.floor(stripTime(now).getTime() / 1000)),
+          accounts: String(accounts),
+          uses: String(uses),
+        },
+        ...(await TrendsWorker.getTagHistory({
+          tag: name,
+          since: lastWeek,
+          until: now,
+          limit: 6,
+          offset: 1,
+        })).map((history) => ({
+          // For some reason, Mastodon wants these to be strings... oh well.
+          day: String(Math.floor(history.day.getTime() / 1000)),
+          accounts: String(history.accounts),
+          uses: String(history.uses),
+        })),
+      ],
+    }))),
+  );
 };
 
 export { trendingTagsController };
diff --git a/src/pipeline.ts b/src/pipeline.ts
index b048e55..9749a9c 100644
--- a/src/pipeline.ts
+++ b/src/pipeline.ts
@@ -10,8 +10,8 @@ import { publish } from '@/pool.ts';
 import { isLocallyFollowed } from '@/queries.ts';
 import { Sub } from '@/subs.ts';
 import { getTagSet } from '@/tags.ts';
-import { trends } from '@/trends.ts';
 import { eventAge, isRelay, nostrDate, Time } from '@/utils.ts';
+import { TrendsWorker } from '@/workers/trends.ts';
 import { verifySignatureWorker } from '@/workers/verify.ts';
 
 import type { EventData } from '@/types.ts';
@@ -90,7 +90,7 @@ async function processDeletions(event: Event): Promise<void> {
 }
 
 /** Track whenever a hashtag is used, for processing trending tags. */
-function trackHashtags(event: Event): void {
+async function trackHashtags(event: Event): Promise<void> {
   const date = nostrDate(event.created_at);
 
   const tags = event.tags
@@ -102,7 +102,7 @@ function trackHashtags(event: Event): void {
 
   try {
     console.info('tracking tags:', tags);
-    trends.addTagUsages(event.pubkey, tags, date);
+    await TrendsWorker.addTagUsages(event.pubkey, tags, date);
   } catch (_e) {
     // do nothing
   }
diff --git a/src/trends.test.ts b/src/trends.test.ts
deleted file mode 100644
index dbc6d7c..0000000
--- a/src/trends.test.ts
+++ /dev/null
@@ -1,32 +0,0 @@
-import { assertEquals } from '@/deps-test.ts';
-import { Sqlite } from '@/deps.ts';
-
-import { TrendsDB } from './trends.ts';
-
-const db = new Sqlite(':memory:');
-const trends = new TrendsDB(db);
-
-const p8 = (pubkey8: string) => `${pubkey8}00000000000000000000000000000000000000000000000000000000`;
-
-Deno.test('getTrendingTags', () => {
-  trends.addTagUsages(p8('00000000'), ['ditto', 'hello', 'yolo']);
-  trends.addTagUsages(p8('00000000'), ['hello']);
-  trends.addTagUsages(p8('00000001'), ['Ditto', 'hello']);
-  trends.addTagUsages(p8('00000010'), ['DITTO']);
-
-  const result = trends.getTrendingTags({
-    since: new Date('1999-01-01T00:00:00'),
-    until: new Date('2999-01-01T00:00:00'),
-    threshold: 1,
-  });
-
-  const expected = [
-    { name: 'ditto', accounts: 3, uses: 3 },
-    { name: 'hello', accounts: 2, uses: 3 },
-    { name: 'yolo', accounts: 1, uses: 1 },
-  ];
-
-  assertEquals(result, expected);
-
-  trends.cleanupTagUsages(new Date('2999-01-01T00:00:00'));
-});
diff --git a/src/workers/trends.test.ts b/src/workers/trends.test.ts
new file mode 100644
index 0000000..ef51f23
--- /dev/null
+++ b/src/workers/trends.test.ts
@@ -0,0 +1,30 @@
+import { assertEquals } from '@/deps-test.ts';
+
+import { TrendsWorker } from './trends.ts';
+
+await TrendsWorker.open(':memory:');
+
+const p8 = (pubkey8: string) => `${pubkey8}00000000000000000000000000000000000000000000000000000000`;
+
+Deno.test('getTrendingTags', async () => {
+  await TrendsWorker.addTagUsages(p8('00000000'), ['ditto', 'hello', 'yolo']);
+  await TrendsWorker.addTagUsages(p8('00000000'), ['hello']);
+  await TrendsWorker.addTagUsages(p8('00000001'), ['Ditto', 'hello']);
+  await TrendsWorker.addTagUsages(p8('00000010'), ['DITTO']);
+
+  const result = await TrendsWorker.getTrendingTags({
+    since: new Date('1999-01-01T00:00:00'),
+    until: new Date('2999-01-01T00:00:00'),
+    threshold: 1,
+  });
+
+  const expected = [
+    { name: 'ditto', accounts: 3, uses: 3 },
+    { name: 'hello', accounts: 2, uses: 3 },
+    { name: 'yolo', accounts: 1, uses: 1 },
+  ];
+
+  assertEquals(result, expected);
+
+  await TrendsWorker.cleanupTagUsages(new Date('2999-01-01T00:00:00'));
+});
diff --git a/src/workers/trends.ts b/src/workers/trends.ts
new file mode 100644
index 0000000..b455283
--- /dev/null
+++ b/src/workers/trends.ts
@@ -0,0 +1,19 @@
+import { Comlink } from '@/deps.ts';
+
+import type { TrendsWorker as _TrendsWorker } from '@/workers/trends.worker.ts';
+
+const worker = new Worker(new URL('./trends.worker.ts', import.meta.url), { type: 'module' });
+
+const TrendsWorker = Comlink.wrap<typeof _TrendsWorker>(worker);
+
+await new Promise<void>((resolve) => {
+  const handleEvent = ({ data }: MessageEvent) => {
+    if (data === 'ready') {
+      worker.removeEventListener('message', handleEvent);
+      resolve();
+    }
+  };
+  worker.addEventListener('message', handleEvent);
+});
+
+export { TrendsWorker };
diff --git a/src/trends.ts b/src/workers/trends.worker.ts
similarity index 86%
rename from src/trends.ts
rename to src/workers/trends.worker.ts
index a97b2c2..fd9e8fe 100644
--- a/src/trends.ts
+++ b/src/workers/trends.worker.ts
@@ -1,8 +1,7 @@
-import { Sqlite } from '@/deps.ts';
+import { Comlink, Sqlite } from '@/deps.ts';
 import { hashtagSchema } from '@/schema.ts';
 import { nostrIdSchema } from '@/schemas/nostr.ts';
-import { Time } from '@/utils.ts';
-import { generateDateRange } from '@/utils/time.ts';
+import { generateDateRange, Time } from '@/utils/time.ts';
 
 interface GetTrendingTagsOpts {
   since: Date;
@@ -19,13 +18,13 @@ interface GetTagHistoryOpts {
   offset?: number;
 }
 
-class TrendsDB {
-  #db: Sqlite;
+let db: Sqlite;
 
-  constructor(db: Sqlite) {
-    this.#db = db;
+export const TrendsWorker = {
+  open(path: string) {
+    db = new Sqlite(path);
 
-    this.#db.execute(`
+    db.execute(`
       CREATE TABLE IF NOT EXISTS tag_usages (
         tag TEXT NOT NULL COLLATE NOCASE,
         pubkey8 TEXT NOT NULL,
@@ -43,11 +42,11 @@ class TrendsDB {
 
     setInterval(cleanup, Time.hours(1));
     cleanup();
-  }
+  },
 
   /** Gets the most used hashtags between the date range. */
   getTrendingTags({ since, until, limit = 10, threshold = 3 }: GetTrendingTagsOpts) {
-    return this.#db.query(
+    return db.query(
       `
       SELECT tag, COUNT(DISTINCT pubkey8), COUNT(*)
       FROM tag_usages
@@ -63,14 +62,14 @@ class TrendsDB {
       accounts: Number(row[1]),
       uses: Number(row[2]),
     }));
-  }
+  },
 
   /**
    * Gets the tag usage count for a specific tag.
    * It returns an array with counts for each date between the range.
    */
   getTagHistory({ tag, since, until, limit = 7, offset = 0 }: GetTagHistoryOpts) {
-    const result = this.#db.query(
+    const result = db.query(
       `
       SELECT date(inserted_at), COUNT(DISTINCT pubkey8), COUNT(*)
       FROM tag_usages
@@ -98,28 +97,26 @@ class TrendsDB {
       const data = result.find((item) => item.day.getTime() === day.getTime());
       return data || { day, accounts: 0, uses: 0 };
     });
-  }
+  },
 
   addTagUsages(pubkey: string, hashtags: string[], date = new Date()): void {
     const pubkey8 = nostrIdSchema.parse(pubkey).substring(0, 8);
     const tags = hashtagSchema.array().min(1).parse(hashtags);
 
-    this.#db.query(
+    db.query(
       'INSERT INTO tag_usages (tag, pubkey8, inserted_at) VALUES ' + tags.map(() => '(?, ?, ?)').join(', '),
       tags.map((tag) => [tag, pubkey8, date]).flat(),
     );
-  }
+  },
 
   cleanupTagUsages(until: Date): void {
-    this.#db.query(
+    db.query(
       'DELETE FROM tag_usages WHERE inserted_at < ?',
       [until],
     );
-  }
-}
+  },
+};
 
-const trends = new TrendsDB(
-  new Sqlite('data/trends.sqlite3'),
-);
+Comlink.expose(TrendsWorker);
 
-export { trends, TrendsDB };
+self.postMessage('ready');