diff --git a/internal/cmd/producersample/main.go b/internal/cmd/producersample/main.go index b8b6c539..46c87108 100644 --- a/internal/cmd/producersample/main.go +++ b/internal/cmd/producersample/main.go @@ -17,9 +17,6 @@ import ( "github.com/jackc/pgx/v5/pgxpool" "github.com/riverqueue/river" - "github.com/riverqueue/river/internal/baseservice" - "github.com/riverqueue/river/internal/dbadapter" - "github.com/riverqueue/river/internal/dbsqlc" "github.com/riverqueue/river/riverdriver/riverpgxv5" ) @@ -58,30 +55,6 @@ func (p *producerSample) produce(ctx context.Context) error { myJobWorker := &MyJobWorker{logger: p.logger} river.AddWorker(workers, myJobWorker) - config := &river.Config{ - FetchCooldown: 2 * time.Millisecond, - Logger: p.logger, - Queues: map[string]river.QueueConfig{river.DefaultQueue: {MaxWorkers: numWorkers}}, - Workers: workers, - } - - // TODO: shouldn't need to pass worker name to adapter as well as producer. - // Also the name "WorkerName" is sort of using conflicting terminology (the - // "producer" has a "worker name"?). - // - // Probably shouldn't have to set up the adapter manually outside of the - // Client. - adapter := dbadapter.NewStandardAdapter( - &baseservice.Archetype{ - Logger: p.logger, - TimeNowUTC: func() time.Time { return time.Now().UTC() }, - }, - &dbadapter.StandardAdapterConfig{ - Executor: dbPool, - WorkerName: "Worker0", - }, - ) - ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -91,16 +64,22 @@ func (p *producerSample) produce(ctx context.Context) error { return err } - if err := p.insertBulkJobs(ctx, adapter, jobCount); err != nil { + client, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{ + FetchCooldown: 2 * time.Millisecond, + Logger: p.logger, + Queues: map[string]river.QueueConfig{river.DefaultQueue: {MaxWorkers: numWorkers}}, + Workers: workers, + }) + if err != nil { + return fmt.Errorf("error creating river client: %w", err) + } + + if err := p.insertBulkJobs(ctx, client, jobCount); err != nil { return err } p.logger.Info(p.name + ": Finished inserting jobs; working") - client, err := river.NewClient(riverpgxv5.New(dbPool), config) - if err != nil { - return fmt.Errorf("error creating river client: %w", err) - } if err := client.Start(ctx); err != nil { return fmt.Errorf("error running river client: %w", err) } @@ -130,19 +109,6 @@ func (p *producerSample) produce(ctx context.Context) error { } }() - // go func() { - // for { - // select { - // case <-prodCtx.Done(): - // return - // case <-time.After(1 * time.Second): - // if err := insertBulkJobs(ctx, adapter, 1000); err != nil { - // return err - // } - // } - // } - // }() - done := make(chan struct{}) go func() { @@ -197,14 +163,14 @@ func (p *producerSample) initiateHardShutdown(ctx context.Context, client *river return nil } -func (p *producerSample) insertBulkJobs(ctx context.Context, adapter dbadapter.Adapter, jobCount int) error { +func (p *producerSample) insertBulkJobs(ctx context.Context, client *river.Client[pgx.Tx], jobCount int) error { p.logger.Info(p.name+": Inserting jobs", "num_jobs", jobCount) - insertParams := make([]*dbadapter.JobInsertParams, jobCount) + insertParams := make([]river.InsertManyParams, jobCount) for i := 0; i < jobCount; i++ { - insertParams[i] = makeFakeJobParams(i) + insertParams[i] = river.InsertManyParams{Args: &MyJobArgs{JobNum: i}} } - inserted, err := adapter.JobInsertMany(ctx, insertParams) + inserted, err := client.InsertMany(ctx, insertParams) if err != nil { return fmt.Errorf("error inserting jobs: %w", err) } @@ -230,19 +196,8 @@ func getDatabaseURL() string { return "postgres:///river_testdb_example?sslmode=disable" } -func makeFakeJobParams(i int) *dbadapter.JobInsertParams { - return &dbadapter.JobInsertParams{ - EncodedArgs: []byte(fmt.Sprintf(`{"job_num": %d}`, i)), - Kind: "MyJob", - Metadata: []byte("{}"), - Priority: river.DefaultPriority, - Queue: river.DefaultQueue, - State: dbsqlc.JobStateAvailable, - } -} - type MyJobArgs struct { - JobNum int64 `json:"job_num"` + JobNum int `json:"job_num"` } func (MyJobArgs) Kind() string { return "MyJob" }