Skip to content

Commit

Permalink
Merge branch 'develop' into urbs
Browse files Browse the repository at this point in the history
  • Loading branch information
zensh committed Apr 11, 2020
2 parents e69e336 + 5066a0c commit eaf2962
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 37 deletions.
2 changes: 1 addition & 1 deletion pkg/middlewares/canary/canary.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func New(ctx context.Context, next http.Handler, cfg dynamic.Canary, name string
if c.loadLabels {
c.ls = NewLabelStore(logger, cfg, expiration, cacheCleanDuration)
}
logger.Infof("Add canary middleware: %v, %v, %v", cfg, expiration, cacheCleanDuration)
logger.Debugf("Add canary middleware: %v, %v, %v", cfg, expiration, cacheCleanDuration)
return c, nil
}

Expand Down
79 changes: 53 additions & 26 deletions pkg/middlewares/canary/label.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,25 @@ import (
"github.com/containous/traefik/v2/pkg/log"
)

var storesMu sync.Mutex
var stores = make(map[string]*Store)

// LabelStore ...
type LabelStore struct {
logger log.Logger
s *Store
logger log.Logger
expiration time.Duration
mustFetchLabels func(ctx context.Context, uid, requestID string) (labels []Label, timestamp int64)
}

// Store ...
type Store struct {
mu sync.RWMutex
expiration time.Duration
maxCacheSize int
cacheCleanDuration time.Duration
shouldRound time.Time
maxCacheSize int
liveMap map[string]*entry
staleMap map[string]*entry
mustFetchLabels func(ctx context.Context, uid, requestID string) (labels []Label, timestamp int64)
}

