2023-08-17 23:07:25 +00:00
|
|
|
import * as eventsDB from '@/db/events.ts';
|
2023-08-17 21:47:22 +00:00
|
|
|
import { addRelays } from '@/db/relays.ts';
|
|
|
|
import { findUser } from '@/db/users.ts';
|
2023-08-24 22:39:24 +00:00
|
|
|
import { type Event, LRUCache } from '@/deps.ts';
|
2023-08-26 18:40:10 +00:00
|
|
|
import { isEphemeralKind } from '@/kinds.ts';
|
2023-08-18 00:32:05 +00:00
|
|
|
import { isLocallyFollowed } from '@/queries.ts';
|
2023-08-24 04:25:38 +00:00
|
|
|
import { Sub } from '@/subs.ts';
|
2023-08-17 21:47:22 +00:00
|
|
|
import { trends } from '@/trends.ts';
|
2023-08-24 22:26:46 +00:00
|
|
|
import { isRelay, nostrDate, nostrNow, Time } from '@/utils.ts';
|
2023-08-17 21:47:22 +00:00
|
|
|
|
2023-08-24 22:00:08 +00:00
|
|
|
import type { EventData } from '@/types.ts';
|
|
|
|
|
2023-08-17 21:47:22 +00:00
|
|
|
/**
 * Shared entry point of the event pipeline: processes an event and, when eligible, stores it.
 *
 * Events that `encounterEvent` reports as already seen are skipped, which is what makes
 * repeated calls with the same event safe.
 *
 * The remaining tasks (store, relay tracking, hashtag tracking, streaming) are all started
 * immediately and awaited together, so the returned promise rejects if any of them rejects
 * (for example when `storeEvent` refuses the event).
 */
async function handleEvent(event: Event): Promise<void> {
  const alreadySeen = encounterEvent(event);
  if (alreadySeen) return;

  // Data shared by several of the tasks below is loaded once, up front.
  const data = await getEventData(event);

  const tasks = [
    storeEvent(event, data),
    trackRelays(event),
    trackHashtags(event),
    streamOut(event, data),
  ];

  await Promise.all(tasks);
}
|
2023-08-17 21:47:22 +00:00
|
|
|
|
2023-08-24 22:39:24 +00:00
|
|
|
/**
 * Tracks the IDs of encountered events so duplicates can be skipped, improving idempotency
 * and performance.
 *
 * The cache is bounded (`max: 1000`), so an event may be treated as new again once its
 * entry has been evicted.
 */
const encounters = new LRUCache<string, boolean>({ max: 1000 });
|
|
|
|
|
|
|
|
/**
 * Marks the event as encountered and reports whether it had already been encountered.
 *
 * @param event Event whose `id` is looked up in (and then written to) the `encounters` cache.
 * @returns `true` if the event ID was already in the cache, `false` otherwise. The result is
 *   always a strict boolean, never `undefined`.
 */
function encounterEvent(event: Event): boolean {
  // Read before writing: the lookup must reflect the state prior to this call.
  const seenBefore = encounters.get(event.id) === true;
  encounters.set(event.id, true);
  return seenBefore;
}
|
|
|
|
|
2023-08-24 22:00:08 +00:00
|
|
|
/**
 * Loads, once per event, the data that several pipeline tasks need.
 *
 * Currently this is only the result of `findUser` for the event's author (`pubkey`).
 */
async function getEventData(event: Event): Promise<EventData> {
  return { user: await findUser({ pubkey: event.pubkey }) };
}
|
|
|
|
|
2023-08-17 23:07:25 +00:00
|
|
|
/**
 * Stores the event in the events database when it is eligible.
 *
 * - Ephemeral kinds (per `isEphemeralKind`) are never stored; the promise resolves without error.
 * - Otherwise the event is accepted when `data.user` is set or the author is locally followed.
 * - Any other event is refused: the promise rejects with a `RelayError` prefixed `blocked`.
 *
 * Failures of the insert itself are logged with `console.warn` and not propagated.
 */
