Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change pipeline interface #26

Merged
merged 4 commits into from
Oct 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions fake/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ package fake

import (
"context"
"sync"

"github.com/QuangTung97/memproxy"
"github.com/QuangTung97/memproxy/mocks"
"sync"
)

// Entry ...
Expand Down Expand Up @@ -54,7 +55,7 @@ func (m *Memcache) Pipeline(_ context.Context, _ ...memproxy.PipelineOption) mem

pipe := &mocks.PipelineMock{}

pipe.LeaseGetFunc = func(key string, options memproxy.LeaseGetOptions) func() (memproxy.LeaseGetResponse, error) {
pipe.LeaseGetFunc = func(key string, options memproxy.LeaseGetOptions) memproxy.LeaseGetResult {
var resp memproxy.LeaseGetResponse

callFn := func() {
Expand Down Expand Up @@ -92,10 +93,10 @@ func (m *Memcache) Pipeline(_ context.Context, _ ...memproxy.PipelineOption) mem

calls = append(calls, callFn)

return func() (memproxy.LeaseGetResponse, error) {
return memproxy.LeaseGetResultFunc(func() (memproxy.LeaseGetResponse, error) {
doCalls()
return resp, nil
}
})
}

pipe.LeaseSetFunc = func(
Expand Down
38 changes: 20 additions & 18 deletions fake/fake_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package fake

import (
"context"
"github.com/QuangTung97/memproxy"
"github.com/stretchr/testify/assert"
"testing"

"github.com/stretchr/testify/assert"

"github.com/QuangTung97/memproxy"
)

func newPipelineTest() memproxy.Pipeline {
Expand All @@ -20,14 +22,14 @@ func TestPipeline(t *testing.T) {
fn1 := pipe.LeaseGet("KEY01", memproxy.LeaseGetOptions{})
fn2 := pipe.LeaseGet("KEY01", memproxy.LeaseGetOptions{})

resp1, err := fn1()
resp1, err := fn1.Result()
assert.Equal(t, nil, err)
assert.Equal(t, memproxy.LeaseGetResponse{
Status: memproxy.LeaseGetStatusLeaseGranted,
CAS: 1,
}, resp1)

resp2, err := fn2()
resp2, err := fn2.Result()
assert.Equal(t, nil, err)
assert.Equal(t, memproxy.LeaseGetResponse{
Status: memproxy.LeaseGetStatusLeaseRejected,
Expand All @@ -43,7 +45,7 @@ func TestPipeline(t *testing.T) {
}, setResp)

fn3 := pipe.LeaseGet("KEY01", memproxy.LeaseGetOptions{})
resp3, err := fn3()
resp3, err := fn3.Result()
assert.Equal(t, nil, err)
assert.Equal(t, memproxy.LeaseGetResponse{
Status: memproxy.LeaseGetStatusFound,
Expand All @@ -59,14 +61,14 @@ func TestPipeline(t *testing.T) {
fn1 := pipe.LeaseGet("KEY01", memproxy.LeaseGetOptions{})
fn2 := pipe.LeaseGet("KEY02", memproxy.LeaseGetOptions{})

resp1, err := fn1()
resp1, err := fn1.Result()
assert.Equal(t, nil, err)
assert.Equal(t, memproxy.LeaseGetResponse{
Status: memproxy.LeaseGetStatusLeaseGranted,
CAS: 1,
}, resp1)

resp2, err := fn2()
resp2, err := fn2.Result()
assert.Equal(t, nil, err)
assert.Equal(t, memproxy.LeaseGetResponse{
Status: memproxy.LeaseGetStatusLeaseGranted,
Expand All @@ -80,7 +82,7 @@ func TestPipeline(t *testing.T) {

fn1 := pipe.LeaseGet("KEY01", memproxy.LeaseGetOptions{})

resp1, err := fn1()
resp1, err := fn1.Result()
assert.Equal(t, nil, err)
assert.Equal(t, memproxy.LeaseGetResponse{
Status: memproxy.LeaseGetStatusLeaseGranted,
Expand All @@ -95,7 +97,7 @@ func TestPipeline(t *testing.T) {
}, setResp)

fn2 := pipe.LeaseGet("KEY01", memproxy.LeaseGetOptions{})
resp2, err := fn2()
resp2, err := fn2.Result()
assert.Equal(t, nil, err)
assert.Equal(t, memproxy.LeaseGetResponse{
Status: memproxy.LeaseGetStatusLeaseRejected,
Expand All @@ -115,7 +117,7 @@ func TestPipeline(t *testing.T) {
}, setResp)

fn2 := pipe.LeaseGet("KEY01", memproxy.LeaseGetOptions{})
resp2, err := fn2()
resp2, err := fn2.Result()
assert.Equal(t, nil, err)
assert.Equal(t, memproxy.LeaseGetResponse{
Status: memproxy.LeaseGetStatusLeaseGranted,
Expand All @@ -128,7 +130,7 @@ func TestPipeline(t *testing.T) {
defer pipe.Finish()

fn1 := pipe.LeaseGet("KEY01", memproxy.LeaseGetOptions{})
resp1, err := fn1()
resp1, err := fn1.Result()
assert.Equal(t, nil, err)
assert.Equal(t, memproxy.LeaseGetResponse{
Status: memproxy.LeaseGetStatusLeaseGranted,
Expand All @@ -140,7 +142,7 @@ func TestPipeline(t *testing.T) {
assert.Equal(t, memproxy.DeleteResponse{}, delResp)

fn2 := pipe.LeaseGet("KEY01", memproxy.LeaseGetOptions{})
resp2, err := fn2()
resp2, err := fn2.Result()
assert.Equal(t, nil, err)
assert.Equal(t, memproxy.LeaseGetResponse{
Status: memproxy.LeaseGetStatusLeaseGranted,
Expand All @@ -153,7 +155,7 @@ func TestPipeline(t *testing.T) {
pipe1 := mc.Pipeline(context.Background())

fn1 := pipe1.LeaseGet("KEY01", memproxy.LeaseGetOptions{})
resp1, err := fn1()
resp1, err := fn1.Result()
assert.Equal(t, nil, err)
assert.Equal(t, memproxy.LeaseGetResponse{
Status: memproxy.LeaseGetStatusLeaseGranted,
Expand All @@ -165,7 +167,7 @@ func TestPipeline(t *testing.T) {

pipe2 := mc.Pipeline(context.Background())
fn2 := pipe2.LeaseGet("KEY01", memproxy.LeaseGetOptions{})
resp2, err := fn2()
resp2, err := fn2.Result()
assert.Equal(t, nil, err)
assert.Equal(t, memproxy.LeaseGetResponse{
Status: memproxy.LeaseGetStatusLeaseGranted,
Expand All @@ -179,7 +181,7 @@ func TestPipeline__Do_Finish(t *testing.T) {
mc := New()
pipe1 := mc.Pipeline(context.Background())

resp1, err := pipe1.LeaseGet("KEY01", memproxy.LeaseGetOptions{})()
resp1, err := pipe1.LeaseGet("KEY01", memproxy.LeaseGetOptions{}).Result()
assert.Equal(t, nil, err)

pipe1.LeaseSet("KEY01", []byte("data 01"), resp1.CAS, memproxy.LeaseSetOptions{})
Expand All @@ -188,7 +190,7 @@ func TestPipeline__Do_Finish(t *testing.T) {

pipe2 := mc.Pipeline(context.Background())

resp2, err := pipe2.LeaseGet("KEY01", memproxy.LeaseGetOptions{})()
resp2, err := pipe2.LeaseGet("KEY01", memproxy.LeaseGetOptions{}).Result()
assert.Equal(t, nil, err)
assert.Equal(t, memproxy.LeaseGetResponse{
Status: memproxy.LeaseGetStatusFound,
Expand All @@ -201,7 +203,7 @@ func TestPipeline__Do_Finish(t *testing.T) {
mc := New()
pipe1 := mc.Pipeline(context.Background())

resp1, err := pipe1.LeaseGet("KEY01", memproxy.LeaseGetOptions{})()
resp1, err := pipe1.LeaseGet("KEY01", memproxy.LeaseGetOptions{}).Result()
assert.Equal(t, nil, err)

pipe1.LeaseSet("KEY01", []byte("data 01"), resp1.CAS, memproxy.LeaseSetOptions{})
Expand All @@ -210,7 +212,7 @@ func TestPipeline__Do_Finish(t *testing.T) {

pipe2 := mc.Pipeline(context.Background())

resp2, err := pipe2.LeaseGet("KEY01", memproxy.LeaseGetOptions{})()
resp2, err := pipe2.LeaseGet("KEY01", memproxy.LeaseGetOptions{}).Result()
assert.Equal(t, nil, err)
assert.Equal(t, memproxy.LeaseGetResponse{
Status: memproxy.LeaseGetStatusFound,
Expand Down
16 changes: 10 additions & 6 deletions item/item.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,9 +264,10 @@ type getState[T Value, K Key] struct {

it *Item[T, K]

retryCount int
keyStr string
leaseGetFunc func() (memproxy.LeaseGetResponse, error)
retryCount int
keyStr string

leaseGetResult memproxy.LeaseGetResult
}

func (s *getState[T, K]) setResponseError(err error) {
Expand Down Expand Up @@ -298,7 +299,10 @@ func (s *getState[T, K]) handleCacheError(err error) {
}

func (s *getState[T, K]) nextFunc() {
leaseGetResp, err := s.leaseGetFunc()
leaseGetResp, err := s.leaseGetResult.Result()

s.leaseGetResult = nil

if err != nil {
s.handleCacheError(err)
return
Expand Down Expand Up @@ -332,7 +336,7 @@ func (s *getState[T, K]) nextFunc() {
s.it.sess.AddDelayedCall(s.it.options.sleepDurations[s.retryCount], func() {
s.retryCount++

s.leaseGetFunc = s.it.pipeline.LeaseGet(s.keyStr, memproxy.LeaseGetOptions{})
s.leaseGetResult = s.it.pipeline.LeaseGet(s.keyStr, memproxy.LeaseGetOptions{})
s.it.sess.AddNextCall(s.nextFunc)
})
return
Expand Down Expand Up @@ -377,7 +381,7 @@ func (i *Item[T, K]) Get(ctx context.Context, key K) func() (T, error) {
}
i.getKeys[key] = getResultType[T]{}

state.leaseGetFunc = i.pipeline.LeaseGet(keyStr, memproxy.LeaseGetOptions{})
state.leaseGetResult = i.pipeline.LeaseGet(keyStr, memproxy.LeaseGetOptions{})

i.sess.AddNextCall(state.nextFunc)

Expand Down
20 changes: 11 additions & 9 deletions item/item_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ import (
"context"
"encoding/json"
"errors"
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/QuangTung97/memproxy"
"github.com/QuangTung97/memproxy/fake"
"github.com/QuangTung97/memproxy/mocks"
"github.com/stretchr/testify/assert"
"testing"
"time"
)

type userValue struct {
Expand Down Expand Up @@ -167,26 +169,26 @@ func (i *itemTest) appendAction(action string) {
func (i *itemTest) stubLeaseGet(resp memproxy.LeaseGetResponse, err error) {
i.pipe.LeaseGetFunc = func(
key string, options memproxy.LeaseGetOptions,
) func() (memproxy.LeaseGetResponse, error) {
) memproxy.LeaseGetResult {
i.appendAction(leaseGetAction(key))
return func() (memproxy.LeaseGetResponse, error) {
return memproxy.LeaseGetResultFunc(func() (memproxy.LeaseGetResponse, error) {
i.appendAction(leaseGetFuncAction(key))
return resp, err
}
})
}
}

func (i *itemTest) stubLeaseGetMulti(respList ...memproxy.LeaseGetResponse) {
i.pipe.LeaseGetFunc = func(
key string, options memproxy.LeaseGetOptions,
) func() (memproxy.LeaseGetResponse, error) {
) memproxy.LeaseGetResult {
i.appendAction(leaseGetAction(key))
index := len(i.pipe.LeaseGetCalls()) - 1

return func() (memproxy.LeaseGetResponse, error) {
return memproxy.LeaseGetResultFunc(func() (memproxy.LeaseGetResponse, error) {
i.appendAction(leaseGetFuncAction(key))
return respList[index], nil
}
})
}
}

Expand Down
28 changes: 27 additions & 1 deletion memproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,35 @@ type Memcache interface {
Close() error
}

// LeaseGetResult is the response of LeaseGet, method Result MUST only be called Once.
// Calling Result more than once is undefined behaviour
type LeaseGetResult interface {
Result() (LeaseGetResponse, error)
}

// LeaseGetErrorResult for error only result
type LeaseGetErrorResult struct {
Error error
}

// Result ...
func (r LeaseGetErrorResult) Result() (LeaseGetResponse, error) {
return LeaseGetResponse{}, r.Error
}

// LeaseGetResultFunc for function implementation of LeaseGetResult
type LeaseGetResultFunc func() (LeaseGetResponse, error)

// Result ...
func (f LeaseGetResultFunc) Result() (LeaseGetResponse, error) {
return f()
}

// Pipeline represents a generic Pipeline
type Pipeline interface {
LeaseGet(key string, options LeaseGetOptions) func() (LeaseGetResponse, error)
// LeaseGet should not be used directly, use the item or mmap package instead
LeaseGet(key string, options LeaseGetOptions) LeaseGetResult

LeaseSet(key string, data []byte, cas uint64, options LeaseSetOptions) func() (LeaseSetResponse, error)
Delete(key string, options DeleteOptions) func() (DeleteResponse, error)

Expand Down
Loading
Loading