Merge branch 'pipeline' into 'develop'

Event Pipeline

See merge request soapbox-pub/ditto!14
This commit is contained in:
Alex Gleason 2023-08-18 03:25:04 +00:00
commit 4a61ce26a9
8 changed files with 134 additions and 84 deletions

View File

@ -1,5 +1,6 @@
import { Conf } from '@/config.ts';
import { type Event, type Filter, matchFilters, RelayPool, TTLCache } from '@/deps.ts';
import * as pipeline from '@/pipeline.ts';
import { Time } from '@/utils.ts';
import type { GetFiltersOpts } from '@/types.ts';
@ -37,6 +38,7 @@ function getFilters<K extends number>(filters: Filter<K>[], opts: GetFiltersOpts
Conf.poolRelays,
(event: Event | null) => {
if (event && matchFilters(filters, event)) {
pipeline.handleEvent(event).catch(() => {});
results.push({
id: event.id,
kind: event.kind,
@ -70,14 +72,4 @@ function getFilters<K extends number>(filters: Filter<K>[], opts: GetFiltersOpts
});
}
/** Publish an event to the Nostr relay. */
function publish(event: Event, relays = Conf.publishRelays): void {
console.log('Publishing event', event, relays);
try {
getPool().publish(event, relays);
} catch (e) {
console.error(e);
}
}
export { getFilters, publish };
export { getFilters };

View File

@ -1,7 +1,7 @@
import { type AppController } from '@/app.ts';
import { type Filter, findReplyTag, z } from '@/deps.ts';
import { publish } from '@/client.ts';
import * as mixer from '@/mixer.ts';
import * as pipeline from '@/pipeline.ts';
import { getAuthor, getFollows } from '@/queries.ts';
import { jsonMetaContentSchema } from '@/schemas/nostr.ts';
import { signEvent } from '@/sign.ts';
@ -167,7 +167,13 @@ const updateCredentialsController: AppController = async (c) => {
created_at: nostrNow(),
}, c);
publish(event);
try {
await pipeline.handleEvent(event);
} catch (e) {
if (e instanceof pipeline.RelayError) {
return c.json({ error: e.message }, 422);
}
}
const account = await toAccount(event);
return c.json(account);

View File

@ -1,6 +1,6 @@
import { type AppController } from '@/app.ts';
import { publish } from '@/client.ts';
import { ISO6391, Kind, z } from '@/deps.ts';
import * as pipeline from '@/pipeline.ts';
import { getAncestors, getDescendants, getEvent } from '@/queries.ts';
import { signEvent } from '@/sign.ts';
import { toStatus } from '@/transformers/nostr-to-mastoapi.ts';
@ -77,7 +77,13 @@ const createStatusController: AppController = async (c) => {
created_at: nostrNow(),
}, c);
publish(event);
try {
await pipeline.handleEvent(event);
} catch (e) {
if (e instanceof pipeline.RelayError) {
return c.json({ error: e.message }, 422);
}
}
return c.json(await toStatus(event));
} else {
@ -118,7 +124,13 @@ const favouriteController: AppController = async (c) => {
created_at: nostrNow(),
}, c);
publish(event);
try {
await pipeline.handleEvent(event);
} catch (e) {
if (e instanceof pipeline.RelayError) {
return c.json({ error: e.message }, 422);
}
}
const status = await toStatus(target);

View File

