From eaadd0a8307cef33ea43adc3162219233701c68d Mon Sep 17 00:00:00 2001 From: Bryan Ashby Date: Thu, 12 Jan 2023 18:26:44 -0700 Subject: [PATCH] Persist exported/published messages to ActivityPub --- core/activitypub_activity.js | 37 ++---- core/activitypub_db.js | 17 +++ core/database.js | 77 ++++++++---- core/message.js | 22 ++++ core/scanner_tossers/activitypub.js | 115 +++++++++++++++--- .../content/web_handlers/activitypub.js | 2 +- 6 files changed, 195 insertions(+), 75 deletions(-) create mode 100644 core/activitypub_db.js diff --git a/core/activitypub_activity.js b/core/activitypub_activity.js index 3eca0c58..7f6dc8c2 100644 --- a/core/activitypub_activity.js +++ b/core/activitypub_activity.js @@ -54,14 +54,14 @@ module.exports = class Activity { return false; } - // :TODO: we could validate the particular types + // :TODO: Additional validation return true; } // https://www.w3.org/TR/activitypub/#accept-activity-inbox static makeAccept(webServer, localActor, followRequest, id = null) { - id = id || Activity._makeId(webServer, '/accept'); + id = id || Activity._makeFullId(webServer, 'accept'); return new Activity({ type: 'Accept', @@ -107,27 +107,7 @@ module.exports = class Activity { }, (localUser, localActor, remoteActor, callback) => { // we'll need the entire |activityId| as a linked reference later - const activityId = Activity._makeId(webServer, '/create'); - - // |remoteActor| is non-null if we fetchd it - //const to = message.isPrivate() ? remoteActor ? remoteActor.id : `${ActivityStreamsContext}#Public`; - - // const obj = { - // '@context': ActivityStreamsContext, - // id: activityId, - // type: 'Create', - // to: [remoteActor.id], - // audience: ['as:Public'], - // actor: localActor.id, - // object: { - // id: Activity._makeId(webServer, '/note'), - // type: 'Note', - // attributedTo: localActor.id, - // to: [remoteActor.id], - // audience: ['as:Public'], - // content: messageBodyToHtml(message.message.trim()), - // }, - // }; + const activityId = Activity._makeFullId(webServer, 'create'); const obj = { '@context': ActivityStreamsContext, @@ -135,13 +115,12 @@ module.exports = class Activity { type: 'Create', actor: localActor.id, object: { - id: Activity._makeId(webServer, '/note'), + id: Activity._makeFullId(webServer, 'note'), type: 'Note', published: getISOTimestampString(message.modTimestamp), attributedTo: localActor.id, // :TODO: inReplyto if this is a reply; we need this store in message meta. - // :TODO: we may want to turn this to a HTML fragment? content: messageBodyToHtml(message.message.trim()), }, }; @@ -149,11 +128,11 @@ module.exports = class Activity { // :TODO: this probably needs to change quite a bit based on "groups" // :TODO: verify we need both 'to' fields: https://socialhub.activitypub.rocks/t/problems-posting-to-mastodon-inbox/801/4 if (message.isPrivate()) { - obj.to = remoteActor.id; + //obj.to = remoteActor.id; obj.object.to = remoteActor.id; } else { const publicInbox = `${ActivityStreamsContext}#Public`; - obj.to = publicInbox; + //obj.to = publicInbox; obj.object.to = publicInbox; } @@ -194,7 +173,7 @@ module.exports = class Activity { return postJson(actorUrl, activityJson, reqOpts, cb); } - static _makeId(webServer, prefix = '') { - return webServer.buildUrl(`${prefix}/${UUIDv4()}`); + static _makeFullId(webServer, prefix, uuid = '') { + return webServer.buildUrl(`/${prefix}/${uuid || UUIDv4()}`); } }; diff --git a/core/activitypub_db.js b/core/activitypub_db.js new file mode 100644 index 00000000..2ba5ef36 --- /dev/null +++ b/core/activitypub_db.js @@ -0,0 +1,17 @@ +const apDb = require('./database').dbs.activitypub; + +exports.persistToOutbox = persistToOutbox; + +function persistToOutbox(activity, userId, messageId, cb) { + const activityJson = JSON.stringify(activity); + + apDb.run( + `INSERT INTO activitypub_outbox (activity_id, user_id, message_id, activity_json) + VALUES (?, ?, ?, ?);`, + [activity.id, userId, messageId, activityJson], + function res(err) { + // non-arrow for 'this' scope + return cb(err, this.lastID); + } + ); +} diff --git a/core/database.js b/core/database.js index 4e880935..4c24a480 100644 --- a/core/database.js +++ b/core/database.js @@ -109,7 +109,7 @@ function sanitizeString(s) { function initializeDatabases(cb) { async.eachSeries( - ['system', 'user', 'actor', 'message', 'file'], + ['system', 'user', 'message', 'file', 'activitypub'], (dbName, next) => { dbs[dbName] = sqlite3Trans.wrap( new sqlite3.Database(getDatabasePath(dbName), err => { @@ -242,36 +242,36 @@ const DB_INIT_TABLE = { return cb(null); }, - actor: cb => { - enableForeignKeys(dbs.actor); + // actor: cb => { + // enableForeignKeys(dbs.actor); - dbs.actor.run( - `CREATE TABLE IF NOT EXISTS activitypub_actor ( - id INTEGER PRIMARY KEY, - actor_url VARCHAR NOT NULL, - UNIQUE(actor_url) - );` - ); + // dbs.actor.run( + // `CREATE TABLE IF NOT EXISTS activitypub_actor ( + // id INTEGER PRIMARY KEY, + // actor_url VARCHAR NOT NULL, + // UNIQUE(actor_url) + // );` + // ); - // :TODO: create FK on delete/etc. + // // :TODO: create FK on delete/etc. - dbs.actor.run( - `CREATE TABLE IF NOT EXISTS activitypub_actor_property ( - actor_id INTEGER NOT NULL, - prop_name VARCHAR NOT NULL, - prop_value VARCHAR, - UNIQUE(actor_id, prop_name), - FOREIGN KEY(actor_id) REFERENCES actor(id) ON DELETE CASCADE - );` - ); + // dbs.actor.run( + // `CREATE TABLE IF NOT EXISTS activitypub_actor_property ( + // actor_id INTEGER NOT NULL, + // prop_name VARCHAR NOT NULL, + // prop_value VARCHAR, + // UNIQUE(actor_id, prop_name), + // FOREIGN KEY(actor_id) REFERENCES actor(id) ON DELETE CASCADE + // );` + // ); - dbs.actor.run( - `CREATE INDEX IF NOT EXISTS activitypub_actor_property_id_and_name_index0 - ON activitypub_actor_property (actor_id, prop_name);` - ); + // dbs.actor.run( + // `CREATE INDEX IF NOT EXISTS activitypub_actor_property_id_and_name_index0 + // ON activitypub_actor_property (actor_id, prop_name);` + // ); - return cb(null); - }, + // return cb(null); + // }, message: cb => { enableForeignKeys(dbs.message); @@ -499,6 +499,31 @@ dbs.message.run( );` ); + return cb(null); + }, + activitypub: cb => { + dbs.activitypub.run( + `CREATE TABLE IF NOT EXISTS activitypub_outbox ( + id INTEGER PRIMARY KEY, -- Local ID + activity_id VARCHAR NOT NULL, -- Fully qualified Activity ID/URL + user_id INTEGER NOT NULL, -- Local user ID + message_id INTEGER NOT NULL, -- Local message ID + activity_json VARCHAR NOT NULL, -- Activity in JSON format + + UNIQUE(message_id, activity_id) + );` + ); + + dbs.activitypub.run( + `CREATE INDEX IF NOT EXISTS activitypub_outbox_user_id_index0 + ON activitypub_outbox (user_id);` + ); + + dbs.activitypub.run( + `CREATE INDEX IF NOT EXISTS activitypub_outbox_activity_id_index0 + ON activitypub_outbox (activity_id);` + ); + return cb(null); }, }; diff --git a/core/message.js b/core/message.js index 9b300ffe..2e11e0f0 100644 --- a/core/message.js +++ b/core/message.js @@ -39,6 +39,16 @@ const WELL_KNOWN_AREA_TAGS = { Bulletin: 'local_bulletin', }; +const WellKnownMetaCategories = { + System: 'System', + FtnProperty: 'FtnProperty', + FtnKludge: 'FtnKludge', + QwkProperty: 'QwkProperty', + QwkKludge: 'QwkKludge', + ActivityPub: 'ActivityPub', +}; + +// Category: WellKnownMetaCategories.System ("System") const SYSTEM_META_NAMES = { LocalToUserID: 'local_to_user_id', LocalFromUserID: 'local_from_user_id', @@ -66,6 +76,7 @@ const STATE_FLAGS0 = { }; // :TODO: these should really live elsewhere... +// Category: WellKnownMetaCategories.FtnProperty ("FtnProperty") const FTN_PROPERTY_NAMES = { // packet header oriented FtnOrigNode: 'ftn_orig_node', @@ -94,6 +105,7 @@ const FTN_PROPERTY_NAMES = { FtnSeenBy: 'ftn_seen_by', // http://ftsc.org/docs/fts-0004.001 }; +// Category: WellKnownMetaCategories.QwkProperty const QWKPropertyNames = { MessageNumber: 'qwk_msg_num', MessageStatus: 'qwk_msg_status', // See http://wiki.synchro.net/ref:qwk for a decent list @@ -101,8 +113,10 @@ const QWKPropertyNames = { InReplyToNum: 'qwk_in_reply_to_num', // note that we prefer the 'InReplyToMsgId' kludge if available }; +// Category: WellKnownMetaCategories.ActivityPub const ActivityPubPropertyNames = { ActivityId: 'activitypub_activity_id', // Activity ID; FK to AP table entries + InReplyTo: 'activitypub_in_reply_to', // Activity ID from 'inReplyTo' field }; // :TODO: this is a ugly hack due to bad variable names - clean it up & just _.camelCase(k)! @@ -213,6 +227,10 @@ module.exports = class Message { return (this.isPrivate() && user.userId === messageLocalUserId) || user.isSysOp(); } + static get WellKnownMetaCategories() { + return WellKnownMetaCategories; + } + static get WellKnownAreaTags() { return WELL_KNOWN_AREA_TAGS; } @@ -237,6 +255,10 @@ module.exports = class Message { return QWKPropertyNames; } + static get ActivityPubPropertyNames() { + return ActivityPubPropertyNames; + } + setLocalToUserId(userId) { this.meta.System = this.meta.System || {}; this.meta.System[Message.SystemMetaNames.LocalToUserID] = userId; diff --git a/core/scanner_tossers/activitypub.js b/core/scanner_tossers/activitypub.js index 1ebaa452..3b20f173 100644 --- a/core/scanner_tossers/activitypub.js +++ b/core/scanner_tossers/activitypub.js @@ -2,6 +2,12 @@ const Activity = require('../activitypub_activity'); const Message = require('../message'); const { MessageScanTossModule } = require('../msg_scan_toss_module'); const { getServer } = require('../listening_server'); +const Log = require('../logger').log; +const { persistToOutbox } = require('../activitypub_db'); + +// deps +const async = require('async'); +const _ = require('lodash'); exports.moduleInfo = { name: 'ActivityPub', @@ -12,6 +18,8 @@ exports.moduleInfo = { exports.getModule = class ActivityPubScannerTosser extends MessageScanTossModule { constructor() { super(); + + this.log = Log.child({ module: 'ActivityPubScannerTosser' }); } startup(cb) { @@ -27,28 +35,97 @@ exports.getModule = class ActivityPubScannerTosser extends MessageScanTossModule return; } - Activity.noteFromLocalMessage(this._webServer(), message, (err, noteData) => { - if (err) { - // :TODO: Log me - } + async.waterfall( + [ + callback => { + return Activity.noteFromLocalMessage( + this._webServer(), + message, + callback + ); + }, + (noteInfo, callback) => { + const { activity, fromUser, remoteActor } = noteInfo; - const { activity, fromUser, remoteActor } = noteData; + persistToOutbox( + activity, + fromUser.userId, + message.messageId, + (err, localId) => { + if (!err) { + this.log.debug( + { localId, activityId: activity.id }, + 'Note Activity persisted to database' + ); + } + return callback(err, activity, fromUser, remoteActor); + } + ); + }, + (activity, fromUser, remoteActor, callback) => { + activity.sendTo( + remoteActor.inbox, + fromUser, + this._webServer(), + (err, respBody, res) => { + if (err) { + return callback(err); + } - // - persist Activity - // - sendTo - // - update message properties: - // * exported - // * ActivityPub ID -> activity table - activity.sendTo( - remoteActor.inbox, - fromUser, - this._webServer(), - (err, respBody, res) => { - if (err) { - } + if (res.statusCode !== 202 && res.statusCode !== 200) { + this.log.warn( + { + inbox: remoteActor.inbox, + statusCode: res.statusCode, + body: _.truncate(respBody, 128), + }, + 'Unexpected status code' + ); + } + + // + // We sent successfully; update some properties + // in the original message to indicate export + // and updated mapping of message -> Activity record + // + return callback(null, activity); + } + ); + }, + (activity, callback) => { + // mark exported + return message.persistMetaValue( + Message.WellKnownMetaCategories.System, + Message.SystemMetaNames.StateFlags0, + Message.StateFlags0.Exported.toString(), + err => { + return callback(err, activity); + } + ); + }, + (activity, callback) => { + // message -> Activity ID relation + return message.persistMetaValue( + Message.WellKnownMetaCategories.ActivityPub, + Message.ActivityPubPropertyNames.ActivityId, + activity.id, + err => { + return callback(err, activity); + } + ); + }, + ], + (err, activity) => { + if (err) { + this.log.error( + { error: err.message, messageId: message.messageId }, + 'Failed to export message to ActivityPub' + ); + } else { + this.log.info({id: activity.id}, 'Note Activity exported (published) successfully'); } - ); - }); + } + ); } _isEnabled() { diff --git a/core/servers/content/web_handlers/activitypub.js b/core/servers/content/web_handlers/activitypub.js index 6d4d60f1..ef36a692 100644 --- a/core/servers/content/web_handlers/activitypub.js +++ b/core/servers/content/web_handlers/activitypub.js @@ -245,7 +245,7 @@ exports.getModule = class ActivityPubWebHandler extends WebHandlerModule { ); } - if (res.statusCode != 202) { + if (res.statusCode !== 202 && res.statusCode !== 200) { return this.log.warn( { inbox: actor.inbox,