Merge branch 'pool-req' into 'main'

PoolStore: add req method

See merge request soapbox-pub/ditto!214
This commit is contained in:
Alex Gleason 2024-05-03 18:42:35 +00:00
commit 60116ee651
6 changed files with 86 additions and 96 deletions

View File

@@ -19,7 +19,7 @@
"@db/sqlite": "jsr:@db/sqlite@^0.11.1", "@db/sqlite": "jsr:@db/sqlite@^0.11.1",
"@isaacs/ttlcache": "npm:@isaacs/ttlcache@^1.4.1", "@isaacs/ttlcache": "npm:@isaacs/ttlcache@^1.4.1",
"@noble/secp256k1": "npm:@noble/secp256k1@^2.0.0", "@noble/secp256k1": "npm:@noble/secp256k1@^2.0.0",
"@nostrify/nostrify": "jsr:@nostrify/nostrify@^0.15.0", "@nostrify/nostrify": "jsr:@nostrify/nostrify@^0.17.1",
"@sentry/deno": "https://deno.land/x/sentry@7.112.2/index.mjs", "@sentry/deno": "https://deno.land/x/sentry@7.112.2/index.mjs",
"@soapbox/kysely-deno-sqlite": "jsr:@soapbox/kysely-deno-sqlite@^2.1.0", "@soapbox/kysely-deno-sqlite": "jsr:@soapbox/kysely-deno-sqlite@^2.1.0",
"@soapbox/stickynotes": "jsr:@soapbox/stickynotes@^0.4.0", "@soapbox/stickynotes": "jsr:@soapbox/stickynotes@^0.4.0",

View File

