From 45abaf14a426f8fab1ecc916e8e04c7247a79680 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Thu, 17 Aug 2023 18:07:25 -0500 Subject: [PATCH] pipeline: refactor, use pipeline from relay --- src/controllers/nostr/relay.ts | 8 ++++---- src/pipeline.ts | 19 +++++++++++++------ 2 files changed, 17 insertions(+), 10 deletions(-) diff --git a/src/controllers/nostr/relay.ts b/src/controllers/nostr/relay.ts index 24316e7..1537047 100644 --- a/src/controllers/nostr/relay.ts +++ b/src/controllers/nostr/relay.ts @@ -1,5 +1,5 @@ import * as eventsDB from '@/db/events.ts'; -import { findUser } from '@/db/users.ts'; +import * as pipeline from '@/pipeline.ts'; import { jsonSchema } from '@/schema.ts'; import { type ClientCLOSE, @@ -53,10 +53,10 @@ function connectStream(socket: WebSocket) { } async function handleEvent([_, event]: ClientEVENT) { - if (await findUser({ pubkey: event.pubkey })) { - eventsDB.insertEvent(event); + try { + await pipeline.handleEvent(event); send(['OK', event.id, true, '']); - } else { + } catch (_e) { send(['OK', event.id, false, 'blocked: only registered users can post']); } } diff --git a/src/pipeline.ts b/src/pipeline.ts index b22128b..95f0a82 100644 --- a/src/pipeline.ts +++ b/src/pipeline.ts @@ -1,4 +1,4 @@ -import { insertEvent, isLocallyFollowed } from '@/db/events.ts'; +import * as eventsDB from '@/db/events.ts'; import { addRelays } from '@/db/relays.ts'; import { findUser } from '@/db/users.ts'; import { type Event } from '@/deps.ts'; @@ -12,16 +12,23 @@ import { isRelay, nostrDate } from '@/utils.ts'; async function handleEvent(event: Event): Promise { console.info(`firehose: Event<${event.kind}> ${event.id}`); - trackHashtags(event); - trackRelays(event); + await Promise.all([ + trackHashtags(event), + storeEvent(event), + trackRelays(event), + ]); +} - if (await findUser({ pubkey: event.pubkey }) || await isLocallyFollowed(event.pubkey)) { - insertEvent(event).catch(console.warn); +/** Maybe store the event, if eligible. */ +async function storeEvent(event: Event): Promise { + if (await findUser({ pubkey: event.pubkey }) || await eventsDB.isLocallyFollowed(event.pubkey)) { + await eventsDB.insertEvent(event).catch(console.warn); } } /** Track whenever a hashtag is used, for processing trending tags. */ -function trackHashtags(event: Event): void { +// deno-lint-ignore require-await +async function trackHashtags(event: Event): Promise { const date = nostrDate(event.created_at); const tags = event.tags