Merge branch 'trends-labels' into 'main'

Trends: label trending posts by publishing an event

See merge request soapbox-pub/ditto!346
This commit is contained in:
Alex Gleason 2024-06-02 17:20:27 +00:00
commit 2b9154b05a
6 changed files with 108 additions and 35 deletions

View File

@ -6,7 +6,7 @@
"jsr:@db/sqlite@^0.11.1": "jsr:@db/sqlite@0.11.1", "jsr:@db/sqlite@^0.11.1": "jsr:@db/sqlite@0.11.1",
"jsr:@denosaurs/plug@1": "jsr:@denosaurs/plug@1.0.6", "jsr:@denosaurs/plug@1": "jsr:@denosaurs/plug@1.0.6",
"jsr:@gleasonator/policy": "jsr:@gleasonator/policy@0.2.0", "jsr:@gleasonator/policy": "jsr:@gleasonator/policy@0.2.0",
"jsr:@nostrify/nostrify@^0.22.1": "jsr:@nostrify/nostrify@0.22.4", "jsr:@nostrify/nostrify@^0.22.1": "jsr:@nostrify/nostrify@0.22.5",
"jsr:@nostrify/nostrify@^0.22.4": "jsr:@nostrify/nostrify@0.22.4", "jsr:@nostrify/nostrify@^0.22.4": "jsr:@nostrify/nostrify@0.22.4",
"jsr:@nostrify/nostrify@^0.22.5": "jsr:@nostrify/nostrify@0.22.5", "jsr:@nostrify/nostrify@^0.22.5": "jsr:@nostrify/nostrify@0.22.5",
"jsr:@soapbox/kysely-deno-sqlite@^2.1.0": "jsr:@soapbox/kysely-deno-sqlite@2.2.0", "jsr:@soapbox/kysely-deno-sqlite@^2.1.0": "jsr:@soapbox/kysely-deno-sqlite@2.2.0",
@ -107,6 +107,7 @@
"integrity": "5b9c17325cc02e37c71e14ac0103b40446b0402fe183e5f5362af23e9ea162bf", "integrity": "5b9c17325cc02e37c71e14ac0103b40446b0402fe183e5f5362af23e9ea162bf",
"dependencies": [ "dependencies": [
"jsr:@std/encoding@^0.224.1", "jsr:@std/encoding@^0.224.1",
"npm:@scure/base@^1.1.6",
"npm:@scure/bip32@^1.4.0", "npm:@scure/bip32@^1.4.0",
"npm:@scure/bip39@^1.3.0", "npm:@scure/bip39@^1.3.0",
"npm:kysely@^0.27.3", "npm:kysely@^0.27.3",

View File

@ -4,6 +4,7 @@ import { type Context, Env as HonoEnv, type Handler, Hono, Input as HonoInput, t
import { cors, logger, serveStatic } from 'hono/middleware'; import { cors, logger, serveStatic } from 'hono/middleware';
import { Conf } from '@/config.ts'; import { Conf } from '@/config.ts';
import { cron } from '@/cron.ts';
import { startFirehose } from '@/firehose.ts'; import { startFirehose } from '@/firehose.ts';
import { Time } from '@/utils.ts'; import { Time } from '@/utils.ts';
@ -114,6 +115,9 @@ const debug = Debug('ditto:http');
if (Conf.firehoseEnabled) { if (Conf.firehoseEnabled) {
startFirehose(); startFirehose();
} }
if (Conf.cronEnabled) {
cron();
}
app.use('/api/*', logger(debug)); app.use('/api/*', logger(debug));
app.use('/relay/*', logger(debug)); app.use('/relay/*', logger(debug));

View File

@ -221,6 +221,10 @@ class Conf {
static get firehoseEnabled(): boolean { static get firehoseEnabled(): boolean {
return optionalBooleanSchema.parse(Deno.env.get('FIREHOSE_ENABLED')) ?? true; return optionalBooleanSchema.parse(Deno.env.get('FIREHOSE_ENABLED')) ?? true;
} }
/** Whether to enable Ditto cron jobs. */
static get cronEnabled(): boolean {
return optionalBooleanSchema.parse(Deno.env.get('CRON_ENABLED')) ?? true;
}
/** Path to the custom policy module. Must be an absolute path, https:, npm:, or jsr: URI. */ /** Path to the custom policy module. Must be an absolute path, https:, npm:, or jsr: URI. */
static get policy(): string { static get policy(): string {
return Deno.env.get('DITTO_POLICY') || new URL('../data/policy.ts', import.meta.url).pathname; return Deno.env.get('DITTO_POLICY') || new URL('../data/policy.ts', import.meta.url).pathname;

View File

@ -1,10 +1,8 @@
import { NostrEvent } from '@nostrify/nostrify'; import { NostrEvent } from '@nostrify/nostrify';
import { sql } from 'kysely';
import { z } from 'zod'; import { z } from 'zod';
import { type AppController } from '@/app.ts'; import { type AppController } from '@/app.ts';
import { Conf } from '@/config.ts'; import { Conf } from '@/config.ts';
import { DittoDB } from '@/db/DittoDB.ts';
import { hydrateEvents } from '@/storages/hydrate.ts'; import { hydrateEvents } from '@/storages/hydrate.ts';
import { Storages } from '@/storages.ts'; import { Storages } from '@/storages.ts';
import { Time } from '@/utils.ts'; import { Time } from '@/utils.ts';
@ -70,13 +68,6 @@ async function getTrendingHashtags() {
}))); })));
} }
let trendingNotesCache = getTrendingNotes();
Deno.cron('update trending notes cache', { minute: { every: 15 } }, async () => {
const events = await getTrendingNotes();
trendingNotesCache = Promise.resolve(events);
});
const trendingStatusesQuerySchema = z.object({ const trendingStatusesQuerySchema = z.object({
limit: z.coerce.number().catch(20).transform((value) => Math.min(Math.max(value, 0), 40)), limit: z.coerce.number().catch(20).transform((value) => Math.min(Math.max(value, 0), 40)),
}); });
@ -85,10 +76,31 @@ const trendingStatusesController: AppController = async (c) => {
const store = await Storages.db(); const store = await Storages.db();
const { limit } = trendingStatusesQuerySchema.parse(c.req.query()); const { limit } = trendingStatusesQuerySchema.parse(c.req.query());
const events = await trendingNotesCache const [label] = await store.query([{
.then((events) => events.slice(0, limit)) kinds: [1985],
'#L': ['pub.ditto.trends'],
'#l': ['notes'],
authors: [Conf.pubkey],
limit: 1,
}]);
const ids = (label?.tags ?? [])
.filter(([name]) => name === 'e')
.map(([, id]) => id)
.slice(0, limit);
if (!ids.length) {
return c.json([]);
}
const results = await store.query([{ ids }])
.then((events) => hydrateEvents({ events, store })); .then((events) => hydrateEvents({ events, store }));
// Sort events in the order they appear in the label.
const events = ids
.map((id) => results.find((event) => event.id === id))
.filter((event): event is NostrEvent => !!event);
const statuses = await Promise.all( const statuses = await Promise.all(
events.map((event) => renderStatus(event, {})), events.map((event) => renderStatus(event, {})),
); );
@ -96,27 +108,4 @@ const trendingStatusesController: AppController = async (c) => {
return c.json(statuses.filter(Boolean)); return c.json(statuses.filter(Boolean));
}; };
async function getTrendingNotes(): Promise<NostrEvent[]> {
const kysely = await DittoDB.getInstance();
const since = Math.floor((Date.now() - Time.days(1)) / 1000);
const rows = await kysely
.selectFrom('nostr_events')
.selectAll('nostr_events')
.innerJoin('event_stats', 'event_stats.event_id', 'nostr_events.id')
.where('nostr_events.kind', '=', 1)
.where('nostr_events.created_at', '>', since)
.orderBy(
sql`(event_stats.reposts_count * 2) + (event_stats.replies_count) + (event_stats.reactions_count)`,
'desc',
)
.limit(20)
.execute();
return rows.map((row) => ({
...row,
tags: JSON.parse(row.tags),
}));
}
export { trendingStatusesController, trendingTagsController }; export { trendingStatusesController, trendingTagsController };

