diff --git a/core/database.js b/core/database.js index 3baf682a..5029d7e1 100644 --- a/core/database.js +++ b/core/database.js @@ -169,17 +169,19 @@ function createMessageBaseTables() { 'END;' ); + // :TODO: need SQL to ensure cleaned up if delete from message? dbs.message.run( 'CREATE TABLE IF NOT EXISTS message_meta (' + ' message_id INTEGER NOT NULL,' + ' meta_category INTEGER NOT NULL,' + ' meta_name VARCHAR NOT NULL,' + ' meta_value VARCHAR NOT NULL,' + - ' UNIQUE(message_id, meta_category, meta_name, meta_value),' + // why unique here? + ' UNIQUE(message_id, meta_category, meta_name, meta_value),' + // :TODO:why unique here? ' FOREIGN KEY(message_id) REFERENCES message(message_id)' + ');' ); + // :TODO: need SQL to ensure cleaned up if delete from message? dbs.message.run( 'CREATE TABLE IF NOT EXISTS hash_tag (' + ' hash_tag_id INTEGER PRIMARY KEY,' + @@ -188,6 +190,7 @@ function createMessageBaseTables() { ');' ); + // :TODO: need SQL to ensure cleaned up if delete from message? dbs.message.run( 'CREATE TABLE IF NOT EXISTS message_hash_tag (' + ' hash_tag_id INTEGER NOT NULL,' + diff --git a/core/door.js b/core/door.js index 2cca1151..2862be4b 100644 --- a/core/door.js +++ b/core/door.js @@ -1,24 +1,24 @@ /* jslint node: true */ 'use strict'; -var spawn = require('child_process').spawn; -var events = require('events'); +const events = require('events'); -var _ = require('lodash'); -var pty = require('ptyw.js'); -var decode = require('iconv-lite').decode; -var net = require('net'); -var async = require('async'); +const _ = require('lodash'); +const pty = require('ptyw.js'); +const decode = require('iconv-lite').decode; +const createServer = require('net').createServer; exports.Door = Door; function Door(client, exeInfo) { events.EventEmitter.call(this); - this.client = client; - this.exeInfo = exeInfo; - + const self = this; + this.client = client; + this.exeInfo = exeInfo; this.exeInfo.encoding = this.exeInfo.encoding || 'cp437'; + this.exeInfo.encoding = this.exeInfo.encoding.toLowerCase(); + let restored = false; // // Members of exeInfo: @@ -31,25 +31,17 @@ function Door(client, exeInfo) { // dropFile // node // inhSocket - // -} + // -require('util').inherits(Door, events.EventEmitter); - - - -Door.prototype.run = function() { - - var self = this; - - var doorData = function(data) { - // :TODO: skip decoding if we have a match, e.g. cp437 === cp437 - self.client.term.write(decode(data, self.exeInfo.encoding)); + this.doorDataHandler = function(data) { + if(self.client.term.outputEncoding === self.exeInfo.encoding) { + self.client.term.rawWrite(data); + } else { + self.client.term.write(decode(data, self.exeInfo.encoding)); + } }; - var restored = false; - - var restore = function(piped) { + this.restoreIo = function(piped) { if(!restored && self.client.term.output) { self.client.term.output.unpipe(piped); self.client.term.output.resume(); @@ -57,100 +49,98 @@ Door.prototype.run = function() { } }; - var sockServer; + this.prepareSocketIoServer = function(cb) { + if('socket' === self.exeInfo.io) { + const sockServer = createServer(conn => { - async.series( - [ - function prepareServer(callback) { - if('socket' === self.exeInfo.io) { - sockServer = net.createServer(function connected(conn) { + sockServer.getConnections( (err, count) => { - sockServer.getConnections(function connCount(err, count) { + // We expect only one connection from our DOOR/emulator/etc. + if(!err && count <= 1) { + self.client.term.output.pipe(conn); + + conn.on('data', self.doorDataHandler); - // We expect only one connection from our DOOR/emulator/etc. - if(!err && count <= 1) { - self.client.term.output.pipe(conn); - - conn.on('data', doorData); - - conn.once('end', function ended() { - restore(conn); - }); - - conn.once('error', function error(err) { - self.client.log.info('Door socket server connection error: ' + err.message); - restore(conn); - }); - } + conn.once('end', () => { + return self.restoreIo(conn); }); - }); - sockServer.listen(0, function listening() { - callback(null); - }); - } else { - callback(null); - } - }, - function launch(callback) { - // Expand arg strings, e.g. {dropFile} -> DOOR32.SYS - var args = _.clone(self.exeInfo.args); // we need a copy so the original is not modified - - for(var i = 0; i < args.length; ++i) { - args[i] = self.exeInfo.args[i].format({ - dropFile : self.exeInfo.dropFile, - node : self.exeInfo.node.toString(), - //inhSocket : self.exeInfo.inhSocket.toString(), - srvPort : sockServer ? sockServer.address().port.toString() : '-1', - userId : self.client.user.userId.toString(), - }); - } - - var door = pty.spawn(self.exeInfo.cmd, args, { - cols : self.client.term.termWidth, - rows : self.client.term.termHeight, - // :TODO: cwd - env : self.exeInfo.env, - }); - - if('stdio' === self.exeInfo.io) { - self.client.log.debug('Using stdio for door I/O'); - - self.client.term.output.pipe(door); - - door.on('data', doorData); - - door.once('close', function closed() { - restore(door); - }); - } else if('socket' === self.exeInfo.io) { - self.client.log.debug( - { port : sockServer.address().port }, - 'Using temporary socket server for door I/O'); - } - - door.once('exit', function exited(code) { - self.client.log.info( { code : code }, 'Door exited'); - - if(sockServer) { - sockServer.close(); + conn.once('error', err => { + self.client.log.info( { error : err.toString() }, 'Door socket server connection'); + return self.restoreIo(conn); + }); } - - // we may not get a close - if('stdio' === self.exeInfo.io) { - restore(door); - } - - door.removeAllListeners(); - - self.emit('finished'); }); - } - ], - function complete(err) { - if(err) { - self.client.log.warn( { error : err.toString() }, 'Failed executing door'); - } + }); + + sockServer.listen(0, () => { + return cb(null, sockServer); + }); + } else { + return cb(null); } - ); -}; \ No newline at end of file + }; +} + +require('util').inherits(Door, events.EventEmitter); + +Door.prototype.run = function() { + const self = this; + + this.prepareSocketIoServer( (err, sockServer) => { + if(err) { + this.client.log.warn( { error : err.toString() }, 'Failed executing door'); + return self.emit('finished'); + } + + // Expand arg strings, e.g. {dropFile} -> DOOR32.SYS + let args = _.clone(self.exeInfo.args); // we need a copy so the original is not modified + + for(let i = 0; i < args.length; ++i) { + args[i] = self.exeInfo.args[i].format({ + dropFile : self.exeInfo.dropFile, + node : self.exeInfo.node.toString(), + srvPort : sockServer ? sockServer.address().port.toString() : '-1', + userId : self.client.user.userId.toString(), + }); + } + + const door = pty.spawn(self.exeInfo.cmd, args, { + cols : self.client.term.termWidth, + rows : self.client.term.termHeight, + // :TODO: cwd + env : self.exeInfo.env, + }); + + if('stdio' === self.exeInfo.io) { + self.client.log.debug('Using stdio for door I/O'); + + self.client.term.output.pipe(door); + + door.on('data', self.doorDataHandler); + + door.once('close', () => { + return self.restoreIo(door); + }); + } else if('socket' === self.exeInfo.io) { + self.client.log.debug( { port : sockServer.address().port }, 'Using temporary socket server for door I/O'); + } + + door.once('exit', exitCode => { + self.client.log.info( { exitCode : exitCode }, 'Door exited'); + + if(sockServer) { + sockServer.close(); + } + + // we may not get a close + if('stdio' === self.exeInfo.io) { + return self.restoreIo(door); + } + + door.removeAllListeners(); + + self.emit('finished'); + }); + }); +}; diff --git a/core/message_area.js b/core/message_area.js index 67bd2821..323061dc 100644 --- a/core/message_area.js +++ b/core/message_area.js @@ -1,16 +1,17 @@ /* jslint node: true */ 'use strict'; -let msgDb = require('./database.js').dbs.message; -let Config = require('./config.js').config; -let Message = require('./message.js'); -let Log = require('./logger.js').log; -let checkAcs = require('./acs_util.js').checkAcs; -let msgNetRecord = require('./msg_network.js').recordMessage; +const msgDb = require('./database.js').dbs.message; +const Config = require('./config.js').config; +const Message = require('./message.js'); +const Log = require('./logger.js').log; +const checkAcs = require('./acs_util.js').checkAcs; +const msgNetRecord = require('./msg_network.js').recordMessage; -let async = require('async'); -let _ = require('lodash'); -let assert = require('assert'); +const async = require('async'); +const _ = require('lodash'); +const assert = require('assert'); +const moment = require('moment'); exports.getAvailableMessageConferences = getAvailableMessageConferences; exports.getSortedAvailMessageConferences = getSortedAvailMessageConferences; @@ -427,8 +428,8 @@ function updateMessageAreaLastReadId(userId, areaTag, messageId, cb) { 'VALUES (?, ?, ?);', [ userId, areaTag, messageId ], function written(err) { - callback(err, true); // true=didUpdate - } + callback(err, true); // true=didUpdate + } ); } else { callback(null); @@ -441,11 +442,11 @@ function updateMessageAreaLastReadId(userId, areaTag, messageId, cb) { { error : err.toString(), userId : userId, areaTag : areaTag, messageId : messageId }, 'Failed updating area last read ID'); } else { - if(true === didUpdate) { - Log.trace( - { userId : userId, areaTag : areaTag, messageId : messageId }, - 'Area last read ID updated'); - } + if(true === didUpdate) { + Log.trace( + { userId : userId, areaTag : areaTag, messageId : messageId }, + 'Area last read ID updated'); + } } cb(err); } @@ -466,67 +467,60 @@ function persistMessage(message, cb) { ); } -function trimMessagesToMax(areaTag, maxMessages, archivePath, cb) { - async.waterfall( - [ - function getRemoteCount(callback) { - let removeCount = 0; - msgDb.get( - `SELECT COUNT(area_tag) AS msgCount - FROM message - WHERE area_tag = ?`, - [ areaTag ], - (err, row) => { - if(!err) { - if(row.msgCount >= maxMessages) { - removeCount = row.msgCount - maxMessages; - } - } - return callback(err, removeCount); - } - ); - }, - function trimMessages(removeCount, callback) { - if(0 === removeCount) { - return callback(null); - } - - if(archivePath) { - - } else { - // just delete 'em - } - } - ], - err => { - return cb(err); - } - ); -} - // method exposed for event scheduler function trimMessageAreasScheduledEvent(args, cb) { - // - // Available args: - // - archive:/path/to/archive/dir/ - // - let archivePath; - if(args) { - args.forEach(a => { - if(a.startsWith('archive:')) { - archivePath = a.split(':')[1]; + + function trimMessageAreaByMaxMessages(areaInfo, cb) { + if(0 === areaInfo.maxMessages) { + return cb(null); + } + + msgDb.run( + `DELETE FROM message + WHERE message_id IN + (SELECT message_id + FROM message + WHERE area_tag = ? + ORDER BY message_id + LIMIT (MAX(0, (SELECT COUNT() + FROM message + WHERE area_tag = ?) - ${areaInfo.maxMessages} + )) + );`, + [ areaInfo.areaTag, areaInfo.areaTag], + err => { + if(err) { + Log.warn( { areaInfo : areaInfo, error : err.toString(), type : 'maxMessages' }, 'Error trimming message area'); + } else { + Log.debug( { areaInfo : areaInfo, type : 'maxMessages' }, 'Area trimmed successfully'); + } + return cb(err); + } + ); + } + + function trimMessageAreaByMaxAgeDays(areaInfo, cb) { + if(0 === areaInfo.maxAgeDays) { + return cb(null); + } + + msgDb.run( + `DELETE FROM message + WHERE area_tag = ? AND modified_timestamp < date('now', '-${areaInfo.maxAgeDays} days');`, + [ areaInfo.areaTag ], + err => { + if(err) { + Log.warn( { areaInfo : areaInfo, error : err.toString(), type : 'maxAgeDays' }, 'Error trimming message area'); + } else { + Log.debug( { areaInfo : areaInfo, type : 'maxAgeDays' }, 'Area trimmed successfully'); + } + return cb(err); } - }); + ); } - // - // Find all area_tag's in message. We don't rely on user configurations - // in case one is no longer available. From there we can trim messages - // that meet the criteria (too old, too many, ...) and optionally archive - // them via moving them to a new DB with the same layout - // async.waterfall( - [ + [ function getAreaTags(callback) { let areaTags = []; msgDb.each( @@ -543,13 +537,16 @@ function trimMessageAreasScheduledEvent(args, cb) { } ); }, - function trimAreas(areaTags, callback) { + function prepareAreaInfo(areaTags, callback) { + let areaInfos = []; + + // determine maxMessages & maxAgeDays per area areaTags.forEach(areaTag => { let maxMessages = Config.messageAreaDefaults.maxMessages; let maxAgeDays = Config.messageAreaDefaults.maxAgeDays; - const area = getMessageAreaByTag(areaTag); // note: we don't know the conf + const area = getMessageAreaByTag(areaTag); // note: we don't know the conf here if(area) { if(area.maxMessages) { maxMessages = area.maxMessages; @@ -558,18 +555,37 @@ function trimMessageAreasScheduledEvent(args, cb) { maxAgeDays = area.maxAgeDays; } } - - if(maxMessages) { - trimMessagesToMax(areaTag, maxMessages, archivePath, err => { - }); - } - + areaInfos.push( { + areaTag : areaTag, + maxMessages : maxMessages, + maxAgeDays : maxAgeDays, + } ); }); - } - ] + + return callback(null, areaInfos); + }, + function trimAreas(areaInfos, callback) { + async.each( + areaInfos, + (areaInfo, next) => { + trimMessageAreaByMaxMessages(areaInfo, err => { + if(err) { + return next(err); + } + + trimMessageAreaByMaxAgeDays(areaInfo, err => { + return next(err); + }); + }); + }, + callback + ); + } + ], + err => { + return cb(err); + } ); - console.log('trimming messages from scheduled event') // :TODO: remove me!!! - } \ No newline at end of file