From 66d260eef0cbe461fe89b3d588c8731defd69c4a Mon Sep 17 00:00:00 2001 From: Brandur Date: Wed, 8 Nov 2023 16:19:38 -0800 Subject: [PATCH] Push rescuer down into `internal/maintenance` + `JobRow` to `rivertype` Backstory: We've made some attempt to modularize River so that it's entire implementation doesn't sit in one top-level package. We had some success with this approach (see `internal/`), but after making some progress we eventually hit a brick wall with most of the core types, and even regressed as one of the queue maintainers (rescuer) had to live in the top-level package so it could access worker-related information. Modularity aside: I realized the other day that we also have another problem on the horizon. We have a new driver system, but current drivers like `riverpgxv5` cheat by implementing a trivial wrapper around pgx. Long term, these drives are going to have to look more like how `DBAdapter` looks today. They'll have functions like `JobInsert` and `JobComplete` specific to the particular driver implementation, and those functions will need to take and return types representing things like insert parameters and job return values. It might be possible to use types from `river` for this, but we'd have to invert the dependency requirement from `river` -> `riverpgxv5` and change a lot of tests. Being able to depend on a shared submodule (which `rivertype` could become) would be a lot easier. I wanted to try and fix that and pursue further modularization, but it wasn't easy. I tried every trick in the book to cast types from something internal to public-facing etc., but as implemented, it's either not possible, or only possible with considerable downsides (see bottom). However, there is a change we can make that unlocks everything: `JobRow` is the core type that needs to be shared amongst everything. If it could live in a shared package, internal packages get the ability to reference it and whole puzzle unjumbles into crystal clarity. Previously, I'd convinced you that we could put core types into a separate `rivertype` package, which we did, and then later undid after finding a few techniques to share some internally referenced types. Here, we bring that idea back to some degree, but only for one type: `JobRow` (and okay, `AttemptError` too). This doesn't produce anywhere near the same degree of required API adjustments because `JobRow` is generally something that's returned from functions, and therefore `rivertype` never needs to be referenced. It's also embedded on `Job`, so its properties are available there again without importing `rivertype`. `Job` stays in the top-level package so worker `Work` signatures are not affected. Signatures of the error handler and a few other types are affected, but it's certainly the less common case. With that move in place we go on to demonstrate that further encapsulation becomes possible. We move the new work unit primitives into a shared internal package, and then make use of them in the rescuer so that it can decouple itself from the concept of a top-level worker. With that done, we can push it into `internal/maintenance` where it was supposed to be originally. Another benefit: when introducing `rivertest`, we had to duplicate the `jobRowFromInternal` function because of the same type problems mentioned above. With `JobRow` not in a shared package, we can push this helper into `dbsqlc` and have it shared amongst `river` and `rivertest`, eliminating the necessity for this extra function. And from here all the other pieces can fall too: job executor, producer, etc. could all be encapsulated separately if we'd like them to be. Go allows struct to easily be converted from one to another so you could potentially have `river.JobRow(internalJobRow)`, but the limitation is that you can't do it as soon as substructs (e.g. `AttempError`) are introduced, even if the substructs are also identical to each other. One thing that we could potentially do here is change `AttemptError` back to a plain set of `[]byte`s and add an unmarshaling helper like `UnmarshalAttemptErrors() []AttemptError`. I think it'd work, but also has downsides of its own: * Prevents us from having adding other substructs to the job row in the future. * Caching unmarshaled attempt errors is difficult because if either struct picked up an unexported member, it can no longer be converted to the other. * The conversions and transform wrappers that'd need to happen to pass back and forth with interfaces like `ErrorHandler` would be tricky and add probably cruft. Another trick you can do similar to the above: if you're sure two structs really are identical, you can convert one to the other with `unsafe.Pointer`, even if they include substructs like `AttemptError`. I made a prototype of this and it definitely works, so I really considered this as a good possible way forward. However, use of the `unsafe` package is frowned upon, and it's also banned from some hosted providers like Google App Engine, which seems bad for our use case. --- client.go | 31 ++-- client_test.go | 39 +++-- error_handler.go | 10 +- event.go | 3 +- example_error_handler_test.go | 5 +- insert_opts.go | 4 +- internal/dbsqlc/river_job_ext.go | 41 +++++ rescuer.go => internal/maintenance/rescuer.go | 80 ++++----- .../maintenance/rescuer_test.go | 111 ++++++------ internal/workunit/work_unit.go | 33 ++++ job.go | 164 ++---------------- job_executor.go | 10 +- job_executor_test.go | 40 +++-- job_test.go | 8 +- producer.go | 17 +- retry_policy.go | 5 +- retry_policy_test.go | 5 +- rivertest/rivertest.go | 63 ++----- rivertype/job_row.go | 116 +++++++++++++ work_unit.go | 60 ------- work_unit_wrapper.go | 38 ++++ worker.go | 6 +- 22 files changed, 450 insertions(+), 439 deletions(-) create mode 100644 internal/dbsqlc/river_job_ext.go rename rescuer.go => internal/maintenance/rescuer.go (75%) rename rescuer_test.go => internal/maintenance/rescuer_test.go (75%) create mode 100644 internal/workunit/work_unit.go create mode 100644 rivertype/job_row.go delete mode 100644 work_unit.go create mode 100644 work_unit_wrapper.go diff --git a/client.go b/client.go index e16a202c..5f1c95d9 100644 --- a/client.go +++ b/client.go @@ -27,7 +27,9 @@ import ( "github.com/riverqueue/river/internal/rivercommon" "github.com/riverqueue/river/internal/util/sliceutil" "github.com/riverqueue/river/internal/util/valutil" + "github.com/riverqueue/river/internal/workunit" "github.com/riverqueue/river/riverdriver" + "github.com/riverqueue/river/rivertype" ) const ( @@ -286,7 +288,7 @@ type clientTestSignals struct { jobCleaner *maintenance.JobCleanerTestSignals periodicJobEnqueuer *maintenance.PeriodicJobEnqueuerTestSignals reindexer *maintenance.ReindexerTestSignals - rescuer *rescuerTestSignals + rescuer *maintenance.RescuerTestSignals scheduler *maintenance.SchedulerTestSignals } @@ -366,9 +368,9 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client // For convenience, in case the user's specified a large JobTimeout but no // RescueStuckJobsAfter, since RescueStuckJobsAfter must be greater than // JobTimeout, set a reasonable default value that's longer thah JobTimeout. - rescueAfter := defaultRescueAfter + rescueAfter := maintenance.DefaultRescueAfter if config.JobTimeout > 0 && config.RescueStuckJobsAfter < 1 && config.JobTimeout > config.RescueStuckJobsAfter { - rescueAfter = config.JobTimeout + defaultRescueAfter + rescueAfter = config.JobTimeout + maintenance.DefaultRescueAfter } // Create a new version of config with defaults filled in. This replaces the @@ -512,10 +514,15 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client } { - rescuer := newRescuer(archetype, &rescuerConfig{ + rescuer := maintenance.NewRescuer(archetype, &maintenance.RescuerConfig{ ClientRetryPolicy: retryPolicy, RescueAfter: config.RescueStuckJobsAfter, - Workers: config.Workers, + WorkUnitFactoryFunc: func(kind string) workunit.WorkUnitFactory { + if workerInfo, ok := config.Workers.workersMap[kind]; ok { + return workerInfo.workUnitFactory + } + return nil + }, }, driver.GetDBPool()) maintenanceServices = append(maintenanceServices, rescuer) client.testSignals.rescuer = &rescuer.TestSignals @@ -758,7 +765,7 @@ func (c *Client[TTx]) Subscribe(kinds ...EventKind) (<-chan *Event, func()) { } // Distribute a single job into any listening subscriber channels. -func (c *Client[TTx]) distributeJob(job *JobRow, stats *JobStatistics) { +func (c *Client[TTx]) distributeJob(job *rivertype.JobRow, stats *JobStatistics) { c.subscriptionsMu.Lock() defer c.subscriptionsMu.Unlock() @@ -809,7 +816,7 @@ func (c *Client[TTx]) distributeJobCompleterCallback(update jobcompleter.Complet c.statsNumJobs++ }() - c.distributeJob(jobRowFromInternal(update.Job), jobStatisticsFromInternal(update.JobStats)) + c.distributeJob(dbsqlc.JobRowFromInternal(update.Job), jobStatisticsFromInternal(update.JobStats)) } // Dump aggregate stats from job completions to logs periodically. These @@ -963,7 +970,7 @@ func insertParamsFromArgsAndOptions(args JobArgs, insertOpts *InsertOpts) (*dbad insertParams.UniqueByArgs = uniqueOpts.ByArgs insertParams.UniqueByQueue = uniqueOpts.ByQueue insertParams.UniqueByPeriod = uniqueOpts.ByPeriod - insertParams.UniqueByState = sliceutil.Map(uniqueOpts.ByState, func(s JobState) dbsqlc.JobState { return dbsqlc.JobState(s) }) + insertParams.UniqueByState = sliceutil.Map(uniqueOpts.ByState, func(s rivertype.JobState) dbsqlc.JobState { return dbsqlc.JobState(s) }) } if !insertOpts.ScheduledAt.IsZero() { @@ -986,7 +993,7 @@ var errInsertNoDriverDBPool = fmt.Errorf("driver must have non-nil database pool // if err != nil { // // handle error // } -func (c *Client[TTx]) Insert(ctx context.Context, args JobArgs, opts *InsertOpts) (*JobRow, error) { +func (c *Client[TTx]) Insert(ctx context.Context, args JobArgs, opts *InsertOpts) (*rivertype.JobRow, error) { if c.driver.GetDBPool() == nil { return nil, errInsertNoDriverDBPool } @@ -1005,7 +1012,7 @@ func (c *Client[TTx]) Insert(ctx context.Context, args JobArgs, opts *InsertOpts return nil, err } - return jobRowFromInternal(res.Job), nil + return dbsqlc.JobRowFromInternal(res.Job), nil } // InsertTx inserts a new job with the provided args on the given transaction. @@ -1022,7 +1029,7 @@ func (c *Client[TTx]) Insert(ctx context.Context, args JobArgs, opts *InsertOpts // This variant lets a caller insert jobs atomically alongside other database // changes. An inserted job isn't visible to be worked until the transaction // commits, and if the transaction rolls back, so too is the inserted job. -func (c *Client[TTx]) InsertTx(ctx context.Context, tx TTx, args JobArgs, opts *InsertOpts) (*JobRow, error) { +func (c *Client[TTx]) InsertTx(ctx context.Context, tx TTx, args JobArgs, opts *InsertOpts) (*rivertype.JobRow, error) { if err := c.validateJobArgs(args); err != nil { return nil, err } @@ -1037,7 +1044,7 @@ func (c *Client[TTx]) InsertTx(ctx context.Context, tx TTx, args JobArgs, opts * return nil, err } - return jobRowFromInternal(res.Job), nil + return dbsqlc.JobRowFromInternal(res.Job), nil } // InsertManyParams encapsulates a single job combined with insert options for diff --git a/client_test.go b/client_test.go index 3f20e3cf..d169e681 100644 --- a/client_test.go +++ b/client_test.go @@ -30,6 +30,7 @@ import ( "github.com/riverqueue/river/internal/util/sliceutil" "github.com/riverqueue/river/internal/util/valutil" "github.com/riverqueue/river/riverdriver/riverpgxv5" + "github.com/riverqueue/river/rivertype" ) func waitForClientHealthy(ctx context.Context, t *testing.T, statusUpdateCh <-chan componentstatus.ClientSnapshot) { @@ -1062,7 +1063,7 @@ func Test_Client_ErrorHandler(t *testing.T) { return client, &testBundle{SubscribeChan: subscribeChan} } - requireInsert := func(ctx context.Context, client *Client[pgx.Tx]) *JobRow { + requireInsert := func(ctx context.Context, client *Client[pgx.Tx]) *rivertype.JobRow { job, err := client.Insert(ctx, callbackArgs{}, nil) require.NoError(t, err) return job @@ -1078,7 +1079,7 @@ func Test_Client_ErrorHandler(t *testing.T) { var errorHandlerCalled bool config.ErrorHandler = &testErrorHandler{ - HandleErrorFunc: func(ctx context.Context, job *JobRow, err error) *ErrorHandlerResult { + HandleErrorFunc: func(ctx context.Context, job *rivertype.JobRow, err error) *ErrorHandlerResult { require.Equal(t, handlerErr, err) errorHandlerCalled = true return &ErrorHandlerResult{} @@ -1100,7 +1101,7 @@ func Test_Client_ErrorHandler(t *testing.T) { var errorHandlerCalled bool config.ErrorHandler = &testErrorHandler{ - HandleErrorFunc: func(ctx context.Context, job *JobRow, err error) *ErrorHandlerResult { + HandleErrorFunc: func(ctx context.Context, job *rivertype.JobRow, err error) *ErrorHandlerResult { var unknownJobKindErr *UnknownJobKindError require.ErrorAs(t, err, &unknownJobKindErr) require.Equal(t, *unknownJobKindErr, UnknownJobKindError{Kind: "RandomWorkerNameThatIsNeverRegistered"}) @@ -1132,7 +1133,7 @@ func Test_Client_ErrorHandler(t *testing.T) { var panicHandlerCalled bool config.ErrorHandler = &testErrorHandler{ - HandlePanicFunc: func(ctx context.Context, job *JobRow, panicVal any) *ErrorHandlerResult { + HandlePanicFunc: func(ctx context.Context, job *rivertype.JobRow, panicVal any) *ErrorHandlerResult { require.Equal(t, "panic val", panicVal) panicHandlerCalled = true return &ErrorHandlerResult{} @@ -1179,7 +1180,7 @@ func Test_Client_Maintenance(t *testing.T) { errorsBytes := make([][]byte, errorCount) for i := 0; i < errorCount; i++ { var err error - errorsBytes[i], err = json.Marshal(AttemptError{ + errorsBytes[i], err = json.Marshal(rivertype.AttemptError{ At: time.Now(), Error: "mocked error", Num: i + 1, @@ -1359,7 +1360,7 @@ func Test_Client_Maintenance(t *testing.T) { startClient(ctx, t, client) client.testSignals.electedLeader.WaitOrTimeout() - svc := maintenance.GetService[*rescuer](client.queueMaintainer) + svc := maintenance.GetService[*maintenance.Rescuer](client.queueMaintainer) svc.TestSignals.FetchedBatch.WaitOrTimeout() svc.TestSignals.UpdatedBatch.WaitOrTimeout() @@ -1449,7 +1450,7 @@ func Test_Client_RetryPolicy(t *testing.T) { ctx := context.Background() - requireInsert := func(ctx context.Context, client *Client[pgx.Tx]) *JobRow { + requireInsert := func(ctx context.Context, client *Client[pgx.Tx]) *rivertype.JobRow { job, err := client.Insert(ctx, callbackArgs{}, nil) require.NoError(t, err) return job @@ -1527,7 +1528,7 @@ func Test_Client_RetryPolicy(t *testing.T) { // how it would've looked after being run through the queue. originalJob.Attempt += 1 - expectedNextScheduledAt := client.config.RetryPolicy.NextRetry(jobRowFromInternal(originalJob)) + expectedNextScheduledAt := client.config.RetryPolicy.NextRetry(dbsqlc.JobRowFromInternal(originalJob)) t.Logf("Attempt number %d scheduled %v from original `attempted_at`", originalJob.Attempt, finishedJob.ScheduledAt.Sub(*originalJob.AttemptedAt)) @@ -1571,7 +1572,7 @@ func Test_Client_Subscribe(t *testing.T) { }) } - requireInsert := func(ctx context.Context, client *Client[pgx.Tx], jobName string) *JobRow { + requireInsert := func(ctx context.Context, client *Client[pgx.Tx], jobName string) *rivertype.JobRow { job, err := client.Insert(ctx, callbackArgs{Name: jobName}, nil) require.NoError(t, err) return job @@ -1599,7 +1600,7 @@ func Test_Client_Subscribe(t *testing.T) { jobFailed1 := requireInsert(ctx, client, "failed1") jobFailed2 := requireInsert(ctx, client, "failed2") - expectedJobs := []*JobRow{ + expectedJobs := []*rivertype.JobRow{ jobCompleted1, jobCompleted2, jobFailed1, @@ -1663,7 +1664,7 @@ func Test_Client_Subscribe(t *testing.T) { jobCompleted := requireInsert(ctx, client, "completed1") requireInsert(ctx, client, "failed1") - expectedJobs := []*JobRow{ + expectedJobs := []*rivertype.JobRow{ jobCompleted, } @@ -1704,7 +1705,7 @@ func Test_Client_Subscribe(t *testing.T) { requireInsert(ctx, client, "completed1") jobFailed := requireInsert(ctx, client, "failed1") - expectedJobs := []*JobRow{ + expectedJobs := []*rivertype.JobRow{ jobFailed, } @@ -2395,7 +2396,7 @@ func Test_NewClient_Validations(t *testing.T) { config.JobTimeout = 23 * time.Hour }, validateResult: func(t *testing.T, client *Client[pgx.Tx]) { //nolint:thelper - require.Equal(t, 23*time.Hour+defaultRescueAfter, client.config.RescueStuckJobsAfter) + require.Equal(t, 23*time.Hour+maintenance.DefaultRescueAfter, client.config.RescueStuckJobsAfter) }, }, { @@ -2740,7 +2741,7 @@ func TestInsert(t *testing.T) { name string args noOpArgs opts *InsertOpts - assert func(t *testing.T, args *noOpArgs, opts *InsertOpts, insertedJob *JobRow) + assert func(t *testing.T, args *noOpArgs, opts *InsertOpts, insertedJob *rivertype.JobRow) }{ { name: "all options specified", @@ -2752,7 +2753,7 @@ func TestInsert(t *testing.T) { ScheduledAt: now.Add(time.Hour).In(time.FixedZone("UTC-5", -5*60*60)), Tags: []string{"tag1", "tag2"}, }, - assert: func(t *testing.T, args *noOpArgs, opts *InsertOpts, insertedJob *JobRow) { + assert: func(t *testing.T, args *noOpArgs, opts *InsertOpts, insertedJob *rivertype.JobRow) { t.Helper() require := require.New(t) @@ -2768,14 +2769,14 @@ func TestInsert(t *testing.T) { require.Equal(JobStateScheduled, insertedJob.State) require.Equal("noOp", insertedJob.Kind) // default state: - require.Equal([]byte("{}"), insertedJob.metadata) + // require.Equal([]byte("{}"), insertedJob.metadata) }, }, { name: "all defaults", args: noOpArgs{Name: "testJob"}, opts: nil, - assert: func(t *testing.T, args *noOpArgs, opts *InsertOpts, insertedJob *JobRow) { + assert: func(t *testing.T, args *noOpArgs, opts *InsertOpts, insertedJob *rivertype.JobRow) { t.Helper() require := require.New(t) @@ -2790,7 +2791,7 @@ func TestInsert(t *testing.T) { // Default comes from database now(), and we can't know the exact value: require.WithinDuration(time.Now(), insertedJob.ScheduledAt, 2*time.Second) require.Equal([]string{}, insertedJob.Tags) - require.Equal([]byte("{}"), insertedJob.metadata) + // require.Equal([]byte("{}"), insertedJob.metadata) }, }, } @@ -2872,7 +2873,7 @@ func TestUniqueOpts(t *testing.T) { uniqueOpts := UniqueOpts{ ByPeriod: 24 * time.Hour, - ByState: []JobState{JobStateAvailable, JobStateCompleted}, + ByState: []rivertype.JobState{JobStateAvailable, JobStateCompleted}, } job0, err := client.Insert(ctx, noOpArgs{}, &InsertOpts{ diff --git a/error_handler.go b/error_handler.go index da76bcc5..9b6b7050 100644 --- a/error_handler.go +++ b/error_handler.go @@ -1,6 +1,10 @@ package river -import "context" +import ( + "context" + + "github.com/riverqueue/river/rivertype" +) // ErrorHandler provides an interface that will be invoked in case of an error // or panic occurring in the job. This is often useful for logging and exception @@ -10,13 +14,13 @@ type ErrorHandler interface { // // Context is descended from the one used to start the River client that // worked the job. - HandleError(ctx context.Context, job *JobRow, err error) *ErrorHandlerResult + HandleError(ctx context.Context, job *rivertype.JobRow, err error) *ErrorHandlerResult // HandlePanic is invoked in case of a panic occurring in a job. // // Context is descended from the one used to start the River client that // worked the job. - HandlePanic(ctx context.Context, job *JobRow, panicVal any) *ErrorHandlerResult + HandlePanic(ctx context.Context, job *rivertype.JobRow, panicVal any) *ErrorHandlerResult } type ErrorHandlerResult struct { diff --git a/event.go b/event.go index f3123554..ef84b86b 100644 --- a/event.go +++ b/event.go @@ -4,6 +4,7 @@ import ( "time" "github.com/riverqueue/river/internal/jobstats" + "github.com/riverqueue/river/rivertype" ) // EventKind is a kind of event to subscribe to from a client. @@ -45,7 +46,7 @@ type Event struct { Kind EventKind // Job contains job-related information. - Job *JobRow + Job *rivertype.JobRow // JobStats are statistics about the run of a job. JobStats *JobStatistics diff --git a/example_error_handler_test.go b/example_error_handler_test.go index 59757846..19bf5c51 100644 --- a/example_error_handler_test.go +++ b/example_error_handler_test.go @@ -11,16 +11,17 @@ import ( "github.com/riverqueue/river/internal/riverinternaltest" "github.com/riverqueue/river/internal/util/slogutil" "github.com/riverqueue/river/riverdriver/riverpgxv5" + "github.com/riverqueue/river/rivertype" ) type CustomErrorHandler struct{} -func (*CustomErrorHandler) HandleError(ctx context.Context, job *river.JobRow, err error) *river.ErrorHandlerResult { +func (*CustomErrorHandler) HandleError(ctx context.Context, job *rivertype.JobRow, err error) *river.ErrorHandlerResult { fmt.Printf("Job errored with: %s\n", err) return nil } -func (*CustomErrorHandler) HandlePanic(ctx context.Context, job *river.JobRow, panicVal any) *river.ErrorHandlerResult { +func (*CustomErrorHandler) HandlePanic(ctx context.Context, job *rivertype.JobRow, panicVal any) *river.ErrorHandlerResult { fmt.Printf("Job panicked with: %v\n", panicVal) // Either function can also set the job to be immediately cancelled. diff --git a/insert_opts.go b/insert_opts.go index efea443e..e3667239 100644 --- a/insert_opts.go +++ b/insert_opts.go @@ -4,6 +4,8 @@ import ( "fmt" "slices" "time" + + "github.com/riverqueue/river/rivertype" ) // InsertOpts are optional settings for a new job which can be provided at job @@ -103,7 +105,7 @@ type UniqueOpts struct { // With this setting, any jobs of the same kind that have been completed or // discarded, but not yet cleaned out by the system, won't count towards the // uniqueness of a new insert. - ByState []JobState + ByState []rivertype.JobState } // isEmpty returns true for an empty, uninitialized options struct. diff --git a/internal/dbsqlc/river_job_ext.go b/internal/dbsqlc/river_job_ext.go new file mode 100644 index 00000000..d3811ab6 --- /dev/null +++ b/internal/dbsqlc/river_job_ext.go @@ -0,0 +1,41 @@ +package dbsqlc + +import ( + "github.com/riverqueue/river/internal/util/sliceutil" + "github.com/riverqueue/river/rivertype" +) + +func JobRowFromInternal(internal *RiverJob) *rivertype.JobRow { + tags := internal.Tags + if tags == nil { + tags = []string{} + } + return &rivertype.JobRow{ + ID: internal.ID, + Attempt: max(int(internal.Attempt), 0), + AttemptedAt: internal.AttemptedAt, + AttemptedBy: internal.AttemptedBy, + CreatedAt: internal.CreatedAt, + EncodedArgs: internal.Args, + Errors: sliceutil.Map(internal.Errors, func(e AttemptError) rivertype.AttemptError { return AttemptErrorFromInternal(&e) }), + FinalizedAt: internal.FinalizedAt, + Kind: internal.Kind, + MaxAttempts: max(int(internal.MaxAttempts), 0), + Priority: max(int(internal.Priority), 0), + Queue: internal.Queue, + ScheduledAt: internal.ScheduledAt.UTC(), // TODO(brandur): Very weird this is the only place a UTC conversion happens. + State: rivertype.JobState(internal.State), + Tags: tags, + + // metadata: internal.Metadata, + } +} + +func AttemptErrorFromInternal(e *AttemptError) rivertype.AttemptError { + return rivertype.AttemptError{ + At: e.At, + Error: e.Error, + Num: int(e.Num), + Trace: e.Trace, + } +} diff --git a/rescuer.go b/internal/maintenance/rescuer.go similarity index 75% rename from rescuer.go rename to internal/maintenance/rescuer.go index ad5541fd..d627509f 100644 --- a/rescuer.go +++ b/internal/maintenance/rescuer.go @@ -1,4 +1,4 @@ -package river +package maintenance import ( "context" @@ -10,31 +10,36 @@ import ( "github.com/riverqueue/river/internal/baseservice" "github.com/riverqueue/river/internal/dbsqlc" - "github.com/riverqueue/river/internal/maintenance" "github.com/riverqueue/river/internal/maintenance/startstop" "github.com/riverqueue/river/internal/rivercommon" "github.com/riverqueue/river/internal/util/dbutil" "github.com/riverqueue/river/internal/util/timeutil" "github.com/riverqueue/river/internal/util/valutil" + "github.com/riverqueue/river/internal/workunit" + "github.com/riverqueue/river/rivertype" ) const ( - defaultRescueAfter = time.Hour - defaultRescuerInterval = 30 * time.Second + DefaultRescueAfter = time.Hour + DefaultRescuerInterval = 30 * time.Second ) +type ClientRetryPolicy interface { + NextRetry(job *rivertype.JobRow) time.Time +} + // Test-only properties. -type rescuerTestSignals struct { +type RescuerTestSignals struct { FetchedBatch rivercommon.TestSignal[struct{}] // notifies when runOnce has fetched a batch of jobs UpdatedBatch rivercommon.TestSignal[struct{}] // notifies when runOnce has updated rescued jobs from a batch } -func (ts *rescuerTestSignals) Init() { +func (ts *RescuerTestSignals) Init() { ts.FetchedBatch.Init() ts.UpdatedBatch.Init() } -type rescuerConfig struct { +type RescuerConfig struct { // ClientRetryPolicy is the default retry policy to use for workers that don't // overide NextRetry. ClientRetryPolicy ClientRetryPolicy @@ -46,11 +51,10 @@ type rescuerConfig struct { // considered stuck and should be rescued. RescueAfter time.Duration - // Workers is the bundle of workers for - Workers *Workers + WorkUnitFactoryFunc func(kind string) workunit.WorkUnitFactory } -func (c *rescuerConfig) mustValidate() *rescuerConfig { +func (c *RescuerConfig) mustValidate() *RescuerConfig { if c.ClientRetryPolicy == nil { panic("RescuerConfig.ClientRetryPolicy must be set") } @@ -60,44 +64,44 @@ func (c *rescuerConfig) mustValidate() *rescuerConfig { if c.RescueAfter <= 0 { panic("RescuerConfig.JobDuration must be above zero") } - if c.Workers == nil { - panic("RescuerConfig.Workers must be set") + if c.WorkUnitFactoryFunc == nil { + panic("RescuerConfig.WorkUnitFactoryFunc must be set") } return c } -// rescuer periodically rescues jobs that have been executing for too long +// Rescuer periodically rescues jobs that have been executing for too long // and are considered to be "stuck". -type rescuer struct { +type Rescuer struct { baseservice.BaseService startstop.BaseStartStop // exported for test purposes - Config *rescuerConfig - TestSignals rescuerTestSignals + Config *RescuerConfig + TestSignals RescuerTestSignals batchSize int // configurable for test purposes dbExecutor dbutil.Executor queries *dbsqlc.Queries } -func newRescuer(archetype *baseservice.Archetype, config *rescuerConfig, executor dbutil.Executor) *rescuer { - return baseservice.Init(archetype, &rescuer{ - Config: (&rescuerConfig{ - ClientRetryPolicy: config.ClientRetryPolicy, - Interval: valutil.ValOrDefault(config.Interval, defaultRescuerInterval), - RescueAfter: valutil.ValOrDefault(config.RescueAfter, defaultRescueAfter), - Workers: config.Workers, +func NewRescuer(archetype *baseservice.Archetype, config *RescuerConfig, executor dbutil.Executor) *Rescuer { + return baseservice.Init(archetype, &Rescuer{ + Config: (&RescuerConfig{ + ClientRetryPolicy: config.ClientRetryPolicy, + Interval: valutil.ValOrDefault(config.Interval, DefaultRescuerInterval), + RescueAfter: valutil.ValOrDefault(config.RescueAfter, DefaultRescueAfter), + WorkUnitFactoryFunc: config.WorkUnitFactoryFunc, }).mustValidate(), - batchSize: maintenance.DefaultBatchSize, + batchSize: DefaultBatchSize, dbExecutor: executor, queries: dbsqlc.New(), }) } -func (s *rescuer) Start(ctx context.Context) error { +func (s *Rescuer) Start(ctx context.Context) error { ctx, shouldStart, stopped := s.StartInit(ctx) if !shouldStart { return nil @@ -105,7 +109,7 @@ func (s *rescuer) Start(ctx context.Context) error { // Jitter start up slightly so services don't all perform their first run at // exactly the same time. - s.CancellableSleepRandomBetween(ctx, maintenance.JitterMin, maintenance.JitterMax) + s.CancellableSleepRandomBetween(ctx, JitterMin, JitterMax) go func() { s.Logger.InfoContext(ctx, s.Name+": Run loop started") @@ -144,7 +148,7 @@ type rescuerRunOnceResult struct { NumJobsRetried int64 } -func (s *rescuer) runOnce(ctx context.Context) (*rescuerRunOnceResult, error) { +func (s *Rescuer) runOnce(ctx context.Context) (*rescuerRunOnceResult, error) { res := &rescuerRunOnceResult{} for { @@ -155,11 +159,6 @@ func (s *rescuer) runOnce(ctx context.Context) (*rescuerRunOnceResult, error) { s.TestSignals.FetchedBatch.Signal(struct{}{}) - // Return quickly in case there's no work to do. - if len(stuckJobs) < 1 { - return res, nil - } - now := time.Now().UTC() rescueManyParams := dbsqlc.JobRescueManyParams{ @@ -173,7 +172,7 @@ func (s *rescuer) runOnce(ctx context.Context) (*rescuerRunOnceResult, error) { for i, job := range stuckJobs { rescueManyParams.ID[i] = job.ID - rescueManyParams.Error[i], err = json.Marshal(AttemptError{ + rescueManyParams.Error[i], err = json.Marshal(rivertype.AttemptError{ At: now, Error: "Stuck job rescued by Rescuer", Num: max(int(job.Attempt), 0), @@ -214,13 +213,13 @@ func (s *rescuer) runOnce(ctx context.Context) (*rescuerRunOnceResult, error) { slog.Int64("num_jobs_retried", res.NumJobsRetried), ) - s.CancellableSleepRandomBetween(ctx, maintenance.BatchBackoffMin, maintenance.BatchBackoffMax) + s.CancellableSleepRandomBetween(ctx, BatchBackoffMin, BatchBackoffMax) } return res, nil } -func (s *rescuer) getStuckJobs(ctx context.Context) ([]*dbsqlc.RiverJob, error) { +func (s *Rescuer) getStuckJobs(ctx context.Context) ([]*dbsqlc.RiverJob, error) { ctx, cancelFunc := context.WithTimeout(ctx, 30*time.Second) defer cancelFunc() @@ -234,16 +233,17 @@ func (s *rescuer) getStuckJobs(ctx context.Context) ([]*dbsqlc.RiverJob, error) // makeRetryDecision decides whether or not a rescued job should be retried, and if so, // when. -func (s *rescuer) makeRetryDecision(ctx context.Context, internalJob *dbsqlc.RiverJob) (bool, time.Time) { - job := jobRowFromInternal(internalJob) - workerInfo, ok := s.Config.Workers.workersMap[job.Kind] - if !ok { +func (s *Rescuer) makeRetryDecision(ctx context.Context, internalJob *dbsqlc.RiverJob) (bool, time.Time) { + job := dbsqlc.JobRowFromInternal(internalJob) + + workUnitFactory := s.Config.WorkUnitFactoryFunc(job.Kind) + if workUnitFactory == nil { s.Logger.ErrorContext(ctx, s.Name+": Attempted to rescue unhandled job kind, discarding", slog.String("job_kind", job.Kind), slog.Int64("job_id", job.ID)) return false, time.Time{} } - workUnit := workerInfo.workUnitFactory.MakeUnit(job) + workUnit := workUnitFactory.MakeUnit(job) if err := workUnit.UnmarshalJob(); err != nil { s.Logger.ErrorContext(ctx, s.Name+": Error unmarshaling job args: %s"+err.Error(), slog.String("job_kind", job.Kind), slog.Int64("job_id", job.ID)) diff --git a/rescuer_test.go b/internal/maintenance/rescuer_test.go similarity index 75% rename from rescuer_test.go rename to internal/maintenance/rescuer_test.go index ed8ec285..0a52c4ad 100644 --- a/rescuer_test.go +++ b/internal/maintenance/rescuer_test.go @@ -1,8 +1,8 @@ -package river +package maintenance import ( "context" - "sync" + "math" "testing" "time" @@ -10,33 +10,47 @@ import ( "github.com/stretchr/testify/require" "github.com/riverqueue/river/internal/dbsqlc" - "github.com/riverqueue/river/internal/maintenance" "github.com/riverqueue/river/internal/rivercommon" "github.com/riverqueue/river/internal/riverinternaltest" "github.com/riverqueue/river/internal/util/ptrutil" + "github.com/riverqueue/river/internal/util/timeutil" + "github.com/riverqueue/river/internal/workunit" + "github.com/riverqueue/river/rivertype" ) -type rescuerTestArgs struct{} - -func (rescuerTestArgs) Kind() string { - return "RescuerTest" +// callbackWorkUnitFactory wraps a Worker to implement workUnitFactory. +type callbackWorkUnitFactory struct { + Callback func(ctx context.Context, jobRow *rivertype.JobRow) error } -type rescuerTestWorker struct { - WorkerDefaults[rescuerTestArgs] +func (w *callbackWorkUnitFactory) MakeUnit(jobRow *rivertype.JobRow) workunit.WorkUnit { + return &callbackWorkUnit{callback: w.Callback, jobRow: jobRow} } -func (w *rescuerTestWorker) Work(context.Context, *Job[rescuerTestArgs]) error { - return nil +// callbackWorkUnit implements workUnit for a job and Worker. +type callbackWorkUnit struct { + callback func(ctx context.Context, jobRow *rivertype.JobRow) error + jobRow *rivertype.JobRow } -func (w *rescuerTestWorker) NextRetry(*Job[rescuerTestArgs]) time.Time { - return time.Now().Add(30 * time.Second) +func (w *callbackWorkUnit) NextRetry() time.Time { return time.Now().Add(30 * time.Second) } +func (w *callbackWorkUnit) Timeout() time.Duration { return 0 } +func (w *callbackWorkUnit) Work(ctx context.Context) error { return w.callback(ctx, w.jobRow) } +func (w *callbackWorkUnit) UnmarshalJob() error { return nil } + +type SimpleClientRetryPolicy struct{} + +func (p *SimpleClientRetryPolicy) NextRetry(job *rivertype.JobRow) time.Time { + errorCount := len(job.Errors) + 1 + retrySeconds := math.Pow(float64(errorCount), 4) + return job.AttemptedAt.Add(timeutil.SecondsAsDuration(retrySeconds)) } func TestRescuer(t *testing.T) { t.Parallel() + const rescuerJobKind = "rescuer" + var ( ctx = context.Background() queries = dbsqlc.New() @@ -59,7 +73,7 @@ func TestRescuer(t *testing.T) { Attempt: params.Attempt, AttemptedAt: params.AttemptedAt, Args: []byte("{}"), - Kind: (&rescuerTestArgs{}).Kind(), + Kind: rescuerJobKind, MaxAttempts: 5, Priority: int16(rivercommon.DefaultPriority), Queue: rivercommon.DefaultQueue, @@ -69,53 +83,58 @@ func TestRescuer(t *testing.T) { return job } - setup := func(t *testing.T) (*rescuer, *testBundle) { + setup := func(t *testing.T) (*Rescuer, *testBundle) { t.Helper() bundle := &testBundle{ - rescueHorizon: time.Now().Add(-defaultRescueAfter), + rescueHorizon: time.Now().Add(-DefaultRescueAfter), tx: riverinternaltest.TestTx(ctx, t), } - workers := NewWorkers() - AddWorker(workers, &rescuerTestWorker{}) - - cleaner := newRescuer( + rescuer := NewRescuer( riverinternaltest.BaseServiceArchetype(t), - &rescuerConfig{ - ClientRetryPolicy: &DefaultClientRetryPolicy{}, - Interval: defaultRescuerInterval, - RescueAfter: defaultRescueAfter, - Workers: workers, + &RescuerConfig{ + ClientRetryPolicy: &SimpleClientRetryPolicy{}, + Interval: DefaultRescuerInterval, + RescueAfter: DefaultRescueAfter, + WorkUnitFactoryFunc: func(kind string) workunit.WorkUnitFactory { + if kind == rescuerJobKind { + return &callbackWorkUnitFactory{Callback: func(ctx context.Context, jobRow *rivertype.JobRow) error { return nil }} + } + panic("unhandled kind: " + kind) + }, }, bundle.tx) - cleaner.TestSignals.Init() - t.Cleanup(cleaner.Stop) + rescuer.TestSignals.Init() + t.Cleanup(rescuer.Stop) - return cleaner, bundle + return rescuer, bundle } t.Run("Defaults", func(t *testing.T) { t.Parallel() - cleaner := newRescuer( + cleaner := NewRescuer( riverinternaltest.BaseServiceArchetype(t), - &rescuerConfig{ClientRetryPolicy: &DefaultClientRetryPolicy{}, Workers: NewWorkers()}, + &RescuerConfig{ + ClientRetryPolicy: &SimpleClientRetryPolicy{}, + WorkUnitFactoryFunc: func(kind string) workunit.WorkUnitFactory { return nil }, + }, nil, ) - require.Equal(t, cleaner.Config.RescueAfter, defaultRescueAfter) - require.Equal(t, cleaner.Config.Interval, defaultRescuerInterval) + require.Equal(t, cleaner.Config.RescueAfter, DefaultRescueAfter) + require.Equal(t, cleaner.Config.Interval, DefaultRescuerInterval) }) t.Run("StartStopStress", func(t *testing.T) { t.Parallel() - cleaner, _ := setup(t) - cleaner.Logger = riverinternaltest.LoggerWarn(t) // loop started/stop log is very noisy; suppress - cleaner.TestSignals = rescuerTestSignals{} // deinit so channels don't fill + rescuer, _ := setup(t) + rescuer.Logger = riverinternaltest.LoggerWarn(t) // loop started/stop log is very noisy; suppress + rescuer.TestSignals = RescuerTestSignals{} // deinit so channels don't fill - runStartStopStress(ctx, t, cleaner) + runStartStopStress(ctx, t, rescuer) }) t.Run("RescuesStuckJobs", func(t *testing.T) { @@ -279,23 +298,3 @@ func TestRescuer(t *testing.T) { require.Equal(t, dbsqlc.JobStateDiscarded, job2After.State) }) } - -// copied from maintenance package tests because there's no good place to expose it:. -func runStartStopStress(ctx context.Context, tb testing.TB, svc maintenance.Service) { - tb.Helper() - - var wg sync.WaitGroup - - for i := 0; i < 10; i++ { - wg.Add(1) - go func() { - for j := 0; j < 50; j++ { - require.NoError(tb, svc.Start(ctx)) - svc.Stop() - } - wg.Done() - }() - } - - wg.Wait() -} diff --git a/internal/workunit/work_unit.go b/internal/workunit/work_unit.go new file mode 100644 index 00000000..746454f2 --- /dev/null +++ b/internal/workunit/work_unit.go @@ -0,0 +1,33 @@ +package workunit + +import ( + "context" + "time" + + "github.com/riverqueue/river/rivertype" +) + +// WorkUnit provides an interface to a struct that wraps a job to be done +// combined with a work function that can execute it. Its main purpose is to +// wrap a struct that contains generic types (like a Worker[T] that needs to be +// invoked with a Job[T]) in such a way as to make it non-generic so that it can +// be used in other non-generic code like jobExecutor. +// +// Implemented by river.wrapperWorkUnit. +type WorkUnit interface { + NextRetry() time.Time + Timeout() time.Duration + UnmarshalJob() error + Work(ctx context.Context) error +} + +// WorkUnitFactory provides an interface to a struct that can generate a +// workUnit, a wrapper around a job to be done combined with a work function +// that can execute it. +// +// Implemented by river.workUnitFactoryWrapper. +type WorkUnitFactory interface { + // Make a workUnit, which wraps a job to be done and work function that can + // execute it. + MakeUnit(jobRow *rivertype.JobRow) WorkUnit +} diff --git a/job.go b/job.go index 149c271b..8eeeb208 100644 --- a/job.go +++ b/job.go @@ -7,14 +7,14 @@ import ( "time" "github.com/riverqueue/river/internal/dbsqlc" - "github.com/riverqueue/river/internal/util/sliceutil" "github.com/riverqueue/river/riverdriver" + "github.com/riverqueue/river/rivertype" ) // Job represents a single unit of work, holding both the arguments and // information for a job with args of type T. type Job[T JobArgs] struct { - *JobRow + *rivertype.JobRow // Args are the arguments for the job. Args T @@ -36,130 +36,6 @@ type JobArgsWithInsertOpts interface { InsertOpts() InsertOpts } -// JobRow contains the properties of a job that are persisted to the database. -// Use of `Job[T]` will generally be preferred in user-facing code like worker -// interfaces. -type JobRow struct { - // ID of the job. Generated as part of a Postgres sequence and generally - // ascending in nature, but there may be gaps in it as transactions roll - // back. - ID int64 - - // Attempt is the attempt number of the job. Jobs are inserted at 0, the - // number is incremented to 1 the first time work its worked, and may - // increment further if it's either snoozed or errors. - Attempt int - - // AttemptedAt is the time that the job was last worked. Starts out as `nil` - // on a new insert. - AttemptedAt *time.Time - - // AttemptedBy is the set of worker IDs that have worked this job. A worker - // ID differs between different programs, but is shared by all executors - // within any given one. (i.e. Different Go processes have different IDs, - // but IDs are shared within any given process.) A process generates a new - // ULID (an ordered UUID) worker ID when it starts up. - AttemptedBy []string - - // CreatedAt is when the job record was created. - CreatedAt time.Time - - // EncodedArgs is the job's JobArgs encoded as JSON. - EncodedArgs []byte - - // Errors is a set of errors that occurred when the job was worked, one for - // each attempt. Ordered from earliest error to the latest error. - Errors []AttemptError - - // FinalizedAt is the time at which the job was "finalized", meaning it was - // either completed successfully or errored for the last time such that - // it'll no longer be retried. - FinalizedAt *time.Time - - // Kind uniquely identifies the type of job and instructs which worker - // should work it. It is set at insertion time via `Kind()` on the - // `JobArgs`. - Kind string - - // MaxAttempts is the maximum number of attempts that the job will be tried - // before it errors for the last time and will no longer be worked. - // - // Extracted (in order of precedence) from job-specific InsertOpts - // on Insert, from the worker level InsertOpts from JobArgsWithInsertOpts, - // or from a client's default value. - MaxAttempts int - - // Priority is the priority of the job, with 1 being the highest priority and - // 4 being the lowest. When fetching available jobs to work, the highest - // priority jobs will always be fetched before any lower priority jobs are - // fetched. Note that if your workers are swamped with more high-priority jobs - // then they can handle, lower priority jobs may not be fetched. - Priority int - - // Queue is the name of the queue where the job will be worked. Queues can - // be configured independently and be used to isolate jobs. - // - // Extracted from either specific InsertOpts on Insert, or InsertOpts from - // JobArgsWithInsertOpts, or a client's default value. - Queue string - - // ScheduledAt is when the job is scheduled to become available to be - // worked. Jobs default to running immediately, but may be scheduled - // for the future when they're inserted. They may also be scheduled for - // later because they were snoozed or because they errored and have - // additional retry attempts remaining. - ScheduledAt time.Time - - // State is the state of job like `available` or `completed`. Jobs are - // `available` when they're first inserted. - State JobState - - // Tags are an arbitrary list of keywords to add to the job. They have no - // functional behavior and are meant entirely as a user-specified construct - // to help group and categorize jobs. - Tags []string - - // metadata is a field that'll eventually be used to store arbitrary data on - // a job for flexible use and use with plugins. It's currently unexported - // until we get a chance to more fully flesh out this feature. - metadata []byte -} - -// WARNING!!!!! -// -// !!! When updating this function, the equivalent in `./rivertest/rivertest.go` -// must also be updated!!! -// -// This is obviously not ideal, but since JobRow is at the top-level package, -// there's no way to put a helper in a shared package that can produce one, -// which is why we have this copy/pasta. There are some potential alternatives -// to this, but none of them are great. -func jobRowFromInternal(internal *dbsqlc.RiverJob) *JobRow { - tags := internal.Tags - if tags == nil { - tags = []string{} - } - return &JobRow{ - ID: internal.ID, - Attempt: max(int(internal.Attempt), 0), - AttemptedAt: internal.AttemptedAt, - AttemptedBy: internal.AttemptedBy, - CreatedAt: internal.CreatedAt, - EncodedArgs: internal.Args, - Errors: sliceutil.Map(internal.Errors, func(e dbsqlc.AttemptError) AttemptError { return attemptErrorFromInternal(&e) }), - FinalizedAt: internal.FinalizedAt, - Kind: internal.Kind, - MaxAttempts: max(int(internal.MaxAttempts), 0), - Priority: max(int(internal.Priority), 0), - Queue: internal.Queue, - ScheduledAt: internal.ScheduledAt.UTC(), // TODO(brandur): Very weird this is the only place a UTC conversion happens. - State: JobState(internal.State), - Tags: tags, - - metadata: internal.Metadata, - } -} - // JobCompleteTx marks the job as completed as part of transaction tx. If tx is // rolled back, the completion will be as well. // @@ -188,7 +64,7 @@ func JobCompleteTx[TDriver riverdriver.Driver[TTx], TTx any, TArgs JobArgs](ctx return nil, err } - updatedJob := &Job[TArgs]{JobRow: jobRowFromInternal(internal)} + updatedJob := &Job[TArgs]{JobRow: dbsqlc.JobRowFromInternal(internal)} if err := json.Unmarshal(updatedJob.EncodedArgs, &updatedJob.Args); err != nil { return nil, err @@ -197,19 +73,17 @@ func JobCompleteTx[TDriver riverdriver.Driver[TTx], TTx any, TArgs JobArgs](ctx return updatedJob, nil } -type JobState string - const ( - JobStateAvailable JobState = JobState(dbsqlc.JobStateAvailable) - JobStateCancelled JobState = JobState(dbsqlc.JobStateCancelled) - JobStateCompleted JobState = JobState(dbsqlc.JobStateCompleted) - JobStateDiscarded JobState = JobState(dbsqlc.JobStateDiscarded) - JobStateRetryable JobState = JobState(dbsqlc.JobStateRetryable) - JobStateRunning JobState = JobState(dbsqlc.JobStateRunning) - JobStateScheduled JobState = JobState(dbsqlc.JobStateScheduled) + JobStateAvailable = rivertype.JobStateAvailable + JobStateCancelled = rivertype.JobStateCancelled + JobStateCompleted = rivertype.JobStateCompleted + JobStateDiscarded = rivertype.JobStateDiscarded + JobStateRetryable = rivertype.JobStateRetryable + JobStateRunning = rivertype.JobStateRunning + JobStateScheduled = rivertype.JobStateScheduled ) -var jobStateAll = []JobState{ //nolint:gochecknoglobals +var jobStateAll = []rivertype.JobState{ //nolint:gochecknoglobals JobStateAvailable, JobStateCancelled, JobStateCompleted, @@ -218,19 +92,3 @@ var jobStateAll = []JobState{ //nolint:gochecknoglobals JobStateRunning, JobStateScheduled, } - -type AttemptError struct { - At time.Time `json:"at"` - Error string `json:"error"` - Num int `json:"num"` - Trace string `json:"trace"` -} - -func attemptErrorFromInternal(e *dbsqlc.AttemptError) AttemptError { - return AttemptError{ - At: e.At, - Error: e.Error, - Num: int(e.Num), - Trace: e.Trace, - } -} diff --git a/job_executor.go b/job_executor.go index 9a80efeb..7736ed22 100644 --- a/job_executor.go +++ b/job_executor.go @@ -13,6 +13,8 @@ import ( "github.com/riverqueue/river/internal/dbadapter" "github.com/riverqueue/river/internal/jobcompleter" "github.com/riverqueue/river/internal/jobstats" + "github.com/riverqueue/river/internal/workunit" + "github.com/riverqueue/river/rivertype" ) // UnknownJobKindError is returned when a Client fetches and attempts to @@ -104,9 +106,9 @@ type jobExecutor struct { Completer jobcompleter.JobCompleter ClientRetryPolicy ClientRetryPolicy ErrorHandler ErrorHandler - InformProducerDoneFunc func(jobRow *JobRow) - JobRow *JobRow - WorkUnit workUnit + InformProducerDoneFunc func(jobRow *rivertype.JobRow) + JobRow *rivertype.JobRow + WorkUnit workunit.WorkUnit // Meant to be used from within the job executor only. result *jobExecutorResult @@ -267,7 +269,7 @@ func (e *jobExecutor) reportError(ctx context.Context) { } } - attemptErr := AttemptError{ + attemptErr := rivertype.AttemptError{ At: e.start, Error: errorStr, Num: e.JobRow.Attempt, diff --git a/job_executor_test.go b/job_executor_test.go index 76435238..23868a09 100644 --- a/job_executor_test.go +++ b/job_executor_test.go @@ -18,6 +18,8 @@ import ( "github.com/riverqueue/river/internal/riverinternaltest" "github.com/riverqueue/river/internal/util/ptrutil" "github.com/riverqueue/river/internal/util/timeutil" + "github.com/riverqueue/river/internal/workunit" + "github.com/riverqueue/river/rivertype" ) type customRetryPolicyWorker struct { @@ -39,7 +41,7 @@ func (w *customRetryPolicyWorker) Work(ctx context.Context, j *Job[callbackArgs] // Makes a workerInfo using the real workerWrapper with a job that uses a // callback Work func and allows for customizable maxAttempts and nextRetry. -func newWorkUnitFactoryWithCustomRetry(f func() error, nextRetry func() time.Time) workUnitFactory { +func newWorkUnitFactoryWithCustomRetry(f func() error, nextRetry func() time.Time) workunit.WorkUnitFactory { return &workUnitFactoryWrapper[callbackArgs]{worker: &customRetryPolicyWorker{ f: f, nextRetry: nextRetry, @@ -51,7 +53,7 @@ type retryPolicyCustom struct { DefaultClientRetryPolicy } -func (p *retryPolicyCustom) NextRetry(job *JobRow) time.Time { +func (p *retryPolicyCustom) NextRetry(job *rivertype.JobRow) time.Time { var backoffDuration time.Duration switch job.Attempt { case 1: @@ -72,7 +74,7 @@ type retryPolicyInvalid struct { DefaultClientRetryPolicy } -func (p *retryPolicyInvalid) NextRetry(job *JobRow) time.Time { return time.Time{} } +func (p *retryPolicyInvalid) NextRetry(job *rivertype.JobRow) time.Time { return time.Time{} } // Identical to default retry policy except that it leaves off the jitter to // make checking against it more convenient. @@ -80,32 +82,32 @@ type retryPolicyNoJitter struct { DefaultClientRetryPolicy } -func (p *retryPolicyNoJitter) NextRetry(job *JobRow) time.Time { +func (p *retryPolicyNoJitter) NextRetry(job *rivertype.JobRow) time.Time { return job.AttemptedAt.Add(timeutil.SecondsAsDuration(p.retrySecondsWithoutJitter(job.Attempt))) } type testErrorHandler struct { HandleErrorCalled bool - HandleErrorFunc func(ctx context.Context, job *JobRow, err error) *ErrorHandlerResult + HandleErrorFunc func(ctx context.Context, job *rivertype.JobRow, err error) *ErrorHandlerResult HandlePanicCalled bool - HandlePanicFunc func(ctx context.Context, job *JobRow, panicVal any) *ErrorHandlerResult + HandlePanicFunc func(ctx context.Context, job *rivertype.JobRow, panicVal any) *ErrorHandlerResult } // Test handler with no-ops for both error handling functions. func newTestErrorHandler() *testErrorHandler { return &testErrorHandler{ - HandleErrorFunc: func(ctx context.Context, job *JobRow, err error) *ErrorHandlerResult { return nil }, - HandlePanicFunc: func(ctx context.Context, job *JobRow, panicVal any) *ErrorHandlerResult { return nil }, + HandleErrorFunc: func(ctx context.Context, job *rivertype.JobRow, err error) *ErrorHandlerResult { return nil }, + HandlePanicFunc: func(ctx context.Context, job *rivertype.JobRow, panicVal any) *ErrorHandlerResult { return nil }, } } -func (h *testErrorHandler) HandleError(ctx context.Context, job *JobRow, err error) *ErrorHandlerResult { +func (h *testErrorHandler) HandleError(ctx context.Context, job *rivertype.JobRow, err error) *ErrorHandlerResult { h.HandleErrorCalled = true return h.HandleErrorFunc(ctx, job, err) } -func (h *testErrorHandler) HandlePanic(ctx context.Context, job *JobRow, panicVal any) *ErrorHandlerResult { +func (h *testErrorHandler) HandlePanic(ctx context.Context, job *rivertype.JobRow, panicVal any) *ErrorHandlerResult { h.HandlePanicCalled = true return h.HandlePanicFunc(ctx, job, panicVal) } @@ -123,7 +125,7 @@ func TestJobExecutor_Execute(t *testing.T) { completer *jobcompleter.InlineJobCompleter errorHandler *testErrorHandler getUpdatesAndStop func() []jobcompleter.CompleterJobUpdated - jobRow *JobRow + jobRow *rivertype.JobRow tx pgx.Tx } @@ -177,7 +179,7 @@ func TestJobExecutor_Execute(t *testing.T) { completer: completer, errorHandler: newTestErrorHandler(), getUpdatesAndStop: getJobUpdates, - jobRow: jobRowFromInternal(job), + jobRow: dbsqlc.JobRowFromInternal(job), tx: tx, } @@ -186,7 +188,7 @@ func TestJobExecutor_Execute(t *testing.T) { ClientRetryPolicy: &retryPolicyNoJitter{}, Completer: bundle.completer, ErrorHandler: bundle.errorHandler, - InformProducerDoneFunc: func(job *JobRow) {}, + InformProducerDoneFunc: func(job *rivertype.JobRow) {}, JobRow: bundle.jobRow, WorkUnit: workUnitFactory.MakeUnit(bundle.jobRow), }) @@ -393,7 +395,7 @@ func TestJobExecutor_Execute(t *testing.T) { workerErr := fmt.Errorf("job error") executor.WorkUnit = newWorkUnitFactoryWithCustomRetry(func() error { return workerErr }, nil).MakeUnit(bundle.jobRow) - bundle.errorHandler.HandleErrorFunc = func(ctx context.Context, job *JobRow, err error) *ErrorHandlerResult { + bundle.errorHandler.HandleErrorFunc = func(ctx context.Context, job *rivertype.JobRow, err error) *ErrorHandlerResult { require.Equal(t, workerErr, err) return nil } @@ -415,7 +417,7 @@ func TestJobExecutor_Execute(t *testing.T) { workerErr := fmt.Errorf("job error") executor.WorkUnit = newWorkUnitFactoryWithCustomRetry(func() error { return workerErr }, nil).MakeUnit(bundle.jobRow) - bundle.errorHandler.HandleErrorFunc = func(ctx context.Context, job *JobRow, err error) *ErrorHandlerResult { + bundle.errorHandler.HandleErrorFunc = func(ctx context.Context, job *rivertype.JobRow, err error) *ErrorHandlerResult { return &ErrorHandlerResult{SetCancelled: true} } @@ -436,7 +438,7 @@ func TestJobExecutor_Execute(t *testing.T) { workerErr := fmt.Errorf("job error") executor.WorkUnit = newWorkUnitFactoryWithCustomRetry(func() error { return workerErr }, nil).MakeUnit(bundle.jobRow) - bundle.errorHandler.HandleErrorFunc = func(ctx context.Context, job *JobRow, err error) *ErrorHandlerResult { + bundle.errorHandler.HandleErrorFunc = func(ctx context.Context, job *rivertype.JobRow, err error) *ErrorHandlerResult { panic("error handled panicked!") } @@ -510,7 +512,7 @@ func TestJobExecutor_Execute(t *testing.T) { executor, bundle := setup(t) executor.WorkUnit = newWorkUnitFactoryWithCustomRetry(func() error { panic("panic val") }, nil).MakeUnit(bundle.jobRow) - bundle.errorHandler.HandlePanicFunc = func(ctx context.Context, job *JobRow, panicVal any) *ErrorHandlerResult { + bundle.errorHandler.HandlePanicFunc = func(ctx context.Context, job *rivertype.JobRow, panicVal any) *ErrorHandlerResult { require.Equal(t, "panic val", panicVal) return nil } @@ -531,7 +533,7 @@ func TestJobExecutor_Execute(t *testing.T) { executor, bundle := setup(t) executor.WorkUnit = newWorkUnitFactoryWithCustomRetry(func() error { panic("panic val") }, nil).MakeUnit(bundle.jobRow) - bundle.errorHandler.HandlePanicFunc = func(ctx context.Context, job *JobRow, panicVal any) *ErrorHandlerResult { + bundle.errorHandler.HandlePanicFunc = func(ctx context.Context, job *rivertype.JobRow, panicVal any) *ErrorHandlerResult { return &ErrorHandlerResult{SetCancelled: true} } @@ -551,7 +553,7 @@ func TestJobExecutor_Execute(t *testing.T) { executor, bundle := setup(t) executor.WorkUnit = newWorkUnitFactoryWithCustomRetry(func() error { panic("panic val") }, nil).MakeUnit(bundle.jobRow) - bundle.errorHandler.HandlePanicFunc = func(ctx context.Context, job *JobRow, panicVal any) *ErrorHandlerResult { + bundle.errorHandler.HandlePanicFunc = func(ctx context.Context, job *rivertype.JobRow, panicVal any) *ErrorHandlerResult { panic("panic handler panicked!") } diff --git a/job_test.go b/job_test.go index aefe3fca..392685d0 100644 --- a/job_test.go +++ b/job_test.go @@ -5,6 +5,8 @@ import ( "time" "github.com/stretchr/testify/require" + + "github.com/riverqueue/river/rivertype" ) func TestJobUniqueOpts_isEmpty(t *testing.T) { @@ -14,7 +16,7 @@ func TestJobUniqueOpts_isEmpty(t *testing.T) { require.False(t, (&UniqueOpts{ByArgs: true}).isEmpty()) require.False(t, (&UniqueOpts{ByPeriod: 1 * time.Nanosecond}).isEmpty()) require.False(t, (&UniqueOpts{ByQueue: true}).isEmpty()) - require.False(t, (&UniqueOpts{ByState: []JobState{JobStateAvailable}}).isEmpty()) + require.False(t, (&UniqueOpts{ByState: []rivertype.JobState{JobStateAvailable}}).isEmpty()) } func TestJobUniqueOpts_validate(t *testing.T) { @@ -25,9 +27,9 @@ func TestJobUniqueOpts_validate(t *testing.T) { ByArgs: true, ByPeriod: 1 * time.Second, ByQueue: true, - ByState: []JobState{JobStateAvailable}, + ByState: []rivertype.JobState{JobStateAvailable}, }).validate()) require.EqualError(t, (&UniqueOpts{ByPeriod: 1 * time.Millisecond}).validate(), "JobUniqueOpts.ByPeriod should not be less than 1 second") - require.EqualError(t, (&UniqueOpts{ByState: []JobState{JobState("invalid")}}).validate(), `JobUniqueOpts.ByState contains invalid state "invalid"`) + require.EqualError(t, (&UniqueOpts{ByState: []rivertype.JobState{rivertype.JobState("invalid")}}).validate(), `JobUniqueOpts.ByState contains invalid state "invalid"`) } diff --git a/producer.go b/producer.go index 82cd2a9c..8047b745 100644 --- a/producer.go +++ b/producer.go @@ -11,10 +11,13 @@ import ( "github.com/riverqueue/river/internal/baseservice" "github.com/riverqueue/river/internal/componentstatus" "github.com/riverqueue/river/internal/dbadapter" + "github.com/riverqueue/river/internal/dbsqlc" "github.com/riverqueue/river/internal/jobcompleter" "github.com/riverqueue/river/internal/notifier" "github.com/riverqueue/river/internal/util/chanutil" "github.com/riverqueue/river/internal/util/sliceutil" + "github.com/riverqueue/river/internal/workunit" + "github.com/riverqueue/river/rivertype" ) type producerConfig struct { @@ -61,7 +64,7 @@ type producer struct { // Receives completed jobs from workers. Written by completed workers, only // read from main goroutine. - jobResultCh chan *JobRow + jobResultCh chan *rivertype.JobRow jobTimeout time.Duration @@ -115,7 +118,7 @@ func newProducer(archetype *baseservice.Archetype, adapter dbadapter.Adapter, co completer: completer, config: config, errorHandler: config.ErrorHandler, - jobResultCh: make(chan *JobRow, config.MaxWorkerCount), + jobResultCh: make(chan *rivertype.JobRow, config.MaxWorkerCount), jobTimeout: config.JobTimeout, retryPolicy: config.RetryPolicy, workers: config.Workers, @@ -269,7 +272,7 @@ func (p *producer) dispatchWork(count int32, jobsFetchedCh chan<- producerFetchR jobsFetchedCh <- producerFetchResult{err: err} return } - jobs := sliceutil.Map(internalJobs, jobRowFromInternal) + jobs := sliceutil.Map(internalJobs, dbsqlc.JobRowFromInternal) jobsFetchedCh <- producerFetchResult{jobs: jobs} } @@ -292,11 +295,11 @@ func (p *producer) heartbeatLogLoop(ctx context.Context) { } } -func (p *producer) startNewExecutors(workCtx context.Context, jobs []*JobRow) { +func (p *producer) startNewExecutors(workCtx context.Context, jobs []*rivertype.JobRow) { for _, job := range jobs { workInfo, ok := p.workers.workersMap[job.Kind] - var workUnit workUnit + var workUnit workunit.WorkUnit if ok { workUnit = workInfo.workUnitFactory.MakeUnit(job) } @@ -328,11 +331,11 @@ func (p *producer) maxJobsToFetch() int32 { return int32(p.config.MaxWorkerCount) - p.numJobsActive.Load() } -func (p *producer) handleWorkerDone(job *JobRow) { +func (p *producer) handleWorkerDone(job *rivertype.JobRow) { p.jobResultCh <- job } type producerFetchResult struct { - jobs []*JobRow + jobs []*rivertype.JobRow err error } diff --git a/retry_policy.go b/retry_policy.go index f59dea8b..bcdcb5fb 100644 --- a/retry_policy.go +++ b/retry_policy.go @@ -8,6 +8,7 @@ import ( "github.com/riverqueue/river/internal/util/randutil" "github.com/riverqueue/river/internal/util/timeutil" + "github.com/riverqueue/river/rivertype" ) // ClientRetryPolicy is an interface that can be implemented to provide a retry @@ -23,7 +24,7 @@ type ClientRetryPolicy interface { // given when it was last attempted and its number of attempts, or any other // of the job's properties a user-configured retry policy might want to // consider. - NextRetry(job *JobRow) time.Time + NextRetry(job *rivertype.JobRow) time.Time } // River's default retry policy. @@ -42,7 +43,7 @@ type DefaultClientRetryPolicy struct { // used instead of the attempt count. This means that snoozing a job (even // repeatedly) will not lead to a future error having a longer than expected // retry delay. -func (p *DefaultClientRetryPolicy) NextRetry(job *JobRow) time.Time { +func (p *DefaultClientRetryPolicy) NextRetry(job *rivertype.JobRow) time.Time { // For the purposes of calculating the backoff, we can look solely at the // number of errors. If we were to use the raw attempt count, this would be // incemented and influenced by snoozes. However the use case for snoozing is diff --git a/retry_policy_test.go b/retry_policy_test.go index a56f9183..54862298 100644 --- a/retry_policy_test.go +++ b/retry_policy_test.go @@ -10,6 +10,7 @@ import ( "github.com/riverqueue/river/internal/rivercommon" "github.com/riverqueue/river/internal/util/timeutil" + "github.com/riverqueue/river/rivertype" ) // Just proves that DefaultRetryPolicy implements the RetryPolicy interface. @@ -25,10 +26,10 @@ func TestDefaultClientRetryPolicy_NextRetry(t *testing.T) { retrySecondsWithoutJitter := retryPolicy.retrySecondsWithoutJitter(attempt) allowedDelta := timeutil.SecondsAsDuration(retrySecondsWithoutJitter * 0.2) - nextRetryAt := retryPolicy.NextRetry(&JobRow{ + nextRetryAt := retryPolicy.NextRetry(&rivertype.JobRow{ Attempt: attempt, AttemptedAt: &now, - Errors: make([]AttemptError, attempt-1), + Errors: make([]rivertype.AttemptError, attempt-1), }) require.WithinDuration(t, now.Add(timeutil.SecondsAsDuration(retrySecondsWithoutJitter)), nextRetryAt, allowedDelta) } diff --git a/rivertest/rivertest.go b/rivertest/rivertest.go index 100f8c12..8f63dc3f 100644 --- a/rivertest/rivertest.go +++ b/rivertest/rivertest.go @@ -17,6 +17,7 @@ import ( "github.com/riverqueue/river/internal/dbsqlc" "github.com/riverqueue/river/internal/util/sliceutil" "github.com/riverqueue/river/riverdriver" + "github.com/riverqueue/river/rivertype" ) // dbtx is a database-like executor which is implemented by all of pgxpool.Pool, @@ -72,7 +73,7 @@ type RequireInsertedOpts struct { // State is the expected state of the inserted job. // // No assertion is made if left the zero value. - State river.JobState + State rivertype.JobState // Tags are the expected tags of the inserted job. // @@ -160,7 +161,7 @@ func requireInsertedErr[TDriver riverdriver.Driver[TTx], TTx any, TArgs river.Jo return nil, nil //nolint:nilnil } - jobRow := jobRowFromInternal(dbJobs[0]) + jobRow := dbsqlc.JobRowFromInternal(dbJobs[0]) var actualArgs TArgs if err := json.Unmarshal(jobRow.EncodedArgs, &actualArgs); err != nil { @@ -207,12 +208,12 @@ type ExpectedJob struct { // the number specified, and will fail in case this expectation isn't met. So if // a job of a certain kind is emitted multiple times, it must be expected // multiple times. -func RequireManyInserted[TDriver riverdriver.Driver[TTx], TTx any](ctx context.Context, tb testing.TB, driver TDriver, expectedJobs []ExpectedJob) []*river.JobRow { +func RequireManyInserted[TDriver riverdriver.Driver[TTx], TTx any](ctx context.Context, tb testing.TB, driver TDriver, expectedJobs []ExpectedJob) []*rivertype.JobRow { tb.Helper() return requireManyInserted(ctx, tb, driver, expectedJobs) } -func requireManyInserted[TDriver riverdriver.Driver[TTx], TTx any](ctx context.Context, t testingT, driver TDriver, expectedJobs []ExpectedJob) []*river.JobRow { +func requireManyInserted[TDriver riverdriver.Driver[TTx], TTx any](ctx context.Context, t testingT, driver TDriver, expectedJobs []ExpectedJob) []*rivertype.JobRow { actualArgs, err := requireManyInsertedErr[TDriver](ctx, t, driver.GetDBPool(), expectedJobs) if err != nil { failure(t, "Internal failure: %s", err) @@ -240,14 +241,14 @@ func requireManyInserted[TDriver riverdriver.Driver[TTx], TTx any](ctx context.C // the number specified, and will fail in case this expectation isn't met. So if // a job of a certain kind is emitted multiple times, it must be expected // multiple times. -func RequireManyInsertedTx[TDriver riverdriver.Driver[TTx], TTx any](ctx context.Context, tb testing.TB, tx TTx, expectedJobs []ExpectedJob) []*river.JobRow { +func RequireManyInsertedTx[TDriver riverdriver.Driver[TTx], TTx any](ctx context.Context, tb testing.TB, tx TTx, expectedJobs []ExpectedJob) []*rivertype.JobRow { tb.Helper() return requireManyInsertedTx[TDriver](ctx, tb, tx, expectedJobs) } // Internal function used by the tests so that the exported version can take // `testing.TB` instead of `testing.T`. -func requireManyInsertedTx[TDriver riverdriver.Driver[TTx], TTx any](ctx context.Context, t testingT, tx TTx, expectedJobs []ExpectedJob) []*river.JobRow { +func requireManyInsertedTx[TDriver riverdriver.Driver[TTx], TTx any](ctx context.Context, t testingT, tx TTx, expectedJobs []ExpectedJob) []*rivertype.JobRow { var driver TDriver actualArgs, err := requireManyInsertedErr[TDriver](ctx, t, driver.UnwrapTx(tx), expectedJobs) if err != nil { @@ -256,7 +257,7 @@ func requireManyInsertedTx[TDriver riverdriver.Driver[TTx], TTx any](ctx context return actualArgs } -func requireManyInsertedErr[TDriver riverdriver.Driver[TTx], TTx any](ctx context.Context, t testingT, db dbtx, expectedJobs []ExpectedJob) ([]*river.JobRow, error) { +func requireManyInsertedErr[TDriver riverdriver.Driver[TTx], TTx any](ctx context.Context, t testingT, db dbtx, expectedJobs []ExpectedJob) ([]*rivertype.JobRow, error) { queries := dbsqlc.New() expectedArgsKinds := sliceutil.Map(expectedJobs, func(j ExpectedJob) string { return j.Args.Kind() }) @@ -275,7 +276,7 @@ func requireManyInsertedErr[TDriver riverdriver.Driver[TTx], TTx any](ctx contex return nil, nil } - jobRows := sliceutil.Map(dbJobs, jobRowFromInternal) + jobRows := sliceutil.Map(dbJobs, dbsqlc.JobRowFromInternal) for i, jobRow := range jobRows { if expectedJobs[i].Opts != nil { @@ -290,7 +291,7 @@ func requireManyInsertedErr[TDriver riverdriver.Driver[TTx], TTx any](ctx contex const rfc3339Micro = "2006-01-02T15:04:05.999999Z07:00" -func compareJobToInsertOpts(t testingT, jobRow *river.JobRow, expectedOpts RequireInsertedOpts, index int) bool { +func compareJobToInsertOpts(t testingT, jobRow *rivertype.JobRow, expectedOpts RequireInsertedOpts, index int) bool { // Adds an index position for the case of multiple expected jobs. Wrapped in // a function so that the string is only marshaled if needed. positionStr := func() string { @@ -358,47 +359,3 @@ func failure(t testingT, format string, a ...any) { func failureString(format string, a ...any) string { return "\n River assertion failure:\n " + fmt.Sprintf(format, a...) + "\n" } - -// WARNING!!!!! -// -// !!! When updating this function, the equivalent in `./job.go` must also be -// updated!!! -// -// This is obviously not ideal, but since JobRow is at the top-level package, -// there's no way to put a helper in a shared package that can produce one, -// which is why we have this copy/pasta. There are some potential alternatives -// to this, but none of them are great. -func jobRowFromInternal(internal *dbsqlc.RiverJob) *river.JobRow { - tags := internal.Tags - if tags == nil { - tags = []string{} - } - return &river.JobRow{ - ID: internal.ID, - Attempt: max(int(internal.Attempt), 0), - AttemptedAt: internal.AttemptedAt, - AttemptedBy: internal.AttemptedBy, - CreatedAt: internal.CreatedAt, - EncodedArgs: internal.Args, - Errors: sliceutil.Map(internal.Errors, func(e dbsqlc.AttemptError) river.AttemptError { return attemptErrorFromInternal(&e) }), - FinalizedAt: internal.FinalizedAt, - Kind: internal.Kind, - MaxAttempts: max(int(internal.MaxAttempts), 0), - Priority: max(int(internal.Priority), 0), - Queue: internal.Queue, - ScheduledAt: internal.ScheduledAt.UTC(), // TODO(brandur): Very weird this is the only place a UTC conversion happens. - State: river.JobState(internal.State), - Tags: tags, - - // metadata: internal.Metadata, - } -} - -func attemptErrorFromInternal(e *dbsqlc.AttemptError) river.AttemptError { - return river.AttemptError{ - At: e.At, - Error: e.Error, - Num: int(e.Num), - Trace: e.Trace, - } -} diff --git a/rivertype/job_row.go b/rivertype/job_row.go new file mode 100644 index 00000000..a4d9bfb1 --- /dev/null +++ b/rivertype/job_row.go @@ -0,0 +1,116 @@ +// Package rivertype stores some of the lowest level River primitives so they +// can be shared amongst a number of packages including the top-level river +// package, database drivers, and internal utilities. +package rivertype + +import ( + "time" +) + +// JobRow contains the properties of a job that are persisted to the database. +// Use of `Job[T]` will generally be preferred in user-facing code like worker +// interfaces. +type JobRow struct { + // ID of the job. Generated as part of a Postgres sequence and generally + // ascending in nature, but there may be gaps in it as transactions roll + // back. + ID int64 + + // Attempt is the attempt number of the job. Jobs are inserted at 0, the + // number is incremented to 1 the first time work its worked, and may + // increment further if it's either snoozed or errors. + Attempt int + + // AttemptedAt is the time that the job was last worked. Starts out as `nil` + // on a new insert. + AttemptedAt *time.Time + + // AttemptedBy is the set of worker IDs that have worked this job. A worker + // ID differs between different programs, but is shared by all executors + // within any given one. (i.e. Different Go processes have different IDs, + // but IDs are shared within any given process.) A process generates a new + // ULID (an ordered UUID) worker ID when it starts up. + AttemptedBy []string + + // CreatedAt is when the job record was created. + CreatedAt time.Time + + // EncodedArgs is the job's JobArgs encoded as JSON. + EncodedArgs []byte + + // Errors is a set of errors that occurred when the job was worked, one for + // each attempt. Ordered from earliest error to the latest error. + Errors []AttemptError + + // FinalizedAt is the time at which the job was "finalized", meaning it was + // either completed successfully or errored for the last time such that + // it'll no longer be retried. + FinalizedAt *time.Time + + // Kind uniquely identifies the type of job and instructs which worker + // should work it. It is set at insertion time via `Kind()` on the + // `JobArgs`. + Kind string + + // MaxAttempts is the maximum number of attempts that the job will be tried + // before it errors for the last time and will no longer be worked. + // + // Extracted (in order of precedence) from job-specific InsertOpts + // on Insert, from the worker level InsertOpts from JobArgsWithInsertOpts, + // or from a client's default value. + MaxAttempts int + + // Priority is the priority of the job, with 1 being the highest priority and + // 4 being the lowest. When fetching available jobs to work, the highest + // priority jobs will always be fetched before any lower priority jobs are + // fetched. Note that if your workers are swamped with more high-priority jobs + // then they can handle, lower priority jobs may not be fetched. + Priority int + + // Queue is the name of the queue where the job will be worked. Queues can + // be configured independently and be used to isolate jobs. + // + // Extracted from either specific InsertOpts on Insert, or InsertOpts from + // JobArgsWithInsertOpts, or a client's default value. + Queue string + + // ScheduledAt is when the job is scheduled to become available to be + // worked. Jobs default to running immediately, but may be scheduled + // for the future when they're inserted. They may also be scheduled for + // later because they were snoozed or because they errored and have + // additional retry attempts remaining. + ScheduledAt time.Time + + // State is the state of job like `available` or `completed`. Jobs are + // `available` when they're first inserted. + State JobState + + // Tags are an arbitrary list of keywords to add to the job. They have no + // functional behavior and are meant entirely as a user-specified construct + // to help group and categorize jobs. + Tags []string + + // metadata is a field that'll eventually be used to store arbitrary data on + // a job for flexible use and use with plugins. It's currently unexported + // until we get a chance to more fully flesh out this feature. + // metadata []byte +} + +type JobState string + +const ( + JobStateAvailable JobState = "available" + JobStateCancelled JobState = "cancelled" + JobStateCompleted JobState = "completed" + JobStateDiscarded JobState = "discarded" + JobStateRetryable JobState = "retryable" + JobStateRunning JobState = "running" + JobStateScheduled JobState = "scheduled" +) + +type AttemptError struct { + At time.Time `json:"at"` + Error string `json:"error"` + Num int `json:"num"` + Trace string `json:"trace"` +} diff --git a/work_unit.go b/work_unit.go deleted file mode 100644 index e2a1d5ce..00000000 --- a/work_unit.go +++ /dev/null @@ -1,60 +0,0 @@ -package river - -import ( - "context" - "encoding/json" - "time" -) - -// workUnit provides an interface to a struct that wraps a job to be done -// combined with a work function that can execute it. Its main purpose is to -// wrap a struct that contains generic types (like a Worker[T] that needs to be -// invoked with a Job[T]) in such a way as to make it non-generic so that it can -// be used in other non-generic code like jobExecutor. -// -// Implemented by wrapperWorkUnit. -type workUnit interface { - NextRetry() time.Time - Timeout() time.Duration - UnmarshalJob() error - Work(ctx context.Context) error -} - -// workUnitFactory provides an interface to a struct that can generate a -// workUnit, a wrapper around a job to be done combined with a work function -// that can execute it. -// -// Implemented by workUnitFactoryWrapper. -type workUnitFactory interface { - // Make a workUnit, which wraps a job to be done and work function that can - // execute it. - MakeUnit(jobRow *JobRow) workUnit -} - -// workUnitFactoryWrapper wraps a Worker to implement workUnitFactory. -type workUnitFactoryWrapper[T JobArgs] struct { - worker Worker[T] -} - -func (w *workUnitFactoryWrapper[T]) MakeUnit(jobRow *JobRow) workUnit { - return &wrapperWorkUnit[T]{jobRow: jobRow, worker: w.worker} -} - -// wrapperWorkUnit implements workUnit for a job and Worker. -type wrapperWorkUnit[T JobArgs] struct { - job *Job[T] // not set until after UnmarshalJob is invoked - jobRow *JobRow - worker Worker[T] -} - -func (w *wrapperWorkUnit[T]) NextRetry() time.Time { return w.worker.NextRetry(w.job) } -func (w *wrapperWorkUnit[T]) Timeout() time.Duration { return w.worker.Timeout(w.job) } -func (w *wrapperWorkUnit[T]) Work(ctx context.Context) error { return w.worker.Work(ctx, w.job) } - -func (w *wrapperWorkUnit[T]) UnmarshalJob() error { - w.job = &Job[T]{ - JobRow: w.jobRow, - } - - return json.Unmarshal(w.jobRow.EncodedArgs, &w.job.Args) -} diff --git a/work_unit_wrapper.go b/work_unit_wrapper.go new file mode 100644 index 00000000..9e741ee7 --- /dev/null +++ b/work_unit_wrapper.go @@ -0,0 +1,38 @@ +package river + +import ( + "context" + "encoding/json" + "time" + + "github.com/riverqueue/river/internal/workunit" + "github.com/riverqueue/river/rivertype" +) + +// workUnitFactoryWrapper wraps a Worker to implement workUnitFactory. +type workUnitFactoryWrapper[T JobArgs] struct { + worker Worker[T] +} + +func (w *workUnitFactoryWrapper[T]) MakeUnit(jobRow *rivertype.JobRow) workunit.WorkUnit { + return &wrapperWorkUnit[T]{jobRow: jobRow, worker: w.worker} +} + +// wrapperWorkUnit implements workUnit for a job and Worker. +type wrapperWorkUnit[T JobArgs] struct { + job *Job[T] // not set until after UnmarshalJob is invoked + jobRow *rivertype.JobRow + worker Worker[T] +} + +func (w *wrapperWorkUnit[T]) NextRetry() time.Time { return w.worker.NextRetry(w.job) } +func (w *wrapperWorkUnit[T]) Timeout() time.Duration { return w.worker.Timeout(w.job) } +func (w *wrapperWorkUnit[T]) Work(ctx context.Context) error { return w.worker.Work(ctx, w.job) } + +func (w *wrapperWorkUnit[T]) UnmarshalJob() error { + w.job = &Job[T]{ + JobRow: w.jobRow, + } + + return json.Unmarshal(w.jobRow.EncodedArgs, &w.job.Args) +} diff --git a/worker.go b/worker.go index 71277b10..185cf9c0 100644 --- a/worker.go +++ b/worker.go @@ -4,6 +4,8 @@ import ( "context" "fmt" "time" + + "github.com/riverqueue/river/internal/workunit" ) // Worker is an interface that can perform a job with args of type T. A typical @@ -122,7 +124,7 @@ type Workers struct { // in a Workers bundle. type workerInfo struct { jobArgs JobArgs - workUnitFactory workUnitFactory + workUnitFactory workunit.WorkUnitFactory } // NewWorkers initializes a new registry of available job workers. @@ -135,7 +137,7 @@ func NewWorkers() *Workers { } } -func (w Workers) add(jobArgs JobArgs, workUnitFactory workUnitFactory) error { +func (w Workers) add(jobArgs JobArgs, workUnitFactory workunit.WorkUnitFactory) error { kind := jobArgs.Kind() if _, ok := w.workersMap[kind]; ok {