Skip to content
This repository has been archived by the owner on Oct 23, 2024. It is now read-only.

Commit

Permalink
Merge pull request #1 from mesosphere/mh/reauth
Browse files Browse the repository at this point in the history
Fix re-auth hang.
  • Loading branch information
vespian authored Jan 30, 2018
2 parents 1323b51 + b86dade commit 695728a
Show file tree
Hide file tree
Showing 2 changed files with 158 additions and 20 deletions.
113 changes: 93 additions & 20 deletions zk/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,13 @@ type Conn struct {
setWatchLimit int
setWatchCallback func([]*setWatchesRequest)

// Debug (for recurring re-auth hang) test
// These variables shouldn't be used or modified as part of normal
// operation.
// See `TestRecurringReAuthHang`
debugCloseRecvLoop int32
debugReauthDone chan struct{}

logger Logger
logInfo bool // true if information messages are logged; false if only errors are logged

Expand Down Expand Up @@ -189,20 +196,21 @@ func Connect(servers []string, sessionTimeout time.Duration, options ...connOpti

ec := make(chan Event, eventChanSize)
conn := &Conn{
dialer: net.DialTimeout,
hostProvider: &DNSHostProvider{},
conn: nil,
state: StateDisconnected,
eventChan: ec,
shouldQuit: make(chan struct{}),
connectTimeout: 1 * time.Second,
sendChan: make(chan *request, sendChanSize),
requests: make(map[int32]*request),
watchers: make(map[watchPathType][]chan Event),
passwd: emptyPassword,
logger: DefaultLogger,
logInfo: true, // default is true for backwards compatability
buf: make([]byte, bufferSize),
dialer: net.DialTimeout,
hostProvider: &DNSHostProvider{},
conn: nil,
state: StateDisconnected,
eventChan: ec,
shouldQuit: make(chan struct{}),
connectTimeout: 1 * time.Second,
sendChan: make(chan *request, sendChanSize),
requests: make(map[int32]*request),
watchers: make(map[watchPathType][]chan Event),
passwd: emptyPassword,
logger: DefaultLogger,
logInfo: true, // default is true for backwards compatability
buf: make([]byte, bufferSize),
debugReauthDone: make(chan struct{}),
}

// Set provided options.
Expand Down Expand Up @@ -297,13 +305,13 @@ func WithMaxBufferSize(maxBufferSize int) connOption {
}

// WithMaxConnBufferSize sets maximum buffer size used to send and encode
// packets to Zookeeper server. The standard Zookeeper client for java defaults
// to a limit of 1mb. This option should be used for non-standard server setup
// where znode is bigger than default 1mb.
func WithMaxConnBufferSize(maxBufferSize int) connOption {
	return func(c *Conn) {
		// Replace the default send/encode buffer with one of the
		// requested size; applied by Connect when options are run.
		c.buf = make([]byte, maxBufferSize)
	}
}

func (c *Conn) Close() {
Expand All @@ -325,6 +333,21 @@ func (c *Conn) SessionID() int64 {
return atomic.LoadInt64(&c.sessionID)
}

// shouldDebugCloseRecvLoop reports whether the test-only flag is set that
// makes the recv loop exit with a synthetic error.
// See `TestRecurringReAuthHang`.
func (c *Conn) shouldDebugCloseRecvLoop() bool {
	return atomic.LoadInt32(&c.debugCloseRecvLoop) == 1
}

// setDebugCloseRecvLoop sets or clears the test-only flag that forces the
// recv loop to terminate with a synthetic error.
// See `TestRecurringReAuthHang`.
func (c *Conn) setDebugCloseRecvLoop(v bool) {
	// Encode the bool as 0/1 so it can be read atomically from other
	// goroutines without a lock.
	flag := int32(0)
	if v {
		flag = 1
	}
	atomic.StoreInt32(&c.debugCloseRecvLoop, flag)
}

// SetLogger sets the logger to be used for printing errors.
// Logger is an interface provided by this package.
func (c *Conn) SetLogger(l Logger) {
Expand Down Expand Up @@ -389,6 +412,17 @@ func (c *Conn) connect() error {
}

func (c *Conn) resendZkAuth(reauthReadyChan chan struct{}) {
shouldCancel := func() bool {
select {
case <-c.shouldQuit:
return true
case <-c.closeChan:
return true
default:
return false
}
}

c.credsMu.Lock()
defer c.credsMu.Unlock()

Expand All @@ -400,6 +434,10 @@ func (c *Conn) resendZkAuth(reauthReadyChan chan struct{}) {
}

for _, cred := range c.creds {
if shouldCancel() {
c.logger.Printf("Cancel re-submitting credentials")
return
}
resChan, err := c.sendRequest(
opSetAuth,
&setAuthRequest{Type: 0,
Expand All @@ -415,7 +453,16 @@ func (c *Conn) resendZkAuth(reauthReadyChan chan struct{}) {
continue
}

res := <-resChan
var res response
select {
case res = <-resChan:
case <-c.closeChan:
c.logger.Printf("Recv closed, cancel re-submitting credentials, cannot read")
return
case <-c.shouldQuit:
c.logger.Printf("Should quit, cancel re-submitting credentials")
return
}
if res.err != nil {
c.logger.Printf("Credential re-submit failed: %s", res.err)
// FIXME(prozlach): lets ignore errors for now
Expand Down Expand Up @@ -476,6 +523,23 @@ func (c *Conn) loop() {
wg.Add(1)
go func() {
<-reauthChan
// This condition exists for signaling purposes, that the test
// `TestRecurringReAuthHang` was successful. The previous call
// `<-reauthChan` did not block. That means the
// `resendZkAuth` didn't block even on read loop error.
// See `TestRecurringReAuthHang`
if c.shouldDebugCloseRecvLoop() {
// It is possible that during the test the ZK conn will try
// to reconnect multiple times before cleanly closing the
// test. This select here is to prevent closing
// `c.debugReauthDone` channel twice during the test and
// panic.
select {
case <-c.debugReauthDone:
default:
close(c.debugReauthDone)
}
}
err := c.sendLoop()
if err != nil || c.logInfo {
c.logger.Printf("Send loop terminated: err=%v", err)
Expand All @@ -486,7 +550,16 @@ func (c *Conn) loop() {

wg.Add(1)
go func() {
err := c.recvLoop(c.conn)
var err error
// For purposes of testing recurring resendZkAuth we'll
// simulate error on read loop, which should force whole
// IO loop to close.
// See `TestRecurringReAuthHang`
if c.shouldDebugCloseRecvLoop() {
err = errors.New("DEBUG: close recv loop")
} else {
err = c.recvLoop(c.conn)
}
if err != io.EOF || c.logInfo {
c.logger.Printf("Recv loop terminated: err=%v", err)
}
Expand Down
65 changes: 65 additions & 0 deletions zk/conn_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package zk

import (
"context"
"io/ioutil"
"testing"
"time"
)

// TestRecurringReAuthHang reproduces the re-auth hang: after a server
// failover, resendZkAuth could block forever reading a response channel
// when the recv loop had already died. The test forces a recv-loop error
// via the debugCloseRecvLoop flag and waits on debugReauthDone, which is
// only closed if resendZkAuth returned without blocking.
func TestRecurringReAuthHang(t *testing.T) {
	zkC, err := StartTestCluster(3, ioutil.Discard, ioutil.Discard)
	if err != nil {
		t.Fatal(err)
	}
	defer zkC.Stop()

	conn, evtC, err := zkC.ConnectAll()
	if err != nil {
		t.Fatal(err)
	}

	// Poll until the session is established, bounded by a 5s deadline so
	// the test fails fast instead of hanging if the cluster is unhealthy.
	ctx, cancel := context.WithDeadline(
		context.Background(), time.Now().Add(5*time.Second))
	defer cancel()
	for conn.State() != StateHasSession {
		time.Sleep(50 * time.Millisecond)

		select {
		case <-ctx.Done():
			t.Fatal("Failed to connect to ZK")
		default:
		}
	}

	// Drain connection events so the event channel never backs up and
	// stalls the connection's internal loops.
	go func() {
		for range evtC {
		}
	}()

	// Add auth.
	// Injected directly into conn.creds (under its mutex) rather than via
	// AddAuth so the credentials are only re-submitted by resendZkAuth on
	// reconnect — the code path under test.
	conn.credsMu.Lock()
	conn.creds = append(conn.creds, authCreds{"digest", []byte("test:test")})
	conn.credsMu.Unlock()

	// Arm the debug flag BEFORE killing the current server so that the
	// recv loop of the replacement connection fails immediately,
	// exercising resendZkAuth against a dead recv loop.
	currentServer := conn.Server()
	conn.setDebugCloseRecvLoop(true)
	zkC.StopServer(currentServer)

	// wait connect to new zookeeper.
	ctx, cancel = context.WithDeadline(
		context.Background(), time.Now().Add(5*time.Second))
	defer cancel()
	for conn.Server() == currentServer && conn.State() != StateHasSession {
		time.Sleep(100 * time.Millisecond)

		select {
		case <-ctx.Done():
			t.Fatal("Failed to reconnect ZK next server")
		default:
		}
	}

	// Blocks until the IO loop observes that resendZkAuth returned; if the
	// hang regresses, this receive never completes and `go test -timeout`
	// kills the test.
	<-conn.debugReauthDone
	conn.Close()
}

0 comments on commit 695728a

Please sign in to comment.