Merge branch 'debug' into 'main'

debug

See merge request soapbox-pub/ditto!85

Commit 06be0c0a50
@@ -1 +1,2 @@
 .env
+*.cpuprofile
@@ -0,0 +1,23 @@
+{
+  // Use IntelliSense to learn about possible attributes.
+  // Hover to view descriptions of existing attributes.
+  // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
+  "version": "0.2.0",
+  "configurations": [
+    {
+      "request": "launch",
+      "name": "Launch Program",
+      "type": "node",
+      "program": "${workspaceFolder}/src/server.ts",
+      "cwd": "${workspaceFolder}",
+      "runtimeExecutable": "deno",
+      "runtimeArgs": [
+        "run",
+        "--inspect-wait",
+        "--allow-all",
+        "--unstable"
+      ],
+      "attachSimplePort": 9229
+    }
+  ]
+}
@@ -3,8 +3,7 @@
   "lock": false,
   "tasks": {
     "start": "deno run -A --unstable src/server.ts",
-    "dev": "deno run -A --unstable --watch src/server.ts",
-    "debug": "deno run -A --unstable --inspect src/server.ts",
+    "dev": "deno run -A --unstable --watch --inspect src/server.ts",
    "test": "DB_PATH=\":memory:\" deno test -A --unstable",
    "check": "deno check src/server.ts",
    "relays:sync": "deno run -A --unstable scripts/relays.ts sync"
src/app.ts

@@ -4,6 +4,7 @@ import { type User } from '@/db/users.ts';
 import {
   type Context,
   cors,
+  Debug,
   type Event,
   type Handler,
   Hono,
@@ -90,7 +91,14 @@ if (Conf.sentryDsn) {
   app.use('*', sentryMiddleware({ dsn: Conf.sentryDsn }));
 }
 
-app.use('*', logger());
+const debug = Debug('ditto:http');
 
+app.use('/api', logger(debug));
+app.use('/relay', logger(debug));
+app.use('/.well-known', logger(debug));
+app.use('/users', logger(debug));
+app.use('/nodeinfo', logger(debug));
+app.use('/oauth', logger(debug));
+
 app.get('/api/v1/streaming', streamingController);
 app.get('/api/v1/streaming/', streamingController);
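A minimal sketch (not part of the diff) of why logger(debug) works here: Hono's logger() accepts an optional print function, so passing a namespaced debug instance routes request logs through 'ditto:http', which stays silent unless the DEBUG environment variable selects it. It assumes Debug, Hono, and logger are re-exported from '@/deps.ts', as in src/app.ts:

    import { Debug, Hono, logger } from '@/deps.ts';

    const debug = Debug('ditto:http');
    const app = new Hono();
    // Request lines for /api go to the 'ditto:http' namespace,
    // printed only when e.g. DEBUG=ditto:* is set.
    app.use('/api', logger(debug));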
@@ -1,13 +1,16 @@
-import { type Event, type Filter, matchFilters } from '@/deps.ts';
+import { Debug, type Event, type Filter, matchFilters } from '@/deps.ts';
 import * as pipeline from '@/pipeline.ts';
 import { activeRelays, pool } from '@/pool.ts';
 
 import type { GetFiltersOpts } from '@/filter.ts';
 
+const debug = Debug('ditto:client');
+
 /** Get events from a NIP-01 filter. */
 function getFilters<K extends number>(filters: Filter<K>[], opts: GetFiltersOpts = {}): Promise<Event<K>[]> {
   if (opts.signal?.aborted) return Promise.resolve([]);
   if (!filters.length) return Promise.resolve([]);
+  debug('REQ', JSON.stringify(filters));
 
   return new Promise((resolve) => {
     const results: Event[] = [];
@@ -1,9 +0,0 @@
-import { Reqmeister } from '@/reqmeister.ts';
-import { Time } from '@/utils/time.ts';
-
-const reqmeister = new Reqmeister({
-  delay: Time.seconds(1),
-  signal: AbortSignal.timeout(Time.seconds(1)),
-});
-
-export { reqmeister };
@@ -1,11 +1,13 @@
 import { type AppController } from '@/app.ts';
-import { z } from '@/deps.ts';
+import { Debug, z } from '@/deps.ts';
 import { type DittoFilter } from '@/filter.ts';
 import { getAuthor, getFeedPubkeys } from '@/queries.ts';
 import { Sub } from '@/subs.ts';
 import { bech32ToPubkey } from '@/utils.ts';
 import { renderStatus } from '@/views/mastodon/statuses.ts';
 
+const debug = Debug('ditto:streaming');
+
 /**
  * Streaming timelines/categories.
  * https://docs.joinmastodon.org/methods/streaming/#streams
@@ -49,6 +51,7 @@ const streamingController: AppController = (c) => {
 
   function send(name: string, payload: object) {
     if (socket.readyState === WebSocket.OPEN) {
+      debug('send', name, JSON.stringify(payload));
       socket.send(JSON.stringify({
         event: name,
         payload: JSON.stringify(payload),
@@ -6,7 +6,7 @@ import { cidFromUrl } from '@/utils/ipfs.ts';
 
 /** Delete files that aren't attached to any events. */
 async function cleanupMedia() {
-  console.log('Deleting orphaned media files...');
+  console.info('Deleting orphaned media files...');
 
   const until = new Date(Date.now() - Time.minutes(15));
   const media = await getUnattachedMedia(until);
@@ -22,7 +22,7 @@ async function cleanupMedia() {
     }
   }
 
-  console.log(`Removed ${media?.length ?? 0} orphaned media files.`);
+  console.info(`Removed ${media?.length ?? 0} orphaned media files.`);
 }
 
 await cleanupMedia();
src/db.ts

@@ -3,7 +3,7 @@ import path from 'node:path';
 
 import { FileMigrationProvider, Kysely, Migrator, PolySqliteDialect } from '@/deps.ts';
 import { Conf } from '@/config.ts';
-import { getPragma, setPragma } from '@/pragma.ts';
+import { setPragma } from '@/pragma.ts';
 import SqliteWorker from '@/workers/sqlite.ts';
 
 interface DittoDB {
@@ -89,12 +89,6 @@ await Promise.all([
   setPragma(db, 'mmap_size', Conf.sqlite.mmapSize),
 ]);
 
-// Log out PRAGMA values for debugging.
-['journal_mode', 'synchronous', 'temp_store', 'mmap_size'].forEach(async (pragma) => {
-  const value = await getPragma(db, pragma);
-  console.log(`PRAGMA ${pragma} = ${value};`);
-});
-
 const migrator = new Migrator({
   db,
   provider: new FileMigrationProvider({
@@ -106,7 +100,7 @@ const migrator = new Migrator({
 
 /** Migrate the database to the latest version. */
 async function migrate() {
-  console.log('Running migrations...');
+  console.info('Running migrations...');
   const results = await migrator.migrateToLatest();
 
   if (results.error) {
@@ -114,11 +108,11 @@ async function migrate() {
     Deno.exit(1);
   } else {
     if (!results.results?.length) {
-      console.log('Everything up-to-date.');
+      console.info('Everything up-to-date.');
     } else {
-      console.log('Migrations finished!');
+      console.info('Migrations finished!');
       for (const { migrationName, status } of results.results!) {
-        console.log(` - ${migrationName}: ${status}`);
+        console.info(` - ${migrationName}: ${status}`);
       }
     }
   }
@@ -1,5 +1,5 @@
 import { db, type DittoDB } from '@/db.ts';
-import { type Event, type SelectQueryBuilder } from '@/deps.ts';
+import { Debug, type Event, type SelectQueryBuilder } from '@/deps.ts';
 import { isParameterizedReplaceableKind } from '@/kinds.ts';
 import { jsonMetaContentSchema } from '@/schemas/nostr.ts';
 import { EventData } from '@/types.ts';
@@ -7,6 +7,8 @@ import { isNostrId, isURL } from '@/utils.ts';
 
 import type { DittoFilter, GetFiltersOpts } from '@/filter.ts';
 
+const debug = Debug('ditto:db:events');
+
 /** Function to decide whether or not to index a tag. */
 type TagCondition = ({ event, count, value }: {
   event: Event;
@@ -28,6 +30,8 @@ const tagConditions: Record<string, TagCondition> = {
 
 /** Insert an event (and its tags) into the database. */
 function insertEvent(event: Event, data: EventData): Promise<void> {
+  debug('insertEvent', JSON.stringify(event));
+
   return db.transaction().execute(async (trx) => {
     /** Insert the event into the database. */
     async function addEvent() {
@@ -224,6 +228,7 @@ async function getFilters<K extends number>(
   opts: GetFiltersOpts = {},
 ): Promise<DittoEvent<K>[]> {
   if (!filters.length) return Promise.resolve([]);
+  debug('getFilters', JSON.stringify(filters));
   let query = getFiltersQuery(filters);
 
   if (typeof opts.limit === 'number') {
@@ -276,6 +281,7 @@ async function getFilters<K extends number>(
 /** Delete events based on filters from the database. */
 function deleteFilters<K extends number>(filters: DittoFilter<K>[]) {
   if (!filters.length) return Promise.resolve([]);
+  debug('deleteFilters', JSON.stringify(filters));
 
   return db.transaction().execute(async (trx) => {
     const query = getFiltersQuery(filters).clearSelect().select('id');
@@ -293,6 +299,7 @@ function deleteFilters<K extends number>(filters: DittoFilter<K>[]) {
 /** Get number of events that would be returned by filters. */
 async function countFilters<K extends number>(filters: DittoFilter<K>[]): Promise<number> {
   if (!filters.length) return Promise.resolve(0);
+  debug('countFilters', JSON.stringify(filters));
   const query = getFiltersQuery(filters);
 
   const [{ count }] = await query
@@ -1,7 +1,9 @@
-import { type Insertable } from '@/deps.ts';
+import { Debug, type Insertable } from '@/deps.ts';
 
 import { db, type UserRow } from '../db.ts';
 
+const debug = Debug('ditto:users');
+
 interface User {
   pubkey: string;
   username: string;
@@ -11,6 +13,7 @@ interface User {
 
 /** Adds a user to the database. */
 function insertUser(user: Insertable<UserRow>) {
+  debug('insertUser', JSON.stringify(user));
   return db.insertInto('users').values(user).execute();
 }
 
@@ -83,5 +83,7 @@ export { sentry as sentryMiddleware } from 'npm:@hono/sentry@^1.0.0';
 export * as Comlink from 'npm:comlink@^4.4.1';
 export { EventEmitter } from 'npm:tseep@^1.1.3';
 export { default as stringifyStable } from 'npm:fast-stable-stringify@^1.0.0';
+// @deno-types="npm:@types/debug@^4.1.12"
+export { default as Debug } from 'npm:debug@^4.3.4';
 
 export type * as TypeFest from 'npm:type-fest@^4.3.0';
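A minimal sketch (not from the diff, namespace is hypothetical) of how the newly exported Debug factory is used throughout the codebase: each module creates one namespaced logger, and output only appears when the DEBUG environment variable selects it, e.g. DEBUG=ditto:* deno task dev.

    import { Debug } from '@/deps.ts';

    const debug = Debug('ditto:example'); // hypothetical namespace
    debug('handled %s in %dms', 'request', 42); // printed only when DEBUG matches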
@@ -52,7 +52,7 @@ function matchDittoFilters(filters: DittoFilter[], event: Event, data: EventData
 /** Get deterministic ID for a microfilter. */
 function getFilterId(filter: MicroFilter): string {
   if ('ids' in filter) {
-    return stringifyStable({ ids: [filter.ids] });
+    return stringifyStable({ ids: [filter.ids[0]] });
   } else {
     return stringifyStable({
       kinds: [filter.kinds[0]],
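Why this fix matters (a sketch with a hypothetical id, output shown approximately): eventToMicroFilter() produces { ids: [event.id] }, so getFilterId must serialize a single id rather than wrapping the whole ids tuple in another array; otherwise the id computed for a queued filter never equals the id computed from the event that answers it.

    import { stringifyStable } from '@/deps.ts';

    const id = 'a1b2c3'; // hypothetical event id
    stringifyStable({ ids: [[id]] }); // old: '{"ids":[["a1b2c3"]]}' — never matches
    stringifyStable({ ids: [id] });   // new: '{"ids":["a1b2c3"]}' — matches eventToMicroFilter(event)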
@@ -1,9 +1,11 @@
-import { type Event } from '@/deps.ts';
+import { Debug, type Event } from '@/deps.ts';
 import { activeRelays, pool } from '@/pool.ts';
 import { nostrNow } from '@/utils.ts';
 
 import * as pipeline from './pipeline.ts';
 
+const debug = Debug('ditto:firehose');
+
 // This file watches events on all known relays and performs
 // side-effects based on them, such as trending hashtag tracking
 // and storing events for notifications and the home feed.
@@ -17,6 +19,8 @@ pool.subscribe(
 
 /** Handle events through the firehose pipeline. */
 function handleEvent(event: Event): Promise<void> {
+  debug(`Event<${event.kind}> ${event.id}`);
+
   return pipeline
     .handleEvent(event)
     .catch(() => {});
@@ -1,6 +1,7 @@
+import { Debug, type MiddlewareHandler } from '@/deps.ts';
 import ExpiringCache from '@/utils/expiring-cache.ts';
 
-import type { MiddlewareHandler } from '@/deps.ts';
+const debug = Debug('ditto:middleware:cache');
 
 export const cache = (options: {
   cacheName: string;
@@ -11,14 +12,14 @@ export const cache = (options: {
   const cache = new ExpiringCache(await caches.open(options.cacheName));
   const response = await cache.match(key);
   if (!response) {
-    console.debug('Building cache for page', c.req.url);
+    debug('Building cache for page', c.req.url);
     await next();
     const response = c.res.clone();
     if (response.status < 500) {
       await cache.putExpiring(key, response, options.expires ?? 0);
     }
   } else {
-    console.debug('Serving page from cache', c.req.url);
+    debug('Serving page from cache', c.req.url);
     return response;
   }
 };
src/note.ts

@@ -43,7 +43,7 @@ interface ParsedNoteContent {
 function parseNoteContent(content: string): ParsedNoteContent {
   // Parsing twice is ineffecient, but I don't know how to do only once.
   const html = linkifyStr(content, linkifyOpts);
-  const links = linkify.find(content).filter(isValidLink);
+  const links = linkify.find(content).filter(isLinkURL);
   const firstUrl = links.find(isNonMediaLink)?.href;
 
   return {
@@ -77,15 +77,9 @@ function isNonMediaLink({ href }: Link): boolean {
   return /^https?:\/\//.test(href) && !getUrlMimeType(href);
 }
 
-/** Ensures the URL can be parsed. Why linkifyjs doesn't already guarantee this, idk... */
-function isValidLink(link: Link): boolean {
-  try {
-    new URL(link.href);
-    return true;
-  } catch (_e) {
-    console.error(`Invalid link: ${link.href}`);
-    return false;
-  }
+/** Ensures the Link is a URL so it can be parsed. */
+function isLinkURL(link: Link): boolean {
+  return link.type === 'url';
 }
 
 /** `npm:mime` treats `.com` as a file extension, so parse the full URL to get its path first. */
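Context for the new check (a sketch, not from the diff): linkifyjs's find() returns Link objects with a type field ('url', 'email', ...), so filtering on type === 'url' keeps only matches that are safe to treat as URLs, replacing the old try/new URL() probe.

    import * as linkify from 'npm:linkifyjs';

    const links = linkify.find('mail alex@example.com or visit https://example.com');
    const urls = links.filter((link) => link.type === 'url'); // drops the email match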
@@ -1,14 +1,14 @@
-import { reqmeister } from '@/common.ts';
 import { Conf } from '@/config.ts';
 import * as eventsDB from '@/db/events.ts';
 import { addRelays } from '@/db/relays.ts';
 import { deleteAttachedMedia } from '@/db/unattached-media.ts';
 import { findUser } from '@/db/users.ts';
-import { type Event, LRUCache } from '@/deps.ts';
+import { Debug, type Event, LRUCache } from '@/deps.ts';
 import { isEphemeralKind } from '@/kinds.ts';
 import * as mixer from '@/mixer.ts';
 import { publish } from '@/pool.ts';
 import { isLocallyFollowed } from '@/queries.ts';
+import { reqmeister } from '@/reqmeister.ts';
 import { updateStats } from '@/stats.ts';
 import { Sub } from '@/subs.ts';
 import { getTagSet } from '@/tags.ts';
@@ -18,6 +18,8 @@ import { verifySignatureWorker } from '@/workers/verify.ts';
 
 import type { EventData } from '@/types.ts';
 
+const debug = Debug('ditto:pipeline');
+
 /**
  * Common pipeline function to process (and maybe store) events.
  * It is idempotent, so it can be called multiple times for the same event.
@@ -26,7 +28,7 @@ async function handleEvent(event: Event): Promise<void> {
   if (!(await verifySignatureWorker(event))) return;
   const wanted = reqmeister.isWanted(event);
   if (encounterEvent(event)) return;
-  console.info(`pipeline: Event<${event.kind}> ${event.id}`);
+  debug(`Event<${event.kind}> ${event.id}`);
   const data = await getEventData(event);
 
   await Promise.all([
@@ -80,8 +82,8 @@ async function storeEvent(event: Event, data: EventData, opts: StoreEventOpts =
       return Promise.reject(new RelayError('blocked', 'event was deleted'));
     } else {
       await Promise.all([
-        eventsDB.insertEvent(event, data).catch(console.warn),
-        updateStats(event).catch(console.warn),
+        eventsDB.insertEvent(event, data).catch(debug),
+        updateStats(event).catch(debug),
       ]);
     }
   } else {
@@ -115,7 +117,7 @@ async function trackHashtags(event: Event): Promise<void> {
   if (!tags.length) return;
 
   try {
-    console.info('tracking tags:', tags);
+    debug('tracking tags:', tags);
     await TrendsWorker.addTagUsages(event.pubkey, tags, date);
   } catch (_e) {
     // do nothing
@@ -1,5 +1,7 @@
 import { getActiveRelays } from '@/db/relays.ts';
-import { type Event, RelayPool } from '@/deps.ts';
+import { Debug, type Event, RelayPool } from '@/deps.ts';
 
+const debug = Debug('ditto:pool');
+
 const activeRelays = await getActiveRelays();
 
@@ -14,6 +16,7 @@ const pool = new RelayPool(activeRelays, {
 
 /** Publish an event to the given relays, or the entire pool. */
 function publish(event: Event, relays: string[] = activeRelays) {
+  debug('publish', event);
   return pool.publish(event, relays);
 }
 
@@ -2,7 +2,7 @@ import * as eventsDB from '@/db/events.ts';
 import { type Event, findReplyTag } from '@/deps.ts';
 import { type DittoFilter, type Relation } from '@/filter.ts';
 import * as mixer from '@/mixer.ts';
-import { reqmeister } from '@/common.ts';
+import { reqmeister } from '@/reqmeister.ts';
 
 interface GetEventOpts<K extends number> {
   /** Signal to abort the request. */
@@ -1,11 +1,13 @@
 import * as client from '@/client.ts';
-import { type Event, EventEmitter, type Filter } from '@/deps.ts';
+import { Debug, type Event, EventEmitter, type Filter } from '@/deps.ts';
 
 import { eventToMicroFilter, getFilterId, type MicroFilter } from '@/filter.ts';
+import { Time } from '@/utils/time.ts';
 
+const debug = Debug('ditto:reqmeister');
+
 interface ReqmeisterOpts {
   delay?: number;
-  signal?: AbortSignal;
+  timeout?: number;
 }
 
 type ReqmeisterQueueItem = [string, MicroFilter, WebSocket['url'][]];
@@ -20,11 +22,11 @@ class Reqmeister extends EventEmitter<{ [filterId: string]: (event: Event) => an
   constructor(opts: ReqmeisterOpts = {}) {
     super();
     this.#opts = opts;
-    this.#cycle();
+    this.#tick();
     this.#perform();
   }
 
-  #cycle() {
+  #tick() {
     this.#resolve?.();
     this.#promise = new Promise((resolve) => {
       this.#resolve = resolve;
@@ -32,7 +34,7 @@ class Reqmeister extends EventEmitter<{ [filterId: string]: (event: Event) => an
   }
 
   async #perform() {
-    const { delay } = this.#opts;
+    const { delay, timeout = Time.seconds(1) } = this.#opts;
     await new Promise((resolve) => setTimeout(resolve, delay));
 
     const queue = this.#queue;
@@ -55,13 +57,16 @@ class Reqmeister extends EventEmitter<{ [filterId: string]: (event: Event) => an
     if (wantedEvents.size) filters.push({ ids: [...wantedEvents] });
     if (wantedAuthors.size) filters.push({ kinds: [0], authors: [...wantedAuthors] });
 
-    const events = await client.getFilters(filters, { signal: this.#opts.signal });
+    if (filters.length) {
+      debug('REQ', JSON.stringify(filters));
+      const events = await client.getFilters(filters, { signal: AbortSignal.timeout(timeout) });
 
       for (const event of events) {
         this.encounter(event);
+      }
     }
 
-    this.#cycle();
+    this.#tick();
     this.#perform();
   }
@@ -70,7 +75,7 @@ class Reqmeister extends EventEmitter<{ [filterId: string]: (event: Event) => an
     this.#queue.push([filterId, filter, relays]);
     return new Promise<Event>((resolve, reject) => {
       this.once(filterId, resolve);
-      this.#promise.finally(reject);
+      this.#promise.finally(() => setTimeout(reject, 0));
     });
   }
 
@@ -86,4 +91,9 @@ class Reqmeister extends EventEmitter<{ [filterId: string]: (event: Event) => an
   }
 }
 
-export { Reqmeister };
+const reqmeister = new Reqmeister({
+  delay: Time.seconds(1),
+  timeout: Time.seconds(1),
+});
+
+export { reqmeister };
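A usage sketch (hypothetical id, not from the diff) of the singleton this hunk creates: callers queue a microfilter and get a promise for the matching event; the batch is flushed after `delay`, the outbound REQ is bounded by `timeout`, and the promise rejects on the next tick if no matching event was encountered.

    import { reqmeister } from '@/reqmeister.ts';

    const event = await reqmeister
      .req({ ids: ['a1b2c3'] }) // hypothetical event id
      .catch(() => undefined);  // rejected once a cycle completes without a match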
@@ -1,13 +1,15 @@
 import { type AppContext } from '@/app.ts';
 import { Conf } from '@/config.ts';
 import { decryptAdmin, encryptAdmin } from '@/crypto.ts';
-import { type Event, type EventTemplate, finishEvent, HTTPException } from '@/deps.ts';
+import { Debug, type Event, type EventTemplate, finishEvent, HTTPException } from '@/deps.ts';
 import { connectResponseSchema } from '@/schemas/nostr.ts';
 import { jsonSchema } from '@/schema.ts';
 import { Sub } from '@/subs.ts';
 import { eventMatchesTemplate, Time } from '@/utils.ts';
 import { createAdminEvent } from '@/utils/web.ts';
 
+const debug = Debug('ditto:sign');
+
 interface SignEventOpts {
   /** Target proof-of-work difficulty for the signed event. */
   pow?: number;
@@ -28,10 +30,12 @@ async function signEvent<K extends number = number>(
   const header = c.req.header('x-nostr-sign');
 
   if (seckey) {
+    debug(`Signing Event<${event.kind}> with secret key`);
     return finishEvent(event, seckey);
   }
 
   if (header) {
+    debug(`Signing Event<${event.kind}> with NIP-46`);
     return await signNostrConnect(event, c, opts);
   }
 
@@ -1,6 +1,6 @@
 import { type AuthorStatsRow, db, type DittoDB, type EventStatsRow } from '@/db.ts';
 import * as eventsDB from '@/db/events.ts';
-import { type Event, findReplyTag, type InsertQueryBuilder } from '@/deps.ts';
+import { Debug, type Event, findReplyTag, type InsertQueryBuilder } from '@/deps.ts';
 
 type AuthorStat = keyof Omit<AuthorStatsRow, 'pubkey'>;
 type EventStat = keyof Omit<EventStatsRow, 'event_id'>;
@@ -9,6 +9,8 @@ type AuthorStatDiff = ['author_stats', pubkey: string, stat: AuthorStat, diff: n
 type EventStatDiff = ['event_stats', eventId: string, stat: EventStat, diff: number];
 type StatDiff = AuthorStatDiff | EventStatDiff;
 
+const debug = Debug('ditto:stats');
+
 /** Store stats for the event in LMDB. */
 async function updateStats<K extends number>(event: Event<K>) {
   let prev: Event<K> | undefined;
@@ -26,6 +28,10 @@ async function updateStats<K extends number>(event: Event<K>) {
   const pubkeyDiffs = statDiffs.filter(([table]) => table === 'author_stats') as AuthorStatDiff[];
   const eventDiffs = statDiffs.filter(([table]) => table === 'event_stats') as EventStatDiff[];
 
+  if (statDiffs.length) {
+    debug(JSON.stringify({ id: event.id, pubkey: event.pubkey, kind: event.kind, tags: event.tags, statDiffs }));
+  }
+
   if (pubkeyDiffs.length) queries.push(authorStatsQuery(pubkeyDiffs));
   if (eventDiffs.length) queries.push(eventStatsQuery(eventDiffs));
 
@@ -1,9 +1,11 @@
-import { type Event } from '@/deps.ts';
+import { Debug, type Event } from '@/deps.ts';
 import { Subscription } from '@/subscription.ts';
 
 import type { DittoFilter } from '@/filter.ts';
 import type { EventData } from '@/types.ts';
 
+const debug = Debug('ditto:subs');
+
 /**
  * Manages Ditto event subscriptions.
  * Subscriptions can be added, removed, and matched against events.
@@ -21,6 +23,7 @@ class SubscriptionStore {
   * ```
   */
  sub<K extends number>(socket: unknown, id: string, filters: DittoFilter<K>[]): Subscription<K> {
+    debug('sub', id, JSON.stringify(filters));
    let subs = this.#store.get(socket);
 
    if (!subs) {
@@ -38,12 +41,14 @@ class SubscriptionStore {
 
  /** Remove a subscription from the store. */
  unsub(socket: unknown, id: string): void {
+    debug('unsub', id);
    this.#store.get(socket)?.get(id)?.close();
    this.#store.get(socket)?.delete(id);
  }
 
  /** Remove an entire socket. */
  close(socket: unknown): void {
+    debug('close', socket);
    const subs = this.#store.get(socket);
 
    if (subs) {
@@ -1,7 +1,9 @@
-import { TTLCache, z } from '@/deps.ts';
+import { Debug, TTLCache, z } from '@/deps.ts';
 import { Time } from '@/utils/time.ts';
 import { fetchWorker } from '@/workers/fetch.ts';
 
+const debug = Debug('ditto:nip05');
+
 const nip05Cache = new TTLCache<string, Promise<string | null>>({ ttl: Time.hours(1), max: 5000 });
 
 const NIP05_REGEX = /^(?:([\w.+-]+)@)?([\w.-]+)$/;
@@ -46,7 +48,7 @@ function lookupNip05Cached(value: string): Promise<string | null> {
   const cached = nip05Cache.get(value);
   if (cached !== undefined) return cached;
 
-  console.log(`Looking up NIP-05 for ${value}`);
+  debug(`Lookup ${value}`);
   const result = lookup(value);
   nip05Cache.set(value, result);
 
@@ -1,7 +1,9 @@
-import { TTLCache, unfurl } from '@/deps.ts';
+import { Debug, TTLCache, unfurl } from '@/deps.ts';
 import { Time } from '@/utils/time.ts';
 import { fetchWorker } from '@/workers/fetch.ts';
 
+const debug = Debug('ditto:unfurl');
+
 interface PreviewCard {
   url: string;
   title: string;
@@ -20,7 +22,7 @@ interface PreviewCard {
 }
 
 async function unfurlCard(url: string, signal: AbortSignal): Promise<PreviewCard | null> {
-  console.log(`Unfurling ${url}...`);
+  debug(`Unfurling ${url}...`);
   try {
     const result = await unfurl(url, {
       fetch: (url) => fetchWorker(url, { signal }),
@@ -1,13 +1,16 @@
-import { Comlink } from '@/deps.ts';
+import { Comlink, Debug } from '@/deps.ts';
 
 import './handlers/abortsignal.ts';
 
+const debug = Debug('ditto:fetch.worker');
+
 export const FetchWorker = {
   async fetch(
     url: string,
     init: Omit<RequestInit, 'signal'>,
     signal: AbortSignal | null | undefined,
   ): Promise<[BodyInit, ResponseInit]> {
+    debug(init.method, url);
     const response = await fetch(url, { ...init, signal });
     return [
       await response.arrayBuffer(),
@@ -1,9 +1,10 @@
 /// <reference lib="webworker" />
 
-import { Comlink, type CompiledQuery, DenoSqlite3, type QueryResult, Sentry } from '@/deps.ts';
+import { Comlink, type CompiledQuery, Debug, DenoSqlite3, type QueryResult, Sentry } from '@/deps.ts';
 import '@/sentry.ts';
 
 let db: DenoSqlite3 | undefined;
+const debug = Debug('ditto:sqlite.worker');
 
 export const SqliteWorker = {
   open(path: string): void {
@@ -11,6 +12,7 @@ export const SqliteWorker = {
   },
   executeQuery<R>({ sql, parameters }: CompiledQuery): QueryResult<R> {
     if (!db) throw new Error('Database not open');
+    debug(sql);
 
     const result: QueryResult<R> = Sentry.startSpan({ name: sql, op: 'db.query' }, () => {
       return {
@@ -1,307 +0,0 @@
diff --git a/src/client.ts b/src/client.ts
index 970a077..3cf2e8a 100644
--- a/src/client.ts
+++ b/src/client.ts
@@ -14,7 +14,7 @@ function getFilters<K extends number>(filters: Filter<K>[], opts: GetFiltersOpts
 
   const unsub = pool.subscribe(
     filters,
-    activeRelays,
+    opts.relays ?? activeRelays,
     (event: Event | null) => {
       if (event && matchFilters(filters, event)) {
         pipeline.handleEvent(event).catch(() => {});
diff --git a/src/common.ts b/src/common.ts
new file mode 100644
index 0000000..0424b52
--- /dev/null
+++ b/src/common.ts
@@ -0,0 +1,9 @@
+import { Reqmeister } from '@/reqmeister.ts';
+import { Time } from '@/utils/time.ts';
+
+const reqmeister = new Reqmeister({
+  delay: Time.seconds(1),
+  signal: AbortSignal.timeout(Time.seconds(1)),
+});
+
+export { reqmeister };
diff --git a/src/deps.ts b/src/deps.ts
index b9db9e2..4a9314b 100644
--- a/src/deps.ts
+++ b/src/deps.ts
@@ -81,5 +81,7 @@ export { Machina } from 'https://gitlab.com/soapbox-pub/nostr-machina/-/raw/08a1
 export * as Sentry from 'https://deno.land/x/sentry@7.78.0/index.js';
 export { sentry as sentryMiddleware } from 'npm:@hono/sentry@^1.0.0';
 export * as Comlink from 'npm:comlink@^4.4.1';
+export { EventEmitter } from 'npm:tseep@^1.1.3';
+export { default as stringifyStable } from 'npm:fast-stable-stringify@^1.0.0';
 
 export type * as TypeFest from 'npm:type-fest@^4.3.0';
diff --git a/src/filter.ts b/src/filter.ts
index fb43251..38fcff7 100644
--- a/src/filter.ts
+++ b/src/filter.ts
@@ -1,5 +1,5 @@
 import { Conf } from '@/config.ts';
-import { type Event, type Filter, matchFilters } from '@/deps.ts';
+import { type Event, type Filter, matchFilters, stringifyStable } from '@/deps.ts';
 
 import type { EventData } from '@/types.ts';
 
@@ -14,12 +14,17 @@ interface DittoFilter<K extends number = number> extends Filter<K> {
   relations?: Relation[];
 }
 
+/** Filter to get one specific event. */
+type MicroFilter = { ids: [Event['id']] } | { kinds: [0]; authors: [Event['pubkey']] };
+
 /** Additional options to apply to the whole subscription. */
 interface GetFiltersOpts {
   /** Signal to abort the request. */
   signal?: AbortSignal;
   /** Event limit for the whole subscription. */
   limit?: number;
+  /** Relays to use, if applicable. */
+  relays?: WebSocket['url'][];
 }
 
 function matchDittoFilter(filter: DittoFilter, event: Event, data: EventData): boolean {
@@ -44,4 +49,33 @@ function matchDittoFilters(filters: DittoFilter[], event: Event, data: EventData
   return false;
 }
 
-export { type DittoFilter, type GetFiltersOpts, matchDittoFilters, type Relation };
+/** Get deterministic ID for a microfilter. */
+function getFilterId(filter: MicroFilter): string {
+  if ('ids' in filter) {
+    return stringifyStable({ ids: [filter.ids] });
+  } else {
+    return stringifyStable({
+      kinds: [filter.kinds[0]],
+      authors: [filter.authors[0]],
+    });
+  }
+}
+
+/** Get a microfilter from a Nostr event. */
+function eventToMicroFilter(event: Event): MicroFilter {
+  if (event.kind === 0) {
+    return { kinds: [0], authors: [event.pubkey] };
+  } else {
+    return { ids: [event.id] };
+  }
+}
+
+export {
+  type DittoFilter,
+  eventToMicroFilter,
+  getFilterId,
+  type GetFiltersOpts,
+  matchDittoFilters,
+  type MicroFilter,
+  type Relation,
+};
diff --git a/src/pipeline.ts b/src/pipeline.ts
index adf8a84..923bf4e 100644
--- a/src/pipeline.ts
+++ b/src/pipeline.ts
@@ -1,3 +1,4 @@
+import { reqmeister } from '@/common.ts';
 import { Conf } from '@/config.ts';
 import * as eventsDB from '@/db/events.ts';
 import { addRelays } from '@/db/relays.ts';
@@ -23,15 +24,17 @@ import type { EventData } from '@/types.ts';
  */
 async function handleEvent(event: Event): Promise<void> {
   if (!(await verifySignatureWorker(event))) return;
+  const wanted = reqmeister.isWanted(event);
   if (encounterEvent(event)) return;
   console.info(`pipeline: Event<${event.kind}> ${event.id}`);
   const data = await getEventData(event);
 
   await Promise.all([
-    storeEvent(event, data),
+    storeEvent(event, data, { force: wanted }),
     processDeletions(event),
     trackRelays(event),
     trackHashtags(event),
+    fetchRelatedEvents(event, data),
     processMedia(event, data),
     streamOut(event, data),
     broadcast(event, data),
@@ -39,13 +42,14 @@ async function handleEvent(event: Event): Promise<void> {
 }
 
 /** Tracks encountered events to skip duplicates, improving idempotency and performance. */
-const encounters = new LRUCache<string, boolean>({ max: 1000 });
+const encounters = new LRUCache<Event['id'], true>({ max: 1000 });
 
 /** Encounter the event, and return whether it has already been encountered. */
-function encounterEvent(event: Event) {
+function encounterEvent(event: Event): boolean {
   const result = encounters.get(event.id);
   encounters.set(event.id, true);
-  return result;
+  reqmeister.encounter(event);
+  return !!result;
 }
 
 /** Preload data that will be useful to several tasks. */
@@ -57,11 +61,16 @@ async function getEventData({ pubkey }: Event): Promise<EventData> {
 /** Check if the pubkey is the `DITTO_NSEC` pubkey. */
 const isAdminEvent = ({ pubkey }: Event): boolean => pubkey === Conf.pubkey;
 
+interface StoreEventOpts {
+  force?: boolean;
+}
+
 /** Maybe store the event, if eligible. */
-async function storeEvent(event: Event, data: EventData): Promise<void> {
+async function storeEvent(event: Event, data: EventData, opts: StoreEventOpts = {}): Promise<void> {
   if (isEphemeralKind(event.kind)) return;
+  const { force = false } = opts;
 
-  if (data.user || isAdminEvent(event) || await isLocallyFollowed(event.pubkey)) {
+  if (force || data.user || isAdminEvent(event) || await isLocallyFollowed(event.pubkey)) {
     const [deletion] = await mixer.getFilters(
       [{ kinds: [5], authors: [event.pubkey], '#e': [event.id], limit: 1 }],
       { limit: 1, signal: AbortSignal.timeout(Time.seconds(1)) },
@@ -129,6 +138,18 @@ function trackRelays(event: Event) {
   return addRelays([...relays]);
 }
 
+/** Queue related events to fetch. */
+function fetchRelatedEvents(event: Event, data: EventData) {
+  if (!data.user) {
+    reqmeister.req({ kinds: [0], authors: [event.pubkey] }).catch(() => {});
+  }
+  for (const [name, id, relay] of event.tags) {
+    if (name === 'e' && !encounters.has(id)) {
+      reqmeister.req({ ids: [id] }, [relay]).catch(() => {});
+    }
+  }
+}
+
 /** Delete unattached media entries that are attached to the event. */
 function processMedia({ tags, pubkey }: Event, { user }: EventData) {
   if (user) {
diff --git a/src/queries.ts b/src/queries.ts
index fc7365a..1ecff7b 100644
--- a/src/queries.ts
+++ b/src/queries.ts
@@ -2,6 +2,7 @@ import * as eventsDB from '@/db/events.ts';
 import { type Event, findReplyTag } from '@/deps.ts';
 import { type DittoFilter, type Relation } from '@/filter.ts';
 import * as mixer from '@/mixer.ts';
+import { reqmeister } from '@/common.ts';
 
 interface GetEventOpts<K extends number> {
   /** Signal to abort the request. */
@@ -30,10 +31,10 @@ const getEvent = async <K extends number = number>(
 const getAuthor = async (pubkey: string, opts: GetEventOpts<0> = {}): Promise<Event<0> | undefined> => {
   const { relations, signal = AbortSignal.timeout(1000) } = opts;
 
-  const [event] = await mixer.getFilters(
+  const event = await eventsDB.getFilters(
     [{ authors: [pubkey], relations, kinds: [0], limit: 1 }],
     { limit: 1, signal },
-  );
+  ).then(([event]) => event) || await reqmeister.req({ kinds: [0], authors: [pubkey] }).catch(() => {});
 
   return event;
 };
diff --git a/src/reqmeister.ts b/src/reqmeister.ts
new file mode 100644
index 0000000..960151f
--- /dev/null
+++ b/src/reqmeister.ts
@@ -0,0 +1,88 @@
+import * as client from '@/client.ts';
+import { type Event, EventEmitter, type Filter } from '@/deps.ts';
+
+import { eventToMicroFilter, getFilterId, type MicroFilter } from '@/filter.ts';
+
+interface ReqmeisterOpts {
+  delay?: number;
+  signal?: AbortSignal;
+}
+
+type ReqmeisterQueueItem = [string, MicroFilter, WebSocket['url'][]];
+
+class Reqmeister extends EventEmitter<{ [filterId: string]: (event: Event) => any }> {
+  #opts: ReqmeisterOpts;
+  #queue: ReqmeisterQueueItem[] = [];
+  #promise!: Promise<void>;
+  #resolve!: () => void;
+
+  constructor(opts: ReqmeisterOpts = {}) {
+    super();
+    this.#opts = opts;
+    this.#cycle();
+    this.#perform();
+  }
+
+  #cycle() {
+    this.#resolve?.();
+    this.#promise = new Promise((resolve) => {
+      this.#resolve = resolve;
+    });
+  }
+
+  async #perform() {
+    const { delay } = this.#opts;
+    await new Promise((resolve) => setTimeout(resolve, delay));
+
+    const queue = this.#queue;
+    this.#queue = [];
+
+    const wantedEvents = new Set<Event['id']>();
+    const wantedAuthors = new Set<Event['pubkey']>();
+
+    // TODO: batch by relays.
+    for (const [_filterId, filter, _relays] of queue) {
+      if ('ids' in filter) {
+        filter.ids.forEach((id) => wantedEvents.add(id));
+      } else {
+        wantedAuthors.add(filter.authors[0]);
+      }
+    }
+
+    const filters: Filter[] = [];
+
+    if (wantedEvents.size) filters.push({ ids: [...wantedEvents] });
+    if (wantedAuthors.size) filters.push({ kinds: [0], authors: [...wantedAuthors] });
+
+    const events = await client.getFilters(filters, { signal: this.#opts.signal });
+
+    for (const event of events) {
+      this.encounter(event);
+    }
+
+    this.#cycle();
+    this.#perform();
+  }
+
+  req(filter: MicroFilter, relays: WebSocket['url'][] = []): Promise<Event> {
+    const filterId = getFilterId(filter);
+    this.#queue.push([filterId, filter, relays]);
+    return new Promise<Event>((resolve, reject) => {
+      this.once(filterId, resolve);
+      this.#promise.finally(reject);
+    });
+  }
+
+  encounter(event: Event): void {
+    const filterId = getFilterId(eventToMicroFilter(event));
+    this.#queue = this.#queue.filter(([id]) => id !== filterId);
+    this.emit(filterId, event);
+  }
+
+  isWanted(event: Event): boolean {
+    const filterId = getFilterId(eventToMicroFilter(event));
+    return this.#queue.some(([id]) => id === filterId);
+  }
+}
+
+export { Reqmeister };