Skip to content

Commit

Permalink
block bulk inserts using advisory lock uniqueness
Browse files Browse the repository at this point in the history
  • Loading branch information
bgentry committed Sep 21, 2024
1 parent 736cfc7 commit 035b0e1
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 44 deletions.
36 changes: 14 additions & 22 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1162,7 +1162,7 @@ func (c *Client[TTx]) ID() string {
return c.config.ID
}

func insertParamsFromConfigArgsAndOptions(archetype *baseservice.Archetype, config *Config, args JobArgs, insertOpts *InsertOpts) (*riverdriver.JobInsertFastParams, *dbunique.UniqueOpts, error) {
func insertParamsFromConfigArgsAndOptions(archetype *baseservice.Archetype, config *Config, args JobArgs, insertOpts *InsertOpts, bulk bool) (*riverdriver.JobInsertFastParams, *dbunique.UniqueOpts, error) {
encodedArgs, err := json.Marshal(args)
if err != nil {
return nil, nil, fmt.Errorf("error marshaling args to JSON: %w", err)
Expand Down Expand Up @@ -1239,7 +1239,9 @@ func insertParamsFromConfigArgsAndOptions(archetype *baseservice.Archetype, conf
var returnUniqueOpts *dbunique.UniqueOpts
if !uniqueOpts.isEmpty() {
if uniqueOpts.isV1() {
// TODO: block this path if we're within a multirow insert.
if bulk {
return nil, nil, errors.New("bulk inserts do not support advisory lock uniqueness and cannot remove required states")
}
returnUniqueOpts = (*dbunique.UniqueOpts)(&uniqueOpts)
} else {
internalUniqueOpts := (*dbunique.UniqueOpts)(&uniqueOpts)
Expand Down Expand Up @@ -1288,7 +1290,7 @@ func (c *Client[TTx]) Insert(ctx context.Context, args JobArgs, opts *InsertOpts
return nil, errNoDriverDBPool
}

return c.insert(ctx, c.driver.GetExecutor(), args, opts)
return c.insert(ctx, c.driver.GetExecutor(), args, opts, false)
}

// InsertTx inserts a new job with the provided args on the given transaction.
Expand All @@ -1309,15 +1311,15 @@ func (c *Client[TTx]) Insert(ctx context.Context, args JobArgs, opts *InsertOpts
// transactions, the job will not be worked until the transaction has committed,
// and if the transaction rolls back, so too is the inserted job.
func (c *Client[TTx]) InsertTx(ctx context.Context, tx TTx, args JobArgs, opts *InsertOpts) (*rivertype.JobInsertResult, error) {
return c.insert(ctx, c.driver.UnwrapExecutor(tx), args, opts)
return c.insert(ctx, c.driver.UnwrapExecutor(tx), args, opts, false)
}

func (c *Client[TTx]) insert(ctx context.Context, exec riverdriver.Executor, args JobArgs, opts *InsertOpts) (*rivertype.JobInsertResult, error) {
func (c *Client[TTx]) insert(ctx context.Context, exec riverdriver.Executor, args JobArgs, opts *InsertOpts, bulk bool) (*rivertype.JobInsertResult, error) {
if err := c.validateJobArgs(args); err != nil {
return nil, err
}

params, uniqueOpts, err := insertParamsFromConfigArgsAndOptions(&c.baseService.Archetype, c.config, args, opts)
params, uniqueOpts, err := insertParamsFromConfigArgsAndOptions(&c.baseService.Archetype, c.config, args, opts, bulk)
if err != nil {
return nil, err
}
Expand All @@ -1336,7 +1338,11 @@ func (c *Client[TTx]) insert(ctx context.Context, exec riverdriver.Executor, arg
return nil, err
}
} else {
if bulk {
return nil, errors.New("bulk inserts do not support advisory lock uniqueness")
}
// Old deprecated advisory lock route
c.baseService.Logger.WarnContext(ctx, "Using deprecated advisory lock uniqueness for job insert")
jobInsertRes, err = c.uniqueInserter.JobInsert(ctx, tx, params, uniqueOpts)
if err != nil {
return nil, err
Expand Down Expand Up @@ -1475,19 +1481,11 @@ func (c *Client[TTx]) insertManyParams(params []InsertManyParams) ([]*riverdrive
return nil, err
}

insertParamsItem, uniqueOpts, err := insertParamsFromConfigArgsAndOptions(&c.baseService.Archetype, c.config, param.Args, param.InsertOpts)
insertParamsItem, _, err := insertParamsFromConfigArgsAndOptions(&c.baseService.Archetype, c.config, param.Args, param.InsertOpts, true)
if err != nil {
return nil, err
}

if uniqueOpts != nil {
// These are returned only for the v1 style advisory lock based uniqueness,
// which has never been supported on bulk inserts due to the potential for
// contention and deadlocks.
//
// This can be removed when v1 uniqueness is removed.
return nil, errors.New("UniqueOpts are not supported for batch inserts")
}
insertParams[i] = insertParamsItem
}

Expand Down Expand Up @@ -1604,16 +1602,10 @@ func (c *Client[TTx]) insertManyFastParams(params []InsertManyParams) ([]*riverd
return nil, err
}

insertParamsItem, uniqueOpts, err := insertParamsFromConfigArgsAndOptions(&c.baseService.Archetype, c.config, param.Args, param.InsertOpts)
insertParamsItem, _, err := insertParamsFromConfigArgsAndOptions(&c.baseService.Archetype, c.config, param.Args, param.InsertOpts, true)
if err != nil {
return nil, err
}
if uniqueOpts != nil {
// UniqueOpts aren't support for batch inserts because they use PG
// advisory locks to work, and taking many locks simultaneously
// could easily lead to contention and deadlocks.
return nil, errors.New("UniqueOpts are not supported for batch inserts")
}
insertParams[i] = insertParamsItem
}

Expand Down
45 changes: 26 additions & 19 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1827,7 +1827,7 @@ func Test_Client_InsertManyFast(t *testing.T) {
ByState: []rivertype.JobState{rivertype.JobStateAvailable},
}}},
})
require.EqualError(t, err, "UniqueOpts are not supported for batch inserts")
require.EqualError(t, err, "bulk inserts do not support advisory lock uniqueness and cannot remove required states")
require.Equal(t, 0, count)
})
}
Expand Down Expand Up @@ -1982,7 +1982,7 @@ func Test_Client_InsertManyFastTx(t *testing.T) {
ByState: []rivertype.JobState{rivertype.JobStateAvailable},
}}},
})
require.EqualError(t, err, "UniqueOpts are not supported for batch inserts")
require.EqualError(t, err, "bulk inserts do not support advisory lock uniqueness and cannot remove required states")
require.Equal(t, 0, count)
})
}
Expand Down Expand Up @@ -2248,7 +2248,7 @@ func Test_Client_InsertMany(t *testing.T) {
ByState: []rivertype.JobState{rivertype.JobStateAvailable},
}}},
})
require.EqualError(t, err, "UniqueOpts are not supported for batch inserts")
require.EqualError(t, err, "bulk inserts do not support advisory lock uniqueness and cannot remove required states")
require.Empty(t, results)
})
}
Expand Down Expand Up @@ -2463,7 +2463,7 @@ func Test_Client_InsertManyTx(t *testing.T) {
ByState: []rivertype.JobState{rivertype.JobStateAvailable},
}}},
})
require.EqualError(t, err, "UniqueOpts are not supported for batch inserts")
require.EqualError(t, err, "bulk inserts do not support advisory lock uniqueness and cannot remove required states")
require.Empty(t, results)
})
}
Expand Down Expand Up @@ -2966,7 +2966,7 @@ func Test_Client_ErrorHandler(t *testing.T) {

// Bypass the normal Insert function because that will error on an
// unknown job.
insertParams, _, err := insertParamsFromConfigArgsAndOptions(&client.baseService.Archetype, config, unregisteredJobArgs{}, nil)
insertParams, _, err := insertParamsFromConfigArgsAndOptions(&client.baseService.Archetype, config, unregisteredJobArgs{}, nil, false)
require.NoError(t, err)
_, err = client.driver.GetExecutor().JobInsertFast(ctx, insertParams)
require.NoError(t, err)
Expand Down Expand Up @@ -4627,7 +4627,7 @@ func Test_Client_UnknownJobKindErrorsTheJob(t *testing.T) {
subscribeChan, cancel := client.Subscribe(EventKindJobFailed)
t.Cleanup(cancel)

insertParams, _, err := insertParamsFromConfigArgsAndOptions(&client.baseService.Archetype, config, unregisteredJobArgs{}, nil)
insertParams, _, err := insertParamsFromConfigArgsAndOptions(&client.baseService.Archetype, config, unregisteredJobArgs{}, nil, false)
require.NoError(err)
insertedResult, err := client.driver.GetExecutor().JobInsertFast(ctx, insertParams)
require.NoError(err)
Expand Down Expand Up @@ -5237,7 +5237,7 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) {
t.Run("Defaults", func(t *testing.T) {
t.Parallel()

insertParams, uniqueOpts, err := insertParamsFromConfigArgsAndOptions(archetype, config, noOpArgs{}, nil)
insertParams, uniqueOpts, err := insertParamsFromConfigArgsAndOptions(archetype, config, noOpArgs{}, nil, false)
require.NoError(t, err)
require.Equal(t, `{"name":""}`, string(insertParams.EncodedArgs))
require.Equal(t, (noOpArgs{}).Kind(), insertParams.Kind)
Expand All @@ -5259,7 +5259,7 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) {
MaxAttempts: 34,
}

insertParams, _, err := insertParamsFromConfigArgsAndOptions(archetype, overrideConfig, noOpArgs{}, nil)
insertParams, _, err := insertParamsFromConfigArgsAndOptions(archetype, overrideConfig, noOpArgs{}, nil, false)
require.NoError(t, err)
require.Equal(t, overrideConfig.MaxAttempts, insertParams.MaxAttempts)
})
Expand All @@ -5274,7 +5274,7 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) {
ScheduledAt: time.Now().Add(time.Hour),
Tags: []string{"tag1", "tag2"},
}
insertParams, _, err := insertParamsFromConfigArgsAndOptions(archetype, config, noOpArgs{}, opts)
insertParams, _, err := insertParamsFromConfigArgsAndOptions(archetype, config, noOpArgs{}, opts, false)
require.NoError(t, err)
require.Equal(t, 42, insertParams.MaxAttempts)
require.Equal(t, 2, insertParams.Priority)
Expand All @@ -5290,7 +5290,7 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) {

insertParams, _, err := insertParamsFromConfigArgsAndOptions(archetype, config, &customInsertOptsJobArgs{
ScheduledAt: nearFuture,
}, nil)
}, nil, false)
require.NoError(t, err)
// All these come from overrides in customInsertOptsJobArgs's definition:
require.Equal(t, 42, insertParams.MaxAttempts)
Expand All @@ -5306,7 +5306,7 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) {

insertParams, _, err := insertParamsFromConfigArgsAndOptions(archetype, config, &customInsertOptsJobArgs{
ScheduledAt: time.Time{},
}, nil)
}, nil, false)
require.NoError(t, err)
require.Nil(t, insertParams.ScheduledAt)
})
Expand All @@ -5317,14 +5317,14 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) {
{
_, _, err := insertParamsFromConfigArgsAndOptions(archetype, config, &customInsertOptsJobArgs{}, &InsertOpts{
Tags: []string{strings.Repeat("h", 256)},
})
}, false)
require.EqualError(t, err, "tags should be a maximum of 255 characters long")
}

