debug: firehose, stats, sub
This commit is contained in:
parent
c335deca74
commit
52d39c7a56
|
@ -1,9 +1,11 @@
|
||||||
import { type Event } from '@/deps.ts';
|
import { Debug, type Event } from '@/deps.ts';
|
||||||
import { activeRelays, pool } from '@/pool.ts';
|
import { activeRelays, pool } from '@/pool.ts';
|
||||||
import { nostrNow } from '@/utils.ts';
|
import { nostrNow } from '@/utils.ts';
|
||||||
|
|
||||||
import * as pipeline from './pipeline.ts';
|
import * as pipeline from './pipeline.ts';
|
||||||
|
|
||||||
|
const debug = Debug('ditto:firehose');
|
||||||
|
|
||||||
// This file watches events on all known relays and performs
|
// This file watches events on all known relays and performs
|
||||||
// side-effects based on them, such as trending hashtag tracking
|
// side-effects based on them, such as trending hashtag tracking
|
||||||
// and storing events for notifications and the home feed.
|
// and storing events for notifications and the home feed.
|
||||||
|
@ -17,6 +19,8 @@ pool.subscribe(
|
||||||
|
|
||||||
/** Handle events through the firehose pipeline. */
|
/** Handle events through the firehose pipeline. */
|
||||||
function handleEvent(event: Event): Promise<void> {
|
function handleEvent(event: Event): Promise<void> {
|
||||||
|
debug(`Event<${event.kind}> ${event.id}`);
|
||||||
|
|
||||||
return pipeline
|
return pipeline
|
||||||
.handleEvent(event)
|
.handleEvent(event)
|
||||||
.catch(() => {});
|
.catch(() => {});
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
import { type AuthorStatsRow, db, type DittoDB, type EventStatsRow } from '@/db.ts';
|
import { type AuthorStatsRow, db, type DittoDB, type EventStatsRow } from '@/db.ts';
|
||||||
import * as eventsDB from '@/db/events.ts';
|
import * as eventsDB from '@/db/events.ts';
|
||||||
import { type Event, findReplyTag, type InsertQueryBuilder } from '@/deps.ts';
|
import { Debug, type Event, findReplyTag, type InsertQueryBuilder } from '@/deps.ts';
|
||||||
|
|
||||||
type AuthorStat = keyof Omit<AuthorStatsRow, 'pubkey'>;
|
type AuthorStat = keyof Omit<AuthorStatsRow, 'pubkey'>;
|
||||||
type EventStat = keyof Omit<EventStatsRow, 'event_id'>;
|
type EventStat = keyof Omit<EventStatsRow, 'event_id'>;
|
||||||
|
@ -9,6 +9,8 @@ type AuthorStatDiff = ['author_stats', pubkey: string, stat: AuthorStat, diff: n
|
||||||
type EventStatDiff = ['event_stats', eventId: string, stat: EventStat, diff: number];
|
type EventStatDiff = ['event_stats', eventId: string, stat: EventStat, diff: number];
|
||||||
type StatDiff = AuthorStatDiff | EventStatDiff;
|
type StatDiff = AuthorStatDiff | EventStatDiff;
|
||||||
|
|
||||||
|
const debug = Debug('ditto:stats');
|
||||||
|
|
||||||
/** Store stats for the event in LMDB. */
|
/** Store stats for the event in LMDB. */
|
||||||
async function updateStats<K extends number>(event: Event<K>) {
|
async function updateStats<K extends number>(event: Event<K>) {
|
||||||
let prev: Event<K> | undefined;
|
let prev: Event<K> | undefined;
|
||||||
|
@ -26,6 +28,10 @@ async function updateStats<K extends number>(event: Event<K>) {
|
||||||
const pubkeyDiffs = statDiffs.filter(([table]) => table === 'author_stats') as AuthorStatDiff[];
|
const pubkeyDiffs = statDiffs.filter(([table]) => table === 'author_stats') as AuthorStatDiff[];
|
||||||
const eventDiffs = statDiffs.filter(([table]) => table === 'event_stats') as EventStatDiff[];
|
const eventDiffs = statDiffs.filter(([table]) => table === 'event_stats') as EventStatDiff[];
|
||||||
|
|
||||||
|
if (statDiffs.length) {
|
||||||
|
debug({ id: event.id, pubkey: event.pubkey, kind: event.kind, tags: event.tags, statDiffs });
|
||||||
|
}
|
||||||
|
|
||||||
if (pubkeyDiffs.length) queries.push(authorStatsQuery(pubkeyDiffs));
|
if (pubkeyDiffs.length) queries.push(authorStatsQuery(pubkeyDiffs));
|
||||||
if (eventDiffs.length) queries.push(eventStatsQuery(eventDiffs));
|
if (eventDiffs.length) queries.push(eventStatsQuery(eventDiffs));
|
||||||
|
|
||||||
|
|
|
@ -1,9 +1,11 @@
|
||||||
import { type Event } from '@/deps.ts';
|
import { Debug, type Event } from '@/deps.ts';
|
||||||
import { Subscription } from '@/subscription.ts';
|
import { Subscription } from '@/subscription.ts';
|
||||||
|
|
||||||
import type { DittoFilter } from '@/filter.ts';
|
import type { DittoFilter } from '@/filter.ts';
|
||||||
import type { EventData } from '@/types.ts';
|
import type { EventData } from '@/types.ts';
|
||||||
|
|
||||||
|
const debug = Debug('ditto:subs');
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Manages Ditto event subscriptions.
|
* Manages Ditto event subscriptions.
|
||||||
* Subscriptions can be added, removed, and matched against events.
|
* Subscriptions can be added, removed, and matched against events.
|
||||||
|
@ -21,6 +23,7 @@ class SubscriptionStore {
|
||||||
* ```
|
* ```
|
||||||
*/
|
*/
|
||||||
sub<K extends number>(socket: unknown, id: string, filters: DittoFilter<K>[]): Subscription<K> {
|
sub<K extends number>(socket: unknown, id: string, filters: DittoFilter<K>[]): Subscription<K> {
|
||||||
|
debug('sub', id, filters);
|
||||||
let subs = this.#store.get(socket);
|
let subs = this.#store.get(socket);
|
||||||
|
|
||||||
if (!subs) {
|
if (!subs) {
|
||||||
|
@ -38,12 +41,14 @@ class SubscriptionStore {
|
||||||
|
|
||||||
/** Remove a subscription from the store. */
|
/** Remove a subscription from the store. */
|
||||||
unsub(socket: unknown, id: string): void {
|
unsub(socket: unknown, id: string): void {
|
||||||
|
debug('unsub', id);
|
||||||
this.#store.get(socket)?.get(id)?.close();
|
this.#store.get(socket)?.get(id)?.close();
|
||||||
this.#store.get(socket)?.delete(id);
|
this.#store.get(socket)?.delete(id);
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Remove an entire socket. */
|
/** Remove an entire socket. */
|
||||||
close(socket: unknown): void {
|
close(socket: unknown): void {
|
||||||
|
debug('close', socket);
|
||||||
const subs = this.#store.get(socket);
|
const subs = this.#store.get(socket);
|
||||||
|
|
||||||
if (subs) {
|
if (subs) {
|
||||||
|
|
Loading…
Reference in New Issue