From be4011cda1b7fe2c7bbb738964d5b1334b15a2f3 Mon Sep 17 00:00:00 2001 From: calzoneman Date: Fri, 25 Dec 2015 17:07:25 -0800 Subject: [PATCH] Replace old ActiveLock system with a slightly better one CyTube has been crashing recently due to things attempting to release the reference after the channel was already closed (apparently the uncaughtException handler isn't called for this?). This newer implementation keeps track of what is ref'ing and unref'ing it, so it can log an error if it detects a discrepancy. Also changed the server to not delete the refCounter field from the channel when it's unloaded, so that should reduce the number of errors stemming from it being null/undefined. --- src/channel/channel.js | 129 ++++++++++++++++++++-------------- src/channel/kickban.js | 24 ++++--- src/channel/library.js | 11 +-- src/channel/mediarefresher.js | 29 ++++---- src/channel/playlist.js | 104 ++++++--------------------- src/server.js | 4 +- 6 files changed, 138 insertions(+), 163 deletions(-) diff --git a/src/channel/channel.js b/src/channel/channel.js index 6c90c77a..b348ceab 100644 --- a/src/channel/channel.js +++ b/src/channel/channel.js @@ -12,39 +12,65 @@ import * as ChannelStore from '../channel-storage/channelstore'; import { ChannelStateSizeError } from '../errors'; import Promise from 'bluebird'; -/** - * Previously, async channel functions were riddled with race conditions due to - * an event causing the channel to be unloaded while a pending callback still - * needed to reference it. - * - * This solution should be better than constantly checking whether the channel - * has been unloaded in nested callbacks. The channel won't be unloaded until - * nothing needs it anymore. Conceptually similar to a reference count. - */ -function ActiveLock(channel) { - this.channel = channel; - this.count = 0; -} +class ReferenceCounter { + constructor(channel) { + this.channel = channel; + this.channelName = channel.name; + this.refCount = 0; + this.references = {}; + } -ActiveLock.prototype = { - lock: function () { - this.count++; - }, + ref(caller) { + if (caller) { + if (this.references.hasOwnProperty(caller)) { + this.references[caller]++; + } else { + this.references[caller] = 1; + } + } - release: function () { - this.count--; - if (this.count === 0) { - /* sanity check */ - if (this.channel.users.length > 0) { - Logger.errlog.log("Warning: ActiveLock count=0 but users.length > 0 (" + - "channel: " + this.channel.name + ")"); - this.count = this.channel.users.length; + this.refCount++; + } + + unref(caller) { + if (caller) { + if (this.references.hasOwnProperty(caller)) { + this.references[caller]--; + if (this.references[caller] === 0) { + delete this.references[caller]; + } + } else { + Logger.errlog.log("ReferenceCounter::unref() called by caller [" + + caller + "] but this caller had no active references! " + + `(channel: ${this.channelName})`); + } + } + + this.refCount--; + this.checkRefCount(); + } + + checkRefCount() { + if (this.refCount === 0) { + if (Object.keys(this.references).length > 0) { + Logger.errlog.log("ReferenceCounter::refCount reached 0 but still had " + + "active references: " + + JSON.stringify(Object.keys(this.references)) + + ` (channel: ${this.channelName})`); + for (var caller in this.references) { + this.refCount += this.references[caller]; + } + } else if (this.channel.users.length > 0) { + Logger.errlog.log("ReferenceCounter::refCount reached 0 but still had " + + this.channel.users.length + " active users" + + ` (channel: ${this.channelName})`); + this.refCount = this.channel.users.length; } else { this.channel.emit("empty"); } } } -}; +} function Channel(name) { MakeEmitter(this); @@ -54,7 +80,7 @@ function Channel(name) { this.logger = new Logger.Logger(path.join(__dirname, "..", "..", "chanlogs", this.uniqueName + ".log")); this.users = []; - this.activeLock = new ActiveLock(this); + this.refCounter = new ReferenceCounter(this); this.flags = 0; var self = this; db.channels.load(this, function (err) { @@ -238,15 +264,16 @@ Channel.prototype.saveState = function () { }; Channel.prototype.checkModules = function (fn, args, cb) { - var self = this; + const self = this; + const refCaller = `Channel::checkModules/${fn}`; this.waitFlag(Flags.C_READY, function () { - self.activeLock.lock(); + self.refCounter.ref(refCaller); var keys = Object.keys(self.modules); var next = function (err, result) { if (result !== ChannelModule.PASSTHROUGH) { /* Either an error occured, or the module denied the user access */ cb(err, result); - self.activeLock.release(); + self.refCounter.unref(refCaller); return; } @@ -254,7 +281,7 @@ Channel.prototype.checkModules = function (fn, args, cb) { if (m === undefined) { /* No more modules to check */ cb(null, ChannelModule.PASSTHROUGH); - self.activeLock.release(); + self.refCounter.unref(refCaller); return; } @@ -278,13 +305,13 @@ Channel.prototype.notifyModules = function (fn, args) { }; Channel.prototype.joinUser = function (user, data) { - var self = this; + const self = this; - self.activeLock.lock(); + self.refCounter.ref("Channel::user"); self.waitFlag(Flags.C_READY, function () { /* User closed the connection before the channel finished loading */ if (user.socket.disconnected) { - self.activeLock.release(); + self.refCounter.unref("Channel::user"); return; } @@ -293,7 +320,7 @@ Channel.prototype.joinUser = function (user, data) { if (err) { Logger.errlog.log("user.refreshAccount failed at Channel.joinUser"); Logger.errlog.log(err.stack); - self.activeLock.release(); + self.refCounter.unref("Channel::user"); return; } @@ -304,8 +331,10 @@ Channel.prototype.joinUser = function (user, data) { } function afterAccount() { - if (self.dead || user.socket.disconnected) { - if (self.activeLock) self.activeLock.release(); + if (user.socket.disconnected) { + self.refCounter.unref("Channel::user"); + return; + } else if (self.dead) { return; } @@ -318,9 +347,7 @@ Channel.prototype.joinUser = function (user, data) { } else { user.account.channelRank = 0; user.account.effectiveRank = user.account.globalRank; - if (self.activeLock) { - self.activeLock.release(); - } + self.refCounter.unref("Channel::user"); } }); } @@ -408,7 +435,7 @@ Channel.prototype.partUser = function (user) { }); this.sendUsercount(this.users); - this.activeLock.release(); + this.refCounter.unref("Channel::user"); user.die(); }; @@ -555,20 +582,20 @@ Channel.prototype.sendUserJoin = function (users, user) { }; Channel.prototype.readLog = function (cb) { - var maxLen = 102400; - var file = this.logger.filename; - this.activeLock.lock(); - var self = this; + const maxLen = 102400; + const file = this.logger.filename; + this.refCounter.ref("Channel::readLog"); + const self = this; fs.stat(file, function (err, data) { if (err) { - self.activeLock.release(); + self.refCounter.unref("readLog"); return cb(err, null); } - var start = Math.max(data.size - maxLen, 0); - var end = data.size - 1; + const start = Math.max(data.size - maxLen, 0); + const end = data.size - 1; - var read = fs.createReadStream(file, { + const read = fs.createReadStream(file, { start: start, end: end }); @@ -579,7 +606,7 @@ Channel.prototype.readLog = function (cb) { }); read.on("end", function () { cb(null, buffer); - self.activeLock.release(); + self.refCounter.unref("Channel::readLog"); }); }); }; @@ -648,7 +675,7 @@ Channel.prototype.packInfo = function (isAdmin) { } if (isAdmin) { - data.activeLockCount = this.activeLock.count; + data.activeLockCount = this.refCounter.refCount; } var self = this; diff --git a/src/channel/kickban.js b/src/channel/kickban.js index c9f306aa..0d11e0c9 100644 --- a/src/channel/kickban.js +++ b/src/channel/kickban.js @@ -73,9 +73,10 @@ KickBanModule.prototype.onUserPostJoin = function (user) { return; } - var chan = this.channel; + const chan = this.channel; + const refCaller = "KickBanModule::onUserPostJoin"; user.waitFlag(Flags.U_LOGGED_IN, function () { - chan.activeLock.lock(); + chan.refCounter.ref(refCaller); db.channels.isNameBanned(chan.name, user.getName(), function (err, banned) { if (!err && banned) { user.kick("You are banned from this channel."); @@ -84,7 +85,7 @@ KickBanModule.prototype.onUserPostJoin = function (user) { "name is banned)"); } } - chan.activeLock.release(); + chan.refCounter.unref(refCaller); }); }); @@ -222,10 +223,10 @@ KickBanModule.prototype.handleCmdBan = function (user, msg, meta) { var name = args.shift().toLowerCase(); var reason = args.join(" "); - var chan = this.channel; - chan.activeLock.lock(); + const chan = this.channel; + chan.refCounter.ref("KickBanModule::handleCmdBan"); this.banName(user, name, reason, function (err) { - chan.activeLock.release(); + chan.refCounter.unref("KickBanModule::handleCmdBan"); }); }; @@ -249,10 +250,10 @@ KickBanModule.prototype.handleCmdIPBan = function (user, msg, meta) { } var reason = args.join(" "); - var chan = this.channel; - chan.activeLock.lock(); + const chan = this.channel; + chan.refCounter.ref("KickBanModule::handleCmdIPBan"); this.banAll(user, name, range, reason, function (err) { - chan.activeLock.release(); + chan.refCounter.unref("KickBanModule::handleCmdIPBan"); }); }; @@ -416,9 +417,10 @@ KickBanModule.prototype.handleUnban = function (user, data) { } var self = this; - this.channel.activeLock.lock(); + this.channel.refCounter.ref("KickBanModule::handleUnban"); db.channels.unbanId(this.channel.name, data.id, function (err) { if (err) { + self.channel.refCounter.unref("KickBanModule::handleUnban"); return user.socket.emit("errorMsg", { msg: err }); @@ -431,7 +433,7 @@ KickBanModule.prototype.handleUnban = function (user, data) { self.channel.modules.chat.sendModMessage(user.getName() + " unbanned " + data.name, banperm); } - self.channel.activeLock.release(); + self.channel.refCounter.unref("KickBanModule::handleUnban"); }); }; diff --git a/src/channel/library.js b/src/channel/library.js index 2d8eeb78..afd6c0ca 100644 --- a/src/channel/library.js +++ b/src/channel/library.js @@ -51,16 +51,19 @@ LibraryModule.prototype.handleUncache = function (user, data) { return; } - var chan = this.channel; - chan.activeLock.lock(); + const chan = this.channel; + chan.refCounter.ref("LibraryModule::handleUncache"); db.channels.deleteFromLibrary(chan.name, data.id, function (err, res) { - if (chan.dead || err) { + if (chan.dead) { + return; + } else if (err) { + chan.refCounter.unref("LibraryModule::handleUncache"); return; } chan.logger.log("[library] " + user.getName() + " deleted " + data.id + "from the library"); - chan.activeLock.release(); + chan.refCounter.unref("LibraryModule::handleUncache"); }); }; diff --git a/src/channel/mediarefresher.js b/src/channel/mediarefresher.js index 228a0b7e..a08b39b1 100644 --- a/src/channel/mediarefresher.js +++ b/src/channel/mediarefresher.js @@ -63,8 +63,8 @@ MediaRefresherModule.prototype.initVimeo = function (data, cb) { return; } - var self = this; - self.channel.activeLock.lock(); + const self = this; + self.channel.refCounter.ref("MediaRefresherModule::initVimeo"); Vimeo.extract(data.id).then(function (direct) { if (self.dead || self.channel.dead) return; @@ -74,12 +74,13 @@ MediaRefresherModule.prototype.initVimeo = function (data, cb) { self.channel.logger.log("[mediarefresher] Refreshed vimeo video with ID " + data.id); } - self.channel.activeLock.release(); if (cb) cb(); }).catch(function (err) { Logger.errlog.log("Unexpected vimeo::extract() fail: " + err.stack); if (cb) cb(); + }).finally(() => { + self.channel.refCounter.unref("MediaRefresherModule::initVimeo"); }); }; @@ -90,7 +91,7 @@ MediaRefresherModule.prototype.refreshGoogleDocs = function (media, cb) { return; } - self.channel.activeLock.lock(); + self.channel.refCounter.ref("MediaRefresherModule::refreshGoogleDocs"); InfoGetter.getMedia(media.id, "gd", function (err, data) { if (self.dead || self.channel.dead) { return; @@ -108,7 +109,7 @@ MediaRefresherModule.prototype.refreshGoogleDocs = function (media, cb) { self.channel.logger.log("[mediarefresher] Google Docs refresh failed " + "(likely redirect to login page-- make sure it is shared " + "correctly)"); - self.channel.activeLock.release(); + self.channel.refCounter.unref("MediaRefresherModule::refreshGoogleDocs"); if (cb) cb(); return; case "Access Denied": @@ -119,7 +120,7 @@ MediaRefresherModule.prototype.refreshGoogleDocs = function (media, cb) { case "Google Drive videos must be shared publicly": self.channel.logger.log("[mediarefresher] Google Docs refresh failed: " + err); - self.channel.activeLock.release(); + self.channel.refCounter.unref("MediaRefresherModule::refreshGoogleDocs"); if (cb) cb(); return; default: @@ -128,14 +129,14 @@ MediaRefresherModule.prototype.refreshGoogleDocs = function (media, cb) { err); Logger.errlog.log("Google Docs refresh failed for ID " + media.id + ": " + err); - self.channel.activeLock.release(); + self.channel.refCounter.unref("MediaRefresherModule::refreshGoogleDocs"); if (cb) cb(); return; } } if (media !== self._media) { - self.channel.activeLock.release(); + self.channel.refCounter.unref("MediaRefresherModule::refreshGoogleDocs"); if (cb) cb(); return; } @@ -143,7 +144,7 @@ MediaRefresherModule.prototype.refreshGoogleDocs = function (media, cb) { self.channel.logger.log("[mediarefresher] Refreshed Google Docs video with ID " + media.id); media.meta = data.meta; - self.channel.activeLock.release(); + self.channel.refCounter.unref("MediaRefresherModule::refreshGoogleDocs"); if (cb) cb(); }); }; @@ -155,7 +156,7 @@ MediaRefresherModule.prototype.initGooglePlus = function (media, cb) { return; } - self.channel.activeLock.lock(); + self.channel.refCounter.ref("MediaRefresherModule::initGooglePlus"); InfoGetter.getMedia(media.id, "gp", function (err, data) { if (self.dead || self.channel.dead) { return; @@ -177,7 +178,7 @@ MediaRefresherModule.prototype.initGooglePlus = function (media, cb) { "and is shared publicly"): self.channel.logger.log("[mediarefresher] Google+ refresh failed: " + err); - self.channel.activeLock.release(); + self.channel.refCounter.unref("MediaRefresherModule::initGooglePlus"); if (cb) cb(); return; default: @@ -186,14 +187,14 @@ MediaRefresherModule.prototype.initGooglePlus = function (media, cb) { err); Logger.errlog.log("Google+ refresh failed for ID " + media.id + ": " + err); - self.channel.activeLock.release(); + self.channel.refCounter.unref("MediaRefresherModule::initGooglePlus"); if (cb) cb(); return; } } if (media !== self._media) { - self.channel.activeLock.release(); + self.channel.refCounter.unref("MediaRefresherModule::initGooglePlus"); if (cb) cb(); return; } @@ -201,7 +202,7 @@ MediaRefresherModule.prototype.initGooglePlus = function (media, cb) { self.channel.logger.log("[mediarefresher] Refreshed Google+ video with ID " + media.id); media.meta = data.meta; - self.channel.activeLock.release(); + self.channel.refCounter.unref("MediaRefresherModule::initGooglePlus"); if (cb) cb(); }); }; diff --git a/src/channel/playlist.js b/src/channel/playlist.js index 295c6a56..0beb9777 100644 --- a/src/channel/playlist.js +++ b/src/channel/playlist.js @@ -447,15 +447,15 @@ PlaylistModule.prototype.queueStandard = function (user, data) { }); }; - var self = this; - this.channel.activeLock.lock(); + const self = this; + this.channel.refCounter.ref("PlaylistModule::queueStandard"); this.semaphore.queue(function (lock) { var lib = self.channel.modules.library; if (lib && self.channel.is(Flags.C_REGISTERED) && !util.isLive(data.type)) { lib.getItem(data.id, function (err, item) { if (err && err !== "Item not in library") { error(err+""); - self.channel.activeLock.release(); + self.channel.refCounter.unref("PlaylistModule::queueStandard"); return lock.release(); } @@ -464,7 +464,7 @@ PlaylistModule.prototype.queueStandard = function (user, data) { data.shouldAddToLibrary = false; self._addItem(item, data, user, function () { lock.release(); - self.channel.activeLock.release(); + self.channel.refCounter.unref("PlaylistModule::queueStandard"); }); } else { handleLookup(); @@ -479,25 +479,13 @@ PlaylistModule.prototype.queueStandard = function (user, data) { InfoGetter.getMedia(data.id, data.type, function (err, media) { if (err) { error(XSS.sanitizeText(String(err))); - if (self.channel && self.channel.activeLock) { - self.channel.activeLock.release(); - } else { - Logger.errlog.log("Attempted release of channel lock after " + - "channel was already unloaded in queueStandard: " + - channelName + " " + data.type + ":" + data.id); - } + self.channel.refCounter.unref("PlaylistModule::queueStandard"); return lock.release(); } self._addItem(media, data, user, function () { lock.release(); - if (self.channel && self.channel.activeLock) { - self.channel.activeLock.release(); - } else { - Logger.errlog.log("Attempted release of channel lock after " + - "channel was already unloaded in queueStandard: " + - channelName + " " + data.type + ":" + data.id); - } + self.channel.refCounter.unref("PlaylistModule::queueStandard"); }); }); } @@ -536,12 +524,12 @@ PlaylistModule.prototype.queueYouTubePlaylist = function (user, data) { } } - self.channel.activeLock.lock(); + self.channel.refCounter.ref("PlaylistModule::queueYouTubePlaylist"); vids.forEach(function (media) { data.link = util.formatLink(media.id, media.type); self._addItem(media, data, user); }); - self.channel.activeLock.release(); + self.channel.refCounter.unref("PlaylistModule::queueYouTubePlaylist"); lock.release(); }); @@ -560,7 +548,7 @@ PlaylistModule.prototype.handleDelete = function (user, data) { } var plitem = this.items.find(data); - self.channel.activeLock.lock(); + self.channel.refCounter.ref("PlaylistModule::handleDelete"); this.semaphore.queue(function (lock) { if (self._delete(data)) { self.channel.logger.log("[playlist] " + user.getName() + " deleted " + @@ -568,7 +556,7 @@ PlaylistModule.prototype.handleDelete = function (user, data) { } lock.release(); - self.channel.activeLock.release(); + self.channel.refCounter.unref("PlaylistModule::handleDelete"); }); }; @@ -602,27 +590,27 @@ PlaylistModule.prototype.handleMoveMedia = function (user, data) { return; } - var self = this; - self.channel.activeLock.lock(); + const self = this; + self.channel.refCounter.ref("PlaylistModule::handleMoveMedia"); self.semaphore.queue(function (lock) { if (!self.items.remove(data.from)) { - self.channel.activeLock.release(); + self.channel.refCounter.unref("PlaylistModule::handleMoveMedia"); return lock.release(); } if (data.after === "prepend") { if (!self.items.prepend(from)) { - self.channel.activeLock.release(); + self.channel.refCounter.unref("PlaylistModule::handleMoveMedia"); return lock.release(); } } else if (data.after === "append") { if (!self.items.append(from)) { - self.channel.activeLock.release(); + self.channel.refCounter.unref("PlaylistModule::handleMoveMedia"); return lock.release(); } } else { if (!self.items.insertAfter(from, data.after)) { - self.channel.activeLock.release(); + self.channel.refCounter.unref("PlaylistModule::handleMoveMedia"); return lock.release(); } } @@ -633,7 +621,7 @@ PlaylistModule.prototype.handleMoveMedia = function (user, data) { from.media.title + (after ? " after " + after.media.title : "")); lock.release(); - self.channel.activeLock.release(); + self.channel.refCounter.unref("PlaylistModule::handleMoveMedia"); }); }; @@ -1128,55 +1116,6 @@ PlaylistModule.prototype._leadLoop = function() { } }; -PlaylistModule.prototype.refreshGoogleDocs = function (cb) { - var self = this; - - if (self.dead || !self.channel || self.channel.dead) { - return; - } - - var abort = function () { - if (self.current) { - self.current.media.meta.object = self.current.media.meta.object || null; - self.current.media.meta.failed = true; - } - if (cb) { - cb(); - } - }; - - if (!this.current || this.current.media.type !== "gd") { - return abort(); - } - - self.channel.activeLock.lock(); - InfoGetter.getMedia(this.current.media.id, "gd", function (err, media) { - if (err) { - Logger.errlog.log("Google Docs autorefresh failed: " + err); - Logger.errlog.log("ID was: " + self.current.media.id); - if (self.current) { - self.current.media.meta.object = self.current.media.meta.object || null; - self.current.media.meta.failed = true; - } - if (cb) { - cb(); - } - self.channel.activeLock.release(); - } else { - if (!self.current || self.current.media.type !== "gd") { - self.channel.activeLock.release(); - return abort(); - } - - self.current.media.meta = media.meta; - self.current.media.meta.expiration = Date.now() + 3600000; - self.channel.logger.log("[playlist] Auto-refreshed Google Doc video"); - cb && cb(); - self.channel.activeLock.release(); - } - }); -}; - PlaylistModule.prototype._playNext = function () { if (!this.current) { return; @@ -1335,10 +1274,11 @@ PlaylistModule.prototype.handleQueuePlaylist = function (user, data) { pos: data.pos }; - var self = this; - self.channel.activeLock.lock(); + const self = this; + self.channel.refCounter.ref("PlaylistModule::handleQueuePlaylist"); db.getUserPlaylist(user.getName(), data.name, function (err, pl) { if (err) { + self.channel.refCounter.unref("PlaylistModule::handleQueuePlaylist"); return user.socket.emit("errorMsg", { msg: "Playlist load failed: " + err }); @@ -1369,7 +1309,6 @@ PlaylistModule.prototype.handleQueuePlaylist = function (user, data) { var m = new Media(item.id, item.title, item.seconds, item.type, item.meta); self._addItem(m, qdata, user); }); - self.channel.activeLock.release(); } catch (e) { Logger.errlog.log("Loading user playlist failed!"); Logger.errlog.log("PL: " + user.getName() + "-" + data.name); @@ -1378,7 +1317,8 @@ PlaylistModule.prototype.handleQueuePlaylist = function (user, data) { msg: "Internal error occurred when loading playlist.", link: null }); - self.channel.activeLock.release(); + } finally { + self.channel.refCounter.unref("PlaylistModule::handleQueuePlaylist"); } }); }; diff --git a/src/server.js b/src/server.js index fd79c9f1..290e9b29 100644 --- a/src/server.js +++ b/src/server.js @@ -208,7 +208,9 @@ Server.prototype.unloadChannel = function (chan) { // Empty all outward references from the channel var keys = Object.keys(chan); for (var i in keys) { - delete chan[keys[i]]; + if (keys[i] !== "refCounter") { + delete chan[keys[i]]; + } } chan.dead = true; };