Skip to content
This repository has been archived by the owner on Jul 21, 2021. It is now read-only.

Commit

Permalink
fix: operations hang if attempted after connection closed
Browse files Browse the repository at this point in the history
Operations hang forever if attempted after client connection is closed

Fixes: #125
  • Loading branch information
eahydra committed Feb 23, 2018
1 parent c4fab1a commit 98572ac
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 5 deletions.
22 changes: 17 additions & 5 deletions zk/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,10 +310,14 @@ func WithMaxConnBufferSize(maxBufferSize int) connOption {
}

func (c *Conn) Close() {
rc, err := c.queueRequest(opClose, &closeRequest{}, &closeResponse{}, nil)
if err != nil {
return
}
close(c.shouldQuit)

select {
case <-c.queueRequest(opClose, &closeRequest{}, &closeResponse{}, nil):
case <-rc:
case <-time.After(time.Second):
}
}
Expand Down Expand Up @@ -933,7 +937,7 @@ func (c *Conn) addWatcher(path string, watchType watchType) <-chan Event {
return ch
}

func (c *Conn) queueRequest(opcode int32, req interface{}, res interface{}, recvFunc func(*request, *responseHeader, error)) <-chan response {
func (c *Conn) queueRequest(opcode int32, req interface{}, res interface{}, recvFunc func(*request, *responseHeader, error)) (<-chan response, error) {
rq := &request{
xid: c.nextXid(),
opcode: opcode,
Expand All @@ -942,12 +946,20 @@ func (c *Conn) queueRequest(opcode int32, req interface{}, res interface{}, recv
recvChan: make(chan response, 1),
recvFunc: recvFunc,
}
c.sendChan <- rq
return rq.recvChan
select {
case c.sendChan <- rq:
return rq.recvChan, nil
case <-c.shouldQuit:
return nil, ErrClosing
}
}

func (c *Conn) request(opcode int32, req interface{}, res interface{}, recvFunc func(*request, *responseHeader, error)) (int64, error) {
r := <-c.queueRequest(opcode, req, res, recvFunc)
rc, err := c.queueRequest(opcode, req, res, recvFunc)
if err != nil {
return 0, err
}
r := <-rc
return r.zxid, r.err
}

Expand Down
16 changes: 16 additions & 0 deletions zk/zk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,22 @@ func TestRequestFail(t *testing.T) {
}
}

func TestRequestFailAfterClosed(t *testing.T) {
ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
defer ts.Stop()
zk, _, err := ts.ConnectAll()
if err != nil {
t.Fatalf("Connect returned error: %+v", err)
}
zk.Close()
_, _, err = zk.Get("/blah")
if err != ErrClosing {
t.Fatalf("unexpected err: %+v", err)
}
}
func TestSlowServer(t *testing.T) {
ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
Expand Down

0 comments on commit 98572ac

Please sign in to comment.