Pass Request signal down from API controllers
This commit is contained in:
parent
77f2e2d940
commit
5b24b7ad39
|
@ -19,5 +19,5 @@ async function publish(t: EventStub) {
|
||||||
...t,
|
...t,
|
||||||
});
|
});
|
||||||
|
|
||||||
await pipeline.handleEvent(event);
|
await pipeline.handleEvent(event, AbortSignal.timeout(5000));
|
||||||
}
|
}
|
||||||
|
|
|
@ -7,11 +7,12 @@ import type { AppContext, AppController } from '@/app.ts';
|
||||||
|
|
||||||
const actorController: AppController = async (c) => {
|
const actorController: AppController = async (c) => {
|
||||||
const username = c.req.param('username');
|
const username = c.req.param('username');
|
||||||
|
const { signal } = c.req.raw;
|
||||||
|
|
||||||
const user = await findUser({ username });
|
const user = await findUser({ username }, signal);
|
||||||
if (!user) return notFound(c);
|
if (!user) return notFound(c);
|
||||||
|
|
||||||
const event = await getAuthor(user.pubkey);
|
const event = await getAuthor(user.pubkey, { signal });
|
||||||
if (!event) return notFound(c);
|
if (!event) return notFound(c);
|
||||||
|
|
||||||
const actor = await renderActor(event, user.username);
|
const actor = await renderActor(event, user.username);
|
||||||
|
|
|
@ -132,9 +132,10 @@ const accountStatusesController: AppController = async (c) => {
|
||||||
const pubkey = c.req.param('pubkey');
|
const pubkey = c.req.param('pubkey');
|
||||||
const { since, until } = paginationSchema.parse(c.req.query());
|
const { since, until } = paginationSchema.parse(c.req.query());
|
||||||
const { pinned, limit, exclude_replies, tagged } = accountStatusesQuerySchema.parse(c.req.query());
|
const { pinned, limit, exclude_replies, tagged } = accountStatusesQuerySchema.parse(c.req.query());
|
||||||
|
const { signal } = c.req.raw;
|
||||||
|
|
||||||
if (pinned) {
|
if (pinned) {
|
||||||
const [pinEvent] = await eventsDB.query([{ kinds: [10001], authors: [pubkey], limit: 1 }]);
|
const [pinEvent] = await eventsDB.query([{ kinds: [10001], authors: [pubkey], limit: 1 }], { signal });
|
||||||
if (pinEvent) {
|
if (pinEvent) {
|
||||||
const pinnedEventIds = getTagSet(pinEvent.tags, 'e');
|
const pinnedEventIds = getTagSet(pinEvent.tags, 'e');
|
||||||
return renderStatuses(c, [...pinnedEventIds].reverse());
|
return renderStatuses(c, [...pinnedEventIds].reverse());
|
||||||
|
@ -156,7 +157,7 @@ const accountStatusesController: AppController = async (c) => {
|
||||||
filter['#t'] = [tagged];
|
filter['#t'] = [tagged];
|
||||||
}
|
}
|
||||||
|
|
||||||
let events = await eventsDB.query([filter]);
|
let events = await eventsDB.query([filter], { signal });
|
||||||
|
|
||||||
if (exclude_replies) {
|
if (exclude_replies) {
|
||||||
events = events.filter((event) => !findReplyTag(event.tags));
|
events = events.filter((event) => !findReplyTag(event.tags));
|
||||||
|
@ -292,10 +293,11 @@ const unblockController: AppController = async (c) => {
|
||||||
const favouritesController: AppController = async (c) => {
|
const favouritesController: AppController = async (c) => {
|
||||||
const pubkey = c.get('pubkey')!;
|
const pubkey = c.get('pubkey')!;
|
||||||
const params = paginationSchema.parse(c.req.query());
|
const params = paginationSchema.parse(c.req.query());
|
||||||
|
const { signal } = c.req.raw;
|
||||||
|
|
||||||
const events7 = await eventsDB.query(
|
const events7 = await eventsDB.query(
|
||||||
[{ kinds: [7], authors: [pubkey], ...params }],
|
[{ kinds: [7], authors: [pubkey], ...params }],
|
||||||
{ signal: AbortSignal.timeout(1000) },
|
{ signal },
|
||||||
);
|
);
|
||||||
|
|
||||||
const ids = events7
|
const ids = events7
|
||||||
|
@ -304,9 +306,7 @@ const favouritesController: AppController = async (c) => {
|
||||||
|
|
||||||
const events1 = await eventsDB.query(
|
const events1 = await eventsDB.query(
|
||||||
[{ kinds: [1], ids, relations: ['author', 'event_stats', 'author_stats'] }],
|
[{ kinds: [1], ids, relations: ['author', 'event_stats', 'author_stats'] }],
|
||||||
{
|
{ signal },
|
||||||
signal: AbortSignal.timeout(1000),
|
|
||||||
},
|
|
||||||
);
|
);
|
||||||
|
|
||||||
const statuses = await Promise.all(events1.map((event) => renderStatus(event, c.get('pubkey'))));
|
const statuses = await Promise.all(events1.map((event) => renderStatus(event, c.get('pubkey'))));
|
||||||
|
|
|
@ -38,10 +38,11 @@ const adminAccountsController: AppController = async (c) => {
|
||||||
}
|
}
|
||||||
|
|
||||||
const { since, until, limit } = paginationSchema.parse(c.req.query());
|
const { since, until, limit } = paginationSchema.parse(c.req.query());
|
||||||
|
const { signal } = c.req.raw;
|
||||||
|
|
||||||
const events = await eventsDB.query([{ kinds: [30361], authors: [Conf.pubkey], since, until, limit }]);
|
const events = await eventsDB.query([{ kinds: [30361], authors: [Conf.pubkey], since, until, limit }], { signal });
|
||||||
const pubkeys = events.map((event) => event.tags.find(([name]) => name === 'd')?.[1]!);
|
const pubkeys = events.map((event) => event.tags.find(([name]) => name === 'd')?.[1]!);
|
||||||
const authors = await eventsDB.query([{ kinds: [0], authors: pubkeys }]);
|
const authors = await eventsDB.query([{ kinds: [0], authors: pubkeys }], { signal });
|
||||||
|
|
||||||
for (const event of events) {
|
for (const event of events) {
|
||||||
const d = event.tags.find(([name]) => name === 'd')?.[1];
|
const d = event.tags.find(([name]) => name === 'd')?.[1];
|
||||||
|
|
|
@ -6,10 +6,12 @@ import { renderAccounts } from '@/views.ts';
|
||||||
/** https://docs.joinmastodon.org/methods/blocks/#get */
|
/** https://docs.joinmastodon.org/methods/blocks/#get */
|
||||||
const blocksController: AppController = async (c) => {
|
const blocksController: AppController = async (c) => {
|
||||||
const pubkey = c.get('pubkey')!;
|
const pubkey = c.get('pubkey')!;
|
||||||
|
const { signal } = c.req.raw;
|
||||||
|
|
||||||
const [event10000] = await eventsDB.query([
|
const [event10000] = await eventsDB.query(
|
||||||
{ kinds: [10000], authors: [pubkey], limit: 1 },
|
[{ kinds: [10000], authors: [pubkey], limit: 1 }],
|
||||||
]);
|
{ signal },
|
||||||
|
);
|
||||||
|
|
||||||
if (event10000) {
|
if (event10000) {
|
||||||
const pubkeys = getTagSet(event10000.tags, 'p');
|
const pubkeys = getTagSet(event10000.tags, 'p');
|
||||||
|
|
|
@ -6,10 +6,12 @@ import { renderStatuses } from '@/views.ts';
|
||||||
/** https://docs.joinmastodon.org/methods/bookmarks/#get */
|
/** https://docs.joinmastodon.org/methods/bookmarks/#get */
|
||||||
const bookmarksController: AppController = async (c) => {
|
const bookmarksController: AppController = async (c) => {
|
||||||
const pubkey = c.get('pubkey')!;
|
const pubkey = c.get('pubkey')!;
|
||||||
|
const { signal } = c.req.raw;
|
||||||
|
|
||||||
const [event10003] = await eventsDB.query([
|
const [event10003] = await eventsDB.query(
|
||||||
{ kinds: [10003], authors: [pubkey], limit: 1 },
|
[{ kinds: [10003], authors: [pubkey], limit: 1 }],
|
||||||
]);
|
{ signal },
|
||||||
|
);
|
||||||
|
|
||||||
if (event10003) {
|
if (event10003) {
|
||||||
const eventIds = getTagSet(event10003.tags, 'e');
|
const eventIds = getTagSet(event10003.tags, 'e');
|
||||||
|
|
|
@ -5,8 +5,9 @@ import { eventsDB } from '@/storages.ts';
|
||||||
|
|
||||||
const instanceController: AppController = async (c) => {
|
const instanceController: AppController = async (c) => {
|
||||||
const { host, protocol } = Conf.url;
|
const { host, protocol } = Conf.url;
|
||||||
|
const { signal } = c.req.raw;
|
||||||
|
|
||||||
const [event] = await eventsDB.query([{ kinds: [0], authors: [Conf.pubkey], limit: 1 }]);
|
const [event] = await eventsDB.query([{ kinds: [0], authors: [Conf.pubkey], limit: 1 }], { signal });
|
||||||
const meta = jsonServerMetaSchema.parse(event?.content);
|
const meta = jsonServerMetaSchema.parse(event?.content);
|
||||||
|
|
||||||
/** Protocol to use for WebSocket URLs, depending on the protocol of the `LOCAL_DOMAIN`. */
|
/** Protocol to use for WebSocket URLs, depending on the protocol of the `LOCAL_DOMAIN`. */
|
||||||
|
|
|
@ -13,7 +13,9 @@ const mediaBodySchema = z.object({
|
||||||
});
|
});
|
||||||
|
|
||||||
const mediaController: AppController = async (c) => {
|
const mediaController: AppController = async (c) => {
|
||||||
|
const pubkey = c.get('pubkey')!;
|
||||||
const result = mediaBodySchema.safeParse(await parseBody(c.req.raw));
|
const result = mediaBodySchema.safeParse(await parseBody(c.req.raw));
|
||||||
|
const { signal } = c.req.raw;
|
||||||
|
|
||||||
if (!result.success) {
|
if (!result.success) {
|
||||||
return c.json({ error: 'Bad request.', schema: result.error }, 422);
|
return c.json({ error: 'Bad request.', schema: result.error }, 422);
|
||||||
|
@ -21,7 +23,7 @@ const mediaController: AppController = async (c) => {
|
||||||
|
|
||||||
try {
|
try {
|
||||||
const { file, description } = result.data;
|
const { file, description } = result.data;
|
||||||
const media = await uploadFile(file, { pubkey: c.get('pubkey')!, description });
|
const media = await uploadFile(file, { pubkey, description }, signal);
|
||||||
return c.json(renderAttachment(media));
|
return c.json(renderAttachment(media));
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
console.error(e);
|
console.error(e);
|
||||||
|
|
|
@ -6,10 +6,11 @@ import { renderNotification } from '@/views/mastodon/notifications.ts';
|
||||||
const notificationsController: AppController = async (c) => {
|
const notificationsController: AppController = async (c) => {
|
||||||
const pubkey = c.get('pubkey')!;
|
const pubkey = c.get('pubkey')!;
|
||||||
const { since, until } = paginationSchema.parse(c.req.query());
|
const { since, until } = paginationSchema.parse(c.req.query());
|
||||||
|
const { signal } = c.req.raw;
|
||||||
|
|
||||||
const events = await eventsDB.query(
|
const events = await eventsDB.query(
|
||||||
[{ kinds: [1], '#p': [pubkey], since, until }],
|
[{ kinds: [1], '#p': [pubkey], since, until }],
|
||||||
{ signal: AbortSignal.timeout(3000) },
|
{ signal },
|
||||||
);
|
);
|
||||||
|
|
||||||
const statuses = await Promise.all(events.map((event) => renderNotification(event, pubkey)));
|
const statuses = await Promise.all(events.map((event) => renderNotification(event, pubkey)));
|
||||||
|
|
|
@ -8,12 +8,14 @@ import { createAdminEvent } from '@/utils/api.ts';
|
||||||
import { jsonSchema } from '@/schema.ts';
|
import { jsonSchema } from '@/schema.ts';
|
||||||
|
|
||||||
const frontendConfigController: AppController = async (c) => {
|
const frontendConfigController: AppController = async (c) => {
|
||||||
|
const { signal } = c.req.raw;
|
||||||
|
|
||||||
const [event] = await eventsDB.query([{
|
const [event] = await eventsDB.query([{
|
||||||
kinds: [30078],
|
kinds: [30078],
|
||||||
authors: [Conf.pubkey],
|
authors: [Conf.pubkey],
|
||||||
'#d': ['pub.ditto.pleroma.config'],
|
'#d': ['pub.ditto.pleroma.config'],
|
||||||
limit: 1,
|
limit: 1,
|
||||||
}]);
|
}], { signal });
|
||||||
|
|
||||||
const configs = jsonSchema.pipe(z.array(configSchema)).catch([]).parse(
|
const configs = jsonSchema.pipe(z.array(configSchema)).catch([]).parse(
|
||||||
event?.content ? await decryptAdmin(Conf.pubkey, event.content) : '',
|
event?.content ? await decryptAdmin(Conf.pubkey, event.content) : '',
|
||||||
|
@ -35,13 +37,14 @@ const frontendConfigController: AppController = async (c) => {
|
||||||
|
|
||||||
const configController: AppController = async (c) => {
|
const configController: AppController = async (c) => {
|
||||||
const { pubkey } = Conf;
|
const { pubkey } = Conf;
|
||||||
|
const { signal } = c.req.raw;
|
||||||
|
|
||||||
const [event] = await eventsDB.query([{
|
const [event] = await eventsDB.query([{
|
||||||
kinds: [30078],
|
kinds: [30078],
|
||||||
authors: [pubkey],
|
authors: [pubkey],
|
||||||
'#d': ['pub.ditto.pleroma.config'],
|
'#d': ['pub.ditto.pleroma.config'],
|
||||||
limit: 1,
|
limit: 1,
|
||||||
}]);
|
}], { signal });
|
||||||
|
|
||||||
const configs = jsonSchema.pipe(z.array(configSchema)).catch([]).parse(
|
const configs = jsonSchema.pipe(z.array(configSchema)).catch([]).parse(
|
||||||
event?.content ? await decryptAdmin(pubkey, event.content) : '',
|
event?.content ? await decryptAdmin(pubkey, event.content) : '',
|
||||||
|
@ -53,13 +56,14 @@ const configController: AppController = async (c) => {
|
||||||
/** Pleroma admin config controller. */
|
/** Pleroma admin config controller. */
|
||||||
const updateConfigController: AppController = async (c) => {
|
const updateConfigController: AppController = async (c) => {
|
||||||
const { pubkey } = Conf;
|
const { pubkey } = Conf;
|
||||||
|
const { signal } = c.req.raw;
|
||||||
|
|
||||||
const [event] = await eventsDB.query([{
|
const [event] = await eventsDB.query([{
|
||||||
kinds: [30078],
|
kinds: [30078],
|
||||||
authors: [pubkey],
|
authors: [pubkey],
|
||||||
'#d': ['pub.ditto.pleroma.config'],
|
'#d': ['pub.ditto.pleroma.config'],
|
||||||
limit: 1,
|
limit: 1,
|
||||||
}]);
|
}], { signal });
|
||||||
|
|
||||||
const configs = jsonSchema.pipe(z.array(configSchema)).catch([]).parse(
|
const configs = jsonSchema.pipe(z.array(configSchema)).catch([]).parse(
|
||||||
event?.content ? await decryptAdmin(pubkey, event.content) : '',
|
event?.content ? await decryptAdmin(pubkey, event.content) : '',
|
||||||
|
|
|
@ -25,13 +25,12 @@ type SearchQuery = z.infer<typeof searchQuerySchema>;
|
||||||
|
|
||||||
const searchController: AppController = async (c) => {
|
const searchController: AppController = async (c) => {
|
||||||
const result = searchQuerySchema.safeParse(c.req.query());
|
const result = searchQuerySchema.safeParse(c.req.query());
|
||||||
|
const { signal } = c.req.raw;
|
||||||
|
|
||||||
if (!result.success) {
|
if (!result.success) {
|
||||||
return c.json({ error: 'Bad request', schema: result.error }, 422);
|
return c.json({ error: 'Bad request', schema: result.error }, 422);
|
||||||
}
|
}
|
||||||
|
|
||||||
const signal = AbortSignal.timeout(1000);
|
|
||||||
|
|
||||||
const [event, events] = await Promise.all([
|
const [event, events] = await Promise.all([
|
||||||
lookupEvent(result.data, signal),
|
lookupEvent(result.data, signal),
|
||||||
searchEvents(result.data, signal),
|
searchEvents(result.data, signal),
|
||||||
|
@ -46,12 +45,12 @@ const searchController: AppController = async (c) => {
|
||||||
const [accounts, statuses] = await Promise.all([
|
const [accounts, statuses] = await Promise.all([
|
||||||
Promise.all(
|
Promise.all(
|
||||||
results
|
results
|
||||||
.filter((event): event is NostrEvent => event.kind === 0)
|
.filter((event) => event.kind === 0)
|
||||||
.map((event) => renderAccount(event)),
|
.map((event) => renderAccount(event)),
|
||||||
),
|
),
|
||||||
Promise.all(
|
Promise.all(
|
||||||
results
|
results
|
||||||
.filter((event): event is NostrEvent => event.kind === 1)
|
.filter((event) => event.kind === 1)
|
||||||
.map((event) => renderStatus(event, c.get('pubkey'))),
|
.map((event) => renderStatus(event, c.get('pubkey'))),
|
||||||
),
|
),
|
||||||
]);
|
]);
|
||||||
|
|
|
@ -241,10 +241,12 @@ const pinController: AppController = async (c) => {
|
||||||
const unpinController: AppController = async (c) => {
|
const unpinController: AppController = async (c) => {
|
||||||
const pubkey = c.get('pubkey')!;
|
const pubkey = c.get('pubkey')!;
|
||||||
const eventId = c.req.param('id');
|
const eventId = c.req.param('id');
|
||||||
|
const { signal } = c.req.raw;
|
||||||
|
|
||||||
const event = await getEvent(eventId, {
|
const event = await getEvent(eventId, {
|
||||||
kind: 1,
|
kind: 1,
|
||||||
relations: ['author', 'event_stats', 'author_stats'],
|
relations: ['author', 'event_stats', 'author_stats'],
|
||||||
|
signal,
|
||||||
});
|
});
|
||||||
|
|
||||||
if (event) {
|
if (event) {
|
||||||
|
@ -273,12 +275,13 @@ const zapController: AppController = async (c) => {
|
||||||
const id = c.req.param('id');
|
const id = c.req.param('id');
|
||||||
const body = await parseBody(c.req.raw);
|
const body = await parseBody(c.req.raw);
|
||||||
const params = zapSchema.safeParse(body);
|
const params = zapSchema.safeParse(body);
|
||||||
|
const { signal } = c.req.raw;
|
||||||
|
|
||||||
if (!params.success) {
|
if (!params.success) {
|
||||||
return c.json({ error: 'Bad request', schema: params.error }, 400);
|
return c.json({ error: 'Bad request', schema: params.error }, 400);
|
||||||
}
|
}
|
||||||
|
|
||||||
const target = await getEvent(id, { kind: 1, relations: ['author', 'event_stats', 'author_stats'] });
|
const target = await getEvent(id, { kind: 1, relations: ['author', 'event_stats', 'author_stats'], signal });
|
||||||
const author = target?.author;
|
const author = target?.author;
|
||||||
const meta = jsonMetaContentSchema.parse(author?.content);
|
const meta = jsonMetaContentSchema.parse(author?.content);
|
||||||
const lnurl = getLnurl(meta);
|
const lnurl = getLnurl(meta);
|
||||||
|
|
|
@ -31,7 +31,9 @@ const hashtagTimelineController: AppController = (c) => {
|
||||||
};
|
};
|
||||||
|
|
||||||
/** Render statuses for timelines. */
|
/** Render statuses for timelines. */
|
||||||
async function renderStatuses(c: AppContext, filters: DittoFilter[], signal = AbortSignal.timeout(1000)) {
|
async function renderStatuses(c: AppContext, filters: DittoFilter[]) {
|
||||||
|
const { signal } = c.req.raw;
|
||||||
|
|
||||||
const events = await eventsDB.query(
|
const events = await eventsDB.query(
|
||||||
filters.map((filter) => ({ ...filter, relations: ['author', 'event_stats', 'author_stats'] })),
|
filters.map((filter) => ({ ...filter, relations: ['author', 'event_stats', 'author_stats'] })),
|
||||||
{ signal },
|
{ signal },
|
||||||
|
|
|
@ -4,7 +4,8 @@ import { jsonServerMetaSchema } from '@/schemas/nostr.ts';
|
||||||
import { eventsDB } from '@/storages.ts';
|
import { eventsDB } from '@/storages.ts';
|
||||||
|
|
||||||
const relayInfoController: AppController = async (c) => {
|
const relayInfoController: AppController = async (c) => {
|
||||||
const [event] = await eventsDB.query([{ kinds: [0], authors: [Conf.pubkey], limit: 1 }]);
|
const { signal } = c.req.raw;
|
||||||
|
const [event] = await eventsDB.query([{ kinds: [0], authors: [Conf.pubkey], limit: 1 }], { signal });
|
||||||
const meta = jsonServerMetaSchema.parse(event?.content);
|
const meta = jsonServerMetaSchema.parse(event?.content);
|
||||||
|
|
||||||
return c.json({
|
return c.json({
|
||||||
|
|
|
@ -78,7 +78,7 @@ function connectStream(socket: WebSocket) {
|
||||||
async function handleEvent([_, event]: ClientEVENT): Promise<void> {
|
async function handleEvent([_, event]: ClientEVENT): Promise<void> {
|
||||||
try {
|
try {
|
||||||
// This will store it (if eligible) and run other side-effects.
|
// This will store it (if eligible) and run other side-effects.
|
||||||
await pipeline.handleEvent(event);
|
await pipeline.handleEvent(event, AbortSignal.timeout(1000));
|
||||||
send(['OK', event.id, true, '']);
|
send(['OK', event.id, true, '']);
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
if (e instanceof pipeline.RelayError) {
|
if (e instanceof pipeline.RelayError) {
|
||||||
|
|
|
@ -38,7 +38,7 @@ async function insertUser(user: User) {
|
||||||
throw new Error('User already exists');
|
throw new Error('User already exists');
|
||||||
}
|
}
|
||||||
const event = await buildUserEvent(user);
|
const event = await buildUserEvent(user);
|
||||||
return pipeline.handleEvent(event);
|
return pipeline.handleEvent(event, AbortSignal.timeout(1000));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -48,7 +48,7 @@ async function insertUser(user: User) {
|
||||||
* await findUser({ username: 'alex' });
|
* await findUser({ username: 'alex' });
|
||||||
* ```
|
* ```
|
||||||
*/
|
*/
|
||||||
async function findUser(user: Partial<User>): Promise<User | undefined> {
|
async function findUser(user: Partial<User>, signal?: AbortSignal): Promise<User | undefined> {
|
||||||
const filter: NostrFilter = { kinds: [30361], authors: [Conf.pubkey], limit: 1 };
|
const filter: NostrFilter = { kinds: [30361], authors: [Conf.pubkey], limit: 1 };
|
||||||
|
|
||||||
for (const [key, value] of Object.entries(user)) {
|
for (const [key, value] of Object.entries(user)) {
|
||||||
|
@ -65,7 +65,7 @@ async function findUser(user: Partial<User>): Promise<User | undefined> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const [event] = await eventsDB.query([filter]);
|
const [event] = await eventsDB.query([filter], { signal });
|
||||||
|
|
||||||
if (event) {
|
if (event) {
|
||||||
return {
|
return {
|
||||||
|
|
|
@ -22,6 +22,6 @@ function handleEvent(event: NostrEvent): Promise<void> {
|
||||||
debug(`NostrEvent<${event.kind}> ${event.id}`);
|
debug(`NostrEvent<${event.kind}> ${event.id}`);
|
||||||
|
|
||||||
return pipeline
|
return pipeline
|
||||||
.handleEvent(event)
|
.handleEvent(event, AbortSignal.timeout(5000))
|
||||||
.catch(() => {});
|
.catch(() => {});
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,8 +23,7 @@ const debug = Debug('ditto:pipeline');
|
||||||
* Common pipeline function to process (and maybe store) events.
|
* Common pipeline function to process (and maybe store) events.
|
||||||
* It is idempotent, so it can be called multiple times for the same event.
|
* It is idempotent, so it can be called multiple times for the same event.
|
||||||
*/
|
*/
|
||||||
async function handleEvent(event: DittoEvent): Promise<void> {
|
async function handleEvent(event: DittoEvent, signal: AbortSignal): Promise<void> {
|
||||||
const signal = AbortSignal.timeout(5000);
|
|
||||||
if (!(await verifySignatureWorker(event))) return;
|
if (!(await verifySignatureWorker(event))) return;
|
||||||
const wanted = reqmeister.isWanted(event);
|
const wanted = reqmeister.isWanted(event);
|
||||||
if (await encounterEvent(event, signal)) return;
|
if (await encounterEvent(event, signal)) return;
|
||||||
|
@ -203,7 +202,7 @@ async function payZap(event: DittoEvent, signal: AbortSignal) {
|
||||||
],
|
],
|
||||||
});
|
});
|
||||||
|
|
||||||
await handleEvent(nwcRequestEvent);
|
await handleEvent(nwcRequestEvent, signal);
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
debug('lnurl error:', e);
|
debug('lnurl error:', e);
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,7 +5,7 @@ import { Debug, type EventTemplate, finishEvent, HTTPException, type NostrEvent
|
||||||
import { connectResponseSchema } from '@/schemas/nostr.ts';
|
import { connectResponseSchema } from '@/schemas/nostr.ts';
|
||||||
import { jsonSchema } from '@/schema.ts';
|
import { jsonSchema } from '@/schema.ts';
|
||||||
import { Sub } from '@/subs.ts';
|
import { Sub } from '@/subs.ts';
|
||||||
import { eventMatchesTemplate, Time } from '@/utils.ts';
|
import { eventMatchesTemplate } from '@/utils.ts';
|
||||||
import { createAdminEvent } from '@/utils/api.ts';
|
import { createAdminEvent } from '@/utils/api.ts';
|
||||||
|
|
||||||
const debug = Debug('ditto:sign');
|
const debug = Debug('ditto:sign');
|
||||||
|
@ -87,9 +87,10 @@ async function awaitSignedEvent(
|
||||||
|
|
||||||
function close(): void {
|
function close(): void {
|
||||||
Sub.close(messageId);
|
Sub.close(messageId);
|
||||||
|
c.req.raw.signal.removeEventListener('abort', close);
|
||||||
}
|
}
|
||||||
|
|
||||||
const timeout = setTimeout(close, Time.minutes(1));
|
c.req.raw.signal.addEventListener('abort', close);
|
||||||
|
|
||||||
for await (const event of sub) {
|
for await (const event of sub) {
|
||||||
const decrypted = await decryptAdmin(event.pubkey, event.content);
|
const decrypted = await decryptAdmin(event.pubkey, event.content);
|
||||||
|
@ -102,7 +103,6 @@ async function awaitSignedEvent(
|
||||||
|
|
||||||
if (result.success) {
|
if (result.success) {
|
||||||
close();
|
close();
|
||||||
clearTimeout(timeout);
|
|
||||||
return result.data.result;
|
return result.data.result;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,7 +16,7 @@ interface PoolStoreOpts {
|
||||||
pool: InstanceType<typeof RelayPoolWorker>;
|
pool: InstanceType<typeof RelayPoolWorker>;
|
||||||
relays: WebSocket['url'][];
|
relays: WebSocket['url'][];
|
||||||
publisher: {
|
publisher: {
|
||||||
handleEvent(event: NostrEvent): Promise<void>;
|
handleEvent(event: NostrEvent, signal: AbortSignal): Promise<void>;
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -25,7 +25,7 @@ class PoolStore implements NStore {
|
||||||
#pool: InstanceType<typeof RelayPoolWorker>;
|
#pool: InstanceType<typeof RelayPoolWorker>;
|
||||||
#relays: WebSocket['url'][];
|
#relays: WebSocket['url'][];
|
||||||
#publisher: {
|
#publisher: {
|
||||||
handleEvent(event: NostrEvent): Promise<void>;
|
handleEvent(event: NostrEvent, signal: AbortSignal): Promise<void>;
|
||||||
};
|
};
|
||||||
|
|
||||||
constructor(opts: PoolStoreOpts) {
|
constructor(opts: PoolStoreOpts) {
|
||||||
|
@ -60,7 +60,7 @@ class PoolStore implements NStore {
|
||||||
opts.relays ?? this.#relays,
|
opts.relays ?? this.#relays,
|
||||||
(event: NostrEvent | null) => {
|
(event: NostrEvent | null) => {
|
||||||
if (event && matchFilters(filters, event)) {
|
if (event && matchFilters(filters, event)) {
|
||||||
this.#publisher.handleEvent(event).catch(() => {});
|
this.#publisher.handleEvent(event, AbortSignal.timeout(1000)).catch(() => {});
|
||||||
results.add({
|
results.add({
|
||||||
id: event.id,
|
id: event.id,
|
||||||
kind: event.kind,
|
kind: event.kind,
|
||||||
|
|
|
@ -8,7 +8,7 @@ interface FileMeta {
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Upload a file, track it in the database, and return the resulting media object. */
|
/** Upload a file, track it in the database, and return the resulting media object. */
|
||||||
async function uploadFile(file: File, meta: FileMeta) {
|
async function uploadFile(file: File, meta: FileMeta, signal?: AbortSignal) {
|
||||||
const { name, type, size } = file;
|
const { name, type, size } = file;
|
||||||
const { pubkey, description } = meta;
|
const { pubkey, description } = meta;
|
||||||
|
|
||||||
|
@ -16,7 +16,7 @@ async function uploadFile(file: File, meta: FileMeta) {
|
||||||
throw new Error('File size is too large.');
|
throw new Error('File size is too large.');
|
||||||
}
|
}
|
||||||
|
|
||||||
const { cid } = await uploader.upload(file);
|
const { cid } = await uploader.upload(file, signal);
|
||||||
const url = new URL(`/ipfs/${cid}`, Conf.mediaDomain).toString();
|
const url = new URL(`/ipfs/${cid}`, Conf.mediaDomain).toString();
|
||||||
|
|
||||||
return insertUnattachedMedia({
|
return insertUnattachedMedia({
|
||||||
|
|
|
@ -7,11 +7,11 @@ import type { Uploader } from './types.ts';
|
||||||
|
|
||||||
/** Meta-uploader determined from configuration. */
|
/** Meta-uploader determined from configuration. */
|
||||||
const configUploader: Uploader = {
|
const configUploader: Uploader = {
|
||||||
upload(file) {
|
upload(file, signal) {
|
||||||
return uploader().upload(file);
|
return uploader().upload(file, signal);
|
||||||
},
|
},
|
||||||
delete(cid) {
|
delete(cid, signal) {
|
||||||
return uploader().delete(cid);
|
return uploader().delete(cid, signal);
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -17,7 +17,7 @@ const ipfsAddResponseSchema = z.object({
|
||||||
* and upload the file using the REST API.
|
* and upload the file using the REST API.
|
||||||
*/
|
*/
|
||||||
const ipfsUploader: Uploader = {
|
const ipfsUploader: Uploader = {
|
||||||
async upload(file) {
|
async upload(file, signal) {
|
||||||
const url = new URL('/api/v0/add', Conf.ipfs.apiUrl);
|
const url = new URL('/api/v0/add', Conf.ipfs.apiUrl);
|
||||||
|
|
||||||
const formData = new FormData();
|
const formData = new FormData();
|
||||||
|
@ -26,6 +26,7 @@ const ipfsUploader: Uploader = {
|
||||||
const response = await fetchWorker(url, {
|
const response = await fetchWorker(url, {
|
||||||
method: 'POST',
|
method: 'POST',
|
||||||
body: formData,
|
body: formData,
|
||||||
|
signal,
|
||||||
});
|
});
|
||||||
|
|
||||||
const { Hash } = ipfsAddResponseSchema.parse(await response.json());
|
const { Hash } = ipfsAddResponseSchema.parse(await response.json());
|
||||||
|
@ -34,7 +35,7 @@ const ipfsUploader: Uploader = {
|
||||||
cid: Hash,
|
cid: Hash,
|
||||||
};
|
};
|
||||||
},
|
},
|
||||||
async delete(cid) {
|
async delete(cid, signal) {
|
||||||
const url = new URL('/api/v0/pin/rm', Conf.ipfs.apiUrl);
|
const url = new URL('/api/v0/pin/rm', Conf.ipfs.apiUrl);
|
||||||
|
|
||||||
const query = new URLSearchParams();
|
const query = new URLSearchParams();
|
||||||
|
@ -44,6 +45,7 @@ const ipfsUploader: Uploader = {
|
||||||
|
|
||||||
await fetchWorker(url, {
|
await fetchWorker(url, {
|
||||||
method: 'POST',
|
method: 'POST',
|
||||||
|
signal,
|
||||||
});
|
});
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
|
@ -9,9 +9,10 @@ import type { Uploader } from './types.ts';
|
||||||
* take advantage of IPFS features while not really using IPFS.
|
* take advantage of IPFS features while not really using IPFS.
|
||||||
*/
|
*/
|
||||||
const s3Uploader: Uploader = {
|
const s3Uploader: Uploader = {
|
||||||
async upload(file) {
|
async upload(file, _signal) {
|
||||||
const cid = await IpfsHash.of(file.stream()) as string;
|
const cid = await IpfsHash.of(file.stream()) as string;
|
||||||
|
|
||||||
|
// FIXME: Can't cancel S3 requests: https://github.com/bradenmacdonald/deno-s3-lite-client/issues/24
|
||||||
await client().putObject(`ipfs/${cid}`, file.stream(), {
|
await client().putObject(`ipfs/${cid}`, file.stream(), {
|
||||||
metadata: {
|
metadata: {
|
||||||
'Content-Type': file.type,
|
'Content-Type': file.type,
|
||||||
|
@ -23,7 +24,8 @@ const s3Uploader: Uploader = {
|
||||||
cid,
|
cid,
|
||||||
};
|
};
|
||||||
},
|
},
|
||||||
async delete(cid) {
|
async delete(cid, _signal) {
|
||||||
|
// FIXME: Can't cancel S3 requests: https://github.com/bradenmacdonald/deno-s3-lite-client/issues/24
|
||||||
await client().deleteObject(`ipfs/${cid}`);
|
await client().deleteObject(`ipfs/${cid}`);
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
|
@ -1,9 +1,9 @@
|
||||||
/** Modular uploader interface, to support uploading to different backends. */
|
/** Modular uploader interface, to support uploading to different backends. */
|
||||||
interface Uploader {
|
interface Uploader {
|
||||||
/** Upload the file to the backend. */
|
/** Upload the file to the backend. */
|
||||||
upload(file: File): Promise<UploadResult>;
|
upload(file: File, signal?: AbortSignal): Promise<UploadResult>;
|
||||||
/** Delete the file from the backend. */
|
/** Delete the file from the backend. */
|
||||||
delete(cid: string): Promise<void>;
|
delete(cid: string, signal?: AbortSignal): Promise<void>;
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Return value from the uploader after uploading a file. */
|
/** Return value from the uploader after uploading a file. */
|
||||||
|
|
|
@ -46,12 +46,12 @@ interface UpdateEventFilter extends NostrFilter {
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Fetch existing event, update it, then publish the new event. */
|
/** Fetch existing event, update it, then publish the new event. */
|
||||||
async function updateEvent<K extends number, E extends EventStub>(
|
async function updateEvent<E extends EventStub>(
|
||||||
filter: UpdateEventFilter,
|
filter: UpdateEventFilter,
|
||||||
fn: (prev: NostrEvent | undefined) => E,
|
fn: (prev: NostrEvent | undefined) => E,
|
||||||
c: AppContext,
|
c: AppContext,
|
||||||
): Promise<NostrEvent> {
|
): Promise<NostrEvent> {
|
||||||
const [prev] = await eventsDB.query([filter], { limit: 1 });
|
const [prev] = await eventsDB.query([filter], { limit: 1, signal: c.req.raw.signal });
|
||||||
return createEvent(fn(prev), c);
|
return createEvent(fn(prev), c);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -84,7 +84,7 @@ async function createAdminEvent(t: EventStub, c: AppContext): Promise<NostrEvent
|
||||||
async function publishEvent(event: NostrEvent, c: AppContext): Promise<NostrEvent> {
|
async function publishEvent(event: NostrEvent, c: AppContext): Promise<NostrEvent> {
|
||||||
debug('EVENT', event);
|
debug('EVENT', event);
|
||||||
try {
|
try {
|
||||||
await pipeline.handleEvent(event);
|
await pipeline.handleEvent(event, c.req.raw.signal);
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
if (e instanceof pipeline.RelayError) {
|
if (e instanceof pipeline.RelayError) {
|
||||||
throw new HTTPException(422, {
|
throw new HTTPException(422, {
|
||||||
|
|
Loading…
Reference in New Issue