Skip to content

Commit

Permalink
Merge branch 'develop' into urbs
Browse files Browse the repository at this point in the history
  • Loading branch information
zensh committed Apr 9, 2020
2 parents 63698db + d2c5d52 commit c9054dd
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 22 deletions.
2 changes: 1 addition & 1 deletion pkg/middlewares/canary/canary.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func New(ctx context.Context, next http.Handler, cfg dynamic.Canary, name string
}

expiration := time.Duration(cfg.CacheExpiration)
if expiration < time.Minute {
if expiration < time.Second {
expiration = defaultExpiration
}
cacheCleanDuration := time.Duration(cfg.CacheCleanDuration)
Expand Down
6 changes: 3 additions & 3 deletions pkg/middlewares/canary/label_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestLabelStore(t *testing.T) {
cfg := dynamic.Canary{MaxCacheSize: 3, Server: "localhost", Product: "T"}
ls := NewLabelStore(logrus.StandardLogger(), cfg, time.Second, time.Second*2)
ls.mustFetchLabels = func(ctx context.Context, uid, requestID string) ([]Label, int64) {
return []Label{Label{Label: requestID}}, time.Now().Unix()
return []Label{{Label: requestID}}, time.Now().Unix()
}

u1 := ls.mustLoadEntry("u1", time.Now())
Expand Down Expand Up @@ -77,7 +77,7 @@ func TestLabelStore(t *testing.T) {
cfg := dynamic.Canary{MaxCacheSize: 3, Server: "localhost", Product: "T"}
ls := NewLabelStore(logrus.StandardLogger(), cfg, time.Second, time.Second*2)
ls.mustFetchLabels = func(ctx context.Context, uid, requestID string) ([]Label, int64) {
return []Label{Label{Label: requestID}}, time.Now().Unix()
return []Label{{Label: requestID}}, time.Now().Unix()
}

labels := ls.MustLoadLabels(context.Background(), "u1", "v1")
Expand Down Expand Up @@ -116,7 +116,7 @@ func TestLabelStore(t *testing.T) {
var call int32
ls.mustFetchLabels = func(ctx context.Context, uid, requestID string) ([]Label, int64) {
atomic.AddInt32(&call, 1)
return []Label{Label{Label: requestID}}, time.Now().Unix()
return []Label{{Label: requestID}}, time.Now().Unix()
}

var wg sync.WaitGroup
Expand Down
88 changes: 70 additions & 18 deletions pkg/middlewares/canary/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"net/http"
"os"
"runtime"
"sync/atomic"
"time"

"github.com/containous/traefik/v2/pkg/log"
Expand Down Expand Up @@ -45,7 +46,41 @@ var tr = &http.Transport{

var client = &http.Client{
Transport: tr,
Timeout: time.Second * 3,
Timeout: time.Second,
}

var hc = &healthcheck{
failuresThreshold: 5,
retry: time.Second * 10,
}

type healthcheck struct {
failures uint64
failuresThreshold uint64
retry time.Duration
timer *time.Timer
}

func (h *healthcheck) CountFailure() uint64 {
i := atomic.AddUint64(&h.failures, 1)
if i == h.failuresThreshold {
h.timer = time.AfterFunc(h.retry, func() {
// make MaybeHealthy() returns true
atomic.StoreUint64(&h.failures, h.failuresThreshold-1)
})
}
return i
}

func (h *healthcheck) Reset() {
if atomic.SwapUint64(&h.failures, 0) != 0 && h.timer != nil {
h.timer.Stop()
h.timer = nil
}
}

func (h *healthcheck) MaybeHealthy() bool {
return atomic.LoadUint64(&h.failures) < h.failuresThreshold
}

type labelsRes struct {
Expand All @@ -54,11 +89,14 @@ type labelsRes struct {
}

func getUserLabels(ctx context.Context, url, xRequestID string) (*labelsRes, error) {
req, err := http.NewRequest("GET", url, nil)
if ctx.Err() != nil {
return nil, nil
}

req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
return nil, err
}
req = req.WithContext(ctx)

var sp opentracing.Span
if tr, _ := tracing.FromContext(req.Context()); tr != nil {
Expand All @@ -77,35 +115,49 @@ func getUserLabels(ctx context.Context, url, xRequestID string) (*labelsRes, err
req.Header.Set(headerXRequestID, xRequestID)
resp, err := client.Do(req)
if err != nil {
return nil, fmt.Errorf("xRequestId: %s, request error: %s", xRequestID, err.Error())
if err == context.Canceled {
return nil, nil
}

c := hc.CountFailure()
return nil, fmt.Errorf("xRequestId: %s, failures: %d, request error: %s", xRequestID, c, err.Error())
}

hc.Reset()
if sp != nil {
tracing.LogResponseCode(sp, resp.StatusCode)
}

defer resp.Body.Close()
respBody, _ := ioutil.ReadAll(resp.Body)
if resp.StatusCode != 200 || len(respBody) == 0 {
return nil, fmt.Errorf("xRequestId: %s, getUserLabels error: %d, %s", xRequestID, resp.StatusCode, string(respBody))
respBody, err := ioutil.ReadAll(resp.Body)
resp.Body.Close()
if resp.StatusCode != 200 || err != nil || len(respBody) == 0 {
return nil, fmt.Errorf("xRequestId: %s, getUserLabels error: %d, %d, %v, %s",
xRequestID, resp.StatusCode, resp.ContentLength, err, string(respBody))
}

res := &labelsRes{}
if err = json.Unmarshal(respBody, res); err != nil {
return nil, fmt.Errorf("xRequestId: %s, getUserLabels Unmarshal error: %s, %s", xRequestID, err.Error(), string(respBody))
return nil, fmt.Errorf("xRequestId: %s, getUserLabels Unmarshal error: %s, %s",
xRequestID, err.Error(), string(respBody))
}

return res, nil
}

// MustGetUserLabels returns labels and timestamp
func MustGetUserLabels(ctx context.Context, url, xRequestID string, logger log.Logger) ([]Label, int64) {
res, err := getUserLabels(ctx, url, xRequestID)
now := time.Now().UTC().Unix()
if res == nil || res.Result == nil {
res = &labelsRes{Result: []Label{}, Timestamp: now}
logger.Error(err)
} else if res.Timestamp > now || res.Timestamp <= 0 {
res.Timestamp = now
ts := time.Now().UTC().Unix()
rs := []Label{}

if hc.MaybeHealthy() {
if res, err := getUserLabels(ctx, url, xRequestID); err != nil {
logger.Error(err)
} else if res != nil {
rs = res.Result
if res.Timestamp > 0 && res.Timestamp < ts {
ts = res.Timestamp
}
}
}
return res.Result, res.Timestamp

return rs, ts
}
38 changes: 38 additions & 0 deletions pkg/middlewares/canary/request_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package canary

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestHealthcheck(t *testing.T) {
t.Run("should work", func(t *testing.T) {
a := assert.New(t)

hc := &healthcheck{
failuresThreshold: 3,
retry: time.Second,
}

a.True(hc.MaybeHealthy())
hc.CountFailure()
a.True(hc.MaybeHealthy())
for i := 0; i < 10; i++ {
go func() {
hc.CountFailure()
}()
}
a.False(hc.MaybeHealthy())

time.Sleep(time.Millisecond * 1001)
a.True(hc.MaybeHealthy())
hc.CountFailure()
a.False(hc.MaybeHealthy())

hc.Reset()
a.True(hc.MaybeHealthy())
a.Nil(hc.timer)
})
}

0 comments on commit c9054dd

Please sign in to comment.