
Fix global behavior ResetTime bug.
Every call to `GetRateLimits` would reset the `ResetTime` but not the `Remaining` counter, so counters would eventually deplete and never fully reset.
Baliedge committed Feb 26, 2024
1 parent f740f2b commit f02cb5b
Showing 9 changed files with 143 additions and 87 deletions.
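In short: each layer sampled the wall clock on its own, so every request refreshed the bucket's expiry while its hit count kept draining. The fix samples the clock once in `getLocalRateLimit` and threads that `requestTime` through the worker pool, both bucket algorithms, and the global broadcast, which now carries `Duration` and `RequestTime` so owner and peers agree on a single timestamp. A minimal sketch of the failure mode and the corrected pattern; the `bucket`, `hitBuggy`, and `hitFixed` names are illustrative, not gubernator's:

```go
package main

import (
	"fmt"
	"time"
)

// Hypothetical types for illustration only; not gubernator code.
type bucket struct {
	limit     int64
	remaining int64
	resetTime int64 // epoch millis when the window expires
	duration  int64 // window length in millis
}

func epochMillis(t time.Time) int64 { return t.UnixNano() / 1_000_000 }

// Buggy shape: every call refreshes the expiry, so an expiry-based
// refill can never fire and remaining only ever decreases.
func hitBuggy(b *bucket, hits int64) {
	b.resetTime = epochMillis(time.Now()) + b.duration // reset on EVERY call
	b.remaining -= hits
}

// Fixed shape: the caller samples the clock once and passes it down;
// the expiry is renewed (with a refill) only when the window has
// actually elapsed.
func hitFixed(b *bucket, hits int64, requestTime time.Time) {
	now := epochMillis(requestTime)
	if b.resetTime <= now {
		b.resetTime = now + b.duration
		b.remaining = b.limit
	}
	b.remaining -= hits
}

func main() {
	b := &bucket{limit: 5, remaining: 5, duration: 1000}
	hitFixed(b, 1, time.Now())
	fmt.Println(b.remaining) // 4
}
```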
30 changes: 17 additions & 13 deletions algorithms.go
@@ -18,6 +18,7 @@ package gubernator

import (
"context"
"time"

"github.com/mailgun/holster/v4/clock"
"github.com/prometheus/client_golang/prometheus"
@@ -34,8 +35,7 @@ import (
// with 100 emails and the request will succeed. You can override this default behavior with `DRAIN_OVER_LIMIT`

// Implements token bucket algorithm for rate limiting. https://en.wikipedia.org/wiki/Token_bucket
func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err error) {

func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq, requestTime time.Time) (resp *RateLimitResp, err error) {
tokenBucketTimer := prometheus.NewTimer(metricFuncTimeDuration.WithLabelValues("tokenBucket"))
defer tokenBucketTimer.ObserveDuration()

@@ -100,7 +100,7 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
s.Remove(ctx, hashKey)
}

return tokenBucketNewItem(ctx, s, c, r)
return tokenBucketNewItem(ctx, s, c, r, requestTime)
}

// Update the limit if it changed.
@@ -133,7 +133,7 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
}

// If our new duration means we are currently expired.
now := MillisecondNow()
now := EpochMillis(requestTime)
if expire <= now {
// Renew item.
span.AddEvent("Limit has expired")
@@ -196,12 +196,12 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
}

// Item is not found in cache or store, create new.
return tokenBucketNewItem(ctx, s, c, r)
return tokenBucketNewItem(ctx, s, c, r, requestTime)
}

// Called by tokenBucket() when adding a new item in the store.
func tokenBucketNewItem(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err error) {
now := MillisecondNow()
func tokenBucketNewItem(ctx context.Context, s Store, c Cache, r *RateLimitReq, requestTime time.Time) (resp *RateLimitResp, err error) {
now := EpochMillis(requestTime)
expire := now + r.Duration

t := &TokenBucketItem{
@@ -252,15 +252,15 @@ func tokenBucketNewItem(ctx context.Context, s Store, c Cache, r *RateLimitReq)
}

// Implements leaky bucket algorithm for rate limiting https://en.wikipedia.org/wiki/Leaky_bucket
func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err error) {
func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq, requestTime time.Time) (resp *RateLimitResp, err error) {
leakyBucketTimer := prometheus.NewTimer(metricFuncTimeDuration.WithLabelValues("V1Instance.getRateLimit_leakyBucket"))
defer leakyBucketTimer.ObserveDuration()

if r.Burst == 0 {
r.Burst = r.Limit
}

now := MillisecondNow()
now := EpochMillis(requestTime)

// Get rate limit from cache.
hashKey := r.HashKey()
@@ -309,7 +309,7 @@ func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
s.Remove(ctx, hashKey)
}

return leakyBucketNewItem(ctx, s, c, r)
return leakyBucketNewItem(ctx, s, c, r, requestTime)
}

if HasBehavior(r.Behavior, Behavior_RESET_REMAINING) {
@@ -421,12 +421,12 @@ func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
return rl, nil
}

return leakyBucketNewItem(ctx, s, c, r)
return leakyBucketNewItem(ctx, s, c, r, requestTime)
}

// Called by leakyBucket() when adding a new item in the store.
func leakyBucketNewItem(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err error) {
now := MillisecondNow()
func leakyBucketNewItem(ctx context.Context, s Store, c Cache, r *RateLimitReq, requestTime time.Time) (resp *RateLimitResp, err error) {
now := EpochMillis(requestTime)
duration := r.Duration
rate := float64(duration) / float64(r.Limit)
if HasBehavior(r.Behavior, Behavior_DURATION_IS_GREGORIAN) {
@@ -480,3 +480,7 @@ func leakyBucketNewItem(ctx context.Context, s Store, c Cache, r *RateLimitReq)

return &rl, nil
}

func EpochMillis(t time.Time) int64 {
return t.UnixNano() / 1_000_000
}
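An aside on the new helper: `EpochMillis` is equivalent to the standard library's `time.Time.UnixMilli`, available since Go 1.17. A quick self-contained check, assuming nothing beyond the function as written above:

```go
package main

import (
	"fmt"
	"time"
)

func EpochMillis(t time.Time) int64 { return t.UnixNano() / 1_000_000 }

func main() {
	t := time.Now()
	// UnixMilli (Go 1.17+) performs the same truncation to milliseconds.
	fmt.Println(EpochMillis(t) == t.UnixMilli()) // true
}
```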
35 changes: 21 additions & 14 deletions functional_test.go
@@ -971,7 +971,7 @@ func TestGlobalRateLimits(t *testing.T) {
peers, err := cluster.ListNonOwningDaemons(name, key)
require.NoError(t, err)

sendHit := func(client guber.V1Client, status guber.Status, hits int64, remain int64) {
sendHit := func(client guber.V1Client, status guber.Status, hits, expectRemaining, expectResetTime int64) int64 {
ctx, cancel := context.WithTimeout(context.Background(), clock.Second*10)
defer cancel()
resp, err := client.GetRateLimits(ctx, &guber.GetRateLimitsReq{
@@ -988,16 +988,21 @@
},
})
require.NoError(t, err)
assert.Equal(t, "", resp.Responses[0].Error)
assert.Equal(t, remain, resp.Responses[0].Remaining)
assert.Equal(t, status, resp.Responses[0].Status)
assert.Equal(t, int64(5), resp.Responses[0].Limit)
item := resp.Responses[0]
assert.Equal(t, "", item.Error)
assert.Equal(t, expectRemaining, item.Remaining)
assert.Equal(t, status, item.Status)
assert.Equal(t, int64(5), item.Limit)
if expectResetTime != 0 {
assert.Equal(t, expectResetTime, item.ResetTime)
}
return item.ResetTime
}
// Our first hit should create the request on the peer and queue for async forward
sendHit(peers[0].MustClient(), guber.Status_UNDER_LIMIT, 1, 4)
_ = sendHit(peers[0].MustClient(), guber.Status_UNDER_LIMIT, 1, 4, 0)

// Our second should be processed as if we own it since the async forward hasn't occurred yet
sendHit(peers[0].MustClient(), guber.Status_UNDER_LIMIT, 2, 2)
_ = sendHit(peers[0].MustClient(), guber.Status_UNDER_LIMIT, 2, 2, 0)

testutil.UntilPass(t, 20, clock.Millisecond*200, func(t testutil.TestingT) {
// Inspect peers metrics, ensure the peer sent the global rate limit to the owner
@@ -1009,19 +1014,21 @@
owner, err := cluster.FindOwningDaemon(name, key)
require.NoError(t, err)

// Get the ResetTime from owner.
expectResetTime := sendHit(owner.MustClient(), guber.Status_UNDER_LIMIT, 0, 2, 0)
require.NoError(t, waitForBroadcast(clock.Second*3, owner, 1))

// Check different peers, they should have gotten the broadcast from the owner
sendHit(peers[1].MustClient(), guber.Status_UNDER_LIMIT, 0, 2)
sendHit(peers[2].MustClient(), guber.Status_UNDER_LIMIT, 0, 2)
sendHit(peers[1].MustClient(), guber.Status_UNDER_LIMIT, 0, 2, expectResetTime)
sendHit(peers[2].MustClient(), guber.Status_UNDER_LIMIT, 0, 2, expectResetTime)

// Non owning peer should calculate the rate limit remaining before forwarding
// to the owner.
sendHit(peers[3].MustClient(), guber.Status_UNDER_LIMIT, 2, 0)
sendHit(peers[3].MustClient(), guber.Status_UNDER_LIMIT, 2, 0, expectResetTime)

require.NoError(t, waitForBroadcast(clock.Second*3, owner, 2))

sendHit(peers[4].MustClient(), guber.Status_OVER_LIMIT, 1, 0)
sendHit(peers[4].MustClient(), guber.Status_OVER_LIMIT, 1, 0, expectResetTime)
}

// Ensure global broadcast updates all peers when GetRateLimits is called on
@@ -1034,6 +1041,8 @@ func TestGlobalRateLimitsWithLoadBalancing(t *testing.T) {
// Determine owner and non-owner peers.
ownerPeerInfo, err := cluster.FindOwningPeer(name, key)
require.NoError(t, err)
ownerDaemon, err := cluster.FindOwningDaemon(name, key)
require.NoError(t, err)
owner := ownerPeerInfo.GRPCAddress
nonOwner := cluster.PeerAt(0).GRPCAddress
if nonOwner == owner {
@@ -1078,9 +1087,7 @@ func TestGlobalRateLimitsWithLoadBalancing(t *testing.T) {
// deplete the limit consistently.
sendHit(guber.Status_UNDER_LIMIT, 1)
sendHit(guber.Status_UNDER_LIMIT, 2)

// Sleep to ensure the global broadcast occurs (every 100ms).
time.Sleep(150 * time.Millisecond)
require.NoError(t, waitForBroadcast(clock.Second*3, ownerDaemon, 1))

// All successive hits should return OVER_LIMIT.
for i := 2; i <= 10; i++ {
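A note on the test change above: replacing the fixed 150 ms `time.Sleep` with `waitForBroadcast` waits on the broadcast actually happening instead of guessing an interval, which removes a source of flakiness. `waitForBroadcast` is defined elsewhere in the test suite; a generic poll-until sketch of the same idea, with illustrative names only:

```go
package main

import (
	"fmt"
	"time"
)

// waitUntil polls cond until it returns true or the timeout elapses.
// waitForBroadcast in the test suite plays this role, waiting until the
// owner daemon has performed the expected number of broadcasts.
func waitUntil(timeout time.Duration, cond func() bool) error {
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		if cond() {
			return nil
		}
		time.Sleep(10 * time.Millisecond)
	}
	return fmt.Errorf("condition not met within %v", timeout)
}

func main() {
	start := time.Now()
	err := waitUntil(time.Second, func() bool {
		return time.Since(start) > 50*time.Millisecond
	})
	fmt.Println(err) // <nil>
}
```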
11 changes: 7 additions & 4 deletions global.go
@@ -18,6 +18,7 @@ package gubernator

import (
"context"
"time"

"github.com/mailgun/holster/v4/syncutil"
"github.com/prometheus/client_golang/prometheus"
@@ -73,11 +74,13 @@ func (gm *globalManager) QueueHit(r *RateLimitReq) {
gm.hitsQueue <- r
}

func (gm *globalManager) QueueUpdate(req *RateLimitReq, resp *RateLimitResp) {
func (gm *globalManager) QueueUpdate(req *RateLimitReq, resp *RateLimitResp, requestTime time.Time) {
gm.broadcastQueue <- &UpdatePeerGlobal{
Key: req.HashKey(),
Algorithm: req.Algorithm,
Status: resp,
Key: req.HashKey(),
Algorithm: req.Algorithm,
Duration: req.Duration,
Status: resp,
RequestTime: EpochMillis(requestTime),
}
}

8 changes: 6 additions & 2 deletions gubernator.go
@@ -23,6 +23,7 @@ import (
"sync"

"github.com/mailgun/errors"
"github.com/mailgun/holster/v4/clock"
"github.com/mailgun/holster/v4/syncutil"
"github.com/mailgun/holster/v4/tracing"
"github.com/prometheus/client_golang/prometheus"
@@ -423,13 +424,15 @@ func (s *V1Instance) UpdatePeerGlobals(ctx context.Context, r *UpdatePeerGlobals
item.Value = &LeakyBucketItem{
Remaining: float64(g.Status.Remaining),
Limit: g.Status.Limit,
Duration: g.Duration,
Burst: g.Status.Limit,
UpdatedAt: now,
}
case Algorithm_TOKEN_BUCKET:
item.Value = &TokenBucketItem{
Status: g.Status.Status,
Limit: g.Status.Limit,
Duration: g.Duration,
Remaining: g.Status.Remaining,
CreatedAt: now,
}
@@ -572,15 +575,16 @@ func (s *V1Instance) getLocalRateLimit(ctx context.Context, r *RateLimitReq) (_
defer func() { tracing.EndScope(ctx, err) }()
defer prometheus.NewTimer(metricFuncTimeDuration.WithLabelValues("V1Instance.getLocalRateLimit")).ObserveDuration()

resp, err := s.workerPool.GetRateLimit(ctx, r)
requestTime := clock.Now()
resp, err := s.workerPool.GetRateLimit(ctx, r, requestTime)
if err != nil {
return nil, errors.Wrap(err, "during workerPool.GetRateLimit")
}

metricGetRateLimitCounter.WithLabelValues("local").Inc()
// If global behavior, then broadcast update to all peers.
if HasBehavior(r.Behavior, Behavior_GLOBAL) {
s.global.QueueUpdate(r, resp)
s.global.QueueUpdate(r, resp, requestTime)
}

return resp, nil
8 changes: 5 additions & 3 deletions peer_client.go
@@ -21,6 +21,7 @@ import (
"crypto/tls"
"fmt"
"sync"
"time"

"github.com/mailgun/holster/v4/clock"
"github.com/mailgun/holster/v4/collections"
@@ -71,9 +72,10 @@ type response struct {
}

type request struct {
request *RateLimitReq
resp chan *response
ctx context.Context
request *RateLimitReq
resp chan *response
ctx context.Context
requestTime time.Time
}

type PeerConfig struct {
72 changes: 49 additions & 23 deletions peers.pb.go

(Generated file; diff not rendered.)
