Memorelay: reimplement by just looping events with matchFilters
This commit is contained in:
parent
a1dad3a0c5
commit
f667ba3c69
|
@ -1,69 +1,10 @@
|
||||||
import { Debug, type Event, type Filter, LRUCache } from '@/deps.ts';
|
import { Memorelay } from '@/storages/memorelay.ts';
|
||||||
import { getFilterId, getMicroFilters, isMicrofilter } from '@/filter.ts';
|
|
||||||
import { type EventStore, type GetEventsOpts } from '@/store.ts';
|
|
||||||
|
|
||||||
const debug = Debug('ditto:memorelay');
|
/** In-memory data store for events using microfilters. */
|
||||||
|
const memorelay = new Memorelay({
|
||||||
const events = new LRUCache<string, Event>({
|
|
||||||
max: 3000,
|
max: 3000,
|
||||||
maxEntrySize: 5000,
|
maxEntrySize: 5000,
|
||||||
sizeCalculation: (event) => JSON.stringify(event).length,
|
sizeCalculation: (event) => JSON.stringify(event).length,
|
||||||
});
|
});
|
||||||
|
|
||||||
/** Get events from memory. */
|
|
||||||
function getEvents<K extends number>(filters: Filter<K>[], opts: GetEventsOpts = {}): Promise<Event<K>[]> {
|
|
||||||
if (opts.signal?.aborted) return Promise.resolve([]);
|
|
||||||
if (!filters.length) return Promise.resolve([]);
|
|
||||||
debug('REQ', JSON.stringify(filters));
|
|
||||||
|
|
||||||
const results: Event<K>[] = [];
|
|
||||||
|
|
||||||
for (const filter of filters) {
|
|
||||||
if (isMicrofilter(filter)) {
|
|
||||||
const event = events.get(getFilterId(filter));
|
|
||||||
if (event) {
|
|
||||||
results.push(event as Event<K>);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return Promise.resolve(results);
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Insert an event into memory. */
|
|
||||||
function storeEvent(event: Event): Promise<void> {
|
|
||||||
for (const microfilter of getMicroFilters(event)) {
|
|
||||||
const filterId = getFilterId(microfilter);
|
|
||||||
const existing = events.get(filterId);
|
|
||||||
if (!existing || event.created_at > existing.created_at) {
|
|
||||||
events.set(filterId, event);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return Promise.resolve();
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Count events in memory for the filters. */
|
|
||||||
async function countEvents(filters: Filter[]): Promise<number> {
|
|
||||||
const events = await getEvents(filters);
|
|
||||||
return events.length;
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Delete events from memory. */
|
|
||||||
function deleteEvents(filters: Filter[]): Promise<void> {
|
|
||||||
for (const filter of filters) {
|
|
||||||
if (isMicrofilter(filter)) {
|
|
||||||
events.delete(getFilterId(filter));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return Promise.resolve();
|
|
||||||
}
|
|
||||||
|
|
||||||
/** In-memory data store for events using microfilters. */
|
|
||||||
const memorelay: EventStore = {
|
|
||||||
getEvents,
|
|
||||||
storeEvent,
|
|
||||||
countEvents,
|
|
||||||
deleteEvents,
|
|
||||||
};
|
|
||||||
|
|
||||||
export { memorelay };
|
export { memorelay };
|
||||||
|
|
|
@ -18,6 +18,7 @@ export {
|
||||||
getEventHash,
|
getEventHash,
|
||||||
getPublicKey,
|
getPublicKey,
|
||||||
getSignature,
|
getSignature,
|
||||||
|
matchFilter,
|
||||||
matchFilters,
|
matchFilters,
|
||||||
nip04,
|
nip04,
|
||||||
nip05,
|
nip05,
|
||||||
|
|
|
@ -2,7 +2,13 @@ import { assertEquals } from '@/deps-test.ts';
|
||||||
|
|
||||||
import event1 from '~/fixtures/events/event-1.json' assert { type: 'json' };
|
import event1 from '~/fixtures/events/event-1.json' assert { type: 'json' };
|
||||||
|
|
||||||
import { memorelay } from './memorelay.ts';
|
import { Memorelay } from './memorelay.ts';
|
||||||
|
|
||||||
|
const memorelay = new Memorelay({
|
||||||
|
max: 3000,
|
||||||
|
maxEntrySize: 5000,
|
||||||
|
sizeCalculation: (event) => JSON.stringify(event).length,
|
||||||
|
});
|
||||||
|
|
||||||
Deno.test('memorelay', async () => {
|
Deno.test('memorelay', async () => {
|
||||||
assertEquals(await memorelay.countEvents([{ ids: [event1.id] }]), 0);
|
assertEquals(await memorelay.countEvents([{ ids: [event1.id] }]), 0);
|
|
@ -1,8 +1,7 @@
|
||||||
import { Debug, type Event, type Filter, LRUCache } from '@/deps.ts';
|
import { Debug, type Event, type Filter, LRUCache, matchFilter, matchFilters } from '@/deps.ts';
|
||||||
import { getFilterId, getMicroFilters, isMicrofilter } from '@/filter.ts';
|
|
||||||
import { type EventStore, type GetEventsOpts } from '@/store.ts';
|
import { type EventStore, type GetEventsOpts } from '@/store.ts';
|
||||||
|
|
||||||
/** In-memory data store for events using microfilters. */
|
/** In-memory data store for events. */
|
||||||
class Memorelay implements EventStore {
|
class Memorelay implements EventStore {
|
||||||
#debug = Debug('ditto:memorelay');
|
#debug = Debug('ditto:memorelay');
|
||||||
#cache: LRUCache<string, Event>;
|
#cache: LRUCache<string, Event>;
|
||||||
|
@ -11,6 +10,15 @@ class Memorelay implements EventStore {
|
||||||
this.#cache = new LRUCache<string, Event>(...args);
|
this.#cache = new LRUCache<string, Event>(...args);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** Iterate stored events. */
|
||||||
|
*#events(): Generator<Event> {
|
||||||
|
for (const event of this.#cache.values()) {
|
||||||
|
if (event && !(event instanceof Promise)) {
|
||||||
|
yield event;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/** Get events from memory. */
|
/** Get events from memory. */
|
||||||
getEvents<K extends number>(filters: Filter<K>[], opts: GetEventsOpts = {}): Promise<Event<K>[]> {
|
getEvents<K extends number>(filters: Filter<K>[], opts: GetEventsOpts = {}): Promise<Event<K>[]> {
|
||||||
if (opts.signal?.aborted) return Promise.resolve([]);
|
if (opts.signal?.aborted) return Promise.resolve([]);
|
||||||
|
@ -18,13 +26,27 @@ class Memorelay implements EventStore {
|
||||||
this.#debug('REQ', JSON.stringify(filters));
|
this.#debug('REQ', JSON.stringify(filters));
|
||||||
|
|
||||||
const results: Event<K>[] = [];
|
const results: Event<K>[] = [];
|
||||||
|
const usages: number[] = [];
|
||||||
|
|
||||||
|
for (const event of this.#events()) {
|
||||||
|
let index = 0;
|
||||||
|
|
||||||
for (const filter of filters) {
|
for (const filter of filters) {
|
||||||
if (isMicrofilter(filter)) {
|
const limit = filter.limit ?? Infinity;
|
||||||
const event = this.#cache.get(getFilterId(filter));
|
const usage = usages[index] ?? 0;
|
||||||
if (event) {
|
|
||||||
|
if (usage >= limit) {
|
||||||
|
continue;
|
||||||
|
} else if (matchFilter(filter, event)) {
|
||||||
results.push(event as Event<K>);
|
results.push(event as Event<K>);
|
||||||
|
usages[index] = usage + 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
index++;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (filters.every((filter, index) => usages[index] >= (filter.limit ?? Infinity))) {
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -33,13 +55,7 @@ class Memorelay implements EventStore {
|
||||||
|
|
||||||
/** Insert an event into memory. */
|
/** Insert an event into memory. */
|
||||||
storeEvent(event: Event): Promise<void> {
|
storeEvent(event: Event): Promise<void> {
|
||||||
for (const microfilter of getMicroFilters(event)) {
|
this.#cache.set(event.id, event);
|
||||||
const filterId = getFilterId(microfilter);
|
|
||||||
const existing = this.#cache.get(filterId);
|
|
||||||
if (!existing || event.created_at > existing.created_at) {
|
|
||||||
this.#cache.set(filterId, event);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return Promise.resolve();
|
return Promise.resolve();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -51,9 +67,9 @@ class Memorelay implements EventStore {
|
||||||
|
|
||||||
/** Delete events from memory. */
|
/** Delete events from memory. */
|
||||||
deleteEvents(filters: Filter[]): Promise<void> {
|
deleteEvents(filters: Filter[]): Promise<void> {
|
||||||
for (const filter of filters) {
|
for (const event of this.#events()) {
|
||||||
if (isMicrofilter(filter)) {
|
if (matchFilters(filters, event)) {
|
||||||
this.#cache.delete(getFilterId(filter));
|
this.#cache.delete(event.id);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return Promise.resolve();
|
return Promise.resolve();
|
||||||
|
|
Loading…
Reference in New Issue