From e5e737faae58a470e7f1fa8675ed2025bd427d59 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Tue, 26 Dec 2023 16:23:24 -0600 Subject: [PATCH] Reqmeister: add a jsdoc --- src/reqmeister.ts | 1 + tringifyStable from npm:fast-stable-stringify | 307 ++++++++++++++++++ 2 files changed, 308 insertions(+) create mode 100644 tringifyStable from npm:fast-stable-stringify diff --git a/src/reqmeister.ts b/src/reqmeister.ts index 960151f..7830b84 100644 --- a/src/reqmeister.ts +++ b/src/reqmeister.ts @@ -10,6 +10,7 @@ interface ReqmeisterOpts { type ReqmeisterQueueItem = [string, MicroFilter, WebSocket['url'][]]; +/** Batches requests to Nostr relays using microfilters. */ class Reqmeister extends EventEmitter<{ [filterId: string]: (event: Event) => any }> { #opts: ReqmeisterOpts; #queue: ReqmeisterQueueItem[] = []; diff --git a/tringifyStable from npm:fast-stable-stringify b/tringifyStable from npm:fast-stable-stringify new file mode 100644 index 0000000..edb0958 --- /dev/null +++ b/tringifyStable from npm:fast-stable-stringify @@ -0,0 +1,307 @@ +diff --git a/src/client.ts b/src/client.ts +index 970a077..3cf2e8a 100644 +--- a/src/client.ts ++++ b/src/client.ts +@@ -14,7 +14,7 @@ function getFilters(filters: Filter[], opts: GetFiltersOpts +  + const unsub = pool.subscribe( + filters, +- activeRelays, ++ opts.relays ?? activeRelays, + (event: Event | null) => { + if (event && matchFilters(filters, event)) { + pipeline.handleEvent(event).catch(() => {}); +diff --git a/src/common.ts b/src/common.ts +new file mode 100644 +index 0000000..0424b52 +--- /dev/null ++++ b/src/common.ts +@@ -0,0 +1,9 @@ ++import { Reqmeister } from '@/reqmeister.ts'; ++import { Time } from '@/utils/time.ts'; ++ ++const reqmeister = new Reqmeister({ ++ delay: Time.seconds(1), ++ signal: AbortSignal.timeout(Time.seconds(1)), ++}); ++ ++export { reqmeister }; +diff --git a/src/deps.ts b/src/deps.ts +index b9db9e2..4a9314b 100644 +--- a/src/deps.ts ++++ b/src/deps.ts +@@ -81,5 +81,7 @@ export { Machina } from 'https://gitlab.com/soapbox-pub/nostr-machina/-/raw/08a1 + export * as Sentry from 'https://deno.land/x/sentry@7.78.0/index.js'; + export { sentry as sentryMiddleware } from 'npm:@hono/sentry@^1.0.0'; + export * as Comlink from 'npm:comlink@^4.4.1'; ++export { EventEmitter } from 'npm:tseep@^1.1.3'; ++export { default as stringifyStable } from 'npm:fast-stable-stringify@^1.0.0'; +  + export type * as TypeFest from 'npm:type-fest@^4.3.0'; +diff --git a/src/filter.ts b/src/filter.ts +index fb43251..38fcff7 100644 +--- a/src/filter.ts ++++ b/src/filter.ts +@@ -1,5 +1,5 @@ + import { Conf } from '@/config.ts'; +-import { type Event, type Filter, matchFilters } from '@/deps.ts'; ++import { type Event, type Filter, matchFilters, stringifyStable } from '@/deps.ts'; +  + import type { EventData } from '@/types.ts'; +  +@@ -14,12 +14,17 @@ interface DittoFilter extends Filter { + relations?: Relation[]; + } +  ++/** Filter to get one specific event. */ ++type MicroFilter = { ids: [Event['id']] } | { kinds: [0]; authors: [Event['pubkey']] }; ++ + /** Additional options to apply to the whole subscription. */ + interface GetFiltersOpts { + /** Signal to abort the request. */ + signal?: AbortSignal; + /** Event limit for the whole subscription. */ + limit?: number; ++ /** Relays to use, if applicable. */ ++ relays?: WebSocket['url'][]; + } +  + function matchDittoFilter(filter: DittoFilter, event: Event, data: EventData): boolean { +@@ -44,4 +49,33 @@ function matchDittoFilters(filters: DittoFilter[], event: Event, data: EventData + return false; + } +  +-export { type DittoFilter, type GetFiltersOpts, matchDittoFilters, type Relation }; ++/** Get deterministic ID for a microfilter. */ ++function getFilterId(filter: MicroFilter): string { ++ if ('ids' in filter) { ++ return stringifyStable({ ids: [filter.ids] }); ++ } else { ++ return stringifyStable({ ++ kinds: [filter.kinds[0]], ++ authors: [filter.authors[0]], ++ }); ++ } ++} ++ ++/** Get a microfilter from a Nostr event. */ ++function eventToMicroFilter(event: Event): MicroFilter { ++ if (event.kind === 0) { ++ return { kinds: [0], authors: [event.pubkey] }; ++ } else { ++ return { ids: [event.id] }; ++ } ++} ++ ++export { ++ type DittoFilter, ++ eventToMicroFilter, ++ getFilterId, ++ type GetFiltersOpts, ++ matchDittoFilters, ++ type MicroFilter, ++ type Relation, ++}; +diff --git a/src/pipeline.ts b/src/pipeline.ts +index adf8a84..923bf4e 100644 +--- a/src/pipeline.ts ++++ b/src/pipeline.ts +@@ -1,3 +1,4 @@ ++import { reqmeister } from '@/common.ts'; + import { Conf } from '@/config.ts'; + import * as eventsDB from '@/db/events.ts'; + import { addRelays } from '@/db/relays.ts'; +@@ -23,15 +24,17 @@ import type { EventData } from '@/types.ts'; + */ + async function handleEvent(event: Event): Promise { + if (!(await verifySignatureWorker(event))) return; ++ const wanted = reqmeister.isWanted(event); + if (encounterEvent(event)) return; + console.info(`pipeline: Event<${event.kind}> ${event.id}`); + const data = await getEventData(event); +  + await Promise.all([ +- storeEvent(event, data), ++ storeEvent(event, data, { force: wanted }), + processDeletions(event), + trackRelays(event), + trackHashtags(event), ++ fetchRelatedEvents(event, data), + processMedia(event, data), + streamOut(event, data), + broadcast(event, data), +@@ -39,13 +42,14 @@ async function handleEvent(event: Event): Promise { + } +  + /** Tracks encountered events to skip duplicates, improving idempotency and performance. */ +-const encounters = new LRUCache({ max: 1000 }); ++const encounters = new LRUCache({ max: 1000 }); +  + /** Encounter the event, and return whether it has already been encountered. */ +-function encounterEvent(event: Event) { ++function encounterEvent(event: Event): boolean { + const result = encounters.get(event.id); + encounters.set(event.id, true); +- return result; ++ reqmeister.encounter(event); ++ return !!result; + } +  + /** Preload data that will be useful to several tasks. */ +@@ -57,11 +61,16 @@ async function getEventData({ pubkey }: Event): Promise { + /** Check if the pubkey is the `DITTO_NSEC` pubkey. */ + const isAdminEvent = ({ pubkey }: Event): boolean => pubkey === Conf.pubkey; +  ++interface StoreEventOpts { ++ force?: boolean; ++} ++ + /** Maybe store the event, if eligible. */ +-async function storeEvent(event: Event, data: EventData): Promise { ++async function storeEvent(event: Event, data: EventData, opts: StoreEventOpts = {}): Promise { + if (isEphemeralKind(event.kind)) return; ++ const { force = false } = opts; +  +- if (data.user || isAdminEvent(event) || await isLocallyFollowed(event.pubkey)) { ++ if (force || data.user || isAdminEvent(event) || await isLocallyFollowed(event.pubkey)) { + const [deletion] = await mixer.getFilters( + [{ kinds: [5], authors: [event.pubkey], '#e': [event.id], limit: 1 }], + { limit: 1, signal: AbortSignal.timeout(Time.seconds(1)) }, +@@ -129,6 +138,18 @@ function trackRelays(event: Event) { + return addRelays([...relays]); + } +  ++/** Queue related events to fetch. */ ++function fetchRelatedEvents(event: Event, data: EventData) { ++ if (!data.user) { ++ reqmeister.req({ kinds: [0], authors: [event.pubkey] }).catch(() => {}); ++ } ++ for (const [name, id, relay] of event.tags) { ++ if (name === 'e' && !encounters.has(id)) { ++ reqmeister.req({ ids: [id] }, [relay]).catch(() => {}); ++ } ++ } ++} ++ + /** Delete unattached media entries that are attached to the event. */ + function processMedia({ tags, pubkey }: Event, { user }: EventData) { + if (user) { +diff --git a/src/queries.ts b/src/queries.ts +index fc7365a..1ecff7b 100644 +--- a/src/queries.ts ++++ b/src/queries.ts +@@ -2,6 +2,7 @@ import * as eventsDB from '@/db/events.ts'; + import { type Event, findReplyTag } from '@/deps.ts'; + import { type DittoFilter, type Relation } from '@/filter.ts'; + import * as mixer from '@/mixer.ts'; ++import { reqmeister } from '@/common.ts'; +  + interface GetEventOpts { + /** Signal to abort the request. */ +@@ -30,10 +31,10 @@ const getEvent = async ( + const getAuthor = async (pubkey: string, opts: GetEventOpts<0> = {}): Promise | undefined> => { + const { relations, signal = AbortSignal.timeout(1000) } = opts; +  +- const [event] = await mixer.getFilters( ++ const event = await eventsDB.getFilters( + [{ authors: [pubkey], relations, kinds: [0], limit: 1 }], + { limit: 1, signal }, +- ); ++ ).then(([event]) => event) || await reqmeister.req({ kinds: [0], authors: [pubkey] }).catch(() => {}); +  + return event; + }; +diff --git a/src/reqmeister.ts b/src/reqmeister.ts +new file mode 100644 +index 0000000..960151f +--- /dev/null ++++ b/src/reqmeister.ts +@@ -0,0 +1,88 @@ ++import * as client from '@/client.ts'; ++import { type Event, EventEmitter, type Filter } from '@/deps.ts'; ++ ++import { eventToMicroFilter, getFilterId, type MicroFilter } from '@/filter.ts'; ++ ++interface ReqmeisterOpts { ++ delay?: number; ++ signal?: AbortSignal; ++} ++ ++type ReqmeisterQueueItem = [string, MicroFilter, WebSocket['url'][]]; ++ ++class Reqmeister extends EventEmitter<{ [filterId: string]: (event: Event) => any }> { ++ #opts: ReqmeisterOpts; ++ #queue: ReqmeisterQueueItem[] = []; ++ #promise!: Promise; ++ #resolve!: () => void; ++ ++ constructor(opts: ReqmeisterOpts = {}) { ++ super(); ++ this.#opts = opts; ++ this.#cycle(); ++ this.#perform(); ++ } ++ ++ #cycle() { ++ this.#resolve?.(); ++ this.#promise = new Promise((resolve) => { ++ this.#resolve = resolve; ++ }); ++ } ++ ++ async #perform() { ++ const { delay } = this.#opts; ++ await new Promise((resolve) => setTimeout(resolve, delay)); ++ ++ const queue = this.#queue; ++ this.#queue = []; ++ ++ const wantedEvents = new Set(); ++ const wantedAuthors = new Set(); ++ ++ // TODO: batch by relays. ++ for (const [_filterId, filter, _relays] of queue) { ++ if ('ids' in filter) { ++ filter.ids.forEach((id) => wantedEvents.add(id)); ++ } else { ++ wantedAuthors.add(filter.authors[0]); ++ } ++ } ++ ++ const filters: Filter[] = []; ++ ++ if (wantedEvents.size) filters.push({ ids: [...wantedEvents] }); ++ if (wantedAuthors.size) filters.push({ kinds: [0], authors: [...wantedAuthors] }); ++ ++ const events = await client.getFilters(filters, { signal: this.#opts.signal }); ++ ++ for (const event of events) { ++ this.encounter(event); ++ } ++ ++ this.#cycle(); ++ this.#perform(); ++ } ++ ++ req(filter: MicroFilter, relays: WebSocket['url'][] = []): Promise { ++ const filterId = getFilterId(filter); ++ this.#queue.push([filterId, filter, relays]); ++ return new Promise((resolve, reject) => { ++ this.once(filterId, resolve); ++ this.#promise.finally(reject); ++ }); ++ } ++ ++ encounter(event: Event): void { ++ const filterId = getFilterId(eventToMicroFilter(event)); ++ this.#queue = this.#queue.filter(([id]) => id !== filterId); ++ this.emit(filterId, event); ++ } ++ ++ isWanted(event: Event): boolean { ++ const filterId = getFilterId(eventToMicroFilter(event)); ++ return this.#queue.some(([id]) => id === filterId); ++ } ++} ++ ++export { Reqmeister };