Build a websocket topics framework

This commit is contained in:
Alex Gleason 2023-05-20 19:39:05 -05:00
parent 02160f8c9e
commit 3ffad1df29
No known key found for this signature in database
GPG Key ID: 7211D1F99744FBB7
6 changed files with 116 additions and 21 deletions

View File

@ -31,6 +31,8 @@ interface AppEnv extends HonoEnv {
pubkey?: string;
/** Hex secret key for the current user. Optional, but easiest way to use legacy Mastodon apps. */
seckey?: string;
/** UUID from the access token. Used for WebSocket event signing. */
session?: string;
};
}

View File

@ -1,14 +1,12 @@
import { AppController } from '@/app.ts';
import { nip19 } from '@/deps.ts';
import { TOKEN_REGEX } from '@/middleware/auth.ts';
import { signStreams } from '@/sign.ts';
import ws from '@/stream.ts';
import { bech32ToPubkey } from '@/utils.ts';
const streamingController: AppController = (c) => {
const upgrade = c.req.headers.get('upgrade');
const token = c.req.headers.get('sec-websocket-protocol');
const stream = c.req.query('stream');
const nostr = c.req.query('nostr');
if (upgrade?.toLowerCase() !== 'websocket') {
return c.text('Please use websocket protocol', 400);
@ -18,25 +16,31 @@ const streamingController: AppController = (c) => {
return c.json({ error: 'Missing access token' }, 401);
}
if (!(new RegExp(`^${TOKEN_REGEX.source}$`)).test(token)) {
const match = token.match(new RegExp(`^${TOKEN_REGEX.source}$`));
if (!match) {
return c.json({ error: 'Invalid access token' }, 401);
}
const { socket, response } = Deno.upgradeWebSocket(c.req.raw, { protocol: token });
const conn = {
socket,
session: match[2],
pubkey: bech32ToPubkey(match[1]),
};
socket.addEventListener('open', () => {
console.log('websocket: connection opened');
// Only send signing events if the user has a session ID.
if (stream === 'user' && nostr === 'true' && new RegExp(`^${nip19.BECH32_REGEX.source}_\\w+$`).test(token)) {
signStreams.set(token, socket);
if (stream) {
ws.subscribe(conn, { name: stream });
}
});
socket.addEventListener('message', (e) => console.log('websocket message: ', e.data));
socket.addEventListener('close', () => {
signStreams.delete(token);
console.log('websocket: connection closed');
ws.unsubscribe(conn, { name: stream! });
});
return response;

View File

@ -14,12 +14,12 @@ export {
type Filter,
getEventHash,
getPublicKey,
getSignature,
Kind,
matchFilter,
nip05,
nip19,
nip21,
signEvent as getSignature,
} from 'npm:nostr-tools@^1.11.1';
export { findReplyTag } from 'https://gitlab.com/soapbox-pub/mostr/-/raw/c67064aee5ade5e01597c6d23e22e53c628ef0e2/src/nostr/tags.ts';
export { parseFormData } from 'npm:formdata-helper@^0.3.0';

View File

@ -12,7 +12,8 @@ const setAuth: AppMiddleware = async (c, next) => {
const match = authHeader?.match(BEARER_REGEX);
if (match) {
const [_, _token, bech32, _sessionId] = match;
const [_, _token, bech32, session] = match;
c.set('session', session);
try {
const decoded = nip19.decode(bech32!);

View File

@ -1,17 +1,20 @@
import { type AppContext } from '@/app.ts';
import { getEventHash, getPublicKey, getSignature, HTTPException } from '@/deps.ts';
import ws from '@/stream.ts';
import type { Event, EventTemplate, SignedEvent } from '@/event.ts';
/** Map of OAuth tokens to WebSocket signing streams. */
// FIXME: People can eavesdrop on other people's signing streams.
// TODO: Add a secret to the Authorization header.
export const signStreams = new Map<string, WebSocket>();
/** Get signing WebSocket from app context. */
function getSignStream(c: AppContext): WebSocket | undefined {
const token = c.req.headers.get('authorization')?.replace(/^Bearer /, '');
return token ? signStreams.get(token) : undefined;
const pubkey = c.get('pubkey');
const session = c.get('session');
console.log(`nostr:${pubkey}:${session}`);
if (pubkey && session) {
const [socket] = ws.getSockets(`nostr:${pubkey}:${session}`);
return socket;
}
}
/**
@ -27,15 +30,20 @@ async function signEvent<K extends number = number>(event: EventTemplate<K>, c:
if (!seckey && stream) {
try {
return await new Promise<SignedEvent<K>>((resolve, reject) => {
stream.addEventListener('message', (e) => {
const handleMessage = (e: MessageEvent) => {
// TODO: parse and validate with zod
const data = JSON.parse(e.data);
if (data.event === 'nostr.sign') {
stream.removeEventListener('message', handleMessage);
resolve(JSON.parse(data.payload));
}
});
};
stream.addEventListener('message', handleMessage);
stream.send(JSON.stringify({ event: 'nostr.sign', payload: JSON.stringify(event) }));
setTimeout(reject, 60000);
setTimeout(() => {
stream.removeEventListener('message', handleMessage);
reject();
}, 60000);
});
} catch (_e) {
throw new HTTPException(408, {

80
src/stream.ts Normal file
View File

@ -0,0 +1,80 @@
type Topic = string;
interface StreamConn {
pubkey?: string;
session?: string;
socket: WebSocket;
}
// TODO: Make this a discriminated union (needed for hashtags).
interface Stream {
name: string;
params?: Record<string, string>;
}
const sockets = new Map<Topic, Set<WebSocket>>();
function addSocket(socket: WebSocket, topic: Topic): void {
let subscribers = sockets.get(topic);
if (!subscribers) {
subscribers = new Set<WebSocket>();
sockets.set(topic, subscribers);
}
subscribers.add(socket);
}
function removeSocket(socket: WebSocket, topic: Topic): void {
const subscribers = sockets.get(topic);
if (subscribers) {
subscribers.delete(socket);
if (subscribers.size === 0) {
sockets.delete(topic);
}
}
}
function subscribe(conn: StreamConn, stream: Stream): void {
const topic = getTopic(conn, stream);
if (topic) {
addSocket(conn.socket, topic);
}
}
function unsubscribe(conn: StreamConn, stream: Stream): void {
const topic = getTopic(conn, stream);
if (topic) {
removeSocket(conn.socket, topic);
}
}
/**
* Convert the "stream" parameter into a "topic".
* The stream parameter is part of the public-facing API, while the topic is internal.
*/
function getTopic(conn: StreamConn, stream: Stream): Topic | undefined {
// Global topics will share the same name as the stream.
if (stream.name.startsWith('public')) {
return stream.name;
// Can't subscribe to non-public topics without a pubkey.
} else if (!conn.pubkey) {
return;
// Nostr signing topics contain the session ID for privacy reasons.
} else if (stream.name === 'nostr') {
return conn.session ? `${stream.name}:${conn.pubkey}:${conn.session}` : undefined;
// User topics will be suffixed with the pubkey.
} else {
return `${stream.name}:${conn.pubkey}`;
}
}
function getSockets(topic: Topic): Set<WebSocket> {
return sockets.get(topic) ?? new Set<WebSocket>();
}
const ws = {
subscribe,
unsubscribe,
getSockets,
};
export default ws;