Skip to content

Commit

Permalink
Change pipeline interface (#26)
Browse files Browse the repository at this point in the history
* Rename benchmark

* Switch to Plain Memcache

* Switch for proxy

* Fix All Implements
  • Loading branch information
QuangTung97 authored Oct 9, 2023
1 parent ed9330d commit 1080d96
Show file tree
Hide file tree
Showing 16 changed files with 248 additions and 166 deletions.
9 changes: 5 additions & 4 deletions fake/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ package fake

import (
"context"
"sync"

"github.com/QuangTung97/memproxy"
"github.com/QuangTung97/memproxy/mocks"
"sync"
)

// Entry ...
Expand Down Expand Up @@ -54,7 +55,7 @@ func (m *Memcache) Pipeline(_ context.Context, _ ...memproxy.PipelineOption) mem

pipe := &mocks.PipelineMock{}

pipe.LeaseGetFunc = func(key string, options memproxy.LeaseGetOptions) func() (memproxy.LeaseGetResponse, error) {
pipe.LeaseGetFunc = func(key string, options memproxy.LeaseGetOptions) memproxy.LeaseGetResult {
var resp memproxy.LeaseGetResponse

callFn := func() {
Expand Down Expand Up @@ -92,10 +93,10 @@ func (m *Memcache) Pipeline(_ context.Context, _ ...memproxy.PipelineOption) mem

calls = append(calls, callFn)

return func() (memproxy.LeaseGetResponse, error) {
return memproxy.LeaseGetResultFunc(func() (memproxy.LeaseGetResponse, error) {
doCalls()
return resp, nil
}
})
}

pipe.LeaseSetFunc = func(
Expand Down
38 changes: 20 additions & 18 deletions fake/fake_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package fake

import (
"context"
"github.com/QuangTung97/memproxy"
"github.com/stretchr/testify/assert"
"testing"

"github.com/stretchr/testify/assert"

"github.com/QuangTung97/memproxy"
)

func newPipelineTest() memproxy.Pipeline {
Expand All @@ -20,14 +22,14 @@ func TestPipeline(t *testing.T) {
fn1 := pipe.LeaseGet("KEY01", memproxy.LeaseGetOptions{})
fn2 := pipe.LeaseGet("KEY01", memproxy.LeaseGetOptions{})

resp1, err := fn1()
resp1, err := fn1.Result()
assert.Equal(t, nil, err)
assert.Equal(t, memproxy.LeaseGetResponse{
Status: memproxy.LeaseGetStatusLeaseGranted,
CAS: 1,
}, resp1)

resp2, err := fn2()
resp2, err := fn2.Result()
assert.Equal(t, nil, err)
assert.Equal(t, memproxy.LeaseGetResponse{
Status: memproxy.LeaseGetStatusLeaseRejected,
Expand All @@ -43,7 +45,7 @@ func TestPipeline(t *testing.T) {
}, setResp)

fn3 := pipe.LeaseGet("KEY01", memproxy.LeaseGetOptions{})
resp3, err := fn3()
resp3, err := fn3.Result()
assert.Equal(t, nil, err)
assert.Equal(t, memproxy.LeaseGetResponse{
Status: memproxy.LeaseGetStatusFound,
Expand All @@ -59,14 +61,14 @@ func TestPipeline(t *testing.T) {
fn1 := pipe.LeaseGet("KEY01", memproxy.LeaseGetOptions{})
fn2 := pipe.LeaseGet("KEY02", memproxy.LeaseGetOptions{})

resp1, err := fn1()
resp1, err := fn1.Result()
assert.Equal(t, nil, err)
assert.Equal(t, memproxy.LeaseGetResponse{
Status: memproxy.LeaseGetStatusLeaseGranted,
CAS: 1,
}, resp1)

resp2, err := fn2()
resp2, err := fn2.Result()
assert.Equal(t, nil, err)
assert.Equal(t, memproxy.LeaseGetResponse{
Status: memproxy.LeaseGetStatusLeaseGranted,
Expand All @@ -80,7 +82,7 @@ func TestPipeline(t *testing.T) {

fn1 := pipe.LeaseGet("KEY01", memproxy.LeaseGetOptions{})

resp1, err := fn1()
resp1, err := fn1.Result()
assert.Equal(t, nil, err)
assert.Equal(t, memproxy.LeaseGetResponse{
Status: memproxy.LeaseGetStatusLeaseGranted,
Expand All @@ -95,7 +97,7 @@ func TestPipeline(t *testing.T) {
}, setResp)

fn2 := pipe.LeaseGet("KEY01", memproxy.LeaseGetOptions{})
resp2, err := fn2()
resp2, err := fn2.Result()
assert.Equal(t, nil, err)
assert.Equal(t, memproxy.LeaseGetResponse{
Status: memproxy.LeaseGetStatusLeaseRejected,
Expand All @@ -115,7 +117,7 @@ func TestPipeline(t *testing.T) {
}, setResp)

fn2 := pipe.LeaseGet("KEY01", memproxy.LeaseGetOptions{})
resp2, err := fn2()
resp2, err := fn2.Result()
assert.Equal(t, nil, err)
assert.Equal(t, memproxy.LeaseGetResponse{
Status: memproxy.LeaseGetStatusLeaseGranted,
Expand All @@ -128,7 +130,7 @@ func TestPipeline(t *testing.T) {
defer pipe.Finish()

fn1 := pipe.LeaseGet("KEY01", memproxy.LeaseGetOptions{})
resp1, err := fn1()
resp1, err := fn1.Result()
assert.Equal(t, nil, err)
assert.Equal(t, memproxy.LeaseGetResponse{
Status: memproxy.LeaseGetStatusLeaseGranted,
Expand All @@ -140,7 +142,7 @@ func TestPipeline(t *testing.T) {
assert.Equal(t, memproxy.DeleteResponse{}, delResp)

fn2 := pipe.LeaseGet("KEY01", memproxy.LeaseGetOptions{})
resp2, err := fn2()
resp2, err := fn2.Result()
assert.Equal(t, nil, err)
assert.Equal(t, memproxy.LeaseGetResponse{
Status: memproxy.LeaseGetStatusLeaseGranted,
Expand All @@ -153,7 +155,7 @@ func TestPipeline(t *testing.T) {
pipe1 := mc.Pipeline(context.Background())

fn1 := pipe1.LeaseGet("KEY01", memproxy.LeaseGetOptions{})
resp1, err := fn1()
resp1, err := fn1.Result()
assert.Equal(t, nil, err)
assert.Equal(t, memproxy.LeaseGetResponse{
Status: memproxy.LeaseGetStatusLeaseGranted,
Expand All @@ -165,7 +167,7 @@ func TestPipeline(t *testing.T) {

pipe2 := mc.Pipeline(context.Background())
fn2 := pipe2.LeaseGet("KEY01", memproxy.LeaseGetOptions{})
resp2, err := fn2()
resp2, err := fn2.Result()
assert.Equal(t, nil, err)
assert.Equal(t, memproxy.LeaseGetResponse{
Status: memproxy.LeaseGetStatusLeaseGranted,
Expand All @@ -179,7 +181,7 @@ func TestPipeline__Do_Finish(t *testing.T) {
mc := New()
pipe1 := mc.Pipeline(context.Background())

resp1, err := pipe1.LeaseGet("KEY01", memproxy.LeaseGetOptions{})()
resp1, err := pipe1.LeaseGet("KEY01", memproxy.LeaseGetOptions{}).Result()
assert.Equal(t, nil, err)

pipe1.LeaseSet("KEY01", []byte("data 01"), resp1.CAS, memproxy.LeaseSetOptions{})
Expand All @@ -188,7 +190,7 @@ func TestPipeline__Do_Finish(t *testing.T) {

pipe2 := mc.Pipeline(context.Background())

resp2, err := pipe2.LeaseGet("KEY01", memproxy.LeaseGetOptions{})()
resp2, err := pipe2.LeaseGet("KEY01", memproxy.LeaseGetOptions{}).Result()
assert.Equal(t, nil, err)
assert.Equal(t, memproxy.LeaseGetResponse{
Status: memproxy.LeaseGetStatusFound,
Expand All @@ -201,7 +203,7 @@ func TestPipeline__Do_Finish(t *testing.T) {
mc := New()
pipe1 := mc.Pipeline(context.Background())

resp1, err := pipe1.LeaseGet("KEY01", memproxy.LeaseGetOptions{})()
resp1, err := pipe1.LeaseGet("KEY01", memproxy.LeaseGetOptions{}).Result()
assert.Equal(t, nil, err)

pipe1.LeaseSet("KEY01", []byte("data 01"), resp1.CAS, memproxy.LeaseSetOptions{})
Expand All @@ -210,7 +212,7 @@ func TestPipeline__Do_Finish(t *testing.T) {

pipe2 := mc.Pipeline(context.Background())

resp2, err := pipe2.LeaseGet("KEY01", memproxy.LeaseGetOptions{})()
resp2, err := pipe2.LeaseGet("KEY01", memproxy.LeaseGetOptions{}).Result()
assert.Equal(t, nil, err)
assert.Equal(t, memproxy.LeaseGetResponse{
Status: memproxy.LeaseGetStatusFound,
Expand Down
16 changes: 10 additions & 6 deletions item/item.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,9 +264,10 @@ type getState[T Value, K Key] struct {

it *Item[T, K]

retryCount int
keyStr string
leaseGetFunc func() (memproxy.LeaseGetResponse, error)
retryCount int
keyStr string

leaseGetResult memproxy.LeaseGetResult
}

func (s *getState[T, K]) setResponseError(err error) {
Expand Down Expand Up @@ -298,7 +299,10 @@ func (s *getState[T, K]) handleCacheError(err error) {
}

func (s *getState[T, K]) nextFunc() {
leaseGetResp, err := s.leaseGetFunc()
leaseGetResp, err := s.leaseGetResult.Result()

s.leaseGetResult = nil

if err != nil {
s.handleCacheError(err)
return
Expand Down Expand Up @@ -332,7 +336,7 @@ func (s *getState[T, K]) nextFunc() {
s.it.sess.AddDelayedCall(s.it.options.sleepDurations[s.retryCount], func() {
s.retryCount++

s.leaseGetFunc = s.it.pipeline.LeaseGet(s.keyStr, memproxy.LeaseGetOptions{})
s.leaseGetResult = s.it.pipeline.LeaseGet(s.keyStr, memproxy.LeaseGetOptions{})
s.it.sess.AddNextCall(s.nextFunc)
})
return
Expand Down Expand Up @@ -377,7 +381,7 @@ func (i *Item[T, K]) Get(ctx context.Context, key K) func() (T, error) {
}
i.getKeys[key] = getResultType[T]{}

state.leaseGetFunc = i.pipeline.LeaseGet(keyStr, memproxy.LeaseGetOptions{})
state.leaseGetResult = i.pipeline.LeaseGet(keyStr, memproxy.LeaseGetOptions{})

i.sess.AddNextCall(state.nextFunc)

Expand Down
20 changes: 11 additions & 9 deletions item/item_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ import (
"context"
"encoding/json"
"errors"
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/QuangTung97/memproxy"
"github.com/QuangTung97/memproxy/fake"
"github.com/QuangTung97/memproxy/mocks"
"github.com/stretchr/testify/assert"
"testing"
"time"
)

type userValue struct {
Expand Down Expand Up @@ -167,26 +169,26 @@ func (i *itemTest) appendAction(action string) {
func (i *itemTest) stubLeaseGet(resp memproxy.LeaseGetResponse, err error) {
i.pipe.LeaseGetFunc = func(
key string, options memproxy.LeaseGetOptions,
) func() (memproxy.LeaseGetResponse, error) {
) memproxy.LeaseGetResult {
i.appendAction(leaseGetAction(key))
return func() (memproxy.LeaseGetResponse, error) {
return memproxy.LeaseGetResultFunc(func() (memproxy.LeaseGetResponse, error) {
i.appendAction(leaseGetFuncAction(key))
return resp, err
}
})
}
}

func (i *itemTest) stubLeaseGetMulti(respList ...memproxy.LeaseGetResponse) {
i.pipe.LeaseGetFunc = func(
key string, options memproxy.LeaseGetOptions,
) func() (memproxy.LeaseGetResponse, error) {
) memproxy.LeaseGetResult {
i.appendAction(leaseGetAction(key))
index := len(i.pipe.LeaseGetCalls()) - 1

return func() (memproxy.LeaseGetResponse, error) {
return memproxy.LeaseGetResultFunc(func() (memproxy.LeaseGetResponse, error) {
i.appendAction(leaseGetFuncAction(key))
return respList[index], nil
}
})
}
}

Expand Down
28 changes: 27 additions & 1 deletion memproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,35 @@ type Memcache interface {
Close() error
}

// LeaseGetResult is the response of LeaseGet, method Result MUST only be called Once.
// Calling Result more than once is undefined behaviour
type LeaseGetResult interface {
Result() (LeaseGetResponse, error)
}

// LeaseGetErrorResult for error only result
type LeaseGetErrorResult struct {
Error error
}

// Result ...
func (r LeaseGetErrorResult) Result() (LeaseGetResponse, error) {
return LeaseGetResponse{}, r.Error
}

// LeaseGetResultFunc for function implementation of LeaseGetResult
type LeaseGetResultFunc func() (LeaseGetResponse, error)

// Result ...
func (f LeaseGetResultFunc) Result() (LeaseGetResponse, error) {
return f()
}

// Pipeline represents a generic Pipeline
type Pipeline interface {
LeaseGet(key string, options LeaseGetOptions) func() (LeaseGetResponse, error)
// LeaseGet should not be used directly, use the item or mmap package instead
LeaseGet(key string, options LeaseGetOptions) LeaseGetResult

LeaseSet(key string, data []byte, cas uint64, options LeaseSetOptions) func() (LeaseSetResponse, error)
Delete(key string, options DeleteOptions) func() (DeleteResponse, error)

Expand Down
Loading

0 comments on commit 1080d96

Please sign in to comment.