Skip to content
This repository has been archived by the owner on Oct 18, 2023. It is now read-only.

Commit

Permalink
Merge pull request #27 from Financial-Times/matth/minor-refactors
Browse files Browse the repository at this point in the history
Minor internal refactors
  • Loading branch information
i-like-robots authored Feb 13, 2019
2 parents 32a2661 + 05dcd01 commit c6b7be7
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 26 deletions.
12 changes: 6 additions & 6 deletions src/evented-queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,27 @@ const EventEmitter = require('events');
class EventedQueue extends EventEmitter {
constructor() {
super();
this.queue = new Set();
this.waiting = new Set();
}

add(item) {
this.queue.add(item);
this.waiting.add(item);
this.emit('add', item);
return this;
}

delete(item) {
this.queue.delete(item);
this.waiting.delete(item);
this.emit('delete', item);
return this;
}

waitFor(items = []) {
waitBehind(items = []) {
return new Promise((resolve) => {
const callback = () => {
const itemsRunning = items.some((item) => this.queue.has(item));
const itemsWaiting = items.some((item) => this.waiting.has(item));

if (!itemsRunning) {
if (!itemsWaiting) {
this.removeListener('delete', callback);
resolve();
}
Expand Down
27 changes: 13 additions & 14 deletions src/run-parallel.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,21 @@ module.exports = (tasks = [], concurrency = 1, preserveOrder = false) => {
logger.info(`Executing up to ${concurrency} tasks at a time`);

return Promise.all(
tasks.map(({ pkg, apply }) => {
tasks.map(async ({ pkg, apply }) => {
queue.add(pkg.name);

return semaphore
.acquire()
.then(() => {
// wait for any dependencies still in the queue to finish
return preserveOrder ? queue.waitFor(pkg.allDependencies) : null;
})
.then(() => {
return apply();
})
.then(() => {
queue.delete(pkg.name);
return semaphore.release();
});
await semaphore.acquire();

// wait for any dependencies still in the queue to finish
if (preserveOrder) {
await queue.waitBehind(pkg.allDependencies);
}

await apply();

queue.delete(pkg.name);

semaphore.release();
})
);
};
12 changes: 6 additions & 6 deletions test/src/evented-queue.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ describe('src/evented-queue', () => {
describe('#add', () => {
it('adds the given item to the queue', () => {
instance.add('foo');
expect(instance.queue.size).toEqual(1);
expect(instance.waiting.size).toEqual(1);
});

it('emits an event when items are added to the queue', (done) => {
Expand All @@ -25,10 +25,10 @@ describe('src/evented-queue', () => {
describe('#delete', () => {
it('removes the given item to the queue', () => {
instance.add('foo');
expect(instance.queue.size).toEqual(1);
expect(instance.waiting.size).toEqual(1);

instance.delete('foo');
expect(instance.queue.size).toEqual(0);
expect(instance.waiting.size).toEqual(0);
});

it('emits an event when items are removed from the queue', (done) => {
Expand All @@ -42,15 +42,15 @@ describe('src/evented-queue', () => {
});
});

describe('#waitFor', () => {
describe('#waitBehind', () => {
it('resolves when the queue no longer contains any of the given items', (done) => {
instance
.add('foo')
.add('bar')
.add('baz');

instance.waitFor(['foo', 'bar', 'baz']).then(() => {
expect(instance.queue.size).toEqual(0);
instance.waitBehind(['foo', 'bar', 'baz']).then(() => {
expect(instance.waiting.size).toEqual(0);
done();
});

Expand Down

0 comments on commit c6b7be7

Please sign in to comment.