Skip to content

Commit

Permalink
block bulk inserts using advisory lock uniqueness
Browse files Browse the repository at this point in the history
  • Loading branch information
bgentry committed Sep 20, 2024
1 parent 0cffa61 commit 21e3033
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 26 deletions.
22 changes: 14 additions & 8 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1162,7 +1162,7 @@ func (c *Client[TTx]) ID() string {
return c.config.ID
}

func insertParamsFromConfigArgsAndOptions(archetype *baseservice.Archetype, config *Config, args JobArgs, insertOpts *InsertOpts) (*riverdriver.JobInsertFastParams, *dbunique.UniqueOpts, error) {
func insertParamsFromConfigArgsAndOptions(archetype *baseservice.Archetype, config *Config, args JobArgs, insertOpts *InsertOpts, bulk bool) (*riverdriver.JobInsertFastParams, *dbunique.UniqueOpts, error) {
encodedArgs, err := json.Marshal(args)
if err != nil {
return nil, nil, fmt.Errorf("error marshaling args to JSON: %w", err)
Expand Down Expand Up @@ -1239,7 +1239,9 @@ func insertParamsFromConfigArgsAndOptions(archetype *baseservice.Archetype, conf
var returnUniqueOpts *dbunique.UniqueOpts
if !uniqueOpts.isEmpty() {
if uniqueOpts.isV1() {
// TODO: block this path if we're within a multirow insert.
if bulk {
return nil, nil, errors.New("bulk inserts do not support advisory lock uniqueness and cannot remove required states")
}
returnUniqueOpts = (*dbunique.UniqueOpts)(&uniqueOpts)
} else {
internalUniqueOpts := (*dbunique.UniqueOpts)(&uniqueOpts)
Expand Down Expand Up @@ -1285,7 +1287,7 @@ func (c *Client[TTx]) Insert(ctx context.Context, args JobArgs, opts *InsertOpts
return nil, errNoDriverDBPool
}

return c.insert(ctx, c.driver.GetExecutor(), args, opts)
return c.insert(ctx, c.driver.GetExecutor(), args, opts, false)
}

// InsertTx inserts a new job with the provided args on the given transaction.
Expand All @@ -1306,15 +1308,15 @@ func (c *Client[TTx]) Insert(ctx context.Context, args JobArgs, opts *InsertOpts
// transactions, the job will not be worked until the transaction has committed,
// and if the transaction rolls back, so too is the inserted job.
func (c *Client[TTx]) InsertTx(ctx context.Context, tx TTx, args JobArgs, opts *InsertOpts) (*rivertype.JobInsertResult, error) {
return c.insert(ctx, c.driver.UnwrapExecutor(tx), args, opts)
return c.insert(ctx, c.driver.UnwrapExecutor(tx), args, opts, false)
}

func (c *Client[TTx]) insert(ctx context.Context, exec riverdriver.Executor, args JobArgs, opts *InsertOpts) (*rivertype.JobInsertResult, error) {
func (c *Client[TTx]) insert(ctx context.Context, exec riverdriver.Executor, args JobArgs, opts *InsertOpts, bulk bool) (*rivertype.JobInsertResult, error) {
if err := c.validateJobArgs(args); err != nil {
return nil, err
}

params, uniqueOpts, err := insertParamsFromConfigArgsAndOptions(&c.baseService.Archetype, c.config, args, opts)
params, uniqueOpts, err := insertParamsFromConfigArgsAndOptions(&c.baseService.Archetype, c.config, args, opts, bulk)
if err != nil {
return nil, err
}
Expand All @@ -1333,7 +1335,11 @@ func (c *Client[TTx]) insert(ctx context.Context, exec riverdriver.Executor, arg
return nil, err
}
} else {
if bulk {
return nil, errors.New("bulk inserts do not support advisory lock uniqueness")
}
// Old deprecated advisory lock route
c.baseService.Logger.WarnContext(ctx, "Using deprecated advisory lock uniqueness for job insert")
jobInsertRes, err = c.uniqueInserter.JobInsert(ctx, tx, params, uniqueOpts)
if err != nil {
return nil, err
Expand Down Expand Up @@ -1482,7 +1488,7 @@ func (c *Client[TTx]) insertManyParams(params []InsertManyParams) ([]*riverdrive
}

var err error
insertParams[i], _, err = insertParamsFromConfigArgsAndOptions(&c.baseService.Archetype, c.config, param.Args, param.InsertOpts)
insertParams[i], _, err = insertParamsFromConfigArgsAndOptions(&c.baseService.Archetype, c.config, param.Args, param.InsertOpts, false)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1611,7 +1617,7 @@ func (c *Client[TTx]) insertManyFastParams(params []InsertManyParams) ([]*riverd
}

var err error
insertParams[i], _, err = insertParamsFromConfigArgsAndOptions(&c.baseService.Archetype, c.config, param.Args, param.InsertOpts)
insertParams[i], _, err = insertParamsFromConfigArgsAndOptions(&c.baseService.Archetype, c.config, param.Args, param.InsertOpts, false)
if err != nil {
return nil, err
}
Expand Down
37 changes: 22 additions & 15 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2887,7 +2887,7 @@ func Test_Client_ErrorHandler(t *testing.T) {

// Bypass the normal Insert function because that will error on an
// unknown job.
insertParams, _, err := insertParamsFromConfigArgsAndOptions(&client.baseService.Archetype, config, unregisteredJobArgs{}, nil)
insertParams, _, err := insertParamsFromConfigArgsAndOptions(&client.baseService.Archetype, config, unregisteredJobArgs{}, nil, false)
require.NoError(t, err)
_, err = client.driver.GetExecutor().JobInsertFast(ctx, insertParams)
require.NoError(t, err)
Expand Down Expand Up @@ -4489,7 +4489,7 @@ func Test_Client_UnknownJobKindErrorsTheJob(t *testing.T) {
subscribeChan, cancel := client.Subscribe(EventKindJobFailed)
t.Cleanup(cancel)

insertParams, _, err := insertParamsFromConfigArgsAndOptions(&client.baseService.Archetype, config, unregisteredJobArgs{}, nil)
insertParams, _, err := insertParamsFromConfigArgsAndOptions(&client.baseService.Archetype, config, unregisteredJobArgs{}, nil, false)
require.NoError(err)
insertedResult, err := client.driver.GetExecutor().JobInsertFast(ctx, insertParams)
require.NoError(err)
Expand Down Expand Up @@ -5099,7 +5099,7 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) {
t.Run("Defaults", func(t *testing.T) {
t.Parallel()

insertParams, uniqueOpts, err := insertParamsFromConfigArgsAndOptions(archetype, config, noOpArgs{}, nil)
insertParams, uniqueOpts, err := insertParamsFromConfigArgsAndOptions(archetype, config, noOpArgs{}, nil, false)
require.NoError(t, err)
require.Equal(t, `{"name":""}`, string(insertParams.EncodedArgs))
require.Equal(t, (noOpArgs{}).Kind(), insertParams.Kind)
Expand All @@ -5121,7 +5121,7 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) {
MaxAttempts: 34,
}

insertParams, _, err := insertParamsFromConfigArgsAndOptions(archetype, overrideConfig, noOpArgs{}, nil)
insertParams, _, err := insertParamsFromConfigArgsAndOptions(archetype, overrideConfig, noOpArgs{}, nil, false)
require.NoError(t, err)
require.Equal(t, overrideConfig.MaxAttempts, insertParams.MaxAttempts)
})
Expand All @@ -5136,7 +5136,7 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) {
ScheduledAt: time.Now().Add(time.Hour),
Tags: []string{"tag1", "tag2"},
}
insertParams, _, err := insertParamsFromConfigArgsAndOptions(archetype, config, noOpArgs{}, opts)
insertParams, _, err := insertParamsFromConfigArgsAndOptions(archetype, config, noOpArgs{}, opts, false)
require.NoError(t, err)
require.Equal(t, 42, insertParams.MaxAttempts)
require.Equal(t, 2, insertParams.Priority)
Expand All @@ -5152,7 +5152,7 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) {

insertParams, _, err := insertParamsFromConfigArgsAndOptions(archetype, config, &customInsertOptsJobArgs{
ScheduledAt: nearFuture,
}, nil)
}, nil, false)
require.NoError(t, err)
// All these come from overrides in customInsertOptsJobArgs's definition:
require.Equal(t, 42, insertParams.MaxAttempts)
Expand All @@ -5168,7 +5168,7 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) {

insertParams, _, err := insertParamsFromConfigArgsAndOptions(archetype, config, &customInsertOptsJobArgs{
ScheduledAt: time.Time{},
}, nil)
}, nil, false)
require.NoError(t, err)
require.Nil(t, insertParams.ScheduledAt)
})
Expand All @@ -5179,14 +5179,14 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) {
{
_, _, err := insertParamsFromConfigArgsAndOptions(archetype, config, &customInsertOptsJobArgs{}, &InsertOpts{
Tags: []string{strings.Repeat("h", 256)},
})
}, false)
require.EqualError(t, err, "tags should be a maximum of 255 characters long")
}

{
_, _, err := insertParamsFromConfigArgsAndOptions(archetype, config, &customInsertOptsJobArgs{}, &InsertOpts{
Tags: []string{"tag,with,comma"},
})
}, false)
require.EqualError(t, err, "tags should match regex "+tagRE.String())
}
})
Expand All @@ -5204,7 +5204,7 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) {
ExcludeKind: true,
}

