-
Notifications
You must be signed in to change notification settings - Fork 93
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add example demonstrating graceful shutdown #2
Merged
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,149 @@ | ||
package river_test | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"fmt" | ||
"log/slog" | ||
"os" | ||
"os/signal" | ||
"syscall" | ||
"time" | ||
|
||
"github.com/jackc/pgx/v5/pgxpool" | ||
|
||
"github.com/riverqueue/river" | ||
"github.com/riverqueue/river/internal/riverinternaltest" | ||
"github.com/riverqueue/river/internal/util/slogutil" | ||
"github.com/riverqueue/river/riverdriver/riverpgxv5" | ||
) | ||
|
||
type WaitsForCancelOnlyArgs struct{} | ||
|
||
func (WaitsForCancelOnlyArgs) Kind() string { return "waits_for_cancel_only" } | ||
|
||
// WaitsForCancelOnlyWorker is a worker that will never finish jobs until its | ||
// context is cancelled. | ||
type WaitsForCancelOnlyWorker struct { | ||
river.WorkerDefaults[WaitsForCancelOnlyArgs] | ||
jobStarted chan struct{} | ||
} | ||
|
||
func (w *WaitsForCancelOnlyWorker) Work(ctx context.Context, job *river.Job[WaitsForCancelOnlyArgs]) error { | ||
fmt.Printf("Working job that doesn't finish until cancelled\n") | ||
close(w.jobStarted) | ||
<-ctx.Done() | ||
fmt.Printf("Job cancelled\n") | ||
return nil | ||
} | ||
|
||
// Example_gracefulShutdown demonstrates a realistic-looking stop loop for | ||
// River. It listens for a SIGTERM (like might be received on a platform like | ||
// Heroku to stop a process) and when receives, tries a soft stop that waits for | ||
// work to finish. If it doesn't finish in time, a second SIGTERM will initiate | ||
// a hard stop that cancels all jobs using context cancellation. | ||
func Example_gracefulShutdown() { | ||
ctx := context.Background() | ||
|
||
dbPool, err := pgxpool.NewWithConfig(ctx, riverinternaltest.DatabaseConfig("river_testdb_example")) | ||
if err != nil { | ||
panic(err) | ||
} | ||
defer dbPool.Close() | ||
|
||
// Required for the purpose of this test, but not necessary in real usage. | ||
if err := riverinternaltest.TruncateRiverTables(ctx, dbPool); err != nil { | ||
panic(err) | ||
} | ||
|
||
jobStarted := make(chan struct{}) | ||
|
||
workers := river.NewWorkers() | ||
river.AddWorker(workers, &WaitsForCancelOnlyWorker{jobStarted: jobStarted}) | ||
|
||
riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{ | ||
Logger: slog.New(&slogutil.SlogMessageOnlyHandler{Level: slog.LevelWarn}), | ||
Queues: map[string]river.QueueConfig{ | ||
river.DefaultQueue: {MaxWorkers: 100}, | ||
}, | ||
Workers: workers, | ||
}) | ||
if err != nil { | ||
panic(err) | ||
} | ||
|
||
_, err = riverClient.Insert(ctx, WaitsForCancelOnlyArgs{}, nil) | ||
if err != nil { | ||
panic(err) | ||
} | ||
|
||
if err := riverClient.Start(ctx); err != nil { | ||
panic(err) | ||
} | ||
|
||
riverClientStopped := make(chan struct{}) | ||
|
||
sigterm := make(chan os.Signal, 1) | ||
signal.Notify(sigterm, syscall.SIGTERM) | ||
|
||
// This is meant to be a realistic-looking stop goroutine that might go in a | ||
// real program. It waits for SIGTERM and when received, tries to stop | ||
// gracefully by allowing a chance for jobs to finish. But if that isn't | ||
// working, a second SIGTERM will tell it to terminate with prejudice and | ||
// it'll issue a hard stop that cancels the context of all active jobs. | ||
go func() { | ||
defer close(riverClientStopped) | ||
|
||
<-sigterm | ||
fmt.Printf("Received SIGTERM; initiating soft stop (try to wait for jobs to finish)\n") | ||
|
||
softStopSucceeded := make(chan struct{}) | ||
go func() { | ||
if err := riverClient.Stop(ctx); err != nil { | ||
if !errors.Is(err, context.Canceled) { | ||
panic(err) | ||
} | ||
} | ||
close(softStopSucceeded) | ||
}() | ||
|
||
select { | ||
case <-sigterm: | ||
fmt.Printf("Received SIGTERM again; initiating hard stop (cancel everything)\n") | ||
if err := riverClient.StopAndCancel(ctx); err != nil { | ||
panic(err) | ||
} | ||
case <-softStopSucceeded: | ||
// Will never be reached in this example. | ||
} | ||
}() | ||
|
||
// Make sure our job starts being worked before doing anything else. | ||
<-jobStarted | ||
|
||
// Cheat a little by sending a SIGTERM manually for the purpose of this | ||
// example (normally this will be sent by user or supervisory process). The | ||
// first SIGTERM tries a soft stop in which jobs are givcen a chance to | ||
// finish up. | ||
sigterm <- syscall.SIGTERM | ||
|
||
// The soft stop will never work in this example because our job only | ||
// respects context cancellation, but wait a short amount of time to give it | ||
// a chance. After it elapses, send another SIGTERM to initiate a hard stop. | ||
select { | ||
case <-riverClientStopped: | ||
// Will never be reached in this example because our job will only ever | ||
// finish on context cancellation. | ||
fmt.Printf("Soft stop succeeded\n") | ||
|
||
case <-time.After(100 * time.Millisecond): | ||
sigterm <- syscall.SIGTERM | ||
<-riverClientStopped | ||
} | ||
|
||
// Output: | ||
// Working job that doesn't finish until cancelled | ||
// Received SIGTERM; initiating soft stop (try to wait for jobs to finish) | ||
// Received SIGTERM again; initiating hard stop (cancel everything) | ||
// Job cancelled | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not super happy with this wait. It's not strictly necessary to include it, but if we don't, then we don't show any opportunity for the soft stop to work at all, so I put it in.