mirror of https://github.com/calzoneman/sync.git
Merge branch 'mp-backend' into 3.0
This commit is contained in:
commit
5de6be0850
|
@ -31,11 +31,14 @@
|
|||
"nodemailer": "^1.4.0",
|
||||
"oauth": "^0.9.12",
|
||||
"q": "^1.4.1",
|
||||
"redis": "^2.4.2",
|
||||
"sanitize-html": "git://github.com/calzoneman/sanitize-html",
|
||||
"serve-static": "^1.10.0",
|
||||
"socket.io": "^1.4.0",
|
||||
"socket.io-redis": "^1.0.0",
|
||||
"source-map-support": "^0.4.0",
|
||||
"status-message-polyfill": "calzoneman/status-message-polyfill",
|
||||
"uuid": "^2.0.1",
|
||||
"yamljs": "^0.1.6"
|
||||
},
|
||||
"scripts": {
|
||||
|
|
|
@ -0,0 +1,23 @@
|
|||
class BackendConfiguration {
|
||||
constructor(config) {
|
||||
this.config = config;
|
||||
}
|
||||
|
||||
getRedisConfig() {
|
||||
return this.config.redis;
|
||||
}
|
||||
|
||||
getListenerConfig() {
|
||||
return this.config.proxy.listeners.map(listener => ({
|
||||
getHost() {
|
||||
return listener.host;
|
||||
},
|
||||
|
||||
getPort() {
|
||||
return listener.port;
|
||||
}
|
||||
}));
|
||||
}
|
||||
}
|
||||
|
||||
export { BackendConfiguration };
|
|
@ -0,0 +1,74 @@
|
|||
import { RedisClusterClient } from '../io/cluster/redisclusterclient';
|
||||
import { FrontendPool } from 'cytube-common/lib/redis/frontendpool';
|
||||
import RedisClientProvider from 'cytube-common/lib/redis/redisclientprovider';
|
||||
import { loadFromToml } from 'cytube-common/lib/configuration/configloader';
|
||||
import path from 'path';
|
||||
import { BackendConfiguration } from './backendconfiguration';
|
||||
import logger from 'cytube-common/lib/logger';
|
||||
import redisAdapter from 'socket.io-redis';
|
||||
|
||||
const BACKEND_CONFIG_PATH = path.resolve(__dirname, '..', '..', 'backend.toml');
|
||||
|
||||
class BackendModule {
|
||||
constructor() {
|
||||
this.initConfig();
|
||||
}
|
||||
|
||||
initConfig() {
|
||||
try {
|
||||
this.backendConfig = loadFromToml(BackendConfiguration, BACKEND_CONFIG_PATH);
|
||||
} catch (error) {
|
||||
if (typeof error.line !== 'undefined') {
|
||||
logger.error(`Error in configuration file: ${error} (line ${error.line})`);
|
||||
} else {
|
||||
logger.error(`Error loading configuration: ${error.stack}`);
|
||||
}
|
||||
|
||||
process.exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
onReady() {
|
||||
const redisClientProvider = this.getRedisClientProvider();
|
||||
this.redisAdapter = redisAdapter({
|
||||
pubClient: redisClientProvider.get(),
|
||||
subClient: redisClientProvider.get()
|
||||
});
|
||||
this.sioEmitter = require('socket.io').instance;
|
||||
this.sioEmitter.adapter(this.redisAdapter);
|
||||
const IOBackend = require('./iobackend');
|
||||
this.ioBackend = new IOBackend(
|
||||
this.backendConfig.getListenerConfig()[0],
|
||||
this.sioEmitter,
|
||||
redisClientProvider.get()
|
||||
)
|
||||
}
|
||||
|
||||
getFrontendPool() {
|
||||
if (!this.frontendPool) {
|
||||
this.frontendPool = new FrontendPool(this.getRedisClientProvider().get());
|
||||
}
|
||||
|
||||
return this.frontendPool;
|
||||
}
|
||||
|
||||
getRedisClientProvider() {
|
||||
if (!this.redisClientProvider) {
|
||||
this.redisClientProvider = new RedisClientProvider(
|
||||
this.backendConfig.getRedisConfig()
|
||||
);
|
||||
}
|
||||
|
||||
return this.redisClientProvider;
|
||||
}
|
||||
|
||||
getClusterClient() {
|
||||
if (!this.redisClusterClient) {
|
||||
this.redisClusterClient = new RedisClusterClient(this.getFrontendPool());
|
||||
}
|
||||
|
||||
return this.redisClusterClient;
|
||||
}
|
||||
}
|
||||
|
||||
export { BackendModule }
|
|
@ -0,0 +1,45 @@
|
|||
import Server from 'cytube-common/lib/proxy/server';
|
||||
import ProxyInterceptor from './proxyinterceptor';
|
||||
import uuid from 'uuid';
|
||||
import PoolEntryUpdater from 'cytube-common/lib/redis/poolentryupdater';
|
||||
import JSONProtocol from 'cytube-common/lib/proxy/protocol';
|
||||
import { formatProxyAddress } from 'cytube-common/lib/util/addressutil';
|
||||
|
||||
const BACKEND_POOL = 'backend-hosts';
|
||||
|
||||
export default class IOBackend {
|
||||
constructor(proxyListenerConfig, socketEmitter, poolRedisClient) {
|
||||
this.proxyListenerConfig = proxyListenerConfig;
|
||||
this.socketEmitter = socketEmitter;
|
||||
this.poolRedisClient = poolRedisClient;
|
||||
this.protocol = new JSONProtocol();
|
||||
this.initProxyInterceptor();
|
||||
this.initProxyListener();
|
||||
this.initBackendPoolUpdater();
|
||||
}
|
||||
|
||||
initProxyInterceptor() {
|
||||
this.proxyInterceptor = new ProxyInterceptor(this.socketEmitter);
|
||||
}
|
||||
|
||||
initProxyListener() {
|
||||
this.proxyListener = new Server(this.proxyListenerConfig, this.protocol);
|
||||
this.proxyListener.on('connection',
|
||||
this.proxyInterceptor.onConnection.bind(this.proxyInterceptor));
|
||||
}
|
||||
|
||||
initBackendPoolUpdater() {
|
||||
const hostname = this.proxyListenerConfig.getHost();
|
||||
const port = this.proxyListenerConfig.getPort();
|
||||
const entry = {
|
||||
address: formatProxyAddress(hostname, port)
|
||||
}
|
||||
this.poolEntryUpdater = new PoolEntryUpdater(
|
||||
this.poolRedisClient,
|
||||
BACKEND_POOL,
|
||||
uuid.v4(),
|
||||
entry
|
||||
);
|
||||
this.poolEntryUpdater.start();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,46 @@
|
|||
import logger from 'cytube-common/lib/logger';
|
||||
import { EventEmitter } from 'events';
|
||||
|
||||
export default class ProxiedSocket extends EventEmitter {
|
||||
constructor(socketID, socketIP, socketUser, socketEmitter, frontendConnection) {
|
||||
super();
|
||||
this.id = socketID;
|
||||
this.ip = socketIP;
|
||||
this._realip = socketIP;
|
||||
if (socketUser) {
|
||||
this.user = {
|
||||
name: socketUser.name,
|
||||
global_rank: socketUser.globalRank
|
||||
};
|
||||
}
|
||||
this.socketEmitter = socketEmitter;
|
||||
this.frontendConnection = frontendConnection;
|
||||
}
|
||||
|
||||
emit() {
|
||||
const target = this.socketEmitter.to(this.id);
|
||||
target.emit.apply(target, arguments);
|
||||
}
|
||||
|
||||
onProxiedEventReceived() {
|
||||
try {
|
||||
EventEmitter.prototype.emit.apply(this, arguments);
|
||||
} catch (error) {
|
||||
logger.error(`Emit failed: ${error.stack}`);
|
||||
}
|
||||
}
|
||||
|
||||
join(channel) {
|
||||
this.frontendConnection.write(
|
||||
this.frontendConnection.protocol.newSocketJoinRoomsEvent(
|
||||
this.id, [channel]
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
disconnect() {
|
||||
this.frontendConnection.write(
|
||||
this.frontendConnection.protocol.newSocketKickEvent(this.id)
|
||||
);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,76 @@
|
|||
import logger from 'cytube-common/lib/logger';
|
||||
import ioServer from '../io/ioserver';
|
||||
import ProxiedSocket from './proxiedsocket';
|
||||
|
||||
export default class ProxyInterceptor {
|
||||
constructor(socketEmitter) {
|
||||
this.socketEmitter = socketEmitter;
|
||||
this.frontendConnections = {};
|
||||
this.frontendProxiedSockets = {};
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle a new frontend proxy connection.
|
||||
*
|
||||
* @param {Connection} socket frontend proxy connection
|
||||
*/
|
||||
onConnection(socket) {
|
||||
if (this.frontendConnections.hasOwnProperty(socket.endpoint)) {
|
||||
logger.error(`Duplicate frontend connection: ${socket.endpoint}`);
|
||||
return;
|
||||
}
|
||||
|
||||
this.frontendConnections[socket.endpoint] = socket;
|
||||
socket.on('close', this.onFrontendDisconnect.bind(this, socket));
|
||||
socket.on('SocketConnectEvent', this.onSocketConnect.bind(this, socket));
|
||||
socket.on('SocketFrameEvent', this.onSocketFrame.bind(this, socket));
|
||||
}
|
||||
|
||||
onFrontendDisconnect(socket) {
|
||||
const endpoint = socket.endpoint;
|
||||
if (this.frontendConnections.hasOwnProperty(endpoint)) {
|
||||
if (this.frontendProxiedSockets.hasOwnProperty(endpoint)) {
|
||||
logger.warn(`Frontend ${endpoint} disconnected`);
|
||||
for (const key in this.frontendProxiedSockets[endpoint]) {
|
||||
const proxySocket = this.frontendProxiedSockets[endpoint][key];
|
||||
proxySocket.onProxiedEventReceived('disconnect');
|
||||
}
|
||||
delete this.frontendProxiedSockets[endpoint];
|
||||
}
|
||||
delete this.frontendConnections[endpoint];
|
||||
}
|
||||
}
|
||||
|
||||
onSocketConnect(frontendConnection, socketID, socketIP, socketUser) {
|
||||
const mapKey = frontendConnection.endpoint;
|
||||
const proxiedSocket = new ProxiedSocket(
|
||||
socketID,
|
||||
socketIP,
|
||||
socketUser,
|
||||
this.socketEmitter,
|
||||
frontendConnection);
|
||||
|
||||
if (!this.frontendProxiedSockets.hasOwnProperty(mapKey)) {
|
||||
this.frontendProxiedSockets[mapKey] = {};
|
||||
} else if (this.frontendProxiedSockets[mapKey].hasOwnProperty(socketID)) {
|
||||
logger.error(`Duplicate SocketConnectEvent for ${socketID}`);
|
||||
return;
|
||||
}
|
||||
|
||||
this.frontendProxiedSockets[mapKey][socketID] = proxiedSocket;
|
||||
ioServer.handleConnection(proxiedSocket);
|
||||
}
|
||||
|
||||
onSocketFrame(frontendConnection, socketID, event, args) {
|
||||
const mapKey = frontendConnection.endpoint;
|
||||
const socketMap = this.frontendProxiedSockets[mapKey];
|
||||
if (!socketMap || !socketMap.hasOwnProperty(socketID)) {
|
||||
logger.error(`Received SocketFrameEvent for nonexistent socket`,
|
||||
{ socketID, event });
|
||||
return;
|
||||
}
|
||||
|
||||
const socket = socketMap[socketID];
|
||||
socket.onProxiedEventReceived.apply(socket, [event].concat(args));
|
||||
}
|
||||
}
|
|
@ -0,0 +1,17 @@
|
|||
class RedisClusterClient {
|
||||
constructor(frontendPool) {
|
||||
this.frontendPool = frontendPool;
|
||||
}
|
||||
|
||||
getSocketConfig(channel) {
|
||||
return this.frontendPool.getFrontends(channel).then(result => {
|
||||
if (!Array.isArray(result)) {
|
||||
result = [];
|
||||
}
|
||||
|
||||
return { servers: result };
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
export { RedisClusterClient };
|
|
@ -276,7 +276,9 @@ module.exports = {
|
|||
|
||||
bound[id] = null;
|
||||
});
|
||||
}
|
||||
},
|
||||
|
||||
handleConnection: handleConnection
|
||||
};
|
||||
|
||||
/* Clean out old rate limiters */
|
||||
|
|
|
@ -0,0 +1,13 @@
|
|||
import NullClusterClient from './io/cluster/nullclusterclient';
|
||||
|
||||
class LegacyModule {
|
||||
getClusterClient() {
|
||||
return new NullClusterClient();
|
||||
}
|
||||
|
||||
onReady() {
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
export { LegacyModule };
|
|
@ -47,6 +47,8 @@ import IOConfiguration from './configuration/ioconfig';
|
|||
import WebConfiguration from './configuration/webconfig';
|
||||
import NullClusterClient from './io/cluster/nullclusterclient';
|
||||
import session from './session';
|
||||
import { BackendModule } from './backend/backendmodule';
|
||||
import { LegacyModule } from './legacymodule';
|
||||
|
||||
var Server = function () {
|
||||
var self = this;
|
||||
|
@ -58,6 +60,14 @@ var Server = function () {
|
|||
self.infogetter = null;
|
||||
self.servers = {};
|
||||
|
||||
// backend init
|
||||
var initModule;
|
||||
if (true) {
|
||||
initModule = new BackendModule();
|
||||
} else {
|
||||
initModule = new LegacyModule();
|
||||
}
|
||||
|
||||
// database init ------------------------------------------------------
|
||||
var Database = require("./database");
|
||||
self.db = Database;
|
||||
|
@ -67,7 +77,7 @@ var Server = function () {
|
|||
// webserver init -----------------------------------------------------
|
||||
const ioConfig = IOConfiguration.fromOldConfig(Config);
|
||||
const webConfig = WebConfiguration.fromOldConfig(Config);
|
||||
const clusterClient = new NullClusterClient(ioConfig);
|
||||
const clusterClient = initModule.getClusterClient();
|
||||
const channelIndex = new LocalChannelIndex();
|
||||
self.express = express();
|
||||
require("./web/webserver").init(self.express,
|
||||
|
@ -133,6 +143,8 @@ var Server = function () {
|
|||
|
||||
// setuid
|
||||
require("./setuid");
|
||||
|
||||
initModule.onReady();
|
||||
};
|
||||
|
||||
Server.prototype.getHTTPIP = function (req) {
|
||||
|
|
Loading…
Reference in New Issue