params, resultUniqueOpts, err := insertParamsFromConfigArgsAndOptions(archetype, config, noOpArgs{}, &InsertOpts{UniqueOpts: uniqueOpts})
params, resultUniqueOpts, err := insertParamsFromConfigArgsAndOptions(archetype, config, noOpArgs{}, &InsertOpts{UniqueOpts: uniqueOpts}, false)
require.NoError(t, err)
require.Nil(t, resultUniqueOpts)
internalUniqueOpts := &dbunique.UniqueOpts{
Expand Down Expand Up @@ -5241,7 +5241,7 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) {
ByState: states,
}

params, resultUniqueOpts, err := insertParamsFromConfigArgsAndOptions(archetype, config, noOpArgs{}, &InsertOpts{UniqueOpts: uniqueOpts})
params, resultUniqueOpts, err := insertParamsFromConfigArgsAndOptions(archetype, config, noOpArgs{}, &InsertOpts{UniqueOpts: uniqueOpts}, true)
require.NoError(t, err)
require.Nil(t, resultUniqueOpts)
internalUniqueOpts := &dbunique.UniqueOpts{
Expand Down Expand Up @@ -5270,7 +5270,7 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) {
ByState: []rivertype.JobState{rivertype.JobStateAvailable, rivertype.JobStateCompleted},
}

