ditto/src/storages/InternalRelay.ts

74 lines
1.9 KiB
TypeScript
Raw Normal View History

2024-04-25 11:54:25 -05:00
// deno-lint-ignore-file require-await
import {
NIP50,
NostrEvent,
NostrFilter,
NostrRelayCLOSED,
NostrRelayEOSE,
NostrRelayEVENT,
NRelay,
} from '@nostrify/nostrify';
import { Machina } from '@nostrify/nostrify/utils';
2024-04-25 11:54:25 -05:00
import { matchFilter } from '@/deps.ts';
import { DittoEvent } from '@/interfaces/DittoEvent.ts';
import { purifyEvent } from '@/storages/hydrate.ts';
2024-04-25 11:54:25 -05:00
/**
 * In-process PubSub relay for streaming events within the application.
 *
 * The pipeline pushes events in through {@link InternalRelay.event}; any other part of
 * the application can receive them by iterating {@link InternalRelay.req}.
 * Nothing is stored: {@link InternalRelay.query} always returns an empty list.
 */
export class InternalRelay implements NRelay {
  /** Active subscriptions, keyed by the subscription id generated in `req`. */
  private readonly subs = new Map<string, { filters: NostrFilter[]; machina: Machina<NostrEvent> }>();

  /**
   * Open a live subscription.
   *
   * Yields a single `EOSE` message first (there are no stored events to replay),
   * then an `EVENT` message for every event passed to `event()` that is accepted
   * by at least one of `filters`.
   *
   * @param filters Filters an event must satisfy (any one of them) to be delivered.
   * @param opts.signal Passed to the underlying `Machina` queue; presumably aborting it
   *   ends the iteration — confirm against the Machina implementation.
   */
  async *req(
    filters: NostrFilter[],
    opts?: { signal?: AbortSignal },
  ): AsyncGenerator<NostrRelayEVENT | NostrRelayEOSE | NostrRelayCLOSED> {
    const id = crypto.randomUUID();
    const machina = new Machina<NostrEvent>(opts?.signal);

    // Register BEFORE yielding EOSE. A generator is suspended at `yield`, so if the
    // subscription were registered afterwards, every event published while the
    // consumer is still handling EOSE would be silently dropped.
    this.subs.set(id, { filters, machina });

    try {
      yield ['EOSE', id];

      for await (const event of machina) {
        yield ['EVENT', id, event];
      }
    } finally {
      // Runs when the consumer stops iterating (break/return) or the loop throws.
      this.subs.delete(id);
    }
  }

  /**
   * Publish an event to every subscription that has at least one accepting filter.
   * Each subscription receives the event at most once, as the result of `purifyEvent(event)`.
   */
  async event(event: DittoEvent): Promise<void> {
    for (const { filters, machina } of this.subs.values()) {
      if (filters.some((filter) => InternalRelay.accepts(filter, event))) {
        machina.push(purifyEvent(event));
      }
    }
  }

  /** This relay keeps no history, so there is never anything to return. */
  async query(): Promise<NostrEvent[]> {
    return [];
  }

  /**
   * Whether a single filter accepts the event.
   *
   * The event must pass `matchFilter`. If the filter also carries a `search` string,
   * the value of its `domain` token must equal `event.author_domain`.
   *
   * NOTE(review): when `search` has no `domain` token the comparison is against
   * `undefined`, so only events without an `author_domain` are accepted — confirm
   * this is the intended behaviour for non-domain searches.
   */
  private static accepts(filter: NostrFilter, event: DittoEvent): boolean {
    if (!matchFilter(filter, event)) {
      return false;
    }
    if (!filter.search) {
      return true;
    }

    const tokens = NIP50.parseInput(filter.search);
    const domain = (tokens.find((t) =>
      typeof t === 'object' && t.key === 'domain'
    ) as { key: 'domain'; value: string } | undefined)?.value;

    return domain === event.author_domain;
  }
}