Skip to content

Commit

Permalink
Demonstrate internal job row type using struct conversion
Browse files Browse the repository at this point in the history
I wanted to throw this one up as a possible alternative to a public
facing `rivertype` module in #36, in which we demonstrate an alternate
strategy using struct conversions. The problems it aims to solve are the
same:

* A way to access core job types from within subpackages, allowing more
  modularity (e.g. `Rescuer` could live with other maintenance services).

* Long term, a way of giving drivers access to job primitives without
  having to reference the top-level `river`.

The advantage of this approach is there's fewer user-facing changes, and
the user never has to interact with an alternate package.

It works by doing a struct conversion as necessary when moving from an
internal package to one that bubbles up to `river`. Go allows pure
struct conversions as long as their fields are the same, but there are
some limitations:

* Substructs like `AttemptError` can't be converted, even if identical.

* Custom types like `JobState` can't be converted, even if identical.

We work around these issues by:

* Keeping errors as a `[][]byte` instead of it being unmarshaled by pgx.
  A helper called `ErrorsUnmarshaled()` is added to the top-level
  `JobRow` to give easy access to unmarshaled errors.

* `State` is changed to a basic string akin to functions in `http`. This
  is technically a little less type-safe, but practically speaking would
  make very little difference because all these string-based types can
  be converted to and from each other with no checks anyway. Also, users
  aren't expected to access job state all that often. When they do, they
  can still use `job.State == river.JobStateAvailable` because the
  constants have been made available as strings.

I think this approach does also work reasonably well, although I worry a
little bit that there may be some sharp edge in the future that ends up
limiting us in some unexpected way. Also, having to use helpers like
`ErrorsUnmarshaled()` is definitely a little bit worse. That said, ~no
user-facing API changes, and has some marginal performance benefits:

* Converting structs between each other should be ~free.

* Errors probably aren't used that much, so by not unmarshaling them
  most of the time, we save on a lot of unnecessary JSON parsing.
  • Loading branch information
brandur committed Nov 18, 2023
1 parent 37b4a72 commit 61fbf70
Show file tree
Hide file tree
Showing 17 changed files with 200 additions and 143 deletions.
8 changes: 4 additions & 4 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -809,7 +809,7 @@ func (c *Client[TTx]) distributeJobCompleterCallback(update jobcompleter.Complet
c.statsNumJobs++
}()

c.distributeJob(jobRowFromInternal(update.Job), jobStatisticsFromInternal(update.JobStats))
c.distributeJob((*JobRow)(dbsqlc.JobRowFromInternal(update.Job)), jobStatisticsFromInternal(update.JobStats))
}

// Dump aggregate stats from job completions to logs periodically. These
Expand Down Expand Up @@ -963,7 +963,7 @@ func insertParamsFromArgsAndOptions(args JobArgs, insertOpts *InsertOpts) (*dbad
insertParams.UniqueByArgs = uniqueOpts.ByArgs
insertParams.UniqueByQueue = uniqueOpts.ByQueue
insertParams.UniqueByPeriod = uniqueOpts.ByPeriod
insertParams.UniqueByState = sliceutil.Map(uniqueOpts.ByState, func(s JobState) dbsqlc.JobState { return dbsqlc.JobState(s) })
insertParams.UniqueByState = sliceutil.Map(uniqueOpts.ByState, func(s string) dbsqlc.JobState { return dbsqlc.JobState(s) })
}

if !insertOpts.ScheduledAt.IsZero() {
Expand Down Expand Up @@ -1005,7 +1005,7 @@ func (c *Client[TTx]) Insert(ctx context.Context, args JobArgs, opts *InsertOpts
return nil, err
}

return jobRowFromInternal(res.Job), nil
return (*JobRow)(dbsqlc.JobRowFromInternal(res.Job)), nil
}

// InsertTx inserts a new job with the provided args on the given transaction.
Expand Down Expand Up @@ -1037,7 +1037,7 @@ func (c *Client[TTx]) InsertTx(ctx context.Context, tx TTx, args JobArgs, opts *
return nil, err
}

return jobRowFromInternal(res.Job), nil
return (*JobRow)(dbsqlc.JobRowFromInternal(res.Job)), nil
}

