Merge branch 'rm-memorelay' into 'main'
Get rid of unnecessary memorelay module, replace with NCache See merge request soapbox-pub/ditto!108
This commit is contained in:
commit
40f75f681b
|
@ -83,16 +83,17 @@ export { EventEmitter } from 'npm:tseep@^1.1.3';
|
|||
export { default as stringifyStable } from 'npm:fast-stable-stringify@^1.0.0';
|
||||
// @deno-types="npm:@types/debug@^4.1.12"
|
||||
export { default as Debug } from 'npm:debug@^4.3.4';
|
||||
export { NSet } from 'https://gitlab.com/soapbox-pub/nset/-/raw/b3c5601612f9bd277626198c5534e0796e003884/mod.ts';
|
||||
export {
|
||||
LNURL,
|
||||
type LNURLDetails,
|
||||
type MapCache,
|
||||
NCache,
|
||||
NIP05,
|
||||
type NostrEvent,
|
||||
type NostrFilter,
|
||||
NSet,
|
||||
type NStore,
|
||||
type NStoreOpts,
|
||||
} from 'https://gitlab.com/soapbox-pub/nlib/-/raw/057ecc6e2ce813db6e2279288fbfd08c5b53cc0c/mod.ts';
|
||||
} from 'https://gitlab.com/soapbox-pub/nlib/-/raw/91ddffb8d7091286641fdc419bdde07c8037eac7/mod.ts';
|
||||
|
||||
export type * as TypeFest from 'npm:type-fest@^4.3.0';
|
||||
|
|
|
@ -7,7 +7,7 @@ import { DittoEvent } from '@/interfaces/DittoEvent.ts';
|
|||
import { isEphemeralKind } from '@/kinds.ts';
|
||||
import { isLocallyFollowed } from '@/queries.ts';
|
||||
import { updateStats } from '@/stats.ts';
|
||||
import { client, eventsDB, memorelay, reqmeister } from '@/storages.ts';
|
||||
import { cache, client, eventsDB, reqmeister } from '@/storages.ts';
|
||||
import { Sub } from '@/subs.ts';
|
||||
import { getTagSet } from '@/tags.ts';
|
||||
import { eventAge, isRelay, nostrDate, nostrNow, Time } from '@/utils.ts';
|
||||
|
@ -45,8 +45,8 @@ async function handleEvent(event: DittoEvent, signal: AbortSignal): Promise<void
|
|||
|
||||
/** Encounter the event, and return whether it has already been encountered. */
|
||||
async function encounterEvent(event: NostrEvent, signal: AbortSignal): Promise<boolean> {
|
||||
const preexisting = (await memorelay.count([{ ids: [event.id] }])) > 0;
|
||||
memorelay.event(event, { signal });
|
||||
const preexisting = (await cache.count([{ ids: [event.id] }])) > 0;
|
||||
cache.event(event);
|
||||
reqmeister.event(event, { signal });
|
||||
return preexisting;
|
||||
}
|
||||
|
@ -149,7 +149,7 @@ function fetchRelatedEvents(event: DittoEvent, signal: AbortSignal) {
|
|||
reqmeister.req({ kinds: [0], authors: [event.pubkey] }, { signal }).catch(() => {});
|
||||
}
|
||||
for (const [name, id, relay] of event.tags) {
|
||||
if (name === 'e' && !memorelay.count([{ ids: [id] }], { signal })) {
|
||||
if (name === 'e' && !cache.count([{ ids: [id] }])) {
|
||||
reqmeister.req({ ids: [id] }, { relays: [relay] }).catch(() => {});
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
import { eventsDB, memorelay, reqmeister } from '@/storages.ts';
|
||||
import { cache, eventsDB, reqmeister } from '@/storages.ts';
|
||||
import { Debug, type NostrEvent } from '@/deps.ts';
|
||||
import { type AuthorMicrofilter, type IdMicrofilter } from '@/filter.ts';
|
||||
import { type DittoEvent } from '@/interfaces/DittoEvent.ts';
|
||||
|
@ -25,7 +25,7 @@ const getEvent = async (
|
|||
const { kind, relations, signal = AbortSignal.timeout(1000) } = opts;
|
||||
const microfilter: IdMicrofilter = { ids: [id] };
|
||||
|
||||
const [memoryEvent] = await memorelay.query([microfilter], opts) as DittoEvent[];
|
||||
const [memoryEvent] = await cache.query([microfilter]) as DittoEvent[];
|
||||
|
||||
if (memoryEvent && !relations) {
|
||||
debug(`getEvent: ${id.slice(0, 8)} found in memory`);
|
||||
|
@ -43,7 +43,7 @@ const getEvent = async (
|
|||
// TODO: make this DRY-er.
|
||||
|
||||
if (dbEvent && !dbEvent.author) {
|
||||
const [author] = await memorelay.query([{ kinds: [0], authors: [dbEvent.pubkey] }], opts);
|
||||
const [author] = await cache.query([{ kinds: [0], authors: [dbEvent.pubkey] }]);
|
||||
dbEvent.author = author;
|
||||
}
|
||||
|
||||
|
@ -53,7 +53,7 @@ const getEvent = async (
|
|||
}
|
||||
|
||||
if (memoryEvent && !memoryEvent.author) {
|
||||
const [author] = await memorelay.query([{ kinds: [0], authors: [memoryEvent.pubkey] }], opts);
|
||||
const [author] = await cache.query([{ kinds: [0], authors: [memoryEvent.pubkey] }]);
|
||||
memoryEvent.author = author;
|
||||
}
|
||||
|
||||
|
@ -77,7 +77,7 @@ const getAuthor = async (pubkey: string, opts: GetEventOpts = {}): Promise<Nostr
|
|||
const { relations, signal = AbortSignal.timeout(1000) } = opts;
|
||||
const microfilter: AuthorMicrofilter = { kinds: [0], authors: [pubkey] };
|
||||
|
||||
const [memoryEvent] = await memorelay.query([microfilter], opts);
|
||||
const [memoryEvent] = await cache.query([microfilter]);
|
||||
|
||||
if (memoryEvent && !relations) {
|
||||
return memoryEvent;
|
||||
|
|
|
@ -1,9 +1,9 @@
|
|||
import { Conf } from '@/config.ts';
|
||||
import { db } from '@/db.ts';
|
||||
import { NCache } from '@/deps.ts';
|
||||
import * as pipeline from '@/pipeline.ts';
|
||||
import { activeRelays, pool } from '@/pool.ts';
|
||||
import { EventsDB } from '@/storages/events-db.ts';
|
||||
import { Memorelay } from '@/storages/memorelay.ts';
|
||||
import { Optimizer } from '@/storages/optimizer.ts';
|
||||
import { PoolStore } from '@/storages/pool-store.ts';
|
||||
import { Reqmeister } from '@/storages/reqmeister.ts';
|
||||
|
@ -21,7 +21,7 @@ const client = new PoolStore({
|
|||
const eventsDB = new EventsDB(db);
|
||||
|
||||
/** In-memory data store for cached events. */
|
||||
const memorelay = new Memorelay({ max: 3000 });
|
||||
const cache = new NCache({ max: 3000 });
|
||||
|
||||
/** Batches requests for single events. */
|
||||
const reqmeister = new Reqmeister({
|
||||
|
@ -33,7 +33,7 @@ const reqmeister = new Reqmeister({
|
|||
/** Main Ditto storage adapter */
|
||||
const optimizer = new Optimizer({
|
||||
db: eventsDB,
|
||||
cache: memorelay,
|
||||
cache,
|
||||
client: reqmeister,
|
||||
});
|
||||
|
||||
|
@ -43,4 +43,4 @@ const searchStore = new SearchStore({
|
|||
fallback: optimizer,
|
||||
});
|
||||
|
||||
export { client, eventsDB, memorelay, optimizer, reqmeister, searchStore };
|
||||
export { cache, client, eventsDB, optimizer, reqmeister, searchStore };
|
||||
|
|
|
@ -1,22 +0,0 @@
|
|||
import { assertEquals } from '@/deps-test.ts';
|
||||
|
||||
import event1 from '~/fixtures/events/event-1.json' assert { type: 'json' };
|
||||
|
||||
import { Memorelay } from './memorelay.ts';
|
||||
|
||||
const memorelay = new Memorelay({
|
||||
max: 3000,
|
||||
maxEntrySize: 5000,
|
||||
sizeCalculation: (event) => JSON.stringify(event).length,
|
||||
});
|
||||
|
||||
Deno.test('memorelay', async () => {
|
||||
assertEquals(await memorelay.count([{ ids: [event1.id] }]), 0);
|
||||
|
||||
await memorelay.event(event1);
|
||||
|
||||
assertEquals(await memorelay.count([{ ids: [event1.id] }]), 1);
|
||||
|
||||
const result = await memorelay.query([{ ids: [event1.id] }]);
|
||||
assertEquals(result[0], event1);
|
||||
});
|
|
@ -1,118 +0,0 @@
|
|||
import {
|
||||
Debug,
|
||||
LRUCache,
|
||||
matchFilter,
|
||||
type NostrEvent,
|
||||
type NostrFilter,
|
||||
NSet,
|
||||
type NStore,
|
||||
type NStoreOpts,
|
||||
} from '@/deps.ts';
|
||||
import { normalizeFilters } from '@/filter.ts';
|
||||
import { abortError } from '@/utils/abort.ts';
|
||||
|
||||
/** In-memory data store for events. */
|
||||
class Memorelay implements NStore {
|
||||
#debug = Debug('ditto:memorelay');
|
||||
#cache: LRUCache<string, NostrEvent>;
|
||||
|
||||
constructor(...args: ConstructorParameters<typeof LRUCache<string, NostrEvent>>) {
|
||||
this.#cache = new LRUCache<string, NostrEvent>(...args);
|
||||
}
|
||||
|
||||
/** Iterate stored events. */
|
||||
*#events(): Generator<NostrEvent> {
|
||||
for (const event of this.#cache.values()) {
|
||||
if (event && !(event instanceof Promise)) {
|
||||
yield event;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** Get events from memory. */
|
||||
query(filters: NostrFilter[], opts: NStoreOpts = {}): Promise<NostrEvent[]> {
|
||||
if (opts.signal?.aborted) return Promise.reject(abortError());
|
||||
|
||||
filters = normalizeFilters(filters);
|
||||
this.#debug('REQ', JSON.stringify(filters));
|
||||
if (!filters.length) return Promise.resolve([]);
|
||||
|
||||
/** Event results to return. */
|
||||
const results = new NSet<NostrEvent>();
|
||||
|
||||
/** Number of times an event has been added to results for each filter. */
|
||||
const filterUsages: number[] = [];
|
||||
|
||||
/** Check if all filters have been satisfied. */
|
||||
function checkSatisfied() {
|
||||
return results.size >= (opts.limit ?? Infinity) ||
|
||||
filters.every((filter, index) => filter.limit && (filterUsages[index] >= filter.limit));
|
||||
}
|
||||
|
||||
// Optimize for filters with IDs.
|
||||
filters.forEach((filter, index) => {
|
||||
if (filter.ids) {
|
||||
for (const id of filter.ids) {
|
||||
const event = this.#cache.get(id);
|
||||
if (event && matchFilter(filter, event)) {
|
||||
results.add(event);
|
||||
}
|
||||
}
|
||||
filterUsages[index] = Infinity;
|
||||
}
|
||||
});
|
||||
|
||||
// Return early if all filters are satisfied.
|
||||
if (checkSatisfied()) {
|
||||
return Promise.resolve([...results]);
|
||||
}
|
||||
|
||||
// Seek through all events in memory.
|
||||
for (const event of this.#events()) {
|
||||
filters.forEach((filter, index) => {
|
||||
const limit = filter.limit ?? Infinity;
|
||||
const usage = filterUsages[index] ?? 0;
|
||||
|
||||
if (usage >= limit) {
|
||||
return;
|
||||
} else if (matchFilter(filter, event)) {
|
||||
results.add(event);
|
||||
this.#cache.get(event.id);
|
||||
filterUsages[index] = usage + 1;
|
||||
}
|
||||
|
||||
index++;
|
||||
});
|
||||
|
||||
// Check after each event if we can return.
|
||||
if (checkSatisfied()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return Promise.resolve([...results]);
|
||||
}
|
||||
|
||||
/** Insert an event into memory. */
|
||||
event(event: NostrEvent, opts: NStoreOpts = {}): Promise<void> {
|
||||
if (opts.signal?.aborted) return Promise.reject(abortError());
|
||||
this.#cache.set(event.id, event);
|
||||
return Promise.resolve();
|
||||
}
|
||||
|
||||
/** Count events in memory for the filters. */
|
||||
async count(filters: NostrFilter[], opts?: NStoreOpts): Promise<number> {
|
||||
const events = await this.query(filters, opts);
|
||||
return events.length;
|
||||
}
|
||||
|
||||
/** Delete events from memory. */
|
||||
async remove(filters: NostrFilter[], opts: NStoreOpts): Promise<void> {
|
||||
for (const event of await this.query(filters, opts)) {
|
||||
this.#cache.delete(event.id);
|
||||
}
|
||||
return Promise.resolve();
|
||||
}
|
||||
}
|
||||
|
||||
export { Memorelay };
|
|
@ -40,7 +40,7 @@ class Optimizer implements NStore {
|
|||
if (!filters.length) return Promise.resolve([]);
|
||||
|
||||
const { limit = Infinity } = opts;
|
||||
const results = new NSet<DittoEvent>();
|
||||
const results = new NSet();
|
||||
|
||||
// Filters with IDs are immutable, so we can take them straight from the cache if we have them.
|
||||
for (let i = 0; i < filters.length; i++) {
|
||||
|
|
|
@ -53,7 +53,7 @@ class PoolStore implements NStore {
|
|||
if (!filters.length) return Promise.resolve([]);
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
const results = new NSet<NostrEvent>();
|
||||
const results = new NSet();
|
||||
|
||||
const unsub = this.#pool.subscribe(
|
||||
filters,
|
||||
|
|
|
@ -56,7 +56,7 @@ class SearchStore implements NStore {
|
|||
opts?.signal?.addEventListener('abort', close, { once: true });
|
||||
sub.eoseSignal.addEventListener('abort', close, { once: true });
|
||||
|
||||
const events = new NSet<DittoEvent>();
|
||||
const events = new NSet();
|
||||
|
||||
for await (const event of sub) {
|
||||
events.add(event);
|
||||
|
|
Loading…
Reference in New Issue