From 479fc81101dbe03a1236a1046263945aca584e5d Mon Sep 17 00:00:00 2001 From: Roman Zavodskikh Date: Thu, 30 Nov 2023 16:58:39 +0100 Subject: [PATCH] Revert "Do not use endpointregistry in the hotpath (#2673)" This reverts commit 9de6be225b2d3cbacd1f83adf451a9adf2d2ec9a. Signed-off-by: Roman Zavodskikh --- loadbalancer/algorithm.go | 24 ++++++++++++++---------- proxy/proxy.go | 3 +++ 2 files changed, 17 insertions(+), 10 deletions(-) diff --git a/loadbalancer/algorithm.go b/loadbalancer/algorithm.go index 23fdcba1bd..84fa3bcc4b 100644 --- a/loadbalancer/algorithm.go +++ b/loadbalancer/algorithm.go @@ -75,7 +75,8 @@ func shiftWeighted(rnd *rand.Rand, ctx *routing.LBContext, now time.Time) routin rt := ctx.Route ep := ctx.LBEndpoints for _, epi := range ep { - wi := fadeIn(now, rt.LBFadeInDuration, rt.LBFadeInExponent, epi.Detected) + detected := ctx.Registry.GetMetrics(epi.Host).DetectedTime() + wi := fadeIn(now, rt.LBFadeInDuration, rt.LBFadeInExponent, detected) sum += wi } @@ -83,7 +84,8 @@ func shiftWeighted(rnd *rand.Rand, ctx *routing.LBContext, now time.Time) routin r := rnd.Float64() * sum var upto float64 for i, epi := range ep { - upto += fadeIn(now, rt.LBFadeInDuration, rt.LBFadeInExponent, epi.Detected) + detected := ctx.Registry.GetMetrics(epi.Host).DetectedTime() + upto += fadeIn(now, rt.LBFadeInDuration, rt.LBFadeInExponent, detected) if upto > r { choice = ep[i] break @@ -112,11 +114,12 @@ func shiftToRemaining(rnd *rand.Rand, ctx *routing.LBContext, wi []int, now time func withFadeIn(rnd *rand.Rand, ctx *routing.LBContext, choice int, algo routing.LBAlgorithm) routing.LBEndpoint { ep := ctx.LBEndpoints now := time.Now() + detected := ctx.Registry.GetMetrics(ctx.LBEndpoints[choice].Host).DetectedTime() f := fadeIn( now, ctx.Route.LBFadeInDuration, ctx.Route.LBFadeInExponent, - ctx.LBEndpoints[choice].Detected, + detected, ) if rnd.Float64() < f { @@ -124,7 +127,8 @@ func withFadeIn(rnd *rand.Rand, ctx *routing.LBContext, choice int, algo routing } notFadingIndexes := make([]int, 0, len(ep)) for i := 0; i < len(ep); i++ { - if _, fadingIn := fadeInState(now, ctx.Route.LBFadeInDuration, ep[i].Detected); !fadingIn { + detected := ctx.Registry.GetMetrics(ep[i].Host).DetectedTime() + if _, fadingIn := fadeInState(now, ctx.Route.LBFadeInDuration, detected); !fadingIn { notFadingIndexes = append(notFadingIndexes, i) } } @@ -263,7 +267,7 @@ func computeLoadAverage(ctx *routing.LBContext) float64 { sum := 1.0 // add 1 to include the request that just arrived endpoints := ctx.LBEndpoints for _, v := range endpoints { - sum += float64(v.Metrics.GetInflightRequests()) + sum += float64(ctx.Registry.GetMetrics(v.Host).InflightRequests()) } return sum / float64(len(endpoints)) } @@ -280,10 +284,10 @@ func (ch *consistentHash) boundedLoadSearch(key string, balanceFactor float64, c if skipEndpoint(endpointIndex) { continue } - load := ctx.LBEndpoints[endpointIndex].Metrics.GetInflightRequests() + load := ctx.Registry.GetMetrics(ctx.LBEndpoints[endpointIndex].Host).InflightRequests() // We know there must be an endpoint whose load <= average load. // Since targetLoad >= average load (balancerFactor >= 1), there must also be an endpoint with load <= targetLoad. - if load <= int(targetLoad) { + if float64(load) <= targetLoad { break } ringIndex = (ringIndex + 1) % ch.Len() @@ -365,7 +369,7 @@ func (p *powerOfRandomNChoices) Apply(ctx *routing.LBContext) routing.LBEndpoint for i := 1; i < p.numberOfChoices; i++ { ce := ctx.LBEndpoints[p.rnd.Intn(ne)] - if p.getScore(ce) > p.getScore(best) { + if p.getScore(ctx, ce) > p.getScore(ctx, best) { best = ce } } @@ -373,9 +377,9 @@ func (p *powerOfRandomNChoices) Apply(ctx *routing.LBContext) routing.LBEndpoint } // getScore returns negative value of inflightrequests count. -func (p *powerOfRandomNChoices) getScore(e routing.LBEndpoint) int64 { +func (p *powerOfRandomNChoices) getScore(ctx *routing.LBContext, e routing.LBEndpoint) int64 { // endpoints with higher inflight request should have lower score - return -int64(e.Metrics.GetInflightRequests()) + return -ctx.Registry.GetMetrics(e.Host).InflightRequests() } type ( diff --git a/proxy/proxy.go b/proxy/proxy.go index 7b6923cb2e..84fedafcab 100644 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -848,6 +848,9 @@ func (p *Proxy) makeBackendRequest(ctx *context, requestContext stdlibcontext.Co if endpoint != nil { endpoint.Metrics.IncInflightRequest() defer endpoint.Metrics.DecInflightRequest() + + p.registry.IncInflightRequest(endpoint.Host) + defer p.registry.DecInflightRequest(endpoint.Host) } if p.experimentalUpgrade && isUpgradeRequest(req) {