From d40b4a509e26b9f40a94a87c18773a2ab8d1da16 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Wed, 27 Dec 2023 23:35:42 -0600 Subject: [PATCH] pipeline: use memorelay for encounters --- src/db/memorelay.ts | 28 +++++++++++++++++++++++++++- src/pipeline.ts | 17 +++++++---------- 2 files changed, 34 insertions(+), 11 deletions(-) diff --git a/src/db/memorelay.ts b/src/db/memorelay.ts index ad2a884..dae3dbd 100644 --- a/src/db/memorelay.ts +++ b/src/db/memorelay.ts @@ -40,4 +40,30 @@ function insertEvent(event: Event): void { } } -export { getFilters, insertEvent }; +/** Check if an event is in memory. */ +function hasEvent(event: Event): boolean { + for (const microfilter of getMicroFilters(event)) { + const filterId = getFilterId(microfilter); + const existing = events.get(filterId); + if (existing) { + return true; + } + } + return false; +} + +/** Check if an event is in memory by ID. */ +function hasEventById(eventId: string): boolean { + const filterId = getFilterId({ ids: [eventId] }); + return events.has(filterId); +} + +/** In-memory data store for events using microfilters. */ +const memorelay = { + getFilters, + insertEvent, + hasEvent, + hasEventById, +}; + +export { memorelay }; diff --git a/src/pipeline.ts b/src/pipeline.ts index 081d6a4..3d065c0 100644 --- a/src/pipeline.ts +++ b/src/pipeline.ts @@ -1,9 +1,10 @@ import { Conf } from '@/config.ts'; import * as eventsDB from '@/db/events.ts'; +import { memorelay } from '@/db/memorelay.ts'; import { addRelays } from '@/db/relays.ts'; import { deleteAttachedMedia } from '@/db/unattached-media.ts'; import { findUser } from '@/db/users.ts'; -import { Debug, type Event, LRUCache } from '@/deps.ts'; +import { Debug, type Event } from '@/deps.ts'; import { isEphemeralKind } from '@/kinds.ts'; import * as mixer from '@/mixer.ts'; import { publish } from '@/pool.ts'; @@ -12,12 +13,11 @@ import { reqmeister } from '@/reqmeister.ts'; import { updateStats } from '@/stats.ts'; import { Sub } from '@/subs.ts'; import { getTagSet } from '@/tags.ts'; +import { type EventData } from '@/types.ts'; import { eventAge, isRelay, nostrDate, Time } from '@/utils.ts'; import { TrendsWorker } from '@/workers/trends.ts'; import { verifySignatureWorker } from '@/workers/verify.ts'; -import type { EventData } from '@/types.ts'; - const debug = Debug('ditto:pipeline'); /** @@ -43,15 +43,12 @@ async function handleEvent(event: Event): Promise { ]); } -/** Tracks encountered events to skip duplicates, improving idempotency and performance. */ -const encounters = new LRUCache({ max: 1000 }); - /** Encounter the event, and return whether it has already been encountered. */ function encounterEvent(event: Event): boolean { - const result = encounters.get(event.id); - encounters.set(event.id, true); + const preexisting = memorelay.hasEvent(event); + memorelay.insertEvent(event); reqmeister.encounter(event); - return !!result; + return preexisting; } /** Preload data that will be useful to several tasks. */ @@ -146,7 +143,7 @@ function fetchRelatedEvents(event: Event, data: EventData) { reqmeister.req({ kinds: [0], authors: [event.pubkey] }).catch(() => {}); } for (const [name, id, relay] of event.tags) { - if (name === 'e' && !encounters.has(id)) { + if (name === 'e' && !memorelay.hasEventById(id)) { reqmeister.req({ ids: [id] }, [relay]).catch(() => {}); } }