Merge remote-tracking branch 'origin/main' into threads

This commit is contained in:
Alex Gleason 2023-12-22 22:56:12 -06:00
commit 17cef2f186
No known key found for this signature in database
GPG Key ID: 7211D1F99744FBB7
11 changed files with 36 additions and 42 deletions

View File

@ -6,9 +6,10 @@ import type { GetFiltersOpts } from '@/filter.ts';
/** Get events from a NIP-01 filter. */
function getFilters<K extends number>(filters: Filter<K>[], opts: GetFiltersOpts = {}): Promise<Event<K>[]> {
if (opts.signal?.aborted) return Promise.resolve([]);
if (!filters.length) return Promise.resolve([]);
return new Promise((resolve) => {
let tid: number;
const results: Event[] = [];
const unsub = pool.subscribe(
@ -29,24 +30,20 @@ function getFilters<K extends number>(filters: Filter<K>[], opts: GetFiltersOpts
}
if (typeof opts.limit === 'number' && results.length >= opts.limit) {
unsub();
clearTimeout(tid);
resolve(results as Event<K>[]);
}
},
undefined,
() => {
unsub();
clearTimeout(tid);
resolve(results as Event<K>[]);
},
);
if (typeof opts.timeout === 'number') {
tid = setTimeout(() => {
unsub();
resolve(results as Event<K>[]);
}, opts.timeout);
}
opts.signal?.addEventListener('abort', () => {
unsub();
resolve(results as Event<K>[]);
});
});
}

View File

