* Fix major issue with SQLite transactions + aync code causing collisions

This commit is contained in:
Bryan Ashby 2017-09-16 17:13:11 -06:00
parent 68247d87e8
commit 1e250f06d9
8 changed files with 116 additions and 122 deletions

View File

@ -6,6 +6,7 @@ const conf = require('./config.js');
// deps
const sqlite3 = require('sqlite3');
const sqlite3Trans = require('sqlite3-transactions');
const paths = require('path');
const async = require('async');
const _ = require('lodash');
@ -13,14 +14,19 @@ const assert = require('assert');
const moment = require('moment');
// database handles
let dbs = {};
const dbs = {};
exports.getTransactionDatabase = getTransactionDatabase;
exports.getModDatabasePath = getModDatabasePath;
exports.getISOTimestampString = getISOTimestampString;
exports.initializeDatabases = initializeDatabases;
exports.dbs = dbs;
function getTransactionDatabase(db) {
return new sqlite3Trans.TransactionDatabase(db);
}
function getDatabasePath(name) {
return paths.join(conf.config.paths.db, `${name}.sqlite3`);
}
@ -55,7 +61,7 @@ function getISOTimestampString(ts) {
function initializeDatabases(cb) {
async.eachSeries( [ 'system', 'user', 'message', 'file' ], (dbName, next) => {
dbs[dbName] = new sqlite3.Database(getDatabasePath(dbName), err => {
dbs[dbName] = new sqlite3Trans.TransactionDatabase(new sqlite3.Database(getDatabasePath(dbName), err => {
if(err) {
return cb(err);
}
@ -65,7 +71,7 @@ function initializeDatabases(cb) {
return next(null);
});
});
});
}));
}, err => {
return cb(err);
});

View File

