Merge branch 'rework-stats' into 'main'
Rework stats See merge request soapbox-pub/ditto!308
This commit is contained in:
commit
ab22feacd2
|
@ -1,6 +1,8 @@
|
|||
import { nip19 } from 'nostr-tools';
|
||||
|
||||
import { refreshAuthorStats } from '@/stats.ts';
|
||||
import { DittoDB } from '@/db/DittoDB.ts';
|
||||
import { Storages } from '@/storages.ts';
|
||||
import { refreshAuthorStats } from '@/utils/stats.ts';
|
||||
|
||||
let pubkey: string;
|
||||
try {
|
||||
|
@ -15,4 +17,7 @@ try {
|
|||
Deno.exit(1);
|
||||
}
|
||||
|
||||
await refreshAuthorStats(pubkey);
|
||||
const store = await Storages.db();
|
||||
const kysely = await DittoDB.getInstance();
|
||||
|
||||
await refreshAuthorStats({ pubkey, kysely, store });
|
||||
|
|
|
@ -47,7 +47,7 @@ export class DittoDB {
|
|||
provider: new FileMigrationProvider({
|
||||
fs,
|
||||
path,
|
||||
migrationFolder: new URL(import.meta.resolve('../db/migrations')).pathname,
|
||||
migrationFolder: new URL(import.meta.resolve('./migrations')).pathname,
|
||||
}),
|
||||
});
|
||||
|
||||
|
|
|
@ -20,6 +20,7 @@ interface EventStatsRow {
|
|||
replies_count: number;
|
||||
reposts_count: number;
|
||||
reactions_count: number;
|
||||
reactions: string;
|
||||
}
|
||||
|
||||
interface EventRow {
|
||||
|
|
|
@ -0,0 +1,12 @@
|
|||
import { Kysely } from 'kysely';
|
||||
|
||||
export async function up(db: Kysely<any>): Promise<void> {
|
||||
await db.schema
|
||||
.alterTable('event_stats')
|
||||
.addColumn('reactions', 'text', (col) => col.defaultTo('{}'))
|
||||
.execute();
|
||||
}
|
||||
|
||||
export async function down(db: Kysely<any>): Promise<void> {
|
||||
await db.schema.alterTable('event_stats').dropColumn('reactions').execute();
|
||||
}
|
|
@ -11,7 +11,7 @@ export interface AuthorStats {
|
|||
export interface EventStats {
|
||||
replies_count: number;
|
||||
reposts_count: number;
|
||||
reactions_count: number;
|
||||
reactions: Record<string, number>;
|
||||
}
|
||||
|
||||
/** Internal Event representation used by Ditto, including extra keys. */
|
||||
|
|
|
@ -10,7 +10,6 @@ import { deleteAttachedMedia } from '@/db/unattached-media.ts';
|
|||
import { DittoEvent } from '@/interfaces/DittoEvent.ts';
|
||||
import { DVM } from '@/pipeline/DVM.ts';
|
||||
import { RelayError } from '@/RelayError.ts';
|
||||
import { updateStats } from '@/stats.ts';
|
||||
import { hydrateEvents, purifyEvent } from '@/storages/hydrate.ts';
|
||||
import { Storages } from '@/storages.ts';
|
||||
import { eventAge, nostrDate, nostrNow, parseNip05, Time } from '@/utils.ts';
|
||||
|
@ -21,6 +20,7 @@ import { verifyEventWorker } from '@/workers/verify.ts';
|
|||
import { AdminSigner } from '@/signers/AdminSigner.ts';
|
||||
import { lnurlCache } from '@/utils/lnurl.ts';
|
||||
import { nip05Cache } from '@/utils/nip05.ts';
|
||||
import { updateStats } from '@/utils/stats.ts';
|
||||
import { getTagSet } from '@/utils/tags.ts';
|
||||
|
||||
import { MuteListPolicy } from '@/policies/MuteListPolicy.ts';
|
||||
|
@ -121,8 +121,9 @@ async function hydrateEvent(event: DittoEvent, signal: AbortSignal): Promise<voi
|
|||
async function storeEvent(event: DittoEvent, signal?: AbortSignal): Promise<void> {
|
||||
if (NKinds.ephemeral(event.kind)) return;
|
||||
const store = await Storages.db();
|
||||
const kysely = await DittoDB.getInstance();
|
||||
|
||||
await updateStats(event).catch(debug);
|
||||
await updateStats({ event, store, kysely }).catch(debug);
|
||||
await store.event(event, { signal });
|
||||
}
|
||||
|
||||
|
|
273
src/stats.ts
273
src/stats.ts
|
@ -1,273 +0,0 @@
|
|||
import { Semaphore } from '@lambdalisue/async';
|
||||
import { NKinds, NostrEvent, NStore } from '@nostrify/nostrify';
|
||||
import Debug from '@soapbox/stickynotes/debug';
|
||||
import { InsertQueryBuilder, Kysely } from 'kysely';
|
||||
import { LRUCache } from 'lru-cache';
|
||||
import { SetRequired } from 'type-fest';
|
||||
|
||||
import { DittoDB } from '@/db/DittoDB.ts';
|
||||
import { DittoTables } from '@/db/DittoTables.ts';
|
||||
import { Storages } from '@/storages.ts';
|
||||
import { findReplyTag, getTagSet } from '@/utils/tags.ts';
|
||||
|
||||
type AuthorStat = keyof Omit<DittoTables['author_stats'], 'pubkey'>;
|
||||
type EventStat = keyof Omit<DittoTables['event_stats'], 'event_id'>;
|
||||
|
||||
type AuthorStatDiff = ['author_stats', pubkey: string, stat: AuthorStat, diff: number];
|
||||
type EventStatDiff = ['event_stats', eventId: string, stat: EventStat, diff: number];
|
||||
type StatDiff = AuthorStatDiff | EventStatDiff;
|
||||
|
||||
const debug = Debug('ditto:stats');
|
||||
|
||||
/** Store stats for the event. */
|
||||
async function updateStats(event: NostrEvent) {
|
||||
let prev: NostrEvent | undefined;
|
||||
const queries: InsertQueryBuilder<DittoTables, any, unknown>[] = [];
|
||||
|
||||
// Kind 3 is a special case - replace the count with the new list.
|
||||
if (event.kind === 3) {
|
||||
prev = await getPrevEvent(event);
|
||||
if (!prev || event.created_at >= prev.created_at) {
|
||||
queries.push(await updateFollowingCountQuery(event));
|
||||
}
|
||||
}
|
||||
|
||||
const statDiffs = await getStatsDiff(event, prev);
|
||||
const pubkeyDiffs = statDiffs.filter(([table]) => table === 'author_stats') as AuthorStatDiff[];
|
||||
const eventDiffs = statDiffs.filter(([table]) => table === 'event_stats') as EventStatDiff[];
|
||||
|
||||
if (statDiffs.length) {
|
||||
debug(JSON.stringify({ id: event.id, pubkey: event.pubkey, kind: event.kind, tags: event.tags, statDiffs }));
|
||||
}
|
||||
|
||||
pubkeyDiffs.forEach(([_, pubkey]) => refreshAuthorStatsDebounced(pubkey));
|
||||
|
||||
const kysely = await DittoDB.getInstance();
|
||||
|
||||
if (pubkeyDiffs.length) queries.push(authorStatsQuery(kysely, pubkeyDiffs));
|
||||
if (eventDiffs.length) queries.push(eventStatsQuery(kysely, eventDiffs));
|
||||
|
||||
if (queries.length) {
|
||||
await Promise.all(queries.map((query) => query.execute()));
|
||||
}
|
||||
}
|
||||
|
||||
/** Calculate stats changes ahead of time so we can build an efficient query. */
|
||||
async function getStatsDiff(event: NostrEvent, prev: NostrEvent | undefined): Promise<StatDiff[]> {
|
||||
const store = await Storages.db();
|
||||
const statDiffs: StatDiff[] = [];
|
||||
|
||||
const firstTaggedId = event.tags.find(([name]) => name === 'e')?.[1];
|
||||
const inReplyToId = findReplyTag(event.tags)?.[1];
|
||||
|
||||
switch (event.kind) {
|
||||
case 1:
|
||||
statDiffs.push(['author_stats', event.pubkey, 'notes_count', 1]);
|
||||
if (inReplyToId) {
|
||||
statDiffs.push(['event_stats', inReplyToId, 'replies_count', 1]);
|
||||
}
|
||||
break;
|
||||
case 3:
|
||||
statDiffs.push(...getFollowDiff(event, prev));
|
||||
break;
|
||||
case 5: {
|
||||
if (!firstTaggedId) break;
|
||||
|
||||
const [repostedEvent] = await store.query(
|
||||
[{ kinds: [6], ids: [firstTaggedId], authors: [event.pubkey] }],
|
||||
{ limit: 1 },
|
||||
);
|
||||
// Check if the event being deleted is of kind 6,
|
||||
// if it is then proceed, else just break
|
||||
if (!repostedEvent) break;
|
||||
|
||||
const eventBeingRepostedId = repostedEvent.tags.find(([name]) => name === 'e')?.[1];
|
||||
const eventBeingRepostedPubkey = repostedEvent.tags.find(([name]) => name === 'p')?.[1];
|
||||
if (!eventBeingRepostedId || !eventBeingRepostedPubkey) break;
|
||||
|
||||
const [eventBeingReposted] = await store.query(
|
||||
[{ kinds: [1], ids: [eventBeingRepostedId], authors: [eventBeingRepostedPubkey] }],
|
||||
{ limit: 1 },
|
||||
);
|
||||
if (!eventBeingReposted) break;
|
||||
|
||||
statDiffs.push(['event_stats', eventBeingRepostedId, 'reposts_count', -1]);
|
||||
break;
|
||||
}
|
||||
case 6:
|
||||
if (firstTaggedId) {
|
||||
statDiffs.push(['event_stats', firstTaggedId, 'reposts_count', 1]);
|
||||
}
|
||||
break;
|
||||
case 7:
|
||||
if (firstTaggedId) {
|
||||
statDiffs.push(['event_stats', firstTaggedId, 'reactions_count', 1]);
|
||||
}
|
||||
}
|
||||
|
||||
return statDiffs;
|
||||
}
|
||||
|
||||
/** Create an author stats query from the list of diffs. */
|
||||
function authorStatsQuery(kysely: Kysely<DittoTables>, diffs: AuthorStatDiff[]) {
|
||||
const values: DittoTables['author_stats'][] = diffs.map(([_, pubkey, stat, diff]) => {
|
||||
const row: DittoTables['author_stats'] = {
|
||||
pubkey,
|
||||
followers_count: 0,
|
||||
following_count: 0,
|
||||
notes_count: 0,
|
||||
};
|
||||
row[stat] = diff;
|
||||
return row;
|
||||
});
|
||||
|
||||
return kysely.insertInto('author_stats')
|
||||
.values(values)
|
||||
.onConflict((oc) =>
|
||||
oc
|
||||
.column('pubkey')
|
||||
.doUpdateSet((eb) => ({
|
||||
followers_count: eb('author_stats.followers_count', '+', eb.ref('excluded.followers_count')),
|
||||
following_count: eb('author_stats.following_count', '+', eb.ref('excluded.following_count')),
|
||||
notes_count: eb('author_stats.notes_count', '+', eb.ref('excluded.notes_count')),
|
||||
}))
|
||||
);
|
||||
}
|
||||
|
||||
/** Create an event stats query from the list of diffs. */
|
||||
function eventStatsQuery(kysely: Kysely<DittoTables>, diffs: EventStatDiff[]) {
|
||||
const values: DittoTables['event_stats'][] = diffs.map(([_, event_id, stat, diff]) => {
|
||||
const row: DittoTables['event_stats'] = {
|
||||
event_id,
|
||||
replies_count: 0,
|
||||
reposts_count: 0,
|
||||
reactions_count: 0,
|
||||
};
|
||||
row[stat] = diff;
|
||||
return row;
|
||||
});
|
||||
|
||||
return kysely.insertInto('event_stats')
|
||||
.values(values)
|
||||
.onConflict((oc) =>
|
||||
oc
|
||||
.column('event_id')
|
||||
.doUpdateSet((eb) => ({
|
||||
replies_count: eb('event_stats.replies_count', '+', eb.ref('excluded.replies_count')),
|
||||
reposts_count: eb('event_stats.reposts_count', '+', eb.ref('excluded.reposts_count')),
|
||||
reactions_count: eb('event_stats.reactions_count', '+', eb.ref('excluded.reactions_count')),
|
||||
}))
|
||||
);
|
||||
}
|
||||
|
||||
/** Get the last version of the event, if any. */
|
||||
async function getPrevEvent(event: NostrEvent): Promise<NostrEvent | undefined> {
|
||||
if (NKinds.replaceable(event.kind) || NKinds.parameterizedReplaceable(event.kind)) {
|
||||
const store = await Storages.db();
|
||||
|
||||
const [prev] = await store.query([
|
||||
{ kinds: [event.kind], authors: [event.pubkey], limit: 1 },
|
||||
]);
|
||||
|
||||
return prev;
|
||||
}
|
||||
}
|
||||
|
||||
/** Set the following count to the total number of unique "p" tags in the follow list. */
|
||||
async function updateFollowingCountQuery({ pubkey, tags }: NostrEvent) {
|
||||
const following_count = new Set(
|
||||
tags
|
||||
.filter(([name]) => name === 'p')
|
||||
.map(([_, value]) => value),
|
||||
).size;
|
||||
|
||||
const kysely = await DittoDB.getInstance();
|
||||
return kysely.insertInto('author_stats')
|
||||
.values({
|
||||
pubkey,
|
||||
following_count,
|
||||
followers_count: 0,
|
||||
notes_count: 0,
|
||||
})
|
||||
.onConflict((oc) =>
|
||||
oc
|
||||
.column('pubkey')
|
||||
.doUpdateSet({ following_count })
|
||||
);
|
||||
}
|
||||
|
||||
/** Compare the old and new follow events (if any), and return a diff array. */
|
||||
function getFollowDiff(event: NostrEvent, prev?: NostrEvent): AuthorStatDiff[] {
|
||||
const prevTags = prev?.tags ?? [];
|
||||
|
||||
const prevPubkeys = new Set(
|
||||
prevTags
|
||||
.filter(([name]) => name === 'p')
|
||||
.map(([_, value]) => value),
|
||||
);
|
||||
|
||||
const pubkeys = new Set(
|
||||
event.tags
|
||||
.filter(([name]) => name === 'p')
|
||||
.map(([_, value]) => value),
|
||||
);
|
||||
|
||||
const added = [...pubkeys].filter((pubkey) => !prevPubkeys.has(pubkey));
|
||||
const removed = [...prevPubkeys].filter((pubkey) => !pubkeys.has(pubkey));
|
||||
|
||||
return [
|
||||
...added.map((pubkey): AuthorStatDiff => ['author_stats', pubkey, 'followers_count', 1]),
|
||||
...removed.map((pubkey): AuthorStatDiff => ['author_stats', pubkey, 'followers_count', -1]),
|
||||
];
|
||||
}
|
||||
|
||||
/** Refresh the author's stats in the database. */
|
||||
async function refreshAuthorStats(pubkey: string): Promise<DittoTables['author_stats']> {
|
||||
const store = await Storages.db();
|
||||
const stats = await countAuthorStats(store, pubkey);
|
||||
|
||||
const kysely = await DittoDB.getInstance();
|
||||
await kysely.insertInto('author_stats')
|
||||
.values(stats)
|
||||
.onConflict((oc) => oc.column('pubkey').doUpdateSet(stats))
|
||||
.execute();
|
||||
|
||||
return stats;
|
||||
}
|
||||
|
||||
/** Calculate author stats from the database. */
|
||||
async function countAuthorStats(
|
||||
store: SetRequired<NStore, 'count'>,
|
||||
pubkey: string,
|
||||
): Promise<DittoTables['author_stats']> {
|
||||
const [{ count: followers_count }, { count: notes_count }, [followList]] = await Promise.all([
|
||||
store.count([{ kinds: [3], '#p': [pubkey] }]),
|
||||
store.count([{ kinds: [1], authors: [pubkey] }]),
|
||||
store.query([{ kinds: [3], authors: [pubkey], limit: 1 }]),
|
||||
]);
|
||||
|
||||
return {
|
||||
pubkey,
|
||||
followers_count,
|
||||
following_count: getTagSet(followList?.tags ?? [], 'p').size,
|
||||
notes_count,
|
||||
};
|
||||
}
|
||||
|
||||
const authorStatsSemaphore = new Semaphore(10);
|
||||
const refreshedAuthors = new LRUCache<string, true>({ max: 1000 });
|
||||
|
||||
/** Calls `refreshAuthorStats` only once per author. */
|
||||
function refreshAuthorStatsDebounced(pubkey: string): void {
|
||||
if (refreshedAuthors.get(pubkey)) {
|
||||
return;
|
||||
}
|
||||
|
||||
refreshedAuthors.set(pubkey, true);
|
||||
debug('refreshing author stats:', pubkey);
|
||||
|
||||
authorStatsSemaphore
|
||||
.lock(() => refreshAuthorStats(pubkey).catch(() => {}));
|
||||
}
|
||||
|
||||
export { refreshAuthorStats, refreshAuthorStatsDebounced, updateStats };
|
|
@ -2,10 +2,11 @@ import { NostrEvent, NStore } from '@nostrify/nostrify';
|
|||
import { matchFilter } from 'nostr-tools';
|
||||
|
||||
import { DittoDB } from '@/db/DittoDB.ts';
|
||||
import { type DittoEvent } from '@/interfaces/DittoEvent.ts';
|
||||
import { DittoTables } from '@/db/DittoTables.ts';
|
||||
import { Conf } from '@/config.ts';
|
||||
import { refreshAuthorStatsDebounced } from '@/stats.ts';
|
||||
import { type DittoEvent } from '@/interfaces/DittoEvent.ts';
|
||||
import { Storages } from '@/storages.ts';
|
||||
import { refreshAuthorStatsDebounced } from '@/utils/stats.ts';
|
||||
import { findQuoteTag } from '@/utils/tags.ts';
|
||||
|
||||
interface HydrateOpts {
|
||||
|
@ -77,6 +78,11 @@ function assembleEvents(
|
|||
): DittoEvent[] {
|
||||
const admin = Conf.pubkey;
|
||||
|
||||
const eventStats = stats.events.map((stat) => ({
|
||||
...stat,
|
||||
reactions: JSON.parse(stat.reactions),
|
||||
}));
|
||||
|
||||
for (const event of a) {
|
||||
event.author = b.find((e) => matchFilter({ kinds: [0], authors: [event.pubkey] }, e));
|
||||
event.user = b.find((e) => matchFilter({ kinds: [30361], authors: [admin], '#d': [event.pubkey] }, e));
|
||||
|
@ -120,7 +126,7 @@ function assembleEvents(
|
|||
}
|
||||
|
||||
event.author_stats = stats.authors.find((stats) => stats.pubkey === event.pubkey);
|
||||
event.event_stats = stats.events.find((stats) => stats.event_id === event.id);
|
||||
event.event_stats = eventStats.find((stats) => stats.event_id === event.id);
|
||||
}
|
||||
|
||||
return a;
|
||||
|
@ -270,7 +276,10 @@ async function gatherAuthorStats(events: DittoEvent[]): Promise<DittoTables['aut
|
|||
}));
|
||||
}
|
||||
|
||||
function refreshMissingAuthorStats(events: NostrEvent[], stats: DittoTables['author_stats'][]) {
|
||||
async function refreshMissingAuthorStats(events: NostrEvent[], stats: DittoTables['author_stats'][]) {
|
||||
const store = await Storages.db();
|
||||
const kysely = await DittoDB.getInstance();
|
||||
|
||||
const pubkeys = new Set<string>(
|
||||
events
|
||||
.filter((event) => event.kind === 0)
|
||||
|
@ -282,7 +291,7 @@ function refreshMissingAuthorStats(events: NostrEvent[], stats: DittoTables['aut
|
|||
);
|
||||
|
||||
for (const pubkey of missing) {
|
||||
refreshAuthorStatsDebounced(pubkey);
|
||||
refreshAuthorStatsDebounced({ pubkey, store, kysely });
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -309,8 +318,9 @@ async function gatherEventStats(events: DittoEvent[]): Promise<DittoTables['even
|
|||
return rows.map((row) => ({
|
||||
event_id: row.event_id,
|
||||
reposts_count: Math.max(0, row.reposts_count),
|
||||
reactions_count: Math.max(0, row.reactions_count),
|
||||
replies_count: Math.max(0, row.replies_count),
|
||||
reactions_count: Math.max(0, row.reactions_count),
|
||||
reactions: row.reactions,
|
||||
}));
|
||||
}
|
||||
|
||||
|
|
37
src/test.ts
37
src/test.ts
|
@ -1,6 +1,13 @@
|
|||
import { NostrEvent } from '@nostrify/nostrify';
|
||||
import fs from 'node:fs/promises';
|
||||
import path from 'node:path';
|
||||
|
||||
import { Database as Sqlite } from '@db/sqlite';
|
||||
import { NDatabase, NostrEvent } from '@nostrify/nostrify';
|
||||
import { DenoSqlite3Dialect } from '@soapbox/kysely-deno-sqlite';
|
||||
import { FileMigrationProvider, Kysely, Migrator } from 'kysely';
|
||||
import { finalizeEvent, generateSecretKey } from 'nostr-tools';
|
||||
|
||||
import { DittoTables } from '@/db/DittoTables.ts';
|
||||
import { purifyEvent } from '@/storages/hydrate.ts';
|
||||
|
||||
/** Import an event fixture by name in tests. */
|
||||
|
@ -21,3 +28,31 @@ export function genEvent(t: Partial<NostrEvent> = {}, sk: Uint8Array = generateS
|
|||
|
||||
return purifyEvent(event);
|
||||
}
|
||||
|
||||
/** Get an in-memory SQLite database to use for testing. It's automatically destroyed when it goes out of scope. */
|
||||
export async function getTestDB() {
|
||||
const kysely = new Kysely<DittoTables>({
|
||||
dialect: new DenoSqlite3Dialect({
|
||||
database: new Sqlite(':memory:'),
|
||||
}),
|
||||
});
|
||||
|
||||
const migrator = new Migrator({
|
||||
db: kysely,
|
||||
provider: new FileMigrationProvider({
|
||||
fs,
|
||||
path,
|
||||
migrationFolder: new URL(import.meta.resolve('./db/migrations')).pathname,
|
||||
}),
|
||||
});
|
||||
|
||||
await migrator.migrateToLatest();
|
||||
|
||||
const store = new NDatabase(kysely);
|
||||
|
||||
return {
|
||||
store,
|
||||
kysely,
|
||||
[Symbol.asyncDispose]: () => kysely.destroy(),
|
||||
};
|
||||
}
|
||||
|
|
|
@ -0,0 +1,160 @@
|
|||
import { assertEquals } from '@std/assert';
|
||||
import { generateSecretKey, getPublicKey } from 'nostr-tools';
|
||||
|
||||
import { genEvent, getTestDB } from '@/test.ts';
|
||||
import { countAuthorStats, getAuthorStats, getEventStats, getFollowDiff, updateStats } from '@/utils/stats.ts';
|
||||
|
||||
Deno.test('updateStats with kind 1 increments notes count', async () => {
|
||||
await using db = await getTestDB();
|
||||
|
||||
const sk = generateSecretKey();
|
||||
const pubkey = getPublicKey(sk);
|
||||
|
||||
await updateStats({ ...db, event: genEvent({ kind: 1 }, sk) });
|
||||
|
||||
const stats = await getAuthorStats(db.kysely, pubkey);
|
||||
|
||||
assertEquals(stats!.notes_count, 1);
|
||||
});
|
||||
|
||||
Deno.test('updateStats with kind 5 decrements notes count', async () => {
|
||||
await using db = await getTestDB();
|
||||
|
||||
const sk = generateSecretKey();
|
||||
const pubkey = getPublicKey(sk);
|
||||
|
||||
const create = genEvent({ kind: 1 }, sk);
|
||||
const remove = genEvent({ kind: 5, tags: [['e', create.id]] }, sk);
|
||||
|
||||
await updateStats({ ...db, event: create });
|
||||
assertEquals((await getAuthorStats(db.kysely, pubkey))!.notes_count, 1);
|
||||
await db.store.event(create);
|
||||
|
||||
await updateStats({ ...db, event: remove });
|
||||
assertEquals((await getAuthorStats(db.kysely, pubkey))!.notes_count, 0);
|
||||
await db.store.event(remove);
|
||||
});
|
||||
|
||||
Deno.test('updateStats with kind 3 increments followers count', async () => {
|
||||
await using db = await getTestDB();
|
||||
|
||||
await updateStats({ ...db, event: genEvent({ kind: 3, tags: [['p', 'alex']] }) });
|
||||
await updateStats({ ...db, event: genEvent({ kind: 3, tags: [['p', 'alex']] }) });
|
||||
await updateStats({ ...db, event: genEvent({ kind: 3, tags: [['p', 'alex']] }) });
|
||||
|
||||
const stats = await getAuthorStats(db.kysely, 'alex');
|
||||
|
||||
assertEquals(stats!.followers_count, 3);
|
||||
});
|
||||
|
||||
Deno.test('updateStats with kind 3 decrements followers count', async () => {
|
||||
await using db = await getTestDB();
|
||||
|
||||
const sk = generateSecretKey();
|
||||
const follow = genEvent({ kind: 3, tags: [['p', 'alex']], created_at: 0 }, sk);
|
||||
const remove = genEvent({ kind: 3, tags: [], created_at: 1 }, sk);
|
||||
|
||||
await updateStats({ ...db, event: follow });
|
||||
assertEquals((await getAuthorStats(db.kysely, 'alex'))!.followers_count, 1);
|
||||
await db.store.event(follow);
|
||||
|
||||
await updateStats({ ...db, event: remove });
|
||||
assertEquals((await getAuthorStats(db.kysely, 'alex'))!.followers_count, 0);
|
||||
await db.store.event(remove);
|
||||
});
|
||||
|
||||
Deno.test('getFollowDiff returns added and removed followers', () => {
|
||||
const prev = genEvent({ tags: [['p', 'alex'], ['p', 'bob']] });
|
||||
const next = genEvent({ tags: [['p', 'alex'], ['p', 'carol']] });
|
||||
|
||||
const { added, removed } = getFollowDiff(next.tags, prev.tags);
|
||||
|
||||
assertEquals(added, new Set(['carol']));
|
||||
assertEquals(removed, new Set(['bob']));
|
||||
});
|
||||
|
||||
Deno.test('updateStats with kind 6 increments reposts count', async () => {
|
||||
await using db = await getTestDB();
|
||||
|
||||
const note = genEvent({ kind: 1 });
|
||||
await updateStats({ ...db, event: note });
|
||||
await db.store.event(note);
|
||||
|
||||
const repost = genEvent({ kind: 6, tags: [['e', note.id]] });
|
||||
await updateStats({ ...db, event: repost });
|
||||
await db.store.event(repost);
|
||||
|
||||
const stats = await getEventStats(db.kysely, note.id);
|
||||
|
||||
assertEquals(stats!.reposts_count, 1);
|
||||
});
|
||||
|
||||
Deno.test('updateStats with kind 5 decrements reposts count', async () => {
|
||||
await using db = await getTestDB();
|
||||
|
||||
const note = genEvent({ kind: 1 });
|
||||
await updateStats({ ...db, event: note });
|
||||
await db.store.event(note);
|
||||
|
||||
const sk = generateSecretKey();
|
||||
const repost = genEvent({ kind: 6, tags: [['e', note.id]] }, sk);
|
||||
await updateStats({ ...db, event: repost });
|
||||
await db.store.event(repost);
|
||||
|
||||
await updateStats({ ...db, event: genEvent({ kind: 5, tags: [['e', repost.id]] }, sk) });
|
||||
|
||||
const stats = await getEventStats(db.kysely, note.id);
|
||||
|
||||
assertEquals(stats!.reposts_count, 0);
|
||||
});
|
||||
|
||||
Deno.test('updateStats with kind 7 increments reactions count', async () => {
|
||||
await using db = await getTestDB();
|
||||
|
||||
const note = genEvent({ kind: 1 });
|
||||
await updateStats({ ...db, event: note });
|
||||
await db.store.event(note);
|
||||
|
||||
await updateStats({ ...db, event: genEvent({ kind: 7, content: '+', tags: [['e', note.id]] }) });
|
||||
await updateStats({ ...db, event: genEvent({ kind: 7, content: '😂', tags: [['e', note.id]] }) });
|
||||
|
||||
const stats = await getEventStats(db.kysely, note.id);
|
||||
|
||||
assertEquals(stats!.reactions, JSON.stringify({ '+': 1, '😂': 1 }));
|
||||
assertEquals(stats!.reactions_count, 2);
|
||||
});
|
||||
|
||||
Deno.test('updateStats with kind 5 decrements reactions count', async () => {
|
||||
await using db = await getTestDB();
|
||||
|
||||
const note = genEvent({ kind: 1 });
|
||||
await updateStats({ ...db, event: note });
|
||||
await db.store.event(note);
|
||||
|
||||
const sk = generateSecretKey();
|
||||
const reaction = genEvent({ kind: 7, content: '+', tags: [['e', note.id]] }, sk);
|
||||
await updateStats({ ...db, event: reaction });
|
||||
await db.store.event(reaction);
|
||||
|
||||
await updateStats({ ...db, event: genEvent({ kind: 5, tags: [['e', reaction.id]] }, sk) });
|
||||
|
||||
const stats = await getEventStats(db.kysely, note.id);
|
||||
|
||||
assertEquals(stats!.reactions, JSON.stringify({}));
|
||||
});
|
||||
|
||||
Deno.test('countAuthorStats counts author stats from the database', async () => {
|
||||
await using db = await getTestDB();
|
||||
|
||||
const sk = generateSecretKey();
|
||||
const pubkey = getPublicKey(sk);
|
||||
|
||||
await db.store.event(genEvent({ kind: 1, content: 'hello' }, sk));
|
||||
await db.store.event(genEvent({ kind: 1, content: 'yolo' }, sk));
|
||||
await db.store.event(genEvent({ kind: 3, tags: [['p', pubkey]] }));
|
||||
|
||||
const stats = await countAuthorStats(db.store, pubkey);
|
||||
|
||||
assertEquals(stats!.notes_count, 2);
|
||||
assertEquals(stats!.followers_count, 1);
|
||||
});
|
|
@ -0,0 +1,266 @@
|
|||
import { Semaphore } from '@lambdalisue/async';
|
||||
import { NostrEvent, NStore } from '@nostrify/nostrify';
|
||||
import { Kysely, UpdateObject } from 'kysely';
|
||||
import { LRUCache } from 'lru-cache';
|
||||
import { SetRequired } from 'type-fest';
|
||||
|
||||
import { DittoTables } from '@/db/DittoTables.ts';
|
||||
import { getTagSet } from '@/utils/tags.ts';
|
||||
|
||||
interface UpdateStatsOpts {
|
||||
kysely: Kysely<DittoTables>;
|
||||
store: NStore;
|
||||
event: NostrEvent;
|
||||
x?: 1 | -1;
|
||||
}
|
||||
|
||||
/** Handle one event at a time and update relevant stats for it. */
|
||||
// deno-lint-ignore require-await
|
||||
export async function updateStats({ event, kysely, store, x = 1 }: UpdateStatsOpts): Promise<void> {
|
||||
switch (event.kind) {
|
||||
case 1:
|
||||
return handleEvent1(kysely, event, x);
|
||||
case 3:
|
||||
return handleEvent3(kysely, event, x, store);
|
||||
case 5:
|
||||
return handleEvent5(kysely, event, -1, store);
|
||||
case 6:
|
||||
return handleEvent6(kysely, event, x);
|
||||
case 7:
|
||||
return handleEvent7(kysely, event, x);
|
||||
}
|
||||
}
|
||||
|
||||
/** Update stats for kind 1 event. */
|
||||
async function handleEvent1(kysely: Kysely<DittoTables>, event: NostrEvent, x: number): Promise<void> {
|
||||
await updateAuthorStats(kysely, event.pubkey, ({ notes_count }) => ({ notes_count: Math.max(0, notes_count + x) }));
|
||||
}
|
||||
|
||||
/** Update stats for kind 3 event. */
|
||||
async function handleEvent3(kysely: Kysely<DittoTables>, event: NostrEvent, x: number, store: NStore): Promise<void> {
|
||||
const following = getTagSet(event.tags, 'p');
|
||||
|
||||
await updateAuthorStats(kysely, event.pubkey, () => ({ following_count: following.size }));
|
||||
|
||||
const [prev] = await store.query([
|
||||
{ kinds: [3], authors: [event.pubkey], limit: 1 },
|
||||
]);
|
||||
|
||||
const { added, removed } = getFollowDiff(event.tags, prev?.tags);
|
||||
|
||||
for (const pubkey of added) {
|
||||
await updateAuthorStats(
|
||||
kysely,
|
||||
pubkey,
|
||||
({ followers_count }) => ({ followers_count: Math.max(0, followers_count + x) }),
|
||||
);
|
||||
}
|
||||
|
||||
for (const pubkey of removed) {
|
||||
await updateAuthorStats(
|
||||
kysely,
|
||||
pubkey,
|
||||
({ followers_count }) => ({ followers_count: Math.max(0, followers_count - x) }),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/** Update stats for kind 5 event. */
|
||||
async function handleEvent5(kysely: Kysely<DittoTables>, event: NostrEvent, x: -1, store: NStore): Promise<void> {
|
||||
const id = event.tags.find(([name]) => name === 'e')?.[1];
|
||||
if (id) {
|
||||
const [target] = await store.query([{ ids: [id], authors: [event.pubkey], limit: 1 }]);
|
||||
if (target) {
|
||||
await updateStats({ event: target, kysely, store, x });
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** Update stats for kind 6 event. */
|
||||
async function handleEvent6(kysely: Kysely<DittoTables>, event: NostrEvent, x: number): Promise<void> {
|
||||
const id = event.tags.find(([name]) => name === 'e')?.[1];
|
||||
if (id) {
|
||||
await updateEventStats(kysely, id, ({ reposts_count }) => ({ reposts_count: Math.max(0, reposts_count + x) }));
|
||||
}
|
||||
}
|
||||
|
||||
/** Update stats for kind 7 event. */
|
||||
async function handleEvent7(kysely: Kysely<DittoTables>, event: NostrEvent, x: number): Promise<void> {
|
||||
const id = event.tags.find(([name]) => name === 'e')?.[1];
|
||||
const emoji = event.content;
|
||||
|
||||
if (id && emoji && (['+', '-'].includes(emoji) || /^\p{RGI_Emoji}$/v.test(emoji))) {
|
||||
await updateEventStats(kysely, id, ({ reactions }) => {
|
||||
const data: Record<string, number> = JSON.parse(reactions);
|
||||
|
||||
// Increment or decrement the emoji count.
|
||||
data[emoji] = (data[emoji] ?? 0) + x;
|
||||
|
||||
// Remove reactions with a count of 0 or less.
|
||||
for (const key of Object.keys(data)) {
|
||||
if (data[key] < 1) {
|
||||
delete data[key];
|
||||
}
|
||||
}
|
||||
|
||||
// Total reactions count.
|
||||
const count = Object.values(data).reduce((result, value) => result + value, 0);
|
||||
|
||||
return {
|
||||
reactions: JSON.stringify(data),
|
||||
reactions_count: count,
|
||||
};
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/** Get the pubkeys that were added and removed from a follow event. */
|
||||
export function getFollowDiff(
|
||||
tags: string[][],
|
||||
prevTags: string[][] = [],
|
||||
): { added: Set<string>; removed: Set<string> } {
|
||||
const pubkeys = getTagSet(tags, 'p');
|
||||
const prevPubkeys = getTagSet(prevTags, 'p');
|
||||
|
||||
return {
|
||||
added: pubkeys.difference(prevPubkeys),
|
||||
removed: prevPubkeys.difference(pubkeys),
|
||||
};
|
||||
}
|
||||
|
||||
/** Retrieve the author stats by the pubkey. */
|
||||
export function getAuthorStats(
|
||||
kysely: Kysely<DittoTables>,
|
||||
pubkey: string,
|
||||
): Promise<DittoTables['author_stats'] | undefined> {
|
||||
return kysely
|
||||
.selectFrom('author_stats')
|
||||
.selectAll()
|
||||
.where('pubkey', '=', pubkey)
|
||||
.executeTakeFirst();
|
||||
}
|
||||
|
||||
/** Retrieve the author stats by the pubkey, then call the callback to update it. */
|
||||
export async function updateAuthorStats(
|
||||
kysely: Kysely<DittoTables>,
|
||||
pubkey: string,
|
||||
fn: (prev: DittoTables['author_stats']) => UpdateObject<DittoTables, 'author_stats'>,
|
||||
): Promise<void> {
|
||||
const empty: DittoTables['author_stats'] = {
|
||||
pubkey,
|
||||
followers_count: 0,
|
||||
following_count: 0,
|
||||
notes_count: 0,
|
||||
};
|
||||
|
||||
const prev = await getAuthorStats(kysely, pubkey);
|
||||
|
||||
const stats = fn(prev ?? empty);
|
||||
|
||||
if (prev) {
|
||||
await kysely.updateTable('author_stats')
|
||||
.set(stats)
|
||||
.where('pubkey', '=', pubkey)
|
||||
.execute();
|
||||
} else {
|
||||
await kysely.insertInto('author_stats')
|
||||
.values({ ...empty, ...stats })
|
||||
.execute();
|
||||
}
|
||||
}
|
||||
|
||||
/** Retrieve the event stats by the event ID. */
|
||||
export function getEventStats(
|
||||
kysely: Kysely<DittoTables>,
|
||||
eventId: string,
|
||||
): Promise<DittoTables['event_stats'] | undefined> {
|
||||
return kysely
|
||||
.selectFrom('event_stats')
|
||||
.selectAll()
|
||||
.where('event_id', '=', eventId)
|
||||
.executeTakeFirst();
|
||||
}
|
||||
|
||||
/** Retrieve the event stats by the event ID, then call the callback to update it. */
|
||||
export async function updateEventStats(
|
||||
kysely: Kysely<DittoTables>,
|
||||
eventId: string,
|
||||
fn: (prev: DittoTables['event_stats']) => UpdateObject<DittoTables, 'event_stats'>,
|
||||
): Promise<void> {
|
||||
const empty: DittoTables['event_stats'] = {
|
||||
event_id: eventId,
|
||||
replies_count: 0,
|
||||
reposts_count: 0,
|
||||
reactions_count: 0,
|
||||
reactions: '{}',
|
||||
};
|
||||
|
||||
const prev = await getEventStats(kysely, eventId);
|
||||
|
||||
const stats = fn(prev ?? empty);
|
||||
|
||||
if (prev) {
|
||||
await kysely.updateTable('event_stats')
|
||||
.set(stats)
|
||||
.where('event_id', '=', eventId)
|
||||
.execute();
|
||||
} else {
|
||||
await kysely.insertInto('event_stats')
|
||||
.values({ ...empty, ...stats })
|
||||
.execute();
|
||||
}
|
||||
}
|
||||
|
||||
/** Calculate author stats from the database. */
|
||||
export async function countAuthorStats(
|
||||
store: SetRequired<NStore, 'count'>,
|
||||
pubkey: string,
|
||||
): Promise<DittoTables['author_stats']> {
|
||||
const [{ count: followers_count }, { count: notes_count }, [followList]] = await Promise.all([
|
||||
store.count([{ kinds: [3], '#p': [pubkey] }]),
|
||||
store.count([{ kinds: [1], authors: [pubkey] }]),
|
||||
store.query([{ kinds: [3], authors: [pubkey], limit: 1 }]),
|
||||
]);
|
||||
|
||||
return {
|
||||
pubkey,
|
||||
followers_count,
|
||||
following_count: getTagSet(followList?.tags ?? [], 'p').size,
|
||||
notes_count,
|
||||
};
|
||||
}
|
||||
|
||||
export interface RefreshAuthorStatsOpts {
|
||||
pubkey: string;
|
||||
kysely: Kysely<DittoTables>;
|
||||
store: SetRequired<NStore, 'count'>;
|
||||
}
|
||||
|
||||
/** Refresh the author's stats in the database. */
|
||||
export async function refreshAuthorStats(
|
||||
{ pubkey, kysely, store }: RefreshAuthorStatsOpts,
|
||||
): Promise<DittoTables['author_stats']> {
|
||||
const stats = await countAuthorStats(store, pubkey);
|
||||
|
||||
await kysely.insertInto('author_stats')
|
||||
.values(stats)
|
||||
.onConflict((oc) => oc.column('pubkey').doUpdateSet(stats))
|
||||
.execute();
|
||||
|
||||
return stats;
|
||||
}
|
||||
|
||||
const authorStatsSemaphore = new Semaphore(10);
|
||||
const refreshedAuthors = new LRUCache<string, true>({ max: 1000 });
|
||||
|
||||
/** Calls `refreshAuthorStats` only once per author. */
|
||||
export function refreshAuthorStatsDebounced(opts: RefreshAuthorStatsOpts): void {
|
||||
if (refreshedAuthors.get(opts.pubkey)) {
|
||||
return;
|
||||
}
|
||||
|
||||
refreshedAuthors.set(opts.pubkey, true);
|
||||
|
||||
authorStatsSemaphore
|
||||
.lock(() => refreshAuthorStats(opts).catch(() => {}));
|
||||
}
|
|
@ -82,6 +82,15 @@ async function renderStatus(event: DittoEvent, opts: RenderStatusOpts): Promise<
|
|||
|
||||
const media = imeta.length ? imeta : getMediaLinks(links);
|
||||
|
||||
/** Pleroma emoji reactions object. */
|
||||
const reactions = Object.entries(event.event_stats?.reactions ?? {}).reduce((acc, [emoji, count]) => {
|
||||
if (['+', '-'].includes(emoji)) return acc;
|
||||
acc.push({ name: emoji, count, me: reactionEvent?.content === emoji });
|
||||
return acc;
|
||||
}, [] as { name: string; count: number; me: boolean }[]);
|
||||
|
||||
const expiresAt = new Date(Number(event.tags.find(([name]) => name === 'expiration')?.[1]) * 1000);
|
||||
|
||||
return {
|
||||
id: event.id,
|
||||
account,
|
||||
|
@ -96,7 +105,7 @@ async function renderStatus(event: DittoEvent, opts: RenderStatusOpts): Promise<
|
|||
language: event.tags.find((tag) => tag[0] === 'lang')?.[1] || null,
|
||||
replies_count: event.event_stats?.replies_count ?? 0,
|
||||
reblogs_count: event.event_stats?.reposts_count ?? 0,
|
||||
favourites_count: event.event_stats?.reactions_count ?? 0,
|
||||
favourites_count: event.event_stats?.reactions['+'] ?? 0,
|
||||
favourited: reactionEvent?.content === '+',
|
||||
reblogged: Boolean(repostEvent),
|
||||
muted: false,
|
||||
|
@ -114,6 +123,10 @@ async function renderStatus(event: DittoEvent, opts: RenderStatusOpts): Promise<
|
|||
uri: Conf.external(note),
|
||||
url: Conf.external(note),
|
||||
zapped: Boolean(zapEvent),
|
||||
pleroma: {
|
||||
emoji_reactions: reactions,
|
||||
expires_at: !isNaN(expiresAt.getTime()) ? expiresAt.toISOString() : undefined,
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue