Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use sync.Map in the endpointregistry #2795

Merged
merged 1 commit into from
Dec 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions filters/fadein/fadein.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type (
}

postProcessor struct {
endpointRegisty *routing.EndpointRegistry
// "http://10.2.1.53:1234": {t0 60s t0-10s}
detected map[string]detectedFadeIn
}
Expand Down Expand Up @@ -192,11 +193,20 @@ func (endpointCreated) CreateFilter(args []interface{}) (filters.Filter, error)
func (endpointCreated) Request(filters.FilterContext) {}
func (endpointCreated) Response(filters.FilterContext) {}

type PostProcessorOptions struct {
EndpointRegistry *routing.EndpointRegistry
}

// NewPostProcessor creates post-processor for maintaining the detection time of LB endpoints with fade-in
// behavior.
func NewPostProcessor() routing.PostProcessor {
func NewPostProcessor(options PostProcessorOptions) routing.PostProcessor {
if options.EndpointRegistry == nil {
options.EndpointRegistry = routing.NewEndpointRegistry(routing.RegistryOptions{})
}
Comment on lines +203 to +205
Copy link
Member

@AlexanderYastrebov AlexanderYastrebov Dec 23, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes no sense as this registry instance will not be visible outside of this postprocessor.
The code should call GetMetrics only when registry is not nil instead.


return &postProcessor{
detected: make(map[string]detectedFadeIn),
endpointRegisty: options.EndpointRegistry,
detected: make(map[string]detectedFadeIn),
}
}

Expand Down Expand Up @@ -235,6 +245,10 @@ func (p *postProcessor) Do(r []*routing.Route) []*routing.Route {
}

ep.Detected = detected
metrics := p.endpointRegisty.GetMetrics(ep.Host)
if endpointsCreated[key].After(metrics.DetectedTime()) {
metrics.SetDetected(endpointsCreated[key])
}
p.detected[key] = detectedFadeIn{
when: detected,
duration: ri.LBFadeInDuration,
Expand Down
4 changes: 2 additions & 2 deletions filters/fadein/fadein_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func TestPostProcessor(t *testing.T) {
PostProcessors: []routing.PostProcessor{
loadbalancer.NewAlgorithmProvider(),
endpointRegistry,
NewPostProcessor(),
NewPostProcessor(PostProcessorOptions{EndpointRegistry: endpointRegistry}),
},
SignalFirstLoad: true,
})
Expand Down Expand Up @@ -431,7 +431,7 @@ func TestPostProcessor(t *testing.T) {
if !ep.Detected.After(firstDetected) {
t.Fatal("Failed to reset detection time.")
}
if endpointRegistry.GetMetrics(ep.Host).DetectedTime().After(firstDetected) {
if !endpointRegistry.GetMetrics(ep.Host).DetectedTime().After(firstDetected) {
t.Fatal("Failed to reset detection time.")
szuecs marked this conversation as resolved.
Show resolved Hide resolved
}

Expand Down
4 changes: 2 additions & 2 deletions loadbalancer/algorithm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ func TestConsistentHashBoundedLoadDistribution(t *testing.T) {
t.Errorf("Expected in-flight requests for each endpoint to be less than %d. In-flight request counts: %d, %d, %d", limit, ifr0, ifr1, ifr2)
}
ep.Metrics.IncInflightRequest()
ctx.Registry.IncInflightRequest(ep.Host)
ctx.Registry.GetMetrics(ep.Host).IncInflightRequest()
}
}

Expand All @@ -441,7 +441,7 @@ func TestConsistentHashKeyDistribution(t *testing.T) {
func addInflightRequests(registry *routing.EndpointRegistry, endpoint routing.LBEndpoint, count int) {
for i := 0; i < count; i++ {
endpoint.Metrics.IncInflightRequest()
registry.IncInflightRequest(endpoint.Host)
registry.GetMetrics(endpoint.Host).IncInflightRequest()
}
}

Expand Down
4 changes: 2 additions & 2 deletions loadbalancer/fadein_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func initializeEndpoints(endpointAges []time.Duration, fadeInDuration time.Durat
Host: eps[i],
Detected: detectionTimes[i],
})
ctx.Registry.SetDetectedTime(eps[i], detectionTimes[i])
ctx.Registry.GetMetrics(eps[i]).SetDetected(detectionTimes[i])
}
ctx.LBEndpoints = ctx.Route.LBEndpoints

Expand Down Expand Up @@ -332,7 +332,7 @@ func benchmarkFadeIn(
Host: eps[i],
Detected: detectionTimes[i],
})
registry.SetDetectedTime(eps[i], detectionTimes[i])
registry.GetMetrics(eps[i]).SetDetected(detectionTimes[i])
}

var wg sync.WaitGroup
Expand Down
2 changes: 1 addition & 1 deletion proxy/fadeintesting_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ func (p *fadeInProxy) addInstances(n int) {
DataClients: []routing.DataClient{client},
PostProcessors: []routing.PostProcessor{
loadbalancer.NewAlgorithmProvider(),
fadein.NewPostProcessor(),
fadein.NewPostProcessor(fadein.PostProcessorOptions{}),
},
})

Expand Down
125 changes: 58 additions & 67 deletions routing/endpointregistry.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package routing

