From 539804215665d141f526014042e6f924bab779b2 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Wed, 27 Dec 2023 22:49:35 -0600 Subject: [PATCH 01/12] Add memorelay module --- src/db/memorelay.ts | 27 +++++++++++++++++++++++++++ src/filter.ts | 18 +++++++++++++++--- 2 files changed, 42 insertions(+), 3 deletions(-) create mode 100644 src/db/memorelay.ts diff --git a/src/db/memorelay.ts b/src/db/memorelay.ts new file mode 100644 index 0000000..a6dc2d9 --- /dev/null +++ b/src/db/memorelay.ts @@ -0,0 +1,27 @@ +import { Debug, type Event, type Filter, LRUCache } from '@/deps.ts'; +import { getFilterId, type GetFiltersOpts, isMicrofilter } from '@/filter.ts'; + +const debug = Debug('ditto:memorelay'); +const events = new LRUCache({ max: 1000 }); + +/** Get events from memory. */ +function getFilters(filters: Filter[], opts: GetFiltersOpts = {}): Promise[]> { + if (opts.signal?.aborted) return Promise.resolve([]); + if (!filters.length) return Promise.resolve([]); + debug('REQ', JSON.stringify(filters)); + + const results: Event[] = []; + + for (const filter of filters) { + if (isMicrofilter(filter)) { + const event = events.get(getFilterId(filter)); + if (event) { + results.push(event as Event); + } + } + } + + return Promise.resolve(results); +} + +export { getFilters }; diff --git a/src/filter.ts b/src/filter.ts index 7178e2d..f562a82 100644 --- a/src/filter.ts +++ b/src/filter.ts @@ -1,7 +1,7 @@ import { Conf } from '@/config.ts'; -import { type Event, type Filter, matchFilters, stringifyStable } from '@/deps.ts'; - -import type { EventData } from '@/types.ts'; +import { type Event, type Filter, matchFilters, stringifyStable, z } from '@/deps.ts'; +import { nostrIdSchema } from '@/schemas/nostr.ts'; +import { type EventData } from '@/types.ts'; /** Additional properties that may be added by Ditto to events. */ type Relation = 'author' | 'author_stats' | 'event_stats'; @@ -70,11 +70,23 @@ function eventToMicroFilter(event: Event): MicroFilter { } } +/** Microfilter schema. */ +const microFilterSchema = z.union([ + z.object({ ids: z.tuple([nostrIdSchema]) }).strict(), + z.object({ kinds: z.tuple([z.literal(0)]), authors: z.tuple([nostrIdSchema]) }).strict(), +]); + +/** Checks whether the filter is a microfilter. */ +function isMicrofilter(filter: Filter): filter is MicroFilter { + return microFilterSchema.safeParse(filter).success; +} + export { type DittoFilter, eventToMicroFilter, getFilterId, type GetFiltersOpts, + isMicrofilter, matchDittoFilters, type MicroFilter, type Relation, From acffdd7fb80d12590f109835570d88c7778a36a9 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Wed, 27 Dec 2023 23:22:24 -0600 Subject: [PATCH 02/12] memorelay: insertEvent --- src/db/memorelay.ts | 22 +++++++++++++++++++--- src/filter.ts | 14 +++++++++++--- 2 files changed, 30 insertions(+), 6 deletions(-) diff --git a/src/db/memorelay.ts b/src/db/memorelay.ts index a6dc2d9..ad2a884 100644 --- a/src/db/memorelay.ts +++ b/src/db/memorelay.ts @@ -1,8 +1,13 @@ import { Debug, type Event, type Filter, LRUCache } from '@/deps.ts'; -import { getFilterId, type GetFiltersOpts, isMicrofilter } from '@/filter.ts'; +import { getFilterId, type GetFiltersOpts, getMicroFilters, isMicrofilter } from '@/filter.ts'; const debug = Debug('ditto:memorelay'); -const events = new LRUCache({ max: 1000 }); + +const events = new LRUCache({ + max: 1000, + maxEntrySize: 1000, + sizeCalculation: (event) => JSON.stringify(event).length, +}); /** Get events from memory. */ function getFilters(filters: Filter[], opts: GetFiltersOpts = {}): Promise[]> { @@ -24,4 +29,15 @@ function getFilters(filters: Filter[], opts: GetFiltersOpts return Promise.resolve(results); } -export { getFilters }; +/** Insert an event into memory. */ +function insertEvent(event: Event): void { + for (const microfilter of getMicroFilters(event)) { + const filterId = getFilterId(microfilter); + const existing = events.get(filterId); + if (!existing || event.created_at > existing.created_at) { + events.set(filterId, event); + } + } +} + +export { getFilters, insertEvent }; diff --git a/src/filter.ts b/src/filter.ts index f562a82..7f3cb4f 100644 --- a/src/filter.ts +++ b/src/filter.ts @@ -63,11 +63,18 @@ function getFilterId(filter: MicroFilter): string { /** Get a microfilter from a Nostr event. */ function eventToMicroFilter(event: Event): MicroFilter { + const [microfilter] = getMicroFilters(event); + return microfilter; +} + +/** Get all the microfilters for an event, in order of priority. */ +function getMicroFilters(event: Event): MicroFilter[] { + const microfilters: MicroFilter[] = []; if (event.kind === 0) { - return { kinds: [0], authors: [event.pubkey] }; - } else { - return { ids: [event.id] }; + microfilters.push({ kinds: [0], authors: [event.pubkey] }); } + microfilters.push({ ids: [event.id] }); + return microfilters; } /** Microfilter schema. */ @@ -86,6 +93,7 @@ export { eventToMicroFilter, getFilterId, type GetFiltersOpts, + getMicroFilters, isMicrofilter, matchDittoFilters, type MicroFilter, From d40b4a509e26b9f40a94a87c18773a2ab8d1da16 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Wed, 27 Dec 2023 23:35:42 -0600 Subject: [PATCH 03/12] pipeline: use memorelay for encounters --- src/db/memorelay.ts | 28 +++++++++++++++++++++++++++- src/pipeline.ts | 17 +++++++---------- 2 files changed, 34 insertions(+), 11 deletions(-) diff --git a/src/db/memorelay.ts b/src/db/memorelay.ts index ad2a884..dae3dbd 100644 --- a/src/db/memorelay.ts +++ b/src/db/memorelay.ts @@ -40,4 +40,30 @@ function insertEvent(event: Event): void { } } -export { getFilters, insertEvent }; +/** Check if an event is in memory. */ +function hasEvent(event: Event): boolean { + for (const microfilter of getMicroFilters(event)) { + const filterId = getFilterId(microfilter); + const existing = events.get(filterId); + if (existing) { + return true; + } + } + return false; +} + +/** Check if an event is in memory by ID. */ +function hasEventById(eventId: string): boolean { + const filterId = getFilterId({ ids: [eventId] }); + return events.has(filterId); +} + +/** In-memory data store for events using microfilters. */ +const memorelay = { + getFilters, + insertEvent, + hasEvent, + hasEventById, +}; + +export { memorelay }; diff --git a/src/pipeline.ts b/src/pipeline.ts index 081d6a4..3d065c0 100644 --- a/src/pipeline.ts +++ b/src/pipeline.ts @@ -1,9 +1,10 @@ import { Conf } from '@/config.ts'; import * as eventsDB from '@/db/events.ts'; +import { memorelay } from '@/db/memorelay.ts'; import { addRelays } from '@/db/relays.ts'; import { deleteAttachedMedia } from '@/db/unattached-media.ts'; import { findUser } from '@/db/users.ts'; -import { Debug, type Event, LRUCache } from '@/deps.ts'; +import { Debug, type Event } from '@/deps.ts'; import { isEphemeralKind } from '@/kinds.ts'; import * as mixer from '@/mixer.ts'; import { publish } from '@/pool.ts'; @@ -12,12 +13,11 @@ import { reqmeister } from '@/reqmeister.ts'; import { updateStats } from '@/stats.ts'; import { Sub } from '@/subs.ts'; import { getTagSet } from '@/tags.ts'; +import { type EventData } from '@/types.ts'; import { eventAge, isRelay, nostrDate, Time } from '@/utils.ts'; import { TrendsWorker } from '@/workers/trends.ts'; import { verifySignatureWorker } from '@/workers/verify.ts'; -import type { EventData } from '@/types.ts'; - const debug = Debug('ditto:pipeline'); /** @@ -43,15 +43,12 @@ async function handleEvent(event: Event): Promise { ]); } -/** Tracks encountered events to skip duplicates, improving idempotency and performance. */ -const encounters = new LRUCache({ max: 1000 }); - /** Encounter the event, and return whether it has already been encountered. */ function encounterEvent(event: Event): boolean { - const result = encounters.get(event.id); - encounters.set(event.id, true); + const preexisting = memorelay.hasEvent(event); + memorelay.insertEvent(event); reqmeister.encounter(event); - return !!result; + return preexisting; } /** Preload data that will be useful to several tasks. */ @@ -146,7 +143,7 @@ function fetchRelatedEvents(event: Event, data: EventData) { reqmeister.req({ kinds: [0], authors: [event.pubkey] }).catch(() => {}); } for (const [name, id, relay] of event.tags) { - if (name === 'e' && !encounters.has(id)) { + if (name === 'e' && !memorelay.hasEventById(id)) { reqmeister.req({ ids: [id] }, [relay]).catch(() => {}); } } From f75cd211594d1b500843b134d4df6ca0c36f2c23 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Wed, 27 Dec 2023 23:55:42 -0600 Subject: [PATCH 04/12] queries: make getAuthor use memorelay --- src/filter.ts | 8 +++++++- src/queries.ts | 18 ++++++++++++++---- src/reqmeister.ts | 5 ++++- 3 files changed, 25 insertions(+), 6 deletions(-) diff --git a/src/filter.ts b/src/filter.ts index 7f3cb4f..926e360 100644 --- a/src/filter.ts +++ b/src/filter.ts @@ -14,8 +14,12 @@ interface DittoFilter extends Filter { relations?: Relation[]; } +/** Microfilter to get one specific event by ID. */ +type IdMicrofilter = { ids: [Event['id']] }; +/** Microfilter to get an author. */ +type AuthorMicrofilter = { kinds: [0]; authors: [Event['pubkey']] }; /** Filter to get one specific event. */ -type MicroFilter = { ids: [Event['id']] } | { kinds: [0]; authors: [Event['pubkey']] }; +type MicroFilter = IdMicrofilter | AuthorMicrofilter; /** Additional options to apply to the whole subscription. */ interface GetFiltersOpts { @@ -89,11 +93,13 @@ function isMicrofilter(filter: Filter): filter is MicroFilter { } export { + type AuthorMicrofilter, type DittoFilter, eventToMicroFilter, getFilterId, type GetFiltersOpts, getMicroFilters, + type IdMicrofilter, isMicrofilter, matchDittoFilters, type MicroFilter, diff --git a/src/queries.ts b/src/queries.ts index 3af2253..9e7b4ce 100644 --- a/src/queries.ts +++ b/src/queries.ts @@ -1,8 +1,9 @@ import * as eventsDB from '@/db/events.ts'; import { type Event, findReplyTag } from '@/deps.ts'; -import { type DittoFilter, type Relation } from '@/filter.ts'; +import { AuthorMicrofilter, type DittoFilter, type Relation } from '@/filter.ts'; import * as mixer from '@/mixer.ts'; import { reqmeister } from '@/reqmeister.ts'; +import { memorelay } from '@/db/memorelay.ts'; interface GetEventOpts { /** Signal to abort the request. */ @@ -30,13 +31,22 @@ const getEvent = async ( /** Get a Nostr `set_medatadata` event for a user's pubkey. */ const getAuthor = async (pubkey: string, opts: GetEventOpts<0> = {}): Promise | undefined> => { const { relations, signal = AbortSignal.timeout(1000) } = opts; + const microfilter: AuthorMicrofilter = { kinds: [0], authors: [pubkey] }; - const event = await eventsDB.getFilters( + let event: Event<0> | undefined; + + [event] = await memorelay.getFilters([microfilter], opts); + + if (event) return event; + + [event] = await eventsDB.getFilters( [{ authors: [pubkey], relations, kinds: [0], limit: 1 }], { limit: 1, signal }, - ).then(([event]) => event) || await reqmeister.req({ kinds: [0], authors: [pubkey] }).catch(() => {}); + ); - return event; + if (event) return event; + + return reqmeister.req({ kinds: [0], authors: [pubkey] }).catch(() => undefined); }; /** Get users the given pubkey follows. */ diff --git a/src/reqmeister.ts b/src/reqmeister.ts index d8fef75..6019758 100644 --- a/src/reqmeister.ts +++ b/src/reqmeister.ts @@ -1,6 +1,6 @@ import * as client from '@/client.ts'; import { Debug, type Event, EventEmitter, type Filter } from '@/deps.ts'; -import { eventToMicroFilter, getFilterId, type MicroFilter } from '@/filter.ts'; +import { AuthorMicrofilter, eventToMicroFilter, getFilterId, IdMicrofilter, type MicroFilter } from '@/filter.ts'; import { Time } from '@/utils/time.ts'; const debug = Debug('ditto:reqmeister'); @@ -70,6 +70,9 @@ class Reqmeister extends EventEmitter<{ [filterId: string]: (event: Event) => an this.#perform(); } + req(filter: IdMicrofilter, relays?: WebSocket['url'][]): Promise; + req(filter: AuthorMicrofilter, relays?: WebSocket['url'][]): Promise>; + req(filter: MicroFilter, relays?: WebSocket['url'][]): Promise; req(filter: MicroFilter, relays: WebSocket['url'][] = []): Promise { const filterId = getFilterId(filter); this.#queue.push([filterId, filter, relays]); From 96e6bd18b484d50da4cfe344b502a5a8acfc84fd Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Thu, 28 Dec 2023 00:15:51 -0600 Subject: [PATCH 05/12] queries: pull from memorelay when applicable, optimize --- src/queries.ts | 27 ++++++++++++++++++++------- 1 file changed, 20 insertions(+), 7 deletions(-) diff --git a/src/queries.ts b/src/queries.ts index 9e7b4ce..48aeaf4 100644 --- a/src/queries.ts +++ b/src/queries.ts @@ -1,6 +1,6 @@ import * as eventsDB from '@/db/events.ts'; import { type Event, findReplyTag } from '@/deps.ts'; -import { AuthorMicrofilter, type DittoFilter, type Relation } from '@/filter.ts'; +import { type AuthorMicrofilter, type DittoFilter, type IdMicrofilter, type Relation } from '@/filter.ts'; import * as mixer from '@/mixer.ts'; import { reqmeister } from '@/reqmeister.ts'; import { memorelay } from '@/db/memorelay.ts'; @@ -20,12 +20,25 @@ const getEvent = async ( opts: GetEventOpts = {}, ): Promise | undefined> => { const { kind, relations, signal = AbortSignal.timeout(1000) } = opts; + const microfilter: IdMicrofilter = { ids: [id] }; + + let event: Event | undefined; + + [event] = await memorelay.getFilters([microfilter], opts); + + if (event && !relations) return event; + const filter: DittoFilter = { ids: [id], relations, limit: 1 }; if (kind) { filter.kinds = [kind]; } - const [event] = await mixer.getFilters([filter], { limit: 1, signal }); - return event; + + event = await mixer.getFilters([filter], { limit: 1, signal }) + .then((events) => events[0] || event); + + if (event) return event; + + return await reqmeister.req(microfilter).catch(() => event) as Event | undefined; }; /** Get a Nostr `set_medatadata` event for a user's pubkey. */ @@ -37,16 +50,16 @@ const getAuthor = async (pubkey: string, opts: GetEventOpts<0> = {}): Promise events[0] || event); if (event) return event; - return reqmeister.req({ kinds: [0], authors: [pubkey] }).catch(() => undefined); + return reqmeister.req(microfilter).catch(() => event); }; /** Get users the given pubkey follows. */ From c2b25bc7988c22f15f8dd1a0c305f2d4954e1bb4 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Thu, 28 Dec 2023 00:27:25 -0600 Subject: [PATCH 06/12] queries: refactor getEvent, getAuthor --- src/queries.ts | 34 ++++++++++++++++++---------------- 1 file changed, 18 insertions(+), 16 deletions(-) diff --git a/src/queries.ts b/src/queries.ts index 48aeaf4..1388cd5 100644 --- a/src/queries.ts +++ b/src/queries.ts @@ -22,23 +22,24 @@ const getEvent = async ( const { kind, relations, signal = AbortSignal.timeout(1000) } = opts; const microfilter: IdMicrofilter = { ids: [id] }; - let event: Event | undefined; + const [memoryEvent] = await memorelay.getFilters([microfilter], opts) as Event[]; - [event] = await memorelay.getFilters([microfilter], opts); - - if (event && !relations) return event; + if (memoryEvent && !relations) { + return memoryEvent; + } const filter: DittoFilter = { ids: [id], relations, limit: 1 }; if (kind) { filter.kinds = [kind]; } - event = await mixer.getFilters([filter], { limit: 1, signal }) - .then((events) => events[0] || event); + const dbEvent = await eventsDB.getFilters([filter], { limit: 1, signal }) + .then(([event]) => event); - if (event) return event; + if (dbEvent) return dbEvent; + if (memoryEvent) return memoryEvent; - return await reqmeister.req(microfilter).catch(() => event) as Event | undefined; + return await reqmeister.req(microfilter).catch(() => undefined) as Event | undefined; }; /** Get a Nostr `set_medatadata` event for a user's pubkey. */ @@ -46,20 +47,21 @@ const getAuthor = async (pubkey: string, opts: GetEventOpts<0> = {}): Promise | undefined; + const [memoryEvent] = await memorelay.getFilters([microfilter], opts); - [event] = await memorelay.getFilters([microfilter], opts); + if (memoryEvent && !relations) { + return memoryEvent; + } - if (event && !relations) return event; - - event = await eventsDB.getFilters( + const dbEvent = await eventsDB.getFilters( [{ authors: [pubkey], relations, kinds: [0], limit: 1 }], { limit: 1, signal }, - ).then((events) => events[0] || event); + ).then(([event]) => event); - if (event) return event; + if (dbEvent) return dbEvent; + if (memoryEvent) return memoryEvent; - return reqmeister.req(microfilter).catch(() => event); + return reqmeister.req(microfilter).catch(() => undefined); }; /** Get users the given pubkey follows. */ From 2b1e97ee1b573bb928f773af665141adf5f4091a Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Thu, 28 Dec 2023 00:39:05 -0600 Subject: [PATCH 07/12] queries: get author from memory when possible --- src/queries.ts | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/src/queries.ts b/src/queries.ts index 1388cd5..7a9a7dd 100644 --- a/src/queries.ts +++ b/src/queries.ts @@ -22,7 +22,7 @@ const getEvent = async ( const { kind, relations, signal = AbortSignal.timeout(1000) } = opts; const microfilter: IdMicrofilter = { ids: [id] }; - const [memoryEvent] = await memorelay.getFilters([microfilter], opts) as Event[]; + const [memoryEvent] = await memorelay.getFilters([microfilter], opts) as eventsDB.DittoEvent[]; if (memoryEvent && !relations) { return memoryEvent; @@ -36,7 +36,20 @@ const getEvent = async ( const dbEvent = await eventsDB.getFilters([filter], { limit: 1, signal }) .then(([event]) => event); + // TODO: make this DRY-er. + + if (dbEvent && !dbEvent.author) { + const [author] = await memorelay.getFilters([{ kinds: [0], authors: [dbEvent.pubkey] }], opts); + dbEvent.author = author; + } + if (dbEvent) return dbEvent; + + if (memoryEvent && !memoryEvent.author) { + const [author] = await memorelay.getFilters([{ kinds: [0], authors: [memoryEvent.pubkey] }], opts); + memoryEvent.author = author; + } + if (memoryEvent) return memoryEvent; return await reqmeister.req(microfilter).catch(() => undefined) as Event | undefined; From da6738a4051d0858785be8df57925933f7780c17 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Thu, 28 Dec 2023 12:07:49 -0600 Subject: [PATCH 08/12] db/events: debug getFilters -> REQ --- src/db/events.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/db/events.ts b/src/db/events.ts index 83551fb..91368fe 100644 --- a/src/db/events.ts +++ b/src/db/events.ts @@ -228,7 +228,7 @@ async function getFilters( opts: GetFiltersOpts = {}, ): Promise[]> { if (!filters.length) return Promise.resolve([]); - debug('getFilters', JSON.stringify(filters)); + debug('REQ', JSON.stringify(filters)); let query = getFiltersQuery(filters); if (typeof opts.limit === 'number') { From b3c7e22052b09c2041659994ac2a76cc838c4b11 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Thu, 28 Dec 2023 12:15:24 -0600 Subject: [PATCH 09/12] memorelay: increase LRU limits --- src/db/memorelay.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/db/memorelay.ts b/src/db/memorelay.ts index dae3dbd..4f6022e 100644 --- a/src/db/memorelay.ts +++ b/src/db/memorelay.ts @@ -4,8 +4,8 @@ import { getFilterId, type GetFiltersOpts, getMicroFilters, isMicrofilter } from const debug = Debug('ditto:memorelay'); const events = new LRUCache({ - max: 1000, - maxEntrySize: 1000, + max: 3000, + maxEntrySize: 5000, sizeCalculation: (event) => JSON.stringify(event).length, }); From 33eead2148605a16c87482242ad04bce7d953dea Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Thu, 28 Dec 2023 13:26:41 -0600 Subject: [PATCH 10/12] Add filter.test.ts --- fixtures/events/event-0.json | 15 +++++++++++++++ fixtures/events/event-1.json | 15 +++++++++++++++ src/filter.test.ts | 37 ++++++++++++++++++++++++++++++++++++ 3 files changed, 67 insertions(+) create mode 100644 fixtures/events/event-0.json create mode 100644 fixtures/events/event-1.json create mode 100644 src/filter.test.ts diff --git a/fixtures/events/event-0.json b/fixtures/events/event-0.json new file mode 100644 index 0000000..907e1a1 --- /dev/null +++ b/fixtures/events/event-0.json @@ -0,0 +1,15 @@ +{ + "id": "63d38c9b483d2d98a46382eadefd272e0e4bdb106a5b6eddb400c4e76f693d35", + "pubkey": "79c2cae114ea28a981e7559b4fe7854a473521a8d22a66bbab9fa248eb820ff6", + "created_at": 1699398376, + "kind": 0, + "tags": [ + [ + "proxy", + "https://gleasonator.com/users/alex", + "activitypub" + ] + ], + "content": "{\"name\":\"Alex Gleason\",\"about\":\"I create Fediverse software that empowers people online.\\n\\nI'm vegan btw.\\n\\nNote: If you have a question for me, please tag me publicly. This gives the opportunity for others to chime in, and bystanders to learn.\",\"picture\":\"https://media.gleasonator.com/aae0071188681629f200ab41502e03b9861d2754a44c008d3869c8a08b08d1f1.png\",\"banner\":\"https://media.gleasonator.com/e5f6e0e380536780efa774e8d3c8a5a040e3f9f99dbb48910b261c32872ee3a3.gif\",\"nip05\":\"alex_at_gleasonator.com@mostr.pub\",\"lud16\":\"alex@alexgleason.me\"}", + "sig": "9d48bbb600aab44abaeee11c97f1753f1d7de08378e9b33d84f9be893a09270aeceecfde3cfb698c555ae1bde3e4e54b3463a61bb99bdf673d64c2202f98b0e9" +} \ No newline at end of file diff --git a/fixtures/events/event-1.json b/fixtures/events/event-1.json new file mode 100644 index 0000000..f902786 --- /dev/null +++ b/fixtures/events/event-1.json @@ -0,0 +1,15 @@ +{ + "kind": 1, + "content": "I'm vegan btw", + "tags": [ + [ + "proxy", + "https://gleasonator.com/objects/8f6fac53-4f66-4c6e-ac7d-92e5e78c3e79", + "activitypub" + ] + ], + "pubkey": "79c2cae114ea28a981e7559b4fe7854a473521a8d22a66bbab9fa248eb820ff6", + "created_at": 1691091365, + "id": "55920b758b9c7b17854b6e3d44e6a02a83d1cb49e1227e75a30426dea94d4cb2", + "sig": "a72f12c08f18e85d98fb92ae89e2fe63e48b8864c5e10fbdd5335f3c9f936397a6b0a7350efe251f8168b1601d7012d4a6d0ee6eec958067cf22a14f5a5ea579" +} \ No newline at end of file diff --git a/src/filter.test.ts b/src/filter.test.ts new file mode 100644 index 0000000..efc00d7 --- /dev/null +++ b/src/filter.test.ts @@ -0,0 +1,37 @@ +import { type Event } from '@/deps.ts'; +import { assertEquals } from '@/deps-test.ts'; + +import event0 from '~/fixtures/events/event-0.json' assert { type: 'json' }; +import event1 from '~/fixtures/events/event-1.json' assert { type: 'json' }; + +import { eventToMicroFilter, getFilterId, getMicroFilters, isMicrofilter } from './filter.ts'; + +Deno.test('getMicroFilters', () => { + const event = event0 as Event<0>; + const microfilters = getMicroFilters(event); + assertEquals(microfilters.length, 2); + assertEquals(microfilters[0], { authors: [event.pubkey], kinds: [0] }); + assertEquals(microfilters[1], { ids: [event.id] }); +}); + +Deno.test('eventToMicroFilter', () => { + assertEquals(eventToMicroFilter(event0), { authors: [event0.pubkey], kinds: [0] }); + assertEquals(eventToMicroFilter(event1), { ids: [event1.id] }); +}); + +Deno.test('isMicrofilter', () => { + assertEquals(isMicrofilter({ ids: [event0.id] }), true); + assertEquals(isMicrofilter({ authors: [event0.pubkey], kinds: [0] }), true); + assertEquals(isMicrofilter({ ids: [event0.id], authors: [event0.pubkey], kinds: [0] }), false); +}); + +Deno.test('getFilterId', () => { + assertEquals( + getFilterId({ ids: [event0.id] }), + '{"ids":["63d38c9b483d2d98a46382eadefd272e0e4bdb106a5b6eddb400c4e76f693d35"]}', + ); + assertEquals( + getFilterId({ authors: [event0.pubkey], kinds: [0] }), + '{"authors":["79c2cae114ea28a981e7559b4fe7854a473521a8d22a66bbab9fa248eb820ff6"],"kinds":[0]}', + ); +}); From 2d19ab207eeb0b230af1704305cb1236a4263fb3 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Thu, 28 Dec 2023 13:36:50 -0600 Subject: [PATCH 11/12] Add memorelay.test.ts --- src/db/memorelay.test.ts | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) create mode 100644 src/db/memorelay.test.ts diff --git a/src/db/memorelay.test.ts b/src/db/memorelay.test.ts new file mode 100644 index 0000000..b0125cf --- /dev/null +++ b/src/db/memorelay.test.ts @@ -0,0 +1,18 @@ +import { assertEquals } from '@/deps-test.ts'; + +import event1 from '~/fixtures/events/event-1.json' assert { type: 'json' }; + +import { memorelay } from './memorelay.ts'; + +Deno.test('memorelay', async () => { + assertEquals(memorelay.hasEvent(event1), false); + assertEquals(memorelay.hasEventById(event1.id), false); + + memorelay.insertEvent(event1); + + assertEquals(memorelay.hasEvent(event1), true); + assertEquals(memorelay.hasEventById(event1.id), true); + + const result = await memorelay.getFilters([{ ids: [event1.id] }]); + assertEquals(result[0], event1); +}); From 08da26b12adc5e130fb6f64058d50aae7c92ad70 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Thu, 28 Dec 2023 13:41:04 -0600 Subject: [PATCH 12/12] reqmeister: accept `signal` --- src/pipeline.ts | 2 +- src/queries.ts | 4 ++-- src/reqmeister.ts | 19 +++++++++++++++---- 3 files changed, 18 insertions(+), 7 deletions(-) diff --git a/src/pipeline.ts b/src/pipeline.ts index 3d065c0..577aaae 100644 --- a/src/pipeline.ts +++ b/src/pipeline.ts @@ -144,7 +144,7 @@ function fetchRelatedEvents(event: Event, data: EventData) { } for (const [name, id, relay] of event.tags) { if (name === 'e' && !memorelay.hasEventById(id)) { - reqmeister.req({ ids: [id] }, [relay]).catch(() => {}); + reqmeister.req({ ids: [id] }, { relays: [relay] }).catch(() => {}); } } } diff --git a/src/queries.ts b/src/queries.ts index 7a9a7dd..5f03b6e 100644 --- a/src/queries.ts +++ b/src/queries.ts @@ -52,7 +52,7 @@ const getEvent = async ( if (memoryEvent) return memoryEvent; - return await reqmeister.req(microfilter).catch(() => undefined) as Event | undefined; + return await reqmeister.req(microfilter, opts).catch(() => undefined) as Event | undefined; }; /** Get a Nostr `set_medatadata` event for a user's pubkey. */ @@ -74,7 +74,7 @@ const getAuthor = async (pubkey: string, opts: GetEventOpts<0> = {}): Promise undefined); + return reqmeister.req(microfilter, opts).catch(() => undefined); }; /** Get users the given pubkey follows. */ diff --git a/src/reqmeister.ts b/src/reqmeister.ts index 6019758..cff8cb9 100644 --- a/src/reqmeister.ts +++ b/src/reqmeister.ts @@ -10,6 +10,11 @@ interface ReqmeisterOpts { timeout?: number; } +interface ReqmeisterReqOpts { + relays?: WebSocket['url'][]; + signal?: AbortSignal; +} + type ReqmeisterQueueItem = [string, MicroFilter, WebSocket['url'][]]; /** Batches requests to Nostr relays using microfilters. */ @@ -70,15 +75,21 @@ class Reqmeister extends EventEmitter<{ [filterId: string]: (event: Event) => an this.#perform(); } - req(filter: IdMicrofilter, relays?: WebSocket['url'][]): Promise; - req(filter: AuthorMicrofilter, relays?: WebSocket['url'][]): Promise>; - req(filter: MicroFilter, relays?: WebSocket['url'][]): Promise; - req(filter: MicroFilter, relays: WebSocket['url'][] = []): Promise { + req(filter: IdMicrofilter, opts?: ReqmeisterReqOpts): Promise; + req(filter: AuthorMicrofilter, opts?: ReqmeisterReqOpts): Promise>; + req(filter: MicroFilter, opts?: ReqmeisterReqOpts): Promise; + req(filter: MicroFilter, opts: ReqmeisterReqOpts = {}): Promise { + const { relays = [], signal } = opts; + if (signal?.aborted) return Promise.reject(new DOMException('Aborted', 'AbortError')); + const filterId = getFilterId(filter); + this.#queue.push([filterId, filter, relays]); + return new Promise((resolve, reject) => { this.once(filterId, resolve); this.#promise.finally(() => setTimeout(reject, 0)); + signal?.addEventListener('abort', () => reject(new DOMException('Aborted', 'AbortError')), { once: true }); }); }