Merge remote-tracking branch 'origin/main' into pool-worker

This commit is contained in:
Alex Gleason 2023-12-28 14:22:51 -06:00
commit 69d93b7cab
No known key found for this signature in database
GPG Key ID: 7211D1F99744FBB7
38 changed files with 594 additions and 134 deletions

1
.gitignore vendored
View File

@ -1 +1,2 @@
.env .env
*.cpuprofile

23
.vscode/launch.json vendored Normal file
View File

@ -0,0 +1,23 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"request": "launch",
"name": "Launch Program",
"type": "node",
"program": "${workspaceFolder}/src/server.ts",
"cwd": "${workspaceFolder}",
"runtimeExecutable": "deno",
"runtimeArgs": [
"run",
"--inspect-wait",
"--allow-all",
"--unstable"
],
"attachSimplePort": 9229
}
]
}

View File

@ -3,8 +3,7 @@
"lock": false, "lock": false,
"tasks": { "tasks": {
"start": "deno run -A --unstable src/server.ts", "start": "deno run -A --unstable src/server.ts",
"dev": "deno run -A --unstable --watch src/server.ts", "dev": "deno run -A --unstable --watch --inspect src/server.ts",
"debug": "deno run -A --unstable --inspect src/server.ts",
"test": "DB_PATH=\":memory:\" deno test -A --unstable", "test": "DB_PATH=\":memory:\" deno test -A --unstable",
"check": "deno check src/server.ts", "check": "deno check src/server.ts",
"relays:sync": "deno run -A --unstable scripts/relays.ts sync" "relays:sync": "deno run -A --unstable scripts/relays.ts sync"

View File

@ -0,0 +1,15 @@
{
"id": "63d38c9b483d2d98a46382eadefd272e0e4bdb106a5b6eddb400c4e76f693d35",
"pubkey": "79c2cae114ea28a981e7559b4fe7854a473521a8d22a66bbab9fa248eb820ff6",
"created_at": 1699398376,
"kind": 0,
"tags": [
[
"proxy",
"https://gleasonator.com/users/alex",
"activitypub"
]
],
"content": "{\"name\":\"Alex Gleason\",\"about\":\"I create Fediverse software that empowers people online.\\n\\nI'm vegan btw.\\n\\nNote: If you have a question for me, please tag me publicly. This gives the opportunity for others to chime in, and bystanders to learn.\",\"picture\":\"https://media.gleasonator.com/aae0071188681629f200ab41502e03b9861d2754a44c008d3869c8a08b08d1f1.png\",\"banner\":\"https://media.gleasonator.com/e5f6e0e380536780efa774e8d3c8a5a040e3f9f99dbb48910b261c32872ee3a3.gif\",\"nip05\":\"alex_at_gleasonator.com@mostr.pub\",\"lud16\":\"alex@alexgleason.me\"}",
"sig": "9d48bbb600aab44abaeee11c97f1753f1d7de08378e9b33d84f9be893a09270aeceecfde3cfb698c555ae1bde3e4e54b3463a61bb99bdf673d64c2202f98b0e9"
}

View File

@ -0,0 +1,15 @@
{
"kind": 1,
"content": "I'm vegan btw",
"tags": [
[
"proxy",
"https://gleasonator.com/objects/8f6fac53-4f66-4c6e-ac7d-92e5e78c3e79",
"activitypub"
]
],
"pubkey": "79c2cae114ea28a981e7559b4fe7854a473521a8d22a66bbab9fa248eb820ff6",
"created_at": 1691091365,
"id": "55920b758b9c7b17854b6e3d44e6a02a83d1cb49e1227e75a30426dea94d4cb2",
"sig": "a72f12c08f18e85d98fb92ae89e2fe63e48b8864c5e10fbdd5335f3c9f936397a6b0a7350efe251f8168b1601d7012d4a6d0ee6eec958067cf22a14f5a5ea579"
}

View File

@ -4,6 +4,7 @@ import { type User } from '@/db/users.ts';
import { import {
type Context, type Context,
cors, cors,
Debug,
type Event, type Event,
type Handler, type Handler,
Hono, Hono,
@ -90,7 +91,14 @@ if (Conf.sentryDsn) {
app.use('*', sentryMiddleware({ dsn: Conf.sentryDsn })); app.use('*', sentryMiddleware({ dsn: Conf.sentryDsn }));
} }
app.use('*', logger()); const debug = Debug('ditto:http');
app.use('/api', logger(debug));
app.use('/relay', logger(debug));
app.use('/.well-known', logger(debug));
app.use('/users', logger(debug));
app.use('/nodeinfo', logger(debug));
app.use('/oauth', logger(debug));
app.get('/api/v1/streaming', streamingController); app.get('/api/v1/streaming', streamingController);
app.get('/api/v1/streaming/', streamingController); app.get('/api/v1/streaming/', streamingController);

View File

@ -1,19 +1,23 @@
import { type Event, type Filter, matchFilters } from '@/deps.ts'; import { Debug, type Event, type Filter, matchFilters } from '@/deps.ts';
import * as pipeline from '@/pipeline.ts'; import * as pipeline from '@/pipeline.ts';
import { allRelays, pool } from '@/pool.ts'; import { activeRelays, pool } from '@/pool.ts';
import type { GetFiltersOpts } from '@/filter.ts'; import type { GetFiltersOpts } from '@/filter.ts';
const debug = Debug('ditto:client');
/** Get events from a NIP-01 filter. */ /** Get events from a NIP-01 filter. */
function getFilters<K extends number>(filters: Filter<K>[], opts: GetFiltersOpts = {}): Promise<Event<K>[]> { function getFilters<K extends number>(filters: Filter<K>[], opts: GetFiltersOpts = {}): Promise<Event<K>[]> {
if (opts.signal?.aborted) return Promise.resolve([]);
if (!filters.length) return Promise.resolve([]); if (!filters.length) return Promise.resolve([]);
debug('REQ', JSON.stringify(filters));
return new Promise((resolve) => { return new Promise((resolve) => {
let tid: number;
const results: Event[] = []; const results: Event[] = [];
const unsub = pool.subscribe( const unsub = pool.subscribe(
filters, filters,
allRelays, opts.relays ?? activeRelays,
(event: Event | null) => { (event: Event | null) => {
if (event && matchFilters(filters, event)) { if (event && matchFilters(filters, event)) {
pipeline.handleEvent(event).catch(() => {}); pipeline.handleEvent(event).catch(() => {});
@ -29,24 +33,20 @@ function getFilters<K extends number>(filters: Filter<K>[], opts: GetFiltersOpts
} }
if (typeof opts.limit === 'number' && results.length >= opts.limit) { if (typeof opts.limit === 'number' && results.length >= opts.limit) {
unsub(); unsub();
clearTimeout(tid);
resolve(results as Event<K>[]); resolve(results as Event<K>[]);
} }
}, },
undefined, undefined,
() => { () => {
unsub(); unsub();
clearTimeout(tid);
resolve(results as Event<K>[]); resolve(results as Event<K>[]);
}, },
); );
if (typeof opts.timeout === 'number') { opts.signal?.addEventListener('abort', () => {
tid = setTimeout(() => { unsub();
unsub(); resolve(results as Event<K>[]);
resolve(results as Event<K>[]); });
}, opts.timeout);
}
}); });
} }

View File

@ -8,7 +8,7 @@ import { getAuthor, getFollowedPubkeys, getFollows } from '@/queries.ts';
import { booleanParamSchema, fileSchema } from '@/schema.ts'; import { booleanParamSchema, fileSchema } from '@/schema.ts';
import { jsonMetaContentSchema } from '@/schemas/nostr.ts'; import { jsonMetaContentSchema } from '@/schemas/nostr.ts';
import { uploadFile } from '@/upload.ts'; import { uploadFile } from '@/upload.ts';
import { isFollowing, lookupAccount, nostrNow, Time } from '@/utils.ts'; import { isFollowing, lookupAccount, nostrNow } from '@/utils.ts';
import { paginated, paginationSchema, parseBody } from '@/utils/web.ts'; import { paginated, paginationSchema, parseBody } from '@/utils/web.ts';
import { createEvent } from '@/utils/web.ts'; import { createEvent } from '@/utils/web.ts';
import { renderEventAccounts } from '@/views.ts'; import { renderEventAccounts } from '@/views.ts';
@ -258,7 +258,7 @@ const favouritesController: AppController = async (c) => {
const events7 = await mixer.getFilters( const events7 = await mixer.getFilters(
[{ kinds: [7], authors: [pubkey], ...params }], [{ kinds: [7], authors: [pubkey], ...params }],
{ timeout: Time.seconds(1) }, { signal: AbortSignal.timeout(1000) },
); );
const ids = events7 const ids = events7
@ -266,7 +266,7 @@ const favouritesController: AppController = async (c) => {
.filter((id): id is string => !!id); .filter((id): id is string => !!id);
const events1 = await mixer.getFilters([{ kinds: [1], ids, relations: ['author', 'event_stats', 'author_stats'] }], { const events1 = await mixer.getFilters([{ kinds: [1], ids, relations: ['author', 'event_stats', 'author_stats'] }], {
timeout: Time.seconds(1), signal: AbortSignal.timeout(1000),
}); });
const statuses = await Promise.all(events1.map((event) => renderStatus(event, c.get('pubkey')))); const statuses = await Promise.all(events1.map((event) => renderStatus(event, c.get('pubkey'))));

View File

@ -10,8 +10,8 @@ const instanceController: AppController = (c) => {
return c.json({ return c.json({
uri: host, uri: host,
title: 'Ditto', title: 'Ditto',
description: 'An efficient and flexible social media server.', description: 'Nostr and the Fediverse',
short_description: 'An efficient and flexible social media server.', short_description: 'Nostr and the Fediverse',
registrations: Conf.registrations, registrations: Conf.registrations,
max_toot_chars: Conf.postCharLimit, max_toot_chars: Conf.postCharLimit,
configuration: { configuration: {

View File

@ -1,6 +1,5 @@
import { type AppController } from '@/app.ts'; import { type AppController } from '@/app.ts';
import * as mixer from '@/mixer.ts'; import * as mixer from '@/mixer.ts';
import { Time } from '@/utils.ts';
import { paginated, paginationSchema } from '@/utils/web.ts'; import { paginated, paginationSchema } from '@/utils/web.ts';
import { renderNotification } from '@/views/mastodon/notifications.ts'; import { renderNotification } from '@/views/mastodon/notifications.ts';
@ -10,7 +9,7 @@ const notificationsController: AppController = async (c) => {
const events = await mixer.getFilters( const events = await mixer.getFilters(
[{ kinds: [1], '#p': [pubkey], since, until }], [{ kinds: [1], '#p': [pubkey], since, until }],
{ timeout: Time.seconds(3) }, { signal: AbortSignal.timeout(3000) },
); );
const statuses = await Promise.all(events.map((event) => renderNotification(event, pubkey))); const statuses = await Promise.all(events.map((event) => renderNotification(event, pubkey)));

View File

@ -5,7 +5,7 @@ import { type DittoFilter } from '@/filter.ts';
import * as mixer from '@/mixer.ts'; import * as mixer from '@/mixer.ts';
import { booleanParamSchema } from '@/schema.ts'; import { booleanParamSchema } from '@/schema.ts';
import { nostrIdSchema } from '@/schemas/nostr.ts'; import { nostrIdSchema } from '@/schemas/nostr.ts';
import { dedupeEvents, Time } from '@/utils.ts'; import { dedupeEvents } from '@/utils.ts';
import { lookupNip05Cached } from '@/utils/nip05.ts'; import { lookupNip05Cached } from '@/utils/nip05.ts';
import { renderAccount } from '@/views/mastodon/accounts.ts'; import { renderAccount } from '@/views/mastodon/accounts.ts';
import { renderStatus } from '@/views/mastodon/statuses.ts'; import { renderStatus } from '@/views/mastodon/statuses.ts';
@ -93,9 +93,9 @@ function typeToKinds(type: SearchQuery['type']): number[] {
} }
/** Resolve a searched value into an event, if applicable. */ /** Resolve a searched value into an event, if applicable. */
async function lookupEvent(query: SearchQuery): Promise<Event | undefined> { async function lookupEvent(query: SearchQuery, signal = AbortSignal.timeout(1000)): Promise<Event | undefined> {
const filters = await getLookupFilters(query); const filters = await getLookupFilters(query);
const [event] = await mixer.getFilters(filters, { limit: 1, timeout: Time.seconds(1) }); const [event] = await mixer.getFilters(filters, { limit: 1, signal });
return event; return event;
} }

View File

@ -1,11 +1,13 @@
import { type AppController } from '@/app.ts'; import { type AppController } from '@/app.ts';
import { z } from '@/deps.ts'; import { Debug, z } from '@/deps.ts';
import { type DittoFilter } from '@/filter.ts'; import { type DittoFilter } from '@/filter.ts';
import { getAuthor, getFeedPubkeys } from '@/queries.ts'; import { getAuthor, getFeedPubkeys } from '@/queries.ts';
import { Sub } from '@/subs.ts'; import { Sub } from '@/subs.ts';
import { bech32ToPubkey } from '@/utils.ts'; import { bech32ToPubkey } from '@/utils.ts';
import { renderStatus } from '@/views/mastodon/statuses.ts'; import { renderStatus } from '@/views/mastodon/statuses.ts';
const debug = Debug('ditto:streaming');
/** /**
* Streaming timelines/categories. * Streaming timelines/categories.
* https://docs.joinmastodon.org/methods/streaming/#streams * https://docs.joinmastodon.org/methods/streaming/#streams
@ -49,6 +51,7 @@ const streamingController: AppController = (c) => {
function send(name: string, payload: object) { function send(name: string, payload: object) {
if (socket.readyState === WebSocket.OPEN) { if (socket.readyState === WebSocket.OPEN) {
debug('send', name, JSON.stringify(payload));
socket.send(JSON.stringify({ socket.send(JSON.stringify({
event: name, event: name,
payload: JSON.stringify(payload), payload: JSON.stringify(payload),

View File

@ -3,7 +3,6 @@ import { type DittoFilter } from '@/filter.ts';
import * as mixer from '@/mixer.ts'; import * as mixer from '@/mixer.ts';
import { getFeedPubkeys } from '@/queries.ts'; import { getFeedPubkeys } from '@/queries.ts';
import { booleanParamSchema } from '@/schema.ts'; import { booleanParamSchema } from '@/schema.ts';
import { Time } from '@/utils.ts';
import { paginated, paginationSchema } from '@/utils/web.ts'; import { paginated, paginationSchema } from '@/utils/web.ts';
import { renderStatus } from '@/views/mastodon/statuses.ts'; import { renderStatus } from '@/views/mastodon/statuses.ts';
@ -33,10 +32,10 @@ const hashtagTimelineController: AppController = (c) => {
}; };
/** Render statuses for timelines. */ /** Render statuses for timelines. */
async function renderStatuses(c: AppContext, filters: DittoFilter<1>[]) { async function renderStatuses(c: AppContext, filters: DittoFilter<1>[], signal = AbortSignal.timeout(1000)) {
const events = await mixer.getFilters( const events = await mixer.getFilters(
filters.map((filter) => ({ ...filter, relations: ['author', 'event_stats', 'author_stats'] })), filters.map((filter) => ({ ...filter, relations: ['author', 'event_stats', 'author_stats'] })),
{ timeout: Time.seconds(1) }, { signal },
); );
if (!events.length) { if (!events.length) {

View File

@ -0,0 +1,19 @@
import { AppController } from '@/app.ts';
import { Conf } from '@/config.ts';
/**
 * NIP-11 Relay Information Document handler.
 * https://github.com/nostr-protocol/nips/blob/master/11.md
 */
const relayInfoController: AppController = (c) => {
  const info = {
    name: 'Ditto',
    description: 'Nostr and the Fediverse.',
    pubkey: Conf.pubkey,
    contact: `mailto:${Conf.adminEmail}`,
    supported_nips: [1, 5, 9, 11, 45, 46, 98],
    software: 'Ditto',
    version: '0.0.0',
    // TODO: declare relay limitations (max subscriptions, payload size, etc).
    limitation: {},
  };

  return c.json(info);
};

export { relayInfoController };

View File

@ -1,3 +1,4 @@
import { relayInfoController } from '@/controllers/nostr/relay-info.ts';
import * as eventsDB from '@/db/events.ts'; import * as eventsDB from '@/db/events.ts';
import * as pipeline from '@/pipeline.ts'; import * as pipeline from '@/pipeline.ts';
import { jsonSchema } from '@/schema.ts'; import { jsonSchema } from '@/schema.ts';
@ -116,9 +117,14 @@ function prepareFilters(filters: ClientREQ[2][]): Filter[] {
})); }));
} }
const relayController: AppController = (c) => { const relayController: AppController = (c, next) => {
const upgrade = c.req.header('upgrade'); const upgrade = c.req.header('upgrade');
// NIP-11: https://github.com/nostr-protocol/nips/blob/master/11.md
if (c.req.header('accept') === 'application/nostr+json') {
return relayInfoController(c, next);
}
if (upgrade?.toLowerCase() !== 'websocket') { if (upgrade?.toLowerCase() !== 'websocket') {
return c.text('Please use a Nostr client to connect.', 400); return c.text('Please use a Nostr client to connect.', 400);
} }

View File

@ -1,25 +1,12 @@
import * as eventsDB from '@/db/events.ts';
import { deleteUnattachedMediaByUrl, getUnattachedMedia } from '@/db/unattached-media.ts'; import { deleteUnattachedMediaByUrl, getUnattachedMedia } from '@/db/unattached-media.ts';
import { cron } from '@/deps.ts'; import { cron } from '@/deps.ts';
import { Time } from '@/utils/time.ts'; import { Time } from '@/utils/time.ts';
import { configUploader as uploader } from '@/uploaders/config.ts'; import { configUploader as uploader } from '@/uploaders/config.ts';
import { cidFromUrl } from '@/utils/ipfs.ts'; import { cidFromUrl } from '@/utils/ipfs.ts';
/** Clean up old remote events. */
async function cleanupEvents() {
console.log('Cleaning up old remote events...');
const [result] = await eventsDB.deleteFilters([{
until: Math.floor((Date.now() - Time.days(7)) / 1000),
local: false,
}]);
console.log(`Cleaned up ${result?.numDeletedRows ?? 0} old remote events.`);
}
/** Delete files that aren't attached to any events. */ /** Delete files that aren't attached to any events. */
async function cleanupMedia() { async function cleanupMedia() {
console.log('Deleting orphaned media files...'); console.info('Deleting orphaned media files...');
const until = new Date(Date.now() - Time.minutes(15)); const until = new Date(Date.now() - Time.minutes(15));
const media = await getUnattachedMedia(until); const media = await getUnattachedMedia(until);
@ -35,11 +22,8 @@ async function cleanupMedia() {
} }
} }
console.log(`Removed ${media?.length ?? 0} orphaned media files.`); console.info(`Removed ${media?.length ?? 0} orphaned media files.`);
} }
await cleanupEvents();
await cleanupMedia(); await cleanupMedia();
cron.every15Minute(cleanupEvents);
cron.every15Minute(cleanupMedia); cron.every15Minute(cleanupMedia);

View File

@ -3,7 +3,7 @@ import path from 'node:path';
import { FileMigrationProvider, Kysely, Migrator, PolySqliteDialect } from '@/deps.ts'; import { FileMigrationProvider, Kysely, Migrator, PolySqliteDialect } from '@/deps.ts';
import { Conf } from '@/config.ts'; import { Conf } from '@/config.ts';
import { getPragma, setPragma } from '@/pragma.ts'; import { setPragma } from '@/pragma.ts';
import SqliteWorker from '@/workers/sqlite.ts'; import SqliteWorker from '@/workers/sqlite.ts';
interface DittoDB { interface DittoDB {
@ -89,12 +89,6 @@ await Promise.all([
setPragma(db, 'mmap_size', Conf.sqlite.mmapSize), setPragma(db, 'mmap_size', Conf.sqlite.mmapSize),
]); ]);
// Log out PRAGMA values for debugging.
['journal_mode', 'synchronous', 'temp_store', 'mmap_size'].forEach(async (pragma) => {
const value = await getPragma(db, pragma);
console.log(`PRAGMA ${pragma} = ${value};`);
});
const migrator = new Migrator({ const migrator = new Migrator({
db, db,
provider: new FileMigrationProvider({ provider: new FileMigrationProvider({
@ -106,7 +100,7 @@ const migrator = new Migrator({
/** Migrate the database to the latest version. */ /** Migrate the database to the latest version. */
async function migrate() { async function migrate() {
console.log('Running migrations...'); console.info('Running migrations...');
const results = await migrator.migrateToLatest(); const results = await migrator.migrateToLatest();
if (results.error) { if (results.error) {
@ -114,11 +108,11 @@ async function migrate() {
Deno.exit(1); Deno.exit(1);
} else { } else {
if (!results.results?.length) { if (!results.results?.length) {
console.log('Everything up-to-date.'); console.info('Everything up-to-date.');
} else { } else {
console.log('Migrations finished!'); console.info('Migrations finished!');
for (const { migrationName, status } of results.results!) { for (const { migrationName, status } of results.results!) {
console.log(` - ${migrationName}: ${status}`); console.info(` - ${migrationName}: ${status}`);
} }
} }
} }

View File

@ -1,5 +1,5 @@
import { db, type DittoDB } from '@/db.ts'; import { db, type DittoDB } from '@/db.ts';
import { type Event, type SelectQueryBuilder } from '@/deps.ts'; import { Debug, type Event, type SelectQueryBuilder } from '@/deps.ts';
import { isParameterizedReplaceableKind } from '@/kinds.ts'; import { isParameterizedReplaceableKind } from '@/kinds.ts';
import { jsonMetaContentSchema } from '@/schemas/nostr.ts'; import { jsonMetaContentSchema } from '@/schemas/nostr.ts';
import { EventData } from '@/types.ts'; import { EventData } from '@/types.ts';
@ -7,6 +7,8 @@ import { isNostrId, isURL } from '@/utils.ts';
import type { DittoFilter, GetFiltersOpts } from '@/filter.ts'; import type { DittoFilter, GetFiltersOpts } from '@/filter.ts';
const debug = Debug('ditto:db:events');
/** Function to decide whether or not to index a tag. */ /** Function to decide whether or not to index a tag. */
type TagCondition = ({ event, count, value }: { type TagCondition = ({ event, count, value }: {
event: Event; event: Event;
@ -28,6 +30,8 @@ const tagConditions: Record<string, TagCondition> = {
/** Insert an event (and its tags) into the database. */ /** Insert an event (and its tags) into the database. */
function insertEvent(event: Event, data: EventData): Promise<void> { function insertEvent(event: Event, data: EventData): Promise<void> {
debug('insertEvent', JSON.stringify(event));
return db.transaction().execute(async (trx) => { return db.transaction().execute(async (trx) => {
/** Insert the event into the database. */ /** Insert the event into the database. */
async function addEvent() { async function addEvent() {
@ -224,6 +228,7 @@ async function getFilters<K extends number>(
opts: GetFiltersOpts = {}, opts: GetFiltersOpts = {},
): Promise<DittoEvent<K>[]> { ): Promise<DittoEvent<K>[]> {
if (!filters.length) return Promise.resolve([]); if (!filters.length) return Promise.resolve([]);
debug('REQ', JSON.stringify(filters));
let query = getFiltersQuery(filters); let query = getFiltersQuery(filters);
if (typeof opts.limit === 'number') { if (typeof opts.limit === 'number') {
@ -276,6 +281,7 @@ async function getFilters<K extends number>(
/** Delete events based on filters from the database. */ /** Delete events based on filters from the database. */
function deleteFilters<K extends number>(filters: DittoFilter<K>[]) { function deleteFilters<K extends number>(filters: DittoFilter<K>[]) {
if (!filters.length) return Promise.resolve([]); if (!filters.length) return Promise.resolve([]);
debug('deleteFilters', JSON.stringify(filters));
return db.transaction().execute(async (trx) => { return db.transaction().execute(async (trx) => {
const query = getFiltersQuery(filters).clearSelect().select('id'); const query = getFiltersQuery(filters).clearSelect().select('id');
@ -293,6 +299,7 @@ function deleteFilters<K extends number>(filters: DittoFilter<K>[]) {
/** Get number of events that would be returned by filters. */ /** Get number of events that would be returned by filters. */
async function countFilters<K extends number>(filters: DittoFilter<K>[]): Promise<number> { async function countFilters<K extends number>(filters: DittoFilter<K>[]): Promise<number> {
if (!filters.length) return Promise.resolve(0); if (!filters.length) return Promise.resolve(0);
debug('countFilters', JSON.stringify(filters));
const query = getFiltersQuery(filters); const query = getFiltersQuery(filters);
const [{ count }] = await query const [{ count }] = await query

18
src/db/memorelay.test.ts Normal file
View File

@ -0,0 +1,18 @@
import { assertEquals } from '@/deps-test.ts';
import event1 from '~/fixtures/events/event-1.json' assert { type: 'json' };
import { memorelay } from './memorelay.ts';
/** Round-trip: an event is unknown before insertion and retrievable after. */
Deno.test('memorelay', async () => {
  // Unknown before insert, by value and by ID.
  assertEquals(memorelay.hasEvent(event1), false);
  assertEquals(memorelay.hasEventById(event1.id), false);

  memorelay.insertEvent(event1);

  // Known after insert.
  assertEquals(memorelay.hasEvent(event1), true);
  assertEquals(memorelay.hasEventById(event1.id), true);

  // Retrievable through an ID microfilter.
  const [event] = await memorelay.getFilters([{ ids: [event1.id] }]);
  assertEquals(event, event1);
});

69
src/db/memorelay.ts Normal file
View File

@ -0,0 +1,69 @@
import { Debug, type Event, type Filter, LRUCache } from '@/deps.ts';
import { getFilterId, type GetFiltersOpts, getMicroFilters, isMicrofilter } from '@/filter.ts';
const debug = Debug('ditto:memorelay');
const events = new LRUCache<string, Event>({
max: 3000,
maxEntrySize: 5000,
sizeCalculation: (event) => JSON.stringify(event).length,
});
/**
 * Get events from memory.
 * Only microfilters are looked up; any other filter shape yields no results.
 */
function getFilters<K extends number>(filters: Filter<K>[], opts: GetFiltersOpts = {}): Promise<Event<K>[]> {
  // Bail out early on an already-aborted signal or an empty request.
  if (opts.signal?.aborted || !filters.length) return Promise.resolve([]);
  debug('REQ', JSON.stringify(filters));

  const matches = filters
    .filter(isMicrofilter)
    .map((filter) => events.get(getFilterId(filter)))
    .filter((event): event is Event<K> => event !== undefined);

  return Promise.resolve(matches);
}
/** Insert an event into memory. */
function insertEvent(event: Event): void {
for (const microfilter of getMicroFilters(event)) {
const filterId = getFilterId(microfilter);
const existing = events.get(filterId);
if (!existing || event.created_at > existing.created_at) {
events.set(filterId, event);
}
}
}
/** Check if an event is in memory. */
function hasEvent(event: Event): boolean {
for (const microfilter of getMicroFilters(event)) {
const filterId = getFilterId(microfilter);
const existing = events.get(filterId);
if (existing) {
return true;
}
}
return false;
}
/** Check if an event is in memory by ID. */
function hasEventById(eventId: string): boolean {
const filterId = getFilterId({ ids: [eventId] });
return events.has(filterId);
}
/** In-memory data store for events, addressed by microfilters. */
const memorelay = { getFilters, insertEvent, hasEvent, hasEventById };

export { memorelay };

View File

@ -1,7 +1,9 @@
import { type Insertable } from '@/deps.ts'; import { Debug, type Insertable } from '@/deps.ts';
import { db, type UserRow } from '../db.ts'; import { db, type UserRow } from '../db.ts';
const debug = Debug('ditto:users');
interface User { interface User {
pubkey: string; pubkey: string;
username: string; username: string;
@ -11,6 +13,7 @@ interface User {
/** Adds a user to the database. */ /** Adds a user to the database. */
function insertUser(user: Insertable<UserRow>) { function insertUser(user: Insertable<UserRow>) {
debug('insertUser', JSON.stringify(user));
return db.insertInto('users').values(user).execute(); return db.insertInto('users').values(user).execute();
} }

View File

@ -9,7 +9,7 @@ export {
} from 'https://deno.land/x/hono@v3.10.1/mod.ts'; } from 'https://deno.land/x/hono@v3.10.1/mod.ts';
export { cors, logger, serveStatic } from 'https://deno.land/x/hono@v3.10.1/middleware.ts'; export { cors, logger, serveStatic } from 'https://deno.land/x/hono@v3.10.1/middleware.ts';
export { z } from 'https://deno.land/x/zod@v3.21.4/mod.ts'; export { z } from 'https://deno.land/x/zod@v3.21.4/mod.ts';
export { Author, RelayPoolWorker } from 'https://dev.jspm.io/nostr-relaypool@0.6.30'; export { RelayPoolWorker } from 'https://dev.jspm.io/nostr-relaypool@0.6.30';
export { export {
type Event, type Event,
type EventTemplate, type EventTemplate,
@ -81,5 +81,9 @@ export { Machina } from 'https://gitlab.com/soapbox-pub/nostr-machina/-/raw/08a1
export * as Sentry from 'https://deno.land/x/sentry@7.78.0/index.js'; export * as Sentry from 'https://deno.land/x/sentry@7.78.0/index.js';
export { sentry as sentryMiddleware } from 'npm:@hono/sentry@^1.0.0'; export { sentry as sentryMiddleware } from 'npm:@hono/sentry@^1.0.0';
export * as Comlink from 'npm:comlink@^4.4.1'; export * as Comlink from 'npm:comlink@^4.4.1';
export { EventEmitter } from 'npm:tseep@^1.1.3';
export { default as stringifyStable } from 'npm:fast-stable-stringify@^1.0.0';
// @deno-types="npm:@types/debug@^4.1.12"
export { default as Debug } from 'npm:debug@^4.3.4';
export type * as TypeFest from 'npm:type-fest@^4.3.0'; export type * as TypeFest from 'npm:type-fest@^4.3.0';

37
src/filter.test.ts Normal file
View File

@ -0,0 +1,37 @@
import { type Event } from '@/deps.ts';
import { assertEquals } from '@/deps-test.ts';
import event0 from '~/fixtures/events/event-0.json' assert { type: 'json' };
import event1 from '~/fixtures/events/event-1.json' assert { type: 'json' };
import { eventToMicroFilter, getFilterId, getMicroFilters, isMicrofilter } from './filter.ts';
// getMicroFilters: a kind 0 event yields two microfilters, author filter first.
Deno.test('getMicroFilters', () => {
const event = event0 as Event<0>;
const microfilters = getMicroFilters(event);
assertEquals(microfilters.length, 2);
// The author microfilter takes priority for kind 0 (profile) events.
assertEquals(microfilters[0], { authors: [event.pubkey], kinds: [0] });
assertEquals(microfilters[1], { ids: [event.id] });
});
// eventToMicroFilter: returns only the highest-priority microfilter.
Deno.test('eventToMicroFilter', () => {
assertEquals(eventToMicroFilter(event0), { authors: [event0.pubkey], kinds: [0] });
assertEquals(eventToMicroFilter(event1), { ids: [event1.id] });
});
Deno.test('isMicrofilter', () => {
assertEquals(isMicrofilter({ ids: [event0.id] }), true);
assertEquals(isMicrofilter({ authors: [event0.pubkey], kinds: [0] }), true);
// A filter combining both shapes is rejected (the schemas are .strict()).
assertEquals(isMicrofilter({ ids: [event0.id], authors: [event0.pubkey], kinds: [0] }), false);
});
// getFilterId: deterministic JSON with sorted keys ("authors" before "kinds"),
// so equal filters always map to the same cache key.
Deno.test('getFilterId', () => {
assertEquals(
getFilterId({ ids: [event0.id] }),
'{"ids":["63d38c9b483d2d98a46382eadefd272e0e4bdb106a5b6eddb400c4e76f693d35"]}',
);
assertEquals(
getFilterId({ authors: [event0.pubkey], kinds: [0] }),
'{"authors":["79c2cae114ea28a981e7559b4fe7854a473521a8d22a66bbab9fa248eb820ff6"],"kinds":[0]}',
);
});

View File

@ -1,7 +1,7 @@
import { Conf } from '@/config.ts'; import { Conf } from '@/config.ts';
import { type Event, type Filter, matchFilters } from '@/deps.ts'; import { type Event, type Filter, matchFilters, stringifyStable, z } from '@/deps.ts';
import { nostrIdSchema } from '@/schemas/nostr.ts';
import type { EventData } from '@/types.ts'; import { type EventData } from '@/types.ts';
/** Additional properties that may be added by Ditto to events. */ /** Additional properties that may be added by Ditto to events. */
type Relation = 'author' | 'author_stats' | 'event_stats'; type Relation = 'author' | 'author_stats' | 'event_stats';
@ -14,12 +14,21 @@ interface DittoFilter<K extends number = number> extends Filter<K> {
relations?: Relation[]; relations?: Relation[];
} }
/** Microfilter to get one specific event by ID. */
type IdMicrofilter = { ids: [Event['id']] };
/** Microfilter to get an author. */
type AuthorMicrofilter = { kinds: [0]; authors: [Event['pubkey']] };
/** Filter to get one specific event. */
type MicroFilter = IdMicrofilter | AuthorMicrofilter;
/** Additional options to apply to the whole subscription. */ /** Additional options to apply to the whole subscription. */
interface GetFiltersOpts { interface GetFiltersOpts {
/** How long to wait (in milliseconds) until aborting the request. */ /** Signal to abort the request. */
timeout?: number; signal?: AbortSignal;
/** Event limit for the whole subscription. */ /** Event limit for the whole subscription. */
limit?: number; limit?: number;
/** Relays to use, if applicable. */
relays?: WebSocket['url'][];
} }
function matchDittoFilter(filter: DittoFilter, event: Event, data: EventData): boolean { function matchDittoFilter(filter: DittoFilter, event: Event, data: EventData): boolean {
@ -44,4 +53,55 @@ function matchDittoFilters(filters: DittoFilter[], event: Event, data: EventData
return false; return false;
} }
export { type DittoFilter, type GetFiltersOpts, matchDittoFilters, type Relation }; /** Get deterministic ID for a microfilter. */
/** Get deterministic ID for a microfilter. */
function getFilterId(filter: MicroFilter): string {
  // stringifyStable sorts object keys, so equal filters produce equal IDs.
  return 'ids' in filter
    ? stringifyStable({ ids: [filter.ids[0]] })
    : stringifyStable({ kinds: [filter.kinds[0]], authors: [filter.authors[0]] });
}
/** Get a microfilter from a Nostr event. */
function eventToMicroFilter(event: Event): MicroFilter {
const [microfilter] = getMicroFilters(event);
return microfilter;
}
/** Get all the microfilters for an event, in order of priority. */
function getMicroFilters(event: Event): MicroFilter[] {
  // Kind 0 (profile) events are also addressable by author; that filter
  // takes priority over the plain ID filter.
  const prioritized: MicroFilter[] = event.kind === 0 ? [{ kinds: [0], authors: [event.pubkey] }] : [];
  return [...prioritized, { ids: [event.id] }];
}
/** Matches `{ ids: [<id>] }` and nothing else. */
const idMicrofilterSchema = z.object({ ids: z.tuple([nostrIdSchema]) }).strict();

/** Matches `{ kinds: [0], authors: [<pubkey>] }` and nothing else. */
const authorMicrofilterSchema = z.object({ kinds: z.tuple([z.literal(0)]), authors: z.tuple([nostrIdSchema]) }).strict();

/** Microfilter schema. */
const microFilterSchema = z.union([idMicrofilterSchema, authorMicrofilterSchema]);

/** Checks whether the filter is a microfilter. */
function isMicrofilter(filter: Filter): filter is MicroFilter {
  return microFilterSchema.safeParse(filter).success;
}
export {
type AuthorMicrofilter,
type DittoFilter,
eventToMicroFilter,
getFilterId,
type GetFiltersOpts,
getMicroFilters,
type IdMicrofilter,
isMicrofilter,
matchDittoFilters,
type MicroFilter,
type Relation,
};

View File

@ -1,9 +1,11 @@
import { type Event } from '@/deps.ts'; import { Debug, type Event } from '@/deps.ts';
import { activeRelays, pool } from '@/pool.ts'; import { activeRelays, pool } from '@/pool.ts';
import { nostrNow } from '@/utils.ts'; import { nostrNow } from '@/utils.ts';
import * as pipeline from './pipeline.ts'; import * as pipeline from './pipeline.ts';
const debug = Debug('ditto:firehose');
// This file watches events on all known relays and performs // This file watches events on all known relays and performs
// side-effects based on them, such as trending hashtag tracking // side-effects based on them, such as trending hashtag tracking
// and storing events for notifications and the home feed. // and storing events for notifications and the home feed.
@ -17,7 +19,7 @@ pool.subscribe(
/** Handle events through the firehose pipeline. */ /** Handle events through the firehose pipeline. */
function handleEvent(event: Event): Promise<void> { function handleEvent(event: Event): Promise<void> {
console.info(`firehose: Event<${event.kind}> ${event.id}`); debug(`Event<${event.kind}> ${event.id}`);
return pipeline return pipeline
.handleEvent(event) .handleEvent(event)

View File

@ -1,6 +1,7 @@
import { Debug, type MiddlewareHandler } from '@/deps.ts';
import ExpiringCache from '@/utils/expiring-cache.ts'; import ExpiringCache from '@/utils/expiring-cache.ts';
import type { MiddlewareHandler } from '@/deps.ts'; const debug = Debug('ditto:middleware:cache');
export const cache = (options: { export const cache = (options: {
cacheName: string; cacheName: string;
@ -11,14 +12,14 @@ export const cache = (options: {
const cache = new ExpiringCache(await caches.open(options.cacheName)); const cache = new ExpiringCache(await caches.open(options.cacheName));
const response = await cache.match(key); const response = await cache.match(key);
if (!response) { if (!response) {
console.debug('Building cache for page', c.req.url); debug('Building cache for page', c.req.url);
await next(); await next();
const response = c.res.clone(); const response = c.res.clone();
if (response.status < 500) { if (response.status < 500) {
await cache.putExpiring(key, response, options.expires ?? 0); await cache.putExpiring(key, response, options.expires ?? 0);
} }
} else { } else {
console.debug('Serving page from cache', c.req.url); debug('Serving page from cache', c.req.url);
return response; return response;
} }
}; };

View File

@ -43,7 +43,7 @@ interface ParsedNoteContent {
function parseNoteContent(content: string): ParsedNoteContent { function parseNoteContent(content: string): ParsedNoteContent {
// Parsing twice is ineffecient, but I don't know how to do only once. // Parsing twice is ineffecient, but I don't know how to do only once.
const html = linkifyStr(content, linkifyOpts); const html = linkifyStr(content, linkifyOpts);
const links = linkify.find(content).filter(isValidLink); const links = linkify.find(content).filter(isLinkURL);
const firstUrl = links.find(isNonMediaLink)?.href; const firstUrl = links.find(isNonMediaLink)?.href;
return { return {
@ -77,15 +77,9 @@ function isNonMediaLink({ href }: Link): boolean {
return /^https?:\/\//.test(href) && !getUrlMimeType(href); return /^https?:\/\//.test(href) && !getUrlMimeType(href);
} }
/** Ensures the URL can be parsed. Why linkifyjs doesn't already guarantee this, idk... */ /** Ensures the Link is a URL so it can be parsed. */
function isValidLink(link: Link): boolean { function isLinkURL(link: Link): boolean {
try { return link.type === 'url';
new URL(link.href);
return true;
} catch (_e) {
console.error(`Invalid link: ${link.href}`);
return false;
}
} }
/** `npm:mime` treats `.com` as a file extension, so parse the full URL to get its path first. */ /** `npm:mime` treats `.com` as a file extension, so parse the full URL to get its path first. */

View File

@ -1,21 +1,24 @@
import { Conf } from '@/config.ts'; import { Conf } from '@/config.ts';
import * as eventsDB from '@/db/events.ts'; import * as eventsDB from '@/db/events.ts';
import { memorelay } from '@/db/memorelay.ts';
import { addRelays } from '@/db/relays.ts'; import { addRelays } from '@/db/relays.ts';
import { deleteAttachedMedia } from '@/db/unattached-media.ts'; import { deleteAttachedMedia } from '@/db/unattached-media.ts';
import { findUser } from '@/db/users.ts'; import { findUser } from '@/db/users.ts';
import { type Event, LRUCache } from '@/deps.ts'; import { Debug, type Event } from '@/deps.ts';
import { isEphemeralKind } from '@/kinds.ts'; import { isEphemeralKind } from '@/kinds.ts';
import * as mixer from '@/mixer.ts'; import * as mixer from '@/mixer.ts';
import { publish } from '@/pool.ts'; import { publish } from '@/pool.ts';
import { isLocallyFollowed } from '@/queries.ts'; import { isLocallyFollowed } from '@/queries.ts';
import { reqmeister } from '@/reqmeister.ts';
import { updateStats } from '@/stats.ts'; import { updateStats } from '@/stats.ts';
import { Sub } from '@/subs.ts'; import { Sub } from '@/subs.ts';
import { getTagSet } from '@/tags.ts'; import { getTagSet } from '@/tags.ts';
import { type EventData } from '@/types.ts';
import { eventAge, isRelay, nostrDate, Time } from '@/utils.ts'; import { eventAge, isRelay, nostrDate, Time } from '@/utils.ts';
import { TrendsWorker } from '@/workers/trends.ts'; import { TrendsWorker } from '@/workers/trends.ts';
import { verifySignatureWorker } from '@/workers/verify.ts'; import { verifySignatureWorker } from '@/workers/verify.ts';
import type { EventData } from '@/types.ts'; const debug = Debug('ditto:pipeline');
/** /**
* Common pipeline function to process (and maybe store) events. * Common pipeline function to process (and maybe store) events.
@ -23,28 +26,29 @@ import type { EventData } from '@/types.ts';
*/ */
async function handleEvent(event: Event): Promise<void> { async function handleEvent(event: Event): Promise<void> {
if (!(await verifySignatureWorker(event))) return; if (!(await verifySignatureWorker(event))) return;
const wanted = reqmeister.isWanted(event);
if (encounterEvent(event)) return; if (encounterEvent(event)) return;
debug(`Event<${event.kind}> ${event.id}`);
const data = await getEventData(event); const data = await getEventData(event);
await Promise.all([ await Promise.all([
storeEvent(event, data), storeEvent(event, data, { force: wanted }),
processDeletions(event), processDeletions(event),
trackRelays(event), trackRelays(event),
trackHashtags(event), trackHashtags(event),
fetchRelatedEvents(event, data),
processMedia(event, data), processMedia(event, data),
streamOut(event, data), streamOut(event, data),
broadcast(event, data), broadcast(event, data),
]); ]);
} }
/** Tracks encountered events to skip duplicates, improving idempotency and performance. */
const encounters = new LRUCache<string, boolean>({ max: 1000 });
/** Encounter the event, and return whether it has already been encountered. */ /** Encounter the event, and return whether it has already been encountered. */
function encounterEvent(event: Event) { function encounterEvent(event: Event): boolean {
const result = encounters.get(event.id); const preexisting = memorelay.hasEvent(event);
encounters.set(event.id, true); memorelay.insertEvent(event);
return result; reqmeister.encounter(event);
return preexisting;
} }
/** Preload data that will be useful to several tasks. */ /** Preload data that will be useful to several tasks. */
@ -56,22 +60,27 @@ async function getEventData({ pubkey }: Event): Promise<EventData> {
/** Check if the pubkey is the `DITTO_NSEC` pubkey. */ /** Check if the pubkey is the `DITTO_NSEC` pubkey. */
const isAdminEvent = ({ pubkey }: Event): boolean => pubkey === Conf.pubkey; const isAdminEvent = ({ pubkey }: Event): boolean => pubkey === Conf.pubkey;
/** Maybe store the event, if eligible. */ interface StoreEventOpts {
async function storeEvent(event: Event, data: EventData): Promise<void> { force?: boolean;
if (isEphemeralKind(event.kind)) return; }
if (data.user || isAdminEvent(event) || await isLocallyFollowed(event.pubkey)) { /** Maybe store the event, if eligible. */
async function storeEvent(event: Event, data: EventData, opts: StoreEventOpts = {}): Promise<void> {
if (isEphemeralKind(event.kind)) return;
const { force = false } = opts;
if (force || data.user || isAdminEvent(event) || await isLocallyFollowed(event.pubkey)) {
const [deletion] = await mixer.getFilters( const [deletion] = await mixer.getFilters(
[{ kinds: [5], authors: [event.pubkey], '#e': [event.id], limit: 1 }], [{ kinds: [5], authors: [event.pubkey], '#e': [event.id], limit: 1 }],
{ limit: 1, timeout: Time.seconds(1) }, { limit: 1, signal: AbortSignal.timeout(Time.seconds(1)) },
); );
if (deletion) { if (deletion) {
return Promise.reject(new RelayError('blocked', 'event was deleted')); return Promise.reject(new RelayError('blocked', 'event was deleted'));
} else { } else {
await Promise.all([ await Promise.all([
eventsDB.insertEvent(event, data).catch(console.warn), eventsDB.insertEvent(event, data).catch(debug),
updateStats(event).catch(console.warn), updateStats(event).catch(debug),
]); ]);
} }
} else { } else {
@ -105,7 +114,7 @@ async function trackHashtags(event: Event): Promise<void> {
if (!tags.length) return; if (!tags.length) return;
try { try {
console.info('tracking tags:', tags); debug('tracking tags:', JSON.stringify(tags));
await TrendsWorker.addTagUsages(event.pubkey, tags, date); await TrendsWorker.addTagUsages(event.pubkey, tags, date);
} catch (_e) { } catch (_e) {
// do nothing // do nothing
@ -128,6 +137,18 @@ function trackRelays(event: Event) {
return addRelays([...relays]); return addRelays([...relays]);
} }
/** Queue related events to fetch. */
function fetchRelatedEvents(event: Event, data: EventData) {
  // Request the author's kind-0 metadata unless they're a known local user.
  if (!data.user) {
    reqmeister.req({ kinds: [0], authors: [event.pubkey] }).catch(() => {});
  }
  // Request any referenced events ('e' tags) not already in the memory
  // relay, passing along the tag's relay hint when present.
  for (const tag of event.tags) {
    const [name, id, relay] = tag;
    if (name !== 'e') continue;
    if (memorelay.hasEventById(id)) continue;
    reqmeister.req({ ids: [id] }, { relays: [relay] }).catch(() => {});
  }
}
/** Delete unattached media entries that are attached to the event. */ /** Delete unattached media entries that are attached to the event. */
function processMedia({ tags, pubkey }: Event, { user }: EventData) { function processMedia({ tags, pubkey }: Event, { user }: EventData) {
if (user) { if (user) {

View File

@ -1,5 +1,7 @@
import { getActiveRelays } from '@/db/relays.ts'; import { getActiveRelays } from '@/db/relays.ts';
import { type Event, RelayPoolWorker } from '@/deps.ts'; import { Debug, type Event, RelayPoolWorker } from '@/deps.ts';
const debug = Debug('ditto:pool');
const activeRelays = await getActiveRelays(); const activeRelays = await getActiveRelays();
@ -17,6 +19,7 @@ const pool = new RelayPoolWorker(worker, activeRelays, {
/** Publish an event to the given relays, or the entire pool. */ /** Publish an event to the given relays, or the entire pool. */
function publish(event: Event, relays: string[] = activeRelays) { function publish(event: Event, relays: string[] = activeRelays) {
debug('publish', event);
return pool.publish(event, relays); return pool.publish(event, relays);
} }

View File

@ -1,11 +1,13 @@
import * as eventsDB from '@/db/events.ts'; import * as eventsDB from '@/db/events.ts';
import { type Event, findReplyTag } from '@/deps.ts'; import { type Event, findReplyTag } from '@/deps.ts';
import { type DittoFilter, type Relation } from '@/filter.ts'; import { type AuthorMicrofilter, type DittoFilter, type IdMicrofilter, type Relation } from '@/filter.ts';
import * as mixer from '@/mixer.ts'; import * as mixer from '@/mixer.ts';
import { reqmeister } from '@/reqmeister.ts';
import { memorelay } from '@/db/memorelay.ts';
interface GetEventOpts<K extends number> { interface GetEventOpts<K extends number> {
/** Timeout in milliseconds. */ /** Signal to abort the request. */
timeout?: number; signal?: AbortSignal;
/** Event kind. */ /** Event kind. */
kind?: K; kind?: K;
/** Relations to include on the event. */ /** Relations to include on the event. */
@ -17,36 +19,73 @@ const getEvent = async <K extends number = number>(
id: string, id: string,
opts: GetEventOpts<K> = {}, opts: GetEventOpts<K> = {},
): Promise<Event<K> | undefined> => { ): Promise<Event<K> | undefined> => {
const { kind, relations, timeout = 1000 } = opts; const { kind, relations, signal = AbortSignal.timeout(1000) } = opts;
const microfilter: IdMicrofilter = { ids: [id] };
const [memoryEvent] = await memorelay.getFilters([microfilter], opts) as eventsDB.DittoEvent<K>[];
if (memoryEvent && !relations) {
return memoryEvent;
}
const filter: DittoFilter<K> = { ids: [id], relations, limit: 1 }; const filter: DittoFilter<K> = { ids: [id], relations, limit: 1 };
if (kind) { if (kind) {
filter.kinds = [kind]; filter.kinds = [kind];
} }
const [event] = await mixer.getFilters([filter], { limit: 1, timeout });
return event; const dbEvent = await eventsDB.getFilters([filter], { limit: 1, signal })
.then(([event]) => event);
// TODO: make this DRY-er.
if (dbEvent && !dbEvent.author) {
const [author] = await memorelay.getFilters([{ kinds: [0], authors: [dbEvent.pubkey] }], opts);
dbEvent.author = author;
}
if (dbEvent) return dbEvent;
if (memoryEvent && !memoryEvent.author) {
const [author] = await memorelay.getFilters([{ kinds: [0], authors: [memoryEvent.pubkey] }], opts);
memoryEvent.author = author;
}
if (memoryEvent) return memoryEvent;
return await reqmeister.req(microfilter, opts).catch(() => undefined) as Event<K> | undefined;
}; };
/** Get a Nostr `set_metadata` event for a user's pubkey. */ /** Get a Nostr `set_metadata` event for a user's pubkey. */
const getAuthor = async (pubkey: string, opts: GetEventOpts<0> = {}): Promise<Event<0> | undefined> => { const getAuthor = async (pubkey: string, opts: GetEventOpts<0> = {}): Promise<Event<0> | undefined> => {
const { relations, timeout = 1000 } = opts; const { relations, signal = AbortSignal.timeout(1000) } = opts;
const microfilter: AuthorMicrofilter = { kinds: [0], authors: [pubkey] };
const [event] = await mixer.getFilters( const [memoryEvent] = await memorelay.getFilters([microfilter], opts);
if (memoryEvent && !relations) {
return memoryEvent;
}
const dbEvent = await eventsDB.getFilters(
[{ authors: [pubkey], relations, kinds: [0], limit: 1 }], [{ authors: [pubkey], relations, kinds: [0], limit: 1 }],
{ limit: 1, timeout }, { limit: 1, signal },
); ).then(([event]) => event);
return event; if (dbEvent) return dbEvent;
if (memoryEvent) return memoryEvent;
return reqmeister.req(microfilter, opts).catch(() => undefined);
}; };
/** Get users the given pubkey follows. */ /** Get users the given pubkey follows. */
const getFollows = async (pubkey: string, timeout = 1000): Promise<Event<3> | undefined> => { const getFollows = async (pubkey: string, signal = AbortSignal.timeout(1000)): Promise<Event<3> | undefined> => {
const [event] = await mixer.getFilters([{ authors: [pubkey], kinds: [3], limit: 1 }], { limit: 1, timeout }); const [event] = await mixer.getFilters([{ authors: [pubkey], kinds: [3], limit: 1 }], { limit: 1, signal });
return event; return event;
}; };
/** Get pubkeys the user follows. */ /** Get pubkeys the user follows. */
async function getFollowedPubkeys(pubkey: string): Promise<string[]> { async function getFollowedPubkeys(pubkey: string, signal?: AbortSignal): Promise<string[]> {
const event = await getFollows(pubkey); const event = await getFollows(pubkey, signal);
if (!event) return []; if (!event) return [];
return event.tags return event.tags
@ -78,10 +117,10 @@ async function getAncestors(event: Event<1>, result = [] as Event<1>[]): Promise
return result.reverse(); return result.reverse();
} }
function getDescendants(eventId: string): Promise<Event<1>[]> { function getDescendants(eventId: string, signal = AbortSignal.timeout(2000)): Promise<Event<1>[]> {
return mixer.getFilters( return mixer.getFilters(
[{ kinds: [1], '#e': [eventId], relations: ['author', 'event_stats', 'author_stats'] }], [{ kinds: [1], '#e': [eventId], relations: ['author', 'event_stats', 'author_stats'] }],
{ limit: 200, timeout: 2000 }, { limit: 200, signal },
); );
} }

113
src/reqmeister.ts Normal file
View File

@ -0,0 +1,113 @@
import * as client from '@/client.ts';
import { Debug, type Event, EventEmitter, type Filter } from '@/deps.ts';
import { AuthorMicrofilter, eventToMicroFilter, getFilterId, IdMicrofilter, type MicroFilter } from '@/filter.ts';
import { Time } from '@/utils/time.ts';
const debug = Debug('ditto:reqmeister');
/** Options for constructing a Reqmeister. */
interface ReqmeisterOpts {
  /** Time (ms) to wait, collecting requests, before each batch is sent. */
  delay?: number;
  /** Time (ms) before the batched relay request is aborted. */
  timeout?: number;
}

/** Options for a single `req` call. */
interface ReqmeisterReqOpts {
  /** Relay URLs to prefer for this request, if any. */
  relays?: WebSocket['url'][];
  /** Signal to abort waiting for the requested event. */
  signal?: AbortSignal;
}

/** Queued request: [filter ID, the microfilter itself, preferred relay URLs]. */
type ReqmeisterQueueItem = [string, MicroFilter, WebSocket['url'][]];
/** Batches requests to Nostr relays using microfilters. */
class Reqmeister extends EventEmitter<{ [filterId: string]: (event: Event) => any }> {
  // Constructor options (delay/timeout).
  #opts: ReqmeisterOpts;
  // Requests collected since the last batch was flushed.
  #queue: ReqmeisterQueueItem[] = [];
  // Promise that settles when the current batch cycle completes;
  // `req` hangs rejection of unanswered requests off of it.
  #promise!: Promise<void>;
  // Resolver for #promise, swapped out on every #tick().
  #resolve!: () => void;

  constructor(opts: ReqmeisterOpts = {}) {
    super();
    this.#opts = opts;
    this.#tick();
    // Kick off the perpetual batch loop (it re-invokes itself).
    this.#perform();
  }

  /** Resolve the current cycle's promise and install a fresh one. */
  #tick() {
    this.#resolve?.();
    this.#promise = new Promise((resolve) => {
      this.#resolve = resolve;
    });
  }

  /**
   * One batch cycle: wait `delay` ms, drain the queue, merge all queued
   * microfilters into at most two REQ filters (event IDs + kind-0 authors),
   * send them, and feed results back through `encounter`. Recurses to run
   * forever.
   */
  async #perform() {
    const { delay, timeout = Time.seconds(1) } = this.#opts;
    await new Promise((resolve) => setTimeout(resolve, delay));

    // Snapshot and clear the queue so requests made during the fetch
    // land in the next batch.
    const queue = this.#queue;
    this.#queue = [];

    const wantedEvents = new Set<Event['id']>();
    const wantedAuthors = new Set<Event['pubkey']>();

    // TODO: batch by relays.
    for (const [_filterId, filter, _relays] of queue) {
      if ('ids' in filter) {
        filter.ids.forEach((id) => wantedEvents.add(id));
      } else {
        wantedAuthors.add(filter.authors[0]);
      }
    }

    const filters: Filter[] = [];

    if (wantedEvents.size) filters.push({ ids: [...wantedEvents] });
    if (wantedAuthors.size) filters.push({ kinds: [0], authors: [...wantedAuthors] });

    if (filters.length) {
      debug('REQ', JSON.stringify(filters));
      const events = await client.getFilters(filters, { signal: AbortSignal.timeout(timeout) });
      for (const event of events) {
        // Resolves matching `req` promises and de-queues their filters.
        this.encounter(event);
      }
    }

    this.#tick();
    this.#perform();
  }

  req(filter: IdMicrofilter, opts?: ReqmeisterReqOpts): Promise<Event>;
  req(filter: AuthorMicrofilter, opts?: ReqmeisterReqOpts): Promise<Event<0>>;
  req(filter: MicroFilter, opts?: ReqmeisterReqOpts): Promise<Event>;
  /**
   * Request an event matching the microfilter. Resolves when a matching
   * event is encountered; rejects just after the current batch cycle
   * completes without a match, or when `signal` aborts.
   */
  req(filter: MicroFilter, opts: ReqmeisterReqOpts = {}): Promise<Event> {
    const { relays = [], signal } = opts;
    if (signal?.aborted) return Promise.reject(new DOMException('Aborted', 'AbortError'));
    const filterId = getFilterId(filter);
    this.#queue.push([filterId, filter, relays]);
    return new Promise<Event>((resolve, reject) => {
      this.once(filterId, resolve);
      // setTimeout defers the reject one macrotask so that an `emit` fired
      // during the same cycle wins the race and resolves first.
      this.#promise.finally(() => setTimeout(reject, 0));
      // NOTE(review): on abort the `once` listener and the queue entry are
      // not removed, so they linger until the next batch — confirm whether
      // cleanup is needed here.
      signal?.addEventListener('abort', () => reject(new DOMException('Aborted', 'AbortError')), { once: true });
    });
  }

  /** Feed an event in: drop its filter from the queue and notify waiters. */
  encounter(event: Event): void {
    const filterId = getFilterId(eventToMicroFilter(event));
    this.#queue = this.#queue.filter(([id]) => id !== filterId);
    this.emit(filterId, event);
  }

  /** Whether some queued request is currently waiting for this event. */
  isWanted(event: Event): boolean {
    const filterId = getFilterId(eventToMicroFilter(event));
    return this.#queue.some(([id]) => id === filterId);
  }
}
// Shared singleton: batch requests every second, abort each relay
// REQ after one second.
const reqmeister = new Reqmeister({
  delay: Time.seconds(1),
  timeout: Time.seconds(1),
});

export { reqmeister };

View File

@ -1,13 +1,15 @@
import { type AppContext } from '@/app.ts'; import { type AppContext } from '@/app.ts';
import { Conf } from '@/config.ts'; import { Conf } from '@/config.ts';
import { decryptAdmin, encryptAdmin } from '@/crypto.ts'; import { decryptAdmin, encryptAdmin } from '@/crypto.ts';
import { type Event, type EventTemplate, finishEvent, HTTPException } from '@/deps.ts'; import { Debug, type Event, type EventTemplate, finishEvent, HTTPException } from '@/deps.ts';
import { connectResponseSchema } from '@/schemas/nostr.ts'; import { connectResponseSchema } from '@/schemas/nostr.ts';
import { jsonSchema } from '@/schema.ts'; import { jsonSchema } from '@/schema.ts';
import { Sub } from '@/subs.ts'; import { Sub } from '@/subs.ts';
import { eventMatchesTemplate, Time } from '@/utils.ts'; import { eventMatchesTemplate, Time } from '@/utils.ts';
import { createAdminEvent } from '@/utils/web.ts'; import { createAdminEvent } from '@/utils/web.ts';
const debug = Debug('ditto:sign');
interface SignEventOpts { interface SignEventOpts {
/** Target proof-of-work difficulty for the signed event. */ /** Target proof-of-work difficulty for the signed event. */
pow?: number; pow?: number;
@ -28,10 +30,12 @@ async function signEvent<K extends number = number>(
const header = c.req.header('x-nostr-sign'); const header = c.req.header('x-nostr-sign');
if (seckey) { if (seckey) {
debug(`Signing Event<${event.kind}> with secret key`);
return finishEvent(event, seckey); return finishEvent(event, seckey);
} }
if (header) { if (header) {
debug(`Signing Event<${event.kind}> with NIP-46`);
return await signNostrConnect(event, c, opts); return await signNostrConnect(event, c, opts);
} }

View File

@ -1,6 +1,6 @@
import { type AuthorStatsRow, db, type DittoDB, type EventStatsRow } from '@/db.ts'; import { type AuthorStatsRow, db, type DittoDB, type EventStatsRow } from '@/db.ts';
import * as eventsDB from '@/db/events.ts'; import * as eventsDB from '@/db/events.ts';
import { type Event, findReplyTag, type InsertQueryBuilder } from '@/deps.ts'; import { Debug, type Event, findReplyTag, type InsertQueryBuilder } from '@/deps.ts';
type AuthorStat = keyof Omit<AuthorStatsRow, 'pubkey'>; type AuthorStat = keyof Omit<AuthorStatsRow, 'pubkey'>;
type EventStat = keyof Omit<EventStatsRow, 'event_id'>; type EventStat = keyof Omit<EventStatsRow, 'event_id'>;
@ -9,6 +9,8 @@ type AuthorStatDiff = ['author_stats', pubkey: string, stat: AuthorStat, diff: n
type EventStatDiff = ['event_stats', eventId: string, stat: EventStat, diff: number]; type EventStatDiff = ['event_stats', eventId: string, stat: EventStat, diff: number];
type StatDiff = AuthorStatDiff | EventStatDiff; type StatDiff = AuthorStatDiff | EventStatDiff;
const debug = Debug('ditto:stats');
/** Store stats for the event in LMDB. */ /** Store stats for the event in LMDB. */
async function updateStats<K extends number>(event: Event<K>) { async function updateStats<K extends number>(event: Event<K>) {
let prev: Event<K> | undefined; let prev: Event<K> | undefined;
@ -26,6 +28,10 @@ async function updateStats<K extends number>(event: Event<K>) {
const pubkeyDiffs = statDiffs.filter(([table]) => table === 'author_stats') as AuthorStatDiff[]; const pubkeyDiffs = statDiffs.filter(([table]) => table === 'author_stats') as AuthorStatDiff[];
const eventDiffs = statDiffs.filter(([table]) => table === 'event_stats') as EventStatDiff[]; const eventDiffs = statDiffs.filter(([table]) => table === 'event_stats') as EventStatDiff[];
if (statDiffs.length) {
debug(JSON.stringify({ id: event.id, pubkey: event.pubkey, kind: event.kind, tags: event.tags, statDiffs }));
}
if (pubkeyDiffs.length) queries.push(authorStatsQuery(pubkeyDiffs)); if (pubkeyDiffs.length) queries.push(authorStatsQuery(pubkeyDiffs));
if (eventDiffs.length) queries.push(eventStatsQuery(eventDiffs)); if (eventDiffs.length) queries.push(eventStatsQuery(eventDiffs));

View File

@ -1,9 +1,11 @@
import { type Event } from '@/deps.ts'; import { Debug, type Event } from '@/deps.ts';
import { Subscription } from '@/subscription.ts'; import { Subscription } from '@/subscription.ts';
import type { DittoFilter } from '@/filter.ts'; import type { DittoFilter } from '@/filter.ts';
import type { EventData } from '@/types.ts'; import type { EventData } from '@/types.ts';
const debug = Debug('ditto:subs');
/** /**
* Manages Ditto event subscriptions. * Manages Ditto event subscriptions.
* Subscriptions can be added, removed, and matched against events. * Subscriptions can be added, removed, and matched against events.
@ -21,6 +23,7 @@ class SubscriptionStore {
* ``` * ```
*/ */
sub<K extends number>(socket: unknown, id: string, filters: DittoFilter<K>[]): Subscription<K> { sub<K extends number>(socket: unknown, id: string, filters: DittoFilter<K>[]): Subscription<K> {
debug('sub', id, JSON.stringify(filters));
let subs = this.#store.get(socket); let subs = this.#store.get(socket);
if (!subs) { if (!subs) {
@ -38,12 +41,14 @@ class SubscriptionStore {
/** Remove a subscription from the store. */ /** Remove a subscription from the store. */
unsub(socket: unknown, id: string): void { unsub(socket: unknown, id: string): void {
debug('unsub', id);
this.#store.get(socket)?.get(id)?.close(); this.#store.get(socket)?.get(id)?.close();
this.#store.get(socket)?.delete(id); this.#store.get(socket)?.delete(id);
} }
/** Remove an entire socket. */ /** Remove an entire socket. */
close(socket: unknown): void { close(socket: unknown): void {
debug('close', socket);
const subs = this.#store.get(socket); const subs = this.#store.get(socket);
if (subs) { if (subs) {

View File

@ -1,18 +1,20 @@
import { TTLCache, z } from '@/deps.ts'; import { Debug, TTLCache, z } from '@/deps.ts';
import { Time } from '@/utils/time.ts'; import { Time } from '@/utils/time.ts';
import { fetchWorker } from '@/workers/fetch.ts'; import { fetchWorker } from '@/workers/fetch.ts';
const debug = Debug('ditto:nip05');
const nip05Cache = new TTLCache<string, Promise<string | null>>({ ttl: Time.hours(1), max: 5000 }); const nip05Cache = new TTLCache<string, Promise<string | null>>({ ttl: Time.hours(1), max: 5000 });
const NIP05_REGEX = /^(?:([\w.+-]+)@)?([\w.-]+)$/; const NIP05_REGEX = /^(?:([\w.+-]+)@)?([\w.-]+)$/;
interface LookupOpts { interface LookupOpts {
timeout?: number; signal?: AbortSignal;
} }
/** Get pubkey from NIP-05. */ /** Get pubkey from NIP-05. */
async function lookup(value: string, opts: LookupOpts = {}): Promise<string | null> { async function lookup(value: string, opts: LookupOpts = {}): Promise<string | null> {
const { timeout = 2000 } = opts; const { signal = AbortSignal.timeout(2000) } = opts;
const match = value.match(NIP05_REGEX); const match = value.match(NIP05_REGEX);
if (!match) return null; if (!match) return null;
@ -21,7 +23,7 @@ async function lookup(value: string, opts: LookupOpts = {}): Promise<string | nu
try { try {
const res = await fetchWorker(`https://${domain}/.well-known/nostr.json?name=${name}`, { const res = await fetchWorker(`https://${domain}/.well-known/nostr.json?name=${name}`, {
signal: AbortSignal.timeout(timeout), signal,
}); });
const { names } = nostrJsonSchema.parse(await res.json()); const { names } = nostrJsonSchema.parse(await res.json());
@ -46,7 +48,7 @@ function lookupNip05Cached(value: string): Promise<string | null> {
const cached = nip05Cache.get(value); const cached = nip05Cache.get(value);
if (cached !== undefined) return cached; if (cached !== undefined) return cached;
console.log(`Looking up NIP-05 for ${value}`); debug(`Lookup ${value}`);
const result = lookup(value); const result = lookup(value);
nip05Cache.set(value, result); nip05Cache.set(value, result);

View File

@ -1,7 +1,9 @@
import { TTLCache, unfurl } from '@/deps.ts'; import { Debug, TTLCache, unfurl } from '@/deps.ts';
import { Time } from '@/utils/time.ts'; import { Time } from '@/utils/time.ts';
import { fetchWorker } from '@/workers/fetch.ts'; import { fetchWorker } from '@/workers/fetch.ts';
const debug = Debug('ditto:unfurl');
interface PreviewCard { interface PreviewCard {
url: string; url: string;
title: string; title: string;
@ -20,7 +22,7 @@ interface PreviewCard {
} }
async function unfurlCard(url: string, signal: AbortSignal): Promise<PreviewCard | null> { async function unfurlCard(url: string, signal: AbortSignal): Promise<PreviewCard | null> {
console.log(`Unfurling ${url}...`); debug(`Unfurling ${url}...`);
try { try {
const result = await unfurl(url, { const result = await unfurl(url, {
fetch: (url) => fetchWorker(url, { signal }), fetch: (url) => fetchWorker(url, { signal }),
@ -60,12 +62,12 @@ const previewCardCache = new TTLCache<string, Promise<PreviewCard | null>>({
}); });
/** Unfurl card from cache if available, otherwise fetch it. */ /** Unfurl card from cache if available, otherwise fetch it. */
function unfurlCardCached(url: string, timeout = Time.seconds(1)): Promise<PreviewCard | null> { function unfurlCardCached(url: string, signal = AbortSignal.timeout(1000)): Promise<PreviewCard | null> {
const cached = previewCardCache.get(url); const cached = previewCardCache.get(url);
if (cached !== undefined) { if (cached !== undefined) {
return cached; return cached;
} else { } else {
const card = unfurlCard(url, AbortSignal.timeout(timeout)); const card = unfurlCard(url, signal);
previewCardCache.set(url, card); previewCardCache.set(url, card);
return card; return card;
} }

View File

@ -1,13 +1,16 @@
import { Comlink } from '@/deps.ts'; import { Comlink, Debug } from '@/deps.ts';
import './handlers/abortsignal.ts'; import './handlers/abortsignal.ts';
const debug = Debug('ditto:fetch.worker');
export const FetchWorker = { export const FetchWorker = {
async fetch( async fetch(
url: string, url: string,
init: Omit<RequestInit, 'signal'>, init: Omit<RequestInit, 'signal'>,
signal: AbortSignal | null | undefined, signal: AbortSignal | null | undefined,
): Promise<[BodyInit, ResponseInit]> { ): Promise<[BodyInit, ResponseInit]> {
debug(init.method, url);
const response = await fetch(url, { ...init, signal }); const response = await fetch(url, { ...init, signal });
return [ return [
await response.arrayBuffer(), await response.arrayBuffer(),

View File

@ -1,9 +1,10 @@
/// <reference lib="webworker" /> /// <reference lib="webworker" />
import { Comlink, type CompiledQuery, DenoSqlite3, type QueryResult, Sentry } from '@/deps.ts'; import { Comlink, type CompiledQuery, Debug, DenoSqlite3, type QueryResult, Sentry } from '@/deps.ts';
import '@/sentry.ts'; import '@/sentry.ts';
let db: DenoSqlite3 | undefined; let db: DenoSqlite3 | undefined;
const debug = Debug('ditto:sqlite.worker');
export const SqliteWorker = { export const SqliteWorker = {
open(path: string): void { open(path: string): void {
@ -11,6 +12,7 @@ export const SqliteWorker = {
}, },
executeQuery<R>({ sql, parameters }: CompiledQuery): QueryResult<R> { executeQuery<R>({ sql, parameters }: CompiledQuery): QueryResult<R> {
if (!db) throw new Error('Database not open'); if (!db) throw new Error('Database not open');
debug(sql);
const result: QueryResult<R> = Sentry.startSpan({ name: sql, op: 'db.query' }, () => { const result: QueryResult<R> = Sentry.startSpan({ name: sql, op: 'db.query' }, () => {
return { return {