@@ -1,9 +1,8 @@
import { NostrEvent, NostrFilter, NSchema as n } from '@nostrify/nostrify'; import { NostrEvent, NostrFilter, NSchema as n } from '@nostrify/nostrify';
import stringifyStable from 'fast-stable-stringify'; import stringifyStable from 'fast-stable-stringify';
import { getFilterLimit } from 'nostr-tools';
import { z } from 'zod'; import { z } from 'zod';
import { isReplaceableKind } from '@/kinds.ts';
/** Microfilter to get one specific event by ID. */ /** Microfilter to get one specific event by ID. */
type IdMicrofilter = { ids: [NostrEvent['id']] }; type IdMicrofilter = { ids: [NostrEvent['id']] };
/** Microfilter to get an author. */ /** Microfilter to get an author. */
@@ -50,22 +49,6 @@ function isMicrofilter(filter: NostrFilter): filter is MicroFilter {
return microFilterSchema.safeParse(filter).success; return microFilterSchema.safeParse(filter).success;
} }
/** Calculate the intrinsic limit of a filter. */
function getFilterLimit(filter: NostrFilter): number {
  // An explicitly empty ids/kinds/authors list can never match any event.
  for (const key of ['ids', 'kinds', 'authors'] as const) {
    const values = filter[key];
    if (values && !values.length) return 0;
  }

  // Asking for specific event ids caps the result count at the number of ids.
  const idCap = filter.ids ? filter.ids.length : Infinity;

  // Replaceable kinds keep at most one event per (author, kind) pair,
  // so authors × kinds bounds the result when every kind is replaceable.
  let replaceableCap = Infinity;
  if (filter.authors?.length && filter.kinds?.every((kind) => isReplaceableKind(kind))) {
    replaceableCap = filter.authors.length * filter.kinds.length;
  }

  // The caller's own limit applies last; negative limits count as zero.
  const explicitCap = Math.max(0, filter.limit ?? Infinity);

  return Math.min(explicitCap, idCap, replaceableCap);
}
/** Returns true if the filter could potentially return any stored events at all. */ /** Returns true if the filter could potentially return any stored events at all. */
function canFilter(filter: NostrFilter): boolean { function canFilter(filter: NostrFilter): boolean {
return getFilterLimit(filter) > 0; return getFilterLimit(filter) > 0;

View File

@@ -59,7 +59,7 @@ async function handleEvent(event: DittoEvent, signal: AbortSignal): Promise<void
processDeletions(event, signal), processDeletions(event, signal),
DVM.event(event), DVM.event(event),
trackHashtags(event), trackHashtags(event),
fetchRelatedEvents(event, signal), fetchRelatedEvents(event),
processMedia(event), processMedia(event),
payZap(event, signal), payZap(event, signal),
streamOut(event), streamOut(event),
@@ -182,16 +182,22 @@ async function trackHashtags(event: NostrEvent): Promise<void> {
} }
/** Queue related events to fetch. */ /** Queue related events to fetch. */
async function fetchRelatedEvents(event: DittoEvent, signal: AbortSignal) { async function fetchRelatedEvents(event: DittoEvent) {
if (!event.user) { if (!event.author) {
Storages.reqmeister.req({ kinds: [0], authors: [event.pubkey] }, { signal }).catch(() => {}); const signal = AbortSignal.timeout(3000);
Storages.reqmeister.query([{ kinds: [0], authors: [event.pubkey] }], { signal })
.then((events) => events.forEach((event) => handleEvent(event, signal)))
.catch(() => {});
} }
for (const [name, id, relay] of event.tags) { for (const [name, id] of event.tags) {
if (name === 'e') { if (name === 'e') {
const { count } = await Storages.cache.count([{ ids: [id] }]); const { count } = await Storages.cache.count([{ ids: [id] }]);
if (!count) { if (!count) {
Storages.reqmeister.req({ ids: [id] }, { relays: [relay] }).catch(() => {}); const signal = AbortSignal.timeout(3000);
Storages.reqmeister.query([{ ids: [id] }], { signal })
.then((events) => events.forEach((event) => handleEvent(event, signal)))
.catch(() => {});
} }
} }
} }

View File

@@ -1,7 +1,6 @@
import { NCache } from '@nostrify/nostrify'; import { NCache } from '@nostrify/nostrify';
import { Conf } from '@/config.ts'; import { Conf } from '@/config.ts';
import { db } from '@/db.ts'; import { db } from '@/db.ts';
import * as pipeline from '@/pipeline.ts';
import { activeRelays, pool } from '@/pool.ts'; import { activeRelays, pool } from '@/pool.ts';
import { EventsDB } from '@/storages/events-db.ts'; import { EventsDB } from '@/storages/events-db.ts';
import { Optimizer } from '@/storages/optimizer.ts'; import { Optimizer } from '@/storages/optimizer.ts';
@@ -52,7 +51,6 @@ export class Storages {
this._client = new PoolStore({ this._client = new PoolStore({
pool, pool,
relays: activeRelays, relays: activeRelays,
publisher: pipeline,
}); });
} }
return this._client; return this._client;

View File

@@ -1,34 +1,35 @@
import { NostrEvent, NostrFilter, NSet, NStore } from '@nostrify/nostrify'; import {
NostrEvent,
NostrFilter,
NostrRelayCLOSED,
NostrRelayEOSE,
NostrRelayEVENT,
NRelay,
NSet,
} from '@nostrify/nostrify';
import { Machina } from '@nostrify/nostrify/utils';
import Debug from '@soapbox/stickynotes/debug'; import Debug from '@soapbox/stickynotes/debug';
import { RelayPoolWorker } from 'nostr-relaypool'; import { RelayPoolWorker } from 'nostr-relaypool';
import { matchFilters } from 'nostr-tools'; import { getFilterLimit, matchFilters } from 'nostr-tools';
import { normalizeFilters } from '@/filter.ts'; import { Conf } from '@/config.ts';
import { purifyEvent } from '@/storages/hydrate.ts'; import { purifyEvent } from '@/storages/hydrate.ts';
import { abortError } from '@/utils/abort.ts'; import { abortError } from '@/utils/abort.ts';
import { getRelays } from '@/utils/outbox.ts'; import { getRelays } from '@/utils/outbox.ts';
import { Conf } from '@/config.ts';
interface PoolStoreOpts { interface PoolStoreOpts {
pool: InstanceType<typeof RelayPoolWorker>; pool: InstanceType<typeof RelayPoolWorker>;
relays: WebSocket['url'][]; relays: WebSocket['url'][];
publisher: {
handleEvent(event: NostrEvent, signal: AbortSignal): Promise<void>;
};
} }
class PoolStore implements NStore { class PoolStore implements NRelay {
#debug = Debug('ditto:client'); private debug = Debug('ditto:client');
#pool: InstanceType<typeof RelayPoolWorker>; private pool: InstanceType<typeof RelayPoolWorker>;
#relays: WebSocket['url'][]; private relays: WebSocket['url'][];
#publisher: {
handleEvent(event: NostrEvent, signal: AbortSignal): Promise<void>;
};
constructor(opts: PoolStoreOpts) { constructor(opts: PoolStoreOpts) {
this.#pool = opts.pool; this.pool = opts.pool;
this.#relays = opts.relays; this.relays = opts.relays;
this.#publisher = opts.publisher;
} }
async event(event: NostrEvent, opts: { signal?: AbortSignal } = {}): Promise<void> { async event(event: NostrEvent, opts: { signal?: AbortSignal } = {}): Promise<void> {
@@ -40,58 +41,61 @@ class PoolStore implements NStore {
const relays = [...relaySet].slice(0, 4); const relays = [...relaySet].slice(0, 4);
event = purifyEvent(event); event = purifyEvent(event);
this.#debug('EVENT', event, relays); this.debug('EVENT', event, relays);
this.#pool.publish(event, relays); this.pool.publish(event, relays);
return Promise.resolve(); return Promise.resolve();
} }
query(filters: NostrFilter[], opts: { signal?: AbortSignal; limit?: number } = {}): Promise<NostrEvent[]> { async *req(
if (opts.signal?.aborted) return Promise.reject(abortError()); filters: NostrFilter[],
opts: { signal?: AbortSignal; limit?: number } = {},
): AsyncIterable<NostrRelayEVENT | NostrRelayEOSE | NostrRelayCLOSED> {
this.debug('REQ', JSON.stringify(filters));
filters = normalizeFilters(filters); const uuid = crypto.randomUUID();
this.#debug('REQ', JSON.stringify(filters)); const machina = new Machina<NostrRelayEVENT | NostrRelayEOSE | NostrRelayCLOSED>(opts.signal);
if (!filters.length) return Promise.resolve([]);
return new Promise((resolve, reject) => { const unsub = this.pool.subscribe(
const results = new NSet(); filters,
this.relays,
(event: NostrEvent | null) => {
if (event && matchFilters(filters, event)) {
machina.push(['EVENT', uuid, purifyEvent(event)]);
}
},
undefined,
() => {
machina.push(['EOSE', uuid]);
},
);
const unsub = this.#pool.subscribe( try {
filters, for await (const msg of machina) {
this.#relays, yield msg;
(event: NostrEvent | null) => { }
if (event && matchFilters(filters, event)) { } finally {
this.#publisher.handleEvent(event, AbortSignal.timeout(1000)).catch(() => {}); unsub();
results.add({ }
id: event.id, }
kind: event.kind,
pubkey: event.pubkey,
content: event.content,
tags: event.tags,
created_at: event.created_at,
sig: event.sig,
});
}
if (typeof opts.limit === 'number' && results.size >= opts.limit) {
unsub();
resolve([...results]);
}
},
undefined,
() => {
unsub();
resolve([...results]);
},
);
const onAbort = () => { async query(filters: NostrFilter[], opts: { signal?: AbortSignal; limit?: number } = {}): Promise<NostrEvent[]> {
unsub(); const events = new NSet();
reject(abortError());
opts.signal?.removeEventListener('abort', onAbort);
};
opts.signal?.addEventListener('abort', onAbort); const limit = filters.reduce((result, filter) => result + getFilterLimit(filter), 0);
}); if (limit === 0) return [];
for await (const msg of this.req(filters, opts)) {
if (msg[0] === 'EOSE') break;
if (msg[0] === 'EVENT') events.add(msg[2]);
if (msg[0] === 'CLOSED') throw new Error('Subscription closed');
if (events.size >= limit) {
break;
}
}
return [...events];
} }
} }

