Rework client as EventStore

This commit is contained in:
Alex Gleason 2023-12-29 13:22:51 -06:00
parent e6c8d1dad9
commit ccfdbfeb8d
No known key found for this signature in database
GPG Key ID: 7211D1F99744FBB7
6 changed files with 42 additions and 29 deletions

View File

@ -1,13 +1,12 @@
import { Debug, type Event, type Filter, matchFilters } from '@/deps.ts'; import { Debug, type Event, type Filter, matchFilters } from '@/deps.ts';
import * as pipeline from '@/pipeline.ts'; import * as pipeline from '@/pipeline.ts';
import { activeRelays, pool } from '@/pool.ts'; import { activeRelays, pool } from '@/pool.ts';
import { type EventStore, type GetEventsOpts, type StoreEventOpts } from '@/store.ts';
import type { GetFiltersOpts } from '@/filter.ts';
const debug = Debug('ditto:client'); const debug = Debug('ditto:client');
/** Get events from a NIP-01 filter. */ /** Get events from a NIP-01 filter. */
function getFilters<K extends number>(filters: Filter<K>[], opts: GetFiltersOpts = {}): Promise<Event<K>[]> { function getEvents<K extends number>(filters: Filter<K>[], opts: GetEventsOpts = {}): Promise<Event<K>[]> {
if (opts.signal?.aborted) return Promise.resolve([]); if (opts.signal?.aborted) return Promise.resolve([]);
if (!filters.length) return Promise.resolve([]); if (!filters.length) return Promise.resolve([]);
debug('REQ', JSON.stringify(filters)); debug('REQ', JSON.stringify(filters));
@ -50,4 +49,19 @@ function getFilters<K extends number>(filters: Filter<K>[], opts: GetFiltersOpts
}); });
} }
export { getFilters }; /** Publish an event to the given relays, or the entire pool. */
function storeEvent(event: Event, opts: StoreEventOpts = {}): Promise<void> {
const { relays = activeRelays } = opts;
debug('EVENT', event);
pool.publish(event, relays);
return Promise.resolve();
}
const client: EventStore = {
getEvents,
storeEvent,
countEvents: () => Promise.reject(new Error('COUNT not implemented')),
deleteEvents: () => Promise.reject(new Error('Cannot delete events from relays. Create a kind 5 event instead.')),
};
export { client };

View File

