ditto/src/storages/reqmeister.ts

162 lines
4.4 KiB
TypeScript
Raw Normal View History

import { Debug, type Event, EventEmitter, type Filter } from '@/deps.ts';
2024-01-03 15:28:03 -06:00
import {
AuthorMicrofilter,
eventToMicroFilter,
getFilterId,
IdMicrofilter,
isMicrofilter,
type MicroFilter,
} from '@/filter.ts';
2024-01-03 22:02:34 -06:00
import { type EventStore, GetEventsOpts } from '@/storages/types.ts';
import { Time } from '@/utils/time.ts';
2023-12-21 14:56:21 -06:00
interface ReqmeisterOpts {
2024-01-07 14:58:17 -06:00
client: EventStore;
2023-12-21 14:56:21 -06:00
delay?: number;
timeout?: number;
2023-12-21 14:56:21 -06:00
}
2023-12-28 13:41:04 -06:00
interface ReqmeisterReqOpts {
relays?: WebSocket['url'][];
signal?: AbortSignal;
}
2023-12-21 19:10:42 -06:00
type ReqmeisterQueueItem = [string, MicroFilter, WebSocket['url'][]];
2023-12-21 14:56:21 -06:00
2023-12-26 16:23:24 -06:00
/** Batches requests to Nostr relays using microfilters. */
2024-01-03 15:28:03 -06:00
class Reqmeister extends EventEmitter<{ [filterId: string]: (event: Event) => any }> implements EventStore {
2024-01-07 14:58:17 -06:00
#debug = Debug('ditto:reqmeister');
2023-12-21 19:10:42 -06:00
#opts: ReqmeisterOpts;
#queue: ReqmeisterQueueItem[] = [];
#promise!: Promise<void>;
#resolve!: () => void;
2023-12-21 14:56:21 -06:00
2024-01-03 21:39:54 -06:00
supportedNips = [];
2024-01-07 14:58:17 -06:00
constructor(opts: ReqmeisterOpts) {
2023-12-21 19:10:42 -06:00
super();
2023-12-21 14:56:21 -06:00
this.#opts = opts;
this.#tick();
2023-12-21 14:56:21 -06:00
this.#perform();
}
#tick() {
2023-12-21 19:10:42 -06:00
this.#resolve?.();
this.#promise = new Promise((resolve) => {
this.#resolve = resolve;
});
}
2023-12-21 14:56:21 -06:00
async #perform() {
2024-01-07 14:58:17 -06:00
const { client, delay, timeout = Time.seconds(1) } = this.#opts;
2023-12-21 19:10:42 -06:00
await new Promise((resolve) => setTimeout(resolve, delay));
2023-12-21 14:56:21 -06:00
2023-12-21 19:10:42 -06:00
const queue = this.#queue;
this.#queue = [];
const wantedEvents = new Set<Event['id']>();
const wantedAuthors = new Set<Event['pubkey']>();
2023-12-21 14:56:21 -06:00
2023-12-21 19:10:42 -06:00
// TODO: batch by relays.
for (const [_filterId, filter, _relays] of queue) {
if ('ids' in filter) {
filter.ids.forEach((id) => wantedEvents.add(id));
} else {
wantedAuthors.add(filter.authors[0]);
2023-12-21 14:56:21 -06:00
}
}
2023-12-21 19:10:42 -06:00
const filters: Filter[] = [];
2023-12-21 14:56:21 -06:00
2023-12-21 19:10:42 -06:00
if (wantedEvents.size) filters.push({ ids: [...wantedEvents] });
if (wantedAuthors.size) filters.push({ kinds: [0], authors: [...wantedAuthors] });
2023-12-21 14:56:21 -06:00
if (filters.length) {
2024-01-07 14:58:17 -06:00
this.#debug('REQ', JSON.stringify(filters));
2023-12-29 13:22:51 -06:00
const events = await client.getEvents(filters, { signal: AbortSignal.timeout(timeout) });
2023-12-21 14:56:21 -06:00
for (const event of events) {
this.encounter(event);
}
2023-12-21 14:56:21 -06:00
}
this.#tick();
2023-12-21 14:56:21 -06:00
this.#perform();
}
2023-12-28 13:41:04 -06:00
req(filter: IdMicrofilter, opts?: ReqmeisterReqOpts): Promise<Event>;
req(filter: AuthorMicrofilter, opts?: ReqmeisterReqOpts): Promise<Event<0>>;
req(filter: MicroFilter, opts?: ReqmeisterReqOpts): Promise<Event>;
req(filter: MicroFilter, opts: ReqmeisterReqOpts = {}): Promise<Event> {
const {
relays = [],
signal = AbortSignal.timeout(this.#opts.timeout ?? 1000),
} = opts;
if (signal.aborted) {
return Promise.reject(new DOMException('Aborted', 'AbortError'));
}
2023-12-28 13:41:04 -06:00
2023-12-21 19:10:42 -06:00
const filterId = getFilterId(filter);
2023-12-28 13:41:04 -06:00
2023-12-21 19:10:42 -06:00
this.#queue.push([filterId, filter, relays]);
2023-12-28 13:41:04 -06:00
2023-12-21 19:10:42 -06:00
return new Promise<Event>((resolve, reject) => {
const handleEvent = (event: Event) => {
resolve(event);
this.removeListener(filterId, handleEvent);
};
const handleAbort = () => {
reject(new DOMException('Aborted', 'AbortError'));
this.removeListener(filterId, resolve);
signal.removeEventListener('abort', handleAbort);
};
this.once(filterId, handleEvent);
signal.addEventListener('abort', handleAbort, { once: true });
2023-12-21 19:10:42 -06:00
});
2023-12-21 14:56:21 -06:00
}
encounter(event: Event): void {
2023-12-21 19:10:42 -06:00
const filterId = getFilterId(eventToMicroFilter(event));
this.#queue = this.#queue.filter(([id]) => id !== filterId);
this.emit(filterId, event);
2023-12-21 14:56:21 -06:00
}
isWanted(event: Event): boolean {
2023-12-21 19:10:42 -06:00
const filterId = getFilterId(eventToMicroFilter(event));
return this.#queue.some(([id]) => id === filterId);
2023-12-21 14:56:21 -06:00
}
2024-01-03 15:28:03 -06:00
getEvents<K extends number>(filters: Filter<K>[], opts?: GetEventsOpts | undefined): Promise<Event<K>[]> {
if (opts?.signal?.aborted) return Promise.resolve([]);
if (!filters.length) return Promise.resolve([]);
const promises = filters.reduce<Promise<Event<K>>[]>((result, filter) => {
if (isMicrofilter(filter)) {
result.push(this.req(filter) as Promise<Event<K>>);
}
return result;
}, []);
return Promise.all(promises);
}
storeEvent(event: Event): Promise<void> {
this.encounter(event);
return Promise.resolve();
}
countEvents(_filters: Filter[]): Promise<number> {
throw new Error('COUNT not implemented.');
}
deleteEvents(_filters: Filter[]): Promise<void> {
throw new Error('DELETE not implemented.');
}
2023-12-21 14:56:21 -06:00
}
export { Reqmeister };