@ -8,7 +8,7 @@ import { getAuthor, getFollowedPubkeys, getFollows } from '@/queries.ts';
import { booleanParamSchema, fileSchema } from '@/schema.ts';
import { jsonMetaContentSchema } from '@/schemas/nostr.ts';
import { uploadFile } from '@/upload.ts';
import { isFollowing, lookupAccount, nostrNow, Time } from '@/utils.ts';
import { isFollowing, lookupAccount, nostrNow } from '@/utils.ts';
import { paginated, paginationSchema, parseBody } from '@/utils/web.ts';
import { createEvent } from '@/utils/web.ts';
import { renderEventAccounts } from '@/views.ts';
@ -258,7 +258,7 @@ const favouritesController: AppController = async (c) => {
const events7 = await mixer.getFilters(
[{ kinds: [7], authors: [pubkey], ...params }],
{ timeout: Time.seconds(1) },
{ signal: AbortSignal.timeout(1000) },
);
const ids = events7
@ -266,7 +266,7 @@ const favouritesController: AppController = async (c) => {
.filter((id): id is string => !!id);
const events1 = await mixer.getFilters([{ kinds: [1], ids, relations: ['author', 'event_stats', 'author_stats'] }], {
timeout: Time.seconds(1),
signal: AbortSignal.timeout(1000),
});
const statuses = await Promise.all(events1.map((event) => renderStatus(event, c.get('pubkey'))));

View File

@ -1,6 +1,5 @@
import { type AppController } from '@/app.ts';
import * as mixer from '@/mixer.ts';
import { Time } from '@/utils.ts';
import { paginated, paginationSchema } from '@/utils/web.ts';
import { renderNotification } from '@/views/mastodon/notifications.ts';
@ -10,7 +9,7 @@ const notificationsController: AppController = async (c) => {
const events = await mixer.getFilters(
[{ kinds: [1], '#p': [pubkey], since, until }],
{ timeout: Time.seconds(3) },
{ signal: AbortSignal.timeout(3000) },
);
const statuses = await Promise.all(events.map((event) => renderNotification(event, pubkey)));

View File

@ -5,7 +5,7 @@ import { type DittoFilter } from '@/filter.ts';
import * as mixer from '@/mixer.ts';
import { booleanParamSchema } from '@/schema.ts';
import { nostrIdSchema } from '@/schemas/nostr.ts';
import { dedupeEvents, Time } from '@/utils.ts';
import { dedupeEvents } from '@/utils.ts';
import { lookupNip05Cached } from '@/utils/nip05.ts';
import { renderAccount } from '@/views/mastodon/accounts.ts';
import { renderStatus } from '@/views/mastodon/statuses.ts';
@ -93,9 +93,9 @@ function typeToKinds(type: SearchQuery['type']): number[] {
}
/** Resolve a searched value into an event, if applicable. */
async function lookupEvent(query: SearchQuery): Promise<Event | undefined> {
async function lookupEvent(query: SearchQuery, signal = AbortSignal.timeout(1000)): Promise<Event | undefined> {
const filters = await getLookupFilters(query);
const [event] = await mixer.getFilters(filters, { limit: 1, timeout: Time.seconds(1) });
const [event] = await mixer.getFilters(filters, { limit: 1, signal });
return event;
}

View File

@ -3,7 +3,6 @@ import { type DittoFilter } from '@/filter.ts';
import * as mixer from '@/mixer.ts';
import { getFeedPubkeys } from '@/queries.ts';
import { booleanParamSchema } from '@/schema.ts';
import { Time } from '@/utils.ts';
import { paginated, paginationSchema } from '@/utils/web.ts';
import { renderStatus } from '@/views/mastodon/statuses.ts';
@ -33,10 +32,10 @@ const hashtagTimelineController: AppController = (c) => {
};
/** Render statuses for timelines. */
async function renderStatuses(c: AppContext, filters: DittoFilter<1>[]) {
async function renderStatuses(c: AppContext, filters: DittoFilter<1>[], signal = AbortSignal.timeout(1000)) {
const events = await mixer.getFilters(
filters.map((filter) => ({ ...filter, relations: ['author', 'event_stats', 'author_stats'] })),
{ timeout: Time.seconds(1) },
{ signal },
);
if (!events.length) {

View File

@ -21,8 +21,8 @@ type MicroFilter = { ids: [Event['id']] } | { kinds: [0]; authors: [Event['pubke
/** Additional options to apply to the whole subscription. */
interface GetFiltersOpts {
/** How long to wait (in milliseconds) until aborting the request. */
timeout?: number;
/** Signal to abort the request. */
signal?: AbortSignal;
/** Event limit for the whole subscription. */
limit?: number;
/** Relays to use, if applicable. */

View File

@ -17,8 +17,6 @@ pool.subscribe(
/** Handle events through the firehose pipeline. */
function handleEvent(event: Event): Promise<void> {
console.info(`firehose: Event<${event.kind}> ${event.id}`);
return pipeline
.handleEvent(event)
.catch(() => {});

View File

@ -26,6 +26,7 @@ async function handleEvent(event: Event): Promise<void> {
if (!(await verifySignatureWorker(event))) return;
const wanted = reqmeister.isWanted(event);
if (encounterEvent(event)) return;
console.info(`pipeline: Event<${event.kind}> ${event.id}`);
const data = await getEventData(event);
await Promise.all([
@ -72,7 +73,7 @@ async function storeEvent(event: Event, data: EventData, opts: StoreEventOpts =
if (force || data.user || isAdminEvent(event) || await isLocallyFollowed(event.pubkey)) {
const [deletion] = await mixer.getFilters(
[{ kinds: [5], authors: [event.pubkey], '#e': [event.id], limit: 1 }],
{ limit: 1, timeout: Time.seconds(1) },
{ limit: 1, signal: AbortSignal.timeout(Time.seconds(1)) },
);
if (deletion) {

View File

@ -5,8 +5,8 @@ import * as mixer from '@/mixer.ts';
import { reqmeister } from '@/common.ts';
interface GetEventOpts<K extends number> {
/** Timeout in milliseconds. */
timeout?: number;
/** Signal to abort the request. */
signal?: AbortSignal;
/** Event kind. */
kind?: K;
/** Relations to include on the event. */
@ -18,36 +18,36 @@ const getEvent = async <K extends number = number>(
id: string,
opts: GetEventOpts<K> = {},
): Promise<Event<K> | undefined> => {
const { kind, relations, timeout = 1000 } = opts;
const { kind, relations, signal = AbortSignal.timeout(1000) } = opts;
const filter: DittoFilter<K> = { ids: [id], relations, limit: 1 };
if (kind) {
filter.kinds = [kind];
}
const [event] = await mixer.getFilters([filter], { limit: 1, timeout });
const [event] = await mixer.getFilters([filter], { limit: 1, signal });
return event;
};
/** Get a Nostr `set_medatadata` event for a user's pubkey. */
const getAuthor = async (pubkey: string, opts: GetEventOpts<0> = {}): Promise<Event<0> | undefined> => {
const { relations, timeout = 1000 } = opts;
const { relations, signal = AbortSignal.timeout(1000) } = opts;
const event = await eventsDB.getFilters(
[{ authors: [pubkey], relations, kinds: [0], limit: 1 }],
{ limit: 1, timeout },
{ limit: 1, signal },
).then(([event]) => event) || await reqmeister.req({ kinds: [0], authors: [pubkey] }).catch(() => {});
return event;
};
/** Get users the given pubkey follows. */
const getFollows = async (pubkey: string, timeout = 1000): Promise<Event<3> | undefined> => {
const [event] = await mixer.getFilters([{ authors: [pubkey], kinds: [3], limit: 1 }], { limit: 1, timeout });
const getFollows = async (pubkey: string, signal = AbortSignal.timeout(1000)): Promise<Event<3> | undefined> => {
const [event] = await mixer.getFilters([{ authors: [pubkey], kinds: [3], limit: 1 }], { limit: 1, signal });
return event;
};
/** Get pubkeys the user follows. */
async function getFollowedPubkeys(pubkey: string): Promise<string[]> {
const event = await getFollows(pubkey);
async function getFollowedPubkeys(pubkey: string, signal?: AbortSignal): Promise<string[]> {
const event = await getFollows(pubkey, signal);
if (!event) return [];
return event.tags
@ -79,10 +79,10 @@ async function getAncestors(event: Event<1>, result = [] as Event<1>[]): Promise
return result.reverse();
}
function getDescendants(eventId: string): Promise<Event<1>[]> {
function getDescendants(eventId: string, signal = AbortSignal.timeout(2000)): Promise<Event<1>[]> {
return mixer.getFilters(
[{ kinds: [1], '#e': [eventId], relations: ['author', 'event_stats', 'author_stats'] }],
{ limit: 200, timeout: 2000 },
{ limit: 200, signal },
);
}

View File

@ -7,12 +7,12 @@ const nip05Cache = new TTLCache<string, Promise<string | null>>({ ttl: Time.hour
const NIP05_REGEX = /^(?:([\w.+-]+)@)?([\w.-]+)$/;
interface LookupOpts {
timeout?: number;
signal?: AbortSignal;
}
/** Get pubkey from NIP-05. */
async function lookup(value: string, opts: LookupOpts = {}): Promise<string | null> {
const { timeout = 2000 } = opts;
const { signal = AbortSignal.timeout(2000) } = opts;
const match = value.match(NIP05_REGEX);
if (!match) return null;
@ -21,7 +21,7 @@ async function lookup(value: string, opts: LookupOpts = {}): Promise<string | nu
try {
const res = await fetchWorker(`https://${domain}/.well-known/nostr.json?name=${name}`, {
signal: AbortSignal.timeout(timeout),
signal,
});
const { names } = nostrJsonSchema.parse(await res.json());

View File

@ -60,12 +60,12 @@ const previewCardCache = new TTLCache<string, Promise<PreviewCard | null>>({
});
/** Unfurl card from cache if available, otherwise fetch it. */
function unfurlCardCached(url: string, timeout = Time.seconds(1)): Promise<PreviewCard | null> {
function unfurlCardCached(url: string, signal = AbortSignal.timeout(1000)): Promise<PreviewCard | null> {
const cached = previewCardCache.get(url);
if (cached !== undefined) {
return cached;
} else {
const card = unfurlCard(url, AbortSignal.timeout(timeout));
const card = unfurlCard(url, signal);
previewCardCache.set(url, card);
return card;
}