Merge branch 'delete' into 'develop'

Process deletions

Closes #63

See merge request soapbox-pub/ditto!36
Alex Gleason 2023-09-06 17:34:47 +00:00
commit 4b4bfd48c7
17 changed files with 293 additions and 69 deletions
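
For context, the deletions being processed are NIP-09 deletion requests: kind-5 events whose `e` tags list the IDs to remove. A minimal sketch of such an event, with placeholder values only:

// Illustrative NIP-09 deletion request (kind 5); every value is a placeholder.
// The new processDeletions() handler in this diff looks up the referenced IDs,
// keeps only those whose author matches this event's pubkey, and deletes them.
const deletionRequest = {
  kind: 5,
  pubkey: '<author pubkey>',
  tags: [['e', '<id of the event to delete>']],
  content: '',
  created_at: Math.floor(Date.now() / 1000),
};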

@@ -1,3 +1,4 @@
import '@/cron.ts';
import {
type Context,
cors,

@@ -1,32 +1,9 @@
import { Conf } from '@/config.ts';
import { type Event, type Filter, matchFilters, RelayPool, TTLCache } from '@/deps.ts';
import { type Event, type Filter, matchFilters } from '@/deps.ts';
import * as pipeline from '@/pipeline.ts';
import { Time } from '@/utils.ts';
import { allRelays, pool } from '@/pool.ts';
import type { GetFiltersOpts } from '@/filter.ts';
type Pool = InstanceType<typeof RelayPool>;
/** HACK: Websockets in Deno are finnicky... get a new pool every 30 minutes. */
const poolCache = new TTLCache<0, Pool>({
ttl: Time.minutes(30),
max: 2,
dispose: (pool) => {
console.log('Closing pool.');
pool.close();
},
});
function getPool(): Pool {
const cached = poolCache.get(0);
if (cached !== undefined) return cached;
console.log('Creating new pool.');
const pool = new RelayPool(Conf.poolRelays);
poolCache.set(0, pool);
return pool;
}
/** Get events from a NIP-01 filter. */
function getFilters<K extends number>(filters: Filter<K>[], opts: GetFiltersOpts = {}): Promise<Event<K>[]> {
if (!filters.length) return Promise.resolve([]);
@@ -34,9 +11,9 @@ function getFilters<K extends number>(filters: Filter<K>[], opts: GetFiltersOpts
let tid: number;
const results: Event[] = [];
const unsub = getPool().subscribe(
const unsub = pool.subscribe(
filters,
Conf.poolRelays,
allRelays,
(event: Event | null) => {
if (event && matchFilters(filters, event)) {
pipeline.handleEvent(event).catch(() => {});

@@ -58,14 +58,6 @@ const Conf = {
get adminEmail() {
return Deno.env.get('ADMIN_EMAIL') || 'webmaster@localhost';
},
/** @deprecated Use relays from the database instead. */
get poolRelays() {
return (Deno.env.get('RELAY_POOL') || '').split(',').filter(Boolean);
},
/** @deprecated Publish only to the local relay unless users are mentioned, then try to also send to the relay of those users. Deletions should also be fanned out. */
get publishRelays() {
return ['wss://relay.mostr.pub'];
},
/** Domain of the Ditto server as a `URL` object, for easily grabbing the `hostname`, etc. */
get url() {
return new URL(Conf.localDomain);

src/cron.ts (new file)

@@ -0,0 +1,19 @@
import * as eventsDB from '@/db/events.ts';
import { cron } from '@/deps.ts';
import { Time } from '@/utils/time.ts';
/** Clean up old remote events. */
async function cleanupEvents() {
console.log('Cleaning up old remote events...');
const [result] = await eventsDB.deleteFilters([{
until: Math.floor((Date.now() - Time.days(7)) / 1000),
local: false,
}]);
console.log(`Cleaned up ${result?.numDeletedRows ?? 0} old remote events.`);
}
await cleanupEvents();
cron.every15Minute(cleanupEvents);

@@ -29,9 +29,7 @@ interface EventFTSRow {
interface TagRow {
tag: string;
value_1: string | null;
value_2: string | null;
value_3: string | null;
value: string;
event_id: string;
}
@@ -63,8 +61,26 @@ const migrator = new Migrator({
}),
});
console.log('Running migrations...');
const results = await migrator.migrateToLatest();
console.log('Migrations finished:', results);
/** Migrate the database to the latest version. */
async function migrate() {
console.log('Running migrations...');
const results = await migrator.migrateToLatest();
if (results.error) {
console.error(results.error);
Deno.exit(1);
} else {
if (!results.results?.length) {
console.log('Everything up-to-date.');
} else {
console.log('Migrations finished!');
for (const { migrationName, status } of results.results) {
console.log(` - ${migrationName}: ${status}`);
}
}
}
}
await migrate();
export { db, type DittoDB, type EventRow, type TagRow, type UserRow };

@@ -1,7 +1,14 @@
import event55920b75 from '~/fixtures/events/55920b75.json' assert { type: 'json' };
import { assertEquals } from '@/deps-test.ts';
import { getFilters, insertEvent } from './events.ts';
import { countFilters, deleteFilters, getFilters, insertEvent } from './events.ts';
import { insertUser } from '@/db/users.ts';
Deno.test('count filters', async () => {
assertEquals(await countFilters([{ kinds: [1] }]), 0);
await insertEvent(event55920b75);
assertEquals(await countFilters([{ kinds: [1] }]), 1);
});
Deno.test('insert and filter events', async () => {
await insertEvent(event55920b75);
@@ -15,3 +22,28 @@ Deno.test('insert and filter events', async () => {
[event55920b75],
);
});
Deno.test('delete events', async () => {
await insertEvent(event55920b75);
assertEquals(await getFilters([{ kinds: [1] }]), [event55920b75]);
await deleteFilters([{ kinds: [1] }]);
assertEquals(await getFilters([{ kinds: [1] }]), []);
});
Deno.test('query events with local filter', async () => {
await insertEvent(event55920b75);
assertEquals(await getFilters([{}]), [event55920b75]);
assertEquals(await getFilters([{ local: true }]), []);
assertEquals(await getFilters([{ local: false }]), [event55920b75]);
await insertUser({
username: 'alex',
pubkey: event55920b75.pubkey,
inserted_at: new Date(),
admin: 0,
});
assertEquals(await getFilters([{ local: true }]), [event55920b75]);
assertEquals(await getFilters([{ local: false }]), []);
});

@@ -34,17 +34,14 @@ function insertEvent(event: Event): Promise<void> {
}
const tagCounts: Record<string, number> = {};
const tags = event.tags.reduce<Insertable<TagRow>[]>((results, tag) => {
const tagName = tag[0];
tagCounts[tagName] = (tagCounts[tagName] || 0) + 1;
const tags = event.tags.reduce<Insertable<TagRow>[]>((results, [name, value]) => {
tagCounts[name] = (tagCounts[name] || 0) + 1;
if (tagConditions[tagName]?.({ event, count: tagCounts[tagName] - 1 })) {
if (value && tagConditions[name]?.({ event, count: tagCounts[name] - 1 })) {
results.push({
event_id: event.id,
tag: tagName,
value_1: tag[1] || null,
value_2: tag[2] || null,
value_3: tag[3] || null,
tag: name,
value,
});
}
@@ -111,12 +108,14 @@ function getFilterQuery(filter: DittoFilter) {
query = query
.leftJoin('tags', 'tags.event_id', 'events.id')
.where('tags.tag', '=', tag)
.where('tags.value_1', 'in', value) as typeof query;
.where('tags.value', 'in', value) as typeof query;
}
}
if (filter.local) {
query = query.innerJoin('users', 'users.pubkey', 'events.pubkey');
if (typeof filter.local === 'boolean') {
query = filter.local
? query.innerJoin('users', 'users.pubkey', 'events.pubkey') as typeof query
: query.leftJoin('users', 'users.pubkey', 'events.pubkey').where('users.pubkey', 'is', null) as typeof query;
}
if (filter.search) {
@@ -128,13 +127,20 @@ function getFilterQuery(filter: DittoFilter) {
return query;
}
/** Combine filter queries into a single union query. */
function getFiltersQuery(filters: DittoFilter[]) {
return filters
.map(getFilterQuery)
.reduce((result, query) => result.union(query));
}
/** Get events for filters from the database. */
async function getFilters<K extends number>(
filters: DittoFilter<K>[],
opts: GetFiltersOpts = {},
): Promise<Event<K>[]> {
if (!filters.length) return Promise.resolve([]);
let query = filters.map(getFilterQuery).reduce((acc, curr) => acc.union(curr));
let query = getFiltersQuery(filters);
if (typeof opts.limit === 'number') {
query = query.limit(opts.limit);
@@ -145,10 +151,27 @@ async function getFilters<K extends number>(
));
}
/** Delete events based on filters from the database. */
function deleteFilters<K extends number>(filters: DittoFilter<K>[]) {
if (!filters.length) return Promise.resolve([]);
return db.transaction().execute(async (trx) => {
const query = getFiltersQuery(filters).clearSelect().select('id');
await trx.deleteFrom('events_fts')
.where('id', 'in', () => query)
.execute();
return trx.deleteFrom('events')
.where('id', 'in', () => query)
.execute();
});
}
/** Get number of events that would be returned by filters. */
async function countFilters<K extends number>(filters: DittoFilter<K>[]): Promise<number> {
if (!filters.length) return Promise.resolve(0);
const query = filters.map(getFilterQuery).reduce((acc, curr) => acc.union(curr));
const query = getFiltersQuery(filters);
const [{ count }] = await query
.clearSelect()
@@ -176,4 +199,4 @@ function buildUserSearchContent(event: Event<0>): string {
return [name, nip05, about].filter(Boolean).join('\n');
}
export { countFilters, getFilters, insertEvent };
export { countFilters, deleteFilters, getFilters, insertEvent };
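
A minimal usage sketch of the new deleteFilters export, mirroring how the deletion handler and src/cron.ts in this merge call it; the event ID and cutoff below are placeholders:

import { deleteFilters } from '@/db/events.ts';

// Delete specific events by ID (as the deletion handler does).
await deleteFilters([{ ids: ['<event id>'] }]);

// Prune remote events older than a week (as src/cron.ts does), using the
// `until` and `local` filter extensions shown in this diff.
await deleteFilters([{
  until: Math.floor((Date.now() - 7 * 24 * 60 * 60 * 1000) / 1000),
  local: false,
}]);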

@@ -8,5 +8,5 @@ export async function up(db: Kysely<any>): Promise<void> {
}
export async function down(db: Kysely<any>): Promise<void> {
await db.schema.dropTable('relays').execute();
await db.schema.alterTable('users').dropColumn('admin').execute();
}

@@ -0,0 +1,20 @@
import { Kysely } from '@/deps.ts';
export async function up(db: Kysely<any>): Promise<void> {
await db.schema
.createIndex('idx_users_pubkey')
.on('users')
.column('pubkey')
.execute();
await db.schema
.createIndex('idx_users_username')
.on('users')
.column('username')
.execute();
}
export async function down(db: Kysely<any>): Promise<void> {
await db.schema.dropIndex('idx_users_pubkey').execute();
await db.schema.dropIndex('idx_users_username').execute();
}

@@ -0,0 +1,68 @@
import { Kysely, sql } from '@/deps.ts';
export async function up(db: Kysely<any>): Promise<void> {
await db.schema
.createTable('tags_new')
.addColumn('tag', 'text', (col) => col.notNull())
.addColumn('value', 'text', (col) => col.notNull())
.addColumn('event_id', 'text', (col) => col.references('events.id').onDelete('cascade'))
.execute();
await sql`
INSERT INTO tags_new (tag, value, event_id)
SELECT tag, value_1 as value, event_id
FROM tags
WHERE value_1 IS NOT NULL
`.execute(db);
await db.schema
.dropTable('tags')
.execute();
await db.schema
.alterTable('tags_new')
.renameTo('tags').execute();
await db.schema
.createIndex('idx_tags_tag')
.on('tags')
.column('tag')
.execute();
await db.schema
.createIndex('idx_tags_value')
.on('tags')
.column('value')
.execute();
await db.schema
.createIndex('idx_tags_event_id')
.on('tags')
.column('event_id')
.execute();
}
export async function down(db: Kysely<any>): Promise<void> {
await db.schema.dropTable('tags').execute();
await db.schema
.createTable('tags')
.addColumn('tag', 'text', (col) => col.notNull())
.addColumn('value_1', 'text')
.addColumn('value_2', 'text')
.addColumn('value_3', 'text')
.addColumn('event_id', 'text', (col) => col.notNull())
.execute();
await db.schema
.createIndex('idx_tags_tag')
.on('tags')
.column('tag')
.execute();
await db.schema
.createIndex('idx_tags_value_1')
.on('tags')
.column('value_1')
.execute();
}

@@ -0,0 +1,12 @@
import { Kysely, sql } from '@/deps.ts';
export async function up(db: Kysely<any>): Promise<void> {
await sql`PRAGMA foreign_keys = ON`.execute(db);
await sql`PRAGMA auto_vacuum = FULL`.execute(db);
await sql`VACUUM`.execute(db);
}
export async function down(db: Kysely<any>): Promise<void> {
await sql`PRAGMA foreign_keys = OFF`.execute(db);
await sql`PRAGMA auto_vacuum = NONE`.execute(db);
}

@@ -6,7 +6,7 @@ function addRelays(relays: `wss://${string}`[]) {
if (!relays.length) return Promise.resolve();
const values = relays.map((url) => ({
url,
url: new URL(url).toString(),
domain: tldts.getDomain(url)!,
active: true,
}));
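
The switch to new URL(url).toString() stores relay URLs in a normalized form; for instance, the WHATWG URL parser appends a trailing slash to a bare wss:// origin, so spellings that differ only by that slash end up identical. A small illustration (the relay address is just an example):

new URL('wss://relay.mostr.pub').toString(); // => 'wss://relay.mostr.pub/'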

@@ -65,5 +65,6 @@ export {
} from 'npm:kysely@^0.25.0';
export { DenoSqliteDialect } from 'https://gitlab.com/soapbox-pub/kysely-deno-sqlite/-/raw/v1.0.1/mod.ts';
export { default as tldts } from 'npm:tldts@^6.0.14';
export * as cron from 'https://deno.land/x/deno_cron@v1.0.0/cron.ts';
export type * as TypeFest from 'npm:type-fest@^4.3.0';

@@ -1,18 +1,15 @@
import { getActiveRelays } from '@/db/relays.ts';
import { type Event, RelayPool } from '@/deps.ts';
import { type Event } from '@/deps.ts';
import { allRelays, pool } from '@/pool.ts';
import { nostrNow } from '@/utils.ts';
import * as pipeline from './pipeline.ts';
const relays = await getActiveRelays();
const pool = new RelayPool(relays);
// This file watches events on all known relays and performs
// side-effects based on them, such as trending hashtag tracking
// and storing events for notifications and the home feed.
pool.subscribe(
[{ kinds: [0, 1, 3, 5, 6, 7, 10002], limit: 0, since: nostrNow() }],
relays,
allRelays,
handleEvent,
undefined,
undefined,

@@ -4,10 +4,13 @@ import { addRelays } from '@/db/relays.ts';
import { findUser } from '@/db/users.ts';
import { type Event, LRUCache } from '@/deps.ts';
import { isEphemeralKind } from '@/kinds.ts';
import * as mixer from '@/mixer.ts';
import { publish } from '@/pool.ts';
import { isLocallyFollowed } from '@/queries.ts';
import { Sub } from '@/subs.ts';
import { getTagSet } from '@/tags.ts';
import { trends } from '@/trends.ts';
import { isRelay, nostrDate, nostrNow, Time } from '@/utils.ts';
import { eventAge, isRelay, nostrDate, Time } from '@/utils.ts';
import type { EventData } from '@/types.ts';
@@ -21,9 +24,11 @@ async function handleEvent(event: Event): Promise<void> {
await Promise.all([
storeEvent(event, data),
processDeletions(event),
trackRelays(event),
trackHashtags(event),
streamOut(event, data),
broadcast(event, data),
]);
}
@@ -49,16 +54,39 @@ const isAdminEvent = ({ pubkey }: Event): boolean => pubkey === Conf.pubkey;
/** Maybe store the event, if eligible. */
async function storeEvent(event: Event, data: EventData): Promise<void> {
if (isEphemeralKind(event.kind)) return;
if (data.user || isAdminEvent(event) || await isLocallyFollowed(event.pubkey)) {
await eventsDB.insertEvent(event).catch(console.warn);
const [deletion] = await mixer.getFilters(
[{ kinds: [5], authors: [event.pubkey], '#e': [event.id], limit: 1 }],
{ limit: 1, timeout: Time.seconds(1) },
);
if (deletion) {
return Promise.reject(new RelayError('blocked', 'event was deleted'));
} else {
await eventsDB.insertEvent(event).catch(console.warn);
}
} else {
return Promise.reject(new RelayError('blocked', 'only registered users can post'));
}
}
/** Query to-be-deleted events, ensure their pubkey matches, then delete them from the database. */
async function processDeletions(event: Event): Promise<void> {
if (event.kind === 5) {
const ids = getTagSet(event.tags, 'e');
const events = await eventsDB.getFilters([{ ids: [...ids] }]);
const deleteIds = events
.filter(({ pubkey, id }) => pubkey === event.pubkey && ids.has(id))
.map((event) => event.id);
await eventsDB.deleteFilters([{ ids: deleteIds }]);
}
}
/** Track whenever a hashtag is used, for processing trending tags. */
// deno-lint-ignore require-await
async function trackHashtags(event: Event): Promise<void> {
function trackHashtags(event: Event): void {
const date = nostrDate(event.created_at);
const tags = event.tags
@@ -93,7 +121,7 @@ function trackRelays(event: Event) {
}
/** Determine if the event is being received in a timely manner. */
const isFresh = ({ created_at }: Event): boolean => created_at >= nostrNow() - Time.seconds(10);
const isFresh = (event: Event): boolean => eventAge(event) < Time.seconds(10);
/** Distribute the event through active subscriptions. */
function streamOut(event: Event, data: EventData) {
@@ -104,6 +132,18 @@ function streamOut(event: Event, data: EventData) {
}
}
/**
* Publish the event to other relays.
* This should only be done in certain circumstances, like mentioning a user or publishing deletions.
*/
function broadcast(event: Event, data: EventData) {
if (!data.user || !isFresh(event)) return;
if (event.kind === 5) {
publish(event);
}
}
/** NIP-20 command line result. */
class RelayError extends Error {
constructor(prefix: 'duplicate' | 'pow' | 'blocked' | 'rate-limited' | 'invalid' | 'error', message: string) {

src/pool.ts (new file)

@@ -0,0 +1,12 @@
import { getActiveRelays } from '@/db/relays.ts';
import { type Event, RelayPool } from '@/deps.ts';
const allRelays = await getActiveRelays();
const pool = new RelayPool(allRelays);
/** Publish an event to the given relays, or the entire pool. */
function publish(event: Event, relays: string[] = allRelays) {
return pool.publish(event, relays);
}
export { allRelays, pool, publish };
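
A minimal usage sketch of the new pool module, assuming a signed event produced elsewhere and an example relay address:

import { type Event } from '@/deps.ts';
import { publish } from '@/pool.ts';

// `event` stands in for a signed Nostr event created elsewhere.
declare const event: Event;

// Publish to every active relay in the pool...
publish(event);

// ...or to an explicit subset of relays.
publish(event, ['wss://relay.mostr.pub']);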

src/tags.ts (new file)

@@ -0,0 +1,14 @@
/** Get the values for a tag in a `Set`. */
function getTagSet(tags: string[][], tagName: string): Set<string> {
const set = new Set<string>();
tags.forEach((tag) => {
if (tag[0] === tagName) {
set.add(tag[1]);
}
});
return set;
}
export { getTagSet };
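
A short usage sketch of getTagSet, matching how the deletion handler collects `e`-tag IDs from a kind-5 event; the tag values are illustrative:

import { getTagSet } from '@/tags.ts';

// Duplicate values collapse because the result is a Set.
const ids = getTagSet(
  [['e', 'abc123'], ['p', 'def456'], ['e', 'abc123']],
  'e',
);

console.log([...ids]); // => [ "abc123" ]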