Refactor client and firehose

This commit is contained in:
Alex Gleason 2024-05-14 16:25:24 -05:00
parent 3c706dc81b
commit 08c9ee0670
No known key found for this signature in database
GPG Key ID: 7211D1F99744FBB7
6 changed files with 69 additions and 63 deletions

View File

@ -3,7 +3,8 @@ import Debug from '@soapbox/stickynotes/debug';
import { type Context, Env as HonoEnv, type Handler, Hono, Input as HonoInput, type MiddlewareHandler } from 'hono';
import { cors, logger, serveStatic } from 'hono/middleware';
import '@/firehose.ts';
import { Conf } from '@/config.ts';
import { startFirehose } from '@/firehose.ts';
import { Time } from '@/utils.ts';
import { actorController } from '@/controllers/activitypub/actor.ts';
@ -108,6 +109,10 @@ const app = new Hono<AppEnv>();
const debug = Debug('ditto:http');
if (Conf.firehoseEnabled) {
startFirehose();
}
app.use('/api/*', logger(debug));
app.use('/relay/*', logger(debug));
app.use('/.well-known/*', logger(debug));

View File

@ -215,6 +215,10 @@ class Conf {
return Number(Deno.env.get('PG_POOL_SIZE') ?? 10);
},
};
/** Whether to enable requesting events from known relays. */
static get firehoseEnabled(): boolean {
return optionalBooleanSchema.parse(Deno.env.get('FIREHOSE_ENABLED')) ?? true;
}
}
const optionalBooleanSchema = z

View File

