Merge branch 'storages' into 'main'

Support external search

See merge request soapbox-pub/ditto!95
This commit is contained in:
Alex Gleason 2024-01-04 20:42:15 +00:00
commit 4e6549407e
38 changed files with 1174 additions and 583 deletions

View File

@ -1,8 +1,8 @@
import { Conf } from '@/config.ts';
import { db } from '@/db.ts';
import { eventsDB } from '@/db/events.ts';
import { type Kysely } from '@/deps.ts';
import { signAdminEvent } from '@/sign.ts';
import { eventsDB } from '@/storages.ts';
interface DB {
users: {

View File

@ -1,18 +1,23 @@
import { Debug, type Event, type Filter, matchFilters } from '@/deps.ts';
import { normalizeFilters } from '@/filter.ts';
import * as pipeline from '@/pipeline.ts';
import { activeRelays, pool } from '@/pool.ts';
import { type EventStore, type GetEventsOpts, type StoreEventOpts } from '@/store.ts';
import { type EventStore, type GetEventsOpts, type StoreEventOpts } from '@/storages/types.ts';
import { EventSet } from '@/utils/event-set.ts';
const debug = Debug('ditto:client');
/** Get events from a NIP-01 filter. */
function getEvents<K extends number>(filters: Filter<K>[], opts: GetEventsOpts = {}): Promise<Event<K>[]> {
filters = normalizeFilters(filters);
if (opts.signal?.aborted) return Promise.resolve([]);
if (!filters.length) return Promise.resolve([]);
debug('REQ', JSON.stringify(filters));
return new Promise((resolve) => {
const results: Event[] = [];
const results = new EventSet<Event<K>>();
const unsub = pool.subscribe(
filters,
@ -20,9 +25,9 @@ function getEvents<K extends number>(filters: Filter<K>[], opts: GetEventsOpts =
(event: Event | null) => {
if (event && matchFilters(filters, event)) {
pipeline.handleEvent(event).catch(() => {});
results.push({
results.add({
id: event.id,
kind: event.kind,
kind: event.kind as K,
pubkey: event.pubkey,
content: event.content,
tags: event.tags,
@ -30,21 +35,21 @@ function getEvents<K extends number>(filters: Filter<K>[], opts: GetEventsOpts =
sig: event.sig,
});
}
if (typeof opts.limit === 'number' && results.length >= opts.limit) {
if (typeof opts.limit === 'number' && results.size >= opts.limit) {
unsub();
resolve(results as Event<K>[]);
resolve([...results]);
}
},
undefined,
() => {
unsub();
resolve(results as Event<K>[]);
resolve([...results]);
},
);
opts.signal?.addEventListener('abort', () => {
unsub();
resolve(results as Event<K>[]);
resolve([...results]);
});
});
}
@ -59,6 +64,7 @@ function storeEvent(event: Event, opts: StoreEventOpts = {}): Promise<void> {
}
const client: EventStore = {
supportedNips: [1],
getEvents,
storeEvent,
countEvents: () => Promise.reject(new Error('COUNT not implemented')),

View File

@ -42,6 +42,10 @@ const Conf = {
const { protocol, host } = Conf.url;
return `${protocol === 'https:' ? 'wss:' : 'ws:'}//${host}/relay`;
},
/** Relay to use for NIP-50 `search` queries. */
get searchRelay() {
return Deno.env.get('SEARCH_RELAY');
},
/** Origin of the Ditto server, including the protocol and port. */
get localDomain() {
return Deno.env.get('LOCAL_DOMAIN') || 'http://localhost:8000';

View File

@ -1,12 +1,12 @@
import { type AppController } from '@/app.ts';
import { Conf } from '@/config.ts';
import { eventsDB } from '@/db/events.ts';
import { insertUser } from '@/db/users.ts';
import { findReplyTag, nip19, z } from '@/deps.ts';
import { type DittoFilter } from '@/filter.ts';
import { getAuthor, getFollowedPubkeys } from '@/queries.ts';
import { booleanParamSchema, fileSchema } from '@/schema.ts';
import { jsonMetaContentSchema } from '@/schemas/nostr.ts';
import { eventsDB } from '@/storages.ts';
import { addTag, deleteTag, getTagSet } from '@/tags.ts';
import { uploadFile } from '@/upload.ts';
import { lookupAccount, nostrNow } from '@/utils.ts';

View File

@ -1,5 +1,5 @@
import { type AppController } from '@/app.ts';
import { eventsDB } from '@/db/events.ts';
import { eventsDB } from '@/storages.ts';
import { getTagSet } from '@/tags.ts';
import { renderAccounts } from '@/views.ts';

View File

@ -1,5 +1,5 @@
import { type AppController } from '@/app.ts';
import { eventsDB } from '@/db/events.ts';
import { eventsDB } from '@/storages.ts';
import { getTagSet } from '@/tags.ts';
import { renderStatuses } from '@/views.ts';

View File

@ -1,5 +1,5 @@
import { type AppController } from '@/app.ts';
import { eventsDB } from '@/db/events.ts';
import { eventsDB } from '@/storages.ts';
import { paginated, paginationSchema } from '@/utils/api.ts';
import { renderNotification } from '@/views/mastodon/notifications.ts';

View File

@ -1,7 +1,7 @@
import { type AppController } from '@/app.ts';
import { eventsDB } from '@/db/events.ts';
import { z } from '@/deps.ts';
import { configSchema, elixirTupleSchema } from '@/schemas/pleroma-api.ts';
import { eventsDB } from '@/storages.ts';
import { createAdminEvent } from '@/utils/api.ts';
import { Conf } from '@/config.ts';

View File

@ -1,9 +1,9 @@
import { AppController } from '@/app.ts';
import { eventsDB } from '@/db/events.ts';
import { type Event, nip19, z } from '@/deps.ts';
import { type DittoFilter } from '@/filter.ts';
import { booleanParamSchema } from '@/schema.ts';
import { nostrIdSchema } from '@/schemas/nostr.ts';
import { searchStore } from '@/storages.ts';
import { dedupeEvents } from '@/utils.ts';
import { lookupNip05Cached } from '@/utils/nip05.ts';
import { renderAccount } from '@/views/mastodon/accounts.ts';
@ -30,9 +30,11 @@ const searchController: AppController = async (c) => {
return c.json({ error: 'Bad request', schema: result.error }, 422);
}
const signal = AbortSignal.timeout(1000);
const [event, events] = await Promise.all([
lookupEvent(result.data),
searchEvents(result.data),
lookupEvent(result.data, signal),
searchEvents(result.data, signal),
]);
if (event) {
@ -62,7 +64,7 @@ const searchController: AppController = async (c) => {
};
/** Get events for the search params. */
function searchEvents({ q, type, limit, account_id }: SearchQuery): Promise<Event[]> {
function searchEvents({ q, type, limit, account_id }: SearchQuery, signal: AbortSignal): Promise<Event[]> {
if (type === 'hashtags') return Promise.resolve([]);
const filter: DittoFilter = {
@ -76,7 +78,7 @@ function searchEvents({ q, type, limit, account_id }: SearchQuery): Promise<Even
filter.authors = [account_id];
}
return eventsDB.getEvents([filter]);
return searchStore.getEvents([filter], { signal });
}
/** Get event kinds to search from `type` query param. */
@ -92,9 +94,9 @@ function typeToKinds(type: SearchQuery['type']): number[] {
}
/** Resolve a searched value into an event, if applicable. */
async function lookupEvent(query: SearchQuery, signal = AbortSignal.timeout(1000)): Promise<Event | undefined> {
async function lookupEvent(query: SearchQuery, signal: AbortSignal): Promise<Event | undefined> {
const filters = await getLookupFilters(query);
const [event] = await eventsDB.getEvents(filters, { limit: 1, signal });
const [event] = await searchStore.getEvents(filters, { limit: 1, signal });
return event;
}

View File

@ -1,4 +1,4 @@
import { eventsDB } from '@/db/events.ts';
import { eventsDB } from '@/storages.ts';
import { z } from '@/deps.ts';
import { type DittoFilter } from '@/filter.ts';
import { getFeedPubkeys } from '@/queries.ts';

View File

@ -1,5 +1,5 @@
import { relayInfoController } from '@/controllers/nostr/relay-info.ts';
import { eventsDB } from '@/db/events.ts';
import { eventsDB } from '@/storages.ts';
import * as pipeline from '@/pipeline.ts';
import { jsonSchema } from '@/schema.ts';
import {

View File

@ -1,64 +0,0 @@
import { assertEquals, assertRejects } from '@/deps-test.ts';
import { buildUserEvent } from '@/db/users.ts';
import event0 from '~/fixtures/events/event-0.json' assert { type: 'json' };
import event1 from '~/fixtures/events/event-1.json' assert { type: 'json' };
import { eventsDB as db } from './events.ts';
Deno.test('count filters', async () => {
assertEquals(await db.countEvents([{ kinds: [1] }]), 0);
await db.storeEvent(event1);
assertEquals(await db.countEvents([{ kinds: [1] }]), 1);
});
Deno.test('insert and filter events', async () => {
await db.storeEvent(event1);
assertEquals(await db.getEvents([{ kinds: [1] }]), [event1]);
assertEquals(await db.getEvents([{ kinds: [3] }]), []);
assertEquals(await db.getEvents([{ since: 1691091000 }]), [event1]);
assertEquals(await db.getEvents([{ until: 1691091000 }]), []);
assertEquals(
await db.getEvents([{ '#proxy': ['https://gleasonator.com/objects/8f6fac53-4f66-4c6e-ac7d-92e5e78c3e79'] }]),
[event1],
);
});
Deno.test('delete events', async () => {
await db.storeEvent(event1);
assertEquals(await db.getEvents([{ kinds: [1] }]), [event1]);
await db.deleteEvents([{ kinds: [1] }]);
assertEquals(await db.getEvents([{ kinds: [1] }]), []);
});
Deno.test('query events with local filter', async () => {
await db.storeEvent(event1);
assertEquals(await db.getEvents([{}]), [event1]);
assertEquals(await db.getEvents([{ local: true }]), []);
assertEquals(await db.getEvents([{ local: false }]), [event1]);
const userEvent = await buildUserEvent({
username: 'alex',
pubkey: event1.pubkey,
inserted_at: new Date(),
admin: false,
});
await db.storeEvent(userEvent);
assertEquals(await db.getEvents([{ kinds: [1], local: true }]), [event1]);
assertEquals(await db.getEvents([{ kinds: [1], local: false }]), []);
});
Deno.test('inserting replaceable events', async () => {
assertEquals(await db.countEvents([{ kinds: [0], authors: [event0.pubkey] }]), 0);
await db.storeEvent(event0);
await assertRejects(() => db.storeEvent(event0));
assertEquals(await db.countEvents([{ kinds: [0], authors: [event0.pubkey] }]), 1);
const changeEvent = { ...event0, id: '123', created_at: event0.created_at + 1 };
await db.storeEvent(changeEvent);
assertEquals(await db.getEvents([{ kinds: [0] }]), [changeEvent]);
});

View File

@ -1,410 +0,0 @@
import { Conf } from '@/config.ts';
import { db, type DittoDB } from '@/db.ts';
import { Debug, type Event, Kysely, type SelectQueryBuilder } from '@/deps.ts';
import { type DittoFilter } from '@/filter.ts';
import { isDittoInternalKind, isParameterizedReplaceableKind, isReplaceableKind } from '@/kinds.ts';
import { jsonMetaContentSchema } from '@/schemas/nostr.ts';
import { type DittoEvent, EventStore, type GetEventsOpts, type StoreEventOpts } from '@/store.ts';
import { isNostrId, isURL } from '@/utils.ts';
const debug = Debug('ditto:db:events');
/** Function to decide whether or not to index a tag. */
type TagCondition = ({ event, count, value }: {
event: Event;
opts: StoreEventOpts;
count: number;
value: string;
}) => boolean;
/** Conditions for when to index certain tags. */
const tagConditions: Record<string, TagCondition> = {
'd': ({ event, count }) => count === 0 && isParameterizedReplaceableKind(event.kind),
'e': ({ event, count, value, opts }) => ((opts.data?.user && event.kind === 10003) || count < 15) && isNostrId(value),
'media': ({ count, value, opts }) => (opts.data?.user || count < 4) && isURL(value),
'p': ({ event, count, value }) => (count < 15 || event.kind === 3) && isNostrId(value),
'proxy': ({ count, value }) => count === 0 && isURL(value),
'q': ({ event, count, value }) => count === 0 && event.kind === 1 && isNostrId(value),
't': ({ count, value }) => count < 5 && value.length < 50,
'name': ({ event, count }) => event.kind === 30361 && count === 0,
'role': ({ event, count }) => event.kind === 30361 && count === 0,
};
/** Insert an event (and its tags) into the database. */
async function storeEvent(event: Event, opts: StoreEventOpts = {}): Promise<void> {
debug('EVENT', JSON.stringify(event));
if (isDittoInternalKind(event.kind) && event.pubkey !== Conf.pubkey) {
throw new Error('Internal events can only be stored by the server keypair');
}
return await db.transaction().execute(async (trx) => {
/** Insert the event into the database. */
async function addEvent() {
await trx.insertInto('events')
.values({ ...event, tags: JSON.stringify(event.tags) })
.execute();
}
/** Add search data to the FTS table. */
async function indexSearch() {
const searchContent = buildSearchContent(event);
if (!searchContent) return;
await trx.insertInto('events_fts')
.values({ id: event.id, content: searchContent.substring(0, 1000) })
.execute();
}
/** Index event tags depending on the conditions defined above. */
async function indexTags() {
const tags = filterIndexableTags(event, opts);
const rows = tags.map(([tag, value]) => ({ event_id: event.id, tag, value }));
if (!tags.length) return;
await trx.insertInto('tags')
.values(rows)
.execute();
}
if (isReplaceableKind(event.kind)) {
const prevEvents = await getFilterQuery(trx, { kinds: [event.kind], authors: [event.pubkey] }).execute();
for (const prevEvent of prevEvents) {
if (prevEvent.created_at >= event.created_at) {
throw new Error('Cannot replace an event with an older event');
}
}
await deleteEventsTrx(trx, [{ kinds: [event.kind], authors: [event.pubkey] }]);
}
if (isParameterizedReplaceableKind(event.kind)) {
const d = event.tags.find(([tag]) => tag === 'd')?.[1];
if (d) {
const prevEvents = await getFilterQuery(trx, { kinds: [event.kind], authors: [event.pubkey], '#d': [d] })
.execute();
for (const prevEvent of prevEvents) {
if (prevEvent.created_at >= event.created_at) {
throw new Error('Cannot replace an event with an older event');
}
}
await deleteEventsTrx(trx, [{ kinds: [event.kind], authors: [event.pubkey], '#d': [d] }]);
}
}
// Run the queries.
await Promise.all([
addEvent(),
indexTags(),
indexSearch(),
]);
}).catch((error) => {
// Don't throw for duplicate events.
if (error.message.includes('UNIQUE constraint failed')) {
return;
} else {
throw error;
}
});
}
type EventQuery = SelectQueryBuilder<DittoDB, 'events', {
id: string;
tags: string;
kind: number;
pubkey: string;
content: string;
created_at: number;
sig: string;
stats_replies_count?: number;
stats_reposts_count?: number;
stats_reactions_count?: number;
author_id?: string;
author_tags?: string;
author_kind?: number;
author_pubkey?: string;
author_content?: string;
author_created_at?: number;
author_sig?: string;
author_stats_followers_count?: number;
author_stats_following_count?: number;
author_stats_notes_count?: number;
}>;
/** Build the query for a filter. */
function getFilterQuery(db: Kysely<DittoDB>, filter: DittoFilter): EventQuery {
let query = db
.selectFrom('events')
.select([
'events.id',
'events.kind',
'events.pubkey',
'events.content',
'events.tags',
'events.created_at',
'events.sig',
])
.orderBy('events.created_at', 'desc');
for (const [key, value] of Object.entries(filter)) {
if (value === undefined) continue;
switch (key as keyof DittoFilter) {
case 'ids':
query = query.where('events.id', 'in', filter.ids!);
break;
case 'kinds':
query = query.where('events.kind', 'in', filter.kinds!);
break;
case 'authors':
query = query.where('events.pubkey', 'in', filter.authors!);
break;
case 'since':
query = query.where('events.created_at', '>=', filter.since!);
break;
case 'until':
query = query.where('events.created_at', '<=', filter.until!);
break;
case 'limit':
query = query.limit(filter.limit!);
break;
}
if (key.startsWith('#')) {
const tag = key.replace(/^#/, '');
const value = filter[key as `#${string}`] as string[];
query = query
.leftJoin('tags', 'tags.event_id', 'events.id')
.where('tags.tag', '=', tag)
.where('tags.value', 'in', value);
}
}
if (typeof filter.local === 'boolean') {
query = query
.leftJoin(usersQuery, (join) => join.onRef('users.d_tag', '=', 'events.pubkey'))
.where('users.d_tag', filter.local ? 'is not' : 'is', null);
}
if (filter.relations?.includes('author')) {
query = query
.leftJoin(
(eb) =>
eb
.selectFrom('events')
.selectAll()
.where('kind', '=', 0)
.groupBy('pubkey')
.as('authors'),
(join) => join.onRef('authors.pubkey', '=', 'events.pubkey'),
)
.select([
'authors.id as author_id',
'authors.kind as author_kind',
'authors.pubkey as author_pubkey',
'authors.content as author_content',
'authors.tags as author_tags',
'authors.created_at as author_created_at',
'authors.sig as author_sig',
]);
}
if (filter.relations?.includes('author_stats')) {
query = query
.leftJoin('author_stats', 'author_stats.pubkey', 'events.pubkey')
.select((eb) => [
eb.fn.coalesce('author_stats.followers_count', eb.val(0)).as('author_stats_followers_count'),
eb.fn.coalesce('author_stats.following_count', eb.val(0)).as('author_stats_following_count'),
eb.fn.coalesce('author_stats.notes_count', eb.val(0)).as('author_stats_notes_count'),
]);
}
if (filter.relations?.includes('event_stats')) {
query = query
.leftJoin('event_stats', 'event_stats.event_id', 'events.id')
.select((eb) => [
eb.fn.coalesce('event_stats.replies_count', eb.val(0)).as('stats_replies_count'),
eb.fn.coalesce('event_stats.reposts_count', eb.val(0)).as('stats_reposts_count'),
eb.fn.coalesce('event_stats.reactions_count', eb.val(0)).as('stats_reactions_count'),
]);
}
if (filter.search) {
query = query
.innerJoin('events_fts', 'events_fts.id', 'events.id')
.where('events_fts.content', 'match', JSON.stringify(filter.search));
}
return query;
}
/** Combine filter queries into a single union query. */
function getEventsQuery(filters: DittoFilter[]) {
return filters
.map((filter) => db.selectFrom(() => getFilterQuery(db, filter).as('events')).selectAll())
.reduce((result, query) => result.unionAll(query));
}
/** Query to get user events, joined by tags. */
function usersQuery() {
return getFilterQuery(db, { kinds: [30361], authors: [Conf.pubkey] })
.leftJoin('tags', 'tags.event_id', 'events.id')
.where('tags.tag', '=', 'd')
.select('tags.value as d_tag')
.as('users');
}
/** Get events for filters from the database. */
async function getEvents<K extends number>(
filters: DittoFilter<K>[],
opts: GetEventsOpts = {},
): Promise<DittoEvent<K>[]> {
if (opts.signal?.aborted) return Promise.resolve([]);
if (!filters.length) return Promise.resolve([]);
debug('REQ', JSON.stringify(filters));
let query = getEventsQuery(filters);
if (typeof opts.limit === 'number') {
query = query.limit(opts.limit);
}
return (await query.execute()).map((row) => {
const event: DittoEvent<K> = {
id: row.id,
kind: row.kind as K,
pubkey: row.pubkey,
content: row.content,
created_at: row.created_at,
tags: JSON.parse(row.tags),
sig: row.sig,
};
if (row.author_id) {
event.author = {
id: row.author_id,
kind: row.author_kind! as 0,
pubkey: row.author_pubkey!,
content: row.author_content!,
created_at: row.author_created_at!,
tags: JSON.parse(row.author_tags!),
sig: row.author_sig!,
};
}
if (typeof row.author_stats_followers_count === 'number') {
event.author_stats = {
followers_count: row.author_stats_followers_count,
following_count: row.author_stats_following_count!,
notes_count: row.author_stats_notes_count!,
};
}
if (typeof row.stats_replies_count === 'number') {
event.event_stats = {
replies_count: row.stats_replies_count,
reposts_count: row.stats_reposts_count!,
reactions_count: row.stats_reactions_count!,
};
}
return event;
});
}
/** Delete events from each table. Should be run in a transaction! */
async function deleteEventsTrx(db: Kysely<DittoDB>, filters: DittoFilter[]) {
if (!filters.length) return Promise.resolve();
debug('DELETE', JSON.stringify(filters));
const query = getEventsQuery(filters).clearSelect().select('id');
await db.deleteFrom('events_fts')
.where('id', 'in', () => query)
.execute();
return db.deleteFrom('events')
.where('id', 'in', () => query)
.execute();
}
/** Delete events based on filters from the database. */
async function deleteEvents<K extends number>(filters: DittoFilter<K>[]): Promise<void> {
if (!filters.length) return Promise.resolve();
debug('DELETE', JSON.stringify(filters));
await db.transaction().execute((trx) => deleteEventsTrx(trx, filters));
}
/** Get number of events that would be returned by filters. */
async function countEvents<K extends number>(filters: DittoFilter<K>[]): Promise<number> {
if (!filters.length) return Promise.resolve(0);
debug('COUNT', JSON.stringify(filters));
const query = getEventsQuery(filters);
const [{ count }] = await query
.clearSelect()
.select((eb) => eb.fn.count('id').as('count'))
.execute();
return Number(count);
}
/** Return only the tags that should be indexed. */
function filterIndexableTags(event: Event, opts: StoreEventOpts): string[][] {
const tagCounts: Record<string, number> = {};
function getCount(name: string) {
return tagCounts[name] || 0;
}
function incrementCount(name: string) {
tagCounts[name] = getCount(name) + 1;
}
function checkCondition(name: string, value: string, condition: TagCondition) {
return condition({
event,
opts,
count: getCount(name),
value,
});
}
return event.tags.reduce<string[][]>((results, tag) => {
const [name, value] = tag;
const condition = tagConditions[name] as TagCondition | undefined;
if (value && condition && value.length < 200 && checkCondition(name, value, condition)) {
results.push(tag);
}
incrementCount(name);
return results;
}, []);
}
/** Build a search index from the event. */
function buildSearchContent(event: Event): string {
switch (event.kind) {
case 0:
return buildUserSearchContent(event as Event<0>);
case 1:
return event.content;
default:
return '';
}
}
/** Build search content for a user. */
function buildUserSearchContent(event: Event<0>): string {
const { name, nip05, about } = jsonMetaContentSchema.parse(event.content);
return [name, nip05, about].filter(Boolean).join('\n');
}
/** SQLite database storage adapter for Nostr events. */
const eventsDB: EventStore = {
getEvents,
storeEvent,
countEvents,
deleteEvents,
};
export { eventsDB };

View File

@ -1,69 +0,0 @@
import { Debug, type Event, type Filter, LRUCache } from '@/deps.ts';
import { getFilterId, getMicroFilters, isMicrofilter } from '@/filter.ts';
import { type EventStore, type GetEventsOpts } from '@/store.ts';
const debug = Debug('ditto:memorelay');
const events = new LRUCache<string, Event>({
max: 3000,
maxEntrySize: 5000,
sizeCalculation: (event) => JSON.stringify(event).length,
});
/** Get events from memory. */
function getEvents<K extends number>(filters: Filter<K>[], opts: GetEventsOpts = {}): Promise<Event<K>[]> {
if (opts.signal?.aborted) return Promise.resolve([]);
if (!filters.length) return Promise.resolve([]);
debug('REQ', JSON.stringify(filters));
const results: Event<K>[] = [];
for (const filter of filters) {
if (isMicrofilter(filter)) {
const event = events.get(getFilterId(filter));
if (event) {
results.push(event as Event<K>);
}
}
}
return Promise.resolve(results);
}
/** Insert an event into memory. */
function storeEvent(event: Event): Promise<void> {
for (const microfilter of getMicroFilters(event)) {
const filterId = getFilterId(microfilter);
const existing = events.get(filterId);
if (!existing || event.created_at > existing.created_at) {
events.set(filterId, event);
}
}
return Promise.resolve();
}
/** Count events in memory for the filters. */
async function countEvents(filters: Filter[]): Promise<number> {
const events = await getEvents(filters);
return events.length;
}
/** Delete events from memory. */
function deleteEvents(filters: Filter[]): Promise<void> {
for (const filter of filters) {
if (isMicrofilter(filter)) {
events.delete(getFilterId(filter));
}
}
return Promise.resolve();
}
/** In-memory data store for events using microfilters. */
const memorelay: EventStore = {
getEvents,
storeEvent,
countEvents,
deleteEvents,
};
export { memorelay };

View File

@ -1,8 +1,8 @@
import { Conf } from '@/config.ts';
import { Debug, type Filter } from '@/deps.ts';
import { eventsDB } from '@/db/events.ts';
import * as pipeline from '@/pipeline.ts';
import { signAdminEvent } from '@/sign.ts';
import { eventsDB } from '@/storages.ts';
const debug = Debug('ditto:users');

View File

@ -18,6 +18,7 @@ export {
getEventHash,
getPublicKey,
getSignature,
matchFilter,
matchFilters,
nip04,
nip05,

View File

@ -4,7 +4,7 @@ import { assertEquals } from '@/deps-test.ts';
import event0 from '~/fixtures/events/event-0.json' assert { type: 'json' };
import event1 from '~/fixtures/events/event-1.json' assert { type: 'json' };
import { eventToMicroFilter, getFilterId, getMicroFilters, isMicrofilter } from './filter.ts';
import { eventToMicroFilter, getFilterId, getFilterLimit, getMicroFilters, isMicrofilter } from './filter.ts';
Deno.test('getMicroFilters', () => {
const event = event0 as Event<0>;
@ -35,3 +35,13 @@ Deno.test('getFilterId', () => {
'{"authors":["79c2cae114ea28a981e7559b4fe7854a473521a8d22a66bbab9fa248eb820ff6"],"kinds":[0]}',
);
});
Deno.test('getFilterLimit', () => {
assertEquals(getFilterLimit({ ids: [event0.id] }), 1);
assertEquals(getFilterLimit({ ids: [event0.id], limit: 2 }), 1);
assertEquals(getFilterLimit({ ids: [event0.id], limit: 0 }), 0);
assertEquals(getFilterLimit({ ids: [event0.id], limit: -1 }), 0);
assertEquals(getFilterLimit({ kinds: [0], authors: [event0.pubkey] }), 1);
assertEquals(getFilterLimit({ kinds: [1], authors: [event0.pubkey] }), Infinity);
assertEquals(getFilterLimit({}), Infinity);
});

View File

@ -2,6 +2,7 @@ import { Conf } from '@/config.ts';
import { type Event, type Filter, matchFilters, stringifyStable, z } from '@/deps.ts';
import { nostrIdSchema } from '@/schemas/nostr.ts';
import { type EventData } from '@/types.ts';
import { isReplaceableKind } from '@/kinds.ts';
/** Additional properties that may be added by Ditto to events. */
type Relation = 'author' | 'author_stats' | 'event_stats';
@ -82,15 +83,50 @@ function isMicrofilter(filter: Filter): filter is MicroFilter {
return microFilterSchema.safeParse(filter).success;
}
/** Calculate the intrinsic limit of a filter. */
function getFilterLimit(filter: Filter): number {
if (filter.ids && !filter.ids.length) return 0;
if (filter.kinds && !filter.kinds.length) return 0;
if (filter.authors && !filter.authors.length) return 0;
return Math.min(
Math.max(0, filter.limit ?? Infinity),
filter.ids?.length ?? Infinity,
filter.authors?.length &&
filter.kinds?.every((kind) => isReplaceableKind(kind))
? filter.authors.length * filter.kinds.length
: Infinity,
);
}
/** Returns true if the filter could potentially return any stored events at all. */
function canFilter(filter: Filter): boolean {
return getFilterLimit(filter) > 0;
}
/** Normalize the `limit` of each filter, and remove filters that can't produce any events. */
function normalizeFilters<F extends Filter>(filters: F[]): F[] {
return filters.reduce<F[]>((acc, filter) => {
const limit = getFilterLimit(filter);
if (limit > 0) {
acc.push(limit === Infinity ? filter : { ...filter, limit });
}
return acc;
}, []);
}
export {
type AuthorMicrofilter,
canFilter,
type DittoFilter,
eventToMicroFilter,
getFilterId,
getFilterLimit,
getMicroFilters,
type IdMicrofilter,
isMicrofilter,
matchDittoFilters,
type MicroFilter,
normalizeFilters,
type Relation,
};

View File

@ -1,7 +1,5 @@
import { client } from '@/client.ts';
import { Conf } from '@/config.ts';
import { eventsDB } from '@/db/events.ts';
import { memorelay } from '@/db/memorelay.ts';
import { addRelays } from '@/db/relays.ts';
import { deleteAttachedMedia } from '@/db/unattached-media.ts';
import { findUser } from '@/db/users.ts';
@ -10,6 +8,7 @@ import { isEphemeralKind } from '@/kinds.ts';
import { isLocallyFollowed } from '@/queries.ts';
import { reqmeister } from '@/reqmeister.ts';
import { updateStats } from '@/stats.ts';
import { eventsDB, memorelay } from '@/storages.ts';
import { Sub } from '@/subs.ts';
import { getTagSet } from '@/tags.ts';
import { type EventData } from '@/types.ts';

View File

@ -1,9 +1,8 @@
import { eventsDB } from '@/db/events.ts';
import { memorelay } from '@/db/memorelay.ts';
import { eventsDB, memorelay } from '@/storages.ts';
import { Debug, type Event, findReplyTag } from '@/deps.ts';
import { type AuthorMicrofilter, type DittoFilter, type IdMicrofilter, type Relation } from '@/filter.ts';
import { reqmeister } from '@/reqmeister.ts';
import { type DittoEvent } from '@/store.ts';
import { type DittoEvent } from '@/storages/types.ts';
import { getTagSet } from '@/tags.ts';
const debug = Debug('ditto:queries');

View File

@ -1,6 +1,14 @@
import { client } from '@/client.ts';
import { Debug, type Event, EventEmitter, type Filter } from '@/deps.ts';
import { AuthorMicrofilter, eventToMicroFilter, getFilterId, IdMicrofilter, type MicroFilter } from '@/filter.ts';
import {
AuthorMicrofilter,
eventToMicroFilter,
getFilterId,
IdMicrofilter,
isMicrofilter,
type MicroFilter,
} from '@/filter.ts';
import { type EventStore, GetEventsOpts } from '@/storages/types.ts';
import { Time } from '@/utils/time.ts';
const debug = Debug('ditto:reqmeister');
@ -18,12 +26,14 @@ interface ReqmeisterReqOpts {
type ReqmeisterQueueItem = [string, MicroFilter, WebSocket['url'][]];
/** Batches requests to Nostr relays using microfilters. */
class Reqmeister extends EventEmitter<{ [filterId: string]: (event: Event) => any }> {
class Reqmeister extends EventEmitter<{ [filterId: string]: (event: Event) => any }> implements EventStore {
#opts: ReqmeisterOpts;
#queue: ReqmeisterQueueItem[] = [];
#promise!: Promise<void>;
#resolve!: () => void;
supportedNips = [];
constructor(opts: ReqmeisterOpts = {}) {
super();
this.#opts = opts;
@ -119,6 +129,33 @@ class Reqmeister extends EventEmitter<{ [filterId: string]: (event: Event) => an
const filterId = getFilterId(eventToMicroFilter(event));
return this.#queue.some(([id]) => id === filterId);
}
getEvents<K extends number>(filters: Filter<K>[], opts?: GetEventsOpts | undefined): Promise<Event<K>[]> {
if (opts?.signal?.aborted) return Promise.resolve([]);
if (!filters.length) return Promise.resolve([]);
const promises = filters.reduce<Promise<Event<K>>[]>((result, filter) => {
if (isMicrofilter(filter)) {
result.push(this.req(filter) as Promise<Event<K>>);
}
return result;
}, []);
return Promise.all(promises);
}
storeEvent(event: Event): Promise<void> {
this.encounter(event);
return Promise.resolve();
}
countEvents(_filters: Filter[]): Promise<number> {
throw new Error('COUNT not implemented.');
}
deleteEvents(_filters: Filter[]): Promise<void> {
throw new Error('DELETE not implemented.');
}
}
const reqmeister = new Reqmeister({

View File

@ -1,6 +1,6 @@
import { type AuthorStatsRow, db, type DittoDB, type EventStatsRow } from '@/db.ts';
import { eventsDB } from '@/db/events.ts';
import { Debug, type Event, findReplyTag, type InsertQueryBuilder } from '@/deps.ts';
import { eventsDB } from '@/storages.ts';
type AuthorStat = keyof Omit<AuthorStatsRow, 'pubkey'>;
type EventStat = keyof Omit<EventStatsRow, 'event_id'>;

28
src/storages.ts Normal file
View File

@ -0,0 +1,28 @@
import { Conf } from '@/config.ts';
import { db } from '@/db.ts';
import { EventsDB } from '@/storages/events-db.ts';
import { Memorelay } from '@/storages/memorelay.ts';
import { Optimizer } from '@/storages/optimizer.ts';
import { SearchStore } from '@/storages/search-store.ts';
import { reqmeister } from '@/reqmeister.ts';
/** SQLite database to store events this Ditto server cares about. */
const eventsDB = new EventsDB(db);
/** In-memory data store for cached events. */
const memorelay = new Memorelay({ max: 3000 });
/** Main Ditto storage adapter */
const optimizer = new Optimizer({
db: eventsDB,
cache: memorelay,
client: reqmeister,
});
/** Storage to use for remote search. */
const searchStore = new SearchStore({
relay: Conf.searchRelay,
fallback: optimizer,
});
export { eventsDB, memorelay, optimizer, searchStore };

View File

@ -0,0 +1,67 @@
import { db } from '@/db.ts';
import { buildUserEvent } from '@/db/users.ts';
import { assertEquals, assertRejects } from '@/deps-test.ts';
import event0 from '~/fixtures/events/event-0.json' assert { type: 'json' };
import event1 from '~/fixtures/events/event-1.json' assert { type: 'json' };
import { EventsDB } from './events-db.ts';
const eventsDB = new EventsDB(db);
Deno.test('count filters', async () => {
assertEquals(await eventsDB.countEvents([{ kinds: [1] }]), 0);
await eventsDB.storeEvent(event1);
assertEquals(await eventsDB.countEvents([{ kinds: [1] }]), 1);
});
Deno.test('insert and filter events', async () => {
await eventsDB.storeEvent(event1);
assertEquals(await eventsDB.getEvents([{ kinds: [1] }]), [event1]);
assertEquals(await eventsDB.getEvents([{ kinds: [3] }]), []);
assertEquals(await eventsDB.getEvents([{ since: 1691091000 }]), [event1]);
assertEquals(await eventsDB.getEvents([{ until: 1691091000 }]), []);
assertEquals(
await eventsDB.getEvents([{ '#proxy': ['https://gleasonator.com/objects/8f6fac53-4f66-4c6e-ac7d-92e5e78c3e79'] }]),
[event1],
);
});
Deno.test('delete events', async () => {
await eventsDB.storeEvent(event1);
assertEquals(await eventsDB.getEvents([{ kinds: [1] }]), [event1]);
await eventsDB.deleteEvents([{ kinds: [1] }]);
assertEquals(await eventsDB.getEvents([{ kinds: [1] }]), []);
});
Deno.test('query events with local filter', async () => {
await eventsDB.storeEvent(event1);
assertEquals(await eventsDB.getEvents([{}]), [event1]);
assertEquals(await eventsDB.getEvents([{ local: true }]), []);
assertEquals(await eventsDB.getEvents([{ local: false }]), [event1]);
const userEvent = await buildUserEvent({
username: 'alex',
pubkey: event1.pubkey,
inserted_at: new Date(),
admin: false,
});
await eventsDB.storeEvent(userEvent);
assertEquals(await eventsDB.getEvents([{ kinds: [1], local: true }]), [event1]);
assertEquals(await eventsDB.getEvents([{ kinds: [1], local: false }]), []);
});
Deno.test('inserting replaceable events', async () => {
assertEquals(await eventsDB.countEvents([{ kinds: [0], authors: [event0.pubkey] }]), 0);
await eventsDB.storeEvent(event0);
await assertRejects(() => eventsDB.storeEvent(event0));
assertEquals(await eventsDB.countEvents([{ kinds: [0], authors: [event0.pubkey] }]), 1);
const changeEvent = { ...event0, id: '123', created_at: event0.created_at + 1 };
await eventsDB.storeEvent(changeEvent);
assertEquals(await eventsDB.getEvents([{ kinds: [0] }]), [changeEvent]);
});

414
src/storages/events-db.ts Normal file
View File

@ -0,0 +1,414 @@
import { Conf } from '@/config.ts';
import { type DittoDB } from '@/db.ts';
import { Debug, type Event, Kysely, type SelectQueryBuilder } from '@/deps.ts';
import { type DittoFilter } from '@/filter.ts';
import { isDittoInternalKind, isParameterizedReplaceableKind, isReplaceableKind } from '@/kinds.ts';
import { jsonMetaContentSchema } from '@/schemas/nostr.ts';
import { isNostrId, isURL } from '@/utils.ts';
import { type DittoEvent, EventStore, type GetEventsOpts, type StoreEventOpts } from './types.ts';
/** Function to decide whether or not to index a tag. */
type TagCondition = ({ event, count, value }: {
event: Event;
opts: StoreEventOpts;
count: number;
value: string;
}) => boolean;
/** Conditions for when to index certain tags. */
const tagConditions: Record<string, TagCondition> = {
'd': ({ event, count }) => count === 0 && isParameterizedReplaceableKind(event.kind),
'e': ({ event, count, value, opts }) => ((opts.data?.user && event.kind === 10003) || count < 15) && isNostrId(value),
'media': ({ count, value, opts }) => (opts.data?.user || count < 4) && isURL(value),
'p': ({ event, count, value }) => (count < 15 || event.kind === 3) && isNostrId(value),
'proxy': ({ count, value }) => count === 0 && isURL(value),
'q': ({ event, count, value }) => count === 0 && event.kind === 1 && isNostrId(value),
't': ({ count, value }) => count < 5 && value.length < 50,
'name': ({ event, count }) => event.kind === 30361 && count === 0,
'role': ({ event, count }) => event.kind === 30361 && count === 0,
};
type EventQuery = SelectQueryBuilder<DittoDB, 'events', {
id: string;
tags: string;
kind: number;
pubkey: string;
content: string;
created_at: number;
sig: string;
stats_replies_count?: number;
stats_reposts_count?: number;
stats_reactions_count?: number;
author_id?: string;
author_tags?: string;
author_kind?: number;
author_pubkey?: string;
author_content?: string;
author_created_at?: number;
author_sig?: string;
author_stats_followers_count?: number;
author_stats_following_count?: number;
author_stats_notes_count?: number;
}>;
/** SQLite database storage adapter for Nostr events. */
class EventsDB implements EventStore {
#db: Kysely<DittoDB>;
#debug = Debug('ditto:db:events');
/** NIPs supported by this storage method. */
supportedNips = [1, 45, 50];
constructor(db: Kysely<DittoDB>) {
this.#db = db;
}
/** Insert an event (and its tags) into the database. */
async storeEvent(event: Event, opts: StoreEventOpts = {}): Promise<void> {
this.#debug('EVENT', JSON.stringify(event));
if (isDittoInternalKind(event.kind) && event.pubkey !== Conf.pubkey) {
throw new Error('Internal events can only be stored by the server keypair');
}
return await this.#db.transaction().execute(async (trx) => {
/** Insert the event into the database. */
async function addEvent() {
await trx.insertInto('events')
.values({ ...event, tags: JSON.stringify(event.tags) })
.execute();
}
/** Add search data to the FTS table. */
async function indexSearch() {
const searchContent = buildSearchContent(event);
if (!searchContent) return;
await trx.insertInto('events_fts')
.values({ id: event.id, content: searchContent.substring(0, 1000) })
.execute();
}
/** Index event tags depending on the conditions defined above. */
async function indexTags() {
const tags = filterIndexableTags(event, opts);
const rows = tags.map(([tag, value]) => ({ event_id: event.id, tag, value }));
if (!tags.length) return;
await trx.insertInto('tags')
.values(rows)
.execute();
}
if (isReplaceableKind(event.kind)) {
const prevEvents = await this.getFilterQuery(trx, { kinds: [event.kind], authors: [event.pubkey] }).execute();
for (const prevEvent of prevEvents) {
if (prevEvent.created_at >= event.created_at) {
throw new Error('Cannot replace an event with an older event');
}
}
await this.deleteEventsTrx(trx, [{ kinds: [event.kind], authors: [event.pubkey] }]);
}
if (isParameterizedReplaceableKind(event.kind)) {
const d = event.tags.find(([tag]) => tag === 'd')?.[1];
if (d) {
const prevEvents = await this.getFilterQuery(trx, { kinds: [event.kind], authors: [event.pubkey], '#d': [d] })
.execute();
for (const prevEvent of prevEvents) {
if (prevEvent.created_at >= event.created_at) {
throw new Error('Cannot replace an event with an older event');
}
}
await this.deleteEventsTrx(trx, [{ kinds: [event.kind], authors: [event.pubkey], '#d': [d] }]);
}
}
// Run the queries.
await Promise.all([
addEvent(),
indexTags(),
indexSearch(),
]);
}).catch((error) => {
// Don't throw for duplicate events.
if (error.message.includes('UNIQUE constraint failed')) {
return;
} else {
throw error;
}
});
}
/** Build the query for a filter. */
getFilterQuery(db: Kysely<DittoDB>, filter: DittoFilter): EventQuery {
let query = db
.selectFrom('events')
.select([
'events.id',
'events.kind',
'events.pubkey',
'events.content',
'events.tags',
'events.created_at',
'events.sig',
])
.orderBy('events.created_at', 'desc');
for (const [key, value] of Object.entries(filter)) {
if (value === undefined) continue;
switch (key as keyof DittoFilter) {
case 'ids':
query = query.where('events.id', 'in', filter.ids!);
break;
case 'kinds':
query = query.where('events.kind', 'in', filter.kinds!);
break;
case 'authors':
query = query.where('events.pubkey', 'in', filter.authors!);
break;
case 'since':
query = query.where('events.created_at', '>=', filter.since!);
break;
case 'until':
query = query.where('events.created_at', '<=', filter.until!);
break;
case 'limit':
query = query.limit(filter.limit!);
break;
}
if (key.startsWith('#')) {
const tag = key.replace(/^#/, '');
const value = filter[key as `#${string}`] as string[];
query = query
.leftJoin('tags', 'tags.event_id', 'events.id')
.where('tags.tag', '=', tag)
.where('tags.value', 'in', value);
}
}
if (typeof filter.local === 'boolean') {
query = query
.leftJoin(() => this.usersQuery(), (join) => join.onRef('users.d_tag', '=', 'events.pubkey'))
.where('users.d_tag', filter.local ? 'is not' : 'is', null);
}
if (filter.relations?.includes('author')) {
query = query
.leftJoin(
(eb) =>
eb
.selectFrom('events')
.selectAll()
.where('kind', '=', 0)
.groupBy('pubkey')
.as('authors'),
(join) => join.onRef('authors.pubkey', '=', 'events.pubkey'),
)
.select([
'authors.id as author_id',
'authors.kind as author_kind',
'authors.pubkey as author_pubkey',
'authors.content as author_content',
'authors.tags as author_tags',
'authors.created_at as author_created_at',
'authors.sig as author_sig',
]);
}
if (filter.relations?.includes('author_stats')) {
query = query
.leftJoin('author_stats', 'author_stats.pubkey', 'events.pubkey')
.select((eb) => [
eb.fn.coalesce('author_stats.followers_count', eb.val(0)).as('author_stats_followers_count'),
eb.fn.coalesce('author_stats.following_count', eb.val(0)).as('author_stats_following_count'),
eb.fn.coalesce('author_stats.notes_count', eb.val(0)).as('author_stats_notes_count'),
]);
}
if (filter.relations?.includes('event_stats')) {
query = query
.leftJoin('event_stats', 'event_stats.event_id', 'events.id')
.select((eb) => [
eb.fn.coalesce('event_stats.replies_count', eb.val(0)).as('stats_replies_count'),
eb.fn.coalesce('event_stats.reposts_count', eb.val(0)).as('stats_reposts_count'),
eb.fn.coalesce('event_stats.reactions_count', eb.val(0)).as('stats_reactions_count'),
]);
}
if (filter.search) {
query = query
.innerJoin('events_fts', 'events_fts.id', 'events.id')
.where('events_fts.content', 'match', JSON.stringify(filter.search));
}
return query;
}
/** Combine filter queries into a single union query. */
getEventsQuery(filters: DittoFilter[]) {
return filters
.map((filter) => this.#db.selectFrom(() => this.getFilterQuery(this.#db, filter).as('events')).selectAll())
.reduce((result, query) => result.unionAll(query));
}
/** Query to get user events, joined by tags. */
usersQuery() {
return this.getFilterQuery(this.#db, { kinds: [30361], authors: [Conf.pubkey] })
.leftJoin('tags', 'tags.event_id', 'events.id')
.where('tags.tag', '=', 'd')
.select('tags.value as d_tag')
.as('users');
}
/** Get events for filters from the database. */
async getEvents<K extends number>(
filters: DittoFilter<K>[],
opts: GetEventsOpts = {},
): Promise<DittoEvent<K>[]> {
if (opts.signal?.aborted) return Promise.resolve([]);
if (!filters.length) return Promise.resolve([]);
this.#debug('REQ', JSON.stringify(filters));
let query = this.getEventsQuery(filters);
if (typeof opts.limit === 'number') {
query = query.limit(opts.limit);
}
return (await query.execute()).map((row) => {
const event: DittoEvent<K> = {
id: row.id,
kind: row.kind as K,
pubkey: row.pubkey,
content: row.content,
created_at: row.created_at,
tags: JSON.parse(row.tags),
sig: row.sig,
};
if (row.author_id) {
event.author = {
id: row.author_id,
kind: row.author_kind! as 0,
pubkey: row.author_pubkey!,
content: row.author_content!,
created_at: row.author_created_at!,
tags: JSON.parse(row.author_tags!),
sig: row.author_sig!,
};
}
if (typeof row.author_stats_followers_count === 'number') {
event.author_stats = {
followers_count: row.author_stats_followers_count,
following_count: row.author_stats_following_count!,
notes_count: row.author_stats_notes_count!,
};
}
if (typeof row.stats_replies_count === 'number') {
event.event_stats = {
replies_count: row.stats_replies_count,
reposts_count: row.stats_reposts_count!,
reactions_count: row.stats_reactions_count!,
};
}
return event;
});
}
/** Delete events from each table. Should be run in a transaction! */
async deleteEventsTrx(db: Kysely<DittoDB>, filters: DittoFilter[]) {
if (!filters.length) return Promise.resolve();
this.#debug('DELETE', JSON.stringify(filters));
const query = this.getEventsQuery(filters).clearSelect().select('id');
await db.deleteFrom('events_fts')
.where('id', 'in', () => query)
.execute();
return db.deleteFrom('events')
.where('id', 'in', () => query)
.execute();
}
/** Delete events based on filters from the database. */
async deleteEvents<K extends number>(filters: DittoFilter<K>[]): Promise<void> {
if (!filters.length) return Promise.resolve();
this.#debug('DELETE', JSON.stringify(filters));
await this.#db.transaction().execute((trx) => this.deleteEventsTrx(trx, filters));
}
/** Get number of events that would be returned by filters. */
async countEvents<K extends number>(filters: DittoFilter<K>[]): Promise<number> {
if (!filters.length) return Promise.resolve(0);
this.#debug('COUNT', JSON.stringify(filters));
const query = this.getEventsQuery(filters);
const [{ count }] = await query
.clearSelect()
.select((eb) => eb.fn.count('id').as('count'))
.execute();
return Number(count);
}
}
/** Return only the tags that should be indexed. */
function filterIndexableTags(event: Event, opts: StoreEventOpts): string[][] {
const tagCounts: Record<string, number> = {};
function getCount(name: string) {
return tagCounts[name] || 0;
}
function incrementCount(name: string) {
tagCounts[name] = getCount(name) + 1;
}
function checkCondition(name: string, value: string, condition: TagCondition) {
return condition({
event,
opts,
count: getCount(name),
value,
});
}
return event.tags.reduce<string[][]>((results, tag) => {
const [name, value] = tag;
const condition = tagConditions[name] as TagCondition | undefined;
if (value && condition && value.length < 200 && checkCondition(name, value, condition)) {
results.push(tag);
}
incrementCount(name);
return results;
}, []);
}
/** Build a search index from the event. */
function buildSearchContent(event: Event): string {
switch (event.kind) {
case 0:
return buildUserSearchContent(event as Event<0>);
case 1:
return event.content;
default:
return '';
}
}
/** Build search content for a user. */
function buildUserSearchContent(event: Event<0>): string {
const { name, nip05, about } = jsonMetaContentSchema.parse(event.content);
return [name, nip05, about].filter(Boolean).join('\n');
}
export { EventsDB };

27
src/storages/hydrate.ts Normal file
View File

@ -0,0 +1,27 @@
import { type DittoFilter } from '@/filter.ts';
import { type DittoEvent, type EventStore } from '@/storages/types.ts';
interface HydrateEventOpts<K extends number> {
events: DittoEvent<K>[];
filters: DittoFilter<K>[];
storage: EventStore;
signal?: AbortSignal;
}
/** Hydrate event relationships using the provided storage. */
async function hydrateEvents<K extends number>(opts: HydrateEventOpts<K>): Promise<DittoEvent<K>[]> {
const { events, filters, storage, signal } = opts;
if (filters.some((filter) => filter.relations?.includes('author'))) {
const pubkeys = new Set([...events].map((event) => event.pubkey));
const authors = await storage.getEvents([{ kinds: [0], authors: [...pubkeys] }], { signal });
for (const event of events) {
event.author = authors.find((author) => author.pubkey === event.pubkey);
}
}
return events;
}
export { hydrateEvents };

View File

@ -2,7 +2,13 @@ import { assertEquals } from '@/deps-test.ts';
import event1 from '~/fixtures/events/event-1.json' assert { type: 'json' };
import { memorelay } from './memorelay.ts';
import { Memorelay } from './memorelay.ts';
const memorelay = new Memorelay({
max: 3000,
maxEntrySize: 5000,
sizeCalculation: (event) => JSON.stringify(event).length,
});
Deno.test('memorelay', async () => {
assertEquals(await memorelay.countEvents([{ ids: [event1.id] }]), 0);

114
src/storages/memorelay.ts Normal file
View File

@ -0,0 +1,114 @@
import { Debug, type Event, type Filter, LRUCache, matchFilter } from '@/deps.ts';
import { normalizeFilters } from '@/filter.ts';
import { EventSet } from '@/utils/event-set.ts';
import { type EventStore, type GetEventsOpts } from './types.ts';
/** In-memory data store for events. */
class Memorelay implements EventStore {
#debug = Debug('ditto:memorelay');
#cache: LRUCache<string, Event>;
/** NIPs supported by this storage method. */
supportedNips = [1, 45];
constructor(...args: ConstructorParameters<typeof LRUCache<string, Event>>) {
this.#cache = new LRUCache<string, Event>(...args);
}
/** Iterate stored events. */
*#events(): Generator<Event> {
for (const event of this.#cache.values()) {
if (event && !(event instanceof Promise)) {
yield event;
}
}
}
/** Get events from memory. */
getEvents<K extends number>(filters: Filter<K>[], opts: GetEventsOpts = {}): Promise<Event<K>[]> {
filters = normalizeFilters(filters);
if (opts.signal?.aborted) return Promise.resolve([]);
if (!filters.length) return Promise.resolve([]);
this.#debug('REQ', JSON.stringify(filters));
/** Event results to return. */
const results = new EventSet<Event<K>>();
/** Number of times an event has been added to results for each filter. */
const filterUsages: number[] = [];
/** Check if all filters have been satisfied. */
function checkSatisfied() {
return results.size >= (opts.limit ?? Infinity) ||
filters.every((filter, index) => filter.limit && (filterUsages[index] >= filter.limit));
}
// Optimize for filters with IDs.
filters.forEach((filter, index) => {
if (filter.ids) {
for (const id of filter.ids) {
const event = this.#cache.get(id);
if (event && matchFilter(filter, event)) {
results.add(event as Event<K>);
}
}
filterUsages[index] = Infinity;
}
});
// Return early if all filters are satisfied.
if (checkSatisfied()) {
return Promise.resolve([...results]);
}
// Seek through all events in memory.
for (const event of this.#events()) {
filters.forEach((filter, index) => {
const limit = filter.limit ?? Infinity;
const usage = filterUsages[index] ?? 0;
if (usage >= limit) {
return;
} else if (matchFilter(filter, event)) {
results.add(event as Event<K>);
this.#cache.get(event.id);
filterUsages[index] = usage + 1;
}
index++;
});
// Check after each event if we can return.
if (checkSatisfied()) {
break;
}
}
return Promise.resolve([...results]);
}
/** Insert an event into memory. */
storeEvent(event: Event): Promise<void> {
this.#cache.set(event.id, event);
return Promise.resolve();
}
/** Count events in memory for the filters. */
async countEvents(filters: Filter[]): Promise<number> {
const events = await this.getEvents(filters);
return events.length;
}
/** Delete events from memory. */
async deleteEvents(filters: Filter[]): Promise<void> {
for (const event of await this.getEvents(filters)) {
this.#cache.delete(event.id);
}
return Promise.resolve();
}
}
export { Memorelay };

111
src/storages/optimizer.ts Normal file
View File

@ -0,0 +1,111 @@
import { Debug } from '@/deps.ts';
import { type DittoFilter, normalizeFilters } from '@/filter.ts';
import { EventSet } from '@/utils/event-set.ts';
import { type DittoEvent, type EventStore, type GetEventsOpts, type StoreEventOpts } from './types.ts';
interface OptimizerOpts {
db: EventStore;
cache: EventStore;
client: EventStore;
}
class Optimizer implements EventStore {
#debug = Debug('ditto:optimizer');
#db: EventStore;
#cache: EventStore;
#client: EventStore;
supportedNips = [1];
constructor(opts: OptimizerOpts) {
this.#db = opts.db;
this.#cache = opts.cache;
this.#client = opts.client;
}
async storeEvent(event: DittoEvent<number>, opts?: StoreEventOpts | undefined): Promise<void> {
await Promise.all([
this.#db.storeEvent(event, opts),
this.#cache.storeEvent(event, opts),
]);
}
async getEvents<K extends number>(
filters: DittoFilter<K>[],
opts: GetEventsOpts | undefined = {},
): Promise<DittoEvent<K>[]> {
this.#debug('REQ', JSON.stringify(filters));
const { limit = Infinity } = opts;
filters = normalizeFilters(filters);
if (opts?.signal?.aborted) return Promise.resolve([]);
if (!filters.length) return Promise.resolve([]);
const results = new EventSet<DittoEvent<K>>();
// Filters with IDs are immutable, so we can take them straight from the cache if we have them.
for (let i = 0; i < filters.length; i++) {
const filter = filters[i];
if (filter.ids) {
this.#debug(`Filter[${i}] is an IDs filter; querying cache...`);
const ids = new Set<string>(filter.ids);
for (const event of await this.#cache.getEvents([filter], opts)) {
ids.delete(event.id);
results.add(event);
if (results.size >= limit) return getResults();
}
filters[i] = { ...filter, ids: [...ids] };
}
}
filters = normalizeFilters(filters);
if (!filters.length) return getResults();
// Query the database for events.
this.#debug('Querying database...');
for (const dbEvent of await this.#db.getEvents(filters, opts)) {
results.add(dbEvent);
if (results.size >= limit) return getResults();
}
// We already searched the DB, so stop if this is a search filter.
if (filters.some((filter) => typeof filter.search === 'string')) {
this.#debug(`Bailing early for search filter: "${filters[0]?.search}"`);
return getResults();
}
// Query the cache again.
this.#debug('Querying cache...');
for (const cacheEvent of await this.#cache.getEvents(filters, opts)) {
results.add(cacheEvent);
if (results.size >= limit) return getResults();
}
// Finally, query the client.
this.#debug('Querying client...');
for (const clientEvent of await this.#client.getEvents(filters, opts)) {
results.add(clientEvent);
if (results.size >= limit) return getResults();
}
/** Get return type from map. */
function getResults() {
return [...results.values()];
}
return getResults();
}
countEvents<K extends number>(_filters: DittoFilter<K>[]): Promise<number> {
throw new Error('COUNT not implemented.');
}
deleteEvents<K extends number>(_filters: DittoFilter<K>[]): Promise<void> {
throw new Error('DELETE not implemented.');
}
}
export { Optimizer };

View File

@ -0,0 +1,85 @@
import { NiceRelay } from 'https://gitlab.com/soapbox-pub/nostr-machina/-/raw/5f4fb59c90c092e5aa59c01e6556a4bec264c167/mod.ts';
import { Debug, type Event, type Filter } from '@/deps.ts';
import { type DittoFilter, normalizeFilters } from '@/filter.ts';
import { hydrateEvents } from '@/storages/hydrate.ts';
import { type DittoEvent, type EventStore, type GetEventsOpts, type StoreEventOpts } from '@/storages/types.ts';
import { EventSet } from '@/utils/event-set.ts';
interface SearchStoreOpts {
relay: string | undefined;
fallback: EventStore;
hydrator?: EventStore;
}
class SearchStore implements EventStore {
#debug = Debug('ditto:storages:search');
#fallback: EventStore;
#hydrator: EventStore;
#relay: NiceRelay | undefined;
supportedNips = [50];
constructor(opts: SearchStoreOpts) {
this.#fallback = opts.fallback;
this.#hydrator = opts.hydrator ?? this;
if (opts.relay) {
this.#relay = new NiceRelay(opts.relay);
}
}
storeEvent(_event: Event, _opts?: StoreEventOpts | undefined): Promise<void> {
throw new Error('EVENT not implemented.');
}
async getEvents<K extends number>(
filters: DittoFilter<K>[],
opts?: GetEventsOpts | undefined,
): Promise<DittoEvent<K>[]> {
filters = normalizeFilters(filters);
if (opts?.signal?.aborted) return Promise.resolve([]);
if (!filters.length) return Promise.resolve([]);
this.#debug('REQ', JSON.stringify(filters));
const query = filters[0]?.search;
if (this.#relay) {
this.#debug(`Searching for "${query}" at ${this.#relay.socket.url}...`);
const sub = this.#relay.req(filters, opts);
const close = () => {
sub.close();
opts?.signal?.removeEventListener('abort', close);
sub.eoseSignal.removeEventListener('abort', close);
};
opts?.signal?.addEventListener('abort', close, { once: true });
sub.eoseSignal.addEventListener('abort', close, { once: true });
const events = new EventSet<DittoEvent<K>>();
for await (const event of sub) {
events.add(event);
}
return hydrateEvents({ events: [...events], filters, storage: this.#hydrator, signal: opts?.signal });
} else {
this.#debug(`Searching for "${query}" locally...`);
return this.#fallback.getEvents(filters, opts);
}
}
countEvents<K extends number>(_filters: Filter<K>[]): Promise<number> {
throw new Error('COUNT not implemented.');
}
deleteEvents<K extends number>(_filters: Filter<K>[]): Promise<void> {
throw new Error('DELETE not implemented.');
}
}
export { SearchStore };

View File

@ -33,6 +33,8 @@ interface DittoEvent<K extends number = number> extends Event<K> {
/** Storage interface for Nostr events. */
interface EventStore {
/** Indicates NIPs supported by this data store, similar to NIP-11. For example, `50` would indicate support for `search` filters. */
supportedNips: readonly number[];
/** Add an event to the store. */
storeEvent(event: Event, opts?: StoreEventOpts): Promise<void>;
/** Get events from filters. */

View File

@ -13,8 +13,8 @@ import {
} from '@/deps.ts';
import * as pipeline from '@/pipeline.ts';
import { signAdminEvent, signEvent } from '@/sign.ts';
import { eventsDB } from '@/storages.ts';
import { nostrNow } from '@/utils.ts';
import { eventsDB } from '@/db/events.ts';
const debug = Debug('ditto:api');

109
src/utils/event-set.test.ts Normal file
View File

@ -0,0 +1,109 @@
import { assertEquals } from '@/deps-test.ts';
import { EventSet } from './event-set.ts';
Deno.test('EventSet', () => {
const set = new EventSet();
assertEquals(set.size, 0);
const event = { id: '1', kind: 0, pubkey: 'abc', content: '', created_at: 0, sig: '', tags: [] };
set.add(event);
assertEquals(set.size, 1);
assertEquals(set.has(event), true);
set.add(event);
assertEquals(set.size, 1);
assertEquals(set.has(event), true);
set.delete(event);
assertEquals(set.size, 0);
assertEquals(set.has(event), false);
set.delete(event);
assertEquals(set.size, 0);
assertEquals(set.has(event), false);
set.add(event);
assertEquals(set.size, 1);
assertEquals(set.has(event), true);
set.clear();
assertEquals(set.size, 0);
assertEquals(set.has(event), false);
});
Deno.test('EventSet.add (replaceable)', () => {
const event0 = { id: '1', kind: 0, pubkey: 'abc', content: '', created_at: 0, sig: '', tags: [] };
const event1 = { id: '2', kind: 0, pubkey: 'abc', content: '', created_at: 1, sig: '', tags: [] };
const event2 = { id: '3', kind: 0, pubkey: 'abc', content: '', created_at: 2, sig: '', tags: [] };
const set = new EventSet();
set.add(event0);
assertEquals(set.size, 1);
assertEquals(set.has(event0), true);
set.add(event1);
assertEquals(set.size, 1);
assertEquals(set.has(event0), false);
assertEquals(set.has(event1), true);
set.add(event2);
assertEquals(set.size, 1);
assertEquals(set.has(event0), false);
assertEquals(set.has(event1), false);
assertEquals(set.has(event2), true);
});
Deno.test('EventSet.add (parameterized)', () => {
const event0 = { id: '1', kind: 30000, pubkey: 'abc', content: '', created_at: 0, sig: '', tags: [['d', 'a']] };
const event1 = { id: '2', kind: 30000, pubkey: 'abc', content: '', created_at: 1, sig: '', tags: [['d', 'a']] };
const event2 = { id: '3', kind: 30000, pubkey: 'abc', content: '', created_at: 2, sig: '', tags: [['d', 'a']] };
const set = new EventSet();
set.add(event0);
assertEquals(set.size, 1);
assertEquals(set.has(event0), true);
set.add(event1);
assertEquals(set.size, 1);
assertEquals(set.has(event0), false);
assertEquals(set.has(event1), true);
set.add(event2);
assertEquals(set.size, 1);
assertEquals(set.has(event0), false);
assertEquals(set.has(event1), false);
assertEquals(set.has(event2), true);
});
Deno.test('EventSet.eventReplaces', () => {
const event0 = { id: '1', kind: 0, pubkey: 'abc', content: '', created_at: 0, sig: '', tags: [] };
const event1 = { id: '2', kind: 0, pubkey: 'abc', content: '', created_at: 1, sig: '', tags: [] };
const event2 = { id: '3', kind: 0, pubkey: 'abc', content: '', created_at: 2, sig: '', tags: [] };
const event3 = { id: '4', kind: 0, pubkey: 'def', content: '', created_at: 0, sig: '', tags: [] };
assertEquals(EventSet.eventReplaces(event1, event0), true);
assertEquals(EventSet.eventReplaces(event2, event0), true);
assertEquals(EventSet.eventReplaces(event2, event1), true);
assertEquals(EventSet.eventReplaces(event0, event1), false);
assertEquals(EventSet.eventReplaces(event0, event2), false);
assertEquals(EventSet.eventReplaces(event1, event2), false);
assertEquals(EventSet.eventReplaces(event3, event1), false);
assertEquals(EventSet.eventReplaces(event1, event3), false);
});
Deno.test('EventSet.eventReplaces (parameterized)', () => {
const event0 = { id: '1', kind: 30000, pubkey: 'abc', content: '', created_at: 0, sig: '', tags: [['d', 'a']] };
const event1 = { id: '2', kind: 30000, pubkey: 'abc', content: '', created_at: 1, sig: '', tags: [['d', 'a']] };
const event2 = { id: '3', kind: 30000, pubkey: 'abc', content: '', created_at: 2, sig: '', tags: [['d', 'a']] };
assertEquals(EventSet.eventReplaces(event1, event0), true);
assertEquals(EventSet.eventReplaces(event2, event0), true);
assertEquals(EventSet.eventReplaces(event2, event1), true);
assertEquals(EventSet.eventReplaces(event0, event1), false);
assertEquals(EventSet.eventReplaces(event0, event2), false);
assertEquals(EventSet.eventReplaces(event1, event2), false);
});

77
src/utils/event-set.ts Normal file
View File

@ -0,0 +1,77 @@
import { type Event } from '@/deps.ts';
import { isParameterizedReplaceableKind, isReplaceableKind } from '@/kinds.ts';
/** In-memory store for Nostr events with replaceable event functionality. */
class EventSet<E extends Event = Event> implements Set<E> {
#map = new Map<string, E>();
get size() {
return this.#map.size;
}
add(event: E): this {
if (isReplaceableKind(event.kind) || isParameterizedReplaceableKind(event.kind)) {
for (const e of this.values()) {
if (EventSet.eventReplaces(event, e)) {
this.delete(e);
}
}
}
this.#map.set(event.id, event);
return this;
}
clear(): void {
this.#map.clear();
}
delete(event: E): boolean {
return this.#map.delete(event.id);
}
forEach(callbackfn: (event: E, key: E, set: Set<E>) => void, thisArg?: any): void {
return this.#map.forEach((event, _id) => callbackfn(event, event, this), thisArg);
}
has(event: E): boolean {
return this.#map.has(event.id);
}
*entries(): IterableIterator<[E, E]> {
for (const event of this.#map.values()) {
yield [event, event];
}
}
keys(): IterableIterator<E> {
return this.#map.values();
}
values(): IterableIterator<E> {
return this.#map.values();
}
[Symbol.iterator](): IterableIterator<E> {
return this.#map.values();
}
[Symbol.toStringTag]: string = 'EventSet';
/** Returns true if both events are replaceable, belong to the same kind and pubkey (and `d` tag, for parameterized events), and the first event is newer than the second one. */
static eventReplaces(event: Event, target: Event): boolean {
if (isReplaceableKind(event.kind)) {
return event.kind === target.kind && event.pubkey === target.pubkey && event.created_at > target.created_at;
} else if (isParameterizedReplaceableKind(event.kind)) {
const d = event.tags.find(([name]) => name === 'd')?.[1] || '';
const d2 = target.tags.find(([name]) => name === 'd')?.[1] || '';
return event.kind === target.kind &&
event.pubkey === target.pubkey &&
d === d2 &&
event.created_at > target.created_at;
}
return false;
}
}
export { EventSet };

View File

@ -1,6 +1,6 @@
import { AppContext } from '@/app.ts';
import { eventsDB } from '@/db/events.ts';
import { type Filter } from '@/deps.ts';
import { eventsDB } from '@/storages.ts';
import { renderAccount } from '@/views/mastodon/accounts.ts';
import { renderStatus } from '@/views/mastodon/statuses.ts';
import { paginated, paginationSchema } from '@/utils/api.ts';

View File

@ -2,8 +2,8 @@ import { Conf } from '@/config.ts';
import { findUser } from '@/db/users.ts';
import { lodash, nip19, type UnsignedEvent } from '@/deps.ts';
import { jsonMetaContentSchema } from '@/schemas/nostr.ts';
import { type DittoEvent } from '@/storages/types.ts';
import { verifyNip05Cached } from '@/utils/nip05.ts';
import { type DittoEvent } from '@/store.ts';
import { Nip05, nostrDate, nostrNow, parseNip05 } from '@/utils.ts';
import { renderEmojis } from '@/views/mastodon/emojis.ts';

View File

@ -1,4 +1,4 @@
import { eventsDB } from '@/db/events.ts';
import { eventsDB } from '@/storages.ts';
import { hasTag } from '@/tags.ts';
async function renderRelationship(sourcePubkey: string, targetPubkey: string) {

View File

@ -1,12 +1,12 @@
import { isCWTag } from 'https://gitlab.com/soapbox-pub/mostr/-/raw/c67064aee5ade5e01597c6d23e22e53c628ef0e2/src/nostr/tags.ts';
import { Conf } from '@/config.ts';
import { eventsDB } from '@/db/events.ts';
import { findReplyTag, nip19 } from '@/deps.ts';
import { getMediaLinks, parseNoteContent } from '@/note.ts';
import { getAuthor } from '@/queries.ts';
import { jsonMediaDataSchema } from '@/schemas/nostr.ts';
import { DittoEvent } from '@/store.ts';
import { eventsDB } from '@/storages.ts';
import { type DittoEvent } from '@/storages/types.ts';
import { nostrDate } from '@/utils.ts';
import { unfurlCardCached } from '@/utils/unfurl.ts';
import { accountFromPubkey, renderAccount } from '@/views/mastodon/accounts.ts';