diff --git a/src/filter.ts b/src/filter.ts index d813c62..9cde0d1 100644 --- a/src/filter.ts +++ b/src/filter.ts @@ -1,19 +1,19 @@ import { type Event, matchFilters } from '@/deps.ts'; -import type { DittoFilter } from '@/types.ts'; - -interface EventData { - isLocal: boolean; -} +import type { DittoFilter, EventData } from '@/types.ts'; function matchDittoFilter(filter: DittoFilter, event: Event, data: EventData): boolean { - if (filter.local && !data.isLocal) { + if (filter.local && !data.user) { return false; } return matchFilters([filter], event); } +/** + * Similar to nostr-tools `matchFilters`, but supports Ditto's custom keys. + * Database calls are needed to look up the extra data, so it's passed in as an argument. + */ function matchDittoFilters(filters: DittoFilter[], event: Event, data: EventData): boolean { for (const filter of filters) { if (matchDittoFilter(filter, event, data)) { diff --git a/src/pipeline.ts b/src/pipeline.ts index 0507e75..770f1bd 100644 --- a/src/pipeline.ts +++ b/src/pipeline.ts @@ -7,22 +7,32 @@ import { Sub } from '@/subs.ts'; import { trends } from '@/trends.ts'; import { isRelay, nostrDate } from '@/utils.ts'; +import type { EventData } from '@/types.ts'; + /** * Common pipeline function to process (and maybe store) events. * It is idempotent, so it can be called multiple times for the same event. */ async function handleEvent(event: Event): Promise { + const data = await getEventData(event); + await Promise.all([ - storeEvent(event), + storeEvent(event, data), trackRelays(event), trackHashtags(event), - streamOut(event), + streamOut(event, data), ]); } +/** Preload data that will be useful to several tasks. */ +async function getEventData({ pubkey }: Event): Promise { + const user = await findUser({ pubkey }); + return { user }; +} + /** Maybe store the event, if eligible. */ -async function storeEvent(event: Event): Promise { - if (await findUser({ pubkey: event.pubkey }) || await isLocallyFollowed(event.pubkey)) { +async function storeEvent(event: Event, data: EventData): Promise { + if (data.user || await isLocallyFollowed(event.pubkey)) { await eventsDB.insertEvent(event).catch(console.warn); } else { return Promise.reject(new RelayError('blocked', 'only registered users can post')); @@ -66,9 +76,9 @@ function trackRelays(event: Event) { } /** Distribute the event through active subscriptions. */ -async function streamOut(event: Event) { - for await (const sub of Sub.matches(event)) { - sub.socket.send(JSON.stringify(['EVENT', event])); +function streamOut(event: Event, data: EventData) { + for (const { socket, id } of Sub.matches(event, data)) { + socket.send(JSON.stringify(['EVENT', id, event])); } } diff --git a/src/subs.ts b/src/subs.ts index afe265a..82041c6 100644 --- a/src/subs.ts +++ b/src/subs.ts @@ -1,8 +1,7 @@ import { type Event } from '@/deps.ts'; import { matchDittoFilters } from './filter.ts'; -import { isEventLocal } from '@/utils.ts'; -import type { DittoFilter } from '@/types.ts'; +import type { DittoFilter, EventData } from '@/types.ts'; /** Nostr subscription to receive realtime events. */ interface Subscription { @@ -20,7 +19,7 @@ interface Subscription { * Subscriptions can be added, removed, and matched against events. * * ```ts - * for await (const sub of Sub.matches(event)) { + * for (const sub of Sub.matches(event)) { * // Send event to sub.socket * sub.socket.send(JSON.stringify(event)); * } @@ -55,18 +54,16 @@ class SubscriptionStore { * Loop through matching subscriptions to stream out. * * ```ts - * for await (const sub of Sub.matches(event)) { + * for (const sub of Sub.matches(event)) { * // Send event to sub.socket * sub.socket.send(JSON.stringify(event)); * } * ``` */ - async *matches(event: Event) { - const isLocal = await isEventLocal(event); - + *matches(event: Event, data: EventData): Iterable { for (const subs of this.#store.values()) { for (const sub of subs.values()) { - if (matchDittoFilters(sub.filters, event, { isLocal })) { + if (matchDittoFilters(sub.filters, event, data)) { yield sub; } } diff --git a/src/types.ts b/src/types.ts index 096b847..beb184b 100644 --- a/src/types.ts +++ b/src/types.ts @@ -1,3 +1,4 @@ +import { UserRow } from '@/db.ts'; import { type Filter } from '@/deps.ts'; /** Custom filter interface that extends Nostr filters with extra options for Ditto. */ @@ -13,4 +14,8 @@ interface GetFiltersOpts { limit?: number; } -export type { DittoFilter, GetFiltersOpts }; +interface EventData { + user: UserRow | undefined; +} + +export type { DittoFilter, EventData, GetFiltersOpts }; diff --git a/src/utils.ts b/src/utils.ts index 6cd6dd4..138cc7a 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -102,18 +102,11 @@ function isFollowing(source: Event<3>, targetPubkey: string): boolean { ); } -/** Check whether the event belongs to a local user. */ -async function isEventLocal(event: Event) { - const user = await findUser({ pubkey: event.pubkey }); - return !!user; -} - export { bech32ToPubkey, eventAge, eventDateComparator, findTag, - isEventLocal, isFollowing, isRelay, lookupAccount,