Skip to content

Commit

Permalink
refactor: remove EnablePassiveHealthCheck flag
Browse files Browse the repository at this point in the history
rely on PassiveHealthCheck struct pointer to be nil or not nil instead

Signed-off-by: Roman Zavodskikh <roman.zavodskikh@zalando.de>
  • Loading branch information
Roman Zavodskikh committed Apr 19, 2024
1 parent 0ce11b9 commit 83a1958
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 111 deletions.
18 changes: 7 additions & 11 deletions proxy/healthy_endpoints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@ const (

func defaultEndpointRegistry() *routing.EndpointRegistry {
return routing.NewEndpointRegistry(routing.RegistryOptions{
PassiveHealthCheckEnabled: true,
StatsResetPeriod: period,
MinRequests: 10,
MaxHealthCheckDropProbability: 1.0,
PassiveHealthCheck: &routing.PassiveHealthCheck{
Period: period,
MinRequests: 10,
MaxDropProbability: 1.0,
},
})
}

Expand All @@ -49,11 +50,8 @@ func sendGetRequests(t *testing.T, ps *httptest.Server) (failed int) {
}

func setupProxy(t *testing.T, doc string) (*testProxy, *httptest.Server) {
endpointRegistry := defaultEndpointRegistry()

tp, err := newTestProxyWithParams(doc, Params{
EnablePassiveHealthCheck: true,
EndpointRegistry: endpointRegistry,
EndpointRegistry: defaultEndpointRegistry(),
})
require.NoError(t, err)

Expand Down Expand Up @@ -106,12 +104,10 @@ func TestPHCForSingleHealthyEndpoint(t *testing.T) {
w.WriteHeader(http.StatusOK)
}))
defer service.Close()
endpointRegistry := defaultEndpointRegistry()