@ -110,9 +110,8 @@ module.exports = class FileEntry {
}
const self = this;
let inTransaction = false;
async.series(
async.waterfall(
[
function check(callback) {
if(isUpdate && !self.fileId) {
@ -121,22 +120,20 @@ module.exports = class FileEntry {
return callback(null);
},
function startTrans(callback) {
return fileDb.run('BEGIN;', callback);
return fileDb.beginTransaction(callback);
},
function storeEntry(callback) {
inTransaction = true;
function storeEntry(trans, callback) {
if(isUpdate) {
fileDb.run(
trans.run(
`REPLACE INTO file (file_id, area_tag, file_sha256, file_name, storage_tag, desc, desc_long, upload_timestamp)
VALUES(?, ?, ?, ?, ?, ?, ?, ?);`,
[ self.fileId, self.areaTag, self.fileSha256, self.fileName, self.storageTag, self.desc, self.descLong, getISOTimestampString() ],
err => {
return callback(err);
return callback(err, trans);
}
);
} else {
fileDb.run(
trans.run(
`REPLACE INTO file (area_tag, file_sha256, file_name, storage_tag, desc, desc_long, upload_timestamp)
VALUES(?, ?, ?, ?, ?, ?, ?);`,
[ self.areaTag, self.fileSha256, self.fileName, self.storageTag, self.desc, self.descLong, getISOTimestampString() ],
@ -144,34 +141,34 @@ module.exports = class FileEntry {
if(!err) {
self.fileId = this.lastID;
}
return callback(err);
return callback(err, trans);
}
);
}
},
function storeMeta(callback) {
function storeMeta(trans, callback) {
async.each(Object.keys(self.meta), (n, next) => {
const v = self.meta[n];
return FileEntry.persistMetaValue(self.fileId, n, v, next);
return FileEntry.persistMetaValue(self.fileId, n, v, trans, next);
},
err => {
return callback(err);
return callback(err, trans);
});
},
function storeHashTags(callback) {
function storeHashTags(trans, callback) {
const hashTagsArray = Array.from(self.hashTags);
async.each(hashTagsArray, (hashTag, next) => {
return FileEntry.persistHashTag(self.fileId, hashTag, next);
return FileEntry.persistHashTag(self.fileId, hashTag, trans, next);
},
err => {
return callback(err);
return callback(err, trans);
});
}
],
err => {
(err, trans) => {
// :TODO: Log orig err
if(inTransaction) {
fileDb.run(err ? 'ROLLBACK;' : 'COMMIT;', err => {
if(trans) {
trans[err ? 'rollback' : 'commit'](err => {
return cb(err);
});
} else {
@ -207,8 +204,13 @@ module.exports = class FileEntry {
);
}
static persistMetaValue(fileId, name, value, cb) {
return fileDb.run(
static persistMetaValue(fileId, name, value, transOrDb, cb) {
if(!_.isFunction(cb) && _.isFunction(transOrDb)) {
cb = transOrDb;
transOrDb = fileDb;
}
return transOrDb.run(
`REPLACE INTO file_meta (file_id, meta_name, meta_value)
VALUES (?, ?, ?);`,
[ fileId, name, value ],
@ -249,15 +251,20 @@ module.exports = class FileEntry {
);
}
static persistHashTag(fileId, hashTag, cb) {
fileDb.serialize( () => {
static persistHashTag(fileId, hashTag, transOrDb, cb) {
if(!_.isFunction(cb) && _.isFunction(transOrDb)) {
cb = transOrDb;
transOrDb = fileDb;
}
transOrDb.serialize( () => {
fileDb.run(
`INSERT OR IGNORE INTO hash_tag (hash_tag)
VALUES (?);`,
[ hashTag ]
);
fileDb.run(
transOrDb.run(
`REPLACE INTO file_hash_tag (hash_tag_id, file_id)
VALUES (
(SELECT hash_tag_id

View File

@ -321,8 +321,13 @@ Message.prototype.load = function(options, cb) {
);
};
Message.prototype.persistMetaValue = function(category, name, value, cb) {
const metaStmt = msgDb.prepare(
Message.prototype.persistMetaValue = function(category, name, value, transOrDb, cb) {
if(!_.isFunction(cb) && _.isFunction(transOrDb)) {
cb = transOrDb;
transOrDb = msgDb;
}
const metaStmt = transOrDb.prepare(
`INSERT INTO message_meta (message_id, meta_category, meta_name, meta_value)
VALUES (?, ?, ?, ?);`);
@ -341,18 +346,6 @@ Message.prototype.persistMetaValue = function(category, name, value, cb) {
});
};
Message.startTransaction = function(cb) {
msgDb.run('BEGIN;', err => {
cb(err);
});
};
Message.endTransaction = function(hadError, cb) {
msgDb.run(hadError ? 'ROLLBACK;' : 'COMMIT;', err => {
cb(err);
});
};
Message.prototype.persist = function(cb) {
if(!this.isValid()) {
@ -361,14 +354,12 @@ Message.prototype.persist = function(cb) {
const self = this;
async.series(
async.waterfall(
[
function beginTransaction(callback) {
Message.startTransaction(err => {
return callback(err);
});
return msgDb.beginTransaction(callback);
},
function storeMessage(callback) {
function storeMessage(trans, callback) {
// generate a UUID for this message if required (general case)
const msgTimestamp = moment();
if(!self.uuid) {
@ -379,7 +370,7 @@ Message.prototype.persist = function(cb) {
self.message);
}
msgDb.run(
trans.run(
`INSERT INTO message (area_tag, message_uuid, reply_to_message_id, to_user_name, from_user_name, subject, message, modified_timestamp)
VALUES (?, ?, ?, ?, ?, ?, ?, ?);`,
[ self.areaTag, self.uuid, self.replyToMsgId, self.toUserName, self.fromUserName, self.subject, self.message, getISOTimestampString(msgTimestamp) ],
@ -388,13 +379,13 @@ Message.prototype.persist = function(cb) {
self.messageId = this.lastID;
}
return callback(err);
return callback(err, trans);
}
);
},
function storeMeta(callback) {
function storeMeta(trans, callback) {
if(!self.meta) {
return callback(null);
return callback(null, trans);
}
/*
Example of self.meta:
@ -410,7 +401,7 @@ Message.prototype.persist = function(cb) {
*/
async.each(Object.keys(self.meta), (category, nextCat) => {
async.each(Object.keys(self.meta[category]), (name, nextName) => {
self.persistMetaValue(category, name, self.meta[category][name], err => {
self.persistMetaValue(category, name, self.meta[category][name], trans, err => {
nextName(err);
});
}, err => {
@ -418,18 +409,22 @@ Message.prototype.persist = function(cb) {
});
}, err => {
callback(err);
callback(err, trans);
});
},
function storeHashTags(callback) {
function storeHashTags(trans, callback) {
// :TODO: hash tag support
return callback(null);
return callback(null, trans);
}
],
err => {
Message.endTransaction(err, transErr => {
return cb(err ? err : transErr, self.messageId);
});
(err, trans) => {
if(trans) {
trans[err ? 'rollback' : 'commit'](transErr => {
return cb(err ? err : transErr, self.messageId);
});
} else {
return cb(err);
}
}
);
};

View File

@ -189,15 +189,13 @@ module.exports = class User {
// :TODO: set various defaults, e.g. default activation status, etc.
self.properties.account_status = Config.users.requireActivation ? User.AccountStatus.inactive : User.AccountStatus.active;
async.series(
async.waterfall(
[
function beginTransaction(callback) {
userDb.run('BEGIN;', err => {
return callback(err);
});
return userDb.beginTransaction(callback);
},
function createUserRec(callback) {
userDb.run(
function createUserRec(trans, callback) {
trans.run(
`INSERT INTO user (user_name)
VALUES (?);`,
[ self.username ],
@ -213,11 +211,11 @@ module.exports = class User {
self.properties.account_status = User.AccountStatus.active;
}
return callback(null);
return callback(null, trans);
}
);
},
function genAuthCredentials(callback) {
function genAuthCredentials(trans, callback) {
User.generatePasswordDerivedKeyAndSalt(password, (err, info) => {
if(err) {
return callback(err);
@ -225,85 +223,56 @@ module.exports = class User {
self.properties.pw_pbkdf2_salt = info.salt;
self.properties.pw_pbkdf2_dk = info.dk;
return callback(null);
return callback(null, trans);
});
},
function setInitialGroupMembership(callback) {
function setInitialGroupMembership(trans, callback) {
self.groups = Config.users.defaultGroups;
if(User.RootUserID === self.userId) { // root/SysOp?
self.groups.push('sysops');
}
return callback(null);
return callback(null, trans);
},
function saveAll(callback) {
self.persist(false, err => {
return callback(err);
function saveAll(trans, callback) {
self.persistWithTransaction(trans, err => {
return callback(err, trans);
});
}
],
err => {
if(err) {
const originalError = err;
userDb.run('ROLLBACK;', err => {
assert(!err);
return cb(originalError);
(err, trans) => {
if(trans) {
trans[err ? 'rollback' : 'commit'](transErr => {
return cb(err ? err : transErr);
});
} else {
userDb.run('COMMIT;', err => {
return cb(err);
});
return cb(err);
}
}
);
}
persist(useTransaction, cb) {
persistWithTransaction(trans, cb) {
assert(this.userId > 0);
const self = this;
async.series(
[
function beginTransaction(callback) {
if(useTransaction) {
userDb.run('BEGIN;', err => {
return callback(err);
});
} else {
return callback(null);
}
},
function saveProps(callback) {
self.persistProperties(self.properties, err => {
self.persistProperties(self.properties, trans, err => {
return callback(err);
});
},
function saveGroups(callback) {
userGroup.addUserToGroups(self.userId, self.groups, err => {
userGroup.addUserToGroups(self.userId, self.groups, trans, err => {
return callback(err);
});
}
],
err => {
if(err) {
if(useTransaction) {
userDb.run('ROLLBACK;', err => {
return cb(err);
});
} else {
return cb(err);
}
} else {
if(useTransaction) {
userDb.run('COMMIT;', err => {
return cb(err);
});
} else {
return cb(null);
}
}
return cb(err);
}
);
}
@ -340,13 +309,18 @@ module.exports = class User {
);
}
persistProperties(properties, cb) {
persistProperties(properties, transOrDb, cb) {
if(!_.isFunction(cb) && _.isFunction(transOrDb)) {
cb = transOrDb;
transOrDb = userDb;
}
const self = this;
// update live props
_.merge(this.properties, properties);
const stmt = userDb.prepare(
const stmt = transOrDb.prepare(
`REPLACE INTO user_property (user_id, prop_name, prop_value)
VALUES (?, ?, ?);`
);

View File

@ -33,8 +33,13 @@ function getGroupsForUser(userId, cb) {
});
}
function addUserToGroup(userId, groupName, cb) {
userDb.run(
function addUserToGroup(userId, groupName, transOrDb, cb) {
if(!_.isFunction(cb) && _.isFunction(transOrDb)) {
cb = transOrDb;
transOrDb = userDb;
}
transOrDb.run(
'REPLACE INTO user_group_member (group_name, user_id) ' +
'VALUES(?, ?);',
[ groupName, userId ],
@ -44,10 +49,10 @@ function addUserToGroup(userId, groupName, cb) {
);
}
function addUserToGroups(userId, groups, cb) {
function addUserToGroups(userId, groups, transOrDb, cb) {
async.each(groups, function item(groupName, next) {
addUserToGroup(userId, groupName, next);
addUserToGroup(userId, groupName, transOrDb, next);
}, function complete(err) {
cb(err);
});

View File

@ -3,7 +3,10 @@
// ENiGMA½
const MenuModule = require('../core/menu_module.js').MenuModule;
const getModDatabasePath = require('../core/database.js').getModDatabasePath;
const {
getModDatabasePath,
getTransactionDatabase
} = require('../core/database.js').getModDatabasePath;
const ViewController = require('../core/view_controller.js').ViewController;
const ansi = require('../core/ansi_term.js');
const theme = require('../core/theme.js');
@ -392,10 +395,10 @@ exports.getModule = class BBSListModule extends MenuModule {
async.series(
[
function openDatabase(callback) {
self.database = new sqlite3.Database(
self.database = getTransactionDatabase(new sqlite3.Database(
getModDatabasePath(moduleInfo),
callback
);
));
},
function createTables(callback) {
self.database.serialize( () => {

View File

@ -3,7 +3,10 @@
// ENiGMA½
const MenuModule = require('../core/menu_module.js').MenuModule;
const getModDatabasePath = require('../core/database.js').getModDatabasePath;
const {
getModDatabasePath,
getTransactionDatabase
} = require('../core/database.js').getModDatabasePath;
const ViewController = require('../core/view_controller.js').ViewController;
const theme = require('../core/theme.js');
const ansi = require('../core/ansi_term.js');
@ -263,12 +266,12 @@ exports.getModule = class OnelinerzModule extends MenuModule {
async.series(
[
function openDatabase(callback) {
self.db = new sqlite3.Database(
self.db = getTransactionDatabase(new sqlite3.Database(
getModDatabasePath(exports.moduleInfo),
err => {
return callback(err);
}
);
));
},
function createTables(callback) {
self.db.run(

View File

@ -42,6 +42,7 @@
"ptyw.js": "NuSkooler/ptyw.js",
"sanitize-filename": "^1.6.1",
"sqlite3": "^3.1.9",
"sqlite3-transactions": "^0.0.5",
"ssh2": "^0.5.5",
"temptmp": "^1.0.0",
"uuid": "^3.1.0",