diff --git a/CHANGELOG.md b/CHANGELOG.md index d50d0a88..45609fdf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,7 +15,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed -⚠️ Version 0.12.0 has a small breaking change in `rivermigrate`. As before, we try never to make breaking changes, but this one was deemed worth it because it's quite small and may help avoid panics. +⚠️ Version 0.12.0 has two small breaking changes, one for `InsertMany` and one in `rivermigrate`. As before, we try never to make breaking changes, but these were deemed worth it because their impact is minimal and they help avoid panics. + +- **Breaking change:** `Client.InsertMany` / `InsertManyTx` now return the inserted rows rather than just a count of the inserted rows. The new implementations no longer use Postgres' `COPY FROM` protocol so that they can return the inserted rows. + + Users who relied on the returned count can wrap the returned rows in `len()` to recover that behavior, or continue using the old `COPY FROM`-based APIs under their new names `InsertManyFast` and `InsertManyFastTx`. [PR #589](https://github.com/riverqueue/river/pull/589). - **Breaking change:** `rivermigrate.New` now returns a possible error along with a migrator. An error may be returned, for example, when a migration line is configured that doesn't exist. [PR #558](https://github.com/riverqueue/river/pull/558). diff --git a/client.go b/client.go index 4aa69df9..89509c07 100644 --- a/client.go +++ b/client.go @@ -1326,7 +1326,134 @@ type InsertManyParams struct { InsertOpts *InsertOpts } -// InsertMany inserts many jobs at once using Postgres' `COPY FROM` mechanism, +// InsertMany inserts many jobs at once. Each job is inserted as an +// InsertManyParams tuple, which takes job args along with an optional set of +// insert options, which override insert options provided by a +// JobArgsWithInsertOpts.InsertOpts implementation or any client-level defaults. +// The provided context is used for the underlying Postgres inserts and can be +// used to cancel the operation or apply a timeout. +// +// results, err := client.InsertMany(ctx, []river.InsertManyParams{ +// {Args: BatchInsertArgs{}}, +// {Args: BatchInsertArgs{}, InsertOpts: &river.InsertOpts{Priority: 3}}, +// }) +// if err != nil { +// // handle error +// } +// +// Job uniqueness is not respected when using InsertMany due to unique inserts +// using an internal transaction and advisory lock that might lead to +// significant lock contention. Insert unique jobs using Insert instead. +func (c *Client[TTx]) InsertMany(ctx context.Context, params []InsertManyParams) ([]*rivertype.JobInsertResult, error) { + if !c.driver.HasPool() { + return nil, errNoDriverDBPool + } + + tx, err := c.driver.GetExecutor().Begin(ctx) + if err != nil { + return nil, err + } + defer tx.Rollback(ctx) + + inserted, err := c.insertMany(ctx, tx, params) + if err != nil { + return nil, err + } + + if err := tx.Commit(ctx); err != nil { + return nil, err + } + return inserted, nil +} + +// InsertManyTx inserts many jobs at once.
Each job is inserted as an +// InsertManyParams tuple, which takes job args along with an optional set of +// insert options, which override insert options provided by a +// JobArgsWithInsertOpts.InsertOpts implementation or any client-level defaults. +// The provided context is used for the underlying Postgres inserts and can be +// used to cancel the operation or apply a timeout. +// +// results, err := client.InsertManyTx(ctx, tx, []river.InsertManyParams{ +// {Args: BatchInsertArgs{}}, +// {Args: BatchInsertArgs{}, InsertOpts: &river.InsertOpts{Priority: 3}}, +// }) +// if err != nil { +// // handle error +// } +// +// Job uniqueness is not respected when using InsertMany due to unique inserts +// using an internal transaction and advisory lock that might lead to +// significant lock contention. Insert unique jobs using Insert instead. +// +// This variant lets a caller insert jobs atomically alongside other database +// changes. An inserted job isn't visible to be worked until the transaction +// commits, and if the transaction rolls back, so too is the inserted job. +func (c *Client[TTx]) InsertManyTx(ctx context.Context, tx TTx, params []InsertManyParams) ([]*rivertype.JobInsertResult, error) { + exec := c.driver.UnwrapExecutor(tx) + return c.insertMany(ctx, exec, params) +} + +func (c *Client[TTx]) insertMany(ctx context.Context, tx riverdriver.ExecutorTx, params []InsertManyParams) ([]*rivertype.JobInsertResult, error) { + insertParams, err := c.insertManyParams(params) + if err != nil { + return nil, err + } + + jobRows, err := tx.JobInsertFastMany(ctx, insertParams) + if err != nil { + return nil, err + } + + queues := make([]string, 0, 10) + for _, params := range insertParams { + if params.State == rivertype.JobStateAvailable { + queues = append(queues, params.Queue) + } + } + if err := c.maybeNotifyInsertForQueues(ctx, tx, queues); err != nil { + return nil, err + } + + return sliceutil.Map(jobRows, + func(jobRow *rivertype.JobRow) *rivertype.JobInsertResult { + return &rivertype.JobInsertResult{Job: jobRow} + }, + ), nil +} + +// Validates input parameters for a batch insert operation and generates a set +// of batch insert parameters. +func (c *Client[TTx]) insertManyParams(params []InsertManyParams) ([]*riverdriver.JobInsertFastParams, error) { + if len(params) < 1 { + return nil, errors.New("no jobs to insert") + } + + insertParams := make([]*riverdriver.JobInsertFastParams, len(params)) + for i, param := range params { + if err := c.validateJobArgs(param.Args); err != nil { + return nil, err + } + + if param.InsertOpts != nil { + // UniqueOpts aren't supported for batch inserts because they use PG + // advisory locks to work, and taking many locks simultaneously could + // easily lead to contention and deadlocks. + if !param.InsertOpts.UniqueOpts.isEmpty() { + return nil, errors.New("UniqueOpts are not supported for batch inserts") + } + } + + var err error + insertParams[i], _, err = insertParamsFromConfigArgsAndOptions(&c.baseService.Archetype, c.config, param.Args, param.InsertOpts) + if err != nil { + return nil, err + } + } + + return insertParams, nil +} + +// InsertManyFast inserts many jobs at once using Postgres' `COPY FROM` mechanism, // making the operation quite fast and memory efficient.
Each job is inserted as // an InsertManyParams tuple, which takes job args along with an optional set of // insert options, which override insert options provided by an @@ -1345,12 +1476,12 @@ type InsertManyParams struct { // Job uniqueness is not respected when using InsertMany due to unique inserts // using an internal transaction and advisory lock that might lead to // significant lock contention. Insert unique jobs using Insert instead. -func (c *Client[TTx]) InsertMany(ctx context.Context, params []InsertManyParams) (int, error) { +func (c *Client[TTx]) InsertManyFast(ctx context.Context, params []InsertManyParams) (int, error) { if !c.driver.HasPool() { return 0, errNoDriverDBPool } - insertParams, err := c.insertManyParams(params) + insertParams, err := c.insertManyFastParams(params) if err != nil { return 0, err } @@ -1362,7 +1493,7 @@ func (c *Client[TTx]) InsertMany(ctx context.Context, params []InsertManyParams) } defer tx.Rollback(ctx) - inserted, err := c.insertFastMany(ctx, tx, insertParams) + inserted, err := c.insertManyFast(ctx, tx, insertParams) if err != nil { return 0, err } @@ -1395,18 +1526,18 @@ func (c *Client[TTx]) InsertMany(ctx context.Context, params []InsertManyParams) // This variant lets a caller insert jobs atomically alongside other database // changes. An inserted job isn't visible to be worked until the transaction // commits, and if the transaction rolls back, so too is the inserted job. -func (c *Client[TTx]) InsertManyTx(ctx context.Context, tx TTx, params []InsertManyParams) (int, error) { - insertParams, err := c.insertManyParams(params) +func (c *Client[TTx]) InsertManyFastTx(ctx context.Context, tx TTx, params []InsertManyParams) (int, error) { + insertParams, err := c.insertManyFastParams(params) if err != nil { return 0, err } exec := c.driver.UnwrapExecutor(tx) - return c.insertFastMany(ctx, exec, insertParams) + return c.insertManyFast(ctx, exec, insertParams) } -func (c *Client[TTx]) insertFastMany(ctx context.Context, tx riverdriver.ExecutorTx, insertParams []*riverdriver.JobInsertFastParams) (int, error) { - inserted, err := tx.JobInsertFastMany(ctx, insertParams) +func (c *Client[TTx]) insertManyFast(ctx context.Context, tx riverdriver.ExecutorTx, insertParams []*riverdriver.JobInsertFastParams) (int, error) { + inserted, err := tx.JobInsertFastManyNoReturning(ctx, insertParams) if err != nil { return inserted, err } @@ -1425,7 +1556,7 @@ func (c *Client[TTx]) insertFastMany(ctx context.Context, tx riverdriver.Executo // Validates input parameters for an a batch insert operation and generates a // set of batch insert parameters. 
-func (c *Client[TTx]) insertManyParams(params []InsertManyParams) ([]*riverdriver.JobInsertFastParams, error) { +func (c *Client[TTx]) insertManyFastParams(params []InsertManyParams) ([]*riverdriver.JobInsertFastParams, error) { if len(params) < 1 { return nil, errors.New("no jobs to insert") } diff --git a/client_test.go b/client_test.go index ff7c1e80..e38cbd24 100644 --- a/client_test.go +++ b/client_test.go @@ -1568,7 +1568,7 @@ func Test_Client_InsertTx(t *testing.T) { }) } -func Test_Client_InsertMany(t *testing.T) { +func Test_Client_InsertManyFast(t *testing.T) { t.Parallel() ctx := context.Background() @@ -1592,7 +1592,7 @@ func Test_Client_InsertMany(t *testing.T) { client, _ := setup(t) - count, err := client.InsertMany(ctx, []InsertManyParams{ + count, err := client.InsertManyFast(ctx, []InsertManyParams{ {Args: noOpArgs{}, InsertOpts: &InsertOpts{Queue: "foo", Priority: 2}}, {Args: noOpArgs{}}, }) @@ -1627,7 +1627,7 @@ func Test_Client_InsertMany(t *testing.T) { startClient(ctx, t, client) riversharedtest.WaitOrTimeout(t, client.baseStartStop.Started()) - count, err := client.InsertMany(ctx, []InsertManyParams{ + count, err := client.InsertManyFast(ctx, []InsertManyParams{ {Args: callbackArgs{}}, {Args: callbackArgs{}}, }) @@ -1643,7 +1643,7 @@ func Test_Client_InsertMany(t *testing.T) { // // Note: we specifically use a different queue to ensure that the notify // limiter is immediately to fire on this queue. - count, err = client.InsertMany(ctx, []InsertManyParams{ + count, err = client.InsertManyFast(ctx, []InsertManyParams{ {Args: callbackArgs{}, InsertOpts: &InsertOpts{Queue: "another_queue"}}, }) require.NoError(t, err) @@ -1675,7 +1675,7 @@ func Test_Client_InsertMany(t *testing.T) { startClient(ctx, t, client) riversharedtest.WaitOrTimeout(t, client.baseStartStop.Started()) - count, err := client.InsertMany(ctx, []InsertManyParams{ + count, err := client.InsertManyFast(ctx, []InsertManyParams{ {Args: noOpArgs{}, InsertOpts: &InsertOpts{Queue: "a", ScheduledAt: time.Now().Add(1 * time.Hour)}}, {Args: noOpArgs{}, InsertOpts: &InsertOpts{Queue: "b"}}, }) @@ -1695,7 +1695,7 @@ func Test_Client_InsertMany(t *testing.T) { client, _ := setup(t) - count, err := client.InsertMany(ctx, []InsertManyParams{ + count, err := client.InsertManyFast(ctx, []InsertManyParams{ {Args: &noOpArgs{}, InsertOpts: &InsertOpts{ScheduledAt: time.Time{}}}, }) require.NoError(t, err) @@ -1713,7 +1713,7 @@ func Test_Client_InsertMany(t *testing.T) { client, _ := setup(t) - count, err := client.InsertMany(ctx, []InsertManyParams{ + count, err := client.InsertManyFast(ctx, []InsertManyParams{ {Args: &noOpArgs{}, InsertOpts: &InsertOpts{Queue: "invalid*queue"}}, }) require.ErrorContains(t, err, "queue name is invalid") @@ -1730,7 +1730,7 @@ func Test_Client_InsertMany(t *testing.T) { }) require.NoError(t, err) - count, err := client.InsertMany(ctx, []InsertManyParams{ + count, err := client.InsertManyFast(ctx, []InsertManyParams{ {Args: noOpArgs{}}, }) require.ErrorIs(t, err, errNoDriverDBPool) @@ -1742,7 +1742,7 @@ func Test_Client_InsertMany(t *testing.T) { client, _ := setup(t) - count, err := client.InsertMany(ctx, []InsertManyParams{}) + count, err := client.InsertManyFast(ctx, []InsertManyParams{}) require.EqualError(t, err, "no jobs to insert") require.Equal(t, 0, count) }) @@ -1752,7 +1752,7 @@ func Test_Client_InsertMany(t *testing.T) { client, _ := setup(t) - count, err := client.InsertMany(ctx, []InsertManyParams{ + count, err := client.InsertManyFast(ctx, []InsertManyParams{ {Args: 
unregisteredJobArgs{}}, }) var unknownJobKindErr *UnknownJobKindError @@ -1768,7 +1768,7 @@ func Test_Client_InsertMany(t *testing.T) { client.config.Workers = nil - _, err := client.InsertMany(ctx, []InsertManyParams{ + _, err := client.InsertManyFast(ctx, []InsertManyParams{ {Args: unregisteredJobArgs{}}, }) require.NoError(t, err) @@ -1779,7 +1779,7 @@ func Test_Client_InsertMany(t *testing.T) { client, _ := setup(t) - count, err := client.InsertMany(ctx, []InsertManyParams{ + count, err := client.InsertManyFast(ctx, []InsertManyParams{ {Args: noOpArgs{}, InsertOpts: &InsertOpts{UniqueOpts: UniqueOpts{ByArgs: true}}}, }) require.EqualError(t, err, "UniqueOpts are not supported for batch inserts") @@ -1787,7 +1787,7 @@ func Test_Client_InsertMany(t *testing.T) { }) } -func Test_Client_InsertManyTx(t *testing.T) { +func Test_Client_InsertManyFastTx(t *testing.T) { t.Parallel() ctx := context.Background() @@ -1817,7 +1817,7 @@ func Test_Client_InsertManyTx(t *testing.T) { client, bundle := setup(t) - count, err := client.InsertManyTx(ctx, bundle.tx, []InsertManyParams{ + count, err := client.InsertManyFastTx(ctx, bundle.tx, []InsertManyParams{ {Args: noOpArgs{}, InsertOpts: &InsertOpts{Queue: "foo", Priority: 2}}, {Args: noOpArgs{}}, }) @@ -1841,7 +1841,7 @@ func Test_Client_InsertManyTx(t *testing.T) { client, bundle := setup(t) - _, err := client.InsertManyTx(ctx, bundle.tx, []InsertManyParams{{noOpArgs{}, nil}}) + _, err := client.InsertManyFastTx(ctx, bundle.tx, []InsertManyParams{{noOpArgs{}, nil}}) require.NoError(t, err) insertedJobs, err := client.driver.UnwrapExecutor(bundle.tx).JobGetByKindMany(ctx, []string{(noOpArgs{}).Kind()}) @@ -1858,7 +1858,7 @@ func Test_Client_InsertManyTx(t *testing.T) { startClient(ctx, t, client) - count, err := client.InsertManyTx(ctx, bundle.tx, []InsertManyParams{{noOpArgs{}, &InsertOpts{ScheduledAt: time.Now().Add(time.Minute)}}}) + count, err := client.InsertManyFastTx(ctx, bundle.tx, []InsertManyParams{{noOpArgs{}, &InsertOpts{ScheduledAt: time.Now().Add(time.Minute)}}}) require.NoError(t, err) require.Equal(t, 1, count) @@ -1881,7 +1881,7 @@ func Test_Client_InsertManyTx(t *testing.T) { }) require.NoError(t, err) - count, err := client.InsertManyTx(ctx, bundle.tx, []InsertManyParams{ + count, err := client.InsertManyFastTx(ctx, bundle.tx, []InsertManyParams{ {Args: noOpArgs{}}, }) require.NoError(t, err) @@ -1893,7 +1893,7 @@ func Test_Client_InsertManyTx(t *testing.T) { client, bundle := setup(t) - count, err := client.InsertManyTx(ctx, bundle.tx, []InsertManyParams{}) + count, err := client.InsertManyFastTx(ctx, bundle.tx, []InsertManyParams{}) require.EqualError(t, err, "no jobs to insert") require.Equal(t, 0, count) }) @@ -1903,7 +1903,7 @@ func Test_Client_InsertManyTx(t *testing.T) { client, bundle := setup(t) - count, err := client.InsertManyTx(ctx, bundle.tx, []InsertManyParams{ + count, err := client.InsertManyFastTx(ctx, bundle.tx, []InsertManyParams{ {Args: unregisteredJobArgs{}}, }) var unknownJobKindErr *UnknownJobKindError @@ -1919,7 +1919,7 @@ func Test_Client_InsertManyTx(t *testing.T) { client.config.Workers = nil - _, err := client.InsertManyTx(ctx, bundle.tx, []InsertManyParams{ + _, err := client.InsertManyFastTx(ctx, bundle.tx, []InsertManyParams{ {Args: unregisteredJobArgs{}}, }) require.NoError(t, err) @@ -1930,7 +1930,7 @@ func Test_Client_InsertManyTx(t *testing.T) { client, bundle := setup(t) - count, err := client.InsertManyTx(ctx, bundle.tx, []InsertManyParams{ + count, err := client.InsertManyFastTx(ctx, bundle.tx, 
[]InsertManyParams{ {Args: noOpArgs{}, InsertOpts: &InsertOpts{UniqueOpts: UniqueOpts{ByArgs: true}}}, }) require.EqualError(t, err, "UniqueOpts are not supported for batch inserts") @@ -1938,6 +1938,456 @@ func Test_Client_InsertManyTx(t *testing.T) { }) } +func Test_Client_InsertMany(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + type testBundle struct { + dbPool *pgxpool.Pool + } + + setup := func(t *testing.T) (*Client[pgx.Tx], *testBundle) { + t.Helper() + + dbPool := riverinternaltest.TestDB(ctx, t) + config := newTestConfig(t, nil) + client := newTestClient(t, dbPool, config) + + return client, &testBundle{dbPool: dbPool} + } + + t.Run("SucceedsWithMultipleJobs", func(t *testing.T) { + t.Parallel() + + client, _ := setup(t) + + now := time.Now().UTC() + + results, err := client.InsertMany(ctx, []InsertManyParams{ + {Args: noOpArgs{Name: "Foo"}, InsertOpts: &InsertOpts{Metadata: []byte(`{"a": "b"}`), Queue: "foo", Priority: 2}}, + {Args: noOpArgs{}, InsertOpts: &InsertOpts{ScheduledAt: now.Add(time.Minute)}}, + }) + require.NoError(t, err) + require.Len(t, results, 2) + + require.False(t, results[0].UniqueSkippedAsDuplicate) + require.Equal(t, 0, results[0].Job.Attempt) + require.Nil(t, results[0].Job.AttemptedAt) + require.WithinDuration(t, now, results[0].Job.CreatedAt, 2*time.Second) + require.Empty(t, results[0].Job.AttemptedBy) + require.Positive(t, results[0].Job.ID) + require.JSONEq(t, `{"name": "Foo"}`, string(results[0].Job.EncodedArgs)) + require.Empty(t, results[0].Job.Errors) + require.Nil(t, results[0].Job.FinalizedAt) + require.Equal(t, "noOp", results[0].Job.Kind) + require.Equal(t, 25, results[0].Job.MaxAttempts) + require.JSONEq(t, `{"a": "b"}`, string(results[0].Job.Metadata)) + require.Equal(t, 2, results[0].Job.Priority) + require.Equal(t, "foo", results[0].Job.Queue) + require.WithinDuration(t, now, results[0].Job.ScheduledAt, 2*time.Second) + require.Equal(t, rivertype.JobStateAvailable, results[0].Job.State) + require.Empty(t, results[0].Job.Tags) + require.Empty(t, results[0].Job.UniqueKey) + + require.False(t, results[1].UniqueSkippedAsDuplicate) + require.Equal(t, 0, results[1].Job.Attempt) + require.Nil(t, results[1].Job.AttemptedAt) + require.WithinDuration(t, now, results[1].Job.CreatedAt, 2*time.Second) + require.Empty(t, results[1].Job.AttemptedBy) + require.Positive(t, results[1].Job.ID) + require.JSONEq(t, `{"name": ""}`, string(results[1].Job.EncodedArgs)) + require.Empty(t, results[1].Job.Errors) + require.Nil(t, results[1].Job.FinalizedAt) + require.Equal(t, "noOp", results[1].Job.Kind) + require.Equal(t, 25, results[1].Job.MaxAttempts) + require.JSONEq(t, `{}`, string(results[1].Job.Metadata)) + require.Equal(t, 1, results[1].Job.Priority) + require.Equal(t, "default", results[1].Job.Queue) + require.WithinDuration(t, now.Add(time.Minute), results[1].Job.ScheduledAt, time.Millisecond) + require.Equal(t, rivertype.JobStateScheduled, results[1].Job.State) + require.Empty(t, results[1].Job.Tags) + require.Empty(t, results[1].Job.UniqueKey) + + require.NotEqual(t, results[0].Job.ID, results[1].Job.ID) + + jobs, err := client.driver.GetExecutor().JobGetByKindMany(ctx, []string{(noOpArgs{}).Kind()}) + require.NoError(t, err) + require.Len(t, jobs, 2, "Expected to find exactly two jobs of kind: "+(noOpArgs{}).Kind()) + }) + + t.Run("TriggersImmediateWork", func(t *testing.T) { + t.Parallel() + + ctx := context.Background() + _, bundle := setup(t) + + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) + t.Cleanup(cancel) + + 
doneCh := make(chan struct{}) + close(doneCh) // don't need to block any jobs from completing + startedCh := make(chan int64) + + config := newTestConfig(t, makeAwaitCallback(startedCh, doneCh)) + config.FetchCooldown = 20 * time.Millisecond + config.FetchPollInterval = 20 * time.Second // essentially disable polling + config.Queues = map[string]QueueConfig{QueueDefault: {MaxWorkers: 2}, "another_queue": {MaxWorkers: 1}} + + client := newTestClient(t, bundle.dbPool, config) + + startClient(ctx, t, client) + riversharedtest.WaitOrTimeout(t, client.baseStartStop.Started()) + + results, err := client.InsertMany(ctx, []InsertManyParams{ + {Args: callbackArgs{}}, + {Args: callbackArgs{}}, + }) + require.NoError(t, err) + require.Len(t, results, 2) + + // Wait for the client to be ready by waiting for a job to be executed: + riversharedtest.WaitOrTimeoutN(t, startedCh, 2) + + // Now that we've run one job, we shouldn't take longer than the cooldown to + // fetch another after insertion. LISTEN/NOTIFY should ensure we find out + // about the inserted job much faster than the poll interval. + // + // Note: we specifically use a different queue to ensure that the notify + // limiter is immediately to fire on this queue. + results, err = client.InsertMany(ctx, []InsertManyParams{ + {Args: callbackArgs{}, InsertOpts: &InsertOpts{Queue: "another_queue"}}, + }) + require.NoError(t, err) + require.Len(t, results, 1) + + select { + case <-startedCh: + // As long as this is meaningfully shorter than the poll interval, we can be + // sure the re-fetch came from listen/notify. + case <-time.After(5 * time.Second): + t.Fatal("timed out waiting for another_queue job to start") + } + + require.NoError(t, client.Stop(ctx)) + }) + + t.Run("DoesNotTriggerInsertNotificationForNonAvailableJob", func(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + _, bundle := setup(t) + + config := newTestConfig(t, nil) + config.FetchCooldown = 5 * time.Second + config.FetchPollInterval = 5 * time.Second + client := newTestClient(t, bundle.dbPool, config) + + startClient(ctx, t, client) + riversharedtest.WaitOrTimeout(t, client.baseStartStop.Started()) + + results, err := client.InsertMany(ctx, []InsertManyParams{ + {Args: noOpArgs{}, InsertOpts: &InsertOpts{Queue: "a", ScheduledAt: time.Now().Add(1 * time.Hour)}}, + {Args: noOpArgs{}, InsertOpts: &InsertOpts{Queue: "b"}}, + }) + require.NoError(t, err) + require.Len(t, results, 2) + + // Queue `a` should be "due" to be triggered because it wasn't triggered above. + require.True(t, client.insertNotifyLimiter.ShouldTrigger("a")) + // Queue `b` should *not* be "due" to be triggered because it was triggered above. 
+ require.False(t, client.insertNotifyLimiter.ShouldTrigger("b")) + + require.NoError(t, client.Stop(ctx)) + }) + + t.Run("WithInsertOptsScheduledAtZeroTime", func(t *testing.T) { + t.Parallel() + + client, _ := setup(t) + + results, err := client.InsertMany(ctx, []InsertManyParams{ + {Args: &noOpArgs{}, InsertOpts: &InsertOpts{ScheduledAt: time.Time{}}}, + }) + require.NoError(t, err) + require.Len(t, results, 1) + + jobs, err := client.driver.GetExecutor().JobGetByKindMany(ctx, []string{(noOpArgs{}).Kind()}) + require.NoError(t, err) + require.Len(t, jobs, 1, "Expected to find exactly one job of kind: "+(noOpArgs{}).Kind()) + jobRow := jobs[0] + require.WithinDuration(t, time.Now(), jobRow.ScheduledAt, 2*time.Second) + }) + + t.Run("ErrorsOnInvalidQueueName", func(t *testing.T) { + t.Parallel() + + client, _ := setup(t) + + results, err := client.InsertMany(ctx, []InsertManyParams{ + {Args: &noOpArgs{}, InsertOpts: &InsertOpts{Queue: "invalid*queue"}}, + }) + require.ErrorContains(t, err, "queue name is invalid") + require.Nil(t, results) + }) + + t.Run("ErrorsOnDriverWithoutPool", func(t *testing.T) { + t.Parallel() + + _, _ = setup(t) + + client, err := NewClient(riverpgxv5.New(nil), &Config{ + Logger: riversharedtest.Logger(t), + }) + require.NoError(t, err) + + results, err := client.InsertMany(ctx, []InsertManyParams{ + {Args: noOpArgs{}}, + }) + require.ErrorIs(t, err, errNoDriverDBPool) + require.Nil(t, results) + }) + + t.Run("ErrorsWithZeroJobs", func(t *testing.T) { + t.Parallel() + + client, _ := setup(t) + + results, err := client.InsertMany(ctx, []InsertManyParams{}) + require.EqualError(t, err, "no jobs to insert") + require.Nil(t, results) + }) + + t.Run("ErrorsOnUnknownJobKindWithWorkers", func(t *testing.T) { + t.Parallel() + + client, _ := setup(t) + + results, err := client.InsertMany(ctx, []InsertManyParams{ + {Args: unregisteredJobArgs{}}, + }) + var unknownJobKindErr *UnknownJobKindError + require.ErrorAs(t, err, &unknownJobKindErr) + require.Equal(t, (&unregisteredJobArgs{}).Kind(), unknownJobKindErr.Kind) + require.Nil(t, results) + }) + + t.Run("AllowsUnknownJobKindWithoutWorkers", func(t *testing.T) { + t.Parallel() + + client, _ := setup(t) + + client.config.Workers = nil + + results, err := client.InsertMany(ctx, []InsertManyParams{ + {Args: unregisteredJobArgs{}}, + }) + require.NoError(t, err) + require.Len(t, results, 1) + }) + + t.Run("ErrorsOnInsertOptsUniqueOpts", func(t *testing.T) { + t.Parallel() + + client, _ := setup(t) + + results, err := client.InsertMany(ctx, []InsertManyParams{ + {Args: noOpArgs{}, InsertOpts: &InsertOpts{UniqueOpts: UniqueOpts{ByArgs: true}}}, + }) + require.EqualError(t, err, "UniqueOpts are not supported for batch inserts") + require.Empty(t, results) + }) +} + +func Test_Client_InsertManyTx(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + type testBundle struct { + tx pgx.Tx + } + + setup := func(t *testing.T) (*Client[pgx.Tx], *testBundle) { + t.Helper() + + dbPool := riverinternaltest.TestDB(ctx, t) + config := newTestConfig(t, nil) + client := newTestClient(t, dbPool, config) + + tx, err := dbPool.Begin(ctx) + require.NoError(t, err) + t.Cleanup(func() { tx.Rollback(ctx) }) + + return client, &testBundle{ + tx: tx, + } + } + + t.Run("SucceedsWithMultipleJobs", func(t *testing.T) { + t.Parallel() + + client, bundle := setup(t) + + now := time.Now().UTC() + + results, err := client.InsertManyTx(ctx, bundle.tx, []InsertManyParams{ + {Args: noOpArgs{Name: "Foo"}, InsertOpts: &InsertOpts{Metadata: 
[]byte(`{"a": "b"}`), Queue: "foo", Priority: 2}}, + {Args: noOpArgs{}, InsertOpts: &InsertOpts{ScheduledAt: now.Add(time.Minute)}}, + }) + require.NoError(t, err) + require.Len(t, results, 2) + + require.False(t, results[0].UniqueSkippedAsDuplicate) + require.Equal(t, 0, results[0].Job.Attempt) + require.Nil(t, results[0].Job.AttemptedAt) + require.WithinDuration(t, now, results[0].Job.CreatedAt, 2*time.Second) + require.Empty(t, results[0].Job.AttemptedBy) + require.Positive(t, results[0].Job.ID) + require.JSONEq(t, `{"name": "Foo"}`, string(results[0].Job.EncodedArgs)) + require.Empty(t, results[0].Job.Errors) + require.Nil(t, results[0].Job.FinalizedAt) + require.Equal(t, "noOp", results[0].Job.Kind) + require.Equal(t, 25, results[0].Job.MaxAttempts) + require.JSONEq(t, `{"a": "b"}`, string(results[0].Job.Metadata)) + require.Equal(t, 2, results[0].Job.Priority) + require.Equal(t, "foo", results[0].Job.Queue) + require.WithinDuration(t, now, results[0].Job.ScheduledAt, 2*time.Second) + require.Equal(t, rivertype.JobStateAvailable, results[0].Job.State) + require.Empty(t, results[0].Job.Tags) + require.Empty(t, results[0].Job.UniqueKey) + + require.False(t, results[1].UniqueSkippedAsDuplicate) + require.Equal(t, 0, results[1].Job.Attempt) + require.Nil(t, results[1].Job.AttemptedAt) + require.WithinDuration(t, now, results[1].Job.CreatedAt, 2*time.Second) + require.Empty(t, results[1].Job.AttemptedBy) + require.Positive(t, results[1].Job.ID) + require.JSONEq(t, `{"name": ""}`, string(results[1].Job.EncodedArgs)) + require.Empty(t, results[1].Job.Errors) + require.Nil(t, results[1].Job.FinalizedAt) + require.Equal(t, "noOp", results[1].Job.Kind) + require.Equal(t, 25, results[1].Job.MaxAttempts) + require.JSONEq(t, `{}`, string(results[1].Job.Metadata)) + require.Equal(t, 1, results[1].Job.Priority) + require.Equal(t, "default", results[1].Job.Queue) + require.WithinDuration(t, now.Add(time.Minute), results[1].Job.ScheduledAt, time.Millisecond) + require.Equal(t, rivertype.JobStateScheduled, results[1].Job.State) + require.Empty(t, results[1].Job.Tags) + require.Empty(t, results[1].Job.UniqueKey) + + require.NotEqual(t, results[0].Job.ID, results[1].Job.ID) + + jobs, err := client.driver.UnwrapExecutor(bundle.tx).JobGetByKindMany(ctx, []string{(noOpArgs{}).Kind()}) + require.NoError(t, err) + require.Len(t, jobs, 2, "Expected to find exactly two jobs of kind: "+(noOpArgs{}).Kind()) + }) + + t.Run("SetsScheduledAtToNowByDefault", func(t *testing.T) { + t.Parallel() + + client, bundle := setup(t) + + results, err := client.InsertManyTx(ctx, bundle.tx, []InsertManyParams{{noOpArgs{}, nil}}) + require.NoError(t, err) + require.Len(t, results, 1) + + require.Equal(t, rivertype.JobStateAvailable, results[0].Job.State) + require.WithinDuration(t, time.Now(), results[0].Job.ScheduledAt, 2*time.Second) + + insertedJobs, err := client.driver.UnwrapExecutor(bundle.tx).JobGetByKindMany(ctx, []string{(noOpArgs{}).Kind()}) + require.NoError(t, err) + require.Len(t, insertedJobs, 1) + require.Equal(t, rivertype.JobStateAvailable, insertedJobs[0].State) + require.WithinDuration(t, time.Now(), insertedJobs[0].ScheduledAt, 2*time.Second) + }) + + t.Run("SupportsScheduledJobs", func(t *testing.T) { + t.Parallel() + + client, bundle := setup(t) + + startClient(ctx, t, client) + + results, err := client.InsertManyTx(ctx, bundle.tx, []InsertManyParams{{noOpArgs{}, &InsertOpts{ScheduledAt: time.Now().Add(time.Minute)}}}) + require.NoError(t, err) + require.Len(t, results, 1) + + require.Equal(t, 
rivertype.JobStateScheduled, results[0].Job.State) + require.WithinDuration(t, time.Now().Add(time.Minute), results[0].Job.ScheduledAt, 2*time.Second) + }) + + // A client's allowed to send nil to their driver so they can, for example, + // easily use test transactions in their test suite. + t.Run("WithDriverWithoutPool", func(t *testing.T) { + t.Parallel() + + _, bundle := setup(t) + + client, err := NewClient(riverpgxv5.New(nil), &Config{ + Logger: riversharedtest.Logger(t), + }) + require.NoError(t, err) + + results, err := client.InsertManyTx(ctx, bundle.tx, []InsertManyParams{ + {Args: noOpArgs{}}, + }) + require.NoError(t, err) + require.Len(t, results, 1) + }) + + t.Run("ErrorsWithZeroJobs", func(t *testing.T) { + t.Parallel() + + client, bundle := setup(t) + + results, err := client.InsertManyTx(ctx, bundle.tx, []InsertManyParams{}) + require.EqualError(t, err, "no jobs to insert") + require.Nil(t, results) + }) + + t.Run("ErrorsOnUnknownJobKindWithWorkers", func(t *testing.T) { + t.Parallel() + + client, bundle := setup(t) + + results, err := client.InsertManyTx(ctx, bundle.tx, []InsertManyParams{ + {Args: unregisteredJobArgs{}}, + }) + var unknownJobKindErr *UnknownJobKindError + require.ErrorAs(t, err, &unknownJobKindErr) + require.Equal(t, (&unregisteredJobArgs{}).Kind(), unknownJobKindErr.Kind) + require.Nil(t, results) + }) + + t.Run("AllowsUnknownJobKindWithoutWorkers", func(t *testing.T) { + t.Parallel() + + client, bundle := setup(t) + + client.config.Workers = nil + + results, err := client.InsertManyTx(ctx, bundle.tx, []InsertManyParams{ + {Args: unregisteredJobArgs{}}, + }) + require.NoError(t, err) + require.Len(t, results, 1) + }) + + t.Run("ErrorsOnInsertOptsUniqueOpts", func(t *testing.T) { + t.Parallel() + + client, bundle := setup(t) + + results, err := client.InsertManyTx(ctx, bundle.tx, []InsertManyParams{ + {Args: noOpArgs{}, InsertOpts: &InsertOpts{UniqueOpts: UniqueOpts{ByArgs: true}}}, + }) + require.EqualError(t, err, "UniqueOpts are not supported for batch inserts") + require.Empty(t, results) + }) +} + func Test_Client_JobGet(t *testing.T) { t.Parallel() diff --git a/example_batch_insert_test.go b/example_batch_insert_test.go index a4d3f817..392d7786 100644 --- a/example_batch_insert_test.go +++ b/example_batch_insert_test.go @@ -67,7 +67,7 @@ func Example_batchInsert() { panic(err) } - count, err := riverClient.InsertMany(ctx, []river.InsertManyParams{ + results, err := riverClient.InsertMany(ctx, []river.InsertManyParams{ {Args: BatchInsertArgs{}}, {Args: BatchInsertArgs{}}, {Args: BatchInsertArgs{}}, @@ -77,7 +77,7 @@ func Example_batchInsert() { if err != nil { panic(err) } - fmt.Printf("Inserted %d jobs\n", count) + fmt.Printf("Inserted %d jobs\n", len(results)) waitForNJobs(subscribeChan, 5) diff --git a/internal/riverinternaltest/riverdrivertest/riverdrivertest.go b/internal/riverinternaltest/riverdrivertest/riverdrivertest.go index 55ef1b91..e8e3e76c 100644 --- a/internal/riverinternaltest/riverdrivertest/riverdrivertest.go +++ b/internal/riverinternaltest/riverdrivertest/riverdrivertest.go @@ -16,7 +16,7 @@ import ( "github.com/riverqueue/river/internal/notifier" "github.com/riverqueue/river/internal/rivercommon" "github.com/riverqueue/river/riverdriver" - "github.com/riverqueue/river/rivershared/testfactory" //nolint:depguard + "github.com/riverqueue/river/rivershared/testfactory" "github.com/riverqueue/river/rivershared/util/ptrutil" "github.com/riverqueue/river/rivershared/util/sliceutil" "github.com/riverqueue/river/rivertype" @@ -919,6 
+919,83 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, t.Run("JobInsertFastMany", func(t *testing.T) { t.Parallel() + t.Run("AllArgs", func(t *testing.T) { + exec, _ := setup(ctx, t) + + now := time.Now().UTC() + + insertParams := make([]*riverdriver.JobInsertFastParams, 10) + for i := 0; i < len(insertParams); i++ { + insertParams[i] = &riverdriver.JobInsertFastParams{ + EncodedArgs: []byte(`{"encoded": "args"}`), + Kind: "test_kind", + MaxAttempts: rivercommon.MaxAttemptsDefault, + Metadata: []byte(`{"meta": "data"}`), + Priority: rivercommon.PriorityDefault, + Queue: rivercommon.QueueDefault, + ScheduledAt: ptrutil.Ptr(now.Add(time.Duration(i) * time.Minute)), + State: rivertype.JobStateAvailable, + Tags: []string{"tag"}, + } + } + + jobRows, err := exec.JobInsertFastMany(ctx, insertParams) + require.NoError(t, err) + require.Len(t, jobRows, len(insertParams)) + + for i, job := range jobRows { + require.Equal(t, 0, job.Attempt) + require.Nil(t, job.AttemptedAt) + require.Empty(t, job.AttemptedBy) + require.WithinDuration(t, now, job.CreatedAt, 2*time.Second) + require.Equal(t, []byte(`{"encoded": "args"}`), job.EncodedArgs) + require.Empty(t, job.Errors) + require.Nil(t, job.FinalizedAt) + require.Equal(t, "test_kind", job.Kind) + require.Equal(t, rivercommon.MaxAttemptsDefault, job.MaxAttempts) + require.Equal(t, []byte(`{"meta": "data"}`), job.Metadata) + require.Equal(t, rivercommon.PriorityDefault, job.Priority) + require.Equal(t, rivercommon.QueueDefault, job.Queue) + requireEqualTime(t, now.Add(time.Duration(i)*time.Minute), job.ScheduledAt) + require.Equal(t, rivertype.JobStateAvailable, job.State) + require.Equal(t, []string{"tag"}, job.Tags) + } + }) + + t.Run("MissingScheduledAtDefaultsToNow", func(t *testing.T) { + exec, _ := setup(ctx, t) + + insertParams := make([]*riverdriver.JobInsertFastParams, 10) + for i := 0; i < len(insertParams); i++ { + insertParams[i] = &riverdriver.JobInsertFastParams{ + EncodedArgs: []byte(`{"encoded": "args"}`), + Kind: "test_kind", + MaxAttempts: rivercommon.MaxAttemptsDefault, + Metadata: []byte(`{"meta": "data"}`), + Priority: rivercommon.PriorityDefault, + Queue: rivercommon.QueueDefault, + ScheduledAt: nil, // explicit nil + State: rivertype.JobStateAvailable, + Tags: []string{"tag"}, + } + } + + results, err := exec.JobInsertFastMany(ctx, insertParams) + require.NoError(t, err) + require.Len(t, results, len(insertParams)) + + jobsAfter, err := exec.JobGetByKindMany(ctx, []string{"test_kind"}) + require.NoError(t, err) + require.Len(t, jobsAfter, len(insertParams)) + for _, job := range jobsAfter { + require.WithinDuration(t, time.Now().UTC(), job.ScheduledAt, 2*time.Second) + } + }) + }) + + t.Run("JobInsertFastManyNoReturning", func(t *testing.T) { + t.Parallel() + t.Run("AllArgs", func(t *testing.T) { exec, _ := setup(ctx, t) @@ -944,7 +1021,7 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, insertParams[i].ScheduledAt = &now } - count, err := exec.JobInsertFastMany(ctx, insertParams) + count, err := exec.JobInsertFastManyNoReturning(ctx, insertParams) require.NoError(t, err) require.Len(t, insertParams, count) @@ -987,7 +1064,7 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, } } - count, err := exec.JobInsertFastMany(ctx, insertParams) + count, err := exec.JobInsertFastManyNoReturning(ctx, insertParams) require.NoError(t, err) require.Len(t, insertParams, count) diff --git a/riverdriver/river_driver_interface.go b/riverdriver/river_driver_interface.go index d47b3eee..e94c64a1 100644 --- 
a/riverdriver/river_driver_interface.go +++ b/riverdriver/river_driver_interface.go @@ -116,7 +116,8 @@ type Executor interface { JobGetByKindMany(ctx context.Context, kind []string) ([]*rivertype.JobRow, error) JobGetStuck(ctx context.Context, params *JobGetStuckParams) ([]*rivertype.JobRow, error) JobInsertFast(ctx context.Context, params *JobInsertFastParams) (*rivertype.JobRow, error) - JobInsertFastMany(ctx context.Context, params []*JobInsertFastParams) (int, error) + JobInsertFastMany(ctx context.Context, params []*JobInsertFastParams) ([]*rivertype.JobRow, error) + JobInsertFastManyNoReturning(ctx context.Context, params []*JobInsertFastParams) (int, error) JobInsertFull(ctx context.Context, params *JobInsertFullParams) (*rivertype.JobRow, error) JobInsertUnique(ctx context.Context, params *JobInsertUniqueParams) (*JobInsertUniqueResult, error) JobList(ctx context.Context, query string, namedArgs map[string]any) ([]*rivertype.JobRow, error) diff --git a/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go b/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go index 61eebd1d..dafdc2d9 100644 --- a/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go +++ b/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go @@ -592,7 +592,99 @@ func (q *Queries) JobInsertFast(ctx context.Context, db DBTX, arg *JobInsertFast return &i, err } -const jobInsertFastMany = `-- name: JobInsertFastMany :execrows +const jobInsertFastMany = `-- name: JobInsertFastMany :many +INSERT INTO river_job( + args, + kind, + max_attempts, + metadata, + priority, + queue, + scheduled_at, + state, + tags +) SELECT + unnest($1::jsonb[]), + unnest($2::text[]), + unnest($3::smallint[]), + unnest($4::jsonb[]), + unnest($5::smallint[]), + unnest($6::text[]), + unnest($7::timestamptz[]), + -- To avoid requiring pgx users to register the OID of the river_job_state[] + -- type, we cast the array to text[] and then to river_job_state. + unnest($8::text[])::river_job_state, + -- Unnest on a multi-dimensional array will fully flatten the array, so we + -- encode the tag list as a comma-separated string and split it in the + -- query. 
+ string_to_array(unnest($9::text[]), ',') +RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key +` + +type JobInsertFastManyParams struct { + Args []string + Kind []string + MaxAttempts []int16 + Metadata []string + Priority []int16 + Queue []string + ScheduledAt []time.Time + State []string + Tags []string +} + +func (q *Queries) JobInsertFastMany(ctx context.Context, db DBTX, arg *JobInsertFastManyParams) ([]*RiverJob, error) { + rows, err := db.QueryContext(ctx, jobInsertFastMany, + pq.Array(arg.Args), + pq.Array(arg.Kind), + pq.Array(arg.MaxAttempts), + pq.Array(arg.Metadata), + pq.Array(arg.Priority), + pq.Array(arg.Queue), + pq.Array(arg.ScheduledAt), + pq.Array(arg.State), + pq.Array(arg.Tags), + ) + if err != nil { + return nil, err + } + defer rows.Close() + var items []*RiverJob + for rows.Next() { + var i RiverJob + if err := rows.Scan( + &i.ID, + &i.Args, + &i.Attempt, + &i.AttemptedAt, + pq.Array(&i.AttemptedBy), + &i.CreatedAt, + pq.Array(&i.Errors), + &i.FinalizedAt, + &i.Kind, + &i.MaxAttempts, + &i.Metadata, + &i.Priority, + &i.Queue, + &i.State, + &i.ScheduledAt, + pq.Array(&i.Tags), + &i.UniqueKey, + ); err != nil { + return nil, err + } + items = append(items, &i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const jobInsertFastManyNoReturning = `-- name: JobInsertFastManyNoReturning :execrows INSERT INTO river_job( args, kind, @@ -620,7 +712,7 @@ INSERT INTO river_job( string_to_array(unnest($9::text[]), ',') ` -type JobInsertFastManyParams struct { +type JobInsertFastManyNoReturningParams struct { Args []string Kind []string MaxAttempts []int16 @@ -632,8 +724,8 @@ type JobInsertFastManyParams struct { Tags []string } -func (q *Queries) JobInsertFastMany(ctx context.Context, db DBTX, arg *JobInsertFastManyParams) (int64, error) { - result, err := db.ExecContext(ctx, jobInsertFastMany, +func (q *Queries) JobInsertFastManyNoReturning(ctx context.Context, db DBTX, arg *JobInsertFastManyNoReturningParams) (int64, error) { + result, err := db.ExecContext(ctx, jobInsertFastManyNoReturning, pq.Array(arg.Args), pq.Array(arg.Kind), pq.Array(arg.MaxAttempts), diff --git a/riverdriver/riverdatabasesql/river_database_sql_driver.go b/riverdriver/riverdatabasesql/river_database_sql_driver.go index ba7599ab..b43076cd 100644 --- a/riverdriver/riverdatabasesql/river_database_sql_driver.go +++ b/riverdriver/riverdatabasesql/river_database_sql_driver.go @@ -216,8 +216,56 @@ func (e *Executor) JobInsertFast(ctx context.Context, params *riverdriver.JobIns return jobRowFromInternal(job) } -func (e *Executor) JobInsertFastMany(ctx context.Context, params []*riverdriver.JobInsertFastParams) (int, error) { +func (e *Executor) JobInsertFastMany(ctx context.Context, params []*riverdriver.JobInsertFastParams) ([]*rivertype.JobRow, error) { insertJobsParams := &dbsqlc.JobInsertFastManyParams{ + Args: make([]string, len(params)), + Kind: make([]string, len(params)), + MaxAttempts: make([]int16, len(params)), + Metadata: make([]string, len(params)), + Priority: make([]int16, len(params)), + Queue: make([]string, len(params)), + ScheduledAt: make([]time.Time, len(params)), + State: make([]string, len(params)), + Tags: make([]string, len(params)), + } + now := time.Now() + + for i := 0; i < len(params); i++ { + params := params[i] + + scheduledAt := now + if params.ScheduledAt 
!= nil { + scheduledAt = *params.ScheduledAt + } + + tags := params.Tags + if tags == nil { + tags = []string{} + } + + defaultObject := "{}" + + insertJobsParams.Args[i] = valutil.ValOrDefault(string(params.EncodedArgs), defaultObject) + insertJobsParams.Kind[i] = params.Kind + insertJobsParams.MaxAttempts[i] = int16(min(params.MaxAttempts, math.MaxInt16)) //nolint:gosec + insertJobsParams.Metadata[i] = valutil.ValOrDefault(string(params.Metadata), defaultObject) + insertJobsParams.Priority[i] = int16(min(params.Priority, math.MaxInt16)) //nolint:gosec + insertJobsParams.Queue[i] = params.Queue + insertJobsParams.ScheduledAt[i] = scheduledAt + insertJobsParams.State[i] = string(params.State) + insertJobsParams.Tags[i] = strings.Join(tags, ",") + } + + items, err := dbsqlc.New().JobInsertFastMany(ctx, e.dbtx, insertJobsParams) + if err != nil { + return nil, interpretError(err) + } + + return mapSliceError(items, jobRowFromInternal) +} + +func (e *Executor) JobInsertFastManyNoReturning(ctx context.Context, params []*riverdriver.JobInsertFastParams) (int, error) { + insertJobsParams := &dbsqlc.JobInsertFastManyNoReturningParams{ Args: make([]string, len(params)), Kind: make([]string, len(params)), MaxAttempts: make([]int16, len(params)), @@ -243,10 +291,12 @@ func (e *Executor) JobInsertFastMany(ctx context.Context, params []*riverdriver. tags = []string{} } - insertJobsParams.Args[i] = valutil.ValOrDefault(string(params.EncodedArgs), "{}") + defaultObject := "{}" + + insertJobsParams.Args[i] = valutil.ValOrDefault(string(params.EncodedArgs), defaultObject) insertJobsParams.Kind[i] = params.Kind insertJobsParams.MaxAttempts[i] = int16(min(params.MaxAttempts, math.MaxInt16)) //nolint:gosec - insertJobsParams.Metadata[i] = valutil.ValOrDefault(string(params.Metadata), "{}") + insertJobsParams.Metadata[i] = valutil.ValOrDefault(string(params.Metadata), defaultObject) insertJobsParams.Priority[i] = int16(min(params.Priority, math.MaxInt16)) //nolint:gosec insertJobsParams.Queue[i] = params.Queue insertJobsParams.ScheduledAt[i] = scheduledAt @@ -254,7 +304,7 @@ func (e *Executor) JobInsertFastMany(ctx context.Context, params []*riverdriver. insertJobsParams.Tags[i] = strings.Join(tags, ",") } - numInserted, err := dbsqlc.New().JobInsertFastMany(ctx, e.dbtx, insertJobsParams) + numInserted, err := dbsqlc.New().JobInsertFastManyNoReturning(ctx, e.dbtx, insertJobsParams) if err != nil { return 0, interpretError(err) } diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql index ff3decf2..e62ee160 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql +++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql @@ -223,7 +223,35 @@ INSERT INTO river_job( coalesce(@tags::varchar(255)[], '{}') ) RETURNING *; --- name: JobInsertFastMany :execrows +-- name: JobInsertFastMany :many +INSERT INTO river_job( + args, + kind, + max_attempts, + metadata, + priority, + queue, + scheduled_at, + state, + tags +) SELECT + unnest(@args::jsonb[]), + unnest(@kind::text[]), + unnest(@max_attempts::smallint[]), + unnest(@metadata::jsonb[]), + unnest(@priority::smallint[]), + unnest(@queue::text[]), + unnest(@scheduled_at::timestamptz[]), + -- To avoid requiring pgx users to register the OID of the river_job_state[] + -- type, we cast the array to text[] and then to river_job_state. 
+ unnest(@state::text[])::river_job_state, + -- Unnest on a multi-dimensional array will fully flatten the array, so we + -- encode the tag list as a comma-separated string and split it in the + -- query. + string_to_array(unnest(@tags::text[]), ',') +RETURNING *; + +-- name: JobInsertFastManyNoReturning :execrows INSERT INTO river_job( args, kind, diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go index 5beabbe6..a2713bef 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go +++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go @@ -578,7 +578,96 @@ func (q *Queries) JobInsertFast(ctx context.Context, db DBTX, arg *JobInsertFast return &i, err } -const jobInsertFastMany = `-- name: JobInsertFastMany :execrows +const jobInsertFastMany = `-- name: JobInsertFastMany :many +INSERT INTO river_job( + args, + kind, + max_attempts, + metadata, + priority, + queue, + scheduled_at, + state, + tags +) SELECT + unnest($1::jsonb[]), + unnest($2::text[]), + unnest($3::smallint[]), + unnest($4::jsonb[]), + unnest($5::smallint[]), + unnest($6::text[]), + unnest($7::timestamptz[]), + -- To avoid requiring pgx users to register the OID of the river_job_state[] + -- type, we cast the array to text[] and then to river_job_state. + unnest($8::text[])::river_job_state, + -- Unnest on a multi-dimensional array will fully flatten the array, so we + -- encode the tag list as a comma-separated string and split it in the + -- query. + string_to_array(unnest($9::text[]), ',') +RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key +` + +type JobInsertFastManyParams struct { + Args [][]byte + Kind []string + MaxAttempts []int16 + Metadata [][]byte + Priority []int16 + Queue []string + ScheduledAt []time.Time + State []string + Tags []string +} + +func (q *Queries) JobInsertFastMany(ctx context.Context, db DBTX, arg *JobInsertFastManyParams) ([]*RiverJob, error) { + rows, err := db.Query(ctx, jobInsertFastMany, + arg.Args, + arg.Kind, + arg.MaxAttempts, + arg.Metadata, + arg.Priority, + arg.Queue, + arg.ScheduledAt, + arg.State, + arg.Tags, + ) + if err != nil { + return nil, err + } + defer rows.Close() + var items []*RiverJob + for rows.Next() { + var i RiverJob + if err := rows.Scan( + &i.ID, + &i.Args, + &i.Attempt, + &i.AttemptedAt, + &i.AttemptedBy, + &i.CreatedAt, + &i.Errors, + &i.FinalizedAt, + &i.Kind, + &i.MaxAttempts, + &i.Metadata, + &i.Priority, + &i.Queue, + &i.State, + &i.ScheduledAt, + &i.Tags, + &i.UniqueKey, + ); err != nil { + return nil, err + } + items = append(items, &i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const jobInsertFastManyNoReturning = `-- name: JobInsertFastManyNoReturning :execrows INSERT INTO river_job( args, kind, @@ -606,7 +695,7 @@ INSERT INTO river_job( string_to_array(unnest($9::text[]), ',') ` -type JobInsertFastManyParams struct { +type JobInsertFastManyNoReturningParams struct { Args [][]byte Kind []string MaxAttempts []int16 @@ -618,8 +707,8 @@ type JobInsertFastManyParams struct { Tags []string } -func (q *Queries) JobInsertFastMany(ctx context.Context, db DBTX, arg *JobInsertFastManyParams) (int64, error) { - result, err := db.Exec(ctx, jobInsertFastMany, +func (q *Queries) JobInsertFastManyNoReturning(ctx context.Context, db DBTX, arg *JobInsertFastManyNoReturningParams) (int64, error) { + result, err := 
db.Exec(ctx, jobInsertFastManyNoReturning, arg.Args, arg.Kind, arg.MaxAttempts, diff --git a/riverdriver/riverpgxv5/river_pgx_v5_driver.go b/riverdriver/riverpgxv5/river_pgx_v5_driver.go index 36da4d81..659f60d1 100644 --- a/riverdriver/riverpgxv5/river_pgx_v5_driver.go +++ b/riverdriver/riverpgxv5/river_pgx_v5_driver.go @@ -211,7 +211,55 @@ func (e *Executor) JobInsertFast(ctx context.Context, params *riverdriver.JobIns return jobRowFromInternal(job) } -func (e *Executor) JobInsertFastMany(ctx context.Context, params []*riverdriver.JobInsertFastParams) (int, error) { +func (e *Executor) JobInsertFastMany(ctx context.Context, params []*riverdriver.JobInsertFastParams) ([]*rivertype.JobRow, error) { + insertJobsParams := &dbsqlc.JobInsertFastManyParams{ + Args: make([][]byte, len(params)), + Kind: make([]string, len(params)), + MaxAttempts: make([]int16, len(params)), + Metadata: make([][]byte, len(params)), + Priority: make([]int16, len(params)), + Queue: make([]string, len(params)), + ScheduledAt: make([]time.Time, len(params)), + State: make([]string, len(params)), + Tags: make([]string, len(params)), + } + now := time.Now() + + for i := 0; i < len(params); i++ { + params := params[i] + + scheduledAt := now + if params.ScheduledAt != nil { + scheduledAt = *params.ScheduledAt + } + + tags := params.Tags + if tags == nil { + tags = []string{} + } + + defaultObject := []byte("{}") + + insertJobsParams.Args[i] = sliceutil.DefaultIfEmpty(params.EncodedArgs, defaultObject) + insertJobsParams.Kind[i] = params.Kind + insertJobsParams.MaxAttempts[i] = int16(min(params.MaxAttempts, math.MaxInt16)) //nolint:gosec + insertJobsParams.Metadata[i] = sliceutil.DefaultIfEmpty(params.Metadata, defaultObject) + insertJobsParams.Priority[i] = int16(min(params.Priority, math.MaxInt16)) //nolint:gosec + insertJobsParams.Queue[i] = params.Queue + insertJobsParams.ScheduledAt[i] = scheduledAt + insertJobsParams.State[i] = string(params.State) + insertJobsParams.Tags[i] = strings.Join(tags, ",") + } + + items, err := dbsqlc.New().JobInsertFastMany(ctx, e.dbtx, insertJobsParams) + if err != nil { + return nil, interpretError(err) + } + + return mapSliceError(items, jobRowFromInternal) +} + +func (e *Executor) JobInsertFastManyNoReturning(ctx context.Context, params []*riverdriver.JobInsertFastParams) (int, error) { insertJobsParams := make([]*dbsqlc.JobInsertFastManyCopyFromParams, len(params)) now := time.Now() diff --git a/rivershared/util/sliceutil/slice_util.go b/rivershared/util/sliceutil/slice_util.go index 400c9d40..8088e05a 100644 --- a/rivershared/util/sliceutil/slice_util.go +++ b/rivershared/util/sliceutil/slice_util.go @@ -4,6 +4,15 @@ // therefore omitted from the utilities in `slices`. package sliceutil +// DefaultIfEmpty returns the default slice if the input slice is nil or empty, +// otherwise it returns the input slice. +func DefaultIfEmpty[T any](input []T, defaultSlice []T) []T { + if len(input) == 0 { + return defaultSlice + } + return input +} + // GroupBy returns an object composed of keys generated from the results of // running each element of collection through keyFunc. 
func GroupBy[T any, U comparable](collection []T, keyFunc func(T) U) map[U][]T { diff --git a/rivershared/util/sliceutil/slice_util_test.go b/rivershared/util/sliceutil/slice_util_test.go index 4b2bc49c..4eb57e56 100644 --- a/rivershared/util/sliceutil/slice_util_test.go +++ b/rivershared/util/sliceutil/slice_util_test.go @@ -8,6 +8,21 @@ import ( "github.com/stretchr/testify/require" ) +func TestDefaultIfEmpty(t *testing.T) { + t.Parallel() + + result1 := DefaultIfEmpty([]int{1, 2, 3}, []int{4, 5, 6}) + result2 := DefaultIfEmpty([]int{}, []int{4, 5, 6}) + result3 := DefaultIfEmpty(nil, []int{4, 5, 6}) + + require.Len(t, result1, 3) + require.Len(t, result2, 3) + require.Len(t, result3, 3) + require.Equal(t, []int{1, 2, 3}, result1) + require.Equal(t, []int{4, 5, 6}, result2) + require.Equal(t, []int{4, 5, 6}, result3) +} + func TestGroupBy(t *testing.T) { t.Parallel()
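For illustration, here is a minimal migration sketch for the `InsertMany` return type change described in the CHANGELOG entry above. It is not part of the diff itself and assumes an already-configured `*river.Client[pgx.Tx]` named `client` and a registered `BatchInsertArgs` job args type (as in the doc comment examples); neither is defined here.

	// Before v0.12.0, InsertMany returned only a count. It now returns one
	// JobInsertResult per inserted job, so the old count is len(results).
	results, err := client.InsertMany(ctx, []river.InsertManyParams{
		{Args: BatchInsertArgs{}},
		{Args: BatchInsertArgs{}, InsertOpts: &river.InsertOpts{Priority: 3}},
	})
	if err != nil {
		// handle error
	}
	fmt.Printf("Inserted %d jobs\n", len(results))

	// Callers that prefer the previous COPY FROM-based behavior (a count and
	// no returned rows) can switch to the renamed InsertManyFast instead.
	count, err := client.InsertManyFast(ctx, []river.InsertManyParams{
		{Args: BatchInsertArgs{}},
	})
	if err != nil {
		// handle error
	}
	fmt.Printf("Inserted %d jobs\n", count)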