Skip to content

Commit

Permalink
fix(core): allow requests to be queued in CONNECTING state (#374) (#588)
Browse files Browse the repository at this point in the history
With this patch, requests issued while the client is in the
'CONNECTING' state get queued instead of raising a misleading
'SessionExpiredError'.

This fixes #374, and brings
Kazoo more in line with the Java and C clients.

See the 'kazoo.client.KazooClient.state' documentation as well as
these discussions for more details:

#570 (comment)
#583 (comment)
  • Loading branch information
ztzg authored Mar 9, 2020
1 parent 933b38b commit a636d7a
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 9 deletions.
21 changes: 21 additions & 0 deletions docs/api/client.rst
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,27 @@ Public API
A :class:`~kazoo.protocol.states.KazooState` attribute indicating
the current higher-level connection state.

.. note::

Up to version 2.6.1, requests could only be submitted
in the CONNECTED state. Requests submitted while
SUSPENDED would immediately raise a
:exc:`~kazoo.exceptions.SessionExpiredError`. This
was problematic, as sessions are usually recovered on
reconnect.

Kazoo now simply queues requests submitted in the
SUSPENDED state, expecting a recovery. This matches
the behavior of the Java and C clients.

Requests submitted in a LOST state still fail
immediately with the corresponding exception.

See:

* https://github.com/python-zk/kazoo/issues/374 and
* https://github.com/python-zk/kazoo/pull/570

.. autoclass:: TransactionRequest
:members:
:member-order: bysource
9 changes: 4 additions & 5 deletions kazoo/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -584,11 +584,11 @@ def _safe_close(self):
"and wouldn't close after %s seconds" % timeout)

def _call(self, request, async_object):
"""Ensure there's an active connection and put the request in
the queue if there is.
"""Ensure the client is in CONNECTED or SUSPENDED state and put the
request in the queue if it is.
Returns False if the call short circuits due to AUTH_FAILED,
CLOSED, EXPIRED_SESSION or CONNECTING state.
CLOSED, or EXPIRED_SESSION state.
"""

Expand All @@ -599,8 +599,7 @@ def _call(self, request, async_object):
async_object.set_exception(ConnectionClosedError(
"Connection has been closed"))
return False
elif self._state in (KeeperState.EXPIRED_SESSION,
KeeperState.CONNECTING):
elif self._state == KeeperState.EXPIRED_SESSION:
async_object.set_exception(SessionExpiredError())
return False

Expand Down
106 changes: 102 additions & 4 deletions kazoo/tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import mock
from mock import patch
from nose import SkipTest
from nose.tools import eq_
from nose.tools import eq_, ok_, assert_not_equal
from nose.tools import raises

from kazoo.testing import KazooTestCase
Expand Down Expand Up @@ -492,9 +492,6 @@ def test_create_on_broken_connection(self):
self.assertRaises(AuthFailedError, client.create,
'/closedpath', b'bar')

client._state = KeeperState.CONNECTING
self.assertRaises(SessionExpiredError, client.create,
'/closedpath', b'bar')
client.stop()
client.close()

Expand Down Expand Up @@ -982,6 +979,107 @@ def test_update_host_list(self):
finally:
self.cluster[0].run()

# utility for test_request_queuing*
def _make_request_queuing_client(self):
from kazoo.client import KazooClient
server = self.cluster[0]
handler = self._makeOne()
# create a client with only one server in its list, and
# infinite retries
client = KazooClient(
hosts=server.address + self.client.chroot,
handler=handler,
connection_retry=dict(
max_tries=-1,
delay=0.1,
backoff=1,
max_jitter=0.0,
sleep_func=handler.sleep_func
)
)

return client, server

# utility for test_request_queuing*
def _request_queuing_common(self, client, server, path, expire_session):
ev_suspended = client.handler.event_object()
ev_connected = client.handler.event_object()

def listener(state):
if state == KazooState.SUSPENDED:
ev_suspended.set()
elif state == KazooState.CONNECTED:
ev_connected.set()
client.add_listener(listener)

# wait for the client to connect
client.start()

try:
# force the client to suspend
server.stop()

ev_suspended.wait(5)
ok_(ev_suspended.is_set())
ev_connected.clear()

# submit a request, expecting it to be queued
result = client.create_async(path)
assert_not_equal(len(client._queue), 0)
eq_(result.ready(), False)
eq_(client.state, KazooState.SUSPENDED)

# optionally cause a SessionExpiredError to occur by
# mangling the first byte of the session password.
if expire_session:
b0 = b'\x00'
if client._session_passwd[0] == 0:
b0 = b'\xff'
client._session_passwd = b0 + client._session_passwd[1:]
finally:
server.run()

# wait for the client to reconnect (either with a recovered
# session, or with a new one if expire_session was set)
ev_connected.wait(5)
ok_(ev_connected.is_set())

return result

def test_request_queuing_session_recovered(self):
path = "/" + uuid.uuid4().hex
client, server = self._make_request_queuing_client()

try:
result = self._request_queuing_common(
client=client,
server=server,
path=path,
expire_session=False
)

eq_(result.get(), path)
assert_not_equal(client.exists(path), None)
finally:
client.stop()

def test_request_queuing_session_expired(self):
path = "/" + uuid.uuid4().hex
client, server = self._make_request_queuing_client()

try:
result = self._request_queuing_common(
client=client,
server=server,
path=path,
expire_session=True
)

eq_(len(client._queue), 0)
self.assertRaises(SessionExpiredError, result.get)
finally:
client.stop()


dummy_dict = {
'aversion': 1, 'ctime': 0, 'cversion': 1,
Expand Down

0 comments on commit a636d7a

Please sign in to comment.