diff --git a/src/client.ts b/src/client.ts index 1d7cd87..3cf2e8a 100644 --- a/src/client.ts +++ b/src/client.ts @@ -6,9 +6,10 @@ import type { GetFiltersOpts } from '@/filter.ts'; /** Get events from a NIP-01 filter. */ function getFilters(filters: Filter[], opts: GetFiltersOpts = {}): Promise[]> { + if (opts.signal?.aborted) return Promise.resolve([]); if (!filters.length) return Promise.resolve([]); + return new Promise((resolve) => { - let tid: number; const results: Event[] = []; const unsub = pool.subscribe( @@ -29,24 +30,20 @@ function getFilters(filters: Filter[], opts: GetFiltersOpts } if (typeof opts.limit === 'number' && results.length >= opts.limit) { unsub(); - clearTimeout(tid); resolve(results as Event[]); } }, undefined, () => { unsub(); - clearTimeout(tid); resolve(results as Event[]); }, ); - if (typeof opts.timeout === 'number') { - tid = setTimeout(() => { - unsub(); - resolve(results as Event[]); - }, opts.timeout); - } + opts.signal?.addEventListener('abort', () => { + unsub(); + resolve(results as Event[]); + }); }); } diff --git a/src/controllers/api/accounts.ts b/src/controllers/api/accounts.ts index 22c9a63..0a4430c 100644 --- a/src/controllers/api/accounts.ts +++ b/src/controllers/api/accounts.ts @@ -8,7 +8,7 @@ import { getAuthor, getFollowedPubkeys, getFollows } from '@/queries.ts'; import { booleanParamSchema, fileSchema } from '@/schema.ts'; import { jsonMetaContentSchema } from '@/schemas/nostr.ts'; import { uploadFile } from '@/upload.ts'; -import { isFollowing, lookupAccount, nostrNow, Time } from '@/utils.ts'; +import { isFollowing, lookupAccount, nostrNow } from '@/utils.ts'; import { paginated, paginationSchema, parseBody } from '@/utils/web.ts'; import { createEvent } from '@/utils/web.ts'; import { renderEventAccounts } from '@/views.ts'; @@ -258,7 +258,7 @@ const favouritesController: AppController = async (c) => { const events7 = await mixer.getFilters( [{ kinds: [7], authors: [pubkey], ...params }], - { timeout: Time.seconds(1) }, + { signal: AbortSignal.timeout(1000) }, ); const ids = events7 @@ -266,7 +266,7 @@ const favouritesController: AppController = async (c) => { .filter((id): id is string => !!id); const events1 = await mixer.getFilters([{ kinds: [1], ids, relations: ['author', 'event_stats', 'author_stats'] }], { - timeout: Time.seconds(1), + signal: AbortSignal.timeout(1000), }); const statuses = await Promise.all(events1.map((event) => renderStatus(event, c.get('pubkey')))); diff --git a/src/controllers/api/notifications.ts b/src/controllers/api/notifications.ts index 89f59f7..e7d6601 100644 --- a/src/controllers/api/notifications.ts +++ b/src/controllers/api/notifications.ts @@ -1,6 +1,5 @@ import { type AppController } from '@/app.ts'; import * as mixer from '@/mixer.ts'; -import { Time } from '@/utils.ts'; import { paginated, paginationSchema } from '@/utils/web.ts'; import { renderNotification } from '@/views/mastodon/notifications.ts'; @@ -10,7 +9,7 @@ const notificationsController: AppController = async (c) => { const events = await mixer.getFilters( [{ kinds: [1], '#p': [pubkey], since, until }], - { timeout: Time.seconds(3) }, + { signal: AbortSignal.timeout(3000) }, ); const statuses = await Promise.all(events.map((event) => renderNotification(event, pubkey))); diff --git a/src/controllers/api/search.ts b/src/controllers/api/search.ts index e5f1778..aed19fe 100644 --- a/src/controllers/api/search.ts +++ b/src/controllers/api/search.ts @@ -5,7 +5,7 @@ import { type DittoFilter } from '@/filter.ts'; import * as mixer from '@/mixer.ts'; import { booleanParamSchema } from '@/schema.ts'; import { nostrIdSchema } from '@/schemas/nostr.ts'; -import { dedupeEvents, Time } from '@/utils.ts'; +import { dedupeEvents } from '@/utils.ts'; import { lookupNip05Cached } from '@/utils/nip05.ts'; import { renderAccount } from '@/views/mastodon/accounts.ts'; import { renderStatus } from '@/views/mastodon/statuses.ts'; @@ -93,9 +93,9 @@ function typeToKinds(type: SearchQuery['type']): number[] { } /** Resolve a searched value into an event, if applicable. */ -async function lookupEvent(query: SearchQuery): Promise { +async function lookupEvent(query: SearchQuery, signal = AbortSignal.timeout(1000)): Promise { const filters = await getLookupFilters(query); - const [event] = await mixer.getFilters(filters, { limit: 1, timeout: Time.seconds(1) }); + const [event] = await mixer.getFilters(filters, { limit: 1, signal }); return event; } diff --git a/src/controllers/api/timelines.ts b/src/controllers/api/timelines.ts index a29cdc8..c430ed9 100644 --- a/src/controllers/api/timelines.ts +++ b/src/controllers/api/timelines.ts @@ -3,7 +3,6 @@ import { type DittoFilter } from '@/filter.ts'; import * as mixer from '@/mixer.ts'; import { getFeedPubkeys } from '@/queries.ts'; import { booleanParamSchema } from '@/schema.ts'; -import { Time } from '@/utils.ts'; import { paginated, paginationSchema } from '@/utils/web.ts'; import { renderStatus } from '@/views/mastodon/statuses.ts'; @@ -33,10 +32,10 @@ const hashtagTimelineController: AppController = (c) => { }; /** Render statuses for timelines. */ -async function renderStatuses(c: AppContext, filters: DittoFilter<1>[]) { +async function renderStatuses(c: AppContext, filters: DittoFilter<1>[], signal = AbortSignal.timeout(1000)) { const events = await mixer.getFilters( filters.map((filter) => ({ ...filter, relations: ['author', 'event_stats', 'author_stats'] })), - { timeout: Time.seconds(1) }, + { signal }, ); if (!events.length) { diff --git a/src/filter.ts b/src/filter.ts index 7fb0297..92a022d 100644 --- a/src/filter.ts +++ b/src/filter.ts @@ -21,8 +21,8 @@ type MicroFilter = { ids: [Event['id']] } | { kinds: [0]; authors: [Event['pubke /** Additional options to apply to the whole subscription. */ interface GetFiltersOpts { - /** How long to wait (in milliseconds) until aborting the request. */ - timeout?: number; + /** Signal to abort the request. */ + signal?: AbortSignal; /** Event limit for the whole subscription. */ limit?: number; /** Relays to use, if applicable. */ diff --git a/src/firehose.ts b/src/firehose.ts index ea6be62..5e24de7 100644 --- a/src/firehose.ts +++ b/src/firehose.ts @@ -17,8 +17,6 @@ pool.subscribe( /** Handle events through the firehose pipeline. */ function handleEvent(event: Event): Promise { - console.info(`firehose: Event<${event.kind}> ${event.id}`); - return pipeline .handleEvent(event) .catch(() => {}); diff --git a/src/pipeline.ts b/src/pipeline.ts index 0c7520b..de5c9d2 100644 --- a/src/pipeline.ts +++ b/src/pipeline.ts @@ -26,6 +26,7 @@ async function handleEvent(event: Event): Promise { if (!(await verifySignatureWorker(event))) return; const wanted = reqmeister.isWanted(event); if (encounterEvent(event)) return; + console.info(`pipeline: Event<${event.kind}> ${event.id}`); const data = await getEventData(event); await Promise.all([ @@ -72,7 +73,7 @@ async function storeEvent(event: Event, data: EventData, opts: StoreEventOpts = if (force || data.user || isAdminEvent(event) || await isLocallyFollowed(event.pubkey)) { const [deletion] = await mixer.getFilters( [{ kinds: [5], authors: [event.pubkey], '#e': [event.id], limit: 1 }], - { limit: 1, timeout: Time.seconds(1) }, + { limit: 1, signal: AbortSignal.timeout(Time.seconds(1)) }, ); if (deletion) { diff --git a/src/queries.ts b/src/queries.ts index 6a9557d..1ecff7b 100644 --- a/src/queries.ts +++ b/src/queries.ts @@ -5,8 +5,8 @@ import * as mixer from '@/mixer.ts'; import { reqmeister } from '@/common.ts'; interface GetEventOpts { - /** Timeout in milliseconds. */ - timeout?: number; + /** Signal to abort the request. */ + signal?: AbortSignal; /** Event kind. */ kind?: K; /** Relations to include on the event. */ @@ -18,36 +18,36 @@ const getEvent = async ( id: string, opts: GetEventOpts = {}, ): Promise | undefined> => { - const { kind, relations, timeout = 1000 } = opts; + const { kind, relations, signal = AbortSignal.timeout(1000) } = opts; const filter: DittoFilter = { ids: [id], relations, limit: 1 }; if (kind) { filter.kinds = [kind]; } - const [event] = await mixer.getFilters([filter], { limit: 1, timeout }); + const [event] = await mixer.getFilters([filter], { limit: 1, signal }); return event; }; /** Get a Nostr `set_medatadata` event for a user's pubkey. */ const getAuthor = async (pubkey: string, opts: GetEventOpts<0> = {}): Promise | undefined> => { - const { relations, timeout = 1000 } = opts; + const { relations, signal = AbortSignal.timeout(1000) } = opts; const event = await eventsDB.getFilters( [{ authors: [pubkey], relations, kinds: [0], limit: 1 }], - { limit: 1, timeout }, + { limit: 1, signal }, ).then(([event]) => event) || await reqmeister.req({ kinds: [0], authors: [pubkey] }).catch(() => {}); return event; }; /** Get users the given pubkey follows. */ -const getFollows = async (pubkey: string, timeout = 1000): Promise | undefined> => { - const [event] = await mixer.getFilters([{ authors: [pubkey], kinds: [3], limit: 1 }], { limit: 1, timeout }); +const getFollows = async (pubkey: string, signal = AbortSignal.timeout(1000)): Promise | undefined> => { + const [event] = await mixer.getFilters([{ authors: [pubkey], kinds: [3], limit: 1 }], { limit: 1, signal }); return event; }; /** Get pubkeys the user follows. */ -async function getFollowedPubkeys(pubkey: string): Promise { - const event = await getFollows(pubkey); +async function getFollowedPubkeys(pubkey: string, signal?: AbortSignal): Promise { + const event = await getFollows(pubkey, signal); if (!event) return []; return event.tags @@ -79,10 +79,10 @@ async function getAncestors(event: Event<1>, result = [] as Event<1>[]): Promise return result.reverse(); } -function getDescendants(eventId: string): Promise[]> { +function getDescendants(eventId: string, signal = AbortSignal.timeout(2000)): Promise[]> { return mixer.getFilters( [{ kinds: [1], '#e': [eventId], relations: ['author', 'event_stats', 'author_stats'] }], - { limit: 200, timeout: 2000 }, + { limit: 200, signal }, ); } diff --git a/src/utils/nip05.ts b/src/utils/nip05.ts index b655175..df08150 100644 --- a/src/utils/nip05.ts +++ b/src/utils/nip05.ts @@ -7,12 +7,12 @@ const nip05Cache = new TTLCache>({ ttl: Time.hour const NIP05_REGEX = /^(?:([\w.+-]+)@)?([\w.-]+)$/; interface LookupOpts { - timeout?: number; + signal?: AbortSignal; } /** Get pubkey from NIP-05. */ async function lookup(value: string, opts: LookupOpts = {}): Promise { - const { timeout = 2000 } = opts; + const { signal = AbortSignal.timeout(2000) } = opts; const match = value.match(NIP05_REGEX); if (!match) return null; @@ -21,7 +21,7 @@ async function lookup(value: string, opts: LookupOpts = {}): Promise>({ }); /** Unfurl card from cache if available, otherwise fetch it. */ -function unfurlCardCached(url: string, timeout = Time.seconds(1)): Promise { +function unfurlCardCached(url: string, signal = AbortSignal.timeout(1000)): Promise { const cached = previewCardCache.get(url); if (cached !== undefined) { return cached; } else { - const card = unfurlCard(url, AbortSignal.timeout(timeout)); + const card = unfurlCard(url, signal); previewCardCache.set(url, card); return card; }