diff --git a/deno.json b/deno.json index 7cd56f0..1ead2b9 100644 --- a/deno.json +++ b/deno.json @@ -25,6 +25,7 @@ "@sentry/deno": "https://deno.land/x/sentry@7.112.2/index.mjs", "@soapbox/kysely-deno-sqlite": "jsr:@soapbox/kysely-deno-sqlite@^2.1.0", "@soapbox/stickynotes": "jsr:@soapbox/stickynotes@^0.4.0", + "@std/assert": "jsr:@std/assert@^0.225.1", "@std/cli": "jsr:@std/cli@^0.223.0", "@std/crypto": "jsr:@std/crypto@^0.224.0", "@std/dotenv": "jsr:@std/dotenv@^0.224.0", diff --git a/src/app.ts b/src/app.ts index 84ad780..059e883 100644 --- a/src/app.ts +++ b/src/app.ts @@ -3,7 +3,8 @@ import Debug from '@soapbox/stickynotes/debug'; import { type Context, Env as HonoEnv, type Handler, Hono, Input as HonoInput, type MiddlewareHandler } from 'hono'; import { cors, logger, serveStatic } from 'hono/middleware'; -import '@/firehose.ts'; +import { Conf } from '@/config.ts'; +import { startFirehose } from '@/firehose.ts'; import { Time } from '@/utils.ts'; import { actorController } from '@/controllers/activitypub/actor.ts'; @@ -108,6 +109,10 @@ const app = new Hono(); const debug = Debug('ditto:http'); +if (Conf.firehoseEnabled) { + startFirehose(); +} + app.use('/api/*', logger(debug)); app.use('/relay/*', logger(debug)); app.use('/.well-known/*', logger(debug)); diff --git a/src/config.ts b/src/config.ts index 4266033..3d3e51b 100644 --- a/src/config.ts +++ b/src/config.ts @@ -215,6 +215,10 @@ class Conf { return Number(Deno.env.get('PG_POOL_SIZE') ?? 10); }, }; + /** Whether to enable requesting events from known relays. */ + static get firehoseEnabled(): boolean { + return optionalBooleanSchema.parse(Deno.env.get('FIREHOSE_ENABLED')) ?? true; + } } const optionalBooleanSchema = z diff --git a/src/controllers/activitypub/actor.ts b/src/controllers/activitypub/actor.ts index e82a88a..19f5f10 100644 --- a/src/controllers/activitypub/actor.ts +++ b/src/controllers/activitypub/actor.ts @@ -9,7 +9,7 @@ const actorController: AppController = async (c) => { const username = c.req.param('username'); const { signal } = c.req.raw; - const pointer = await localNip05Lookup(username); + const pointer = await localNip05Lookup(c.get('store'), username); if (!pointer) return notFound(c); const event = await getAuthor(pointer.pubkey, { signal }); diff --git a/src/controllers/api/accounts.ts b/src/controllers/api/accounts.ts index 88d19b1..5c26ba5 100644 --- a/src/controllers/api/accounts.ts +++ b/src/controllers/api/accounts.ts @@ -94,15 +94,16 @@ const accountSearchController: AppController = async (c) => { } const query = decodeURIComponent(q); + const store = await Storages.search(); const [event, events] = await Promise.all([ lookupAccount(query), - Storages.search.query([{ kinds: [0], search: query, limit: 20 }], { signal: c.req.raw.signal }), + store.query([{ kinds: [0], search: query, limit: 20 }], { signal: c.req.raw.signal }), ]); const results = await hydrateEvents({ events: event ? [event, ...events] : events, - storage: Storages.db, + store, signal: c.req.raw.signal, }); @@ -147,8 +148,10 @@ const accountStatusesController: AppController = async (c) => { const { pinned, limit, exclude_replies, tagged } = accountStatusesQuerySchema.parse(c.req.query()); const { signal } = c.req.raw; + const store = await Storages.db(); + if (pinned) { - const [pinEvent] = await Storages.db.query([{ kinds: [10001], authors: [pubkey], limit: 1 }], { signal }); + const [pinEvent] = await store.query([{ kinds: [10001], authors: [pubkey], limit: 1 }], { signal }); if (pinEvent) { const pinnedEventIds = getTagSet(pinEvent.tags, 'e'); return renderStatuses(c, [...pinnedEventIds].reverse()); @@ -169,8 +172,8 @@ const accountStatusesController: AppController = async (c) => { filter['#t'] = [tagged]; } - const events = await Storages.db.query([filter], { signal }) - .then((events) => hydrateEvents({ events, storage: Storages.db, signal })) + const events = await store.query([filter], { signal }) + .then((events) => hydrateEvents({ events, store, signal })) .then((events) => { if (exclude_replies) { return events.filter((event) => !findReplyTag(event.tags)); @@ -244,7 +247,7 @@ const followController: AppController = async (c) => { const targetPubkey = c.req.param('pubkey'); await updateListEvent( - { kinds: [3], authors: [sourcePubkey] }, + { kinds: [3], authors: [sourcePubkey], limit: 1 }, (tags) => addTag(tags, ['p', targetPubkey]), c, ); @@ -261,7 +264,7 @@ const unfollowController: AppController = async (c) => { const targetPubkey = c.req.param('pubkey'); await updateListEvent( - { kinds: [3], authors: [sourcePubkey] }, + { kinds: [3], authors: [sourcePubkey], limit: 1 }, (tags) => deleteTag(tags, ['p', targetPubkey]), c, ); @@ -298,7 +301,7 @@ const muteController: AppController = async (c) => { const targetPubkey = c.req.param('pubkey'); await updateListEvent( - { kinds: [10000], authors: [sourcePubkey] }, + { kinds: [10000], authors: [sourcePubkey], limit: 1 }, (tags) => addTag(tags, ['p', targetPubkey]), c, ); @@ -313,7 +316,7 @@ const unmuteController: AppController = async (c) => { const targetPubkey = c.req.param('pubkey'); await updateListEvent( - { kinds: [10000], authors: [sourcePubkey] }, + { kinds: [10000], authors: [sourcePubkey], limit: 1 }, (tags) => deleteTag(tags, ['p', targetPubkey]), c, ); @@ -327,7 +330,9 @@ const favouritesController: AppController = async (c) => { const params = paginationSchema.parse(c.req.query()); const { signal } = c.req.raw; - const events7 = await Storages.db.query( + const store = await Storages.db(); + + const events7 = await store.query( [{ kinds: [7], authors: [pubkey], ...params }], { signal }, ); @@ -336,8 +341,8 @@ const favouritesController: AppController = async (c) => { .map((event) => event.tags.find((tag) => tag[0] === 'e')?.[1]) .filter((id): id is string => !!id); - const events1 = await Storages.db.query([{ kinds: [1], ids }], { signal }) - .then((events) => hydrateEvents({ events, storage: Storages.db, signal })); + const events1 = await store.query([{ kinds: [1], ids }], { signal }) + .then((events) => hydrateEvents({ events, store, signal })); const viewerPubkey = await c.get('signer')?.getPublicKey(); diff --git a/src/controllers/api/admin.ts b/src/controllers/api/admin.ts index 99a8e5b..b9464a3 100644 --- a/src/controllers/api/admin.ts +++ b/src/controllers/api/admin.ts @@ -39,12 +39,13 @@ const adminAccountsController: AppController = async (c) => { return c.json([]); } + const store = await Storages.db(); const { since, until, limit } = paginationSchema.parse(c.req.query()); const { signal } = c.req.raw; - const events = await Storages.db.query([{ kinds: [30361], authors: [Conf.pubkey], since, until, limit }], { signal }); + const events = await store.query([{ kinds: [30361], authors: [Conf.pubkey], since, until, limit }], { signal }); const pubkeys = events.map((event) => event.tags.find(([name]) => name === 'd')?.[1]!); - const authors = await Storages.db.query([{ kinds: [0], authors: pubkeys }], { signal }); + const authors = await store.query([{ kinds: [0], authors: pubkeys }], { signal }); for (const event of events) { const d = event.tags.find(([name]) => name === 'd')?.[1]; @@ -78,7 +79,7 @@ const adminAccountAction: AppController = async (c) => { } await updateListAdminEvent( - { kinds: [10000], authors: [Conf.pubkey] }, + { kinds: [10000], authors: [Conf.pubkey], limit: 1 }, (tags) => addTag(tags, ['p', authorId]), c, ); diff --git a/src/controllers/api/bookmarks.ts b/src/controllers/api/bookmarks.ts index 1616fa2..7655182 100644 --- a/src/controllers/api/bookmarks.ts +++ b/src/controllers/api/bookmarks.ts @@ -5,10 +5,11 @@ import { renderStatuses } from '@/views.ts'; /** https://docs.joinmastodon.org/methods/bookmarks/#get */ const bookmarksController: AppController = async (c) => { + const store = await Storages.db(); const pubkey = await c.get('signer')?.getPublicKey()!; const { signal } = c.req.raw; - const [event10003] = await Storages.db.query( + const [event10003] = await store.query( [{ kinds: [10003], authors: [pubkey], limit: 1 }], { signal }, ); diff --git a/src/controllers/api/ditto.ts b/src/controllers/api/ditto.ts index f0f7036..df4f210 100644 --- a/src/controllers/api/ditto.ts +++ b/src/controllers/api/ditto.ts @@ -16,7 +16,9 @@ const relaySchema = z.object({ type RelayEntity = z.infer; export const adminRelaysController: AppController = async (c) => { - const [event] = await Storages.db.query([ + const store = await Storages.db(); + + const [event] = await store.query([ { kinds: [10002], authors: [Conf.pubkey], limit: 1 }, ]); @@ -28,6 +30,7 @@ export const adminRelaysController: AppController = async (c) => { }; export const adminSetRelaysController: AppController = async (c) => { + const store = await Storages.db(); const relays = relaySchema.array().parse(await c.req.json()); const event = await new AdminSigner().signEvent({ @@ -37,7 +40,7 @@ export const adminSetRelaysController: AppController = async (c) => { created_at: Math.floor(Date.now() / 1000), }); - await Storages.db.event(event); + await store.event(event); return c.json(renderRelays(event)); }; diff --git a/src/controllers/api/instance.ts b/src/controllers/api/instance.ts index cc71b1f..5f949b0 100644 --- a/src/controllers/api/instance.ts +++ b/src/controllers/api/instance.ts @@ -1,10 +1,11 @@ import { AppController } from '@/app.ts'; import { Conf } from '@/config.ts'; +import { Storages } from '@/storages.ts'; import { getInstanceMetadata } from '@/utils/instance.ts'; const instanceController: AppController = async (c) => { const { host, protocol } = Conf.url; - const meta = await getInstanceMetadata(c.req.raw.signal); + const meta = await getInstanceMetadata(await Storages.db(), c.req.raw.signal); /** Protocol to use for WebSocket URLs, depending on the protocol of the `LOCAL_DOMAIN`. */ const wsProtocol = protocol === 'http:' ? 'ws:' : 'wss:'; diff --git a/src/controllers/api/mutes.ts b/src/controllers/api/mutes.ts index fe048e9..4afb6c4 100644 --- a/src/controllers/api/mutes.ts +++ b/src/controllers/api/mutes.ts @@ -5,10 +5,11 @@ import { renderAccounts } from '@/views.ts'; /** https://docs.joinmastodon.org/methods/mutes/#get */ const mutesController: AppController = async (c) => { + const store = await Storages.db(); const pubkey = await c.get('signer')?.getPublicKey()!; const { signal } = c.req.raw; - const [event10000] = await Storages.db.query( + const [event10000] = await store.query( [{ kinds: [10000], authors: [pubkey], limit: 1 }], { signal }, ); diff --git a/src/controllers/api/notifications.ts b/src/controllers/api/notifications.ts index 857f2a3..ba15bd0 100644 --- a/src/controllers/api/notifications.ts +++ b/src/controllers/api/notifications.ts @@ -20,7 +20,7 @@ async function renderNotifications(c: AppContext, filters: NostrFilter[]) { const events = await store .query(filters, { signal }) .then((events) => events.filter((event) => event.pubkey !== pubkey)) - .then((events) => hydrateEvents({ events, storage: store, signal })); + .then((events) => hydrateEvents({ events, store, signal })); if (!events.length) { return c.json([]); diff --git a/src/controllers/api/pleroma.ts b/src/controllers/api/pleroma.ts index 4b693c4..3bbdd70 100644 --- a/src/controllers/api/pleroma.ts +++ b/src/controllers/api/pleroma.ts @@ -1,4 +1,4 @@ -import { NSchema as n } from '@nostrify/nostrify'; +import { NSchema as n, NStore } from '@nostrify/nostrify'; import { z } from 'zod'; import { type AppController } from '@/app.ts'; @@ -9,7 +9,8 @@ import { Storages } from '@/storages.ts'; import { createAdminEvent } from '@/utils/api.ts'; const frontendConfigController: AppController = async (c) => { - const configs = await getConfigs(c.req.raw.signal); + const store = await Storages.db(); + const configs = await getConfigs(store, c.req.raw.signal); const frontendConfig = configs.find(({ group, key }) => group === ':pleroma' && key === ':frontend_configurations'); if (frontendConfig) { @@ -25,7 +26,8 @@ const frontendConfigController: AppController = async (c) => { }; const configController: AppController = async (c) => { - const configs = await getConfigs(c.req.raw.signal); + const store = await Storages.db(); + const configs = await getConfigs(store, c.req.raw.signal); return c.json({ configs, need_reboot: false }); }; @@ -33,7 +35,8 @@ const configController: AppController = async (c) => { const updateConfigController: AppController = async (c) => { const { pubkey } = Conf; - const configs = await getConfigs(c.req.raw.signal); + const store = await Storages.db(); + const configs = await getConfigs(store, c.req.raw.signal); const { configs: newConfigs } = z.object({ configs: z.array(configSchema) }).parse(await c.req.json()); for (const { group, key, value } of newConfigs) { @@ -63,10 +66,10 @@ const pleromaAdminDeleteStatusController: AppController = async (c) => { return c.json({}); }; -async function getConfigs(signal: AbortSignal): Promise { +async function getConfigs(store: NStore, signal: AbortSignal): Promise { const { pubkey } = Conf; - const [event] = await Storages.db.query([{ + const [event] = await store.query([{ kinds: [30078], authors: [pubkey], '#d': ['pub.ditto.pleroma.config'], diff --git a/src/controllers/api/reports.ts b/src/controllers/api/reports.ts index 55fb601..9cb2627 100644 --- a/src/controllers/api/reports.ts +++ b/src/controllers/api/reports.ts @@ -48,7 +48,7 @@ const reportController: AppController = async (c) => { tags, }, c); - await hydrateEvents({ events: [event], storage: store }); + await hydrateEvents({ events: [event], store }); return c.json(await renderReport(event)); }; @@ -58,7 +58,7 @@ const adminReportsController: AppController = async (c) => { const viewerPubkey = await c.get('signer')?.getPublicKey(); const reports = await store.query([{ kinds: [1984], '#P': [Conf.pubkey] }]) - .then((events) => hydrateEvents({ storage: store, events: events, signal: c.req.raw.signal })) + .then((events) => hydrateEvents({ store, events: events, signal: c.req.raw.signal })) .then((events) => Promise.all( events.map((event) => renderAdminReport(event, { viewerPubkey })), @@ -85,7 +85,7 @@ const adminReportController: AppController = async (c) => { return c.json({ error: 'This action is not allowed' }, 403); } - await hydrateEvents({ events: [event], storage: store, signal }); + await hydrateEvents({ events: [event], store, signal }); return c.json(await renderAdminReport(event, { viewerPubkey: pubkey })); }; @@ -107,7 +107,7 @@ const adminReportResolveController: AppController = async (c) => { return c.json({ error: 'This action is not allowed' }, 403); } - await hydrateEvents({ events: [event], storage: store, signal }); + await hydrateEvents({ events: [event], store, signal }); await createAdminEvent({ kind: 5, diff --git a/src/controllers/api/search.ts b/src/controllers/api/search.ts index fe08ace..0151f7d 100644 --- a/src/controllers/api/search.ts +++ b/src/controllers/api/search.ts @@ -78,7 +78,7 @@ const searchController: AppController = async (c) => { }; /** Get events for the search params. */ -function searchEvents({ q, type, limit, account_id }: SearchQuery, signal: AbortSignal): Promise { +async function searchEvents({ q, type, limit, account_id }: SearchQuery, signal: AbortSignal): Promise { if (type === 'hashtags') return Promise.resolve([]); const filter: NostrFilter = { @@ -91,8 +91,10 @@ function searchEvents({ q, type, limit, account_id }: SearchQuery, signal: Abort filter.authors = [account_id]; } - return Storages.search.query([filter], { signal }) - .then((events) => hydrateEvents({ events, storage: Storages.search, signal })); + const store = await Storages.search(); + + return store.query([filter], { signal }) + .then((events) => hydrateEvents({ events, store, signal })); } /** Get event kinds to search from `type` query param. */ @@ -110,9 +112,10 @@ function typeToKinds(type: SearchQuery['type']): number[] { /** Resolve a searched value into an event, if applicable. */ async function lookupEvent(query: SearchQuery, signal: AbortSignal): Promise { const filters = await getLookupFilters(query, signal); + const store = await Storages.search(); - return Storages.search.query(filters, { limit: 1, signal }) - .then((events) => hydrateEvents({ events, storage: Storages.search, signal })) + return store.query(filters, { limit: 1, signal }) + .then((events) => hydrateEvents({ events, store, signal })) .then(([event]) => event); } diff --git a/src/controllers/api/statuses.ts b/src/controllers/api/statuses.ts index 1138c0a..a52a408 100644 --- a/src/controllers/api/statuses.ts +++ b/src/controllers/api/statuses.ts @@ -140,7 +140,7 @@ const createStatusController: AppController = async (c) => { if (data.quote_id) { await hydrateEvents({ events: [event], - storage: Storages.db, + store: await Storages.db(), signal: c.req.raw.signal, }); } @@ -248,7 +248,7 @@ const reblogStatusController: AppController = async (c) => { await hydrateEvents({ events: [reblogEvent], - storage: Storages.db, + store: await Storages.db(), signal: signal, }); @@ -260,23 +260,30 @@ const reblogStatusController: AppController = async (c) => { /** https://docs.joinmastodon.org/methods/statuses/#unreblog */ const unreblogStatusController: AppController = async (c) => { const eventId = c.req.param('id'); - const pubkey = await c.get('signer')?.getPublicKey() as string; + const pubkey = await c.get('signer')?.getPublicKey()!; - const event = await getEvent(eventId, { - kind: 1, - }); - if (!event) return c.json({ error: 'Event not found.' }, 404); + const event = await getEvent(eventId, { kind: 1 }); - const filters: NostrFilter[] = [{ kinds: [6], authors: [pubkey], '#e': [event.id] }]; - const [repostedEvent] = await Storages.db.query(filters, { limit: 1 }); - if (!repostedEvent) return c.json({ error: 'Event not found.' }, 404); + if (!event) { + return c.json({ error: 'Event not found.' }, 404); + } + + const store = await Storages.db(); + + const [repostedEvent] = await store.query( + [{ kinds: [6], authors: [pubkey], '#e': [event.id], limit: 1 }], + ); + + if (!repostedEvent) { + return c.json({ error: 'Event not found.' }, 404); + } await createEvent({ kind: 5, tags: [['e', repostedEvent.id]], }, c); - return c.json(await renderStatus(event, {})); + return c.json(await renderStatus(event, { viewerPubkey: pubkey })); }; const rebloggedByController: AppController = (c) => { @@ -297,7 +304,7 @@ const bookmarkController: AppController = async (c) => { if (event) { await updateListEvent( - { kinds: [10003], authors: [pubkey] }, + { kinds: [10003], authors: [pubkey], limit: 1 }, (tags) => addTag(tags, ['e', eventId]), c, ); @@ -324,7 +331,7 @@ const unbookmarkController: AppController = async (c) => { if (event) { await updateListEvent( - { kinds: [10003], authors: [pubkey] }, + { kinds: [10003], authors: [pubkey], limit: 1 }, (tags) => deleteTag(tags, ['e', eventId]), c, ); @@ -351,7 +358,7 @@ const pinController: AppController = async (c) => { if (event) { await updateListEvent( - { kinds: [10001], authors: [pubkey] }, + { kinds: [10001], authors: [pubkey], limit: 1 }, (tags) => addTag(tags, ['e', eventId]), c, ); @@ -380,7 +387,7 @@ const unpinController: AppController = async (c) => { if (event) { await updateListEvent( - { kinds: [10001], authors: [pubkey] }, + { kinds: [10001], authors: [pubkey], limit: 1 }, (tags) => deleteTag(tags, ['e', eventId]), c, ); diff --git a/src/controllers/api/streaming.ts b/src/controllers/api/streaming.ts index 8d22d5c..e79c51e 100644 --- a/src/controllers/api/streaming.ts +++ b/src/controllers/api/streaming.ts @@ -68,13 +68,15 @@ const streamingController: AppController = (c) => { if (!filter) return; try { - for await (const msg of Storages.pubsub.req([filter], { signal: controller.signal })) { + const store = await Storages.pubsub(); + + for await (const msg of store.req([filter], { signal: controller.signal })) { if (msg[0] === 'EVENT') { const event = msg[2]; await hydrateEvents({ events: [event], - storage: Storages.admin, + store, signal: AbortSignal.timeout(1000), }); diff --git a/src/controllers/api/suggestions.ts b/src/controllers/api/suggestions.ts index bde0916..6377bd4 100644 --- a/src/controllers/api/suggestions.ts +++ b/src/controllers/api/suggestions.ts @@ -40,7 +40,7 @@ async function renderSuggestedAccounts(store: NStore, signal?: AbortSignal) { [{ kinds: [0], authors: pubkeys, limit: pubkeys.length }], { signal }, ) - .then((events) => hydrateEvents({ events, storage: store, signal })); + .then((events) => hydrateEvents({ events, store, signal })); const accounts = await Promise.all(pubkeys.map((pubkey) => { const profile = profiles.find((event) => event.pubkey === pubkey); diff --git a/src/controllers/api/timelines.ts b/src/controllers/api/timelines.ts index 0880d84..e83c50c 100644 --- a/src/controllers/api/timelines.ts +++ b/src/controllers/api/timelines.ts @@ -49,13 +49,7 @@ async function renderStatuses(c: AppContext, filters: NostrFilter[]) { const events = await store .query(filters, { signal }) - .then((events) => - hydrateEvents({ - events, - storage: store, - signal, - }) - ); + .then((events) => hydrateEvents({ events, store, signal })); if (!events.length) { return c.json([]); diff --git a/src/controllers/nostr/relay-info.ts b/src/controllers/nostr/relay-info.ts index 192cab2..bbce7d3 100644 --- a/src/controllers/nostr/relay-info.ts +++ b/src/controllers/nostr/relay-info.ts @@ -1,9 +1,11 @@ import { AppController } from '@/app.ts'; import { Conf } from '@/config.ts'; +import { Storages } from '@/storages.ts'; import { getInstanceMetadata } from '@/utils/instance.ts'; const relayInfoController: AppController = async (c) => { - const meta = await getInstanceMetadata(c.req.raw.signal); + const store = await Storages.db(); + const meta = await getInstanceMetadata(store, c.req.raw.signal); return c.json({ name: meta.name, diff --git a/src/controllers/nostr/relay.ts b/src/controllers/nostr/relay.ts index c0fa026..259f5e9 100644 --- a/src/controllers/nostr/relay.ts +++ b/src/controllers/nostr/relay.ts @@ -72,14 +72,17 @@ function connectStream(socket: WebSocket) { controllers.get(subId)?.abort(); controllers.set(subId, controller); - for (const event of await Storages.db.query(filters, { limit: FILTER_LIMIT })) { + const db = await Storages.db(); + const pubsub = await Storages.pubsub(); + + for (const event of await db.query(filters, { limit: FILTER_LIMIT })) { send(['EVENT', subId, event]); } send(['EOSE', subId]); try { - for await (const msg of Storages.pubsub.req(filters, { signal: controller.signal })) { + for await (const msg of pubsub.req(filters, { signal: controller.signal })) { if (msg[0] === 'EVENT') { send(['EVENT', subId, msg[2]]); } @@ -116,7 +119,8 @@ function connectStream(socket: WebSocket) { /** Handle COUNT. Return the number of events matching the filters. */ async function handleCount([_, subId, ...rest]: NostrClientCOUNT): Promise { - const { count } = await Storages.db.count(prepareFilters(rest)); + const store = await Storages.db(); + const { count } = await store.count(prepareFilters(rest)); send(['COUNT', subId, { count, approximate: false }]); } diff --git a/src/controllers/well-known/nostr.ts b/src/controllers/well-known/nostr.ts index f1ebb6b..0669888 100644 --- a/src/controllers/well-known/nostr.ts +++ b/src/controllers/well-known/nostr.ts @@ -12,7 +12,7 @@ const nameSchema = z.string().min(1).regex(/^\w+$/); const nostrController: AppController = async (c) => { const result = nameSchema.safeParse(c.req.query('name')); const name = result.success ? result.data : undefined; - const pointer = name ? await localNip05Lookup(name) : undefined; + const pointer = name ? await localNip05Lookup(c.get('store'), name) : undefined; if (!name || !pointer) { return c.json({ names: {}, relays: {} }); diff --git a/src/controllers/well-known/webfinger.ts b/src/controllers/well-known/webfinger.ts index 38bc994..c1c8b81 100644 --- a/src/controllers/well-known/webfinger.ts +++ b/src/controllers/well-known/webfinger.ts @@ -45,7 +45,7 @@ async function handleAcct(c: AppContext, resource: URL): Promise { } const [username, host] = result.data; - const pointer = await localNip05Lookup(username); + const pointer = await localNip05Lookup(c.get('store'), username); if (!pointer) { return c.json({ error: 'Not found' }, 404); diff --git a/src/db.ts b/src/db.ts deleted file mode 100644 index 1a5f06d..0000000 --- a/src/db.ts +++ /dev/null @@ -1,41 +0,0 @@ -import fs from 'node:fs/promises'; -import path from 'node:path'; - -import { FileMigrationProvider, Migrator } from 'kysely'; - -import { DittoDB } from '@/db/DittoDB.ts'; - -const db = await DittoDB.getInstance(); - -const migrator = new Migrator({ - db, - provider: new FileMigrationProvider({ - fs, - path, - migrationFolder: new URL(import.meta.resolve('./db/migrations')).pathname, - }), -}); - -/** Migrate the database to the latest version. */ -async function migrate() { - console.info('Running migrations...'); - const results = await migrator.migrateToLatest(); - - if (results.error) { - console.error(results.error); - Deno.exit(1); - } else { - if (!results.results?.length) { - console.info('Everything up-to-date.'); - } else { - console.info('Migrations finished!'); - for (const { migrationName, status } of results.results!) { - console.info(` - ${migrationName}: ${status}`); - } - } - } -} - -await migrate(); - -export { db }; diff --git a/src/db/DittoDB.ts b/src/db/DittoDB.ts index abe068b..9c3b280 100644 --- a/src/db/DittoDB.ts +++ b/src/db/DittoDB.ts @@ -1,4 +1,7 @@ -import { Kysely } from 'kysely'; +import fs from 'node:fs/promises'; +import path from 'node:path'; + +import { FileMigrationProvider, Kysely, Migrator } from 'kysely'; import { Conf } from '@/config.ts'; import { DittoPostgres } from '@/db/adapters/DittoPostgres.ts'; @@ -6,17 +9,63 @@ import { DittoSQLite } from '@/db/adapters/DittoSQLite.ts'; import { DittoTables } from '@/db/DittoTables.ts'; export class DittoDB { + private static kysely: Promise> | undefined; + static getInstance(): Promise> { + if (!this.kysely) { + this.kysely = this._getInstance(); + } + return this.kysely; + } + + static async _getInstance(): Promise> { const { databaseUrl } = Conf; + let kysely: Kysely; + switch (databaseUrl.protocol) { case 'sqlite:': - return DittoSQLite.getInstance(); + kysely = await DittoSQLite.getInstance(); + break; case 'postgres:': case 'postgresql:': - return DittoPostgres.getInstance(); + kysely = await DittoPostgres.getInstance(); + break; default: throw new Error('Unsupported database URL.'); } + + await this.migrate(kysely); + + return kysely; + } + + /** Migrate the database to the latest version. */ + private static async migrate(kysely: Kysely) { + const migrator = new Migrator({ + db: kysely, + provider: new FileMigrationProvider({ + fs, + path, + migrationFolder: new URL(import.meta.resolve('../db/migrations')).pathname, + }), + }); + + console.info('Running migrations...'); + const results = await migrator.migrateToLatest(); + + if (results.error) { + console.error(results.error); + Deno.exit(1); + } else { + if (!results.results?.length) { + console.info('Everything up-to-date.'); + } else { + console.info('Migrations finished!'); + for (const { migrationName, status } of results.results!) { + console.info(` - ${migrationName}: ${status}`); + } + } + } } } diff --git a/src/db/unattached-media.ts b/src/db/unattached-media.ts index 960abe8..708e2f9 100644 --- a/src/db/unattached-media.ts +++ b/src/db/unattached-media.ts @@ -1,6 +1,6 @@ import uuid62 from 'uuid62'; -import { db } from '@/db.ts'; +import { DittoDB } from '@/db/DittoDB.ts'; import { type MediaData } from '@/schemas/nostr.ts'; interface UnattachedMedia { @@ -19,7 +19,8 @@ async function insertUnattachedMedia(media: Omit { if (!urls.length) return; - await db.deleteFrom('unattached_media') + const kysely = await DittoDB.getInstance(); + await kysely.deleteFrom('unattached_media') .where('pubkey', '=', pubkey) .where('url', 'in', urls) .execute(); diff --git a/src/db/users.ts b/src/db/users.ts index c7659e4..bf0cab7 100644 --- a/src/db/users.ts +++ b/src/db/users.ts @@ -60,7 +60,8 @@ async function findUser(user: Partial, signal?: AbortSignal): Promise { - debug(`NostrEvent<${event.kind}> ${event.id}`); + for await (const msg of store.req([{ kinds: [0, 1, 3, 5, 6, 7, 9735, 10002], limit: 0, since: nostrNow() }])) { + if (msg[0] === 'EVENT') { + const event = msg[2]; + console.debug(`NostrEvent<${event.kind}> ${event.id}`); - return pipeline - .handleEvent(event, AbortSignal.timeout(5000)) - .catch(() => {}); + pipeline + .handleEvent(event, AbortSignal.timeout(5000)) + .catch(() => {}); + } + } } diff --git a/src/middleware/storeMiddleware.ts b/src/middleware/storeMiddleware.ts index efb65ed..4e24ab0 100644 --- a/src/middleware/storeMiddleware.ts +++ b/src/middleware/storeMiddleware.ts @@ -7,10 +7,10 @@ export const storeMiddleware: AppMiddleware = async (c, next) => { const pubkey = await c.get('signer')?.getPublicKey(); if (pubkey) { - const store = new UserStore(pubkey, Storages.admin); + const store = new UserStore(pubkey, await Storages.admin()); c.set('store', store); } else { - c.set('store', Storages.admin); + c.set('store', await Storages.admin()); } await next(); }; diff --git a/src/pipeline.ts b/src/pipeline.ts index 25cadb5..48e5c38 100644 --- a/src/pipeline.ts +++ b/src/pipeline.ts @@ -5,7 +5,7 @@ import Debug from '@soapbox/stickynotes/debug'; import { sql } from 'kysely'; import { Conf } from '@/config.ts'; -import { db } from '@/db.ts'; +import { DittoDB } from '@/db/DittoDB.ts'; import { deleteAttachedMedia } from '@/db/unattached-media.ts'; import { DittoEvent } from '@/interfaces/DittoEvent.ts'; import { isEphemeralKind } from '@/kinds.ts'; @@ -57,7 +57,7 @@ async function handleEvent(event: DittoEvent, signal: AbortSignal): Promise { const policies: NPolicy[] = [ - new MuteListPolicy(Conf.pubkey, Storages.admin), + new MuteListPolicy(Conf.pubkey, await Storages.admin()), ]; try { @@ -76,17 +76,23 @@ async function policyFilter(event: NostrEvent): Promise { /** Encounter the event, and return whether it has already been encountered. */ async function encounterEvent(event: NostrEvent, signal: AbortSignal): Promise { - const [existing] = await Storages.cache.query([{ ids: [event.id], limit: 1 }]); - Storages.cache.event(event); - Storages.reqmeister.event(event, { signal }); + const cache = await Storages.cache(); + const reqmeister = await Storages.reqmeister(); + + const [existing] = await cache.query([{ ids: [event.id], limit: 1 }]); + + cache.event(event); + reqmeister.event(event, { signal }); + return !!existing; } /** Hydrate the event with the user, if applicable. */ async function hydrateEvent(event: DittoEvent, signal: AbortSignal): Promise { - await hydrateEvents({ events: [event], storage: Storages.db, signal }); + await hydrateEvents({ events: [event], store: await Storages.db(), signal }); - const domain = await db + const kysely = await DittoDB.getInstance(); + const domain = await kysely .selectFrom('pubkey_domains') .select('domain') .where('pubkey', '=', event.pubkey) @@ -98,8 +104,9 @@ async function hydrateEvent(event: DittoEvent, signal: AbortSignal): Promise { if (isEphemeralKind(event.kind)) return; + const store = await Storages.db(); - const [deletion] = await Storages.db.query( + const [deletion] = await store.query( [{ kinds: [5], authors: [Conf.pubkey, event.pubkey], '#e': [event.id], limit: 1 }], { signal }, ); @@ -108,7 +115,7 @@ async function storeEvent(event: DittoEvent, signal?: AbortSignal): Promise pubkey_domains.last_updated_at - `.execute(db); + `.execute(kysely); } catch (_e) { // do nothing } @@ -153,17 +161,18 @@ async function parseMetadata(event: NostrEvent, signal: AbortSignal): Promise { if (event.kind === 5) { const ids = getTagSet(event.tags, 'e'); + const store = await Storages.db(); if (event.pubkey === Conf.pubkey) { - await Storages.db.remove([{ ids: [...ids] }], { signal }); + await store.remove([{ ids: [...ids] }], { signal }); } else { - const events = await Storages.db.query( + const events = await store.query( [{ ids: [...ids], authors: [event.pubkey] }], { signal }, ); const deleteIds = events.map(({ id }) => id); - await Storages.db.remove([{ ids: deleteIds }], { signal }); + await store.remove([{ ids: deleteIds }], { signal }); } } } @@ -189,19 +198,22 @@ async function trackHashtags(event: NostrEvent): Promise { /** Queue related events to fetch. */ async function fetchRelatedEvents(event: DittoEvent) { + const cache = await Storages.cache(); + const reqmeister = await Storages.reqmeister(); + if (!event.author) { const signal = AbortSignal.timeout(3000); - Storages.reqmeister.query([{ kinds: [0], authors: [event.pubkey] }], { signal }) + reqmeister.query([{ kinds: [0], authors: [event.pubkey] }], { signal }) .then((events) => Promise.allSettled(events.map((event) => handleEvent(event, signal)))) .catch(() => {}); } for (const [name, id] of event.tags) { if (name === 'e') { - const { count } = await Storages.cache.count([{ ids: [id] }]); + const { count } = await cache.count([{ ids: [id] }]); if (!count) { const signal = AbortSignal.timeout(3000); - Storages.reqmeister.query([{ ids: [id] }], { signal }) + reqmeister.query([{ ids: [id] }], { signal }) .then((events) => Promise.allSettled(events.map((event) => handleEvent(event, signal)))) .catch(() => {}); } @@ -272,7 +284,8 @@ function isFresh(event: NostrEvent): boolean { /** Distribute the event through active subscriptions. */ async function streamOut(event: NostrEvent): Promise { if (isFresh(event)) { - await Storages.pubsub.event(event); + const pubsub = await Storages.pubsub(); + await pubsub.event(event); } } diff --git a/src/pipeline/DVM.ts b/src/pipeline/DVM.ts index 953e9be..a811067 100644 --- a/src/pipeline/DVM.ts +++ b/src/pipeline/DVM.ts @@ -34,7 +34,9 @@ export class DVM { return DVM.feedback(event, 'error', `Forbidden user: ${user}`); } - const [label] = await Storages.db.query([{ + const store = await Storages.db(); + + const [label] = await store.query([{ kinds: [1985], authors: [admin], '#L': ['nip05'], diff --git a/src/pool.ts b/src/pool.ts deleted file mode 100644 index 3ac1a1d..0000000 --- a/src/pool.ts +++ /dev/null @@ -1,34 +0,0 @@ -import { RelayPoolWorker } from 'nostr-relaypool'; - -import { Storages } from '@/storages.ts'; -import { Conf } from '@/config.ts'; - -const [relayList] = await Storages.db.query([ - { kinds: [10002], authors: [Conf.pubkey], limit: 1 }, -]); - -const tags = relayList?.tags ?? []; - -const activeRelays = tags.reduce((acc, [name, url, marker]) => { - if (name === 'r' && !marker) { - acc.push(url); - } - return acc; -}, []); - -console.log(`pool: connecting to ${activeRelays.length} relays.`); - -const worker = new Worker('https://unpkg.com/nostr-relaypool2@0.6.34/lib/nostr-relaypool.worker.js', { - type: 'module', -}); - -// @ts-ignore Wrong types. -const pool = new RelayPoolWorker(worker, activeRelays, { - autoReconnect: true, - // The pipeline verifies events. - skipVerification: true, - // The logging feature overwhelms the CPU and creates too many logs. - logErrorsAndNotices: false, -}); - -export { activeRelays, pool }; diff --git a/src/queries.ts b/src/queries.ts index 5626c45..76fabfd 100644 --- a/src/queries.ts +++ b/src/queries.ts @@ -25,6 +25,7 @@ const getEvent = async ( opts: GetEventOpts = {}, ): Promise => { debug(`getEvent: ${id}`); + const store = await Storages.optimizer(); const { kind, signal = AbortSignal.timeout(1000) } = opts; const filter: NostrFilter = { ids: [id], limit: 1 }; @@ -32,23 +33,25 @@ const getEvent = async ( filter.kinds = [kind]; } - return await Storages.optimizer.query([filter], { limit: 1, signal }) - .then((events) => hydrateEvents({ events, storage: Storages.optimizer, signal })) + return await store.query([filter], { limit: 1, signal }) + .then((events) => hydrateEvents({ events, store, signal })) .then(([event]) => event); }; /** Get a Nostr `set_medatadata` event for a user's pubkey. */ const getAuthor = async (pubkey: string, opts: GetEventOpts = {}): Promise => { + const store = await Storages.optimizer(); const { signal = AbortSignal.timeout(1000) } = opts; - return await Storages.optimizer.query([{ authors: [pubkey], kinds: [0], limit: 1 }], { limit: 1, signal }) - .then((events) => hydrateEvents({ events, storage: Storages.optimizer, signal })) + return await store.query([{ authors: [pubkey], kinds: [0], limit: 1 }], { limit: 1, signal }) + .then((events) => hydrateEvents({ events, store, signal })) .then(([event]) => event); }; /** Get users the given pubkey follows. */ const getFollows = async (pubkey: string, signal?: AbortSignal): Promise => { - const [event] = await Storages.db.query([{ authors: [pubkey], kinds: [3], limit: 1 }], { limit: 1, signal }); + const store = await Storages.db(); + const [event] = await store.query([{ authors: [pubkey], kinds: [3], limit: 1 }], { limit: 1, signal }); return event; }; @@ -84,15 +87,18 @@ async function getAncestors(event: NostrEvent, result: NostrEvent[] = []): Promi } async function getDescendants(eventId: string, signal = AbortSignal.timeout(2000)): Promise { - const events = await Storages.db.query([{ kinds: [1], '#e': [eventId] }], { limit: 200, signal }); - return hydrateEvents({ events, storage: Storages.db, signal }); + const store = await Storages.db(); + const events = await store.query([{ kinds: [1], '#e': [eventId] }], { limit: 200, signal }); + return hydrateEvents({ events, store, signal }); } /** Returns whether the pubkey is followed by a local user. */ async function isLocallyFollowed(pubkey: string): Promise { const { host } = Conf.url; - const [event] = await Storages.db.query( + const store = await Storages.db(); + + const [event] = await store.query( [{ kinds: [3], '#p': [pubkey], search: `domain:${host}`, limit: 1 }], { limit: 1 }, ); diff --git a/src/signers/ConnectSigner.ts b/src/signers/ConnectSigner.ts index 4dda0b1..f482413 100644 --- a/src/signers/ConnectSigner.ts +++ b/src/signers/ConnectSigner.ts @@ -1,5 +1,5 @@ // deno-lint-ignore-file require-await -import { NConnectSigner } from '@nostrify/nostrify'; +import { NConnectSigner, NostrEvent, NostrSigner } from '@nostrify/nostrify'; import { AdminSigner } from '@/signers/AdminSigner.ts'; import { Storages } from '@/storages.ts'; @@ -9,24 +9,55 @@ import { Storages } from '@/storages.ts'; * * Simple extension of nostrify's `NConnectSigner`, with our options to keep it DRY. */ -export class ConnectSigner extends NConnectSigner { - private _pubkey: string; +export class ConnectSigner implements NostrSigner { + private signer: Promise; - constructor(pubkey: string, private relays?: string[]) { - super({ - pubkey, + constructor(private pubkey: string, private relays?: string[]) { + this.signer = this.init(); + } + + async init(): Promise { + return new NConnectSigner({ + pubkey: this.pubkey, // TODO: use a remote relay for `nprofile` signing (if present and `Conf.relay` isn't already in the list) - relay: Storages.pubsub, + relay: await Storages.pubsub(), signer: new AdminSigner(), timeout: 60000, }); - - this._pubkey = pubkey; } + async signEvent(event: Omit): Promise { + const signer = await this.signer; + return signer.signEvent(event); + } + + readonly nip04 = { + encrypt: async (pubkey: string, plaintext: string): Promise => { + const signer = await this.signer; + return signer.nip04.encrypt(pubkey, plaintext); + }, + + decrypt: async (pubkey: string, ciphertext: string): Promise => { + const signer = await this.signer; + return signer.nip04.decrypt(pubkey, ciphertext); + }, + }; + + readonly nip44 = { + encrypt: async (pubkey: string, plaintext: string): Promise => { + const signer = await this.signer; + return signer.nip44.encrypt(pubkey, plaintext); + }, + + decrypt: async (pubkey: string, ciphertext: string): Promise => { + const signer = await this.signer; + return signer.nip44.decrypt(pubkey, ciphertext); + }, + }; + // Prevent unnecessary NIP-46 round-trips. async getPublicKey(): Promise { - return this._pubkey; + return this.pubkey; } /** Get the user's relays if they passed in an `nprofile` auth token. */ diff --git a/src/stats.ts b/src/stats.ts index 21a4d97..f8efe16 100644 --- a/src/stats.ts +++ b/src/stats.ts @@ -2,7 +2,7 @@ import { NKinds, NostrEvent } from '@nostrify/nostrify'; import Debug from '@soapbox/stickynotes/debug'; import { InsertQueryBuilder } from 'kysely'; -import { db } from '@/db.ts'; +import { DittoDB } from '@/db/DittoDB.ts'; import { DittoTables } from '@/db/DittoTables.ts'; import { Storages } from '@/storages.ts'; import { findReplyTag } from '@/tags.ts'; @@ -25,7 +25,7 @@ async function updateStats(event: NostrEvent) { if (event.kind === 3) { prev = await getPrevEvent(event); if (!prev || event.created_at >= prev.created_at) { - queries.push(updateFollowingCountQuery(event)); + queries.push(await updateFollowingCountQuery(event)); } } @@ -37,8 +37,8 @@ async function updateStats(event: NostrEvent) { debug(JSON.stringify({ id: event.id, pubkey: event.pubkey, kind: event.kind, tags: event.tags, statDiffs })); } - if (pubkeyDiffs.length) queries.push(authorStatsQuery(pubkeyDiffs)); - if (eventDiffs.length) queries.push(eventStatsQuery(eventDiffs)); + if (pubkeyDiffs.length) queries.push(await authorStatsQuery(pubkeyDiffs)); + if (eventDiffs.length) queries.push(await eventStatsQuery(eventDiffs)); if (queries.length) { await Promise.all(queries.map((query) => query.execute())); @@ -47,6 +47,7 @@ async function updateStats(event: NostrEvent) { /** Calculate stats changes ahead of time so we can build an efficient query. */ async function getStatsDiff(event: NostrEvent, prev: NostrEvent | undefined): Promise { + const store = await Storages.db(); const statDiffs: StatDiff[] = []; const firstTaggedId = event.tags.find(([name]) => name === 'e')?.[1]; @@ -65,7 +66,7 @@ async function getStatsDiff(event: NostrEvent, prev: NostrEvent | undefined): Pr case 5: { if (!firstTaggedId) break; - const [repostedEvent] = await Storages.db.query( + const [repostedEvent] = await store.query( [{ kinds: [6], ids: [firstTaggedId], authors: [event.pubkey] }], { limit: 1 }, ); @@ -77,7 +78,7 @@ async function getStatsDiff(event: NostrEvent, prev: NostrEvent | undefined): Pr const eventBeingRepostedPubkey = repostedEvent.tags.find(([name]) => name === 'p')?.[1]; if (!eventBeingRepostedId || !eventBeingRepostedPubkey) break; - const [eventBeingReposted] = await Storages.db.query( + const [eventBeingReposted] = await store.query( [{ kinds: [1], ids: [eventBeingRepostedId], authors: [eventBeingRepostedPubkey] }], { limit: 1 }, ); @@ -101,7 +102,7 @@ async function getStatsDiff(event: NostrEvent, prev: NostrEvent | undefined): Pr } /** Create an author stats query from the list of diffs. */ -function authorStatsQuery(diffs: AuthorStatDiff[]) { +async function authorStatsQuery(diffs: AuthorStatDiff[]) { const values: DittoTables['author_stats'][] = diffs.map(([_, pubkey, stat, diff]) => { const row: DittoTables['author_stats'] = { pubkey, @@ -113,7 +114,8 @@ function authorStatsQuery(diffs: AuthorStatDiff[]) { return row; }); - return db.insertInto('author_stats') + const kysely = await DittoDB.getInstance(); + return kysely.insertInto('author_stats') .values(values) .onConflict((oc) => oc @@ -127,7 +129,7 @@ function authorStatsQuery(diffs: AuthorStatDiff[]) { } /** Create an event stats query from the list of diffs. */ -function eventStatsQuery(diffs: EventStatDiff[]) { +async function eventStatsQuery(diffs: EventStatDiff[]) { const values: DittoTables['event_stats'][] = diffs.map(([_, event_id, stat, diff]) => { const row: DittoTables['event_stats'] = { event_id, @@ -139,7 +141,8 @@ function eventStatsQuery(diffs: EventStatDiff[]) { return row; }); - return db.insertInto('event_stats') + const kysely = await DittoDB.getInstance(); + return kysely.insertInto('event_stats') .values(values) .onConflict((oc) => oc @@ -155,7 +158,9 @@ function eventStatsQuery(diffs: EventStatDiff[]) { /** Get the last version of the event, if any. */ async function getPrevEvent(event: NostrEvent): Promise { if (NKinds.replaceable(event.kind) || NKinds.parameterizedReplaceable(event.kind)) { - const [prev] = await Storages.db.query([ + const store = await Storages.db(); + + const [prev] = await store.query([ { kinds: [event.kind], authors: [event.pubkey], limit: 1 }, ]); @@ -164,14 +169,15 @@ async function getPrevEvent(event: NostrEvent): Promise } /** Set the following count to the total number of unique "p" tags in the follow list. */ -function updateFollowingCountQuery({ pubkey, tags }: NostrEvent) { +async function updateFollowingCountQuery({ pubkey, tags }: NostrEvent) { const following_count = new Set( tags .filter(([name]) => name === 'p') .map(([_, value]) => value), ).size; - return db.insertInto('author_stats') + const kysely = await DittoDB.getInstance(); + return kysely.insertInto('author_stats') .values({ pubkey, following_count, diff --git a/src/storages.ts b/src/storages.ts index a51cbd1..1cf06c1 100644 --- a/src/storages.ts +++ b/src/storages.ts @@ -1,7 +1,9 @@ +// deno-lint-ignore-file require-await import { NCache } from '@nostrify/nostrify'; +import { RelayPoolWorker } from 'nostr-relaypool'; + import { Conf } from '@/config.ts'; -import { db } from '@/db.ts'; -import { activeRelays, pool } from '@/pool.ts'; +import { DittoDB } from '@/db/DittoDB.ts'; import { EventsDB } from '@/storages/events-db.ts'; import { Optimizer } from '@/storages/optimizer.ts'; import { PoolStore } from '@/storages/pool-store.ts'; @@ -12,89 +14,130 @@ import { UserStore } from '@/storages/UserStore.ts'; import { Time } from '@/utils/time.ts'; export class Storages { - private static _db: EventsDB | undefined; - private static _admin: UserStore | undefined; - private static _cache: NCache | undefined; - private static _client: PoolStore | undefined; - private static _optimizer: Optimizer | undefined; - private static _reqmeister: Reqmeister | undefined; - private static _pubsub: InternalRelay | undefined; - private static _search: SearchStore | undefined; + private static _db: Promise | undefined; + private static _admin: Promise | undefined; + private static _cache: Promise | undefined; + private static _client: Promise | undefined; + private static _optimizer: Promise | undefined; + private static _reqmeister: Promise | undefined; + private static _pubsub: Promise | undefined; + private static _search: Promise | undefined; /** SQLite database to store events this Ditto server cares about. */ - public static get db(): EventsDB { + public static async db(): Promise { if (!this._db) { - this._db = new EventsDB(db); + this._db = (async () => { + const kysely = await DittoDB.getInstance(); + return new EventsDB(kysely); + })(); } return this._db; } /** Admin user storage. */ - public static get admin(): UserStore { + public static async admin(): Promise { if (!this._admin) { - this._admin = new UserStore(Conf.pubkey, this.db); + this._admin = Promise.resolve(new UserStore(Conf.pubkey, await this.db())); } return this._admin; } /** Internal pubsub relay between controllers and the pipeline. */ - public static get pubsub(): InternalRelay { + public static async pubsub(): Promise { if (!this._pubsub) { - this._pubsub = new InternalRelay(); + this._pubsub = Promise.resolve(new InternalRelay()); } return this._pubsub; } /** Relay pool storage. */ - public static get client(): PoolStore { + public static async client(): Promise { if (!this._client) { - this._client = new PoolStore({ - pool, - relays: activeRelays, - }); + this._client = (async () => { + const db = await this.db(); + + const [relayList] = await db.query([ + { kinds: [10002], authors: [Conf.pubkey], limit: 1 }, + ]); + + const tags = relayList?.tags ?? []; + + const activeRelays = tags.reduce((acc, [name, url, marker]) => { + if (name === 'r' && !marker) { + acc.push(url); + } + return acc; + }, []); + + console.log(`pool: connecting to ${activeRelays.length} relays.`); + + const worker = new Worker('https://unpkg.com/nostr-relaypool2@0.6.34/lib/nostr-relaypool.worker.js', { + type: 'module', + }); + + // @ts-ignore Wrong types. + const pool = new RelayPoolWorker(worker, activeRelays, { + autoReconnect: true, + // The pipeline verifies events. + skipVerification: true, + // The logging feature overwhelms the CPU and creates too many logs. + logErrorsAndNotices: false, + }); + + return new PoolStore({ + pool, + relays: activeRelays, + }); + })(); } return this._client; } /** In-memory data store for cached events. */ - public static get cache(): NCache { + public static async cache(): Promise { if (!this._cache) { - this._cache = new NCache({ max: 3000 }); + this._cache = Promise.resolve(new NCache({ max: 3000 })); } return this._cache; } /** Batches requests for single events. */ - public static get reqmeister(): Reqmeister { + public static async reqmeister(): Promise { if (!this._reqmeister) { - this._reqmeister = new Reqmeister({ - client: this.client, - delay: Time.seconds(1), - timeout: Time.seconds(1), - }); + this._reqmeister = Promise.resolve( + new Reqmeister({ + client: await this.client(), + delay: Time.seconds(1), + timeout: Time.seconds(1), + }), + ); } return this._reqmeister; } /** Main Ditto storage adapter */ - public static get optimizer(): Optimizer { + public static async optimizer(): Promise { if (!this._optimizer) { - this._optimizer = new Optimizer({ - db: this.db, - cache: this.cache, - client: this.reqmeister, - }); + this._optimizer = Promise.resolve( + new Optimizer({ + db: await this.db(), + cache: await this.cache(), + client: await this.reqmeister(), + }), + ); } return this._optimizer; } /** Storage to use for remote search. */ - public static get search(): SearchStore { + public static async search(): Promise { if (!this._search) { - this._search = new SearchStore({ - relay: Conf.searchRelay, - fallback: this.optimizer, - }); + this._search = Promise.resolve( + new SearchStore({ + relay: Conf.searchRelay, + fallback: await this.optimizer(), + }), + ); } return this._search; } diff --git a/src/storages/UserStore.ts b/src/storages/UserStore.ts index 1c7aaee..c5657b6 100644 --- a/src/storages/UserStore.ts +++ b/src/storages/UserStore.ts @@ -4,13 +4,7 @@ import { DittoEvent } from '@/interfaces/DittoEvent.ts'; import { getTagSet } from '@/tags.ts'; export class UserStore implements NStore { - private store: NStore; - private pubkey: string; - - constructor(pubkey: string, store: NStore) { - this.pubkey = pubkey; - this.store = store; - } + constructor(private pubkey: string, private store: NStore) {} async event(event: NostrEvent, opts?: { signal?: AbortSignal }): Promise { return await this.store.event(event, opts); @@ -21,12 +15,11 @@ export class UserStore implements NStore { * https://github.com/nostr-protocol/nips/blob/master/51.md#standard-lists */ async query(filters: NostrFilter[], opts: { signal?: AbortSignal; limit?: number } = {}): Promise { - const allEvents = await this.store.query(filters, opts); + const events = await this.store.query(filters, opts); + const pubkeys = await this.getMutedPubkeys(); - const mutedPubkeys = await this.getMutedPubkeys(); - - return allEvents.filter((event) => { - return event.kind === 0 || mutedPubkeys.has(event.pubkey) === false; + return events.filter((event) => { + return event.kind === 0 || !pubkeys.has(event.pubkey); }); } diff --git a/src/storages/events-db.test.ts b/src/storages/events-db.test.ts index dd92c1b..d935e6b 100644 --- a/src/storages/events-db.test.ts +++ b/src/storages/events-db.test.ts @@ -1,12 +1,14 @@ -import { db } from '@/db.ts'; -import { assertEquals, assertRejects } from '@/deps-test.ts'; +import { assertEquals, assertRejects } from '@std/assert'; + +import { DittoDB } from '@/db/DittoDB.ts'; import event0 from '~/fixtures/events/event-0.json' with { type: 'json' }; import event1 from '~/fixtures/events/event-1.json' with { type: 'json' }; import { EventsDB } from '@/storages/events-db.ts'; -const eventsDB = new EventsDB(db); +const kysely = await DittoDB.getInstance(); +const eventsDB = new EventsDB(kysely); Deno.test('count filters', async () => { assertEquals((await eventsDB.count([{ kinds: [1] }])).count, 0); @@ -34,7 +36,7 @@ Deno.test('query events with domain search filter', async () => { assertEquals(await eventsDB.query([{ search: 'domain:localhost:8000' }]), []); assertEquals(await eventsDB.query([{ search: '' }]), [event1]); - await db + await kysely .insertInto('pubkey_domains') .values({ pubkey: event1.pubkey, domain: 'localhost:8000', last_updated_at: event1.created_at }) .execute(); diff --git a/src/storages/hydrate.test.ts b/src/storages/hydrate.test.ts index b55cd2b..f5c70af 100644 --- a/src/storages/hydrate.test.ts +++ b/src/storages/hydrate.test.ts @@ -17,7 +17,7 @@ Deno.test('hydrateEvents(): author --- WITHOUT stats', async () => { await hydrateEvents({ events: [event1], - storage: db, + store: db, }); const expectedEvent = { ...event1, author: event0 }; @@ -40,7 +40,7 @@ Deno.test('hydrateEvents(): repost --- WITHOUT stats', async () => { await hydrateEvents({ events: [event6], - storage: db, + store: db, }); const expectedEvent6 = { @@ -67,7 +67,7 @@ Deno.test('hydrateEvents(): quote repost --- WITHOUT stats', async () => { await hydrateEvents({ events: [event1quoteRepost], - storage: db, + store: db, }); const expectedEvent1quoteRepost = { @@ -95,7 +95,7 @@ Deno.test('hydrateEvents(): repost of quote repost --- WITHOUT stats', async () await hydrateEvents({ events: [event6], - storage: db, + store: db, }); const expectedEvent6 = { @@ -122,7 +122,7 @@ Deno.test('hydrateEvents(): report pubkey and post // kind 1984 --- WITHOUT stat await hydrateEvents({ events: [reportEvent], - storage: db, + store: db, }); const expectedEvent: DittoEvent = { diff --git a/src/storages/hydrate.ts b/src/storages/hydrate.ts index e197ca8..8d2d302 100644 --- a/src/storages/hydrate.ts +++ b/src/storages/hydrate.ts @@ -1,20 +1,20 @@ import { NostrEvent, NStore } from '@nostrify/nostrify'; import { matchFilter } from 'nostr-tools'; -import { db } from '@/db.ts'; +import { DittoDB } from '@/db/DittoDB.ts'; import { type DittoEvent } from '@/interfaces/DittoEvent.ts'; import { DittoTables } from '@/db/DittoTables.ts'; import { Conf } from '@/config.ts'; interface HydrateOpts { events: DittoEvent[]; - storage: NStore; + store: NStore; signal?: AbortSignal; } /** Hydrate events using the provided storage. */ async function hydrateEvents(opts: HydrateOpts): Promise { - const { events, storage, signal } = opts; + const { events, store, signal } = opts; if (!events.length) { return events; @@ -22,31 +22,31 @@ async function hydrateEvents(opts: HydrateOpts): Promise { const cache = [...events]; - for (const event of await gatherReposts({ events: cache, storage, signal })) { + for (const event of await gatherReposts({ events: cache, store, signal })) { cache.push(event); } - for (const event of await gatherReacted({ events: cache, storage, signal })) { + for (const event of await gatherReacted({ events: cache, store, signal })) { cache.push(event); } - for (const event of await gatherQuotes({ events: cache, storage, signal })) { + for (const event of await gatherQuotes({ events: cache, store, signal })) { cache.push(event); } - for (const event of await gatherAuthors({ events: cache, storage, signal })) { + for (const event of await gatherAuthors({ events: cache, store, signal })) { cache.push(event); } - for (const event of await gatherUsers({ events: cache, storage, signal })) { + for (const event of await gatherUsers({ events: cache, store, signal })) { cache.push(event); } - for (const event of await gatherReportedProfiles({ events: cache, storage, signal })) { + for (const event of await gatherReportedProfiles({ events: cache, store, signal })) { cache.push(event); } - for (const event of await gatherReportedNotes({ events: cache, storage, signal })) { + for (const event of await gatherReportedNotes({ events: cache, store, signal })) { cache.push(event); } @@ -123,7 +123,7 @@ function assembleEvents( } /** Collect reposts from the events. */ -function gatherReposts({ events, storage, signal }: HydrateOpts): Promise { +function gatherReposts({ events, store, signal }: HydrateOpts): Promise { const ids = new Set(); for (const event of events) { @@ -135,14 +135,14 @@ function gatherReposts({ events, storage, signal }: HydrateOpts): Promise { +function gatherReacted({ events, store, signal }: HydrateOpts): Promise { const ids = new Set(); for (const event of events) { @@ -154,14 +154,14 @@ function gatherReacted({ events, storage, signal }: HydrateOpts): Promise { +function gatherQuotes({ events, store, signal }: HydrateOpts): Promise { const ids = new Set(); for (const event of events) { @@ -173,34 +173,34 @@ function gatherQuotes({ events, storage, signal }: HydrateOpts): Promise { +function gatherAuthors({ events, store, signal }: HydrateOpts): Promise { const pubkeys = new Set(events.map((event) => event.pubkey)); - return storage.query( + return store.query( [{ kinds: [0], authors: [...pubkeys], limit: pubkeys.size }], { signal }, ); } /** Collect users from the events. */ -function gatherUsers({ events, storage, signal }: HydrateOpts): Promise { +function gatherUsers({ events, store, signal }: HydrateOpts): Promise { const pubkeys = new Set(events.map((event) => event.pubkey)); - return storage.query( + return store.query( [{ kinds: [30361], authors: [Conf.pubkey], '#d': [...pubkeys], limit: pubkeys.size }], { signal }, ); } /** Collect reported notes from the events. */ -function gatherReportedNotes({ events, storage, signal }: HydrateOpts): Promise { +function gatherReportedNotes({ events, store, signal }: HydrateOpts): Promise { const ids = new Set(); for (const event of events) { if (event.kind === 1984) { @@ -213,14 +213,14 @@ function gatherReportedNotes({ events, storage, signal }: HydrateOpts): Promise< } } - return storage.query( + return store.query( [{ kinds: [1], ids: [...ids], limit: ids.size }], { signal }, ); } /** Collect reported profiles from the events. */ -function gatherReportedProfiles({ events, storage, signal }: HydrateOpts): Promise { +function gatherReportedProfiles({ events, store, signal }: HydrateOpts): Promise { const pubkeys = new Set(); for (const event of events) { @@ -232,14 +232,14 @@ function gatherReportedProfiles({ events, storage, signal }: HydrateOpts): Promi } } - return storage.query( + return store.query( [{ kinds: [0], authors: [...pubkeys], limit: pubkeys.size }], { signal }, ); } /** Collect author stats from the events. */ -function gatherAuthorStats(events: DittoEvent[]): Promise { +async function gatherAuthorStats(events: DittoEvent[]): Promise { const pubkeys = new Set( events .filter((event) => event.kind === 0) @@ -250,7 +250,8 @@ function gatherAuthorStats(events: DittoEvent[]): Promise { +async function gatherEventStats(events: DittoEvent[]): Promise { const ids = new Set( events .filter((event) => event.kind === 1) @@ -269,7 +270,8 @@ function gatherEventStats(events: DittoEvent[]): Promise { if (opts.signal?.aborted) return Promise.reject(abortError()); - const relaySet = await getRelays(event.pubkey); + const relaySet = await getRelays(await Storages.db(), event.pubkey); relaySet.delete(Conf.relay); const relays = [...relaySet].slice(0, 4); diff --git a/src/storages/search-store.ts b/src/storages/search-store.ts index be6e2b4..4951c72 100644 --- a/src/storages/search-store.ts +++ b/src/storages/search-store.ts @@ -48,7 +48,7 @@ class SearchStore implements NStore { return hydrateEvents({ events, - storage: this.#hydrator, + store: this.#hydrator, signal: opts?.signal, }); } else { diff --git a/src/utils/api.ts b/src/utils/api.ts index 81b157c..dceede7 100644 --- a/src/utils/api.ts +++ b/src/utils/api.ts @@ -42,7 +42,7 @@ async function createEvent(t: EventStub, c: AppContext): Promise { /** Filter for fetching an existing event to update. */ interface UpdateEventFilter extends NostrFilter { kinds: [number]; - limit?: 1; + limit: 1; } /** Fetch existing event, update it, then publish the new event. */ @@ -51,7 +51,8 @@ async function updateEvent( fn: (prev: NostrEvent | undefined) => E, c: AppContext, ): Promise { - const [prev] = await Storages.db.query([filter], { limit: 1, signal: c.req.raw.signal }); + const store = await Storages.db(); + const [prev] = await store.query([filter], { signal: c.req.raw.signal }); return createEvent(fn(prev), c); } @@ -101,7 +102,8 @@ async function updateAdminEvent( fn: (prev: NostrEvent | undefined) => E, c: AppContext, ): Promise { - const [prev] = await Storages.db.query([filter], { limit: 1, signal: c.req.raw.signal }); + const store = await Storages.db(); + const [prev] = await store.query([filter], { limit: 1, signal: c.req.raw.signal }); return createAdminEvent(fn(prev), c); } @@ -110,7 +112,8 @@ async function publishEvent(event: NostrEvent, c: AppContext): Promise { const uri = new URL('nostrconnect://'); - const { name, tagline } = await getInstanceMetadata(signal); + const { name, tagline } = await getInstanceMetadata(await Storages.db(), signal); const metadata: ConnectMetadata = { name, diff --git a/src/utils/instance.ts b/src/utils/instance.ts index 004e4cf..386c796 100644 --- a/src/utils/instance.ts +++ b/src/utils/instance.ts @@ -1,8 +1,7 @@ -import { NostrEvent, NostrMetadata, NSchema as n } from '@nostrify/nostrify'; +import { NostrEvent, NostrMetadata, NSchema as n, NStore } from '@nostrify/nostrify'; import { Conf } from '@/config.ts'; import { serverMetaSchema } from '@/schemas/nostr.ts'; -import { Storages } from '@/storages.ts'; /** Like NostrMetadata, but some fields are required and also contains some extra fields. */ export interface InstanceMetadata extends NostrMetadata { @@ -14,8 +13,8 @@ export interface InstanceMetadata extends NostrMetadata { } /** Get and parse instance metadata from the kind 0 of the admin user. */ -export async function getInstanceMetadata(signal?: AbortSignal): Promise { - const [event] = await Storages.db.query( +export async function getInstanceMetadata(store: NStore, signal?: AbortSignal): Promise { + const [event] = await store.query( [{ kinds: [0], authors: [Conf.pubkey], limit: 1 }], { signal }, ); diff --git a/src/utils/nip05.ts b/src/utils/nip05.ts index 0b4c6e3..eaab6ed 100644 --- a/src/utils/nip05.ts +++ b/src/utils/nip05.ts @@ -1,4 +1,4 @@ -import { NIP05 } from '@nostrify/nostrify'; +import { NIP05, NStore } from '@nostrify/nostrify'; import Debug from '@soapbox/stickynotes/debug'; import { nip19 } from 'nostr-tools'; @@ -16,7 +16,8 @@ const nip05Cache = new SimpleLRU( const [name, domain] = key.split('@'); try { if (domain === Conf.url.host) { - const pointer = await localNip05Lookup(name); + const store = await Storages.db(); + const pointer = await localNip05Lookup(store, name); if (pointer) { debug(`Found: ${key} is ${pointer.pubkey}`); return pointer; @@ -36,8 +37,8 @@ const nip05Cache = new SimpleLRU( { max: 500, ttl: Time.hours(1) }, ); -async function localNip05Lookup(name: string): Promise { - const [label] = await Storages.db.query([{ +async function localNip05Lookup(store: NStore, name: string): Promise { + const [label] = await store.query([{ kinds: [1985], authors: [Conf.pubkey], '#L': ['nip05'], diff --git a/src/utils/outbox.ts b/src/utils/outbox.ts index 13edaf6..72b8338 100644 --- a/src/utils/outbox.ts +++ b/src/utils/outbox.ts @@ -1,10 +1,11 @@ -import { Conf } from '@/config.ts'; -import { Storages } from '@/storages.ts'; +import { NStore } from '@nostrify/nostrify'; -export async function getRelays(pubkey: string): Promise> { +import { Conf } from '@/config.ts'; + +export async function getRelays(store: NStore, pubkey: string): Promise> { const relays = new Set<`wss://${string}`>(); - const events = await Storages.db.query([ + const events = await store.query([ { kinds: [10002], authors: [pubkey, Conf.pubkey], limit: 2 }, ]); diff --git a/src/views.ts b/src/views.ts index 451ce14..a737542 100644 --- a/src/views.ts +++ b/src/views.ts @@ -12,15 +12,16 @@ async function renderEventAccounts(c: AppContext, filters: NostrFilter[], signal return c.json([]); } - const events = await Storages.db.query(filters, { signal }); + const store = await Storages.db(); + const events = await store.query(filters, { signal }); const pubkeys = new Set(events.map(({ pubkey }) => pubkey)); if (!pubkeys.size) { return c.json([]); } - const authors = await Storages.db.query([{ kinds: [0], authors: [...pubkeys] }], { signal }) - .then((events) => hydrateEvents({ events, storage: Storages.db, signal })); + const authors = await store.query([{ kinds: [0], authors: [...pubkeys] }], { signal }) + .then((events) => hydrateEvents({ events, store, signal })); const accounts = await Promise.all( authors.map((event) => renderAccount(event)), @@ -32,8 +33,10 @@ async function renderEventAccounts(c: AppContext, filters: NostrFilter[], signal async function renderAccounts(c: AppContext, authors: string[], signal = AbortSignal.timeout(1000)) { const { since, until, limit } = paginationSchema.parse(c.req.query()); - const events = await Storages.db.query([{ kinds: [0], authors, since, until, limit }], { signal }) - .then((events) => hydrateEvents({ events, storage: Storages.db, signal })); + const store = await Storages.db(); + + const events = await store.query([{ kinds: [0], authors, since, until, limit }], { signal }) + .then((events) => hydrateEvents({ events, store, signal })); const accounts = await Promise.all( events.map((event) => renderAccount(event)), @@ -48,10 +51,11 @@ async function renderStatuses(c: AppContext, ids: string[], signal = AbortSignal return c.json([]); } + const store = await Storages.db(); const { limit } = paginationSchema.parse(c.req.query()); - const events = await Storages.db.query([{ kinds: [1], ids, limit }], { signal }) - .then((events) => hydrateEvents({ events, storage: Storages.db, signal })); + const events = await store.query([{ kinds: [1], ids, limit }], { signal }) + .then((events) => hydrateEvents({ events, store, signal })); if (!events.length) { return c.json([]); diff --git a/src/views/mastodon/relationships.ts b/src/views/mastodon/relationships.ts index d358024..2f8ffdd 100644 --- a/src/views/mastodon/relationships.ts +++ b/src/views/mastodon/relationships.ts @@ -2,7 +2,9 @@ import { Storages } from '@/storages.ts'; import { hasTag } from '@/tags.ts'; async function renderRelationship(sourcePubkey: string, targetPubkey: string) { - const events = await Storages.db.query([ + const db = await Storages.db(); + + const events = await db.query([ { kinds: [3], authors: [sourcePubkey], limit: 1 }, { kinds: [3], authors: [targetPubkey], limit: 1 }, { kinds: [10000], authors: [sourcePubkey], limit: 1 }, diff --git a/src/views/mastodon/statuses.ts b/src/views/mastodon/statuses.ts index d00759d..776f016 100644 --- a/src/views/mastodon/statuses.ts +++ b/src/views/mastodon/statuses.ts @@ -22,7 +22,7 @@ interface RenderStatusOpts { async function renderStatus(event: DittoEvent, opts: RenderStatusOpts): Promise { const { viewerPubkey, depth = 1 } = opts; - if (depth > 2 || depth < 0) return null; + if (depth > 2 || depth < 0) return; const note = nip19.noteEncode(event.id); @@ -40,7 +40,10 @@ async function renderStatus(event: DittoEvent, opts: RenderStatusOpts): Promise< ), ]; - const mentionedProfiles = await Storages.optimizer.query( + const db = await Storages.db(); + const optimizer = await Storages.optimizer(); + + const mentionedProfiles = await optimizer.query( [{ kinds: [0], authors: mentionedPubkeys, limit: mentionedPubkeys.length }], ); @@ -53,7 +56,7 @@ async function renderStatus(event: DittoEvent, opts: RenderStatusOpts): Promise< ), firstUrl ? unfurlCardCached(firstUrl) : null, viewerPubkey - ? await Storages.db.query([ + ? await db.query([ { kinds: [6], '#e': [event.id], authors: [viewerPubkey], limit: 1 }, { kinds: [7], '#e': [event.id], authors: [viewerPubkey], limit: 1 }, { kinds: [9734], '#e': [event.id], authors: [viewerPubkey], limit: 1 },