Skip to content

Commit

Permalink
fix panic: unlock of unlocked mutex
Browse files Browse the repository at this point in the history
  • Loading branch information
venkatsridhar95 committed Oct 22, 2024
1 parent 36771d0 commit b6b8cd3
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 39 deletions.
19 changes: 8 additions & 11 deletions lib/adaptivequemgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import (
"errors"
"fmt"
"math/rand"
"sync/atomic"
"strings"
"sync/atomic"
"time"

"github.com/paypal/hera/cal"
Expand Down Expand Up @@ -138,11 +138,12 @@ func bindEvictNameOk(bindName string) (bool) {
bind name and values */
func (mgr *adaptiveQueueManager) doBindEviction() (int) {
throttleCount := 0
GetBindEvict().lock.Lock()
for _,keyValues := range GetBindEvict().BindThrottle {
bindEvict := GetBindEvict()
bindEvict.lock.Lock()
defer bindEvict.lock.Unlock()
for _,keyValues := range bindEvict.BindThrottle {
throttleCount += len(keyValues)
}
GetBindEvict().lock.Unlock()
if throttleCount > GetConfig().BindEvictionMaxThrottle {
if logger.GetLogger().V(logger.Info) {
logger.GetLogger().Log(logger.Info, "already too many bind throttles, skipping bind eviction and throttle")
Expand All @@ -159,9 +160,7 @@ func (mgr *adaptiveQueueManager) doBindEviction() (int) {
}
usqlhash := uint32(worker.sqlHash)
sqlhash := atomic.LoadUint32(&(usqlhash))
GetBindEvict().lock.Lock()
_, ok := GetBindEvict().BindThrottle[sqlhash]
GetBindEvict().lock.Unlock()
_, ok := bindEvict.BindThrottle[sqlhash]
if ok {
continue // don't repeatedly bind evict something already evicted
}
Expand Down Expand Up @@ -261,13 +260,11 @@ func (mgr *adaptiveQueueManager) doBindEviction() (int) {
}

// setup allow-every-x
GetBindEvict().lock.Lock()
sqlBind, ok := GetBindEvict().BindThrottle[sqlhash]
sqlBind, ok := bindEvict.BindThrottle[sqlhash]
if !ok {
sqlBind = make(map[string]*BindThrottle)
GetBindEvict().BindThrottle[sqlhash] = sqlBind
bindEvict.BindThrottle[sqlhash] = sqlBind
}
GetBindEvict().lock.Unlock()
concatKey := fmt.Sprintf("%s|%s", bindName, bindValue)
throttle, ok := sqlBind[concatKey]
if ok {
Expand Down
52 changes: 28 additions & 24 deletions lib/bindevict.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ func GetBindEvict() *BindEvict {
}
return cfg.(*BindEvict)
}
func (this *BindEvict) Copy() *BindEvict {
func (bindEvict *BindEvict) Copy() *BindEvict {
out := BindEvict{BindThrottle:make(map[uint32]map[string]*BindThrottle)}
for k,v := range this.BindThrottle {
for k,v := range bindEvict.BindThrottle {
out.BindThrottle[k] = v
}
return &out
Expand Down Expand Up @@ -85,41 +85,44 @@ func (entry *BindThrottle) decrAllowEveryX(y int) {
return
}
entry.AllowEveryX = 0
GetBindEvict().lock.Lock()
defer GetBindEvict().lock.Unlock()
}
func (entry *BindThrottle) incrAllowEveryX() {
if logger.GetLogger().V(logger.Warning) {
info := fmt.Sprintf("hash:%d bindName:%s val:%s prev:%d",entry.Sqlhash, entry.Name, entry.Value, entry.AllowEveryX)
logger.GetLogger().Log(logger.Warning, "bind throttle incr", info)
}
entry.AllowEveryX = 3*entry.AllowEveryX + 1
if entry.AllowEveryX > 10000 {
entry.AllowEveryX = 10000
}
}

func (bindEvict *BindEvict) updateThrottle(entry *BindThrottle) {
// delete entry
if len(GetBindEvict().BindThrottle[entry.Sqlhash]) == 1 {
updateCopy := GetBindEvict().Copy()
if len(bindEvict.BindThrottle[entry.Sqlhash]) == 1 {
updateCopy := bindEvict.Copy()
delete(updateCopy.BindThrottle, entry.Sqlhash)
gBindEvict.Store(updateCopy)
} else {
// copy everything except bindKV (skipping it is deleting it)
bindKV := fmt.Sprintf("%s|%s", entry.Name, entry.Value)
updateCopy := make(map[string]*BindThrottle)
for k,v := range GetBindEvict().BindThrottle[entry.Sqlhash] {
updateCopy := bindEvict.Copy()
updateBindThrottleCopy := make(map[string]*BindThrottle)
for k,v := range bindEvict.BindThrottle[entry.Sqlhash] {
if k == bindKV {
continue
}
updateCopy[k] = v
updateBindThrottleCopy[k] = v
}
GetBindEvict().BindThrottle[entry.Sqlhash] = updateCopy
}
}
func (entry *BindThrottle) incrAllowEveryX() {
if logger.GetLogger().V(logger.Warning) {
info := fmt.Sprintf("hash:%d bindName:%s val:%s prev:%d",entry.Sqlhash, entry.Name, entry.Value, entry.AllowEveryX)
logger.GetLogger().Log(logger.Warning, "bind throttle incr", info)
}
entry.AllowEveryX = 3*entry.AllowEveryX + 1
if entry.AllowEveryX > 10000 {
entry.AllowEveryX = 10000
updateCopy.BindThrottle[entry.Sqlhash] = updateBindThrottleCopy
gBindEvict.Store(updateCopy)
}
}

func (be *BindEvict) ShouldBlock(sqlhash uint32, bindKV map[string]string, heavyUsage bool) (bool, *BindThrottle) {
GetBindEvict().lock.Lock()
sqlBinds := GetBindEvict().BindThrottle[sqlhash]
GetBindEvict().lock.Unlock()
func (bindEvict *BindEvict) ShouldBlock(sqlhash uint32, bindKV map[string]string, heavyUsage bool) (bool, *BindThrottle) {
bindEvict.lock.Lock()
defer bindEvict.lock.Unlock()
sqlBinds := bindEvict.BindThrottle[sqlhash]
for k0, v := range bindKV /*parseBinds(request)*/ {
k := NormalizeBindName(k0)
concatKey := fmt.Sprintf("%s|%s", k, v)
Expand All @@ -143,6 +146,7 @@ func (be *BindEvict) ShouldBlock(sqlhash uint32, bindKV map[string]string, heavy
gap := now.Sub(*recent).Seconds() * GetConfig().BindEvictionDecrPerSec
entry.decrAllowEveryX(int(gap))
if entry.AllowEveryX == 0 {
bindEvict.updateThrottle(entry)
return false, nil
}

Expand Down
9 changes: 5 additions & 4 deletions lib/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -652,9 +652,10 @@ func (crd *Coordinator) dispatchRequest(request *netstring.Netstring) error {
xShardRead := false

// check bind throttle
GetBindEvict().lock.Lock()
_, ok := GetBindEvict().BindThrottle[uint32(crd.sqlhash)]
GetBindEvict().lock.Unlock()
bindEvict := GetBindEvict()
bindEvict.lock.Lock()
_, ok := bindEvict.BindThrottle[uint32(crd.sqlhash)]
bindEvict.lock.Unlock()
if ok {
wType := wtypeRW
cfg := GetNumWorkers(crd.shard.shardID)
Expand Down Expand Up @@ -686,7 +687,7 @@ func (crd *Coordinator) dispatchRequest(request *netstring.Netstring) error {
logger.GetLogger().Log(logger.Debug, msg)
}
}
needBlock, throttleEntry := GetBindEvict().ShouldBlock(uint32(crd.sqlhash), bindkv, heavyUsage)
needBlock, throttleEntry := bindEvict.ShouldBlock(uint32(crd.sqlhash), bindkv, heavyUsage)
if needBlock {
msg := fmt.Sprintf("k=%s&v=%s&allowEveryX=%d&allowFrac=%.5f&raddr=%s",
throttleEntry.Name,
Expand Down

0 comments on commit b6b8cd3

Please sign in to comment.