Skip to content

Commit

Permalink
Opt out of completion jobs (#222)
Browse files Browse the repository at this point in the history
* add config to opt out of completion job and sans bluebird

* changelog [skip ci]

* fix migration

* fix action response counts

* added more checks in timekeeper for stopped instance

* docs and types
  • Loading branch information
timgit authored Feb 9, 2021
1 parent 0f2d6a8 commit c00642b
Show file tree
Hide file tree
Showing 31 changed files with 290 additions and 150 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# Changes

## 5.2.0

- Added constructor and publish option `onComplete` to opt out of creating a completion job in the queue once a job is completed. This defaults to true for backwards compatibility.
- Replaced bluebird dependency with p-map and delay.

## 5.1.0

- Added transactional locking to maintenance queries as a safeguard from deadlocks under load.
Expand Down
6 changes: 6 additions & 0 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pg-boss can be customized using configuration options when an instance is create
- [Deferred jobs](#deferred-jobs)
- [Unique jobs](#unique-jobs)
- [Throttled jobs](#throttled-jobs)
- [Completion jobs](#completion-jobs)
- [Fetch options](#fetch-options)
- [Subscribe options](#subscribe-options)
- [Job polling options](#job-polling-options)
Expand Down Expand Up @@ -257,6 +258,11 @@ For example, if you set the `singletonMinutes` to 1, then submit 2 jobs within a
Setting `singletonNextSlot` to true will cause the job to be scheduled to run after the current time slot if and when a job is throttled. This option is set to true, for example, when calling the convenience function `publishDebounced()`.

### Completion jobs
* **onComplete**, bool (Default: true)

When a job completes, a completion job will be created in the queue, copying the same retention policy as the job, for the purpose of `onComplete()` or `fetchCompleted()`. If completion jobs are not used, they will be archived according to the retention policy. If the queue in question has a very high volume, this can be set to `false` to bypass creating the completion job. This can also be set in the constructor as a default for all calls to `publish()`.

## Fetch options

* **includeMetadata**, bool
Expand Down
50 changes: 32 additions & 18 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
{
"name": "pg-boss",
"version": "5.1.0",
"version": "5.2.0",
"description": "Queueing jobs in Node.js using PostgreSQL like a boss",
"main": "./src/index.js",
"engines": {
"node": ">=10.0.0"
},
"dependencies": {
"bluebird": "^3.7.2",
"cron-parser": "^3.1.0",
"delay": "^5.0.0",
"p-map": "^4.0.0",
"pg": "^8.5.1",
"uuid": "^8.3.2"
},
Expand Down
13 changes: 13 additions & 0 deletions src/attorney.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ function checkPublishArgs (args, defaults) {
applyRetryConfig(options, defaults)
applyExpirationConfig(options, defaults)
applyRetentionConfig(options, defaults)
applyCompletionConfig(options, defaults)

const { startAfter, singletonSeconds, singletonMinutes, singletonHours } = options

Expand Down Expand Up @@ -147,6 +148,7 @@ function getConfig (value) {
applyNewJobCheckInterval(config)
applyExpirationConfig(config)
applyRetentionConfig(config)
applyCompletionConfig(config)

return config
}
Expand Down Expand Up @@ -175,6 +177,17 @@ function applyArchiveConfig (config) {
}
}

function applyCompletionConfig (config, defaults) {
assert(!('onComplete' in config) || config.onComplete === true || config.onComplete === false,
'configuration assert: onComplete must be either true or false')

if (!('onComplete' in config)) {
config.onComplete = defaults
? defaults.onComplete
: true
}
}

function applyRetentionConfig (config, defaults) {
assert(!('retentionSeconds' in config) || config.retentionSeconds >= 1,
'configuration assert: retentionSeconds must be at least every second')
Expand Down
6 changes: 4 additions & 2 deletions src/boss.js
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ class Boss extends EventEmitter {
options = {
startAfter,
retentionSeconds: this.maintenanceIntervalSeconds * 4,
singletonKey: queues.MAINTENANCE
singletonKey: queues.MAINTENANCE,
onComplete: false
}

await this.manager.publish(queues.MAINTENANCE, null, options)
Expand All @@ -104,7 +105,8 @@ class Boss extends EventEmitter {
options = {
startAfter,
retentionSeconds: this.monitorIntervalSeconds * 4,
singletonKey: queues.MONITOR_STATES
singletonKey: queues.MONITOR_STATES,
onComplete: false
}

await this.manager.publish(queues.MONITOR_STATES, null, options)
Expand Down
12 changes: 7 additions & 5 deletions src/manager.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
const assert = require('assert')
const EventEmitter = require('events')
const Promise = require('bluebird')
const pMap = require('p-map')
const uuid = require('uuid')

const Worker = require('./worker')
Expand Down Expand Up @@ -85,7 +85,7 @@ class Manager extends EventEmitter {
const concurrency = options.teamConcurrency || 1

// either no option was set, or teamSize was used
return Promise.map(jobs, job =>
return pMap(jobs, job =>
callback(job)
.then(value => this.complete(job.id, value))
.catch(err => this.fail(job.id, err))
Expand Down Expand Up @@ -178,7 +178,8 @@ class Manager extends EventEmitter {
singletonSeconds,
retryBackoff,
retryLimit,
retryDelay
retryDelay,
onComplete
} = options

const id = uuid[this.config.uuid]()
Expand All @@ -196,7 +197,8 @@ class Manager extends EventEmitter {
singletonOffset, // 10
retryDelay, // 11
retryBackoff, // 12
keepUntil // 13
keepUntil, // 13
onComplete // 14
]

const result = await this.db.executeSql(this.insertJobCommand, values)
Expand Down Expand Up @@ -292,7 +294,7 @@ class Manager extends EventEmitter {
return {
jobs: ids,
requested: ids.length,
updated: result.rowCount
updated: parseInt(result.rows[0].count)
}
}

Expand Down
16 changes: 16 additions & 0 deletions src/migrationStore.js
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,22 @@ function getAll (schema, config) {
const keepUntil = config ? config.keepUntil : DEFAULT_RETENTION

return [
{
release: '5.2.0',
version: 16,
previous: 15,
install: [
`ALTER TABLE ${schema}.job ADD on_complete boolean`,
`UPDATE ${schema}.job SET on_complete = true`,
`ALTER TABLE ${schema}.job ALTER COLUMN on_complete SET DEFAULT true`,
`ALTER TABLE ${schema}.job ALTER COLUMN on_complete SET NOT NULL`,
`ALTER TABLE ${schema}.archive ADD on_complete boolean`
],
uninstall: [
`ALTER TABLE ${schema}.job DROP COLUMN on_complete`,
`ALTER TABLE ${schema}.archive DROP COLUMN on_complete`
]
},
{
release: '5.0.6',
version: 15,
Expand Down
Loading

0 comments on commit c00642b

Please sign in to comment.