Skip to content

Commit

Permalink
new InsertMany/InsertManyTx with return values
Browse files Browse the repository at this point in the history
This adds new implementations of `InsertMany` / `InsertManyTx` that use
the multirow `VALUES` syntax to allow the new rows to be returned upon
insert. The alternative `COPY FROM` implementation has been renamed to
`InsertManyFast` / `InsertManyFastTx`. The expectation is that these
forms will only be needed in cases where an extremely large number of
records is being inserted simultaneously, whereas the new form is more
user-friendly for the vast majority of other cases.
  • Loading branch information
bgentry committed Sep 10, 2024
1 parent b59da75 commit ba2bccd
Show file tree
Hide file tree
Showing 4 changed files with 595 additions and 9 deletions.
136 changes: 136 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1326,6 +1326,142 @@ type InsertManyParams struct {
InsertOpts *InsertOpts
}

// InsertMany inserts a batch of jobs in one operation and returns a
// JobInsertResult for each inserted row. Every element of params is an
// InsertManyParams tuple carrying the job args plus an optional set of insert
// options; those options take precedence over options supplied by a
// JobArgsWithInsertOpts.InsertOpts implementation and over client-level
// defaults. The context governs the underlying Postgres inserts and may be
// used to cancel the operation or to apply a timeout.
//
//	results, err := client.InsertMany(ctx, []river.InsertManyParams{
//		{Args: BatchInsertArgs{}},
//		{Args: BatchInsertArgs{}, InsertOpts: &river.InsertOpts{Priority: 3}},
//	})
//	if err != nil {
//		// handle error
//	}
//
// Job uniqueness is not respected when using InsertMany due to unique inserts
// using an internal transaction and advisory lock that might lead to
// significant lock contention. Insert unique jobs using Insert instead.
//
// An error is returned when the driver has no database pool, when params is
// empty or fails validation, or when beginning, executing, or committing the
// transaction fails.
func (c *Client[TTx]) InsertMany(ctx context.Context, params []InsertManyParams) ([]*rivertype.JobInsertResult, error) {
	if !c.driver.HasPool() {
		return nil, errNoDriverDBPool
	}

	jobParams, err := c.insertManyParams(params)
	if err != nil {
		return nil, err
	}

	// The insert runs inside a transaction in case a notification about the
	// inserts has to be sent along with it.
	execTx, err := c.driver.GetExecutor().Begin(ctx)
	if err != nil {
		return nil, err
	}
	defer execTx.Rollback(ctx)

	results, err := c.insertMany(ctx, execTx, jobParams)
	if err != nil {
		return nil, err
	}

	if commitErr := execTx.Commit(ctx); commitErr != nil {
		return nil, commitErr
	}

	return results, nil
}

// InsertManyTx inserts a batch of jobs on the caller's transaction and returns
// a JobInsertResult for each inserted row. Every element of params is an
// InsertManyParams tuple carrying the job args plus an optional set of insert
// options; those options take precedence over options supplied by a
// JobArgsWithInsertOpts.InsertOpts implementation and over client-level
// defaults. The context governs the underlying Postgres inserts and may be
// used to cancel the operation or to apply a timeout.
//
//	results, err := client.InsertManyTx(ctx, tx, []river.InsertManyParams{
//		{Args: BatchInsertArgs{}},
//		{Args: BatchInsertArgs{}, InsertOpts: &river.InsertOpts{Priority: 3}},
//	})
//	if err != nil {
//		// handle error
//	}
//
// Job uniqueness is not respected when using InsertMany due to unique inserts
// using an internal transaction and advisory lock that might lead to
// significant lock contention. Insert unique jobs using Insert instead.
//
// This variant lets a caller insert jobs atomically alongside other database
// changes. An inserted job isn't visible to be worked until the transaction
// commits, and if the transaction rolls back, so too is the inserted job.
func (c *Client[TTx]) InsertManyTx(ctx context.Context, tx TTx, params []InsertManyParams) ([]*rivertype.JobInsertResult, error) {
	jobParams, err := c.insertManyParams(params)
	if err != nil {
		return nil, err
	}

	// Commit and rollback stay with the caller; the transaction is only
	// unwrapped into an executor here.
	return c.insertMany(ctx, c.driver.UnwrapExecutor(tx), jobParams)
}

// insertMany runs the batch insert on the given transaction executor, passes
// the queues of all jobs inserted in the available state to
// maybeNotifyInsertForQueues, and wraps each returned job row in a
// JobInsertResult. Committing or rolling back tx is left to the caller.
func (c *Client[TTx]) insertMany(ctx context.Context, tx riverdriver.ExecutorTx, insertParams []*riverdriver.JobInsertFastParams) ([]*rivertype.JobInsertResult, error) {
	rows, err := tx.JobInsertManyReturning(ctx, insertParams)
	if err != nil {
		return nil, err
	}

	// Only the queues of jobs inserted in the available state are collected
	// for notification.
	availableQueues := make([]string, 0, 10)
	for _, p := range insertParams {
		if p.State != rivertype.JobStateAvailable {
			continue
		}
		availableQueues = append(availableQueues, p.Queue)
	}

	if err := c.maybeNotifyInsertForQueues(ctx, tx, availableQueues); err != nil {
		return nil, err
	}

	toResult := func(row *rivertype.JobRow) *rivertype.JobInsertResult {
		return &rivertype.JobInsertResult{Job: row}
	}
	return sliceutil.Map(rows, toResult), nil
}

// insertManyParams validates the input parameters for a batch insert operation
// and converts them into the driver-level batch insert parameters, one per
// input element and in the same order. It returns an error when params is
// empty, when any job args fail validation, when any element sets UniqueOpts,
// or when conversion of an element fails.
func (c *Client[TTx]) insertManyParams(params []InsertManyParams) ([]*riverdriver.JobInsertFastParams, error) {
	if len(params) == 0 {
		return nil, errors.New("no jobs to insert")
	}

	converted := make([]*riverdriver.JobInsertFastParams, len(params))
	for idx, p := range params {
		if err := c.validateJobArgs(p.Args); err != nil {
			return nil, err
		}

		// UniqueOpts aren't supported for batch inserts because they use PG
		// advisory locks to work, and taking many locks simultaneously
		// could easily lead to contention and deadlocks.
		if opts := p.InsertOpts; opts != nil && !opts.UniqueOpts.isEmpty() {
			return nil, errors.New("UniqueOpts are not supported for batch inserts")
		}

		jobParams, _, err := insertParamsFromConfigArgsAndOptions(&c.baseService.Archetype, c.config, p.Args, p.InsertOpts)
		if err != nil {
			return nil, err
		}
		converted[idx] = jobParams
	}

	return converted, nil
}

// InsertManyFast inserts many jobs at once using Postgres' `COPY FROM` mechanism,
// making the operation quite fast and memory efficient. Each job is inserted as
// an InsertManyParams tuple, which takes job args along with an optional set of
Expand Down
Loading

0 comments on commit ba2bccd

Please sign in to comment.