From 26bbdca977402e743d2ebefe8294d9203001c876 Mon Sep 17 00:00:00 2001 From: Brandur Date: Sat, 18 Nov 2023 13:36:46 -0800 Subject: [PATCH] Fix producer sample so that it picks up usable defaults The producer sample currently fails with this error: > `time=2023-11-18T13:37:05.945-08:00 level=ERROR msg=failed error="error inserting jobs: error inserting many jobs: ERROR: new row for relation \"river_job\" violates check constraint \"max_attempts_is_positive\" (SQLSTATE 23514)"` Digging into it, it turns out that this is because (1) batch insert's raw query uses `COPY FROM` so it's not allowed to have `coalesce` in it like the single insert, (2) defaults are added for bulk inserts by River's Go code, but at the client level and not the adapter level, and (3) the producer sample manually creates an adapter to insert jobs with. So it worked around the normal client inserts by using an adapter instead, wasn't adding its own `MaxAttempts`, and no default was provided, so it failed on the positive check constraint. I could fix this by putting a `MaxAttempts` into the insert parameters, but I don't think this is the right way to go -- instead, the sample should just bulk insert through the client. There might be some nominal performance benefit to inserting through the adapter instead, but even if there is, I don't think it'd be that honest to use it in a benchmark because our users would be inserting through the client anyway. Using the client also simplifies the code quite a bit. 
--- internal/cmd/producersample/main.go | 77 ++++++----------------------- 1 file changed, 16 insertions(+), 61 deletions(-) diff --git a/internal/cmd/producersample/main.go b/internal/cmd/producersample/main.go index b8b6c539..46c87108 100644 --- a/internal/cmd/producersample/main.go +++ b/internal/cmd/producersample/main.go @@ -17,9 +17,6 @@ import ( "github.com/jackc/pgx/v5/pgxpool" "github.com/riverqueue/river" - "github.com/riverqueue/river/internal/baseservice" - "github.com/riverqueue/river/internal/dbadapter" - "github.com/riverqueue/river/internal/dbsqlc" "github.com/riverqueue/river/riverdriver/riverpgxv5" ) @@ -58,30 +55,6 @@ func (p *producerSample) produce(ctx context.Context) error { myJobWorker := &MyJobWorker{logger: p.logger} river.AddWorker(workers, myJobWorker) - config := &river.Config{ - FetchCooldown: 2 * time.Millisecond, - Logger: p.logger, - Queues: map[string]river.QueueConfig{river.DefaultQueue: {MaxWorkers: numWorkers}}, - Workers: workers, - } - - // TODO: shouldn't need to pass worker name to adapter as well as producer. - // Also the name "WorkerName" is sort of using conflicting terminology (the - // "producer" has a "worker name"?). - // - // Probably shouldn't have to set up the adapter manually outside of the - // Client. 
- adapter := dbadapter.NewStandardAdapter( - &baseservice.Archetype{ - Logger: p.logger, - TimeNowUTC: func() time.Time { return time.Now().UTC() }, - }, - &dbadapter.StandardAdapterConfig{ - Executor: dbPool, - WorkerName: "Worker0", - }, - ) - ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -91,16 +64,22 @@ func (p *producerSample) produce(ctx context.Context) error { return err } - if err := p.insertBulkJobs(ctx, adapter, jobCount); err != nil { + client, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{ + FetchCooldown: 2 * time.Millisecond, + Logger: p.logger, + Queues: map[string]river.QueueConfig{river.DefaultQueue: {MaxWorkers: numWorkers}}, + Workers: workers, + }) + if err != nil { + return fmt.Errorf("error creating river client: %w", err) + } + + if err := p.insertBulkJobs(ctx, client, jobCount); err != nil { return err } p.logger.Info(p.name + ": Finished inserting jobs; working") - client, err := river.NewClient(riverpgxv5.New(dbPool), config) - if err != nil { - return fmt.Errorf("error creating river client: %w", err) - } if err := client.Start(ctx); err != nil { return fmt.Errorf("error running river client: %w", err) } @@ -130,19 +109,6 @@ func (p *producerSample) produce(ctx context.Context) error { } }() - // go func() { - // for { - // select { - // case <-prodCtx.Done(): - // return - // case <-time.After(1 * time.Second): - // if err := insertBulkJobs(ctx, adapter, 1000); err != nil { - // return err - // } - // } - // } - // }() - done := make(chan struct{}) go func() { @@ -197,14 +163,14 @@ func (p *producerSample) initiateHardShutdown(ctx context.Context, client *river return nil } -func (p *producerSample) insertBulkJobs(ctx context.Context, adapter dbadapter.Adapter, jobCount int) error { +func (p *producerSample) insertBulkJobs(ctx context.Context, client *river.Client[pgx.Tx], jobCount int) error { p.logger.Info(p.name+": Inserting jobs", "num_jobs", jobCount) - insertParams := make([]*dbadapter.JobInsertParams, 
jobCount) + insertParams := make([]river.InsertManyParams, jobCount) for i := 0; i < jobCount; i++ { - insertParams[i] = makeFakeJobParams(i) + insertParams[i] = river.InsertManyParams{Args: &MyJobArgs{JobNum: i}} } - inserted, err := adapter.JobInsertMany(ctx, insertParams) + inserted, err := client.InsertMany(ctx, insertParams) if err != nil { return fmt.Errorf("error inserting jobs: %w", err) } @@ -230,19 +196,8 @@ func getDatabaseURL() string { return "postgres:///river_testdb_example?sslmode=disable" } -func makeFakeJobParams(i int) *dbadapter.JobInsertParams { - return &dbadapter.JobInsertParams{ - EncodedArgs: []byte(fmt.Sprintf(`{"job_num": %d}`, i)), - Kind: "MyJob", - Metadata: []byte("{}"), - Priority: river.DefaultPriority, - Queue: river.DefaultQueue, - State: dbsqlc.JobStateAvailable, - } -} - type MyJobArgs struct { - JobNum int64 `json:"job_num"` + JobNum int `json:"job_num"` } func (MyJobArgs) Kind() string { return "MyJob" }