Merge branch 'memorelay' into 'main'

Add in-memory Nostr data store

See merge request soapbox-pub/ditto!86
Alex Gleason 2023-12-28 20:02:57 +00:00
commit a74d945f13
10 changed files with 256 additions and 27 deletions
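For reviewers, an at-a-glance sketch of the surface this MR introduces (names taken from src/db/memorelay.ts below; the interface itself is illustrative, not code from the MR):

```ts
import { type Event, type Filter } from '@/deps.ts';
import { type GetFiltersOpts } from '@/filter.ts';

// Illustrative shape of the new in-memory store exported as `memorelay`.
interface Memorelay {
  getFilters<K extends number>(filters: Filter<K>[], opts?: GetFiltersOpts): Promise<Event<K>[]>;
  insertEvent(event: Event): void;
  hasEvent(event: Event): boolean;
  hasEventById(eventId: string): boolean;
}
```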

fixtures/events/event-0.json Normal file

@@ -0,0 +1,15 @@
{
"id": "63d38c9b483d2d98a46382eadefd272e0e4bdb106a5b6eddb400c4e76f693d35",
"pubkey": "79c2cae114ea28a981e7559b4fe7854a473521a8d22a66bbab9fa248eb820ff6",
"created_at": 1699398376,
"kind": 0,
"tags": [
[
"proxy",
"https://gleasonator.com/users/alex",
"activitypub"
]
],
"content": "{\"name\":\"Alex Gleason\",\"about\":\"I create Fediverse software that empowers people online.\\n\\nI'm vegan btw.\\n\\nNote: If you have a question for me, please tag me publicly. This gives the opportunity for others to chime in, and bystanders to learn.\",\"picture\":\"https://media.gleasonator.com/aae0071188681629f200ab41502e03b9861d2754a44c008d3869c8a08b08d1f1.png\",\"banner\":\"https://media.gleasonator.com/e5f6e0e380536780efa774e8d3c8a5a040e3f9f99dbb48910b261c32872ee3a3.gif\",\"nip05\":\"alex_at_gleasonator.com@mostr.pub\",\"lud16\":\"alex@alexgleason.me\"}",
"sig": "9d48bbb600aab44abaeee11c97f1753f1d7de08378e9b33d84f9be893a09270aeceecfde3cfb698c555ae1bde3e4e54b3463a61bb99bdf673d64c2202f98b0e9"
}

fixtures/events/event-1.json Normal file

@@ -0,0 +1,15 @@
{
"kind": 1,
"content": "I'm vegan btw",
"tags": [
[
"proxy",
"https://gleasonator.com/objects/8f6fac53-4f66-4c6e-ac7d-92e5e78c3e79",
"activitypub"
]
],
"pubkey": "79c2cae114ea28a981e7559b4fe7854a473521a8d22a66bbab9fa248eb820ff6",
"created_at": 1691091365,
"id": "55920b758b9c7b17854b6e3d44e6a02a83d1cb49e1227e75a30426dea94d4cb2",
"sig": "a72f12c08f18e85d98fb92ae89e2fe63e48b8864c5e10fbdd5335f3c9f936397a6b0a7350efe251f8168b1601d7012d4a6d0ee6eec958067cf22a14f5a5ea579"
}


@@ -228,7 +228,7 @@ async function getFilters<K extends number>(
opts: GetFiltersOpts = {},
): Promise<DittoEvent<K>[]> {
if (!filters.length) return Promise.resolve([]);
debug('getFilters', JSON.stringify(filters));
debug('REQ', JSON.stringify(filters));
let query = getFiltersQuery(filters);
if (typeof opts.limit === 'number') {

src/db/memorelay.test.ts Normal file

@@ -0,0 +1,18 @@
import { assertEquals } from '@/deps-test.ts';
import event1 from '~/fixtures/events/event-1.json' assert { type: 'json' };
import { memorelay } from './memorelay.ts';
Deno.test('memorelay', async () => {
assertEquals(memorelay.hasEvent(event1), false);
assertEquals(memorelay.hasEventById(event1.id), false);
memorelay.insertEvent(event1);
assertEquals(memorelay.hasEvent(event1), true);
assertEquals(memorelay.hasEventById(event1.id), true);
const result = await memorelay.getFilters([{ ids: [event1.id] }]);
assertEquals(result[0], event1);
});

src/db/memorelay.ts Normal file

@@ -0,0 +1,69 @@
import { Debug, type Event, type Filter, LRUCache } from '@/deps.ts';
import { getFilterId, type GetFiltersOpts, getMicroFilters, isMicrofilter } from '@/filter.ts';
const debug = Debug('ditto:memorelay');
const events = new LRUCache<string, Event>({
max: 3000,
maxEntrySize: 5000,
sizeCalculation: (event) => JSON.stringify(event).length,
});
/** Get events from memory. */
function getFilters<K extends number>(filters: Filter<K>[], opts: GetFiltersOpts = {}): Promise<Event<K>[]> {
if (opts.signal?.aborted) return Promise.resolve([]);
if (!filters.length) return Promise.resolve([]);
debug('REQ', JSON.stringify(filters));
const results: Event<K>[] = [];
for (const filter of filters) {
if (isMicrofilter(filter)) {
const event = events.get(getFilterId(filter));
if (event) {
results.push(event as Event<K>);
}
}
}
return Promise.resolve(results);
}
/** Insert an event into memory. */
function insertEvent(event: Event): void {
for (const microfilter of getMicroFilters(event)) {
const filterId = getFilterId(microfilter);
const existing = events.get(filterId);
if (!existing || event.created_at > existing.created_at) {
events.set(filterId, event);
}
}
}
/** Check if an event is in memory. */
function hasEvent(event: Event): boolean {
for (const microfilter of getMicroFilters(event)) {
const filterId = getFilterId(microfilter);
const existing = events.get(filterId);
if (existing) {
return true;
}
}
return false;
}
/** Check if an event is in memory by ID. */
function hasEventById(eventId: string): boolean {
const filterId = getFilterId({ ids: [eventId] });
return events.has(filterId);
}
/** In-memory data store for events using microfilters. */
const memorelay = {
getFilters,
insertEvent,
hasEvent,
hasEventById,
};
export { memorelay };
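A small usage sketch of the replacement rule in insertEvent above (the second event is hypothetical; everything else comes from this MR): for the author microfilter, a newer kind-0 event displaces the cached one.

```ts
import { memorelay } from '@/db/memorelay.ts';
import event0 from '~/fixtures/events/event-0.json' assert { type: 'json' };

memorelay.insertEvent(event0);

// Hypothetical newer metadata event from the same author: the created_at
// check in insertEvent lets it take over the { kinds: [0], authors } slot.
const newer = { ...event0, id: 'a'.repeat(64), created_at: event0.created_at + 1 };
memorelay.insertEvent(newer);

const [profile] = await memorelay.getFilters([{ kinds: [0], authors: [event0.pubkey] }]);
console.log(profile?.id === newer.id); // true: the newer event replaced the older entry
```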

src/filter.test.ts Normal file

@@ -0,0 +1,37 @@
import { type Event } from '@/deps.ts';
import { assertEquals } from '@/deps-test.ts';
import event0 from '~/fixtures/events/event-0.json' assert { type: 'json' };
import event1 from '~/fixtures/events/event-1.json' assert { type: 'json' };
import { eventToMicroFilter, getFilterId, getMicroFilters, isMicrofilter } from './filter.ts';
Deno.test('getMicroFilters', () => {
const event = event0 as Event<0>;
const microfilters = getMicroFilters(event);
assertEquals(microfilters.length, 2);
assertEquals(microfilters[0], { authors: [event.pubkey], kinds: [0] });
assertEquals(microfilters[1], { ids: [event.id] });
});
Deno.test('eventToMicroFilter', () => {
assertEquals(eventToMicroFilter(event0), { authors: [event0.pubkey], kinds: [0] });
assertEquals(eventToMicroFilter(event1), { ids: [event1.id] });
});
Deno.test('isMicrofilter', () => {
assertEquals(isMicrofilter({ ids: [event0.id] }), true);
assertEquals(isMicrofilter({ authors: [event0.pubkey], kinds: [0] }), true);
assertEquals(isMicrofilter({ ids: [event0.id], authors: [event0.pubkey], kinds: [0] }), false);
});
Deno.test('getFilterId', () => {
assertEquals(
getFilterId({ ids: [event0.id] }),
'{"ids":["63d38c9b483d2d98a46382eadefd272e0e4bdb106a5b6eddb400c4e76f693d35"]}',
);
assertEquals(
getFilterId({ authors: [event0.pubkey], kinds: [0] }),
'{"authors":["79c2cae114ea28a981e7559b4fe7854a473521a8d22a66bbab9fa248eb820ff6"],"kinds":[0]}',
);
});

src/filter.ts

@@ -1,7 +1,7 @@
import { Conf } from '@/config.ts';
import { type Event, type Filter, matchFilters, stringifyStable } from '@/deps.ts';
import type { EventData } from '@/types.ts';
import { type Event, type Filter, matchFilters, stringifyStable, z } from '@/deps.ts';
import { nostrIdSchema } from '@/schemas/nostr.ts';
import { type EventData } from '@/types.ts';
/** Additional properties that may be added by Ditto to events. */
type Relation = 'author' | 'author_stats' | 'event_stats';
@@ -14,8 +14,12 @@ interface DittoFilter<K extends number = number> extends Filter<K> {
relations?: Relation[];
}
/** Microfilter to get one specific event by ID. */
type IdMicrofilter = { ids: [Event['id']] };
/** Microfilter to get an author. */
type AuthorMicrofilter = { kinds: [0]; authors: [Event['pubkey']] };
/** Filter to get one specific event. */
type MicroFilter = { ids: [Event['id']] } | { kinds: [0]; authors: [Event['pubkey']] };
type MicroFilter = IdMicrofilter | AuthorMicrofilter;
/** Additional options to apply to the whole subscription. */
interface GetFiltersOpts {
@@ -63,18 +67,40 @@ function getFilterId(filter: MicroFilter): string {
/** Get a microfilter from a Nostr event. */
function eventToMicroFilter(event: Event): MicroFilter {
const [microfilter] = getMicroFilters(event);
return microfilter;
}
/** Get all the microfilters for an event, in order of priority. */
function getMicroFilters(event: Event): MicroFilter[] {
const microfilters: MicroFilter[] = [];
if (event.kind === 0) {
return { kinds: [0], authors: [event.pubkey] };
} else {
return { ids: [event.id] };
microfilters.push({ kinds: [0], authors: [event.pubkey] });
}
microfilters.push({ ids: [event.id] });
return microfilters;
}
/** Microfilter schema. */
const microFilterSchema = z.union([
z.object({ ids: z.tuple([nostrIdSchema]) }).strict(),
z.object({ kinds: z.tuple([z.literal(0)]), authors: z.tuple([nostrIdSchema]) }).strict(),
]);
/** Checks whether the filter is a microfilter. */
function isMicrofilter(filter: Filter): filter is MicroFilter {
return microFilterSchema.safeParse(filter).success;
}
export {
type AuthorMicrofilter,
type DittoFilter,
eventToMicroFilter,
getFilterId,
type GetFiltersOpts,
getMicroFilters,
type IdMicrofilter,
isMicrofilter,
matchDittoFilters,
type MicroFilter,
type Relation,

src/pipeline.ts

@@ -1,9 +1,10 @@
import { Conf } from '@/config.ts';
import * as eventsDB from '@/db/events.ts';
import { memorelay } from '@/db/memorelay.ts';
import { addRelays } from '@/db/relays.ts';
import { deleteAttachedMedia } from '@/db/unattached-media.ts';
import { findUser } from '@/db/users.ts';
import { Debug, type Event, LRUCache } from '@/deps.ts';
import { Debug, type Event } from '@/deps.ts';
import { isEphemeralKind } from '@/kinds.ts';
import * as mixer from '@/mixer.ts';
import { publish } from '@/pool.ts';
@@ -12,12 +13,11 @@ import { reqmeister } from '@/reqmeister.ts';
import { updateStats } from '@/stats.ts';
import { Sub } from '@/subs.ts';
import { getTagSet } from '@/tags.ts';
import { type EventData } from '@/types.ts';
import { eventAge, isRelay, nostrDate, Time } from '@/utils.ts';
import { TrendsWorker } from '@/workers/trends.ts';
import { verifySignatureWorker } from '@/workers/verify.ts';
import type { EventData } from '@/types.ts';
const debug = Debug('ditto:pipeline');
/**
@@ -43,15 +43,12 @@ async function handleEvent(event: Event): Promise<void> {
]);
}
/** Tracks encountered events to skip duplicates, improving idempotency and performance. */
const encounters = new LRUCache<Event['id'], true>({ max: 1000 });
/** Encounter the event, and return whether it has already been encountered. */
function encounterEvent(event: Event): boolean {
const result = encounters.get(event.id);
encounters.set(event.id, true);
const preexisting = memorelay.hasEvent(event);
memorelay.insertEvent(event);
reqmeister.encounter(event);
return !!result;
return preexisting;
}
/** Preload data that will be useful to several tasks. */
@@ -146,8 +143,8 @@ function fetchRelatedEvents(event: Event, data: EventData) {
reqmeister.req({ kinds: [0], authors: [event.pubkey] }).catch(() => {});
}
for (const [name, id, relay] of event.tags) {
if (name === 'e' && !encounters.has(id)) {
reqmeister.req({ ids: [id] }, [relay]).catch(() => {});
if (name === 'e' && !memorelay.hasEventById(id)) {
reqmeister.req({ ids: [id] }, { relays: [relay] }).catch(() => {});
}
}
}


@@ -1,8 +1,9 @@
import * as eventsDB from '@/db/events.ts';
import { type Event, findReplyTag } from '@/deps.ts';
import { type DittoFilter, type Relation } from '@/filter.ts';
import { type AuthorMicrofilter, type DittoFilter, type IdMicrofilter, type Relation } from '@/filter.ts';
import * as mixer from '@/mixer.ts';
import { reqmeister } from '@/reqmeister.ts';
import { memorelay } from '@/db/memorelay.ts';
interface GetEventOpts<K extends number> {
/** Signal to abort the request. */
@@ -19,24 +20,61 @@ const getEvent = async <K extends number = number>(
opts: GetEventOpts<K> = {},
): Promise<Event<K> | undefined> => {
const { kind, relations, signal = AbortSignal.timeout(1000) } = opts;
const microfilter: IdMicrofilter = { ids: [id] };
const [memoryEvent] = await memorelay.getFilters([microfilter], opts) as eventsDB.DittoEvent<K>[];
if (memoryEvent && !relations) {
return memoryEvent;
}
const filter: DittoFilter<K> = { ids: [id], relations, limit: 1 };
if (kind) {
filter.kinds = [kind];
}
const [event] = await mixer.getFilters([filter], { limit: 1, signal });
return event;
const dbEvent = await eventsDB.getFilters([filter], { limit: 1, signal })
.then(([event]) => event);
// TODO: make this DRY-er.
if (dbEvent && !dbEvent.author) {
const [author] = await memorelay.getFilters([{ kinds: [0], authors: [dbEvent.pubkey] }], opts);
dbEvent.author = author;
}
if (dbEvent) return dbEvent;
if (memoryEvent && !memoryEvent.author) {
const [author] = await memorelay.getFilters([{ kinds: [0], authors: [memoryEvent.pubkey] }], opts);
memoryEvent.author = author;
}
if (memoryEvent) return memoryEvent;
return await reqmeister.req(microfilter, opts).catch(() => undefined) as Event<K> | undefined;
};
/** Get a Nostr `set_metadata` event for a user's pubkey. */
const getAuthor = async (pubkey: string, opts: GetEventOpts<0> = {}): Promise<Event<0> | undefined> => {
const { relations, signal = AbortSignal.timeout(1000) } = opts;
const microfilter: AuthorMicrofilter = { kinds: [0], authors: [pubkey] };
const event = await eventsDB.getFilters(
const [memoryEvent] = await memorelay.getFilters([microfilter], opts);
if (memoryEvent && !relations) {
return memoryEvent;
}
const dbEvent = await eventsDB.getFilters(
[{ authors: [pubkey], relations, kinds: [0], limit: 1 }],
{ limit: 1, signal },
).then(([event]) => event) || await reqmeister.req({ kinds: [0], authors: [pubkey] }).catch(() => {});
).then(([event]) => event);
return event;
if (dbEvent) return dbEvent;
if (memoryEvent) return memoryEvent;
return reqmeister.req(microfilter, opts).catch(() => undefined);
};
/** Get users the given pubkey follows. */

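A hedged usage sketch of the new lookup order in getEvent (memory first, then eventsDB with the author backfilled from memorelay, then a batched reqmeister request). The module path is assumed, since this view does not show the file's name; the id is the kind-1 fixture's id.

```ts
// Path assumed for illustration; the diff view above does not name this file.
import { getEvent } from '@/queries.ts';

const id = '55920b758b9c7b17854b6e3d44e6a02a83d1cb49e1227e75a30426dea94d4cb2';

// 1. memorelay is checked first; a hit with no `relations` requested returns immediately.
// 2. Otherwise eventsDB is queried, and a missing `author` is filled from memorelay.
// 3. Only if both miss does the call fall back to reqmeister.req({ ids: [id] }).
const event = await getEvent(id, { kind: 1, signal: AbortSignal.timeout(1000) });

console.log(event?.content);
```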
src/reqmeister.ts

@@ -1,6 +1,6 @@
import * as client from '@/client.ts';
import { Debug, type Event, EventEmitter, type Filter } from '@/deps.ts';
import { eventToMicroFilter, getFilterId, type MicroFilter } from '@/filter.ts';
import { AuthorMicrofilter, eventToMicroFilter, getFilterId, IdMicrofilter, type MicroFilter } from '@/filter.ts';
import { Time } from '@/utils/time.ts';
const debug = Debug('ditto:reqmeister');
@@ -10,6 +10,11 @@ interface ReqmeisterOpts {
timeout?: number;
}
interface ReqmeisterReqOpts {
relays?: WebSocket['url'][];
signal?: AbortSignal;
}
type ReqmeisterQueueItem = [string, MicroFilter, WebSocket['url'][]];
/** Batches requests to Nostr relays using microfilters. */
@@ -70,12 +75,21 @@ class Reqmeister extends EventEmitter<{ [filterId: string]: (event: Event) => an
this.#perform();
}
req(filter: MicroFilter, relays: WebSocket['url'][] = []): Promise<Event> {
req(filter: IdMicrofilter, opts?: ReqmeisterReqOpts): Promise<Event>;
req(filter: AuthorMicrofilter, opts?: ReqmeisterReqOpts): Promise<Event<0>>;
req(filter: MicroFilter, opts?: ReqmeisterReqOpts): Promise<Event>;
req(filter: MicroFilter, opts: ReqmeisterReqOpts = {}): Promise<Event> {
const { relays = [], signal } = opts;
if (signal?.aborted) return Promise.reject(new DOMException('Aborted', 'AbortError'));
const filterId = getFilterId(filter);
this.#queue.push([filterId, filter, relays]);
return new Promise<Event>((resolve, reject) => {
this.once(filterId, resolve);
this.#promise.finally(() => setTimeout(reject, 0));
signal?.addEventListener('abort', () => reject(new DOMException('Aborted', 'AbortError')), { once: true });
});
}
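Finally, a hedged sketch of the migrated req call shape (the relay URL is a placeholder): relays now travel in an options object alongside an optional AbortSignal, and the new overloads type the result, so an AuthorMicrofilter resolves to Event<0>.

```ts
import { reqmeister } from '@/reqmeister.ts';

const controller = new AbortController();

// Old shape: reqmeister.req(filter, ['wss://relay.example.com'])
// New shape: relays and an optional signal are passed as options.
reqmeister.req(
  { ids: ['55920b758b9c7b17854b6e3d44e6a02a83d1cb49e1227e75a30426dea94d4cb2'] },
  { relays: ['wss://relay.example.com'], signal: controller.signal },
)
  .then((event) => console.log('got', event.id))
  .catch(() => { /* rejects on timeout or abort */ });

// Aborting rejects the pending promise with an AbortError DOMException.
controller.abort();
```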