From baace5ea2d63c5e4fe75cabd73c05bdbe05cc7a2 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Fri, 25 Aug 2023 13:35:20 -0500 Subject: [PATCH] Refactor streaming to use async iterators --- src/controllers/nostr/relay.ts | 10 +++--- src/pipeline.ts | 4 +-- src/subs.ts | 63 ++++++++++++++++++---------------- src/subscription.ts | 46 +++++++++++++++++++++++++ 4 files changed, 85 insertions(+), 38 deletions(-) create mode 100644 src/subscription.ts diff --git a/src/controllers/nostr/relay.ts b/src/controllers/nostr/relay.ts index e7b954e..92607f8 100644 --- a/src/controllers/nostr/relay.ts +++ b/src/controllers/nostr/relay.ts @@ -63,11 +63,9 @@ function connectStream(socket: WebSocket) { send(['EOSE', subId]); - Sub.sub({ - id: subId, - filters: prepared, - socket, - }); + for await (const event of Sub.sub(socket, subId, prepared)) { + send(['EVENT', subId, event]); + } } /** Handle EVENT. Store the event. */ @@ -87,7 +85,7 @@ function connectStream(socket: WebSocket) { /** Handle CLOSE. Close the subscription. */ function handleClose([_, subId]: ClientCLOSE): void { - Sub.unsub({ id: subId, socket }); + Sub.unsub(socket, subId); } /** Send a message back to the client. */ diff --git a/src/pipeline.ts b/src/pipeline.ts index a6eb15f..e1a9c3a 100644 --- a/src/pipeline.ts +++ b/src/pipeline.ts @@ -93,8 +93,8 @@ const isFresh = ({ created_at }: Event): boolean => created_at >= nostrNow() - T function streamOut(event: Event, data: EventData) { if (!isFresh(event)) return; - for (const { socket, id } of Sub.matches(event, data)) { - socket.send(JSON.stringify(['EVENT', id, event])); + for (const sub of Sub.matches(event, data)) { + sub.stream(event); } } diff --git a/src/subs.ts b/src/subs.ts index 82041c6..f550678 100644 --- a/src/subs.ts +++ b/src/subs.ts @@ -1,52 +1,56 @@ import { type Event } from '@/deps.ts'; -import { matchDittoFilters } from './filter.ts'; +import { Subscription } from '@/subscription.ts'; import type { DittoFilter, EventData } from '@/types.ts'; -/** Nostr subscription to receive realtime events. */ -interface Subscription { - /** User-defined NIP-01 subscription ID. */ - id: string; - /** Event filters for the subscription. */ - filters: DittoFilter[]; - /** WebSocket to deliver results to. */ - socket: WebSocket; -} - /** * Manages Ditto event subscriptions. - * * Subscriptions can be added, removed, and matched against events. - * - * ```ts - * for (const sub of Sub.matches(event)) { - * // Send event to sub.socket - * sub.socket.send(JSON.stringify(event)); - * } - * ``` */ class SubscriptionStore { #store = new Map>(); - /** Add a subscription to the store. */ - sub(data: Subscription): void { - let subs = this.#store.get(data.socket); + /** + * Add a subscription to the store, and then iterate over it. + * + * ```ts + * for (const event of Sub.sub(socket, subId, filters)) { + * console.log(event); + * } + * ``` + */ + sub(socket: WebSocket, id: string, filters: DittoFilter[]): Subscription { + let subs = this.#store.get(socket); if (!subs) { subs = new Map(); - this.#store.set(data.socket, subs); + this.#store.set(socket, subs); } - subs.set(data.id, data); + const sub = new Subscription(filters); + + this.unsub(socket, id); + subs.set(id, sub); + + return sub; } /** Remove a subscription from the store. */ - unsub(sub: Pick): void { - this.#store.get(sub.socket)?.delete(sub.id); + unsub(socket: WebSocket, id: string): void { + this.#store.get(socket)?.get(id)?.close(); + this.#store.get(socket)?.delete(id); } /** Remove an entire socket. */ close(socket: WebSocket): void { + const subs = this.#store.get(socket); + + if (subs) { + for (const sub of subs.values()) { + sub.close(); + } + } + this.#store.delete(socket); } @@ -54,16 +58,15 @@ class SubscriptionStore { * Loop through matching subscriptions to stream out. * * ```ts - * for (const sub of Sub.matches(event)) { - * // Send event to sub.socket - * sub.socket.send(JSON.stringify(event)); + * for (const sub of Sub.matches(event, data)) { + * sub.stream(event); * } * ``` */ *matches(event: Event, data: EventData): Iterable { for (const subs of this.#store.values()) { for (const sub of subs.values()) { - if (matchDittoFilters(sub.filters, event, data)) { + if (sub.matches(event, data)) { yield sub; } } diff --git a/src/subscription.ts b/src/subscription.ts new file mode 100644 index 0000000..24a78f0 --- /dev/null +++ b/src/subscription.ts @@ -0,0 +1,46 @@ +import { type Event } from '@/deps.ts'; +import { matchDittoFilters } from '@/filter.ts'; + +import type { DittoFilter, EventData } from '@/types.ts'; + +class Subscription implements AsyncIterable> { + filters: DittoFilter[]; + #next?: (event: Event) => void; + #closed = false; + + constructor(filters: DittoFilter[]) { + this.filters = filters; + } + + stream(event: Event): void { + if (this.#next) { + this.#next(event); + this.#next = undefined; + } + } + + matches(event: Event, data: EventData): boolean { + return matchDittoFilters(this.filters, event, data); + } + + close() { + this.#closed = true; + this.#next?.(undefined!); + } + + async *[Symbol.asyncIterator]() { + while (true) { + const event = await new Promise>((resolve) => { + this.#next = resolve; + }); + + if (this.#closed) { + return; + } + + yield event; + } + } +} + +export { Subscription };