This repository has been archived by the owner on Apr 19, 2024. It is now read-only.

Commit

docs: add docs in global.go (#213)
* chore: rename for more clarity

* undo function renames

* another undo of a rename
miparnisari authored Feb 19, 2024
1 parent 452c5b5 commit 6f1e32a
Showing 2 changed files with 31 additions and 26 deletions.
16 changes: 8 additions & 8 deletions docs/architecture.md
@@ -45,7 +45,7 @@ apply the new config immediately.

## Global Behavior
Since Gubernator rate limits are hashed and handled by a single peer in the
-cluster. Rate limits that apply to every request in a data center would result
+cluster, rate limits that apply to every request in a data center could result
in the rate limit request being handled by a single peer for the entirety of
the data center. For example, consider a rate limit with
`name=requests_per_datacenter` and a `unique_id=us-east-1`. Now imagine that a
@@ -68,7 +68,7 @@ limit status from the owner.
#### Side effects of global behavior
Since Hits are batched and forwarded to the owning peer asynchronously, the
immediate response to the client will not include the most accurate remaining
-counts. As that count will only get updated after the async call to the owner
+counts, as that count will only get updated after the async call to the owner
peer is complete and the owning peer has had time to update all the peers in
the cluster. As a result the use of GLOBAL allows for greater scale but at the
cost of consistency.
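
As an illustration of the trade-off described above (outside the diff itself): with `Behavior_GLOBAL` any peer answers from its local copy, so the returned `Remaining` can lag behind the owner until the next async update. The sketch below is a guess at client-side usage; the module path, the `V1Client` interface, and the duration constant are assumptions based on the project's public Go API and should be checked against the proto definitions.

```go
// Illustrative only; not code from this repository.
package example

import (
	"context"
	"fmt"

	guber "github.com/mailgun/gubernator/v2" // assumed module path
)

// CheckQuota sends a GLOBAL rate limit to whichever peer the client is
// connected to. Hits are forwarded to the owning peer asynchronously, so
// the Remaining value returned here may lag the owner's authoritative count.
func CheckQuota(ctx context.Context, client guber.V1Client) error {
	resp, err := client.GetRateLimits(ctx, &guber.GetRateLimitsReq{
		Requests: []*guber.RateLimitReq{{
			Name:      "requests_per_datacenter",
			UniqueKey: "us-east-1",
			Hits:      1,
			Limit:     10000,
			Duration:  guber.Minute, // durations are expressed in milliseconds
			Algorithm: guber.Algorithm_TOKEN_BUCKET,
			Behavior:  guber.Behavior_GLOBAL,
		}},
	})
	if err != nil {
		return err
	}
	rl := resp.Responses[0]
	fmt.Printf("status=%s remaining=%d (eventually consistent)\n", rl.Status, rl.Remaining)
	return nil
}
```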
@@ -83,18 +83,18 @@ updates before all nodes have the `Hit` updated in their cache.
To calculate the WORST case scenario, we total the number of network updates
that must occur for each global rate limit.

-Count 1 incoming request to the node
-Count 1 request when forwarding to the owning node
-Count 1 + (number of nodes in cluster) to update all the nodes with the current Hit count.
+- Count 1 incoming request to the node
+- Count 1 request when forwarding to the owning node
+- Count 1 + (number of nodes in cluster) to update all the nodes with the current Hit count.

-Remember this is the WORST case, as the node that recieved the request might be
-the owning node thus no need to forward to the owner. Additionally we improve
+Remember this is the WORST case, as the node that received the request might be
+the owning node thus no need to forward to the owner. Additionally, we improve
the worst case by having the owning node batch Hits when forwarding to all the
nodes in the cluster. Such that 1,000 individual requests of Hit = 1 each for a
unique key will result in batched request from the owner to each node with a
single Hit = 1,000 update.

-Additionally thousands of hits to different unique keys will also be batched
+Additionally, thousands of hits to different unique keys will also be batched
such that network usage doesn't increase until the number of requests in an
update batch exceeds the `BehaviorConfig.GlobalBatchLimit` or when the number of
nodes in the cluster increases. (thus more batch updates) When that occurs you
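Not part of the diff, but a tiny standalone sketch of the worst-case accounting listed above (one incoming request, one forward to the owner, then `1 + N` update messages back out to the cluster):

```go
// Illustrative only; not code from this repository.
package example

import "fmt"

// WorstCaseUpdates returns the maximum number of network messages a single
// global hit can generate, following the list in architecture.md:
// 1 incoming request + 1 forward to the owner + (1 + nodes) update messages.
func WorstCaseUpdates(nodes int) int {
	return 1 + 1 + (1 + nodes)
}

// PrintWorstCases shows how the worst case grows with cluster size.
func PrintWorstCases() {
	for _, n := range []int{3, 10, 100} {
		fmt.Printf("cluster of %d nodes: at most %d messages per global hit\n", n, WorstCaseUpdates(n))
	}
}
```

In practice batching keeps the real number far lower, since the owner folds many hits for the same key (and across keys) into a single update per peer.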
41 changes: 23 additions & 18 deletions global.go
@@ -27,12 +27,12 @@ import (
// globalManager manages async hit queue and updates peers in
// the cluster periodically when a global rate limit we own updates.
type globalManager struct {
-asyncQueue chan *RateLimitReq
-broadcastQueue chan *RateLimitReq
+hitsQueue chan *RateLimitReq
+updatesQueue chan *RateLimitReq
wg syncutil.WaitGroup
conf BehaviorConfig
log FieldLogger
-instance *V1Instance
+instance *V1Instance // todo circular import? V1Instance also holds a reference to globalManager
metricGlobalSendDuration prometheus.Summary
metricBroadcastDuration prometheus.Summary
metricBroadcastCounter *prometheus.CounterVec
@@ -41,11 +41,11 @@ type globalManager struct {

func newGlobalManager(conf BehaviorConfig, instance *V1Instance) *globalManager {
gm := globalManager{
-log: instance.log,
-asyncQueue: make(chan *RateLimitReq, conf.GlobalBatchLimit),
-broadcastQueue: make(chan *RateLimitReq, conf.GlobalBatchLimit),
-instance: instance,
-conf: conf,
+log: instance.log,
+hitsQueue: make(chan *RateLimitReq, conf.GlobalBatchLimit),
+updatesQueue: make(chan *RateLimitReq, conf.GlobalBatchLimit),
+instance: instance,
+conf: conf,
metricGlobalSendDuration: prometheus.NewSummary(prometheus.SummaryOpts{
Name: "gubernator_global_send_duration",
Help: "The duration of GLOBAL async sends in seconds.",
@@ -71,23 +71,26 @@ func newGlobalManager(conf BehaviorConfig, instance *V1Instance) *globalManager
}

func (gm *globalManager) QueueHit(r *RateLimitReq) {
-gm.asyncQueue <- r
+gm.hitsQueue <- r
}

func (gm *globalManager) QueueUpdate(r *RateLimitReq) {
-gm.broadcastQueue <- r
+gm.updatesQueue <- r
}

-// runAsyncHits collects async hit requests and queues them to
-// be sent to their owning peers.
+// runAsyncHits collects async hit requests in a forever loop,
+// aggregates them in one request, and sends them to
+// the owning peers.
+// The updates are sent both when the batch limit is hit
+// and in a periodic frequency determined by GlobalSyncWait.
func (gm *globalManager) runAsyncHits() {
var interval = NewInterval(gm.conf.GlobalSyncWait)
hits := make(map[string]*RateLimitReq)

gm.wg.Until(func(done chan struct{}) bool {

select {
-case r := <-gm.asyncQueue:
+case r := <-gm.hitsQueue:
// Aggregate the hits into a single request
key := r.HashKey()
_, ok := hits[key]
@@ -162,22 +165,25 @@ func (gm *globalManager) sendHits(hits map[string]*RateLimitReq) {

if err != nil {
gm.log.WithError(err).
Errorf("error sending global hits to '%s'", p.client.Info().GRPCAddress)
Errorf("while sending global hits to '%s'", p.client.Info().GRPCAddress)
}
return nil
}, p)
}
fan.Wait()
}

-// runBroadcasts collects status changes for global rate limits and broadcasts the changes to each peer in the cluster.
+// runBroadcasts collects status changes for global rate limits in a forever loop,
+// and broadcasts the changes to each peer in the cluster.
+// The updates are sent both when the batch limit is hit
+// and in a periodic frequency determined by GlobalSyncWait.
func (gm *globalManager) runBroadcasts() {
var interval = NewInterval(gm.conf.GlobalSyncWait)
updates := make(map[string]*RateLimitReq)

gm.wg.Until(func(done chan struct{}) bool {
select {
-case r := <-gm.broadcastQueue:
+case r := <-gm.updatesQueue:
updates[r.HashKey()] = r

// Send the hits if we reached our batch limit
@@ -226,10 +232,9 @@ func (gm *globalManager) broadcastPeers(ctx context.Context, updates map[string]

status, err := gm.instance.getLocalRateLimit(ctx, rl)
if err != nil {
gm.log.WithError(err).Errorf("while broadcasting update to peers for: '%s'", rl.HashKey())
gm.log.WithError(err).Errorf("while getting local rate limit for: '%s'", rl.HashKey())
continue
}
-// Build an UpdatePeerGlobalsReq
req.Globals = append(req.Globals, &UpdatePeerGlobal{
Algorithm: rl.Algorithm,
Key: rl.HashKey(),
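To make the loop structure that the new comments describe easier to follow, here is a simplified, standalone sketch of the same pattern: drain a channel, aggregate by key, and flush either when the batch limit is reached or on a timer. It is not the repository's code; a `time.Ticker` stands in for the `Interval` helper, and the names are illustrative.

```go
// Illustrative only; mirrors the shape of runAsyncHits/runBroadcasts.
package example

import "time"

type hit struct {
	key  string
	hits int64
}

// aggregateLoop drains hits from a channel, merges hits for the same key,
// and flushes the pending batch when it reaches batchLimit or when
// syncWait elapses, whichever happens first.
func aggregateLoop(in <-chan hit, done <-chan struct{}, batchLimit int, syncWait time.Duration, flush func(map[string]int64)) {
	pending := make(map[string]int64)
	ticker := time.NewTicker(syncWait)
	defer ticker.Stop()

	for {
		select {
		case h := <-in:
			pending[h.key] += h.hits // aggregate hits for the same key
			if len(pending) >= batchLimit {
				flush(pending)
				pending = make(map[string]int64)
			}
		case <-ticker.C:
			if len(pending) > 0 {
				flush(pending)
				pending = make(map[string]int64)
			}
		case <-done:
			return
		}
	}
}
```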
