Refactor streaming to use async iterators

This commit is contained in:
Alex Gleason 2023-08-25 13:35:20 -05:00
parent 00c531bbff
commit baace5ea2d
No known key found for this signature in database
GPG Key ID: 7211D1F99744FBB7
4 changed files with 85 additions and 38 deletions

View File

@ -63,11 +63,9 @@ function connectStream(socket: WebSocket) {
send(['EOSE', subId]); send(['EOSE', subId]);
Sub.sub({ for await (const event of Sub.sub(socket, subId, prepared)) {
id: subId, send(['EVENT', subId, event]);
filters: prepared, }
socket,
});
} }
/** Handle EVENT. Store the event. */ /** Handle EVENT. Store the event. */
@ -87,7 +85,7 @@ function connectStream(socket: WebSocket) {
/** Handle CLOSE. Close the subscription. */ /** Handle CLOSE. Close the subscription. */
function handleClose([_, subId]: ClientCLOSE): void { function handleClose([_, subId]: ClientCLOSE): void {
Sub.unsub({ id: subId, socket }); Sub.unsub(socket, subId);
} }
/** Send a message back to the client. */ /** Send a message back to the client. */

View File

@ -93,8 +93,8 @@ const isFresh = ({ created_at }: Event): boolean => created_at >= nostrNow() - T
function streamOut(event: Event, data: EventData) { function streamOut(event: Event, data: EventData) {
if (!isFresh(event)) return; if (!isFresh(event)) return;
for (const { socket, id } of Sub.matches(event, data)) { for (const sub of Sub.matches(event, data)) {
socket.send(JSON.stringify(['EVENT', id, event])); sub.stream(event);
} }
} }

View File

@ -1,52 +1,56 @@
import { type Event } from '@/deps.ts'; import { type Event } from '@/deps.ts';
import { matchDittoFilters } from './filter.ts'; import { Subscription } from '@/subscription.ts';
import type { DittoFilter, EventData } from '@/types.ts'; import type { DittoFilter, EventData } from '@/types.ts';
/** Nostr subscription to receive realtime events. */
interface Subscription {
/** User-defined NIP-01 subscription ID. */
id: string;
/** Event filters for the subscription. */
filters: DittoFilter[];
/** WebSocket to deliver results to. */
socket: WebSocket;
}
/** /**
* Manages Ditto event subscriptions. * Manages Ditto event subscriptions.
*
* Subscriptions can be added, removed, and matched against events. * Subscriptions can be added, removed, and matched against events.
*
* ```ts
* for (const sub of Sub.matches(event)) {
* // Send event to sub.socket
* sub.socket.send(JSON.stringify(event));
* }
* ```
*/ */
class SubscriptionStore { class SubscriptionStore {
#store = new Map<WebSocket, Map<string, Subscription>>(); #store = new Map<WebSocket, Map<string, Subscription>>();
/** Add a subscription to the store. */ /**
sub(data: Subscription): void { * Add a subscription to the store, and then iterate over it.
let subs = this.#store.get(data.socket); *
* ```ts
* for (const event of Sub.sub(socket, subId, filters)) {
* console.log(event);
* }
* ```
*/
sub(socket: WebSocket, id: string, filters: DittoFilter[]): Subscription {
let subs = this.#store.get(socket);
if (!subs) { if (!subs) {
subs = new Map(); subs = new Map();
this.#store.set(data.socket, subs); this.#store.set(socket, subs);
} }
subs.set(data.id, data); const sub = new Subscription(filters);
this.unsub(socket, id);
subs.set(id, sub);
return sub;
} }
/** Remove a subscription from the store. */ /** Remove a subscription from the store. */
unsub(sub: Pick<Subscription, 'socket' | 'id'>): void { unsub(socket: WebSocket, id: string): void {
this.#store.get(sub.socket)?.delete(sub.id); this.#store.get(socket)?.get(id)?.close();
this.#store.get(socket)?.delete(id);
} }
/** Remove an entire socket. */ /** Remove an entire socket. */
close(socket: WebSocket): void { close(socket: WebSocket): void {
const subs = this.#store.get(socket);
if (subs) {
for (const sub of subs.values()) {
sub.close();
}
}
this.#store.delete(socket); this.#store.delete(socket);
} }
@ -54,16 +58,15 @@ class SubscriptionStore {
* Loop through matching subscriptions to stream out. * Loop through matching subscriptions to stream out.
* *
* ```ts * ```ts
* for (const sub of Sub.matches(event)) { * for (const sub of Sub.matches(event, data)) {
* // Send event to sub.socket * sub.stream(event);
* sub.socket.send(JSON.stringify(event));
* } * }
* ``` * ```
*/ */
*matches(event: Event, data: EventData): Iterable<Subscription> { *matches(event: Event, data: EventData): Iterable<Subscription> {
for (const subs of this.#store.values()) { for (const subs of this.#store.values()) {
for (const sub of subs.values()) { for (const sub of subs.values()) {
if (matchDittoFilters(sub.filters, event, data)) { if (sub.matches(event, data)) {
yield sub; yield sub;
} }
} }

46
src/subscription.ts Normal file
View File

@ -0,0 +1,46 @@
import { type Event } from '@/deps.ts';
import { matchDittoFilters } from '@/filter.ts';
import type { DittoFilter, EventData } from '@/types.ts';
class Subscription<K extends number = number> implements AsyncIterable<Event<K>> {
filters: DittoFilter<K>[];
#next?: (event: Event<K>) => void;
#closed = false;
constructor(filters: DittoFilter<K>[]) {
this.filters = filters;
}
stream(event: Event<K>): void {
if (this.#next) {
this.#next(event);
this.#next = undefined;
}
}
matches(event: Event, data: EventData): boolean {
return matchDittoFilters(this.filters, event, data);
}
close() {
this.#closed = true;
this.#next?.(undefined!);
}
async *[Symbol.asyncIterator]() {
while (true) {
const event = await new Promise<Event<K>>((resolve) => {
this.#next = resolve;
});
if (this.#closed) {
return;
}
yield event;
}
}
}
export { Subscription };