type entry struct {
Expand Down Expand Up @@ -65,16 +73,6 @@ func (l *Label) MatchChannel(channel string) bool {

// NewLabelStore ...
func NewLabelStore(logger log.Logger, cfg dynamic.Canary, expiration, cacheCleanDuration time.Duration) *LabelStore {
ls := &LabelStore{
logger: logger,
maxCacheSize: cfg.MaxCacheSize,
expiration: expiration,
cacheCleanDuration: cacheCleanDuration,
shouldRound: time.Now().UTC().Add(cacheCleanDuration),
liveMap: make(map[string]*entry),
staleMap: make(map[string]*entry),
}

product := cfg.Product
apiURL := cfg.Server
// apiURL ex. https://labelServerHost/api/labels?uid=%s&product=%s
Expand All @@ -85,6 +83,24 @@ func NewLabelStore(logger log.Logger, cfg dynamic.Canary, expiration, cacheClean
apiURL += "/users/%s/labels:cache?product=%s"
}

storesMu.Lock()
// LabelStores share Store with same apiURL, but always update Store'config to latest
s, ok := stores[apiURL]
if !ok {
s = &Store{
maxCacheSize: cfg.MaxCacheSize,
cacheCleanDuration: cacheCleanDuration,
shouldRound: time.Now().UTC().Add(cacheCleanDuration),
liveMap: make(map[string]*entry),
staleMap: make(map[string]*entry),
}
stores[apiURL] = s
} else {
s.updateConfig(cfg.MaxCacheSize, cacheCleanDuration)
}
storesMu.Unlock()

ls := &LabelStore{logger: logger, s: s, expiration: expiration}
ls.mustFetchLabels = func(ctx context.Context, uid, requestID string) ([]Label, int64) {
url := fmt.Sprintf(apiURL, uid, product)
return MustGetUserLabels(ctx, url, requestID, logger)
Expand All @@ -93,29 +109,41 @@ func NewLabelStore(logger log.Logger, cfg dynamic.Canary, expiration, cacheClean
}

// MustLoadLabels ...
func (s *LabelStore) MustLoadLabels(ctx context.Context, uid, requestID string) []Label {
func (ls *LabelStore) MustLoadLabels(ctx context.Context, uid, requestID string) []Label {
now := time.Now().UTC()
e := s.mustLoadEntry(uid, now)
e, round := ls.s.mustLoadEntry(uid, now)
if round {
ls.logger.Infof("Round cache: current stale cache %d, live cache %d, trigger %s",
len(ls.s.staleMap), len(ls.s.liveMap), uid)
}

e.mu.Lock()
defer e.mu.Unlock()
if e.value == nil || e.expireAt.Before(now) {
labels, ts := s.mustFetchLabels(ctx, uid, requestID)
labels, ts := ls.mustFetchLabels(ctx, uid, requestID)
e.value = labels
e.expireAt = time.Unix(ts, 0).Add(s.expiration)
e.expireAt = time.Unix(ts, 0).Add(ls.expiration)
}

return e.value
}

func (s *LabelStore) mustLoadEntry(key string, now time.Time) *entry {
// updateConfig ...
func (s *Store) updateConfig(maxCacheSize int, cacheCleanDuration time.Duration) {
s.mu.Lock()
defer s.mu.Unlock()
s.maxCacheSize = maxCacheSize
s.cacheCleanDuration = cacheCleanDuration
}

func (s *Store) mustLoadEntry(key string, now time.Time) (*entry, bool) {
s.mu.RLock()
e, ok := s.liveMap[key]
shouldRound := len(s.liveMap) > s.maxCacheSize || s.shouldRound.Before(now)
round := len(s.liveMap) > s.maxCacheSize || s.shouldRound.Before(now)
s.mu.RUnlock()

if ok && !shouldRound {
return e
if ok && !round {
return e, round
}

s.mu.Lock()
Expand All @@ -134,13 +162,12 @@ func (s *LabelStore) mustLoadEntry(key string, now time.Time) *entry {
s.liveMap[key] = e
}

if len(s.liveMap) > s.maxCacheSize || s.shouldRound.Before(now) {
s.logger.Infof("Round cache, stale cache size: %d, live cache size: %d, trigger: %s, shouldRound: %s",
len(s.staleMap), len(s.liveMap), key, s.shouldRound.Format(time.RFC3339))
round = len(s.liveMap) > s.maxCacheSize || s.shouldRound.Before(now) // check again
if round {
s.shouldRound = now.Add(s.cacheCleanDuration)
// make a round: drop staleMap and create new liveMap
s.staleMap = s.liveMap
s.liveMap = make(map[string]*entry, len(s.staleMap)/2)
}
return e
return e, round
}
29 changes: 19 additions & 10 deletions pkg/middlewares/canary/label_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,28 +44,37 @@ func TestLabelStore(t *testing.T) {
return []Label{{Label: requestID}}, time.Now().Unix()
}

u1 := ls.mustLoadEntry("u1", time.Now())
u1, ok := ls.s.mustLoadEntry("u1", time.Now())
a.False(ok)
var wg sync.WaitGroup

wg.Add(3)
go func(e *entry) {
defer wg.Done()
a.Equal(e, ls.mustLoadEntry("u1", time.Now()))
u1, ok := ls.s.mustLoadEntry("u1", time.Now())
a.False(ok)
a.Equal(e, u1)
}(u1)

go func(e *entry) {
defer wg.Done()
a.Equal(e, ls.mustLoadEntry("u1", time.Now()))
u1, ok := ls.s.mustLoadEntry("u1", time.Now())
a.False(ok)
a.Equal(e, u1)
}(u1)

go func(e *entry) {
defer wg.Done()
ls.mustLoadEntry("u2", time.Now())
ls.mustLoadEntry("u3", time.Now())
ls.mustLoadEntry("u4", time.Now())
// Round cache
a.Equal(0, len(ls.liveMap))
a.Equal(e, ls.mustLoadEntry("u1", time.Now()))
_, ok := ls.s.mustLoadEntry("u2", time.Now())
a.False(ok)
_, ok = ls.s.mustLoadEntry("u3", time.Now())
a.False(ok)
_, ok = ls.s.mustLoadEntry("u4", time.Now())
a.True(ok)
a.Equal(0, len(ls.s.liveMap))
u1, ok := ls.s.mustLoadEntry("u1", time.Now())
a.False(ok)
a.Equal(e, u1)
}(u1)

wg.Wait()
Expand Down Expand Up @@ -105,7 +114,7 @@ func TestLabelStore(t *testing.T) {
_ = ls.MustLoadLabels(context.Background(), "u4", "v2")

// Round cache
a.Equal(0, len(ls.liveMap))
a.Equal(0, len(ls.s.liveMap))

// load cache from staleMap
labels = ls.MustLoadLabels(context.Background(), "u1", "v4")
Expand Down

0 comments on commit eaf2962

Please sign in to comment.