Merge branch 'pipeline-encounter-race' into 'main'
pipeline: fix race condition in encounterEvent See merge request soapbox-pub/ditto!318
This commit is contained in:
commit
b77e14dac3
|
@ -3,12 +3,14 @@ import { LNURL } from '@nostrify/nostrify/ln';
|
||||||
import { PipePolicy } from '@nostrify/nostrify/policies';
|
import { PipePolicy } from '@nostrify/nostrify/policies';
|
||||||
import Debug from '@soapbox/stickynotes/debug';
|
import Debug from '@soapbox/stickynotes/debug';
|
||||||
import { sql } from 'kysely';
|
import { sql } from 'kysely';
|
||||||
|
import { LRUCache } from 'lru-cache';
|
||||||
|
|
||||||
import { Conf } from '@/config.ts';
|
import { Conf } from '@/config.ts';
|
||||||
import { DittoDB } from '@/db/DittoDB.ts';
|
import { DittoDB } from '@/db/DittoDB.ts';
|
||||||
import { deleteAttachedMedia } from '@/db/unattached-media.ts';
|
import { deleteAttachedMedia } from '@/db/unattached-media.ts';
|
||||||
import { DittoEvent } from '@/interfaces/DittoEvent.ts';
|
import { DittoEvent } from '@/interfaces/DittoEvent.ts';
|
||||||
import { DVM } from '@/pipeline/DVM.ts';
|
import { DVM } from '@/pipeline/DVM.ts';
|
||||||
|
import { MuteListPolicy } from '@/policies/MuteListPolicy.ts';
|
||||||
import { RelayError } from '@/RelayError.ts';
|
import { RelayError } from '@/RelayError.ts';
|
||||||
import { hydrateEvents, purifyEvent } from '@/storages/hydrate.ts';
|
import { hydrateEvents, purifyEvent } from '@/storages/hydrate.ts';
|
||||||
import { Storages } from '@/storages.ts';
|
import { Storages } from '@/storages.ts';
|
||||||
|
@ -23,8 +25,6 @@ import { nip05Cache } from '@/utils/nip05.ts';
|
||||||
import { updateStats } from '@/utils/stats.ts';
|
import { updateStats } from '@/utils/stats.ts';
|
||||||
import { getTagSet } from '@/utils/tags.ts';
|
import { getTagSet } from '@/utils/tags.ts';
|
||||||
|
|
||||||
import { MuteListPolicy } from '@/policies/MuteListPolicy.ts';
|
|
||||||
|
|
||||||
const debug = Debug('ditto:pipeline');
|
const debug = Debug('ditto:pipeline');
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -33,7 +33,7 @@ const debug = Debug('ditto:pipeline');
|
||||||
*/
|
*/
|
||||||
async function handleEvent(event: DittoEvent, signal: AbortSignal): Promise<void> {
|
async function handleEvent(event: DittoEvent, signal: AbortSignal): Promise<void> {
|
||||||
if (!(await verifyEventWorker(event))) return;
|
if (!(await verifyEventWorker(event))) return;
|
||||||
if (await encounterEvent(event, signal)) return;
|
if (encounterEvent(event)) return;
|
||||||
debug(`NostrEvent<${event.kind}> ${event.id}`);
|
debug(`NostrEvent<${event.kind}> ${event.id}`);
|
||||||
|
|
||||||
if (event.kind !== 24133) {
|
if (event.kind !== 24133) {
|
||||||
|
@ -90,17 +90,15 @@ async function policyFilter(event: NostrEvent): Promise<void> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const encounters = new LRUCache<string, true>({ max: 1000 });
|
||||||
|
|
||||||
/** Encounter the event, and return whether it has already been encountered. */
|
/** Encounter the event, and return whether it has already been encountered. */
|
||||||
async function encounterEvent(event: NostrEvent, signal: AbortSignal): Promise<boolean> {
|
function encounterEvent(event: NostrEvent): boolean {
|
||||||
const cache = await Storages.cache();
|
const encountered = !!encounters.get(event.id);
|
||||||
const reqmeister = await Storages.reqmeister();
|
if (!encountered) {
|
||||||
|
encounters.set(event.id, true);
|
||||||
const [existing] = await cache.query([{ ids: [event.id], limit: 1 }]);
|
}
|
||||||
|
return encountered;
|
||||||
cache.event(event);
|
|
||||||
reqmeister.event(event, { signal });
|
|
||||||
|
|
||||||
return !!existing;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Hydrate the event with the user, if applicable. */
|
/** Hydrate the event with the user, if applicable. */
|
||||||
|
|
Loading…
Reference in New Issue