From 87c67c0a397d18c9089e733cf9577776744fffe0 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Thu, 17 Aug 2023 16:47:22 -0500 Subject: [PATCH] Add pipeline module --- src/firehose.ts | 54 +++++---------------------------------------- src/pipeline.ts | 58 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 64 insertions(+), 48 deletions(-) create mode 100644 src/pipeline.ts diff --git a/src/firehose.ts b/src/firehose.ts index c717c07..8510a70 100644 --- a/src/firehose.ts +++ b/src/firehose.ts @@ -1,9 +1,8 @@ -import { insertEvent, isLocallyFollowed } from '@/db/events.ts'; -import { addRelays, getActiveRelays } from '@/db/relays.ts'; -import { findUser } from '@/db/users.ts'; +import { getActiveRelays } from '@/db/relays.ts'; import { type Event, RelayPool } from '@/deps.ts'; -import { trends } from '@/trends.ts'; -import { isRelay, nostrDate, nostrNow } from '@/utils.ts'; +import { nostrNow } from '@/utils.ts'; + +import * as pipeline from './pipeline.ts'; const relays = await getActiveRelays(); const pool = new RelayPool(relays); @@ -20,48 +19,7 @@ pool.subscribe( ); /** Handle events through the firehose pipeline. */ -async function handleEvent(event: Event): Promise { +function handleEvent(event: Event): Promise { console.info(`firehose: Event<${event.kind}> ${event.id}`); - - trackHashtags(event); - trackRelays(event); - - if (await findUser({ pubkey: event.pubkey }) || await isLocallyFollowed(event.pubkey)) { - insertEvent(event).catch(console.warn); - } -} - -/** Track whenever a hashtag is used, for processing trending tags. */ -function trackHashtags(event: Event): void { - const date = nostrDate(event.created_at); - - const tags = event.tags - .filter((tag) => tag[0] === 't') - .map((tag) => tag[1]) - .slice(0, 5); - - if (!tags.length) return; - - try { - console.info('tracking tags:', tags); - trends.addTagUsages(event.pubkey, tags, date); - } catch (_e) { - // do nothing - } -} - -/** Tracks known relays in the database. */ -function trackRelays(event: Event) { - const relays = new Set<`wss://${string}`>(); - - event.tags.forEach((tag) => { - if (['p', 'e', 'a'].includes(tag[0]) && isRelay(tag[2])) { - relays.add(tag[2]); - } - if (event.kind === 10002 && tag[0] === 'r' && isRelay(tag[1])) { - relays.add(tag[1]); - } - }); - - return addRelays([...relays]); + return pipeline.handleEvent(event); } diff --git a/src/pipeline.ts b/src/pipeline.ts new file mode 100644 index 0000000..b22128b --- /dev/null +++ b/src/pipeline.ts @@ -0,0 +1,58 @@ +import { insertEvent, isLocallyFollowed } from '@/db/events.ts'; +import { addRelays } from '@/db/relays.ts'; +import { findUser } from '@/db/users.ts'; +import { type Event } from '@/deps.ts'; +import { trends } from '@/trends.ts'; +import { isRelay, nostrDate } from '@/utils.ts'; + +/** + * Common pipeline function to process (and maybe store) events. + * It is idempotent, so it can be called multiple times for the same event. + */ +async function handleEvent(event: Event): Promise { + console.info(`firehose: Event<${event.kind}> ${event.id}`); + + trackHashtags(event); + trackRelays(event); + + if (await findUser({ pubkey: event.pubkey }) || await isLocallyFollowed(event.pubkey)) { + insertEvent(event).catch(console.warn); + } +} + +/** Track whenever a hashtag is used, for processing trending tags. */ +function trackHashtags(event: Event): void { + const date = nostrDate(event.created_at); + + const tags = event.tags + .filter((tag) => tag[0] === 't') + .map((tag) => tag[1]) + .slice(0, 5); + + if (!tags.length) return; + + try { + console.info('tracking tags:', tags); + trends.addTagUsages(event.pubkey, tags, date); + } catch (_e) { + // do nothing + } +} + +/** Tracks known relays in the database. */ +function trackRelays(event: Event) { + const relays = new Set<`wss://${string}`>(); + + event.tags.forEach((tag) => { + if (['p', 'e', 'a'].includes(tag[0]) && isRelay(tag[2])) { + relays.add(tag[2]); + } + if (event.kind === 10002 && tag[0] === 'r' && isRelay(tag[1])) { + relays.add(tag[1]); + } + }); + + return addRelays([...relays]); +} + +export { handleEvent };