mirror of https://github.com/calzoneman/sync.git
Start refactoring channel storage
This commit is contained in:
parent
f95f3dc89b
commit
5ec9c2b029
|
@ -0,0 +1,11 @@
|
||||||
|
import { FileStore } from './filestore';
|
||||||
|
|
||||||
|
var CHANNEL_STORE = new FileStore();
|
||||||
|
|
||||||
|
export function load(channelName) {
|
||||||
|
return CHANNEL_STORE.load(channelName);
|
||||||
|
}
|
||||||
|
|
||||||
|
export function save(channelName, data) {
|
||||||
|
return CHANNEL_STORE.save(channelName, data);
|
||||||
|
}
|
|
@ -0,0 +1,46 @@
|
||||||
|
import * as Promise from 'bluebird';
|
||||||
|
import { stat } from 'fs';
|
||||||
|
import * as fs from 'graceful-fs';
|
||||||
|
import path from 'path';
|
||||||
|
|
||||||
|
const readFileAsync = Promise.promisify(fs.readFile);
|
||||||
|
const writeFileAsync = Promise.promisify(fs.writeFile);
|
||||||
|
const statAsync = Promise.promisify(stat);
|
||||||
|
const SIZE_LIMIT = 1048576;
|
||||||
|
const CHANDUMP_DIR = path.resolve(__dirname, '..', '..', 'chandump');
|
||||||
|
|
||||||
|
export class FileStore {
|
||||||
|
filenameForChannel(channelName) {
|
||||||
|
return path.join(CHANDUMP_DIR, channelName);
|
||||||
|
}
|
||||||
|
|
||||||
|
load(channelName) {
|
||||||
|
const filename = this.filenameForChannel(channelName);
|
||||||
|
return statAsync(filename).then(stats => {
|
||||||
|
if (stats.size > SIZE_LIMIT) {
|
||||||
|
throw new Error('Channel state file is too large: ' + stats.size);
|
||||||
|
} else {
|
||||||
|
return readFileAsync(filename);
|
||||||
|
}
|
||||||
|
}).then(fileContents => {
|
||||||
|
try {
|
||||||
|
return JSON.parse(fileContents);
|
||||||
|
} catch (e) {
|
||||||
|
throw new Error('Channel state file is not valid JSON: ' + e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
save(channelName, data) {
|
||||||
|
const filename = this.filenameForChannel(channelName);
|
||||||
|
const fileContents = new Buffer(JSON.stringify(data), 'utf8');
|
||||||
|
if (fileContents.length > SIZE_LIMIT) {
|
||||||
|
let error = new Error('Channel state size is too large');
|
||||||
|
error.limit = SIZE_LIMIT;
|
||||||
|
error.size = fileContents.length;
|
||||||
|
return Promise.reject(error);
|
||||||
|
}
|
||||||
|
|
||||||
|
return writeFileAsync(filename, fileContents);
|
||||||
|
}
|
||||||
|
}
|
|
@ -8,6 +8,8 @@ var fs = require("graceful-fs");
|
||||||
var path = require("path");
|
var path = require("path");
|
||||||
var sio = require("socket.io");
|
var sio = require("socket.io");
|
||||||
var db = require("../database");
|
var db = require("../database");
|
||||||
|
var ChannelStore = require("../channel-storage/channelstore");
|
||||||
|
var Promise = require("bluebird");
|
||||||
|
|
||||||
const SIZE_LIMIT = 1048576;
|
const SIZE_LIMIT = 1048576;
|
||||||
|
|
||||||
|
@ -150,17 +152,15 @@ Channel.prototype.getDiskSize = function (cb) {
|
||||||
};
|
};
|
||||||
|
|
||||||
Channel.prototype.loadState = function () {
|
Channel.prototype.loadState = function () {
|
||||||
var self = this;
|
|
||||||
var file = path.join(__dirname, "..", "..", "chandump", self.uniqueName);
|
|
||||||
|
|
||||||
/* Don't load from disk if not registered */
|
/* Don't load from disk if not registered */
|
||||||
if (!self.is(Flags.C_REGISTERED)) {
|
if (!this.is(Flags.C_REGISTERED)) {
|
||||||
self.modules.permissions.loadUnregistered();
|
this.modules.permissions.loadUnregistered();
|
||||||
self.setFlag(Flags.C_READY);
|
this.setFlag(Flags.C_READY);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
var errorLoad = function (msg) {
|
const self = this;
|
||||||
|
function errorLoad(msg) {
|
||||||
if (self.modules.customization) {
|
if (self.modules.customization) {
|
||||||
self.modules.customization.load({
|
self.modules.customization.load({
|
||||||
motd: msg
|
motd: msg
|
||||||
|
@ -168,99 +168,73 @@ Channel.prototype.loadState = function () {
|
||||||
}
|
}
|
||||||
|
|
||||||
self.setFlag(Flags.C_READY | Flags.C_ERROR);
|
self.setFlag(Flags.C_READY | Flags.C_ERROR);
|
||||||
};
|
}
|
||||||
|
|
||||||
fs.stat(file, function (err, stats) {
|
ChannelStore.load(this.uniqueName).then(data => {
|
||||||
if (!err) {
|
Object.keys(this.modules).forEach(m => {
|
||||||
var mb = stats.size / 1048576;
|
try {
|
||||||
mb = Math.floor(mb * 100) / 100;
|
this.modules[m].load(data);
|
||||||
if (mb > SIZE_LIMIT / 1048576) {
|
} catch (e) {
|
||||||
Logger.errlog.log("Large chandump detected: " + self.uniqueName +
|
Logger.errlog.log("Failed to load module " + m + " for channel " +
|
||||||
" (" + mb + " MiB)");
|
this.uniqueName);
|
||||||
var msg = "This channel's state size has exceeded the memory limit " +
|
}
|
||||||
|
});
|
||||||
|
this.setFlag(Flags.C_READY);
|
||||||
|
}).catch(err => {
|
||||||
|
if (err.code === 'ENOENT') {
|
||||||
|
Object.keys(this.modules).forEach(m => {
|
||||||
|
this.modules[m].load({});
|
||||||
|
});
|
||||||
|
this.setFlag(Flags.C_READY);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
let message;
|
||||||
|
if (/Channel state file is too large/.test(err.message)) {
|
||||||
|
message = "This channel's state size has exceeded the memory limit " +
|
||||||
"enforced by this server. Please contact an administrator " +
|
"enforced by this server. Please contact an administrator " +
|
||||||
"for assistance.";
|
"for assistance.";
|
||||||
errorLoad(msg);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
continueLoad();
|
|
||||||
});
|
|
||||||
|
|
||||||
var continueLoad = function () {
|
|
||||||
fs.readFile(file, function (err, data) {
|
|
||||||
if (err) {
|
|
||||||
/* ENOENT means the file didn't exist. This is normal for new channels */
|
|
||||||
if (err.code === "ENOENT") {
|
|
||||||
self.setFlag(Flags.C_READY);
|
|
||||||
Object.keys(self.modules).forEach(function (m) {
|
|
||||||
self.modules[m].load({});
|
|
||||||
});
|
|
||||||
} else {
|
} else {
|
||||||
Logger.errlog.log("Failed to open channel dump " + self.uniqueName);
|
message = "An error occurred when loading this channel's data from " +
|
||||||
Logger.errlog.log(err);
|
"disk. Please contact an administrator for assistance. " +
|
||||||
errorLoad("Unknown error occurred when loading channel state. " +
|
`The error was: ${err}`;
|
||||||
"Contact an administrator for assistance.");
|
|
||||||
}
|
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
self.logger.log("[init] Loading channel state from disk");
|
Logger.errlog.log(err.stack);
|
||||||
try {
|
errorLoad(message);
|
||||||
data = JSON.parse(data);
|
|
||||||
Object.keys(self.modules).forEach(function (m) {
|
|
||||||
self.modules[m].load(data);
|
|
||||||
});
|
});
|
||||||
self.setFlag(Flags.C_READY);
|
|
||||||
} catch (e) {
|
|
||||||
Logger.errlog.log("Channel dump for " + self.uniqueName + " is not " +
|
|
||||||
"valid");
|
|
||||||
Logger.errlog.log(e);
|
|
||||||
errorLoad("Unknown error occurred when loading channel state. Contact " +
|
|
||||||
"an administrator for assistance.");
|
|
||||||
}
|
|
||||||
});
|
|
||||||
};
|
|
||||||
};
|
};
|
||||||
|
|
||||||
Channel.prototype.saveState = function () {
|
Channel.prototype.saveState = function () {
|
||||||
var self = this;
|
if (!this.is(Flags.C_REGISTERED)) {
|
||||||
var file = path.join(__dirname, "..", "..", "chandump", self.uniqueName);
|
return Promise.resolve();
|
||||||
|
|
||||||
/**
|
|
||||||
* Don't overwrite saved state data if the current state is dirty,
|
|
||||||
* or if this channel is unregistered
|
|
||||||
*/
|
|
||||||
if (self.is(Flags.C_ERROR) || !self.is(Flags.C_REGISTERED)) {
|
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
self.logger.log("[init] Saving channel state to disk");
|
if (this.is(Flags.C_ERROR)) {
|
||||||
var data = {};
|
return Promise.reject(new Error(`Channel is in error state`));
|
||||||
Object.keys(this.modules).forEach(function (m) {
|
}
|
||||||
self.modules[m].save(data);
|
|
||||||
|
this.logger.log("[init] Saving channel state to disk");
|
||||||
|
const data = {};
|
||||||
|
Object.keys(this.modules).forEach(m => {
|
||||||
|
this.modules[m].save(data);
|
||||||
});
|
});
|
||||||
|
|
||||||
var json = JSON.stringify(data);
|
return ChannelStore.save(this.uniqueName, data).catch(err => {
|
||||||
/**
|
if (/Channel state size is too large/.test(err.message)) {
|
||||||
* Synchronous on purpose.
|
this.users.forEach(u => {
|
||||||
* When the server is shutting down, saveState() is called on all channels and
|
|
||||||
* then the process terminates. Async writeFile causes a race condition that wipes
|
|
||||||
* channels.
|
|
||||||
*/
|
|
||||||
var err = fs.writeFileSync(file, json);
|
|
||||||
|
|
||||||
// Check for large chandump and warn moderators/admins
|
|
||||||
self.getDiskSize(function (err, size) {
|
|
||||||
if (!err && size > SIZE_LIMIT && self.users) {
|
|
||||||
self.users.forEach(function (u) {
|
|
||||||
if (u.account.effectiveRank >= 2) {
|
if (u.account.effectiveRank >= 2) {
|
||||||
u.socket.emit("warnLargeChandump", {
|
u.socket.emit("warnLargeChandump", {
|
||||||
limit: SIZE_LIMIT,
|
limit: err.limit,
|
||||||
actual: size
|
actual: err.size
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
Logger.errlog.log(`Not saving ${this.uniqueName} because it exceeds ` +
|
||||||
|
"the size limit");
|
||||||
|
} else {
|
||||||
|
Logger.errlog.log(`Failed to save ${this.uniqueName}: ${err.stack}`);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
};
|
};
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
const VERSION = require("../package.json").version;
|
const VERSION = require("../package.json").version;
|
||||||
var singleton = null;
|
var singleton = null;
|
||||||
var Config = require("./config");
|
var Config = require("./config");
|
||||||
|
var Promise = require("bluebird");
|
||||||
|
|
||||||
module.exports = {
|
module.exports = {
|
||||||
init: function () {
|
init: function () {
|
||||||
|
@ -226,13 +227,15 @@ Server.prototype.announce = function (data) {
|
||||||
|
|
||||||
Server.prototype.shutdown = function () {
|
Server.prototype.shutdown = function () {
|
||||||
Logger.syslog.log("Unloading channels");
|
Logger.syslog.log("Unloading channels");
|
||||||
for (var i = 0; i < this.channels.length; i++) {
|
Promise.map(this.channels, channel => {
|
||||||
if (this.channels[i].is(Flags.C_REGISTERED)) {
|
return channel.saveState().tap(() => {
|
||||||
Logger.syslog.log("Saving /r/" + this.channels[i].name);
|
Logger.syslog.log(`Saved /r/${channel.name}`);
|
||||||
this.channels[i].saveState();
|
}).catch(err => {
|
||||||
}
|
Logger.errlog.log(`Failed to save /r/${channel.name}: ${err.stack}`);
|
||||||
}
|
});
|
||||||
|
}).then(() => {
|
||||||
Logger.syslog.log("Goodbye");
|
Logger.syslog.log("Goodbye");
|
||||||
process.exit(0);
|
process.exit(0);
|
||||||
|
});
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -1072,8 +1072,9 @@ Callbacks = {
|
||||||
errDialog("This channel currently exceeds the maximum size of " +
|
errDialog("This channel currently exceeds the maximum size of " +
|
||||||
toHumanReadable(data.limit) + " (channel size is " +
|
toHumanReadable(data.limit) + " (channel size is " +
|
||||||
toHumanReadable(data.actual) + "). Please reduce the size by removing " +
|
toHumanReadable(data.actual) + "). Please reduce the size by removing " +
|
||||||
"unneeded playlist items, filters, and/or emotes or else the channel will " +
|
"unneeded playlist items, filters, and/or emotes. Changes to the channel " +
|
||||||
"be unable to load the next time it is reloaded").attr("id", "chandumptoobig");
|
"will not be saved until the size is reduced to under the limit.")
|
||||||
|
.attr("id", "chandumptoobig");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue