diff --git a/core/file_base_area.js b/core/file_base_area.js
index 9cfb1cb9..0c065357 100644
--- a/core/file_base_area.js
+++ b/core/file_base_area.js
@@ -630,23 +630,224 @@ function scanFile(filePath, options, iterator, cb) {
         fileName : paths.basename(filePath),
     };

-    function callIter(next) {
-        if(iterator) {
-            return iterator(stepInfo, next);
-        } else {
-            return next(null);
-        }
-    }
+    const callIter = (next) => {
+        return iterator ? iterator(stepInfo, next) : next(null);
+    };

-    function readErrorCallIter(origError, next) {
+    const readErrorCallIter = (origError, next) => {
         stepInfo.step = 'read_error';
         stepInfo.error = origError.message;

         callIter( () => {
             return next(origError);
         });
-    }
+    };
+
+    let lastCalcHashPercent;
+
+    // don't re-calc hashes for any we already have in |options|
+    const hashesToCalc = HASH_NAMES.filter(hn => {
+        if('sha256' === hn && fileEntry.fileSha256) {
+            return false;
+        }
+
+        if(`file_${hn}` in fileEntry.meta) {
+            return false;
+        }
+
+        return true;
+    });
+
+    async.waterfall(
+        [
+            function startScan(callback) {
+                fs.stat(filePath, (err, stats) => {
+                    if(err) {
+                        return readErrorCallIter(err, callback);
+                    }
+
+                    stepInfo.step = 'start';
+                    stepInfo.byteSize = fileEntry.meta.byte_size = stats.size;
+
+                    return callIter(callback);
+                });
+            },
+            function processPhysicalFileGeneric(callback) {
+                stepInfo.bytesProcessed = 0;
+
+                const hashes = {};
+                hashesToCalc.forEach(hashName => {
+                    if('crc32' === hashName) {
+                        hashes.crc32 = new CRC32();
+                    } else {
+                        hashes[hashName] = crypto.createHash(hashName);
+                    }
+                });
+
+                const updateHashes = (data) => {
+                    for(let i = 0; i < hashesToCalc.length; ++i) {
+                        hashes[hashesToCalc[i]].update(data);
+                    }
+                };
+
+                //
+                // Note that we are not using fs.createReadStream() here:
+                // While convenient, it is quite a bit slower -- which adds
+                // up to many seconds in time for larger files.
+                //
+                const chunkSize = 1024 * 64;
+                const buffer = Buffer.alloc(chunkSize);
+
+                fs.open(filePath, 'r', (err, fd) => {
+                    if(err) {
+                        return readErrorCallIter(err, callback);
+                    }
+
+                    const nextChunk = () => {
+                        fs.read(fd, buffer, 0, chunkSize, null, (err, bytesRead) => {
+                            if(err) {
+                                fs.close(fd, () => { /* ignore close result */ });
+                                return readErrorCallIter(err, callback);
+                            }
+
+                            if(0 === bytesRead) {
+                                // done - finalize
+                                fileEntry.meta.byte_size = stepInfo.bytesProcessed;
+
+                                for(let i = 0; i < hashesToCalc.length; ++i) {
+                                    const hashName = hashesToCalc[i];
+                                    if('sha256' === hashName) {
+                                        stepInfo.sha256 = fileEntry.fileSha256 = hashes.sha256.digest('hex');
+                                    } else if('sha1' === hashName || 'md5' === hashName) {
+                                        stepInfo[hashName] = fileEntry.meta[`file_${hashName}`] = hashes[hashName].digest('hex');
+                                    } else if('crc32' === hashName) {
+                                        stepInfo.crc32 = fileEntry.meta.file_crc32 = hashes.crc32.finalize().toString(16);
+                                    }
+                                }
+
+                                stepInfo.step = 'hash_finish';
+                                fs.close(fd, () => { /* ignore close result */ });
+                                return callIter(callback);
+                            }
+
+                            stepInfo.bytesProcessed += bytesRead;
+                            stepInfo.calcHashPercent = Math.round(((stepInfo.bytesProcessed / stepInfo.byteSize) * 100));
+
+                            //
+                            // Only send 'hash_update' step update if we have a noticeable percentage change in progress
+                            //
+                            const data = bytesRead < chunkSize ? buffer.slice(0, bytesRead) : buffer;
+                            if(!iterator || stepInfo.calcHashPercent === lastCalcHashPercent) {
+                                updateHashes(data);
+                                return nextChunk();
+                            } else {
+                                lastCalcHashPercent = stepInfo.calcHashPercent;
+                                stepInfo.step = 'hash_update';
+
+                                callIter(err => {
+                                    if(err) {
+                                        return callback(err);
+                                    }
+
+                                    updateHashes(data);
+                                    return nextChunk();
+                                });
+                            }
+                        });
+                    };
+
+                    nextChunk();
+                });
+            },
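+            //
+            // Archive detection below is best-effort: if archive-specific
+            // entry population fails, we fall back to the generic
+            // (non-archive) path, and a failure there is logged but
+            // intentionally non-fatal to the scan.
+            //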
+            function processPhysicalFileByType(callback) {
+                const archiveUtil = ArchiveUtil.getInstance();
+
+                archiveUtil.detectType(filePath, (err, archiveType) => {
+                    if(archiveType) {
+                        // save this off
+                        fileEntry.meta.archive_type = archiveType;
+
+                        populateFileEntryWithArchive(fileEntry, filePath, stepInfo, callIter, err => {
+                            if(err) {
+                                populateFileEntryNonArchive(fileEntry, filePath, stepInfo, callIter, err => {
+                                    if(err) {
+                                        logDebug( { error : err.message }, 'Non-archive file entry population failed');
+                                    }
+                                    return callback(null); // ignore err
+                                });
+                            } else {
+                                return callback(null);
+                            }
+                        });
+                    } else {
+                        populateFileEntryNonArchive(fileEntry, filePath, stepInfo, callIter, err => {
+                            if(err) {
+                                logDebug( { error : err.message }, 'Non-archive file entry population failed');
+                            }
+                            return callback(null); // ignore err
+                        });
+                    }
+                });
+            },
+            function fetchExistingEntry(callback) {
+                getExistingFileEntriesBySha256(fileEntry.fileSha256, (err, dupeEntries) => {
+                    return callback(err, dupeEntries);
+                });
+            },
+            function finished(dupeEntries, callback) {
+                stepInfo.step = 'finished';
+                callIter( () => {
+                    return callback(null, dupeEntries);
+                });
+            }
+        ],
+        (err, dupeEntries) => {
+            if(err) {
+                return cb(err);
+            }
+
+            return cb(null, fileEntry, dupeEntries);
+        }
+    );
+}
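+
+// scanFile2 retains the prior fs.createReadStream() based implementation
+// (modified in the hunks below); scanFile above replaces it with the
+// faster fs.open()/fs.read() loop.
+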
+function scanFile2(filePath, options, iterator, cb) {
+
+    if(3 === arguments.length && _.isFunction(iterator)) {
+        cb = iterator;
+        iterator = null;
+    } else if(2 === arguments.length && _.isFunction(options)) {
+        cb = options;
+        iterator = null;
+        options = {};
+    }
+
+    const fileEntry = new FileEntry({
+        areaTag : options.areaTag,
+        meta : options.meta,
+        hashTags : options.hashTags, // Set() or Array
+        fileName : paths.basename(filePath),
+        storageTag : options.storageTag,
+        fileSha256 : options.sha256, // caller may know this already
+    });
+
+    const stepInfo = {
+        filePath : filePath,
+        fileName : paths.basename(filePath),
+    };
+
+    const callIter = (next) => {
+        return iterator ? iterator(stepInfo, next) : next(null);
+    };
+
+    const readErrorCallIter = (origError, next) => {
+        stepInfo.step = 'read_error';
+        stepInfo.error = origError.message;
+
+        callIter( () => {
+            return next(origError);
+        });
+    };

     let lastCalcHashPercent;
@@ -691,17 +892,15 @@ function scanFile(filePath, options, iterator, cb) {

         const stream = fs.createReadStream(filePath);

-        function updateHashes(data) {
-            async.each(hashesToCalc, (hashName, nextHash) => {
-                hashes[hashName].update(data);
-                return nextHash(null);
-            }, () => {
-                return stream.resume();
-            });
-        }
+        const updateHashes = (data) => {
+            for(let i = 0; i < hashesToCalc.length; ++i) {
+                hashes[hashesToCalc[i]].update(data);
+            }
+            return stream.resume();
+        };

         stream.on('data', data => {
-            stream.pause(); // until iterator compeltes
+            stream.pause(); // until iterator completes

             stepInfo.bytesProcessed += data.length;
             stepInfo.calcHashPercent = Math.round(((stepInfo.bytesProcessed / stepInfo.byteSize) * 100));
@@ -709,7 +908,7 @@ function scanFile(filePath, options, iterator, cb) {
             //
             // Only send 'hash_update' step update if we have a noticeable percentage change in progress
             //
-            if(stepInfo.calcHashPercent === lastCalcHashPercent) {
+            if(!iterator || stepInfo.calcHashPercent === lastCalcHashPercent) {
                 updateHashes(data);
             } else {
                 lastCalcHashPercent = stepInfo.calcHashPercent;
@@ -729,7 +928,8 @@ function scanFile(filePath, options, iterator, cb) {
         stream.on('end', () => {
             fileEntry.meta.byte_size = stepInfo.bytesProcessed;

-            async.each(hashesToCalc, (hashName, nextHash) => {
+            for(let i = 0; i < hashesToCalc.length; ++i) {
+                const hashName = hashesToCalc[i];
                 if('sha256' === hashName) {
                     stepInfo.sha256 = fileEntry.fileSha256 = hashes.sha256.digest('hex');
                 } else if('sha1' === hashName || 'md5' === hashName) {
@@ -737,12 +937,10 @@ function scanFile(filePath, options, iterator, cb) {
                 } else if('crc32' === hashName) {
                     stepInfo.crc32 = fileEntry.meta.file_crc32 = hashes.crc32.finalize().toString(16);
                 }
+            }

-                return nextHash(null);
-            }, () => {
-                stepInfo.step = 'hash_finish';
-                return callIter(callback);
-            });
+            stepInfo.step = 'hash_finish';
+            return callIter(callback);
         });

         stream.on('error', err => {