From 5066a0cc0c5fc6da851f17b2abd182817e9ed554 Mon Sep 17 00:00:00 2001 From: zensh Date: Sat, 11 Apr 2020 11:36:36 +0800 Subject: [PATCH] share label cache for each canary middleware --- pkg/middlewares/canary/canary.go | 2 +- pkg/middlewares/canary/label.go | 79 +++++++++++++++++++--------- pkg/middlewares/canary/label_test.go | 29 ++++++---- 3 files changed, 73 insertions(+), 37 deletions(-) diff --git a/pkg/middlewares/canary/canary.go b/pkg/middlewares/canary/canary.go index d67e377f5b..d6a1c17b1e 100644 --- a/pkg/middlewares/canary/canary.go +++ b/pkg/middlewares/canary/canary.go @@ -78,7 +78,7 @@ func New(ctx context.Context, next http.Handler, cfg dynamic.Canary, name string if c.loadLabels { c.ls = NewLabelStore(logger, cfg, expiration, cacheCleanDuration) } - logger.Infof("Add canary middleware: %v, %v, %v", cfg, expiration, cacheCleanDuration) + logger.Debugf("Add canary middleware: %v, %v, %v", cfg, expiration, cacheCleanDuration) return c, nil } diff --git a/pkg/middlewares/canary/label.go b/pkg/middlewares/canary/label.go index cea2e980b9..03ee8b7a77 100644 --- a/pkg/middlewares/canary/label.go +++ b/pkg/middlewares/canary/label.go @@ -11,17 +11,25 @@ import ( "github.com/containous/traefik/v2/pkg/log" ) +var storesMu sync.Mutex +var stores = make(map[string]*Store) + // LabelStore ... type LabelStore struct { - logger log.Logger + s *Store + logger log.Logger + expiration time.Duration + mustFetchLabels func(ctx context.Context, uid, requestID string) (labels []Label, timestamp int64) +} + +// Store ... +type Store struct { mu sync.RWMutex - expiration time.Duration + maxCacheSize int cacheCleanDuration time.Duration shouldRound time.Time - maxCacheSize int liveMap map[string]*entry staleMap map[string]*entry - mustFetchLabels func(ctx context.Context, uid, requestID string) (labels []Label, timestamp int64) } type entry struct { @@ -65,16 +73,6 @@ func (l *Label) MatchChannel(channel string) bool { // NewLabelStore ... func NewLabelStore(logger log.Logger, cfg dynamic.Canary, expiration, cacheCleanDuration time.Duration) *LabelStore { - ls := &LabelStore{ - logger: logger, - maxCacheSize: cfg.MaxCacheSize, - expiration: expiration, - cacheCleanDuration: cacheCleanDuration, - shouldRound: time.Now().UTC().Add(cacheCleanDuration), - liveMap: make(map[string]*entry), - staleMap: make(map[string]*entry), - } - product := cfg.Product apiURL := cfg.Server // apiURL ex. https://labelServerHost/api/labels?uid=%s&product=%s @@ -85,6 +83,24 @@ func NewLabelStore(logger log.Logger, cfg dynamic.Canary, expiration, cacheClean apiURL += "/users/%s/labels:cache?product=%s" } + storesMu.Lock() + // LabelStores share Store with same apiURL, but always update Store'config to latest + s, ok := stores[apiURL] + if !ok { + s = &Store{ + maxCacheSize: cfg.MaxCacheSize, + cacheCleanDuration: cacheCleanDuration, + shouldRound: time.Now().UTC().Add(cacheCleanDuration), + liveMap: make(map[string]*entry), + staleMap: make(map[string]*entry), + } + stores[apiURL] = s + } else { + s.updateConfig(cfg.MaxCacheSize, cacheCleanDuration) + } + storesMu.Unlock() + + ls := &LabelStore{logger: logger, s: s, expiration: expiration} ls.mustFetchLabels = func(ctx context.Context, uid, requestID string) ([]Label, int64) { url := fmt.Sprintf(apiURL, uid, product) return MustGetUserLabels(ctx, url, requestID, logger) @@ -93,29 +109,41 @@ func NewLabelStore(logger log.Logger, cfg dynamic.Canary, expiration, cacheClean } // MustLoadLabels ... -func (s *LabelStore) MustLoadLabels(ctx context.Context, uid, requestID string) []Label { +func (ls *LabelStore) MustLoadLabels(ctx context.Context, uid, requestID string) []Label { now := time.Now().UTC() - e := s.mustLoadEntry(uid, now) + e, round := ls.s.mustLoadEntry(uid, now) + if round { + ls.logger.Infof("Round cache: current stale cache %d, live cache %d, trigger %s", + len(ls.s.staleMap), len(ls.s.liveMap), uid) + } e.mu.Lock() defer e.mu.Unlock() if e.value == nil || e.expireAt.Before(now) { - labels, ts := s.mustFetchLabels(ctx, uid, requestID) + labels, ts := ls.mustFetchLabels(ctx, uid, requestID) e.value = labels - e.expireAt = time.Unix(ts, 0).Add(s.expiration) + e.expireAt = time.Unix(ts, 0).Add(ls.expiration) } return e.value } -func (s *LabelStore) mustLoadEntry(key string, now time.Time) *entry { +// updateConfig ... +func (s *Store) updateConfig(maxCacheSize int, cacheCleanDuration time.Duration) { + s.mu.Lock() + defer s.mu.Unlock() + s.maxCacheSize = maxCacheSize + s.cacheCleanDuration = cacheCleanDuration +} + +func (s *Store) mustLoadEntry(key string, now time.Time) (*entry, bool) { s.mu.RLock() e, ok := s.liveMap[key] - shouldRound := len(s.liveMap) > s.maxCacheSize || s.shouldRound.Before(now) + round := len(s.liveMap) > s.maxCacheSize || s.shouldRound.Before(now) s.mu.RUnlock() - if ok && !shouldRound { - return e + if ok && !round { + return e, round } s.mu.Lock() @@ -134,13 +162,12 @@ func (s *LabelStore) mustLoadEntry(key string, now time.Time) *entry { s.liveMap[key] = e } - if len(s.liveMap) > s.maxCacheSize || s.shouldRound.Before(now) { - s.logger.Infof("Round cache, stale cache size: %d, live cache size: %d, trigger: %s, shouldRound: %s", - len(s.staleMap), len(s.liveMap), key, s.shouldRound.Format(time.RFC3339)) + round = len(s.liveMap) > s.maxCacheSize || s.shouldRound.Before(now) // check again + if round { s.shouldRound = now.Add(s.cacheCleanDuration) // make a round: drop staleMap and create new liveMap s.staleMap = s.liveMap s.liveMap = make(map[string]*entry, len(s.staleMap)/2) } - return e + return e, round } diff --git a/pkg/middlewares/canary/label_test.go b/pkg/middlewares/canary/label_test.go index 9684acc763..e44f4a43eb 100644 --- a/pkg/middlewares/canary/label_test.go +++ b/pkg/middlewares/canary/label_test.go @@ -44,28 +44,37 @@ func TestLabelStore(t *testing.T) { return []Label{{Label: requestID}}, time.Now().Unix() } - u1 := ls.mustLoadEntry("u1", time.Now()) + u1, ok := ls.s.mustLoadEntry("u1", time.Now()) + a.False(ok) var wg sync.WaitGroup wg.Add(3) go func(e *entry) { defer wg.Done() - a.Equal(e, ls.mustLoadEntry("u1", time.Now())) + u1, ok := ls.s.mustLoadEntry("u1", time.Now()) + a.False(ok) + a.Equal(e, u1) }(u1) go func(e *entry) { defer wg.Done() - a.Equal(e, ls.mustLoadEntry("u1", time.Now())) + u1, ok := ls.s.mustLoadEntry("u1", time.Now()) + a.False(ok) + a.Equal(e, u1) }(u1) go func(e *entry) { defer wg.Done() - ls.mustLoadEntry("u2", time.Now()) - ls.mustLoadEntry("u3", time.Now()) - ls.mustLoadEntry("u4", time.Now()) - // Round cache - a.Equal(0, len(ls.liveMap)) - a.Equal(e, ls.mustLoadEntry("u1", time.Now())) + _, ok := ls.s.mustLoadEntry("u2", time.Now()) + a.False(ok) + _, ok = ls.s.mustLoadEntry("u3", time.Now()) + a.False(ok) + _, ok = ls.s.mustLoadEntry("u4", time.Now()) + a.True(ok) + a.Equal(0, len(ls.s.liveMap)) + u1, ok := ls.s.mustLoadEntry("u1", time.Now()) + a.False(ok) + a.Equal(e, u1) }(u1) wg.Wait() @@ -105,7 +114,7 @@ func TestLabelStore(t *testing.T) { _ = ls.MustLoadLabels(context.Background(), "u4", "v2") // Round cache - a.Equal(0, len(ls.liveMap)) + a.Equal(0, len(ls.s.liveMap)) // load cache from staleMap labels = ls.MustLoadLabels(context.Background(), "u1", "v4")