diff --git a/algorithms.go b/algorithms.go index f2ed4a82..a9937c59 100644 --- a/algorithms.go +++ b/algorithms.go @@ -18,6 +18,7 @@ package gubernator import ( "context" + "time" "github.com/mailgun/holster/v4/clock" "github.com/prometheus/client_golang/prometheus" @@ -34,8 +35,7 @@ import ( // with 100 emails and the request will succeed. You can override this default behavior with `DRAIN_OVER_LIMIT` // Implements token bucket algorithm for rate limiting. https://en.wikipedia.org/wiki/Token_bucket -func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err error) { - +func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq, requestTime time.Time) (resp *RateLimitResp, err error) { tokenBucketTimer := prometheus.NewTimer(metricFuncTimeDuration.WithLabelValues("tokenBucket")) defer tokenBucketTimer.ObserveDuration() @@ -100,7 +100,7 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp * s.Remove(ctx, hashKey) } - return tokenBucketNewItem(ctx, s, c, r) + return tokenBucketNewItem(ctx, s, c, r, requestTime) } // Update the limit if it changed. @@ -133,7 +133,7 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp * } // If our new duration means we are currently expired. - now := MillisecondNow() + now := EpochMillis(requestTime) if expire <= now { // Renew item. span.AddEvent("Limit has expired") @@ -196,12 +196,12 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp * } // Item is not found in cache or store, create new. - return tokenBucketNewItem(ctx, s, c, r) + return tokenBucketNewItem(ctx, s, c, r, requestTime) } // Called by tokenBucket() when adding a new item in the store. -func tokenBucketNewItem(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err error) { - now := MillisecondNow() +func tokenBucketNewItem(ctx context.Context, s Store, c Cache, r *RateLimitReq, requestTime time.Time) (resp *RateLimitResp, err error) { + now := EpochMillis(requestTime) expire := now + r.Duration t := &TokenBucketItem{ @@ -252,7 +252,7 @@ func tokenBucketNewItem(ctx context.Context, s Store, c Cache, r *RateLimitReq) } // Implements leaky bucket algorithm for rate limiting https://en.wikipedia.org/wiki/Leaky_bucket -func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err error) { +func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq, requestTime time.Time) (resp *RateLimitResp, err error) { leakyBucketTimer := prometheus.NewTimer(metricFuncTimeDuration.WithLabelValues("V1Instance.getRateLimit_leakyBucket")) defer leakyBucketTimer.ObserveDuration() @@ -260,7 +260,7 @@ func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp * r.Burst = r.Limit } - now := MillisecondNow() + now := EpochMillis(requestTime) // Get rate limit from cache. hashKey := r.HashKey() @@ -309,7 +309,7 @@ func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp * s.Remove(ctx, hashKey) } - return leakyBucketNewItem(ctx, s, c, r) + return leakyBucketNewItem(ctx, s, c, r, requestTime) } if HasBehavior(r.Behavior, Behavior_RESET_REMAINING) { @@ -421,12 +421,12 @@ func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp * return rl, nil } - return leakyBucketNewItem(ctx, s, c, r) + return leakyBucketNewItem(ctx, s, c, r, requestTime) } // Called by leakyBucket() when adding a new item in the store. -func leakyBucketNewItem(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err error) { - now := MillisecondNow() +func leakyBucketNewItem(ctx context.Context, s Store, c Cache, r *RateLimitReq, requestTime time.Time) (resp *RateLimitResp, err error) { + now := EpochMillis(requestTime) duration := r.Duration rate := float64(duration) / float64(r.Limit) if HasBehavior(r.Behavior, Behavior_DURATION_IS_GREGORIAN) { @@ -480,3 +480,7 @@ func leakyBucketNewItem(ctx context.Context, s Store, c Cache, r *RateLimitReq) return &rl, nil } + +func EpochMillis(t time.Time) int64 { + return t.UnixNano() / 1_000_000 +} diff --git a/functional_test.go b/functional_test.go index 2d365b13..e3b906d6 100644 --- a/functional_test.go +++ b/functional_test.go @@ -971,7 +971,7 @@ func TestGlobalRateLimits(t *testing.T) { peers, err := cluster.ListNonOwningDaemons(name, key) require.NoError(t, err) - sendHit := func(client guber.V1Client, status guber.Status, hits int64, remain int64) { + sendHit := func(client guber.V1Client, status guber.Status, hits, expectRemaining, expectResetTime int64) int64 { ctx, cancel := context.WithTimeout(context.Background(), clock.Second*10) defer cancel() resp, err := client.GetRateLimits(ctx, &guber.GetRateLimitsReq{ @@ -988,16 +988,21 @@ func TestGlobalRateLimits(t *testing.T) { }, }) require.NoError(t, err) - assert.Equal(t, "", resp.Responses[0].Error) - assert.Equal(t, remain, resp.Responses[0].Remaining) - assert.Equal(t, status, resp.Responses[0].Status) - assert.Equal(t, int64(5), resp.Responses[0].Limit) + item := resp.Responses[0] + assert.Equal(t, "", item.Error) + assert.Equal(t, expectRemaining, item.Remaining) + assert.Equal(t, status, item.Status) + assert.Equal(t, int64(5), item.Limit) + if expectResetTime != 0 { + assert.Equal(t, expectResetTime, item.ResetTime) + } + return item.ResetTime } // Our first hit should create the request on the peer and queue for async forward - sendHit(peers[0].MustClient(), guber.Status_UNDER_LIMIT, 1, 4) + _ = sendHit(peers[0].MustClient(), guber.Status_UNDER_LIMIT, 1, 4, 0) // Our second should be processed as if we own it since the async forward hasn't occurred yet - sendHit(peers[0].MustClient(), guber.Status_UNDER_LIMIT, 2, 2) + _ = sendHit(peers[0].MustClient(), guber.Status_UNDER_LIMIT, 2, 2, 0) testutil.UntilPass(t, 20, clock.Millisecond*200, func(t testutil.TestingT) { // Inspect peers metrics, ensure the peer sent the global rate limit to the owner @@ -1009,19 +1014,21 @@ func TestGlobalRateLimits(t *testing.T) { owner, err := cluster.FindOwningDaemon(name, key) require.NoError(t, err) + // Get the ResetTime from owner. + expectResetTime := sendHit(owner.MustClient(), guber.Status_UNDER_LIMIT, 0, 2, 0) require.NoError(t, waitForBroadcast(clock.Second*3, owner, 1)) // Check different peers, they should have gotten the broadcast from the owner - sendHit(peers[1].MustClient(), guber.Status_UNDER_LIMIT, 0, 2) - sendHit(peers[2].MustClient(), guber.Status_UNDER_LIMIT, 0, 2) + sendHit(peers[1].MustClient(), guber.Status_UNDER_LIMIT, 0, 2, expectResetTime) + sendHit(peers[2].MustClient(), guber.Status_UNDER_LIMIT, 0, 2, expectResetTime) // Non owning peer should calculate the rate limit remaining before forwarding // to the owner. - sendHit(peers[3].MustClient(), guber.Status_UNDER_LIMIT, 2, 0) + sendHit(peers[3].MustClient(), guber.Status_UNDER_LIMIT, 2, 0, expectResetTime) require.NoError(t, waitForBroadcast(clock.Second*3, owner, 2)) - sendHit(peers[4].MustClient(), guber.Status_OVER_LIMIT, 1, 0) + sendHit(peers[4].MustClient(), guber.Status_OVER_LIMIT, 1, 0, expectResetTime) } // Ensure global broadcast updates all peers when GetRateLimits is called on @@ -1034,6 +1041,8 @@ func TestGlobalRateLimitsWithLoadBalancing(t *testing.T) { // Determine owner and non-owner peers. ownerPeerInfo, err := cluster.FindOwningPeer(name, key) require.NoError(t, err) + ownerDaemon, err := cluster.FindOwningDaemon(name, key) + require.NoError(t, err) owner := ownerPeerInfo.GRPCAddress nonOwner := cluster.PeerAt(0).GRPCAddress if nonOwner == owner { @@ -1078,9 +1087,7 @@ func TestGlobalRateLimitsWithLoadBalancing(t *testing.T) { // deplete the limit consistently. sendHit(guber.Status_UNDER_LIMIT, 1) sendHit(guber.Status_UNDER_LIMIT, 2) - - // Sleep to ensure the global broadcast occurs (every 100ms). - time.Sleep(150 * time.Millisecond) + require.NoError(t, waitForBroadcast(clock.Second*3, ownerDaemon, 1)) // All successive hits should return OVER_LIMIT. for i := 2; i <= 10; i++ { diff --git a/global.go b/global.go index b1f652ae..6f6e924b 100644 --- a/global.go +++ b/global.go @@ -18,6 +18,7 @@ package gubernator import ( "context" + "time" "github.com/mailgun/holster/v4/syncutil" "github.com/prometheus/client_golang/prometheus" @@ -73,11 +74,13 @@ func (gm *globalManager) QueueHit(r *RateLimitReq) { gm.hitsQueue <- r } -func (gm *globalManager) QueueUpdate(req *RateLimitReq, resp *RateLimitResp) { +func (gm *globalManager) QueueUpdate(req *RateLimitReq, resp *RateLimitResp, requestTime time.Time) { gm.broadcastQueue <- &UpdatePeerGlobal{ - Key: req.HashKey(), - Algorithm: req.Algorithm, - Status: resp, + Key: req.HashKey(), + Algorithm: req.Algorithm, + Duration: req.Duration, + Status: resp, + RequestTime: EpochMillis(requestTime), } } diff --git a/gubernator.go b/gubernator.go index f33fa48c..bde63652 100644 --- a/gubernator.go +++ b/gubernator.go @@ -23,6 +23,7 @@ import ( "sync" "github.com/mailgun/errors" + "github.com/mailgun/holster/v4/clock" "github.com/mailgun/holster/v4/syncutil" "github.com/mailgun/holster/v4/tracing" "github.com/prometheus/client_golang/prometheus" @@ -423,6 +424,7 @@ func (s *V1Instance) UpdatePeerGlobals(ctx context.Context, r *UpdatePeerGlobals item.Value = &LeakyBucketItem{ Remaining: float64(g.Status.Remaining), Limit: g.Status.Limit, + Duration: g.Duration, Burst: g.Status.Limit, UpdatedAt: now, } @@ -430,6 +432,7 @@ func (s *V1Instance) UpdatePeerGlobals(ctx context.Context, r *UpdatePeerGlobals item.Value = &TokenBucketItem{ Status: g.Status.Status, Limit: g.Status.Limit, + Duration: g.Duration, Remaining: g.Status.Remaining, CreatedAt: now, } @@ -572,7 +575,8 @@ func (s *V1Instance) getLocalRateLimit(ctx context.Context, r *RateLimitReq) (_ defer func() { tracing.EndScope(ctx, err) }() defer prometheus.NewTimer(metricFuncTimeDuration.WithLabelValues("V1Instance.getLocalRateLimit")).ObserveDuration() - resp, err := s.workerPool.GetRateLimit(ctx, r) + requestTime := clock.Now() + resp, err := s.workerPool.GetRateLimit(ctx, r, requestTime) if err != nil { return nil, errors.Wrap(err, "during workerPool.GetRateLimit") } @@ -580,7 +584,7 @@ func (s *V1Instance) getLocalRateLimit(ctx context.Context, r *RateLimitReq) (_ metricGetRateLimitCounter.WithLabelValues("local").Inc() // If global behavior, then broadcast update to all peers. if HasBehavior(r.Behavior, Behavior_GLOBAL) { - s.global.QueueUpdate(r, resp) + s.global.QueueUpdate(r, resp, requestTime) } return resp, nil diff --git a/peer_client.go b/peer_client.go index a39d9f02..98d08f41 100644 --- a/peer_client.go +++ b/peer_client.go @@ -21,6 +21,7 @@ import ( "crypto/tls" "fmt" "sync" + "time" "github.com/mailgun/holster/v4/clock" "github.com/mailgun/holster/v4/collections" @@ -71,9 +72,10 @@ type response struct { } type request struct { - request *RateLimitReq - resp chan *response - ctx context.Context + request *RateLimitReq + resp chan *response + ctx context.Context + requestTime time.Time } type PeerConfig struct { diff --git a/peers.pb.go b/peers.pb.go index a805b29a..d4100832 100644 --- a/peers.pb.go +++ b/peers.pb.go @@ -185,9 +185,17 @@ type UpdatePeerGlobal struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Key string `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"` - Status *RateLimitResp `protobuf:"bytes,2,opt,name=status,proto3" json:"status,omitempty"` - Algorithm Algorithm `protobuf:"varint,3,opt,name=algorithm,proto3,enum=pb.gubernator.Algorithm" json:"algorithm,omitempty"` + // Uniquely identifies this rate limit IE: 'ip:10.2.10.7' or 'account:123445' + Key string `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"` + Status *RateLimitResp `protobuf:"bytes,2,opt,name=status,proto3" json:"status,omitempty"` + // The algorithm used to calculate the rate limit. The algorithm may change on + // subsequent requests, when this occurs any previous rate limit hit counts are reset. + Algorithm Algorithm `protobuf:"varint,3,opt,name=algorithm,proto3,enum=pb.gubernator.Algorithm" json:"algorithm,omitempty"` + // The duration of the rate limit in milliseconds + Duration int64 `protobuf:"varint,4,opt,name=duration,proto3" json:"duration,omitempty"` + // Time of original GetRateLimits request so that ExpiresAt timestamps can be + // synchronized. + RequestTime int64 `protobuf:"varint,5,opt,name=request_time,json=requestTime,proto3" json:"request_time,omitempty"` } func (x *UpdatePeerGlobal) Reset() { @@ -243,6 +251,20 @@ func (x *UpdatePeerGlobal) GetAlgorithm() Algorithm { return Algorithm_TOKEN_BUCKET } +func (x *UpdatePeerGlobal) GetDuration() int64 { + if x != nil { + return x.Duration + } + return 0 +} + +func (x *UpdatePeerGlobal) GetRequestTime() int64 { + if x != nil { + return x.RequestTime + } + return 0 +} + type UpdatePeerGlobalsResp struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -302,7 +324,7 @@ var file_peers_proto_rawDesc = []byte{ 0x39, 0x0a, 0x07, 0x67, 0x6c, 0x6f, 0x62, 0x61, 0x6c, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1f, 0x2e, 0x70, 0x62, 0x2e, 0x67, 0x75, 0x62, 0x65, 0x72, 0x6e, 0x61, 0x74, 0x6f, 0x72, 0x2e, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x50, 0x65, 0x65, 0x72, 0x47, 0x6c, 0x6f, 0x62, 0x61, - 0x6c, 0x52, 0x07, 0x67, 0x6c, 0x6f, 0x62, 0x61, 0x6c, 0x73, 0x22, 0x92, 0x01, 0x0a, 0x10, 0x55, + 0x6c, 0x52, 0x07, 0x67, 0x6c, 0x6f, 0x62, 0x61, 0x6c, 0x73, 0x22, 0xd1, 0x01, 0x0a, 0x10, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x50, 0x65, 0x65, 0x72, 0x47, 0x6c, 0x6f, 0x62, 0x61, 0x6c, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x34, 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, @@ -311,25 +333,29 @@ var file_peers_proto_rawDesc = []byte{ 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x36, 0x0a, 0x09, 0x61, 0x6c, 0x67, 0x6f, 0x72, 0x69, 0x74, 0x68, 0x6d, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x18, 0x2e, 0x70, 0x62, 0x2e, 0x67, 0x75, 0x62, 0x65, 0x72, 0x6e, 0x61, 0x74, 0x6f, 0x72, 0x2e, 0x41, 0x6c, 0x67, 0x6f, 0x72, - 0x69, 0x74, 0x68, 0x6d, 0x52, 0x09, 0x61, 0x6c, 0x67, 0x6f, 0x72, 0x69, 0x74, 0x68, 0x6d, 0x22, - 0x17, 0x0a, 0x15, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x50, 0x65, 0x65, 0x72, 0x47, 0x6c, 0x6f, - 0x62, 0x61, 0x6c, 0x73, 0x52, 0x65, 0x73, 0x70, 0x32, 0xcd, 0x01, 0x0a, 0x07, 0x50, 0x65, 0x65, - 0x72, 0x73, 0x56, 0x31, 0x12, 0x60, 0x0a, 0x11, 0x47, 0x65, 0x74, 0x50, 0x65, 0x65, 0x72, 0x52, - 0x61, 0x74, 0x65, 0x4c, 0x69, 0x6d, 0x69, 0x74, 0x73, 0x12, 0x23, 0x2e, 0x70, 0x62, 0x2e, 0x67, - 0x75, 0x62, 0x65, 0x72, 0x6e, 0x61, 0x74, 0x6f, 0x72, 0x2e, 0x47, 0x65, 0x74, 0x50, 0x65, 0x65, - 0x72, 0x52, 0x61, 0x74, 0x65, 0x4c, 0x69, 0x6d, 0x69, 0x74, 0x73, 0x52, 0x65, 0x71, 0x1a, 0x24, - 0x2e, 0x70, 0x62, 0x2e, 0x67, 0x75, 0x62, 0x65, 0x72, 0x6e, 0x61, 0x74, 0x6f, 0x72, 0x2e, 0x47, - 0x65, 0x74, 0x50, 0x65, 0x65, 0x72, 0x52, 0x61, 0x74, 0x65, 0x4c, 0x69, 0x6d, 0x69, 0x74, 0x73, - 0x52, 0x65, 0x73, 0x70, 0x22, 0x00, 0x12, 0x60, 0x0a, 0x11, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, - 0x50, 0x65, 0x65, 0x72, 0x47, 0x6c, 0x6f, 0x62, 0x61, 0x6c, 0x73, 0x12, 0x23, 0x2e, 0x70, 0x62, - 0x2e, 0x67, 0x75, 0x62, 0x65, 0x72, 0x6e, 0x61, 0x74, 0x6f, 0x72, 0x2e, 0x55, 0x70, 0x64, 0x61, - 0x74, 0x65, 0x50, 0x65, 0x65, 0x72, 0x47, 0x6c, 0x6f, 0x62, 0x61, 0x6c, 0x73, 0x52, 0x65, 0x71, - 0x1a, 0x24, 0x2e, 0x70, 0x62, 0x2e, 0x67, 0x75, 0x62, 0x65, 0x72, 0x6e, 0x61, 0x74, 0x6f, 0x72, - 0x2e, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x50, 0x65, 0x65, 0x72, 0x47, 0x6c, 0x6f, 0x62, 0x61, - 0x6c, 0x73, 0x52, 0x65, 0x73, 0x70, 0x22, 0x00, 0x42, 0x22, 0x5a, 0x1d, 0x67, 0x69, 0x74, 0x68, - 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6d, 0x61, 0x69, 0x6c, 0x67, 0x75, 0x6e, 0x2f, 0x67, - 0x75, 0x62, 0x65, 0x72, 0x6e, 0x61, 0x74, 0x6f, 0x72, 0x80, 0x01, 0x01, 0x62, 0x06, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x33, + 0x69, 0x74, 0x68, 0x6d, 0x52, 0x09, 0x61, 0x6c, 0x67, 0x6f, 0x72, 0x69, 0x74, 0x68, 0x6d, 0x12, + 0x1a, 0x0a, 0x08, 0x64, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x04, 0x20, 0x01, 0x28, + 0x03, 0x52, 0x08, 0x64, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x21, 0x0a, 0x0c, 0x72, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, + 0x03, 0x52, 0x0b, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x22, 0x17, + 0x0a, 0x15, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x50, 0x65, 0x65, 0x72, 0x47, 0x6c, 0x6f, 0x62, + 0x61, 0x6c, 0x73, 0x52, 0x65, 0x73, 0x70, 0x32, 0xcd, 0x01, 0x0a, 0x07, 0x50, 0x65, 0x65, 0x72, + 0x73, 0x56, 0x31, 0x12, 0x60, 0x0a, 0x11, 0x47, 0x65, 0x74, 0x50, 0x65, 0x65, 0x72, 0x52, 0x61, + 0x74, 0x65, 0x4c, 0x69, 0x6d, 0x69, 0x74, 0x73, 0x12, 0x23, 0x2e, 0x70, 0x62, 0x2e, 0x67, 0x75, + 0x62, 0x65, 0x72, 0x6e, 0x61, 0x74, 0x6f, 0x72, 0x2e, 0x47, 0x65, 0x74, 0x50, 0x65, 0x65, 0x72, + 0x52, 0x61, 0x74, 0x65, 0x4c, 0x69, 0x6d, 0x69, 0x74, 0x73, 0x52, 0x65, 0x71, 0x1a, 0x24, 0x2e, + 0x70, 0x62, 0x2e, 0x67, 0x75, 0x62, 0x65, 0x72, 0x6e, 0x61, 0x74, 0x6f, 0x72, 0x2e, 0x47, 0x65, + 0x74, 0x50, 0x65, 0x65, 0x72, 0x52, 0x61, 0x74, 0x65, 0x4c, 0x69, 0x6d, 0x69, 0x74, 0x73, 0x52, + 0x65, 0x73, 0x70, 0x22, 0x00, 0x12, 0x60, 0x0a, 0x11, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x50, + 0x65, 0x65, 0x72, 0x47, 0x6c, 0x6f, 0x62, 0x61, 0x6c, 0x73, 0x12, 0x23, 0x2e, 0x70, 0x62, 0x2e, + 0x67, 0x75, 0x62, 0x65, 0x72, 0x6e, 0x61, 0x74, 0x6f, 0x72, 0x2e, 0x55, 0x70, 0x64, 0x61, 0x74, + 0x65, 0x50, 0x65, 0x65, 0x72, 0x47, 0x6c, 0x6f, 0x62, 0x61, 0x6c, 0x73, 0x52, 0x65, 0x71, 0x1a, + 0x24, 0x2e, 0x70, 0x62, 0x2e, 0x67, 0x75, 0x62, 0x65, 0x72, 0x6e, 0x61, 0x74, 0x6f, 0x72, 0x2e, + 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x50, 0x65, 0x65, 0x72, 0x47, 0x6c, 0x6f, 0x62, 0x61, 0x6c, + 0x73, 0x52, 0x65, 0x73, 0x70, 0x22, 0x00, 0x42, 0x22, 0x5a, 0x1d, 0x67, 0x69, 0x74, 0x68, 0x75, + 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6d, 0x61, 0x69, 0x6c, 0x67, 0x75, 0x6e, 0x2f, 0x67, 0x75, + 0x62, 0x65, 0x72, 0x6e, 0x61, 0x74, 0x6f, 0x72, 0x80, 0x01, 0x01, 0x62, 0x06, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x33, } var ( diff --git a/peers.proto b/peers.proto index 1ce2a431..f97f4ead 100644 --- a/peers.proto +++ b/peers.proto @@ -26,32 +26,40 @@ import "gubernator.proto"; // NOTE: For use by gubernator peers only service PeersV1 { - // Used by peers to relay batches of requests to an owner peer - rpc GetPeerRateLimits (GetPeerRateLimitsReq) returns (GetPeerRateLimitsResp) {} + // Used by peers to relay batches of requests to an owner peer + rpc GetPeerRateLimits (GetPeerRateLimitsReq) returns (GetPeerRateLimitsResp) {} - // Used by owner peers to send global rate limit updates to non-owner peers - rpc UpdatePeerGlobals (UpdatePeerGlobalsReq) returns (UpdatePeerGlobalsResp) {} + // Used by owner peers to send global rate limit updates to non-owner peers + rpc UpdatePeerGlobals (UpdatePeerGlobalsReq) returns (UpdatePeerGlobalsResp) {} } message GetPeerRateLimitsReq { - // Must specify at least one RateLimit. The peer that recives this request MUST be authoritative for - // each rate_limit[x].unique_key provided, as the peer will not forward the request to any other peers - repeated RateLimitReq requests = 1; + // Must specify at least one RateLimit. The peer that recives this request MUST be authoritative for + // each rate_limit[x].unique_key provided, as the peer will not forward the request to any other peers + repeated RateLimitReq requests = 1; } message GetPeerRateLimitsResp { - // Responses are in the same order as they appeared in the PeerRateLimitRequests - repeated RateLimitResp rate_limits = 1; + // Responses are in the same order as they appeared in the PeerRateLimitRequests + repeated RateLimitResp rate_limits = 1; } message UpdatePeerGlobalsReq { - // Must specify at least one RateLimit - repeated UpdatePeerGlobal globals = 1; + // Must specify at least one RateLimit + repeated UpdatePeerGlobal globals = 1; } message UpdatePeerGlobal { - string key = 1; - RateLimitResp status = 2; - Algorithm algorithm = 3; + // Uniquely identifies this rate limit IE: 'ip:10.2.10.7' or 'account:123445' + string key = 1; + RateLimitResp status = 2; + // The algorithm used to calculate the rate limit. The algorithm may change on + // subsequent requests, when this occurs any previous rate limit hit counts are reset. + Algorithm algorithm = 3; + // The duration of the rate limit in milliseconds + int64 duration = 4; + // Time of original GetRateLimits request so that ExpiresAt timestamps can be + // synchronized. + int64 request_time = 5; } message UpdatePeerGlobalsResp {} diff --git a/python/gubernator/peers_pb2.py b/python/gubernator/peers_pb2.py index b1451c7a..9619dda8 100644 --- a/python/gubernator/peers_pb2.py +++ b/python/gubernator/peers_pb2.py @@ -15,7 +15,7 @@ import gubernator_pb2 as gubernator__pb2 -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0bpeers.proto\x12\rpb.gubernator\x1a\x10gubernator.proto\"O\n\x14GetPeerRateLimitsReq\x12\x37\n\x08requests\x18\x01 \x03(\x0b\x32\x1b.pb.gubernator.RateLimitReqR\x08requests\"V\n\x15GetPeerRateLimitsResp\x12=\n\x0brate_limits\x18\x01 \x03(\x0b\x32\x1c.pb.gubernator.RateLimitRespR\nrateLimits\"Q\n\x14UpdatePeerGlobalsReq\x12\x39\n\x07globals\x18\x01 \x03(\x0b\x32\x1f.pb.gubernator.UpdatePeerGlobalR\x07globals\"\x92\x01\n\x10UpdatePeerGlobal\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x34\n\x06status\x18\x02 \x01(\x0b\x32\x1c.pb.gubernator.RateLimitRespR\x06status\x12\x36\n\talgorithm\x18\x03 \x01(\x0e\x32\x18.pb.gubernator.AlgorithmR\talgorithm\"\x17\n\x15UpdatePeerGlobalsResp2\xcd\x01\n\x07PeersV1\x12`\n\x11GetPeerRateLimits\x12#.pb.gubernator.GetPeerRateLimitsReq\x1a$.pb.gubernator.GetPeerRateLimitsResp\"\x00\x12`\n\x11UpdatePeerGlobals\x12#.pb.gubernator.UpdatePeerGlobalsReq\x1a$.pb.gubernator.UpdatePeerGlobalsResp\"\x00\x42\"Z\x1dgithub.com/mailgun/gubernator\x80\x01\x01\x62\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0bpeers.proto\x12\rpb.gubernator\x1a\x10gubernator.proto\"O\n\x14GetPeerRateLimitsReq\x12\x37\n\x08requests\x18\x01 \x03(\x0b\x32\x1b.pb.gubernator.RateLimitReqR\x08requests\"V\n\x15GetPeerRateLimitsResp\x12=\n\x0brate_limits\x18\x01 \x03(\x0b\x32\x1c.pb.gubernator.RateLimitRespR\nrateLimits\"Q\n\x14UpdatePeerGlobalsReq\x12\x39\n\x07globals\x18\x01 \x03(\x0b\x32\x1f.pb.gubernator.UpdatePeerGlobalR\x07globals\"\xd1\x01\n\x10UpdatePeerGlobal\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x34\n\x06status\x18\x02 \x01(\x0b\x32\x1c.pb.gubernator.RateLimitRespR\x06status\x12\x36\n\talgorithm\x18\x03 \x01(\x0e\x32\x18.pb.gubernator.AlgorithmR\talgorithm\x12\x1a\n\x08\x64uration\x18\x04 \x01(\x03R\x08\x64uration\x12!\n\x0crequest_time\x18\x05 \x01(\x03R\x0brequestTime\"\x17\n\x15UpdatePeerGlobalsResp2\xcd\x01\n\x07PeersV1\x12`\n\x11GetPeerRateLimits\x12#.pb.gubernator.GetPeerRateLimitsReq\x1a$.pb.gubernator.GetPeerRateLimitsResp\"\x00\x12`\n\x11UpdatePeerGlobals\x12#.pb.gubernator.UpdatePeerGlobalsReq\x1a$.pb.gubernator.UpdatePeerGlobalsResp\"\x00\x42\"Z\x1dgithub.com/mailgun/gubernator\x80\x01\x01\x62\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) @@ -30,9 +30,9 @@ _globals['_UPDATEPEERGLOBALSREQ']._serialized_start=217 _globals['_UPDATEPEERGLOBALSREQ']._serialized_end=298 _globals['_UPDATEPEERGLOBAL']._serialized_start=301 - _globals['_UPDATEPEERGLOBAL']._serialized_end=447 - _globals['_UPDATEPEERGLOBALSRESP']._serialized_start=449 - _globals['_UPDATEPEERGLOBALSRESP']._serialized_end=472 - _globals['_PEERSV1']._serialized_start=475 - _globals['_PEERSV1']._serialized_end=680 + _globals['_UPDATEPEERGLOBAL']._serialized_end=510 + _globals['_UPDATEPEERGLOBALSRESP']._serialized_start=512 + _globals['_UPDATEPEERGLOBALSRESP']._serialized_end=535 + _globals['_PEERSV1']._serialized_start=538 + _globals['_PEERSV1']._serialized_end=743 # @@protoc_insertion_point(module_scope) diff --git a/workers.go b/workers.go index 07ba177f..04557f76 100644 --- a/workers.go +++ b/workers.go @@ -42,6 +42,7 @@ import ( "strconv" "sync" "sync/atomic" + "time" "github.com/OneOfOne/xxhash" "github.com/mailgun/holster/v4/errors" @@ -199,7 +200,7 @@ func (p *WorkerPool) dispatch(worker *Worker) { } resp := new(response) - resp.rl, resp.err = worker.handleGetRateLimit(req.ctx, req.request, worker.cache) + resp.rl, resp.err = worker.handleGetRateLimit(req.ctx, req.request, req.requestTime, worker.cache) select { case req.resp <- resp: // Success. @@ -258,16 +259,17 @@ func (p *WorkerPool) dispatch(worker *Worker) { } // GetRateLimit sends a GetRateLimit request to worker pool. -func (p *WorkerPool) GetRateLimit(ctx context.Context, rlRequest *RateLimitReq) (retval *RateLimitResp, reterr error) { +func (p *WorkerPool) GetRateLimit(ctx context.Context, rlRequest *RateLimitReq, requestTime time.Time) (*RateLimitResp, error) { // Delegate request to assigned channel based on request key. worker := p.getWorker(rlRequest.HashKey()) queueGauge := metricWorkerQueue.WithLabelValues("GetRateLimit", worker.name) queueGauge.Inc() defer queueGauge.Dec() handlerRequest := request{ - ctx: ctx, - resp: make(chan *response, 1), - request: rlRequest, + ctx: ctx, + resp: make(chan *response, 1), + request: rlRequest, + requestTime: requestTime, } // Send request. @@ -289,14 +291,14 @@ func (p *WorkerPool) GetRateLimit(ctx context.Context, rlRequest *RateLimitReq) } // Handle request received by worker. -func (worker *Worker) handleGetRateLimit(ctx context.Context, req *RateLimitReq, cache Cache) (*RateLimitResp, error) { +func (worker *Worker) handleGetRateLimit(ctx context.Context, req *RateLimitReq, requestTime time.Time, cache Cache) (*RateLimitResp, error) { defer prometheus.NewTimer(metricFuncTimeDuration.WithLabelValues("Worker.handleGetRateLimit")).ObserveDuration() var rlResponse *RateLimitResp var err error switch req.Algorithm { case Algorithm_TOKEN_BUCKET: - rlResponse, err = tokenBucket(ctx, worker.conf.Store, cache, req) + rlResponse, err = tokenBucket(ctx, worker.conf.Store, cache, req, requestTime) if err != nil { msg := "Error in tokenBucket" countError(err, msg) @@ -305,7 +307,7 @@ func (worker *Worker) handleGetRateLimit(ctx context.Context, req *RateLimitReq, } case Algorithm_LEAKY_BUCKET: - rlResponse, err = leakyBucket(ctx, worker.conf.Store, cache, req) + rlResponse, err = leakyBucket(ctx, worker.conf.Store, cache, req, requestTime) if err != nil { msg := "Error in leakyBucket" countError(err, msg)