{
_, _, err := insertParamsFromConfigArgsAndOptions(archetype, config, &customInsertOptsJobArgs{}, &InsertOpts{
Tags: []string{"tag,with,comma"},
})
}, false)
require.EqualError(t, err, "tags should match regex "+tagRE.String())
}
})
Expand All @@ -5342,7 +5342,7 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) {
ExcludeKind: true,
}

params, resultUniqueOpts, err := insertParamsFromConfigArgsAndOptions(archetype, config, noOpArgs{}, &InsertOpts{UniqueOpts: uniqueOpts})
params, resultUniqueOpts, err := insertParamsFromConfigArgsAndOptions(archetype, config, noOpArgs{}, &InsertOpts{UniqueOpts: uniqueOpts}, false)
require.NoError(t, err)
require.Nil(t, resultUniqueOpts)
internalUniqueOpts := &dbunique.UniqueOpts{
Expand Down Expand Up @@ -5380,7 +5380,7 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) {
ByState: states,
}

params, resultUniqueOpts, err := insertParamsFromConfigArgsAndOptions(archetype, config, noOpArgs{}, &InsertOpts{UniqueOpts: uniqueOpts})
params, resultUniqueOpts, err := insertParamsFromConfigArgsAndOptions(archetype, config, noOpArgs{}, &InsertOpts{UniqueOpts: uniqueOpts}, true)
require.NoError(t, err)
require.Nil(t, resultUniqueOpts)
internalUniqueOpts := &dbunique.UniqueOpts{
Expand Down Expand Up @@ -5410,7 +5410,7 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) {
ByState: []rivertype.JobState{rivertype.JobStateAvailable, rivertype.JobStateCompleted},
}

