Refactor memorelay as an EventStore
This commit is contained in:
parent
ccfdbfeb8d
commit
56373c4ce3
|
@ -354,8 +354,8 @@ function buildUserSearchContent(event: Event<0>): string {
|
||||||
|
|
||||||
/** SQLite database storage adapter for Nostr events. */
|
/** SQLite database storage adapter for Nostr events. */
|
||||||
const eventsDB: EventStore = {
|
const eventsDB: EventStore = {
|
||||||
storeEvent,
|
|
||||||
getEvents,
|
getEvents,
|
||||||
|
storeEvent,
|
||||||
countEvents,
|
countEvents,
|
||||||
deleteEvents,
|
deleteEvents,
|
||||||
};
|
};
|
||||||
|
|
|
@ -5,14 +5,12 @@ import event1 from '~/fixtures/events/event-1.json' assert { type: 'json' };
|
||||||
import { memorelay } from './memorelay.ts';
|
import { memorelay } from './memorelay.ts';
|
||||||
|
|
||||||
Deno.test('memorelay', async () => {
|
Deno.test('memorelay', async () => {
|
||||||
assertEquals(memorelay.hasEvent(event1), false);
|
assertEquals(await memorelay.countEvents([{ ids: [event1.id] }]), 0);
|
||||||
assertEquals(memorelay.hasEventById(event1.id), false);
|
|
||||||
|
|
||||||
memorelay.insertEvent(event1);
|
await memorelay.storeEvent(event1);
|
||||||
|
|
||||||
assertEquals(memorelay.hasEvent(event1), true);
|
assertEquals(await memorelay.countEvents([{ ids: [event1.id] }]), 1);
|
||||||
assertEquals(memorelay.hasEventById(event1.id), true);
|
|
||||||
|
|
||||||
const result = await memorelay.getFilters([{ ids: [event1.id] }]);
|
const result = await memorelay.getEvents([{ ids: [event1.id] }]);
|
||||||
assertEquals(result[0], event1);
|
assertEquals(result[0], event1);
|
||||||
});
|
});
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
import { Debug, type Event, type Filter, LRUCache } from '@/deps.ts';
|
import { Debug, type Event, type Filter, LRUCache } from '@/deps.ts';
|
||||||
import { getFilterId, type GetFiltersOpts, getMicroFilters, isMicrofilter } from '@/filter.ts';
|
import { getFilterId, getMicroFilters, isMicrofilter } from '@/filter.ts';
|
||||||
|
import { type EventStore, type GetEventsOpts } from '@/store.ts';
|
||||||
|
|
||||||
const debug = Debug('ditto:memorelay');
|
const debug = Debug('ditto:memorelay');
|
||||||
|
|
||||||
|
@ -10,7 +11,7 @@ const events = new LRUCache<string, Event>({
|
||||||
});
|
});
|
||||||
|
|
||||||
/** Get events from memory. */
|
/** Get events from memory. */
|
||||||
function getFilters<K extends number>(filters: Filter<K>[], opts: GetFiltersOpts = {}): Promise<Event<K>[]> {
|
function getEvents<K extends number>(filters: Filter<K>[], opts: GetEventsOpts = {}): Promise<Event<K>[]> {
|
||||||
if (opts.signal?.aborted) return Promise.resolve([]);
|
if (opts.signal?.aborted) return Promise.resolve([]);
|
||||||
if (!filters.length) return Promise.resolve([]);
|
if (!filters.length) return Promise.resolve([]);
|
||||||
debug('REQ', JSON.stringify(filters));
|
debug('REQ', JSON.stringify(filters));
|
||||||
|
@ -30,7 +31,7 @@ function getFilters<K extends number>(filters: Filter<K>[], opts: GetFiltersOpts
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Insert an event into memory. */
|
/** Insert an event into memory. */
|
||||||
function insertEvent(event: Event): void {
|
function storeEvent(event: Event): Promise<void> {
|
||||||
for (const microfilter of getMicroFilters(event)) {
|
for (const microfilter of getMicroFilters(event)) {
|
||||||
const filterId = getFilterId(microfilter);
|
const filterId = getFilterId(microfilter);
|
||||||
const existing = events.get(filterId);
|
const existing = events.get(filterId);
|
||||||
|
@ -38,32 +39,31 @@ function insertEvent(event: Event): void {
|
||||||
events.set(filterId, event);
|
events.set(filterId, event);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return Promise.resolve();
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Check if an event is in memory. */
|
/** Count events in memory for the filters. */
|
||||||
function hasEvent(event: Event): boolean {
|
async function countEvents(filters: Filter[]): Promise<number> {
|
||||||
for (const microfilter of getMicroFilters(event)) {
|
const events = await getEvents(filters);
|
||||||
const filterId = getFilterId(microfilter);
|
return events.length;
|
||||||
const existing = events.get(filterId);
|
}
|
||||||
if (existing) {
|
|
||||||
return true;
|
/** Delete events from memory. */
|
||||||
|
function deleteEvents(filters: Filter[]): Promise<void> {
|
||||||
|
for (const filter of filters) {
|
||||||
|
if (isMicrofilter(filter)) {
|
||||||
|
events.delete(getFilterId(filter));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return false;
|
return Promise.resolve();
|
||||||
}
|
|
||||||
|
|
||||||
/** Check if an event is in memory by ID. */
|
|
||||||
function hasEventById(eventId: string): boolean {
|
|
||||||
const filterId = getFilterId({ ids: [eventId] });
|
|
||||||
return events.has(filterId);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/** In-memory data store for events using microfilters. */
|
/** In-memory data store for events using microfilters. */
|
||||||
const memorelay = {
|
const memorelay: EventStore = {
|
||||||
getFilters,
|
getEvents,
|
||||||
insertEvent,
|
storeEvent,
|
||||||
hasEvent,
|
countEvents,
|
||||||
hasEventById,
|
deleteEvents,
|
||||||
};
|
};
|
||||||
|
|
||||||
export { memorelay };
|
export { memorelay };
|
||||||
|
|
|
@ -21,16 +21,6 @@ type AuthorMicrofilter = { kinds: [0]; authors: [Event['pubkey']] };
|
||||||
/** Filter to get one specific event. */
|
/** Filter to get one specific event. */
|
||||||
type MicroFilter = IdMicrofilter | AuthorMicrofilter;
|
type MicroFilter = IdMicrofilter | AuthorMicrofilter;
|
||||||
|
|
||||||
/** Additional options to apply to the whole subscription. */
|
|
||||||
interface GetFiltersOpts {
|
|
||||||
/** Signal to abort the request. */
|
|
||||||
signal?: AbortSignal;
|
|
||||||
/** Event limit for the whole subscription. */
|
|
||||||
limit?: number;
|
|
||||||
/** Relays to use, if applicable. */
|
|
||||||
relays?: WebSocket['url'][];
|
|
||||||
}
|
|
||||||
|
|
||||||
function matchDittoFilter(filter: DittoFilter, event: Event, data: EventData): boolean {
|
function matchDittoFilter(filter: DittoFilter, event: Event, data: EventData): boolean {
|
||||||
if (filter.local && !(data.user || event.pubkey === Conf.pubkey)) {
|
if (filter.local && !(data.user || event.pubkey === Conf.pubkey)) {
|
||||||
return false;
|
return false;
|
||||||
|
@ -97,7 +87,6 @@ export {
|
||||||
type DittoFilter,
|
type DittoFilter,
|
||||||
eventToMicroFilter,
|
eventToMicroFilter,
|
||||||
getFilterId,
|
getFilterId,
|
||||||
type GetFiltersOpts,
|
|
||||||
getMicroFilters,
|
getMicroFilters,
|
||||||
type IdMicrofilter,
|
type IdMicrofilter,
|
||||||
isMicrofilter,
|
isMicrofilter,
|
||||||
|
|
|
@ -26,7 +26,7 @@ const debug = Debug('ditto:pipeline');
|
||||||
async function handleEvent(event: Event): Promise<void> {
|
async function handleEvent(event: Event): Promise<void> {
|
||||||
if (!(await verifySignatureWorker(event))) return;
|
if (!(await verifySignatureWorker(event))) return;
|
||||||
const wanted = reqmeister.isWanted(event);
|
const wanted = reqmeister.isWanted(event);
|
||||||
if (encounterEvent(event)) return;
|
if (await encounterEvent(event)) return;
|
||||||
debug(`Event<${event.kind}> ${event.id}`);
|
debug(`Event<${event.kind}> ${event.id}`);
|
||||||
const data = await getEventData(event);
|
const data = await getEventData(event);
|
||||||
|
|
||||||
|
@ -43,9 +43,9 @@ async function handleEvent(event: Event): Promise<void> {
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Encounter the event, and return whether it has already been encountered. */
|
/** Encounter the event, and return whether it has already been encountered. */
|
||||||
function encounterEvent(event: Event): boolean {
|
async function encounterEvent(event: Event): Promise<boolean> {
|
||||||
const preexisting = memorelay.hasEvent(event);
|
const preexisting = (await memorelay.countEvents([{ ids: [event.id] }])) > 0;
|
||||||
memorelay.insertEvent(event);
|
memorelay.storeEvent(event);
|
||||||
reqmeister.encounter(event);
|
reqmeister.encounter(event);
|
||||||
return preexisting;
|
return preexisting;
|
||||||
}
|
}
|
||||||
|
@ -142,7 +142,7 @@ function fetchRelatedEvents(event: Event, data: EventData) {
|
||||||
reqmeister.req({ kinds: [0], authors: [event.pubkey] }).catch(() => {});
|
reqmeister.req({ kinds: [0], authors: [event.pubkey] }).catch(() => {});
|
||||||
}
|
}
|
||||||
for (const [name, id, relay] of event.tags) {
|
for (const [name, id, relay] of event.tags) {
|
||||||
if (name === 'e' && !memorelay.hasEventById(id)) {
|
if (name === 'e' && !memorelay.countEvents([{ ids: [id] }])) {
|
||||||
reqmeister.req({ ids: [id] }, { relays: [relay] }).catch(() => {});
|
reqmeister.req({ ids: [id] }, { relays: [relay] }).catch(() => {});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,7 +22,7 @@ const getEvent = async <K extends number = number>(
|
||||||
const { kind, relations, signal = AbortSignal.timeout(1000) } = opts;
|
const { kind, relations, signal = AbortSignal.timeout(1000) } = opts;
|
||||||
const microfilter: IdMicrofilter = { ids: [id] };
|
const microfilter: IdMicrofilter = { ids: [id] };
|
||||||
|
|
||||||
const [memoryEvent] = await memorelay.getFilters([microfilter], opts) as DittoEvent<K>[];
|
const [memoryEvent] = await memorelay.getEvents([microfilter], opts) as DittoEvent<K>[];
|
||||||
|
|
||||||
if (memoryEvent && !relations) {
|
if (memoryEvent && !relations) {
|
||||||
return memoryEvent;
|
return memoryEvent;
|
||||||
|
@ -39,14 +39,14 @@ const getEvent = async <K extends number = number>(
|
||||||
// TODO: make this DRY-er.
|
// TODO: make this DRY-er.
|
||||||
|
|
||||||
if (dbEvent && !dbEvent.author) {
|
if (dbEvent && !dbEvent.author) {
|
||||||
const [author] = await memorelay.getFilters([{ kinds: [0], authors: [dbEvent.pubkey] }], opts);
|
const [author] = await memorelay.getEvents([{ kinds: [0], authors: [dbEvent.pubkey] }], opts);
|
||||||
dbEvent.author = author;
|
dbEvent.author = author;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (dbEvent) return dbEvent;
|
if (dbEvent) return dbEvent;
|
||||||
|
|
||||||
if (memoryEvent && !memoryEvent.author) {
|
if (memoryEvent && !memoryEvent.author) {
|
||||||
const [author] = await memorelay.getFilters([{ kinds: [0], authors: [memoryEvent.pubkey] }], opts);
|
const [author] = await memorelay.getEvents([{ kinds: [0], authors: [memoryEvent.pubkey] }], opts);
|
||||||
memoryEvent.author = author;
|
memoryEvent.author = author;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -60,7 +60,7 @@ const getAuthor = async (pubkey: string, opts: GetEventOpts<0> = {}): Promise<Ev
|
||||||
const { relations, signal = AbortSignal.timeout(1000) } = opts;
|
const { relations, signal = AbortSignal.timeout(1000) } = opts;
|
||||||
const microfilter: AuthorMicrofilter = { kinds: [0], authors: [pubkey] };
|
const microfilter: AuthorMicrofilter = { kinds: [0], authors: [pubkey] };
|
||||||
|
|
||||||
const [memoryEvent] = await memorelay.getFilters([microfilter], opts);
|
const [memoryEvent] = await memorelay.getEvents([microfilter], opts);
|
||||||
|
|
||||||
if (memoryEvent && !relations) {
|
if (memoryEvent && !relations) {
|
||||||
return memoryEvent;
|
return memoryEvent;
|
||||||
|
|
Loading…
Reference in New Issue