ditto/src/pipeline.ts

243 lines
7.4 KiB
TypeScript
Raw Normal View History

2023-09-03 18:49:45 -05:00
import { Conf } from '@/config.ts';
2024-01-22 12:42:39 -06:00
import { encryptAdmin } from '@/crypto.ts';
2023-08-17 16:47:22 -05:00
import { addRelays } from '@/db/relays.ts';
import { deleteAttachedMedia } from '@/db/unattached-media.ts';
2023-08-17 16:47:22 -05:00
import { findUser } from '@/db/users.ts';
2024-01-22 14:24:37 -06:00
import { Debug, type Event, LNURL } from '@/deps.ts';
2023-08-26 13:40:10 -05:00
import { isEphemeralKind } from '@/kinds.ts';
2023-08-17 19:32:05 -05:00
import { isLocallyFollowed } from '@/queries.ts';
2023-12-10 11:10:11 -06:00
import { updateStats } from '@/stats.ts';
import { client, eventsDB, memorelay, reqmeister } from '@/storages.ts';
2023-08-23 23:25:38 -05:00
import { Sub } from '@/subs.ts';
import { getTagSet } from '@/tags.ts';
2023-12-27 23:35:42 -06:00
import { type EventData } from '@/types.ts';
2024-01-16 17:46:53 -06:00
import { eventAge, isRelay, nostrDate, nostrNow, Time } from '@/utils.ts';
import { fetchWorker } from '@/workers/fetch.ts';
import { TrendsWorker } from '@/workers/trends.ts';
2023-12-03 16:55:34 -06:00
import { verifySignatureWorker } from '@/workers/verify.ts';
2024-01-16 17:46:53 -06:00
import { signAdminEvent } from '@/sign.ts';
2024-01-22 12:42:39 -06:00
import { lnurlCache } from '@/utils/lnurl.ts';
2023-08-17 16:47:22 -05:00
const debug = Debug('ditto:pipeline');
2023-08-17 16:47:22 -05:00
/**
* Common pipeline function to process (and maybe store) events.
* It is idempotent, so it can be called multiple times for the same event.
*/
async function handleEvent(event: Event): Promise<void> {
const signal = AbortSignal.timeout(5000);
2023-12-03 16:55:34 -06:00
if (!(await verifySignatureWorker(event))) return;
2023-12-21 14:56:21 -06:00
const wanted = reqmeister.isWanted(event);
2023-12-29 13:35:57 -06:00
if (await encounterEvent(event)) return;
debug(`Event<${event.kind}> ${event.id}`);
2023-08-24 17:00:08 -05:00
const data = await getEventData(event);
await Promise.all([
2023-12-21 14:56:21 -06:00
storeEvent(event, data, { force: wanted }),
processDeletions(event),
trackRelays(event),
trackHashtags(event),
fetchRelatedEvents(event, data, signal),
processMedia(event, data),
payZap(event, data, signal),
2023-08-24 17:00:08 -05:00
streamOut(event, data),
broadcast(event, data),
]);
}
2023-08-17 16:47:22 -05:00
/** Encounter the event, and return whether it has already been encountered. */
2023-12-29 13:35:57 -06:00
async function encounterEvent(event: Event): Promise<boolean> {
const preexisting = (await memorelay.count([{ ids: [event.id] }])) > 0;
memorelay.add(event);
reqmeister.add(event);
2023-12-27 23:35:42 -06:00
return preexisting;
}
2023-08-24 17:00:08 -05:00
/** Preload data that will be useful to several tasks. */
async function getEventData({ pubkey }: Event): Promise<EventData> {
const user = await findUser({ pubkey });
return { user };
}
2023-09-03 18:49:45 -05:00
/** Check if the pubkey is the `DITTO_NSEC` pubkey. */
const isAdminEvent = ({ pubkey }: Event): boolean => pubkey === Conf.pubkey;
2023-12-21 14:56:21 -06:00
interface StoreEventOpts {
force?: boolean;
}
/** Maybe store the event, if eligible. */
2023-12-21 14:56:21 -06:00
async function storeEvent(event: Event, data: EventData, opts: StoreEventOpts = {}): Promise<void> {
2023-08-26 13:40:10 -05:00
if (isEphemeralKind(event.kind)) return;
2023-12-21 14:56:21 -06:00
const { force = false } = opts;
2023-12-21 14:56:21 -06:00
if (force || data.user || isAdminEvent(event) || await isLocallyFollowed(event.pubkey)) {
const isDeleted = await eventsDB.count(
[{ kinds: [5], authors: [Conf.pubkey, event.pubkey], '#e': [event.id], limit: 1 }],
) > 0;
if (isDeleted) {
return Promise.reject(new RelayError('blocked', 'event was deleted'));
} else {
2023-12-10 11:10:11 -06:00
await Promise.all([
eventsDB.add(event, { data }).catch(debug),
2023-12-27 20:19:59 -06:00
updateStats(event).catch(debug),
2023-12-10 11:10:11 -06:00
]);
}
} else {
return Promise.reject(new RelayError('blocked', 'only registered users can post'));
2023-08-17 16:47:22 -05:00
}
}
/** Query to-be-deleted events, ensure their pubkey matches, then delete them from the database. */
async function processDeletions(event: Event): Promise<void> {
if (event.kind === 5) {
const ids = getTagSet(event.tags, 'e');
2024-01-11 19:11:04 -06:00
if (event.pubkey === Conf.pubkey) {
await eventsDB.deleteFilters([{ ids: [...ids] }]);
} else {
const events = await eventsDB.filter([{
ids: [...ids],
authors: [event.pubkey],
}]);
2024-01-11 19:11:04 -06:00
const deleteIds = events.map(({ id }) => id);
await eventsDB.deleteFilters([{ ids: deleteIds }]);
}
}
}
2023-08-17 16:47:22 -05:00
/** Track whenever a hashtag is used, for processing trending tags. */
async function trackHashtags(event: Event): Promise<void> {
2023-08-17 16:47:22 -05:00
const date = nostrDate(event.created_at);
const tags = event.tags
.filter((tag) => tag[0] === 't')
.map((tag) => tag[1])
.slice(0, 5);
if (!tags.length) return;
try {
2023-12-27 22:27:05 -06:00
debug('tracking tags:', JSON.stringify(tags));
await TrendsWorker.addTagUsages(event.pubkey, tags, date);
2023-08-17 16:47:22 -05:00
} catch (_e) {
// do nothing
}
}
/** Tracks known relays in the database. */
function trackRelays(event: Event) {
const relays = new Set<`wss://${string}`>();
event.tags.forEach((tag) => {
if (['p', 'e', 'a'].includes(tag[0]) && isRelay(tag[2])) {
relays.add(tag[2]);
}
if (event.kind === 10002 && tag[0] === 'r' && isRelay(tag[1])) {
relays.add(tag[1]);
}
});
return addRelays([...relays]);
}
/** Queue related events to fetch. */
function fetchRelatedEvents(event: Event, data: EventData, signal: AbortSignal) {
2023-12-21 14:56:21 -06:00
if (!data.user) {
reqmeister.req({ kinds: [0], authors: [event.pubkey] }, { signal }).catch(() => {});
2023-12-21 14:56:21 -06:00
}
for (const [name, id, relay] of event.tags) {
if (name === 'e' && !memorelay.count([{ ids: [id] }])) {
2023-12-28 13:41:04 -06:00
reqmeister.req({ ids: [id] }, { relays: [relay] }).catch(() => {});
2023-12-21 14:56:21 -06:00
}
}
}
/** Delete unattached media entries that are attached to the event. */
function processMedia({ tags, pubkey }: Event, { user }: EventData) {
if (user) {
const urls = getTagSet(tags, 'media');
return deleteAttachedMedia(pubkey, [...urls]);
}
}
2024-01-22 14:24:37 -06:00
/** Emit Nostr Wallet Connect event from zaps so users may pay. */
async function payZap(event: Event, data: EventData, signal: AbortSignal) {
2024-01-22 14:24:37 -06:00
if (event.kind !== 9734 || !data.user) return;
const lnurl = event.tags.find(([name]) => name === 'lnurl')?.[1];
const amount = Number(event.tags.find(([name]) => name === 'amount')?.[1]);
if (!lnurl || !amount) return;
try {
const details = await lnurlCache.fetch(lnurl, { signal });
if (details.tag !== 'payRequest' || !details.allowsNostr || !details.nostrPubkey) {
throw new Error('invalid lnurl');
}
if (amount > details.maxSendable || amount < details.minSendable) {
throw new Error('amount out of range');
}
2024-01-22 14:24:37 -06:00
const { pr } = await LNURL.callback(
details.callback,
{ amount, nostr: event, lnurl },
{ fetch: fetchWorker, signal },
);
const nwcRequestEvent = await signAdminEvent({
kind: 23194,
content: await encryptAdmin(
event.pubkey,
JSON.stringify({ method: 'pay_invoice', params: { invoice: pr } }),
),
created_at: nostrNow(),
tags: [
['p', event.pubkey],
['e', event.id],
],
});
await handleEvent(nwcRequestEvent);
} catch (e) {
debug('lnurl error:', e);
}
}
/** Determine if the event is being received in a timely manner. */
2023-09-05 17:10:20 -05:00
const isFresh = (event: Event): boolean => eventAge(event) < Time.seconds(10);
2023-08-23 23:25:38 -05:00
/** Distribute the event through active subscriptions. */
2023-08-24 17:00:08 -05:00
function streamOut(event: Event, data: EventData) {
if (!isFresh(event)) return;
for (const sub of Sub.matches(event, data)) {
sub.stream(event);
2023-08-23 23:25:38 -05:00
}
}
/**
* Publish the event to other relays.
* This should only be done in certain circumstances, like mentioning a user or publishing deletions.
*/
function broadcast(event: Event, data: EventData) {
if (!data.user || !isFresh(event)) return;
if (event.kind === 5) {
client.add(event);
}
}
/** NIP-20 command line result. */
class RelayError extends Error {
constructor(prefix: 'duplicate' | 'pow' | 'blocked' | 'rate-limited' | 'invalid' | 'error', message: string) {
super(`${prefix}: ${message}`);
}
}
export { handleEvent, RelayError };