diff --git a/scripts/admin.ts b/scripts/admin.ts
index df803a1..2c682e0 100644
--- a/scripts/admin.ts
+++ b/scripts/admin.ts
@@ -19,5 +19,5 @@ async function publish(t: EventStub) {
     ...t,
   });

-  await pipeline.handleEvent(event);
+  await pipeline.handleEvent(event, AbortSignal.timeout(5000));
 }
diff --git a/src/controllers/activitypub/actor.ts b/src/controllers/activitypub/actor.ts
index 6b2c56e..c83acbe 100644
--- a/src/controllers/activitypub/actor.ts
+++ b/src/controllers/activitypub/actor.ts
@@ -7,11 +7,12 @@ import type { AppContext, AppController } from '@/app.ts';

 const actorController: AppController = async (c) => {
   const username = c.req.param('username');
+  const { signal } = c.req.raw;

-  const user = await findUser({ username });
+  const user = await findUser({ username }, signal);
   if (!user) return notFound(c);

-  const event = await getAuthor(user.pubkey);
+  const event = await getAuthor(user.pubkey, { signal });
   if (!event) return notFound(c);

   const actor = await renderActor(event, user.username);
diff --git a/src/controllers/api/accounts.ts b/src/controllers/api/accounts.ts
index 4525a0f..a2c1884 100644
--- a/src/controllers/api/accounts.ts
+++ b/src/controllers/api/accounts.ts
@@ -132,9 +132,10 @@ const accountStatusesController: AppController = async (c) => {
   const pubkey = c.req.param('pubkey');
   const { since, until } = paginationSchema.parse(c.req.query());
   const { pinned, limit, exclude_replies, tagged } = accountStatusesQuerySchema.parse(c.req.query());
+  const { signal } = c.req.raw;

   if (pinned) {
-    const [pinEvent] = await eventsDB.query([{ kinds: [10001], authors: [pubkey], limit: 1 }]);
+    const [pinEvent] = await eventsDB.query([{ kinds: [10001], authors: [pubkey], limit: 1 }], { signal });
     if (pinEvent) {
       const pinnedEventIds = getTagSet(pinEvent.tags, 'e');
       return renderStatuses(c, [...pinnedEventIds].reverse());
@@ -156,7 +157,7 @@ const accountStatusesController: AppController = async (c) => {
     filter['#t'] = [tagged];
   }

-  let events = await eventsDB.query([filter]);
+  let events = await eventsDB.query([filter], { signal });

   if (exclude_replies) {
     events = events.filter((event) => !findReplyTag(event.tags));
@@ -292,10 +293,11 @@ const unblockController: AppController = async (c) => {
 const favouritesController: AppController = async (c) => {
   const pubkey = c.get('pubkey')!;
   const params = paginationSchema.parse(c.req.query());
+  const { signal } = c.req.raw;

   const events7 = await eventsDB.query(
     [{ kinds: [7], authors: [pubkey], ...params }],
-    { signal: AbortSignal.timeout(1000) },
+    { signal },
   );

   const ids = events7
@@ -304,9 +306,7 @@ const favouritesController: AppController = async (c) => {

   const events1 = await eventsDB.query(
     [{ kinds: [1], ids, relations: ['author', 'event_stats', 'author_stats'] }],
-    {
-      signal: AbortSignal.timeout(1000),
-    },
+    { signal },
   );

   const statuses = await Promise.all(events1.map((event) => renderStatus(event, c.get('pubkey'))));
diff --git a/src/controllers/api/admin.ts b/src/controllers/api/admin.ts
index 77c1ed3..c3a5c3d 100644
--- a/src/controllers/api/admin.ts
+++ b/src/controllers/api/admin.ts
@@ -38,10 +38,11 @@ const adminAccountsController: AppController = async (c) => {
   }

   const { since, until, limit } = paginationSchema.parse(c.req.query());
+  const { signal } = c.req.raw;

-  const events = await eventsDB.query([{ kinds: [30361], authors: [Conf.pubkey], since, until, limit }]);
+  const events = await eventsDB.query([{ kinds: [30361], authors: [Conf.pubkey], since, until, limit }], { signal });
   const pubkeys = events.map((event) => event.tags.find(([name]) => name === 'd')?.[1]!);
-  const authors = await eventsDB.query([{ kinds: [0], authors: pubkeys }]);
+  const authors = await eventsDB.query([{ kinds: [0], authors: pubkeys }], { signal });

   for (const event of events) {
     const d = event.tags.find(([name]) => name === 'd')?.[1];
diff --git a/src/controllers/api/blocks.ts b/src/controllers/api/blocks.ts
index c2085b1..d54773a 100644
--- a/src/controllers/api/blocks.ts
+++ b/src/controllers/api/blocks.ts
@@ -6,10 +6,12 @@ import { renderAccounts } from '@/views.ts';
 /** https://docs.joinmastodon.org/methods/blocks/#get */
 const blocksController: AppController = async (c) => {
   const pubkey = c.get('pubkey')!;
+  const { signal } = c.req.raw;

-  const [event10000] = await eventsDB.query([
-    { kinds: [10000], authors: [pubkey], limit: 1 },
-  ]);
+  const [event10000] = await eventsDB.query(
+    [{ kinds: [10000], authors: [pubkey], limit: 1 }],
+    { signal },
+  );

   if (event10000) {
     const pubkeys = getTagSet(event10000.tags, 'p');
diff --git a/src/controllers/api/bookmarks.ts b/src/controllers/api/bookmarks.ts
index bf383ac..16e87e7 100644
--- a/src/controllers/api/bookmarks.ts
+++ b/src/controllers/api/bookmarks.ts
@@ -6,10 +6,12 @@ import { renderStatuses } from '@/views.ts';
 /** https://docs.joinmastodon.org/methods/bookmarks/#get */
 const bookmarksController: AppController = async (c) => {
   const pubkey = c.get('pubkey')!;
+  const { signal } = c.req.raw;

-  const [event10003] = await eventsDB.query([
-    { kinds: [10003], authors: [pubkey], limit: 1 },
-  ]);
+  const [event10003] = await eventsDB.query(
+    [{ kinds: [10003], authors: [pubkey], limit: 1 }],
+    { signal },
+  );

   if (event10003) {
     const eventIds = getTagSet(event10003.tags, 'e');
diff --git a/src/controllers/api/instance.ts b/src/controllers/api/instance.ts
index 6a668bf..b4035bb 100644
--- a/src/controllers/api/instance.ts
+++ b/src/controllers/api/instance.ts
@@ -5,8 +5,9 @@ import { eventsDB } from '@/storages.ts';

 const instanceController: AppController = async (c) => {
   const { host, protocol } = Conf.url;
+  const { signal } = c.req.raw;

-  const [event] = await eventsDB.query([{ kinds: [0], authors: [Conf.pubkey], limit: 1 }]);
+  const [event] = await eventsDB.query([{ kinds: [0], authors: [Conf.pubkey], limit: 1 }], { signal });
   const meta = jsonServerMetaSchema.parse(event?.content);

   /** Protocol to use for WebSocket URLs, depending on the protocol of the `LOCAL_DOMAIN`. */
diff --git a/src/controllers/api/media.ts b/src/controllers/api/media.ts
index f8868c1..3a77175 100644
--- a/src/controllers/api/media.ts
+++ b/src/controllers/api/media.ts
@@ -13,7 +13,9 @@ const mediaBodySchema = z.object({
 });

 const mediaController: AppController = async (c) => {
+  const pubkey = c.get('pubkey')!;
   const result = mediaBodySchema.safeParse(await parseBody(c.req.raw));
+  const { signal } = c.req.raw;

   if (!result.success) {
     return c.json({ error: 'Bad request.', schema: result.error }, 422);
@@ -21,7 +23,7 @@ const mediaController: AppController = async (c) => {

   try {
     const { file, description } = result.data;
-    const media = await uploadFile(file, { pubkey: c.get('pubkey')!, description });
+    const media = await uploadFile(file, { pubkey, description }, signal);
     return c.json(renderAttachment(media));
   } catch (e) {
     console.error(e);
diff --git a/src/controllers/api/notifications.ts b/src/controllers/api/notifications.ts
index 0a0745a..703e79f 100644
--- a/src/controllers/api/notifications.ts
+++ b/src/controllers/api/notifications.ts
@@ -6,10 +6,11 @@ import { renderNotification } from '@/views/mastodon/notifications.ts';
 const notificationsController: AppController = async (c) => {
   const pubkey = c.get('pubkey')!;
   const { since, until } = paginationSchema.parse(c.req.query());
+  const { signal } = c.req.raw;

   const events = await eventsDB.query(
     [{ kinds: [1], '#p': [pubkey], since, until }],
-    { signal: AbortSignal.timeout(3000) },
+    { signal },
   );

   const statuses = await Promise.all(events.map((event) => renderNotification(event, pubkey)));
diff --git a/src/controllers/api/pleroma.ts b/src/controllers/api/pleroma.ts
index 9a5b087..a500161 100644
--- a/src/controllers/api/pleroma.ts
+++ b/src/controllers/api/pleroma.ts
@@ -8,12 +8,14 @@ import { createAdminEvent } from '@/utils/api.ts';
 import { jsonSchema } from '@/schema.ts';

 const frontendConfigController: AppController = async (c) => {
+  const { signal } = c.req.raw;
+
   const [event] = await eventsDB.query([{
     kinds: [30078],
     authors: [Conf.pubkey],
     '#d': ['pub.ditto.pleroma.config'],
     limit: 1,
-  }]);
+  }], { signal });

   const configs = jsonSchema.pipe(z.array(configSchema)).catch([]).parse(
     event?.content ? await decryptAdmin(Conf.pubkey, event.content) : '',
@@ -35,13 +37,14 @@ const frontendConfigController: AppController = async (c) => {

 const configController: AppController = async (c) => {
   const { pubkey } = Conf;
+  const { signal } = c.req.raw;

   const [event] = await eventsDB.query([{
     kinds: [30078],
     authors: [pubkey],
     '#d': ['pub.ditto.pleroma.config'],
     limit: 1,
-  }]);
+  }], { signal });

   const configs = jsonSchema.pipe(z.array(configSchema)).catch([]).parse(
     event?.content ? await decryptAdmin(pubkey, event.content) : '',
@@ -53,13 +56,14 @@ const configController: AppController = async (c) => {
 /** Pleroma admin config controller. */
 const updateConfigController: AppController = async (c) => {
   const { pubkey } = Conf;
+  const { signal } = c.req.raw;

   const [event] = await eventsDB.query([{
     kinds: [30078],
     authors: [pubkey],
     '#d': ['pub.ditto.pleroma.config'],
     limit: 1,
-  }]);
+  }], { signal });

   const configs = jsonSchema.pipe(z.array(configSchema)).catch([]).parse(
     event?.content ? await decryptAdmin(pubkey, event.content) : '',
diff --git a/src/controllers/api/search.ts b/src/controllers/api/search.ts
index 30a0811..f785f34 100644
--- a/src/controllers/api/search.ts
+++ b/src/controllers/api/search.ts
@@ -25,13 +25,12 @@ type SearchQuery = z.infer<typeof searchQuerySchema>;

 const searchController: AppController = async (c) => {
   const result = searchQuerySchema.safeParse(c.req.query());
+  const { signal } = c.req.raw;

   if (!result.success) {
     return c.json({ error: 'Bad request', schema: result.error }, 422);
   }

-  const signal = AbortSignal.timeout(1000);
-
   const [event, events] = await Promise.all([
     lookupEvent(result.data, signal),
     searchEvents(result.data, signal),
@@ -46,12 +45,12 @@ const searchController: AppController = async (c) => {
   const [accounts, statuses] = await Promise.all([
     Promise.all(
       results
-        .filter((event): event is NostrEvent => event.kind === 0)
+        .filter((event) => event.kind === 0)
         .map((event) => renderAccount(event)),
     ),
     Promise.all(
       results
-        .filter((event): event is NostrEvent => event.kind === 1)
+        .filter((event) => event.kind === 1)
         .map((event) => renderStatus(event, c.get('pubkey'))),
     ),
   ]);
diff --git a/src/controllers/api/statuses.ts b/src/controllers/api/statuses.ts
index 99fa1e3..2d85985 100644
--- a/src/controllers/api/statuses.ts
+++ b/src/controllers/api/statuses.ts
@@ -241,10 +241,12 @@ const pinController: AppController = async (c) => {
 const unpinController: AppController = async (c) => {
   const pubkey = c.get('pubkey')!;
   const eventId = c.req.param('id');
+  const { signal } = c.req.raw;

   const event = await getEvent(eventId, {
     kind: 1,
     relations: ['author', 'event_stats', 'author_stats'],
+    signal,
   });

   if (event) {
@@ -273,12 +275,13 @@ const zapController: AppController = async (c) => {
   const id = c.req.param('id');
   const body = await parseBody(c.req.raw);
   const params = zapSchema.safeParse(body);
+  const { signal } = c.req.raw;

   if (!params.success) {
     return c.json({ error: 'Bad request', schema: params.error }, 400);
   }

-  const target = await getEvent(id, { kind: 1, relations: ['author', 'event_stats', 'author_stats'] });
+  const target = await getEvent(id, { kind: 1, relations: ['author', 'event_stats', 'author_stats'], signal });
   const author = target?.author;
   const meta = jsonMetaContentSchema.parse(author?.content);
   const lnurl = getLnurl(meta);
diff --git a/src/controllers/api/timelines.ts b/src/controllers/api/timelines.ts
index f76a3ac..f8434c4 100644
--- a/src/controllers/api/timelines.ts
+++ b/src/controllers/api/timelines.ts
@@ -31,7 +31,9 @@ const hashtagTimelineController: AppController = (c) => {
 };

 /** Render statuses for timelines. */
-async function renderStatuses(c: AppContext, filters: DittoFilter[], signal = AbortSignal.timeout(1000)) {
+async function renderStatuses(c: AppContext, filters: DittoFilter[]) {
+  const { signal } = c.req.raw;
+
   const events = await eventsDB.query(
     filters.map((filter) => ({ ...filter, relations: ['author', 'event_stats', 'author_stats'] })),
     { signal },
diff --git a/src/controllers/nostr/relay-info.ts b/src/controllers/nostr/relay-info.ts
index ed3e8b6..9d24644 100644
--- a/src/controllers/nostr/relay-info.ts
+++ b/src/controllers/nostr/relay-info.ts
@@ -4,7 +4,8 @@ import { jsonServerMetaSchema } from '@/schemas/nostr.ts';
 import { eventsDB } from '@/storages.ts';

 const relayInfoController: AppController = async (c) => {
-  const [event] = await eventsDB.query([{ kinds: [0], authors: [Conf.pubkey], limit: 1 }]);
+  const { signal } = c.req.raw;
+  const [event] = await eventsDB.query([{ kinds: [0], authors: [Conf.pubkey], limit: 1 }], { signal });
   const meta = jsonServerMetaSchema.parse(event?.content);

   return c.json({
diff --git a/src/controllers/nostr/relay.ts b/src/controllers/nostr/relay.ts
index 5361c79..53d51ba 100644
--- a/src/controllers/nostr/relay.ts
+++ b/src/controllers/nostr/relay.ts
@@ -78,7 +78,7 @@ function connectStream(socket: WebSocket) {
   async function handleEvent([_, event]: ClientEVENT): Promise<void> {
     try {
       // This will store it (if eligible) and run other side-effects.
-      await pipeline.handleEvent(event);
+      await pipeline.handleEvent(event, AbortSignal.timeout(1000));
       send(['OK', event.id, true, '']);
     } catch (e) {
       if (e instanceof pipeline.RelayError) {
diff --git a/src/db/users.ts b/src/db/users.ts
index f57472a..e4fdc32 100644
--- a/src/db/users.ts
+++ b/src/db/users.ts
@@ -38,7 +38,7 @@ async function insertUser(user: User) {
     throw new Error('User already exists');
   }
   const event = await buildUserEvent(user);
-  return pipeline.handleEvent(event);
+  return pipeline.handleEvent(event, AbortSignal.timeout(1000));
 }

 /**
@@ -48,7 +48,7 @@ async function insertUser(user: User) {
  * await findUser({ username: 'alex' });
  * ```
  */
-async function findUser(user: Partial<User>): Promise<User | undefined> {
+async function findUser(user: Partial<User>, signal?: AbortSignal): Promise<User | undefined> {
   const filter: NostrFilter = { kinds: [30361], authors: [Conf.pubkey], limit: 1 };

   for (const [key, value] of Object.entries(user)) {
@@ -65,7 +65,7 @@ async function findUser(user: Partial<User>): Promise<User | undefined> {
     }
   }

-  const [event] = await eventsDB.query([filter]);
+  const [event] = await eventsDB.query([filter], { signal });

   if (event) {
     return {
diff --git a/src/firehose.ts b/src/firehose.ts
index f1caf8e..7014302 100644
--- a/src/firehose.ts
+++ b/src/firehose.ts
@@ -22,6 +22,6 @@ function handleEvent(event: NostrEvent): Promise<void> {
   debug(`NostrEvent<${event.kind}> ${event.id}`);

   return pipeline
-    .handleEvent(event)
+    .handleEvent(event, AbortSignal.timeout(5000))
     .catch(() => {});
 }
diff --git a/src/pipeline.ts b/src/pipeline.ts
index 80b686e..79829f3 100644
--- a/src/pipeline.ts
+++ b/src/pipeline.ts
@@ -23,8 +23,7 @@ const debug = Debug('ditto:pipeline');
  * Common pipeline function to process (and maybe store) events.
  * It is idempotent, so it can be called multiple times for the same event.
  */
-async function handleEvent(event: DittoEvent): Promise<void> {
-  const signal = AbortSignal.timeout(5000);
+async function handleEvent(event: DittoEvent, signal: AbortSignal): Promise<void> {
   if (!(await verifySignatureWorker(event))) return;
   const wanted = reqmeister.isWanted(event);
   if (await encounterEvent(event, signal)) return;
@@ -203,7 +202,7 @@ async function payZap(event: DittoEvent, signal: AbortSignal) {
       ],
     });

-    await handleEvent(nwcRequestEvent);
+    await handleEvent(nwcRequestEvent, signal);
   } catch (e) {
     debug('lnurl error:', e);
   }
diff --git a/src/sign.ts b/src/sign.ts
index 49b8557..efe88c1 100644
--- a/src/sign.ts
+++ b/src/sign.ts
@@ -5,7 +5,7 @@ import { Debug, type EventTemplate, finishEvent, HTTPException, type NostrEvent
 import { connectResponseSchema } from '@/schemas/nostr.ts';
 import { jsonSchema } from '@/schema.ts';
 import { Sub } from '@/subs.ts';
-import { eventMatchesTemplate, Time } from '@/utils.ts';
+import { eventMatchesTemplate } from '@/utils.ts';
 import { createAdminEvent } from '@/utils/api.ts';

 const debug = Debug('ditto:sign');
@@ -87,9 +87,10 @@ async function awaitSignedEvent(

   function close(): void {
     Sub.close(messageId);
+    c.req.raw.signal.removeEventListener('abort', close);
   }

-  const timeout = setTimeout(close, Time.minutes(1));
+  c.req.raw.signal.addEventListener('abort', close);

   for await (const event of sub) {
     const decrypted = await decryptAdmin(event.pubkey, event.content);
@@ -102,7 +103,6 @@ async function awaitSignedEvent(

     if (result.success) {
       close();
-      clearTimeout(timeout);
       return result.data.result;
     }
   }
diff --git a/src/storages/pool-store.ts b/src/storages/pool-store.ts
index 5b5f2ef..8beadac 100644
--- a/src/storages/pool-store.ts
+++ b/src/storages/pool-store.ts
@@ -16,7 +16,7 @@ interface PoolStoreOpts {
   pool: InstanceType;
   relays: WebSocket['url'][];
   publisher: {
-    handleEvent(event: NostrEvent): Promise<void>;
+    handleEvent(event: NostrEvent, signal: AbortSignal): Promise<void>;
   };
 }

@@ -25,7 +25,7 @@ class PoolStore implements NStore {
   #pool: InstanceType;
   #relays: WebSocket['url'][];
   #publisher: {
-    handleEvent(event: NostrEvent): Promise<void>;
+    handleEvent(event: NostrEvent, signal: AbortSignal): Promise<void>;
   };

   constructor(opts: PoolStoreOpts) {
@@ -60,7 +60,7 @@ class PoolStore implements NStore {
       opts.relays ?? this.#relays,
       (event: NostrEvent | null) => {
         if (event && matchFilters(filters, event)) {
-          this.#publisher.handleEvent(event).catch(() => {});
+          this.#publisher.handleEvent(event, AbortSignal.timeout(1000)).catch(() => {});
           results.add({
             id: event.id,
             kind: event.kind,
diff --git a/src/upload.ts b/src/upload.ts
index d8ea969..5c16501 100644
--- a/src/upload.ts
+++ b/src/upload.ts
@@ -8,7 +8,7 @@ interface FileMeta {
 }

 /** Upload a file, track it in the database, and return the resulting media object. */
-async function uploadFile(file: File, meta: FileMeta) {
+async function uploadFile(file: File, meta: FileMeta, signal?: AbortSignal) {
   const { name, type, size } = file;
   const { pubkey, description } = meta;

@@ -16,7 +16,7 @@ async function uploadFile(file: File, meta: FileMeta) {
     throw new Error('File size is too large.');
   }

-  const { cid } = await uploader.upload(file);
+  const { cid } = await uploader.upload(file, signal);
   const url = new URL(`/ipfs/${cid}`, Conf.mediaDomain).toString();

   return insertUnattachedMedia({
diff --git a/src/uploaders/config.ts b/src/uploaders/config.ts
index b0adece..2ee2f9a 100644
--- a/src/uploaders/config.ts
+++ b/src/uploaders/config.ts
@@ -7,11 +7,11 @@ import type { Uploader } from './types.ts';

 /** Meta-uploader determined from configuration. */
 const configUploader: Uploader = {
-  upload(file) {
-    return uploader().upload(file);
+  upload(file, signal) {
+    return uploader().upload(file, signal);
   },
-  delete(cid) {
-    return uploader().delete(cid);
+  delete(cid, signal) {
+    return uploader().delete(cid, signal);
   },
 };

diff --git a/src/uploaders/ipfs.ts b/src/uploaders/ipfs.ts
index 7438205..7623cd4 100644
--- a/src/uploaders/ipfs.ts
+++ b/src/uploaders/ipfs.ts
@@ -17,7 +17,7 @@ const ipfsAddResponseSchema = z.object({
  * and upload the file using the REST API.
  */
 const ipfsUploader: Uploader = {
-  async upload(file) {
+  async upload(file, signal) {
     const url = new URL('/api/v0/add', Conf.ipfs.apiUrl);

     const formData = new FormData();
@@ -26,6 +26,7 @@ const ipfsUploader: Uploader = {
     const response = await fetchWorker(url, {
       method: 'POST',
       body: formData,
+      signal,
     });

     const { Hash } = ipfsAddResponseSchema.parse(await response.json());
@@ -34,7 +35,7 @@ const ipfsUploader: Uploader = {
       cid: Hash,
     };
   },
-  async delete(cid) {
+  async delete(cid, signal) {
     const url = new URL('/api/v0/pin/rm', Conf.ipfs.apiUrl);

     const query = new URLSearchParams();
@@ -44,6 +45,7 @@ const ipfsUploader: Uploader = {

     await fetchWorker(url, {
       method: 'POST',
+      signal,
     });
   },
 };
diff --git a/src/uploaders/s3.ts b/src/uploaders/s3.ts
index 378b279..2e02cc3 100644
--- a/src/uploaders/s3.ts
+++ b/src/uploaders/s3.ts
@@ -9,9 +9,10 @@ import type { Uploader } from './types.ts';
  * take advantage of IPFS features while not really using IPFS.
  */
 const s3Uploader: Uploader = {
-  async upload(file) {
+  async upload(file, _signal) {
     const cid = await IpfsHash.of(file.stream()) as string;

+    // FIXME: Can't cancel S3 requests: https://github.com/bradenmacdonald/deno-s3-lite-client/issues/24
     await client().putObject(`ipfs/${cid}`, file.stream(), {
       metadata: {
         'Content-Type': file.type,
@@ -23,7 +24,8 @@ const s3Uploader: Uploader = {
       cid,
     };
   },
-  async delete(cid) {
+  async delete(cid, _signal) {
+    // FIXME: Can't cancel S3 requests: https://github.com/bradenmacdonald/deno-s3-lite-client/issues/24
     await client().deleteObject(`ipfs/${cid}`);
   },
 };
diff --git a/src/uploaders/types.ts b/src/uploaders/types.ts
index 80bf431..8f11545 100644
--- a/src/uploaders/types.ts
+++ b/src/uploaders/types.ts
@@ -1,9 +1,9 @@
 /** Modular uploader interface, to support uploading to different backends. */
 interface Uploader {
   /** Upload the file to the backend. */
-  upload(file: File): Promise;
+  upload(file: File, signal?: AbortSignal): Promise;
   /** Delete the file from the backend. */
-  delete(cid: string): Promise;
+  delete(cid: string, signal?: AbortSignal): Promise;
 }

 /** Return value from the uploader after uploading a file. */
diff --git a/src/utils/api.ts b/src/utils/api.ts
index 498bbab..bf51132 100644
--- a/src/utils/api.ts
+++ b/src/utils/api.ts
@@ -46,12 +46,12 @@ interface UpdateEventFilter extends NostrFilter {
 }

 /** Fetch existing event, update it, then publish the new event. */
-async function updateEvent(
+async function updateEvent(
   filter: UpdateEventFilter,
   fn: (prev: NostrEvent | undefined) => E,
   c: AppContext,
 ): Promise {
-  const [prev] = await eventsDB.query([filter], { limit: 1 });
+  const [prev] = await eventsDB.query([filter], { limit: 1, signal: c.req.raw.signal });
   return createEvent(fn(prev), c);
 }

@@ -84,7 +84,7 @@ async function createAdminEvent(t: EventStub, c: AppContext): Promise {
   debug('EVENT', event);

   try {
-    await pipeline.handleEvent(event);
+    await pipeline.handleEvent(event, c.req.raw.signal);
   } catch (e) {
     if (e instanceof pipeline.RelayError) {
       throw new HTTPException(422, {
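
The patch above replaces fixed `AbortSignal.timeout(...)` calls with the incoming request's own signal (`c.req.raw.signal`), so queries and pipeline work stop as soon as the client disconnects, while background callers (firehose, relay, scripts) keep explicit timeouts. Where a hard upper bound is still wanted on top of the request signal, the two could be combined. The sketch below is illustrative only: the `withTimeout` helper is hypothetical (it is not part of this patch) and assumes a runtime that provides `AbortSignal.any`, e.g. recent Deno or Node 20+.

// Hypothetical helper (not in this patch): abort on whichever fires first,
// the client disconnecting or a hard timeout elapsing.
function withTimeout(signal: AbortSignal, ms: number): AbortSignal {
  return AbortSignal.any([signal, AbortSignal.timeout(ms)]);
}

// Usage sketch against the signatures introduced above:
//   const { signal } = c.req.raw;
//   const events = await eventsDB.query([filter], { signal: withTimeout(signal, 1000) });
//   await pipeline.handleEvent(event, withTimeout(signal, 5000));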