Remove subs.ts & subscription.ts, refactor around it
This commit is contained in:
parent
8407583d83
commit
7a18a19b2f
|
@ -5,11 +5,11 @@ import { type AppController } from '@/app.ts';
|
|||
import { Conf } from '@/config.ts';
|
||||
import { Debug } from '@/deps.ts';
|
||||
import { getFeedPubkeys } from '@/queries.ts';
|
||||
import { Sub } from '@/subs.ts';
|
||||
import { bech32ToPubkey } from '@/utils.ts';
|
||||
import { renderReblog, renderStatus } from '@/views/mastodon/statuses.ts';
|
||||
import { hydrateEvents } from '@/storages/hydrate.ts';
|
||||
import { eventsDB } from '@/storages.ts';
|
||||
import { Storages } from '@/storages.ts';
|
||||
|
||||
const debug = Debug('ditto:streaming');
|
||||
|
||||
|
@ -38,6 +38,7 @@ const streamingController: AppController = (c) => {
|
|||
const upgrade = c.req.header('upgrade');
|
||||
const token = c.req.header('sec-websocket-protocol');
|
||||
const stream = streamSchema.optional().catch(undefined).parse(c.req.query('stream'));
|
||||
const controller = new AbortController();
|
||||
|
||||
if (upgrade?.toLowerCase() !== 'websocket') {
|
||||
return c.text('Please use websocket protocol', 400);
|
||||
|
@ -63,33 +64,37 @@ const streamingController: AppController = (c) => {
|
|||
|
||||
socket.onopen = async () => {
|
||||
if (!stream) return;
|
||||
|
||||
const filter = await topicToFilter(stream, c.req.query(), pubkey);
|
||||
if (!filter) return;
|
||||
|
||||
if (filter) {
|
||||
for await (const event of Sub.sub(socket, '1', [filter])) {
|
||||
if (event.kind === 6) {
|
||||
await hydrateEvents({
|
||||
events: [event],
|
||||
storage: eventsDB,
|
||||
signal: AbortSignal.timeout(1000),
|
||||
});
|
||||
for await (const msg of Storages.pubsub.req([filter], { signal: controller.signal })) {
|
||||
if (msg[0] === 'EVENT') {
|
||||
const [event] = await hydrateEvents({
|
||||
events: [msg[2]],
|
||||
storage: eventsDB,
|
||||
signal: AbortSignal.timeout(1000),
|
||||
});
|
||||
|
||||
const status = await renderReblog(event, { viewerPubkey: c.get('pubkey') });
|
||||
if (event.kind === 1) {
|
||||
const status = await renderStatus(event, { viewerPubkey: pubkey });
|
||||
if (status) {
|
||||
send('update', status);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
const status = await renderStatus(event, { viewerPubkey: pubkey });
|
||||
if (status) {
|
||||
send('update', status);
|
||||
|
||||
if (event.kind === 6) {
|
||||
const status = await renderReblog(event, { viewerPubkey: pubkey });
|
||||
if (status) {
|
||||
send('update', status);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
socket.onclose = () => {
|
||||
Sub.close(socket);
|
||||
controller.abort();
|
||||
};
|
||||
|
||||
return response;
|
||||
|
|
|
@ -11,8 +11,7 @@ import {
|
|||
clientMsgSchema,
|
||||
type ClientREQ,
|
||||
} from '@/schemas/nostr.ts';
|
||||
import { purifyEvent } from '@/storages/hydrate.ts';
|
||||
import { Sub } from '@/subs.ts';
|
||||
import { Storages } from '@/storages.ts';
|
||||
|
||||
import type { AppController } from '@/app.ts';
|
||||
|
||||
|
@ -29,6 +28,8 @@ type RelayMsg =
|
|||
|
||||
/** Set up the Websocket connection. */
|
||||
function connectStream(socket: WebSocket) {
|
||||
const controllers = new Map<string, AbortController>();
|
||||
|
||||
socket.onmessage = (e) => {
|
||||
const result = jsonSchema.pipe(clientMsgSchema).safeParse(e.data);
|
||||
if (result.success) {
|
||||
|
@ -39,7 +40,9 @@ function connectStream(socket: WebSocket) {
|
|||
};
|
||||
|
||||
socket.onclose = () => {
|
||||
Sub.close(socket);
|
||||
for (const controller of controllers.values()) {
|
||||
controller.abort();
|
||||
}
|
||||
};
|
||||
|
||||
/** Handle client message. */
|
||||
|
@ -64,14 +67,20 @@ function connectStream(socket: WebSocket) {
|
|||
async function handleReq([_, subId, ...rest]: ClientREQ): Promise<void> {
|
||||
const filters = prepareFilters(rest);
|
||||
|
||||
const controller = new AbortController();
|
||||
controllers.get(subId)?.abort();
|
||||
controllers.set(subId, controller);
|
||||
|
||||
for (const event of await eventsDB.query(filters, { limit: FILTER_LIMIT })) {
|
||||
send(['EVENT', subId, event]);
|
||||
}
|
||||
|
||||
send(['EOSE', subId]);
|
||||
|
||||
for await (const event of Sub.sub(socket, subId, filters)) {
|
||||
send(['EVENT', subId, purifyEvent(event)]);
|
||||
for await (const msg of Storages.pubsub.req(filters, { signal: controller.signal })) {
|
||||
if (msg[0] === 'EVENT') {
|
||||
send(['EVENT', subId, msg[2]]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -93,7 +102,11 @@ function connectStream(socket: WebSocket) {
|
|||
|
||||
/** Handle CLOSE. Close the subscription. */
|
||||
function handleClose([_, subId]: ClientCLOSE): void {
|
||||
Sub.unsub(socket, subId);
|
||||
const controller = controllers.get(subId);
|
||||
if (controller) {
|
||||
controller.abort();
|
||||
controllers.delete(subId);
|
||||
}
|
||||
}
|
||||
|
||||
/** Handle COUNT. Return the number of events matching the filters. */
|
||||
|
|
|
@ -9,8 +9,7 @@ import { isEphemeralKind } from '@/kinds.ts';
|
|||
import { DVM } from '@/pipeline/DVM.ts';
|
||||
import { updateStats } from '@/stats.ts';
|
||||
import { hydrateEvents, purifyEvent } from '@/storages/hydrate.ts';
|
||||
import { cache, eventsDB, reqmeister } from '@/storages.ts';
|
||||
import { Sub } from '@/subs.ts';
|
||||
import { cache, eventsDB, reqmeister, Storages } from '@/storages.ts';
|
||||
import { getTagSet } from '@/tags.ts';
|
||||
import { eventAge, isRelay, nostrDate, nostrNow, parseNip05, Time } from '@/utils.ts';
|
||||
import { fetchWorker } from '@/workers/fetch.ts';
|
||||
|
@ -269,14 +268,14 @@ async function payZap(event: DittoEvent, signal: AbortSignal) {
|
|||
}
|
||||
|
||||
/** Determine if the event is being received in a timely manner. */
|
||||
const isFresh = (event: NostrEvent): boolean => eventAge(event) < Time.seconds(10);
|
||||
function isFresh(event: NostrEvent): boolean {
|
||||
return eventAge(event) < Time.seconds(10);
|
||||
}
|
||||
|
||||
/** Distribute the event through active subscriptions. */
|
||||
function streamOut(event: NostrEvent) {
|
||||
if (!isFresh(event)) return;
|
||||
|
||||
for (const sub of Sub.matches(event)) {
|
||||
sub.stream(event);
|
||||
async function streamOut(event: NostrEvent): Promise<void> {
|
||||
if (isFresh(event)) {
|
||||
await Storages.pubsub.event(event);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -12,6 +12,7 @@ import {
|
|||
|
||||
import { matchFilter } from '@/deps.ts';
|
||||
import { DittoEvent } from '@/interfaces/DittoEvent.ts';
|
||||
import { purifyEvent } from '@/storages/hydrate.ts';
|
||||
|
||||
/**
|
||||
* PubSub event store for streaming events within the application.
|
||||
|
@ -20,9 +21,12 @@ import { DittoEvent } from '@/interfaces/DittoEvent.ts';
|
|||
export class InternalRelay implements NRelay {
|
||||
private subs = new Map<string, { filters: NostrFilter[]; machina: Machina<NostrEvent> }>();
|
||||
|
||||
async *req(filters: NostrFilter[]): AsyncGenerator<NostrRelayEVENT | NostrRelayEOSE | NostrRelayCLOSED> {
|
||||
async *req(
|
||||
filters: NostrFilter[],
|
||||
opts: { signal?: AbortSignal },
|
||||
): AsyncGenerator<NostrRelayEVENT | NostrRelayEOSE | NostrRelayCLOSED> {
|
||||
const id = crypto.randomUUID();
|
||||
const machina = new Machina<NostrEvent>();
|
||||
const machina = new Machina<NostrEvent>(opts?.signal);
|
||||
|
||||
yield ['EOSE', id];
|
||||
|
||||
|
@ -49,10 +53,10 @@ export class InternalRelay implements NRelay {
|
|||
) as { key: 'domain'; value: string } | undefined)?.value;
|
||||
|
||||
if (domain === event.author_domain) {
|
||||
return machina.push(event);
|
||||
return machina.push(purifyEvent(event));
|
||||
}
|
||||
} else {
|
||||
return machina.push(event);
|
||||
return machina.push(purifyEvent(event));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
84
src/subs.ts
84
src/subs.ts
|
@ -1,84 +0,0 @@
|
|||
import { NostrFilter } from '@nostrify/nostrify';
|
||||
import { Debug } from '@/deps.ts';
|
||||
import { type DittoEvent } from '@/interfaces/DittoEvent.ts';
|
||||
import { Subscription } from '@/subscription.ts';
|
||||
|
||||
const debug = Debug('ditto:subs');
|
||||
|
||||
/**
|
||||
* Manages Ditto event subscriptions.
|
||||
* Subscriptions can be added, removed, and matched against events.
|
||||
*/
|
||||
class SubscriptionStore {
|
||||
#store = new Map<unknown, Map<string, Subscription>>();
|
||||
|
||||
/**
|
||||
* Add a subscription to the store, and then iterate over it.
|
||||
*
|
||||
* ```ts
|
||||
* for (const event of Sub.sub(socket, subId, filters)) {
|
||||
* console.log(event);
|
||||
* }
|
||||
* ```
|
||||
*/
|
||||
sub(socket: unknown, id: string, filters: NostrFilter[]): Subscription {
|
||||
debug('sub', id, JSON.stringify(filters));
|
||||
let subs = this.#store.get(socket);
|
||||
|
||||
if (!subs) {
|
||||
subs = new Map();
|
||||
this.#store.set(socket, subs);
|
||||
}
|
||||
|
||||
const sub = new Subscription(filters);
|
||||
|
||||
this.unsub(socket, id);
|
||||
subs.set(id, sub as unknown as Subscription);
|
||||
|
||||
return sub;
|
||||
}
|
||||
|
||||
/** Remove a subscription from the store. */
|
||||
unsub(socket: unknown, id: string): void {
|
||||
debug('unsub', id);
|
||||
this.#store.get(socket)?.get(id)?.close();
|
||||
this.#store.get(socket)?.delete(id);
|
||||
}
|
||||
|
||||
/** Remove an entire socket. */
|
||||
close(socket: unknown): void {
|
||||
debug('close', (socket as any)?.constructor?.name);
|
||||
const subs = this.#store.get(socket);
|
||||
|
||||
if (subs) {
|
||||
for (const sub of subs.values()) {
|
||||
sub.close();
|
||||
}
|
||||
}
|
||||
|
||||
this.#store.delete(socket);
|
||||
}
|
||||
|
||||
/**
|
||||
* Loop through matching subscriptions to stream out.
|
||||
*
|
||||
* ```ts
|
||||
* for (const sub of Sub.matches(event, data)) {
|
||||
* sub.stream(event);
|
||||
* }
|
||||
* ```
|
||||
*/
|
||||
*matches(event: DittoEvent): Iterable<Subscription> {
|
||||
for (const subs of this.#store.values()) {
|
||||
for (const sub of subs.values()) {
|
||||
if (sub.matches(event)) {
|
||||
yield sub;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const Sub = new SubscriptionStore();
|
||||
|
||||
export { Sub };
|
|
@ -1,49 +0,0 @@
|
|||
import { NIP50, NostrEvent, NostrFilter } from '@nostrify/nostrify';
|
||||
import { Machina, matchFilter } from '@/deps.ts';
|
||||
import { type DittoEvent } from '@/interfaces/DittoEvent.ts';
|
||||
|
||||
class Subscription implements AsyncIterable<NostrEvent> {
|
||||
filters: NostrFilter[];
|
||||
#machina: Machina<NostrEvent>;
|
||||
|
||||
constructor(filters: NostrFilter[]) {
|
||||
this.filters = filters;
|
||||
this.#machina = new Machina();
|
||||
}
|
||||
|
||||
stream(event: NostrEvent): void {
|
||||
this.#machina.push(event);
|
||||
}
|
||||
|
||||
matches(event: DittoEvent): boolean {
|
||||
for (const filter of this.filters) {
|
||||
if (matchFilter(filter, event)) {
|
||||
if (filter.search) {
|
||||
const tokens = NIP50.parseInput(filter.search);
|
||||
|
||||
const domain = (tokens.find((t) =>
|
||||
typeof t === 'object' && t.key === 'domain'
|
||||
) as { key: 'domain'; value: string } | undefined)?.value;
|
||||
|
||||
if (domain) {
|
||||
return domain === event.author_domain;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
close() {
|
||||
this.#machina.close();
|
||||
}
|
||||
|
||||
[Symbol.asyncIterator]() {
|
||||
return this.#machina.stream();
|
||||
}
|
||||
}
|
||||
|
||||
export { Subscription };
|
Loading…
Reference in New Issue