Refactor backend initialization

This commit is contained in:
calzoneman 2016-02-04 21:43:20 -08:00
parent 86abebf9bf
commit 50124c8a45
7 changed files with 148 additions and 37 deletions

View File

@ -0,0 +1,23 @@
class BackendConfiguration {
constructor(config) {
this.config = config;
}
getRedisConfig() {
return this.config.redis;
}
getListenerConfig() {
return this.config.proxy.listeners.map(listener => ({
getHost() {
return listener.host;
},
getPort() {
return listener.port;
}
}));
}
}
export { BackendConfiguration };

View File

@ -0,0 +1,74 @@
import { RedisClusterClient } from '../io/cluster/redisclusterclient';
import { FrontendPool } from 'cytube-common/lib/redis/frontendpool';
import RedisClientProvider from 'cytube-common/lib/redis/redisclientprovider';
import { loadFromToml } from 'cytube-common/lib/configuration/configloader';
import path from 'path';
import { BackendConfiguration } from './backendconfiguration';
import logger from 'cytube-common/lib/logger';
import redisAdapter from 'socket.io-redis';
const BACKEND_CONFIG_PATH = path.resolve(__dirname, '..', '..', 'backend.toml');
class BackendModule {
constructor() {
this.initConfig();
}
initConfig() {
try {
this.backendConfig = loadFromToml(BackendConfiguration, BACKEND_CONFIG_PATH);
} catch (error) {
if (typeof error.line !== 'undefined') {
logger.error(`Error in configuration file: ${error} (line ${error.line})`);
} else {
logger.error(`Error loading configuration: ${error.stack}`);
}
process.exit(1);
}
}
onReady() {
const redisClientProvider = this.getRedisClientProvider();
this.redisAdapter = redisAdapter({
pubClient: redisClientProvider.get(),
subClient: redisClientProvider.get()
});
this.sioEmitter = require('socket.io').instance;
this.sioEmitter.adapter(this.redisAdapter);
const IOBackend = require('./iobackend');
this.ioBackend = new IOBackend(
this.backendConfig.getListenerConfig()[0],
this.sioEmitter,
redisClientProvider.get()
)
}
getFrontendPool() {
if (!this.frontendPool) {
this.frontendPool = new FrontendPool(this.getRedisClientProvider().get());
}
return this.frontendPool;
}
getRedisClientProvider() {
if (!this.redisClientProvider) {
this.redisClientProvider = new RedisClientProvider(
this.backendConfig.getRedisConfig()
);
}
return this.redisClientProvider;
}
getClusterClient() {
if (!this.redisClusterClient) {
this.redisClusterClient = new RedisClusterClient(this.getFrontendPool());
}
return this.redisClusterClient;
}
}
export { BackendModule }

View File