@ -1,5 +1,5 @@
import * as eventsDB from '@/db/events.ts';
import { findUser } from '@/db/users.ts';
import * as pipeline from '@/pipeline.ts';
import { jsonSchema } from '@/schema.ts';
import {
type ClientCLOSE,
@ -15,12 +15,14 @@ import type { Event, Filter } from '@/deps.ts';
/** Limit of events returned per-filter. */
const FILTER_LIMIT = 100;
/** NIP-01 relay to client message. */
type RelayMsg =
| ['EVENT', string, Event]
| ['NOTICE', string]
| ['EOSE', string]
| ['OK', string, boolean, string];
/** Set up the Websocket connection. */
function connectStream(socket: WebSocket) {
socket.onmessage = (e) => {
const result = jsonSchema.pipe(clientMsgSchema).safeParse(e.data);
@ -31,6 +33,7 @@ function connectStream(socket: WebSocket) {
}
};
/** Handle client message. */
function handleMsg(msg: ClientMsg) {
switch (msg[0]) {
case 'REQ':
@ -45,6 +48,7 @@ function connectStream(socket: WebSocket) {
}
}
/** Handle REQ. Start a subscription. */
async function handleReq([_, sub, ...filters]: ClientREQ) {
for (const event of await eventsDB.getFilters(prepareFilters(filters))) {
send(['EVENT', sub, event]);
@ -52,20 +56,28 @@ function connectStream(socket: WebSocket) {
send(['EOSE', sub]);
}
/** Handle EVENT. Store the event. */
async function handleEvent([_, event]: ClientEVENT) {
if (await findUser({ pubkey: event.pubkey })) {
eventsDB.insertEvent(event);
try {
// This will store it (if eligible) and run other side-effects.
await pipeline.handleEvent(event);
send(['OK', event.id, true, '']);
} else {
send(['OK', event.id, false, 'blocked: only registered users can post']);
} catch (e) {
if (e instanceof pipeline.RelayError) {
send(['OK', event.id, false, e.message]);
} else {
send(['OK', event.id, false, 'error: something went wrong']);
}
}
}
/** Handle CLOSE. Close the subscription. */
function handleClose([_, _sub]: ClientCLOSE) {
// TODO: ???
return;
}
/** Send a message back to the client. */
function send(msg: RelayMsg) {
return socket.send(JSON.stringify(msg));
}

View File

@ -119,16 +119,4 @@ async function getFilters<K extends number>(
));
}
/** Returns whether the pubkey is followed by a local user. */
async function isLocallyFollowed(pubkey: string): Promise<boolean> {
return Boolean(
await getFilterQuery({
kinds: [3],
'#p': [pubkey],
limit: 1,
local: true,
}).executeTakeFirst(),
);
}
export { getFilters, insertEvent, isLocallyFollowed };
export { getFilters, insertEvent };

View File

@ -1,9 +1,8 @@
import { insertEvent, isLocallyFollowed } from '@/db/events.ts';
import { addRelays, getActiveRelays } from '@/db/relays.ts';
import { findUser } from '@/db/users.ts';
import { getActiveRelays } from '@/db/relays.ts';
import { type Event, RelayPool } from '@/deps.ts';
import { trends } from '@/trends.ts';
import { isRelay, nostrDate, nostrNow } from '@/utils.ts';
import { nostrNow } from '@/utils.ts';
import * as pipeline from './pipeline.ts';
const relays = await getActiveRelays();
const pool = new RelayPool(relays);
@ -20,48 +19,10 @@ pool.subscribe(
);
/** Handle events through the firehose pipeline. */
async function handleEvent(event: Event): Promise<void> {
function handleEvent(event: Event): Promise<void> {
console.info(`firehose: Event<${event.kind}> ${event.id}`);
trackHashtags(event);
trackRelays(event);
if (await findUser({ pubkey: event.pubkey }) || await isLocallyFollowed(event.pubkey)) {
insertEvent(event).catch(console.warn);
}
}
/** Track whenever a hashtag is used, for processing trending tags. */
function trackHashtags(event: Event): void {
const date = nostrDate(event.created_at);
const tags = event.tags
.filter((tag) => tag[0] === 't')
.map((tag) => tag[1])
.slice(0, 5);
if (!tags.length) return;
try {
console.info('tracking tags:', tags);
trends.addTagUsages(event.pubkey, tags, date);
} catch (_e) {
// do nothing
}
}
/** Tracks known relays in the database. */
function trackRelays(event: Event) {
const relays = new Set<`wss://${string}`>();
event.tags.forEach((tag) => {
if (['p', 'e', 'a'].includes(tag[0]) && isRelay(tag[2])) {
relays.add(tag[2]);
}
if (event.kind === 10002 && tag[0] === 'r' && isRelay(tag[1])) {
relays.add(tag[1]);
}
});
return addRelays([...relays]);
return pipeline
.handleEvent(event)
.catch(() => {});
}

73
src/pipeline.ts Normal file
View File

@ -0,0 +1,73 @@
import * as eventsDB from '@/db/events.ts';
import { addRelays } from '@/db/relays.ts';
import { findUser } from '@/db/users.ts';
import { type Event } from '@/deps.ts';
import { isLocallyFollowed } from '@/queries.ts';
import { trends } from '@/trends.ts';
import { isRelay, nostrDate } from '@/utils.ts';
/**
* Common pipeline function to process (and maybe store) events.
* It is idempotent, so it can be called multiple times for the same event.
*/
async function handleEvent(event: Event): Promise<void> {
await Promise.all([
storeEvent(event),
trackRelays(event),
trackHashtags(event),
]);
}
/** Maybe store the event, if eligible. */
async function storeEvent(event: Event): Promise<void> {
if (await findUser({ pubkey: event.pubkey }) || await isLocallyFollowed(event.pubkey)) {
await eventsDB.insertEvent(event).catch(console.warn);
} else {
return Promise.reject(new RelayError('blocked', 'only registered users can post'));
}
}
/** Track whenever a hashtag is used, for processing trending tags. */
// deno-lint-ignore require-await
async function trackHashtags(event: Event): Promise<void> {
const date = nostrDate(event.created_at);
const tags = event.tags
.filter((tag) => tag[0] === 't')
.map((tag) => tag[1])
.slice(0, 5);
if (!tags.length) return;
try {
console.info('tracking tags:', tags);
trends.addTagUsages(event.pubkey, tags, date);
} catch (_e) {
// do nothing
}
}
/** Tracks known relays in the database. */
function trackRelays(event: Event) {
const relays = new Set<`wss://${string}`>();
event.tags.forEach((tag) => {
if (['p', 'e', 'a'].includes(tag[0]) && isRelay(tag[2])) {
relays.add(tag[2]);
}
if (event.kind === 10002 && tag[0] === 'r' && isRelay(tag[1])) {
relays.add(tag[1]);
}
});
return addRelays([...relays]);
}
/** NIP-20 command line result. */
class RelayError extends Error {
constructor(prefix: 'duplicate' | 'pow' | 'blocked' | 'rate-limited' | 'invalid' | 'error', message: string) {
super(`${prefix}: ${message}`);
}
}
export { handleEvent, RelayError };

View File

@ -1,8 +1,8 @@
import * as eventsDB from '@/db/events.ts';
import { type Event, type Filter, findReplyTag } from '@/deps.ts';
import * as mixer from '@/mixer.ts';
import { type PaginationParams } from '@/utils.ts';
import * as mixer from './mixer.ts';
interface GetEventOpts<K extends number> {
/** Timeout in milliseconds. */
timeout?: number;
@ -83,4 +83,10 @@ function getDescendants(eventId: string): Promise<Event<1>[]> {
return mixer.getFilters([{ kinds: [1], '#e': [eventId] }], { limit: 200, timeout: 2000 });
}
export { getAncestors, getAuthor, getDescendants, getEvent, getFeed, getFollows, getPublicFeed };
/** Returns whether the pubkey is followed by a local user. */
async function isLocallyFollowed(pubkey: string): Promise<boolean> {
const [event] = await eventsDB.getFilters([{ kinds: [3], '#p': [pubkey], local: true }], { limit: 1 });
return Boolean(event);
}
export { getAncestors, getAuthor, getDescendants, getEvent, getFeed, getFollows, getPublicFeed, isLocallyFollowed };