diff --git a/deno.json b/deno.json index 75ea333..e54bb7a 100644 --- a/deno.json +++ b/deno.json @@ -14,7 +14,7 @@ "stats:recompute": "deno run -A scripts/stats-recompute.ts", "soapbox": "curl -O https://dl.soapbox.pub/main/soapbox.zip && mkdir -p public && mv soapbox.zip public/ && cd public/ && unzip soapbox.zip && rm soapbox.zip" }, - "unstable": ["ffi", "kv", "worker-options"], + "unstable": ["cron", "ffi", "kv", "worker-options"], "exclude": ["./public"], "imports": { "@/": "./src/", @@ -37,7 +37,6 @@ "@std/streams": "jsr:@std/streams@^0.223.0", "comlink": "npm:comlink@^4.4.1", "deno-safe-fetch/load": "https://gitlab.com/soapbox-pub/deno-safe-fetch/-/raw/v1.0.0/load.ts", - "deno-sqlite": "https://raw.githubusercontent.com/alexgleason/deno-sqlite/325f66d8c395e7f6f5ee78ebfa42a0eeea4a942b/mod.ts", "entities": "npm:entities@^4.5.0", "fast-stable-stringify": "npm:fast-stable-stringify@^1.0.0", "formdata-helper": "npm:formdata-helper@^0.3.0", diff --git a/src/app.ts b/src/app.ts index be2ce84..9b82bee 100644 --- a/src/app.ts +++ b/src/app.ts @@ -194,12 +194,8 @@ app.get('/api/v2/search', searchController); app.get('/api/pleroma/frontend_configurations', frontendConfigController); -app.get( - '/api/v1/trends/tags', - cacheMiddleware({ cacheName: 'web', expires: Time.minutes(15) }), - trendingTagsController, -); -app.get('/api/v1/trends', cacheMiddleware({ cacheName: 'web', expires: Time.minutes(15) }), trendingTagsController); +app.get('/api/v1/trends/tags', trendingTagsController); +app.get('/api/v1/trends', trendingTagsController); app.get('/api/v1/suggestions', suggestionsV1Controller); app.get('/api/v2/suggestions', suggestionsV2Controller); diff --git a/src/controllers/api/trends.ts b/src/controllers/api/trends.ts index cd4d0f2..64b3019 100644 --- a/src/controllers/api/trends.ts +++ b/src/controllers/api/trends.ts @@ -10,10 +10,20 @@ await TrendsWorker.open('data/trends.sqlite3'); const limitSchema = z.coerce.number().catch(10).transform((value) => Math.min(Math.max(value, 0), 20)); +let cache = getTrends(); + +Deno.cron('update trends cache', { minute: { every: 15 } }, async () => { + const trends = await getTrends(); + cache = Promise.resolve(trends); +}); + const trendingTagsController: AppController = async (c) => { const limit = limitSchema.parse(c.req.query('limit')); - if (limit < 1) return c.json([]); + const trends = await cache; + return c.json(trends.slice(0, limit)); +}; +async function getTrends() { const now = new Date(); const yesterday = new Date(now.getTime() - Time.days(1)); const lastWeek = new Date(now.getTime() - Time.days(7)); @@ -22,36 +32,34 @@ const trendingTagsController: AppController = async (c) => { const tags = await TrendsWorker.getTrendingTags({ since: yesterday, until: now, - limit, + limit: 20, }); - return c.json( - await Promise.all(tags.map(async ({ name, uses, accounts }) => ({ - name, - url: Conf.local(`/tags/${name}`), - history: [ - // Use the full 24h query for the current day. Then use `offset: 1` to adjust for this below. - // This result is more accurate than what Mastodon returns. - { - day: String(Math.floor(stripTime(now).getTime() / 1000)), - accounts: String(accounts), - uses: String(uses), - }, - ...(await TrendsWorker.getTagHistory({ - tag: name, - since: lastWeek, - until: now, - limit: 6, - offset: 1, - })).map((history) => ({ - // For some reason, Mastodon wants these to be strings... oh well. - day: String(Math.floor(history.day.getTime() / 1000)), - accounts: String(history.accounts), - uses: String(history.uses), - })), - ], - }))), - ); -}; + return Promise.all(tags.map(async ({ tag, uses, accounts }) => ({ + name: tag, + url: Conf.local(`/tags/${tag}`), + history: [ + // Use the full 24h query for the current day. Then use `offset: 1` to adjust for this below. + // This result is more accurate than what Mastodon returns. + { + day: String(Math.floor(stripTime(now).getTime() / 1000)), + accounts: String(accounts), + uses: String(uses), + }, + ...(await TrendsWorker.getTagHistory({ + tag, + since: lastWeek, + until: now, + limit: 6, + offset: 1, + })).map((history) => ({ + // For some reason, Mastodon wants these to be strings... oh well. + day: String(Math.floor(history.day.getTime() / 1000)), + accounts: String(history.accounts), + uses: String(history.uses), + })), + ], + }))); +} export { trendingTagsController }; diff --git a/src/workers/trends.test.ts b/src/workers/trends.test.ts index ca1646e..da33820 100644 --- a/src/workers/trends.test.ts +++ b/src/workers/trends.test.ts @@ -19,9 +19,9 @@ Deno.test('getTrendingTags', async () => { }); const expected = [ - { name: 'ditto', accounts: 3, uses: 3 }, - { name: 'hello', accounts: 2, uses: 3 }, - { name: 'yolo', accounts: 1, uses: 1 }, + { tag: 'ditto', accounts: 3, uses: 3 }, + { tag: 'hello', accounts: 2, uses: 3 }, + { tag: 'yolo', accounts: 1, uses: 1 }, ]; assertEquals(result, expected); diff --git a/src/workers/trends.worker.ts b/src/workers/trends.worker.ts index 74a256b..90d1b57 100644 --- a/src/workers/trends.worker.ts +++ b/src/workers/trends.worker.ts @@ -1,6 +1,9 @@ +/// +import { Database as Sqlite } from '@db/sqlite'; import { NSchema } from '@nostrify/nostrify'; +import { DenoSqlite3Dialect } from '@soapbox/kysely-deno-sqlite'; +import { Kysely, sql } from 'kysely'; import * as Comlink from 'comlink'; -import { DB as Sqlite } from 'deno-sqlite'; import { hashtagSchema } from '@/schema.ts'; import { generateDateRange, Time } from '@/utils/time.ts'; @@ -20,73 +23,94 @@ interface GetTagHistoryOpts { offset?: number; } -let db: Sqlite; +interface TagsDB { + tag_usages: { + tag: string; + pubkey8: string; + inserted_at: Date; + }; +} + +let kysely: Kysely; export const TrendsWorker = { - open(path: string) { - db = new Sqlite(path); + async open(path: string) { + kysely = new Kysely({ + dialect: new DenoSqlite3Dialect({ + database: new Sqlite(path), + }), + }); - db.execute(` - CREATE TABLE IF NOT EXISTS tag_usages ( - tag TEXT NOT NULL COLLATE NOCASE, - pubkey8 TEXT NOT NULL, - inserted_at INTEGER NOT NULL - ); + await sql`PRAGMA synchronous = normal`.execute(kysely); + await sql`PRAGMA temp_store = memory`.execute(kysely); + await sql`PRAGMA foreign_keys = ON`.execute(kysely); + await sql`PRAGMA auto_vacuum = FULL`.execute(kysely); + await sql`PRAGMA journal_mode = WAL`.execute(kysely); + await sql`PRAGMA mmap_size = 50000000`.execute(kysely); - CREATE INDEX IF NOT EXISTS idx_time_tag ON tag_usages(inserted_at, tag); - `); + await kysely.schema + .createTable('tag_usages') + .ifNotExists() + .addColumn('tag', 'text', (c) => c.notNull().modifyEnd(sql`collate nocase`)) + .addColumn('pubkey8', 'text', (c) => c.notNull()) + .addColumn('inserted_at', 'integer', (c) => c.notNull()) + .execute(); - const cleanup = () => { - console.info('Cleaning up old tag usages...'); + await kysely.schema + .createIndex('idx_time_tag') + .ifNotExists() + .on('tag_usages') + .column('inserted_at') + .column('tag') + .execute(); + + Deno.cron('cleanup tag usages older than a week', { hour: { every: 1 } }, async () => { const lastWeek = new Date(new Date().getTime() - Time.days(7)); - this.cleanupTagUsages(lastWeek); - }; - - setInterval(cleanup, Time.hours(1)); - cleanup(); + await this.cleanupTagUsages(lastWeek); + }); }, /** Gets the most used hashtags between the date range. */ - getTrendingTags({ since, until, limit = 10, threshold = 3 }: GetTrendingTagsOpts) { - return db.query( - ` - SELECT tag, COUNT(DISTINCT pubkey8), COUNT(*) - FROM tag_usages - WHERE inserted_at >= ? AND inserted_at < ? - GROUP BY tag - HAVING COUNT(DISTINCT pubkey8) >= ? - ORDER BY COUNT(DISTINCT pubkey8) - DESC LIMIT ?; - `, - [since, until, threshold, limit], - ).map((row) => ({ - name: row[0], - accounts: Number(row[1]), - uses: Number(row[2]), - })); + getTrendingTags({ since, until, limit = 10, threshold = 3 }: GetTrendingTagsOpts): Promise<{ + tag: string; + accounts: number; + uses: number; + }[]> { + return kysely.selectFrom('tag_usages') + .select(({ fn }) => [ + 'tag', + fn.agg('count', ['pubkey8']).distinct().as('accounts'), + fn.countAll().as('uses'), + ]) + .where('inserted_at', '>=', since) + .where('inserted_at', '<', until) + .groupBy('tag') + .having((c) => c(c.fn.agg('count', ['pubkey8']).distinct(), '>=', threshold)) + .orderBy((c) => c.fn.agg('count', ['pubkey8']).distinct(), 'desc') + .limit(limit) + .execute(); }, /** * Gets the tag usage count for a specific tag. * It returns an array with counts for each date between the range. */ - getTagHistory({ tag, since, until, limit = 7, offset = 0 }: GetTagHistoryOpts) { - const result = db.query( - ` - SELECT date(inserted_at), COUNT(DISTINCT pubkey8), COUNT(*) - FROM tag_usages - WHERE tag = ? AND inserted_at >= ? AND inserted_at < ? - GROUP BY date(inserted_at) - ORDER BY date(inserted_at) DESC - LIMIT ? - OFFSET ?; - `, - [tag, since, until, limit, offset], - ).map((row) => ({ - day: new Date(row[0]), - accounts: Number(row[1]), - uses: Number(row[2]), - })); + async getTagHistory({ tag, since, until, limit = 7, offset = 0 }: GetTagHistoryOpts) { + const result = await kysely + .selectFrom('tag_usages') + .select(({ fn }) => [ + sql`date(inserted_at)`.as('day'), + fn.agg('count', ['pubkey8']).distinct().as('accounts'), + fn.countAll().as('uses'), + ]) + .where('tag', '=', tag) + .where('inserted_at', '>=', since) + .where('inserted_at', '<', until) + .groupBy(sql`date(inserted_at)`) + .orderBy(sql`date(inserted_at)`, 'desc') + .limit(limit) + .offset(offset) + .execute(); /** Full date range between `since` and `until`. */ const dateRange = generateDateRange( @@ -96,26 +120,30 @@ export const TrendsWorker = { // Fill in missing dates with 0 usages. return dateRange.map((day) => { - const data = result.find((item) => item.day.getTime() === day.getTime()); - return data || { day, accounts: 0, uses: 0 }; + const data = result.find((item) => new Date(item.day).getTime() === day.getTime()); + if (data) { + return { ...data, day: new Date(data.day) }; + } else { + return { day, accounts: 0, uses: 0 }; + } }); }, - addTagUsages(pubkey: string, hashtags: string[], date = new Date()): void { + async addTagUsages(pubkey: string, hashtags: string[], inserted_at = new Date()): Promise { const pubkey8 = NSchema.id().parse(pubkey).substring(0, 8); const tags = hashtagSchema.array().min(1).parse(hashtags); - db.query( - 'INSERT INTO tag_usages (tag, pubkey8, inserted_at) VALUES ' + tags.map(() => '(?, ?, ?)').join(', '), - tags.map((tag) => [tag, pubkey8, date]).flat(), - ); + await kysely + .insertInto('tag_usages') + .values(tags.map((tag) => ({ tag, pubkey8, inserted_at }))) + .execute(); }, - cleanupTagUsages(until: Date): void { - db.query( - 'DELETE FROM tag_usages WHERE inserted_at < ?', - [until], - ); + async cleanupTagUsages(until: Date): Promise { + await kysely + .deleteFrom('tag_usages') + .where('inserted_at', '<', until) + .execute(); }, };