Merge branch 'reqstorage' into 'main'
Refactor storages See merge request soapbox-pub/ditto!97
This commit is contained in:
commit
7c4590e6d4
|
@ -39,6 +39,6 @@ async function usersToEvents() {
|
|||
created_at: Math.floor(new Date(row.inserted_at).getTime() / 1000),
|
||||
});
|
||||
|
||||
await eventsDB.storeEvent(event);
|
||||
await eventsDB.add(event);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,74 +0,0 @@
|
|||
import { Debug, type Event, type Filter, matchFilters } from '@/deps.ts';
|
||||
import { normalizeFilters } from '@/filter.ts';
|
||||
import * as pipeline from '@/pipeline.ts';
|
||||
import { activeRelays, pool } from '@/pool.ts';
|
||||
import { type EventStore, type GetEventsOpts, type StoreEventOpts } from '@/storages/types.ts';
|
||||
import { EventSet } from '@/utils/event-set.ts';
|
||||
|
||||
const debug = Debug('ditto:client');
|
||||
|
||||
/** Get events from a NIP-01 filter. */
|
||||
function getEvents<K extends number>(filters: Filter<K>[], opts: GetEventsOpts = {}): Promise<Event<K>[]> {
|
||||
filters = normalizeFilters(filters);
|
||||
|
||||
if (opts.signal?.aborted) return Promise.resolve([]);
|
||||
if (!filters.length) return Promise.resolve([]);
|
||||
|
||||
debug('REQ', JSON.stringify(filters));
|
||||
|
||||
return new Promise((resolve) => {
|
||||
const results = new EventSet<Event<K>>();
|
||||
|
||||
const unsub = pool.subscribe(
|
||||
filters,
|
||||
opts.relays ?? activeRelays,
|
||||
(event: Event | null) => {
|
||||
if (event && matchFilters(filters, event)) {
|
||||
pipeline.handleEvent(event).catch(() => {});
|
||||
results.add({
|
||||
id: event.id,
|
||||
kind: event.kind as K,
|
||||
pubkey: event.pubkey,
|
||||
content: event.content,
|
||||
tags: event.tags,
|
||||
created_at: event.created_at,
|
||||
sig: event.sig,
|
||||
});
|
||||
}
|
||||
if (typeof opts.limit === 'number' && results.size >= opts.limit) {
|
||||
unsub();
|
||||
resolve([...results]);
|
||||
}
|
||||
},
|
||||
undefined,
|
||||
() => {
|
||||
unsub();
|
||||
resolve([...results]);
|
||||
},
|
||||
);
|
||||
|
||||
opts.signal?.addEventListener('abort', () => {
|
||||
unsub();
|
||||
resolve([...results]);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
/** Publish an event to the given relays, or the entire pool. */
|
||||
function storeEvent(event: Event, opts: StoreEventOpts = {}): Promise<void> {
|
||||
const { relays = activeRelays } = opts;
|
||||
const debug = Debug('ditto:client:publish');
|
||||
debug('EVENT', event);
|
||||
pool.publish(event, relays);
|
||||
return Promise.resolve();
|
||||
}
|
||||
|
||||
const client: EventStore = {
|
||||
supportedNips: [1],
|
||||
getEvents,
|
||||
storeEvent,
|
||||
countEvents: () => Promise.reject(new Error('COUNT not implemented')),
|
||||
deleteEvents: () => Promise.reject(new Error('Cannot delete events from relays. Create a kind 5 event instead.')),
|
||||
};
|
||||
|
||||
export { client };
|
|
@ -134,7 +134,7 @@ const accountStatusesController: AppController = async (c) => {
|
|||
const { pinned, limit, exclude_replies, tagged } = accountStatusesQuerySchema.parse(c.req.query());
|
||||
|
||||
if (pinned) {
|
||||
const [pinEvent] = await eventsDB.getEvents([{ kinds: [10001], authors: [pubkey], limit: 1 }]);
|
||||
const [pinEvent] = await eventsDB.filter([{ kinds: [10001], authors: [pubkey], limit: 1 }]);
|
||||
if (pinEvent) {
|
||||
const pinnedEventIds = getTagSet(pinEvent.tags, 'e');
|
||||
return renderStatuses(c, [...pinnedEventIds].reverse());
|
||||
|
@ -156,7 +156,7 @@ const accountStatusesController: AppController = async (c) => {
|
|||
filter['#t'] = [tagged];
|
||||
}
|
||||
|
||||
let events = await eventsDB.getEvents([filter]);
|
||||
let events = await eventsDB.filter([filter]);
|
||||
|
||||
if (exclude_replies) {
|
||||
events = events.filter((event) => !findReplyTag(event));
|
||||
|
@ -293,7 +293,7 @@ const favouritesController: AppController = async (c) => {
|
|||
const pubkey = c.get('pubkey')!;
|
||||
const params = paginationSchema.parse(c.req.query());
|
||||
|
||||
const events7 = await eventsDB.getEvents(
|
||||
const events7 = await eventsDB.filter(
|
||||
[{ kinds: [7], authors: [pubkey], ...params }],
|
||||
{ signal: AbortSignal.timeout(1000) },
|
||||
);
|
||||
|
@ -302,7 +302,7 @@ const favouritesController: AppController = async (c) => {
|
|||
.map((event) => event.tags.find((tag) => tag[0] === 'e')?.[1])
|
||||
.filter((id): id is string => !!id);
|
||||
|
||||
const events1 = await eventsDB.getEvents(
|
||||
const events1 = await eventsDB.filter(
|
||||
[{ kinds: [1], ids, relations: ['author', 'event_stats', 'author_stats'] }],
|
||||
{
|
||||
signal: AbortSignal.timeout(1000),
|
||||
|
|
|
@ -39,9 +39,9 @@ const adminAccountsController: AppController = async (c) => {
|
|||
|
||||
const { since, until, limit } = paginationSchema.parse(c.req.query());
|
||||
|
||||
const events = await eventsDB.getEvents([{ kinds: [30361], authors: [Conf.pubkey], since, until, limit }]);
|
||||
const events = await eventsDB.filter([{ kinds: [30361], authors: [Conf.pubkey], since, until, limit }]);
|
||||
const pubkeys = events.map((event) => event.tags.find(([name]) => name === 'd')?.[1]!);
|
||||
const authors = await eventsDB.getEvents([{ kinds: [0], authors: pubkeys }]);
|
||||
const authors = await eventsDB.filter([{ kinds: [0], authors: pubkeys }]);
|
||||
|
||||
for (const event of events) {
|
||||
const d = event.tags.find(([name]) => name === 'd')?.[1];
|
||||
|
|
|
@ -7,7 +7,7 @@ import { renderAccounts } from '@/views.ts';
|
|||
const blocksController: AppController = async (c) => {
|
||||
const pubkey = c.get('pubkey')!;
|
||||
|
||||
const [event10000] = await eventsDB.getEvents([
|
||||
const [event10000] = await eventsDB.filter([
|
||||
{ kinds: [10000], authors: [pubkey], limit: 1 },
|
||||
]);
|
||||
|
||||
|
|
|
@ -7,7 +7,7 @@ import { renderStatuses } from '@/views.ts';
|
|||
const bookmarksController: AppController = async (c) => {
|
||||
const pubkey = c.get('pubkey')!;
|
||||
|
||||
const [event10003] = await eventsDB.getEvents([
|
||||
const [event10003] = await eventsDB.filter([
|
||||
{ kinds: [10003], authors: [pubkey], limit: 1 },
|
||||
]);
|
||||
|
||||
|
|
|
@ -7,7 +7,7 @@ const notificationsController: AppController = async (c) => {
|
|||
const pubkey = c.get('pubkey')!;
|
||||
const { since, until } = paginationSchema.parse(c.req.query());
|
||||
|
||||
const events = await eventsDB.getEvents(
|
||||
const events = await eventsDB.filter(
|
||||
[{ kinds: [1], '#p': [pubkey], since, until }],
|
||||
{ signal: AbortSignal.timeout(3000) },
|
||||
);
|
||||
|
|
|
@ -6,7 +6,7 @@ import { createAdminEvent } from '@/utils/api.ts';
|
|||
import { Conf } from '@/config.ts';
|
||||
|
||||
const frontendConfigController: AppController = async (c) => {
|
||||
const [event] = await eventsDB.getEvents([{
|
||||
const [event] = await eventsDB.filter([{
|
||||
kinds: [30078],
|
||||
authors: [Conf.pubkey],
|
||||
'#d': ['pub.ditto.frontendConfig'],
|
||||
|
|
|
@ -78,7 +78,7 @@ function searchEvents({ q, type, limit, account_id }: SearchQuery, signal: Abort
|
|||
filter.authors = [account_id];
|
||||
}
|
||||
|
||||
return searchStore.getEvents([filter], { signal });
|
||||
return searchStore.filter([filter], { signal });
|
||||
}
|
||||
|
||||
/** Get event kinds to search from `type` query param. */
|
||||
|
@ -96,7 +96,7 @@ function typeToKinds(type: SearchQuery['type']): number[] {
|
|||
/** Resolve a searched value into an event, if applicable. */
|
||||
async function lookupEvent(query: SearchQuery, signal: AbortSignal): Promise<Event | undefined> {
|
||||
const filters = await getLookupFilters(query);
|
||||
const [event] = await searchStore.getEvents(filters, { limit: 1, signal });
|
||||
const [event] = await searchStore.filter(filters, { limit: 1, signal });
|
||||
return event;
|
||||
}
|
||||
|
||||
|
|
|
@ -33,7 +33,7 @@ const hashtagTimelineController: AppController = (c) => {
|
|||
|
||||
/** Render statuses for timelines. */
|
||||
async function renderStatuses(c: AppContext, filters: DittoFilter<1>[], signal = AbortSignal.timeout(1000)) {
|
||||
const events = await eventsDB.getEvents(
|
||||
const events = await eventsDB.filter(
|
||||
filters.map((filter) => ({ ...filter, relations: ['author', 'event_stats', 'author_stats'] })),
|
||||
{ signal },
|
||||
);
|
||||
|
|
|
@ -63,7 +63,7 @@ function connectStream(socket: WebSocket) {
|
|||
async function handleReq([_, subId, ...rest]: ClientREQ): Promise<void> {
|
||||
const filters = prepareFilters(rest);
|
||||
|
||||
for (const event of await eventsDB.getEvents(filters, { limit: FILTER_LIMIT })) {
|
||||
for (const event of await eventsDB.filter(filters, { limit: FILTER_LIMIT })) {
|
||||
send(['EVENT', subId, event]);
|
||||
}
|
||||
|
||||
|
@ -96,7 +96,7 @@ function connectStream(socket: WebSocket) {
|
|||
|
||||
/** Handle COUNT. Return the number of events matching the filters. */
|
||||
async function handleCount([_, subId, ...rest]: ClientCOUNT): Promise<void> {
|
||||
const count = await eventsDB.countEvents(prepareFilters(rest));
|
||||
const count = await eventsDB.count(prepareFilters(rest));
|
||||
send(['COUNT', subId, { count, approximate: false }]);
|
||||
}
|
||||
|
||||
|
|
|
@ -65,7 +65,7 @@ async function findUser(user: Partial<User>): Promise<User | undefined> {
|
|||
}
|
||||
}
|
||||
|
||||
const [event] = await eventsDB.getEvents([filter]);
|
||||
const [event] = await eventsDB.filter([filter]);
|
||||
|
||||
if (event) {
|
||||
return {
|
||||
|
|
|
@ -1,4 +1,3 @@
|
|||
import { client } from '@/client.ts';
|
||||
import { Conf } from '@/config.ts';
|
||||
import { addRelays } from '@/db/relays.ts';
|
||||
import { deleteAttachedMedia } from '@/db/unattached-media.ts';
|
||||
|
@ -6,9 +5,8 @@ import { findUser } from '@/db/users.ts';
|
|||
import { Debug, type Event } from '@/deps.ts';
|
||||
import { isEphemeralKind } from '@/kinds.ts';
|
||||
import { isLocallyFollowed } from '@/queries.ts';
|
||||
import { reqmeister } from '@/reqmeister.ts';
|
||||
import { updateStats } from '@/stats.ts';
|
||||
import { eventsDB, memorelay } from '@/storages.ts';
|
||||
import { client, eventsDB, memorelay, reqmeister } from '@/storages.ts';
|
||||
import { Sub } from '@/subs.ts';
|
||||
import { getTagSet } from '@/tags.ts';
|
||||
import { type EventData } from '@/types.ts';
|
||||
|
@ -43,9 +41,9 @@ async function handleEvent(event: Event): Promise<void> {
|
|||
|
||||
/** Encounter the event, and return whether it has already been encountered. */
|
||||
async function encounterEvent(event: Event): Promise<boolean> {
|
||||
const preexisting = (await memorelay.countEvents([{ ids: [event.id] }])) > 0;
|
||||
memorelay.storeEvent(event);
|
||||
reqmeister.encounter(event);
|
||||
const preexisting = (await memorelay.count([{ ids: [event.id] }])) > 0;
|
||||
memorelay.add(event);
|
||||
reqmeister.add(event);
|
||||
return preexisting;
|
||||
}
|
||||
|
||||
|
@ -68,7 +66,7 @@ async function storeEvent(event: Event, data: EventData, opts: StoreEventOpts =
|
|||
const { force = false } = opts;
|
||||
|
||||
if (force || data.user || isAdminEvent(event) || await isLocallyFollowed(event.pubkey)) {
|
||||
const [deletion] = await eventsDB.getEvents(
|
||||
const [deletion] = await eventsDB.filter(
|
||||
[{ kinds: [5], authors: [event.pubkey], '#e': [event.id], limit: 1 }],
|
||||
{ limit: 1 },
|
||||
);
|
||||
|
@ -77,7 +75,7 @@ async function storeEvent(event: Event, data: EventData, opts: StoreEventOpts =
|
|||
return Promise.reject(new RelayError('blocked', 'event was deleted'));
|
||||
} else {
|
||||
await Promise.all([
|
||||
eventsDB.storeEvent(event, { data }).catch(debug),
|
||||
eventsDB.add(event, { data }).catch(debug),
|
||||
updateStats(event).catch(debug),
|
||||
]);
|
||||
}
|
||||
|
@ -90,13 +88,13 @@ async function storeEvent(event: Event, data: EventData, opts: StoreEventOpts =
|
|||
async function processDeletions(event: Event): Promise<void> {
|
||||
if (event.kind === 5) {
|
||||
const ids = getTagSet(event.tags, 'e');
|
||||
const events = await eventsDB.getEvents([{ ids: [...ids] }]);
|
||||
const events = await eventsDB.filter([{ ids: [...ids] }]);
|
||||
|
||||
const deleteIds = events
|
||||
.filter(({ pubkey, id }) => pubkey === event.pubkey && ids.has(id))
|
||||
.map((event) => event.id);
|
||||
|
||||
await eventsDB.deleteEvents([{ ids: deleteIds }]);
|
||||
await eventsDB.deleteFilters([{ ids: deleteIds }]);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -141,7 +139,7 @@ function fetchRelatedEvents(event: Event, data: EventData) {
|
|||
reqmeister.req({ kinds: [0], authors: [event.pubkey] }).catch(() => {});
|
||||
}
|
||||
for (const [name, id, relay] of event.tags) {
|
||||
if (name === 'e' && !memorelay.countEvents([{ ids: [id] }])) {
|
||||
if (name === 'e' && !memorelay.count([{ ids: [id] }])) {
|
||||
reqmeister.req({ ids: [id] }, { relays: [relay] }).catch(() => {});
|
||||
}
|
||||
}
|
||||
|
@ -175,7 +173,7 @@ function broadcast(event: Event, data: EventData) {
|
|||
if (!data.user || !isFresh(event)) return;
|
||||
|
||||
if (event.kind === 5) {
|
||||
client.storeEvent(event);
|
||||
client.add(event);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
import { eventsDB, memorelay } from '@/storages.ts';
|
||||
import { eventsDB, memorelay, reqmeister } from '@/storages.ts';
|
||||
import { Debug, type Event, findReplyTag } from '@/deps.ts';
|
||||
import { type AuthorMicrofilter, type DittoFilter, type IdMicrofilter, type Relation } from '@/filter.ts';
|
||||
import { reqmeister } from '@/reqmeister.ts';
|
||||
import { type DittoEvent } from '@/storages/types.ts';
|
||||
import { getTagSet } from '@/tags.ts';
|
||||
|
||||
|
@ -25,7 +24,7 @@ const getEvent = async <K extends number = number>(
|
|||
const { kind, relations, signal = AbortSignal.timeout(1000) } = opts;
|
||||
const microfilter: IdMicrofilter = { ids: [id] };
|
||||
|
||||
const [memoryEvent] = await memorelay.getEvents([microfilter], opts) as DittoEvent<K>[];
|
||||
const [memoryEvent] = await memorelay.filter([microfilter], opts) as DittoEvent<K>[];
|
||||
|
||||
if (memoryEvent && !relations) {
|
||||
debug(`getEvent: ${id.slice(0, 8)} found in memory`);
|
||||
|
@ -37,13 +36,13 @@ const getEvent = async <K extends number = number>(
|
|||
filter.kinds = [kind];
|
||||
}
|
||||
|
||||
const dbEvent = await eventsDB.getEvents([filter], { limit: 1, signal })
|
||||
const dbEvent = await eventsDB.filter([filter], { limit: 1, signal })
|
||||
.then(([event]) => event);
|
||||
|
||||
// TODO: make this DRY-er.
|
||||
|
||||
if (dbEvent && !dbEvent.author) {
|
||||
const [author] = await memorelay.getEvents([{ kinds: [0], authors: [dbEvent.pubkey] }], opts);
|
||||
const [author] = await memorelay.filter([{ kinds: [0], authors: [dbEvent.pubkey] }], opts);
|
||||
dbEvent.author = author;
|
||||
}
|
||||
|
||||
|
@ -53,7 +52,7 @@ const getEvent = async <K extends number = number>(
|
|||
}
|
||||
|
||||
if (memoryEvent && !memoryEvent.author) {
|
||||
const [author] = await memorelay.getEvents([{ kinds: [0], authors: [memoryEvent.pubkey] }], opts);
|
||||
const [author] = await memorelay.filter([{ kinds: [0], authors: [memoryEvent.pubkey] }], opts);
|
||||
memoryEvent.author = author;
|
||||
}
|
||||
|
||||
|
@ -77,13 +76,13 @@ const getAuthor = async (pubkey: string, opts: GetEventOpts<0> = {}): Promise<Ev
|
|||
const { relations, signal = AbortSignal.timeout(1000) } = opts;
|
||||
const microfilter: AuthorMicrofilter = { kinds: [0], authors: [pubkey] };
|
||||
|
||||
const [memoryEvent] = await memorelay.getEvents([microfilter], opts);
|
||||
const [memoryEvent] = await memorelay.filter([microfilter], opts);
|
||||
|
||||
if (memoryEvent && !relations) {
|
||||
return memoryEvent;
|
||||
}
|
||||
|
||||
const dbEvent = await eventsDB.getEvents(
|
||||
const dbEvent = await eventsDB.filter(
|
||||
[{ authors: [pubkey], relations, kinds: [0], limit: 1 }],
|
||||
{ limit: 1, signal },
|
||||
).then(([event]) => event);
|
||||
|
@ -96,7 +95,7 @@ const getAuthor = async (pubkey: string, opts: GetEventOpts<0> = {}): Promise<Ev
|
|||
|
||||
/** Get users the given pubkey follows. */
|
||||
const getFollows = async (pubkey: string, signal?: AbortSignal): Promise<Event<3> | undefined> => {
|
||||
const [event] = await eventsDB.getEvents([{ authors: [pubkey], kinds: [3], limit: 1 }], { limit: 1, signal });
|
||||
const [event] = await eventsDB.filter([{ authors: [pubkey], kinds: [3], limit: 1 }], { limit: 1, signal });
|
||||
return event;
|
||||
};
|
||||
|
||||
|
@ -132,7 +131,7 @@ async function getAncestors(event: Event<1>, result = [] as Event<1>[]): Promise
|
|||
}
|
||||
|
||||
function getDescendants(eventId: string, signal = AbortSignal.timeout(2000)): Promise<Event<1>[]> {
|
||||
return eventsDB.getEvents(
|
||||
return eventsDB.filter(
|
||||
[{ kinds: [1], '#e': [eventId], relations: ['author', 'event_stats', 'author_stats'] }],
|
||||
{ limit: 200, signal },
|
||||
);
|
||||
|
@ -140,7 +139,7 @@ function getDescendants(eventId: string, signal = AbortSignal.timeout(2000)): Pr
|
|||
|
||||
/** Returns whether the pubkey is followed by a local user. */
|
||||
async function isLocallyFollowed(pubkey: string): Promise<boolean> {
|
||||
const [event] = await eventsDB.getEvents([{ kinds: [3], '#p': [pubkey], local: true, limit: 1 }], { limit: 1 });
|
||||
const [event] = await eventsDB.filter([{ kinds: [3], '#p': [pubkey], local: true, limit: 1 }], { limit: 1 });
|
||||
return Boolean(event);
|
||||
}
|
||||
|
||||
|
|
|
@ -125,7 +125,7 @@ function eventStatsQuery(diffs: EventStatDiff[]) {
|
|||
|
||||
/** Get the last version of the event, if any. */
|
||||
async function maybeGetPrev<K extends number>(event: Event<K>): Promise<Event<K>> {
|
||||
const [prev] = await eventsDB.getEvents([
|
||||
const [prev] = await eventsDB.filter([
|
||||
{ kinds: [event.kind], authors: [event.pubkey], limit: 1 },
|
||||
]);
|
||||
|
||||
|
|
|
@ -1,10 +1,21 @@
|
|||
import { Conf } from '@/config.ts';
|
||||
import { db } from '@/db.ts';
|
||||
import * as pipeline from '@/pipeline.ts';
|
||||
import { activeRelays, pool } from '@/pool.ts';
|
||||
import { EventsDB } from '@/storages/events-db.ts';
|
||||
import { Memorelay } from '@/storages/memorelay.ts';
|
||||
import { Optimizer } from '@/storages/optimizer.ts';
|
||||
import { PoolStore } from '@/storages/pool-store.ts';
|
||||
import { Reqmeister } from '@/storages/reqmeister.ts';
|
||||
import { SearchStore } from '@/storages/search-store.ts';
|
||||
import { reqmeister } from '@/reqmeister.ts';
|
||||
import { Time } from '@/utils/time.ts';
|
||||
|
||||
/** Relay pool storage. */
|
||||
const client = new PoolStore({
|
||||
pool,
|
||||
relays: activeRelays,
|
||||
publisher: pipeline,
|
||||
});
|
||||
|
||||
/** SQLite database to store events this Ditto server cares about. */
|
||||
const eventsDB = new EventsDB(db);
|
||||
|
@ -12,6 +23,13 @@ const eventsDB = new EventsDB(db);
|
|||
/** In-memory data store for cached events. */
|
||||
const memorelay = new Memorelay({ max: 3000 });
|
||||
|
||||
/** Batches requests for single events. */
|
||||
const reqmeister = new Reqmeister({
|
||||
client,
|
||||
delay: Time.seconds(1),
|
||||
timeout: Time.seconds(1),
|
||||
});
|
||||
|
||||
/** Main Ditto storage adapter */
|
||||
const optimizer = new Optimizer({
|
||||
db: eventsDB,
|
||||
|
@ -25,4 +43,4 @@ const searchStore = new SearchStore({
|
|||
fallback: optimizer,
|
||||
});
|
||||
|
||||
export { eventsDB, memorelay, optimizer, searchStore };
|
||||
export { client, eventsDB, memorelay, optimizer, reqmeister, searchStore };
|
||||
|
|
|
@ -10,37 +10,37 @@ import { EventsDB } from './events-db.ts';
|
|||
const eventsDB = new EventsDB(db);
|
||||
|
||||
Deno.test('count filters', async () => {
|
||||
assertEquals(await eventsDB.countEvents([{ kinds: [1] }]), 0);
|
||||
await eventsDB.storeEvent(event1);
|
||||
assertEquals(await eventsDB.countEvents([{ kinds: [1] }]), 1);
|
||||
assertEquals(await eventsDB.count([{ kinds: [1] }]), 0);
|
||||
await eventsDB.add(event1);
|
||||
assertEquals(await eventsDB.count([{ kinds: [1] }]), 1);
|
||||
});
|
||||
|
||||
Deno.test('insert and filter events', async () => {
|
||||
await eventsDB.storeEvent(event1);
|
||||
await eventsDB.add(event1);
|
||||
|
||||
assertEquals(await eventsDB.getEvents([{ kinds: [1] }]), [event1]);
|
||||
assertEquals(await eventsDB.getEvents([{ kinds: [3] }]), []);
|
||||
assertEquals(await eventsDB.getEvents([{ since: 1691091000 }]), [event1]);
|
||||
assertEquals(await eventsDB.getEvents([{ until: 1691091000 }]), []);
|
||||
assertEquals(await eventsDB.filter([{ kinds: [1] }]), [event1]);
|
||||
assertEquals(await eventsDB.filter([{ kinds: [3] }]), []);
|
||||
assertEquals(await eventsDB.filter([{ since: 1691091000 }]), [event1]);
|
||||
assertEquals(await eventsDB.filter([{ until: 1691091000 }]), []);
|
||||
assertEquals(
|
||||
await eventsDB.getEvents([{ '#proxy': ['https://gleasonator.com/objects/8f6fac53-4f66-4c6e-ac7d-92e5e78c3e79'] }]),
|
||||
await eventsDB.filter([{ '#proxy': ['https://gleasonator.com/objects/8f6fac53-4f66-4c6e-ac7d-92e5e78c3e79'] }]),
|
||||
[event1],
|
||||
);
|
||||
});
|
||||
|
||||
Deno.test('delete events', async () => {
|
||||
await eventsDB.storeEvent(event1);
|
||||
assertEquals(await eventsDB.getEvents([{ kinds: [1] }]), [event1]);
|
||||
await eventsDB.deleteEvents([{ kinds: [1] }]);
|
||||
assertEquals(await eventsDB.getEvents([{ kinds: [1] }]), []);
|
||||
await eventsDB.add(event1);
|
||||
assertEquals(await eventsDB.filter([{ kinds: [1] }]), [event1]);
|
||||
await eventsDB.deleteFilters([{ kinds: [1] }]);
|
||||
assertEquals(await eventsDB.filter([{ kinds: [1] }]), []);
|
||||
});
|
||||
|
||||
Deno.test('query events with local filter', async () => {
|
||||
await eventsDB.storeEvent(event1);
|
||||
await eventsDB.add(event1);
|
||||
|
||||
assertEquals(await eventsDB.getEvents([{}]), [event1]);
|
||||
assertEquals(await eventsDB.getEvents([{ local: true }]), []);
|
||||
assertEquals(await eventsDB.getEvents([{ local: false }]), [event1]);
|
||||
assertEquals(await eventsDB.filter([{}]), [event1]);
|
||||
assertEquals(await eventsDB.filter([{ local: true }]), []);
|
||||
assertEquals(await eventsDB.filter([{ local: false }]), [event1]);
|
||||
|
||||
const userEvent = await buildUserEvent({
|
||||
username: 'alex',
|
||||
|
@ -48,20 +48,20 @@ Deno.test('query events with local filter', async () => {
|
|||
inserted_at: new Date(),
|
||||
admin: false,
|
||||
});
|
||||
await eventsDB.storeEvent(userEvent);
|
||||
await eventsDB.add(userEvent);
|
||||
|
||||
assertEquals(await eventsDB.getEvents([{ kinds: [1], local: true }]), [event1]);
|
||||
assertEquals(await eventsDB.getEvents([{ kinds: [1], local: false }]), []);
|
||||
assertEquals(await eventsDB.filter([{ kinds: [1], local: true }]), [event1]);
|
||||
assertEquals(await eventsDB.filter([{ kinds: [1], local: false }]), []);
|
||||
});
|
||||
|
||||
Deno.test('inserting replaceable events', async () => {
|
||||
assertEquals(await eventsDB.countEvents([{ kinds: [0], authors: [event0.pubkey] }]), 0);
|
||||
assertEquals(await eventsDB.count([{ kinds: [0], authors: [event0.pubkey] }]), 0);
|
||||
|
||||
await eventsDB.storeEvent(event0);
|
||||
await assertRejects(() => eventsDB.storeEvent(event0));
|
||||
assertEquals(await eventsDB.countEvents([{ kinds: [0], authors: [event0.pubkey] }]), 1);
|
||||
await eventsDB.add(event0);
|
||||
await assertRejects(() => eventsDB.add(event0));
|
||||
assertEquals(await eventsDB.count([{ kinds: [0], authors: [event0.pubkey] }]), 1);
|
||||
|
||||
const changeEvent = { ...event0, id: '123', created_at: event0.created_at + 1 };
|
||||
await eventsDB.storeEvent(changeEvent);
|
||||
assertEquals(await eventsDB.getEvents([{ kinds: [0] }]), [changeEvent]);
|
||||
await eventsDB.add(changeEvent);
|
||||
assertEquals(await eventsDB.filter([{ kinds: [0] }]), [changeEvent]);
|
||||
});
|
||||
|
|
|
@ -65,7 +65,7 @@ class EventsDB implements EventStore {
|
|||
}
|
||||
|
||||
/** Insert an event (and its tags) into the database. */
|
||||
async storeEvent(event: Event, opts: StoreEventOpts = {}): Promise<void> {
|
||||
async add(event: Event, opts: StoreEventOpts = {}): Promise<void> {
|
||||
this.#debug('EVENT', JSON.stringify(event));
|
||||
|
||||
if (isDittoInternalKind(event.kind) && event.pubkey !== Conf.pubkey) {
|
||||
|
@ -264,7 +264,7 @@ class EventsDB implements EventStore {
|
|||
}
|
||||
|
||||
/** Get events for filters from the database. */
|
||||
async getEvents<K extends number>(filters: DittoFilter<K>[], opts: GetEventsOpts = {}): Promise<DittoEvent<K>[]> {
|
||||
async filter<K extends number>(filters: DittoFilter<K>[], opts: GetEventsOpts = {}): Promise<DittoEvent<K>[]> {
|
||||
filters = normalizeFilters(filters); // Improves performance of `{ kinds: [0], authors: ['...'] }` queries.
|
||||
|
||||
if (opts.signal?.aborted) return Promise.resolve([]);
|
||||
|
@ -337,7 +337,7 @@ class EventsDB implements EventStore {
|
|||
}
|
||||
|
||||
/** Delete events based on filters from the database. */
|
||||
async deleteEvents<K extends number>(filters: DittoFilter<K>[]): Promise<void> {
|
||||
async deleteFilters<K extends number>(filters: DittoFilter<K>[]): Promise<void> {
|
||||
if (!filters.length) return Promise.resolve();
|
||||
this.#debug('DELETE', JSON.stringify(filters));
|
||||
|
||||
|
@ -345,7 +345,7 @@ class EventsDB implements EventStore {
|
|||
}
|
||||
|
||||
/** Get number of events that would be returned by filters. */
|
||||
async countEvents<K extends number>(filters: DittoFilter<K>[]): Promise<number> {
|
||||
async count<K extends number>(filters: DittoFilter<K>[]): Promise<number> {
|
||||
if (!filters.length) return Promise.resolve(0);
|
||||
this.#debug('COUNT', JSON.stringify(filters));
|
||||
const query = this.getEventsQuery(filters);
|
||||
|
|
|
@ -14,7 +14,7 @@ async function hydrateEvents<K extends number>(opts: HydrateEventOpts<K>): Promi
|
|||
|
||||
if (filters.some((filter) => filter.relations?.includes('author'))) {
|
||||
const pubkeys = new Set([...events].map((event) => event.pubkey));
|
||||
const authors = await storage.getEvents([{ kinds: [0], authors: [...pubkeys] }], { signal });
|
||||
const authors = await storage.filter([{ kinds: [0], authors: [...pubkeys] }], { signal });
|
||||
|
||||
for (const event of events) {
|
||||
event.author = authors.find((author) => author.pubkey === event.pubkey);
|
||||
|
|
|
@ -11,12 +11,12 @@ const memorelay = new Memorelay({
|
|||
});
|
||||
|
||||
Deno.test('memorelay', async () => {
|
||||
assertEquals(await memorelay.countEvents([{ ids: [event1.id] }]), 0);
|
||||
assertEquals(await memorelay.count([{ ids: [event1.id] }]), 0);
|
||||
|
||||
await memorelay.storeEvent(event1);
|
||||
await memorelay.add(event1);
|
||||
|
||||
assertEquals(await memorelay.countEvents([{ ids: [event1.id] }]), 1);
|
||||
assertEquals(await memorelay.count([{ ids: [event1.id] }]), 1);
|
||||
|
||||
const result = await memorelay.getEvents([{ ids: [event1.id] }]);
|
||||
const result = await memorelay.filter([{ ids: [event1.id] }]);
|
||||
assertEquals(result[0], event1);
|
||||
});
|
||||
|
|
|
@ -26,7 +26,7 @@ class Memorelay implements EventStore {
|
|||
}
|
||||
|
||||
/** Get events from memory. */
|
||||
getEvents<K extends number>(filters: Filter<K>[], opts: GetEventsOpts = {}): Promise<Event<K>[]> {
|
||||
filter<K extends number>(filters: Filter<K>[], opts: GetEventsOpts = {}): Promise<Event<K>[]> {
|
||||
filters = normalizeFilters(filters);
|
||||
|
||||
if (opts.signal?.aborted) return Promise.resolve([]);
|
||||
|
@ -91,20 +91,20 @@ class Memorelay implements EventStore {
|
|||
}
|
||||
|
||||
/** Insert an event into memory. */
|
||||
storeEvent(event: Event): Promise<void> {
|
||||
add(event: Event): Promise<void> {
|
||||
this.#cache.set(event.id, event);
|
||||
return Promise.resolve();
|
||||
}
|
||||
|
||||
/** Count events in memory for the filters. */
|
||||
async countEvents(filters: Filter[]): Promise<number> {
|
||||
const events = await this.getEvents(filters);
|
||||
async count(filters: Filter[]): Promise<number> {
|
||||
const events = await this.filter(filters);
|
||||
return events.length;
|
||||
}
|
||||
|
||||
/** Delete events from memory. */
|
||||
async deleteEvents(filters: Filter[]): Promise<void> {
|
||||
for (const event of await this.getEvents(filters)) {
|
||||
async deleteFilters(filters: Filter[]): Promise<void> {
|
||||
for (const event of await this.filter(filters)) {
|
||||
this.#cache.delete(event.id);
|
||||
}
|
||||
return Promise.resolve();
|
||||
|
|
|
@ -25,14 +25,14 @@ class Optimizer implements EventStore {
|
|||
this.#client = opts.client;
|
||||
}
|
||||
|
||||
async storeEvent(event: DittoEvent<number>, opts?: StoreEventOpts | undefined): Promise<void> {
|
||||
async add(event: DittoEvent<number>, opts?: StoreEventOpts | undefined): Promise<void> {
|
||||
await Promise.all([
|
||||
this.#db.storeEvent(event, opts),
|
||||
this.#cache.storeEvent(event, opts),
|
||||
this.#db.add(event, opts),
|
||||
this.#cache.add(event, opts),
|
||||
]);
|
||||
}
|
||||
|
||||
async getEvents<K extends number>(
|
||||
async filter<K extends number>(
|
||||
filters: DittoFilter<K>[],
|
||||
opts: GetEventsOpts | undefined = {},
|
||||
): Promise<DittoEvent<K>[]> {
|
||||
|
@ -52,7 +52,7 @@ class Optimizer implements EventStore {
|
|||
if (filter.ids) {
|
||||
this.#debug(`Filter[${i}] is an IDs filter; querying cache...`);
|
||||
const ids = new Set<string>(filter.ids);
|
||||
for (const event of await this.#cache.getEvents([filter], opts)) {
|
||||
for (const event of await this.#cache.filter([filter], opts)) {
|
||||
ids.delete(event.id);
|
||||
results.add(event);
|
||||
if (results.size >= limit) return getResults();
|
||||
|
@ -66,7 +66,7 @@ class Optimizer implements EventStore {
|
|||
|
||||
// Query the database for events.
|
||||
this.#debug('Querying database...');
|
||||
for (const dbEvent of await this.#db.getEvents(filters, opts)) {
|
||||
for (const dbEvent of await this.#db.filter(filters, opts)) {
|
||||
results.add(dbEvent);
|
||||
if (results.size >= limit) return getResults();
|
||||
}
|
||||
|
@ -79,14 +79,14 @@ class Optimizer implements EventStore {
|
|||
|
||||
// Query the cache again.
|
||||
this.#debug('Querying cache...');
|
||||
for (const cacheEvent of await this.#cache.getEvents(filters, opts)) {
|
||||
for (const cacheEvent of await this.#cache.filter(filters, opts)) {
|
||||
results.add(cacheEvent);
|
||||
if (results.size >= limit) return getResults();
|
||||
}
|
||||
|
||||
// Finally, query the client.
|
||||
this.#debug('Querying client...');
|
||||
for (const clientEvent of await this.#client.getEvents(filters, opts)) {
|
||||
for (const clientEvent of await this.#client.filter(filters, opts)) {
|
||||
results.add(clientEvent);
|
||||
if (results.size >= limit) return getResults();
|
||||
}
|
||||
|
|
|
@ -0,0 +1,92 @@
|
|||
import { Debug, type Event, type Filter, matchFilters, type RelayPoolWorker } from '@/deps.ts';
|
||||
import { normalizeFilters } from '@/filter.ts';
|
||||
import { type EventStore, type GetEventsOpts, type StoreEventOpts } from '@/storages/types.ts';
|
||||
import { EventSet } from '@/utils/event-set.ts';
|
||||
|
||||
interface PoolStoreOpts {
|
||||
pool: InstanceType<typeof RelayPoolWorker>;
|
||||
relays: WebSocket['url'][];
|
||||
publisher: {
|
||||
handleEvent(event: Event): Promise<void>;
|
||||
};
|
||||
}
|
||||
|
||||
class PoolStore implements EventStore {
|
||||
#debug = Debug('ditto:client');
|
||||
#pool: InstanceType<typeof RelayPoolWorker>;
|
||||
#relays: WebSocket['url'][];
|
||||
#publisher: {
|
||||
handleEvent(event: Event): Promise<void>;
|
||||
};
|
||||
|
||||
supportedNips = [1];
|
||||
|
||||
constructor(opts: PoolStoreOpts) {
|
||||
this.#pool = opts.pool;
|
||||
this.#relays = opts.relays;
|
||||
this.#publisher = opts.publisher;
|
||||
}
|
||||
|
||||
add(event: Event, opts: StoreEventOpts = {}): Promise<void> {
|
||||
const { relays = this.#relays } = opts;
|
||||
this.#debug('EVENT', event);
|
||||
this.#pool.publish(event, relays);
|
||||
return Promise.resolve();
|
||||
}
|
||||
|
||||
filter<K extends number>(filters: Filter<K>[], opts: GetEventsOpts = {}): Promise<Event<K>[]> {
|
||||
filters = normalizeFilters(filters);
|
||||
|
||||
if (opts.signal?.aborted) return Promise.resolve([]);
|
||||
if (!filters.length) return Promise.resolve([]);
|
||||
|
||||
this.#debug('REQ', JSON.stringify(filters));
|
||||
|
||||
return new Promise((resolve) => {
|
||||
const results = new EventSet<Event<K>>();
|
||||
|
||||
const unsub = this.#pool.subscribe(
|
||||
filters,
|
||||
opts.relays ?? this.#relays,
|
||||
(event: Event | null) => {
|
||||
if (event && matchFilters(filters, event)) {
|
||||
this.#publisher.handleEvent(event).catch(() => {});
|
||||
results.add({
|
||||
id: event.id,
|
||||
kind: event.kind as K,
|
||||
pubkey: event.pubkey,
|
||||
content: event.content,
|
||||
tags: event.tags,
|
||||
created_at: event.created_at,
|
||||
sig: event.sig,
|
||||
});
|
||||
}
|
||||
if (typeof opts.limit === 'number' && results.size >= opts.limit) {
|
||||
unsub();
|
||||
resolve([...results]);
|
||||
}
|
||||
},
|
||||
undefined,
|
||||
() => {
|
||||
unsub();
|
||||
resolve([...results]);
|
||||
},
|
||||
);
|
||||
|
||||
opts.signal?.addEventListener('abort', () => {
|
||||
unsub();
|
||||
resolve([...results]);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
count() {
|
||||
return Promise.reject(new Error('COUNT not implemented'));
|
||||
}
|
||||
|
||||
deleteFilters() {
|
||||
return Promise.reject(new Error('Cannot delete events from relays. Create a kind 5 event instead.'));
|
||||
}
|
||||
}
|
||||
|
||||
export { PoolStore };
|
|
@ -1,4 +1,3 @@
|
|||
import { client } from '@/client.ts';
|
||||
import { Debug, type Event, EventEmitter, type Filter } from '@/deps.ts';
|
||||
import {
|
||||
AuthorMicrofilter,
|
||||
|
@ -11,9 +10,8 @@ import {
|
|||
import { type EventStore, GetEventsOpts } from '@/storages/types.ts';
|
||||
import { Time } from '@/utils/time.ts';
|
||||
|
||||
const debug = Debug('ditto:reqmeister');
|
||||
|
||||
interface ReqmeisterOpts {
|
||||
client: EventStore;
|
||||
delay?: number;
|
||||
timeout?: number;
|
||||
}
|
||||
|
@ -27,6 +25,8 @@ type ReqmeisterQueueItem = [string, MicroFilter, WebSocket['url'][]];
|
|||
|
||||
/** Batches requests to Nostr relays using microfilters. */
|
||||
class Reqmeister extends EventEmitter<{ [filterId: string]: (event: Event) => any }> implements EventStore {
|
||||
#debug = Debug('ditto:reqmeister');
|
||||
|
||||
#opts: ReqmeisterOpts;
|
||||
#queue: ReqmeisterQueueItem[] = [];
|
||||
#promise!: Promise<void>;
|
||||
|
@ -34,7 +34,7 @@ class Reqmeister extends EventEmitter<{ [filterId: string]: (event: Event) => an
|
|||
|
||||
supportedNips = [];
|
||||
|
||||
constructor(opts: ReqmeisterOpts = {}) {
|
||||
constructor(opts: ReqmeisterOpts) {
|
||||
super();
|
||||
this.#opts = opts;
|
||||
this.#tick();
|
||||
|
@ -49,7 +49,7 @@ class Reqmeister extends EventEmitter<{ [filterId: string]: (event: Event) => an
|
|||
}
|
||||
|
||||
async #perform() {
|
||||
const { delay, timeout = Time.seconds(1) } = this.#opts;
|
||||
const { client, delay, timeout = Time.seconds(1) } = this.#opts;
|
||||
await new Promise((resolve) => setTimeout(resolve, delay));
|
||||
|
||||
const queue = this.#queue;
|
||||
|
@ -73,11 +73,11 @@ class Reqmeister extends EventEmitter<{ [filterId: string]: (event: Event) => an
|
|||
if (wantedAuthors.size) filters.push({ kinds: [0], authors: [...wantedAuthors] });
|
||||
|
||||
if (filters.length) {
|
||||
debug('REQ', JSON.stringify(filters));
|
||||
const events = await client.getEvents(filters, { signal: AbortSignal.timeout(timeout) });
|
||||
this.#debug('REQ', JSON.stringify(filters));
|
||||
const events = await client.filter(filters, { signal: AbortSignal.timeout(timeout) });
|
||||
|
||||
for (const event of events) {
|
||||
this.encounter(event);
|
||||
this.add(event);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -119,10 +119,11 @@ class Reqmeister extends EventEmitter<{ [filterId: string]: (event: Event) => an
|
|||
});
|
||||
}
|
||||
|
||||
encounter(event: Event): void {
|
||||
add(event: Event): Promise<void> {
|
||||
const filterId = getFilterId(eventToMicroFilter(event));
|
||||
this.#queue = this.#queue.filter(([id]) => id !== filterId);
|
||||
this.emit(filterId, event);
|
||||
return Promise.resolve();
|
||||
}
|
||||
|
||||
isWanted(event: Event): boolean {
|
||||
|
@ -130,7 +131,7 @@ class Reqmeister extends EventEmitter<{ [filterId: string]: (event: Event) => an
|
|||
return this.#queue.some(([id]) => id === filterId);
|
||||
}
|
||||
|
||||
getEvents<K extends number>(filters: Filter<K>[], opts?: GetEventsOpts | undefined): Promise<Event<K>[]> {
|
||||
filter<K extends number>(filters: Filter<K>[], opts?: GetEventsOpts | undefined): Promise<Event<K>[]> {
|
||||
if (opts?.signal?.aborted) return Promise.resolve([]);
|
||||
if (!filters.length) return Promise.resolve([]);
|
||||
|
||||
|
@ -144,23 +145,13 @@ class Reqmeister extends EventEmitter<{ [filterId: string]: (event: Event) => an
|
|||
return Promise.all(promises);
|
||||
}
|
||||
|
||||
storeEvent(event: Event): Promise<void> {
|
||||
this.encounter(event);
|
||||
return Promise.resolve();
|
||||
}
|
||||
|
||||
countEvents(_filters: Filter[]): Promise<number> {
|
||||
count(_filters: Filter[]): Promise<number> {
|
||||
throw new Error('COUNT not implemented.');
|
||||
}
|
||||
|
||||
deleteEvents(_filters: Filter[]): Promise<void> {
|
||||
deleteFilters(_filters: Filter[]): Promise<void> {
|
||||
throw new Error('DELETE not implemented.');
|
||||
}
|
||||
}
|
||||
|
||||
const reqmeister = new Reqmeister({
|
||||
delay: Time.seconds(1),
|
||||
timeout: Time.seconds(1),
|
||||
});
|
||||
|
||||
export { reqmeister };
|
||||
export { Reqmeister };
|
|
@ -30,11 +30,11 @@ class SearchStore implements EventStore {
|
|||
}
|
||||
}
|
||||
|
||||
storeEvent(_event: Event, _opts?: StoreEventOpts | undefined): Promise<void> {
|
||||
add(_event: Event, _opts?: StoreEventOpts | undefined): Promise<void> {
|
||||
throw new Error('EVENT not implemented.');
|
||||
}
|
||||
|
||||
async getEvents<K extends number>(
|
||||
async filter<K extends number>(
|
||||
filters: DittoFilter<K>[],
|
||||
opts?: GetEventsOpts | undefined,
|
||||
): Promise<DittoEvent<K>[]> {
|
||||
|
@ -69,15 +69,15 @@ class SearchStore implements EventStore {
|
|||
return hydrateEvents({ events: [...events], filters, storage: this.#hydrator, signal: opts?.signal });
|
||||
} else {
|
||||
this.#debug(`Searching for "${query}" locally...`);
|
||||
return this.#fallback.getEvents(filters, opts);
|
||||
return this.#fallback.filter(filters, opts);
|
||||
}
|
||||
}
|
||||
|
||||
countEvents<K extends number>(_filters: Filter<K>[]): Promise<number> {
|
||||
count<K extends number>(_filters: Filter<K>[]): Promise<number> {
|
||||
throw new Error('COUNT not implemented.');
|
||||
}
|
||||
|
||||
deleteEvents<K extends number>(_filters: Filter<K>[]): Promise<void> {
|
||||
deleteFilters<K extends number>(_filters: Filter<K>[]): Promise<void> {
|
||||
throw new Error('DELETE not implemented.');
|
||||
}
|
||||
}
|
||||
|
|
|
@ -38,13 +38,13 @@ interface EventStore {
|
|||
/** Indicates NIPs supported by this data store, similar to NIP-11. For example, `50` would indicate support for `search` filters. */
|
||||
supportedNips: readonly number[];
|
||||
/** Add an event to the store. */
|
||||
storeEvent(event: Event, opts?: StoreEventOpts): Promise<void>;
|
||||
add(event: Event, opts?: StoreEventOpts): Promise<void>;
|
||||
/** Get events from filters. */
|
||||
getEvents<K extends number>(filters: DittoFilter<K>[], opts?: GetEventsOpts): Promise<DittoEvent<K>[]>;
|
||||
filter<K extends number>(filters: DittoFilter<K>[], opts?: GetEventsOpts): Promise<DittoEvent<K>[]>;
|
||||
/** Get the number of events from filters. */
|
||||
countEvents<K extends number>(filters: DittoFilter<K>[]): Promise<number>;
|
||||
count?<K extends number>(filters: DittoFilter<K>[]): Promise<number>;
|
||||
/** Delete events from filters. */
|
||||
deleteEvents<K extends number>(filters: DittoFilter<K>[]): Promise<void>;
|
||||
deleteFilters?<K extends number>(filters: DittoFilter<K>[]): Promise<void>;
|
||||
}
|
||||
|
||||
export type { DittoEvent, EventStore, GetEventsOpts, StoreEventOpts };
|
||||
|
|
|
@ -51,7 +51,7 @@ async function updateEvent<K extends number, E extends EventStub<K>>(
|
|||
fn: (prev: Event<K> | undefined) => E,
|
||||
c: AppContext,
|
||||
): Promise<Event<K>> {
|
||||
const [prev] = await eventsDB.getEvents([filter], { limit: 1 });
|
||||
const [prev] = await eventsDB.filter([filter], { limit: 1 });
|
||||
return createEvent(fn(prev), c);
|
||||
}
|
||||
|
||||
|
|
|
@ -11,14 +11,14 @@ async function renderEventAccounts(c: AppContext, filters: Filter[], signal = Ab
|
|||
return c.json([]);
|
||||
}
|
||||
|
||||
const events = await eventsDB.getEvents(filters, { signal });
|
||||
const events = await eventsDB.filter(filters, { signal });
|
||||
const pubkeys = new Set(events.map(({ pubkey }) => pubkey));
|
||||
|
||||
if (!pubkeys.size) {
|
||||
return c.json([]);
|
||||
}
|
||||
|
||||
const authors = await eventsDB.getEvents(
|
||||
const authors = await eventsDB.filter(
|
||||
[{ kinds: [0], authors: [...pubkeys], relations: ['author_stats'] }],
|
||||
{ signal },
|
||||
);
|
||||
|
@ -33,7 +33,7 @@ async function renderEventAccounts(c: AppContext, filters: Filter[], signal = Ab
|
|||
async function renderAccounts(c: AppContext, authors: string[], signal = AbortSignal.timeout(1000)) {
|
||||
const { since, until, limit } = paginationSchema.parse(c.req.query());
|
||||
|
||||
const events = await eventsDB.getEvents(
|
||||
const events = await eventsDB.filter(
|
||||
[{ kinds: [0], authors, relations: ['author_stats'], since, until, limit }],
|
||||
{ signal },
|
||||
);
|
||||
|
@ -53,7 +53,7 @@ async function renderStatuses(c: AppContext, ids: string[], signal = AbortSignal
|
|||
|
||||
const { limit } = paginationSchema.parse(c.req.query());
|
||||
|
||||
const events = await eventsDB.getEvents(
|
||||
const events = await eventsDB.filter(
|
||||
[{ kinds: [1], ids, relations: ['author', 'event_stats', 'author_stats'], limit }],
|
||||
{ signal },
|
||||
);
|
||||
|
|
|
@ -2,7 +2,7 @@ import { eventsDB } from '@/storages.ts';
|
|||
import { hasTag } from '@/tags.ts';
|
||||
|
||||
async function renderRelationship(sourcePubkey: string, targetPubkey: string) {
|
||||
const events = await eventsDB.getEvents([
|
||||
const events = await eventsDB.filter([
|
||||
{ kinds: [3], authors: [sourcePubkey], limit: 1 },
|
||||
{ kinds: [3], authors: [targetPubkey], limit: 1 },
|
||||
{ kinds: [10000], authors: [sourcePubkey], limit: 1 },
|
||||
|
|
|
@ -35,7 +35,7 @@ async function renderStatus(event: DittoEvent<1>, viewerPubkey?: string) {
|
|||
Promise.all(mentionedPubkeys.map(toMention)),
|
||||
firstUrl ? unfurlCardCached(firstUrl) : null,
|
||||
viewerPubkey
|
||||
? await eventsDB.getEvents([
|
||||
? await eventsDB.filter([
|
||||
{ kinds: [6], '#e': [event.id], authors: [viewerPubkey], limit: 1 },
|
||||
{ kinds: [7], '#e': [event.id], authors: [viewerPubkey], limit: 1 },
|
||||
{ kinds: [10001], '#e': [event.id], authors: [viewerPubkey], limit: 1 },
|
||||
|
|
Loading…
Reference in New Issue