From 76bbc43600f1d73837c1ece274d75beed70f4c7f Mon Sep 17 00:00:00 2001 From: Bryan Ashby Date: Sun, 28 Feb 2016 22:04:03 -0700 Subject: [PATCH] * Start work on FTN/BSO schedule via later.js * Utilize last scan message ID to scan areas * Lots of changes to FTN packet creation * Create packets with target max size * Create ArcMail bundles when configured to do so --- core/archive_util.js | 13 +- core/config.js | 12 +- core/database.js | 10 + core/ftn_mail_packet.js | 155 ++++++++++++ core/ftn_util.js | 6 +- core/scanner_tossers/ftn_bso.js | 423 ++++++++++++++++++++++++++++++-- package.json | 3 +- 7 files changed, 595 insertions(+), 27 deletions(-) diff --git a/core/archive_util.js b/core/archive_util.js index ba7d3c4e..cbae6d13 100644 --- a/core/archive_util.js +++ b/core/archive_util.js @@ -50,6 +50,15 @@ module.exports = class ArchiveUtil { }); } } + + haveArchiver(archType) { + if(!archType) { + return false; + } + + archType = archType.toLowerCase(); + return archType in this.archivers; + } detectType(path, cb) { fs.open(path, 'r', (err, fd) => { @@ -83,7 +92,9 @@ module.exports = class ArchiveUtil { } compressTo(archType, archivePath, files, cb) { + archType = archType.toLowerCase(); const archiver = this.archivers[archType]; + if(!archiver) { cb(new Error('Unknown archive type: ' + archType)); return; @@ -104,7 +115,7 @@ module.exports = class ArchiveUtil { }); comp.on('exit', exitCode => { - cb(0 === exitCode ? null : new Error('Compression failed with exit code: ' + exitCode)); + cb(exitCode ? new Error('Compression failed with exit code: ' + exitCode) : null); }); } diff --git a/core/config.js b/core/config.js index 76521f97..47b0003d 100644 --- a/core/config.js +++ b/core/config.js @@ -211,7 +211,7 @@ function getDefaultConfig() { archivers : { zip : { - name : "PKZip", + name : "PKZip", // :TODO: Use key for this sig : "504b0304", offset : 0, compressCmd : "7z", @@ -246,10 +246,16 @@ function getDefaultConfig() { outbound : paths.join(__dirname, './../mail/ftn_out/'), inbound : paths.join(__dirname, './../mail/ftn_in/'), secInbound : paths.join(__dirname, './../mail/ftn_secin/'), + temp : paths.join(__dirname, './../mail/ftn_temp'), }, - maxPacketByteSize : 512000, // 512k, before placing messages in a new pkt - maxBundleByteSize : 2048000, // 2M, before creating another archive + // + // Packet and (ArcMail) bundle target sizes are just that: targets. + // Actual sizes may be slightly larger when we must place a full + // PKT contents *somewhere* + // + packetTargetByteSize : 512000, // 512k, before placing messages in a new pkt + bundleTargetByteSize : 2048000, // 2M, before creating another archive } }, diff --git a/core/database.js b/core/database.js index cb1a97f4..3e3b6faf 100644 --- a/core/database.js +++ b/core/database.js @@ -204,6 +204,7 @@ function createMessageBaseTables() { ');' ); + // :TODO: Not currently used dbs.message.run( 'CREATE TABLE IF NOT EXISTS user_message_status (' + ' user_id INTEGER NOT NULL,' + @@ -213,6 +214,15 @@ function createMessageBaseTables() { ' FOREIGN KEY(user_id) REFERENCES user(id)' + ');' ); + + dbs.message.run( + `CREATE TABLE IF NOT EXISTS message_area_last_scan ( + scan_toss VARCHAR NOT NULL, + area_tag VARCHAR NOT NULL, + message_id INTEGER NOT NULL, + UNIQUE(scan_toss, area_tag) + );` + ); } function createInitialMessageValues() { diff --git a/core/ftn_mail_packet.js b/core/ftn_mail_packet.js index 891ceaca..ecc0f1c8 100644 --- a/core/ftn_mail_packet.js +++ b/core/ftn_mail_packet.js @@ -280,6 +280,44 @@ function Packet() { cb(null, ph); }); }; + + this.getPacketHeaderBuffer = function(packetHeader) { + let buffer = new Buffer(FTN_PACKET_HEADER_SIZE); + + buffer.writeUInt16LE(packetHeader.origNode, 0); + buffer.writeUInt16LE(packetHeader.destNode, 2); + buffer.writeUInt16LE(packetHeader.year, 4); + buffer.writeUInt16LE(packetHeader.month, 6); + buffer.writeUInt16LE(packetHeader.day, 8); + buffer.writeUInt16LE(packetHeader.hour, 10); + buffer.writeUInt16LE(packetHeader.minute, 12); + buffer.writeUInt16LE(packetHeader.second, 14); + + buffer.writeUInt16LE(packetHeader.baud, 16); + buffer.writeUInt16LE(FTN_PACKET_HEADER_TYPE, 18); + buffer.writeUInt16LE(packetHeader.origNet, 20); + buffer.writeUInt16LE(packetHeader.destNet, 22); + buffer.writeUInt8(packetHeader.prodCodeLo, 24); + buffer.writeUInt8(packetHeader.prodRevHi, 25); + + const pass = ftn.stringToNullPaddedBuffer(packetHeader.password, 8); + pass.copy(buffer, 26); + + buffer.writeUInt16LE(packetHeader.origZone, 34); + buffer.writeUInt16LE(packetHeader.destZone, 36); + buffer.writeUInt16LE(packetHeader.auxNet, 38); + buffer.writeUInt16LE(packetHeader.capWordValidate, 40); + buffer.writeUInt8(packetHeader.prodCodeHi, 42); + buffer.writeUInt8(packetHeader.prodRevLo, 43); + buffer.writeUInt16LE(packetHeader.capWord, 44); + buffer.writeUInt16LE(packetHeader.origZone2, 46); + buffer.writeUInt16LE(packetHeader.destZone2, 48); + buffer.writeUInt16LE(packetHeader.origPoint, 50); + buffer.writeUInt16LE(packetHeader.destPoint, 52); + buffer.writeUInt32LE(packetHeader.prodData, 54); + + return buffer; + } this.writePacketHeader = function(packetHeader, ws) { let buffer = new Buffer(FTN_PACKET_HEADER_SIZE); @@ -317,6 +355,8 @@ function Packet() { buffer.writeUInt32LE(packetHeader.prodData, 54); ws.write(buffer); + + return buffer.length; }; this.processMessageBody = function(messageBodyBuffer, cb) { @@ -577,6 +617,103 @@ function Packet() { }); }); }; + + this.getMessageEntryBuffer = function(message, options) { + let basicHeader = new Buffer(34); + + basicHeader.writeUInt16LE(FTN_PACKET_MESSAGE_TYPE, 0); + basicHeader.writeUInt16LE(message.meta.FtnProperty.ftn_orig_node, 2); + basicHeader.writeUInt16LE(message.meta.FtnProperty.ftn_dest_node, 4); + basicHeader.writeUInt16LE(message.meta.FtnProperty.ftn_orig_network, 6); + basicHeader.writeUInt16LE(message.meta.FtnProperty.ftn_dest_network, 8); + basicHeader.writeUInt16LE(message.meta.FtnProperty.ftn_attr_flags, 10); + basicHeader.writeUInt16LE(message.meta.FtnProperty.ftn_cost, 12); + + const dateTimeBuffer = new Buffer(ftn.getDateTimeString(message.modTimestamp) + '\0'); + dateTimeBuffer.copy(basicHeader, 14); + + // toUserName & fromUserName: up to 36 bytes in length, NULL term'd + // :TODO: DRY... + let toUserNameBuf = iconv.encode(message.toUserName + '\0', 'CP437').slice(0, 36); + toUserNameBuf[toUserNameBuf.length - 1] = '\0'; // ensure it's null term'd + + let fromUserNameBuf = iconv.encode(message.fromUserName + '\0', 'CP437').slice(0, 36); + fromUserNameBuf[fromUserNameBuf.length - 1] = '\0'; // ensure it's null term'd + + // subject: up to 72 bytes in length, NULL term'd + let subjectBuf = iconv.encode(message.subject + '\0', 'CP437').slice(0, 72); + subjectBuf[subjectBuf.length - 1] = '\0'; // ensure it's null term'd + + // + // message: unbound length, NULL term'd + // + // We need to build in various special lines - kludges, area, + // seen-by, etc. + // + // :TODO: Put this in it's own method + let msgBody = ''; + + function appendMeta(k, m) { + if(m) { + let a = m; + if(!_.isArray(a)) { + a = [ a ]; + } + a.forEach(v => { + msgBody += `${k}: ${v}\r`; + }); + } + } + + // + // FTN-0004.001 @ http://ftsc.org/docs/fts-0004.001 + // AREA:CONFERENCE + // Should be first line in a message + // + if(message.meta.FtnProperty.ftn_area) { + msgBody += `AREA:${message.meta.FtnProperty.ftn_area}\r`; // note: no ^A (0x01) + } + + Object.keys(message.meta.FtnKludge).forEach(k => { + // we want PATH to be last + if('PATH' !== k) { + appendMeta(`\x01${k}`, message.meta.FtnKludge[k]); + } + }); + + msgBody += message.message + '\r'; + + // + // FTN-0004.001 @ http://ftsc.org/docs/fts-0004.001 + // Tear line should be near the bottom of a message + // + if(message.meta.FtnProperty.ftn_tear_line) { + msgBody += `${message.meta.FtnProperty.ftn_tear_line}\r`; + } + + // + // Origin line should be near the bottom of a message + // + if(message.meta.FtnProperty.ftn_origin) { + msgBody += `${message.meta.FtnProperty.ftn_origin}\r`; + } + + // + // FTN-0004.001 @ http://ftsc.org/docs/fts-0004.001 + // SEEN-BY and PATH should be the last lines of a message + // + appendMeta('SEEN-BY', message.meta.FtnProperty.ftn_seen_by); // note: no ^A (0x01) + + appendMeta('\x01PATH', message.meta.FtnKludge['PATH']); + + return Buffer.concat( [ + basicHeader, + toUserNameBuf, + fromUserNameBuf, + subjectBuf, + iconv.encode(msgBody + '\0', options.encoding) + ]); + }; this.writeMessage = function(message, ws, options) { let basicHeader = new Buffer(34); @@ -750,6 +887,24 @@ Packet.prototype.read = function(pathOrBuffer, iterator, cb) { ); }; +Packet.prototype.writeHeader = function(ws, packetHeader) { + return this.writePacketHeader(packetHeader, ws); +}; + +Packet.prototype.writeMessage = function(ws, message, options) { + +} + +Packet.prototype.writeMessageEntry = function(ws, msgEntry) { + ws.write(msgEntry); + return msgEntry.length; +}; + +Packet.prototype.writeTerminator = function(ws) { + ws.write(new Buffer( [ 0 ] )); // final extra null term + return 1; +}; + Packet.prototype.writeStream = function(ws, messages, options) { if(!_.isBoolean(options.terminatePacket)) { options.terminatePacket = true; diff --git a/core/ftn_util.js b/core/ftn_util.js index 5172b218..3a347b2b 100644 --- a/core/ftn_util.js +++ b/core/ftn_util.js @@ -138,9 +138,9 @@ function createMessageUuid(ftnMsgId, ftnArea) { return uuid.unparse(u); // to string } -function getMessageSerialNumber(message) { +function getMessageSerialNumber(messageId) { const msSinceEnigmaEpoc = (Date.now() - Date.UTC(2016, 1, 1)); - const hash = Math.abs(new FNV1a(msSinceEnigmaEpoc + message.messageId).value).toString(16); + const hash = Math.abs(new FNV1a(msSinceEnigmaEpoc + messageId).value).toString(16); return `00000000${hash}`.substr(-8); // return ('00000000' + ((Math.floor((Date.now() - Date.UTC(2016, 1, 1)) / 1000) + // message.messageId)).toString(16)).substr(-8); @@ -183,7 +183,7 @@ function getMessageSerialNumber(message) { // function getMessageIdentifier(message, address) { const addrStr = new Address(address).toString('5D'); - return `${message.messageId}.${message.areaTag.toLowerCase()}@${addrStr} ${getMessageSerialNumber(message)}`; + return `${message.messageId}.${message.areaTag.toLowerCase()}@${addrStr} ${getMessageSerialNumber(message.messageId)}`; } // diff --git a/core/scanner_tossers/ftn_bso.js b/core/scanner_tossers/ftn_bso.js index cd71358d..6f45fdb8 100644 --- a/core/scanner_tossers/ftn_bso.js +++ b/core/scanner_tossers/ftn_bso.js @@ -9,6 +9,8 @@ let ftnUtil = require('../ftn_util.js'); let Address = require('../ftn_address.js'); let Log = require('../logger.js').log; let ArchiveUtil = require('../archive_util.js'); +let msgDb = require('../database.js').dbs.message; +let Message = require('../message.js'); let moment = require('moment'); let _ = require('lodash'); @@ -16,10 +18,11 @@ let paths = require('path'); let mkdirp = require('mkdirp'); let async = require('async'); let fs = require('fs'); +let later = require('later'); exports.moduleInfo = { - name : 'FTN', - desc : 'FidoNet Style Message Scanner/Tosser', + name : 'FTN BSO', + desc : 'BSO style message scanner/tosser for FTN networks', author : 'NuSkooler', }; @@ -35,14 +38,21 @@ exports.moduleInfo = { exports.getModule = FTNMessageScanTossModule; +const SCHEDULE_REGEXP = /(?:^|or )?(@watch\:|@immediate)([^\0]+)?$/; + function FTNMessageScanTossModule() { MessageScanTossModule.call(this); + + let self = this; this.archUtil = new ArchiveUtil(); this.archUtil.init(); + if(_.has(Config, 'scannerTossers.ftn_bso')) { this.moduleConfig = Config.scannerTossers.ftn_bso; + + } this.isDefaultDomainZone = function(networkName, address) { @@ -58,7 +68,7 @@ function FTNMessageScanTossModule() { return dir; }; - this.getOutgoingPacketFileName = function(basePath, message, isTemp) { + this.getOutgoingPacketFileName = function(basePath, messageId, isTemp) { // // Generating an outgoing packet file name comes with a few issues: // * We must use DOS 8.3 filenames due to legacy systems that receive @@ -76,7 +86,7 @@ function FTNMessageScanTossModule() { // * We already have a system for 8-character serial number gernation that is // used for e.g. in FTS-0009.001 MSGIDs... let's use that! // - const name = ftnUtil.getMessageSerialNumber(message); + const name = ftnUtil.getMessageSerialNumber(messageId); const ext = (true === isTemp) ? 'pk_' : 'pkt'; return paths.join(basePath, `${name}.${ext}`); }; @@ -133,8 +143,8 @@ function FTNMessageScanTossModule() { } }); }; - - this.createMessagePacket = function(message, options) { + + this.exportMessage = function(message, options, cb) { this.prepareMessage(message, options); let packet = new ftnMailPacket.Packet(); @@ -153,18 +163,17 @@ function FTNMessageScanTossModule() { mkdirp(outgoingDir, err => { if(err) { - // :TODO: Handle me!! - } else { - this.getOutgoingBundleFileName(outgoingDir, options.network.localAddress, options.destAddress, (err, path) => { + return cb(err); + } + this.getOutgoingBundleFileName(outgoingDir, options.network.localAddress, options.destAddress, (err, path) => { console.log(path); - }); - packet.write( - this.getOutgoingPacketFileName(outgoingDir, message), - packetHeader, - [ message ], - { encoding : options.encoding } - ); - } + }); + packet.write( + this.getOutgoingPacketFileName(outgoingDir, message), + packetHeader, + [ message ], + { encoding : options.encoding } + ); }); } @@ -261,7 +270,7 @@ function FTNMessageScanTossModule() { // :TODO: change to something like isAreaConfigValid // check paths, Addresses, etc. - this.isAreaConfigComplete = function(areaConfig) { + this.isAreaConfigValid = function(areaConfig) { if(!_.isString(areaConfig.tag) || !_.isString(areaConfig.network)) { return false; } @@ -272,7 +281,291 @@ function FTNMessageScanTossModule() { return (_.isArray(areaConfig.uplinks)); }; + + + this.hasValidConfiguration = function() { + if(!_.has(this, 'moduleConfig.nodes') || !_.has(Config, 'messageNetworks.ftn.areas')) { + return false; + } + + return true; + }; + + this.parseScheduleString = function(schedStr) { + let schedule = {}; + + const m = SCHEDULE_REGEXP.exec(schedStr); + if(m) { + schedStr = schedStr.substr(0, m.index).trim(); + + if('@watch:' === m[1]) { + schedule.watchFile = m[2]; + } else if('@immediate' === m[1]) { + schedule.immediate = true; + } + } + if(schedStr.length > 0) { + const sched = later.parse.text(schedStr); + if(-1 === sched.error) { + schedule.sched = sched; + } + } + + // return undefined if we couldn't parse out anything useful + if(!_.isEmpty(schedule)) { + return schedule; + } + }; + + this.performImport = function() { + }; + + this.getAreaLastScanId = function(areaTag, cb) { + const sql = + `SELECT area_tag, message_id + FROM message_area_last_scan + WHERE scan_toss = "ftn_bso" AND area_tag = ? + LIMIT 1;`; + + msgDb.get(sql, [ areaTag ], (err, row) => { + cb(err, row ? row.message_id : 0); + }); + }; + + this.getNodeConfigKeyForUplink = function(uplink) { + // :TODO: sort by least # of '*' & take top? + const nodeKey = _.filter(Object.keys(this.moduleConfig.nodes), addr => { + return Address.fromString(addr).isMatch(uplink); + })[0]; + + return nodeKey; + }; + + this.exportMessagesByUuid = function(messageUuids, exportOpts, cb) { + // + // This method has a lot of madness going on: + // - Try to stuff messages into packets until we've hit the target size + // - We need to wait for write streams to finish before proceeding in many cases + // or data will be cut off when closing and creating a new stream + // + let exportedFiles = []; + let currPacketSize = self.moduleConfig.packetTargetByteSize; + let packet; + let ws; + let remainMessageBuf; + let remainMessageId; + + async.each(messageUuids, (msgUuid, nextUuid) => { + let message = new Message(); + + async.series( + [ + function finalizePrevious(callback) { + if(packet && currPacketSize >= self.moduleConfig.packetTargetByteSize) { + packet.writeTerminator(ws); + ws.end(); + ws.once('finish', () => { + callback(null); + }); + } else { + callback(null); + } + }, + function loadMessage(callback) { + message.load( { uuid : msgUuid }, err => { + if(!err) { + self.prepareMessage(message, exportOpts); + } + callback(err); + }); + }, + function createNewPacket(callback) { + if(currPacketSize >= self.moduleConfig.packetTargetByteSize) { + packet = new ftnMailPacket.Packet(); + + const packetHeader = new ftnMailPacket.PacketHeader( + exportOpts.network.localAddress, + exportOpts.destAddress, + exportOpts.nodeConfig.packetType); + + packetHeader.password = exportOpts.nodeConfig.packetPassword || ''; + + // use current message ID for filename seed + const pktFileName = self.getOutgoingPacketFileName(exportOpts.exportDir, message.messageId); + exportedFiles.push(pktFileName); + + ws = fs.createWriteStream(pktFileName); + + currPacketSize = packet.writeHeader(ws, packetHeader); + + if(remainMessageBuf) { + currPacketSize += packet.writeMessageEntry(ws, remainMessageBuf); + remainMessageBuf = null; + } + } + + callback(null); + }, + function appendMessage(callback) { + const msgBuf = packet.getMessageEntryBuffer(message, exportOpts); + currPacketSize += msgBuf.length; + + if(currPacketSize >= self.moduleConfig.packetTargetByteSize) { + remainMessageBuf = msgBuf; // save for next packet + remainMessageId = message.messageId; + } else { + ws.write(msgBuf); + } + callback(null); + } + ], + err => { + nextUuid(err); + } + ); + }, err => { + if(err) { + cb(err); + } else { + async.series( + [ + function terminateLast(callback) { + if(packet) { + packet.writeTerminator(ws); + ws.end(); + ws.once('finish', () => { + callback(null); + }); + } else { + callback(null); + } + }, + function writeRemainPacket(callback) { + if(remainMessageBuf) { + // :TODO: DRY this with the code above -- they are basically identical + packet = new ftnMailPacket.Packet(); + + const packetHeader = new ftnMailPacket.PacketHeader( + exportOpts.network.localAddress, + exportOpts.destAddress, + exportOpts.nodeConfig.packetType); + + packetHeader.password = exportOpts.nodeConfig.packetPassword || ''; + + // use current message ID for filename seed + const pktFileName = self.getOutgoingPacketFileName(exportOpts.exportDir, remainMessageId); + exportedFiles.push(pktFileName); + + ws = fs.createWriteStream(pktFileName); + + packet.writeHeader(ws, packetHeader); + ws.write(remainMessageBuf); + packet.writeTerminator(ws); + ws.end(); + ws.once('finish', () => { + callback(null); + }); + } else { + callback(null); + } + } + ], + err => { + cb(err, exportedFiles); + } + ); + } + }); + }; + + this.exportMessagesToUplinks = function(messageUuids, areaConfig, cb) { + async.each(areaConfig.uplinks, (uplink, nextUplink) => { + const nodeConfigKey = self.getNodeConfigKeyForUplink(uplink); + if(!nodeConfigKey) { + return nextUplink(); + } + + const exportOpts = { + nodeConfig : self.moduleConfig.nodes[nodeConfigKey], + network : Config.messageNetworks.ftn.networks[areaConfig.network], + destAddress : Address.fromString(uplink), + networkName : areaConfig.network, + exportDir : self.moduleConfig.paths.temp, + }; + + if(_.isString(exportOpts.network.localAddress)) { + exportOpts.network.localAddress = Address.fromString(exportOpts.network.localAddress); + } + + const outgoingDir = self.getOutgoingPacketDir(exportOpts.networkName, exportOpts.destAddress); + + async.waterfall( + [ + function createTempDir(callback) { + mkdirp(exportOpts.exportDir, err => { + callback(err); + }); + }, + function createOutgoingDir(callback) { + mkdirp(outgoingDir, err => { + callback(err); + }); + }, + function exportToTempArea(callback) { + self.exportMessagesByUuid(messageUuids, exportOpts, callback); + }, + function createArcMailBundle(exportedFileNames, callback) { + if(self.archUtil.haveArchiver(exportOpts.nodeConfig.archiveType)) { + // :TODO: support bundleTargetByteSize: + // + // Compress to a temp location then we'll move it in the next step + // + // Note that we must use the *final* output dir for getOutgoingBundleFileName() + // as it checks for collisions in bundle names! + // + self.getOutgoingBundleFileName(outgoingDir, exportOpts.network.localAddress, exportOpts.destAddress, (err, bundlePath) => { + if(err) { + return callback(err); + } + + // adjust back to temp path + const tempBundlePath = paths.join(exportOpts.exportDir, paths.basename(bundlePath)); + + self.archUtil.compressTo( + exportOpts.nodeConfig.archiveType, + tempBundlePath, + exportedFileNames, err => { + // :TODO: we need to delete the original input file(s) + fs.rename(tempBundlePath, bundlePath, err => { + callback(err, [ bundlePath ] ); + }); + } + ); + }); + } else { + callback(null, exportedFileNames); + } + }, + function moveFilesToOutgoing(exportedFileNames, callback) { + async.each(exportedFileNames, (oldPath, nextFile) => { + const ext = paths.extname(oldPath); + if('.pk_' === ext) { + const newPath = paths.join(outgoingDir, paths.basename(oldPath, ext) + '.pkt'); + fs.rename(oldPath, newPath, nextFile); + } else { + const newPath = paths.join(outgoingDir, paths.basename(oldPath)); + fs.rename(oldPath, newPath, nextFile); + } + }, callback); + } + ], + err => { + nextUplink(); + } + ); + }, cb); // complete + }; } require('util').inherits(FTNMessageScanTossModule, MessageScanTossModule); @@ -280,16 +573,107 @@ require('util').inherits(FTNMessageScanTossModule, MessageScanTossModule); FTNMessageScanTossModule.prototype.startup = function(cb) { Log.info('FidoNet Scanner/Tosser starting up'); + if(_.isObject(this.moduleConfig.schedule)) { + const exportSchedule = this.parseScheduleString(this.moduleConfig.schedule.export); + if(exportSchedule) { + if(exportSchedule.sched) { + let exporting = false; + this.exportTimer = later.setInterval( () => { + if(!exporting) { + exporting = true; + + Log.info( { module : exports.moduleInfo.name }, 'Performing scheduled message export...'); + + this.performExport(err => { + exporting = false; + }); + } + }, exportSchedule.sched); + } + + if(exportSchedule.watchFile) { + // :TODO: monitor file for changes/existance with gaze + } + } + } + FTNMessageScanTossModule.super_.prototype.startup.call(this, cb); }; FTNMessageScanTossModule.prototype.shutdown = function(cb) { Log.info('FidoNet Scanner/Tosser shutting down'); + + if(this.exportTimer) { + this.exportTimer.clear(); + } FTNMessageScanTossModule.super_.prototype.shutdown.call(this, cb); }; +FTNMessageScanTossModule.prototype.performExport = function(cb) { + // + // We're only concerned with areas related to FTN. For each area, loop though + // and let's find out what messages need exported. + // + if(!this.hasValidConfiguration()) { + return cb(new Error('No valid configurations for export')); + } + + // :TODO: Block exporting (e.g. ignore timer) until export is finished + + const getNewUuidsSql = + `SELECT message_uuid + FROM message + WHERE area_tag = ? AND message_id > ? + ORDER BY message_id;`; + + var self = this; + + async.each(Object.keys(Config.messageNetworks.ftn.areas), (areaTag, nextArea) => { + const areaConfig = Config.messageNetworks.ftn.areas[areaTag]; + if(!this.isAreaConfigValid(areaConfig)) { + return nextArea(); + } + + // + // For each message that is newer than that of the last scan + // we need to export to each configured associated uplink(s) + // + async.waterfall( + [ + function getLastScanId(callback) { + self.getAreaLastScanId(areaTag, callback); + }, + function getNewUuids(lastScanId, callback) { + msgDb.all(getNewUuidsSql, [ areaTag, lastScanId ], (err, rows) => { + if(err) { + callback(err); + } else { + callback(null, rows.map(r => r.message_uuid)); // convert to simple array of UUIDs + } + }); + }, + function exportToConfiguredUplinks(msgUuids, callback) { + self.exportMessagesToUplinks(msgUuids, areaConfig, err => { + // :TODO: Log/handle err + callback(null, msgUuids[msgUuids.length - 1]); + }); + }, + function updateLastScanId(newLastScanId, callback) { + callback(null); + } + ], + function complete(err) { + nextArea(); + } + ); + }, err => { + cb(err); + }); +}; + FTNMessageScanTossModule.prototype.record = function(message) { + /* if(!_.has(this, 'moduleConfig.nodes') || !_.has(Config, [ 'messageNetworks', 'ftn', 'areas', message.areaTag ])) { @@ -297,7 +681,7 @@ FTNMessageScanTossModule.prototype.record = function(message) { } const areaConfig = Config.messageNetworks.ftn.areas[message.areaTag]; - if(!this.isAreaConfigComplete(areaConfig)) { + if(!this.isAreaConfigValid(areaConfig)) { // :TODO: should probably log a warning here return; } @@ -334,4 +718,5 @@ FTNMessageScanTossModule.prototype.record = function(message) { // :TODO: should perhaps record in batches - e.g. start an event, record // to temp location until time is hit or N achieved such that if multiple // messages are being created a .FTN file is not made for each one + */ }; diff --git a/package.json b/package.json index aed5721b..794100a7 100644 --- a/package.json +++ b/package.json @@ -28,7 +28,8 @@ "ptyw.js": "^0.3.7", "sqlite3": "^3.1.1", "ssh2": "^0.4.13", - "string-format": "davidchambers/string-format#mini-language" + "string-format": "davidchambers/string-format#mini-language", + "later" : "1.2.0" }, "engines": { "node": ">=0.12.2"