diff --git a/aiokafka/consumer/consumer.py b/aiokafka/consumer/consumer.py index 8b58fb4a..aba21b9b 100644 --- a/aiokafka/consumer/consumer.py +++ b/aiokafka/consumer/consumer.py @@ -213,7 +213,7 @@ class AIOKafkaConsumer: sasl_plain_password (str): password for SASL ``PLAIN`` authentication. Default: None sasl_oauth_token_provider (~aiokafka.abc.AbstractTokenProvider): - OAuthBearer token provider instance. (See :mod:`aiokafka.oauth`). + OAuthBearer token provider instance. Default: None Note: diff --git a/aiokafka/coordinator/base.py b/aiokafka/coordinator/base.py deleted file mode 100644 index f1de92de..00000000 --- a/aiokafka/coordinator/base.py +++ /dev/null @@ -1,1189 +0,0 @@ -import abc -import copy -import logging -import threading -import time -import weakref -from concurrent.futures import Future - -from aiokafka import errors as Errors -from aiokafka.metrics import AnonMeasurable -from aiokafka.metrics.stats import Avg, Count, Max, Rate -from aiokafka.protocol.commit import GroupCoordinatorRequest, OffsetCommitRequest -from aiokafka.protocol.group import ( - HeartbeatRequest, - JoinGroupRequest, - LeaveGroupRequest, - SyncGroupRequest, -) - -from .heartbeat import Heartbeat - -log = logging.getLogger("aiokafka.coordinator") - - -class MemberState(object): - UNJOINED = "" # the client is not part of a group - REBALANCING = "" # the client has begun rebalancing - STABLE = "" # the client has joined and is sending heartbeats - - -class Generation(object): - def __init__(self, generation_id, member_id, protocol): - self.generation_id = generation_id - self.member_id = member_id - self.protocol = protocol - - -Generation.NO_GENERATION = Generation( - OffsetCommitRequest[2].DEFAULT_GENERATION_ID, - JoinGroupRequest[0].UNKNOWN_MEMBER_ID, - None, -) - - -class UnjoinedGroupException(Errors.KafkaError): - retriable = True - - -class BaseCoordinator(object): - """ - BaseCoordinator implements group management for a single group member - by interacting with a designated Kafka broker (the coordinator). Group - semantics are provided by extending this class. See ConsumerCoordinator - for example usage. - - From a high level, Kafka's group management protocol consists of the - following sequence of actions: - - 1. Group Registration: Group members register with the coordinator providing - their own metadata (such as the set of topics they are interested in). - - 2. Group/Leader Selection: The coordinator select the members of the group - and chooses one member as the leader. - - 3. State Assignment: The leader collects the metadata from all the members - of the group and assigns state. - - 4. Group Stabilization: Each member receives the state assigned by the - leader and begins processing. - - To leverage this protocol, an implementation must define the format of - metadata provided by each member for group registration in - :meth:`.group_protocols` and the format of the state assignment provided by - the leader in :meth:`._perform_assignment` and which becomes available to - members in :meth:`._on_join_complete`. - - Note on locking: this class shares state between the caller and a background - thread which is used for sending heartbeats after the client has joined the - group. All mutable state as well as state transitions are protected with the - class's monitor. Generally this means acquiring the lock before reading or - writing the state of the group (e.g. generation, member_id) and holding the - lock when sending a request that affects the state of the group - (e.g. JoinGroup, LeaveGroup). - """ - - DEFAULT_CONFIG = { - "group_id": "kafka-python-default-group", - "session_timeout_ms": 10000, - "heartbeat_interval_ms": 3000, - "max_poll_interval_ms": 300000, - "retry_backoff_ms": 100, - "api_version": (0, 10, 1), - "metric_group_prefix": "", - } - - def __init__(self, client, metrics, **configs): - """ - Keyword Arguments: - group_id (str): name of the consumer group to join for dynamic - partition assignment (if enabled), and to use for fetching and - committing offsets. Default: 'kafka-python-default-group' - session_timeout_ms (int): The timeout used to detect failures when - using Kafka's group management facilities. Default: 30000 - heartbeat_interval_ms (int): The expected time in milliseconds - between heartbeats to the consumer coordinator when using - Kafka's group management feature. Heartbeats are used to ensure - that the consumer's session stays active and to facilitate - rebalancing when new consumers join or leave the group. The - value must be set lower than session_timeout_ms, but typically - should be set no higher than 1/3 of that value. It can be - adjusted even lower to control the expected time for normal - rebalances. Default: 3000 - retry_backoff_ms (int): Milliseconds to backoff when retrying on - errors. Default: 100. - """ - self.config = copy.copy(self.DEFAULT_CONFIG) - for key in self.config: - if key in configs: - self.config[key] = configs[key] - - if self.config["api_version"] < (0, 10, 1): - if self.config["max_poll_interval_ms"] != self.config["session_timeout_ms"]: - raise Errors.KafkaConfigurationError( - "Broker version %s does not support " - "different values for max_poll_interval_ms " - "and session_timeout_ms" - ) - - self._client = client - self.group_id = self.config["group_id"] - self.heartbeat = Heartbeat(**self.config) - self._heartbeat_thread = None - self._lock = threading.Condition() - self.rejoin_needed = True - self.rejoining = False # renamed / complement of java needsJoinPrepare - self.state = MemberState.UNJOINED - self.join_future = None - self.coordinator_id = None - self._find_coordinator_future = None - self._generation = Generation.NO_GENERATION - self.sensors = GroupCoordinatorMetrics( - self.heartbeat, metrics, self.config["metric_group_prefix"] - ) - - @abc.abstractmethod - def protocol_type(self): - """ - Unique identifier for the class of supported protocols - (e.g. "consumer" or "connect"). - - Returns: - str: protocol type name - """ - pass - - @abc.abstractmethod - def group_protocols(self): - """Return the list of supported group protocols and metadata. - - This list is submitted by each group member via a JoinGroupRequest. - The order of the protocols in the list indicates the preference of the - protocol (the first entry is the most preferred). The coordinator takes - this preference into account when selecting the generation protocol - (generally more preferred protocols will be selected as long as all - members support them and there is no disagreement on the preference). - - Note: metadata must be type bytes or support an encode() method - - Returns: - list: [(protocol, metadata), ...] - """ - pass - - @abc.abstractmethod - def _on_join_prepare(self, generation, member_id): - """Invoked prior to each group join or rejoin. - - This is typically used to perform any cleanup from the previous - generation (such as committing offsets for the consumer) - - Arguments: - generation (int): The previous generation or -1 if there was none - member_id (str): The identifier of this member in the previous group - or '' if there was none - """ - pass - - @abc.abstractmethod - def _perform_assignment(self, leader_id, protocol, members): - """Perform assignment for the group. - - This is used by the leader to push state to all the members of the group - (e.g. to push partition assignments in the case of the new consumer) - - Arguments: - leader_id (str): The id of the leader (which is this member) - protocol (str): the chosen group protocol (assignment strategy) - members (list): [(member_id, metadata_bytes)] from - JoinGroupResponse. metadata_bytes are associated with the chosen - group protocol, and the Coordinator subclass is responsible for - decoding metadata_bytes based on that protocol. - - Returns: - dict: {member_id: assignment}; assignment must either be bytes - or have an encode() method to convert to bytes - """ - pass - - @abc.abstractmethod - def _on_join_complete( - self, generation, member_id, protocol, member_assignment_bytes - ): - """Invoked when a group member has successfully joined a group. - - Arguments: - generation (int): the generation that was joined - member_id (str): the identifier for the local member in the group - protocol (str): the protocol selected by the coordinator - member_assignment_bytes (bytes): the protocol-encoded assignment - propagated from the group leader. The Coordinator instance is - responsible for decoding based on the chosen protocol. - """ - pass - - def coordinator_unknown(self): - """Check if we know who the coordinator is and have an active connection - - Side-effect: reset coordinator_id to None if connection failed - - Returns: - bool: True if the coordinator is unknown - """ - return self.coordinator() is None - - def coordinator(self): - """Get the current coordinator - - Returns: the current coordinator id or None if it is unknown - """ - if self.coordinator_id is None: - return None - elif self._client.is_disconnected(self.coordinator_id): - self.coordinator_dead("Node Disconnected") - return None - else: - return self.coordinator_id - - def ensure_coordinator_ready(self): - """Block until the coordinator for this group is known - (and we have an active connection -- java client uses unsent queue). - """ - with self._client._lock, self._lock: - while self.coordinator_unknown(): - - # Prior to 0.8.2 there was no group coordinator - # so we will just pick a node at random and treat - # it as the "coordinator" - if self.config["api_version"] < (0, 8, 2): - self.coordinator_id = self._client.least_loaded_node() - if self.coordinator_id is not None: - self._client.maybe_connect(self.coordinator_id) - continue - - future = self.lookup_coordinator() - self._client.poll(future=future) - - if future.failed(): - if future.retriable(): - if getattr(future.exception, "invalid_metadata", False): - log.debug( - "Requesting metadata for group coordinator request: %s", - future.exception, - ) - metadata_update = self._client.cluster.request_update() - self._client.poll(future=metadata_update) - else: - time.sleep(self.config["retry_backoff_ms"] / 1000) - else: - raise future.exception # pylint: disable-msg=raising-bad-type - - def _reset_find_coordinator_future(self, result): - self._find_coordinator_future = None - - def lookup_coordinator(self): - with self._lock: - if self._find_coordinator_future is not None: - return self._find_coordinator_future - - # If there is an error sending the group coordinator request - # then _reset_find_coordinator_future will immediately fire and - # set _find_coordinator_future = None - # To avoid returning None, we capture the future in a local variable - future = self._send_group_coordinator_request() - self._find_coordinator_future = future - self._find_coordinator_future.add_both(self._reset_find_coordinator_future) - return future - - def need_rejoin(self): - """Check whether the group should be rejoined (e.g. if metadata changes) - - Returns: - bool: True if it should, False otherwise - """ - return self.rejoin_needed - - def poll_heartbeat(self): - """ - Check the status of the heartbeat thread (if it is active) and indicate - the liveness of the client. This must be called periodically after - joining with :meth:`.ensure_active_group` to ensure that the member stays - in the group. If an interval of time longer than the provided rebalance - timeout (max_poll_interval_ms) expires without calling this method, then - the client will proactively leave the group. - - Raises: RuntimeError for unexpected errors raised from the heartbeat thread - """ - with self._lock: - if self._heartbeat_thread is not None: - if self._heartbeat_thread.failed: - # set the heartbeat thread to None and raise an exception. - # If the user catches it, the next call to ensure_active_group() - # will spawn a new heartbeat thread. - cause = self._heartbeat_thread.failed - self._heartbeat_thread = None - raise cause # pylint: disable-msg=raising-bad-type - - # Awake the heartbeat thread if needed - if self.heartbeat.should_heartbeat(): - self._lock.notify() - self.heartbeat.poll() - - def time_to_next_heartbeat(self): - """Returns seconds (float) remaining before next heartbeat should be sent - - Note: Returns infinite if group is not joined - """ - with self._lock: - # if we have not joined the group, we don't need to send heartbeats - if self.state is MemberState.UNJOINED: - return float("inf") - return self.heartbeat.time_to_next_heartbeat() - - def _handle_join_success(self, member_assignment_bytes): - with self._lock: - log.info( - "Successfully joined group %s with generation %s", - self.group_id, - self._generation.generation_id, - ) - self.state = MemberState.STABLE - self.rejoin_needed = False - if self._heartbeat_thread: - self._heartbeat_thread.enable() - - def _handle_join_failure(self, _): - with self._lock: - self.state = MemberState.UNJOINED - - def ensure_active_group(self): - """Ensure that the group is active (i.e. joined and synced)""" - with self._client._lock, self._lock: - if self._heartbeat_thread is None: - self._start_heartbeat_thread() - - while self.need_rejoin() or self._rejoin_incomplete(): - self.ensure_coordinator_ready() - - # call on_join_prepare if needed. We set a flag - # to make sure that we do not call it a second - # time if the client is woken up before a pending - # rebalance completes. This must be called on each - # iteration of the loop because an event requiring - # a rebalance (such as a metadata refresh which - # changes the matched subscription set) can occur - # while another rebalance is still in progress. - if not self.rejoining: - self._on_join_prepare( - self._generation.generation_id, self._generation.member_id - ) - self.rejoining = True - - # ensure that there are no pending requests to the coordinator. - # This is important in particular to avoid resending a pending - # JoinGroup request. - while not self.coordinator_unknown(): - if not self._client.in_flight_request_count(self.coordinator_id): - break - self._client.poll() - else: - continue - - # we store the join future in case we are woken up by the user - # after beginning the rebalance in the call to poll below. - # This ensures that we do not mistakenly attempt to rejoin - # before the pending rebalance has completed. - if self.join_future is None: - # Fence off the heartbeat thread explicitly so that it cannot - # interfere with the join group. Note that this must come after - # the call to _on_join_prepare since we must be able to continue - # sending heartbeats if that callback takes some time. - self._heartbeat_thread.disable() - - self.state = MemberState.REBALANCING - future = self._send_join_group_request() - - self.join_future = ( - future # this should happen before adding callbacks - ) - - # handle join completion in the callback so that the - # callback will be invoked even if the consumer is woken up - # before finishing the rebalance - future.add_callback(self._handle_join_success) - - # we handle failures below after the request finishes. - # If the join completes after having been woken up, the - # exception is ignored and we will rejoin - future.add_errback(self._handle_join_failure) - - else: - future = self.join_future - - self._client.poll(future=future) - - if future.succeeded(): - self._on_join_complete( - self._generation.generation_id, - self._generation.member_id, - self._generation.protocol, - future.value, - ) - self.join_future = None - self.rejoining = False - - else: - self.join_future = None - exception = future.exception - if isinstance( - exception, - ( - Errors.UnknownMemberIdError, - Errors.RebalanceInProgressError, - Errors.IllegalGenerationError, - ), - ): - continue - elif not future.retriable(): - raise exception # pylint: disable-msg=raising-bad-type - time.sleep(self.config["retry_backoff_ms"] / 1000) - - def _rejoin_incomplete(self): - return self.join_future is not None - - def _send_join_group_request(self): - """Join the group and return the assignment for the next generation. - - This function handles both JoinGroup and SyncGroup, delegating to - :meth:`._perform_assignment` if elected leader by the coordinator. - - Returns: - Future: resolves to the encoded-bytes assignment returned from the - group leader - """ - if self.coordinator_unknown(): - e = Errors.GroupCoordinatorNotAvailableError(self.coordinator_id) - return Future().failure(e) - - elif not self._client.ready(self.coordinator_id, metadata_priority=False): - e = Errors.NodeNotReadyError(self.coordinator_id) - return Future().failure(e) - - # send a join group request to the coordinator - log.info("(Re-)joining group %s", self.group_id) - member_metadata = [ - (protocol, metadata if isinstance(metadata, bytes) else metadata.encode()) - for protocol, metadata in self.group_protocols() - ] - if self.config["api_version"] < (0, 9): - raise Errors.KafkaError("JoinGroupRequest api requires 0.9+ brokers") - elif (0, 9) <= self.config["api_version"] < (0, 10, 1): - request = JoinGroupRequest[0]( - self.group_id, - self.config["session_timeout_ms"], - self._generation.member_id, - self.protocol_type(), - member_metadata, - ) - elif (0, 10, 1) <= self.config["api_version"] < (0, 11, 0): - request = JoinGroupRequest[1]( - self.group_id, - self.config["session_timeout_ms"], - self.config["max_poll_interval_ms"], - self._generation.member_id, - self.protocol_type(), - member_metadata, - ) - else: - request = JoinGroupRequest[2]( - self.group_id, - self.config["session_timeout_ms"], - self.config["max_poll_interval_ms"], - self._generation.member_id, - self.protocol_type(), - member_metadata, - ) - - # create the request for the coordinator - log.debug( - "Sending JoinGroup (%s) to coordinator %s", request, self.coordinator_id - ) - future = Future() - _f = self._client.send(self.coordinator_id, request) - _f.add_callback(self._handle_join_group_response, future, time.time()) - _f.add_errback(self._failed_request, self.coordinator_id, request, future) - return future - - def _failed_request(self, node_id, request, future, error): - # Marking coordinator dead - # unless the error is caused by internal client pipelining - if not isinstance( - error, (Errors.NodeNotReadyError, Errors.TooManyInFlightRequests) - ): - log.error( - "Error sending %s to node %s [%s]", - request.__class__.__name__, - node_id, - error, - ) - self.coordinator_dead(error) - else: - log.debug( - "Error sending %s to node %s [%s]", - request.__class__.__name__, - node_id, - error, - ) - future.failure(error) - - def _handle_join_group_response(self, future, send_time, response): - error_type = Errors.for_code(response.error_code) - if error_type is Errors.NoError: - log.debug( - "Received successful JoinGroup response for group %s: %s", - self.group_id, - response, - ) - self.sensors.join_latency.record((time.time() - send_time) * 1000) - with self._lock: - if self.state is not MemberState.REBALANCING: - # if the consumer was woken up before a rebalance completes, - # we may have already left the group. In this case, we do - # not want to continue with the sync group. - future.failure(UnjoinedGroupException()) - else: - self._generation = Generation( - response.generation_id, - response.member_id, - response.group_protocol, - ) - - if response.leader_id == response.member_id: - log.info( - "Elected group leader -- performing partition" - " assignments using %s", - self._generation.protocol, - ) - self._on_join_leader(response).chain(future) - else: - self._on_join_follower().chain(future) - - elif error_type is Errors.GroupLoadInProgressError: - log.debug( - "Attempt to join group %s rejected since coordinator %s" - " is loading the group.", - self.group_id, - self.coordinator_id, - ) - # backoff and retry - future.failure(error_type(response)) - elif error_type is Errors.UnknownMemberIdError: - # reset the member id and retry immediately - error = error_type(self._generation.member_id) - self.reset_generation() - log.debug( - "Attempt to join group %s failed due to unknown member id", - self.group_id, - ) - future.failure(error) - elif error_type in ( - Errors.GroupCoordinatorNotAvailableError, - Errors.NotCoordinatorForGroupError, - ): - # re-discover the coordinator and retry with backoff - self.coordinator_dead(error_type()) - log.debug( - "Attempt to join group %s failed due to obsolete " - "coordinator information: %s", - self.group_id, - error_type.__name__, - ) - future.failure(error_type()) - elif error_type in ( - Errors.InconsistentGroupProtocolError, - Errors.InvalidSessionTimeoutError, - Errors.InvalidGroupIdError, - ): - # log the error and re-throw the exception - error = error_type(response) - log.error( - "Attempt to join group %s failed due to fatal error: %s", - self.group_id, - error, - ) - future.failure(error) - elif error_type is Errors.GroupAuthorizationFailedError: - future.failure(error_type(self.group_id)) - else: - # unexpected error, throw the exception - error = error_type() - log.error("Unexpected error in join group response: %s", error) - future.failure(error) - - def _on_join_follower(self): - # send follower's sync group with an empty assignment - version = 0 if self.config["api_version"] < (0, 11, 0) else 1 - request = SyncGroupRequest[version]( - self.group_id, - self._generation.generation_id, - self._generation.member_id, - {}, - ) - log.debug( - "Sending follower SyncGroup for group %s to coordinator %s: %s", - self.group_id, - self.coordinator_id, - request, - ) - return self._send_sync_group_request(request) - - def _on_join_leader(self, response): - """ - Perform leader synchronization and send back the assignment - for the group via SyncGroupRequest - - Arguments: - response (JoinResponse): broker response to parse - - Returns: - Future: resolves to member assignment encoded-bytes - """ - try: - group_assignment = self._perform_assignment( - response.leader_id, response.group_protocol, response.members - ) - except Exception as e: - return Future().failure(e) - - version = 0 if self.config["api_version"] < (0, 11, 0) else 1 - request = SyncGroupRequest[version]( - self.group_id, - self._generation.generation_id, - self._generation.member_id, - [ - ( - member_id, - assignment - if isinstance(assignment, bytes) - else assignment.encode(), - ) - for member_id, assignment in group_assignment.items() - ], - ) - - log.debug( - "Sending leader SyncGroup for group %s to coordinator %s: %s", - self.group_id, - self.coordinator_id, - request, - ) - return self._send_sync_group_request(request) - - def _send_sync_group_request(self, request): - if self.coordinator_unknown(): - e = Errors.GroupCoordinatorNotAvailableError(self.coordinator_id) - return Future().failure(e) - - # We assume that coordinator is ready if we're sending SyncGroup - # as it typically follows a successful JoinGroup - # Also note that if client.ready() enforces a metadata priority policy, - # we can get into an infinite loop if the leader assignment process - # itself requests a metadata update - - future = Future() - _f = self._client.send(self.coordinator_id, request) - _f.add_callback(self._handle_sync_group_response, future, time.time()) - _f.add_errback(self._failed_request, self.coordinator_id, request, future) - return future - - def _handle_sync_group_response(self, future, send_time, response): - error_type = Errors.for_code(response.error_code) - if error_type is Errors.NoError: - self.sensors.sync_latency.record((time.time() - send_time) * 1000) - future.success(response.member_assignment) - return - - # Always rejoin on error - self.request_rejoin() - if error_type is Errors.GroupAuthorizationFailedError: - future.failure(error_type(self.group_id)) - elif error_type is Errors.RebalanceInProgressError: - log.debug( - "SyncGroup for group %s failed due to coordinator" " rebalance", - self.group_id, - ) - future.failure(error_type(self.group_id)) - elif error_type in (Errors.UnknownMemberIdError, Errors.IllegalGenerationError): - error = error_type() - log.debug("SyncGroup for group %s failed due to %s", self.group_id, error) - self.reset_generation() - future.failure(error) - elif error_type in ( - Errors.GroupCoordinatorNotAvailableError, - Errors.NotCoordinatorForGroupError, - ): - error = error_type() - log.debug("SyncGroup for group %s failed due to %s", self.group_id, error) - self.coordinator_dead(error) - future.failure(error) - else: - error = error_type() - log.error("Unexpected error from SyncGroup: %s", error) - future.failure(error) - - def _send_group_coordinator_request(self): - """Discover the current coordinator for the group. - - Returns: - Future: resolves to the node id of the coordinator - """ - node_id = self._client.least_loaded_node() - if node_id is None: - return Future().failure(Errors.NoBrokersAvailable()) - - elif not self._client.ready(node_id, metadata_priority=False): - e = Errors.NodeNotReadyError(node_id) - return Future().failure(e) - - log.debug( - "Sending group coordinator request for group %s to broker %s", - self.group_id, - node_id, - ) - request = GroupCoordinatorRequest[0](self.group_id) - future = Future() - _f = self._client.send(node_id, request) - _f.add_callback(self._handle_group_coordinator_response, future) - _f.add_errback(self._failed_request, node_id, request, future) - return future - - def _handle_group_coordinator_response(self, future, response): - log.debug("Received group coordinator response %s", response) - - error_type = Errors.for_code(response.error_code) - if error_type is Errors.NoError: - with self._lock: - coordinator_id = self._client.cluster.add_group_coordinator( - self.group_id, response - ) - if not coordinator_id: - # This could happen if coordinator metadata is different - # than broker metadata - future.failure(Errors.IllegalStateError()) - return - - self.coordinator_id = coordinator_id - log.info( - "Discovered coordinator %s for group %s", - self.coordinator_id, - self.group_id, - ) - self._client.maybe_connect(self.coordinator_id) - self.heartbeat.reset_timeouts() - future.success(self.coordinator_id) - - elif error_type is Errors.GroupCoordinatorNotAvailableError: - log.debug("Group Coordinator Not Available; retry") - future.failure(error_type()) - elif error_type is Errors.GroupAuthorizationFailedError: - error = error_type(self.group_id) - log.error("Group Coordinator Request failed: %s", error) - future.failure(error) - else: - error = error_type() - log.error( - "Group coordinator lookup for group %s failed: %s", self.group_id, error - ) - future.failure(error) - - def coordinator_dead(self, error): - """Mark the current coordinator as dead.""" - if self.coordinator_id is not None: - log.warning( - "Marking the coordinator dead (node %s) for group %s: %s.", - self.coordinator_id, - self.group_id, - error, - ) - self.coordinator_id = None - - def generation(self): - """Get the current generation state if the group is stable. - - Returns: the current generation or None if the group is unjoined/rebalancing - """ - with self._lock: - if self.state is not MemberState.STABLE: - return None - return self._generation - - def reset_generation(self): - """Reset the generation and memberId because we have fallen out of the group.""" - with self._lock: - self._generation = Generation.NO_GENERATION - self.rejoin_needed = True - self.state = MemberState.UNJOINED - - def request_rejoin(self): - self.rejoin_needed = True - - def _start_heartbeat_thread(self): - if self._heartbeat_thread is None: - log.info("Starting new heartbeat thread") - self._heartbeat_thread = HeartbeatThread(weakref.proxy(self)) - self._heartbeat_thread.daemon = True - self._heartbeat_thread.start() - - def _close_heartbeat_thread(self): - if self._heartbeat_thread is not None: - log.info("Stopping heartbeat thread") - try: - self._heartbeat_thread.close() - except ReferenceError: - pass - self._heartbeat_thread = None - - def __del__(self): - self._close_heartbeat_thread() - - def close(self): - """Close the coordinator, leave the current group, - and reset local generation / member_id""" - self._close_heartbeat_thread() - self.maybe_leave_group() - - def maybe_leave_group(self): - """Leave the current group and reset local generation/memberId.""" - with self._client._lock, self._lock: - if ( - not self.coordinator_unknown() - and self.state is not MemberState.UNJOINED - and self._generation is not Generation.NO_GENERATION - ): - - # this is a minimal effort attempt to leave the group. we do not - # attempt any resending if the request fails or times out. - log.info("Leaving consumer group (%s).", self.group_id) - version = 0 if self.config["api_version"] < (0, 11, 0) else 1 - request = LeaveGroupRequest[version]( - self.group_id, self._generation.member_id - ) - future = self._client.send(self.coordinator_id, request) - future.add_callback(self._handle_leave_group_response) - future.add_errback(log.error, "LeaveGroup request failed: %s") - self._client.poll(future=future) - - self.reset_generation() - - def _handle_leave_group_response(self, response): - error_type = Errors.for_code(response.error_code) - if error_type is Errors.NoError: - log.debug( - "LeaveGroup request for group %s returned successfully", self.group_id - ) - else: - log.error( - "LeaveGroup request for group %s failed with error: %s", - self.group_id, - error_type(), - ) - - def _send_heartbeat_request(self): - """Send a heartbeat request""" - if self.coordinator_unknown(): - e = Errors.GroupCoordinatorNotAvailableError(self.coordinator_id) - return Future().failure(e) - - elif not self._client.ready(self.coordinator_id, metadata_priority=False): - e = Errors.NodeNotReadyError(self.coordinator_id) - return Future().failure(e) - - version = 0 if self.config["api_version"] < (0, 11, 0) else 1 - request = HeartbeatRequest[version]( - self.group_id, self._generation.generation_id, self._generation.member_id - ) - log.debug( - "Heartbeat: %s[%s] %s", - request.group, - request.generation_id, - request.member_id, - ) # pylint: disable-msg=no-member - future = Future() - _f = self._client.send(self.coordinator_id, request) - _f.add_callback(self._handle_heartbeat_response, future, time.time()) - _f.add_errback(self._failed_request, self.coordinator_id, request, future) - return future - - def _handle_heartbeat_response(self, future, send_time, response): - self.sensors.heartbeat_latency.record((time.time() - send_time) * 1000) - error_type = Errors.for_code(response.error_code) - if error_type is Errors.NoError: - log.debug( - "Received successful heartbeat response for group %s", self.group_id - ) - future.success(None) - elif error_type in ( - Errors.GroupCoordinatorNotAvailableError, - Errors.NotCoordinatorForGroupError, - ): - log.warning( - "Heartbeat failed for group %s: coordinator (node %s)" - " is either not started or not valid", - self.group_id, - self.coordinator(), - ) - self.coordinator_dead(error_type()) - future.failure(error_type()) - elif error_type is Errors.RebalanceInProgressError: - log.warning( - "Heartbeat failed for group %s because it is" " rebalancing", - self.group_id, - ) - self.request_rejoin() - future.failure(error_type()) - elif error_type is Errors.IllegalGenerationError: - log.warning( - "Heartbeat failed for group %s: generation id is not " " current.", - self.group_id, - ) - self.reset_generation() - future.failure(error_type()) - elif error_type is Errors.UnknownMemberIdError: - log.warning( - "Heartbeat: local member_id was not recognized;" - " this consumer needs to re-join" - ) - self.reset_generation() - future.failure(error_type) - elif error_type is Errors.GroupAuthorizationFailedError: - error = error_type(self.group_id) - log.error("Heartbeat failed: authorization error: %s", error) - future.failure(error) - else: - error = error_type() - log.error("Heartbeat failed: Unhandled error: %s", error) - future.failure(error) - - -class GroupCoordinatorMetrics(object): - def __init__(self, heartbeat, metrics, prefix, tags=None): - self.heartbeat = heartbeat - self.metrics = metrics - self.metric_group_name = prefix + "-coordinator-metrics" - - self.heartbeat_latency = metrics.sensor("heartbeat-latency") - self.heartbeat_latency.add( - metrics.metric_name( - "heartbeat-response-time-max", - self.metric_group_name, - "The max time taken to receive a response to a heartbeat request", - tags, - ), - Max(), - ) - self.heartbeat_latency.add( - metrics.metric_name( - "heartbeat-rate", - self.metric_group_name, - "The average number of heartbeats per second", - tags, - ), - Rate(sampled_stat=Count()), - ) - - self.join_latency = metrics.sensor("join-latency") - self.join_latency.add( - metrics.metric_name( - "join-time-avg", - self.metric_group_name, - "The average time taken for a group rejoin", - tags, - ), - Avg(), - ) - self.join_latency.add( - metrics.metric_name( - "join-time-max", - self.metric_group_name, - "The max time taken for a group rejoin", - tags, - ), - Max(), - ) - self.join_latency.add( - metrics.metric_name( - "join-rate", - self.metric_group_name, - "The number of group joins per second", - tags, - ), - Rate(sampled_stat=Count()), - ) - - self.sync_latency = metrics.sensor("sync-latency") - self.sync_latency.add( - metrics.metric_name( - "sync-time-avg", - self.metric_group_name, - "The average time taken for a group sync", - tags, - ), - Avg(), - ) - self.sync_latency.add( - metrics.metric_name( - "sync-time-max", - self.metric_group_name, - "The max time taken for a group sync", - tags, - ), - Max(), - ) - self.sync_latency.add( - metrics.metric_name( - "sync-rate", - self.metric_group_name, - "The number of group syncs per second", - tags, - ), - Rate(sampled_stat=Count()), - ) - - metrics.add_metric( - metrics.metric_name( - "last-heartbeat-seconds-ago", - self.metric_group_name, - "The number of seconds since the last controller heartbeat was sent", - tags, - ), - AnonMeasurable(lambda _, now: (now / 1000) - self.heartbeat.last_send), - ) - - -class HeartbeatThread(threading.Thread): - def __init__(self, coordinator): - super(HeartbeatThread, self).__init__() - self.name = coordinator.group_id + "-heartbeat" - self.coordinator = coordinator - self.enabled = False - self.closed = False - self.failed = None - - def enable(self): - with self.coordinator._lock: - self.enabled = True - self.coordinator.heartbeat.reset_timeouts() - self.coordinator._lock.notify() - - def disable(self): - self.enabled = False - - def close(self): - self.closed = True - with self.coordinator._lock: - self.coordinator._lock.notify() - if self.is_alive(): - self.join(self.coordinator.config["heartbeat_interval_ms"] / 1000) - if self.is_alive(): - log.warning("Heartbeat thread did not fully terminate during close") - - def run(self): - try: - log.debug("Heartbeat thread started") - while not self.closed: - self._run_once() - - except ReferenceError: - log.debug("Heartbeat thread closed due to coordinator gc") - - except RuntimeError as e: - log.error( - "Heartbeat thread for group %s failed due to unexpected error: %s", - self.coordinator.group_id, - e, - ) - self.failed = e - - finally: - log.debug("Heartbeat thread closed") - - def _run_once(self): - with self.coordinator._client._lock, self.coordinator._lock: - if self.enabled and self.coordinator.state is MemberState.STABLE: - # TODO: When consumer.wakeup() is implemented, we need to - # disable here to prevent propagating an exception to this - # heartbeat thread - # must get client._lock, or maybe deadlock at heartbeat - # failure callback in consumer poll - self.coordinator._client.poll(timeout_ms=0) - - with self.coordinator._lock: - if not self.enabled: - log.debug("Heartbeat disabled. Waiting") - self.coordinator._lock.wait() - log.debug("Heartbeat re-enabled.") - return - - if self.coordinator.state is not MemberState.STABLE: - # the group is not stable (perhaps because we left the - # group or because the coordinator kicked us out), so - # disable heartbeats and wait for the main thread to rejoin. - log.debug("Group state is not stable, disabling heartbeats") - self.disable() - return - - if self.coordinator.coordinator_unknown(): - future = self.coordinator.lookup_coordinator() - if not future.is_done or future.failed(): - # the immediate future check ensures that we backoff - # properly in the case that no brokers are available - # to connect to (and the future is automatically failed). - self.coordinator._lock.wait( - self.coordinator.config["retry_backoff_ms"] / 1000 - ) - - elif self.coordinator.heartbeat.session_timeout_expired(): - # the session timeout has expired without seeing a - # successful heartbeat, so we should probably make sure - # the coordinator is still healthy. - log.warning("Heartbeat session expired, marking coordinator dead") - self.coordinator.coordinator_dead("Heartbeat session expired") - - elif self.coordinator.heartbeat.poll_timeout_expired(): - # the poll timeout has expired, which means that the - # foreground thread has stalled in between calls to - # poll(), so we explicitly leave the group. - log.warning("Heartbeat poll expired, leaving group") - self.coordinator.maybe_leave_group() - - elif not self.coordinator.heartbeat.should_heartbeat(): - # poll again after waiting for the retry backoff in case - # the heartbeat failed or the coordinator disconnected - log.log(0, "Not ready to heartbeat, waiting") - self.coordinator._lock.wait( - self.coordinator.config["retry_backoff_ms"] / 1000 - ) - - else: - self.coordinator.heartbeat.sent_heartbeat() - future = self.coordinator._send_heartbeat_request() - future.add_callback(self._handle_heartbeat_success) - future.add_errback(self._handle_heartbeat_failure) - - def _handle_heartbeat_success(self, result): - with self.coordinator._lock: - self.coordinator.heartbeat.received_heartbeat() - - def _handle_heartbeat_failure(self, exception): - with self.coordinator._lock: - if isinstance(exception, Errors.RebalanceInProgressError): - # it is valid to continue heartbeating while the group is - # rebalancing. This ensures that the coordinator keeps the - # member in the group for as long as the duration of the - # rebalance timeout. If we stop sending heartbeats, however, - # then the session timeout may expire before we can rejoin. - self.coordinator.heartbeat.received_heartbeat() - else: - self.coordinator.heartbeat.fail_heartbeat() - # wake up the thread if it's sleeping to reschedule the heartbeat - self.coordinator._lock.notify() diff --git a/aiokafka/coordinator/consumer.py b/aiokafka/coordinator/consumer.py deleted file mode 100644 index dade9fcd..00000000 --- a/aiokafka/coordinator/consumer.py +++ /dev/null @@ -1,984 +0,0 @@ -import collections -import copy -import functools -import logging -import time -from concurrent.futures import Future - -import aiokafka.errors as Errors -from aiokafka.metrics import AnonMeasurable -from aiokafka.metrics.stats import Avg, Count, Max, Rate -from aiokafka.protocol.commit import OffsetCommitRequest, OffsetFetchRequest -from aiokafka.structs import OffsetAndMetadata, TopicPartition -from aiokafka.util import WeakMethod - -from .base import BaseCoordinator, Generation -from .assignors.range import RangePartitionAssignor -from .assignors.roundrobin import RoundRobinPartitionAssignor -from .assignors.sticky.sticky_assignor import StickyPartitionAssignor -from .protocol import ConsumerProtocol - - -log = logging.getLogger(__name__) - - -class ConsumerCoordinator(BaseCoordinator): - """This class manages the coordination process with the consumer coordinator.""" - - DEFAULT_CONFIG = { - "group_id": "kafka-python-default-group", - "enable_auto_commit": True, - "auto_commit_interval_ms": 5000, - "default_offset_commit_callback": None, - "assignors": ( - RangePartitionAssignor, - RoundRobinPartitionAssignor, - StickyPartitionAssignor, - ), - "session_timeout_ms": 10000, - "heartbeat_interval_ms": 3000, - "max_poll_interval_ms": 300000, - "retry_backoff_ms": 100, - "api_version": (0, 10, 1), - "exclude_internal_topics": True, - "metric_group_prefix": "consumer", - } - - def __init__(self, client, subscription, metrics, **configs): - """Initialize the coordination manager. - - Keyword Arguments: - group_id (str): name of the consumer group to join for dynamic - partition assignment (if enabled), and to use for fetching and - committing offsets. Default: 'kafka-python-default-group' - enable_auto_commit (bool): If true the consumer's offset will be - periodically committed in the background. Default: True. - auto_commit_interval_ms (int): milliseconds between automatic - offset commits, if enable_auto_commit is True. Default: 5000. - default_offset_commit_callback (callable): called as - callback(offsets, exception) response will be either an Exception - or None. This callback can be used to trigger custom actions when - a commit request completes. - assignors (list): List of objects to use to distribute partition - ownership amongst consumer instances when group management is - used. Default: [RangePartitionAssignor, RoundRobinPartitionAssignor] - heartbeat_interval_ms (int): The expected time in milliseconds - between heartbeats to the consumer coordinator when using - Kafka's group management feature. Heartbeats are used to ensure - that the consumer's session stays active and to facilitate - rebalancing when new consumers join or leave the group. The - value must be set lower than session_timeout_ms, but typically - should be set no higher than 1/3 of that value. It can be - adjusted even lower to control the expected time for normal - rebalances. Default: 3000 - session_timeout_ms (int): The timeout used to detect failures when - using Kafka's group management facilities. Default: 30000 - retry_backoff_ms (int): Milliseconds to backoff when retrying on - errors. Default: 100. - exclude_internal_topics (bool): Whether records from internal topics - (such as offsets) should be exposed to the consumer. If set to - True the only way to receive records from an internal topic is - subscribing to it. Requires 0.10+. Default: True - """ - super(ConsumerCoordinator, self).__init__(client, metrics, **configs) - - self.config = copy.copy(self.DEFAULT_CONFIG) - for key in self.config: - if key in configs: - self.config[key] = configs[key] - - self._subscription = subscription - self._is_leader = False - self._joined_subscription = set() - self._metadata_snapshot = self._build_metadata_snapshot( - subscription, client.cluster - ) - self._assignment_snapshot = None - self._cluster = client.cluster - self.auto_commit_interval = self.config["auto_commit_interval_ms"] / 1000 - self.next_auto_commit_deadline = None - self.completed_offset_commits = collections.deque() - - if self.config["default_offset_commit_callback"] is None: - self.config[ - "default_offset_commit_callback" - ] = self._default_offset_commit_callback - - if self.config["group_id"] is not None: - if self.config["api_version"] >= (0, 9): - if not self.config["assignors"]: - raise Errors.KafkaConfigurationError( - "Coordinator requires assignors" - ) - if self.config["api_version"] < (0, 10, 1): - if ( - self.config["max_poll_interval_ms"] - != self.config["session_timeout_ms"] - ): - raise Errors.KafkaConfigurationError( - "Broker version %s does not support " - "different values for max_poll_interval_ms " - "and session_timeout_ms" - ) - - if self.config["enable_auto_commit"]: - if self.config["api_version"] < (0, 8, 1): - log.warning( - "Broker version (%s) does not support offset" - " commits; disabling auto-commit.", - self.config["api_version"], - ) - self.config["enable_auto_commit"] = False - elif self.config["group_id"] is None: - log.warning("group_id is None: disabling auto-commit.") - self.config["enable_auto_commit"] = False - else: - self.next_auto_commit_deadline = time.time() + self.auto_commit_interval - - self.consumer_sensors = ConsumerCoordinatorMetrics( - metrics, self.config["metric_group_prefix"], self._subscription - ) - - self._cluster.request_update() - self._cluster.add_listener(WeakMethod(self._handle_metadata_update)) - - def __del__(self): - if hasattr(self, "_cluster") and self._cluster: - self._cluster.remove_listener(WeakMethod(self._handle_metadata_update)) - super(ConsumerCoordinator, self).__del__() - - def protocol_type(self): - return ConsumerProtocol.PROTOCOL_TYPE - - def group_protocols(self): - """Returns list of preferred (protocols, metadata)""" - if self._subscription.subscription is None: - raise Errors.IllegalStateError("Consumer has not subscribed to topics") - # dpkp note: I really dislike this. - # why? because we are using this strange method group_protocols, - # which is seemingly innocuous, to set internal state (_joined_subscription) - # that is later used to check whether metadata has changed since we joined a - # group but there is no guarantee that this method, group_protocols, will get - # called in the correct sequence or that it will only be called when we want it - # to be. So this really should be moved elsewhere, but I don't have the energy - # to work that out right now. If you read this at some later date after the - # mutable state has bitten you... I'm sorry! It mimics the java client, and - # that's the best I've got for now. - self._joined_subscription = set(self._subscription.subscription) - metadata_list = [] - for assignor in self.config["assignors"]: - metadata = assignor.metadata(self._joined_subscription) - group_protocol = (assignor.name, metadata) - metadata_list.append(group_protocol) - return metadata_list - - def _handle_metadata_update(self, cluster): - # if we encounter any unauthorized topics, raise an exception - if cluster.unauthorized_topics: - raise Errors.TopicAuthorizationFailedError(cluster.unauthorized_topics) - - if self._subscription.subscribed_pattern: - topics = [] - for topic in cluster.topics(self.config["exclude_internal_topics"]): - if self._subscription.subscribed_pattern.match(topic): - topics.append(topic) - - if set(topics) != self._subscription.subscription: - self._subscription.change_subscription(topics) - self._client.set_topics(self._subscription.group_subscription()) - - # check if there are any changes to the metadata which should trigger - # a rebalance - if self._subscription.partitions_auto_assigned(): - metadata_snapshot = self._build_metadata_snapshot( - self._subscription, cluster - ) - if self._metadata_snapshot != metadata_snapshot: - self._metadata_snapshot = metadata_snapshot - - # If we haven't got group coordinator support, - # just assign all partitions locally - if self._auto_assign_all_partitions(): - self._subscription.assign_from_subscribed( - [ - TopicPartition(topic, partition) - for topic in self._subscription.subscription - for partition in self._metadata_snapshot[topic] - ] - ) - - def _auto_assign_all_partitions(self): - # For users that use "subscribe" without group support, - # we will simply assign all partitions to this consumer - if self.config["api_version"] < (0, 9): - return True - elif self.config["group_id"] is None: - return True - else: - return False - - def _build_metadata_snapshot(self, subscription, cluster): - metadata_snapshot = {} - for topic in subscription.group_subscription(): - partitions = cluster.partitions_for_topic(topic) or [] - metadata_snapshot[topic] = set(partitions) - return metadata_snapshot - - def _lookup_assignor(self, name): - for assignor in self.config["assignors"]: - if assignor.name == name: - return assignor - return None - - def _on_join_complete( - self, generation, member_id, protocol, member_assignment_bytes - ): - # only the leader is responsible for monitoring for metadata changes - # (i.e. partition changes) - if not self._is_leader: - self._assignment_snapshot = None - - assignor = self._lookup_assignor(protocol) - assert assignor, "Coordinator selected invalid assignment protocol: %s" % ( - protocol, - ) - - assignment = ConsumerProtocol.ASSIGNMENT.decode(member_assignment_bytes) - - # set the flag to refresh last committed offsets - self._subscription.needs_fetch_committed_offsets = True - - # update partition assignment - try: - self._subscription.assign_from_subscribed(assignment.partitions()) - except ValueError as e: - log.warning("%s. Probably due to a deleted topic. Requesting Re-join" % e) - self.request_rejoin() - - # give the assignor a chance to update internal state - # based on the received assignment - assignor.on_assignment(assignment) - if assignor.name == "sticky": - assignor.on_generation_assignment(generation) - - # reschedule the auto commit starting from now - self.next_auto_commit_deadline = time.time() + self.auto_commit_interval - - assigned = set(self._subscription.assigned_partitions()) - log.info( - "Setting newly assigned partitions %s for group %s", assigned, self.group_id - ) - - # execute the user's callback after rebalance - if self._subscription.listener: - try: - self._subscription.listener.on_partitions_assigned(assigned) - except Exception: - log.exception( - "User provided listener %s for group %s" - " failed on partition assignment: %s", - self._subscription.listener, - self.group_id, - assigned, - ) - - def poll(self): - """ - Poll for coordinator events. Only applicable if group_id is set, and - broker version supports GroupCoordinators. This ensures that the - coordinator is known, and if using automatic partition assignment, - ensures that the consumer has joined the group. This also handles - periodic offset commits if they are enabled. - """ - if self.group_id is None: - return - - self._invoke_completed_offset_commit_callbacks() - self.ensure_coordinator_ready() - - if ( - self.config["api_version"] >= (0, 9) - and self._subscription.partitions_auto_assigned() - ): - if self.need_rejoin(): - # due to a race condition between the initial metadata fetch and the - # initial rebalance, we need to ensure that the metadata is fresh - # before joining initially, and then request the metadata update. If - # metadata update arrives while the rebalance is still pending (for - # example, when the join group is still inflight), then we will lose - # track of the fact that we need to rebalance again to reflect the - # change to the topic subscription. Without ensuring that the - # metadata is fresh, any metadata update that changes the topic - # subscriptions and arrives while a rebalance is in progress will - # essentially be ignored. See KAFKA-3949 for the complete - # description of the problem. - if self._subscription.subscribed_pattern: - metadata_update = self._client.cluster.request_update() - self._client.poll(future=metadata_update) - - self.ensure_active_group() - - self.poll_heartbeat() - - self._maybe_auto_commit_offsets_async() - - def time_to_next_poll(self): - """Return seconds (float) remaining until :meth:`.poll` should be called - again - """ - if not self.config["enable_auto_commit"]: - return self.time_to_next_heartbeat() - - if time.time() > self.next_auto_commit_deadline: - return 0 - - return min( - self.next_auto_commit_deadline - time.time(), self.time_to_next_heartbeat() - ) - - def _perform_assignment(self, leader_id, assignment_strategy, members): - assignor = self._lookup_assignor(assignment_strategy) - assert assignor, "Invalid assignment protocol: %s" % (assignment_strategy,) - member_metadata = {} - all_subscribed_topics = set() - for member_id, metadata_bytes in members: - metadata = ConsumerProtocol.METADATA.decode(metadata_bytes) - member_metadata[member_id] = metadata - all_subscribed_topics.update( - metadata.subscription - ) # pylint: disable-msg=no-member - - # the leader will begin watching for changes to any of the topics - # the group is interested in, which ensures that all metadata changes - # will eventually be seen - # Because assignment typically happens within response callbacks, - # we cannot block on metadata updates here (no recursion into poll()) - self._subscription.group_subscribe(all_subscribed_topics) - self._client.set_topics(self._subscription.group_subscription()) - - # keep track of the metadata used for assignment so that we can check - # after rebalance completion whether anything has changed - self._cluster.request_update() - self._is_leader = True - self._assignment_snapshot = self._metadata_snapshot - - log.debug( - "Performing assignment for group %s using strategy %s" - " with subscriptions %s", - self.group_id, - assignor.name, - member_metadata, - ) - - assignments = assignor.assign(self._cluster, member_metadata) - - log.debug("Finished assignment for group %s: %s", self.group_id, assignments) - - group_assignment = {} - for member_id, assignment in assignments.items(): - group_assignment[member_id] = assignment - return group_assignment - - def _on_join_prepare(self, generation, member_id): - # commit offsets prior to rebalance if auto-commit enabled - self._maybe_auto_commit_offsets_sync() - - # execute the user's callback before rebalance - log.info( - "Revoking previously assigned partitions %s for group %s", - self._subscription.assigned_partitions(), - self.group_id, - ) - if self._subscription.listener: - try: - revoked = set(self._subscription.assigned_partitions()) - self._subscription.listener.on_partitions_revoked(revoked) - except Exception: - log.exception( - "User provided subscription listener %s" - " for group %s failed on_partitions_revoked", - self._subscription.listener, - self.group_id, - ) - - self._is_leader = False - self._subscription.reset_group_subscription() - - def need_rejoin(self): - """Check whether the group should be rejoined - - Returns: - bool: True if consumer should rejoin group, False otherwise - """ - if not self._subscription.partitions_auto_assigned(): - return False - - if self._auto_assign_all_partitions(): - return False - - # we need to rejoin if we performed the assignment and metadata has changed - if ( - self._assignment_snapshot is not None - and self._assignment_snapshot != self._metadata_snapshot - ): - return True - - # we need to join if our subscription has changed since the last join - if ( - self._joined_subscription is not None - and self._joined_subscription != self._subscription.subscription - ): - return True - - return super(ConsumerCoordinator, self).need_rejoin() - - def refresh_committed_offsets_if_needed(self): - """Fetch committed offsets for assigned partitions.""" - if self._subscription.needs_fetch_committed_offsets: - offsets = self.fetch_committed_offsets( - self._subscription.assigned_partitions() - ) - for partition, offset in offsets.items(): - # verify assignment is still active - if self._subscription.is_assigned(partition): - self._subscription.assignment[partition].committed = offset - self._subscription.needs_fetch_committed_offsets = False - - def fetch_committed_offsets(self, partitions): - """Fetch the current committed offsets for specified partitions - - Arguments: - partitions (list of TopicPartition): partitions to fetch - - Returns: - dict: {TopicPartition: OffsetAndMetadata} - """ - if not partitions: - return {} - - while True: - self.ensure_coordinator_ready() - - # contact coordinator to fetch committed offsets - future = self._send_offset_fetch_request(partitions) - self._client.poll(future=future) - - if future.succeeded(): - return future.value - - if not future.retriable(): - raise future.exception # pylint: disable-msg=raising-bad-type - - time.sleep(self.config["retry_backoff_ms"] / 1000) - - def close(self, autocommit=True): - """Close the coordinator, leave the current group, - and reset local generation / member_id. - - Keyword Arguments: - autocommit (bool): If auto-commit is configured for this consumer, - this optional flag causes the consumer to attempt to commit any - pending consumed offsets prior to close. Default: True - """ - try: - if autocommit: - self._maybe_auto_commit_offsets_sync() - finally: - super(ConsumerCoordinator, self).close() - - def _invoke_completed_offset_commit_callbacks(self): - while self.completed_offset_commits: - callback, offsets, exception = self.completed_offset_commits.popleft() - callback(offsets, exception) - - def commit_offsets_async(self, offsets, callback=None): - """Commit specific offsets asynchronously. - - Arguments: - offsets (dict {TopicPartition: OffsetAndMetadata}): what to commit - callback (callable, optional): called as callback(offsets, response) - response will be either an Exception or a OffsetCommitResponse - struct. This callback can be used to trigger custom actions when - a commit request completes. - - Returns: - Future - """ - self._invoke_completed_offset_commit_callbacks() - if not self.coordinator_unknown(): - future = self._do_commit_offsets_async(offsets, callback) - else: - # we don't know the current coordinator, so try to find it and then - # send the commit or fail (we don't want recursive retries which can - # cause offset commits to arrive out of order). Note that there may - # be multiple offset commits chained to the same coordinator lookup - # request. This is fine because the listeners will be invoked in the - # same order that they were added. Note also that BaseCoordinator - # prevents multiple concurrent coordinator lookup requests. - future = self.lookup_coordinator() - future.add_callback( - lambda r: functools.partial( - self._do_commit_offsets_async, offsets, callback - )() - ) - if callback: - future.add_errback( - lambda e: self.completed_offset_commits.appendleft( - (callback, offsets, e) - ) - ) - - # ensure the commit has a chance to be transmitted (without blocking on - # its completion). Note that commits are treated as heartbeats by the - # coordinator, so there is no need to explicitly allow heartbeats - # through delayed task execution. - self._client.poll(timeout_ms=0) # no wakeup if we add that feature - - return future - - def _do_commit_offsets_async(self, offsets, callback=None): - assert self.config["api_version"] >= (0, 8, 1), "Unsupported Broker API" - assert all(map(lambda k: isinstance(k, TopicPartition), offsets)) - assert all(map(lambda v: isinstance(v, OffsetAndMetadata), offsets.values())) - if callback is None: - callback = self.config["default_offset_commit_callback"] - self._subscription.needs_fetch_committed_offsets = True - future = self._send_offset_commit_request(offsets) - future.add_both( - lambda res: self.completed_offset_commits.appendleft( - (callback, offsets, res) - ) - ) - return future - - def commit_offsets_sync(self, offsets): - """Commit specific offsets synchronously. - - This method will retry until the commit completes successfully or an - unrecoverable error is encountered. - - Arguments: - offsets (dict {TopicPartition: OffsetAndMetadata}): what to commit - - Raises error on failure - """ - assert self.config["api_version"] >= (0, 8, 1), "Unsupported Broker API" - assert all(map(lambda k: isinstance(k, TopicPartition), offsets)) - assert all(map(lambda v: isinstance(v, OffsetAndMetadata), offsets.values())) - self._invoke_completed_offset_commit_callbacks() - if not offsets: - return - - while True: - self.ensure_coordinator_ready() - - future = self._send_offset_commit_request(offsets) - self._client.poll(future=future) - - if future.succeeded(): - return future.value - - if not future.retriable(): - raise future.exception # pylint: disable-msg=raising-bad-type - - time.sleep(self.config["retry_backoff_ms"] / 1000) - - def _maybe_auto_commit_offsets_sync(self): - if self.config["enable_auto_commit"]: - try: - self.commit_offsets_sync(self._subscription.all_consumed_offsets()) - - # The three main group membership errors are known and should not - # require a stacktrace -- just a warning - except ( - Errors.UnknownMemberIdError, - Errors.IllegalGenerationError, - Errors.RebalanceInProgressError, - ): - log.warning( - "Offset commit failed: group membership out of date" - " This is likely to cause duplicate message" - " delivery." - ) - except Exception: - log.exception( - "Offset commit failed: This is likely to cause" - " duplicate message delivery" - ) - - def _send_offset_commit_request(self, offsets): - """Commit offsets for the specified list of topics and partitions. - - This is a non-blocking call which returns a request future that can be - polled in the case of a synchronous commit or ignored in the - asynchronous case. - - Arguments: - offsets (dict of {TopicPartition: OffsetAndMetadata}): what should - be committed - - Returns: - Future: indicating whether the commit was successful or not - """ - assert self.config["api_version"] >= (0, 8, 1), "Unsupported Broker API" - assert all(map(lambda k: isinstance(k, TopicPartition), offsets)) - assert all(map(lambda v: isinstance(v, OffsetAndMetadata), offsets.values())) - if not offsets: - log.debug("No offsets to commit") - return Future().success(None) - - node_id = self.coordinator() - if node_id is None: - return Future().failure(Errors.GroupCoordinatorNotAvailableError) - - # create the offset commit request - offset_data = collections.defaultdict(dict) - for tp, offset in offsets.items(): - offset_data[tp.topic][tp.partition] = offset - - if self._subscription.partitions_auto_assigned(): - generation = self.generation() - else: - generation = Generation.NO_GENERATION - - # if the generation is None, we are not part of an active group - # (and we expect to be). The only thing we can do is fail the commit - # and let the user rejoin the group in poll() - if self.config["api_version"] >= (0, 9) and generation is None: - return Future().failure(Errors.CommitFailedError()) - - if self.config["api_version"] >= (0, 9): - request = OffsetCommitRequest[2]( - self.group_id, - generation.generation_id, - generation.member_id, - OffsetCommitRequest[2].DEFAULT_RETENTION_TIME, - [ - ( - topic, - [ - (partition, offset.offset, offset.metadata) - for partition, offset in partitions.items() - ], - ) - for topic, partitions in offset_data.items() - ], - ) - elif self.config["api_version"] >= (0, 8, 2): - request = OffsetCommitRequest[1]( - self.group_id, - -1, - "", - [ - ( - topic, - [ - (partition, offset.offset, -1, offset.metadata) - for partition, offset in partitions.items() - ], - ) - for topic, partitions in offset_data.items() - ], - ) - elif self.config["api_version"] >= (0, 8, 1): - request = OffsetCommitRequest[0]( - self.group_id, - [ - ( - topic, - [ - (partition, offset.offset, offset.metadata) - for partition, offset in partitions.items() - ], - ) - for topic, partitions in offset_data.items() - ], - ) - - log.debug( - "Sending offset-commit request with %s for group %s to %s", - offsets, - self.group_id, - node_id, - ) - - future = Future() - _f = self._client.send(node_id, request) - _f.add_callback( - self._handle_offset_commit_response, offsets, future, time.time() - ) - _f.add_errback(self._failed_request, node_id, request, future) - return future - - def _handle_offset_commit_response(self, offsets, future, send_time, response): - # TODO look at adding request_latency_ms to response (like java kafka) - self.consumer_sensors.commit_latency.record((time.time() - send_time) * 1000) - unauthorized_topics = set() - - for topic, partitions in response.topics: - for partition, error_code in partitions: - tp = TopicPartition(topic, partition) - offset = offsets[tp] - - error_type = Errors.for_code(error_code) - if error_type is Errors.NoError: - log.debug( - "Group %s committed offset %s for partition %s", - self.group_id, - offset, - tp, - ) - if self._subscription.is_assigned(tp): - self._subscription.assignment[tp].committed = offset - elif error_type is Errors.GroupAuthorizationFailedError: - log.error( - "Not authorized to commit offsets for group %s", self.group_id - ) - future.failure(error_type(self.group_id)) - return - elif error_type is Errors.TopicAuthorizationFailedError: - unauthorized_topics.add(topic) - elif error_type in ( - Errors.OffsetMetadataTooLargeError, - Errors.InvalidCommitOffsetSizeError, - ): - # raise the error to the user - log.debug( - "OffsetCommit for group %s failed on partition %s" " %s", - self.group_id, - tp, - error_type.__name__, - ) - future.failure(error_type()) - return - elif error_type is Errors.GroupLoadInProgressError: - # just retry - log.debug( - "OffsetCommit for group %s failed: %s", - self.group_id, - error_type.__name__, - ) - future.failure(error_type(self.group_id)) - return - elif error_type in ( - Errors.GroupCoordinatorNotAvailableError, - Errors.NotCoordinatorForGroupError, - Errors.RequestTimedOutError, - ): - log.debug( - "OffsetCommit for group %s failed: %s", - self.group_id, - error_type.__name__, - ) - self.coordinator_dead(error_type()) - future.failure(error_type(self.group_id)) - return - elif error_type in ( - Errors.UnknownMemberIdError, - Errors.IllegalGenerationError, - Errors.RebalanceInProgressError, - ): - # need to re-join group - error = error_type(self.group_id) - log.debug( - "OffsetCommit for group %s failed: %s", self.group_id, error - ) - self.reset_generation() - future.failure(Errors.CommitFailedError()) - return - else: - log.error( - "Group %s failed to commit partition %s at offset" " %s: %s", - self.group_id, - tp, - offset, - error_type.__name__, - ) - future.failure(error_type()) - return - - if unauthorized_topics: - log.error( - "Not authorized to commit to topics %s for group %s", - unauthorized_topics, - self.group_id, - ) - future.failure(Errors.TopicAuthorizationFailedError(unauthorized_topics)) - else: - future.success(None) - - def _send_offset_fetch_request(self, partitions): - """Fetch the committed offsets for a set of partitions. - - This is a non-blocking call. The returned future can be polled to get - the actual offsets returned from the broker. - - Arguments: - partitions (list of TopicPartition): the partitions to fetch - - Returns: - Future: resolves to dict of offsets: {TopicPartition: OffsetAndMetadata} - """ - assert self.config["api_version"] >= (0, 8, 1), "Unsupported Broker API" - assert all(map(lambda k: isinstance(k, TopicPartition), partitions)) - if not partitions: - return Future().success({}) - - node_id = self.coordinator() - if node_id is None: - return Future().failure(Errors.GroupCoordinatorNotAvailableError) - - # Verify node is ready - if not self._client.ready(node_id): - log.debug("Node %s not ready -- failing offset fetch request", node_id) - return Future().failure(Errors.NodeNotReadyError) - - log.debug( - "Group %s fetching committed offsets for partitions: %s", - self.group_id, - partitions, - ) - # construct the request - topic_partitions = collections.defaultdict(set) - for tp in partitions: - topic_partitions[tp.topic].add(tp.partition) - - if self.config["api_version"] >= (0, 8, 2): - request = OffsetFetchRequest[1]( - self.group_id, list(topic_partitions.items()) - ) - else: - request = OffsetFetchRequest[0]( - self.group_id, list(topic_partitions.items()) - ) - - # send the request with a callback - future = Future() - _f = self._client.send(node_id, request) - _f.add_callback(self._handle_offset_fetch_response, future) - _f.add_errback(self._failed_request, node_id, request, future) - return future - - def _handle_offset_fetch_response(self, future, response): - offsets = {} - for topic, partitions in response.topics: - for partition, offset, metadata, error_code in partitions: - tp = TopicPartition(topic, partition) - error_type = Errors.for_code(error_code) - if error_type is not Errors.NoError: - error = error_type() - log.debug( - "Group %s failed to fetch offset for partition" " %s: %s", - self.group_id, - tp, - error, - ) - if error_type is Errors.GroupLoadInProgressError: - # just retry - future.failure(error) - elif error_type is Errors.NotCoordinatorForGroupError: - # re-discover the coordinator and retry - self.coordinator_dead(error_type()) - future.failure(error) - elif error_type is Errors.UnknownTopicOrPartitionError: - log.warning( - "OffsetFetchRequest -- unknown topic %s" - " (have you committed any offsets yet?)", - topic, - ) - continue - else: - log.error( - "Unknown error fetching offsets for %s: %s", tp, error - ) - future.failure(error) - return - elif offset >= 0: - # record the position with the offset - # (-1 indicates no committed offset to fetch) - offsets[tp] = OffsetAndMetadata(offset, metadata) - else: - log.debug( - "Group %s has no committed offset for partition" " %s", - self.group_id, - tp, - ) - future.success(offsets) - - def _default_offset_commit_callback(self, offsets, exception): - if exception is not None: - log.error("Offset commit failed: %s", exception) - - def _commit_offsets_async_on_complete(self, offsets, exception): - if exception is not None: - log.warning( - "Auto offset commit failed for group %s: %s", self.group_id, exception - ) - if getattr(exception, "retriable", False): - self.next_auto_commit_deadline = min( - time.time() + self.config["retry_backoff_ms"] / 1000, - self.next_auto_commit_deadline, - ) - else: - log.debug( - "Completed autocommit of offsets %s for group %s", - offsets, - self.group_id, - ) - - def _maybe_auto_commit_offsets_async(self): - if self.config["enable_auto_commit"]: - if self.coordinator_unknown(): - self.next_auto_commit_deadline = ( - time.time() + self.config["retry_backoff_ms"] / 1000 - ) - elif time.time() > self.next_auto_commit_deadline: - self.next_auto_commit_deadline = time.time() + self.auto_commit_interval - self.commit_offsets_async( - self._subscription.all_consumed_offsets(), - self._commit_offsets_async_on_complete, - ) - - -class ConsumerCoordinatorMetrics(object): - def __init__(self, metrics, metric_group_prefix, subscription): - self.metrics = metrics - self.metric_group_name = "%s-coordinator-metrics" % (metric_group_prefix,) - - self.commit_latency = metrics.sensor("commit-latency") - self.commit_latency.add( - metrics.metric_name( - "commit-latency-avg", - self.metric_group_name, - "The average time taken for a commit request", - ), - Avg(), - ) - self.commit_latency.add( - metrics.metric_name( - "commit-latency-max", - self.metric_group_name, - "The max time taken for a commit request", - ), - Max(), - ) - self.commit_latency.add( - metrics.metric_name( - "commit-rate", - self.metric_group_name, - "The number of commit calls per second", - ), - Rate(sampled_stat=Count()), - ) - - num_parts = AnonMeasurable( - lambda config, now: len(subscription.assigned_partitions()) - ) - metrics.add_metric( - metrics.metric_name( - "assigned-partitions", - self.metric_group_name, - "The number of partitions currently assigned to this consumer", - ), - num_parts, - ) diff --git a/aiokafka/coordinator/heartbeat.py b/aiokafka/coordinator/heartbeat.py deleted file mode 100644 index b10a726d..00000000 --- a/aiokafka/coordinator/heartbeat.py +++ /dev/null @@ -1,69 +0,0 @@ -import copy -import time - - -class Heartbeat(object): - DEFAULT_CONFIG = { - "group_id": None, - "heartbeat_interval_ms": 3000, - "session_timeout_ms": 10000, - "max_poll_interval_ms": 300000, - "retry_backoff_ms": 100, - } - - def __init__(self, **configs): - self.config = copy.copy(self.DEFAULT_CONFIG) - for key in self.config: - if key in configs: - self.config[key] = configs[key] - - if self.config["group_id"] is not None: - assert ( - self.config["heartbeat_interval_ms"] - <= self.config["session_timeout_ms"] - ), "Heartbeat interval must be lower than the session timeout" - - self.last_send = -1 * float("inf") - self.last_receive = -1 * float("inf") - self.last_poll = -1 * float("inf") - self.last_reset = time.time() - self.heartbeat_failed = None - - def poll(self): - self.last_poll = time.time() - - def sent_heartbeat(self): - self.last_send = time.time() - self.heartbeat_failed = False - - def fail_heartbeat(self): - self.heartbeat_failed = True - - def received_heartbeat(self): - self.last_receive = time.time() - - def time_to_next_heartbeat(self): - """Returns seconds (float) remaining before next heartbeat should be sent""" - time_since_last_heartbeat = time.time() - max(self.last_send, self.last_reset) - if self.heartbeat_failed: - delay_to_next_heartbeat = self.config["retry_backoff_ms"] / 1000 - else: - delay_to_next_heartbeat = self.config["heartbeat_interval_ms"] / 1000 - return max(0, delay_to_next_heartbeat - time_since_last_heartbeat) - - def should_heartbeat(self): - return self.time_to_next_heartbeat() == 0 - - def session_timeout_expired(self): - last_recv = max(self.last_receive, self.last_reset) - return (time.time() - last_recv) > (self.config["session_timeout_ms"] / 1000) - - def reset_timeouts(self): - self.last_reset = time.time() - self.last_poll = time.time() - self.heartbeat_failed = False - - def poll_timeout_expired(self): - return (time.time() - self.last_poll) > ( - self.config["max_poll_interval_ms"] / 1000 - ) diff --git a/aiokafka/oauth.py b/aiokafka/oauth.py deleted file mode 100644 index cc416da1..00000000 --- a/aiokafka/oauth.py +++ /dev/null @@ -1,38 +0,0 @@ -import abc - - -class AbstractTokenProvider(abc.ABC): - """ - A Token Provider must be used for the SASL OAuthBearer protocol. - - The implementation should ensure token reuse so that multiple - calls at connect time do not create multiple tokens. The implementation - should also periodically refresh the token in order to guarantee - that each call returns an unexpired token. A timeout error should - be returned after a short period of inactivity so that the - broker can log debugging info and retry. - - Token Providers MUST implement the token() method - """ - - def __init__(self, **config): - pass - - @abc.abstractmethod - def token(self): - """ - Returns a (str) ID/Access Token to be sent to the Kafka - client. - """ - pass - - def extensions(self): - """ - This is an OPTIONAL method that may be implemented. - - Returns a map of key-value pairs that can - be sent with the SASL/OAUTHBEARER initial client request. If - not implemented, the values are ignored. This feature is only available - in Kafka >= 2.1.0. - """ - return {} diff --git a/aiokafka/producer/producer.py b/aiokafka/producer/producer.py index 3c5a096e..ea9fdbe0 100644 --- a/aiokafka/producer/producer.py +++ b/aiokafka/producer/producer.py @@ -165,8 +165,7 @@ class AIOKafkaProducer: sasl_plain_password (str): password for SASL ``PLAIN`` authentication. Default: :data:`None` sasl_oauth_token_provider (:class:`~aiokafka.abc.AbstractTokenProvider`): - OAuthBearer token provider instance. (See - :mod:`aiokafka.oauth`). + OAuthBearer token provider instance. Default: :data:`None` Note: diff --git a/aiokafka/protocol/frame.py b/aiokafka/protocol/frame.py deleted file mode 100644 index 897e091b..00000000 --- a/aiokafka/protocol/frame.py +++ /dev/null @@ -1,30 +0,0 @@ -class KafkaBytes(bytearray): - def __init__(self, size): - super(KafkaBytes, self).__init__(size) - self._idx = 0 - - def read(self, nbytes=None): - if nbytes is None: - nbytes = len(self) - self._idx - start = self._idx - self._idx += nbytes - if self._idx > len(self): - self._idx = len(self) - return bytes(self[start:self._idx]) - - def write(self, data): - start = self._idx - self._idx += len(data) - self[start:self._idx] = data - - def seek(self, idx): - self._idx = idx - - def tell(self): - return self._idx - - def __str__(self): - return "KafkaBytes(%d)" % len(self) - - def __repr__(self): - return str(self) diff --git a/aiokafka/protocol/message.py b/aiokafka/protocol/message.py index c981be90..e22b6cb0 100644 --- a/aiokafka/protocol/message.py +++ b/aiokafka/protocol/message.py @@ -15,7 +15,6 @@ from aiokafka.errors import UnsupportedCodecError from aiokafka.util import WeakMethod -from .frame import KafkaBytes from .struct import Struct from .types import Int8, Int32, UInt32, Int64, Bytes, Schema, AbstractType @@ -186,7 +185,7 @@ class MessageSet(AbstractType): @classmethod def encode(cls, items, prepend_size=True): # RecordAccumulator encodes messagesets internally - if isinstance(items, (io.BytesIO, KafkaBytes)): + if isinstance(items, io.BytesIO): size = Int32.decode(items) if prepend_size: # rewind and return all the bytes @@ -234,7 +233,7 @@ def decode(cls, data, bytes_to_read=None): @classmethod def repr(cls, messages): - if isinstance(messages, (KafkaBytes, io.BytesIO)): + if isinstance(messages, io.BytesIO): offset = messages.tell() decoded = cls.decode(messages) messages.seek(offset) diff --git a/aiokafka/protocol/parser.py b/aiokafka/protocol/parser.py deleted file mode 100644 index c19dc4f1..00000000 --- a/aiokafka/protocol/parser.py +++ /dev/null @@ -1,194 +0,0 @@ -import collections -import logging - -import aiokafka.errors as Errors -from aiokafka import __version__ - -from .commit import GroupCoordinatorResponse -from .frame import KafkaBytes -from .types import Int32 - -log = logging.getLogger(__name__) - - -class KafkaProtocol(object): - """Manage the kafka network protocol - - Use an instance of KafkaProtocol to manage bytes send/recv'd - from a network socket to a broker. - - Arguments: - client_id (str): identifier string to be included in each request - api_version (tuple): Optional tuple to specify api_version to use. - Currently only used to check for 0.8.2 protocol quirks, but - may be used for more in the future. - """ - - def __init__(self, client_id=None, api_version=None): - if client_id is None: - client_id = self._gen_client_id() - self._client_id = client_id - self._api_version = api_version - self._correlation_id = 0 - self._header = KafkaBytes(4) - self._rbuffer = None - self._receiving = False - self.in_flight_requests = collections.deque() - self.bytes_to_send = [] - - def _next_correlation_id(self): - self._correlation_id = (self._correlation_id + 1) % 2**31 - return self._correlation_id - - def _gen_client_id(self): - return "aiokafka" + __version__ - - def send_request(self, request, correlation_id=None): - """Encode and queue a kafka api request for sending. - - Arguments: - request (object): An un-encoded kafka request. - correlation_id (int, optional): Optionally specify an ID to - correlate requests with responses. If not provided, an ID will - be generated automatically. - - Returns: - correlation_id - """ - log.debug("Sending request %s", request) - if correlation_id is None: - correlation_id = self._next_correlation_id() - - header = request.build_request_header( - correlation_id=correlation_id, client_id=self._client_id - ) - message = b"".join([header.encode(), request.encode()]) - size = Int32.encode(len(message)) - data = size + message - self.bytes_to_send.append(data) - if request.expect_response(): - ifr = (correlation_id, request) - self.in_flight_requests.append(ifr) - return correlation_id - - def send_bytes(self): - """Retrieve all pending bytes to send on the network""" - data = b"".join(self.bytes_to_send) - self.bytes_to_send = [] - return data - - def receive_bytes(self, data): - """Process bytes received from the network. - - Arguments: - data (bytes): any length bytes received from a network connection - to a kafka broker. - - Returns: - responses (list of (correlation_id, response)): any/all completed - responses, decoded from bytes to python objects. - - Raises: - KafkaProtocolError: if the bytes received could not be decoded. - CorrelationIdError: if the response does not match the request - correlation id. - """ - i = 0 - n = len(data) - responses = [] - while i < n: - - # Not receiving is the state of reading the payload header - if not self._receiving: - bytes_to_read = min(4 - self._header.tell(), n - i) - self._header.write(data[i:i + bytes_to_read]) - i += bytes_to_read - - if self._header.tell() == 4: - self._header.seek(0) - nbytes = Int32.decode(self._header) - # reset buffer and switch state to receiving payload bytes - self._rbuffer = KafkaBytes(nbytes) - self._receiving = True - elif self._header.tell() > 4: - raise Errors.KafkaError( - "this should not happen - are you threading?" - ) - - if self._receiving: - total_bytes = len(self._rbuffer) - staged_bytes = self._rbuffer.tell() - bytes_to_read = min(total_bytes - staged_bytes, n - i) - self._rbuffer.write(data[i:i + bytes_to_read]) - i += bytes_to_read - - staged_bytes = self._rbuffer.tell() - if staged_bytes > total_bytes: - raise Errors.KafkaError( - "Receive buffer has more bytes than expected?" - ) - - if staged_bytes != total_bytes: - break - - self._receiving = False - self._rbuffer.seek(0) - resp = self._process_response(self._rbuffer) - responses.append(resp) - self._reset_buffer() - return responses - - def _process_response(self, read_buffer): - if not self.in_flight_requests: - raise Errors.CorrelationIdError( - "No in-flight-request found for server response" - ) - (correlation_id, request) = self.in_flight_requests.popleft() - response_header = request.parse_response_header(read_buffer) - recv_correlation_id = response_header.correlation_id - log.debug("Received correlation id: %d", recv_correlation_id) - # 0.8.2 quirk - if ( - recv_correlation_id == 0 - and correlation_id != 0 - and request.RESPONSE_TYPE is GroupCoordinatorResponse[0] - and (self._api_version == (0, 8, 2) or self._api_version is None) - ): - log.warning( - "Kafka 0.8.2 quirk -- GroupCoordinatorResponse" - " Correlation ID does not match request. This" - " should go away once at least one topic has been" - " initialized on the broker." - ) - - elif correlation_id != recv_correlation_id: - # return or raise? - raise Errors.CorrelationIdError( - "Correlation IDs do not match: sent %d, recv %d" - % (correlation_id, recv_correlation_id) - ) - - # decode response - log.debug("Processing response %s", request.RESPONSE_TYPE.__name__) - try: - response = request.RESPONSE_TYPE.decode(read_buffer) - except ValueError: - read_buffer.seek(0) - buf = read_buffer.read() - log.error( - "Response %d [ResponseType: %s Request: %s]:" - " Unable to decode %d-byte buffer: %r", - correlation_id, - request.RESPONSE_TYPE, - request, - len(buf), - buf, - ) - raise Errors.KafkaProtocolError("Unable to decode response") - - return (correlation_id, response) - - def _reset_buffer(self): - self._receiving = False - self._header.seek(0) - self._rbuffer = None diff --git a/aiokafka/protocol/pickle.py b/aiokafka/protocol/pickle.py deleted file mode 100644 index 780c4e88..00000000 --- a/aiokafka/protocol/pickle.py +++ /dev/null @@ -1,30 +0,0 @@ -import copyreg -import types - - -def _pickle_method(method): - try: - func_name = method.__func__.__name__ - obj = method.__self__ - cls = method.__self__.__class__ - except AttributeError: - func_name = method.im_func.__name__ - obj = method.im_self - cls = method.im_class - - return _unpickle_method, (func_name, obj, cls) - - -def _unpickle_method(func_name, obj, cls): - for cls in cls.mro(): - try: - func = cls.__dict__[func_name] - except KeyError: - pass - else: - break - return func.__get__(obj, cls) - - -# https://bytes.com/topic/python/answers/552476-why-cant-you-pickle-instancemethods -copyreg.pickle(types.MethodType, _pickle_method, _unpickle_method) diff --git a/docs/api.rst b/docs/api.rst index 9ca4f590..6455acac 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -80,9 +80,7 @@ and ``GSSAPI`` SASL methods. Be sure to install `gssapi`_ python module to use Please consult the `official documentation `__ for setup instructions on Broker side. Client configuration is pretty much the same as Java's, consult the ``sasl_*`` options in Consumer and Producer API -Reference for more details. - -.. automodule:: aiokafka.oauth +Reference for more details. See :class:`~aiokafka.abc.AbstractTokenProvider`. Error handling diff --git a/pyproject.toml b/pyproject.toml index 206dc4ee..b283eca6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -70,3 +70,12 @@ filterwarnings = [ # FIXME Until we fix socket leaks in tests "default:unclosed event loop:ResourceWarning", ] + +[tool.coverage.run] +branch = true +source_pkgs = ["aiokafka", "tests"] + +[tool.coverage.report] +show_missing = true +skip_covered = true +skip_empty = true