Merge branch 'storages-async' into recompute

Alex Gleason 2024-05-14 18:48:26 -05:00
commit 47bc551e0b
No known key found for this signature in database
GPG Key ID: 7211D1F99744FBB7
49 changed files with 478 additions and 349 deletions

View File

@@ -25,6 +25,7 @@
     "@sentry/deno": "https://deno.land/x/sentry@7.112.2/index.mjs",
     "@soapbox/kysely-deno-sqlite": "jsr:@soapbox/kysely-deno-sqlite@^2.1.0",
     "@soapbox/stickynotes": "jsr:@soapbox/stickynotes@^0.4.0",
+    "@std/assert": "jsr:@std/assert@^0.225.1",
     "@std/cli": "jsr:@std/cli@^0.223.0",
     "@std/crypto": "jsr:@std/crypto@^0.224.0",
     "@std/dotenv": "jsr:@std/dotenv@^0.224.0",

View File

@@ -3,7 +3,8 @@ import Debug from '@soapbox/stickynotes/debug';
 import { type Context, Env as HonoEnv, type Handler, Hono, Input as HonoInput, type MiddlewareHandler } from 'hono';
 import { cors, logger, serveStatic } from 'hono/middleware';
-import '@/firehose.ts';
+import { Conf } from '@/config.ts';
+import { startFirehose } from '@/firehose.ts';
 import { Time } from '@/utils.ts';
 import { actorController } from '@/controllers/activitypub/actor.ts';
@@ -108,6 +109,10 @@ const app = new Hono<AppEnv>();
 const debug = Debug('ditto:http');
+if (Conf.firehoseEnabled) {
+  startFirehose();
+}
 app.use('/api/*', logger(debug));
 app.use('/relay/*', logger(debug));
 app.use('/.well-known/*', logger(debug));

View File

@@ -215,6 +215,10 @@ class Conf {
       return Number(Deno.env.get('PG_POOL_SIZE') ?? 10);
     },
   };
+  /** Whether to enable requesting events from known relays. */
+  static get firehoseEnabled(): boolean {
+    return optionalBooleanSchema.parse(Deno.env.get('FIREHOSE_ENABLED')) ?? true;
+  }
 }
 const optionalBooleanSchema = z

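Note: a minimal standalone sketch (TypeScript, illustrative only) of how the new FIREHOSE_ENABLED flag is read. It assumes optionalBooleanSchema is a zod schema that maps the strings 'true'/'false' to booleans and passes undefined through; the schema shown here is a stand-in, not the exact one defined in config.ts.

import { z } from 'zod';

// Stand-in for Ditto's optionalBooleanSchema (assumption: only 'true'/'false' strings are accepted).
const optionalBooleanSchema = z
  .enum(['true', 'false'])
  .optional()
  .transform((value) => (value !== undefined ? value === 'true' : undefined));

// Mirrors the firehoseEnabled getter above: an unset variable defaults to true.
function firehoseEnabled(): boolean {
  return optionalBooleanSchema.parse(Deno.env.get('FIREHOSE_ENABLED')) ?? true;
}

console.log(firehoseEnabled());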
View File

@@ -9,7 +9,7 @@ const actorController: AppController = async (c) => {
   const username = c.req.param('username');
   const { signal } = c.req.raw;
-  const pointer = await localNip05Lookup(username);
+  const pointer = await localNip05Lookup(c.get('store'), username);
   if (!pointer) return notFound(c);
   const event = await getAuthor(pointer.pubkey, { signal });

View File

@@ -94,15 +94,16 @@ const accountSearchController: AppController = async (c) => {
   }
   const query = decodeURIComponent(q);
+  const store = await Storages.search();
   const [event, events] = await Promise.all([
     lookupAccount(query),
-    Storages.search.query([{ kinds: [0], search: query, limit: 20 }], { signal: c.req.raw.signal }),
+    store.query([{ kinds: [0], search: query, limit: 20 }], { signal: c.req.raw.signal }),
   ]);
   const results = await hydrateEvents({
     events: event ? [event, ...events] : events,
-    storage: Storages.db,
+    store,
     signal: c.req.raw.signal,
   });
@@ -147,8 +148,10 @@ const accountStatusesController: AppController = async (c) => {
   const { pinned, limit, exclude_replies, tagged } = accountStatusesQuerySchema.parse(c.req.query());
   const { signal } = c.req.raw;
+  const store = await Storages.db();
   if (pinned) {
-    const [pinEvent] = await Storages.db.query([{ kinds: [10001], authors: [pubkey], limit: 1 }], { signal });
+    const [pinEvent] = await store.query([{ kinds: [10001], authors: [pubkey], limit: 1 }], { signal });
     if (pinEvent) {
       const pinnedEventIds = getTagSet(pinEvent.tags, 'e');
       return renderStatuses(c, [...pinnedEventIds].reverse());
@@ -169,8 +172,8 @@ const accountStatusesController: AppController = async (c) => {
     filter['#t'] = [tagged];
   }
-  const events = await Storages.db.query([filter], { signal })
-    .then((events) => hydrateEvents({ events, storage: Storages.db, signal }))
+  const events = await store.query([filter], { signal })
+    .then((events) => hydrateEvents({ events, store, signal }))
     .then((events) => {
       if (exclude_replies) {
         return events.filter((event) => !findReplyTag(event.tags));
@@ -244,7 +247,7 @@ const followController: AppController = async (c) => {
   const targetPubkey = c.req.param('pubkey');
   await updateListEvent(
-    { kinds: [3], authors: [sourcePubkey] },
+    { kinds: [3], authors: [sourcePubkey], limit: 1 },
     (tags) => addTag(tags, ['p', targetPubkey]),
     c,
   );
@@ -261,7 +264,7 @@ const unfollowController: AppController = async (c) => {
   const targetPubkey = c.req.param('pubkey');
   await updateListEvent(
-    { kinds: [3], authors: [sourcePubkey] },
+    { kinds: [3], authors: [sourcePubkey], limit: 1 },
     (tags) => deleteTag(tags, ['p', targetPubkey]),
     c,
   );
@@ -298,7 +301,7 @@ const muteController: AppController = async (c) => {
   const targetPubkey = c.req.param('pubkey');
   await updateListEvent(
-    { kinds: [10000], authors: [sourcePubkey] },
+    { kinds: [10000], authors: [sourcePubkey], limit: 1 },
     (tags) => addTag(tags, ['p', targetPubkey]),
     c,
   );
@@ -313,7 +316,7 @@ const unmuteController: AppController = async (c) => {
   const targetPubkey = c.req.param('pubkey');
   await updateListEvent(
-    { kinds: [10000], authors: [sourcePubkey] },
+    { kinds: [10000], authors: [sourcePubkey], limit: 1 },
     (tags) => deleteTag(tags, ['p', targetPubkey]),
     c,
   );
@@ -327,7 +330,9 @@ const favouritesController: AppController = async (c) => {
   const params = paginationSchema.parse(c.req.query());
   const { signal } = c.req.raw;
-  const events7 = await Storages.db.query(
+  const store = await Storages.db();
+  const events7 = await store.query(
     [{ kinds: [7], authors: [pubkey], ...params }],
     { signal },
   );
@@ -336,8 +341,8 @@ const favouritesController: AppController = async (c) => {
     .map((event) => event.tags.find((tag) => tag[0] === 'e')?.[1])
     .filter((id): id is string => !!id);
-  const events1 = await Storages.db.query([{ kinds: [1], ids }], { signal })
-    .then((events) => hydrateEvents({ events, storage: Storages.db, signal }));
+  const events1 = await store.query([{ kinds: [1], ids }], { signal })
+    .then((events) => hydrateEvents({ events, store, signal }));
   const viewerPubkey = await c.get('signer')?.getPublicKey();

View File

@@ -39,12 +39,13 @@ const adminAccountsController: AppController = async (c) => {
     return c.json([]);
   }
+  const store = await Storages.db();
   const { since, until, limit } = paginationSchema.parse(c.req.query());
   const { signal } = c.req.raw;
-  const events = await Storages.db.query([{ kinds: [30361], authors: [Conf.pubkey], since, until, limit }], { signal });
+  const events = await store.query([{ kinds: [30361], authors: [Conf.pubkey], since, until, limit }], { signal });
   const pubkeys = events.map((event) => event.tags.find(([name]) => name === 'd')?.[1]!);
-  const authors = await Storages.db.query([{ kinds: [0], authors: pubkeys }], { signal });
+  const authors = await store.query([{ kinds: [0], authors: pubkeys }], { signal });
   for (const event of events) {
     const d = event.tags.find(([name]) => name === 'd')?.[1];
@@ -78,7 +79,7 @@ const adminAccountAction: AppController = async (c) => {
   }
   await updateListAdminEvent(
-    { kinds: [10000], authors: [Conf.pubkey] },
+    { kinds: [10000], authors: [Conf.pubkey], limit: 1 },
     (tags) => addTag(tags, ['p', authorId]),
     c,
   );

View File

@@ -5,10 +5,11 @@ import { renderStatuses } from '@/views.ts';
 /** https://docs.joinmastodon.org/methods/bookmarks/#get */
 const bookmarksController: AppController = async (c) => {
+  const store = await Storages.db();
   const pubkey = await c.get('signer')?.getPublicKey()!;
   const { signal } = c.req.raw;
-  const [event10003] = await Storages.db.query(
+  const [event10003] = await store.query(
     [{ kinds: [10003], authors: [pubkey], limit: 1 }],
     { signal },
   );

View File

@@ -16,7 +16,9 @@ const relaySchema = z.object({
 type RelayEntity = z.infer<typeof relaySchema>;
 export const adminRelaysController: AppController = async (c) => {
-  const [event] = await Storages.db.query([
+  const store = await Storages.db();
+  const [event] = await store.query([
     { kinds: [10002], authors: [Conf.pubkey], limit: 1 },
   ]);
@@ -28,6 +30,7 @@ export const adminRelaysController: AppController = async (c) => {
 };
 export const adminSetRelaysController: AppController = async (c) => {
+  const store = await Storages.db();
   const relays = relaySchema.array().parse(await c.req.json());
   const event = await new AdminSigner().signEvent({
@@ -37,7 +40,7 @@ export const adminSetRelaysController: AppController = async (c) => {
     created_at: Math.floor(Date.now() / 1000),
   });
-  await Storages.db.event(event);
+  await store.event(event);
   return c.json(renderRelays(event));
 };

View File

@@ -1,10 +1,11 @@
 import { AppController } from '@/app.ts';
 import { Conf } from '@/config.ts';
+import { Storages } from '@/storages.ts';
 import { getInstanceMetadata } from '@/utils/instance.ts';
 const instanceController: AppController = async (c) => {
   const { host, protocol } = Conf.url;
-  const meta = await getInstanceMetadata(c.req.raw.signal);
+  const meta = await getInstanceMetadata(await Storages.db(), c.req.raw.signal);
   /** Protocol to use for WebSocket URLs, depending on the protocol of the `LOCAL_DOMAIN`. */
   const wsProtocol = protocol === 'http:' ? 'ws:' : 'wss:';

View File

@@ -5,10 +5,11 @@ import { renderAccounts } from '@/views.ts';
 /** https://docs.joinmastodon.org/methods/mutes/#get */
 const mutesController: AppController = async (c) => {
+  const store = await Storages.db();
   const pubkey = await c.get('signer')?.getPublicKey()!;
   const { signal } = c.req.raw;
-  const [event10000] = await Storages.db.query(
+  const [event10000] = await store.query(
     [{ kinds: [10000], authors: [pubkey], limit: 1 }],
     { signal },
   );

View File

@@ -20,7 +20,7 @@ async function renderNotifications(c: AppContext, filters: NostrFilter[]) {
   const events = await store
     .query(filters, { signal })
     .then((events) => events.filter((event) => event.pubkey !== pubkey))
-    .then((events) => hydrateEvents({ events, storage: store, signal }));
+    .then((events) => hydrateEvents({ events, store, signal }));
   if (!events.length) {
     return c.json([]);

View File

@@ -1,4 +1,4 @@
-import { NSchema as n } from '@nostrify/nostrify';
+import { NSchema as n, NStore } from '@nostrify/nostrify';
 import { z } from 'zod';
 import { type AppController } from '@/app.ts';
@@ -9,7 +9,8 @@ import { Storages } from '@/storages.ts';
 import { createAdminEvent } from '@/utils/api.ts';
 const frontendConfigController: AppController = async (c) => {
-  const configs = await getConfigs(c.req.raw.signal);
+  const store = await Storages.db();
+  const configs = await getConfigs(store, c.req.raw.signal);
   const frontendConfig = configs.find(({ group, key }) => group === ':pleroma' && key === ':frontend_configurations');
   if (frontendConfig) {
@@ -25,7 +26,8 @@ const frontendConfigController: AppController = async (c) => {
 };
 const configController: AppController = async (c) => {
-  const configs = await getConfigs(c.req.raw.signal);
+  const store = await Storages.db();
+  const configs = await getConfigs(store, c.req.raw.signal);
   return c.json({ configs, need_reboot: false });
 };
@@ -33,7 +35,8 @@ const configController: AppController = async (c) => {
 const updateConfigController: AppController = async (c) => {
   const { pubkey } = Conf;
-  const configs = await getConfigs(c.req.raw.signal);
+  const store = await Storages.db();
+  const configs = await getConfigs(store, c.req.raw.signal);
   const { configs: newConfigs } = z.object({ configs: z.array(configSchema) }).parse(await c.req.json());
   for (const { group, key, value } of newConfigs) {
@@ -63,10 +66,10 @@ const pleromaAdminDeleteStatusController: AppController = async (c) => {
   return c.json({});
 };
-async function getConfigs(signal: AbortSignal): Promise<PleromaConfig[]> {
+async function getConfigs(store: NStore, signal: AbortSignal): Promise<PleromaConfig[]> {
   const { pubkey } = Conf;
-  const [event] = await Storages.db.query([{
+  const [event] = await store.query([{
     kinds: [30078],
     authors: [pubkey],
     '#d': ['pub.ditto.pleroma.config'],

View File

@@ -48,7 +48,7 @@ const reportController: AppController = async (c) => {
     tags,
   }, c);
-  await hydrateEvents({ events: [event], storage: store });
+  await hydrateEvents({ events: [event], store });
   return c.json(await renderReport(event));
 };
@@ -58,7 +58,7 @@ const adminReportsController: AppController = async (c) => {
   const viewerPubkey = await c.get('signer')?.getPublicKey();
   const reports = await store.query([{ kinds: [1984], '#P': [Conf.pubkey] }])
-    .then((events) => hydrateEvents({ storage: store, events: events, signal: c.req.raw.signal }))
+    .then((events) => hydrateEvents({ store, events: events, signal: c.req.raw.signal }))
     .then((events) =>
       Promise.all(
        events.map((event) => renderAdminReport(event, { viewerPubkey })),
@@ -85,7 +85,7 @@ const adminReportController: AppController = async (c) => {
     return c.json({ error: 'This action is not allowed' }, 403);
   }
-  await hydrateEvents({ events: [event], storage: store, signal });
+  await hydrateEvents({ events: [event], store, signal });
   return c.json(await renderAdminReport(event, { viewerPubkey: pubkey }));
 };
@@ -107,7 +107,7 @@ const adminReportResolveController: AppController = async (c) => {
     return c.json({ error: 'This action is not allowed' }, 403);
   }
-  await hydrateEvents({ events: [event], storage: store, signal });
+  await hydrateEvents({ events: [event], store, signal });
   await createAdminEvent({
     kind: 5,

View File

@@ -78,7 +78,7 @@ const searchController: AppController = async (c) => {
 };
 /** Get events for the search params. */
-function searchEvents({ q, type, limit, account_id }: SearchQuery, signal: AbortSignal): Promise<NostrEvent[]> {
+async function searchEvents({ q, type, limit, account_id }: SearchQuery, signal: AbortSignal): Promise<NostrEvent[]> {
   if (type === 'hashtags') return Promise.resolve([]);
   const filter: NostrFilter = {
@@ -91,8 +91,10 @@ function searchEvents({ q, type, limit, account_id }: SearchQuery, signal: Abort
     filter.authors = [account_id];
   }
-  return Storages.search.query([filter], { signal })
-    .then((events) => hydrateEvents({ events, storage: Storages.search, signal }));
+  const store = await Storages.search();
+  return store.query([filter], { signal })
+    .then((events) => hydrateEvents({ events, store, signal }));
 }
 /** Get event kinds to search from `type` query param. */
@@ -110,9 +112,10 @@ function typeToKinds(type: SearchQuery['type']): number[] {
 /** Resolve a searched value into an event, if applicable. */
 async function lookupEvent(query: SearchQuery, signal: AbortSignal): Promise<NostrEvent | undefined> {
   const filters = await getLookupFilters(query, signal);
-  return Storages.search.query(filters, { limit: 1, signal })
-    .then((events) => hydrateEvents({ events, storage: Storages.search, signal }))
+  const store = await Storages.search();
+  return store.query(filters, { limit: 1, signal })
+    .then((events) => hydrateEvents({ events, store, signal }))
     .then(([event]) => event);
 }

View File

@@ -140,7 +140,7 @@ const createStatusController: AppController = async (c) => {
   if (data.quote_id) {
     await hydrateEvents({
       events: [event],
-      storage: Storages.db,
+      store: await Storages.db(),
       signal: c.req.raw.signal,
     });
   }
@@ -248,7 +248,7 @@ const reblogStatusController: AppController = async (c) => {
   await hydrateEvents({
     events: [reblogEvent],
-    storage: Storages.db,
+    store: await Storages.db(),
     signal: signal,
   });
@@ -260,23 +260,30 @@ const reblogStatusController: AppController = async (c) => {
 /** https://docs.joinmastodon.org/methods/statuses/#unreblog */
 const unreblogStatusController: AppController = async (c) => {
   const eventId = c.req.param('id');
-  const pubkey = await c.get('signer')?.getPublicKey() as string;
-  const event = await getEvent(eventId, {
-    kind: 1,
-  });
-  if (!event) return c.json({ error: 'Event not found.' }, 404);
-  const filters: NostrFilter[] = [{ kinds: [6], authors: [pubkey], '#e': [event.id] }];
-  const [repostedEvent] = await Storages.db.query(filters, { limit: 1 });
-  if (!repostedEvent) return c.json({ error: 'Event not found.' }, 404);
+  const pubkey = await c.get('signer')?.getPublicKey()!;
+  const event = await getEvent(eventId, { kind: 1 });
+  if (!event) {
+    return c.json({ error: 'Event not found.' }, 404);
+  }
+  const store = await Storages.db();
+  const [repostedEvent] = await store.query(
+    [{ kinds: [6], authors: [pubkey], '#e': [event.id], limit: 1 }],
+  );
+  if (!repostedEvent) {
+    return c.json({ error: 'Event not found.' }, 404);
+  }
   await createEvent({
     kind: 5,
     tags: [['e', repostedEvent.id]],
   }, c);
-  return c.json(await renderStatus(event, {}));
+  return c.json(await renderStatus(event, { viewerPubkey: pubkey }));
 };
@@ -297,7 +304,7 @@ const bookmarkController: AppController = async (c) => {
   if (event) {
     await updateListEvent(
-      { kinds: [10003], authors: [pubkey] },
+      { kinds: [10003], authors: [pubkey], limit: 1 },
       (tags) => addTag(tags, ['e', eventId]),
       c,
     );
@@ -324,7 +331,7 @@ const unbookmarkController: AppController = async (c) => {
   if (event) {
     await updateListEvent(
-      { kinds: [10003], authors: [pubkey] },
+      { kinds: [10003], authors: [pubkey], limit: 1 },
       (tags) => deleteTag(tags, ['e', eventId]),
       c,
     );
@@ -351,7 +358,7 @@ const pinController: AppController = async (c) => {
   if (event) {
     await updateListEvent(
-      { kinds: [10001], authors: [pubkey] },
+      { kinds: [10001], authors: [pubkey], limit: 1 },
       (tags) => addTag(tags, ['e', eventId]),
       c,
     );
@@ -380,7 +387,7 @@ const unpinController: AppController = async (c) => {
   if (event) {
     await updateListEvent(
-      { kinds: [10001], authors: [pubkey] },
+      { kinds: [10001], authors: [pubkey], limit: 1 },
      (tags) => deleteTag(tags, ['e', eventId]),
      c,
    );

View File

@@ -68,13 +68,15 @@ const streamingController: AppController = (c) => {
     if (!filter) return;
     try {
-      for await (const msg of Storages.pubsub.req([filter], { signal: controller.signal })) {
+      const store = await Storages.pubsub();
+      for await (const msg of store.req([filter], { signal: controller.signal })) {
         if (msg[0] === 'EVENT') {
           const event = msg[2];
           await hydrateEvents({
             events: [event],
-            storage: Storages.admin,
+            store,
             signal: AbortSignal.timeout(1000),
           });

View File

@@ -40,7 +40,7 @@ async function renderSuggestedAccounts(store: NStore, signal?: AbortSignal) {
       [{ kinds: [0], authors: pubkeys, limit: pubkeys.length }],
       { signal },
     )
-    .then((events) => hydrateEvents({ events, storage: store, signal }));
+    .then((events) => hydrateEvents({ events, store, signal }));
   const accounts = await Promise.all(pubkeys.map((pubkey) => {
     const profile = profiles.find((event) => event.pubkey === pubkey);

View File

@@ -49,13 +49,7 @@ async function renderStatuses(c: AppContext, filters: NostrFilter[]) {
   const events = await store
     .query(filters, { signal })
-    .then((events) =>
-      hydrateEvents({
-        events,
-        storage: store,
-        signal,
-      })
-    );
+    .then((events) => hydrateEvents({ events, store, signal }));
   if (!events.length) {
     return c.json([]);

View File

@@ -1,9 +1,11 @@
 import { AppController } from '@/app.ts';
 import { Conf } from '@/config.ts';
+import { Storages } from '@/storages.ts';
 import { getInstanceMetadata } from '@/utils/instance.ts';
 const relayInfoController: AppController = async (c) => {
-  const meta = await getInstanceMetadata(c.req.raw.signal);
+  const store = await Storages.db();
+  const meta = await getInstanceMetadata(store, c.req.raw.signal);
   return c.json({
     name: meta.name,

View File

@@ -72,14 +72,17 @@ function connectStream(socket: WebSocket) {
     controllers.get(subId)?.abort();
     controllers.set(subId, controller);
-    for (const event of await Storages.db.query(filters, { limit: FILTER_LIMIT })) {
+    const db = await Storages.db();
+    const pubsub = await Storages.pubsub();
+    for (const event of await db.query(filters, { limit: FILTER_LIMIT })) {
       send(['EVENT', subId, event]);
     }
     send(['EOSE', subId]);
     try {
-      for await (const msg of Storages.pubsub.req(filters, { signal: controller.signal })) {
+      for await (const msg of pubsub.req(filters, { signal: controller.signal })) {
         if (msg[0] === 'EVENT') {
           send(['EVENT', subId, msg[2]]);
         }
@@ -116,7 +119,8 @@ function connectStream(socket: WebSocket) {
   /** Handle COUNT. Return the number of events matching the filters. */
   async function handleCount([_, subId, ...rest]: NostrClientCOUNT): Promise<void> {
-    const { count } = await Storages.db.count(prepareFilters(rest));
+    const store = await Storages.db();
+    const { count } = await store.count(prepareFilters(rest));
     send(['COUNT', subId, { count, approximate: false }]);
   }

View File

@@ -12,7 +12,7 @@ const nameSchema = z.string().min(1).regex(/^\w+$/);
 const nostrController: AppController = async (c) => {
   const result = nameSchema.safeParse(c.req.query('name'));
   const name = result.success ? result.data : undefined;
-  const pointer = name ? await localNip05Lookup(name) : undefined;
+  const pointer = name ? await localNip05Lookup(c.get('store'), name) : undefined;
   if (!name || !pointer) {
     return c.json({ names: {}, relays: {} });

View File

@@ -45,7 +45,7 @@ async function handleAcct(c: AppContext, resource: URL): Promise<Response> {
   }
   const [username, host] = result.data;
-  const pointer = await localNip05Lookup(username);
+  const pointer = await localNip05Lookup(c.get('store'), username);
   if (!pointer) {
     return c.json({ error: 'Not found' }, 404);

View File

@@ -1,41 +0,0 @@
-import fs from 'node:fs/promises';
-import path from 'node:path';
-import { FileMigrationProvider, Migrator } from 'kysely';
-import { DittoDB } from '@/db/DittoDB.ts';
-const db = await DittoDB.getInstance();
-const migrator = new Migrator({
-  db,
-  provider: new FileMigrationProvider({
-    fs,
-    path,
-    migrationFolder: new URL(import.meta.resolve('./db/migrations')).pathname,
-  }),
-});
-/** Migrate the database to the latest version. */
-async function migrate() {
-  console.info('Running migrations...');
-  const results = await migrator.migrateToLatest();
-  if (results.error) {
-    console.error(results.error);
-    Deno.exit(1);
-  } else {
-    if (!results.results?.length) {
-      console.info('Everything up-to-date.');
-    } else {
-      console.info('Migrations finished!');
-      for (const { migrationName, status } of results.results!) {
-        console.info(`  - ${migrationName}: ${status}`);
-      }
-    }
-  }
-}
-await migrate();
-export { db };

View File

@@ -1,4 +1,7 @@
-import { Kysely } from 'kysely';
+import fs from 'node:fs/promises';
+import path from 'node:path';
+import { FileMigrationProvider, Kysely, Migrator } from 'kysely';
 import { Conf } from '@/config.ts';
 import { DittoPostgres } from '@/db/adapters/DittoPostgres.ts';
@@ -6,17 +9,63 @@ import { DittoSQLite } from '@/db/adapters/DittoSQLite.ts';
 import { DittoTables } from '@/db/DittoTables.ts';
 export class DittoDB {
+  private static kysely: Promise<Kysely<DittoTables>> | undefined;
   static getInstance(): Promise<Kysely<DittoTables>> {
+    if (!this.kysely) {
+      this.kysely = this._getInstance();
+    }
+    return this.kysely;
+  }
+  static async _getInstance(): Promise<Kysely<DittoTables>> {
     const { databaseUrl } = Conf;
+    let kysely: Kysely<DittoTables>;
     switch (databaseUrl.protocol) {
       case 'sqlite:':
-        return DittoSQLite.getInstance();
+        kysely = await DittoSQLite.getInstance();
+        break;
       case 'postgres:':
       case 'postgresql:':
-        return DittoPostgres.getInstance();
+        kysely = await DittoPostgres.getInstance();
+        break;
       default:
         throw new Error('Unsupported database URL.');
     }
+    await this.migrate(kysely);
+    return kysely;
+  }
+  /** Migrate the database to the latest version. */
+  private static async migrate(kysely: Kysely<DittoTables>) {
+    const migrator = new Migrator({
+      db: kysely,
+      provider: new FileMigrationProvider({
+        fs,
+        path,
+        migrationFolder: new URL(import.meta.resolve('../db/migrations')).pathname,
+      }),
+    });
+    console.info('Running migrations...');
+    const results = await migrator.migrateToLatest();
+    if (results.error) {
+      console.error(results.error);
+      Deno.exit(1);
+    } else {
+      if (!results.results?.length) {
+        console.info('Everything up-to-date.');
+      } else {
+        console.info('Migrations finished!');
+        for (const { migrationName, status } of results.results!) {
+          console.info(`  - ${migrationName}: ${status}`);
+        }
      }
    }
   }
 }

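Note: the getInstance/_getInstance change above memoizes a Promise so that concurrent callers share one database connection and migrations run exactly once. A generic sketch of that pattern follows (TypeScript; Database, LazyDB, and the stubbed connect/migrate steps are placeholders, not Ditto APIs):

interface Database {
  query(sql: string): Promise<unknown>;
}

class LazyDB {
  private static instance: Promise<Database> | undefined;

  static getInstance(): Promise<Database> {
    // The first caller starts async setup; later callers await the same promise.
    if (!this.instance) {
      this.instance = this.init();
    }
    return this.instance;
  }

  private static async init(): Promise<Database> {
    const db: Database = { query: async () => [] }; // stand-in for opening SQLite/Postgres
    await this.migrate(db); // stand-in for the Kysely Migrator step
    return db;
  }

  private static async migrate(_db: Database): Promise<void> {
    // Migrations would run here before the instance is handed out.
  }
}

console.log(await LazyDB.getInstance() === await LazyDB.getInstance()); // true: both callers share one instance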
View File

@@ -1,6 +1,6 @@
 import uuid62 from 'uuid62';
-import { db } from '@/db.ts';
+import { DittoDB } from '@/db/DittoDB.ts';
 import { type MediaData } from '@/schemas/nostr.ts';
 interface UnattachedMedia {
@@ -19,7 +19,8 @@ async function insertUnattachedMedia(media: Omit<UnattachedMedia, 'id' | 'upload
     ...media,
   };
-  await db.insertInto('unattached_media')
+  const kysely = await DittoDB.getInstance();
+  await kysely.insertInto('unattached_media')
     .values({ ...result, data: JSON.stringify(media.data) })
     .execute();
@@ -27,8 +28,9 @@ async function insertUnattachedMedia(media: Omit<UnattachedMedia, 'id' | 'upload
 }
 /** Select query for unattached media. */
-function selectUnattachedMediaQuery() {
-  return db.selectFrom('unattached_media')
+async function selectUnattachedMediaQuery() {
+  const kysely = await DittoDB.getInstance();
+  return kysely.selectFrom('unattached_media')
     .select([
       'unattached_media.id',
       'unattached_media.pubkey',
@@ -39,25 +41,27 @@ function selectUnattachedMediaQuery() {
 }
 /** Find attachments that exist but aren't attached to any events. */
-function getUnattachedMedia(until: Date) {
-  return selectUnattachedMediaQuery()
+async function getUnattachedMedia(until: Date) {
+  const query = await selectUnattachedMediaQuery();
+  return query
     .leftJoin('tags', 'unattached_media.url', 'tags.value')
     .where('uploaded_at', '<', until.getTime())
     .execute();
 }
 /** Delete unattached media by URL. */
-function deleteUnattachedMediaByUrl(url: string) {
-  return db.deleteFrom('unattached_media')
+async function deleteUnattachedMediaByUrl(url: string) {
+  const kysely = await DittoDB.getInstance();
+  return kysely.deleteFrom('unattached_media')
     .where('url', '=', url)
     .execute();
 }
 /** Get unattached media by IDs. */
-// deno-lint-ignore require-await
 async function getUnattachedMediaByIds(ids: string[]) {
   if (!ids.length) return [];
-  return selectUnattachedMediaQuery()
+  const query = await selectUnattachedMediaQuery();
+  return query
     .where('id', 'in', ids)
     .execute();
 }
@@ -65,7 +69,8 @@ async function getUnattachedMediaByIds(ids: string[]) {
 /** Delete rows as an event with media is being created. */
 async function deleteAttachedMedia(pubkey: string, urls: string[]): Promise<void> {
   if (!urls.length) return;
-  await db.deleteFrom('unattached_media')
+  const kysely = await DittoDB.getInstance();
+  await kysely.deleteFrom('unattached_media')
     .where('pubkey', '=', pubkey)
     .where('url', 'in', urls)
     .execute();

View File

@@ -60,7 +60,8 @@ async function findUser(user: Partial<User>, signal?: AbortSignal): Promise<User
     }
   }
-  const [event] = await Storages.db.query([filter], { signal });
+  const store = await Storages.db();
+  const [event] = await store.query([filter], { signal });
   if (event) {
     return {

View File

@@ -1,29 +1,28 @@
-import { NostrEvent } from '@nostrify/nostrify';
-import Debug from '@soapbox/stickynotes/debug';
-import { activeRelays, pool } from '@/pool.ts';
+import { Stickynotes } from '@soapbox/stickynotes';
+import { Storages } from '@/storages.ts';
 import { nostrNow } from '@/utils.ts';
 import * as pipeline from './pipeline.ts';
-const debug = Debug('ditto:firehose');
-// This file watches events on all known relays and performs
-// side-effects based on them, such as trending hashtag tracking
-// and storing events for notifications and the home feed.
-pool.subscribe(
-  [{ kinds: [0, 1, 3, 5, 6, 7, 9735, 10002], limit: 0, since: nostrNow() }],
-  activeRelays,
-  handleEvent,
-  undefined,
-  undefined,
-);
-/** Handle events through the firehose pipeline. */
-function handleEvent(event: NostrEvent): Promise<void> {
-  debug(`NostrEvent<${event.kind}> ${event.id}`);
-  return pipeline
+const console = new Stickynotes('ditto:firehose');
+/**
+ * This function watches events on all known relays and performs
+ * side-effects based on them, such as trending hashtag tracking
+ * and storing events for notifications and the home feed.
+ */
+export async function startFirehose() {
+  const store = await Storages.client();
+  for await (const msg of store.req([{ kinds: [0, 1, 3, 5, 6, 7, 9735, 10002], limit: 0, since: nostrNow() }])) {
+    if (msg[0] === 'EVENT') {
+      const event = msg[2];
+      console.debug(`NostrEvent<${event.kind}> ${event.id}`);
+      pipeline
         .handleEvent(event, AbortSignal.timeout(5000))
         .catch(() => {});
+    }
+  }
 }

View File

@@ -7,10 +7,10 @@ export const storeMiddleware: AppMiddleware = async (c, next) => {
   const pubkey = await c.get('signer')?.getPublicKey();
   if (pubkey) {
-    const store = new UserStore(pubkey, Storages.admin);
+    const store = new UserStore(pubkey, await Storages.admin());
     c.set('store', store);
   } else {
-    c.set('store', Storages.admin);
+    c.set('store', await Storages.admin());
   }
   await next();
 };

View File

@@ -5,7 +5,7 @@ import Debug from '@soapbox/stickynotes/debug';
 import { sql } from 'kysely';
 import { Conf } from '@/config.ts';
-import { db } from '@/db.ts';
+import { DittoDB } from '@/db/DittoDB.ts';
 import { deleteAttachedMedia } from '@/db/unattached-media.ts';
 import { DittoEvent } from '@/interfaces/DittoEvent.ts';
 import { isEphemeralKind } from '@/kinds.ts';
@@ -57,7 +57,7 @@ async function handleEvent(event: DittoEvent, signal: AbortSignal): Promise<void
 async function policyFilter(event: NostrEvent): Promise<void> {
   const policies: NPolicy[] = [
-    new MuteListPolicy(Conf.pubkey, Storages.admin),
+    new MuteListPolicy(Conf.pubkey, await Storages.admin()),
   ];
   try {
@@ -76,17 +76,23 @@ async function policyFilter(event: NostrEvent): Promise<void> {
 /** Encounter the event, and return whether it has already been encountered. */
 async function encounterEvent(event: NostrEvent, signal: AbortSignal): Promise<boolean> {
-  const [existing] = await Storages.cache.query([{ ids: [event.id], limit: 1 }]);
-  Storages.cache.event(event);
-  Storages.reqmeister.event(event, { signal });
+  const cache = await Storages.cache();
+  const reqmeister = await Storages.reqmeister();
+  const [existing] = await cache.query([{ ids: [event.id], limit: 1 }]);
+  cache.event(event);
+  reqmeister.event(event, { signal });
   return !!existing;
 }
 /** Hydrate the event with the user, if applicable. */
 async function hydrateEvent(event: DittoEvent, signal: AbortSignal): Promise<void> {
-  await hydrateEvents({ events: [event], storage: Storages.db, signal });
-  const domain = await db
+  await hydrateEvents({ events: [event], store: await Storages.db(), signal });
+  const kysely = await DittoDB.getInstance();
+  const domain = await kysely
     .selectFrom('pubkey_domains')
     .select('domain')
     .where('pubkey', '=', event.pubkey)
@@ -98,8 +104,9 @@ async function hydrateEvent(event: DittoEvent, signal: AbortSignal): Promise<voi
 /** Maybe store the event, if eligible. */
 async function storeEvent(event: DittoEvent, signal?: AbortSignal): Promise<void> {
   if (isEphemeralKind(event.kind)) return;
-  const [deletion] = await Storages.db.query(
+  const store = await Storages.db();
+  const [deletion] = await store.query(
     [{ kinds: [5], authors: [Conf.pubkey, event.pubkey], '#e': [event.id], limit: 1 }],
     { signal },
   );
@@ -108,7 +115,7 @@ async function storeEvent(event: DittoEvent, signal?: AbortSignal): Promise<void
     return Promise.reject(new RelayError('blocked', 'event was deleted'));
   } else {
     await updateStats(event).catch(debug);
-    await Storages.db.event(event, { signal }).catch(debug);
+    await store.event(event, { signal }).catch(debug);
   }
 }
@@ -134,6 +141,7 @@ async function parseMetadata(event: NostrEvent, signal: AbortSignal): Promise<vo
   // Track pubkey domain.
   try {
+    const kysely = await DittoDB.getInstance();
     const { domain } = parseNip05(nip05);
     await sql`
@@ -143,7 +151,7 @@ async function parseMetadata(event: NostrEvent, signal: AbortSignal): Promise<vo
         domain = excluded.domain,
         last_updated_at = excluded.last_updated_at
       WHERE excluded.last_updated_at > pubkey_domains.last_updated_at
-    `.execute(db);
+    `.execute(kysely);
   } catch (_e) {
     // do nothing
   }
@@ -153,17 +161,18 @@ async function parseMetadata(event: NostrEvent, signal: AbortSignal): Promise<vo
 async function processDeletions(event: NostrEvent, signal: AbortSignal): Promise<void> {
   if (event.kind === 5) {
     const ids = getTagSet(event.tags, 'e');
+    const store = await Storages.db();
     if (event.pubkey === Conf.pubkey) {
-      await Storages.db.remove([{ ids: [...ids] }], { signal });
+      await store.remove([{ ids: [...ids] }], { signal });
     } else {
-      const events = await Storages.db.query(
+      const events = await store.query(
        [{ ids: [...ids], authors: [event.pubkey] }],
        { signal },
      );
       const deleteIds = events.map(({ id }) => id);
-      await Storages.db.remove([{ ids: deleteIds }], { signal });
     }
+      await store.remove([{ ids: deleteIds }], { signal });
   }
 }
@@ -189,19 +198,22 @@ async function trackHashtags(event: NostrEvent): Promise<void> {
 /** Queue related events to fetch. */
 async function fetchRelatedEvents(event: DittoEvent) {
+  const cache = await Storages.cache();
+  const reqmeister = await Storages.reqmeister();
   if (!event.author) {
     const signal = AbortSignal.timeout(3000);
-    Storages.reqmeister.query([{ kinds: [0], authors: [event.pubkey] }], { signal })
+    reqmeister.query([{ kinds: [0], authors: [event.pubkey] }], { signal })
       .then((events) => Promise.allSettled(events.map((event) => handleEvent(event, signal))))
      .catch(() => {});
   }
   for (const [name, id] of event.tags) {
     if (name === 'e') {
-      const { count } = await Storages.cache.count([{ ids: [id] }]);
+      const { count } = await cache.count([{ ids: [id] }]);
       if (!count) {
         const signal = AbortSignal.timeout(3000);
-        Storages.reqmeister.query([{ ids: [id] }], { signal })
+        reqmeister.query([{ ids: [id] }], { signal })
          .then((events) => Promise.allSettled(events.map((event) => handleEvent(event, signal))))
          .catch(() => {});
      }
@@ -272,7 +284,8 @@ function isFresh(event: NostrEvent): boolean {
 /** Distribute the event through active subscriptions. */
 async function streamOut(event: NostrEvent): Promise<void> {
   if (isFresh(event)) {
-    await Storages.pubsub.event(event);
+    const pubsub = await Storages.pubsub();
+    await pubsub.event(event);
   }
 }

View File

@@ -34,7 +34,9 @@ export class DVM {
       return DVM.feedback(event, 'error', `Forbidden user: ${user}`);
     }
-    const [label] = await Storages.db.query([{
+    const store = await Storages.db();
+    const [label] = await store.query([{
       kinds: [1985],
       authors: [admin],
       '#L': ['nip05'],

View File

@@ -1,34 +0,0 @@
-import { RelayPoolWorker } from 'nostr-relaypool';
-import { Storages } from '@/storages.ts';
-import { Conf } from '@/config.ts';
-const [relayList] = await Storages.db.query([
-  { kinds: [10002], authors: [Conf.pubkey], limit: 1 },
-]);
-const tags = relayList?.tags ?? [];
-const activeRelays = tags.reduce((acc, [name, url, marker]) => {
-  if (name === 'r' && !marker) {
-    acc.push(url);
-  }
-  return acc;
-}, []);
-console.log(`pool: connecting to ${activeRelays.length} relays.`);
-const worker = new Worker('https://unpkg.com/nostr-relaypool2@0.6.34/lib/nostr-relaypool.worker.js', {
-  type: 'module',
-});
-// @ts-ignore Wrong types.
-const pool = new RelayPoolWorker(worker, activeRelays, {
-  autoReconnect: true,
-  // The pipeline verifies events.
-  skipVerification: true,
-  // The logging feature overwhelms the CPU and creates too many logs.
-  logErrorsAndNotices: false,
-});
-export { activeRelays, pool };

View File

@@ -25,6 +25,7 @@ const getEvent = async (
   opts: GetEventOpts = {},
 ): Promise<DittoEvent | undefined> => {
   debug(`getEvent: ${id}`);
+  const store = await Storages.optimizer();
   const { kind, signal = AbortSignal.timeout(1000) } = opts;
   const filter: NostrFilter = { ids: [id], limit: 1 };
@@ -32,23 +33,25 @@ const getEvent = async (
     filter.kinds = [kind];
   }
-  return await Storages.optimizer.query([filter], { limit: 1, signal })
-    .then((events) => hydrateEvents({ events, storage: Storages.optimizer, signal }))
+  return await store.query([filter], { limit: 1, signal })
+    .then((events) => hydrateEvents({ events, store, signal }))
     .then(([event]) => event);
 };
 /** Get a Nostr `set_medatadata` event for a user's pubkey. */
 const getAuthor = async (pubkey: string, opts: GetEventOpts = {}): Promise<NostrEvent | undefined> => {
+  const store = await Storages.optimizer();
   const { signal = AbortSignal.timeout(1000) } = opts;
-  return await Storages.optimizer.query([{ authors: [pubkey], kinds: [0], limit: 1 }], { limit: 1, signal })
-    .then((events) => hydrateEvents({ events, storage: Storages.optimizer, signal }))
+  return await store.query([{ authors: [pubkey], kinds: [0], limit: 1 }], { limit: 1, signal })
+    .then((events) => hydrateEvents({ events, store, signal }))
     .then(([event]) => event);
 };
 /** Get users the given pubkey follows. */
 const getFollows = async (pubkey: string, signal?: AbortSignal): Promise<NostrEvent | undefined> => {
-  const [event] = await Storages.db.query([{ authors: [pubkey], kinds: [3], limit: 1 }], { limit: 1, signal });
+  const store = await Storages.db();
+  const [event] = await store.query([{ authors: [pubkey], kinds: [3], limit: 1 }], { limit: 1, signal });
   return event;
 };
@@ -84,15 +87,18 @@ async function getAncestors(event: NostrEvent, result: NostrEvent[] = []): Promi
 }
 async function getDescendants(eventId: string, signal = AbortSignal.timeout(2000)): Promise<NostrEvent[]> {
-  const events = await Storages.db.query([{ kinds: [1], '#e': [eventId] }], { limit: 200, signal });
-  return hydrateEvents({ events, storage: Storages.db, signal });
+  const store = await Storages.db();
+  const events = await store.query([{ kinds: [1], '#e': [eventId] }], { limit: 200, signal });
+  return hydrateEvents({ events, store, signal });
 }
 /** Returns whether the pubkey is followed by a local user. */
 async function isLocallyFollowed(pubkey: string): Promise<boolean> {
   const { host } = Conf.url;
-  const [event] = await Storages.db.query(
+  const store = await Storages.db();
+  const [event] = await store.query(
     [{ kinds: [3], '#p': [pubkey], search: `domain:${host}`, limit: 1 }],
     { limit: 1 },
   );

View File

@@ -1,5 +1,5 @@
 // deno-lint-ignore-file require-await
-import { NConnectSigner } from '@nostrify/nostrify';
+import { NConnectSigner, NostrEvent, NostrSigner } from '@nostrify/nostrify';
 import { AdminSigner } from '@/signers/AdminSigner.ts';
 import { Storages } from '@/storages.ts';
@@ -9,24 +9,55 @@ import { Storages } from '@/storages.ts';
  *
  * Simple extension of nostrify's `NConnectSigner`, with our options to keep it DRY.
  */
-export class ConnectSigner extends NConnectSigner {
-  private _pubkey: string;
-  constructor(pubkey: string, private relays?: string[]) {
-    super({
-      pubkey,
+export class ConnectSigner implements NostrSigner {
+  private signer: Promise<NConnectSigner>;
+  constructor(private pubkey: string, private relays?: string[]) {
+    this.signer = this.init();
+  }
+  async init(): Promise<NConnectSigner> {
+    return new NConnectSigner({
+      pubkey: this.pubkey,
       // TODO: use a remote relay for `nprofile` signing (if present and `Conf.relay` isn't already in the list)
-      relay: Storages.pubsub,
+      relay: await Storages.pubsub(),
       signer: new AdminSigner(),
       timeout: 60000,
     });
-    this._pubkey = pubkey;
   }
+  async signEvent(event: Omit<NostrEvent, 'id' | 'pubkey' | 'sig'>): Promise<NostrEvent> {
+    const signer = await this.signer;
+    return signer.signEvent(event);
+  }
+  readonly nip04 = {
+    encrypt: async (pubkey: string, plaintext: string): Promise<string> => {
+      const signer = await this.signer;
+      return signer.nip04.encrypt(pubkey, plaintext);
+    },
+    decrypt: async (pubkey: string, ciphertext: string): Promise<string> => {
+      const signer = await this.signer;
+      return signer.nip04.decrypt(pubkey, ciphertext);
+    },
+  };
+  readonly nip44 = {
+    encrypt: async (pubkey: string, plaintext: string): Promise<string> => {
+      const signer = await this.signer;
+      return signer.nip44.encrypt(pubkey, plaintext);
+    },
+    decrypt: async (pubkey: string, ciphertext: string): Promise<string> => {
+      const signer = await this.signer;
+      return signer.nip44.decrypt(pubkey, ciphertext);
    },
  };
   // Prevent unnecessary NIP-46 round-trips.
   async getPublicKey(): Promise<string> {
-    return this._pubkey;
+    return this.pubkey;
   }
   /** Get the user's relays if they passed in an `nprofile` auth token. */

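Note: ConnectSigner now wraps an asynchronously constructed NConnectSigner behind a synchronous constructor, awaiting an internal promise inside each method. A minimal sketch of that wrapper pattern (TypeScript; InnerSigner and AsyncSignerFacade are hypothetical names, not part of nostrify or Ditto):

interface InnerSigner {
  getPublicKey(): Promise<string>;
  signEvent(event: unknown): Promise<unknown>;
}

class AsyncSignerFacade {
  private signer: Promise<InnerSigner>;

  constructor(private pubkey: string) {
    // Kick off async initialization without making the constructor async.
    this.signer = this.init();
  }

  private async init(): Promise<InnerSigner> {
    // Stand-in for `new NConnectSigner({ relay: await Storages.pubsub(), ... })`.
    return {
      getPublicKey: async () => this.pubkey,
      signEvent: async (event) => event,
    };
  }

  async signEvent(event: unknown): Promise<unknown> {
    const signer = await this.signer; // every call waits for init to finish
    return signer.signEvent(event);
  }

  // The pubkey is known up front, so no round-trip is needed here.
  async getPublicKey(): Promise<string> {
    return this.pubkey;
  }
}

const facade = new AsyncSignerFacade('deadbeef');
console.log(await facade.getPublicKey()); // 'deadbeef'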
View File

@@ -2,7 +2,7 @@ import { NKinds, NostrEvent } from '@nostrify/nostrify';
 import Debug from '@soapbox/stickynotes/debug';
 import { InsertQueryBuilder } from 'kysely';
-import { db } from '@/db.ts';
+import { DittoDB } from '@/db/DittoDB.ts';
 import { DittoTables } from '@/db/DittoTables.ts';
 import { Storages } from '@/storages.ts';
 import { findReplyTag } from '@/tags.ts';
@@ -25,7 +25,7 @@ async function updateStats(event: NostrEvent) {
   if (event.kind === 3) {
     prev = await getPrevEvent(event);
     if (!prev || event.created_at >= prev.created_at) {
-      queries.push(updateFollowingCountQuery(event));
+      queries.push(await updateFollowingCountQuery(event));
     }
   }
@@ -37,8 +37,8 @@ async function updateStats(event: NostrEvent) {
     debug(JSON.stringify({ id: event.id, pubkey: event.pubkey, kind: event.kind, tags: event.tags, statDiffs }));
   }
-  if (pubkeyDiffs.length) queries.push(authorStatsQuery(pubkeyDiffs));
-  if (eventDiffs.length) queries.push(eventStatsQuery(eventDiffs));
+  if (pubkeyDiffs.length) queries.push(await authorStatsQuery(pubkeyDiffs));
+  if (eventDiffs.length) queries.push(await eventStatsQuery(eventDiffs));
   if (queries.length) {
     await Promise.all(queries.map((query) => query.execute()));
@@ -47,6 +47,7 @@ async function updateStats(event: NostrEvent) {
 /** Calculate stats changes ahead of time so we can build an efficient query. */
 async function getStatsDiff(event: NostrEvent, prev: NostrEvent | undefined): Promise<StatDiff[]> {
+  const store = await Storages.db();
   const statDiffs: StatDiff[] = [];
   const firstTaggedId = event.tags.find(([name]) => name === 'e')?.[1];
@@ -65,7 +66,7 @@ async function getStatsDiff(event: NostrEvent, prev: NostrEvent | undefined): Pr
     case 5: {
       if (!firstTaggedId) break;
-      const [repostedEvent] = await Storages.db.query(
+      const [repostedEvent] = await store.query(
         [{ kinds: [6], ids: [firstTaggedId], authors: [event.pubkey] }],
         { limit: 1 },
       );
@@ -77,7 +78,7 @@ async function getStatsDiff(event: NostrEvent, prev: NostrEvent | undefined): Pr
       const eventBeingRepostedPubkey = repostedEvent.tags.find(([name]) => name === 'p')?.[1];
       if (!eventBeingRepostedId || !eventBeingRepostedPubkey) break;
-      const [eventBeingReposted] = await Storages.db.query(
+      const [eventBeingReposted] = await store.query(
         [{ kinds: [1], ids: [eventBeingRepostedId], authors: [eventBeingRepostedPubkey] }],
         { limit: 1 },
       );
@@ -101,7 +102,7 @@ async function getStatsDiff(event: NostrEvent, prev: NostrEvent | undefined): Pr
 }
 /** Create an author stats query from the list of diffs. */
-function authorStatsQuery(diffs: AuthorStatDiff[]) {
+async function authorStatsQuery(diffs: AuthorStatDiff[]) {
   const values: DittoTables['author_stats'][] = diffs.map(([_, pubkey, stat, diff]) => {
     const row: DittoTables['author_stats'] = {
       pubkey,
@@ -113,7 +114,8 @@ function authorStatsQuery(diffs: AuthorStatDiff[]) {
     return row;
   });
-  return db.insertInto('author_stats')
+  const kysely = await DittoDB.getInstance();
+  return kysely.insertInto('author_stats')
     .values(values)
     .onConflict((oc) =>
       oc
@@ -127,7 +129,7 @@ function authorStatsQuery(diffs: AuthorStatDiff[]) {
 }
 /** Create an event stats query from the list of diffs. */
-function eventStatsQuery(diffs: EventStatDiff[]) {
+async function eventStatsQuery(diffs: EventStatDiff[]) {
   const values: DittoTables['event_stats'][] = diffs.map(([_, event_id, stat, diff]) => {
     const row: DittoTables['event_stats'] = {
       event_id,
@@ -139,7 +141,8 @@ function eventStatsQuery(diffs: EventStatDiff[]) {
     return row;
   });
-  return db.insertInto('event_stats')
+  const kysely = await DittoDB.getInstance();
+  return kysely.insertInto('event_stats')
     .values(values)
     .onConflict((oc) =>
      oc
@@ -155,7 +158,9 @@ function eventStatsQuery(diffs: EventStatDiff[]) {
 /** Get the last version of the event, if any. */
 async function getPrevEvent(event: NostrEvent): Promise<NostrEvent | undefined> {
   if (NKinds.replaceable(event.kind) || NKinds.parameterizedReplaceable(event.kind)) {
-    const [prev] = await Storages.db.query([
+    const store = await Storages.db();
+    const [prev] = await store.query([
       { kinds: [event.kind], authors: [event.pubkey], limit: 1 },
     ]);
@@ -164,14 +169,15 @@ async function getPrevEvent(event: NostrEvent): Promise<NostrEvent | undefined>
 }
 /** Set the following count to the total number of unique "p" tags in the follow list. */
-function updateFollowingCountQuery({ pubkey, tags }: NostrEvent) {
+async function updateFollowingCountQuery({ pubkey, tags }: NostrEvent) {
   const following_count = new Set(
     tags
       .filter(([name]) => name === 'p')
       .map(([_, value]) => value),
   ).size;
-  return db.insertInto('author_stats')
+  const kysely = await DittoDB.getInstance();
+  return kysely.insertInto('author_stats')
     .values({
       pubkey,
       following_count,

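Because the Kysely handle now comes from the async DittoDB.getInstance(), the query builders above become async functions and are awaited before being pushed onto `queries`. A minimal sketch of the same upsert shape, assuming the author_stats table has followers_count, following_count and notes_count columns (illustrative only, not taken verbatim from the commit):

import { sql } from 'kysely';
import { DittoDB } from '@/db/DittoDB.ts';

/** Upsert a single author's follower count, creating the stats row if it doesn't exist yet. */
async function bumpFollowersCount(pubkey: string, diff: number) {
  const kysely = await DittoDB.getInstance();
  return kysely
    .insertInto('author_stats')
    .values({ pubkey, followers_count: Math.max(diff, 0), following_count: 0, notes_count: 0 })
    .onConflict((oc) =>
      oc.column('pubkey').doUpdateSet({
        followers_count: sql`author_stats.followers_count + ${diff}`,
      })
    )
    .execute();
}

In the diff above this shape is generalized over lists of StatDiff tuples, but the conflict handling is the same.
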
View File

@ -1,7 +1,9 @@
// deno-lint-ignore-file require-await
import { NCache } from '@nostrify/nostrify'; import { NCache } from '@nostrify/nostrify';
import { RelayPoolWorker } from 'nostr-relaypool';
import { Conf } from '@/config.ts'; import { Conf } from '@/config.ts';
import { db } from '@/db.ts'; import { DittoDB } from '@/db/DittoDB.ts';
import { activeRelays, pool } from '@/pool.ts';
import { EventsDB } from '@/storages/events-db.ts'; import { EventsDB } from '@/storages/events-db.ts';
import { Optimizer } from '@/storages/optimizer.ts'; import { Optimizer } from '@/storages/optimizer.ts';
import { PoolStore } from '@/storages/pool-store.ts'; import { PoolStore } from '@/storages/pool-store.ts';
@ -12,89 +14,130 @@ import { UserStore } from '@/storages/UserStore.ts';
import { Time } from '@/utils/time.ts'; import { Time } from '@/utils/time.ts';
export class Storages { export class Storages {
private static _db: EventsDB | undefined; private static _db: Promise<EventsDB> | undefined;
private static _admin: UserStore | undefined; private static _admin: Promise<UserStore> | undefined;
private static _cache: NCache | undefined; private static _cache: Promise<NCache> | undefined;
private static _client: PoolStore | undefined; private static _client: Promise<PoolStore> | undefined;
private static _optimizer: Optimizer | undefined; private static _optimizer: Promise<Optimizer> | undefined;
private static _reqmeister: Reqmeister | undefined; private static _reqmeister: Promise<Reqmeister> | undefined;
private static _pubsub: InternalRelay | undefined; private static _pubsub: Promise<InternalRelay> | undefined;
private static _search: SearchStore | undefined; private static _search: Promise<SearchStore> | undefined;
/** SQLite database to store events this Ditto server cares about. */ /** SQLite database to store events this Ditto server cares about. */
public static get db(): EventsDB { public static async db(): Promise<EventsDB> {
if (!this._db) { if (!this._db) {
this._db = new EventsDB(db); this._db = (async () => {
const kysely = await DittoDB.getInstance();
return new EventsDB(kysely);
})();
} }
return this._db; return this._db;
} }
/** Admin user storage. */ /** Admin user storage. */
public static get admin(): UserStore { public static async admin(): Promise<UserStore> {
if (!this._admin) { if (!this._admin) {
this._admin = new UserStore(Conf.pubkey, this.db); this._admin = Promise.resolve(new UserStore(Conf.pubkey, await this.db()));
} }
return this._admin; return this._admin;
} }
/** Internal pubsub relay between controllers and the pipeline. */ /** Internal pubsub relay between controllers and the pipeline. */
public static get pubsub(): InternalRelay { public static async pubsub(): Promise<InternalRelay> {
if (!this._pubsub) { if (!this._pubsub) {
this._pubsub = new InternalRelay(); this._pubsub = Promise.resolve(new InternalRelay());
} }
return this._pubsub; return this._pubsub;
} }
/** Relay pool storage. */ /** Relay pool storage. */
public static get client(): PoolStore { public static async client(): Promise<PoolStore> {
if (!this._client) { if (!this._client) {
this._client = new PoolStore({ this._client = (async () => {
pool, const db = await this.db();
relays: activeRelays,
}); const [relayList] = await db.query([
{ kinds: [10002], authors: [Conf.pubkey], limit: 1 },
]);
const tags = relayList?.tags ?? [];
const activeRelays = tags.reduce((acc, [name, url, marker]) => {
if (name === 'r' && !marker) {
acc.push(url);
}
return acc;
}, []);
console.log(`pool: connecting to ${activeRelays.length} relays.`);
const worker = new Worker('https://unpkg.com/nostr-relaypool2@0.6.34/lib/nostr-relaypool.worker.js', {
type: 'module',
});
// @ts-ignore Wrong types.
const pool = new RelayPoolWorker(worker, activeRelays, {
autoReconnect: true,
// The pipeline verifies events.
skipVerification: true,
// The logging feature overwhelms the CPU and creates too many logs.
logErrorsAndNotices: false,
});
return new PoolStore({
pool,
relays: activeRelays,
});
})();
} }
return this._client; return this._client;
} }
/** In-memory data store for cached events. */ /** In-memory data store for cached events. */
public static get cache(): NCache { public static async cache(): Promise<NCache> {
if (!this._cache) { if (!this._cache) {
this._cache = new NCache({ max: 3000 }); this._cache = Promise.resolve(new NCache({ max: 3000 }));
} }
return this._cache; return this._cache;
} }
/** Batches requests for single events. */ /** Batches requests for single events. */
public static get reqmeister(): Reqmeister { public static async reqmeister(): Promise<Reqmeister> {
if (!this._reqmeister) { if (!this._reqmeister) {
this._reqmeister = new Reqmeister({ this._reqmeister = Promise.resolve(
client: this.client, new Reqmeister({
delay: Time.seconds(1), client: await this.client(),
timeout: Time.seconds(1), delay: Time.seconds(1),
}); timeout: Time.seconds(1),
}),
);
} }
return this._reqmeister; return this._reqmeister;
} }
/** Main Ditto storage adapter */ /** Main Ditto storage adapter */
public static get optimizer(): Optimizer { public static async optimizer(): Promise<Optimizer> {
if (!this._optimizer) { if (!this._optimizer) {
this._optimizer = new Optimizer({ this._optimizer = Promise.resolve(
db: this.db, new Optimizer({
cache: this.cache, db: await this.db(),
client: this.reqmeister, cache: await this.cache(),
}); client: await this.reqmeister(),
}),
);
} }
return this._optimizer; return this._optimizer;
} }
/** Storage to use for remote search. */ /** Storage to use for remote search. */
public static get search(): SearchStore { public static async search(): Promise<SearchStore> {
if (!this._search) { if (!this._search) {
this._search = new SearchStore({ this._search = Promise.resolve(
relay: Conf.searchRelay, new SearchStore({
fallback: this.optimizer, relay: Conf.searchRelay,
}); fallback: await this.optimizer(),
}),
);
} }
return this._search; return this._search;
} }

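Each accessor above follows one pattern: cache a Promise in a private static field on first use, so later calls await the same instance. A self-contained sketch of that pattern with hypothetical names:

class Lazy<T> {
  private promise: Promise<T> | undefined;

  constructor(private init: () => Promise<T>) {}

  /** Runs the initializer once; later calls reuse the cached Promise. */
  get(): Promise<T> {
    if (!this.promise) {
      this.promise = this.init();
    }
    return this.promise;
  }
}

// Hypothetical usage: the async setup body runs a single time.
const eventsDB = new Lazy(async () => {
  const kysely = {}; // imagine an opened database connection here
  return { kysely };
});

const a = await eventsDB.get();
const b = await eventsDB.get();
console.log(a === b); // true: both awaits resolve to the same object

Caching the Promise itself (rather than the resolved value) means callers that arrive while initialization is still in flight share it, for accessors like db() and client() that assign the async IIFE's Promise immediately.
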
View File

@ -4,13 +4,7 @@ import { DittoEvent } from '@/interfaces/DittoEvent.ts';
import { getTagSet } from '@/tags.ts'; import { getTagSet } from '@/tags.ts';
export class UserStore implements NStore { export class UserStore implements NStore {
private store: NStore; constructor(private pubkey: string, private store: NStore) {}
private pubkey: string;
constructor(pubkey: string, store: NStore) {
this.pubkey = pubkey;
this.store = store;
}
async event(event: NostrEvent, opts?: { signal?: AbortSignal }): Promise<void> { async event(event: NostrEvent, opts?: { signal?: AbortSignal }): Promise<void> {
return await this.store.event(event, opts); return await this.store.event(event, opts);
@ -21,12 +15,11 @@ export class UserStore implements NStore {
* https://github.com/nostr-protocol/nips/blob/master/51.md#standard-lists * https://github.com/nostr-protocol/nips/blob/master/51.md#standard-lists
*/ */
async query(filters: NostrFilter[], opts: { signal?: AbortSignal; limit?: number } = {}): Promise<DittoEvent[]> { async query(filters: NostrFilter[], opts: { signal?: AbortSignal; limit?: number } = {}): Promise<DittoEvent[]> {
const allEvents = await this.store.query(filters, opts); const events = await this.store.query(filters, opts);
const pubkeys = await this.getMutedPubkeys();
const mutedPubkeys = await this.getMutedPubkeys(); return events.filter((event) => {
return event.kind === 0 || !pubkeys.has(event.pubkey);
return allEvents.filter((event) => {
return event.kind === 0 || mutedPubkeys.has(event.pubkey) === false;
}); });
} }

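A hedged usage sketch of the slimmed-down constructor, assuming Storages.db() resolves to an NStore-compatible store (the pubkey below is a placeholder):

import { Storages } from '@/storages.ts';
import { UserStore } from '@/storages/UserStore.ts';

// Placeholder hex pubkey of the viewer whose mute list should be applied.
const viewerPubkey = '0000000000000000000000000000000000000000000000000000000000000000';

// Events from muted pubkeys are dropped, except kind 0 metadata which always passes through.
const store = new UserStore(viewerPubkey, await Storages.db());
const timeline = await store.query([{ kinds: [1], limit: 20 }]);
console.log(timeline.length);
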
View File

@ -1,12 +1,14 @@
import { db } from '@/db.ts'; import { assertEquals, assertRejects } from '@std/assert';
import { assertEquals, assertRejects } from '@/deps-test.ts';
import { DittoDB } from '@/db/DittoDB.ts';
import event0 from '~/fixtures/events/event-0.json' with { type: 'json' }; import event0 from '~/fixtures/events/event-0.json' with { type: 'json' };
import event1 from '~/fixtures/events/event-1.json' with { type: 'json' }; import event1 from '~/fixtures/events/event-1.json' with { type: 'json' };
import { EventsDB } from '@/storages/events-db.ts'; import { EventsDB } from '@/storages/events-db.ts';
const eventsDB = new EventsDB(db); const kysely = await DittoDB.getInstance();
const eventsDB = new EventsDB(kysely);
Deno.test('count filters', async () => { Deno.test('count filters', async () => {
assertEquals((await eventsDB.count([{ kinds: [1] }])).count, 0); assertEquals((await eventsDB.count([{ kinds: [1] }])).count, 0);
@ -34,7 +36,7 @@ Deno.test('query events with domain search filter', async () => {
assertEquals(await eventsDB.query([{ search: 'domain:localhost:8000' }]), []); assertEquals(await eventsDB.query([{ search: 'domain:localhost:8000' }]), []);
assertEquals(await eventsDB.query([{ search: '' }]), [event1]); assertEquals(await eventsDB.query([{ search: '' }]), [event1]);
await db await kysely
.insertInto('pubkey_domains') .insertInto('pubkey_domains')
.values({ pubkey: event1.pubkey, domain: 'localhost:8000', last_updated_at: event1.created_at }) .values({ pubkey: event1.pubkey, domain: 'localhost:8000', last_updated_at: event1.created_at })
.execute(); .execute();

View File

@ -17,7 +17,7 @@ Deno.test('hydrateEvents(): author --- WITHOUT stats', async () => {
await hydrateEvents({ await hydrateEvents({
events: [event1], events: [event1],
storage: db, store: db,
}); });
const expectedEvent = { ...event1, author: event0 }; const expectedEvent = { ...event1, author: event0 };
@ -40,7 +40,7 @@ Deno.test('hydrateEvents(): repost --- WITHOUT stats', async () => {
await hydrateEvents({ await hydrateEvents({
events: [event6], events: [event6],
storage: db, store: db,
}); });
const expectedEvent6 = { const expectedEvent6 = {
@ -67,7 +67,7 @@ Deno.test('hydrateEvents(): quote repost --- WITHOUT stats', async () => {
await hydrateEvents({ await hydrateEvents({
events: [event1quoteRepost], events: [event1quoteRepost],
storage: db, store: db,
}); });
const expectedEvent1quoteRepost = { const expectedEvent1quoteRepost = {
@ -95,7 +95,7 @@ Deno.test('hydrateEvents(): repost of quote repost --- WITHOUT stats', async ()
await hydrateEvents({ await hydrateEvents({
events: [event6], events: [event6],
storage: db, store: db,
}); });
const expectedEvent6 = { const expectedEvent6 = {
@ -122,7 +122,7 @@ Deno.test('hydrateEvents(): report pubkey and post // kind 1984 --- WITHOUT stat
await hydrateEvents({ await hydrateEvents({
events: [reportEvent], events: [reportEvent],
storage: db, store: db,
}); });
const expectedEvent: DittoEvent = { const expectedEvent: DittoEvent = {

View File

@ -1,20 +1,20 @@
import { NostrEvent, NStore } from '@nostrify/nostrify'; import { NostrEvent, NStore } from '@nostrify/nostrify';
import { matchFilter } from 'nostr-tools'; import { matchFilter } from 'nostr-tools';
import { db } from '@/db.ts'; import { DittoDB } from '@/db/DittoDB.ts';
import { type DittoEvent } from '@/interfaces/DittoEvent.ts'; import { type DittoEvent } from '@/interfaces/DittoEvent.ts';
import { DittoTables } from '@/db/DittoTables.ts'; import { DittoTables } from '@/db/DittoTables.ts';
import { Conf } from '@/config.ts'; import { Conf } from '@/config.ts';
interface HydrateOpts { interface HydrateOpts {
events: DittoEvent[]; events: DittoEvent[];
storage: NStore; store: NStore;
signal?: AbortSignal; signal?: AbortSignal;
} }
/** Hydrate events using the provided storage. */ /** Hydrate events using the provided storage. */
async function hydrateEvents(opts: HydrateOpts): Promise<DittoEvent[]> { async function hydrateEvents(opts: HydrateOpts): Promise<DittoEvent[]> {
const { events, storage, signal } = opts; const { events, store, signal } = opts;
if (!events.length) { if (!events.length) {
return events; return events;
@ -22,31 +22,31 @@ async function hydrateEvents(opts: HydrateOpts): Promise<DittoEvent[]> {
const cache = [...events]; const cache = [...events];
for (const event of await gatherReposts({ events: cache, storage, signal })) { for (const event of await gatherReposts({ events: cache, store, signal })) {
cache.push(event); cache.push(event);
} }
for (const event of await gatherReacted({ events: cache, storage, signal })) { for (const event of await gatherReacted({ events: cache, store, signal })) {
cache.push(event); cache.push(event);
} }
for (const event of await gatherQuotes({ events: cache, storage, signal })) { for (const event of await gatherQuotes({ events: cache, store, signal })) {
cache.push(event); cache.push(event);
} }
for (const event of await gatherAuthors({ events: cache, storage, signal })) { for (const event of await gatherAuthors({ events: cache, store, signal })) {
cache.push(event); cache.push(event);
} }
for (const event of await gatherUsers({ events: cache, storage, signal })) { for (const event of await gatherUsers({ events: cache, store, signal })) {
cache.push(event); cache.push(event);
} }
for (const event of await gatherReportedProfiles({ events: cache, storage, signal })) { for (const event of await gatherReportedProfiles({ events: cache, store, signal })) {
cache.push(event); cache.push(event);
} }
for (const event of await gatherReportedNotes({ events: cache, storage, signal })) { for (const event of await gatherReportedNotes({ events: cache, store, signal })) {
cache.push(event); cache.push(event);
} }
@ -123,7 +123,7 @@ function assembleEvents(
} }
/** Collect reposts from the events. */ /** Collect reposts from the events. */
function gatherReposts({ events, storage, signal }: HydrateOpts): Promise<DittoEvent[]> { function gatherReposts({ events, store, signal }: HydrateOpts): Promise<DittoEvent[]> {
const ids = new Set<string>(); const ids = new Set<string>();
for (const event of events) { for (const event of events) {
@ -135,14 +135,14 @@ function gatherReposts({ events, storage, signal }: HydrateOpts): Promise<DittoE
} }
} }
return storage.query( return store.query(
[{ ids: [...ids], limit: ids.size }], [{ ids: [...ids], limit: ids.size }],
{ signal }, { signal },
); );
} }
/** Collect events being reacted to by the events. */ /** Collect events being reacted to by the events. */
function gatherReacted({ events, storage, signal }: HydrateOpts): Promise<DittoEvent[]> { function gatherReacted({ events, store, signal }: HydrateOpts): Promise<DittoEvent[]> {
const ids = new Set<string>(); const ids = new Set<string>();
for (const event of events) { for (const event of events) {
@ -154,14 +154,14 @@ function gatherReacted({ events, storage, signal }: HydrateOpts): Promise<DittoE
} }
} }
return storage.query( return store.query(
[{ ids: [...ids], limit: ids.size }], [{ ids: [...ids], limit: ids.size }],
{ signal }, { signal },
); );
} }
/** Collect quotes from the events. */ /** Collect quotes from the events. */
function gatherQuotes({ events, storage, signal }: HydrateOpts): Promise<DittoEvent[]> { function gatherQuotes({ events, store, signal }: HydrateOpts): Promise<DittoEvent[]> {
const ids = new Set<string>(); const ids = new Set<string>();
for (const event of events) { for (const event of events) {
@ -173,34 +173,34 @@ function gatherQuotes({ events, storage, signal }: HydrateOpts): Promise<DittoEv
} }
} }
return storage.query( return store.query(
[{ ids: [...ids], limit: ids.size }], [{ ids: [...ids], limit: ids.size }],
{ signal }, { signal },
); );
} }
/** Collect authors from the events. */ /** Collect authors from the events. */
function gatherAuthors({ events, storage, signal }: HydrateOpts): Promise<DittoEvent[]> { function gatherAuthors({ events, store, signal }: HydrateOpts): Promise<DittoEvent[]> {
const pubkeys = new Set(events.map((event) => event.pubkey)); const pubkeys = new Set(events.map((event) => event.pubkey));
return storage.query( return store.query(
[{ kinds: [0], authors: [...pubkeys], limit: pubkeys.size }], [{ kinds: [0], authors: [...pubkeys], limit: pubkeys.size }],
{ signal }, { signal },
); );
} }
/** Collect users from the events. */ /** Collect users from the events. */
function gatherUsers({ events, storage, signal }: HydrateOpts): Promise<DittoEvent[]> { function gatherUsers({ events, store, signal }: HydrateOpts): Promise<DittoEvent[]> {
const pubkeys = new Set(events.map((event) => event.pubkey)); const pubkeys = new Set(events.map((event) => event.pubkey));
return storage.query( return store.query(
[{ kinds: [30361], authors: [Conf.pubkey], '#d': [...pubkeys], limit: pubkeys.size }], [{ kinds: [30361], authors: [Conf.pubkey], '#d': [...pubkeys], limit: pubkeys.size }],
{ signal }, { signal },
); );
} }
/** Collect reported notes from the events. */ /** Collect reported notes from the events. */
function gatherReportedNotes({ events, storage, signal }: HydrateOpts): Promise<DittoEvent[]> { function gatherReportedNotes({ events, store, signal }: HydrateOpts): Promise<DittoEvent[]> {
const ids = new Set<string>(); const ids = new Set<string>();
for (const event of events) { for (const event of events) {
if (event.kind === 1984) { if (event.kind === 1984) {
@ -213,14 +213,14 @@ function gatherReportedNotes({ events, storage, signal }: HydrateOpts): Promise<
} }
} }
return storage.query( return store.query(
[{ kinds: [1], ids: [...ids], limit: ids.size }], [{ kinds: [1], ids: [...ids], limit: ids.size }],
{ signal }, { signal },
); );
} }
/** Collect reported profiles from the events. */ /** Collect reported profiles from the events. */
function gatherReportedProfiles({ events, storage, signal }: HydrateOpts): Promise<DittoEvent[]> { function gatherReportedProfiles({ events, store, signal }: HydrateOpts): Promise<DittoEvent[]> {
const pubkeys = new Set<string>(); const pubkeys = new Set<string>();
for (const event of events) { for (const event of events) {
@ -232,14 +232,14 @@ function gatherReportedProfiles({ events, storage, signal }: HydrateOpts): Promi
} }
} }
return storage.query( return store.query(
[{ kinds: [0], authors: [...pubkeys], limit: pubkeys.size }], [{ kinds: [0], authors: [...pubkeys], limit: pubkeys.size }],
{ signal }, { signal },
); );
} }
/** Collect author stats from the events. */ /** Collect author stats from the events. */
function gatherAuthorStats(events: DittoEvent[]): Promise<DittoTables['author_stats'][]> { async function gatherAuthorStats(events: DittoEvent[]): Promise<DittoTables['author_stats'][]> {
const pubkeys = new Set<string>( const pubkeys = new Set<string>(
events events
.filter((event) => event.kind === 0) .filter((event) => event.kind === 0)
@ -250,7 +250,8 @@ function gatherAuthorStats(events: DittoEvent[]): Promise<DittoTables['author_st
return Promise.resolve([]); return Promise.resolve([]);
} }
return db const kysely = await DittoDB.getInstance();
return kysely
.selectFrom('author_stats') .selectFrom('author_stats')
.selectAll() .selectAll()
.where('pubkey', 'in', [...pubkeys]) .where('pubkey', 'in', [...pubkeys])
@ -258,7 +259,7 @@ function gatherAuthorStats(events: DittoEvent[]): Promise<DittoTables['author_st
} }
/** Collect event stats from the events. */ /** Collect event stats from the events. */
function gatherEventStats(events: DittoEvent[]): Promise<DittoTables['event_stats'][]> { async function gatherEventStats(events: DittoEvent[]): Promise<DittoTables['event_stats'][]> {
const ids = new Set<string>( const ids = new Set<string>(
events events
.filter((event) => event.kind === 1) .filter((event) => event.kind === 1)
@ -269,7 +270,8 @@ function gatherEventStats(events: DittoEvent[]): Promise<DittoTables['event_stat
return Promise.resolve([]); return Promise.resolve([]);
} }
return db const kysely = await DittoDB.getInstance();
return kysely
.selectFrom('event_stats') .selectFrom('event_stats')
.selectAll() .selectAll()
.where('event_id', 'in', [...ids]) .where('event_id', 'in', [...ids])

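With the option renamed from `storage` to `store`, callers pass the same object they queried against. A minimal sketch of the call shape used throughout this commit, assuming hydrateEvents remains exported from '@/storages/hydrate.ts':

import { Storages } from '@/storages.ts';
import { hydrateEvents } from '@/storages/hydrate.ts';

const store = await Storages.db();
const signal = AbortSignal.timeout(1000);

// Fetch bare events, then attach authors, reposted/quoted events, user labels and stats.
const events = await store.query([{ kinds: [1], limit: 20 }], { signal });
await hydrateEvents({ events, store, signal });
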
View File

@ -13,6 +13,7 @@ import { RelayPoolWorker } from 'nostr-relaypool';
import { getFilterLimit, matchFilters } from 'nostr-tools'; import { getFilterLimit, matchFilters } from 'nostr-tools';
import { Conf } from '@/config.ts'; import { Conf } from '@/config.ts';
import { Storages } from '@/storages.ts';
import { purifyEvent } from '@/storages/hydrate.ts'; import { purifyEvent } from '@/storages/hydrate.ts';
import { abortError } from '@/utils/abort.ts'; import { abortError } from '@/utils/abort.ts';
import { getRelays } from '@/utils/outbox.ts'; import { getRelays } from '@/utils/outbox.ts';
@ -35,7 +36,7 @@ class PoolStore implements NRelay {
async event(event: NostrEvent, opts: { signal?: AbortSignal } = {}): Promise<void> { async event(event: NostrEvent, opts: { signal?: AbortSignal } = {}): Promise<void> {
if (opts.signal?.aborted) return Promise.reject(abortError()); if (opts.signal?.aborted) return Promise.reject(abortError());
const relaySet = await getRelays(event.pubkey); const relaySet = await getRelays(await Storages.db(), event.pubkey);
relaySet.delete(Conf.relay); relaySet.delete(Conf.relay);
const relays = [...relaySet].slice(0, 4); const relays = [...relaySet].slice(0, 4);

View File

@ -48,7 +48,7 @@ class SearchStore implements NStore {
return hydrateEvents({ return hydrateEvents({
events, events,
storage: this.#hydrator, store: this.#hydrator,
signal: opts?.signal, signal: opts?.signal,
}); });
} else { } else {

View File

@ -42,7 +42,7 @@ async function createEvent(t: EventStub, c: AppContext): Promise<NostrEvent> {
/** Filter for fetching an existing event to update. */ /** Filter for fetching an existing event to update. */
interface UpdateEventFilter extends NostrFilter { interface UpdateEventFilter extends NostrFilter {
kinds: [number]; kinds: [number];
limit?: 1; limit: 1;
} }
/** Fetch existing event, update it, then publish the new event. */ /** Fetch existing event, update it, then publish the new event. */
@ -51,7 +51,8 @@ async function updateEvent<E extends EventStub>(
fn: (prev: NostrEvent | undefined) => E, fn: (prev: NostrEvent | undefined) => E,
c: AppContext, c: AppContext,
): Promise<NostrEvent> { ): Promise<NostrEvent> {
const [prev] = await Storages.db.query([filter], { limit: 1, signal: c.req.raw.signal }); const store = await Storages.db();
const [prev] = await store.query([filter], { signal: c.req.raw.signal });
return createEvent(fn(prev), c); return createEvent(fn(prev), c);
} }
@ -101,7 +102,8 @@ async function updateAdminEvent<E extends EventStub>(
fn: (prev: NostrEvent | undefined) => E, fn: (prev: NostrEvent | undefined) => E,
c: AppContext, c: AppContext,
): Promise<NostrEvent> { ): Promise<NostrEvent> {
const [prev] = await Storages.db.query([filter], { limit: 1, signal: c.req.raw.signal }); const store = await Storages.db();
const [prev] = await store.query([filter], { limit: 1, signal: c.req.raw.signal });
return createAdminEvent(fn(prev), c); return createAdminEvent(fn(prev), c);
} }
@ -110,7 +112,8 @@ async function publishEvent(event: NostrEvent, c: AppContext): Promise<NostrEven
debug('EVENT', event); debug('EVENT', event);
try { try {
await pipeline.handleEvent(event, c.req.raw.signal); await pipeline.handleEvent(event, c.req.raw.signal);
await Storages.client.event(event); const client = await Storages.client();
await client.event(event);
} catch (e) { } catch (e) {
if (e instanceof RelayError) { if (e instanceof RelayError) {
throw new HTTPException(422, { throw new HTTPException(422, {

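`limit: 1` moves from a query option into the filter type itself, so every caller has to spell it out. A hedged example of a conforming filter (the kind 10000 mute list is only an illustration; the interface is restated here to keep the snippet self-contained):

import { NostrFilter } from '@nostrify/nostrify';

/** Filter for fetching an existing event to update (mirrors the interface above). */
interface UpdateEventFilter extends NostrFilter {
  kinds: [number];
  limit: 1;
}

const muteListFilter: UpdateEventFilter = {
  kinds: [10000],
  authors: ['0000000000000000000000000000000000000000000000000000000000000000'], // placeholder pubkey
  limit: 1,
};
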
View File

@ -1,4 +1,5 @@
import { Conf } from '@/config.ts'; import { Conf } from '@/config.ts';
import { Storages } from '@/storages.ts';
import { getInstanceMetadata } from '@/utils/instance.ts'; import { getInstanceMetadata } from '@/utils/instance.ts';
/** NIP-46 client-connect metadata. */ /** NIP-46 client-connect metadata. */
@ -11,7 +12,7 @@ interface ConnectMetadata {
/** Get NIP-46 `nostrconnect://` URI for the Ditto server. */ /** Get NIP-46 `nostrconnect://` URI for the Ditto server. */
export async function getClientConnectUri(signal?: AbortSignal): Promise<string> { export async function getClientConnectUri(signal?: AbortSignal): Promise<string> {
const uri = new URL('nostrconnect://'); const uri = new URL('nostrconnect://');
const { name, tagline } = await getInstanceMetadata(signal); const { name, tagline } = await getInstanceMetadata(await Storages.db(), signal);
const metadata: ConnectMetadata = { const metadata: ConnectMetadata = {
name, name,

View File

@ -1,8 +1,7 @@
import { NostrEvent, NostrMetadata, NSchema as n } from '@nostrify/nostrify'; import { NostrEvent, NostrMetadata, NSchema as n, NStore } from '@nostrify/nostrify';
import { Conf } from '@/config.ts'; import { Conf } from '@/config.ts';
import { serverMetaSchema } from '@/schemas/nostr.ts'; import { serverMetaSchema } from '@/schemas/nostr.ts';
import { Storages } from '@/storages.ts';
/** Like NostrMetadata, but some fields are required and also contains some extra fields. */ /** Like NostrMetadata, but some fields are required and also contains some extra fields. */
export interface InstanceMetadata extends NostrMetadata { export interface InstanceMetadata extends NostrMetadata {
@ -14,8 +13,8 @@ export interface InstanceMetadata extends NostrMetadata {
} }
/** Get and parse instance metadata from the kind 0 of the admin user. */ /** Get and parse instance metadata from the kind 0 of the admin user. */
export async function getInstanceMetadata(signal?: AbortSignal): Promise<InstanceMetadata> { export async function getInstanceMetadata(store: NStore, signal?: AbortSignal): Promise<InstanceMetadata> {
const [event] = await Storages.db.query( const [event] = await store.query(
[{ kinds: [0], authors: [Conf.pubkey], limit: 1 }], [{ kinds: [0], authors: [Conf.pubkey], limit: 1 }],
{ signal }, { signal },
); );

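getInstanceMetadata now takes the store explicitly instead of reaching into Storages, which keeps it usable with any NStore. A minimal call sketch:

import { Storages } from '@/storages.ts';
import { getInstanceMetadata } from '@/utils/instance.ts';

const store = await Storages.db();
const meta = await getInstanceMetadata(store, AbortSignal.timeout(1000));

// name and tagline are among the fields destructured by getClientConnectUri above.
console.log(meta.name, meta.tagline);
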
View File

@ -1,4 +1,4 @@
import { NIP05 } from '@nostrify/nostrify'; import { NIP05, NStore } from '@nostrify/nostrify';
import Debug from '@soapbox/stickynotes/debug'; import Debug from '@soapbox/stickynotes/debug';
import { nip19 } from 'nostr-tools'; import { nip19 } from 'nostr-tools';
@ -16,7 +16,8 @@ const nip05Cache = new SimpleLRU<string, nip19.ProfilePointer>(
const [name, domain] = key.split('@'); const [name, domain] = key.split('@');
try { try {
if (domain === Conf.url.host) { if (domain === Conf.url.host) {
const pointer = await localNip05Lookup(name); const store = await Storages.db();
const pointer = await localNip05Lookup(store, name);
if (pointer) { if (pointer) {
debug(`Found: ${key} is ${pointer.pubkey}`); debug(`Found: ${key} is ${pointer.pubkey}`);
return pointer; return pointer;
@ -36,8 +37,8 @@ const nip05Cache = new SimpleLRU<string, nip19.ProfilePointer>(
{ max: 500, ttl: Time.hours(1) }, { max: 500, ttl: Time.hours(1) },
); );
async function localNip05Lookup(name: string): Promise<nip19.ProfilePointer | undefined> { async function localNip05Lookup(store: NStore, name: string): Promise<nip19.ProfilePointer | undefined> {
const [label] = await Storages.db.query([{ const [label] = await store.query([{
kinds: [1985], kinds: [1985],
authors: [Conf.pubkey], authors: [Conf.pubkey],
'#L': ['nip05'], '#L': ['nip05'],

View File

@ -1,10 +1,11 @@
import { Conf } from '@/config.ts'; import { NStore } from '@nostrify/nostrify';
import { Storages } from '@/storages.ts';
export async function getRelays(pubkey: string): Promise<Set<string>> { import { Conf } from '@/config.ts';
export async function getRelays(store: NStore, pubkey: string): Promise<Set<string>> {
const relays = new Set<`wss://${string}`>(); const relays = new Set<`wss://${string}`>();
const events = await Storages.db.query([ const events = await store.query([
{ kinds: [10002], authors: [pubkey, Conf.pubkey], limit: 2 }, { kinds: [10002], authors: [pubkey, Conf.pubkey], limit: 2 },
]); ]);

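The outbox helper likewise takes the store as an argument now. A sketch of roughly how the pool store above uses it (the pubkey is a placeholder):

import { Storages } from '@/storages.ts';
import { getRelays } from '@/utils/outbox.ts';

const store = await Storages.db();

// Relay URLs gathered from the kind 10002 lists of the author and the admin pubkey.
const relaySet = await getRelays(store, '0000000000000000000000000000000000000000000000000000000000000000');
const relays = [...relaySet].slice(0, 4); // the pool store publishes to at most four of them
console.log(relays);
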
View File

@ -12,15 +12,16 @@ async function renderEventAccounts(c: AppContext, filters: NostrFilter[], signal
return c.json([]); return c.json([]);
} }
const events = await Storages.db.query(filters, { signal }); const store = await Storages.db();
const events = await store.query(filters, { signal });
const pubkeys = new Set(events.map(({ pubkey }) => pubkey)); const pubkeys = new Set(events.map(({ pubkey }) => pubkey));
if (!pubkeys.size) { if (!pubkeys.size) {
return c.json([]); return c.json([]);
} }
const authors = await Storages.db.query([{ kinds: [0], authors: [...pubkeys] }], { signal }) const authors = await store.query([{ kinds: [0], authors: [...pubkeys] }], { signal })
.then((events) => hydrateEvents({ events, storage: Storages.db, signal })); .then((events) => hydrateEvents({ events, store, signal }));
const accounts = await Promise.all( const accounts = await Promise.all(
authors.map((event) => renderAccount(event)), authors.map((event) => renderAccount(event)),
@ -32,8 +33,10 @@ async function renderEventAccounts(c: AppContext, filters: NostrFilter[], signal
async function renderAccounts(c: AppContext, authors: string[], signal = AbortSignal.timeout(1000)) { async function renderAccounts(c: AppContext, authors: string[], signal = AbortSignal.timeout(1000)) {
const { since, until, limit } = paginationSchema.parse(c.req.query()); const { since, until, limit } = paginationSchema.parse(c.req.query());
const events = await Storages.db.query([{ kinds: [0], authors, since, until, limit }], { signal }) const store = await Storages.db();
.then((events) => hydrateEvents({ events, storage: Storages.db, signal }));
const events = await store.query([{ kinds: [0], authors, since, until, limit }], { signal })
.then((events) => hydrateEvents({ events, store, signal }));
const accounts = await Promise.all( const accounts = await Promise.all(
events.map((event) => renderAccount(event)), events.map((event) => renderAccount(event)),
@ -48,10 +51,11 @@ async function renderStatuses(c: AppContext, ids: string[], signal = AbortSignal
return c.json([]); return c.json([]);
} }
const store = await Storages.db();
const { limit } = paginationSchema.parse(c.req.query()); const { limit } = paginationSchema.parse(c.req.query());
const events = await Storages.db.query([{ kinds: [1], ids, limit }], { signal }) const events = await store.query([{ kinds: [1], ids, limit }], { signal })
.then((events) => hydrateEvents({ events, storage: Storages.db, signal })); .then((events) => hydrateEvents({ events, store, signal }));
if (!events.length) { if (!events.length) {
return c.json([]); return c.json([]);

View File

@ -2,7 +2,9 @@ import { Storages } from '@/storages.ts';
import { hasTag } from '@/tags.ts'; import { hasTag } from '@/tags.ts';
async function renderRelationship(sourcePubkey: string, targetPubkey: string) { async function renderRelationship(sourcePubkey: string, targetPubkey: string) {
const events = await Storages.db.query([ const db = await Storages.db();
const events = await db.query([
{ kinds: [3], authors: [sourcePubkey], limit: 1 }, { kinds: [3], authors: [sourcePubkey], limit: 1 },
{ kinds: [3], authors: [targetPubkey], limit: 1 }, { kinds: [3], authors: [targetPubkey], limit: 1 },
{ kinds: [10000], authors: [sourcePubkey], limit: 1 }, { kinds: [10000], authors: [sourcePubkey], limit: 1 },

View File

@ -22,7 +22,7 @@ interface RenderStatusOpts {
async function renderStatus(event: DittoEvent, opts: RenderStatusOpts): Promise<any> { async function renderStatus(event: DittoEvent, opts: RenderStatusOpts): Promise<any> {
const { viewerPubkey, depth = 1 } = opts; const { viewerPubkey, depth = 1 } = opts;
if (depth > 2 || depth < 0) return null; if (depth > 2 || depth < 0) return;
const note = nip19.noteEncode(event.id); const note = nip19.noteEncode(event.id);
@ -40,7 +40,10 @@ async function renderStatus(event: DittoEvent, opts: RenderStatusOpts): Promise<
), ),
]; ];
const mentionedProfiles = await Storages.optimizer.query( const db = await Storages.db();
const optimizer = await Storages.optimizer();
const mentionedProfiles = await optimizer.query(
[{ kinds: [0], authors: mentionedPubkeys, limit: mentionedPubkeys.length }], [{ kinds: [0], authors: mentionedPubkeys, limit: mentionedPubkeys.length }],
); );
@ -53,7 +56,7 @@ async function renderStatus(event: DittoEvent, opts: RenderStatusOpts): Promise<
), ),
firstUrl ? unfurlCardCached(firstUrl) : null, firstUrl ? unfurlCardCached(firstUrl) : null,
viewerPubkey viewerPubkey
? await Storages.db.query([ ? await db.query([
{ kinds: [6], '#e': [event.id], authors: [viewerPubkey], limit: 1 }, { kinds: [6], '#e': [event.id], authors: [viewerPubkey], limit: 1 },
{ kinds: [7], '#e': [event.id], authors: [viewerPubkey], limit: 1 }, { kinds: [7], '#e': [event.id], authors: [viewerPubkey], limit: 1 },
{ kinds: [9734], '#e': [event.id], authors: [viewerPubkey], limit: 1 }, { kinds: [9734], '#e': [event.id], authors: [viewerPubkey], limit: 1 },