pipeline: use memorelay for encounters

This commit is contained in:
Alex Gleason 2023-12-27 23:35:42 -06:00
parent acffdd7fb8
commit d40b4a509e
No known key found for this signature in database
GPG Key ID: 7211D1F99744FBB7
2 changed files with 34 additions and 11 deletions

View File

@ -40,4 +40,30 @@ function insertEvent(event: Event): void {
} }
} }
export { getFilters, insertEvent }; /** Check if an event is in memory. */
function hasEvent(event: Event): boolean {
for (const microfilter of getMicroFilters(event)) {
const filterId = getFilterId(microfilter);
const existing = events.get(filterId);
if (existing) {
return true;
}
}
return false;
}
/** Check if an event is in memory by ID. */
function hasEventById(eventId: string): boolean {
const filterId = getFilterId({ ids: [eventId] });
return events.has(filterId);
}
/** In-memory data store for events using microfilters. */
const memorelay = {
getFilters,
insertEvent,
hasEvent,
hasEventById,
};
export { memorelay };

View File

@ -1,9 +1,10 @@
import { Conf } from '@/config.ts'; import { Conf } from '@/config.ts';
import * as eventsDB from '@/db/events.ts'; import * as eventsDB from '@/db/events.ts';
import { memorelay } from '@/db/memorelay.ts';
import { addRelays } from '@/db/relays.ts'; import { addRelays } from '@/db/relays.ts';
import { deleteAttachedMedia } from '@/db/unattached-media.ts'; import { deleteAttachedMedia } from '@/db/unattached-media.ts';
import { findUser } from '@/db/users.ts'; import { findUser } from '@/db/users.ts';
import { Debug, type Event, LRUCache } from '@/deps.ts'; import { Debug, type Event } from '@/deps.ts';
import { isEphemeralKind } from '@/kinds.ts'; import { isEphemeralKind } from '@/kinds.ts';
import * as mixer from '@/mixer.ts'; import * as mixer from '@/mixer.ts';
import { publish } from '@/pool.ts'; import { publish } from '@/pool.ts';
@ -12,12 +13,11 @@ import { reqmeister } from '@/reqmeister.ts';
import { updateStats } from '@/stats.ts'; import { updateStats } from '@/stats.ts';
import { Sub } from '@/subs.ts'; import { Sub } from '@/subs.ts';
import { getTagSet } from '@/tags.ts'; import { getTagSet } from '@/tags.ts';
import { type EventData } from '@/types.ts';
import { eventAge, isRelay, nostrDate, Time } from '@/utils.ts'; import { eventAge, isRelay, nostrDate, Time } from '@/utils.ts';
import { TrendsWorker } from '@/workers/trends.ts'; import { TrendsWorker } from '@/workers/trends.ts';
import { verifySignatureWorker } from '@/workers/verify.ts'; import { verifySignatureWorker } from '@/workers/verify.ts';
import type { EventData } from '@/types.ts';
const debug = Debug('ditto:pipeline'); const debug = Debug('ditto:pipeline');
/** /**
@ -43,15 +43,12 @@ async function handleEvent(event: Event): Promise<void> {
]); ]);
} }
/** Tracks encountered events to skip duplicates, improving idempotency and performance. */
const encounters = new LRUCache<Event['id'], true>({ max: 1000 });
/** Encounter the event, and return whether it has already been encountered. */ /** Encounter the event, and return whether it has already been encountered. */
function encounterEvent(event: Event): boolean { function encounterEvent(event: Event): boolean {
const result = encounters.get(event.id); const preexisting = memorelay.hasEvent(event);
encounters.set(event.id, true); memorelay.insertEvent(event);
reqmeister.encounter(event); reqmeister.encounter(event);
return !!result; return preexisting;
} }
/** Preload data that will be useful to several tasks. */ /** Preload data that will be useful to several tasks. */
@ -146,7 +143,7 @@ function fetchRelatedEvents(event: Event, data: EventData) {
reqmeister.req({ kinds: [0], authors: [event.pubkey] }).catch(() => {}); reqmeister.req({ kinds: [0], authors: [event.pubkey] }).catch(() => {});
} }
for (const [name, id, relay] of event.tags) { for (const [name, id, relay] of event.tags) {
if (name === 'e' && !encounters.has(id)) { if (name === 'e' && !memorelay.hasEventById(id)) {
reqmeister.req({ ids: [id] }, [relay]).catch(() => {}); reqmeister.req({ ids: [id] }, [relay]).catch(() => {});
} }
} }