Skip to content

Commit

Permalink
A few fixes/changes for the graceful shutdown example
Browse files Browse the repository at this point in the history
Follows up #2 with a few fixes/changes. As I was writing documentation
for graceful shutdown I realized there were a few things that weren't
quite ideal:

* The job should return an error in the event of cancellation so that it
  can be persisted as errored and be worked again.

* The example now respects either `SIGINT` or `SIGTERM`. `SIGTERM` is
  what's used on Heroku, but `SIGINT` is the standard signal from
  `Ctrl+C` in a terminal, so by respecting both we can have a program
  that works well in either development or a common hosted environment.

* Add a third phase in which the program initiates an unclean stop by
  not waiting on stop any longer. This is probably something that most
  programs should have because it's going to be reasonably easy to write
  workers that accidentally don't respect context cancellation and get
  stuck.

* Add a 10 second timeout to each phase. This is for Heroku's benefit.
  It'll send one `SIGTERM` and wait 30 seconds before issuing `SIGKILL`.
  So the program here waits 10 seconds for a soft stop, another 10
  seconds for a hard stop, and then exits uncleanly on its own volition
  before getting `SIGKILL`ed.
  • Loading branch information
brandur committed Nov 10, 2023
1 parent 8e46ee1 commit 67495cb
Showing 1 changed file with 58 additions and 22 deletions.
80 changes: 58 additions & 22 deletions example_graceful_shutdown_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,22 @@ type WaitsForCancelOnlyWorker struct {
func (w *WaitsForCancelOnlyWorker) Work(ctx context.Context, job *river.Job[WaitsForCancelOnlyArgs]) error {
fmt.Printf("Working job that doesn't finish until cancelled\n")
close(w.jobStarted)

<-ctx.Done()
fmt.Printf("Job cancelled\n")
return nil

// In the event of cancellation, an error should be returned so that the job
// goes back in the retry queue.
return ctx.Err()
}

// Example_gracefulShutdown demonstrates a realistic-looking stop loop for
// River. It listens for a SIGTERM (like might be received on a platform like
// Heroku to stop a process) and when receives, tries a soft stop that waits for
// work to finish. If it doesn't finish in time, a second SIGTERM will initiate
// a hard stop that cancels all jobs using context cancellation.
// River. It listens for SIGINT/SIGTERM (like might be received by a Ctrl+C
// locally or on a platform like Heroku to stop a process) and when received,
// tries a soft stop that waits for work to finish. If it doesn't finish in
// time, a second SIGINT/SIGTERM will initiate a hard stop that cancels all jobs
// using context cancellation. A third will give up on the stop procedure and
// exit uncleanly.
func Example_gracefulShutdown() {
ctx := context.Background()

Expand Down Expand Up @@ -83,19 +89,21 @@ func Example_gracefulShutdown() {

riverClientStopped := make(chan struct{})

sigterm := make(chan os.Signal, 1)
signal.Notify(sigterm, syscall.SIGTERM)
sigintOrTerm := make(chan os.Signal, 1)
signal.Notify(sigintOrTerm, syscall.SIGINT, syscall.SIGTERM)

// This is meant to be a realistic-looking stop goroutine that might go in a
// real program. It waits for SIGTERM and when received, tries to stop
// real program. It waits for SIGINT/SIGTERM and when received, tries to stop
// gracefully by allowing a chance for jobs to finish. But if that isn't
// working, a second SIGTERM will tell it to terminate with prejudice and
// it'll issue a hard stop that cancels the context of all active jobs.
// working, a second SIGINT/SIGTERM will tell it to terminate with prejudice and
// it'll issue a hard stop that cancels the context of all active jobs. In
// case that doesn't work, a third SIGINT/SIGTERM ignores River's stop procedure
// completely and exits uncleanly.
go func() {
defer close(riverClientStopped)

<-sigterm
fmt.Printf("Received SIGTERM; initiating soft stop (try to wait for jobs to finish)\n")
<-sigintOrTerm
fmt.Printf("Received SIGINT/SIGTERM; initiating soft stop (try to wait for jobs to finish)\n")

softStopSucceeded := make(chan struct{})
go func() {
Expand All @@ -107,14 +115,41 @@ func Example_gracefulShutdown() {
close(softStopSucceeded)
}()

// Wait for soft stop to succeed, or another SIGINT/SIGTERM.
select {
case <-sigterm:
fmt.Printf("Received SIGTERM again; initiating hard stop (cancel everything)\n")
if err := riverClient.StopAndCancel(ctx); err != nil {
panic(err)
}
case <-sigintOrTerm:
fmt.Printf("Received SIGINT/SIGTERM again; initiating hard stop (cancel everything)\n")

case <-time.After(10 * time.Second):
fmt.Printf("Soft stop timeout; initiating hard stop (cancel everything)\n")

case <-softStopSucceeded:
// Will never be reached in this example.
return
}

hardStopSucceeded := make(chan struct{})
go func() {
if err := riverClient.StopAndCancel(ctx); err != nil {
if !errors.Is(err, context.Canceled) {
panic(err)
}
}
close(hardStopSucceeded)
}()

// As long as all jobs respect context cancellation, StopAndCancel will
// always work. However, in the case of a bug where a job blocks despite
// being cancelled, it may be necessary to either ignore River's stop
// result (what's shown here) or have a supervisor kill the process.
select {
case <-sigintOrTerm:
fmt.Printf("Received SIGINT/SIGTERM again; ignoring stop procedure and exiting unsafely\n")

case <-time.After(10 * time.Second):
fmt.Printf("Hard stop timeout; ignoring stop procedure and exiting unsafely\n")

case <-hardStopSucceeded:
}
}()

Expand All @@ -123,9 +158,9 @@ func Example_gracefulShutdown() {

// Cheat a little by sending a SIGTERM manually for the purpose of this
// example (normally this will be sent by user or supervisory process). The
// first SIGTERM tries a soft stop in which jobs are givcen a chance to
// first SIGTERM tries a soft stop in which jobs are given a chance to
// finish up.
sigterm <- syscall.SIGTERM
sigintOrTerm <- syscall.SIGTERM

// The soft stop will never work in this example because our job only
// respects context cancellation, but wait a short amount of time to give it
Expand All @@ -137,13 +172,14 @@ func Example_gracefulShutdown() {
fmt.Printf("Soft stop succeeded\n")

case <-time.After(100 * time.Millisecond):
sigterm <- syscall.SIGTERM
sigintOrTerm <- syscall.SIGTERM
<-riverClientStopped
}

// Output:
// Working job that doesn't finish until cancelled
// Received SIGTERM; initiating soft stop (try to wait for jobs to finish)
// Received SIGTERM again; initiating hard stop (cancel everything)
// Received SIGINT/SIGTERM; initiating soft stop (try to wait for jobs to finish)
// Received SIGINT/SIGTERM again; initiating hard stop (cancel everything)
// Job cancelled
// jobExecutor: Job failed
}

0 comments on commit 67495cb

Please sign in to comment.