diff --git a/.gitignore b/.gitignore
index 2eea525..17f06fa 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1 +1,2 @@
-.env
\ No newline at end of file
+.env
+*.cpuprofile
\ No newline at end of file
diff --git a/.vscode/launch.json b/.vscode/launch.json
new file mode 100644
index 0000000..71abdef
--- /dev/null
+++ b/.vscode/launch.json
@@ -0,0 +1,23 @@
+{
+  // Use IntelliSense to learn about possible attributes.
+  // Hover to view descriptions of existing attributes.
+  // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
+  "version": "0.2.0",
+  "configurations": [
+    {
+      "request": "launch",
+      "name": "Launch Program",
+      "type": "node",
+      "program": "${workspaceFolder}/src/server.ts",
+      "cwd": "${workspaceFolder}",
+      "runtimeExecutable": "deno",
+      "runtimeArgs": [
+        "run",
+        "--inspect-wait",
+        "--allow-all",
+        "--unstable"
+      ],
+      "attachSimplePort": 9229
+    }
+  ]
+}
\ No newline at end of file
diff --git a/deno.json b/deno.json
index 8d07861..3716529 100644
--- a/deno.json
+++ b/deno.json
@@ -3,8 +3,7 @@
   "lock": false,
   "tasks": {
     "start": "deno run -A --unstable src/server.ts",
-    "dev": "deno run -A --unstable --watch src/server.ts",
-    "debug": "deno run -A --unstable --inspect src/server.ts",
+    "dev": "deno run -A --unstable --watch --inspect src/server.ts",
     "test": "DB_PATH=\":memory:\" deno test -A --unstable",
     "check": "deno check src/server.ts",
     "relays:sync": "deno run -A --unstable scripts/relays.ts sync"
diff --git a/src/app.ts b/src/app.ts
index 5ac2636..b8a9695 100644
--- a/src/app.ts
+++ b/src/app.ts
@@ -4,6 +4,7 @@ import { type User } from '@/db/users.ts';
 import {
   type Context,
   cors,
+  Debug,
   type Event,
   type Handler,
   Hono,
@@ -90,7 +91,14 @@ if (Conf.sentryDsn) {
   app.use('*', sentryMiddleware({ dsn: Conf.sentryDsn }));
 }
 
-app.use('*', logger());
+const debug = Debug('ditto:http');
+
+app.use('/api', logger(debug));
+app.use('/relay', logger(debug));
+app.use('/.well-known', logger(debug));
+app.use('/users', logger(debug));
+app.use('/nodeinfo', logger(debug));
+app.use('/oauth', logger(debug));
 
 app.get('/api/v1/streaming', streamingController);
 app.get('/api/v1/streaming/', streamingController);
diff --git a/src/client.ts b/src/client.ts
index 3cf2e8a..85e430a 100644
--- a/src/client.ts
+++ b/src/client.ts
@@ -1,13 +1,16 @@
-import { type Event, type Filter, matchFilters } from '@/deps.ts';
+import { Debug, type Event, type Filter, matchFilters } from '@/deps.ts';
 import * as pipeline from '@/pipeline.ts';
 import { activeRelays, pool } from '@/pool.ts';
 
 import type { GetFiltersOpts } from '@/filter.ts';
 
+const debug = Debug('ditto:client');
+
 /** Get events from a NIP-01 filter. */
 function getFilters(filters: Filter[], opts: GetFiltersOpts = {}): Promise<Event[]> {
   if (opts.signal?.aborted) return Promise.resolve([]);
   if (!filters.length) return Promise.resolve([]);
+  debug('REQ', JSON.stringify(filters));
 
   return new Promise((resolve) => {
     const results: Event[] = [];
diff --git a/src/common.ts b/src/common.ts
deleted file mode 100644
index 0424b52..0000000
--- a/src/common.ts
+++ /dev/null
@@ -1,9 +0,0 @@
-import { Reqmeister } from '@/reqmeister.ts';
-import { Time } from '@/utils/time.ts';
-
-const reqmeister = new Reqmeister({
-  delay: Time.seconds(1),
-  signal: AbortSignal.timeout(Time.seconds(1)),
-});
-
-export { reqmeister };
diff --git a/src/controllers/api/streaming.ts b/src/controllers/api/streaming.ts
index 8cdd7d4..0751260 100644
--- a/src/controllers/api/streaming.ts
+++ b/src/controllers/api/streaming.ts
@@ -1,11 +1,13 @@
 import { type AppController } from '@/app.ts';
-import { z } from '@/deps.ts';
+import { Debug, z } from '@/deps.ts';
 import { type DittoFilter } from '@/filter.ts';
 import { getAuthor, getFeedPubkeys } from '@/queries.ts';
 import { Sub } from '@/subs.ts';
 import { bech32ToPubkey } from '@/utils.ts';
 import { renderStatus } from '@/views/mastodon/statuses.ts';
 
+const debug = Debug('ditto:streaming');
+
 /**
  * Streaming timelines/categories.
  * https://docs.joinmastodon.org/methods/streaming/#streams
@@ -49,6 +51,7 @@ const streamingController: AppController = (c) => {
 
   function send(name: string, payload: object) {
     if (socket.readyState === WebSocket.OPEN) {
+      debug('send', name, JSON.stringify(payload));
       socket.send(JSON.stringify({
         event: name,
         payload: JSON.stringify(payload),
diff --git a/src/cron.ts b/src/cron.ts
index 6015271..bfaf773 100644
--- a/src/cron.ts
+++ b/src/cron.ts
@@ -6,7 +6,7 @@ import { cidFromUrl } from '@/utils/ipfs.ts';
 
 /** Delete files that aren't attached to any events. */
 async function cleanupMedia() {
-  console.log('Deleting orphaned media files...');
+  console.info('Deleting orphaned media files...');
 
   const until = new Date(Date.now() - Time.minutes(15));
   const media = await getUnattachedMedia(until);
@@ -22,7 +22,7 @@ async function cleanupMedia() {
     }
   }
 
-  console.log(`Removed ${media?.length ?? 0} orphaned media files.`);
+  console.info(`Removed ${media?.length ?? 0} orphaned media files.`);
 }
 
 await cleanupMedia();
diff --git a/src/db.ts b/src/db.ts
index 4f1a8d9..25d6d78 100644
--- a/src/db.ts
+++ b/src/db.ts
@@ -3,7 +3,7 @@ import path from 'node:path';
 import { FileMigrationProvider, Kysely, Migrator, PolySqliteDialect } from '@/deps.ts';
 
 import { Conf } from '@/config.ts';
-import { getPragma, setPragma } from '@/pragma.ts';
+import { setPragma } from '@/pragma.ts';
 import SqliteWorker from '@/workers/sqlite.ts';
 
 interface DittoDB {
@@ -89,12 +89,6 @@ await Promise.all([
   setPragma(db, 'mmap_size', Conf.sqlite.mmapSize),
 ]);
 
-// Log out PRAGMA values for debugging.
-['journal_mode', 'synchronous', 'temp_store', 'mmap_size'].forEach(async (pragma) => {
-  const value = await getPragma(db, pragma);
-  console.log(`PRAGMA ${pragma} = ${value};`);
-});
-
 const migrator = new Migrator({
   db,
   provider: new FileMigrationProvider({
@@ -106,7 +100,7 @@ const migrator = new Migrator({
 
 /** Migrate the database to the latest version. */
 async function migrate() {
-  console.log('Running migrations...');
+  console.info('Running migrations...');
   const results = await migrator.migrateToLatest();
 
   if (results.error) {
@@ -114,11 +108,11 @@ async function migrate() {
     Deno.exit(1);
   } else {
     if (!results.results?.length) {
-      console.log('Everything up-to-date.');
+      console.info('Everything up-to-date.');
     } else {
-      console.log('Migrations finished!');
+      console.info('Migrations finished!');
       for (const { migrationName, status } of results.results!) {
-        console.log(`  - ${migrationName}: ${status}`);
+        console.info(`  - ${migrationName}: ${status}`);
       }
     }
   }
diff --git a/src/db/events.ts b/src/db/events.ts
index 9de8828..83551fb 100644
--- a/src/db/events.ts
+++ b/src/db/events.ts
@@ -1,5 +1,5 @@
 import { db, type DittoDB } from '@/db.ts';
-import { type Event, type SelectQueryBuilder } from '@/deps.ts';
+import { Debug, type Event, type SelectQueryBuilder } from '@/deps.ts';
 import { isParameterizedReplaceableKind } from '@/kinds.ts';
 import { jsonMetaContentSchema } from '@/schemas/nostr.ts';
 import { EventData } from '@/types.ts';
@@ -7,6 +7,8 @@ import { isNostrId, isURL } from '@/utils.ts';
 
 import type { DittoFilter, GetFiltersOpts } from '@/filter.ts';
 
+const debug = Debug('ditto:db:events');
+
 /** Function to decide whether or not to index a tag. */
 type TagCondition = ({ event, count, value }: { event: Event; count: number; value: string; }) => boolean;
 
@@ -28,6 +30,8 @@ const tagConditions: Record<string, TagCondition> = {
 
 /** Insert an event (and its tags) into the database. */
 function insertEvent(event: Event, data: EventData): Promise<void> {
+  debug('insertEvent', JSON.stringify(event));
+
   return db.transaction().execute(async (trx) => {
     /** Insert the event into the database. */
     async function addEvent() {
@@ -224,6 +228,7 @@ async function getFilters(
   opts: GetFiltersOpts = {},
 ): Promise<Event[]> {
   if (!filters.length) return Promise.resolve([]);
+  debug('getFilters', JSON.stringify(filters));
 
   let query = getFiltersQuery(filters);
   if (typeof opts.limit === 'number') {
@@ -276,6 +281,7 @@ async function getFilters(
 
 /** Delete events based on filters from the database. */
 function deleteFilters(filters: DittoFilter[]) {
   if (!filters.length) return Promise.resolve([]);
+  debug('deleteFilters', JSON.stringify(filters));
 
   return db.transaction().execute(async (trx) => {
     const query = getFiltersQuery(filters).clearSelect().select('id');
@@ -293,6 +299,7 @@ function deleteFilters(filters: DittoFilter[]) {
 
 /** Get number of events that would be returned by filters. */
 async function countFilters(filters: DittoFilter[]): Promise<number> {
   if (!filters.length) return Promise.resolve(0);
+  debug('countFilters', JSON.stringify(filters));
 
   const query = getFiltersQuery(filters);
   const [{ count }] = await query
diff --git a/src/db/users.ts b/src/db/users.ts
index 0e8d9a0..7d56e1c 100644
--- a/src/db/users.ts
+++ b/src/db/users.ts
@@ -1,7 +1,9 @@
-import { type Insertable } from '@/deps.ts';
+import { Debug, type Insertable } from '@/deps.ts';
 
 import { db, type UserRow } from '../db.ts';
 
+const debug = Debug('ditto:users');
+
 interface User {
   pubkey: string;
   username: string;
@@ -11,6 +13,7 @@ interface User {
 
 /** Adds a user to the database. */
 function insertUser(user: Insertable<UserRow>) {
+  debug('insertUser', JSON.stringify(user));
   return db.insertInto('users').values(user).execute();
 }
diff --git a/src/deps.ts b/src/deps.ts
index 4a9314b..0f76433 100644
--- a/src/deps.ts
+++ b/src/deps.ts
@@ -83,5 +83,7 @@ export { sentry as sentryMiddleware } from 'npm:@hono/sentry@^1.0.0';
 export * as Comlink from 'npm:comlink@^4.4.1';
 export { EventEmitter } from 'npm:tseep@^1.1.3';
 export { default as stringifyStable } from 'npm:fast-stable-stringify@^1.0.0';
+// @deno-types="npm:@types/debug@^4.1.12"
+export { default as Debug } from 'npm:debug@^4.3.4';
 
 export type * as TypeFest from 'npm:type-fest@^4.3.0';
diff --git a/src/filter.ts b/src/filter.ts
index 38fcff7..7178e2d 100644
--- a/src/filter.ts
+++ b/src/filter.ts
@@ -52,7 +52,7 @@ function matchDittoFilters(filters: DittoFilter[], event: Event, data: EventData
 /** Get deterministic ID for a microfilter. */
 function getFilterId(filter: MicroFilter): string {
   if ('ids' in filter) {
-    return stringifyStable({ ids: [filter.ids] });
+    return stringifyStable({ ids: [filter.ids[0]] });
   } else {
     return stringifyStable({
       kinds: [filter.kinds[0]],
diff --git a/src/firehose.ts b/src/firehose.ts
index 5e24de7..b95d33f 100644
--- a/src/firehose.ts
+++ b/src/firehose.ts
@@ -1,9 +1,11 @@
-import { type Event } from '@/deps.ts';
+import { Debug, type Event } from '@/deps.ts';
 import { activeRelays, pool } from '@/pool.ts';
 import { nostrNow } from '@/utils.ts';
 
 import * as pipeline from './pipeline.ts';
 
+const debug = Debug('ditto:firehose');
+
 // This file watches events on all known relays and performs
 // side-effects based on them, such as trending hashtag tracking
 // and storing events for notifications and the home feed.
@@ -17,6 +19,8 @@ pool.subscribe(
 
 /** Handle events through the firehose pipeline. */
 function handleEvent(event: Event): Promise<void> {
+  debug(`Event<${event.kind}> ${event.id}`);
+
   return pipeline
     .handleEvent(event)
     .catch(() => {});
diff --git a/src/middleware/cache.ts b/src/middleware/cache.ts
index 932632b..87de611 100644
--- a/src/middleware/cache.ts
+++ b/src/middleware/cache.ts
@@ -1,6 +1,7 @@
+import { Debug, type MiddlewareHandler } from '@/deps.ts';
 import ExpiringCache from '@/utils/expiring-cache.ts';
 
-import type { MiddlewareHandler } from '@/deps.ts';
+const debug = Debug('ditto:middleware:cache');
 
 export const cache = (options: {
   cacheName: string;
@@ -11,14 +12,14 @@ export const cache = (options: {
     const cache = new ExpiringCache(await caches.open(options.cacheName));
     const response = await cache.match(key);
     if (!response) {
-      console.debug('Building cache for page', c.req.url);
+      debug('Building cache for page', c.req.url);
       await next();
       const response = c.res.clone();
       if (response.status < 500) {
        await cache.putExpiring(key, response, options.expires ?? 0);
       }
     } else {
-      console.debug('Serving page from cache', c.req.url);
+      debug('Serving page from cache', c.req.url);
       return response;
     }
   };
diff --git a/src/note.ts b/src/note.ts
index 93689e9..a19a793 100644
--- a/src/note.ts
+++ b/src/note.ts
@@ -43,7 +43,7 @@ interface ParsedNoteContent {
 function parseNoteContent(content: string): ParsedNoteContent {
   // Parsing twice is ineffecient, but I don't know how to do only once.
   const html = linkifyStr(content, linkifyOpts);
-  const links = linkify.find(content).filter(isValidLink);
+  const links = linkify.find(content).filter(isLinkURL);
   const firstUrl = links.find(isNonMediaLink)?.href;
 
   return {
@@ -77,15 +77,9 @@ function isNonMediaLink({ href }: Link): boolean {
   return /^https?:\/\//.test(href) && !getUrlMimeType(href);
 }
 
-/** Ensures the URL can be parsed. Why linkifyjs doesn't already guarantee this, idk... */
-function isValidLink(link: Link): boolean {
-  try {
-    new URL(link.href);
-    return true;
-  } catch (_e) {
-    console.error(`Invalid link: ${link.href}`);
-    return false;
-  }
+/** Ensures the Link is a URL so it can be parsed. */
+function isLinkURL(link: Link): boolean {
+  return link.type === 'url';
 }
 
 /** `npm:mime` treats `.com` as a file extension, so parse the full URL to get its path first. */
diff --git a/src/pipeline.ts b/src/pipeline.ts
index 923bf4e..7b829cb 100644
--- a/src/pipeline.ts
+++ b/src/pipeline.ts
@@ -1,14 +1,14 @@
-import { reqmeister } from '@/common.ts';
 import { Conf } from '@/config.ts';
 import * as eventsDB from '@/db/events.ts';
 import { addRelays } from '@/db/relays.ts';
 import { deleteAttachedMedia } from '@/db/unattached-media.ts';
 import { findUser } from '@/db/users.ts';
-import { type Event, LRUCache } from '@/deps.ts';
+import { Debug, type Event, LRUCache } from '@/deps.ts';
 import { isEphemeralKind } from '@/kinds.ts';
 import * as mixer from '@/mixer.ts';
 import { publish } from '@/pool.ts';
 import { isLocallyFollowed } from '@/queries.ts';
+import { reqmeister } from '@/reqmeister.ts';
 import { updateStats } from '@/stats.ts';
 import { Sub } from '@/subs.ts';
 import { getTagSet } from '@/tags.ts';
@@ -18,6 +18,8 @@ import { verifySignatureWorker } from '@/workers/verify.ts';
 
 import type { EventData } from '@/types.ts';
 
+const debug = Debug('ditto:pipeline');
+
 /**
  * Common pipeline function to process (and maybe store) events.
  * It is idempotent, so it can be called multiple times for the same event.
@@ -26,7 +28,7 @@ async function handleEvent(event: Event): Promise<void> {
   if (!(await verifySignatureWorker(event))) return;
   const wanted = reqmeister.isWanted(event);
   if (encounterEvent(event)) return;
-  console.info(`pipeline: Event<${event.kind}> ${event.id}`);
+  debug(`Event<${event.kind}> ${event.id}`);
   const data = await getEventData(event);
 
   await Promise.all([
@@ -80,8 +82,8 @@ async function storeEvent(event: Event, data: EventData, opts: StoreEventOpts =
       return Promise.reject(new RelayError('blocked', 'event was deleted'));
     } else {
       await Promise.all([
-        eventsDB.insertEvent(event, data).catch(console.warn),
-        updateStats(event).catch(console.warn),
+        eventsDB.insertEvent(event, data).catch(debug),
+        updateStats(event).catch(debug),
       ]);
     }
   } else {
@@ -115,7 +117,7 @@ async function trackHashtags(event: Event): Promise<void> {
   if (!tags.length) return;
 
   try {
-    console.info('tracking tags:', tags);
+    debug('tracking tags:', tags);
     await TrendsWorker.addTagUsages(event.pubkey, tags, date);
   } catch (_e) {
     // do nothing
diff --git a/src/pool.ts b/src/pool.ts
index 07ac6b6..8cab8d1 100644
--- a/src/pool.ts
+++ b/src/pool.ts
@@ -1,5 +1,7 @@
 import { getActiveRelays } from '@/db/relays.ts';
-import { type Event, RelayPool } from '@/deps.ts';
+import { Debug, type Event, RelayPool } from '@/deps.ts';
+
+const debug = Debug('ditto:pool');
 
 const activeRelays = await getActiveRelays();
 
@@ -14,6 +16,7 @@ const pool = new RelayPool(activeRelays, {
 
 /** Publish an event to the given relays, or the entire pool. */
 function publish(event: Event, relays: string[] = activeRelays) {
+  debug('publish', event);
   return pool.publish(event, relays);
 }
diff --git a/src/queries.ts b/src/queries.ts
index 1ecff7b..3af2253 100644
--- a/src/queries.ts
+++ b/src/queries.ts
@@ -2,7 +2,7 @@ import * as eventsDB from '@/db/events.ts';
 import { type Event, findReplyTag } from '@/deps.ts';
 import { type DittoFilter, type Relation } from '@/filter.ts';
 import * as mixer from '@/mixer.ts';
-import { reqmeister } from '@/common.ts';
+import { reqmeister } from '@/reqmeister.ts';
 
 interface GetEventOpts {
   /** Signal to abort the request. */
diff --git a/src/reqmeister.ts b/src/reqmeister.ts
index 7830b84..d8fef75 100644
--- a/src/reqmeister.ts
+++ b/src/reqmeister.ts
@@ -1,11 +1,13 @@
 import * as client from '@/client.ts';
-import { type Event, EventEmitter, type Filter } from '@/deps.ts';
-
+import { Debug, type Event, EventEmitter, type Filter } from '@/deps.ts';
 import { eventToMicroFilter, getFilterId, type MicroFilter } from '@/filter.ts';
+import { Time } from '@/utils/time.ts';
+
+const debug = Debug('ditto:reqmeister');
 
 interface ReqmeisterOpts {
   delay?: number;
-  signal?: AbortSignal;
+  timeout?: number;
 }
 
 type ReqmeisterQueueItem = [string, MicroFilter, WebSocket['url'][]];
@@ -20,11 +22,11 @@ class Reqmeister extends EventEmitter<{ [filterId: string]: (event: Event) => an
   constructor(opts: ReqmeisterOpts = {}) {
     super();
     this.#opts = opts;
-    this.#cycle();
+    this.#tick();
     this.#perform();
   }
 
-  #cycle() {
+  #tick() {
     this.#resolve?.();
     this.#promise = new Promise((resolve) => {
       this.#resolve = resolve;
@@ -32,7 +34,7 @@ class Reqmeister extends EventEmitter<{ [filterId: string]: (event: Event) => an
   }
 
   async #perform() {
-    const { delay } = this.#opts;
+    const { delay, timeout = Time.seconds(1) } = this.#opts;
     await new Promise((resolve) => setTimeout(resolve, delay));
 
     const queue = this.#queue;
@@ -55,13 +57,16 @@ class Reqmeister extends EventEmitter<{ [filterId: string]: (event: Event) => an
 
     if (wantedEvents.size) filters.push({ ids: [...wantedEvents] });
     if (wantedAuthors.size) filters.push({ kinds: [0], authors: [...wantedAuthors] });
 
-    const events = await client.getFilters(filters, { signal: this.#opts.signal });
+    if (filters.length) {
+      debug('REQ', JSON.stringify(filters));
+      const events = await client.getFilters(filters, { signal: AbortSignal.timeout(timeout) });
 
-    for (const event of events) {
-      this.encounter(event);
+      for (const event of events) {
+        this.encounter(event);
+      }
     }
 
-    this.#cycle();
+    this.#tick();
     this.#perform();
   }
 
@@ -70,7 +75,7 @@ class Reqmeister extends EventEmitter<{ [filterId: string]: (event: Event) => an
   req(filter: MicroFilter, relays: WebSocket['url'][] = []): Promise<Event> {
     const filterId = getFilterId(filter);
     this.#queue.push([filterId, filter, relays]);
     return new Promise((resolve, reject) => {
       this.once(filterId, resolve);
-      this.#promise.finally(reject);
+      this.#promise.finally(() => setTimeout(reject, 0));
     });
   }
 
@@ -86,4 +91,9 @@ class Reqmeister extends EventEmitter<{ [filterId: string]: (event: Event) => an
   }
 }
 
-export { Reqmeister };
+const reqmeister = new Reqmeister({
+  delay: Time.seconds(1),
+  timeout: Time.seconds(1),
+});
+
+export { reqmeister };
diff --git a/src/sign.ts b/src/sign.ts
index 0662668..de7149f 100644
--- a/src/sign.ts
+++ b/src/sign.ts
@@ -1,13 +1,15 @@
 import { type AppContext } from '@/app.ts';
 import { Conf } from '@/config.ts';
 import { decryptAdmin, encryptAdmin } from '@/crypto.ts';
-import { type Event, type EventTemplate, finishEvent, HTTPException } from '@/deps.ts';
+import { Debug, type Event, type EventTemplate, finishEvent, HTTPException } from '@/deps.ts';
 import { connectResponseSchema } from '@/schemas/nostr.ts';
 import { jsonSchema } from '@/schema.ts';
 import { Sub } from '@/subs.ts';
 import { eventMatchesTemplate, Time } from '@/utils.ts';
 import { createAdminEvent } from '@/utils/web.ts';
 
+const debug = Debug('ditto:sign');
+
 interface SignEventOpts {
   /** Target proof-of-work difficulty for the signed event. */
   pow?: number;
@@ -28,10 +30,12 @@ async function signEvent(
   const header = c.req.header('x-nostr-sign');
 
   if (seckey) {
+    debug(`Signing Event<${event.kind}> with secret key`);
     return finishEvent(event, seckey);
   }
 
   if (header) {
+    debug(`Signing Event<${event.kind}> with NIP-46`);
     return await signNostrConnect(event, c, opts);
   }
diff --git a/src/stats.ts b/src/stats.ts
index 32208df..b462021 100644
--- a/src/stats.ts
+++ b/src/stats.ts
@@ -1,6 +1,6 @@
 import { type AuthorStatsRow, db, type DittoDB, type EventStatsRow } from '@/db.ts';
 import * as eventsDB from '@/db/events.ts';
-import { type Event, findReplyTag, type InsertQueryBuilder } from '@/deps.ts';
+import { Debug, type Event, findReplyTag, type InsertQueryBuilder } from '@/deps.ts';
 
 type AuthorStat = keyof Omit<AuthorStatsRow, 'pubkey'>;
 type EventStat = keyof Omit<EventStatsRow, 'event_id'>;
@@ -9,6 +9,8 @@ type AuthorStatDiff = ['author_stats', pubkey: string, stat: AuthorStat, diff: n
 type EventStatDiff = ['event_stats', eventId: string, stat: EventStat, diff: number];
 type StatDiff = AuthorStatDiff | EventStatDiff;
 
+const debug = Debug('ditto:stats');
+
 /** Store stats for the event in LMDB. */
 async function updateStats(event: Event) {
   let prev: Event | undefined;
@@ -26,6 +28,10 @@ async function updateStats(event: Event) {
   const pubkeyDiffs = statDiffs.filter(([table]) => table === 'author_stats') as AuthorStatDiff[];
   const eventDiffs = statDiffs.filter(([table]) => table === 'event_stats') as EventStatDiff[];
 
+  if (statDiffs.length) {
+    debug(JSON.stringify({ id: event.id, pubkey: event.pubkey, kind: event.kind, tags: event.tags, statDiffs }));
+  }
+
   if (pubkeyDiffs.length) queries.push(authorStatsQuery(pubkeyDiffs));
   if (eventDiffs.length) queries.push(eventStatsQuery(eventDiffs));
diff --git a/src/subs.ts b/src/subs.ts
index f9d6606..c716e01 100644
--- a/src/subs.ts
+++ b/src/subs.ts
@@ -1,9 +1,11 @@
-import { type Event } from '@/deps.ts';
+import { Debug, type Event } from '@/deps.ts';
 import { Subscription } from '@/subscription.ts';
 
 import type { DittoFilter } from '@/filter.ts';
 import type { EventData } from '@/types.ts';
 
+const debug = Debug('ditto:subs');
+
 /**
  * Manages Ditto event subscriptions.
  * Subscriptions can be added, removed, and matched against events.
@@ -21,6 +23,7 @@ class SubscriptionStore {
    * ```
    */
   sub(socket: unknown, id: string, filters: DittoFilter[]): Subscription {
+    debug('sub', id, JSON.stringify(filters));
     let subs = this.#store.get(socket);
 
     if (!subs) {
@@ -38,12 +41,14 @@ class SubscriptionStore {
 
   /** Remove a subscription from the store. */
   unsub(socket: unknown, id: string): void {
+    debug('unsub', id);
     this.#store.get(socket)?.get(id)?.close();
     this.#store.get(socket)?.delete(id);
   }
 
   /** Remove an entire socket. */
   close(socket: unknown): void {
+    debug('close', socket);
     const subs = this.#store.get(socket);
 
     if (subs) {
diff --git a/src/utils/nip05.ts b/src/utils/nip05.ts
index df08150..4fa3808 100644
--- a/src/utils/nip05.ts
+++ b/src/utils/nip05.ts
@@ -1,7 +1,9 @@
-import { TTLCache, z } from '@/deps.ts';
+import { Debug, TTLCache, z } from '@/deps.ts';
 import { Time } from '@/utils/time.ts';
 import { fetchWorker } from '@/workers/fetch.ts';
 
+const debug = Debug('ditto:nip05');
+
 const nip05Cache = new TTLCache<string, Promise<string | null>>({ ttl: Time.hours(1), max: 5000 });
 
 const NIP05_REGEX = /^(?:([\w.+-]+)@)?([\w.-]+)$/;
@@ -46,7 +48,7 @@ function lookupNip05Cached(value: string): Promise<string | null> {
   const cached = nip05Cache.get(value);
   if (cached !== undefined) return cached;
 
-  console.log(`Looking up NIP-05 for ${value}`);
+  debug(`Lookup ${value}`);
 
   const result = lookup(value);
   nip05Cache.set(value, result);
diff --git a/src/utils/unfurl.ts b/src/utils/unfurl.ts
index 3088750..9f03c0f 100644
--- a/src/utils/unfurl.ts
+++ b/src/utils/unfurl.ts
@@ -1,7 +1,9 @@
-import { TTLCache, unfurl } from '@/deps.ts';
+import { Debug, TTLCache, unfurl } from '@/deps.ts';
 import { Time } from '@/utils/time.ts';
 import { fetchWorker } from '@/workers/fetch.ts';
 
+const debug = Debug('ditto:unfurl');
+
 interface PreviewCard {
   url: string;
   title: string;
@@ -20,7 +22,7 @@ interface PreviewCard {
 }
 
 async function unfurlCard(url: string, signal: AbortSignal): Promise<PreviewCard | null> {
-  console.log(`Unfurling ${url}...`);
+  debug(`Unfurling ${url}...`);
   try {
     const result = await unfurl(url, {
       fetch: (url) => fetchWorker(url, { signal }),
diff --git a/src/workers/fetch.worker.ts b/src/workers/fetch.worker.ts
index 2988a2e..8a2c0b1 100644
--- a/src/workers/fetch.worker.ts
+++ b/src/workers/fetch.worker.ts
@@ -1,13 +1,16 @@
-import { Comlink } from '@/deps.ts';
+import { Comlink, Debug } from '@/deps.ts';
 
 import './handlers/abortsignal.ts';
 
+const debug = Debug('ditto:fetch.worker');
+
 export const FetchWorker = {
   async fetch(
     url: string,
     init: Omit<RequestInit, 'signal'>,
     signal: AbortSignal | null | undefined,
   ): Promise<[BodyInit, ResponseInit]> {
+    debug(init.method, url);
     const response = await fetch(url, { ...init, signal });
     return [
       await response.arrayBuffer(),
diff --git a/src/workers/sqlite.worker.ts b/src/workers/sqlite.worker.ts
index 6f42213..564149a 100644
--- a/src/workers/sqlite.worker.ts
+++ b/src/workers/sqlite.worker.ts
@@ -1,9 +1,10 @@
 ///
 
-import { Comlink, type CompiledQuery, DenoSqlite3, type QueryResult, Sentry } from '@/deps.ts';
+import { Comlink, type CompiledQuery, Debug, DenoSqlite3, type QueryResult, Sentry } from '@/deps.ts';
 import '@/sentry.ts';
 
 let db: DenoSqlite3 | undefined;
+const debug = Debug('ditto:sqlite.worker');
 
 export const SqliteWorker = {
   open(path: string): void {
@@ -11,6 +12,7 @@ export const SqliteWorker = {
   },
   executeQuery({ sql, parameters }: CompiledQuery): QueryResult {
     if (!db) throw new Error('Database not open');
+    debug(sql);
     const result: QueryResult = Sentry.startSpan({ name: sql, op: 'db.query' }, () => {
       return {
diff --git a/tringifyStable from npm:fast-stable-stringify b/tringifyStable from npm:fast-stable-stringify
deleted file mode 100644
index edb0958..0000000
--- a/tringifyStable from npm:fast-stable-stringify
+++ /dev/null
@@ -1,307 +0,0 @@
-diff --git a/src/client.ts b/src/client.ts
-index 970a077..3cf2e8a 100644
---- a/src/client.ts
-+++ b/src/client.ts
-@@ -14,7 +14,7 @@ function getFilters(filters: Filter[], opts: GetFiltersOpts
- 
-   const unsub = pool.subscribe(
-     filters,
--    activeRelays,
-+    opts.relays ?? activeRelays,
-     (event: Event | null) => {
-       if (event && matchFilters(filters, event)) {
-         pipeline.handleEvent(event).catch(() => {});
-diff --git a/src/common.ts b/src/common.ts
-new file mode 100644
-index 0000000..0424b52
---- /dev/null
-+++ b/src/common.ts
-@@ -0,0 +1,9 @@
-+import { Reqmeister } from '@/reqmeister.ts';
-+import { Time } from '@/utils/time.ts';
-+
-+const reqmeister = new Reqmeister({
-+  delay: Time.seconds(1),
-+  signal: AbortSignal.timeout(Time.seconds(1)),
-+});
-+
-+export { reqmeister };
-diff --git a/src/deps.ts b/src/deps.ts
-index b9db9e2..4a9314b 100644
---- a/src/deps.ts
-+++ b/src/deps.ts
-@@ -81,5 +81,7 @@ export { Machina } from 'https://gitlab.com/soapbox-pub/nostr-machina/-/raw/08a1
- export * as Sentry from 'https://deno.land/x/sentry@7.78.0/index.js';
- export { sentry as sentryMiddleware } from 'npm:@hono/sentry@^1.0.0';
- export * as Comlink from 'npm:comlink@^4.4.1';
-+export { EventEmitter } from 'npm:tseep@^1.1.3';
-+export { default as stringifyStable } from 'npm:fast-stable-stringify@^1.0.0';
- 
- export type * as TypeFest from 'npm:type-fest@^4.3.0';
-diff --git a/src/filter.ts b/src/filter.ts
-index fb43251..38fcff7 100644
---- a/src/filter.ts
-+++ b/src/filter.ts
-@@ -1,5 +1,5 @@
- import { Conf } from '@/config.ts';
--import { type Event, type Filter, matchFilters } from '@/deps.ts';
-+import { type Event, type Filter, matchFilters, stringifyStable } from '@/deps.ts';
- 
- import type { EventData } from '@/types.ts';
- 
-@@ -14,12 +14,17 @@ interface DittoFilter extends Filter {
-   relations?: Relation[];
- }
- 
-+/** Filter to get one specific event. */
-+type MicroFilter = { ids: [Event['id']] } | { kinds: [0]; authors: [Event['pubkey']] };
-+
- /** Additional options to apply to the whole subscription. */
- interface GetFiltersOpts {
-   /** Signal to abort the request. */
-   signal?: AbortSignal;
-   /** Event limit for the whole subscription. */
-   limit?: number;
-+  /** Relays to use, if applicable. */
-+  relays?: WebSocket['url'][];
- }
- 
- function matchDittoFilter(filter: DittoFilter, event: Event, data: EventData): boolean {
-@@ -44,4 +49,33 @@ function matchDittoFilters(filters: DittoFilter[], event: Event, data: EventData
-   return false;
- }
- 
--export { type DittoFilter, type GetFiltersOpts, matchDittoFilters, type Relation };
-+/** Get deterministic ID for a microfilter. */
-+function getFilterId(filter: MicroFilter): string {
-+  if ('ids' in filter) {
-+    return stringifyStable({ ids: [filter.ids] });
-+  } else {
-+    return stringifyStable({
-+      kinds: [filter.kinds[0]],
-+      authors: [filter.authors[0]],
-+    });
-+  }
-+}
-+
-+/** Get a microfilter from a Nostr event. */
-+function eventToMicroFilter(event: Event): MicroFilter {
-+  if (event.kind === 0) {
-+    return { kinds: [0], authors: [event.pubkey] };
-+  } else {
-+    return { ids: [event.id] };
-+  }
-+}
-+
-+export {
-+  type DittoFilter,
-+  eventToMicroFilter,
-+  getFilterId,
-+  type GetFiltersOpts,
-+  matchDittoFilters,
-+  type MicroFilter,
-+  type Relation,
-+};
-diff --git a/src/pipeline.ts b/src/pipeline.ts
-index adf8a84..923bf4e 100644
---- a/src/pipeline.ts
-+++ b/src/pipeline.ts
-@@ -1,3 +1,4 @@
-+import { reqmeister } from '@/common.ts';
- import { Conf } from '@/config.ts';
- import * as eventsDB from '@/db/events.ts';
- import { addRelays } from '@/db/relays.ts';
-@@ -23,15 +24,17 @@ import type { EventData } from '@/types.ts';
-  */
- async function handleEvent(event: Event): Promise {
-   if (!(await verifySignatureWorker(event))) return;
-+  const wanted = reqmeister.isWanted(event);
-   if (encounterEvent(event)) return;
-   console.info(`pipeline: Event<${event.kind}> ${event.id}`);
-   const data = await getEventData(event);
- 
-   await Promise.all([
--    storeEvent(event, data),
-+    storeEvent(event, data, { force: wanted }),
-     processDeletions(event),
-     trackRelays(event),
-     trackHashtags(event),
-+    fetchRelatedEvents(event, data),
-     processMedia(event, data),
-     streamOut(event, data),
-     broadcast(event, data),
-@@ -39,13 +42,14 @@ async function handleEvent(event: Event): Promise {
- }
- 
- /** Tracks encountered events to skip duplicates, improving idempotency and performance. */
--const encounters = new LRUCache({ max: 1000 });
-+const encounters = new LRUCache({ max: 1000 });
- 
- /** Encounter the event, and return whether it has already been encountered. */
--function encounterEvent(event: Event) {
-+function encounterEvent(event: Event): boolean {
-   const result = encounters.get(event.id);
-   encounters.set(event.id, true);
--  return result;
-+  reqmeister.encounter(event);
-+  return !!result;
- }
- 
- /** Preload data that will be useful to several tasks. */
-@@ -57,11 +61,16 @@ async function getEventData({ pubkey }: Event): Promise {
- /** Check if the pubkey is the `DITTO_NSEC` pubkey. */
- const isAdminEvent = ({ pubkey }: Event): boolean => pubkey === Conf.pubkey;
- 
-+interface StoreEventOpts {
-+  force?: boolean;
-+}
-+
- /** Maybe store the event, if eligible. */
--async function storeEvent(event: Event, data: EventData): Promise {
-+async function storeEvent(event: Event, data: EventData, opts: StoreEventOpts = {}): Promise {
-   if (isEphemeralKind(event.kind)) return;
-+  const { force = false } = opts;
- 
--  if (data.user || isAdminEvent(event) || await isLocallyFollowed(event.pubkey)) {
-+  if (force || data.user || isAdminEvent(event) || await isLocallyFollowed(event.pubkey)) {
-     const [deletion] = await mixer.getFilters(
-       [{ kinds: [5], authors: [event.pubkey], '#e': [event.id], limit: 1 }],
-       { limit: 1, signal: AbortSignal.timeout(Time.seconds(1)) },
-@@ -129,6 +138,18 @@ function trackRelays(event: Event) {
-   return addRelays([...relays]);
- }
- 
-+/** Queue related events to fetch. */
-+function fetchRelatedEvents(event: Event, data: EventData) {
-+  if (!data.user) {
-+    reqmeister.req({ kinds: [0], authors: [event.pubkey] }).catch(() => {});
-+  }
-+  for (const [name, id, relay] of event.tags) {
-+    if (name === 'e' && !encounters.has(id)) {
-+      reqmeister.req({ ids: [id] }, [relay]).catch(() => {});
-+    }
-+  }
-+}
-+
- /** Delete unattached media entries that are attached to the event. */
- function processMedia({ tags, pubkey }: Event, { user }: EventData) {
-   if (user) {
-diff --git a/src/queries.ts b/src/queries.ts
-index fc7365a..1ecff7b 100644
---- a/src/queries.ts
-+++ b/src/queries.ts
-@@ -2,6 +2,7 @@ import * as eventsDB from '@/db/events.ts';
- import { type Event, findReplyTag } from '@/deps.ts';
- import { type DittoFilter, type Relation } from '@/filter.ts';
- import * as mixer from '@/mixer.ts';
-+import { reqmeister } from '@/common.ts';
- 
- interface GetEventOpts {
-   /** Signal to abort the request. */
-@@ -30,10 +31,10 @@ const getEvent = async (
- const getAuthor = async (pubkey: string, opts: GetEventOpts<0> = {}): Promise<Event<0> | undefined> => {
-   const { relations, signal = AbortSignal.timeout(1000) } = opts;
- 
--  const [event] = await mixer.getFilters(
-+  const event = await eventsDB.getFilters(
-     [{ authors: [pubkey], relations, kinds: [0], limit: 1 }],
-     { limit: 1, signal },
--  );
-+  ).then(([event]) => event) || await reqmeister.req({ kinds: [0], authors: [pubkey] }).catch(() => {});
- 
-   return event;
- };
-diff --git a/src/reqmeister.ts b/src/reqmeister.ts
-new file mode 100644
-index 0000000..960151f
---- /dev/null
-+++ b/src/reqmeister.ts
-@@ -0,0 +1,88 @@
-+import * as client from '@/client.ts';
-+import { type Event, EventEmitter, type Filter } from '@/deps.ts';
-+
-+import { eventToMicroFilter, getFilterId, type MicroFilter } from '@/filter.ts';
-+
-+interface ReqmeisterOpts {
-+  delay?: number;
-+  signal?: AbortSignal;
-+}
-+
-+type ReqmeisterQueueItem = [string, MicroFilter, WebSocket['url'][]];
-+
-+class Reqmeister extends EventEmitter<{ [filterId: string]: (event: Event) => any }> {
-+  #opts: ReqmeisterOpts;
-+  #queue: ReqmeisterQueueItem[] = [];
-+  #promise!: Promise;
-+  #resolve!: () => void;
-+
-+  constructor(opts: ReqmeisterOpts = {}) {
-+    super();
-+    this.#opts = opts;
-+    this.#cycle();
-+    this.#perform();
-+  }
-+
-+  #cycle() {
-+    this.#resolve?.();
-+    this.#promise = new Promise((resolve) => {
-+      this.#resolve = resolve;
-+    });
-+  }
-+
-+  async #perform() {
-+    const { delay } = this.#opts;
-+    await new Promise((resolve) => setTimeout(resolve, delay));
-+
-+    const queue = this.#queue;
-+    this.#queue = [];
-+
-+    const wantedEvents = new Set();
-+    const wantedAuthors = new Set();
-+
-+    // TODO: batch by relays.
-+    for (const [_filterId, filter, _relays] of queue) {
-+      if ('ids' in filter) {
-+        filter.ids.forEach((id) => wantedEvents.add(id));
-+      } else {
-+        wantedAuthors.add(filter.authors[0]);
-+      }
-+    }
-+
-+    const filters: Filter[] = [];
-+
-+    if (wantedEvents.size) filters.push({ ids: [...wantedEvents] });
-+    if (wantedAuthors.size) filters.push({ kinds: [0], authors: [...wantedAuthors] });
-+
-+    const events = await client.getFilters(filters, { signal: this.#opts.signal });
-+
-+    for (const event of events) {
-+      this.encounter(event);
-+    }
-+
-+    this.#cycle();
-+    this.#perform();
-+  }
-+
-+  req(filter: MicroFilter, relays: WebSocket['url'][] = []): Promise {
-+    const filterId = getFilterId(filter);
-+    this.#queue.push([filterId, filter, relays]);
-+    return new Promise((resolve, reject) => {
-+      this.once(filterId, resolve);
-+      this.#promise.finally(reject);
-+    });
-+  }
-+
-+  encounter(event: Event): void {
-+    const filterId = getFilterId(eventToMicroFilter(event));
-+    this.#queue = this.#queue.filter(([id]) => id !== filterId);
-+    this.emit(filterId, event);
-+  }
-+
-+  isWanted(event: Event): boolean {
-+    const filterId = getFilterId(eventToMicroFilter(event));
-+    return this.#queue.some(([id]) => id === filterId);
-+  }
-+}
-+
-+export { Reqmeister };