Ability to send/recv public messages in the AP shared inbox areaTag

* Optional subjects
* Resolving followers
* Various cleanup and tidy
This commit is contained in:
Bryan Ashby 2023-02-05 21:10:51 -07:00
parent 36ebda5269
commit 0402de7444
No known key found for this signature in database
GPG Key ID: C2C1B501E4EFD994
9 changed files with 340 additions and 119 deletions

View File

@ -49,12 +49,25 @@ module.exports = class Activity extends ActivityPubObject {
}
static makeCreate(webServer, actor, obj) {
return new Activity({
const activity = new Activity({
id: Activity.activityObjectId(webServer),
to: obj.to,
type: WellKnownActivity.Create,
actor,
object: obj,
});
const copy = n => {
if (obj[n]) {
activity[n] = obj[n];
}
};
copy('to');
copy('cc');
// :TODO: Others?
return activity;
}
static makeTombstone(obj) {
@ -68,7 +81,7 @@ module.exports = class Activity extends ActivityPubObject {
});
}
sendTo(actorUrl, fromUser, webServer, cb) {
sendTo(inboxEndpoint, fromUser, webServer, cb) {
const privateKey = fromUser.getProperty(UserProps.PrivateActivityPubSigningKey);
if (_.isEmpty(privateKey)) {
return cb(
@ -91,7 +104,7 @@ module.exports = class Activity extends ActivityPubObject {
};
const activityJson = JSON.stringify(this);
return postJson(actorUrl, activityJson, reqOpts, cb);
return postJson(inboxEndpoint, activityJson, reqOpts, cb);
}
// :TODO: we need dp/support a bit more here...

View File

@ -12,6 +12,7 @@ const { getJson } = require('../http_util');
// deps
const { isString } = require('lodash');
const Log = require('../logger');
module.exports = class Collection extends ActivityPubObject {
constructor(obj) {
@ -260,18 +261,33 @@ module.exports = class Collection extends ActivityPubObject {
return cb(err);
}
entries = entries || [];
try {
entries = (entries || []).map(e => JSON.parse(e.object_json));
} catch (e) {
Log.error(`Collection "${collectionId}" error: ${e.message}`);
}
if (mapper && entries.length > 0) {
entries = entries.map(mapper);
}
const obj = {
id: `${collectionId}/page=${page}`,
type: 'OrderedCollectionPage',
totalItems: entries.length,
orderedItems: entries,
partOf: collectionId,
};
let obj;
if ('all' === page) {
obj = {
id: collectionId,
type: 'OrderedCollection',
totalItems: entries.length,
orderedItems: entries,
};
} else {
obj = {
id: `${collectionId}/page=${page}`,
type: 'OrderedCollectionPage',
totalItems: entries.length,
orderedItems: entries,
partOf: collectionId,
};
}
return cb(null, new Collection(obj));
}

View File

@ -50,7 +50,7 @@ module.exports = class Note extends ActivityPubObject {
}
// A local Message bound for ActivityPub
static fromLocalOutgoingMessage(message, webServer, cb) {
static fromLocalMessage(message, webServer, cb) {
const localUserId = message.getLocalFromUserId();
if (!localUserId) {
return cb(Errors.UnexpectedState('Invalid user ID for local user!'));
@ -63,7 +63,7 @@ module.exports = class Note extends ActivityPubObject {
}
const remoteActorAccount = message.getRemoteToUser();
if (!remoteActorAccount) {
if (!remoteActorAccount && message.isPrivate()) {
return cb(
Errors.UnexpectedState('Message does not contain a remote address')
);
@ -80,9 +80,13 @@ module.exports = class Note extends ActivityPubObject {
});
},
(fromUser, fromActor, callback) => {
Actor.fromId(remoteActorAccount, (err, remoteActor) => {
return callback(err, fromUser, fromActor, remoteActor);
});
if (message.isPrivate()) {
Actor.fromId(remoteActorAccount, (err, remoteActor) => {
return callback(err, fromUser, fromActor, remoteActor);
});
} else {
return callback(null, fromUser, fromActor, null);
}
},
(fromUser, fromActor, remoteActor, callback) => {
if (!message.replyToMsgId) {

View File

@ -904,10 +904,20 @@ module.exports = () => {
name: 'System Bulletins',
desc: 'Bulletin messages for all users',
},
},
},
activitypub_shared_inbox: {
activity_pub: {
name: 'ActivityPub Shared Inbox',
desc: 'Public and shared ActivityPub messages',
areas: {
activitypub_shared: {
name: 'ActivityPub sharedInbox',
desc: 'Public shared inbox for ActivityPub',
alwaysExportExternal: true,
subjectOptional: true,
addressFlavor: 'activitypub',
},
},
},

View File

@ -174,6 +174,12 @@ exports.FullScreenEditorModule =
) {
// Ignore validation errors if this is the subject field
// and it's optional
const areaInfo = getMessageAreaByTag(this.messageAreaTag);
if (true === areaInfo.subjectOptional) {
return cb(null, null);
}
// private messages are a little different...
const toView = this.getView('header', MciViewIds.header.to);
const msgInfo = messageInfoFromAddressedToInfo(
getAddressedToInfo(toView.getData())
@ -594,7 +600,11 @@ exports.FullScreenEditorModule =
function populateLocalUserInfo(callback) {
self.message.setLocalFromUserId(self.client.user.userId);
if (!self.isPrivateMail()) {
const areaInfo = getMessageAreaByTag(self.messageAreaTag);
if (
!self.isPrivateMail() &&
true !== areaInfo.alwaysExportExternal
) {
return callback(null);
}
@ -636,7 +646,17 @@ exports.FullScreenEditorModule =
self.message.toUserName,
(err, toUserId) => {
if (err) {
return callback(err);
if (self.message.isPrivate()) {
return callback(err);
}
if (areaInfo.addressFlavor) {
self.message.setExternalFlavor(
areaInfo.addressFlavor
);
}
return callback(null);
}
self.message.setLocalToUserId(toUserId);

View File

@ -2,10 +2,13 @@ const WellKnownAreaTags = {
Invalid: '',
Private: 'private_mail',
Bulletin: 'local_bulletin',
ActivityPubSharedInbox: 'activitypub_shared_inbox',
ActivityPubShared: 'activitypub_shared', // sharedInbox -> HERE -> exported as replies (direct) and outbox items (new posts)
};
exports.WellKnownAreaTags = WellKnownAreaTags;
const WellKnownExternalAreaTags = [WellKnownAreaTags.ActivityPubShared];
exports.WellKnownExternalAreaTags = WellKnownExternalAreaTags;
const WellKnownMetaCategories = {
System: 'System',
FtnProperty: 'FtnProperty',

View File

@ -3,12 +3,18 @@ const Message = require('../message');
const { MessageScanTossModule } = require('../msg_scan_toss_module');
const { getServer } = require('../listening_server');
const Log = require('../logger').log;
const { WellKnownAreaTags, AddressFlavor } = require('../message_const');
const { Errors } = require('../enig_error');
const Collection = require('../activitypub/collection');
const Note = require('../activitypub/note');
const { makeUserUrl } = require('../activitypub/util');
const { getAddressedToInfo } = require('../mail_util');
const { PublicCollectionId } = require('../activitypub/const');
const Actor = require('../activitypub/actor');
// deps
const async = require('async');
const _ = require('lodash');
const Collection = require('../activitypub/collection');
const Note = require('../activitypub/note');
exports.moduleInfo = {
name: 'ActivityPub',
@ -40,19 +46,83 @@ exports.getModule = class ActivityPubScannerTosser extends MessageScanTossModule
return;
}
//
// Private:
// Send Note directly to another remote Actor's inbox
//
// Public:
// - The original message may be addressed to a non-ActivityPub address
// or something like "All" or "Public"; In this case, ignore that entry
// - Additionally, we need to send to the local Actor's followers via their sharedInbox
//
// To achieve the above for Public, we'll collect the followers from the local
// user, query their unique shared inboxes's, update the Note's addressing,
// then deliver and store.
//
async.waterfall(
[
callback => {
Note.fromLocalOutgoingMessage(
// Private or addressed to a single AP Actor:
Note.fromLocalMessage(message, this._webServer(), (err, noteInfo) => {
return callback(err, noteInfo);
});
},
(noteInfo, callback) => {
if (message.isPrivate()) {
if (!noteInfo.remoteActor) {
return callback(
Errors.UnexpectedState(
'Private messages should contain a remote Actor!'
)
);
}
return callback(null, noteInfo, [noteInfo.remoteActor.id]);
}
// public: we need to build a list of sharedInbox's
this._collectDeliveryEndpoints(
message,
this._webServer(),
(err, noteInfo) => {
return callback(err, noteInfo);
noteInfo.fromUser,
(err, deliveryEndpoints) => {
return callback(err, noteInfo, deliveryEndpoints);
}
);
},
(noteInfo, callback) => {
const { note, fromUser, remoteActor } = noteInfo;
(noteInfo, deliveryEndpoints, callback) => {
const { note, fromUser } = noteInfo;
//
// Update the Note's addressing:
// - Private:
// to: sharedInboxEndpoints[0]
// - Public:
// to: https://www.w3.org/ns/activitystreams#Public
// ... and the message.getRemoteToUser() value *if*
// the flavor is deemed ActivityPub
// cc: [sharedInboxEndpoints]
//
if (message.isPrivate()) {
note.to = deliveryEndpoints.sharedInboxes[0];
} else {
if (deliveryEndpoints.additionalTo) {
note.to = [
PublicCollectionId,
deliveryEndpoints.additionalTo,
];
} else {
note.to = PublicCollectionId;
}
note.cc = [
deliveryEndpoints.followers,
...deliveryEndpoints.sharedInboxes,
];
if (note.to.length < 2 && note.cc.length < 2) {
// If we only have a generic 'followers' endpoint, there is no where to send to
return callback(null, activity, fromUser);
}
}
const activity = Activity.makeCreate(
this._webServer(),
@ -60,41 +130,59 @@ exports.getModule = class ActivityPubScannerTosser extends MessageScanTossModule
note
);
// :TODO: Implement retry logic (connection issues, retryable HTTP status) ??
const inbox = remoteActor.inbox;
let allEndpoints = deliveryEndpoints.sharedInboxes;
if (deliveryEndpoints.additionalTo) {
allEndpoints.push(deliveryEndpoints.additionalTo);
}
allEndpoints = Array.from(new Set(allEndpoints)); // unique again
// const inbox = remoteActor.endpoints.sharedInbox;
// activity.object.to = 'https://www.w3.org/ns/activitystreams#Public';
async.eachLimit(
allEndpoints,
4,
(inbox, nextInbox) => {
activity.sendTo(
inbox,
fromUser,
this._webServer(),
(err, respBody, res) => {
if (err) {
this.log.warn(
{
inbox,
error: err.message,
},
'Failed to send "Note" Activity to Inbox'
);
} else if (
res.statusCode === 200 ||
res.statusCode === 202
) {
this.log.debug(
{ inbox, uuid: message.uuid },
'Message delivered to Inbox'
);
} else {
this.log.warn(
{
inbox,
statusCode: res.statusCode,
body: _.truncate(respBody, 128),
},
'Unexpected status code'
);
}
activity.sendTo(
inbox,
fromUser,
this._webServer(),
(err, respBody, res) => {
if (err) {
this.log.warn(
{ error: err.message, inbox: remoteActor.inbox },
'Failed to send "Note" Activity to Inbox'
);
} else if (res.statusCode !== 202 && res.statusCode !== 200) {
this.log.warn(
{
inbox: remoteActor.inbox,
statusCode: res.statusCode,
body: _.truncate(respBody, 128),
},
'Unexpected status code'
);
}
// carry on regardless if we sent and record
// the item in the user's Outbox collection
// If we can't send now, no harm, we'll record to the outbox
return nextInbox(null);
}
);
},
() => {
return callback(null, activity, fromUser);
}
);
},
(activity, fromUser, callback) => {
// If we failed to send above,
Collection.addOutboxItem(
fromUser,
activity,
@ -166,6 +254,90 @@ exports.getModule = class ActivityPubScannerTosser extends MessageScanTossModule
);
}
_collectDeliveryEndpoints(message, localUser, cb) {
this._collectFollowersSharedInboxEndpoints(
localUser,
(err, endpoints, followersEndpoint) => {
if (err) {
return cb(err);
}
//
// Don't inspect the remote address/remote to
// Here; We already know this in a public
// area. Instead, see if the user typed in
// a reasonable AP address here. If so, we'll
// try to send directly to them as well.
//
const addrInfo = getAddressedToInfo(message.toUserName);
if (
!message.isPrivate() &&
AddressFlavor.ActivityPub === addrInfo.flavor
) {
Actor.fromId(addrInfo.remote, (err, actor) => {
if (err) {
return cb(err);
}
return cb(null, {
additionalTo: actor.id,
sharedInboxes: endpoints,
followers: followersEndpoint,
});
});
} else {
return cb(null, {
sharedInboxes: endpoints,
followers: followersEndpoint,
});
}
}
);
}
_collectFollowersSharedInboxEndpoints(localUser, cb) {
const localFollowersEndpoint =
makeUserUrl(this._webServer(), localUser, '/ap/users/') + '/followers';
Collection.followers(localFollowersEndpoint, 'all', (err, collection) => {
if (err) {
return cb(err);
}
if (!collection.orderedItems || collection.orderedItems.length < 1) {
// no followers :(
return cb(null, []);
}
async.mapLimit(
collection.orderedItems,
4,
(actorId, nextActorId) => {
Actor.fromId(actorId, (err, actor) => {
return nextActorId(err, actor);
});
},
(err, followerActors) => {
if (err) {
return cb(err);
}
const sharedInboxEndpoints = Array.from(
new Set(
followerActors
.map(actor => {
return _.get(actor, 'endpoints.sharedInbox');
})
.filter(inbox => inbox) // drop nulls
)
);
return cb(null, sharedInboxEndpoints, localFollowersEndpoint);
}
);
});
}
_isEnabled() {
// :TODO: check config to see if AP integration is enabled/etc.
return this._webServer();
@ -183,7 +355,13 @@ exports.getModule = class ActivityPubScannerTosser extends MessageScanTossModule
return true;
}
// :TODO: Implement the area mapping check for public
// Public items do not need a specific 'to'; we'll record to the
// local Actor's outbox and send to any followers we know about
if (message.areaTag === WellKnownAreaTags.ActivityPubShared) {
return true;
}
// :TODO: Implement the area mapping check for public 'groups'
return false;
}

View File

@ -326,6 +326,14 @@ exports.getModule = class WebServerModule extends ServerModule {
});
}
ok(resp, body = '', headers = { 'Content-Type:': 'text/html' }) {
if (body && !headers['Content-Length']) {
headers['Content-Length'] = Buffer(body).length;
}
resp.writeHead(200, 'OK', body ? headers : null);
return resp.end(body);
}
created(resp, body = '', headers = { 'Content-Type:': 'text/html' }) {
resp.writeHead(201, 'Created', body ? headers : null);
return resp.end(body);

View File

@ -189,20 +189,14 @@ exports.getModule = class ActivityPubWebHandler extends WebHandlerModule {
req.on('end', () => {
const activity = Activity.fromJsonString(Buffer.concat(body).toString());
if (!activity) {
if (!activity || !activity.isValid()) {
this.log.error(
{ url: req.url, method: req.method, endpoint: 'inbox' },
'Failed to parse Activity'
);
return this.webServer.resourceNotFound(resp);
}
if (!activity.isValid()) {
this.log.warn(
{ activity, endpoint: 'inbox' },
'Invalid or unsupported Activity'
);
return this.webServer.badRequest(resp);
return activity
? this.webServer.badRequest(resp)
: this.webServer.notImplemented(resp);
}
switch (activity.type) {
@ -247,7 +241,7 @@ exports.getModule = class ActivityPubWebHandler extends WebHandlerModule {
break;
}
return this.webServer.resourceNotFound(resp);
return this.webServer.notImplemented(resp);
});
}
@ -259,20 +253,14 @@ exports.getModule = class ActivityPubWebHandler extends WebHandlerModule {
req.on('end', () => {
const activity = Activity.fromJsonString(Buffer.concat(body).toString());
if (!activity) {
if (!activity || !activity.isValid()) {
this.log.error(
{ url: req.url, method: req.method, endpoint: 'sharedInbox' },
'Failed to parse Activity'
);
return this.webServer.resourceNotFound(resp);
}
if (!activity.isValid()) {
this.log.warn(
{ activity, endpoint: 'sharedInbox' },
'Invalid or unsupported Activity'
);
return this.webServer.badRequest(resp);
return activity
? this.webServer.badRequest(resp)
: this.webServer.notImplemented(resp);
}
switch (activity.type) {
@ -287,8 +275,11 @@ exports.getModule = class ActivityPubWebHandler extends WebHandlerModule {
{ type: activity.type },
'Invalid or unknown Activity type'
);
return this.webServer.resourceNotFound(resp);
break;
}
// don't understand the 'type'
return this.webServer.notImplemented(resp);
});
}
@ -309,28 +300,28 @@ exports.getModule = class ActivityPubWebHandler extends WebHandlerModule {
}
inboxUpdateObject(inboxType, req, resp, activity) {
const objectIdToUpdate = _.get(activity, 'object.id');
const updateObjectId = _.get(activity, 'object.id');
const objectType = _.get(activity, 'object.type');
this.log.info(
{ inboxType, objectId: objectIdToUpdate, type: objectType },
{ inboxType, objectId: updateObjectId, type: objectType },
'Inbox Object "Update" request'
);
// :TODO: other types...
if (!objectIdToUpdate || !['Note'].includes(objectType)) {
return this.webServer.resourceNotFound(resp);
if (!updateObjectId || !['Note'].includes(objectType)) {
return this.webServer.notImplemented(resp);
}
// Note's are wrapped in Create Activities
Collection.objectByEmbeddedId(objectIdToUpdate, (err, obj) => {
Collection.objectByEmbeddedId(updateObjectId, (err, obj) => {
if (err) {
return this.webServer.internalServerError(resp, err);
}
if (!obj) {
// no match
return this.webServer.resourceNotFound(resp);
// no match, but respond as accepted and hopefully they don't ask again
return this.webServer.accepted(resp);
}
// OK, the object exists; Does the caller have permission
@ -346,7 +337,7 @@ exports.getModule = class ActivityPubWebHandler extends WebHandlerModule {
if (updateTargetUrl.host !== updaterUrl.host) {
this.log.warn(
{
objectId: objectIdToUpdate,
objectId: updateObjectId,
type: objectType,
updateTargetHost: updateTargetUrl.host,
requestorHost: updaterUrl.host,
@ -358,7 +349,7 @@ exports.getModule = class ActivityPubWebHandler extends WebHandlerModule {
Collection.updateCollectionEntry(
'inbox',
objectIdToUpdate,
updateObjectId,
activity,
err => {
if (err) {
@ -367,7 +358,7 @@ exports.getModule = class ActivityPubWebHandler extends WebHandlerModule {
this.log.info(
{
objectId: objectIdToUpdate,
objectId: updateObjectId,
type: objectType,
collection: 'inbox',
},
@ -444,7 +435,7 @@ exports.getModule = class ActivityPubWebHandler extends WebHandlerModule {
return this._storeNoteAsMessage(
activity.id,
'All',
Message.WellKnownAreaTags.ActivityPubSharedInbox,
Message.WellKnownAreaTags.ActivityPubShared,
note,
cb
);
@ -527,13 +518,9 @@ exports.getModule = class ActivityPubWebHandler extends WebHandlerModule {
}
const body = JSON.stringify(collection);
const headers = {
return this.webServer.ok(resp, body, {
'Content-Type': ActivityStreamMediaType,
'Content-Length': Buffer(body).length,
};
resp.writeHead(200, headers);
return resp.end(body);
});
});
}
@ -568,8 +555,7 @@ exports.getModule = class ActivityPubWebHandler extends WebHandlerModule {
// :TODO: support a template here
resp.writeHead(200, { 'Content-Type': 'text/html' });
return resp.end(note.content);
return this.webServer.ok(resp, note.content);
});
}
@ -619,11 +605,6 @@ exports.getModule = class ActivityPubWebHandler extends WebHandlerModule {
'Follow request'
);
const ok = () => {
resp.writeHead(200, { 'Content-Type': 'text/html' });
return resp.end('');
};
//
// If the user blindly accepts Followers, we can persist
// and send an 'Accept' now. Otherwise, we need to queue this
@ -633,7 +614,7 @@ exports.getModule = class ActivityPubWebHandler extends WebHandlerModule {
const activityPubSettings = ActivityPubSettings.fromUser(localUser);
if (!activityPubSettings.manuallyApproveFollowers) {
this._recordAcceptedFollowRequest(localUser, remoteActor, activity);
return ok();
return this.webServer.ok(resp);
} else {
Collection.addFollowRequest(
localUser,
@ -645,7 +626,7 @@ exports.getModule = class ActivityPubWebHandler extends WebHandlerModule {
return this.internalServerError(resp, err);
}
return ok();
return this.webServer.ok(resp);
}
);
}
@ -836,13 +817,7 @@ exports.getModule = class ActivityPubWebHandler extends WebHandlerModule {
const body = JSON.stringify(localActor);
const headers = {
'Content-Type': ActivityStreamMediaType,
'Content-Length': Buffer(body).length,
};
resp.writeHead(200, headers);
return resp.end(body);
return this.webServer.ok(resp, body, { 'Content-Type': ActivityStreamMediaType });
}
_standardSelfHandler(localUser, localActor, req, resp) {
@ -872,13 +847,7 @@ exports.getModule = class ActivityPubWebHandler extends WebHandlerModule {
`Serving ActivityPub Profile for "${localUser.username}"`
);
const headers = {
'Content-Type': contentType,
'Content-Length': Buffer(body).length,
};
resp.writeHead(200, headers);
return resp.end(body);
return this.webServer.ok(resp, body, { 'Content-Type': contentType });
}
);
}