Merge branch 'refactor-trends' into 'main'
Refactor trending tags See merge request soapbox-pub/ditto!282
This commit is contained in:
commit
1c9dbd070e
|
@ -14,7 +14,7 @@
|
||||||
"stats:recompute": "deno run -A scripts/stats-recompute.ts",
|
"stats:recompute": "deno run -A scripts/stats-recompute.ts",
|
||||||
"soapbox": "curl -O https://dl.soapbox.pub/main/soapbox.zip && mkdir -p public && mv soapbox.zip public/ && cd public/ && unzip soapbox.zip && rm soapbox.zip"
|
"soapbox": "curl -O https://dl.soapbox.pub/main/soapbox.zip && mkdir -p public && mv soapbox.zip public/ && cd public/ && unzip soapbox.zip && rm soapbox.zip"
|
||||||
},
|
},
|
||||||
"unstable": ["ffi", "kv", "worker-options"],
|
"unstable": ["cron", "ffi", "kv", "worker-options"],
|
||||||
"exclude": ["./public"],
|
"exclude": ["./public"],
|
||||||
"imports": {
|
"imports": {
|
||||||
"@/": "./src/",
|
"@/": "./src/",
|
||||||
|
@ -37,7 +37,6 @@
|
||||||
"@std/streams": "jsr:@std/streams@^0.223.0",
|
"@std/streams": "jsr:@std/streams@^0.223.0",
|
||||||
"comlink": "npm:comlink@^4.4.1",
|
"comlink": "npm:comlink@^4.4.1",
|
||||||
"deno-safe-fetch/load": "https://gitlab.com/soapbox-pub/deno-safe-fetch/-/raw/v1.0.0/load.ts",
|
"deno-safe-fetch/load": "https://gitlab.com/soapbox-pub/deno-safe-fetch/-/raw/v1.0.0/load.ts",
|
||||||
"deno-sqlite": "https://raw.githubusercontent.com/alexgleason/deno-sqlite/325f66d8c395e7f6f5ee78ebfa42a0eeea4a942b/mod.ts",
|
|
||||||
"entities": "npm:entities@^4.5.0",
|
"entities": "npm:entities@^4.5.0",
|
||||||
"fast-stable-stringify": "npm:fast-stable-stringify@^1.0.0",
|
"fast-stable-stringify": "npm:fast-stable-stringify@^1.0.0",
|
||||||
"formdata-helper": "npm:formdata-helper@^0.3.0",
|
"formdata-helper": "npm:formdata-helper@^0.3.0",
|
||||||
|
|
|
@ -194,12 +194,8 @@ app.get('/api/v2/search', searchController);
|
||||||
|
|
||||||
app.get('/api/pleroma/frontend_configurations', frontendConfigController);
|
app.get('/api/pleroma/frontend_configurations', frontendConfigController);
|
||||||
|
|
||||||
app.get(
|
app.get('/api/v1/trends/tags', trendingTagsController);
|
||||||
'/api/v1/trends/tags',
|
app.get('/api/v1/trends', trendingTagsController);
|
||||||
cacheMiddleware({ cacheName: 'web', expires: Time.minutes(15) }),
|
|
||||||
trendingTagsController,
|
|
||||||
);
|
|
||||||
app.get('/api/v1/trends', cacheMiddleware({ cacheName: 'web', expires: Time.minutes(15) }), trendingTagsController);
|
|
||||||
|
|
||||||
app.get('/api/v1/suggestions', suggestionsV1Controller);
|
app.get('/api/v1/suggestions', suggestionsV1Controller);
|
||||||
app.get('/api/v2/suggestions', suggestionsV2Controller);
|
app.get('/api/v2/suggestions', suggestionsV2Controller);
|
||||||
|
|
|
@ -10,10 +10,20 @@ await TrendsWorker.open('data/trends.sqlite3');
|
||||||
|
|
||||||
const limitSchema = z.coerce.number().catch(10).transform((value) => Math.min(Math.max(value, 0), 20));
|
const limitSchema = z.coerce.number().catch(10).transform((value) => Math.min(Math.max(value, 0), 20));
|
||||||
|
|
||||||
|
let cache = getTrends();
|
||||||
|
|
||||||
|
Deno.cron('update trends cache', { minute: { every: 15 } }, async () => {
|
||||||
|
const trends = await getTrends();
|
||||||
|
cache = Promise.resolve(trends);
|
||||||
|
});
|
||||||
|
|
||||||
const trendingTagsController: AppController = async (c) => {
|
const trendingTagsController: AppController = async (c) => {
|
||||||
const limit = limitSchema.parse(c.req.query('limit'));
|
const limit = limitSchema.parse(c.req.query('limit'));
|
||||||
if (limit < 1) return c.json([]);
|
const trends = await cache;
|
||||||
|
return c.json(trends.slice(0, limit));
|
||||||
|
};
|
||||||
|
|
||||||
|
async function getTrends() {
|
||||||
const now = new Date();
|
const now = new Date();
|
||||||
const yesterday = new Date(now.getTime() - Time.days(1));
|
const yesterday = new Date(now.getTime() - Time.days(1));
|
||||||
const lastWeek = new Date(now.getTime() - Time.days(7));
|
const lastWeek = new Date(now.getTime() - Time.days(7));
|
||||||
|
@ -22,36 +32,34 @@ const trendingTagsController: AppController = async (c) => {
|
||||||
const tags = await TrendsWorker.getTrendingTags({
|
const tags = await TrendsWorker.getTrendingTags({
|
||||||
since: yesterday,
|
since: yesterday,
|
||||||
until: now,
|
until: now,
|
||||||
limit,
|
limit: 20,
|
||||||
});
|
});
|
||||||
|
|
||||||
return c.json(
|
return Promise.all(tags.map(async ({ tag, uses, accounts }) => ({
|
||||||
await Promise.all(tags.map(async ({ name, uses, accounts }) => ({
|
name: tag,
|
||||||
name,
|
url: Conf.local(`/tags/${tag}`),
|
||||||
url: Conf.local(`/tags/${name}`),
|
history: [
|
||||||
history: [
|
// Use the full 24h query for the current day. Then use `offset: 1` to adjust for this below.
|
||||||
// Use the full 24h query for the current day. Then use `offset: 1` to adjust for this below.
|
// This result is more accurate than what Mastodon returns.
|
||||||
// This result is more accurate than what Mastodon returns.
|
{
|
||||||
{
|
day: String(Math.floor(stripTime(now).getTime() / 1000)),
|
||||||
day: String(Math.floor(stripTime(now).getTime() / 1000)),
|
accounts: String(accounts),
|
||||||
accounts: String(accounts),
|
uses: String(uses),
|
||||||
uses: String(uses),
|
},
|
||||||
},
|
...(await TrendsWorker.getTagHistory({
|
||||||
...(await TrendsWorker.getTagHistory({
|
tag,
|
||||||
tag: name,
|
since: lastWeek,
|
||||||
since: lastWeek,
|
until: now,
|
||||||
until: now,
|
limit: 6,
|
||||||
limit: 6,
|
offset: 1,
|
||||||
offset: 1,
|
})).map((history) => ({
|
||||||
})).map((history) => ({
|
// For some reason, Mastodon wants these to be strings... oh well.
|
||||||
// For some reason, Mastodon wants these to be strings... oh well.
|
day: String(Math.floor(history.day.getTime() / 1000)),
|
||||||
day: String(Math.floor(history.day.getTime() / 1000)),
|
accounts: String(history.accounts),
|
||||||
accounts: String(history.accounts),
|
uses: String(history.uses),
|
||||||
uses: String(history.uses),
|
})),
|
||||||
})),
|
],
|
||||||
],
|
})));
|
||||||
}))),
|
}
|
||||||
);
|
|
||||||
};
|
|
||||||
|
|
||||||
export { trendingTagsController };
|
export { trendingTagsController };
|
||||||
|
|
|
@ -19,9 +19,9 @@ Deno.test('getTrendingTags', async () => {
|
||||||
});
|
});
|
||||||
|
|
||||||
const expected = [
|
const expected = [
|
||||||
{ name: 'ditto', accounts: 3, uses: 3 },
|
{ tag: 'ditto', accounts: 3, uses: 3 },
|
||||||
{ name: 'hello', accounts: 2, uses: 3 },
|
{ tag: 'hello', accounts: 2, uses: 3 },
|
||||||
{ name: 'yolo', accounts: 1, uses: 1 },
|
{ tag: 'yolo', accounts: 1, uses: 1 },
|
||||||
];
|
];
|
||||||
|
|
||||||
assertEquals(result, expected);
|
assertEquals(result, expected);
|
||||||
|
|
|
@ -1,6 +1,9 @@
|
||||||
|
/// <reference lib="webworker" />
|
||||||
|
import { Database as Sqlite } from '@db/sqlite';
|
||||||
import { NSchema } from '@nostrify/nostrify';
|
import { NSchema } from '@nostrify/nostrify';
|
||||||
|
import { DenoSqlite3Dialect } from '@soapbox/kysely-deno-sqlite';
|
||||||
|
import { Kysely, sql } from 'kysely';
|
||||||
import * as Comlink from 'comlink';
|
import * as Comlink from 'comlink';
|
||||||
import { DB as Sqlite } from 'deno-sqlite';
|
|
||||||
|
|
||||||
import { hashtagSchema } from '@/schema.ts';
|
import { hashtagSchema } from '@/schema.ts';
|
||||||
import { generateDateRange, Time } from '@/utils/time.ts';
|
import { generateDateRange, Time } from '@/utils/time.ts';
|
||||||
|
@ -20,73 +23,94 @@ interface GetTagHistoryOpts {
|
||||||
offset?: number;
|
offset?: number;
|
||||||
}
|
}
|
||||||
|
|
||||||
let db: Sqlite;
|
interface TagsDB {
|
||||||
|
tag_usages: {
|
||||||
|
tag: string;
|
||||||
|
pubkey8: string;
|
||||||
|
inserted_at: Date;
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
let kysely: Kysely<TagsDB>;
|
||||||
|
|
||||||
export const TrendsWorker = {
|
export const TrendsWorker = {
|
||||||
open(path: string) {
|
async open(path: string) {
|
||||||
db = new Sqlite(path);
|
kysely = new Kysely({
|
||||||
|
dialect: new DenoSqlite3Dialect({
|
||||||
|
database: new Sqlite(path),
|
||||||
|
}),
|
||||||
|
});
|
||||||
|
|
||||||
db.execute(`
|
await sql`PRAGMA synchronous = normal`.execute(kysely);
|
||||||
CREATE TABLE IF NOT EXISTS tag_usages (
|
await sql`PRAGMA temp_store = memory`.execute(kysely);
|
||||||
tag TEXT NOT NULL COLLATE NOCASE,
|
await sql`PRAGMA foreign_keys = ON`.execute(kysely);
|
||||||
pubkey8 TEXT NOT NULL,
|
await sql`PRAGMA auto_vacuum = FULL`.execute(kysely);
|
||||||
inserted_at INTEGER NOT NULL
|
await sql`PRAGMA journal_mode = WAL`.execute(kysely);
|
||||||
);
|
await sql`PRAGMA mmap_size = 50000000`.execute(kysely);
|
||||||
|
|
||||||
CREATE INDEX IF NOT EXISTS idx_time_tag ON tag_usages(inserted_at, tag);
|
await kysely.schema
|
||||||
`);
|
.createTable('tag_usages')
|
||||||
|
.ifNotExists()
|
||||||
|
.addColumn('tag', 'text', (c) => c.notNull().modifyEnd(sql`collate nocase`))
|
||||||
|
.addColumn('pubkey8', 'text', (c) => c.notNull())
|
||||||
|
.addColumn('inserted_at', 'integer', (c) => c.notNull())
|
||||||
|
.execute();
|
||||||
|
|
||||||
const cleanup = () => {
|
await kysely.schema
|
||||||
console.info('Cleaning up old tag usages...');
|
.createIndex('idx_time_tag')
|
||||||
|
.ifNotExists()
|
||||||
|
.on('tag_usages')
|
||||||
|
.column('inserted_at')
|
||||||
|
.column('tag')
|
||||||
|
.execute();
|
||||||
|
|
||||||
|
Deno.cron('cleanup tag usages older than a week', { hour: { every: 1 } }, async () => {
|
||||||
const lastWeek = new Date(new Date().getTime() - Time.days(7));
|
const lastWeek = new Date(new Date().getTime() - Time.days(7));
|
||||||
this.cleanupTagUsages(lastWeek);
|
await this.cleanupTagUsages(lastWeek);
|
||||||
};
|
});
|
||||||
|
|
||||||
setInterval(cleanup, Time.hours(1));
|
|
||||||
cleanup();
|
|
||||||
},
|
},
|
||||||
|
|
||||||
/** Gets the most used hashtags between the date range. */
|
/** Gets the most used hashtags between the date range. */
|
||||||
getTrendingTags({ since, until, limit = 10, threshold = 3 }: GetTrendingTagsOpts) {
|
getTrendingTags({ since, until, limit = 10, threshold = 3 }: GetTrendingTagsOpts): Promise<{
|
||||||
return db.query<string[]>(
|
tag: string;
|
||||||
`
|
accounts: number;
|
||||||
SELECT tag, COUNT(DISTINCT pubkey8), COUNT(*)
|
uses: number;
|
||||||
FROM tag_usages
|
}[]> {
|
||||||
WHERE inserted_at >= ? AND inserted_at < ?
|
return kysely.selectFrom('tag_usages')
|
||||||
GROUP BY tag
|
.select(({ fn }) => [
|
||||||
HAVING COUNT(DISTINCT pubkey8) >= ?
|
'tag',
|
||||||
ORDER BY COUNT(DISTINCT pubkey8)
|
fn.agg<number>('count', ['pubkey8']).distinct().as('accounts'),
|
||||||
DESC LIMIT ?;
|
fn.countAll<number>().as('uses'),
|
||||||
`,
|
])
|
||||||
[since, until, threshold, limit],
|
.where('inserted_at', '>=', since)
|
||||||
).map((row) => ({
|
.where('inserted_at', '<', until)
|
||||||
name: row[0],
|
.groupBy('tag')
|
||||||
accounts: Number(row[1]),
|
.having((c) => c(c.fn.agg('count', ['pubkey8']).distinct(), '>=', threshold))
|
||||||
uses: Number(row[2]),
|
.orderBy((c) => c.fn.agg('count', ['pubkey8']).distinct(), 'desc')
|
||||||
}));
|
.limit(limit)
|
||||||
|
.execute();
|
||||||
},
|
},
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Gets the tag usage count for a specific tag.
|
* Gets the tag usage count for a specific tag.
|
||||||
* It returns an array with counts for each date between the range.
|
* It returns an array with counts for each date between the range.
|
||||||
*/
|
*/
|
||||||
getTagHistory({ tag, since, until, limit = 7, offset = 0 }: GetTagHistoryOpts) {
|
async getTagHistory({ tag, since, until, limit = 7, offset = 0 }: GetTagHistoryOpts) {
|
||||||
const result = db.query<string[]>(
|
const result = await kysely
|
||||||
`
|
.selectFrom('tag_usages')
|
||||||
SELECT date(inserted_at), COUNT(DISTINCT pubkey8), COUNT(*)
|
.select(({ fn }) => [
|
||||||
FROM tag_usages
|
sql<number>`date(inserted_at)`.as('day'),
|
||||||
WHERE tag = ? AND inserted_at >= ? AND inserted_at < ?
|
fn.agg<number>('count', ['pubkey8']).distinct().as('accounts'),
|
||||||
GROUP BY date(inserted_at)
|
fn.countAll<number>().as('uses'),
|
||||||
ORDER BY date(inserted_at) DESC
|
])
|
||||||
LIMIT ?
|
.where('tag', '=', tag)
|
||||||
OFFSET ?;
|
.where('inserted_at', '>=', since)
|
||||||
`,
|
.where('inserted_at', '<', until)
|
||||||
[tag, since, until, limit, offset],
|
.groupBy(sql`date(inserted_at)`)
|
||||||
).map((row) => ({
|
.orderBy(sql`date(inserted_at)`, 'desc')
|
||||||
day: new Date(row[0]),
|
.limit(limit)
|
||||||
accounts: Number(row[1]),
|
.offset(offset)
|
||||||
uses: Number(row[2]),
|
.execute();
|
||||||
}));
|
|
||||||
|
|
||||||
/** Full date range between `since` and `until`. */
|
/** Full date range between `since` and `until`. */
|
||||||
const dateRange = generateDateRange(
|
const dateRange = generateDateRange(
|
||||||
|
@ -96,26 +120,30 @@ export const TrendsWorker = {
|
||||||
|
|
||||||
// Fill in missing dates with 0 usages.
|
// Fill in missing dates with 0 usages.
|
||||||
return dateRange.map((day) => {
|
return dateRange.map((day) => {
|
||||||
const data = result.find((item) => item.day.getTime() === day.getTime());
|
const data = result.find((item) => new Date(item.day).getTime() === day.getTime());
|
||||||
return data || { day, accounts: 0, uses: 0 };
|
if (data) {
|
||||||
|
return { ...data, day: new Date(data.day) };
|
||||||
|
} else {
|
||||||
|
return { day, accounts: 0, uses: 0 };
|
||||||
|
}
|
||||||
});
|
});
|
||||||
},
|
},
|
||||||
|
|
||||||
addTagUsages(pubkey: string, hashtags: string[], date = new Date()): void {
|
async addTagUsages(pubkey: string, hashtags: string[], inserted_at = new Date()): Promise<void> {
|
||||||
const pubkey8 = NSchema.id().parse(pubkey).substring(0, 8);
|
const pubkey8 = NSchema.id().parse(pubkey).substring(0, 8);
|
||||||
const tags = hashtagSchema.array().min(1).parse(hashtags);
|
const tags = hashtagSchema.array().min(1).parse(hashtags);
|
||||||
|
|
||||||
db.query(
|
await kysely
|
||||||
'INSERT INTO tag_usages (tag, pubkey8, inserted_at) VALUES ' + tags.map(() => '(?, ?, ?)').join(', '),
|
.insertInto('tag_usages')
|
||||||
tags.map((tag) => [tag, pubkey8, date]).flat(),
|
.values(tags.map((tag) => ({ tag, pubkey8, inserted_at })))
|
||||||
);
|
.execute();
|
||||||
},
|
},
|
||||||
|
|
||||||
cleanupTagUsages(until: Date): void {
|
async cleanupTagUsages(until: Date): Promise<void> {
|
||||||
db.query(
|
await kysely
|
||||||
'DELETE FROM tag_usages WHERE inserted_at < ?',
|
.deleteFrom('tag_usages')
|
||||||
[until],
|
.where('inserted_at', '<', until)
|
||||||
);
|
.execute();
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue