From 10d4ec8e603b7c500f870055d58cf2c176a65c7d Mon Sep 17 00:00:00 2001 From: calzoneman Date: Thu, 24 Dec 2015 16:24:07 -0800 Subject: [PATCH 01/12] Initial work for proxy connections --- src/io/backend/frontendmanager.js | 20 ++++++++++++++++++ src/io/backend/iobackend.js | 20 ++++++++++++++++++ src/io/backend/proxiedsocket.js | 35 +++++++++++++++++++++++++++++++ 3 files changed, 75 insertions(+) create mode 100644 src/io/backend/frontendmanager.js create mode 100644 src/io/backend/iobackend.js create mode 100644 src/io/backend/proxiedsocket.js diff --git a/src/io/backend/frontendmanager.js b/src/io/backend/frontendmanager.js new file mode 100644 index 00000000..5bd465b3 --- /dev/null +++ b/src/io/backend/frontendmanager.js @@ -0,0 +1,20 @@ +export default class FrontendManager { + constructor() { + this.frontendConnections = {}; + } + + onConnection(socket) { + if (this.frontendConnections.hasOwnProperty(socket.remoteAddress)) { + // TODO: do some validation, maybe check if the socket is still connected? + throw new Error(); + } + + this.frontendConnections[socket.remoteAddressAndPort] = socket; + console.log(socket.remoteAddressAndPort); + socket.on('data', this.onData.bind(this, socket)); + } + + onData(socket, data) { + console.log(data); + } +} diff --git a/src/io/backend/iobackend.js b/src/io/backend/iobackend.js new file mode 100644 index 00000000..e0c8d78d --- /dev/null +++ b/src/io/backend/iobackend.js @@ -0,0 +1,20 @@ +import Server from 'cytube-common/lib/tcpjson/server'; +import FrontendManager from './frontendmanager'; + +export default class IOBackend { + constructor(proxyListenerConfig) { + this.proxyListenerConfig = proxyListenerConfig; + this.initFrontendManager(); + this.initProxyListener(); + } + + initFrontendManager() { + this.frontendManager = new FrontendManager(); + } + + initProxyListener() { + this.proxyListener = new Server(this.proxyListenerConfig); + this.proxyListener.on('connection', + this.frontendManager.onConnection.bind(this.frontendManager)); + } +} diff --git a/src/io/backend/proxiedsocket.js b/src/io/backend/proxiedsocket.js new file mode 100644 index 00000000..38e004b6 --- /dev/null +++ b/src/io/backend/proxiedsocket.js @@ -0,0 +1,35 @@ +import { EventEmitter } from 'events'; + +export default class ProxiedSocket extends EventEmitter { + constructor(socketID, socketData, socketEmitter, frontendConnection) { + super(); + this.id = socketID; + this.ip = socketData.ip; + this._realip = socketData.ip; + this.socketEmitter = socketEmitter; + this.frontendConnection = frontendConnection; + } + + emit() { + const target = socketEmitter.to(this.id); + target.emit.apply(target, arguments); + } + + onProxiedEventReceived() { + EventEmitter.prototype.emit.apply(this, arguments); + } + + join(channel) { + this.frontendConnection.write( + this.frontendConnection.protocol.socketJoinSocketChannels( + this.id, [channel] + ) + ); + } + + disconnect() { + this.frontendConnection.write( + this.frontendConnection.protocol.socketKick(this.id) + ); + } +} From b536c157582088a5b4bfac7ee8f8d5686fd41925 Mon Sep 17 00:00:00 2001 From: calzoneman Date: Sat, 26 Dec 2015 15:07:03 -0800 Subject: [PATCH 02/12] Initial hacks to get the split to work --- package.json | 1 + src/io/backend/frontendmanager.js | 50 ++++++++++++++++++++++++++++--- src/io/backend/iobackend.js | 5 ++-- src/io/backend/proxiedsocket.js | 2 +- src/io/ioserver.js | 4 ++- src/server.js | 14 +++++++++ 6 files changed, 68 insertions(+), 8 deletions(-) diff --git a/package.json b/package.json index 27969620..be9bf2ef 100644 --- a/package.json +++ b/package.json @@ -34,6 +34,7 @@ "sanitize-html": "git://github.com/calzoneman/sanitize-html", "serve-static": "^1.10.0", "socket.io": "^1.3.7", + "socket.io-redis": "^1.0.0", "source-map-support": "^0.3.2", "status-message-polyfill": "calzoneman/status-message-polyfill", "yamljs": "^0.1.6" diff --git a/src/io/backend/frontendmanager.js b/src/io/backend/frontendmanager.js index 5bd465b3..1dcd83c0 100644 --- a/src/io/backend/frontendmanager.js +++ b/src/io/backend/frontendmanager.js @@ -1,20 +1,62 @@ +import ioServer from '../ioserver'; +import ProxiedSocket from './proxiedsocket'; + export default class FrontendManager { - constructor() { + constructor(socketEmitter) { + this.socketEmitter = socketEmitter; this.frontendConnections = {}; + this.frontendProxiedSockets = {}; } onConnection(socket) { - if (this.frontendConnections.hasOwnProperty(socket.remoteAddress)) { + if (this.frontendConnections.hasOwnProperty(socket.remoteAddressAndPort)) { // TODO: do some validation, maybe check if the socket is still connected? throw new Error(); } this.frontendConnections[socket.remoteAddressAndPort] = socket; - console.log(socket.remoteAddressAndPort); socket.on('data', this.onData.bind(this, socket)); } onData(socket, data) { - console.log(data); + switch (data.$type) { + case 'socketConnect': + this.onSocketConnect(socket, data); + break; + case 'socketFrame': + this.onSocketFrame(socket, data); + break; + } + } + + onSocketConnect(frontendConnection, data) { + const mapKey = frontendConnection.remoteAddressAndPort; + const proxiedSocket = new ProxiedSocket( + data.socketID, + data.socketData, + this.socketEmitter, + frontendConnection); + + if (!this.frontendProxiedSockets.hasOwnProperty(mapKey)) { + this.frontendProxiedSockets[mapKey] = {}; + } else if (this.frontendProxiedSockets[mapKey].hasOwnProperty(data.socketID)) { + // TODO: Handle this gracefully + throw new Error(); + } + + this.frontendProxiedSockets[mapKey][data.socketID] = proxiedSocket; + ioServer.handleConnection(proxiedSocket); + } + + onSocketFrame(frontendConnection, data) { + const mapKey = frontendConnection.remoteAddressAndPort; + const socketMap = this.frontendProxiedSockets[mapKey]; + if (!socketMap || !socketMap.hasOwnProperty(data.socketID)) { + // TODO + throw new Error(); + } + + const socket = socketMap[data.socketID]; + socket.onProxiedEventReceived.apply(socket, [data.event].concat(data.args)); } } diff --git a/src/io/backend/iobackend.js b/src/io/backend/iobackend.js index e0c8d78d..9d0de5c2 100644 --- a/src/io/backend/iobackend.js +++ b/src/io/backend/iobackend.js @@ -2,14 +2,15 @@ import Server from 'cytube-common/lib/tcpjson/server'; import FrontendManager from './frontendmanager'; export default class IOBackend { - constructor(proxyListenerConfig) { + constructor(proxyListenerConfig, socketEmitter) { this.proxyListenerConfig = proxyListenerConfig; + this.socketEmitter = socketEmitter; this.initFrontendManager(); this.initProxyListener(); } initFrontendManager() { - this.frontendManager = new FrontendManager(); + this.frontendManager = new FrontendManager(this.socketEmitter); } initProxyListener() { diff --git a/src/io/backend/proxiedsocket.js b/src/io/backend/proxiedsocket.js index 38e004b6..64db50a2 100644 --- a/src/io/backend/proxiedsocket.js +++ b/src/io/backend/proxiedsocket.js @@ -11,7 +11,7 @@ export default class ProxiedSocket extends EventEmitter { } emit() { - const target = socketEmitter.to(this.id); + const target = this.socketEmitter.to(this.id); target.emit.apply(target, arguments); } diff --git a/src/io/ioserver.js b/src/io/ioserver.js index 0f625783..6def95f8 100644 --- a/src/io/ioserver.js +++ b/src/io/ioserver.js @@ -273,7 +273,9 @@ module.exports = { bound[id] = null; }); - } + }, + + handleConnection: handleConnection }; /* Clean out old rate limiters */ diff --git a/src/server.js b/src/server.js index fd79c9f1..20c8c7e9 100644 --- a/src/server.js +++ b/src/server.js @@ -127,6 +127,20 @@ var Server = function () { }); require("./io/ioserver").init(self, webConfig); + const redisAdapter = require('socket.io-redis'); + const IOBackend = require('./io/backend/iobackend'); + const sioEmitter = require("socket.io").instance; + sioEmitter.adapter(redisAdapter()); + const listenerConfig = { + getPort: function () { + return 3071; + }, + + getHost: function () { + return '127.0.0.1'; + } + }; + const backend = new IOBackend(listenerConfig, sioEmitter); // background tasks init ---------------------------------------------- require("./bgtask")(self); From 9dd617d9fc4816c9f71b4c861318642c9fc7d6cf Mon Sep 17 00:00:00 2001 From: calzoneman Date: Sun, 27 Dec 2015 15:10:43 -0800 Subject: [PATCH 03/12] Update to reflect change in endpoint key --- src/io/backend/frontendmanager.js | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/io/backend/frontendmanager.js b/src/io/backend/frontendmanager.js index 1dcd83c0..7cd465ee 100644 --- a/src/io/backend/frontendmanager.js +++ b/src/io/backend/frontendmanager.js @@ -9,12 +9,12 @@ export default class FrontendManager { } onConnection(socket) { - if (this.frontendConnections.hasOwnProperty(socket.remoteAddressAndPort)) { + if (this.frontendConnections.hasOwnProperty(socket.endpoint)) { // TODO: do some validation, maybe check if the socket is still connected? throw new Error(); } - this.frontendConnections[socket.remoteAddressAndPort] = socket; + this.frontendConnections[socket.endpoint] = socket; socket.on('data', this.onData.bind(this, socket)); } @@ -30,7 +30,7 @@ export default class FrontendManager { } onSocketConnect(frontendConnection, data) { - const mapKey = frontendConnection.remoteAddressAndPort; + const mapKey = frontendConnection.endpoint; const proxiedSocket = new ProxiedSocket( data.socketID, data.socketData, @@ -49,7 +49,7 @@ export default class FrontendManager { } onSocketFrame(frontendConnection, data) { - const mapKey = frontendConnection.remoteAddressAndPort; + const mapKey = frontendConnection.endpoint; const socketMap = this.frontendProxiedSockets[mapKey]; if (!socketMap || !socketMap.hasOwnProperty(data.socketID)) { // TODO From 5b44117681b2f5d49c800b6de38fb0ab697eec21 Mon Sep 17 00:00:00 2001 From: calzoneman Date: Mon, 28 Dec 2015 23:52:39 -0800 Subject: [PATCH 04/12] Use new protocol --- src/io/backend/frontendmanager.js | 32 +++++++++++-------------------- src/io/backend/iobackend.js | 2 +- src/io/backend/proxiedsocket.js | 10 +++++----- 3 files changed, 17 insertions(+), 27 deletions(-) diff --git a/src/io/backend/frontendmanager.js b/src/io/backend/frontendmanager.js index 7cd465ee..730fb321 100644 --- a/src/io/backend/frontendmanager.js +++ b/src/io/backend/frontendmanager.js @@ -15,48 +15,38 @@ export default class FrontendManager { } this.frontendConnections[socket.endpoint] = socket; - socket.on('data', this.onData.bind(this, socket)); + socket.on('SocketConnectEvent', this.onSocketConnect.bind(this, socket)); + socket.on('SocketFrameEvent', this.onSocketFrame.bind(this, socket)); } - onData(socket, data) { - switch (data.$type) { - case 'socketConnect': - this.onSocketConnect(socket, data); - break; - case 'socketFrame': - this.onSocketFrame(socket, data); - break; - } - } - - onSocketConnect(frontendConnection, data) { + onSocketConnect(frontendConnection, socketID, socketIP) { const mapKey = frontendConnection.endpoint; const proxiedSocket = new ProxiedSocket( - data.socketID, - data.socketData, + socketID, + socketIP, this.socketEmitter, frontendConnection); if (!this.frontendProxiedSockets.hasOwnProperty(mapKey)) { this.frontendProxiedSockets[mapKey] = {}; - } else if (this.frontendProxiedSockets[mapKey].hasOwnProperty(data.socketID)) { + } else if (this.frontendProxiedSockets[mapKey].hasOwnProperty(socketID)) { // TODO: Handle this gracefully throw new Error(); } - this.frontendProxiedSockets[mapKey][data.socketID] = proxiedSocket; + this.frontendProxiedSockets[mapKey][socketID] = proxiedSocket; ioServer.handleConnection(proxiedSocket); } - onSocketFrame(frontendConnection, data) { + onSocketFrame(frontendConnection, socketID, event, args) { const mapKey = frontendConnection.endpoint; const socketMap = this.frontendProxiedSockets[mapKey]; - if (!socketMap || !socketMap.hasOwnProperty(data.socketID)) { + if (!socketMap || !socketMap.hasOwnProperty(socketID)) { // TODO throw new Error(); } - const socket = socketMap[data.socketID]; - socket.onProxiedEventReceived.apply(socket, [data.event].concat(data.args)); + const socket = socketMap[socketID]; + socket.onProxiedEventReceived.apply(socket, [event].concat(args)); } } diff --git a/src/io/backend/iobackend.js b/src/io/backend/iobackend.js index 9d0de5c2..1f0a875e 100644 --- a/src/io/backend/iobackend.js +++ b/src/io/backend/iobackend.js @@ -1,4 +1,4 @@ -import Server from 'cytube-common/lib/tcpjson/server'; +import Server from 'cytube-common/lib/proxy/server'; import FrontendManager from './frontendmanager'; export default class IOBackend { diff --git a/src/io/backend/proxiedsocket.js b/src/io/backend/proxiedsocket.js index 64db50a2..49e034c2 100644 --- a/src/io/backend/proxiedsocket.js +++ b/src/io/backend/proxiedsocket.js @@ -1,11 +1,11 @@ import { EventEmitter } from 'events'; export default class ProxiedSocket extends EventEmitter { - constructor(socketID, socketData, socketEmitter, frontendConnection) { + constructor(socketID, socketIP, socketEmitter, frontendConnection) { super(); this.id = socketID; - this.ip = socketData.ip; - this._realip = socketData.ip; + this.ip = socketIP; + this._realip = socketIP; this.socketEmitter = socketEmitter; this.frontendConnection = frontendConnection; } @@ -21,7 +21,7 @@ export default class ProxiedSocket extends EventEmitter { join(channel) { this.frontendConnection.write( - this.frontendConnection.protocol.socketJoinSocketChannels( + this.frontendConnection.protocol.newSocketJoinRoomsEvent( this.id, [channel] ) ); @@ -29,7 +29,7 @@ export default class ProxiedSocket extends EventEmitter { disconnect() { this.frontendConnection.write( - this.frontendConnection.protocol.socketKick(this.id) + this.frontendConnection.protocol.newSocketKickEvent(this.id) ); } } From 9a262da13d5a782615c73fac8747c486e219a86a Mon Sep 17 00:00:00 2001 From: calzoneman Date: Wed, 30 Dec 2015 21:57:46 -0800 Subject: [PATCH 05/12] Set socketUser data from frontend --- src/io/backend/frontendmanager.js | 3 ++- src/io/backend/proxiedsocket.js | 8 +++++++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/src/io/backend/frontendmanager.js b/src/io/backend/frontendmanager.js index 730fb321..b04f7d34 100644 --- a/src/io/backend/frontendmanager.js +++ b/src/io/backend/frontendmanager.js @@ -19,11 +19,12 @@ export default class FrontendManager { socket.on('SocketFrameEvent', this.onSocketFrame.bind(this, socket)); } - onSocketConnect(frontendConnection, socketID, socketIP) { + onSocketConnect(frontendConnection, socketID, socketIP, socketUser) { const mapKey = frontendConnection.endpoint; const proxiedSocket = new ProxiedSocket( socketID, socketIP, + socketUser, this.socketEmitter, frontendConnection); diff --git a/src/io/backend/proxiedsocket.js b/src/io/backend/proxiedsocket.js index 49e034c2..67202b3e 100644 --- a/src/io/backend/proxiedsocket.js +++ b/src/io/backend/proxiedsocket.js @@ -1,11 +1,17 @@ import { EventEmitter } from 'events'; export default class ProxiedSocket extends EventEmitter { - constructor(socketID, socketIP, socketEmitter, frontendConnection) { + constructor(socketID, socketIP, socketUser, socketEmitter, frontendConnection) { super(); this.id = socketID; this.ip = socketIP; this._realip = socketIP; + if (socketUser) { + this.user = { + name: socketUser.name, + global_rank: socketUser.globalRank + }; + } this.socketEmitter = socketEmitter; this.frontendConnection = frontendConnection; } From cdb20e8d40c4282aeec2fcfa9a2ba714f5f4a85d Mon Sep 17 00:00:00 2001 From: calzoneman Date: Fri, 1 Jan 2016 18:25:12 -0800 Subject: [PATCH 06/12] Handle when a frontend disconnects --- src/io/backend/frontendmanager.js | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/src/io/backend/frontendmanager.js b/src/io/backend/frontendmanager.js index b04f7d34..601c3a1e 100644 --- a/src/io/backend/frontendmanager.js +++ b/src/io/backend/frontendmanager.js @@ -15,10 +15,25 @@ export default class FrontendManager { } this.frontendConnections[socket.endpoint] = socket; + socket.on('close', this.onFrontendDisconnect.bind(this, socket)); socket.on('SocketConnectEvent', this.onSocketConnect.bind(this, socket)); socket.on('SocketFrameEvent', this.onSocketFrame.bind(this, socket)); } + onFrontendDisconnect(socket) { + const endpoint = socket.endpoint; + if (this.frontendConnections.hasOwnProperty(endpoint)) { + if (this.frontendProxiedSockets.hasOwnProperty(endpoint)) { + logger.warn(`Frontend ${endpoint} disconnected`); + this.frontendProxiedSockets[endpoint].forEach(proxySocket => { + proxySocket.onProxiedEventReceived('disconnect'); + }); + delete this.frontendProxiedSockets[endpoint]; + } + delete this.frontendConnections[endpoint]; + } + } + onSocketConnect(frontendConnection, socketID, socketIP, socketUser) { const mapKey = frontendConnection.endpoint; const proxiedSocket = new ProxiedSocket( From 28807344bcec78ed68ac0deea97b995710e4f837 Mon Sep 17 00:00:00 2001 From: calzoneman Date: Fri, 1 Jan 2016 18:26:43 -0800 Subject: [PATCH 07/12] Import logger --- src/io/backend/frontendmanager.js | 1 + 1 file changed, 1 insertion(+) diff --git a/src/io/backend/frontendmanager.js b/src/io/backend/frontendmanager.js index 601c3a1e..b047ac94 100644 --- a/src/io/backend/frontendmanager.js +++ b/src/io/backend/frontendmanager.js @@ -1,3 +1,4 @@ +import logger from 'cytube-common/lib/logger'; import ioServer from '../ioserver'; import ProxiedSocket from './proxiedsocket'; From 8bef7924b2fdab898d02c9e4fa68866a8be382fb Mon Sep 17 00:00:00 2001 From: calzoneman Date: Fri, 1 Jan 2016 18:28:53 -0800 Subject: [PATCH 08/12] Minor fix --- src/io/backend/frontendmanager.js | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/io/backend/frontendmanager.js b/src/io/backend/frontendmanager.js index b047ac94..92fdc341 100644 --- a/src/io/backend/frontendmanager.js +++ b/src/io/backend/frontendmanager.js @@ -26,9 +26,10 @@ export default class FrontendManager { if (this.frontendConnections.hasOwnProperty(endpoint)) { if (this.frontendProxiedSockets.hasOwnProperty(endpoint)) { logger.warn(`Frontend ${endpoint} disconnected`); - this.frontendProxiedSockets[endpoint].forEach(proxySocket => { + for (const key in this.frontendProxiedSockets[endpoint]) { + const proxySocket = this.frontendProxiedSockets[endpoint][key]; proxySocket.onProxiedEventReceived('disconnect'); - }); + } delete this.frontendProxiedSockets[endpoint]; } delete this.frontendConnections[endpoint]; From dd73a8ee9a5a01bf20cd7e90a2a015b0b9652b1d Mon Sep 17 00:00:00 2001 From: calzoneman Date: Wed, 20 Jan 2016 23:11:55 -0800 Subject: [PATCH 09/12] Automatically publish backend address to the pool --- package.json | 2 ++ src/io/backend/iobackend.js | 25 +++++++++++++++++++++++-- src/server.js | 5 ++++- 3 files changed, 29 insertions(+), 3 deletions(-) diff --git a/package.json b/package.json index be9bf2ef..9d5ea444 100644 --- a/package.json +++ b/package.json @@ -31,12 +31,14 @@ "nodemailer": "^1.4.0", "oauth": "^0.9.12", "q": "^1.4.1", + "redis": "^2.4.2", "sanitize-html": "git://github.com/calzoneman/sanitize-html", "serve-static": "^1.10.0", "socket.io": "^1.3.7", "socket.io-redis": "^1.0.0", "source-map-support": "^0.3.2", "status-message-polyfill": "calzoneman/status-message-polyfill", + "uuid": "^2.0.1", "yamljs": "^0.1.6" }, "scripts": { diff --git a/src/io/backend/iobackend.js b/src/io/backend/iobackend.js index 1f0a875e..6b1e0da5 100644 --- a/src/io/backend/iobackend.js +++ b/src/io/backend/iobackend.js @@ -1,12 +1,20 @@ import Server from 'cytube-common/lib/proxy/server'; import FrontendManager from './frontendmanager'; +import uuid from 'uuid'; +import PoolEntryUpdater from 'cytube-common/lib/redis/poolentryupdater'; +import JSONProtocol from 'cytube-common/lib/proxy/protocol'; + +const BACKEND_POOL = 'backend-hosts'; export default class IOBackend { - constructor(proxyListenerConfig, socketEmitter) { + constructor(proxyListenerConfig, socketEmitter, poolRedisClient) { this.proxyListenerConfig = proxyListenerConfig; this.socketEmitter = socketEmitter; + this.poolRedisClient = poolRedisClient; + this.protocol = new JSONProtocol(); this.initFrontendManager(); this.initProxyListener(); + this.initBackendPoolUpdater(); } initFrontendManager() { @@ -14,8 +22,21 @@ export default class IOBackend { } initProxyListener() { - this.proxyListener = new Server(this.proxyListenerConfig); + this.proxyListener = new Server(this.proxyListenerConfig, this.protocol); this.proxyListener.on('connection', this.frontendManager.onConnection.bind(this.frontendManager)); } + + initBackendPoolUpdater() { + const entry = { + address: this.proxyListenerConfig.getHost() + '/' + this.proxyListenerConfig.getPort() + } + this.poolEntryUpdater = new PoolEntryUpdater( + this.poolRedisClient, + BACKEND_POOL, + uuid.v4(), + entry + ); + this.poolEntryUpdater.start(); + } } diff --git a/src/server.js b/src/server.js index 20c8c7e9..6dfca796 100644 --- a/src/server.js +++ b/src/server.js @@ -140,7 +140,10 @@ var Server = function () { return '127.0.0.1'; } }; - const backend = new IOBackend(listenerConfig, sioEmitter); + const redis = require('redis'); + Promise.promisifyAll(redis.RedisClient.prototype); + Promise.promisifyAll(redis.Multi.prototype); + const backend = new IOBackend(listenerConfig, sioEmitter, redis.createClient()); // background tasks init ---------------------------------------------- require("./bgtask")(self); From f8470fc8f6472b03c6ac66a21f04a8a47599da3a Mon Sep 17 00:00:00 2001 From: calzoneman Date: Sat, 23 Jan 2016 12:46:04 -0800 Subject: [PATCH 10/12] Use new proxy address formatter --- src/io/backend/iobackend.js | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/io/backend/iobackend.js b/src/io/backend/iobackend.js index 6b1e0da5..a3371d02 100644 --- a/src/io/backend/iobackend.js +++ b/src/io/backend/iobackend.js @@ -3,6 +3,7 @@ import FrontendManager from './frontendmanager'; import uuid from 'uuid'; import PoolEntryUpdater from 'cytube-common/lib/redis/poolentryupdater'; import JSONProtocol from 'cytube-common/lib/proxy/protocol'; +import { formatProxyAddress } from 'cytube-common/lib/util/addressutil'; const BACKEND_POOL = 'backend-hosts'; @@ -28,8 +29,10 @@ export default class IOBackend { } initBackendPoolUpdater() { + const hostname = this.proxyListenerConfig.getHost(); + const port = this.proxyListenerConfig.getPort(); const entry = { - address: this.proxyListenerConfig.getHost() + '/' + this.proxyListenerConfig.getPort() + address: formatProxyAddress(hostname, port) } this.poolEntryUpdater = new PoolEntryUpdater( this.poolRedisClient, From 86abebf9bfc5b63a58564ba94b2c62ec21308db9 Mon Sep 17 00:00:00 2001 From: calzoneman Date: Thu, 28 Jan 2016 19:51:59 -0800 Subject: [PATCH 11/12] Add RedisClusterClient --- src/io/cluster/redisclusterclient.js | 17 +++++++++++++++++ src/server.js | 13 +++++++++---- 2 files changed, 26 insertions(+), 4 deletions(-) create mode 100644 src/io/cluster/redisclusterclient.js diff --git a/src/io/cluster/redisclusterclient.js b/src/io/cluster/redisclusterclient.js new file mode 100644 index 00000000..b4c7eb83 --- /dev/null +++ b/src/io/cluster/redisclusterclient.js @@ -0,0 +1,17 @@ +class RedisClusterClient { + constructor(frontendPool) { + this.frontendPool = frontendPool; + } + + getSocketConfig(channel) { + return this.frontendPool.getFrontends(channel).then(result => { + if (!Array.isArray(result)) { + result = []; + } + + return { servers: result }; + }); + } +} + +export { RedisClusterClient }; diff --git a/src/server.js b/src/server.js index 6dfca796..1381cdab 100644 --- a/src/server.js +++ b/src/server.js @@ -46,6 +46,8 @@ import LocalChannelIndex from './web/localchannelindex'; import IOConfiguration from './configuration/ioconfig'; import WebConfiguration from './configuration/webconfig'; import NullClusterClient from './io/cluster/nullclusterclient'; +import { RedisClusterClient } from './io/cluster/redisclusterclient'; +import { FrontendPool } from 'cytube-common/lib/redis/frontendpool'; import session from './session'; var Server = function () { @@ -64,10 +66,16 @@ var Server = function () { self.db.init(); ChannelStore.init(); + // redis init + const redis = require('redis'); + Promise.promisifyAll(redis.RedisClient.prototype); + Promise.promisifyAll(redis.Multi.prototype); + // webserver init ----------------------------------------------------- const ioConfig = IOConfiguration.fromOldConfig(Config); const webConfig = WebConfiguration.fromOldConfig(Config); - const clusterClient = new NullClusterClient(ioConfig); + const frontendPool = new FrontendPool(redis.createClient()); + const clusterClient = new RedisClusterClient(frontendPool); const channelIndex = new LocalChannelIndex(); self.express = express(); require("./web/webserver").init(self.express, @@ -140,9 +148,6 @@ var Server = function () { return '127.0.0.1'; } }; - const redis = require('redis'); - Promise.promisifyAll(redis.RedisClient.prototype); - Promise.promisifyAll(redis.Multi.prototype); const backend = new IOBackend(listenerConfig, sioEmitter, redis.createClient()); // background tasks init ---------------------------------------------- From 50124c8a4553948f14bfd8916800af71844d675f Mon Sep 17 00:00:00 2001 From: calzoneman Date: Thu, 4 Feb 2016 21:43:20 -0800 Subject: [PATCH 12/12] Refactor backend initialization --- src/backend/backendconfiguration.js | 23 ++++++ src/backend/backendmodule.js | 74 +++++++++++++++++++ src/{io => }/backend/iobackend.js | 10 +-- src/{io => }/backend/proxiedsocket.js | 7 +- .../proxyinterceptor.js} | 22 ++++-- src/legacymodule.js | 13 ++++ src/server.js | 36 ++++----- 7 files changed, 148 insertions(+), 37 deletions(-) create mode 100644 src/backend/backendconfiguration.js create mode 100644 src/backend/backendmodule.js rename src/{io => }/backend/iobackend.js (83%) rename src/{io => }/backend/proxiedsocket.js (83%) rename src/{io/backend/frontendmanager.js => backend/proxyinterceptor.js} (81%) create mode 100644 src/legacymodule.js diff --git a/src/backend/backendconfiguration.js b/src/backend/backendconfiguration.js new file mode 100644 index 00000000..2883df42 --- /dev/null +++ b/src/backend/backendconfiguration.js @@ -0,0 +1,23 @@ +class BackendConfiguration { + constructor(config) { + this.config = config; + } + + getRedisConfig() { + return this.config.redis; + } + + getListenerConfig() { + return this.config.proxy.listeners.map(listener => ({ + getHost() { + return listener.host; + }, + + getPort() { + return listener.port; + } + })); + } +} + +export { BackendConfiguration }; diff --git a/src/backend/backendmodule.js b/src/backend/backendmodule.js new file mode 100644 index 00000000..dcca65ea --- /dev/null +++ b/src/backend/backendmodule.js @@ -0,0 +1,74 @@ +import { RedisClusterClient } from '../io/cluster/redisclusterclient'; +import { FrontendPool } from 'cytube-common/lib/redis/frontendpool'; +import RedisClientProvider from 'cytube-common/lib/redis/redisclientprovider'; +import { loadFromToml } from 'cytube-common/lib/configuration/configloader'; +import path from 'path'; +import { BackendConfiguration } from './backendconfiguration'; +import logger from 'cytube-common/lib/logger'; +import redisAdapter from 'socket.io-redis'; + +const BACKEND_CONFIG_PATH = path.resolve(__dirname, '..', '..', 'backend.toml'); + +class BackendModule { + constructor() { + this.initConfig(); + } + + initConfig() { + try { + this.backendConfig = loadFromToml(BackendConfiguration, BACKEND_CONFIG_PATH); + } catch (error) { + if (typeof error.line !== 'undefined') { + logger.error(`Error in configuration file: ${error} (line ${error.line})`); + } else { + logger.error(`Error loading configuration: ${error.stack}`); + } + + process.exit(1); + } + } + + onReady() { + const redisClientProvider = this.getRedisClientProvider(); + this.redisAdapter = redisAdapter({ + pubClient: redisClientProvider.get(), + subClient: redisClientProvider.get() + }); + this.sioEmitter = require('socket.io').instance; + this.sioEmitter.adapter(this.redisAdapter); + const IOBackend = require('./iobackend'); + this.ioBackend = new IOBackend( + this.backendConfig.getListenerConfig()[0], + this.sioEmitter, + redisClientProvider.get() + ) + } + + getFrontendPool() { + if (!this.frontendPool) { + this.frontendPool = new FrontendPool(this.getRedisClientProvider().get()); + } + + return this.frontendPool; + } + + getRedisClientProvider() { + if (!this.redisClientProvider) { + this.redisClientProvider = new RedisClientProvider( + this.backendConfig.getRedisConfig() + ); + } + + return this.redisClientProvider; + } + + getClusterClient() { + if (!this.redisClusterClient) { + this.redisClusterClient = new RedisClusterClient(this.getFrontendPool()); + } + + return this.redisClusterClient; + } +} + +export { BackendModule } diff --git a/src/io/backend/iobackend.js b/src/backend/iobackend.js similarity index 83% rename from src/io/backend/iobackend.js rename to src/backend/iobackend.js index a3371d02..2bb40fe6 100644 --- a/src/io/backend/iobackend.js +++ b/src/backend/iobackend.js @@ -1,5 +1,5 @@ import Server from 'cytube-common/lib/proxy/server'; -import FrontendManager from './frontendmanager'; +import ProxyInterceptor from './proxyinterceptor'; import uuid from 'uuid'; import PoolEntryUpdater from 'cytube-common/lib/redis/poolentryupdater'; import JSONProtocol from 'cytube-common/lib/proxy/protocol'; @@ -13,19 +13,19 @@ export default class IOBackend { this.socketEmitter = socketEmitter; this.poolRedisClient = poolRedisClient; this.protocol = new JSONProtocol(); - this.initFrontendManager(); + this.initProxyInterceptor(); this.initProxyListener(); this.initBackendPoolUpdater(); } - initFrontendManager() { - this.frontendManager = new FrontendManager(this.socketEmitter); + initProxyInterceptor() { + this.proxyInterceptor = new ProxyInterceptor(this.socketEmitter); } initProxyListener() { this.proxyListener = new Server(this.proxyListenerConfig, this.protocol); this.proxyListener.on('connection', - this.frontendManager.onConnection.bind(this.frontendManager)); + this.proxyInterceptor.onConnection.bind(this.proxyInterceptor)); } initBackendPoolUpdater() { diff --git a/src/io/backend/proxiedsocket.js b/src/backend/proxiedsocket.js similarity index 83% rename from src/io/backend/proxiedsocket.js rename to src/backend/proxiedsocket.js index 67202b3e..24805779 100644 --- a/src/io/backend/proxiedsocket.js +++ b/src/backend/proxiedsocket.js @@ -1,3 +1,4 @@ +import logger from 'cytube-common/lib/logger'; import { EventEmitter } from 'events'; export default class ProxiedSocket extends EventEmitter { @@ -22,7 +23,11 @@ export default class ProxiedSocket extends EventEmitter { } onProxiedEventReceived() { - EventEmitter.prototype.emit.apply(this, arguments); + try { + EventEmitter.prototype.emit.apply(this, arguments); + } catch (error) { + logger.error(`Emit failed: ${error.stack}`); + } } join(channel) { diff --git a/src/io/backend/frontendmanager.js b/src/backend/proxyinterceptor.js similarity index 81% rename from src/io/backend/frontendmanager.js rename to src/backend/proxyinterceptor.js index 92fdc341..2f67e0fe 100644 --- a/src/io/backend/frontendmanager.js +++ b/src/backend/proxyinterceptor.js @@ -1,18 +1,23 @@ import logger from 'cytube-common/lib/logger'; -import ioServer from '../ioserver'; +import ioServer from '../io/ioserver'; import ProxiedSocket from './proxiedsocket'; -export default class FrontendManager { +export default class ProxyInterceptor { constructor(socketEmitter) { this.socketEmitter = socketEmitter; this.frontendConnections = {}; this.frontendProxiedSockets = {}; } + /** + * Handle a new frontend proxy connection. + * + * @param {Connection} socket frontend proxy connection + */ onConnection(socket) { if (this.frontendConnections.hasOwnProperty(socket.endpoint)) { - // TODO: do some validation, maybe check if the socket is still connected? - throw new Error(); + logger.error(`Duplicate frontend connection: ${socket.endpoint}`); + return; } this.frontendConnections[socket.endpoint] = socket; @@ -48,8 +53,8 @@ export default class FrontendManager { if (!this.frontendProxiedSockets.hasOwnProperty(mapKey)) { this.frontendProxiedSockets[mapKey] = {}; } else if (this.frontendProxiedSockets[mapKey].hasOwnProperty(socketID)) { - // TODO: Handle this gracefully - throw new Error(); + logger.error(`Duplicate SocketConnectEvent for ${socketID}`); + return; } this.frontendProxiedSockets[mapKey][socketID] = proxiedSocket; @@ -60,8 +65,9 @@ export default class FrontendManager { const mapKey = frontendConnection.endpoint; const socketMap = this.frontendProxiedSockets[mapKey]; if (!socketMap || !socketMap.hasOwnProperty(socketID)) { - // TODO - throw new Error(); + logger.error(`Received SocketFrameEvent for nonexistent socket`, + { socketID, event }); + return; } const socket = socketMap[socketID]; diff --git a/src/legacymodule.js b/src/legacymodule.js new file mode 100644 index 00000000..e2d18f3f --- /dev/null +++ b/src/legacymodule.js @@ -0,0 +1,13 @@ +import NullClusterClient from './io/cluster/nullclusterclient'; + +class LegacyModule { + getClusterClient() { + return new NullClusterClient(); + } + + onReady() { + + } +} + +export { LegacyModule }; diff --git a/src/server.js b/src/server.js index 1381cdab..03886150 100644 --- a/src/server.js +++ b/src/server.js @@ -46,9 +46,9 @@ import LocalChannelIndex from './web/localchannelindex'; import IOConfiguration from './configuration/ioconfig'; import WebConfiguration from './configuration/webconfig'; import NullClusterClient from './io/cluster/nullclusterclient'; -import { RedisClusterClient } from './io/cluster/redisclusterclient'; -import { FrontendPool } from 'cytube-common/lib/redis/frontendpool'; import session from './session'; +import { BackendModule } from './backend/backendmodule'; +import { LegacyModule } from './legacymodule'; var Server = function () { var self = this; @@ -60,22 +60,24 @@ var Server = function () { self.infogetter = null; self.servers = {}; + // backend init + var initModule; + if (true) { + initModule = new BackendModule(); + } else { + initModule = new LegacyModule(); + } + // database init ------------------------------------------------------ var Database = require("./database"); self.db = Database; self.db.init(); ChannelStore.init(); - // redis init - const redis = require('redis'); - Promise.promisifyAll(redis.RedisClient.prototype); - Promise.promisifyAll(redis.Multi.prototype); - // webserver init ----------------------------------------------------- const ioConfig = IOConfiguration.fromOldConfig(Config); const webConfig = WebConfiguration.fromOldConfig(Config); - const frontendPool = new FrontendPool(redis.createClient()); - const clusterClient = new RedisClusterClient(frontendPool); + const clusterClient = initModule.getClusterClient(); const channelIndex = new LocalChannelIndex(); self.express = express(); require("./web/webserver").init(self.express, @@ -135,26 +137,14 @@ var Server = function () { }); require("./io/ioserver").init(self, webConfig); - const redisAdapter = require('socket.io-redis'); - const IOBackend = require('./io/backend/iobackend'); - const sioEmitter = require("socket.io").instance; - sioEmitter.adapter(redisAdapter()); - const listenerConfig = { - getPort: function () { - return 3071; - }, - - getHost: function () { - return '127.0.0.1'; - } - }; - const backend = new IOBackend(listenerConfig, sioEmitter, redis.createClient()); // background tasks init ---------------------------------------------- require("./bgtask")(self); // setuid require("./setuid"); + + initModule.onReady(); }; Server.prototype.getHTTPIP = function (req) {