Streaming: refactor, add unsubscribeAll method

This commit is contained in:
Alex Gleason 2023-05-20 20:23:01 -05:00
parent 3ffad1df29
commit ec5e0ed330
No known key found for this signature in database
GPG Key ID: 7211D1F99744FBB7
3 changed files with 96 additions and 37 deletions

View File

@ -40,7 +40,7 @@ const streamingController: AppController = (c) => {
socket.addEventListener('close', () => { socket.addEventListener('close', () => {
console.log('websocket: connection closed'); console.log('websocket: connection closed');
ws.unsubscribe(conn, { name: stream! }); ws.unsubscribeAll(socket);
}); });
return response; return response;

View File

@ -9,8 +9,6 @@ function getSignStream(c: AppContext): WebSocket | undefined {
const pubkey = c.get('pubkey'); const pubkey = c.get('pubkey');
const session = c.get('session'); const session = c.get('session');
console.log(`nostr:${pubkey}:${session}`);
if (pubkey && session) { if (pubkey && session) {
const [socket] = ws.getSockets(`nostr:${pubkey}:${session}`); const [socket] = ws.getSockets(`nostr:${pubkey}:${session}`);
return socket; return socket;

View File

@ -1,49 +1,118 @@
/** Internal key for event subscriptions. */
type Topic = string; type Topic = string;
/** Only the necessary metadata needed from the request. */
interface StreamConn { interface StreamConn {
/** Hex pubkey parsed from the `Sec-Websocket-Protocol` header. */
pubkey?: string; pubkey?: string;
/** Base62 session UUID parsed from the `Sec-Websocket-Protocol` header. */
session?: string; session?: string;
/** The WebSocket stream. */
socket: WebSocket; socket: WebSocket;
} }
/** Requested streaming channel, eg `user`, `notifications`. Some channels like `hashtag` have additional params. */
// TODO: Make this a discriminated union (needed for hashtags). // TODO: Make this a discriminated union (needed for hashtags).
interface Stream { interface Stream {
/** Name of the channel, eg `user`. */
name: string; name: string;
/** Additional query params, eg `tag`. */
params?: Record<string, string>; params?: Record<string, string>;
} }
const sockets = new Map<Topic, Set<WebSocket>>(); /** Class to organize WebSocket connections by topic. */
class WebSocketConnections {
/** Set of WebSockets by topic. */
#sockets = new Map<Topic, Set<WebSocket>>();
/** Set of topics by WebSocket. We need to track this so we can unsubscribe properly. */
#topics = new WeakMap<WebSocket, Set<Topic>>();
/** Add the WebSocket to the streaming channel. */
subscribe(conn: StreamConn, stream: Stream): void {
const topic = getTopic(conn, stream);
if (topic) {
this.#addSocket(conn.socket, topic);
this.#addTopic(conn.socket, topic);
}
}
/** Remove the WebSocket from the streaming channel. */
unsubscribe(conn: StreamConn, stream: Stream): void {
const topic = getTopic(conn, stream);
if (topic) {
this.#removeSocket(conn.socket, topic);
this.#removeTopic(conn.socket, topic);
}
}
/** Remove the WebSocket from all its streaming channels. */
unsubscribeAll(socket: WebSocket): void {
const topics = this.#topics.get(socket);
if (topics) {
for (const topic of topics) {
this.#removeSocket(socket, topic);
}
}
this.#topics.delete(socket);
}
/** Get WebSockets for the given topic. */
getSockets(topic: Topic): Set<WebSocket> {
return this.#sockets.get(topic) ?? new Set<WebSocket>();
}
/** Add a WebSocket to a topics set in the state. */
#addSocket(socket: WebSocket, topic: Topic): void {
let subscribers = this.#sockets.get(topic);
function addSocket(socket: WebSocket, topic: Topic): void {
let subscribers = sockets.get(topic);
if (!subscribers) { if (!subscribers) {
subscribers = new Set<WebSocket>(); subscribers = new Set<WebSocket>();
sockets.set(topic, subscribers); this.#sockets.set(topic, subscribers);
} }
subscribers.add(socket); subscribers.add(socket);
} }
function removeSocket(socket: WebSocket, topic: Topic): void { /** Remove a WebSocket from a topics set in the state. */
const subscribers = sockets.get(topic); #removeSocket(socket: WebSocket, topic: Topic): void {
const subscribers = this.#sockets.get(topic);
if (subscribers) { if (subscribers) {
subscribers.delete(socket); subscribers.delete(socket);
if (subscribers.size === 0) { if (subscribers.size === 0) {
sockets.delete(topic); this.#sockets.delete(topic);
} }
} }
} }
function subscribe(conn: StreamConn, stream: Stream): void { /** Add a topic to a WebSocket set in the state. */
const topic = getTopic(conn, stream); #addTopic(socket: WebSocket, topic: Topic): void {
if (topic) { let topics = this.#topics.get(socket);
addSocket(conn.socket, topic);
} if (!topics) {
topics = new Set<Topic>();
this.#topics.set(socket, topics);
} }
function unsubscribe(conn: StreamConn, stream: Stream): void { topics.add(topic);
const topic = getTopic(conn, stream); }
if (topic) {
removeSocket(conn.socket, topic); /** Remove a topic from a WebSocket set in the state. */
#removeTopic(socket: WebSocket, topic: Topic): void {
const topics = this.#topics.get(socket);
if (topics) {
topics.delete(topic);
if (topics.size === 0) {
this.#topics.delete(socket);
}
}
} }
} }
@ -67,14 +136,6 @@ function getTopic(conn: StreamConn, stream: Stream): Topic | undefined {
} }
} }
function getSockets(topic: Topic): Set<WebSocket> { const ws = new WebSocketConnections();
return sockets.get(topic) ?? new Set<WebSocket>();
}
const ws = {
subscribe,
unsubscribe,
getSockets,
};
export default ws; export default ws;