From feb1270ee2483d69d10aa260f205c2eddc8c56f6 Mon Sep 17 00:00:00 2001 From: Blaine Bublitz Date: Wed, 13 Jan 2016 23:39:08 -0700 Subject: [PATCH] Fix: Improve sink implementation --- lib/sink.js | 38 +++++++++++++++++-- test/dest.js | 103 ++++++++++++++++++++++++++++++++++++++++++++++++--- 2 files changed, 131 insertions(+), 10 deletions(-) diff --git a/lib/sink.js b/lib/sink.js index eacdb16b..cfab4698 100644 --- a/lib/sink.js +++ b/lib/sink.js @@ -2,7 +2,16 @@ var Writable = require('readable-stream/writable'); +function listenerCount(stream, evt) { + return stream.listeners(evt).length; +} + +function hasListeners(stream) { + return !!(listenerCount(stream, 'readable') || listenerCount(stream, 'data')); +} + function sink(stream) { + var sinkAdded = false; var sinkStream = new Writable({ objectMode: true, write: function(file, enc, cb) { @@ -10,14 +19,35 @@ function sink(stream) { }, }); - return function() { - // Respect readable listeners on the underlying stream - if (stream.listeners('readable').length > 0) { + function addSink() { + if (sinkAdded) { + return; + } + + if (hasListeners(stream)) { return; } + sinkAdded = true; stream.pipe(sinkStream); - }; + } + + function removeSink(evt) { + if (evt !== 'readable' && evt !== 'data') { + return; + } + + if (hasListeners(stream)) { + sinkAdded = false; + stream.unpipe(sinkStream); + } + } + + stream.on('newListener', removeSink); + stream.on('removeListener', removeSink); + stream.on('removeListener', addSink); + + return addSink; } module.exports = sink; diff --git a/test/dest.js b/test/dest.js index 34b2f430..6300f0bc 100644 --- a/test/dest.js +++ b/test/dest.js @@ -38,6 +38,8 @@ var realMode = function(n) { return n & parseInt('777', 8); }; +function noop() {} + describe('dest stream', function() { beforeEach(wipeOut); afterEach(wipeOut); @@ -1349,15 +1351,104 @@ describe('dest stream', function() { destStream.on('readable', function() { var data = destStream.read(); - if (data == null) { - // Stream ended - readables.should.equal(1); - done(); - } else { - // New data + if (data != null) { readables++; } }); + + destStream.on('error', done); + + destStream.on('finish', function() { + readables.should.equal(1); + done(); + }); + }); + + it('should respect data listeners on destination stream', function(done) { + var srcPath = path.join(__dirname, './fixtures/test.coffee'); + var srcStream = vfs.src(srcPath); + var destStream = vfs.dest('./out-fixtures/', { cwd: __dirname }); + + srcStream + .pipe(destStream); + + var datas = 0; + destStream.on('data', function() { + datas++; + }); + + destStream.on('error', done); + + destStream.on('finish', function() { + datas.should.equal(1); + done(); + }); + }); + + it('sinks the stream if all the readable event handlers are removed', function(done) { + fs.mkdirSync(path.join(__dirname, './fixtures/highwatermark')); + for (var idx = 0; idx < 17; idx++) { + fs.writeFileSync(path.join(__dirname, './fixtures/highwatermark/', 'file' + idx + '.txt')); + } + + var srcPath = path.join(__dirname, './fixtures/highwatermark/*.txt'); + var srcStream = vfs.src(srcPath); + var destStream = vfs.dest('./out-fixtures/', { cwd: __dirname }); + + var fileCount = 0; + var countFiles = through.obj(function(file, enc, cb) { + fileCount++; + + cb(null, file); + }); + + destStream.on('readable', noop); + + destStream.once('finish', function() { + fileCount.should.equal(17); + done(); + }); + + srcStream.pipe(countFiles).pipe(destStream); + + process.nextTick(function() { + destStream.removeListener('readable', noop); + }); + }); + + it('sinks the stream if all the data event handlers are removed', function(done) { + fs.mkdirSync(path.join(__dirname, './fixtures/highwatermark')); + for (var idx = 0; idx < 17; idx++) { + fs.writeFileSync(path.join(__dirname, './fixtures/highwatermark/', 'file' + idx + '.txt')); + } + + var srcPath = path.join(__dirname, './fixtures/highwatermark/*.txt'); + var srcStream = vfs.src(srcPath); + var destStream = vfs.dest('./out-fixtures/', { cwd: __dirname }); + + var fileCount = 0; + function onData() { + fileCount++; + } + + var countFiles = through.obj(function(file, enc, cb) { + onData(); + + cb(null, file); + }); + + destStream.on('data', onData); + + destStream.once('finish', function() { + fileCount.should.equal(17); + done(); + }); + + srcStream.pipe(countFiles).pipe(destStream); + + process.nextTick(function() { + destStream.removeListener('data', onData); + }); }); it('should pass options to through2', function(done) {