params, resultUniqueOpts, err := insertParamsFromConfigArgsAndOptions(archetype, config, noOpArgs{}, &InsertOpts{UniqueOpts: uniqueOpts})
params, resultUniqueOpts, err := insertParamsFromConfigArgsAndOptions(archetype, config, noOpArgs{}, &InsertOpts{UniqueOpts: uniqueOpts}, false)
require.NoError(t, err)
require.NotNil(t, resultUniqueOpts)
internalUniqueOpts := &dbunique.UniqueOpts{
Expand All @@ -5423,6 +5423,12 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) {

require.Nil(t, params.UniqueKey)
require.Zero(t, params.UniqueStates)

// In a bulk insert, this should be explicitly blocked:
params, resultUniqueOpts, err = insertParamsFromConfigArgsAndOptions(archetype, config, noOpArgs{}, &InsertOpts{UniqueOpts: uniqueOpts}, true)
require.ErrorContains(t, err, "bulk inserts do not support advisory lock uniqueness and cannot remove required states")
require.Nil(t, params)
require.Nil(t, resultUniqueOpts)
})

t.Run("UniqueOptsWithPartialArgs", func(t *testing.T) {
Expand All @@ -5442,7 +5448,7 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) {
Excluded: true,
}

params, resultUniqueOpts, err := insertParamsFromConfigArgsAndOptions(archetype, config, args, &InsertOpts{UniqueOpts: uniqueOpts})
params, resultUniqueOpts, err := insertParamsFromConfigArgsAndOptions(archetype, config, args, &InsertOpts{UniqueOpts: uniqueOpts}, true)
require.NoError(t, err)
require.Nil(t, resultUniqueOpts)
internalUniqueOpts := &dbunique.UniqueOpts{ByArgs: true}
Expand Down Expand Up @@ -5473,7 +5479,7 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) {
t.Run("PriorityIsLimitedTo4", func(t *testing.T) {
t.Parallel()

insertParams, _, err := insertParamsFromConfigArgsAndOptions(archetype, config, noOpArgs{}, &InsertOpts{Priority: 5})
insertParams, _, err := insertParamsFromConfigArgsAndOptions(archetype, config, noOpArgs{}, &InsertOpts{Priority: 5}, false)
require.ErrorContains(t, err, "priority must be between 1 and 4")
require.Nil(t, insertParams)
})
Expand All @@ -5482,7 +5488,7 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) {
t.Parallel()

args := timeoutTestArgs{TimeoutValue: time.Hour}
insertParams, _, err := insertParamsFromConfigArgsAndOptions(archetype, config, args, nil)
insertParams, _, err := insertParamsFromConfigArgsAndOptions(archetype, config, args, nil, false)
require.NoError(t, err)
require.Equal(t, `{"timeout_value":3600000000000}`, string(insertParams.EncodedArgs))
})
Expand All @@ -5498,6 +5504,7 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) {
config,
noOpArgs{},
&InsertOpts{UniqueOpts: UniqueOpts{ByPeriod: 1 * time.Millisecond}},
false,
)
require.EqualError(t, err, "JobUniqueOpts.ByPeriod should not be less than 1 second")
require.Nil(t, insertParams)
Expand Down
2 changes: 1 addition & 1 deletion periodic_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func (b *PeriodicJobBundle) toInternal(periodicJob *PeriodicJob) *maintenance.Pe
if args == nil {
return nil, nil, maintenance.ErrNoJobToInsert
}
return insertParamsFromConfigArgsAndOptions(&b.periodicJobEnqueuer.Archetype, b.clientConfig, args, options)
return insertParamsFromConfigArgsAndOptions(&b.periodicJobEnqueuer.Archetype, b.clientConfig, args, options, false)
},
RunOnStart: opts.RunOnStart,
ScheduleFunc: periodicJob.scheduleFunc.Next,
Expand Down
4 changes: 2 additions & 2 deletions producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func Test_Producer_CanSafelyCompleteJobsWhileFetchingNewOnes(t *testing.T) {

params := make([]*riverdriver.JobInsertFastParams, maxJobCount)
for i := range params {
insertParams, _, err := insertParamsFromConfigArgsAndOptions(archetype, config, WithJobNumArgs{JobNum: i}, nil)
insertParams, _, err := insertParamsFromConfigArgsAndOptions(archetype, config, WithJobNumArgs{JobNum: i}, nil, true)
require.NoError(err)

params[i] = insertParams
Expand Down Expand Up @@ -277,7 +277,7 @@ func testProducer(t *testing.T, makeProducer func(ctx context.Context, t *testin
mustInsert := func(ctx context.Context, t *testing.T, bundle *testBundle, args JobArgs) {
t.Helper()

insertParams, _, err := insertParamsFromConfigArgsAndOptions(bundle.archetype, bundle.config, args, nil)
insertParams, _, err := insertParamsFromConfigArgsAndOptions(bundle.archetype, bundle.config, args, nil, false)
require.NoError(t, err)

_, err = bundle.exec.JobInsertFast(ctx, insertParams)
Expand Down

0 comments on commit 035b0e1

Please sign in to comment.