diff --git a/scheduler/pkg/agent/client.go b/scheduler/pkg/agent/client.go
index c947f5cf39..9cc86bfa73 100644
--- a/scheduler/pkg/agent/client.go
+++ b/scheduler/pkg/agent/client.go
@@ -236,8 +236,7 @@ func (c *Client) Start() error {
 	logFailure := func(err error, delay time.Duration) {
 		c.logger.WithError(err).Errorf("Scheduler not ready")
 	}
-	backOffExp := backoff.NewExponentialBackOff()
-	backOffExp.MaxElapsedTime = 0 // Never stop due to large time between calls
+	backOffExp := util.GetClientExponentialBackoff()
 	err := backoff.RetryNotify(c.StartService, backOffExp, logFailure)
 	if err != nil {
 		c.logger.WithError(err).Fatal("Failed to start client")
diff --git a/scheduler/pkg/kafka/gateway/client.go b/scheduler/pkg/kafka/gateway/client.go
index 234169e3d9..9623d79ef7 100644
--- a/scheduler/pkg/kafka/gateway/client.go
+++ b/scheduler/pkg/kafka/gateway/client.go
@@ -118,11 +118,7 @@ func (kc *KafkaSchedulerClient) Start() error {
 	logFailure := func(err error, delay time.Duration) {
 		kc.logger.WithError(err).Errorf("Scheduler not ready")
 	}
-	backOffExp := backoff.NewExponentialBackOff()
-	// Set some reasonable settings for trying to reconnect to scheduler
-	backOffExp.MaxElapsedTime = 0 // Never stop due to large time between calls
-	backOffExp.MaxInterval = time.Second * 15
-	backOffExp.InitialInterval = time.Second
+	backOffExp := util.GetClientExponentialBackoff()
 	err := backoff.RetryNotify(kc.SubscribeModelEvents, backOffExp, logFailure)
 	if err != nil {
 		kc.logger.WithError(err).Fatal("Failed to start modelgateway client")
diff --git a/scheduler/pkg/kafka/pipeline/status/client.go b/scheduler/pkg/kafka/pipeline/status/client.go
index f0aebbb555..20ce757b34 100644
--- a/scheduler/pkg/kafka/pipeline/status/client.go
+++ b/scheduler/pkg/kafka/pipeline/status/client.go
@@ -124,11 +124,7 @@ func (pc *PipelineSchedulerClient) Start(host string, plainTxtPort int, tlsPort
 	logFailure := func(err error, delay time.Duration) {
 		logger.WithError(err).Errorf("Scheduler not ready")
 	}
-	backOffExp := backoff.NewExponentialBackOff()
-	// Set some reasonable settings for trying to reconnect to scheduler
-	backOffExp.MaxElapsedTime = 0 // Never stop due to large time between calls
-	backOffExp.MaxInterval = time.Second * 15
-	backOffExp.InitialInterval = time.Second
+	backOffExp := util.GetClientExponentialBackoff()
 	err = backoff.RetryNotify(pc.SubscribePipelineEvents, backOffExp, logFailure)
 	if err != nil {
 		logger.WithError(err).Fatal("Failed to start pipeline gateway client")
diff --git a/scheduler/pkg/util/grpc.go b/scheduler/pkg/util/grpc.go
index 5b1520b3af..3ed6fc4235 100644
--- a/scheduler/pkg/util/grpc.go
+++ b/scheduler/pkg/util/grpc.go
@@ -9,7 +9,12 @@ the Change License after the Change Date as each is defined in accordance with t
 
 package util
 
-import "google.golang.org/grpc/keepalive"
+import (
+	"time"
+
+	"github.com/cenkalti/backoff/v4"
+	"google.golang.org/grpc/keepalive"
+)
 
 func GetClientKeepAliveParameters() keepalive.ClientParameters {
 	return keepalive.ClientParameters{
@@ -25,3 +30,15 @@ func GetServerKeepAliveEnforcementPolicy() keepalive.EnforcementPolicy {
 		PermitWithoutStream: gRPCKeepAlivePermit,
 	}
 }
+
+// GetClientExponentialBackoff returns the exponential backoff policy shared by
+// clients that retry reaching the scheduler. MaxElapsedTime is set to 0 so that
+// retries are never abandoned; the policy uses an initial interval of one
+// second and a maximum interval of 15 seconds.
+func GetClientExponentialBackoff() *backoff.ExponentialBackOff {
+	backOffExp := backoff.NewExponentialBackOff()
+	backOffExp.MaxElapsedTime = 0 // Never stop due to large time between calls
+	backOffExp.MaxInterval = time.Second * 15
+	backOffExp.InitialInterval = time.Second
+	return backOffExp
+}