Skip to content

Commit

Permalink
Chore: close some debts after kafka-python merge (#962)
Browse files Browse the repository at this point in the history
* Drop isinstance hack (not needed anymore)
* Add 2.6 case to check_version
  • Loading branch information
ods authored Jan 18, 2024
1 parent 72c1969 commit 2cbeee6
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 9 deletions.
9 changes: 5 additions & 4 deletions aiokafka/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
from aiokafka import __version__
from aiokafka.conn import collect_hosts, create_conn, CloseReason
from aiokafka.cluster import ClusterMetadata
from aiokafka.protocol.admin import DescribeAclsRequest_v2
from aiokafka.protocol.admin import (
DescribeAclsRequest_v2, DescribeClientQuotasRequest_v0
)
from aiokafka.protocol.commit import OffsetFetchRequest
from aiokafka.protocol.coordination import FindCoordinatorRequest
from aiokafka.protocol.fetch import FetchRequest
Expand Down Expand Up @@ -421,7 +423,7 @@ async def _get_conn(
# XXX: earlier we only did an assert here, but it seems it's
# possible to get a leader that is for some reason not in
# metadata.
# I think requerying metadata should solve this problem
# I think requiring metadata should solve this problem
if broker is None:
raise StaleMetadata(
'Broker id %s not in current metadata' % node_id)
Expand Down Expand Up @@ -581,8 +583,7 @@ def _check_api_version_response(self, response):
# in descending order. As soon as we find one that works, return it
test_cases = [
# format (<broker version>, <needed struct>)
# TODO Requires unreleased version of python-kafka
# ((2, 6, 0), DescribeClientQuotasRequest[0]),
((2, 6, 0), DescribeClientQuotasRequest_v0),
((2, 5, 0), DescribeAclsRequest_v2),
((2, 4, 0), ProduceRequest[8]),
((2, 3, 0), FetchRequest[11]),
Expand Down
5 changes: 1 addition & 4 deletions aiokafka/producer/sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -807,9 +807,6 @@ def _can_retry(self, error, batch):
# as long as we set proper sequence, pid and epoch.
if self._sender._txn_manager is None and batch.expired():
return False
# XXX: remove unknown topic check as we fix
# https://github.com/dpkp/kafka-python/issues/1155
if error.retriable or isinstance(error, UnknownTopicOrPartitionError)\
or error is UnknownTopicOrPartitionError:
if error.retriable:
return True
return False
2 changes: 1 addition & 1 deletion tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ async def test_send_request(self):
self.assertTrue(isinstance(resp, MetadataResponse))
await client.close()

@kafka_versions('<2.6') # FIXME Not implemented yet
@kafka_versions('<2.7') # FIXME Not implemented yet
@run_until_complete
async def test_check_version(self):
kafka_version = tuple(int(x) for x in self.kafka_version.split("."))
Expand Down

0 comments on commit 2cbeee6

Please sign in to comment.