Skip to content

Commit

Permalink
Fix: Improve sink implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
phated committed Nov 30, 2017
1 parent 9b6a0e2 commit feb1270
Show file tree
Hide file tree
Showing 2 changed files with 131 additions and 10 deletions.
38 changes: 34 additions & 4 deletions lib/sink.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,52 @@

var Writable = require('readable-stream/writable');

function listenerCount(stream, evt) {
return stream.listeners(evt).length;
}

function hasListeners(stream) {
return !!(listenerCount(stream, 'readable') || listenerCount(stream, 'data'));
}

function sink(stream) {
var sinkAdded = false;
var sinkStream = new Writable({
objectMode: true,
write: function(file, enc, cb) {
cb();
},
});

return function() {
// Respect readable listeners on the underlying stream
if (stream.listeners('readable').length > 0) {
function addSink() {
if (sinkAdded) {
return;
}

if (hasListeners(stream)) {
return;
}

sinkAdded = true;
stream.pipe(sinkStream);
};
}

function removeSink(evt) {
if (evt !== 'readable' && evt !== 'data') {
return;
}

if (hasListeners(stream)) {
sinkAdded = false;
stream.unpipe(sinkStream);
}
}

stream.on('newListener', removeSink);
stream.on('removeListener', removeSink);
stream.on('removeListener', addSink);

return addSink;
}

module.exports = sink;
103 changes: 97 additions & 6 deletions test/dest.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ var realMode = function(n) {
return n & parseInt('777', 8);
};

function noop() {}

describe('dest stream', function() {
beforeEach(wipeOut);
afterEach(wipeOut);
Expand Down Expand Up @@ -1349,15 +1351,104 @@ describe('dest stream', function() {
destStream.on('readable', function() {
var data = destStream.read();

if (data == null) {
// Stream ended
readables.should.equal(1);
done();
} else {
// New data
if (data != null) {
readables++;
}
});

destStream.on('error', done);

destStream.on('finish', function() {
readables.should.equal(1);
done();
});
});

it('should respect data listeners on destination stream', function(done) {
var srcPath = path.join(__dirname, './fixtures/test.coffee');
var srcStream = vfs.src(srcPath);
var destStream = vfs.dest('./out-fixtures/', { cwd: __dirname });

srcStream
.pipe(destStream);

var datas = 0;
destStream.on('data', function() {
datas++;
});

destStream.on('error', done);

destStream.on('finish', function() {
datas.should.equal(1);
done();
});
});

it('sinks the stream if all the readable event handlers are removed', function(done) {
fs.mkdirSync(path.join(__dirname, './fixtures/highwatermark'));
for (var idx = 0; idx < 17; idx++) {
fs.writeFileSync(path.join(__dirname, './fixtures/highwatermark/', 'file' + idx + '.txt'));
}

var srcPath = path.join(__dirname, './fixtures/highwatermark/*.txt');
var srcStream = vfs.src(srcPath);
var destStream = vfs.dest('./out-fixtures/', { cwd: __dirname });

var fileCount = 0;
var countFiles = through.obj(function(file, enc, cb) {
fileCount++;

cb(null, file);
});

destStream.on('readable', noop);

destStream.once('finish', function() {
fileCount.should.equal(17);
done();
});

srcStream.pipe(countFiles).pipe(destStream);

process.nextTick(function() {
destStream.removeListener('readable', noop);
});
});

it('sinks the stream if all the data event handlers are removed', function(done) {
fs.mkdirSync(path.join(__dirname, './fixtures/highwatermark'));
for (var idx = 0; idx < 17; idx++) {
fs.writeFileSync(path.join(__dirname, './fixtures/highwatermark/', 'file' + idx + '.txt'));
}

var srcPath = path.join(__dirname, './fixtures/highwatermark/*.txt');
var srcStream = vfs.src(srcPath);
var destStream = vfs.dest('./out-fixtures/', { cwd: __dirname });

var fileCount = 0;
function onData() {
fileCount++;
}

var countFiles = through.obj(function(file, enc, cb) {
onData();

cb(null, file);
});

destStream.on('data', onData);

destStream.once('finish', function() {
fileCount.should.equal(17);
done();
});

srcStream.pipe(countFiles).pipe(destStream);

process.nextTick(function() {
destStream.removeListener('data', onData);
});
});

it('should pass options to through2', function(done) {
Expand Down

0 comments on commit feb1270

Please sign in to comment.