* Fix collsion with import/export temporary dirs; better use of temp dirs all around

* Raw (non-bundle) packet exports are now BSO named (e.g. .cut for crash)
This commit is contained in:
Bryan Ashby 2016-03-13 11:11:51 -06:00
parent 86c659849c
commit a787a2eab3
1 changed files with 122 additions and 96 deletions

View File

@ -146,19 +146,25 @@ function FTNMessageScanTossModule() {
const ext = (true === isTemp) ? 'pk_' : 'pkt'; const ext = (true === isTemp) ? 'pk_' : 'pkt';
return paths.join(basePath, `${name}.${ext}`); return paths.join(basePath, `${name}.${ext}`);
}; };
this.getOutgoingFlowFileName = function(basePath, destAddress, flowType, exportType) { this.getOutgoingFlowFileExtension = function(destAddress, flowType, exportType) {
let basename;
let ext; let ext;
switch(flowType) { switch(flowType) {
case 'netmail' : ext = `${exportType.toLowerCase()[0]}ut`; break; case 'mail' : ext = `${exportType.toLowerCase()[0]}ut`; break;
case 'ref' : ext = `${exportType.toLowerCase()[0]}lo`; break; case 'ref' : ext = `${exportType.toLowerCase()[0]}lo`; break;
case 'busy' : ext = 'bsy'; break; case 'busy' : ext = 'bsy'; break;
case 'request' : ext = 'req'; break; case 'request' : ext = 'req'; break;
case 'requests' : ext = 'hrq'; break; case 'requests' : ext = 'hrq'; break;
} }
return ext;
};
this.getOutgoingFlowFileName = function(basePath, destAddress, flowType, exportType) {
let basename;
const ext = self.getOutgoingFlowFileExtension(destAddress, flowType, exportType);
if(destAddress.point) { if(destAddress.point) {
} else { } else {
@ -431,6 +437,7 @@ function FTNMessageScanTossModule() {
let ws; let ws;
let remainMessageBuf; let remainMessageBuf;
let remainMessageId; let remainMessageId;
const createTempPacket = !_.isString(exportOpts.nodeConfig.archiveType) || 0 === exportOpts.nodeConfig.archiveType.length;
async.each(messageUuids, (msgUuid, nextUuid) => { async.each(messageUuids, (msgUuid, nextUuid) => {
let message = new Message(); let message = new Message();
@ -468,7 +475,7 @@ function FTNMessageScanTossModule() {
packetHeader.password = exportOpts.nodeConfig.packetPassword || ''; packetHeader.password = exportOpts.nodeConfig.packetPassword || '';
// use current message ID for filename seed // use current message ID for filename seed
const pktFileName = self.getOutgoingPacketFileName(exportOpts.tempDir, message.messageId); const pktFileName = self.getOutgoingPacketFileName(self.exportTempDir, message.messageId, createTempPacket);
exportedFiles.push(pktFileName); exportedFiles.push(pktFileName);
ws = fs.createWriteStream(pktFileName); ws = fs.createWriteStream(pktFileName);
@ -543,7 +550,7 @@ function FTNMessageScanTossModule() {
packetHeader.password = exportOpts.nodeConfig.packetPassword || ''; packetHeader.password = exportOpts.nodeConfig.packetPassword || '';
// use current message ID for filename seed // use current message ID for filename seed
const pktFileName = self.getOutgoingPacketFileName(exportOpts.tempDir, remainMessageId); const pktFileName = self.getOutgoingPacketFileName(self.exportTempDir, remainMessageId, createTempPacket);
exportedFiles.push(pktFileName); exportedFiles.push(pktFileName);
ws = fs.createWriteStream(pktFileName); ws = fs.createWriteStream(pktFileName);
@ -586,16 +593,11 @@ function FTNMessageScanTossModule() {
exportOpts.network.localAddress = Address.fromString(exportOpts.network.localAddress); exportOpts.network.localAddress = Address.fromString(exportOpts.network.localAddress);
} }
const outgoingDir = self.getOutgoingPacketDir(exportOpts.networkName, exportOpts.destAddress); const outgoingDir = self.getOutgoingPacketDir(exportOpts.networkName, exportOpts.destAddress);
const exportType = self.getExportType(exportOpts.nodeConfig);
async.waterfall( async.waterfall(
[ [
function createTempDir(callback) {
temp.mkdir('enigftnexport--', (err, tempDir) => {
exportOpts.tempDir = tempDir;
callback(err);
});
},
function createOutgoingDir(callback) { function createOutgoingDir(callback) {
mkdirp(outgoingDir, err => { mkdirp(outgoingDir, err => {
callback(err); callback(err);
@ -619,7 +621,7 @@ function FTNMessageScanTossModule() {
} }
// adjust back to temp path // adjust back to temp path
const tempBundlePath = paths.join(exportOpts.tempDir, paths.basename(bundlePath)); const tempBundlePath = paths.join(self.exportTempDir, paths.basename(bundlePath));
self.archUtil.compressTo( self.archUtil.compressTo(
exportOpts.nodeConfig.archiveType, exportOpts.nodeConfig.archiveType,
@ -637,14 +639,29 @@ function FTNMessageScanTossModule() {
async.each(exportedFileNames, (oldPath, nextFile) => { async.each(exportedFileNames, (oldPath, nextFile) => {
const ext = paths.extname(oldPath); const ext = paths.extname(oldPath);
if('.pk_' === ext) { if('.pk_' === ext) {
const newPath = paths.join(outgoingDir, paths.basename(oldPath, ext) + '.pkt'); //
// For a given temporary .pk_ file, we need to move it to the outoing
// directory with the appropriate BSO style filename.
//
const ext = self.getOutgoingFlowFileExtension(
exportOpts.destAddress,
'mail',
exportType);
const newPath = paths.join(
outgoingDir,
`${paths.basename(oldPath, 'pk_')}${ext}`);
fs.rename(oldPath, newPath, nextFile); fs.rename(oldPath, newPath, nextFile);
} else { } else {
const newPath = paths.join(outgoingDir, paths.basename(oldPath)); const newPath = paths.join(outgoingDir, paths.basename(oldPath));
//fs.rename(oldPath, newPath, nextFile);
fs.rename(oldPath, newPath, err => { fs.rename(oldPath, newPath, err => {
if(err) { if(err) {
// :TODO: Log this - but move on to the next file // :TODO: Log this - but move on to the next file
Log.warn(
{ oldPath : oldPath, newPath : newPath },
'Failed moving temporary bundle file!');
return nextFile(); return nextFile();
} }
@ -655,27 +672,18 @@ function FTNMessageScanTossModule() {
outgoingDir, outgoingDir,
exportOpts.destAddress, exportOpts.destAddress,
'ref', 'ref',
self.getExportType(exportOpts.nodeConfig)); exportType);
// directive of '^' = delete file after transfer // directive of '^' = delete file after transfer
self.flowFileAppendRefs(flowFilePath, [ newPath ], '^', err => { self.flowFileAppendRefs(flowFilePath, [ newPath ], '^', err => {
if(err) { if(err) {
// :TODO: Log this! Log.warn( { path : flowFilePath }, 'Failed appending flow reference record!');
} }
nextFile(); nextFile();
}); });
}); });
} }
}, callback); }, callback);
},
function cleanUpTempDir(callback) {
temp.cleanup((err, stats) => {
Log.trace(
Object.assign(stats, { tempDir : exportOpts.tempDir }),
'Temporary directory cleaned up');
callback(null);
});
} }
], ],
err => { err => {
@ -764,7 +772,7 @@ function FTNMessageScanTossModule() {
// * https://github.com/larsks/crashmail/blob/26e5374710c7868dab3d834be14bf4041041aae5/crashmail/pkt.c // * https://github.com/larsks/crashmail/blob/26e5374710c7868dab3d834be14bf4041041aae5/crashmail/pkt.c
// https://github.com/larsks/crashmail/blob/26e5374710c7868dab3d834be14bf4041041aae5/crashmail/handle.c // https://github.com/larsks/crashmail/blob/26e5374710c7868dab3d834be14bf4041041aae5/crashmail/handle.c
// //
this.importMessagesFromPacketFile = function(packetPath, cb) { this.importMessagesFromPacketFile = function(packetPath, password, cb) {
let packetHeader; let packetHeader;
new ftnMailPacket.Packet().read(packetPath, (entryType, entryData, next) => { new ftnMailPacket.Packet().read(packetPath, (entryType, entryData, next) => {
@ -818,7 +826,7 @@ function FTNMessageScanTossModule() {
}); });
}; };
this.importPacketFilesFromDirectory = function(importDir, cb) { this.importPacketFilesFromDirectory = function(importDir, password, cb) {
async.waterfall( async.waterfall(
[ [
function getPacketFiles(callback) { function getPacketFiles(callback) {
@ -832,7 +840,7 @@ function FTNMessageScanTossModule() {
function importPacketFiles(packetFiles, callback) { function importPacketFiles(packetFiles, callback) {
let rejects = []; let rejects = [];
async.each(packetFiles, (packetFile, nextFile) => { async.each(packetFiles, (packetFile, nextFile) => {
self.importMessagesFromPacketFile(paths.join(importDir, packetFile), err => { self.importMessagesFromPacketFile(paths.join(importDir, packetFile), '', err => {
// :TODO: check err -- log / track rejects, etc. // :TODO: check err -- log / track rejects, etc.
if(err) { if(err) {
rejects.push(packetFile); rejects.push(packetFile);
@ -867,13 +875,11 @@ function FTNMessageScanTossModule() {
}; };
this.importMessagesFromDirectory = function(inboundType, importDir, cb) { this.importMessagesFromDirectory = function(inboundType, importDir, cb) {
let tempDirectory;
async.waterfall( async.waterfall(
[ [
// start with .pkt files // start with .pkt files
function importPacketFiles(callback) { function importPacketFiles(callback) {
self.importPacketFilesFromDirectory(importDir, err => { self.importPacketFilesFromDirectory(importDir, '', err => {
callback(err); callback(err);
}); });
}, },
@ -891,18 +897,12 @@ function FTNMessageScanTossModule() {
}); });
}); });
}, },
function createTempDir(bundleFiles, callback) {
temp.mkdir('enigftnimport-', (err, tempDir) => {
tempDirectory = tempDir;
callback(err, bundleFiles);
});
},
function importBundles(bundleFiles, callback) { function importBundles(bundleFiles, callback) {
let rejects = []; let rejects = [];
async.each(bundleFiles, (bundleFile, nextFile) => { async.each(bundleFiles, (bundleFile, nextFile) => {
if(_.isUndefined(bundleFile.archName)) { if(_.isUndefined(bundleFile.archName)) {
Log.info( Log.warn(
{ fileName : bundleFile.path }, { fileName : bundleFile.path },
'Unknown bundle archive type'); 'Unknown bundle archive type');
@ -913,11 +913,11 @@ function FTNMessageScanTossModule() {
self.archUtil.extractTo( self.archUtil.extractTo(
bundleFile.path, bundleFile.path,
tempDirectory, self.importTempDir,
bundleFile.archName, bundleFile.archName,
err => { err => {
if(err) { if(err) {
Log.info( Log.warn(
{ fileName : bundleFile.path, error : err.toString() }, { fileName : bundleFile.path, error : err.toString() },
'Failed to extract bundle'); 'Failed to extract bundle');
@ -935,7 +935,8 @@ function FTNMessageScanTossModule() {
// //
// All extracted - import .pkt's // All extracted - import .pkt's
// //
self.importPacketFilesFromDirectory(tempDirectory, err => { self.importPacketFilesFromDirectory(self.importTempDir, '', err => {
// :TODO: handle |err|
callback(null, bundleFiles, rejects); callback(null, bundleFiles, rejects);
}); });
}); });
@ -956,71 +957,83 @@ function FTNMessageScanTossModule() {
} }
], ],
err => { err => {
if(tempDirectory) { cb(err);
temp.cleanup( (errIgnored, stats) => {
Log.trace(
Object.assign(stats, { tempDir : tempDirectory } ),
'Temporary directory cleaned up'
);
cb(err); // orig err
});
} else {
cb(err);
}
} }
); );
}; };
this.createTempDirectories = function(cb) {
temp.mkdir('enigftnexport-', (err, tempDir) => {
if(err) {
return cb(err);
}
self.exportTempDir = tempDir;
temp.mkdir('enigftnimport-', (err, tempDir) => {
self.importTempDir = tempDir;
cb(err);
});
});
};
} }
require('util').inherits(FTNMessageScanTossModule, MessageScanTossModule); require('util').inherits(FTNMessageScanTossModule, MessageScanTossModule);
FTNMessageScanTossModule.prototype.startup = function(cb) { FTNMessageScanTossModule.prototype.startup = function(cb) {
Log.info('FidoNet Scanner/Tosser starting up'); Log.info(`${exports.moduleInfo.name} Scanner/Tosser starting up`);
if(_.isObject(this.moduleConfig.schedule)) { this.createTempDirectories(err => {
const exportSchedule = this.parseScheduleString(this.moduleConfig.schedule.export); if(err) {
if(exportSchedule) { Log.warn( { error : err.toStrong() }, 'Failed creating temporary directories!');
if(exportSchedule.sched) { return cb(err);
let exporting = false; }
this.exportTimer = later.setInterval( () => {
if(!exporting) { if(_.isObject(this.moduleConfig.schedule)) {
exporting = true; const exportSchedule = this.parseScheduleString(this.moduleConfig.schedule.export);
if(exportSchedule) {
Log.info( { module : exports.moduleInfo.name }, 'Performing scheduled message scan/export...'); if(exportSchedule.sched) {
let exporting = false;
this.performExport(err => { this.exportTimer = later.setInterval( () => {
exporting = false; if(!exporting) {
}); exporting = true;
}
}, exportSchedule.sched); Log.info( { module : exports.moduleInfo.name }, 'Performing scheduled message scan/export...');
this.performExport( () => {
exporting = false;
});
}
}, exportSchedule.sched);
}
if(exportSchedule.watchFile) {
// :TODO: monitor file for changes/existance with gaze
}
} }
if(exportSchedule.watchFile) { const importSchedule = this.parseScheduleString(this.moduleConfig.schedule.import);
// :TODO: monitor file for changes/existance with gaze if(importSchedule) {
if(importSchedule.sched) {
let importing = false;
this.importTimer = later.setInterval( () => {
if(!importing) {
importing = true;
Log.info( { module : exports.moduleInfo.name }, 'Performing scheduled message import/toss...');
this.performImport( () => {
importing = false;
});
}
}, importSchedule.sched);
}
} }
} }
const importSchedule = this.parseScheduleString(this.moduleConfig.schedule.import); FTNMessageScanTossModule.super_.prototype.startup.call(this, cb);
if(importSchedule) { });
if(importSchedule.sched) {
let importing = false;
this.importTimer = later.setInterval( () => {
if(!importing) {
importing = true;
Log.info( { module : exports.moduleInfo.name }, 'Performing scheduled message import/toss...');
this.performImport(err => {
importing = false;
});
}
}, importSchedule.sched);
}
}
}
FTNMessageScanTossModule.super_.prototype.startup.call(this, cb);
}; };
FTNMessageScanTossModule.prototype.shutdown = function(cb) { FTNMessageScanTossModule.prototype.shutdown = function(cb) {
@ -1029,8 +1042,21 @@ FTNMessageScanTossModule.prototype.shutdown = function(cb) {
if(this.exportTimer) { if(this.exportTimer) {
this.exportTimer.clear(); this.exportTimer.clear();
} }
FTNMessageScanTossModule.super_.prototype.shutdown.call(this, cb); //
// Clean up temp dir/files we created
//
temp.cleanup((err, stats) => {
const fullStats = Object.assign(stats, { exportTemp : this.exportTempDir, importTemp : this.importTempDir } );
if(err) {
Log.warn(fullStats, 'Failed cleaning up temporary directories!');
} else {
Log.trace(fullStats, 'Temporary directories cleaned up');
}
FTNMessageScanTossModule.super_.prototype.shutdown.call(this, cb);
});
}; };
FTNMessageScanTossModule.prototype.performImport = function(cb) { FTNMessageScanTossModule.prototype.performImport = function(cb) {