diff --git a/deno.lock b/deno.lock index 1471f47..56277db 100644 --- a/deno.lock +++ b/deno.lock @@ -6,7 +6,7 @@ "jsr:@db/sqlite@^0.11.1": "jsr:@db/sqlite@0.11.1", "jsr:@denosaurs/plug@1": "jsr:@denosaurs/plug@1.0.6", "jsr:@gleasonator/policy": "jsr:@gleasonator/policy@0.2.0", - "jsr:@nostrify/nostrify@^0.22.1": "jsr:@nostrify/nostrify@0.22.4", + "jsr:@nostrify/nostrify@^0.22.1": "jsr:@nostrify/nostrify@0.22.5", "jsr:@nostrify/nostrify@^0.22.4": "jsr:@nostrify/nostrify@0.22.4", "jsr:@nostrify/nostrify@^0.22.5": "jsr:@nostrify/nostrify@0.22.5", "jsr:@soapbox/kysely-deno-sqlite@^2.1.0": "jsr:@soapbox/kysely-deno-sqlite@2.2.0", @@ -107,6 +107,7 @@ "integrity": "5b9c17325cc02e37c71e14ac0103b40446b0402fe183e5f5362af23e9ea162bf", "dependencies": [ "jsr:@std/encoding@^0.224.1", + "npm:@scure/base@^1.1.6", "npm:@scure/bip32@^1.4.0", "npm:@scure/bip39@^1.3.0", "npm:kysely@^0.27.3", diff --git a/src/app.ts b/src/app.ts index 9174cb4..565e915 100644 --- a/src/app.ts +++ b/src/app.ts @@ -4,6 +4,7 @@ import { type Context, Env as HonoEnv, type Handler, Hono, Input as HonoInput, t import { cors, logger, serveStatic } from 'hono/middleware'; import { Conf } from '@/config.ts'; +import { cron } from '@/cron.ts'; import { startFirehose } from '@/firehose.ts'; import { Time } from '@/utils.ts'; @@ -114,6 +115,9 @@ const debug = Debug('ditto:http'); if (Conf.firehoseEnabled) { startFirehose(); } +if (Conf.cronEnabled) { + cron(); +} app.use('/api/*', logger(debug)); app.use('/relay/*', logger(debug)); diff --git a/src/config.ts b/src/config.ts index d8e322d..e1c0f10 100644 --- a/src/config.ts +++ b/src/config.ts @@ -221,6 +221,10 @@ class Conf { static get firehoseEnabled(): boolean { return optionalBooleanSchema.parse(Deno.env.get('FIREHOSE_ENABLED')) ?? true; } + /** Whether to enable Ditto cron jobs. */ + static get cronEnabled(): boolean { + return optionalBooleanSchema.parse(Deno.env.get('CRON_ENABLED')) ?? true; + } /** Path to the custom policy module. Must be an absolute path, https:, npm:, or jsr: URI. */ static get policy(): string { return Deno.env.get('DITTO_POLICY') || new URL('../data/policy.ts', import.meta.url).pathname; diff --git a/src/controllers/api/trends.ts b/src/controllers/api/trends.ts index 93ce1b5..4fd92d1 100644 --- a/src/controllers/api/trends.ts +++ b/src/controllers/api/trends.ts @@ -1,10 +1,7 @@ -import { NostrEvent } from '@nostrify/nostrify'; -import { sql } from 'kysely'; import { z } from 'zod'; import { type AppController } from '@/app.ts'; import { Conf } from '@/config.ts'; -import { DittoDB } from '@/db/DittoDB.ts'; import { hydrateEvents } from '@/storages/hydrate.ts'; import { Storages } from '@/storages.ts'; import { Time } from '@/utils.ts'; @@ -70,13 +67,6 @@ async function getTrendingHashtags() { }))); } -let trendingNotesCache = getTrendingNotes(); - -Deno.cron('update trending notes cache', { minute: { every: 15 } }, async () => { - const events = await getTrendingNotes(); - trendingNotesCache = Promise.resolve(events); -}); - const trendingStatusesQuerySchema = z.object({ limit: z.coerce.number().catch(20).transform((value) => Math.min(Math.max(value, 0), 40)), }); @@ -85,8 +75,24 @@ const trendingStatusesController: AppController = async (c) => { const store = await Storages.db(); const { limit } = trendingStatusesQuerySchema.parse(c.req.query()); - const events = await trendingNotesCache - .then((events) => events.slice(0, limit)) + const [label] = await store.query([{ + kinds: [1985], + '#L': ['pub.ditto.trends'], + '#l': ['notes'], + authors: [Conf.pubkey], + limit: 1, + }]); + + const ids = (label?.tags ?? []) + .filter(([name]) => name === 'e') + .map(([, id]) => id) + .slice(0, limit); + + if (!ids.length) { + return c.json([]); + } + + const events = await store.query([{ ids }]) .then((events) => hydrateEvents({ events, store })); const statuses = await Promise.all( @@ -96,27 +102,4 @@ const trendingStatusesController: AppController = async (c) => { return c.json(statuses.filter(Boolean)); }; -async function getTrendingNotes(): Promise { - const kysely = await DittoDB.getInstance(); - const since = Math.floor((Date.now() - Time.days(1)) / 1000); - - const rows = await kysely - .selectFrom('nostr_events') - .selectAll('nostr_events') - .innerJoin('event_stats', 'event_stats.event_id', 'nostr_events.id') - .where('nostr_events.kind', '=', 1) - .where('nostr_events.created_at', '>', since) - .orderBy( - sql`(event_stats.reposts_count * 2) + (event_stats.replies_count) + (event_stats.reactions_count)`, - 'desc', - ) - .limit(20) - .execute(); - - return rows.map((row) => ({ - ...row, - tags: JSON.parse(row.tags), - })); -} - export { trendingStatusesController, trendingTagsController }; diff --git a/src/cron.ts b/src/cron.ts new file mode 100644 index 0000000..707f738 --- /dev/null +++ b/src/cron.ts @@ -0,0 +1,38 @@ +import { Stickynotes } from '@soapbox/stickynotes'; + +import { DittoDB } from '@/db/DittoDB.ts'; +import { getTrendingNotes } from '@/trends/trending-notes.ts'; +import { Time } from '@/utils/time.ts'; +import { AdminSigner } from '@/signers/AdminSigner.ts'; +import { handleEvent } from '@/pipeline.ts'; + +const console = new Stickynotes('ditto:trends'); + +async function updateTrendingNotesCache() { + console.info('Updating trending notes cache...'); + const kysely = await DittoDB.getInstance(); + const yesterday = Math.floor((Date.now() - Time.days(1)) / 1000); + const signal = AbortSignal.timeout(1000); + + const events = await getTrendingNotes(kysely, yesterday, 20); + const signer = new AdminSigner(); + + const label = await signer.signEvent({ + kind: 1985, + content: '', + tags: [ + ['L', 'pub.ditto.trends'], + ['l', 'notes', 'pub.ditto.trends'], + ...events.map(({ id }) => ['e', id]), + ], + created_at: Math.floor(Date.now() / 1000), + }); + + await handleEvent(label, signal); + console.info('Trending notes cache updated.'); +} + +/** Start cron jobs for the application. */ +export function cron() { + Deno.cron('update trending notes cache', { minute: { every: 15 } }, updateTrendingNotesCache); +} diff --git a/src/trends/trending-notes.ts b/src/trends/trending-notes.ts new file mode 100644 index 0000000..a469aa9 --- /dev/null +++ b/src/trends/trending-notes.ts @@ -0,0 +1,37 @@ +import { NostrEvent } from '@nostrify/nostrify'; +import { Kysely, sql } from 'kysely'; + +import { DittoTables } from '@/db/DittoTables.ts'; + +/** + * Make a direct query to the database to get trending kind 1 notes within the specified timeframe. + * + * This query makes use of cached stats (in the `event_stats` table). + * The query is SLOW so it needs to be run on a schedule and cached. + */ +export async function getTrendingNotes( + /** Kysely instance to execute queries on. */ + kysely: Kysely, + /** Unix timestamp in _seconds_ for the starting point of this query. */ + since: number, + /** Maximum number of trending notes to return. */ + limit: number, +): Promise { + const rows = await kysely + .selectFrom('nostr_events') + .selectAll('nostr_events') + .innerJoin('event_stats', 'event_stats.event_id', 'nostr_events.id') + .where('nostr_events.kind', '=', 1) + .where('nostr_events.created_at', '>', since) + .orderBy( + sql`(event_stats.reposts_count * 2) + (event_stats.replies_count) + (event_stats.reactions_count)`, + 'desc', + ) + .limit(limit) + .execute(); + + return rows.map((row) => ({ + ...row, + tags: JSON.parse(row.tags), + })); +}