import (
"sync"
"sync/atomic"
"time"

"github.com/zalando/skipper/eskip"
Expand All @@ -13,32 +14,65 @@ const defaultLastSeenTimeout = 1 * time.Minute
// used to perform better load balancing, fadeIn, etc.
type Metrics interface {
DetectedTime() time.Time
SetDetected(detected time.Time)

LastSeen() time.Time
SetLastSeen(lastSeen time.Time)

InflightRequests() int64
IncInflightRequest()
DecInflightRequest()
}

type entry struct {
detected time.Time
inflightRequests int64
detected atomic.Value // time.Time
lastSeen atomic.Value // time.Time
inflightRequests atomic.Int64
}

var _ Metrics = &entry{}

func (e *entry) DetectedTime() time.Time {
return e.detected
return e.detected.Load().(time.Time)
AlexanderYastrebov marked this conversation as resolved.
Show resolved Hide resolved
}

func (e *entry) LastSeen() time.Time {
return e.lastSeen.Load().(time.Time)
}

func (e *entry) InflightRequests() int64 {
return e.inflightRequests
return e.inflightRequests.Load()
}

func (e *entry) IncInflightRequest() {
e.inflightRequests.Add(1)
}

func (e *entry) DecInflightRequest() {
e.inflightRequests.Add(-1)
}

func (e *entry) SetDetected(detected time.Time) {
e.detected.Store(detected)
}

func (e *entry) SetLastSeen(ts time.Time) {
e.lastSeen.Store(ts)
}

func newEntry() (result *entry) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why named return value?
This looks much better:

func newEntry() *entry {
	result := &entry{}
	result.SetDetected(time.Time{})
	result.SetLastSeen(time.Time{})
	return result
}

result = &entry{}
result.SetDetected(time.Time{})
result.SetLastSeen(time.Time{})
return
}

type EndpointRegistry struct {
lastSeen map[string]time.Time
lastSeenTimeout time.Duration
now func() time.Time

mu sync.Mutex

data map[string]*entry
// map[string]*entry
data sync.Map
}

var _ PostProcessor = &EndpointRegistry{}
Expand All @@ -55,24 +89,23 @@ func (r *EndpointRegistry) Do(routes []*Route) []*Route {
for _, epi := range route.LBEndpoints {
metrics := r.GetMetrics(epi.Host)
if metrics.DetectedTime().IsZero() {
r.SetDetectedTime(epi.Host, now)
metrics.SetDetected(now)
RomanZavodskikh marked this conversation as resolved.
Show resolved Hide resolved
}

r.lastSeen[epi.Host] = now
metrics.SetLastSeen(now)
}
}
}

for host, ts := range r.lastSeen {
if ts.Add(r.lastSeenTimeout).Before(now) {
r.mu.Lock()
if r.data[host].inflightRequests == 0 {
delete(r.lastSeen, host)
delete(r.data, host)
}
r.mu.Unlock()
removeOlder := now.Add(-r.lastSeenTimeout)
r.data.Range(func(key, value any) bool {
e := value.(*entry)
if e.LastSeen().Before(removeOlder) {
r.data.Delete(key)
szuecs marked this conversation as resolved.
Show resolved Hide resolved
}
}

return true
})

return routes
}
Expand All @@ -82,58 +115,16 @@ func NewEndpointRegistry(o RegistryOptions) *EndpointRegistry {
o.LastSeenTimeout = defaultLastSeenTimeout
}

return &EndpointRegistry{
data: map[string]*entry{},
lastSeen: map[string]time.Time{},
result := &EndpointRegistry{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why store result if you can just return it?

data: sync.Map{},
lastSeenTimeout: o.LastSeenTimeout,
now: time.Now,
}
}

func (r *EndpointRegistry) GetMetrics(key string) Metrics {
r.mu.Lock()
defer r.mu.Unlock()

e := r.getOrInitEntryLocked(key)
copy := &entry{}
*copy = *e
return copy
}

func (r *EndpointRegistry) SetDetectedTime(key string, detected time.Time) {
r.mu.Lock()
defer r.mu.Unlock()

e := r.getOrInitEntryLocked(key)
e.detected = detected
return result
}

func (r *EndpointRegistry) IncInflightRequest(key string) {
r.mu.Lock()
defer r.mu.Unlock()

e := r.getOrInitEntryLocked(key)
e.inflightRequests++
}

func (r *EndpointRegistry) DecInflightRequest(key string) {
r.mu.Lock()
defer r.mu.Unlock()

e := r.getOrInitEntryLocked(key)
e.inflightRequests--
}

// getOrInitEntryLocked returns pointer to endpoint registry entry
// which contains the information about endpoint representing the
// following key. r.mu must be held while calling this function and
// using of the entry returned. In general, key represents the "host:port"
// string
func (r *EndpointRegistry) getOrInitEntryLocked(key string) *entry {
e, ok := r.data[key]
if !ok {
e = &entry{}
r.data[key] = e
}
return e
func (r *EndpointRegistry) GetMetrics(key string) Metrics {
e, _ := r.data.LoadOrStore(key, newEntry())
szuecs marked this conversation as resolved.
Show resolved Hide resolved
return e.(*entry)
}
Loading