diff --git a/rivertest/example_require_inserted_test.go b/rivertest/example_require_inserted_test.go index d83f4e75..e1c7ef57 100644 --- a/rivertest/example_require_inserted_test.go +++ b/rivertest/example_require_inserted_test.go @@ -54,7 +54,13 @@ func Example_requireInserted() { panic(err) } - _, err = riverClient.Insert(ctx, RequiredArgs{ + tx, err := dbPool.Begin(ctx) + if err != nil { + panic(err) + } + defer func() { _ = tx.Rollback(ctx) }() + + _, err = riverClient.InsertTx(ctx, tx, &RequiredArgs{ Message: "Hello.", }, nil) if err != nil { @@ -65,16 +71,23 @@ func Example_requireInserted() { // *testing.T that comes from a test's argument. t := &testing.T{} - job := rivertest.RequireInserted(ctx, t, dbPool, &RequiredArgs{}, nil) + job := rivertest.RequireInsertedTx[*riverpgxv5.Driver](ctx, t, tx, &RequiredArgs{}, nil) fmt.Printf("Test passed with message: %s\n", job.Args.Message) // Verify the same job again, and this time that it was inserted at the // default priority and default queue. - _ = rivertest.RequireInserted(ctx, t, dbPool, &RequiredArgs{}, &rivertest.RequireInsertedOpts{ + _ = rivertest.RequireInsertedTx[*riverpgxv5.Driver](ctx, t, tx, &RequiredArgs{}, &rivertest.RequireInsertedOpts{ Priority: 1, Queue: river.DefaultQueue, }) + // Insert and verify one on a pool instead of transaction. + _, err = riverClient.Insert(ctx, &RequiredArgs{Message: "Hello from pool."}, nil) + if err != nil { + panic(err) + } + _ = rivertest.RequireInserted(ctx, t, riverpgxv5.New(dbPool), &RequiredArgs{}, nil) + // Output: // Test passed with message: Hello. } diff --git a/rivertest/example_require_many_inserted_test.go b/rivertest/example_require_many_inserted_test.go index 7ae424ea..a296870f 100644 --- a/rivertest/example_require_many_inserted_test.go +++ b/rivertest/example_require_many_inserted_test.go @@ -72,17 +72,23 @@ func Example_requireManyInserted() { panic(err) } - _, err = riverClient.Insert(ctx, FirstRequiredArgs{Message: "Hello from first."}, nil) + tx, err := dbPool.Begin(ctx) if err != nil { panic(err) } + defer func() { _ = tx.Rollback(ctx) }() - _, err = riverClient.Insert(ctx, SecondRequiredArgs{Message: "Hello from second."}, nil) + _, err = riverClient.InsertTx(ctx, tx, &FirstRequiredArgs{Message: "Hello from first."}, nil) if err != nil { panic(err) } - _, err = riverClient.Insert(ctx, FirstRequiredArgs{Message: "Hello from first (again)."}, nil) + _, err = riverClient.InsertTx(ctx, tx, &SecondRequiredArgs{Message: "Hello from second."}, nil) + if err != nil { + panic(err) + } + + _, err = riverClient.InsertTx(ctx, tx, &FirstRequiredArgs{Message: "Hello from first (again)."}, nil) if err != nil { panic(err) } @@ -91,7 +97,7 @@ func Example_requireManyInserted() { // *testing.T that comes from a test's argument. t := &testing.T{} - jobs := rivertest.RequireManyInserted(ctx, t, dbPool, []rivertest.ExpectedJob{ + jobs := rivertest.RequireManyInsertedTx[*riverpgxv5.Driver](ctx, t, tx, []rivertest.ExpectedJob{ {Args: &FirstRequiredArgs{}}, {Args: &SecondRequiredArgs{}}, {Args: &FirstRequiredArgs{}}, @@ -102,13 +108,22 @@ func Example_requireManyInserted() { // Verify again, and this time that the second job was inserted at the // default priority and default queue. - _ = rivertest.RequireManyInserted(ctx, t, dbPool, []rivertest.ExpectedJob{ + _ = rivertest.RequireManyInsertedTx[*riverpgxv5.Driver](ctx, t, tx, []rivertest.ExpectedJob{ {Args: &SecondRequiredArgs{}, Opts: &rivertest.RequireInsertedOpts{ Priority: 1, Queue: river.DefaultQueue, }}, }) + // Insert and verify one on a pool instead of transaction. + _, err = riverClient.Insert(ctx, &FirstRequiredArgs{Message: "Hello from pool."}, nil) + if err != nil { + panic(err) + } + _ = rivertest.RequireManyInserted(ctx, t, riverpgxv5.New(dbPool), []rivertest.ExpectedJob{ + {Args: &FirstRequiredArgs{}}, + }) + // Output: // Job 0 args: {"message": "Hello from first."} // Job 1 args: {"message": "Hello from second."} diff --git a/rivertest/rivertest.go b/rivertest/rivertest.go index 83192a0f..857fdf92 100644 --- a/rivertest/rivertest.go +++ b/rivertest/rivertest.go @@ -16,12 +16,24 @@ import ( "github.com/riverqueue/river" "github.com/riverqueue/river/internal/dbsqlc" "github.com/riverqueue/river/internal/util/sliceutil" + "github.com/riverqueue/river/riverdriver" ) -// DBTX is a database-like executor which is implemented by all of pgxpool.Pool, +type Tester[TTx any] struct { + driver riverdriver.Driver[TTx] +} + +func New[TTx any](driver riverdriver.Driver[TTx]) *Tester[TTx] { + return &Tester[TTx]{driver: driver} +} + +// func (t *Tester[TTx]) RequireInserted[T river.JobArgs](ctx context.Context, tb testing.TB, db dbtx, expectedJob T, opts *RequireInsertedOpts) *river.Job[T] { +// } + +// dbtx is a database-like executor which is implemented by all of pgxpool.Pool, // pgx.Conn, and pgx.Tx. It's used to let this package's assertions be as // flexible as possible in what database argument they can take. -type DBTX interface { +type dbtx interface { CopyFrom(ctx context.Context, tableName pgx.Identifier, columnNames []string, rowSrc pgx.CopyFromSource) (int64, error) Exec(context.Context, string, ...interface{}) (pgconn.CommandTag, error) Query(context.Context, string, ...interface{}) (pgx.Rows, error) @@ -79,14 +91,14 @@ type RequireInsertedOpts struct { Tags []string } -// RequireInserted is a test helper that verifies that a job of the given kind was -// inserted for work, failing the test if it wasn't. The dbtx argument can be -// any of a Pgx connection pool, connection, or transaction. If found, the -// inserted job is returned so that further assertions can be made against it. +// RequireInserted is a test helper that verifies that a job of the given kind +// was inserted for work, failing the test if it wasn't. If found, the inserted +// job is returned so that further assertions can be made against it. +// +// job := RequireInserted(ctx, t, riverpgxv5.New(dbPool), &Job1Args{}, nil) // -// func TestInsert(t *testing.T) { -// job := RequireInserted(ctx, t, poolOrConnOrTx, &Job1Args{}, nil) -// ... +// This variant takes a driver that wraps a database pool. See also +// RequireManyInsertedTx which takes a transaction. // // A RequireInsertedOpts struct can be provided as the last argument, and if it is, // its properties (e.g. max attempts, priority, queue name) will act as required @@ -95,26 +107,56 @@ type RequireInsertedOpts struct { // The assertion will fail if more than one job of the given kind was found // because at that point the job to return is ambiguous. Use RequireManyInserted // to cover that case instead. -func RequireInserted[T river.JobArgs](ctx context.Context, tb testing.TB, dbtx DBTX, expectedJob T, opts *RequireInsertedOpts) *river.Job[T] { +func RequireInserted[TDriver riverdriver.Driver[TTx], TTx any, TArgs river.JobArgs](ctx context.Context, tb testing.TB, driver TDriver, expectedJob TArgs, opts *RequireInsertedOpts) *river.Job[TArgs] { tb.Helper() - return requireInserted(ctx, tb, dbtx, expectedJob, opts) + return requireInserted(ctx, tb, driver, expectedJob, opts) +} + +func requireInserted[TDriver riverdriver.Driver[TTx], TTx any, TArgs river.JobArgs](ctx context.Context, t testingT, driver TDriver, expectedJob TArgs, opts *RequireInsertedOpts) *river.Job[TArgs] { + actualArgs, err := requireInsertedErr[TDriver](ctx, t, driver.GetDBPool(), expectedJob, opts) + if err != nil { + failure(t, "Internal failure: %s", err) + } + return actualArgs +} + +// RequireInsertedTx is a test helper that verifies that a job of the given kind +// was inserted for work, failing the test if it wasn't. If found, the inserted +// job is returned so that further assertions can be made against it. +// +// job := RequireInsertedTx[*riverpgxv5.Driver](ctx, t, tx, &Job1Args{}, nil) +// +// This variant takes a transaction. See also RequireInserted which takes a +// driver that wraps a database pool. +// +// A RequireInsertedOpts struct can be provided as the last argument, and if it is, +// its properties (e.g. max attempts, priority, queue name) will act as required +// assertions in the inserted job row. UniqueOpts is ignored. +// +// The assertion will fail if more than one job of the given kind was found +// because at that point the job to return is ambiguous. Use RequireManyInserted +// to cover that case instead. +func RequireInsertedTx[TDriver riverdriver.Driver[TTx], TTx any, TArgs river.JobArgs](ctx context.Context, tb testing.TB, tx TTx, expectedJob TArgs, opts *RequireInsertedOpts) *river.Job[TArgs] { + tb.Helper() + return requireInsertedTx[TDriver](ctx, tb, tx, expectedJob, opts) } // Internal function used by the tests so that the exported version can take // `testing.TB` instead of `testing.T`. -func requireInserted[T river.JobArgs](ctx context.Context, t testingT, dbtx DBTX, expectedJob T, opts *RequireInsertedOpts) *river.Job[T] { - actualArgs, err := requireInsertedErr(ctx, t, dbtx, expectedJob, opts) +func requireInsertedTx[TDriver riverdriver.Driver[TTx], TTx any, TArgs river.JobArgs](ctx context.Context, t testingT, tx TTx, expectedJob TArgs, opts *RequireInsertedOpts) *river.Job[TArgs] { + var driver TDriver + actualArgs, err := requireInsertedErr[TDriver](ctx, t, driver.UnwrapTx(tx), expectedJob, opts) if err != nil { failure(t, "Internal failure: %s", err) } return actualArgs } -func requireInsertedErr[T river.JobArgs](ctx context.Context, t testingT, dbtx DBTX, expectedJob T, opts *RequireInsertedOpts) (*river.Job[T], error) { +func requireInsertedErr[TDriver riverdriver.Driver[TTx], TTx any, TArgs river.JobArgs](ctx context.Context, t testingT, db dbtx, expectedJob TArgs, opts *RequireInsertedOpts) (*river.Job[TArgs], error) { queries := dbsqlc.New() // Returned ordered by ID. - dbJobs, err := queries.JobGetByKind(ctx, dbtx, expectedJob.Kind()) + dbJobs, err := queries.JobGetByKind(ctx, db, expectedJob.Kind()) if err != nil { return nil, fmt.Errorf("error querying jobs: %w", err) } @@ -131,7 +173,7 @@ func requireInsertedErr[T river.JobArgs](ctx context.Context, t testingT, dbtx D jobRow := jobRowFromInternal(dbJobs[0]) - var actualArgs T + var actualArgs TArgs if err := json.Unmarshal(jobRow.EncodedArgs, &actualArgs); err != nil { return nil, fmt.Errorf("error unmarshaling job args: %w", err) } @@ -142,7 +184,7 @@ func requireInsertedErr[T river.JobArgs](ctx context.Context, t testingT, dbtx D } } - return &river.Job[T]{JobRow: jobRow, Args: actualArgs}, nil + return &river.Job[TArgs]{JobRow: jobRow, Args: actualArgs}, nil } // ExpectedJob is a single job to expect encapsulating job args and possible @@ -156,17 +198,50 @@ type ExpectedJob struct { Opts *RequireInsertedOpts } -// RequireManyInserted is a test helper that verifies that jobs of the given kinds -// were inserted for work, failing the test if they weren't, or were inserted in -// the wrong order. The dbtx argument can be any of a Pgx connection pool, -// connection, or transaction. If found, the inserted jobs are returned so that +// RequireManyInserted is a test helper that verifies that jobs of the given +// kinds were inserted for work, failing the test if they weren't, or were +// inserted in the wrong order. If found, the inserted jobs are returned so that // further assertions can be made against them. // -// func TestInsertMany(t *testing.T) { -// job := RequireManyInserted(ctx, t, poolOrConnOrTx, []river.JobArgs{ -// &Job1Args{}, -// }) -// ... +// job := RequireManyInserted(ctx, t, riverpgxv5.New(dbPool), []river.JobArgs{ +// &Job1Args{}, +// }) +// +// This variant takes a driver that wraps a database pool. See also +// RequireManyInsertedTx which takes a transaction. +// +// A RequireInsertedOpts struct can be provided for each expected job, and if it is, +// its properties (e.g. max attempts, priority, queue name) will act as required +// assertions for the corresponding inserted job row. UniqueOpts is ignored. +// +// The assertion expects emitted jobs to have occurred exactly in the order and +// the number specified, and will fail in case this expectation isn't met. So if +// a job of a certain kind is emitted multiple times, it must be expected +// multiple times. +func RequireManyInserted[TDriver riverdriver.Driver[TTx], TTx any](ctx context.Context, tb testing.TB, driver TDriver, expectedJobs []ExpectedJob) []*river.JobRow { + tb.Helper() + return requireManyInserted(ctx, tb, driver, expectedJobs) +} + +func requireManyInserted[TDriver riverdriver.Driver[TTx], TTx any](ctx context.Context, t testingT, driver TDriver, expectedJobs []ExpectedJob) []*river.JobRow { + actualArgs, err := requireManyInsertedErr[TDriver](ctx, t, driver.GetDBPool(), expectedJobs) + if err != nil { + failure(t, "Internal failure: %s", err) + } + return actualArgs +} + +// RequireManyInsertedTx is a test helper that verifies that jobs of the given +// kinds were inserted for work, failing the test if they weren't, or were +// inserted in the wrong order. If found, the inserted jobs are returned so that +// further assertions can be made against them. +// +// job := RequireManyInsertedTx[*riverpgxv5.Driver](ctx, t, tx, []river.JobArgs{ +// &Job1Args{}, +// }) +// +// This variant takes a transaction. See also RequireManyInserted which takes a +// driver that wraps a database pool. // // A RequireInsertedOpts struct can be provided for each expected job, and if it is, // its properties (e.g. max attempts, priority, queue name) will act as required @@ -176,28 +251,29 @@ type ExpectedJob struct { // the number specified, and will fail in case this expectation isn't met. So if // a job of a certain kind is emitted multiple times, it must be expected // multiple times. -func RequireManyInserted(ctx context.Context, tb testing.TB, dbtx DBTX, expectedJobs []ExpectedJob) []*river.JobRow { +func RequireManyInsertedTx[TDriver riverdriver.Driver[TTx], TTx any](ctx context.Context, tb testing.TB, tx TTx, expectedJobs []ExpectedJob) []*river.JobRow { tb.Helper() - return requireManyInserted(ctx, tb, dbtx, expectedJobs) + return requireManyInsertedTx[TDriver](ctx, tb, tx, expectedJobs) } // Internal function used by the tests so that the exported version can take // `testing.TB` instead of `testing.T`. -func requireManyInserted(ctx context.Context, t testingT, dbtx DBTX, expectedJobs []ExpectedJob) []*river.JobRow { - actualArgs, err := requireManyInsertedErr(ctx, t, dbtx, expectedJobs) +func requireManyInsertedTx[TDriver riverdriver.Driver[TTx], TTx any](ctx context.Context, t testingT, tx TTx, expectedJobs []ExpectedJob) []*river.JobRow { + var driver TDriver + actualArgs, err := requireManyInsertedErr[TDriver](ctx, t, driver.UnwrapTx(tx), expectedJobs) if err != nil { failure(t, "Internal failure: %s", err) } return actualArgs } -func requireManyInsertedErr(ctx context.Context, t testingT, dbtx DBTX, expectedJobs []ExpectedJob) ([]*river.JobRow, error) { +func requireManyInsertedErr[TDriver riverdriver.Driver[TTx], TTx any](ctx context.Context, t testingT, db dbtx, expectedJobs []ExpectedJob) ([]*river.JobRow, error) { queries := dbsqlc.New() expectedArgsKinds := sliceutil.Map(expectedJobs, func(j ExpectedJob) string { return j.Args.Kind() }) // Returned ordered by ID. - dbJobs, err := queries.JobGetByKindMany(ctx, dbtx, expectedArgsKinds) + dbJobs, err := queries.JobGetByKindMany(ctx, db, expectedArgsKinds) if err != nil { return nil, fmt.Errorf("error querying jobs: %w", err) } diff --git a/rivertest/rivertest_test.go b/rivertest/rivertest_test.go index edd62803..65b0f255 100644 --- a/rivertest/rivertest_test.go +++ b/rivertest/rivertest_test.go @@ -43,6 +43,8 @@ type Job2Worker struct { func (w *Job2Worker) Work(ctx context.Context, j *river.Job[Job2Args]) error { return nil } +// The tests for this function are quite minimal because it uses the same +// implementation as the `*Tx` variant, so most of the test happens below. func TestRequireInserted(t *testing.T) { t.Parallel() @@ -88,7 +90,64 @@ func TestRequireInserted(t *testing.T) { _, err := riverClient.Insert(ctx, Job1Args{String: "foo"}, nil) require.NoError(t, err) - job := requireInserted(ctx, bundle.mockT, bundle.dbPool, &Job1Args{}, nil) + job := requireInserted(ctx, t, riverpgxv5.New(bundle.dbPool), &Job1Args{}, nil) + require.False(t, bundle.mockT.Failed) + require.Equal(t, "foo", job.Args.String) + }) +} + +func TestRequireInsertedTx(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + type testBundle struct { + dbPool *pgxpool.Pool + mockT *MockT + tx pgx.Tx + } + + setup := func(t *testing.T) (*river.Client[pgx.Tx], *testBundle) { //nolint:dupl + t.Helper() + + dbPool := riverinternaltest.TestDB(ctx, t) + + workers := river.NewWorkers() + river.AddWorker(workers, &Job1Worker{}) + river.AddWorker(workers, &Job2Worker{}) + + riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{ + Queues: map[string]river.QueueConfig{ + river.DefaultQueue: {MaxWorkers: 100}, + }, + Workers: workers, + }) + require.NoError(t, err) + + err = riverClient.Start(ctx) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, riverClient.Stop(ctx)) }) + + tx, err := dbPool.Begin(ctx) + require.NoError(t, err) + t.Cleanup(func() { tx.Rollback(ctx) }) + + return riverClient, &testBundle{ + dbPool: dbPool, + mockT: NewMockT(t), + tx: tx, + } + } + + t.Run("VerifiesInsert", func(t *testing.T) { + t.Parallel() + + riverClient, bundle := setup(t) + + _, err := riverClient.Insert(ctx, Job1Args{String: "foo"}, nil) + require.NoError(t, err) + + job := requireInsertedTx[*riverpgxv5.Driver](ctx, t, bundle.tx, &Job1Args{}, nil) require.False(t, bundle.mockT.Failed) require.Equal(t, "foo", job.Args.String) }) @@ -104,34 +163,35 @@ func TestRequireInserted(t *testing.T) { _, err = riverClient.Insert(ctx, Job2Args{Int: 123}, nil) require.NoError(t, err) - job1 := requireInserted(ctx, bundle.mockT, bundle.dbPool, &Job1Args{}, nil) + job1 := requireInsertedTx[*riverpgxv5.Driver](ctx, t, bundle.tx, &Job1Args{}, nil) require.False(t, bundle.mockT.Failed) require.Equal(t, "foo", job1.Args.String) - job2 := requireInserted(ctx, bundle.mockT, bundle.dbPool, &Job2Args{}, nil) + job2 := requireInsertedTx[*riverpgxv5.Driver](ctx, t, bundle.tx, &Job2Args{}, nil) require.False(t, bundle.mockT.Failed) require.Equal(t, 123, job2.Args.Int) }) - t.Run("VerifiesTransaction", func(t *testing.T) { + t.Run("TransactionVisibility", func(t *testing.T) { t.Parallel() riverClient, bundle := setup(t) + // Start a second transaction with different visibility. tx, err := bundle.dbPool.Begin(ctx) require.NoError(t, err) t.Cleanup(func() { tx.Rollback(ctx) }) - _, err = riverClient.InsertTx(ctx, tx, Job1Args{String: "foo"}, nil) + _, err = riverClient.InsertTx(ctx, bundle.tx, Job1Args{String: "foo"}, nil) require.NoError(t, err) - job := requireInserted(ctx, bundle.mockT, tx, &Job1Args{}, nil) + // Visible in the original transaction. + job := requireInsertedTx[*riverpgxv5.Driver](ctx, bundle.mockT, bundle.tx, &Job1Args{}, nil) require.False(t, bundle.mockT.Failed) require.Equal(t, "foo", job.Args.String) - // Fails on the connection pool because the job isn't visible outside of - // the transaction. - _ = requireInserted(ctx, bundle.mockT, bundle.dbPool, &Job1Args{}, nil) + // Not visible in the second transaction. + _ = requireInsertedTx[*riverpgxv5.Driver](ctx, bundle.mockT, tx, &Job1Args{}, nil) require.True(t, bundle.mockT.Failed) }) @@ -144,7 +204,7 @@ func TestRequireInserted(t *testing.T) { _, err := riverClient.Insert(ctx, Job1Args{String: "foo"}, nil) require.NoError(t, err) - _ = requireInserted(ctx, bundle.mockT, bundle.dbPool, &Job1Args{}, &RequireInsertedOpts{ + _ = requireInsertedTx[*riverpgxv5.Driver](ctx, bundle.mockT, bundle.tx, &Job1Args{}, &RequireInsertedOpts{ MaxAttempts: river.DefaultMaxAttempts, Priority: 1, Queue: river.DefaultQueue, @@ -161,7 +221,7 @@ func TestRequireInserted(t *testing.T) { }) require.NoError(t, err) - _ = requireInserted(ctx, bundle.mockT, bundle.dbPool, &Job2Args{}, &RequireInsertedOpts{ + _ = requireInsertedTx[*riverpgxv5.Driver](ctx, bundle.mockT, bundle.tx, &Job2Args{}, &RequireInsertedOpts{ MaxAttempts: 78, Priority: 2, Queue: "another-queue", @@ -179,7 +239,7 @@ func TestRequireInserted(t *testing.T) { _, err := riverClient.Insert(ctx, Job1Args{String: "foo"}, nil) require.NoError(t, err) - _ = requireInserted(ctx, bundle.mockT, bundle.dbPool, &Job2Args{}, nil) + _ = requireInsertedTx[*riverpgxv5.Driver](ctx, bundle.mockT, bundle.tx, &Job2Args{}, nil) require.True(t, bundle.mockT.Failed) require.Equal(t, failureString("No jobs found with kind: job2")+"\n", @@ -197,7 +257,7 @@ func TestRequireInserted(t *testing.T) { }) require.NoError(t, err) - _ = requireInserted(ctx, bundle.mockT, bundle.dbPool, &Job1Args{}, nil) + _ = requireInsertedTx[*riverpgxv5.Driver](ctx, bundle.mockT, bundle.tx, &Job1Args{}, nil) require.True(t, bundle.mockT.Failed) require.Equal(t, failureString("More than one job found with kind: job1 (you might want RequireManyInserted instead)")+"\n", @@ -209,7 +269,7 @@ func TestRequireInserted(t *testing.T) { _, bundle := setup(t) - _ = requireInserted(ctx, bundle.mockT, bundle.dbPool, &Job1Args{}, nil) + _ = requireInsertedTx[*riverpgxv5.Driver](ctx, bundle.mockT, bundle.tx, &Job1Args{}, nil) require.True(t, bundle.mockT.Failed) require.Equal(t, failureString("No jobs found with kind: job1")+"\n", @@ -234,7 +294,7 @@ func TestRequireInserted(t *testing.T) { // Max attempts { mockT := NewMockT(t) - _ = requireInserted(ctx, mockT, bundle.dbPool, &Job2Args{}, &RequireInsertedOpts{ + _ = requireInsertedTx[*riverpgxv5.Driver](ctx, mockT, bundle.tx, &Job2Args{}, &RequireInsertedOpts{ MaxAttempts: 77, Priority: 2, Queue: "another-queue", @@ -251,7 +311,7 @@ func TestRequireInserted(t *testing.T) { // Priority { mockT := NewMockT(t) - _ = requireInserted(ctx, mockT, bundle.dbPool, &Job2Args{}, &RequireInsertedOpts{ + _ = requireInsertedTx[*riverpgxv5.Driver](ctx, mockT, bundle.tx, &Job2Args{}, &RequireInsertedOpts{ MaxAttempts: 78, Priority: 3, Queue: "another-queue", @@ -268,7 +328,7 @@ func TestRequireInserted(t *testing.T) { // Queue { mockT := NewMockT(t) - _ = requireInserted(ctx, mockT, bundle.dbPool, &Job2Args{}, &RequireInsertedOpts{ + _ = requireInsertedTx[*riverpgxv5.Driver](ctx, mockT, bundle.tx, &Job2Args{}, &RequireInsertedOpts{ MaxAttempts: 78, Priority: 2, Queue: "wrong-queue", @@ -285,7 +345,7 @@ func TestRequireInserted(t *testing.T) { // Scheduled at { mockT := NewMockT(t) - _ = requireInserted(ctx, mockT, bundle.dbPool, &Job2Args{}, &RequireInsertedOpts{ + _ = requireInsertedTx[*riverpgxv5.Driver](ctx, mockT, bundle.tx, &Job2Args{}, &RequireInsertedOpts{ MaxAttempts: 78, Priority: 2, Queue: "another-queue", @@ -302,7 +362,7 @@ func TestRequireInserted(t *testing.T) { // State { mockT := NewMockT(t) - _ = requireInserted(ctx, mockT, bundle.dbPool, &Job2Args{}, &RequireInsertedOpts{ + _ = requireInsertedTx[*riverpgxv5.Driver](ctx, mockT, bundle.tx, &Job2Args{}, &RequireInsertedOpts{ MaxAttempts: 78, Priority: 2, Queue: "another-queue", @@ -319,7 +379,7 @@ func TestRequireInserted(t *testing.T) { // Tags { mockT := NewMockT(t) - _ = requireInserted(ctx, mockT, bundle.dbPool, &Job2Args{}, &RequireInsertedOpts{ + _ = requireInsertedTx[*riverpgxv5.Driver](ctx, mockT, bundle.tx, &Job2Args{}, &RequireInsertedOpts{ MaxAttempts: 78, Priority: 2, Queue: "another-queue", @@ -335,6 +395,8 @@ func TestRequireInserted(t *testing.T) { }) } +// The tests for this function are quite minimal because it uses the same +// implementation as the `*Tx` variant, so most of the test happens below. func TestRequireManyInserted(t *testing.T) { t.Parallel() @@ -380,34 +442,94 @@ func TestRequireManyInserted(t *testing.T) { _, err := riverClient.Insert(ctx, Job1Args{String: "foo"}, nil) require.NoError(t, err) - jobs := requireManyInserted(ctx, bundle.mockT, bundle.dbPool, []ExpectedJob{ + jobs := requireManyInserted(ctx, bundle.mockT, riverpgxv5.New(bundle.dbPool), []ExpectedJob{ + {Args: &Job1Args{}}, + }) + require.False(t, bundle.mockT.Failed) + require.Equal(t, "job1", jobs[0].Kind) + }) +} + +func TestRequireManyInsertedTx(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + type testBundle struct { + dbPool *pgxpool.Pool + mockT *MockT + tx pgx.Tx + } + + setup := func(t *testing.T) (*river.Client[pgx.Tx], *testBundle) { //nolint:dupl + t.Helper() + + dbPool := riverinternaltest.TestDB(ctx, t) + + workers := river.NewWorkers() + river.AddWorker(workers, &Job1Worker{}) + river.AddWorker(workers, &Job2Worker{}) + + riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{ + Queues: map[string]river.QueueConfig{ + river.DefaultQueue: {MaxWorkers: 100}, + }, + Workers: workers, + }) + require.NoError(t, err) + + err = riverClient.Start(ctx) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, riverClient.Stop(ctx)) }) + + tx, err := dbPool.Begin(ctx) + require.NoError(t, err) + t.Cleanup(func() { tx.Rollback(ctx) }) + + return riverClient, &testBundle{ + dbPool: dbPool, + mockT: NewMockT(t), + tx: tx, + } + } + + t.Run("VerifiesInsert", func(t *testing.T) { + t.Parallel() + + riverClient, bundle := setup(t) + + _, err := riverClient.Insert(ctx, Job1Args{String: "foo"}, nil) + require.NoError(t, err) + + jobs := requireManyInsertedTx[*riverpgxv5.Driver](ctx, bundle.mockT, bundle.tx, []ExpectedJob{ {Args: &Job1Args{}}, }) require.False(t, bundle.mockT.Failed) require.Equal(t, "job1", jobs[0].Kind) }) - t.Run("VerifiesTransaction", func(t *testing.T) { + t.Run("TransactionVisibility", func(t *testing.T) { t.Parallel() riverClient, bundle := setup(t) + // Start a second transaction with different visibility. tx, err := bundle.dbPool.Begin(ctx) require.NoError(t, err) t.Cleanup(func() { tx.Rollback(ctx) }) - _, err = riverClient.InsertTx(ctx, tx, Job1Args{String: "foo"}, nil) + _, err = riverClient.InsertTx(ctx, bundle.tx, Job1Args{String: "foo"}, nil) require.NoError(t, err) - jobs := requireManyInserted(ctx, bundle.mockT, tx, []ExpectedJob{ + // Visible in the original transaction. + jobs := requireManyInsertedTx[*riverpgxv5.Driver](ctx, bundle.mockT, bundle.tx, []ExpectedJob{ {Args: &Job1Args{}}, }) require.False(t, bundle.mockT.Failed) require.Equal(t, "job1", jobs[0].Kind) - // Fails on the connection pool because the job isn't visible outside of - // the transaction. - _ = requireManyInserted(ctx, bundle.mockT, bundle.dbPool, []ExpectedJob{ + // Not visible in the second transaction. + _ = requireManyInsertedTx[*riverpgxv5.Driver](ctx, bundle.mockT, tx, []ExpectedJob{ {Args: &Job1Args{}}, }) require.True(t, bundle.mockT.Failed) @@ -424,7 +546,7 @@ func TestRequireManyInserted(t *testing.T) { _, err = riverClient.Insert(ctx, Job2Args{Int: 123}, nil) require.NoError(t, err) - jobs := requireManyInserted(ctx, bundle.mockT, bundle.dbPool, []ExpectedJob{ + jobs := requireManyInsertedTx[*riverpgxv5.Driver](ctx, bundle.mockT, bundle.tx, []ExpectedJob{ {Args: &Job1Args{}}, {Args: &Job2Args{}}, }) @@ -444,7 +566,7 @@ func TestRequireManyInserted(t *testing.T) { }) require.NoError(t, err) - jobs := requireManyInserted(ctx, bundle.mockT, bundle.dbPool, []ExpectedJob{ + jobs := requireManyInsertedTx[*riverpgxv5.Driver](ctx, bundle.mockT, bundle.tx, []ExpectedJob{ {Args: &Job1Args{}}, {Args: &Job1Args{}}, }) @@ -467,7 +589,7 @@ func TestRequireManyInserted(t *testing.T) { }) require.NoError(t, err) - jobs := requireManyInserted(ctx, bundle.mockT, bundle.dbPool, []ExpectedJob{ + jobs := requireManyInsertedTx[*riverpgxv5.Driver](ctx, bundle.mockT, bundle.tx, []ExpectedJob{ {Args: &Job1Args{}}, {Args: &Job1Args{}}, {Args: &Job2Args{}}, @@ -491,7 +613,7 @@ func TestRequireManyInserted(t *testing.T) { _, err := riverClient.Insert(ctx, Job1Args{String: "foo"}, nil) require.NoError(t, err) - _ = requireManyInserted(ctx, bundle.mockT, bundle.dbPool, []ExpectedJob{ + _ = requireManyInsertedTx[*riverpgxv5.Driver](ctx, bundle.mockT, bundle.tx, []ExpectedJob{ { Args: &Job1Args{}, Opts: &RequireInsertedOpts{ @@ -513,7 +635,7 @@ func TestRequireManyInserted(t *testing.T) { }) require.NoError(t, err) - _ = requireManyInserted(ctx, bundle.mockT, bundle.dbPool, []ExpectedJob{ + _ = requireManyInsertedTx[*riverpgxv5.Driver](ctx, bundle.mockT, bundle.tx, []ExpectedJob{ { Args: &Job2Args{}, Opts: &RequireInsertedOpts{ @@ -533,7 +655,7 @@ func TestRequireManyInserted(t *testing.T) { _, bundle := setup(t) - _ = requireManyInserted(ctx, bundle.mockT, bundle.dbPool, []ExpectedJob{ + _ = requireManyInsertedTx[*riverpgxv5.Driver](ctx, bundle.mockT, bundle.tx, []ExpectedJob{ {Args: &Job1Args{}}, }) require.True(t, bundle.mockT.Failed) @@ -553,7 +675,7 @@ func TestRequireManyInserted(t *testing.T) { }) require.NoError(t, err) - _ = requireManyInserted(ctx, bundle.mockT, bundle.dbPool, []ExpectedJob{ + _ = requireManyInsertedTx[*riverpgxv5.Driver](ctx, bundle.mockT, bundle.tx, []ExpectedJob{ {Args: &Job1Args{}}, }) require.True(t, bundle.mockT.Failed) @@ -573,7 +695,7 @@ func TestRequireManyInserted(t *testing.T) { }) require.NoError(t, err) - _ = requireManyInserted(ctx, bundle.mockT, bundle.dbPool, []ExpectedJob{ + _ = requireManyInsertedTx[*riverpgxv5.Driver](ctx, bundle.mockT, bundle.tx, []ExpectedJob{ {Args: &Job1Args{}}, {Args: &Job2Args{}}, }) @@ -597,7 +719,7 @@ func TestRequireManyInserted(t *testing.T) { }) require.NoError(t, err) - _ = requireManyInserted(ctx, bundle.mockT, bundle.dbPool, []ExpectedJob{ + _ = requireManyInsertedTx[*riverpgxv5.Driver](ctx, bundle.mockT, bundle.tx, []ExpectedJob{ {Args: &Job1Args{}}, {Args: &Job1Args{}}, {Args: &Job2Args{}}, @@ -628,7 +750,7 @@ func TestRequireManyInserted(t *testing.T) { // Max attempts { mockT := NewMockT(t) - _ = requireManyInserted(ctx, mockT, bundle.dbPool, []ExpectedJob{ + _ = requireManyInsertedTx[*riverpgxv5.Driver](ctx, mockT, bundle.tx, []ExpectedJob{ { Args: &Job2Args{}, Opts: &RequireInsertedOpts{ @@ -650,7 +772,7 @@ func TestRequireManyInserted(t *testing.T) { // Priority { mockT := NewMockT(t) - _ = requireManyInserted(ctx, mockT, bundle.dbPool, []ExpectedJob{ + _ = requireManyInsertedTx[*riverpgxv5.Driver](ctx, mockT, bundle.tx, []ExpectedJob{ { Args: &Job2Args{}, Opts: &RequireInsertedOpts{ @@ -672,7 +794,7 @@ func TestRequireManyInserted(t *testing.T) { // Queue { mockT := NewMockT(t) - _ = requireManyInserted(ctx, mockT, bundle.dbPool, []ExpectedJob{ + _ = requireManyInsertedTx[*riverpgxv5.Driver](ctx, mockT, bundle.tx, []ExpectedJob{ { Args: &Job2Args{}, Opts: &RequireInsertedOpts{ @@ -694,7 +816,7 @@ func TestRequireManyInserted(t *testing.T) { // Scheduled at { mockT := NewMockT(t) - _ = requireManyInserted(ctx, mockT, bundle.dbPool, []ExpectedJob{ + _ = requireManyInsertedTx[*riverpgxv5.Driver](ctx, mockT, bundle.tx, []ExpectedJob{ { Args: &Job2Args{}, Opts: &RequireInsertedOpts{ @@ -716,7 +838,7 @@ func TestRequireManyInserted(t *testing.T) { // State { mockT := NewMockT(t) - _ = requireManyInserted(ctx, mockT, bundle.dbPool, []ExpectedJob{ + _ = requireManyInsertedTx[*riverpgxv5.Driver](ctx, mockT, bundle.tx, []ExpectedJob{ { Args: &Job2Args{}, Opts: &RequireInsertedOpts{ @@ -738,7 +860,7 @@ func TestRequireManyInserted(t *testing.T) { // Tags { mockT := NewMockT(t) - _ = requireManyInserted(ctx, mockT, bundle.dbPool, []ExpectedJob{ + _ = requireManyInsertedTx[*riverpgxv5.Driver](ctx, mockT, bundle.tx, []ExpectedJob{ { Args: &Job2Args{}, Opts: &RequireInsertedOpts{