Skip to content

Commit

Permalink
tidy up client exp backoff
Browse files Browse the repository at this point in the history
  • Loading branch information
sakoush committed Oct 30, 2024
1 parent 93000d5 commit 8eeee87
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 13 deletions.
3 changes: 1 addition & 2 deletions scheduler/pkg/agent/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,7 @@ func (c *Client) Start() error {
logFailure := func(err error, delay time.Duration) {
c.logger.WithError(err).Errorf("Scheduler not ready")
}
backOffExp := backoff.NewExponentialBackOff()
backOffExp.MaxElapsedTime = 0 // Never stop due to large time between calls
backOffExp := util.GetClientExponentialBackoff()
err := backoff.RetryNotify(c.StartService, backOffExp, logFailure)
if err != nil {
c.logger.WithError(err).Fatal("Failed to start client")
Expand Down
6 changes: 1 addition & 5 deletions scheduler/pkg/kafka/gateway/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,7 @@ func (kc *KafkaSchedulerClient) Start() error {
logFailure := func(err error, delay time.Duration) {
kc.logger.WithError(err).Errorf("Scheduler not ready")
}
backOffExp := backoff.NewExponentialBackOff()
// Set some reasonable settings for trying to reconnect to scheduler
backOffExp.MaxElapsedTime = 0 // Never stop due to large time between calls
backOffExp.MaxInterval = time.Second * 15
backOffExp.InitialInterval = time.Second
backOffExp := util.GetClientExponentialBackoff()
err := backoff.RetryNotify(kc.SubscribeModelEvents, backOffExp, logFailure)
if err != nil {
kc.logger.WithError(err).Fatal("Failed to start modelgateway client")
Expand Down
6 changes: 1 addition & 5 deletions scheduler/pkg/kafka/pipeline/status/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,11 +124,7 @@ func (pc *PipelineSchedulerClient) Start(host string, plainTxtPort int, tlsPort
logFailure := func(err error, delay time.Duration) {
logger.WithError(err).Errorf("Scheduler not ready")
}
backOffExp := backoff.NewExponentialBackOff()
// Set some reasonable settings for trying to reconnect to scheduler
backOffExp.MaxElapsedTime = 0 // Never stop due to large time between calls
backOffExp.MaxInterval = time.Second * 15
backOffExp.InitialInterval = time.Second
backOffExp := util.GetClientExponentialBackoff()
err = backoff.RetryNotify(pc.SubscribePipelineEvents, backOffExp, logFailure)
if err != nil {
logger.WithError(err).Fatal("Failed to start pipeline gateway client")
Expand Down
15 changes: 14 additions & 1 deletion scheduler/pkg/util/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,12 @@ the Change License after the Change Date as each is defined in accordance with t

package util

import "google.golang.org/grpc/keepalive"
import (
"time"

"github.com/cenkalti/backoff/v4"
"google.golang.org/grpc/keepalive"
)

func GetClientKeepAliveParameters() keepalive.ClientParameters {
return keepalive.ClientParameters{
Expand All @@ -25,3 +30,11 @@ func GetServerKeepAliveEnforcementPolicy() keepalive.EnforcementPolicy {
PermitWithoutStream: gRPCKeepAlivePermit,
}
}

func GetClientExponentialBackoff() *backoff.ExponentialBackOff {
backOffExp := backoff.NewExponentialBackOff()
backOffExp.MaxElapsedTime = 0 // Never stop due to large time between calls
backOffExp.MaxInterval = time.Second * 15
backOffExp.InitialInterval = time.Second
return backOffExp
}

0 comments on commit 8eeee87

Please sign in to comment.