@ -1,29 +1,28 @@
import { NostrEvent } from '@nostrify/nostrify';
import Debug from '@soapbox/stickynotes/debug';
import { Stickynotes } from '@soapbox/stickynotes';
import { activeRelays, pool } from '@/pool.ts';
import { Storages } from '@/storages.ts';
import { nostrNow } from '@/utils.ts';
import * as pipeline from './pipeline.ts';
const debug = Debug('ditto:firehose');
const console = new Stickynotes('ditto:firehose');
// This file watches events on all known relays and performs
// side-effects based on them, such as trending hashtag tracking
// and storing events for notifications and the home feed.
pool.subscribe(
[{ kinds: [0, 1, 3, 5, 6, 7, 9735, 10002], limit: 0, since: nostrNow() }],
activeRelays,
handleEvent,
undefined,
undefined,
);
/**
* This function watches events on all known relays and performs
* side-effects based on them, such as trending hashtag tracking
* and storing events for notifications and the home feed.
*/
export async function startFirehose() {
const store = await Storages.client();
/** Handle events through the firehose pipeline. */
function handleEvent(event: NostrEvent): Promise<void> {
debug(`NostrEvent<${event.kind}> ${event.id}`);
for await (const msg of store.req([{ kinds: [0, 1, 3, 5, 6, 7, 9735, 10002], limit: 0, since: nostrNow() }])) {
if (msg[0] === 'EVENT') {
const event = msg[2];
console.debug(`NostrEvent<${event.kind}> ${event.id}`);
return pipeline
pipeline
.handleEvent(event, AbortSignal.timeout(5000))
.catch(() => {});
}
}
}

View File

@ -1,34 +0,0 @@
import { RelayPoolWorker } from 'nostr-relaypool';
import { Storages } from '@/storages.ts';
import { Conf } from '@/config.ts';
const [relayList] = await Storages.db.query([
{ kinds: [10002], authors: [Conf.pubkey], limit: 1 },
]);
const tags = relayList?.tags ?? [];
const activeRelays = tags.reduce((acc, [name, url, marker]) => {
if (name === 'r' && !marker) {
acc.push(url);
}
return acc;
}, []);
console.log(`pool: connecting to ${activeRelays.length} relays.`);
const worker = new Worker('https://unpkg.com/nostr-relaypool2@0.6.34/lib/nostr-relaypool.worker.js', {
type: 'module',
});
// @ts-ignore Wrong types.
const pool = new RelayPoolWorker(worker, activeRelays, {
autoReconnect: true,
// The pipeline verifies events.
skipVerification: true,
// The logging feature overwhelms the CPU and creates too many logs.
logErrorsAndNotices: false,
});
export { activeRelays, pool };

View File

@ -47,6 +47,7 @@ async function updateStats(event: NostrEvent) {
/** Calculate stats changes ahead of time so we can build an efficient query. */
async function getStatsDiff(event: NostrEvent, prev: NostrEvent | undefined): Promise<StatDiff[]> {
const store = await Storages.db();
const statDiffs: StatDiff[] = [];
const firstTaggedId = event.tags.find(([name]) => name === 'e')?.[1];
@ -65,7 +66,7 @@ async function getStatsDiff(event: NostrEvent, prev: NostrEvent | undefined): Pr
case 5: {
if (!firstTaggedId) break;
const [repostedEvent] = await Storages.db.query(
const [repostedEvent] = await store.query(
[{ kinds: [6], ids: [firstTaggedId], authors: [event.pubkey] }],
{ limit: 1 },
);
@ -77,7 +78,7 @@ async function getStatsDiff(event: NostrEvent, prev: NostrEvent | undefined): Pr
const eventBeingRepostedPubkey = repostedEvent.tags.find(([name]) => name === 'p')?.[1];
if (!eventBeingRepostedId || !eventBeingRepostedPubkey) break;
const [eventBeingReposted] = await Storages.db.query(
const [eventBeingReposted] = await store.query(
[{ kinds: [1], ids: [eventBeingRepostedId], authors: [eventBeingRepostedPubkey] }],
{ limit: 1 },
);
@ -155,7 +156,9 @@ function eventStatsQuery(diffs: EventStatDiff[]) {
/** Get the last version of the event, if any. */
async function getPrevEvent(event: NostrEvent): Promise<NostrEvent | undefined> {
if (NKinds.replaceable(event.kind) || NKinds.parameterizedReplaceable(event.kind)) {
const [prev] = await Storages.db.query([
const store = await Storages.db();
const [prev] = await store.query([
{ kinds: [event.kind], authors: [event.pubkey], limit: 1 },
]);

View File

@ -2,7 +2,6 @@
import { NCache } from '@nostrify/nostrify';
import { Conf } from '@/config.ts';
import { db } from '@/db.ts';
import { activeRelays, pool } from '@/pool.ts';
import { EventsDB } from '@/storages/events-db.ts';
import { Optimizer } from '@/storages/optimizer.ts';
import { PoolStore } from '@/storages/pool-store.ts';
@ -49,12 +48,42 @@ export class Storages {
/** Relay pool storage. */
public static async client(): Promise<PoolStore> {
if (!this._client) {
this._client = Promise.resolve(
new PoolStore({
this._client = (async () => {
const db = await this.db();
const [relayList] = await db.query([
{ kinds: [10002], authors: [Conf.pubkey], limit: 1 },
]);
const tags = relayList?.tags ?? [];
const activeRelays = tags.reduce((acc, [name, url, marker]) => {
if (name === 'r' && !marker) {
acc.push(url);
}
return acc;
}, []);
console.log(`pool: connecting to ${activeRelays.length} relays.`);
const worker = new Worker('https://unpkg.com/nostr-relaypool2@0.6.34/lib/nostr-relaypool.worker.js', {
type: 'module',
});
// @ts-ignore Wrong types.
const pool = new RelayPoolWorker(worker, activeRelays, {
autoReconnect: true,
// The pipeline verifies events.
skipVerification: true,
// The logging feature overwhelms the CPU and creates too many logs.
logErrorsAndNotices: false,
});
return new PoolStore({
pool,
relays: activeRelays,
}),
);
});
})();
}
return this._client;
}