params, resultUniqueOpts, err := insertParamsFromConfigArgsAndOptions(archetype, config, noOpArgs{}, &InsertOpts{UniqueOpts: uniqueOpts})
params, resultUniqueOpts, err := insertParamsFromConfigArgsAndOptions(archetype, config, noOpArgs{}, &InsertOpts{UniqueOpts: uniqueOpts}, false)
require.NoError(t, err)
require.NotNil(t, resultUniqueOpts)
internalUniqueOpts := &dbunique.UniqueOpts{
Expand All @@ -5283,6 +5283,12 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) {

require.Nil(t, params.UniqueKey)
require.Zero(t, params.UniqueStates)

// In a bulk insert, this should be explicitly blocked:
params, resultUniqueOpts, err = insertParamsFromConfigArgsAndOptions(archetype, config, noOpArgs{}, &InsertOpts{UniqueOpts: uniqueOpts}, true)
require.ErrorContains(t, err, "bulk inserts do not support advisory lock uniqueness and cannot remove required states")
require.Nil(t, params)
require.Nil(t, resultUniqueOpts)
})

t.Run("UniqueOptsWithPartialArgs", func(t *testing.T) {
Expand All @@ -5302,7 +5308,7 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) {
Excluded: true,
}

params, resultUniqueOpts, err := insertParamsFromConfigArgsAndOptions(archetype, config, args, &InsertOpts{UniqueOpts: uniqueOpts})
params, resultUniqueOpts, err := insertParamsFromConfigArgsAndOptions(archetype, config, args, &InsertOpts{UniqueOpts: uniqueOpts}, true)
require.NoError(t, err)
require.Nil(t, resultUniqueOpts)
internalUniqueOpts := &dbunique.UniqueOpts{ByArgs: true}
Expand All @@ -5316,7 +5322,7 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) {
t.Run("PriorityIsLimitedTo4", func(t *testing.T) {
t.Parallel()

insertParams, _, err := insertParamsFromConfigArgsAndOptions(archetype, config, noOpArgs{}, &InsertOpts{Priority: 5})
insertParams, _, err := insertParamsFromConfigArgsAndOptions(archetype, config, noOpArgs{}, &InsertOpts{Priority: 5}, false)
require.ErrorContains(t, err, "priority must be between 1 and 4")
require.Nil(t, insertParams)
})
Expand All @@ -5325,7 +5331,7 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) {
t.Parallel()

args := timeoutTestArgs{TimeoutValue: time.Hour}
insertParams, _, err := insertParamsFromConfigArgsAndOptions(archetype, config, args, nil)
insertParams, _, err := insertParamsFromConfigArgsAndOptions(archetype, config, args, nil, false)
require.NoError(t, err)
require.Equal(t, `{"timeout_value":3600000000000}`, string(insertParams.EncodedArgs))
})
Expand All @@ -5341,6 +5347,7 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) {
config,
noOpArgs{},
&InsertOpts{UniqueOpts: UniqueOpts{ByPeriod: 1 * time.Millisecond}},
false,
)
require.EqualError(t, err, "JobUniqueOpts.ByPeriod should not be less than 1 second")
require.Nil(t, insertParams)
Expand Down
2 changes: 1 addition & 1 deletion periodic_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func (b *PeriodicJobBundle) toInternal(periodicJob *PeriodicJob) *maintenance.Pe
if args == nil {
return nil, nil, maintenance.ErrNoJobToInsert
}
return insertParamsFromConfigArgsAndOptions(&b.periodicJobEnqueuer.Archetype, b.clientConfig, args, options)
return insertParamsFromConfigArgsAndOptions(&b.periodicJobEnqueuer.Archetype, b.clientConfig, args, options, false)
},
RunOnStart: opts.RunOnStart,
ScheduleFunc: periodicJob.scheduleFunc.Next,
Expand Down
4 changes: 2 additions & 2 deletions producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func Test_Producer_CanSafelyCompleteJobsWhileFetchingNewOnes(t *testing.T) {

params := make([]*riverdriver.JobInsertFastParams, maxJobCount)
for i := range params {
insertParams, _, err := insertParamsFromConfigArgsAndOptions(archetype, config, WithJobNumArgs{JobNum: i}, nil)
insertParams, _, err := insertParamsFromConfigArgsAndOptions(archetype, config, WithJobNumArgs{JobNum: i}, nil, true)
require.NoError(err)

params[i] = insertParams
Expand Down Expand Up @@ -277,7 +277,7 @@ func testProducer(t *testing.T, makeProducer func(ctx context.Context, t *testin
mustInsert := func(ctx context.Context, t *testing.T, bundle *testBundle, args JobArgs) {
t.Helper()

insertParams, _, err := insertParamsFromConfigArgsAndOptions(bundle.archetype, bundle.config, args, nil)
insertParams, _, err := insertParamsFromConfigArgsAndOptions(bundle.archetype, bundle.config, args, nil, false)
require.NoError(t, err)

_, err = bundle.exec.JobInsertFast(ctx, insertParams)
Expand Down

0 comments on commit 21e3033

Please sign in to comment.