diff --git a/src/channel/channel.js b/src/channel/channel.js index 6c90c77a..b348ceab 100644 --- a/src/channel/channel.js +++ b/src/channel/channel.js @@ -12,39 +12,65 @@ import * as ChannelStore from '../channel-storage/channelstore'; import { ChannelStateSizeError } from '../errors'; import Promise from 'bluebird'; -/** - * Previously, async channel functions were riddled with race conditions due to - * an event causing the channel to be unloaded while a pending callback still - * needed to reference it. - * - * This solution should be better than constantly checking whether the channel - * has been unloaded in nested callbacks. The channel won't be unloaded until - * nothing needs it anymore. Conceptually similar to a reference count. - */ -function ActiveLock(channel) { - this.channel = channel; - this.count = 0; -} +class ReferenceCounter { + constructor(channel) { + this.channel = channel; + this.channelName = channel.name; + this.refCount = 0; + this.references = {}; + } -ActiveLock.prototype = { - lock: function () { - this.count++; - }, + ref(caller) { + if (caller) { + if (this.references.hasOwnProperty(caller)) { + this.references[caller]++; + } else { + this.references[caller] = 1; + } + } - release: function () { - this.count--; - if (this.count === 0) { - /* sanity check */ - if (this.channel.users.length > 0) { - Logger.errlog.log("Warning: ActiveLock count=0 but users.length > 0 (" + - "channel: " + this.channel.name + ")"); - this.count = this.channel.users.length; + this.refCount++; + } + + unref(caller) { + if (caller) { + if (this.references.hasOwnProperty(caller)) { + this.references[caller]--; + if (this.references[caller] === 0) { + delete this.references[caller]; + } + } else { + Logger.errlog.log("ReferenceCounter::unref() called by caller [" + + caller + "] but this caller had no active references! " + + `(channel: ${this.channelName})`); + } + } + + this.refCount--; + this.checkRefCount(); + } + + checkRefCount() { + if (this.refCount === 0) { + if (Object.keys(this.references).length > 0) { + Logger.errlog.log("ReferenceCounter::refCount reached 0 but still had " + + "active references: " + + JSON.stringify(Object.keys(this.references)) + + ` (channel: ${this.channelName})`); + for (var caller in this.references) { + this.refCount += this.references[caller]; + } + } else if (this.channel.users.length > 0) { + Logger.errlog.log("ReferenceCounter::refCount reached 0 but still had " + + this.channel.users.length + " active users" + + ` (channel: ${this.channelName})`); + this.refCount = this.channel.users.length; } else { this.channel.emit("empty"); } } } -}; +} function Channel(name) { MakeEmitter(this); @@ -54,7 +80,7 @@ function Channel(name) { this.logger = new Logger.Logger(path.join(__dirname, "..", "..", "chanlogs", this.uniqueName + ".log")); this.users = []; - this.activeLock = new ActiveLock(this); + this.refCounter = new ReferenceCounter(this); this.flags = 0; var self = this; db.channels.load(this, function (err) { @@ -238,15 +264,16 @@ Channel.prototype.saveState = function () { }; Channel.prototype.checkModules = function (fn, args, cb) { - var self = this; + const self = this; + const refCaller = `Channel::checkModules/${fn}`; this.waitFlag(Flags.C_READY, function () { - self.activeLock.lock(); + self.refCounter.ref(refCaller); var keys = Object.keys(self.modules); var next = function (err, result) { if (result !== ChannelModule.PASSTHROUGH) { /* Either an error occured, or the module denied the user access */ cb(err, result); - self.activeLock.release(); + self.refCounter.unref(refCaller); return; } @@ -254,7 +281,7 @@ Channel.prototype.checkModules = function (fn, args, cb) { if (m === undefined) { /* No more modules to check */ cb(null, ChannelModule.PASSTHROUGH); - self.activeLock.release(); + self.refCounter.unref(refCaller); return; } @@ -278,13 +305,13 @@ Channel.prototype.notifyModules = function (fn, args) { }; Channel.prototype.joinUser = function (user, data) { - var self = this; + const self = this; - self.activeLock.lock(); + self.refCounter.ref("Channel::user"); self.waitFlag(Flags.C_READY, function () { /* User closed the connection before the channel finished loading */ if (user.socket.disconnected) { - self.activeLock.release(); + self.refCounter.unref("Channel::user"); return; } @@ -293,7 +320,7 @@ Channel.prototype.joinUser = function (user, data) { if (err) { Logger.errlog.log("user.refreshAccount failed at Channel.joinUser"); Logger.errlog.log(err.stack); - self.activeLock.release(); + self.refCounter.unref("Channel::user"); return; } @@ -304,8 +331,10 @@ Channel.prototype.joinUser = function (user, data) { } function afterAccount() { - if (self.dead || user.socket.disconnected) { - if (self.activeLock) self.activeLock.release(); + if (user.socket.disconnected) { + self.refCounter.unref("Channel::user"); + return; + } else if (self.dead) { return; } @@ -318,9 +347,7 @@ Channel.prototype.joinUser = function (user, data) { } else { user.account.channelRank = 0; user.account.effectiveRank = user.account.globalRank; - if (self.activeLock) { - self.activeLock.release(); - } + self.refCounter.unref("Channel::user"); } }); } @@ -408,7 +435,7 @@ Channel.prototype.partUser = function (user) { }); this.sendUsercount(this.users); - this.activeLock.release(); + this.refCounter.unref("Channel::user"); user.die(); }; @@ -555,20 +582,20 @@ Channel.prototype.sendUserJoin = function (users, user) { }; Channel.prototype.readLog = function (cb) { - var maxLen = 102400; - var file = this.logger.filename; - this.activeLock.lock(); - var self = this; + const maxLen = 102400; + const file = this.logger.filename; + this.refCounter.ref("Channel::readLog"); + const self = this; fs.stat(file, function (err, data) { if (err) { - self.activeLock.release(); + self.refCounter.unref("readLog"); return cb(err, null); } - var start = Math.max(data.size - maxLen, 0); - var end = data.size - 1; + const start = Math.max(data.size - maxLen, 0); + const end = data.size - 1; - var read = fs.createReadStream(file, { + const read = fs.createReadStream(file, { start: start, end: end }); @@ -579,7 +606,7 @@ Channel.prototype.readLog = function (cb) { }); read.on("end", function () { cb(null, buffer); - self.activeLock.release(); + self.refCounter.unref("Channel::readLog"); }); }); }; @@ -648,7 +675,7 @@ Channel.prototype.packInfo = function (isAdmin) { } if (isAdmin) { - data.activeLockCount = this.activeLock.count; + data.activeLockCount = this.refCounter.refCount; } var self = this; diff --git a/src/channel/kickban.js b/src/channel/kickban.js index c9f306aa..0d11e0c9 100644 --- a/src/channel/kickban.js +++ b/src/channel/kickban.js @@ -73,9 +73,10 @@ KickBanModule.prototype.onUserPostJoin = function (user) { return; } - var chan = this.channel; + const chan = this.channel; + const refCaller = "KickBanModule::onUserPostJoin"; user.waitFlag(Flags.U_LOGGED_IN, function () { - chan.activeLock.lock(); + chan.refCounter.ref(refCaller); db.channels.isNameBanned(chan.name, user.getName(), function (err, banned) { if (!err && banned) { user.kick("You are banned from this channel."); @@ -84,7 +85,7 @@ KickBanModule.prototype.onUserPostJoin = function (user) { "name is banned)"); } } - chan.activeLock.release(); + chan.refCounter.unref(refCaller); }); }); @@ -222,10 +223,10 @@ KickBanModule.prototype.handleCmdBan = function (user, msg, meta) { var name = args.shift().toLowerCase(); var reason = args.join(" "); - var chan = this.channel; - chan.activeLock.lock(); + const chan = this.channel; + chan.refCounter.ref("KickBanModule::handleCmdBan"); this.banName(user, name, reason, function (err) { - chan.activeLock.release(); + chan.refCounter.unref("KickBanModule::handleCmdBan"); }); }; @@ -249,10 +250,10 @@ KickBanModule.prototype.handleCmdIPBan = function (user, msg, meta) { } var reason = args.join(" "); - var chan = this.channel; - chan.activeLock.lock(); + const chan = this.channel; + chan.refCounter.ref("KickBanModule::handleCmdIPBan"); this.banAll(user, name, range, reason, function (err) { - chan.activeLock.release(); + chan.refCounter.unref("KickBanModule::handleCmdIPBan"); }); }; @@ -416,9 +417,10 @@ KickBanModule.prototype.handleUnban = function (user, data) { } var self = this; - this.channel.activeLock.lock(); + this.channel.refCounter.ref("KickBanModule::handleUnban"); db.channels.unbanId(this.channel.name, data.id, function (err) { if (err) { + self.channel.refCounter.unref("KickBanModule::handleUnban"); return user.socket.emit("errorMsg", { msg: err }); @@ -431,7 +433,7 @@ KickBanModule.prototype.handleUnban = function (user, data) { self.channel.modules.chat.sendModMessage(user.getName() + " unbanned " + data.name, banperm); } - self.channel.activeLock.release(); + self.channel.refCounter.unref("KickBanModule::handleUnban"); }); }; diff --git a/src/channel/library.js b/src/channel/library.js index 2d8eeb78..afd6c0ca 100644 --- a/src/channel/library.js +++ b/src/channel/library.js @@ -51,16 +51,19 @@ LibraryModule.prototype.handleUncache = function (user, data) { return; } - var chan = this.channel; - chan.activeLock.lock(); + const chan = this.channel; + chan.refCounter.ref("LibraryModule::handleUncache"); db.channels.deleteFromLibrary(chan.name, data.id, function (err, res) { - if (chan.dead || err) { + if (chan.dead) { + return; + } else if (err) { + chan.refCounter.unref("LibraryModule::handleUncache"); return; } chan.logger.log("[library] " + user.getName() + " deleted " + data.id + "from the library"); - chan.activeLock.release(); + chan.refCounter.unref("LibraryModule::handleUncache"); }); }; diff --git a/src/channel/mediarefresher.js b/src/channel/mediarefresher.js index 228a0b7e..a08b39b1 100644 --- a/src/channel/mediarefresher.js +++ b/src/channel/mediarefresher.js @@ -63,8 +63,8 @@ MediaRefresherModule.prototype.initVimeo = function (data, cb) { return; } - var self = this; - self.channel.activeLock.lock(); + const self = this; + self.channel.refCounter.ref("MediaRefresherModule::initVimeo"); Vimeo.extract(data.id).then(function (direct) { if (self.dead || self.channel.dead) return; @@ -74,12 +74,13 @@ MediaRefresherModule.prototype.initVimeo = function (data, cb) { self.channel.logger.log("[mediarefresher] Refreshed vimeo video with ID " + data.id); } - self.channel.activeLock.release(); if (cb) cb(); }).catch(function (err) { Logger.errlog.log("Unexpected vimeo::extract() fail: " + err.stack); if (cb) cb(); + }).finally(() => { + self.channel.refCounter.unref("MediaRefresherModule::initVimeo"); }); }; @@ -90,7 +91,7 @@ MediaRefresherModule.prototype.refreshGoogleDocs = function (media, cb) { return; } - self.channel.activeLock.lock(); + self.channel.refCounter.ref("MediaRefresherModule::refreshGoogleDocs"); InfoGetter.getMedia(media.id, "gd", function (err, data) { if (self.dead || self.channel.dead) { return; @@ -108,7 +109,7 @@ MediaRefresherModule.prototype.refreshGoogleDocs = function (media, cb) { self.channel.logger.log("[mediarefresher] Google Docs refresh failed " + "(likely redirect to login page-- make sure it is shared " + "correctly)"); - self.channel.activeLock.release(); + self.channel.refCounter.unref("MediaRefresherModule::refreshGoogleDocs"); if (cb) cb(); return; case "Access Denied": @@ -119,7 +120,7 @@ MediaRefresherModule.prototype.refreshGoogleDocs = function (media, cb) { case "Google Drive videos must be shared publicly": self.channel.logger.log("[mediarefresher] Google Docs refresh failed: " + err); - self.channel.activeLock.release(); + self.channel.refCounter.unref("MediaRefresherModule::refreshGoogleDocs"); if (cb) cb(); return; default: @@ -128,14 +129,14 @@ MediaRefresherModule.prototype.refreshGoogleDocs = function (media, cb) { err); Logger.errlog.log("Google Docs refresh failed for ID " + media.id + ": " + err); - self.channel.activeLock.release(); + self.channel.refCounter.unref("MediaRefresherModule::refreshGoogleDocs"); if (cb) cb(); return; } } if (media !== self._media) { - self.channel.activeLock.release(); + self.channel.refCounter.unref("MediaRefresherModule::refreshGoogleDocs"); if (cb) cb(); return; } @@ -143,7 +144,7 @@ MediaRefresherModule.prototype.refreshGoogleDocs = function (media, cb) { self.channel.logger.log("[mediarefresher] Refreshed Google Docs video with ID " + media.id); media.meta = data.meta; - self.channel.activeLock.release(); + self.channel.refCounter.unref("MediaRefresherModule::refreshGoogleDocs"); if (cb) cb(); }); }; @@ -155,7 +156,7 @@ MediaRefresherModule.prototype.initGooglePlus = function (media, cb) { return; } - self.channel.activeLock.lock(); + self.channel.refCounter.ref("MediaRefresherModule::initGooglePlus"); InfoGetter.getMedia(media.id, "gp", function (err, data) { if (self.dead || self.channel.dead) { return; @@ -177,7 +178,7 @@ MediaRefresherModule.prototype.initGooglePlus = function (media, cb) { "and is shared publicly"): self.channel.logger.log("[mediarefresher] Google+ refresh failed: " + err); - self.channel.activeLock.release(); + self.channel.refCounter.unref("MediaRefresherModule::initGooglePlus"); if (cb) cb(); return; default: @@ -186,14 +187,14 @@ MediaRefresherModule.prototype.initGooglePlus = function (media, cb) { err); Logger.errlog.log("Google+ refresh failed for ID " + media.id + ": " + err); - self.channel.activeLock.release(); + self.channel.refCounter.unref("MediaRefresherModule::initGooglePlus"); if (cb) cb(); return; } } if (media !== self._media) { - self.channel.activeLock.release(); + self.channel.refCounter.unref("MediaRefresherModule::initGooglePlus"); if (cb) cb(); return; } @@ -201,7 +202,7 @@ MediaRefresherModule.prototype.initGooglePlus = function (media, cb) { self.channel.logger.log("[mediarefresher] Refreshed Google+ video with ID " + media.id); media.meta = data.meta; - self.channel.activeLock.release(); + self.channel.refCounter.unref("MediaRefresherModule::initGooglePlus"); if (cb) cb(); }); }; diff --git a/src/channel/playlist.js b/src/channel/playlist.js index 295c6a56..0beb9777 100644 --- a/src/channel/playlist.js +++ b/src/channel/playlist.js @@ -447,15 +447,15 @@ PlaylistModule.prototype.queueStandard = function (user, data) { }); }; - var self = this; - this.channel.activeLock.lock(); + const self = this; + this.channel.refCounter.ref("PlaylistModule::queueStandard"); this.semaphore.queue(function (lock) { var lib = self.channel.modules.library; if (lib && self.channel.is(Flags.C_REGISTERED) && !util.isLive(data.type)) { lib.getItem(data.id, function (err, item) { if (err && err !== "Item not in library") { error(err+""); - self.channel.activeLock.release(); + self.channel.refCounter.unref("PlaylistModule::queueStandard"); return lock.release(); } @@ -464,7 +464,7 @@ PlaylistModule.prototype.queueStandard = function (user, data) { data.shouldAddToLibrary = false; self._addItem(item, data, user, function () { lock.release(); - self.channel.activeLock.release(); + self.channel.refCounter.unref("PlaylistModule::queueStandard"); }); } else { handleLookup(); @@ -479,25 +479,13 @@ PlaylistModule.prototype.queueStandard = function (user, data) { InfoGetter.getMedia(data.id, data.type, function (err, media) { if (err) { error(XSS.sanitizeText(String(err))); - if (self.channel && self.channel.activeLock) { - self.channel.activeLock.release(); - } else { - Logger.errlog.log("Attempted release of channel lock after " + - "channel was already unloaded in queueStandard: " + - channelName + " " + data.type + ":" + data.id); - } + self.channel.refCounter.unref("PlaylistModule::queueStandard"); return lock.release(); } self._addItem(media, data, user, function () { lock.release(); - if (self.channel && self.channel.activeLock) { - self.channel.activeLock.release(); - } else { - Logger.errlog.log("Attempted release of channel lock after " + - "channel was already unloaded in queueStandard: " + - channelName + " " + data.type + ":" + data.id); - } + self.channel.refCounter.unref("PlaylistModule::queueStandard"); }); }); } @@ -536,12 +524,12 @@ PlaylistModule.prototype.queueYouTubePlaylist = function (user, data) { } } - self.channel.activeLock.lock(); + self.channel.refCounter.ref("PlaylistModule::queueYouTubePlaylist"); vids.forEach(function (media) { data.link = util.formatLink(media.id, media.type); self._addItem(media, data, user); }); - self.channel.activeLock.release(); + self.channel.refCounter.unref("PlaylistModule::queueYouTubePlaylist"); lock.release(); }); @@ -560,7 +548,7 @@ PlaylistModule.prototype.handleDelete = function (user, data) { } var plitem = this.items.find(data); - self.channel.activeLock.lock(); + self.channel.refCounter.ref("PlaylistModule::handleDelete"); this.semaphore.queue(function (lock) { if (self._delete(data)) { self.channel.logger.log("[playlist] " + user.getName() + " deleted " + @@ -568,7 +556,7 @@ PlaylistModule.prototype.handleDelete = function (user, data) { } lock.release(); - self.channel.activeLock.release(); + self.channel.refCounter.unref("PlaylistModule::handleDelete"); }); }; @@ -602,27 +590,27 @@ PlaylistModule.prototype.handleMoveMedia = function (user, data) { return; } - var self = this; - self.channel.activeLock.lock(); + const self = this; + self.channel.refCounter.ref("PlaylistModule::handleMoveMedia"); self.semaphore.queue(function (lock) { if (!self.items.remove(data.from)) { - self.channel.activeLock.release(); + self.channel.refCounter.unref("PlaylistModule::handleMoveMedia"); return lock.release(); } if (data.after === "prepend") { if (!self.items.prepend(from)) { - self.channel.activeLock.release(); + self.channel.refCounter.unref("PlaylistModule::handleMoveMedia"); return lock.release(); } } else if (data.after === "append") { if (!self.items.append(from)) { - self.channel.activeLock.release(); + self.channel.refCounter.unref("PlaylistModule::handleMoveMedia"); return lock.release(); } } else { if (!self.items.insertAfter(from, data.after)) { - self.channel.activeLock.release(); + self.channel.refCounter.unref("PlaylistModule::handleMoveMedia"); return lock.release(); } } @@ -633,7 +621,7 @@ PlaylistModule.prototype.handleMoveMedia = function (user, data) { from.media.title + (after ? " after " + after.media.title : "")); lock.release(); - self.channel.activeLock.release(); + self.channel.refCounter.unref("PlaylistModule::handleMoveMedia"); }); }; @@ -1128,55 +1116,6 @@ PlaylistModule.prototype._leadLoop = function() { } }; -PlaylistModule.prototype.refreshGoogleDocs = function (cb) { - var self = this; - - if (self.dead || !self.channel || self.channel.dead) { - return; - } - - var abort = function () { - if (self.current) { - self.current.media.meta.object = self.current.media.meta.object || null; - self.current.media.meta.failed = true; - } - if (cb) { - cb(); - } - }; - - if (!this.current || this.current.media.type !== "gd") { - return abort(); - } - - self.channel.activeLock.lock(); - InfoGetter.getMedia(this.current.media.id, "gd", function (err, media) { - if (err) { - Logger.errlog.log("Google Docs autorefresh failed: " + err); - Logger.errlog.log("ID was: " + self.current.media.id); - if (self.current) { - self.current.media.meta.object = self.current.media.meta.object || null; - self.current.media.meta.failed = true; - } - if (cb) { - cb(); - } - self.channel.activeLock.release(); - } else { - if (!self.current || self.current.media.type !== "gd") { - self.channel.activeLock.release(); - return abort(); - } - - self.current.media.meta = media.meta; - self.current.media.meta.expiration = Date.now() + 3600000; - self.channel.logger.log("[playlist] Auto-refreshed Google Doc video"); - cb && cb(); - self.channel.activeLock.release(); - } - }); -}; - PlaylistModule.prototype._playNext = function () { if (!this.current) { return; @@ -1335,10 +1274,11 @@ PlaylistModule.prototype.handleQueuePlaylist = function (user, data) { pos: data.pos }; - var self = this; - self.channel.activeLock.lock(); + const self = this; + self.channel.refCounter.ref("PlaylistModule::handleQueuePlaylist"); db.getUserPlaylist(user.getName(), data.name, function (err, pl) { if (err) { + self.channel.refCounter.unref("PlaylistModule::handleQueuePlaylist"); return user.socket.emit("errorMsg", { msg: "Playlist load failed: " + err }); @@ -1369,7 +1309,6 @@ PlaylistModule.prototype.handleQueuePlaylist = function (user, data) { var m = new Media(item.id, item.title, item.seconds, item.type, item.meta); self._addItem(m, qdata, user); }); - self.channel.activeLock.release(); } catch (e) { Logger.errlog.log("Loading user playlist failed!"); Logger.errlog.log("PL: " + user.getName() + "-" + data.name); @@ -1378,7 +1317,8 @@ PlaylistModule.prototype.handleQueuePlaylist = function (user, data) { msg: "Internal error occurred when loading playlist.", link: null }); - self.channel.activeLock.release(); + } finally { + self.channel.refCounter.unref("PlaylistModule::handleQueuePlaylist"); } }); }; diff --git a/src/server.js b/src/server.js index fd79c9f1..290e9b29 100644 --- a/src/server.js +++ b/src/server.js @@ -208,7 +208,9 @@ Server.prototype.unloadChannel = function (chan) { // Empty all outward references from the channel var keys = Object.keys(chan); for (var i in keys) { - delete chan[keys[i]]; + if (keys[i] !== "refCounter") { + delete chan[keys[i]]; + } } chan.dead = true; };