From a676b71d23cfdcc36740d0c735909a7984e501c9 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Wed, 23 Aug 2023 23:25:38 -0500 Subject: [PATCH] relay: make Nostr streaming work --- src/controllers/nostr/relay.ts | 27 ++++++++---- src/pipeline.ts | 9 ++++ src/subs.ts | 75 ++++++++++++++++++++++++++++++++++ 3 files changed, 102 insertions(+), 9 deletions(-) create mode 100644 src/subs.ts diff --git a/src/controllers/nostr/relay.ts b/src/controllers/nostr/relay.ts index 8befa56..b304413 100644 --- a/src/controllers/nostr/relay.ts +++ b/src/controllers/nostr/relay.ts @@ -8,6 +8,7 @@ import { clientMsgSchema, type ClientREQ, } from '@/schemas/nostr.ts'; +import { Sub } from '@/subs.ts'; import type { AppController } from '@/app.ts'; import type { Event, Filter } from '@/deps.ts'; @@ -49,15 +50,24 @@ function connectStream(socket: WebSocket) { } /** Handle REQ. Start a subscription. */ - async function handleReq([_, sub, ...filters]: ClientREQ) { - for (const event of await eventsDB.getFilters(prepareFilters(filters))) { - send(['EVENT', sub, event]); + async function handleReq([_, subId, ...filters]: ClientREQ): Promise { + const prepared = prepareFilters(filters); + + for (const event of await eventsDB.getFilters(prepared)) { + send(['EVENT', subId, event]); } - send(['EOSE', sub]); + + send(['EOSE', subId]); + + Sub.sub({ + id: subId, + filters: prepared, + socket, + }); } /** Handle EVENT. Store the event. */ - async function handleEvent([_, event]: ClientEVENT) { + async function handleEvent([_, event]: ClientEVENT): Promise { try { // This will store it (if eligible) and run other side-effects. await pipeline.handleEvent(event); @@ -72,13 +82,12 @@ function connectStream(socket: WebSocket) { } /** Handle CLOSE. Close the subscription. */ - function handleClose([_, _sub]: ClientCLOSE) { - // TODO: ??? - return; + function handleClose([_, subId]: ClientCLOSE): void { + Sub.unsub({ id: subId, socket }); } /** Send a message back to the client. */ - function send(msg: RelayMsg) { + function send(msg: RelayMsg): void { return socket.send(JSON.stringify(msg)); } } diff --git a/src/pipeline.ts b/src/pipeline.ts index af72b79..7632464 100644 --- a/src/pipeline.ts +++ b/src/pipeline.ts @@ -3,6 +3,7 @@ import { addRelays } from '@/db/relays.ts'; import { findUser } from '@/db/users.ts'; import { type Event } from '@/deps.ts'; import { isLocallyFollowed } from '@/queries.ts'; +import { Sub } from '@/subs.ts'; import { trends } from '@/trends.ts'; import { isRelay, nostrDate } from '@/utils.ts'; @@ -15,6 +16,7 @@ async function handleEvent(event: Event): Promise { storeEvent(event), trackRelays(event), trackHashtags(event), + streamOut(event), ]); } @@ -63,6 +65,13 @@ function trackRelays(event: Event) { return addRelays([...relays]); } +/** Distribute the event through active subscriptions. */ +function streamOut(event: Event) { + for (const sub of Sub.matches(event)) { + sub.socket.send(JSON.stringify(['EVENT', event])); + } +} + /** NIP-20 command line result. */ class RelayError extends Error { constructor(prefix: 'duplicate' | 'pow' | 'blocked' | 'rate-limited' | 'invalid' | 'error', message: string) { diff --git a/src/subs.ts b/src/subs.ts new file mode 100644 index 0000000..2a63214 --- /dev/null +++ b/src/subs.ts @@ -0,0 +1,75 @@ +import { type Event, matchFilters } from '@/deps.ts'; + +import type { DittoFilter } from '@/types.ts'; + +/** Nostr subscription to receive realtime events. */ +interface Subscription { + /** User-defined NIP-01 subscription ID. */ + id: string; + /** Event filters for the subscription. */ + filters: DittoFilter[]; + /** WebSocket to deliver results to. */ + socket: WebSocket; +} + +/** + * Manages Ditto event subscriptions. + * + * Subscriptions can be added, removed, and matched against events. + * + * ```ts + * for (const sub of Sub.matches(event)) { + * // Send event to sub.socket + * sub.socket.send(JSON.stringify(event)); + * } + * ``` + */ +class SubscriptionStore { + #store = new Map>(); + + /** Add a subscription to the store. */ + sub(data: Subscription): void { + let subs = this.#store.get(data.socket); + + if (!subs) { + subs = new Map(); + this.#store.set(data.socket, subs); + } + + subs.set(data.id, data); + } + + /** Remove a subscription from the store. */ + unsub(sub: Pick): void { + this.#store.get(sub.socket)?.delete(sub.id); + } + + /** Remove an entire socket. */ + close(socket: WebSocket): void { + this.#store.delete(socket); + } + + /** + * Loop through matching subscriptions to stream out. + * + * ```ts + * for (const sub of Sub.matches(event)) { + * // Send event to sub.socket + * sub.socket.send(JSON.stringify(event)); + * } + * ``` + */ + *matches(event: Event): Iterable { + for (const subs of this.#store.values()) { + for (const sub of subs.values()) { + if (matchFilters(sub.filters, event)) { + yield sub; + } + } + } + } +} + +const Sub = new SubscriptionStore(); + +export { Sub };