diff --git a/src/controllers/api/streaming.ts b/src/controllers/api/streaming.ts index 1c85259..ca8cafc 100644 --- a/src/controllers/api/streaming.ts +++ b/src/controllers/api/streaming.ts @@ -1,10 +1,20 @@ import { AppController } from '@/app.ts'; +import { z } from '@/deps.ts'; import { type DittoFilter } from '@/filter.ts'; import { TOKEN_REGEX } from '@/middleware/auth19.ts'; -import { streamSchema, ws } from '@/stream.ts'; import { Sub } from '@/subs.ts'; import { toStatus } from '@/transformers/nostr-to-mastoapi.ts'; -import { bech32ToPubkey } from '@/utils.ts'; + +/** + * Streaming timelines/categories. + * https://docs.joinmastodon.org/methods/streaming/#streams + */ +const streamSchema = z.enum([ + 'nostr', + 'public', + 'public:local', + 'user', +]); const streamingController: AppController = (c) => { const upgrade = c.req.headers.get('upgrade'); @@ -26,12 +36,6 @@ const streamingController: AppController = (c) => { const { socket, response } = Deno.upgradeWebSocket(c.req.raw, { protocol: token }); - const conn = { - socket, - session: match[2], - pubkey: bech32ToPubkey(match[1]), - }; - function send(name: string, payload: object) { if (socket.readyState === WebSocket.OPEN) { socket.send(JSON.stringify({ @@ -44,9 +48,6 @@ const streamingController: AppController = (c) => { socket.onopen = async () => { if (!stream) return; - - ws.subscribe(conn, { stream }); - const filter = topicToFilter(stream); if (filter) { @@ -60,7 +61,6 @@ const streamingController: AppController = (c) => { }; socket.onclose = () => { - ws.unsubscribeAll(socket); Sub.close(socket); }; diff --git a/src/stream.ts b/src/stream.ts deleted file mode 100644 index fa352ee..0000000 --- a/src/stream.ts +++ /dev/null @@ -1,158 +0,0 @@ -import { z } from '@/deps.ts'; - -/** Internal key for event subscriptions. */ -type Topic = - | `nostr:${string}:${string}` - | 'public' - | 'public:local'; - -/** - * Streaming timelines/categories. - * https://docs.joinmastodon.org/methods/streaming/#streams - */ -const streamSchema = z.enum([ - 'nostr', - 'public', - 'public:local', - 'user', -]); - -type Stream = z.infer; - -/** Only the necessary metadata needed from the request. */ -interface StreamConn { - /** Hex pubkey parsed from the `Sec-Websocket-Protocol` header. */ - pubkey?: string; - /** Base62 session UUID parsed from the `Sec-Websocket-Protocol` header. */ - session?: string; - /** The WebSocket stream. */ - socket: WebSocket; -} - -/** Requested streaming channel, eg `user`, `notifications`. Some channels like `hashtag` have additional params. */ -// TODO: Make this a discriminated union (needed for hashtags). -interface StreamSub { - /** Name of the channel, eg `user`. */ - stream: Stream; - /** Additional query params, eg `tag`. */ - params?: Record; -} - -/** Class to organize WebSocket connections by topic. */ -class WebSocketConnections { - /** Set of WebSockets by topic. */ - #sockets = new Map>(); - /** Set of topics by WebSocket. We need to track this so we can unsubscribe properly. */ - #topics = new WeakMap>(); - - /** Add the WebSocket to the streaming channel. */ - subscribe(conn: StreamConn, sub: StreamSub): void { - const topic = getTopic(conn, sub); - - if (topic) { - this.#addSocket(conn.socket, topic); - this.#addTopic(conn.socket, topic); - } - } - - /** Remove the WebSocket from the streaming channel. */ - unsubscribe(conn: StreamConn, sub: StreamSub): void { - const topic = getTopic(conn, sub); - - if (topic) { - this.#removeSocket(conn.socket, topic); - this.#removeTopic(conn.socket, topic); - } - } - - /** Remove the WebSocket from all its streaming channels. */ - unsubscribeAll(socket: WebSocket): void { - const topics = this.#topics.get(socket); - - if (topics) { - for (const topic of topics) { - this.#removeSocket(socket, topic); - } - } - - this.#topics.delete(socket); - } - - /** Get WebSockets for the given topic. */ - getSockets(topic: Topic): Set { - return this.#sockets.get(topic) ?? new Set(); - } - - /** Add a WebSocket to a topics set in the state. */ - #addSocket(socket: WebSocket, topic: Topic): void { - let subscribers = this.#sockets.get(topic); - - if (!subscribers) { - subscribers = new Set(); - this.#sockets.set(topic, subscribers); - } - - subscribers.add(socket); - } - - /** Remove a WebSocket from a topics set in the state. */ - #removeSocket(socket: WebSocket, topic: Topic): void { - const subscribers = this.#sockets.get(topic); - - if (subscribers) { - subscribers.delete(socket); - - if (subscribers.size === 0) { - this.#sockets.delete(topic); - } - } - } - - /** Add a topic to a WebSocket set in the state. */ - #addTopic(socket: WebSocket, topic: Topic): void { - let topics = this.#topics.get(socket); - - if (!topics) { - topics = new Set(); - this.#topics.set(socket, topics); - } - - topics.add(topic); - } - - /** Remove a topic from a WebSocket set in the state. */ - #removeTopic(socket: WebSocket, topic: Topic): void { - const topics = this.#topics.get(socket); - - if (topics) { - topics.delete(topic); - - if (topics.size === 0) { - this.#topics.delete(socket); - } - } - } -} - -/** - * Convert the "stream" parameter into a "topic". - * The stream parameter is part of the public-facing API, while the topic is internal. - */ -function getTopic(conn: StreamConn, sub: StreamSub): Topic | undefined { - switch (sub.stream) { - case 'public': - case 'public:local': - return sub.stream; - default: - if (!conn.pubkey) { - return; - // Nostr signing topics contain the session ID for privacy reasons. - } else if (sub.stream === 'nostr') { - return conn.session ? `nostr:${conn.pubkey}:${conn.session}` : undefined; - } - } -} - -const ws = new WebSocketConnections(); - -export { type Stream, streamSchema, ws }; diff --git a/src/utils.ts b/src/utils.ts index 7f47f31..e202982 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -102,7 +102,6 @@ function isFollowing(source: Event<3>, targetPubkey: string): boolean { } export { - bech32ToPubkey, eventAge, eventDateComparator, findTag,