Persist exported/published messages to ActivityPub

This commit is contained in:
Bryan Ashby 2023-01-12 18:26:44 -07:00
parent 64848b4675
commit eaadd0a830
No known key found for this signature in database
GPG Key ID: C2C1B501E4EFD994
6 changed files with 195 additions and 75 deletions

View File

@ -54,14 +54,14 @@ module.exports = class Activity {
return false; return false;
} }
// :TODO: we could validate the particular types // :TODO: Additional validation
return true; return true;
} }
// https://www.w3.org/TR/activitypub/#accept-activity-inbox // https://www.w3.org/TR/activitypub/#accept-activity-inbox
static makeAccept(webServer, localActor, followRequest, id = null) { static makeAccept(webServer, localActor, followRequest, id = null) {
id = id || Activity._makeId(webServer, '/accept'); id = id || Activity._makeFullId(webServer, 'accept');
return new Activity({ return new Activity({
type: 'Accept', type: 'Accept',
@ -107,27 +107,7 @@ module.exports = class Activity {
}, },
(localUser, localActor, remoteActor, callback) => { (localUser, localActor, remoteActor, callback) => {
// we'll need the entire |activityId| as a linked reference later // we'll need the entire |activityId| as a linked reference later
const activityId = Activity._makeId(webServer, '/create'); const activityId = Activity._makeFullId(webServer, 'create');
// |remoteActor| is non-null if we fetchd it
//const to = message.isPrivate() ? remoteActor ? remoteActor.id : `${ActivityStreamsContext}#Public`;
// const obj = {
// '@context': ActivityStreamsContext,
// id: activityId,
// type: 'Create',
// to: [remoteActor.id],
// audience: ['as:Public'],
// actor: localActor.id,
// object: {
// id: Activity._makeId(webServer, '/note'),
// type: 'Note',
// attributedTo: localActor.id,
// to: [remoteActor.id],
// audience: ['as:Public'],
// content: messageBodyToHtml(message.message.trim()),
// },
// };
const obj = { const obj = {
'@context': ActivityStreamsContext, '@context': ActivityStreamsContext,
@ -135,13 +115,12 @@ module.exports = class Activity {
type: 'Create', type: 'Create',
actor: localActor.id, actor: localActor.id,
object: { object: {
id: Activity._makeId(webServer, '/note'), id: Activity._makeFullId(webServer, 'note'),
type: 'Note', type: 'Note',
published: getISOTimestampString(message.modTimestamp), published: getISOTimestampString(message.modTimestamp),
attributedTo: localActor.id, attributedTo: localActor.id,
// :TODO: inReplyto if this is a reply; we need this store in message meta. // :TODO: inReplyto if this is a reply; we need this store in message meta.
// :TODO: we may want to turn this to a HTML fragment?
content: messageBodyToHtml(message.message.trim()), content: messageBodyToHtml(message.message.trim()),
}, },
}; };
@ -149,11 +128,11 @@ module.exports = class Activity {
// :TODO: this probably needs to change quite a bit based on "groups" // :TODO: this probably needs to change quite a bit based on "groups"
// :TODO: verify we need both 'to' fields: https://socialhub.activitypub.rocks/t/problems-posting-to-mastodon-inbox/801/4 // :TODO: verify we need both 'to' fields: https://socialhub.activitypub.rocks/t/problems-posting-to-mastodon-inbox/801/4
if (message.isPrivate()) { if (message.isPrivate()) {
obj.to = remoteActor.id; //obj.to = remoteActor.id;
obj.object.to = remoteActor.id; obj.object.to = remoteActor.id;
} else { } else {
const publicInbox = `${ActivityStreamsContext}#Public`; const publicInbox = `${ActivityStreamsContext}#Public`;
obj.to = publicInbox; //obj.to = publicInbox;
obj.object.to = publicInbox; obj.object.to = publicInbox;
} }
@ -194,7 +173,7 @@ module.exports = class Activity {
return postJson(actorUrl, activityJson, reqOpts, cb); return postJson(actorUrl, activityJson, reqOpts, cb);
} }
static _makeId(webServer, prefix = '') { static _makeFullId(webServer, prefix, uuid = '') {
return webServer.buildUrl(`${prefix}/${UUIDv4()}`); return webServer.buildUrl(`/${prefix}/${uuid || UUIDv4()}`);
} }
}; };

17
core/activitypub_db.js Normal file
View File

@ -0,0 +1,17 @@
const apDb = require('./database').dbs.activitypub;
exports.persistToOutbox = persistToOutbox;
function persistToOutbox(activity, userId, messageId, cb) {
const activityJson = JSON.stringify(activity);
apDb.run(
`INSERT INTO activitypub_outbox (activity_id, user_id, message_id, activity_json)
VALUES (?, ?, ?, ?);`,
[activity.id, userId, messageId, activityJson],
function res(err) {
// non-arrow for 'this' scope
return cb(err, this.lastID);
}
);
}

View File

@ -109,7 +109,7 @@ function sanitizeString(s) {
function initializeDatabases(cb) { function initializeDatabases(cb) {
async.eachSeries( async.eachSeries(
['system', 'user', 'actor', 'message', 'file'], ['system', 'user', 'message', 'file', 'activitypub'],
(dbName, next) => { (dbName, next) => {
dbs[dbName] = sqlite3Trans.wrap( dbs[dbName] = sqlite3Trans.wrap(
new sqlite3.Database(getDatabasePath(dbName), err => { new sqlite3.Database(getDatabasePath(dbName), err => {
@ -242,36 +242,36 @@ const DB_INIT_TABLE = {
return cb(null); return cb(null);
}, },
actor: cb => { // actor: cb => {
enableForeignKeys(dbs.actor); // enableForeignKeys(dbs.actor);
dbs.actor.run( // dbs.actor.run(
`CREATE TABLE IF NOT EXISTS activitypub_actor ( // `CREATE TABLE IF NOT EXISTS activitypub_actor (
id INTEGER PRIMARY KEY, // id INTEGER PRIMARY KEY,
actor_url VARCHAR NOT NULL, // actor_url VARCHAR NOT NULL,
UNIQUE(actor_url) // UNIQUE(actor_url)
);` // );`
); // );
// :TODO: create FK on delete/etc. // // :TODO: create FK on delete/etc.
dbs.actor.run( // dbs.actor.run(
`CREATE TABLE IF NOT EXISTS activitypub_actor_property ( // `CREATE TABLE IF NOT EXISTS activitypub_actor_property (
actor_id INTEGER NOT NULL, // actor_id INTEGER NOT NULL,
prop_name VARCHAR NOT NULL, // prop_name VARCHAR NOT NULL,
prop_value VARCHAR, // prop_value VARCHAR,
UNIQUE(actor_id, prop_name), // UNIQUE(actor_id, prop_name),
FOREIGN KEY(actor_id) REFERENCES actor(id) ON DELETE CASCADE // FOREIGN KEY(actor_id) REFERENCES actor(id) ON DELETE CASCADE
);` // );`
); // );
dbs.actor.run( // dbs.actor.run(
`CREATE INDEX IF NOT EXISTS activitypub_actor_property_id_and_name_index0 // `CREATE INDEX IF NOT EXISTS activitypub_actor_property_id_and_name_index0
ON activitypub_actor_property (actor_id, prop_name);` // ON activitypub_actor_property (actor_id, prop_name);`
); // );
return cb(null); // return cb(null);
}, // },
message: cb => { message: cb => {
enableForeignKeys(dbs.message); enableForeignKeys(dbs.message);
@ -499,6 +499,31 @@ dbs.message.run(
);` );`
); );
return cb(null);
},
activitypub: cb => {
dbs.activitypub.run(
`CREATE TABLE IF NOT EXISTS activitypub_outbox (
id INTEGER PRIMARY KEY, -- Local ID
activity_id VARCHAR NOT NULL, -- Fully qualified Activity ID/URL
user_id INTEGER NOT NULL, -- Local user ID
message_id INTEGER NOT NULL, -- Local message ID
activity_json VARCHAR NOT NULL, -- Activity in JSON format
UNIQUE(message_id, activity_id)
);`
);
dbs.activitypub.run(
`CREATE INDEX IF NOT EXISTS activitypub_outbox_user_id_index0
ON activitypub_outbox (user_id);`
);
dbs.activitypub.run(
`CREATE INDEX IF NOT EXISTS activitypub_outbox_activity_id_index0
ON activitypub_outbox (activity_id);`
);
return cb(null); return cb(null);
}, },
}; };

View File

@ -39,6 +39,16 @@ const WELL_KNOWN_AREA_TAGS = {
Bulletin: 'local_bulletin', Bulletin: 'local_bulletin',
}; };
const WellKnownMetaCategories = {
System: 'System',
FtnProperty: 'FtnProperty',
FtnKludge: 'FtnKludge',
QwkProperty: 'QwkProperty',
QwkKludge: 'QwkKludge',
ActivityPub: 'ActivityPub',
};
// Category: WellKnownMetaCategories.System ("System")
const SYSTEM_META_NAMES = { const SYSTEM_META_NAMES = {
LocalToUserID: 'local_to_user_id', LocalToUserID: 'local_to_user_id',
LocalFromUserID: 'local_from_user_id', LocalFromUserID: 'local_from_user_id',
@ -66,6 +76,7 @@ const STATE_FLAGS0 = {
}; };
// :TODO: these should really live elsewhere... // :TODO: these should really live elsewhere...
// Category: WellKnownMetaCategories.FtnProperty ("FtnProperty")
const FTN_PROPERTY_NAMES = { const FTN_PROPERTY_NAMES = {
// packet header oriented // packet header oriented
FtnOrigNode: 'ftn_orig_node', FtnOrigNode: 'ftn_orig_node',
@ -94,6 +105,7 @@ const FTN_PROPERTY_NAMES = {
FtnSeenBy: 'ftn_seen_by', // http://ftsc.org/docs/fts-0004.001 FtnSeenBy: 'ftn_seen_by', // http://ftsc.org/docs/fts-0004.001
}; };
// Category: WellKnownMetaCategories.QwkProperty
const QWKPropertyNames = { const QWKPropertyNames = {
MessageNumber: 'qwk_msg_num', MessageNumber: 'qwk_msg_num',
MessageStatus: 'qwk_msg_status', // See http://wiki.synchro.net/ref:qwk for a decent list MessageStatus: 'qwk_msg_status', // See http://wiki.synchro.net/ref:qwk for a decent list
@ -101,8 +113,10 @@ const QWKPropertyNames = {
InReplyToNum: 'qwk_in_reply_to_num', // note that we prefer the 'InReplyToMsgId' kludge if available InReplyToNum: 'qwk_in_reply_to_num', // note that we prefer the 'InReplyToMsgId' kludge if available
}; };
// Category: WellKnownMetaCategories.ActivityPub
const ActivityPubPropertyNames = { const ActivityPubPropertyNames = {
ActivityId: 'activitypub_activity_id', // Activity ID; FK to AP table entries ActivityId: 'activitypub_activity_id', // Activity ID; FK to AP table entries
InReplyTo: 'activitypub_in_reply_to', // Activity ID from 'inReplyTo' field
}; };
// :TODO: this is a ugly hack due to bad variable names - clean it up & just _.camelCase(k)! // :TODO: this is a ugly hack due to bad variable names - clean it up & just _.camelCase(k)!
@ -213,6 +227,10 @@ module.exports = class Message {
return (this.isPrivate() && user.userId === messageLocalUserId) || user.isSysOp(); return (this.isPrivate() && user.userId === messageLocalUserId) || user.isSysOp();
} }
static get WellKnownMetaCategories() {
return WellKnownMetaCategories;
}
static get WellKnownAreaTags() { static get WellKnownAreaTags() {
return WELL_KNOWN_AREA_TAGS; return WELL_KNOWN_AREA_TAGS;
} }
@ -237,6 +255,10 @@ module.exports = class Message {
return QWKPropertyNames; return QWKPropertyNames;
} }
static get ActivityPubPropertyNames() {
return ActivityPubPropertyNames;
}
setLocalToUserId(userId) { setLocalToUserId(userId) {
this.meta.System = this.meta.System || {}; this.meta.System = this.meta.System || {};
this.meta.System[Message.SystemMetaNames.LocalToUserID] = userId; this.meta.System[Message.SystemMetaNames.LocalToUserID] = userId;

View File

@ -2,6 +2,12 @@ const Activity = require('../activitypub_activity');
const Message = require('../message'); const Message = require('../message');
const { MessageScanTossModule } = require('../msg_scan_toss_module'); const { MessageScanTossModule } = require('../msg_scan_toss_module');
const { getServer } = require('../listening_server'); const { getServer } = require('../listening_server');
const Log = require('../logger').log;
const { persistToOutbox } = require('../activitypub_db');
// deps
const async = require('async');
const _ = require('lodash');
exports.moduleInfo = { exports.moduleInfo = {
name: 'ActivityPub', name: 'ActivityPub',
@ -12,6 +18,8 @@ exports.moduleInfo = {
exports.getModule = class ActivityPubScannerTosser extends MessageScanTossModule { exports.getModule = class ActivityPubScannerTosser extends MessageScanTossModule {
constructor() { constructor() {
super(); super();
this.log = Log.child({ module: 'ActivityPubScannerTosser' });
} }
startup(cb) { startup(cb) {
@ -27,28 +35,97 @@ exports.getModule = class ActivityPubScannerTosser extends MessageScanTossModule
return; return;
} }
Activity.noteFromLocalMessage(this._webServer(), message, (err, noteData) => { async.waterfall(
if (err) { [
// :TODO: Log me callback => {
} return Activity.noteFromLocalMessage(
this._webServer(),
message,
callback
);
},
(noteInfo, callback) => {
const { activity, fromUser, remoteActor } = noteInfo;
const { activity, fromUser, remoteActor } = noteData; persistToOutbox(
activity,
fromUser.userId,
message.messageId,
(err, localId) => {
if (!err) {
this.log.debug(
{ localId, activityId: activity.id },
'Note Activity persisted to database'
);
}
return callback(err, activity, fromUser, remoteActor);
}
);
},
(activity, fromUser, remoteActor, callback) => {
activity.sendTo(
remoteActor.inbox,
fromUser,
this._webServer(),
(err, respBody, res) => {
if (err) {
return callback(err);
}
// - persist Activity if (res.statusCode !== 202 && res.statusCode !== 200) {
// - sendTo this.log.warn(
// - update message properties: {
// * exported inbox: remoteActor.inbox,
// * ActivityPub ID -> activity table statusCode: res.statusCode,
activity.sendTo( body: _.truncate(respBody, 128),
remoteActor.inbox, },
fromUser, 'Unexpected status code'
this._webServer(), );
(err, respBody, res) => { }
if (err) {
} //
// We sent successfully; update some properties
// in the original message to indicate export
// and updated mapping of message -> Activity record
//
return callback(null, activity);
}
);
},
(activity, callback) => {
// mark exported
return message.persistMetaValue(
Message.WellKnownMetaCategories.System,
Message.SystemMetaNames.StateFlags0,
Message.StateFlags0.Exported.toString(),
err => {
return callback(err, activity);
}
);
},
(activity, callback) => {
// message -> Activity ID relation
return message.persistMetaValue(
Message.WellKnownMetaCategories.ActivityPub,
Message.ActivityPubPropertyNames.ActivityId,
activity.id,
err => {
return callback(err, activity);
}
);
},
],
(err, activity) => {
if (err) {
this.log.error(
{ error: err.message, messageId: message.messageId },
'Failed to export message to ActivityPub'
);
} else {
this.log.info({id: activity.id}, 'Note Activity exported (published) successfully');
} }
); }
}); );
} }
_isEnabled() { _isEnabled() {

View File

@ -245,7 +245,7 @@ exports.getModule = class ActivityPubWebHandler extends WebHandlerModule {
); );
} }
if (res.statusCode != 202) { if (res.statusCode !== 202 && res.statusCode !== 200) {
return this.log.warn( return this.log.warn(
{ {
inbox: actor.inbox, inbox: actor.inbox,