From d4721fb82dfc407a1260c1726174a60c55cce052 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Wed, 16 Aug 2023 16:12:27 -0500 Subject: [PATCH] Add mixer module to combine results from DB and pool --- src/client.ts | 29 +++++++++++---- src/controllers/api/accounts.ts | 5 +-- src/deps.ts | 2 +- src/mixer.ts | 64 +++++++++++++++++++++++++++++++++ 4 files changed, 91 insertions(+), 9 deletions(-) create mode 100644 src/mixer.ts diff --git a/src/client.ts b/src/client.ts index 9c5e74a..71a644f 100644 --- a/src/client.ts +++ b/src/client.ts @@ -1,4 +1,4 @@ -import { Author, type Filter, findReplyTag, matchFilter, RelayPool, TTLCache } from '@/deps.ts'; +import { Author, type Filter, findReplyTag, matchFilters, RelayPool, TTLCache } from '@/deps.ts'; import { type Event, type SignedEvent } from '@/event.ts'; import { Conf } from './config.ts'; @@ -34,16 +34,16 @@ interface GetFilterOpts { } /** Get events from a NIP-01 filter. */ -function getFilter(filter: Filter, opts: GetFilterOpts = {}): Promise[]> { +function getFilters(filters: Filter[], opts: GetFilterOpts = {}): Promise[]> { return new Promise((resolve) => { let tid: number; const results: SignedEvent[] = []; const unsub = getPool().subscribe( - [filter], + filters, Conf.poolRelays, (event: SignedEvent | null) => { - if (event && matchFilter(filter, event)) { + if (event && matchFilters(filters, event)) { results.push({ id: event.id, kind: event.kind, @@ -54,7 +54,8 @@ function getFilter(filter: Filter, opts: GetFilterOpts = {} sig: event.sig, }); } - if (filter.limit && results.length >= filter.limit) { + // HACK + if (filters.length === 1 && filters[0].limit && results.length >= filters[0].limit) { unsub(); clearTimeout(tid); resolve(results as SignedEvent[]); @@ -77,6 +78,11 @@ function getFilter(filter: Filter, opts: GetFilterOpts = {} }); } +/** @deprecated Use `getFilters` instead. */ +function getFilter(filter: Filter, opts: GetFilterOpts = {}): Promise[]> { + return getFilters([filter], opts); +} + /** Get a Nostr event by its ID. */ const getEvent = async (id: string, kind?: K): Promise | undefined> => { const event = await (getPool().getEventById(id, Conf.poolRelays, 0) as Promise); @@ -169,4 +175,15 @@ function publish(event: SignedEvent, relays = Conf.publishRelays): void { } } -export { getAncestors, getAuthor, getDescendants, getEvent, getFeed, getFilter, getFollows, getPublicFeed, publish }; +export { + getAncestors, + getAuthor, + getDescendants, + getEvent, + getFeed, + getFilter, + getFilters, + getFollows, + getPublicFeed, + publish, +}; diff --git a/src/controllers/api/accounts.ts b/src/controllers/api/accounts.ts index 7c8b0a0..d65c9e1 100644 --- a/src/controllers/api/accounts.ts +++ b/src/controllers/api/accounts.ts @@ -1,6 +1,7 @@ import { type AppController } from '@/app.ts'; import { type Filter, findReplyTag, z } from '@/deps.ts'; -import { getAuthor, getFilter, getFollows, publish } from '@/client.ts'; +import { getAuthor, getFollows, publish } from '@/client.ts'; +import { getFilters } from '@/mixer.ts'; import { jsonMetaContentSchema } from '@/schemas/nostr.ts'; import { signEvent } from '@/sign.ts'; import { toAccount, toStatus } from '@/transformers/nostr-to-mastoapi.ts'; @@ -115,7 +116,7 @@ const accountStatusesController: AppController = async (c) => { filter['#t'] = [tagged]; } - let events = await getFilter(filter); + let events = await getFilters([filter]); events.sort(eventDateComparator); if (exclude_replies) { diff --git a/src/deps.ts b/src/deps.ts index 2c7d67d..a65503c 100644 --- a/src/deps.ts +++ b/src/deps.ts @@ -17,7 +17,7 @@ export { getPublicKey, getSignature, Kind, - matchFilter, + matchFilters, nip05, nip19, nip21, diff --git a/src/mixer.ts b/src/mixer.ts new file mode 100644 index 0000000..ee123cc --- /dev/null +++ b/src/mixer.ts @@ -0,0 +1,64 @@ +import { matchFilters } from '@/deps.ts'; + +import { getFilters as getFiltersClient } from '@/client.ts'; +import { getFilters as getFiltersDB } from '@/db/events.ts'; +import { eventDateComparator } from '@/utils.ts'; + +import type { SignedEvent } from '@/event.ts'; +import type { DittoFilter } from '@/types.ts'; + +/** Get filters from the database and pool, and mix the best results together. */ +async function getFilters(filters: DittoFilter[]): Promise[]> { + const results = await Promise.allSettled([ + getFiltersClient(filters), + getFiltersDB(filters), + ]); + + const events = results + .filter((result): result is PromiseFulfilledResult[]> => result.status === 'fulfilled') + .flatMap((result) => result.value); + + return unmixEvents(events, filters); +} + +/** Combine and sort events to match the filters. */ +function unmixEvents(events: SignedEvent[], filters: DittoFilter[]): SignedEvent[] { + events = dedupeEvents(events); + events = takeNewestEvents(events); + events = events.filter((event) => matchFilters(filters, event)); + events.sort(eventDateComparator); + + return events; +} + +/** Deduplicate events by ID. */ +function dedupeEvents(events: SignedEvent[]): SignedEvent[] { + return [...new Map(events.map((event) => [event.id, event])).values()]; +} + +/** Take the newest events among replaceable ones. */ +function takeNewestEvents(events: SignedEvent[]): SignedEvent[] { + const isReplaceable = (kind: number) => + kind === 0 || kind === 3 || (10000 <= kind && kind < 20000) || (30000 <= kind && kind < 40000); + + // Group events by author and kind. + const groupedEvents = events.reduce[]>>((acc, event) => { + const key = `${event.pubkey}:${event.kind}`; + const group = acc.get(key) || []; + acc.set(key, [...group, event]); + return acc; + }, new Map()); + + // Process each group. + const processedEvents = Array.from(groupedEvents.values()).flatMap((group) => { + if (isReplaceable(group[0].kind)) { + // Sort by `created_at` and take the latest event. + return group.sort(eventDateComparator)[0]; + } + return group; + }); + + return processedEvents; +} + +export { getFilters };