From 58222537967b90a11c91c731f3cd9a40e5f54b9e Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Tue, 21 May 2024 17:57:59 -0500 Subject: [PATCH] Fix trends --- src/controllers/api/trends.ts | 5 ++--- src/pipeline.ts | 2 +- src/workers/trends.test.ts | 1 + src/workers/trends.worker.ts | 38 +++++++++++++++++++++-------------- 4 files changed, 27 insertions(+), 19 deletions(-) diff --git a/src/controllers/api/trends.ts b/src/controllers/api/trends.ts index b53e3e2..1a47d8c 100644 --- a/src/controllers/api/trends.ts +++ b/src/controllers/api/trends.ts @@ -5,13 +5,12 @@ import { Conf } from '@/config.ts'; import { Time } from '@/utils.ts'; import { stripTime } from '@/utils/time.ts'; import { TrendsWorker } from '@/workers/trends.ts'; -import { Context } from 'hono'; await TrendsWorker.open('data/trends.sqlite3'); const limitSchema = z.coerce.number().catch(10).transform((value) => Math.min(Math.max(value, 0), 20)); -const trendingTagsController: AppController = async (c: Context) => { +const trendingTagsController: AppController = async (c) => { const limit = limitSchema.parse(c.req.query('limit')); if (limit < 1) return c.json([]); @@ -28,7 +27,7 @@ const trendingTagsController: AppController = async (c: Context) => { return c.json( await Promise.all(tags.map(async ({ tag, uses, accounts }) => ({ - tag, + name: tag, url: Conf.local(`/tags/${tag}`), history: [ // Use the full 24h query for the current day. Then use `offset: 1` to adjust for this below. diff --git a/src/pipeline.ts b/src/pipeline.ts index 36ec54e..bfb0577 100644 --- a/src/pipeline.ts +++ b/src/pipeline.ts @@ -177,7 +177,7 @@ async function trackHashtags(event: NostrEvent): Promise { try { debug('tracking tags:', JSON.stringify(tags)); - await TrendsWorker.addTagUsages(event.pubkey, tags, date.valueOf()); + await TrendsWorker.addTagUsages(event.pubkey, tags, date); } catch (_e) { // do nothing } diff --git a/src/workers/trends.test.ts b/src/workers/trends.test.ts index 85e9631..da33820 100644 --- a/src/workers/trends.test.ts +++ b/src/workers/trends.test.ts @@ -1,4 +1,5 @@ import { assertEquals } from '@std/assert'; + import { TrendsWorker } from './trends.ts'; await TrendsWorker.open(':memory:'); diff --git a/src/workers/trends.worker.ts b/src/workers/trends.worker.ts index 1f27382..90d1b57 100644 --- a/src/workers/trends.worker.ts +++ b/src/workers/trends.worker.ts @@ -27,7 +27,7 @@ interface TagsDB { tag_usages: { tag: string; pubkey8: string; - inserted_at: number; + inserted_at: Date; }; } @@ -41,6 +41,13 @@ export const TrendsWorker = { }), }); + await sql`PRAGMA synchronous = normal`.execute(kysely); + await sql`PRAGMA temp_store = memory`.execute(kysely); + await sql`PRAGMA foreign_keys = ON`.execute(kysely); + await sql`PRAGMA auto_vacuum = FULL`.execute(kysely); + await sql`PRAGMA journal_mode = WAL`.execute(kysely); + await sql`PRAGMA mmap_size = 50000000`.execute(kysely); + await kysely.schema .createTable('tag_usages') .ifNotExists() @@ -64,19 +71,19 @@ export const TrendsWorker = { }, /** Gets the most used hashtags between the date range. */ - async getTrendingTags({ since, until, limit = 10, threshold = 3 }: GetTrendingTagsOpts): Promise<{ + getTrendingTags({ since, until, limit = 10, threshold = 3 }: GetTrendingTagsOpts): Promise<{ tag: string; accounts: number; uses: number; }[]> { - return await kysely.selectFrom('tag_usages') + return kysely.selectFrom('tag_usages') .select(({ fn }) => [ 'tag', fn.agg('count', ['pubkey8']).distinct().as('accounts'), fn.countAll().as('uses'), ]) - .where('inserted_at', '>=', since.valueOf()) - .where('inserted_at', '<', until.valueOf()) + .where('inserted_at', '>=', since) + .where('inserted_at', '<', until) .groupBy('tag') .having((c) => c(c.fn.agg('count', ['pubkey8']).distinct(), '>=', threshold)) .orderBy((c) => c.fn.agg('count', ['pubkey8']).distinct(), 'desc') @@ -92,15 +99,15 @@ export const TrendsWorker = { const result = await kysely .selectFrom('tag_usages') .select(({ fn }) => [ - 'inserted_at', + sql`date(inserted_at)`.as('day'), fn.agg('count', ['pubkey8']).distinct().as('accounts'), fn.countAll().as('uses'), ]) .where('tag', '=', tag) - .where('inserted_at', '>=', since.valueOf()) - .where('inserted_at', '<', until.valueOf()) - .groupBy(sql`inserted_at`) - .orderBy(sql`inserted_at`, 'desc') + .where('inserted_at', '>=', since) + .where('inserted_at', '<', until) + .groupBy(sql`date(inserted_at)`) + .orderBy(sql`date(inserted_at)`, 'desc') .limit(limit) .offset(offset) .execute(); @@ -113,15 +120,16 @@ export const TrendsWorker = { // Fill in missing dates with 0 usages. return dateRange.map((day) => { - const data = result.find((item) => new Date(item.inserted_at).getTime() === day.getTime()); + const data = result.find((item) => new Date(item.day).getTime() === day.getTime()); if (data) { - return { ...data, day: new Date(data.inserted_at) }; + return { ...data, day: new Date(data.day) }; + } else { + return { day, accounts: 0, uses: 0 }; } - return { day, accounts: 0, uses: 0 }; }); }, - async addTagUsages(pubkey: string, hashtags: string[], inserted_at = new Date().valueOf()): Promise { + async addTagUsages(pubkey: string, hashtags: string[], inserted_at = new Date()): Promise { const pubkey8 = NSchema.id().parse(pubkey).substring(0, 8); const tags = hashtagSchema.array().min(1).parse(hashtags); @@ -134,7 +142,7 @@ export const TrendsWorker = { async cleanupTagUsages(until: Date): Promise { await kysely .deleteFrom('tag_usages') - .where('inserted_at', '<', until.valueOf()) + .where('inserted_at', '<', until) .execute(); }, };