Add pipeline module
This commit is contained in:
parent
3890df1a2d
commit
87c67c0a39
|
@ -1,9 +1,8 @@
|
||||||
import { insertEvent, isLocallyFollowed } from '@/db/events.ts';
|
import { getActiveRelays } from '@/db/relays.ts';
|
||||||
import { addRelays, getActiveRelays } from '@/db/relays.ts';
|
|
||||||
import { findUser } from '@/db/users.ts';
|
|
||||||
import { type Event, RelayPool } from '@/deps.ts';
|
import { type Event, RelayPool } from '@/deps.ts';
|
||||||
import { trends } from '@/trends.ts';
|
import { nostrNow } from '@/utils.ts';
|
||||||
import { isRelay, nostrDate, nostrNow } from '@/utils.ts';
|
|
||||||
|
import * as pipeline from './pipeline.ts';
|
||||||
|
|
||||||
const relays = await getActiveRelays();
|
const relays = await getActiveRelays();
|
||||||
const pool = new RelayPool(relays);
|
const pool = new RelayPool(relays);
|
||||||
|
@ -20,48 +19,7 @@ pool.subscribe(
|
||||||
);
|
);
|
||||||
|
|
||||||
/** Handle events through the firehose pipeline. */
|
/** Handle events through the firehose pipeline. */
|
||||||
async function handleEvent(event: Event): Promise<void> {
|
function handleEvent(event: Event): Promise<void> {
|
||||||
console.info(`firehose: Event<${event.kind}> ${event.id}`);
|
console.info(`firehose: Event<${event.kind}> ${event.id}`);
|
||||||
|
return pipeline.handleEvent(event);
|
||||||
trackHashtags(event);
|
|
||||||
trackRelays(event);
|
|
||||||
|
|
||||||
if (await findUser({ pubkey: event.pubkey }) || await isLocallyFollowed(event.pubkey)) {
|
|
||||||
insertEvent(event).catch(console.warn);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Track whenever a hashtag is used, for processing trending tags. */
|
|
||||||
function trackHashtags(event: Event): void {
|
|
||||||
const date = nostrDate(event.created_at);
|
|
||||||
|
|
||||||
const tags = event.tags
|
|
||||||
.filter((tag) => tag[0] === 't')
|
|
||||||
.map((tag) => tag[1])
|
|
||||||
.slice(0, 5);
|
|
||||||
|
|
||||||
if (!tags.length) return;
|
|
||||||
|
|
||||||
try {
|
|
||||||
console.info('tracking tags:', tags);
|
|
||||||
trends.addTagUsages(event.pubkey, tags, date);
|
|
||||||
} catch (_e) {
|
|
||||||
// do nothing
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Tracks known relays in the database. */
|
|
||||||
function trackRelays(event: Event) {
|
|
||||||
const relays = new Set<`wss://${string}`>();
|
|
||||||
|
|
||||||
event.tags.forEach((tag) => {
|
|
||||||
if (['p', 'e', 'a'].includes(tag[0]) && isRelay(tag[2])) {
|
|
||||||
relays.add(tag[2]);
|
|
||||||
}
|
|
||||||
if (event.kind === 10002 && tag[0] === 'r' && isRelay(tag[1])) {
|
|
||||||
relays.add(tag[1]);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
return addRelays([...relays]);
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,58 @@
|
||||||
|
import { insertEvent, isLocallyFollowed } from '@/db/events.ts';
|
||||||
|
import { addRelays } from '@/db/relays.ts';
|
||||||
|
import { findUser } from '@/db/users.ts';
|
||||||
|
import { type Event } from '@/deps.ts';
|
||||||
|
import { trends } from '@/trends.ts';
|
||||||
|
import { isRelay, nostrDate } from '@/utils.ts';
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Common pipeline function to process (and maybe store) events.
|
||||||
|
* It is idempotent, so it can be called multiple times for the same event.
|
||||||
|
*/
|
||||||
|
async function handleEvent(event: Event): Promise<void> {
|
||||||
|
console.info(`firehose: Event<${event.kind}> ${event.id}`);
|
||||||
|
|
||||||
|
trackHashtags(event);
|
||||||
|
trackRelays(event);
|
||||||
|
|
||||||
|
if (await findUser({ pubkey: event.pubkey }) || await isLocallyFollowed(event.pubkey)) {
|
||||||
|
insertEvent(event).catch(console.warn);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Track whenever a hashtag is used, for processing trending tags. */
|
||||||
|
function trackHashtags(event: Event): void {
|
||||||
|
const date = nostrDate(event.created_at);
|
||||||
|
|
||||||
|
const tags = event.tags
|
||||||
|
.filter((tag) => tag[0] === 't')
|
||||||
|
.map((tag) => tag[1])
|
||||||
|
.slice(0, 5);
|
||||||
|
|
||||||
|
if (!tags.length) return;
|
||||||
|
|
||||||
|
try {
|
||||||
|
console.info('tracking tags:', tags);
|
||||||
|
trends.addTagUsages(event.pubkey, tags, date);
|
||||||
|
} catch (_e) {
|
||||||
|
// do nothing
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Tracks known relays in the database. */
|
||||||
|
function trackRelays(event: Event) {
|
||||||
|
const relays = new Set<`wss://${string}`>();
|
||||||
|
|
||||||
|
event.tags.forEach((tag) => {
|
||||||
|
if (['p', 'e', 'a'].includes(tag[0]) && isRelay(tag[2])) {
|
||||||
|
relays.add(tag[2]);
|
||||||
|
}
|
||||||
|
if (event.kind === 10002 && tag[0] === 'r' && isRelay(tag[1])) {
|
||||||
|
relays.add(tag[1]);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
return addRelays([...relays]);
|
||||||
|
}
|
||||||
|
|
||||||
|
export { handleEvent };
|
Loading…
Reference in New Issue