Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix panic during bind throttle #404

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 8 additions & 11 deletions lib/adaptivequemgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import (
"errors"
"fmt"
"math/rand"
"sync/atomic"
"strings"
"sync/atomic"
"time"

"github.com/paypal/hera/cal"
Expand Down Expand Up @@ -138,11 +138,12 @@ func bindEvictNameOk(bindName string) (bool) {
bind name and values */
func (mgr *adaptiveQueueManager) doBindEviction() (int) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The outer lock on bindevict is being held for too long. It gets unlocked only after inner lock on workerpool is done processing dispatched workers.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Though it is on copy. It is safe reduce scope of the lock

throttleCount := 0
GetBindEvict().lock.Lock()
for _,keyValues := range GetBindEvict().BindThrottle {
bindEvict := GetBindEvict()
bindEvict.lock.Lock()
defer bindEvict.lock.Unlock()
for _,keyValues := range bindEvict.BindThrottle {
throttleCount += len(keyValues)
}
GetBindEvict().lock.Unlock()
if throttleCount > GetConfig().BindEvictionMaxThrottle {
if logger.GetLogger().V(logger.Info) {
logger.GetLogger().Log(logger.Info, "already too many bind throttles, skipping bind eviction and throttle")
Expand All @@ -159,9 +160,7 @@ func (mgr *adaptiveQueueManager) doBindEviction() (int) {
}
usqlhash := uint32(worker.sqlHash)
sqlhash := atomic.LoadUint32(&(usqlhash))
GetBindEvict().lock.Lock()
_, ok := GetBindEvict().BindThrottle[sqlhash]
GetBindEvict().lock.Unlock()
_, ok := bindEvict.BindThrottle[sqlhash]
if ok {
continue // don't repeatedly bind evict something already evicted
}
Expand Down Expand Up @@ -261,13 +260,11 @@ func (mgr *adaptiveQueueManager) doBindEviction() (int) {
}

// setup allow-every-x
GetBindEvict().lock.Lock()
sqlBind, ok := GetBindEvict().BindThrottle[sqlhash]
sqlBind, ok := bindEvict.BindThrottle[sqlhash]
if !ok {
sqlBind = make(map[string]*BindThrottle)
GetBindEvict().BindThrottle[sqlhash] = sqlBind
bindEvict.BindThrottle[sqlhash] = sqlBind
}
GetBindEvict().lock.Unlock()
concatKey := fmt.Sprintf("%s|%s", bindName, bindValue)
throttle, ok := sqlBind[concatKey]
if ok {
Expand Down
54 changes: 29 additions & 25 deletions lib/bindevict.go
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The change looks good.

Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ func GetBindEvict() *BindEvict {
}
return cfg.(*BindEvict)
}
func (this *BindEvict) Copy() *BindEvict {
out := BindEvict{BindThrottle: make(map[uint32]map[string]*BindThrottle)}
for k, v := range this.BindThrottle {
func (bindEvict *BindEvict) Copy() *BindEvict {
out := BindEvict{BindThrottle:make(map[uint32]map[string]*BindThrottle)}
for k,v := range bindEvict.BindThrottle {
out.BindThrottle[k] = v
}
return &out
Expand Down Expand Up @@ -85,41 +85,44 @@ func (entry *BindThrottle) decrAllowEveryX(y int) {
return
}
entry.AllowEveryX = 0
GetBindEvict().lock.Lock()
defer GetBindEvict().lock.Unlock()
}
func (entry *BindThrottle) incrAllowEveryX() {
if logger.GetLogger().V(logger.Warning) {
info := fmt.Sprintf("hash:%d bindName:%s val:%s prev:%d", entry.Sqlhash, entry.Name, entry.Value, entry.AllowEveryX)
logger.GetLogger().Log(logger.Warning, "bind throttle incr", info)
}
entry.AllowEveryX = 3*entry.AllowEveryX + 1
if entry.AllowEveryX > 10000 {
entry.AllowEveryX = 10000
}
}

func (bindEvict *BindEvict) updateThrottle(entry *BindThrottle) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function needs to be used with caution (hold the lock when updating the copy), the new struct has a different lock.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we change decrease logic also to avoid slow recovery?

// delete entry
if len(GetBindEvict().BindThrottle[entry.Sqlhash]) == 1 {
updateCopy := GetBindEvict().Copy()
if len(bindEvict.BindThrottle[entry.Sqlhash]) == 1 {
updateCopy := bindEvict.Copy()
delete(updateCopy.BindThrottle, entry.Sqlhash)
gBindEvict.Store(updateCopy)
} else {
// copy everything except bindKV (skipping it is deleting it)
bindKV := fmt.Sprintf("%s|%s", entry.Name, entry.Value)
updateCopy := make(map[string]*BindThrottle)
for k, v := range GetBindEvict().BindThrottle[entry.Sqlhash] {
updateCopy := bindEvict.Copy()
updateBindThrottleCopy := make(map[string]*BindThrottle)
for k,v := range bindEvict.BindThrottle[entry.Sqlhash] {
if k == bindKV {
continue
}
updateCopy[k] = v
updateBindThrottleCopy[k] = v
}
GetBindEvict().BindThrottle[entry.Sqlhash] = updateCopy
}
}
func (entry *BindThrottle) incrAllowEveryX() {
if logger.GetLogger().V(logger.Warning) {
info := fmt.Sprintf("hash:%d bindName:%s val:%s prev:%d", entry.Sqlhash, entry.Name, entry.Value, entry.AllowEveryX)
logger.GetLogger().Log(logger.Warning, "bind throttle incr", info)
}
entry.AllowEveryX = 3*entry.AllowEveryX + 1
if entry.AllowEveryX > 10000 {
entry.AllowEveryX = 10000
updateCopy.BindThrottle[entry.Sqlhash] = updateBindThrottleCopy
gBindEvict.Store(updateCopy)
}
}

func (be *BindEvict) ShouldBlock(sqlhash uint32, bindKV map[string]string, heavyUsage bool) (bool, *BindThrottle) {
GetBindEvict().lock.Lock()
sqlBinds := GetBindEvict().BindThrottle[sqlhash]
GetBindEvict().lock.Unlock()
func (bindEvict *BindEvict) ShouldBlock(sqlhash uint32, bindKV map[string]string, heavyUsage bool) (bool, *BindThrottle) {
bindEvict.lock.Lock()
defer bindEvict.lock.Unlock()
sqlBinds := bindEvict.BindThrottle[sqlhash]
for k0, v := range bindKV /*parseBinds(request)*/ {
k := NormalizeBindName(k0)
concatKey := fmt.Sprintf("%s|%s", k, v)
Expand All @@ -143,6 +146,7 @@ func (be *BindEvict) ShouldBlock(sqlhash uint32, bindKV map[string]string, heavy
gap := now.Sub(*recent).Seconds() * GetConfig().BindEvictionDecrPerSec
entry.decrAllowEveryX(int(gap))
if entry.AllowEveryX == 0 {
bindEvict.updateThrottle(entry)
return false, nil
}

Expand Down
9 changes: 5 additions & 4 deletions lib/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -652,9 +652,10 @@ func (crd *Coordinator) dispatchRequest(request *netstring.Netstring) error {
xShardRead := false

// check bind throttle
GetBindEvict().lock.Lock()
_, ok := GetBindEvict().BindThrottle[uint32(crd.sqlhash)]
GetBindEvict().lock.Unlock()
bindEvict := GetBindEvict()
bindEvict.lock.Lock()
_, ok := bindEvict.BindThrottle[uint32(crd.sqlhash)]
bindEvict.lock.Unlock()
if ok {
wType := wtypeRW
cfg := GetNumWorkers(crd.shard.shardID)
Expand Down Expand Up @@ -686,7 +687,7 @@ func (crd *Coordinator) dispatchRequest(request *netstring.Netstring) error {
logger.GetLogger().Log(logger.Debug, msg)
}
}
needBlock, throttleEntry := GetBindEvict().ShouldBlock(uint32(crd.sqlhash), bindkv, heavyUsage)
needBlock, throttleEntry := bindEvict.ShouldBlock(uint32(crd.sqlhash), bindkv, heavyUsage)
if needBlock {
msg := fmt.Sprintf("k=%s&v=%s&allowEveryX=%d&allowFrac=%.5f&raddr=%s",
throttleEntry.Name,
Expand Down
Loading