Skip to content
This repository has been archived by the owner on Jul 21, 2021. It is now read-only.

[feature]: new option WithPingCountWithinSessionTimeout #214

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 39 additions & 28 deletions zk/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,18 +74,19 @@ type Conn struct {
sessionTimeoutMs int32 // session timeout in milliseconds
passwd []byte

dialer Dialer
hostProvider HostProvider
serverMu sync.Mutex // protects server
server string // remember the address/port of the current server
conn net.Conn
eventChan chan Event
eventCallback EventCallback // may be nil
shouldQuit chan struct{}
pingInterval time.Duration
recvTimeout time.Duration
connectTimeout time.Duration
maxBufferSize int
dialer Dialer
hostProvider HostProvider
serverMu sync.Mutex // protects server
server string // remember the address/port of the current server
conn net.Conn
eventChan chan Event
eventCallback EventCallback // may be nil
shouldQuit chan struct{}
pingInterval time.Duration
pingCountSession uint32
recvTimeout time.Duration
connectTimeout time.Duration
maxBufferSize int

creds []authCreds
credsMu sync.Mutex // protects server
Expand Down Expand Up @@ -192,20 +193,21 @@ func Connect(servers []string, sessionTimeout time.Duration, options ...connOpti

ec := make(chan Event, eventChanSize)
conn := &Conn{
dialer: net.DialTimeout,
hostProvider: &DNSHostProvider{},
conn: nil,
state: StateDisconnected,
eventChan: ec,
shouldQuit: make(chan struct{}),
connectTimeout: 1 * time.Second,
sendChan: make(chan *request, sendChanSize),
requests: make(map[int32]*request),
watchers: make(map[watchPathType][]chan Event),
passwd: emptyPassword,
logger: DefaultLogger,
logInfo: true, // default is true for backwards compatability
buf: make([]byte, bufferSize),
dialer: net.DialTimeout,
hostProvider: &DNSHostProvider{},
conn: nil,
state: StateDisconnected,
eventChan: ec,
shouldQuit: make(chan struct{}),
connectTimeout: 1 * time.Second,
sendChan: make(chan *request, sendChanSize),
requests: make(map[int32]*request),
watchers: make(map[watchPathType][]chan Event),
passwd: emptyPassword,
logger: DefaultLogger,
logInfo: true, // default is true for backwards compatability
buf: make([]byte, bufferSize),
pingCountSession: 5,
}

// Set provided options.
Expand Down Expand Up @@ -269,6 +271,15 @@ func WithEventCallback(cb EventCallback) connOption {
}
}

// WithPingCountWithinSessionTimeout returns a connection option that specifies the count of ping within session timeout
// ping's interval is sessionTimeout / pingCountSession
// If you have not specify it, it is 5, so ping interval is sessionTimeout / 5
func WithPingCountWithinSessionTimeout(pingCountSession uint32) connOption {
return func(c *Conn) {
c.pingCountSession = pingCountSession
}
}

// WithMaxBufferSize sets the maximum buffer size used to read and decode
// packets received from the Zookeeper server. The standard Zookeeper client for
// Java defaults to a limit of 1mb. For backwards compatibility, this Go client
Expand Down Expand Up @@ -337,8 +348,8 @@ func (c *Conn) SetLogger(l Logger) {
func (c *Conn) setTimeouts(sessionTimeoutMs int32) {
c.sessionTimeoutMs = sessionTimeoutMs
sessionTimeout := time.Duration(sessionTimeoutMs) * time.Millisecond
c.recvTimeout = sessionTimeout * 2 / 3
c.pingInterval = c.recvTimeout / 2
c.pingInterval = sessionTimeout / time.Duration(c.pingCountSession)
c.recvTimeout = c.pingInterval * 2
}

func (c *Conn) setState(state State) {
Expand Down