diff --git a/lib/adaptivequemgr.go b/lib/adaptivequemgr.go index 8e49e2e0..d59992b5 100644 --- a/lib/adaptivequemgr.go +++ b/lib/adaptivequemgr.go @@ -21,8 +21,8 @@ import ( "errors" "fmt" "math/rand" - "sync/atomic" "strings" + "sync/atomic" "time" "github.com/paypal/hera/cal" @@ -138,11 +138,12 @@ func bindEvictNameOk(bindName string) (bool) { bind name and values */ func (mgr *adaptiveQueueManager) doBindEviction() (int) { throttleCount := 0 - GetBindEvict().lock.Lock() - for _,keyValues := range GetBindEvict().BindThrottle { + bindEvict := GetBindEvict() + bindEvict.lock.Lock() + defer bindEvict.lock.Unlock() + for _,keyValues := range bindEvict.BindThrottle { throttleCount += len(keyValues) } - GetBindEvict().lock.Unlock() if throttleCount > GetConfig().BindEvictionMaxThrottle { if logger.GetLogger().V(logger.Info) { logger.GetLogger().Log(logger.Info, "already too many bind throttles, skipping bind eviction and throttle") @@ -159,9 +160,7 @@ func (mgr *adaptiveQueueManager) doBindEviction() (int) { } usqlhash := uint32(worker.sqlHash) sqlhash := atomic.LoadUint32(&(usqlhash)) - GetBindEvict().lock.Lock() - _, ok := GetBindEvict().BindThrottle[sqlhash] - GetBindEvict().lock.Unlock() + _, ok := bindEvict.BindThrottle[sqlhash] if ok { continue // don't repeatedly bind evict something already evicted } @@ -261,13 +260,11 @@ func (mgr *adaptiveQueueManager) doBindEviction() (int) { } // setup allow-every-x - GetBindEvict().lock.Lock() - sqlBind, ok := GetBindEvict().BindThrottle[sqlhash] + sqlBind, ok := bindEvict.BindThrottle[sqlhash] if !ok { sqlBind = make(map[string]*BindThrottle) - GetBindEvict().BindThrottle[sqlhash] = sqlBind + bindEvict.BindThrottle[sqlhash] = sqlBind } - GetBindEvict().lock.Unlock() concatKey := fmt.Sprintf("%s|%s", bindName, bindValue) throttle, ok := sqlBind[concatKey] if ok { diff --git a/lib/bindevict.go b/lib/bindevict.go index a31585b6..c1a1034b 100644 --- a/lib/bindevict.go +++ b/lib/bindevict.go @@ -54,9 +54,9 @@ func GetBindEvict() *BindEvict { } return cfg.(*BindEvict) } -func (this *BindEvict) Copy() *BindEvict { +func (bindEvict *BindEvict) Copy() *BindEvict { out := BindEvict{BindThrottle:make(map[uint32]map[string]*BindThrottle)} - for k,v := range this.BindThrottle { + for k,v := range bindEvict.BindThrottle { out.BindThrottle[k] = v } return &out @@ -85,41 +85,44 @@ func (entry *BindThrottle) decrAllowEveryX(y int) { return } entry.AllowEveryX = 0 - GetBindEvict().lock.Lock() - defer GetBindEvict().lock.Unlock() +} +func (entry *BindThrottle) incrAllowEveryX() { + if logger.GetLogger().V(logger.Warning) { + info := fmt.Sprintf("hash:%d bindName:%s val:%s prev:%d",entry.Sqlhash, entry.Name, entry.Value, entry.AllowEveryX) + logger.GetLogger().Log(logger.Warning, "bind throttle incr", info) + } + entry.AllowEveryX = 3*entry.AllowEveryX + 1 + if entry.AllowEveryX > 10000 { + entry.AllowEveryX = 10000 + } +} + +func (bindEvict *BindEvict) updateThrottle(entry *BindThrottle) { // delete entry - if len(GetBindEvict().BindThrottle[entry.Sqlhash]) == 1 { - updateCopy := GetBindEvict().Copy() + if len(bindEvict.BindThrottle[entry.Sqlhash]) == 1 { + updateCopy := bindEvict.Copy() delete(updateCopy.BindThrottle, entry.Sqlhash) gBindEvict.Store(updateCopy) } else { // copy everything except bindKV (skipping it is deleting it) bindKV := fmt.Sprintf("%s|%s", entry.Name, entry.Value) - updateCopy := make(map[string]*BindThrottle) - for k,v := range GetBindEvict().BindThrottle[entry.Sqlhash] { + updateCopy := bindEvict.Copy() + updateBindThrottleCopy := make(map[string]*BindThrottle) + for k,v := range bindEvict.BindThrottle[entry.Sqlhash] { if k == bindKV { continue } - updateCopy[k] = v + updateBindThrottleCopy[k] = v } - GetBindEvict().BindThrottle[entry.Sqlhash] = updateCopy - } -} -func (entry *BindThrottle) incrAllowEveryX() { - if logger.GetLogger().V(logger.Warning) { - info := fmt.Sprintf("hash:%d bindName:%s val:%s prev:%d",entry.Sqlhash, entry.Name, entry.Value, entry.AllowEveryX) - logger.GetLogger().Log(logger.Warning, "bind throttle incr", info) - } - entry.AllowEveryX = 3*entry.AllowEveryX + 1 - if entry.AllowEveryX > 10000 { - entry.AllowEveryX = 10000 + updateCopy.BindThrottle[entry.Sqlhash] = updateBindThrottleCopy + gBindEvict.Store(updateCopy) } } -func (be *BindEvict) ShouldBlock(sqlhash uint32, bindKV map[string]string, heavyUsage bool) (bool, *BindThrottle) { - GetBindEvict().lock.Lock() - sqlBinds := GetBindEvict().BindThrottle[sqlhash] - GetBindEvict().lock.Unlock() +func (bindEvict *BindEvict) ShouldBlock(sqlhash uint32, bindKV map[string]string, heavyUsage bool) (bool, *BindThrottle) { + bindEvict.lock.Lock() + defer bindEvict.lock.Unlock() + sqlBinds := bindEvict.BindThrottle[sqlhash] for k0, v := range bindKV /*parseBinds(request)*/ { k := NormalizeBindName(k0) concatKey := fmt.Sprintf("%s|%s", k, v) @@ -143,6 +146,7 @@ func (be *BindEvict) ShouldBlock(sqlhash uint32, bindKV map[string]string, heavy gap := now.Sub(*recent).Seconds() * GetConfig().BindEvictionDecrPerSec entry.decrAllowEveryX(int(gap)) if entry.AllowEveryX == 0 { + bindEvict.updateThrottle(entry) return false, nil } diff --git a/lib/coordinator.go b/lib/coordinator.go index 3b36745a..1dbe7ac1 100644 --- a/lib/coordinator.go +++ b/lib/coordinator.go @@ -652,9 +652,10 @@ func (crd *Coordinator) dispatchRequest(request *netstring.Netstring) error { xShardRead := false // check bind throttle - GetBindEvict().lock.Lock() - _, ok := GetBindEvict().BindThrottle[uint32(crd.sqlhash)] - GetBindEvict().lock.Unlock() + bindEvict := GetBindEvict() + bindEvict.lock.Lock() + _, ok := bindEvict.BindThrottle[uint32(crd.sqlhash)] + bindEvict.lock.Unlock() if ok { wType := wtypeRW cfg := GetNumWorkers(crd.shard.shardID) @@ -686,7 +687,7 @@ func (crd *Coordinator) dispatchRequest(request *netstring.Netstring) error { logger.GetLogger().Log(logger.Debug, msg) } } - needBlock, throttleEntry := GetBindEvict().ShouldBlock(uint32(crd.sqlhash), bindkv, heavyUsage) + needBlock, throttleEntry := bindEvict.ShouldBlock(uint32(crd.sqlhash), bindkv, heavyUsage) if needBlock { msg := fmt.Sprintf("k=%s&v=%s&allowEveryX=%d&allowFrac=%.5f&raddr=%s", throttleEntry.Name,