Replace old ActiveLock system with a slightly better one

CyTube has been crashing recently due to things attempting to release
the reference after the channel was already closed (apparently the
uncaughtException handler isn't called for this?).  This newer
implementation keeps track of what is ref'ing and unref'ing it, so it
can log an error if it detects a discrepancy.

Also changed the server to not delete the refCounter field from the
channel when it's unloaded, so that should reduce the number of errors
stemming from it being null/undefined.
This commit is contained in:
calzoneman 2015-12-25 17:07:25 -08:00
parent e88971a011
commit be4011cda1
6 changed files with 138 additions and 163 deletions

View File

@ -12,39 +12,65 @@ import * as ChannelStore from '../channel-storage/channelstore';
import { ChannelStateSizeError } from '../errors';
import Promise from 'bluebird';
/**
* Previously, async channel functions were riddled with race conditions due to
* an event causing the channel to be unloaded while a pending callback still
* needed to reference it.
*
* This solution should be better than constantly checking whether the channel
* has been unloaded in nested callbacks. The channel won't be unloaded until
* nothing needs it anymore. Conceptually similar to a reference count.
*/
function ActiveLock(channel) {
class ReferenceCounter {
constructor(channel) {
this.channel = channel;
this.count = 0;
this.channelName = channel.name;
this.refCount = 0;
this.references = {};
}
ActiveLock.prototype = {
lock: function () {
this.count++;
},
ref(caller) {
if (caller) {
if (this.references.hasOwnProperty(caller)) {
this.references[caller]++;
} else {
this.references[caller] = 1;
}
}
release: function () {
this.count--;
if (this.count === 0) {
/* sanity check */
if (this.channel.users.length > 0) {
Logger.errlog.log("Warning: ActiveLock count=0 but users.length > 0 (" +
"channel: " + this.channel.name + ")");
this.count = this.channel.users.length;
this.refCount++;
}
unref(caller) {
if (caller) {
if (this.references.hasOwnProperty(caller)) {
this.references[caller]--;
if (this.references[caller] === 0) {
delete this.references[caller];
}
} else {
Logger.errlog.log("ReferenceCounter::unref() called by caller [" +
caller + "] but this caller had no active references! " +
`(channel: ${this.channelName})`);
}
}
this.refCount--;
this.checkRefCount();
}
checkRefCount() {
if (this.refCount === 0) {
if (Object.keys(this.references).length > 0) {
Logger.errlog.log("ReferenceCounter::refCount reached 0 but still had " +
"active references: " +
JSON.stringify(Object.keys(this.references)) +
` (channel: ${this.channelName})`);
for (var caller in this.references) {
this.refCount += this.references[caller];
}
} else if (this.channel.users.length > 0) {
Logger.errlog.log("ReferenceCounter::refCount reached 0 but still had " +
this.channel.users.length + " active users" +
` (channel: ${this.channelName})`);
this.refCount = this.channel.users.length;
} else {
this.channel.emit("empty");
}
}
}
};
}
function Channel(name) {
MakeEmitter(this);
@ -54,7 +80,7 @@ function Channel(name) {
this.logger = new Logger.Logger(path.join(__dirname, "..", "..", "chanlogs",
this.uniqueName + ".log"));
this.users = [];
this.activeLock = new ActiveLock(this);
this.refCounter = new ReferenceCounter(this);
this.flags = 0;
var self = this;
db.channels.load(this, function (err) {
@ -238,15 +264,16 @@ Channel.prototype.saveState = function () {
};
Channel.prototype.checkModules = function (fn, args, cb) {
var self = this;
const self = this;
const refCaller = `Channel::checkModules/${fn}`;
this.waitFlag(Flags.C_READY, function () {
self.activeLock.lock();
self.refCounter.ref(refCaller);
var keys = Object.keys(self.modules);
var next = function (err, result) {
if (result !== ChannelModule.PASSTHROUGH) {
/* Either an error occured, or the module denied the user access */
cb(err, result);
self.activeLock.release();
self.refCounter.unref(refCaller);
return;
}
@ -254,7 +281,7 @@ Channel.prototype.checkModules = function (fn, args, cb) {
if (m === undefined) {
/* No more modules to check */
cb(null, ChannelModule.PASSTHROUGH);
self.activeLock.release();
self.refCounter.unref(refCaller);
return;
}
@ -278,13 +305,13 @@ Channel.prototype.notifyModules = function (fn, args) {
};
Channel.prototype.joinUser = function (user, data) {
var self = this;
const self = this;
self.activeLock.lock();
self.refCounter.ref("Channel::user");
self.waitFlag(Flags.C_READY, function () {
/* User closed the connection before the channel finished loading */
if (user.socket.disconnected) {
self.activeLock.release();
self.refCounter.unref("Channel::user");
return;
}
@ -293,7 +320,7 @@ Channel.prototype.joinUser = function (user, data) {
if (err) {
Logger.errlog.log("user.refreshAccount failed at Channel.joinUser");
Logger.errlog.log(err.stack);
self.activeLock.release();
self.refCounter.unref("Channel::user");
return;
}
@ -304,8 +331,10 @@ Channel.prototype.joinUser = function (user, data) {
}
function afterAccount() {
if (self.dead || user.socket.disconnected) {
if (self.activeLock) self.activeLock.release();
if (user.socket.disconnected) {
self.refCounter.unref("Channel::user");
return;
} else if (self.dead) {
return;
}
@ -318,9 +347,7 @@ Channel.prototype.joinUser = function (user, data) {
} else {
user.account.channelRank = 0;
user.account.effectiveRank = user.account.globalRank;
if (self.activeLock) {
self.activeLock.release();
}
self.refCounter.unref("Channel::user");
}
});
}
@ -408,7 +435,7 @@ Channel.prototype.partUser = function (user) {
});
this.sendUsercount(this.users);
this.activeLock.release();
this.refCounter.unref("Channel::user");
user.die();
};
@ -555,20 +582,20 @@ Channel.prototype.sendUserJoin = function (users, user) {
};
Channel.prototype.readLog = function (cb) {
var maxLen = 102400;
var file = this.logger.filename;
this.activeLock.lock();
var self = this;
const maxLen = 102400;
const file = this.logger.filename;
this.refCounter.ref("Channel::readLog");
const self = this;
fs.stat(file, function (err, data) {
if (err) {
self.activeLock.release();
self.refCounter.unref("readLog");
return cb(err, null);
}
var start = Math.max(data.size - maxLen, 0);
var end = data.size - 1;
const start = Math.max(data.size - maxLen, 0);
const end = data.size - 1;
var read = fs.createReadStream(file, {
const read = fs.createReadStream(file, {
start: start,
end: end
});
@ -579,7 +606,7 @@ Channel.prototype.readLog = function (cb) {
});
read.on("end", function () {
cb(null, buffer);
self.activeLock.release();
self.refCounter.unref("Channel::readLog");
});
});
};
@ -648,7 +675,7 @@ Channel.prototype.packInfo = function (isAdmin) {
}
if (isAdmin) {
data.activeLockCount = this.activeLock.count;
data.activeLockCount = this.refCounter.refCount;
}
var self = this;

View File

@ -73,9 +73,10 @@ KickBanModule.prototype.onUserPostJoin = function (user) {
return;
}
var chan = this.channel;
const chan = this.channel;
const refCaller = "KickBanModule::onUserPostJoin";
user.waitFlag(Flags.U_LOGGED_IN, function () {
chan.activeLock.lock();
chan.refCounter.ref(refCaller);
db.channels.isNameBanned(chan.name, user.getName(), function (err, banned) {
if (!err && banned) {
user.kick("You are banned from this channel.");
@ -84,7 +85,7 @@ KickBanModule.prototype.onUserPostJoin = function (user) {
"name is banned)");
}
}
chan.activeLock.release();
chan.refCounter.unref(refCaller);
});
});
@ -222,10 +223,10 @@ KickBanModule.prototype.handleCmdBan = function (user, msg, meta) {
var name = args.shift().toLowerCase();
var reason = args.join(" ");
var chan = this.channel;
chan.activeLock.lock();
const chan = this.channel;
chan.refCounter.ref("KickBanModule::handleCmdBan");
this.banName(user, name, reason, function (err) {
chan.activeLock.release();
chan.refCounter.unref("KickBanModule::handleCmdBan");
});
};
@ -249,10 +250,10 @@ KickBanModule.prototype.handleCmdIPBan = function (user, msg, meta) {
}
var reason = args.join(" ");
var chan = this.channel;
chan.activeLock.lock();
const chan = this.channel;
chan.refCounter.ref("KickBanModule::handleCmdIPBan");
this.banAll(user, name, range, reason, function (err) {
chan.activeLock.release();
chan.refCounter.unref("KickBanModule::handleCmdIPBan");
});
};
@ -416,9 +417,10 @@ KickBanModule.prototype.handleUnban = function (user, data) {
}
var self = this;
this.channel.activeLock.lock();
this.channel.refCounter.ref("KickBanModule::handleUnban");
db.channels.unbanId(this.channel.name, data.id, function (err) {
if (err) {
self.channel.refCounter.unref("KickBanModule::handleUnban");
return user.socket.emit("errorMsg", {
msg: err
});
@ -431,7 +433,7 @@ KickBanModule.prototype.handleUnban = function (user, data) {
self.channel.modules.chat.sendModMessage(user.getName() + " unbanned " +
data.name, banperm);
}
self.channel.activeLock.release();
self.channel.refCounter.unref("KickBanModule::handleUnban");
});
};

View File

@ -51,16 +51,19 @@ LibraryModule.prototype.handleUncache = function (user, data) {
return;
}
var chan = this.channel;
chan.activeLock.lock();
const chan = this.channel;
chan.refCounter.ref("LibraryModule::handleUncache");
db.channels.deleteFromLibrary(chan.name, data.id, function (err, res) {
if (chan.dead || err) {
if (chan.dead) {
return;
} else if (err) {
chan.refCounter.unref("LibraryModule::handleUncache");
return;
}
chan.logger.log("[library] " + user.getName() + " deleted " + data.id +
"from the library");
chan.activeLock.release();
chan.refCounter.unref("LibraryModule::handleUncache");
});
};

View File

@ -63,8 +63,8 @@ MediaRefresherModule.prototype.initVimeo = function (data, cb) {
return;
}
var self = this;
self.channel.activeLock.lock();
const self = this;
self.channel.refCounter.ref("MediaRefresherModule::initVimeo");
Vimeo.extract(data.id).then(function (direct) {
if (self.dead || self.channel.dead)
return;
@ -74,12 +74,13 @@ MediaRefresherModule.prototype.initVimeo = function (data, cb) {
self.channel.logger.log("[mediarefresher] Refreshed vimeo video with ID " +
data.id);
}
self.channel.activeLock.release();
if (cb) cb();
}).catch(function (err) {
Logger.errlog.log("Unexpected vimeo::extract() fail: " + err.stack);
if (cb) cb();
}).finally(() => {
self.channel.refCounter.unref("MediaRefresherModule::initVimeo");
});
};
@ -90,7 +91,7 @@ MediaRefresherModule.prototype.refreshGoogleDocs = function (media, cb) {
return;
}
self.channel.activeLock.lock();
self.channel.refCounter.ref("MediaRefresherModule::refreshGoogleDocs");
InfoGetter.getMedia(media.id, "gd", function (err, data) {
if (self.dead || self.channel.dead) {
return;
@ -108,7 +109,7 @@ MediaRefresherModule.prototype.refreshGoogleDocs = function (media, cb) {
self.channel.logger.log("[mediarefresher] Google Docs refresh failed " +
"(likely redirect to login page-- make sure it is shared " +
"correctly)");
self.channel.activeLock.release();
self.channel.refCounter.unref("MediaRefresherModule::refreshGoogleDocs");
if (cb) cb();
return;
case "Access Denied":
@ -119,7 +120,7 @@ MediaRefresherModule.prototype.refreshGoogleDocs = function (media, cb) {
case "Google Drive videos must be shared publicly":
self.channel.logger.log("[mediarefresher] Google Docs refresh failed: " +
err);
self.channel.activeLock.release();
self.channel.refCounter.unref("MediaRefresherModule::refreshGoogleDocs");
if (cb) cb();
return;
default:
@ -128,14 +129,14 @@ MediaRefresherModule.prototype.refreshGoogleDocs = function (media, cb) {
err);
Logger.errlog.log("Google Docs refresh failed for ID " + media.id +
": " + err);
self.channel.activeLock.release();
self.channel.refCounter.unref("MediaRefresherModule::refreshGoogleDocs");
if (cb) cb();
return;
}
}
if (media !== self._media) {
self.channel.activeLock.release();
self.channel.refCounter.unref("MediaRefresherModule::refreshGoogleDocs");
if (cb) cb();
return;
}
@ -143,7 +144,7 @@ MediaRefresherModule.prototype.refreshGoogleDocs = function (media, cb) {
self.channel.logger.log("[mediarefresher] Refreshed Google Docs video with ID " +
media.id);
media.meta = data.meta;
self.channel.activeLock.release();
self.channel.refCounter.unref("MediaRefresherModule::refreshGoogleDocs");
if (cb) cb();
});
};
@ -155,7 +156,7 @@ MediaRefresherModule.prototype.initGooglePlus = function (media, cb) {
return;
}
self.channel.activeLock.lock();
self.channel.refCounter.ref("MediaRefresherModule::initGooglePlus");
InfoGetter.getMedia(media.id, "gp", function (err, data) {
if (self.dead || self.channel.dead) {
return;
@ -177,7 +178,7 @@ MediaRefresherModule.prototype.initGooglePlus = function (media, cb) {
"and is shared publicly"):
self.channel.logger.log("[mediarefresher] Google+ refresh failed: " +
err);
self.channel.activeLock.release();
self.channel.refCounter.unref("MediaRefresherModule::initGooglePlus");
if (cb) cb();
return;
default:
@ -186,14 +187,14 @@ MediaRefresherModule.prototype.initGooglePlus = function (media, cb) {
err);
Logger.errlog.log("Google+ refresh failed for ID " + media.id +
": " + err);
self.channel.activeLock.release();
self.channel.refCounter.unref("MediaRefresherModule::initGooglePlus");
if (cb) cb();
return;
}
}
if (media !== self._media) {
self.channel.activeLock.release();
self.channel.refCounter.unref("MediaRefresherModule::initGooglePlus");
if (cb) cb();
return;
}
@ -201,7 +202,7 @@ MediaRefresherModule.prototype.initGooglePlus = function (media, cb) {
self.channel.logger.log("[mediarefresher] Refreshed Google+ video with ID " +
media.id);
media.meta = data.meta;
self.channel.activeLock.release();
self.channel.refCounter.unref("MediaRefresherModule::initGooglePlus");
if (cb) cb();
});
};

View File

@ -447,15 +447,15 @@ PlaylistModule.prototype.queueStandard = function (user, data) {
});
};
var self = this;
this.channel.activeLock.lock();
const self = this;
this.channel.refCounter.ref("PlaylistModule::queueStandard");
this.semaphore.queue(function (lock) {
var lib = self.channel.modules.library;
if (lib && self.channel.is(Flags.C_REGISTERED) && !util.isLive(data.type)) {
lib.getItem(data.id, function (err, item) {
if (err && err !== "Item not in library") {
error(err+"");
self.channel.activeLock.release();
self.channel.refCounter.unref("PlaylistModule::queueStandard");
return lock.release();
}
@ -464,7 +464,7 @@ PlaylistModule.prototype.queueStandard = function (user, data) {
data.shouldAddToLibrary = false;
self._addItem(item, data, user, function () {
lock.release();
self.channel.activeLock.release();
self.channel.refCounter.unref("PlaylistModule::queueStandard");
});
} else {
handleLookup();
@ -479,25 +479,13 @@ PlaylistModule.prototype.queueStandard = function (user, data) {
InfoGetter.getMedia(data.id, data.type, function (err, media) {
if (err) {
error(XSS.sanitizeText(String(err)));
if (self.channel && self.channel.activeLock) {
self.channel.activeLock.release();
} else {
Logger.errlog.log("Attempted release of channel lock after " +
"channel was already unloaded in queueStandard: " +
channelName + " " + data.type + ":" + data.id);
}
self.channel.refCounter.unref("PlaylistModule::queueStandard");
return lock.release();
}
self._addItem(media, data, user, function () {
lock.release();
if (self.channel && self.channel.activeLock) {
self.channel.activeLock.release();
} else {
Logger.errlog.log("Attempted release of channel lock after " +
"channel was already unloaded in queueStandard: " +
channelName + " " + data.type + ":" + data.id);
}
self.channel.refCounter.unref("PlaylistModule::queueStandard");
});
});
}
@ -536,12 +524,12 @@ PlaylistModule.prototype.queueYouTubePlaylist = function (user, data) {
}
}
self.channel.activeLock.lock();
self.channel.refCounter.ref("PlaylistModule::queueYouTubePlaylist");
vids.forEach(function (media) {
data.link = util.formatLink(media.id, media.type);
self._addItem(media, data, user);
});
self.channel.activeLock.release();
self.channel.refCounter.unref("PlaylistModule::queueYouTubePlaylist");
lock.release();
});
@ -560,7 +548,7 @@ PlaylistModule.prototype.handleDelete = function (user, data) {
}
var plitem = this.items.find(data);
self.channel.activeLock.lock();
self.channel.refCounter.ref("PlaylistModule::handleDelete");
this.semaphore.queue(function (lock) {
if (self._delete(data)) {
self.channel.logger.log("[playlist] " + user.getName() + " deleted " +
@ -568,7 +556,7 @@ PlaylistModule.prototype.handleDelete = function (user, data) {
}
lock.release();
self.channel.activeLock.release();
self.channel.refCounter.unref("PlaylistModule::handleDelete");
});
};
@ -602,27 +590,27 @@ PlaylistModule.prototype.handleMoveMedia = function (user, data) {
return;
}
var self = this;
self.channel.activeLock.lock();
const self = this;
self.channel.refCounter.ref("PlaylistModule::handleMoveMedia");
self.semaphore.queue(function (lock) {
if (!self.items.remove(data.from)) {
self.channel.activeLock.release();
self.channel.refCounter.unref("PlaylistModule::handleMoveMedia");
return lock.release();
}
if (data.after === "prepend") {
if (!self.items.prepend(from)) {
self.channel.activeLock.release();
self.channel.refCounter.unref("PlaylistModule::handleMoveMedia");
return lock.release();
}
} else if (data.after === "append") {
if (!self.items.append(from)) {
self.channel.activeLock.release();
self.channel.refCounter.unref("PlaylistModule::handleMoveMedia");
return lock.release();
}
} else {
if (!self.items.insertAfter(from, data.after)) {
self.channel.activeLock.release();
self.channel.refCounter.unref("PlaylistModule::handleMoveMedia");
return lock.release();
}
}
@ -633,7 +621,7 @@ PlaylistModule.prototype.handleMoveMedia = function (user, data) {
from.media.title +
(after ? " after " + after.media.title : ""));
lock.release();
self.channel.activeLock.release();
self.channel.refCounter.unref("PlaylistModule::handleMoveMedia");
});
};
@ -1128,55 +1116,6 @@ PlaylistModule.prototype._leadLoop = function() {
}
};
PlaylistModule.prototype.refreshGoogleDocs = function (cb) {
var self = this;
if (self.dead || !self.channel || self.channel.dead) {
return;
}
var abort = function () {
if (self.current) {
self.current.media.meta.object = self.current.media.meta.object || null;
self.current.media.meta.failed = true;
}
if (cb) {
cb();
}
};
if (!this.current || this.current.media.type !== "gd") {
return abort();
}
self.channel.activeLock.lock();
InfoGetter.getMedia(this.current.media.id, "gd", function (err, media) {
if (err) {
Logger.errlog.log("Google Docs autorefresh failed: " + err);
Logger.errlog.log("ID was: " + self.current.media.id);
if (self.current) {
self.current.media.meta.object = self.current.media.meta.object || null;
self.current.media.meta.failed = true;
}
if (cb) {
cb();
}
self.channel.activeLock.release();
} else {
if (!self.current || self.current.media.type !== "gd") {
self.channel.activeLock.release();
return abort();
}
self.current.media.meta = media.meta;
self.current.media.meta.expiration = Date.now() + 3600000;
self.channel.logger.log("[playlist] Auto-refreshed Google Doc video");
cb && cb();
self.channel.activeLock.release();
}
});
};
PlaylistModule.prototype._playNext = function () {
if (!this.current) {
return;
@ -1335,10 +1274,11 @@ PlaylistModule.prototype.handleQueuePlaylist = function (user, data) {
pos: data.pos
};
var self = this;
self.channel.activeLock.lock();
const self = this;
self.channel.refCounter.ref("PlaylistModule::handleQueuePlaylist");
db.getUserPlaylist(user.getName(), data.name, function (err, pl) {
if (err) {
self.channel.refCounter.unref("PlaylistModule::handleQueuePlaylist");
return user.socket.emit("errorMsg", {
msg: "Playlist load failed: " + err
});
@ -1369,7 +1309,6 @@ PlaylistModule.prototype.handleQueuePlaylist = function (user, data) {
var m = new Media(item.id, item.title, item.seconds, item.type, item.meta);
self._addItem(m, qdata, user);
});
self.channel.activeLock.release();
} catch (e) {
Logger.errlog.log("Loading user playlist failed!");
Logger.errlog.log("PL: " + user.getName() + "-" + data.name);
@ -1378,7 +1317,8 @@ PlaylistModule.prototype.handleQueuePlaylist = function (user, data) {
msg: "Internal error occurred when loading playlist.",
link: null
});
self.channel.activeLock.release();
} finally {
self.channel.refCounter.unref("PlaylistModule::handleQueuePlaylist");
}
});
};

View File

@ -208,8 +208,10 @@ Server.prototype.unloadChannel = function (chan) {
// Empty all outward references from the channel
var keys = Object.keys(chan);
for (var i in keys) {
if (keys[i] !== "refCounter") {
delete chan[keys[i]];
}
}
chan.dead = true;
};