diff --git a/kombu/transport/redis_cluster.py b/kombu/transport/redis_cluster.py index d30f3eab4..4c44381d1 100644 --- a/kombu/transport/redis_cluster.py +++ b/kombu/transport/redis_cluster.py @@ -341,8 +341,6 @@ def _brpop_read(self, **options): resp = self.parse_response(conn, 'BRPOP', **options) except self.connection_errors: raise Empty() - except MovedError: - raise Empty() conn.client.connection.send_command('BRPOP', conn.key, conn.timeout) # schedule next BRPOP if resp: @@ -350,9 +348,13 @@ def _brpop_read(self, **options): return True def _poll_error(self, cmd, conn, **options): - resp = self.parse_response(conn, 'BRPOP', **options) - if resp: - self.deliver_response(resp) + try: + resp = self.parse_response(conn, 'BRPOP', **options) + if resp: + self.deliver_response(resp) + except self.connection_errors as e: + # We should not throw error on this method to make kombu to continue operation + logger.error('Error while reading from Redis', extra={"e": e, "key": conn.key}) self.connection.cycle._unregister(self, self.client, conn, 'BRPOP') @@ -366,10 +368,14 @@ def parse_response(self, conn, cmd, **options): try: return conn.client.parse_response(conn.client.connection, cmd, **options) except Exception as e: - logger.warning('Error while reading from Redis', extra={"e": e, "key": conn.key}) + logger.error('Error while reading from Redis', extra={"e": e, "key": conn.key}) # Mostly copied from https://github.com/sendbird/redis-py/blob/master/redis/cluster.py#L1173 if isinstance(e, ConnectionError) or isinstance(e, TimeoutError): - self.client.nodes_manager.startup_nodes.pop(target_node.name, None) + try: + node = channel.client.get_node_from_key(conn.key) + self.client.nodes_manager.startup_nodes.pop(node.name, None) + except Exception as e: + logger.error('Error while removing node', extra={"e": e, "key": conn.key}) self.client.nodes_manager.initialize() elif isinstance(e, MovedError): self.client.reinitialize_counter += 1 @@ -403,7 +409,6 @@ class Transport(RedisTransport): driver_type = 'redis-cluster' driver_name = driver_type - connection_errors = RedisTransport.connection_errors + (RedisClusterException,) implements = virtual.Transport.implements.extend( asynchronous=True, exchange_type=frozenset(['direct']) @@ -418,3 +423,10 @@ def __init__(self, *args, **kwargs): def driver_version(self): return redis.__version__ + + def _get_errors(self): + connection_errors, channel_errors = super()._get_errors() + connection_errors += (RedisClusterException, ConnectionError, TimeoutError, MovedError, TryAgainError, ClusterDownError, SlotNotCoveredError, AskError) + + return connection_errors, channel_errors + diff --git a/t/integration/test_redis_cluster.py b/t/integration/test_redis_cluster.py index bbfcb7075..e6ebd802d 100644 --- a/t/integration/test_redis_cluster.py +++ b/t/integration/test_redis_cluster.py @@ -1,6 +1,7 @@ from __future__ import annotations import os +import queue import pytest import redis @@ -53,23 +54,50 @@ def patched_init(self, **kwargs): conn = kombu.Connection('rediss-cluster://:test_password@localhost:7000') conn.default_channel +def test_connectionerror(connection): + with connection as conn: + queue = conn.SimpleQueue('test_connectionerror') + queue.put({'Hello': 'World'}, headers={'k1': 'v1'}) + + original_parse_response = redis.Redis.parse_response + def parse_response(*args, **kwargs): + if args[2] == 'BRPOP': + raise redis.exceptions.ConnectionError() + else: + return original_parse_response(*args) + + with patch('redis.Redis.parse_response', parse_response): + try: + _ = queue.get(timeout=1) + except queue.Empty: + pass + except: + raise + def test_movederror(connection): with connection as conn: queue = conn.SimpleQueue('test_movederror') queue.put({'Hello': 'World'}, headers={'k1': 'v1'}) + original_parse_response = redis.Redis.parse_response + def parse_response(*args, **kwargs): - slot = 123 - r_host = 'nosuchhost' - r_port = 7001 + if args[2] == 'BRPOP': + slot = 123 + r_host = 'nosuchhost' + r_port = 7001 - raise MovedError(f"{slot} {r_host}:{r_port}") + raise MovedError(f"{slot} {r_host}:{r_port}") + else: + return original_parse_response(*args) with patch('redis.Redis.parse_response', parse_response): try: message = queue.get(timeout=1) - except Exception as e: + except queue.Empty: pass + except: + raise assert conn.default_channel.client.reinitialize_counter != 0 @@ -78,18 +106,25 @@ def test_askerror(connection): queue = conn.SimpleQueue('test_askerror') queue.put({'Hello': 'World'}, headers={'k1': 'v1'}) + original_parse_response = redis.Redis.parse_response + def parse_response(*args, **kwargs): - slot = 123 - r_host = 'nosuchhost' - r_port = 7001 + if args[2] == 'BRPOP': + slot = 123 + r_host = 'nosuchhost' + r_port = 7001 - raise AskError(f"{slot} {r_host}:{r_port}") + raise AskError(f"{slot} {r_host}:{r_port}") + else: + return original_parse_response(*args) with patch('redis.Redis.parse_response', parse_response): try: message = queue.get(timeout=1) - except Exception as e: + except queue.Empty: pass + except: + raise assert conn.default_channel.ask_errors.get('test_askerror') is not None @@ -100,7 +135,6 @@ def test_failed_connection__ConnectionError(self, invalid_connection): # method raises transport exception with pytest.raises(redis.exceptions.RedisClusterException) as ex: invalid_connection.connection - assert ex.type in Transport.connection_errors def test_many_queue(connection):