diff --git a/src/backend/backendconfiguration.js b/src/backend/backendconfiguration.js new file mode 100644 index 00000000..2883df42 --- /dev/null +++ b/src/backend/backendconfiguration.js @@ -0,0 +1,23 @@ +class BackendConfiguration { + constructor(config) { + this.config = config; + } + + getRedisConfig() { + return this.config.redis; + } + + getListenerConfig() { + return this.config.proxy.listeners.map(listener => ({ + getHost() { + return listener.host; + }, + + getPort() { + return listener.port; + } + })); + } +} + +export { BackendConfiguration }; diff --git a/src/backend/backendmodule.js b/src/backend/backendmodule.js new file mode 100644 index 00000000..dcca65ea --- /dev/null +++ b/src/backend/backendmodule.js @@ -0,0 +1,74 @@ +import { RedisClusterClient } from '../io/cluster/redisclusterclient'; +import { FrontendPool } from 'cytube-common/lib/redis/frontendpool'; +import RedisClientProvider from 'cytube-common/lib/redis/redisclientprovider'; +import { loadFromToml } from 'cytube-common/lib/configuration/configloader'; +import path from 'path'; +import { BackendConfiguration } from './backendconfiguration'; +import logger from 'cytube-common/lib/logger'; +import redisAdapter from 'socket.io-redis'; + +const BACKEND_CONFIG_PATH = path.resolve(__dirname, '..', '..', 'backend.toml'); + +class BackendModule { + constructor() { + this.initConfig(); + } + + initConfig() { + try { + this.backendConfig = loadFromToml(BackendConfiguration, BACKEND_CONFIG_PATH); + } catch (error) { + if (typeof error.line !== 'undefined') { + logger.error(`Error in configuration file: ${error} (line ${error.line})`); + } else { + logger.error(`Error loading configuration: ${error.stack}`); + } + + process.exit(1); + } + } + + onReady() { + const redisClientProvider = this.getRedisClientProvider(); + this.redisAdapter = redisAdapter({ + pubClient: redisClientProvider.get(), + subClient: redisClientProvider.get() + }); + this.sioEmitter = require('socket.io').instance; + this.sioEmitter.adapter(this.redisAdapter); + const IOBackend = require('./iobackend'); + this.ioBackend = new IOBackend( + this.backendConfig.getListenerConfig()[0], + this.sioEmitter, + redisClientProvider.get() + ) + } + + getFrontendPool() { + if (!this.frontendPool) { + this.frontendPool = new FrontendPool(this.getRedisClientProvider().get()); + } + + return this.frontendPool; + } + + getRedisClientProvider() { + if (!this.redisClientProvider) { + this.redisClientProvider = new RedisClientProvider( + this.backendConfig.getRedisConfig() + ); + } + + return this.redisClientProvider; + } + + getClusterClient() { + if (!this.redisClusterClient) { + this.redisClusterClient = new RedisClusterClient(this.getFrontendPool()); + } + + return this.redisClusterClient; + } +} + +export { BackendModule } diff --git a/src/io/backend/iobackend.js b/src/backend/iobackend.js similarity index 83% rename from src/io/backend/iobackend.js rename to src/backend/iobackend.js index a3371d02..2bb40fe6 100644 --- a/src/io/backend/iobackend.js +++ b/src/backend/iobackend.js @@ -1,5 +1,5 @@ import Server from 'cytube-common/lib/proxy/server'; -import FrontendManager from './frontendmanager'; +import ProxyInterceptor from './proxyinterceptor'; import uuid from 'uuid'; import PoolEntryUpdater from 'cytube-common/lib/redis/poolentryupdater'; import JSONProtocol from 'cytube-common/lib/proxy/protocol'; @@ -13,19 +13,19 @@ export default class IOBackend { this.socketEmitter = socketEmitter; this.poolRedisClient = poolRedisClient; this.protocol = new JSONProtocol(); - this.initFrontendManager(); + this.initProxyInterceptor(); this.initProxyListener(); this.initBackendPoolUpdater(); } - initFrontendManager() { - this.frontendManager = new FrontendManager(this.socketEmitter); + initProxyInterceptor() { + this.proxyInterceptor = new ProxyInterceptor(this.socketEmitter); } initProxyListener() { this.proxyListener = new Server(this.proxyListenerConfig, this.protocol); this.proxyListener.on('connection', - this.frontendManager.onConnection.bind(this.frontendManager)); + this.proxyInterceptor.onConnection.bind(this.proxyInterceptor)); } initBackendPoolUpdater() { diff --git a/src/io/backend/proxiedsocket.js b/src/backend/proxiedsocket.js similarity index 83% rename from src/io/backend/proxiedsocket.js rename to src/backend/proxiedsocket.js index 67202b3e..24805779 100644 --- a/src/io/backend/proxiedsocket.js +++ b/src/backend/proxiedsocket.js @@ -1,3 +1,4 @@ +import logger from 'cytube-common/lib/logger'; import { EventEmitter } from 'events'; export default class ProxiedSocket extends EventEmitter { @@ -22,7 +23,11 @@ export default class ProxiedSocket extends EventEmitter { } onProxiedEventReceived() { - EventEmitter.prototype.emit.apply(this, arguments); + try { + EventEmitter.prototype.emit.apply(this, arguments); + } catch (error) { + logger.error(`Emit failed: ${error.stack}`); + } } join(channel) { diff --git a/src/io/backend/frontendmanager.js b/src/backend/proxyinterceptor.js similarity index 81% rename from src/io/backend/frontendmanager.js rename to src/backend/proxyinterceptor.js index 92fdc341..2f67e0fe 100644 --- a/src/io/backend/frontendmanager.js +++ b/src/backend/proxyinterceptor.js @@ -1,18 +1,23 @@ import logger from 'cytube-common/lib/logger'; -import ioServer from '../ioserver'; +import ioServer from '../io/ioserver'; import ProxiedSocket from './proxiedsocket'; -export default class FrontendManager { +export default class ProxyInterceptor { constructor(socketEmitter) { this.socketEmitter = socketEmitter; this.frontendConnections = {}; this.frontendProxiedSockets = {}; } + /** + * Handle a new frontend proxy connection. + * + * @param {Connection} socket frontend proxy connection + */ onConnection(socket) { if (this.frontendConnections.hasOwnProperty(socket.endpoint)) { - // TODO: do some validation, maybe check if the socket is still connected? - throw new Error(); + logger.error(`Duplicate frontend connection: ${socket.endpoint}`); + return; } this.frontendConnections[socket.endpoint] = socket; @@ -48,8 +53,8 @@ export default class FrontendManager { if (!this.frontendProxiedSockets.hasOwnProperty(mapKey)) { this.frontendProxiedSockets[mapKey] = {}; } else if (this.frontendProxiedSockets[mapKey].hasOwnProperty(socketID)) { - // TODO: Handle this gracefully - throw new Error(); + logger.error(`Duplicate SocketConnectEvent for ${socketID}`); + return; } this.frontendProxiedSockets[mapKey][socketID] = proxiedSocket; @@ -60,8 +65,9 @@ export default class FrontendManager { const mapKey = frontendConnection.endpoint; const socketMap = this.frontendProxiedSockets[mapKey]; if (!socketMap || !socketMap.hasOwnProperty(socketID)) { - // TODO - throw new Error(); + logger.error(`Received SocketFrameEvent for nonexistent socket`, + { socketID, event }); + return; } const socket = socketMap[socketID]; diff --git a/src/legacymodule.js b/src/legacymodule.js new file mode 100644 index 00000000..e2d18f3f --- /dev/null +++ b/src/legacymodule.js @@ -0,0 +1,13 @@ +import NullClusterClient from './io/cluster/nullclusterclient'; + +class LegacyModule { + getClusterClient() { + return new NullClusterClient(); + } + + onReady() { + + } +} + +export { LegacyModule }; diff --git a/src/server.js b/src/server.js index 1381cdab..03886150 100644 --- a/src/server.js +++ b/src/server.js @@ -46,9 +46,9 @@ import LocalChannelIndex from './web/localchannelindex'; import IOConfiguration from './configuration/ioconfig'; import WebConfiguration from './configuration/webconfig'; import NullClusterClient from './io/cluster/nullclusterclient'; -import { RedisClusterClient } from './io/cluster/redisclusterclient'; -import { FrontendPool } from 'cytube-common/lib/redis/frontendpool'; import session from './session'; +import { BackendModule } from './backend/backendmodule'; +import { LegacyModule } from './legacymodule'; var Server = function () { var self = this; @@ -60,22 +60,24 @@ var Server = function () { self.infogetter = null; self.servers = {}; + // backend init + var initModule; + if (true) { + initModule = new BackendModule(); + } else { + initModule = new LegacyModule(); + } + // database init ------------------------------------------------------ var Database = require("./database"); self.db = Database; self.db.init(); ChannelStore.init(); - // redis init - const redis = require('redis'); - Promise.promisifyAll(redis.RedisClient.prototype); - Promise.promisifyAll(redis.Multi.prototype); - // webserver init ----------------------------------------------------- const ioConfig = IOConfiguration.fromOldConfig(Config); const webConfig = WebConfiguration.fromOldConfig(Config); - const frontendPool = new FrontendPool(redis.createClient()); - const clusterClient = new RedisClusterClient(frontendPool); + const clusterClient = initModule.getClusterClient(); const channelIndex = new LocalChannelIndex(); self.express = express(); require("./web/webserver").init(self.express, @@ -135,26 +137,14 @@ var Server = function () { }); require("./io/ioserver").init(self, webConfig); - const redisAdapter = require('socket.io-redis'); - const IOBackend = require('./io/backend/iobackend'); - const sioEmitter = require("socket.io").instance; - sioEmitter.adapter(redisAdapter()); - const listenerConfig = { - getPort: function () { - return 3071; - }, - - getHost: function () { - return '127.0.0.1'; - } - }; - const backend = new IOBackend(listenerConfig, sioEmitter, redis.createClient()); // background tasks init ---------------------------------------------- require("./bgtask")(self); // setuid require("./setuid"); + + initModule.onReady(); }; Server.prototype.getHTTPIP = function (req) {