Review notes (patch tools ignore text that precedes the first "diff --git" line):
* src/common.ts, src/reqmeister.ts: the AbortSignal.timeout() created once at import time aborted
  one second after startup and stayed aborted. A `timeout` option now creates a signal per batch.
* src/reqmeister.ts: a failed batch no longer stops the loop, empty batches send no request,
  requests made during a fetch are no longer rejected with the previous batch, rejected requests
  remove their listener, and isWanted() also covers the batch that was sent most recently.
* src/filter.ts: getFilterId() no longer nests the `ids` array.
* src/pipeline.ts: fetchRelatedEvents() skips `e` tags without an id and omits a missing relay hint.
* Generic type arguments were missing from the reviewed text. They are restored only where the
  visible code makes them unambiguous; other context and removed lines are reproduced as found
  (including the identical `encounters` pair). Indentation of context lines is reconstructed.
* The post-image hashes on the `index` lines of changed files are stale and need regenerating.

diff --git a/src/client.ts b/src/client.ts
index 970a077..3cf2e8a 100644
--- a/src/client.ts
+++ b/src/client.ts
@@ -14,7 +14,7 @@ function getFilters(filters: Filter[], opts: GetFiltersOpts
 
     const unsub = pool.subscribe(
       filters,
-      activeRelays,
+      opts.relays ?? activeRelays,
       (event: Event | null) => {
         if (event && matchFilters(filters, event)) {
           pipeline.handleEvent(event).catch(() => {});
diff --git a/src/common.ts b/src/common.ts
new file mode 100644
index 0000000..0424b52
--- /dev/null
+++ b/src/common.ts
@@ -0,0 +1,9 @@
+import { Reqmeister } from '@/reqmeister.ts';
+import { Time } from '@/utils/time.ts';
+
+const reqmeister = new Reqmeister({
+  delay: Time.seconds(1),
+  timeout: Time.seconds(1),
+});
+
+export { reqmeister };
diff --git a/src/deps.ts b/src/deps.ts
index b9db9e2..4a9314b 100644
--- a/src/deps.ts
+++ b/src/deps.ts
@@ -81,5 +81,7 @@ export { Machina } from 'https://gitlab.com/soapbox-pub/nostr-machina/-/raw/08a1
 export * as Sentry from 'https://deno.land/x/sentry@7.78.0/index.js';
 export { sentry as sentryMiddleware } from 'npm:@hono/sentry@^1.0.0';
 export * as Comlink from 'npm:comlink@^4.4.1';
+export { EventEmitter } from 'npm:tseep@^1.1.3';
+export { default as stringifyStable } from 'npm:fast-stable-stringify@^1.0.0';
 
 export type * as TypeFest from 'npm:type-fest@^4.3.0';
diff --git a/src/filter.ts b/src/filter.ts
index fb43251..38fcff7 100644
--- a/src/filter.ts
+++ b/src/filter.ts
@@ -1,5 +1,5 @@
 import { Conf } from '@/config.ts';
-import { type Event, type Filter, matchFilters } from '@/deps.ts';
+import { type Event, type Filter, matchFilters, stringifyStable } from '@/deps.ts';
 
 import type { EventData } from '@/types.ts';
 
@@ -14,12 +14,17 @@ interface DittoFilter extends Filter {
   relations?: Relation[];
 }
 
+/** Filter to get one specific event. */
+type MicroFilter = { ids: [Event['id']] } | { kinds: [0]; authors: [Event['pubkey']] };
+
 /** Additional options to apply to the whole subscription. */
 interface GetFiltersOpts {
   /** Signal to abort the request. */
   signal?: AbortSignal;
   /** Event limit for the whole subscription. */
   limit?: number;
+  /** Relays to use, if applicable. */
+  relays?: WebSocket['url'][];
 }
 
 function matchDittoFilter(filter: DittoFilter, event: Event, data: EventData): boolean {
@@ -44,4 +49,33 @@ function matchDittoFilters(filters: DittoFilter[], event: Event, data: EventData
   return false;
 }
 
-export { type DittoFilter, type GetFiltersOpts, matchDittoFilters, type Relation };
+/** Get deterministic ID for a microfilter. */
+function getFilterId(filter: MicroFilter): string {
+  if ('ids' in filter) {
+    return stringifyStable({ ids: [filter.ids[0]] });
+  } else {
+    return stringifyStable({
+      kinds: [filter.kinds[0]],
+      authors: [filter.authors[0]],
+    });
+  }
+}
+
+/** Get a microfilter from a Nostr event. */
+function eventToMicroFilter(event: Event): MicroFilter {
+  if (event.kind === 0) {
+    return { kinds: [0], authors: [event.pubkey] };
+  } else {
+    return { ids: [event.id] };
+  }
+}
+
+export {
+  type DittoFilter,
+  eventToMicroFilter,
+  getFilterId,
+  type GetFiltersOpts,
+  matchDittoFilters,
+  type MicroFilter,
+  type Relation,
+};
diff --git a/src/pipeline.ts b/src/pipeline.ts
index adf8a84..923bf4e 100644
--- a/src/pipeline.ts
+++ b/src/pipeline.ts
@@ -1,3 +1,4 @@
+import { reqmeister } from '@/common.ts';
 import { Conf } from '@/config.ts';
 import * as eventsDB from '@/db/events.ts';
 import { addRelays } from '@/db/relays.ts';
@@ -23,15 +24,17 @@ import type { EventData } from '@/types.ts';
  */
 async function handleEvent(event: Event): Promise<void> {
   if (!(await verifySignatureWorker(event))) return;
+  const wanted = reqmeister.isWanted(event);
   if (encounterEvent(event)) return;
   console.info(`pipeline: Event<${event.kind}> ${event.id}`);
   const data = await getEventData(event);
 
   await Promise.all([
-    storeEvent(event, data),
+    storeEvent(event, data, { force: wanted }),
     processDeletions(event),
     trackRelays(event),
     trackHashtags(event),
+    fetchRelatedEvents(event, data),
     processMedia(event, data),
     streamOut(event, data),
     broadcast(event, data),
@@ -39,13 +42,14 @@ async function handleEvent(event: Event): Promise {
 }
 
 /** Tracks encountered events to skip duplicates, improving idempotency and performance. */
-const encounters = new LRUCache({ max: 1000 });
+const encounters = new LRUCache({ max: 1000 });
 
 /** Encounter the event, and return whether it has already been encountered. */
-function encounterEvent(event: Event) {
+function encounterEvent(event: Event): boolean {
   const result = encounters.get(event.id);
   encounters.set(event.id, true);
-  return result;
+  reqmeister.encounter(event);
+  return !!result;
 }
 
 /** Preload data that will be useful to several tasks. */
@@ -57,11 +61,16 @@ async function getEventData({ pubkey }: Event): Promise {
 /** Check if the pubkey is the `DITTO_NSEC` pubkey. */
 const isAdminEvent = ({ pubkey }: Event): boolean => pubkey === Conf.pubkey;
 
+interface StoreEventOpts {
+  force?: boolean;
+}
+
 /** Maybe store the event, if eligible. */
-async function storeEvent(event: Event, data: EventData): Promise<void> {
+async function storeEvent(event: Event, data: EventData, opts: StoreEventOpts = {}): Promise<void> {
   if (isEphemeralKind(event.kind)) return;
+  const { force = false } = opts;
 
-  if (data.user || isAdminEvent(event) || await isLocallyFollowed(event.pubkey)) {
+  if (force || data.user || isAdminEvent(event) || await isLocallyFollowed(event.pubkey)) {
     const [deletion] = await mixer.getFilters(
       [{ kinds: [5], authors: [event.pubkey], '#e': [event.id], limit: 1 }],
       { limit: 1, signal: AbortSignal.timeout(Time.seconds(1)) },
@@ -129,6 +138,18 @@ function trackRelays(event: Event) {
   return addRelays([...relays]);
 }
 
+/** Queue related events to fetch. */
+function fetchRelatedEvents(event: Event, data: EventData) {
+  if (!data.user) {
+    reqmeister.req({ kinds: [0], authors: [event.pubkey] }).catch(() => {});
+  }
+  for (const [name, id, relay] of event.tags) {
+    if (name === 'e' && id && !encounters.has(id)) {
+      reqmeister.req({ ids: [id] }, relay ? [relay] : []).catch(() => {});
+    }
+  }
+}
+
 /** Delete unattached media entries that are attached to the event. */
 function processMedia({ tags, pubkey }: Event, { user }: EventData) {
   if (user) {
diff --git a/src/queries.ts b/src/queries.ts
index fc7365a..1ecff7b 100644
--- a/src/queries.ts
+++ b/src/queries.ts
@@ -2,6 +2,7 @@ import * as eventsDB from '@/db/events.ts';
 import { type Event, findReplyTag } from '@/deps.ts';
 import { type DittoFilter, type Relation } from '@/filter.ts';
 import * as mixer from '@/mixer.ts';
+import { reqmeister } from '@/common.ts';
 
 interface GetEventOpts {
   /** Signal to abort the request. */
@@ -30,10 +31,10 @@ const getEvent = async (
 const getAuthor = async (pubkey: string, opts: GetEventOpts<0> = {}): Promise<Event<0> | undefined> => {
   const { relations, signal = AbortSignal.timeout(1000) } = opts;
 
-  const [event] = await mixer.getFilters(
+  const event = await eventsDB.getFilters(
     [{ authors: [pubkey], relations, kinds: [0], limit: 1 }],
     { limit: 1, signal },
-  );
+  ).then(([event]) => event) || await reqmeister.req({ kinds: [0], authors: [pubkey] }).catch(() => {});
 
   return event;
 };
diff --git a/src/reqmeister.ts b/src/reqmeister.ts
new file mode 100644
index 0000000..960151f
--- /dev/null
+++ b/src/reqmeister.ts
@@ -0,0 +1,137 @@
+import * as client from '@/client.ts';
+import { type Event, EventEmitter, type Filter } from '@/deps.ts';
+
+import { eventToMicroFilter, getFilterId, type MicroFilter } from '@/filter.ts';
+
+interface ReqmeisterOpts {
+  /** Milliseconds spent collecting requests before each batch is sent. */
+  delay?: number;
+  /** Milliseconds a batch request may run. A fresh abort signal is created for every batch. */
+  timeout?: number;
+  /** Signal passed to batch requests when `timeout` is not set. Once aborted, it stays aborted. */
+  signal?: AbortSignal;
+}
+
+/** A queued request: filter ID, the microfilter itself, and the relays suggested for it. */
+type ReqmeisterQueueItem = [string, MicroFilter, WebSocket['url'][]];
+
+/**
+ * Collects requests for single events and fetches them in batches.
+ * Whenever an event is encountered, it is emitted under the ID of its microfilter.
+ */
+class Reqmeister extends EventEmitter<{ [filterId: string]: (event: Event) => any }> {
+  #opts: ReqmeisterOpts;
+  /** Requests waiting for the next batch. */
+  #queue: ReqmeisterQueueItem[] = [];
+  /** Filter IDs of the batch sent most recently. */
+  #dispatched = new Set<string>();
+  /** Resolves once the batch currently being collected has been fetched. */
+  #promise!: Promise<void>;
+  #resolve!: () => void;
+
+  constructor(opts: ReqmeisterOpts = {}) {
+    super();
+    this.#opts = opts;
+    this.#cycle();
+    this.#perform();
+  }
+
+  /** Start collecting a new batch and return the function that settles the previous one. */
+  #cycle(): (() => void) | undefined {
+    const settlePrevious = this.#resolve;
+    this.#promise = new Promise<void>((resolve) => {
+      this.#resolve = resolve;
+    });
+    return settlePrevious;
+  }
+
+  /** Wait out the delay, fetch everything queued in the meantime, then start the next round. */
+  async #perform(): Promise<void> {
+    const { delay, timeout, signal } = this.#opts;
+    await new Promise((resolve) => setTimeout(resolve, delay));
+
+    // Take the queue and open the next batch before fetching, so that requests made
+    // during the fetch wait for their own batch instead of being rejected with this one.
+    const queue = this.#queue;
+    this.#queue = [];
+    const settleBatch = this.#cycle();
+
+    const wantedEvents = new Set<Event['id']>();
+    const wantedAuthors = new Set<Event['pubkey']>();
+    const dispatched = new Set<string>();
+
+    // TODO: batch by relays.
+    for (const [filterId, filter, _relays] of queue) {
+      dispatched.add(filterId);
+      if ('ids' in filter) {
+        filter.ids.forEach((id) => wantedEvents.add(id));
+      } else {
+        wantedAuthors.add(filter.authors[0]);
+      }
+    }
+
+    // Kept until the next batch replaces it, because the pipeline may still be
+    // processing the fetched events after the request itself has returned.
+    this.#dispatched = dispatched;
+
+    const filters: Filter[] = [];
+
+    if (wantedEvents.size) filters.push({ ids: [...wantedEvents] });
+    if (wantedAuthors.size) filters.push({ kinds: [0], authors: [...wantedAuthors] });
+
+    try {
+      // Skip the request entirely when nothing was queued during this round.
+      if (filters.length) {
+        const events = await client.getFilters(filters, {
+          signal: timeout === undefined ? signal : AbortSignal.timeout(timeout),
+        });
+
+        for (const event of events) {
+          this.encounter(event);
+        }
+      }
+    } catch (error) {
+      // A failed batch must not stop the loop; its requests are rejected below.
+      console.warn('reqmeister: batch request failed', error);
+    } finally {
+      settleBatch?.();
+    }
+
+    this.#perform();
+  }
+
+  /**
+   * Queue a request for the event matching the microfilter.
+   * Resolves with the event once it is encountered, and rejects if the batch
+   * containing the request finishes without it.
+   */
+  req(filter: MicroFilter, relays: WebSocket['url'][] = []): Promise<Event> {
+    const filterId = getFilterId(filter);
+    const batchDone = this.#promise;
+    this.#queue.push([filterId, filter, relays]);
+
+    return new Promise<Event>((resolve, reject) => {
+      this.once(filterId, resolve);
+      batchDone.then(() => {
+        // Remove the listener so unanswered requests do not pile up on the emitter.
+        this.removeListener(filterId, resolve);
+        reject(new Error(`reqmeister: no event found for ${filterId}`));
+      });
+    });
+  }
+
+  /** Drop matching requests from the queue and notify everyone waiting for the event. */
+  encounter(event: Event): void {
+    const filterId = getFilterId(eventToMicroFilter(event));
+    this.#queue = this.#queue.filter(([id]) => id !== filterId);
+    this.emit(filterId, event);
+  }
+
+  /** Whether the event was requested: still queued, or part of the batch sent most recently. */
+  isWanted(event: Event): boolean {
+    const filterId = getFilterId(eventToMicroFilter(event));
+    return this.#dispatched.has(filterId) || this.#queue.some(([id]) => id === filterId);
+  }
+}
+
+export { Reqmeister };