diff --git a/src/controllers/nostr/relay.ts b/src/controllers/nostr/relay.ts index 8befa56..b304413 100644 --- a/src/controllers/nostr/relay.ts +++ b/src/controllers/nostr/relay.ts @@ -8,6 +8,7 @@ import { clientMsgSchema, type ClientREQ, } from '@/schemas/nostr.ts'; +import { Sub } from '@/subs.ts'; import type { AppController } from '@/app.ts'; import type { Event, Filter } from '@/deps.ts'; @@ -49,15 +50,24 @@ function connectStream(socket: WebSocket) { } /** Handle REQ. Start a subscription. */ - async function handleReq([_, sub, ...filters]: ClientREQ) { - for (const event of await eventsDB.getFilters(prepareFilters(filters))) { - send(['EVENT', sub, event]); + async function handleReq([_, subId, ...filters]: ClientREQ): Promise { + const prepared = prepareFilters(filters); + + for (const event of await eventsDB.getFilters(prepared)) { + send(['EVENT', subId, event]); } - send(['EOSE', sub]); + + send(['EOSE', subId]); + + Sub.sub({ + id: subId, + filters: prepared, + socket, + }); } /** Handle EVENT. Store the event. */ - async function handleEvent([_, event]: ClientEVENT) { + async function handleEvent([_, event]: ClientEVENT): Promise { try { // This will store it (if eligible) and run other side-effects. await pipeline.handleEvent(event); @@ -72,13 +82,12 @@ function connectStream(socket: WebSocket) { } /** Handle CLOSE. Close the subscription. */ - function handleClose([_, _sub]: ClientCLOSE) { - // TODO: ??? - return; + function handleClose([_, subId]: ClientCLOSE): void { + Sub.unsub({ id: subId, socket }); } /** Send a message back to the client. */ - function send(msg: RelayMsg) { + function send(msg: RelayMsg): void { return socket.send(JSON.stringify(msg)); } } diff --git a/src/filter.ts b/src/filter.ts new file mode 100644 index 0000000..9cde0d1 --- /dev/null +++ b/src/filter.ts @@ -0,0 +1,27 @@ +import { type Event, matchFilters } from '@/deps.ts'; + +import type { DittoFilter, EventData } from '@/types.ts'; + +function matchDittoFilter(filter: DittoFilter, event: Event, data: EventData): boolean { + if (filter.local && !data.user) { + return false; + } + + return matchFilters([filter], event); +} + +/** + * Similar to nostr-tools `matchFilters`, but supports Ditto's custom keys. + * Database calls are needed to look up the extra data, so it's passed in as an argument. + */ +function matchDittoFilters(filters: DittoFilter[], event: Event, data: EventData): boolean { + for (const filter of filters) { + if (matchDittoFilter(filter, event, data)) { + return true; + } + } + + return false; +} + +export { matchDittoFilters }; diff --git a/src/pipeline.ts b/src/pipeline.ts index af72b79..a6eb15f 100644 --- a/src/pipeline.ts +++ b/src/pipeline.ts @@ -1,26 +1,49 @@ import * as eventsDB from '@/db/events.ts'; import { addRelays } from '@/db/relays.ts'; import { findUser } from '@/db/users.ts'; -import { type Event } from '@/deps.ts'; +import { type Event, LRUCache } from '@/deps.ts'; import { isLocallyFollowed } from '@/queries.ts'; +import { Sub } from '@/subs.ts'; import { trends } from '@/trends.ts'; -import { isRelay, nostrDate } from '@/utils.ts'; +import { isRelay, nostrDate, nostrNow, Time } from '@/utils.ts'; + +import type { EventData } from '@/types.ts'; /** * Common pipeline function to process (and maybe store) events. * It is idempotent, so it can be called multiple times for the same event. */ async function handleEvent(event: Event): Promise { + if (encounterEvent(event)) return; + const data = await getEventData(event); + await Promise.all([ - storeEvent(event), + storeEvent(event, data), trackRelays(event), trackHashtags(event), + streamOut(event, data), ]); } +/** Tracks encountered events to skip duplicates, improving idempotency and performance. */ +const encounters = new LRUCache({ max: 1000 }); + +/** Encounter the event, and return whether it has already been encountered. */ +function encounterEvent(event: Event) { + const result = encounters.get(event.id); + encounters.set(event.id, true); + return result; +} + +/** Preload data that will be useful to several tasks. */ +async function getEventData({ pubkey }: Event): Promise { + const user = await findUser({ pubkey }); + return { user }; +} + /** Maybe store the event, if eligible. */ -async function storeEvent(event: Event): Promise { - if (await findUser({ pubkey: event.pubkey }) || await isLocallyFollowed(event.pubkey)) { +async function storeEvent(event: Event, data: EventData): Promise { + if (data.user || await isLocallyFollowed(event.pubkey)) { await eventsDB.insertEvent(event).catch(console.warn); } else { return Promise.reject(new RelayError('blocked', 'only registered users can post')); @@ -63,6 +86,18 @@ function trackRelays(event: Event) { return addRelays([...relays]); } +/** Determine if the event is being received in a timely manner. */ +const isFresh = ({ created_at }: Event): boolean => created_at >= nostrNow() - Time.seconds(10); + +/** Distribute the event through active subscriptions. */ +function streamOut(event: Event, data: EventData) { + if (!isFresh(event)) return; + + for (const { socket, id } of Sub.matches(event, data)) { + socket.send(JSON.stringify(['EVENT', id, event])); + } +} + /** NIP-20 command line result. */ class RelayError extends Error { constructor(prefix: 'duplicate' | 'pow' | 'blocked' | 'rate-limited' | 'invalid' | 'error', message: string) { diff --git a/src/subs.ts b/src/subs.ts new file mode 100644 index 0000000..82041c6 --- /dev/null +++ b/src/subs.ts @@ -0,0 +1,76 @@ +import { type Event } from '@/deps.ts'; +import { matchDittoFilters } from './filter.ts'; + +import type { DittoFilter, EventData } from '@/types.ts'; + +/** Nostr subscription to receive realtime events. */ +interface Subscription { + /** User-defined NIP-01 subscription ID. */ + id: string; + /** Event filters for the subscription. */ + filters: DittoFilter[]; + /** WebSocket to deliver results to. */ + socket: WebSocket; +} + +/** + * Manages Ditto event subscriptions. + * + * Subscriptions can be added, removed, and matched against events. + * + * ```ts + * for (const sub of Sub.matches(event)) { + * // Send event to sub.socket + * sub.socket.send(JSON.stringify(event)); + * } + * ``` + */ +class SubscriptionStore { + #store = new Map>(); + + /** Add a subscription to the store. */ + sub(data: Subscription): void { + let subs = this.#store.get(data.socket); + + if (!subs) { + subs = new Map(); + this.#store.set(data.socket, subs); + } + + subs.set(data.id, data); + } + + /** Remove a subscription from the store. */ + unsub(sub: Pick): void { + this.#store.get(sub.socket)?.delete(sub.id); + } + + /** Remove an entire socket. */ + close(socket: WebSocket): void { + this.#store.delete(socket); + } + + /** + * Loop through matching subscriptions to stream out. + * + * ```ts + * for (const sub of Sub.matches(event)) { + * // Send event to sub.socket + * sub.socket.send(JSON.stringify(event)); + * } + * ``` + */ + *matches(event: Event, data: EventData): Iterable { + for (const subs of this.#store.values()) { + for (const sub of subs.values()) { + if (matchDittoFilters(sub.filters, event, data)) { + yield sub; + } + } + } + } +} + +const Sub = new SubscriptionStore(); + +export { Sub }; diff --git a/src/types.ts b/src/types.ts index 096b847..beb184b 100644 --- a/src/types.ts +++ b/src/types.ts @@ -1,3 +1,4 @@ +import { UserRow } from '@/db.ts'; import { type Filter } from '@/deps.ts'; /** Custom filter interface that extends Nostr filters with extra options for Ditto. */ @@ -13,4 +14,8 @@ interface GetFiltersOpts { limit?: number; } -export type { DittoFilter, GetFiltersOpts }; +interface EventData { + user: UserRow | undefined; +} + +export type { DittoFilter, EventData, GetFiltersOpts };