From cf2f94e06a5adfbbcdfaade2a10f643afa653670 Mon Sep 17 00:00:00 2001 From: ken Date: Tue, 9 Jul 2019 14:16:43 +0800 Subject: [PATCH] [feature]: new option WithPingCountWithinSessionTimeout --- zk/conn.go | 67 +++++++++++++++++++++++++++++++----------------------- 1 file changed, 39 insertions(+), 28 deletions(-) diff --git a/zk/conn.go b/zk/conn.go index f79a51b3..0e908a28 100644 --- a/zk/conn.go +++ b/zk/conn.go @@ -74,18 +74,19 @@ type Conn struct { sessionTimeoutMs int32 // session timeout in milliseconds passwd []byte - dialer Dialer - hostProvider HostProvider - serverMu sync.Mutex // protects server - server string // remember the address/port of the current server - conn net.Conn - eventChan chan Event - eventCallback EventCallback // may be nil - shouldQuit chan struct{} - pingInterval time.Duration - recvTimeout time.Duration - connectTimeout time.Duration - maxBufferSize int + dialer Dialer + hostProvider HostProvider + serverMu sync.Mutex // protects server + server string // remember the address/port of the current server + conn net.Conn + eventChan chan Event + eventCallback EventCallback // may be nil + shouldQuit chan struct{} + pingInterval time.Duration + pingCountSession uint32 + recvTimeout time.Duration + connectTimeout time.Duration + maxBufferSize int creds []authCreds credsMu sync.Mutex // protects server @@ -192,20 +193,21 @@ func Connect(servers []string, sessionTimeout time.Duration, options ...connOpti ec := make(chan Event, eventChanSize) conn := &Conn{ - dialer: net.DialTimeout, - hostProvider: &DNSHostProvider{}, - conn: nil, - state: StateDisconnected, - eventChan: ec, - shouldQuit: make(chan struct{}), - connectTimeout: 1 * time.Second, - sendChan: make(chan *request, sendChanSize), - requests: make(map[int32]*request), - watchers: make(map[watchPathType][]chan Event), - passwd: emptyPassword, - logger: DefaultLogger, - logInfo: true, // default is true for backwards compatability - buf: make([]byte, bufferSize), + dialer: net.DialTimeout, + hostProvider: &DNSHostProvider{}, + conn: nil, + state: StateDisconnected, + eventChan: ec, + shouldQuit: make(chan struct{}), + connectTimeout: 1 * time.Second, + sendChan: make(chan *request, sendChanSize), + requests: make(map[int32]*request), + watchers: make(map[watchPathType][]chan Event), + passwd: emptyPassword, + logger: DefaultLogger, + logInfo: true, // default is true for backwards compatability + buf: make([]byte, bufferSize), + pingCountSession: 5, } // Set provided options. @@ -269,6 +271,15 @@ func WithEventCallback(cb EventCallback) connOption { } } +// WithPingCountWithinSessionTimeout returns a connection option that specifies the count of ping within session timeout +// ping's interval is sessionTimeout / pingCountSession +// If you have not specify it, it is 5, so ping interval is sessionTimeout / 5 +func WithPingCountWithinSessionTimeout(pingCountSession uint32) connOption { + return func(c *Conn) { + c.pingCountSession = pingCountSession + } +} + // WithMaxBufferSize sets the maximum buffer size used to read and decode // packets received from the Zookeeper server. The standard Zookeeper client for // Java defaults to a limit of 1mb. For backwards compatibility, this Go client @@ -337,8 +348,8 @@ func (c *Conn) SetLogger(l Logger) { func (c *Conn) setTimeouts(sessionTimeoutMs int32) { c.sessionTimeoutMs = sessionTimeoutMs sessionTimeout := time.Duration(sessionTimeoutMs) * time.Millisecond - c.recvTimeout = sessionTimeout * 2 / 3 - c.pingInterval = c.recvTimeout / 2 + c.pingInterval = sessionTimeout / time.Duration(c.pingCountSession) + c.recvTimeout = c.pingInterval * 2 } func (c *Conn) setState(state State) {