From c4cc22dd05deda38d5f98810b011b77297e73d20 Mon Sep 17 00:00:00 2001 From: Calvin Montgomery Date: Sun, 10 Dec 2017 10:36:28 -0800 Subject: [PATCH] Add experimental feature to reduce database writes for channel data --- src/channel-storage/dbstore.js | 35 ++++++++++--- src/channel/channel.js | 57 ++++++++++++++------ src/channel/chat.js | 5 ++ src/channel/customization.js | 6 +++ src/channel/emotes.js | 16 ++++++ src/channel/filters.js | 14 +++++ src/channel/module.js | 2 + src/channel/opts.js | 4 ++ src/channel/permissions.js | 5 ++ src/channel/poll.js | 7 +++ src/main.js | 4 +- src/server.js | 96 +++++++++++++++++++++------------- 12 files changed, 190 insertions(+), 61 deletions(-) diff --git a/src/channel-storage/dbstore.js b/src/channel-storage/dbstore.js index 69f53673..c6deba6d 100644 --- a/src/channel-storage/dbstore.js +++ b/src/channel-storage/dbstore.js @@ -1,10 +1,19 @@ import Promise from 'bluebird'; import { ChannelStateSizeError } from '../errors'; import db from '../database'; +import { Counter } from 'prom-client'; const LOGGER = require('@calzoneman/jsli')('dbstore'); const SIZE_LIMIT = 1048576; const QUERY_CHANNEL_DATA = 'SELECT `key`, `value` FROM channel_data WHERE channel_id = ?'; +const loadRowcount = new Counter({ + name: 'cytube_channel_db_load_rows_total', + help: 'Total rows loaded from the channel_data table' +}); +const saveRowcount = new Counter({ + name: 'cytube_channel_db_save_rows_total', + help: 'Total rows saved in the channel_data table' +}); function queryAsync(query, substitutions) { return new Promise((resolve, reject) => { @@ -39,6 +48,8 @@ export class DatabaseStore { } return queryAsync(QUERY_CHANNEL_DATA, [id]).then(rows => { + loadRowcount.inc(rows.length); + const data = {}; rows.forEach(row => { try { @@ -53,35 +64,47 @@ export class DatabaseStore { }); } - save(id, channelName, data) { + async save(id, channelName, data) { if (!id || id === 0) { - return Promise.reject(new Error(`Cannot save state for [${channelName}]: ` + - `id was passed as [${id}]`)); + throw new Error( + `Cannot save state for [${channelName}]: ` + + `id was passed as [${id}]` + ); } let totalSize = 0; let rowCount = 0; const substitutions = []; + for (const key in data) { if (typeof data[key] === 'undefined') { continue; } + rowCount++; + const value = JSON.stringify(data[key]); totalSize += value.length; + substitutions.push(id); substitutions.push(key); substitutions.push(value); } + if (rowCount === 0) { + return; + } + + saveRowcount.inc(rowCount); + if (totalSize > SIZE_LIMIT) { - return Promise.reject(new ChannelStateSizeError( + throw new ChannelStateSizeError( 'Channel state size is too large', { limit: SIZE_LIMIT, actual: totalSize - })); + }); } - return queryAsync(buildUpdateQuery(rowCount), substitutions); + return await queryAsync(buildUpdateQuery(rowCount), substitutions); } } diff --git a/src/channel/channel.js b/src/channel/channel.js index 4bae406b..b727fac8 100644 --- a/src/channel/channel.js +++ b/src/channel/channel.js @@ -10,6 +10,7 @@ import Promise from 'bluebird'; import { EventEmitter } from 'events'; import { throttle } from '../util/throttle'; import Logger from '../logger'; +import * as Switches from '../switches'; const LOGGER = require('@calzoneman/jsli')('channel'); @@ -239,37 +240,59 @@ Channel.prototype.loadState = function () { }); }; -Channel.prototype.saveState = function () { +Channel.prototype.saveState = async function () { if (!this.is(Flags.C_REGISTERED)) { - return Promise.resolve(); + return; } else if (!this.is(Flags.C_READY)) { - return Promise.reject(new Error(`Attempted to save channel ${this.name} ` + - `but it wasn't finished loading yet!`)); + throw new Error( + `Attempted to save channel ${this.name} ` + + `but it wasn't finished loading yet!` + ); } if (this.is(Flags.C_ERROR)) { - return Promise.reject(new Error(`Channel is in error state`)); + throw new Error(`Channel is in error state`); } this.logger.log("[init] Saving channel state to disk"); + const data = {}; Object.keys(this.modules).forEach(m => { - this.modules[m].save(data); + if ( + this.modules[m].dirty || + !Switches.isActive('dirtyCheck') || + !this.modules[m].supportsDirtyCheck + ) { + this.modules[m].save(data); + } else { + LOGGER.debug( + "Skipping save for %s[%s]: not dirty", + this.uniqueName, + m + ); + } }); - return ChannelStore.save(this.id, this.uniqueName, data) - .catch(ChannelStateSizeError, err => { - this.users.forEach(u => { - if (u.account.effectiveRank >= 2) { - u.socket.emit("warnLargeChandump", { - limit: err.limit, - actual: err.actual - }); - } + try { + await ChannelStore.save(this.id, this.uniqueName, data); + + Object.keys(this.modules).forEach(m => { + this.modules[m].dirty = false; }); + } catch (error) { + if (error instanceof ChannelStateSizeError) { + this.users.forEach(u => { + if (u.account.effectiveRank >= 2) { + u.socket.emit("warnLargeChandump", { + limit: error.limit, + actual: error.actual + }); + } + }); + } - throw err; - }); + throw error; + } }; Channel.prototype.checkModules = function (fn, args, cb) { diff --git a/src/channel/chat.js b/src/channel/chat.js index 6ccd3c4e..c5a376d5 100644 --- a/src/channel/chat.js +++ b/src/channel/chat.js @@ -36,6 +36,7 @@ function ChatModule(channel) { this.buffer = []; this.muted = new util.Set(); this.commandHandlers = {}; + this.supportsDirtyCheck = true; /* Default commands */ this.registerCommand("/me", this.handleCmdMe.bind(this)); @@ -69,6 +70,8 @@ ChatModule.prototype.load = function (data) { this.muted.add(data.chatmuted[i]); } } + + this.dirty = false; }; ChatModule.prototype.save = function (data) { @@ -441,6 +444,7 @@ ChatModule.prototype.sendModMessage = function (msg, minrank) { ChatModule.prototype.sendMessage = function (msgobj) { this.channel.broadcastAll("chatMsg", msgobj); + this.dirty = true; this.buffer.push(msgobj); if (this.buffer.length > 15) { this.buffer.shift(); @@ -492,6 +496,7 @@ ChatModule.prototype.handleCmdClear = function (user, msg, meta) { return; } + this.dirty = true; this.buffer = []; this.channel.broadcastAll("clearchat", { clearedBy: user.getName() }); this.sendModMessage(user.getName() + " cleared chat.", -1); diff --git a/src/channel/customization.js b/src/channel/customization.js index 5bbfdd97..897aea58 100644 --- a/src/channel/customization.js +++ b/src/channel/customization.js @@ -18,6 +18,7 @@ function CustomizationModule(channel) { this.css = ""; this.js = ""; this.motd = ""; + this.supportsDirtyCheck = true; } CustomizationModule.prototype = Object.create(ChannelModule.prototype); @@ -42,6 +43,8 @@ CustomizationModule.prototype.load = function (data) { this.motd = XSS.sanitizeHTML(data.motd); } } + + this.dirty = false; }; CustomizationModule.prototype.save = function (data) { @@ -86,6 +89,7 @@ CustomizationModule.prototype.handleSetCSS = function (user, data) { return; } + this.dirty = true; this.css = data.css.substring(0, 20000); this.sendCSSJS(this.channel.users); @@ -98,6 +102,7 @@ CustomizationModule.prototype.handleSetJS = function (user, data) { return; } + this.dirty = true; this.js = data.js.substring(0, 20000); this.sendCSSJS(this.channel.users); @@ -112,6 +117,7 @@ CustomizationModule.prototype.handleSetMotd = function (user, data) { var motd = data.motd.substring(0, 20000); + this.dirty = true; this.setMotd(motd); this.channel.logger.log("[mod] " + user.getName() + " updated the MOTD"); }; diff --git a/src/channel/emotes.js b/src/channel/emotes.js index e748286d..5384c857 100644 --- a/src/channel/emotes.js +++ b/src/channel/emotes.js @@ -119,6 +119,7 @@ function validateEmote(f) { function EmoteModule(channel) { ChannelModule.apply(this, arguments); this.emotes = new EmoteList(); + this.supportsDirtyCheck = true; } EmoteModule.prototype = Object.create(ChannelModule.prototype); @@ -129,6 +130,8 @@ EmoteModule.prototype.load = function (data) { this.emotes.updateEmote(data.emotes[i]); } } + + this.dirty = false; }; EmoteModule.prototype.save = function (data) { @@ -198,6 +201,8 @@ EmoteModule.prototype.handleRenameEmote = function (user, data) { var success = this.emotes.renameEmote(Object.assign({}, f)); if(!success){ return; } + this.dirty = true; + var chan = this.channel; chan.broadcastAll("renameEmote", f); chan.logger.log(`[mod] ${user.getName()} renamed emote: ${f.old} -> ${f.name}`); @@ -228,6 +233,9 @@ EmoteModule.prototype.handleUpdateEmote = function (user, data) { } this.emotes.updateEmote(f); + + this.dirty = true; + var chan = this.channel; chan.broadcastAll("updateEmote", f); @@ -249,6 +257,9 @@ EmoteModule.prototype.handleImportEmotes = function (user, data) { this.emotes.importList(data.map(validateEmote).filter(function (f) { return f !== false; })); + + this.dirty = true; + this.sendEmotes(this.channel.users); }; @@ -266,6 +277,9 @@ EmoteModule.prototype.handleRemoveEmote = function (user, data) { } this.emotes.removeEmote(data); + + this.dirty = true; + this.channel.logger.log("[mod] " + user.getName() + " removed emote: " + data.name); this.channel.broadcastAll("removeEmote", data); }; @@ -284,6 +298,8 @@ EmoteModule.prototype.handleMoveEmote = function (user, data) { } this.emotes.moveEmote(data.from, data.to); + + this.dirty = true; }; module.exports = EmoteModule; diff --git a/src/channel/filters.js b/src/channel/filters.js index d11176b6..082cbbf7 100644 --- a/src/channel/filters.js +++ b/src/channel/filters.js @@ -65,6 +65,7 @@ const DEFAULT_FILTERS = [ function ChatFilterModule(channel) { ChannelModule.apply(this, arguments); this.filters = new FilterList(); + this.supportsDirtyCheck = true; } ChatFilterModule.prototype = Object.create(ChannelModule.prototype); @@ -84,6 +85,8 @@ ChatFilterModule.prototype.load = function (data) { } else { this.filters = new FilterList(DEFAULT_FILTERS); } + + this.dirty = false; }; ChatFilterModule.prototype.save = function (data) { @@ -149,6 +152,8 @@ ChatFilterModule.prototype.handleAddFilter = function (user, data) { return; } + this.dirty = true; + user.socket.emit("addFilterSuccess"); var chan = this.channel; @@ -197,6 +202,8 @@ ChatFilterModule.prototype.handleUpdateFilter = function (user, data) { return; } + this.dirty = true; + var chan = this.channel; chan.users.forEach(function (u) { if (chan.modules.permissions.canEditFilters(u)) { @@ -232,6 +239,8 @@ ChatFilterModule.prototype.handleImportFilters = function (user, data) { return; } + this.dirty = true; + this.channel.logger.log("[mod] " + user.getName() + " imported the filter list"); this.sendChatFilters(this.channel.users); }; @@ -258,6 +267,9 @@ ChatFilterModule.prototype.handleRemoveFilter = function (user, data) { }); return; } + + this.dirty = true; + var chan = this.channel; chan.users.forEach(function (u) { if (chan.modules.permissions.canEditFilters(u)) { @@ -290,6 +302,8 @@ ChatFilterModule.prototype.handleMoveFilter = function (user, data) { }); return; } + + this.dirty = true; }; ChatFilterModule.prototype.handleRequestChatFilters = function (user) { diff --git a/src/channel/module.js b/src/channel/module.js index c8c19ec0..34a0dd7a 100644 --- a/src/channel/module.js +++ b/src/channel/module.js @@ -1,5 +1,7 @@ function ChannelModule(channel) { this.channel = channel; + this.dirty = false; + this.supportsDirtyCheck = false; } ChannelModule.prototype = { diff --git a/src/channel/opts.js b/src/channel/opts.js index 182e64bb..2ca4e570 100644 --- a/src/channel/opts.js +++ b/src/channel/opts.js @@ -34,6 +34,8 @@ function OptionsModule(channel) { new_user_chat_link_delay: 0, // Minimum account/IP age to post links playlist_max_duration_per_user: 0 // Maximum total playlist time per user }; + + this.supportsDirtyCheck = true; } OptionsModule.prototype = Object.create(ChannelModule.prototype); @@ -51,6 +53,7 @@ OptionsModule.prototype.load = function (data) { this.opts.chat_antiflood_params.burst); this.opts.chat_antiflood_params.sustained = Math.min(10, this.opts.chat_antiflood_params.sustained); + this.dirty = false; }; OptionsModule.prototype.save = function (data) { @@ -356,6 +359,7 @@ OptionsModule.prototype.handleSetOptions = function (user, data) { this.channel.logger.log("[mod] " + user.getName() + " updated channel options"); if (sendUpdate) { + this.dirty = true; this.sendOpts(this.channel.users); } }; diff --git a/src/channel/permissions.js b/src/channel/permissions.js index 9762c652..3254f5c8 100644 --- a/src/channel/permissions.js +++ b/src/channel/permissions.js @@ -50,6 +50,7 @@ function PermissionsModule(channel) { ChannelModule.apply(this, arguments); this.permissions = {}; this.openPlaylist = false; + this.supportsDirtyCheck = true; } PermissionsModule.prototype = Object.create(ChannelModule.prototype); @@ -70,6 +71,8 @@ PermissionsModule.prototype.load = function (data) { } else if ("playlistLock" in data) { this.openPlaylist = !data.playlistLock; } + + this.dirty = false; }; PermissionsModule.prototype.save = function (data) { @@ -124,6 +127,7 @@ PermissionsModule.prototype.handleTogglePlaylistLock = function (user) { return; } + this.dirty = true; this.openPlaylist = !this.openPlaylist; if (this.openPlaylist) { this.channel.logger.log("[playlist] " + user.getName() + " unlocked the playlist"); @@ -165,6 +169,7 @@ PermissionsModule.prototype.handleSetPermissions = function (user, perms) { } } + this.dirty = true; this.channel.logger.log("[mod] " + user.getName() + " updated permissions"); this.sendPermissions(this.channel.users); }; diff --git a/src/channel/poll.js b/src/channel/poll.js index 239f1a72..f7bc70ce 100644 --- a/src/channel/poll.js +++ b/src/channel/poll.js @@ -28,6 +28,7 @@ function PollModule(channel) { this.channel.modules.chat.registerCommand("poll", this.handlePollCmd.bind(this, false)); this.channel.modules.chat.registerCommand("hpoll", this.handlePollCmd.bind(this, true)); } + this.supportsDirtyCheck = true; } PollModule.prototype = Object.create(ChannelModule.prototype); @@ -49,6 +50,8 @@ PollModule.prototype.load = function (data) { this.poll.timestamp = data.poll.timestamp; } } + + this.dirty = false; }; PollModule.prototype.save = function (data) { @@ -197,6 +200,7 @@ PollModule.prototype.handleNewPoll = function (user, data, ack) { } this.poll = poll; + this.dirty = true; this.broadcastPoll(true); this.channel.logger.log("[poll] " + user.getName() + " opened poll: '" + poll.title + "'"); ack({}); @@ -209,6 +213,7 @@ PollModule.prototype.handleVote = function (user, data) { if (this.poll) { this.poll.vote(user.realip, data.option); + this.dirty = true; this.broadcastPoll(false); } }; @@ -231,6 +236,7 @@ PollModule.prototype.handleClosePoll = function (user) { this.channel.broadcastAll("closePoll"); this.channel.logger.log("[poll] " + user.getName() + " closed the active poll"); this.poll = null; + this.dirty = true; } }; @@ -255,6 +261,7 @@ PollModule.prototype.handlePollCmd = function (obscured, user, msg, meta) { var poll = new Poll(user.getName(), title, args, obscured); this.poll = poll; + this.dirty = true; this.broadcastPoll(true); this.channel.logger.log("[poll] " + user.getName() + " opened poll: '" + poll.title + "'"); }; diff --git a/src/main.js b/src/main.js index 8b73e2e5..6ecb60a0 100644 --- a/src/main.js +++ b/src/main.js @@ -1,5 +1,5 @@ import Config from './config'; -import Switches from './switches'; +import * as Switches from './switches'; import { isIP as validIP } from 'net'; import { eventlog } from './logger'; require('source-map-support').install(); @@ -66,6 +66,8 @@ function handleLine(line) { } }) } + } else if (line.indexOf('/save') === 0) { + sv.forceSave(); } else if (line.indexOf('/unloadchan') === 0) { const args = line.split(/\s+/); args.shift(); if (args.length) { diff --git a/src/server.js b/src/server.js index bb17648a..b885ce63 100644 --- a/src/server.js +++ b/src/server.js @@ -295,53 +295,59 @@ Server.prototype.unloadChannel = function (chan, options) { if (!options.skipSave) { chan.saveState().catch(error => { LOGGER.error(`Failed to save /${this.chanPath}/${chan.name} for unload: ${error.stack}`); - }); + }).then(finishUnloading); + } else { + finishUnloading(); } - chan.logger.log("[init] Channel shutting down"); - chan.logger.close(); + var self = this; - chan.notifyModules("unload", []); - Object.keys(chan.modules).forEach(function (k) { - chan.modules[k].dead = true; - /* - * Automatically clean up any timeouts/intervals assigned - * to properties of channel modules. Prevents a memory leak - * in case of forgetting to clear the timer on the "unload" - * module event. - */ - Object.keys(chan.modules[k]).forEach(function (prop) { - if (chan.modules[k][prop] && chan.modules[k][prop]._onTimeout) { - LOGGER.warn("Detected non-null timer when unloading " + - "module " + k + ": " + prop); - try { - clearTimeout(chan.modules[k][prop]); - clearInterval(chan.modules[k][prop]); - } catch (error) { - LOGGER.error(error.stack); + function finishUnloading() { + chan.logger.log("[init] Channel shutting down"); + chan.logger.close(); + + chan.notifyModules("unload", []); + Object.keys(chan.modules).forEach(function (k) { + chan.modules[k].dead = true; + /* + * Automatically clean up any timeouts/intervals assigned + * to properties of channel modules. Prevents a memory leak + * in case of forgetting to clear the timer on the "unload" + * module event. + */ + Object.keys(chan.modules[k]).forEach(function (prop) { + if (chan.modules[k][prop] && chan.modules[k][prop]._onTimeout) { + LOGGER.warn("Detected non-null timer when unloading " + + "module " + k + ": " + prop); + try { + clearTimeout(chan.modules[k][prop]); + clearInterval(chan.modules[k][prop]); + } catch (error) { + LOGGER.error(error.stack); + } } - } + }); }); - }); - for (var i = 0; i < this.channels.length; i++) { - if (this.channels[i].uniqueName === chan.uniqueName) { - this.channels.splice(i, 1); - i--; + for (var i = 0; i < self.channels.length; i++) { + if (self.channels[i].uniqueName === chan.uniqueName) { + self.channels.splice(i, 1); + i--; + } } - } - LOGGER.info("Unloaded channel " + chan.name); - chan.broadcastUsercount.cancel(); - // Empty all outward references from the channel - var keys = Object.keys(chan); - for (var i in keys) { - if (keys[i] !== "refCounter") { - delete chan[keys[i]]; + LOGGER.info("Unloaded channel " + chan.name); + chan.broadcastUsercount.cancel(); + // Empty all outward references from the channel + var keys = Object.keys(chan); + for (var i in keys) { + if (keys[i] !== "refCounter") { + delete chan[keys[i]]; + } } + chan.dead = true; + promActiveChannels.dec(); } - chan.dead = true; - promActiveChannels.dec(); }; Server.prototype.packChannelList = function (publicOnly, isAdmin) { @@ -380,6 +386,22 @@ Server.prototype.setAnnouncement = function (data) { } }; +Server.prototype.forceSave = function () { + Promise.map(this.channels, channel => { + try { + return channel.saveState().tap(() => { + LOGGER.info(`Saved /${this.chanPath}/${channel.name}`); + }).catch(err => { + LOGGER.error(`Failed to save /${this.chanPath}/${channel.name}: ${err.stack}`); + }); + } catch (error) { + LOGGER.error(`Failed to save channel: ${error.stack}`); + } + }, { concurrency: 5 }).then(() => { + LOGGER.info('Finished save'); + }); +}; + Server.prototype.shutdown = function () { LOGGER.info("Unloading channels"); Promise.map(this.channels, channel => {