From a15013e22aed27c0943eab4d687111be4086d837 Mon Sep 17 00:00:00 2001
From: Alex Gleason
Date: Sat, 25 May 2024 12:22:01 -0500
Subject: [PATCH] Remove Optimizer and Reqmeister

---
 src/controllers/api/streaming.ts |   4 +-
 src/pipeline.ts                  |  26 ------
 src/queries.ts                   |   4 +-
 src/storages.ts                  |  45 +---------
 src/storages/optimizer.ts        | 104 ----------------------
 src/storages/reqmeister.ts       | 144 -------------------------------
 src/views/mastodon/statuses.ts   |   7 +-
 7 files changed, 8 insertions(+), 326 deletions(-)
 delete mode 100644 src/storages/optimizer.ts
 delete mode 100644 src/storages/reqmeister.ts

diff --git a/src/controllers/api/streaming.ts b/src/controllers/api/streaming.ts
index e3852d9..b67bce8 100644
--- a/src/controllers/api/streaming.ts
+++ b/src/controllers/api/streaming.ts
@@ -69,8 +69,8 @@ const streamingController: AppController = (c) => {
     if (!filter) return;

     try {
+      const db = await Storages.db();
       const pubsub = await Storages.pubsub();
-      const optimizer = await Storages.optimizer();

       for await (const msg of pubsub.req([filter], { signal: controller.signal })) {
         if (msg[0] === 'EVENT') {
@@ -86,7 +86,7 @@ const streamingController: AppController = (c) => {

           await hydrateEvents({
             events: [event],
-            store: optimizer,
+            store: db,
             signal: AbortSignal.timeout(1000),
           });
diff --git a/src/pipeline.ts b/src/pipeline.ts
index a540125..fd5fc99 100644
--- a/src/pipeline.ts
+++ b/src/pipeline.ts
@@ -47,7 +47,6 @@ async function handleEvent(event: DittoEvent, signal: AbortSignal): Promise<void
   }
 }

-/** Queue related events to fetch. */
-async function fetchRelatedEvents(event: DittoEvent) {
-  const cache = await Storages.cache();
-  const reqmeister = await Storages.reqmeister();
-
-  if (!event.author) {
-    const signal = AbortSignal.timeout(3000);
-    reqmeister.query([{ kinds: [0], authors: [event.pubkey] }], { signal })
-      .then((events) => Promise.allSettled(events.map((event) => handleEvent(event, signal))))
-      .catch(() => {});
-  }
-
-  for (const [name, id] of event.tags) {
-    if (name === 'e') {
-      const { count } = await cache.count([{ ids: [id] }]);
-      if (!count) {
-        const signal = AbortSignal.timeout(3000);
-        reqmeister.query([{ ids: [id] }], { signal })
-          .then((events) => Promise.allSettled(events.map((event) => handleEvent(event, signal))))
-          .catch(() => {});
-      }
-    }
-  }
-}
-
 /** Delete unattached media entries that are attached to the event. */
 function processMedia({ tags, pubkey, user }: DittoEvent) {
   if (user) {
diff --git a/src/queries.ts b/src/queries.ts
index 7407077..1fccb68 100644
--- a/src/queries.ts
+++ b/src/queries.ts
@@ -25,7 +25,7 @@ const getEvent = async (
   opts: GetEventOpts = {},
 ): Promise<NostrEvent | undefined> => {
   debug(`getEvent: ${id}`);
-  const store = await Storages.optimizer();
+  const store = await Storages.db();
   const { kind, signal = AbortSignal.timeout(1000) } = opts;

   const filter: NostrFilter = { ids: [id], limit: 1 };
@@ -40,7 +40,7 @@ const getEvent = async (

 /** Get a Nostr `set_medatadata` event for a user's pubkey. */
 const getAuthor = async (pubkey: string, opts: GetEventOpts = {}): Promise<NostrEvent | undefined> => {
-  const store = await Storages.optimizer();
+  const store = await Storages.db();
   const { signal = AbortSignal.timeout(1000) } = opts;

   return await store.query([{ authors: [pubkey], kinds: [0], limit: 1 }], { limit: 1, signal })
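
Note: with the Optimizer gone, `getEvent` and `getAuthor` above read straight from the
SQL-backed store returned by `Storages.db()`. A minimal sketch of the resulting lookup
path (illustrative only; `lookupEvent` is not a function in this patch):

    import { NostrFilter } from '@nostrify/nostrify';
    import { Storages } from '@/storages.ts';

    // Sketch: single-event lookup against the database store, mirroring getEvent.
    async function lookupEvent(id: string, signal = AbortSignal.timeout(1000)) {
      const store = await Storages.db();
      const filter: NostrFilter = { ids: [id], limit: 1 };
      const [event] = await store.query([filter], { limit: 1, signal });
      return event; // undefined when the event is not in the database
    }
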
diff --git a/src/storages.ts b/src/storages.ts
index 10d5b05..f8f206d 100644
--- a/src/storages.ts
+++ b/src/storages.ts
@@ -1,25 +1,18 @@
 // deno-lint-ignore-file require-await
-import { NCache } from '@nostrify/nostrify';
 import { RelayPoolWorker } from 'nostr-relaypool';

 import { Conf } from '@/config.ts';
 import { DittoDB } from '@/db/DittoDB.ts';
 import { EventsDB } from '@/storages/EventsDB.ts';
-import { Optimizer } from '@/storages/optimizer.ts';
 import { PoolStore } from '@/storages/pool-store.ts';
-import { Reqmeister } from '@/storages/reqmeister.ts';
 import { SearchStore } from '@/storages/search-store.ts';
 import { InternalRelay } from '@/storages/InternalRelay.ts';
 import { UserStore } from '@/storages/UserStore.ts';
-import { Time } from '@/utils/time.ts';

 export class Storages {
   private static _db: Promise<EventsDB> | undefined;
   private static _admin: Promise<UserStore> | undefined;
-  private static _cache: Promise<NCache> | undefined;
   private static _client: Promise<PoolStore> | undefined;
-  private static _optimizer: Promise<Optimizer> | undefined;
-  private static _reqmeister: Promise<Reqmeister> | undefined;
   private static _pubsub: Promise<InternalRelay> | undefined;
   private static _search: Promise<SearchStore> | undefined;
@@ -93,49 +86,13 @@ export class Storages {
     return this._client;
   }

-  /** In-memory data store for cached events. */
-  public static async cache(): Promise<NCache> {
-    if (!this._cache) {
-      this._cache = Promise.resolve(new NCache({ max: 3000 }));
-    }
-    return this._cache;
-  }
-
-  /** Batches requests for single events. */
-  public static async reqmeister(): Promise<Reqmeister> {
-    if (!this._reqmeister) {
-      this._reqmeister = Promise.resolve(
-        new Reqmeister({
-          client: await this.client(),
-          delay: Time.seconds(1),
-          timeout: Time.seconds(1),
-        }),
-      );
-    }
-    return this._reqmeister;
-  }
-
-  /** Main Ditto storage adapter */
-  public static async optimizer(): Promise<Optimizer> {
-    if (!this._optimizer) {
-      this._optimizer = Promise.resolve(
-        new Optimizer({
-          db: await this.db(),
-          cache: await this.cache(),
-          client: await this.reqmeister(),
-        }),
-      );
-    }
-    return this._optimizer;
-  }
-
   /** Storage to use for remote search. */
   public static async search(): Promise<SearchStore> {
     if (!this._search) {
       this._search = Promise.resolve(
         new SearchStore({
           relay: Conf.searchRelay,
-          fallback: await this.optimizer(),
+          fallback: await this.db(),
         }),
       );
     }
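
The accessors that remain all follow the same memoized-promise pattern, so concurrent
callers share one store instance. Reduced to its essence (a sketch, not code from this
repo):

    // Sketch: lazy, memoized async construction as used by the Storages accessors.
    class Lazy<T> {
      #promise: Promise<T> | undefined;
      constructor(private init: () => Promise<T>) {}
      get(): Promise<T> {
        this.#promise ??= this.init(); // first caller constructs; the rest reuse
        return this.#promise;
      }
    }
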
diff --git a/src/storages/optimizer.ts b/src/storages/optimizer.ts
deleted file mode 100644
index 7b4153e..0000000
--- a/src/storages/optimizer.ts
+++ /dev/null
@@ -1,104 +0,0 @@
-import { NostrFilter, NSet, NStore } from '@nostrify/nostrify';
-import Debug from '@soapbox/stickynotes/debug';
-
-import { normalizeFilters } from '@/filter.ts';
-import { type DittoEvent } from '@/interfaces/DittoEvent.ts';
-import { abortError } from '@/utils/abort.ts';
-
-interface OptimizerOpts {
-  db: NStore;
-  cache: NStore;
-  client: NStore;
-}
-
-class Optimizer implements NStore {
-  #debug = Debug('ditto:optimizer');
-
-  #db: NStore;
-  #cache: NStore;
-  #client: NStore;
-
-  constructor(opts: OptimizerOpts) {
-    this.#db = opts.db;
-    this.#cache = opts.cache;
-    this.#client = opts.client;
-  }
-
-  async event(event: DittoEvent, opts?: { signal?: AbortSignal }): Promise<void> {
-    if (opts?.signal?.aborted) return Promise.reject(abortError());
-
-    await Promise.all([
-      this.#db.event(event, opts),
-      this.#cache.event(event, opts),
-    ]);
-  }
-
-  async query(filters: NostrFilter[], opts: { signal?: AbortSignal; limit?: number } = {}): Promise<DittoEvent[]> {
-    if (opts?.signal?.aborted) return Promise.reject(abortError());
-
-    filters = normalizeFilters(filters);
-    this.#debug('REQ', JSON.stringify(filters));
-    if (!filters.length) return Promise.resolve([]);
-
-    const { limit = Infinity } = opts;
-    const results = new NSet();
-
-    // Filters with IDs are immutable, so we can take them straight from the cache if we have them.
-    for (let i = 0; i < filters.length; i++) {
-      const filter = filters[i];
-      if (filter.ids) {
-        this.#debug(`Filter[${i}] is an IDs filter; querying cache...`);
-        const ids = new Set(filter.ids);
-        for (const event of await this.#cache.query([filter], opts)) {
-          ids.delete(event.id);
-          results.add(event);
-          if (results.size >= limit) return getResults();
-        }
-        filters[i] = { ...filter, ids: [...ids] };
-      }
-    }
-
-    filters = normalizeFilters(filters);
-    if (!filters.length) return getResults();
-
-    // Query the database for events.
-    this.#debug('Querying database...');
-    for (const dbEvent of await this.#db.query(filters, opts)) {
-      results.add(dbEvent);
-      if (results.size >= limit) return getResults();
-    }
-
-    // We already searched the DB, so stop if this is a search filter.
-    if (filters.some((filter) => typeof filter.search === 'string')) {
-      this.#debug(`Bailing early for search filter: "${filters[0]?.search}"`);
-      return getResults();
-    }
-
-    // Query the cache again.
-    this.#debug('Querying cache...');
-    for (const cacheEvent of await this.#cache.query(filters, opts)) {
-      results.add(cacheEvent);
-      if (results.size >= limit) return getResults();
-    }
-
-    // Finally, query the client.
-    this.#debug('Querying client...');
-    try {
-      for (const clientEvent of await this.#client.query(filters, opts)) {
-        results.add(clientEvent);
-        if (results.size >= limit) return getResults();
-      }
-    } catch (_e) {
-      // do nothing
-    }
-
-    /** Get return type from map. */
-    function getResults() {
-      return [...results.values()];
-    }
-
-    return getResults();
-  }
-}
-
-export { Optimizer };
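
For the record, the deleted Optimizer layered three stores (cache, then database, then
client) and stopped as soon as a query's limit was met. If that read path is ever wanted
again, its core is small. A sketch against the same NStore interface (names are
illustrative, not the removed code verbatim):

    import { NostrEvent, NostrFilter, NSet, NStore } from '@nostrify/nostrify';

    // Sketch: query stores in priority order, deduping by event id,
    // and stop once `limit` results have been collected.
    async function layeredQuery(
      stores: NStore[],
      filters: NostrFilter[],
      opts: { limit?: number; signal?: AbortSignal } = {},
    ): Promise<NostrEvent[]> {
      const { limit = Infinity } = opts;
      const results = new NSet();
      for (const store of stores) {
        for (const event of await store.query(filters, opts)) {
          results.add(event);
          if (results.size >= limit) return [...results.values()];
        }
      }
      return [...results.values()];
    }
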
diff --git a/src/storages/reqmeister.ts b/src/storages/reqmeister.ts
deleted file mode 100644
index e3833d3..0000000
--- a/src/storages/reqmeister.ts
+++ /dev/null
@@ -1,144 +0,0 @@
-import { NostrEvent, NostrFilter, NStore } from '@nostrify/nostrify';
-import Debug from '@soapbox/stickynotes/debug';
-import { EventEmitter } from 'tseep';
-
-import { eventToMicroFilter, getFilterId, isMicrofilter, type MicroFilter } from '@/filter.ts';
-import { Time } from '@/utils/time.ts';
-import { abortError } from '@/utils/abort.ts';
-
-interface ReqmeisterOpts {
-  client: NStore;
-  delay?: number;
-  timeout?: number;
-}
-
-interface ReqmeisterReqOpts {
-  relays?: WebSocket['url'][];
-  signal?: AbortSignal;
-}
-
-type ReqmeisterQueueItem = [string, MicroFilter, WebSocket['url'][]];
-
-/** Batches requests to Nostr relays using microfilters. */
-class Reqmeister extends EventEmitter<{ [filterId: string]: (event: NostrEvent) => any }> implements NStore {
-  #debug = Debug('ditto:reqmeister');
-
-  #opts: ReqmeisterOpts;
-  #queue: ReqmeisterQueueItem[] = [];
-  #promise!: Promise<void>;
-  #resolve!: () => void;
-
-  constructor(opts: ReqmeisterOpts) {
-    super();
-    this.#opts = opts;
-    this.#tick();
-    this.#perform();
-  }
-
-  #tick() {
-    this.#resolve?.();
-    this.#promise = new Promise<void>((resolve) => {
-      this.#resolve = resolve;
-    });
-  }
-
-  async #perform() {
-    const { client, delay, timeout = Time.seconds(1) } = this.#opts;
-    await new Promise((resolve) => setTimeout(resolve, delay));
-
-    const queue = this.#queue;
-    this.#queue = [];
-
-    const wantedEvents = new Set<string>();
-    const wantedAuthors = new Set<string>();
-
-    // TODO: batch by relays.
-    for (const [_filterId, filter, _relays] of queue) {
-      if ('ids' in filter) {
-        filter.ids.forEach((id) => wantedEvents.add(id));
-      } else {
-        wantedAuthors.add(filter.authors[0]);
-      }
-    }
-
-    const filters: NostrFilter[] = [];
-
-    if (wantedEvents.size) filters.push({ ids: [...wantedEvents] });
-    if (wantedAuthors.size) filters.push({ kinds: [0], authors: [...wantedAuthors] });
-
-    if (filters.length) {
-      try {
-        const events = await client.query(filters, { signal: AbortSignal.timeout(timeout) });
-
-        for (const event of events) {
-          this.event(event);
-        }
-      } catch (_e) {
-        // do nothing
-      }
-    }
-
-    this.#tick();
-    this.#perform();
-  }
-
-  private fetch(filter: MicroFilter, opts: ReqmeisterReqOpts = {}): Promise<NostrEvent> {
-    const {
-      relays = [],
-      signal = AbortSignal.timeout(this.#opts.timeout ?? 1000),
-    } = opts;
-
-    if (signal.aborted) {
-      return Promise.reject(abortError());
-    }
-
-    const filterId = getFilterId(filter);
-
-    this.#queue.push([filterId, filter, relays]);
-
-    return new Promise((resolve, reject) => {
-      const handleEvent = (event: NostrEvent) => {
-        resolve(event);
-        this.removeListener(filterId, handleEvent);
-      };
-
-      const handleAbort = () => {
-        reject(new DOMException('Aborted', 'AbortError'));
-        this.removeListener(filterId, resolve);
-        signal.removeEventListener('abort', handleAbort);
-      };
-
-      this.once(filterId, handleEvent);
-      signal.addEventListener('abort', handleAbort, { once: true });
-    });
-  }
-
-  event(event: NostrEvent, _opts?: { signal?: AbortSignal }): Promise<void> {
-    const filterId = getFilterId(eventToMicroFilter(event));
-    this.#queue = this.#queue.filter(([id]) => id !== filterId);
-    this.emit(filterId, event);
-    return Promise.resolve();
-  }
-
-  async query(filters: NostrFilter[], opts?: { signal?: AbortSignal }): Promise<NostrEvent[]> {
-    if (opts?.signal?.aborted) return Promise.reject(abortError());
-
-    this.#debug('REQ', JSON.stringify(filters));
-    if (!filters.length) return Promise.resolve([]);
-
-    const promises = filters.reduce<Promise<NostrEvent>[]>((result, filter) => {
-      if (isMicrofilter(filter)) {
-        result.push(this.fetch(filter, opts));
-      }
-      return result;
-    }, []);
-
-    const results = await Promise.allSettled(promises);
-
-    return results
-      .filter((result): result is PromiseFulfilledResult<NostrEvent> => result.status === 'fulfilled')
-      .map((result) => result.value);
-  }
-}
-
-export { Reqmeister };
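
The Reqmeister's job was to coalesce many one-off id lookups into a single REQ per tick.
Should that become necessary again, the batching core looks roughly like this (an
illustrative sketch; unlike the deleted class it ignores author lookups, relay hints,
and abort handling):

    import { NostrEvent, NStore } from '@nostrify/nostrify';

    // Sketch: collect requested ids for `delay` ms, then issue one combined query.
    // Promises for ids that never arrive stay pending, as in the deleted class.
    function makeBatcher(client: NStore, delay = 1000) {
      const waiters = new Map<string, (event: NostrEvent) => void>();
      let timer: number | undefined;

      return (id: string): Promise<NostrEvent> =>
        new Promise((resolve) => {
          waiters.set(id, resolve);
          timer ??= setTimeout(async () => {
            timer = undefined;
            const ids = [...waiters.keys()];
            const events = await client.query([{ ids }], { signal: AbortSignal.timeout(delay) });
            for (const event of events) {
              waiters.get(event.id)?.(event);
              waiters.delete(event.id);
            }
          }, delay);
        });
    }
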
diff --git a/src/views/mastodon/statuses.ts b/src/views/mastodon/statuses.ts
index 4182493..7d83cac 100644
--- a/src/views/mastodon/statuses.ts
+++ b/src/views/mastodon/statuses.ts
@@ -39,10 +39,9 @@ async function renderStatus(event: DittoEvent, opts: RenderStatusOpts): Promise<
     ),
   ];

-  const db = await Storages.db();
-  const optimizer = await Storages.optimizer();
+  const store = await Storages.db();

-  const mentionedProfiles = await optimizer.query(
+  const mentionedProfiles = await store.query(
     [{ kinds: [0], authors: mentionedPubkeys, limit: mentionedPubkeys.length }],
   );
@@ -55,7 +54,7 @@ async function renderStatus(event: DittoEvent, opts: RenderStatusOpts): Promise<
     ),
     firstUrl ? unfurlCardCached(firstUrl) : null,
     viewerPubkey
-      ? await db.query([
+      ? await store.query([
        { kinds: [6], '#e': [event.id], authors: [viewerPubkey], limit: 1 },
        { kinds: [7], '#e': [event.id], authors: [viewerPubkey], limit: 1 },
        { kinds: [9734], '#e': [event.id], authors: [viewerPubkey], limit: 1 },
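
After this change every read path goes through a single handle from `Storages.db()`, as
`renderStatus` above now does. Typical usage ends up shaped like this (a sketch with
made-up ids, not code from the diff):

    import { Storages } from '@/storages.ts';

    const store = await Storages.db();
    const eventId = 'b1674b...';   // hypothetical note id
    const pubkeys = ['79c2ca...']; // hypothetical author pubkeys

    // One store, several concurrent queries.
    const [profiles, reactions] = await Promise.all([
      store.query([{ kinds: [0], authors: pubkeys, limit: pubkeys.length }]),
      store.query([{ kinds: [7], '#e': [eventId], limit: 20 }]),
    ]);
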