From c00642b86d12324b668371fbbb8cb0822cb5bd53 Mon Sep 17 00:00:00 2001 From: Tim Jones Date: Tue, 9 Feb 2021 11:34:27 -0600 Subject: [PATCH] Opt out of completion jobs (#222) * add config to opt out of completion job and sans bluebird * changelog [skip ci] * fix migration * fix action response counts * added more checks in timekeeper for stopped instance * docs and types --- CHANGELOG.md | 5 +++ docs/configuration.md | 6 ++++ package-lock.json | 50 +++++++++++++++++---------- package.json | 5 +-- src/attorney.js | 13 +++++++ src/boss.js | 6 ++-- src/manager.js | 12 ++++--- src/migrationStore.js | 16 +++++++++ src/plans.js | 76 ++++++++++++++++++++++++----------------- src/timekeeper.js | 21 +++++++----- src/worker.js | 4 +-- test/archiveTest.js | 8 ++--- test/cancelTest.js | 27 ++++++++------- test/completeTest.js | 66 +++++++++++++++++++++++++++++++++-- test/databaseTest.js | 7 ++-- test/delayTest.js | 8 ++--- test/deleteQueueTest.js | 4 +-- test/deleteTest.js | 4 +-- test/expireTest.js | 6 ++-- test/failureTest.js | 8 ++--- test/fetchTest.js | 1 - test/maintenanceTest.js | 4 +-- test/managerTest.js | 4 +-- test/multiMasterTest.js | 13 +++---- test/retryTest.js | 10 +++--- test/scheduleTest.js | 16 ++++----- test/speedTest.js | 4 +-- test/subscribeTest.js | 16 ++++----- test/throttleTest.js | 10 +++--- types.d.ts | 8 +++-- version.json | 2 +- 31 files changed, 290 insertions(+), 150 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b33bf67f..1b68a5cb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,10 @@ # Changes +## 5.2.0 + +- Added constructor and publish option `onComplete` to opt out of creating a completion job in the queue once a job is completed. This defaults to true for backwards compatibility. +- Replaced bluebird dependency with p-map and delay. + ## 5.1.0 - Added transactional locking to maintenance queries as a safeguard from deadlocks under load. diff --git a/docs/configuration.md b/docs/configuration.md index 5355e66c..68f60abd 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -17,6 +17,7 @@ pg-boss can be customized using configuration options when an instance is create - [Deferred jobs](#deferred-jobs) - [Unique jobs](#unique-jobs) - [Throttled jobs](#throttled-jobs) + - [Completion jobs](#completion-jobs) - [Fetch options](#fetch-options) - [Subscribe options](#subscribe-options) - [Job polling options](#job-polling-options) @@ -257,6 +258,11 @@ For example, if you set the `singletonMinutes` to 1, then submit 2 jobs within a Setting `singletonNextSlot` to true will cause the job to be scheduled to run after the current time slot if and when a job is throttled. This option is set to true, for example, when calling the convenience function `publishDebounced()`. +### Completion jobs +* **onComplete**, bool (Default: true) + +When a job completes, a completion job will be created in the queue, copying the same retention policy as the job, for the purpose of `onComplete()` or `fetchCompleted()`. If completion jobs are not used, they will be archived according to the retention policy. If the queue in question has a very high volume, this can be set to `false` to bypass creating the completion job. This can also be set in the constructor as a default for all calls to `publish()`. + ## Fetch options * **includeMetadata**, bool diff --git a/package-lock.json b/package-lock.json index 56784680..89fd2879 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,6 +1,6 @@ { "name": "pg-boss", - "version": "5.0.8", + "version": "5.2.0", "lockfileVersion": 1, "requires": true, "dependencies": { @@ -364,10 +364,9 @@ "dev": true }, "aggregate-error": { - "version": "3.0.1", - "resolved": "https://registry.npmjs.org/aggregate-error/-/aggregate-error-3.0.1.tgz", - "integrity": "sha512-quoaXsZ9/BLNae5yiNoUz+Nhkwz83GhWwtYFglcjEQB2NDHCIpApbqXxIFnm4Pq/Nvhrsq5sYJFyohrrxnTGAA==", - "dev": true, + "version": "3.1.0", + "resolved": "https://registry.npmjs.org/aggregate-error/-/aggregate-error-3.1.0.tgz", + "integrity": "sha512-4I7Td01quW/RpocfNayFdFVk1qSuoh0E7JrbRJ16nH01HhKFQ88INq9Sd+nd72zqRySlr9BmDA8xlEJ6vJMrYA==", "requires": { "clean-stack": "^2.0.0", "indent-string": "^4.0.0" @@ -536,11 +535,6 @@ "integrity": "sha512-jDctJ/IVQbZoJykoeHbhXpOlNBqGNcwXJKJog42E5HDPUwQTSdjCHdihjj0DlnheQ7blbT6dHOafNAiS8ooQKA==", "dev": true }, - "bluebird": { - "version": "3.7.2", - "resolved": "https://registry.npmjs.org/bluebird/-/bluebird-3.7.2.tgz", - "integrity": "sha512-XpNj6GDQzdfW+r2Wnn7xiSAd7TM3jzkxGXBGTtWKuSXv1xUV+azxAm8jdWZN06QTQk+2N2XB9jRDkvbmQmcRtg==" - }, "brace-expansion": { "version": "1.1.11", "resolved": "https://registry.npmjs.org/brace-expansion/-/brace-expansion-1.1.11.tgz", @@ -651,8 +645,7 @@ "clean-stack": { "version": "2.2.0", "resolved": "https://registry.npmjs.org/clean-stack/-/clean-stack-2.2.0.tgz", - "integrity": "sha512-4diC9HaTE+KRAMWhDhrGOECgWZxoevMc5TlkObMqNSsVU62PYzXZ/SMTjzyGAFF1YusgxGcSWTEXBhp0CPwQ1A==", - "dev": true + "integrity": "sha512-4diC9HaTE+KRAMWhDhrGOECgWZxoevMc5TlkObMqNSsVU62PYzXZ/SMTjzyGAFF1YusgxGcSWTEXBhp0CPwQ1A==" }, "cliui": { "version": "5.0.0", @@ -865,6 +858,11 @@ "object-keys": "^1.0.12" } }, + "delay": { + "version": "5.0.0", + "resolved": "https://registry.npmjs.org/delay/-/delay-5.0.0.tgz", + "integrity": "sha512-ReEBKkIfe4ya47wlPYf/gu5ib6yUG0/Aez0JQZQz94kiWtRQvZIQbTiehsnwHvLSWJnQdhVeqYue7Id1dKr0qw==" + }, "delayed-stream": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/delayed-stream/-/delayed-stream-1.0.0.tgz", @@ -1781,8 +1779,7 @@ "indent-string": { "version": "4.0.0", "resolved": "https://registry.npmjs.org/indent-string/-/indent-string-4.0.0.tgz", - "integrity": "sha512-EdDDZu4A2OyIK7Lr/2zG+w5jmbuk1DVBnEwREQvBzspBJkCEbRa8GxU1lghYcaGJCnRWibjDXlq779X1/y5xwg==", - "dev": true + "integrity": "sha512-EdDDZu4A2OyIK7Lr/2zG+w5jmbuk1DVBnEwREQvBzspBJkCEbRa8GxU1lghYcaGJCnRWibjDXlq779X1/y5xwg==" }, "inflight": { "version": "1.0.6", @@ -2025,6 +2022,15 @@ "uuid": "^3.3.3" }, "dependencies": { + "p-map": { + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/p-map/-/p-map-3.0.0.tgz", + "integrity": "sha512-d3qXVTF/s+W+CdJ5A29wywV2n8CQQYahlgz2bFiA+4eVNJbHJodPZ+/gXwPGh0bOqA+j8S+6+ckmvLGPk1QpxQ==", + "dev": true, + "requires": { + "aggregate-error": "^3.0.0" + } + }, "uuid": { "version": "3.4.0", "resolved": "https://registry.npmjs.org/uuid/-/uuid-3.4.0.tgz", @@ -2675,6 +2681,15 @@ "p-limit": "^2.2.0" } }, + "p-map": { + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/p-map/-/p-map-3.0.0.tgz", + "integrity": "sha512-d3qXVTF/s+W+CdJ5A29wywV2n8CQQYahlgz2bFiA+4eVNJbHJodPZ+/gXwPGh0bOqA+j8S+6+ckmvLGPk1QpxQ==", + "dev": true, + "requires": { + "aggregate-error": "^3.0.0" + } + }, "path-exists": { "version": "4.0.0", "resolved": "https://registry.npmjs.org/path-exists/-/path-exists-4.0.0.tgz", @@ -2856,10 +2871,9 @@ } }, "p-map": { - "version": "3.0.0", - "resolved": "https://registry.npmjs.org/p-map/-/p-map-3.0.0.tgz", - "integrity": "sha512-d3qXVTF/s+W+CdJ5A29wywV2n8CQQYahlgz2bFiA+4eVNJbHJodPZ+/gXwPGh0bOqA+j8S+6+ckmvLGPk1QpxQ==", - "dev": true, + "version": "4.0.0", + "resolved": "https://registry.npmjs.org/p-map/-/p-map-4.0.0.tgz", + "integrity": "sha512-/bjOqmgETBYB5BoEeGVea8dmvHb2m9GLy1E9W43yeyfP6QQCZGFNa+XRceJEuDB6zqr+gKpIAmlLebMpykw/MQ==", "requires": { "aggregate-error": "^3.0.0" } diff --git a/package.json b/package.json index 592ebbf2..76f16196 100644 --- a/package.json +++ b/package.json @@ -1,14 +1,15 @@ { "name": "pg-boss", - "version": "5.1.0", + "version": "5.2.0", "description": "Queueing jobs in Node.js using PostgreSQL like a boss", "main": "./src/index.js", "engines": { "node": ">=10.0.0" }, "dependencies": { - "bluebird": "^3.7.2", "cron-parser": "^3.1.0", + "delay": "^5.0.0", + "p-map": "^4.0.0", "pg": "^8.5.1", "uuid": "^8.3.2" }, diff --git a/src/attorney.js b/src/attorney.js index ed727523..5d05d77c 100644 --- a/src/attorney.js +++ b/src/attorney.js @@ -58,6 +58,7 @@ function checkPublishArgs (args, defaults) { applyRetryConfig(options, defaults) applyExpirationConfig(options, defaults) applyRetentionConfig(options, defaults) + applyCompletionConfig(options, defaults) const { startAfter, singletonSeconds, singletonMinutes, singletonHours } = options @@ -147,6 +148,7 @@ function getConfig (value) { applyNewJobCheckInterval(config) applyExpirationConfig(config) applyRetentionConfig(config) + applyCompletionConfig(config) return config } @@ -175,6 +177,17 @@ function applyArchiveConfig (config) { } } +function applyCompletionConfig (config, defaults) { + assert(!('onComplete' in config) || config.onComplete === true || config.onComplete === false, + 'configuration assert: onComplete must be either true or false') + + if (!('onComplete' in config)) { + config.onComplete = defaults + ? defaults.onComplete + : true + } +} + function applyRetentionConfig (config, defaults) { assert(!('retentionSeconds' in config) || config.retentionSeconds >= 1, 'configuration assert: retentionSeconds must be at least every second') diff --git a/src/boss.js b/src/boss.js index f143e1db..b7ab1eaf 100644 --- a/src/boss.js +++ b/src/boss.js @@ -92,7 +92,8 @@ class Boss extends EventEmitter { options = { startAfter, retentionSeconds: this.maintenanceIntervalSeconds * 4, - singletonKey: queues.MAINTENANCE + singletonKey: queues.MAINTENANCE, + onComplete: false } await this.manager.publish(queues.MAINTENANCE, null, options) @@ -104,7 +105,8 @@ class Boss extends EventEmitter { options = { startAfter, retentionSeconds: this.monitorIntervalSeconds * 4, - singletonKey: queues.MONITOR_STATES + singletonKey: queues.MONITOR_STATES, + onComplete: false } await this.manager.publish(queues.MONITOR_STATES, null, options) diff --git a/src/manager.js b/src/manager.js index d9d979a5..463f9531 100644 --- a/src/manager.js +++ b/src/manager.js @@ -1,6 +1,6 @@ const assert = require('assert') const EventEmitter = require('events') -const Promise = require('bluebird') +const pMap = require('p-map') const uuid = require('uuid') const Worker = require('./worker') @@ -85,7 +85,7 @@ class Manager extends EventEmitter { const concurrency = options.teamConcurrency || 1 // either no option was set, or teamSize was used - return Promise.map(jobs, job => + return pMap(jobs, job => callback(job) .then(value => this.complete(job.id, value)) .catch(err => this.fail(job.id, err)) @@ -178,7 +178,8 @@ class Manager extends EventEmitter { singletonSeconds, retryBackoff, retryLimit, - retryDelay + retryDelay, + onComplete } = options const id = uuid[this.config.uuid]() @@ -196,7 +197,8 @@ class Manager extends EventEmitter { singletonOffset, // 10 retryDelay, // 11 retryBackoff, // 12 - keepUntil // 13 + keepUntil, // 13 + onComplete // 14 ] const result = await this.db.executeSql(this.insertJobCommand, values) @@ -292,7 +294,7 @@ class Manager extends EventEmitter { return { jobs: ids, requested: ids.length, - updated: result.rowCount + updated: parseInt(result.rows[0].count) } } diff --git a/src/migrationStore.js b/src/migrationStore.js index 7b2f7415..e498ccdb 100644 --- a/src/migrationStore.js +++ b/src/migrationStore.js @@ -67,6 +67,22 @@ function getAll (schema, config) { const keepUntil = config ? config.keepUntil : DEFAULT_RETENTION return [ + { + release: '5.2.0', + version: 16, + previous: 15, + install: [ + `ALTER TABLE ${schema}.job ADD on_complete boolean`, + `UPDATE ${schema}.job SET on_complete = true`, + `ALTER TABLE ${schema}.job ALTER COLUMN on_complete SET DEFAULT true`, + `ALTER TABLE ${schema}.job ALTER COLUMN on_complete SET NOT NULL`, + `ALTER TABLE ${schema}.archive ADD on_complete boolean` + ], + uninstall: [ + `ALTER TABLE ${schema}.job DROP COLUMN on_complete`, + `ALTER TABLE ${schema}.archive DROP COLUMN on_complete` + ] + }, { release: '5.0.6', version: 15, diff --git a/src/plans.js b/src/plans.js index b7e6dc51..437ad9e2 100644 --- a/src/plans.js +++ b/src/plans.js @@ -134,7 +134,8 @@ function createJobTable (schema) { expireIn interval not null default interval '15 minutes', createdOn timestamp with time zone not null default now(), completedOn timestamp with time zone, - keepUntil timestamp with time zone NOT NULL default now() + interval '30 days' + keepUntil timestamp with time zone NOT NULL default now() + interval '30 days', + on_complete boolean not null default true ) ` } @@ -346,16 +347,18 @@ function completeJobs (schema) { WHERE id IN (SELECT UNNEST($1::uuid[])) AND state = '${states.active}' RETURNING * + ), completion_jobs as ( + INSERT INTO ${schema}.job (name, data, keepUntil) + SELECT + '${completedJobPrefix}' || name, + ${buildJsonCompletionObject(true)}, + ${keepUntilInheritance} + FROM results + WHERE NOT name LIKE '${completedJobPrefix}%' + AND on_complete ) - INSERT INTO ${schema}.job (name, data, keepUntil) - SELECT - '${completedJobPrefix}' || name, - ${buildJsonCompletionObject(true)}, - ${keepUntilInheritance} - FROM results - WHERE NOT name LIKE '${completedJobPrefix}%' - RETURNING 1 - ` // returning 1 here just to count results against input array + SELECT COUNT(*) FROM results + ` } function failJobs (schema) { @@ -372,17 +375,19 @@ function failJobs (schema) { WHERE id IN (SELECT UNNEST($1::uuid[])) AND state < '${states.completed}' RETURNING * + ), completion_jobs as ( + INSERT INTO ${schema}.job (name, data, keepUntil) + SELECT + '${completedJobPrefix}' || name, + ${buildJsonCompletionObject(true)}, + ${keepUntilInheritance} + FROM results + WHERE state = '${states.failed}' + AND NOT name LIKE '${completedJobPrefix}%' + AND on_complete ) - INSERT INTO ${schema}.job (name, data, keepUntil) - SELECT - '${completedJobPrefix}' || name, - ${buildJsonCompletionObject(true)}, - ${keepUntilInheritance} - FROM results - WHERE state = '${states.failed}' - AND NOT name LIKE '${completedJobPrefix}%' - RETURNING 1 - ` // returning 1 here just to count results against input array + SELECT COUNT(*) FROM results + ` } function expire (schema) { @@ -407,18 +412,22 @@ function expire (schema) { FROM results WHERE state = '${states.expired}' AND NOT name LIKE '${completedJobPrefix}%' + AND on_complete ` } function cancelJobs (schema) { return ` - UPDATE ${schema}.job - SET completedOn = now(), - state = '${states.cancelled}' - WHERE id IN (SELECT UNNEST($1::uuid[])) - AND state < '${states.completed}' - RETURNING 1 - ` // returning 1 here just to count results against input array + with results as ( + UPDATE ${schema}.job + SET completedOn = now(), + state = '${states.cancelled}' + WHERE id IN (SELECT UNNEST($1::uuid[])) + AND state < '${states.completed}' + RETURNING 1 + ) + SELECT COUNT(*) from results + ` } function insertJob (schema) { @@ -436,7 +445,8 @@ function insertJob (schema) { singletonOn, retryDelay, retryBackoff, - keepUntil + keepUntil, + on_complete ) SELECT id, @@ -451,7 +461,8 @@ function insertJob (schema) { singletonOn, retryDelay, retryBackoff, - keepUntil + keepUntil, + on_complete FROM ( SELECT *, CASE @@ -481,7 +492,8 @@ function insertJob (schema) { END as singletonOn, $11::int as retryDelay, $12::bool as retryBackoff, - $13::text as keepUntilValue + $13::text as keepUntilValue, + $14::boolean as on_complete ) j1 ) j2 ) j3 @@ -509,10 +521,10 @@ function archive (schema, interval) { RETURNING * ) INSERT INTO ${schema}.archive ( - id, name, priority, data, state, retryLimit, retryCount, retryDelay, retryBackoff, startAfter, startedOn, singletonKey, singletonOn, expireIn, createdOn, completedOn, keepUntil + id, name, priority, data, state, retryLimit, retryCount, retryDelay, retryBackoff, startAfter, startedOn, singletonKey, singletonOn, expireIn, createdOn, completedOn, keepUntil, on_complete ) SELECT - id, name, priority, data, state, retryLimit, retryCount, retryDelay, retryBackoff, startAfter, startedOn, singletonKey, singletonOn, expireIn, createdOn, completedOn, keepUntil + id, name, priority, data, state, retryLimit, retryCount, retryDelay, retryBackoff, startAfter, startedOn, singletonKey, singletonOn, expireIn, createdOn, completedOn, keepUntil, on_complete FROM archived_rows ` } diff --git a/src/timekeeper.js b/src/timekeeper.js index 57d47d2e..c6738910 100644 --- a/src/timekeeper.js +++ b/src/timekeeper.js @@ -1,4 +1,4 @@ -const Promise = require('bluebird') +const pMap = require('p-map') const EventEmitter = require('events') const plans = require('./plans') const cronParser = require('cron-parser') @@ -106,16 +106,15 @@ class Timekeeper extends EventEmitter { async cronMonitorAsync () { const opts = { retryLimit: 2, - retentionSeconds: 60 + retentionSeconds: 60, + onComplete: false } await this.manager.publishDebounced(queues.CRON, null, opts, 60) } async onCron () { - if (this.stopped) { - return - } + if (this.stopped) return try { if (this.config.__test__throw_clock_monitoring) { @@ -126,15 +125,19 @@ class Timekeeper extends EventEmitter { const sending = items.filter(i => this.shouldSendIt(i.cron, i.timezone)) - if (sending.length) { - await Promise.map(sending, it => this.send(it), { concurrency: 5 }) + if (sending.length && !this.stopped) { + await pMap(sending, it => this.send(it), { concurrency: 5 }) } + if (this.stopped) return + await this.setCronTime() } catch (err) { this.emit(this.events.error, err) } + if (this.stopped) return + await this.cronMonitorAsync() } @@ -153,13 +156,15 @@ class Timekeeper extends EventEmitter { async send (job) { const options = { singletonKey: job.name, - singletonSeconds: 60 + singletonSeconds: 60, + onComplete: false } await this.manager.publish(queues.SEND_IT, job, options) } async onSendIt (job) { + if (this.stopped) return const { name, data, options } = job.data await this.manager.publish(name, data, options) } diff --git a/src/worker.js b/src/worker.js index 59ba4e45..2098b206 100644 --- a/src/worker.js +++ b/src/worker.js @@ -1,4 +1,4 @@ -const Promise = require('bluebird') +const delay = require('delay') class Worker { constructor (config) { @@ -14,7 +14,7 @@ class Worker { const duration = Date.now() - started if (duration < this.config.interval) { - await Promise.delay(this.config.interval - duration) + await delay(this.config.interval - duration) } } } diff --git a/test/archiveTest.js b/test/archiveTest.js index d53a4ae2..37981de5 100644 --- a/test/archiveTest.js +++ b/test/archiveTest.js @@ -1,6 +1,6 @@ const assert = require('assert') const helper = require('./testHelper') -const Promise = require('bluebird') +const delay = require('delay') describe('archive', function () { const defaults = { @@ -20,7 +20,7 @@ describe('archive', function () { await boss.complete(jobId) - await Promise.delay(7000) + await delay(7000) const archivedJob = await helper.getArchivedJobById(config.schema, jobId) @@ -38,7 +38,7 @@ describe('archive', function () { const jobId = await boss.publish(queue, null, { retentionSeconds: 1 }) - await Promise.delay(7000) + await delay(7000) const archivedJob = await helper.getArchivedJobById(config.schema, jobId) @@ -56,7 +56,7 @@ describe('archive', function () { const jobId = await boss.publish(queue) - await Promise.delay(7000) + await delay(7000) const archivedJob = await helper.getArchivedJobById(config.schema, jobId) diff --git a/test/cancelTest.js b/test/cancelTest.js index 20a83bda..38ef086a 100644 --- a/test/cancelTest.js +++ b/test/cancelTest.js @@ -30,23 +30,24 @@ describe('cancel', function () { await boss.stop() }) - it('should not cancel a completed job', function (finished) { + it('should not cancel a completed job', async function () { const config = this.test.bossConfig - test() + const boss = await helper.start(config) - async function test () { - const boss = await helper.start(config) - await boss.publish('will_not_cancel') + const queue = 'will_not_cancel' - boss.subscribe('will_not_cancel', async job => { - await job.done() - const response = await boss.cancel(job.id) - assert.strictEqual(response.updated, 0) - await boss.stop() - finished() - }) - } + await boss.publish(queue) + + const job = await boss.fetch(queue) + + await boss.complete(job.id) + + const response = await boss.cancel(job.id) + + assert.strictEqual(response.updated, 0) + + await boss.stop() }) it('should cancel a batch of jobs', async function () { diff --git a/test/completeTest.js b/test/completeTest.js index 4fdfd0a9..a11d2c4c 100644 --- a/test/completeTest.js +++ b/test/completeTest.js @@ -1,4 +1,4 @@ -const Promise = require('bluebird') +const delay = require('delay') const assert = require('assert') const helper = require('./testHelper') const PgBoss = require('../') @@ -167,13 +167,13 @@ describe('complete', function () { const job1 = await boss.fetch(jobName) await boss.complete(job1.id) - await Promise.delay(2000) + await delay(2000) await boss.publish(jobName) const job2 = await boss.fetch(jobName) await boss.complete(job2.id) - await Promise.delay(2000) + await delay(2000) assert.strictEqual(receivedCount, 1) @@ -215,4 +215,64 @@ describe('complete', function () { await boss.stop() }) + + it('should not create a completion job if opted out during publish', async function () { + const queue = 'onCompleteOptOut' + + const config = this.test.bossConfig + + const boss = await helper.start(config) + + const jobId = await boss.publish(queue, null, { onComplete: false }) + + await boss.fetch(queue) + + await boss.complete(jobId) + + const job = await boss.fetchCompleted(queue) + + assert.strictEqual(job, null) + + await boss.stop() + }) + + it('should not create a completion job if opted out during constructor', async function () { + const queue = 'onCompleteOptOutGlobal' + + const config = this.test.bossConfig + + const boss = await helper.start({ ...config, onComplete: false }) + + const jobId = await boss.publish(queue) + + await boss.fetch(queue) + + await boss.complete(jobId) + + const job = await boss.fetchCompleted(queue) + + assert.strictEqual(job, null) + + await boss.stop() + }) + + it('should create completion job if overriding the default from constructor', async function () { + const queue = 'onCompleteOptInOverride' + + const config = this.test.bossConfig + + const boss = await helper.start({ ...config, onComplete: false }) + + const jobId = await boss.publish(queue, null, { onComplete: true }) + + await boss.fetch(queue) + + await boss.complete(jobId) + + const job = await boss.fetchCompleted(queue) + + assert.strictEqual(job.data.request.id, jobId) + + await boss.stop() + }) }) diff --git a/test/databaseTest.js b/test/databaseTest.js index d75ad959..f23f98f9 100644 --- a/test/databaseTest.js +++ b/test/databaseTest.js @@ -1,4 +1,5 @@ -const Promise = require('bluebird') +const delay = require('delay') +const pMap = require('p-map') const assert = require('assert') const PgBoss = require('../') const helper = require('./testHelper') @@ -50,9 +51,9 @@ describe('database', function () { const prevConnectionCount = await countConnections(boss.db) - await Promise.map(listeners, (val, index) => boss.subscribe(`job${index}`, () => {})) + await pMap(listeners, (val, index) => boss.subscribe(`job${index}`, () => {})) - await Promise.delay(3000) + await delay(3000) const connectionCount = await countConnections(boss.db) diff --git a/test/delayTest.js b/test/delayTest.js index 896f47bc..89d251b7 100644 --- a/test/delayTest.js +++ b/test/delayTest.js @@ -1,6 +1,6 @@ const assert = require('assert') const helper = require('./testHelper') -const Promise = require('bluebird') +const delay = require('delay') describe('delayed jobs', function () { it('should wait until after an int (in seconds)', async function () { @@ -48,7 +48,7 @@ describe('delayed jobs', function () { assert.strictEqual(job, null) - await Promise.delay(5000) + await delay(5000) const job2 = await boss.fetch(queue) @@ -72,7 +72,7 @@ describe('delayed jobs', function () { assert.strictEqual(job, null) - await Promise.delay(2000) + await delay(2000) const job2 = await boss.fetch(queue) @@ -96,7 +96,7 @@ describe('delayed jobs', function () { assert.strictEqual(job, null) - await Promise.delay(2000) + await delay(2000) const job2 = await boss.fetch(queue) diff --git a/test/deleteQueueTest.js b/test/deleteQueueTest.js index e8bee485..215c89ba 100644 --- a/test/deleteQueueTest.js +++ b/test/deleteQueueTest.js @@ -1,6 +1,6 @@ const assert = require('assert') const helper = require('./testHelper') -const Promise = require('bluebird') +const delay = require('delay') describe('deleteQueue', function () { it('should clear a specific queue', async function () { @@ -80,7 +80,7 @@ describe('deleteQueue', function () { await boss.complete(jobId) - await Promise.delay(3000) + await delay(3000) const db = await helper.getDb() diff --git a/test/deleteTest.js b/test/deleteTest.js index 8bb3e21e..bebf9fd8 100644 --- a/test/deleteTest.js +++ b/test/deleteTest.js @@ -1,6 +1,6 @@ const assert = require('assert') const helper = require('./testHelper') -const Promise = require('bluebird') +const delay = require('delay') describe('delete', async function () { const defaults = { @@ -20,7 +20,7 @@ describe('delete', async function () { await boss.complete(jobId) - await Promise.delay(7000) + await delay(7000) const archivedJob = await helper.getArchivedJobById(config.schema, jobId) diff --git a/test/expireTest.js b/test/expireTest.js index 268c09a4..7f3fcf3f 100644 --- a/test/expireTest.js +++ b/test/expireTest.js @@ -1,6 +1,6 @@ const assert = require('assert') const helper = require('./testHelper') -const Promise = require('bluebird') +const delay = require('delay') describe('expire', function () { const defaults = { maintenanceIntervalSeconds: 1 } @@ -15,7 +15,7 @@ describe('expire', function () { await boss.fetch(queue) // this should give it enough time to expire - await Promise.delay(8000) + await delay(8000) const job = await boss.fetchCompleted(queue) @@ -35,7 +35,7 @@ describe('expire', function () { await boss.fetch(queue) // this should give it enough time to expire - await Promise.delay(8000) + await delay(8000) const job = await boss.fetchCompleted(queue) diff --git a/test/failureTest.js b/test/failureTest.js index bc90286d..e6a7d203 100644 --- a/test/failureTest.js +++ b/test/failureTest.js @@ -1,4 +1,4 @@ -const Promise = require('bluebird') +const delay = require('delay') const assert = require('assert') const helper = require('./testHelper') @@ -125,7 +125,7 @@ describe('failure', function () { await boss.subscribe(queue, job => Promise.reject(failPayload)) await boss.publish(queue) - await Promise.delay(7000) + await delay(7000) const job = await boss.fetchCompleted(queue) @@ -146,7 +146,7 @@ describe('failure', function () { await boss.subscribe(queue, job => Promise.reject(errorResponse)) await boss.publish(queue) - await Promise.delay(7000) + await delay(7000) const job = await boss.fetchCompleted(queue) @@ -164,7 +164,7 @@ describe('failure', function () { await boss.publish(queue) await boss.subscribe(queue, async () => { throw new Error(message) }) - await Promise.delay(2000) + await delay(2000) const job = await boss.fetchCompleted(queue) diff --git a/test/fetchTest.js b/test/fetchTest.js index 68ae164f..47b3420b 100644 --- a/test/fetchTest.js +++ b/test/fetchTest.js @@ -1,4 +1,3 @@ -const Promise = require('bluebird') const assert = require('assert') const helper = require('./testHelper') diff --git a/test/maintenanceTest.js b/test/maintenanceTest.js index 8f0acfe4..a963f929 100644 --- a/test/maintenanceTest.js +++ b/test/maintenanceTest.js @@ -1,6 +1,6 @@ const assert = require('assert') const helper = require('./testHelper') -const Promise = require('bluebird') +const delay = require('delay') const PgBoss = require('../') describe('maintenance', async function () { @@ -22,7 +22,7 @@ describe('maintenance', async function () { }) // wait for monitoring to check timestamp - await Promise.delay(4000) + await delay(4000) const count = await countJobs() assert(count > 1) diff --git a/test/managerTest.js b/test/managerTest.js index 38036363..fcbc9b67 100644 --- a/test/managerTest.js +++ b/test/managerTest.js @@ -1,4 +1,4 @@ -const Promise = require('bluebird') +const delay = require('delay') const assert = require('assert') const PgBoss = require('../') @@ -8,7 +8,7 @@ describe('manager', function () { await boss.start() - await Promise.delay(2000) + await delay(2000) try { await boss.start() diff --git a/test/multiMasterTest.js b/test/multiMasterTest.js index aa013f46..932e33c0 100644 --- a/test/multiMasterTest.js +++ b/test/multiMasterTest.js @@ -1,6 +1,7 @@ const assert = require('assert') +const pMap = require('p-map') +const delay = require('delay') const helper = require('./testHelper') -const Promise = require('bluebird') const PgBoss = require('../') const Contractor = require('../src/contractor') const migrationStore = require('../src/migrationStore') @@ -17,12 +18,12 @@ describe('multi-master', function () { } try { - await Promise.map(instances, i => i.start()) + await pMap(instances, i => i.start()) } catch (err) { console.log(err.message) assert(false) } finally { - await Promise.map(instances, i => i.stop()) + await pMap(instances, i => i.stop()) } }) @@ -51,12 +52,12 @@ describe('multi-master', function () { } try { - await Promise.map(instances, i => i.start()) + await pMap(instances, i => i.start()) } catch (err) { console.log(err.message) assert(false) } finally { - await Promise.map(instances, i => i.stop()) + await pMap(instances, i => i.stop()) } }) @@ -93,7 +94,7 @@ describe('multi-master', function () { await boss.start() - await Promise.delay(3000) + await delay(3000) const completedCount = await countJobs(states.completed) diff --git a/test/retryTest.js b/test/retryTest.js index c32e9a3a..305331a2 100644 --- a/test/retryTest.js +++ b/test/retryTest.js @@ -1,6 +1,6 @@ const assert = require('assert') const helper = require('./testHelper') -const Promise = require('bluebird') +const delay = require('delay') describe('retries', function () { const defaults = { maintenanceIntervalSeconds: 1 } @@ -13,7 +13,7 @@ describe('retries', function () { const try1 = await boss.fetch(queue) - await Promise.delay(5000) + await delay(5000) const try2 = await boss.fetch(queue) @@ -70,7 +70,7 @@ describe('retries', function () { assert.strictEqual(job1, null) - await Promise.delay(1000) + await delay(1000) const job2 = await boss.fetch(queue) @@ -89,7 +89,7 @@ describe('retries', function () { await boss.subscribe(queue, { newJobCheckInterval: 500 }, job => job.done(++subscribeCount)) await boss.publish(queue, null, { retryLimit, retryBackoff: true }) - await Promise.delay(9000) + await delay(9000) assert(subscribeCount < retryLimit) @@ -108,7 +108,7 @@ describe('retries', function () { assert.strictEqual(job1, null) - await Promise.delay(1000) + await delay(1000) const job2 = await boss.fetch(queue) diff --git a/test/scheduleTest.js b/test/scheduleTest.js index 83f126a8..aa6e8deb 100644 --- a/test/scheduleTest.js +++ b/test/scheduleTest.js @@ -1,4 +1,4 @@ -const Promise = require('bluebird') +const delay = require('delay') const assert = require('assert') const { DateTime } = require('luxon') const helper = require('./testHelper') @@ -15,7 +15,7 @@ describe('schedule', function () { await boss.schedule(queue, '* * * * *') - await Promise.delay(ASSERT_DELAY) + await delay(ASSERT_DELAY) const job = await boss.fetch(queue) @@ -36,7 +36,7 @@ describe('schedule', function () { await boss.schedule(queue, '* * * * *') - await Promise.delay(ASSERT_DELAY) + await delay(ASSERT_DELAY) const job = await boss.fetch(queue) @@ -62,7 +62,7 @@ describe('schedule', function () { await boss.schedule(queue, '* * * * *') - await Promise.delay(ASSERT_DELAY) + await delay(ASSERT_DELAY) const job = await boss.fetch(queue) @@ -82,7 +82,7 @@ describe('schedule', function () { boss = await helper.start(this.test.bossConfig) - await Promise.delay(ASSERT_DELAY) + await delay(ASSERT_DELAY) const job = await boss.fetch(queue) @@ -107,7 +107,7 @@ describe('schedule', function () { await boss.start() - await Promise.delay(ASSERT_DELAY) + await delay(ASSERT_DELAY) const job = await boss.fetch(queue) @@ -134,7 +134,7 @@ describe('schedule', function () { await boss.schedule(queue, `${minuteExpression} * * * *`) - await Promise.delay(ASSERT_DELAY) + await delay(ASSERT_DELAY) const job = await boss.fetch(queue) @@ -166,7 +166,7 @@ describe('schedule', function () { await boss.schedule(queue, `${minute} ${hour} * * *`, null, { tz }) - await Promise.delay(ASSERT_DELAY) + await delay(ASSERT_DELAY) const job = await boss.fetch(queue) diff --git a/test/speedTest.js b/test/speedTest.js index 8e0fa0b7..21036f0d 100644 --- a/test/speedTest.js +++ b/test/speedTest.js @@ -1,4 +1,4 @@ -const Promise = require('bluebird') +const pMap = require('p-map') const helper = require('./testHelper') describe('speed', function () { @@ -15,7 +15,7 @@ describe('speed', function () { beforeEach(async function () { const defaults = { noSupervisor: true, min: 10, max: 10 } boss = await helper.start({ ...this.currentTest.bossConfig, ...defaults }) - await Promise.map(jobs, job => boss.publish(job.name, job.data)) + await pMap(jobs, job => boss.publish(job.name, job.data)) }) afterEach(async function () { await boss.stop() }) diff --git a/test/subscribeTest.js b/test/subscribeTest.js index 3b0a1ee1..f8c4ab7b 100644 --- a/test/subscribeTest.js +++ b/test/subscribeTest.js @@ -1,4 +1,4 @@ -const Promise = require('bluebird') +const delay = require('delay') const assert = require('assert') const helper = require('./testHelper') @@ -70,7 +70,7 @@ describe('subscribe', function () { await boss.subscribe(queue, { newJobCheckIntervalSeconds }, () => subscribeCount++) - await Promise.delay(timeout) + await delay(timeout) assert(subscribeCount <= timeout / 1000 / newJobCheckIntervalSeconds) @@ -92,7 +92,7 @@ describe('subscribe', function () { await boss.publish(queue) await boss.publish(queue) - await Promise.delay(5000) + await delay(5000) assert.strictEqual(receivedCount, 1) @@ -155,7 +155,7 @@ describe('subscribe', function () { } // test would time out if it had to wait for each handler to resolve - await Promise.delay(4000) + await delay(4000) }) } }) @@ -196,11 +196,11 @@ describe('subscribe', function () { await boss.subscribe(queue, async () => { // delay slows down subscribe fetch - await Promise.delay(2000) + await delay(2000) subscribeCount++ }) - await Promise.delay(7000) + await delay(7000) assert(subscribeCount < batchSize) @@ -241,7 +241,7 @@ describe('subscribe', function () { await boss.publish(queue) - await Promise.delay(8000) + await delay(8000) const job = await boss.fetchCompleted(queue) @@ -269,7 +269,7 @@ describe('subscribe', function () { await boss.publish(queue) - await Promise.delay(8000) + await delay(8000) const job = await boss.fetchCompleted(queue) diff --git a/test/throttleTest.js b/test/throttleTest.js index 9df635f4..3c925908 100644 --- a/test/throttleTest.js +++ b/test/throttleTest.js @@ -1,6 +1,6 @@ const assert = require('assert') const helper = require('./testHelper') -const Promise = require('bluebird') +const delay = require('delay') describe('throttle', function () { it('should only create 1 job for interval with a delay', async function () { @@ -18,10 +18,10 @@ describe('throttle', function () { for (let i = 0; i < publishCount; i++) { await boss.publish(queue, null, { startAfter, singletonSeconds }) - await Promise.delay(publishInterval) + await delay(publishInterval) } - await Promise.delay(singletonSeconds * 1000) + await delay(singletonSeconds * 1000) assert(subscribeCount <= 2) @@ -44,10 +44,10 @@ describe('throttle', function () { for (let i = 0; i < publishCount; i++) { await boss.publish(queue, null, { singletonSeconds }) - await Promise.delay(publishInterval) + await delay(publishInterval) } - await Promise.delay(assertTimeout) + await delay(assertTimeout) assert(subscribeCount <= jobCount + 1) diff --git a/types.d.ts b/types.d.ts index cf73c363..3eb7bb83 100644 --- a/types.d.ts +++ b/types.d.ts @@ -51,7 +51,11 @@ declare namespace PgBoss { & RetentionOptions & RetryOptions & JobPollingOptions + & CompletionOptions + interface CompletionOptions { + onComplete?: boolean; + } interface ExpirationOptions { expireInSeconds?: number; expireInMinutes?: number; @@ -80,7 +84,7 @@ declare namespace PgBoss { singletonNextSlot?: boolean; } - type PublishOptions = JobOptions & ExpirationOptions & RetentionOptions & RetryOptions + type PublishOptions = JobOptions & ExpirationOptions & RetentionOptions & RetryOptions & CompletionOptions type ScheduleOptions = PublishOptions & { tz?: string } @@ -198,8 +202,6 @@ declare class PgBoss { static getRollbackPlans(schema: string, version: string): string; on(event: "error", handler: (error: Error) => void): void; - on(event: "archived", handler: (count: number) => void): void; - on(event: "expired", handler: (count: number) => void): void; on(event: "maintenance", handler: () => void): void; on(event: "monitor-states", handler: (monitorStates: PgBoss.MonitorStates) => void): void; diff --git a/version.json b/version.json index f724f45f..9e9e6673 100644 --- a/version.json +++ b/version.json @@ -1,3 +1,3 @@ { - "schema": 15 + "schema": 16 }