Skip to content

Commit

Permalink
Reduce Heap Allocations (#8)
Browse files Browse the repository at this point in the history
* Reduce Memory Allocation for Item Get

* Reduce Allocs for Proxy
  • Loading branch information
Quang Tùng authored Jun 5, 2023
1 parent a3d9013 commit 08cda07
Show file tree
Hide file tree
Showing 2 changed files with 155 additions and 128 deletions.
192 changes: 98 additions & 94 deletions item/item.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,151 +225,155 @@ type getResultType[T any] struct {
err error
}

func (i *Item[T, K]) handleLeaseGranted(
ctx context.Context, key K,
setError func(err error),
setResponse func(resp T),
keyStr string, cas uint64,
) {
fillFn := i.filler(ctx, key)
i.sess.AddNextCall(func() {
func (s *getState[T, K]) handleLeaseGranted(cas uint64) {
fillFn := s.it.filler(s.ctx, s.key)
s.it.sess.AddNextCall(func() {
fillResp, err := fillFn()

if err == ErrNotFound {
setResponse(fillResp)
i.pipeline.Delete(keyStr, memproxy.DeleteOptions{})
s.setResponse(fillResp)
s.it.pipeline.Delete(s.keyStr, memproxy.DeleteOptions{})
return
}

if err != nil {
setError(err)
s.setResponseError(err)
return
}

data, err := fillResp.Marshal()
if err != nil {
setError(err)
s.setResponseError(err)
return
}
setResponse(fillResp)
s.setResponse(fillResp)

if cas > 0 {
_ = i.pipeline.LeaseSet(keyStr, data, cas, memproxy.LeaseSetOptions{})
i.sess.AddNextCall(i.pipeline.Execute)
_ = s.it.pipeline.LeaseSet(s.keyStr, data, cas, memproxy.LeaseSetOptions{})
s.it.sess.AddNextCall(s.it.pipeline.Execute)
}
})
}

// Get ...
//
//revive:disable-next-line:cognitive-complexity
func (i *Item[T, K]) Get(ctx context.Context, key K) func() (T, error) {
keyStr := key.String()
type getState[T Value, K Key] struct {
ctx context.Context
key K

returnFn := func() (T, error) {
i.sess.Execute()
it *Item[T, K]

result := i.getKeys[key]
return result.resp, result.err
}
retryCount int
keyStr string
leaseGetFunc func() (memproxy.LeaseGetResponse, error)
}

_, existed := i.getKeys[key]
if existed {
return returnFn
func (s *getState[T, K]) setResponseError(err error) {
s.it.options.errorLogger(err)
s.it.getKeys[s.key] = getResultType[T]{
err: err,
}
i.getKeys[key] = getResultType[T]{}
}

retryCount := 0
func (s *getState[T, K]) setResponse(resp T) {
s.it.getKeys[s.key] = getResultType[T]{
resp: resp,
}
}

leaseGetFn := i.pipeline.LeaseGet(keyStr, memproxy.LeaseGetOptions{})
func (s *getState[T, K]) doFillFunc(cas uint64) {
s.it.stats.FillCount++
s.handleLeaseGranted(cas)
}

var nextFn func()
func (s *getState[T, K]) handleCacheError(err error) {
s.it.stats.LeaseGetError++
if s.it.options.fillingOnCacheError {
s.it.options.errorLogger(err)
s.doFillFunc(0)
} else {
s.setResponseError(err)
}
}

setResponseError := func(err error) {
i.options.errorLogger(err)
i.getKeys[key] = getResultType[T]{
err: err,
}
func (s *getState[T, K]) nextFunc() {
leaseGetResp, err := s.leaseGetFunc()
if err != nil {
s.handleCacheError(err)
return
}

setResponse := func(resp T) {
i.getKeys[key] = getResultType[T]{
resp: resp,
if leaseGetResp.Status == memproxy.LeaseGetStatusFound {
s.it.stats.HitCount++
resp, err := s.it.unmarshaler(leaseGetResp.Data)
if err != nil {
s.setResponseError(err)
return
}
s.setResponse(resp)
return
}

nextFn = func() {
leaseGetResp, err := leaseGetFn()
if leaseGetResp.Status == memproxy.LeaseGetStatusLeaseGranted {
s.doFillFunc(leaseGetResp.CAS)
return
}

doFillFunc := func() {
i.stats.FillCount++
i.handleLeaseGranted(
ctx, key,
setResponseError, setResponse,
keyStr, leaseGetResp.CAS,
)
}
if leaseGetResp.Status == memproxy.LeaseGetStatusLeaseRejected {
s.it.increaseRejectedCount(s.retryCount)

handleCacheError := func(err error) {
i.stats.LeaseGetError++
if i.options.fillingOnCacheError {
leaseGetResp = memproxy.LeaseGetResponse{}
i.options.errorLogger(err)
doFillFunc()
} else {
setResponseError(err)
}
}
if s.retryCount < len(s.it.options.sleepDurations) {
s.it.sess.AddDelayedCall(s.it.options.sleepDurations[s.retryCount], func() {
s.retryCount++

if err != nil {
handleCacheError(err)
s.leaseGetFunc = s.it.pipeline.LeaseGet(s.keyStr, memproxy.LeaseGetOptions{})
s.it.sess.AddNextCall(s.nextFunc)
})
return
}

if leaseGetResp.Status == memproxy.LeaseGetStatusFound {
i.stats.HitCount++
resp, err := i.unmarshaler(leaseGetResp.Data)
if err != nil {
setResponseError(err)
return
}
setResponse(resp)
if !s.it.options.errorOnRetryLimit {
s.doFillFunc(leaseGetResp.CAS)
return
}

if leaseGetResp.Status == memproxy.LeaseGetStatusLeaseGranted {
doFillFunc()
return
}
s.setResponseError(ErrExceededRejectRetryLimit)
return
}

if leaseGetResp.Status == memproxy.LeaseGetStatusLeaseRejected {
i.increaseRejectedCount(retryCount)
s.handleCacheError(ErrInvalidLeaseGetStatus)
}

if retryCount < len(i.options.sleepDurations) {
i.sess.AddDelayedCall(i.options.sleepDurations[retryCount], func() {
retryCount++
func (s *getState[T, K]) returnFunc() (T, error) {
s.it.sess.Execute()

leaseGetFn = i.pipeline.LeaseGet(keyStr, memproxy.LeaseGetOptions{})
i.sess.AddNextCall(nextFn)
})
return
}
result := s.it.getKeys[s.key]
return result.resp, result.err
}

if !i.options.errorOnRetryLimit {
doFillFunc()
return
}
// Get a single item with key
func (i *Item[T, K]) Get(ctx context.Context, key K) func() (T, error) {
keyStr := key.String()

setResponseError(ErrExceededRejectRetryLimit)
return
}
state := &getState[T, K]{
ctx: ctx,
key: key,

it: i,

retryCount: 0,
keyStr: keyStr,
}

handleCacheError(ErrInvalidLeaseGetStatus)
_, existed := i.getKeys[key]
if existed {
return state.returnFunc
}
i.getKeys[key] = getResultType[T]{}

state.leaseGetFunc = i.pipeline.LeaseGet(keyStr, memproxy.LeaseGetOptions{})

i.sess.AddNextCall(nextFn)
i.sess.AddNextCall(state.nextFunc)

return returnFn
return state.returnFunc
}

func (i *Item[T, K]) increaseRejectedCount(retryCount int) {
Expand Down
91 changes: 57 additions & 34 deletions proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,54 @@ func (p *Pipeline) setKeyForLeaseSet(
}
}

type leaseGetState struct {
pipe *Pipeline
serverID ServerID
key string
options memproxy.LeaseGetOptions

fn func() (memproxy.LeaseGetResponse, error)

resp memproxy.LeaseGetResponse
err error
}

func (s *leaseGetState) retryOnOtherNode() {
s.pipe.doExecuteForAllServers()
s.resp, s.err = s.fn()
if s.err == nil {
s.pipe.setKeyForLeaseSet(s.key, s.resp, s.serverID)
}
}

func (s *leaseGetState) nextFunc() {
s.pipe.doExecuteForAllServers()
s.resp, s.err = s.fn()

if s.err != nil {
s.pipe.selector.SetFailedServer(s.serverID)
if !s.pipe.selector.HasNextAvailableServer() {
return
}

s.serverID = s.pipe.selector.SelectServer(s.key)

pipe := s.pipe.getRoutePipeline(s.serverID)
s.fn = pipe.LeaseGet(s.key, s.options)

s.pipe.sess.AddNextCall(s.retryOnOtherNode)
return
}

s.pipe.setKeyForLeaseSet(s.key, s.resp, s.serverID)
}

func (s *leaseGetState) returnFunc() (memproxy.LeaseGetResponse, error) {
s.pipe.sess.Execute()
s.pipe.selector.Reset()
return s.resp, s.err
}

// LeaseGet ...
func (p *Pipeline) LeaseGet(
key string, options memproxy.LeaseGetOptions,
Expand All @@ -196,42 +244,17 @@ func (p *Pipeline) LeaseGet(
pipe := p.getRoutePipeline(serverID)
fn := pipe.LeaseGet(key, options)

var resp memproxy.LeaseGetResponse
var err error

p.sess.AddNextCall(func() {
p.doExecuteForAllServers()
resp, err = fn()

if err != nil {
p.selector.SetFailedServer(serverID)
if !p.selector.HasNextAvailableServer() {
return
}

serverID = p.selector.SelectServer(key)

pipe := p.getRoutePipeline(serverID)
fn = pipe.LeaseGet(key, options)

p.sess.AddNextCall(func() {
p.doExecuteForAllServers()
resp, err = fn()
if err == nil {
p.setKeyForLeaseSet(key, resp, serverID)
}
})
return
}
state := &leaseGetState{
pipe: p,
serverID: serverID,
key: key,
options: options,

p.setKeyForLeaseSet(key, resp, serverID)
})

return func() (memproxy.LeaseGetResponse, error) {
p.sess.Execute()
p.selector.Reset()
return resp, err
fn: fn,
}

p.sess.AddNextCall(state.nextFunc)
return state.returnFunc
}

// LeaseSet ...
Expand Down

0 comments on commit 08cda07

Please sign in to comment.