diff --git a/src/app.ts b/src/app.ts index dadfebf..00a1460 100644 --- a/src/app.ts +++ b/src/app.ts @@ -29,6 +29,7 @@ import { } from './controllers/api/statuses.ts'; import { streamingController } from './controllers/api/streaming.ts'; import { trendingTagsController } from './controllers/api/trends.ts'; +import { relayController } from './controllers/nostr/relay.ts'; import { indexController } from './controllers/site.ts'; import { hostMetaController } from './controllers/well-known/host-meta.ts'; import { nodeInfoController, nodeInfoSchemaController } from './controllers/well-known/nodeinfo.ts'; @@ -60,6 +61,7 @@ app.use('*', logger()); app.get('/api/v1/streaming', streamingController); app.get('/api/v1/streaming/', streamingController); +app.get('/relay', relayController); app.use('*', cors({ origin: '*', exposeHeaders: ['link'] }), auth19, auth98()); diff --git a/src/client.ts b/src/client.ts index 378dc9c..9c5e74a 100644 --- a/src/client.ts +++ b/src/client.ts @@ -1,4 +1,4 @@ -import { Author, findReplyTag, matchFilter, RelayPool, TTLCache } from '@/deps.ts'; +import { Author, type Filter, findReplyTag, matchFilter, RelayPool, TTLCache } from '@/deps.ts'; import { type Event, type SignedEvent } from '@/event.ts'; import { Conf } from './config.ts'; @@ -29,17 +29,6 @@ function getPool(): Pool { return pool; } -type Filter = { - ids?: string[]; - kinds?: K[]; - authors?: string[]; - since?: number; - until?: number; - limit?: number; - search?: string; - [key: `#${string}`]: string[]; -}; - interface GetFilterOpts { timeout?: number; } diff --git a/src/controllers/nostr/relay.ts b/src/controllers/nostr/relay.ts index 3aaa7b1..68bd815 100644 --- a/src/controllers/nostr/relay.ts +++ b/src/controllers/nostr/relay.ts @@ -1,5 +1,39 @@ +import { type Filter } from '@/deps.ts'; +import { getFilters } from '@/db/events.ts'; +import { jsonSchema } from '@/schema.ts'; +import { clientMsgSchema, type ClientREQ } from '@/schemas/nostr.ts'; + import type { AppController } from '@/app.ts'; +function connectStream(socket: WebSocket) { + socket.onmessage = (e) => { + const result = jsonSchema.pipe(clientMsgSchema).safeParse(e.data); + + if (!result.success) { + socket.send(JSON.stringify(['NOTICE', JSON.stringify(result.error.message)])); + return; + } + + const clientMsg = result.data; + + switch (clientMsg[0]) { + case 'REQ': + handleReq(clientMsg); + return; + default: + socket.send(JSON.stringify(['NOTICE', 'Unknown command.'])); + return; + } + }; + + async function handleReq([_, sub, ...filters]: ClientREQ) { + for (const event of await getFilters(filters as Filter[])) { + socket.send(JSON.stringify(['EVENT', sub, event])); + } + socket.send(JSON.stringify(['EOSE', sub])); + } +} + const relayController: AppController = (c) => { const upgrade = c.req.headers.get('upgrade'); @@ -9,6 +43,7 @@ const relayController: AppController = (c) => { const { socket, response } = Deno.upgradeWebSocket(c.req.raw); + connectStream(socket); return response; }; diff --git a/src/db/events.ts b/src/db/events.ts index 641149c..488b5c3 100644 --- a/src/db/events.ts +++ b/src/db/events.ts @@ -53,7 +53,15 @@ function insertEvent(event: SignedEvent): Promise { function getFilterQuery(filter: Filter) { let query = db .selectFrom('events') - .select(['id', 'kind', 'pubkey', 'content', 'tags', 'created_at', 'sig']) + .select([ + 'events.id', + 'events.kind', + 'events.pubkey', + 'events.content', + 'events.tags', + 'events.created_at', + 'events.sig', + ]) .orderBy('created_at', 'desc'); for (const key of Object.keys(filter)) { @@ -91,8 +99,8 @@ function getFilterQuery(filter: Filter) { return query; } -async function getFilters(filters: [Filter]): Promise[]>; async function getFilters(filters: Filter[]): Promise; +async function getFilters(filters: [Filter]): Promise[]>; async function getFilters(filters: Filter[]) { const queries = filters .map(getFilterQuery) diff --git a/src/schemas/nostr.ts b/src/schemas/nostr.ts index e5e710f..96344a2 100644 --- a/src/schemas/nostr.ts +++ b/src/schemas/nostr.ts @@ -27,18 +27,31 @@ const filterSchema = z.object({ since: z.number().int().positive().optional(), until: z.number().int().positive().optional(), limit: z.number().int().positive().optional(), -}).and(z.record( - z.custom<`#${string}`>((val) => typeof val === 'string' && val.startsWith('#')), - z.string().array(), -)); +}).passthrough().and( + z.record( + z.custom<`#${string}`>((val) => typeof val === 'string' && val.startsWith('#')), + z.string().array(), + ).catch({}), +); + +const clientReqSchema = z.tuple([z.literal('REQ'), z.string().min(1)]).rest(filterSchema); +const clientEventSchema = z.tuple([z.literal('EVENT'), signedEventSchema]); +const clientCloseSchema = z.tuple([z.literal('CLOSE'), z.string().min(1)]); /** Client message to a Nostr relay. */ const clientMsgSchema = z.union([ - z.tuple([z.literal('REQ'), z.string().min(1)]).rest(filterSchema), - z.tuple([z.literal('EVENT'), signedEventSchema]), - z.tuple([z.literal('CLOSE'), z.string().min(1)]), + clientReqSchema, + clientEventSchema, + clientCloseSchema, ]); +/** REQ message from client to relay. */ +type ClientREQ = z.infer; +/** EVENT message from client to relay. */ +type ClientEVENT = z.infer; +/** CLOSE message from client to relay. */ +type ClientCLOSE = z.infer; + /** Kind 0 content schema. */ const metaContentSchema = z.object({ name: z.string().optional().catch(undefined), @@ -52,4 +65,14 @@ const metaContentSchema = z.object({ /** Parses kind 0 content from a JSON string. */ const jsonMetaContentSchema = jsonSchema.pipe(metaContentSchema).catch({}); -export { clientMsgSchema, filterSchema, jsonMetaContentSchema, metaContentSchema, nostrIdSchema, signedEventSchema }; +export { + type ClientCLOSE, + type ClientEVENT, + clientMsgSchema, + type ClientREQ, + filterSchema, + jsonMetaContentSchema, + metaContentSchema, + nostrIdSchema, + signedEventSchema, +};