Trends: label trending posts by publishing an event
This commit is contained in:
parent
832e1c7e60
commit
0f59b3c96b
|
@ -6,7 +6,7 @@
|
||||||
"jsr:@db/sqlite@^0.11.1": "jsr:@db/sqlite@0.11.1",
|
"jsr:@db/sqlite@^0.11.1": "jsr:@db/sqlite@0.11.1",
|
||||||
"jsr:@denosaurs/plug@1": "jsr:@denosaurs/plug@1.0.6",
|
"jsr:@denosaurs/plug@1": "jsr:@denosaurs/plug@1.0.6",
|
||||||
"jsr:@gleasonator/policy": "jsr:@gleasonator/policy@0.2.0",
|
"jsr:@gleasonator/policy": "jsr:@gleasonator/policy@0.2.0",
|
||||||
"jsr:@nostrify/nostrify@^0.22.1": "jsr:@nostrify/nostrify@0.22.4",
|
"jsr:@nostrify/nostrify@^0.22.1": "jsr:@nostrify/nostrify@0.22.5",
|
||||||
"jsr:@nostrify/nostrify@^0.22.4": "jsr:@nostrify/nostrify@0.22.4",
|
"jsr:@nostrify/nostrify@^0.22.4": "jsr:@nostrify/nostrify@0.22.4",
|
||||||
"jsr:@nostrify/nostrify@^0.22.5": "jsr:@nostrify/nostrify@0.22.5",
|
"jsr:@nostrify/nostrify@^0.22.5": "jsr:@nostrify/nostrify@0.22.5",
|
||||||
"jsr:@soapbox/kysely-deno-sqlite@^2.1.0": "jsr:@soapbox/kysely-deno-sqlite@2.2.0",
|
"jsr:@soapbox/kysely-deno-sqlite@^2.1.0": "jsr:@soapbox/kysely-deno-sqlite@2.2.0",
|
||||||
|
@ -107,6 +107,7 @@
|
||||||
"integrity": "5b9c17325cc02e37c71e14ac0103b40446b0402fe183e5f5362af23e9ea162bf",
|
"integrity": "5b9c17325cc02e37c71e14ac0103b40446b0402fe183e5f5362af23e9ea162bf",
|
||||||
"dependencies": [
|
"dependencies": [
|
||||||
"jsr:@std/encoding@^0.224.1",
|
"jsr:@std/encoding@^0.224.1",
|
||||||
|
"npm:@scure/base@^1.1.6",
|
||||||
"npm:@scure/bip32@^1.4.0",
|
"npm:@scure/bip32@^1.4.0",
|
||||||
"npm:@scure/bip39@^1.3.0",
|
"npm:@scure/bip39@^1.3.0",
|
||||||
"npm:kysely@^0.27.3",
|
"npm:kysely@^0.27.3",
|
||||||
|
|
|
@ -4,6 +4,7 @@ import { type Context, Env as HonoEnv, type Handler, Hono, Input as HonoInput, t
|
||||||
import { cors, logger, serveStatic } from 'hono/middleware';
|
import { cors, logger, serveStatic } from 'hono/middleware';
|
||||||
|
|
||||||
import { Conf } from '@/config.ts';
|
import { Conf } from '@/config.ts';
|
||||||
|
import { cron } from '@/cron.ts';
|
||||||
import { startFirehose } from '@/firehose.ts';
|
import { startFirehose } from '@/firehose.ts';
|
||||||
import { Time } from '@/utils.ts';
|
import { Time } from '@/utils.ts';
|
||||||
|
|
||||||
|
@ -114,6 +115,9 @@ const debug = Debug('ditto:http');
|
||||||
if (Conf.firehoseEnabled) {
|
if (Conf.firehoseEnabled) {
|
||||||
startFirehose();
|
startFirehose();
|
||||||
}
|
}
|
||||||
|
if (Conf.cronEnabled) {
|
||||||
|
cron();
|
||||||
|
}
|
||||||
|
|
||||||
app.use('/api/*', logger(debug));
|
app.use('/api/*', logger(debug));
|
||||||
app.use('/relay/*', logger(debug));
|
app.use('/relay/*', logger(debug));
|
||||||
|
|
|
@ -221,6 +221,10 @@ class Conf {
|
||||||
static get firehoseEnabled(): boolean {
|
static get firehoseEnabled(): boolean {
|
||||||
return optionalBooleanSchema.parse(Deno.env.get('FIREHOSE_ENABLED')) ?? true;
|
return optionalBooleanSchema.parse(Deno.env.get('FIREHOSE_ENABLED')) ?? true;
|
||||||
}
|
}
|
||||||
|
/** Whether to enable Ditto cron jobs. */
|
||||||
|
static get cronEnabled(): boolean {
|
||||||
|
return optionalBooleanSchema.parse(Deno.env.get('CRON_ENABLED')) ?? true;
|
||||||
|
}
|
||||||
/** Path to the custom policy module. Must be an absolute path, https:, npm:, or jsr: URI. */
|
/** Path to the custom policy module. Must be an absolute path, https:, npm:, or jsr: URI. */
|
||||||
static get policy(): string {
|
static get policy(): string {
|
||||||
return Deno.env.get('DITTO_POLICY') || new URL('../data/policy.ts', import.meta.url).pathname;
|
return Deno.env.get('DITTO_POLICY') || new URL('../data/policy.ts', import.meta.url).pathname;
|
||||||
|
|
|
@ -1,10 +1,7 @@
|
||||||
import { NostrEvent } from '@nostrify/nostrify';
|
|
||||||
import { sql } from 'kysely';
|
|
||||||
import { z } from 'zod';
|
import { z } from 'zod';
|
||||||
|
|
||||||
import { type AppController } from '@/app.ts';
|
import { type AppController } from '@/app.ts';
|
||||||
import { Conf } from '@/config.ts';
|
import { Conf } from '@/config.ts';
|
||||||
import { DittoDB } from '@/db/DittoDB.ts';
|
|
||||||
import { hydrateEvents } from '@/storages/hydrate.ts';
|
import { hydrateEvents } from '@/storages/hydrate.ts';
|
||||||
import { Storages } from '@/storages.ts';
|
import { Storages } from '@/storages.ts';
|
||||||
import { Time } from '@/utils.ts';
|
import { Time } from '@/utils.ts';
|
||||||
|
@ -70,13 +67,6 @@ async function getTrendingHashtags() {
|
||||||
})));
|
})));
|
||||||
}
|
}
|
||||||
|
|
||||||
let trendingNotesCache = getTrendingNotes();
|
|
||||||
|
|
||||||
Deno.cron('update trending notes cache', { minute: { every: 15 } }, async () => {
|
|
||||||
const events = await getTrendingNotes();
|
|
||||||
trendingNotesCache = Promise.resolve(events);
|
|
||||||
});
|
|
||||||
|
|
||||||
const trendingStatusesQuerySchema = z.object({
|
const trendingStatusesQuerySchema = z.object({
|
||||||
limit: z.coerce.number().catch(20).transform((value) => Math.min(Math.max(value, 0), 40)),
|
limit: z.coerce.number().catch(20).transform((value) => Math.min(Math.max(value, 0), 40)),
|
||||||
});
|
});
|
||||||
|
@ -85,8 +75,24 @@ const trendingStatusesController: AppController = async (c) => {
|
||||||
const store = await Storages.db();
|
const store = await Storages.db();
|
||||||
const { limit } = trendingStatusesQuerySchema.parse(c.req.query());
|
const { limit } = trendingStatusesQuerySchema.parse(c.req.query());
|
||||||
|
|
||||||
const events = await trendingNotesCache
|
const [label] = await store.query([{
|
||||||
.then((events) => events.slice(0, limit))
|
kinds: [1985],
|
||||||
|
'#L': ['pub.ditto.trends'],
|
||||||
|
'#l': ['notes'],
|
||||||
|
authors: [Conf.pubkey],
|
||||||
|
limit: 1,
|
||||||
|
}]);
|
||||||
|
|
||||||
|
const ids = (label?.tags ?? [])
|
||||||
|
.filter(([name]) => name === 'e')
|
||||||
|
.map(([, id]) => id)
|
||||||
|
.slice(0, limit);
|
||||||
|
|
||||||
|
if (!ids.length) {
|
||||||
|
return c.json([]);
|
||||||
|
}
|
||||||
|
|
||||||
|
const events = await store.query([{ ids }])
|
||||||
.then((events) => hydrateEvents({ events, store }));
|
.then((events) => hydrateEvents({ events, store }));
|
||||||
|
|
||||||
const statuses = await Promise.all(
|
const statuses = await Promise.all(
|
||||||
|
@ -96,27 +102,4 @@ const trendingStatusesController: AppController = async (c) => {
|
||||||
return c.json(statuses.filter(Boolean));
|
return c.json(statuses.filter(Boolean));
|
||||||
};
|
};
|
||||||
|
|
||||||
async function getTrendingNotes(): Promise<NostrEvent[]> {
|
|
||||||
const kysely = await DittoDB.getInstance();
|
|
||||||
const since = Math.floor((Date.now() - Time.days(1)) / 1000);
|
|
||||||
|
|
||||||
const rows = await kysely
|
|
||||||
.selectFrom('nostr_events')
|
|
||||||
.selectAll('nostr_events')
|
|
||||||
.innerJoin('event_stats', 'event_stats.event_id', 'nostr_events.id')
|
|
||||||
.where('nostr_events.kind', '=', 1)
|
|
||||||
.where('nostr_events.created_at', '>', since)
|
|
||||||
.orderBy(
|
|
||||||
sql`(event_stats.reposts_count * 2) + (event_stats.replies_count) + (event_stats.reactions_count)`,
|
|
||||||
'desc',
|
|
||||||
)
|
|
||||||
.limit(20)
|
|
||||||
.execute();
|
|
||||||
|
|
||||||
return rows.map((row) => ({
|
|
||||||
...row,
|
|
||||||
tags: JSON.parse(row.tags),
|
|
||||||
}));
|
|
||||||
}
|
|
||||||
|
|
||||||
export { trendingStatusesController, trendingTagsController };
|
export { trendingStatusesController, trendingTagsController };
|
||||||
|
|
|
@ -0,0 +1,38 @@
|
||||||
|
import { Stickynotes } from '@soapbox/stickynotes';
|
||||||
|
|
||||||
|
import { DittoDB } from '@/db/DittoDB.ts';
|
||||||
|
import { getTrendingNotes } from '@/trends/trending-notes.ts';
|
||||||
|
import { Time } from '@/utils/time.ts';
|
||||||
|
import { AdminSigner } from '@/signers/AdminSigner.ts';
|
||||||
|
import { handleEvent } from '@/pipeline.ts';
|
||||||
|
|
||||||
|
const console = new Stickynotes('ditto:trends');
|
||||||
|
|
||||||
|
async function updateTrendingNotesCache() {
|
||||||
|
console.info('Updating trending notes cache...');
|
||||||
|
const kysely = await DittoDB.getInstance();
|
||||||
|
const yesterday = Math.floor((Date.now() - Time.days(1)) / 1000);
|
||||||
|
const signal = AbortSignal.timeout(1000);
|
||||||
|
|
||||||
|
const events = await getTrendingNotes(kysely, yesterday, 20);
|
||||||
|
const signer = new AdminSigner();
|
||||||
|
|
||||||
|
const label = await signer.signEvent({
|
||||||
|
kind: 1985,
|
||||||
|
content: '',
|
||||||
|
tags: [
|
||||||
|
['L', 'pub.ditto.trends'],
|
||||||
|
['l', 'notes', 'pub.ditto.trends'],
|
||||||
|
...events.map(({ id }) => ['e', id]),
|
||||||
|
],
|
||||||
|
created_at: Math.floor(Date.now() / 1000),
|
||||||
|
});
|
||||||
|
|
||||||
|
await handleEvent(label, signal);
|
||||||
|
console.info('Trending notes cache updated.');
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Start cron jobs for the application. */
|
||||||
|
export function cron() {
|
||||||
|
Deno.cron('update trending notes cache', { minute: { every: 15 } }, updateTrendingNotesCache);
|
||||||
|
}
|
|
@ -0,0 +1,37 @@
|
||||||
|
import { NostrEvent } from '@nostrify/nostrify';
|
||||||
|
import { Kysely, sql } from 'kysely';
|
||||||
|
|
||||||
|
import { DittoTables } from '@/db/DittoTables.ts';
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Make a direct query to the database to get trending kind 1 notes within the specified timeframe.
|
||||||
|
*
|
||||||
|
* This query makes use of cached stats (in the `event_stats` table).
|
||||||
|
* The query is SLOW so it needs to be run on a schedule and cached.
|
||||||
|
*/
|
||||||
|
export async function getTrendingNotes(
|
||||||
|
/** Kysely instance to execute queries on. */
|
||||||
|
kysely: Kysely<DittoTables>,
|
||||||
|
/** Unix timestamp in _seconds_ for the starting point of this query. */
|
||||||
|
since: number,
|
||||||
|
/** Maximum number of trending notes to return. */
|
||||||
|
limit: number,
|
||||||
|
): Promise<NostrEvent[]> {
|
||||||
|
const rows = await kysely
|
||||||
|
.selectFrom('nostr_events')
|
||||||
|
.selectAll('nostr_events')
|
||||||
|
.innerJoin('event_stats', 'event_stats.event_id', 'nostr_events.id')
|
||||||
|
.where('nostr_events.kind', '=', 1)
|
||||||
|
.where('nostr_events.created_at', '>', since)
|
||||||
|
.orderBy(
|
||||||
|
sql`(event_stats.reposts_count * 2) + (event_stats.replies_count) + (event_stats.reactions_count)`,
|
||||||
|
'desc',
|
||||||
|
)
|
||||||
|
.limit(limit)
|
||||||
|
.execute();
|
||||||
|
|
||||||
|
return rows.map((row) => ({
|
||||||
|
...row,
|
||||||
|
tags: JSON.parse(row.tags),
|
||||||
|
}));
|
||||||
|
}
|
Loading…
Reference in New Issue