Get rid of unnecessary memorelay module, replace with NCache
This commit is contained in:
parent
2369030ff0
commit
0fc8143889
|
@ -94,6 +94,6 @@ export {
|
||||||
NSet,
|
NSet,
|
||||||
type NStore,
|
type NStore,
|
||||||
type NStoreOpts,
|
type NStoreOpts,
|
||||||
} from 'https://gitlab.com/soapbox-pub/nlib/-/raw/554ec56cd92ff0a65e8b74765da3b854f50e4a2a/mod.ts';
|
} from 'https://gitlab.com/soapbox-pub/nlib/-/raw/91ddffb8d7091286641fdc419bdde07c8037eac7/mod.ts';
|
||||||
|
|
||||||
export type * as TypeFest from 'npm:type-fest@^4.3.0';
|
export type * as TypeFest from 'npm:type-fest@^4.3.0';
|
||||||
|
|
|
@ -7,7 +7,7 @@ import { DittoEvent } from '@/interfaces/DittoEvent.ts';
|
||||||
import { isEphemeralKind } from '@/kinds.ts';
|
import { isEphemeralKind } from '@/kinds.ts';
|
||||||
import { isLocallyFollowed } from '@/queries.ts';
|
import { isLocallyFollowed } from '@/queries.ts';
|
||||||
import { updateStats } from '@/stats.ts';
|
import { updateStats } from '@/stats.ts';
|
||||||
import { client, eventsDB, memorelay, reqmeister } from '@/storages.ts';
|
import { cache, client, eventsDB, reqmeister } from '@/storages.ts';
|
||||||
import { Sub } from '@/subs.ts';
|
import { Sub } from '@/subs.ts';
|
||||||
import { getTagSet } from '@/tags.ts';
|
import { getTagSet } from '@/tags.ts';
|
||||||
import { eventAge, isRelay, nostrDate, nostrNow, Time } from '@/utils.ts';
|
import { eventAge, isRelay, nostrDate, nostrNow, Time } from '@/utils.ts';
|
||||||
|
@ -45,8 +45,8 @@ async function handleEvent(event: DittoEvent, signal: AbortSignal): Promise<void
|
||||||
|
|
||||||
/** Encounter the event, and return whether it has already been encountered. */
|
/** Encounter the event, and return whether it has already been encountered. */
|
||||||
async function encounterEvent(event: NostrEvent, signal: AbortSignal): Promise<boolean> {
|
async function encounterEvent(event: NostrEvent, signal: AbortSignal): Promise<boolean> {
|
||||||
const preexisting = (await memorelay.count([{ ids: [event.id] }])) > 0;
|
const preexisting = (await cache.count([{ ids: [event.id] }])) > 0;
|
||||||
memorelay.event(event, { signal });
|
cache.event(event);
|
||||||
reqmeister.event(event, { signal });
|
reqmeister.event(event, { signal });
|
||||||
return preexisting;
|
return preexisting;
|
||||||
}
|
}
|
||||||
|
@ -149,7 +149,7 @@ function fetchRelatedEvents(event: DittoEvent, signal: AbortSignal) {
|
||||||
reqmeister.req({ kinds: [0], authors: [event.pubkey] }, { signal }).catch(() => {});
|
reqmeister.req({ kinds: [0], authors: [event.pubkey] }, { signal }).catch(() => {});
|
||||||
}
|
}
|
||||||
for (const [name, id, relay] of event.tags) {
|
for (const [name, id, relay] of event.tags) {
|
||||||
if (name === 'e' && !memorelay.count([{ ids: [id] }], { signal })) {
|
if (name === 'e' && !cache.count([{ ids: [id] }])) {
|
||||||
reqmeister.req({ ids: [id] }, { relays: [relay] }).catch(() => {});
|
reqmeister.req({ ids: [id] }, { relays: [relay] }).catch(() => {});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
import { eventsDB, memorelay, reqmeister } from '@/storages.ts';
|
import { cache, eventsDB, reqmeister } from '@/storages.ts';
|
||||||
import { Debug, type NostrEvent } from '@/deps.ts';
|
import { Debug, type NostrEvent } from '@/deps.ts';
|
||||||
import { type AuthorMicrofilter, type IdMicrofilter } from '@/filter.ts';
|
import { type AuthorMicrofilter, type IdMicrofilter } from '@/filter.ts';
|
||||||
import { type DittoEvent } from '@/interfaces/DittoEvent.ts';
|
import { type DittoEvent } from '@/interfaces/DittoEvent.ts';
|
||||||
|
@ -25,7 +25,7 @@ const getEvent = async (
|
||||||
const { kind, relations, signal = AbortSignal.timeout(1000) } = opts;
|
const { kind, relations, signal = AbortSignal.timeout(1000) } = opts;
|
||||||
const microfilter: IdMicrofilter = { ids: [id] };
|
const microfilter: IdMicrofilter = { ids: [id] };
|
||||||
|
|
||||||
const [memoryEvent] = await memorelay.query([microfilter], opts) as DittoEvent[];
|
const [memoryEvent] = await cache.query([microfilter]) as DittoEvent[];
|
||||||
|
|
||||||
if (memoryEvent && !relations) {
|
if (memoryEvent && !relations) {
|
||||||
debug(`getEvent: ${id.slice(0, 8)} found in memory`);
|
debug(`getEvent: ${id.slice(0, 8)} found in memory`);
|
||||||
|
@ -43,7 +43,7 @@ const getEvent = async (
|
||||||
// TODO: make this DRY-er.
|
// TODO: make this DRY-er.
|
||||||
|
|
||||||
if (dbEvent && !dbEvent.author) {
|
if (dbEvent && !dbEvent.author) {
|
||||||
const [author] = await memorelay.query([{ kinds: [0], authors: [dbEvent.pubkey] }], opts);
|
const [author] = await cache.query([{ kinds: [0], authors: [dbEvent.pubkey] }]);
|
||||||
dbEvent.author = author;
|
dbEvent.author = author;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -53,7 +53,7 @@ const getEvent = async (
|
||||||
}
|
}
|
||||||
|
|
||||||
if (memoryEvent && !memoryEvent.author) {
|
if (memoryEvent && !memoryEvent.author) {
|
||||||
const [author] = await memorelay.query([{ kinds: [0], authors: [memoryEvent.pubkey] }], opts);
|
const [author] = await cache.query([{ kinds: [0], authors: [memoryEvent.pubkey] }]);
|
||||||
memoryEvent.author = author;
|
memoryEvent.author = author;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -77,7 +77,7 @@ const getAuthor = async (pubkey: string, opts: GetEventOpts = {}): Promise<Nostr
|
||||||
const { relations, signal = AbortSignal.timeout(1000) } = opts;
|
const { relations, signal = AbortSignal.timeout(1000) } = opts;
|
||||||
const microfilter: AuthorMicrofilter = { kinds: [0], authors: [pubkey] };
|
const microfilter: AuthorMicrofilter = { kinds: [0], authors: [pubkey] };
|
||||||
|
|
||||||
const [memoryEvent] = await memorelay.query([microfilter], opts);
|
const [memoryEvent] = await cache.query([microfilter]);
|
||||||
|
|
||||||
if (memoryEvent && !relations) {
|
if (memoryEvent && !relations) {
|
||||||
return memoryEvent;
|
return memoryEvent;
|
||||||
|
|
|
@ -1,9 +1,9 @@
|
||||||
import { Conf } from '@/config.ts';
|
import { Conf } from '@/config.ts';
|
||||||
import { db } from '@/db.ts';
|
import { db } from '@/db.ts';
|
||||||
|
import { NCache } from '@/deps.ts';
|
||||||
import * as pipeline from '@/pipeline.ts';
|
import * as pipeline from '@/pipeline.ts';
|
||||||
import { activeRelays, pool } from '@/pool.ts';
|
import { activeRelays, pool } from '@/pool.ts';
|
||||||
import { EventsDB } from '@/storages/events-db.ts';
|
import { EventsDB } from '@/storages/events-db.ts';
|
||||||
import { Memorelay } from '@/storages/memorelay.ts';
|
|
||||||
import { Optimizer } from '@/storages/optimizer.ts';
|
import { Optimizer } from '@/storages/optimizer.ts';
|
||||||
import { PoolStore } from '@/storages/pool-store.ts';
|
import { PoolStore } from '@/storages/pool-store.ts';
|
||||||
import { Reqmeister } from '@/storages/reqmeister.ts';
|
import { Reqmeister } from '@/storages/reqmeister.ts';
|
||||||
|
@ -21,7 +21,7 @@ const client = new PoolStore({
|
||||||
const eventsDB = new EventsDB(db);
|
const eventsDB = new EventsDB(db);
|
||||||
|
|
||||||
/** In-memory data store for cached events. */
|
/** In-memory data store for cached events. */
|
||||||
const memorelay = new Memorelay({ max: 3000 });
|
const cache = new NCache({ max: 3000 });
|
||||||
|
|
||||||
/** Batches requests for single events. */
|
/** Batches requests for single events. */
|
||||||
const reqmeister = new Reqmeister({
|
const reqmeister = new Reqmeister({
|
||||||
|
@ -33,7 +33,7 @@ const reqmeister = new Reqmeister({
|
||||||
/** Main Ditto storage adapter */
|
/** Main Ditto storage adapter */
|
||||||
const optimizer = new Optimizer({
|
const optimizer = new Optimizer({
|
||||||
db: eventsDB,
|
db: eventsDB,
|
||||||
cache: memorelay,
|
cache,
|
||||||
client: reqmeister,
|
client: reqmeister,
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -43,4 +43,4 @@ const searchStore = new SearchStore({
|
||||||
fallback: optimizer,
|
fallback: optimizer,
|
||||||
});
|
});
|
||||||
|
|
||||||
export { client, eventsDB, memorelay, optimizer, reqmeister, searchStore };
|
export { cache, client, eventsDB, optimizer, reqmeister, searchStore };
|
||||||
|
|
|
@ -1,22 +0,0 @@
|
||||||
import { assertEquals } from '@/deps-test.ts';
|
|
||||||
|
|
||||||
import event1 from '~/fixtures/events/event-1.json' assert { type: 'json' };
|
|
||||||
|
|
||||||
import { Memorelay } from './memorelay.ts';
|
|
||||||
|
|
||||||
const memorelay = new Memorelay({
|
|
||||||
max: 3000,
|
|
||||||
maxEntrySize: 5000,
|
|
||||||
sizeCalculation: (event) => JSON.stringify(event).length,
|
|
||||||
});
|
|
||||||
|
|
||||||
Deno.test('memorelay', async () => {
|
|
||||||
assertEquals(await memorelay.count([{ ids: [event1.id] }]), 0);
|
|
||||||
|
|
||||||
await memorelay.event(event1);
|
|
||||||
|
|
||||||
assertEquals(await memorelay.count([{ ids: [event1.id] }]), 1);
|
|
||||||
|
|
||||||
const result = await memorelay.query([{ ids: [event1.id] }]);
|
|
||||||
assertEquals(result[0], event1);
|
|
||||||
});
|
|
|
@ -1,118 +0,0 @@
|
||||||
import {
|
|
||||||
Debug,
|
|
||||||
LRUCache,
|
|
||||||
matchFilter,
|
|
||||||
type NostrEvent,
|
|
||||||
type NostrFilter,
|
|
||||||
NSet,
|
|
||||||
type NStore,
|
|
||||||
type NStoreOpts,
|
|
||||||
} from '@/deps.ts';
|
|
||||||
import { normalizeFilters } from '@/filter.ts';
|
|
||||||
import { abortError } from '@/utils/abort.ts';
|
|
||||||
|
|
||||||
/** In-memory data store for events. */
|
|
||||||
class Memorelay implements NStore {
|
|
||||||
#debug = Debug('ditto:memorelay');
|
|
||||||
#cache: LRUCache<string, NostrEvent>;
|
|
||||||
|
|
||||||
constructor(...args: ConstructorParameters<typeof LRUCache<string, NostrEvent>>) {
|
|
||||||
this.#cache = new LRUCache<string, NostrEvent>(...args);
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Iterate stored events. */
|
|
||||||
*#events(): Generator<NostrEvent> {
|
|
||||||
for (const event of this.#cache.values()) {
|
|
||||||
if (event && !(event instanceof Promise)) {
|
|
||||||
yield event;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Get events from memory. */
|
|
||||||
query(filters: NostrFilter[], opts: NStoreOpts = {}): Promise<NostrEvent[]> {
|
|
||||||
if (opts.signal?.aborted) return Promise.reject(abortError());
|
|
||||||
|
|
||||||
filters = normalizeFilters(filters);
|
|
||||||
this.#debug('REQ', JSON.stringify(filters));
|
|
||||||
if (!filters.length) return Promise.resolve([]);
|
|
||||||
|
|
||||||
/** Event results to return. */
|
|
||||||
const results = new NSet();
|
|
||||||
|
|
||||||
/** Number of times an event has been added to results for each filter. */
|
|
||||||
const filterUsages: number[] = [];
|
|
||||||
|
|
||||||
/** Check if all filters have been satisfied. */
|
|
||||||
function checkSatisfied() {
|
|
||||||
return results.size >= (opts.limit ?? Infinity) ||
|
|
||||||
filters.every((filter, index) => filter.limit && (filterUsages[index] >= filter.limit));
|
|
||||||
}
|
|
||||||
|
|
||||||
// Optimize for filters with IDs.
|
|
||||||
filters.forEach((filter, index) => {
|
|
||||||
if (filter.ids) {
|
|
||||||
for (const id of filter.ids) {
|
|
||||||
const event = this.#cache.get(id);
|
|
||||||
if (event && matchFilter(filter, event)) {
|
|
||||||
results.add(event);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
filterUsages[index] = Infinity;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
// Return early if all filters are satisfied.
|
|
||||||
if (checkSatisfied()) {
|
|
||||||
return Promise.resolve([...results]);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Seek through all events in memory.
|
|
||||||
for (const event of this.#events()) {
|
|
||||||
filters.forEach((filter, index) => {
|
|
||||||
const limit = filter.limit ?? Infinity;
|
|
||||||
const usage = filterUsages[index] ?? 0;
|
|
||||||
|
|
||||||
if (usage >= limit) {
|
|
||||||
return;
|
|
||||||
} else if (matchFilter(filter, event)) {
|
|
||||||
results.add(event);
|
|
||||||
this.#cache.get(event.id);
|
|
||||||
filterUsages[index] = usage + 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
index++;
|
|
||||||
});
|
|
||||||
|
|
||||||
// Check after each event if we can return.
|
|
||||||
if (checkSatisfied()) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return Promise.resolve([...results]);
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Insert an event into memory. */
|
|
||||||
event(event: NostrEvent, opts: NStoreOpts = {}): Promise<void> {
|
|
||||||
if (opts.signal?.aborted) return Promise.reject(abortError());
|
|
||||||
this.#cache.set(event.id, event);
|
|
||||||
return Promise.resolve();
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Count events in memory for the filters. */
|
|
||||||
async count(filters: NostrFilter[], opts?: NStoreOpts): Promise<number> {
|
|
||||||
const events = await this.query(filters, opts);
|
|
||||||
return events.length;
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Delete events from memory. */
|
|
||||||
async remove(filters: NostrFilter[], opts: NStoreOpts): Promise<void> {
|
|
||||||
for (const event of await this.query(filters, opts)) {
|
|
||||||
this.#cache.delete(event.id);
|
|
||||||
}
|
|
||||||
return Promise.resolve();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
export { Memorelay };
|
|
Loading…
Reference in New Issue