Merge branch 'api-signal' into 'main'

Pass Request signal down from API controllers

See merge request soapbox-pub/ditto!107
Alex Gleason 2024-01-23 22:02:32 +00:00
commit ea8df5b791
26 changed files with 81 additions and 59 deletions
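
The pattern applied across these files, as a minimal sketch (the controller name and filter are illustrative; AppController and eventsDB are imported as elsewhere in this diff):

import type { AppController } from '@/app.ts';
import { eventsDB } from '@/storages.ts';

// Reuse the AbortSignal of the incoming Request instead of creating a fresh
// AbortSignal.timeout(), so storage queries are aborted as soon as the client
// disconnects or the request is otherwise torn down.
const exampleController: AppController = async (c) => {
  const { signal } = c.req.raw;

  const events = await eventsDB.query(
    [{ kinds: [1], limit: 20 }],
    { signal },
  );

  return c.json(events);
};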

View File

@@ -19,5 +19,5 @@ async function publish(t: EventStub) {
     ...t,
   });
-  await pipeline.handleEvent(event);
+  await pipeline.handleEvent(event, AbortSignal.timeout(5000));
 }

View File

@@ -7,11 +7,12 @@ import type { AppContext, AppController } from '@/app.ts';
 const actorController: AppController = async (c) => {
   const username = c.req.param('username');
+  const { signal } = c.req.raw;
-  const user = await findUser({ username });
+  const user = await findUser({ username }, signal);
   if (!user) return notFound(c);
-  const event = await getAuthor(user.pubkey);
+  const event = await getAuthor(user.pubkey, { signal });
   if (!event) return notFound(c);
   const actor = await renderActor(event, user.username);

View File

@@ -132,9 +132,10 @@ const accountStatusesController: AppController = async (c) => {
   const pubkey = c.req.param('pubkey');
   const { since, until } = paginationSchema.parse(c.req.query());
   const { pinned, limit, exclude_replies, tagged } = accountStatusesQuerySchema.parse(c.req.query());
+  const { signal } = c.req.raw;
   if (pinned) {
-    const [pinEvent] = await eventsDB.query([{ kinds: [10001], authors: [pubkey], limit: 1 }]);
+    const [pinEvent] = await eventsDB.query([{ kinds: [10001], authors: [pubkey], limit: 1 }], { signal });
     if (pinEvent) {
       const pinnedEventIds = getTagSet(pinEvent.tags, 'e');
       return renderStatuses(c, [...pinnedEventIds].reverse());
@@ -156,7 +157,7 @@ const accountStatusesController: AppController = async (c) => {
     filter['#t'] = [tagged];
   }
-  let events = await eventsDB.query([filter]);
+  let events = await eventsDB.query([filter], { signal });
   if (exclude_replies) {
     events = events.filter((event) => !findReplyTag(event.tags));
@@ -292,10 +293,11 @@ const unblockController: AppController = async (c) => {
 const favouritesController: AppController = async (c) => {
   const pubkey = c.get('pubkey')!;
   const params = paginationSchema.parse(c.req.query());
+  const { signal } = c.req.raw;
   const events7 = await eventsDB.query(
     [{ kinds: [7], authors: [pubkey], ...params }],
-    { signal: AbortSignal.timeout(1000) },
+    { signal },
   );
   const ids = events7
@@ -304,9 +306,7 @@ const favouritesController: AppController = async (c) => {
   const events1 = await eventsDB.query(
     [{ kinds: [1], ids, relations: ['author', 'event_stats', 'author_stats'] }],
-    {
-      signal: AbortSignal.timeout(1000),
-    },
+    { signal },
   );
   const statuses = await Promise.all(events1.map((event) => renderStatus(event, c.get('pubkey'))));

View File

@@ -38,10 +38,11 @@ const adminAccountsController: AppController = async (c) => {
   }
   const { since, until, limit } = paginationSchema.parse(c.req.query());
+  const { signal } = c.req.raw;
-  const events = await eventsDB.query([{ kinds: [30361], authors: [Conf.pubkey], since, until, limit }]);
+  const events = await eventsDB.query([{ kinds: [30361], authors: [Conf.pubkey], since, until, limit }], { signal });
   const pubkeys = events.map((event) => event.tags.find(([name]) => name === 'd')?.[1]!);
-  const authors = await eventsDB.query([{ kinds: [0], authors: pubkeys }]);
+  const authors = await eventsDB.query([{ kinds: [0], authors: pubkeys }], { signal });
   for (const event of events) {
     const d = event.tags.find(([name]) => name === 'd')?.[1];

View File

@@ -6,10 +6,12 @@ import { renderAccounts } from '@/views.ts';
 /** https://docs.joinmastodon.org/methods/blocks/#get */
 const blocksController: AppController = async (c) => {
   const pubkey = c.get('pubkey')!;
+  const { signal } = c.req.raw;
-  const [event10000] = await eventsDB.query([
-    { kinds: [10000], authors: [pubkey], limit: 1 },
-  ]);
+  const [event10000] = await eventsDB.query(
+    [{ kinds: [10000], authors: [pubkey], limit: 1 }],
+    { signal },
+  );
   if (event10000) {
     const pubkeys = getTagSet(event10000.tags, 'p');

View File

@@ -6,10 +6,12 @@ import { renderStatuses } from '@/views.ts';
 /** https://docs.joinmastodon.org/methods/bookmarks/#get */
 const bookmarksController: AppController = async (c) => {
   const pubkey = c.get('pubkey')!;
+  const { signal } = c.req.raw;
-  const [event10003] = await eventsDB.query([
-    { kinds: [10003], authors: [pubkey], limit: 1 },
-  ]);
+  const [event10003] = await eventsDB.query(
+    [{ kinds: [10003], authors: [pubkey], limit: 1 }],
+    { signal },
+  );
   if (event10003) {
     const eventIds = getTagSet(event10003.tags, 'e');

View File

@@ -5,8 +5,9 @@ import { eventsDB } from '@/storages.ts';
 const instanceController: AppController = async (c) => {
   const { host, protocol } = Conf.url;
+  const { signal } = c.req.raw;
-  const [event] = await eventsDB.query([{ kinds: [0], authors: [Conf.pubkey], limit: 1 }]);
+  const [event] = await eventsDB.query([{ kinds: [0], authors: [Conf.pubkey], limit: 1 }], { signal });
   const meta = jsonServerMetaSchema.parse(event?.content);
   /** Protocol to use for WebSocket URLs, depending on the protocol of the `LOCAL_DOMAIN`. */

View File

@@ -13,7 +13,9 @@ const mediaBodySchema = z.object({
 });
 const mediaController: AppController = async (c) => {
+  const pubkey = c.get('pubkey')!;
   const result = mediaBodySchema.safeParse(await parseBody(c.req.raw));
+  const { signal } = c.req.raw;
   if (!result.success) {
     return c.json({ error: 'Bad request.', schema: result.error }, 422);
@@ -21,7 +23,7 @@ const mediaController: AppController = async (c) => {
   try {
     const { file, description } = result.data;
-    const media = await uploadFile(file, { pubkey: c.get('pubkey')!, description });
+    const media = await uploadFile(file, { pubkey, description }, signal);
     return c.json(renderAttachment(media));
   } catch (e) {
     console.error(e);

View File

@@ -6,10 +6,11 @@ import { renderNotification } from '@/views/mastodon/notifications.ts';
 const notificationsController: AppController = async (c) => {
   const pubkey = c.get('pubkey')!;
   const { since, until } = paginationSchema.parse(c.req.query());
+  const { signal } = c.req.raw;
   const events = await eventsDB.query(
     [{ kinds: [1], '#p': [pubkey], since, until }],
-    { signal: AbortSignal.timeout(3000) },
+    { signal },
   );
   const statuses = await Promise.all(events.map((event) => renderNotification(event, pubkey)));

View File

@@ -8,12 +8,14 @@ import { createAdminEvent } from '@/utils/api.ts';
 import { jsonSchema } from '@/schema.ts';
 const frontendConfigController: AppController = async (c) => {
+  const { signal } = c.req.raw;
   const [event] = await eventsDB.query([{
     kinds: [30078],
     authors: [Conf.pubkey],
     '#d': ['pub.ditto.pleroma.config'],
     limit: 1,
-  }]);
+  }], { signal });
   const configs = jsonSchema.pipe(z.array(configSchema)).catch([]).parse(
     event?.content ? await decryptAdmin(Conf.pubkey, event.content) : '',
@@ -35,13 +37,14 @@ const frontendConfigController: AppController = async (c) => {
 const configController: AppController = async (c) => {
   const { pubkey } = Conf;
+  const { signal } = c.req.raw;
   const [event] = await eventsDB.query([{
     kinds: [30078],
     authors: [pubkey],
     '#d': ['pub.ditto.pleroma.config'],
     limit: 1,
-  }]);
+  }], { signal });
   const configs = jsonSchema.pipe(z.array(configSchema)).catch([]).parse(
     event?.content ? await decryptAdmin(pubkey, event.content) : '',
@@ -53,13 +56,14 @@ const configController: AppController = async (c) => {
 /** Pleroma admin config controller. */
 const updateConfigController: AppController = async (c) => {
   const { pubkey } = Conf;
+  const { signal } = c.req.raw;
   const [event] = await eventsDB.query([{
     kinds: [30078],
     authors: [pubkey],
     '#d': ['pub.ditto.pleroma.config'],
     limit: 1,
-  }]);
+  }], { signal });
   const configs = jsonSchema.pipe(z.array(configSchema)).catch([]).parse(
     event?.content ? await decryptAdmin(pubkey, event.content) : '',

View File

@@ -25,13 +25,12 @@ type SearchQuery = z.infer<typeof searchQuerySchema>;
 const searchController: AppController = async (c) => {
   const result = searchQuerySchema.safeParse(c.req.query());
+  const { signal } = c.req.raw;
   if (!result.success) {
     return c.json({ error: 'Bad request', schema: result.error }, 422);
   }
-  const signal = AbortSignal.timeout(1000);
   const [event, events] = await Promise.all([
     lookupEvent(result.data, signal),
     searchEvents(result.data, signal),
@@ -46,12 +45,12 @@ const searchController: AppController = async (c) => {
   const [accounts, statuses] = await Promise.all([
     Promise.all(
       results
-        .filter((event): event is NostrEvent => event.kind === 0)
+        .filter((event) => event.kind === 0)
         .map((event) => renderAccount(event)),
     ),
     Promise.all(
       results
-        .filter((event): event is NostrEvent => event.kind === 1)
+        .filter((event) => event.kind === 1)
         .map((event) => renderStatus(event, c.get('pubkey'))),
     ),
   ]);

View File

@@ -241,10 +241,12 @@ const pinController: AppController = async (c) => {
 const unpinController: AppController = async (c) => {
   const pubkey = c.get('pubkey')!;
   const eventId = c.req.param('id');
+  const { signal } = c.req.raw;
   const event = await getEvent(eventId, {
     kind: 1,
     relations: ['author', 'event_stats', 'author_stats'],
+    signal,
   });
   if (event) {
@@ -273,12 +275,13 @@ const zapController: AppController = async (c) => {
   const id = c.req.param('id');
   const body = await parseBody(c.req.raw);
   const params = zapSchema.safeParse(body);
+  const { signal } = c.req.raw;
   if (!params.success) {
     return c.json({ error: 'Bad request', schema: params.error }, 400);
   }
-  const target = await getEvent(id, { kind: 1, relations: ['author', 'event_stats', 'author_stats'] });
+  const target = await getEvent(id, { kind: 1, relations: ['author', 'event_stats', 'author_stats'], signal });
   const author = target?.author;
   const meta = jsonMetaContentSchema.parse(author?.content);
   const lnurl = getLnurl(meta);

View File

@@ -31,7 +31,9 @@ const hashtagTimelineController: AppController = (c) => {
 };
 /** Render statuses for timelines. */
-async function renderStatuses(c: AppContext, filters: DittoFilter[], signal = AbortSignal.timeout(1000)) {
+async function renderStatuses(c: AppContext, filters: DittoFilter[]) {
+  const { signal } = c.req.raw;
   const events = await eventsDB.query(
     filters.map((filter) => ({ ...filter, relations: ['author', 'event_stats', 'author_stats'] })),
     { signal },

View File

@@ -4,7 +4,8 @@ import { jsonServerMetaSchema } from '@/schemas/nostr.ts';
 import { eventsDB } from '@/storages.ts';
 const relayInfoController: AppController = async (c) => {
-  const [event] = await eventsDB.query([{ kinds: [0], authors: [Conf.pubkey], limit: 1 }]);
+  const { signal } = c.req.raw;
+  const [event] = await eventsDB.query([{ kinds: [0], authors: [Conf.pubkey], limit: 1 }], { signal });
   const meta = jsonServerMetaSchema.parse(event?.content);
   return c.json({

View File

@@ -78,7 +78,7 @@ function connectStream(socket: WebSocket) {
   async function handleEvent([_, event]: ClientEVENT): Promise<void> {
     try {
       // This will store it (if eligible) and run other side-effects.
-      await pipeline.handleEvent(event);
+      await pipeline.handleEvent(event, AbortSignal.timeout(1000));
       send(['OK', event.id, true, '']);
     } catch (e) {
       if (e instanceof pipeline.RelayError) {

View File

@@ -38,7 +38,7 @@ async function insertUser(user: User) {
     throw new Error('User already exists');
   }
   const event = await buildUserEvent(user);
-  return pipeline.handleEvent(event);
+  return pipeline.handleEvent(event, AbortSignal.timeout(1000));
 }
 /**
@@ -48,7 +48,7 @@ async function insertUser(user: User) {
  * await findUser({ username: 'alex' });
  * ```
  */
-async function findUser(user: Partial<User>): Promise<User | undefined> {
+async function findUser(user: Partial<User>, signal?: AbortSignal): Promise<User | undefined> {
   const filter: NostrFilter = { kinds: [30361], authors: [Conf.pubkey], limit: 1 };
   for (const [key, value] of Object.entries(user)) {
@@ -65,7 +65,7 @@ async function findUser(user: Partial<User>): Promise<User | undefined> {
     }
   }
-  const [event] = await eventsDB.query([filter]);
+  const [event] = await eventsDB.query([filter], { signal });
   if (event) {
     return {

View File

@@ -22,6 +22,6 @@ function handleEvent(event: NostrEvent): Promise<void> {
   debug(`NostrEvent<${event.kind}> ${event.id}`);
   return pipeline
-    .handleEvent(event)
+    .handleEvent(event, AbortSignal.timeout(5000))
     .catch(() => {});
 }

View File

@@ -23,8 +23,7 @@ const debug = Debug('ditto:pipeline');
  * Common pipeline function to process (and maybe store) events.
  * It is idempotent, so it can be called multiple times for the same event.
  */
-async function handleEvent(event: DittoEvent): Promise<void> {
-  const signal = AbortSignal.timeout(5000);
+async function handleEvent(event: DittoEvent, signal: AbortSignal): Promise<void> {
   if (!(await verifySignatureWorker(event))) return;
   const wanted = reqmeister.isWanted(event);
   if (await encounterEvent(event, signal)) return;
@@ -203,7 +202,7 @@ async function payZap(event: DittoEvent, signal: AbortSignal) {
       ],
     });
-    await handleEvent(nwcRequestEvent);
+    await handleEvent(nwcRequestEvent, signal);
   } catch (e) {
     debug('lnurl error:', e);
   }
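
Every caller of handleEvent now supplies its own AbortSignal. A short sketch of the two calling conventions used in this merge request (the function names and import paths are assumptions for illustration):

import * as pipeline from '@/pipeline.ts'; // import path assumed
import type { NostrEvent } from '@/deps.ts'; // import path assumed

// Request-scoped: inside a controller, bound pipeline work to the request's lifetime.
async function publishFromRequest(event: NostrEvent, req: Request): Promise<void> {
  await pipeline.handleEvent(event, req.signal);
}

// Background contexts (firehose, relay, scripts): fall back to a fixed timeout.
async function publishInBackground(event: NostrEvent): Promise<void> {
  await pipeline.handleEvent(event, AbortSignal.timeout(5000));
}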

View File

@@ -5,7 +5,7 @@ import { Debug, type EventTemplate, finishEvent, HTTPException, type NostrEvent
 import { connectResponseSchema } from '@/schemas/nostr.ts';
 import { jsonSchema } from '@/schema.ts';
 import { Sub } from '@/subs.ts';
-import { eventMatchesTemplate, Time } from '@/utils.ts';
+import { eventMatchesTemplate } from '@/utils.ts';
 import { createAdminEvent } from '@/utils/api.ts';
 const debug = Debug('ditto:sign');
@@ -87,9 +87,10 @@ async function awaitSignedEvent(
   function close(): void {
     Sub.close(messageId);
+    c.req.raw.signal.removeEventListener('abort', close);
   }
-  const timeout = setTimeout(close, Time.minutes(1));
+  c.req.raw.signal.addEventListener('abort', close);
   for await (const event of sub) {
     const decrypted = await decryptAdmin(event.pubkey, event.content);
@@ -102,7 +103,6 @@ async function awaitSignedEvent(
     if (result.success) {
       close();
-      clearTimeout(timeout);
       return result.data.result;
     }
   }

View File

@@ -16,7 +16,7 @@ interface PoolStoreOpts {
   pool: InstanceType<typeof RelayPoolWorker>;
   relays: WebSocket['url'][];
   publisher: {
-    handleEvent(event: NostrEvent): Promise<void>;
+    handleEvent(event: NostrEvent, signal: AbortSignal): Promise<void>;
   };
 }
@@ -25,7 +25,7 @@ class PoolStore implements NStore {
   #pool: InstanceType<typeof RelayPoolWorker>;
   #relays: WebSocket['url'][];
   #publisher: {
-    handleEvent(event: NostrEvent): Promise<void>;
+    handleEvent(event: NostrEvent, signal: AbortSignal): Promise<void>;
   };
   constructor(opts: PoolStoreOpts) {
@@ -60,7 +60,7 @@ class PoolStore implements NStore {
       opts.relays ?? this.#relays,
       (event: NostrEvent | null) => {
         if (event && matchFilters(filters, event)) {
-          this.#publisher.handleEvent(event).catch(() => {});
+          this.#publisher.handleEvent(event, AbortSignal.timeout(1000)).catch(() => {});
           results.add({
             id: event.id,
             kind: event.kind,

View File

@@ -8,7 +8,7 @@ interface FileMeta {
 }
 /** Upload a file, track it in the database, and return the resulting media object. */
-async function uploadFile(file: File, meta: FileMeta) {
+async function uploadFile(file: File, meta: FileMeta, signal?: AbortSignal) {
   const { name, type, size } = file;
   const { pubkey, description } = meta;
@@ -16,7 +16,7 @@ async function uploadFile(file: File, meta: FileMeta) {
     throw new Error('File size is too large.');
   }
-  const { cid } = await uploader.upload(file);
+  const { cid } = await uploader.upload(file, signal);
   const url = new URL(`/ipfs/${cid}`, Conf.mediaDomain).toString();
   return insertUnattachedMedia({

View File

@@ -7,11 +7,11 @@ import type { Uploader } from './types.ts';
 /** Meta-uploader determined from configuration. */
 const configUploader: Uploader = {
-  upload(file) {
-    return uploader().upload(file);
+  upload(file, signal) {
+    return uploader().upload(file, signal);
   },
-  delete(cid) {
-    return uploader().delete(cid);
+  delete(cid, signal) {
+    return uploader().delete(cid, signal);
   },
 };

View File

@@ -17,7 +17,7 @@ const ipfsAddResponseSchema = z.object({
  * and upload the file using the REST API.
  */
 const ipfsUploader: Uploader = {
-  async upload(file) {
+  async upload(file, signal) {
     const url = new URL('/api/v0/add', Conf.ipfs.apiUrl);
     const formData = new FormData();
@@ -26,6 +26,7 @@ const ipfsUploader: Uploader = {
     const response = await fetchWorker(url, {
       method: 'POST',
       body: formData,
+      signal,
     });
     const { Hash } = ipfsAddResponseSchema.parse(await response.json());
@@ -34,7 +35,7 @@ const ipfsUploader: Uploader = {
       cid: Hash,
     };
   },
-  async delete(cid) {
+  async delete(cid, signal) {
     const url = new URL('/api/v0/pin/rm', Conf.ipfs.apiUrl);
     const query = new URLSearchParams();
@@ -44,6 +45,7 @@ const ipfsUploader: Uploader = {
     await fetchWorker(url, {
       method: 'POST',
+      signal,
     });
   },
 };

View File

@@ -9,9 +9,10 @@ import type { Uploader } from './types.ts';
  * take advantage of IPFS features while not really using IPFS.
  */
 const s3Uploader: Uploader = {
-  async upload(file) {
+  async upload(file, _signal) {
     const cid = await IpfsHash.of(file.stream()) as string;
+    // FIXME: Can't cancel S3 requests: https://github.com/bradenmacdonald/deno-s3-lite-client/issues/24
     await client().putObject(`ipfs/${cid}`, file.stream(), {
       metadata: {
         'Content-Type': file.type,
@@ -23,7 +24,8 @@ const s3Uploader: Uploader = {
       cid,
     };
   },
-  async delete(cid) {
+  async delete(cid, _signal) {
+    // FIXME: Can't cancel S3 requests: https://github.com/bradenmacdonald/deno-s3-lite-client/issues/24
     await client().deleteObject(`ipfs/${cid}`);
   },
 };

View File

@@ -1,9 +1,9 @@
 /** Modular uploader interface, to support uploading to different backends. */
 interface Uploader {
   /** Upload the file to the backend. */
-  upload(file: File): Promise<UploadResult>;
+  upload(file: File, signal?: AbortSignal): Promise<UploadResult>;
   /** Delete the file from the backend. */
-  delete(cid: string): Promise<void>;
+  delete(cid: string, signal?: AbortSignal): Promise<void>;
 }
 /** Return value from the uploader after uploading a file. */
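
With the optional signal now part of the interface, any HTTP-backed uploader can simply forward it to fetch. A hypothetical implementation sketch (the endpoint, response shape, and uploader name are made up; only the Uploader interface above is from this diff):

import type { Uploader } from './types.ts';

// Hypothetical uploader for a generic HTTP media backend. Forwarding the
// signal means an aborted API request also aborts the in-flight upload.
const httpUploader: Uploader = {
  async upload(file, signal) {
    const body = new FormData();
    body.append('file', file);
    const response = await fetch('https://media.example.com/upload', { method: 'POST', body, signal });
    const { cid } = await response.json();
    return { cid };
  },
  async delete(cid, signal) {
    await fetch(`https://media.example.com/files/${cid}`, { method: 'DELETE', signal });
  },
};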

View File

@@ -46,12 +46,12 @@ interface UpdateEventFilter extends NostrFilter {
 }
 /** Fetch existing event, update it, then publish the new event. */
-async function updateEvent<K extends number, E extends EventStub>(
+async function updateEvent<E extends EventStub>(
   filter: UpdateEventFilter,
   fn: (prev: NostrEvent | undefined) => E,
   c: AppContext,
 ): Promise<NostrEvent> {
-  const [prev] = await eventsDB.query([filter], { limit: 1 });
+  const [prev] = await eventsDB.query([filter], { limit: 1, signal: c.req.raw.signal });
   return createEvent(fn(prev), c);
 }
@@ -84,7 +84,7 @@ async function createAdminEvent(t: EventStub, c: AppContext): Promise<NostrEvent
 async function publishEvent(event: NostrEvent, c: AppContext): Promise<NostrEvent> {
   debug('EVENT', event);
   try {
-    await pipeline.handleEvent(event);
+    await pipeline.handleEvent(event, c.req.raw.signal);
   } catch (e) {
     if (e instanceof pipeline.RelayError) {
       throw new HTTPException(422, {