diff --git a/tringifyStable from npm:fast-stable-stringify b/tringifyStable from npm:fast-stable-stringify deleted file mode 100644 index edb0958..0000000 --- a/tringifyStable from npm:fast-stable-stringify +++ /dev/null @@ -1,307 +0,0 @@ -diff --git a/src/client.ts b/src/client.ts -index 970a077..3cf2e8a 100644 ---- a/src/client.ts -+++ b/src/client.ts -@@ -14,7 +14,7 @@ function getFilters(filters: Filter[], opts: GetFiltersOpts -  - const unsub = pool.subscribe( - filters, -- activeRelays, -+ opts.relays ?? activeRelays, - (event: Event | null) => { - if (event && matchFilters(filters, event)) { - pipeline.handleEvent(event).catch(() => {}); -diff --git a/src/common.ts b/src/common.ts -new file mode 100644 -index 0000000..0424b52 ---- /dev/null -+++ b/src/common.ts -@@ -0,0 +1,9 @@ -+import { Reqmeister } from '@/reqmeister.ts'; -+import { Time } from '@/utils/time.ts'; -+ -+const reqmeister = new Reqmeister({ -+ delay: Time.seconds(1), -+ signal: AbortSignal.timeout(Time.seconds(1)), -+}); -+ -+export { reqmeister }; -diff --git a/src/deps.ts b/src/deps.ts -index b9db9e2..4a9314b 100644 ---- a/src/deps.ts -+++ b/src/deps.ts -@@ -81,5 +81,7 @@ export { Machina } from 'https://gitlab.com/soapbox-pub/nostr-machina/-/raw/08a1 - export * as Sentry from 'https://deno.land/x/sentry@7.78.0/index.js'; - export { sentry as sentryMiddleware } from 'npm:@hono/sentry@^1.0.0'; - export * as Comlink from 'npm:comlink@^4.4.1'; -+export { EventEmitter } from 'npm:tseep@^1.1.3'; -+export { default as stringifyStable } from 'npm:fast-stable-stringify@^1.0.0'; -  - export type * as TypeFest from 'npm:type-fest@^4.3.0'; -diff --git a/src/filter.ts b/src/filter.ts -index fb43251..38fcff7 100644 ---- a/src/filter.ts -+++ b/src/filter.ts -@@ -1,5 +1,5 @@ - import { Conf } from '@/config.ts'; --import { type Event, type Filter, matchFilters } from '@/deps.ts'; -+import { type Event, type Filter, matchFilters, stringifyStable } from '@/deps.ts'; -  - import type { EventData } from '@/types.ts'; -  -@@ -14,12 +14,17 @@ interface DittoFilter extends Filter { - relations?: Relation[]; - } -  -+/** Filter to get one specific event. */ -+type MicroFilter = { ids: [Event['id']] } | { kinds: [0]; authors: [Event['pubkey']] }; -+ - /** Additional options to apply to the whole subscription. */ - interface GetFiltersOpts { - /** Signal to abort the request. */ - signal?: AbortSignal; - /** Event limit for the whole subscription. */ - limit?: number; -+ /** Relays to use, if applicable. */ -+ relays?: WebSocket['url'][]; - } -  - function matchDittoFilter(filter: DittoFilter, event: Event, data: EventData): boolean { -@@ -44,4 +49,33 @@ function matchDittoFilters(filters: DittoFilter[], event: Event, data: EventData - return false; - } -  --export { type DittoFilter, type GetFiltersOpts, matchDittoFilters, type Relation }; -+/** Get deterministic ID for a microfilter. */ -+function getFilterId(filter: MicroFilter): string { -+ if ('ids' in filter) { -+ return stringifyStable({ ids: [filter.ids] }); -+ } else { -+ return stringifyStable({ -+ kinds: [filter.kinds[0]], -+ authors: [filter.authors[0]], -+ }); -+ } -+} -+ -+/** Get a microfilter from a Nostr event. */ -+function eventToMicroFilter(event: Event): MicroFilter { -+ if (event.kind === 0) { -+ return { kinds: [0], authors: [event.pubkey] }; -+ } else { -+ return { ids: [event.id] }; -+ } -+} -+ -+export { -+ type DittoFilter, -+ eventToMicroFilter, -+ getFilterId, -+ type GetFiltersOpts, -+ matchDittoFilters, -+ type MicroFilter, -+ type Relation, -+}; -diff --git a/src/pipeline.ts b/src/pipeline.ts -index adf8a84..923bf4e 100644 ---- a/src/pipeline.ts -+++ b/src/pipeline.ts -@@ -1,3 +1,4 @@ -+import { reqmeister } from '@/common.ts'; - import { Conf } from '@/config.ts'; - import * as eventsDB from '@/db/events.ts'; - import { addRelays } from '@/db/relays.ts'; -@@ -23,15 +24,17 @@ import type { EventData } from '@/types.ts'; - */ - async function handleEvent(event: Event): Promise { - if (!(await verifySignatureWorker(event))) return; -+ const wanted = reqmeister.isWanted(event); - if (encounterEvent(event)) return; - console.info(`pipeline: Event<${event.kind}> ${event.id}`); - const data = await getEventData(event); -  - await Promise.all([ -- storeEvent(event, data), -+ storeEvent(event, data, { force: wanted }), - processDeletions(event), - trackRelays(event), - trackHashtags(event), -+ fetchRelatedEvents(event, data), - processMedia(event, data), - streamOut(event, data), - broadcast(event, data), -@@ -39,13 +42,14 @@ async function handleEvent(event: Event): Promise { - } -  - /** Tracks encountered events to skip duplicates, improving idempotency and performance. */ --const encounters = new LRUCache({ max: 1000 }); -+const encounters = new LRUCache({ max: 1000 }); -  - /** Encounter the event, and return whether it has already been encountered. */ --function encounterEvent(event: Event) { -+function encounterEvent(event: Event): boolean { - const result = encounters.get(event.id); - encounters.set(event.id, true); -- return result; -+ reqmeister.encounter(event); -+ return !!result; - } -  - /** Preload data that will be useful to several tasks. */ -@@ -57,11 +61,16 @@ async function getEventData({ pubkey }: Event): Promise { - /** Check if the pubkey is the `DITTO_NSEC` pubkey. */ - const isAdminEvent = ({ pubkey }: Event): boolean => pubkey === Conf.pubkey; -  -+interface StoreEventOpts { -+ force?: boolean; -+} -+ - /** Maybe store the event, if eligible. */ --async function storeEvent(event: Event, data: EventData): Promise { -+async function storeEvent(event: Event, data: EventData, opts: StoreEventOpts = {}): Promise { - if (isEphemeralKind(event.kind)) return; -+ const { force = false } = opts; -  -- if (data.user || isAdminEvent(event) || await isLocallyFollowed(event.pubkey)) { -+ if (force || data.user || isAdminEvent(event) || await isLocallyFollowed(event.pubkey)) { - const [deletion] = await mixer.getFilters( - [{ kinds: [5], authors: [event.pubkey], '#e': [event.id], limit: 1 }], - { limit: 1, signal: AbortSignal.timeout(Time.seconds(1)) }, -@@ -129,6 +138,18 @@ function trackRelays(event: Event) { - return addRelays([...relays]); - } -  -+/** Queue related events to fetch. */ -+function fetchRelatedEvents(event: Event, data: EventData) { -+ if (!data.user) { -+ reqmeister.req({ kinds: [0], authors: [event.pubkey] }).catch(() => {}); -+ } -+ for (const [name, id, relay] of event.tags) { -+ if (name === 'e' && !encounters.has(id)) { -+ reqmeister.req({ ids: [id] }, [relay]).catch(() => {}); -+ } -+ } -+} -+ - /** Delete unattached media entries that are attached to the event. */ - function processMedia({ tags, pubkey }: Event, { user }: EventData) { - if (user) { -diff --git a/src/queries.ts b/src/queries.ts -index fc7365a..1ecff7b 100644 ---- a/src/queries.ts -+++ b/src/queries.ts -@@ -2,6 +2,7 @@ import * as eventsDB from '@/db/events.ts'; - import { type Event, findReplyTag } from '@/deps.ts'; - import { type DittoFilter, type Relation } from '@/filter.ts'; - import * as mixer from '@/mixer.ts'; -+import { reqmeister } from '@/common.ts'; -  - interface GetEventOpts { - /** Signal to abort the request. */ -@@ -30,10 +31,10 @@ const getEvent = async ( - const getAuthor = async (pubkey: string, opts: GetEventOpts<0> = {}): Promise | undefined> => { - const { relations, signal = AbortSignal.timeout(1000) } = opts; -  -- const [event] = await mixer.getFilters( -+ const event = await eventsDB.getFilters( - [{ authors: [pubkey], relations, kinds: [0], limit: 1 }], - { limit: 1, signal }, -- ); -+ ).then(([event]) => event) || await reqmeister.req({ kinds: [0], authors: [pubkey] }).catch(() => {}); -  - return event; - }; -diff --git a/src/reqmeister.ts b/src/reqmeister.ts -new file mode 100644 -index 0000000..960151f ---- /dev/null -+++ b/src/reqmeister.ts -@@ -0,0 +1,88 @@ -+import * as client from '@/client.ts'; -+import { type Event, EventEmitter, type Filter } from '@/deps.ts'; -+ -+import { eventToMicroFilter, getFilterId, type MicroFilter } from '@/filter.ts'; -+ -+interface ReqmeisterOpts { -+ delay?: number; -+ signal?: AbortSignal; -+} -+ -+type ReqmeisterQueueItem = [string, MicroFilter, WebSocket['url'][]]; -+ -+class Reqmeister extends EventEmitter<{ [filterId: string]: (event: Event) => any }> { -+ #opts: ReqmeisterOpts; -+ #queue: ReqmeisterQueueItem[] = []; -+ #promise!: Promise; -+ #resolve!: () => void; -+ -+ constructor(opts: ReqmeisterOpts = {}) { -+ super(); -+ this.#opts = opts; -+ this.#cycle(); -+ this.#perform(); -+ } -+ -+ #cycle() { -+ this.#resolve?.(); -+ this.#promise = new Promise((resolve) => { -+ this.#resolve = resolve; -+ }); -+ } -+ -+ async #perform() { -+ const { delay } = this.#opts; -+ await new Promise((resolve) => setTimeout(resolve, delay)); -+ -+ const queue = this.#queue; -+ this.#queue = []; -+ -+ const wantedEvents = new Set(); -+ const wantedAuthors = new Set(); -+ -+ // TODO: batch by relays. -+ for (const [_filterId, filter, _relays] of queue) { -+ if ('ids' in filter) { -+ filter.ids.forEach((id) => wantedEvents.add(id)); -+ } else { -+ wantedAuthors.add(filter.authors[0]); -+ } -+ } -+ -+ const filters: Filter[] = []; -+ -+ if (wantedEvents.size) filters.push({ ids: [...wantedEvents] }); -+ if (wantedAuthors.size) filters.push({ kinds: [0], authors: [...wantedAuthors] }); -+ -+ const events = await client.getFilters(filters, { signal: this.#opts.signal }); -+ -+ for (const event of events) { -+ this.encounter(event); -+ } -+ -+ this.#cycle(); -+ this.#perform(); -+ } -+ -+ req(filter: MicroFilter, relays: WebSocket['url'][] = []): Promise { -+ const filterId = getFilterId(filter); -+ this.#queue.push([filterId, filter, relays]); -+ return new Promise((resolve, reject) => { -+ this.once(filterId, resolve); -+ this.#promise.finally(reject); -+ }); -+ } -+ -+ encounter(event: Event): void { -+ const filterId = getFilterId(eventToMicroFilter(event)); -+ this.#queue = this.#queue.filter(([id]) => id !== filterId); -+ this.emit(filterId, event); -+ } -+ -+ isWanted(event: Event): boolean { -+ const filterId = getFilterId(eventToMicroFilter(event)); -+ return this.#queue.some(([id]) => id === filterId); -+ } -+} -+ -+export { Reqmeister };