From fc47116dd38bef6eb8dea8f22f38f78db71a3600 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Thu, 25 Apr 2024 12:32:39 -0500 Subject: [PATCH 1/4] APISigner: reimplement with NConnectSigner and InternalRelay --- deno.json | 2 +- src/signers/APISigner.ts | 130 +++++++++++++-------------------------- src/storages.ts | 13 ++++ 3 files changed, 57 insertions(+), 88 deletions(-) diff --git a/deno.json b/deno.json index d3e5acd..0457bc5 100644 --- a/deno.json +++ b/deno.json @@ -16,7 +16,7 @@ "exclude": ["./public"], "imports": { "@/": "./src/", - "@nostrify/nostrify": "jsr:@nostrify/nostrify@^0.12.1", + "@nostrify/nostrify": "jsr:@nostrify/nostrify@^0.14.0", "@std/cli": "jsr:@std/cli@^0.223.0", "@std/json": "jsr:@std/json@^0.223.0", "@std/streams": "jsr:@std/streams@^0.223.0", diff --git a/src/signers/APISigner.ts b/src/signers/APISigner.ts index 5428043..e9914b1 100644 --- a/src/signers/APISigner.ts +++ b/src/signers/APISigner.ts @@ -1,14 +1,10 @@ -import { NostrEvent, NostrSigner, NSecSigner } from '@nostrify/nostrify'; +// deno-lint-ignore-file require-await + +import { NConnectSigner, NostrEvent, NostrSigner, NSecSigner } from '@nostrify/nostrify'; import { HTTPException } from 'hono'; import { type AppContext } from '@/app.ts'; -import { Conf } from '@/config.ts'; -import { Stickynotes } from '@/deps.ts'; -import { connectResponseSchema } from '@/schemas/nostr.ts'; -import { jsonSchema } from '@/schema.ts'; import { AdminSigner } from '@/signers/AdminSigner.ts'; -import { Sub } from '@/subs.ts'; -import { eventMatchesTemplate } from '@/utils.ts'; -import { createAdminEvent } from '@/utils/api.ts'; +import { Storages } from '@/storages.ts'; /** * Sign Nostr event using the app context. @@ -17,93 +13,53 @@ import { createAdminEvent } from '@/utils/api.ts'; * - Otherwise, it will use NIP-46 to sign the event. */ export class APISigner implements NostrSigner { - #c: AppContext; - #console = new Stickynotes('ditto:sign'); + private signer: NostrSigner; constructor(c: AppContext) { - this.#c = c; - } - - // deno-lint-ignore require-await - async getPublicKey(): Promise { - const pubkey = this.#c.get('pubkey'); - if (pubkey) { - return pubkey; - } else { - throw new HTTPException(401, { message: 'Missing pubkey' }); - } - } - - async signEvent(event: Omit): Promise { - const seckey = this.#c.get('seckey'); - - if (seckey) { - this.#console.debug(`Signing Event<${event.kind}> with secret key`); - return new NSecSigner(seckey).signEvent(event); - } - - this.#console.debug(`Signing Event<${event.kind}> with NIP-46`); - return await this.#signNostrConnect(event); - } - - /** Sign event with NIP-46, waiting in the background for the signed event. */ - async #signNostrConnect(event: Omit): Promise { - const pubkey = this.#c.get('pubkey'); + const seckey = c.get('seckey'); + const pubkey = c.get('pubkey'); if (!pubkey) { throw new HTTPException(401, { message: 'Missing pubkey' }); } - const messageId = crypto.randomUUID(); - - createAdminEvent({ - kind: 24133, - content: await new AdminSigner().nip04.encrypt( + if (seckey) { + this.signer = new NSecSigner(seckey); + } else { + this.signer = new NConnectSigner({ pubkey, - JSON.stringify({ - id: messageId, - method: 'sign_event', - params: [event], - }), - ), - tags: [['p', pubkey]], - }, this.#c); - - return this.#awaitSignedEvent(pubkey, messageId, event); - } - - /** Wait for signed event to be sent through Nostr relay. */ - async #awaitSignedEvent( - pubkey: string, - messageId: string, - template: Omit, - ): Promise { - const sub = Sub.sub(messageId, '1', [{ kinds: [24133], authors: [pubkey], '#p': [Conf.pubkey] }]); - - const close = (): void => { - Sub.close(messageId); - this.#c.req.raw.signal.removeEventListener('abort', close); - }; - - this.#c.req.raw.signal.addEventListener('abort', close); - - for await (const event of sub) { - const decrypted = await new AdminSigner().nip04.decrypt(event.pubkey, event.content); - - const result = jsonSchema - .pipe(connectResponseSchema) - .refine((msg) => msg.id === messageId, 'Message ID mismatch') - .refine((msg) => eventMatchesTemplate(msg.result, template), 'Event template mismatch') - .safeParse(decrypted); - - if (result.success) { - close(); - return result.data.result; - } + relay: Storages.pubsub, + signer: new AdminSigner(), + timeout: 60000, + }); } - - throw new HTTPException(408, { - res: this.#c.json({ id: 'ditto.timeout', error: 'Signing timeout' }), - }); } + + async getPublicKey(): Promise { + return this.signer.getPublicKey(); + } + + async signEvent(event: Omit): Promise { + return this.signer.signEvent(event); + } + + readonly nip04 = { + encrypt: async (pubkey: string, plaintext: string): Promise => { + return this.signer.nip04!.encrypt(pubkey, plaintext); + }, + + decrypt: async (pubkey: string, ciphertext: string): Promise => { + return this.signer.nip04!.decrypt(pubkey, ciphertext); + }, + }; + + readonly nip44 = { + encrypt: async (pubkey: string, plaintext: string): Promise => { + return this.signer.nip44!.encrypt(pubkey, plaintext); + }, + + decrypt: async (pubkey: string, ciphertext: string): Promise => { + return this.signer.nip44!.decrypt(pubkey, ciphertext); + }, + }; } diff --git a/src/storages.ts b/src/storages.ts index 107aa17..32d8dd2 100644 --- a/src/storages.ts +++ b/src/storages.ts @@ -8,6 +8,7 @@ import { Optimizer } from '@/storages/optimizer.ts'; import { PoolStore } from '@/storages/pool-store.ts'; import { Reqmeister } from '@/storages/reqmeister.ts'; import { SearchStore } from '@/storages/search-store.ts'; +import { InternalRelay } from '@/storages/InternalRelay.ts'; import { Time } from '@/utils/time.ts'; /** Relay pool storage. */ @@ -43,4 +44,16 @@ const searchStore = new SearchStore({ fallback: optimizer, }); +export class Storages { + private static _subsub: InternalRelay | undefined; + + static get pubsub(): InternalRelay { + if (!this._subsub) { + this._subsub = new InternalRelay(); + } + + return this._subsub; + } +} + export { cache, client, eventsDB, optimizer, reqmeister, searchStore }; From f33ad040841581951bb32decf456f84643023a35 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Thu, 25 Apr 2024 13:18:35 -0500 Subject: [PATCH 2/4] Remove subs.ts & subscription.ts, refactor around it --- src/controllers/api/streaming.ts | 35 +++++++------ src/controllers/nostr/relay.ts | 25 +++++++--- src/pipeline.ts | 15 +++--- src/storages/InternalRelay.ts | 12 +++-- src/subs.ts | 84 -------------------------------- src/subscription.ts | 49 ------------------- 6 files changed, 54 insertions(+), 166 deletions(-) delete mode 100644 src/subs.ts delete mode 100644 src/subscription.ts diff --git a/src/controllers/api/streaming.ts b/src/controllers/api/streaming.ts index f2e2638..8fa1e24 100644 --- a/src/controllers/api/streaming.ts +++ b/src/controllers/api/streaming.ts @@ -5,11 +5,11 @@ import { type AppController } from '@/app.ts'; import { Conf } from '@/config.ts'; import { Debug } from '@/deps.ts'; import { getFeedPubkeys } from '@/queries.ts'; -import { Sub } from '@/subs.ts'; import { bech32ToPubkey } from '@/utils.ts'; import { renderReblog, renderStatus } from '@/views/mastodon/statuses.ts'; import { hydrateEvents } from '@/storages/hydrate.ts'; import { eventsDB } from '@/storages.ts'; +import { Storages } from '@/storages.ts'; const debug = Debug('ditto:streaming'); @@ -38,6 +38,7 @@ const streamingController: AppController = (c) => { const upgrade = c.req.header('upgrade'); const token = c.req.header('sec-websocket-protocol'); const stream = streamSchema.optional().catch(undefined).parse(c.req.query('stream')); + const controller = new AbortController(); if (upgrade?.toLowerCase() !== 'websocket') { return c.text('Please use websocket protocol', 400); @@ -63,33 +64,37 @@ const streamingController: AppController = (c) => { socket.onopen = async () => { if (!stream) return; + const filter = await topicToFilter(stream, c.req.query(), pubkey); + if (!filter) return; - if (filter) { - for await (const event of Sub.sub(socket, '1', [filter])) { - if (event.kind === 6) { - await hydrateEvents({ - events: [event], - storage: eventsDB, - signal: AbortSignal.timeout(1000), - }); + for await (const msg of Storages.pubsub.req([filter], { signal: controller.signal })) { + if (msg[0] === 'EVENT') { + const [event] = await hydrateEvents({ + events: [msg[2]], + storage: eventsDB, + signal: AbortSignal.timeout(1000), + }); - const status = await renderReblog(event, { viewerPubkey: c.get('pubkey') }); + if (event.kind === 1) { + const status = await renderStatus(event, { viewerPubkey: pubkey }); if (status) { send('update', status); } - continue; } - const status = await renderStatus(event, { viewerPubkey: pubkey }); - if (status) { - send('update', status); + + if (event.kind === 6) { + const status = await renderReblog(event, { viewerPubkey: pubkey }); + if (status) { + send('update', status); + } } } } }; socket.onclose = () => { - Sub.close(socket); + controller.abort(); }; return response; diff --git a/src/controllers/nostr/relay.ts b/src/controllers/nostr/relay.ts index c0e905b..6705a89 100644 --- a/src/controllers/nostr/relay.ts +++ b/src/controllers/nostr/relay.ts @@ -11,8 +11,7 @@ import { clientMsgSchema, type ClientREQ, } from '@/schemas/nostr.ts'; -import { purifyEvent } from '@/storages/hydrate.ts'; -import { Sub } from '@/subs.ts'; +import { Storages } from '@/storages.ts'; import type { AppController } from '@/app.ts'; @@ -29,6 +28,8 @@ type RelayMsg = /** Set up the Websocket connection. */ function connectStream(socket: WebSocket) { + const controllers = new Map(); + socket.onmessage = (e) => { const result = jsonSchema.pipe(clientMsgSchema).safeParse(e.data); if (result.success) { @@ -39,7 +40,9 @@ function connectStream(socket: WebSocket) { }; socket.onclose = () => { - Sub.close(socket); + for (const controller of controllers.values()) { + controller.abort(); + } }; /** Handle client message. */ @@ -64,14 +67,20 @@ function connectStream(socket: WebSocket) { async function handleReq([_, subId, ...rest]: ClientREQ): Promise { const filters = prepareFilters(rest); + const controller = new AbortController(); + controllers.get(subId)?.abort(); + controllers.set(subId, controller); + for (const event of await eventsDB.query(filters, { limit: FILTER_LIMIT })) { send(['EVENT', subId, event]); } send(['EOSE', subId]); - for await (const event of Sub.sub(socket, subId, filters)) { - send(['EVENT', subId, purifyEvent(event)]); + for await (const msg of Storages.pubsub.req(filters, { signal: controller.signal })) { + if (msg[0] === 'EVENT') { + send(['EVENT', subId, msg[2]]); + } } } @@ -93,7 +102,11 @@ function connectStream(socket: WebSocket) { /** Handle CLOSE. Close the subscription. */ function handleClose([_, subId]: ClientCLOSE): void { - Sub.unsub(socket, subId); + const controller = controllers.get(subId); + if (controller) { + controller.abort(); + controllers.delete(subId); + } } /** Handle COUNT. Return the number of events matching the filters. */ diff --git a/src/pipeline.ts b/src/pipeline.ts index cbae505..c3feeab 100644 --- a/src/pipeline.ts +++ b/src/pipeline.ts @@ -9,8 +9,7 @@ import { isEphemeralKind } from '@/kinds.ts'; import { DVM } from '@/pipeline/DVM.ts'; import { updateStats } from '@/stats.ts'; import { hydrateEvents, purifyEvent } from '@/storages/hydrate.ts'; -import { cache, eventsDB, reqmeister } from '@/storages.ts'; -import { Sub } from '@/subs.ts'; +import { cache, eventsDB, reqmeister, Storages } from '@/storages.ts'; import { getTagSet } from '@/tags.ts'; import { eventAge, isRelay, nostrDate, nostrNow, parseNip05, Time } from '@/utils.ts'; import { fetchWorker } from '@/workers/fetch.ts'; @@ -269,14 +268,14 @@ async function payZap(event: DittoEvent, signal: AbortSignal) { } /** Determine if the event is being received in a timely manner. */ -const isFresh = (event: NostrEvent): boolean => eventAge(event) < Time.seconds(10); +function isFresh(event: NostrEvent): boolean { + return eventAge(event) < Time.seconds(10); +} /** Distribute the event through active subscriptions. */ -function streamOut(event: NostrEvent) { - if (!isFresh(event)) return; - - for (const sub of Sub.matches(event)) { - sub.stream(event); +async function streamOut(event: NostrEvent): Promise { + if (isFresh(event)) { + await Storages.pubsub.event(event); } } diff --git a/src/storages/InternalRelay.ts b/src/storages/InternalRelay.ts index ac28045..70aba29 100644 --- a/src/storages/InternalRelay.ts +++ b/src/storages/InternalRelay.ts @@ -12,6 +12,7 @@ import { import { matchFilter } from '@/deps.ts'; import { DittoEvent } from '@/interfaces/DittoEvent.ts'; +import { purifyEvent } from '@/storages/hydrate.ts'; /** * PubSub event store for streaming events within the application. @@ -20,9 +21,12 @@ import { DittoEvent } from '@/interfaces/DittoEvent.ts'; export class InternalRelay implements NRelay { private subs = new Map }>(); - async *req(filters: NostrFilter[]): AsyncGenerator { + async *req( + filters: NostrFilter[], + opts: { signal?: AbortSignal }, + ): AsyncGenerator { const id = crypto.randomUUID(); - const machina = new Machina(); + const machina = new Machina(opts?.signal); yield ['EOSE', id]; @@ -49,10 +53,10 @@ export class InternalRelay implements NRelay { ) as { key: 'domain'; value: string } | undefined)?.value; if (domain === event.author_domain) { - return machina.push(event); + return machina.push(purifyEvent(event)); } } else { - return machina.push(event); + return machina.push(purifyEvent(event)); } } } diff --git a/src/subs.ts b/src/subs.ts deleted file mode 100644 index d3610c2..0000000 --- a/src/subs.ts +++ /dev/null @@ -1,84 +0,0 @@ -import { NostrFilter } from '@nostrify/nostrify'; -import { Debug } from '@/deps.ts'; -import { type DittoEvent } from '@/interfaces/DittoEvent.ts'; -import { Subscription } from '@/subscription.ts'; - -const debug = Debug('ditto:subs'); - -/** - * Manages Ditto event subscriptions. - * Subscriptions can be added, removed, and matched against events. - */ -class SubscriptionStore { - #store = new Map>(); - - /** - * Add a subscription to the store, and then iterate over it. - * - * ```ts - * for (const event of Sub.sub(socket, subId, filters)) { - * console.log(event); - * } - * ``` - */ - sub(socket: unknown, id: string, filters: NostrFilter[]): Subscription { - debug('sub', id, JSON.stringify(filters)); - let subs = this.#store.get(socket); - - if (!subs) { - subs = new Map(); - this.#store.set(socket, subs); - } - - const sub = new Subscription(filters); - - this.unsub(socket, id); - subs.set(id, sub as unknown as Subscription); - - return sub; - } - - /** Remove a subscription from the store. */ - unsub(socket: unknown, id: string): void { - debug('unsub', id); - this.#store.get(socket)?.get(id)?.close(); - this.#store.get(socket)?.delete(id); - } - - /** Remove an entire socket. */ - close(socket: unknown): void { - debug('close', (socket as any)?.constructor?.name); - const subs = this.#store.get(socket); - - if (subs) { - for (const sub of subs.values()) { - sub.close(); - } - } - - this.#store.delete(socket); - } - - /** - * Loop through matching subscriptions to stream out. - * - * ```ts - * for (const sub of Sub.matches(event, data)) { - * sub.stream(event); - * } - * ``` - */ - *matches(event: DittoEvent): Iterable { - for (const subs of this.#store.values()) { - for (const sub of subs.values()) { - if (sub.matches(event)) { - yield sub; - } - } - } - } -} - -const Sub = new SubscriptionStore(); - -export { Sub }; diff --git a/src/subscription.ts b/src/subscription.ts deleted file mode 100644 index 840a0b4..0000000 --- a/src/subscription.ts +++ /dev/null @@ -1,49 +0,0 @@ -import { NIP50, NostrEvent, NostrFilter } from '@nostrify/nostrify'; -import { Machina, matchFilter } from '@/deps.ts'; -import { type DittoEvent } from '@/interfaces/DittoEvent.ts'; - -class Subscription implements AsyncIterable { - filters: NostrFilter[]; - #machina: Machina; - - constructor(filters: NostrFilter[]) { - this.filters = filters; - this.#machina = new Machina(); - } - - stream(event: NostrEvent): void { - this.#machina.push(event); - } - - matches(event: DittoEvent): boolean { - for (const filter of this.filters) { - if (matchFilter(filter, event)) { - if (filter.search) { - const tokens = NIP50.parseInput(filter.search); - - const domain = (tokens.find((t) => - typeof t === 'object' && t.key === 'domain' - ) as { key: 'domain'; value: string } | undefined)?.value; - - if (domain) { - return domain === event.author_domain; - } - } - - return true; - } - } - - return false; - } - - close() { - this.#machina.close(); - } - - [Symbol.asyncIterator]() { - return this.#machina.stream(); - } -} - -export { Subscription }; From 60ca3652bb44ff0410e0d8b08218eca1df46e6a4 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Thu, 25 Apr 2024 16:36:20 -0500 Subject: [PATCH 3/4] try-catch the InternalRelay req's --- deno.json | 2 +- src/controllers/api/streaming.ts | 36 ++++++++++++++++++-------------- src/controllers/nostr/relay.ts | 10 ++++++--- src/utils/api.ts | 6 ------ 4 files changed, 28 insertions(+), 26 deletions(-) diff --git a/deno.json b/deno.json index 0457bc5..6d28237 100644 --- a/deno.json +++ b/deno.json @@ -16,7 +16,7 @@ "exclude": ["./public"], "imports": { "@/": "./src/", - "@nostrify/nostrify": "jsr:@nostrify/nostrify@^0.14.0", + "@nostrify/nostrify": "jsr:@nostrify/nostrify@^0.14.2", "@std/cli": "jsr:@std/cli@^0.223.0", "@std/json": "jsr:@std/json@^0.223.0", "@std/streams": "jsr:@std/streams@^0.223.0", diff --git a/src/controllers/api/streaming.ts b/src/controllers/api/streaming.ts index 8fa1e24..668218d 100644 --- a/src/controllers/api/streaming.ts +++ b/src/controllers/api/streaming.ts @@ -68,28 +68,32 @@ const streamingController: AppController = (c) => { const filter = await topicToFilter(stream, c.req.query(), pubkey); if (!filter) return; - for await (const msg of Storages.pubsub.req([filter], { signal: controller.signal })) { - if (msg[0] === 'EVENT') { - const [event] = await hydrateEvents({ - events: [msg[2]], - storage: eventsDB, - signal: AbortSignal.timeout(1000), - }); + try { + for await (const msg of Storages.pubsub.req([filter], { signal: controller.signal })) { + if (msg[0] === 'EVENT') { + const [event] = await hydrateEvents({ + events: [msg[2]], + storage: eventsDB, + signal: AbortSignal.timeout(1000), + }); - if (event.kind === 1) { - const status = await renderStatus(event, { viewerPubkey: pubkey }); - if (status) { - send('update', status); + if (event.kind === 1) { + const status = await renderStatus(event, { viewerPubkey: pubkey }); + if (status) { + send('update', status); + } } - } - if (event.kind === 6) { - const status = await renderReblog(event, { viewerPubkey: pubkey }); - if (status) { - send('update', status); + if (event.kind === 6) { + const status = await renderReblog(event, { viewerPubkey: pubkey }); + if (status) { + send('update', status); + } } } } + } catch (e) { + debug('streaming error:', e); } }; diff --git a/src/controllers/nostr/relay.ts b/src/controllers/nostr/relay.ts index 6705a89..1fa53c1 100644 --- a/src/controllers/nostr/relay.ts +++ b/src/controllers/nostr/relay.ts @@ -77,10 +77,14 @@ function connectStream(socket: WebSocket) { send(['EOSE', subId]); - for await (const msg of Storages.pubsub.req(filters, { signal: controller.signal })) { - if (msg[0] === 'EVENT') { - send(['EVENT', subId, msg[2]]); + try { + for await (const msg of Storages.pubsub.req(filters, { signal: controller.signal })) { + if (msg[0] === 'EVENT') { + send(['EVENT', subId, msg[2]]); + } } + } catch (_e) { + controllers.delete(subId); } } diff --git a/src/utils/api.ts b/src/utils/api.ts index 79b383d..cd4e6e2 100644 --- a/src/utils/api.ts +++ b/src/utils/api.ts @@ -18,12 +18,6 @@ type EventStub = TypeFest.SetOptional { - const pubkey = c.get('pubkey'); - - if (!pubkey) { - throw new HTTPException(401); - } - const signer = new APISigner(c); const event = await signer.signEvent({ From 9496917be144df3dc697db693686f1aec992ba8f Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Thu, 25 Apr 2024 21:45:26 -0500 Subject: [PATCH 4/4] Upgrade Nostrify --- deno.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/deno.json b/deno.json index 6d28237..f345fc8 100644 --- a/deno.json +++ b/deno.json @@ -16,7 +16,7 @@ "exclude": ["./public"], "imports": { "@/": "./src/", - "@nostrify/nostrify": "jsr:@nostrify/nostrify@^0.14.2", + "@nostrify/nostrify": "jsr:@nostrify/nostrify@^0.14.3", "@std/cli": "jsr:@std/cli@^0.223.0", "@std/json": "jsr:@std/json@^0.223.0", "@std/streams": "jsr:@std/streams@^0.223.0",