* ServerModule's createServer() is now async

* Re-write of NNTP Message-ID <> internal message UUIDs
This commit is contained in:
Bryan Ashby 2018-12-27 02:19:26 -07:00
parent 346815a4f2
commit 9d1815682d
11 changed files with 305 additions and 84 deletions

View File

@ -170,7 +170,7 @@ function getDefaultConfig() {
general : { general : {
boardName : 'Another Fine ENiGMA½ BBS', boardName : 'Another Fine ENiGMA½ BBS',
// :TODO: closedSystem and loginAttemps prob belong under users{}? // :TODO: closedSystem prob belongs under users{}?
closedSystem : false, // is the system closed to new users? closedSystem : false, // is the system closed to new users?
menuFile : 'menu.hjson', // 'oputil.js config new' will set this appropriately in config.hjson; may be full path menuFile : 'menu.hjson', // 'oputil.js config new' will set this appropriately in config.hjson; may be full path
@ -947,13 +947,18 @@ function getDefaultConfig() {
// action: // action:
// - @method:path/to/module.js:theMethodName // - @method:path/to/module.js:theMethodName
// (path is relative to engima base dir) // (path is relative to ENiGMA base dir)
// //
// - @execute:/path/to/something/executable.sh // - @execute:/path/to/something/executable.sh
// //
action : '@method:core/message_area.js:trimMessageAreasScheduledEvent', action : '@method:core/message_area.js:trimMessageAreasScheduledEvent',
}, },
nntpMaintenance : {
schedule : 'every 12 hours', // should generally be < trimMessageAreas interval
action : '@method:core/servers/content/nntp.js:performMaintenanceTask',
},
updateFileAreaStats : { updateFileAreaStats : {
schedule : 'every 1 hours', schedule : 'every 1 hours',
action : '@method:core/file_base_area.js:updateAreaStatsScheduledEvent', action : '@method:core/file_base_area.js:updateAreaStatsScheduledEvent',

View File

@ -30,30 +30,23 @@ function startListening(cb) {
const moduleUtil = require('./module_util.js'); // late load so we get Config const moduleUtil = require('./module_util.js'); // late load so we get Config
async.each( [ 'login', 'content' ], (category, next) => { async.each( [ 'login', 'content' ], (category, next) => {
moduleUtil.loadModulesForCategory(`${category}Servers`, (err, module) => { moduleUtil.loadModulesForCategory(`${category}Servers`, (module, nextModule) => {
if(err) {
if(ErrorReasons.Disabled === err.reasonCode) {
logger.log.debug(err.message);
} else {
logger.log.info( { err : err }, 'Failed loading module');
}
return;
}
const moduleInst = new module.getModule(); const moduleInst = new module.getModule();
try { try {
moduleInst.createServer(); moduleInst.createServer(err => {
if(!moduleInst.listen()) { if(!moduleInst.listen()) {
throw new Error('Failed listening'); throw new Error('Failed listening');
} }
listeningServers[module.moduleInfo.packageName] = {
instance : moduleInst,
info : module.moduleInfo,
};
listeningServers[module.moduleInfo.packageName] = {
instance : moduleInst,
info : module.moduleInfo,
};
return nextModule(err);
});
} catch(e) { } catch(e) {
logger.log.error(e, 'Exception caught creating server!'); logger.log.error(e, 'Exception caught creating server!');
return nextModule(e);
} }
}, err => { }, err => {
return next(err); return next(err);

View File

@ -93,8 +93,15 @@ function loadModulesForCategory(category, iterator, complete) {
async.each(jsModules, (file, next) => { async.each(jsModules, (file, next) => {
loadModule(paths.basename(file, '.js'), category, (err, mod) => { loadModule(paths.basename(file, '.js'), category, (err, mod) => {
iterator(err, mod); if(err) {
return next(); if(ErrorReasons.Disabled === err.reasonCode) {
Log.debug(err.message);
} else {
Log.info( { err : err }, 'Failed loading module');
}
return next(null); // continue no matter what
}
return iterator(mod, next);
}); });
}, err => { }, err => {
if(complete) { if(complete) {

View File

@ -2,10 +2,10 @@
'use strict'; 'use strict';
// ENiGMA½ // ENiGMA½
let loadModulesForCategory = require('./module_util.js').loadModulesForCategory; const loadModulesForCategory = require('./module_util.js').loadModulesForCategory;
// standard/deps // standard/deps
let async = require('async'); const async = require('async');
exports.startup = startup; exports.startup = startup;
exports.shutdown = shutdown; exports.shutdown = shutdown;
@ -17,16 +17,15 @@ function startup(cb) {
async.series( async.series(
[ [
function loadModules(callback) { function loadModules(callback) {
loadModulesForCategory('scannerTossers', (err, module) => { loadModulesForCategory('scannerTossers', (module, nextModule) => {
if(!err) { const modInst = new module.getModule();
const modInst = new module.getModule();
modInst.startup(err => { modInst.startup(err => {
if(!err) { if(!err) {
msgNetworkModules.push(modInst); msgNetworkModules.push(modInst);
} }
}); });
} return nextModule(null);
}, err => { }, err => {
callback(err); callback(err);
}); });

View File

@ -1,15 +1,18 @@
/* jslint node: true */ /* jslint node: true */
'use strict'; 'use strict';
var PluginModule = require('./plugin_module.js').PluginModule; const PluginModule = require('./plugin_module.js').PluginModule;
exports.ServerModule = ServerModule; exports.ServerModule = class ServerModule extends PluginModule {
constructor(options) {
super(options);
}
function ServerModule() { createServer(cb) {
PluginModule.call(this); return cb(null);
} }
require('util').inherits(ServerModule, PluginModule); listen(cb) {
return cb(null);
ServerModule.prototype.createServer = function() { }
}; };

View File

@ -71,9 +71,9 @@ exports.getModule = class GopherModule extends ServerModule {
this.log = Log.child( { server : 'Gopher' } ); this.log = Log.child( { server : 'Gopher' } );
} }
createServer() { createServer(cb) {
if(!this.enabled) { if(!this.enabled) {
return; return cb(null);
} }
const config = Config(); const config = Config();
@ -96,6 +96,8 @@ exports.getModule = class GopherModule extends ServerModule {
} }
}); });
}); });
return cb(null);
} }
listen() { listen() {

View File

@ -5,10 +5,13 @@
const Log = require('../../logger.js').log; const Log = require('../../logger.js').log;
const { ServerModule } = require('../../server_module.js'); const { ServerModule } = require('../../server_module.js');
const Config = require('../../config.js').get; const Config = require('../../config.js').get;
const {
getTransactionDatabase,
getModDatabasePath
} = require('../../database.js');
const { const {
getMessageAreaByTag, getMessageAreaByTag,
getMessageConferenceByTag, getMessageConferenceByTag,
getMessageListForArea,
} = require('../../message_area.js'); } = require('../../message_area.js');
const User = require('../../user.js'); const User = require('../../user.js');
const Errors = require('../../enig_error.js').Errors; const Errors = require('../../enig_error.js').Errors;
@ -28,10 +31,14 @@ const {
const NNTPServerBase = require('nntp-server'); const NNTPServerBase = require('nntp-server');
const _ = require('lodash'); const _ = require('lodash');
const fs = require('fs-extra'); const fs = require('fs-extra');
const forEachSeries = require('async/forEachSeries');
const asyncReduce = require('async/reduce'); const asyncReduce = require('async/reduce');
const asyncMap = require('async/map'); const asyncMap = require('async/map');
const asyncSeries = require('async/series'); const asyncSeries = require('async/series');
const asyncWaterfall = require('async/waterfall');
const LRU = require('lru-cache'); const LRU = require('lru-cache');
const sqlite3 = require('sqlite3');
const paths = require('path');
// //
// Network News Transfer Protocol (NNTP) // Network News Transfer Protocol (NNTP)
@ -50,13 +57,64 @@ exports.moduleInfo = {
packageName : 'codes.l33t.enigma.nntp.server', packageName : 'codes.l33t.enigma.nntp.server',
}; };
exports.performMaintenanceTask = performMaintenanceTask;
/* /*
General TODO General TODO
- ACS checks need worked out. Currently ACS relies on |client|. We need a client - ACS checks need worked out. Currently ACS relies on |client|. We need a client
spec that can be created even without a login server. Some checks and simply spec that can be created even without a login server. Some checks and simply
return false/fail. return false/fail.
*/ */
// simple DB maps NNTP Message-ID's which are
// sequential per group -> ENiG messages
// A single instance is shared across NNTP and/or NNTPS
class NNTPDatabase
{
constructor() {
}
init(cb) {
asyncSeries(
[
(callback) => {
this.db = getTransactionDatabase(new sqlite3.Database(
getModDatabasePath(exports.moduleInfo),
err => {
return callback(err);
}
));
},
(callback) => {
this.db.serialize( () => {
this.db.run(
`CREATE TABLE IF NOT EXISTS nntp_area_message (
nntp_message_id INTEGER NOT NULL,
message_id INTEGER NOT NULL,
message_area_tag VARCHAR NOT NULL,
message_uuid VARCHAR NOT NULL,
UNIQUE(nntp_message_id, message_area_tag)
);`
);
this.db.run(
`CREATE INDEX IF NOT EXISTS nntp_area_message_by_uuid_index
ON nntp_area_message (message_uuid);`
);
return callback(null);
});
}
],
err => {
return cb(err);
}
);
}
}
let nntpDatabase;
class NNTPServer extends NNTPServerBase { class NNTPServer extends NNTPServerBase {
constructor(options, serverName) { constructor(options, serverName) {
@ -97,10 +155,6 @@ class NNTPServer extends NNTPServerBase {
}); });
} }
getMessageListIndexByMessageID(id, session) {
return id - _.get(session.groupInfo.messageList, [ 0, 'messageId' ]);
}
isGroupSelected(session) { isGroupSelected(session) {
return Array.isArray(_.get(session, 'groupInfo.messageList')); return Array.isArray(_.get(session, 'groupInfo.messageList'));
} }
@ -145,7 +199,7 @@ class NNTPServer extends NNTPServerBase {
case [ Message.AddressFlavor.Email ] : case [ Message.AddressFlavor.Email ] :
jamStyleFrom = `${fromName} <${remoteFrom}>`; jamStyleFrom = `${fromName} <${remoteFrom}>`;
break; break;
} }
} }
if(!jamStyleFrom) { if(!jamStyleFrom) {
@ -256,14 +310,11 @@ class NNTPServer extends NNTPServerBase {
return null; return null;
} }
// const msg = session.groupInfo.messageList.find(m => {
// Adjust to offset in message list & get UUID return m.index === messageId;
// This works since we create "pseudo IDs" to return to NNTP });
// by using firstRealID + index. A find on |index| member would
// also work, but would be O(n). messageUuid = msg && msg.messageUuid;
//
const mlIndex = this.getMessageListIndexByMessageID(messageId, session);
messageUuid = _.get(session.groupInfo.messageList, [ mlIndex, 'messageUuid']);
} else { } else {
// <Message-ID> request // <Message-ID> request
[ , messageUuid ] = this.getMessageIdentifierParts(messageId); [ , messageUuid ] = this.getMessageIdentifierParts(messageId);
@ -330,15 +381,12 @@ class NNTPServer extends NNTPServerBase {
}); });
} }
_getRange(session, first, last, options) { _getRange(session, first, last /*options*/) {
return new Promise(resolve => { return new Promise(resolve => {
// //
// Build an array of message objects that can later // Build an array of message objects that can later
// be used with the various _build* methods. // be used with the various _build* methods.
// //
// Messages must belong to the range of *pseudo IDs*
// aka |index|.
//
// :TODO: Handle |options| // :TODO: Handle |options|
if(!this.isGroupSelected(session)) { if(!this.isGroupSelected(session)) {
return resolve(null); return resolve(null);
@ -353,7 +401,7 @@ class NNTPServer extends NNTPServerBase {
} }
return true; return true;
}).map(m => { }).map(m => {
return { uuid : m.messageUuid, index : m.index } return { uuid : m.messageUuid, index : m.index };
}); });
asyncMap(uuids, (msgInfo, nextMessageUuid) => { asyncMap(uuids, (msgInfo, nextMessageUuid) => {
@ -507,7 +555,7 @@ class NNTPServer extends NNTPServerBase {
return cb(Errors.DoesNotExist(`No area for areaTag "${areaTag}" / confTag "${confTag}"`)); return cb(Errors.DoesNotExist(`No area for areaTag "${areaTag}" / confTag "${confTag}"`));
} }
getMessageListForArea(null, areaTag, (err, messageList) => { this.getMappedMessageListForArea(areaTag, (err, messageList) => {
if(err) { if(err) {
return cb(err); return cb(err);
} }
@ -533,15 +581,6 @@ class NNTPServer extends NNTPServerBase {
}); });
} }
const firstMsg = messageList[0];
// node-nntp wants "index"
let index = firstMsg.messageId;
messageList.forEach(m => {
m.index = index;
++index;
});
group = { group = {
messageList, messageList,
confTag, confTag,
@ -550,8 +589,8 @@ class NNTPServer extends NNTPServerBase {
friendlyDesc : area.desc, friendlyDesc : area.desc,
nntp : { nntp : {
name : groupName, name : groupName,
min_index : firstMsg.messageId, min_index : messageList[0].index,
max_index : firstMsg.messageId + messageList.length - 1, max_index : messageList[messageList.length - 1].index,
total : messageList.length, total : messageList.length,
}, },
}; };
@ -562,6 +601,115 @@ class NNTPServer extends NNTPServerBase {
}); });
} }
getMappedMessageListForArea(areaTag, cb) {
//
// Get all messages in mapped database. Then, find any messages that are not
// yet mapped with ID's > the highest ID we have. Any new messages will have
// new mappings created.
//
// :TODO: introduce caching
asyncWaterfall(
[
(callback) => {
nntpDatabase.db.all(
`SELECT nntp_message_id, message_id, message_uuid
FROM nntp_area_message
WHERE message_area_tag = ?
ORDER BY nntp_message_id;`,
[ areaTag ],
(err, rows) => {
if(err) {
return callback(err);
}
let messageList;
const lastMessageId = rows.length > 0 ? rows[rows.length - 1].message_id : 0;
if(!lastMessageId) {
messageList = [];
} else {
messageList = rows.map(r => {
return {
areaTag,
index : r.nntp_message_id, // node-nntp wants this name
messageUuid : r.message_uuid,
};
});
}
return callback(null, messageList, lastMessageId);
}
);
},
(messageList, lastMessageId, callback) => {
// Find any new entries
const filter = {
areaTag,
newerThanMessageId : lastMessageId,
sort : 'messageId',
order : 'ascending',
resultType : 'messageList',
};
Message.findMessages(filter, (err, newMessageList) => {
if(err) {
return callback(err);
}
let index = messageList.length > 0 ?
messageList[messageList.length - 1].index + 1
: 1;
newMessageList = newMessageList.map(m => {
return Object.assign(m, { index : index++ } );
});
if(0 === newMessageList.length) {
return callback(null, messageList);
}
// populate mapping DB with any new entries
nntpDatabase.db.beginTransaction( (err, trans) => {
if(err) {
return callback(err);
}
forEachSeries(newMessageList, (newMessage, nextNewMessage) => {
trans.run(
`INSERT INTO nntp_area_message (nntp_message_id, message_id, message_area_tag, message_uuid)
VALUES (?, ?, ?, ?);`,
[ newMessage.index, newMessage.messageId, areaTag, newMessage.messageUuid ],
err => {
return nextNewMessage(err);
}
);
},
err => {
if(err) {
return trans.rollback( () => {
return callback(err);
});
}
trans.commit( () => {
messageList.push(...newMessageList.map(m => {
return {
areaTag,
index : m.nntpMessageId,
messageUuid : m.messageUuid,
};
}));
return callback(null, messageList);
});
});
});
});
}
],
(err, messageList) => {
return cb(err, messageList);
}
);
}
_buildHead(session, message) { _buildHead(session, message) {
return _.map(message.nntpHeaders, (v, k) => `${k}: ${v}`).join('\r\n'); return _.map(message.nntpHeaders, (v, k) => `${k}: ${v}`).join('\r\n');
} }
@ -617,6 +765,7 @@ class NNTPServer extends NNTPServerBase {
} }
getMessageIdentifier(message) { getMessageIdentifier(message) {
// note that we use the *real* message ID here, not the NNTP-specific index.
return this.makeMessageIdentifier(message.messageId, message.messageUuid); return this.makeMessageIdentifier(message.messageId, message.messageUuid);
} }
@ -727,9 +876,9 @@ exports.getModule = class NNTPServerModule extends ServerModule {
return true; return true;
} }
createServer() { createServer(cb) {
if(!this.isEnabled() || !this.isConfigured()) { if(!this.isEnabled() || !this.isConfigured()) {
return; return cb(null);
} }
const config = Config(); const config = Config();
@ -762,6 +911,11 @@ exports.getModule = class NNTPServerModule extends ServerModule {
'NTTPS' 'NTTPS'
); );
} }
nntpDatabase = new NNTPDatabase();
nntpDatabase.init(err => {
return cb(err);
});
} }
listen() { listen() {
@ -785,3 +939,53 @@ exports.getModule = class NNTPServerModule extends ServerModule {
return `${service}://0.0.0.0:${port}`; return `${service}://0.0.0.0:${port}`;
} }
}; };
function performMaintenanceTask(args, cb) {
//
// Delete any message mapping that no longer have
// an actual message associated with them.
//
if(!nntpDatabase) {
Log.trace('Cannot perform NNTP maintenance without NNTP database initialized');
return cb(null);
}
let attached = false;
asyncSeries(
[
(callback) => {
const messageDbPath = paths.join(Config().paths.db, 'message.sqlite3');
nntpDatabase.db.run(
`ATTACH DATABASE "${messageDbPath}" AS msgdb;`,
err => {
attached = !err;
return callback(err);
}
);
},
(callback) => {
nntpDatabase.db.run(
`DELETE FROM nntp_area_message
WHERE message_uuid NOT IN (
SELECT message_uuid
FROM msgdb.message
);`,
function result(err) { // no arrow func; need |this.changes|
if(err) {
Log.warn( { error : err.message }, 'Failed to delete from NNTP database');
} else {
Log.debug( { count : this.changes }, 'Deleted mapped message IDs from NNTP database');
}
return callback(err);
}
);
}
],
err => {
if(attached) {
nntpDatabase.db.run('DETACH DATABASE msgdb;');
}
return cb(err);
}
);
}

View File

@ -104,7 +104,7 @@ exports.getModule = class WebServerModule extends ServerModule {
return this.enableHttp || this.enableHttps; return this.enableHttp || this.enableHttps;
} }
createServer() { createServer(cb) {
if(this.enableHttp) { if(this.enableHttp) {
this.httpServer = http.createServer( (req, resp) => this.routeRequest(req, resp) ); this.httpServer = http.createServer( (req, resp) => this.routeRequest(req, resp) );
} }
@ -121,6 +121,8 @@ exports.getModule = class WebServerModule extends ServerModule {
this.httpsServer = https.createServer(options, (req, resp) => this.routeRequest(req, resp) ); this.httpsServer = https.createServer(options, (req, resp) => this.routeRequest(req, resp) );
} }
return cb(null);
} }
listen() { listen() {

View File

@ -288,10 +288,10 @@ exports.getModule = class SSHServerModule extends LoginServerModule {
super(); super();
} }
createServer() { createServer(cb) {
const config = Config(); const config = Config();
if(true != config.loginServers.ssh.enabled) { if(true != config.loginServers.ssh.enabled) {
return; return cb(null);
} }
const serverConf = { const serverConf = {
@ -318,6 +318,8 @@ exports.getModule = class SSHServerModule extends LoginServerModule {
Log.info(info, 'New SSH connection'); Log.info(info, 'New SSH connection');
this.handleNewClient(new SSHClient(conn), conn._sock, ModuleInfo); this.handleNewClient(new SSHClient(conn), conn._sock, ModuleInfo);
}); });
return cb(null);
} }
listen() { listen() {

View File

@ -852,7 +852,7 @@ exports.getModule = class TelnetServerModule extends LoginServerModule {
super(); super();
} }
createServer() { createServer(cb) {
this.server = net.createServer( sock => { this.server = net.createServer( sock => {
const client = new TelnetClient(sock, sock); const client = new TelnetClient(sock, sock);
@ -876,6 +876,8 @@ exports.getModule = class TelnetServerModule extends LoginServerModule {
this.server.on('error', err => { this.server.on('error', err => {
Log.info( { error : err.message }, 'Telnet server error'); Log.info( { error : err.message }, 'Telnet server error');
}); });
return cb(null);
} }
listen() { listen() {

View File

@ -123,7 +123,7 @@ exports.getModule = class WebSocketLoginServer extends LoginServerModule {
super(); super();
} }
createServer() { createServer(cb) {
// //
// We will actually create up to two servers: // We will actually create up to two servers:
// * insecure websocket (ws://) // * insecure websocket (ws://)
@ -131,7 +131,7 @@ exports.getModule = class WebSocketLoginServer extends LoginServerModule {
// //
const config = _.get(Config(), 'loginServers.webSocket'); const config = _.get(Config(), 'loginServers.webSocket');
if(!_.isObject(config)) { if(!_.isObject(config)) {
return; return cb(null);
} }
const wsPort = _.get(config, 'ws.port'); const wsPort = _.get(config, 'ws.port');
@ -161,6 +161,8 @@ exports.getModule = class WebSocketLoginServer extends LoginServerModule {
wsServer : new WebSocketServer( { server : httpServer } ), wsServer : new WebSocketServer( { server : httpServer } ),
}; };
} }
return cb(null);
} }
listen() { listen() {