Add middleware system for jobs #584

Closed

brandur commented Sep 9, 2024

Here, experiment with a middleware-like system that adds middleware
functions to job lifecycles, so that they're invoked during specific
phases of a job, such as when it's being inserted or worked.

The most obvious unlock for this is telemetry (e.g. logging, metrics),
but it also acts as a building block for features like encrypted jobs.

//
// InsertBegin is *not* invoked on a batch insertion with InsertMany or
// InsertManyTx. Integrations should implement InsertManyBegin separately.
// InsertBegin(ctx context.Context, params *JobLifecycleInsertParams) (context.Context, error)
Contributor Author

So I originally tried to write this system as "hooks" instead of middleware, and you can see what the old entrypoints looked like here (left them in as gravestones for the time being). They almost worked, but I found that I was running into two major problems with them compared to middleware:

  • It'd be common to want to put something in context in the "begin" part, then extract it in the "end" part (e.g. for metrics). I accomplished that by returning a context here as you can see, but it was fairly awkward, and I don't think users would've liked it.
  • The bigger issue is that it was hard to know how the "end" functions should be called (or not called) under various error conditions like a panic or a returned error. We would've needed to either send back parameters for all of (1) successful return, (2) possible error, (3) panic value, OR have totally separate functions for when errors occurred, but both options were extremely awkward. With middleware, the return values are right there, and it's up to the caller to do whatever they want with them.
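
A minimal sketch of the two points above, assuming a simplified middleware shape modeled on the `doInner` style in this PR. `Job`, `Outcome`, `WorkMiddleware`, `TimingMiddleware`, and `runScenarios` are hypothetical names for illustration, not River's API. The start time lives in a local variable rather than in the context, and one deferred function sees all three outcomes (success, returned error, panic):

```go
package main

import (
	"context"
	"errors"
	"time"
)

// Job is a hypothetical stand-in for a job row; it is not River's type.
type Job struct{ Kind string }

// Outcome records how a single Work call ended.
type Outcome struct {
	Kind    string
	Status  string // "ok", "error", or "panic"
	Elapsed time.Duration
}

// WorkMiddleware is an assumed single-method middleware shape.
type WorkMiddleware interface {
	Work(ctx context.Context, job *Job, doInner func(ctx context.Context) error) error
}

// TimingMiddleware keeps "begin" state (start) in a local variable, so
// nothing has to be passed between separate begin/end hooks via context.
type TimingMiddleware struct{ Outcomes []Outcome }

func (m *TimingMiddleware) Work(ctx context.Context, job *Job, doInner func(ctx context.Context) error) (err error) {
	start := time.Now()
	defer func() {
		status := "ok"
		r := recover()
		switch {
		case r != nil:
			status = "panic"
		case err != nil:
			status = "error"
		}
		m.Outcomes = append(m.Outcomes, Outcome{Kind: job.Kind, Status: status, Elapsed: time.Since(start)})
		if r != nil {
			panic(r) // re-raise so outer layers still see the panic
		}
	}()
	return doInner(ctx)
}

var _ WorkMiddleware = (*TimingMiddleware)(nil)

// runScenarios drives the middleware through success, error, and panic
// and returns the recorded statuses in order.
func runScenarios() []string {
	m := &TimingMiddleware{}
	ctx := context.Background()
	job := &Job{Kind: "email"}

	_ = m.Work(ctx, job, func(context.Context) error { return nil })
	_ = m.Work(ctx, job, func(context.Context) error { return errors.New("boom") })
	func() {
		defer func() { _ = recover() }() // swallow the re-raised panic
		_ = m.Work(ctx, job, func(context.Context) error { panic("kaboom") })
	}()

	statuses := make([]string, 0, len(m.Outcomes))
	for _, o := range m.Outcomes {
		statuses = append(statuses, o.Status)
	}
	return statuses
}
```

With separate begin/end hooks, each of the three branches in the deferred function would need its own parameter or its own callback.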

Contributor

Makes sense, I think this is probably the cleaner & more flexible option.

@@ -1150,7 +1155,7 @@ func (c *Client[TTx]) ID() string {
return c.config.ID
}

func insertParamsFromConfigArgsAndOptions(archetype *baseservice.Archetype, config *Config, args JobArgs, insertOpts *InsertOpts) (*riverdriver.JobInsertFastParams, *dbunique.UniqueOpts, error) {
brandur commented Sep 9, 2024

So not 100% sure on this one yet, but the "insert" part of the middleware needs to receive a type that's not a JobRow because we don't have a job row yet. I basically took JobInsertParams, duplicated it, and promoted it to rivertype. The types are different for now, but they can be type converted to one another because they have the same fields. I basically did this because I like the naming of JobInsertParams, but they could also be the same type or even slightly different types with a couple fields dropped (CreatedAt for example, which is only needed for time injection).

Either way, JobInsertParams stays internal for now, so it should leave refactoring flexibility ...
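
As a sketch of the type conversion mentioned above: Go allows converting between two struct types (or pointers to them) when their underlying types are identical, ignoring struct tags. The two types below are hypothetical and carry only a few illustrative fields; they are not the actual `JobInsertParams` definitions.

```go
package main

import "time"

// internalInsertParams is a hypothetical internal type.
type internalInsertParams struct {
	CreatedAt   *time.Time
	EncodedArgs []byte
	Kind        string
	Queue       string
}

// PublicInsertParams is a hypothetical exported twin with the same fields
// in the same order, which is what makes the conversion legal.
type PublicInsertParams struct {
	CreatedAt   *time.Time
	EncodedArgs []byte
	Kind        string
	Queue       string
}

// toPublic converts the pointer without copying; both pointers refer to
// the same struct in memory.
func toPublic(p *internalInsertParams) *PublicInsertParams {
	return (*PublicInsertParams)(p)
}

// convertDemo shows that a write through the converted pointer is visible
// through the original one.
func convertDemo() (kind string, shared bool) {
	in := &internalInsertParams{Kind: "email", Queue: "default"}
	pub := toPublic(in)
	pub.Queue = "priority"
	return pub.Kind, in.Queue == "priority"
}
```

The conversion stops compiling as soon as one type gains, drops, or reorders a field, so dropping something like `CreatedAt` from the public type would require an explicit field-by-field copy instead.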

Contributor

What do you think about pulling arg encoding out of this helper and passing the middleware functions a type that includes raw unencoded JobArgs? My thought is that this unlocks more dynamic behavior, because middleware then have the ability to do type assertions against JobArgs including to assert interface implementations.

The downside is they lose the ability to directly access the encoded json bytes, but then I'm not sure I know of any cases where that's desirable. For metadata, sure, but not for args.
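
A rough sketch of what raw args would enable, under the assumption that the insert params carry the unencoded args value. `InsertParams`, `SensitiveArgs`, `TagSensitiveMiddleware`, and the example args types are all hypothetical; `JobArgs` here is a simplified stand-in with only a `Kind` method.

```go
package main

import "context"

// JobArgs is a simplified stand-in for an args interface.
type JobArgs interface{ Kind() string }

// InsertParams is a hypothetical params type that keeps the raw args.
type InsertParams struct {
	Args     JobArgs
	Metadata map[string]string
}

// SensitiveArgs is a hypothetical optional interface an args type may
// implement to opt in to special handling.
type SensitiveArgs interface{ IsSensitive() bool }

// TagSensitiveMiddleware inspects the raw args with a type assertion,
// which would not be possible once they are encoded to JSON bytes.
type TagSensitiveMiddleware struct{}

func (TagSensitiveMiddleware) Insert(ctx context.Context, params *InsertParams, doInner func(ctx context.Context) error) error {
	if s, ok := params.Args.(SensitiveArgs); ok && s.IsSensitive() {
		if params.Metadata == nil {
			params.Metadata = map[string]string{}
		}
		params.Metadata["sensitive"] = "true"
	}
	return doInner(ctx)
}

type EmailArgs struct{ To string }

func (EmailArgs) Kind() string { return "email" }

type PayrollArgs struct{ EmployeeID int }

func (PayrollArgs) Kind() string      { return "payroll" }
func (PayrollArgs) IsSensitive() bool { return true }

// tagDemo inserts one plain and one opted-in job and returns the metadata
// value each ended up with.
func tagDemo() (email, payroll string) {
	mw := TagSensitiveMiddleware{}
	noop := func(context.Context) error { return nil }
	e := &InsertParams{Args: EmailArgs{To: "a@example.com"}}
	p := &InsertParams{Args: PayrollArgs{EmployeeID: 7}}
	_ = mw.Insert(context.Background(), e, noop)
	_ = mw.Insert(context.Background(), p, noop)
	return e.Metadata["sensitive"], p.Metadata["sensitive"]
}
```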

brandur commented Sep 9, 2024

@bgentry Wanted to get a minimal POC out there so I don't spend too much time on this in case there's pushback, so I didn't write tests or anything like that. Rough thoughts?

bgentry left a comment

Awesome progress. I have a few thoughts/concerns about whether this design may need some tweaks, but I'm generally good to move forward with it.

Comment on lines +15 to +25
func (l *JobMiddlewareDefaults) Insert(ctx context.Context, params *rivertype.JobInsertParams, doInner func(ctx context.Context) (*rivertype.JobInsertResult, error)) (*rivertype.JobInsertResult, error) {
	return doInner(ctx)
}

func (l *JobMiddlewareDefaults) InsertMany(ctx context.Context, manyParams []*rivertype.JobInsertParams, doInner func(ctx context.Context) (int, error)) (int, error) {
	return doInner(ctx)
}

func (l *JobMiddlewareDefaults) Work(ctx context.Context, job *rivertype.JobRow, doInner func(ctx context.Context) error) error {
	return doInner(ctx)
}
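
For illustration, a pass-through defaults struct like the one above is typically used by embedding it, so a middleware overrides only the phases it cares about. The sketch below uses hypothetical local types (`InsertParams`, `InsertResult`, `JobRow`, `JobMiddleware`, `MiddlewareDefaults`, `CountingMiddleware`) rather than the `rivertype` ones, and omits `InsertMany` for brevity.

```go
package main

import "context"

// Hypothetical stand-ins for the params, result, and row types.
type InsertParams struct{ Kind string }
type InsertResult struct{ ID int64 }
type JobRow struct{ Kind string }

// JobMiddleware is an assumed two-phase middleware interface.
type JobMiddleware interface {
	Insert(ctx context.Context, params *InsertParams, doInner func(ctx context.Context) (*InsertResult, error)) (*InsertResult, error)
	Work(ctx context.Context, job *JobRow, doInner func(ctx context.Context) error) error
}

// MiddlewareDefaults passes every phase straight through.
type MiddlewareDefaults struct{}

func (d *MiddlewareDefaults) Insert(ctx context.Context, params *InsertParams, doInner func(ctx context.Context) (*InsertResult, error)) (*InsertResult, error) {
	return doInner(ctx)
}

func (d *MiddlewareDefaults) Work(ctx context.Context, job *JobRow, doInner func(ctx context.Context) error) error {
	return doInner(ctx)
}

// CountingMiddleware embeds the defaults and overrides only Work; Insert
// is promoted from the embedded struct.
type CountingMiddleware struct {
	MiddlewareDefaults
	Worked int
}

func (m *CountingMiddleware) Work(ctx context.Context, job *JobRow, doInner func(ctx context.Context) error) error {
	m.Worked++
	return doInner(ctx)
}

// Compile-time check that the embedding satisfies the full interface.
var _ JobMiddleware = (*CountingMiddleware)(nil)

// embedDemo calls the promoted Insert and the overridden Work once each.
func embedDemo() (int64, int) {
	m := &CountingMiddleware{}
	ctx := context.Background()
	res, _ := m.Insert(ctx, &InsertParams{Kind: "email"}, func(context.Context) (*InsertResult, error) {
		return &InsertResult{ID: 1}, nil
	})
	_ = m.Work(ctx, &JobRow{Kind: "email"}, func(context.Context) error { return nil })
	return res.ID, m.Worked
}
```

Embedding also means a new method can be added to the interface later without breaking existing middleware, as long as the defaults struct gains a pass-through for it.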
Contributor

I think for this to be the most useful it would need to be customizable on a per-job basis. I'm wondering what that looks like in practice with this design. Like what if I wanted to add a middleware that uses some aspect of the worker or args to dynamically determine what to do? (maybe some optional interface gets fulfilled by either of those types to indicate to the middleware what it should do).

The problem with trying to do that here is the args have already been encoded, so there's no longer any access to the underlying JobArgs type. Is there any path to potentially having the middleware stack get called before the JSON encoding part? That could more easily enable dynamic behavior based on the type.

Additionally, this might be further exposing the somewhat confusing split between JobArgs and Worker implementations. We had some recent customer feedback about it being a little weird that, e.g., the timeout must be customized on the Worker and can't easily be tweaked at insertion time via the args. In this case though you mentioned potentially allowing for middleware to be configured at the JobArgs level, which seems fine for insert time but IMO doesn't make any sense for the Work() middleware. I don't want to have two separate middleware stacks/concepts, but it does feel a bit odd to have both of these on a single interface given the way this split is designed today 🤔

bgentry commented Sep 17, 2024

I ended up putting JobArgs into the struct in my uniqueness PR #590 and I think it's a great thing to have available. Can still encode args in advance but just keep the original around for introspection.

bgentry commented Sep 13, 2024

I rebased this branch on top of #589 now that it's merged. I also refactored our bulk insert methods so they use the same underlying code aside from a narrow adapter function for each query. I think it can be improved further, but IMO it's a good start: https://github.com/riverqueue/river/compare/bg-lifecycle-hooks-on-insert-many-refactor?expand=1

Finally, I realized that, given the goal of aligning our Insert and InsertMany APIs to use a single code path (each supporting the same set of features too), maybe we don't want to introduce a middleware interface that differentiates between the two. I made that change in my above branch as well.
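
One way to picture the single-path idea, as a sketch only: the middleware interface exposes just a batch-shaped method, and a single insert is routed through it as a batch of one. The `Client`, `InsertMiddleware`, and `batchCounter` types below are hypothetical and use an in-memory slice in place of a database; they do not reflect the linked branch.

```go
package main

import "context"

// InsertParams is a hypothetical stand-in for insert params.
type InsertParams struct{ Kind string }

// InsertMiddleware has a single batch-shaped method (an assumption).
type InsertMiddleware interface {
	InsertMany(ctx context.Context, manyParams []*InsertParams, doInner func(ctx context.Context) (int, error)) (int, error)
}

// Client is a toy client; stored stands in for the database.
type Client struct {
	middleware []InsertMiddleware
	stored     []*InsertParams
}

// InsertMany is the one code path that runs the middleware stack.
func (c *Client) InsertMany(ctx context.Context, manyParams []*InsertParams) (int, error) {
	doInner := func(ctx context.Context) (int, error) {
		c.stored = append(c.stored, manyParams...)
		return len(manyParams), nil
	}
	// Wrap from last to first so middleware[0] ends up outermost.
	for i := len(c.middleware) - 1; i >= 0; i-- {
		mw, next := c.middleware[i], doInner
		doInner = func(ctx context.Context) (int, error) {
			return mw.InsertMany(ctx, manyParams, next)
		}
	}
	return doInner(ctx)
}

// Insert reuses InsertMany with a one-element batch.
func (c *Client) Insert(ctx context.Context, params *InsertParams) error {
	_, err := c.InsertMany(ctx, []*InsertParams{params})
	return err
}

// batchCounter counts middleware invocations and jobs seen.
type batchCounter struct{ calls, jobs int }

func (b *batchCounter) InsertMany(ctx context.Context, manyParams []*InsertParams, doInner func(ctx context.Context) (int, error)) (int, error) {
	b.calls++
	b.jobs += len(manyParams)
	return doInner(ctx)
}

// singlePathDemo performs one single insert and one batch of two.
func singlePathDemo() (calls, jobs, stored int) {
	counter := &batchCounter{}
	c := &Client{middleware: []InsertMiddleware{counter}}
	ctx := context.Background()
	_ = c.Insert(ctx, &InsertParams{Kind: "email"})
	_, _ = c.InsertMany(ctx, []*InsertParams{{Kind: "a"}, {Kind: "b"}})
	return counter.calls, counter.jobs, len(c.stored)
}
```

The trade-off in this shape is that middleware authors always handle a slice, even for single inserts, in exchange for implementing one method instead of two.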

bgentry commented Sep 30, 2024

I know we talked about potentially wanting to have the database transaction available as part of the middleware interface, but I think I've talked myself out of that. With the driver concept it becomes pretty tough to do that cleanly, especially given we don't want the driver interfaces to be considered stable. I also don't think it's needed for anything I'm doing at the moment (#627 seems like the way for me to do database-related customizations).

bgentry commented Oct 5, 2024

A version of this was merged in #632 and will be in the next release.

@bgentry bgentry closed this Oct 5, 2024
@bgentry bgentry deleted the brandur-lifecycle-hooks branch October 5, 2024 20:20