Merge branch 'store-refactor' into 'main'
Add EventStore interface, extend eventsDB, client, and memorelay from it See merge request soapbox-pub/ditto!88
This commit is contained in:
commit
716a7019c2
|
@ -1,15 +0,0 @@
|
||||||
{
|
|
||||||
"kind": 1,
|
|
||||||
"content": "I'm vegan btw",
|
|
||||||
"tags": [
|
|
||||||
[
|
|
||||||
"proxy",
|
|
||||||
"https://gleasonator.com/objects/8f6fac53-4f66-4c6e-ac7d-92e5e78c3e79",
|
|
||||||
"activitypub"
|
|
||||||
]
|
|
||||||
],
|
|
||||||
"pubkey": "79c2cae114ea28a981e7559b4fe7854a473521a8d22a66bbab9fa248eb820ff6",
|
|
||||||
"created_at": 1691091365,
|
|
||||||
"id": "55920b758b9c7b17854b6e3d44e6a02a83d1cb49e1227e75a30426dea94d4cb2",
|
|
||||||
"sig": "a72f12c08f18e85d98fb92ae89e2fe63e48b8864c5e10fbdd5335f3c9f936397a6b0a7350efe251f8168b1601d7012d4a6d0ee6eec958067cf22a14f5a5ea579"
|
|
||||||
}
|
|
|
@ -1,13 +1,12 @@
|
||||||
import { Debug, type Event, type Filter, matchFilters } from '@/deps.ts';
|
import { Debug, type Event, type Filter, matchFilters } from '@/deps.ts';
|
||||||
import * as pipeline from '@/pipeline.ts';
|
import * as pipeline from '@/pipeline.ts';
|
||||||
import { activeRelays, pool } from '@/pool.ts';
|
import { activeRelays, pool } from '@/pool.ts';
|
||||||
|
import { type EventStore, type GetEventsOpts, type StoreEventOpts } from '@/store.ts';
|
||||||
import type { GetFiltersOpts } from '@/filter.ts';
|
|
||||||
|
|
||||||
const debug = Debug('ditto:client');
|
const debug = Debug('ditto:client');
|
||||||
|
|
||||||
/** Get events from a NIP-01 filter. */
|
/** Get events from a NIP-01 filter. */
|
||||||
function getFilters<K extends number>(filters: Filter<K>[], opts: GetFiltersOpts = {}): Promise<Event<K>[]> {
|
function getEvents<K extends number>(filters: Filter<K>[], opts: GetEventsOpts = {}): Promise<Event<K>[]> {
|
||||||
if (opts.signal?.aborted) return Promise.resolve([]);
|
if (opts.signal?.aborted) return Promise.resolve([]);
|
||||||
if (!filters.length) return Promise.resolve([]);
|
if (!filters.length) return Promise.resolve([]);
|
||||||
debug('REQ', JSON.stringify(filters));
|
debug('REQ', JSON.stringify(filters));
|
||||||
|
@ -50,4 +49,19 @@ function getFilters<K extends number>(filters: Filter<K>[], opts: GetFiltersOpts
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
export { getFilters };
|
/** Publish an event to the given relays, or the entire pool. */
|
||||||
|
function storeEvent(event: Event, opts: StoreEventOpts = {}): Promise<void> {
|
||||||
|
const { relays = activeRelays } = opts;
|
||||||
|
debug('EVENT', event);
|
||||||
|
pool.publish(event, relays);
|
||||||
|
return Promise.resolve();
|
||||||
|
}
|
||||||
|
|
||||||
|
const client: EventStore = {
|
||||||
|
getEvents,
|
||||||
|
storeEvent,
|
||||||
|
countEvents: () => Promise.reject(new Error('COUNT not implemented')),
|
||||||
|
deleteEvents: () => Promise.reject(new Error('Cannot delete events from relays. Create a kind 5 event instead.')),
|
||||||
|
};
|
||||||
|
|
||||||
|
export { client };
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
import { type AppController } from '@/app.ts';
|
import { type AppController } from '@/app.ts';
|
||||||
import { Conf } from '@/config.ts';
|
import { Conf } from '@/config.ts';
|
||||||
import * as eventsDB from '@/db/events.ts';
|
import { eventsDB } from '@/db/events.ts';
|
||||||
import { insertUser } from '@/db/users.ts';
|
import { insertUser } from '@/db/users.ts';
|
||||||
import { findReplyTag, nip19, z } from '@/deps.ts';
|
import { findReplyTag, nip19, z } from '@/deps.ts';
|
||||||
import { type DittoFilter } from '@/filter.ts';
|
import { type DittoFilter } from '@/filter.ts';
|
||||||
|
@ -151,7 +151,7 @@ const accountStatusesController: AppController = async (c) => {
|
||||||
filter['#t'] = [tagged];
|
filter['#t'] = [tagged];
|
||||||
}
|
}
|
||||||
|
|
||||||
let events = await eventsDB.getFilters([filter]);
|
let events = await eventsDB.getEvents([filter]);
|
||||||
|
|
||||||
if (exclude_replies) {
|
if (exclude_replies) {
|
||||||
events = events.filter((event) => !findReplyTag(event));
|
events = events.filter((event) => !findReplyTag(event));
|
||||||
|
@ -256,7 +256,7 @@ const favouritesController: AppController = async (c) => {
|
||||||
const pubkey = c.get('pubkey')!;
|
const pubkey = c.get('pubkey')!;
|
||||||
const params = paginationSchema.parse(c.req.query());
|
const params = paginationSchema.parse(c.req.query());
|
||||||
|
|
||||||
const events7 = await eventsDB.getFilters(
|
const events7 = await eventsDB.getEvents(
|
||||||
[{ kinds: [7], authors: [pubkey], ...params }],
|
[{ kinds: [7], authors: [pubkey], ...params }],
|
||||||
{ signal: AbortSignal.timeout(1000) },
|
{ signal: AbortSignal.timeout(1000) },
|
||||||
);
|
);
|
||||||
|
@ -265,7 +265,7 @@ const favouritesController: AppController = async (c) => {
|
||||||
.map((event) => event.tags.find((tag) => tag[0] === 'e')?.[1])
|
.map((event) => event.tags.find((tag) => tag[0] === 'e')?.[1])
|
||||||
.filter((id): id is string => !!id);
|
.filter((id): id is string => !!id);
|
||||||
|
|
||||||
const events1 = await eventsDB.getFilters(
|
const events1 = await eventsDB.getEvents(
|
||||||
[{ kinds: [1], ids, relations: ['author', 'event_stats', 'author_stats'] }],
|
[{ kinds: [1], ids, relations: ['author', 'event_stats', 'author_stats'] }],
|
||||||
{
|
{
|
||||||
signal: AbortSignal.timeout(1000),
|
signal: AbortSignal.timeout(1000),
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
import { type AppController } from '@/app.ts';
|
import { type AppController } from '@/app.ts';
|
||||||
import * as eventsDB from '@/db/events.ts';
|
import { eventsDB } from '@/db/events.ts';
|
||||||
import { paginated, paginationSchema } from '@/utils/web.ts';
|
import { paginated, paginationSchema } from '@/utils/web.ts';
|
||||||
import { renderNotification } from '@/views/mastodon/notifications.ts';
|
import { renderNotification } from '@/views/mastodon/notifications.ts';
|
||||||
|
|
||||||
|
@ -7,7 +7,7 @@ const notificationsController: AppController = async (c) => {
|
||||||
const pubkey = c.get('pubkey')!;
|
const pubkey = c.get('pubkey')!;
|
||||||
const { since, until } = paginationSchema.parse(c.req.query());
|
const { since, until } = paginationSchema.parse(c.req.query());
|
||||||
|
|
||||||
const events = await eventsDB.getFilters(
|
const events = await eventsDB.getEvents(
|
||||||
[{ kinds: [1], '#p': [pubkey], since, until }],
|
[{ kinds: [1], '#p': [pubkey], since, until }],
|
||||||
{ signal: AbortSignal.timeout(3000) },
|
{ signal: AbortSignal.timeout(3000) },
|
||||||
);
|
);
|
||||||
|
|
|
@ -1,12 +1,12 @@
|
||||||
import { type AppController } from '@/app.ts';
|
import { type AppController } from '@/app.ts';
|
||||||
import * as eventsDB from '@/db/events.ts';
|
import { eventsDB } from '@/db/events.ts';
|
||||||
import { z } from '@/deps.ts';
|
import { z } from '@/deps.ts';
|
||||||
import { configSchema, elixirTupleSchema } from '@/schemas/pleroma-api.ts';
|
import { configSchema, elixirTupleSchema } from '@/schemas/pleroma-api.ts';
|
||||||
import { createAdminEvent } from '@/utils/web.ts';
|
import { createAdminEvent } from '@/utils/web.ts';
|
||||||
import { Conf } from '@/config.ts';
|
import { Conf } from '@/config.ts';
|
||||||
|
|
||||||
const frontendConfigController: AppController = async (c) => {
|
const frontendConfigController: AppController = async (c) => {
|
||||||
const [event] = await eventsDB.getFilters([{
|
const [event] = await eventsDB.getEvents([{
|
||||||
kinds: [30078],
|
kinds: [30078],
|
||||||
authors: [Conf.pubkey],
|
authors: [Conf.pubkey],
|
||||||
'#d': ['pub.ditto.frontendConfig'],
|
'#d': ['pub.ditto.frontendConfig'],
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
import { AppController } from '@/app.ts';
|
import { AppController } from '@/app.ts';
|
||||||
import * as eventsDB from '@/db/events.ts';
|
import { eventsDB } from '@/db/events.ts';
|
||||||
import { type Event, nip19, z } from '@/deps.ts';
|
import { type Event, nip19, z } from '@/deps.ts';
|
||||||
import { type DittoFilter } from '@/filter.ts';
|
import { type DittoFilter } from '@/filter.ts';
|
||||||
import { booleanParamSchema } from '@/schema.ts';
|
import { booleanParamSchema } from '@/schema.ts';
|
||||||
|
@ -76,7 +76,7 @@ function searchEvents({ q, type, limit, account_id }: SearchQuery): Promise<Even
|
||||||
filter.authors = [account_id];
|
filter.authors = [account_id];
|
||||||
}
|
}
|
||||||
|
|
||||||
return eventsDB.getFilters([filter]);
|
return eventsDB.getEvents([filter]);
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Get event kinds to search from `type` query param. */
|
/** Get event kinds to search from `type` query param. */
|
||||||
|
@ -94,7 +94,7 @@ function typeToKinds(type: SearchQuery['type']): number[] {
|
||||||
/** Resolve a searched value into an event, if applicable. */
|
/** Resolve a searched value into an event, if applicable. */
|
||||||
async function lookupEvent(query: SearchQuery, signal = AbortSignal.timeout(1000)): Promise<Event | undefined> {
|
async function lookupEvent(query: SearchQuery, signal = AbortSignal.timeout(1000)): Promise<Event | undefined> {
|
||||||
const filters = await getLookupFilters(query);
|
const filters = await getLookupFilters(query);
|
||||||
const [event] = await eventsDB.getFilters(filters, { limit: 1, signal });
|
const [event] = await eventsDB.getEvents(filters, { limit: 1, signal });
|
||||||
return event;
|
return event;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
import * as eventsDB from '@/db/events.ts';
|
import { eventsDB } from '@/db/events.ts';
|
||||||
import { z } from '@/deps.ts';
|
import { z } from '@/deps.ts';
|
||||||
import { type DittoFilter } from '@/filter.ts';
|
import { type DittoFilter } from '@/filter.ts';
|
||||||
import { getFeedPubkeys } from '@/queries.ts';
|
import { getFeedPubkeys } from '@/queries.ts';
|
||||||
|
@ -33,7 +33,7 @@ const hashtagTimelineController: AppController = (c) => {
|
||||||
|
|
||||||
/** Render statuses for timelines. */
|
/** Render statuses for timelines. */
|
||||||
async function renderStatuses(c: AppContext, filters: DittoFilter<1>[], signal = AbortSignal.timeout(1000)) {
|
async function renderStatuses(c: AppContext, filters: DittoFilter<1>[], signal = AbortSignal.timeout(1000)) {
|
||||||
const events = await eventsDB.getFilters(
|
const events = await eventsDB.getEvents(
|
||||||
filters.map((filter) => ({ ...filter, relations: ['author', 'event_stats', 'author_stats'] })),
|
filters.map((filter) => ({ ...filter, relations: ['author', 'event_stats', 'author_stats'] })),
|
||||||
{ signal },
|
{ signal },
|
||||||
);
|
);
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
import { relayInfoController } from '@/controllers/nostr/relay-info.ts';
|
import { relayInfoController } from '@/controllers/nostr/relay-info.ts';
|
||||||
import * as eventsDB from '@/db/events.ts';
|
import { eventsDB } from '@/db/events.ts';
|
||||||
import * as pipeline from '@/pipeline.ts';
|
import * as pipeline from '@/pipeline.ts';
|
||||||
import { jsonSchema } from '@/schema.ts';
|
import { jsonSchema } from '@/schema.ts';
|
||||||
import {
|
import {
|
||||||
|
@ -63,7 +63,7 @@ function connectStream(socket: WebSocket) {
|
||||||
async function handleReq([_, subId, ...rest]: ClientREQ): Promise<void> {
|
async function handleReq([_, subId, ...rest]: ClientREQ): Promise<void> {
|
||||||
const filters = prepareFilters(rest);
|
const filters = prepareFilters(rest);
|
||||||
|
|
||||||
for (const event of await eventsDB.getFilters(filters, { limit: FILTER_LIMIT })) {
|
for (const event of await eventsDB.getEvents(filters, { limit: FILTER_LIMIT })) {
|
||||||
send(['EVENT', subId, event]);
|
send(['EVENT', subId, event]);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,49 +1,50 @@
|
||||||
import event55920b75 from '~/fixtures/events/55920b75.json' assert { type: 'json' };
|
|
||||||
import { assertEquals } from '@/deps-test.ts';
|
import { assertEquals } from '@/deps-test.ts';
|
||||||
|
|
||||||
import { countFilters, deleteFilters, getFilters, insertEvent } from './events.ts';
|
|
||||||
import { insertUser } from '@/db/users.ts';
|
import { insertUser } from '@/db/users.ts';
|
||||||
|
|
||||||
|
import event1 from '~/fixtures/events/event-1.json' assert { type: 'json' };
|
||||||
|
|
||||||
|
import { eventsDB as db } from './events.ts';
|
||||||
|
|
||||||
Deno.test('count filters', async () => {
|
Deno.test('count filters', async () => {
|
||||||
assertEquals(await countFilters([{ kinds: [1] }]), 0);
|
assertEquals(await db.countEvents([{ kinds: [1] }]), 0);
|
||||||
await insertEvent(event55920b75, { user: undefined });
|
await db.storeEvent(event1);
|
||||||
assertEquals(await countFilters([{ kinds: [1] }]), 1);
|
assertEquals(await db.countEvents([{ kinds: [1] }]), 1);
|
||||||
});
|
});
|
||||||
|
|
||||||
Deno.test('insert and filter events', async () => {
|
Deno.test('insert and filter events', async () => {
|
||||||
await insertEvent(event55920b75, { user: undefined });
|
await db.storeEvent(event1);
|
||||||
|
|
||||||
assertEquals(await getFilters([{ kinds: [1] }]), [event55920b75]);
|
assertEquals(await db.getEvents([{ kinds: [1] }]), [event1]);
|
||||||
assertEquals(await getFilters([{ kinds: [3] }]), []);
|
assertEquals(await db.getEvents([{ kinds: [3] }]), []);
|
||||||
assertEquals(await getFilters([{ since: 1691091000 }]), [event55920b75]);
|
assertEquals(await db.getEvents([{ since: 1691091000 }]), [event1]);
|
||||||
assertEquals(await getFilters([{ until: 1691091000 }]), []);
|
assertEquals(await db.getEvents([{ until: 1691091000 }]), []);
|
||||||
assertEquals(
|
assertEquals(
|
||||||
await getFilters([{ '#proxy': ['https://gleasonator.com/objects/8f6fac53-4f66-4c6e-ac7d-92e5e78c3e79'] }]),
|
await db.getEvents([{ '#proxy': ['https://gleasonator.com/objects/8f6fac53-4f66-4c6e-ac7d-92e5e78c3e79'] }]),
|
||||||
[event55920b75],
|
[event1],
|
||||||
);
|
);
|
||||||
});
|
});
|
||||||
|
|
||||||
Deno.test('delete events', async () => {
|
Deno.test('delete events', async () => {
|
||||||
await insertEvent(event55920b75, { user: undefined });
|
await db.storeEvent(event1);
|
||||||
assertEquals(await getFilters([{ kinds: [1] }]), [event55920b75]);
|
assertEquals(await db.getEvents([{ kinds: [1] }]), [event1]);
|
||||||
await deleteFilters([{ kinds: [1] }]);
|
await db.deleteEvents([{ kinds: [1] }]);
|
||||||
assertEquals(await getFilters([{ kinds: [1] }]), []);
|
assertEquals(await db.getEvents([{ kinds: [1] }]), []);
|
||||||
});
|
});
|
||||||
|
|
||||||
Deno.test('query events with local filter', async () => {
|
Deno.test('query events with local filter', async () => {
|
||||||
await insertEvent(event55920b75, { user: undefined });
|
await db.storeEvent(event1);
|
||||||
|
|
||||||
assertEquals(await getFilters([{}]), [event55920b75]);
|
assertEquals(await db.getEvents([{}]), [event1]);
|
||||||
assertEquals(await getFilters([{ local: true }]), []);
|
assertEquals(await db.getEvents([{ local: true }]), []);
|
||||||
assertEquals(await getFilters([{ local: false }]), [event55920b75]);
|
assertEquals(await db.getEvents([{ local: false }]), [event1]);
|
||||||
|
|
||||||
await insertUser({
|
await insertUser({
|
||||||
username: 'alex',
|
username: 'alex',
|
||||||
pubkey: event55920b75.pubkey,
|
pubkey: event1.pubkey,
|
||||||
inserted_at: new Date(),
|
inserted_at: new Date(),
|
||||||
admin: 0,
|
admin: 0,
|
||||||
});
|
});
|
||||||
|
|
||||||
assertEquals(await getFilters([{ local: true }]), [event55920b75]);
|
assertEquals(await db.getEvents([{ local: true }]), [event1]);
|
||||||
assertEquals(await getFilters([{ local: false }]), []);
|
assertEquals(await db.getEvents([{ local: false }]), []);
|
||||||
});
|
});
|
||||||
|
|
|
@ -1,18 +1,17 @@
|
||||||
import { db, type DittoDB } from '@/db.ts';
|
import { db, type DittoDB } from '@/db.ts';
|
||||||
import { Debug, type Event, type SelectQueryBuilder } from '@/deps.ts';
|
import { Debug, type Event, type SelectQueryBuilder } from '@/deps.ts';
|
||||||
|
import { type DittoFilter } from '@/filter.ts';
|
||||||
import { isParameterizedReplaceableKind } from '@/kinds.ts';
|
import { isParameterizedReplaceableKind } from '@/kinds.ts';
|
||||||
import { jsonMetaContentSchema } from '@/schemas/nostr.ts';
|
import { jsonMetaContentSchema } from '@/schemas/nostr.ts';
|
||||||
import { EventData } from '@/types.ts';
|
import { type DittoEvent, EventStore, type GetEventsOpts, type StoreEventOpts } from '@/store.ts';
|
||||||
import { isNostrId, isURL } from '@/utils.ts';
|
import { isNostrId, isURL } from '@/utils.ts';
|
||||||
|
|
||||||
import type { DittoFilter, GetFiltersOpts } from '@/filter.ts';
|
|
||||||
|
|
||||||
const debug = Debug('ditto:db:events');
|
const debug = Debug('ditto:db:events');
|
||||||
|
|
||||||
/** Function to decide whether or not to index a tag. */
|
/** Function to decide whether or not to index a tag. */
|
||||||
type TagCondition = ({ event, count, value }: {
|
type TagCondition = ({ event, count, value }: {
|
||||||
event: Event;
|
event: Event;
|
||||||
data: EventData;
|
opts: StoreEventOpts;
|
||||||
count: number;
|
count: number;
|
||||||
value: string;
|
value: string;
|
||||||
}) => boolean;
|
}) => boolean;
|
||||||
|
@ -21,7 +20,7 @@ type TagCondition = ({ event, count, value }: {
|
||||||
const tagConditions: Record<string, TagCondition> = {
|
const tagConditions: Record<string, TagCondition> = {
|
||||||
'd': ({ event, count }) => count === 0 && isParameterizedReplaceableKind(event.kind),
|
'd': ({ event, count }) => count === 0 && isParameterizedReplaceableKind(event.kind),
|
||||||
'e': ({ count, value }) => count < 15 && isNostrId(value),
|
'e': ({ count, value }) => count < 15 && isNostrId(value),
|
||||||
'media': ({ count, value, data }) => (data.user || count < 4) && isURL(value),
|
'media': ({ count, value, opts }) => (opts.data?.user || count < 4) && isURL(value),
|
||||||
'p': ({ event, count, value }) => (count < 15 || event.kind === 3) && isNostrId(value),
|
'p': ({ event, count, value }) => (count < 15 || event.kind === 3) && isNostrId(value),
|
||||||
'proxy': ({ count, value }) => count === 0 && isURL(value),
|
'proxy': ({ count, value }) => count === 0 && isURL(value),
|
||||||
'q': ({ event, count, value }) => count === 0 && event.kind === 1 && isNostrId(value),
|
'q': ({ event, count, value }) => count === 0 && event.kind === 1 && isNostrId(value),
|
||||||
|
@ -29,8 +28,8 @@ const tagConditions: Record<string, TagCondition> = {
|
||||||
};
|
};
|
||||||
|
|
||||||
/** Insert an event (and its tags) into the database. */
|
/** Insert an event (and its tags) into the database. */
|
||||||
function insertEvent(event: Event, data: EventData): Promise<void> {
|
function storeEvent(event: Event, opts: StoreEventOpts = {}): Promise<void> {
|
||||||
debug('insertEvent', JSON.stringify(event));
|
debug('EVENT', JSON.stringify(event));
|
||||||
|
|
||||||
return db.transaction().execute(async (trx) => {
|
return db.transaction().execute(async (trx) => {
|
||||||
/** Insert the event into the database. */
|
/** Insert the event into the database. */
|
||||||
|
@ -51,7 +50,7 @@ function insertEvent(event: Event, data: EventData): Promise<void> {
|
||||||
|
|
||||||
/** Index event tags depending on the conditions defined above. */
|
/** Index event tags depending on the conditions defined above. */
|
||||||
async function indexTags() {
|
async function indexTags() {
|
||||||
const tags = filterIndexableTags(event, data);
|
const tags = filterIndexableTags(event, opts);
|
||||||
const rows = tags.map(([tag, value]) => ({ event_id: event.id, tag, value }));
|
const rows = tags.map(([tag, value]) => ({ event_id: event.id, tag, value }));
|
||||||
|
|
||||||
if (!tags.length) return;
|
if (!tags.length) return;
|
||||||
|
@ -207,29 +206,20 @@ function getFilterQuery(filter: DittoFilter): EventQuery {
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Combine filter queries into a single union query. */
|
/** Combine filter queries into a single union query. */
|
||||||
function getFiltersQuery(filters: DittoFilter[]) {
|
function getEventsQuery(filters: DittoFilter[]) {
|
||||||
return filters
|
return filters
|
||||||
.map((filter) => db.selectFrom(() => getFilterQuery(filter).as('events')).selectAll())
|
.map((filter) => db.selectFrom(() => getFilterQuery(filter).as('events')).selectAll())
|
||||||
.reduce((result, query) => result.unionAll(query));
|
.reduce((result, query) => result.unionAll(query));
|
||||||
}
|
}
|
||||||
|
|
||||||
type AuthorStats = Omit<DittoDB['author_stats'], 'pubkey'>;
|
|
||||||
type EventStats = Omit<DittoDB['event_stats'], 'event_id'>;
|
|
||||||
|
|
||||||
interface DittoEvent<K extends number = number> extends Event<K> {
|
|
||||||
author?: DittoEvent<0>;
|
|
||||||
author_stats?: AuthorStats;
|
|
||||||
event_stats?: EventStats;
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Get events for filters from the database. */
|
/** Get events for filters from the database. */
|
||||||
async function getFilters<K extends number>(
|
async function getEvents<K extends number>(
|
||||||
filters: DittoFilter<K>[],
|
filters: DittoFilter<K>[],
|
||||||
opts: GetFiltersOpts = {},
|
opts: GetEventsOpts = {},
|
||||||
): Promise<DittoEvent<K>[]> {
|
): Promise<DittoEvent<K>[]> {
|
||||||
if (!filters.length) return Promise.resolve([]);
|
if (!filters.length) return Promise.resolve([]);
|
||||||
debug('REQ', JSON.stringify(filters));
|
debug('REQ', JSON.stringify(filters));
|
||||||
let query = getFiltersQuery(filters);
|
let query = getEventsQuery(filters);
|
||||||
|
|
||||||
if (typeof opts.limit === 'number') {
|
if (typeof opts.limit === 'number') {
|
||||||
query = query.limit(opts.limit);
|
query = query.limit(opts.limit);
|
||||||
|
@ -279,12 +269,12 @@ async function getFilters<K extends number>(
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Delete events based on filters from the database. */
|
/** Delete events based on filters from the database. */
|
||||||
function deleteFilters<K extends number>(filters: DittoFilter<K>[]) {
|
async function deleteEvents<K extends number>(filters: DittoFilter<K>[]): Promise<void> {
|
||||||
if (!filters.length) return Promise.resolve([]);
|
if (!filters.length) return Promise.resolve();
|
||||||
debug('deleteFilters', JSON.stringify(filters));
|
debug('DELETE', JSON.stringify(filters));
|
||||||
|
|
||||||
return db.transaction().execute(async (trx) => {
|
await db.transaction().execute(async (trx) => {
|
||||||
const query = getFiltersQuery(filters).clearSelect().select('id');
|
const query = getEventsQuery(filters).clearSelect().select('id');
|
||||||
|
|
||||||
await trx.deleteFrom('events_fts')
|
await trx.deleteFrom('events_fts')
|
||||||
.where('id', 'in', () => query)
|
.where('id', 'in', () => query)
|
||||||
|
@ -297,10 +287,10 @@ function deleteFilters<K extends number>(filters: DittoFilter<K>[]) {
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Get number of events that would be returned by filters. */
|
/** Get number of events that would be returned by filters. */
|
||||||
async function countFilters<K extends number>(filters: DittoFilter<K>[]): Promise<number> {
|
async function countEvents<K extends number>(filters: DittoFilter<K>[]): Promise<number> {
|
||||||
if (!filters.length) return Promise.resolve(0);
|
if (!filters.length) return Promise.resolve(0);
|
||||||
debug('countFilters', JSON.stringify(filters));
|
debug('COUNT', JSON.stringify(filters));
|
||||||
const query = getFiltersQuery(filters);
|
const query = getEventsQuery(filters);
|
||||||
|
|
||||||
const [{ count }] = await query
|
const [{ count }] = await query
|
||||||
.clearSelect()
|
.clearSelect()
|
||||||
|
@ -311,7 +301,7 @@ async function countFilters<K extends number>(filters: DittoFilter<K>[]): Promis
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Return only the tags that should be indexed. */
|
/** Return only the tags that should be indexed. */
|
||||||
function filterIndexableTags(event: Event, data: EventData): string[][] {
|
function filterIndexableTags(event: Event, opts: StoreEventOpts): string[][] {
|
||||||
const tagCounts: Record<string, number> = {};
|
const tagCounts: Record<string, number> = {};
|
||||||
|
|
||||||
function getCount(name: string) {
|
function getCount(name: string) {
|
||||||
|
@ -325,7 +315,7 @@ function filterIndexableTags(event: Event, data: EventData): string[][] {
|
||||||
function checkCondition(name: string, value: string, condition: TagCondition) {
|
function checkCondition(name: string, value: string, condition: TagCondition) {
|
||||||
return condition({
|
return condition({
|
||||||
event,
|
event,
|
||||||
data,
|
opts,
|
||||||
count: getCount(name),
|
count: getCount(name),
|
||||||
value,
|
value,
|
||||||
});
|
});
|
||||||
|
@ -362,4 +352,12 @@ function buildUserSearchContent(event: Event<0>): string {
|
||||||
return [name, nip05, about].filter(Boolean).join('\n');
|
return [name, nip05, about].filter(Boolean).join('\n');
|
||||||
}
|
}
|
||||||
|
|
||||||
export { countFilters, deleteFilters, type DittoEvent, getFilters, insertEvent };
|
/** SQLite database storage adapter for Nostr events. */
|
||||||
|
const eventsDB: EventStore = {
|
||||||
|
getEvents,
|
||||||
|
storeEvent,
|
||||||
|
countEvents,
|
||||||
|
deleteEvents,
|
||||||
|
};
|
||||||
|
|
||||||
|
export { eventsDB };
|
||||||
|
|
|
@ -5,14 +5,12 @@ import event1 from '~/fixtures/events/event-1.json' assert { type: 'json' };
|
||||||
import { memorelay } from './memorelay.ts';
|
import { memorelay } from './memorelay.ts';
|
||||||
|
|
||||||
Deno.test('memorelay', async () => {
|
Deno.test('memorelay', async () => {
|
||||||
assertEquals(memorelay.hasEvent(event1), false);
|
assertEquals(await memorelay.countEvents([{ ids: [event1.id] }]), 0);
|
||||||
assertEquals(memorelay.hasEventById(event1.id), false);
|
|
||||||
|
|
||||||
memorelay.insertEvent(event1);
|
await memorelay.storeEvent(event1);
|
||||||
|
|
||||||
assertEquals(memorelay.hasEvent(event1), true);
|
assertEquals(await memorelay.countEvents([{ ids: [event1.id] }]), 1);
|
||||||
assertEquals(memorelay.hasEventById(event1.id), true);
|
|
||||||
|
|
||||||
const result = await memorelay.getFilters([{ ids: [event1.id] }]);
|
const result = await memorelay.getEvents([{ ids: [event1.id] }]);
|
||||||
assertEquals(result[0], event1);
|
assertEquals(result[0], event1);
|
||||||
});
|
});
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
import { Debug, type Event, type Filter, LRUCache } from '@/deps.ts';
|
import { Debug, type Event, type Filter, LRUCache } from '@/deps.ts';
|
||||||
import { getFilterId, type GetFiltersOpts, getMicroFilters, isMicrofilter } from '@/filter.ts';
|
import { getFilterId, getMicroFilters, isMicrofilter } from '@/filter.ts';
|
||||||
|
import { type EventStore, type GetEventsOpts } from '@/store.ts';
|
||||||
|
|
||||||
const debug = Debug('ditto:memorelay');
|
const debug = Debug('ditto:memorelay');
|
||||||
|
|
||||||
|
@ -10,7 +11,7 @@ const events = new LRUCache<string, Event>({
|
||||||
});
|
});
|
||||||
|
|
||||||
/** Get events from memory. */
|
/** Get events from memory. */
|
||||||
function getFilters<K extends number>(filters: Filter<K>[], opts: GetFiltersOpts = {}): Promise<Event<K>[]> {
|
function getEvents<K extends number>(filters: Filter<K>[], opts: GetEventsOpts = {}): Promise<Event<K>[]> {
|
||||||
if (opts.signal?.aborted) return Promise.resolve([]);
|
if (opts.signal?.aborted) return Promise.resolve([]);
|
||||||
if (!filters.length) return Promise.resolve([]);
|
if (!filters.length) return Promise.resolve([]);
|
||||||
debug('REQ', JSON.stringify(filters));
|
debug('REQ', JSON.stringify(filters));
|
||||||
|
@ -30,7 +31,7 @@ function getFilters<K extends number>(filters: Filter<K>[], opts: GetFiltersOpts
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Insert an event into memory. */
|
/** Insert an event into memory. */
|
||||||
function insertEvent(event: Event): void {
|
function storeEvent(event: Event): Promise<void> {
|
||||||
for (const microfilter of getMicroFilters(event)) {
|
for (const microfilter of getMicroFilters(event)) {
|
||||||
const filterId = getFilterId(microfilter);
|
const filterId = getFilterId(microfilter);
|
||||||
const existing = events.get(filterId);
|
const existing = events.get(filterId);
|
||||||
|
@ -38,32 +39,31 @@ function insertEvent(event: Event): void {
|
||||||
events.set(filterId, event);
|
events.set(filterId, event);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return Promise.resolve();
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Check if an event is in memory. */
|
/** Count events in memory for the filters. */
|
||||||
function hasEvent(event: Event): boolean {
|
async function countEvents(filters: Filter[]): Promise<number> {
|
||||||
for (const microfilter of getMicroFilters(event)) {
|
const events = await getEvents(filters);
|
||||||
const filterId = getFilterId(microfilter);
|
return events.length;
|
||||||
const existing = events.get(filterId);
|
|
||||||
if (existing) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return false;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Check if an event is in memory by ID. */
|
/** Delete events from memory. */
|
||||||
function hasEventById(eventId: string): boolean {
|
function deleteEvents(filters: Filter[]): Promise<void> {
|
||||||
const filterId = getFilterId({ ids: [eventId] });
|
for (const filter of filters) {
|
||||||
return events.has(filterId);
|
if (isMicrofilter(filter)) {
|
||||||
|
events.delete(getFilterId(filter));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return Promise.resolve();
|
||||||
}
|
}
|
||||||
|
|
||||||
/** In-memory data store for events using microfilters. */
|
/** In-memory data store for events using microfilters. */
|
||||||
const memorelay = {
|
const memorelay: EventStore = {
|
||||||
getFilters,
|
getEvents,
|
||||||
insertEvent,
|
storeEvent,
|
||||||
hasEvent,
|
countEvents,
|
||||||
hasEventById,
|
deleteEvents,
|
||||||
};
|
};
|
||||||
|
|
||||||
export { memorelay };
|
export { memorelay };
|
||||||
|
|
|
@ -21,16 +21,6 @@ type AuthorMicrofilter = { kinds: [0]; authors: [Event['pubkey']] };
|
||||||
/** Filter to get one specific event. */
|
/** Filter to get one specific event. */
|
||||||
type MicroFilter = IdMicrofilter | AuthorMicrofilter;
|
type MicroFilter = IdMicrofilter | AuthorMicrofilter;
|
||||||
|
|
||||||
/** Additional options to apply to the whole subscription. */
|
|
||||||
interface GetFiltersOpts {
|
|
||||||
/** Signal to abort the request. */
|
|
||||||
signal?: AbortSignal;
|
|
||||||
/** Event limit for the whole subscription. */
|
|
||||||
limit?: number;
|
|
||||||
/** Relays to use, if applicable. */
|
|
||||||
relays?: WebSocket['url'][];
|
|
||||||
}
|
|
||||||
|
|
||||||
function matchDittoFilter(filter: DittoFilter, event: Event, data: EventData): boolean {
|
function matchDittoFilter(filter: DittoFilter, event: Event, data: EventData): boolean {
|
||||||
if (filter.local && !(data.user || event.pubkey === Conf.pubkey)) {
|
if (filter.local && !(data.user || event.pubkey === Conf.pubkey)) {
|
||||||
return false;
|
return false;
|
||||||
|
@ -97,7 +87,6 @@ export {
|
||||||
type DittoFilter,
|
type DittoFilter,
|
||||||
eventToMicroFilter,
|
eventToMicroFilter,
|
||||||
getFilterId,
|
getFilterId,
|
||||||
type GetFiltersOpts,
|
|
||||||
getMicroFilters,
|
getMicroFilters,
|
||||||
type IdMicrofilter,
|
type IdMicrofilter,
|
||||||
isMicrofilter,
|
isMicrofilter,
|
||||||
|
|
|
@ -1,12 +1,12 @@
|
||||||
|
import { client } from '@/client.ts';
|
||||||
import { Conf } from '@/config.ts';
|
import { Conf } from '@/config.ts';
|
||||||
import * as eventsDB from '@/db/events.ts';
|
import { eventsDB } from '@/db/events.ts';
|
||||||
import { memorelay } from '@/db/memorelay.ts';
|
import { memorelay } from '@/db/memorelay.ts';
|
||||||
import { addRelays } from '@/db/relays.ts';
|
import { addRelays } from '@/db/relays.ts';
|
||||||
import { deleteAttachedMedia } from '@/db/unattached-media.ts';
|
import { deleteAttachedMedia } from '@/db/unattached-media.ts';
|
||||||
import { findUser } from '@/db/users.ts';
|
import { findUser } from '@/db/users.ts';
|
||||||
import { Debug, type Event } from '@/deps.ts';
|
import { Debug, type Event } from '@/deps.ts';
|
||||||
import { isEphemeralKind } from '@/kinds.ts';
|
import { isEphemeralKind } from '@/kinds.ts';
|
||||||
import { publish } from '@/pool.ts';
|
|
||||||
import { isLocallyFollowed } from '@/queries.ts';
|
import { isLocallyFollowed } from '@/queries.ts';
|
||||||
import { reqmeister } from '@/reqmeister.ts';
|
import { reqmeister } from '@/reqmeister.ts';
|
||||||
import { updateStats } from '@/stats.ts';
|
import { updateStats } from '@/stats.ts';
|
||||||
|
@ -26,7 +26,7 @@ const debug = Debug('ditto:pipeline');
|
||||||
async function handleEvent(event: Event): Promise<void> {
|
async function handleEvent(event: Event): Promise<void> {
|
||||||
if (!(await verifySignatureWorker(event))) return;
|
if (!(await verifySignatureWorker(event))) return;
|
||||||
const wanted = reqmeister.isWanted(event);
|
const wanted = reqmeister.isWanted(event);
|
||||||
if (encounterEvent(event)) return;
|
if (await encounterEvent(event)) return;
|
||||||
debug(`Event<${event.kind}> ${event.id}`);
|
debug(`Event<${event.kind}> ${event.id}`);
|
||||||
const data = await getEventData(event);
|
const data = await getEventData(event);
|
||||||
|
|
||||||
|
@ -43,9 +43,9 @@ async function handleEvent(event: Event): Promise<void> {
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Encounter the event, and return whether it has already been encountered. */
|
/** Encounter the event, and return whether it has already been encountered. */
|
||||||
function encounterEvent(event: Event): boolean {
|
async function encounterEvent(event: Event): Promise<boolean> {
|
||||||
const preexisting = memorelay.hasEvent(event);
|
const preexisting = (await memorelay.countEvents([{ ids: [event.id] }])) > 0;
|
||||||
memorelay.insertEvent(event);
|
memorelay.storeEvent(event);
|
||||||
reqmeister.encounter(event);
|
reqmeister.encounter(event);
|
||||||
return preexisting;
|
return preexisting;
|
||||||
}
|
}
|
||||||
|
@ -69,7 +69,7 @@ async function storeEvent(event: Event, data: EventData, opts: StoreEventOpts =
|
||||||
const { force = false } = opts;
|
const { force = false } = opts;
|
||||||
|
|
||||||
if (force || data.user || isAdminEvent(event) || await isLocallyFollowed(event.pubkey)) {
|
if (force || data.user || isAdminEvent(event) || await isLocallyFollowed(event.pubkey)) {
|
||||||
const [deletion] = await eventsDB.getFilters(
|
const [deletion] = await eventsDB.getEvents(
|
||||||
[{ kinds: [5], authors: [event.pubkey], '#e': [event.id], limit: 1 }],
|
[{ kinds: [5], authors: [event.pubkey], '#e': [event.id], limit: 1 }],
|
||||||
{ limit: 1, signal: AbortSignal.timeout(Time.seconds(1)) },
|
{ limit: 1, signal: AbortSignal.timeout(Time.seconds(1)) },
|
||||||
);
|
);
|
||||||
|
@ -78,7 +78,7 @@ async function storeEvent(event: Event, data: EventData, opts: StoreEventOpts =
|
||||||
return Promise.reject(new RelayError('blocked', 'event was deleted'));
|
return Promise.reject(new RelayError('blocked', 'event was deleted'));
|
||||||
} else {
|
} else {
|
||||||
await Promise.all([
|
await Promise.all([
|
||||||
eventsDB.insertEvent(event, data).catch(debug),
|
eventsDB.storeEvent(event, { data }).catch(debug),
|
||||||
updateStats(event).catch(debug),
|
updateStats(event).catch(debug),
|
||||||
]);
|
]);
|
||||||
}
|
}
|
||||||
|
@ -91,13 +91,13 @@ async function storeEvent(event: Event, data: EventData, opts: StoreEventOpts =
|
||||||
async function processDeletions(event: Event): Promise<void> {
|
async function processDeletions(event: Event): Promise<void> {
|
||||||
if (event.kind === 5) {
|
if (event.kind === 5) {
|
||||||
const ids = getTagSet(event.tags, 'e');
|
const ids = getTagSet(event.tags, 'e');
|
||||||
const events = await eventsDB.getFilters([{ ids: [...ids] }]);
|
const events = await eventsDB.getEvents([{ ids: [...ids] }]);
|
||||||
|
|
||||||
const deleteIds = events
|
const deleteIds = events
|
||||||
.filter(({ pubkey, id }) => pubkey === event.pubkey && ids.has(id))
|
.filter(({ pubkey, id }) => pubkey === event.pubkey && ids.has(id))
|
||||||
.map((event) => event.id);
|
.map((event) => event.id);
|
||||||
|
|
||||||
await eventsDB.deleteFilters([{ ids: deleteIds }]);
|
await eventsDB.deleteEvents([{ ids: deleteIds }]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -142,7 +142,7 @@ function fetchRelatedEvents(event: Event, data: EventData) {
|
||||||
reqmeister.req({ kinds: [0], authors: [event.pubkey] }).catch(() => {});
|
reqmeister.req({ kinds: [0], authors: [event.pubkey] }).catch(() => {});
|
||||||
}
|
}
|
||||||
for (const [name, id, relay] of event.tags) {
|
for (const [name, id, relay] of event.tags) {
|
||||||
if (name === 'e' && !memorelay.hasEventById(id)) {
|
if (name === 'e' && !memorelay.countEvents([{ ids: [id] }])) {
|
||||||
reqmeister.req({ ids: [id] }, { relays: [relay] }).catch(() => {});
|
reqmeister.req({ ids: [id] }, { relays: [relay] }).catch(() => {});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -176,7 +176,7 @@ function broadcast(event: Event, data: EventData) {
|
||||||
if (!data.user || !isFresh(event)) return;
|
if (!data.user || !isFresh(event)) return;
|
||||||
|
|
||||||
if (event.kind === 5) {
|
if (event.kind === 5) {
|
||||||
publish(event);
|
client.storeEvent(event);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
12
src/pool.ts
12
src/pool.ts
|
@ -1,7 +1,5 @@
|
||||||
import { getActiveRelays } from '@/db/relays.ts';
|
import { getActiveRelays } from '@/db/relays.ts';
|
||||||
import { Debug, type Event, RelayPoolWorker } from '@/deps.ts';
|
import { RelayPoolWorker } from '@/deps.ts';
|
||||||
|
|
||||||
const debug = Debug('ditto:pool');
|
|
||||||
|
|
||||||
const activeRelays = await getActiveRelays();
|
const activeRelays = await getActiveRelays();
|
||||||
|
|
||||||
|
@ -17,10 +15,4 @@ const pool = new RelayPoolWorker(worker, activeRelays, {
|
||||||
logErrorsAndNotices: false,
|
logErrorsAndNotices: false,
|
||||||
});
|
});
|
||||||
|
|
||||||
/** Publish an event to the given relays, or the entire pool. */
|
export { activeRelays, pool };
|
||||||
function publish(event: Event, relays: string[] = activeRelays) {
|
|
||||||
debug('publish', event);
|
|
||||||
return pool.publish(event, relays);
|
|
||||||
}
|
|
||||||
|
|
||||||
export { activeRelays, pool, publish };
|
|
||||||
|
|
|
@ -1,8 +1,9 @@
|
||||||
import * as eventsDB from '@/db/events.ts';
|
import { eventsDB } from '@/db/events.ts';
|
||||||
|
import { memorelay } from '@/db/memorelay.ts';
|
||||||
import { type Event, findReplyTag } from '@/deps.ts';
|
import { type Event, findReplyTag } from '@/deps.ts';
|
||||||
import { type AuthorMicrofilter, type DittoFilter, type IdMicrofilter, type Relation } from '@/filter.ts';
|
import { type AuthorMicrofilter, type DittoFilter, type IdMicrofilter, type Relation } from '@/filter.ts';
|
||||||
import { reqmeister } from '@/reqmeister.ts';
|
import { reqmeister } from '@/reqmeister.ts';
|
||||||
import { memorelay } from '@/db/memorelay.ts';
|
import { type DittoEvent } from '@/store.ts';
|
||||||
|
|
||||||
interface GetEventOpts<K extends number> {
|
interface GetEventOpts<K extends number> {
|
||||||
/** Signal to abort the request. */
|
/** Signal to abort the request. */
|
||||||
|
@ -21,7 +22,7 @@ const getEvent = async <K extends number = number>(
|
||||||
const { kind, relations, signal = AbortSignal.timeout(1000) } = opts;
|
const { kind, relations, signal = AbortSignal.timeout(1000) } = opts;
|
||||||
const microfilter: IdMicrofilter = { ids: [id] };
|
const microfilter: IdMicrofilter = { ids: [id] };
|
||||||
|
|
||||||
const [memoryEvent] = await memorelay.getFilters([microfilter], opts) as eventsDB.DittoEvent<K>[];
|
const [memoryEvent] = await memorelay.getEvents([microfilter], opts) as DittoEvent<K>[];
|
||||||
|
|
||||||
if (memoryEvent && !relations) {
|
if (memoryEvent && !relations) {
|
||||||
return memoryEvent;
|
return memoryEvent;
|
||||||
|
@ -32,20 +33,20 @@ const getEvent = async <K extends number = number>(
|
||||||
filter.kinds = [kind];
|
filter.kinds = [kind];
|
||||||
}
|
}
|
||||||
|
|
||||||
const dbEvent = await eventsDB.getFilters([filter], { limit: 1, signal })
|
const dbEvent = await eventsDB.getEvents([filter], { limit: 1, signal })
|
||||||
.then(([event]) => event);
|
.then(([event]) => event);
|
||||||
|
|
||||||
// TODO: make this DRY-er.
|
// TODO: make this DRY-er.
|
||||||
|
|
||||||
if (dbEvent && !dbEvent.author) {
|
if (dbEvent && !dbEvent.author) {
|
||||||
const [author] = await memorelay.getFilters([{ kinds: [0], authors: [dbEvent.pubkey] }], opts);
|
const [author] = await memorelay.getEvents([{ kinds: [0], authors: [dbEvent.pubkey] }], opts);
|
||||||
dbEvent.author = author;
|
dbEvent.author = author;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (dbEvent) return dbEvent;
|
if (dbEvent) return dbEvent;
|
||||||
|
|
||||||
if (memoryEvent && !memoryEvent.author) {
|
if (memoryEvent && !memoryEvent.author) {
|
||||||
const [author] = await memorelay.getFilters([{ kinds: [0], authors: [memoryEvent.pubkey] }], opts);
|
const [author] = await memorelay.getEvents([{ kinds: [0], authors: [memoryEvent.pubkey] }], opts);
|
||||||
memoryEvent.author = author;
|
memoryEvent.author = author;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -59,13 +60,13 @@ const getAuthor = async (pubkey: string, opts: GetEventOpts<0> = {}): Promise<Ev
|
||||||
const { relations, signal = AbortSignal.timeout(1000) } = opts;
|
const { relations, signal = AbortSignal.timeout(1000) } = opts;
|
||||||
const microfilter: AuthorMicrofilter = { kinds: [0], authors: [pubkey] };
|
const microfilter: AuthorMicrofilter = { kinds: [0], authors: [pubkey] };
|
||||||
|
|
||||||
const [memoryEvent] = await memorelay.getFilters([microfilter], opts);
|
const [memoryEvent] = await memorelay.getEvents([microfilter], opts);
|
||||||
|
|
||||||
if (memoryEvent && !relations) {
|
if (memoryEvent && !relations) {
|
||||||
return memoryEvent;
|
return memoryEvent;
|
||||||
}
|
}
|
||||||
|
|
||||||
const dbEvent = await eventsDB.getFilters(
|
const dbEvent = await eventsDB.getEvents(
|
||||||
[{ authors: [pubkey], relations, kinds: [0], limit: 1 }],
|
[{ authors: [pubkey], relations, kinds: [0], limit: 1 }],
|
||||||
{ limit: 1, signal },
|
{ limit: 1, signal },
|
||||||
).then(([event]) => event);
|
).then(([event]) => event);
|
||||||
|
@ -78,7 +79,7 @@ const getAuthor = async (pubkey: string, opts: GetEventOpts<0> = {}): Promise<Ev
|
||||||
|
|
||||||
/** Get users the given pubkey follows. */
|
/** Get users the given pubkey follows. */
|
||||||
const getFollows = async (pubkey: string, signal = AbortSignal.timeout(1000)): Promise<Event<3> | undefined> => {
|
const getFollows = async (pubkey: string, signal = AbortSignal.timeout(1000)): Promise<Event<3> | undefined> => {
|
||||||
const [event] = await eventsDB.getFilters([{ authors: [pubkey], kinds: [3], limit: 1 }], { limit: 1, signal });
|
const [event] = await eventsDB.getEvents([{ authors: [pubkey], kinds: [3], limit: 1 }], { limit: 1, signal });
|
||||||
return event;
|
return event;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -117,7 +118,7 @@ async function getAncestors(event: Event<1>, result = [] as Event<1>[]): Promise
|
||||||
}
|
}
|
||||||
|
|
||||||
function getDescendants(eventId: string, signal = AbortSignal.timeout(2000)): Promise<Event<1>[]> {
|
function getDescendants(eventId: string, signal = AbortSignal.timeout(2000)): Promise<Event<1>[]> {
|
||||||
return eventsDB.getFilters(
|
return eventsDB.getEvents(
|
||||||
[{ kinds: [1], '#e': [eventId], relations: ['author', 'event_stats', 'author_stats'] }],
|
[{ kinds: [1], '#e': [eventId], relations: ['author', 'event_stats', 'author_stats'] }],
|
||||||
{ limit: 200, signal },
|
{ limit: 200, signal },
|
||||||
);
|
);
|
||||||
|
@ -125,7 +126,7 @@ function getDescendants(eventId: string, signal = AbortSignal.timeout(2000)): Pr
|
||||||
|
|
||||||
/** Returns whether the pubkey is followed by a local user. */
|
/** Returns whether the pubkey is followed by a local user. */
|
||||||
async function isLocallyFollowed(pubkey: string): Promise<boolean> {
|
async function isLocallyFollowed(pubkey: string): Promise<boolean> {
|
||||||
const [event] = await eventsDB.getFilters([{ kinds: [3], '#p': [pubkey], local: true, limit: 1 }], { limit: 1 });
|
const [event] = await eventsDB.getEvents([{ kinds: [3], '#p': [pubkey], local: true, limit: 1 }], { limit: 1 });
|
||||||
return Boolean(event);
|
return Boolean(event);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
import * as client from '@/client.ts';
|
import { client } from '@/client.ts';
|
||||||
import { Debug, type Event, EventEmitter, type Filter } from '@/deps.ts';
|
import { Debug, type Event, EventEmitter, type Filter } from '@/deps.ts';
|
||||||
import { AuthorMicrofilter, eventToMicroFilter, getFilterId, IdMicrofilter, type MicroFilter } from '@/filter.ts';
|
import { AuthorMicrofilter, eventToMicroFilter, getFilterId, IdMicrofilter, type MicroFilter } from '@/filter.ts';
|
||||||
import { Time } from '@/utils/time.ts';
|
import { Time } from '@/utils/time.ts';
|
||||||
|
@ -64,7 +64,7 @@ class Reqmeister extends EventEmitter<{ [filterId: string]: (event: Event) => an
|
||||||
|
|
||||||
if (filters.length) {
|
if (filters.length) {
|
||||||
debug('REQ', JSON.stringify(filters));
|
debug('REQ', JSON.stringify(filters));
|
||||||
const events = await client.getFilters(filters, { signal: AbortSignal.timeout(timeout) });
|
const events = await client.getEvents(filters, { signal: AbortSignal.timeout(timeout) });
|
||||||
|
|
||||||
for (const event of events) {
|
for (const event of events) {
|
||||||
this.encounter(event);
|
this.encounter(event);
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
import { type AuthorStatsRow, db, type DittoDB, type EventStatsRow } from '@/db.ts';
|
import { type AuthorStatsRow, db, type DittoDB, type EventStatsRow } from '@/db.ts';
|
||||||
import * as eventsDB from '@/db/events.ts';
|
import { eventsDB } from '@/db/events.ts';
|
||||||
import { Debug, type Event, findReplyTag, type InsertQueryBuilder } from '@/deps.ts';
|
import { Debug, type Event, findReplyTag, type InsertQueryBuilder } from '@/deps.ts';
|
||||||
|
|
||||||
type AuthorStat = keyof Omit<AuthorStatsRow, 'pubkey'>;
|
type AuthorStat = keyof Omit<AuthorStatsRow, 'pubkey'>;
|
||||||
|
@ -125,7 +125,7 @@ function eventStatsQuery(diffs: EventStatDiff[]) {
|
||||||
|
|
||||||
/** Get the last version of the event, if any. */
|
/** Get the last version of the event, if any. */
|
||||||
async function maybeGetPrev<K extends number>(event: Event<K>): Promise<Event<K>> {
|
async function maybeGetPrev<K extends number>(event: Event<K>): Promise<Event<K>> {
|
||||||
const [prev] = await eventsDB.getFilters([
|
const [prev] = await eventsDB.getEvents([
|
||||||
{ kinds: [event.kind], authors: [event.pubkey], limit: 1 },
|
{ kinds: [event.kind], authors: [event.pubkey], limit: 1 },
|
||||||
]);
|
]);
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,46 @@
|
||||||
|
import { type DittoDB } from '@/db.ts';
|
||||||
|
import { type Event } from '@/deps.ts';
|
||||||
|
import { type DittoFilter } from '@/filter.ts';
|
||||||
|
import { type EventData } from '@/types.ts';
|
||||||
|
|
||||||
|
/** Additional options to apply to the whole subscription. */
|
||||||
|
interface GetEventsOpts {
|
||||||
|
/** Signal to abort the request. */
|
||||||
|
signal?: AbortSignal;
|
||||||
|
/** Event limit for the whole subscription. */
|
||||||
|
limit?: number;
|
||||||
|
/** Relays to use, if applicable. */
|
||||||
|
relays?: WebSocket['url'][];
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Options when storing an event. */
|
||||||
|
interface StoreEventOpts {
|
||||||
|
/** Event data to store. */
|
||||||
|
data?: EventData;
|
||||||
|
/** Relays to use, if applicable. */
|
||||||
|
relays?: WebSocket['url'][];
|
||||||
|
}
|
||||||
|
|
||||||
|
type AuthorStats = Omit<DittoDB['author_stats'], 'pubkey'>;
|
||||||
|
type EventStats = Omit<DittoDB['event_stats'], 'event_id'>;
|
||||||
|
|
||||||
|
/** Internal Event representation used by Ditto, including extra keys. */
|
||||||
|
interface DittoEvent<K extends number = number> extends Event<K> {
|
||||||
|
author?: DittoEvent<0>;
|
||||||
|
author_stats?: AuthorStats;
|
||||||
|
event_stats?: EventStats;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Storage interface for Nostr events. */
|
||||||
|
interface EventStore {
|
||||||
|
/** Add an event to the store. */
|
||||||
|
storeEvent(event: Event, opts?: StoreEventOpts): Promise<void>;
|
||||||
|
/** Get events from filters. */
|
||||||
|
getEvents<K extends number>(filters: DittoFilter<K>[], opts?: GetEventsOpts): Promise<DittoEvent<K>[]>;
|
||||||
|
/** Get the number of events from filters. */
|
||||||
|
countEvents<K extends number>(filters: DittoFilter<K>[]): Promise<number>;
|
||||||
|
/** Delete events from filters. */
|
||||||
|
deleteEvents<K extends number>(filters: DittoFilter<K>[]): Promise<void>;
|
||||||
|
}
|
||||||
|
|
||||||
|
export type { DittoEvent, EventStore, GetEventsOpts, StoreEventOpts };
|
|
@ -1,5 +1,5 @@
|
||||||
import { AppContext } from '@/app.ts';
|
import { AppContext } from '@/app.ts';
|
||||||
import * as eventsDB from '@/db/events.ts';
|
import { eventsDB } from '@/db/events.ts';
|
||||||
import { type Filter } from '@/deps.ts';
|
import { type Filter } from '@/deps.ts';
|
||||||
import { getAuthor } from '@/queries.ts';
|
import { getAuthor } from '@/queries.ts';
|
||||||
import { renderAccount } from '@/views/mastodon/accounts.ts';
|
import { renderAccount } from '@/views/mastodon/accounts.ts';
|
||||||
|
@ -7,7 +7,7 @@ import { paginated } from '@/utils/web.ts';
|
||||||
|
|
||||||
/** Render account objects for the author of each event. */
|
/** Render account objects for the author of each event. */
|
||||||
async function renderEventAccounts(c: AppContext, filters: Filter[]) {
|
async function renderEventAccounts(c: AppContext, filters: Filter[]) {
|
||||||
const events = await eventsDB.getFilters(filters);
|
const events = await eventsDB.getEvents(filters);
|
||||||
const pubkeys = new Set(events.map(({ pubkey }) => pubkey));
|
const pubkeys = new Set(events.map(({ pubkey }) => pubkey));
|
||||||
|
|
||||||
if (!pubkeys.size) {
|
if (!pubkeys.size) {
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
import { isCWTag } from 'https://gitlab.com/soapbox-pub/mostr/-/raw/c67064aee5ade5e01597c6d23e22e53c628ef0e2/src/nostr/tags.ts';
|
import { isCWTag } from 'https://gitlab.com/soapbox-pub/mostr/-/raw/c67064aee5ade5e01597c6d23e22e53c628ef0e2/src/nostr/tags.ts';
|
||||||
|
|
||||||
import { Conf } from '@/config.ts';
|
import { Conf } from '@/config.ts';
|
||||||
import * as eventsDB from '@/db/events.ts';
|
import { eventsDB } from '@/db/events.ts';
|
||||||
import { findReplyTag, nip19 } from '@/deps.ts';
|
import { findReplyTag, nip19 } from '@/deps.ts';
|
||||||
import { getMediaLinks, parseNoteContent } from '@/note.ts';
|
import { getMediaLinks, parseNoteContent } from '@/note.ts';
|
||||||
import { getAuthor } from '@/queries.ts';
|
import { getAuthor } from '@/queries.ts';
|
||||||
|
@ -33,12 +33,8 @@ async function renderStatus(event: eventsDB.DittoEvent<1>, viewerPubkey?: string
|
||||||
.all([
|
.all([
|
||||||
Promise.all(mentionedPubkeys.map(toMention)),
|
Promise.all(mentionedPubkeys.map(toMention)),
|
||||||
firstUrl ? unfurlCardCached(firstUrl) : null,
|
firstUrl ? unfurlCardCached(firstUrl) : null,
|
||||||
viewerPubkey
|
viewerPubkey ? eventsDB.getEvents([{ kinds: [6], '#e': [event.id], authors: [viewerPubkey] }], { limit: 1 }) : [],
|
||||||
? eventsDB.getFilters([{ kinds: [6], '#e': [event.id], authors: [viewerPubkey] }], { limit: 1 })
|
viewerPubkey ? eventsDB.getEvents([{ kinds: [7], '#e': [event.id], authors: [viewerPubkey] }], { limit: 1 }) : [],
|
||||||
: [],
|
|
||||||
viewerPubkey
|
|
||||||
? eventsDB.getFilters([{ kinds: [7], '#e': [event.id], authors: [viewerPubkey] }], { limit: 1 })
|
|
||||||
: [],
|
|
||||||
]);
|
]);
|
||||||
|
|
||||||
const content = buildInlineRecipients(mentions) + html;
|
const content = buildInlineRecipients(mentions) + html;
|
||||||
|
|
Loading…
Reference in New Issue