diff --git a/core/config.js b/core/config.js index 90e641d6..9c9c4cd4 100644 --- a/core/config.js +++ b/core/config.js @@ -170,7 +170,7 @@ function getDefaultConfig() { general : { boardName : 'Another Fine ENiGMA½ BBS', - // :TODO: closedSystem and loginAttemps prob belong under users{}? + // :TODO: closedSystem prob belongs under users{}? closedSystem : false, // is the system closed to new users? menuFile : 'menu.hjson', // 'oputil.js config new' will set this appropriately in config.hjson; may be full path @@ -947,13 +947,18 @@ function getDefaultConfig() { // action: // - @method:path/to/module.js:theMethodName - // (path is relative to engima base dir) + // (path is relative to ENiGMA base dir) // // - @execute:/path/to/something/executable.sh // action : '@method:core/message_area.js:trimMessageAreasScheduledEvent', }, + nntpMaintenance : { + schedule : 'every 12 hours', // should generally be < trimMessageAreas interval + action : '@method:core/servers/content/nntp.js:performMaintenanceTask', + }, + updateFileAreaStats : { schedule : 'every 1 hours', action : '@method:core/file_base_area.js:updateAreaStatsScheduledEvent', diff --git a/core/listening_server.js b/core/listening_server.js index f28bea7f..8743d6ce 100644 --- a/core/listening_server.js +++ b/core/listening_server.js @@ -30,30 +30,23 @@ function startListening(cb) { const moduleUtil = require('./module_util.js'); // late load so we get Config async.each( [ 'login', 'content' ], (category, next) => { - moduleUtil.loadModulesForCategory(`${category}Servers`, (err, module) => { - if(err) { - if(ErrorReasons.Disabled === err.reasonCode) { - logger.log.debug(err.message); - } else { - logger.log.info( { err : err }, 'Failed loading module'); - } - return; - } - + moduleUtil.loadModulesForCategory(`${category}Servers`, (module, nextModule) => { const moduleInst = new module.getModule(); try { - moduleInst.createServer(); - if(!moduleInst.listen()) { - throw new Error('Failed listening'); - } - - listeningServers[module.moduleInfo.packageName] = { - instance : moduleInst, - info : module.moduleInfo, - }; + moduleInst.createServer(err => { + if(!moduleInst.listen()) { + throw new Error('Failed listening'); + } + listeningServers[module.moduleInfo.packageName] = { + instance : moduleInst, + info : module.moduleInfo, + }; + return nextModule(err); + }); } catch(e) { logger.log.error(e, 'Exception caught creating server!'); + return nextModule(e); } }, err => { return next(err); diff --git a/core/module_util.js b/core/module_util.js index 0e8e5976..f61929d2 100644 --- a/core/module_util.js +++ b/core/module_util.js @@ -93,8 +93,15 @@ function loadModulesForCategory(category, iterator, complete) { async.each(jsModules, (file, next) => { loadModule(paths.basename(file, '.js'), category, (err, mod) => { - iterator(err, mod); - return next(); + if(err) { + if(ErrorReasons.Disabled === err.reasonCode) { + Log.debug(err.message); + } else { + Log.info( { err : err }, 'Failed loading module'); + } + return next(null); // continue no matter what + } + return iterator(mod, next); }); }, err => { if(complete) { diff --git a/core/msg_network.js b/core/msg_network.js index b26d5f1b..e0018ece 100644 --- a/core/msg_network.js +++ b/core/msg_network.js @@ -2,10 +2,10 @@ 'use strict'; // ENiGMA½ -let loadModulesForCategory = require('./module_util.js').loadModulesForCategory; +const loadModulesForCategory = require('./module_util.js').loadModulesForCategory; // standard/deps -let async = require('async'); +const async = require('async'); exports.startup = startup; exports.shutdown = shutdown; @@ -17,16 +17,15 @@ function startup(cb) { async.series( [ function loadModules(callback) { - loadModulesForCategory('scannerTossers', (err, module) => { - if(!err) { - const modInst = new module.getModule(); + loadModulesForCategory('scannerTossers', (module, nextModule) => { + const modInst = new module.getModule(); - modInst.startup(err => { - if(!err) { - msgNetworkModules.push(modInst); - } - }); - } + modInst.startup(err => { + if(!err) { + msgNetworkModules.push(modInst); + } + }); + return nextModule(null); }, err => { callback(err); }); diff --git a/core/server_module.js b/core/server_module.js index c9ba1cab..9ba522bf 100644 --- a/core/server_module.js +++ b/core/server_module.js @@ -1,15 +1,18 @@ /* jslint node: true */ 'use strict'; -var PluginModule = require('./plugin_module.js').PluginModule; +const PluginModule = require('./plugin_module.js').PluginModule; -exports.ServerModule = ServerModule; +exports.ServerModule = class ServerModule extends PluginModule { + constructor(options) { + super(options); + } -function ServerModule() { - PluginModule.call(this); -} + createServer(cb) { + return cb(null); + } -require('util').inherits(ServerModule, PluginModule); - -ServerModule.prototype.createServer = function() { + listen(cb) { + return cb(null); + } }; diff --git a/core/servers/content/gopher.js b/core/servers/content/gopher.js index c47efc5c..7553793e 100644 --- a/core/servers/content/gopher.js +++ b/core/servers/content/gopher.js @@ -71,9 +71,9 @@ exports.getModule = class GopherModule extends ServerModule { this.log = Log.child( { server : 'Gopher' } ); } - createServer() { + createServer(cb) { if(!this.enabled) { - return; + return cb(null); } const config = Config(); @@ -96,6 +96,8 @@ exports.getModule = class GopherModule extends ServerModule { } }); }); + + return cb(null); } listen() { diff --git a/core/servers/content/nntp.js b/core/servers/content/nntp.js index 7135a585..7f20cba7 100644 --- a/core/servers/content/nntp.js +++ b/core/servers/content/nntp.js @@ -5,10 +5,13 @@ const Log = require('../../logger.js').log; const { ServerModule } = require('../../server_module.js'); const Config = require('../../config.js').get; +const { + getTransactionDatabase, + getModDatabasePath +} = require('../../database.js'); const { getMessageAreaByTag, getMessageConferenceByTag, - getMessageListForArea, } = require('../../message_area.js'); const User = require('../../user.js'); const Errors = require('../../enig_error.js').Errors; @@ -28,10 +31,14 @@ const { const NNTPServerBase = require('nntp-server'); const _ = require('lodash'); const fs = require('fs-extra'); +const forEachSeries = require('async/forEachSeries'); const asyncReduce = require('async/reduce'); const asyncMap = require('async/map'); const asyncSeries = require('async/series'); +const asyncWaterfall = require('async/waterfall'); const LRU = require('lru-cache'); +const sqlite3 = require('sqlite3'); +const paths = require('path'); // // Network News Transfer Protocol (NNTP) @@ -50,13 +57,64 @@ exports.moduleInfo = { packageName : 'codes.l33t.enigma.nntp.server', }; +exports.performMaintenanceTask = performMaintenanceTask; + /* General TODO - ACS checks need worked out. Currently ACS relies on |client|. We need a client spec that can be created even without a login server. Some checks and simply - return false/fail. + return false/fail. */ +// simple DB maps NNTP Message-ID's which are +// sequential per group -> ENiG messages +// A single instance is shared across NNTP and/or NNTPS +class NNTPDatabase +{ + constructor() { + } + + init(cb) { + asyncSeries( + [ + (callback) => { + this.db = getTransactionDatabase(new sqlite3.Database( + getModDatabasePath(exports.moduleInfo), + err => { + return callback(err); + } + )); + }, + (callback) => { + this.db.serialize( () => { + this.db.run( + `CREATE TABLE IF NOT EXISTS nntp_area_message ( + nntp_message_id INTEGER NOT NULL, + message_id INTEGER NOT NULL, + message_area_tag VARCHAR NOT NULL, + message_uuid VARCHAR NOT NULL, + + UNIQUE(nntp_message_id, message_area_tag) + );` + ); + + this.db.run( + `CREATE INDEX IF NOT EXISTS nntp_area_message_by_uuid_index + ON nntp_area_message (message_uuid);` + ); + + return callback(null); + }); + } + ], + err => { + return cb(err); + } + ); + } +} + +let nntpDatabase; class NNTPServer extends NNTPServerBase { constructor(options, serverName) { @@ -97,10 +155,6 @@ class NNTPServer extends NNTPServerBase { }); } - getMessageListIndexByMessageID(id, session) { - return id - _.get(session.groupInfo.messageList, [ 0, 'messageId' ]); - } - isGroupSelected(session) { return Array.isArray(_.get(session, 'groupInfo.messageList')); } @@ -145,7 +199,7 @@ class NNTPServer extends NNTPServerBase { case [ Message.AddressFlavor.Email ] : jamStyleFrom = `${fromName} <${remoteFrom}>`; break; - } + } } if(!jamStyleFrom) { @@ -256,14 +310,11 @@ class NNTPServer extends NNTPServerBase { return null; } - // - // Adjust to offset in message list & get UUID - // This works since we create "pseudo IDs" to return to NNTP - // by using firstRealID + index. A find on |index| member would - // also work, but would be O(n). - // - const mlIndex = this.getMessageListIndexByMessageID(messageId, session); - messageUuid = _.get(session.groupInfo.messageList, [ mlIndex, 'messageUuid']); + const msg = session.groupInfo.messageList.find(m => { + return m.index === messageId; + }); + + messageUuid = msg && msg.messageUuid; } else { // request [ , messageUuid ] = this.getMessageIdentifierParts(messageId); @@ -330,15 +381,12 @@ class NNTPServer extends NNTPServerBase { }); } - _getRange(session, first, last, options) { + _getRange(session, first, last /*options*/) { return new Promise(resolve => { // // Build an array of message objects that can later // be used with the various _build* methods. // - // Messages must belong to the range of *pseudo IDs* - // aka |index|. - // // :TODO: Handle |options| if(!this.isGroupSelected(session)) { return resolve(null); @@ -353,7 +401,7 @@ class NNTPServer extends NNTPServerBase { } return true; }).map(m => { - return { uuid : m.messageUuid, index : m.index } + return { uuid : m.messageUuid, index : m.index }; }); asyncMap(uuids, (msgInfo, nextMessageUuid) => { @@ -507,7 +555,7 @@ class NNTPServer extends NNTPServerBase { return cb(Errors.DoesNotExist(`No area for areaTag "${areaTag}" / confTag "${confTag}"`)); } - getMessageListForArea(null, areaTag, (err, messageList) => { + this.getMappedMessageListForArea(areaTag, (err, messageList) => { if(err) { return cb(err); } @@ -533,15 +581,6 @@ class NNTPServer extends NNTPServerBase { }); } - const firstMsg = messageList[0]; - - // node-nntp wants "index" - let index = firstMsg.messageId; - messageList.forEach(m => { - m.index = index; - ++index; - }); - group = { messageList, confTag, @@ -550,8 +589,8 @@ class NNTPServer extends NNTPServerBase { friendlyDesc : area.desc, nntp : { name : groupName, - min_index : firstMsg.messageId, - max_index : firstMsg.messageId + messageList.length - 1, + min_index : messageList[0].index, + max_index : messageList[messageList.length - 1].index, total : messageList.length, }, }; @@ -562,6 +601,115 @@ class NNTPServer extends NNTPServerBase { }); } + getMappedMessageListForArea(areaTag, cb) { + // + // Get all messages in mapped database. Then, find any messages that are not + // yet mapped with ID's > the highest ID we have. Any new messages will have + // new mappings created. + // + // :TODO: introduce caching + asyncWaterfall( + [ + (callback) => { + nntpDatabase.db.all( + `SELECT nntp_message_id, message_id, message_uuid + FROM nntp_area_message + WHERE message_area_tag = ? + ORDER BY nntp_message_id;`, + [ areaTag ], + (err, rows) => { + if(err) { + return callback(err); + } + + let messageList; + const lastMessageId = rows.length > 0 ? rows[rows.length - 1].message_id : 0; + if(!lastMessageId) { + messageList = []; + } else { + messageList = rows.map(r => { + return { + areaTag, + index : r.nntp_message_id, // node-nntp wants this name + messageUuid : r.message_uuid, + }; + }); + } + + return callback(null, messageList, lastMessageId); + } + ); + }, + (messageList, lastMessageId, callback) => { + // Find any new entries + const filter = { + areaTag, + newerThanMessageId : lastMessageId, + sort : 'messageId', + order : 'ascending', + resultType : 'messageList', + }; + Message.findMessages(filter, (err, newMessageList) => { + if(err) { + return callback(err); + } + + let index = messageList.length > 0 ? + messageList[messageList.length - 1].index + 1 + : 1; + newMessageList = newMessageList.map(m => { + return Object.assign(m, { index : index++ } ); + }); + + if(0 === newMessageList.length) { + return callback(null, messageList); + } + + // populate mapping DB with any new entries + nntpDatabase.db.beginTransaction( (err, trans) => { + if(err) { + return callback(err); + } + + forEachSeries(newMessageList, (newMessage, nextNewMessage) => { + trans.run( + `INSERT INTO nntp_area_message (nntp_message_id, message_id, message_area_tag, message_uuid) + VALUES (?, ?, ?, ?);`, + [ newMessage.index, newMessage.messageId, areaTag, newMessage.messageUuid ], + err => { + return nextNewMessage(err); + } + ); + }, + err => { + if(err) { + return trans.rollback( () => { + return callback(err); + }); + } + + trans.commit( () => { + messageList.push(...newMessageList.map(m => { + return { + areaTag, + index : m.nntpMessageId, + messageUuid : m.messageUuid, + }; + })); + + return callback(null, messageList); + }); + }); + }); + }); + } + ], + (err, messageList) => { + return cb(err, messageList); + } + ); + } + _buildHead(session, message) { return _.map(message.nntpHeaders, (v, k) => `${k}: ${v}`).join('\r\n'); } @@ -617,6 +765,7 @@ class NNTPServer extends NNTPServerBase { } getMessageIdentifier(message) { + // note that we use the *real* message ID here, not the NNTP-specific index. return this.makeMessageIdentifier(message.messageId, message.messageUuid); } @@ -727,9 +876,9 @@ exports.getModule = class NNTPServerModule extends ServerModule { return true; } - createServer() { + createServer(cb) { if(!this.isEnabled() || !this.isConfigured()) { - return; + return cb(null); } const config = Config(); @@ -762,6 +911,11 @@ exports.getModule = class NNTPServerModule extends ServerModule { 'NTTPS' ); } + + nntpDatabase = new NNTPDatabase(); + nntpDatabase.init(err => { + return cb(err); + }); } listen() { @@ -785,3 +939,53 @@ exports.getModule = class NNTPServerModule extends ServerModule { return `${service}://0.0.0.0:${port}`; } }; + +function performMaintenanceTask(args, cb) { + // + // Delete any message mapping that no longer have + // an actual message associated with them. + // + if(!nntpDatabase) { + Log.trace('Cannot perform NNTP maintenance without NNTP database initialized'); + return cb(null); + } + + let attached = false; + asyncSeries( + [ + (callback) => { + const messageDbPath = paths.join(Config().paths.db, 'message.sqlite3'); + nntpDatabase.db.run( + `ATTACH DATABASE "${messageDbPath}" AS msgdb;`, + err => { + attached = !err; + return callback(err); + } + ); + }, + (callback) => { + nntpDatabase.db.run( + `DELETE FROM nntp_area_message + WHERE message_uuid NOT IN ( + SELECT message_uuid + FROM msgdb.message + );`, + function result(err) { // no arrow func; need |this.changes| + if(err) { + Log.warn( { error : err.message }, 'Failed to delete from NNTP database'); + } else { + Log.debug( { count : this.changes }, 'Deleted mapped message IDs from NNTP database'); + } + return callback(err); + } + ); + } + ], + err => { + if(attached) { + nntpDatabase.db.run('DETACH DATABASE msgdb;'); + } + return cb(err); + } + ); +} \ No newline at end of file diff --git a/core/servers/content/web.js b/core/servers/content/web.js index 7088b8f9..f0e0d903 100644 --- a/core/servers/content/web.js +++ b/core/servers/content/web.js @@ -104,7 +104,7 @@ exports.getModule = class WebServerModule extends ServerModule { return this.enableHttp || this.enableHttps; } - createServer() { + createServer(cb) { if(this.enableHttp) { this.httpServer = http.createServer( (req, resp) => this.routeRequest(req, resp) ); } @@ -121,6 +121,8 @@ exports.getModule = class WebServerModule extends ServerModule { this.httpsServer = https.createServer(options, (req, resp) => this.routeRequest(req, resp) ); } + + return cb(null); } listen() { diff --git a/core/servers/login/ssh.js b/core/servers/login/ssh.js index 27b5aed9..263a3929 100644 --- a/core/servers/login/ssh.js +++ b/core/servers/login/ssh.js @@ -288,10 +288,10 @@ exports.getModule = class SSHServerModule extends LoginServerModule { super(); } - createServer() { + createServer(cb) { const config = Config(); if(true != config.loginServers.ssh.enabled) { - return; + return cb(null); } const serverConf = { @@ -318,6 +318,8 @@ exports.getModule = class SSHServerModule extends LoginServerModule { Log.info(info, 'New SSH connection'); this.handleNewClient(new SSHClient(conn), conn._sock, ModuleInfo); }); + + return cb(null); } listen() { diff --git a/core/servers/login/telnet.js b/core/servers/login/telnet.js index 1f8afa6a..ae1ecbe9 100644 --- a/core/servers/login/telnet.js +++ b/core/servers/login/telnet.js @@ -852,7 +852,7 @@ exports.getModule = class TelnetServerModule extends LoginServerModule { super(); } - createServer() { + createServer(cb) { this.server = net.createServer( sock => { const client = new TelnetClient(sock, sock); @@ -876,6 +876,8 @@ exports.getModule = class TelnetServerModule extends LoginServerModule { this.server.on('error', err => { Log.info( { error : err.message }, 'Telnet server error'); }); + + return cb(null); } listen() { diff --git a/core/servers/login/websocket.js b/core/servers/login/websocket.js index 43245e74..b27aaf6d 100644 --- a/core/servers/login/websocket.js +++ b/core/servers/login/websocket.js @@ -123,7 +123,7 @@ exports.getModule = class WebSocketLoginServer extends LoginServerModule { super(); } - createServer() { + createServer(cb) { // // We will actually create up to two servers: // * insecure websocket (ws://) @@ -131,7 +131,7 @@ exports.getModule = class WebSocketLoginServer extends LoginServerModule { // const config = _.get(Config(), 'loginServers.webSocket'); if(!_.isObject(config)) { - return; + return cb(null); } const wsPort = _.get(config, 'ws.port'); @@ -161,6 +161,8 @@ exports.getModule = class WebSocketLoginServer extends LoginServerModule { wsServer : new WebSocketServer( { server : httpServer } ), }; } + + return cb(null); } listen() {