Use a Collection for outbox
This commit is contained in:
parent
ce7dd8e1cd
commit
468f1486c0
|
@ -1,4 +1,4 @@
|
|||
const { messageBodyToHtml, selfUrl, makeUserUrl } = require('./util');
|
||||
const { messageBodyToHtml, selfUrl } = require('./util');
|
||||
const { ActivityStreamsContext, WellKnownActivityTypes } = require('./const');
|
||||
const ActivityPubObject = require('./object');
|
||||
const User = require('../user');
|
||||
|
@ -7,7 +7,6 @@ const { Errors } = require('../enig_error');
|
|||
const { getISOTimestampString } = require('../database');
|
||||
const UserProps = require('../user_property');
|
||||
const { postJson } = require('../http_util');
|
||||
const { getOutboxEntries } = require('./db');
|
||||
const { WellKnownLocations } = require('../servers/content/web');
|
||||
|
||||
// deps
|
||||
|
@ -113,38 +112,6 @@ module.exports = class Activity extends ActivityPubObject {
|
|||
);
|
||||
}
|
||||
|
||||
// :TODO: move to Collection
|
||||
static fromOutboxEntries(owningUser, webServer, cb) {
|
||||
// :TODO: support paging
|
||||
const getOpts = {
|
||||
create: true, // items marked 'Create'
|
||||
};
|
||||
getOutboxEntries(owningUser, getOpts, (err, entries) => {
|
||||
if (err) {
|
||||
return cb(err);
|
||||
}
|
||||
|
||||
const obj = {
|
||||
'@context': ActivityStreamsContext,
|
||||
// :TODO: makeOutboxUrl() and use elsewhere also
|
||||
id: makeUserUrl(webServer, owningUser, '/ap/users') + '/outbox',
|
||||
type: 'OrderedCollection',
|
||||
totalItems: entries.length,
|
||||
orderedItems: entries.map(e => {
|
||||
return {
|
||||
'@context': ActivityStreamsContext,
|
||||
id: e.activity.id,
|
||||
type: 'Create',
|
||||
actor: e.activity.actor,
|
||||
object: e.activity.object,
|
||||
};
|
||||
}),
|
||||
};
|
||||
|
||||
return cb(null, new Activity(obj));
|
||||
});
|
||||
}
|
||||
|
||||
sendTo(actorUrl, fromUser, webServer, cb) {
|
||||
const privateKey = fromUser.getProperty(UserProps.PrivateActivityPubSigningKey);
|
||||
if (_.isEmpty(privateKey)) {
|
||||
|
|
|
@ -33,16 +33,12 @@ module.exports = class Actor extends ActivityPubObject {
|
|||
return false;
|
||||
}
|
||||
|
||||
if (
|
||||
!['Person', 'Group', 'Organization', 'Service', 'Application'].includes(
|
||||
this.type
|
||||
)
|
||||
) {
|
||||
if (!Actor.WellKnownActorTypes.includes(this.type)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
const linksValid = ['inbox', 'outbox', 'following', 'followers'].every(l => {
|
||||
// must be valid if set
|
||||
const linksValid = Actor.WellKnownLinkTypes.every(l => {
|
||||
// must be valid if present & non-empty
|
||||
if (this[l] && !isValidLink(this[l])) {
|
||||
return false;
|
||||
}
|
||||
|
@ -56,7 +52,14 @@ module.exports = class Actor extends ActivityPubObject {
|
|||
return true;
|
||||
}
|
||||
|
||||
// :TODO: from a User object
|
||||
static get WellKnownActorTypes() {
|
||||
return ['Person', 'Group', 'Organization', 'Service', 'Application'];
|
||||
}
|
||||
|
||||
static get WellKnownLinkTypes() {
|
||||
return ['inbox', 'outbox', 'following', 'followers'];
|
||||
}
|
||||
|
||||
static fromLocalUser(user, webServer, cb) {
|
||||
const userSelfUrl = selfUrl(webServer, user);
|
||||
const userSettings = ActivityPubSettings.fromUser(user);
|
||||
|
|
|
@ -40,25 +40,56 @@ module.exports = class Collection extends ActivityPubObject {
|
|||
owningUser,
|
||||
followingActor.id,
|
||||
followingActor,
|
||||
false,
|
||||
cb
|
||||
);
|
||||
}
|
||||
|
||||
static outbox(owningUser, page, webServer, cb) {
|
||||
return Collection.getOrdered(
|
||||
'outbox',
|
||||
owningUser,
|
||||
false,
|
||||
page,
|
||||
null,
|
||||
webServer,
|
||||
cb
|
||||
);
|
||||
}
|
||||
|
||||
static addOutboxItem(owningUser, outboxItem, cb) {
|
||||
return Collection.addToCollection(
|
||||
'outbox',
|
||||
owningUser,
|
||||
outboxItem.id,
|
||||
outboxItem,
|
||||
false,
|
||||
cb
|
||||
);
|
||||
}
|
||||
|
||||
static getOrdered(name, owningUser, includePrivate, page, mapper, webServer, cb) {
|
||||
// :TODD: |includePrivate| handling
|
||||
const privateQuery = includePrivate ? '' : ' AND is_private = FALSE';
|
||||
const followersUrl =
|
||||
makeUserUrl(webServer, owningUser, '/ap/users/') + `/${name}`;
|
||||
|
||||
if (!page) {
|
||||
return apDb.get(
|
||||
`SELECT COUNT(id) AS count
|
||||
FROM collection
|
||||
WHERE name = ?;`,
|
||||
[name],
|
||||
WHERE user_id = ? AND name = ?${privateQuery};`,
|
||||
[owningUser.userId, name],
|
||||
(err, row) => {
|
||||
if (err) {
|
||||
return cb(err);
|
||||
}
|
||||
|
||||
//
|
||||
// Mastodon for instance, will never follow up for the
|
||||
// actual data from some Collections such as 'followers';
|
||||
// Instead, they only use the |totalItems| to form an
|
||||
// approximate follower count.
|
||||
//
|
||||
let obj;
|
||||
if (row.count > 0) {
|
||||
obj = {
|
||||
|
@ -85,7 +116,7 @@ module.exports = class Collection extends ActivityPubObject {
|
|||
apDb.all(
|
||||
`SELECT obj_json
|
||||
FROM collection
|
||||
WHERE user_id = ? AND name = ?
|
||||
WHERE user_id = ? AND name = ?${privateQuery}
|
||||
ORDER BY timestamp;`,
|
||||
[owningUser.userId, name],
|
||||
(err, entries) => {
|
||||
|
@ -111,15 +142,16 @@ module.exports = class Collection extends ActivityPubObject {
|
|||
);
|
||||
}
|
||||
|
||||
static addToCollection(name, owningUser, objectId, obj, cb) {
|
||||
static addToCollection(name, owningUser, objectId, obj, isPrivate, cb) {
|
||||
if (!isString(obj)) {
|
||||
obj = JSON.stringify(obj);
|
||||
}
|
||||
|
||||
isPrivate = isPrivate ? 1 : 0;
|
||||
apDb.run(
|
||||
`INSERT OR IGNORE INTO collection (name, timestamp, user_id, obj_id, obj_json)
|
||||
VALUES (?, ?, ?, ?, ?);`,
|
||||
[name, getISOTimestampString(), owningUser.userId, objectId, obj],
|
||||
`INSERT OR IGNORE INTO collection (name, timestamp, user_id, obj_id, obj_json, is_private)
|
||||
VALUES (?, ?, ?, ?, ?, ?);`,
|
||||
[name, getISOTimestampString(), owningUser.userId, objectId, obj, isPrivate],
|
||||
function res(err) {
|
||||
// non-arrow for 'this' scope
|
||||
if (err) {
|
||||
|
@ -130,7 +162,7 @@ module.exports = class Collection extends ActivityPubObject {
|
|||
);
|
||||
}
|
||||
|
||||
static remoteFromCollectionById(name, owningUser, objectId, cb) {
|
||||
static removeFromCollectionById(name, owningUser, objectId, cb) {
|
||||
apDb.run(
|
||||
`DELETE FROM collection
|
||||
WHERE user_id = ? AND name = ? AND obj_id = ?;`,
|
||||
|
|
|
@ -1,65 +0,0 @@
|
|||
const apDb = require('../database').dbs.activitypub;
|
||||
|
||||
exports.persistToOutbox = persistToOutbox;
|
||||
exports.getOutboxEntries = getOutboxEntries;
|
||||
|
||||
const FollowerEntryStatus = {
|
||||
Invalid: 0, // Invalid
|
||||
Requested: 1, // Entry is a *request* to local user
|
||||
Accepted: 2, // Accepted by local user
|
||||
Rejected: 3, // Rejected by local user
|
||||
};
|
||||
exports.FollowerEntryStatus = FollowerEntryStatus;
|
||||
|
||||
function persistToOutbox(activity, fromUser, message, cb) {
|
||||
const activityJson = JSON.stringify(activity);
|
||||
|
||||
apDb.run(
|
||||
`INSERT INTO outbox (activity_id, user_id, message_id, activity_json, published_timestamp)
|
||||
VALUES (?, ?, ?, ?, ?);`,
|
||||
[
|
||||
activity.id,
|
||||
fromUser.userId,
|
||||
message.messageId,
|
||||
activityJson,
|
||||
activity.object.published,
|
||||
],
|
||||
function res(err) {
|
||||
// non-arrow for 'this' scope
|
||||
return cb(err, this.lastID);
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
function getOutboxEntries(owningUser, options, cb) {
|
||||
apDb.all(
|
||||
`SELECT id, activity_id, message_id, activity_json, published_timestamp
|
||||
FROM outbox
|
||||
WHERE user_id = ? AND json_extract(activity_json, '$.type') = "Create";`,
|
||||
[owningUser.userId],
|
||||
(err, rows) => {
|
||||
if (err) {
|
||||
return cb(err);
|
||||
}
|
||||
|
||||
const entries = rows.map(r => {
|
||||
let parsed;
|
||||
try {
|
||||
parsed = JSON.parse(r.activity_json);
|
||||
} catch (e) {
|
||||
return cb(e);
|
||||
}
|
||||
|
||||
return {
|
||||
id: r.id,
|
||||
activityId: r.activity_id,
|
||||
messageId: r.message_id,
|
||||
activity: parsed,
|
||||
published: r.published_timestamp,
|
||||
};
|
||||
});
|
||||
|
||||
return cb(null, entries);
|
||||
}
|
||||
);
|
||||
}
|
|
@ -502,28 +502,21 @@ dbs.message.run(
|
|||
return cb(null);
|
||||
},
|
||||
activitypub: cb => {
|
||||
// private INTEGER NOT NULL, -- Is this Activity private?
|
||||
// Actors we know about and have cached
|
||||
dbs.activitypub.run(
|
||||
`CREATE TABLE IF NOT EXISTS outbox (
|
||||
id INTEGER PRIMARY KEY, -- Local ID
|
||||
activity_id VARCHAR NOT NULL, -- Fully qualified Activity ID/URL (activity.id)
|
||||
user_id INTEGER NOT NULL, -- Local user ID
|
||||
message_id INTEGER NOT NULL, -- Local message ID
|
||||
activity_json VARCHAR NOT NULL, -- Activity in JSON format
|
||||
published_timestamp DATETIME NOT NULL, -- (activity.object.published))
|
||||
`CREATE TABLE IF NOT EXISTS actor_cache (
|
||||
id INTEGER PRIMARY KEY, -- Local DB ID
|
||||
actor_id VARCHAR NOT NULL, -- Fully qualified Actor ID/URL
|
||||
actor_json VARCHAR NOT NULL, -- Actor document
|
||||
timestamp DATETIME NOT NULL, -- Timestamp in which this Actor was cached
|
||||
|
||||
UNIQUE(message_id, activity_id)
|
||||
UNIQUE(actor_id)
|
||||
);`
|
||||
);
|
||||
|
||||
dbs.activitypub.run(
|
||||
`CREATE INDEX IF NOT EXISTS outbox_user_id_index0
|
||||
ON outbox (user_id);`
|
||||
);
|
||||
|
||||
dbs.activitypub.run(
|
||||
`CREATE INDEX IF NOT EXISTS outbox_activity_id_index0
|
||||
ON outbox (activity_id);`
|
||||
`CREATE INDEX IF NOT EXISTS actor_cache_actor_id_index0
|
||||
ON actor_cache (actor_id);`
|
||||
);
|
||||
|
||||
dbs.activitypub.run(
|
||||
|
@ -539,6 +532,7 @@ dbs.message.run(
|
|||
user_id INTEGER NOT NULL, -- Local, owning user ID
|
||||
obj_id VARCHAR NOT NULL, -- Object ID from obj_json.id
|
||||
obj_json VARCHAR NOT NULL, -- Object varies by collection (obj_json.type)
|
||||
is_private INTEGER NOT NULL, -- Is this object private to |user_id|?
|
||||
|
||||
UNIQUE(name, user_id, obj_id)
|
||||
);`
|
||||
|
|
|
@ -11,7 +11,7 @@ exports.setExternalAddressedToInfo = setExternalAddressedToInfo;
|
|||
exports.copyExternalAddressedToInfo = copyExternalAddressedToInfo;
|
||||
|
||||
const EMAIL_REGEX =
|
||||
/^(([^<>()[\]\\.,;:\s@"]+(\.[^<>()[\]\\.,;:\s@"]+)*)|(".+"))@((\[[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}])|(([a-zA-Z\-0-9]+\.)+[a-zA-Z]{2,}))$/;
|
||||
/^(([^<>()[\]\\.,;:\s@"]+(\.[^<>()[\]\\.,;:\s@"]+)*)|(".+"))@((\[?[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}]?)|(([a-zA-Z\-0-9]+\.)+[a-zA-Z]{2,}))$/;
|
||||
|
||||
/*
|
||||
Input Output
|
||||
|
|
|
@ -3,11 +3,11 @@ const Message = require('../message');
|
|||
const { MessageScanTossModule } = require('../msg_scan_toss_module');
|
||||
const { getServer } = require('../listening_server');
|
||||
const Log = require('../logger').log;
|
||||
const { persistToOutbox } = require('../activitypub/db');
|
||||
|
||||
// deps
|
||||
const async = require('async');
|
||||
const _ = require('lodash');
|
||||
const Collection = require('../activitypub/collection');
|
||||
|
||||
exports.moduleInfo = {
|
||||
name: 'ActivityPub',
|
||||
|
@ -51,7 +51,7 @@ exports.getModule = class ActivityPubScannerTosser extends MessageScanTossModule
|
|||
(noteInfo, callback) => {
|
||||
const { activity, fromUser, remoteActor } = noteInfo;
|
||||
|
||||
// :TODO: Implement retry logic (connection issues, retryable HTTP status)
|
||||
// :TODO: Implement retry logic (connection issues, retryable HTTP status) ??
|
||||
activity.sendTo(
|
||||
remoteActor.inbox,
|
||||
fromUser,
|
||||
|
@ -82,7 +82,7 @@ exports.getModule = class ActivityPubScannerTosser extends MessageScanTossModule
|
|||
);
|
||||
},
|
||||
(activity, fromUser, callback) => {
|
||||
persistToOutbox(activity, fromUser, message, (err, localId) => {
|
||||
Collection.addOutboxItem(fromUser, activity, (err, localId) => {
|
||||
if (!err) {
|
||||
this.log.debug(
|
||||
{ localId, activityId: activity.id },
|
||||
|
|
|
@ -204,7 +204,9 @@ exports.getModule = class ActivityPubWebHandler extends WebHandlerModule {
|
|||
});
|
||||
}
|
||||
|
||||
_getCollectionHandler(name, req, resp) {
|
||||
_getCollectionHandler(name, req, resp, signature) {
|
||||
EnigAssert(signature, 'Missing signature!');
|
||||
|
||||
const url = new URL(req.url, `https://${req.headers.host}`);
|
||||
const accountName = this._accountNameFromUserPath(url, name);
|
||||
if (!accountName) {
|
||||
|
@ -244,52 +246,15 @@ exports.getModule = class ActivityPubWebHandler extends WebHandlerModule {
|
|||
});
|
||||
}
|
||||
|
||||
_followingGetHandler(req, resp) {
|
||||
_followingGetHandler(req, resp, signature) {
|
||||
this.log.debug({ url: req.url }, 'Request for "following"');
|
||||
return this._getCollectionHandler('following', req, resp);
|
||||
return this._getCollectionHandler('following', req, resp, signature);
|
||||
}
|
||||
|
||||
// https://docs.gotosocial.org/en/latest/federation/behaviors/outbox/
|
||||
_outboxGetHandler(req, resp) {
|
||||
_outboxGetHandler(req, resp, signature) {
|
||||
this.log.debug({ url: req.url }, 'Request for "outbox"');
|
||||
|
||||
// the request must be signed, and the signature must be valid
|
||||
const signature = this._parseAndValidateSignature(req);
|
||||
if (!signature) {
|
||||
return this.webServer.accessDenied(resp);
|
||||
}
|
||||
|
||||
// /_enig/ap/users/SomeName/outbox -> SomeName
|
||||
const url = new URL(req.url, `https://${req.headers.host}`);
|
||||
const accountName = this._accountNameFromUserPath(url, 'outbox');
|
||||
if (!accountName) {
|
||||
return this.webServer.resourceNotFound(resp);
|
||||
}
|
||||
|
||||
userFromAccount(accountName, (err, user) => {
|
||||
if (err) {
|
||||
this.log.info(
|
||||
{ reason: err.message, accountName: accountName },
|
||||
`No user "${accountName}" for "self"`
|
||||
);
|
||||
return this.webServer.resourceNotFound(resp);
|
||||
}
|
||||
|
||||
Activity.fromOutboxEntries(user, this.webServer, (err, activity) => {
|
||||
if (err) {
|
||||
return this.webServer.internalServerError(resp, err);
|
||||
}
|
||||
|
||||
const body = JSON.stringify(activity);
|
||||
const headers = {
|
||||
'Content-Type': ActivityJsonMime,
|
||||
'Content-Length': body.length,
|
||||
};
|
||||
|
||||
resp.writeHead(200, headers);
|
||||
return resp.end(body);
|
||||
});
|
||||
});
|
||||
return this._getCollectionHandler('outbox', req, resp, signature);
|
||||
}
|
||||
|
||||
_accountNameFromUserPath(url, suffix) {
|
||||
|
@ -301,9 +266,9 @@ exports.getModule = class ActivityPubWebHandler extends WebHandlerModule {
|
|||
return m[1];
|
||||
}
|
||||
|
||||
_followersGetHandler(req, resp) {
|
||||
_followersGetHandler(req, resp, signature) {
|
||||
this.log.debug({ url: req.url }, 'Request for "followers"');
|
||||
return this._getCollectionHandler('followers', req, resp);
|
||||
return this._getCollectionHandler('followers', req, resp, signature);
|
||||
}
|
||||
|
||||
_parseAndValidateSignature(req) {
|
||||
|
@ -376,7 +341,7 @@ exports.getModule = class ActivityPubWebHandler extends WebHandlerModule {
|
|||
return this.webServer.notImplemented(resp);
|
||||
}
|
||||
|
||||
Collection.remoteFromCollectionById(
|
||||
Collection.removeFromCollectionById(
|
||||
'followers',
|
||||
user,
|
||||
activity.actor,
|
||||
|
|
Loading…
Reference in New Issue