Merge branch 'internal-relay' into 'main'

Replace old subscription store with an InternalRelay class

See merge request soapbox-pub/ditto!186
This commit is contained in:
Alex Gleason 2024-04-26 00:01:27 +00:00
commit d8475e1b14
10 changed files with 165 additions and 196 deletions

View File

@ -16,7 +16,7 @@
"exclude": ["./public"], "exclude": ["./public"],
"imports": { "imports": {
"@/": "./src/", "@/": "./src/",
"@nostrify/nostrify": "jsr:@nostrify/nostrify@^0.12.1", "@nostrify/nostrify": "jsr:@nostrify/nostrify@^0.14.2",
"@std/cli": "jsr:@std/cli@^0.223.0", "@std/cli": "jsr:@std/cli@^0.223.0",
"@std/json": "jsr:@std/json@^0.223.0", "@std/json": "jsr:@std/json@^0.223.0",
"@std/streams": "jsr:@std/streams@^0.223.0", "@std/streams": "jsr:@std/streams@^0.223.0",

View File

@ -5,11 +5,11 @@ import { type AppController } from '@/app.ts';
import { Conf } from '@/config.ts'; import { Conf } from '@/config.ts';
import { Debug } from '@/deps.ts'; import { Debug } from '@/deps.ts';
import { getFeedPubkeys } from '@/queries.ts'; import { getFeedPubkeys } from '@/queries.ts';
import { Sub } from '@/subs.ts';
import { bech32ToPubkey } from '@/utils.ts'; import { bech32ToPubkey } from '@/utils.ts';
import { renderReblog, renderStatus } from '@/views/mastodon/statuses.ts'; import { renderReblog, renderStatus } from '@/views/mastodon/statuses.ts';
import { hydrateEvents } from '@/storages/hydrate.ts'; import { hydrateEvents } from '@/storages/hydrate.ts';
import { eventsDB } from '@/storages.ts'; import { eventsDB } from '@/storages.ts';
import { Storages } from '@/storages.ts';
const debug = Debug('ditto:streaming'); const debug = Debug('ditto:streaming');
@ -38,6 +38,7 @@ const streamingController: AppController = (c) => {
const upgrade = c.req.header('upgrade'); const upgrade = c.req.header('upgrade');
const token = c.req.header('sec-websocket-protocol'); const token = c.req.header('sec-websocket-protocol');
const stream = streamSchema.optional().catch(undefined).parse(c.req.query('stream')); const stream = streamSchema.optional().catch(undefined).parse(c.req.query('stream'));
const controller = new AbortController();
if (upgrade?.toLowerCase() !== 'websocket') { if (upgrade?.toLowerCase() !== 'websocket') {
return c.text('Please use websocket protocol', 400); return c.text('Please use websocket protocol', 400);
@ -63,33 +64,41 @@ const streamingController: AppController = (c) => {
socket.onopen = async () => { socket.onopen = async () => {
if (!stream) return; if (!stream) return;
const filter = await topicToFilter(stream, c.req.query(), pubkey);
if (filter) { const filter = await topicToFilter(stream, c.req.query(), pubkey);
for await (const event of Sub.sub(socket, '1', [filter])) { if (!filter) return;
if (event.kind === 6) {
await hydrateEvents({ try {
events: [event], for await (const msg of Storages.pubsub.req([filter], { signal: controller.signal })) {
if (msg[0] === 'EVENT') {
const [event] = await hydrateEvents({
events: [msg[2]],
storage: eventsDB, storage: eventsDB,
signal: AbortSignal.timeout(1000), signal: AbortSignal.timeout(1000),
}); });
const status = await renderReblog(event, { viewerPubkey: c.get('pubkey') }); if (event.kind === 1) {
if (status) {
send('update', status);
}
continue;
}
const status = await renderStatus(event, { viewerPubkey: pubkey }); const status = await renderStatus(event, { viewerPubkey: pubkey });
if (status) { if (status) {
send('update', status); send('update', status);
} }
} }
if (event.kind === 6) {
const status = await renderReblog(event, { viewerPubkey: pubkey });
if (status) {
send('update', status);
}
}
}
}
} catch (e) {
debug('streaming error:', e);
} }
}; };
socket.onclose = () => { socket.onclose = () => {
Sub.close(socket); controller.abort();
}; };
return response; return response;

View File

@ -1,8 +1,7 @@
import { NostrEvent, NostrFilter } from '@nostrify/nostrify'; import { NostrEvent, NostrFilter, NSchema as n } from '@nostrify/nostrify';
import { relayInfoController } from '@/controllers/nostr/relay-info.ts'; import { relayInfoController } from '@/controllers/nostr/relay-info.ts';
import { eventsDB } from '@/storages.ts'; import { eventsDB } from '@/storages.ts';
import * as pipeline from '@/pipeline.ts'; import * as pipeline from '@/pipeline.ts';
import { jsonSchema } from '@/schema.ts';
import { import {
type ClientCLOSE, type ClientCLOSE,
type ClientCOUNT, type ClientCOUNT,
@ -11,10 +10,10 @@ import {
clientMsgSchema, clientMsgSchema,
type ClientREQ, type ClientREQ,
} from '@/schemas/nostr.ts'; } from '@/schemas/nostr.ts';
import { purifyEvent } from '@/storages/hydrate.ts'; import { Storages } from '@/storages.ts';
import { Sub } from '@/subs.ts';
import type { AppController } from '@/app.ts'; import type { AppController } from '@/app.ts';
import { Conf } from '@/config.ts';
/** Limit of initial events returned for a subscription. */ /** Limit of initial events returned for a subscription. */
const FILTER_LIMIT = 100; const FILTER_LIMIT = 100;
@ -29,8 +28,10 @@ type RelayMsg =
/** Set up the Websocket connection. */ /** Set up the Websocket connection. */
function connectStream(socket: WebSocket) { function connectStream(socket: WebSocket) {
const controllers = new Map<string, AbortController>();
socket.onmessage = (e) => { socket.onmessage = (e) => {
const result = jsonSchema.pipe(clientMsgSchema).safeParse(e.data); const result = n.json().pipe(clientMsgSchema).safeParse(e.data);
if (result.success) { if (result.success) {
handleMsg(result.data); handleMsg(result.data);
} else { } else {
@ -39,7 +40,9 @@ function connectStream(socket: WebSocket) {
}; };
socket.onclose = () => { socket.onclose = () => {
Sub.close(socket); for (const controller of controllers.values()) {
controller.abort();
}
}; };
/** Handle client message. */ /** Handle client message. */
@ -64,14 +67,24 @@ function connectStream(socket: WebSocket) {
async function handleReq([_, subId, ...rest]: ClientREQ): Promise<void> { async function handleReq([_, subId, ...rest]: ClientREQ): Promise<void> {
const filters = prepareFilters(rest); const filters = prepareFilters(rest);
const controller = new AbortController();
controllers.get(subId)?.abort();
controllers.set(subId, controller);
for (const event of await eventsDB.query(filters, { limit: FILTER_LIMIT })) { for (const event of await eventsDB.query(filters, { limit: FILTER_LIMIT })) {
send(['EVENT', subId, event]); send(['EVENT', subId, event]);
} }
send(['EOSE', subId]); send(['EOSE', subId]);
for await (const event of Sub.sub(socket, subId, filters)) { try {
send(['EVENT', subId, purifyEvent(event)]); for await (const msg of Storages.pubsub.req(filters, { signal: controller.signal })) {
if (msg[0] === 'EVENT') {
send(['EVENT', subId, msg[2]]);
}
}
} catch (_e) {
controllers.delete(subId);
} }
} }
@ -93,7 +106,11 @@ function connectStream(socket: WebSocket) {
/** Handle CLOSE. Close the subscription. */ /** Handle CLOSE. Close the subscription. */
function handleClose([_, subId]: ClientCLOSE): void { function handleClose([_, subId]: ClientCLOSE): void {
Sub.unsub(socket, subId); const controller = controllers.get(subId);
if (controller) {
controller.abort();
controllers.delete(subId);
}
} }
/** Handle COUNT. Return the number of events matching the filters. */ /** Handle COUNT. Return the number of events matching the filters. */
@ -112,11 +129,12 @@ function connectStream(socket: WebSocket) {
/** Enforce the filters with certain criteria. */ /** Enforce the filters with certain criteria. */
function prepareFilters(filters: ClientREQ[2][]): NostrFilter[] { function prepareFilters(filters: ClientREQ[2][]): NostrFilter[] {
return filters.map((filter) => ({ return filters.map((filter) => {
...filter, const narrow = Boolean(filter.ids?.length || filter.authors?.length);
const search = narrow ? filter.search : `domain:${Conf.url.host} ${filter.search ?? ''}`;
// Return only local events unless the query is already narrow. // Return only local events unless the query is already narrow.
local: (filter.ids?.length || filter.authors?.length) ? undefined : true, return { ...filter, search };
})); });
} }
const relayController: AppController = (c, next) => { const relayController: AppController = (c, next) => {

View File

@ -9,8 +9,7 @@ import { isEphemeralKind } from '@/kinds.ts';
import { DVM } from '@/pipeline/DVM.ts'; import { DVM } from '@/pipeline/DVM.ts';
import { updateStats } from '@/stats.ts'; import { updateStats } from '@/stats.ts';
import { hydrateEvents, purifyEvent } from '@/storages/hydrate.ts'; import { hydrateEvents, purifyEvent } from '@/storages/hydrate.ts';
import { cache, eventsDB, reqmeister } from '@/storages.ts'; import { cache, eventsDB, reqmeister, Storages } from '@/storages.ts';
import { Sub } from '@/subs.ts';
import { getTagSet } from '@/tags.ts'; import { getTagSet } from '@/tags.ts';
import { eventAge, isRelay, nostrDate, nostrNow, parseNip05, Time } from '@/utils.ts'; import { eventAge, isRelay, nostrDate, nostrNow, parseNip05, Time } from '@/utils.ts';
import { fetchWorker } from '@/workers/fetch.ts'; import { fetchWorker } from '@/workers/fetch.ts';
@ -269,14 +268,14 @@ async function payZap(event: DittoEvent, signal: AbortSignal) {
} }
/** Determine if the event is being received in a timely manner. */ /** Determine if the event is being received in a timely manner. */
const isFresh = (event: NostrEvent): boolean => eventAge(event) < Time.seconds(10); function isFresh(event: NostrEvent): boolean {
return eventAge(event) < Time.seconds(10);
}
/** Distribute the event through active subscriptions. */ /** Distribute the event through active subscriptions. */
function streamOut(event: NostrEvent) { async function streamOut(event: NostrEvent): Promise<void> {
if (!isFresh(event)) return; if (isFresh(event)) {
await Storages.pubsub.event(event);
for (const sub of Sub.matches(event)) {
sub.stream(event);
} }
} }

View File

@ -6,7 +6,7 @@ import { Stickynotes } from '@/deps.ts';
import { connectResponseSchema } from '@/schemas/nostr.ts'; import { connectResponseSchema } from '@/schemas/nostr.ts';
import { jsonSchema } from '@/schema.ts'; import { jsonSchema } from '@/schema.ts';
import { AdminSigner } from '@/signers/AdminSigner.ts'; import { AdminSigner } from '@/signers/AdminSigner.ts';
import { Sub } from '@/subs.ts'; import { Storages } from '@/storages.ts';
import { eventMatchesTemplate } from '@/utils.ts'; import { eventMatchesTemplate } from '@/utils.ts';
import { createAdminEvent } from '@/utils/api.ts'; import { createAdminEvent } from '@/utils/api.ts';
@ -78,16 +78,14 @@ export class APISigner implements NostrSigner {
messageId: string, messageId: string,
template: Omit<NostrEvent, 'id' | 'pubkey' | 'sig'>, template: Omit<NostrEvent, 'id' | 'pubkey' | 'sig'>,
): Promise<NostrEvent> { ): Promise<NostrEvent> {
const sub = Sub.sub(messageId, '1', [{ kinds: [24133], authors: [pubkey], '#p': [Conf.pubkey] }]); const sub = Storages.pubsub.req(
[{ kinds: [24133], authors: [pubkey], '#p': [Conf.pubkey] }],
{ signal: this.#c.req.raw.signal },
);
const close = (): void => { for await (const msg of sub) {
Sub.close(messageId); if (msg[0] === 'EVENT') {
this.#c.req.raw.signal.removeEventListener('abort', close); const event = msg[2];
};
this.#c.req.raw.signal.addEventListener('abort', close);
for await (const event of sub) {
const decrypted = await new AdminSigner().nip04.decrypt(event.pubkey, event.content); const decrypted = await new AdminSigner().nip04.decrypt(event.pubkey, event.content);
const result = jsonSchema const result = jsonSchema
@ -97,10 +95,10 @@ export class APISigner implements NostrSigner {
.safeParse(decrypted); .safeParse(decrypted);
if (result.success) { if (result.success) {
close();
return result.data.result; return result.data.result;
} }
} }
}
throw new HTTPException(408, { throw new HTTPException(408, {
res: this.#c.json({ id: 'ditto.timeout', error: 'Signing timeout' }), res: this.#c.json({ id: 'ditto.timeout', error: 'Signing timeout' }),

View File

@ -8,6 +8,7 @@ import { Optimizer } from '@/storages/optimizer.ts';
import { PoolStore } from '@/storages/pool-store.ts'; import { PoolStore } from '@/storages/pool-store.ts';
import { Reqmeister } from '@/storages/reqmeister.ts'; import { Reqmeister } from '@/storages/reqmeister.ts';
import { SearchStore } from '@/storages/search-store.ts'; import { SearchStore } from '@/storages/search-store.ts';
import { InternalRelay } from '@/storages/InternalRelay.ts';
import { Time } from '@/utils/time.ts'; import { Time } from '@/utils/time.ts';
/** Relay pool storage. */ /** Relay pool storage. */
@ -43,4 +44,16 @@ const searchStore = new SearchStore({
fallback: optimizer, fallback: optimizer,
}); });
export class Storages {
  /** Backing field for the lazily-created pubsub relay. */
  private static _pubsub: InternalRelay | undefined;

  /** In-memory pubsub relay for streaming events within the application, created on first access. */
  static get pubsub(): InternalRelay {
    this._pubsub ??= new InternalRelay();
    return this._pubsub;
  }
}
export { cache, client, eventsDB, optimizer, reqmeister, searchStore }; export { cache, client, eventsDB, optimizer, reqmeister, searchStore };

View File

@ -0,0 +1,71 @@
// deno-lint-ignore-file require-await
import {
Machina,
NIP50,
NostrEvent,
NostrFilter,
NostrRelayCLOSED,
NostrRelayEOSE,
NostrRelayEVENT,
NRelay,
} from '@nostrify/nostrify';
import { matchFilter } from '@/deps.ts';
import { DittoEvent } from '@/interfaces/DittoEvent.ts';
import { purifyEvent } from '@/storages/hydrate.ts';
/**
* PubSub event store for streaming events within the application.
* The pipeline should push events to it, then anything in the application can subscribe to it.
*/
/**
 * PubSub event store for streaming events within the application.
 * The pipeline should push events to it, then anything in the application can subscribe to it.
 */
export class InternalRelay implements NRelay {
  /** Active subscriptions, keyed by a random subscription ID. */
  private subs = new Map<string, { filters: NostrFilter[]; machina: Machina<NostrEvent> }>();

  /**
   * Open a subscription. Yields an immediate EOSE (this relay stores nothing),
   * then streams matching EVENT messages as they are published via `event()`.
   * The subscription is removed when the consumer stops iterating or `opts.signal` aborts.
   */
  async *req(
    filters: NostrFilter[],
    opts?: { signal?: AbortSignal },
  ): AsyncGenerator<NostrRelayEVENT | NostrRelayEOSE | NostrRelayCLOSED> {
    const id = crypto.randomUUID();
    const machina = new Machina<NostrEvent>(opts?.signal);

    // Register before the first yield so events published while the consumer
    // is between pulls land in the machina instead of being dropped.
    this.subs.set(id, { filters, machina });

    try {
      yield ['EOSE', id];

      for await (const event of machina) {
        yield ['EVENT', id, event];
      }
    } finally {
      this.subs.delete(id);
    }
  }

  /** Fan the event out to every matching subscription. */
  async event(event: DittoEvent): Promise<void> {
    for (const { filters, machina } of this.subs.values()) {
      for (const filter of filters) {
        if (matchFilter(filter, event)) {
          if (filter.search) {
            const tokens = NIP50.parseInput(filter.search);
            const domain = (tokens.find((t) =>
              typeof t === 'object' && t.key === 'domain'
            ) as { key: 'domain'; value: string } | undefined)?.value;
            if (domain === event.author_domain) {
              machina.push(purifyEvent(event));
              // Deliver at most once per subscription, then keep fanning out.
              break;
            }
          } else {
            machina.push(purifyEvent(event));
            // Was `return`, which exited the method and starved every
            // subscription after the first match.
            break;
          }
        }
      }
    }
  }

  /** This relay stores no events; queries always return empty. */
  async query(): Promise<NostrEvent[]> {
    return [];
  }
}

View File

@ -1,84 +0,0 @@
import { NostrFilter } from '@nostrify/nostrify';
import { Debug } from '@/deps.ts';
import { type DittoEvent } from '@/interfaces/DittoEvent.ts';
import { Subscription } from '@/subscription.ts';
const debug = Debug('ditto:subs');
/**
* Manages Ditto event subscriptions.
* Subscriptions can be added, removed, and matched against events.
*/
/**
 * Manages Ditto event subscriptions.
 * Subscriptions can be added, removed, and matched against events.
 */
class SubscriptionStore {
  /** Subscriptions keyed first by owner (usually a WebSocket), then by subscription ID. */
  #store = new Map<unknown, Map<string, Subscription>>();

  /**
   * Add a subscription to the store, and then iterate over it.
   * Replaces (and closes) any existing subscription with the same ID on this socket.
   *
   * ```ts
   * for (const event of Sub.sub(socket, subId, filters)) {
   *   console.log(event);
   * }
   * ```
   */
  sub(socket: unknown, id: string, filters: NostrFilter[]): Subscription {
    debug('sub', id, JSON.stringify(filters));

    let subs = this.#store.get(socket);

    if (!subs) {
      subs = new Map();
      this.#store.set(socket, subs);
    }

    const sub = new Subscription(filters);

    // Close any prior subscription with this ID before replacing it.
    this.unsub(socket, id);
    // `sub` is already a Subscription; the old `as unknown as Subscription`
    // double assertion was a no-op and has been removed.
    subs.set(id, sub);

    return sub;
  }

  /** Remove a subscription from the store, closing its stream. */
  unsub(socket: unknown, id: string): void {
    debug('unsub', id);
    this.#store.get(socket)?.get(id)?.close();
    this.#store.get(socket)?.delete(id);
  }

  /** Remove an entire socket, closing all of its subscriptions. */
  close(socket: unknown): void {
    debug('close', (socket as any)?.constructor?.name);

    const subs = this.#store.get(socket);

    if (subs) {
      for (const sub of subs.values()) {
        sub.close();
      }
    }

    this.#store.delete(socket);
  }

  /**
   * Loop through matching subscriptions to stream out.
   *
   * ```ts
   * for (const sub of Sub.matches(event, data)) {
   *   sub.stream(event);
   * }
   * ```
   */
  *matches(event: DittoEvent): Iterable<Subscription> {
    for (const subs of this.#store.values()) {
      for (const sub of subs.values()) {
        if (sub.matches(event)) {
          yield sub;
        }
      }
    }
  }
}

const Sub = new SubscriptionStore();

export { Sub };

View File

@ -1,49 +0,0 @@
import { NIP50, NostrEvent, NostrFilter } from '@nostrify/nostrify';
import { Machina, matchFilter } from '@/deps.ts';
import { type DittoEvent } from '@/interfaces/DittoEvent.ts';
/** A single event subscription: a set of filters plus a stream of matching events. */
class Subscription implements AsyncIterable<NostrEvent> {
  filters: NostrFilter[];
  #machina: Machina<NostrEvent>;

  constructor(filters: NostrFilter[]) {
    this.filters = filters;
    this.#machina = new Machina();
  }

  /** Push an event into the subscription's stream. */
  stream(event: NostrEvent): void {
    this.#machina.push(event);
  }

  /**
   * Whether this subscription's filters accept the event.
   * The first filter that matches decides the result: if it carries a
   * `domain:` search token, the event's `author_domain` must equal it.
   */
  matches(event: DittoEvent): boolean {
    const filter = this.filters.find((f) => matchFilter(f, event));

    if (!filter) {
      return false;
    }

    if (filter.search) {
      const domainToken = NIP50.parseInput(filter.search).find((t) =>
        typeof t === 'object' && t.key === 'domain'
      ) as { key: 'domain'; value: string } | undefined;

      if (domainToken?.value) {
        return domainToken.value === event.author_domain;
      }
    }

    return true;
  }

  /** Close the underlying stream; iteration ends. */
  close() {
    this.#machina.close();
  }

  [Symbol.asyncIterator]() {
    return this.#machina.stream();
  }
}

export { Subscription };

View File

@ -18,12 +18,6 @@ type EventStub = TypeFest.SetOptional<EventTemplate, 'content' | 'created_at' |
/** Publish an event through the pipeline. */ /** Publish an event through the pipeline. */
async function createEvent(t: EventStub, c: AppContext): Promise<NostrEvent> { async function createEvent(t: EventStub, c: AppContext): Promise<NostrEvent> {
const pubkey = c.get('pubkey');
if (!pubkey) {
throw new HTTPException(401);
}
const signer = new APISigner(c); const signer = new APISigner(c);
const event = await signer.signEvent({ const event = await signer.signEvent({