diff --git a/deno.json b/deno.json index d3e5acd..6d28237 100644 --- a/deno.json +++ b/deno.json @@ -16,7 +16,7 @@ "exclude": ["./public"], "imports": { "@/": "./src/", - "@nostrify/nostrify": "jsr:@nostrify/nostrify@^0.12.1", + "@nostrify/nostrify": "jsr:@nostrify/nostrify@^0.14.2", "@std/cli": "jsr:@std/cli@^0.223.0", "@std/json": "jsr:@std/json@^0.223.0", "@std/streams": "jsr:@std/streams@^0.223.0", diff --git a/src/controllers/api/streaming.ts b/src/controllers/api/streaming.ts index f2e2638..668218d 100644 --- a/src/controllers/api/streaming.ts +++ b/src/controllers/api/streaming.ts @@ -5,11 +5,11 @@ import { type AppController } from '@/app.ts'; import { Conf } from '@/config.ts'; import { Debug } from '@/deps.ts'; import { getFeedPubkeys } from '@/queries.ts'; -import { Sub } from '@/subs.ts'; import { bech32ToPubkey } from '@/utils.ts'; import { renderReblog, renderStatus } from '@/views/mastodon/statuses.ts'; import { hydrateEvents } from '@/storages/hydrate.ts'; import { eventsDB } from '@/storages.ts'; +import { Storages } from '@/storages.ts'; const debug = Debug('ditto:streaming'); @@ -38,6 +38,7 @@ const streamingController: AppController = (c) => { const upgrade = c.req.header('upgrade'); const token = c.req.header('sec-websocket-protocol'); const stream = streamSchema.optional().catch(undefined).parse(c.req.query('stream')); + const controller = new AbortController(); if (upgrade?.toLowerCase() !== 'websocket') { return c.text('Please use websocket protocol', 400); @@ -63,33 +64,41 @@ const streamingController: AppController = (c) => { socket.onopen = async () => { if (!stream) return; - const filter = await topicToFilter(stream, c.req.query(), pubkey); - if (filter) { - for await (const event of Sub.sub(socket, '1', [filter])) { - if (event.kind === 6) { - await hydrateEvents({ - events: [event], + const filter = await topicToFilter(stream, c.req.query(), pubkey); + if (!filter) return; + + try { + for await (const msg of Storages.pubsub.req([filter], { signal: controller.signal })) { + if (msg[0] === 'EVENT') { + const [event] = await hydrateEvents({ + events: [msg[2]], storage: eventsDB, signal: AbortSignal.timeout(1000), }); - const status = await renderReblog(event, { viewerPubkey: c.get('pubkey') }); - if (status) { - send('update', status); + if (event.kind === 1) { + const status = await renderStatus(event, { viewerPubkey: pubkey }); + if (status) { + send('update', status); + } + } + + if (event.kind === 6) { + const status = await renderReblog(event, { viewerPubkey: pubkey }); + if (status) { + send('update', status); + } } - continue; - } - const status = await renderStatus(event, { viewerPubkey: pubkey }); - if (status) { - send('update', status); } } + } catch (e) { + debug('streaming error:', e); } }; socket.onclose = () => { - Sub.close(socket); + controller.abort(); }; return response; diff --git a/src/controllers/nostr/relay.ts b/src/controllers/nostr/relay.ts index c0e905b..3db72c3 100644 --- a/src/controllers/nostr/relay.ts +++ b/src/controllers/nostr/relay.ts @@ -1,8 +1,7 @@ -import { NostrEvent, NostrFilter } from '@nostrify/nostrify'; +import { NostrEvent, NostrFilter, NSchema as n } from '@nostrify/nostrify'; import { relayInfoController } from '@/controllers/nostr/relay-info.ts'; import { eventsDB } from '@/storages.ts'; import * as pipeline from '@/pipeline.ts'; -import { jsonSchema } from '@/schema.ts'; import { type ClientCLOSE, type ClientCOUNT, @@ -11,10 +10,10 @@ import { clientMsgSchema, type ClientREQ, } from '@/schemas/nostr.ts'; -import { purifyEvent } from '@/storages/hydrate.ts'; -import { Sub } from '@/subs.ts'; +import { Storages } from '@/storages.ts'; import type { AppController } from '@/app.ts'; +import { Conf } from '@/config.ts'; /** Limit of initial events returned for a subscription. */ const FILTER_LIMIT = 100; @@ -29,8 +28,10 @@ type RelayMsg = /** Set up the Websocket connection. */ function connectStream(socket: WebSocket) { + const controllers = new Map(); + socket.onmessage = (e) => { - const result = jsonSchema.pipe(clientMsgSchema).safeParse(e.data); + const result = n.json().pipe(clientMsgSchema).safeParse(e.data); if (result.success) { handleMsg(result.data); } else { @@ -39,7 +40,9 @@ function connectStream(socket: WebSocket) { }; socket.onclose = () => { - Sub.close(socket); + for (const controller of controllers.values()) { + controller.abort(); + } }; /** Handle client message. */ @@ -64,14 +67,24 @@ function connectStream(socket: WebSocket) { async function handleReq([_, subId, ...rest]: ClientREQ): Promise { const filters = prepareFilters(rest); + const controller = new AbortController(); + controllers.get(subId)?.abort(); + controllers.set(subId, controller); + for (const event of await eventsDB.query(filters, { limit: FILTER_LIMIT })) { send(['EVENT', subId, event]); } send(['EOSE', subId]); - for await (const event of Sub.sub(socket, subId, filters)) { - send(['EVENT', subId, purifyEvent(event)]); + try { + for await (const msg of Storages.pubsub.req(filters, { signal: controller.signal })) { + if (msg[0] === 'EVENT') { + send(['EVENT', subId, msg[2]]); + } + } + } catch (_e) { + controllers.delete(subId); } } @@ -93,7 +106,11 @@ function connectStream(socket: WebSocket) { /** Handle CLOSE. Close the subscription. */ function handleClose([_, subId]: ClientCLOSE): void { - Sub.unsub(socket, subId); + const controller = controllers.get(subId); + if (controller) { + controller.abort(); + controllers.delete(subId); + } } /** Handle COUNT. Return the number of events matching the filters. */ @@ -112,11 +129,12 @@ function connectStream(socket: WebSocket) { /** Enforce the filters with certain criteria. */ function prepareFilters(filters: ClientREQ[2][]): NostrFilter[] { - return filters.map((filter) => ({ - ...filter, + return filters.map((filter) => { + const narrow = Boolean(filter.ids?.length || filter.authors?.length); + const search = narrow ? filter.search : `domain:${Conf.url.host} ${filter.search ?? ''}`; // Return only local events unless the query is already narrow. - local: (filter.ids?.length || filter.authors?.length) ? undefined : true, - })); + return { ...filter, search }; + }); } const relayController: AppController = (c, next) => { diff --git a/src/pipeline.ts b/src/pipeline.ts index cbae505..c3feeab 100644 --- a/src/pipeline.ts +++ b/src/pipeline.ts @@ -9,8 +9,7 @@ import { isEphemeralKind } from '@/kinds.ts'; import { DVM } from '@/pipeline/DVM.ts'; import { updateStats } from '@/stats.ts'; import { hydrateEvents, purifyEvent } from '@/storages/hydrate.ts'; -import { cache, eventsDB, reqmeister } from '@/storages.ts'; -import { Sub } from '@/subs.ts'; +import { cache, eventsDB, reqmeister, Storages } from '@/storages.ts'; import { getTagSet } from '@/tags.ts'; import { eventAge, isRelay, nostrDate, nostrNow, parseNip05, Time } from '@/utils.ts'; import { fetchWorker } from '@/workers/fetch.ts'; @@ -269,14 +268,14 @@ async function payZap(event: DittoEvent, signal: AbortSignal) { } /** Determine if the event is being received in a timely manner. */ -const isFresh = (event: NostrEvent): boolean => eventAge(event) < Time.seconds(10); +function isFresh(event: NostrEvent): boolean { + return eventAge(event) < Time.seconds(10); +} /** Distribute the event through active subscriptions. */ -function streamOut(event: NostrEvent) { - if (!isFresh(event)) return; - - for (const sub of Sub.matches(event)) { - sub.stream(event); +async function streamOut(event: NostrEvent): Promise { + if (isFresh(event)) { + await Storages.pubsub.event(event); } } diff --git a/src/signers/APISigner.ts b/src/signers/APISigner.ts index 5428043..4322142 100644 --- a/src/signers/APISigner.ts +++ b/src/signers/APISigner.ts @@ -6,7 +6,7 @@ import { Stickynotes } from '@/deps.ts'; import { connectResponseSchema } from '@/schemas/nostr.ts'; import { jsonSchema } from '@/schema.ts'; import { AdminSigner } from '@/signers/AdminSigner.ts'; -import { Sub } from '@/subs.ts'; +import { Storages } from '@/storages.ts'; import { eventMatchesTemplate } from '@/utils.ts'; import { createAdminEvent } from '@/utils/api.ts'; @@ -78,27 +78,25 @@ export class APISigner implements NostrSigner { messageId: string, template: Omit, ): Promise { - const sub = Sub.sub(messageId, '1', [{ kinds: [24133], authors: [pubkey], '#p': [Conf.pubkey] }]); + const sub = Storages.pubsub.req( + [{ kinds: [24133], authors: [pubkey], '#p': [Conf.pubkey] }], + { signal: this.#c.req.raw.signal }, + ); - const close = (): void => { - Sub.close(messageId); - this.#c.req.raw.signal.removeEventListener('abort', close); - }; + for await (const msg of sub) { + if (msg[0] === 'EVENT') { + const event = msg[2]; + const decrypted = await new AdminSigner().nip04.decrypt(event.pubkey, event.content); - this.#c.req.raw.signal.addEventListener('abort', close); + const result = jsonSchema + .pipe(connectResponseSchema) + .refine((msg) => msg.id === messageId, 'Message ID mismatch') + .refine((msg) => eventMatchesTemplate(msg.result, template), 'Event template mismatch') + .safeParse(decrypted); - for await (const event of sub) { - const decrypted = await new AdminSigner().nip04.decrypt(event.pubkey, event.content); - - const result = jsonSchema - .pipe(connectResponseSchema) - .refine((msg) => msg.id === messageId, 'Message ID mismatch') - .refine((msg) => eventMatchesTemplate(msg.result, template), 'Event template mismatch') - .safeParse(decrypted); - - if (result.success) { - close(); - return result.data.result; + if (result.success) { + return result.data.result; + } } } diff --git a/src/storages.ts b/src/storages.ts index 107aa17..6c6a4a5 100644 --- a/src/storages.ts +++ b/src/storages.ts @@ -8,6 +8,7 @@ import { Optimizer } from '@/storages/optimizer.ts'; import { PoolStore } from '@/storages/pool-store.ts'; import { Reqmeister } from '@/storages/reqmeister.ts'; import { SearchStore } from '@/storages/search-store.ts'; +import { InternalRelay } from '@/storages/InternalRelay.ts'; import { Time } from '@/utils/time.ts'; /** Relay pool storage. */ @@ -43,4 +44,16 @@ const searchStore = new SearchStore({ fallback: optimizer, }); +export class Storages { + private static _pubsub: InternalRelay | undefined; + + static get pubsub(): InternalRelay { + if (!this._pubsub) { + this._pubsub = new InternalRelay(); + } + + return this._pubsub; + } +} + export { cache, client, eventsDB, optimizer, reqmeister, searchStore }; diff --git a/src/storages/InternalRelay.ts b/src/storages/InternalRelay.ts new file mode 100644 index 0000000..e2f4fad --- /dev/null +++ b/src/storages/InternalRelay.ts @@ -0,0 +1,71 @@ +// deno-lint-ignore-file require-await +import { + Machina, + NIP50, + NostrEvent, + NostrFilter, + NostrRelayCLOSED, + NostrRelayEOSE, + NostrRelayEVENT, + NRelay, +} from '@nostrify/nostrify'; + +import { matchFilter } from '@/deps.ts'; +import { DittoEvent } from '@/interfaces/DittoEvent.ts'; +import { purifyEvent } from '@/storages/hydrate.ts'; + +/** + * PubSub event store for streaming events within the application. + * The pipeline should push events to it, then anything in the application can subscribe to it. + */ +export class InternalRelay implements NRelay { + private subs = new Map }>(); + + async *req( + filters: NostrFilter[], + opts?: { signal?: AbortSignal }, + ): AsyncGenerator { + const id = crypto.randomUUID(); + const machina = new Machina(opts?.signal); + + yield ['EOSE', id]; + + this.subs.set(id, { filters, machina }); + + try { + for await (const event of machina) { + yield ['EVENT', id, event]; + } + } finally { + this.subs.delete(id); + } + } + + async event(event: DittoEvent): Promise { + for (const { filters, machina } of this.subs.values()) { + for (const filter of filters) { + if (matchFilter(filter, event)) { + if (filter.search) { + const tokens = NIP50.parseInput(filter.search); + + const domain = (tokens.find((t) => + typeof t === 'object' && t.key === 'domain' + ) as { key: 'domain'; value: string } | undefined)?.value; + + if (domain === event.author_domain) { + return machina.push(purifyEvent(event)); + } + } else { + return machina.push(purifyEvent(event)); + } + } + } + } + + return Promise.resolve(); + } + + async query(): Promise { + return []; + } +} diff --git a/src/subs.ts b/src/subs.ts deleted file mode 100644 index d3610c2..0000000 --- a/src/subs.ts +++ /dev/null @@ -1,84 +0,0 @@ -import { NostrFilter } from '@nostrify/nostrify'; -import { Debug } from '@/deps.ts'; -import { type DittoEvent } from '@/interfaces/DittoEvent.ts'; -import { Subscription } from '@/subscription.ts'; - -const debug = Debug('ditto:subs'); - -/** - * Manages Ditto event subscriptions. - * Subscriptions can be added, removed, and matched against events. - */ -class SubscriptionStore { - #store = new Map>(); - - /** - * Add a subscription to the store, and then iterate over it. - * - * ```ts - * for (const event of Sub.sub(socket, subId, filters)) { - * console.log(event); - * } - * ``` - */ - sub(socket: unknown, id: string, filters: NostrFilter[]): Subscription { - debug('sub', id, JSON.stringify(filters)); - let subs = this.#store.get(socket); - - if (!subs) { - subs = new Map(); - this.#store.set(socket, subs); - } - - const sub = new Subscription(filters); - - this.unsub(socket, id); - subs.set(id, sub as unknown as Subscription); - - return sub; - } - - /** Remove a subscription from the store. */ - unsub(socket: unknown, id: string): void { - debug('unsub', id); - this.#store.get(socket)?.get(id)?.close(); - this.#store.get(socket)?.delete(id); - } - - /** Remove an entire socket. */ - close(socket: unknown): void { - debug('close', (socket as any)?.constructor?.name); - const subs = this.#store.get(socket); - - if (subs) { - for (const sub of subs.values()) { - sub.close(); - } - } - - this.#store.delete(socket); - } - - /** - * Loop through matching subscriptions to stream out. - * - * ```ts - * for (const sub of Sub.matches(event, data)) { - * sub.stream(event); - * } - * ``` - */ - *matches(event: DittoEvent): Iterable { - for (const subs of this.#store.values()) { - for (const sub of subs.values()) { - if (sub.matches(event)) { - yield sub; - } - } - } - } -} - -const Sub = new SubscriptionStore(); - -export { Sub }; diff --git a/src/subscription.ts b/src/subscription.ts deleted file mode 100644 index 840a0b4..0000000 --- a/src/subscription.ts +++ /dev/null @@ -1,49 +0,0 @@ -import { NIP50, NostrEvent, NostrFilter } from '@nostrify/nostrify'; -import { Machina, matchFilter } from '@/deps.ts'; -import { type DittoEvent } from '@/interfaces/DittoEvent.ts'; - -class Subscription implements AsyncIterable { - filters: NostrFilter[]; - #machina: Machina; - - constructor(filters: NostrFilter[]) { - this.filters = filters; - this.#machina = new Machina(); - } - - stream(event: NostrEvent): void { - this.#machina.push(event); - } - - matches(event: DittoEvent): boolean { - for (const filter of this.filters) { - if (matchFilter(filter, event)) { - if (filter.search) { - const tokens = NIP50.parseInput(filter.search); - - const domain = (tokens.find((t) => - typeof t === 'object' && t.key === 'domain' - ) as { key: 'domain'; value: string } | undefined)?.value; - - if (domain) { - return domain === event.author_domain; - } - } - - return true; - } - } - - return false; - } - - close() { - this.#machina.close(); - } - - [Symbol.asyncIterator]() { - return this.#machina.stream(); - } -} - -export { Subscription }; diff --git a/src/utils/api.ts b/src/utils/api.ts index 79b383d..cd4e6e2 100644 --- a/src/utils/api.ts +++ b/src/utils/api.ts @@ -18,12 +18,6 @@ type EventStub = TypeFest.SetOptional { - const pubkey = c.get('pubkey'); - - if (!pubkey) { - throw new HTTPException(401); - } - const signer = new APISigner(c); const event = await signer.signEvent({