Merge branch 'mixer' into 'develop'

Add Mixer module

See merge request soapbox-pub/ditto!10
This commit is contained in:
Alex Gleason 2023-08-17 02:53:59 +00:00
commit 18beccf067
7 changed files with 121 additions and 51 deletions

View File

@ -1,9 +1,9 @@
import { Author, type Filter, findReplyTag, matchFilter, RelayPool, TTLCache } from '@/deps.ts'; import { Conf } from '@/config.ts';
import { Author, type Filter, findReplyTag, matchFilters, RelayPool, TTLCache } from '@/deps.ts';
import { type Event, type SignedEvent } from '@/event.ts'; import { type Event, type SignedEvent } from '@/event.ts';
import { eventDateComparator, type PaginationParams, Time } from '@/utils.ts';
import { Conf } from './config.ts'; import type { GetFiltersOpts } from '@/types.ts';
import { eventDateComparator, type PaginationParams, Time } from './utils.ts';
const db = await Deno.openKv(); const db = await Deno.openKv();
@ -29,21 +29,17 @@ function getPool(): Pool {
return pool; return pool;
} }
interface GetFilterOpts {
timeout?: number;
}
/** Get events from a NIP-01 filter. */ /** Get events from a NIP-01 filter. */
function getFilter<K extends number>(filter: Filter<K>, opts: GetFilterOpts = {}): Promise<SignedEvent<K>[]> { function getFilters<K extends number>(filters: Filter<K>[], opts: GetFiltersOpts = {}): Promise<SignedEvent<K>[]> {
return new Promise((resolve) => { return new Promise((resolve) => {
let tid: number; let tid: number;
const results: SignedEvent[] = []; const results: SignedEvent[] = [];
const unsub = getPool().subscribe( const unsub = getPool().subscribe(
[filter], filters,
Conf.poolRelays, Conf.poolRelays,
(event: SignedEvent | null) => { (event: SignedEvent | null) => {
if (event && matchFilter(filter, event)) { if (event && matchFilters(filters, event)) {
results.push({ results.push({
id: event.id, id: event.id,
kind: event.kind, kind: event.kind,
@ -54,7 +50,7 @@ function getFilter<K extends number>(filter: Filter<K>, opts: GetFilterOpts = {}
sig: event.sig, sig: event.sig,
}); });
} }
if (filter.limit && results.length >= filter.limit) { if (typeof opts.limit === 'number' && results.length >= opts.limit) {
unsub(); unsub();
clearTimeout(tid); clearTimeout(tid);
resolve(results as SignedEvent<K>[]); resolve(results as SignedEvent<K>[]);
@ -101,7 +97,7 @@ const getAuthor = async (pubkey: string, timeout = 1000): Promise<SignedEvent<0>
/** Get users the given pubkey follows. */ /** Get users the given pubkey follows. */
const getFollows = async (pubkey: string): Promise<SignedEvent<3> | undefined> => { const getFollows = async (pubkey: string): Promise<SignedEvent<3> | undefined> => {
const [event] = await getFilter({ authors: [pubkey], kinds: [3] }, { timeout: 5000 }); const [event] = await getFilters([{ authors: [pubkey], kinds: [3] }], { timeout: 5000 });
// TODO: figure out a better, more generic & flexible way to handle event cache (and timeouts?) // TODO: figure out a better, more generic & flexible way to handle event cache (and timeouts?)
// Prewarm cache in GET `/api/v1/accounts/verify_credentials` // Prewarm cache in GET `/api/v1/accounts/verify_credentials`
@ -127,13 +123,13 @@ async function getFeed(event3: Event<3>, params: PaginationParams): Promise<Sign
...params, ...params,
}; };
const results = await getFilter(filter, { timeout: 5000 }) as SignedEvent<1>[]; const results = await getFilters([filter], { timeout: 5000 }) as SignedEvent<1>[];
return results.sort(eventDateComparator); return results.sort(eventDateComparator);
} }
/** Get a feed of all known text notes. */ /** Get a feed of all known text notes. */
async function getPublicFeed(params: PaginationParams): Promise<SignedEvent<1>[]> { async function getPublicFeed(params: PaginationParams): Promise<SignedEvent<1>[]> {
const results = await getFilter({ kinds: [1], ...params }, { timeout: 5000 }); const results = await getFilters([{ kinds: [1], ...params }], { timeout: 5000 });
return results.sort(eventDateComparator); return results.sort(eventDateComparator);
} }
@ -156,7 +152,7 @@ async function getAncestors(event: Event<1>, result = [] as Event<1>[]): Promise
} }
function getDescendants(eventId: string): Promise<SignedEvent<1>[]> { function getDescendants(eventId: string): Promise<SignedEvent<1>[]> {
return getFilter({ kinds: [1], '#e': [eventId], limit: 200 }, { timeout: 2000 }) as Promise<SignedEvent<1>[]>; return getFilters([{ kinds: [1], '#e': [eventId] }], { limit: 200, timeout: 2000 }) as Promise<SignedEvent<1>[]>;
} }
/** Publish an event to the Nostr relay. */ /** Publish an event to the Nostr relay. */
@ -169,4 +165,4 @@ function publish(event: SignedEvent, relays = Conf.publishRelays): void {
} }
} }
export { getAncestors, getAuthor, getDescendants, getEvent, getFeed, getFilter, getFollows, getPublicFeed, publish }; export { getAncestors, getAuthor, getDescendants, getEvent, getFeed, getFilters, getFollows, getPublicFeed, publish };

View File

@ -1,6 +1,7 @@
import { type AppController } from '@/app.ts'; import { type AppController } from '@/app.ts';
import { type Filter, findReplyTag, z } from '@/deps.ts'; import { type Filter, findReplyTag, z } from '@/deps.ts';
import { getAuthor, getFilter, getFollows, publish } from '@/client.ts'; import { getAuthor, getFollows, publish } from '@/client.ts';
import { getFilters } from '@/mixer.ts';
import { jsonMetaContentSchema } from '@/schemas/nostr.ts'; import { jsonMetaContentSchema } from '@/schemas/nostr.ts';
import { signEvent } from '@/sign.ts'; import { signEvent } from '@/sign.ts';
import { toAccount, toStatus } from '@/transformers/nostr-to-mastoapi.ts'; import { toAccount, toStatus } from '@/transformers/nostr-to-mastoapi.ts';
@ -115,7 +116,7 @@ const accountStatusesController: AppController = async (c) => {
filter['#t'] = [tagged]; filter['#t'] = [tagged];
} }
let events = await getFilter(filter); let events = await getFilters([filter]);
events.sort(eventDateComparator); events.sort(eventDateComparator);
if (exclude_replies) { if (exclude_replies) {

View File

@ -1,17 +1,17 @@
import event55920b75 from '~/fixtures/events/55920b75.json' assert { type: 'json' }; import event55920b75 from '~/fixtures/events/55920b75.json' assert { type: 'json' };
import { assertEquals } from '@/deps-test.ts'; import { assertEquals } from '@/deps-test.ts';
import { getFilter, insertEvent } from './events.ts'; import { getFilters, insertEvent } from './events.ts';
Deno.test('insert and filter events', async () => { Deno.test('insert and filter events', async () => {
await insertEvent(event55920b75); await insertEvent(event55920b75);
assertEquals(await getFilter({ kinds: [1] }), [event55920b75]); assertEquals(await getFilters([{ kinds: [1] }]), [event55920b75]);
assertEquals(await getFilter({ kinds: [3] }), []); assertEquals(await getFilters([{ kinds: [3] }]), []);
assertEquals(await getFilter({ since: 1691091000 }), [event55920b75]); assertEquals(await getFilters([{ since: 1691091000 }]), [event55920b75]);
assertEquals(await getFilter({ until: 1691091000 }), []); assertEquals(await getFilters([{ until: 1691091000 }]), []);
assertEquals( assertEquals(
await getFilter({ '#proxy': ['https://gleasonator.com/objects/8f6fac53-4f66-4c6e-ac7d-92e5e78c3e79'] }), await getFilters([{ '#proxy': ['https://gleasonator.com/objects/8f6fac53-4f66-4c6e-ac7d-92e5e78c3e79'] }]),
[event55920b75], [event55920b75],
); );
}); });

View File

@ -1,7 +1,9 @@
import { db, type TagRow } from '@/db.ts'; import { db, type TagRow } from '@/db.ts';
import { type Filter, type Insertable } from '@/deps.ts'; import { type Insertable } from '@/deps.ts';
import { type SignedEvent } from '@/event.ts'; import { type SignedEvent } from '@/event.ts';
import type { DittoFilter, GetFiltersOpts } from '@/types.ts';
type TagCondition = ({ event, count }: { event: SignedEvent; count: number }) => boolean; type TagCondition = ({ event, count }: { event: SignedEvent; count: number }) => boolean;
/** Conditions for when to index certain tags. */ /** Conditions for when to index certain tags. */
@ -42,19 +44,12 @@ function insertEvent(event: SignedEvent): Promise<void> {
return results; return results;
}, []); }, []);
await Promise.all(tags.map((tag) => { await trx.insertInto('tags')
return trx.insertInto('tags') .values(tags)
.values(tag)
.execute(); .execute();
}));
}); });
} }
/** Custom filter interface that extends Nostr filters with extra options for Ditto. */
interface DittoFilter<K extends number = number> extends Filter<K> {
local?: boolean;
}
/** Build the query for a filter. */ /** Build the query for a filter. */
function getFilterQuery(filter: DittoFilter) { function getFilterQuery(filter: DittoFilter) {
let query = db let query = db
@ -110,25 +105,20 @@ function getFilterQuery(filter: DittoFilter) {
} }
/** Get events for filters from the database. */ /** Get events for filters from the database. */
async function getFilters<K extends number>(filters: [DittoFilter<K>]): Promise<SignedEvent<K>[]>; async function getFilters<K extends number>(
async function getFilters(filters: DittoFilter[]): Promise<SignedEvent[]>; filters: DittoFilter<K>[],
async function getFilters(filters: DittoFilter[]) { _opts?: GetFiltersOpts,
const queries = filters ): Promise<SignedEvent<K>[]> {
const events = await filters
.map(getFilterQuery) .map(getFilterQuery)
.map((query) => query.execute()); .reduce((acc, curr) => acc.union(curr))
.execute();
const events = (await Promise.all(queries)).flat();
return events.map((event) => ( return events.map((event) => (
{ ...event, tags: JSON.parse(event.tags) } { ...event, tags: JSON.parse(event.tags) } as SignedEvent<K>
)); ));
} }
/** Get events for a filter from the database. */
function getFilter<K extends number = number>(filter: DittoFilter<K>): Promise<SignedEvent<K>[]> {
return getFilters<K>([filter]);
}
/** Returns whether the pubkey is followed by a local user. */ /** Returns whether the pubkey is followed by a local user. */
async function isLocallyFollowed(pubkey: string): Promise<boolean> { async function isLocallyFollowed(pubkey: string): Promise<boolean> {
return Boolean( return Boolean(
@ -141,4 +131,4 @@ async function isLocallyFollowed(pubkey: string): Promise<boolean> {
); );
} }
export { getFilter, getFilters, insertEvent, isLocallyFollowed }; export { getFilters, insertEvent, isLocallyFollowed };

View File

@ -17,7 +17,7 @@ export {
getPublicKey, getPublicKey,
getSignature, getSignature,
Kind, Kind,
matchFilter, matchFilters,
nip05, nip05,
nip19, nip19,
nip21, nip21,

67
src/mixer.ts Normal file
View File

@ -0,0 +1,67 @@
import { matchFilters } from '@/deps.ts';
import { getFilters as getFiltersClient } from '@/client.ts';
import { getFilters as getFiltersDB } from '@/db/events.ts';
import { eventDateComparator } from '@/utils.ts';
import type { SignedEvent } from '@/event.ts';
import type { DittoFilter, GetFiltersOpts } from '@/types.ts';
/** Get filters from the database and pool, and mix the best results together. */
async function getFilters<K extends number>(
filters: DittoFilter<K>[],
opts?: GetFiltersOpts,
): Promise<SignedEvent<K>[]> {
const results = await Promise.allSettled([
getFiltersClient(filters, opts),
getFiltersDB(filters, opts),
]);
const events = results
.filter((result): result is PromiseFulfilledResult<SignedEvent<K>[]> => result.status === 'fulfilled')
.flatMap((result) => result.value);
return unmixEvents(events, filters);
}
/** Combine and sort events to match the filters. */
function unmixEvents<K extends number>(events: SignedEvent<K>[], filters: DittoFilter<K>[]): SignedEvent<K>[] {
events = dedupeEvents(events);
events = takeNewestEvents(events);
events = events.filter((event) => matchFilters(filters, event));
events.sort(eventDateComparator);
return events;
}
/** Deduplicate events by ID. */
function dedupeEvents<K extends number>(events: SignedEvent<K>[]): SignedEvent<K>[] {
return [...new Map(events.map((event) => [event.id, event])).values()];
}
/** Take the newest events among replaceable ones. */
function takeNewestEvents<K extends number>(events: SignedEvent<K>[]): SignedEvent<K>[] {
const isReplaceable = (kind: number) =>
kind === 0 || kind === 3 || (10000 <= kind && kind < 20000) || (30000 <= kind && kind < 40000);
// Group events by author and kind.
const groupedEvents = events.reduce<Map<string, SignedEvent<K>[]>>((acc, event) => {
const key = `${event.pubkey}:${event.kind}`;
const group = acc.get(key) || [];
acc.set(key, [...group, event]);
return acc;
}, new Map());
// Process each group.
const processedEvents = Array.from(groupedEvents.values()).flatMap((group) => {
if (isReplaceable(group[0].kind)) {
// Sort by `created_at` and take the latest event.
return group.sort(eventDateComparator)[0];
}
return group;
});
return processedEvents;
}
export { getFilters };

16
src/types.ts Normal file
View File

@ -0,0 +1,16 @@
import { type Filter } from '@/deps.ts';
/** Custom filter interface that extends Nostr filters with extra options for Ditto. */
interface DittoFilter<K extends number = number> extends Filter<K> {
local?: boolean;
}
/** Additional options to apply to the whole subscription. */
interface GetFiltersOpts {
/** How long to wait (in milliseconds) until aborting the request. */
timeout?: number;
/** Event limit for the whole subscription. */
limit?: number;
}
export type { DittoFilter, GetFiltersOpts };