38
src/cron.ts Normal file
View File

@ -0,0 +1,38 @@
import { Stickynotes } from '@soapbox/stickynotes';
import { DittoDB } from '@/db/DittoDB.ts';
import { getTrendingNotes } from '@/trends/trending-notes.ts';
import { Time } from '@/utils/time.ts';
import { AdminSigner } from '@/signers/AdminSigner.ts';
import { handleEvent } from '@/pipeline.ts';
const console = new Stickynotes('ditto:trends');
async function updateTrendingNotesCache() {
console.info('Updating trending notes cache...');
const kysely = await DittoDB.getInstance();
const yesterday = Math.floor((Date.now() - Time.days(1)) / 1000);
const signal = AbortSignal.timeout(1000);
const events = await getTrendingNotes(kysely, yesterday, 20);
const signer = new AdminSigner();
const label = await signer.signEvent({
kind: 1985,
content: '',
tags: [
['L', 'pub.ditto.trends'],
['l', 'notes', 'pub.ditto.trends'],
...events.map(({ id }) => ['e', id]),
],
created_at: Math.floor(Date.now() / 1000),
});
await handleEvent(label, signal);
console.info('Trending notes cache updated.');
}
/** Start cron jobs for the application. */
export function cron() {
Deno.cron('update trending notes cache', { minute: { every: 15 } }, updateTrendingNotesCache);
}

View File

@ -0,0 +1,37 @@
import { NostrEvent } from '@nostrify/nostrify';
import { Kysely, sql } from 'kysely';
import { DittoTables } from '@/db/DittoTables.ts';
/**
* Make a direct query to the database to get trending kind 1 notes within the specified timeframe.
*
* This query makes use of cached stats (in the `event_stats` table).
* The query is SLOW so it needs to be run on a schedule and cached.
*/
export async function getTrendingNotes(
/** Kysely instance to execute queries on. */
kysely: Kysely<DittoTables>,
/** Unix timestamp in _seconds_ for the starting point of this query. */
since: number,
/** Maximum number of trending notes to return. */
limit: number,
): Promise<NostrEvent[]> {
const rows = await kysely
.selectFrom('nostr_events')
.selectAll('nostr_events')
.innerJoin('event_stats', 'event_stats.event_id', 'nostr_events.id')
.where('nostr_events.kind', '=', 1)
.where('nostr_events.created_at', '>', since)
.orderBy(
sql`(event_stats.reposts_count * 2) + (event_stats.replies_count) + (event_stats.reactions_count)`,
'desc',
)
.limit(limit)
.execute();
return rows.map((row) => ({
...row,
tags: JSON.parse(row.tags),
}));
}