diff --git a/src/controllers/api/streaming.ts b/src/controllers/api/streaming.ts index 19149fa..b99000c 100644 --- a/src/controllers/api/streaming.ts +++ b/src/controllers/api/streaming.ts @@ -1,12 +1,12 @@ import { AppController } from '@/app.ts'; import { TOKEN_REGEX } from '@/middleware/auth.ts'; -import ws from '@/stream.ts'; +import { streamSchema, ws } from '@/stream.ts'; import { bech32ToPubkey } from '@/utils.ts'; const streamingController: AppController = (c) => { const upgrade = c.req.headers.get('upgrade'); const token = c.req.headers.get('sec-websocket-protocol'); - const stream = c.req.query('stream'); + const stream = streamSchema.optional().catch(undefined).parse(c.req.query('stream')); if (upgrade?.toLowerCase() !== 'websocket') { return c.text('Please use websocket protocol', 400); @@ -32,7 +32,7 @@ const streamingController: AppController = (c) => { socket.addEventListener('open', () => { console.log('websocket: connection opened'); if (stream) { - ws.subscribe(conn, { name: stream }); + ws.subscribe(conn, { stream }); } }); diff --git a/src/sign.ts b/src/sign.ts index 69416c9..f222e11 100644 --- a/src/sign.ts +++ b/src/sign.ts @@ -1,7 +1,7 @@ import { type AppContext } from '@/app.ts'; import { getEventHash, getPublicKey, getSignature, HTTPException, z } from '@/deps.ts'; import { eventSchema } from '@/schema.ts'; -import ws from '@/stream.ts'; +import { ws } from '@/stream.ts'; import type { Event, EventTemplate, SignedEvent } from '@/event.ts'; diff --git a/src/stream.ts b/src/stream.ts index a96e71c..fa352ee 100644 --- a/src/stream.ts +++ b/src/stream.ts @@ -1,5 +1,23 @@ +import { z } from '@/deps.ts'; + /** Internal key for event subscriptions. */ -type Topic = string; +type Topic = + | `nostr:${string}:${string}` + | 'public' + | 'public:local'; + +/** + * Streaming timelines/categories. + * https://docs.joinmastodon.org/methods/streaming/#streams + */ +const streamSchema = z.enum([ + 'nostr', + 'public', + 'public:local', + 'user', +]); + +type Stream = z.infer; /** Only the necessary metadata needed from the request. */ interface StreamConn { @@ -13,9 +31,9 @@ interface StreamConn { /** Requested streaming channel, eg `user`, `notifications`. Some channels like `hashtag` have additional params. */ // TODO: Make this a discriminated union (needed for hashtags). -interface Stream { +interface StreamSub { /** Name of the channel, eg `user`. */ - name: string; + stream: Stream; /** Additional query params, eg `tag`. */ params?: Record; } @@ -28,8 +46,8 @@ class WebSocketConnections { #topics = new WeakMap>(); /** Add the WebSocket to the streaming channel. */ - subscribe(conn: StreamConn, stream: Stream): void { - const topic = getTopic(conn, stream); + subscribe(conn: StreamConn, sub: StreamSub): void { + const topic = getTopic(conn, sub); if (topic) { this.#addSocket(conn.socket, topic); @@ -38,8 +56,8 @@ class WebSocketConnections { } /** Remove the WebSocket from the streaming channel. */ - unsubscribe(conn: StreamConn, stream: Stream): void { - const topic = getTopic(conn, stream); + unsubscribe(conn: StreamConn, sub: StreamSub): void { + const topic = getTopic(conn, sub); if (topic) { this.#removeSocket(conn.socket, topic); @@ -120,22 +138,21 @@ class WebSocketConnections { * Convert the "stream" parameter into a "topic". * The stream parameter is part of the public-facing API, while the topic is internal. */ -function getTopic(conn: StreamConn, stream: Stream): Topic | undefined { - // Global topics will share the same name as the stream. - if (stream.name.startsWith('public')) { - return stream.name; - // Can't subscribe to non-public topics without a pubkey. - } else if (!conn.pubkey) { - return; - // Nostr signing topics contain the session ID for privacy reasons. - } else if (stream.name === 'nostr') { - return conn.session ? `${stream.name}:${conn.pubkey}:${conn.session}` : undefined; - // User topics will be suffixed with the pubkey. - } else { - return `${stream.name}:${conn.pubkey}`; +function getTopic(conn: StreamConn, sub: StreamSub): Topic | undefined { + switch (sub.stream) { + case 'public': + case 'public:local': + return sub.stream; + default: + if (!conn.pubkey) { + return; + // Nostr signing topics contain the session ID for privacy reasons. + } else if (sub.stream === 'nostr') { + return conn.session ? `nostr:${conn.pubkey}:${conn.session}` : undefined; + } } } const ws = new WebSocketConnections(); -export default ws; +export { type Stream, streamSchema, ws };