Skip to content

Commit

Permalink
Using new session interface
Browse files Browse the repository at this point in the history
  • Loading branch information
tung.tq committed Oct 9, 2023
1 parent 74273eb commit a5ac722
Show file tree
Hide file tree
Showing 10 changed files with 111 additions and 51 deletions.
4 changes: 2 additions & 2 deletions fake/fake_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,9 +228,9 @@ func TestPipeline__Do_Finish(t *testing.T) {
sess := pipe.LowerSession()

calls := 0
sess.AddNextCall(func() {
sess.AddNextCall(memproxy.NewEmptyCallback(func() {
calls++
})
}))
sess.Execute()

assert.Equal(t, 1, calls)
Expand Down
29 changes: 23 additions & 6 deletions item/item.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"log"
"time"
"unsafe"

"github.com/QuangTung97/go-memcache/memcache"

Expand Down Expand Up @@ -223,14 +224,28 @@ type Item[T Value, K Key] struct {
stats Stats
}

func (i *Item[T, K]) addNextCall(fn func(obj unsafe.Pointer)) {
i.sess.AddNextCall(memproxy.CallbackFunc{
Object: nil,
Func: fn,
})
}

func (i *Item[T, K]) addDelayedCall(d time.Duration, fn func(obj unsafe.Pointer)) {
i.sess.AddDelayedCall(d, memproxy.CallbackFunc{
Object: nil,
Func: fn,
})
}

type getResultType[T any] struct {
resp T
err error
}

func (s *getState[T, K]) handleLeaseGranted(cas uint64) {
fillFn := s.it.filler(s.ctx, s.key)
s.it.sess.AddNextCall(func() {
s.it.addNextCall(func(_ unsafe.Pointer) {
fillResp, err := fillFn()

if err == ErrNotFound {
Expand All @@ -253,7 +268,9 @@ func (s *getState[T, K]) handleLeaseGranted(cas uint64) {

if cas > 0 {
_ = s.it.pipeline.LeaseSet(s.keyStr, data, cas, memproxy.LeaseSetOptions{})
s.it.sess.AddNextCall(s.it.pipeline.Execute)
s.it.addNextCall(func(obj unsafe.Pointer) {
s.it.pipeline.Execute()
})
}
})
}
Expand Down Expand Up @@ -298,7 +315,7 @@ func (s *getState[T, K]) handleCacheError(err error) {
}
}

func (s *getState[T, K]) nextFunc() {
func (s *getState[T, K]) nextFunc(_ unsafe.Pointer) {
leaseGetResp, err := s.leaseGetResult.Result()

s.leaseGetResult = nil
Expand Down Expand Up @@ -333,11 +350,11 @@ func (s *getState[T, K]) nextFunc() {
s.it.increaseRejectedCount(s.retryCount)

if s.retryCount < len(s.it.options.sleepDurations) {
s.it.sess.AddDelayedCall(s.it.options.sleepDurations[s.retryCount], func() {
s.it.addDelayedCall(s.it.options.sleepDurations[s.retryCount], func(_ unsafe.Pointer) {
s.retryCount++

s.leaseGetResult = s.it.pipeline.LeaseGet(s.keyStr, memproxy.LeaseGetOptions{})
s.it.sess.AddNextCall(s.nextFunc)
s.it.addNextCall(s.nextFunc)
})
return
}
Expand Down Expand Up @@ -383,7 +400,7 @@ func (i *Item[T, K]) Get(ctx context.Context, key K) func() (T, error) {

state.leaseGetResult = i.pipeline.LeaseGet(keyStr, memproxy.LeaseGetOptions{})

i.sess.AddNextCall(state.nextFunc)
i.addNextCall(state.nextFunc)

return state.returnFunc
}
Expand Down
8 changes: 4 additions & 4 deletions item/item_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,12 +120,12 @@ func newItemTestWithSleepDurations(
},
}

var calls []func()
var calls []memproxy.CallbackFunc

sess.AddNextCallFunc = func(fn func()) {
sess.AddNextCallFunc = func(fn memproxy.CallbackFunc) {
calls = append(calls, fn)
}
sess.AddDelayedCallFunc = func(d time.Duration, fn func()) {
sess.AddDelayedCallFunc = func(d time.Duration, fn memproxy.CallbackFunc) {
i.delayCalls = append(i.delayCalls, d)
calls = append(calls, fn)
}
Expand All @@ -134,7 +134,7 @@ func newItemTestWithSleepDurations(
nextCalls := calls
calls = nil
for _, fn := range nextCalls {
fn()
fn.Call()
}
}
}
Expand Down
26 changes: 24 additions & 2 deletions memproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package memproxy
import (
"context"
"time"
"unsafe"
)

// Memcache represents a generic Memcache interface
Expand Down Expand Up @@ -62,10 +63,31 @@ type SessionProvider interface {
New() Session
}

// CallbackFunc for session
type CallbackFunc struct {
Object unsafe.Pointer
Func func(obj unsafe.Pointer)
}

// Call ...
func (f CallbackFunc) Call() {
f.Func(f.Object)
}

// NewEmptyCallback creates CallbackFunc from empty args function
func NewEmptyCallback(fn func()) CallbackFunc {
return CallbackFunc{
Object: nil,
Func: func(_ unsafe.Pointer) {
fn()
},
}
}

// Session controlling session values & delayed tasks, this object is NOT Thread Safe
type Session interface {
AddNextCall(fn func())
AddDelayedCall(d time.Duration, fn func())
AddNextCall(fn CallbackFunc)
AddDelayedCall(d time.Duration, fn CallbackFunc)
Execute()

GetLower() Session
Expand Down
5 changes: 3 additions & 2 deletions mhash/mhash.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ import (
"encoding/binary"
"encoding/hex"
"errors"
"math"

"github.com/QuangTung97/memproxy"
"github.com/QuangTung97/memproxy/item"
"math"
)

// ErrHashTooDeep when too many levels to go to
Expand Down Expand Up @@ -196,7 +197,7 @@ func (h *Hash[T, R, K]) Get(ctx context.Context, rootKey R, key K) func() (Null[
Level: callCtx.level,
Hash: computeHashAtLevel(keyHash, callCtx.level),
})
h.sess.AddNextCall(nextCallFn)
h.sess.AddNextCall(memproxy.NewEmptyCallback(nextCallFn))
}

callCtx.doComputeFn = doGetFn
Expand Down
17 changes: 9 additions & 8 deletions mhash/updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package mhash

import (
"context"

"github.com/QuangTung97/memproxy"
"github.com/QuangTung97/memproxy/item"
)
Expand Down Expand Up @@ -285,9 +286,9 @@ func (u *HashUpdater[T, R, K]) UpsertBucket(
if withUpdate {
nextCallFn(true)
} else {
u.sess.AddNextCall(func() {
u.sess.AddNextCall(memproxy.NewEmptyCallback(func() {
nextCallFn(false)
})
}))
}
}

Expand Down Expand Up @@ -367,13 +368,13 @@ func (u *HashUpdater[T, R, K]) UpsertBucket(

callCtx.doComputeFn()

u.lowerSession.AddNextCall(func() {
u.lowerSession.AddNextCall(memproxy.NewEmptyCallback(func() {
callCtx = callContext{}
callCtx.doComputeFn = func() {
doComputeWithUpdate(true)
}
callCtx.doComputeFn()
})
}))

return func() error {
u.execute()
Expand Down Expand Up @@ -404,9 +405,9 @@ func (u *HashUpdater[T, R, K]) DeleteBucket(
if withUpdate {
nextCallFn(true)
} else {
u.sess.AddNextCall(func() {
u.sess.AddNextCall(memproxy.NewEmptyCallback(func() {
nextCallFn(false)
})
}))
}
}

Expand Down Expand Up @@ -448,7 +449,7 @@ func (u *HashUpdater[T, R, K]) DeleteBucket(

callCtx.doComputeFn()

u.lowerSession.AddNextCall(func() {
u.lowerSession.AddNextCall(memproxy.NewEmptyCallback(func() {
// clear state
callCtx = callContext{}
scannedBuckets = scannedBuckets[:0]
Expand All @@ -457,7 +458,7 @@ func (u *HashUpdater[T, R, K]) DeleteBucket(
doComputeWithUpdate(true)
}
callCtx.doComputeFn()
})
}))

return func() error {
u.execute()
Expand Down
28 changes: 14 additions & 14 deletions mocks/memproxy_mocks.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit a5ac722

Please sign in to comment.