diff --git a/package.json b/package.json index d20e093a..7e73080d 100644 --- a/package.json +++ b/package.json @@ -37,6 +37,7 @@ "pelias-dbclient": "2.0.0", "pelias-logger": "0.1.0", "pelias-model": "4.3.0", + "pelias-parallel-stream": "0.0.2", "through2": "^2.0.0", "through2-filter": "^2.0.0", "through2-map": "^2.0.0", diff --git a/src/components/fileIsReadable.js b/src/components/fileIsReadable.js deleted file mode 100644 index 650da445..00000000 --- a/src/components/fileIsReadable.js +++ /dev/null @@ -1,19 +0,0 @@ -var filter = require('through2-filter'); -var fs = require('fs'); -var path = require('path'); -var util = require('util'); - -module.exports.create = function create(dataDirectory) { - return filter.obj(function(record) { - var fullpath = path.join(dataDirectory, record.path); - - try { - fs.accessSync(fullpath, fs.R_OK); - return true; - } - catch (err) { - console.error(util.format('data file cannot be read: %s', fullpath)); - return false; - } - }); -}; diff --git a/src/components/loadJSON.js b/src/components/loadJSON.js index 585fc6f4..fd48ce2c 100644 --- a/src/components/loadJSON.js +++ b/src/components/loadJSON.js @@ -1,16 +1,25 @@ -var map = require('through2-map'); var fs = require('fs'); +var parallelStream = require('pelias-parallel-stream'); + +var maxInFlight = 10; module.exports.create = function create_json_parse_stream(dataDirectory) { - return map.obj(function(record) { - try { - return JSON.parse(fs.readFileSync(dataDirectory + record.path)); - } catch (err) { - console.error('exception on %s:', record.path, err); - console.error('Inability to parse JSON usually means that WOF has been cloned ' + - 'without using git-lfs, please see instructions here: ' + - 'https://github.com/whosonfirst/whosonfirst-data#git-and-large-files'); - return {}; - } + return parallelStream(maxInFlight, function(record, enc, next) { + fs.readFile(dataDirectory + record.path, function(err, data) { + if (err) { + next(err); + } else { + try { + var object = JSON.parse(data); + next(null, object); + } catch (parse_err) { + console.error('exception on %s:', record.path, parse_err); + console.error('Inability to parse JSON usually means that WOF has been cloned ' + + 'without using git-lfs, please see instructions here: ' + + 'https://github.com/whosonfirst/whosonfirst-data#git-and-large-files'); + next(null, {}); + } + } + }); }); }; diff --git a/src/readStream.js b/src/readStream.js index 28749837..a4a1905c 100644 --- a/src/readStream.js +++ b/src/readStream.js @@ -6,7 +6,6 @@ var through2 = require('through2'); var logger = require( 'pelias-logger' ).get( 'whosonfirst' ); var isValidId = require('./components/isValidId'); -var fileIsReadable = require('./components/fileIsReadable'); var loadJSON = require('./components/loadJSON'); var recordHasIdAndProperties = require('./components/recordHasIdAndProperties'); var isActiveRecord = require('./components/isActiveRecord'); @@ -59,7 +58,6 @@ function createReadStream(directory, types, wofAdminRecords) { return createMetaRecordStream(metaFilePaths, types) .pipe(isValidId.create()) - .pipe(fileIsReadable.create(directory + 'data/')) .pipe(loadJSON.create(directory + 'data/')) .pipe(recordHasIdAndProperties.create()) .pipe(isActiveRecord.create()) diff --git a/test/components/fileIsReadableTest.js b/test/components/fileIsReadableTest.js deleted file mode 100644 index 4decb79b..00000000 --- a/test/components/fileIsReadableTest.js +++ /dev/null @@ -1,54 +0,0 @@ -var tape = require('tape'); -var event_stream = require('event-stream'); -var fs = require('fs'); -var intercept = require('intercept-stdout'); -var temp = require('temp'); -var path = require('path'); - -var fileIsReadable = require('../../src/components/fileIsReadable'); - -function test_stream(input, testedStream, callback) { - var input_stream = event_stream.readArray(input); - var destination_stream = event_stream.writeArray(callback); - - input_stream.pipe(testedStream).pipe(destination_stream); -} - -tape('filterOutUnreadableFiles', function(test) { - test.test('file existing should filter out those with paths that don\'t exist', function(t) { - temp.track(); - - var filename = temp.path(); - fs.writeFileSync(filename, ''); - - var basename = path.basename(filename); - var dirname = path.dirname(filename); - - var input = [ - { path: basename }, - { path: 'does_not_exist.txt' } - ]; - - var expected = [ - { path: basename } - ]; - - var stderr = ''; - - // intercept/swallow stderr - var unhook_intercept = intercept( - function() { }, - function(txt) { stderr += txt; return ''; } - ); - - test_stream(input, fileIsReadable.create(dirname), function(err, actual) { - temp.cleanupSync(); - unhook_intercept(); - t.deepEqual(actual, expected, 'should have returned true'); - t.equal(stderr, 'data file cannot be read: ' + dirname + path.sep + 'does_not_exist.txt\n'); - t.end(); - }); - - }); - -}); diff --git a/test/test.js b/test/test.js index da1cfc25..eb8df12c 100644 --- a/test/test.js +++ b/test/test.js @@ -1,5 +1,4 @@ require ('./components/extractFieldsTest.js'); -require ('./components/fileIsReadableTest.js'); require ('./components/isActiveRecordTest.js'); require ('./components/isValidIdTest.js'); require ('./components/loadJSONTest.js');