diff --git a/client.go b/client.go index 4aa69df9..a73d35fd 100644 --- a/client.go +++ b/client.go @@ -1326,7 +1326,7 @@ type InsertManyParams struct { InsertOpts *InsertOpts } -// InsertMany inserts many jobs at once using Postgres' `COPY FROM` mechanism, +// InsertManyFast inserts many jobs at once using Postgres' `COPY FROM` mechanism, // making the operation quite fast and memory efficient. Each job is inserted as // an InsertManyParams tuple, which takes job args along with an optional set of // insert options, which override insert options provided by an @@ -1345,12 +1345,12 @@ type InsertManyParams struct { // Job uniqueness is not respected when using InsertMany due to unique inserts // using an internal transaction and advisory lock that might lead to // significant lock contention. Insert unique jobs using Insert instead. -func (c *Client[TTx]) InsertMany(ctx context.Context, params []InsertManyParams) (int, error) { +func (c *Client[TTx]) InsertManyFast(ctx context.Context, params []InsertManyParams) (int, error) { if !c.driver.HasPool() { return 0, errNoDriverDBPool } - insertParams, err := c.insertManyParams(params) + insertParams, err := c.insertManyFastParams(params) if err != nil { return 0, err } @@ -1362,7 +1362,7 @@ func (c *Client[TTx]) InsertMany(ctx context.Context, params []InsertManyParams) } defer tx.Rollback(ctx) - inserted, err := c.insertFastMany(ctx, tx, insertParams) + inserted, err := c.insertManyFast(ctx, tx, insertParams) if err != nil { return 0, err } @@ -1395,17 +1395,17 @@ func (c *Client[TTx]) InsertMany(ctx context.Context, params []InsertManyParams) // This variant lets a caller insert jobs atomically alongside other database // changes. An inserted job isn't visible to be worked until the transaction // commits, and if the transaction rolls back, so too is the inserted job. -func (c *Client[TTx]) InsertManyTx(ctx context.Context, tx TTx, params []InsertManyParams) (int, error) { - insertParams, err := c.insertManyParams(params) +func (c *Client[TTx]) InsertManyFastTx(ctx context.Context, tx TTx, params []InsertManyParams) (int, error) { + insertParams, err := c.insertManyFastParams(params) if err != nil { return 0, err } exec := c.driver.UnwrapExecutor(tx) - return c.insertFastMany(ctx, exec, insertParams) + return c.insertManyFast(ctx, exec, insertParams) } -func (c *Client[TTx]) insertFastMany(ctx context.Context, tx riverdriver.ExecutorTx, insertParams []*riverdriver.JobInsertFastParams) (int, error) { +func (c *Client[TTx]) insertManyFast(ctx context.Context, tx riverdriver.ExecutorTx, insertParams []*riverdriver.JobInsertFastParams) (int, error) { inserted, err := tx.JobInsertFastMany(ctx, insertParams) if err != nil { return inserted, err @@ -1425,7 +1425,7 @@ func (c *Client[TTx]) insertFastMany(ctx context.Context, tx riverdriver.Executo // Validates input parameters for an a batch insert operation and generates a // set of batch insert parameters. -func (c *Client[TTx]) insertManyParams(params []InsertManyParams) ([]*riverdriver.JobInsertFastParams, error) { +func (c *Client[TTx]) insertManyFastParams(params []InsertManyParams) ([]*riverdriver.JobInsertFastParams, error) { if len(params) < 1 { return nil, errors.New("no jobs to insert") } diff --git a/client_test.go b/client_test.go index ff7c1e80..993da3b0 100644 --- a/client_test.go +++ b/client_test.go @@ -1568,7 +1568,7 @@ func Test_Client_InsertTx(t *testing.T) { }) } -func Test_Client_InsertMany(t *testing.T) { +func Test_Client_InsertManyFast(t *testing.T) { t.Parallel() ctx := context.Background() @@ -1592,7 +1592,7 @@ func Test_Client_InsertMany(t *testing.T) { client, _ := setup(t) - count, err := client.InsertMany(ctx, []InsertManyParams{ + count, err := client.InsertManyFast(ctx, []InsertManyParams{ {Args: noOpArgs{}, InsertOpts: &InsertOpts{Queue: "foo", Priority: 2}}, {Args: noOpArgs{}}, }) @@ -1627,7 +1627,7 @@ func Test_Client_InsertMany(t *testing.T) { startClient(ctx, t, client) riversharedtest.WaitOrTimeout(t, client.baseStartStop.Started()) - count, err := client.InsertMany(ctx, []InsertManyParams{ + count, err := client.InsertManyFast(ctx, []InsertManyParams{ {Args: callbackArgs{}}, {Args: callbackArgs{}}, }) @@ -1643,7 +1643,7 @@ func Test_Client_InsertMany(t *testing.T) { // // Note: we specifically use a different queue to ensure that the notify // limiter is immediately to fire on this queue. - count, err = client.InsertMany(ctx, []InsertManyParams{ + count, err = client.InsertManyFast(ctx, []InsertManyParams{ {Args: callbackArgs{}, InsertOpts: &InsertOpts{Queue: "another_queue"}}, }) require.NoError(t, err) @@ -1675,7 +1675,7 @@ func Test_Client_InsertMany(t *testing.T) { startClient(ctx, t, client) riversharedtest.WaitOrTimeout(t, client.baseStartStop.Started()) - count, err := client.InsertMany(ctx, []InsertManyParams{ + count, err := client.InsertManyFast(ctx, []InsertManyParams{ {Args: noOpArgs{}, InsertOpts: &InsertOpts{Queue: "a", ScheduledAt: time.Now().Add(1 * time.Hour)}}, {Args: noOpArgs{}, InsertOpts: &InsertOpts{Queue: "b"}}, }) @@ -1695,7 +1695,7 @@ func Test_Client_InsertMany(t *testing.T) { client, _ := setup(t) - count, err := client.InsertMany(ctx, []InsertManyParams{ + count, err := client.InsertManyFast(ctx, []InsertManyParams{ {Args: &noOpArgs{}, InsertOpts: &InsertOpts{ScheduledAt: time.Time{}}}, }) require.NoError(t, err) @@ -1713,7 +1713,7 @@ func Test_Client_InsertMany(t *testing.T) { client, _ := setup(t) - count, err := client.InsertMany(ctx, []InsertManyParams{ + count, err := client.InsertManyFast(ctx, []InsertManyParams{ {Args: &noOpArgs{}, InsertOpts: &InsertOpts{Queue: "invalid*queue"}}, }) require.ErrorContains(t, err, "queue name is invalid") @@ -1730,7 +1730,7 @@ func Test_Client_InsertMany(t *testing.T) { }) require.NoError(t, err) - count, err := client.InsertMany(ctx, []InsertManyParams{ + count, err := client.InsertManyFast(ctx, []InsertManyParams{ {Args: noOpArgs{}}, }) require.ErrorIs(t, err, errNoDriverDBPool) @@ -1742,7 +1742,7 @@ func Test_Client_InsertMany(t *testing.T) { client, _ := setup(t) - count, err := client.InsertMany(ctx, []InsertManyParams{}) + count, err := client.InsertManyFast(ctx, []InsertManyParams{}) require.EqualError(t, err, "no jobs to insert") require.Equal(t, 0, count) }) @@ -1752,7 +1752,7 @@ func Test_Client_InsertMany(t *testing.T) { client, _ := setup(t) - count, err := client.InsertMany(ctx, []InsertManyParams{ + count, err := client.InsertManyFast(ctx, []InsertManyParams{ {Args: unregisteredJobArgs{}}, }) var unknownJobKindErr *UnknownJobKindError @@ -1768,7 +1768,7 @@ func Test_Client_InsertMany(t *testing.T) { client.config.Workers = nil - _, err := client.InsertMany(ctx, []InsertManyParams{ + _, err := client.InsertManyFast(ctx, []InsertManyParams{ {Args: unregisteredJobArgs{}}, }) require.NoError(t, err) @@ -1779,7 +1779,7 @@ func Test_Client_InsertMany(t *testing.T) { client, _ := setup(t) - count, err := client.InsertMany(ctx, []InsertManyParams{ + count, err := client.InsertManyFast(ctx, []InsertManyParams{ {Args: noOpArgs{}, InsertOpts: &InsertOpts{UniqueOpts: UniqueOpts{ByArgs: true}}}, }) require.EqualError(t, err, "UniqueOpts are not supported for batch inserts") @@ -1787,7 +1787,7 @@ func Test_Client_InsertMany(t *testing.T) { }) } -func Test_Client_InsertManyTx(t *testing.T) { +func Test_Client_InsertManyFastTx(t *testing.T) { t.Parallel() ctx := context.Background() @@ -1817,7 +1817,7 @@ func Test_Client_InsertManyTx(t *testing.T) { client, bundle := setup(t) - count, err := client.InsertManyTx(ctx, bundle.tx, []InsertManyParams{ + count, err := client.InsertManyFastTx(ctx, bundle.tx, []InsertManyParams{ {Args: noOpArgs{}, InsertOpts: &InsertOpts{Queue: "foo", Priority: 2}}, {Args: noOpArgs{}}, }) @@ -1841,7 +1841,7 @@ func Test_Client_InsertManyTx(t *testing.T) { client, bundle := setup(t) - _, err := client.InsertManyTx(ctx, bundle.tx, []InsertManyParams{{noOpArgs{}, nil}}) + _, err := client.InsertManyFastTx(ctx, bundle.tx, []InsertManyParams{{noOpArgs{}, nil}}) require.NoError(t, err) insertedJobs, err := client.driver.UnwrapExecutor(bundle.tx).JobGetByKindMany(ctx, []string{(noOpArgs{}).Kind()}) @@ -1858,7 +1858,7 @@ func Test_Client_InsertManyTx(t *testing.T) { startClient(ctx, t, client) - count, err := client.InsertManyTx(ctx, bundle.tx, []InsertManyParams{{noOpArgs{}, &InsertOpts{ScheduledAt: time.Now().Add(time.Minute)}}}) + count, err := client.InsertManyFastTx(ctx, bundle.tx, []InsertManyParams{{noOpArgs{}, &InsertOpts{ScheduledAt: time.Now().Add(time.Minute)}}}) require.NoError(t, err) require.Equal(t, 1, count) @@ -1881,7 +1881,7 @@ func Test_Client_InsertManyTx(t *testing.T) { }) require.NoError(t, err) - count, err := client.InsertManyTx(ctx, bundle.tx, []InsertManyParams{ + count, err := client.InsertManyFastTx(ctx, bundle.tx, []InsertManyParams{ {Args: noOpArgs{}}, }) require.NoError(t, err) @@ -1893,7 +1893,7 @@ func Test_Client_InsertManyTx(t *testing.T) { client, bundle := setup(t) - count, err := client.InsertManyTx(ctx, bundle.tx, []InsertManyParams{}) + count, err := client.InsertManyFastTx(ctx, bundle.tx, []InsertManyParams{}) require.EqualError(t, err, "no jobs to insert") require.Equal(t, 0, count) }) @@ -1903,7 +1903,7 @@ func Test_Client_InsertManyTx(t *testing.T) { client, bundle := setup(t) - count, err := client.InsertManyTx(ctx, bundle.tx, []InsertManyParams{ + count, err := client.InsertManyFastTx(ctx, bundle.tx, []InsertManyParams{ {Args: unregisteredJobArgs{}}, }) var unknownJobKindErr *UnknownJobKindError @@ -1919,7 +1919,7 @@ func Test_Client_InsertManyTx(t *testing.T) { client.config.Workers = nil - _, err := client.InsertManyTx(ctx, bundle.tx, []InsertManyParams{ + _, err := client.InsertManyFastTx(ctx, bundle.tx, []InsertManyParams{ {Args: unregisteredJobArgs{}}, }) require.NoError(t, err) @@ -1930,7 +1930,7 @@ func Test_Client_InsertManyTx(t *testing.T) { client, bundle := setup(t) - count, err := client.InsertManyTx(ctx, bundle.tx, []InsertManyParams{ + count, err := client.InsertManyFastTx(ctx, bundle.tx, []InsertManyParams{ {Args: noOpArgs{}, InsertOpts: &InsertOpts{UniqueOpts: UniqueOpts{ByArgs: true}}}, }) require.EqualError(t, err, "UniqueOpts are not supported for batch inserts") diff --git a/example_batch_insert_test.go b/example_batch_insert_test.go index a4d3f817..a3b8274d 100644 --- a/example_batch_insert_test.go +++ b/example_batch_insert_test.go @@ -67,7 +67,7 @@ func Example_batchInsert() { panic(err) } - count, err := riverClient.InsertMany(ctx, []river.InsertManyParams{ + count, err := riverClient.InsertManyFast(ctx, []river.InsertManyParams{ {Args: BatchInsertArgs{}}, {Args: BatchInsertArgs{}}, {Args: BatchInsertArgs{}}, diff --git a/rivertest/rivertest_test.go b/rivertest/rivertest_test.go index c51a59db..7201ed0a 100644 --- a/rivertest/rivertest_test.go +++ b/rivertest/rivertest_test.go @@ -168,7 +168,7 @@ func TestRequireInsertedTx(t *testing.T) { riverClient, bundle := setup(t) - _, err := riverClient.InsertManyTx(ctx, bundle.tx, []river.InsertManyParams{ + _, err := riverClient.InsertManyFastTx(ctx, bundle.tx, []river.InsertManyParams{ {Args: Job1Args{String: "foo"}}, {Args: Job1Args{String: "bar"}}, }) @@ -440,7 +440,7 @@ func TestRequireNotInsertedTx(t *testing.T) { riverClient, bundle := setup(t) - _, err := riverClient.InsertManyTx(ctx, bundle.tx, []river.InsertManyParams{ + _, err := riverClient.InsertManyFastTx(ctx, bundle.tx, []river.InsertManyParams{ {Args: Job1Args{String: "foo"}}, {Args: Job1Args{String: "bar"}}, }) @@ -738,7 +738,7 @@ func TestRequireManyInsertedTx(t *testing.T) { riverClient, bundle := setup(t) - _, err := riverClient.InsertManyTx(ctx, bundle.tx, []river.InsertManyParams{ + _, err := riverClient.InsertManyFastTx(ctx, bundle.tx, []river.InsertManyParams{ {Args: Job1Args{String: "foo"}}, {Args: Job1Args{String: "bar"}}, }) @@ -758,7 +758,7 @@ func TestRequireManyInsertedTx(t *testing.T) { riverClient, bundle := setup(t) - _, err := riverClient.InsertManyTx(ctx, bundle.tx, []river.InsertManyParams{ + _, err := riverClient.InsertManyFastTx(ctx, bundle.tx, []river.InsertManyParams{ {Args: Job1Args{String: "foo"}}, {Args: Job1Args{String: "bar"}}, {Args: Job2Args{Int: 123}}, @@ -847,7 +847,7 @@ func TestRequireManyInsertedTx(t *testing.T) { riverClient, bundle := setup(t) - _, err := riverClient.InsertManyTx(ctx, bundle.tx, []river.InsertManyParams{ + _, err := riverClient.InsertManyFastTx(ctx, bundle.tx, []river.InsertManyParams{ {Args: Job1Args{String: "foo"}}, {Args: Job1Args{String: "bar"}}, }) @@ -867,7 +867,7 @@ func TestRequireManyInsertedTx(t *testing.T) { riverClient, bundle := setup(t) - _, err := riverClient.InsertManyTx(ctx, bundle.tx, []river.InsertManyParams{ + _, err := riverClient.InsertManyFastTx(ctx, bundle.tx, []river.InsertManyParams{ {Args: Job2Args{Int: 123}}, {Args: Job1Args{String: "foo"}}, }) @@ -888,7 +888,7 @@ func TestRequireManyInsertedTx(t *testing.T) { riverClient, bundle := setup(t) - _, err := riverClient.InsertManyTx(ctx, bundle.tx, []river.InsertManyParams{ + _, err := riverClient.InsertManyFastTx(ctx, bundle.tx, []river.InsertManyParams{ {Args: Job1Args{String: "foo"}}, {Args: Job1Args{String: "bar"}}, {Args: Job2Args{Int: 123}},