View File

@@ -82,7 +82,7 @@ class Reqmeister extends EventEmitter<{ [filterId: string]: (event: NostrEvent)
this.#perform(); this.#perform();
} }
req(filter: MicroFilter, opts: ReqmeisterReqOpts = {}): Promise<NostrEvent> { private fetch(filter: MicroFilter, opts: ReqmeisterReqOpts = {}): Promise<NostrEvent> {
const { const {
relays = [], relays = [],
signal = AbortSignal.timeout(this.#opts.timeout ?? 1000), signal = AbortSignal.timeout(this.#opts.timeout ?? 1000),
@@ -120,12 +120,7 @@ class Reqmeister extends EventEmitter<{ [filterId: string]: (event: NostrEvent)
return Promise.resolve(); return Promise.resolve();
} }
isWanted(event: NostrEvent): boolean { async query(filters: NostrFilter[], opts?: { signal?: AbortSignal }): Promise<NostrEvent[]> {
const filterId = getFilterId(eventToMicroFilter(event));
return this.#queue.some(([id]) => id === filterId);
}
query(filters: NostrFilter[], opts?: { signal?: AbortSignal }): Promise<NostrEvent[]> {
if (opts?.signal?.aborted) return Promise.reject(abortError()); if (opts?.signal?.aborted) return Promise.reject(abortError());
this.#debug('REQ', JSON.stringify(filters)); this.#debug('REQ', JSON.stringify(filters));
@@ -133,12 +128,16 @@ class Reqmeister extends EventEmitter<{ [filterId: string]: (event: NostrEvent)
const promises = filters.reduce<Promise<NostrEvent>[]>((result, filter) => { const promises = filters.reduce<Promise<NostrEvent>[]>((result, filter) => {
if (isMicrofilter(filter)) { if (isMicrofilter(filter)) {
result.push(this.req(filter, opts)); result.push(this.fetch(filter, opts));
} }
return result; return result;
}, []); }, []);
return Promise.all(promises); const results = await Promise.allSettled(promises);
return results
.filter((result): result is PromiseFulfilledResult<NostrEvent> => result.status === 'fulfilled')
.map((result) => result.value);
} }
} }