* Start work on FTN/BSO schedule via later.js
* Utilize last scan message ID to scan areas * Lots of changes to FTN packet creation * Create packets with target max size * Create ArcMail bundles when configured to do so
This commit is contained in:
parent
ae20dc1f7c
commit
76bbc43600
|
@ -50,6 +50,15 @@ module.exports = class ArchiveUtil {
|
|||
});
|
||||
}
|
||||
}
|
||||
|
||||
haveArchiver(archType) {
|
||||
if(!archType) {
|
||||
return false;
|
||||
}
|
||||
|
||||
archType = archType.toLowerCase();
|
||||
return archType in this.archivers;
|
||||
}
|
||||
|
||||
detectType(path, cb) {
|
||||
fs.open(path, 'r', (err, fd) => {
|
||||
|
@ -83,7 +92,9 @@ module.exports = class ArchiveUtil {
|
|||
}
|
||||
|
||||
compressTo(archType, archivePath, files, cb) {
|
||||
archType = archType.toLowerCase();
|
||||
const archiver = this.archivers[archType];
|
||||
|
||||
if(!archiver) {
|
||||
cb(new Error('Unknown archive type: ' + archType));
|
||||
return;
|
||||
|
@ -104,7 +115,7 @@ module.exports = class ArchiveUtil {
|
|||
});
|
||||
|
||||
comp.on('exit', exitCode => {
|
||||
cb(0 === exitCode ? null : new Error('Compression failed with exit code: ' + exitCode));
|
||||
cb(exitCode ? new Error('Compression failed with exit code: ' + exitCode) : null);
|
||||
});
|
||||
}
|
||||
|
||||
|
|
|
@ -211,7 +211,7 @@ function getDefaultConfig() {
|
|||
|
||||
archivers : {
|
||||
zip : {
|
||||
name : "PKZip",
|
||||
name : "PKZip", // :TODO: Use key for this
|
||||
sig : "504b0304",
|
||||
offset : 0,
|
||||
compressCmd : "7z",
|
||||
|
@ -246,10 +246,16 @@ function getDefaultConfig() {
|
|||
outbound : paths.join(__dirname, './../mail/ftn_out/'),
|
||||
inbound : paths.join(__dirname, './../mail/ftn_in/'),
|
||||
secInbound : paths.join(__dirname, './../mail/ftn_secin/'),
|
||||
temp : paths.join(__dirname, './../mail/ftn_temp'),
|
||||
},
|
||||
|
||||
maxPacketByteSize : 512000, // 512k, before placing messages in a new pkt
|
||||
maxBundleByteSize : 2048000, // 2M, before creating another archive
|
||||
//
|
||||
// Packet and (ArcMail) bundle target sizes are just that: targets.
|
||||
// Actual sizes may be slightly larger when we must place a full
|
||||
// PKT contents *somewhere*
|
||||
//
|
||||
packetTargetByteSize : 512000, // 512k, before placing messages in a new pkt
|
||||
bundleTargetByteSize : 2048000, // 2M, before creating another archive
|
||||
}
|
||||
},
|
||||
|
||||
|
|
|
@ -204,6 +204,7 @@ function createMessageBaseTables() {
|
|||
');'
|
||||
);
|
||||
|
||||
// :TODO: Not currently used
|
||||
dbs.message.run(
|
||||
'CREATE TABLE IF NOT EXISTS user_message_status (' +
|
||||
' user_id INTEGER NOT NULL,' +
|
||||
|
@ -213,6 +214,15 @@ function createMessageBaseTables() {
|
|||
' FOREIGN KEY(user_id) REFERENCES user(id)' +
|
||||
');'
|
||||
);
|
||||
|
||||
dbs.message.run(
|
||||
`CREATE TABLE IF NOT EXISTS message_area_last_scan (
|
||||
scan_toss VARCHAR NOT NULL,
|
||||
area_tag VARCHAR NOT NULL,
|
||||
message_id INTEGER NOT NULL,
|
||||
UNIQUE(scan_toss, area_tag)
|
||||
);`
|
||||
);
|
||||
}
|
||||
|
||||
function createInitialMessageValues() {
|
||||
|
|
|
@ -280,6 +280,44 @@ function Packet() {
|
|||
cb(null, ph);
|
||||
});
|
||||
};
|
||||
|
||||
this.getPacketHeaderBuffer = function(packetHeader) {
|
||||
let buffer = new Buffer(FTN_PACKET_HEADER_SIZE);
|
||||
|
||||
buffer.writeUInt16LE(packetHeader.origNode, 0);
|
||||
buffer.writeUInt16LE(packetHeader.destNode, 2);
|
||||
buffer.writeUInt16LE(packetHeader.year, 4);
|
||||
buffer.writeUInt16LE(packetHeader.month, 6);
|
||||
buffer.writeUInt16LE(packetHeader.day, 8);
|
||||
buffer.writeUInt16LE(packetHeader.hour, 10);
|
||||
buffer.writeUInt16LE(packetHeader.minute, 12);
|
||||
buffer.writeUInt16LE(packetHeader.second, 14);
|
||||
|
||||
buffer.writeUInt16LE(packetHeader.baud, 16);
|
||||
buffer.writeUInt16LE(FTN_PACKET_HEADER_TYPE, 18);
|
||||
buffer.writeUInt16LE(packetHeader.origNet, 20);
|
||||
buffer.writeUInt16LE(packetHeader.destNet, 22);
|
||||
buffer.writeUInt8(packetHeader.prodCodeLo, 24);
|
||||
buffer.writeUInt8(packetHeader.prodRevHi, 25);
|
||||
|
||||
const pass = ftn.stringToNullPaddedBuffer(packetHeader.password, 8);
|
||||
pass.copy(buffer, 26);
|
||||
|
||||
buffer.writeUInt16LE(packetHeader.origZone, 34);
|
||||
buffer.writeUInt16LE(packetHeader.destZone, 36);
|
||||
buffer.writeUInt16LE(packetHeader.auxNet, 38);
|
||||
buffer.writeUInt16LE(packetHeader.capWordValidate, 40);
|
||||
buffer.writeUInt8(packetHeader.prodCodeHi, 42);
|
||||
buffer.writeUInt8(packetHeader.prodRevLo, 43);
|
||||
buffer.writeUInt16LE(packetHeader.capWord, 44);
|
||||
buffer.writeUInt16LE(packetHeader.origZone2, 46);
|
||||
buffer.writeUInt16LE(packetHeader.destZone2, 48);
|
||||
buffer.writeUInt16LE(packetHeader.origPoint, 50);
|
||||
buffer.writeUInt16LE(packetHeader.destPoint, 52);
|
||||
buffer.writeUInt32LE(packetHeader.prodData, 54);
|
||||
|
||||
return buffer;
|
||||
}
|
||||
|
||||
this.writePacketHeader = function(packetHeader, ws) {
|
||||
let buffer = new Buffer(FTN_PACKET_HEADER_SIZE);
|
||||
|
@ -317,6 +355,8 @@ function Packet() {
|
|||
buffer.writeUInt32LE(packetHeader.prodData, 54);
|
||||
|
||||
ws.write(buffer);
|
||||
|
||||
return buffer.length;
|
||||
};
|
||||
|
||||
this.processMessageBody = function(messageBodyBuffer, cb) {
|
||||
|
@ -577,6 +617,103 @@ function Packet() {
|
|||
});
|
||||
});
|
||||
};
|
||||
|
||||
this.getMessageEntryBuffer = function(message, options) {
|
||||
let basicHeader = new Buffer(34);
|
||||
|
||||
basicHeader.writeUInt16LE(FTN_PACKET_MESSAGE_TYPE, 0);
|
||||
basicHeader.writeUInt16LE(message.meta.FtnProperty.ftn_orig_node, 2);
|
||||
basicHeader.writeUInt16LE(message.meta.FtnProperty.ftn_dest_node, 4);
|
||||
basicHeader.writeUInt16LE(message.meta.FtnProperty.ftn_orig_network, 6);
|
||||
basicHeader.writeUInt16LE(message.meta.FtnProperty.ftn_dest_network, 8);
|
||||
basicHeader.writeUInt16LE(message.meta.FtnProperty.ftn_attr_flags, 10);
|
||||
basicHeader.writeUInt16LE(message.meta.FtnProperty.ftn_cost, 12);
|
||||
|
||||
const dateTimeBuffer = new Buffer(ftn.getDateTimeString(message.modTimestamp) + '\0');
|
||||
dateTimeBuffer.copy(basicHeader, 14);
|
||||
|
||||
// toUserName & fromUserName: up to 36 bytes in length, NULL term'd
|
||||
// :TODO: DRY...
|
||||
let toUserNameBuf = iconv.encode(message.toUserName + '\0', 'CP437').slice(0, 36);
|
||||
toUserNameBuf[toUserNameBuf.length - 1] = '\0'; // ensure it's null term'd
|
||||
|
||||
let fromUserNameBuf = iconv.encode(message.fromUserName + '\0', 'CP437').slice(0, 36);
|
||||
fromUserNameBuf[fromUserNameBuf.length - 1] = '\0'; // ensure it's null term'd
|
||||
|
||||
// subject: up to 72 bytes in length, NULL term'd
|
||||
let subjectBuf = iconv.encode(message.subject + '\0', 'CP437').slice(0, 72);
|
||||
subjectBuf[subjectBuf.length - 1] = '\0'; // ensure it's null term'd
|
||||
|
||||
//
|
||||
// message: unbound length, NULL term'd
|
||||
//
|
||||
// We need to build in various special lines - kludges, area,
|
||||
// seen-by, etc.
|
||||
//
|
||||
// :TODO: Put this in it's own method
|
||||
let msgBody = '';
|
||||
|
||||
function appendMeta(k, m) {
|
||||
if(m) {
|
||||
let a = m;
|
||||
if(!_.isArray(a)) {
|
||||
a = [ a ];
|
||||
}
|
||||
a.forEach(v => {
|
||||
msgBody += `${k}: ${v}\r`;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
// FTN-0004.001 @ http://ftsc.org/docs/fts-0004.001
|
||||
// AREA:CONFERENCE
|
||||
// Should be first line in a message
|
||||
//
|
||||
if(message.meta.FtnProperty.ftn_area) {
|
||||
msgBody += `AREA:${message.meta.FtnProperty.ftn_area}\r`; // note: no ^A (0x01)
|
||||
}
|
||||
|
||||
Object.keys(message.meta.FtnKludge).forEach(k => {
|
||||
// we want PATH to be last
|
||||
if('PATH' !== k) {
|
||||
appendMeta(`\x01${k}`, message.meta.FtnKludge[k]);
|
||||
}
|
||||
});
|
||||
|
||||
msgBody += message.message + '\r';
|
||||
|
||||
//
|
||||
// FTN-0004.001 @ http://ftsc.org/docs/fts-0004.001
|
||||
// Tear line should be near the bottom of a message
|
||||
//
|
||||
if(message.meta.FtnProperty.ftn_tear_line) {
|
||||
msgBody += `${message.meta.FtnProperty.ftn_tear_line}\r`;
|
||||
}
|
||||
|
||||
//
|
||||
// Origin line should be near the bottom of a message
|
||||
//
|
||||
if(message.meta.FtnProperty.ftn_origin) {
|
||||
msgBody += `${message.meta.FtnProperty.ftn_origin}\r`;
|
||||
}
|
||||
|
||||
//
|
||||
// FTN-0004.001 @ http://ftsc.org/docs/fts-0004.001
|
||||
// SEEN-BY and PATH should be the last lines of a message
|
||||
//
|
||||
appendMeta('SEEN-BY', message.meta.FtnProperty.ftn_seen_by); // note: no ^A (0x01)
|
||||
|
||||
appendMeta('\x01PATH', message.meta.FtnKludge['PATH']);
|
||||
|
||||
return Buffer.concat( [
|
||||
basicHeader,
|
||||
toUserNameBuf,
|
||||
fromUserNameBuf,
|
||||
subjectBuf,
|
||||
iconv.encode(msgBody + '\0', options.encoding)
|
||||
]);
|
||||
};
|
||||
|
||||
this.writeMessage = function(message, ws, options) {
|
||||
let basicHeader = new Buffer(34);
|
||||
|
@ -750,6 +887,24 @@ Packet.prototype.read = function(pathOrBuffer, iterator, cb) {
|
|||
);
|
||||
};
|
||||
|
||||
Packet.prototype.writeHeader = function(ws, packetHeader) {
|
||||
return this.writePacketHeader(packetHeader, ws);
|
||||
};
|
||||
|
||||
Packet.prototype.writeMessage = function(ws, message, options) {
|
||||
|
||||
}
|
||||
|
||||
Packet.prototype.writeMessageEntry = function(ws, msgEntry) {
|
||||
ws.write(msgEntry);
|
||||
return msgEntry.length;
|
||||
};
|
||||
|
||||
Packet.prototype.writeTerminator = function(ws) {
|
||||
ws.write(new Buffer( [ 0 ] )); // final extra null term
|
||||
return 1;
|
||||
};
|
||||
|
||||
Packet.prototype.writeStream = function(ws, messages, options) {
|
||||
if(!_.isBoolean(options.terminatePacket)) {
|
||||
options.terminatePacket = true;
|
||||
|
|
|
@ -138,9 +138,9 @@ function createMessageUuid(ftnMsgId, ftnArea) {
|
|||
return uuid.unparse(u); // to string
|
||||
}
|
||||
|
||||
function getMessageSerialNumber(message) {
|
||||
function getMessageSerialNumber(messageId) {
|
||||
const msSinceEnigmaEpoc = (Date.now() - Date.UTC(2016, 1, 1));
|
||||
const hash = Math.abs(new FNV1a(msSinceEnigmaEpoc + message.messageId).value).toString(16);
|
||||
const hash = Math.abs(new FNV1a(msSinceEnigmaEpoc + messageId).value).toString(16);
|
||||
return `00000000${hash}`.substr(-8);
|
||||
// return ('00000000' + ((Math.floor((Date.now() - Date.UTC(2016, 1, 1)) / 1000) +
|
||||
// message.messageId)).toString(16)).substr(-8);
|
||||
|
@ -183,7 +183,7 @@ function getMessageSerialNumber(message) {
|
|||
//
|
||||
function getMessageIdentifier(message, address) {
|
||||
const addrStr = new Address(address).toString('5D');
|
||||
return `${message.messageId}.${message.areaTag.toLowerCase()}@${addrStr} ${getMessageSerialNumber(message)}`;
|
||||
return `${message.messageId}.${message.areaTag.toLowerCase()}@${addrStr} ${getMessageSerialNumber(message.messageId)}`;
|
||||
}
|
||||
|
||||
//
|
||||
|
|
|
@ -9,6 +9,8 @@ let ftnUtil = require('../ftn_util.js');
|
|||
let Address = require('../ftn_address.js');
|
||||
let Log = require('../logger.js').log;
|
||||
let ArchiveUtil = require('../archive_util.js');
|
||||
let msgDb = require('../database.js').dbs.message;
|
||||
let Message = require('../message.js');
|
||||
|
||||
let moment = require('moment');
|
||||
let _ = require('lodash');
|
||||
|
@ -16,10 +18,11 @@ let paths = require('path');
|
|||
let mkdirp = require('mkdirp');
|
||||
let async = require('async');
|
||||
let fs = require('fs');
|
||||
let later = require('later');
|
||||
|
||||
exports.moduleInfo = {
|
||||
name : 'FTN',
|
||||
desc : 'FidoNet Style Message Scanner/Tosser',
|
||||
name : 'FTN BSO',
|
||||
desc : 'BSO style message scanner/tosser for FTN networks',
|
||||
author : 'NuSkooler',
|
||||
};
|
||||
|
||||
|
@ -35,14 +38,21 @@ exports.moduleInfo = {
|
|||
|
||||
exports.getModule = FTNMessageScanTossModule;
|
||||
|
||||
const SCHEDULE_REGEXP = /(?:^|or )?(@watch\:|@immediate)([^\0]+)?$/;
|
||||
|
||||
function FTNMessageScanTossModule() {
|
||||
MessageScanTossModule.call(this);
|
||||
|
||||
let self = this;
|
||||
|
||||
this.archUtil = new ArchiveUtil();
|
||||
this.archUtil.init();
|
||||
|
||||
|
||||
if(_.has(Config, 'scannerTossers.ftn_bso')) {
|
||||
this.moduleConfig = Config.scannerTossers.ftn_bso;
|
||||
|
||||
|
||||
}
|
||||
|
||||
this.isDefaultDomainZone = function(networkName, address) {
|
||||
|
@ -58,7 +68,7 @@ function FTNMessageScanTossModule() {
|
|||
return dir;
|
||||
};
|
||||
|
||||
this.getOutgoingPacketFileName = function(basePath, message, isTemp) {
|
||||
this.getOutgoingPacketFileName = function(basePath, messageId, isTemp) {
|
||||
//
|
||||
// Generating an outgoing packet file name comes with a few issues:
|
||||
// * We must use DOS 8.3 filenames due to legacy systems that receive
|
||||
|
@ -76,7 +86,7 @@ function FTNMessageScanTossModule() {
|
|||
// * We already have a system for 8-character serial number gernation that is
|
||||
// used for e.g. in FTS-0009.001 MSGIDs... let's use that!
|
||||
//
|
||||
const name = ftnUtil.getMessageSerialNumber(message);
|
||||
const name = ftnUtil.getMessageSerialNumber(messageId);
|
||||
const ext = (true === isTemp) ? 'pk_' : 'pkt';
|
||||
return paths.join(basePath, `${name}.${ext}`);
|
||||
};
|
||||
|
@ -133,8 +143,8 @@ function FTNMessageScanTossModule() {
|
|||
}
|
||||
});
|
||||
};
|
||||
|
||||
this.createMessagePacket = function(message, options) {
|
||||
|
||||
this.exportMessage = function(message, options, cb) {
|
||||
this.prepareMessage(message, options);
|
||||
|
||||
let packet = new ftnMailPacket.Packet();
|
||||
|
@ -153,18 +163,17 @@ function FTNMessageScanTossModule() {
|
|||
|
||||
mkdirp(outgoingDir, err => {
|
||||
if(err) {
|
||||
// :TODO: Handle me!!
|
||||
} else {
|
||||
this.getOutgoingBundleFileName(outgoingDir, options.network.localAddress, options.destAddress, (err, path) => {
|
||||
return cb(err);
|
||||
}
|
||||
this.getOutgoingBundleFileName(outgoingDir, options.network.localAddress, options.destAddress, (err, path) => {
|
||||
console.log(path);
|
||||
});
|
||||
packet.write(
|
||||
this.getOutgoingPacketFileName(outgoingDir, message),
|
||||
packetHeader,
|
||||
[ message ],
|
||||
{ encoding : options.encoding }
|
||||
);
|
||||
}
|
||||
});
|
||||
packet.write(
|
||||
this.getOutgoingPacketFileName(outgoingDir, message),
|
||||
packetHeader,
|
||||
[ message ],
|
||||
{ encoding : options.encoding }
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -261,7 +270,7 @@ function FTNMessageScanTossModule() {
|
|||
|
||||
// :TODO: change to something like isAreaConfigValid
|
||||
// check paths, Addresses, etc.
|
||||
this.isAreaConfigComplete = function(areaConfig) {
|
||||
this.isAreaConfigValid = function(areaConfig) {
|
||||
if(!_.isString(areaConfig.tag) || !_.isString(areaConfig.network)) {
|
||||
return false;
|
||||
}
|
||||
|
@ -272,7 +281,291 @@ function FTNMessageScanTossModule() {
|
|||
|
||||
return (_.isArray(areaConfig.uplinks));
|
||||
};
|
||||
|
||||
|
||||
this.hasValidConfiguration = function() {
|
||||
if(!_.has(this, 'moduleConfig.nodes') || !_.has(Config, 'messageNetworks.ftn.areas')) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
};
|
||||
|
||||
this.parseScheduleString = function(schedStr) {
|
||||
let schedule = {};
|
||||
|
||||
const m = SCHEDULE_REGEXP.exec(schedStr);
|
||||
if(m) {
|
||||
schedStr = schedStr.substr(0, m.index).trim();
|
||||
|
||||
if('@watch:' === m[1]) {
|
||||
schedule.watchFile = m[2];
|
||||
} else if('@immediate' === m[1]) {
|
||||
schedule.immediate = true;
|
||||
}
|
||||
}
|
||||
|
||||
if(schedStr.length > 0) {
|
||||
const sched = later.parse.text(schedStr);
|
||||
if(-1 === sched.error) {
|
||||
schedule.sched = sched;
|
||||
}
|
||||
}
|
||||
|
||||
// return undefined if we couldn't parse out anything useful
|
||||
if(!_.isEmpty(schedule)) {
|
||||
return schedule;
|
||||
}
|
||||
};
|
||||
|
||||
this.performImport = function() {
|
||||
};
|
||||
|
||||
this.getAreaLastScanId = function(areaTag, cb) {
|
||||
const sql =
|
||||
`SELECT area_tag, message_id
|
||||
FROM message_area_last_scan
|
||||
WHERE scan_toss = "ftn_bso" AND area_tag = ?
|
||||
LIMIT 1;`;
|
||||
|
||||
msgDb.get(sql, [ areaTag ], (err, row) => {
|
||||
cb(err, row ? row.message_id : 0);
|
||||
});
|
||||
};
|
||||
|
||||
this.getNodeConfigKeyForUplink = function(uplink) {
|
||||
// :TODO: sort by least # of '*' & take top?
|
||||
const nodeKey = _.filter(Object.keys(this.moduleConfig.nodes), addr => {
|
||||
return Address.fromString(addr).isMatch(uplink);
|
||||
})[0];
|
||||
|
||||
return nodeKey;
|
||||
};
|
||||
|
||||
this.exportMessagesByUuid = function(messageUuids, exportOpts, cb) {
|
||||
//
|
||||
// This method has a lot of madness going on:
|
||||
// - Try to stuff messages into packets until we've hit the target size
|
||||
// - We need to wait for write streams to finish before proceeding in many cases
|
||||
// or data will be cut off when closing and creating a new stream
|
||||
//
|
||||
let exportedFiles = [];
|
||||
let currPacketSize = self.moduleConfig.packetTargetByteSize;
|
||||
let packet;
|
||||
let ws;
|
||||
let remainMessageBuf;
|
||||
let remainMessageId;
|
||||
|
||||
async.each(messageUuids, (msgUuid, nextUuid) => {
|
||||
let message = new Message();
|
||||
|
||||
async.series(
|
||||
[
|
||||
function finalizePrevious(callback) {
|
||||
if(packet && currPacketSize >= self.moduleConfig.packetTargetByteSize) {
|
||||
packet.writeTerminator(ws);
|
||||
ws.end();
|
||||
ws.once('finish', () => {
|
||||
callback(null);
|
||||
});
|
||||
} else {
|
||||
callback(null);
|
||||
}
|
||||
},
|
||||
function loadMessage(callback) {
|
||||
message.load( { uuid : msgUuid }, err => {
|
||||
if(!err) {
|
||||
self.prepareMessage(message, exportOpts);
|
||||
}
|
||||
callback(err);
|
||||
});
|
||||
},
|
||||
function createNewPacket(callback) {
|
||||
if(currPacketSize >= self.moduleConfig.packetTargetByteSize) {
|
||||
packet = new ftnMailPacket.Packet();
|
||||
|
||||
const packetHeader = new ftnMailPacket.PacketHeader(
|
||||
exportOpts.network.localAddress,
|
||||
exportOpts.destAddress,
|
||||
exportOpts.nodeConfig.packetType);
|
||||
|
||||
packetHeader.password = exportOpts.nodeConfig.packetPassword || '';
|
||||
|
||||
// use current message ID for filename seed
|
||||
const pktFileName = self.getOutgoingPacketFileName(exportOpts.exportDir, message.messageId);
|
||||
exportedFiles.push(pktFileName);
|
||||
|
||||
ws = fs.createWriteStream(pktFileName);
|
||||
|
||||
currPacketSize = packet.writeHeader(ws, packetHeader);
|
||||
|
||||
if(remainMessageBuf) {
|
||||
currPacketSize += packet.writeMessageEntry(ws, remainMessageBuf);
|
||||
remainMessageBuf = null;
|
||||
}
|
||||
}
|
||||
|
||||
callback(null);
|
||||
},
|
||||
function appendMessage(callback) {
|
||||
const msgBuf = packet.getMessageEntryBuffer(message, exportOpts);
|
||||
currPacketSize += msgBuf.length;
|
||||
|
||||
if(currPacketSize >= self.moduleConfig.packetTargetByteSize) {
|
||||
remainMessageBuf = msgBuf; // save for next packet
|
||||
remainMessageId = message.messageId;
|
||||
} else {
|
||||
ws.write(msgBuf);
|
||||
}
|
||||
callback(null);
|
||||
}
|
||||
],
|
||||
err => {
|
||||
nextUuid(err);
|
||||
}
|
||||
);
|
||||
}, err => {
|
||||
if(err) {
|
||||
cb(err);
|
||||
} else {
|
||||
async.series(
|
||||
[
|
||||
function terminateLast(callback) {
|
||||
if(packet) {
|
||||
packet.writeTerminator(ws);
|
||||
ws.end();
|
||||
ws.once('finish', () => {
|
||||
callback(null);
|
||||
});
|
||||
} else {
|
||||
callback(null);
|
||||
}
|
||||
},
|
||||
function writeRemainPacket(callback) {
|
||||
if(remainMessageBuf) {
|
||||
// :TODO: DRY this with the code above -- they are basically identical
|
||||
packet = new ftnMailPacket.Packet();
|
||||
|
||||
const packetHeader = new ftnMailPacket.PacketHeader(
|
||||
exportOpts.network.localAddress,
|
||||
exportOpts.destAddress,
|
||||
exportOpts.nodeConfig.packetType);
|
||||
|
||||
packetHeader.password = exportOpts.nodeConfig.packetPassword || '';
|
||||
|
||||
// use current message ID for filename seed
|
||||
const pktFileName = self.getOutgoingPacketFileName(exportOpts.exportDir, remainMessageId);
|
||||
exportedFiles.push(pktFileName);
|
||||
|
||||
ws = fs.createWriteStream(pktFileName);
|
||||
|
||||
packet.writeHeader(ws, packetHeader);
|
||||
ws.write(remainMessageBuf);
|
||||
packet.writeTerminator(ws);
|
||||
ws.end();
|
||||
ws.once('finish', () => {
|
||||
callback(null);
|
||||
});
|
||||
} else {
|
||||
callback(null);
|
||||
}
|
||||
}
|
||||
],
|
||||
err => {
|
||||
cb(err, exportedFiles);
|
||||
}
|
||||
);
|
||||
}
|
||||
});
|
||||
};
|
||||
|
||||
this.exportMessagesToUplinks = function(messageUuids, areaConfig, cb) {
|
||||
async.each(areaConfig.uplinks, (uplink, nextUplink) => {
|
||||
const nodeConfigKey = self.getNodeConfigKeyForUplink(uplink);
|
||||
if(!nodeConfigKey) {
|
||||
return nextUplink();
|
||||
}
|
||||
|
||||
const exportOpts = {
|
||||
nodeConfig : self.moduleConfig.nodes[nodeConfigKey],
|
||||
network : Config.messageNetworks.ftn.networks[areaConfig.network],
|
||||
destAddress : Address.fromString(uplink),
|
||||
networkName : areaConfig.network,
|
||||
exportDir : self.moduleConfig.paths.temp,
|
||||
};
|
||||
|
||||
if(_.isString(exportOpts.network.localAddress)) {
|
||||
exportOpts.network.localAddress = Address.fromString(exportOpts.network.localAddress);
|
||||
}
|
||||
|
||||
const outgoingDir = self.getOutgoingPacketDir(exportOpts.networkName, exportOpts.destAddress);
|
||||
|
||||
async.waterfall(
|
||||
[
|
||||
function createTempDir(callback) {
|
||||
mkdirp(exportOpts.exportDir, err => {
|
||||
callback(err);
|
||||
});
|
||||
},
|
||||
function createOutgoingDir(callback) {
|
||||
mkdirp(outgoingDir, err => {
|
||||
callback(err);
|
||||
});
|
||||
},
|
||||
function exportToTempArea(callback) {
|
||||
self.exportMessagesByUuid(messageUuids, exportOpts, callback);
|
||||
},
|
||||
function createArcMailBundle(exportedFileNames, callback) {
|
||||
if(self.archUtil.haveArchiver(exportOpts.nodeConfig.archiveType)) {
|
||||
// :TODO: support bundleTargetByteSize:
|
||||
//
|
||||
// Compress to a temp location then we'll move it in the next step
|
||||
//
|
||||
// Note that we must use the *final* output dir for getOutgoingBundleFileName()
|
||||
// as it checks for collisions in bundle names!
|
||||
//
|
||||
self.getOutgoingBundleFileName(outgoingDir, exportOpts.network.localAddress, exportOpts.destAddress, (err, bundlePath) => {
|
||||
if(err) {
|
||||
return callback(err);
|
||||
}
|
||||
|
||||
// adjust back to temp path
|
||||
const tempBundlePath = paths.join(exportOpts.exportDir, paths.basename(bundlePath));
|
||||
|
||||
self.archUtil.compressTo(
|
||||
exportOpts.nodeConfig.archiveType,
|
||||
tempBundlePath,
|
||||
exportedFileNames, err => {
|
||||
// :TODO: we need to delete the original input file(s)
|
||||
fs.rename(tempBundlePath, bundlePath, err => {
|
||||
callback(err, [ bundlePath ] );
|
||||
});
|
||||
}
|
||||
);
|
||||
});
|
||||
} else {
|
||||
callback(null, exportedFileNames);
|
||||
}
|
||||
},
|
||||
function moveFilesToOutgoing(exportedFileNames, callback) {
|
||||
async.each(exportedFileNames, (oldPath, nextFile) => {
|
||||
const ext = paths.extname(oldPath);
|
||||
if('.pk_' === ext) {
|
||||
const newPath = paths.join(outgoingDir, paths.basename(oldPath, ext) + '.pkt');
|
||||
fs.rename(oldPath, newPath, nextFile);
|
||||
} else {
|
||||
const newPath = paths.join(outgoingDir, paths.basename(oldPath));
|
||||
fs.rename(oldPath, newPath, nextFile);
|
||||
}
|
||||
}, callback);
|
||||
}
|
||||
],
|
||||
err => {
|
||||
nextUplink();
|
||||
}
|
||||
);
|
||||
}, cb); // complete
|
||||
};
|
||||
}
|
||||
|
||||
require('util').inherits(FTNMessageScanTossModule, MessageScanTossModule);
|
||||
|
@ -280,16 +573,107 @@ require('util').inherits(FTNMessageScanTossModule, MessageScanTossModule);
|
|||
FTNMessageScanTossModule.prototype.startup = function(cb) {
|
||||
Log.info('FidoNet Scanner/Tosser starting up');
|
||||
|
||||
if(_.isObject(this.moduleConfig.schedule)) {
|
||||
const exportSchedule = this.parseScheduleString(this.moduleConfig.schedule.export);
|
||||
if(exportSchedule) {
|
||||
if(exportSchedule.sched) {
|
||||
let exporting = false;
|
||||
this.exportTimer = later.setInterval( () => {
|
||||
if(!exporting) {
|
||||
exporting = true;
|
||||
|
||||
Log.info( { module : exports.moduleInfo.name }, 'Performing scheduled message export...');
|
||||
|
||||
this.performExport(err => {
|
||||
exporting = false;
|
||||
});
|
||||
}
|
||||
}, exportSchedule.sched);
|
||||
}
|
||||
|
||||
if(exportSchedule.watchFile) {
|
||||
// :TODO: monitor file for changes/existance with gaze
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
FTNMessageScanTossModule.super_.prototype.startup.call(this, cb);
|
||||
};
|
||||
|
||||
FTNMessageScanTossModule.prototype.shutdown = function(cb) {
|
||||
Log.info('FidoNet Scanner/Tosser shutting down');
|
||||
|
||||
if(this.exportTimer) {
|
||||
this.exportTimer.clear();
|
||||
}
|
||||
|
||||
FTNMessageScanTossModule.super_.prototype.shutdown.call(this, cb);
|
||||
};
|
||||
|
||||
FTNMessageScanTossModule.prototype.performExport = function(cb) {
|
||||
//
|
||||
// We're only concerned with areas related to FTN. For each area, loop though
|
||||
// and let's find out what messages need exported.
|
||||
//
|
||||
if(!this.hasValidConfiguration()) {
|
||||
return cb(new Error('No valid configurations for export'));
|
||||
}
|
||||
|
||||
// :TODO: Block exporting (e.g. ignore timer) until export is finished
|
||||
|
||||
const getNewUuidsSql =
|
||||
`SELECT message_uuid
|
||||
FROM message
|
||||
WHERE area_tag = ? AND message_id > ?
|
||||
ORDER BY message_id;`;
|
||||
|
||||
var self = this;
|
||||
|
||||
async.each(Object.keys(Config.messageNetworks.ftn.areas), (areaTag, nextArea) => {
|
||||
const areaConfig = Config.messageNetworks.ftn.areas[areaTag];
|
||||
if(!this.isAreaConfigValid(areaConfig)) {
|
||||
return nextArea();
|
||||
}
|
||||
|
||||
//
|
||||
// For each message that is newer than that of the last scan
|
||||
// we need to export to each configured associated uplink(s)
|
||||
//
|
||||
async.waterfall(
|
||||
[
|
||||
function getLastScanId(callback) {
|
||||
self.getAreaLastScanId(areaTag, callback);
|
||||
},
|
||||
function getNewUuids(lastScanId, callback) {
|
||||
msgDb.all(getNewUuidsSql, [ areaTag, lastScanId ], (err, rows) => {
|
||||
if(err) {
|
||||
callback(err);
|
||||
} else {
|
||||
callback(null, rows.map(r => r.message_uuid)); // convert to simple array of UUIDs
|
||||
}
|
||||
});
|
||||
},
|
||||
function exportToConfiguredUplinks(msgUuids, callback) {
|
||||
self.exportMessagesToUplinks(msgUuids, areaConfig, err => {
|
||||
// :TODO: Log/handle err
|
||||
callback(null, msgUuids[msgUuids.length - 1]);
|
||||
});
|
||||
},
|
||||
function updateLastScanId(newLastScanId, callback) {
|
||||
callback(null);
|
||||
}
|
||||
],
|
||||
function complete(err) {
|
||||
nextArea();
|
||||
}
|
||||
);
|
||||
}, err => {
|
||||
cb(err);
|
||||
});
|
||||
};
|
||||
|
||||
FTNMessageScanTossModule.prototype.record = function(message) {
|
||||
/*
|
||||
if(!_.has(this, 'moduleConfig.nodes') ||
|
||||
!_.has(Config, [ 'messageNetworks', 'ftn', 'areas', message.areaTag ]))
|
||||
{
|
||||
|
@ -297,7 +681,7 @@ FTNMessageScanTossModule.prototype.record = function(message) {
|
|||
}
|
||||
|
||||
const areaConfig = Config.messageNetworks.ftn.areas[message.areaTag];
|
||||
if(!this.isAreaConfigComplete(areaConfig)) {
|
||||
if(!this.isAreaConfigValid(areaConfig)) {
|
||||
// :TODO: should probably log a warning here
|
||||
return;
|
||||
}
|
||||
|
@ -334,4 +718,5 @@ FTNMessageScanTossModule.prototype.record = function(message) {
|
|||
// :TODO: should perhaps record in batches - e.g. start an event, record
|
||||
// to temp location until time is hit or N achieved such that if multiple
|
||||
// messages are being created a .FTN file is not made for each one
|
||||
*/
|
||||
};
|
||||
|
|
|
@ -28,7 +28,8 @@
|
|||
"ptyw.js": "^0.3.7",
|
||||
"sqlite3": "^3.1.1",
|
||||
"ssh2": "^0.4.13",
|
||||
"string-format": "davidchambers/string-format#mini-language"
|
||||
"string-format": "davidchambers/string-format#mini-language",
|
||||
"later" : "1.2.0"
|
||||
},
|
||||
"engines": {
|
||||
"node": ">=0.12.2"
|
||||
|
|
Loading…
Reference in New Issue