diff --git a/src/app.ts b/src/app.ts index eb2a12a..6c6952a 100644 --- a/src/app.ts +++ b/src/app.ts @@ -31,6 +31,8 @@ interface AppEnv extends HonoEnv { pubkey?: string; /** Hex secret key for the current user. Optional, but easiest way to use legacy Mastodon apps. */ seckey?: string; + /** UUID from the access token. Used for WebSocket event signing. */ + session?: string; }; } diff --git a/src/controllers/api/streaming.ts b/src/controllers/api/streaming.ts index 6f85553..df16460 100644 --- a/src/controllers/api/streaming.ts +++ b/src/controllers/api/streaming.ts @@ -1,14 +1,12 @@ import { AppController } from '@/app.ts'; -import { nip19 } from '@/deps.ts'; import { TOKEN_REGEX } from '@/middleware/auth.ts'; -import { signStreams } from '@/sign.ts'; +import ws from '@/stream.ts'; +import { bech32ToPubkey } from '@/utils.ts'; const streamingController: AppController = (c) => { const upgrade = c.req.headers.get('upgrade'); const token = c.req.headers.get('sec-websocket-protocol'); - const stream = c.req.query('stream'); - const nostr = c.req.query('nostr'); if (upgrade?.toLowerCase() !== 'websocket') { return c.text('Please use websocket protocol', 400); @@ -18,25 +16,31 @@ const streamingController: AppController = (c) => { return c.json({ error: 'Missing access token' }, 401); } - if (!(new RegExp(`^${TOKEN_REGEX.source}$`)).test(token)) { + const match = token.match(new RegExp(`^${TOKEN_REGEX.source}$`)); + if (!match) { return c.json({ error: 'Invalid access token' }, 401); } const { socket, response } = Deno.upgradeWebSocket(c.req.raw, { protocol: token }); + const conn = { + socket, + session: match[2], + pubkey: bech32ToPubkey(match[1]), + }; + socket.addEventListener('open', () => { console.log('websocket: connection opened'); - // Only send signing events if the user has a session ID. - if (stream === 'user' && nostr === 'true' && new RegExp(`^${nip19.BECH32_REGEX.source}_\\w+$`).test(token)) { - signStreams.set(token, socket); + if (stream) { + ws.subscribe(conn, { name: stream }); } }); socket.addEventListener('message', (e) => console.log('websocket message: ', e.data)); socket.addEventListener('close', () => { - signStreams.delete(token); console.log('websocket: connection closed'); + ws.unsubscribe(conn, { name: stream! }); }); return response; diff --git a/src/deps.ts b/src/deps.ts index 00c1d16..ba275ba 100644 --- a/src/deps.ts +++ b/src/deps.ts @@ -14,12 +14,12 @@ export { type Filter, getEventHash, getPublicKey, + getSignature, Kind, matchFilter, nip05, nip19, nip21, - signEvent as getSignature, } from 'npm:nostr-tools@^1.11.1'; export { findReplyTag } from 'https://gitlab.com/soapbox-pub/mostr/-/raw/c67064aee5ade5e01597c6d23e22e53c628ef0e2/src/nostr/tags.ts'; export { parseFormData } from 'npm:formdata-helper@^0.3.0'; diff --git a/src/middleware/auth.ts b/src/middleware/auth.ts index 4775e52..358c5bd 100644 --- a/src/middleware/auth.ts +++ b/src/middleware/auth.ts @@ -12,7 +12,8 @@ const setAuth: AppMiddleware = async (c, next) => { const match = authHeader?.match(BEARER_REGEX); if (match) { - const [_, _token, bech32, _sessionId] = match; + const [_, _token, bech32, session] = match; + c.set('session', session); try { const decoded = nip19.decode(bech32!); diff --git a/src/sign.ts b/src/sign.ts index 01430f6..6cc1154 100644 --- a/src/sign.ts +++ b/src/sign.ts @@ -1,17 +1,20 @@ import { type AppContext } from '@/app.ts'; import { getEventHash, getPublicKey, getSignature, HTTPException } from '@/deps.ts'; +import ws from '@/stream.ts'; import type { Event, EventTemplate, SignedEvent } from '@/event.ts'; -/** Map of OAuth tokens to WebSocket signing streams. */ -// FIXME: People can eavesdrop on other people's signing streams. -// TODO: Add a secret to the Authorization header. -export const signStreams = new Map(); - /** Get signing WebSocket from app context. */ function getSignStream(c: AppContext): WebSocket | undefined { - const token = c.req.headers.get('authorization')?.replace(/^Bearer /, ''); - return token ? signStreams.get(token) : undefined; + const pubkey = c.get('pubkey'); + const session = c.get('session'); + + console.log(`nostr:${pubkey}:${session}`); + + if (pubkey && session) { + const [socket] = ws.getSockets(`nostr:${pubkey}:${session}`); + return socket; + } } /** @@ -27,15 +30,20 @@ async function signEvent(event: EventTemplate, c: if (!seckey && stream) { try { return await new Promise>((resolve, reject) => { - stream.addEventListener('message', (e) => { + const handleMessage = (e: MessageEvent) => { // TODO: parse and validate with zod const data = JSON.parse(e.data); if (data.event === 'nostr.sign') { + stream.removeEventListener('message', handleMessage); resolve(JSON.parse(data.payload)); } - }); + }; + stream.addEventListener('message', handleMessage); stream.send(JSON.stringify({ event: 'nostr.sign', payload: JSON.stringify(event) })); - setTimeout(reject, 60000); + setTimeout(() => { + stream.removeEventListener('message', handleMessage); + reject(); + }, 60000); }); } catch (_e) { throw new HTTPException(408, { diff --git a/src/stream.ts b/src/stream.ts new file mode 100644 index 0000000..dc52a81 --- /dev/null +++ b/src/stream.ts @@ -0,0 +1,80 @@ +type Topic = string; + +interface StreamConn { + pubkey?: string; + session?: string; + socket: WebSocket; +} + +// TODO: Make this a discriminated union (needed for hashtags). +interface Stream { + name: string; + params?: Record; +} + +const sockets = new Map>(); + +function addSocket(socket: WebSocket, topic: Topic): void { + let subscribers = sockets.get(topic); + if (!subscribers) { + subscribers = new Set(); + sockets.set(topic, subscribers); + } + subscribers.add(socket); +} + +function removeSocket(socket: WebSocket, topic: Topic): void { + const subscribers = sockets.get(topic); + if (subscribers) { + subscribers.delete(socket); + if (subscribers.size === 0) { + sockets.delete(topic); + } + } +} + +function subscribe(conn: StreamConn, stream: Stream): void { + const topic = getTopic(conn, stream); + if (topic) { + addSocket(conn.socket, topic); + } +} + +function unsubscribe(conn: StreamConn, stream: Stream): void { + const topic = getTopic(conn, stream); + if (topic) { + removeSocket(conn.socket, topic); + } +} + +/** + * Convert the "stream" parameter into a "topic". + * The stream parameter is part of the public-facing API, while the topic is internal. + */ +function getTopic(conn: StreamConn, stream: Stream): Topic | undefined { + // Global topics will share the same name as the stream. + if (stream.name.startsWith('public')) { + return stream.name; + // Can't subscribe to non-public topics without a pubkey. + } else if (!conn.pubkey) { + return; + // Nostr signing topics contain the session ID for privacy reasons. + } else if (stream.name === 'nostr') { + return conn.session ? `${stream.name}:${conn.pubkey}:${conn.session}` : undefined; + // User topics will be suffixed with the pubkey. + } else { + return `${stream.name}:${conn.pubkey}`; + } +} + +function getSockets(topic: Topic): Set { + return sockets.get(topic) ?? new Set(); +} + +const ws = { + subscribe, + unsubscribe, + getSockets, +}; + +export default ws;