doc := fmt.Sprintf(`* -> "%s"`, service.URL)
tp, err := newTestProxyWithParams(doc, Params{
EnablePassiveHealthCheck: true,
EndpointRegistry: endpointRegistry,
EndpointRegistry: defaultEndpointRegistry(),
})
if err != nil {
t.Fatal(err)
Expand Down
45 changes: 13 additions & 32 deletions proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,67 +145,54 @@ type OpenTracingParams struct {
ExcludeTags []string
}

type PassiveHealthCheck struct {
// The period of time after which the endpointregistry begins to calculate endpoints statistics
// from scratch
Period time.Duration

// The minimum number of total requests that should be sent to an endpoint in a single period to
// potentially opt out the endpoint from the list of healthy endpoints
MinRequests int64

// The maximum probability of unhealthy endpoint to be dropped out from load balancing for every specific request
MaxDropProbability float64
}

func InitPassiveHealthChecker(o map[string]string) (bool, *PassiveHealthCheck, error) {
func InitPassiveHealthChecker(o map[string]string) (*routing.PassiveHealthCheck, error) {
if len(o) == 0 {
return false, &PassiveHealthCheck{}, nil
return nil, nil
}

result := &PassiveHealthCheck{}
result := &routing.PassiveHealthCheck{}
keysInitialized := make(map[string]struct{})

for key, value := range o {
switch key {
case "period":
period, err := time.ParseDuration(value)
if err != nil {
return false, nil, fmt.Errorf("passive health check: invalid period value: %s", value)
return nil, fmt.Errorf("passive health check: invalid period value: %s", value)
}
if period < 0 {
return false, nil, fmt.Errorf("passive health check: invalid period value: %s", value)
return nil, fmt.Errorf("passive health check: invalid period value: %s", value)
}
result.Period = period
case "min-requests":
minRequests, err := strconv.Atoi(value)
if err != nil {
return false, nil, fmt.Errorf("passive health check: invalid minRequests value: %s", value)
return nil, fmt.Errorf("passive health check: invalid minRequests value: %s", value)
}
if minRequests < 0 {
return false, nil, fmt.Errorf("passive health check: invalid minRequests value: %s", value)
return nil, fmt.Errorf("passive health check: invalid minRequests value: %s", value)
}
result.MinRequests = int64(minRequests)
case "max-drop-probability":
maxDropProbability, err := strconv.ParseFloat(value, 64)
if err != nil {
return false, nil, fmt.Errorf("passive health check: invalid maxDropProbability value: %s", value)
return nil, fmt.Errorf("passive health check: invalid maxDropProbability value: %s", value)
}
if maxDropProbability < 0 || maxDropProbability > 1 {
return false, nil, fmt.Errorf("passive health check: invalid maxDropProbability value: %s", value)
return nil, fmt.Errorf("passive health check: invalid maxDropProbability value: %s", value)
}
result.MaxDropProbability = maxDropProbability
default:
return false, nil, fmt.Errorf("passive health check: invalid parameter: key=%s,value=%s", key, value)
return nil, fmt.Errorf("passive health check: invalid parameter: key=%s,value=%s", key, value)
}

keysInitialized[key] = struct{}{}
}

if len(keysInitialized) != 3 {
return false, nil, fmt.Errorf("passive health check: missing required parameters")
return nil, fmt.Errorf("passive health check: missing required parameters")
}
return true, result, nil
return result, nil
}

// Proxy initialization options.
Expand Down Expand Up @@ -310,12 +297,6 @@ type Params struct {
// and returns some metadata about endpoint. Information about the metadata
// returned from the registry could be found in routing.Metrics interface.
EndpointRegistry *routing.EndpointRegistry

// EnablePassiveHealthCheck enables the healthy endpoints checker
EnablePassiveHealthCheck bool

// PassiveHealthCheck defines the parameters for the healthy endpoints checker.
PassiveHealthCheck *PassiveHealthCheck
}

type (
Expand Down Expand Up @@ -790,7 +771,7 @@ func WithParams(p Params) *Proxy {
hostname := os.Getenv("HOSTNAME")

var healthyEndpointsChooser *healthyEndpoints
if p.EnablePassiveHealthCheck {
if p.EndpointRegistry.GetPassiveHealthCheck() != nil {
healthyEndpointsChooser = &healthyEndpoints{
rnd: rand.New(loadbalancer.NewLockedSource()),
endpointRegistry: p.EndpointRegistry,
Expand Down
69 changes: 27 additions & 42 deletions proxy/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2283,35 +2283,31 @@ func BenchmarkAccessLogEnable(b *testing.B) { benchmarkAccessLog(b, "enableAcces

func TestInitPassiveHealthChecker(t *testing.T) {
for i, ti := range []struct {
inputArg map[string]string
expectedEnabled bool
expectedParams *PassiveHealthCheck
expectedError error
inputArg map[string]string
expectedParams *routing.PassiveHealthCheck
expectedError error
}{
{
inputArg: map[string]string{},
expectedEnabled: false,
expectedParams: nil,
expectedError: nil,
inputArg: map[string]string{},
expectedParams: nil,
expectedError: nil,
},
{
inputArg: map[string]string{
"period": "somethingInvalid",
"min-requests": "10",
"max-drop-probability": "0.9",
},
expectedEnabled: false,
expectedParams: nil,
expectedError: fmt.Errorf("passive health check: invalid period value: somethingInvalid"),
expectedParams: nil,
expectedError: fmt.Errorf("passive health check: invalid period value: somethingInvalid"),
},
{
inputArg: map[string]string{
"period": "1m",
"min-requests": "10",
"max-drop-probability": "0.9",
},
expectedEnabled: true,
expectedParams: &PassiveHealthCheck{
expectedParams: &routing.PassiveHealthCheck{
Period: 1 * time.Minute,
MinRequests: 10,
MaxDropProbability: 0.9,
Expand All @@ -2324,59 +2320,53 @@ func TestInitPassiveHealthChecker(t *testing.T) {
"min-requests": "10",
"max-drop-probability": "0.9",
},
expectedEnabled: false,
expectedParams: nil,
expectedError: fmt.Errorf("passive health check: invalid period value: -1m"),
expectedParams: nil,
expectedError: fmt.Errorf("passive health check: invalid period value: -1m"),
},
{
inputArg: map[string]string{
"period": "1m",
"min-requests": "somethingInvalid",
"max-drop-probability": "0.9",
},
expectedEnabled: false,
expectedParams: nil,
expectedError: fmt.Errorf("passive health check: invalid minRequests value: somethingInvalid"),
expectedParams: nil,
expectedError: fmt.Errorf("passive health check: invalid minRequests value: somethingInvalid"),
},
{
inputArg: map[string]string{
"period": "1m",
"min-requests": "-10",
"max-drop-probability": "0.9",
},
expectedEnabled: false,
expectedParams: nil,
expectedError: fmt.Errorf("passive health check: invalid minRequests value: -10"),
expectedParams: nil,
expectedError: fmt.Errorf("passive health check: invalid minRequests value: -10"),
},
{
inputArg: map[string]string{
"period": "1m",
"min-requests": "10",
"max-drop-probability": "somethingInvalid",
},
expectedEnabled: false,
expectedParams: nil,
expectedError: fmt.Errorf("passive health check: invalid maxDropProbability value: somethingInvalid"),
expectedParams: nil,
expectedError: fmt.Errorf("passive health check: invalid maxDropProbability value: somethingInvalid"),
},
{
inputArg: map[string]string{
"period": "1m",
"min-requests": "10",
"max-drop-probability": "-0.1",
},
expectedEnabled: false,
expectedParams: nil,
expectedError: fmt.Errorf("passive health check: invalid maxDropProbability value: -0.1"),
expectedParams: nil,
expectedError: fmt.Errorf("passive health check: invalid maxDropProbability value: -0.1"),
},
{
inputArg: map[string]string{
"period": "1m",
"min-requests": "10",
"max-drop-probability": "3.1415",
},
expectedEnabled: false,
expectedParams: nil,
expectedError: fmt.Errorf("passive health check: invalid maxDropProbability value: 3.1415"),
expectedParams: nil,
expectedError: fmt.Errorf("passive health check: invalid maxDropProbability value: 3.1415"),
},
{
inputArg: map[string]string{
Expand All @@ -2385,28 +2375,23 @@ func TestInitPassiveHealthChecker(t *testing.T) {
"max-drop-probability": "0.9",
"non-existing": "non-existing",
},
expectedEnabled: false,
expectedParams: nil,
expectedError: fmt.Errorf("passive health check: invalid parameter: key=non-existing,value=non-existing"),
expectedParams: nil,
expectedError: fmt.Errorf("passive health check: invalid parameter: key=non-existing,value=non-existing"),
},
{
inputArg: map[string]string{
"period": "1m",
"min-requests": "10",
/* forgot max-drop-probability */
},
expectedEnabled: false,
expectedParams: nil,
expectedError: fmt.Errorf("passive health check: missing required parameters"),
expectedParams: nil,
expectedError: fmt.Errorf("passive health check: missing required parameters"),
},
} {
t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) {
enabled, params, err := InitPassiveHealthChecker(ti.inputArg)
assert.Equal(t, ti.expectedEnabled, enabled)
params, err := InitPassiveHealthChecker(ti.inputArg)
assert.Equal(t, ti.expectedError, err)
if enabled {
assert.Equal(t, ti.expectedParams, params)
}
assert.Equal(t, ti.expectedParams, params)
})
}
}
31 changes: 14 additions & 17 deletions routing/endpointregistry.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,8 @@ func newEntry() *entry {
}

type EndpointRegistry struct {
lastSeenTimeout time.Duration
statsResetPeriod time.Duration
minRequests int64
maxHealthCheckDropProbability float64
lastSeenTimeout time.Duration
passiveHealthCheck *PassiveHealthCheck

quit chan struct{}

Expand All @@ -107,11 +105,8 @@ type EndpointRegistry struct {
var _ PostProcessor = &EndpointRegistry{}

type RegistryOptions struct {
LastSeenTimeout time.Duration
PassiveHealthCheckEnabled bool
StatsResetPeriod time.Duration
MinRequests int64
MaxHealthCheckDropProbability float64
LastSeenTimeout time.Duration
PassiveHealthCheck *PassiveHealthCheck
}

func (r *EndpointRegistry) Do(routes []*Route) []*Route {
Expand Down Expand Up @@ -151,7 +146,7 @@ func (r *EndpointRegistry) Do(routes []*Route) []*Route {
}

func (r *EndpointRegistry) updateStats() {
ticker := time.NewTicker(r.statsResetPeriod)
ticker := time.NewTicker(r.passiveHealthCheck.Period)

for {
r.data.Range(func(key, value any) bool {
Expand All @@ -164,9 +159,9 @@ func (r *EndpointRegistry) updateStats() {

failed := e.totalFailedRoundTrips[curSlot].Load()
requests := e.totalRequests[curSlot].Load()
if requests > r.minRequests {
if requests > r.passiveHealthCheck.MinRequests {
failedRoundTripsRatio := float64(failed) / float64(requests)
e.healthCheckDropProbability.Store(min(failedRoundTripsRatio, r.maxHealthCheckDropProbability))
e.healthCheckDropProbability.Store(min(failedRoundTripsRatio, r.passiveHealthCheck.MaxDropProbability))
} else {
e.healthCheckDropProbability.Store(0.0)
}
Expand All @@ -193,17 +188,15 @@ func NewEndpointRegistry(o RegistryOptions) *EndpointRegistry {
}

registry := &EndpointRegistry{
lastSeenTimeout: o.LastSeenTimeout,
statsResetPeriod: o.StatsResetPeriod,
minRequests: o.MinRequests,
maxHealthCheckDropProbability: o.MaxHealthCheckDropProbability,
lastSeenTimeout: o.LastSeenTimeout,
passiveHealthCheck: o.PassiveHealthCheck,

quit: make(chan struct{}),

now: time.Now,
data: sync.Map{},
}
if o.PassiveHealthCheckEnabled {
if o.PassiveHealthCheck != nil {
go registry.updateStats()
}

Expand All @@ -219,6 +212,10 @@ func (r *EndpointRegistry) GetMetrics(hostPort string) Metrics {
return e.(*entry)
}

func (r *EndpointRegistry) GetPassiveHealthCheck() *PassiveHealthCheck {
return r.passiveHealthCheck
}

func (r *EndpointRegistry) allMetrics() map[string]Metrics {
result := make(map[string]Metrics)
r.data.Range(func(k, v any) bool {
Expand Down
13 changes: 13 additions & 0 deletions routing/routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,19 @@ func (o MatchingOptions) ignoreTrailingSlash() bool {
return o&IgnoreTrailingSlash > 0
}

type PassiveHealthCheck struct {
// The period of time after which the endpointregistry begins to calculate endpoints statistics
// from scratch
Period time.Duration

// The minimum number of total requests that should be sent to an endpoint in a single period to
// potentially opt out the endpoint from the list of healthy endpoints
MinRequests int64

// The maximum probability of unhealthy endpoint to be dropped out from load balancing for every specific request
MaxDropProbability float64
}

// DataClient instances provide data sources for
// route definitions.
type DataClient interface {
Expand Down
Loading

0 comments on commit 83a1958

Please sign in to comment.