async function storeEvent(event: Event, data: EventData): Promise<void> {
  if (isEphemeralKind(event.kind)) return;

  // The follow lookup only runs when there is no user data for the author.
  const accepted = data.user || await isLocallyFollowed(event.pubkey);

  if (!accepted) {
    throw new RelayError('blocked', 'only registered users can post');
  }

  await eventsDB.insertEvent(event).catch(console.warn);
}
|
|
|
|
|
|
|
|
/**
 * Records hashtag usage for the event, for processing trending tags.
 *
 * Hashtags are the values of the event's `t` tags. Tags without a value (for example a bare
 * `['t']`) or with an empty value are ignored, and at most the first five remaining hashtags
 * are recorded. Nothing is recorded when the event has no usable hashtags.
 *
 * Tracking is best-effort: a failure while recording is logged with `console.warn` and does
 * not reject the returned promise.
 */
// deno-lint-ignore require-await
async function trackHashtags(event: Event): Promise<void> {
  const date = nostrDate(event.created_at);

  const tags = event.tags
    // Require a non-empty string value so malformed tags never reach the trends store.
    .filter((tag) => tag[0] === 't' && typeof tag[1] === 'string' && tag[1] !== '')
    .map((tag) => tag[1])
    .slice(0, 5);

  if (!tags.length) return;

  try {
    console.info('tracking tags:', tags);
    trends.addTagUsages(event.pubkey, tags, date);
  } catch (e) {
    // Trend tracking must never fail the pipeline, but the failure should stay visible.
    console.warn('failed to track tags:', e);
  }
}
|
|
|
|
|
|
|
|
/**
 * Collects the relay URLs mentioned by the event and passes them to `addRelays`.
 *
 * Relay URLs are taken from:
 * - the third element of `p`, `e` and `a` tags, and
 * - the second element of `r` tags, but only on kind 10002 events
 *   (presumably relay list events; confirm against the Nostr NIPs).
 *
 * Only values accepted by `isRelay` are kept, and duplicates are collapsed.
 */
function trackRelays(event: Event) {
  const found = new Set<`wss://${string}`>();
  const acceptsRTags = event.kind === 10002;

  for (const [name, value, hint] of event.tags) {
    const hasHintSlot = name === 'p' || name === 'e' || name === 'a';

    if (hasHintSlot && isRelay(hint)) {
      found.add(hint);
    }
    if (acceptsRTags && name === 'r' && isRelay(value)) {
      found.add(value);
    }
  }

  return addRelays(Array.from(found));
}
|
|
|
|
|
2023-08-24 22:26:46 +00:00
|
|
|
/**
 * Decides whether the event is being received in a timely manner.
 *
 * An event is fresh when its `created_at` is not below `nostrNow() - Time.seconds(10)`.
 *
 * NOTE(review): `created_at` is compared against `nostrNow()` minus `Time.seconds(10)`.
 * Confirm that both helpers use the same unit as `created_at`; if `Time.seconds` returns
 * milliseconds, the window is far wider than 10 seconds.
 */
const isFresh = (event: Event): boolean => {
  const { created_at: createdAt } = event;
  const cutoff = nostrNow() - Time.seconds(10);
  return createdAt >= cutoff;
};
|
|
|
|
|
2023-08-24 04:25:38 +00:00
|
|
|
/**
 * Pushes the event to every active subscription that matches it.
 *
 * Only fresh events (per `isFresh`) are streamed; anything else is ignored.
 */
function streamOut(event: Event, data: EventData) {
  if (isFresh(event)) {
    const matching = Sub.matches(event, data);
    for (const subscription of matching) {
      subscription.stream(event);
    }
  }
}
|
|
|
|
|
2023-08-18 01:24:16 +00:00
|
|
|
/**
 * NIP-20 command result, raised when the relay refuses or fails to process an event.
 *
 * The error message has the form `<prefix>: <message>`, and `name` is set to `RelayError`
 * so that logs and stringified errors identify the error type.
 */
class RelayError extends Error {
  /**
   * @param prefix Machine-readable reason placed before the colon in the message.
   * @param message Human-readable explanation placed after the colon.
   */
  constructor(prefix: 'duplicate' | 'pow' | 'blocked' | 'rate-limited' | 'invalid' | 'error', message: string) {
    super(`${prefix}: ${message}`);
    // Without this, instances report themselves as a plain `Error`.
    this.name = 'RelayError';
  }
}
|
|
|
|
|
|
|
|
export { handleEvent, RelayError };
|