From 20f04b15860a955fda42c086fa491543fe8d3592 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Quang=20T=C3=B9ng?= Date: Wed, 11 Oct 2023 10:44:09 +0700 Subject: [PATCH] Add Get State Common (#33) --- item/item.go | 189 ++++++++++++++++++++++++++++++---------------- item/item_test.go | 5 ++ item/pool.go | 20 +++++ item/pool_test.go | 1 + 4 files changed, 152 insertions(+), 63 deletions(-) create mode 100644 item/pool.go create mode 100644 item/pool_test.go diff --git a/item/item.go b/item/item.go index d928b4b..40e30a3 100644 --- a/item/item.go +++ b/item/item.go @@ -199,9 +199,11 @@ func New[T Value, K Key]( options ...Option, ) *Item[T, K] { return &Item[T, K]{ - options: computeOptions(options), - sess: pipeline.LowerSession(), - pipeline: pipeline, + common: itemCommon{ + options: computeOptions(options), + sess: pipeline.LowerSession(), + pipeline: pipeline, + }, unmarshaler: unmarshaler, filler: filler, @@ -213,25 +215,31 @@ func New[T Value, K Key]( // Item is NOT thread safe and, it contains a cached keys // once a key is cached in memory, it will return the same value unless call **Reset** type Item[T Value, K Key] struct { - options *itemOptions - sess memproxy.Session - pipeline memproxy.Pipeline unmarshaler Unmarshaler[T] filler Filler[T, K] getKeys map[K]*getResultType[T] - stats Stats + common itemCommon } -func (i *Item[T, K]) addNextCall(fn func(obj unsafe.Pointer)) { +type itemCommon struct { + options *itemOptions + sess memproxy.Session + pipeline memproxy.Pipeline + stats Stats + + unmarshalAndSet func(data []byte) +} + +func (i *itemCommon) addNextCall(fn func(obj unsafe.Pointer)) { i.sess.AddNextCall(memproxy.CallbackFunc{ Object: nil, Func: fn, }) } -func (i *Item[T, K]) addDelayedCall(d time.Duration, fn func(obj unsafe.Pointer)) { +func (i *itemCommon) addDelayedCall(d time.Duration, fn func(obj unsafe.Pointer)) { i.sess.AddDelayedCall(d, memproxy.CallbackFunc{ Object: nil, Func: fn, @@ -244,13 +252,18 @@ type getResultType[T any] struct { } func (s *GetState[T, K]) handleLeaseGranted(cas uint64) { - fillFn := s.it.filler(s.ctx, s.key) - s.it.addNextCall(func(_ unsafe.Pointer) { + it := s.getItem() + + fillFn := it.filler(s.common.ctx, s.key) + + it.common.addNextCall(func(_ unsafe.Pointer) { + it := s.common.item + fillResp, err := fillFn() if err == ErrNotFound { s.setResponse(fillResp) - s.it.pipeline.Delete(s.keyStr, memproxy.DeleteOptions{}) + it.pipeline.Delete(s.common.keyStr, memproxy.DeleteOptions{}) return } @@ -267,54 +280,98 @@ func (s *GetState[T, K]) handleLeaseGranted(cas uint64) { s.setResponse(fillResp) if cas > 0 { - _ = s.it.pipeline.LeaseSet(s.keyStr, data, cas, memproxy.LeaseSetOptions{}) - s.it.addNextCall(func(obj unsafe.Pointer) { - s.it.pipeline.Execute() + _ = it.pipeline.LeaseSet(s.common.keyStr, data, cas, memproxy.LeaseSetOptions{}) + it.addNextCall(func(obj unsafe.Pointer) { + s.common.item.pipeline.Execute() }) } }) } -// GetState store intermediate state when getting item -type GetState[T Value, K Key] struct { +type getStateMethods interface { + setResponseError(err error) + doFillFunc(cas uint64) + unmarshalAndSet(data []byte) +} + +type getStateCommon struct { ctx context.Context - key K - it *Item[T, K] + itemRoot unsafe.Pointer + item *itemCommon retryCount int keyStr string leaseGetResult memproxy.LeaseGetResult + methods getStateMethods +} + +// GetState store intermediate state when getting item +type GetState[T Value, K Key] struct { + common *getStateCommon + key K result getResultType[T] } +func (s *GetState[T, K]) getItem() *Item[T, K] { + return (*Item[T, K])(s.common.itemRoot) +} + +func (s *GetState[T, K]) unmarshalAndSet(data []byte) { + it := s.getItem() + resp, err := it.unmarshaler(data) + + memcache.ReleaseGetResponseData(data) + + if err != nil { + s.setResponseError(err) + return + } + s.setResponse(resp) +} + func (s *GetState[T, K]) setResponseError(err error) { - s.it.options.errorLogger(err) - s.it.getKeys[s.key].err = err + it := s.getItem() + + s.common.item.options.errorLogger(err) + it.getKeys[s.key].err = err } func (s *GetState[T, K]) setResponse(resp T) { - s.it.getKeys[s.key].resp = resp + it := s.getItem() + it.getKeys[s.key].resp = resp } func (s *GetState[T, K]) doFillFunc(cas uint64) { - s.it.stats.FillCount++ + s.common.item.stats.FillCount++ s.handleLeaseGranted(cas) } -func (s *GetState[T, K]) handleCacheError(err error) { - s.it.stats.LeaseGetError++ - if s.it.options.fillingOnCacheError { - s.it.options.errorLogger(err) - s.doFillFunc(0) +func (s *getStateCommon) handleCacheError(err error) { + s.item.stats.LeaseGetError++ + if s.item.options.fillingOnCacheError { + s.item.options.errorLogger(err) + s.methods.doFillFunc(0) } else { - s.setResponseError(err) + s.methods.setResponseError(err) } } -func (s *GetState[T, K]) nextFunc(_ unsafe.Pointer) { +func (s *getStateCommon) newNextCallback() memproxy.CallbackFunc { + return memproxy.CallbackFunc{ + Object: unsafe.Pointer(s), + Func: stateCommonNextCallback, + } +} + +func stateCommonNextCallback(obj unsafe.Pointer) { + s := (*getStateCommon)(obj) + s.nextFunc() +} + +func (s *getStateCommon) nextFunc() { leaseGetResp, err := s.leaseGetResult.Result() s.leaseGetResult = nil @@ -324,46 +381,40 @@ func (s *GetState[T, K]) nextFunc(_ unsafe.Pointer) { return } - if leaseGetResp.Status == memproxy.LeaseGetStatusFound { - s.it.stats.HitCount++ - s.it.stats.TotalBytesRecv += uint64(len(leaseGetResp.Data)) - - resp, err := s.it.unmarshaler(leaseGetResp.Data) + it := s.item - memcache.ReleaseGetResponseData(leaseGetResp.Data) + if leaseGetResp.Status == memproxy.LeaseGetStatusFound { + it.stats.HitCount++ + it.stats.TotalBytesRecv += uint64(len(leaseGetResp.Data)) - if err != nil { - s.setResponseError(err) - return - } - s.setResponse(resp) + s.methods.unmarshalAndSet(leaseGetResp.Data) return } if leaseGetResp.Status == memproxy.LeaseGetStatusLeaseGranted { - s.doFillFunc(leaseGetResp.CAS) + s.methods.doFillFunc(leaseGetResp.CAS) return } if leaseGetResp.Status == memproxy.LeaseGetStatusLeaseRejected { - s.it.increaseRejectedCount(s.retryCount) + it.increaseRejectedCount(s.retryCount) - if s.retryCount < len(s.it.options.sleepDurations) { - s.it.addDelayedCall(s.it.options.sleepDurations[s.retryCount], func(_ unsafe.Pointer) { + if s.retryCount < len(it.options.sleepDurations) { + it.addDelayedCall(it.options.sleepDurations[s.retryCount], func(_ unsafe.Pointer) { s.retryCount++ - s.leaseGetResult = s.it.pipeline.LeaseGet(s.keyStr, memproxy.LeaseGetOptions{}) - s.it.addNextCall(s.nextFunc) + s.leaseGetResult = it.pipeline.LeaseGet(s.keyStr, memproxy.LeaseGetOptions{}) + it.sess.AddNextCall(s.newNextCallback()) }) return } - if !s.it.options.errorOnRetryLimit { - s.doFillFunc(leaseGetResp.CAS) + if !it.options.errorOnRetryLimit { + s.methods.doFillFunc(leaseGetResp.CAS) return } - s.setResponseError(ErrExceededRejectRetryLimit) + s.methods.setResponseError(ErrExceededRejectRetryLimit) return } @@ -372,9 +423,13 @@ func (s *GetState[T, K]) nextFunc(_ unsafe.Pointer) { // Result returns result func (s *GetState[T, K]) Result() (T, error) { - s.it.sess.Execute() + it := s.getItem() + it.common.sess.Execute() + + putGetStateCommon(s.common) + s.common = nil - result := s.it.getKeys[s.key] + result := it.getKeys[s.key] return result.resp, result.err } @@ -387,30 +442,38 @@ func (i *Item[T, K]) Get(ctx context.Context, key K) func() (T, error) { func (i *Item[T, K]) GetFast(ctx context.Context, key K) *GetState[T, K] { keyStr := key.String() - state := &GetState[T, K]{ - ctx: ctx, - key: key, + // init get state common + sc := newGetStateCommon() + + sc.ctx = ctx + + sc.itemRoot = unsafe.Pointer(i) + sc.item = &i.common - it: i, + sc.keyStr = keyStr + // end init get state common - retryCount: 0, - keyStr: keyStr, + state := &GetState[T, K]{ + common: sc, + key: key, } + sc.methods = state + _, existed := i.getKeys[key] if existed { return state } i.getKeys[key] = &state.result - state.leaseGetResult = i.pipeline.LeaseGet(keyStr, memproxy.LeaseGetOptions{}) + sc.leaseGetResult = i.common.pipeline.LeaseGet(keyStr, memproxy.LeaseGetOptions{}) - i.addNextCall(state.nextFunc) + i.common.sess.AddNextCall(sc.newNextCallback()) return state } -func (i *Item[T, K]) increaseRejectedCount(retryCount int) { +func (i *itemCommon) increaseRejectedCount(retryCount int) { i.stats.TotalRejectedCount++ switch retryCount { @@ -425,7 +488,7 @@ func (i *Item[T, K]) increaseRejectedCount(retryCount int) { // LowerSession ... func (i *Item[T, K]) LowerSession() memproxy.Session { - return i.sess.GetLower() + return i.common.sess.GetLower() } // Reset clear in-memory cached values @@ -450,5 +513,5 @@ type Stats struct { // GetStats ... func (i *Item[T, K]) GetStats() Stats { - return i.stats + return i.common.stats } diff --git a/item/item_test.go b/item/item_test.go index 2e3dfd7..74589dc 100644 --- a/item/item_test.go +++ b/item/item_test.go @@ -6,6 +6,7 @@ import ( "errors" "testing" "time" + "unsafe" "github.com/stretchr/testify/assert" @@ -1203,3 +1204,7 @@ func TestItem_WithFakePipeline(t *testing.T) { assert.Equal(t, 1, fillCalls) } + +func TestSizeOfStateCommon(t *testing.T) { + assert.Equal(t, uintptr(88), unsafe.Sizeof(getStateCommon{})) +} diff --git a/item/pool.go b/item/pool.go new file mode 100644 index 0000000..8822110 --- /dev/null +++ b/item/pool.go @@ -0,0 +1,20 @@ +package item + +import ( + "sync" +) + +var getStateCommonPool = sync.Pool{ + New: func() any { + return &getStateCommon{} + }, +} + +func newGetStateCommon() *getStateCommon { + return getStateCommonPool.Get().(*getStateCommon) +} + +func putGetStateCommon(s *getStateCommon) { + *s = getStateCommon{} + getStateCommonPool.Put(s) +} diff --git a/item/pool_test.go b/item/pool_test.go new file mode 100644 index 0000000..eb11d32 --- /dev/null +++ b/item/pool_test.go @@ -0,0 +1 @@ +package item