Add mixer module to combine results from DB and pool
This commit is contained in:
parent
59b7a3eed8
commit
d4721fb82d
|
@ -1,4 +1,4 @@
|
||||||
import { Author, type Filter, findReplyTag, matchFilter, RelayPool, TTLCache } from '@/deps.ts';
|
import { Author, type Filter, findReplyTag, matchFilters, RelayPool, TTLCache } from '@/deps.ts';
|
||||||
import { type Event, type SignedEvent } from '@/event.ts';
|
import { type Event, type SignedEvent } from '@/event.ts';
|
||||||
|
|
||||||
import { Conf } from './config.ts';
|
import { Conf } from './config.ts';
|
||||||
|
@ -34,16 +34,16 @@ interface GetFilterOpts {
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Get events from a NIP-01 filter. */
|
/** Get events from a NIP-01 filter. */
|
||||||
function getFilter<K extends number>(filter: Filter<K>, opts: GetFilterOpts = {}): Promise<SignedEvent<K>[]> {
|
function getFilters<K extends number>(filters: Filter<K>[], opts: GetFilterOpts = {}): Promise<SignedEvent<K>[]> {
|
||||||
return new Promise((resolve) => {
|
return new Promise((resolve) => {
|
||||||
let tid: number;
|
let tid: number;
|
||||||
const results: SignedEvent[] = [];
|
const results: SignedEvent[] = [];
|
||||||
|
|
||||||
const unsub = getPool().subscribe(
|
const unsub = getPool().subscribe(
|
||||||
[filter],
|
filters,
|
||||||
Conf.poolRelays,
|
Conf.poolRelays,
|
||||||
(event: SignedEvent | null) => {
|
(event: SignedEvent | null) => {
|
||||||
if (event && matchFilter(filter, event)) {
|
if (event && matchFilters(filters, event)) {
|
||||||
results.push({
|
results.push({
|
||||||
id: event.id,
|
id: event.id,
|
||||||
kind: event.kind,
|
kind: event.kind,
|
||||||
|
@ -54,7 +54,8 @@ function getFilter<K extends number>(filter: Filter<K>, opts: GetFilterOpts = {}
|
||||||
sig: event.sig,
|
sig: event.sig,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
if (filter.limit && results.length >= filter.limit) {
|
// HACK
|
||||||
|
if (filters.length === 1 && filters[0].limit && results.length >= filters[0].limit) {
|
||||||
unsub();
|
unsub();
|
||||||
clearTimeout(tid);
|
clearTimeout(tid);
|
||||||
resolve(results as SignedEvent<K>[]);
|
resolve(results as SignedEvent<K>[]);
|
||||||
|
@ -77,6 +78,11 @@ function getFilter<K extends number>(filter: Filter<K>, opts: GetFilterOpts = {}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** @deprecated Use `getFilters` instead. */
|
||||||
|
function getFilter<K extends number>(filter: Filter<K>, opts: GetFilterOpts = {}): Promise<SignedEvent<K>[]> {
|
||||||
|
return getFilters([filter], opts);
|
||||||
|
}
|
||||||
|
|
||||||
/** Get a Nostr event by its ID. */
|
/** Get a Nostr event by its ID. */
|
||||||
const getEvent = async <K extends number = number>(id: string, kind?: K): Promise<SignedEvent<K> | undefined> => {
|
const getEvent = async <K extends number = number>(id: string, kind?: K): Promise<SignedEvent<K> | undefined> => {
|
||||||
const event = await (getPool().getEventById(id, Conf.poolRelays, 0) as Promise<SignedEvent>);
|
const event = await (getPool().getEventById(id, Conf.poolRelays, 0) as Promise<SignedEvent>);
|
||||||
|
@ -169,4 +175,15 @@ function publish(event: SignedEvent, relays = Conf.publishRelays): void {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
export { getAncestors, getAuthor, getDescendants, getEvent, getFeed, getFilter, getFollows, getPublicFeed, publish };
|
export {
|
||||||
|
getAncestors,
|
||||||
|
getAuthor,
|
||||||
|
getDescendants,
|
||||||
|
getEvent,
|
||||||
|
getFeed,
|
||||||
|
getFilter,
|
||||||
|
getFilters,
|
||||||
|
getFollows,
|
||||||
|
getPublicFeed,
|
||||||
|
publish,
|
||||||
|
};
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
import { type AppController } from '@/app.ts';
|
import { type AppController } from '@/app.ts';
|
||||||
import { type Filter, findReplyTag, z } from '@/deps.ts';
|
import { type Filter, findReplyTag, z } from '@/deps.ts';
|
||||||
import { getAuthor, getFilter, getFollows, publish } from '@/client.ts';
|
import { getAuthor, getFollows, publish } from '@/client.ts';
|
||||||
|
import { getFilters } from '@/mixer.ts';
|
||||||
import { jsonMetaContentSchema } from '@/schemas/nostr.ts';
|
import { jsonMetaContentSchema } from '@/schemas/nostr.ts';
|
||||||
import { signEvent } from '@/sign.ts';
|
import { signEvent } from '@/sign.ts';
|
||||||
import { toAccount, toStatus } from '@/transformers/nostr-to-mastoapi.ts';
|
import { toAccount, toStatus } from '@/transformers/nostr-to-mastoapi.ts';
|
||||||
|
@ -115,7 +116,7 @@ const accountStatusesController: AppController = async (c) => {
|
||||||
filter['#t'] = [tagged];
|
filter['#t'] = [tagged];
|
||||||
}
|
}
|
||||||
|
|
||||||
let events = await getFilter(filter);
|
let events = await getFilters([filter]);
|
||||||
events.sort(eventDateComparator);
|
events.sort(eventDateComparator);
|
||||||
|
|
||||||
if (exclude_replies) {
|
if (exclude_replies) {
|
||||||
|
|
|
@ -17,7 +17,7 @@ export {
|
||||||
getPublicKey,
|
getPublicKey,
|
||||||
getSignature,
|
getSignature,
|
||||||
Kind,
|
Kind,
|
||||||
matchFilter,
|
matchFilters,
|
||||||
nip05,
|
nip05,
|
||||||
nip19,
|
nip19,
|
||||||
nip21,
|
nip21,
|
||||||
|
|
|
@ -0,0 +1,64 @@
|
||||||
|
import { matchFilters } from '@/deps.ts';
|
||||||
|
|
||||||
|
import { getFilters as getFiltersClient } from '@/client.ts';
|
||||||
|
import { getFilters as getFiltersDB } from '@/db/events.ts';
|
||||||
|
import { eventDateComparator } from '@/utils.ts';
|
||||||
|
|
||||||
|
import type { SignedEvent } from '@/event.ts';
|
||||||
|
import type { DittoFilter } from '@/types.ts';
|
||||||
|
|
||||||
|
/** Get filters from the database and pool, and mix the best results together. */
|
||||||
|
async function getFilters<K extends number>(filters: DittoFilter<K>[]): Promise<SignedEvent<K>[]> {
|
||||||
|
const results = await Promise.allSettled([
|
||||||
|
getFiltersClient(filters),
|
||||||
|
getFiltersDB(filters),
|
||||||
|
]);
|
||||||
|
|
||||||
|
const events = results
|
||||||
|
.filter((result): result is PromiseFulfilledResult<SignedEvent<K>[]> => result.status === 'fulfilled')
|
||||||
|
.flatMap((result) => result.value);
|
||||||
|
|
||||||
|
return unmixEvents(events, filters);
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Combine and sort events to match the filters. */
|
||||||
|
function unmixEvents<K extends number>(events: SignedEvent<K>[], filters: DittoFilter<K>[]): SignedEvent<K>[] {
|
||||||
|
events = dedupeEvents(events);
|
||||||
|
events = takeNewestEvents(events);
|
||||||
|
events = events.filter((event) => matchFilters(filters, event));
|
||||||
|
events.sort(eventDateComparator);
|
||||||
|
|
||||||
|
return events;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Deduplicate events by ID. */
|
||||||
|
function dedupeEvents<K extends number>(events: SignedEvent<K>[]): SignedEvent<K>[] {
|
||||||
|
return [...new Map(events.map((event) => [event.id, event])).values()];
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Take the newest events among replaceable ones. */
|
||||||
|
function takeNewestEvents<K extends number>(events: SignedEvent<K>[]): SignedEvent<K>[] {
|
||||||
|
const isReplaceable = (kind: number) =>
|
||||||
|
kind === 0 || kind === 3 || (10000 <= kind && kind < 20000) || (30000 <= kind && kind < 40000);
|
||||||
|
|
||||||
|
// Group events by author and kind.
|
||||||
|
const groupedEvents = events.reduce<Map<string, SignedEvent<K>[]>>((acc, event) => {
|
||||||
|
const key = `${event.pubkey}:${event.kind}`;
|
||||||
|
const group = acc.get(key) || [];
|
||||||
|
acc.set(key, [...group, event]);
|
||||||
|
return acc;
|
||||||
|
}, new Map());
|
||||||
|
|
||||||
|
// Process each group.
|
||||||
|
const processedEvents = Array.from(groupedEvents.values()).flatMap((group) => {
|
||||||
|
if (isReplaceable(group[0].kind)) {
|
||||||
|
// Sort by `created_at` and take the latest event.
|
||||||
|
return group.sort(eventDateComparator)[0];
|
||||||
|
}
|
||||||
|
return group;
|
||||||
|
});
|
||||||
|
|
||||||
|
return processedEvents;
|
||||||
|
}
|
||||||
|
|
||||||
|
export { getFilters };
|
Loading…
Reference in New Issue