From f246c20a88bc304decb74157a0013b70e93ac6f1 Mon Sep 17 00:00:00 2001 From: Brandur Leach Date: Wed, 8 Nov 2023 19:44:37 -0800 Subject: [PATCH] Add example demonstrating graceful shutdown (#2) As I was writing the docs for graceful shutdown, I realized that it wasn't a half bad idea to include a full code sample for a realistic program that when exiting, would (1) try a soft stop to start, and (2) do a hard stop if the soft stop didn't work in time. This is somewhat non-trivial code though, and I'd be afraid to just include it it docs without testing it. I started writing the code to be actually runnable, and realized that as long as it was going to be runnable anyway, we may as well also include an example for it and commit to the repository. So that's what we do here: add an example test that shows a realistic shutdown loop demonstrating both a soft and hard stop. --- example_graceful_shutdown_test.go | 149 ++++++++++++++++++++++++++++++ 1 file changed, 149 insertions(+) create mode 100644 example_graceful_shutdown_test.go diff --git a/example_graceful_shutdown_test.go b/example_graceful_shutdown_test.go new file mode 100644 index 00000000..296152a3 --- /dev/null +++ b/example_graceful_shutdown_test.go @@ -0,0 +1,149 @@ +package river_test + +import ( + "context" + "errors" + "fmt" + "log/slog" + "os" + "os/signal" + "syscall" + "time" + + "github.com/jackc/pgx/v5/pgxpool" + + "github.com/riverqueue/river" + "github.com/riverqueue/river/internal/riverinternaltest" + "github.com/riverqueue/river/internal/util/slogutil" + "github.com/riverqueue/river/riverdriver/riverpgxv5" +) + +type WaitsForCancelOnlyArgs struct{} + +func (WaitsForCancelOnlyArgs) Kind() string { return "waits_for_cancel_only" } + +// WaitsForCancelOnlyWorker is a worker that will never finish jobs until its +// context is cancelled. +type WaitsForCancelOnlyWorker struct { + river.WorkerDefaults[WaitsForCancelOnlyArgs] + jobStarted chan struct{} +} + +func (w *WaitsForCancelOnlyWorker) Work(ctx context.Context, job *river.Job[WaitsForCancelOnlyArgs]) error { + fmt.Printf("Working job that doesn't finish until cancelled\n") + close(w.jobStarted) + <-ctx.Done() + fmt.Printf("Job cancelled\n") + return nil +} + +// Example_gracefulShutdown demonstrates a realistic-looking stop loop for +// River. It listens for a SIGTERM (like might be received on a platform like +// Heroku to stop a process) and when receives, tries a soft stop that waits for +// work to finish. If it doesn't finish in time, a second SIGTERM will initiate +// a hard stop that cancels all jobs using context cancellation. +func Example_gracefulShutdown() { + ctx := context.Background() + + dbPool, err := pgxpool.NewWithConfig(ctx, riverinternaltest.DatabaseConfig("river_testdb_example")) + if err != nil { + panic(err) + } + defer dbPool.Close() + + // Required for the purpose of this test, but not necessary in real usage. + if err := riverinternaltest.TruncateRiverTables(ctx, dbPool); err != nil { + panic(err) + } + + jobStarted := make(chan struct{}) + + workers := river.NewWorkers() + river.AddWorker(workers, &WaitsForCancelOnlyWorker{jobStarted: jobStarted}) + + riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{ + Logger: slog.New(&slogutil.SlogMessageOnlyHandler{Level: slog.LevelWarn}), + Queues: map[string]river.QueueConfig{ + river.DefaultQueue: {MaxWorkers: 100}, + }, + Workers: workers, + }) + if err != nil { + panic(err) + } + + _, err = riverClient.Insert(ctx, WaitsForCancelOnlyArgs{}, nil) + if err != nil { + panic(err) + } + + if err := riverClient.Start(ctx); err != nil { + panic(err) + } + + riverClientStopped := make(chan struct{}) + + sigterm := make(chan os.Signal, 1) + signal.Notify(sigterm, syscall.SIGTERM) + + // This is meant to be a realistic-looking stop goroutine that might go in a + // real program. It waits for SIGTERM and when received, tries to stop + // gracefully by allowing a chance for jobs to finish. But if that isn't + // working, a second SIGTERM will tell it to terminate with prejudice and + // it'll issue a hard stop that cancels the context of all active jobs. + go func() { + defer close(riverClientStopped) + + <-sigterm + fmt.Printf("Received SIGTERM; initiating soft stop (try to wait for jobs to finish)\n") + + softStopSucceeded := make(chan struct{}) + go func() { + if err := riverClient.Stop(ctx); err != nil { + if !errors.Is(err, context.Canceled) { + panic(err) + } + } + close(softStopSucceeded) + }() + + select { + case <-sigterm: + fmt.Printf("Received SIGTERM again; initiating hard stop (cancel everything)\n") + if err := riverClient.StopAndCancel(ctx); err != nil { + panic(err) + } + case <-softStopSucceeded: + // Will never be reached in this example. + } + }() + + // Make sure our job starts being worked before doing anything else. + <-jobStarted + + // Cheat a little by sending a SIGTERM manually for the purpose of this + // example (normally this will be sent by user or supervisory process). The + // first SIGTERM tries a soft stop in which jobs are givcen a chance to + // finish up. + sigterm <- syscall.SIGTERM + + // The soft stop will never work in this example because our job only + // respects context cancellation, but wait a short amount of time to give it + // a chance. After it elapses, send another SIGTERM to initiate a hard stop. + select { + case <-riverClientStopped: + // Will never be reached in this example because our job will only ever + // finish on context cancellation. + fmt.Printf("Soft stop succeeded\n") + + case <-time.After(100 * time.Millisecond): + sigterm <- syscall.SIGTERM + <-riverClientStopped + } + + // Output: + // Working job that doesn't finish until cancelled + // Received SIGTERM; initiating soft stop (try to wait for jobs to finish) + // Received SIGTERM again; initiating hard stop (cancel everything) + // Job cancelled +}