From 6684edaeaf60ec601414f14eec4215c06fa65646 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Sat, 25 May 2024 12:13:55 -0500 Subject: [PATCH] pipeline: fix race condition in encounterEvent --- src/pipeline.ts | 24 +++++++++++------------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/src/pipeline.ts b/src/pipeline.ts index 7bab6d0..a540125 100644 --- a/src/pipeline.ts +++ b/src/pipeline.ts @@ -3,12 +3,14 @@ import { LNURL } from '@nostrify/nostrify/ln'; import { PipePolicy } from '@nostrify/nostrify/policies'; import Debug from '@soapbox/stickynotes/debug'; import { sql } from 'kysely'; +import { LRUCache } from 'lru-cache'; import { Conf } from '@/config.ts'; import { DittoDB } from '@/db/DittoDB.ts'; import { deleteAttachedMedia } from '@/db/unattached-media.ts'; import { DittoEvent } from '@/interfaces/DittoEvent.ts'; import { DVM } from '@/pipeline/DVM.ts'; +import { MuteListPolicy } from '@/policies/MuteListPolicy.ts'; import { RelayError } from '@/RelayError.ts'; import { hydrateEvents, purifyEvent } from '@/storages/hydrate.ts'; import { Storages } from '@/storages.ts'; @@ -23,8 +25,6 @@ import { nip05Cache } from '@/utils/nip05.ts'; import { updateStats } from '@/utils/stats.ts'; import { getTagSet } from '@/utils/tags.ts'; -import { MuteListPolicy } from '@/policies/MuteListPolicy.ts'; - const debug = Debug('ditto:pipeline'); /** @@ -33,7 +33,7 @@ const debug = Debug('ditto:pipeline'); */ async function handleEvent(event: DittoEvent, signal: AbortSignal): Promise { if (!(await verifyEventWorker(event))) return; - if (await encounterEvent(event, signal)) return; + if (encounterEvent(event)) return; debug(`NostrEvent<${event.kind}> ${event.id}`); if (event.kind !== 24133) { @@ -90,17 +90,15 @@ async function policyFilter(event: NostrEvent): Promise { } } +const encounters = new LRUCache({ max: 1000 }); + /** Encounter the event, and return whether it has already been encountered. */ -async function encounterEvent(event: NostrEvent, signal: AbortSignal): Promise { - const cache = await Storages.cache(); - const reqmeister = await Storages.reqmeister(); - - const [existing] = await cache.query([{ ids: [event.id], limit: 1 }]); - - cache.event(event); - reqmeister.event(event, { signal }); - - return !!existing; +function encounterEvent(event: NostrEvent): boolean { + const encountered = !!encounters.get(event.id); + if (!encountered) { + encounters.set(event.id, true); + } + return encountered; } /** Hydrate the event with the user, if applicable. */