Skip to content

Commit

Permalink
improve canary middleware
Browse files Browse the repository at this point in the history
  • Loading branch information
zensh committed Mar 11, 2020
1 parent de73b0a commit 58558a1
Show file tree
Hide file tree
Showing 6 changed files with 161 additions and 68 deletions.
11 changes: 7 additions & 4 deletions pkg/config/dynamic/middlewares.go
Original file line number Diff line number Diff line change
Expand Up @@ -515,8 +515,11 @@ func (c *ClientTLS) CreateTLSConfig() (*tls.Config, error) {

// Canary middleware settings.
type Canary struct {
Product string `json:"product,omitempty" toml:"product,omitempty" yaml:"product,omitempty" export:"true"`
Server string `json:"server,omitempty" toml:"server,omitempty" yaml:"server,omitempty" export:"true"`
Cookie string `json:"cookie,omitempty" toml:"cookie,omitempty" yaml:"cookie,omitempty" export:"true"`
Expire string `json:"expire,omitempty" toml:"expire,omitempty" yaml:"expire,omitempty" export:"true"`
Product string `json:"product,omitempty" toml:"product,omitempty" yaml:"product,omitempty" export:"true"`
Server string `json:"server,omitempty" toml:"server,omitempty" yaml:"server,omitempty" export:"true"`
Cookie string `json:"cookie,omitempty" toml:"cookie,omitempty" yaml:"cookie,omitempty" export:"true"`
AddRequestID bool `json:"addRequestID,omitempty" toml:"addRequestID,omitempty" yaml:"addRequestID,omitempty" export:"true"`
MaxCacheSize int `json:"maxCacheSize,omitempty" toml:"maxCacheSize,omitempty" yaml:"maxCacheSize,omitempty" export:"true"`
CacheExpiration types.Duration `json:"cacheExpiration,omitempty" toml:"cacheExpiration,omitempty" yaml:"cacheExpiration,omitempty" export:"true"`
CacheCleanDuration types.Duration `json:"cacheCleanDuration,omitempty" toml:"cacheCleanDuration,omitempty" yaml:"cacheCleanDuration,omitempty" export:"true"`
}
82 changes: 61 additions & 21 deletions pkg/middlewares/canary/canary.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/json"
"fmt"
"net/http"
"regexp"
"strings"
"time"

Expand All @@ -17,16 +18,28 @@ import (
)

const (
labelKey = "X-Canary-Label"
typeName = "canary"
typeName = "canary"
headerAuth = "Authorization"
headerUA = "User-Agent"
headerXCanary = "X-Canary"
headerXRequestID = "X-Request-ID"
queryAccessToken = "access_token"
defaultCacheSize = 100000
defaultExpiration = time.Minute * 10
defaultCacheCleanDuration = time.Minute * 20
)

// Should be subset of DNS-1035 label
// https://kubernetes.io/docs/concepts/overview/working-with-objects/names/
var validLabelReg = regexp.MustCompile(`^[a-z][0-9a-z-]{1,62}$`)

// Canary ...
type Canary struct {
name string
cookie string
next http.Handler
ls *labelStore
name string
cookie string
addRequestID bool
next http.Handler
ls *labelStore
}

// New returns a Canary instance.
Expand All @@ -41,16 +54,21 @@ func New(ctx context.Context, next http.Handler, cfg dynamic.Canary, name string
return nil, fmt.Errorf("canary label server required for Canary middleware")
}

expire, err := time.ParseDuration(cfg.Expire)
if err != nil {
return nil, fmt.Errorf("invalid expire for Canary middleware")
expiration := time.Duration(cfg.CacheExpiration)
if expiration < time.Minute {
expiration = defaultExpiration
}
if expire < time.Minute {
expire = time.Minute
cacheCleanDuration := time.Duration(cfg.CacheCleanDuration)
if cacheCleanDuration < time.Minute {
cacheCleanDuration = defaultCacheCleanDuration
}

if cfg.MaxCacheSize < 10 {
cfg.MaxCacheSize = defaultCacheSize
}

ls := newLabelStore(cfg, logger, expire)
return &Canary{name: name, cookie: cfg.Cookie, next: next, ls: ls}, nil
ls := newLabelStore(cfg, logger, expiration, cacheCleanDuration)
return &Canary{name: name, cookie: cfg.Cookie, addRequestID: cfg.AddRequestID, next: next, ls: ls}, nil
}

// GetTracingInformation implements Tracable interface
Expand All @@ -64,26 +82,48 @@ func (c *Canary) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
// userAgent:"Android/9 (OPPO PBET00;zh_CN) App/5.0.5 AliApp(DingTalk/5.0.5) com.alibaba.android.rimet/12726948 Channel/263200 language/zh-CN"
// ua := uasurfer.Parse(req.Header.Get("User-Agent"))

label := ""
if uid := c.extractUserID(req); uid != "" {
labels := c.ls.mustLoad(req.Context(), uid, req.Header)
if len(labels) > 0 {
label = labels[0].Label
if c.addRequestID {
addRequestID(req)
}

if label := req.Header.Get(headerXCanary); label == "" {
if cookie, _ := req.Cookie(headerXCanary); cookie != nil {
label = cookie.Value
}

if label != "" && !validLabelReg.MatchString(label) {
label = ""
}

uid := c.extractUserID(req)
if label == "" && uid != "" {
labels := c.ls.mustLoad(req.Context(), uid, req.Header)
if len(labels) > 0 {
label = labels[0].Label
}
}

if label != "" {
req.Header.Set(headerXCanary, fmt.Sprintf("label=%s", label))
req.Header.Add(headerXCanary, fmt.Sprintf("product=%s", c.ls.product))
if uid != "" {
req.Header.Add(headerXCanary, fmt.Sprintf("uid=%s", uid))
}
}
}
req.Header.Set("labelKey", label)

c.next.ServeHTTP(rw, req)
}

func (c *Canary) extractUserID(req *http.Request) string {
jwToken := req.Header.Get("Authorization")
jwToken := req.Header.Get(headerAuth)
if jwToken != "" {
if strs := strings.Split(jwToken, " "); len(strs) == 2 {
jwToken = strs[1]
}
}
if jwToken == "" {
jwToken = req.URL.Query().Get("access_token")
jwToken = req.URL.Query().Get(queryAccessToken)
}
if jwToken != "" {
if strs := strings.Split(jwToken, "."); len(strs) == 3 {
Expand Down
64 changes: 34 additions & 30 deletions pkg/middlewares/canary/label.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,20 @@ import (
"github.com/containous/traefik/v2/pkg/log"
)

const roundDuration = time.Minute * 10

type labelStore struct {
product string
server string
logger log.Logger
mu sync.Mutex
expire time.Duration
shouldRound time.Time
liveMap map[string]*entity
staleMap map[string]*entity
product string
server string
logger log.Logger
mu sync.Mutex
expiration time.Duration
cacheCleanDuration time.Duration
maxCacheSize int
shouldRound time.Time
liveMap map[string]*entry
staleMap map[string]*entry
}

type entity struct {
type entry struct {
mu sync.Mutex
value []label
expireAt time.Time
Expand All @@ -36,15 +36,17 @@ type label struct {
Channels string `json:"chs,omitempty"`
}

func newLabelStore(cfg dynamic.Canary, logger log.Logger, expire time.Duration) *labelStore {
func newLabelStore(cfg dynamic.Canary, logger log.Logger, expiration time.Duration, cacheCleanDuration time.Duration) *labelStore {
return &labelStore{
product: cfg.Product,
server: cfg.Server,
expire: expire,
logger: logger,
shouldRound: time.Now().UTC().Add(roundDuration),
liveMap: make(map[string]*entity),
staleMap: make(map[string]*entity),
expiration: expiration,
logger: logger,
product: cfg.Product,
server: cfg.Server,
maxCacheSize: cfg.MaxCacheSize,
cacheCleanDuration: cacheCleanDuration,
shouldRound: time.Now().UTC().Add(cacheCleanDuration),
liveMap: make(map[string]*entry),
staleMap: make(map[string]*entry),
}
}

Expand All @@ -57,33 +59,35 @@ func (s *labelStore) mustLoad(ctx context.Context, uid string, header http.Heade
if e.value == nil || e.expireAt.Before(now) {
res := s.fetch(ctx, uid, header)
e.value = res.Result
e.expireAt = time.Unix(res.Timestamp, 0).UTC().Add(s.expire)
e.expireAt = time.Unix(res.Timestamp, 0).UTC().Add(s.expiration)
}

return e.value
}

func (s *labelStore) mustLoadEntity(key string, now time.Time) *entity {
func (s *labelStore) mustLoadEntity(key string, now time.Time) *entry {
s.mu.Lock()
defer s.mu.Unlock()

e, ok := s.liveMap[key]
if !ok {
if e, ok = s.staleMap[key]; ok {
if e, ok = s.staleMap[key]; ok && e != nil {
s.liveMap[key] = e // move entity from staleMap to liveMap
delete(s.staleMap, key)
s.staleMap[key] = nil
}
}
if !ok {
e = &entity{}
s.liveMap[key] = e
}

if s.shouldRound.Before(now) {
s.shouldRound = now.Add(roundDuration)
if len(s.liveMap) > s.maxCacheSize || s.shouldRound.Before(now) {
s.logger.Infof("Round cache, stale cache size: %d, live cache size: %d", len(s.staleMap), len(s.liveMap))
s.shouldRound = now.Add(s.cacheCleanDuration)
// make a round: drop staleMap and create new liveMap
s.staleMap = s.liveMap
s.liveMap = make(map[string]*entity)
s.liveMap = make(map[string]*entry, len(s.staleMap)/2)
}

if !ok {
e = &entry{}
s.liveMap[key] = e
}
return e
}
Expand Down
7 changes: 3 additions & 4 deletions pkg/middlewares/canary/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,14 @@ func getUserLabels(ctx context.Context, url, xRequestID string) (*labelsRes, err
sp = span
defer finish()

span.SetTag("http.host", re.Host)
span.SetTag("x.request.id", xRequestID)
span.SetTag(headerXRequestID, xRequestID)
ext.HTTPUrl.Set(span, re.URL.String())
tracing.InjectRequestHeaders(re)
req = re
}

req.Header.Set("User-Agent", userAgent)
req.Header.Set("X-Request-ID", xRequestID)
req.Header.Set(headerUA, userAgent)
req.Header.Set(headerXRequestID, xRequestID)
resp, err := client.Do(req)
if err != nil {
return nil, err
Expand Down
48 changes: 48 additions & 0 deletions pkg/middlewares/canary/requestid.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package canary

import (
"encoding/hex"
"math/rand"
"net/http"
)

func addRequestID(req *http.Request) {
requestID := req.Header.Get(headerXRequestID)
if requestID == "" {
requestID = generator()
req.Header.Set(headerXRequestID, requestID)
}
}

// uuid version 4
type uuidv4 [16]byte

// String https://github.com/satori/go.uuid/blob/master/uuid.go
func (u uuidv4) String() string {
buf := make([]byte, 36)

hex.Encode(buf[0:8], u[0:4])
buf[8] = '-'
hex.Encode(buf[9:13], u[4:6])
buf[13] = '-'
hex.Encode(buf[14:18], u[6:8])
buf[18] = '-'
hex.Encode(buf[19:23], u[8:10])
buf[23] = '-'
hex.Encode(buf[24:], u[10:])

return string(buf)
}

func generator() string {
id := uuidv4{}
if _, err := rand.Read(id[:]); err != nil {
return ""
}

// https://tools.ietf.org/html/rfc4122#section-4.1.3
id[6] = (id[6] & 0x0f) | 0x40
id[8] = (id[8] & 0x3f) | 0x80

return id.String()
}
17 changes: 8 additions & 9 deletions pkg/server/service/loadbalancer/lrr/lrr.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import (
"strings"
)

const labelKey = "X-Canary-Label"
const labelKey = "X-Canary"

var isDigitsReg = regexp.MustCompile(`^\d+$`)
var isPortReg = regexp.MustCompile(`^\d+$`)

type namedHandler struct {
http.Handler
Expand All @@ -29,15 +29,14 @@ type Balancer struct {
}

func (b *Balancer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
// X-Canary: beta
// X-Canary: label=beta; uid=5c4057f0be825b390667abee; ...
label := req.Header.Get(labelKey)
name := b.serviceName

if label == "" {
if cookie, _ := req.Cookie(labelKey); cookie != nil {
label = cookie.Value
}
if label != "" && strings.HasPrefix(label, "label=") {
label = label[6:]
}

name := b.serviceName
if label != "" {
name = fmt.Sprintf("%s-%s", name, label)
for _, handler := range b.handlers {
Expand Down Expand Up @@ -69,7 +68,7 @@ func removeNsPort(fullServiceName string) string {
if len(strs) > 1 {
strs = strs[1:] // remove namespace
}
if len(strs) > 1 && isDigitsReg.MatchString(strs[len(strs)-1]) {
if len(strs) > 1 && isPortReg.MatchString(strs[len(strs)-1]) {
strs = strs[:len(strs)-1] // remove port
}
return strings.Join(strs, "-")
Expand Down

0 comments on commit 58558a1

Please sign in to comment.