From 08c9ee0670f3d583af56e24271153006ef01b523 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Tue, 14 May 2024 16:25:24 -0500 Subject: [PATCH] Refactor client and firehose --- src/app.ts | 7 ++++++- src/config.ts | 4 ++++ src/firehose.ts | 39 +++++++++++++++++++-------------------- src/pool.ts | 34 ---------------------------------- src/stats.ts | 9 ++++++--- src/storages.ts | 39 ++++++++++++++++++++++++++++++++++----- 6 files changed, 69 insertions(+), 63 deletions(-) delete mode 100644 src/pool.ts diff --git a/src/app.ts b/src/app.ts index 84ad780..059e883 100644 --- a/src/app.ts +++ b/src/app.ts @@ -3,7 +3,8 @@ import Debug from '@soapbox/stickynotes/debug'; import { type Context, Env as HonoEnv, type Handler, Hono, Input as HonoInput, type MiddlewareHandler } from 'hono'; import { cors, logger, serveStatic } from 'hono/middleware'; -import '@/firehose.ts'; +import { Conf } from '@/config.ts'; +import { startFirehose } from '@/firehose.ts'; import { Time } from '@/utils.ts'; import { actorController } from '@/controllers/activitypub/actor.ts'; @@ -108,6 +109,10 @@ const app = new Hono(); const debug = Debug('ditto:http'); +if (Conf.firehoseEnabled) { + startFirehose(); +} + app.use('/api/*', logger(debug)); app.use('/relay/*', logger(debug)); app.use('/.well-known/*', logger(debug)); diff --git a/src/config.ts b/src/config.ts index 4266033..3d3e51b 100644 --- a/src/config.ts +++ b/src/config.ts @@ -215,6 +215,10 @@ class Conf { return Number(Deno.env.get('PG_POOL_SIZE') ?? 10); }, }; + /** Whether to enable requesting events from known relays. */ + static get firehoseEnabled(): boolean { + return optionalBooleanSchema.parse(Deno.env.get('FIREHOSE_ENABLED')) ?? true; + } } const optionalBooleanSchema = z diff --git a/src/firehose.ts b/src/firehose.ts index d7aaab3..2c776fe 100644 --- a/src/firehose.ts +++ b/src/firehose.ts @@ -1,29 +1,28 @@ -import { NostrEvent } from '@nostrify/nostrify'; -import Debug from '@soapbox/stickynotes/debug'; +import { Stickynotes } from '@soapbox/stickynotes'; -import { activeRelays, pool } from '@/pool.ts'; +import { Storages } from '@/storages.ts'; import { nostrNow } from '@/utils.ts'; import * as pipeline from './pipeline.ts'; -const debug = Debug('ditto:firehose'); +const console = new Stickynotes('ditto:firehose'); -// This file watches events on all known relays and performs -// side-effects based on them, such as trending hashtag tracking -// and storing events for notifications and the home feed. -pool.subscribe( - [{ kinds: [0, 1, 3, 5, 6, 7, 9735, 10002], limit: 0, since: nostrNow() }], - activeRelays, - handleEvent, - undefined, - undefined, -); +/** + * This function watches events on all known relays and performs + * side-effects based on them, such as trending hashtag tracking + * and storing events for notifications and the home feed. + */ +export async function startFirehose() { + const store = await Storages.client(); -/** Handle events through the firehose pipeline. */ -function handleEvent(event: NostrEvent): Promise { - debug(`NostrEvent<${event.kind}> ${event.id}`); + for await (const msg of store.req([{ kinds: [0, 1, 3, 5, 6, 7, 9735, 10002], limit: 0, since: nostrNow() }])) { + if (msg[0] === 'EVENT') { + const event = msg[2]; + console.debug(`NostrEvent<${event.kind}> ${event.id}`); - return pipeline - .handleEvent(event, AbortSignal.timeout(5000)) - .catch(() => {}); + pipeline + .handleEvent(event, AbortSignal.timeout(5000)) + .catch(() => {}); + } + } } diff --git a/src/pool.ts b/src/pool.ts deleted file mode 100644 index 3ac1a1d..0000000 --- a/src/pool.ts +++ /dev/null @@ -1,34 +0,0 @@ -import { RelayPoolWorker } from 'nostr-relaypool'; - -import { Storages } from '@/storages.ts'; -import { Conf } from '@/config.ts'; - -const [relayList] = await Storages.db.query([ - { kinds: [10002], authors: [Conf.pubkey], limit: 1 }, -]); - -const tags = relayList?.tags ?? []; - -const activeRelays = tags.reduce((acc, [name, url, marker]) => { - if (name === 'r' && !marker) { - acc.push(url); - } - return acc; -}, []); - -console.log(`pool: connecting to ${activeRelays.length} relays.`); - -const worker = new Worker('https://unpkg.com/nostr-relaypool2@0.6.34/lib/nostr-relaypool.worker.js', { - type: 'module', -}); - -// @ts-ignore Wrong types. -const pool = new RelayPoolWorker(worker, activeRelays, { - autoReconnect: true, - // The pipeline verifies events. - skipVerification: true, - // The logging feature overwhelms the CPU and creates too many logs. - logErrorsAndNotices: false, -}); - -export { activeRelays, pool }; diff --git a/src/stats.ts b/src/stats.ts index 21a4d97..bff2aeb 100644 --- a/src/stats.ts +++ b/src/stats.ts @@ -47,6 +47,7 @@ async function updateStats(event: NostrEvent) { /** Calculate stats changes ahead of time so we can build an efficient query. */ async function getStatsDiff(event: NostrEvent, prev: NostrEvent | undefined): Promise { + const store = await Storages.db(); const statDiffs: StatDiff[] = []; const firstTaggedId = event.tags.find(([name]) => name === 'e')?.[1]; @@ -65,7 +66,7 @@ async function getStatsDiff(event: NostrEvent, prev: NostrEvent | undefined): Pr case 5: { if (!firstTaggedId) break; - const [repostedEvent] = await Storages.db.query( + const [repostedEvent] = await store.query( [{ kinds: [6], ids: [firstTaggedId], authors: [event.pubkey] }], { limit: 1 }, ); @@ -77,7 +78,7 @@ async function getStatsDiff(event: NostrEvent, prev: NostrEvent | undefined): Pr const eventBeingRepostedPubkey = repostedEvent.tags.find(([name]) => name === 'p')?.[1]; if (!eventBeingRepostedId || !eventBeingRepostedPubkey) break; - const [eventBeingReposted] = await Storages.db.query( + const [eventBeingReposted] = await store.query( [{ kinds: [1], ids: [eventBeingRepostedId], authors: [eventBeingRepostedPubkey] }], { limit: 1 }, ); @@ -155,7 +156,9 @@ function eventStatsQuery(diffs: EventStatDiff[]) { /** Get the last version of the event, if any. */ async function getPrevEvent(event: NostrEvent): Promise { if (NKinds.replaceable(event.kind) || NKinds.parameterizedReplaceable(event.kind)) { - const [prev] = await Storages.db.query([ + const store = await Storages.db(); + + const [prev] = await store.query([ { kinds: [event.kind], authors: [event.pubkey], limit: 1 }, ]); diff --git a/src/storages.ts b/src/storages.ts index 056228b..cbda925 100644 --- a/src/storages.ts +++ b/src/storages.ts @@ -2,7 +2,6 @@ import { NCache } from '@nostrify/nostrify'; import { Conf } from '@/config.ts'; import { db } from '@/db.ts'; -import { activeRelays, pool } from '@/pool.ts'; import { EventsDB } from '@/storages/events-db.ts'; import { Optimizer } from '@/storages/optimizer.ts'; import { PoolStore } from '@/storages/pool-store.ts'; @@ -49,12 +48,42 @@ export class Storages { /** Relay pool storage. */ public static async client(): Promise { if (!this._client) { - this._client = Promise.resolve( - new PoolStore({ + this._client = (async () => { + const db = await this.db(); + + const [relayList] = await db.query([ + { kinds: [10002], authors: [Conf.pubkey], limit: 1 }, + ]); + + const tags = relayList?.tags ?? []; + + const activeRelays = tags.reduce((acc, [name, url, marker]) => { + if (name === 'r' && !marker) { + acc.push(url); + } + return acc; + }, []); + + console.log(`pool: connecting to ${activeRelays.length} relays.`); + + const worker = new Worker('https://unpkg.com/nostr-relaypool2@0.6.34/lib/nostr-relaypool.worker.js', { + type: 'module', + }); + + // @ts-ignore Wrong types. + const pool = new RelayPoolWorker(worker, activeRelays, { + autoReconnect: true, + // The pipeline verifies events. + skipVerification: true, + // The logging feature overwhelms the CPU and creates too many logs. + logErrorsAndNotices: false, + }); + + return new PoolStore({ pool, relays: activeRelays, - }), - ); + }); + })(); } return this._client; }