Skip to content

Commit

Permalink
Respect attempts when reclaiming queue (#13)
Browse files Browse the repository at this point in the history
  • Loading branch information
f2prateek authored and fathyb committed Jul 19, 2018
1 parent 5d34c90 commit ddb0cc2
Show file tree
Hide file tree
Showing 2 changed files with 146 additions and 4 deletions.
8 changes: 4 additions & 4 deletions lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -310,20 +310,20 @@ Queue.prototype._reclaim = function(id) {
queue: other.get(this.keys.QUEUE) || []
};

// add their queue to ours, resetting run-time to immediate and attempt# to 0
// add their queue to ours, resetting run-time to immediate and copying the attempt#
each(function(el) {
our.queue.push({
item: el.item,
attemptNumber: 0,
attemptNumber: el.attemptNumber,
time: self._schedule.now()
});
}, their.queue);

// if the queue is abandoned, all the in-progress are failed. retry them immediately and reset the attempt#
// if the queue is abandoned, all the in-progress are failed. retry them immediately and increment the attempt#
each(function(el) {
our.queue.push({
item: el.item,
attemptNumber: 0,
attemptNumber: el.attemptNumber + 1,
time: self._schedule.now()
});
}, their.inProgress);
Expand Down
142 changes: 142 additions & 0 deletions test/lifecycle.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
'use strict';

var assert = require('proclaim');
var lolex = require('lolex');

var Queue = require('..');
var Schedule = require('../lib/schedule');

describe('Queue Lifecycle', function() {
it('should not reattempt to publish an expired item', function() {
var attempts = 8;
var options = { maxAttempts: attempts };

openTab(options, function(tab) {
tab.queue.addItem('test item');
tab.queue.start();

assert(tab.queue.wait(attempts) !== -1);
assert(tab.queue.wait(1) === -1);

assert(tab.forkAndConsume(1) === -1);
});
});

it('should respect attempts when reclaiming a queue', function() {
var options = { maxAttempts: 10 };

openTab(options, function(tab) {
tab.queue.addItem('test');
tab.queue.start();

assert(tab.forkAndConsume(3) !== -1);
assert(tab.forkAndConsume(4) !== -1);
assert(tab.forkAndConsume(3) !== -1);

// Now check that no attempts are made in a new tab
assert(tab.forkAndConsume(1) === -1);
});
});

it('should increment in-progress items attempts when reclaiming a queue', function() {
var options = { maxAttempts: 6 };

openTab(options, function(tab) {
tab.queue.addItem('test');
tab.queue.start();

assert(tab.forkAndConsume(3) !== -1);

// Make this attempt timeout by not calling the done callback
// This will put the item in the inProgress queue
tab.queue.ignore = true;
assert(tab.queue.wait(1) !== -1);
tab.queue.ignore = false;

// Check that we only consume the two remaining attempts
assert(tab.forkAndConsume(2) !== -1);
assert(tab.forkAndConsume(1) === -1);
});
});
});

// Utilities

function createTestQueue(tab, options) {
var waiting = 0;
var queue = new Queue('segment::multi_queue_test', options, function(item, done) {
waiting--;
queue.calls++;

if (!queue.ignore) {
done(new Error(item));
}
});

queue.calls = 0;
queue.wait = function(condition) {
waiting = condition;

return tab.wait(function() {
return waiting === 0;
});
};

queue.waitFirstAttempt = function() {
return queue.wait(1);
};

return queue;
}

var ONE_SECOND = 1000;
var ONE_MINUTE = 60 * ONE_SECOND;

// Mocks a browser tab with its own clock
function openTab(options, fn) {
var tab = {
clock: lolex.createClock(),

// Waits for a condition to be true by immediatly executing all timers
wait: function(condition) {
var clock = tab.clock;
var start = clock.now;
var timeout = 15 * ONE_MINUTE;

while (!condition()) {
if (clock.now - start > timeout || !clock.timers) {
return -1;
}

clock.next();
}

return clock.now - start;
},

// Simulates a event-loop freeze, like a browser would do with a background tab
freeze: function() {
tab.clock.reset();
tab.clock.setTimeout = tab.clock.clearTimeout = function noop() {};
},

// Opens a new tab and waits for $attempts to be made, returns -1 otherwise
forkAndConsume: function(attempts) {
// Freeze the current tab so the next tab can reclaim the queue
tab.freeze();

return openTab(options, function(newTab) {
tab.queue = newTab.queue;
tab = newTab;
tab.queue.start();

return tab.queue.wait(attempts);
});
}
};

tab.queue = createTestQueue(tab, options);
Schedule.setClock(tab.clock);

return fn(tab);
}

0 comments on commit ddb0cc2

Please sign in to comment.