Skip to content

Commit

Permalink
rename InsertMany to InsertManyFast
Browse files Browse the repository at this point in the history
  • Loading branch information
bgentry committed Sep 12, 2024
1 parent 6bb7e88 commit f93246e
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 38 deletions.
18 changes: 9 additions & 9 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1326,7 +1326,7 @@ type InsertManyParams struct {
InsertOpts *InsertOpts
}

// InsertMany inserts many jobs at once using Postgres' `COPY FROM` mechanism,
// InsertManyFast inserts many jobs at once using Postgres' `COPY FROM` mechanism,
// making the operation quite fast and memory efficient. Each job is inserted as
// an InsertManyParams tuple, which takes job args along with an optional set of
// insert options, which override insert options provided by an
Expand All @@ -1345,12 +1345,12 @@ type InsertManyParams struct {
// Job uniqueness is not respected when using InsertMany due to unique inserts
// using an internal transaction and advisory lock that might lead to
// significant lock contention. Insert unique jobs using Insert instead.
func (c *Client[TTx]) InsertMany(ctx context.Context, params []InsertManyParams) (int, error) {
func (c *Client[TTx]) InsertManyFast(ctx context.Context, params []InsertManyParams) (int, error) {
if !c.driver.HasPool() {
return 0, errNoDriverDBPool
}

insertParams, err := c.insertManyParams(params)
insertParams, err := c.insertManyFastParams(params)
if err != nil {
return 0, err
}
Expand All @@ -1362,7 +1362,7 @@ func (c *Client[TTx]) InsertMany(ctx context.Context, params []InsertManyParams)
}
defer tx.Rollback(ctx)

inserted, err := c.insertFastMany(ctx, tx, insertParams)
inserted, err := c.insertManyFast(ctx, tx, insertParams)
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -1395,17 +1395,17 @@ func (c *Client[TTx]) InsertMany(ctx context.Context, params []InsertManyParams)
// This variant lets a caller insert jobs atomically alongside other database
// changes. An inserted job isn't visible to be worked until the transaction
// commits, and if the transaction rolls back, so too is the inserted job.
func (c *Client[TTx]) InsertManyTx(ctx context.Context, tx TTx, params []InsertManyParams) (int, error) {
insertParams, err := c.insertManyParams(params)
func (c *Client[TTx]) InsertManyFastTx(ctx context.Context, tx TTx, params []InsertManyParams) (int, error) {
insertParams, err := c.insertManyFastParams(params)
if err != nil {
return 0, err
}

exec := c.driver.UnwrapExecutor(tx)
return c.insertFastMany(ctx, exec, insertParams)
return c.insertManyFast(ctx, exec, insertParams)
}

func (c *Client[TTx]) insertFastMany(ctx context.Context, tx riverdriver.ExecutorTx, insertParams []*riverdriver.JobInsertFastParams) (int, error) {
func (c *Client[TTx]) insertManyFast(ctx context.Context, tx riverdriver.ExecutorTx, insertParams []*riverdriver.JobInsertFastParams) (int, error) {
inserted, err := tx.JobInsertFastMany(ctx, insertParams)
if err != nil {
return inserted, err
Expand All @@ -1425,7 +1425,7 @@ func (c *Client[TTx]) insertFastMany(ctx context.Context, tx riverdriver.Executo

// Validates input parameters for an a batch insert operation and generates a
// set of batch insert parameters.
func (c *Client[TTx]) insertManyParams(params []InsertManyParams) ([]*riverdriver.JobInsertFastParams, error) {
func (c *Client[TTx]) insertManyFastParams(params []InsertManyParams) ([]*riverdriver.JobInsertFastParams, error) {
if len(params) < 1 {
return nil, errors.New("no jobs to insert")
}
Expand Down
42 changes: 21 additions & 21 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1568,7 +1568,7 @@ func Test_Client_InsertTx(t *testing.T) {
})
}

func Test_Client_InsertMany(t *testing.T) {
func Test_Client_InsertManyFast(t *testing.T) {
t.Parallel()

ctx := context.Background()
Expand All @@ -1592,7 +1592,7 @@ func Test_Client_InsertMany(t *testing.T) {

client, _ := setup(t)

count, err := client.InsertMany(ctx, []InsertManyParams{
count, err := client.InsertManyFast(ctx, []InsertManyParams{
{Args: noOpArgs{}, InsertOpts: &InsertOpts{Queue: "foo", Priority: 2}},
{Args: noOpArgs{}},
})
Expand Down Expand Up @@ -1627,7 +1627,7 @@ func Test_Client_InsertMany(t *testing.T) {
startClient(ctx, t, client)
riversharedtest.WaitOrTimeout(t, client.baseStartStop.Started())

count, err := client.InsertMany(ctx, []InsertManyParams{
count, err := client.InsertManyFast(ctx, []InsertManyParams{
{Args: callbackArgs{}},
{Args: callbackArgs{}},
})
Expand All @@ -1643,7 +1643,7 @@ func Test_Client_InsertMany(t *testing.T) {
//
// Note: we specifically use a different queue to ensure that the notify
// limiter is immediately to fire on this queue.
count, err = client.InsertMany(ctx, []InsertManyParams{
count, err = client.InsertManyFast(ctx, []InsertManyParams{
{Args: callbackArgs{}, InsertOpts: &InsertOpts{Queue: "another_queue"}},
})
require.NoError(t, err)
Expand Down Expand Up @@ -1675,7 +1675,7 @@ func Test_Client_InsertMany(t *testing.T) {
startClient(ctx, t, client)
riversharedtest.WaitOrTimeout(t, client.baseStartStop.Started())

count, err := client.InsertMany(ctx, []InsertManyParams{
count, err := client.InsertManyFast(ctx, []InsertManyParams{
{Args: noOpArgs{}, InsertOpts: &InsertOpts{Queue: "a", ScheduledAt: time.Now().Add(1 * time.Hour)}},
{Args: noOpArgs{}, InsertOpts: &InsertOpts{Queue: "b"}},
})
Expand All @@ -1695,7 +1695,7 @@ func Test_Client_InsertMany(t *testing.T) {

client, _ := setup(t)

count, err := client.InsertMany(ctx, []InsertManyParams{
count, err := client.InsertManyFast(ctx, []InsertManyParams{
{Args: &noOpArgs{}, InsertOpts: &InsertOpts{ScheduledAt: time.Time{}}},
})
require.NoError(t, err)
Expand All @@ -1713,7 +1713,7 @@ func Test_Client_InsertMany(t *testing.T) {

client, _ := setup(t)

count, err := client.InsertMany(ctx, []InsertManyParams{
count, err := client.InsertManyFast(ctx, []InsertManyParams{
{Args: &noOpArgs{}, InsertOpts: &InsertOpts{Queue: "invalid*queue"}},
})
require.ErrorContains(t, err, "queue name is invalid")
Expand All @@ -1730,7 +1730,7 @@ func Test_Client_InsertMany(t *testing.T) {
})
require.NoError(t, err)

count, err := client.InsertMany(ctx, []InsertManyParams{
count, err := client.InsertManyFast(ctx, []InsertManyParams{
{Args: noOpArgs{}},
})
require.ErrorIs(t, err, errNoDriverDBPool)
Expand All @@ -1742,7 +1742,7 @@ func Test_Client_InsertMany(t *testing.T) {

client, _ := setup(t)

count, err := client.InsertMany(ctx, []InsertManyParams{})
count, err := client.InsertManyFast(ctx, []InsertManyParams{})
require.EqualError(t, err, "no jobs to insert")
require.Equal(t, 0, count)
})
Expand All @@ -1752,7 +1752,7 @@ func Test_Client_InsertMany(t *testing.T) {

client, _ := setup(t)

count, err := client.InsertMany(ctx, []InsertManyParams{
count, err := client.InsertManyFast(ctx, []InsertManyParams{
{Args: unregisteredJobArgs{}},
})
var unknownJobKindErr *UnknownJobKindError
Expand All @@ -1768,7 +1768,7 @@ func Test_Client_InsertMany(t *testing.T) {

client.config.Workers = nil

_, err := client.InsertMany(ctx, []InsertManyParams{
_, err := client.InsertManyFast(ctx, []InsertManyParams{
{Args: unregisteredJobArgs{}},
})
require.NoError(t, err)
Expand All @@ -1779,15 +1779,15 @@ func Test_Client_InsertMany(t *testing.T) {

client, _ := setup(t)

count, err := client.InsertMany(ctx, []InsertManyParams{
count, err := client.InsertManyFast(ctx, []InsertManyParams{
{Args: noOpArgs{}, InsertOpts: &InsertOpts{UniqueOpts: UniqueOpts{ByArgs: true}}},
})
require.EqualError(t, err, "UniqueOpts are not supported for batch inserts")
require.Equal(t, 0, count)
})
}

func Test_Client_InsertManyTx(t *testing.T) {
func Test_Client_InsertManyFastTx(t *testing.T) {
t.Parallel()

ctx := context.Background()
Expand Down Expand Up @@ -1817,7 +1817,7 @@ func Test_Client_InsertManyTx(t *testing.T) {

client, bundle := setup(t)

count, err := client.InsertManyTx(ctx, bundle.tx, []InsertManyParams{
count, err := client.InsertManyFastTx(ctx, bundle.tx, []InsertManyParams{
{Args: noOpArgs{}, InsertOpts: &InsertOpts{Queue: "foo", Priority: 2}},
{Args: noOpArgs{}},
})
Expand All @@ -1841,7 +1841,7 @@ func Test_Client_InsertManyTx(t *testing.T) {

client, bundle := setup(t)

_, err := client.InsertManyTx(ctx, bundle.tx, []InsertManyParams{{noOpArgs{}, nil}})
_, err := client.InsertManyFastTx(ctx, bundle.tx, []InsertManyParams{{noOpArgs{}, nil}})
require.NoError(t, err)

insertedJobs, err := client.driver.UnwrapExecutor(bundle.tx).JobGetByKindMany(ctx, []string{(noOpArgs{}).Kind()})
Expand All @@ -1858,7 +1858,7 @@ func Test_Client_InsertManyTx(t *testing.T) {

startClient(ctx, t, client)

count, err := client.InsertManyTx(ctx, bundle.tx, []InsertManyParams{{noOpArgs{}, &InsertOpts{ScheduledAt: time.Now().Add(time.Minute)}}})
count, err := client.InsertManyFastTx(ctx, bundle.tx, []InsertManyParams{{noOpArgs{}, &InsertOpts{ScheduledAt: time.Now().Add(time.Minute)}}})
require.NoError(t, err)
require.Equal(t, 1, count)

Expand All @@ -1881,7 +1881,7 @@ func Test_Client_InsertManyTx(t *testing.T) {
})
require.NoError(t, err)

count, err := client.InsertManyTx(ctx, bundle.tx, []InsertManyParams{
count, err := client.InsertManyFastTx(ctx, bundle.tx, []InsertManyParams{
{Args: noOpArgs{}},
})
require.NoError(t, err)
Expand All @@ -1893,7 +1893,7 @@ func Test_Client_InsertManyTx(t *testing.T) {

client, bundle := setup(t)

count, err := client.InsertManyTx(ctx, bundle.tx, []InsertManyParams{})
count, err := client.InsertManyFastTx(ctx, bundle.tx, []InsertManyParams{})
require.EqualError(t, err, "no jobs to insert")
require.Equal(t, 0, count)
})
Expand All @@ -1903,7 +1903,7 @@ func Test_Client_InsertManyTx(t *testing.T) {

client, bundle := setup(t)

count, err := client.InsertManyTx(ctx, bundle.tx, []InsertManyParams{
count, err := client.InsertManyFastTx(ctx, bundle.tx, []InsertManyParams{
{Args: unregisteredJobArgs{}},
})
var unknownJobKindErr *UnknownJobKindError
Expand All @@ -1919,7 +1919,7 @@ func Test_Client_InsertManyTx(t *testing.T) {

client.config.Workers = nil

_, err := client.InsertManyTx(ctx, bundle.tx, []InsertManyParams{
_, err := client.InsertManyFastTx(ctx, bundle.tx, []InsertManyParams{
{Args: unregisteredJobArgs{}},
})
require.NoError(t, err)
Expand All @@ -1930,7 +1930,7 @@ func Test_Client_InsertManyTx(t *testing.T) {

client, bundle := setup(t)

count, err := client.InsertManyTx(ctx, bundle.tx, []InsertManyParams{
count, err := client.InsertManyFastTx(ctx, bundle.tx, []InsertManyParams{
{Args: noOpArgs{}, InsertOpts: &InsertOpts{UniqueOpts: UniqueOpts{ByArgs: true}}},
})
require.EqualError(t, err, "UniqueOpts are not supported for batch inserts")
Expand Down
2 changes: 1 addition & 1 deletion example_batch_insert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func Example_batchInsert() {
panic(err)
}

count, err := riverClient.InsertMany(ctx, []river.InsertManyParams{
count, err := riverClient.InsertManyFast(ctx, []river.InsertManyParams{
{Args: BatchInsertArgs{}},
{Args: BatchInsertArgs{}},
{Args: BatchInsertArgs{}},
Expand Down
14 changes: 7 additions & 7 deletions rivertest/rivertest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func TestRequireInsertedTx(t *testing.T) {

riverClient, bundle := setup(t)

_, err := riverClient.InsertManyTx(ctx, bundle.tx, []river.InsertManyParams{
_, err := riverClient.InsertManyFastTx(ctx, bundle.tx, []river.InsertManyParams{
{Args: Job1Args{String: "foo"}},
{Args: Job1Args{String: "bar"}},
})
Expand Down Expand Up @@ -440,7 +440,7 @@ func TestRequireNotInsertedTx(t *testing.T) {

riverClient, bundle := setup(t)

_, err := riverClient.InsertManyTx(ctx, bundle.tx, []river.InsertManyParams{
_, err := riverClient.InsertManyFastTx(ctx, bundle.tx, []river.InsertManyParams{
{Args: Job1Args{String: "foo"}},
{Args: Job1Args{String: "bar"}},
})
Expand Down Expand Up @@ -738,7 +738,7 @@ func TestRequireManyInsertedTx(t *testing.T) {

riverClient, bundle := setup(t)

_, err := riverClient.InsertManyTx(ctx, bundle.tx, []river.InsertManyParams{
_, err := riverClient.InsertManyFastTx(ctx, bundle.tx, []river.InsertManyParams{
{Args: Job1Args{String: "foo"}},
{Args: Job1Args{String: "bar"}},
})
Expand All @@ -758,7 +758,7 @@ func TestRequireManyInsertedTx(t *testing.T) {

riverClient, bundle := setup(t)

_, err := riverClient.InsertManyTx(ctx, bundle.tx, []river.InsertManyParams{
_, err := riverClient.InsertManyFastTx(ctx, bundle.tx, []river.InsertManyParams{
{Args: Job1Args{String: "foo"}},
{Args: Job1Args{String: "bar"}},
{Args: Job2Args{Int: 123}},
Expand Down Expand Up @@ -847,7 +847,7 @@ func TestRequireManyInsertedTx(t *testing.T) {

riverClient, bundle := setup(t)

_, err := riverClient.InsertManyTx(ctx, bundle.tx, []river.InsertManyParams{
_, err := riverClient.InsertManyFastTx(ctx, bundle.tx, []river.InsertManyParams{
{Args: Job1Args{String: "foo"}},
{Args: Job1Args{String: "bar"}},
})
Expand All @@ -867,7 +867,7 @@ func TestRequireManyInsertedTx(t *testing.T) {

riverClient, bundle := setup(t)

_, err := riverClient.InsertManyTx(ctx, bundle.tx, []river.InsertManyParams{
_, err := riverClient.InsertManyFastTx(ctx, bundle.tx, []river.InsertManyParams{
{Args: Job2Args{Int: 123}},
{Args: Job1Args{String: "foo"}},
})
Expand All @@ -888,7 +888,7 @@ func TestRequireManyInsertedTx(t *testing.T) {

riverClient, bundle := setup(t)

_, err := riverClient.InsertManyTx(ctx, bundle.tx, []river.InsertManyParams{
_, err := riverClient.InsertManyFastTx(ctx, bundle.tx, []river.InsertManyParams{
{Args: Job1Args{String: "foo"}},
{Args: Job1Args{String: "bar"}},
{Args: Job2Args{Int: 123}},
Expand Down

0 comments on commit f93246e

Please sign in to comment.