sync/src/server.js

552 lines
17 KiB
JavaScript
Raw Normal View History

2015-07-06 18:16:02 +00:00
const VERSION = require("../package.json").version;
2013-10-11 21:31:40 +00:00
var singleton = null;
2014-01-22 23:11:26 +00:00
var Config = require("./config");
2015-09-21 06:17:06 +00:00
var Promise = require("bluebird");
2015-10-02 05:02:59 +00:00
import * as ChannelStore from './channel-storage/channelstore';
2016-07-04 04:28:43 +00:00
import { EventEmitter } from 'events';
2017-04-05 06:02:31 +00:00
2017-07-09 03:11:54 +00:00
const LOGGER = require('@calzoneman/jsli')('server');
2013-10-11 21:31:40 +00:00
module.exports = {
2014-01-22 23:11:26 +00:00
init: function () {
2017-04-05 06:02:31 +00:00
LOGGER.info("Starting CyTube v%s", VERSION);
2013-10-11 21:31:40 +00:00
var chanlogpath = path.join(__dirname, "../chanlogs");
fs.exists(chanlogpath, function (exists) {
2016-11-18 07:01:20 +00:00
exists || fs.mkdirSync(chanlogpath);
2013-10-11 21:31:40 +00:00
});
var chandumppath = path.join(__dirname, "../chandump");
fs.exists(chandumppath, function (exists) {
2016-11-18 07:01:20 +00:00
exists || fs.mkdirSync(chandumppath);
2013-10-11 21:31:40 +00:00
});
var gdvttpath = path.join(__dirname, "../google-drive-subtitles");
fs.exists(gdvttpath, function (exists) {
2016-11-18 07:01:20 +00:00
exists || fs.mkdirSync(gdvttpath);
});
2014-01-22 23:11:26 +00:00
singleton = new Server();
2013-10-11 21:31:40 +00:00
return singleton;
},
getServer: function () {
return singleton;
}
};
2013-07-19 20:27:21 +00:00
var path = require("path");
2013-07-28 23:47:55 +00:00
var fs = require("fs");
2013-09-09 22:16:41 +00:00
var https = require("https");
2013-04-17 18:42:29 +00:00
var express = require("express");
2014-05-21 02:30:14 +00:00
var Channel = require("./channel/channel");
2014-02-01 18:41:06 +00:00
var db = require("./database");
2014-05-21 02:30:14 +00:00
var Flags = require("./flags");
var sio = require("socket.io");
2015-10-27 05:56:53 +00:00
import LocalChannelIndex from './web/localchannelindex';
2016-06-10 06:42:30 +00:00
import { PartitionChannelIndex } from './partition/partitionchannelindex';
2015-10-27 05:56:53 +00:00
import IOConfiguration from './configuration/ioconfig';
import WebConfiguration from './configuration/webconfig';
2015-11-02 01:42:20 +00:00
import session from './session';
2016-02-05 05:43:20 +00:00
import { LegacyModule } from './legacymodule';
2016-06-07 04:54:49 +00:00
import { PartitionModule } from './partition/partitionmodule';
import { Gauge } from 'prom-client';
import { AccountDB } from './db/account';
import { ChannelDB } from './db/channel';
2017-09-06 05:47:29 +00:00
import { AccountController } from './controller/account';
import { EmailController } from './controller/email';
2014-01-22 23:11:26 +00:00
var Server = function () {
2013-10-09 23:10:26 +00:00
var self = this;
self.channels = [],
self.express = null;
self.db = null;
self.api = null;
self.announcement = null;
self.infogetter = null;
self.servers = {};
self.chanPath = Config.get('channel-path');
2013-10-09 23:10:26 +00:00
2016-02-05 05:43:20 +00:00
var initModule;
if (Config.get('enable-partition')) {
2016-06-09 05:54:16 +00:00
initModule = this.initModule = new PartitionModule();
self.partitionDecider = initModule.getPartitionDecider();
2016-02-05 05:43:20 +00:00
} else {
2016-06-09 05:54:16 +00:00
initModule = this.initModule = new LegacyModule();
2016-02-05 05:43:20 +00:00
}
2017-08-16 01:23:03 +00:00
const globalMessageBus = this.initModule.getGlobalMessageBus();
globalMessageBus.on('UserProfileChanged', this.handleUserProfileChange.bind(this));
globalMessageBus.on('ChannelDeleted', this.handleChannelDelete.bind(this));
globalMessageBus.on('ChannelRegistered', this.handleChannelRegister.bind(this));
2017-08-16 01:23:03 +00:00
2013-10-09 23:10:26 +00:00
// database init ------------------------------------------------------
var Database = require("./database");
2013-12-12 22:28:30 +00:00
self.db = Database;
2014-01-22 23:11:26 +00:00
self.db.init();
2015-10-02 05:02:59 +00:00
ChannelStore.init();
2013-10-11 23:15:44 +00:00
const accountDB = new AccountDB(db.getDB());
const channelDB = new ChannelDB(db.getDB());
// controllers
2017-09-06 05:47:29 +00:00
const accountController = new AccountController(accountDB, globalMessageBus);
let emailTransport;
if (Config.getEmailConfig().getPasswordReset().isEnabled()) {
const smtpConfig = Config.getEmailConfig().getSmtp();
emailTransport = require("nodemailer").createTransport({
host: smtpConfig.getHost(),
port: smtpConfig.getPort(),
secure: smtpConfig.isSecure(),
auth: {
user: smtpConfig.getUser(),
pass: smtpConfig.getPassword()
}
});
} else {
emailTransport = {
sendMail() {
2018-04-07 22:30:30 +00:00
throw new Error('Email is not enabled on this server');
}
};
}
const emailController = new EmailController(
emailTransport,
Config.getEmailConfig()
);
2013-10-11 23:15:44 +00:00
// webserver init -----------------------------------------------------
2015-10-27 05:56:53 +00:00
const ioConfig = IOConfiguration.fromOldConfig(Config);
const webConfig = WebConfiguration.fromOldConfig(Config);
2016-02-05 05:43:20 +00:00
const clusterClient = initModule.getClusterClient();
2016-06-10 06:42:30 +00:00
var channelIndex;
if (Config.get("enable-partition")) {
channelIndex = new PartitionChannelIndex(
2018-08-18 19:27:24 +00:00
initModule.getRedisClientProvider().get(),
initModule.getRedisClientProvider().get(),
initModule.partitionConfig.getChannelIndexChannel()
2016-06-10 06:42:30 +00:00
);
} else {
channelIndex = new LocalChannelIndex();
}
2013-10-09 23:10:26 +00:00
self.express = express();
require("./web/webserver").init(
self.express,
webConfig,
2015-10-27 05:56:53 +00:00
ioConfig,
clusterClient,
2015-11-02 01:42:20 +00:00
channelIndex,
2017-08-16 01:23:03 +00:00
session,
globalMessageBus,
2017-09-06 05:47:29 +00:00
accountController,
channelDB,
Config.getEmailConfig(),
emailController
);
2013-10-09 23:10:26 +00:00
// http/https/sio server init -----------------------------------------
var key = "", cert = "", ca = undefined;
if (Config.get("https.enabled")) {
const certData = self.loadCertificateData();
key = certData.key;
cert = certData.cert;
ca = certData.ca;
2013-10-09 23:10:26 +00:00
}
var opts = {
key: key,
cert: cert,
passphrase: Config.get("https.passphrase"),
ca: ca,
2015-03-06 22:29:21 +00:00
ciphers: Config.get("https.ciphers"),
2015-03-06 21:59:34 +00:00
honorCipherOrder: true
};
Config.get("listen").forEach(function (bind) {
var id = bind.ip + ":" + bind.port;
if (id in self.servers) {
2017-04-05 06:02:31 +00:00
LOGGER.warn("Ignoring duplicate listen address %s", id);
return;
}
if (bind.https && Config.get("https.enabled")) {
self.servers[id] = https.createServer(opts, self.express)
.listen(bind.port, bind.ip);
2015-01-06 15:54:14 +00:00
self.servers[id].on("clientError", function (err, socket) {
try {
socket.destroy();
} catch (e) {
2018-04-07 22:30:30 +00:00
// Ignore
2015-01-06 15:54:14 +00:00
}
});
} else if (bind.http) {
self.servers[id] = self.express.listen(bind.port, bind.ip);
2015-01-06 15:54:14 +00:00
self.servers[id].on("clientError", function (err, socket) {
try {
socket.destroy();
} catch (e) {
2018-04-07 22:30:30 +00:00
// Ignore
2015-01-06 15:54:14 +00:00
}
});
}
});
2014-01-23 03:12:43 +00:00
require("./io/ioserver").init(self, webConfig);
2013-10-09 23:10:26 +00:00
// background tasks init ----------------------------------------------
require("./bgtask")(self);
// prometheus server
const prometheusConfig = Config.getPrometheusConfig();
if (prometheusConfig.isEnabled()) {
require("./prometheus-server").init(prometheusConfig);
}
// setuid
require("./setuid");
2016-02-05 05:43:20 +00:00
initModule.onReady();
2013-10-09 23:10:26 +00:00
};
2016-07-04 04:28:43 +00:00
Server.prototype = Object.create(EventEmitter.prototype);
Server.prototype.loadCertificateData = function loadCertificateData() {
const data = {
key: fs.readFileSync(path.resolve(__dirname, "..",
Config.get("https.keyfile"))),
cert: fs.readFileSync(path.resolve(__dirname, "..",
Config.get("https.certfile")))
};
if (Config.get("https.cafile")) {
data.ca = fs.readFileSync(path.resolve(__dirname, "..",
Config.get("https.cafile")));
}
return data;
};
Server.prototype.reloadCertificateData = function reloadCertificateData() {
const certData = this.loadCertificateData();
Object.keys(this.servers).forEach(key => {
const server = this.servers[key];
// TODO: Replace with actual node API
// once https://github.com/nodejs/node/issues/4464 is implemented.
if (server._sharedCreds) {
try {
server._sharedCreds.context.setCert(certData.cert);
server._sharedCreds.context.setKey(certData.key, Config.get("https.passphrase"));
LOGGER.info('Reloaded certificate data for %s', key);
} catch (error) {
LOGGER.error('Failed to reload certificate data for %s: %s', key, error.stack);
}
}
});
};
2013-10-09 23:10:26 +00:00
Server.prototype.isChannelLoaded = function (name) {
name = name.toLowerCase();
for (var i = 0; i < this.channels.length; i++) {
if (this.channels[i].uniqueName == name)
2013-10-09 23:10:26 +00:00
return true;
}
return false;
};
2013-09-09 22:16:41 +00:00
const promActiveChannels = new Gauge({
name: 'cytube_channels_num_active',
help: 'Number of channels currently active'
});
2013-10-09 23:10:26 +00:00
Server.prototype.getChannel = function (name) {
var cname = name.toLowerCase();
if (this.partitionDecider &&
!this.partitionDecider.isChannelOnThisPartition(cname)) {
const error = new Error(`Channel '${cname}' is mapped to a different partition`);
error.code = 'EWRONGPART';
throw error;
}
var self = this;
2014-01-23 21:53:53 +00:00
for (var i = 0; i < self.channels.length; i++) {
if (self.channels[i].uniqueName === cname)
return self.channels[i];
2013-10-09 23:10:26 +00:00
}
2013-09-09 22:16:41 +00:00
2013-10-11 21:31:40 +00:00
var c = new Channel(name);
promActiveChannels.inc();
2014-01-23 21:53:53 +00:00
c.on("empty", function () {
self.unloadChannel(c);
});
2017-03-02 04:46:01 +00:00
c.waitFlag(Flags.C_ERROR, () => {
self.unloadChannel(c, { skipSave: true });
});
2014-01-23 21:53:53 +00:00
self.channels.push(c);
2013-10-09 23:10:26 +00:00
return c;
};
2013-09-09 22:16:41 +00:00
Server.prototype.unloadChannel = function (chan, options) {
var self = this;
2017-12-16 03:10:32 +00:00
if (chan.dead || chan.dying) {
return;
}
2017-12-16 03:10:32 +00:00
chan.dying = true;
if (!options) {
options = {};
}
if (!options.skipSave) {
chan.saveState().catch(error => {
LOGGER.error(`Failed to save /${this.chanPath}/${chan.name} for unload: ${error.stack}`);
}).then(finishUnloading);
} else {
finishUnloading();
}
2013-09-09 22:16:41 +00:00
function finishUnloading() {
chan.logger.log("[init] Channel shutting down");
chan.logger.close();
chan.notifyModules("unload", []);
Object.keys(chan.modules).forEach(function (k) {
chan.modules[k].dead = true;
/*
* Automatically clean up any timeouts/intervals assigned
* to properties of channel modules. Prevents a memory leak
* in case of forgetting to clear the timer on the "unload"
* module event.
*/
Object.keys(chan.modules[k]).forEach(function (prop) {
if (chan.modules[k][prop] && chan.modules[k][prop]._onTimeout) {
LOGGER.warn("Detected non-null timer when unloading " +
"module " + k + ": " + prop);
try {
clearTimeout(chan.modules[k][prop]);
clearInterval(chan.modules[k][prop]);
} catch (error) {
LOGGER.error(error.stack);
}
}
});
});
2014-05-21 02:30:14 +00:00
for (var i = 0; i < self.channels.length; i++) {
if (self.channels[i].uniqueName === chan.uniqueName) {
self.channels.splice(i, 1);
i--;
}
2013-10-09 23:10:26 +00:00
}
LOGGER.info("Unloaded channel " + chan.name);
chan.broadcastUsercount.cancel();
// Empty all outward references from the channel
2018-04-07 22:30:30 +00:00
Object.keys(chan).forEach(key => {
if (key !== "refCounter") {
delete chan[key];
}
2018-04-07 22:30:30 +00:00
});
chan.dead = true;
promActiveChannels.dec();
2013-10-09 23:10:26 +00:00
}
};
2014-05-24 06:09:36 +00:00
Server.prototype.packChannelList = function (publicOnly, isAdmin) {
2014-01-20 23:52:36 +00:00
var channels = this.channels.filter(function (c) {
if (!publicOnly) {
return true;
}
2014-05-21 02:30:14 +00:00
return c.modules.options && c.modules.options.get("show_public");
2014-01-20 23:52:36 +00:00
});
2014-05-24 05:40:35 +00:00
return channels.map(function (c) {
2014-05-24 06:09:36 +00:00
return c.packInfo(isAdmin);
2014-05-24 05:40:35 +00:00
});
2014-01-20 23:52:36 +00:00
};
2014-02-01 18:41:06 +00:00
Server.prototype.announce = function (data) {
2016-07-04 04:28:43 +00:00
this.setAnnouncement(data);
2014-02-01 18:41:06 +00:00
if (data == null) {
db.clearAnnouncement();
} else {
db.setAnnouncement(data);
2016-07-04 04:28:43 +00:00
}
this.emit("announcement", data);
};
Server.prototype.setAnnouncement = function (data) {
if (data == null) {
this.announcement = null;
} else {
this.announcement = data;
sio.instance.emit("announcement", data);
2014-02-01 18:41:06 +00:00
}
2013-10-09 23:10:26 +00:00
};
Server.prototype.forceSave = function () {
Promise.map(this.channels, channel => {
try {
return channel.saveState().tap(() => {
LOGGER.info(`Saved /${this.chanPath}/${channel.name}`);
}).catch(err => {
LOGGER.error(`Failed to save /${this.chanPath}/${channel.name}: ${err.stack}`);
});
} catch (error) {
LOGGER.error(`Failed to save channel: ${error.stack}`);
}
}, { concurrency: 5 }).then(() => {
LOGGER.info('Finished save');
});
};
2013-10-09 23:10:26 +00:00
Server.prototype.shutdown = function () {
2017-04-05 06:02:31 +00:00
LOGGER.info("Unloading channels");
Promise.map(this.channels, channel => {
try {
return channel.saveState().tap(() => {
LOGGER.info(`Saved /${this.chanPath}/${channel.name}`);
}).catch(err => {
LOGGER.error(`Failed to save /${this.chanPath}/${channel.name}: ${err.stack}`);
});
} catch (error) {
2017-04-05 06:02:31 +00:00
LOGGER.error(`Failed to save channel: ${error.stack}`);
}
}, { concurrency: 5 }).then(() => {
2017-04-05 06:02:31 +00:00
LOGGER.info("Goodbye");
2015-09-21 06:17:06 +00:00
process.exit(0);
}).catch(err => {
2017-04-05 06:02:31 +00:00
LOGGER.error(`Caught error while saving channels: ${err.stack}`);
process.exit(1);
2015-09-21 06:17:06 +00:00
});
2013-10-09 23:10:26 +00:00
};
2016-06-09 05:54:16 +00:00
Server.prototype.handlePartitionMapChange = function () {
2016-06-09 05:54:16 +00:00
const channels = Array.prototype.slice.call(this.channels);
Promise.map(channels, channel => {
2016-06-09 05:54:16 +00:00
if (channel.dead) {
return;
}
if (!this.partitionDecider.isChannelOnThisPartition(channel.uniqueName)) {
2017-04-05 06:02:31 +00:00
LOGGER.info("Partition changed for " + channel.uniqueName);
2016-06-09 05:54:16 +00:00
return channel.saveState().then(() => {
channel.broadcastAll("partitionChange",
this.partitionDecider.getPartitionForChannel(channel.uniqueName));
const users = Array.prototype.slice.call(channel.users);
users.forEach(u => {
try {
u.socket.disconnect();
} catch (error) {
2018-04-07 22:30:30 +00:00
// Ignore
2016-06-09 05:54:16 +00:00
}
});
this.unloadChannel(channel, { skipSave: true });
}).catch(error => {
LOGGER.error(`Failed to unload /${this.chanPath}/${channel.name} for ` +
`partition map flip: ${error.stack}`);
2016-06-09 05:54:16 +00:00
});
}
}, { concurrency: 5 }).then(() => {
2017-04-05 06:02:31 +00:00
LOGGER.info("Partition reload complete");
2016-06-09 05:54:16 +00:00
});
};
Server.prototype.reloadPartitionMap = function () {
2016-10-15 23:09:27 +00:00
if (!Config.get("enable-partition")) {
return;
}
this.initModule.getPartitionMapReloader().reload();
};
2017-08-16 01:23:03 +00:00
Server.prototype.handleUserProfileChange = function (event) {
try {
const lname = event.user.toLowerCase();
// Probably not the most efficient thing in the world, but w/e
// profile changes are not high volume
this.channels.forEach(channel => {
if (channel.dead) return;
channel.users.forEach(user => {
if (user.getLowerName() === lname && user.account.user) {
user.account.user.profile = {
image: event.profile.image,
text: event.profile.text
};
user.account.update();
channel.sendUserProfile(channel.users, user);
LOGGER.info(
'Updated profile for user %s in channel %s',
lname,
channel.name
);
}
});
});
} catch (error) {
LOGGER.error('handleUserProfileChange failed: %s', error);
}
};
Server.prototype.handleChannelDelete = function (event) {
try {
const lname = event.channel.toLowerCase();
this.channels.forEach(channel => {
if (channel.dead) return;
if (channel.uniqueName === lname) {
channel.clearFlag(Flags.C_REGISTERED);
const users = Array.prototype.slice.call(channel.users);
users.forEach(u => {
u.kick('Channel deleted');
});
2017-12-16 03:10:32 +00:00
if (!channel.dead && !channel.dying) {
channel.emit('empty');
}
LOGGER.info('Processed deleted channel %s', lname);
}
});
} catch (error) {
LOGGER.error('handleChannelDelete failed: %s', error);
}
};
Server.prototype.handleChannelRegister = function (event) {
try {
const lname = event.channel.toLowerCase();
this.channels.forEach(channel => {
if (channel.dead) return;
if (channel.uniqueName === lname) {
channel.clearFlag(Flags.C_REGISTERED);
const users = Array.prototype.slice.call(channel.users);
users.forEach(u => {
u.kick('Channel reloading');
});
2017-12-16 03:10:32 +00:00
if (!channel.dead && !channel.dying) {
channel.emit('empty');
}
LOGGER.info('Processed registered channel %s', lname);
}
});
} catch (error) {
LOGGER.error('handleChannelRegister failed: %s', error);
}
};