diff --git a/src/filter.ts b/src/filter.ts index f59d6c1..7f641bb 100644 --- a/src/filter.ts +++ b/src/filter.ts @@ -1,8 +1,8 @@ import { Conf } from '@/config.ts'; import { type Event, type Filter, matchFilters, stringifyStable, z } from '@/deps.ts'; -import { nostrIdSchema } from '@/schemas/nostr.ts'; -import { type EventData } from '@/types.ts'; import { isReplaceableKind } from '@/kinds.ts'; +import { nostrIdSchema } from '@/schemas/nostr.ts'; +import { type DittoEvent } from '@/storages/types.ts'; /** Additional properties that may be added by Ditto to events. */ type Relation = 'author' | 'author_stats' | 'event_stats'; @@ -22,8 +22,8 @@ type AuthorMicrofilter = { kinds: [0]; authors: [Event['pubkey']] }; /** Filter to get one specific event. */ type MicroFilter = IdMicrofilter | AuthorMicrofilter; -function matchDittoFilter(filter: DittoFilter, event: Event, data: EventData): boolean { - if (filter.local && !(data.user || event.pubkey === Conf.pubkey)) { +function matchDittoFilter(filter: DittoFilter, event: DittoEvent): boolean { + if (filter.local && !(event.user || event.pubkey === Conf.pubkey)) { return false; } @@ -34,9 +34,9 @@ function matchDittoFilter(filter: DittoFilter, event: Event, data: EventData): b * Similar to nostr-tools `matchFilters`, but supports Ditto's custom keys. * Database calls are needed to look up the extra data, so it's passed in as an argument. */ -function matchDittoFilters(filters: DittoFilter[], event: Event, data: EventData): boolean { +function matchDittoFilters(filters: DittoFilter[], event: DittoEvent): boolean { for (const filter of filters) { - if (matchDittoFilter(filter, event, data)) { + if (matchDittoFilter(filter, event)) { return true; } } diff --git a/src/pipeline.ts b/src/pipeline.ts index b70550a..d29b2d0 100644 --- a/src/pipeline.ts +++ b/src/pipeline.ts @@ -2,7 +2,6 @@ import { Conf } from '@/config.ts'; import { encryptAdmin } from '@/crypto.ts'; import { addRelays } from '@/db/relays.ts'; import { deleteAttachedMedia } from '@/db/unattached-media.ts'; -import { findUser } from '@/db/users.ts'; import { Debug, type Event, LNURL } from '@/deps.ts'; import { isEphemeralKind } from '@/kinds.ts'; import { isLocallyFollowed } from '@/queries.ts'; @@ -10,13 +9,13 @@ import { updateStats } from '@/stats.ts'; import { client, eventsDB, memorelay, reqmeister } from '@/storages.ts'; import { Sub } from '@/subs.ts'; import { getTagSet } from '@/tags.ts'; -import { type EventData } from '@/types.ts'; import { eventAge, isRelay, nostrDate, nostrNow, Time } from '@/utils.ts'; import { fetchWorker } from '@/workers/fetch.ts'; import { TrendsWorker } from '@/workers/trends.ts'; import { verifySignatureWorker } from '@/workers/verify.ts'; import { signAdminEvent } from '@/sign.ts'; import { lnurlCache } from '@/utils/lnurl.ts'; +import { DittoEvent } from '@/storages/types.ts'; const debug = Debug('ditto:pipeline'); @@ -24,24 +23,24 @@ const debug = Debug('ditto:pipeline'); * Common pipeline function to process (and maybe store) events. * It is idempotent, so it can be called multiple times for the same event. */ -async function handleEvent(event: Event): Promise { +async function handleEvent(event: DittoEvent): Promise { const signal = AbortSignal.timeout(5000); if (!(await verifySignatureWorker(event))) return; const wanted = reqmeister.isWanted(event); if (await encounterEvent(event)) return; debug(`Event<${event.kind}> ${event.id}`); - const data = await getEventData(event); + await hydrateEvent(event); await Promise.all([ - storeEvent(event, data, { force: wanted }), + storeEvent(event, { force: wanted }), processDeletions(event), trackRelays(event), trackHashtags(event), - fetchRelatedEvents(event, data, signal), - processMedia(event, data), - payZap(event, data, signal), - streamOut(event, data), - broadcast(event, data), + fetchRelatedEvents(event, signal), + processMedia(event), + payZap(event, signal), + streamOut(event), + broadcast(event), ]); } @@ -53,10 +52,10 @@ async function encounterEvent(event: Event): Promise { return preexisting; } -/** Preload data that will be useful to several tasks. */ -async function getEventData({ pubkey }: Event): Promise { - const user = await findUser({ pubkey }); - return { user }; +/** Hydrate the event with the user, if applicable. */ +async function hydrateEvent(event: DittoEvent): Promise { + const [user] = await eventsDB.filter([{ kinds: [30361], authors: [Conf.pubkey], limit: 1 }]); + event.user = user; } /** Check if the pubkey is the `DITTO_NSEC` pubkey. */ @@ -67,11 +66,11 @@ interface StoreEventOpts { } /** Maybe store the event, if eligible. */ -async function storeEvent(event: Event, data: EventData, opts: StoreEventOpts = {}): Promise { +async function storeEvent(event: DittoEvent, opts: StoreEventOpts = {}): Promise { if (isEphemeralKind(event.kind)) return; const { force = false } = opts; - if (force || data.user || isAdminEvent(event) || await isLocallyFollowed(event.pubkey)) { + if (force || event.user || isAdminEvent(event) || await isLocallyFollowed(event.pubkey)) { const isDeleted = await eventsDB.count( [{ kinds: [5], authors: [Conf.pubkey, event.pubkey], '#e': [event.id], limit: 1 }], ) > 0; @@ -80,7 +79,7 @@ async function storeEvent(event: Event, data: EventData, opts: StoreEventOpts = return Promise.reject(new RelayError('blocked', 'event was deleted')); } else { await Promise.all([ - eventsDB.add(event, { data }).catch(debug), + eventsDB.add(event).catch(debug), updateStats(event).catch(debug), ]); } @@ -144,8 +143,8 @@ function trackRelays(event: Event) { } /** Queue related events to fetch. */ -function fetchRelatedEvents(event: Event, data: EventData, signal: AbortSignal) { - if (!data.user) { +function fetchRelatedEvents(event: DittoEvent, signal: AbortSignal) { + if (!event.user) { reqmeister.req({ kinds: [0], authors: [event.pubkey] }, { signal }).catch(() => {}); } for (const [name, id, relay] of event.tags) { @@ -156,7 +155,7 @@ function fetchRelatedEvents(event: Event, data: EventData, signal: AbortSignal) } /** Delete unattached media entries that are attached to the event. */ -function processMedia({ tags, pubkey }: Event, { user }: EventData) { +function processMedia({ tags, pubkey, user }: DittoEvent) { if (user) { const urls = getTagSet(tags, 'media'); return deleteAttachedMedia(pubkey, [...urls]); @@ -164,8 +163,8 @@ function processMedia({ tags, pubkey }: Event, { user }: EventData) { } /** Emit Nostr Wallet Connect event from zaps so users may pay. */ -async function payZap(event: Event, data: EventData, signal: AbortSignal) { - if (event.kind !== 9734 || !data.user) return; +async function payZap(event: DittoEvent, signal: AbortSignal) { + if (event.kind !== 9734 || !event.user) return; const lnurl = event.tags.find(([name]) => name === 'lnurl')?.[1]; const amount = Number(event.tags.find(([name]) => name === 'amount')?.[1]); @@ -212,10 +211,10 @@ async function payZap(event: Event, data: EventData, signal: AbortSignal) { const isFresh = (event: Event): boolean => eventAge(event) < Time.seconds(10); /** Distribute the event through active subscriptions. */ -function streamOut(event: Event, data: EventData) { +function streamOut(event: Event) { if (!isFresh(event)) return; - for (const sub of Sub.matches(event, data)) { + for (const sub of Sub.matches(event)) { sub.stream(event); } } @@ -224,8 +223,8 @@ function streamOut(event: Event, data: EventData) { * Publish the event to other relays. * This should only be done in certain circumstances, like mentioning a user or publishing deletions. */ -function broadcast(event: Event, data: EventData) { - if (!data.user || !isFresh(event)) return; +function broadcast(event: DittoEvent) { + if (!event.user || !isFresh(event)) return; if (event.kind === 5) { client.add(event); diff --git a/src/storages/events-db.ts b/src/storages/events-db.ts index a2fec88..c24e08d 100644 --- a/src/storages/events-db.ts +++ b/src/storages/events-db.ts @@ -6,12 +6,11 @@ import { isDittoInternalKind, isParameterizedReplaceableKind, isReplaceableKind import { jsonMetaContentSchema } from '@/schemas/nostr.ts'; import { isNostrId, isURL } from '@/utils.ts'; -import { type DittoEvent, EventStore, type GetEventsOpts, type StoreEventOpts } from './types.ts'; +import { type DittoEvent, EventStore, type GetEventsOpts } from './types.ts'; /** Function to decide whether or not to index a tag. */ type TagCondition = ({ event, count, value }: { - event: Event; - opts: StoreEventOpts; + event: DittoEvent; count: number; value: string; }) => boolean; @@ -19,8 +18,8 @@ type TagCondition = ({ event, count, value }: { /** Conditions for when to index certain tags. */ const tagConditions: Record = { 'd': ({ event, count }) => count === 0 && isParameterizedReplaceableKind(event.kind), - 'e': ({ event, count, value, opts }) => ((opts.data?.user && event.kind === 10003) || count < 15) && isNostrId(value), - 'media': ({ count, value, opts }) => (opts.data?.user || count < 4) && isURL(value), + 'e': ({ event, count, value }) => ((event.user && event.kind === 10003) || count < 15) && isNostrId(value), + 'media': ({ event, count, value }) => (event.user || count < 4) && isURL(value), 'P': ({ event, count, value }) => event.kind === 9735 && count === 0 && isNostrId(value), 'p': ({ event, count, value }) => (count < 15 || event.kind === 3) && isNostrId(value), 'proxy': ({ count, value }) => count === 0 && isURL(value), @@ -66,7 +65,7 @@ class EventsDB implements EventStore { } /** Insert an event (and its tags) into the database. */ - async add(event: Event, opts: StoreEventOpts = {}): Promise { + async add(event: DittoEvent): Promise { this.#debug('EVENT', JSON.stringify(event)); if (isDittoInternalKind(event.kind) && event.pubkey !== Conf.pubkey) { @@ -92,7 +91,7 @@ class EventsDB implements EventStore { /** Index event tags depending on the conditions defined above. */ async function indexTags() { - const tags = filterIndexableTags(event, opts); + const tags = filterIndexableTags(event); const rows = tags.map(([tag, value]) => ({ event_id: event.id, tag, value })); if (!tags.length) return; @@ -361,7 +360,7 @@ class EventsDB implements EventStore { } /** Return only the tags that should be indexed. */ -function filterIndexableTags(event: Event, opts: StoreEventOpts): string[][] { +function filterIndexableTags(event: DittoEvent): string[][] { const tagCounts: Record = {}; function getCount(name: string) { @@ -375,7 +374,6 @@ function filterIndexableTags(event: Event, opts: StoreEventOpts): string[][] { function checkCondition(name: string, value: string, condition: TagCondition) { return condition({ event, - opts, count: getCount(name), value, }); diff --git a/src/storages/types.ts b/src/storages/types.ts index 65a3725..2966464 100644 --- a/src/storages/types.ts +++ b/src/storages/types.ts @@ -1,7 +1,6 @@ import { type DittoDB } from '@/db.ts'; import { type Event } from '@/deps.ts'; import { type DittoFilter } from '@/filter.ts'; -import { type EventData } from '@/types.ts'; /** Additional options to apply to the whole subscription. */ interface GetEventsOpts { @@ -15,8 +14,6 @@ interface GetEventsOpts { /** Options when storing an event. */ interface StoreEventOpts { - /** Event data to store. */ - data?: EventData; /** Relays to use, if applicable. */ relays?: WebSocket['url'][]; } diff --git a/src/subs.ts b/src/subs.ts index 08ffbd2..100660e 100644 --- a/src/subs.ts +++ b/src/subs.ts @@ -1,9 +1,8 @@ -import { Debug, type Event } from '@/deps.ts'; +import { Debug } from '@/deps.ts'; +import { type DittoFilter } from '@/filter.ts'; +import { type DittoEvent } from '@/storages/types.ts'; import { Subscription } from '@/subscription.ts'; -import type { DittoFilter } from '@/filter.ts'; -import type { EventData } from '@/types.ts'; - const debug = Debug('ditto:subs'); /** @@ -69,10 +68,10 @@ class SubscriptionStore { * } * ``` */ - *matches(event: Event, data: EventData): Iterable { + *matches(event: DittoEvent): Iterable { for (const subs of this.#store.values()) { for (const sub of subs.values()) { - if (sub.matches(event, data)) { + if (sub.matches(event)) { yield sub; } } diff --git a/src/subscription.ts b/src/subscription.ts index 9492ab2..ec17dc4 100644 --- a/src/subscription.ts +++ b/src/subscription.ts @@ -1,7 +1,6 @@ import { type Event, Machina } from '@/deps.ts'; import { type DittoFilter, matchDittoFilters } from '@/filter.ts'; - -import type { EventData } from '@/types.ts'; +import { type DittoEvent } from '@/storages/types.ts'; class Subscription implements AsyncIterable> { filters: DittoFilter[]; @@ -16,8 +15,8 @@ class Subscription implements AsyncIterable> this.#machina.push(event); } - matches(event: Event, data: EventData): boolean { - return matchDittoFilters(this.filters, event, data); + matches(event: DittoEvent): boolean { + return matchDittoFilters(this.filters, event); } close() { diff --git a/src/types.ts b/src/types.ts deleted file mode 100644 index 445a6d1..0000000 --- a/src/types.ts +++ /dev/null @@ -1,6 +0,0 @@ -import { User } from '@/db/users.ts'; -interface EventData { - user: User | undefined; -} - -export type { EventData };