13 KiB
13 KiB
diff --git a/src/client.ts b/src/client.ts
index 970a077..3cf2e8a 100644
--- a/src/client.ts
+++ b/src/client.ts
@@ -14,7 +14,7 @@ function getFilters<K extends number>(filters: Filter<K>[], opts: GetFiltersOpts
const unsub = pool.subscribe(
filters,
- activeRelays,
+ opts.relays ?? activeRelays,
(event: Event | null) => {
if (event && matchFilters(filters, event)) {
pipeline.handleEvent(event).catch(() => {});
diff --git a/src/common.ts b/src/common.ts
new file mode 100644
index 0000000..0424b52
--- /dev/null
+++ b/src/common.ts
@@ -0,0 +1,9 @@
+import { Reqmeister } from '@/reqmeister.ts';
+import { Time } from '@/utils/time.ts';
+
+const reqmeister = new Reqmeister({
+ delay: Time.seconds(1), // wait applied before each batched relay request is sent
+ signal: AbortSignal.timeout(Time.seconds(1)), // NOTE(review): built once at import; after this timeout elapses every later batch receives an already-aborted signal — confirm intended
+});
+
+export { reqmeister };
diff --git a/src/deps.ts b/src/deps.ts
index b9db9e2..4a9314b 100644
--- a/src/deps.ts
+++ b/src/deps.ts
@@ -81,5 +81,7 @@ export { Machina } from 'https://gitlab.com/soapbox-pub/nostr-machina/-/raw/08a1
export * as Sentry from 'https://deno.land/x/sentry@7.78.0/index.js';
export { sentry as sentryMiddleware } from 'npm:@hono/sentry@^1.0.0';
export * as Comlink from 'npm:comlink@^4.4.1';
+export { EventEmitter } from 'npm:tseep@^1.1.3';
+export { default as stringifyStable } from 'npm:fast-stable-stringify@^1.0.0';
export type * as TypeFest from 'npm:type-fest@^4.3.0';
diff --git a/src/filter.ts b/src/filter.ts
index fb43251..38fcff7 100644
--- a/src/filter.ts
+++ b/src/filter.ts
@@ -1,5 +1,5 @@
import { Conf } from '@/config.ts';
-import { type Event, type Filter, matchFilters } from '@/deps.ts';
+import { type Event, type Filter, matchFilters, stringifyStable } from '@/deps.ts';
import type { EventData } from '@/types.ts';
@@ -14,12 +14,17 @@ interface DittoFilter<K extends number = number> extends Filter<K> {
relations?: Relation[];
}
+/** Filter that targets a single lookup: exactly one event ID, or kind 0 for exactly one author pubkey. */
+type MicroFilter = { ids: [Event['id']] } | { kinds: [0]; authors: [Event['pubkey']] };
+
/** Additional options to apply to the whole subscription. */
interface GetFiltersOpts {
/** Signal to abort the request. */
signal?: AbortSignal;
/** Event limit for the whole subscription. */
limit?: number;
+ /** Relays to use, if applicable. */
+ relays?: WebSocket['url'][];
}
function matchDittoFilter(filter: DittoFilter, event: Event, data: EventData): boolean {
@@ -44,4 +49,33 @@ function matchDittoFilters(filters: DittoFilter[], event: Event, data: EventData
return false;
}
-export { type DittoFilter, type GetFiltersOpts, matchDittoFilters, type Relation };
+/** Get a deterministic ID for a microfilter by stable-stringifying its normalized, flat one-element form. */
+function getFilterId(filter: MicroFilter): string {
+  if ('ids' in filter) {
+    return stringifyStable({ ids: [filter.ids[0]] }); // the ID itself, not the tuple, so the key is not a nested array
+  } else {
+    return stringifyStable({
+      kinds: [filter.kinds[0]],
+      authors: [filter.authors[0]],
+    });
+  }
+}
+
+/** Build the microfilter that identifies `event`: kind 0 events by author, all other events by ID. */
+function eventToMicroFilter(event: Event): MicroFilter {
+  const { id, kind, pubkey } = event;
+  if (kind !== 0) {
+    return { ids: [id] };
+  }
+  return { kinds: [0], authors: [pubkey] };
+}
+
+export {
+ type DittoFilter,
+ eventToMicroFilter,
+ getFilterId,
+ type GetFiltersOpts,
+ matchDittoFilters,
+ type MicroFilter,
+ type Relation,
+};
diff --git a/src/pipeline.ts b/src/pipeline.ts
index adf8a84..923bf4e 100644
--- a/src/pipeline.ts
+++ b/src/pipeline.ts
@@ -1,3 +1,4 @@
+import { reqmeister } from '@/common.ts';
import { Conf } from '@/config.ts';
import * as eventsDB from '@/db/events.ts';
import { addRelays } from '@/db/relays.ts';
@@ -23,15 +24,17 @@ import type { EventData } from '@/types.ts';
*/
async function handleEvent(event: Event): Promise<void> {
if (!(await verifySignatureWorker(event))) return;
+ const wanted = reqmeister.isWanted(event);
if (encounterEvent(event)) return;
console.info(`pipeline: Event<${event.kind}> ${event.id}`);
const data = await getEventData(event);
await Promise.all([
- storeEvent(event, data),
+ storeEvent(event, data, { force: wanted }),
processDeletions(event),
trackRelays(event),
trackHashtags(event),
+ fetchRelatedEvents(event, data),
processMedia(event, data),
streamOut(event, data),
broadcast(event, data),
@@ -39,13 +42,14 @@ async function handleEvent(event: Event): Promise<void> {
}
/** Tracks encountered events to skip duplicates, improving idempotency and performance. */
-const encounters = new LRUCache<string, boolean>({ max: 1000 });
+const encounters = new LRUCache<Event['id'], true>({ max: 1000 });
/** Encounter the event, and return whether it has already been encountered. */
-function encounterEvent(event: Event) {
+function encounterEvent(event: Event): boolean {
const result = encounters.get(event.id);
encounters.set(event.id, true);
- return result;
+ reqmeister.encounter(event);
+ return !!result;
}
/** Preload data that will be useful to several tasks. */
@@ -57,11 +61,16 @@ async function getEventData({ pubkey }: Event): Promise<EventData> {
/** Check if the pubkey is the `DITTO_NSEC` pubkey. */
const isAdminEvent = ({ pubkey }: Event): boolean => pubkey === Conf.pubkey;
+interface StoreEventOpts {
+ force?: boolean; // when true, storeEvent skips the user/admin/locally-followed eligibility check (the ephemeral-kind check still applies)
+}
+
/** Maybe store the event, if eligible. */
-async function storeEvent(event: Event, data: EventData): Promise<void> {
+async function storeEvent(event: Event, data: EventData, opts: StoreEventOpts = {}): Promise<void> {
if (isEphemeralKind(event.kind)) return;
+ const { force = false } = opts;
- if (data.user || isAdminEvent(event) || await isLocallyFollowed(event.pubkey)) {
+ if (force || data.user || isAdminEvent(event) || await isLocallyFollowed(event.pubkey)) {
const [deletion] = await mixer.getFilters(
[{ kinds: [5], authors: [event.pubkey], '#e': [event.id], limit: 1 }],
{ limit: 1, signal: AbortSignal.timeout(Time.seconds(1)) },
@@ -129,6 +138,18 @@ function trackRelays(event: Event) {
return addRelays([...relays]);
}
+/** Queue related events to fetch: the author's kind 0 when `data.user` is unset, and each `e`-tagged event not yet in `encounters`. */
+function fetchRelatedEvents(event: Event, data: EventData) {
+  if (!data.user) {
+    reqmeister.req({ kinds: [0], authors: [event.pubkey] }).catch(() => {});
+  }
+  for (const [name, id, relay] of event.tags) {
+    if (name === 'e' && id && !encounters.has(id)) { // a tag may carry no ID at all; never queue `undefined`
+      reqmeister.req({ ids: [id] }, relay ? [relay] : []).catch(() => {}); // the relay hint is optional
+    }
+  }
+}
+
/** Delete unattached media entries that are attached to the event. */
function processMedia({ tags, pubkey }: Event, { user }: EventData) {
if (user) {
diff --git a/src/queries.ts b/src/queries.ts
index fc7365a..1ecff7b 100644
--- a/src/queries.ts
+++ b/src/queries.ts
@@ -2,6 +2,7 @@ import * as eventsDB from '@/db/events.ts';
import { type Event, findReplyTag } from '@/deps.ts';
import { type DittoFilter, type Relation } from '@/filter.ts';
import * as mixer from '@/mixer.ts';
+import { reqmeister } from '@/common.ts';
interface GetEventOpts<K extends number> {
/** Signal to abort the request. */
@@ -30,10 +31,10 @@ const getEvent = async <K extends number = number>(
const getAuthor = async (pubkey: string, opts: GetEventOpts<0> = {}): Promise<Event<0> | undefined> => {
const { relations, signal = AbortSignal.timeout(1000) } = opts;
- const [event] = await mixer.getFilters(
+ const event = await eventsDB.getFilters(
[{ authors: [pubkey], relations, kinds: [0], limit: 1 }],
{ limit: 1, signal },
- );
+ ).then(([event]) => event) || await reqmeister.req({ kinds: [0], authors: [pubkey] }).catch(() => {});
return event;
};
diff --git a/src/reqmeister.ts b/src/reqmeister.ts
new file mode 100644
index 0000000..960151f
--- /dev/null
+++ b/src/reqmeister.ts
@@ -0,0 +1,125 @@
+import * as client from '@/client.ts';
+import { type Event, EventEmitter, type Filter } from '@/deps.ts';
+
+import { eventToMicroFilter, getFilterId, type MicroFilter } from '@/filter.ts';
+
+/** Options accepted by the `Reqmeister` constructor. */
+interface ReqmeisterOpts {
+  /** Passed to `setTimeout` as the wait before each batch is sent. */
+  delay?: number;
+  /**
+   * Forwarded to `client.getFilters` for every batch. The same object is reused each time, so a
+   * signal that has aborted once stays aborted for all later batches.
+   */
+  signal?: AbortSignal;
+}
+
+/** Queue entry: `[filterId, filter, relays]`. */
+type ReqmeisterQueueItem = [string, MicroFilter, WebSocket['url'][]];
+
+/**
+ * Collects microfilter requests and fetches them in batches.
+ *
+ * Each cycle waits `delay`, merges the queued microfilters into at most two filters, queries them
+ * through `client.getFilters`, and emits every returned event under its filter ID.
+ */
+class Reqmeister extends EventEmitter<{ [filterId: string]: (event: Event) => any }> {
+  #opts: ReqmeisterOpts;
+  /** Requests that are waiting for a batch or are part of the one being fetched. */
+  #queue: ReqmeisterQueueItem[] = [];
+  /** Settles when the batch that newly queued requests will join has finished. */
+  #promise!: Promise<void>;
+  #resolve!: () => void;
+
+  constructor(opts: ReqmeisterOpts = {}) {
+    super();
+    this.#opts = opts;
+    this.#cycle();
+    this.#perform();
+  }
+
+  /** Open a new cycle and return the resolver of the previous one (undefined on the first call). */
+  #cycle(): (() => void) | undefined {
+    const settle = this.#resolve;
+    this.#promise = new Promise((resolve) => {
+      this.#resolve = resolve;
+    });
+    return settle;
+  }
+
+  /** Run one batch, then start the next. Fetch errors are contained so the loop keeps running. */
+  async #perform() {
+    const { delay, signal } = this.#opts;
+    await new Promise((resolve) => setTimeout(resolve, delay));
+
+    // The batch stays in #queue while it is fetched so isWanted() still matches its events.
+    const batch = new Set(this.#queue);
+    // Requests made from here on wait for the next batch instead of being rejected with this one.
+    const settle = this.#cycle();
+
+    const wantedEvents = new Set<Event['id']>();
+    const wantedAuthors = new Set<Event['pubkey']>();
+
+    // TODO: batch by relays.
+    for (const [_filterId, filter, _relays] of batch) {
+      if ('ids' in filter) {
+        filter.ids.forEach((id) => wantedEvents.add(id));
+      } else {
+        wantedAuthors.add(filter.authors[0]);
+      }
+    }
+
+    const filters: Filter[] = [];
+    if (wantedEvents.size) filters.push({ ids: [...wantedEvents] });
+    if (wantedAuthors.size) filters.push({ kinds: [0], authors: [...wantedAuthors] });
+
+    try {
+      // Nothing queued means nothing to ask the relays for.
+      const events = filters.length ? await client.getFilters(filters, { signal }) : [];
+      for (const event of events) {
+        this.encounter(event);
+      }
+    } catch {
+      // Best effort: a failed fetch leaves this batch unanswered; its requests reject below.
+    } finally {
+      this.#queue = this.#queue.filter((item) => !batch.has(item));
+      settle?.();
+      this.#perform();
+    }
+  }
+
+  /**
+   * Queue a request for the single event matching `filter`.
+   *
+   * Resolves with the event once `encounter()` sees it; rejects when the batch carrying the request
+   * finishes without it. `relays` is stored with the request but not used yet (see the TODO above).
+   */
+  req(filter: MicroFilter, relays: WebSocket['url'][] = []): Promise<Event> {
+    const filterId = getFilterId(filter);
+    this.#queue.push([filterId, filter, relays]);
+    return new Promise<Event>((resolve, reject) => {
+      this.once(filterId, resolve);
+      this.#promise.finally(() => {
+        // Drop the listener so unanswered requests do not pile up on the emitter.
+        // NOTE(review): assumes removeListener() matches listeners added with once() — confirm for tseep.
+        this.removeListener(filterId, resolve);
+        reject(new Error('Reqmeister: no matching event received'));
+      });
+    });
+  }
+
+  /** Record that `event` was seen: drop the requests it answers and resolve their `req()` promises. */
+  encounter(event: Event): void {
+    const filterId = getFilterId(eventToMicroFilter(event));
+    this.#queue = this.#queue.filter(([id]) => id !== filterId);
+    this.emit(filterId, event);
+  }
+
+  /** Whether a queued or in-flight request is waiting for `event`. */
+  isWanted(event: Event): boolean {
+    const filterId = getFilterId(eventToMicroFilter(event));
+    return this.#queue.some(([id]) => id === filterId);
+  }
+}
+
+export { Reqmeister };
index 970a077..3cf2e8a 100644
--- a/src/client.ts
+++ b/src/client.ts
@@ -14,7 +14,7 @@ function getFilters<K extends number>(filters: Filter<K>[], opts: GetFiltersOpts
const unsub = pool.subscribe(
filters,
- activeRelays,
+ opts.relays ?? activeRelays,
(event: Event | null) => {
if (event && matchFilters(filters, event)) {
pipeline.handleEvent(event).catch(() => {});
diff --git a/src/common.ts b/src/common.ts
new file mode 100644
index 0000000..0424b52
--- /dev/null
+++ b/src/common.ts
@@ -0,0 +1,9 @@
+import { Reqmeister } from '@/reqmeister.ts';
+import { Time } from '@/utils/time.ts';
+
+const reqmeister = new Reqmeister({
+ delay: Time.seconds(1), // wait applied before each batched relay request is sent
+ signal: AbortSignal.timeout(Time.seconds(1)), // NOTE(review): built once at import; after this timeout elapses every later batch receives an already-aborted signal — confirm intended
+});
+
+export { reqmeister };
diff --git a/src/deps.ts b/src/deps.ts
index b9db9e2..4a9314b 100644
--- a/src/deps.ts
+++ b/src/deps.ts
@@ -81,5 +81,7 @@ export { Machina } from 'https://gitlab.com/soapbox-pub/nostr-machina/-/raw/08a1
export * as Sentry from 'https://deno.land/x/sentry@7.78.0/index.js';
export { sentry as sentryMiddleware } from 'npm:@hono/sentry@^1.0.0';
export * as Comlink from 'npm:comlink@^4.4.1';
+export { EventEmitter } from 'npm:tseep@^1.1.3';
+export { default as stringifyStable } from 'npm:fast-stable-stringify@^1.0.0';
export type * as TypeFest from 'npm:type-fest@^4.3.0';
diff --git a/src/filter.ts b/src/filter.ts
index fb43251..38fcff7 100644
--- a/src/filter.ts
+++ b/src/filter.ts
@@ -1,5 +1,5 @@
import { Conf } from '@/config.ts';
-import { type Event, type Filter, matchFilters } from '@/deps.ts';
+import { type Event, type Filter, matchFilters, stringifyStable } from '@/deps.ts';
import type { EventData } from '@/types.ts';
@@ -14,12 +14,17 @@ interface DittoFilter<K extends number = number> extends Filter<K> {
relations?: Relation[];
}
+/** Filter that targets a single lookup: exactly one event ID, or kind 0 for exactly one author pubkey. */
+type MicroFilter = { ids: [Event['id']] } | { kinds: [0]; authors: [Event['pubkey']] };
+
/** Additional options to apply to the whole subscription. */
interface GetFiltersOpts {
/** Signal to abort the request. */
signal?: AbortSignal;
/** Event limit for the whole subscription. */
limit?: number;
+ /** Relays to use, if applicable. */
+ relays?: WebSocket['url'][];
}
function matchDittoFilter(filter: DittoFilter, event: Event, data: EventData): boolean {
@@ -44,4 +49,33 @@ function matchDittoFilters(filters: DittoFilter[], event: Event, data: EventData
return false;
}
-export { type DittoFilter, type GetFiltersOpts, matchDittoFilters, type Relation };
+/** Get a deterministic ID for a microfilter by stable-stringifying its normalized, flat one-element form. */
+function getFilterId(filter: MicroFilter): string {
+  if ('ids' in filter) {
+    return stringifyStable({ ids: [filter.ids[0]] }); // the ID itself, not the tuple, so the key is not a nested array
+  } else {
+    return stringifyStable({
+      kinds: [filter.kinds[0]],
+      authors: [filter.authors[0]],
+    });
+  }
+}
+
+/** Build the microfilter that identifies `event`: kind 0 events by author, all other events by ID. */
+function eventToMicroFilter(event: Event): MicroFilter {
+  const { id, kind, pubkey } = event;
+  if (kind !== 0) {
+    return { ids: [id] };
+  }
+  return { kinds: [0], authors: [pubkey] };
+}
+
+export {
+ type DittoFilter,
+ eventToMicroFilter,
+ getFilterId,
+ type GetFiltersOpts,
+ matchDittoFilters,
+ type MicroFilter,
+ type Relation,
+};
diff --git a/src/pipeline.ts b/src/pipeline.ts
index adf8a84..923bf4e 100644
--- a/src/pipeline.ts
+++ b/src/pipeline.ts
@@ -1,3 +1,4 @@
+import { reqmeister } from '@/common.ts';
import { Conf } from '@/config.ts';
import * as eventsDB from '@/db/events.ts';
import { addRelays } from '@/db/relays.ts';
@@ -23,15 +24,17 @@ import type { EventData } from '@/types.ts';
*/
async function handleEvent(event: Event): Promise<void> {
if (!(await verifySignatureWorker(event))) return;
+ const wanted = reqmeister.isWanted(event);
if (encounterEvent(event)) return;
console.info(`pipeline: Event<${event.kind}> ${event.id}`);
const data = await getEventData(event);
await Promise.all([
- storeEvent(event, data),
+ storeEvent(event, data, { force: wanted }),
processDeletions(event),
trackRelays(event),
trackHashtags(event),
+ fetchRelatedEvents(event, data),
processMedia(event, data),
streamOut(event, data),
broadcast(event, data),
@@ -39,13 +42,14 @@ async function handleEvent(event: Event): Promise<void> {
}
/** Tracks encountered events to skip duplicates, improving idempotency and performance. */
-const encounters = new LRUCache<string, boolean>({ max: 1000 });
+const encounters = new LRUCache<Event['id'], true>({ max: 1000 });
/** Encounter the event, and return whether it has already been encountered. */
-function encounterEvent(event: Event) {
+function encounterEvent(event: Event): boolean {
const result = encounters.get(event.id);
encounters.set(event.id, true);
- return result;
+ reqmeister.encounter(event);
+ return !!result;
}
/** Preload data that will be useful to several tasks. */
@@ -57,11 +61,16 @@ async function getEventData({ pubkey }: Event): Promise<EventData> {
/** Check if the pubkey is the `DITTO_NSEC` pubkey. */
const isAdminEvent = ({ pubkey }: Event): boolean => pubkey === Conf.pubkey;
+interface StoreEventOpts {
+ force?: boolean; // when true, storeEvent skips the user/admin/locally-followed eligibility check (the ephemeral-kind check still applies)
+}
+
/** Maybe store the event, if eligible. */
-async function storeEvent(event: Event, data: EventData): Promise<void> {
+async function storeEvent(event: Event, data: EventData, opts: StoreEventOpts = {}): Promise<void> {
if (isEphemeralKind(event.kind)) return;
+ const { force = false } = opts;
- if (data.user || isAdminEvent(event) || await isLocallyFollowed(event.pubkey)) {
+ if (force || data.user || isAdminEvent(event) || await isLocallyFollowed(event.pubkey)) {
const [deletion] = await mixer.getFilters(
[{ kinds: [5], authors: [event.pubkey], '#e': [event.id], limit: 1 }],
{ limit: 1, signal: AbortSignal.timeout(Time.seconds(1)) },
@@ -129,6 +138,18 @@ function trackRelays(event: Event) {
return addRelays([...relays]);
}
+/** Queue related events to fetch: the author's kind 0 when `data.user` is unset, and each `e`-tagged event not yet in `encounters`. */
+function fetchRelatedEvents(event: Event, data: EventData) {
+  if (!data.user) {
+    reqmeister.req({ kinds: [0], authors: [event.pubkey] }).catch(() => {});
+  }
+  for (const [name, id, relay] of event.tags) {
+    if (name === 'e' && id && !encounters.has(id)) { // a tag may carry no ID at all; never queue `undefined`
+      reqmeister.req({ ids: [id] }, relay ? [relay] : []).catch(() => {}); // the relay hint is optional
+    }
+  }
+}
+
/** Delete unattached media entries that are attached to the event. */
function processMedia({ tags, pubkey }: Event, { user }: EventData) {
if (user) {
diff --git a/src/queries.ts b/src/queries.ts
index fc7365a..1ecff7b 100644
--- a/src/queries.ts
+++ b/src/queries.ts
@@ -2,6 +2,7 @@ import * as eventsDB from '@/db/events.ts';
import { type Event, findReplyTag } from '@/deps.ts';
import { type DittoFilter, type Relation } from '@/filter.ts';
import * as mixer from '@/mixer.ts';
+import { reqmeister } from '@/common.ts';
interface GetEventOpts<K extends number> {
/** Signal to abort the request. */
@@ -30,10 +31,10 @@ const getEvent = async <K extends number = number>(
const getAuthor = async (pubkey: string, opts: GetEventOpts<0> = {}): Promise<Event<0> | undefined> => {
const { relations, signal = AbortSignal.timeout(1000) } = opts;
- const [event] = await mixer.getFilters(
+ const event = await eventsDB.getFilters(
[{ authors: [pubkey], relations, kinds: [0], limit: 1 }],
{ limit: 1, signal },
- );
+ ).then(([event]) => event) || await reqmeister.req({ kinds: [0], authors: [pubkey] }).catch(() => {});
return event;
};
diff --git a/src/reqmeister.ts b/src/reqmeister.ts
new file mode 100644
index 0000000..960151f
--- /dev/null
+++ b/src/reqmeister.ts
@@ -0,0 +1,125 @@
+import * as client from '@/client.ts';
+import { type Event, EventEmitter, type Filter } from '@/deps.ts';
+
+import { eventToMicroFilter, getFilterId, type MicroFilter } from '@/filter.ts';
+
+/** Options accepted by the `Reqmeister` constructor. */
+interface ReqmeisterOpts {
+  /** Passed to `setTimeout` as the wait before each batch is sent. */
+  delay?: number;
+  /**
+   * Forwarded to `client.getFilters` for every batch. The same object is reused each time, so a
+   * signal that has aborted once stays aborted for all later batches.
+   */
+  signal?: AbortSignal;
+}
+
+/** Queue entry: `[filterId, filter, relays]`. */
+type ReqmeisterQueueItem = [string, MicroFilter, WebSocket['url'][]];
+
+/**
+ * Collects microfilter requests and fetches them in batches.
+ *
+ * Each cycle waits `delay`, merges the queued microfilters into at most two filters, queries them
+ * through `client.getFilters`, and emits every returned event under its filter ID.
+ */
+class Reqmeister extends EventEmitter<{ [filterId: string]: (event: Event) => any }> {
+  #opts: ReqmeisterOpts;
+  /** Requests that are waiting for a batch or are part of the one being fetched. */
+  #queue: ReqmeisterQueueItem[] = [];
+  /** Settles when the batch that newly queued requests will join has finished. */
+  #promise!: Promise<void>;
+  #resolve!: () => void;
+
+  constructor(opts: ReqmeisterOpts = {}) {
+    super();
+    this.#opts = opts;
+    this.#cycle();
+    this.#perform();
+  }
+
+  /** Open a new cycle and return the resolver of the previous one (undefined on the first call). */
+  #cycle(): (() => void) | undefined {
+    const settle = this.#resolve;
+    this.#promise = new Promise((resolve) => {
+      this.#resolve = resolve;
+    });
+    return settle;
+  }
+
+  /** Run one batch, then start the next. Fetch errors are contained so the loop keeps running. */
+  async #perform() {
+    const { delay, signal } = this.#opts;
+    await new Promise((resolve) => setTimeout(resolve, delay));
+
+    // The batch stays in #queue while it is fetched so isWanted() still matches its events.
+    const batch = new Set(this.#queue);
+    // Requests made from here on wait for the next batch instead of being rejected with this one.
+    const settle = this.#cycle();
+
+    const wantedEvents = new Set<Event['id']>();
+    const wantedAuthors = new Set<Event['pubkey']>();
+
+    // TODO: batch by relays.
+    for (const [_filterId, filter, _relays] of batch) {
+      if ('ids' in filter) {
+        filter.ids.forEach((id) => wantedEvents.add(id));
+      } else {
+        wantedAuthors.add(filter.authors[0]);
+      }
+    }
+
+    const filters: Filter[] = [];
+    if (wantedEvents.size) filters.push({ ids: [...wantedEvents] });
+    if (wantedAuthors.size) filters.push({ kinds: [0], authors: [...wantedAuthors] });
+
+    try {
+      // Nothing queued means nothing to ask the relays for.
+      const events = filters.length ? await client.getFilters(filters, { signal }) : [];
+      for (const event of events) {
+        this.encounter(event);
+      }
+    } catch {
+      // Best effort: a failed fetch leaves this batch unanswered; its requests reject below.
+    } finally {
+      this.#queue = this.#queue.filter((item) => !batch.has(item));
+      settle?.();
+      this.#perform();
+    }
+  }
+
+  /**
+   * Queue a request for the single event matching `filter`.
+   *
+   * Resolves with the event once `encounter()` sees it; rejects when the batch carrying the request
+   * finishes without it. `relays` is stored with the request but not used yet (see the TODO above).
+   */
+  req(filter: MicroFilter, relays: WebSocket['url'][] = []): Promise<Event> {
+    const filterId = getFilterId(filter);
+    this.#queue.push([filterId, filter, relays]);
+    return new Promise<Event>((resolve, reject) => {
+      this.once(filterId, resolve);
+      this.#promise.finally(() => {
+        // Drop the listener so unanswered requests do not pile up on the emitter.
+        // NOTE(review): assumes removeListener() matches listeners added with once() — confirm for tseep.
+        this.removeListener(filterId, resolve);
+        reject(new Error('Reqmeister: no matching event received'));
+      });
+    });
+  }
+
+  /** Record that `event` was seen: drop the requests it answers and resolve their `req()` promises. */
+  encounter(event: Event): void {
+    const filterId = getFilterId(eventToMicroFilter(event));
+    this.#queue = this.#queue.filter(([id]) => id !== filterId);
+    this.emit(filterId, event);
+  }
+
+  /** Whether a queued or in-flight request is waiting for `event`. */
+  isWanted(event: Event): boolean {
+    const filterId = getFilterId(eventToMicroFilter(event));
+    return this.#queue.some(([id]) => id === filterId);
+  }
+}
+
+export { Reqmeister };