From cad3a5c475c53c612cc68c24d374b672c94e1526 Mon Sep 17 00:00:00 2001
From: Vikram Patki 24489
Date: Tue, 3 Nov 2020 17:31:54 -0500
Subject: [PATCH 01/14] Implement KIP-345 in aiokafka

---
 aiokafka/client.py                     |   2 +
 aiokafka/consumer/assignors.py         |  23 ++++
 aiokafka/consumer/consumer.py          |   5 +
 aiokafka/consumer/group_coordinator.py | 158 ++++++++++++++++---------
 aiokafka/errors.py                     |   8 +-
 aiokafka/protocol/group.py             |  75 ++++++++++++
 6 files changed, 214 insertions(+), 57 deletions(-)
 create mode 100644 aiokafka/consumer/assignors.py
 create mode 100644 aiokafka/protocol/group.py

diff --git a/aiokafka/client.py b/aiokafka/client.py
index c1bdf6d1..d7999eb2 100644
--- a/aiokafka/client.py
+++ b/aiokafka/client.py
@@ -27,6 +27,7 @@ from aiokafka.util import (
     create_future, create_task, parse_kafka_version, get_running_loop
 )
+from aiokafka.protocol.group import SyncGroupRequest
 
 __all__ = ['AIOKafkaClient']
 
 
@@ -587,6 +588,7 @@ def _check_api_version_response(self, response):
             # ((2, 6, 0), DescribeClientQuotasRequest[0]),
             ((2, 5, 0), DescribeAclsRequest_v2),
             ((2, 4, 0), ProduceRequest[8]),
+            ((2, 3, 0), SyncGroupRequest[2]),
             ((2, 3, 0), FetchRequest[11]),
             ((2, 2, 0), OffsetRequest[5]),
             ((2, 1, 0), FetchRequest[10]),
diff --git a/aiokafka/consumer/assignors.py b/aiokafka/consumer/assignors.py
new file mode 100644
index 00000000..dfde2874
--- /dev/null
+++ b/aiokafka/consumer/assignors.py
@@ -0,0 +1,23 @@
+from kafka.coordinator.assignors.abstract import AbstractPartitionAssignor
+import abc
+
+class AbstractStaticPartitionAssignor(AbstractPartitionAssignor):
+    """
+    Abstract assignor implementation that also supports static assignments (KIP-345)
+    """
+
+
+    @abc.abstractmethod
+    def assign(self, cluster, members, member_group_instance_ids):
+        """Perform group assignment given cluster metadata, member subscriptions
+        and group_instance_ids
+        Arguments:
+            cluster (ClusterMetadata): metadata for use in assignment
+            members (dict of {member_id: MemberMetadata}): decoded metadata for
+                each member in the group.
+            member_group_instance_ids (dict of {member_id: group_instance_id}):
+                group instance id of each static member in the group.
+        Returns:
+            dict: {member_id: MemberAssignment}
+        """
+        pass
\ No newline at end of file
diff --git a/aiokafka/consumer/consumer.py b/aiokafka/consumer/consumer.py
index 2fa2376d..b3a7cb58 100644
--- a/aiokafka/consumer/consumer.py
+++ b/aiokafka/consumer/consumer.py
@@ -65,6 +65,8 @@ class AIOKafkaConsumer:
             committing offsets. If None, auto-partition assignment (via
             group coordinator) and offset commits are disabled.
             Default: None
+        group_instance_id (str or None): name of the group instance ID used for
+            static membership (KIP-345)
         key_deserializer (Callable): Any callable that takes a
             raw message key and returns a deserialized key.
value_deserializer (Callable, Optional): Any callable that takes a @@ -228,6 +230,7 @@ def __init__(self, *topics, loop=None, bootstrap_servers='localhost', client_id='aiokafka-' + __version__, group_id=None, + group_instance_id=None, key_deserializer=None, value_deserializer=None, fetch_max_wait_ms=500, fetch_max_bytes=52428800, @@ -290,6 +293,7 @@ def __init__(self, *topics, loop=None, sasl_oauth_token_provider=sasl_oauth_token_provider) self._group_id = group_id + self._group_instance_id = group_instance_id self._heartbeat_interval_ms = heartbeat_interval_ms self._session_timeout_ms = session_timeout_ms self._retry_backoff_ms = retry_backoff_ms @@ -381,6 +385,7 @@ async def start(self): self._coordinator = GroupCoordinator( self._client, self._subscription, group_id=self._group_id, + group_instance_id=self._group_instance_id, heartbeat_interval_ms=self._heartbeat_interval_ms, session_timeout_ms=self._session_timeout_ms, retry_backoff_ms=self._retry_backoff_ms, diff --git a/aiokafka/consumer/group_coordinator.py b/aiokafka/consumer/group_coordinator.py index 8d244288..c5a80dd2 100644 --- a/aiokafka/consumer/group_coordinator.py +++ b/aiokafka/consumer/group_coordinator.py @@ -9,13 +9,15 @@ from kafka.protocol.commit import ( OffsetCommitRequest_v2 as OffsetCommitRequest, OffsetFetchRequest_v1 as OffsetFetchRequest) -from kafka.protocol.group import ( +from aiokafka.protocol.group import ( HeartbeatRequest, JoinGroupRequest, LeaveGroupRequest, SyncGroupRequest) import aiokafka.errors as Errors from aiokafka.structs import OffsetAndMetadata, TopicPartition from aiokafka.client import ConnectionGroup, CoordinationType from aiokafka.util import create_future, create_task +from aiokafka.consumer.assignors import AbstractStaticPartitionAssignor + log = logging.getLogger(__name__) @@ -210,6 +212,7 @@ class GroupCoordinator(BaseCoordinator): def __init__(self, client, subscription, *, group_id='aiokafka-default-group', + group_instance_id=None, session_timeout_ms=10000, heartbeat_interval_ms=3000, retry_backoff_ms=100, enable_auto_commit=True, auto_commit_interval_ms=5000, @@ -241,6 +244,7 @@ def __init__(self, client, subscription, *, self.generation = OffsetCommitRequest.DEFAULT_GENERATION_ID self.member_id = JoinGroupRequest[0].UNKNOWN_MEMBER_ID self.group_id = group_id + self._group_instance_id = group_instance_id self.coordinator_id = None # Coordination flags and futures @@ -346,9 +350,11 @@ def maybe_leave_group(self): return task async def _maybe_leave_group(self): - if self.generation > 0: + if self.generation > 0 and self._group_instance_id is None: # this is a minimal effort attempt to leave the group. we do not # attempt any resending if the request fails or times out. + # Note: do not send this leave request if we are running in static + # partition assignment mode (when group_instance_id has been set). 
            version = 0 if self._client.api_version < (0, 11, 0) else 1
             request = LeaveGroupRequest[version](self.group_id, self.member_id)
             try:
@@ -402,8 +408,10 @@ async def _perform_assignment(
             'Invalid assignment protocol: %s' % assignment_strategy
         member_metadata = {}
         all_subscribed_topics = set()
-        for member_id, metadata_bytes in members:
+        group_instance_id_mapping = {}
+        for member_id, group_instance_id, metadata_bytes in members:
             metadata = ConsumerProtocol.METADATA.decode(metadata_bytes)
+            group_instance_id_mapping[member_id] = group_instance_id
             member_metadata[member_id] = metadata
             all_subscribed_topics.update(metadata.subscription)
 
@@ -421,7 +429,12 @@ async def _perform_assignment(
                   " with subscriptions %s", self.group_id, assignor.name,
                   member_metadata)
 
-        assignments = assignor.assign(self._cluster, member_metadata)
+        assignments = None
+        if isinstance(assignor, AbstractStaticPartitionAssignor) :
+            assignments = assignor.assign(self._cluster, member_metadata,
+                group_instance_id_mapping)
+        else :
+            assignments = assignor.assign(self._cluster, member_metadata)
         log.debug("Finished assignment for group %s: %s",
                   self.group_id, assignments)
 
@@ -1203,46 +1216,63 @@ async def perform_group_join(self):
                 metadata = metadata.encode()
             group_protocol = (assignor.name, metadata)
             metadata_list.append(group_protocol)
+        # for KIP-394 we may have to send a second join request
+        try_join = True
+        while (try_join) :
+            try_join = False
+
+            if self._api_version < (0, 10, 1) :
+                request = JoinGroupRequest[0](
+                    self.group_id,
+                    self._session_timeout_ms,
+                    self._coordinator.member_id,
+                    ConsumerProtocol.PROTOCOL_TYPE,
+                    metadata_list)
+            elif self._api_version < (0, 11, 0) :
+                request = JoinGroupRequest[1](
+                    self.group_id,
+                    self._session_timeout_ms,
+                    self._rebalance_timeout_ms,
+                    self._coordinator.member_id,
+                    ConsumerProtocol.PROTOCOL_TYPE,
+                    metadata_list)
+            elif self._api_version < (2, 3, 0) :
+                request = JoinGroupRequest[2](
+                    self.group_id,
+                    self._session_timeout_ms,
+                    self._rebalance_timeout_ms,
+                    self._coordinator.member_id,
+                    ConsumerProtocol.PROTOCOL_TYPE,
+                    metadata_list)
+            else :
+                request = JoinGroupRequest[3](
+                    self.group_id,
+                    self._session_timeout_ms,
+                    self._rebalance_timeout_ms,
+                    self._coordinator.member_id,
+                    self._coordinator._group_instance_id,
+                    ConsumerProtocol.PROTOCOL_TYPE,
+                    metadata_list)
+
+            # create the request for the coordinator
+            log.debug("Sending JoinGroup (%s) to coordinator %s",
+                      request, self.coordinator_id)
+            try :
+                response = await T(self._coordinator._send_req)(request)
+            except Errors.KafkaError :
+                # Return right away. It's a connection error, so backoff will be
+                # handled by coordinator lookup
+                return None
+            if not self._subscription.active :
+                # Subscription changed. 
Ignore response and restart group join + return None + + error_type = Errors.for_code(response.error_code) + + if error_type is Errors.MemberIdRequired : + self._coordinator.member_id = response.member_id + try_join = True - if self._api_version < (0, 10, 1): - request = JoinGroupRequest[0]( - self.group_id, - self._session_timeout_ms, - self._coordinator.member_id, - ConsumerProtocol.PROTOCOL_TYPE, - metadata_list) - elif self._api_version < (0, 11, 0): - request = JoinGroupRequest[1]( - self.group_id, - self._session_timeout_ms, - self._rebalance_timeout_ms, - self._coordinator.member_id, - ConsumerProtocol.PROTOCOL_TYPE, - metadata_list) - else: - request = JoinGroupRequest[2]( - self.group_id, - self._session_timeout_ms, - self._rebalance_timeout_ms, - self._coordinator.member_id, - ConsumerProtocol.PROTOCOL_TYPE, - metadata_list) - - # create the request for the coordinator - log.debug("Sending JoinGroup (%s) to coordinator %s", - request, self.coordinator_id) - try: - response = await self._coordinator._send_req(request) - except Errors.KafkaError: - # Return right away. It's a connection error, so backoff will be - # handled by coordinator lookup - return None - - if not self._subscription.active: - # Subscription changed. Ignore response and restart group join - return None - - error_type = Errors.for_code(response.error_code) if error_type is Errors.NoError: log.debug("Join group response %s", response) self._coordinator.member_id = response.member_id @@ -1295,18 +1325,26 @@ async def perform_group_join(self): err = error_type() log.error( "Unexpected error in join group '%s' response: %s", - self.group_id, err) + self.group_id, error_type) raise Errors.KafkaError(repr(err)) return None async def _on_join_follower(self): # send follower's sync group with an empty assignment - version = 0 if self._api_version < (0, 11, 0) else 1 - request = SyncGroupRequest[version]( - self.group_id, - self._coordinator.generation, - self._coordinator.member_id, - []) + if self._api_version < (2, 3, 0) : + version = 0 if self._api_version < (0, 11, 0) else 1 + request = SyncGroupRequest[version]( + self.group_id, + self._coordinator.generation, + self._coordinator.member_id, + []) + else : + request = SyncGroupRequest[2]( + self.group_id, + self._coordinator.generation, + self._coordinator.member_id, + self._coordinator._group_instance_id, + []) log.debug( "Sending follower SyncGroup for group %s to coordinator %s: %s", self.group_id, self.coordinator_id, request) @@ -1338,12 +1376,20 @@ async def _on_join_leader(self, response): assignment = assignment.encode() assignment_req.append((member_id, assignment)) - version = 0 if self._api_version < (0, 11, 0) else 1 - request = SyncGroupRequest[version]( - self.group_id, - self._coordinator.generation, - self._coordinator.member_id, - assignment_req) + if self._api_version < (2, 3, 0) : + version = 0 if self._api_version < (0, 11, 0) else 1 + request = SyncGroupRequest[version]( + self.group_id, + self._coordinator.generation, + self._coordinator.member_id, + assignment_req) + else : + request = SyncGroupRequest[2]( + self.group_id, + self._coordinator.generation, + self._coordinator.member_id, + self._coordinator._group_instance_id, + assignment_req) log.debug( "Sending leader SyncGroup for group %s to coordinator %s: %s", diff --git a/aiokafka/errors.py b/aiokafka/errors.py index c000369b..886b151d 100644 --- a/aiokafka/errors.py +++ b/aiokafka/errors.py @@ -413,7 +413,13 @@ class ListenerNotFound(BrokerResponseError): 'There is no listener on the 
leader broker that matches the' ' listener on which metadata request was processed' ) - +class MemberIdRequired(BrokerResponseError): + errno = 79 + message = 'MEMBER_ID_REQUIRED' + description = ( + 'Consumer needs to have a valid member ' + 'id before actually entering group' + ) def _iter_broker_errors(): for name, obj in inspect.getmembers(sys.modules[__name__]): diff --git a/aiokafka/protocol/group.py b/aiokafka/protocol/group.py new file mode 100644 index 00000000..e2a03d5b --- /dev/null +++ b/aiokafka/protocol/group.py @@ -0,0 +1,75 @@ +from __future__ import absolute_import + +from kafka.protocol.api import Request, Response +from kafka.protocol.struct import Struct +from kafka.protocol.types import Array, Bytes, Int16, Int32, Schema, String +from kafka.protocol.group import * + +class JoinGroupResponse_v5(Response): + API_KEY = 11 + API_VERSION = 5 + SCHEMA = Schema( + ('throttle_time_ms', Int32), + ('error_code', Int16), + ('generation_id', Int32), + ('group_protocol', String('utf-8')), + ('leader_id', String('utf-8')), + ('member_id', String('utf-8')), + ('members', Array( + ('member_id', String('utf-8')), + ('group_instance_id', String('utf-8')), + ('member_metadata', Bytes))) + ) + + + +class JoinGroupRequest_v5(Request): + API_KEY = 11 + API_VERSION = 5 + RESPONSE_TYPE = JoinGroupResponse_v5 + SCHEMA = Schema( + ('group', String('utf-8')), + ('session_timeout', Int32), + ('rebalance_timeout', Int32), + ('member_id', String('utf-8')), + ('group_instance_id', String('utf-8')), + ('protocol_type', String('utf-8')), + ('group_protocols', Array( + ('protocol_name', String('utf-8')), + ('protocol_metadata', Bytes))) + ) + UNKNOWN_MEMBER_ID = '' + + +JoinGroupRequest = [ + JoinGroupRequest_v0, JoinGroupRequest_v1, JoinGroupRequest_v2, JoinGroupRequest_v5 +] +JoinGroupResponse = [ + JoinGroupResponse_v0, JoinGroupResponse_v1, JoinGroupRequest_v2, JoinGroupResponse_v5 +] + + +class SyncGroupResponse_v3(Response): + API_KEY = 14 + API_VERSION = 3 + SCHEMA = SyncGroupResponse_v1.SCHEMA + + +class SyncGroupRequest_v3(Request): + API_KEY = 14 + API_VERSION = 3 + RESPONSE_TYPE = SyncGroupResponse_v3 + SCHEMA = Schema( + ('group', String('utf-8')), + ('generation_id', Int32), + ('member_id', String('utf-8')), + ('group_instance_id', String('utf-8')), + ('group_assignment', Array( + ('member_id', String('utf-8')), + ('member_metadata', Bytes))) + ) + + + +SyncGroupRequest = [SyncGroupRequest_v0, SyncGroupRequest_v1, SyncGroupRequest_v3] +SyncGroupResponse = [SyncGroupResponse_v0, SyncGroupResponse_v1, SyncGroupResponse_v3] \ No newline at end of file From ef9d8842d4d082f895fefed0e4366d167d123a75 Mon Sep 17 00:00:00 2001 From: Vikram Patki 24489 Date: Wed, 25 Nov 2020 11:16:30 -0500 Subject: [PATCH 02/14] fixing linting errors --- aiokafka/errors.py | 178 ++++++++++++++++++------------------- aiokafka/protocol/group.py | 75 +++++++++------- 2 files changed, 130 insertions(+), 123 deletions(-) diff --git a/aiokafka/errors.py b/aiokafka/errors.py index 886b151d..5605f25a 100644 --- a/aiokafka/errors.py +++ b/aiokafka/errors.py @@ -17,7 +17,6 @@ AuthenticationMethodNotSupported, AuthenticationFailedError, BrokerResponseError, - # Numbered errors NoError, # 0 UnknownError, # -1 @@ -65,7 +64,6 @@ InvalidRequestError, # 42 UnsupportedForMessageFormatError, # 43 PolicyViolationError, # 44 - KafkaUnavailableError, KafkaTimeoutError, KafkaConnectionError, @@ -74,9 +72,10 @@ __all__ = [ # aiokafka custom errors - "ConsumerStoppedError", "NoOffsetForPartitionError", "RecordTooLargeError", + 
"ConsumerStoppedError", + "NoOffsetForPartitionError", + "RecordTooLargeError", "ProducerClosed", - # Kafka Python errors "KafkaError", "IllegalStateError", @@ -93,7 +92,6 @@ "AuthenticationMethodNotSupported", "AuthenticationFailedError", "BrokerResponseError", - # Numbered errors "NoError", # 0 "UnknownError", # -1 @@ -141,7 +139,6 @@ "InvalidRequestError", # 42 "UnsupportedForMessageFormatError", # 43 "PolicyViolationError", # 44 - "KafkaUnavailableError", "KafkaTimeoutError", "KafkaConnectionError", @@ -201,230 +198,229 @@ class ProducerFenced(KafkaError): def __init__( self, msg="There is a newer producer using the same transactional_id or" - "transaction timeout occurred (check that processing time is " - "below transaction_timeout_ms)" + "transaction timeout occurred (check that processing time is " + "below transaction_timeout_ms)", ): super().__init__(msg) class OutOfOrderSequenceNumber(BrokerResponseError): errno = 45 - message = 'OUT_OF_ORDER_SEQUENCE_NUMBER' - description = 'The broker received an out of order sequence number' + message = "OUT_OF_ORDER_SEQUENCE_NUMBER" + description = "The broker received an out of order sequence number" class DuplicateSequenceNumber(BrokerResponseError): errno = 46 - message = 'DUPLICATE_SEQUENCE_NUMBER' - description = 'The broker received a duplicate sequence number' + message = "DUPLICATE_SEQUENCE_NUMBER" + description = "The broker received a duplicate sequence number" class InvalidProducerEpoch(BrokerResponseError): errno = 47 - message = 'INVALID_PRODUCER_EPOCH' + message = "INVALID_PRODUCER_EPOCH" description = ( - 'Producer attempted an operation with an old epoch. Either ' - 'there is a newer producer with the same transactionalId, or the ' - 'producer\'s transaction has been expired by the broker.' + "Producer attempted an operation with an old epoch. Either " + "there is a newer producer with the same transactionalId, or the " + "producer's transaction has been expired by the broker." ) class InvalidTxnState(BrokerResponseError): errno = 48 - message = 'INVALID_TXN_STATE' - description = ( - 'The producer attempted a transactional operation in an invalid state' - ) + message = "INVALID_TXN_STATE" + description = "The producer attempted a transactional operation in an invalid state" class InvalidProducerIdMapping(BrokerResponseError): errno = 49 - message = 'INVALID_PRODUCER_ID_MAPPING' + message = "INVALID_PRODUCER_ID_MAPPING" description = ( - 'The producer attempted to use a producer id which is not currently ' - 'assigned to its transactional id' + "The producer attempted to use a producer id which is not currently " + "assigned to its transactional id" ) class InvalidTransactionTimeout(BrokerResponseError): errno = 50 - message = 'INVALID_TRANSACTION_TIMEOUT' + message = "INVALID_TRANSACTION_TIMEOUT" description = ( - 'The transaction timeout is larger than the maximum value allowed by' - ' the broker (as configured by transaction.max.timeout.ms).' + "The transaction timeout is larger than the maximum value allowed by" + " the broker (as configured by transaction.max.timeout.ms)." 
) class ConcurrentTransactions(BrokerResponseError): errno = 51 - message = 'CONCURRENT_TRANSACTIONS' + message = "CONCURRENT_TRANSACTIONS" description = ( - 'The producer attempted to update a transaction while another ' - 'concurrent operation on the same transaction was ongoing' + "The producer attempted to update a transaction while another " + "concurrent operation on the same transaction was ongoing" ) class TransactionCoordinatorFenced(BrokerResponseError): errno = 52 - message = 'TRANSACTION_COORDINATOR_FENCED' + message = "TRANSACTION_COORDINATOR_FENCED" description = ( - 'Indicates that the transaction coordinator sending a WriteTxnMarker' - ' is no longer the current coordinator for a given producer' + "Indicates that the transaction coordinator sending a WriteTxnMarker" + " is no longer the current coordinator for a given producer" ) class TransactionalIdAuthorizationFailed(BrokerResponseError): errno = 53 - message = 'TRANSACTIONAL_ID_AUTHORIZATION_FAILED' - description = 'Transactional Id authorization failed' + message = "TRANSACTIONAL_ID_AUTHORIZATION_FAILED" + description = "Transactional Id authorization failed" class SecurityDisabled(BrokerResponseError): errno = 54 - message = 'SECURITY_DISABLED' - description = 'Security features are disabled' + message = "SECURITY_DISABLED" + description = "Security features are disabled" class OperationNotAttempted(BrokerResponseError): errno = 55 - message = 'OPERATION_NOT_ATTEMPTED' + message = "OPERATION_NOT_ATTEMPTED" description = ( - 'The broker did not attempt to execute this operation. This may happen' - ' for batched RPCs where some operations in the batch failed, causing ' - 'the broker to respond without trying the rest.' + "The broker did not attempt to execute this operation. This may happen" + " for batched RPCs where some operations in the batch failed, causing " + "the broker to respond without trying the rest." ) class KafkaStorageError(BrokerResponseError): errno = 56 - message = 'KAFKA_STORAGE_ERROR' - description = ( - 'The user-specified log directory is not found in the broker config.' - ) + message = "KAFKA_STORAGE_ERROR" + description = "The user-specified log directory is not found in the broker config." class LogDirNotFound(BrokerResponseError): errno = 57 - message = 'LOG_DIR_NOT_FOUND' - description = ( - 'The user-specified log directory is not found in the broker config.' - ) + message = "LOG_DIR_NOT_FOUND" + description = "The user-specified log directory is not found in the broker config." class SaslAuthenticationFailed(BrokerResponseError): errno = 58 - message = 'SASL_AUTHENTICATION_FAILED' - description = 'SASL Authentication failed.' + message = "SASL_AUTHENTICATION_FAILED" + description = "SASL Authentication failed." class UnknownProducerId(BrokerResponseError): errno = 59 - message = 'UNKNOWN_PRODUCER_ID' + message = "UNKNOWN_PRODUCER_ID" description = ( - 'This exception is raised by the broker if it could not locate the ' - 'producer metadata associated with the producerId in question. This ' - 'could happen if, for instance, the producer\'s records were deleted ' - 'because their retention time had elapsed. Once the last records of ' - 'the producerId are removed, the producer\'s metadata is removed from' - ' the broker, and future appends by the producer will return this ' - 'exception.' + "This exception is raised by the broker if it could not locate the " + "producer metadata associated with the producerId in question. 
This " + "could happen if, for instance, the producer's records were deleted " + "because their retention time had elapsed. Once the last records of " + "the producerId are removed, the producer's metadata is removed from" + " the broker, and future appends by the producer will return this " + "exception." ) class ReassignmentInProgress(BrokerResponseError): errno = 60 - message = 'REASSIGNMENT_IN_PROGRESS' - description = 'A partition reassignment is in progress' + message = "REASSIGNMENT_IN_PROGRESS" + description = "A partition reassignment is in progress" class DelegationTokenAuthDisabled(BrokerResponseError): errno = 61 - message = 'DELEGATION_TOKEN_AUTH_DISABLED' - description = 'Delegation Token feature is not enabled' + message = "DELEGATION_TOKEN_AUTH_DISABLED" + description = "Delegation Token feature is not enabled" class DelegationTokenNotFound(BrokerResponseError): errno = 62 - message = 'DELEGATION_TOKEN_NOT_FOUND' - description = 'Delegation Token is not found on server.' + message = "DELEGATION_TOKEN_NOT_FOUND" + description = "Delegation Token is not found on server." class DelegationTokenOwnerMismatch(BrokerResponseError): errno = 63 - message = 'DELEGATION_TOKEN_OWNER_MISMATCH' - description = 'Specified Principal is not valid Owner/Renewer.' + message = "DELEGATION_TOKEN_OWNER_MISMATCH" + description = "Specified Principal is not valid Owner/Renewer." class DelegationTokenRequestNotAllowed(BrokerResponseError): errno = 64 - message = 'DELEGATION_TOKEN_REQUEST_NOT_ALLOWED' + message = "DELEGATION_TOKEN_REQUEST_NOT_ALLOWED" description = ( - 'Delegation Token requests are not allowed on PLAINTEXT/1-way SSL ' - 'channels and on delegation token authenticated channels.' + "Delegation Token requests are not allowed on PLAINTEXT/1-way SSL " + "channels and on delegation token authenticated channels." ) class DelegationTokenAuthorizationFailed(BrokerResponseError): errno = 65 - message = 'DELEGATION_TOKEN_AUTHORIZATION_FAILED' - description = 'Delegation Token authorization failed.' + message = "DELEGATION_TOKEN_AUTHORIZATION_FAILED" + description = "Delegation Token authorization failed." class DelegationTokenExpired(BrokerResponseError): errno = 66 - message = 'DELEGATION_TOKEN_EXPIRED' - description = 'Delegation Token is expired.' + message = "DELEGATION_TOKEN_EXPIRED" + description = "Delegation Token is expired." 
class InvalidPrincipalType(BrokerResponseError): errno = 67 - message = 'INVALID_PRINCIPAL_TYPE' - description = 'Supplied principalType is not supported' + message = "INVALID_PRINCIPAL_TYPE" + description = "Supplied principalType is not supported" class NonEmptyGroup(BrokerResponseError): errno = 68 - message = 'NON_EMPTY_GROUP' - description = 'The group is not empty' + message = "NON_EMPTY_GROUP" + description = "The group is not empty" class GroupIdNotFound(BrokerResponseError): errno = 69 - message = 'GROUP_ID_NOT_FOUND' - description = 'The group id does not exist' + message = "GROUP_ID_NOT_FOUND" + description = "The group id does not exist" class FetchSessionIdNotFound(BrokerResponseError): errno = 70 - message = 'FETCH_SESSION_ID_NOT_FOUND' - description = 'The fetch session ID was not found' + message = "FETCH_SESSION_ID_NOT_FOUND" + description = "The fetch session ID was not found" class InvalidFetchSessionEpoch(BrokerResponseError): errno = 71 - message = 'INVALID_FETCH_SESSION_EPOCH' - description = 'The fetch session epoch is invalid' + message = "INVALID_FETCH_SESSION_EPOCH" + description = "The fetch session epoch is invalid" class ListenerNotFound(BrokerResponseError): errno = 72 - message = 'LISTENER_NOT_FOUND' + message = "LISTENER_NOT_FOUND" description = ( - 'There is no listener on the leader broker that matches the' - ' listener on which metadata request was processed' + "There is no listener on the leader broker that matches the" + " listener on which metadata request was processed" ) + + class MemberIdRequired(BrokerResponseError): errno = 79 - message = 'MEMBER_ID_REQUIRED' + message = "MEMBER_ID_REQUIRED" description = ( - 'Consumer needs to have a valid member ' - 'id before actually entering group' + "Consumer needs to have a valid member " "id before actually entering group" ) + def _iter_broker_errors(): for name, obj in inspect.getmembers(sys.modules[__name__]): - if inspect.isclass(obj) and issubclass(obj, BrokerResponseError) and \ - obj != BrokerResponseError: + if ( + inspect.isclass(obj) + and issubclass(obj, BrokerResponseError) + and obj != BrokerResponseError + ): yield obj diff --git a/aiokafka/protocol/group.py b/aiokafka/protocol/group.py index e2a03d5b..0e5dd749 100644 --- a/aiokafka/protocol/group.py +++ b/aiokafka/protocol/group.py @@ -5,47 +5,58 @@ from kafka.protocol.types import Array, Bytes, Int16, Int32, Schema, String from kafka.protocol.group import * + class JoinGroupResponse_v5(Response): API_KEY = 11 API_VERSION = 5 SCHEMA = Schema( - ('throttle_time_ms', Int32), - ('error_code', Int16), - ('generation_id', Int32), - ('group_protocol', String('utf-8')), - ('leader_id', String('utf-8')), - ('member_id', String('utf-8')), - ('members', Array( - ('member_id', String('utf-8')), - ('group_instance_id', String('utf-8')), - ('member_metadata', Bytes))) + ("throttle_time_ms", Int32), + ("error_code", Int16), + ("generation_id", Int32), + ("group_protocol", String("utf-8")), + ("leader_id", String("utf-8")), + ("member_id", String("utf-8")), + ( + "members", + Array( + ("member_id", String("utf-8")), + ("group_instance_id", String("utf-8")), + ("member_metadata", Bytes), + ), + ), ) - class JoinGroupRequest_v5(Request): API_KEY = 11 API_VERSION = 5 RESPONSE_TYPE = JoinGroupResponse_v5 SCHEMA = Schema( - ('group', String('utf-8')), - ('session_timeout', Int32), - ('rebalance_timeout', Int32), - ('member_id', String('utf-8')), - ('group_instance_id', String('utf-8')), - ('protocol_type', String('utf-8')), - ('group_protocols', Array( - 
('protocol_name', String('utf-8')), - ('protocol_metadata', Bytes))) + ("group", String("utf-8")), + ("session_timeout", Int32), + ("rebalance_timeout", Int32), + ("member_id", String("utf-8")), + ("group_instance_id", String("utf-8")), + ("protocol_type", String("utf-8")), + ( + "group_protocols", + Array(("protocol_name", String("utf-8")), ("protocol_metadata", Bytes)), + ), ) - UNKNOWN_MEMBER_ID = '' + UNKNOWN_MEMBER_ID = "" JoinGroupRequest = [ - JoinGroupRequest_v0, JoinGroupRequest_v1, JoinGroupRequest_v2, JoinGroupRequest_v5 + JoinGroupRequest_v0, + JoinGroupRequest_v1, + JoinGroupRequest_v2, + JoinGroupRequest_v5, ] JoinGroupResponse = [ - JoinGroupResponse_v0, JoinGroupResponse_v1, JoinGroupRequest_v2, JoinGroupResponse_v5 + JoinGroupResponse_v0, + JoinGroupResponse_v1, + JoinGroupRequest_v2, + JoinGroupResponse_v5, ] @@ -60,16 +71,16 @@ class SyncGroupRequest_v3(Request): API_VERSION = 3 RESPONSE_TYPE = SyncGroupResponse_v3 SCHEMA = Schema( - ('group', String('utf-8')), - ('generation_id', Int32), - ('member_id', String('utf-8')), - ('group_instance_id', String('utf-8')), - ('group_assignment', Array( - ('member_id', String('utf-8')), - ('member_metadata', Bytes))) + ("group", String("utf-8")), + ("generation_id", Int32), + ("member_id", String("utf-8")), + ("group_instance_id", String("utf-8")), + ( + "group_assignment", + Array(("member_id", String("utf-8")), ("member_metadata", Bytes)), + ), ) - SyncGroupRequest = [SyncGroupRequest_v0, SyncGroupRequest_v1, SyncGroupRequest_v3] -SyncGroupResponse = [SyncGroupResponse_v0, SyncGroupResponse_v1, SyncGroupResponse_v3] \ No newline at end of file +SyncGroupResponse = [SyncGroupResponse_v0, SyncGroupResponse_v1, SyncGroupResponse_v3] From 0074dd1bfce3b7955d72835818571ecdd301030f Mon Sep 17 00:00:00 2001 From: Vikram Patki 24489 Date: Wed, 25 Nov 2020 11:40:15 -0500 Subject: [PATCH 03/14] fixing linting errors --- aiokafka/consumer/group_coordinator.py | 610 +++++++++++++++---------- 1 file changed, 358 insertions(+), 252 deletions(-) diff --git a/aiokafka/consumer/group_coordinator.py b/aiokafka/consumer/group_coordinator.py index c5a80dd2..b65b9c53 100644 --- a/aiokafka/consumer/group_coordinator.py +++ b/aiokafka/consumer/group_coordinator.py @@ -8,9 +8,14 @@ from kafka.coordinator.protocol import ConsumerProtocol from kafka.protocol.commit import ( OffsetCommitRequest_v2 as OffsetCommitRequest, - OffsetFetchRequest_v1 as OffsetFetchRequest) + OffsetFetchRequest_v1 as OffsetFetchRequest, +) from aiokafka.protocol.group import ( - HeartbeatRequest, JoinGroupRequest, LeaveGroupRequest, SyncGroupRequest) + HeartbeatRequest, + JoinGroupRequest, + LeaveGroupRequest, + SyncGroupRequest, +) import aiokafka.errors as Errors from aiokafka.structs import OffsetAndMetadata, TopicPartition @@ -24,10 +29,8 @@ UNKNOWN_OFFSET = -1 -class BaseCoordinator: - - def __init__(self, client, subscription, *, - exclude_internal_topics=True): +class BaseCoordinator(object): + def __init__(self, client, subscription, *, exclude_internal_topics=True): self._client = client self._exclude_internal_topics = exclude_internal_topics self._subscription = subscription @@ -47,16 +50,23 @@ def _handle_metadata_update(self, cluster): if subscription.subscribed_pattern.match(topic): topics.append(topic) - if subscription.subscription is None or \ - set(topics) != subscription.subscription.topics: + if ( + subscription.subscription is None + or set(topics) != subscription.subscription.topics + ): subscription.subscribe_from_pattern(topics) - if 
subscription.partitions_auto_assigned() and \ - self._group_subscription is not None: + if ( + subscription.partitions_auto_assigned() + and self._group_subscription is not None + ): metadata_snapshot = self._get_metadata_snapshot() if self._metadata_snapshot != metadata_snapshot: - log.info("Metadata for topic has changed from %s to %s. ", - self._metadata_snapshot, metadata_snapshot) + log.info( + "Metadata for topic has changed from %s to %s. ", + self._metadata_snapshot, + metadata_snapshot, + ) self._metadata_snapshot = metadata_snapshot self._on_metadata_change() @@ -87,8 +97,7 @@ class NoGroupCoordinator(BaseCoordinator): def __init__(self, *args, **kw): super().__init__(*args, **kw) # Reset all committed points, as the GroupCoordinator would - self._reset_committed_task = create_task( - self._reset_committed_routine()) + self._reset_committed_task = create_task(self._reset_committed_routine()) def _on_metadata_change(self): self.assign_all_partitions() @@ -140,14 +149,14 @@ async def _reset_committed_routine(self): for tp in assignment.requesting_committed(): tp_state = assignment.state_value(tp) - tp_state.update_committed( - OffsetAndMetadata(UNKNOWN_OFFSET, "")) + tp_state.update_committed(OffsetAndMetadata(UNKNOWN_OFFSET, "")) event_waiter = create_task(commit_refresh_needed.wait()) await asyncio.wait( [assignment.unassign_future, event_waiter], - return_when=asyncio.FIRST_COMPLETED) + return_when=asyncio.FIRST_COMPLETED, + ) if not event_waiter.done(): event_waiter.cancel() @@ -210,17 +219,23 @@ class GroupCoordinator(BaseCoordinator): org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java """ - def __init__(self, client, subscription, *, - group_id='aiokafka-default-group', - group_instance_id=None, - session_timeout_ms=10000, heartbeat_interval_ms=3000, - retry_backoff_ms=100, - enable_auto_commit=True, auto_commit_interval_ms=5000, - assignors=(RoundRobinPartitionAssignor,), - exclude_internal_topics=True, - max_poll_interval_ms=300000, - rebalance_timeout_ms=30000 - ): + def __init__( + self, + client, + subscription, + *, + group_id="aiokafka-default-group", + group_instance_id=None, + session_timeout_ms=10000, + heartbeat_interval_ms=3000, + retry_backoff_ms=100, + enable_auto_commit=True, + auto_commit_interval_ms=5000, + assignors=(RoundRobinPartitionAssignor,), + exclude_internal_topics=True, + max_poll_interval_ms=300000, + rebalance_timeout_ms=30000 + ): """Initialize the coordination manager. Parameters (see AIOKafkaConsumer) @@ -229,8 +244,8 @@ def __init__(self, client, subscription, *, self._group_subscription = None super().__init__( - client, subscription, - exclude_internal_topics=exclude_internal_topics) + client, subscription, exclude_internal_topics=exclude_internal_topics + ) self._session_timeout_ms = session_timeout_ms self._heartbeat_interval_ms = heartbeat_interval_ms @@ -269,8 +284,9 @@ def __init__(self, client, subscription, *, # changed from user code by calling ``commit()``. 
self._commit_lock = asyncio.Lock() - self._next_autocommit_deadline = \ + self._next_autocommit_deadline = ( time.monotonic() + auto_commit_interval_ms / 1000 + ) # Will be set on close self._closing = create_future() @@ -287,11 +303,15 @@ async def _send_req(self, request): raise Errors.GroupCoordinatorNotAvailableError() try: resp = await self._client.send( - node_id, request, group=ConnectionGroup.COORDINATION) + node_id, request, group=ConnectionGroup.COORDINATION + ) except Errors.KafkaError as err: log.error( - 'Error sending %s to node %s [%s] -- marking coordinator dead', - request.__class__.__name__, node_id, err) + "Error sending %s to node %s [%s] -- marking coordinator dead", + request.__class__.__name__, + node_id, + err, + ) self.coordinator_dead() raise err return resp @@ -387,25 +407,27 @@ async def _on_join_prepare(self, previous_assignment): revoked = set() # execute the user's callback before rebalance - log.info("Revoking previously assigned partitions %s for group %s", - revoked, self.group_id) + log.info( + "Revoking previously assigned partitions %s for group %s", + revoked, + self.group_id, + ) if self._subscription.listener: try: - res = self._subscription.listener.on_partitions_revoked( - revoked) + res = self._subscription.listener.on_partitions_revoked(revoked) if asyncio.iscoroutine(res): await res except Exception: - log.exception("User provided subscription listener %s" - " for group %s failed on_partitions_revoked", - self._subscription.listener, self.group_id) - - async def _perform_assignment( - self, leader_id, assignment_strategy, members - ): + log.exception( + "User provided subscription listener %s" + " for group %s failed on_partitions_revoked", + self._subscription.listener, + self.group_id, + ) + + async def _perform_assignment(self, leader_id, assignment_strategy, members): assignor = self._lookup_assignor(assignment_strategy) - assert assignor, \ - 'Invalid assignment protocol: %s' % assignment_strategy + assert assignor, "Invalid assignment protocol: %s" % assignment_strategy member_metadata = {} all_subscribed_topics = set() group_instance_id_mapping = {} @@ -425,18 +447,22 @@ async def _perform_assignment( # call) we should wait for it before performing assignment await self._client._maybe_wait_metadata() - log.debug("Performing assignment for group %s using strategy %s" - " with subscriptions %s", self.group_id, assignor.name, - member_metadata) + log.debug( + "Performing assignment for group %s using strategy %s" + " with subscriptions %s", + self.group_id, + assignor.name, + member_metadata, + ) assignments = None - if isinstance(assignor, AbstractStaticPartitionAssignor) : - assignments = assignor.assign(self._cluster, member_metadata, - group_instance_id_mapping) - else : + if isinstance(assignor, AbstractStaticPartitionAssignor): + assignments = assignor.assign( + self._cluster, member_metadata, group_instance_id_mapping + ) + else: assignments = assignor.assign(self._cluster, member_metadata) - log.debug("Finished assignment for group %s: %s", - self.group_id, assignments) + log.debug("Finished assignment for group %s: %s", self.group_id, assignments) # `set_topics()` will not trigger a metadata update if we only # removed some topics (client has all needed metadata already), @@ -449,14 +475,12 @@ async def _perform_assignment( return group_assignment async def _on_join_complete( - self, generation, member_id, protocol, - member_assignment_bytes + self, generation, member_id, protocol, member_assignment_bytes ): assignor = 
self._lookup_assignor(protocol) - assert assignor, 'invalid assignment protocol: %s' % protocol + assert assignor, "invalid assignment protocol: %s" % protocol - assignment = ConsumerProtocol.ASSIGNMENT.decode( - member_assignment_bytes) + assignment = ConsumerProtocol.ASSIGNMENT.decode(member_assignment_bytes) # update partition assignment self._subscription.assign_from_subscribed(assignment.partitions()) @@ -474,21 +498,24 @@ async def _on_join_complete( self.start_commit_offsets_refresh_task(subscription.assignment) assigned = set(self._subscription.assigned_partitions()) - log.info("Setting newly assigned partitions %s for group %s", - assigned, self.group_id) + log.info( + "Setting newly assigned partitions %s for group %s", assigned, self.group_id + ) # execute the user's callback after rebalance if self._subscription.listener: try: - res = self._subscription.listener.on_partitions_assigned( - assigned) + res = self._subscription.listener.on_partitions_assigned(assigned) if asyncio.iscoroutine(res): await res except Exception: - log.exception("User provided listener %s for group %s" - " failed on partition assignment: %s", - self._subscription.listener, self.group_id, - assigned) + log.exception( + "User provided listener %s for group %s" + " failed on partition assignment: %s", + self._subscription.listener, + self.group_id, + assigned, + ) def coordinator_dead(self): """ Mark the current coordinator as dead. @@ -498,7 +525,9 @@ def coordinator_dead(self): if self.coordinator_id is not None: log.warning( "Marking the coordinator dead (node %s)for group %s.", - self.coordinator_id, self.group_id) + self.coordinator_id, + self.group_id, + ) self.coordinator_id = None self._coordinator_dead_fut.set_result(None) @@ -520,9 +549,7 @@ def need_rejoin(self, subscription): Returns: bool: True if consumer should rejoin group, False otherwise """ - return ( - subscription.assignment is None or self._rejoin_needed_fut.done() - ) + return subscription.assignment is None or self._rejoin_needed_fut.done() async def ensure_coordinator_known(self): """ Block until the coordinator for this group is known. @@ -534,9 +561,8 @@ async def ensure_coordinator_known(self): retry_backoff = self._retry_backoff_ms / 1000 while self.coordinator_id is None and not self._closing.done(): try: - coordinator_id = ( - await self._client.coordinator_lookup( - CoordinationType.GROUP, self.group_id) + coordinator_id = await self._client.coordinator_lookup( + CoordinationType.GROUP, self.group_id ) except Errors.GroupAuthorizationFailedError: err = Errors.GroupAuthorizationFailedError(self.group_id) @@ -553,15 +579,19 @@ async def ensure_coordinator_known(self): # Try to connect to confirm that the connection can be # established. 
ready = await self._client.ready( - coordinator_id, group=ConnectionGroup.COORDINATION) + coordinator_id, group=ConnectionGroup.COORDINATION + ) if not ready: await asyncio.sleep(retry_backoff) continue self.coordinator_id = coordinator_id self._coordinator_dead_fut = create_future() - log.info("Discovered coordinator %s for group %s", - self.coordinator_id, self.group_id) + log.info( + "Discovered coordinator %s for group %s", + self.coordinator_id, + self.group_id, + ) async def _coordination_routine(self): try: @@ -569,8 +599,7 @@ async def _coordination_routine(self): except asyncio.CancelledError: # pragma: no cover raise except Exception as exc: - log.error( - "Unexpected error in coordinator routine", exc_info=True) + log.error("Unexpected error in coordinator routine", exc_info=True) kafka_exc = Errors.KafkaError( f"Unexpected error during coordination {exc!r}") self._subscription.abort_waiters(kafka_exc) @@ -595,9 +624,9 @@ async def __coordination_routine(self): subscription = self._subscription.subscription if subscription is None: await asyncio.wait( - [self._subscription.wait_for_subscription(), - self._closing], - return_when=asyncio.FIRST_COMPLETED) + [self._subscription.wait_for_subscription(), self._closing], + return_when=asyncio.FIRST_COMPLETED, + ) if self._closing.done(): break subscription = self._subscription.subscription @@ -609,7 +638,8 @@ async def __coordination_routine(self): await self.ensure_coordinator_known() if auto_assigned and self.need_rejoin(subscription): new_assignment = await self.ensure_active_group( - subscription, assignment) + subscription, assignment + ) if new_assignment is None or not new_assignment.active: continue else: @@ -632,7 +662,8 @@ async def __coordination_routine(self): futures = [ self._closing, # Will exit fast if close() called self._coordinator_dead_fut, - subscription.unsubscribe_future] + subscription.unsubscribe_future, + ] # In case of manual assignment this future will be always set and # we don't want a heavy loop here. # NOTE: metadata changes are for partition count and pattern @@ -649,8 +680,8 @@ async def __coordination_routine(self): futures.append(self._commit_refresh_task) done, _ = await asyncio.wait( - futures, timeout=wait_timeout, - return_when=asyncio.FIRST_COMPLETED) + futures, timeout=wait_timeout, return_when=asyncio.FIRST_COMPLETED + ) # Handle exceptions in other background tasks for task in [self._heartbeat_task, self._commit_refresh_task]: @@ -756,17 +787,14 @@ async def _heartbeat_routine(self): # the session timeout has expired without seeing a successful # heartbeat, so we should probably make sure the coordinator # is still healthy. 
- log.error( - "Heartbeat session expired - marking coordinator dead") + log.error("Heartbeat session expired - marking coordinator dead") self.coordinator_dead() # If consumer is idle (no records consumed) for too long we need # to leave the group idle_time = self._subscription.fetcher_idle_time if idle_time < self._max_poll_interval: - sleep_time = min( - sleep_time, - self._max_poll_interval - idle_time) + sleep_time = min(sleep_time, self._max_poll_interval - idle_time) else: await self._maybe_leave_group() @@ -775,9 +803,11 @@ async def _heartbeat_routine(self): async def _do_heartbeat(self): version = 0 if self._client.api_version < (0, 11, 0) else 1 request = HeartbeatRequest[version]( - self.group_id, self.generation, self.member_id) - log.debug("Heartbeat: %s[%s] %s", - self.group_id, self.generation, self.member_id) + self.group_id, self.generation, self.member_id + ) + log.debug( + "Heartbeat: %s[%s] %s", self.group_id, self.generation, self.member_id + ) # _send_req may fail with error like `RequestTimedOutError` # we need to catch it so coordinator_routine won't fail @@ -789,20 +819,24 @@ async def _do_heartbeat(self): error_type = Errors.for_code(resp.error_code) if error_type is Errors.NoError: log.debug( - "Received successful heartbeat response for group %s", - self.group_id) + "Received successful heartbeat response for group %s", self.group_id + ) return True - if error_type in (Errors.GroupCoordinatorNotAvailableError, - Errors.NotCoordinatorForGroupError): + if error_type in ( + Errors.GroupCoordinatorNotAvailableError, + Errors.NotCoordinatorForGroupError, + ): log.warning( "Heartbeat failed for group %s: coordinator (node %s)" " is either not started or not valid", - self.group_id, self.coordinator_id) + self.group_id, + self.coordinator_id, + ) self.coordinator_dead() elif error_type is Errors.RebalanceInProgressError: log.warning( - "Heartbeat failed for group %s because it is rebalancing", - self.group_id) + "Heartbeat failed for group %s because it is rebalancing", self.group_id + ) self.request_rejoin() # it is valid to continue heartbeating while the group is # rebalancing. 
This ensures that the coordinator keeps the @@ -813,13 +847,15 @@ async def _do_heartbeat(self): return True elif error_type is Errors.IllegalGenerationError: log.warning( - "Heartbeat failed for group %s: generation id is not " - " current.", self.group_id) + "Heartbeat failed for group %s: generation id is not " " current.", + self.group_id, + ) self.reset_generation() elif error_type is Errors.UnknownMemberIdError: log.warning( "Heartbeat failed: local member_id was not recognized;" - " resetting and re-joining group") + " resetting and re-joining group" + ) self.reset_generation() elif error_type is Errors.GroupAuthorizationFailedError: raise error_type(self.group_id) @@ -835,7 +871,8 @@ def start_commit_offsets_refresh_task(self, assignment): if self._commit_refresh_task is not None: self._commit_refresh_task.cancel() self._commit_refresh_task = create_task( - self._commit_refresh_routine(assignment)) + self._commit_refresh_routine(assignment) + ) async def _stop_commit_offsets_refresh_task(self): # The previous task should end after assignment changed @@ -855,22 +892,19 @@ async def _commit_refresh_routine(self, assignment): try: while assignment.active: commit_refresh_needed.clear() - success = await self._maybe_refresh_commit_offsets( - assignment) + success = await self._maybe_refresh_commit_offsets(assignment) wait_futures = [assignment.unassign_future] if not success: timeout = retry_backoff_ms else: timeout = None - event_waiter = create_task( - commit_refresh_needed.wait()) + event_waiter = create_task(commit_refresh_needed.wait()) wait_futures.append(event_waiter) await asyncio.wait( - wait_futures, - timeout=timeout, - return_when=asyncio.FIRST_COMPLETED) + wait_futures, timeout=timeout, return_when=asyncio.FIRST_COMPLETED + ) except asyncio.CancelledError: pass except Exception: @@ -886,27 +920,33 @@ async def _commit_refresh_routine(self, assignment): async def _do_rejoin_group(self, subscription): rebalance = CoordinatorGroupRebalance( - self, self.group_id, self.coordinator_id, - subscription, self._assignors, self._session_timeout_ms, - self._retry_backoff_ms) + self, + self.group_id, + self.coordinator_id, + subscription, + self._assignors, + self._session_timeout_ms, + self._retry_backoff_ms, + ) assignment = await rebalance.perform_group_join() if not subscription.active: - log.debug("Subscription changed during rebalance from %s to %s. " - "Rejoining group.", - subscription.topics, - self._subscription.topics) + log.debug( + "Subscription changed during rebalance from %s to %s. " + "Rejoining group.", + subscription.topics, + self._subscription.topics, + ) return False if assignment is None: # wait backoff and try again - await asyncio.sleep( - self._retry_backoff_ms / 1000) + await asyncio.sleep(self._retry_backoff_ms / 1000) return False protocol, member_assignment_bytes = assignment await self._on_join_complete( - self.generation, self.member_id, - protocol, member_assignment_bytes) + self.generation, self.member_id, protocol, member_assignment_bytes + ) return True async def _maybe_do_autocommit(self, assignment): @@ -919,13 +959,13 @@ async def _maybe_do_autocommit(self, assignment): try: async with self._commit_lock: await self._do_commit_offsets( - assignment, assignment.all_consumed_offsets()) + assignment, assignment.all_consumed_offsets() + ) except Errors.KafkaError as error: log.warning("Auto offset commit failed: %s", error) if self._is_commit_retriable(error): # Retry after backoff. 
- self._next_autocommit_deadline = \ - time.monotonic() + backoff + self._next_autocommit_deadline = time.monotonic() + backoff return backoff else: raise @@ -939,17 +979,19 @@ def _is_commit_retriable(self, error): # Java client raises CommitFailedError which is retriable and thus # masks those 3. We raise error that we got explicitly, so treat them # as retriable. - return error.retriable or isinstance(error, ( - Errors.UnknownMemberIdError, - Errors.IllegalGenerationError, - Errors.RebalanceInProgressError - )) + return error.retriable or isinstance( + error, + ( + Errors.UnknownMemberIdError, + Errors.IllegalGenerationError, + Errors.RebalanceInProgressError, + ), + ) async def _maybe_do_last_autocommit(self, assignment): if not self._enable_auto_commit: return - await self.commit_offsets( - assignment, assignment.all_consumed_offsets()) + await self.commit_offsets(assignment, assignment.all_consumed_offsets()) async def commit_offsets(self, assignment, offsets): """Commit specific offsets @@ -963,22 +1005,23 @@ async def commit_offsets(self, assignment, offsets): await self.ensure_coordinator_known() try: async with self._commit_lock: - await asyncio.shield( - self._do_commit_offsets(assignment, offsets)) - except (Errors.UnknownMemberIdError, - Errors.IllegalGenerationError, - Errors.RebalanceInProgressError): + await asyncio.shield(self._do_commit_offsets(assignment, offsets)) + except ( + Errors.UnknownMemberIdError, + Errors.IllegalGenerationError, + Errors.RebalanceInProgressError, + ): raise Errors.CommitFailedError( "Commit cannot be completed since the group has already " "rebalanced and may have assigned the partitions " - "to another member") + "to another member" + ) except Errors.KafkaError as err: if not err.retriable: raise err else: # wait backoff and try again - await asyncio.sleep( - self._retry_backoff_ms / 1000) + await asyncio.sleep(self._retry_backoff_ms / 1000) else: break @@ -990,21 +1033,22 @@ async def _do_commit_offsets(self, assignment, offsets): # create the offset commit request offset_data = collections.defaultdict(list) for tp, offset in offsets.items(): - offset_data[tp.topic].append( - (tp.partition, - offset.offset, - offset.metadata)) + offset_data[tp.topic].append((tp.partition, offset.offset, offset.metadata)) request = OffsetCommitRequest( self.group_id, self.generation, self.member_id, OffsetCommitRequest.DEFAULT_RETENTION_TIME, - [(topic, tp_offsets) for topic, tp_offsets in offset_data.items()] + [(topic, tp_offsets) for topic, tp_offsets in offset_data.items()], ) - log.debug("Sending offset-commit request with %s for group %s to %s", - offsets, self.group_id, self.coordinator_id) + log.debug( + "Sending offset-commit request with %s for group %s to %s", + offsets, + self.group_id, + self.coordinator_id, + ) response = await self._send_req(request) @@ -1016,46 +1060,65 @@ async def _do_commit_offsets(self, assignment, offsets): error_type = Errors.for_code(error_code) offset = offsets[tp] if error_type is Errors.NoError: - log.debug( - "Committed offset %s for partition %s", offset, tp) + log.debug("Committed offset %s for partition %s", offset, tp) elif error_type is Errors.GroupAuthorizationFailedError: - log.error("OffsetCommit failed for group %s - %s", - self.group_id, error_type.__name__) + log.error( + "OffsetCommit failed for group %s - %s", + self.group_id, + error_type.__name__, + ) errored[tp] = error_type(self.group_id) elif error_type is Errors.TopicAuthorizationFailedError: unauthorized_topics.add(topic) - elif error_type in 
(Errors.OffsetMetadataTooLargeError, - Errors.InvalidCommitOffsetSizeError): + elif error_type in ( + Errors.OffsetMetadataTooLargeError, + Errors.InvalidCommitOffsetSizeError, + ): # raise the error to the user log.info( "OffsetCommit failed for group %s on partition %s" - " due to %s, will retry", self.group_id, tp, - error_type.__name__) + " due to %s, will retry", + self.group_id, + tp, + error_type.__name__, + ) errored[tp] = error_type() elif error_type is Errors.GroupLoadInProgressError: # just retry log.info( "OffsetCommit failed for group %s because group is" - " initializing (%s), will retry", self.group_id, - error_type.__name__) + " initializing (%s), will retry", + self.group_id, + error_type.__name__, + ) errored[tp] = error_type() - elif error_type in (Errors.GroupCoordinatorNotAvailableError, - Errors.NotCoordinatorForGroupError, - Errors.RequestTimedOutError): + elif error_type in ( + Errors.GroupCoordinatorNotAvailableError, + Errors.NotCoordinatorForGroupError, + Errors.RequestTimedOutError, + ): log.info( "OffsetCommit failed for group %s due to a" " coordinator error (%s), will find new coordinator" - " and retry", self.group_id, error_type.__name__) + " and retry", + self.group_id, + error_type.__name__, + ) self.coordinator_dead() errored[tp] = error_type() - elif error_type in (Errors.UnknownMemberIdError, - Errors.IllegalGenerationError, - Errors.RebalanceInProgressError): + elif error_type in ( + Errors.UnknownMemberIdError, + Errors.IllegalGenerationError, + Errors.RebalanceInProgressError, + ): # need to re-join group error = error_type(self.group_id) log.error( "OffsetCommit failed for group %s due to group" - " error (%s), will rejoin", self.group_id, error) + " error (%s), will rejoin", + self.group_id, + error, + ) if error_type is Errors.RebalanceInProgressError: self.request_rejoin() else: @@ -1064,30 +1127,37 @@ async def _do_commit_offsets(self, assignment, offsets): error = error_type(self.group_id) log.error( "OffsetCommit failed for group %s due to group" - " error (%s), will rejoin", self.group_id, error) + " error (%s), will rejoin", + self.group_id, + error, + ) errored[tp] = error else: log.error( "OffsetCommit failed for group %s on partition %s" - " with offset %s: %s", self.group_id, tp, offset, - error_type.__name__) + " with offset %s: %s", + self.group_id, + tp, + offset, + error_type.__name__, + ) errored[tp] = error_type() if errored: first_error = list(errored.values())[0] raise first_error if unauthorized_topics: - log.error("OffsetCommit failed for unauthorized topics %s", - unauthorized_topics) + log.error( + "OffsetCommit failed for unauthorized topics %s", unauthorized_topics + ) raise Errors.TopicAuthorizationFailedError(unauthorized_topics) async def _maybe_refresh_commit_offsets(self, assignment): need_update = assignment.requesting_committed() if need_update: try: - offsets = await self._do_fetch_commit_offsets( - need_update) + offsets = await self._do_fetch_commit_offsets(need_update) except Errors.KafkaError as err: if not err.retriable: raise @@ -1099,8 +1169,7 @@ async def _maybe_refresh_commit_offsets(self, assignment): if tp in offsets: tp_state.update_committed(offsets[tp]) else: - tp_state.update_committed( - OffsetAndMetadata(UNKNOWN_OFFSET, "")) + tp_state.update_committed(OffsetAndMetadata(UNKNOWN_OFFSET, "")) return True async def fetch_committed_offsets(self, partitions): @@ -1124,8 +1193,7 @@ async def fetch_committed_offsets(self, partitions): raise err else: # wait backoff and try again - await asyncio.sleep( - 
self._retry_backoff_ms / 1000) + await asyncio.sleep(self._retry_backoff_ms / 1000) else: return offsets @@ -1136,10 +1204,7 @@ async def _do_fetch_commit_offsets(self, partitions): for tp in partitions: topic_partitions[tp.topic].append(tp.partition) - request = OffsetFetchRequest( - self.group_id, - list(topic_partitions.items()) - ) + request = OffsetFetchRequest(self.group_id, list(topic_partitions.items())) response = await self._send_req(request) offsets = {} for topic, partitions in response.topics: @@ -1157,14 +1222,14 @@ async def _do_fetch_commit_offsets(self, partitions): self.coordinator_dead() raise error elif error_type is Errors.UnknownTopicOrPartitionError: - log.warning( - "OffsetFetchRequest -- unknown topic %s", topic) + log.warning("OffsetFetchRequest -- unknown topic %s", topic) continue elif error_type is Errors.GroupAuthorizationFailedError: raise error_type(self.group_id) else: - log.error("Unknown error fetching offsets for %s: %s", - tp, error) + log.error( + "Unknown error fetching offsets for %s: %s", tp, error + ) raise Errors.KafkaError(repr(error)) # record the position with the offset # (-1 indicates no committed offset to fetch) @@ -1184,8 +1249,16 @@ class CoordinatorGroupRebalance: display/KAFKA/Kafka+Client-side+Assignment+Proposal """ - def __init__(self, coordinator, group_id, coordinator_id, subscription, - assignors, session_timeout_ms, retry_backoff_ms): + def __init__( + self, + coordinator, + group_id, + coordinator_id, + subscription, + assignors, + session_timeout_ms, + retry_backoff_ms, + ): self._coordinator = coordinator self.group_id = group_id self.coordinator_id = coordinator_id @@ -1218,33 +1291,36 @@ async def perform_group_join(self): metadata_list.append(group_protocol) # for KIP-394 we may have to send a second join request try_join = True - while (try_join) : + while try_join: try_join = False - if self._api_version < (0, 10, 1) : + if self._api_version < (0, 10, 1): request = JoinGroupRequest[0]( self.group_id, self._session_timeout_ms, self._coordinator.member_id, ConsumerProtocol.PROTOCOL_TYPE, - metadata_list) - elif self._api_version < (0, 11, 0) : + metadata_list, + ) + elif self._api_version < (0, 11, 0): request = JoinGroupRequest[1]( self.group_id, self._session_timeout_ms, self._rebalance_timeout_ms, self._coordinator.member_id, ConsumerProtocol.PROTOCOL_TYPE, - metadata_list) - elif self._api_version < (2, 3, 0) : + metadata_list, + ) + elif self._api_version < (2, 3, 0): request = JoinGroupRequest[2]( self.group_id, self._session_timeout_ms, self._rebalance_timeout_ms, self._coordinator.member_id, ConsumerProtocol.PROTOCOL_TYPE, - metadata_list) - else : + metadata_list, + ) + else: request = JoinGroupRequest[3]( self.group_id, self._session_timeout_ms, @@ -1252,24 +1328,28 @@ async def perform_group_join(self): self._coordinator.member_id, self._coordinator._group_instance_id, ConsumerProtocol.PROTOCOL_TYPE, - metadata_list) + metadata_list, + ) # create the request for the coordinator - log.debug("Sending JoinGroup (%s) to coordinator %s", - request, self.coordinator_id) - try : + log.debug( + "Sending JoinGroup (%s) to coordinator %s", + request, + self.coordinator_id, + ) + try: response = await T(self._coordinator._send_req)(request) - except Errors.KafkaError : + except Errors.KafkaError: # Return right away. It's a connection error, so backoff will be # handled by coordinator lookup return None - if not self._subscription.active : + if not self._subscription.active: # Subscription changed. 
Ignore response and restart group join return None error_type = Errors.for_code(response.error_code) - if error_type is Errors.MemberIdRequired : + if error_type is Errors.MemberIdRequired: self._coordinator.member_id = response.member_id try_join = True @@ -1278,12 +1358,19 @@ async def perform_group_join(self): self._coordinator.member_id = response.member_id self._coordinator.generation = response.generation_id protocol = response.group_protocol - log.info("Joined group '%s' (generation %s) with member_id %s", - self.group_id, response.generation_id, response.member_id) + log.info( + "Joined group '%s' (generation %s) with member_id %s", + self.group_id, + response.generation_id, + response.member_id, + ) if response.leader_id == response.member_id: - log.info("Elected group leader -- performing partition" - " assignments using %s", protocol) + log.info( + "Elected group leader -- performing partition" + " assignments using %s", + protocol, + ) assignment_bytes = await self._on_join_leader(response) else: assignment_bytes = await self._on_join_follower() @@ -1293,31 +1380,40 @@ async def perform_group_join(self): return (protocol, assignment_bytes) elif error_type is Errors.GroupLoadInProgressError: # Backoff and retry - log.debug("Attempt to join group %s rejected since coordinator %s" - " is loading the group.", self.group_id, - self.coordinator_id) - await asyncio.sleep( - self._retry_backoff_ms / 1000) + log.debug( + "Attempt to join group %s rejected since coordinator %s" + " is loading the group.", + self.group_id, + self.coordinator_id, + ) + await asyncio.sleep(self._retry_backoff_ms / 1000) elif error_type is Errors.UnknownMemberIdError: # reset the member id and retry immediately self._coordinator.reset_generation() log.debug( "Attempt to join group %s failed due to unknown member id", - self.group_id) - elif error_type in (Errors.GroupCoordinatorNotAvailableError, - Errors.NotCoordinatorForGroupError): + self.group_id, + ) + elif error_type in ( + Errors.GroupCoordinatorNotAvailableError, + Errors.NotCoordinatorForGroupError, + ): # Coordinator changed we should be able to find it immediately err = error_type() self._coordinator.coordinator_dead() - log.debug("Attempt to join group %s failed due to obsolete " - "coordinator information: %s", self.group_id, - err) - elif error_type in (Errors.InconsistentGroupProtocolError, - Errors.InvalidSessionTimeoutError, - Errors.InvalidGroupIdError): + log.debug( + "Attempt to join group %s failed due to obsolete " + "coordinator information: %s", + self.group_id, + err, + ) + elif error_type in ( + Errors.InconsistentGroupProtocolError, + Errors.InvalidSessionTimeoutError, + Errors.InvalidGroupIdError, + ): err = error_type() - log.error( - "Attempt to join group failed due to fatal error: %s", err) + log.error("Attempt to join group failed due to fatal error: %s", err) raise err elif error_type is Errors.GroupAuthorizationFailedError: raise error_type(self.group_id) @@ -1325,30 +1421,37 @@ async def perform_group_join(self): err = error_type() log.error( "Unexpected error in join group '%s' response: %s", - self.group_id, error_type) + self.group_id, + error_type, + ) raise Errors.KafkaError(repr(err)) return None async def _on_join_follower(self): # send follower's sync group with an empty assignment - if self._api_version < (2, 3, 0) : + if self._api_version < (2, 3, 0): version = 0 if self._api_version < (0, 11, 0) else 1 request = SyncGroupRequest[version]( self.group_id, self._coordinator.generation, self._coordinator.member_id, 
- []) - else : + [], + ) + else: request = SyncGroupRequest[2]( self.group_id, self._coordinator.generation, self._coordinator.member_id, self._coordinator._group_instance_id, - []) + [], + ) log.debug( "Sending follower SyncGroup for group %s to coordinator %s: %s", - self.group_id, self.coordinator_id, request) - return (await self._send_sync_group_request(request)) + self.group_id, + self.coordinator_id, + request, + ) + return await self._send_sync_group_request(request) async def _on_join_leader(self, response): """ @@ -1362,11 +1465,9 @@ async def _on_join_leader(self, response): Future: resolves to member assignment encoded-bytes """ try: - group_assignment = \ - await self._coordinator._perform_assignment( - response.leader_id, - response.group_protocol, - response.members) + group_assignment = await self._coordinator._perform_assignment( + response.leader_id, response.group_protocol, response.members + ) except Exception as e: raise Errors.KafkaError(repr(e)) @@ -1376,25 +1477,30 @@ async def _on_join_leader(self, response): assignment = assignment.encode() assignment_req.append((member_id, assignment)) - if self._api_version < (2, 3, 0) : + if self._api_version < (2, 3, 0): version = 0 if self._api_version < (0, 11, 0) else 1 request = SyncGroupRequest[version]( self.group_id, self._coordinator.generation, self._coordinator.member_id, - assignment_req) - else : + assignment_req, + ) + else: request = SyncGroupRequest[2]( self.group_id, self._coordinator.generation, self._coordinator.member_id, self._coordinator._group_instance_id, - assignment_req) + assignment_req, + ) log.debug( "Sending leader SyncGroup for group %s to coordinator %s: %s", - self.group_id, self.coordinator_id, request) - return (await self._send_sync_group_request(request)) + self.group_id, + self.coordinator_id, + request, + ) + return await self._send_sync_group_request(request) async def _send_sync_group_request(self, request): # We need to reset the rejoin future right after the assignment to @@ -1426,19 +1532,19 @@ async def _send_sync_group_request(self, request): # Error case self._coordinator.request_rejoin() if error_type is Errors.RebalanceInProgressError: - log.debug("SyncGroup for group %s failed due to group" - " rebalance", self.group_id) - elif error_type in (Errors.UnknownMemberIdError, - Errors.IllegalGenerationError): + log.debug( + "SyncGroup for group %s failed due to group" " rebalance", self.group_id + ) + elif error_type in (Errors.UnknownMemberIdError, Errors.IllegalGenerationError): err = error_type() - log.debug("SyncGroup for group %s failed due to %s,", - self.group_id, err) + log.debug("SyncGroup for group %s failed due to %s,", self.group_id, err) self._coordinator.reset_generation() - elif error_type in (Errors.GroupCoordinatorNotAvailableError, - Errors.NotCoordinatorForGroupError): + elif error_type in ( + Errors.GroupCoordinatorNotAvailableError, + Errors.NotCoordinatorForGroupError, + ): err = error_type() - log.debug("SyncGroup for group %s failed due to %s", - self.group_id, err) + log.debug("SyncGroup for group %s failed due to %s", self.group_id, err) self._coordinator.coordinator_dead() elif error_type is Errors.GroupAuthorizationFailedError: raise error_type(self.group_id) From 88052300991e5cd482f40169d0189ef5f8782f20 Mon Sep 17 00:00:00 2001 From: Vikram Patki 24489 Date: Wed, 25 Nov 2020 14:33:55 -0500 Subject: [PATCH 04/14] fixing linting errors --- aiokafka/consumer/assignors.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git 
a/aiokafka/consumer/assignors.py b/aiokafka/consumer/assignors.py index dfde2874..416aeadd 100644 --- a/aiokafka/consumer/assignors.py +++ b/aiokafka/consumer/assignors.py @@ -1,12 +1,12 @@ from kafka.coordinator.assignors.abstract import AbstractPartitionAssignor import abc + class AbstractStaticPartitionAssignor(AbstractPartitionAssignor): """ Abstract assignor implementation that also supports static assignments (KIP-345) """ - @abc.abstractmethod def assign(self, cluster, members, member_group_instance_ids): """Perform group assignment given cluster metadata, member subscriptions @@ -20,4 +20,4 @@ def assign(self, cluster, members, member_group_instance_ids): Returns: dict: {member_id: MemberAssignment} """ - pass \ No newline at end of file + pass From 48f6a9396c0d42dd3b2ea1b93c225f74041331a3 Mon Sep 17 00:00:00 2001 From: Vikram Patki <54442035+patkivikram@users.noreply.github.com> Date: Fri, 4 Dec 2020 16:56:57 -0500 Subject: [PATCH 05/14] Update tests.yml --- .github/workflows/tests.yml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index fd4f864a..f588f815 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -27,7 +27,11 @@ jobs: - name: Install system dependencies run: | sudo apt-get update +<<<<<<< HEAD sudo apt-get install -y libsnappy-dev libzstd-dev libkrb5-dev +======= + sudo apt-get install -y libsnappy-dev libkrb5-dev +>>>>>>> 757a793 (Update tests.yml) - name: Get pip cache dir id: pip-cache From ec36ae00de8bf0164667914c4beeb5a816c67fc4 Mon Sep 17 00:00:00 2001 From: g-clef Date: Fri, 4 Dec 2020 17:19:12 -0500 Subject: [PATCH 06/14] Fix linting errors --- aiokafka/consumer/assignors.py | 4 ++-- aiokafka/consumer/group_coordinator.py | 11 ++++------- aiokafka/protocol/group.py | 14 ++++++++++++-- 3 files changed, 18 insertions(+), 11 deletions(-) diff --git a/aiokafka/consumer/assignors.py b/aiokafka/consumer/assignors.py index 416aeadd..98026f22 100644 --- a/aiokafka/consumer/assignors.py +++ b/aiokafka/consumer/assignors.py @@ -15,8 +15,8 @@ def assign(self, cluster, members, member_group_instance_ids): cluster (ClusterMetadata): metadata for use in assignment members (dict of {member_id: MemberMetadata}): decoded metadata for each member in the group. - mmember_group_instance_ids members (dict of {member_id: MemberMetadata}): decoded metadata for - each member in the group. + member_group_instance_ids members (dict of {member_id: MemberMetadata}): decoded + metadata for each member in the group. Returns: dict: {member_id: MemberAssignment} """ diff --git a/aiokafka/consumer/group_coordinator.py b/aiokafka/consumer/group_coordinator.py index b65b9c53..3bb15f96 100644 --- a/aiokafka/consumer/group_coordinator.py +++ b/aiokafka/consumer/group_coordinator.py @@ -10,12 +10,9 @@ OffsetCommitRequest_v2 as OffsetCommitRequest, OffsetFetchRequest_v1 as OffsetFetchRequest, ) -from aiokafka.protocol.group import ( - HeartbeatRequest, - JoinGroupRequest, - LeaveGroupRequest, - SyncGroupRequest, -) +from aiokafka.protocol.group import JoinGroupRequest, SyncGroupRequest + +from kafka.protocol.group import HeartbeatRequest, LeaveGroupRequest import aiokafka.errors as Errors from aiokafka.structs import OffsetAndMetadata, TopicPartition @@ -1338,7 +1335,7 @@ async def perform_group_join(self): self.coordinator_id, ) try: - response = await T(self._coordinator._send_req)(request) + response = await self._coordinator._send_req(request) except Errors.KafkaError: # Return right away. 
It's a connection error, so backoff will be # handled by coordinator lookup diff --git a/aiokafka/protocol/group.py b/aiokafka/protocol/group.py index 0e5dd749..dfdb16e7 100644 --- a/aiokafka/protocol/group.py +++ b/aiokafka/protocol/group.py @@ -1,9 +1,19 @@ from __future__ import absolute_import from kafka.protocol.api import Request, Response -from kafka.protocol.struct import Struct from kafka.protocol.types import Array, Bytes, Int16, Int32, Schema, String -from kafka.protocol.group import * +from kafka.protocol.group import ( + JoinGroupRequest_v0, + JoinGroupRequest_v1, + JoinGroupRequest_v2, + JoinGroupResponse_v0, + JoinGroupResponse_v1, + SyncGroupRequest_v0, + SyncGroupRequest_v1, + SyncGroupResponse_v0, + SyncGroupResponse_v1 + ) + class JoinGroupResponse_v5(Response): From 152a5d4d10bf4f94207b6bf6dafa2d886e86759b Mon Sep 17 00:00:00 2001 From: g-clef Date: Sat, 5 Dec 2020 13:54:01 -0500 Subject: [PATCH 07/14] Linting fixed, tests still failing --- aiokafka/consumer/assignors.py | 4 ++-- aiokafka/consumer/group_coordinator.py | 1 - aiokafka/consumer/subscription_state.py | 2 -- aiokafka/protocol/group.py | 1 - 4 files changed, 2 insertions(+), 6 deletions(-) diff --git a/aiokafka/consumer/assignors.py b/aiokafka/consumer/assignors.py index 98026f22..ce2cbd1c 100644 --- a/aiokafka/consumer/assignors.py +++ b/aiokafka/consumer/assignors.py @@ -15,8 +15,8 @@ def assign(self, cluster, members, member_group_instance_ids): cluster (ClusterMetadata): metadata for use in assignment members (dict of {member_id: MemberMetadata}): decoded metadata for each member in the group. - member_group_instance_ids members (dict of {member_id: MemberMetadata}): decoded - metadata for each member in the group. + member_group_instance_ids (dict of {member_id: MemberMetadata}): + decoded metadata for each member in the group. Returns: dict: {member_id: MemberAssignment} """ diff --git a/aiokafka/consumer/group_coordinator.py b/aiokafka/consumer/group_coordinator.py index 3bb15f96..5c29f6bb 100644 --- a/aiokafka/consumer/group_coordinator.py +++ b/aiokafka/consumer/group_coordinator.py @@ -452,7 +452,6 @@ async def _perform_assignment(self, leader_id, assignment_strategy, members): member_metadata, ) - assignments = None if isinstance(assignor, AbstractStaticPartitionAssignor): assignments = assignor.assign( self._cluster, member_metadata, group_instance_id_mapping diff --git a/aiokafka/consumer/subscription_state.py b/aiokafka/consumer/subscription_state.py index b2dbd388..9631b008 100644 --- a/aiokafka/consumer/subscription_state.py +++ b/aiokafka/consumer/subscription_state.py @@ -14,8 +14,6 @@ log = logging.getLogger(__name__) -(List, Future) - class SubscriptionType(Enum): diff --git a/aiokafka/protocol/group.py b/aiokafka/protocol/group.py index dfdb16e7..4bda2b17 100644 --- a/aiokafka/protocol/group.py +++ b/aiokafka/protocol/group.py @@ -15,7 +15,6 @@ ) - class JoinGroupResponse_v5(Response): API_KEY = 11 API_VERSION = 5 From f10b3eeb3d7b5458e27a8883f668daada4507dc1 Mon Sep 17 00:00:00 2001 From: g-clef Date: Sat, 5 Dec 2020 16:30:53 -0500 Subject: [PATCH 08/14] fixed tests. 
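
JoinGroup responses list each member as a 2-tuple of (member_id, metadata) on older protocol versions, and as a 3-tuple that also carries group_instance_id once the KIP-345 version of the request/response is in use, so the assignment code below now unpacks both shapes. A rough standalone sketch of that handling, assuming the same tuple layouts as the diff (the helper name unpack_member is illustrative only, not part of the change):

    def unpack_member(member):
        # Pre-KIP-345 JoinGroup responses: (member_id, metadata_bytes)
        if len(member) == 2:
            member_id, metadata_bytes = member
            group_instance_id = None
        # KIP-345 responses add the static group.instance.id in the middle
        elif len(member) == 3:
            member_id, group_instance_id, metadata_bytes = member
        else:
            raise Exception("unknown protocol returned from assignment")
        return member_id, group_instance_id, metadata_bytes

    # e.g. unpack_member(("consumer-1", b"...")) -> ("consumer-1", None, b"...")
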
--- aiokafka/consumer/group_coordinator.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/aiokafka/consumer/group_coordinator.py b/aiokafka/consumer/group_coordinator.py index 5c29f6bb..07bbf9d7 100644 --- a/aiokafka/consumer/group_coordinator.py +++ b/aiokafka/consumer/group_coordinator.py @@ -428,7 +428,15 @@ async def _perform_assignment(self, leader_id, assignment_strategy, members): member_metadata = {} all_subscribed_topics = set() group_instance_id_mapping = {} - for member_id, group_instance_id, metadata_bytes in members: + # for member_id, group_instance_id, metadata_bytes in members: + for member in members: + if len(member) == 2: + member_id, metadata_bytes = member + group_instance_id = None + elif len(member) == 3: + member_id, group_instance_id, metadata_bytes = member + else: + raise Exception("unknown protocol returned from assignment") metadata = ConsumerProtocol.METADATA.decode(metadata_bytes) group_instance_id_mapping = group_instance_id member_metadata[member_id] = metadata From 304b80daa8cdffa00c47961c9f7cf35936dfe71e Mon Sep 17 00:00:00 2001 From: g-clef Date: Mon, 7 Dec 2020 16:57:40 -0500 Subject: [PATCH 09/14] Undoing a lot of linting --- .github/workflows/tests.yml | 4 - aiokafka/consumer/group_coordinator.py | 526 ++++++++++--------------- aiokafka/errors.py | 171 ++++---- 3 files changed, 306 insertions(+), 395 deletions(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index f588f815..fd4f864a 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -27,11 +27,7 @@ jobs: - name: Install system dependencies run: | sudo apt-get update -<<<<<<< HEAD sudo apt-get install -y libsnappy-dev libzstd-dev libkrb5-dev -======= - sudo apt-get install -y libsnappy-dev libkrb5-dev ->>>>>>> 757a793 (Update tests.yml) - name: Get pip cache dir id: pip-cache diff --git a/aiokafka/consumer/group_coordinator.py b/aiokafka/consumer/group_coordinator.py index 07bbf9d7..707e6e7b 100644 --- a/aiokafka/consumer/group_coordinator.py +++ b/aiokafka/consumer/group_coordinator.py @@ -8,11 +8,11 @@ from kafka.coordinator.protocol import ConsumerProtocol from kafka.protocol.commit import ( OffsetCommitRequest_v2 as OffsetCommitRequest, - OffsetFetchRequest_v1 as OffsetFetchRequest, -) + OffsetFetchRequest_v1 as OffsetFetchRequest) +from kafka.protocol.group import HeartbeatRequest, LeaveGroupRequest + from aiokafka.protocol.group import JoinGroupRequest, SyncGroupRequest -from kafka.protocol.group import HeartbeatRequest, LeaveGroupRequest import aiokafka.errors as Errors from aiokafka.structs import OffsetAndMetadata, TopicPartition @@ -27,7 +27,8 @@ class BaseCoordinator(object): - def __init__(self, client, subscription, *, exclude_internal_topics=True): + def __init__(self, client, subscription, *, + exclude_internal_topics=True): self._client = client self._exclude_internal_topics = exclude_internal_topics self._subscription = subscription @@ -47,23 +48,16 @@ def _handle_metadata_update(self, cluster): if subscription.subscribed_pattern.match(topic): topics.append(topic) - if ( - subscription.subscription is None - or set(topics) != subscription.subscription.topics - ): + if subscription.subscription is None or \ + set(topics) != subscription.subscription.topics: subscription.subscribe_from_pattern(topics) - if ( - subscription.partitions_auto_assigned() - and self._group_subscription is not None - ): + if subscription.partitions_auto_assigned() and \ + self._group_subscription is not None: metadata_snapshot = 
self._get_metadata_snapshot() if self._metadata_snapshot != metadata_snapshot: - log.info( - "Metadata for topic has changed from %s to %s. ", - self._metadata_snapshot, - metadata_snapshot, - ) + log.info("Metadata for topic has changed from %s to %s. ", + self._metadata_snapshot, metadata_snapshot) self._metadata_snapshot = metadata_snapshot self._on_metadata_change() @@ -94,7 +88,8 @@ class NoGroupCoordinator(BaseCoordinator): def __init__(self, *args, **kw): super().__init__(*args, **kw) # Reset all committed points, as the GroupCoordinator would - self._reset_committed_task = create_task(self._reset_committed_routine()) + self._reset_committed_task = create_task( + self._reset_committed_routine()) def _on_metadata_change(self): self.assign_all_partitions() @@ -146,14 +141,14 @@ async def _reset_committed_routine(self): for tp in assignment.requesting_committed(): tp_state = assignment.state_value(tp) - tp_state.update_committed(OffsetAndMetadata(UNKNOWN_OFFSET, "")) + tp_state.update_committed( + OffsetAndMetadata(UNKNOWN_OFFSET, "")) event_waiter = create_task(commit_refresh_needed.wait()) await asyncio.wait( [assignment.unassign_future, event_waiter], - return_when=asyncio.FIRST_COMPLETED, - ) + return_when=asyncio.FIRST_COMPLETED) if not event_waiter.done(): event_waiter.cancel() @@ -216,23 +211,17 @@ class GroupCoordinator(BaseCoordinator): org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java """ - def __init__( - self, - client, - subscription, - *, - group_id="aiokafka-default-group", - group_instance_id=None, - session_timeout_ms=10000, - heartbeat_interval_ms=3000, - retry_backoff_ms=100, - enable_auto_commit=True, - auto_commit_interval_ms=5000, - assignors=(RoundRobinPartitionAssignor,), - exclude_internal_topics=True, - max_poll_interval_ms=300000, - rebalance_timeout_ms=30000 - ): + def __init__(self, client, subscription, *, + group_id="aiokafka-default-group", + group_instance_id=None, + session_timeout_ms=10000, heartbeat_interval_ms=3000, + retry_backoff_ms=100, + enable_auto_commit=True, auto_commit_interval_ms=5000, + assignors=(RoundRobinPartitionAssignor,), + exclude_internal_topics=True, + max_poll_interval_ms=300000, + rebalance_timeout_ms=30000 + ): """Initialize the coordination manager. Parameters (see AIOKafkaConsumer) @@ -241,8 +230,8 @@ def __init__( self._group_subscription = None super().__init__( - client, subscription, exclude_internal_topics=exclude_internal_topics - ) + client, subscription, + exclude_internal_topics=exclude_internal_topics) self._session_timeout_ms = session_timeout_ms self._heartbeat_interval_ms = heartbeat_interval_ms @@ -281,9 +270,8 @@ def __init__( # changed from user code by calling ``commit()``. 
self._commit_lock = asyncio.Lock() - self._next_autocommit_deadline = ( + self._next_autocommit_deadline = \ time.monotonic() + auto_commit_interval_ms / 1000 - ) # Will be set on close self._closing = create_future() @@ -300,15 +288,11 @@ async def _send_req(self, request): raise Errors.GroupCoordinatorNotAvailableError() try: resp = await self._client.send( - node_id, request, group=ConnectionGroup.COORDINATION - ) + node_id, request, group=ConnectionGroup.COORDINATION) except Errors.KafkaError as err: log.error( "Error sending %s to node %s [%s] -- marking coordinator dead", - request.__class__.__name__, - node_id, - err, - ) + request.__class__.__name__, node_id, err) self.coordinator_dead() raise err return resp @@ -404,27 +388,25 @@ async def _on_join_prepare(self, previous_assignment): revoked = set() # execute the user's callback before rebalance - log.info( - "Revoking previously assigned partitions %s for group %s", - revoked, - self.group_id, - ) + log.info("Revoking previously assigned partitions %s for group %s", + revoked, self.group_id) if self._subscription.listener: try: - res = self._subscription.listener.on_partitions_revoked(revoked) + res = self._subscription.listener.on_partitions_revoked( + revoked) if asyncio.iscoroutine(res): await res except Exception: - log.exception( - "User provided subscription listener %s" - " for group %s failed on_partitions_revoked", - self._subscription.listener, - self.group_id, - ) - - async def _perform_assignment(self, leader_id, assignment_strategy, members): + log.exception("User provided subscription listener %s" + " for group %s failed on_partitions_revoked", + self._subscription.listener, self.group_id) + + async def _perform_assignment( + self, leader_id, assignment_strategy, members + ): assignor = self._lookup_assignor(assignment_strategy) - assert assignor, "Invalid assignment protocol: %s" % assignment_strategy + assert assignor, \ + 'Invalid assignment protocol: %s' % assignment_strategy member_metadata = {} all_subscribed_topics = set() group_instance_id_mapping = {} @@ -452,13 +434,9 @@ async def _perform_assignment(self, leader_id, assignment_strategy, members): # call) we should wait for it before performing assignment await self._client._maybe_wait_metadata() - log.debug( - "Performing assignment for group %s using strategy %s" - " with subscriptions %s", - self.group_id, - assignor.name, - member_metadata, - ) + log.debug("Performing assignment for group %s using strategy %s" + " with subscriptions %s", self.group_id, assignor.name, + member_metadata) if isinstance(assignor, AbstractStaticPartitionAssignor): assignments = assignor.assign( @@ -466,7 +444,8 @@ async def _perform_assignment(self, leader_id, assignment_strategy, members): ) else: assignments = assignor.assign(self._cluster, member_metadata) - log.debug("Finished assignment for group %s: %s", self.group_id, assignments) + log.debug("Finished assignment for group %s: %s", + self.group_id, assignments) # `set_topics()` will not trigger a metadata update if we only # removed some topics (client has all needed metadata already), @@ -479,12 +458,14 @@ async def _perform_assignment(self, leader_id, assignment_strategy, members): return group_assignment async def _on_join_complete( - self, generation, member_id, protocol, member_assignment_bytes + self, generation, member_id, protocol, + member_assignment_bytes ): assignor = self._lookup_assignor(protocol) - assert assignor, "invalid assignment protocol: %s" % protocol + assert assignor, 'invalid assignment 
protocol: %s' % protocol - assignment = ConsumerProtocol.ASSIGNMENT.decode(member_assignment_bytes) + assignment = ConsumerProtocol.ASSIGNMENT.decode( + member_assignment_bytes) # update partition assignment self._subscription.assign_from_subscribed(assignment.partitions()) @@ -502,24 +483,21 @@ async def _on_join_complete( self.start_commit_offsets_refresh_task(subscription.assignment) assigned = set(self._subscription.assigned_partitions()) - log.info( - "Setting newly assigned partitions %s for group %s", assigned, self.group_id - ) + log.info("Setting newly assigned partitions %s for group %s", + assigned, self.group_id) # execute the user's callback after rebalance if self._subscription.listener: try: - res = self._subscription.listener.on_partitions_assigned(assigned) + res = self._subscription.listener.on_partitions_assigned( + assigned) if asyncio.iscoroutine(res): await res except Exception: - log.exception( - "User provided listener %s for group %s" - " failed on partition assignment: %s", - self._subscription.listener, - self.group_id, - assigned, - ) + log.exception("User provided listener %s for group %s" + " failed on partition assignment: %s", + self._subscription.listener, self.group_id, + assigned) def coordinator_dead(self): """ Mark the current coordinator as dead. @@ -529,9 +507,7 @@ def coordinator_dead(self): if self.coordinator_id is not None: log.warning( "Marking the coordinator dead (node %s)for group %s.", - self.coordinator_id, - self.group_id, - ) + self.coordinator_id, self.group_id) self.coordinator_id = None self._coordinator_dead_fut.set_result(None) @@ -553,7 +529,9 @@ def need_rejoin(self, subscription): Returns: bool: True if consumer should rejoin group, False otherwise """ - return subscription.assignment is None or self._rejoin_needed_fut.done() + return ( + subscription.assignment is None or self._rejoin_needed_fut.done() + ) async def ensure_coordinator_known(self): """ Block until the coordinator for this group is known. @@ -565,9 +543,10 @@ async def ensure_coordinator_known(self): retry_backoff = self._retry_backoff_ms / 1000 while self.coordinator_id is None and not self._closing.done(): try: - coordinator_id = await self._client.coordinator_lookup( - CoordinationType.GROUP, self.group_id - ) + coordinator_id = ( + await self._client.coordinator_lookup( + CoordinationType.GROUP, self.group_id) + ) except Errors.GroupAuthorizationFailedError: err = Errors.GroupAuthorizationFailedError(self.group_id) raise err @@ -583,19 +562,15 @@ async def ensure_coordinator_known(self): # Try to connect to confirm that the connection can be # established. 
ready = await self._client.ready( - coordinator_id, group=ConnectionGroup.COORDINATION - ) + coordinator_id, group=ConnectionGroup.COORDINATION) if not ready: await asyncio.sleep(retry_backoff) continue self.coordinator_id = coordinator_id self._coordinator_dead_fut = create_future() - log.info( - "Discovered coordinator %s for group %s", - self.coordinator_id, - self.group_id, - ) + log.info("Discovered coordinator %s for group %s", + self.coordinator_id, self.group_id) async def _coordination_routine(self): try: @@ -603,7 +578,8 @@ async def _coordination_routine(self): except asyncio.CancelledError: # pragma: no cover raise except Exception as exc: - log.error("Unexpected error in coordinator routine", exc_info=True) + log.error( + "Unexpected error in coordinator routine", exc_info=True) kafka_exc = Errors.KafkaError( f"Unexpected error during coordination {exc!r}") self._subscription.abort_waiters(kafka_exc) @@ -628,9 +604,9 @@ async def __coordination_routine(self): subscription = self._subscription.subscription if subscription is None: await asyncio.wait( - [self._subscription.wait_for_subscription(), self._closing], - return_when=asyncio.FIRST_COMPLETED, - ) + [self._subscription.wait_for_subscription(), + self._closing], + return_when=asyncio.FIRST_COMPLETED) if self._closing.done(): break subscription = self._subscription.subscription @@ -642,8 +618,7 @@ async def __coordination_routine(self): await self.ensure_coordinator_known() if auto_assigned and self.need_rejoin(subscription): new_assignment = await self.ensure_active_group( - subscription, assignment - ) + subscription, assignment) if new_assignment is None or not new_assignment.active: continue else: @@ -666,8 +641,7 @@ async def __coordination_routine(self): futures = [ self._closing, # Will exit fast if close() called self._coordinator_dead_fut, - subscription.unsubscribe_future, - ] + subscription.unsubscribe_future] # In case of manual assignment this future will be always set and # we don't want a heavy loop here. # NOTE: metadata changes are for partition count and pattern @@ -684,8 +658,8 @@ async def __coordination_routine(self): futures.append(self._commit_refresh_task) done, _ = await asyncio.wait( - futures, timeout=wait_timeout, return_when=asyncio.FIRST_COMPLETED - ) + futures, timeout=wait_timeout, + return_when=asyncio.FIRST_COMPLETED) # Handle exceptions in other background tasks for task in [self._heartbeat_task, self._commit_refresh_task]: @@ -791,14 +765,17 @@ async def _heartbeat_routine(self): # the session timeout has expired without seeing a successful # heartbeat, so we should probably make sure the coordinator # is still healthy. 
- log.error("Heartbeat session expired - marking coordinator dead") + log.error( + "Heartbeat session expired - marking coordinator dead") self.coordinator_dead() # If consumer is idle (no records consumed) for too long we need # to leave the group idle_time = self._subscription.fetcher_idle_time if idle_time < self._max_poll_interval: - sleep_time = min(sleep_time, self._max_poll_interval - idle_time) + sleep_time = min( + sleep_time, + self._max_poll_interval - idle_time) else: await self._maybe_leave_group() @@ -807,11 +784,9 @@ async def _heartbeat_routine(self): async def _do_heartbeat(self): version = 0 if self._client.api_version < (0, 11, 0) else 1 request = HeartbeatRequest[version]( - self.group_id, self.generation, self.member_id - ) - log.debug( - "Heartbeat: %s[%s] %s", self.group_id, self.generation, self.member_id - ) + self.group_id, self.generation, self.member_id) + log.debug("Heartbeat: %s[%s] %s", + self.group_id, self.generation, self.member_id) # _send_req may fail with error like `RequestTimedOutError` # we need to catch it so coordinator_routine won't fail @@ -823,19 +798,15 @@ async def _do_heartbeat(self): error_type = Errors.for_code(resp.error_code) if error_type is Errors.NoError: log.debug( - "Received successful heartbeat response for group %s", self.group_id - ) + "Received successful heartbeat response for group %s", + self.group_id) return True - if error_type in ( - Errors.GroupCoordinatorNotAvailableError, - Errors.NotCoordinatorForGroupError, - ): + if error_type in (Errors.GroupCoordinatorNotAvailableError, + Errors.NotCoordinatorForGroupError): log.warning( "Heartbeat failed for group %s: coordinator (node %s)" " is either not started or not valid", - self.group_id, - self.coordinator_id, - ) + self.group_id, self.coordinator_id) self.coordinator_dead() elif error_type is Errors.RebalanceInProgressError: log.warning( @@ -851,15 +822,13 @@ async def _do_heartbeat(self): return True elif error_type is Errors.IllegalGenerationError: log.warning( - "Heartbeat failed for group %s: generation id is not " " current.", - self.group_id, - ) + "Heartbeat failed for group %s: generation id is not " + " current.", self.group_id) self.reset_generation() elif error_type is Errors.UnknownMemberIdError: log.warning( "Heartbeat failed: local member_id was not recognized;" - " resetting and re-joining group" - ) + " resetting and re-joining group") self.reset_generation() elif error_type is Errors.GroupAuthorizationFailedError: raise error_type(self.group_id) @@ -875,8 +844,7 @@ def start_commit_offsets_refresh_task(self, assignment): if self._commit_refresh_task is not None: self._commit_refresh_task.cancel() self._commit_refresh_task = create_task( - self._commit_refresh_routine(assignment) - ) + self._commit_refresh_routine(assignment)) async def _stop_commit_offsets_refresh_task(self): # The previous task should end after assignment changed @@ -896,19 +864,22 @@ async def _commit_refresh_routine(self, assignment): try: while assignment.active: commit_refresh_needed.clear() - success = await self._maybe_refresh_commit_offsets(assignment) + success = await self._maybe_refresh_commit_offsets( + assignment) wait_futures = [assignment.unassign_future] if not success: timeout = retry_backoff_ms else: timeout = None - event_waiter = create_task(commit_refresh_needed.wait()) + event_waiter = create_task( + commit_refresh_needed.wait()) wait_futures.append(event_waiter) await asyncio.wait( - wait_futures, timeout=timeout, return_when=asyncio.FIRST_COMPLETED - ) + 
wait_futures, + timeout=timeout, + return_when=asyncio.FIRST_COMPLETED) except asyncio.CancelledError: pass except Exception: @@ -924,33 +895,27 @@ async def _commit_refresh_routine(self, assignment): async def _do_rejoin_group(self, subscription): rebalance = CoordinatorGroupRebalance( - self, - self.group_id, - self.coordinator_id, - subscription, - self._assignors, - self._session_timeout_ms, - self._retry_backoff_ms, - ) + self, self.group_id, self.coordinator_id, + subscription, self._assignors, self._session_timeout_ms, + self._retry_backoff_ms) assignment = await rebalance.perform_group_join() if not subscription.active: - log.debug( - "Subscription changed during rebalance from %s to %s. " - "Rejoining group.", - subscription.topics, - self._subscription.topics, - ) + log.debug("Subscription changed during rebalance from %s to %s. " + "Rejoining group.", + subscription.topics, + self._subscription.topics) return False if assignment is None: # wait backoff and try again - await asyncio.sleep(self._retry_backoff_ms / 1000) + await asyncio.sleep( + self._retry_backoff_ms / 1000) return False protocol, member_assignment_bytes = assignment await self._on_join_complete( - self.generation, self.member_id, protocol, member_assignment_bytes - ) + self.generation, self.member_id, + protocol, member_assignment_bytes) return True async def _maybe_do_autocommit(self, assignment): @@ -963,13 +928,13 @@ async def _maybe_do_autocommit(self, assignment): try: async with self._commit_lock: await self._do_commit_offsets( - assignment, assignment.all_consumed_offsets() - ) + assignment, assignment.all_consumed_offsets()) except Errors.KafkaError as error: log.warning("Auto offset commit failed: %s", error) if self._is_commit_retriable(error): # Retry after backoff. - self._next_autocommit_deadline = time.monotonic() + backoff + self._next_autocommit_deadline = \ + time.monotonic() + backoff return backoff else: raise @@ -983,19 +948,17 @@ def _is_commit_retriable(self, error): # Java client raises CommitFailedError which is retriable and thus # masks those 3. We raise error that we got explicitly, so treat them # as retriable. 
- return error.retriable or isinstance( - error, - ( - Errors.UnknownMemberIdError, - Errors.IllegalGenerationError, - Errors.RebalanceInProgressError, - ), - ) + return error.retriable or isinstance(error, ( + Errors.UnknownMemberIdError, + Errors.IllegalGenerationError, + Errors.RebalanceInProgressError, + )) async def _maybe_do_last_autocommit(self, assignment): if not self._enable_auto_commit: return - await self.commit_offsets(assignment, assignment.all_consumed_offsets()) + await self.commit_offsets( + assignment, assignment.all_consumed_offsets()) async def commit_offsets(self, assignment, offsets): """Commit specific offsets @@ -1009,23 +972,22 @@ async def commit_offsets(self, assignment, offsets): await self.ensure_coordinator_known() try: async with self._commit_lock: - await asyncio.shield(self._do_commit_offsets(assignment, offsets)) - except ( - Errors.UnknownMemberIdError, - Errors.IllegalGenerationError, - Errors.RebalanceInProgressError, - ): + await asyncio.shield( + self._do_commit_offsets(assignment, offsets)) + except (Errors.UnknownMemberIdError, + Errors.IllegalGenerationError, + Errors.RebalanceInProgressError,): raise Errors.CommitFailedError( "Commit cannot be completed since the group has already " "rebalanced and may have assigned the partitions " - "to another member" - ) + "to another member") except Errors.KafkaError as err: if not err.retriable: raise err else: # wait backoff and try again - await asyncio.sleep(self._retry_backoff_ms / 1000) + await asyncio.sleep( + self._retry_backoff_ms / 1000) else: break @@ -1037,22 +999,22 @@ async def _do_commit_offsets(self, assignment, offsets): # create the offset commit request offset_data = collections.defaultdict(list) for tp, offset in offsets.items(): - offset_data[tp.topic].append((tp.partition, offset.offset, offset.metadata)) + offset_data[tp.topic].append( + (tp.partition, + offset.offset, + offset.metadata)) request = OffsetCommitRequest( self.group_id, self.generation, self.member_id, OffsetCommitRequest.DEFAULT_RETENTION_TIME, - [(topic, tp_offsets) for topic, tp_offsets in offset_data.items()], + [(topic, tp_offsets) for topic, tp_offsets in offset_data.items()] ) log.debug( "Sending offset-commit request with %s for group %s to %s", - offsets, - self.group_id, - self.coordinator_id, - ) + offsets, self.group_id, self.coordinator_id) response = await self._send_req(request) @@ -1064,50 +1026,36 @@ async def _do_commit_offsets(self, assignment, offsets): error_type = Errors.for_code(error_code) offset = offsets[tp] if error_type is Errors.NoError: - log.debug("Committed offset %s for partition %s", offset, tp) + log.debug( + "Committed offset %s for partition %s", offset, tp) elif error_type is Errors.GroupAuthorizationFailedError: - log.error( - "OffsetCommit failed for group %s - %s", - self.group_id, - error_type.__name__, - ) + log.error("OffsetCommit failed for group %s - %s", + self.group_id, error_type.__name__) errored[tp] = error_type(self.group_id) elif error_type is Errors.TopicAuthorizationFailedError: unauthorized_topics.add(topic) - elif error_type in ( - Errors.OffsetMetadataTooLargeError, - Errors.InvalidCommitOffsetSizeError, - ): + elif error_type in (Errors.OffsetMetadataTooLargeError, + Errors.InvalidCommitOffsetSizeError): # raise the error to the user log.info( "OffsetCommit failed for group %s on partition %s" - " due to %s, will retry", - self.group_id, - tp, - error_type.__name__, - ) + " due to %s, will retry", self.group_id, tp, + error_type.__name__) errored[tp] = error_type() 
elif error_type is Errors.GroupLoadInProgressError: # just retry log.info( "OffsetCommit failed for group %s because group is" - " initializing (%s), will retry", - self.group_id, - error_type.__name__, - ) + " initializing (%s), will retry", self.group_id, + error_type.__name__) errored[tp] = error_type() - elif error_type in ( - Errors.GroupCoordinatorNotAvailableError, - Errors.NotCoordinatorForGroupError, - Errors.RequestTimedOutError, - ): + elif error_type in (Errors.GroupCoordinatorNotAvailableError, + Errors.NotCoordinatorForGroupError, + Errors.RequestTimedOutError): log.info( "OffsetCommit failed for group %s due to a" " coordinator error (%s), will find new coordinator" - " and retry", - self.group_id, - error_type.__name__, - ) + " and retry", self.group_id, error_type.__name__) self.coordinator_dead() errored[tp] = error_type() elif error_type in ( @@ -1119,10 +1067,7 @@ async def _do_commit_offsets(self, assignment, offsets): error = error_type(self.group_id) log.error( "OffsetCommit failed for group %s due to group" - " error (%s), will rejoin", - self.group_id, - error, - ) + " error (%s), will rejoin", self.group_id, error) if error_type is Errors.RebalanceInProgressError: self.request_rejoin() else: @@ -1131,37 +1076,30 @@ async def _do_commit_offsets(self, assignment, offsets): error = error_type(self.group_id) log.error( "OffsetCommit failed for group %s due to group" - " error (%s), will rejoin", - self.group_id, - error, - ) + " error (%s), will rejoin", self.group_id, error) errored[tp] = error else: log.error( "OffsetCommit failed for group %s on partition %s" - " with offset %s: %s", - self.group_id, - tp, - offset, - error_type.__name__, - ) + " with offset %s: %s", self.group_id, tp, offset, + error_type.__name__) errored[tp] = error_type() if errored: first_error = list(errored.values())[0] raise first_error if unauthorized_topics: - log.error( - "OffsetCommit failed for unauthorized topics %s", unauthorized_topics - ) + log.error("OffsetCommit failed for unauthorized topics %s", + unauthorized_topics) raise Errors.TopicAuthorizationFailedError(unauthorized_topics) async def _maybe_refresh_commit_offsets(self, assignment): need_update = assignment.requesting_committed() if need_update: try: - offsets = await self._do_fetch_commit_offsets(need_update) + offsets = await self._do_fetch_commit_offsets( + need_update) except Errors.KafkaError as err: if not err.retriable: raise @@ -1173,7 +1111,8 @@ async def _maybe_refresh_commit_offsets(self, assignment): if tp in offsets: tp_state.update_committed(offsets[tp]) else: - tp_state.update_committed(OffsetAndMetadata(UNKNOWN_OFFSET, "")) + tp_state.update_committed( + OffsetAndMetadata(UNKNOWN_OFFSET, "")) return True async def fetch_committed_offsets(self, partitions): @@ -1197,7 +1136,8 @@ async def fetch_committed_offsets(self, partitions): raise err else: # wait backoff and try again - await asyncio.sleep(self._retry_backoff_ms / 1000) + await asyncio.sleep( + self._retry_backoff_ms / 1000) else: return offsets @@ -1208,7 +1148,10 @@ async def _do_fetch_commit_offsets(self, partitions): for tp in partitions: topic_partitions[tp.topic].append(tp.partition) - request = OffsetFetchRequest(self.group_id, list(topic_partitions.items())) + request = OffsetFetchRequest( + self.group_id, + list(topic_partitions.items()) + ) response = await self._send_req(request) offsets = {} for topic, partitions in response.topics: @@ -1226,14 +1169,14 @@ async def _do_fetch_commit_offsets(self, partitions): self.coordinator_dead() raise 
error elif error_type is Errors.UnknownTopicOrPartitionError: - log.warning("OffsetFetchRequest -- unknown topic %s", topic) + log.warning( + "OffsetFetchRequest -- unknown topic %s", topic) continue elif error_type is Errors.GroupAuthorizationFailedError: raise error_type(self.group_id) else: - log.error( - "Unknown error fetching offsets for %s: %s", tp, error - ) + log.error("Unknown error fetching offsets for %s: %s", + tp, error) raise Errors.KafkaError(repr(error)) # record the position with the offset # (-1 indicates no committed offset to fetch) @@ -1253,16 +1196,8 @@ class CoordinatorGroupRebalance: display/KAFKA/Kafka+Client-side+Assignment+Proposal """ - def __init__( - self, - coordinator, - group_id, - coordinator_id, - subscription, - assignors, - session_timeout_ms, - retry_backoff_ms, - ): + def __init__(self, coordinator, group_id, coordinator_id, subscription, + assignors, session_timeout_ms, retry_backoff_ms): self._coordinator = coordinator self.group_id = group_id self.coordinator_id = coordinator_id @@ -1336,11 +1271,8 @@ async def perform_group_join(self): ) # create the request for the coordinator - log.debug( - "Sending JoinGroup (%s) to coordinator %s", - request, - self.coordinator_id, - ) + log.debug("Sending JoinGroup (%s) to coordinator %s", + request, self.coordinator_id) try: response = await self._coordinator._send_req(request) except Errors.KafkaError: @@ -1362,19 +1294,12 @@ async def perform_group_join(self): self._coordinator.member_id = response.member_id self._coordinator.generation = response.generation_id protocol = response.group_protocol - log.info( - "Joined group '%s' (generation %s) with member_id %s", - self.group_id, - response.generation_id, - response.member_id, - ) + log.info("Joined group '%s' (generation %s) with member_id %s", + self.group_id, response.generation_id, response.member_id) if response.leader_id == response.member_id: - log.info( - "Elected group leader -- performing partition" - " assignments using %s", - protocol, - ) + log.info("Elected group leader -- performing partition" + " assignments using %s", protocol) assignment_bytes = await self._on_join_leader(response) else: assignment_bytes = await self._on_join_follower() @@ -1384,40 +1309,31 @@ async def perform_group_join(self): return (protocol, assignment_bytes) elif error_type is Errors.GroupLoadInProgressError: # Backoff and retry - log.debug( - "Attempt to join group %s rejected since coordinator %s" - " is loading the group.", - self.group_id, - self.coordinator_id, - ) - await asyncio.sleep(self._retry_backoff_ms / 1000) + log.debug("Attempt to join group %s rejected since coordinator %s" + " is loading the group.", self.group_id, + self.coordinator_id) + await asyncio.sleep( + self._retry_backoff_ms / 1000) elif error_type is Errors.UnknownMemberIdError: # reset the member id and retry immediately self._coordinator.reset_generation() log.debug( "Attempt to join group %s failed due to unknown member id", - self.group_id, - ) - elif error_type in ( - Errors.GroupCoordinatorNotAvailableError, - Errors.NotCoordinatorForGroupError, - ): + self.group_id) + elif error_type in (Errors.GroupCoordinatorNotAvailableError, + Errors.NotCoordinatorForGroupError): # Coordinator changed we should be able to find it immediately err = error_type() self._coordinator.coordinator_dead() - log.debug( - "Attempt to join group %s failed due to obsolete " - "coordinator information: %s", - self.group_id, - err, - ) - elif error_type in ( - Errors.InconsistentGroupProtocolError, - 
Errors.InvalidSessionTimeoutError, - Errors.InvalidGroupIdError, - ): + log.debug("Attempt to join group %s failed due to obsolete " + "coordinator information: %s", self.group_id, + err) + elif error_type in (Errors.InconsistentGroupProtocolError, + Errors.InvalidSessionTimeoutError, + Errors.InvalidGroupIdError): err = error_type() - log.error("Attempt to join group failed due to fatal error: %s", err) + log.error( + "Attempt to join group failed due to fatal error: %s", err) raise err elif error_type is Errors.GroupAuthorizationFailedError: raise error_type(self.group_id) @@ -1425,9 +1341,7 @@ async def perform_group_join(self): err = error_type() log.error( "Unexpected error in join group '%s' response: %s", - self.group_id, - error_type, - ) + self.group_id, err) raise Errors.KafkaError(repr(err)) return None @@ -1451,10 +1365,7 @@ async def _on_join_follower(self): ) log.debug( "Sending follower SyncGroup for group %s to coordinator %s: %s", - self.group_id, - self.coordinator_id, - request, - ) + self.group_id, self.coordinator_id, request) return await self._send_sync_group_request(request) async def _on_join_leader(self, response): @@ -1469,9 +1380,11 @@ async def _on_join_leader(self, response): Future: resolves to member assignment encoded-bytes """ try: - group_assignment = await self._coordinator._perform_assignment( - response.leader_id, response.group_protocol, response.members - ) + group_assignment = \ + await self._coordinator._perform_assignment( + response.leader_id, + response.group_protocol, + response.members) except Exception as e: raise Errors.KafkaError(repr(e)) @@ -1500,10 +1413,7 @@ async def _on_join_leader(self, response): log.debug( "Sending leader SyncGroup for group %s to coordinator %s: %s", - self.group_id, - self.coordinator_id, - request, - ) + self.group_id, self.coordinator_id, request) return await self._send_sync_group_request(request) async def _send_sync_group_request(self, request): @@ -1536,19 +1446,19 @@ async def _send_sync_group_request(self, request): # Error case self._coordinator.request_rejoin() if error_type is Errors.RebalanceInProgressError: - log.debug( - "SyncGroup for group %s failed due to group" " rebalance", self.group_id - ) - elif error_type in (Errors.UnknownMemberIdError, Errors.IllegalGenerationError): + log.debug("SyncGroup for group %s failed due to group" + " rebalance", self.group_id) + elif error_type in (Errors.UnknownMemberIdError, + Errors.IllegalGenerationError): err = error_type() - log.debug("SyncGroup for group %s failed due to %s,", self.group_id, err) + log.debug("SyncGroup for group %s failed due to %s,", + self.group_id, err) self._coordinator.reset_generation() - elif error_type in ( - Errors.GroupCoordinatorNotAvailableError, - Errors.NotCoordinatorForGroupError, - ): + elif error_type in (Errors.GroupCoordinatorNotAvailableError, + Errors.NotCoordinatorForGroupError): err = error_type() - log.debug("SyncGroup for group %s failed due to %s", self.group_id, err) + log.debug("SyncGroup for group %s failed due to %s", + self.group_id, err) self._coordinator.coordinator_dead() elif error_type is Errors.GroupAuthorizationFailedError: raise error_type(self.group_id) diff --git a/aiokafka/errors.py b/aiokafka/errors.py index 5605f25a..4fc2f129 100644 --- a/aiokafka/errors.py +++ b/aiokafka/errors.py @@ -72,10 +72,9 @@ __all__ = [ # aiokafka custom errors - "ConsumerStoppedError", - "NoOffsetForPartitionError", - "RecordTooLargeError", + "ConsumerStoppedError", "NoOffsetForPartitionError", "RecordTooLargeError", 
"ProducerClosed", + # Kafka Python errors "KafkaError", "IllegalStateError", @@ -92,6 +91,7 @@ "AuthenticationMethodNotSupported", "AuthenticationFailedError", "BrokerResponseError", + # Numbered errors "NoError", # 0 "UnknownError", # -1 @@ -139,6 +139,7 @@ "InvalidRequestError", # 42 "UnsupportedForMessageFormatError", # 43 "PolicyViolationError", # 44 + "KafkaUnavailableError", "KafkaTimeoutError", "KafkaConnectionError", @@ -198,229 +199,233 @@ class ProducerFenced(KafkaError): def __init__( self, msg="There is a newer producer using the same transactional_id or" - "transaction timeout occurred (check that processing time is " - "below transaction_timeout_ms)", + "transaction timeout occurred (check that processing time is " + "below transaction_timeout_ms)" ): super().__init__(msg) class OutOfOrderSequenceNumber(BrokerResponseError): errno = 45 - message = "OUT_OF_ORDER_SEQUENCE_NUMBER" - description = "The broker received an out of order sequence number" + message = 'OUT_OF_ORDER_SEQUENCE_NUMBER' + description = 'The broker received an out of order sequence number' class DuplicateSequenceNumber(BrokerResponseError): errno = 46 - message = "DUPLICATE_SEQUENCE_NUMBER" - description = "The broker received a duplicate sequence number" + message = 'DUPLICATE_SEQUENCE_NUMBER' + description = 'The broker received a duplicate sequence number' class InvalidProducerEpoch(BrokerResponseError): errno = 47 - message = "INVALID_PRODUCER_EPOCH" + message = 'INVALID_PRODUCER_EPOCH' description = ( - "Producer attempted an operation with an old epoch. Either " - "there is a newer producer with the same transactionalId, or the " - "producer's transaction has been expired by the broker." + 'Producer attempted an operation with an old epoch. Either ' + 'there is a newer producer with the same transactionalId, or the ' + 'producer\'s transaction has been expired by the broker.' ) class InvalidTxnState(BrokerResponseError): errno = 48 - message = "INVALID_TXN_STATE" - description = "The producer attempted a transactional operation in an invalid state" + message = 'INVALID_TXN_STATE' + description = ( + 'The producer attempted a transactional operation in an invalid state' + ) class InvalidProducerIdMapping(BrokerResponseError): errno = 49 - message = "INVALID_PRODUCER_ID_MAPPING" + message = 'INVALID_PRODUCER_ID_MAPPING' description = ( - "The producer attempted to use a producer id which is not currently " - "assigned to its transactional id" + 'The producer attempted to use a producer id which is not currently ' + 'assigned to its transactional id' ) class InvalidTransactionTimeout(BrokerResponseError): errno = 50 - message = "INVALID_TRANSACTION_TIMEOUT" + message = 'INVALID_TRANSACTION_TIMEOUT' description = ( - "The transaction timeout is larger than the maximum value allowed by" - " the broker (as configured by transaction.max.timeout.ms)." + 'The transaction timeout is larger than the maximum value allowed by' + ' the broker (as configured by transaction.max.timeout.ms).' 
) class ConcurrentTransactions(BrokerResponseError): errno = 51 - message = "CONCURRENT_TRANSACTIONS" + message = 'CONCURRENT_TRANSACTIONS' description = ( - "The producer attempted to update a transaction while another " - "concurrent operation on the same transaction was ongoing" + 'The producer attempted to update a transaction while another ' + 'concurrent operation on the same transaction was ongoing' ) class TransactionCoordinatorFenced(BrokerResponseError): errno = 52 - message = "TRANSACTION_COORDINATOR_FENCED" + message = 'TRANSACTION_COORDINATOR_FENCED' description = ( - "Indicates that the transaction coordinator sending a WriteTxnMarker" - " is no longer the current coordinator for a given producer" + 'Indicates that the transaction coordinator sending a WriteTxnMarker' + ' is no longer the current coordinator for a given producer' ) class TransactionalIdAuthorizationFailed(BrokerResponseError): errno = 53 - message = "TRANSACTIONAL_ID_AUTHORIZATION_FAILED" - description = "Transactional Id authorization failed" + message = 'TRANSACTIONAL_ID_AUTHORIZATION_FAILED' + description = 'Transactional Id authorization failed' class SecurityDisabled(BrokerResponseError): errno = 54 - message = "SECURITY_DISABLED" - description = "Security features are disabled" + message = 'SECURITY_DISABLED' + description = 'Security features are disabled' class OperationNotAttempted(BrokerResponseError): errno = 55 - message = "OPERATION_NOT_ATTEMPTED" + message = 'OPERATION_NOT_ATTEMPTED' description = ( - "The broker did not attempt to execute this operation. This may happen" - " for batched RPCs where some operations in the batch failed, causing " - "the broker to respond without trying the rest." + 'The broker did not attempt to execute this operation. This may happen' + ' for batched RPCs where some operations in the batch failed, causing ' + 'the broker to respond without trying the rest.' ) class KafkaStorageError(BrokerResponseError): errno = 56 - message = "KAFKA_STORAGE_ERROR" - description = "The user-specified log directory is not found in the broker config." + message = 'KAFKA_STORAGE_ERROR' + description = ( + 'The user-specified log directory is not found in the broker config.' + ) class LogDirNotFound(BrokerResponseError): errno = 57 message = "LOG_DIR_NOT_FOUND" - description = "The user-specified log directory is not found in the broker config." + description = ( + "The user-specified log directory is not found in the broker config." + ) class SaslAuthenticationFailed(BrokerResponseError): errno = 58 - message = "SASL_AUTHENTICATION_FAILED" - description = "SASL Authentication failed." + message = 'SASL_AUTHENTICATION_FAILED' + description = 'SASL Authentication failed.' class UnknownProducerId(BrokerResponseError): errno = 59 - message = "UNKNOWN_PRODUCER_ID" + message = 'UNKNOWN_PRODUCER_ID' description = ( - "This exception is raised by the broker if it could not locate the " - "producer metadata associated with the producerId in question. This " - "could happen if, for instance, the producer's records were deleted " - "because their retention time had elapsed. Once the last records of " - "the producerId are removed, the producer's metadata is removed from" - " the broker, and future appends by the producer will return this " - "exception." + 'This exception is raised by the broker if it could not locate the ' + 'producer metadata associated with the producerId in question. 
This ' + 'could happen if, for instance, the producer\'s records were deleted ' + 'because their retention time had elapsed. Once the last records of ' + 'the producerId are removed, the producer\'s metadata is removed from' + ' the broker, and future appends by the producer will return this ' + 'exception.' ) class ReassignmentInProgress(BrokerResponseError): errno = 60 - message = "REASSIGNMENT_IN_PROGRESS" - description = "A partition reassignment is in progress" + message = 'REASSIGNMENT_IN_PROGRESS' + description = 'A partition reassignment is in progress' class DelegationTokenAuthDisabled(BrokerResponseError): errno = 61 - message = "DELEGATION_TOKEN_AUTH_DISABLED" - description = "Delegation Token feature is not enabled" + message = 'DELEGATION_TOKEN_AUTH_DISABLED' + description = 'Delegation Token feature is not enabled' class DelegationTokenNotFound(BrokerResponseError): errno = 62 - message = "DELEGATION_TOKEN_NOT_FOUND" - description = "Delegation Token is not found on server." + message = 'DELEGATION_TOKEN_NOT_FOUND' + description = 'Delegation Token is not found on server.' class DelegationTokenOwnerMismatch(BrokerResponseError): errno = 63 - message = "DELEGATION_TOKEN_OWNER_MISMATCH" - description = "Specified Principal is not valid Owner/Renewer." + message = 'DELEGATION_TOKEN_OWNER_MISMATCH' + description = 'Specified Principal is not valid Owner/Renewer.' class DelegationTokenRequestNotAllowed(BrokerResponseError): errno = 64 - message = "DELEGATION_TOKEN_REQUEST_NOT_ALLOWED" + message = 'DELEGATION_TOKEN_REQUEST_NOT_ALLOWED' description = ( - "Delegation Token requests are not allowed on PLAINTEXT/1-way SSL " - "channels and on delegation token authenticated channels." + 'Delegation Token requests are not allowed on PLAINTEXT/1-way SSL ' + 'channels and on delegation token authenticated channels.' ) class DelegationTokenAuthorizationFailed(BrokerResponseError): errno = 65 - message = "DELEGATION_TOKEN_AUTHORIZATION_FAILED" - description = "Delegation Token authorization failed." + message = 'DELEGATION_TOKEN_AUTHORIZATION_FAILED' + description = 'Delegation Token authorization failed.' class DelegationTokenExpired(BrokerResponseError): errno = 66 - message = "DELEGATION_TOKEN_EXPIRED" - description = "Delegation Token is expired." + message = 'DELEGATION_TOKEN_EXPIRED' + description = 'Delegation Token is expired.' 
class InvalidPrincipalType(BrokerResponseError): errno = 67 - message = "INVALID_PRINCIPAL_TYPE" - description = "Supplied principalType is not supported" + message = 'INVALID_PRINCIPAL_TYPE' + description = 'Supplied principalType is not supported' class NonEmptyGroup(BrokerResponseError): errno = 68 - message = "NON_EMPTY_GROUP" - description = "The group is not empty" + message = 'NON_EMPTY_GROUP' + description = 'The group is not empty' class GroupIdNotFound(BrokerResponseError): errno = 69 - message = "GROUP_ID_NOT_FOUND" - description = "The group id does not exist" + message = 'GROUP_ID_NOT_FOUND' + description = 'The group id does not exist' class FetchSessionIdNotFound(BrokerResponseError): errno = 70 - message = "FETCH_SESSION_ID_NOT_FOUND" - description = "The fetch session ID was not found" + message = 'FETCH_SESSION_ID_NOT_FOUND' + description = 'The fetch session ID was not found' class InvalidFetchSessionEpoch(BrokerResponseError): errno = 71 - message = "INVALID_FETCH_SESSION_EPOCH" - description = "The fetch session epoch is invalid" + message = 'INVALID_FETCH_SESSION_EPOCH' + description = 'The fetch session epoch is invalid' class ListenerNotFound(BrokerResponseError): errno = 72 - message = "LISTENER_NOT_FOUND" + message = 'LISTENER_NOT_FOUND' description = ( - "There is no listener on the leader broker that matches the" - " listener on which metadata request was processed" + 'There is no listener on the leader broker that matches the' + ' listener on which metadata request was processed' ) class MemberIdRequired(BrokerResponseError): errno = 79 - message = "MEMBER_ID_REQUIRED" + message = 'MEMBER_ID_REQUIRED' description = ( - "Consumer needs to have a valid member " "id before actually entering group" + 'Consumer needs to have a valid member ' + 'id before actually entering group' ) def _iter_broker_errors(): for name, obj in inspect.getmembers(sys.modules[__name__]): - if ( - inspect.isclass(obj) - and issubclass(obj, BrokerResponseError) - and obj != BrokerResponseError - ): + if inspect.isclass(obj) and issubclass(obj, BrokerResponseError) and \ + obj != BrokerResponseError: yield obj From 26c240caebdb98fa59871e0f5825f38d78c5c503 Mon Sep 17 00:00:00 2001 From: g-clef Date: Mon, 7 Dec 2020 17:05:54 -0500 Subject: [PATCH 10/14] last few lints --- aiokafka/consumer/group_coordinator.py | 38 ++++++++++++-------------- aiokafka/errors.py | 6 ++-- 2 files changed, 22 insertions(+), 22 deletions(-) diff --git a/aiokafka/consumer/group_coordinator.py b/aiokafka/consumer/group_coordinator.py index 707e6e7b..e60fe62d 100644 --- a/aiokafka/consumer/group_coordinator.py +++ b/aiokafka/consumer/group_coordinator.py @@ -27,6 +27,7 @@ class BaseCoordinator(object): + def __init__(self, client, subscription, *, exclude_internal_topics=True): self._client = client @@ -212,7 +213,7 @@ class GroupCoordinator(BaseCoordinator): """ def __init__(self, client, subscription, *, - group_id="aiokafka-default-group", + group_id='aiokafka-default-group', group_instance_id=None, session_timeout_ms=10000, heartbeat_interval_ms=3000, retry_backoff_ms=100, @@ -221,7 +222,7 @@ def __init__(self, client, subscription, *, exclude_internal_topics=True, max_poll_interval_ms=300000, rebalance_timeout_ms=30000 - ): + ): """Initialize the coordination manager. 
Parameters (see AIOKafkaConsumer) @@ -291,7 +292,7 @@ async def _send_req(self, request): node_id, request, group=ConnectionGroup.COORDINATION) except Errors.KafkaError as err: log.error( - "Error sending %s to node %s [%s] -- marking coordinator dead", + 'Error sending %s to node %s [%s] -- marking coordinator dead', request.__class__.__name__, node_id, err) self.coordinator_dead() raise err @@ -402,7 +403,7 @@ async def _on_join_prepare(self, previous_assignment): self._subscription.listener, self.group_id) async def _perform_assignment( - self, leader_id, assignment_strategy, members + self, leader_id, assignment_strategy, members ): assignor = self._lookup_assignor(assignment_strategy) assert assignor, \ @@ -546,7 +547,7 @@ async def ensure_coordinator_known(self): coordinator_id = ( await self._client.coordinator_lookup( CoordinationType.GROUP, self.group_id) - ) + ) except Errors.GroupAuthorizationFailedError: err = Errors.GroupAuthorizationFailedError(self.group_id) raise err @@ -810,8 +811,8 @@ async def _do_heartbeat(self): self.coordinator_dead() elif error_type is Errors.RebalanceInProgressError: log.warning( - "Heartbeat failed for group %s because it is rebalancing", self.group_id - ) + "Heartbeat failed for group %s because it is rebalancing", + self.group_id) self.request_rejoin() # it is valid to continue heartbeating while the group is # rebalancing. This ensures that the coordinator keeps the @@ -951,8 +952,8 @@ def _is_commit_retriable(self, error): return error.retriable or isinstance(error, ( Errors.UnknownMemberIdError, Errors.IllegalGenerationError, - Errors.RebalanceInProgressError, - )) + Errors.RebalanceInProgressError + )) async def _maybe_do_last_autocommit(self, assignment): if not self._enable_auto_commit: @@ -976,7 +977,7 @@ async def commit_offsets(self, assignment, offsets): self._do_commit_offsets(assignment, offsets)) except (Errors.UnknownMemberIdError, Errors.IllegalGenerationError, - Errors.RebalanceInProgressError,): + Errors.RebalanceInProgressError): raise Errors.CommitFailedError( "Commit cannot be completed since the group has already " "rebalanced and may have assigned the partitions " @@ -1012,9 +1013,8 @@ async def _do_commit_offsets(self, assignment, offsets): [(topic, tp_offsets) for topic, tp_offsets in offset_data.items()] ) - log.debug( - "Sending offset-commit request with %s for group %s to %s", - offsets, self.group_id, self.coordinator_id) + log.debug("Sending offset-commit request with %s for group %s to %s", + offsets, self.group_id, self.coordinator_id) response = await self._send_req(request) @@ -1058,11 +1058,9 @@ async def _do_commit_offsets(self, assignment, offsets): " and retry", self.group_id, error_type.__name__) self.coordinator_dead() errored[tp] = error_type() - elif error_type in ( - Errors.UnknownMemberIdError, - Errors.IllegalGenerationError, - Errors.RebalanceInProgressError, - ): + elif error_type in (Errors.UnknownMemberIdError, + Errors.IllegalGenerationError, + Errors.RebalanceInProgressError): # need to re-join group error = error_type(self.group_id) log.error( @@ -1366,7 +1364,7 @@ async def _on_join_follower(self): log.debug( "Sending follower SyncGroup for group %s to coordinator %s: %s", self.group_id, self.coordinator_id, request) - return await self._send_sync_group_request(request) + return (await self._send_sync_group_request(request)) async def _on_join_leader(self, response): """ @@ -1414,7 +1412,7 @@ async def _on_join_leader(self, response): log.debug( "Sending leader SyncGroup for group %s to 
coordinator %s: %s", self.group_id, self.coordinator_id, request) - return await self._send_sync_group_request(request) + return (await self._send_sync_group_request(request)) async def _send_sync_group_request(self, request): # We need to reset the rejoin future right after the assignment to diff --git a/aiokafka/errors.py b/aiokafka/errors.py index 4fc2f129..4df88e60 100644 --- a/aiokafka/errors.py +++ b/aiokafka/errors.py @@ -17,6 +17,7 @@ AuthenticationMethodNotSupported, AuthenticationFailedError, BrokerResponseError, + # Numbered errors NoError, # 0 UnknownError, # -1 @@ -64,6 +65,7 @@ InvalidRequestError, # 42 UnsupportedForMessageFormatError, # 43 PolicyViolationError, # 44 + KafkaUnavailableError, KafkaTimeoutError, KafkaConnectionError, @@ -303,9 +305,9 @@ class KafkaStorageError(BrokerResponseError): class LogDirNotFound(BrokerResponseError): errno = 57 - message = "LOG_DIR_NOT_FOUND" + message = 'LOG_DIR_NOT_FOUND' description = ( - "The user-specified log directory is not found in the broker config." + 'The user-specified log directory is not found in the broker config.' ) From a3ea5cbaa4bdacc86a19122a021f78c9f4c8bacb Mon Sep 17 00:00:00 2001 From: Vikram Patki <54442035+patkivikram@users.noreply.github.com> Date: Tue, 8 Dec 2020 09:42:02 -0500 Subject: [PATCH 11/14] Update assignors.py suppressing lgtm warning --- aiokafka/consumer/assignors.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aiokafka/consumer/assignors.py b/aiokafka/consumer/assignors.py index ce2cbd1c..bebb379b 100644 --- a/aiokafka/consumer/assignors.py +++ b/aiokafka/consumer/assignors.py @@ -8,7 +8,7 @@ class AbstractStaticPartitionAssignor(AbstractPartitionAssignor): """ @abc.abstractmethod - def assign(self, cluster, members, member_group_instance_ids): + def assign(self, cluster, members, member_group_instance_ids): # lgtm[py/inheritance/signature-mismatch] """Perform group assignment given cluster metadata, member subscriptions and group_instance_ids Arguments: From 29541429a34f59201f6e139165289369893365a9 Mon Sep 17 00:00:00 2001 From: Vikram Patki 24489 Date: Tue, 8 Dec 2020 09:51:41 -0500 Subject: [PATCH 12/14] fix linting --- aiokafka/consumer/assignors.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/aiokafka/consumer/assignors.py b/aiokafka/consumer/assignors.py index bebb379b..a59e7bf4 100644 --- a/aiokafka/consumer/assignors.py +++ b/aiokafka/consumer/assignors.py @@ -8,7 +8,8 @@ class AbstractStaticPartitionAssignor(AbstractPartitionAssignor): """ @abc.abstractmethod - def assign(self, cluster, members, member_group_instance_ids): # lgtm[py/inheritance/signature-mismatch] + def assign(self, cluster, members, + member_group_instance_ids): # lgtm[py/inheritance/signature-mismatch] """Perform group assignment given cluster metadata, member subscriptions and group_instance_ids Arguments: From a2c45aa2cef568fb121c6cc51c905ceb59cbdd12 Mon Sep 17 00:00:00 2001 From: g-clef Date: Wed, 9 Dec 2020 12:23:57 -0500 Subject: [PATCH 13/14] fix lgtm exception --- aiokafka/consumer/assignors.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/aiokafka/consumer/assignors.py b/aiokafka/consumer/assignors.py index a59e7bf4..7788575e 100644 --- a/aiokafka/consumer/assignors.py +++ b/aiokafka/consumer/assignors.py @@ -8,8 +8,8 @@ class AbstractStaticPartitionAssignor(AbstractPartitionAssignor): """ @abc.abstractmethod - def assign(self, cluster, members, - member_group_instance_ids): # lgtm[py/inheritance/signature-mismatch] + def assign(self, cluster, 
members, # lgtm[py/inheritance/signature-mismatch] + member_group_instance_ids): """Perform group assignment given cluster metadata, member subscriptions and group_instance_ids Arguments: From 83e882a847e7d7089cb757065a89107d5d843e5d Mon Sep 17 00:00:00 2001 From: g-clef Date: Fri, 11 Dec 2020 09:09:49 -0500 Subject: [PATCH 14/14] fix trailing space --- aiokafka/errors.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aiokafka/errors.py b/aiokafka/errors.py index 4df88e60..d067958e 100644 --- a/aiokafka/errors.py +++ b/aiokafka/errors.py @@ -419,7 +419,7 @@ class MemberIdRequired(BrokerResponseError): errno = 79 message = 'MEMBER_ID_REQUIRED' description = ( - 'Consumer needs to have a valid member ' + 'Consumer needs to have a valid member ' 'id before actually entering group' )
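
---

The series above only adds the abstract static-assignor hook, so a short, self-contained example of a concrete implementation may help reviewers. The sketch below is illustrative and not part of any patch in this series: it reuses kafka-python's round-robin algorithm behind the three-argument assign() signature introduced in aiokafka/consumer/assignors.py. The class name StaticRoundRobinAssignor and the choice to simply ignore member_group_instance_ids are assumptions made purely for the example.

    # NOTE: illustrative sketch only -- not part of the patch series above.
    # It assumes the AbstractStaticPartitionAssignor interface added in
    # aiokafka/consumer/assignors.py and reuses kafka-python's round-robin
    # algorithm; the class name StaticRoundRobinAssignor is invented here.
    from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor

    from aiokafka.consumer.assignors import AbstractStaticPartitionAssignor


    class StaticRoundRobinAssignor(RoundRobinPartitionAssignor,
                                   AbstractStaticPartitionAssignor):
        """Round-robin assignment that also implements the static (KIP-345) hook."""

        name = 'static-roundrobin'
        version = 0

        @classmethod
        def assign(cls, cluster, members, member_group_instance_ids=None):
            # member_group_instance_ids is expected to map member_id ->
            # group.instance.id for statically registered members.  This
            # sketch ignores it and delegates to the plain round-robin
            # algorithm; a real implementation could use it to hand a
            # restarted static member the same partitions it owned before.
            return RoundRobinPartitionAssignor.assign(cluster, members)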
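As far as the hunks quoted above show, GroupCoordinator now accepts a group_instance_id and, presumably in the parts of the patch not reproduced here, collects the per-member instance IDs and passes them to assignors of this type during leader-side assignment. A production-grade static assignor would use member_group_instance_ids to keep each instance's partitions stable across restarts rather than discarding it as the sketch does.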