Make MastoAPI streaming work for public feeds
This commit is contained in:
parent
d1117f5513
commit
f7cd67c572
|
@ -1,6 +1,10 @@
|
||||||
import { AppController } from '@/app.ts';
|
import { AppController } from '@/app.ts';
|
||||||
|
import { type Event } from '@/deps.ts';
|
||||||
|
import { type DittoFilter } from '@/filter.ts';
|
||||||
import { TOKEN_REGEX } from '@/middleware/auth19.ts';
|
import { TOKEN_REGEX } from '@/middleware/auth19.ts';
|
||||||
import { streamSchema, ws } from '@/stream.ts';
|
import { streamSchema, ws } from '@/stream.ts';
|
||||||
|
import { Sub } from '@/subs.ts';
|
||||||
|
import { toStatus } from '@/transformers/nostr-to-mastoapi.ts';
|
||||||
import { bech32ToPubkey } from '@/utils.ts';
|
import { bech32ToPubkey } from '@/utils.ts';
|
||||||
|
|
||||||
const streamingController: AppController = (c) => {
|
const streamingController: AppController = (c) => {
|
||||||
|
@ -29,10 +33,30 @@ const streamingController: AppController = (c) => {
|
||||||
pubkey: bech32ToPubkey(match[1]),
|
pubkey: bech32ToPubkey(match[1]),
|
||||||
};
|
};
|
||||||
|
|
||||||
socket.addEventListener('open', () => {
|
function send(name: string, payload: object) {
|
||||||
|
if (socket.readyState === WebSocket.OPEN) {
|
||||||
|
socket.send(JSON.stringify({
|
||||||
|
event: name,
|
||||||
|
payload: JSON.stringify(payload),
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
socket.addEventListener('open', async () => {
|
||||||
console.log('websocket: connection opened');
|
console.log('websocket: connection opened');
|
||||||
if (stream) {
|
if (!stream) return;
|
||||||
|
|
||||||
ws.subscribe(conn, { stream });
|
ws.subscribe(conn, { stream });
|
||||||
|
|
||||||
|
const filter = topicToFilter(stream);
|
||||||
|
|
||||||
|
if (filter) {
|
||||||
|
for await (const event of Sub.sub(socket, '1', [filter])) {
|
||||||
|
const status = await toStatus(event);
|
||||||
|
if (status) {
|
||||||
|
send('update', status);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -46,4 +70,13 @@ const streamingController: AppController = (c) => {
|
||||||
return response;
|
return response;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
function topicToFilter(topic: string): DittoFilter<1> | undefined {
|
||||||
|
switch (topic) {
|
||||||
|
case 'public':
|
||||||
|
return { kinds: [1] };
|
||||||
|
case 'public:local':
|
||||||
|
return { kinds: [1], local: true };
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
export { streamingController };
|
export { streamingController };
|
||||||
|
|
|
@ -20,7 +20,7 @@ class SubscriptionStore {
|
||||||
* }
|
* }
|
||||||
* ```
|
* ```
|
||||||
*/
|
*/
|
||||||
sub(socket: WebSocket, id: string, filters: DittoFilter[]): Subscription {
|
sub<K extends number>(socket: WebSocket, id: string, filters: DittoFilter<K>[]): Subscription<K> {
|
||||||
let subs = this.#store.get(socket);
|
let subs = this.#store.get(socket);
|
||||||
|
|
||||||
if (!subs) {
|
if (!subs) {
|
||||||
|
@ -31,7 +31,7 @@ class SubscriptionStore {
|
||||||
const sub = new Subscription(filters);
|
const sub = new Subscription(filters);
|
||||||
|
|
||||||
this.unsub(socket, id);
|
this.unsub(socket, id);
|
||||||
subs.set(id, sub);
|
subs.set(id, sub as unknown as Subscription);
|
||||||
|
|
||||||
return sub;
|
return sub;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue