Merge branch 'rm-optimizer' into 'main'
Remove Optimizer and Reqmeister See merge request soapbox-pub/ditto!319
This commit is contained in:
commit
6bfe611847
|
@ -69,8 +69,8 @@ const streamingController: AppController = (c) => {
|
|||
if (!filter) return;
|
||||
|
||||
try {
|
||||
const db = await Storages.db();
|
||||
const pubsub = await Storages.pubsub();
|
||||
const optimizer = await Storages.optimizer();
|
||||
|
||||
for await (const msg of pubsub.req([filter], { signal: controller.signal })) {
|
||||
if (msg[0] === 'EVENT') {
|
||||
|
@ -86,7 +86,7 @@ const streamingController: AppController = (c) => {
|
|||
|
||||
await hydrateEvents({
|
||||
events: [event],
|
||||
store: optimizer,
|
||||
store: db,
|
||||
signal: AbortSignal.timeout(1000),
|
||||
});
|
||||
|
||||
|
|
|
@ -47,7 +47,6 @@ async function handleEvent(event: DittoEvent, signal: AbortSignal): Promise<void
|
|||
parseMetadata(event, signal),
|
||||
DVM.event(event),
|
||||
trackHashtags(event),
|
||||
fetchRelatedEvents(event),
|
||||
processMedia(event),
|
||||
payZap(event, signal),
|
||||
streamOut(event),
|
||||
|
@ -182,31 +181,6 @@ async function trackHashtags(event: NostrEvent): Promise<void> {
|
|||
}
|
||||
}
|
||||
|
||||
/** Queue related events to fetch. */
|
||||
async function fetchRelatedEvents(event: DittoEvent) {
|
||||
const cache = await Storages.cache();
|
||||
const reqmeister = await Storages.reqmeister();
|
||||
|
||||
if (!event.author) {
|
||||
const signal = AbortSignal.timeout(3000);
|
||||
reqmeister.query([{ kinds: [0], authors: [event.pubkey] }], { signal })
|
||||
.then((events) => Promise.allSettled(events.map((event) => handleEvent(event, signal))))
|
||||
.catch(() => {});
|
||||
}
|
||||
|
||||
for (const [name, id] of event.tags) {
|
||||
if (name === 'e') {
|
||||
const { count } = await cache.count([{ ids: [id] }]);
|
||||
if (!count) {
|
||||
const signal = AbortSignal.timeout(3000);
|
||||
reqmeister.query([{ ids: [id] }], { signal })
|
||||
.then((events) => Promise.allSettled(events.map((event) => handleEvent(event, signal))))
|
||||
.catch(() => {});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** Delete unattached media entries that are attached to the event. */
|
||||
function processMedia({ tags, pubkey, user }: DittoEvent) {
|
||||
if (user) {
|
||||
|
|
|
@ -25,7 +25,7 @@ const getEvent = async (
|
|||
opts: GetEventOpts = {},
|
||||
): Promise<DittoEvent | undefined> => {
|
||||
debug(`getEvent: ${id}`);
|
||||
const store = await Storages.optimizer();
|
||||
const store = await Storages.db();
|
||||
const { kind, signal = AbortSignal.timeout(1000) } = opts;
|
||||
|
||||
const filter: NostrFilter = { ids: [id], limit: 1 };
|
||||
|
@ -40,7 +40,7 @@ const getEvent = async (
|
|||
|
||||
/** Get a Nostr `set_medatadata` event for a user's pubkey. */
|
||||
const getAuthor = async (pubkey: string, opts: GetEventOpts = {}): Promise<NostrEvent | undefined> => {
|
||||
const store = await Storages.optimizer();
|
||||
const store = await Storages.db();
|
||||
const { signal = AbortSignal.timeout(1000) } = opts;
|
||||
|
||||
return await store.query([{ authors: [pubkey], kinds: [0], limit: 1 }], { limit: 1, signal })
|
||||
|
|
|
@ -1,25 +1,18 @@
|
|||
// deno-lint-ignore-file require-await
|
||||
import { NCache } from '@nostrify/nostrify';
|
||||
import { RelayPoolWorker } from 'nostr-relaypool';
|
||||
|
||||
import { Conf } from '@/config.ts';
|
||||
import { DittoDB } from '@/db/DittoDB.ts';
|
||||
import { EventsDB } from '@/storages/EventsDB.ts';
|
||||
import { Optimizer } from '@/storages/optimizer.ts';
|
||||
import { PoolStore } from '@/storages/pool-store.ts';
|
||||
import { Reqmeister } from '@/storages/reqmeister.ts';
|
||||
import { SearchStore } from '@/storages/search-store.ts';
|
||||
import { InternalRelay } from '@/storages/InternalRelay.ts';
|
||||
import { UserStore } from '@/storages/UserStore.ts';
|
||||
import { Time } from '@/utils/time.ts';
|
||||
|
||||
export class Storages {
|
||||
private static _db: Promise<EventsDB> | undefined;
|
||||
private static _admin: Promise<UserStore> | undefined;
|
||||
private static _cache: Promise<NCache> | undefined;
|
||||
private static _client: Promise<PoolStore> | undefined;
|
||||
private static _optimizer: Promise<Optimizer> | undefined;
|
||||
private static _reqmeister: Promise<Reqmeister> | undefined;
|
||||
private static _pubsub: Promise<InternalRelay> | undefined;
|
||||
private static _search: Promise<SearchStore> | undefined;
|
||||
|
||||
|
@ -93,49 +86,13 @@ export class Storages {
|
|||
return this._client;
|
||||
}
|
||||
|
||||
/** In-memory data store for cached events. */
|
||||
public static async cache(): Promise<NCache> {
|
||||
if (!this._cache) {
|
||||
this._cache = Promise.resolve(new NCache({ max: 3000 }));
|
||||
}
|
||||
return this._cache;
|
||||
}
|
||||
|
||||
/** Batches requests for single events. */
|
||||
public static async reqmeister(): Promise<Reqmeister> {
|
||||
if (!this._reqmeister) {
|
||||
this._reqmeister = Promise.resolve(
|
||||
new Reqmeister({
|
||||
client: await this.client(),
|
||||
delay: Time.seconds(1),
|
||||
timeout: Time.seconds(1),
|
||||
}),
|
||||
);
|
||||
}
|
||||
return this._reqmeister;
|
||||
}
|
||||
|
||||
/** Main Ditto storage adapter */
|
||||
public static async optimizer(): Promise<Optimizer> {
|
||||
if (!this._optimizer) {
|
||||
this._optimizer = Promise.resolve(
|
||||
new Optimizer({
|
||||
db: await this.db(),
|
||||
cache: await this.cache(),
|
||||
client: await this.reqmeister(),
|
||||
}),
|
||||
);
|
||||
}
|
||||
return this._optimizer;
|
||||
}
|
||||
|
||||
/** Storage to use for remote search. */
|
||||
public static async search(): Promise<SearchStore> {
|
||||
if (!this._search) {
|
||||
this._search = Promise.resolve(
|
||||
new SearchStore({
|
||||
relay: Conf.searchRelay,
|
||||
fallback: await this.optimizer(),
|
||||
fallback: await this.db(),
|
||||
}),
|
||||
);
|
||||
}
|
||||
|
|
|
@ -1,104 +0,0 @@
|
|||
import { NostrFilter, NSet, NStore } from '@nostrify/nostrify';
|
||||
import Debug from '@soapbox/stickynotes/debug';
|
||||
|
||||
import { normalizeFilters } from '@/filter.ts';
|
||||
import { type DittoEvent } from '@/interfaces/DittoEvent.ts';
|
||||
import { abortError } from '@/utils/abort.ts';
|
||||
|
||||
interface OptimizerOpts {
|
||||
db: NStore;
|
||||
cache: NStore;
|
||||
client: NStore;
|
||||
}
|
||||
|
||||
class Optimizer implements NStore {
|
||||
#debug = Debug('ditto:optimizer');
|
||||
|
||||
#db: NStore;
|
||||
#cache: NStore;
|
||||
#client: NStore;
|
||||
|
||||
constructor(opts: OptimizerOpts) {
|
||||
this.#db = opts.db;
|
||||
this.#cache = opts.cache;
|
||||
this.#client = opts.client;
|
||||
}
|
||||
|
||||
async event(event: DittoEvent, opts?: { signal?: AbortSignal }): Promise<void> {
|
||||
if (opts?.signal?.aborted) return Promise.reject(abortError());
|
||||
|
||||
await Promise.all([
|
||||
this.#db.event(event, opts),
|
||||
this.#cache.event(event, opts),
|
||||
]);
|
||||
}
|
||||
|
||||
async query(filters: NostrFilter[], opts: { signal?: AbortSignal; limit?: number } = {}): Promise<DittoEvent[]> {
|
||||
if (opts?.signal?.aborted) return Promise.reject(abortError());
|
||||
|
||||
filters = normalizeFilters(filters);
|
||||
this.#debug('REQ', JSON.stringify(filters));
|
||||
if (!filters.length) return Promise.resolve([]);
|
||||
|
||||
const { limit = Infinity } = opts;
|
||||
const results = new NSet();
|
||||
|
||||
// Filters with IDs are immutable, so we can take them straight from the cache if we have them.
|
||||
for (let i = 0; i < filters.length; i++) {
|
||||
const filter = filters[i];
|
||||
if (filter.ids) {
|
||||
this.#debug(`Filter[${i}] is an IDs filter; querying cache...`);
|
||||
const ids = new Set<string>(filter.ids);
|
||||
for (const event of await this.#cache.query([filter], opts)) {
|
||||
ids.delete(event.id);
|
||||
results.add(event);
|
||||
if (results.size >= limit) return getResults();
|
||||
}
|
||||
filters[i] = { ...filter, ids: [...ids] };
|
||||
}
|
||||
}
|
||||
|
||||
filters = normalizeFilters(filters);
|
||||
if (!filters.length) return getResults();
|
||||
|
||||
// Query the database for events.
|
||||
this.#debug('Querying database...');
|
||||
for (const dbEvent of await this.#db.query(filters, opts)) {
|
||||
results.add(dbEvent);
|
||||
if (results.size >= limit) return getResults();
|
||||
}
|
||||
|
||||
// We already searched the DB, so stop if this is a search filter.
|
||||
if (filters.some((filter) => typeof filter.search === 'string')) {
|
||||
this.#debug(`Bailing early for search filter: "${filters[0]?.search}"`);
|
||||
return getResults();
|
||||
}
|
||||
|
||||
// Query the cache again.
|
||||
this.#debug('Querying cache...');
|
||||
for (const cacheEvent of await this.#cache.query(filters, opts)) {
|
||||
results.add(cacheEvent);
|
||||
if (results.size >= limit) return getResults();
|
||||
}
|
||||
|
||||
// Finally, query the client.
|
||||
this.#debug('Querying client...');
|
||||
try {
|
||||
for (const clientEvent of await this.#client.query(filters, opts)) {
|
||||
results.add(clientEvent);
|
||||
if (results.size >= limit) return getResults();
|
||||
}
|
||||
} catch (_e) {
|
||||
// do nothing
|
||||
}
|
||||
|
||||
/** Get return type from map. */
|
||||
function getResults() {
|
||||
return [...results.values()];
|
||||
}
|
||||
|
||||
return getResults();
|
||||
}
|
||||
}
|
||||
|
||||
export { Optimizer };
|
|
@ -1,144 +0,0 @@
|
|||
import { NostrEvent, NostrFilter, NStore } from '@nostrify/nostrify';
|
||||
import Debug from '@soapbox/stickynotes/debug';
|
||||
import { EventEmitter } from 'tseep';
|
||||
|
||||
import { eventToMicroFilter, getFilterId, isMicrofilter, type MicroFilter } from '@/filter.ts';
|
||||
import { Time } from '@/utils/time.ts';
|
||||
import { abortError } from '@/utils/abort.ts';
|
||||
|
||||
interface ReqmeisterOpts {
|
||||
client: NStore;
|
||||
delay?: number;
|
||||
timeout?: number;
|
||||
}
|
||||
|
||||
interface ReqmeisterReqOpts {
|
||||
relays?: WebSocket['url'][];
|
||||
signal?: AbortSignal;
|
||||
}
|
||||
|
||||
type ReqmeisterQueueItem = [string, MicroFilter, WebSocket['url'][]];
|
||||
|
||||
/** Batches requests to Nostr relays using microfilters. */
|
||||
class Reqmeister extends EventEmitter<{ [filterId: string]: (event: NostrEvent) => any }> implements NStore {
|
||||
#debug = Debug('ditto:reqmeister');
|
||||
|
||||
#opts: ReqmeisterOpts;
|
||||
#queue: ReqmeisterQueueItem[] = [];
|
||||
#promise!: Promise<void>;
|
||||
#resolve!: () => void;
|
||||
|
||||
constructor(opts: ReqmeisterOpts) {
|
||||
super();
|
||||
this.#opts = opts;
|
||||
this.#tick();
|
||||
this.#perform();
|
||||
}
|
||||
|
||||
#tick() {
|
||||
this.#resolve?.();
|
||||
this.#promise = new Promise((resolve) => {
|
||||
this.#resolve = resolve;
|
||||
});
|
||||
}
|
||||
|
||||
async #perform() {
|
||||
const { client, delay, timeout = Time.seconds(1) } = this.#opts;
|
||||
await new Promise((resolve) => setTimeout(resolve, delay));
|
||||
|
||||
const queue = this.#queue;
|
||||
this.#queue = [];
|
||||
|
||||
const wantedEvents = new Set<NostrEvent['id']>();
|
||||
const wantedAuthors = new Set<NostrEvent['pubkey']>();
|
||||
|
||||
// TODO: batch by relays.
|
||||
for (const [_filterId, filter, _relays] of queue) {
|
||||
if ('ids' in filter) {
|
||||
filter.ids.forEach((id) => wantedEvents.add(id));
|
||||
} else {
|
||||
wantedAuthors.add(filter.authors[0]);
|
||||
}
|
||||
}
|
||||
|
||||
const filters: NostrFilter[] = [];
|
||||
|
||||
if (wantedEvents.size) filters.push({ ids: [...wantedEvents] });
|
||||
if (wantedAuthors.size) filters.push({ kinds: [0], authors: [...wantedAuthors] });
|
||||
|
||||
if (filters.length) {
|
||||
try {
|
||||
const events = await client.query(filters, { signal: AbortSignal.timeout(timeout) });
|
||||
|
||||
for (const event of events) {
|
||||
this.event(event);
|
||||
}
|
||||
} catch (_e) {
|
||||
// do nothing
|
||||
}
|
||||
}
|
||||
|
||||
this.#tick();
|
||||
this.#perform();
|
||||
}
|
||||
|
||||
private fetch(filter: MicroFilter, opts: ReqmeisterReqOpts = {}): Promise<NostrEvent> {
|
||||
const {
|
||||
relays = [],
|
||||
signal = AbortSignal.timeout(this.#opts.timeout ?? 1000),
|
||||
} = opts;
|
||||
|
||||
if (signal.aborted) {
|
||||
return Promise.reject(abortError());
|
||||
}
|
||||
|
||||
const filterId = getFilterId(filter);
|
||||
|
||||
this.#queue.push([filterId, filter, relays]);
|
||||
|
||||
return new Promise<NostrEvent>((resolve, reject) => {
|
||||
const handleEvent = (event: NostrEvent) => {
|
||||
resolve(event);
|
||||
this.removeListener(filterId, handleEvent);
|
||||
};
|
||||
|
||||
const handleAbort = () => {
|
||||
reject(new DOMException('Aborted', 'AbortError'));
|
||||
this.removeListener(filterId, resolve);
|
||||
signal.removeEventListener('abort', handleAbort);
|
||||
};
|
||||
|
||||
this.once(filterId, handleEvent);
|
||||
signal.addEventListener('abort', handleAbort, { once: true });
|
||||
});
|
||||
}
|
||||
|
||||
event(event: NostrEvent, _opts?: { signal?: AbortSignal }): Promise<void> {
|
||||
const filterId = getFilterId(eventToMicroFilter(event));
|
||||
this.#queue = this.#queue.filter(([id]) => id !== filterId);
|
||||
this.emit(filterId, event);
|
||||
return Promise.resolve();
|
||||
}
|
||||
|
||||
async query(filters: NostrFilter[], opts?: { signal?: AbortSignal }): Promise<NostrEvent[]> {
|
||||
if (opts?.signal?.aborted) return Promise.reject(abortError());
|
||||
|
||||
this.#debug('REQ', JSON.stringify(filters));
|
||||
if (!filters.length) return Promise.resolve([]);
|
||||
|
||||
const promises = filters.reduce<Promise<NostrEvent>[]>((result, filter) => {
|
||||
if (isMicrofilter(filter)) {
|
||||
result.push(this.fetch(filter, opts));
|
||||
}
|
||||
return result;
|
||||
}, []);
|
||||
|
||||
const results = await Promise.allSettled(promises);
|
||||
|
||||
return results
|
||||
.filter((result): result is PromiseFulfilledResult<NostrEvent> => result.status === 'fulfilled')
|
||||
.map((result) => result.value);
|
||||
}
|
||||
}
|
||||
|
||||
export { Reqmeister };
|
|
@ -39,10 +39,9 @@ async function renderStatus(event: DittoEvent, opts: RenderStatusOpts): Promise<
|
|||
),
|
||||
];
|
||||
|
||||
const db = await Storages.db();
|
||||
const optimizer = await Storages.optimizer();
|
||||
const store = await Storages.db();
|
||||
|
||||
const mentionedProfiles = await optimizer.query(
|
||||
const mentionedProfiles = await store.query(
|
||||
[{ kinds: [0], authors: mentionedPubkeys, limit: mentionedPubkeys.length }],
|
||||
);
|
||||
|
||||
|
@ -55,7 +54,7 @@ async function renderStatus(event: DittoEvent, opts: RenderStatusOpts): Promise<
|
|||
),
|
||||
firstUrl ? unfurlCardCached(firstUrl) : null,
|
||||
viewerPubkey
|
||||
? await db.query([
|
||||
? await store.query([
|
||||
{ kinds: [6], '#e': [event.id], authors: [viewerPubkey], limit: 1 },
|
||||
{ kinds: [7], '#e': [event.id], authors: [viewerPubkey], limit: 1 },
|
||||
{ kinds: [9734], '#e': [event.id], authors: [viewerPubkey], limit: 1 },
|
||||
|
|
Loading…
Reference in New Issue