Merge branch 'public-stream' into 'develop'

MastoAPI public stream

See merge request soapbox-pub/ditto!21
This commit is contained in:
Alex Gleason 2023-08-26 01:19:14 +00:00
commit 4f12e067fc
10 changed files with 151 additions and 74 deletions

View File

@ -3,7 +3,7 @@ import { type Event, type Filter, matchFilters, RelayPool, TTLCache } from '@/de
import * as pipeline from '@/pipeline.ts';
import { Time } from '@/utils.ts';
import type { GetFiltersOpts } from '@/types.ts';
import type { GetFiltersOpts } from '@/filter.ts';
type Pool = InstanceType<typeof RelayPool>;

View File

@ -1,6 +1,9 @@
import { AppController } from '@/app.ts';
import { type DittoFilter } from '@/filter.ts';
import { TOKEN_REGEX } from '@/middleware/auth19.ts';
import { streamSchema, ws } from '@/stream.ts';
import { Sub } from '@/subs.ts';
import { toStatus } from '@/transformers/nostr-to-mastoapi.ts';
import { bech32ToPubkey } from '@/utils.ts';
const streamingController: AppController = (c) => {
@ -29,21 +32,47 @@ const streamingController: AppController = (c) => {
pubkey: bech32ToPubkey(match[1]),
};
socket.addEventListener('open', () => {
console.log('websocket: connection opened');
if (stream) {
ws.subscribe(conn, { stream });
function send(name: string, payload: object) {
if (socket.readyState === WebSocket.OPEN) {
socket.send(JSON.stringify({
event: name,
payload: JSON.stringify(payload),
stream: [stream],
}));
}
}
});
socket.addEventListener('message', (e) => console.log('websocket message: ', e.data));
socket.onopen = async () => {
if (!stream) return;
socket.addEventListener('close', () => {
console.log('websocket: connection closed');
ws.subscribe(conn, { stream });
const filter = topicToFilter(stream);
if (filter) {
for await (const event of Sub.sub(socket, '1', [filter])) {
const status = await toStatus(event);
if (status) {
send('update', status);
}
}
}
};
socket.onclose = () => {
ws.unsubscribeAll(socket);
});
};
return response;
};
function topicToFilter(topic: string): DittoFilter<1> | undefined {
switch (topic) {
case 'public':
return { kinds: [1] };
case 'public:local':
return { kinds: [1], local: true };
}
}
export { streamingController };

View File

@ -54,20 +54,18 @@ function connectStream(socket: WebSocket) {
}
/** Handle REQ. Start a subscription. */
async function handleReq([_, subId, ...filters]: ClientREQ): Promise<void> {
const prepared = prepareFilters(filters);
async function handleReq([_, subId, ...rest]: ClientREQ): Promise<void> {
const filters = prepareFilters(rest);
for (const event of await eventsDB.getFilters(prepared)) {
for (const event of await eventsDB.getFilters(filters)) {
send(['EVENT', subId, event]);
}
send(['EOSE', subId]);
Sub.sub({
id: subId,
filters: prepared,
socket,
});
for await (const event of Sub.sub(socket, subId, filters)) {
send(['EVENT', subId, event]);
}
}
/** Handle EVENT. Store the event. */
@ -87,12 +85,14 @@ function connectStream(socket: WebSocket) {
/** Handle CLOSE. Close the subscription. */
function handleClose([_, subId]: ClientCLOSE): void {
Sub.unsub({ id: subId, socket });
Sub.unsub(socket, subId);
}
/** Send a message back to the client. */
function send(msg: RelayMsg): void {
return socket.send(JSON.stringify(msg));
if (socket.readyState === WebSocket.OPEN) {
socket.send(JSON.stringify(msg));
}
}
}

View File

@ -1,7 +1,7 @@
import { db, type TagRow } from '@/db.ts';
import { type Event, type Insertable } from '@/deps.ts';
import type { DittoFilter, GetFiltersOpts } from '@/types.ts';
import type { DittoFilter, GetFiltersOpts } from '@/filter.ts';
type TagCondition = ({ event, count }: { event: Event; count: number }) => boolean;

View File

@ -1,6 +1,19 @@
import { type Event, matchFilters } from '@/deps.ts';
import { type Event, type Filter, matchFilters } from '@/deps.ts';
import type { DittoFilter, EventData } from '@/types.ts';
import type { EventData } from '@/types.ts';
/** Custom filter interface that extends Nostr filters with extra options for Ditto. */
interface DittoFilter<K extends number = number> extends Filter<K> {
local?: boolean;
}
/** Additional options to apply to the whole subscription. */
interface GetFiltersOpts {
/** How long to wait (in milliseconds) until aborting the request. */
timeout?: number;
/** Event limit for the whole subscription. */
limit?: number;
}
function matchDittoFilter(filter: DittoFilter, event: Event, data: EventData): boolean {
if (filter.local && !data.user) {
@ -24,4 +37,4 @@ function matchDittoFilters(filters: DittoFilter[], event: Event, data: EventData
return false;
}
export { matchDittoFilters };
export { type DittoFilter, type GetFiltersOpts, matchDittoFilters };

View File

@ -4,7 +4,7 @@ import * as client from '@/client.ts';
import * as eventsDB from '@/db/events.ts';
import { eventDateComparator } from '@/utils.ts';
import type { DittoFilter, GetFiltersOpts } from '@/types.ts';
import type { DittoFilter, GetFiltersOpts } from '@/filter.ts';
/** Get filters from the database and pool, and mix the best results together. */
async function getFilters<K extends number>(

View File

@ -93,8 +93,8 @@ const isFresh = ({ created_at }: Event): boolean => created_at >= nostrNow() - T
function streamOut(event: Event, data: EventData) {
if (!isFresh(event)) return;
for (const { socket, id } of Sub.matches(event, data)) {
socket.send(JSON.stringify(['EVENT', id, event]));
for (const sub of Sub.matches(event, data)) {
sub.stream(event);
}
}

View File

@ -1,52 +1,57 @@
import { type Event } from '@/deps.ts';
import { matchDittoFilters } from './filter.ts';
import { Subscription } from '@/subscription.ts';
import type { DittoFilter, EventData } from '@/types.ts';
/** Nostr subscription to receive realtime events. */
interface Subscription {
/** User-defined NIP-01 subscription ID. */
id: string;
/** Event filters for the subscription. */
filters: DittoFilter[];
/** WebSocket to deliver results to. */
socket: WebSocket;
}
import type { DittoFilter } from '@/filter.ts';
import type { EventData } from '@/types.ts';
/**
* Manages Ditto event subscriptions.
*
* Subscriptions can be added, removed, and matched against events.
*
* ```ts
* for (const sub of Sub.matches(event)) {
* // Send event to sub.socket
* sub.socket.send(JSON.stringify(event));
* }
* ```
*/
class SubscriptionStore {
#store = new Map<WebSocket, Map<string, Subscription>>();
/** Add a subscription to the store. */
sub(data: Subscription): void {
let subs = this.#store.get(data.socket);
/**
* Add a subscription to the store, and then iterate over it.
*
* ```ts
* for (const event of Sub.sub(socket, subId, filters)) {
* console.log(event);
* }
* ```
*/
sub<K extends number>(socket: WebSocket, id: string, filters: DittoFilter<K>[]): Subscription<K> {
let subs = this.#store.get(socket);
if (!subs) {
subs = new Map();
this.#store.set(data.socket, subs);
this.#store.set(socket, subs);
}
subs.set(data.id, data);
const sub = new Subscription(filters);
this.unsub(socket, id);
subs.set(id, sub as unknown as Subscription);
return sub;
}
/** Remove a subscription from the store. */
unsub(sub: Pick<Subscription, 'socket' | 'id'>): void {
this.#store.get(sub.socket)?.delete(sub.id);
unsub(socket: WebSocket, id: string): void {
this.#store.get(socket)?.get(id)?.close();
this.#store.get(socket)?.delete(id);
}
/** Remove an entire socket. */
close(socket: WebSocket): void {
const subs = this.#store.get(socket);
if (subs) {
for (const sub of subs.values()) {
sub.close();
}
}
this.#store.delete(socket);
}
@ -54,16 +59,15 @@ class SubscriptionStore {
* Loop through matching subscriptions to stream out.
*
* ```ts
* for (const sub of Sub.matches(event)) {
* // Send event to sub.socket
* sub.socket.send(JSON.stringify(event));
* for (const sub of Sub.matches(event, data)) {
* sub.stream(event);
* }
* ```
*/
*matches(event: Event, data: EventData): Iterable<Subscription> {
for (const subs of this.#store.values()) {
for (const sub of subs.values()) {
if (matchDittoFilters(sub.filters, event, data)) {
if (sub.matches(event, data)) {
yield sub;
}
}

46
src/subscription.ts Normal file
View File

@ -0,0 +1,46 @@
import { type Event } from '@/deps.ts';
import { type DittoFilter, matchDittoFilters } from '@/filter.ts';
import type { EventData } from '@/types.ts';
class Subscription<K extends number = number> implements AsyncIterable<Event<K>> {
filters: DittoFilter<K>[];
#next?: (event: Event<K>) => void;
#closed = false;
constructor(filters: DittoFilter<K>[]) {
this.filters = filters;
}
stream(event: Event<K>): void {
if (this.#next) {
this.#next(event);
this.#next = undefined;
}
}
matches(event: Event, data: EventData): boolean {
return matchDittoFilters(this.filters, event, data);
}
close() {
this.#closed = true;
this.#next?.(undefined!);
}
async *[Symbol.asyncIterator]() {
while (true) {
const event = await new Promise<Event<K>>((resolve) => {
this.#next = resolve;
});
if (this.#closed) {
return;
}
yield event;
}
}
}
export { Subscription };

View File

@ -1,21 +1,6 @@
import { UserRow } from '@/db.ts';
import { type Filter } from '@/deps.ts';
/** Custom filter interface that extends Nostr filters with extra options for Ditto. */
interface DittoFilter<K extends number = number> extends Filter<K> {
local?: boolean;
}
/** Additional options to apply to the whole subscription. */
interface GetFiltersOpts {
/** How long to wait (in milliseconds) until aborting the request. */
timeout?: number;
/** Event limit for the whole subscription. */
limit?: number;
}
interface EventData {
user: UserRow | undefined;
}
export type { DittoFilter, EventData, GetFiltersOpts };
export type { EventData };