Skip to content

Commit

Permalink
feat: obey http status and backoff on 429, 50x (#156)
Browse files Browse the repository at this point in the history
  • Loading branch information
Christopher Kolstad authored Dec 15, 2023
1 parent e7dffa7 commit 1b50fe8
Show file tree
Hide file tree
Showing 7 changed files with 226 additions and 9 deletions.
36 changes: 35 additions & 1 deletion metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"encoding/json"
"fmt"
"math"
"net/http"
"net/url"
"sync"
Expand Down Expand Up @@ -66,6 +67,9 @@ type metrics struct {
closed chan struct{}
ctx context.Context
cancel func()
maxSkips float64
errors float64
skips float64
}

func newMetrics(options metricsOptions, channels metricsChannels) *metrics {
Expand All @@ -75,6 +79,9 @@ func newMetrics(options metricsOptions, channels metricsChannels) *metrics {
started: time.Now(),
close: make(chan struct{}),
closed: make(chan struct{}),
maxSkips: 10,
errors: 0,
skips: 0,
}
ctx, cancel := context.WithCancel(context.Background())
m.ctx = ctx
Expand Down Expand Up @@ -111,7 +118,11 @@ func (m *metrics) sync() {
for {
select {
case <-m.ticker.C:
m.sendMetrics()
if m.skips == 0 {
m.sendMetrics()
} else {
m.decrementSkip()
}
case <-m.close:
close(m.closed)
return
Expand All @@ -136,7 +147,24 @@ func (m *metrics) registerInstance() {

m.registered <- payload
}
func (m *metrics) backoff() {
m.errors = math.Min(m.maxSkips, m.errors+1)
m.skips = m.errors
}

func (m *metrics) configurationError() {
m.errors = m.maxSkips
m.skips = m.errors
}

func (m *metrics) successfulPost() {
m.errors = math.Max(0, m.errors-1)
m.skips = m.errors
}

func (m *metrics) decrementSkip() {
m.skips = math.Max(0, m.skips-1)
}
func (m *metrics) sendMetrics() {
m.bucketMu.Lock()
bucket := m.resetBucket()
Expand All @@ -160,6 +188,11 @@ func (m *metrics) sendMetrics() {
defer resp.Body.Close()

if resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusMultipleChoices {
if resp.StatusCode == http.StatusUnauthorized || resp.StatusCode == http.StatusForbidden || resp.StatusCode == http.StatusNotFound {
m.configurationError()
} else if resp.StatusCode == http.StatusTooManyRequests || resp.StatusCode >= http.StatusInternalServerError {
m.backoff()
}
m.warn(fmt.Errorf("%s return %d", u.String(), resp.StatusCode))
// The post failed, re-add the metrics we attempted to send so
// they are included in the next post.
Expand All @@ -174,6 +207,7 @@ func (m *metrics) sendMetrics() {
m.bucket.Start = bucket.Start
m.bucketMu.Unlock()
} else {
m.successfulPost()
m.sent <- payload
}
}
Expand Down
91 changes: 89 additions & 2 deletions metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,14 +328,101 @@ func TestMetrics_ShouldNotCountMetricsForParentToggles(t *testing.T) {
WithInstanceId(mockInstanceId),
WithListener(mockListener),
)

assert.Nil(err, "client should not return an error")
client.WaitForReady()
client.IsEnabled("child")

assert.EqualValues(client.metrics.bucket.Toggles["child"].Yes, 1)
assert.EqualValues(client.metrics.bucket.Toggles["parent"].Yes, 0)
client.Close()
err = client.Close()

assert.Nil(err, "client should not return an error")
assert.True(gock.IsDone(), "there should be no more mocks")
}

func TestMetrics_ShouldBackoffOn500(t *testing.T) {
assert := assert.New(t)
defer gock.OffAll()

gock.New(mockerServer).
Post("/client/register").
Reply(200)
gock.New(mockerServer).
Post("/client/metrics").
Persist().
Reply(500)
gock.New(mockerServer).
Get("/client/features").
Reply(200).
JSON(api.FeatureResponse{})
mockListener := &MockedListener{}
mockListener.On("OnReady").Return()
mockListener.On("OnRegistered", mock.AnythingOfType("ClientData")).Return()
mockListener.On("OnCount", "foo", false).Return()
mockListener.On("OnCount", "bar", false).Return()
mockListener.On("OnCount", "baz", false).Return()
mockListener.On("OnWarning", mock.MatchedBy(func(e error) bool {
return strings.HasSuffix(e.Error(), "http://foo.com/client/metrics return 500")
})).Return()
mockListener.On("OnError", mock.Anything).Return()

client, err := NewClient(
WithUrl(mockerServer),
WithMetricsInterval(50*time.Millisecond),
WithAppName(mockAppName),
WithInstanceId(mockInstanceId),
WithListener(mockListener),
)
assert.Nil(err, "client should not return an error")

client.WaitForReady()
client.IsEnabled("foo")
client.IsEnabled("bar")
client.IsEnabled("baz")

time.Sleep(320 * time.Millisecond)
err = client.Close()
assert.Equal(float64(3), client.metrics.errors)
assert.Nil(err, "Client should close without a problem")

}

func TestMetrics_ErrorCountShouldDecreaseIfSuccessful(t *testing.T) {
assert := assert.New(t)
defer gock.OffAll()

gock.New(mockerServer).
Post("/client/register").
Reply(200)
gock.New(mockerServer).
Post("/client/metrics").
Times(2).
Reply(500)
gock.New(mockerServer).
Get("/client/features").
Reply(200).
JSON(api.FeatureResponse{})
gock.New(mockerServer).
Post("/client/metrics").
Persist().
Reply(200)

client, err := NewClient(
WithUrl(mockerServer),
WithMetricsInterval(50*time.Millisecond),
WithAppName(mockAppName),
WithInstanceId(mockInstanceId),
)
assert.Nil(err, "client should not return an error")

client.WaitForReady()
client.IsEnabled("foo")
client.IsEnabled("bar")
client.IsEnabled("baz")
time.Sleep(360 * time.Millisecond)
client.IsEnabled("foo")
time.Sleep(100 * time.Millisecond)
err = client.Close()
assert.Equal(float64(0), client.metrics.errors)
assert.Nil(err, "Client should close without a problem")
}
44 changes: 40 additions & 4 deletions repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"math"
"net/http"
"net/url"
"sync"
Expand All @@ -26,6 +27,9 @@ type repository struct {
isReady bool
refreshTicker *time.Ticker
segments map[int][]api.Constraint
errors float64
maxSkips float64
skips float64
}

func newRepository(options repositoryOptions, channels repositoryChannels) *repository {
Expand All @@ -36,6 +40,9 @@ func newRepository(options repositoryOptions, channels repositoryChannels) *repo
closed: make(chan struct{}),
refreshTicker: time.NewTicker(options.refreshInterval),
segments: map[int][]api.Constraint{},
errors: 0,
maxSkips: 10,
skips: 0,
}
ctx, cancel := context.WithCancel(context.Background())
repo.ctx = ctx
Expand Down Expand Up @@ -80,11 +87,33 @@ func (r *repository) sync() {
close(r.closed)
return
case <-r.refreshTicker.C:
r.fetchAndReportError()
if r.skips == 0 {
r.fetchAndReportError()
} else {
r.decrementSkips()
}
}
}
}

func (r *repository) backoff() {
r.errors = math.Min(r.maxSkips, r.errors+1)
r.skips = r.errors
}

func (r *repository) successfulFetch() {
r.errors = math.Max(0, r.errors-1)
r.skips = r.errors
}

func (r *repository) decrementSkips() {
r.skips = math.Max(0, r.skips-1)
}
func (r *repository) configurationError() {
r.errors = r.maxSkips
r.skips = r.errors
}

func (r *repository) fetch() error {
u, _ := r.options.url.Parse(getFetchURLPath(r.options.projectName))

Expand Down Expand Up @@ -119,7 +148,7 @@ func (r *repository) fetch() error {
if resp.StatusCode == http.StatusNotModified {
return nil
}
if err := statusIsOK(resp); err != nil {
if err := r.statusIsOK(resp); err != nil {
return err
}

Expand All @@ -133,14 +162,21 @@ func (r *repository) fetch() error {
r.etag = resp.Header.Get("Etag")
r.segments = featureResp.SegmentsMap()
r.options.storage.Reset(featureResp.FeatureMap(), true)
r.successfulFetch()
r.Unlock()
return nil
}

func statusIsOK(resp *http.Response) error {
func (r *repository) statusIsOK(resp *http.Response) error {
s := resp.StatusCode
if 200 <= s && s < 300 {
if http.StatusOK <= s && s < http.StatusMultipleChoices {
return nil
} else if s == http.StatusUnauthorized || s == http.StatusForbidden || s == http.StatusNotFound {
r.configurationError()
return fmt.Errorf("%s %s returned status code %d your SDK is most likely misconfigured, backing off to maximum (%f times our interval)", resp.Request.Method, resp.Request.URL, s, r.maxSkips)
} else if s == http.StatusTooManyRequests || s >= http.StatusInternalServerError {
r.backoff()
return fmt.Errorf("%s %s returned status code %d, backing off (%f times our interval)", resp.Request.Method, resp.Request.URL, s, r.errors)
}

return fmt.Errorf("%s %s returned status code %d", resp.Request.Method, resp.Request.URL, s)
Expand Down
60 changes: 60 additions & 0 deletions repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package unleash
import (
"bytes"
"encoding/json"
"gopkg.in/h2non/gock.v1"
"net/http"
"net/http/httptest"
"strings"
Expand Down Expand Up @@ -127,3 +128,62 @@ func TestRepository_ParseAPIResponse(t *testing.T) {
assert.Equal(2, len(response.Features))
assert.Equal(0, len(response.Segments))
}


func TestRepository_backs_off_on_http_statuses(t *testing.T) {
a := assert.New(t)
testCases := []struct {
statusCode int
errorCount float64
}{
{ 401, 10},
{ 403, 10},
{ 404, 10},
{ 429, 1},
{ 500, 1},
{ 502, 1},
{ 503, 1},
}
defer gock.Off()
for _, tc := range testCases {
gock.New(mockerServer).
Get("/client/features").
Reply(tc.statusCode)
client, err := NewClient(
WithUrl(mockerServer),
WithAppName(mockAppName),
WithDisableMetrics(true),
WithInstanceId(mockInstanceId),
WithRefreshInterval(time.Millisecond * 15),
)
a.Nil(err)
time.Sleep(20 * time.Millisecond)
err = client.Close()
a.Equal(tc.errorCount, client.repository.errors)
a.Nil(err)
}
}
func TestRepository_back_offs_are_gradually_reduced_on_success(t *testing.T) {
a := assert.New(t)
defer gock.Off()
gock.New(mockerServer).
Get("/client/features").
Times(4).
Reply(429)
gock.New(mockerServer).
Get("/client/features").
Reply(200).
BodyString(`{ "version": 2, "features": []}`)
client, err := NewClient(
WithUrl(mockerServer),
WithAppName(mockAppName),
WithDisableMetrics(true),
WithInstanceId(mockInstanceId),
WithRefreshInterval(time.Millisecond * 10),
)
a.Nil(err)
client.WaitForReady()
err = client.Close()
a.Equal(float64(3), client.repository.errors) // 4 failures, and then one success, should reduce error count to 3
a.Nil(err)
}
1 change: 1 addition & 0 deletions spec_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//go:build norace
// +build norace

package unleash
Expand Down
2 changes: 1 addition & 1 deletion utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func every(slice interface{}, condition func(interface{}) bool) bool {
return false
}

if (sliceValue.Len() == 0) {
if sliceValue.Len() == 0 {
return false
}

Expand Down
1 change: 0 additions & 1 deletion utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,4 +116,3 @@ func TestContains(t *testing.T) {
}
})
}

0 comments on commit 1b50fe8

Please sign in to comment.