@ -1,5 +1,5 @@
import Server from 'cytube-common/lib/proxy/server'; import Server from 'cytube-common/lib/proxy/server';
import FrontendManager from './frontendmanager'; import ProxyInterceptor from './proxyinterceptor';
import uuid from 'uuid'; import uuid from 'uuid';
import PoolEntryUpdater from 'cytube-common/lib/redis/poolentryupdater'; import PoolEntryUpdater from 'cytube-common/lib/redis/poolentryupdater';
import JSONProtocol from 'cytube-common/lib/proxy/protocol'; import JSONProtocol from 'cytube-common/lib/proxy/protocol';
@ -13,19 +13,19 @@ export default class IOBackend {
this.socketEmitter = socketEmitter; this.socketEmitter = socketEmitter;
this.poolRedisClient = poolRedisClient; this.poolRedisClient = poolRedisClient;
this.protocol = new JSONProtocol(); this.protocol = new JSONProtocol();
this.initFrontendManager(); this.initProxyInterceptor();
this.initProxyListener(); this.initProxyListener();
this.initBackendPoolUpdater(); this.initBackendPoolUpdater();
} }
initFrontendManager() { initProxyInterceptor() {
this.frontendManager = new FrontendManager(this.socketEmitter); this.proxyInterceptor = new ProxyInterceptor(this.socketEmitter);
} }
initProxyListener() { initProxyListener() {
this.proxyListener = new Server(this.proxyListenerConfig, this.protocol); this.proxyListener = new Server(this.proxyListenerConfig, this.protocol);
this.proxyListener.on('connection', this.proxyListener.on('connection',
this.frontendManager.onConnection.bind(this.frontendManager)); this.proxyInterceptor.onConnection.bind(this.proxyInterceptor));
} }
initBackendPoolUpdater() { initBackendPoolUpdater() {

View File

@ -1,3 +1,4 @@
import logger from 'cytube-common/lib/logger';
import { EventEmitter } from 'events'; import { EventEmitter } from 'events';
export default class ProxiedSocket extends EventEmitter { export default class ProxiedSocket extends EventEmitter {
@ -22,7 +23,11 @@ export default class ProxiedSocket extends EventEmitter {
} }
onProxiedEventReceived() { onProxiedEventReceived() {
try {
EventEmitter.prototype.emit.apply(this, arguments); EventEmitter.prototype.emit.apply(this, arguments);
} catch (error) {
logger.error(`Emit failed: ${error.stack}`);
}
} }
join(channel) { join(channel) {

View File

@ -1,18 +1,23 @@
import logger from 'cytube-common/lib/logger'; import logger from 'cytube-common/lib/logger';
import ioServer from '../ioserver'; import ioServer from '../io/ioserver';
import ProxiedSocket from './proxiedsocket'; import ProxiedSocket from './proxiedsocket';
export default class FrontendManager { export default class ProxyInterceptor {
constructor(socketEmitter) { constructor(socketEmitter) {
this.socketEmitter = socketEmitter; this.socketEmitter = socketEmitter;
this.frontendConnections = {}; this.frontendConnections = {};
this.frontendProxiedSockets = {}; this.frontendProxiedSockets = {};
} }
/**
* Handle a new frontend proxy connection.
*
* @param {Connection} socket frontend proxy connection
*/
onConnection(socket) { onConnection(socket) {
if (this.frontendConnections.hasOwnProperty(socket.endpoint)) { if (this.frontendConnections.hasOwnProperty(socket.endpoint)) {
// TODO: do some validation, maybe check if the socket is still connected? logger.error(`Duplicate frontend connection: ${socket.endpoint}`);
throw new Error(); return;
} }
this.frontendConnections[socket.endpoint] = socket; this.frontendConnections[socket.endpoint] = socket;
@ -48,8 +53,8 @@ export default class FrontendManager {
if (!this.frontendProxiedSockets.hasOwnProperty(mapKey)) { if (!this.frontendProxiedSockets.hasOwnProperty(mapKey)) {
this.frontendProxiedSockets[mapKey] = {}; this.frontendProxiedSockets[mapKey] = {};
} else if (this.frontendProxiedSockets[mapKey].hasOwnProperty(socketID)) { } else if (this.frontendProxiedSockets[mapKey].hasOwnProperty(socketID)) {
// TODO: Handle this gracefully logger.error(`Duplicate SocketConnectEvent for ${socketID}`);
throw new Error(); return;
} }
this.frontendProxiedSockets[mapKey][socketID] = proxiedSocket; this.frontendProxiedSockets[mapKey][socketID] = proxiedSocket;
@ -60,8 +65,9 @@ export default class FrontendManager {
const mapKey = frontendConnection.endpoint; const mapKey = frontendConnection.endpoint;
const socketMap = this.frontendProxiedSockets[mapKey]; const socketMap = this.frontendProxiedSockets[mapKey];
if (!socketMap || !socketMap.hasOwnProperty(socketID)) { if (!socketMap || !socketMap.hasOwnProperty(socketID)) {
// TODO logger.error(`Received SocketFrameEvent for nonexistent socket`,
throw new Error(); { socketID, event });
return;
} }
const socket = socketMap[socketID]; const socket = socketMap[socketID];

13
src/legacymodule.js Normal file
View File

@ -0,0 +1,13 @@
import NullClusterClient from './io/cluster/nullclusterclient';
class LegacyModule {
getClusterClient() {
return new NullClusterClient();
}
onReady() {
}
}
export { LegacyModule };

View File

@ -46,9 +46,9 @@ import LocalChannelIndex from './web/localchannelindex';
import IOConfiguration from './configuration/ioconfig'; import IOConfiguration from './configuration/ioconfig';
import WebConfiguration from './configuration/webconfig'; import WebConfiguration from './configuration/webconfig';
import NullClusterClient from './io/cluster/nullclusterclient'; import NullClusterClient from './io/cluster/nullclusterclient';
import { RedisClusterClient } from './io/cluster/redisclusterclient';
import { FrontendPool } from 'cytube-common/lib/redis/frontendpool';
import session from './session'; import session from './session';
import { BackendModule } from './backend/backendmodule';
import { LegacyModule } from './legacymodule';
var Server = function () { var Server = function () {
var self = this; var self = this;
@ -60,22 +60,24 @@ var Server = function () {
self.infogetter = null; self.infogetter = null;
self.servers = {}; self.servers = {};
// backend init
var initModule;
if (true) {
initModule = new BackendModule();
} else {
initModule = new LegacyModule();
}
// database init ------------------------------------------------------ // database init ------------------------------------------------------
var Database = require("./database"); var Database = require("./database");
self.db = Database; self.db = Database;
self.db.init(); self.db.init();
ChannelStore.init(); ChannelStore.init();
// redis init
const redis = require('redis');
Promise.promisifyAll(redis.RedisClient.prototype);
Promise.promisifyAll(redis.Multi.prototype);
// webserver init ----------------------------------------------------- // webserver init -----------------------------------------------------
const ioConfig = IOConfiguration.fromOldConfig(Config); const ioConfig = IOConfiguration.fromOldConfig(Config);
const webConfig = WebConfiguration.fromOldConfig(Config); const webConfig = WebConfiguration.fromOldConfig(Config);
const frontendPool = new FrontendPool(redis.createClient()); const clusterClient = initModule.getClusterClient();
const clusterClient = new RedisClusterClient(frontendPool);
const channelIndex = new LocalChannelIndex(); const channelIndex = new LocalChannelIndex();
self.express = express(); self.express = express();
require("./web/webserver").init(self.express, require("./web/webserver").init(self.express,
@ -135,26 +137,14 @@ var Server = function () {
}); });
require("./io/ioserver").init(self, webConfig); require("./io/ioserver").init(self, webConfig);
const redisAdapter = require('socket.io-redis');
const IOBackend = require('./io/backend/iobackend');
const sioEmitter = require("socket.io").instance;
sioEmitter.adapter(redisAdapter());
const listenerConfig = {
getPort: function () {
return 3071;
},
getHost: function () {
return '127.0.0.1';
}
};
const backend = new IOBackend(listenerConfig, sioEmitter, redis.createClient());
// background tasks init ---------------------------------------------- // background tasks init ----------------------------------------------
require("./bgtask")(self); require("./bgtask")(self);
// setuid // setuid
require("./setuid"); require("./setuid");
initModule.onReady();
}; };
Server.prototype.getHTTPIP = function (req) { Server.prototype.getHTTPIP = function (req) {