From 7a18a19b2f024e7d48bb222a9866b0cc6d9a3a8d Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Thu, 25 Apr 2024 13:18:35 -0500 Subject: [PATCH 1/6] Remove subs.ts & subscription.ts, refactor around it --- src/controllers/api/streaming.ts | 35 +++++++------ src/controllers/nostr/relay.ts | 25 +++++++--- src/pipeline.ts | 15 +++--- src/storages/InternalRelay.ts | 12 +++-- src/subs.ts | 84 -------------------------------- src/subscription.ts | 49 ------------------- 6 files changed, 54 insertions(+), 166 deletions(-) delete mode 100644 src/subs.ts delete mode 100644 src/subscription.ts diff --git a/src/controllers/api/streaming.ts b/src/controllers/api/streaming.ts index f2e2638..8fa1e24 100644 --- a/src/controllers/api/streaming.ts +++ b/src/controllers/api/streaming.ts @@ -5,11 +5,11 @@ import { type AppController } from '@/app.ts'; import { Conf } from '@/config.ts'; import { Debug } from '@/deps.ts'; import { getFeedPubkeys } from '@/queries.ts'; -import { Sub } from '@/subs.ts'; import { bech32ToPubkey } from '@/utils.ts'; import { renderReblog, renderStatus } from '@/views/mastodon/statuses.ts'; import { hydrateEvents } from '@/storages/hydrate.ts'; import { eventsDB } from '@/storages.ts'; +import { Storages } from '@/storages.ts'; const debug = Debug('ditto:streaming'); @@ -38,6 +38,7 @@ const streamingController: AppController = (c) => { const upgrade = c.req.header('upgrade'); const token = c.req.header('sec-websocket-protocol'); const stream = streamSchema.optional().catch(undefined).parse(c.req.query('stream')); + const controller = new AbortController(); if (upgrade?.toLowerCase() !== 'websocket') { return c.text('Please use websocket protocol', 400); @@ -63,33 +64,37 @@ const streamingController: AppController = (c) => { socket.onopen = async () => { if (!stream) return; + const filter = await topicToFilter(stream, c.req.query(), pubkey); + if (!filter) return; - if (filter) { - for await (const event of Sub.sub(socket, '1', [filter])) { - if (event.kind === 6) { - await hydrateEvents({ - events: [event], - storage: eventsDB, - signal: AbortSignal.timeout(1000), - }); + for await (const msg of Storages.pubsub.req([filter], { signal: controller.signal })) { + if (msg[0] === 'EVENT') { + const [event] = await hydrateEvents({ + events: [msg[2]], + storage: eventsDB, + signal: AbortSignal.timeout(1000), + }); - const status = await renderReblog(event, { viewerPubkey: c.get('pubkey') }); + if (event.kind === 1) { + const status = await renderStatus(event, { viewerPubkey: pubkey }); if (status) { send('update', status); } - continue; } - const status = await renderStatus(event, { viewerPubkey: pubkey }); - if (status) { - send('update', status); + + if (event.kind === 6) { + const status = await renderReblog(event, { viewerPubkey: pubkey }); + if (status) { + send('update', status); + } } } } }; socket.onclose = () => { - Sub.close(socket); + controller.abort(); }; return response; diff --git a/src/controllers/nostr/relay.ts b/src/controllers/nostr/relay.ts index c0e905b..6705a89 100644 --- a/src/controllers/nostr/relay.ts +++ b/src/controllers/nostr/relay.ts @@ -11,8 +11,7 @@ import { clientMsgSchema, type ClientREQ, } from '@/schemas/nostr.ts'; -import { purifyEvent } from '@/storages/hydrate.ts'; -import { Sub } from '@/subs.ts'; +import { Storages } from '@/storages.ts'; import type { AppController } from '@/app.ts'; @@ -29,6 +28,8 @@ type RelayMsg = /** Set up the Websocket connection. */ function connectStream(socket: WebSocket) { + const controllers = new Map(); + socket.onmessage = (e) => { const result = jsonSchema.pipe(clientMsgSchema).safeParse(e.data); if (result.success) { @@ -39,7 +40,9 @@ function connectStream(socket: WebSocket) { }; socket.onclose = () => { - Sub.close(socket); + for (const controller of controllers.values()) { + controller.abort(); + } }; /** Handle client message. */ @@ -64,14 +67,20 @@ function connectStream(socket: WebSocket) { async function handleReq([_, subId, ...rest]: ClientREQ): Promise { const filters = prepareFilters(rest); + const controller = new AbortController(); + controllers.get(subId)?.abort(); + controllers.set(subId, controller); + for (const event of await eventsDB.query(filters, { limit: FILTER_LIMIT })) { send(['EVENT', subId, event]); } send(['EOSE', subId]); - for await (const event of Sub.sub(socket, subId, filters)) { - send(['EVENT', subId, purifyEvent(event)]); + for await (const msg of Storages.pubsub.req(filters, { signal: controller.signal })) { + if (msg[0] === 'EVENT') { + send(['EVENT', subId, msg[2]]); + } } } @@ -93,7 +102,11 @@ function connectStream(socket: WebSocket) { /** Handle CLOSE. Close the subscription. */ function handleClose([_, subId]: ClientCLOSE): void { - Sub.unsub(socket, subId); + const controller = controllers.get(subId); + if (controller) { + controller.abort(); + controllers.delete(subId); + } } /** Handle COUNT. Return the number of events matching the filters. */ diff --git a/src/pipeline.ts b/src/pipeline.ts index cbae505..c3feeab 100644 --- a/src/pipeline.ts +++ b/src/pipeline.ts @@ -9,8 +9,7 @@ import { isEphemeralKind } from '@/kinds.ts'; import { DVM } from '@/pipeline/DVM.ts'; import { updateStats } from '@/stats.ts'; import { hydrateEvents, purifyEvent } from '@/storages/hydrate.ts'; -import { cache, eventsDB, reqmeister } from '@/storages.ts'; -import { Sub } from '@/subs.ts'; +import { cache, eventsDB, reqmeister, Storages } from '@/storages.ts'; import { getTagSet } from '@/tags.ts'; import { eventAge, isRelay, nostrDate, nostrNow, parseNip05, Time } from '@/utils.ts'; import { fetchWorker } from '@/workers/fetch.ts'; @@ -269,14 +268,14 @@ async function payZap(event: DittoEvent, signal: AbortSignal) { } /** Determine if the event is being received in a timely manner. */ -const isFresh = (event: NostrEvent): boolean => eventAge(event) < Time.seconds(10); +function isFresh(event: NostrEvent): boolean { + return eventAge(event) < Time.seconds(10); +} /** Distribute the event through active subscriptions. */ -function streamOut(event: NostrEvent) { - if (!isFresh(event)) return; - - for (const sub of Sub.matches(event)) { - sub.stream(event); +async function streamOut(event: NostrEvent): Promise { + if (isFresh(event)) { + await Storages.pubsub.event(event); } } diff --git a/src/storages/InternalRelay.ts b/src/storages/InternalRelay.ts index ac28045..70aba29 100644 --- a/src/storages/InternalRelay.ts +++ b/src/storages/InternalRelay.ts @@ -12,6 +12,7 @@ import { import { matchFilter } from '@/deps.ts'; import { DittoEvent } from '@/interfaces/DittoEvent.ts'; +import { purifyEvent } from '@/storages/hydrate.ts'; /** * PubSub event store for streaming events within the application. @@ -20,9 +21,12 @@ import { DittoEvent } from '@/interfaces/DittoEvent.ts'; export class InternalRelay implements NRelay { private subs = new Map }>(); - async *req(filters: NostrFilter[]): AsyncGenerator { + async *req( + filters: NostrFilter[], + opts: { signal?: AbortSignal }, + ): AsyncGenerator { const id = crypto.randomUUID(); - const machina = new Machina(); + const machina = new Machina(opts?.signal); yield ['EOSE', id]; @@ -49,10 +53,10 @@ export class InternalRelay implements NRelay { ) as { key: 'domain'; value: string } | undefined)?.value; if (domain === event.author_domain) { - return machina.push(event); + return machina.push(purifyEvent(event)); } } else { - return machina.push(event); + return machina.push(purifyEvent(event)); } } } diff --git a/src/subs.ts b/src/subs.ts deleted file mode 100644 index d3610c2..0000000 --- a/src/subs.ts +++ /dev/null @@ -1,84 +0,0 @@ -import { NostrFilter } from '@nostrify/nostrify'; -import { Debug } from '@/deps.ts'; -import { type DittoEvent } from '@/interfaces/DittoEvent.ts'; -import { Subscription } from '@/subscription.ts'; - -const debug = Debug('ditto:subs'); - -/** - * Manages Ditto event subscriptions. - * Subscriptions can be added, removed, and matched against events. - */ -class SubscriptionStore { - #store = new Map>(); - - /** - * Add a subscription to the store, and then iterate over it. - * - * ```ts - * for (const event of Sub.sub(socket, subId, filters)) { - * console.log(event); - * } - * ``` - */ - sub(socket: unknown, id: string, filters: NostrFilter[]): Subscription { - debug('sub', id, JSON.stringify(filters)); - let subs = this.#store.get(socket); - - if (!subs) { - subs = new Map(); - this.#store.set(socket, subs); - } - - const sub = new Subscription(filters); - - this.unsub(socket, id); - subs.set(id, sub as unknown as Subscription); - - return sub; - } - - /** Remove a subscription from the store. */ - unsub(socket: unknown, id: string): void { - debug('unsub', id); - this.#store.get(socket)?.get(id)?.close(); - this.#store.get(socket)?.delete(id); - } - - /** Remove an entire socket. */ - close(socket: unknown): void { - debug('close', (socket as any)?.constructor?.name); - const subs = this.#store.get(socket); - - if (subs) { - for (const sub of subs.values()) { - sub.close(); - } - } - - this.#store.delete(socket); - } - - /** - * Loop through matching subscriptions to stream out. - * - * ```ts - * for (const sub of Sub.matches(event, data)) { - * sub.stream(event); - * } - * ``` - */ - *matches(event: DittoEvent): Iterable { - for (const subs of this.#store.values()) { - for (const sub of subs.values()) { - if (sub.matches(event)) { - yield sub; - } - } - } - } -} - -const Sub = new SubscriptionStore(); - -export { Sub }; diff --git a/src/subscription.ts b/src/subscription.ts deleted file mode 100644 index 840a0b4..0000000 --- a/src/subscription.ts +++ /dev/null @@ -1,49 +0,0 @@ -import { NIP50, NostrEvent, NostrFilter } from '@nostrify/nostrify'; -import { Machina, matchFilter } from '@/deps.ts'; -import { type DittoEvent } from '@/interfaces/DittoEvent.ts'; - -class Subscription implements AsyncIterable { - filters: NostrFilter[]; - #machina: Machina; - - constructor(filters: NostrFilter[]) { - this.filters = filters; - this.#machina = new Machina(); - } - - stream(event: NostrEvent): void { - this.#machina.push(event); - } - - matches(event: DittoEvent): boolean { - for (const filter of this.filters) { - if (matchFilter(filter, event)) { - if (filter.search) { - const tokens = NIP50.parseInput(filter.search); - - const domain = (tokens.find((t) => - typeof t === 'object' && t.key === 'domain' - ) as { key: 'domain'; value: string } | undefined)?.value; - - if (domain) { - return domain === event.author_domain; - } - } - - return true; - } - } - - return false; - } - - close() { - this.#machina.close(); - } - - [Symbol.asyncIterator]() { - return this.#machina.stream(); - } -} - -export { Subscription }; From 7aa931a69ed181fe5aec6ed68185d850b62300c1 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Thu, 25 Apr 2024 16:36:20 -0500 Subject: [PATCH 2/6] try-catch the InternalRelay req's --- deno.json | 2 +- src/controllers/api/streaming.ts | 36 ++++++++++++++++++-------------- src/controllers/nostr/relay.ts | 10 ++++++--- src/utils/api.ts | 6 ------ 4 files changed, 28 insertions(+), 26 deletions(-) diff --git a/deno.json b/deno.json index d3e5acd..6d28237 100644 --- a/deno.json +++ b/deno.json @@ -16,7 +16,7 @@ "exclude": ["./public"], "imports": { "@/": "./src/", - "@nostrify/nostrify": "jsr:@nostrify/nostrify@^0.12.1", + "@nostrify/nostrify": "jsr:@nostrify/nostrify@^0.14.2", "@std/cli": "jsr:@std/cli@^0.223.0", "@std/json": "jsr:@std/json@^0.223.0", "@std/streams": "jsr:@std/streams@^0.223.0", diff --git a/src/controllers/api/streaming.ts b/src/controllers/api/streaming.ts index 8fa1e24..668218d 100644 --- a/src/controllers/api/streaming.ts +++ b/src/controllers/api/streaming.ts @@ -68,28 +68,32 @@ const streamingController: AppController = (c) => { const filter = await topicToFilter(stream, c.req.query(), pubkey); if (!filter) return; - for await (const msg of Storages.pubsub.req([filter], { signal: controller.signal })) { - if (msg[0] === 'EVENT') { - const [event] = await hydrateEvents({ - events: [msg[2]], - storage: eventsDB, - signal: AbortSignal.timeout(1000), - }); + try { + for await (const msg of Storages.pubsub.req([filter], { signal: controller.signal })) { + if (msg[0] === 'EVENT') { + const [event] = await hydrateEvents({ + events: [msg[2]], + storage: eventsDB, + signal: AbortSignal.timeout(1000), + }); - if (event.kind === 1) { - const status = await renderStatus(event, { viewerPubkey: pubkey }); - if (status) { - send('update', status); + if (event.kind === 1) { + const status = await renderStatus(event, { viewerPubkey: pubkey }); + if (status) { + send('update', status); + } } - } - if (event.kind === 6) { - const status = await renderReblog(event, { viewerPubkey: pubkey }); - if (status) { - send('update', status); + if (event.kind === 6) { + const status = await renderReblog(event, { viewerPubkey: pubkey }); + if (status) { + send('update', status); + } } } } + } catch (e) { + debug('streaming error:', e); } }; diff --git a/src/controllers/nostr/relay.ts b/src/controllers/nostr/relay.ts index 6705a89..1fa53c1 100644 --- a/src/controllers/nostr/relay.ts +++ b/src/controllers/nostr/relay.ts @@ -77,10 +77,14 @@ function connectStream(socket: WebSocket) { send(['EOSE', subId]); - for await (const msg of Storages.pubsub.req(filters, { signal: controller.signal })) { - if (msg[0] === 'EVENT') { - send(['EVENT', subId, msg[2]]); + try { + for await (const msg of Storages.pubsub.req(filters, { signal: controller.signal })) { + if (msg[0] === 'EVENT') { + send(['EVENT', subId, msg[2]]); + } } + } catch (_e) { + controllers.delete(subId); } } diff --git a/src/utils/api.ts b/src/utils/api.ts index 79b383d..cd4e6e2 100644 --- a/src/utils/api.ts +++ b/src/utils/api.ts @@ -18,12 +18,6 @@ type EventStub = TypeFest.SetOptional { - const pubkey = c.get('pubkey'); - - if (!pubkey) { - throw new HTTPException(401); - } - const signer = new APISigner(c); const event = await signer.signEvent({ From 05534d532bd48f159f4e0e6cd1d4099e99a5dfda Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Thu, 25 Apr 2024 18:28:19 -0500 Subject: [PATCH 3/6] APISigner: refactor with InternalRelay --- src/signers/APISigner.ts | 36 +++++++++++++++++------------------ src/storages.ts | 13 +++++++++++++ src/storages/InternalRelay.ts | 2 +- 3 files changed, 31 insertions(+), 20 deletions(-) diff --git a/src/signers/APISigner.ts b/src/signers/APISigner.ts index 5428043..4322142 100644 --- a/src/signers/APISigner.ts +++ b/src/signers/APISigner.ts @@ -6,7 +6,7 @@ import { Stickynotes } from '@/deps.ts'; import { connectResponseSchema } from '@/schemas/nostr.ts'; import { jsonSchema } from '@/schema.ts'; import { AdminSigner } from '@/signers/AdminSigner.ts'; -import { Sub } from '@/subs.ts'; +import { Storages } from '@/storages.ts'; import { eventMatchesTemplate } from '@/utils.ts'; import { createAdminEvent } from '@/utils/api.ts'; @@ -78,27 +78,25 @@ export class APISigner implements NostrSigner { messageId: string, template: Omit, ): Promise { - const sub = Sub.sub(messageId, '1', [{ kinds: [24133], authors: [pubkey], '#p': [Conf.pubkey] }]); + const sub = Storages.pubsub.req( + [{ kinds: [24133], authors: [pubkey], '#p': [Conf.pubkey] }], + { signal: this.#c.req.raw.signal }, + ); - const close = (): void => { - Sub.close(messageId); - this.#c.req.raw.signal.removeEventListener('abort', close); - }; + for await (const msg of sub) { + if (msg[0] === 'EVENT') { + const event = msg[2]; + const decrypted = await new AdminSigner().nip04.decrypt(event.pubkey, event.content); - this.#c.req.raw.signal.addEventListener('abort', close); + const result = jsonSchema + .pipe(connectResponseSchema) + .refine((msg) => msg.id === messageId, 'Message ID mismatch') + .refine((msg) => eventMatchesTemplate(msg.result, template), 'Event template mismatch') + .safeParse(decrypted); - for await (const event of sub) { - const decrypted = await new AdminSigner().nip04.decrypt(event.pubkey, event.content); - - const result = jsonSchema - .pipe(connectResponseSchema) - .refine((msg) => msg.id === messageId, 'Message ID mismatch') - .refine((msg) => eventMatchesTemplate(msg.result, template), 'Event template mismatch') - .safeParse(decrypted); - - if (result.success) { - close(); - return result.data.result; + if (result.success) { + return result.data.result; + } } } diff --git a/src/storages.ts b/src/storages.ts index 107aa17..32d8dd2 100644 --- a/src/storages.ts +++ b/src/storages.ts @@ -8,6 +8,7 @@ import { Optimizer } from '@/storages/optimizer.ts'; import { PoolStore } from '@/storages/pool-store.ts'; import { Reqmeister } from '@/storages/reqmeister.ts'; import { SearchStore } from '@/storages/search-store.ts'; +import { InternalRelay } from '@/storages/InternalRelay.ts'; import { Time } from '@/utils/time.ts'; /** Relay pool storage. */ @@ -43,4 +44,16 @@ const searchStore = new SearchStore({ fallback: optimizer, }); +export class Storages { + private static _subsub: InternalRelay | undefined; + + static get pubsub(): InternalRelay { + if (!this._subsub) { + this._subsub = new InternalRelay(); + } + + return this._subsub; + } +} + export { cache, client, eventsDB, optimizer, reqmeister, searchStore }; diff --git a/src/storages/InternalRelay.ts b/src/storages/InternalRelay.ts index 70aba29..e2f4fad 100644 --- a/src/storages/InternalRelay.ts +++ b/src/storages/InternalRelay.ts @@ -23,7 +23,7 @@ export class InternalRelay implements NRelay { async *req( filters: NostrFilter[], - opts: { signal?: AbortSignal }, + opts?: { signal?: AbortSignal }, ): AsyncGenerator { const id = crypto.randomUUID(); const machina = new Machina(opts?.signal); From 333dfca270edd9d2277ca9327a22924f7c262916 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Thu, 25 Apr 2024 18:31:16 -0500 Subject: [PATCH 4/6] Storages: typofix subsub -> pubsub --- src/storages.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/storages.ts b/src/storages.ts index 32d8dd2..6c6a4a5 100644 --- a/src/storages.ts +++ b/src/storages.ts @@ -45,14 +45,14 @@ const searchStore = new SearchStore({ }); export class Storages { - private static _subsub: InternalRelay | undefined; + private static _pubsub: InternalRelay | undefined; static get pubsub(): InternalRelay { - if (!this._subsub) { - this._subsub = new InternalRelay(); + if (!this._pubsub) { + this._pubsub = new InternalRelay(); } - return this._subsub; + return this._pubsub; } } From 7ee258fe87f3d6eb4deb4a2aeb475d93469acce3 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Thu, 25 Apr 2024 18:46:34 -0500 Subject: [PATCH 5/6] relay: fix local filtering logic --- src/controllers/nostr/relay.ts | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/controllers/nostr/relay.ts b/src/controllers/nostr/relay.ts index 1fa53c1..30d7316 100644 --- a/src/controllers/nostr/relay.ts +++ b/src/controllers/nostr/relay.ts @@ -14,6 +14,7 @@ import { import { Storages } from '@/storages.ts'; import type { AppController } from '@/app.ts'; +import { Conf } from '@/config.ts'; /** Limit of initial events returned for a subscription. */ const FILTER_LIMIT = 100; @@ -129,11 +130,12 @@ function connectStream(socket: WebSocket) { /** Enforce the filters with certain criteria. */ function prepareFilters(filters: ClientREQ[2][]): NostrFilter[] { - return filters.map((filter) => ({ - ...filter, + return filters.map((filter) => { + const narrow = Boolean(filter.ids?.length || filter.authors?.length); + const search = narrow ? filter.search : `domain:${Conf.url.host} ${filter.search ?? ''}`; // Return only local events unless the query is already narrow. - local: (filter.ids?.length || filter.authors?.length) ? undefined : true, - })); + return { ...filter, search }; + }); } const relayController: AppController = (c, next) => { From 4e4b7711c993b4eac93db8df05a5ec1942cd5532 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Thu, 25 Apr 2024 18:48:02 -0500 Subject: [PATCH 6/6] relay: NSchema --- src/controllers/nostr/relay.ts | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/controllers/nostr/relay.ts b/src/controllers/nostr/relay.ts index 30d7316..3db72c3 100644 --- a/src/controllers/nostr/relay.ts +++ b/src/controllers/nostr/relay.ts @@ -1,8 +1,7 @@ -import { NostrEvent, NostrFilter } from '@nostrify/nostrify'; +import { NostrEvent, NostrFilter, NSchema as n } from '@nostrify/nostrify'; import { relayInfoController } from '@/controllers/nostr/relay-info.ts'; import { eventsDB } from '@/storages.ts'; import * as pipeline from '@/pipeline.ts'; -import { jsonSchema } from '@/schema.ts'; import { type ClientCLOSE, type ClientCOUNT, @@ -32,7 +31,7 @@ function connectStream(socket: WebSocket) { const controllers = new Map(); socket.onmessage = (e) => { - const result = jsonSchema.pipe(clientMsgSchema).safeParse(e.data); + const result = n.json().pipe(clientMsgSchema).safeParse(e.data); if (result.success) { handleMsg(result.data); } else {