Skip to content

Commit

Permalink
Merge pull request #10 from sendbird/fix/nameerror
Browse files Browse the repository at this point in the history
Fix error on connection error
  • Loading branch information
dlunch authored Jul 20, 2023
2 parents 2c5fb32 + 5e7c75c commit 41f0a65
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 19 deletions.
28 changes: 20 additions & 8 deletions kombu/transport/redis_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -341,18 +341,20 @@ def _brpop_read(self, **options):
resp = self.parse_response(conn, 'BRPOP', **options)
except self.connection_errors:
raise Empty()
except MovedError:
raise Empty()
conn.client.connection.send_command('BRPOP', conn.key, conn.timeout) # schedule next BRPOP

if resp:
self.deliver_response(resp)
return True

def _poll_error(self, cmd, conn, **options):
resp = self.parse_response(conn, 'BRPOP', **options)
if resp:
self.deliver_response(resp)
try:
resp = self.parse_response(conn, 'BRPOP', **options)
if resp:
self.deliver_response(resp)
except self.connection_errors as e:
# We should not throw error on this method to make kombu to continue operation
logger.error('Error while reading from Redis', extra={"e": e, "key": conn.key})

self.connection.cycle._unregister(self, self.client, conn, 'BRPOP')

Expand All @@ -366,10 +368,14 @@ def parse_response(self, conn, cmd, **options):
try:
return conn.client.parse_response(conn.client.connection, cmd, **options)
except Exception as e:
logger.warning('Error while reading from Redis', extra={"e": e, "key": conn.key})
logger.error('Error while reading from Redis', extra={"e": e, "key": conn.key})
# Mostly copied from https://github.com/sendbird/redis-py/blob/master/redis/cluster.py#L1173
if isinstance(e, ConnectionError) or isinstance(e, TimeoutError):
self.client.nodes_manager.startup_nodes.pop(target_node.name, None)
try:
node = channel.client.get_node_from_key(conn.key)
self.client.nodes_manager.startup_nodes.pop(node.name, None)
except Exception as e:
logger.error('Error while removing node', extra={"e": e, "key": conn.key})
self.client.nodes_manager.initialize()
elif isinstance(e, MovedError):
self.client.reinitialize_counter += 1
Expand Down Expand Up @@ -403,7 +409,6 @@ class Transport(RedisTransport):

driver_type = 'redis-cluster'
driver_name = driver_type
connection_errors = RedisTransport.connection_errors + (RedisClusterException,)

implements = virtual.Transport.implements.extend(
asynchronous=True, exchange_type=frozenset(['direct'])
Expand All @@ -418,3 +423,10 @@ def __init__(self, *args, **kwargs):

def driver_version(self):
return redis.__version__

def _get_errors(self):
connection_errors, channel_errors = super()._get_errors()
connection_errors += (RedisClusterException, ConnectionError, TimeoutError, MovedError, TryAgainError, ClusterDownError, SlotNotCoveredError, AskError)

return connection_errors, channel_errors

56 changes: 45 additions & 11 deletions t/integration/test_redis_cluster.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import os
import queue

import pytest
import redis
Expand Down Expand Up @@ -53,23 +54,50 @@ def patched_init(self, **kwargs):
conn = kombu.Connection('rediss-cluster://:test_password@localhost:7000')
conn.default_channel

def test_connectionerror(connection):
with connection as conn:
queue = conn.SimpleQueue('test_connectionerror')
queue.put({'Hello': 'World'}, headers={'k1': 'v1'})

original_parse_response = redis.Redis.parse_response
def parse_response(*args, **kwargs):
if args[2] == 'BRPOP':
raise redis.exceptions.ConnectionError()
else:
return original_parse_response(*args)

with patch('redis.Redis.parse_response', parse_response):
try:
_ = queue.get(timeout=1)
except queue.Empty:
pass
except:
raise

def test_movederror(connection):
with connection as conn:
queue = conn.SimpleQueue('test_movederror')
queue.put({'Hello': 'World'}, headers={'k1': 'v1'})

original_parse_response = redis.Redis.parse_response

def parse_response(*args, **kwargs):
slot = 123
r_host = 'nosuchhost'
r_port = 7001
if args[2] == 'BRPOP':
slot = 123
r_host = 'nosuchhost'
r_port = 7001

raise MovedError(f"{slot} {r_host}:{r_port}")
raise MovedError(f"{slot} {r_host}:{r_port}")
else:
return original_parse_response(*args)

with patch('redis.Redis.parse_response', parse_response):
try:
message = queue.get(timeout=1)
except Exception as e:
except queue.Empty:
pass
except:
raise
assert conn.default_channel.client.reinitialize_counter != 0


Expand All @@ -78,18 +106,25 @@ def test_askerror(connection):
queue = conn.SimpleQueue('test_askerror')
queue.put({'Hello': 'World'}, headers={'k1': 'v1'})

original_parse_response = redis.Redis.parse_response

def parse_response(*args, **kwargs):
slot = 123
r_host = 'nosuchhost'
r_port = 7001
if args[2] == 'BRPOP':
slot = 123
r_host = 'nosuchhost'
r_port = 7001

raise AskError(f"{slot} {r_host}:{r_port}")
raise AskError(f"{slot} {r_host}:{r_port}")
else:
return original_parse_response(*args)

with patch('redis.Redis.parse_response', parse_response):
try:
message = queue.get(timeout=1)
except Exception as e:
except queue.Empty:
pass
except:
raise
assert conn.default_channel.ask_errors.get('test_askerror') is not None


Expand All @@ -100,7 +135,6 @@ def test_failed_connection__ConnectionError(self, invalid_connection):
# method raises transport exception
with pytest.raises(redis.exceptions.RedisClusterException) as ex:
invalid_connection.connection
assert ex.type in Transport.connection_errors


def test_many_queue(connection):
Expand Down

0 comments on commit 41f0a65

Please sign in to comment.