diff --git a/filters/fadein/fadein.go b/filters/fadein/fadein.go index 22551dea34..ecc10c384c 100644 --- a/filters/fadein/fadein.go +++ b/filters/fadein/fadein.go @@ -38,6 +38,7 @@ type ( } postProcessor struct { + endpointRegisty *routing.EndpointRegistry // "http://10.2.1.53:1234": {t0 60s t0-10s} detected map[string]detectedFadeIn } @@ -192,11 +193,20 @@ func (endpointCreated) CreateFilter(args []interface{}) (filters.Filter, error) func (endpointCreated) Request(filters.FilterContext) {} func (endpointCreated) Response(filters.FilterContext) {} +type PostProcessorOptions struct { + EndpointRegistry *routing.EndpointRegistry +} + // NewPostProcessor creates post-processor for maintaining the detection time of LB endpoints with fade-in // behavior. -func NewPostProcessor() routing.PostProcessor { +func NewPostProcessor(options PostProcessorOptions) routing.PostProcessor { + if options.EndpointRegistry == nil { + options.EndpointRegistry = routing.NewEndpointRegistry(routing.RegistryOptions{}) + } + return &postProcessor{ - detected: make(map[string]detectedFadeIn), + endpointRegisty: options.EndpointRegistry, + detected: make(map[string]detectedFadeIn), } } @@ -235,6 +245,10 @@ func (p *postProcessor) Do(r []*routing.Route) []*routing.Route { } ep.Detected = detected + metrics := p.endpointRegisty.GetMetrics(ep.Host) + if endpointsCreated[key].After(metrics.DetectedTime()) { + metrics.SetDetected(endpointsCreated[key]) + } p.detected[key] = detectedFadeIn{ when: detected, duration: ri.LBFadeInDuration, diff --git a/filters/fadein/fadein_test.go b/filters/fadein/fadein_test.go index a6a8d5cbe4..b05c6d999f 100644 --- a/filters/fadein/fadein_test.go +++ b/filters/fadein/fadein_test.go @@ -198,7 +198,7 @@ func TestPostProcessor(t *testing.T) { PostProcessors: []routing.PostProcessor{ loadbalancer.NewAlgorithmProvider(), endpointRegistry, - NewPostProcessor(), + NewPostProcessor(PostProcessorOptions{EndpointRegistry: endpointRegistry}), }, SignalFirstLoad: true, }) @@ -431,7 +431,7 @@ func TestPostProcessor(t *testing.T) { if !ep.Detected.After(firstDetected) { t.Fatal("Failed to reset detection time.") } - if endpointRegistry.GetMetrics(ep.Host).DetectedTime().After(firstDetected) { + if !endpointRegistry.GetMetrics(ep.Host).DetectedTime().After(firstDetected) { t.Fatal("Failed to reset detection time.") } diff --git a/loadbalancer/algorithm_test.go b/loadbalancer/algorithm_test.go index 3db8e1fc19..1eb9bb310a 100644 --- a/loadbalancer/algorithm_test.go +++ b/loadbalancer/algorithm_test.go @@ -419,7 +419,7 @@ func TestConsistentHashBoundedLoadDistribution(t *testing.T) { t.Errorf("Expected in-flight requests for each endpoint to be less than %d. In-flight request counts: %d, %d, %d", limit, ifr0, ifr1, ifr2) } ep.Metrics.IncInflightRequest() - ctx.Registry.IncInflightRequest(ep.Host) + ctx.Registry.GetMetrics(ep.Host).IncInflightRequest() } } @@ -441,7 +441,7 @@ func TestConsistentHashKeyDistribution(t *testing.T) { func addInflightRequests(registry *routing.EndpointRegistry, endpoint routing.LBEndpoint, count int) { for i := 0; i < count; i++ { endpoint.Metrics.IncInflightRequest() - registry.IncInflightRequest(endpoint.Host) + registry.GetMetrics(endpoint.Host).IncInflightRequest() } } diff --git a/loadbalancer/fadein_test.go b/loadbalancer/fadein_test.go index c05311fe23..4768ce3dff 100644 --- a/loadbalancer/fadein_test.go +++ b/loadbalancer/fadein_test.go @@ -70,7 +70,7 @@ func initializeEndpoints(endpointAges []time.Duration, fadeInDuration time.Durat Host: eps[i], Detected: detectionTimes[i], }) - ctx.Registry.SetDetectedTime(eps[i], detectionTimes[i]) + ctx.Registry.GetMetrics(eps[i]).SetDetected(detectionTimes[i]) } ctx.LBEndpoints = ctx.Route.LBEndpoints @@ -332,7 +332,7 @@ func benchmarkFadeIn( Host: eps[i], Detected: detectionTimes[i], }) - registry.SetDetectedTime(eps[i], detectionTimes[i]) + registry.GetMetrics(eps[i]).SetDetected(detectionTimes[i]) } var wg sync.WaitGroup diff --git a/proxy/fadeintesting_test.go b/proxy/fadeintesting_test.go index 4d6465176b..76f873fbdf 100644 --- a/proxy/fadeintesting_test.go +++ b/proxy/fadeintesting_test.go @@ -270,7 +270,7 @@ func (p *fadeInProxy) addInstances(n int) { DataClients: []routing.DataClient{client}, PostProcessors: []routing.PostProcessor{ loadbalancer.NewAlgorithmProvider(), - fadein.NewPostProcessor(), + fadein.NewPostProcessor(fadein.PostProcessorOptions{}), }, }) diff --git a/routing/endpointregistry.go b/routing/endpointregistry.go index 2d1679ffa9..6c8ec39f53 100644 --- a/routing/endpointregistry.go +++ b/routing/endpointregistry.go @@ -2,6 +2,7 @@ package routing import ( "sync" + "sync/atomic" "time" "github.com/zalando/skipper/eskip" @@ -13,32 +14,65 @@ const defaultLastSeenTimeout = 1 * time.Minute // used to perform better load balancing, fadeIn, etc. type Metrics interface { DetectedTime() time.Time + SetDetected(detected time.Time) + + LastSeen() time.Time + SetLastSeen(lastSeen time.Time) + InflightRequests() int64 + IncInflightRequest() + DecInflightRequest() } type entry struct { - detected time.Time - inflightRequests int64 + detected atomic.Value // time.Time + lastSeen atomic.Value // time.Time + inflightRequests atomic.Int64 } var _ Metrics = &entry{} func (e *entry) DetectedTime() time.Time { - return e.detected + return e.detected.Load().(time.Time) +} + +func (e *entry) LastSeen() time.Time { + return e.lastSeen.Load().(time.Time) } func (e *entry) InflightRequests() int64 { - return e.inflightRequests + return e.inflightRequests.Load() +} + +func (e *entry) IncInflightRequest() { + e.inflightRequests.Add(1) +} + +func (e *entry) DecInflightRequest() { + e.inflightRequests.Add(-1) +} + +func (e *entry) SetDetected(detected time.Time) { + e.detected.Store(detected) +} + +func (e *entry) SetLastSeen(ts time.Time) { + e.lastSeen.Store(ts) +} + +func newEntry() (result *entry) { + result = &entry{} + result.SetDetected(time.Time{}) + result.SetLastSeen(time.Time{}) + return } type EndpointRegistry struct { - lastSeen map[string]time.Time lastSeenTimeout time.Duration now func() time.Time - mu sync.Mutex - - data map[string]*entry + // map[string]*entry + data sync.Map } var _ PostProcessor = &EndpointRegistry{} @@ -55,24 +89,23 @@ func (r *EndpointRegistry) Do(routes []*Route) []*Route { for _, epi := range route.LBEndpoints { metrics := r.GetMetrics(epi.Host) if metrics.DetectedTime().IsZero() { - r.SetDetectedTime(epi.Host, now) + metrics.SetDetected(now) } - r.lastSeen[epi.Host] = now + metrics.SetLastSeen(now) } } } - for host, ts := range r.lastSeen { - if ts.Add(r.lastSeenTimeout).Before(now) { - r.mu.Lock() - if r.data[host].inflightRequests == 0 { - delete(r.lastSeen, host) - delete(r.data, host) - } - r.mu.Unlock() + removeOlder := now.Add(-r.lastSeenTimeout) + r.data.Range(func(key, value any) bool { + e := value.(*entry) + if e.LastSeen().Before(removeOlder) { + r.data.Delete(key) } - } + + return true + }) return routes } @@ -82,58 +115,16 @@ func NewEndpointRegistry(o RegistryOptions) *EndpointRegistry { o.LastSeenTimeout = defaultLastSeenTimeout } - return &EndpointRegistry{ - data: map[string]*entry{}, - lastSeen: map[string]time.Time{}, + result := &EndpointRegistry{ + data: sync.Map{}, lastSeenTimeout: o.LastSeenTimeout, now: time.Now, } -} - -func (r *EndpointRegistry) GetMetrics(key string) Metrics { - r.mu.Lock() - defer r.mu.Unlock() - - e := r.getOrInitEntryLocked(key) - copy := &entry{} - *copy = *e - return copy -} - -func (r *EndpointRegistry) SetDetectedTime(key string, detected time.Time) { - r.mu.Lock() - defer r.mu.Unlock() - e := r.getOrInitEntryLocked(key) - e.detected = detected + return result } -func (r *EndpointRegistry) IncInflightRequest(key string) { - r.mu.Lock() - defer r.mu.Unlock() - - e := r.getOrInitEntryLocked(key) - e.inflightRequests++ -} - -func (r *EndpointRegistry) DecInflightRequest(key string) { - r.mu.Lock() - defer r.mu.Unlock() - - e := r.getOrInitEntryLocked(key) - e.inflightRequests-- -} - -// getOrInitEntryLocked returns pointer to endpoint registry entry -// which contains the information about endpoint representing the -// following key. r.mu must be held while calling this function and -// using of the entry returned. In general, key represents the "host:port" -// string -func (r *EndpointRegistry) getOrInitEntryLocked(key string) *entry { - e, ok := r.data[key] - if !ok { - e = &entry{} - r.data[key] = e - } - return e +func (r *EndpointRegistry) GetMetrics(key string) Metrics { + e, _ := r.data.LoadOrStore(key, newEntry()) + return e.(*entry) } diff --git a/routing/endpointregistry_test.go b/routing/endpointregistry_test.go index 1256da4d17..2bb41af925 100644 --- a/routing/endpointregistry_test.go +++ b/routing/endpointregistry_test.go @@ -17,37 +17,50 @@ func TestEmptyRegistry(t *testing.T) { m := r.GetMetrics("some key") assert.Equal(t, time.Time{}, m.DetectedTime()) + assert.Equal(t, time.Time{}, m.LastSeen()) assert.Equal(t, int64(0), m.InflightRequests()) } func TestSetAndGet(t *testing.T) { + now := time.Now() r := routing.NewEndpointRegistry(routing.RegistryOptions{}) mBefore := r.GetMetrics("some key") - r.IncInflightRequest("some key") - mAfter := r.GetMetrics("some key") - + assert.Equal(t, time.Time{}, mBefore.DetectedTime()) + assert.Equal(t, time.Time{}, mBefore.LastSeen()) assert.Equal(t, int64(0), mBefore.InflightRequests()) - assert.Equal(t, int64(1), mAfter.InflightRequests()) - ts, _ := time.Parse(time.DateOnly, "2023-08-29") - mBefore = r.GetMetrics("some key") - r.SetDetectedTime("some key", ts) - mAfter = r.GetMetrics("some key") + r.GetMetrics("some key").SetDetected(now.Add(-time.Second)) + r.GetMetrics("some key").SetLastSeen(now) + r.GetMetrics("some key").IncInflightRequest() + mAfter := r.GetMetrics("some key") - assert.Equal(t, time.Time{}, mBefore.DetectedTime()) - assert.Equal(t, ts, mAfter.DetectedTime()) + assert.Equal(t, now.Add(-time.Second), mBefore.DetectedTime()) + assert.Equal(t, now, mBefore.LastSeen()) + assert.Equal(t, int64(1), mBefore.InflightRequests()) + + assert.Equal(t, now.Add(-time.Second), mAfter.DetectedTime()) + assert.Equal(t, now, mAfter.LastSeen()) + assert.Equal(t, int64(1), mAfter.InflightRequests()) } func TestSetAndGetAnotherKey(t *testing.T) { + now := time.Now() r := routing.NewEndpointRegistry(routing.RegistryOptions{}) - r.IncInflightRequest("some key") mToChange := r.GetMetrics("some key") + mToChange.IncInflightRequest() + mToChange.SetDetected(now.Add(-time.Second)) + mToChange.SetLastSeen(now) mConst := r.GetMetrics("another key") assert.Equal(t, int64(0), mConst.InflightRequests()) + assert.Equal(t, time.Time{}, mConst.DetectedTime()) + assert.Equal(t, time.Time{}, mConst.LastSeen()) + assert.Equal(t, int64(1), mToChange.InflightRequests()) + assert.Equal(t, now.Add(-time.Second), mToChange.DetectedTime()) + assert.Equal(t, now, mToChange.LastSeen()) } func TestDoRemovesOldEntries(t *testing.T) { @@ -73,9 +86,9 @@ func TestDoRemovesOldEntries(t *testing.T) { assert.Equal(t, beginTestTs, mExist.DetectedTime()) assert.Equal(t, beginTestTs, mExistYet.DetectedTime()) - r.IncInflightRequest("endpoint1.test:80") - r.IncInflightRequest("endpoint2.test:80") - r.DecInflightRequest("endpoint2.test:80") + mExist.IncInflightRequest() + mExistYet.IncInflightRequest() + mExistYet.DecInflightRequest() routing.SetNow(r, func() time.Time { return beginTestTs.Add(routing.ExportDefaultLastSeenTimeout + time.Second) @@ -101,6 +114,95 @@ func TestDoRemovesOldEntries(t *testing.T) { assert.Equal(t, int64(0), mRemoved.InflightRequests()) } +func TestMetricsMethodsDoNotAllocate(t *testing.T) { + r := routing.NewEndpointRegistry(routing.RegistryOptions{}) + metrics := r.GetMetrics("some key") + now := time.Now() + metrics.SetDetected(now.Add(-time.Hour)) + metrics.SetLastSeen(now) + + allocs := testing.AllocsPerRun(100, func() { + assert.Equal(t, int64(0), metrics.InflightRequests()) + metrics.IncInflightRequest() + assert.Equal(t, int64(1), metrics.InflightRequests()) + metrics.DecInflightRequest() + assert.Equal(t, int64(0), metrics.InflightRequests()) + + metrics.DetectedTime() + metrics.LastSeen() + }) + assert.Equal(t, now.Add(-time.Hour), metrics.DetectedTime()) + assert.Equal(t, now, metrics.LastSeen()) + + assert.Equal(t, 0.0, allocs) +} + +func TestRaceReadWrite(t *testing.T) { + r := routing.NewEndpointRegistry(routing.RegistryOptions{}) + duration := time.Second + + wg := sync.WaitGroup{} + wg.Add(2) + go func() { + defer wg.Done() + stop := time.After(duration) + for { + r.GetMetrics("some key") + select { + case <-stop: + return + default: + } + } + }() + go func() { + defer wg.Done() + stop := time.After(duration) + for { + r.GetMetrics("some key").IncInflightRequest() + select { + case <-stop: + return + default: + } + } + }() + wg.Wait() +} + +func TestRaceTwoWriters(t *testing.T) { + r := routing.NewEndpointRegistry(routing.RegistryOptions{}) + duration := time.Second + + wg := sync.WaitGroup{} + wg.Add(2) + go func() { + defer wg.Done() + stop := time.After(duration) + for { + r.GetMetrics("some key").IncInflightRequest() + select { + case <-stop: + return + default: + } + } + }() + go func() { + defer wg.Done() + stop := time.After(duration) + for { + r.GetMetrics("some key").DecInflightRequest() + select { + case <-stop: + return + default: + } + } + }() + wg.Wait() +} + func printTotalMutexWaitTime(b *testing.B) { // Name of the metric we want to read. const myMetric = "/sync/mutex/wait/total:seconds" @@ -133,20 +235,24 @@ func benchmarkIncInflightRequests(b *testing.B, name string, goroutines int) { b.Run(name, func(b *testing.B) { r := routing.NewEndpointRegistry(routing.RegistryOptions{}) + now := time.Now() + for i := 1; i < mapSize; i++ { - r.IncInflightRequest(fmt.Sprintf("foo-%d", i)) + r.GetMetrics(fmt.Sprintf("foo-%d", i)).IncInflightRequest() } - r.IncInflightRequest(key) - r.IncInflightRequest(key) + r.GetMetrics(key).IncInflightRequest() + r.GetMetrics(key).SetDetected(now) wg := sync.WaitGroup{} b.ResetTimer() + b.ReportAllocs() for i := 0; i < goroutines; i++ { wg.Add(1) go func() { defer wg.Done() + metrics := r.GetMetrics(key) for n := 0; n < b.N/goroutines; n++ { - r.IncInflightRequest(key) + metrics.IncInflightRequest() } }() } @@ -169,21 +275,24 @@ func benchmarkGetInflightRequests(b *testing.B, name string, goroutines int) { b.Run(name, func(b *testing.B) { r := routing.NewEndpointRegistry(routing.RegistryOptions{}) + now := time.Now() for i := 1; i < mapSize; i++ { - r.IncInflightRequest(fmt.Sprintf("foo-%d", i)) + r.GetMetrics(fmt.Sprintf("foo-%d", i)).IncInflightRequest() } - r.IncInflightRequest(key) - r.IncInflightRequest(key) + r.GetMetrics(key).IncInflightRequest() + r.GetMetrics(key).SetDetected(now) var dummy int64 wg := sync.WaitGroup{} b.ResetTimer() + b.ReportAllocs() for i := 0; i < goroutines; i++ { wg.Add(1) go func() { defer wg.Done() + metrics := r.GetMetrics(key) for n := 0; n < b.N/goroutines; n++ { - dummy = r.GetMetrics(key).InflightRequests() + dummy = metrics.InflightRequests() } }() } @@ -200,3 +309,44 @@ func BenchmarkGetInflightRequests(b *testing.B) { benchmarkGetInflightRequests(b, fmt.Sprintf("%d goroutines", goroutines), goroutines) } } + +func benchmarkGetDetectedTime(b *testing.B, name string, goroutines int) { + const key string = "some key" + const mapSize int = 10000 + + b.Run(name, func(b *testing.B) { + r := routing.NewEndpointRegistry(routing.RegistryOptions{}) + now := time.Now() + for i := 1; i < mapSize; i++ { + r.GetMetrics(fmt.Sprintf("foo-%d", i)).IncInflightRequest() + } + r.GetMetrics(key).IncInflightRequest() + r.GetMetrics(key).SetDetected(now) + + var dummy time.Time + wg := sync.WaitGroup{} + b.ResetTimer() + b.ReportAllocs() + for i := 0; i < goroutines; i++ { + wg.Add(1) + go func() { + defer wg.Done() + metrics := r.GetMetrics(key) + for n := 0; n < b.N/goroutines; n++ { + dummy = metrics.DetectedTime() + } + }() + } + dummy = dummy.Add(time.Second) + wg.Wait() + + printTotalMutexWaitTime(b) + }) +} + +func BenchmarkGetDetectedTime(b *testing.B) { + goroutinesNums := []int{1, 2, 3, 4, 5, 6, 7, 8, 12, 16, 24, 32, 48, 64, 128, 256, 512, 768, 1024, 1536, 2048, 4096} + for _, goroutines := range goroutinesNums { + benchmarkGetDetectedTime(b, fmt.Sprintf("%d goroutines", goroutines), goroutines) + } +} diff --git a/skipper.go b/skipper.go index eb75610a60..3a6dc0344c 100644 --- a/skipper.go +++ b/skipper.go @@ -1908,7 +1908,7 @@ func run(o Options, sig chan os.Signal, idleConnsCH chan struct{}) error { endpointRegistry, schedulerRegistry, builtin.NewRouteCreationMetrics(mtr), - fadein.NewPostProcessor(), + fadein.NewPostProcessor(fadein.PostProcessorOptions{EndpointRegistry: endpointRegistry}), admissionControlSpec.PostProcessor(), }, SignalFirstLoad: o.WaitFirstRouteLoad,