Merge branch 'signal' into 'main'

Replace all timeouts with AbortSignal

See merge request soapbox-pub/ditto!83
This commit is contained in:
Alex Gleason 2023-12-22 19:24:34 +00:00
commit e768032446
11 changed files with 38 additions and 44 deletions

View File

@ -1,19 +1,20 @@
import { type Event, type Filter, matchFilters } from '@/deps.ts'; import { type Event, type Filter, matchFilters } from '@/deps.ts';
import * as pipeline from '@/pipeline.ts'; import * as pipeline from '@/pipeline.ts';
import { allRelays, pool } from '@/pool.ts'; import { activeRelays, pool } from '@/pool.ts';
import type { GetFiltersOpts } from '@/filter.ts'; import type { GetFiltersOpts } from '@/filter.ts';
/** Get events from a NIP-01 filter. */ /** Get events from a NIP-01 filter. */
function getFilters<K extends number>(filters: Filter<K>[], opts: GetFiltersOpts = {}): Promise<Event<K>[]> { function getFilters<K extends number>(filters: Filter<K>[], opts: GetFiltersOpts = {}): Promise<Event<K>[]> {
if (opts.signal?.aborted) return Promise.resolve([]);
if (!filters.length) return Promise.resolve([]); if (!filters.length) return Promise.resolve([]);
return new Promise((resolve) => { return new Promise((resolve) => {
let tid: number;
const results: Event[] = []; const results: Event[] = [];
const unsub = pool.subscribe( const unsub = pool.subscribe(
filters, filters,
allRelays, activeRelays,
(event: Event | null) => { (event: Event | null) => {
if (event && matchFilters(filters, event)) { if (event && matchFilters(filters, event)) {
pipeline.handleEvent(event).catch(() => {}); pipeline.handleEvent(event).catch(() => {});
@ -29,24 +30,20 @@ function getFilters<K extends number>(filters: Filter<K>[], opts: GetFiltersOpts
} }
if (typeof opts.limit === 'number' && results.length >= opts.limit) { if (typeof opts.limit === 'number' && results.length >= opts.limit) {
unsub(); unsub();
clearTimeout(tid);
resolve(results as Event<K>[]); resolve(results as Event<K>[]);
} }
}, },
undefined, undefined,
() => { () => {
unsub(); unsub();
clearTimeout(tid);
resolve(results as Event<K>[]); resolve(results as Event<K>[]);
}, },
); );
if (typeof opts.timeout === 'number') { opts.signal?.addEventListener('abort', () => {
tid = setTimeout(() => { unsub();
unsub(); resolve(results as Event<K>[]);
resolve(results as Event<K>[]); });
}, opts.timeout);
}
}); });
} }

View File

@ -8,7 +8,7 @@ import { getAuthor, getFollowedPubkeys, getFollows } from '@/queries.ts';
import { booleanParamSchema, fileSchema } from '@/schema.ts'; import { booleanParamSchema, fileSchema } from '@/schema.ts';
import { jsonMetaContentSchema } from '@/schemas/nostr.ts'; import { jsonMetaContentSchema } from '@/schemas/nostr.ts';
import { uploadFile } from '@/upload.ts'; import { uploadFile } from '@/upload.ts';
import { isFollowing, lookupAccount, nostrNow, Time } from '@/utils.ts'; import { isFollowing, lookupAccount, nostrNow } from '@/utils.ts';
import { paginated, paginationSchema, parseBody } from '@/utils/web.ts'; import { paginated, paginationSchema, parseBody } from '@/utils/web.ts';
import { createEvent } from '@/utils/web.ts'; import { createEvent } from '@/utils/web.ts';
import { renderEventAccounts } from '@/views.ts'; import { renderEventAccounts } from '@/views.ts';
@ -258,7 +258,7 @@ const favouritesController: AppController = async (c) => {
const events7 = await mixer.getFilters( const events7 = await mixer.getFilters(
[{ kinds: [7], authors: [pubkey], ...params }], [{ kinds: [7], authors: [pubkey], ...params }],
{ timeout: Time.seconds(1) }, { signal: AbortSignal.timeout(1000) },
); );
const ids = events7 const ids = events7
@ -266,7 +266,7 @@ const favouritesController: AppController = async (c) => {
.filter((id): id is string => !!id); .filter((id): id is string => !!id);
const events1 = await mixer.getFilters([{ kinds: [1], ids, relations: ['author', 'event_stats', 'author_stats'] }], { const events1 = await mixer.getFilters([{ kinds: [1], ids, relations: ['author', 'event_stats', 'author_stats'] }], {
timeout: Time.seconds(1), signal: AbortSignal.timeout(1000),
}); });
const statuses = await Promise.all(events1.map((event) => renderStatus(event, c.get('pubkey')))); const statuses = await Promise.all(events1.map((event) => renderStatus(event, c.get('pubkey'))));

View File

@ -1,6 +1,5 @@
import { type AppController } from '@/app.ts'; import { type AppController } from '@/app.ts';
import * as mixer from '@/mixer.ts'; import * as mixer from '@/mixer.ts';
import { Time } from '@/utils.ts';
import { paginated, paginationSchema } from '@/utils/web.ts'; import { paginated, paginationSchema } from '@/utils/web.ts';
import { renderNotification } from '@/views/mastodon/notifications.ts'; import { renderNotification } from '@/views/mastodon/notifications.ts';
@ -10,7 +9,7 @@ const notificationsController: AppController = async (c) => {
const events = await mixer.getFilters( const events = await mixer.getFilters(
[{ kinds: [1], '#p': [pubkey], since, until }], [{ kinds: [1], '#p': [pubkey], since, until }],
{ timeout: Time.seconds(3) }, { signal: AbortSignal.timeout(3000) },
); );
const statuses = await Promise.all(events.map((event) => renderNotification(event, pubkey))); const statuses = await Promise.all(events.map((event) => renderNotification(event, pubkey)));

View File

@ -5,7 +5,7 @@ import { type DittoFilter } from '@/filter.ts';
import * as mixer from '@/mixer.ts'; import * as mixer from '@/mixer.ts';
import { booleanParamSchema } from '@/schema.ts'; import { booleanParamSchema } from '@/schema.ts';
import { nostrIdSchema } from '@/schemas/nostr.ts'; import { nostrIdSchema } from '@/schemas/nostr.ts';
import { dedupeEvents, Time } from '@/utils.ts'; import { dedupeEvents } from '@/utils.ts';
import { lookupNip05Cached } from '@/utils/nip05.ts'; import { lookupNip05Cached } from '@/utils/nip05.ts';
import { renderAccount } from '@/views/mastodon/accounts.ts'; import { renderAccount } from '@/views/mastodon/accounts.ts';
import { renderStatus } from '@/views/mastodon/statuses.ts'; import { renderStatus } from '@/views/mastodon/statuses.ts';
@ -93,9 +93,9 @@ function typeToKinds(type: SearchQuery['type']): number[] {
} }
/** Resolve a searched value into an event, if applicable. */ /** Resolve a searched value into an event, if applicable. */
async function lookupEvent(query: SearchQuery): Promise<Event | undefined> { async function lookupEvent(query: SearchQuery, signal = AbortSignal.timeout(1000)): Promise<Event | undefined> {
const filters = await getLookupFilters(query); const filters = await getLookupFilters(query);
const [event] = await mixer.getFilters(filters, { limit: 1, timeout: Time.seconds(1) }); const [event] = await mixer.getFilters(filters, { limit: 1, signal });
return event; return event;
} }

View File

@ -3,7 +3,6 @@ import { type DittoFilter } from '@/filter.ts';
import * as mixer from '@/mixer.ts'; import * as mixer from '@/mixer.ts';
import { getFeedPubkeys } from '@/queries.ts'; import { getFeedPubkeys } from '@/queries.ts';
import { booleanParamSchema } from '@/schema.ts'; import { booleanParamSchema } from '@/schema.ts';
import { Time } from '@/utils.ts';
import { paginated, paginationSchema } from '@/utils/web.ts'; import { paginated, paginationSchema } from '@/utils/web.ts';
import { renderStatus } from '@/views/mastodon/statuses.ts'; import { renderStatus } from '@/views/mastodon/statuses.ts';
@ -33,10 +32,10 @@ const hashtagTimelineController: AppController = (c) => {
}; };
/** Render statuses for timelines. */ /** Render statuses for timelines. */
async function renderStatuses(c: AppContext, filters: DittoFilter<1>[]) { async function renderStatuses(c: AppContext, filters: DittoFilter<1>[], signal = AbortSignal.timeout(1000)) {
const events = await mixer.getFilters( const events = await mixer.getFilters(
filters.map((filter) => ({ ...filter, relations: ['author', 'event_stats', 'author_stats'] })), filters.map((filter) => ({ ...filter, relations: ['author', 'event_stats', 'author_stats'] })),
{ timeout: Time.seconds(1) }, { signal },
); );
if (!events.length) { if (!events.length) {

View File

@ -16,8 +16,8 @@ interface DittoFilter<K extends number = number> extends Filter<K> {
/** Additional options to apply to the whole subscription. */ /** Additional options to apply to the whole subscription. */
interface GetFiltersOpts { interface GetFiltersOpts {
/** How long to wait (in milliseconds) until aborting the request. */ /** Signal to abort the request. */
timeout?: number; signal?: AbortSignal;
/** Event limit for the whole subscription. */ /** Event limit for the whole subscription. */
limit?: number; limit?: number;
} }

View File

@ -17,8 +17,6 @@ pool.subscribe(
/** Handle events through the firehose pipeline. */ /** Handle events through the firehose pipeline. */
function handleEvent(event: Event): Promise<void> { function handleEvent(event: Event): Promise<void> {
console.info(`firehose: Event<${event.kind}> ${event.id}`);
return pipeline return pipeline
.handleEvent(event) .handleEvent(event)
.catch(() => {}); .catch(() => {});

View File

@ -24,6 +24,7 @@ import type { EventData } from '@/types.ts';
async function handleEvent(event: Event): Promise<void> { async function handleEvent(event: Event): Promise<void> {
if (!(await verifySignatureWorker(event))) return; if (!(await verifySignatureWorker(event))) return;
if (encounterEvent(event)) return; if (encounterEvent(event)) return;
console.info(`pipeline: Event<${event.kind}> ${event.id}`);
const data = await getEventData(event); const data = await getEventData(event);
await Promise.all([ await Promise.all([
@ -63,7 +64,7 @@ async function storeEvent(event: Event, data: EventData): Promise<void> {
if (data.user || isAdminEvent(event) || await isLocallyFollowed(event.pubkey)) { if (data.user || isAdminEvent(event) || await isLocallyFollowed(event.pubkey)) {
const [deletion] = await mixer.getFilters( const [deletion] = await mixer.getFilters(
[{ kinds: [5], authors: [event.pubkey], '#e': [event.id], limit: 1 }], [{ kinds: [5], authors: [event.pubkey], '#e': [event.id], limit: 1 }],
{ limit: 1, timeout: Time.seconds(1) }, { limit: 1, signal: AbortSignal.timeout(Time.seconds(1)) },
); );
if (deletion) { if (deletion) {

View File

@ -4,8 +4,8 @@ import { type DittoFilter, type Relation } from '@/filter.ts';
import * as mixer from '@/mixer.ts'; import * as mixer from '@/mixer.ts';
interface GetEventOpts<K extends number> { interface GetEventOpts<K extends number> {
/** Timeout in milliseconds. */ /** Signal to abort the request. */
timeout?: number; signal?: AbortSignal;
/** Event kind. */ /** Event kind. */
kind?: K; kind?: K;
/** Relations to include on the event. */ /** Relations to include on the event. */
@ -17,36 +17,36 @@ const getEvent = async <K extends number = number>(
id: string, id: string,
opts: GetEventOpts<K> = {}, opts: GetEventOpts<K> = {},
): Promise<Event<K> | undefined> => { ): Promise<Event<K> | undefined> => {
const { kind, relations, timeout = 1000 } = opts; const { kind, relations, signal = AbortSignal.timeout(1000) } = opts;
const filter: DittoFilter<K> = { ids: [id], relations, limit: 1 }; const filter: DittoFilter<K> = { ids: [id], relations, limit: 1 };
if (kind) { if (kind) {
filter.kinds = [kind]; filter.kinds = [kind];
} }
const [event] = await mixer.getFilters([filter], { limit: 1, timeout }); const [event] = await mixer.getFilters([filter], { limit: 1, signal });
return event; return event;
}; };
/** Get a Nostr `set_medatadata` event for a user's pubkey. */ /** Get a Nostr `set_medatadata` event for a user's pubkey. */
const getAuthor = async (pubkey: string, opts: GetEventOpts<0> = {}): Promise<Event<0> | undefined> => { const getAuthor = async (pubkey: string, opts: GetEventOpts<0> = {}): Promise<Event<0> | undefined> => {
const { relations, timeout = 1000 } = opts; const { relations, signal = AbortSignal.timeout(1000) } = opts;
const [event] = await mixer.getFilters( const [event] = await mixer.getFilters(
[{ authors: [pubkey], relations, kinds: [0], limit: 1 }], [{ authors: [pubkey], relations, kinds: [0], limit: 1 }],
{ limit: 1, timeout }, { limit: 1, signal },
); );
return event; return event;
}; };
/** Get users the given pubkey follows. */ /** Get users the given pubkey follows. */
const getFollows = async (pubkey: string, timeout = 1000): Promise<Event<3> | undefined> => { const getFollows = async (pubkey: string, signal = AbortSignal.timeout(1000)): Promise<Event<3> | undefined> => {
const [event] = await mixer.getFilters([{ authors: [pubkey], kinds: [3], limit: 1 }], { limit: 1, timeout }); const [event] = await mixer.getFilters([{ authors: [pubkey], kinds: [3], limit: 1 }], { limit: 1, signal });
return event; return event;
}; };
/** Get pubkeys the user follows. */ /** Get pubkeys the user follows. */
async function getFollowedPubkeys(pubkey: string): Promise<string[]> { async function getFollowedPubkeys(pubkey: string, signal?: AbortSignal): Promise<string[]> {
const event = await getFollows(pubkey); const event = await getFollows(pubkey, signal);
if (!event) return []; if (!event) return [];
return event.tags return event.tags
@ -78,10 +78,10 @@ async function getAncestors(event: Event<1>, result = [] as Event<1>[]): Promise
return result.reverse(); return result.reverse();
} }
function getDescendants(eventId: string): Promise<Event<1>[]> { function getDescendants(eventId: string, signal = AbortSignal.timeout(2000)): Promise<Event<1>[]> {
return mixer.getFilters( return mixer.getFilters(
[{ kinds: [1], '#e': [eventId], relations: ['author', 'event_stats', 'author_stats'] }], [{ kinds: [1], '#e': [eventId], relations: ['author', 'event_stats', 'author_stats'] }],
{ limit: 200, timeout: 2000 }, { limit: 200, signal },
); );
} }

View File

@ -7,12 +7,12 @@ const nip05Cache = new TTLCache<string, Promise<string | null>>({ ttl: Time.hour
const NIP05_REGEX = /^(?:([\w.+-]+)@)?([\w.-]+)$/; const NIP05_REGEX = /^(?:([\w.+-]+)@)?([\w.-]+)$/;
interface LookupOpts { interface LookupOpts {
timeout?: number; signal?: AbortSignal;
} }
/** Get pubkey from NIP-05. */ /** Get pubkey from NIP-05. */
async function lookup(value: string, opts: LookupOpts = {}): Promise<string | null> { async function lookup(value: string, opts: LookupOpts = {}): Promise<string | null> {
const { timeout = 2000 } = opts; const { signal = AbortSignal.timeout(2000) } = opts;
const match = value.match(NIP05_REGEX); const match = value.match(NIP05_REGEX);
if (!match) return null; if (!match) return null;
@ -21,7 +21,7 @@ async function lookup(value: string, opts: LookupOpts = {}): Promise<string | nu
try { try {
const res = await fetchWorker(`https://${domain}/.well-known/nostr.json?name=${name}`, { const res = await fetchWorker(`https://${domain}/.well-known/nostr.json?name=${name}`, {
signal: AbortSignal.timeout(timeout), signal,
}); });
const { names } = nostrJsonSchema.parse(await res.json()); const { names } = nostrJsonSchema.parse(await res.json());

View File

@ -60,12 +60,12 @@ const previewCardCache = new TTLCache<string, Promise<PreviewCard | null>>({
}); });
/** Unfurl card from cache if available, otherwise fetch it. */ /** Unfurl card from cache if available, otherwise fetch it. */
function unfurlCardCached(url: string, timeout = Time.seconds(1)): Promise<PreviewCard | null> { function unfurlCardCached(url: string, signal = AbortSignal.timeout(1000)): Promise<PreviewCard | null> {
const cached = previewCardCache.get(url); const cached = previewCardCache.get(url);
if (cached !== undefined) { if (cached !== undefined) {
return cached; return cached;
} else { } else {
const card = unfurlCard(url, AbortSignal.timeout(timeout)); const card = unfurlCard(url, signal);
previewCardCache.set(url, card); previewCardCache.set(url, card);
return card; return card;
} }