Merge branch 'hashtag-labels' into 'main'
Save trending hashtags as labels See merge request soapbox-pub/ditto!348
This commit is contained in:
commit
71e6af1292
|
@ -1091,15 +1091,18 @@
|
|||
"https://deno.land/x/hono@v3.10.1/adapter/deno/serve-static.ts": "ba10cf6aaf39da942b0d49c3b9877ddba69d41d414c6551d890beb1085f58eea",
|
||||
"https://deno.land/x/hono@v3.10.1/client/client.ts": "ff340f58041203879972dd368b011ed130c66914f789826610869a90603406bf",
|
||||
"https://deno.land/x/hono@v3.10.1/client/index.ts": "3ff4cf246f3543f827a85a2c84d66a025ac350ee927613629bda47e854bfb7ba",
|
||||
"https://deno.land/x/hono@v3.10.1/client/types.ts": "52c66cbe74540e1811259a48c30622ac915666196eb978092d166435cbc15213",
|
||||
"https://deno.land/x/hono@v3.10.1/client/utils.ts": "053273c002963b549d38268a1b423ac8ca211a8028bdab1ed0b781a62aa5e661",
|
||||
"https://deno.land/x/hono@v3.10.1/compose.ts": "e8ab4b345aa367f2dd65f221c9fe829dd885326a613f4215b654f93a4066bb5c",
|
||||
"https://deno.land/x/hono@v3.10.1/context.ts": "261cc8b8b1e8f04b98beab1cca6692f317b7dc6d2b75b4f84c982e54cf1db730",
|
||||
"https://deno.land/x/hono@v3.10.1/helper/adapter/index.ts": "eea9b4caedbfa3a3b4a020bf46c88c0171a00008cd6c10708cd85a3e39d86e62",
|
||||
"https://deno.land/x/hono@v3.10.1/helper/cookie/index.ts": "55ccd20bbd8d9a8bb2ecd998e90845c1d306c19027f54b3d1b89a5be35968b80",
|
||||
"https://deno.land/x/hono@v3.10.1/helper/html/index.ts": "aba19e8d29f217c7fffa5719cf606c4e259b540d51296e82bbea3c992e2ecbc6",
|
||||
"https://deno.land/x/hono@v3.10.1/hono-base.ts": "cc55e0a4c63a7bdf44df3e804ea4737d5399eeb6606b45d102f8e48c3ff1e925",
|
||||
"https://deno.land/x/hono@v3.10.1/hono.ts": "2cc4c292e541463a4d6f83edbcea58048d203e9564ae62ec430a3d466b49a865",
|
||||
"https://deno.land/x/hono@v3.10.1/http-exception.ts": "6071df078b5f76d279684d52fe82a590f447a64ffe1b75eb5064d0c8a8d2d676",
|
||||
"https://deno.land/x/hono@v3.10.1/jsx/index.ts": "019512d3a9b3897b879e87fa5fb179cd34f3d326f8ff8b93379c2bb707ec168a",
|
||||
"https://deno.land/x/hono@v3.10.1/jsx/intrinsic-elements.ts": "03250beb610bda1c72017bc0912c2505ff764b7a8d869e7e4add40eb4cfec096",
|
||||
"https://deno.land/x/hono@v3.10.1/jsx/streaming.ts": "5d03b4d02eaa396c8f0f33c3f6e8c7ed3afb7598283c2d4a7ddea0ada8c212a7",
|
||||
"https://deno.land/x/hono@v3.10.1/middleware.ts": "57b2047c4b9d775a052a9c44a3b805802c1d1cb477ab9c4bb6185d27382d1b96",
|
||||
"https://deno.land/x/hono@v3.10.1/middleware/basic-auth/index.ts": "5505288ccf9364f56f7be2dfac841543b72e20656e54ac646a1a73a0aa853261",
|
||||
|
@ -1131,6 +1134,7 @@
|
|||
"https://deno.land/x/hono@v3.10.1/router/trie-router/index.ts": "3eb75e7f71ba81801631b30de6b1f5cefb2c7239c03797e2b2cbab5085911b41",
|
||||
"https://deno.land/x/hono@v3.10.1/router/trie-router/node.ts": "3af15fa9c9994a8664a2b7a7c11233504b5bb9d4fcf7bb34cf30d7199052c39f",
|
||||
"https://deno.land/x/hono@v3.10.1/router/trie-router/router.ts": "54ced78d35676302c8fcdda4204f7bdf5a7cc907fbf9967c75674b1e394f830d",
|
||||
"https://deno.land/x/hono@v3.10.1/types.ts": "edc414a92383f9deb82f5f7a09e95bcf76f6100c23457c27d041986768f5345c",
|
||||
"https://deno.land/x/hono@v3.10.1/utils/body.ts": "7a16a6656331a96bcae57642f8d5e3912bd361cbbcc2c0d2157ecc3f218f7a92",
|
||||
"https://deno.land/x/hono@v3.10.1/utils/buffer.ts": "9066a973e64498cb262c7e932f47eed525a51677b17f90893862b7279dc0773e",
|
||||
"https://deno.land/x/hono@v3.10.1/utils/cookie.ts": "19920ba6756944aae1ad8585c3ddeaa9df479733f59d05359db096f7361e5e4b",
|
||||
|
@ -1138,11 +1142,13 @@
|
|||
"https://deno.land/x/hono@v3.10.1/utils/encode.ts": "3b7c7d736123b5073542b34321700d4dbf5ff129c138f434bb2144a4d425ee89",
|
||||
"https://deno.land/x/hono@v3.10.1/utils/filepath.ts": "18461b055a914d6da85077f453051b516281bb17cf64fa74bf5ef604dc9d2861",
|
||||
"https://deno.land/x/hono@v3.10.1/utils/html.ts": "01c1520a4256f899da1954357cf63ae11c348eda141a505f72d7090cf5481aba",
|
||||
"https://deno.land/x/hono@v3.10.1/utils/http-status.ts": "e0c4343ea7717c314dc600131e16b636c29d61cfdaf9df93b267258d1729d1a0",
|
||||
"https://deno.land/x/hono@v3.10.1/utils/jwt/index.ts": "5e4b82a42eb3603351dfce726cd781ca41cb57437395409d227131aec348d2d5",
|
||||
"https://deno.land/x/hono@v3.10.1/utils/jwt/jwt.ts": "02ff7bbf1298ffcc7a40266842f8eac44b6c136453e32d4441e24d0cbfba3a95",
|
||||
"https://deno.land/x/hono@v3.10.1/utils/jwt/types.ts": "58ddf908f76ba18d9c62ddfc2d1e40cc2e306bf987409a6169287efa81ce2546",
|
||||
"https://deno.land/x/hono@v3.10.1/utils/mime.ts": "0105d2b5e8e91f07acc70f5d06b388313995d62af23c802fcfba251f5a744d95",
|
||||
"https://deno.land/x/hono@v3.10.1/utils/stream.ts": "1789dcc73c5b0ede28f83d7d34e47ae432c20e680907cb3275a9c9187f293983",
|
||||
"https://deno.land/x/hono@v3.10.1/utils/types.ts": "ddff055e6d35066232efdfbd42c8954e855c04279c27dcd735d929b6b4f319b3",
|
||||
"https://deno.land/x/hono@v3.10.1/utils/url.ts": "5fc3307ef3cb2e6f34ec2a03e3d7f2126c6a9f5f0eab677222df3f0e40bd7567",
|
||||
"https://deno.land/x/hono@v3.10.1/validator/index.ts": "6c986e8b91dcf857ecc8164a506ae8eea8665792a4ff7215471df669c632ae7c",
|
||||
"https://deno.land/x/hono@v3.10.1/validator/validator.ts": "afa5e52495e0996fbba61996736fab5c486590d72d376f809e9f9ff4e0c463e9",
|
||||
|
|
|
@ -1,20 +1,16 @@
|
|||
import { NostrEvent } from '@nostrify/nostrify';
|
||||
import { NostrEvent, NStore } from '@nostrify/nostrify';
|
||||
import { z } from 'zod';
|
||||
|
||||
import { type AppController } from '@/app.ts';
|
||||
import { AppController } from '@/app.ts';
|
||||
import { Conf } from '@/config.ts';
|
||||
import { hydrateEvents } from '@/storages/hydrate.ts';
|
||||
import { Storages } from '@/storages.ts';
|
||||
import { Time } from '@/utils.ts';
|
||||
import { stripTime } from '@/utils/time.ts';
|
||||
import { generateDateRange, Time } from '@/utils/time.ts';
|
||||
import { renderStatus } from '@/views/mastodon/statuses.ts';
|
||||
import { TrendsWorker } from '@/workers/trends.ts';
|
||||
|
||||
await TrendsWorker.open('data/trends.sqlite3');
|
||||
|
||||
let trendingHashtagsCache = getTrendingHashtags();
|
||||
|
||||
Deno.cron('update trends cache', { minute: { every: 15 } }, async () => {
|
||||
Deno.cron('update trending hashtags cache', { minute: { every: 15 } }, async () => {
|
||||
const trends = await getTrendingHashtags();
|
||||
trendingHashtagsCache = Promise.resolve(trends);
|
||||
});
|
||||
|
@ -31,42 +27,24 @@ const trendingTagsController: AppController = async (c) => {
|
|||
};
|
||||
|
||||
async function getTrendingHashtags() {
|
||||
const now = new Date();
|
||||
const yesterday = new Date(now.getTime() - Time.days(1));
|
||||
const lastWeek = new Date(now.getTime() - Time.days(7));
|
||||
const store = await Storages.db();
|
||||
const trends = await getTrendingTags(store, 't');
|
||||
|
||||
/** Most used hashtags within the past 24h. */
|
||||
const tags = await TrendsWorker.getTrendingTags({
|
||||
since: yesterday,
|
||||
until: now,
|
||||
limit: 20,
|
||||
});
|
||||
return trends.map((trend) => {
|
||||
const hashtag = trend.value;
|
||||
|
||||
return Promise.all(tags.map(async ({ tag, uses, accounts }) => ({
|
||||
name: tag,
|
||||
url: Conf.local(`/tags/${tag}`),
|
||||
history: [
|
||||
// Use the full 24h query for the current day. Then use `offset: 1` to adjust for this below.
|
||||
// This result is more accurate than what Mastodon returns.
|
||||
{
|
||||
day: String(Math.floor(stripTime(now).getTime() / 1000)),
|
||||
accounts: String(accounts),
|
||||
const history = trend.history.map(({ day, authors, uses }) => ({
|
||||
day: String(day),
|
||||
accounts: String(authors),
|
||||
uses: String(uses),
|
||||
},
|
||||
...(await TrendsWorker.getTagHistory({
|
||||
tag,
|
||||
since: lastWeek,
|
||||
until: now,
|
||||
limit: 6,
|
||||
offset: 1,
|
||||
})).map((history) => ({
|
||||
// For some reason, Mastodon wants these to be strings... oh well.
|
||||
day: String(Math.floor(history.day.getTime() / 1000)),
|
||||
accounts: String(history.accounts),
|
||||
uses: String(history.uses),
|
||||
})),
|
||||
],
|
||||
})));
|
||||
}));
|
||||
|
||||
return {
|
||||
name: hashtag,
|
||||
url: Conf.local(`/tags/${hashtag}`),
|
||||
history,
|
||||
};
|
||||
});
|
||||
}
|
||||
|
||||
const trendingStatusesQuerySchema = z.object({
|
||||
|
@ -81,7 +59,7 @@ const trendingStatusesController: AppController = async (c) => {
|
|||
const [label] = await store.query([{
|
||||
kinds: [1985],
|
||||
'#L': ['pub.ditto.trends'],
|
||||
'#l': ['notes'],
|
||||
'#l': ['#e'],
|
||||
authors: [Conf.pubkey],
|
||||
limit: 1,
|
||||
}]);
|
||||
|
@ -95,7 +73,7 @@ const trendingStatusesController: AppController = async (c) => {
|
|||
return c.json([]);
|
||||
}
|
||||
|
||||
const results = await store.query([{ ids }])
|
||||
const results = await store.query([{ kinds: [1], ids }])
|
||||
.then((events) => hydrateEvents({ events, store }));
|
||||
|
||||
// Sort events in the order they appear in the label.
|
||||
|
@ -110,4 +88,69 @@ const trendingStatusesController: AppController = async (c) => {
|
|||
return c.json(statuses.filter(Boolean));
|
||||
};
|
||||
|
||||
interface TrendingTag {
|
||||
name: string;
|
||||
value: string;
|
||||
history: {
|
||||
day: number;
|
||||
authors: number;
|
||||
uses: number;
|
||||
}[];
|
||||
}
|
||||
|
||||
export async function getTrendingTags(store: NStore, tagName: string): Promise<TrendingTag[]> {
|
||||
const filter = {
|
||||
kinds: [1985],
|
||||
'#L': ['pub.ditto.trends'],
|
||||
'#l': [`#${tagName}`],
|
||||
authors: [Conf.pubkey],
|
||||
limit: 1,
|
||||
};
|
||||
|
||||
const [label] = await store.query([filter]);
|
||||
|
||||
if (!label) {
|
||||
return [];
|
||||
}
|
||||
|
||||
const tags = label.tags.filter(([name]) => name === tagName);
|
||||
|
||||
const now = new Date();
|
||||
const lastWeek = new Date(now.getTime() - Time.days(7));
|
||||
const dates = generateDateRange(lastWeek, now).reverse();
|
||||
|
||||
return Promise.all(tags.map(async ([_, value]) => {
|
||||
const filters = dates.map((date) => ({
|
||||
...filter,
|
||||
[`#${tagName}`]: [value],
|
||||
since: Math.floor(date.getTime() / 1000),
|
||||
until: Math.floor((date.getTime() + Time.days(1)) / 1000),
|
||||
}));
|
||||
|
||||
const labels = await store.query(filters);
|
||||
|
||||
const history = dates.map((date) => {
|
||||
const label = labels.find((label) => {
|
||||
const since = Math.floor(date.getTime() / 1000);
|
||||
const until = Math.floor((date.getTime() + Time.days(1)) / 1000);
|
||||
return label.created_at >= since && label.created_at < until;
|
||||
});
|
||||
|
||||
const [, , , accounts, uses] = label?.tags.find((tag) => tag[0] === tagName && tag[1] === value) ?? [];
|
||||
|
||||
return {
|
||||
day: Math.floor(date.getTime() / 1000),
|
||||
authors: Number(accounts || 0),
|
||||
uses: Number(uses || 0),
|
||||
};
|
||||
});
|
||||
|
||||
return {
|
||||
name: tagName,
|
||||
value,
|
||||
history,
|
||||
};
|
||||
}));
|
||||
}
|
||||
|
||||
export { trendingStatusesController, trendingTagsController };
|
||||
|
|
40
src/cron.ts
40
src/cron.ts
|
@ -1,20 +1,35 @@
|
|||
import { Stickynotes } from '@soapbox/stickynotes';
|
||||
|
||||
import { Conf } from '@/config.ts';
|
||||
import { DittoDB } from '@/db/DittoDB.ts';
|
||||
import { getTrendingNotes } from '@/trends/trending-notes.ts';
|
||||
import { Time } from '@/utils/time.ts';
|
||||
import { AdminSigner } from '@/signers/AdminSigner.ts';
|
||||
import { handleEvent } from '@/pipeline.ts';
|
||||
import { AdminSigner } from '@/signers/AdminSigner.ts';
|
||||
import { getTrendingTagValues } from '@/trends/trending-tag-values.ts';
|
||||
import { Time } from '@/utils/time.ts';
|
||||
|
||||
const console = new Stickynotes('ditto:trends');
|
||||
|
||||
async function updateTrendingNotesCache() {
|
||||
console.info('Updating trending notes cache...');
|
||||
async function updateTrendingTags(tagName: string, kinds: number[], limit: number, extra = '', aliases?: string[]) {
|
||||
console.info(`Updating trending #${tagName}...`);
|
||||
const kysely = await DittoDB.getInstance();
|
||||
const yesterday = Math.floor((Date.now() - Time.days(1)) / 1000);
|
||||
const signal = AbortSignal.timeout(1000);
|
||||
|
||||
const events = await getTrendingNotes(kysely, yesterday, 20);
|
||||
const yesterday = Math.floor((Date.now() - Time.days(1)) / 1000);
|
||||
const now = Math.floor(Date.now() / 1000);
|
||||
|
||||
const tagNames = aliases ? [tagName, ...aliases] : [tagName];
|
||||
|
||||
const trends = await getTrendingTagValues(kysely, tagNames, {
|
||||
kinds,
|
||||
since: yesterday,
|
||||
until: now,
|
||||
limit,
|
||||
});
|
||||
|
||||
if (!trends.length) {
|
||||
return;
|
||||
}
|
||||
|
||||
const signer = new AdminSigner();
|
||||
|
||||
const label = await signer.signEvent({
|
||||
|
@ -22,17 +37,20 @@ async function updateTrendingNotesCache() {
|
|||
content: '',
|
||||
tags: [
|
||||
['L', 'pub.ditto.trends'],
|
||||
['l', 'notes', 'pub.ditto.trends'],
|
||||
...events.map(({ id }) => ['e', id]),
|
||||
['l', `#${tagName}`, 'pub.ditto.trends'],
|
||||
...trends.map(({ value, authors, uses }) => [tagName, value, extra, authors.toString(), uses.toString()]),
|
||||
],
|
||||
created_at: Math.floor(Date.now() / 1000),
|
||||
});
|
||||
|
||||
await handleEvent(label, signal);
|
||||
console.info('Trending notes cache updated.');
|
||||
console.info(`Trending #${tagName} updated.`);
|
||||
}
|
||||
|
||||
/** Start cron jobs for the application. */
|
||||
export function cron() {
|
||||
Deno.cron('update trending notes cache', { minute: { every: 15 } }, updateTrendingNotesCache);
|
||||
Deno.cron('update trending pubkeys', '0 * * * *', () => updateTrendingTags('p', [1, 3], 40, Conf.relay));
|
||||
Deno.cron('update trending notes', '15 * * * *', () => updateTrendingTags('e', [1, 6, 7], 40, Conf.relay, ['q']));
|
||||
Deno.cron('update trending hashtags', '30 * * * *', () => updateTrendingTags('t', [1], 20));
|
||||
Deno.cron('update trending links', '45 * * * *', () => updateTrendingTags('r', [1], 20));
|
||||
}
|
||||
|
|
|
@ -13,9 +13,8 @@ import { MuteListPolicy } from '@/policies/MuteListPolicy.ts';
|
|||
import { RelayError } from '@/RelayError.ts';
|
||||
import { hydrateEvents } from '@/storages/hydrate.ts';
|
||||
import { Storages } from '@/storages.ts';
|
||||
import { eventAge, nostrDate, parseNip05, Time } from '@/utils.ts';
|
||||
import { eventAge, parseNip05, Time } from '@/utils.ts';
|
||||
import { policyWorker } from '@/workers/policy.ts';
|
||||
import { TrendsWorker } from '@/workers/trends.ts';
|
||||
import { verifyEventWorker } from '@/workers/verify.ts';
|
||||
import { nip05Cache } from '@/utils/nip05.ts';
|
||||
import { updateStats } from '@/utils/stats.ts';
|
||||
|
@ -49,7 +48,6 @@ async function handleEvent(event: DittoEvent, signal: AbortSignal): Promise<void
|
|||
storeEvent(event, signal),
|
||||
parseMetadata(event, signal),
|
||||
DVM.event(event),
|
||||
trackHashtags(event),
|
||||
processMedia(event),
|
||||
streamOut(event),
|
||||
]);
|
||||
|
@ -150,25 +148,6 @@ async function parseMetadata(event: NostrEvent, signal: AbortSignal): Promise<vo
|
|||
}
|
||||
}
|
||||
|
||||
/** Track whenever a hashtag is used, for processing trending tags. */
|
||||
async function trackHashtags(event: NostrEvent): Promise<void> {
|
||||
const date = nostrDate(event.created_at);
|
||||
|
||||
const tags = event.tags
|
||||
.filter((tag) => tag[0] === 't')
|
||||
.map((tag) => tag[1])
|
||||
.slice(0, 5);
|
||||
|
||||
if (!tags.length) return;
|
||||
|
||||
try {
|
||||
debug('tracking tags:', JSON.stringify(tags));
|
||||
await TrendsWorker.addTagUsages(event.pubkey, tags, date);
|
||||
} catch (_e) {
|
||||
// do nothing
|
||||
}
|
||||
}
|
||||
|
||||
/** Delete unattached media entries that are attached to the event. */
|
||||
function processMedia({ tags, pubkey, user }: DittoEvent) {
|
||||
if (user) {
|
||||
|
|
|
@ -36,7 +36,7 @@ class EventsDB implements NStore {
|
|||
'p': ({ event, count, value }) => (count < 15 || event.kind === 3) && isNostrId(value),
|
||||
'proxy': ({ count, value }) => count === 0 && isURL(value),
|
||||
'q': ({ event, count, value }) => count === 0 && event.kind === 1 && isNostrId(value),
|
||||
't': ({ count, value }) => count < 5 && value.length < 50,
|
||||
't': ({ event, count, value }) => (event.kind === 1985 ? count < 20 : count < 5) && value.length < 50,
|
||||
'name': ({ event, count }) => event.kind === 30361 && count === 0,
|
||||
'role': ({ event, count }) => event.kind === 30361 && count === 0,
|
||||
};
|
||||
|
|
|
@ -1,37 +0,0 @@
|
|||
import { NostrEvent } from '@nostrify/nostrify';
|
||||
import { Kysely, sql } from 'kysely';
|
||||
|
||||
import { DittoTables } from '@/db/DittoTables.ts';
|
||||
|
||||
/**
|
||||
* Make a direct query to the database to get trending kind 1 notes within the specified timeframe.
|
||||
*
|
||||
* This query makes use of cached stats (in the `event_stats` table).
|
||||
* The query is SLOW so it needs to be run on a schedule and cached.
|
||||
*/
|
||||
export async function getTrendingNotes(
|
||||
/** Kysely instance to execute queries on. */
|
||||
kysely: Kysely<DittoTables>,
|
||||
/** Unix timestamp in _seconds_ for the starting point of this query. */
|
||||
since: number,
|
||||
/** Maximum number of trending notes to return. */
|
||||
limit: number,
|
||||
): Promise<NostrEvent[]> {
|
||||
const rows = await kysely
|
||||
.selectFrom('nostr_events')
|
||||
.selectAll('nostr_events')
|
||||
.innerJoin('event_stats', 'event_stats.event_id', 'nostr_events.id')
|
||||
.where('nostr_events.kind', '=', 1)
|
||||
.where('nostr_events.created_at', '>', since)
|
||||
.orderBy(
|
||||
sql`(event_stats.reposts_count * 2) + (event_stats.replies_count) + (event_stats.reactions_count)`,
|
||||
'desc',
|
||||
)
|
||||
.limit(limit)
|
||||
.execute();
|
||||
|
||||
return rows.map((row) => ({
|
||||
...row,
|
||||
tags: JSON.parse(row.tags),
|
||||
}));
|
||||
}
|
|
@ -0,0 +1,47 @@
|
|||
import { NostrFilter } from '@nostrify/nostrify';
|
||||
import { Kysely } from 'kysely';
|
||||
|
||||
import { DittoTables } from '@/db/DittoTables.ts';
|
||||
|
||||
/** Get trending tag values for a given tag in the given time frame. */
|
||||
export async function getTrendingTagValues(
|
||||
/** Kysely instance to execute queries on. */
|
||||
kysely: Kysely<DittoTables>,
|
||||
/** Tag name to filter by, eg `t` or `r`. */
|
||||
tagNames: string[],
|
||||
/** Filter of eligible events. */
|
||||
filter: NostrFilter,
|
||||
): Promise<{ value: string; authors: number; uses: number }[]> {
|
||||
let query = kysely
|
||||
.selectFrom('nostr_tags')
|
||||
.innerJoin('nostr_events', 'nostr_events.id', 'nostr_tags.event_id')
|
||||
.select(({ fn }) => [
|
||||
'nostr_tags.value',
|
||||
fn.agg<number>('count', ['nostr_events.pubkey']).distinct().as('authors'),
|
||||
fn.countAll<number>().as('uses'),
|
||||
])
|
||||
.where('nostr_tags.name', 'in', tagNames)
|
||||
.groupBy('nostr_tags.value')
|
||||
.orderBy((c) => c.fn.agg('count', ['nostr_events.pubkey']).distinct(), 'desc');
|
||||
|
||||
if (filter.kinds) {
|
||||
query = query.where('nostr_events.kind', 'in', filter.kinds);
|
||||
}
|
||||
if (typeof filter.since === 'number') {
|
||||
query = query.where('nostr_events.created_at', '>=', filter.since);
|
||||
}
|
||||
if (typeof filter.until === 'number') {
|
||||
query = query.where('nostr_events.created_at', '<=', filter.until);
|
||||
}
|
||||
if (typeof filter.limit === 'number') {
|
||||
query = query.limit(filter.limit);
|
||||
}
|
||||
|
||||
const rows = await query.execute();
|
||||
|
||||
return rows.map((row) => ({
|
||||
value: row.value,
|
||||
authors: Number(row.authors),
|
||||
uses: Number(row.uses),
|
||||
}));
|
||||
}
|
|
@ -1,30 +0,0 @@
|
|||
import { assertEquals } from '@std/assert';
|
||||
|
||||
import { TrendsWorker } from './trends.ts';
|
||||
|
||||
await TrendsWorker.open(':memory:');
|
||||
|
||||
const p8 = (pubkey8: string) => `${pubkey8}00000000000000000000000000000000000000000000000000000000`;
|
||||
|
||||
Deno.test('getTrendingTags', async () => {
|
||||
await TrendsWorker.addTagUsages(p8('00000000'), ['ditto', 'hello', 'yolo']);
|
||||
await TrendsWorker.addTagUsages(p8('00000000'), ['hello']);
|
||||
await TrendsWorker.addTagUsages(p8('00000001'), ['Ditto', 'hello']);
|
||||
await TrendsWorker.addTagUsages(p8('00000010'), ['DITTO']);
|
||||
|
||||
const result = await TrendsWorker.getTrendingTags({
|
||||
since: new Date('1999-01-01T00:00:00'),
|
||||
until: new Date('2999-01-01T00:00:00'),
|
||||
threshold: 1,
|
||||
});
|
||||
|
||||
const expected = [
|
||||
{ tag: 'ditto', accounts: 3, uses: 3 },
|
||||
{ tag: 'hello', accounts: 2, uses: 3 },
|
||||
{ tag: 'yolo', accounts: 1, uses: 1 },
|
||||
];
|
||||
|
||||
assertEquals(result, expected);
|
||||
|
||||
await TrendsWorker.cleanupTagUsages(new Date('2999-01-01T00:00:00'));
|
||||
});
|
|
@ -1,19 +0,0 @@
|
|||
import * as Comlink from 'comlink';
|
||||
|
||||
import type { TrendsWorker as _TrendsWorker } from '@/workers/trends.worker.ts';
|
||||
|
||||
const worker = new Worker(new URL('./trends.worker.ts', import.meta.url), { type: 'module' });
|
||||
|
||||
const TrendsWorker = Comlink.wrap<typeof _TrendsWorker>(worker);
|
||||
|
||||
await new Promise<void>((resolve) => {
|
||||
const handleEvent = ({ data }: MessageEvent) => {
|
||||
if (data === 'ready') {
|
||||
worker.removeEventListener('message', handleEvent);
|
||||
resolve();
|
||||
}
|
||||
};
|
||||
worker.addEventListener('message', handleEvent);
|
||||
});
|
||||
|
||||
export { TrendsWorker };
|
|
@ -1,152 +0,0 @@
|
|||
/// <reference lib="webworker" />
|
||||
import { Database as Sqlite } from '@db/sqlite';
|
||||
import { NSchema } from '@nostrify/nostrify';
|
||||
import { DenoSqlite3Dialect } from '@soapbox/kysely-deno-sqlite';
|
||||
import { Kysely, sql } from 'kysely';
|
||||
import * as Comlink from 'comlink';
|
||||
|
||||
import { hashtagSchema } from '@/schema.ts';
|
||||
import { generateDateRange, Time } from '@/utils/time.ts';
|
||||
|
||||
interface GetTrendingTagsOpts {
|
||||
since: Date;
|
||||
until: Date;
|
||||
limit?: number;
|
||||
threshold?: number;
|
||||
}
|
||||
|
||||
interface GetTagHistoryOpts {
|
||||
tag: string;
|
||||
since: Date;
|
||||
until: Date;
|
||||
limit?: number;
|
||||
offset?: number;
|
||||
}
|
||||
|
||||
interface TagsDB {
|
||||
tag_usages: {
|
||||
tag: string;
|
||||
pubkey8: string;
|
||||
inserted_at: Date;
|
||||
};
|
||||
}
|
||||
|
||||
let kysely: Kysely<TagsDB>;
|
||||
|
||||
export const TrendsWorker = {
|
||||
async open(path: string) {
|
||||
kysely = new Kysely({
|
||||
dialect: new DenoSqlite3Dialect({
|
||||
database: new Sqlite(path),
|
||||
}),
|
||||
});
|
||||
|
||||
await sql`PRAGMA synchronous = normal`.execute(kysely);
|
||||
await sql`PRAGMA temp_store = memory`.execute(kysely);
|
||||
await sql`PRAGMA foreign_keys = ON`.execute(kysely);
|
||||
await sql`PRAGMA auto_vacuum = FULL`.execute(kysely);
|
||||
await sql`PRAGMA journal_mode = WAL`.execute(kysely);
|
||||
await sql`PRAGMA mmap_size = 50000000`.execute(kysely);
|
||||
|
||||
await kysely.schema
|
||||
.createTable('tag_usages')
|
||||
.ifNotExists()
|
||||
.addColumn('tag', 'text', (c) => c.notNull().modifyEnd(sql`collate nocase`))
|
||||
.addColumn('pubkey8', 'text', (c) => c.notNull())
|
||||
.addColumn('inserted_at', 'integer', (c) => c.notNull())
|
||||
.execute();
|
||||
|
||||
await kysely.schema
|
||||
.createIndex('idx_time_tag')
|
||||
.ifNotExists()
|
||||
.on('tag_usages')
|
||||
.column('inserted_at')
|
||||
.column('tag')
|
||||
.execute();
|
||||
|
||||
Deno.cron('cleanup tag usages older than a week', { hour: { every: 1 } }, async () => {
|
||||
const lastWeek = new Date(new Date().getTime() - Time.days(7));
|
||||
await this.cleanupTagUsages(lastWeek);
|
||||
});
|
||||
},
|
||||
|
||||
/** Gets the most used hashtags between the date range. */
|
||||
getTrendingTags({ since, until, limit = 10, threshold = 3 }: GetTrendingTagsOpts): Promise<{
|
||||
tag: string;
|
||||
accounts: number;
|
||||
uses: number;
|
||||
}[]> {
|
||||
return kysely.selectFrom('tag_usages')
|
||||
.select(({ fn }) => [
|
||||
'tag',
|
||||
fn.agg<number>('count', ['pubkey8']).distinct().as('accounts'),
|
||||
fn.countAll<number>().as('uses'),
|
||||
])
|
||||
.where('inserted_at', '>=', since)
|
||||
.where('inserted_at', '<', until)
|
||||
.groupBy('tag')
|
||||
.having((c) => c(c.fn.agg('count', ['pubkey8']).distinct(), '>=', threshold))
|
||||
.orderBy((c) => c.fn.agg('count', ['pubkey8']).distinct(), 'desc')
|
||||
.limit(limit)
|
||||
.execute();
|
||||
},
|
||||
|
||||
/**
|
||||
* Gets the tag usage count for a specific tag.
|
||||
* It returns an array with counts for each date between the range.
|
||||
*/
|
||||
async getTagHistory({ tag, since, until, limit = 7, offset = 0 }: GetTagHistoryOpts) {
|
||||
const result = await kysely
|
||||
.selectFrom('tag_usages')
|
||||
.select(({ fn }) => [
|
||||
sql<number>`date(inserted_at)`.as('day'),
|
||||
fn.agg<number>('count', ['pubkey8']).distinct().as('accounts'),
|
||||
fn.countAll<number>().as('uses'),
|
||||
])
|
||||
.where('tag', '=', tag)
|
||||
.where('inserted_at', '>=', since)
|
||||
.where('inserted_at', '<', until)
|
||||
.groupBy(sql`date(inserted_at)`)
|
||||
.orderBy(sql`date(inserted_at)`, 'desc')
|
||||
.limit(limit)
|
||||
.offset(offset)
|
||||
.execute();
|
||||
|
||||
/** Full date range between `since` and `until`. */
|
||||
const dateRange = generateDateRange(
|
||||
new Date(since.getTime() + Time.days(1)),
|
||||
new Date(until.getTime() - Time.days(offset)),
|
||||
).reverse();
|
||||
|
||||
// Fill in missing dates with 0 usages.
|
||||
return dateRange.map((day) => {
|
||||
const data = result.find((item) => new Date(item.day).getTime() === day.getTime());
|
||||
if (data) {
|
||||
return { ...data, day: new Date(data.day) };
|
||||
} else {
|
||||
return { day, accounts: 0, uses: 0 };
|
||||
}
|
||||
});
|
||||
},
|
||||
|
||||
async addTagUsages(pubkey: string, hashtags: string[], inserted_at = new Date()): Promise<void> {
|
||||
const pubkey8 = NSchema.id().parse(pubkey).substring(0, 8);
|
||||
const tags = hashtagSchema.array().min(1).parse(hashtags);
|
||||
|
||||
await kysely
|
||||
.insertInto('tag_usages')
|
||||
.values(tags.map((tag) => ({ tag, pubkey8, inserted_at })))
|
||||
.execute();
|
||||
},
|
||||
|
||||
async cleanupTagUsages(until: Date): Promise<void> {
|
||||
await kysely
|
||||
.deleteFrom('tag_usages')
|
||||
.where('inserted_at', '<', until)
|
||||
.execute();
|
||||
},
|
||||
};
|
||||
|
||||
Comlink.expose(TrendsWorker);
|
||||
|
||||
self.postMessage('ready');
|
Loading…
Reference in New Issue