Skip to content
This repository has been archived by the owner on Oct 23, 2024. It is now read-only.

Commit

Permalink
Merge pull request #3 from mesosphere/jdef/zkclose-1
Browse files Browse the repository at this point in the history
idempotent Close; queueRequest guards against deadlocks on closed connections
  • Loading branch information
jdef authored Feb 13, 2018
2 parents 695728a + 0858988 commit 449c45a
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 7 deletions.
37 changes: 30 additions & 7 deletions zk/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ type Conn struct {
eventChan chan Event
eventCallback EventCallback // may be nil
shouldQuit chan struct{}
shouldQuitOnce sync.Once
pingInterval time.Duration
recvTimeout time.Duration
connectTimeout time.Duration
Expand Down Expand Up @@ -315,12 +316,14 @@ func WithMaxConnBufferSize(maxBufferSize int) connOption {
}

func (c *Conn) Close() {
close(c.shouldQuit)
c.shouldQuitOnce.Do(func() {
close(c.shouldQuit)

select {
case <-c.queueRequest(opClose, &closeRequest{}, &closeResponse{}, nil):
case <-time.After(time.Second):
}
select {
case <-c.queueRequest(opClose, &closeRequest{}, &closeResponse{}, nil):
case <-time.After(time.Second):
}
})
}

// State returns the current state of the connection.
Expand Down Expand Up @@ -977,10 +980,30 @@ func (c *Conn) queueRequest(opcode int32, req interface{}, res interface{}, recv
opcode: opcode,
pkt: req,
recvStruct: res,
recvChan: make(chan response, 1),
recvChan: make(chan response, 2),
recvFunc: recvFunc,
}
c.sendChan <- rq

switch opcode {
case opClose:
// always attempt to send close ops.
c.sendChan <- rq
default:
// otherwise avoid deadlocks for dumb clients who aren't aware that
// the ZK connection is closed yet.
select {
case <-c.shouldQuit:
rq.recvChan <- response{-1, ErrConnectionClosed}
case c.sendChan <- rq:
// check for a tie
select {
case <-c.shouldQuit:
// maybe the caller gets this, maybe not- we tried.
rq.recvChan <- response{-1, ErrConnectionClosed}
default:
}
}
}
return rq.recvChan
}

Expand Down
42 changes: 42 additions & 0 deletions zk/zk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,37 @@ func TestCreate(t *testing.T) {
}
}

func TestOpsAfterCloseDontDeadlock(t *testing.T) {
ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
defer ts.Stop()
zk, _, err := ts.ConnectAll()
if err != nil {
t.Fatalf("Connect returned error: %+v", err)
}
zk.Close()

path := "/gozk-test"

ch := make(chan struct{})
go func() {
defer close(ch)
for range make([]struct{}, 30) {
if _, err := zk.Create(path, []byte{1, 2, 3, 4}, 0, WorldACL(PermAll)); err == nil {
t.Fatal("Create did not return error")
}
}
}()
select {
case <-ch:
// expected
case <-time.After(10 * time.Second):
t.Fatal("ZK connection deadlocked when executing ops after a Close operation")
}
}

func TestMulti(t *testing.T) {
ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
Expand Down Expand Up @@ -139,6 +170,7 @@ func TestIfAuthdataSurvivesReconnect(t *testing.T) {
if err != nil {
t.Fatal(err)
}
defer ts.Stop()

zk, _, err := ts.ConnectAll()
if err != nil {
Expand Down Expand Up @@ -666,6 +698,16 @@ func TestRequestFail(t *testing.T) {
}
}

func TestIdempotentClose(t *testing.T) {
zk, _, err := Connect([]string{"127.0.0.1:32444"}, time.Second*15)
if err != nil {
t.Fatal(err)
}
// multiple calls to Close() should not panic
zk.Close()
zk.Close()
}

func TestSlowServer(t *testing.T) {
ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
Expand Down

0 comments on commit 449c45a

Please sign in to comment.