This repository has been archived by the owner on Apr 19, 2024. It is now read-only.

docs: add docs in global.go #213

Merged
merged 4 commits on Feb 19, 2024
16 changes: 8 additions & 8 deletions docs/architecture.md
@@ -45,7 +45,7 @@ apply the new config immediately.

## Global Behavior
Since Gubernator rate limits are hashed and handled by a single peer in the
cluster. Rate limits that apply to every request in a data center would result
cluster, rate limits that apply to every request in a data center could result
in the rate limit request being handled by a single peer for the entirety of
the data center. For example, consider a rate limit with
`name=requests_per_datacenter` and a `unique_id=us-east-1`. Now imagine that a
@@ -68,7 +68,7 @@ limit status from the owner.
#### Side effects of global behavior
Since Hits are batched and forwarded to the owning peer asynchronously, the
immediate response to the client will not include the most accurate remaining
counts. As that count will only get updated after the async call to the owner
counts, as that count will only get updated after the async call to the owner
peer is complete and the owning peer has had time to update all the peers in
the cluster. As a result the use of GLOBAL allows for greater scale but at the
cost of consistency.
@@ -83,18 +83,18 @@ updates before all nodes have the `Hit` updated in their cache.
To calculate the WORST case scenario, we total the number of network updates
that must occur for each global rate limit.

Count 1 incoming request to the node
Count 1 request when forwarding to the owning node
Count 1 + (number of nodes in cluster) to update all the nodes with the current Hit count.
- Count 1 incoming request to the node
- Count 1 request when forwarding to the owning node
- Count 1 + (number of nodes in cluster) to update all the nodes with the current Hit count.
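
To make that tally concrete: for a cluster of N nodes the worst case is 1 + 1 + (1 + N) = 3 + N network updates per global hit. A minimal sketch of that arithmetic (illustrative only, not part of this PR):

```go
// worstCaseUpdates returns the maximum number of network updates a single
// GLOBAL hit can trigger in a cluster of n nodes: 1 incoming request,
// 1 forward to the owning node, and 1 + n to fan the new Hit count out.
func worstCaseUpdates(n int) int {
	return 1 + 1 + (1 + n)
}
```

For example, a 10-node cluster tops out at worstCaseUpdates(10) == 13 updates for a single hit, before any batching.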

Remember this is the WORST case, as the node that recieved the request might be
the owning node thus no need to forward to the owner. Additionally we improve
Remember this is the WORST case, as the node that received the request might be
the owning node thus no need to forward to the owner. Additionally, we improve
the worst case by having the owning node batch Hits when forwarding to all the
nodes in the cluster. Such that 1,000 individual requests of Hit = 1 each for a
unique key will result in batched request from the owner to each node with a
single Hit = 1,000 update.
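
A rough sketch of that batching idea, using a simplified hit type (`hit` is hypothetical here; the real code in global.go below aggregates *RateLimitReq values keyed by HashKey()):

```go
// hit is a simplified stand-in for a rate limit request's key and Hits value.
type hit struct {
	key   string
	count int64
}

// aggregateHits folds many individual hits into one pending update per unique
// key, so 1,000 hits of 1 for the same key become a single update of 1,000.
func aggregateHits(incoming []hit) map[string]int64 {
	batch := make(map[string]int64)
	for _, h := range incoming {
		batch[h.key] += h.count
	}
	return batch
}
```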

Additionally thousands of hits to different unique keys will also be batched
Additionally, thousands of hits to different unique keys will also be batched
such that network usage doesn't increase until the number of requests in an
update batch exceeds the `BehaviorConfig.GlobalBatchLimit` or when the number of
nodes in the cluster increases. (thus more batch updates) When that occurs you
65 changes: 35 additions & 30 deletions global.go
@@ -27,12 +27,12 @@ import (
// globalManager manages async hit queue and updates peers in
// the cluster periodically when a global rate limit we own updates.
type globalManager struct {
asyncQueue chan *RateLimitReq
broadcastQueue chan *RateLimitReq
hitsQueue chan *RateLimitReq
updatesQueue chan *RateLimitReq
wg syncutil.WaitGroup
conf BehaviorConfig
log FieldLogger
instance *V1Instance
instance *V1Instance // todo circular import? V1Instance also holds a reference to globalManager
metricGlobalSendDuration prometheus.Summary
metricBroadcastDuration prometheus.Summary
metricBroadcastCounter *prometheus.CounterVec
@@ -41,11 +41,11 @@ type globalManager struct {

func newGlobalManager(conf BehaviorConfig, instance *V1Instance) *globalManager {
gm := globalManager{
log: instance.log,
asyncQueue: make(chan *RateLimitReq, conf.GlobalBatchLimit),
broadcastQueue: make(chan *RateLimitReq, conf.GlobalBatchLimit),
instance: instance,
conf: conf,
log: instance.log,
hitsQueue: make(chan *RateLimitReq, conf.GlobalBatchLimit),
updatesQueue: make(chan *RateLimitReq, conf.GlobalBatchLimit),
instance: instance,
conf: conf,
metricGlobalSendDuration: prometheus.NewSummary(prometheus.SummaryOpts{
Name: "gubernator_global_send_duration",
Help: "The duration of GLOBAL async sends in seconds.",
@@ -65,29 +65,32 @@ func newGlobalManager(conf BehaviorConfig, instance *V1Instance) *globalManager
Help: "The count of requests queued up for global broadcast. This is only used for GetRateLimit requests using global behavior.",
}),
}
gm.runAsyncHits()
gm.runBroadcasts()
gm.toOwners()
Contributor

I see the confusion around the original names. Would prefer a more <Verb><Noun> nomenclature.

How about:

  • gm.runAsyncHits() -> gm.runBatches()

I think runBroadcasts() is appropriately named.

Thoughts?

Contributor Author

No, for me it's still confusing, because I'm missing out on who is doing the action.

Contributor

It's implied that the current peer is doing the action. Both of these start a goroutine that processes these actions.

gm.fromOwners()
return &gm
}

func (gm *globalManager) QueueHit(r *RateLimitReq) {
gm.asyncQueue <- r
gm.hitsQueue <- r
}

func (gm *globalManager) QueueUpdate(r *RateLimitReq) {
gm.broadcastQueue <- r
gm.updatesQueue <- r
}
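
For orientation, these two queue methods feed the two background loops defined below: QueueHit feeds hits that get forwarded to the owning peer, while QueueUpdate feeds status updates that the owner broadcasts to every peer. A hypothetical sketch of the two call sites (the function names here are illustrative, not from this PR):

```go
// A peer that does not own the rate limit forwards the hit asynchronously;
// toOwners batches these per HashKey before sending them to the owner.
func onHitForNonOwnedLimit(gm *globalManager, r *RateLimitReq) {
	gm.QueueHit(r)
}

// The owning peer queues the updated status; fromOwners batches these and
// broadcasts them to all peers in the cluster.
func onOwnedLimitUpdated(gm *globalManager, r *RateLimitReq) {
	gm.QueueUpdate(r)
}
```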

// runAsyncHits collects async hit requests and queues them to
// be sent to their owning peers.
func (gm *globalManager) runAsyncHits() {
// toOwners collects async hit requests in a forever loop,
// aggregates them in one request, and sends them to
// the owning peers.
// The updates are sent both when the batch limit is hit
// and in a periodic frequency determined by GlobalSyncWait
func (gm *globalManager) toOwners() {
var interval = NewInterval(gm.conf.GlobalSyncWait)
hits := make(map[string]*RateLimitReq)

gm.wg.Until(func(done chan struct{}) bool {

select {
case r := <-gm.asyncQueue:
case r := <-gm.hitsQueue:
// Aggregate the hits into a single request
key := r.HashKey()
_, ok := hits[key]
@@ -99,7 +102,7 @@ func (gm *globalManager) runAsyncHits() {

// Send the hits if we reached our batch limit
if len(hits) == gm.conf.GlobalBatchLimit {
gm.sendHits(hits)
gm.sendHitsToOwners(hits)
hits = make(map[string]*RateLimitReq)
return true
}
@@ -112,7 +115,7 @@ func (gm *globalManager) runAsyncHits() {

case <-interval.C:
if len(hits) != 0 {
gm.sendHits(hits)
gm.sendHitsToOwners(hits)
hits = make(map[string]*RateLimitReq)
}
case <-done:
@@ -122,9 +125,9 @@
})
}

// sendHits takes the hits collected by runAsyncHits and sends them to their
// sendHitsToOwners takes the hits collected by toOwners and sends them to their
// owning peers
func (gm *globalManager) sendHits(hits map[string]*RateLimitReq) {
func (gm *globalManager) sendHitsToOwners(hits map[string]*RateLimitReq) {
type pair struct {
client *PeerClient
req GetPeerRateLimitsReq
@@ -162,28 +165,31 @@ func (gm *globalManager) sendHits(hits map[string]*RateLimitReq) {

if err != nil {
gm.log.WithError(err).
Errorf("error sending global hits to '%s'", p.client.Info().GRPCAddress)
Errorf("while sending global hits to '%s'", p.client.Info().GRPCAddress)
}
return nil
}, p)
}
fan.Wait()
}

// runBroadcasts collects status changes for global rate limits and broadcasts the changes to each peer in the cluster.
func (gm *globalManager) runBroadcasts() {
// fromOwners collects status changes for global rate limits in a forever loop,
// and broadcasts the changes to each peer in the cluster.
// The updates are sent both when the batch limit is hit
// and in a periodic frequency determined by GlobalSyncWait
func (gm *globalManager) fromOwners() {
var interval = NewInterval(gm.conf.GlobalSyncWait)
updates := make(map[string]*RateLimitReq)

gm.wg.Until(func(done chan struct{}) bool {
select {
case r := <-gm.broadcastQueue:
case r := <-gm.updatesQueue:
updates[r.HashKey()] = r

// Send the hits if we reached our batch limit
if len(updates) >= gm.conf.GlobalBatchLimit {
gm.metricBroadcastCounter.WithLabelValues("queue_full").Inc()
gm.broadcastPeers(context.Background(), updates)
gm.sendUpdatesToPeers(context.Background(), updates)
updates = make(map[string]*RateLimitReq)
return true
}
Expand All @@ -197,7 +203,7 @@ func (gm *globalManager) runBroadcasts() {
case <-interval.C:
if len(updates) != 0 {
gm.metricBroadcastCounter.WithLabelValues("timer").Inc()
gm.broadcastPeers(context.Background(), updates)
gm.sendUpdatesToPeers(context.Background(), updates)
updates = make(map[string]*RateLimitReq)
} else {
gm.metricGlobalQueueLength.Set(0)
@@ -209,8 +215,8 @@
})
}

// broadcastPeers broadcasts global rate limit statuses to all other peers
func (gm *globalManager) broadcastPeers(ctx context.Context, updates map[string]*RateLimitReq) {
// sendUpdatesToPeers broadcasts global rate limit statuses to all other peers
func (gm *globalManager) sendUpdatesToPeers(ctx context.Context, updates map[string]*RateLimitReq) {
defer prometheus.NewTimer(gm.metricBroadcastDuration).ObserveDuration()
var req UpdatePeerGlobalsReq

@@ -226,10 +232,9 @@ func (gm *globalManager) broadcastPeers(ctx context.Context, updates map[string]

status, err := gm.instance.getLocalRateLimit(ctx, rl)
if err != nil {
gm.log.WithError(err).Errorf("while broadcasting update to peers for: '%s'", rl.HashKey())
gm.log.WithError(err).Errorf("while getting local rate limit for: '%s'", rl.HashKey())
continue
}
// Build an UpdatePeerGlobalsReq
req.Globals = append(req.Globals, &UpdatePeerGlobal{
Algorithm: rl.Algorithm,
Key: rl.HashKey(),