pipeline: optimize database calls

This commit is contained in:
Alex Gleason 2023-08-24 17:00:08 -05:00
parent 658dd397f5
commit f1c465beea
No known key found for this signature in database
GPG Key ID: 7211D1F99744FBB7
5 changed files with 34 additions and 29 deletions

View File

@ -1,19 +1,19 @@
import { type Event, matchFilters } from '@/deps.ts'; import { type Event, matchFilters } from '@/deps.ts';
import type { DittoFilter } from '@/types.ts'; import type { DittoFilter, EventData } from '@/types.ts';
interface EventData {
isLocal: boolean;
}
function matchDittoFilter(filter: DittoFilter, event: Event, data: EventData): boolean { function matchDittoFilter(filter: DittoFilter, event: Event, data: EventData): boolean {
if (filter.local && !data.isLocal) { if (filter.local && !data.user) {
return false; return false;
} }
return matchFilters([filter], event); return matchFilters([filter], event);
} }
/**
* Similar to nostr-tools `matchFilters`, but supports Ditto's custom keys.
* Database calls are needed to look up the extra data, so it's passed in as an argument.
*/
function matchDittoFilters(filters: DittoFilter[], event: Event, data: EventData): boolean { function matchDittoFilters(filters: DittoFilter[], event: Event, data: EventData): boolean {
for (const filter of filters) { for (const filter of filters) {
if (matchDittoFilter(filter, event, data)) { if (matchDittoFilter(filter, event, data)) {

View File

@ -7,22 +7,32 @@ import { Sub } from '@/subs.ts';
import { trends } from '@/trends.ts'; import { trends } from '@/trends.ts';
import { isRelay, nostrDate } from '@/utils.ts'; import { isRelay, nostrDate } from '@/utils.ts';
import type { EventData } from '@/types.ts';
/** /**
* Common pipeline function to process (and maybe store) events. * Common pipeline function to process (and maybe store) events.
* It is idempotent, so it can be called multiple times for the same event. * It is idempotent, so it can be called multiple times for the same event.
*/ */
async function handleEvent(event: Event): Promise<void> { async function handleEvent(event: Event): Promise<void> {
const data = await getEventData(event);
await Promise.all([ await Promise.all([
storeEvent(event), storeEvent(event, data),
trackRelays(event), trackRelays(event),
trackHashtags(event), trackHashtags(event),
streamOut(event), streamOut(event, data),
]); ]);
} }
/** Preload data that will be useful to several tasks. */
async function getEventData({ pubkey }: Event): Promise<EventData> {
const user = await findUser({ pubkey });
return { user };
}
/** Maybe store the event, if eligible. */ /** Maybe store the event, if eligible. */
async function storeEvent(event: Event): Promise<void> { async function storeEvent(event: Event, data: EventData): Promise<void> {
if (await findUser({ pubkey: event.pubkey }) || await isLocallyFollowed(event.pubkey)) { if (data.user || await isLocallyFollowed(event.pubkey)) {
await eventsDB.insertEvent(event).catch(console.warn); await eventsDB.insertEvent(event).catch(console.warn);
} else { } else {
return Promise.reject(new RelayError('blocked', 'only registered users can post')); return Promise.reject(new RelayError('blocked', 'only registered users can post'));
@ -66,9 +76,9 @@ function trackRelays(event: Event) {
} }
/** Distribute the event through active subscriptions. */ /** Distribute the event through active subscriptions. */
async function streamOut(event: Event) { function streamOut(event: Event, data: EventData) {
for await (const sub of Sub.matches(event)) { for (const { socket, id } of Sub.matches(event, data)) {
sub.socket.send(JSON.stringify(['EVENT', event])); socket.send(JSON.stringify(['EVENT', id, event]));
} }
} }

View File

@ -1,8 +1,7 @@
import { type Event } from '@/deps.ts'; import { type Event } from '@/deps.ts';
import { matchDittoFilters } from './filter.ts'; import { matchDittoFilters } from './filter.ts';
import { isEventLocal } from '@/utils.ts';
import type { DittoFilter } from '@/types.ts'; import type { DittoFilter, EventData } from '@/types.ts';
/** Nostr subscription to receive realtime events. */ /** Nostr subscription to receive realtime events. */
interface Subscription { interface Subscription {
@ -20,7 +19,7 @@ interface Subscription {
* Subscriptions can be added, removed, and matched against events. * Subscriptions can be added, removed, and matched against events.
* *
* ```ts * ```ts
* for await (const sub of Sub.matches(event)) { * for (const sub of Sub.matches(event)) {
* // Send event to sub.socket * // Send event to sub.socket
* sub.socket.send(JSON.stringify(event)); * sub.socket.send(JSON.stringify(event));
* } * }
@ -55,18 +54,16 @@ class SubscriptionStore {
* Loop through matching subscriptions to stream out. * Loop through matching subscriptions to stream out.
* *
* ```ts * ```ts
* for await (const sub of Sub.matches(event)) { * for (const sub of Sub.matches(event)) {
* // Send event to sub.socket * // Send event to sub.socket
* sub.socket.send(JSON.stringify(event)); * sub.socket.send(JSON.stringify(event));
* } * }
* ``` * ```
*/ */
async *matches(event: Event) { *matches(event: Event, data: EventData): Iterable<Subscription> {
const isLocal = await isEventLocal(event);
for (const subs of this.#store.values()) { for (const subs of this.#store.values()) {
for (const sub of subs.values()) { for (const sub of subs.values()) {
if (matchDittoFilters(sub.filters, event, { isLocal })) { if (matchDittoFilters(sub.filters, event, data)) {
yield sub; yield sub;
} }
} }

View File

@ -1,3 +1,4 @@
import { UserRow } from '@/db.ts';
import { type Filter } from '@/deps.ts'; import { type Filter } from '@/deps.ts';
/** Custom filter interface that extends Nostr filters with extra options for Ditto. */ /** Custom filter interface that extends Nostr filters with extra options for Ditto. */
@ -13,4 +14,8 @@ interface GetFiltersOpts {
limit?: number; limit?: number;
} }
export type { DittoFilter, GetFiltersOpts }; interface EventData {
user: UserRow | undefined;
}
export type { DittoFilter, EventData, GetFiltersOpts };

View File

@ -102,18 +102,11 @@ function isFollowing(source: Event<3>, targetPubkey: string): boolean {
); );
} }
/** Check whether the event belongs to a local user. */
async function isEventLocal(event: Event) {
const user = await findUser({ pubkey: event.pubkey });
return !!user;
}
export { export {
bech32ToPubkey, bech32ToPubkey,
eventAge, eventAge,
eventDateComparator, eventDateComparator,
findTag, findTag,
isEventLocal,
isFollowing, isFollowing,
isRelay, isRelay,
lookupAccount, lookupAccount,