From 58b12ae462a9fedcb1e025f50e6e526741d40d39 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Fri, 22 Dec 2023 10:23:48 -0600 Subject: [PATCH 1/3] client: fix wrong import of allRelays --- src/client.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/client.ts b/src/client.ts index c3d7cd9..25bbe65 100644 --- a/src/client.ts +++ b/src/client.ts @@ -1,6 +1,6 @@ import { type Event, type Filter, matchFilters } from '@/deps.ts'; import * as pipeline from '@/pipeline.ts'; -import { allRelays, pool } from '@/pool.ts'; +import { activeRelays, pool } from '@/pool.ts'; import type { GetFiltersOpts } from '@/filter.ts'; @@ -13,7 +13,7 @@ function getFilters(filters: Filter[], opts: GetFiltersOpts const unsub = pool.subscribe( filters, - allRelays, + activeRelays, (event: Event | null) => { if (event && matchFilters(filters, event)) { pipeline.handleEvent(event).catch(() => {}); From 6d6e3bcecc871996719b605b49967c4345f46f26 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Fri, 22 Dec 2023 10:24:14 -0600 Subject: [PATCH 2/3] Move console.info from firehose to pipeline --- src/firehose.ts | 2 -- src/pipeline.ts | 1 + 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/src/firehose.ts b/src/firehose.ts index ea6be62..5e24de7 100644 --- a/src/firehose.ts +++ b/src/firehose.ts @@ -17,8 +17,6 @@ pool.subscribe( /** Handle events through the firehose pipeline. */ function handleEvent(event: Event): Promise { - console.info(`firehose: Event<${event.kind}> ${event.id}`); - return pipeline .handleEvent(event) .catch(() => {}); diff --git a/src/pipeline.ts b/src/pipeline.ts index 4f0c51a..cf721c4 100644 --- a/src/pipeline.ts +++ b/src/pipeline.ts @@ -24,6 +24,7 @@ import type { EventData } from '@/types.ts'; async function handleEvent(event: Event): Promise { if (!(await verifySignatureWorker(event))) return; if (encounterEvent(event)) return; + console.info(`pipeline: Event<${event.kind}> ${event.id}`); const data = await getEventData(event); await Promise.all([ From ad0aaf97dd9d65fefe98bbb1a2ac39c46640b754 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Fri, 22 Dec 2023 10:38:48 -0600 Subject: [PATCH 3/3] Replace all timeouts with AbortSignal --- src/client.ts | 15 ++++++--------- src/controllers/api/accounts.ts | 6 +++--- src/controllers/api/notifications.ts | 3 +-- src/controllers/api/search.ts | 6 +++--- src/controllers/api/timelines.ts | 5 ++--- src/filter.ts | 4 ++-- src/pipeline.ts | 2 +- src/queries.ts | 24 ++++++++++++------------ src/utils/nip05.ts | 6 +++--- src/utils/unfurl.ts | 4 ++-- 10 files changed, 35 insertions(+), 40 deletions(-) diff --git a/src/client.ts b/src/client.ts index 25bbe65..970a077 100644 --- a/src/client.ts +++ b/src/client.ts @@ -6,9 +6,10 @@ import type { GetFiltersOpts } from '@/filter.ts'; /** Get events from a NIP-01 filter. */ function getFilters(filters: Filter[], opts: GetFiltersOpts = {}): Promise[]> { + if (opts.signal?.aborted) return Promise.resolve([]); if (!filters.length) return Promise.resolve([]); + return new Promise((resolve) => { - let tid: number; const results: Event[] = []; const unsub = pool.subscribe( @@ -29,24 +30,20 @@ function getFilters(filters: Filter[], opts: GetFiltersOpts } if (typeof opts.limit === 'number' && results.length >= opts.limit) { unsub(); - clearTimeout(tid); resolve(results as Event[]); } }, undefined, () => { unsub(); - clearTimeout(tid); resolve(results as Event[]); }, ); - if (typeof opts.timeout === 'number') { - tid = setTimeout(() => { - unsub(); - resolve(results as Event[]); - }, opts.timeout); - } + opts.signal?.addEventListener('abort', () => { + unsub(); + resolve(results as Event[]); + }); }); } diff --git a/src/controllers/api/accounts.ts b/src/controllers/api/accounts.ts index 22c9a63..0a4430c 100644 --- a/src/controllers/api/accounts.ts +++ b/src/controllers/api/accounts.ts @@ -8,7 +8,7 @@ import { getAuthor, getFollowedPubkeys, getFollows } from '@/queries.ts'; import { booleanParamSchema, fileSchema } from '@/schema.ts'; import { jsonMetaContentSchema } from '@/schemas/nostr.ts'; import { uploadFile } from '@/upload.ts'; -import { isFollowing, lookupAccount, nostrNow, Time } from '@/utils.ts'; +import { isFollowing, lookupAccount, nostrNow } from '@/utils.ts'; import { paginated, paginationSchema, parseBody } from '@/utils/web.ts'; import { createEvent } from '@/utils/web.ts'; import { renderEventAccounts } from '@/views.ts'; @@ -258,7 +258,7 @@ const favouritesController: AppController = async (c) => { const events7 = await mixer.getFilters( [{ kinds: [7], authors: [pubkey], ...params }], - { timeout: Time.seconds(1) }, + { signal: AbortSignal.timeout(1000) }, ); const ids = events7 @@ -266,7 +266,7 @@ const favouritesController: AppController = async (c) => { .filter((id): id is string => !!id); const events1 = await mixer.getFilters([{ kinds: [1], ids, relations: ['author', 'event_stats', 'author_stats'] }], { - timeout: Time.seconds(1), + signal: AbortSignal.timeout(1000), }); const statuses = await Promise.all(events1.map((event) => renderStatus(event, c.get('pubkey')))); diff --git a/src/controllers/api/notifications.ts b/src/controllers/api/notifications.ts index 89f59f7..e7d6601 100644 --- a/src/controllers/api/notifications.ts +++ b/src/controllers/api/notifications.ts @@ -1,6 +1,5 @@ import { type AppController } from '@/app.ts'; import * as mixer from '@/mixer.ts'; -import { Time } from '@/utils.ts'; import { paginated, paginationSchema } from '@/utils/web.ts'; import { renderNotification } from '@/views/mastodon/notifications.ts'; @@ -10,7 +9,7 @@ const notificationsController: AppController = async (c) => { const events = await mixer.getFilters( [{ kinds: [1], '#p': [pubkey], since, until }], - { timeout: Time.seconds(3) }, + { signal: AbortSignal.timeout(3000) }, ); const statuses = await Promise.all(events.map((event) => renderNotification(event, pubkey))); diff --git a/src/controllers/api/search.ts b/src/controllers/api/search.ts index e5f1778..aed19fe 100644 --- a/src/controllers/api/search.ts +++ b/src/controllers/api/search.ts @@ -5,7 +5,7 @@ import { type DittoFilter } from '@/filter.ts'; import * as mixer from '@/mixer.ts'; import { booleanParamSchema } from '@/schema.ts'; import { nostrIdSchema } from '@/schemas/nostr.ts'; -import { dedupeEvents, Time } from '@/utils.ts'; +import { dedupeEvents } from '@/utils.ts'; import { lookupNip05Cached } from '@/utils/nip05.ts'; import { renderAccount } from '@/views/mastodon/accounts.ts'; import { renderStatus } from '@/views/mastodon/statuses.ts'; @@ -93,9 +93,9 @@ function typeToKinds(type: SearchQuery['type']): number[] { } /** Resolve a searched value into an event, if applicable. */ -async function lookupEvent(query: SearchQuery): Promise { +async function lookupEvent(query: SearchQuery, signal = AbortSignal.timeout(1000)): Promise { const filters = await getLookupFilters(query); - const [event] = await mixer.getFilters(filters, { limit: 1, timeout: Time.seconds(1) }); + const [event] = await mixer.getFilters(filters, { limit: 1, signal }); return event; } diff --git a/src/controllers/api/timelines.ts b/src/controllers/api/timelines.ts index a29cdc8..c430ed9 100644 --- a/src/controllers/api/timelines.ts +++ b/src/controllers/api/timelines.ts @@ -3,7 +3,6 @@ import { type DittoFilter } from '@/filter.ts'; import * as mixer from '@/mixer.ts'; import { getFeedPubkeys } from '@/queries.ts'; import { booleanParamSchema } from '@/schema.ts'; -import { Time } from '@/utils.ts'; import { paginated, paginationSchema } from '@/utils/web.ts'; import { renderStatus } from '@/views/mastodon/statuses.ts'; @@ -33,10 +32,10 @@ const hashtagTimelineController: AppController = (c) => { }; /** Render statuses for timelines. */ -async function renderStatuses(c: AppContext, filters: DittoFilter<1>[]) { +async function renderStatuses(c: AppContext, filters: DittoFilter<1>[], signal = AbortSignal.timeout(1000)) { const events = await mixer.getFilters( filters.map((filter) => ({ ...filter, relations: ['author', 'event_stats', 'author_stats'] })), - { timeout: Time.seconds(1) }, + { signal }, ); if (!events.length) { diff --git a/src/filter.ts b/src/filter.ts index 76a0fcd..fb43251 100644 --- a/src/filter.ts +++ b/src/filter.ts @@ -16,8 +16,8 @@ interface DittoFilter extends Filter { /** Additional options to apply to the whole subscription. */ interface GetFiltersOpts { - /** How long to wait (in milliseconds) until aborting the request. */ - timeout?: number; + /** Signal to abort the request. */ + signal?: AbortSignal; /** Event limit for the whole subscription. */ limit?: number; } diff --git a/src/pipeline.ts b/src/pipeline.ts index cf721c4..adf8a84 100644 --- a/src/pipeline.ts +++ b/src/pipeline.ts @@ -64,7 +64,7 @@ async function storeEvent(event: Event, data: EventData): Promise { if (data.user || isAdminEvent(event) || await isLocallyFollowed(event.pubkey)) { const [deletion] = await mixer.getFilters( [{ kinds: [5], authors: [event.pubkey], '#e': [event.id], limit: 1 }], - { limit: 1, timeout: Time.seconds(1) }, + { limit: 1, signal: AbortSignal.timeout(Time.seconds(1)) }, ); if (deletion) { diff --git a/src/queries.ts b/src/queries.ts index edd5f4a..fc7365a 100644 --- a/src/queries.ts +++ b/src/queries.ts @@ -4,8 +4,8 @@ import { type DittoFilter, type Relation } from '@/filter.ts'; import * as mixer from '@/mixer.ts'; interface GetEventOpts { - /** Timeout in milliseconds. */ - timeout?: number; + /** Signal to abort the request. */ + signal?: AbortSignal; /** Event kind. */ kind?: K; /** Relations to include on the event. */ @@ -17,36 +17,36 @@ const getEvent = async ( id: string, opts: GetEventOpts = {}, ): Promise | undefined> => { - const { kind, relations, timeout = 1000 } = opts; + const { kind, relations, signal = AbortSignal.timeout(1000) } = opts; const filter: DittoFilter = { ids: [id], relations, limit: 1 }; if (kind) { filter.kinds = [kind]; } - const [event] = await mixer.getFilters([filter], { limit: 1, timeout }); + const [event] = await mixer.getFilters([filter], { limit: 1, signal }); return event; }; /** Get a Nostr `set_medatadata` event for a user's pubkey. */ const getAuthor = async (pubkey: string, opts: GetEventOpts<0> = {}): Promise | undefined> => { - const { relations, timeout = 1000 } = opts; + const { relations, signal = AbortSignal.timeout(1000) } = opts; const [event] = await mixer.getFilters( [{ authors: [pubkey], relations, kinds: [0], limit: 1 }], - { limit: 1, timeout }, + { limit: 1, signal }, ); return event; }; /** Get users the given pubkey follows. */ -const getFollows = async (pubkey: string, timeout = 1000): Promise | undefined> => { - const [event] = await mixer.getFilters([{ authors: [pubkey], kinds: [3], limit: 1 }], { limit: 1, timeout }); +const getFollows = async (pubkey: string, signal = AbortSignal.timeout(1000)): Promise | undefined> => { + const [event] = await mixer.getFilters([{ authors: [pubkey], kinds: [3], limit: 1 }], { limit: 1, signal }); return event; }; /** Get pubkeys the user follows. */ -async function getFollowedPubkeys(pubkey: string): Promise { - const event = await getFollows(pubkey); +async function getFollowedPubkeys(pubkey: string, signal?: AbortSignal): Promise { + const event = await getFollows(pubkey, signal); if (!event) return []; return event.tags @@ -78,10 +78,10 @@ async function getAncestors(event: Event<1>, result = [] as Event<1>[]): Promise return result.reverse(); } -function getDescendants(eventId: string): Promise[]> { +function getDescendants(eventId: string, signal = AbortSignal.timeout(2000)): Promise[]> { return mixer.getFilters( [{ kinds: [1], '#e': [eventId], relations: ['author', 'event_stats', 'author_stats'] }], - { limit: 200, timeout: 2000 }, + { limit: 200, signal }, ); } diff --git a/src/utils/nip05.ts b/src/utils/nip05.ts index b655175..df08150 100644 --- a/src/utils/nip05.ts +++ b/src/utils/nip05.ts @@ -7,12 +7,12 @@ const nip05Cache = new TTLCache>({ ttl: Time.hour const NIP05_REGEX = /^(?:([\w.+-]+)@)?([\w.-]+)$/; interface LookupOpts { - timeout?: number; + signal?: AbortSignal; } /** Get pubkey from NIP-05. */ async function lookup(value: string, opts: LookupOpts = {}): Promise { - const { timeout = 2000 } = opts; + const { signal = AbortSignal.timeout(2000) } = opts; const match = value.match(NIP05_REGEX); if (!match) return null; @@ -21,7 +21,7 @@ async function lookup(value: string, opts: LookupOpts = {}): Promise>({ }); /** Unfurl card from cache if available, otherwise fetch it. */ -function unfurlCardCached(url: string, timeout = Time.seconds(1)): Promise { +function unfurlCardCached(url: string, signal = AbortSignal.timeout(1000)): Promise { const cached = previewCardCache.get(url); if (cached !== undefined) { return cached; } else { - const card = unfurlCard(url, AbortSignal.timeout(timeout)); + const card = unfurlCard(url, signal); previewCardCache.set(url, card); return card; }