@ -3,8 +3,7 @@ import { Debug, type Event, type SelectQueryBuilder } from '@/deps.ts';
import { type DittoFilter } from '@/filter.ts'; import { type DittoFilter } from '@/filter.ts';
import { isParameterizedReplaceableKind } from '@/kinds.ts'; import { isParameterizedReplaceableKind } from '@/kinds.ts';
import { jsonMetaContentSchema } from '@/schemas/nostr.ts'; import { jsonMetaContentSchema } from '@/schemas/nostr.ts';
import { type DittoEvent, EventStore, type GetEventsOpts } from '@/store.ts'; import { type DittoEvent, EventStore, type GetEventsOpts, type StoreEventOpts } from '@/store.ts';
import { EventData } from '@/types.ts';
import { isNostrId, isURL } from '@/utils.ts'; import { isNostrId, isURL } from '@/utils.ts';
const debug = Debug('ditto:db:events'); const debug = Debug('ditto:db:events');
@ -12,7 +11,7 @@ const debug = Debug('ditto:db:events');
/** Function to decide whether or not to index a tag. */ /** Function to decide whether or not to index a tag. */
type TagCondition = ({ event, count, value }: { type TagCondition = ({ event, count, value }: {
event: Event; event: Event;
data: EventData; opts: StoreEventOpts;
count: number; count: number;
value: string; value: string;
}) => boolean; }) => boolean;
@ -21,7 +20,7 @@ type TagCondition = ({ event, count, value }: {
const tagConditions: Record<string, TagCondition> = { const tagConditions: Record<string, TagCondition> = {
'd': ({ event, count }) => count === 0 && isParameterizedReplaceableKind(event.kind), 'd': ({ event, count }) => count === 0 && isParameterizedReplaceableKind(event.kind),
'e': ({ count, value }) => count < 15 && isNostrId(value), 'e': ({ count, value }) => count < 15 && isNostrId(value),
'media': ({ count, value, data }) => (data.user || count < 4) && isURL(value), 'media': ({ count, value, opts }) => (opts.data?.user || count < 4) && isURL(value),
'p': ({ event, count, value }) => (count < 15 || event.kind === 3) && isNostrId(value), 'p': ({ event, count, value }) => (count < 15 || event.kind === 3) && isNostrId(value),
'proxy': ({ count, value }) => count === 0 && isURL(value), 'proxy': ({ count, value }) => count === 0 && isURL(value),
'q': ({ event, count, value }) => count === 0 && event.kind === 1 && isNostrId(value), 'q': ({ event, count, value }) => count === 0 && event.kind === 1 && isNostrId(value),
@ -29,7 +28,7 @@ const tagConditions: Record<string, TagCondition> = {
}; };
/** Insert an event (and its tags) into the database. */ /** Insert an event (and its tags) into the database. */
function storeEvent(event: Event, data: EventData): Promise<void> { function storeEvent(event: Event, opts: StoreEventOpts = {}): Promise<void> {
debug('EVENT', JSON.stringify(event)); debug('EVENT', JSON.stringify(event));
return db.transaction().execute(async (trx) => { return db.transaction().execute(async (trx) => {
@ -51,7 +50,7 @@ function storeEvent(event: Event, data: EventData): Promise<void> {
/** Index event tags depending on the conditions defined above. */ /** Index event tags depending on the conditions defined above. */
async function indexTags() { async function indexTags() {
const tags = filterIndexableTags(event, data); const tags = filterIndexableTags(event, opts);
const rows = tags.map(([tag, value]) => ({ event_id: event.id, tag, value })); const rows = tags.map(([tag, value]) => ({ event_id: event.id, tag, value }));
if (!tags.length) return; if (!tags.length) return;
@ -302,7 +301,7 @@ async function countEvents<K extends number>(filters: DittoFilter<K>[]): Promise
} }
/** Return only the tags that should be indexed. */ /** Return only the tags that should be indexed. */
function filterIndexableTags(event: Event, data: EventData): string[][] { function filterIndexableTags(event: Event, opts: StoreEventOpts): string[][] {
const tagCounts: Record<string, number> = {}; const tagCounts: Record<string, number> = {};
function getCount(name: string) { function getCount(name: string) {
@ -316,7 +315,7 @@ function filterIndexableTags(event: Event, data: EventData): string[][] {
function checkCondition(name: string, value: string, condition: TagCondition) { function checkCondition(name: string, value: string, condition: TagCondition) {
return condition({ return condition({
event, event,
data, opts,
count: getCount(name), count: getCount(name),
value, value,
}); });

View File

@ -1,3 +1,4 @@
import { client } from '@/client.ts';
import { Conf } from '@/config.ts'; import { Conf } from '@/config.ts';
import { eventsDB } from '@/db/events.ts'; import { eventsDB } from '@/db/events.ts';
import { memorelay } from '@/db/memorelay.ts'; import { memorelay } from '@/db/memorelay.ts';
@ -6,7 +7,6 @@ import { deleteAttachedMedia } from '@/db/unattached-media.ts';
import { findUser } from '@/db/users.ts'; import { findUser } from '@/db/users.ts';
import { Debug, type Event } from '@/deps.ts'; import { Debug, type Event } from '@/deps.ts';
import { isEphemeralKind } from '@/kinds.ts'; import { isEphemeralKind } from '@/kinds.ts';
import { publish } from '@/pool.ts';
import { isLocallyFollowed } from '@/queries.ts'; import { isLocallyFollowed } from '@/queries.ts';
import { reqmeister } from '@/reqmeister.ts'; import { reqmeister } from '@/reqmeister.ts';
import { updateStats } from '@/stats.ts'; import { updateStats } from '@/stats.ts';
@ -78,7 +78,7 @@ async function storeEvent(event: Event, data: EventData, opts: StoreEventOpts =
return Promise.reject(new RelayError('blocked', 'event was deleted')); return Promise.reject(new RelayError('blocked', 'event was deleted'));
} else { } else {
await Promise.all([ await Promise.all([
eventsDB.storeEvent(event, data).catch(debug), eventsDB.storeEvent(event, { data }).catch(debug),
updateStats(event).catch(debug), updateStats(event).catch(debug),
]); ]);
} }
@ -176,7 +176,7 @@ function broadcast(event: Event, data: EventData) {
if (!data.user || !isFresh(event)) return; if (!data.user || !isFresh(event)) return;
if (event.kind === 5) { if (event.kind === 5) {
publish(event); client.storeEvent(event);
} }
} }

View File

@ -1,7 +1,5 @@
import { getActiveRelays } from '@/db/relays.ts'; import { getActiveRelays } from '@/db/relays.ts';
import { Debug, type Event, RelayPoolWorker } from '@/deps.ts'; import { RelayPoolWorker } from '@/deps.ts';
const debug = Debug('ditto:pool');
const activeRelays = await getActiveRelays(); const activeRelays = await getActiveRelays();
@ -17,10 +15,4 @@ const pool = new RelayPoolWorker(worker, activeRelays, {
logErrorsAndNotices: false, logErrorsAndNotices: false,
}); });
/** Publish an event to the given relays, or the entire pool. */ export { activeRelays, pool };
function publish(event: Event, relays: string[] = activeRelays) {
debug('publish', event);
return pool.publish(event, relays);
}
export { activeRelays, pool, publish };

View File

@ -1,4 +1,4 @@
import * as client from '@/client.ts'; import { client } from '@/client.ts';
import { Debug, type Event, EventEmitter, type Filter } from '@/deps.ts'; import { Debug, type Event, EventEmitter, type Filter } from '@/deps.ts';
import { AuthorMicrofilter, eventToMicroFilter, getFilterId, IdMicrofilter, type MicroFilter } from '@/filter.ts'; import { AuthorMicrofilter, eventToMicroFilter, getFilterId, IdMicrofilter, type MicroFilter } from '@/filter.ts';
import { Time } from '@/utils/time.ts'; import { Time } from '@/utils/time.ts';
@ -64,7 +64,7 @@ class Reqmeister extends EventEmitter<{ [filterId: string]: (event: Event) => an
if (filters.length) { if (filters.length) {
debug('REQ', JSON.stringify(filters)); debug('REQ', JSON.stringify(filters));
const events = await client.getFilters(filters, { signal: AbortSignal.timeout(timeout) }); const events = await client.getEvents(filters, { signal: AbortSignal.timeout(timeout) });
for (const event of events) { for (const event of events) {
this.encounter(event); this.encounter(event);

View File

@ -13,6 +13,14 @@ interface GetEventsOpts {
relays?: WebSocket['url'][]; relays?: WebSocket['url'][];
} }
/** Options when storing an event. */
interface StoreEventOpts {
/** Event data to store. */
data?: EventData;
/** Relays to use, if applicable. */
relays?: WebSocket['url'][];
}
type AuthorStats = Omit<DittoDB['author_stats'], 'pubkey'>; type AuthorStats = Omit<DittoDB['author_stats'], 'pubkey'>;
type EventStats = Omit<DittoDB['event_stats'], 'event_id'>; type EventStats = Omit<DittoDB['event_stats'], 'event_id'>;
@ -26,7 +34,7 @@ interface DittoEvent<K extends number = number> extends Event<K> {
/** Storage interface for Nostr events. */ /** Storage interface for Nostr events. */
interface EventStore { interface EventStore {
/** Add an event to the store. */ /** Add an event to the store. */
storeEvent(event: Event, data?: EventData): Promise<void>; storeEvent(event: Event, opts?: StoreEventOpts): Promise<void>;
/** Get events from filters. */ /** Get events from filters. */
getEvents<K extends number>(filters: DittoFilter<K>[], opts?: GetEventsOpts): Promise<DittoEvent<K>[]>; getEvents<K extends number>(filters: DittoFilter<K>[], opts?: GetEventsOpts): Promise<DittoEvent<K>[]>;
/** Get the number of events from filters. */ /** Get the number of events from filters. */
@ -35,4 +43,4 @@ interface EventStore {
deleteEvents<K extends number>(filters: DittoFilter<K>[]): Promise<void>; deleteEvents<K extends number>(filters: DittoFilter<K>[]): Promise<void>;
} }
export type { DittoEvent, EventStore, GetEventsOpts }; export type { DittoEvent, EventStore, GetEventsOpts, StoreEventOpts };