diff --git a/aiokafka/client.py b/aiokafka/client.py index 26436fec..f26655e6 100644 --- a/aiokafka/client.py +++ b/aiokafka/client.py @@ -7,7 +7,9 @@ from aiokafka import __version__ from aiokafka.conn import collect_hosts, create_conn, CloseReason from aiokafka.cluster import ClusterMetadata -from aiokafka.protocol.admin import DescribeAclsRequest_v2 +from aiokafka.protocol.admin import ( + DescribeAclsRequest_v2, DescribeClientQuotasRequest_v0 +) from aiokafka.protocol.commit import OffsetFetchRequest from aiokafka.protocol.coordination import FindCoordinatorRequest from aiokafka.protocol.fetch import FetchRequest @@ -421,7 +423,7 @@ async def _get_conn( # XXX: earlier we only did an assert here, but it seems it's # possible to get a leader that is for some reason not in # metadata. - # I think requerying metadata should solve this problem + # I think requiring metadata should solve this problem if broker is None: raise StaleMetadata( 'Broker id %s not in current metadata' % node_id) @@ -581,8 +583,7 @@ def _check_api_version_response(self, response): # in descending order. As soon as we find one that works, return it test_cases = [ # format (, ) - # TODO Requires unreleased version of python-kafka - # ((2, 6, 0), DescribeClientQuotasRequest[0]), + ((2, 6, 0), DescribeClientQuotasRequest_v0), ((2, 5, 0), DescribeAclsRequest_v2), ((2, 4, 0), ProduceRequest[8]), ((2, 3, 0), FetchRequest[11]), diff --git a/aiokafka/producer/sender.py b/aiokafka/producer/sender.py index bc6c8f4e..b258340c 100644 --- a/aiokafka/producer/sender.py +++ b/aiokafka/producer/sender.py @@ -807,9 +807,6 @@ def _can_retry(self, error, batch): # as long as we set proper sequence, pid and epoch. if self._sender._txn_manager is None and batch.expired(): return False - # XXX: remove unknown topic check as we fix - # https://github.com/dpkp/kafka-python/issues/1155 - if error.retriable or isinstance(error, UnknownTopicOrPartitionError)\ - or error is UnknownTopicOrPartitionError: + if error.retriable: return True return False diff --git a/tests/test_client.py b/tests/test_client.py index f17ca673..083b28b3 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -289,7 +289,7 @@ async def test_send_request(self): self.assertTrue(isinstance(resp, MetadataResponse)) await client.close() - @kafka_versions('<2.6') # FIXME Not implemented yet + @kafka_versions('<2.7') # FIXME Not implemented yet @run_until_complete async def test_check_version(self): kafka_version = tuple(int(x) for x in self.kafka_version.split("."))