From 5d34c909e4f0378764f62e213dc8c946de75369c Mon Sep 17 00:00:00 2001 From: Fathy Boundjadj Date: Tue, 17 Jul 2018 01:02:47 +0200 Subject: [PATCH] Limit inProgress using maxItems (#11) --- lib/index.js | 4 +++- test/index.test.js | 57 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 60 insertions(+), 1 deletion(-) diff --git a/lib/index.js b/lib/index.js index d6b591c..8518392 100644 --- a/lib/index.js +++ b/lib/index.js @@ -211,7 +211,9 @@ Queue.prototype._processHead = function() { }); } - while (queue.length && queue[0].time <= now) { + var inProgressSize = Object.keys(inProgress).length; + + while (queue.length && queue[0].time <= now && inProgressSize++ < self.maxItems) { var el = queue.shift(); var id = uuid(); diff --git a/test/index.test.js b/test/index.test.js index de2f844..5e2229f 100644 --- a/test/index.test.js +++ b/test/index.test.js @@ -237,6 +237,63 @@ describe('Queue', function() { assert(call === queue.maxAttempts + 1); }); }); + + it('should limit inProgress using maxItems', function() { + var waiting = []; + var i; + + queue.maxItems = 100; + queue.maxAttempts = 2; + queue.fn = function(_, done) { + waiting.push(done); + }; + + // add maxItems * 2 items + for (i = 0; i < queue.maxItems * 2; i++) { + queue.addItem({ index: i }); + } + + // the queue should be full + assert(size(queue).queue === queue.maxItems); + + queue.start(); + // the queue is now empty and everything is in progress + assert(size(queue).queue === 0); + assert(size(queue).inProgress === queue.maxItems); + + // while the items are in progress let's add maxItems times two items + for (i = 0; i < queue.maxItems * 2; i++) { + queue.addItem({ index: i }); + } + + // inProgress and queue should be full + assert(size(queue).queue === queue.maxItems); + assert(size(queue).inProgress === queue.maxItems); + assert(waiting.length === queue.maxItems); + + // resolved all waiting items + while (waiting.length) { + waiting.pop()(); + } + + // inProgress should now be empty + assert(size(queue).queue === queue.maxItems); + assert(size(queue).inProgress === 0); + + // wait for the queue to be processed + clock.tick(queue.getDelay(0)); + + // items should now be in progress + assert(size(queue).queue === 0); + assert(size(queue).inProgress === queue.maxItems); + + function size(queue) { + return { + queue: queue._store.get(queue.keys.QUEUE).length, + inProgress: Object.keys(queue._store.get(queue.keys.IN_PROGRESS) || {}).length + }; + } + }); }); describe('events', function() {