Merge branch 'outbox' into 'main'
Deliver API events to outbox relays See merge request soapbox-pub/ditto!127
This commit is contained in:
commit
8c18a284b7
|
@ -9,7 +9,7 @@ import { isEphemeralKind } from '@/kinds.ts';
|
||||||
import { DVM } from '@/pipeline/DVM.ts';
|
import { DVM } from '@/pipeline/DVM.ts';
|
||||||
import { updateStats } from '@/stats.ts';
|
import { updateStats } from '@/stats.ts';
|
||||||
import { hydrateEvents, purifyEvent } from '@/storages/hydrate.ts';
|
import { hydrateEvents, purifyEvent } from '@/storages/hydrate.ts';
|
||||||
import { cache, client, eventsDB, reqmeister } from '@/storages.ts';
|
import { cache, eventsDB, reqmeister } from '@/storages.ts';
|
||||||
import { Sub } from '@/subs.ts';
|
import { Sub } from '@/subs.ts';
|
||||||
import { getTagSet } from '@/tags.ts';
|
import { getTagSet } from '@/tags.ts';
|
||||||
import { eventAge, isRelay, nostrDate, nostrNow, parseNip05, Time } from '@/utils.ts';
|
import { eventAge, isRelay, nostrDate, nostrNow, parseNip05, Time } from '@/utils.ts';
|
||||||
|
@ -43,7 +43,6 @@ async function handleEvent(event: DittoEvent, signal: AbortSignal): Promise<void
|
||||||
processMedia(event),
|
processMedia(event),
|
||||||
payZap(event, signal),
|
payZap(event, signal),
|
||||||
streamOut(event),
|
streamOut(event),
|
||||||
broadcast(event, signal),
|
|
||||||
]);
|
]);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -257,22 +256,6 @@ function streamOut(event: NostrEvent) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Publish the event to other relays.
|
|
||||||
* This should only be done in certain circumstances, like mentioning a user or publishing deletions.
|
|
||||||
*/
|
|
||||||
async function broadcast(event: DittoEvent, signal: AbortSignal) {
|
|
||||||
if (!event.user || !isFresh(event)) return;
|
|
||||||
|
|
||||||
if (event.kind === 5) {
|
|
||||||
try {
|
|
||||||
await client.event(event, { signal });
|
|
||||||
} catch (e) {
|
|
||||||
debug(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/** NIP-20 command line result. */
|
/** NIP-20 command line result. */
|
||||||
class RelayError extends Error {
|
class RelayError extends Error {
|
||||||
constructor(prefix: 'duplicate' | 'pow' | 'blocked' | 'rate-limited' | 'invalid' | 'error', message: string) {
|
constructor(prefix: 'duplicate' | 'pow' | 'blocked' | 'rate-limited' | 'invalid' | 'error', message: string) {
|
||||||
|
|
|
@ -11,6 +11,8 @@ import {
|
||||||
import { normalizeFilters } from '@/filter.ts';
|
import { normalizeFilters } from '@/filter.ts';
|
||||||
import { purifyEvent } from '@/storages/hydrate.ts';
|
import { purifyEvent } from '@/storages/hydrate.ts';
|
||||||
import { abortError } from '@/utils/abort.ts';
|
import { abortError } from '@/utils/abort.ts';
|
||||||
|
import { getRelays } from '@/utils/outbox.ts';
|
||||||
|
import { Conf } from '@/config.ts';
|
||||||
|
|
||||||
interface PoolStoreOpts {
|
interface PoolStoreOpts {
|
||||||
pool: InstanceType<typeof RelayPoolWorker>;
|
pool: InstanceType<typeof RelayPoolWorker>;
|
||||||
|
@ -34,12 +36,16 @@ class PoolStore implements NStore {
|
||||||
this.#publisher = opts.publisher;
|
this.#publisher = opts.publisher;
|
||||||
}
|
}
|
||||||
|
|
||||||
event(event: NostrEvent, opts: NStoreOpts = {}): Promise<void> {
|
async event(event: NostrEvent, opts: NStoreOpts = {}): Promise<void> {
|
||||||
if (opts.signal?.aborted) return Promise.reject(abortError());
|
if (opts.signal?.aborted) return Promise.reject(abortError());
|
||||||
const { relays = this.#relays } = opts;
|
|
||||||
|
const relaySet = await getRelays(event.pubkey);
|
||||||
|
relaySet.delete(Conf.relay);
|
||||||
|
|
||||||
|
const relays = [...relaySet].slice(0, 4);
|
||||||
|
|
||||||
event = purifyEvent(event);
|
event = purifyEvent(event);
|
||||||
this.#debug('EVENT', event);
|
this.#debug('EVENT', event, relays);
|
||||||
|
|
||||||
this.#pool.publish(event, relays);
|
this.#pool.publish(event, relays);
|
||||||
return Promise.resolve();
|
return Promise.resolve();
|
||||||
|
|
|
@ -14,7 +14,7 @@ import {
|
||||||
import * as pipeline from '@/pipeline.ts';
|
import * as pipeline from '@/pipeline.ts';
|
||||||
import { AdminSigner } from '@/signers/AdminSigner.ts';
|
import { AdminSigner } from '@/signers/AdminSigner.ts';
|
||||||
import { APISigner } from '@/signers/APISigner.ts';
|
import { APISigner } from '@/signers/APISigner.ts';
|
||||||
import { eventsDB } from '@/storages.ts';
|
import { client, eventsDB } from '@/storages.ts';
|
||||||
import { nostrNow } from '@/utils.ts';
|
import { nostrNow } from '@/utils.ts';
|
||||||
|
|
||||||
const debug = Debug('ditto:api');
|
const debug = Debug('ditto:api');
|
||||||
|
@ -89,7 +89,10 @@ async function createAdminEvent(t: EventStub, c: AppContext): Promise<NostrEvent
|
||||||
async function publishEvent(event: NostrEvent, c: AppContext): Promise<NostrEvent> {
|
async function publishEvent(event: NostrEvent, c: AppContext): Promise<NostrEvent> {
|
||||||
debug('EVENT', event);
|
debug('EVENT', event);
|
||||||
try {
|
try {
|
||||||
await pipeline.handleEvent(event, c.req.raw.signal);
|
await Promise.all([
|
||||||
|
pipeline.handleEvent(event, c.req.raw.signal),
|
||||||
|
client.event(event),
|
||||||
|
]);
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
if (e instanceof pipeline.RelayError) {
|
if (e instanceof pipeline.RelayError) {
|
||||||
throw new HTTPException(422, {
|
throw new HTTPException(422, {
|
||||||
|
|
|
@ -0,0 +1,27 @@
|
||||||
|
import { Conf } from '@/config.ts';
|
||||||
|
import { eventsDB } from '@/storages.ts';
|
||||||
|
|
||||||
|
export async function getRelays(pubkey: string): Promise<Set<string>> {
|
||||||
|
const relays = new Set<`wss://${string}`>();
|
||||||
|
|
||||||
|
const events = await eventsDB.query([
|
||||||
|
{ kinds: [10002], authors: [pubkey, Conf.pubkey], limit: 2 },
|
||||||
|
]);
|
||||||
|
|
||||||
|
for (const event of events) {
|
||||||
|
for (const [name, relay, marker] of event.tags) {
|
||||||
|
if (name === 'r' && (marker === 'write' || !marker)) {
|
||||||
|
try {
|
||||||
|
const url = new URL(relay);
|
||||||
|
if (url.protocol === 'wss:') {
|
||||||
|
relays.add(url.toString() as `wss://${string}`);
|
||||||
|
}
|
||||||
|
} catch (_e) {
|
||||||
|
// do nothing
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return relays;
|
||||||
|
}
|
Loading…
Reference in New Issue