Merge branch 'trends-worker' into 'main'

Add TrendsWorker for tracking/querying trending tags with a Web Worker

See merge request soapbox-pub/ditto!67
This commit is contained in:
Alex Gleason 2023-12-04 22:34:41 +00:00
commit 10447aed64
7 changed files with 104 additions and 86 deletions

View File

@ -5,7 +5,7 @@
"start": "deno run -A --unstable src/server.ts",
"dev": "deno run -A --unstable --watch src/server.ts",
"debug": "deno run -A --unstable --inspect src/server.ts",
"test": "DB_PATH=\":memory:\" deno test -A --unstable src",
"test": "DB_PATH=\":memory:\" deno test -A --unstable",
"check": "deno check src/server.ts",
"relays:sync": "deno run -A --unstable scripts/relays.ts sync"
},

View File

@ -1,13 +1,15 @@
import { type AppController } from '@/app.ts';
import { Conf } from '@/config.ts';
import { z } from '@/deps.ts';
import { trends } from '@/trends.ts';
import { Time } from '@/utils.ts';
import { stripTime } from '@/utils/time.ts';
import { TrendsWorker } from '@/workers/trends.ts';
await TrendsWorker.open('data/trends.sqlite3');
const limitSchema = z.coerce.number().catch(10).transform((value) => Math.min(Math.max(value, 0), 20));
const trendingTagsController: AppController = (c) => {
const trendingTagsController: AppController = async (c) => {
const limit = limitSchema.parse(c.req.query('limit'));
if (limit < 1) return c.json([]);
@ -16,13 +18,14 @@ const trendingTagsController: AppController = (c) => {
const lastWeek = new Date(now.getTime() - Time.days(7));
/** Most used hashtags within the past 24h. */
const tags = trends.getTrendingTags({
const tags = await TrendsWorker.getTrendingTags({
since: yesterday,
until: now,
limit,
});
return c.json(tags.map(({ name, uses, accounts }) => ({
return c.json(
await Promise.all(tags.map(async ({ name, uses, accounts }) => ({
name,
url: Conf.local(`/tags/${name}`),
history: [
@ -33,20 +36,21 @@ const trendingTagsController: AppController = (c) => {
accounts: String(accounts),
uses: String(uses),
},
...trends.getTagHistory({
...(await TrendsWorker.getTagHistory({
tag: name,
since: lastWeek,
until: now,
limit: 6,
offset: 1,
}).map((history) => ({
})).map((history) => ({
// For some reason, Mastodon wants these to be strings... oh well.
day: String(Math.floor(history.day.getTime() / 1000)),
accounts: String(history.accounts),
uses: String(history.uses),
})),
],
})));
}))),
);
};
export { trendingTagsController };

View File

@ -10,8 +10,8 @@ import { publish } from '@/pool.ts';
import { isLocallyFollowed } from '@/queries.ts';
import { Sub } from '@/subs.ts';
import { getTagSet } from '@/tags.ts';
import { trends } from '@/trends.ts';
import { eventAge, isRelay, nostrDate, Time } from '@/utils.ts';
import { TrendsWorker } from '@/workers/trends.ts';
import { verifySignatureWorker } from '@/workers/verify.ts';
import type { EventData } from '@/types.ts';
@ -90,7 +90,7 @@ async function processDeletions(event: Event): Promise<void> {
}
/** Track whenever a hashtag is used, for processing trending tags. */
function trackHashtags(event: Event): void {
async function trackHashtags(event: Event): Promise<void> {
const date = nostrDate(event.created_at);
const tags = event.tags
@ -102,7 +102,7 @@ function trackHashtags(event: Event): void {
try {
console.info('tracking tags:', tags);
trends.addTagUsages(event.pubkey, tags, date);
await TrendsWorker.addTagUsages(event.pubkey, tags, date);
} catch (_e) {
// do nothing
}

View File

@ -1,32 +0,0 @@
import { assertEquals } from '@/deps-test.ts';
import { Sqlite } from '@/deps.ts';
import { TrendsDB } from './trends.ts';
const db = new Sqlite(':memory:');
const trends = new TrendsDB(db);
const p8 = (pubkey8: string) => `${pubkey8}00000000000000000000000000000000000000000000000000000000`;
Deno.test('getTrendingTags', () => {
trends.addTagUsages(p8('00000000'), ['ditto', 'hello', 'yolo']);
trends.addTagUsages(p8('00000000'), ['hello']);
trends.addTagUsages(p8('00000001'), ['Ditto', 'hello']);
trends.addTagUsages(p8('00000010'), ['DITTO']);
const result = trends.getTrendingTags({
since: new Date('1999-01-01T00:00:00'),
until: new Date('2999-01-01T00:00:00'),
threshold: 1,
});
const expected = [
{ name: 'ditto', accounts: 3, uses: 3 },
{ name: 'hello', accounts: 2, uses: 3 },
{ name: 'yolo', accounts: 1, uses: 1 },
];
assertEquals(result, expected);
trends.cleanupTagUsages(new Date('2999-01-01T00:00:00'));
});

View File

@ -0,0 +1,30 @@
import { assertEquals } from '@/deps-test.ts';
import { TrendsWorker } from './trends.ts';
await TrendsWorker.open(':memory:');
const p8 = (pubkey8: string) => `${pubkey8}00000000000000000000000000000000000000000000000000000000`;
Deno.test('getTrendingTags', async () => {
await TrendsWorker.addTagUsages(p8('00000000'), ['ditto', 'hello', 'yolo']);
await TrendsWorker.addTagUsages(p8('00000000'), ['hello']);
await TrendsWorker.addTagUsages(p8('00000001'), ['Ditto', 'hello']);
await TrendsWorker.addTagUsages(p8('00000010'), ['DITTO']);
const result = await TrendsWorker.getTrendingTags({
since: new Date('1999-01-01T00:00:00'),
until: new Date('2999-01-01T00:00:00'),
threshold: 1,
});
const expected = [
{ name: 'ditto', accounts: 3, uses: 3 },
{ name: 'hello', accounts: 2, uses: 3 },
{ name: 'yolo', accounts: 1, uses: 1 },
];
assertEquals(result, expected);
await TrendsWorker.cleanupTagUsages(new Date('2999-01-01T00:00:00'));
});

19
src/workers/trends.ts Normal file
View File

@ -0,0 +1,19 @@
import { Comlink } from '@/deps.ts';
import type { TrendsWorker as _TrendsWorker } from '@/workers/trends.worker.ts';
const worker = new Worker(new URL('./trends.worker.ts', import.meta.url), { type: 'module' });
const TrendsWorker = Comlink.wrap<typeof _TrendsWorker>(worker);
await new Promise<void>((resolve) => {
const handleEvent = ({ data }: MessageEvent) => {
if (data === 'ready') {
worker.removeEventListener('message', handleEvent);
resolve();
}
};
worker.addEventListener('message', handleEvent);
});
export { TrendsWorker };

View File

@ -1,8 +1,7 @@
import { Sqlite } from '@/deps.ts';
import { Comlink, Sqlite } from '@/deps.ts';
import { hashtagSchema } from '@/schema.ts';
import { nostrIdSchema } from '@/schemas/nostr.ts';
import { Time } from '@/utils.ts';
import { generateDateRange } from '@/utils/time.ts';
import { generateDateRange, Time } from '@/utils/time.ts';
interface GetTrendingTagsOpts {
since: Date;
@ -19,13 +18,13 @@ interface GetTagHistoryOpts {
offset?: number;
}
class TrendsDB {
#db: Sqlite;
let db: Sqlite;
constructor(db: Sqlite) {
this.#db = db;
export const TrendsWorker = {
open(path: string) {
db = new Sqlite(path);
this.#db.execute(`
db.execute(`
CREATE TABLE IF NOT EXISTS tag_usages (
tag TEXT NOT NULL COLLATE NOCASE,
pubkey8 TEXT NOT NULL,
@ -43,11 +42,11 @@ class TrendsDB {
setInterval(cleanup, Time.hours(1));
cleanup();
}
},
/** Gets the most used hashtags between the date range. */
getTrendingTags({ since, until, limit = 10, threshold = 3 }: GetTrendingTagsOpts) {
return this.#db.query<string[]>(
return db.query<string[]>(
`
SELECT tag, COUNT(DISTINCT pubkey8), COUNT(*)
FROM tag_usages
@ -63,14 +62,14 @@ class TrendsDB {
accounts: Number(row[1]),
uses: Number(row[2]),
}));
}
},
/**
* Gets the tag usage count for a specific tag.
* It returns an array with counts for each date between the range.
*/
getTagHistory({ tag, since, until, limit = 7, offset = 0 }: GetTagHistoryOpts) {
const result = this.#db.query<string[]>(
const result = db.query<string[]>(
`
SELECT date(inserted_at), COUNT(DISTINCT pubkey8), COUNT(*)
FROM tag_usages
@ -98,28 +97,26 @@ class TrendsDB {
const data = result.find((item) => item.day.getTime() === day.getTime());
return data || { day, accounts: 0, uses: 0 };
});
}
},
addTagUsages(pubkey: string, hashtags: string[], date = new Date()): void {
const pubkey8 = nostrIdSchema.parse(pubkey).substring(0, 8);
const tags = hashtagSchema.array().min(1).parse(hashtags);
this.#db.query(
db.query(
'INSERT INTO tag_usages (tag, pubkey8, inserted_at) VALUES ' + tags.map(() => '(?, ?, ?)').join(', '),
tags.map((tag) => [tag, pubkey8, date]).flat(),
);
}
},
cleanupTagUsages(until: Date): void {
this.#db.query(
db.query(
'DELETE FROM tag_usages WHERE inserted_at < ?',
[until],
);
}
}
},
};
const trends = new TrendsDB(
new Sqlite('data/trends.sqlite3'),
);
Comlink.expose(TrendsWorker);
export { trends, TrendsDB };
self.postMessage('ready');