Fix producer sample so that it picks up usable defaults (#38)
The producer sample currently fails with this error:

> `time=2023-11-18T13:37:05.945-08:00 level=ERROR msg=failed error="error inserting jobs: error inserting many jobs: ERROR: new row for relation \"river_job\" violates check constraint \"max_attempts_is_positive\" (SQLSTATE 23514)"`

Digging into it, it turns out that this is because (1) the batch insert's
raw query uses `COPY FROM`, so unlike the single insert it's not allowed
to contain a `coalesce` to fill in defaults, (2) defaults for bulk inserts
are added by River's Go code, but at the client level rather than the
adapter level, and (3) the producer sample manually creates an adapter to
insert jobs with.

So because the sample worked around normal client inserts by using an
adapter instead, and wasn't setting its own `MaxAttempts`, no default was
provided and the insert failed the positive check constraint (sketched
below).
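
A minimal sketch of the gap, with hypothetical names (`applyInsertDefaults`,
`defaultMaxAttempts`, and `jobInsertParams` are stand-ins for illustration,
not River's actual internals):

```go
package main

import "fmt"

// defaultMaxAttempts stands in for River's client-level default; the exact
// name and wiring here are assumptions for illustration.
const defaultMaxAttempts = 25

// jobInsertParams is a pared-down stand-in for the adapter's insert params.
type jobInsertParams struct {
	MaxAttempts int
	// ...other fields elided...
}

// applyInsertDefaults mimics the Go-side defaulting the client performs
// before handing bulk insert params to the adapter. The bulk path's COPY
// FROM writes values verbatim (no coalesce is possible there), so skipping
// this step -- as the sample did by calling the adapter directly -- inserts
// MaxAttempts = 0 and trips the max_attempts_is_positive check constraint.
func applyInsertDefaults(params *jobInsertParams) {
	if params.MaxAttempts == 0 {
		params.MaxAttempts = defaultMaxAttempts
	}
}

func main() {
	params := &jobInsertParams{} // MaxAttempts deliberately left zero
	applyInsertDefaults(params)
	fmt.Println(params.MaxAttempts) // prints 25: satisfies max_attempts > 0
}
```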

I could fix this by putting a `MaxAttempts` into the insert parameters,
but I don't think this is the right way to go -- instead, the sample
should just bulk insert through the client. There might be some nominal
performance benefit to inserting through the adapter instead, but even
if there is, I don't think it'd be that honest to use it in a benchmark
because our users would be inserting through the client anyway. Using
the client also simplifies the code quite a bit.
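
For reference, the client-side bulk insert boils down to the following
(condensed from the diff below, so `dbPool`, `workers`, `numWorkers`,
`jobCount`, and `MyJobArgs` all come from the sample itself):

```go
client, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
	Queues:  map[string]river.QueueConfig{river.DefaultQueue: {MaxWorkers: numWorkers}},
	Workers: workers,
})
if err != nil {
	return fmt.Errorf("error creating river client: %w", err)
}

insertParams := make([]river.InsertManyParams, jobCount)
for i := 0; i < jobCount; i++ {
	insertParams[i] = river.InsertManyParams{Args: &MyJobArgs{JobNum: i}}
}

// The client applies defaults such as MaxAttempts before any rows are
// written, so the check constraint is satisfied without the sample having
// to set anything.
if _, err := client.InsertMany(ctx, insertParams); err != nil {
	return fmt.Errorf("error inserting jobs: %w", err)
}
```
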
brandur authored Nov 18, 2023
1 parent 37b4a72 commit fd4b93e
Showing 1 changed file with 16 additions and 61 deletions: internal/cmd/producersample/main.go
@@ -17,9 +17,6 @@ import (
 	"github.com/jackc/pgx/v5/pgxpool"
 
 	"github.com/riverqueue/river"
-	"github.com/riverqueue/river/internal/baseservice"
-	"github.com/riverqueue/river/internal/dbadapter"
-	"github.com/riverqueue/river/internal/dbsqlc"
 	"github.com/riverqueue/river/riverdriver/riverpgxv5"
 )
@@ -58,30 +55,6 @@ func (p *producerSample) produce(ctx context.Context) error {
 	myJobWorker := &MyJobWorker{logger: p.logger}
 	river.AddWorker(workers, myJobWorker)
 
-	config := &river.Config{
-		FetchCooldown: 2 * time.Millisecond,
-		Logger:        p.logger,
-		Queues:        map[string]river.QueueConfig{river.DefaultQueue: {MaxWorkers: numWorkers}},
-		Workers:       workers,
-	}
-
-	// TODO: shouldn't need to pass worker name to adapter as well as producer.
-	// Also the name "WorkerName" is sort of using conflicting terminology (the
-	// "producer" has a "worker name"?).
-	//
-	// Probably shouldn't have to set up the adapter manually outside of the
-	// Client.
-	adapter := dbadapter.NewStandardAdapter(
-		&baseservice.Archetype{
-			Logger:     p.logger,
-			TimeNowUTC: func() time.Time { return time.Now().UTC() },
-		},
-		&dbadapter.StandardAdapterConfig{
-			Executor:   dbPool,
-			WorkerName: "Worker0",
-		},
-	)
-
 	ctx, cancel := context.WithCancel(ctx)
 	defer cancel()
 
@@ -91,16 +64,22 @@ func (p *producerSample) produce(ctx context.Context) error {
 		return err
 	}
 
-	if err := p.insertBulkJobs(ctx, adapter, jobCount); err != nil {
+	client, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
+		FetchCooldown: 2 * time.Millisecond,
+		Logger:        p.logger,
+		Queues:        map[string]river.QueueConfig{river.DefaultQueue: {MaxWorkers: numWorkers}},
+		Workers:       workers,
+	})
+	if err != nil {
+		return fmt.Errorf("error creating river client: %w", err)
+	}
+
+	if err := p.insertBulkJobs(ctx, client, jobCount); err != nil {
 		return err
 	}
 
 	p.logger.Info(p.name + ": Finished inserting jobs; working")
 
-	client, err := river.NewClient(riverpgxv5.New(dbPool), config)
-	if err != nil {
-		return fmt.Errorf("error creating river client: %w", err)
-	}
 	if err := client.Start(ctx); err != nil {
 		return fmt.Errorf("error running river client: %w", err)
 	}
@@ -130,19 +109,6 @@ func (p *producerSample) produce(ctx context.Context) error {
 		}
 	}()
 
-	// go func() {
-	// 	for {
-	// 		select {
-	// 		case <-prodCtx.Done():
-	// 			return
-	// 		case <-time.After(1 * time.Second):
-	// 			if err := insertBulkJobs(ctx, adapter, 1000); err != nil {
-	// 				return err
-	// 			}
-	// 		}
-	// 	}
-	// }()
-
 	done := make(chan struct{})
 
 	go func() {
@@ -197,14 +163,14 @@ func (p *producerSample) initiateHardShutdown(ctx context.Context, client *river
 	return nil
 }
 
-func (p *producerSample) insertBulkJobs(ctx context.Context, adapter dbadapter.Adapter, jobCount int) error {
+func (p *producerSample) insertBulkJobs(ctx context.Context, client *river.Client[pgx.Tx], jobCount int) error {
 	p.logger.Info(p.name+": Inserting jobs", "num_jobs", jobCount)
 
-	insertParams := make([]*dbadapter.JobInsertParams, jobCount)
+	insertParams := make([]river.InsertManyParams, jobCount)
 	for i := 0; i < jobCount; i++ {
-		insertParams[i] = makeFakeJobParams(i)
+		insertParams[i] = river.InsertManyParams{Args: &MyJobArgs{JobNum: i}}
 	}
-	inserted, err := adapter.JobInsertMany(ctx, insertParams)
+	inserted, err := client.InsertMany(ctx, insertParams)
 	if err != nil {
 		return fmt.Errorf("error inserting jobs: %w", err)
 	}
@@ -230,19 +196,8 @@ func getDatabaseURL() string {
 	return "postgres:///river_testdb_example?sslmode=disable"
 }
 
-func makeFakeJobParams(i int) *dbadapter.JobInsertParams {
-	return &dbadapter.JobInsertParams{
-		EncodedArgs: []byte(fmt.Sprintf(`{"job_num": %d}`, i)),
-		Kind:        "MyJob",
-		Metadata:    []byte("{}"),
-		Priority:    river.DefaultPriority,
-		Queue:       river.DefaultQueue,
-		State:       dbsqlc.JobStateAvailable,
-	}
-}
-
 type MyJobArgs struct {
-	JobNum int64 `json:"job_num"`
+	JobNum int `json:"job_num"`
 }
 
 func (MyJobArgs) Kind() string { return "MyJob" }
