diff --git a/src/evented-queue.js b/src/evented-queue.js index 1c832c6..47628b8 100644 --- a/src/evented-queue.js +++ b/src/evented-queue.js @@ -3,27 +3,27 @@ const EventEmitter = require('events'); class EventedQueue extends EventEmitter { constructor() { super(); - this.queue = new Set(); + this.waiting = new Set(); } add(item) { - this.queue.add(item); + this.waiting.add(item); this.emit('add', item); return this; } delete(item) { - this.queue.delete(item); + this.waiting.delete(item); this.emit('delete', item); return this; } - waitFor(items = []) { + waitBehind(items = []) { return new Promise((resolve) => { const callback = () => { - const itemsRunning = items.some((item) => this.queue.has(item)); + const itemsWaiting = items.some((item) => this.waiting.has(item)); - if (!itemsRunning) { + if (!itemsWaiting) { this.removeListener('delete', callback); resolve(); } diff --git a/src/run-parallel.js b/src/run-parallel.js index 5e698de..6897ca1 100644 --- a/src/run-parallel.js +++ b/src/run-parallel.js @@ -9,22 +9,21 @@ module.exports = (tasks = [], concurrency = 1, preserveOrder = false) => { logger.info(`Executing up to ${concurrency} tasks at a time`); return Promise.all( - tasks.map(({ pkg, apply }) => { + tasks.map(async ({ pkg, apply }) => { queue.add(pkg.name); - return semaphore - .acquire() - .then(() => { - // wait for any dependencies still in the queue to finish - return preserveOrder ? queue.waitFor(pkg.allDependencies) : null; - }) - .then(() => { - return apply(); - }) - .then(() => { - queue.delete(pkg.name); - return semaphore.release(); - }); + await semaphore.acquire(); + + // wait for any dependencies still in the queue to finish + if (preserveOrder) { + await queue.waitBehind(pkg.allDependencies); + } + + await apply(); + + queue.delete(pkg.name); + + semaphore.release(); }) ); }; diff --git a/test/src/evented-queue.spec.js b/test/src/evented-queue.spec.js index 0a4feb0..4277483 100644 --- a/test/src/evented-queue.spec.js +++ b/test/src/evented-queue.spec.js @@ -10,7 +10,7 @@ describe('src/evented-queue', () => { describe('#add', () => { it('adds the given item to the queue', () => { instance.add('foo'); - expect(instance.queue.size).toEqual(1); + expect(instance.waiting.size).toEqual(1); }); it('emits an event when items are added to the queue', (done) => { @@ -25,10 +25,10 @@ describe('src/evented-queue', () => { describe('#delete', () => { it('removes the given item to the queue', () => { instance.add('foo'); - expect(instance.queue.size).toEqual(1); + expect(instance.waiting.size).toEqual(1); instance.delete('foo'); - expect(instance.queue.size).toEqual(0); + expect(instance.waiting.size).toEqual(0); }); it('emits an event when items are removed from the queue', (done) => { @@ -42,15 +42,15 @@ describe('src/evented-queue', () => { }); }); - describe('#waitFor', () => { + describe('#waitBehind', () => { it('resolves when the queue no longer contains any of the given items', (done) => { instance .add('foo') .add('bar') .add('baz'); - instance.waitFor(['foo', 'bar', 'baz']).then(() => { - expect(instance.queue.size).toEqual(0); + instance.waitBehind(['foo', 'bar', 'baz']).then(() => { + expect(instance.waiting.size).toEqual(0); done(); });