From a676b71d23cfdcc36740d0c735909a7984e501c9 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Wed, 23 Aug 2023 23:25:38 -0500 Subject: [PATCH 1/6] relay: make Nostr streaming work --- src/controllers/nostr/relay.ts | 27 ++++++++---- src/pipeline.ts | 9 ++++ src/subs.ts | 75 ++++++++++++++++++++++++++++++++++ 3 files changed, 102 insertions(+), 9 deletions(-) create mode 100644 src/subs.ts diff --git a/src/controllers/nostr/relay.ts b/src/controllers/nostr/relay.ts index 8befa56..b304413 100644 --- a/src/controllers/nostr/relay.ts +++ b/src/controllers/nostr/relay.ts @@ -8,6 +8,7 @@ import { clientMsgSchema, type ClientREQ, } from '@/schemas/nostr.ts'; +import { Sub } from '@/subs.ts'; import type { AppController } from '@/app.ts'; import type { Event, Filter } from '@/deps.ts'; @@ -49,15 +50,24 @@ function connectStream(socket: WebSocket) { } /** Handle REQ. Start a subscription. */ - async function handleReq([_, sub, ...filters]: ClientREQ) { - for (const event of await eventsDB.getFilters(prepareFilters(filters))) { - send(['EVENT', sub, event]); + async function handleReq([_, subId, ...filters]: ClientREQ): Promise { + const prepared = prepareFilters(filters); + + for (const event of await eventsDB.getFilters(prepared)) { + send(['EVENT', subId, event]); } - send(['EOSE', sub]); + + send(['EOSE', subId]); + + Sub.sub({ + id: subId, + filters: prepared, + socket, + }); } /** Handle EVENT. Store the event. */ - async function handleEvent([_, event]: ClientEVENT) { + async function handleEvent([_, event]: ClientEVENT): Promise { try { // This will store it (if eligible) and run other side-effects. await pipeline.handleEvent(event); @@ -72,13 +82,12 @@ function connectStream(socket: WebSocket) { } /** Handle CLOSE. Close the subscription. */ - function handleClose([_, _sub]: ClientCLOSE) { - // TODO: ??? - return; + function handleClose([_, subId]: ClientCLOSE): void { + Sub.unsub({ id: subId, socket }); } /** Send a message back to the client. */ - function send(msg: RelayMsg) { + function send(msg: RelayMsg): void { return socket.send(JSON.stringify(msg)); } } diff --git a/src/pipeline.ts b/src/pipeline.ts index af72b79..7632464 100644 --- a/src/pipeline.ts +++ b/src/pipeline.ts @@ -3,6 +3,7 @@ import { addRelays } from '@/db/relays.ts'; import { findUser } from '@/db/users.ts'; import { type Event } from '@/deps.ts'; import { isLocallyFollowed } from '@/queries.ts'; +import { Sub } from '@/subs.ts'; import { trends } from '@/trends.ts'; import { isRelay, nostrDate } from '@/utils.ts'; @@ -15,6 +16,7 @@ async function handleEvent(event: Event): Promise { storeEvent(event), trackRelays(event), trackHashtags(event), + streamOut(event), ]); } @@ -63,6 +65,13 @@ function trackRelays(event: Event) { return addRelays([...relays]); } +/** Distribute the event through active subscriptions. */ +function streamOut(event: Event) { + for (const sub of Sub.matches(event)) { + sub.socket.send(JSON.stringify(['EVENT', event])); + } +} + /** NIP-20 command line result. */ class RelayError extends Error { constructor(prefix: 'duplicate' | 'pow' | 'blocked' | 'rate-limited' | 'invalid' | 'error', message: string) { diff --git a/src/subs.ts b/src/subs.ts new file mode 100644 index 0000000..2a63214 --- /dev/null +++ b/src/subs.ts @@ -0,0 +1,75 @@ +import { type Event, matchFilters } from '@/deps.ts'; + +import type { DittoFilter } from '@/types.ts'; + +/** Nostr subscription to receive realtime events. */ +interface Subscription { + /** User-defined NIP-01 subscription ID. */ + id: string; + /** Event filters for the subscription. */ + filters: DittoFilter[]; + /** WebSocket to deliver results to. */ + socket: WebSocket; +} + +/** + * Manages Ditto event subscriptions. + * + * Subscriptions can be added, removed, and matched against events. + * + * ```ts + * for (const sub of Sub.matches(event)) { + * // Send event to sub.socket + * sub.socket.send(JSON.stringify(event)); + * } + * ``` + */ +class SubscriptionStore { + #store = new Map>(); + + /** Add a subscription to the store. */ + sub(data: Subscription): void { + let subs = this.#store.get(data.socket); + + if (!subs) { + subs = new Map(); + this.#store.set(data.socket, subs); + } + + subs.set(data.id, data); + } + + /** Remove a subscription from the store. */ + unsub(sub: Pick): void { + this.#store.get(sub.socket)?.delete(sub.id); + } + + /** Remove an entire socket. */ + close(socket: WebSocket): void { + this.#store.delete(socket); + } + + /** + * Loop through matching subscriptions to stream out. + * + * ```ts + * for (const sub of Sub.matches(event)) { + * // Send event to sub.socket + * sub.socket.send(JSON.stringify(event)); + * } + * ``` + */ + *matches(event: Event): Iterable { + for (const subs of this.#store.values()) { + for (const sub of subs.values()) { + if (matchFilters(sub.filters, event)) { + yield sub; + } + } + } + } +} + +const Sub = new SubscriptionStore(); + +export { Sub }; From 658dd397f56da4684678c93738064f5c4d21e16a Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Thu, 24 Aug 2023 15:28:13 -0500 Subject: [PATCH 2/6] relay: respect "local: true" filter --- src/filter.ts | 27 +++++++++++++++++++++++++++ src/pipeline.ts | 4 ++-- src/subs.ts | 14 +++++++++----- src/utils.ts | 8 ++++++++ 4 files changed, 46 insertions(+), 7 deletions(-) create mode 100644 src/filter.ts diff --git a/src/filter.ts b/src/filter.ts new file mode 100644 index 0000000..d813c62 --- /dev/null +++ b/src/filter.ts @@ -0,0 +1,27 @@ +import { type Event, matchFilters } from '@/deps.ts'; + +import type { DittoFilter } from '@/types.ts'; + +interface EventData { + isLocal: boolean; +} + +function matchDittoFilter(filter: DittoFilter, event: Event, data: EventData): boolean { + if (filter.local && !data.isLocal) { + return false; + } + + return matchFilters([filter], event); +} + +function matchDittoFilters(filters: DittoFilter[], event: Event, data: EventData): boolean { + for (const filter of filters) { + if (matchDittoFilter(filter, event, data)) { + return true; + } + } + + return false; +} + +export { matchDittoFilters }; diff --git a/src/pipeline.ts b/src/pipeline.ts index 7632464..0507e75 100644 --- a/src/pipeline.ts +++ b/src/pipeline.ts @@ -66,8 +66,8 @@ function trackRelays(event: Event) { } /** Distribute the event through active subscriptions. */ -function streamOut(event: Event) { - for (const sub of Sub.matches(event)) { +async function streamOut(event: Event) { + for await (const sub of Sub.matches(event)) { sub.socket.send(JSON.stringify(['EVENT', event])); } } diff --git a/src/subs.ts b/src/subs.ts index 2a63214..afe265a 100644 --- a/src/subs.ts +++ b/src/subs.ts @@ -1,4 +1,6 @@ -import { type Event, matchFilters } from '@/deps.ts'; +import { type Event } from '@/deps.ts'; +import { matchDittoFilters } from './filter.ts'; +import { isEventLocal } from '@/utils.ts'; import type { DittoFilter } from '@/types.ts'; @@ -18,7 +20,7 @@ interface Subscription { * Subscriptions can be added, removed, and matched against events. * * ```ts - * for (const sub of Sub.matches(event)) { + * for await (const sub of Sub.matches(event)) { * // Send event to sub.socket * sub.socket.send(JSON.stringify(event)); * } @@ -53,16 +55,18 @@ class SubscriptionStore { * Loop through matching subscriptions to stream out. * * ```ts - * for (const sub of Sub.matches(event)) { + * for await (const sub of Sub.matches(event)) { * // Send event to sub.socket * sub.socket.send(JSON.stringify(event)); * } * ``` */ - *matches(event: Event): Iterable { + async *matches(event: Event) { + const isLocal = await isEventLocal(event); + for (const subs of this.#store.values()) { for (const sub of subs.values()) { - if (matchFilters(sub.filters, event)) { + if (matchDittoFilters(sub.filters, event, { isLocal })) { yield sub; } } diff --git a/src/utils.ts b/src/utils.ts index 653c824..6cd6dd4 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -1,3 +1,4 @@ +import { findUser } from '@/db/users.ts'; import { type Event, nip19, z } from '@/deps.ts'; import { lookupNip05Cached } from '@/nip05.ts'; import { getAuthor } from '@/queries.ts'; @@ -101,11 +102,18 @@ function isFollowing(source: Event<3>, targetPubkey: string): boolean { ); } +/** Check whether the event belongs to a local user. */ +async function isEventLocal(event: Event) { + const user = await findUser({ pubkey: event.pubkey }); + return !!user; +} + export { bech32ToPubkey, eventAge, eventDateComparator, findTag, + isEventLocal, isFollowing, isRelay, lookupAccount, From f1c465beea1a1f29459a14f46f3ce1468b41744d Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Thu, 24 Aug 2023 17:00:08 -0500 Subject: [PATCH 3/6] pipeline: optimize database calls --- src/filter.ts | 12 ++++++------ src/pipeline.ts | 24 +++++++++++++++++------- src/subs.ts | 13 +++++-------- src/types.ts | 7 ++++++- src/utils.ts | 7 ------- 5 files changed, 34 insertions(+), 29 deletions(-) diff --git a/src/filter.ts b/src/filter.ts index d813c62..9cde0d1 100644 --- a/src/filter.ts +++ b/src/filter.ts @@ -1,19 +1,19 @@ import { type Event, matchFilters } from '@/deps.ts'; -import type { DittoFilter } from '@/types.ts'; - -interface EventData { - isLocal: boolean; -} +import type { DittoFilter, EventData } from '@/types.ts'; function matchDittoFilter(filter: DittoFilter, event: Event, data: EventData): boolean { - if (filter.local && !data.isLocal) { + if (filter.local && !data.user) { return false; } return matchFilters([filter], event); } +/** + * Similar to nostr-tools `matchFilters`, but supports Ditto's custom keys. + * Database calls are needed to look up the extra data, so it's passed in as an argument. + */ function matchDittoFilters(filters: DittoFilter[], event: Event, data: EventData): boolean { for (const filter of filters) { if (matchDittoFilter(filter, event, data)) { diff --git a/src/pipeline.ts b/src/pipeline.ts index 0507e75..770f1bd 100644 --- a/src/pipeline.ts +++ b/src/pipeline.ts @@ -7,22 +7,32 @@ import { Sub } from '@/subs.ts'; import { trends } from '@/trends.ts'; import { isRelay, nostrDate } from '@/utils.ts'; +import type { EventData } from '@/types.ts'; + /** * Common pipeline function to process (and maybe store) events. * It is idempotent, so it can be called multiple times for the same event. */ async function handleEvent(event: Event): Promise { + const data = await getEventData(event); + await Promise.all([ - storeEvent(event), + storeEvent(event, data), trackRelays(event), trackHashtags(event), - streamOut(event), + streamOut(event, data), ]); } +/** Preload data that will be useful to several tasks. */ +async function getEventData({ pubkey }: Event): Promise { + const user = await findUser({ pubkey }); + return { user }; +} + /** Maybe store the event, if eligible. */ -async function storeEvent(event: Event): Promise { - if (await findUser({ pubkey: event.pubkey }) || await isLocallyFollowed(event.pubkey)) { +async function storeEvent(event: Event, data: EventData): Promise { + if (data.user || await isLocallyFollowed(event.pubkey)) { await eventsDB.insertEvent(event).catch(console.warn); } else { return Promise.reject(new RelayError('blocked', 'only registered users can post')); @@ -66,9 +76,9 @@ function trackRelays(event: Event) { } /** Distribute the event through active subscriptions. */ -async function streamOut(event: Event) { - for await (const sub of Sub.matches(event)) { - sub.socket.send(JSON.stringify(['EVENT', event])); +function streamOut(event: Event, data: EventData) { + for (const { socket, id } of Sub.matches(event, data)) { + socket.send(JSON.stringify(['EVENT', id, event])); } } diff --git a/src/subs.ts b/src/subs.ts index afe265a..82041c6 100644 --- a/src/subs.ts +++ b/src/subs.ts @@ -1,8 +1,7 @@ import { type Event } from '@/deps.ts'; import { matchDittoFilters } from './filter.ts'; -import { isEventLocal } from '@/utils.ts'; -import type { DittoFilter } from '@/types.ts'; +import type { DittoFilter, EventData } from '@/types.ts'; /** Nostr subscription to receive realtime events. */ interface Subscription { @@ -20,7 +19,7 @@ interface Subscription { * Subscriptions can be added, removed, and matched against events. * * ```ts - * for await (const sub of Sub.matches(event)) { + * for (const sub of Sub.matches(event)) { * // Send event to sub.socket * sub.socket.send(JSON.stringify(event)); * } @@ -55,18 +54,16 @@ class SubscriptionStore { * Loop through matching subscriptions to stream out. * * ```ts - * for await (const sub of Sub.matches(event)) { + * for (const sub of Sub.matches(event)) { * // Send event to sub.socket * sub.socket.send(JSON.stringify(event)); * } * ``` */ - async *matches(event: Event) { - const isLocal = await isEventLocal(event); - + *matches(event: Event, data: EventData): Iterable { for (const subs of this.#store.values()) { for (const sub of subs.values()) { - if (matchDittoFilters(sub.filters, event, { isLocal })) { + if (matchDittoFilters(sub.filters, event, data)) { yield sub; } } diff --git a/src/types.ts b/src/types.ts index 096b847..beb184b 100644 --- a/src/types.ts +++ b/src/types.ts @@ -1,3 +1,4 @@ +import { UserRow } from '@/db.ts'; import { type Filter } from '@/deps.ts'; /** Custom filter interface that extends Nostr filters with extra options for Ditto. */ @@ -13,4 +14,8 @@ interface GetFiltersOpts { limit?: number; } -export type { DittoFilter, GetFiltersOpts }; +interface EventData { + user: UserRow | undefined; +} + +export type { DittoFilter, EventData, GetFiltersOpts }; diff --git a/src/utils.ts b/src/utils.ts index 6cd6dd4..138cc7a 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -102,18 +102,11 @@ function isFollowing(source: Event<3>, targetPubkey: string): boolean { ); } -/** Check whether the event belongs to a local user. */ -async function isEventLocal(event: Event) { - const user = await findUser({ pubkey: event.pubkey }); - return !!user; -} - export { bech32ToPubkey, eventAge, eventDateComparator, findTag, - isEventLocal, isFollowing, isRelay, lookupAccount, From a0dff12ca04b9907cf511c0520b033cd422852c9 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Thu, 24 Aug 2023 17:26:46 -0500 Subject: [PATCH 4/6] pipeline: don't stream events older than 10 seconds --- src/pipeline.ts | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/pipeline.ts b/src/pipeline.ts index 770f1bd..9c6905c 100644 --- a/src/pipeline.ts +++ b/src/pipeline.ts @@ -5,7 +5,7 @@ import { type Event } from '@/deps.ts'; import { isLocallyFollowed } from '@/queries.ts'; import { Sub } from '@/subs.ts'; import { trends } from '@/trends.ts'; -import { isRelay, nostrDate } from '@/utils.ts'; +import { isRelay, nostrDate, nostrNow, Time } from '@/utils.ts'; import type { EventData } from '@/types.ts'; @@ -75,8 +75,13 @@ function trackRelays(event: Event) { return addRelays([...relays]); } +/** Determine if the event is being received in a timely manner. */ +const isFresh = ({ created_at }: Event): boolean => created_at >= nostrNow() - Time.seconds(10); + /** Distribute the event through active subscriptions. */ function streamOut(event: Event, data: EventData) { + if (!isFresh(event)) return; + for (const { socket, id } of Sub.matches(event, data)) { socket.send(JSON.stringify(['EVENT', id, event])); } From 2f7914f044f419949786d924a4d1032e580efb0b Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Thu, 24 Aug 2023 17:39:24 -0500 Subject: [PATCH 5/6] pipeline: skip previously encountered events --- src/pipeline.ts | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/src/pipeline.ts b/src/pipeline.ts index 9c6905c..a6eb15f 100644 --- a/src/pipeline.ts +++ b/src/pipeline.ts @@ -1,7 +1,7 @@ import * as eventsDB from '@/db/events.ts'; import { addRelays } from '@/db/relays.ts'; import { findUser } from '@/db/users.ts'; -import { type Event } from '@/deps.ts'; +import { type Event, LRUCache } from '@/deps.ts'; import { isLocallyFollowed } from '@/queries.ts'; import { Sub } from '@/subs.ts'; import { trends } from '@/trends.ts'; @@ -14,6 +14,7 @@ import type { EventData } from '@/types.ts'; * It is idempotent, so it can be called multiple times for the same event. */ async function handleEvent(event: Event): Promise { + if (encounterEvent(event)) return; const data = await getEventData(event); await Promise.all([ @@ -24,6 +25,16 @@ async function handleEvent(event: Event): Promise { ]); } +/** Tracks encountered events to skip duplicates, improving idempotency and performance. */ +const encounters = new LRUCache({ max: 1000 }); + +/** Encounter the event, and return whether it has already been encountered. */ +function encounterEvent(event: Event) { + const result = encounters.get(event.id); + encounters.set(event.id, true); + return result; +} + /** Preload data that will be useful to several tasks. */ async function getEventData({ pubkey }: Event): Promise { const user = await findUser({ pubkey }); From f9de6932ac06c5df45e967167330a6ae4dc3f673 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Thu, 24 Aug 2023 17:42:28 -0500 Subject: [PATCH 6/6] utils: remove unused import --- src/utils.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/src/utils.ts b/src/utils.ts index 138cc7a..653c824 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -1,4 +1,3 @@ -import { findUser } from '@/db/users.ts'; import { type Event, nip19, z } from '@/deps.ts'; import { lookupNip05Cached } from '@/nip05.ts'; import { getAuthor } from '@/queries.ts';