// InsertManyParams encapsulates a single job combined with insert options for
Expand Down
10 changes: 5 additions & 5 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1527,7 +1527,7 @@ func Test_Client_RetryPolicy(t *testing.T) {
// how it would've looked after being run through the queue.
originalJob.Attempt += 1

expectedNextScheduledAt := client.config.RetryPolicy.NextRetry(jobRowFromInternal(originalJob))
expectedNextScheduledAt := client.config.RetryPolicy.NextRetry((*JobRow)(dbsqlc.JobRowFromInternal(originalJob)))

t.Logf("Attempt number %d scheduled %v from original `attempted_at`",
originalJob.Attempt, finishedJob.ScheduledAt.Sub(*originalJob.AttemptedAt))
Expand Down Expand Up @@ -2092,7 +2092,7 @@ func Test_Client_UnknownJobKindErrorsTheJob(t *testing.T) {
require.Equal(insertRes.Job.ID, event.Job.ID)
require.Equal(insertRes.Job.Kind, "RandomWorkerNameThatIsNeverRegistered")
require.Len(event.Job.Errors, 1)
require.Equal((&UnknownJobKindError{Kind: "RandomWorkerNameThatIsNeverRegistered"}).Error(), event.Job.Errors[0].Error)
require.Equal((&UnknownJobKindError{Kind: "RandomWorkerNameThatIsNeverRegistered"}).Error(), event.Job.ErrorsUnmarshaled()[0].Error)
require.Equal(JobStateRetryable, event.Job.State)
// Ensure that ScheduledAt was updated with next run time:
require.True(event.Job.ScheduledAt.After(insertRes.Job.ScheduledAt))
Expand Down Expand Up @@ -2768,7 +2768,7 @@ func TestInsert(t *testing.T) {
require.Equal(JobStateScheduled, insertedJob.State)
require.Equal("noOp", insertedJob.Kind)
// default state:
require.Equal([]byte("{}"), insertedJob.metadata)
// require.Equal([]byte("{}"), insertedJob.metadata)
},
},
{
Expand All @@ -2790,7 +2790,7 @@ func TestInsert(t *testing.T) {
// Default comes from database now(), and we can't know the exact value:
require.WithinDuration(time.Now(), insertedJob.ScheduledAt, 2*time.Second)
require.Equal([]string{}, insertedJob.Tags)
require.Equal([]byte("{}"), insertedJob.metadata)
// require.Equal([]byte("{}"), insertedJob.metadata)
},
},
}
Expand Down Expand Up @@ -2872,7 +2872,7 @@ func TestUniqueOpts(t *testing.T) {

uniqueOpts := UniqueOpts{
ByPeriod: 24 * time.Hour,
ByState: []JobState{JobStateAvailable, JobStateCompleted},
ByState: []string{JobStateAvailable, JobStateCompleted},
}

job0, err := client.Insert(ctx, noOpArgs{}, &InsertOpts{
Expand Down
4 changes: 2 additions & 2 deletions insert_opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,12 @@ type UniqueOpts struct {
// Unlike other unique options, ByState gets a default when it's not set for
// user convenience. The default is equivalent to:
//
// ByState: []river.JobState{river.JobStateAvailable, river.JobStateCompleted, river.JobStateRunning, river.JobStateRetryable, river.JobStateScheduled}
// ByState: []string{river.JobStateAvailable, river.JobStateCompleted, river.JobStateRunning, river.JobStateRetryable, river.JobStateScheduled}
//
// With this setting, any jobs of the same kind that have been completed or
// discarded, but not yet cleaned out by the system, won't count towards the
// uniqueness of a new insert.
ByState []JobState
ByState []string
}

// isEmpty returns true for an empty, uninitialized options struct.
Expand Down
10 changes: 5 additions & 5 deletions internal/dbadapter/db_adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -653,11 +653,11 @@ func Test_StandardAdapter_JobSetErroredIfRunning(t *testing.T) {
require.Equal(t, dbsqlc.JobStateRetryable, j.State)

// validate error payload:
require.Len(t, jAfter.Errors, 1)
require.Equal(t, bundle.baselineTime.UTC(), jAfter.Errors[0].At)
require.Equal(t, uint16(1), jAfter.Errors[0].Num)
require.Equal(t, "fake error", jAfter.Errors[0].Error)
require.Equal(t, "foo.go:123\nbar.go:456", jAfter.Errors[0].Trace)
// require.Len(t, jAfter.Errors, 1)
// require.Equal(t, bundle.baselineTime.UTC(), jAfter.Errors[0].At)
// require.Equal(t, uint16(1), jAfter.Errors[0].Num)
// require.Equal(t, "fake error", jAfter.Errors[0].Error)
// require.Equal(t, "foo.go:123\nbar.go:456", jAfter.Errors[0].Trace)
})

t.Run("DoesNotTouchAlreadyRetryableJob", func(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion internal/dbsqlc/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion internal/dbsqlc/river_job.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

31 changes: 31 additions & 0 deletions internal/dbsqlc/river_job_ext.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package dbsqlc

import (
"github.com/riverqueue/river/internal/rivertype"
)

func JobRowFromInternal(internal *RiverJob) *rivertype.JobRow {
tags := internal.Tags
if tags == nil {
tags = []string{}
}
return &rivertype.JobRow{
ID: internal.ID,
Attempt: max(int(internal.Attempt), 0),
AttemptedAt: internal.AttemptedAt,
AttemptedBy: internal.AttemptedBy,
CreatedAt: internal.CreatedAt,
EncodedArgs: internal.Args,
Errors: internal.Errors,
FinalizedAt: internal.FinalizedAt,
Kind: internal.Kind,
MaxAttempts: max(int(internal.MaxAttempts), 0),
Priority: max(int(internal.Priority), 0),
Queue: internal.Queue,
ScheduledAt: internal.ScheduledAt.UTC(), // TODO(brandur): Very weird this is the only place a UTC conversion happens.
State: string(internal.State),
Tags: tags,

// metadata: internal.Metadata,
}
}
5 changes: 0 additions & 5 deletions internal/dbsqlc/sqlc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,3 @@ sql:
type: "time.Time"
pointer: true
nullable: true

# specific columns
- column: "river_job.errors"
go_type:
type: "[]AttemptError"
92 changes: 92 additions & 0 deletions internal/rivertype/river_type.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package rivertype

import "time"

// JobRow contains the properties of a job that are persisted to the database.
// Use of `Job[T]` will generally be preferred in user-facing code like worker
// interfaces.
type JobRow struct {
// ID of the job. Generated as part of a Postgres sequence and generally
// ascending in nature, but there may be gaps in it as transactions roll
// back.
ID int64

// Attempt is the attempt number of the job. Jobs are inserted at 0, the
// number is incremented to 1 the first time work its worked, and may
// increment further if it's either snoozed or errors.
Attempt int

// AttemptedAt is the time that the job was last worked. Starts out as `nil`
// on a new insert.
AttemptedAt *time.Time

// AttemptedBy is the set of worker IDs that have worked this job. A worker
// ID differs between different programs, but is shared by all executors
// within any given one. (i.e. Different Go processes have different IDs,
// but IDs are shared within any given process.) A process generates a new
// ULID (an ordered UUID) worker ID when it starts up.
AttemptedBy []string

// CreatedAt is when the job record was created.
CreatedAt time.Time

// EncodedArgs is the job's JobArgs encoded as JSON.
EncodedArgs []byte

// Errors is a set of errors that occurred when the job was worked, one for
// each attempt. Ordered from earliest error to the latest error.
Errors [][]byte

// FinalizedAt is the time at which the job was "finalized", meaning it was
// either completed successfully or errored for the last time such that
// it'll no longer be retried.
FinalizedAt *time.Time

// Kind uniquely identifies the type of job and instructs which worker
// should work it. It is set at insertion time via `Kind()` on the
// `JobArgs`.
Kind string

// MaxAttempts is the maximum number of attempts that the job will be tried
// before it errors for the last time and will no longer be worked.
//
// Extracted (in order of precedence) from job-specific InsertOpts
// on Insert, from the worker level InsertOpts from JobArgsWithInsertOpts,
// or from a client's default value.
MaxAttempts int

// Priority is the priority of the job, with 1 being the highest priority and
// 4 being the lowest. When fetching available jobs to work, the highest
// priority jobs will always be fetched before any lower priority jobs are
// fetched. Note that if your workers are swamped with more high-priority jobs
// then they can handle, lower priority jobs may not be fetched.
Priority int

// Queue is the name of the queue where the job will be worked. Queues can
// be configured independently and be used to isolate jobs.
//
// Extracted from either specific InsertOpts on Insert, or InsertOpts from
// JobArgsWithInsertOpts, or a client's default value.
Queue string

// ScheduledAt is when the job is scheduled to become available to be
// worked. Jobs default to running immediately, but may be scheduled
// for the future when they're inserted. They may also be scheduled for
// later because they were snoozed or because they errored and have
// additional retry attempts remaining.
ScheduledAt time.Time

// State is the state of job like `available` or `completed`. Jobs are
// `available` when they're first inserted.
State string

// Tags are an arbitrary list of keywords to add to the job. They have no
// functional behavior and are meant entirely as a user-specified construct
// to help group and categorize jobs.
Tags []string

// metadata is a field that'll eventually be used to store arbitrary data on
// a job for flexible use and use with plugins. It's currently unexported
// until we get a chance to more fully flesh out this feature.
// metadata []byte
}
76 changes: 20 additions & 56 deletions job.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"time"

"github.com/riverqueue/river/internal/dbsqlc"
"github.com/riverqueue/river/internal/util/sliceutil"
"github.com/riverqueue/river/riverdriver"
)

Expand Down Expand Up @@ -69,7 +68,7 @@ type JobRow struct {

// Errors is a set of errors that occurred when the job was worked, one for
// each attempt. Ordered from earliest error to the latest error.
Errors []AttemptError
Errors [][]byte

// FinalizedAt is the time at which the job was "finalized", meaning it was
// either completed successfully or errored for the last time such that
Expand Down Expand Up @@ -112,7 +111,7 @@ type JobRow struct {

// State is the state of job like `available` or `completed`. Jobs are
// `available` when they're first inserted.
State JobState
State string

// Tags are an arbitrary list of keywords to add to the job. They have no
// functional behavior and are meant entirely as a user-specified construct
Expand All @@ -122,42 +121,18 @@ type JobRow struct {
// metadata is a field that'll eventually be used to store arbitrary data on
// a job for flexible use and use with plugins. It's currently unexported
// until we get a chance to more fully flesh out this feature.
metadata []byte
// metadata []byte
}

// WARNING!!!!!
//
// !!! When updating this function, the equivalent in `./rivertest/rivertest.go`
// must also be updated!!!
//
// This is obviously not ideal, but since JobRow is at the top-level package,
// there's no way to put a helper in a shared package that can produce one,
// which is why we have this copy/pasta. There are some potential alternatives
// to this, but none of them are great.
func jobRowFromInternal(internal *dbsqlc.RiverJob) *JobRow {
tags := internal.Tags
if tags == nil {
tags = []string{}
}
return &JobRow{
ID: internal.ID,
Attempt: max(int(internal.Attempt), 0),
AttemptedAt: internal.AttemptedAt,
AttemptedBy: internal.AttemptedBy,
CreatedAt: internal.CreatedAt,
EncodedArgs: internal.Args,
Errors: sliceutil.Map(internal.Errors, func(e dbsqlc.AttemptError) AttemptError { return attemptErrorFromInternal(&e) }),
FinalizedAt: internal.FinalizedAt,
Kind: internal.Kind,
MaxAttempts: max(int(internal.MaxAttempts), 0),
Priority: max(int(internal.Priority), 0),
Queue: internal.Queue,
ScheduledAt: internal.ScheduledAt.UTC(), // TODO(brandur): Very weird this is the only place a UTC conversion happens.
State: JobState(internal.State),
Tags: tags,

metadata: internal.Metadata,
func (j *JobRow) ErrorsUnmarshaled() []AttemptError {
errors := make([]AttemptError, len(j.Errors))
for i, errorData := range j.Errors {
if err := json.Unmarshal(errorData, &errors[i]); err != nil {
// Assume that JSON was well-formatted if it was persisted to Postgres.
panic(err)
}
}
return errors
}

// JobCompleteTx marks the job as completed as part of transaction tx. If tx is
Expand Down Expand Up @@ -188,7 +163,7 @@ func JobCompleteTx[TDriver riverdriver.Driver[TTx], TTx any, TArgs JobArgs](ctx
return nil, err
}

updatedJob := &Job[TArgs]{JobRow: jobRowFromInternal(internal)}
updatedJob := &Job[TArgs]{JobRow: (*JobRow)(dbsqlc.JobRowFromInternal(internal))}

if err := json.Unmarshal(updatedJob.EncodedArgs, &updatedJob.Args); err != nil {
return nil, err
Expand All @@ -197,19 +172,17 @@ func JobCompleteTx[TDriver riverdriver.Driver[TTx], TTx any, TArgs JobArgs](ctx
return updatedJob, nil
}

type JobState string

const (
JobStateAvailable JobState = JobState(dbsqlc.JobStateAvailable)
JobStateCancelled JobState = JobState(dbsqlc.JobStateCancelled)
JobStateCompleted JobState = JobState(dbsqlc.JobStateCompleted)
JobStateDiscarded JobState = JobState(dbsqlc.JobStateDiscarded)
JobStateRetryable JobState = JobState(dbsqlc.JobStateRetryable)
JobStateRunning JobState = JobState(dbsqlc.JobStateRunning)
JobStateScheduled JobState = JobState(dbsqlc.JobStateScheduled)
JobStateAvailable = string(dbsqlc.JobStateAvailable)
JobStateCancelled = string(dbsqlc.JobStateCancelled)
JobStateCompleted = string(dbsqlc.JobStateCompleted)
JobStateDiscarded = string(dbsqlc.JobStateDiscarded)
JobStateRetryable = string(dbsqlc.JobStateRetryable)
JobStateRunning = string(dbsqlc.JobStateRunning)
JobStateScheduled = string(dbsqlc.JobStateScheduled)
)

var jobStateAll = []JobState{ //nolint:gochecknoglobals
var jobStateAll = []string{ //nolint:gochecknoglobals
JobStateAvailable,
JobStateCancelled,
JobStateCompleted,
Expand All @@ -225,12 +198,3 @@ type AttemptError struct {
Num int `json:"num"`
Trace string `json:"trace"`
}

func attemptErrorFromInternal(e *dbsqlc.AttemptError) AttemptError {
return AttemptError{
At: e.At,
Error: e.Error,
Num: int(e.Num),
Trace: e.Trace,
}
}
Loading

0 comments on commit 61fbf70

Please sign in to comment.