Add TrendsWorker for tracking/querying trending tags with a Web Worker
This commit is contained in:
parent
3169ad0a69
commit
d569dfd5b5
|
@ -5,7 +5,7 @@
|
||||||
"start": "deno run -A --unstable src/server.ts",
|
"start": "deno run -A --unstable src/server.ts",
|
||||||
"dev": "deno run -A --unstable --watch src/server.ts",
|
"dev": "deno run -A --unstable --watch src/server.ts",
|
||||||
"debug": "deno run -A --unstable --inspect src/server.ts",
|
"debug": "deno run -A --unstable --inspect src/server.ts",
|
||||||
"test": "DB_PATH=\":memory:\" deno test -A --unstable src",
|
"test": "DB_PATH=\":memory:\" deno test -A --unstable",
|
||||||
"check": "deno check src/server.ts",
|
"check": "deno check src/server.ts",
|
||||||
"relays:sync": "deno run -A --unstable scripts/relays.ts sync"
|
"relays:sync": "deno run -A --unstable scripts/relays.ts sync"
|
||||||
},
|
},
|
||||||
|
|
|
@ -1,13 +1,15 @@
|
||||||
import { type AppController } from '@/app.ts';
|
import { type AppController } from '@/app.ts';
|
||||||
import { Conf } from '@/config.ts';
|
import { Conf } from '@/config.ts';
|
||||||
import { z } from '@/deps.ts';
|
import { z } from '@/deps.ts';
|
||||||
import { trends } from '@/trends.ts';
|
|
||||||
import { Time } from '@/utils.ts';
|
import { Time } from '@/utils.ts';
|
||||||
import { stripTime } from '@/utils/time.ts';
|
import { stripTime } from '@/utils/time.ts';
|
||||||
|
import { TrendsWorker } from '@/workers/trends.ts';
|
||||||
|
|
||||||
|
await TrendsWorker.open('data/trends.sqlite3');
|
||||||
|
|
||||||
const limitSchema = z.coerce.number().catch(10).transform((value) => Math.min(Math.max(value, 0), 20));
|
const limitSchema = z.coerce.number().catch(10).transform((value) => Math.min(Math.max(value, 0), 20));
|
||||||
|
|
||||||
const trendingTagsController: AppController = (c) => {
|
const trendingTagsController: AppController = async (c) => {
|
||||||
const limit = limitSchema.parse(c.req.query('limit'));
|
const limit = limitSchema.parse(c.req.query('limit'));
|
||||||
if (limit < 1) return c.json([]);
|
if (limit < 1) return c.json([]);
|
||||||
|
|
||||||
|
@ -16,13 +18,14 @@ const trendingTagsController: AppController = (c) => {
|
||||||
const lastWeek = new Date(now.getTime() - Time.days(7));
|
const lastWeek = new Date(now.getTime() - Time.days(7));
|
||||||
|
|
||||||
/** Most used hashtags within the past 24h. */
|
/** Most used hashtags within the past 24h. */
|
||||||
const tags = trends.getTrendingTags({
|
const tags = await TrendsWorker.getTrendingTags({
|
||||||
since: yesterday,
|
since: yesterday,
|
||||||
until: now,
|
until: now,
|
||||||
limit,
|
limit,
|
||||||
});
|
});
|
||||||
|
|
||||||
return c.json(tags.map(({ name, uses, accounts }) => ({
|
return c.json(
|
||||||
|
await Promise.all(tags.map(async ({ name, uses, accounts }) => ({
|
||||||
name,
|
name,
|
||||||
url: Conf.local(`/tags/${name}`),
|
url: Conf.local(`/tags/${name}`),
|
||||||
history: [
|
history: [
|
||||||
|
@ -33,20 +36,21 @@ const trendingTagsController: AppController = (c) => {
|
||||||
accounts: String(accounts),
|
accounts: String(accounts),
|
||||||
uses: String(uses),
|
uses: String(uses),
|
||||||
},
|
},
|
||||||
...trends.getTagHistory({
|
...(await TrendsWorker.getTagHistory({
|
||||||
tag: name,
|
tag: name,
|
||||||
since: lastWeek,
|
since: lastWeek,
|
||||||
until: now,
|
until: now,
|
||||||
limit: 6,
|
limit: 6,
|
||||||
offset: 1,
|
offset: 1,
|
||||||
}).map((history) => ({
|
})).map((history) => ({
|
||||||
// For some reason, Mastodon wants these to be strings... oh well.
|
// For some reason, Mastodon wants these to be strings... oh well.
|
||||||
day: String(Math.floor(history.day.getTime() / 1000)),
|
day: String(Math.floor(history.day.getTime() / 1000)),
|
||||||
accounts: String(history.accounts),
|
accounts: String(history.accounts),
|
||||||
uses: String(history.uses),
|
uses: String(history.uses),
|
||||||
})),
|
})),
|
||||||
],
|
],
|
||||||
})));
|
}))),
|
||||||
|
);
|
||||||
};
|
};
|
||||||
|
|
||||||
export { trendingTagsController };
|
export { trendingTagsController };
|
||||||
|
|
|
@ -10,8 +10,8 @@ import { publish } from '@/pool.ts';
|
||||||
import { isLocallyFollowed } from '@/queries.ts';
|
import { isLocallyFollowed } from '@/queries.ts';
|
||||||
import { Sub } from '@/subs.ts';
|
import { Sub } from '@/subs.ts';
|
||||||
import { getTagSet } from '@/tags.ts';
|
import { getTagSet } from '@/tags.ts';
|
||||||
import { trends } from '@/trends.ts';
|
|
||||||
import { eventAge, isRelay, nostrDate, Time } from '@/utils.ts';
|
import { eventAge, isRelay, nostrDate, Time } from '@/utils.ts';
|
||||||
|
import { TrendsWorker } from '@/workers/trends.ts';
|
||||||
import { verifySignatureWorker } from '@/workers/verify.ts';
|
import { verifySignatureWorker } from '@/workers/verify.ts';
|
||||||
|
|
||||||
import type { EventData } from '@/types.ts';
|
import type { EventData } from '@/types.ts';
|
||||||
|
@ -90,7 +90,7 @@ async function processDeletions(event: Event): Promise<void> {
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Track whenever a hashtag is used, for processing trending tags. */
|
/** Track whenever a hashtag is used, for processing trending tags. */
|
||||||
function trackHashtags(event: Event): void {
|
async function trackHashtags(event: Event): Promise<void> {
|
||||||
const date = nostrDate(event.created_at);
|
const date = nostrDate(event.created_at);
|
||||||
|
|
||||||
const tags = event.tags
|
const tags = event.tags
|
||||||
|
@ -102,7 +102,7 @@ function trackHashtags(event: Event): void {
|
||||||
|
|
||||||
try {
|
try {
|
||||||
console.info('tracking tags:', tags);
|
console.info('tracking tags:', tags);
|
||||||
trends.addTagUsages(event.pubkey, tags, date);
|
await TrendsWorker.addTagUsages(event.pubkey, tags, date);
|
||||||
} catch (_e) {
|
} catch (_e) {
|
||||||
// do nothing
|
// do nothing
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,32 +0,0 @@
|
||||||
import { assertEquals } from '@/deps-test.ts';
|
|
||||||
import { Sqlite } from '@/deps.ts';
|
|
||||||
|
|
||||||
import { TrendsDB } from './trends.ts';
|
|
||||||
|
|
||||||
const db = new Sqlite(':memory:');
|
|
||||||
const trends = new TrendsDB(db);
|
|
||||||
|
|
||||||
const p8 = (pubkey8: string) => `${pubkey8}00000000000000000000000000000000000000000000000000000000`;
|
|
||||||
|
|
||||||
Deno.test('getTrendingTags', () => {
|
|
||||||
trends.addTagUsages(p8('00000000'), ['ditto', 'hello', 'yolo']);
|
|
||||||
trends.addTagUsages(p8('00000000'), ['hello']);
|
|
||||||
trends.addTagUsages(p8('00000001'), ['Ditto', 'hello']);
|
|
||||||
trends.addTagUsages(p8('00000010'), ['DITTO']);
|
|
||||||
|
|
||||||
const result = trends.getTrendingTags({
|
|
||||||
since: new Date('1999-01-01T00:00:00'),
|
|
||||||
until: new Date('2999-01-01T00:00:00'),
|
|
||||||
threshold: 1,
|
|
||||||
});
|
|
||||||
|
|
||||||
const expected = [
|
|
||||||
{ name: 'ditto', accounts: 3, uses: 3 },
|
|
||||||
{ name: 'hello', accounts: 2, uses: 3 },
|
|
||||||
{ name: 'yolo', accounts: 1, uses: 1 },
|
|
||||||
];
|
|
||||||
|
|
||||||
assertEquals(result, expected);
|
|
||||||
|
|
||||||
trends.cleanupTagUsages(new Date('2999-01-01T00:00:00'));
|
|
||||||
});
|
|
|
@ -0,0 +1,30 @@
|
||||||
|
import { assertEquals } from '@/deps-test.ts';
|
||||||
|
|
||||||
|
import { TrendsWorker } from './trends.ts';
|
||||||
|
|
||||||
|
await TrendsWorker.open(':memory:');
|
||||||
|
|
||||||
|
const p8 = (pubkey8: string) => `${pubkey8}00000000000000000000000000000000000000000000000000000000`;
|
||||||
|
|
||||||
|
Deno.test('getTrendingTags', async () => {
|
||||||
|
await TrendsWorker.addTagUsages(p8('00000000'), ['ditto', 'hello', 'yolo']);
|
||||||
|
await TrendsWorker.addTagUsages(p8('00000000'), ['hello']);
|
||||||
|
await TrendsWorker.addTagUsages(p8('00000001'), ['Ditto', 'hello']);
|
||||||
|
await TrendsWorker.addTagUsages(p8('00000010'), ['DITTO']);
|
||||||
|
|
||||||
|
const result = await TrendsWorker.getTrendingTags({
|
||||||
|
since: new Date('1999-01-01T00:00:00'),
|
||||||
|
until: new Date('2999-01-01T00:00:00'),
|
||||||
|
threshold: 1,
|
||||||
|
});
|
||||||
|
|
||||||
|
const expected = [
|
||||||
|
{ name: 'ditto', accounts: 3, uses: 3 },
|
||||||
|
{ name: 'hello', accounts: 2, uses: 3 },
|
||||||
|
{ name: 'yolo', accounts: 1, uses: 1 },
|
||||||
|
];
|
||||||
|
|
||||||
|
assertEquals(result, expected);
|
||||||
|
|
||||||
|
await TrendsWorker.cleanupTagUsages(new Date('2999-01-01T00:00:00'));
|
||||||
|
});
|
|
@ -0,0 +1,19 @@
|
||||||
|
import { Comlink } from '@/deps.ts';
|
||||||
|
|
||||||
|
import type { TrendsWorker as _TrendsWorker } from '@/workers/trends.worker.ts';
|
||||||
|
|
||||||
|
const worker = new Worker(new URL('./trends.worker.ts', import.meta.url), { type: 'module' });
|
||||||
|
|
||||||
|
const TrendsWorker = Comlink.wrap<typeof _TrendsWorker>(worker);
|
||||||
|
|
||||||
|
await new Promise<void>((resolve) => {
|
||||||
|
const handleEvent = ({ data }: MessageEvent) => {
|
||||||
|
if (data === 'ready') {
|
||||||
|
worker.removeEventListener('message', handleEvent);
|
||||||
|
resolve();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
worker.addEventListener('message', handleEvent);
|
||||||
|
});
|
||||||
|
|
||||||
|
export { TrendsWorker };
|
|
@ -1,8 +1,7 @@
|
||||||
import { Sqlite } from '@/deps.ts';
|
import { Comlink, Sqlite } from '@/deps.ts';
|
||||||
import { hashtagSchema } from '@/schema.ts';
|
import { hashtagSchema } from '@/schema.ts';
|
||||||
import { nostrIdSchema } from '@/schemas/nostr.ts';
|
import { nostrIdSchema } from '@/schemas/nostr.ts';
|
||||||
import { Time } from '@/utils.ts';
|
import { generateDateRange, Time } from '@/utils/time.ts';
|
||||||
import { generateDateRange } from '@/utils/time.ts';
|
|
||||||
|
|
||||||
interface GetTrendingTagsOpts {
|
interface GetTrendingTagsOpts {
|
||||||
since: Date;
|
since: Date;
|
||||||
|
@ -19,13 +18,13 @@ interface GetTagHistoryOpts {
|
||||||
offset?: number;
|
offset?: number;
|
||||||
}
|
}
|
||||||
|
|
||||||
class TrendsDB {
|
let db: Sqlite;
|
||||||
#db: Sqlite;
|
|
||||||
|
|
||||||
constructor(db: Sqlite) {
|
export const TrendsWorker = {
|
||||||
this.#db = db;
|
open(path: string) {
|
||||||
|
db = new Sqlite(path);
|
||||||
|
|
||||||
this.#db.execute(`
|
db.execute(`
|
||||||
CREATE TABLE IF NOT EXISTS tag_usages (
|
CREATE TABLE IF NOT EXISTS tag_usages (
|
||||||
tag TEXT NOT NULL COLLATE NOCASE,
|
tag TEXT NOT NULL COLLATE NOCASE,
|
||||||
pubkey8 TEXT NOT NULL,
|
pubkey8 TEXT NOT NULL,
|
||||||
|
@ -43,11 +42,11 @@ class TrendsDB {
|
||||||
|
|
||||||
setInterval(cleanup, Time.hours(1));
|
setInterval(cleanup, Time.hours(1));
|
||||||
cleanup();
|
cleanup();
|
||||||
}
|
},
|
||||||
|
|
||||||
/** Gets the most used hashtags between the date range. */
|
/** Gets the most used hashtags between the date range. */
|
||||||
getTrendingTags({ since, until, limit = 10, threshold = 3 }: GetTrendingTagsOpts) {
|
getTrendingTags({ since, until, limit = 10, threshold = 3 }: GetTrendingTagsOpts) {
|
||||||
return this.#db.query<string[]>(
|
return db.query<string[]>(
|
||||||
`
|
`
|
||||||
SELECT tag, COUNT(DISTINCT pubkey8), COUNT(*)
|
SELECT tag, COUNT(DISTINCT pubkey8), COUNT(*)
|
||||||
FROM tag_usages
|
FROM tag_usages
|
||||||
|
@ -63,14 +62,14 @@ class TrendsDB {
|
||||||
accounts: Number(row[1]),
|
accounts: Number(row[1]),
|
||||||
uses: Number(row[2]),
|
uses: Number(row[2]),
|
||||||
}));
|
}));
|
||||||
}
|
},
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Gets the tag usage count for a specific tag.
|
* Gets the tag usage count for a specific tag.
|
||||||
* It returns an array with counts for each date between the range.
|
* It returns an array with counts for each date between the range.
|
||||||
*/
|
*/
|
||||||
getTagHistory({ tag, since, until, limit = 7, offset = 0 }: GetTagHistoryOpts) {
|
getTagHistory({ tag, since, until, limit = 7, offset = 0 }: GetTagHistoryOpts) {
|
||||||
const result = this.#db.query<string[]>(
|
const result = db.query<string[]>(
|
||||||
`
|
`
|
||||||
SELECT date(inserted_at), COUNT(DISTINCT pubkey8), COUNT(*)
|
SELECT date(inserted_at), COUNT(DISTINCT pubkey8), COUNT(*)
|
||||||
FROM tag_usages
|
FROM tag_usages
|
||||||
|
@ -98,28 +97,26 @@ class TrendsDB {
|
||||||
const data = result.find((item) => item.day.getTime() === day.getTime());
|
const data = result.find((item) => item.day.getTime() === day.getTime());
|
||||||
return data || { day, accounts: 0, uses: 0 };
|
return data || { day, accounts: 0, uses: 0 };
|
||||||
});
|
});
|
||||||
}
|
},
|
||||||
|
|
||||||
addTagUsages(pubkey: string, hashtags: string[], date = new Date()): void {
|
addTagUsages(pubkey: string, hashtags: string[], date = new Date()): void {
|
||||||
const pubkey8 = nostrIdSchema.parse(pubkey).substring(0, 8);
|
const pubkey8 = nostrIdSchema.parse(pubkey).substring(0, 8);
|
||||||
const tags = hashtagSchema.array().min(1).parse(hashtags);
|
const tags = hashtagSchema.array().min(1).parse(hashtags);
|
||||||
|
|
||||||
this.#db.query(
|
db.query(
|
||||||
'INSERT INTO tag_usages (tag, pubkey8, inserted_at) VALUES ' + tags.map(() => '(?, ?, ?)').join(', '),
|
'INSERT INTO tag_usages (tag, pubkey8, inserted_at) VALUES ' + tags.map(() => '(?, ?, ?)').join(', '),
|
||||||
tags.map((tag) => [tag, pubkey8, date]).flat(),
|
tags.map((tag) => [tag, pubkey8, date]).flat(),
|
||||||
);
|
);
|
||||||
}
|
},
|
||||||
|
|
||||||
cleanupTagUsages(until: Date): void {
|
cleanupTagUsages(until: Date): void {
|
||||||
this.#db.query(
|
db.query(
|
||||||
'DELETE FROM tag_usages WHERE inserted_at < ?',
|
'DELETE FROM tag_usages WHERE inserted_at < ?',
|
||||||
[until],
|
[until],
|
||||||
);
|
);
|
||||||
}
|
},
|
||||||
}
|
};
|
||||||
|
|
||||||
const trends = new TrendsDB(
|
Comlink.expose(TrendsWorker);
|
||||||
new Sqlite('data/trends.sqlite3'),
|
|
||||||
);
|
|
||||||
|
|
||||||
export { trends, TrendsDB };
|
self.postMessage('ready');
|
Loading…
Reference in New Issue