From 593bf67b45a7bf45bd2db19b850d3e69a79a17ef Mon Sep 17 00:00:00 2001 From: David Stephens Date: Fri, 31 May 2019 01:16:32 +0100 Subject: [PATCH] Stop passing sockets all over the place. General tidy up and refactor. Add reconnection to MRC logic. --- core/servers/chat/mrc_multiplexer.js | 170 ++++++++++++++++++--------- 1 file changed, 113 insertions(+), 57 deletions(-) diff --git a/core/servers/chat/mrc_multiplexer.js b/core/servers/chat/mrc_multiplexer.js index 7eaac22f..e0ef4a54 100644 --- a/core/servers/chat/mrc_multiplexer.js +++ b/core/servers/chat/mrc_multiplexer.js @@ -16,7 +16,8 @@ const os = require('os'); // MRC -const PROTOCOL_VERSION = '1.2.9'; +const protocolVersion = '1.2.9'; +const lineDelimiter = new RegExp('\r\n|\r|\n'); const ModuleInfo = exports.moduleInfo = { name : 'MRC', @@ -27,62 +28,130 @@ const ModuleInfo = exports.moduleInfo = { }; const connectedSockets = new Set(); -let mrcCentralConnection = ''; exports.getModule = class MrcModule extends ServerModule { constructor() { super(); - this.log = Log.child( { server : 'MRC' } ); + + this.log = Log.child( { server : 'MRC' } ); + this.config = Config(); + this.mrcConnectOpts = { + host : this.config.chatServers.mrc.serverHostname || 'mrc.bottomlessabyss.net', + port : this.config.chatServers.mrc.serverPort || 5000, + retryDelay : this.config.chatServers.mrc.retryDelay || 10000 + }; } - createServer(cb) { - if (!this.enabled) { - return cb(null); - } - - const self = this; - + _connectionHandler() { const config = Config(); const boardName = config.general.prettyBoardName || config.general.boardName; const enigmaVersion = 'ENiGMA-BBS_' + require('../../../package.json').version; - const mrcConnectOpts = { - host : config.chatServers.mrc.serverHostname || 'mrc.bottomlessabyss.net', - port : config.chatServers.mrc.serverPort || 5000 - }; - - const handshake = `${boardName}~${enigmaVersion}/${os.platform()}-${os.arch()}/${PROTOCOL_VERSION}`; + const handshake = `${boardName}~${enigmaVersion}/${os.platform()}-${os.arch()}/${protocolVersion}`; this.log.debug({ handshake : handshake }, 'Handshaking with MRC server'); + this.mrcClient.write(handshake); + this.log.info(this.mrcConnectOpts, 'Connected to MRC server'); + } + + createServer(cb) { + + if (!this.enabled) { + return cb(null); + } + + this.connectToMrc(); + this.createLocalListener(); + + return cb(null); + } + + listen(cb) { + if (!this.enabled) { + return cb(null); + } + + const config = Config(); + + const port = parseInt(config.chatServers.mrc.multiplexerPort); + if(isNaN(port)) { + this.log.warn( { port : config.chatServers.mrc.multiplexerPort, server : ModuleInfo.name }, 'Invalid port' ); + return cb(Errors.Invalid(`Invalid port: ${config.chatServers.mrc.multiplexerPort}`)); + } + Log.info( { server : ModuleInfo.name, port : config.chatServers.mrc.multiplexerPort }, 'MRC multiplexer local listener starting up'); + return this.server.listen(port, cb); + } + + /** + * Handles connecting to to the MRC server + */ + connectToMrc() { + const self = this; + // create connection to MRC server - this.mrcClient = net.createConnection(mrcConnectOpts, () => { - this.mrcClient.write(handshake); - this.log.info(mrcConnectOpts, 'Connected to MRC server'); - mrcCentralConnection = this.mrcClient; - }); + this.mrcClient = net.createConnection(this.mrcConnectOpts, self._connectionHandler.bind(self)); + + this.mrcClient.requestedDisconnect = false; // do things when we get data from MRC central - this.mrcClient.on('data', (data) => { - // split on \n to deal with getting messages in batches - data.toString().split('\n').forEach( item => { - if (item == '') return; + var buffer = new Buffer.from(''); - this.log.debug( { data : item } , 'Received data'); - let message = this.parseMessage(item); - this.log.debug(message, 'Parsed data'); - if (message) { - this.receiveFromMRC(this.mrcClient, message); + function handleData(chunk) { + if (typeof (chunk) === 'string') { + buffer += chunk; + } else { + buffer = Buffer.concat([buffer, chunk]); + } + + var lines = buffer.toString().split(lineDelimiter); + + if (lines.pop()) { + // if buffer is not ended with \r\n, there's more chunks. + return; + } else { + // else, initialize the buffer. + buffer = new Buffer.from(''); + } + + lines.forEach(function iterator(line) { + if (line.length) { + let message = self.parseMessage(line); + if (message) { + self.receiveFromMRC(message); + } } }); + } + + this.mrcClient.on('data', (data) => { + handleData(data); }); this.mrcClient.on('end', () => { - this.log.info(mrcConnectOpts, 'Disconnected from MRC server'); + this.log.info(this.mrcConnectOpts, 'Disconnected from MRC server'); + }); + + this.mrcClient.on('close', () => { + + if (this.mrcClient && this.mrcClient.requestedDisconnect) + return; + this.log.info(this.mrcConnectOpts, 'Disconnected from MRC server, reconnecting'); + + this.log.debug('Waiting ' + this.mrcConnectOpts.retryDelay + 'ms before retrying'); + + + setTimeout(function() { + self.connectToMrc(); + }, this.mrcConnectOpts.retryDelay); }); this.mrcClient.on('error', err => { this.log.info( { error : err.message }, 'MRC server error'); }); + } + + createLocalListener() { + const self = this; // start a local server for clients to connect to this.server = net.createServer( function(socket) { @@ -90,7 +159,7 @@ exports.getModule = class MrcModule extends ServerModule { socket.on('data', data => { // split on \n to deal with getting messages in batches - data.toString().split('\n').forEach( item => { + data.toString().split(lineDelimiter).forEach( item => { if (item == '') return; // save username with socket @@ -114,24 +183,6 @@ exports.getModule = class MrcModule extends ServerModule { } }); }); - - return cb(null); - } - - listen(cb) { - if (!this.enabled) { - return cb(null); - } - - const config = Config(); - - const port = parseInt(config.chatServers.mrc.multiplexerPort); - if(isNaN(port)) { - this.log.warn( { port : config.chatServers.mrc.multiplexerPort, server : ModuleInfo.name }, 'Invalid port' ); - return cb(Errors.Invalid(`Invalid port: ${config.chatServers.mrc.multiplexerPort}`)); - } - Log.info( { server : ModuleInfo.name, port : config.chatServers.mrc.multiplexerPort }, 'MRC multiplexer local listener starting up'); - return this.server.listen(port, cb); } get enabled() { @@ -158,21 +209,21 @@ exports.getModule = class MrcModule extends ServerModule { } /** - * Processes messages received // split raw data received into an object we can work withfrom the central MRC server + * Processes messages received from the central MRC server */ - receiveFromMRC(socket, message) { + receiveFromMRC(message) { const config = Config(); const siteName = slugify(config.general.boardName); if (message.from_user == 'SERVER' && message.body == 'HELLO') { // reply with extra bbs info - this.sendToMrcServer(socket, 'CLIENT', '', 'SERVER', 'ALL', '', `INFOSYS:${StatLog.getSystemStat(SysProps.SysOpUsername)}`); + this.sendToMrcServer('CLIENT', '', 'SERVER', 'ALL', '', `INFOSYS:${StatLog.getSystemStat(SysProps.SysOpUsername)}`); } else if (message.from_user == 'SERVER' && message.body.toUpperCase() == 'PING') { // reply to heartbeat // this.log.debug('Respond to heartbeat'); - this.sendToMrcServer(socket, 'CLIENT', '', 'SERVER', 'ALL', '', `IMALIVE:${siteName}`); + this.sendToMrcServer('CLIENT', '', 'SERVER', 'ALL', '', `IMALIVE:${siteName}`); } else { // if not a heartbeat, and we have clients then we need to send something to them @@ -210,13 +261,13 @@ exports.getModule = class MrcModule extends ServerModule { Log.debug({ server : 'MRC', user : username, message : message }, 'Dodgy message received from client'); } - this.sendToMrcServer(mrcCentralConnection, message.from_user, message.from_room, message.to_user, message.to_site, message.to_room, message.body); + this.sendToMrcServer(message.from_user, message.from_room, message.to_user, message.to_site, message.to_room, message.body); } /** * Converts a message back into the MRC format and sends it to the central MRC server */ - sendToMrcServer(socket, fromUser, fromRoom, toUser, toSite, toRoom, messageBody) { + sendToMrcServer(fromUser, fromRoom, toUser, toSite, toRoom, messageBody) { const config = Config(); const siteName = slugify(config.general.boardName); @@ -231,7 +282,12 @@ exports.getModule = class MrcModule extends ServerModule { ].join('~') + '~'; Log.debug({ server : 'MRC', data : line }, 'Sending data'); - return socket.write(line + '\n'); + this.sendRaw(line); + } + + sendRaw(message) { + // optionally log messages here + this.mrcClient.write(message + '\r\n'); } };