Skip to content

Commit

Permalink
refactor(grpc): extract fullChan as a full burst limiter
Browse files Browse the repository at this point in the history
  • Loading branch information
searKing committed Aug 29, 2024
1 parent 45feee9 commit eb45d26
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 34 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Copyright 2024 The searKing Author. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package burstlimit

// fullChan returns a channel of the given size, filled with empty structs.
// simple, but effective.
//
// Also see: https://github.com/searKing/golang/blob/go/v1.2.118/go/time/rate/rate.go#L87
func fullChan(b int) (limiter chan struct{}) {
if b > 0 {
limiter = make(chan struct{}, b)
for i := 0; i < b; i++ {
limiter <- struct{}{}
}
}
return limiter
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,14 @@ import (
"google.golang.org/grpc/status"
)

// UnaryClientInterceptor returns a new unary client interceptor that performs request burst limiting.
// UnaryClientInterceptor returns a new unary client interceptor that performs request burst limiting on the request on the client side.
// This can be helpful for clients that want to limit the number of requests they send concurrently, potentially saving cost.
// b bucket size, take effect if b > 0
// timeout ResourceExhausted if cost more than timeout to get a token, take effect if timeout > 0
func UnaryClientInterceptor(b int, timeout time.Duration) grpc.UnaryClientInterceptor {
limiter := make(chan struct{}, b)
for i := 0; i < b; i++ {
limiter <- struct{}{}
}
limiter := fullChan(b)
return func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
if b > 0 {
if limiter != nil {
var limiterCtx = ctx
var cancel context.CancelFunc
if timeout > 0 {
Expand All @@ -31,9 +29,7 @@ func UnaryClientInterceptor(b int, timeout time.Duration) grpc.UnaryClientInterc
}
select {
case <-limiter:
defer func() {
limiter <- struct{}{}
}()
defer func() { limiter <- struct{}{} }()
case <-limiterCtx.Done():
return status.Errorf(codes.ResourceExhausted,
"%s is rejected by burstlimit unary client middleware, please retry later: %s", method, limiterCtx.Err())
Expand All @@ -43,16 +39,14 @@ func UnaryClientInterceptor(b int, timeout time.Duration) grpc.UnaryClientInterc
}
}

// StreamClientInterceptor returns a new streaming client interceptor that performs burst limiting on the request.
// StreamClientInterceptor returns a new streaming client interceptor that performs burst limiting on the request on the client side.
// This can be helpful for clients that want to limit the number of requests they send concurrently, potentially saving cost.
// b bucket size, take effect if b > 0
// timeout ResourceExhausted if cost more than timeout to get a token, take effect if timeout > 0
func StreamClientInterceptor(b int, timeout time.Duration) grpc.StreamClientInterceptor {
limiter := make(chan struct{}, b)
for i := 0; i < b; i++ {
limiter <- struct{}{}
}
limiter := fullChan(b)
return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
if b > 0 {
if limiter != nil {
var limiterCtx = ctx
var cancel context.CancelFunc
if timeout > 0 {
Expand All @@ -61,9 +55,7 @@ func StreamClientInterceptor(b int, timeout time.Duration) grpc.StreamClientInte
}
select {
case <-limiter:
defer func() {
limiter <- struct{}{}
}()
defer func() { limiter <- struct{}{} }()
case <-limiterCtx.Done():
return nil, status.Errorf(codes.ResourceExhausted,
"%s is rejected by burstlimit stream client middleware, please retry later: %s", method, limiterCtx.Err())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,13 @@ import (
)

// UnaryServerInterceptor returns a new unary server interceptors that performs request burst limiting.
// This can be helpful for clients that want to limit the number of requests they receive concurrently, potentially saving cost.
// b bucket size, take effect if b > 0
// timeout ResourceExhausted if cost more than timeout to get a token, take effect if timeout > 0
func UnaryServerInterceptor(b int, timeout time.Duration) grpc.UnaryServerInterceptor {
limiter := make(chan struct{}, b)
for i := 0; i < b; i++ {
limiter <- struct{}{}
}
limiter := fullChan(b)
return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
if b > 0 {
if limiter != nil {
var limiterCtx = ctx
var cancel context.CancelFunc
if timeout > 0 {
Expand All @@ -31,9 +29,7 @@ func UnaryServerInterceptor(b int, timeout time.Duration) grpc.UnaryServerInterc
}
select {
case <-limiter:
defer func() {
limiter <- struct{}{}
}()
defer func() { limiter <- struct{}{} }()
case <-limiterCtx.Done():
return nil, status.Errorf(codes.ResourceExhausted,
"%s is rejected by burstlimit unary server middleware, please retry later: %s", info.FullMethod, limiterCtx.Err())
Expand All @@ -44,15 +40,13 @@ func UnaryServerInterceptor(b int, timeout time.Duration) grpc.UnaryServerInterc
}

// StreamServerInterceptor returns a new streaming server interceptor that performs burst limiting on the request.
// This can be helpful for clients that want to limit the number of requests they receive concurrently, potentially saving cost.
// b bucket size, take effect if b > 0
// timeout ResourceExhausted if cost more than timeout to get a token, take effect if timeout > 0
func StreamServerInterceptor(b int, timeout time.Duration) grpc.StreamServerInterceptor {
limiter := make(chan struct{}, b)
for i := 0; i < b; i++ {
limiter <- struct{}{}
}
limiter := fullChan(b)
return func(srv any, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
if b > 0 {
if limiter != nil {
var limiterCtx = stream.Context()
var cancel context.CancelFunc
if timeout > 0 {
Expand All @@ -61,9 +55,7 @@ func StreamServerInterceptor(b int, timeout time.Duration) grpc.StreamServerInte
}
select {
case <-limiter:
defer func() {
limiter <- struct{}{}
}()
defer func() { limiter <- struct{}{} }()
case <-limiterCtx.Done():
return status.Errorf(codes.ResourceExhausted,
"%s is rejected by burstlimit stream server middleware, please retry later: %s", info.FullMethod, limiterCtx.Err())
Expand Down

0 comments on commit eb45d26

Please sign in to comment.