Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Breaking: Rename old InsertMany, add new version with return values #589

Merged
merged 4 commits into from
Sep 13, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Changed

⚠️ Version 0.12.0 has a small breaking change in `rivermigrate`. As before, we try never to make breaking changes, but this one was deemed worth it because it's quite small and may help avoid panics.
⚠️ Version 0.12.0 has two small breaking changes, one for `InsertMany` and one in `rivermigrate`. As before, we try never to make breaking changes, but these ones were deemed worth it because of minimal impact and to help avoid panics.

- **Breaking change:** `Client.InsertMany` / `InsertManyTx` now return the inserted rows rather than merely returning a count of the inserted rows. The new implementations no longer use Postgres' `COPY FROM` protocol in order to facilitate return values.

Users who relied on the return count can merely wrap the returned rows in a `len()` to return to that behavior, or you can continue using the old APIs using their new names `InsertManyFast` and `InsertManyFastTx`. [PR #589](https://github.com/riverqueue/river/pull/589).

- **Breaking change:** `rivermigrate.New` now returns a possible error along with a migrator. An error may be returned, for example, when a migration line is configured that doesn't exist. [PR #558](https://github.com/riverqueue/river/pull/558).

Expand Down
149 changes: 140 additions & 9 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1326,7 +1326,138 @@ type InsertManyParams struct {
InsertOpts *InsertOpts
}

// InsertMany inserts many jobs at once using Postgres' `COPY FROM` mechanism,
// InsertMany inserts many jobs at once. Each job is inserted as an
// InsertManyParams tuple, which takes job args along with an optional set of
// insert options, which override insert options provided by an
// JobArgsWithInsertOpts.InsertOpts implementation or any client-level defaults.
// The provided context is used for the underlying Postgres inserts and can be
// used to cancel the operation or apply a timeout.
//
// count, err := client.InsertMany(ctx, []river.InsertManyParams{
// {Args: BatchInsertArgs{}},
// {Args: BatchInsertArgs{}, InsertOpts: &river.InsertOpts{Priority: 3}},
// })
// if err != nil {
// // handle error
// }
//
// Job uniqueness is not respected when using InsertMany due to unique inserts
// using an internal transaction and advisory lock that might lead to
// significant lock contention. Insert unique jobs using Insert instead.
//
// Job uniqueness is not respected when using InsertMany due to unique inserts
// using an internal transaction and advisory lock that might lead to
// significant lock contention. Insert unique jobs using Insert instead.
func (c *Client[TTx]) InsertMany(ctx context.Context, params []InsertManyParams) ([]*rivertype.JobInsertResult, error) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Had a thought that maybe instead of JobInsertResult being a plain struct we could consider using an interface. That would allow for expandability, particularly if we override these on the Pro side to return additional stuff at some point.

Need to think more about concrete use cases for that though.

if !c.driver.HasPool() {
return nil, errNoDriverDBPool
}

tx, err := c.driver.GetExecutor().Begin(ctx)
if err != nil {
return nil, err
}
defer tx.Rollback(ctx)

inserted, err := c.insertMany(ctx, tx, params)
if err != nil {
return nil, err
}

if err := tx.Commit(ctx); err != nil {
return nil, err
}
return inserted, nil
}

// InsertManyTx inserts many jobs at once. Each job is inserted as an
// InsertManyParams tuple, which takes job args along with an optional set of
// insert options, which override insert options provided by an
// JobArgsWithInsertOpts.InsertOpts implementation or any client-level defaults.
// The provided context is used for the underlying Postgres inserts and can be
// used to cancel the operation or apply a timeout.
//
// count, err := client.InsertManyTx(ctx, tx, []river.InsertManyParams{
// {Args: BatchInsertArgs{}},
// {Args: BatchInsertArgs{}, InsertOpts: &river.InsertOpts{Priority: 3}},
// })
// if err != nil {
// // handle error
// }
//
// Job uniqueness is not respected when using InsertMany due to unique inserts
// using an internal transaction and advisory lock that might lead to
// significant lock contention. Insert unique jobs using Insert instead.
//
// This variant lets a caller insert jobs atomically alongside other database
// changes. An inserted job isn't visible to be worked until the transaction
// commits, and if the transaction rolls back, so too is the inserted job.
func (c *Client[TTx]) InsertManyTx(ctx context.Context, tx TTx, params []InsertManyParams) ([]*rivertype.JobInsertResult, error) {
exec := c.driver.UnwrapExecutor(tx)
return c.insertMany(ctx, exec, params)
}

