PoolStore: implement NRelay
This commit is contained in:
parent
2ee668d562
commit
705e8e7c31
|
@ -19,7 +19,7 @@
|
|||
"@db/sqlite": "jsr:@db/sqlite@^0.11.1",
|
||||
"@isaacs/ttlcache": "npm:@isaacs/ttlcache@^1.4.1",
|
||||
"@noble/secp256k1": "npm:@noble/secp256k1@^2.0.0",
|
||||
"@nostrify/nostrify": "jsr:@nostrify/nostrify@^0.15.0",
|
||||
"@nostrify/nostrify": "jsr:@nostrify/nostrify@^0.17.1",
|
||||
"@sentry/deno": "https://deno.land/x/sentry@7.112.2/index.mjs",
|
||||
"@soapbox/kysely-deno-sqlite": "jsr:@soapbox/kysely-deno-sqlite@^2.1.0",
|
||||
"@soapbox/stickynotes": "jsr:@soapbox/stickynotes@^0.4.0",
|
||||
|
|
|
@ -183,15 +183,19 @@ async function trackHashtags(event: NostrEvent): Promise<void> {
|
|||
|
||||
/** Queue related events to fetch. */
|
||||
async function fetchRelatedEvents(event: DittoEvent, signal: AbortSignal) {
|
||||
if (!event.user) {
|
||||
Storages.reqmeister.req({ kinds: [0], authors: [event.pubkey] }, { signal }).catch(() => {});
|
||||
if (!event.author) {
|
||||
Storages.reqmeister.req({ kinds: [0], authors: [event.pubkey] }, { signal })
|
||||
.then((event) => handleEvent(event, AbortSignal.timeout(1000)))
|
||||
.catch(() => {});
|
||||
}
|
||||
|
||||
for (const [name, id, relay] of event.tags) {
|
||||
if (name === 'e') {
|
||||
const { count } = await Storages.cache.count([{ ids: [id] }]);
|
||||
if (!count) {
|
||||
Storages.reqmeister.req({ ids: [id] }, { relays: [relay] }).catch(() => {});
|
||||
Storages.reqmeister.req({ ids: [id] }, { relays: [relay] })
|
||||
.then((event) => handleEvent(event, AbortSignal.timeout(1000)))
|
||||
.catch(() => {});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
import { NCache } from '@nostrify/nostrify';
|
||||
import { Conf } from '@/config.ts';
|
||||
import { db } from '@/db.ts';
|
||||
import * as pipeline from '@/pipeline.ts';
|
||||
import { activeRelays, pool } from '@/pool.ts';
|
||||
import { EventsDB } from '@/storages/events-db.ts';
|
||||
import { Optimizer } from '@/storages/optimizer.ts';
|
||||
|
@ -52,7 +51,6 @@ export class Storages {
|
|||
this._client = new PoolStore({
|
||||
pool,
|
||||
relays: activeRelays,
|
||||
publisher: pipeline,
|
||||
});
|
||||
}
|
||||
return this._client;
|
||||
|
|
|
@ -1,7 +1,16 @@
|
|||
import { NostrEvent, NostrFilter, NSet, NStore } from '@nostrify/nostrify';
|
||||
import {
|
||||
NostrEvent,
|
||||
NostrFilter,
|
||||
NostrRelayCLOSED,
|
||||
NostrRelayEOSE,
|
||||
NostrRelayEVENT,
|
||||
NRelay,
|
||||
NSet,
|
||||
} from '@nostrify/nostrify';
|
||||
import { Machina } from '@nostrify/nostrify/utils';
|
||||
import Debug from '@soapbox/stickynotes/debug';
|
||||
import { RelayPoolWorker } from 'nostr-relaypool';
|
||||
import { matchFilters } from 'nostr-tools';
|
||||
import { getFilterLimit, matchFilters } from 'nostr-tools';
|
||||
|
||||
import { normalizeFilters } from '@/filter.ts';
|
||||
import { purifyEvent } from '@/storages/hydrate.ts';
|
||||
|
@ -12,23 +21,16 @@ import { Conf } from '@/config.ts';
|
|||
interface PoolStoreOpts {
|
||||
pool: InstanceType<typeof RelayPoolWorker>;
|
||||
relays: WebSocket['url'][];
|
||||
publisher: {
|
||||
handleEvent(event: NostrEvent, signal: AbortSignal): Promise<void>;
|
||||
};
|
||||
}
|
||||
|
||||
class PoolStore implements NStore {
|
||||
#debug = Debug('ditto:client');
|
||||
#pool: InstanceType<typeof RelayPoolWorker>;
|
||||
#relays: WebSocket['url'][];
|
||||
#publisher: {
|
||||
handleEvent(event: NostrEvent, signal: AbortSignal): Promise<void>;
|
||||
};
|
||||
class PoolStore implements NRelay {
|
||||
private debug = Debug('ditto:client');
|
||||
private pool: InstanceType<typeof RelayPoolWorker>;
|
||||
private relays: WebSocket['url'][];
|
||||
|
||||
constructor(opts: PoolStoreOpts) {
|
||||
this.#pool = opts.pool;
|
||||
this.#relays = opts.relays;
|
||||
this.#publisher = opts.publisher;
|
||||
this.pool = opts.pool;
|
||||
this.relays = opts.relays;
|
||||
}
|
||||
|
||||
async event(event: NostrEvent, opts: { signal?: AbortSignal } = {}): Promise<void> {
|
||||
|
@ -40,58 +42,66 @@ class PoolStore implements NStore {
|
|||
const relays = [...relaySet].slice(0, 4);
|
||||
|
||||
event = purifyEvent(event);
|
||||
this.#debug('EVENT', event, relays);
|
||||
this.debug('EVENT', event, relays);
|
||||
|
||||
this.#pool.publish(event, relays);
|
||||
this.pool.publish(event, relays);
|
||||
return Promise.resolve();
|
||||
}
|
||||
|
||||
query(filters: NostrFilter[], opts: { signal?: AbortSignal; limit?: number } = {}): Promise<NostrEvent[]> {
|
||||
if (opts.signal?.aborted) return Promise.reject(abortError());
|
||||
async *req(
|
||||
filters: NostrFilter[],
|
||||
opts: { signal?: AbortSignal; limit?: number } = {},
|
||||
): AsyncIterable<NostrRelayEVENT | NostrRelayEOSE | NostrRelayCLOSED> {
|
||||
if (opts.signal?.aborted) return;
|
||||
|
||||
filters = normalizeFilters(filters);
|
||||
this.#debug('REQ', JSON.stringify(filters));
|
||||
if (!filters.length) return Promise.resolve([]);
|
||||
if (!filters.length) return;
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
const results = new NSet();
|
||||
this.debug('REQ', JSON.stringify(filters));
|
||||
|
||||
const unsub = this.#pool.subscribe(
|
||||
filters,
|
||||
this.#relays,
|
||||
(event: NostrEvent | null) => {
|
||||
if (event && matchFilters(filters, event)) {
|
||||
this.#publisher.handleEvent(event, AbortSignal.timeout(1000)).catch(() => {});
|
||||
results.add({
|
||||
id: event.id,
|
||||
kind: event.kind,
|
||||
pubkey: event.pubkey,
|
||||
content: event.content,
|
||||
tags: event.tags,
|
||||
created_at: event.created_at,
|
||||
sig: event.sig,
|
||||
});
|
||||
}
|
||||
if (typeof opts.limit === 'number' && results.size >= opts.limit) {
|
||||
unsub();
|
||||
resolve([...results]);
|
||||
}
|
||||
},
|
||||
undefined,
|
||||
() => {
|
||||
unsub();
|
||||
resolve([...results]);
|
||||
},
|
||||
);
|
||||
const uuid = crypto.randomUUID();
|
||||
const machina = new Machina<NostrRelayEVENT | NostrRelayEOSE | NostrRelayCLOSED>(opts.signal);
|
||||
|
||||
const onAbort = () => {
|
||||
unsub();
|
||||
reject(abortError());
|
||||
opts.signal?.removeEventListener('abort', onAbort);
|
||||
};
|
||||
const unsub = this.pool.subscribe(
|
||||
filters,
|
||||
this.relays,
|
||||
(event: NostrEvent | null) => {
|
||||
if (event && matchFilters(filters, event)) {
|
||||
machina.push(['EVENT', uuid, purifyEvent(event)]);
|
||||
}
|
||||
},
|
||||
undefined,
|
||||
() => {
|
||||
machina.push(['EOSE', uuid]);
|
||||
},
|
||||
);
|
||||
|
||||
opts.signal?.addEventListener('abort', onAbort);
|
||||
});
|
||||
try {
|
||||
for await (const msg of machina) {
|
||||
yield msg;
|
||||
}
|
||||
} finally {
|
||||
unsub();
|
||||
}
|
||||
}
|
||||
|
||||
async query(filters: NostrFilter[], opts: { signal?: AbortSignal; limit?: number } = {}): Promise<NostrEvent[]> {
|
||||
const events = new NSet();
|
||||
|
||||
const limit = filters.reduce((result, filter) => result + getFilterLimit(filter), 0);
|
||||
if (limit === 0) return [];
|
||||
|
||||
for await (const msg of this.req(filters, opts)) {
|
||||
if (msg[0] === 'EOSE') break;
|
||||
if (msg[0] === 'EVENT') events.add(msg[2]);
|
||||
if (msg[0] === 'CLOSED') throw new Error('Subscription closed');
|
||||
|
||||
if (events.size >= limit) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return [...events];
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue