Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add middleware system for jobs #584

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 70 additions & 32 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,10 @@ type Config struct {
// If in doubt, leave this property empty.
ID string

// JobMiddleware are optional functions that can be called around different
// parts of each job's lifecycle.
JobMiddleware []rivertype.JobMiddleware

// JobTimeout is the maximum amount of time a job is allowed to run before its
// context is cancelled. A timeout of zero means JobTimeoutDefault will be
// used, whereas a value of -1 means the job's context will not be cancelled
Expand Down Expand Up @@ -459,6 +463,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
FetchCooldown: valutil.ValOrDefault(config.FetchCooldown, FetchCooldownDefault),
FetchPollInterval: valutil.ValOrDefault(config.FetchPollInterval, FetchPollIntervalDefault),
ID: valutil.ValOrDefaultFunc(config.ID, func() string { return defaultClientID(time.Now().UTC()) }),
JobMiddleware: config.JobMiddleware,
JobTimeout: valutil.ValOrDefault(config.JobTimeout, JobTimeoutDefault),
Logger: logger,
MaxAttempts: valutil.ValOrDefault(config.MaxAttempts, MaxAttemptsDefault),
Expand Down Expand Up @@ -1150,7 +1155,7 @@ func (c *Client[TTx]) ID() string {
return c.config.ID
}

func insertParamsFromConfigArgsAndOptions(archetype *baseservice.Archetype, config *Config, args JobArgs, insertOpts *InsertOpts) (*riverdriver.JobInsertFastParams, *dbunique.UniqueOpts, error) {
Copy link
Contributor Author

@brandur brandur Sep 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So not 100% sure on this one yet, but the "insert" part of the middleware needs to receive a type that's not a JobRow because we don't have a job row yet. I basically took JobInsertParams, duplicated it, and promoted it to rivertype. The types are different for now, but they can be type converted to one another because they have the same fields. I basically did this because I like the naming of JobInsertParams, but they could also be the same type or even slightly different types with a couple fields dropped (CreatedAt for example, which is only needed for time injection).

Either way, JobInsertParams stays internal for now, so it should leave refactoring flexibility ...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think about pulling arg encoding out of this helper and passing the middleware functions a type that includes raw unencoded JobArgs? My thought is that this unlocks more dynamic behavior, because middleware then have the ability to do type assertions against JobArgs including to assert interface implementations.

The downside is they lose the ability to directly access the encoded json bytes, but then I'm not sure I know of any cases where that's desirable. For metadata, sure, but not for args.

func insertParamsFromConfigArgsAndOptions(archetype *baseservice.Archetype, config *Config, args JobArgs, insertOpts *InsertOpts) (*rivertype.JobInsertParams, *dbunique.UniqueOpts, error) {
encodedArgs, err := json.Marshal(args)
if err != nil {
return nil, nil, fmt.Errorf("error marshaling args to JSON: %w", err)
Expand Down Expand Up @@ -1212,12 +1217,12 @@ func insertParamsFromConfigArgsAndOptions(archetype *baseservice.Archetype, conf
metadata = []byte("{}")
}

insertParams := &riverdriver.JobInsertFastParams{
insertParams := &rivertype.JobInsertParams{
CreatedAt: createdAt,
EncodedArgs: json.RawMessage(encodedArgs),
EncodedArgs: encodedArgs,
Kind: args.Kind(),
MaxAttempts: maxAttempts,
Metadata: json.RawMessage(metadata),
Metadata: metadata,
Priority: priority,
Queue: queue,
State: rivertype.JobStateAvailable,
Expand Down Expand Up @@ -1295,25 +1300,39 @@ func (c *Client[TTx]) insert(ctx context.Context, exec riverdriver.Executor, arg
return nil, err
}

tx, err := exec.Begin(ctx)
if err != nil {
return nil, err
}
defer tx.Rollback(ctx)
doInner := func(ctx context.Context) (*rivertype.JobInsertResult, error) {
tx, err := exec.Begin(ctx)
if err != nil {
return nil, err
}
defer tx.Rollback(ctx)

jobInsertRes, err := c.uniqueInserter.JobInsert(ctx, tx, params, uniqueOpts)
if err != nil {
return nil, err
}
jobInsertRes, err := c.uniqueInserter.JobInsert(ctx, tx, (*riverdriver.JobInsertFastParams)(params), uniqueOpts)
if err != nil {
return nil, err
}

if err := c.maybeNotifyInsert(ctx, tx, params.State, params.Queue); err != nil {
return nil, err
if err := c.maybeNotifyInsert(ctx, tx, params.State, params.Queue); err != nil {
return nil, err
}
if err := tx.Commit(ctx); err != nil {
return nil, err
}

return jobInsertRes, nil
}
if err := tx.Commit(ctx); err != nil {
return nil, err

if len(c.config.JobMiddleware) > 0 {
// Wrap middlewares in reverse order so the one defined first is wrapped
// as the outermost function and is first to receive the operation.
for i := len(c.config.JobMiddleware) - 1; i >= 0; i-- {
doInner = func(ctx context.Context) (*rivertype.JobInsertResult, error) {
return c.config.JobMiddleware[i].Insert(ctx, params, doInner)
}
}
}

return jobInsertRes, nil
return doInner(ctx)
}

// InsertManyParams encapsulates a single job combined with insert options for
Expand Down Expand Up @@ -1405,32 +1424,50 @@ func (c *Client[TTx]) InsertManyTx(ctx context.Context, tx TTx, params []InsertM
return c.insertFastMany(ctx, exec, insertParams)
}

func (c *Client[TTx]) insertFastMany(ctx context.Context, tx riverdriver.ExecutorTx, insertParams []*riverdriver.JobInsertFastParams) (int, error) {
inserted, err := tx.JobInsertFastMany(ctx, insertParams)
if err != nil {
return inserted, err
}
func (c *Client[TTx]) insertFastMany(ctx context.Context, tx riverdriver.ExecutorTx, manyParams []*rivertype.JobInsertParams) (int, error) {
doInner := func(ctx context.Context) (int, error) {
manyInsertParams := sliceutil.Map(manyParams, func(params *rivertype.JobInsertParams) *riverdriver.JobInsertFastParams {
return (*riverdriver.JobInsertFastParams)(params)
bgentry marked this conversation as resolved.
Show resolved Hide resolved
})

queues := make([]string, 0, 10)
for _, params := range insertParams {
if params.State == rivertype.JobStateAvailable {
queues = append(queues, params.Queue)
inserted, err := tx.JobInsertFastMany(ctx, manyInsertParams)
if err != nil {
return inserted, err
}

queues := make([]string, 0, 10)
for _, params := range manyParams {
if params.State == rivertype.JobStateAvailable {
queues = append(queues, params.Queue)
}
}
if err := c.maybeNotifyInsertForQueues(ctx, tx, queues); err != nil {
return 0, err
}
return inserted, nil
}
if err := c.maybeNotifyInsertForQueues(ctx, tx, queues); err != nil {
return 0, err

if len(c.config.JobMiddleware) > 0 {
// Wrap middlewares in reverse order so the one defined first is wrapped
// as the outermost function and is first to receive the operation.
for i := len(c.config.JobMiddleware) - 1; i >= 0; i-- {
doInner = func(ctx context.Context) (int, error) {
return c.config.JobMiddleware[i].InsertMany(ctx, manyParams, doInner)
}
}
}
return inserted, nil

return doInner(ctx)
}

// Validates input parameters for a batch insert operation and generates a
// set of batch insert parameters.
func (c *Client[TTx]) insertManyParams(params []InsertManyParams) ([]*riverdriver.JobInsertFastParams, error) {
func (c *Client[TTx]) insertManyParams(params []InsertManyParams) ([]*rivertype.JobInsertParams, error) {
if len(params) < 1 {
return nil, errors.New("no jobs to insert")
}

insertParams := make([]*riverdriver.JobInsertFastParams, len(params))
insertParams := make([]*rivertype.JobInsertParams, len(params))
for i, param := range params {
if err := c.validateJobArgs(param.Args); err != nil {
return nil, err
Expand Down Expand Up @@ -1558,6 +1595,7 @@ func (c *Client[TTx]) addProducer(queueName string, queueConfig QueueConfig) *pr
ErrorHandler: c.config.ErrorHandler,
FetchCooldown: c.config.FetchCooldown,
FetchPollInterval: c.config.FetchPollInterval,
JobMiddleware: c.config.JobMiddleware,
JobTimeout: c.config.JobTimeout,
MaxWorkers: queueConfig.MaxWorkers,
Notifier: c.notifier,
Expand Down
4 changes: 2 additions & 2 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2438,7 +2438,7 @@ func Test_Client_ErrorHandler(t *testing.T) {
// unknown job.
insertParams, _, err := insertParamsFromConfigArgsAndOptions(&client.baseService.Archetype, config, unregisteredJobArgs{}, nil)
require.NoError(t, err)
_, err = client.driver.GetExecutor().JobInsertFast(ctx, insertParams)
_, err = client.driver.GetExecutor().JobInsertFast(ctx, (*riverdriver.JobInsertFastParams)(insertParams))
require.NoError(t, err)

riversharedtest.WaitOrTimeout(t, bundle.SubscribeChan)
Expand Down Expand Up @@ -4026,7 +4026,7 @@ func Test_Client_UnknownJobKindErrorsTheJob(t *testing.T) {

insertParams, _, err := insertParamsFromConfigArgsAndOptions(&client.baseService.Archetype, config, unregisteredJobArgs{}, nil)
require.NoError(err)
insertedJob, err := client.driver.GetExecutor().JobInsertFast(ctx, insertParams)
insertedJob, err := client.driver.GetExecutor().JobInsertFast(ctx, (*riverdriver.JobInsertFastParams)(insertParams))
require.NoError(err)

event := riversharedtest.WaitOrTimeout(t, subscribeChan)
Expand Down
6 changes: 3 additions & 3 deletions internal/maintenance/periodic_job_enqueuer.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (ts *PeriodicJobEnqueuerTestSignals) Init() {
// river.PeriodicJobArgs, but needs a separate type because the enqueuer is in a
// subpackage.
type PeriodicJob struct {
ConstructorFunc func() (*riverdriver.JobInsertFastParams, *dbunique.UniqueOpts, error)
ConstructorFunc func() (*rivertype.JobInsertParams, *dbunique.UniqueOpts, error)
RunOnStart bool
ScheduleFunc func(time.Time) time.Time

Expand Down Expand Up @@ -409,7 +409,7 @@ func (s *PeriodicJobEnqueuer) insertBatch(ctx context.Context, insertParamsMany
s.TestSignals.InsertedJobs.Signal(struct{}{})
}

func (s *PeriodicJobEnqueuer) insertParamsFromConstructor(ctx context.Context, constructorFunc func() (*riverdriver.JobInsertFastParams, *dbunique.UniqueOpts, error), scheduledAt time.Time) (*riverdriver.JobInsertFastParams, *dbunique.UniqueOpts, bool) {
func (s *PeriodicJobEnqueuer) insertParamsFromConstructor(ctx context.Context, constructorFunc func() (*rivertype.JobInsertParams, *dbunique.UniqueOpts, error), scheduledAt time.Time) (*riverdriver.JobInsertFastParams, *dbunique.UniqueOpts, bool) {
insertParams, uniqueOpts, err := constructorFunc()
if err != nil {
if errors.Is(err, ErrNoJobToInsert) {
Expand All @@ -425,7 +425,7 @@ func (s *PeriodicJobEnqueuer) insertParamsFromConstructor(ctx context.Context, c
insertParams.ScheduledAt = &scheduledAt
}

return insertParams, uniqueOpts, true
return (*riverdriver.JobInsertFastParams)(insertParams), uniqueOpts, true
}

const periodicJobEnqueuerVeryLongDuration = 24 * time.Hour
Expand Down
10 changes: 5 additions & 5 deletions internal/maintenance/periodic_job_enqueuer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ func TestPeriodicJobEnqueuer(t *testing.T) {
waitChan chan (struct{})
}

jobConstructorWithQueueFunc := func(name string, unique bool, queue string) func() (*riverdriver.JobInsertFastParams, *dbunique.UniqueOpts, error) {
return func() (*riverdriver.JobInsertFastParams, *dbunique.UniqueOpts, error) {
return &riverdriver.JobInsertFastParams{
jobConstructorWithQueueFunc := func(name string, unique bool, queue string) func() (*rivertype.JobInsertParams, *dbunique.UniqueOpts, error) {
return func() (*rivertype.JobInsertParams, *dbunique.UniqueOpts, error) {
return &rivertype.JobInsertParams{
EncodedArgs: []byte("{}"),
Kind: name,
MaxAttempts: rivercommon.MaxAttemptsDefault,
Expand All @@ -46,7 +46,7 @@ func TestPeriodicJobEnqueuer(t *testing.T) {
}
}

jobConstructorFunc := func(name string, unique bool) func() (*riverdriver.JobInsertFastParams, *dbunique.UniqueOpts, error) {
jobConstructorFunc := func(name string, unique bool) func() (*rivertype.JobInsertParams, *dbunique.UniqueOpts, error) {
return jobConstructorWithQueueFunc(name, unique, rivercommon.QueueDefault)
}

Expand Down Expand Up @@ -236,7 +236,7 @@ func TestPeriodicJobEnqueuer(t *testing.T) {

svc.AddMany([]*PeriodicJob{
// skip this insert when it returns nil:
{ScheduleFunc: periodicIntervalSchedule(time.Second), ConstructorFunc: func() (*riverdriver.JobInsertFastParams, *dbunique.UniqueOpts, error) {
{ScheduleFunc: periodicIntervalSchedule(time.Second), ConstructorFunc: func() (*rivertype.JobInsertParams, *dbunique.UniqueOpts, error) {
return nil, nil, ErrNoJobToInsert
}, RunOnStart: true},
})
Expand Down
4 changes: 2 additions & 2 deletions internal/maintenance/queue_maintainer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@ import (
"github.com/riverqueue/river/internal/dbunique"
"github.com/riverqueue/river/internal/riverinternaltest"
"github.com/riverqueue/river/internal/riverinternaltest/sharedtx"
"github.com/riverqueue/river/riverdriver"
"github.com/riverqueue/river/riverdriver/riverpgxv5"
"github.com/riverqueue/river/rivershared/baseservice"
"github.com/riverqueue/river/rivershared/riversharedtest"
"github.com/riverqueue/river/rivershared/startstop"
"github.com/riverqueue/river/rivershared/startstoptest"
"github.com/riverqueue/river/rivershared/testsignal"
"github.com/riverqueue/river/rivertype"
)

type testService struct {
Expand Down Expand Up @@ -108,7 +108,7 @@ func TestQueueMaintainer(t *testing.T) {
NewPeriodicJobEnqueuer(archetype, &PeriodicJobEnqueuerConfig{
PeriodicJobs: []*PeriodicJob{
{
ConstructorFunc: func() (*riverdriver.JobInsertFastParams, *dbunique.UniqueOpts, error) {
ConstructorFunc: func() (*rivertype.JobInsertParams, *dbunique.UniqueOpts, error) {
return nil, nil, ErrNoJobToInsert
},
ScheduleFunc: cron.Every(15 * time.Minute).Next,
Expand Down
27 changes: 22 additions & 5 deletions job_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ type jobExecutor struct {
ErrorHandler ErrorHandler
InformProducerDoneFunc func(jobRow *rivertype.JobRow)
JobRow *rivertype.JobRow
JobMiddleware []rivertype.JobMiddleware
SchedulerInterval time.Duration
WorkUnit workunit.WorkUnit

Expand Down Expand Up @@ -190,11 +191,11 @@ func (e *jobExecutor) execute(ctx context.Context) (res *jobExecutorResult) {
return &jobExecutorResult{Err: &UnknownJobKindError{Kind: e.JobRow.Kind}}
}

if err := e.WorkUnit.UnmarshalJob(); err != nil {
return &jobExecutorResult{Err: err}
}
doInner := func(ctx context.Context) error {
if err := e.WorkUnit.UnmarshalJob(); err != nil {
return err
}

{
jobTimeout := e.WorkUnit.Timeout()
if jobTimeout == 0 {
jobTimeout = e.ClientJobTimeout
Expand All @@ -207,8 +208,24 @@ func (e *jobExecutor) execute(ctx context.Context) (res *jobExecutorResult) {
defer cancel()
}

return &jobExecutorResult{Err: e.WorkUnit.Work(ctx)}
if err := e.WorkUnit.Work(ctx); err != nil {
return err
}

return nil
}

if len(e.JobMiddleware) > 0 {
// Wrap middlewares in reverse order so the one defined first is wrapped
// as the outermost function and is first to receive the operation.
for i := len(e.JobMiddleware) - 1; i >= 0; i-- {
doInner = func(ctx context.Context) error {
return e.JobMiddleware[i].Work(ctx, e.JobRow, doInner)
}
}
}

return &jobExecutorResult{Err: doInner(ctx)}
}

func (e *jobExecutor) invokeErrorHandler(ctx context.Context, res *jobExecutorResult) bool {
Expand Down
47 changes: 47 additions & 0 deletions middleware_defaults.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package river

import (
"context"

"github.com/riverqueue/river/rivertype"
)

// JobMiddlewareDefaults is an embeddable struct that supplies pass-through
// implementations of the methods on the rivertype.JobMiddleware interface.
// Embedding it is recommended so that existing code isn't unexpectedly broken
// during an upgrade if rivertype.JobMiddleware is expanded in the future.
type JobMiddlewareDefaults struct{}

// Insert is the default single-job insert middleware. It ignores params and
// hands control straight to doInner, returning its result unchanged.
func (d *JobMiddlewareDefaults) Insert(
	ctx context.Context,
	params *rivertype.JobInsertParams,
	doInner func(ctx context.Context) (*rivertype.JobInsertResult, error),
) (*rivertype.JobInsertResult, error) {
	return doInner(ctx)
}

// InsertMany is the default batch insert middleware. It ignores manyParams and
// hands control straight to doInner, returning its result unchanged.
func (d *JobMiddlewareDefaults) InsertMany(
	ctx context.Context,
	manyParams []*rivertype.JobInsertParams,
	doInner func(ctx context.Context) (int, error),
) (int, error) {
	return doInner(ctx)
}

// Work is the default work middleware. It ignores job and hands control
// straight to doInner, returning its error unchanged.
func (d *JobMiddlewareDefaults) Work(
	ctx context.Context,
	job *rivertype.JobRow,
	doInner func(ctx context.Context) error,
) error {
	return doInner(ctx)
}
Comment on lines +15 to +25
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think for this to be the most useful it would need to be customizable on a per-job basis. I'm wondering what that looks like in practice with this design. Like what if I wanted to add a middleware that uses some aspect of the worker or args to dynamically determine what to do? (maybe some optional interface gets fulfilled by either of those types to indicate to the middleware what it should do).

The problem with trying to do that here is the args have already been encoded, so there's no longer any access to the underlying JobArgs type. Is there any path to potentially having the middleware stack get called before the JSON encoding part? That could more easily enable dynamic behavior based on the type.

Additionally, this might be further exposing the somewhat confusing split between JobArgs and Worker implementations. We had some recent customer feedback about it being a little weird that, e.g., the timeout must be customized on the Worker and can't easily be tweaked at insertion time via the args. In this case though you mentioned potentially allowing for middleware to be configured at the JobArgs level, which seems fine for insert time but IMO doesn't make any sense for the Work() middleware. I don't want to have two separate middleware stacks/concepts, but it does feel a bit odd to have both of these on a single interface given the way this split is designed today 🤔

Copy link
Contributor

@bgentry bgentry Sep 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I ended up putting JobArgs into the struct in my uniqueness PR #590 and I think it's a great thing to have available. Can still encode args in advance but just keep the original around for introspection.


// func (l *JobLifecycleDefaults) InsertBegin(ctx context.Context, params *rivertype.JobLifecycleInsertParams) (context.Context, error) {
// return ctx, nil
// }

// func (l *JobLifecycleDefaults) InsertEnd(ctx context.Context, res *rivertype.JobInsertResult) error {
// return nil
// }

// func (l *JobLifecycleDefaults) InsertManyBegin(ctx context.Context, manyParams []*rivertype.JobLifecycleInsertParams) (context.Context, error) {
// return ctx, nil
// }

// func (l *JobLifecycleDefaults) InsertManyEnd(ctx context.Context, manyParams []*rivertype.JobLifecycleInsertParams) error {
// return nil
// }

// func (l *JobLifecycleDefaults) WorkBegin(ctx context.Context, job *rivertype.JobRow) (context.Context, error) {
// return ctx, nil
// }

// func (l *JobLifecycleDefaults) WorkEnd(ctx context.Context, job *rivertype.JobRow) error { return nil }
5 changes: 5 additions & 0 deletions middleware_defaults_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package river

import "github.com/riverqueue/river/rivertype"

// Compile-time check that *JobMiddlewareDefaults satisfies
// rivertype.JobMiddleware.
var _ rivertype.JobMiddleware = (*JobMiddlewareDefaults)(nil)
3 changes: 1 addition & 2 deletions periodic_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (

"github.com/riverqueue/river/internal/dbunique"
"github.com/riverqueue/river/internal/maintenance"
"github.com/riverqueue/river/riverdriver"
"github.com/riverqueue/river/rivershared/util/sliceutil"
"github.com/riverqueue/river/rivertype"
)
Expand Down Expand Up @@ -181,7 +180,7 @@ func (b *PeriodicJobBundle) toInternal(periodicJob *PeriodicJob) *maintenance.Pe
opts = periodicJob.opts
}
return &maintenance.PeriodicJob{
ConstructorFunc: func() (*riverdriver.JobInsertFastParams, *dbunique.UniqueOpts, error) {
ConstructorFunc: func() (*rivertype.JobInsertParams, *dbunique.UniqueOpts, error) {
args, options := periodicJob.constructorFunc()
if args == nil {
return nil, nil, maintenance.ErrNoJobToInsert
Expand Down
Loading
Loading