func (c *Client[TTx]) insertMany(ctx context.Context, tx riverdriver.ExecutorTx, params []InsertManyParams) ([]*rivertype.JobInsertResult, error) {
insertParams, err := c.insertManyParams(params)
if err != nil {
return nil, err
}

jobRows, err := tx.JobInsertManyReturning(ctx, insertParams)
if err != nil {
return nil, err
}

queues := make([]string, 0, 10)
for _, params := range insertParams {
if params.State == rivertype.JobStateAvailable {
queues = append(queues, params.Queue)
}
}
if err := c.maybeNotifyInsertForQueues(ctx, tx, queues); err != nil {
return nil, err
}

return sliceutil.Map(jobRows,
func(jobRow *rivertype.JobRow) *rivertype.JobInsertResult {
return &rivertype.JobInsertResult{Job: jobRow}
},
), nil
}

// Validates input parameters for a batch insert operation and generates a set
// of batch insert parameters.
func (c *Client[TTx]) insertManyParams(params []InsertManyParams) ([]*riverdriver.JobInsertFastParams, error) {
if len(params) < 1 {
return nil, errors.New("no jobs to insert")
}

insertParams := make([]*riverdriver.JobInsertFastParams, len(params))
for i, param := range params {
if err := c.validateJobArgs(param.Args); err != nil {
return nil, err
}

if param.InsertOpts != nil {
// UniqueOpts aren't supported for batch inserts because they use PG
// advisory locks to work, and taking many locks simultaneously could
// easily lead to contention and deadlocks.
if !param.InsertOpts.UniqueOpts.isEmpty() {
return nil, errors.New("UniqueOpts are not supported for batch inserts")
}
}

var err error
insertParams[i], _, err = insertParamsFromConfigArgsAndOptions(&c.baseService.Archetype, c.config, param.Args, param.InsertOpts)
if err != nil {
return nil, err
}
}

return insertParams, nil
}

// InsertManyFast inserts many jobs at once using Postgres' `COPY FROM` mechanism,
// making the operation quite fast and memory efficient. Each job is inserted as
// an InsertManyParams tuple, which takes job args along with an optional set of
// insert options, which override insert options provided by an
Expand All @@ -1345,12 +1476,12 @@ type InsertManyParams struct {
// Job uniqueness is not respected when using InsertMany due to unique inserts
// using an internal transaction and advisory lock that might lead to
// significant lock contention. Insert unique jobs using Insert instead.
func (c *Client[TTx]) InsertMany(ctx context.Context, params []InsertManyParams) (int, error) {
func (c *Client[TTx]) InsertManyFast(ctx context.Context, params []InsertManyParams) (int, error) {
if !c.driver.HasPool() {
return 0, errNoDriverDBPool
}

insertParams, err := c.insertManyParams(params)
insertParams, err := c.insertManyFastParams(params)
if err != nil {
return 0, err
}
Expand All @@ -1362,7 +1493,7 @@ func (c *Client[TTx]) InsertMany(ctx context.Context, params []InsertManyParams)
}
defer tx.Rollback(ctx)

inserted, err := c.insertFastMany(ctx, tx, insertParams)
inserted, err := c.insertManyFast(ctx, tx, insertParams)
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -1395,17 +1526,17 @@ func (c *Client[TTx]) InsertMany(ctx context.Context, params []InsertManyParams)
// This variant lets a caller insert jobs atomically alongside other database
// changes. An inserted job isn't visible to be worked until the transaction
// commits, and if the transaction rolls back, so too is the inserted job.
func (c *Client[TTx]) InsertManyTx(ctx context.Context, tx TTx, params []InsertManyParams) (int, error) {
insertParams, err := c.insertManyParams(params)
func (c *Client[TTx]) InsertManyFastTx(ctx context.Context, tx TTx, params []InsertManyParams) (int, error) {
insertParams, err := c.insertManyFastParams(params)
if err != nil {
return 0, err
}

exec := c.driver.UnwrapExecutor(tx)
return c.insertFastMany(ctx, exec, insertParams)
return c.insertManyFast(ctx, exec, insertParams)
}

func (c *Client[TTx]) insertFastMany(ctx context.Context, tx riverdriver.ExecutorTx, insertParams []*riverdriver.JobInsertFastParams) (int, error) {
func (c *Client[TTx]) insertManyFast(ctx context.Context, tx riverdriver.ExecutorTx, insertParams []*riverdriver.JobInsertFastParams) (int, error) {
inserted, err := tx.JobInsertFastMany(ctx, insertParams)
if err != nil {
return inserted, err
Expand All @@ -1425,7 +1556,7 @@ func (c *Client[TTx]) insertFastMany(ctx context.Context, tx riverdriver.Executo

// Validates input parameters for an a batch insert operation and generates a
// set of batch insert parameters.
func (c *Client[TTx]) insertManyParams(params []InsertManyParams) ([]*riverdriver.JobInsertFastParams, error) {
func (c *Client[TTx]) insertManyFastParams(params []InsertManyParams) ([]*riverdriver.JobInsertFastParams, error) {
if len(params) < 1 {
return nil, errors.New("no jobs to insert")
}
Expand Down
Loading
Loading