Make Reqmeister work
This commit is contained in:
parent
4a32fe2c37
commit
acc133e8e2
|
@ -0,0 +1,9 @@
|
||||||
|
import { Reqmeister } from '@/reqmeister.ts';
|
||||||
|
import { Time } from '@/utils/time.ts';
|
||||||
|
|
||||||
|
const reqmeister = new Reqmeister({
|
||||||
|
delay: Time.seconds(1),
|
||||||
|
timeout: Time.seconds(1),
|
||||||
|
});
|
||||||
|
|
||||||
|
export { reqmeister };
|
|
@ -3,6 +3,8 @@ import { type Event, type Filter, matchFilters } from '@/deps.ts';
|
||||||
|
|
||||||
import type { EventData } from '@/types.ts';
|
import type { EventData } from '@/types.ts';
|
||||||
|
|
||||||
|
import stringifyStable from 'npm:fast-stable-stringify';
|
||||||
|
|
||||||
/** Additional properties that may be added by Ditto to events. */
|
/** Additional properties that may be added by Ditto to events. */
|
||||||
type Relation = 'author' | 'author_stats' | 'event_stats';
|
type Relation = 'author' | 'author_stats' | 'event_stats';
|
||||||
|
|
||||||
|
@ -14,6 +16,9 @@ interface DittoFilter<K extends number = number> extends Filter<K> {
|
||||||
relations?: Relation[];
|
relations?: Relation[];
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** Filter to get one specific event. */
|
||||||
|
type MicroFilter = { ids: [Event['id']] } | { kinds: [0]; authors: [Event['pubkey']] };
|
||||||
|
|
||||||
/** Additional options to apply to the whole subscription. */
|
/** Additional options to apply to the whole subscription. */
|
||||||
interface GetFiltersOpts {
|
interface GetFiltersOpts {
|
||||||
/** How long to wait (in milliseconds) until aborting the request. */
|
/** How long to wait (in milliseconds) until aborting the request. */
|
||||||
|
@ -46,4 +51,33 @@ function matchDittoFilters(filters: DittoFilter[], event: Event, data: EventData
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
export { type DittoFilter, type GetFiltersOpts, matchDittoFilters, type Relation };
|
/** Get deterministic ID for a microfilter. */
|
||||||
|
function getFilterId(filter: MicroFilter): string {
|
||||||
|
if ('ids' in filter) {
|
||||||
|
return stringifyStable({ ids: [filter.ids] });
|
||||||
|
} else {
|
||||||
|
return stringifyStable({
|
||||||
|
kinds: [filter.kinds[0]],
|
||||||
|
authors: [filter.authors[0]],
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Get a microfilter from a Nostr event. */
|
||||||
|
function eventToMicroFilter(event: Event): MicroFilter {
|
||||||
|
if (event.kind === 0) {
|
||||||
|
return { kinds: [0], authors: [event.pubkey] };
|
||||||
|
} else {
|
||||||
|
return { ids: [event.id] };
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export {
|
||||||
|
type DittoFilter,
|
||||||
|
eventToMicroFilter,
|
||||||
|
getFilterId,
|
||||||
|
type GetFiltersOpts,
|
||||||
|
matchDittoFilters,
|
||||||
|
type MicroFilter,
|
||||||
|
type Relation,
|
||||||
|
};
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
import { reqmeister } from '@/common.ts';
|
||||||
import { Conf } from '@/config.ts';
|
import { Conf } from '@/config.ts';
|
||||||
import * as eventsDB from '@/db/events.ts';
|
import * as eventsDB from '@/db/events.ts';
|
||||||
import { addRelays } from '@/db/relays.ts';
|
import { addRelays } from '@/db/relays.ts';
|
||||||
|
@ -8,7 +9,6 @@ import { isEphemeralKind } from '@/kinds.ts';
|
||||||
import * as mixer from '@/mixer.ts';
|
import * as mixer from '@/mixer.ts';
|
||||||
import { publish } from '@/pool.ts';
|
import { publish } from '@/pool.ts';
|
||||||
import { isLocallyFollowed } from '@/queries.ts';
|
import { isLocallyFollowed } from '@/queries.ts';
|
||||||
import { Reqmeister } from '@/reqmeister.ts';
|
|
||||||
import { updateStats } from '@/stats.ts';
|
import { updateStats } from '@/stats.ts';
|
||||||
import { Sub } from '@/subs.ts';
|
import { Sub } from '@/subs.ts';
|
||||||
import { getTagSet } from '@/tags.ts';
|
import { getTagSet } from '@/tags.ts';
|
||||||
|
@ -18,11 +18,6 @@ import { verifySignatureWorker } from '@/workers/verify.ts';
|
||||||
|
|
||||||
import type { EventData } from '@/types.ts';
|
import type { EventData } from '@/types.ts';
|
||||||
|
|
||||||
const reqmeister = new Reqmeister({
|
|
||||||
delay: Time.seconds(1),
|
|
||||||
timeout: Time.seconds(1),
|
|
||||||
});
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Common pipeline function to process (and maybe store) events.
|
* Common pipeline function to process (and maybe store) events.
|
||||||
* It is idempotent, so it can be called multiple times for the same event.
|
* It is idempotent, so it can be called multiple times for the same event.
|
||||||
|
@ -145,11 +140,11 @@ function trackRelays(event: Event) {
|
||||||
/** Track related events to fetch. */
|
/** Track related events to fetch. */
|
||||||
function trackRelatedEvents(event: Event, data: EventData) {
|
function trackRelatedEvents(event: Event, data: EventData) {
|
||||||
if (!data.user) {
|
if (!data.user) {
|
||||||
reqmeister.wantAuthor(event.pubkey);
|
reqmeister.req({ kinds: [0], authors: [event.pubkey] }).catch(() => {});
|
||||||
}
|
}
|
||||||
for (const [name, id, relay] of event.tags) {
|
for (const [name, id, relay] of event.tags) {
|
||||||
if (name === 'e' && !encounters.has(id)) {
|
if (name === 'e' && !encounters.has(id)) {
|
||||||
reqmeister.wantEvent(id, [relay]);
|
reqmeister.req({ ids: [id] }, [relay]).catch(() => {});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,6 +2,7 @@ import * as eventsDB from '@/db/events.ts';
|
||||||
import { type Event, findReplyTag } from '@/deps.ts';
|
import { type Event, findReplyTag } from '@/deps.ts';
|
||||||
import { type DittoFilter, type Relation } from '@/filter.ts';
|
import { type DittoFilter, type Relation } from '@/filter.ts';
|
||||||
import * as mixer from '@/mixer.ts';
|
import * as mixer from '@/mixer.ts';
|
||||||
|
import { reqmeister } from '@/common.ts';
|
||||||
|
|
||||||
interface GetEventOpts<K extends number> {
|
interface GetEventOpts<K extends number> {
|
||||||
/** Timeout in milliseconds. */
|
/** Timeout in milliseconds. */
|
||||||
|
@ -30,10 +31,10 @@ const getEvent = async <K extends number = number>(
|
||||||
const getAuthor = async (pubkey: string, opts: GetEventOpts<0> = {}): Promise<Event<0> | undefined> => {
|
const getAuthor = async (pubkey: string, opts: GetEventOpts<0> = {}): Promise<Event<0> | undefined> => {
|
||||||
const { relations, timeout = 1000 } = opts;
|
const { relations, timeout = 1000 } = opts;
|
||||||
|
|
||||||
const [event] = await mixer.getFilters(
|
const event = await eventsDB.getFilters(
|
||||||
[{ authors: [pubkey], relations, kinds: [0], limit: 1 }],
|
[{ authors: [pubkey], relations, kinds: [0], limit: 1 }],
|
||||||
{ limit: 1, timeout },
|
{ limit: 1, timeout },
|
||||||
);
|
).then(([event]) => event) || await reqmeister.req({ kinds: [0], authors: [pubkey] }).catch(() => {});
|
||||||
|
|
||||||
return event;
|
return event;
|
||||||
};
|
};
|
||||||
|
|
|
@ -1,112 +1,88 @@
|
||||||
import * as client from '@/client.ts';
|
import * as client from '@/client.ts';
|
||||||
import { Event, Filter } from '@/deps.ts';
|
import { Event, Filter } from '@/deps.ts';
|
||||||
|
|
||||||
|
import { EventEmitter } from 'npm:tseep';
|
||||||
|
import { eventToMicroFilter, getFilterId, type MicroFilter } from '@/filter.ts';
|
||||||
|
|
||||||
interface ReqmeisterOpts {
|
interface ReqmeisterOpts {
|
||||||
delay?: number;
|
delay?: number;
|
||||||
timeout?: number;
|
timeout?: number;
|
||||||
}
|
}
|
||||||
|
|
||||||
class Reqmeister {
|
type ReqmeisterQueueItem = [string, MicroFilter, WebSocket['url'][]];
|
||||||
#opts: ReqmeisterOpts;
|
|
||||||
|
|
||||||
#wantedEvents = new Map<Event['id'], Set<WebSocket['url']>>();
|
class Reqmeister extends EventEmitter<{ [filterId: string]: (event: Event) => any }> {
|
||||||
#wantedAuthors = new Map<Event['pubkey'], Set<WebSocket['url']>>();
|
#opts: ReqmeisterOpts;
|
||||||
|
#queue: ReqmeisterQueueItem[] = [];
|
||||||
|
#promise!: Promise<void>;
|
||||||
|
#resolve!: () => void;
|
||||||
|
|
||||||
constructor(opts: ReqmeisterOpts = {}) {
|
constructor(opts: ReqmeisterOpts = {}) {
|
||||||
|
super();
|
||||||
this.#opts = opts;
|
this.#opts = opts;
|
||||||
|
this.#cycle();
|
||||||
this.#perform();
|
this.#perform();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#cycle() {
|
||||||
|
this.#resolve?.();
|
||||||
|
this.#promise = new Promise((resolve) => {
|
||||||
|
this.#resolve = resolve;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
async #perform() {
|
async #perform() {
|
||||||
const { delay, timeout } = this.#opts;
|
const { delay } = this.#opts;
|
||||||
if (delay) {
|
|
||||||
await new Promise((resolve) => setTimeout(resolve, delay));
|
await new Promise((resolve) => setTimeout(resolve, delay));
|
||||||
}
|
|
||||||
|
|
||||||
const relaysWantedEvents = new Map<WebSocket['url'], Set<Event['id']>>();
|
const queue = this.#queue;
|
||||||
const relaysWantedAuthors = new Map<WebSocket['url'], Set<Event['pubkey']>>();
|
this.#queue = [];
|
||||||
|
|
||||||
const allRelays = new Set<WebSocket['url']>(
|
const wantedEvents = new Set<Event['id']>();
|
||||||
...relaysWantedEvents.keys(),
|
const wantedAuthors = new Set<Event['pubkey']>();
|
||||||
...relaysWantedAuthors.keys(),
|
|
||||||
);
|
|
||||||
|
|
||||||
for (const [eventId, relays] of this.#wantedEvents) {
|
// TODO: batch by relays.
|
||||||
for (const relay of relays) {
|
for (const [_filterId, filter, _relays] of queue) {
|
||||||
const relaysSet = relaysWantedEvents.get(relay);
|
if ('ids' in filter) {
|
||||||
if (relaysSet) {
|
filter.ids.forEach((id) => wantedEvents.add(id));
|
||||||
relaysSet.add(eventId);
|
|
||||||
} else {
|
} else {
|
||||||
relaysWantedEvents.set(relay, new Set([eventId]));
|
wantedAuthors.add(filter.authors[0]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
for (const [author, relays] of this.#wantedAuthors) {
|
|
||||||
for (const relay of relays) {
|
|
||||||
const relaysSet = relaysWantedAuthors.get(relay);
|
|
||||||
if (relaysSet) {
|
|
||||||
relaysSet.add(author);
|
|
||||||
} else {
|
|
||||||
relaysWantedAuthors.set(relay, new Set([author]));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
const promises: ReturnType<typeof client.getFilters>[] = [];
|
|
||||||
|
|
||||||
for (const relay of allRelays) {
|
|
||||||
const wantedEvents = relaysWantedEvents.get(relay);
|
|
||||||
const wantedAuthors = relaysWantedAuthors.get(relay);
|
|
||||||
|
|
||||||
const filters: Filter[] = [];
|
const filters: Filter[] = [];
|
||||||
|
|
||||||
if (wantedEvents) filters.push({ ids: [...wantedEvents] });
|
if (wantedEvents.size) filters.push({ ids: [...wantedEvents] });
|
||||||
if (wantedAuthors) filters.push({ authors: [...wantedAuthors] });
|
if (wantedAuthors.size) filters.push({ kinds: [0], authors: [...wantedAuthors] });
|
||||||
|
|
||||||
console.log('reqmeister:', [relay, filters]);
|
const events = await client.getFilters(filters, { timeout: this.#opts.timeout });
|
||||||
promises.push(
|
|
||||||
client.getFilters(filters, { relays: [relay], timeout }),
|
for (const event of events) {
|
||||||
);
|
this.encounter(event);
|
||||||
}
|
}
|
||||||
|
|
||||||
await Promise.all(promises);
|
this.#cycle();
|
||||||
this.#perform();
|
this.#perform();
|
||||||
}
|
}
|
||||||
|
|
||||||
wantEvent(eventId: Event['id'], relays: WebSocket['url'][] = []) {
|
req(filter: MicroFilter, relays: WebSocket['url'][] = []): Promise<Event> {
|
||||||
const relaysSet = this.#wantedEvents.get(eventId);
|
const filterId = getFilterId(filter);
|
||||||
if (relaysSet) {
|
this.#queue.push([filterId, filter, relays]);
|
||||||
for (const relay of relays) {
|
return new Promise<Event>((resolve, reject) => {
|
||||||
relaysSet.add(relay);
|
this.once(filterId, resolve);
|
||||||
}
|
this.#promise.finally(reject);
|
||||||
} else {
|
});
|
||||||
this.#wantedEvents.set(eventId, new Set(relays));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
wantAuthor(author: Event['pubkey'], relays: WebSocket['url'][] = []) {
|
|
||||||
const relaysSet = this.#wantedAuthors.get(author);
|
|
||||||
if (relaysSet) {
|
|
||||||
for (const relay of relays) {
|
|
||||||
relaysSet.add(relay);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
this.#wantedAuthors.set(author, new Set(relays));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
encounter(event: Event): void {
|
encounter(event: Event): void {
|
||||||
this.#wantedEvents.delete(event.id);
|
const filterId = getFilterId(eventToMicroFilter(event));
|
||||||
if (event.kind === 0) {
|
this.#queue = this.#queue.filter(([id]) => id !== filterId);
|
||||||
this.#wantedAuthors.delete(event.pubkey);
|
this.emit(filterId, event);
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
isWanted(event: Event): boolean {
|
isWanted(event: Event): boolean {
|
||||||
if (this.#wantedEvents.has(event.id)) return true;
|
const filterId = getFilterId(eventToMicroFilter(event));
|
||||||
if (event.kind === 0 && this.#wantedAuthors.has(event.pubkey)) return true;
|
return this.#queue.some(([id]) => id === filterId);
|
||||||
return false;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue