Merge branch 'nip46' into 'develop'
Rework signing flow to use events See merge request soapbox-pub/ditto!22
This commit is contained in:
commit
2d619868aa
|
@ -11,7 +11,7 @@
|
|||
},
|
||||
"imports": {
|
||||
"@/": "./src/",
|
||||
"~/": "./"
|
||||
"~/fixtures/": "./fixtures/"
|
||||
},
|
||||
"lint": {
|
||||
"include": ["src/", "scripts/"],
|
||||
|
|
15
src/admin.ts
15
src/admin.ts
|
@ -1,15 +0,0 @@
|
|||
import { Conf } from '@/config.ts';
|
||||
import { type Event, type EventTemplate, finishEvent, nip19 } from '@/deps.ts';
|
||||
|
||||
// deno-lint-ignore require-await
|
||||
async function signAdminEvent<K extends number = number>(event: EventTemplate<K>): Promise<Event<K>> {
|
||||
if (!Conf.nsec) throw new Error('No secret key. Set one with DITTO_NSEC.');
|
||||
|
||||
const result = nip19.decode(Conf.nsec);
|
||||
|
||||
if (result.type !== 'nsec') throw new Error('Invalid DITTO_NSEC. It should start with "nsec1..."');
|
||||
|
||||
return finishEvent(event, result.data);
|
||||
}
|
||||
|
||||
export { signAdminEvent };
|
|
@ -1,4 +1,4 @@
|
|||
import { dotenv, nip19, secp } from '@/deps.ts';
|
||||
import { dotenv, getPublicKey, nip19, secp } from '@/deps.ts';
|
||||
|
||||
/** Load environment config from `.env` */
|
||||
await dotenv.load({
|
||||
|
@ -9,6 +9,7 @@ await dotenv.load({
|
|||
|
||||
/** Application-wide configuration. */
|
||||
const Conf = {
|
||||
/** Ditto admin secret key in nip19 format. This is the way it's configured by an admin. */
|
||||
get nsec() {
|
||||
const value = Deno.env.get('DITTO_NSEC');
|
||||
if (!value) {
|
||||
|
@ -19,13 +20,15 @@ const Conf = {
|
|||
}
|
||||
return value as `nsec1${string}`;
|
||||
},
|
||||
/** Ditto admin secret key in hex format. */
|
||||
get seckey() {
|
||||
const result = nip19.decode(Conf.nsec);
|
||||
if (result.type !== 'nsec') {
|
||||
throw new Error('Invalid DITTO_NSEC');
|
||||
}
|
||||
return result.data;
|
||||
return nip19.decode(Conf.nsec).data;
|
||||
},
|
||||
/** Ditto admin public key in hex format. */
|
||||
get pubkey() {
|
||||
return getPublicKey(Conf.seckey);
|
||||
},
|
||||
/** Ditto admin secret key as a Web Crypto key. */
|
||||
get cryptoKey() {
|
||||
return crypto.subtle.importKey(
|
||||
'raw',
|
||||
|
@ -39,24 +42,31 @@ const Conf = {
|
|||
const { protocol, host } = Conf.url;
|
||||
return `${protocol === 'https:' ? 'wss:' : 'ws:'}//${host}/relay`;
|
||||
},
|
||||
/** Domain of the Ditto server, including the protocol. */
|
||||
get localDomain() {
|
||||
return Deno.env.get('LOCAL_DOMAIN') || 'http://localhost:8000';
|
||||
},
|
||||
/** Path to the main SQLite database which stores users, events, and more. */
|
||||
get dbPath() {
|
||||
return Deno.env.get('DB_PATH') || 'data/db.sqlite3';
|
||||
},
|
||||
/** Character limit to enforce for posts made through Mastodon API. */
|
||||
get postCharLimit() {
|
||||
return Number(Deno.env.get('POST_CHAR_LIMIT') || 5000);
|
||||
},
|
||||
/** Admin contact to expose through various endpoints. This information is public. */
|
||||
get adminEmail() {
|
||||
return Deno.env.get('ADMIN_EMAIL') || 'webmaster@localhost';
|
||||
},
|
||||
/** @deprecated Use relays from the database instead. */
|
||||
get poolRelays() {
|
||||
return (Deno.env.get('RELAY_POOL') || '').split(',').filter(Boolean);
|
||||
},
|
||||
/** @deprecated Publish only to the local relay unless users are mentioned, then try to also send to the relay of those users. Deletions should also be fanned out. */
|
||||
get publishRelays() {
|
||||
return ['wss://relay.mostr.pub'];
|
||||
},
|
||||
/** Domain of the Ditto server as a `URL` object, for easily grabbing the `hostname`, etc. */
|
||||
get url() {
|
||||
return new URL(Conf.localDomain);
|
||||
},
|
||||
|
|
|
@ -82,7 +82,7 @@ const relationshipsController: AppController = async (c) => {
|
|||
|
||||
const accountStatusesQuerySchema = z.object({
|
||||
pinned: booleanParamSchema.optional(),
|
||||
limit: z.coerce.number().positive().transform((v) => Math.min(v, 40)).catch(20),
|
||||
limit: z.coerce.number().nonnegative().transform((v) => Math.min(v, 40)).catch(20),
|
||||
exclude_replies: booleanParamSchema.optional(),
|
||||
tagged: z.string().optional(),
|
||||
});
|
||||
|
|
|
@ -5,6 +5,9 @@ import type { Context } from '@/deps.ts';
|
|||
function instanceController(c: Context) {
|
||||
const { host, protocol } = Conf.url;
|
||||
|
||||
/** Protocol to use for WebSocket URLs, depending on the protocol of the `LOCAL_DOMAIN`. */
|
||||
const wsProtocol = protocol === 'http:' ? 'ws:' : 'wss:';
|
||||
|
||||
return c.json({
|
||||
uri: host,
|
||||
title: 'Ditto',
|
||||
|
@ -35,10 +38,14 @@ function instanceController(c: Context) {
|
|||
user_count: 0,
|
||||
},
|
||||
urls: {
|
||||
streaming_api: `${protocol === 'http:' ? 'ws:' : 'wss:'}//${host}`,
|
||||
streaming_api: `${wsProtocol}//${host}`,
|
||||
},
|
||||
version: '0.0.0 (compatible; Ditto 0.0.1)',
|
||||
email: Conf.adminEmail,
|
||||
nostr: {
|
||||
pubkey: Conf.pubkey,
|
||||
relay: `${wsProtocol}//${host}/relay`,
|
||||
},
|
||||
rules: [],
|
||||
});
|
||||
}
|
||||
|
|
|
@ -1,10 +1,20 @@
|
|||
import { AppController } from '@/app.ts';
|
||||
import { z } from '@/deps.ts';
|
||||
import { type DittoFilter } from '@/filter.ts';
|
||||
import { TOKEN_REGEX } from '@/middleware/auth19.ts';
|
||||
import { streamSchema, ws } from '@/stream.ts';
|
||||
import { Sub } from '@/subs.ts';
|
||||
import { toStatus } from '@/transformers/nostr-to-mastoapi.ts';
|
||||
import { bech32ToPubkey } from '@/utils.ts';
|
||||
|
||||
/**
|
||||
* Streaming timelines/categories.
|
||||
* https://docs.joinmastodon.org/methods/streaming/#streams
|
||||
*/
|
||||
const streamSchema = z.enum([
|
||||
'nostr',
|
||||
'public',
|
||||
'public:local',
|
||||
'user',
|
||||
]);
|
||||
|
||||
const streamingController: AppController = (c) => {
|
||||
const upgrade = c.req.headers.get('upgrade');
|
||||
|
@ -26,12 +36,6 @@ const streamingController: AppController = (c) => {
|
|||
|
||||
const { socket, response } = Deno.upgradeWebSocket(c.req.raw, { protocol: token });
|
||||
|
||||
const conn = {
|
||||
socket,
|
||||
session: match[2],
|
||||
pubkey: bech32ToPubkey(match[1]),
|
||||
};
|
||||
|
||||
function send(name: string, payload: object) {
|
||||
if (socket.readyState === WebSocket.OPEN) {
|
||||
socket.send(JSON.stringify({
|
||||
|
@ -44,9 +48,6 @@ const streamingController: AppController = (c) => {
|
|||
|
||||
socket.onopen = async () => {
|
||||
if (!stream) return;
|
||||
|
||||
ws.subscribe(conn, { stream });
|
||||
|
||||
const filter = topicToFilter(stream);
|
||||
|
||||
if (filter) {
|
||||
|
@ -60,7 +61,7 @@ const streamingController: AppController = (c) => {
|
|||
};
|
||||
|
||||
socket.onclose = () => {
|
||||
ws.unsubscribeAll(socket);
|
||||
Sub.close(socket);
|
||||
};
|
||||
|
||||
return response;
|
||||
|
|
|
@ -0,0 +1,14 @@
|
|||
import { Conf } from '@/config.ts';
|
||||
import { nip04 } from '@/deps.ts';
|
||||
|
||||
/** Encrypt a message as the Ditto server account. */
|
||||
function encryptAdmin(targetPubkey: string, message: string): Promise<string> {
|
||||
return nip04.encrypt(Conf.seckey, targetPubkey, message);
|
||||
}
|
||||
|
||||
/** Decrypt a message as the Ditto server account. */
|
||||
function decryptAdmin(targetPubkey: string, message: string): Promise<string> {
|
||||
return nip04.decrypt(Conf.seckey, targetPubkey, message);
|
||||
}
|
||||
|
||||
export { decryptAdmin, encryptAdmin };
|
|
@ -20,6 +20,7 @@ export {
|
|||
getSignature,
|
||||
Kind,
|
||||
matchFilters,
|
||||
nip04,
|
||||
nip05,
|
||||
nip19,
|
||||
nip21,
|
||||
|
@ -63,3 +64,5 @@ export {
|
|||
} from 'npm:kysely@^0.25.0';
|
||||
export { DenoSqliteDialect } from 'https://gitlab.com/soapbox-pub/kysely-deno-sqlite/-/raw/v1.0.0/mod.ts';
|
||||
export { default as tldts } from 'npm:tldts@^6.0.14';
|
||||
|
||||
export type * as TypeFest from 'npm:type-fest@^4.3.0';
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
import { Conf } from '@/config.ts';
|
||||
import { type Event, type Filter, matchFilters } from '@/deps.ts';
|
||||
|
||||
import type { EventData } from '@/types.ts';
|
||||
|
@ -16,7 +17,7 @@ interface GetFiltersOpts {
|
|||
}
|
||||
|
||||
function matchDittoFilter(filter: DittoFilter, event: Event, data: EventData): boolean {
|
||||
if (filter.local && !data.user) {
|
||||
if (filter.local && !(data.user || event.pubkey === Conf.pubkey)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,40 @@
|
|||
/** Events are **regular**, which means they're all expected to be stored by relays. */
|
||||
function isRegularKind(kind: number) {
|
||||
return (1000 <= kind && kind < 10000) || [1, 2, 4, 5, 6, 7, 8, 16, 40, 41, 42, 43, 44].includes(kind);
|
||||
}
|
||||
|
||||
/** Events are **replaceable**, which means that, for each combination of `pubkey` and `kind`, only the latest event is expected to (SHOULD) be stored by relays, older versions are expected to be discarded. */
|
||||
function isReplaceableKind(kind: number) {
|
||||
return (10000 <= kind && kind < 20000) || [0, 3].includes(kind);
|
||||
}
|
||||
|
||||
/** Events are **ephemeral**, which means they are not expected to be stored by relays. */
|
||||
function isEphemeralKind(kind: number) {
|
||||
return 20000 <= kind && kind < 30000;
|
||||
}
|
||||
|
||||
/** Events are **parameterized replaceable**, which means that, for each combination of `pubkey`, `kind` and the `d` tag, only the latest event is expected to be stored by relays, older versions are expected to be discarded. */
|
||||
function isParameterizedReplaceableKind(kind: number) {
|
||||
return 30000 <= kind && kind < 40000;
|
||||
}
|
||||
|
||||
/** Classification of the event kind. */
|
||||
type KindClassification = 'regular' | 'replaceable' | 'ephemeral' | 'parameterized' | 'unknown';
|
||||
|
||||
/** Determine the classification of this kind of event if known, or `unknown`. */
|
||||
function classifyKind(kind: number): KindClassification {
|
||||
if (isRegularKind(kind)) return 'regular';
|
||||
if (isReplaceableKind(kind)) return 'replaceable';
|
||||
if (isEphemeralKind(kind)) return 'ephemeral';
|
||||
if (isParameterizedReplaceableKind(kind)) return 'parameterized';
|
||||
return 'unknown';
|
||||
}
|
||||
|
||||
export {
|
||||
classifyKind,
|
||||
isEphemeralKind,
|
||||
isParameterizedReplaceableKind,
|
||||
isRegularKind,
|
||||
isReplaceableKind,
|
||||
type KindClassification,
|
||||
};
|
|
@ -2,6 +2,7 @@ import * as eventsDB from '@/db/events.ts';
|
|||
import { addRelays } from '@/db/relays.ts';
|
||||
import { findUser } from '@/db/users.ts';
|
||||
import { type Event, LRUCache } from '@/deps.ts';
|
||||
import { isEphemeralKind } from '@/kinds.ts';
|
||||
import { isLocallyFollowed } from '@/queries.ts';
|
||||
import { Sub } from '@/subs.ts';
|
||||
import { trends } from '@/trends.ts';
|
||||
|
@ -43,6 +44,7 @@ async function getEventData({ pubkey }: Event): Promise<EventData> {
|
|||
|
||||
/** Maybe store the event, if eligible. */
|
||||
async function storeEvent(event: Event, data: EventData): Promise<void> {
|
||||
if (isEphemeralKind(event.kind)) return;
|
||||
if (data.user || await isLocallyFollowed(event.pubkey)) {
|
||||
await eventsDB.insertEvent(event).catch(console.warn);
|
||||
} else {
|
||||
|
|
|
@ -5,7 +5,7 @@ import { jsonSchema, safeUrlSchema } from '../schema.ts';
|
|||
/** Schema to validate Nostr hex IDs such as event IDs and pubkeys. */
|
||||
const nostrIdSchema = z.string().regex(/^[0-9a-f]{64}$/);
|
||||
/** Nostr kinds are positive integers. */
|
||||
const kindSchema = z.number().int().positive();
|
||||
const kindSchema = z.number().int().nonnegative();
|
||||
|
||||
/** Nostr event schema. */
|
||||
const eventSchema = z.object({
|
||||
|
@ -26,9 +26,9 @@ const filterSchema = z.object({
|
|||
kinds: kindSchema.array().optional(),
|
||||
ids: nostrIdSchema.array().optional(),
|
||||
authors: nostrIdSchema.array().optional(),
|
||||
since: z.number().int().positive().optional(),
|
||||
until: z.number().int().positive().optional(),
|
||||
limit: z.number().int().positive().optional(),
|
||||
since: z.number().int().nonnegative().optional(),
|
||||
until: z.number().int().nonnegative().optional(),
|
||||
limit: z.number().int().nonnegative().optional(),
|
||||
}).passthrough().and(
|
||||
z.record(
|
||||
z.custom<`#${string}`>((val) => typeof val === 'string' && val.startsWith('#')),
|
||||
|
@ -75,17 +75,24 @@ const relayInfoDocSchema = z.object({
|
|||
description: z.string().transform((val) => val.slice(0, 3000)).optional().catch(undefined),
|
||||
pubkey: nostrIdSchema.optional().catch(undefined),
|
||||
contact: safeUrlSchema.optional().catch(undefined),
|
||||
supported_nips: z.number().int().positive().array().optional().catch(undefined),
|
||||
supported_nips: z.number().int().nonnegative().array().optional().catch(undefined),
|
||||
software: safeUrlSchema.optional().catch(undefined),
|
||||
icon: safeUrlSchema.optional().catch(undefined),
|
||||
});
|
||||
|
||||
/** NIP-46 signer response. */
|
||||
const connectResponseSchema = z.object({
|
||||
id: z.string(),
|
||||
result: signedEventSchema,
|
||||
});
|
||||
|
||||
export {
|
||||
type ClientCLOSE,
|
||||
type ClientEVENT,
|
||||
type ClientMsg,
|
||||
clientMsgSchema,
|
||||
type ClientREQ,
|
||||
connectResponseSchema,
|
||||
filterSchema,
|
||||
jsonMetaContentSchema,
|
||||
metaContentSchema,
|
||||
|
|
144
src/sign.ts
144
src/sign.ts
|
@ -1,71 +1,105 @@
|
|||
import { type AppContext } from '@/app.ts';
|
||||
import { type Event, type EventTemplate, getEventHash, getPublicKey, getSignature, HTTPException, z } from '@/deps.ts';
|
||||
import { signedEventSchema } from '@/schemas/nostr.ts';
|
||||
import { ws } from '@/stream.ts';
|
||||
|
||||
/** Get signing WebSocket from app context. */
|
||||
function getSignStream(c: AppContext): WebSocket | undefined {
|
||||
const pubkey = c.get('pubkey');
|
||||
const session = c.get('session');
|
||||
|
||||
if (pubkey && session) {
|
||||
const [socket] = ws.getSockets(`nostr:${pubkey}:${session}`);
|
||||
return socket;
|
||||
}
|
||||
}
|
||||
|
||||
const nostrStreamingEventSchema = z.object({
|
||||
type: z.literal('nostr.sign'),
|
||||
data: signedEventSchema,
|
||||
});
|
||||
import { Conf } from '@/config.ts';
|
||||
import { decryptAdmin, encryptAdmin } from '@/crypto.ts';
|
||||
import { type Event, type EventTemplate, finishEvent, HTTPException } from '@/deps.ts';
|
||||
import { connectResponseSchema } from '@/schemas/nostr.ts';
|
||||
import { jsonSchema } from '@/schema.ts';
|
||||
import { Sub } from '@/subs.ts';
|
||||
import { Time } from '@/utils.ts';
|
||||
import { createAdminEvent } from '@/utils/web.ts';
|
||||
|
||||
/**
|
||||
* Sign Nostr event using the app context.
|
||||
*
|
||||
* - If a secret key is provided, it will be used to sign the event.
|
||||
* - If a signing WebSocket is provided, it will be used to sign the event.
|
||||
* - If `X-Nostr-Sign` is passed, it will use a NIP-46 to sign the event.
|
||||
*/
|
||||
async function signEvent<K extends number = number>(event: EventTemplate<K>, c: AppContext): Promise<Event<K>> {
|
||||
const seckey = c.get('seckey');
|
||||
const stream = getSignStream(c);
|
||||
const header = c.req.headers.get('x-nostr-sign');
|
||||
|
||||
if (!seckey && stream) {
|
||||
try {
|
||||
return await new Promise<Event<K>>((resolve, reject) => {
|
||||
const handleMessage = (e: MessageEvent) => {
|
||||
try {
|
||||
const { data: event } = nostrStreamingEventSchema.parse(JSON.parse(e.data));
|
||||
stream.removeEventListener('message', handleMessage);
|
||||
resolve(event as Event<K>);
|
||||
} catch (_e) {
|
||||
//
|
||||
}
|
||||
};
|
||||
stream.addEventListener('message', handleMessage);
|
||||
stream.send(JSON.stringify({ event: 'nostr.sign', payload: JSON.stringify(event) }));
|
||||
setTimeout(() => {
|
||||
stream.removeEventListener('message', handleMessage);
|
||||
reject();
|
||||
}, 60000);
|
||||
});
|
||||
} catch (_e) {
|
||||
throw new HTTPException(408, {
|
||||
res: c.json({ id: 'ditto.timeout', error: 'Signing timeout' }, 408),
|
||||
});
|
||||
}
|
||||
if (seckey) {
|
||||
return finishEvent(event, seckey);
|
||||
}
|
||||
|
||||
if (header) {
|
||||
return await signNostrConnect(event, c);
|
||||
}
|
||||
|
||||
if (!seckey) {
|
||||
throw new HTTPException(400, {
|
||||
res: c.json({ id: 'ditto.private_key', error: 'No private key' }, 400),
|
||||
res: c.json({ id: 'ditto.sign', error: 'Unable to sign event' }, 400),
|
||||
});
|
||||
}
|
||||
|
||||
(event as Event<K>).pubkey = getPublicKey(seckey);
|
||||
(event as Event<K>).id = getEventHash(event as Event<K>);
|
||||
(event as Event<K>).sig = getSignature(event as Event<K>, seckey);
|
||||
|
||||
return event as Event<K>;
|
||||
}
|
||||
|
||||
export { signEvent };
|
||||
/** Sign event with NIP-46, waiting in the background for the signed event. */
|
||||
async function signNostrConnect<K extends number = number>(event: EventTemplate<K>, c: AppContext): Promise<Event<K>> {
|
||||
const pubkey = c.get('pubkey');
|
||||
|
||||
if (!pubkey) {
|
||||
throw new HTTPException(401);
|
||||
}
|
||||
|
||||
const messageId = crypto.randomUUID();
|
||||
|
||||
createAdminEvent({
|
||||
kind: 24133,
|
||||
content: await encryptAdmin(
|
||||
pubkey,
|
||||
JSON.stringify({
|
||||
id: messageId,
|
||||
method: 'sign_event',
|
||||
params: [event],
|
||||
}),
|
||||
),
|
||||
tags: [['p', pubkey]],
|
||||
}, c);
|
||||
|
||||
return awaitSignedEvent<K>(pubkey, messageId, c);
|
||||
}
|
||||
|
||||
/** Wait for signed event to be sent through Nostr relay. */
|
||||
function awaitSignedEvent<K extends number = number>(
|
||||
pubkey: string,
|
||||
messageId: string,
|
||||
c: AppContext,
|
||||
): Promise<Event<K>> {
|
||||
const sub = Sub.sub(messageId, '1', [{ kinds: [24133], authors: [pubkey], '#p': [Conf.pubkey] }]);
|
||||
|
||||
function close(): void {
|
||||
Sub.close(messageId);
|
||||
}
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
const timeout = setTimeout(() => {
|
||||
close();
|
||||
reject(
|
||||
new HTTPException(408, {
|
||||
res: c.json({ id: 'ditto.timeout', error: 'Signing timeout' }),
|
||||
}),
|
||||
);
|
||||
}, Time.minutes(1));
|
||||
|
||||
(async () => {
|
||||
for await (const event of sub) {
|
||||
if (event.kind === 24133) {
|
||||
const decrypted = await decryptAdmin(event.pubkey, event.content);
|
||||
const msg = jsonSchema.pipe(connectResponseSchema).parse(decrypted);
|
||||
|
||||
if (msg.id === messageId) {
|
||||
close();
|
||||
clearTimeout(timeout);
|
||||
resolve(msg.result as Event<K>);
|
||||
}
|
||||
}
|
||||
}
|
||||
})();
|
||||
});
|
||||
}
|
||||
|
||||
/** Sign event as the Ditto server. */
|
||||
// deno-lint-ignore require-await
|
||||
async function signAdminEvent<K extends number = number>(event: EventTemplate<K>): Promise<Event<K>> {
|
||||
return finishEvent(event, Conf.seckey);
|
||||
}
|
||||
|
||||
export { signAdminEvent, signEvent };
|
||||
|
|
158
src/stream.ts
158
src/stream.ts
|
@ -1,158 +0,0 @@
|
|||
import { z } from '@/deps.ts';
|
||||
|
||||
/** Internal key for event subscriptions. */
|
||||
type Topic =
|
||||
| `nostr:${string}:${string}`
|
||||
| 'public'
|
||||
| 'public:local';
|
||||
|
||||
/**
|
||||
* Streaming timelines/categories.
|
||||
* https://docs.joinmastodon.org/methods/streaming/#streams
|
||||
*/
|
||||
const streamSchema = z.enum([
|
||||
'nostr',
|
||||
'public',
|
||||
'public:local',
|
||||
'user',
|
||||
]);
|
||||
|
||||
type Stream = z.infer<typeof streamSchema>;
|
||||
|
||||
/** Only the necessary metadata needed from the request. */
|
||||
interface StreamConn {
|
||||
/** Hex pubkey parsed from the `Sec-Websocket-Protocol` header. */
|
||||
pubkey?: string;
|
||||
/** Base62 session UUID parsed from the `Sec-Websocket-Protocol` header. */
|
||||
session?: string;
|
||||
/** The WebSocket stream. */
|
||||
socket: WebSocket;
|
||||
}
|
||||
|
||||
/** Requested streaming channel, eg `user`, `notifications`. Some channels like `hashtag` have additional params. */
|
||||
// TODO: Make this a discriminated union (needed for hashtags).
|
||||
interface StreamSub {
|
||||
/** Name of the channel, eg `user`. */
|
||||
stream: Stream;
|
||||
/** Additional query params, eg `tag`. */
|
||||
params?: Record<string, string>;
|
||||
}
|
||||
|
||||
/** Class to organize WebSocket connections by topic. */
|
||||
class WebSocketConnections {
|
||||
/** Set of WebSockets by topic. */
|
||||
#sockets = new Map<Topic, Set<WebSocket>>();
|
||||
/** Set of topics by WebSocket. We need to track this so we can unsubscribe properly. */
|
||||
#topics = new WeakMap<WebSocket, Set<Topic>>();
|
||||
|
||||
/** Add the WebSocket to the streaming channel. */
|
||||
subscribe(conn: StreamConn, sub: StreamSub): void {
|
||||
const topic = getTopic(conn, sub);
|
||||
|
||||
if (topic) {
|
||||
this.#addSocket(conn.socket, topic);
|
||||
this.#addTopic(conn.socket, topic);
|
||||
}
|
||||
}
|
||||
|
||||
/** Remove the WebSocket from the streaming channel. */
|
||||
unsubscribe(conn: StreamConn, sub: StreamSub): void {
|
||||
const topic = getTopic(conn, sub);
|
||||
|
||||
if (topic) {
|
||||
this.#removeSocket(conn.socket, topic);
|
||||
this.#removeTopic(conn.socket, topic);
|
||||
}
|
||||
}
|
||||
|
||||
/** Remove the WebSocket from all its streaming channels. */
|
||||
unsubscribeAll(socket: WebSocket): void {
|
||||
const topics = this.#topics.get(socket);
|
||||
|
||||
if (topics) {
|
||||
for (const topic of topics) {
|
||||
this.#removeSocket(socket, topic);
|
||||
}
|
||||
}
|
||||
|
||||
this.#topics.delete(socket);
|
||||
}
|
||||
|
||||
/** Get WebSockets for the given topic. */
|
||||
getSockets(topic: Topic): Set<WebSocket> {
|
||||
return this.#sockets.get(topic) ?? new Set<WebSocket>();
|
||||
}
|
||||
|
||||
/** Add a WebSocket to a topics set in the state. */
|
||||
#addSocket(socket: WebSocket, topic: Topic): void {
|
||||
let subscribers = this.#sockets.get(topic);
|
||||
|
||||
if (!subscribers) {
|
||||
subscribers = new Set<WebSocket>();
|
||||
this.#sockets.set(topic, subscribers);
|
||||
}
|
||||
|
||||
subscribers.add(socket);
|
||||
}
|
||||
|
||||
/** Remove a WebSocket from a topics set in the state. */
|
||||
#removeSocket(socket: WebSocket, topic: Topic): void {
|
||||
const subscribers = this.#sockets.get(topic);
|
||||
|
||||
if (subscribers) {
|
||||
subscribers.delete(socket);
|
||||
|
||||
if (subscribers.size === 0) {
|
||||
this.#sockets.delete(topic);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** Add a topic to a WebSocket set in the state. */
|
||||
#addTopic(socket: WebSocket, topic: Topic): void {
|
||||
let topics = this.#topics.get(socket);
|
||||
|
||||
if (!topics) {
|
||||
topics = new Set<Topic>();
|
||||
this.#topics.set(socket, topics);
|
||||
}
|
||||
|
||||
topics.add(topic);
|
||||
}
|
||||
|
||||
/** Remove a topic from a WebSocket set in the state. */
|
||||
#removeTopic(socket: WebSocket, topic: Topic): void {
|
||||
const topics = this.#topics.get(socket);
|
||||
|
||||
if (topics) {
|
||||
topics.delete(topic);
|
||||
|
||||
if (topics.size === 0) {
|
||||
this.#topics.delete(socket);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert the "stream" parameter into a "topic".
|
||||
* The stream parameter is part of the public-facing API, while the topic is internal.
|
||||
*/
|
||||
function getTopic(conn: StreamConn, sub: StreamSub): Topic | undefined {
|
||||
switch (sub.stream) {
|
||||
case 'public':
|
||||
case 'public:local':
|
||||
return sub.stream;
|
||||
default:
|
||||
if (!conn.pubkey) {
|
||||
return;
|
||||
// Nostr signing topics contain the session ID for privacy reasons.
|
||||
} else if (sub.stream === 'nostr') {
|
||||
return conn.session ? `nostr:${conn.pubkey}:${conn.session}` : undefined;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const ws = new WebSocketConnections();
|
||||
|
||||
export { type Stream, streamSchema, ws };
|
|
@ -9,7 +9,7 @@ import type { EventData } from '@/types.ts';
|
|||
* Subscriptions can be added, removed, and matched against events.
|
||||
*/
|
||||
class SubscriptionStore {
|
||||
#store = new Map<WebSocket, Map<string, Subscription>>();
|
||||
#store = new Map<unknown, Map<string, Subscription>>();
|
||||
|
||||
/**
|
||||
* Add a subscription to the store, and then iterate over it.
|
||||
|
@ -20,7 +20,7 @@ class SubscriptionStore {
|
|||
* }
|
||||
* ```
|
||||
*/
|
||||
sub<K extends number>(socket: WebSocket, id: string, filters: DittoFilter<K>[]): Subscription<K> {
|
||||
sub<K extends number>(socket: unknown, id: string, filters: DittoFilter<K>[]): Subscription<K> {
|
||||
let subs = this.#store.get(socket);
|
||||
|
||||
if (!subs) {
|
||||
|
@ -37,13 +37,13 @@ class SubscriptionStore {
|
|||
}
|
||||
|
||||
/** Remove a subscription from the store. */
|
||||
unsub(socket: WebSocket, id: string): void {
|
||||
unsub(socket: unknown, id: string): void {
|
||||
this.#store.get(socket)?.get(id)?.close();
|
||||
this.#store.get(socket)?.delete(id);
|
||||
}
|
||||
|
||||
/** Remove an entire socket. */
|
||||
close(socket: WebSocket): void {
|
||||
close(socket: unknown): void {
|
||||
const subs = this.#store.get(socket);
|
||||
|
||||
if (subs) {
|
||||
|
|
|
@ -3,12 +3,12 @@ import { lookupNip05Cached } from '@/nip05.ts';
|
|||
import { getAuthor } from '@/queries.ts';
|
||||
|
||||
/** Get the current time in Nostr format. */
|
||||
const nostrNow = () => Math.floor(new Date().getTime() / 1000);
|
||||
const nostrNow = (): number => Math.floor(Date.now() / 1000);
|
||||
/** Convenience function to convert Nostr dates into native Date objects. */
|
||||
const nostrDate = (seconds: number) => new Date(seconds * 1000);
|
||||
const nostrDate = (seconds: number): Date => new Date(seconds * 1000);
|
||||
|
||||
/** Pass to sort() to sort events by date. */
|
||||
const eventDateComparator = (a: Event, b: Event) => b.created_at - a.created_at;
|
||||
const eventDateComparator = (a: Event, b: Event): number => b.created_at - a.created_at;
|
||||
|
||||
/** Get pubkey from bech32 string, if applicable. */
|
||||
function bech32ToPubkey(bech32: string): string | undefined {
|
||||
|
@ -102,7 +102,6 @@ function isFollowing(source: Event<3>, targetPubkey: string): boolean {
|
|||
}
|
||||
|
||||
export {
|
||||
bech32ToPubkey,
|
||||
eventAge,
|
||||
eventDateComparator,
|
||||
findTag,
|
||||
|
|
|
@ -1,16 +1,16 @@
|
|||
import { Conf } from '@/config.ts';
|
||||
import { type Context, type Event, EventTemplate, HTTPException, parseFormData, z } from '@/deps.ts';
|
||||
import { type Context, type Event, EventTemplate, HTTPException, parseFormData, type TypeFest, z } from '@/deps.ts';
|
||||
import * as pipeline from '@/pipeline.ts';
|
||||
import { signEvent } from '@/sign.ts';
|
||||
import { signAdminEvent, signEvent } from '@/sign.ts';
|
||||
import { nostrNow } from '@/utils.ts';
|
||||
|
||||
import type { AppContext } from '@/app.ts';
|
||||
|
||||
/** Publish an event through the API, throwing a Hono exception on failure. */
|
||||
async function createEvent<K extends number>(
|
||||
t: Omit<EventTemplate<K>, 'created_at'>,
|
||||
c: AppContext,
|
||||
): Promise<Event<K>> {
|
||||
/** EventTemplate with defaults. */
|
||||
type EventStub<K extends number = number> = TypeFest.SetOptional<EventTemplate<K>, 'created_at' | 'tags'>;
|
||||
|
||||
/** Publish an event through the pipeline. */
|
||||
async function createEvent<K extends number>(t: EventStub<K>, c: AppContext): Promise<Event<K>> {
|
||||
const pubkey = c.get('pubkey');
|
||||
|
||||
if (!pubkey) {
|
||||
|
@ -19,9 +19,26 @@ async function createEvent<K extends number>(
|
|||
|
||||
const event = await signEvent({
|
||||
created_at: nostrNow(),
|
||||
tags: [],
|
||||
...t,
|
||||
}, c);
|
||||
|
||||
return publishEvent(event, c);
|
||||
}
|
||||
|
||||
/** Publish an admin event through the pipeline. */
|
||||
async function createAdminEvent<K extends number>(t: EventStub<K>, c: AppContext): Promise<Event<K>> {
|
||||
const event = await signAdminEvent({
|
||||
created_at: nostrNow(),
|
||||
tags: [],
|
||||
...t,
|
||||
});
|
||||
|
||||
return publishEvent(event, c);
|
||||
}
|
||||
|
||||
/** Push the event through the pipeline, rethrowing any RelayError. */
|
||||
async function publishEvent<K extends number>(event: Event<K>, c: AppContext): Promise<Event<K>> {
|
||||
try {
|
||||
await pipeline.handleEvent(event);
|
||||
} catch (e) {
|
||||
|
@ -90,4 +107,12 @@ function activityJson<T, P extends string>(c: Context<any, P>, object: T) {
|
|||
return response;
|
||||
}
|
||||
|
||||
export { activityJson, buildLinkHeader, createEvent, type PaginationParams, paginationSchema, parseBody };
|
||||
export {
|
||||
activityJson,
|
||||
buildLinkHeader,
|
||||
createAdminEvent,
|
||||
createEvent,
|
||||
type PaginationParams,
|
||||
paginationSchema,
|
||||
parseBody,
|
||||
};
|
||||
|
|
Loading…
Reference in New Issue