Skip to content

Commit

Permalink
Use sync.Map in the endpointregistry
Browse files Browse the repository at this point in the history
Signed-off-by: Roman Zavodskikh <roman.zavodskikh@zalando.de>
  • Loading branch information
Roman Zavodskikh committed Dec 14, 2023
1 parent 5f706e5 commit d229720
Show file tree
Hide file tree
Showing 4 changed files with 213 additions and 96 deletions.
4 changes: 2 additions & 2 deletions loadbalancer/algorithm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ func TestConsistentHashBoundedLoadDistribution(t *testing.T) {
t.Errorf("Expected in-flight requests for each endpoint to be less than %d. In-flight request counts: %d, %d, %d", limit, ifr0, ifr1, ifr2)
}
ep.Metrics.IncInflightRequest()
ctx.Registry.IncInflightRequest(ep.Host)
ctx.Registry.GetMetrics(ep.Host).IncInflightRequest()
}
}

Expand All @@ -441,7 +441,7 @@ func TestConsistentHashKeyDistribution(t *testing.T) {
func addInflightRequests(registry *routing.EndpointRegistry, endpoint routing.LBEndpoint, count int) {
for i := 0; i < count; i++ {
endpoint.Metrics.IncInflightRequest()
registry.IncInflightRequest(endpoint.Host)
registry.GetMetrics(endpoint.Host).IncInflightRequest()
}
}

Expand Down
4 changes: 2 additions & 2 deletions loadbalancer/fadein_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func initializeEndpoints(endpointAges []time.Duration, fadeInDuration time.Durat
Host: eps[i],
Detected: detectionTimes[i],
})
ctx.Registry.SetDetectedTime(eps[i], detectionTimes[i])
ctx.Registry.GetMetrics(eps[i]).SetDetected(detectionTimes[i])
}
ctx.LBEndpoints = ctx.Route.LBEndpoints

Expand Down Expand Up @@ -332,7 +332,7 @@ func benchmarkFadeIn(
Host: eps[i],
Detected: detectionTimes[i],
})
registry.SetDetectedTime(eps[i], detectionTimes[i])
registry.GetMetrics(eps[i]).SetDetected(detectionTimes[i])
}

var wg sync.WaitGroup
Expand Down
133 changes: 63 additions & 70 deletions routing/endpointregistry.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package routing

import (
"sync"
"sync/atomic"
"time"

"github.com/zalando/skipper/eskip"
Expand All @@ -13,32 +14,69 @@ const defaultLastSeenTimeout = 1 * time.Minute
// used to perform better load balancing, fadeIn, etc.
type Metrics interface {
DetectedTime() time.Time
SetDetected(detected time.Time)

LastSeen() time.Time
SetLastSeen(lastSeen time.Time)

InflightRequests() int64
IncInflightRequest()
DecInflightRequest()
}

type entry struct {
detected time.Time
inflightRequests int64
detected atomic.Value // time.Time
lastSeen atomic.Value // time.Time
inflightRequests atomic.Int64
}

var _ Metrics = &entry{}

func (e *entry) DetectedTime() time.Time {
return e.detected
return e.detected.Load().(time.Time)
}

func (e *entry) LastSeen() time.Time {
return e.lastSeen.Load().(time.Time)
}

func (e *entry) InflightRequests() int64 {
return e.inflightRequests
return e.inflightRequests.Load()
}

func (e *entry) IncInflightRequest() {
e.inflightRequests.Add(1)
}

func (e *entry) DecInflightRequest() {
e.inflightRequests.Add(-1)
}

func (e *entry) setDetectedForce(detected time.Time) {
e.detected.Store(detected)
}

func (e *entry) SetDetected(detected time.Time) {
e.detected.CompareAndSwap(time.Time{}, detected)
}

func (e *entry) SetLastSeen(ts time.Time) {
e.lastSeen.Store(ts)
}

func newEntry() (result *entry) {
result = &entry{}
result.setDetectedForce(time.Time{})
result.SetLastSeen(time.Time{})
return
}

type EndpointRegistry struct {
lastSeen map[string]time.Time
lastSeenTimeout time.Duration
now func() time.Time

mu sync.Mutex

data map[string]*entry
// map[string]*entry
data sync.Map
}

var _ PostProcessor = &EndpointRegistry{}
Expand All @@ -53,26 +91,23 @@ func (r *EndpointRegistry) Do(routes []*Route) []*Route {
for _, route := range routes {
if route.BackendType == eskip.LBBackend {
for _, epi := range route.LBEndpoints {
metrics := r.GetMetrics(epi.Host)
if metrics.DetectedTime().IsZero() {
r.SetDetectedTime(epi.Host, now)
}
e, _ := r.data.LoadOrStore(epi.Host, newEntry())

r.lastSeen[epi.Host] = now
e.(*entry).SetDetected(now)
e.(*entry).SetLastSeen(now)
}
}
}

for host, ts := range r.lastSeen {
if ts.Add(r.lastSeenTimeout).Before(now) {
r.mu.Lock()
if r.data[host].inflightRequests == 0 {
delete(r.lastSeen, host)
delete(r.data, host)
}
r.mu.Unlock()
removeOlder := now.Add(-r.lastSeenTimeout)
r.data.Range(func(key, value any) bool {
e := value.(*entry)
if e.lastSeen.Load().(time.Time).Before(removeOlder) {
r.data.Delete(key)
}
}

return true
})

return routes
}
Expand All @@ -82,58 +117,16 @@ func NewEndpointRegistry(o RegistryOptions) *EndpointRegistry {
o.LastSeenTimeout = defaultLastSeenTimeout
}

return &EndpointRegistry{
data: map[string]*entry{},
lastSeen: map[string]time.Time{},
result := &EndpointRegistry{
data: sync.Map{},
lastSeenTimeout: o.LastSeenTimeout,
now: time.Now,
}
}

func (r *EndpointRegistry) GetMetrics(key string) Metrics {
r.mu.Lock()
defer r.mu.Unlock()

e := r.getOrInitEntryLocked(key)
copy := &entry{}
*copy = *e
return copy
}

func (r *EndpointRegistry) SetDetectedTime(key string, detected time.Time) {
r.mu.Lock()
defer r.mu.Unlock()

e := r.getOrInitEntryLocked(key)
e.detected = detected
return result
}

func (r *EndpointRegistry) IncInflightRequest(key string) {
r.mu.Lock()
defer r.mu.Unlock()

e := r.getOrInitEntryLocked(key)
e.inflightRequests++
}

func (r *EndpointRegistry) DecInflightRequest(key string) {
r.mu.Lock()
defer r.mu.Unlock()

e := r.getOrInitEntryLocked(key)
e.inflightRequests--
}

// getOrInitEntryLocked returns pointer to endpoint registry entry
// which contains the information about endpoint representing the
// following key. r.mu must be held while calling this function and
// using of the entry returned. In general, key represents the "host:port"
// string
func (r *EndpointRegistry) getOrInitEntryLocked(key string) *entry {
e, ok := r.data[key]
if !ok {
e = &entry{}
r.data[key] = e
}
return e
func (r *EndpointRegistry) GetMetrics(key string) Metrics {
e, _ := r.data.LoadOrStore(key, newEntry())
return e.(*entry)
}
Loading

0 comments on commit d229720

Please sign in to comment.