can't configure RescueStuckJobsAfter for an individual job #347
Hm, okay, quite a lot of content here, but speaking to the headline issue: I spoke to @bgentry a little about this, and it does feel like a per-job customizable rescue threshold would be a pretty specific feature. One point of clarification: ideally, in a perfectly functioning system, jobs shouldn't be getting stuck and needing rescue in the first place.
This is interesting, but my first reaction against it is somewhat similar to the above: it'd be such a specific feature that I think we'd want some very strong rationale for why it should be included in the default API. It's worth remembering too that River could quite easily be augmented with something like a common job bootstrap where project-specific functionality could be customized to your heart's desire, e.g.:

```go
import (
	"context"
	"errors"
	"fmt"
	"sort"
	"time"

	"github.com/riverqueue/river"
)

type MyProjectCommonArgs struct {
	Deadline time.Time `json:"deadline"`
}

// Note: value receiver (returning a pointer to the copy) so that SortArgs,
// which is worked by value, satisfies WithCommonArgs.
func (a MyProjectCommonArgs) GetCommonArgs() *MyProjectCommonArgs {
	return &a
}

type WithCommonArgs interface {
	GetCommonArgs() *MyProjectCommonArgs
}

type SortArgs struct {
	MyProjectCommonArgs

	// Strings is a slice of strings to sort.
	Strings []string `json:"strings"`
}

func (SortArgs) Kind() string { return "sort" }

// CommonJobBootstrap runs project-wide checks (here, a deadline check) before
// handing off to the job-specific work function.
func CommonJobBootstrap[T WithCommonArgs](ctx context.Context, args T, workFunc func(args T) error) error {
	commonArgs := args.GetCommonArgs()

	if commonArgs.Deadline.Before(time.Now()) {
		return errors.New("deadline exceeded")
	}

	return workFunc(args)
}

type SortWorker struct {
	// An embedded WorkerDefaults sets up default methods to fulfill the rest of
	// the Worker interface:
	river.WorkerDefaults[SortArgs]
}

func (w *SortWorker) Work(ctx context.Context, job *river.Job[SortArgs]) error {
	return CommonJobBootstrap(ctx, job.Args, func(args SortArgs) error {
		sort.Strings(args.Strings)
		fmt.Printf("Sorted strings: %+v\n", args.Strings)
		return nil
	})
}
```
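For what it's worth, a hypothetical insert for that pattern might look like the following. It reuses the SortArgs and MyProjectCommonArgs types from the snippet above and assumes an already-configured client; enqueueSort is a made-up helper name. The deadline travels in the args, so CommonJobBootstrap can reject a job that's only picked up after it has passed.

```go
package example

import (
	"context"
	"time"

	"github.com/jackc/pgx/v5"
	"github.com/riverqueue/river"
)

// enqueueSort shows the pattern in use: the caller sets a deadline in the
// common args, and the bootstrap enforces it when the job is eventually worked.
func enqueueSort(ctx context.Context, riverClient *river.Client[pgx.Tx], strs []string) error {
	_, err := riverClient.Insert(ctx, SortArgs{
		MyProjectCommonArgs: MyProjectCommonArgs{Deadline: time.Now().Add(time.Minute)},
		Strings:             strs,
	}, nil)
	return err
}
```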
We lose jobs when the program panics in a code path separate from the job. I see this as an unavoidable problem: yes, you would like it not to happen, but iterating fast means it will, and we need to be defensive when it does. We also seem to be losing jobs when we lose the connection to the database, even if the worker itself does not die, for instance when the database restarts after a crash or when the Patroni cluster goes through re-election. I don't know whether this is intended or a bug/race in River.

For instance, if we are panicking every 30 minutes, the result should be a slight service interruption every 30 minutes, not the entire application coming to a halt because it can't process the latest task. The job queue is a tool that is meant to help us meet these ends. Scheduling jobs means that workers can crash and jobs will still be processed by other workers; that is, in my opinion, one of the major features of a job queue. When I chose to add a job queue to my application, it was because I wanted this sort of protection (one bad worker doesn't stall everything), and features such as retries, uniqueness, etc. are there to help facilitate it. A deadline might not be the solution, but I think stuck jobs are an unavoidable problem that can't be hand-waved away, and how a specific job queue system deals with stuck jobs is a key differentiating factor.
This is how the hack I mentioned, where we implement our own locking mechanisms, is being done. I was just trying to avoid making one for this job.
Yeah, I would tend to agree, hence floating the idea of a deadline, which I have seen in other queues.
Would it break River to run my own CleanStuckJobs task, putting deadlines per task and processing them myself? To me, adding this sort of "extension" to River would be preferred if possible; I'm just not too sure how well River plays with other things modifying its tables. Ultimately I'm just looking to schedule my own maintenance task, which I believe I can just do with a PeriodicJob.
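For concreteness, here is a rough sketch of how such a custom maintenance task could be wired up as a periodic job. The periodic-job API is River's documented one, but the SQL, the "summarize_events" kind, and the river_job column names and states are illustrative assumptions that should be checked against your River version (and weighed against the caveats in the replies below).

```go
package example

import (
	"context"
	"time"

	"github.com/jackc/pgx/v5/pgxpool"
	"github.com/riverqueue/river"
)

// CleanStuckJobsArgs is a hypothetical args type for a custom maintenance job.
type CleanStuckJobsArgs struct{}

func (CleanStuckJobsArgs) Kind() string { return "clean_stuck_jobs" }

type CleanStuckJobsWorker struct {
	river.WorkerDefaults[CleanStuckJobsArgs]
	DBPool *pgxpool.Pool
}

func (w *CleanStuckJobsWorker) Work(ctx context.Context, job *river.Job[CleanStuckJobsArgs]) error {
	// Illustrative only: push one specific kind of running job back to
	// retryable if it has been running far longer than it ever legitimately
	// should. Mirror what River's own rescuer does, and verify the table name,
	// columns, and states against the River version you're running.
	_, err := w.DBPool.Exec(ctx, `
		UPDATE river_job
		SET state = 'retryable', scheduled_at = now()
		WHERE state = 'running'
		  AND kind = 'summarize_events'  -- hypothetical job kind
		  AND attempted_at < now() - interval '30 seconds'`)
	return err
}

// periodicJobs would be passed to river.Config.PeriodicJobs so the cleanup
// runs on an interval (and once at startup).
func periodicJobs() []*river.PeriodicJob {
	return []*river.PeriodicJob{
		river.NewPeriodicJob(
			river.PeriodicInterval(15 * time.Second),
			func() (river.JobArgs, *river.InsertOpts) {
				return CleanStuckJobsArgs{}, nil
			},
			&river.PeriodicJobOpts{RunOnStart: true},
		),
	}
}
```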
Okay, I think I'd have to see this all in action to wrap my head around it, but I can't shake the feeling that rescuing specific types of jobs more quickly isn't the right answer here. I run a big Go app at work and we treat every bug that results in a panic as priority 0 in terms of triage. Our API runs on literally two Go processes, so if even one of them is panicking, that's 50% of our capacity gone right there. Our background worker is a single Go process; if it's panicking, that's 100% of our background work capacity gone. The acceptable number of expected panics at steady state is exactly zero; no other number is tolerable. That might sound pie-in-the-sky, and in most languages I think it would be, but in Go, because everything uses error-based returns, we find that it's pretty doable. One comes up maybe 2-3 times a year (usually through a startup or configuration bug) and we have each one squashed inside of a day.
You'd probably have to try it to know for sure, but conceptually I think it should be okay. River's maintenance tasks wrap all their operations in a transaction, so there shouldn't be any data-consistency-type bugs possible. The one thing to watch out for, I suppose, is not putting job rows into impossible states that River might make assumptions about.
This one came up when I was thinking about the job-specific rescue threshold floated in [1]. I was going to suggest the possible workaround of setting an aggressive rescue threshold combined with a low job timeout globally, and then overriding the timeout on any specific job workers that needed to run longer than the new low global job timeout. But then I realized this wouldn't work because the job rescuer doesn't account for job-specific timeouts -- it just rescues or discards everything it finds beyond the run's rescue threshold. Here, add new logic to address that problem. Luckily we were already pulling worker information to procure what might be a possible custom retry schedule, so we just have to piggyback onto that to also examine a possible custom work timeout. [1] #347
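To make that workaround (and its former limitation) concrete, here is a hedged sketch assuming River's Config fields and the per-worker Timeout override; LongReportArgs and the specific durations are made up for illustration.

```go
package example

import (
	"context"
	"time"

	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgxpool"
	"github.com/riverqueue/river"
	"github.com/riverqueue/river/riverdriver/riverpgxv5"
)

// LongReportArgs is a hypothetical job kind that legitimately runs for minutes.
type LongReportArgs struct{}

func (LongReportArgs) Kind() string { return "long_report" }

type LongReportWorker struct {
	river.WorkerDefaults[LongReportArgs]
}

// Timeout overrides the client-wide JobTimeout for this worker only. The catch
// described above is that the rescuer didn't consult this per-worker timeout
// when deciding what counted as stuck, which is what the change addresses.
func (w *LongReportWorker) Timeout(*river.Job[LongReportArgs]) time.Duration {
	return 10 * time.Minute
}

func (w *LongReportWorker) Work(ctx context.Context, job *river.Job[LongReportArgs]) error {
	// ... long-running work ...
	return nil
}

// newClient shows the "low global timeout, aggressive rescue" side of the
// workaround: most jobs finish in seconds, so anything running longer than a
// minute is presumed stuck.
func newClient(dbPool *pgxpool.Pool, workers *river.Workers) (*river.Client[pgx.Tx], error) {
	return river.NewClient(riverpgxv5.New(dbPool), &river.Config{
		JobTimeout:           30 * time.Second,
		RescueStuckJobsAfter: time.Minute,
		Queues:               map[string]river.QueueConfig{river.QueueDefault: {MaxWorkers: 10}},
		Workers:              workers,
	})
}
```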
It should be OK to do this, but I would caution you to be careful to have the task operate in a manner similar to River's own stuck job rescuer, at least in terms of the updates it makes to jobs. I believe a lot of the foot guns are prevented by means of proper database constraints on the jobs table.
We have some jobs that take 1-2 minutes, and other jobs that take at most 5 seconds, and so we set worker timeouts accordingly.
However, we can't configure RescueStuckJobsAfter for an individual job. This means that when a worker running the 5-second task dies, we need to wait for the global RescueStuckJobsAfter instead of what should be 5-10 seconds, so updates that should be happening around once a second pause for a few minutes instead of a few seconds.
We schedule with a unique constraint because this task runs on every event, and events can come in at up to 10 per second. We are okay with not scheduling the task if it is already running, since it is an aggregate and the work will just be picked up by the next unique task. If we schedule too many jobs, we will not only schedule more than we can process, but also do extra work and have to worry about race conditions where tasks that started earlier finish later.
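As an aside, a sketch of what that "skip if already queued or running" insert could look like with River's unique options; SummarizeArgs and enqueueSummary are hypothetical names, and the exact set of states allowed in ByState should be double-checked for your River version.

```go
package example

import (
	"context"

	"github.com/jackc/pgx/v5"
	"github.com/riverqueue/river"
	"github.com/riverqueue/river/rivertype"
)

// SummarizeArgs is a hypothetical aggregate job triggered on every event.
type SummarizeArgs struct {
	StreamID string `json:"stream_id"`
}

func (SummarizeArgs) Kind() string { return "summarize" }

// enqueueSummary inserts the job unless an equivalent one is already waiting
// or running, matching the "don't schedule if it's already running" behavior
// described above; a completed job doesn't block the next insert.
func enqueueSummary(ctx context.Context, riverClient *river.Client[pgx.Tx], streamID string) error {
	_, err := riverClient.Insert(ctx, SummarizeArgs{StreamID: streamID}, &river.InsertOpts{
		UniqueOpts: river.UniqueOpts{
			ByArgs: true,
			ByState: []rivertype.JobState{
				rivertype.JobStateAvailable,
				rivertype.JobStatePending,
				rivertype.JobStateScheduled,
				rivertype.JobStateRunning,
				rivertype.JobStateRetryable,
			},
		},
	})
	return err
}
```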
One possible solution is to read the jobs table during InsertTx and check, using scheduled_time, whether the job is stuck ourselves, and manually fix it, but I'm not sure if that's allowed. For now I am checking scheduled_time in a loop in my Go application to manually rescue the task, as this one specific task is critical to the movement of the rest of the application.
A deadline feature in InsertOpts would make me really happy, as it would also resolve other issues I'm having to work around. Workers should discard jobs they pick up that are past their deadline, and a maintenance job which "discards" tasks past their deadline could run on a regular interval.
The problem is adjacent to something that @bgentry mentioned in a previous reply to me in #336 (comment). This is actually a different summary job in a different app; we deal with a lot of data that needs to be updated in this manner.
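Just to sketch what the worker-side half of that deadline idea could look like today: the DeadlineArgs type below is made up, the deadline is carried in the args, and river.JobCancel is used to cancel rather than retry. There is still no maintenance process here sweeping expired jobs that never get picked up at all.

```go
package example

import (
	"context"
	"errors"
	"time"

	"github.com/riverqueue/river"
)

// DeadlineArgs is a hypothetical args type carrying a per-job deadline.
type DeadlineArgs struct {
	Deadline time.Time `json:"deadline"`
}

func (DeadlineArgs) Kind() string { return "deadline_example" }

type DeadlineWorker struct {
	river.WorkerDefaults[DeadlineArgs]
}

func (w *DeadlineWorker) Work(ctx context.Context, job *river.Job[DeadlineArgs]) error {
	// If the job is only picked up after its deadline has passed, cancel it so
	// it isn't retried. This approximates the "workers should discard jobs
	// past their deadline" half of the proposal.
	if time.Now().After(job.Args.Deadline) {
		return river.JobCancel(errors.New("deadline exceeded before the job was worked"))
	}
	// ... normal work ...
	return nil
}
```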
We have had success using some hacky locking mechanisms for our other tasks; however, in this case we are trying to use job=running as our locking mechanism, and the issue is that it takes too long for a dying worker to "abandon" its lock.
For reference, the system we are migrating from used a user-defined hash as an optional uniqueness key, with support for a job timeout and a deadline at the job level. The key would be locked when scheduled, and unlocked at the deadline or on job completion. We could bring that idea back and use it alongside River.
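A minimal sketch of how that keyed-lock idea could sit next to River, assuming a plain application-owned Postgres table rather than anything in River's schema; the job_locks table and the tryLock/unlock helper names are made up for illustration.

```go
package example

import (
	"context"
	"time"

	"github.com/jackc/pgx/v5/pgxpool"
)

// Hypothetical application-owned table:
//
//	CREATE TABLE job_locks (
//	    key        text PRIMARY KEY,
//	    expires_at timestamptz NOT NULL
//	);

// tryLock claims the key unless a non-expired lock already exists, returning
// true if this caller acquired it. The expiry doubles as the deadline: a dead
// worker's lock becomes reclaimable as soon as it passes.
func tryLock(ctx context.Context, db *pgxpool.Pool, key string, deadline time.Time) (bool, error) {
	tag, err := db.Exec(ctx, `
		INSERT INTO job_locks (key, expires_at) VALUES ($1, $2)
		ON CONFLICT (key) DO UPDATE SET expires_at = EXCLUDED.expires_at
		WHERE job_locks.expires_at < now()`,
		key, deadline)
	if err != nil {
		return false, err
	}
	return tag.RowsAffected() == 1, nil
}

// unlock releases the key on job completion.
func unlock(ctx context.Context, db *pgxpool.Pool, key string) error {
	_, err := db.Exec(ctx, `DELETE FROM job_locks WHERE key = $1`, key)
	return err
}
```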
This calls back to what I was told to do here: #346
and also very much relates to this problem: #165
Just checking if the job is running using my own locks table.
That said, these restrictions plus the incredibly slow insert speed (100-200 rps) with unique enabled make me feel like the current "unique constraint" feature is missing a lot of power.
I am feeling like my best course of action at this point, given the current state of Unique, is to move forward with River by building a layer on top of it that deals a little better with these more complex scheduling constraints.