Streaming: parse stream channel param

This commit is contained in:
Alex Gleason 2023-05-20 23:47:31 -05:00
parent da6e31c647
commit 161c77b85d
No known key found for this signature in database
GPG Key ID: 7211D1F99744FBB7
3 changed files with 42 additions and 25 deletions

View File

@ -1,12 +1,12 @@
import { AppController } from '@/app.ts'; import { AppController } from '@/app.ts';
import { TOKEN_REGEX } from '@/middleware/auth.ts'; import { TOKEN_REGEX } from '@/middleware/auth.ts';
import ws from '@/stream.ts'; import { streamSchema, ws } from '@/stream.ts';
import { bech32ToPubkey } from '@/utils.ts'; import { bech32ToPubkey } from '@/utils.ts';
const streamingController: AppController = (c) => { const streamingController: AppController = (c) => {
const upgrade = c.req.headers.get('upgrade'); const upgrade = c.req.headers.get('upgrade');
const token = c.req.headers.get('sec-websocket-protocol'); const token = c.req.headers.get('sec-websocket-protocol');
const stream = c.req.query('stream'); const stream = streamSchema.optional().catch(undefined).parse(c.req.query('stream'));
if (upgrade?.toLowerCase() !== 'websocket') { if (upgrade?.toLowerCase() !== 'websocket') {
return c.text('Please use websocket protocol', 400); return c.text('Please use websocket protocol', 400);
@ -32,7 +32,7 @@ const streamingController: AppController = (c) => {
socket.addEventListener('open', () => { socket.addEventListener('open', () => {
console.log('websocket: connection opened'); console.log('websocket: connection opened');
if (stream) { if (stream) {
ws.subscribe(conn, { name: stream }); ws.subscribe(conn, { stream });
} }
}); });

View File

@ -1,7 +1,7 @@
import { type AppContext } from '@/app.ts'; import { type AppContext } from '@/app.ts';
import { getEventHash, getPublicKey, getSignature, HTTPException, z } from '@/deps.ts'; import { getEventHash, getPublicKey, getSignature, HTTPException, z } from '@/deps.ts';
import { eventSchema } from '@/schema.ts'; import { eventSchema } from '@/schema.ts';
import ws from '@/stream.ts'; import { ws } from '@/stream.ts';
import type { Event, EventTemplate, SignedEvent } from '@/event.ts'; import type { Event, EventTemplate, SignedEvent } from '@/event.ts';

View File

@ -1,5 +1,23 @@
import { z } from '@/deps.ts';
/** Internal key for event subscriptions. */ /** Internal key for event subscriptions. */
type Topic = string; type Topic =
| `nostr:${string}:${string}`
| 'public'
| 'public:local';
/**
* Streaming timelines/categories.
* https://docs.joinmastodon.org/methods/streaming/#streams
*/
const streamSchema = z.enum([
'nostr',
'public',
'public:local',
'user',
]);
type Stream = z.infer<typeof streamSchema>;
/** Only the necessary metadata needed from the request. */ /** Only the necessary metadata needed from the request. */
interface StreamConn { interface StreamConn {
@ -13,9 +31,9 @@ interface StreamConn {
/** Requested streaming channel, eg `user`, `notifications`. Some channels like `hashtag` have additional params. */ /** Requested streaming channel, eg `user`, `notifications`. Some channels like `hashtag` have additional params. */
// TODO: Make this a discriminated union (needed for hashtags). // TODO: Make this a discriminated union (needed for hashtags).
interface Stream { interface StreamSub {
/** Name of the channel, eg `user`. */ /** Name of the channel, eg `user`. */
name: string; stream: Stream;
/** Additional query params, eg `tag`. */ /** Additional query params, eg `tag`. */
params?: Record<string, string>; params?: Record<string, string>;
} }
@ -28,8 +46,8 @@ class WebSocketConnections {
#topics = new WeakMap<WebSocket, Set<Topic>>(); #topics = new WeakMap<WebSocket, Set<Topic>>();
/** Add the WebSocket to the streaming channel. */ /** Add the WebSocket to the streaming channel. */
subscribe(conn: StreamConn, stream: Stream): void { subscribe(conn: StreamConn, sub: StreamSub): void {
const topic = getTopic(conn, stream); const topic = getTopic(conn, sub);
if (topic) { if (topic) {
this.#addSocket(conn.socket, topic); this.#addSocket(conn.socket, topic);
@ -38,8 +56,8 @@ class WebSocketConnections {
} }
/** Remove the WebSocket from the streaming channel. */ /** Remove the WebSocket from the streaming channel. */
unsubscribe(conn: StreamConn, stream: Stream): void { unsubscribe(conn: StreamConn, sub: StreamSub): void {
const topic = getTopic(conn, stream); const topic = getTopic(conn, sub);
if (topic) { if (topic) {
this.#removeSocket(conn.socket, topic); this.#removeSocket(conn.socket, topic);
@ -120,22 +138,21 @@ class WebSocketConnections {
* Convert the "stream" parameter into a "topic". * Convert the "stream" parameter into a "topic".
* The stream parameter is part of the public-facing API, while the topic is internal. * The stream parameter is part of the public-facing API, while the topic is internal.
*/ */
function getTopic(conn: StreamConn, stream: Stream): Topic | undefined { function getTopic(conn: StreamConn, sub: StreamSub): Topic | undefined {
// Global topics will share the same name as the stream. switch (sub.stream) {
if (stream.name.startsWith('public')) { case 'public':
return stream.name; case 'public:local':
// Can't subscribe to non-public topics without a pubkey. return sub.stream;
} else if (!conn.pubkey) { default:
return; if (!conn.pubkey) {
// Nostr signing topics contain the session ID for privacy reasons. return;
} else if (stream.name === 'nostr') { // Nostr signing topics contain the session ID for privacy reasons.
return conn.session ? `${stream.name}:${conn.pubkey}:${conn.session}` : undefined; } else if (sub.stream === 'nostr') {
// User topics will be suffixed with the pubkey. return conn.session ? `nostr:${conn.pubkey}:${conn.session}` : undefined;
} else { }
return `${stream.name}:${conn.pubkey}`;
} }
} }
const ws = new WebSocketConnections(); const ws = new WebSocketConnections();
export default ws; export { type Stream, streamSchema, ws };