rewrite trendsworker with kysely and deno cron, format changes

This commit is contained in:
Siddharth Singh 2024-05-20 00:05:03 +05:30
parent 83a7b1f231
commit f19629600d
No known key found for this signature in database
5 changed files with 90 additions and 104 deletions

View File

@ -5,12 +5,13 @@ import { Conf } from '@/config.ts';
import { Time } from '@/utils.ts'; import { Time } from '@/utils.ts';
import { stripTime } from '@/utils/time.ts'; import { stripTime } from '@/utils/time.ts';
import { TrendsWorker } from '@/workers/trends.ts'; import { TrendsWorker } from '@/workers/trends.ts';
import { Context } from 'hono';
await TrendsWorker.open('data/trends.sqlite3'); await TrendsWorker.setupCleanupJob();
const limitSchema = z.coerce.number().catch(10).transform((value) => Math.min(Math.max(value, 0), 20)); const limitSchema = z.coerce.number().catch(10).transform((value) => Math.min(Math.max(value, 0), 20));
const trendingTagsController: AppController = async (c) => { const trendingTagsController: AppController = async (c: Context) => {
const limit = limitSchema.parse(c.req.query('limit')); const limit = limitSchema.parse(c.req.query('limit'));
if (limit < 1) return c.json([]); if (limit < 1) return c.json([]);
@ -26,9 +27,9 @@ const trendingTagsController: AppController = async (c) => {
}); });
return c.json( return c.json(
await Promise.all(tags.map(async ({ name, uses, accounts }) => ({ await Promise.all(tags.map(async ({ tag, uses, accounts }) => ({
name, tag,
url: Conf.local(`/tags/${name}`), url: Conf.local(`/tags/${tag}`),
history: [ history: [
// Use the full 24h query for the current day. Then use `offset: 1` to adjust for this below. // Use the full 24h query for the current day. Then use `offset: 1` to adjust for this below.
// This result is more accurate than what Mastodon returns. // This result is more accurate than what Mastodon returns.
@ -38,7 +39,7 @@ const trendingTagsController: AppController = async (c) => {
uses: String(uses), uses: String(uses),
}, },
...(await TrendsWorker.getTagHistory({ ...(await TrendsWorker.getTagHistory({
tag: name, tag,
since: lastWeek, since: lastWeek,
until: now, until: now,
limit: 6, limit: 6,

View File

@ -10,10 +10,10 @@ export interface DittoTables {
} }
interface TagUsageRow { interface TagUsageRow {
tag: string, tag: string;
pubkey8: string, pubkey8: string;
inserted_at: number inserted_at: number;
}; }
interface AuthorStatsRow { interface AuthorStatsRow {
pubkey: string; pubkey: string;

View File

@ -1,13 +1,13 @@
import { Kysely, sql } from 'kysely'; import { Kysely, sql } from 'kysely';
export async function up(db: Kysely<any>): Promise<void> { export async function up(db: Kysely<any>): Promise<void> {
await db.transaction().execute(async trx => { await db.transaction().execute(async (trx) => {
await trx.schema await trx.schema
.createTable('trends_tag_usages') .createTable('trends_tag_usages')
.ifNotExists() .ifNotExists()
.addColumn('tag', 'text', c => c.notNull().modifyEnd(sql`collate nocase`)) .addColumn('tag', 'text', (c) => c.notNull().modifyEnd(sql`collate nocase`))
.addColumn('pubkey8', 'text', c => c.notNull()) .addColumn('pubkey8', 'text', (c) => c.notNull())
.addColumn('inserted_at', 'integer', c => c.notNull()) .addColumn('inserted_at', 'integer', (c) => c.notNull())
.execute(); .execute();
await trx.schema await trx.schema
@ -17,12 +17,12 @@ export async function up(db: Kysely<any>): Promise<void> {
.column('inserted_at') .column('inserted_at')
.column('tag') .column('tag')
.execute(); .execute();
}) });
} }
export async function down(db: Kysely<any>): Promise<void> { export async function down(db: Kysely<any>): Promise<void> {
await db.transaction().execute(async trx => { await db.transaction().execute(async (trx) => {
await trx.schema.dropIndex('trends_idx_time_tag').ifExists().execute(); await trx.schema.dropIndex('trends_idx_time_tag').ifExists().execute();
await trx.schema.dropTable('trends_tag_usages').ifExists().execute(); await trx.schema.dropTable('trends_tag_usages').ifExists().execute();
}) });
} }

View File

@ -1,9 +1,6 @@
import { assertEquals } from '@std/assert'; import { assertEquals } from '@std/assert';
import { TrendsWorker } from './trends.ts'; import { TrendsWorker } from './trends.ts';
await TrendsWorker.open(':memory:');
const p8 = (pubkey8: string) => `${pubkey8}00000000000000000000000000000000000000000000000000000000`; const p8 = (pubkey8: string) => `${pubkey8}00000000000000000000000000000000000000000000000000000000`;
Deno.test('getTrendingTags', async () => { Deno.test('getTrendingTags', async () => {
@ -19,9 +16,9 @@ Deno.test('getTrendingTags', async () => {
}); });
const expected = [ const expected = [
{ name: 'ditto', accounts: 3, uses: 3 }, { tag: 'ditto', accounts: 3, uses: 3 },
{ name: 'hello', accounts: 2, uses: 3 }, { tag: 'hello', accounts: 2, uses: 3 },
{ name: 'yolo', accounts: 1, uses: 1 }, { tag: 'yolo', accounts: 1, uses: 1 },
]; ];
assertEquals(result, expected); assertEquals(result, expected);

View File

@ -1,9 +1,10 @@
import { NSchema } from '@nostrify/nostrify'; import { NSchema } from '@nostrify/nostrify';
import * as Comlink from 'comlink'; import * as Comlink from 'comlink';
import { Sqlite } from '@/deps.ts';
import { hashtagSchema } from '@/schema.ts'; import { hashtagSchema } from '@/schema.ts';
import { generateDateRange, Time } from '@/utils/time.ts'; import { generateDateRange, Time } from '@/utils/time.ts';
import { DittoDB } from '@/db/DittoDB.ts';
import { sql } from 'kysely';
interface GetTrendingTagsOpts { interface GetTrendingTagsOpts {
since: Date; since: Date;
@ -20,73 +21,57 @@ interface GetTagHistoryOpts {
offset?: number; offset?: number;
} }
let db: Sqlite; const kysely = await DittoDB.getInstance();
export const TrendsWorker = { export const TrendsWorker = {
open(path: string) { setupCleanupJob() {
db = new Sqlite(path); Deno.cron('cleanup tag usages older than a week', { hour: { every: 1 } }, async () => {
db.execute(`
CREATE TABLE IF NOT EXISTS tag_usages (
tag TEXT NOT NULL COLLATE NOCASE,
pubkey8 TEXT NOT NULL,
inserted_at INTEGER NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_time_tag ON tag_usages(inserted_at, tag);
`);
const cleanup = () => {
console.info('Cleaning up old tag usages...');
const lastWeek = new Date(new Date().getTime() - Time.days(7)); const lastWeek = new Date(new Date().getTime() - Time.days(7));
this.cleanupTagUsages(lastWeek); await this.cleanupTagUsages(lastWeek);
}; });
setInterval(cleanup, Time.hours(1));
cleanup();
}, },
/** Gets the most used hashtags between the date range. */ /** Gets the most used hashtags between the date range. */
getTrendingTags({ since, until, limit = 10, threshold = 3 }: GetTrendingTagsOpts) { async getTrendingTags({ since, until, limit = 10, threshold = 3 }: GetTrendingTagsOpts): Promise<{
return db.query<string[]>( tag: string;
` accounts: number;
SELECT tag, COUNT(DISTINCT pubkey8), COUNT(*) uses: number;
FROM tag_usages }[]> {
WHERE inserted_at >= ? AND inserted_at < ? return await kysely.selectFrom('trends_tag_usages')
GROUP BY tag .select(({ fn }) => [
HAVING COUNT(DISTINCT pubkey8) >= ? 'tag',
ORDER BY COUNT(DISTINCT pubkey8) fn.agg<number>('count', ['pubkey8']).distinct().as('accounts'),
DESC LIMIT ?; fn.countAll<number>().as('uses'),
`, ])
[since, until, threshold, limit], .where('inserted_at', '>=', since.valueOf())
).map((row) => ({ .where('inserted_at', '<', until.valueOf())
name: row[0], .groupBy('tag')
accounts: Number(row[1]), .having((c) => c(c.fn.agg('count', ['pubkey8']).distinct(), '>=', threshold))
uses: Number(row[2]), .orderBy((c) => c.fn.agg('count', ['pubkey8']).distinct(), 'desc')
})); .limit(limit)
.execute();
}, },
/** /**
* Gets the tag usage count for a specific tag. * Gets the tag usage count for a specific tag.
* It returns an array with counts for each date between the range. * It returns an array with counts for each date between the range.
*/ */
getTagHistory({ tag, since, until, limit = 7, offset = 0 }: GetTagHistoryOpts) { async getTagHistory({ tag, since, until, limit = 7, offset = 0 }: GetTagHistoryOpts) {
const result = db.query<string[]>( const result = await kysely
` .selectFrom('trends_tag_usages')
SELECT date(inserted_at), COUNT(DISTINCT pubkey8), COUNT(*) .select(({ fn }) => [
FROM tag_usages 'inserted_at',
WHERE tag = ? AND inserted_at >= ? AND inserted_at < ? fn.agg<number>('count', ['pubkey8']).distinct().as('accounts'),
GROUP BY date(inserted_at) fn.countAll<number>().as('uses'),
ORDER BY date(inserted_at) DESC ])
LIMIT ? .where('tag', '=', tag)
OFFSET ?; .where('inserted_at', '>=', since.valueOf())
`, .where('inserted_at', '<', until.valueOf())
[tag, since, until, limit, offset], .groupBy(sql`inserted_at`)
).map((row) => ({ .orderBy(sql`inserted_at`, 'desc')
day: new Date(row[0]), .limit(limit)
accounts: Number(row[1]), .offset(offset)
uses: Number(row[2]), .execute();
}));
/** Full date range between `since` and `until`. */ /** Full date range between `since` and `until`. */
const dateRange = generateDateRange( const dateRange = generateDateRange(
@ -96,26 +81,29 @@ export const TrendsWorker = {
// Fill in missing dates with 0 usages. // Fill in missing dates with 0 usages.
return dateRange.map((day) => { return dateRange.map((day) => {
const data = result.find((item) => item.day.getTime() === day.getTime()); const data = result.find((item) => new Date(item.inserted_at).getTime() === day.getTime());
return data || { day, accounts: 0, uses: 0 }; if (data) {
return { ...data, day: new Date(data.inserted_at) };
}
return { day, accounts: 0, uses: 0 };
}); });
}, },
addTagUsages(pubkey: string, hashtags: string[], date = new Date()): void { async addTagUsages(pubkey: string, hashtags: string[], inserted_at = new Date().valueOf()): Promise<void> {
const pubkey8 = NSchema.id().parse(pubkey).substring(0, 8); const pubkey8 = NSchema.id().parse(pubkey).substring(0, 8);
const tags = hashtagSchema.array().min(1).parse(hashtags); const tags = hashtagSchema.array().min(1).parse(hashtags);
db.query( await kysely
'INSERT INTO tag_usages (tag, pubkey8, inserted_at) VALUES ' + tags.map(() => '(?, ?, ?)').join(', '), .insertInto('trends_tag_usages')
tags.map((tag) => [tag, pubkey8, date]).flat(), .values(tags.map((tag) => ({ tag, pubkey8, inserted_at })))
); .execute();
}, },
cleanupTagUsages(until: Date): void { async cleanupTagUsages(until: Date): Promise<void> {
db.query( await kysely
'DELETE FROM tag_usages WHERE inserted_at < ?', .deleteFrom('trends_tag_usages')
[until], .where('inserted_at', '<', until.valueOf())
); .execute();
}, },
}; };