From 0402de7444e628921065545256e69fbcdf772216 Mon Sep 17 00:00:00 2001 From: Bryan Ashby Date: Sun, 5 Feb 2023 21:10:51 -0700 Subject: [PATCH] Ability to send/recv public messages in the AP shared inbox areaTag * Optional subjects * Resolving followers * Various cleanup and tidy --- core/activitypub/activity.js | 19 +- core/activitypub/collection.js | 32 ++- core/activitypub/note.js | 14 +- core/config_default.js | 12 +- core/fse.js | 24 +- core/message_const.js | 5 +- core/scanner_tossers/activitypub.js | 252 +++++++++++++++--- core/servers/content/web.js | 8 + .../content/web_handlers/activitypub.js | 93 +++---- 9 files changed, 340 insertions(+), 119 deletions(-) diff --git a/core/activitypub/activity.js b/core/activitypub/activity.js index d5db4112..700838d2 100644 --- a/core/activitypub/activity.js +++ b/core/activitypub/activity.js @@ -49,12 +49,25 @@ module.exports = class Activity extends ActivityPubObject { } static makeCreate(webServer, actor, obj) { - return new Activity({ + const activity = new Activity({ id: Activity.activityObjectId(webServer), + to: obj.to, type: WellKnownActivity.Create, actor, object: obj, }); + + const copy = n => { + if (obj[n]) { + activity[n] = obj[n]; + } + }; + + copy('to'); + copy('cc'); + // :TODO: Others? + + return activity; } static makeTombstone(obj) { @@ -68,7 +81,7 @@ module.exports = class Activity extends ActivityPubObject { }); } - sendTo(actorUrl, fromUser, webServer, cb) { + sendTo(inboxEndpoint, fromUser, webServer, cb) { const privateKey = fromUser.getProperty(UserProps.PrivateActivityPubSigningKey); if (_.isEmpty(privateKey)) { return cb( @@ -91,7 +104,7 @@ module.exports = class Activity extends ActivityPubObject { }; const activityJson = JSON.stringify(this); - return postJson(actorUrl, activityJson, reqOpts, cb); + return postJson(inboxEndpoint, activityJson, reqOpts, cb); } // :TODO: we need dp/support a bit more here... diff --git a/core/activitypub/collection.js b/core/activitypub/collection.js index a95753d9..bd09a16a 100644 --- a/core/activitypub/collection.js +++ b/core/activitypub/collection.js @@ -12,6 +12,7 @@ const { getJson } = require('../http_util'); // deps const { isString } = require('lodash'); +const Log = require('../logger'); module.exports = class Collection extends ActivityPubObject { constructor(obj) { @@ -260,18 +261,33 @@ module.exports = class Collection extends ActivityPubObject { return cb(err); } - entries = entries || []; + try { + entries = (entries || []).map(e => JSON.parse(e.object_json)); + } catch (e) { + Log.error(`Collection "${collectionId}" error: ${e.message}`); + } + if (mapper && entries.length > 0) { entries = entries.map(mapper); } - const obj = { - id: `${collectionId}/page=${page}`, - type: 'OrderedCollectionPage', - totalItems: entries.length, - orderedItems: entries, - partOf: collectionId, - }; + let obj; + if ('all' === page) { + obj = { + id: collectionId, + type: 'OrderedCollection', + totalItems: entries.length, + orderedItems: entries, + }; + } else { + obj = { + id: `${collectionId}/page=${page}`, + type: 'OrderedCollectionPage', + totalItems: entries.length, + orderedItems: entries, + partOf: collectionId, + }; + } return cb(null, new Collection(obj)); } diff --git a/core/activitypub/note.js b/core/activitypub/note.js index 8904e339..43af40ec 100644 --- a/core/activitypub/note.js +++ b/core/activitypub/note.js @@ -50,7 +50,7 @@ module.exports = class Note extends ActivityPubObject { } // A local Message bound for ActivityPub - static fromLocalOutgoingMessage(message, webServer, cb) { + static fromLocalMessage(message, webServer, cb) { const localUserId = message.getLocalFromUserId(); if (!localUserId) { return cb(Errors.UnexpectedState('Invalid user ID for local user!')); @@ -63,7 +63,7 @@ module.exports = class Note extends ActivityPubObject { } const remoteActorAccount = message.getRemoteToUser(); - if (!remoteActorAccount) { + if (!remoteActorAccount && message.isPrivate()) { return cb( Errors.UnexpectedState('Message does not contain a remote address') ); @@ -80,9 +80,13 @@ module.exports = class Note extends ActivityPubObject { }); }, (fromUser, fromActor, callback) => { - Actor.fromId(remoteActorAccount, (err, remoteActor) => { - return callback(err, fromUser, fromActor, remoteActor); - }); + if (message.isPrivate()) { + Actor.fromId(remoteActorAccount, (err, remoteActor) => { + return callback(err, fromUser, fromActor, remoteActor); + }); + } else { + return callback(null, fromUser, fromActor, null); + } }, (fromUser, fromActor, remoteActor, callback) => { if (!message.replyToMsgId) { diff --git a/core/config_default.js b/core/config_default.js index 60885f90..1dc75678 100644 --- a/core/config_default.js +++ b/core/config_default.js @@ -904,10 +904,20 @@ module.exports = () => { name: 'System Bulletins', desc: 'Bulletin messages for all users', }, + }, + }, - activitypub_shared_inbox: { + activity_pub: { + name: 'ActivityPub Shared Inbox', + desc: 'Public and shared ActivityPub messages', + + areas: { + activitypub_shared: { name: 'ActivityPub sharedInbox', desc: 'Public shared inbox for ActivityPub', + alwaysExportExternal: true, + subjectOptional: true, + addressFlavor: 'activitypub', }, }, }, diff --git a/core/fse.js b/core/fse.js index 9a9c1f18..fceb7fac 100644 --- a/core/fse.js +++ b/core/fse.js @@ -174,6 +174,12 @@ exports.FullScreenEditorModule = ) { // Ignore validation errors if this is the subject field // and it's optional + const areaInfo = getMessageAreaByTag(this.messageAreaTag); + if (true === areaInfo.subjectOptional) { + return cb(null, null); + } + + // private messages are a little different... const toView = this.getView('header', MciViewIds.header.to); const msgInfo = messageInfoFromAddressedToInfo( getAddressedToInfo(toView.getData()) @@ -594,7 +600,11 @@ exports.FullScreenEditorModule = function populateLocalUserInfo(callback) { self.message.setLocalFromUserId(self.client.user.userId); - if (!self.isPrivateMail()) { + const areaInfo = getMessageAreaByTag(self.messageAreaTag); + if ( + !self.isPrivateMail() && + true !== areaInfo.alwaysExportExternal + ) { return callback(null); } @@ -636,7 +646,17 @@ exports.FullScreenEditorModule = self.message.toUserName, (err, toUserId) => { if (err) { - return callback(err); + if (self.message.isPrivate()) { + return callback(err); + } + + if (areaInfo.addressFlavor) { + self.message.setExternalFlavor( + areaInfo.addressFlavor + ); + } + + return callback(null); } self.message.setLocalToUserId(toUserId); diff --git a/core/message_const.js b/core/message_const.js index b67992b8..7769663e 100644 --- a/core/message_const.js +++ b/core/message_const.js @@ -2,10 +2,13 @@ const WellKnownAreaTags = { Invalid: '', Private: 'private_mail', Bulletin: 'local_bulletin', - ActivityPubSharedInbox: 'activitypub_shared_inbox', + ActivityPubShared: 'activitypub_shared', // sharedInbox -> HERE -> exported as replies (direct) and outbox items (new posts) }; exports.WellKnownAreaTags = WellKnownAreaTags; +const WellKnownExternalAreaTags = [WellKnownAreaTags.ActivityPubShared]; +exports.WellKnownExternalAreaTags = WellKnownExternalAreaTags; + const WellKnownMetaCategories = { System: 'System', FtnProperty: 'FtnProperty', diff --git a/core/scanner_tossers/activitypub.js b/core/scanner_tossers/activitypub.js index 014222f4..9fd896fb 100644 --- a/core/scanner_tossers/activitypub.js +++ b/core/scanner_tossers/activitypub.js @@ -3,12 +3,18 @@ const Message = require('../message'); const { MessageScanTossModule } = require('../msg_scan_toss_module'); const { getServer } = require('../listening_server'); const Log = require('../logger').log; +const { WellKnownAreaTags, AddressFlavor } = require('../message_const'); +const { Errors } = require('../enig_error'); +const Collection = require('../activitypub/collection'); +const Note = require('../activitypub/note'); +const { makeUserUrl } = require('../activitypub/util'); +const { getAddressedToInfo } = require('../mail_util'); +const { PublicCollectionId } = require('../activitypub/const'); +const Actor = require('../activitypub/actor'); // deps const async = require('async'); const _ = require('lodash'); -const Collection = require('../activitypub/collection'); -const Note = require('../activitypub/note'); exports.moduleInfo = { name: 'ActivityPub', @@ -40,19 +46,83 @@ exports.getModule = class ActivityPubScannerTosser extends MessageScanTossModule return; } + // + // Private: + // Send Note directly to another remote Actor's inbox + // + // Public: + // - The original message may be addressed to a non-ActivityPub address + // or something like "All" or "Public"; In this case, ignore that entry + // - Additionally, we need to send to the local Actor's followers via their sharedInbox + // + // To achieve the above for Public, we'll collect the followers from the local + // user, query their unique shared inboxes's, update the Note's addressing, + // then deliver and store. + // + async.waterfall( [ callback => { - Note.fromLocalOutgoingMessage( + // Private or addressed to a single AP Actor: + Note.fromLocalMessage(message, this._webServer(), (err, noteInfo) => { + return callback(err, noteInfo); + }); + }, + (noteInfo, callback) => { + if (message.isPrivate()) { + if (!noteInfo.remoteActor) { + return callback( + Errors.UnexpectedState( + 'Private messages should contain a remote Actor!' + ) + ); + } + return callback(null, noteInfo, [noteInfo.remoteActor.id]); + } + + // public: we need to build a list of sharedInbox's + this._collectDeliveryEndpoints( message, - this._webServer(), - (err, noteInfo) => { - return callback(err, noteInfo); + noteInfo.fromUser, + (err, deliveryEndpoints) => { + return callback(err, noteInfo, deliveryEndpoints); } ); }, - (noteInfo, callback) => { - const { note, fromUser, remoteActor } = noteInfo; + (noteInfo, deliveryEndpoints, callback) => { + const { note, fromUser } = noteInfo; + + // + // Update the Note's addressing: + // - Private: + // to: sharedInboxEndpoints[0] + // - Public: + // to: https://www.w3.org/ns/activitystreams#Public + // ... and the message.getRemoteToUser() value *if* + // the flavor is deemed ActivityPub + // cc: [sharedInboxEndpoints] + // + if (message.isPrivate()) { + note.to = deliveryEndpoints.sharedInboxes[0]; + } else { + if (deliveryEndpoints.additionalTo) { + note.to = [ + PublicCollectionId, + deliveryEndpoints.additionalTo, + ]; + } else { + note.to = PublicCollectionId; + } + note.cc = [ + deliveryEndpoints.followers, + ...deliveryEndpoints.sharedInboxes, + ]; + + if (note.to.length < 2 && note.cc.length < 2) { + // If we only have a generic 'followers' endpoint, there is no where to send to + return callback(null, activity, fromUser); + } + } const activity = Activity.makeCreate( this._webServer(), @@ -60,41 +130,59 @@ exports.getModule = class ActivityPubScannerTosser extends MessageScanTossModule note ); - // :TODO: Implement retry logic (connection issues, retryable HTTP status) ?? - const inbox = remoteActor.inbox; + let allEndpoints = deliveryEndpoints.sharedInboxes; + if (deliveryEndpoints.additionalTo) { + allEndpoints.push(deliveryEndpoints.additionalTo); + } + allEndpoints = Array.from(new Set(allEndpoints)); // unique again - // const inbox = remoteActor.endpoints.sharedInbox; - // activity.object.to = 'https://www.w3.org/ns/activitystreams#Public'; + async.eachLimit( + allEndpoints, + 4, + (inbox, nextInbox) => { + activity.sendTo( + inbox, + fromUser, + this._webServer(), + (err, respBody, res) => { + if (err) { + this.log.warn( + { + inbox, + error: err.message, + }, + 'Failed to send "Note" Activity to Inbox' + ); + } else if ( + res.statusCode === 200 || + res.statusCode === 202 + ) { + this.log.debug( + { inbox, uuid: message.uuid }, + 'Message delivered to Inbox' + ); + } else { + this.log.warn( + { + inbox, + statusCode: res.statusCode, + body: _.truncate(respBody, 128), + }, + 'Unexpected status code' + ); + } - activity.sendTo( - inbox, - fromUser, - this._webServer(), - (err, respBody, res) => { - if (err) { - this.log.warn( - { error: err.message, inbox: remoteActor.inbox }, - 'Failed to send "Note" Activity to Inbox' - ); - } else if (res.statusCode !== 202 && res.statusCode !== 200) { - this.log.warn( - { - inbox: remoteActor.inbox, - statusCode: res.statusCode, - body: _.truncate(respBody, 128), - }, - 'Unexpected status code' - ); - } - - // carry on regardless if we sent and record - // the item in the user's Outbox collection + // If we can't send now, no harm, we'll record to the outbox + return nextInbox(null); + } + ); + }, + () => { return callback(null, activity, fromUser); } ); }, (activity, fromUser, callback) => { - // If we failed to send above, Collection.addOutboxItem( fromUser, activity, @@ -166,6 +254,90 @@ exports.getModule = class ActivityPubScannerTosser extends MessageScanTossModule ); } + _collectDeliveryEndpoints(message, localUser, cb) { + this._collectFollowersSharedInboxEndpoints( + localUser, + (err, endpoints, followersEndpoint) => { + if (err) { + return cb(err); + } + + // + // Don't inspect the remote address/remote to + // Here; We already know this in a public + // area. Instead, see if the user typed in + // a reasonable AP address here. If so, we'll + // try to send directly to them as well. + // + const addrInfo = getAddressedToInfo(message.toUserName); + if ( + !message.isPrivate() && + AddressFlavor.ActivityPub === addrInfo.flavor + ) { + Actor.fromId(addrInfo.remote, (err, actor) => { + if (err) { + return cb(err); + } + + return cb(null, { + additionalTo: actor.id, + sharedInboxes: endpoints, + followers: followersEndpoint, + }); + }); + } else { + return cb(null, { + sharedInboxes: endpoints, + followers: followersEndpoint, + }); + } + } + ); + } + + _collectFollowersSharedInboxEndpoints(localUser, cb) { + const localFollowersEndpoint = + makeUserUrl(this._webServer(), localUser, '/ap/users/') + '/followers'; + + Collection.followers(localFollowersEndpoint, 'all', (err, collection) => { + if (err) { + return cb(err); + } + + if (!collection.orderedItems || collection.orderedItems.length < 1) { + // no followers :( + return cb(null, []); + } + + async.mapLimit( + collection.orderedItems, + 4, + (actorId, nextActorId) => { + Actor.fromId(actorId, (err, actor) => { + return nextActorId(err, actor); + }); + }, + (err, followerActors) => { + if (err) { + return cb(err); + } + + const sharedInboxEndpoints = Array.from( + new Set( + followerActors + .map(actor => { + return _.get(actor, 'endpoints.sharedInbox'); + }) + .filter(inbox => inbox) // drop nulls + ) + ); + + return cb(null, sharedInboxEndpoints, localFollowersEndpoint); + } + ); + }); + } + _isEnabled() { // :TODO: check config to see if AP integration is enabled/etc. return this._webServer(); @@ -183,7 +355,13 @@ exports.getModule = class ActivityPubScannerTosser extends MessageScanTossModule return true; } - // :TODO: Implement the area mapping check for public + // Public items do not need a specific 'to'; we'll record to the + // local Actor's outbox and send to any followers we know about + if (message.areaTag === WellKnownAreaTags.ActivityPubShared) { + return true; + } + + // :TODO: Implement the area mapping check for public 'groups' return false; } diff --git a/core/servers/content/web.js b/core/servers/content/web.js index 8042ec47..4c3a551c 100644 --- a/core/servers/content/web.js +++ b/core/servers/content/web.js @@ -326,6 +326,14 @@ exports.getModule = class WebServerModule extends ServerModule { }); } + ok(resp, body = '', headers = { 'Content-Type:': 'text/html' }) { + if (body && !headers['Content-Length']) { + headers['Content-Length'] = Buffer(body).length; + } + resp.writeHead(200, 'OK', body ? headers : null); + return resp.end(body); + } + created(resp, body = '', headers = { 'Content-Type:': 'text/html' }) { resp.writeHead(201, 'Created', body ? headers : null); return resp.end(body); diff --git a/core/servers/content/web_handlers/activitypub.js b/core/servers/content/web_handlers/activitypub.js index 3527233d..b01fbc8d 100644 --- a/core/servers/content/web_handlers/activitypub.js +++ b/core/servers/content/web_handlers/activitypub.js @@ -189,20 +189,14 @@ exports.getModule = class ActivityPubWebHandler extends WebHandlerModule { req.on('end', () => { const activity = Activity.fromJsonString(Buffer.concat(body).toString()); - if (!activity) { + if (!activity || !activity.isValid()) { this.log.error( { url: req.url, method: req.method, endpoint: 'inbox' }, - 'Failed to parse Activity' - ); - return this.webServer.resourceNotFound(resp); - } - - if (!activity.isValid()) { - this.log.warn( - { activity, endpoint: 'inbox' }, 'Invalid or unsupported Activity' ); - return this.webServer.badRequest(resp); + return activity + ? this.webServer.badRequest(resp) + : this.webServer.notImplemented(resp); } switch (activity.type) { @@ -247,7 +241,7 @@ exports.getModule = class ActivityPubWebHandler extends WebHandlerModule { break; } - return this.webServer.resourceNotFound(resp); + return this.webServer.notImplemented(resp); }); } @@ -259,20 +253,14 @@ exports.getModule = class ActivityPubWebHandler extends WebHandlerModule { req.on('end', () => { const activity = Activity.fromJsonString(Buffer.concat(body).toString()); - if (!activity) { + if (!activity || !activity.isValid()) { this.log.error( { url: req.url, method: req.method, endpoint: 'sharedInbox' }, - 'Failed to parse Activity' - ); - return this.webServer.resourceNotFound(resp); - } - - if (!activity.isValid()) { - this.log.warn( - { activity, endpoint: 'sharedInbox' }, 'Invalid or unsupported Activity' ); - return this.webServer.badRequest(resp); + return activity + ? this.webServer.badRequest(resp) + : this.webServer.notImplemented(resp); } switch (activity.type) { @@ -287,8 +275,11 @@ exports.getModule = class ActivityPubWebHandler extends WebHandlerModule { { type: activity.type }, 'Invalid or unknown Activity type' ); - return this.webServer.resourceNotFound(resp); + break; } + + // don't understand the 'type' + return this.webServer.notImplemented(resp); }); } @@ -309,28 +300,28 @@ exports.getModule = class ActivityPubWebHandler extends WebHandlerModule { } inboxUpdateObject(inboxType, req, resp, activity) { - const objectIdToUpdate = _.get(activity, 'object.id'); + const updateObjectId = _.get(activity, 'object.id'); const objectType = _.get(activity, 'object.type'); this.log.info( - { inboxType, objectId: objectIdToUpdate, type: objectType }, + { inboxType, objectId: updateObjectId, type: objectType }, 'Inbox Object "Update" request' ); // :TODO: other types... - if (!objectIdToUpdate || !['Note'].includes(objectType)) { - return this.webServer.resourceNotFound(resp); + if (!updateObjectId || !['Note'].includes(objectType)) { + return this.webServer.notImplemented(resp); } // Note's are wrapped in Create Activities - Collection.objectByEmbeddedId(objectIdToUpdate, (err, obj) => { + Collection.objectByEmbeddedId(updateObjectId, (err, obj) => { if (err) { return this.webServer.internalServerError(resp, err); } if (!obj) { - // no match - return this.webServer.resourceNotFound(resp); + // no match, but respond as accepted and hopefully they don't ask again + return this.webServer.accepted(resp); } // OK, the object exists; Does the caller have permission @@ -346,7 +337,7 @@ exports.getModule = class ActivityPubWebHandler extends WebHandlerModule { if (updateTargetUrl.host !== updaterUrl.host) { this.log.warn( { - objectId: objectIdToUpdate, + objectId: updateObjectId, type: objectType, updateTargetHost: updateTargetUrl.host, requestorHost: updaterUrl.host, @@ -358,7 +349,7 @@ exports.getModule = class ActivityPubWebHandler extends WebHandlerModule { Collection.updateCollectionEntry( 'inbox', - objectIdToUpdate, + updateObjectId, activity, err => { if (err) { @@ -367,7 +358,7 @@ exports.getModule = class ActivityPubWebHandler extends WebHandlerModule { this.log.info( { - objectId: objectIdToUpdate, + objectId: updateObjectId, type: objectType, collection: 'inbox', }, @@ -444,7 +435,7 @@ exports.getModule = class ActivityPubWebHandler extends WebHandlerModule { return this._storeNoteAsMessage( activity.id, 'All', - Message.WellKnownAreaTags.ActivityPubSharedInbox, + Message.WellKnownAreaTags.ActivityPubShared, note, cb ); @@ -527,13 +518,9 @@ exports.getModule = class ActivityPubWebHandler extends WebHandlerModule { } const body = JSON.stringify(collection); - const headers = { + return this.webServer.ok(resp, body, { 'Content-Type': ActivityStreamMediaType, - 'Content-Length': Buffer(body).length, - }; - - resp.writeHead(200, headers); - return resp.end(body); + }); }); } @@ -568,8 +555,7 @@ exports.getModule = class ActivityPubWebHandler extends WebHandlerModule { // :TODO: support a template here - resp.writeHead(200, { 'Content-Type': 'text/html' }); - return resp.end(note.content); + return this.webServer.ok(resp, note.content); }); } @@ -619,11 +605,6 @@ exports.getModule = class ActivityPubWebHandler extends WebHandlerModule { 'Follow request' ); - const ok = () => { - resp.writeHead(200, { 'Content-Type': 'text/html' }); - return resp.end(''); - }; - // // If the user blindly accepts Followers, we can persist // and send an 'Accept' now. Otherwise, we need to queue this @@ -633,7 +614,7 @@ exports.getModule = class ActivityPubWebHandler extends WebHandlerModule { const activityPubSettings = ActivityPubSettings.fromUser(localUser); if (!activityPubSettings.manuallyApproveFollowers) { this._recordAcceptedFollowRequest(localUser, remoteActor, activity); - return ok(); + return this.webServer.ok(resp); } else { Collection.addFollowRequest( localUser, @@ -645,7 +626,7 @@ exports.getModule = class ActivityPubWebHandler extends WebHandlerModule { return this.internalServerError(resp, err); } - return ok(); + return this.webServer.ok(resp); } ); } @@ -836,13 +817,7 @@ exports.getModule = class ActivityPubWebHandler extends WebHandlerModule { const body = JSON.stringify(localActor); - const headers = { - 'Content-Type': ActivityStreamMediaType, - 'Content-Length': Buffer(body).length, - }; - - resp.writeHead(200, headers); - return resp.end(body); + return this.webServer.ok(resp, body, { 'Content-Type': ActivityStreamMediaType }); } _standardSelfHandler(localUser, localActor, req, resp) { @@ -872,13 +847,7 @@ exports.getModule = class ActivityPubWebHandler extends WebHandlerModule { `Serving ActivityPub Profile for "${localUser.username}"` ); - const headers = { - 'Content-Type': contentType, - 'Content-Length': Buffer(body).length, - }; - - resp.writeHead(200, headers); - return resp.end(body); + return this.webServer.ok(resp, body, { 'Content-Type': contentType }); } ); }