Remove legacy stream.ts module
This commit is contained in:
parent
320d2f493e
commit
2e8b26cf4f
|
@ -1,10 +1,20 @@
|
||||||
import { AppController } from '@/app.ts';
|
import { AppController } from '@/app.ts';
|
||||||
|
import { z } from '@/deps.ts';
|
||||||
import { type DittoFilter } from '@/filter.ts';
|
import { type DittoFilter } from '@/filter.ts';
|
||||||
import { TOKEN_REGEX } from '@/middleware/auth19.ts';
|
import { TOKEN_REGEX } from '@/middleware/auth19.ts';
|
||||||
import { streamSchema, ws } from '@/stream.ts';
|
|
||||||
import { Sub } from '@/subs.ts';
|
import { Sub } from '@/subs.ts';
|
||||||
import { toStatus } from '@/transformers/nostr-to-mastoapi.ts';
|
import { toStatus } from '@/transformers/nostr-to-mastoapi.ts';
|
||||||
import { bech32ToPubkey } from '@/utils.ts';
|
|
||||||
|
/**
|
||||||
|
* Streaming timelines/categories.
|
||||||
|
* https://docs.joinmastodon.org/methods/streaming/#streams
|
||||||
|
*/
|
||||||
|
const streamSchema = z.enum([
|
||||||
|
'nostr',
|
||||||
|
'public',
|
||||||
|
'public:local',
|
||||||
|
'user',
|
||||||
|
]);
|
||||||
|
|
||||||
const streamingController: AppController = (c) => {
|
const streamingController: AppController = (c) => {
|
||||||
const upgrade = c.req.headers.get('upgrade');
|
const upgrade = c.req.headers.get('upgrade');
|
||||||
|
@ -26,12 +36,6 @@ const streamingController: AppController = (c) => {
|
||||||
|
|
||||||
const { socket, response } = Deno.upgradeWebSocket(c.req.raw, { protocol: token });
|
const { socket, response } = Deno.upgradeWebSocket(c.req.raw, { protocol: token });
|
||||||
|
|
||||||
const conn = {
|
|
||||||
socket,
|
|
||||||
session: match[2],
|
|
||||||
pubkey: bech32ToPubkey(match[1]),
|
|
||||||
};
|
|
||||||
|
|
||||||
function send(name: string, payload: object) {
|
function send(name: string, payload: object) {
|
||||||
if (socket.readyState === WebSocket.OPEN) {
|
if (socket.readyState === WebSocket.OPEN) {
|
||||||
socket.send(JSON.stringify({
|
socket.send(JSON.stringify({
|
||||||
|
@ -44,9 +48,6 @@ const streamingController: AppController = (c) => {
|
||||||
|
|
||||||
socket.onopen = async () => {
|
socket.onopen = async () => {
|
||||||
if (!stream) return;
|
if (!stream) return;
|
||||||
|
|
||||||
ws.subscribe(conn, { stream });
|
|
||||||
|
|
||||||
const filter = topicToFilter(stream);
|
const filter = topicToFilter(stream);
|
||||||
|
|
||||||
if (filter) {
|
if (filter) {
|
||||||
|
@ -60,7 +61,6 @@ const streamingController: AppController = (c) => {
|
||||||
};
|
};
|
||||||
|
|
||||||
socket.onclose = () => {
|
socket.onclose = () => {
|
||||||
ws.unsubscribeAll(socket);
|
|
||||||
Sub.close(socket);
|
Sub.close(socket);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
158
src/stream.ts
158
src/stream.ts
|
@ -1,158 +0,0 @@
|
||||||
import { z } from '@/deps.ts';
|
|
||||||
|
|
||||||
/** Internal key for event subscriptions. */
|
|
||||||
type Topic =
|
|
||||||
| `nostr:${string}:${string}`
|
|
||||||
| 'public'
|
|
||||||
| 'public:local';
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Streaming timelines/categories.
|
|
||||||
* https://docs.joinmastodon.org/methods/streaming/#streams
|
|
||||||
*/
|
|
||||||
const streamSchema = z.enum([
|
|
||||||
'nostr',
|
|
||||||
'public',
|
|
||||||
'public:local',
|
|
||||||
'user',
|
|
||||||
]);
|
|
||||||
|
|
||||||
type Stream = z.infer<typeof streamSchema>;
|
|
||||||
|
|
||||||
/** Only the necessary metadata needed from the request. */
|
|
||||||
interface StreamConn {
|
|
||||||
/** Hex pubkey parsed from the `Sec-Websocket-Protocol` header. */
|
|
||||||
pubkey?: string;
|
|
||||||
/** Base62 session UUID parsed from the `Sec-Websocket-Protocol` header. */
|
|
||||||
session?: string;
|
|
||||||
/** The WebSocket stream. */
|
|
||||||
socket: WebSocket;
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Requested streaming channel, eg `user`, `notifications`. Some channels like `hashtag` have additional params. */
|
|
||||||
// TODO: Make this a discriminated union (needed for hashtags).
|
|
||||||
interface StreamSub {
|
|
||||||
/** Name of the channel, eg `user`. */
|
|
||||||
stream: Stream;
|
|
||||||
/** Additional query params, eg `tag`. */
|
|
||||||
params?: Record<string, string>;
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Class to organize WebSocket connections by topic. */
|
|
||||||
class WebSocketConnections {
|
|
||||||
/** Set of WebSockets by topic. */
|
|
||||||
#sockets = new Map<Topic, Set<WebSocket>>();
|
|
||||||
/** Set of topics by WebSocket. We need to track this so we can unsubscribe properly. */
|
|
||||||
#topics = new WeakMap<WebSocket, Set<Topic>>();
|
|
||||||
|
|
||||||
/** Add the WebSocket to the streaming channel. */
|
|
||||||
subscribe(conn: StreamConn, sub: StreamSub): void {
|
|
||||||
const topic = getTopic(conn, sub);
|
|
||||||
|
|
||||||
if (topic) {
|
|
||||||
this.#addSocket(conn.socket, topic);
|
|
||||||
this.#addTopic(conn.socket, topic);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Remove the WebSocket from the streaming channel. */
|
|
||||||
unsubscribe(conn: StreamConn, sub: StreamSub): void {
|
|
||||||
const topic = getTopic(conn, sub);
|
|
||||||
|
|
||||||
if (topic) {
|
|
||||||
this.#removeSocket(conn.socket, topic);
|
|
||||||
this.#removeTopic(conn.socket, topic);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Remove the WebSocket from all its streaming channels. */
|
|
||||||
unsubscribeAll(socket: WebSocket): void {
|
|
||||||
const topics = this.#topics.get(socket);
|
|
||||||
|
|
||||||
if (topics) {
|
|
||||||
for (const topic of topics) {
|
|
||||||
this.#removeSocket(socket, topic);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
this.#topics.delete(socket);
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Get WebSockets for the given topic. */
|
|
||||||
getSockets(topic: Topic): Set<WebSocket> {
|
|
||||||
return this.#sockets.get(topic) ?? new Set<WebSocket>();
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Add a WebSocket to a topics set in the state. */
|
|
||||||
#addSocket(socket: WebSocket, topic: Topic): void {
|
|
||||||
let subscribers = this.#sockets.get(topic);
|
|
||||||
|
|
||||||
if (!subscribers) {
|
|
||||||
subscribers = new Set<WebSocket>();
|
|
||||||
this.#sockets.set(topic, subscribers);
|
|
||||||
}
|
|
||||||
|
|
||||||
subscribers.add(socket);
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Remove a WebSocket from a topics set in the state. */
|
|
||||||
#removeSocket(socket: WebSocket, topic: Topic): void {
|
|
||||||
const subscribers = this.#sockets.get(topic);
|
|
||||||
|
|
||||||
if (subscribers) {
|
|
||||||
subscribers.delete(socket);
|
|
||||||
|
|
||||||
if (subscribers.size === 0) {
|
|
||||||
this.#sockets.delete(topic);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Add a topic to a WebSocket set in the state. */
|
|
||||||
#addTopic(socket: WebSocket, topic: Topic): void {
|
|
||||||
let topics = this.#topics.get(socket);
|
|
||||||
|
|
||||||
if (!topics) {
|
|
||||||
topics = new Set<Topic>();
|
|
||||||
this.#topics.set(socket, topics);
|
|
||||||
}
|
|
||||||
|
|
||||||
topics.add(topic);
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Remove a topic from a WebSocket set in the state. */
|
|
||||||
#removeTopic(socket: WebSocket, topic: Topic): void {
|
|
||||||
const topics = this.#topics.get(socket);
|
|
||||||
|
|
||||||
if (topics) {
|
|
||||||
topics.delete(topic);
|
|
||||||
|
|
||||||
if (topics.size === 0) {
|
|
||||||
this.#topics.delete(socket);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Convert the "stream" parameter into a "topic".
|
|
||||||
* The stream parameter is part of the public-facing API, while the topic is internal.
|
|
||||||
*/
|
|
||||||
function getTopic(conn: StreamConn, sub: StreamSub): Topic | undefined {
|
|
||||||
switch (sub.stream) {
|
|
||||||
case 'public':
|
|
||||||
case 'public:local':
|
|
||||||
return sub.stream;
|
|
||||||
default:
|
|
||||||
if (!conn.pubkey) {
|
|
||||||
return;
|
|
||||||
// Nostr signing topics contain the session ID for privacy reasons.
|
|
||||||
} else if (sub.stream === 'nostr') {
|
|
||||||
return conn.session ? `nostr:${conn.pubkey}:${conn.session}` : undefined;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
const ws = new WebSocketConnections();
|
|
||||||
|
|
||||||
export { type Stream, streamSchema, ws };
|
|
|
@ -102,7 +102,6 @@ function isFollowing(source: Event<3>, targetPubkey: string): boolean {
|
||||||
}
|
}
|
||||||
|
|
||||||
export {
|
export {
|
||||||
bech32ToPubkey,
|
|
||||||
eventAge,
|
eventAge,
|
||||||
eventDateComparator,
|
eventDateComparator,
|
||||||
findTag,
|
findTag,
|
||||||
|
|
Loading…
Reference in New Issue