Skip to content
This repository has been archived by the owner on Apr 19, 2024. It is now read-only.

fix: owners set UNDER_LIMIT status when Remaining=0 #212

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 104 additions & 0 deletions functional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"os"
"strings"
"testing"
"time"

guber "github.com/mailgun/gubernator/v2"
"github.com/mailgun/gubernator/v2/cluster"
Expand All @@ -34,6 +35,10 @@ import (
"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/resolver"
json "google.golang.org/protobuf/encoding/protojson"
)

Expand Down Expand Up @@ -859,6 +864,62 @@ func TestGlobalRateLimits(t *testing.T) {
})
}

func TestGlobalRateLimitsWithLoadBalancing(t *testing.T) {
owner := cluster.PeerAt(2).GRPCAddress
peer := cluster.PeerAt(0).GRPCAddress
assert.NotEqual(t, owner, peer)

dialOpts := []grpc.DialOption{
grpc.WithResolvers(newStaticBuilder()),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`),
}

address := fmt.Sprintf("static:///%s,%s", owner, peer)
conn, err := grpc.DialContext(context.Background(), address, dialOpts...)
require.NoError(t, err)

client := guber.NewV1Client(conn)

sendHit := func(status guber.Status, assertion func(resp *guber.RateLimitResp), i int) string {
ctx, cancel := context.WithTimeout(context.Background(), clock.Hour*5)
defer cancel()
resp, err := client.GetRateLimits(ctx, &guber.GetRateLimitsReq{
Requests: []*guber.RateLimitReq{
{
Name: "test_global",
UniqueKey: "account:12345",
Algorithm: guber.Algorithm_LEAKY_BUCKET,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@miparnisari - I think we will want to extend the test case to cover both algorithms perhaps?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we should

Behavior: guber.Behavior_GLOBAL,
Duration: guber.Minute * 5,
Hits: 1,
Limit: 2,
},
},
})
require.NoError(t, err, i)
gotResp := resp.Responses[0]
assert.Equal(t, "", gotResp.GetError(), i)
assert.Equal(t, status, gotResp.GetStatus(), i)

if assertion != nil {
assertion(gotResp)
}

return gotResp.GetMetadata()["owner"]
}

// Send two hits that should be processed by the owner and the peer and deplete the limit
sendHit(guber.Status_UNDER_LIMIT, nil, 1)
sendHit(guber.Status_UNDER_LIMIT, nil, 2)
// sleep to ensure the async forward has occurred and state should be shared
time.Sleep(time.Second * 5)

for i := 0; i < 10; i++ {
sendHit(guber.Status_OVER_LIMIT, nil, i+2)
}
}

func getMetricRequest(t testutil.TestingT, url string, name string) *model.Sample {
resp, err := http.Get(url)
require.NoError(t, err)
Expand Down Expand Up @@ -1273,3 +1334,46 @@ func getMetric(t testutil.TestingT, in io.Reader, name string) *model.Sample {
}
return nil
}

// staticBuilder implements the `resolver.Builder` interface.
type staticBuilder struct{}

func newStaticBuilder() resolver.Builder {
return &staticBuilder{}
}

func (sb *staticBuilder) Build(target resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) (resolver.Resolver, error) {
var resolverAddrs []resolver.Address
for _, address := range strings.Split(target.Endpoint(), ",") {
resolverAddrs = append(resolverAddrs, resolver.Address{
Addr: address,
ServerName: address,
})

}
r, err := newStaticResolver(cc, resolverAddrs)
if err != nil {
return nil, err
}
return r, nil
}

func (sb *staticBuilder) Scheme() string {
return "static"
}

type staticResolver struct {
cc resolver.ClientConn
}

func newStaticResolver(cc resolver.ClientConn, addresses []resolver.Address) (resolver.Resolver, error) {
err := cc.UpdateState(resolver.State{Addresses: addresses})
if err != nil {
return nil, err
}
return &staticResolver{cc: cc}, nil
}

func (sr *staticResolver) ResolveNow(_ resolver.ResolveNowOptions) {}

func (sr *staticResolver) Close() {}
6 changes: 5 additions & 1 deletion global.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,11 +224,15 @@ func (gm *globalManager) broadcastPeers(ctx context.Context, updates map[string]
SetBehavior(&rl.Behavior, Behavior_GLOBAL, false)
rl.Hits = 0

status, err := gm.instance.getLocalRateLimit(ctx, rl)
misleadingStatus, err := gm.instance.getLocalRateLimit(ctx, rl)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

did you confirm with the test that this returns with the correct result? I saw various places where the "fix" could be made at the time but it's hard to know if there is any knock on effects elsewhere without knowing the codebase more thoroughly.

Hopefully it is this simple :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've only stared at the codebase for a couple of hours haha. As I mentioned here #208 (comment) the other place where I thought we could fix this is in algorithms.go.. but I am not comfortable changing that code because it doesn't have any unit tests 😬

if err != nil {
gm.log.WithError(err).Errorf("while broadcasting update to peers for: '%s'", rl.HashKey())
continue
}
status := misleadingStatus
if misleadingStatus.Remaining == 0 {
status.Status = Status_OVER_LIMIT
}
// Build an UpdatePeerGlobalsReq
req.Globals = append(req.Globals, &UpdatePeerGlobal{
Algorithm: rl.Algorithm,
Expand Down
Loading