Much faster hash calculation / processing & therefor faster scanFile()
* Manaul read of buffers vs stream (fs.createReadStream()) * Small optimization by skipping work if no progress iterator * Don't use async loop for updating hashes - vanilla for loop
This commit is contained in:
parent
f3cd36ad07
commit
681e45cb6d
|
@ -630,23 +630,224 @@ function scanFile(filePath, options, iterator, cb) {
|
||||||
fileName : paths.basename(filePath),
|
fileName : paths.basename(filePath),
|
||||||
};
|
};
|
||||||
|
|
||||||
function callIter(next) {
|
const callIter = (next) => {
|
||||||
if(iterator) {
|
return iterator ? iterator(stepInfo, next) : next(null);
|
||||||
return iterator(stepInfo, next);
|
};
|
||||||
} else {
|
|
||||||
return next(null);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
function readErrorCallIter(origError, next) {
|
const readErrorCallIter = (origError, next) => {
|
||||||
stepInfo.step = 'read_error';
|
stepInfo.step = 'read_error';
|
||||||
stepInfo.error = origError.message;
|
stepInfo.error = origError.message;
|
||||||
|
|
||||||
callIter( () => {
|
callIter( () => {
|
||||||
return next(origError);
|
return next(origError);
|
||||||
});
|
});
|
||||||
|
};
|
||||||
|
|
||||||
|
let lastCalcHashPercent;
|
||||||
|
|
||||||
|
// don't re-calc hashes for any we already have in |options|
|
||||||
|
const hashesToCalc = HASH_NAMES.filter(hn => {
|
||||||
|
if('sha256' === hn && fileEntry.fileSha256) {
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if(`file_${hn}` in fileEntry.meta) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
|
});
|
||||||
|
|
||||||
|
async.waterfall(
|
||||||
|
[
|
||||||
|
function startScan(callback) {
|
||||||
|
fs.stat(filePath, (err, stats) => {
|
||||||
|
if(err) {
|
||||||
|
return readErrorCallIter(err, callback);
|
||||||
|
}
|
||||||
|
|
||||||
|
stepInfo.step = 'start';
|
||||||
|
stepInfo.byteSize = fileEntry.meta.byte_size = stats.size;
|
||||||
|
|
||||||
|
return callIter(callback);
|
||||||
|
});
|
||||||
|
},
|
||||||
|
function processPhysicalFileGeneric(callback) {
|
||||||
|
stepInfo.bytesProcessed = 0;
|
||||||
|
|
||||||
|
const hashes = {};
|
||||||
|
hashesToCalc.forEach(hashName => {
|
||||||
|
if('crc32' === hashName) {
|
||||||
|
hashes.crc32 = new CRC32;
|
||||||
|
} else {
|
||||||
|
hashes[hashName] = crypto.createHash(hashName);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
const updateHashes = (data) => {
|
||||||
|
for(let i = 0; i < hashesToCalc.length; ++i) {
|
||||||
|
hashes[hashesToCalc[i]].update(data);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
//
|
||||||
|
// Note that we are not using fs.createReadStream() here:
|
||||||
|
// While convenient, it is quite a bit slower -- which adds
|
||||||
|
// up to many seconds in time for larger files.
|
||||||
|
//
|
||||||
|
const chunkSize = 1024 * 64;
|
||||||
|
const buffer = new Buffer(chunkSize);
|
||||||
|
|
||||||
|
fs.open(filePath, 'r', (err, fd) => {
|
||||||
|
if(err) {
|
||||||
|
return readErrorCallIter(err, callback);
|
||||||
|
}
|
||||||
|
|
||||||
|
const nextChunk = () => {
|
||||||
|
fs.read(fd, buffer, 0, chunkSize, null, (err, bytesRead) => {
|
||||||
|
if(err) {
|
||||||
|
fs.close(fd);
|
||||||
|
return readErrorCallIter(err, callback);
|
||||||
|
}
|
||||||
|
|
||||||
|
if(0 === bytesRead) {
|
||||||
|
// done - finalize
|
||||||
|
fileEntry.meta.byte_size = stepInfo.bytesProcessed;
|
||||||
|
|
||||||
|
for(let i = 0; i < hashesToCalc.length; ++i) {
|
||||||
|
const hashName = hashesToCalc[i];
|
||||||
|
if('sha256' === hashName) {
|
||||||
|
stepInfo.sha256 = fileEntry.fileSha256 = hashes.sha256.digest('hex');
|
||||||
|
} else if('sha1' === hashName || 'md5' === hashName) {
|
||||||
|
stepInfo[hashName] = fileEntry.meta[`file_${hashName}`] = hashes[hashName].digest('hex');
|
||||||
|
} else if('crc32' === hashName) {
|
||||||
|
stepInfo.crc32 = fileEntry.meta.file_crc32 = hashes.crc32.finalize().toString(16);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
stepInfo.step = 'hash_finish';
|
||||||
|
fs.close(fd);
|
||||||
|
return callIter(callback);
|
||||||
|
}
|
||||||
|
|
||||||
|
stepInfo.bytesProcessed += bytesRead;
|
||||||
|
stepInfo.calcHashPercent = Math.round(((stepInfo.bytesProcessed / stepInfo.byteSize) * 100));
|
||||||
|
|
||||||
|
//
|
||||||
|
// Only send 'hash_update' step update if we have a noticable percentage change in progress
|
||||||
|
//
|
||||||
|
const data = bytesRead < chunkSize ? buffer.slice(0, bytesRead) : buffer;
|
||||||
|
if(!iterator || stepInfo.calcHashPercent === lastCalcHashPercent) {
|
||||||
|
updateHashes(data);
|
||||||
|
return nextChunk();
|
||||||
|
} else {
|
||||||
|
lastCalcHashPercent = stepInfo.calcHashPercent;
|
||||||
|
stepInfo.step = 'hash_update';
|
||||||
|
|
||||||
|
callIter(err => {
|
||||||
|
if(err) {
|
||||||
|
return callback(err);
|
||||||
|
}
|
||||||
|
|
||||||
|
updateHashes(data);
|
||||||
|
return nextChunk();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
};
|
||||||
|
|
||||||
|
nextChunk();
|
||||||
|
});
|
||||||
|
},
|
||||||
|
function processPhysicalFileByType(callback) {
|
||||||
|
const archiveUtil = ArchiveUtil.getInstance();
|
||||||
|
|
||||||
|
archiveUtil.detectType(filePath, (err, archiveType) => {
|
||||||
|
if(archiveType) {
|
||||||
|
// save this off
|
||||||
|
fileEntry.meta.archive_type = archiveType;
|
||||||
|
|
||||||
|
populateFileEntryWithArchive(fileEntry, filePath, stepInfo, callIter, err => {
|
||||||
|
if(err) {
|
||||||
|
populateFileEntryNonArchive(fileEntry, filePath, stepInfo, callIter, err => {
|
||||||
|
if(err) {
|
||||||
|
logDebug( { error : err.message }, 'Non-archive file entry population failed');
|
||||||
|
}
|
||||||
|
return callback(null); // ignore err
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
return callback(null);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
populateFileEntryNonArchive(fileEntry, filePath, stepInfo, callIter, err => {
|
||||||
|
if(err) {
|
||||||
|
logDebug( { error : err.message }, 'Non-archive file entry population failed');
|
||||||
|
}
|
||||||
|
return callback(null); // ignore err
|
||||||
|
});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
},
|
||||||
|
function fetchExistingEntry(callback) {
|
||||||
|
getExistingFileEntriesBySha256(fileEntry.fileSha256, (err, dupeEntries) => {
|
||||||
|
return callback(err, dupeEntries);
|
||||||
|
});
|
||||||
|
},
|
||||||
|
function finished(dupeEntries, callback) {
|
||||||
|
stepInfo.step = 'finished';
|
||||||
|
callIter( () => {
|
||||||
|
return callback(null, dupeEntries);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
],
|
||||||
|
(err, dupeEntries) => {
|
||||||
|
if(err) {
|
||||||
|
return cb(err);
|
||||||
|
}
|
||||||
|
|
||||||
|
return cb(null, fileEntry, dupeEntries);
|
||||||
|
}
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
function scanFile2(filePath, options, iterator, cb) {
|
||||||
|
|
||||||
|
if(3 === arguments.length && _.isFunction(iterator)) {
|
||||||
|
cb = iterator;
|
||||||
|
iterator = null;
|
||||||
|
} else if(2 === arguments.length && _.isFunction(options)) {
|
||||||
|
cb = options;
|
||||||
|
iterator = null;
|
||||||
|
options = {};
|
||||||
|
}
|
||||||
|
|
||||||
|
const fileEntry = new FileEntry({
|
||||||
|
areaTag : options.areaTag,
|
||||||
|
meta : options.meta,
|
||||||
|
hashTags : options.hashTags, // Set() or Array
|
||||||
|
fileName : paths.basename(filePath),
|
||||||
|
storageTag : options.storageTag,
|
||||||
|
fileSha256 : options.sha256, // caller may know this already
|
||||||
|
});
|
||||||
|
|
||||||
|
const stepInfo = {
|
||||||
|
filePath : filePath,
|
||||||
|
fileName : paths.basename(filePath),
|
||||||
|
};
|
||||||
|
|
||||||
|
const callIter = (next) => {
|
||||||
|
return iterator ? iterator(stepInfo, next) : next(null);
|
||||||
|
};
|
||||||
|
|
||||||
|
const readErrorCallIter = (origError, next) => {
|
||||||
|
stepInfo.step = 'read_error';
|
||||||
|
stepInfo.error = origError.message;
|
||||||
|
|
||||||
|
callIter( () => {
|
||||||
|
return next(origError);
|
||||||
|
});
|
||||||
|
};
|
||||||
|
|
||||||
let lastCalcHashPercent;
|
let lastCalcHashPercent;
|
||||||
|
|
||||||
|
@ -691,17 +892,15 @@ function scanFile(filePath, options, iterator, cb) {
|
||||||
|
|
||||||
const stream = fs.createReadStream(filePath);
|
const stream = fs.createReadStream(filePath);
|
||||||
|
|
||||||
function updateHashes(data) {
|
const updateHashes = (data) => {
|
||||||
async.each(hashesToCalc, (hashName, nextHash) => {
|
for(let i = 0; i < hashesToCalc.length; ++i) {
|
||||||
hashes[hashName].update(data);
|
hashes[hashesToCalc[i]].update(data);
|
||||||
return nextHash(null);
|
|
||||||
}, () => {
|
|
||||||
return stream.resume();
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
return stream.resume();
|
||||||
|
};
|
||||||
|
|
||||||
stream.on('data', data => {
|
stream.on('data', data => {
|
||||||
stream.pause(); // until iterator compeltes
|
stream.pause(); // until iterator completes
|
||||||
|
|
||||||
stepInfo.bytesProcessed += data.length;
|
stepInfo.bytesProcessed += data.length;
|
||||||
stepInfo.calcHashPercent = Math.round(((stepInfo.bytesProcessed / stepInfo.byteSize) * 100));
|
stepInfo.calcHashPercent = Math.round(((stepInfo.bytesProcessed / stepInfo.byteSize) * 100));
|
||||||
|
@ -709,7 +908,7 @@ function scanFile(filePath, options, iterator, cb) {
|
||||||
//
|
//
|
||||||
// Only send 'hash_update' step update if we have a noticable percentage change in progress
|
// Only send 'hash_update' step update if we have a noticable percentage change in progress
|
||||||
//
|
//
|
||||||
if(stepInfo.calcHashPercent === lastCalcHashPercent) {
|
if(!iterator || stepInfo.calcHashPercent === lastCalcHashPercent) {
|
||||||
updateHashes(data);
|
updateHashes(data);
|
||||||
} else {
|
} else {
|
||||||
lastCalcHashPercent = stepInfo.calcHashPercent;
|
lastCalcHashPercent = stepInfo.calcHashPercent;
|
||||||
|
@ -729,7 +928,8 @@ function scanFile(filePath, options, iterator, cb) {
|
||||||
stream.on('end', () => {
|
stream.on('end', () => {
|
||||||
fileEntry.meta.byte_size = stepInfo.bytesProcessed;
|
fileEntry.meta.byte_size = stepInfo.bytesProcessed;
|
||||||
|
|
||||||
async.each(hashesToCalc, (hashName, nextHash) => {
|
for(let i = 0; i < hashesToCalc.length; ++i) {
|
||||||
|
const hashName = hashesToCalc[i];
|
||||||
if('sha256' === hashName) {
|
if('sha256' === hashName) {
|
||||||
stepInfo.sha256 = fileEntry.fileSha256 = hashes.sha256.digest('hex');
|
stepInfo.sha256 = fileEntry.fileSha256 = hashes.sha256.digest('hex');
|
||||||
} else if('sha1' === hashName || 'md5' === hashName) {
|
} else if('sha1' === hashName || 'md5' === hashName) {
|
||||||
|
@ -737,13 +937,11 @@ function scanFile(filePath, options, iterator, cb) {
|
||||||
} else if('crc32' === hashName) {
|
} else if('crc32' === hashName) {
|
||||||
stepInfo.crc32 = fileEntry.meta.file_crc32 = hashes.crc32.finalize().toString(16);
|
stepInfo.crc32 = fileEntry.meta.file_crc32 = hashes.crc32.finalize().toString(16);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return nextHash(null);
|
|
||||||
}, () => {
|
|
||||||
stepInfo.step = 'hash_finish';
|
stepInfo.step = 'hash_finish';
|
||||||
return callIter(callback);
|
return callIter(callback);
|
||||||
});
|
});
|
||||||
});
|
|
||||||
|
|
||||||
stream.on('error', err => {
|
stream.on('error', err => {
|
||||||
return readErrorCallIter(err, callback);
|
return readErrorCallIter(err, callback);
|
||||||
|
|
Loading…
Reference in New Issue