Bulk unique insertion, uniqueness with subset of args (#590)
* add state machine mermaid chart

* new unique jobs implementation that works on bulk insert

* support uniqueness for specific args via struct tags

* block bulk inserts using advisory lock uniqueness

* handle unique conflicts from scheduler by discarding jobs

* add changelog entry
bgentry authored Sep 22, 2024
1 parent 3ed1d29 commit d977e10
Showing 42 changed files with 2,592 additions and 1,853 deletions.
52 changes: 52 additions & 0 deletions CHANGELOG.md
@@ -7,6 +7,37 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

⚠️ Version 0.12.0 contains a new database migration, version 6. See [documentation on running River migrations](https://riverqueue.com/docs/migrations). If migrating with the CLI, make sure to update it to its latest version:

```shell
go install github.com/riverqueue/river/cmd/river@latest
river migrate-up --database-url "$DATABASE_URL"
```

If not using River's internal migration system, the raw SQL can alternatively be dumped with:

```shell
go install github.com/riverqueue/river/cmd/river@latest
river migrate-get --version 6 --up > river6.up.sql
river migrate-get --version 6 --down > river6.down.sql
```

The migration **includes a new index**. Users with a very large job table may want to build the index separately using `CONCURRENTLY` (which must be run outside of a transaction), then run `river migrate-up` to finalize the process (it will tolerate an index that already exists):

```sql
ALTER TABLE river_job ADD COLUMN unique_states BIT(8);

CREATE UNIQUE INDEX CONCURRENTLY river_job_unique_idx ON river_job (unique_key)
WHERE unique_key IS NOT NULL
AND unique_states IS NOT NULL
AND river_job_state_in_bitmask(unique_states, state);
```

```shell
go install github.com/riverqueue/river/cmd/river@latest
river migrate-up --database-url "$DATABASE_URL"
```
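The index predicate above depends on a `unique_states BIT(8)` column and a `river_job_state_in_bitmask` check. As a rough illustration of the idea, the sketch below packs a set of job states into a single byte and tests membership. The bit positions and the helper names `stateBitmask` and `stateInBitmask` are assumptions made for this example; River's actual bit assignment is not shown in this commit excerpt and may differ.

```go
package main

import "fmt"

// stateBit maps each job state to a bit position.
// ASSUMPTION: these positions are hypothetical and chosen for illustration;
// they are not taken from River's source.
var stateBit = map[string]uint8{
	"available": 0,
	"cancelled": 1,
	"completed": 2,
	"discarded": 3,
	"pending":   4,
	"retryable": 5,
	"running":   6,
	"scheduled": 7,
}

// stateBitmask packs a list of state names into one byte, one bit per state.
func stateBitmask(states []string) (uint8, error) {
	var mask uint8
	for _, s := range states {
		bit, ok := stateBit[s]
		if !ok {
			return 0, fmt.Errorf("unknown job state %q", s)
		}
		mask |= 1 << bit
	}
	return mask, nil
}

// stateInBitmask reports whether the bit for state is set in mask. It plays
// the same role here that the SQL function plays in the index predicate.
func stateInBitmask(mask uint8, state string) bool {
	bit, ok := stateBit[state]
	return ok && mask&(1<<bit) != 0
}

func main() {
	mask, err := stateBitmask([]string{"available", "pending", "running", "scheduled"})
	if err != nil {
		panic(err)
	}
	fmt.Printf("mask=%08b\n", mask)
	fmt.Println("running counts toward uniqueness:", stateInBitmask(mask, "running"))
	fmt.Println("cancelled counts toward uniqueness:", stateInBitmask(mask, "cancelled"))
}
```

Storing the mask on each row lets a single partial unique index serve every `ByState` configuration, since the predicate is evaluated per row against that row's own mask.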

## Added

- `rivertest.WorkContext`, a test function that can be used to initialize a context to test a `JobArgs.Work` implementation that will have a client set to context for use with `river.ClientFromContext`. [PR #526](https://github.com/riverqueue/river/pull/526).
@@ -35,6 +66,27 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
}
```

- Unique jobs have been improved to allow bulk insertion of unique jobs via `InsertMany` / `InsertManyTx`, and to allow customizing the `ByState` list to add or remove certain states. This enables users to expand the set of unique states to also include `cancelled` and `discarded` jobs, or to remove `retryable` from uniqueness consideration. This updated implementation maintains the speed advantage of the newer index-backed uniqueness system, while allowing some flexibility in which job states are considered for uniqueness.

Unique jobs utilizing `ByArgs` can now also opt to have a subset of the job's arguments considered for uniqueness. For example, you could choose to consider only the `customer_id` field while ignoring the `trace_id` field:
```go
type MyJobArgs struct {
	CustomerID string `json:"customer_id" river:"unique"`
	TraceID    string `json:"trace_id"`
}
```
Any fields considered in uniqueness are also sorted alphabetically in order to guarantee a consistent result, even if the encoded JSON isn't sorted consistently. For example, `encoding/json` encodes struct fields in their defined order, so merely reordering struct fields would previously have been enough to cause a new job to not be considered identical to a pre-existing one with different JSON order.
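To make the field selection and ordering concrete, here is a minimal sketch of one way to derive a stable encoding from only the fields tagged `river:"unique"`. The helper `uniqueArgsJSON` is hypothetical and is not River's implementation; it relies on the documented behavior that `encoding/json` sorts map keys when marshaling, and for brevity it ignores the case where no field carries the tag.

```go
package main

import (
	"encoding/json"
	"fmt"
	"reflect"
	"strings"
)

type MyJobArgs struct {
	CustomerID string `json:"customer_id" river:"unique"`
	TraceID    string `json:"trace_id"`
}

// uniqueArgsJSON is a hypothetical helper (not part of River). It collects
// the exported fields tagged `river:"unique"` into a map keyed by JSON name
// and marshals the map, so the output is ordered by key rather than by
// struct field order.
func uniqueArgsJSON(args any) (string, error) {
	v := reflect.ValueOf(args)
	if v.Kind() == reflect.Pointer {
		v = v.Elem()
	}
	if v.Kind() != reflect.Struct {
		return "", fmt.Errorf("expected a struct, got %s", v.Kind())
	}

	t := v.Type()
	selected := make(map[string]any)
	for i := 0; i < t.NumField(); i++ {
		f := t.Field(i)
		if !f.IsExported() || f.Tag.Get("river") != "unique" {
			continue
		}
		// Use the JSON name without options such as ",omitempty".
		name, _, _ := strings.Cut(f.Tag.Get("json"), ",")
		if name == "" {
			name = f.Name
		}
		selected[name] = v.Field(i).Interface()
	}

	out, err := json.Marshal(selected) // map keys are emitted in sorted order
	if err != nil {
		return "", err
	}
	return string(out), nil
}

func main() {
	a, err := uniqueArgsJSON(MyJobArgs{CustomerID: "cust_123", TraceID: "trace_a"})
	if err != nil {
		panic(err)
	}
	b, err := uniqueArgsJSON(MyJobArgs{CustomerID: "cust_123", TraceID: "trace_b"})
	if err != nil {
		panic(err)
	}
	fmt.Println(a)      // {"customer_id":"cust_123"}
	fmt.Println(a == b) // true: trace_id does not affect the result
}
```

Because the output depends only on key names and values, two struct definitions that list the same tagged fields in a different order produce the same string.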

The `UniqueOpts` type also gains an `ExcludeKind` option for cases where uniqueness needs to be guaranteed across multiple job types.

In-flight unique jobs using the previous designs will continue to be executed successfully with these changes, so there should be no need for downtime as part of the migration. However, the v6 migration adds a new unique job index while also removing the old one, so users with in-flight unique jobs may also wish to avoid removing the old index until the new River release has been deployed in order to guarantee that jobs aren't duplicated by old River code once that index is removed.

**Deprecated**: The original unique jobs implementation which relied on advisory locks has been deprecated, but not yet removed. The only way to trigger this old code path is with a single insert (`Insert`/`InsertTx`) and using `UniqueOpts.ByState` with a custom list of states that omits some of the now-required states for unique jobs. Specifically, `pending`, `scheduled`, `available`, and `running` cannot be removed from the `ByState` list with the new implementation. These are included in the default list, so only the places which customize this attribute need to be updated to opt into the new (much faster) unique jobs. The advisory lock unique implementation will be removed in an upcoming release.

[PR #590](https://github.com/riverqueue/river/pull/590).
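The rule described above can be summarized as a three-way decision: no uniqueness, index-backed uniqueness, or the deprecated advisory lock path, with bulk inserts refusing the last one. The sketch below models that decision with plain strings. `choosePath`, `insertPath`, and the `unique` flag are hypothetical names for this example and are not River's API; the required state list and the error text come from this commit.

```go
package main

import (
	"errors"
	"fmt"
)

// requiredStates are the states the changelog says cannot be removed from
// ByState under the new index-backed implementation.
var requiredStates = []string{"available", "pending", "running", "scheduled"}

type insertPath int

const (
	pathPlain        insertPath = iota // no uniqueness requested
	pathUniqueIndex                    // new index-backed uniqueness
	pathAdvisoryLock                   // deprecated fallback
)

// choosePath is a hypothetical helper (not River's API). unique reports
// whether any unique options were set; an empty byState means the default
// list, which already contains every required state.
func choosePath(unique bool, byState []string, bulk bool) (insertPath, error) {
	if !unique {
		return pathPlain, nil
	}
	if len(byState) == 0 {
		return pathUniqueIndex, nil
	}

	present := make(map[string]bool, len(byState))
	for _, s := range byState {
		present[s] = true
	}
	for _, s := range requiredStates {
		if present[s] {
			continue
		}
		// A required state is missing, so only the deprecated path applies.
		if bulk {
			return pathPlain, errors.New("bulk inserts do not support advisory lock uniqueness and cannot remove required states")
		}
		return pathAdvisoryLock, nil
	}
	return pathUniqueIndex, nil
}

func main() {
	custom := []string{"available", "pending", "running", "scheduled", "cancelled"}
	p, _ := choosePath(true, custom, true)
	fmt.Println("extra states, bulk insert uses index path:", p == pathUniqueIndex)

	_, err := choosePath(true, []string{"available", "running"}, true)
	fmt.Println("missing required states, bulk insert:", err)
}
```

Callers that customize `ByState` can keep the fast path by adding states to the default list rather than removing any of the four required ones.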
- **Deprecated**: The `MigrateTx` method of `rivermigrate` has been deprecated. It turns out there are certain combinations of schema changes which cannot be run within a single transaction, and the migrator now prefers to run each migration in its own transaction, one at a time. `MigrateTx` will be removed in a future version.
- The migrator now produces a better error in case of a non-existent migration line including suggestions for known migration lines that are similar in name to the invalid one. [PR #558](https://github.com/riverqueue/river/pull/558).
90 changes: 52 additions & 38 deletions client.go
@@ -66,9 +66,9 @@ type Config struct {
// only 32 bits of number space for advisory lock hashes, so it makes
// internally conflicting River-generated keys more likely.
//
// Advisory locks are currently only used for the fallback/slow path of
// unique job insertion where finalized states are included in a ByState
// configuration.
// Advisory locks are currently only used for the deprecated fallback/slow
// path of unique job insertion when pending, scheduled, available, or running
// are omitted from a customized ByState configuration.
AdvisoryLockPrefix int32

// CancelledJobRetentionPeriod is the amount of time to keep cancelled jobs
@@ -346,7 +346,7 @@ type Client[TTx any] struct {
stopped <-chan struct{}
subscriptionManager *subscriptionManager
testSignals clientTestSignals
uniqueInserter *dbunique.UniqueInserter
uniqueInserter *dbunique.UniqueInserter // deprecated fallback path for unique job insertion

// workCancel cancels the context used for all work goroutines. Normal Stop
// does not cancel that context.
@@ -1162,7 +1162,7 @@ func (c *Client[TTx]) ID() string {
return c.config.ID
}

func insertParamsFromConfigArgsAndOptions(archetype *baseservice.Archetype, config *Config, args JobArgs, insertOpts *InsertOpts) (*riverdriver.JobInsertFastParams, *dbunique.UniqueOpts, error) {
func insertParamsFromConfigArgsAndOptions(archetype *baseservice.Archetype, config *Config, args JobArgs, insertOpts *InsertOpts, bulk bool) (*riverdriver.JobInsertFastParams, *dbunique.UniqueOpts, error) {
encodedArgs, err := json.Marshal(args)
if err != nil {
return nil, nil, fmt.Errorf("error marshaling args to JSON: %w", err)
@@ -1225,6 +1225,7 @@ func insertParamsFromConfigArgsAndOptions(archetype *baseservice.Archetype, conf
}

insertParams := &riverdriver.JobInsertFastParams{
Args: args,
CreatedAt: createdAt,
EncodedArgs: json.RawMessage(encodedArgs),
Kind: args.Kind(),
@@ -1235,6 +1236,22 @@ func insertParamsFromConfigArgsAndOptions(archetype *baseservice.Archetype, conf
State: rivertype.JobStateAvailable,
Tags: tags,
}
var returnUniqueOpts *dbunique.UniqueOpts
if !uniqueOpts.isEmpty() {
if uniqueOpts.isV1() {
if bulk {
return nil, nil, errors.New("bulk inserts do not support advisory lock uniqueness and cannot remove required states")
}
returnUniqueOpts = (*dbunique.UniqueOpts)(&uniqueOpts)
} else {
internalUniqueOpts := (*dbunique.UniqueOpts)(&uniqueOpts)
insertParams.UniqueKey, err = dbunique.UniqueKey(archetype.Time, internalUniqueOpts, insertParams)
if err != nil {
return nil, nil, err
}
insertParams.UniqueStates = internalUniqueOpts.StateBitmask()
}
}

switch {
case !insertOpts.ScheduledAt.IsZero():
@@ -1253,7 +1270,7 @@ func insertParamsFromConfigArgsAndOptions(archetype *baseservice.Archetype, conf
insertParams.State = rivertype.JobStatePending
}

return insertParams, (*dbunique.UniqueOpts)(&uniqueOpts), nil
return insertParams, returnUniqueOpts, nil
}

var errNoDriverDBPool = errors.New("driver must have non-nil database pool to use non-transactional methods like Insert and InsertMany (try InsertTx or InsertManyTx instead")
@@ -1273,7 +1290,7 @@ func (c *Client[TTx]) Insert(ctx context.Context, args JobArgs, opts *InsertOpts
return nil, errNoDriverDBPool
}

return c.insert(ctx, c.driver.GetExecutor(), args, opts)
return c.insert(ctx, c.driver.GetExecutor(), args, opts, false)
}

// InsertTx inserts a new job with the provided args on the given transaction.
@@ -1294,15 +1311,15 @@ func (c *Client[TTx]) Insert(ctx context.Context, args JobArgs, opts *InsertOpts
// transactions, the job will not be worked until the transaction has committed,
// and if the transaction rolls back, so too is the inserted job.
func (c *Client[TTx]) InsertTx(ctx context.Context, tx TTx, args JobArgs, opts *InsertOpts) (*rivertype.JobInsertResult, error) {
return c.insert(ctx, c.driver.UnwrapExecutor(tx), args, opts)
return c.insert(ctx, c.driver.UnwrapExecutor(tx), args, opts, false)
}

func (c *Client[TTx]) insert(ctx context.Context, exec riverdriver.Executor, args JobArgs, opts *InsertOpts) (*rivertype.JobInsertResult, error) {
func (c *Client[TTx]) insert(ctx context.Context, exec riverdriver.Executor, args JobArgs, opts *InsertOpts, bulk bool) (*rivertype.JobInsertResult, error) {
if err := c.validateJobArgs(args); err != nil {
return nil, err
}

params, uniqueOpts, err := insertParamsFromConfigArgsAndOptions(&c.baseService.Archetype, c.config, args, opts)
params, uniqueOpts, err := insertParamsFromConfigArgsAndOptions(&c.baseService.Archetype, c.config, args, opts, bulk)
if err != nil {
return nil, err
}
@@ -1313,9 +1330,23 @@ func (c *Client[TTx]) insert(ctx context.Context, exec riverdriver.Executor, arg
}
defer tx.Rollback(ctx)

jobInsertRes, err := c.uniqueInserter.JobInsert(ctx, tx, params, uniqueOpts)
if err != nil {
return nil, err
// TODO: consolidate insertion paths for single + multi, remove deprecated uniqueness design
var jobInsertRes *riverdriver.JobInsertFastResult
if uniqueOpts == nil {
jobInsertRes, err = tx.JobInsertFast(ctx, params)
if err != nil {
return nil, err
}
} else {
if bulk {
return nil, errors.New("bulk inserts do not support advisory lock uniqueness")
}
// Old deprecated advisory lock route
c.baseService.Logger.WarnContext(ctx, "Using deprecated advisory lock uniqueness for job insert")
jobInsertRes, err = c.uniqueInserter.JobInsert(ctx, tx, params, uniqueOpts)
if err != nil {
return nil, err
}
}

if err := c.maybeNotifyInsert(ctx, tx, params.State, params.Queue); err != nil {
@@ -1325,7 +1356,7 @@ func (c *Client[TTx]) insert(ctx context.Context, exec riverdriver.Executor, arg
return nil, err
}

return jobInsertRes, nil
return (*rivertype.JobInsertResult)(jobInsertRes), nil
}

// InsertManyParams encapsulates a single job combined with insert options for
@@ -1431,8 +1462,8 @@ func (c *Client[TTx]) insertMany(ctx context.Context, tx riverdriver.ExecutorTx,
}

return sliceutil.Map(jobRows,
func(jobRow *rivertype.JobRow) *rivertype.JobInsertResult {
return &rivertype.JobInsertResult{Job: jobRow}
func(result *riverdriver.JobInsertFastResult) *rivertype.JobInsertResult {
return (*rivertype.JobInsertResult)(result)
},
), nil
}
@@ -1450,20 +1481,12 @@ func (c *Client[TTx]) insertManyParams(params []InsertManyParams) ([]*riverdrive
return nil, err
}

if param.InsertOpts != nil {
// UniqueOpts aren't supported for batch inserts because they use PG
// advisory locks to work, and taking many locks simultaneously could
// easily lead to contention and deadlocks.
if !param.InsertOpts.UniqueOpts.isEmpty() {
return nil, errors.New("UniqueOpts are not supported for batch inserts")
}
}

var err error
insertParams[i], _, err = insertParamsFromConfigArgsAndOptions(&c.baseService.Archetype, c.config, param.Args, param.InsertOpts)
insertParamsItem, _, err := insertParamsFromConfigArgsAndOptions(&c.baseService.Archetype, c.config, param.Args, param.InsertOpts, true)
if err != nil {
return nil, err
}

insertParams[i] = insertParamsItem
}

return insertParams, nil
@@ -1579,20 +1602,11 @@ func (c *Client[TTx]) insertManyFastParams(params []InsertManyParams) ([]*riverd
return nil, err
}

if param.InsertOpts != nil {
// UniqueOpts aren't support for batch inserts because they use PG
// advisory locks to work, and taking many locks simultaneously
// could easily lead to contention and deadlocks.
if !param.InsertOpts.UniqueOpts.isEmpty() {
return nil, errors.New("UniqueOpts are not supported for batch inserts")
}
}

var err error
insertParams[i], _, err = insertParamsFromConfigArgsAndOptions(&c.baseService.Archetype, c.config, param.Args, param.InsertOpts)
insertParamsItem, _, err := insertParamsFromConfigArgsAndOptions(&c.baseService.Archetype, c.config, param.Args, param.InsertOpts, true)
if err != nil {
return nil, err
}
insertParams[i] = insertParamsItem
}

return insertParams, nil