* Implemented @watch rule for import schedule

* Implemented @immediate rule for export schedule
This commit is contained in:
Bryan Ashby 2016-03-15 21:44:24 -06:00
parent 964c53ea9f
commit d29829a46c
1 changed files with 66 additions and 35 deletions

View File

@ -21,6 +21,7 @@ let fs = require('fs');
let later = require('later'); let later = require('later');
let temp = require('temp').track(); // track() cleans up temp dir/files for us let temp = require('temp').track(); // track() cleans up temp dir/files for us
let assert = require('assert'); let assert = require('assert');
let gaze = require('gaze');
exports.moduleInfo = { exports.moduleInfo = {
name : 'FTN BSO', name : 'FTN BSO',
@ -1009,6 +1010,21 @@ function FTNMessageScanTossModule() {
}); });
}); });
}; };
// Starts an export block - returns true if we can proceed
this.exportingStart = function() {
if(!this.exportRunning) {
this.exportRunning = true;
return true;
}
return false;
};
// ends an export block
this.exportingEnd = function() {
this.exportRunning = false;
};
} }
require('util').inherits(FTNMessageScanTossModule, MessageScanTossModule); require('util').inherits(FTNMessageScanTossModule, MessageScanTossModule);
@ -1016,6 +1032,22 @@ require('util').inherits(FTNMessageScanTossModule, MessageScanTossModule);
FTNMessageScanTossModule.prototype.startup = function(cb) { FTNMessageScanTossModule.prototype.startup = function(cb) {
Log.info(`${exports.moduleInfo.name} Scanner/Tosser starting up`); Log.info(`${exports.moduleInfo.name} Scanner/Tosser starting up`);
let importing = false;
let self = this;
function tryImportNow(reasonDesc) {
if(!importing) {
importing = true;
Log.info( { module : exports.moduleInfo.name }, reasonDesc);
self.performImport( () => {
importing = false;
});
}
}
this.createTempDirectories(err => { this.createTempDirectories(err => {
if(err) { if(err) {
Log.warn( { error : err.toStrong() }, 'Failed creating temporary directories!'); Log.warn( { error : err.toStrong() }, 'Failed creating temporary directories!');
@ -1025,25 +1057,17 @@ FTNMessageScanTossModule.prototype.startup = function(cb) {
if(_.isObject(this.moduleConfig.schedule)) { if(_.isObject(this.moduleConfig.schedule)) {
const exportSchedule = this.parseScheduleString(this.moduleConfig.schedule.export); const exportSchedule = this.parseScheduleString(this.moduleConfig.schedule.export);
if(exportSchedule) { if(exportSchedule) {
if(exportSchedule.sched) { if(exportSchedule.sched && this.exportingStart()) {
let exporting = false;
this.exportTimer = later.setInterval( () => { this.exportTimer = later.setInterval( () => {
if(!exporting) {
exporting = true;
Log.info( { module : exports.moduleInfo.name }, 'Performing scheduled message scan/export...'); Log.info( { module : exports.moduleInfo.name }, 'Performing scheduled message scan/export...');
this.performExport( () => { this.performExport( () => {
exporting = false; this.exportingEnd();
}); });
}
}, exportSchedule.sched); }, exportSchedule.sched);
} }
if(exportSchedule.watchFile) {
// :TODO: monitor file for changes/existance with gaze
}
if(_.isBoolean(exportSchedule.immediate)) { if(_.isBoolean(exportSchedule.immediate)) {
this.exportImmediate = exportSchedule.immediate; this.exportImmediate = exportSchedule.immediate;
} }
@ -1052,19 +1076,20 @@ FTNMessageScanTossModule.prototype.startup = function(cb) {
const importSchedule = this.parseScheduleString(this.moduleConfig.schedule.import); const importSchedule = this.parseScheduleString(this.moduleConfig.schedule.import);
if(importSchedule) { if(importSchedule) {
if(importSchedule.sched) { if(importSchedule.sched) {
let importing = false;
this.importTimer = later.setInterval( () => { this.importTimer = later.setInterval( () => {
if(!importing) { tryImportNow('Performing scheduled message import/toss...');
importing = true;
Log.info( { module : exports.moduleInfo.name }, 'Performing scheduled message import/toss...');
this.performImport( () => {
importing = false;
});
}
}, importSchedule.sched); }, importSchedule.sched);
} }
if(_.isString(importSchedule.watchFile)) {
gaze(importSchedule.watchFile, (err, watcher) => {
watcher.on('all', (event, watchedPath) => {
if(importSchedule.watchFile === watchedPath) {
tryImportNow(`Performing import/toss due to @watch: ${watchedPath} (${event})`);
}
});
});
}
} }
} }
@ -1228,12 +1253,18 @@ FTNMessageScanTossModule.prototype.record = function(message) {
return; return;
} }
// :TODO: We must share a check to block export with schedule/timer when this is exporting... if(this.exportingStart()) {
// :TODO: Messages must be marked as "exported" else we will export this particular message again later @ schedule/timer this.exportMessagesToUplinks( [ message.uuid ], areaConfig, err => {
// ...if getNewUuidsSql in performExport checks for MSGID existence also we can omit already-exported messages const info = { uuid : message.uuid, subject : message.subject };
this.exportMessagesToUplinks( [ message.uuid ], areaConfig, err => { if(err) {
}); Log.warn(info, 'Failed exporting message');
} else {
Log.info(info, 'Message exported');
}
this.exportingEnd();
});
}
} }
}; };