Relay: make REQ work (doesn't stream yet)

This commit is contained in:
Alex Gleason 2023-08-12 13:40:21 -05:00
parent e2adc7ad1a
commit 808e8941b6
No known key found for this signature in database
GPG Key ID: 7211D1F99744FBB7
5 changed files with 79 additions and 22 deletions

View File

@ -29,6 +29,7 @@ import {
} from './controllers/api/statuses.ts';
import { streamingController } from './controllers/api/streaming.ts';
import { trendingTagsController } from './controllers/api/trends.ts';
import { relayController } from './controllers/nostr/relay.ts';
import { indexController } from './controllers/site.ts';
import { hostMetaController } from './controllers/well-known/host-meta.ts';
import { nodeInfoController, nodeInfoSchemaController } from './controllers/well-known/nodeinfo.ts';
@ -60,6 +61,7 @@ app.use('*', logger());
app.get('/api/v1/streaming', streamingController);
app.get('/api/v1/streaming/', streamingController);
app.get('/relay', relayController);
app.use('*', cors({ origin: '*', exposeHeaders: ['link'] }), auth19, auth98());

View File

@ -1,4 +1,4 @@
import { Author, findReplyTag, matchFilter, RelayPool, TTLCache } from '@/deps.ts';
import { Author, type Filter, findReplyTag, matchFilter, RelayPool, TTLCache } from '@/deps.ts';
import { type Event, type SignedEvent } from '@/event.ts';
import { Conf } from './config.ts';
@ -29,17 +29,6 @@ function getPool(): Pool {
return pool;
}
type Filter<K extends number = number> = {
ids?: string[];
kinds?: K[];
authors?: string[];
since?: number;
until?: number;
limit?: number;
search?: string;
[key: `#${string}`]: string[];
};
interface GetFilterOpts {
timeout?: number;
}

View File

@ -1,5 +1,39 @@
import { type Filter } from '@/deps.ts';
import { getFilters } from '@/db/events.ts';
import { jsonSchema } from '@/schema.ts';
import { clientMsgSchema, type ClientREQ } from '@/schemas/nostr.ts';
import type { AppController } from '@/app.ts';
function connectStream(socket: WebSocket) {
socket.onmessage = (e) => {
const result = jsonSchema.pipe(clientMsgSchema).safeParse(e.data);
if (!result.success) {
socket.send(JSON.stringify(['NOTICE', JSON.stringify(result.error.message)]));
return;
}
const clientMsg = result.data;
switch (clientMsg[0]) {
case 'REQ':
handleReq(clientMsg);
return;
default:
socket.send(JSON.stringify(['NOTICE', 'Unknown command.']));
return;
}
};
async function handleReq([_, sub, ...filters]: ClientREQ) {
for (const event of await getFilters(filters as Filter[])) {
socket.send(JSON.stringify(['EVENT', sub, event]));
}
socket.send(JSON.stringify(['EOSE', sub]));
}
}
const relayController: AppController = (c) => {
const upgrade = c.req.headers.get('upgrade');
@ -9,6 +43,7 @@ const relayController: AppController = (c) => {
const { socket, response } = Deno.upgradeWebSocket(c.req.raw);
connectStream(socket);
return response;
};

View File

@ -53,7 +53,15 @@ function insertEvent(event: SignedEvent): Promise<void> {
function getFilterQuery(filter: Filter) {
let query = db
.selectFrom('events')
.select(['id', 'kind', 'pubkey', 'content', 'tags', 'created_at', 'sig'])
.select([
'events.id',
'events.kind',
'events.pubkey',
'events.content',
'events.tags',
'events.created_at',
'events.sig',
])
.orderBy('created_at', 'desc');
for (const key of Object.keys(filter)) {
@ -91,8 +99,8 @@ function getFilterQuery(filter: Filter) {
return query;
}
async function getFilters<K extends number>(filters: [Filter<K>]): Promise<SignedEvent<K>[]>;
async function getFilters(filters: Filter[]): Promise<SignedEvent[]>;
async function getFilters<K extends number>(filters: [Filter<K>]): Promise<SignedEvent<K>[]>;
async function getFilters(filters: Filter[]) {
const queries = filters
.map(getFilterQuery)

View File

@ -27,18 +27,31 @@ const filterSchema = z.object({
since: z.number().int().positive().optional(),
until: z.number().int().positive().optional(),
limit: z.number().int().positive().optional(),
}).and(z.record(
}).passthrough().and(
z.record(
z.custom<`#${string}`>((val) => typeof val === 'string' && val.startsWith('#')),
z.string().array(),
));
).catch({}),
);
const clientReqSchema = z.tuple([z.literal('REQ'), z.string().min(1)]).rest(filterSchema);
const clientEventSchema = z.tuple([z.literal('EVENT'), signedEventSchema]);
const clientCloseSchema = z.tuple([z.literal('CLOSE'), z.string().min(1)]);
/** Client message to a Nostr relay. */
const clientMsgSchema = z.union([
z.tuple([z.literal('REQ'), z.string().min(1)]).rest(filterSchema),
z.tuple([z.literal('EVENT'), signedEventSchema]),
z.tuple([z.literal('CLOSE'), z.string().min(1)]),
clientReqSchema,
clientEventSchema,
clientCloseSchema,
]);
/** REQ message from client to relay. */
type ClientREQ = z.infer<typeof clientReqSchema>;
/** EVENT message from client to relay. */
type ClientEVENT = z.infer<typeof clientEventSchema>;
/** CLOSE message from client to relay. */
type ClientCLOSE = z.infer<typeof clientCloseSchema>;
/** Kind 0 content schema. */
const metaContentSchema = z.object({
name: z.string().optional().catch(undefined),
@ -52,4 +65,14 @@ const metaContentSchema = z.object({
/** Parses kind 0 content from a JSON string. */
const jsonMetaContentSchema = jsonSchema.pipe(metaContentSchema).catch({});
export { clientMsgSchema, filterSchema, jsonMetaContentSchema, metaContentSchema, nostrIdSchema, signedEventSchema };
export {
type ClientCLOSE,
type ClientEVENT,
clientMsgSchema,
type ClientREQ,
filterSchema,
jsonMetaContentSchema,
metaContentSchema,
nostrIdSchema,
signedEventSchema,
};