From 00e9acbe4df552b3c56a066a203209e716158c3e Mon Sep 17 00:00:00 2001 From: Calvin Montgomery Date: Sat, 9 Jan 2021 13:03:38 -0800 Subject: [PATCH] Revert "Remove channel reference counter" This reverts commit d678fa56d127b93cfa10ccc53af81a6f3c9fa830. The reference counter, flawed as it is, was masking far more issues than I realized. It would require a more significant rearchitecture of the code to remove it. Probably better to keep it and try to improve it for now. --- integration_test/channel/kickban.js | 39 +++++----- package.json | 2 +- src/channel/channel.js | 106 +++++++++++++++++++++++----- src/channel/kickban.js | 23 ++++-- src/channel/library.js | 3 + src/channel/mediarefresher.js | 9 ++- src/channel/playlist.js | 30 +++++--- src/server.js | 6 +- www/js/acp.js | 4 ++ 9 files changed, 158 insertions(+), 64 deletions(-) diff --git a/integration_test/channel/kickban.js b/integration_test/channel/kickban.js index c633adc6..f153bb93 100644 --- a/integration_test/channel/kickban.js +++ b/integration_test/channel/kickban.js @@ -16,6 +16,10 @@ describe('KickbanModule', () => { beforeEach(() => { mockChannel = { name: channelName, + refCounter: { + ref() { }, + unref() { } + }, logger: { log() { } }, @@ -65,22 +69,11 @@ describe('KickbanModule', () => { }); }); - function patch(fn, after) { - let existing = kickban[fn]; - kickban[fn] = async function () { - try { - await existing.apply(this, arguments) - } finally { - after(); - } - }; - } - describe('#handleCmdBan', () => { it('inserts a valid ban', done => { let kicked = false; - patch('banName', () => { + mockChannel.refCounter.unref = () => { assert(kicked, 'Expected user to be kicked'); database.getDB().runTransaction(async tx => { @@ -97,7 +90,7 @@ describe('KickbanModule', () => { done(); }); - }); + }; mockChannel.users = [{ getLowerName() { @@ -254,7 +247,7 @@ describe('KickbanModule', () => { let firstUserKicked = false; let secondUserKicked = false; - patch('banAll', () => { + mockChannel.refCounter.unref = () => { assert(firstUserKicked, 'Expected banned user to be kicked'); assert( secondUserKicked, @@ -286,7 +279,7 @@ describe('KickbanModule', () => { done(); }); - }); + }; mockChannel.users = [{ getLowerName() { @@ -320,7 +313,7 @@ describe('KickbanModule', () => { }); it('inserts a valid range ban', done => { - patch('banIP', () => { + mockChannel.refCounter.unref = () => { database.getDB().runTransaction(async tx => { const ipBan = await tx.table('channel_bans') .where({ @@ -335,7 +328,7 @@ describe('KickbanModule', () => { done(); }); - }); + }; kickban.handleCmdIPBan( mockUser, @@ -345,7 +338,7 @@ describe('KickbanModule', () => { }); it('inserts a valid wide-range ban', done => { - patch('banIP', () => { + mockChannel.refCounter.unref = () => { database.getDB().runTransaction(async tx => { const ipBan = await tx.table('channel_bans') .where({ @@ -360,7 +353,7 @@ describe('KickbanModule', () => { done(); }); - }); + }; kickban.handleCmdIPBan( mockUser, @@ -372,7 +365,7 @@ describe('KickbanModule', () => { it('inserts a valid IPv6 ban', done => { const longIP = require('../../lib/utilities').expandIPv6('::abcd'); - patch('banAll', () => { + mockChannel.refCounter.unref = () => { database.getDB().runTransaction(async tx => { const ipBan = await tx.table('channel_bans') .where({ @@ -387,7 +380,7 @@ describe('KickbanModule', () => { done(); }); - }); + }; database.getDB().runTransaction(async tx => { await tx.table('aliases') @@ -553,7 +546,7 @@ describe('KickbanModule', () => { }); it('still adds the IP ban even if the name is already banned', done => { - patch('banIP', () => { + mockChannel.refCounter.unref = () => { database.getDB().runTransaction(async tx => { const ipBan = await tx.table('channel_bans') .where({ @@ -568,7 +561,7 @@ describe('KickbanModule', () => { done(); }); - }); + }; database.getDB().runTransaction(tx => { return tx.table('channel_bans') diff --git a/package.json b/package.json index d71d34e5..da7c4ff1 100644 --- a/package.json +++ b/package.json @@ -2,7 +2,7 @@ "author": "Calvin Montgomery", "name": "CyTube", "description": "Online media synchronizer and chat", - "version": "3.74.1", + "version": "3.74.2", "repository": { "url": "http://github.com/calzoneman/sync" }, diff --git a/src/channel/channel.js b/src/channel/channel.js index eb3fc8eb..f60b1d5a 100644 --- a/src/channel/channel.js +++ b/src/channel/channel.js @@ -14,6 +14,67 @@ const LOGGER = require('@calzoneman/jsli')('channel'); const USERCOUNT_THROTTLE = 10000; +class ReferenceCounter { + constructor(channel) { + this.channel = channel; + this.channelName = channel.name; + this.refCount = 0; + this.references = {}; + } + + ref(caller) { + if (caller) { + if (this.references.hasOwnProperty(caller)) { + this.references[caller]++; + } else { + this.references[caller] = 1; + } + } + + this.refCount++; + } + + unref(caller) { + if (caller) { + if (this.references.hasOwnProperty(caller)) { + this.references[caller]--; + if (this.references[caller] === 0) { + delete this.references[caller]; + } + } else { + LOGGER.error("ReferenceCounter::unref() called by caller [" + + caller + "] but this caller had no active references! " + + `(channel: ${this.channelName})`); + return; + } + } + + this.refCount--; + this.checkRefCount(); + } + + checkRefCount() { + if (this.refCount === 0) { + if (Object.keys(this.references).length > 0) { + LOGGER.error("ReferenceCounter::refCount reached 0 but still had " + + "active references: " + + JSON.stringify(Object.keys(this.references)) + + ` (channel: ${this.channelName})`); + for (var caller in this.references) { + this.refCount += this.references[caller]; + } + } else if (this.channel.users && this.channel.users.length > 0) { + LOGGER.error("ReferenceCounter::refCount reached 0 but still had " + + this.channel.users.length + " active users" + + ` (channel: ${this.channelName})`); + this.refCount = this.channel.users.length; + } else { + this.channel.emit("empty"); + } + } + } +} + function Channel(name) { this.name = name; this.uniqueName = name.toLowerCase(); @@ -24,6 +85,7 @@ function Channel(name) { ) ); this.users = []; + this.refCounter = new ReferenceCounter(this); this.flags = 0; this.id = 0; this.ownerName = null; @@ -220,16 +282,17 @@ Channel.prototype.saveState = async function () { Channel.prototype.checkModules = function (fn, args, cb) { const self = this; + const refCaller = `Channel::checkModules/${fn}`; this.waitFlag(Flags.C_READY, function () { if (self.dead) return; + self.refCounter.ref(refCaller); var keys = Object.keys(self.modules); var next = function (err, result) { - if (self.dead) return; - if (result !== ChannelModule.PASSTHROUGH) { /* Either an error occured, or the module denied the user access */ cb(err, result); + self.refCounter.unref(refCaller); return; } @@ -237,6 +300,7 @@ Channel.prototype.checkModules = function (fn, args, cb) { if (m === undefined) { /* No more modules to check */ cb(null, ChannelModule.PASSTHROUGH); + self.refCounter.unref(refCaller); return; } @@ -275,32 +339,28 @@ Channel.prototype.notifyModules = function (fn, args) { Channel.prototype.joinUser = function (user, data) { const self = this; + self.refCounter.ref("Channel::user"); self.waitFlag(Flags.C_READY, function () { + /* User closed the connection before the channel finished loading */ if (user.socket.disconnected) { - return; - } - - if (self.dead) { - user.kick('Channel is not loaded'); + self.refCounter.unref("Channel::user"); return; } user.channel = self; user.waitFlag(Flags.U_LOGGED_IN, () => { if (self.dead) { - user.kick('Channel is not loaded'); + LOGGER.warn( + 'Got U_LOGGED_IN for %s after channel already unloaded', + user.getName() + ); return; } if (user.is(Flags.U_REGISTERED)) { db.channels.getRank(self.name, user.getName(), (error, rank) => { if (!error) { - if (self.dead) { - user.kick('Channel is not loaded'); - return; - } - user.setChannelRank(rank); user.setFlag(Flags.U_HAS_CHANNEL_RANK); if (user.inChannel()) { @@ -314,6 +374,13 @@ Channel.prototype.joinUser = function (user, data) { } }); + if (user.socket.disconnected) { + self.refCounter.unref("Channel::user"); + return; + } else if (self.dead) { + return; + } + self.checkModules("onUserPreJoin", [user, data], function (err, result) { if (result === ChannelModule.PASSTHROUGH) { user.channel = self; @@ -322,6 +389,7 @@ Channel.prototype.joinUser = function (user, data) { user.channel = null; user.account.channelRank = 0; user.account.effectiveRank = user.account.globalRank; + self.refCounter.unref("Channel::user"); } }); }); @@ -425,8 +493,8 @@ Channel.prototype.partUser = function (user) { }); this.broadcastUsercount(); + this.refCounter.unref("Channel::user"); user.die(); - if (this.users.length === 0) this.emit('empty'); }; Channel.prototype.maybeResendUserlist = function maybeResendUserlist(user, newRank, oldRank) { @@ -587,14 +655,13 @@ Channel.prototype.sendUserJoin = function (users, user) { Channel.prototype.readLog = function (cb) { const maxLen = 102400; const file = this.logger.filename; + this.refCounter.ref("Channel::readLog"); const self = this; fs.stat(file, function (err, data) { if (err) { + self.refCounter.unref("Channel::readLog"); return cb(err, null); } - if (self.dead) { - return cb(new Error('Channel unloaded'), null); - } const start = Math.max(data.size - maxLen, 0); const end = data.size - 1; @@ -610,6 +677,7 @@ Channel.prototype.readLog = function (cb) { }); read.on("end", function () { cb(null, buffer); + self.refCounter.unref("Channel::readLog"); }); }); }; @@ -676,6 +744,10 @@ Channel.prototype.packInfo = function (isAdmin) { } } + if (isAdmin) { + data.activeLockCount = this.refCounter.refCount; + } + var self = this; var keys = Object.keys(this.modules); keys.forEach(function (k) { diff --git a/src/channel/kickban.js b/src/channel/kickban.js index a325d20e..9c342bf3 100644 --- a/src/channel/kickban.js +++ b/src/channel/kickban.js @@ -77,11 +77,10 @@ KickBanModule.prototype.onUserPostJoin = function (user) { } const chan = this.channel; + const refCaller = "KickBanModule::onUserPostJoin"; user.waitFlag(Flags.U_LOGGED_IN, function () { + chan.refCounter.ref(refCaller); db.channels.isNameBanned(chan.name, user.getName(), function (err, banned) { - if (chan.dead) { - return; - } if (!err && banned) { user.kick("You are banned from this channel."); if (chan.modules.chat) { @@ -89,6 +88,7 @@ KickBanModule.prototype.onUserPostJoin = function (user) { "name is banned)"); } } + chan.refCounter.unref(refCaller); }); }); @@ -226,9 +226,14 @@ KickBanModule.prototype.handleCmdBan = function (user, msg, _meta) { var name = args.shift().toLowerCase(); var reason = args.join(" "); + const chan = this.channel; + chan.refCounter.ref("KickBanModule::handleCmdBan"); + this.banName(user, name, reason).catch(error => { const message = error.message || error; user.socket.emit("errorMsg", { msg: message }); + }).then(() => { + chan.refCounter.unref("KickBanModule::handleCmdBan"); }); }; @@ -252,9 +257,15 @@ KickBanModule.prototype.handleCmdIPBan = function (user, msg, _meta) { } var reason = args.join(" "); + const chan = this.channel; + chan.refCounter.ref("KickBanModule::handleCmdIPBan"); + this.banAll(user, name, range, reason).catch(error => { + //console.log('!!!', error.stack); const message = error.message || error; user.socket.emit("errorMsg", { msg: message }); + }).then(() => { + chan.refCounter.unref("KickBanModule::handleCmdIPBan"); }); }; @@ -416,15 +427,14 @@ KickBanModule.prototype.handleUnban = function (user, data) { } var self = this; + this.channel.refCounter.ref("KickBanModule::handleUnban"); db.channels.unbanId(this.channel.name, data.id, function (err) { if (err) { + self.channel.refCounter.unref("KickBanModule::handleUnban"); return user.socket.emit("errorMsg", { msg: err }); } - if (self.channel.dead) { - return; - } self.sendUnban(self.channel.users, data); self.channel.logger.log("[mod] " + user.getName() + " unbanned " + data.name); @@ -435,6 +445,7 @@ KickBanModule.prototype.handleUnban = function (user, data) { banperm ); } + self.channel.refCounter.unref("KickBanModule::handleUnban"); }); }; diff --git a/src/channel/library.js b/src/channel/library.js index c0df5b45..31b34316 100644 --- a/src/channel/library.js +++ b/src/channel/library.js @@ -55,15 +55,18 @@ LibraryModule.prototype.handleUncache = function (user, data) { } const chan = this.channel; + chan.refCounter.ref("LibraryModule::handleUncache"); db.channels.deleteFromLibrary(chan.name, data.id, function (err, _res) { if (chan.dead) { return; } else if (err) { + chan.refCounter.unref("LibraryModule::handleUncache"); return; } chan.logger.log("[library] " + user.getName() + " deleted " + data.id + "from the library"); + chan.refCounter.unref("LibraryModule::handleUncache"); }); }; diff --git a/src/channel/mediarefresher.js b/src/channel/mediarefresher.js index 4f9c4dbe..f8c53f57 100644 --- a/src/channel/mediarefresher.js +++ b/src/channel/mediarefresher.js @@ -47,8 +47,9 @@ MediaRefresherModule.prototype.initVimeo = function (data, cb) { } const self = this; + self.channel.refCounter.ref("MediaRefresherModule::initVimeo"); Vimeo.extract(data.id).then(function (direct) { - if (self.channel.dead) { + if (self.dead || self.channel.dead) { self.unload(); return; } @@ -62,11 +63,9 @@ MediaRefresherModule.prototype.initVimeo = function (data, cb) { if (cb) cb(); }).catch(function (err) { LOGGER.error("Unexpected vimeo::extract() fail: " + err.stack); - if (self.channel.dead) { - self.unload(); - return; - } if (cb) cb(); + }).finally(() => { + self.channel.refCounter.unref("MediaRefresherModule::initVimeo"); }); }; diff --git a/src/channel/playlist.js b/src/channel/playlist.js index 1df38219..786c439f 100644 --- a/src/channel/playlist.js +++ b/src/channel/playlist.js @@ -511,19 +511,19 @@ PlaylistModule.prototype.queueStandard = function (user, data) { }; const self = this; + this.channel.refCounter.ref("PlaylistModule::queueStandard"); counters.add("playlist:queue:count", 1); this.semaphore.queue(function (lock) { InfoGetter.getMedia(data.id, data.type, function (err, media) { if (err) { error(XSS.sanitizeText(String(err))); - return lock.release(); - } - if (self.channel.dead) { + self.channel.refCounter.unref("PlaylistModule::queueStandard"); return lock.release(); } self._addItem(media, data, user, function () { lock.release(); + self.channel.refCounter.unref("PlaylistModule::queueStandard"); }); }); }); @@ -546,7 +546,7 @@ PlaylistModule.prototype.queueYouTubePlaylist = function (user, data) { return lock.release(); } - if (self.channel.dead) { + if (self.dead) { return lock.release(); } @@ -562,6 +562,8 @@ PlaylistModule.prototype.queueYouTubePlaylist = function (user, data) { } } + self.channel.refCounter.ref("PlaylistModule::queueYouTubePlaylist"); + if (self.channel.modules.library && data.shouldAddToLibrary) { self.channel.modules.library.cacheMediaList(vids); data.shouldAddToLibrary = false; @@ -572,6 +574,8 @@ PlaylistModule.prototype.queueYouTubePlaylist = function (user, data) { self._addItem(media, data, user); }); + self.channel.refCounter.unref("PlaylistModule::queueYouTubePlaylist"); + lock.release(); }); }); @@ -589,6 +593,7 @@ PlaylistModule.prototype.handleDelete = function (user, data) { } var plitem = this.items.find(data); + self.channel.refCounter.ref("PlaylistModule::handleDelete"); this.semaphore.queue(function (lock) { if (self._delete(data)) { self.channel.logger.log("[playlist] " + user.getName() + " deleted " + @@ -596,6 +601,7 @@ PlaylistModule.prototype.handleDelete = function (user, data) { } lock.release(); + self.channel.refCounter.unref("PlaylistModule::handleDelete"); }); }; @@ -631,24 +637,26 @@ PlaylistModule.prototype.handleMoveMedia = function (user, data) { } const self = this; + self.channel.refCounter.ref("PlaylistModule::handleMoveMedia"); self.semaphore.queue(function (lock) { - if (self.channel.dead) { - return lock.release(); - } if (!self.items.remove(data.from)) { + self.channel.refCounter.unref("PlaylistModule::handleMoveMedia"); return lock.release(); } if (data.after === "prepend") { if (!self.items.prepend(from)) { + self.channel.refCounter.unref("PlaylistModule::handleMoveMedia"); return lock.release(); } } else if (data.after === "append") { if (!self.items.append(from)) { + self.channel.refCounter.unref("PlaylistModule::handleMoveMedia"); return lock.release(); } } else { if (!self.items.insertAfter(from, data.after)) { + self.channel.refCounter.unref("PlaylistModule::handleMoveMedia"); return lock.release(); } } @@ -660,6 +668,7 @@ PlaylistModule.prototype.handleMoveMedia = function (user, data) { (after ? " after " + after.media.title : "")); self._listDirty = true; lock.release(); + self.channel.refCounter.unref("PlaylistModule::handleMoveMedia"); }); }; @@ -1351,15 +1360,14 @@ PlaylistModule.prototype.handleQueuePlaylist = function (user, data) { }; const self = this; + self.channel.refCounter.ref("PlaylistModule::handleQueuePlaylist"); db.getUserPlaylist(user.getName(), data.name, function (err, pl) { if (err) { + self.channel.refCounter.unref("PlaylistModule::handleQueuePlaylist"); return user.socket.emit("errorMsg", { msg: "Playlist load failed: " + err }); } - if (self.channel.dead) { - return; - } try { if (data.pos === "next") { @@ -1394,6 +1402,8 @@ PlaylistModule.prototype.handleQueuePlaylist = function (user, data) { msg: "Internal error occurred when loading playlist.", link: null }); + } finally { + self.channel.refCounter.unref("PlaylistModule::handleQueuePlaylist"); } }); }; diff --git a/src/server.js b/src/server.js index ec487854..b6be7c70 100644 --- a/src/server.js +++ b/src/server.js @@ -357,9 +357,11 @@ Server.prototype.unloadChannel = function (chan, options) { LOGGER.info("Unloaded channel " + chan.name); chan.broadcastUsercount.cancel(); - // Empty all outward references from the channel | TODO does this actually help? + // Empty all outward references from the channel Object.keys(chan).forEach(key => { - delete chan[key]; + if (key !== "refCounter") { + delete chan[key]; + } }); chan.dead = true; promActiveChannels.dec(); diff --git a/www/js/acp.js b/www/js/acp.js index 1613ab21..55de7ae6 100644 --- a/www/js/acp.js +++ b/www/js/acp.js @@ -421,6 +421,10 @@ function showChannelDetailModal(c) { $("").text("Public").appendTo(tr); $("").text(c.public).appendTo(tr); + tr = $("").appendTo(table); + $("").text("ActiveLock Count").appendTo(tr); + $("").text(c.activeLockCount).appendTo(tr); + tr = $("").appendTo(table); $("").text("Chat Filter Count").appendTo(tr); $("").text(c.chatFilterCount).appendTo(tr);