diff --git a/aiokafka/coordinator/assignors/range.py b/aiokafka/coordinator/assignors/range.py index 82f36b2d..19a58755 100644 --- a/aiokafka/coordinator/assignors/range.py +++ b/aiokafka/coordinator/assignors/range.py @@ -4,7 +4,10 @@ import logging from aiokafka.coordinator.assignors.abstract import AbstractPartitionAssignor -from aiokafka.coordinator.protocol import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment +from aiokafka.coordinator.protocol import ( + ConsumerProtocolMemberMetadata, + ConsumerProtocolMemberAssignment, +) log = logging.getLogger(__name__) @@ -26,7 +29,8 @@ class RangePartitionAssignor(AbstractPartitionAssignor): C0: [t0p0, t0p1, t1p0, t1p1] C1: [t0p2, t1p2] """ - name = 'range' + + name = "range" version = 0 @classmethod @@ -42,7 +46,7 @@ def assign(cls, cluster, member_metadata): for topic, consumers_for_topic in consumers_per_topic.items(): partitions = cluster.partitions_for_topic(topic) if partitions is None: - log.warning('No partition metadata for topic %s', topic) + log.warning("No partition metadata for topic %s", topic) continue partitions = sorted(partitions) consumers_for_topic.sort() @@ -56,19 +60,18 @@ def assign(cls, cluster, member_metadata): length = partitions_per_consumer if not i + 1 > consumers_with_extra: length += 1 - assignment[member][topic] = partitions[start:start+length] + assignment[member][topic] = partitions[start : start + length] protocol_assignment = {} for member_id in member_metadata: protocol_assignment[member_id] = ConsumerProtocolMemberAssignment( - cls.version, - sorted(assignment[member_id].items()), - b'') + cls.version, sorted(assignment[member_id].items()), b"" + ) return protocol_assignment @classmethod def metadata(cls, topics): - return ConsumerProtocolMemberMetadata(cls.version, list(topics), b'') + return ConsumerProtocolMemberMetadata(cls.version, list(topics), b"") @classmethod def on_assignment(cls, assignment): diff --git a/aiokafka/coordinator/assignors/roundrobin.py b/aiokafka/coordinator/assignors/roundrobin.py index 5d370b95..e5926a17 100644 --- a/aiokafka/coordinator/assignors/roundrobin.py +++ b/aiokafka/coordinator/assignors/roundrobin.py @@ -7,7 +7,10 @@ from kafka.structs import TopicPartition from aiokafka.coordinator.assignors.abstract import AbstractPartitionAssignor -from aiokafka.coordinator.protocol import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment +from aiokafka.coordinator.protocol import ( + ConsumerProtocolMemberMetadata, + ConsumerProtocolMemberAssignment, +) log = logging.getLogger(__name__) @@ -44,7 +47,8 @@ class RoundRobinPartitionAssignor(AbstractPartitionAssignor): C1: [t1p0] C2: [t1p1, t2p0, t2p1, t2p2] """ - name = 'roundrobin' + + name = "roundrobin" version = 0 @classmethod @@ -57,7 +61,7 @@ def assign(cls, cluster, member_metadata): for topic in all_topics: partitions = cluster.partitions_for_topic(topic) if partitions is None: - log.warning('No partition metadata for topic %s', topic) + log.warning("No partition metadata for topic %s", topic) continue for partition in partitions: all_topic_partitions.append(TopicPartition(topic, partition)) @@ -81,14 +85,13 @@ def assign(cls, cluster, member_metadata): protocol_assignment = {} for member_id in member_metadata: protocol_assignment[member_id] = ConsumerProtocolMemberAssignment( - cls.version, - sorted(assignment[member_id].items()), - b'') + cls.version, sorted(assignment[member_id].items()), b"" + ) return protocol_assignment @classmethod def metadata(cls, topics): - return 
ConsumerProtocolMemberMetadata(cls.version, list(topics), b'') + return ConsumerProtocolMemberMetadata(cls.version, list(topics), b"") @classmethod def on_assignment(cls, assignment): diff --git a/aiokafka/coordinator/assignors/sticky/partition_movements.py b/aiokafka/coordinator/assignors/sticky/partition_movements.py index 78f2eb22..3deb35d5 100644 --- a/aiokafka/coordinator/assignors/sticky/partition_movements.py +++ b/aiokafka/coordinator/assignors/sticky/partition_movements.py @@ -26,7 +26,7 @@ def is_sublist(source, target): true if target is in source; false otherwise """ for index in (i for i, e in enumerate(source) if e == target[0]): - if tuple(source[index: index + len(target)]) == target: + if tuple(source[index : index + len(target)]) == target: return True return False @@ -39,9 +39,7 @@ class PartitionMovements: """ def __init__(self): - self.partition_movements_by_topic = defaultdict( - lambda: defaultdict(set) - ) + self.partition_movements_by_topic = defaultdict(lambda: defaultdict(set)) self.partition_movements = {} def move_partition(self, partition, old_consumer, new_consumer): @@ -53,7 +51,11 @@ def move_partition(self, partition, old_consumer, new_consumer): if existing_pair.src_member_id != new_consumer: # the partition is not moving back to its previous consumer self._add_partition_movement_record( - partition, ConsumerPair(src_member_id=existing_pair.src_member_id, dst_member_id=new_consumer) + partition, + ConsumerPair( + src_member_id=existing_pair.src_member_id, + dst_member_id=new_consumer, + ), ) else: self._add_partition_movement_record(partition, pair) @@ -65,11 +67,15 @@ def get_partition_to_be_moved(self, partition, old_consumer, new_consumer): # this partition has previously moved assert old_consumer == self.partition_movements[partition].dst_member_id old_consumer = self.partition_movements[partition].src_member_id - reverse_pair = ConsumerPair(src_member_id=new_consumer, dst_member_id=old_consumer) + reverse_pair = ConsumerPair( + src_member_id=new_consumer, dst_member_id=old_consumer + ) if reverse_pair not in self.partition_movements_by_topic[partition.topic]: return partition - return next(iter(self.partition_movements_by_topic[partition.topic][reverse_pair])) + return next( + iter(self.partition_movements_by_topic[partition.topic][reverse_pair]) + ) def are_sticky(self): for topic, movements in self.partition_movements_by_topic.items(): @@ -105,11 +111,13 @@ def _has_cycles(self, consumer_pairs): reduced_pairs = deepcopy(consumer_pairs) reduced_pairs.remove(pair) path = [pair.src_member_id] - if self._is_linked(pair.dst_member_id, pair.src_member_id, reduced_pairs, path) and not self._is_subcycle( - path, cycles - ): + if self._is_linked( + pair.dst_member_id, pair.src_member_id, reduced_pairs, path + ) and not self._is_subcycle(path, cycles): cycles.add(tuple(path)) - log.error("A cycle of length {} was found: {}".format(len(path) - 1, path)) + log.error( + "A cycle of length {} was found: {}".format(len(path) - 1, path) + ) # for now we want to make sure there is no partition movements of the same topic between a pair of consumers. 
# the odds of finding a cycle among more than two consumers seem to be very low (according to various randomized @@ -143,5 +151,7 @@ def _is_linked(self, src, dst, pairs, current_path): reduced_set = deepcopy(pairs) reduced_set.remove(pair) current_path.append(pair.src_member_id) - return self._is_linked(pair.dst_member_id, dst, reduced_set, current_path) + return self._is_linked( + pair.dst_member_id, dst, reduced_set, current_path + ) return False diff --git a/aiokafka/coordinator/assignors/sticky/sorted_set.py b/aiokafka/coordinator/assignors/sticky/sorted_set.py index 6a454a42..7903f6ca 100644 --- a/aiokafka/coordinator/assignors/sticky/sorted_set.py +++ b/aiokafka/coordinator/assignors/sticky/sorted_set.py @@ -35,9 +35,13 @@ def pop_last(self): return value def add(self, value): - if self._cached_last is not None and self._key(value) > self._key(self._cached_last): + if self._cached_last is not None and self._key(value) > self._key( + self._cached_last + ): self._cached_last = value - if self._cached_first is not None and self._key(value) < self._key(self._cached_first): + if self._cached_first is not None and self._key(value) < self._key( + self._cached_first + ): self._cached_first = value return self._set.add(value) diff --git a/aiokafka/coordinator/assignors/sticky/sticky_assignor.py b/aiokafka/coordinator/assignors/sticky/sticky_assignor.py index 348a47ed..c941a83c 100644 --- a/aiokafka/coordinator/assignors/sticky/sticky_assignor.py +++ b/aiokafka/coordinator/assignors/sticky/sticky_assignor.py @@ -9,12 +9,17 @@ from aiokafka.coordinator.assignors.abstract import AbstractPartitionAssignor from aiokafka.coordinator.assignors.sticky.partition_movements import PartitionMovements from aiokafka.coordinator.assignors.sticky.sorted_set import SortedSet -from aiokafka.coordinator.protocol import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment +from aiokafka.coordinator.protocol import ( + ConsumerProtocolMemberMetadata, + ConsumerProtocolMemberAssignment, +) from aiokafka.coordinator.protocol import Schema log = logging.getLogger(__name__) -ConsumerGenerationPair = namedtuple("ConsumerGenerationPair", ["consumer", "generation"]) +ConsumerGenerationPair = namedtuple( + "ConsumerGenerationPair", ["consumer", "generation"] +) def has_identical_list_elements(list_): @@ -49,8 +54,9 @@ def remove_if_present(collection, element): pass -StickyAssignorMemberMetadataV1 = namedtuple("StickyAssignorMemberMetadataV1", - ["subscription", "partitions", "generation"]) +StickyAssignorMemberMetadataV1 = namedtuple( + "StickyAssignorMemberMetadataV1", ["subscription", "partitions", "generation"] +) class StickyAssignorUserDataV1(Struct): @@ -60,7 +66,11 @@ class StickyAssignorUserDataV1(Struct): """ SCHEMA = Schema( - ("previous_assignment", Array(("topic", String("utf-8")), ("partitions", Array(Int32)))), ("generation", Int32) + ( + "previous_assignment", + Array(("topic", String("utf-8")), ("partitions", Array(Int32))), + ), + ("generation", Int32), ) @@ -98,7 +108,10 @@ def perform_initial_assignment(self): def balance(self): self._initialize_current_subscriptions() - initializing = len(self.current_assignment[self._get_consumer_with_most_subscriptions()]) == 0 + initializing = ( + len(self.current_assignment[self._get_consumer_with_most_subscriptions()]) + == 0 + ) # assign all unassigned partitions for partition in self.unassigned_partitions: @@ -120,7 +133,9 @@ def balance(self): fixed_assignments = {} for consumer in self.consumer_to_all_potential_partitions.keys(): if not 
self._can_consumer_participate_in_reassignment(consumer): - self._remove_consumer_from_current_subscriptions_and_maintain_order(consumer) + self._remove_consumer_from_current_subscriptions_and_maintain_order( + consumer + ) fixed_assignments[consumer] = self.current_assignment[consumer] del self.current_assignment[consumer] @@ -140,7 +155,8 @@ def balance(self): if ( not initializing and reassignment_performed - and self._get_balance_score(self.current_assignment) >= self._get_balance_score(prebalance_assignment) + and self._get_balance_score(self.current_assignment) + >= self._get_balance_score(prebalance_assignment) ): self.current_assignment = prebalance_assignment self.current_partition_consumer.clear() @@ -177,8 +193,12 @@ def _initialize(self, cluster): continue for p in cluster.partitions_for_topic(topic): partition = TopicPartition(topic=topic, partition=p) - self.consumer_to_all_potential_partitions[consumer_id].append(partition) - self.partition_to_all_potential_consumers[partition].append(consumer_id) + self.consumer_to_all_potential_partitions[consumer_id].append( + partition + ) + self.partition_to_all_potential_consumers[partition].append( + consumer_id + ) if consumer_id not in self.current_assignment: self.current_assignment[consumer_id] = [] @@ -193,18 +213,25 @@ def _init_current_assignments(self, members): for partitions in member_metadata.partitions: if partitions in sorted_partition_consumers_by_generation: consumers = sorted_partition_consumers_by_generation[partitions] - if member_metadata.generation and member_metadata.generation in consumers: + if ( + member_metadata.generation + and member_metadata.generation in consumers + ): # same partition is assigned to two consumers during the same rebalance. # log a warning and skip this record log.warning( "Partition {} is assigned to multiple consumers " - "following sticky assignment generation {}.".format(partitions, member_metadata.generation) + "following sticky assignment generation {}.".format( + partitions, member_metadata.generation + ) ) else: consumers[member_metadata.generation] = consumer else: sorted_consumers = {member_metadata.generation: consumer} - sorted_partition_consumers_by_generation[partitions] = sorted_consumers + sorted_partition_consumers_by_generation[ + partitions + ] = sorted_consumers # previous_assignment holds the prior ConsumerGenerationPair (before current) of each partition # current and previous consumers are the last two consumers of each partition in the above sorted map @@ -229,15 +256,23 @@ def _are_subscriptions_identical(self): true, if both potential consumers of partitions and potential partitions that consumers can consume are the same """ - if not has_identical_list_elements(list(self.partition_to_all_potential_consumers.values())): + if not has_identical_list_elements( + list(self.partition_to_all_potential_consumers.values()) + ): return False - return has_identical_list_elements(list(self.consumer_to_all_potential_partitions.values())) + return has_identical_list_elements( + list(self.consumer_to_all_potential_partitions.values()) + ) def _populate_sorted_partitions(self): # set of topic partitions with their respective potential consumers - all_partitions = set((tp, tuple(consumers)) - for tp, consumers in self.partition_to_all_potential_consumers.items()) - partitions_sorted_by_num_of_potential_consumers = sorted(all_partitions, key=partitions_comparator_key) + all_partitions = set( + (tp, tuple(consumers)) + for tp, consumers in 
self.partition_to_all_potential_consumers.items() + ) + partitions_sorted_by_num_of_potential_consumers = sorted( + all_partitions, key=partitions_comparator_key + ) self.sorted_partitions = [] if not self.is_fresh_assignment and self._are_subscriptions_identical(): @@ -254,7 +289,10 @@ def _populate_sorted_partitions(self): partitions.remove(partition) sorted_consumers = SortedSet( - iterable=[(consumer, tuple(partitions)) for consumer, partitions in assignments.items()], + iterable=[ + (consumer, tuple(partitions)) + for consumer, partitions in assignments.items() + ], key=subscriptions_comparator_key, ) # at this point, sorted_consumers contains an ascending-sorted list of consumers based on @@ -266,7 +304,9 @@ def _populate_sorted_partitions(self): remaining_partitions = assignments[consumer] # from partitions that had a different consumer before, # keep only those that are assigned to this consumer now - previous_partitions = set(self.previous_assignment.keys()).intersection(set(remaining_partitions)) + previous_partitions = set(self.previous_assignment.keys()).intersection( + set(remaining_partitions) + ) if previous_partitions: # if there is a partition of this consumer that was assigned to another consumer before # mark it as good options for reassignment @@ -285,7 +325,9 @@ def _populate_sorted_partitions(self): self.sorted_partitions.append(partition) else: while partitions_sorted_by_num_of_potential_consumers: - self.sorted_partitions.append(partitions_sorted_by_num_of_potential_consumers.pop(0)[0]) + self.sorted_partitions.append( + partitions_sorted_by_num_of_potential_consumers.pop(0)[0] + ) def _populate_partitions_to_reassign(self): self.unassigned_partitions = deepcopy(self.sorted_partitions) @@ -324,7 +366,10 @@ def _populate_partitions_to_reassign(self): def _initialize_current_subscriptions(self): self.sorted_current_subscriptions = SortedSet( - iterable=[(consumer, tuple(partitions)) for consumer, partitions in self.current_assignment.items()], + iterable=[ + (consumer, tuple(partitions)) + for consumer, partitions in self.current_assignment.items() + ], key=subscriptions_comparator_key, ) @@ -335,16 +380,23 @@ def _get_consumer_with_most_subscriptions(self): return self.sorted_current_subscriptions.last()[0] def _remove_consumer_from_current_subscriptions_and_maintain_order(self, consumer): - self.sorted_current_subscriptions.remove((consumer, tuple(self.current_assignment[consumer]))) + self.sorted_current_subscriptions.remove( + (consumer, tuple(self.current_assignment[consumer])) + ) def _add_consumer_to_current_subscriptions_and_maintain_order(self, consumer): - self.sorted_current_subscriptions.add((consumer, tuple(self.current_assignment[consumer]))) + self.sorted_current_subscriptions.add( + (consumer, tuple(self.current_assignment[consumer])) + ) def _is_balanced(self): """Determines if the current assignment is a balanced one""" if ( len(self.current_assignment[self._get_consumer_with_least_subscriptions()]) - >= len(self.current_assignment[self._get_consumer_with_most_subscriptions()]) - 1 + >= len( + self.current_assignment[self._get_consumer_with_most_subscriptions()] + ) + - 1 ): # if minimum and maximum numbers of partitions assigned to consumers differ by at most one return true return True @@ -354,7 +406,9 @@ def _is_balanced(self): for consumer_id, consumer_partitions in self.current_assignment.items(): for partition in consumer_partitions: if partition in all_assigned_partitions: - log.error("{} is assigned to more than one 
consumer.".format(partition)) + log.error( + "{} is assigned to more than one consumer.".format(partition) + ) all_assigned_partitions[partition] = consumer_id # for each consumer that does not have all the topic partitions it can get @@ -363,14 +417,18 @@ def _is_balanced(self): for consumer, _ in self.sorted_current_subscriptions: consumer_partition_count = len(self.current_assignment[consumer]) # skip if this consumer already has all the topic partitions it can get - if consumer_partition_count == len(self.consumer_to_all_potential_partitions[consumer]): + if consumer_partition_count == len( + self.consumer_to_all_potential_partitions[consumer] + ): continue # otherwise make sure it cannot get any more for partition in self.consumer_to_all_potential_partitions[consumer]: if partition not in self.current_assignment[consumer]: other_consumer = all_assigned_partitions[partition] - other_consumer_partition_count = len(self.current_assignment[other_consumer]) + other_consumer_partition_count = len( + self.current_assignment[other_consumer] + ) if consumer_partition_count < other_consumer_partition_count: return False return True @@ -378,7 +436,9 @@ def _is_balanced(self): def _assign_partition(self, partition): for consumer, _ in self.sorted_current_subscriptions: if partition in self.consumer_to_all_potential_partitions[consumer]: - self._remove_consumer_from_current_subscriptions_and_maintain_order(consumer) + self._remove_consumer_from_current_subscriptions_and_maintain_order( + consumer + ) self.current_assignment[consumer].append(partition) self.current_partition_consumer[partition] = consumer self._add_consumer_to_current_subscriptions_and_maintain_order(consumer) @@ -392,7 +452,11 @@ def _can_consumer_participate_in_reassignment(self, consumer): current_assignment_size = len(current_partitions) max_assignment_size = len(self.consumer_to_all_potential_partitions[consumer]) if current_assignment_size > max_assignment_size: - log.error("The consumer {} is assigned more partitions than the maximum possible.".format(consumer)) + log.error( + "The consumer {} is assigned more partitions than the maximum possible.".format( + consumer + ) + ) if current_assignment_size < max_assignment_size: # if a consumer is not assigned all its potential partitions it is subject to reassignment return True @@ -416,27 +480,46 @@ def _perform_reassignments(self, reassignable_partitions): break # the partition must have at least two potential consumers if len(self.partition_to_all_potential_consumers[partition]) <= 1: - log.error("Expected more than one potential consumer for partition {}".format(partition)) + log.error( + "Expected more than one potential consumer for partition {}".format( + partition + ) + ) # the partition must have a current consumer consumer = self.current_partition_consumer.get(partition) if consumer is None: - log.error("Expected partition {} to be assigned to a consumer".format(partition)) + log.error( + "Expected partition {} to be assigned to a consumer".format( + partition + ) + ) if ( partition in self.previous_assignment and len(self.current_assignment[consumer]) - > len(self.current_assignment[self.previous_assignment[partition].consumer]) + 1 + > len( + self.current_assignment[ + self.previous_assignment[partition].consumer + ] + ) + + 1 ): self._reassign_partition_to_consumer( - partition, self.previous_assignment[partition].consumer, + partition, + self.previous_assignment[partition].consumer, ) reassignment_performed = True modified = True continue # check if a better-suited 
consumer exist for the partition; if so, reassign it - for other_consumer in self.partition_to_all_potential_consumers[partition]: - if len(self.current_assignment[consumer]) > len(self.current_assignment[other_consumer]) + 1: + for other_consumer in self.partition_to_all_potential_consumers[ + partition + ]: + if ( + len(self.current_assignment[consumer]) + > len(self.current_assignment[other_consumer]) + 1 + ): self._reassign_partition(partition) reassignment_performed = True modified = True @@ -458,13 +541,19 @@ def _reassign_partition(self, partition): def _reassign_partition_to_consumer(self, partition, new_consumer): consumer = self.current_partition_consumer[partition] # find the correct partition movement considering the stickiness requirement - partition_to_be_moved = self.partition_movements.get_partition_to_be_moved(partition, consumer, new_consumer) + partition_to_be_moved = self.partition_movements.get_partition_to_be_moved( + partition, consumer, new_consumer + ) self._move_partition(partition_to_be_moved, new_consumer) def _move_partition(self, partition, new_consumer): old_consumer = self.current_partition_consumer[partition] - self._remove_consumer_from_current_subscriptions_and_maintain_order(old_consumer) - self._remove_consumer_from_current_subscriptions_and_maintain_order(new_consumer) + self._remove_consumer_from_current_subscriptions_and_maintain_order( + old_consumer + ) + self._remove_consumer_from_current_subscriptions_and_maintain_order( + new_consumer + ) self.partition_movements.move_partition(partition, old_consumer, new_consumer) @@ -498,7 +587,10 @@ def _get_balance_score(assignment): if consumer_id in consumers_to_explore: consumers_to_explore.remove(consumer_id) for other_consumer_id in consumers_to_explore: - score += abs(consumer_to_assignment[consumer_id] - consumer_to_assignment[other_consumer_id]) + score += abs( + consumer_to_assignment[consumer_id] + - consumer_to_assignment[other_consumer_id] + ) return score @@ -604,7 +696,7 @@ def assign(cls, cluster, members): assignment = {} for member_id in members: assignment[member_id] = ConsumerProtocolMemberAssignment( - cls.version, sorted(executor.get_final_assignment(member_id)), b'' + cls.version, sorted(executor.get_final_assignment(member_id)), b"" ) return assignment @@ -625,24 +717,37 @@ def parse_member_metadata(cls, metadata): user_data = metadata.user_data if not user_data: return StickyAssignorMemberMetadataV1( - partitions=[], generation=cls.DEFAULT_GENERATION_ID, subscription=metadata.subscription + partitions=[], + generation=cls.DEFAULT_GENERATION_ID, + subscription=metadata.subscription, ) try: decoded_user_data = StickyAssignorUserDataV1.decode(user_data) except Exception as e: # ignore the consumer's previous assignment if it cannot be parsed - log.error("Could not parse member data", e) # pylint: disable=logging-too-many-args + log.error( + "Could not parse member data", e + ) # pylint: disable=logging-too-many-args return StickyAssignorMemberMetadataV1( - partitions=[], generation=cls.DEFAULT_GENERATION_ID, subscription=metadata.subscription + partitions=[], + generation=cls.DEFAULT_GENERATION_ID, + subscription=metadata.subscription, ) member_partitions = [] - for topic, partitions in decoded_user_data.previous_assignment: # pylint: disable=no-member - member_partitions.extend([TopicPartition(topic, partition) for partition in partitions]) + for ( + topic, + partitions, + ) in decoded_user_data.previous_assignment: # pylint: disable=no-member + member_partitions.extend( + 
[TopicPartition(topic, partition) for partition in partitions] + ) return StickyAssignorMemberMetadataV1( # pylint: disable=no-member - partitions=member_partitions, generation=decoded_user_data.generation, subscription=metadata.subscription + partitions=member_partitions, + generation=decoded_user_data.generation, + subscription=metadata.subscription, ) @classmethod @@ -653,12 +758,18 @@ def metadata(cls, topics): def _metadata(cls, topics, member_assignment_partitions, generation=-1): if member_assignment_partitions is None: log.debug("No member assignment available") - user_data = b'' + user_data = b"" else: - log.debug("Member assignment is available, generating the metadata: generation {}".format(cls.generation)) + log.debug( + "Member assignment is available, generating the metadata: generation {}".format( + cls.generation + ) + ) partitions_by_topic = defaultdict(list) for topic_partition in member_assignment_partitions: - partitions_by_topic[topic_partition.topic].append(topic_partition.partition) + partitions_by_topic[topic_partition.topic].append( + topic_partition.partition + ) data = StickyAssignorUserDataV1(partitions_by_topic.items(), generation) user_data = data.encode() return ConsumerProtocolMemberMetadata(cls.version, list(topics), user_data) diff --git a/aiokafka/coordinator/base.py b/aiokafka/coordinator/base.py index 2fe5cf81..5f15bd57 100644 --- a/aiokafka/coordinator/base.py +++ b/aiokafka/coordinator/base.py @@ -11,20 +11,24 @@ from kafka.metrics import AnonMeasurable from kafka.metrics.stats import Avg, Count, Max, Rate from kafka.protocol.commit import GroupCoordinatorRequest, OffsetCommitRequest -from kafka.protocol.group import (HeartbeatRequest, JoinGroupRequest, - LeaveGroupRequest, SyncGroupRequest) +from kafka.protocol.group import ( + HeartbeatRequest, + JoinGroupRequest, + LeaveGroupRequest, + SyncGroupRequest, +) from aiokafka import errors as Errors from .heartbeat import Heartbeat -log = logging.getLogger('aiokafka.coordinator') +log = logging.getLogger("aiokafka.coordinator") class MemberState(object): - UNJOINED = '<unjoined>' # the client is not part of a group - REBALANCING = '<rebalancing>' # the client has begun rebalancing - STABLE = '<stable>' # the client has joined and is sending heartbeats + UNJOINED = "<unjoined>" # the client is not part of a group + REBALANCING = "<rebalancing>" # the client has begun rebalancing + STABLE = "<stable>" # the client has joined and is sending heartbeats class Generation(object): @@ -33,10 +37,12 @@ def __init__(self, generation_id, member_id, protocol): self.member_id = member_id self.protocol = protocol + Generation.NO_GENERATION = Generation( OffsetCommitRequest[2].DEFAULT_GENERATION_ID, JoinGroupRequest[0].UNKNOWN_MEMBER_ID, - None) + None, +) class UnjoinedGroupException(Errors.KafkaError): @@ -81,13 +87,13 @@ class BaseCoordinator(object): """ DEFAULT_CONFIG = { - 'group_id': 'kafka-python-default-group', - 'session_timeout_ms': 10000, - 'heartbeat_interval_ms': 3000, - 'max_poll_interval_ms': 300000, - 'retry_backoff_ms': 100, - 'api_version': (0, 10, 1), - 'metric_group_prefix': '', + "group_id": "kafka-python-default-group", + "session_timeout_ms": 10000, + "heartbeat_interval_ms": 3000, + "max_poll_interval_ms": 300000, + "retry_backoff_ms": 100, + "api_version": (0, 10, 1), + "metric_group_prefix": "", } def __init__(self, client, metrics, **configs): @@ -115,14 +121,16 @@ def __init__(self, client, metrics, **configs): if key in configs: self.config[key] = configs[key] - if self.config['api_version'] < (0, 10, 1): - if self.config['max_poll_interval_ms'] != 
self.config['session_timeout_ms']: - raise Errors.KafkaConfigurationError("Broker version %s does not support " - "different values for max_poll_interval_ms " - "and session_timeout_ms") + if self.config["api_version"] < (0, 10, 1): + if self.config["max_poll_interval_ms"] != self.config["session_timeout_ms"]: + raise Errors.KafkaConfigurationError( + "Broker version %s does not support " + "different values for max_poll_interval_ms " + "and session_timeout_ms" + ) self._client = client - self.group_id = self.config['group_id'] + self.group_id = self.config["group_id"] self.heartbeat = Heartbeat(**self.config) self._heartbeat_thread = None self._lock = threading.Condition() @@ -133,8 +141,9 @@ def __init__(self, client, metrics, **configs): self.coordinator_id = None self._find_coordinator_future = None self._generation = Generation.NO_GENERATION - self.sensors = GroupCoordinatorMetrics(self.heartbeat, metrics, - self.config['metric_group_prefix']) + self.sensors = GroupCoordinatorMetrics( + self.heartbeat, metrics, self.config["metric_group_prefix"] + ) @abc.abstractmethod def protocol_type(self): @@ -201,8 +210,9 @@ def _perform_assignment(self, leader_id, protocol, members): pass @abc.abstractmethod - def _on_join_complete(self, generation, member_id, protocol, - member_assignment_bytes): + def _on_join_complete( + self, generation, member_id, protocol, member_assignment_bytes + ): """Invoked when a group member has successfully joined a group. Arguments: @@ -233,7 +243,7 @@ def coordinator(self): if self.coordinator_id is None: return None elif self._client.is_disconnected(self.coordinator_id): - self.coordinator_dead('Node Disconnected') + self.coordinator_dead("Node Disconnected") return None else: return self.coordinator_id @@ -248,7 +258,7 @@ def ensure_coordinator_ready(self): # Prior to 0.8.2 there was no group coordinator # so we will just pick a node at random and treat # it as the "coordinator" - if self.config['api_version'] < (0, 8, 2): + if self.config["api_version"] < (0, 8, 2): self.coordinator_id = self._client.least_loaded_node() if self.coordinator_id is not None: self._client.maybe_connect(self.coordinator_id) @@ -259,12 +269,15 @@ def ensure_coordinator_ready(self): if future.failed(): if future.retriable(): - if getattr(future.exception, 'invalid_metadata', False): - log.debug('Requesting metadata for group coordinator request: %s', future.exception) + if getattr(future.exception, "invalid_metadata", False): + log.debug( + "Requesting metadata for group coordinator request: %s", + future.exception, + ) metadata_update = self._client.cluster.request_update() self._client.poll(future=metadata_update) else: - time.sleep(self.config['retry_backoff_ms'] / 1000) + time.sleep(self.config["retry_backoff_ms"] / 1000) else: raise future.exception # pylint: disable-msg=raising-bad-type @@ -327,13 +340,16 @@ def time_to_next_heartbeat(self): with self._lock: # if we have not joined the group, we don't need to send heartbeats if self.state is MemberState.UNJOINED: - return float('inf') + return float("inf") return self.heartbeat.time_to_next_heartbeat() def _handle_join_success(self, member_assignment_bytes): with self._lock: - log.info("Successfully joined group %s with generation %s", - self.group_id, self._generation.generation_id) + log.info( + "Successfully joined group %s with generation %s", + self.group_id, + self._generation.generation_id, + ) self.state = MemberState.STABLE self.rejoin_needed = False if self._heartbeat_thread: @@ -361,8 +377,9 @@ def 
ensure_active_group(self): # changes the matched subscription set) can occur # while another rebalance is still in progress. if not self.rejoining: - self._on_join_prepare(self._generation.generation_id, - self._generation.member_id) + self._on_join_prepare( + self._generation.generation_id, self._generation.member_id + ) self.rejoining = True # ensure that there are no pending requests to the coordinator. @@ -389,7 +406,9 @@ def ensure_active_group(self): self.state = MemberState.REBALANCING future = self._send_join_group_request() - self.join_future = future # this should happen before adding callbacks + self.join_future = ( + future # this should happen before adding callbacks + ) # handle join completion in the callback so that the # callback will be invoked even if the consumer is woken up @@ -407,23 +426,30 @@ def ensure_active_group(self): self._client.poll(future=future) if future.succeeded(): - self._on_join_complete(self._generation.generation_id, - self._generation.member_id, - self._generation.protocol, - future.value) + self._on_join_complete( + self._generation.generation_id, + self._generation.member_id, + self._generation.protocol, + future.value, + ) self.join_future = None self.rejoining = False else: self.join_future = None exception = future.exception - if isinstance(exception, (Errors.UnknownMemberIdError, - Errors.RebalanceInProgressError, - Errors.IllegalGenerationError)): + if isinstance( + exception, + ( + Errors.UnknownMemberIdError, + Errors.RebalanceInProgressError, + Errors.IllegalGenerationError, + ), + ): continue elif not future.retriable(): raise exception # pylint: disable-msg=raising-bad-type - time.sleep(self.config['retry_backoff_ms'] / 1000) + time.sleep(self.config["retry_backoff_ms"] / 1000) def _rejoin_incomplete(self): return self.join_future is not None @@ -452,59 +478,75 @@ def _send_join_group_request(self): (protocol, metadata if isinstance(metadata, bytes) else metadata.encode()) for protocol, metadata in self.group_protocols() ] - if self.config['api_version'] < (0, 9): - raise Errors.KafkaError('JoinGroupRequest api requires 0.9+ brokers') - elif (0, 9) <= self.config['api_version'] < (0, 10, 1): + if self.config["api_version"] < (0, 9): + raise Errors.KafkaError("JoinGroupRequest api requires 0.9+ brokers") + elif (0, 9) <= self.config["api_version"] < (0, 10, 1): request = JoinGroupRequest[0]( self.group_id, - self.config['session_timeout_ms'], + self.config["session_timeout_ms"], self._generation.member_id, self.protocol_type(), - member_metadata) - elif (0, 10, 1) <= self.config['api_version'] < (0, 11, 0): + member_metadata, + ) + elif (0, 10, 1) <= self.config["api_version"] < (0, 11, 0): request = JoinGroupRequest[1]( self.group_id, - self.config['session_timeout_ms'], - self.config['max_poll_interval_ms'], + self.config["session_timeout_ms"], + self.config["max_poll_interval_ms"], self._generation.member_id, self.protocol_type(), - member_metadata) + member_metadata, + ) else: request = JoinGroupRequest[2]( self.group_id, - self.config['session_timeout_ms'], - self.config['max_poll_interval_ms'], + self.config["session_timeout_ms"], + self.config["max_poll_interval_ms"], self._generation.member_id, self.protocol_type(), - member_metadata) + member_metadata, + ) # create the request for the coordinator - log.debug("Sending JoinGroup (%s) to coordinator %s", request, self.coordinator_id) + log.debug( + "Sending JoinGroup (%s) to coordinator %s", request, self.coordinator_id + ) future = Future() _f = 
self._client.send(self.coordinator_id, request) _f.add_callback(self._handle_join_group_response, future, time.time()) - _f.add_errback(self._failed_request, self.coordinator_id, - request, future) + _f.add_errback(self._failed_request, self.coordinator_id, request, future) return future def _failed_request(self, node_id, request, future, error): # Marking coordinator dead # unless the error is caused by internal client pipelining - if not isinstance(error, (Errors.NodeNotReadyError, - Errors.TooManyInFlightRequests)): - log.error('Error sending %s to node %s [%s]', - request.__class__.__name__, node_id, error) + if not isinstance( + error, (Errors.NodeNotReadyError, Errors.TooManyInFlightRequests) + ): + log.error( + "Error sending %s to node %s [%s]", + request.__class__.__name__, + node_id, + error, + ) self.coordinator_dead(error) else: - log.debug('Error sending %s to node %s [%s]', - request.__class__.__name__, node_id, error) + log.debug( + "Error sending %s to node %s [%s]", + request.__class__.__name__, + node_id, + error, + ) future.failure(error) def _handle_join_group_response(self, future, send_time, response): error_type = Errors.for_code(response.error_code) if error_type is Errors.NoError: - log.debug("Received successful JoinGroup response for group %s: %s", - self.group_id, response) + log.debug( + "Received successful JoinGroup response for group %s: %s", + self.group_id, + response, + ) self.sensors.join_latency.record((time.time() - send_time) * 1000) with self._lock: if self.state is not MemberState.REBALANCING: @@ -513,44 +555,65 @@ def _handle_join_group_response(self, future, send_time, response): # not want to continue with the sync group. future.failure(UnjoinedGroupException()) else: - self._generation = Generation(response.generation_id, - response.member_id, - response.group_protocol) + self._generation = Generation( + response.generation_id, + response.member_id, + response.group_protocol, + ) if response.leader_id == response.member_id: - log.info("Elected group leader -- performing partition" - " assignments using %s", self._generation.protocol) + log.info( + "Elected group leader -- performing partition" + " assignments using %s", + self._generation.protocol, + ) self._on_join_leader(response).chain(future) else: self._on_join_follower().chain(future) elif error_type is Errors.GroupLoadInProgressError: - log.debug("Attempt to join group %s rejected since coordinator %s" - " is loading the group.", self.group_id, self.coordinator_id) + log.debug( + "Attempt to join group %s rejected since coordinator %s" + " is loading the group.", + self.group_id, + self.coordinator_id, + ) # backoff and retry future.failure(error_type(response)) elif error_type is Errors.UnknownMemberIdError: # reset the member id and retry immediately error = error_type(self._generation.member_id) self.reset_generation() - log.debug("Attempt to join group %s failed due to unknown member id", - self.group_id) + log.debug( + "Attempt to join group %s failed due to unknown member id", + self.group_id, + ) future.failure(error) - elif error_type in (Errors.GroupCoordinatorNotAvailableError, - Errors.NotCoordinatorForGroupError): + elif error_type in ( + Errors.GroupCoordinatorNotAvailableError, + Errors.NotCoordinatorForGroupError, + ): # re-discover the coordinator and retry with backoff self.coordinator_dead(error_type()) - log.debug("Attempt to join group %s failed due to obsolete " - "coordinator information: %s", self.group_id, - error_type.__name__) + log.debug( + "Attempt to join 
group %s failed due to obsolete " + "coordinator information: %s", + self.group_id, + error_type.__name__, + ) future.failure(error_type()) - elif error_type in (Errors.InconsistentGroupProtocolError, - Errors.InvalidSessionTimeoutError, - Errors.InvalidGroupIdError): + elif error_type in ( + Errors.InconsistentGroupProtocolError, + Errors.InvalidSessionTimeoutError, + Errors.InvalidGroupIdError, + ): # log the error and re-throw the exception error = error_type(response) - log.error("Attempt to join group %s failed due to fatal error: %s", - self.group_id, error) + log.error( + "Attempt to join group %s failed due to fatal error: %s", + self.group_id, + error, + ) future.failure(error) elif error_type is Errors.GroupAuthorizationFailedError: future.failure(error_type(self.group_id)) @@ -562,14 +625,19 @@ def _handle_join_group_response(self, future, send_time, response): def _on_join_follower(self): # send follower's sync group with an empty assignment - version = 0 if self.config['api_version'] < (0, 11, 0) else 1 + version = 0 if self.config["api_version"] < (0, 11, 0) else 1 request = SyncGroupRequest[version]( self.group_id, self._generation.generation_id, self._generation.member_id, - {}) - log.debug("Sending follower SyncGroup for group %s to coordinator %s: %s", - self.group_id, self.coordinator_id, request) + {}, + ) + log.debug( + "Sending follower SyncGroup for group %s to coordinator %s: %s", + self.group_id, + self.coordinator_id, + request, + ) return self._send_sync_group_request(request) def _on_join_leader(self, response): @@ -584,23 +652,34 @@ def _on_join_leader(self, response): Future: resolves to member assignment encoded-bytes """ try: - group_assignment = self._perform_assignment(response.leader_id, - response.group_protocol, - response.members) + group_assignment = self._perform_assignment( + response.leader_id, response.group_protocol, response.members + ) except Exception as e: return Future().failure(e) - version = 0 if self.config['api_version'] < (0, 11, 0) else 1 + version = 0 if self.config["api_version"] < (0, 11, 0) else 1 request = SyncGroupRequest[version]( self.group_id, self._generation.generation_id, self._generation.member_id, - [(member_id, - assignment if isinstance(assignment, bytes) else assignment.encode()) - for member_id, assignment in group_assignment.items()]) - - log.debug("Sending leader SyncGroup for group %s to coordinator %s: %s", - self.group_id, self.coordinator_id, request) + [ + ( + member_id, + assignment + if isinstance(assignment, bytes) + else assignment.encode(), + ) + for member_id, assignment in group_assignment.items() + ], + ) + + log.debug( + "Sending leader SyncGroup for group %s to coordinator %s: %s", + self.group_id, + self.coordinator_id, + request, + ) return self._send_sync_group_request(request) def _send_sync_group_request(self, request): @@ -617,8 +696,7 @@ def _send_sync_group_request(self, request): future = Future() _f = self._client.send(self.coordinator_id, request) _f.add_callback(self._handle_sync_group_response, future, time.time()) - _f.add_errback(self._failed_request, self.coordinator_id, - request, future) + _f.add_errback(self._failed_request, self.coordinator_id, request, future) return future def _handle_sync_group_response(self, future, send_time, response): @@ -633,17 +711,20 @@ def _handle_sync_group_response(self, future, send_time, response): if error_type is Errors.GroupAuthorizationFailedError: future.failure(error_type(self.group_id)) elif error_type is Errors.RebalanceInProgressError: - 
log.debug("SyncGroup for group %s failed due to coordinator" - " rebalance", self.group_id) + log.debug( + "SyncGroup for group %s failed due to coordinator" " rebalance", + self.group_id, + ) future.failure(error_type(self.group_id)) - elif error_type in (Errors.UnknownMemberIdError, - Errors.IllegalGenerationError): + elif error_type in (Errors.UnknownMemberIdError, Errors.IllegalGenerationError): error = error_type() log.debug("SyncGroup for group %s failed due to %s", self.group_id, error) self.reset_generation() future.failure(error) - elif error_type in (Errors.GroupCoordinatorNotAvailableError, - Errors.NotCoordinatorForGroupError): + elif error_type in ( + Errors.GroupCoordinatorNotAvailableError, + Errors.NotCoordinatorForGroupError, + ): error = error_type() log.debug("SyncGroup for group %s failed due to %s", self.group_id, error) self.coordinator_dead(error) @@ -667,8 +748,11 @@ def _send_group_coordinator_request(self): e = Errors.NodeNotReadyError(node_id) return Future().failure(e) - log.debug("Sending group coordinator request for group %s to broker %s", - self.group_id, node_id) + log.debug( + "Sending group coordinator request for group %s to broker %s", + self.group_id, + node_id, + ) request = GroupCoordinatorRequest[0](self.group_id) future = Future() _f = self._client.send(node_id, request) @@ -682,7 +766,9 @@ def _handle_group_coordinator_response(self, future, response): error_type = Errors.for_code(response.error_code) if error_type is Errors.NoError: with self._lock: - coordinator_id = self._client.cluster.add_group_coordinator(self.group_id, response) + coordinator_id = self._client.cluster.add_group_coordinator( + self.group_id, response + ) if not coordinator_id: # This could happen if coordinator metadata is different # than broker metadata @@ -690,8 +776,11 @@ def _handle_group_coordinator_response(self, future, response): return self.coordinator_id = coordinator_id - log.info("Discovered coordinator %s for group %s", - self.coordinator_id, self.group_id) + log.info( + "Discovered coordinator %s for group %s", + self.coordinator_id, + self.group_id, + ) self._client.maybe_connect(self.coordinator_id) self.heartbeat.reset_timeouts() future.success(self.coordinator_id) @@ -705,15 +794,20 @@ def _handle_group_coordinator_response(self, future, response): future.failure(error) else: error = error_type() - log.error("Group coordinator lookup for group %s failed: %s", - self.group_id, error) + log.error( + "Group coordinator lookup for group %s failed: %s", self.group_id, error + ) future.failure(error) def coordinator_dead(self, error): """Mark the current coordinator as dead.""" if self.coordinator_id is not None: - log.warning("Marking the coordinator dead (node %s) for group %s: %s.", - self.coordinator_id, self.group_id, error) + log.warning( + "Marking the coordinator dead (node %s) for group %s: %s.", + self.coordinator_id, + self.group_id, + error, + ) self.coordinator_id = None def generation(self): @@ -738,14 +832,14 @@ def request_rejoin(self): def _start_heartbeat_thread(self): if self._heartbeat_thread is None: - log.info('Starting new heartbeat thread') + log.info("Starting new heartbeat thread") self._heartbeat_thread = HeartbeatThread(weakref.proxy(self)) self._heartbeat_thread.daemon = True self._heartbeat_thread.start() def _close_heartbeat_thread(self): if self._heartbeat_thread is not None: - log.info('Stopping heartbeat thread') + log.info("Stopping heartbeat thread") try: self._heartbeat_thread.close() except ReferenceError: @@ -764,15 +858,19 
@@ def close(self): def maybe_leave_group(self): """Leave the current group and reset local generation/memberId.""" with self._client._lock, self._lock: - if (not self.coordinator_unknown() + if ( + not self.coordinator_unknown() and self.state is not MemberState.UNJOINED - and self._generation is not Generation.NO_GENERATION): + and self._generation is not Generation.NO_GENERATION + ): # this is a minimal effort attempt to leave the group. we do not # attempt any resending if the request fails or times out. - log.info('Leaving consumer group (%s).', self.group_id) - version = 0 if self.config['api_version'] < (0, 11, 0) else 1 - request = LeaveGroupRequest[version](self.group_id, self._generation.member_id) + log.info("Leaving consumer group (%s).", self.group_id) + version = 0 if self.config["api_version"] < (0, 11, 0) else 1 + request = LeaveGroupRequest[version]( + self.group_id, self._generation.member_id + ) future = self._client.send(self.coordinator_id, request) future.add_callback(self._handle_leave_group_response) future.add_errback(log.error, "LeaveGroup request failed: %s") @@ -783,11 +881,15 @@ def maybe_leave_group(self): def _handle_leave_group_response(self, response): error_type = Errors.for_code(response.error_code) if error_type is Errors.NoError: - log.debug("LeaveGroup request for group %s returned successfully", - self.group_id) + log.debug( + "LeaveGroup request for group %s returned successfully", self.group_id + ) else: - log.error("LeaveGroup request for group %s failed with error: %s", - self.group_id, error_type()) + log.error( + "LeaveGroup request for group %s failed with error: %s", + self.group_id, + error_type(), + ) def _send_heartbeat_request(self): """Send a heartbeat request""" @@ -799,45 +901,61 @@ def _send_heartbeat_request(self): e = Errors.NodeNotReadyError(self.coordinator_id) return Future().failure(e) - version = 0 if self.config['api_version'] < (0, 11, 0) else 1 - request = HeartbeatRequest[version](self.group_id, - self._generation.generation_id, - self._generation.member_id) - log.debug("Heartbeat: %s[%s] %s", request.group, request.generation_id, request.member_id) # pylint: disable-msg=no-member + version = 0 if self.config["api_version"] < (0, 11, 0) else 1 + request = HeartbeatRequest[version]( + self.group_id, self._generation.generation_id, self._generation.member_id + ) + log.debug( + "Heartbeat: %s[%s] %s", + request.group, + request.generation_id, + request.member_id, + ) # pylint: disable-msg=no-member future = Future() _f = self._client.send(self.coordinator_id, request) _f.add_callback(self._handle_heartbeat_response, future, time.time()) - _f.add_errback(self._failed_request, self.coordinator_id, - request, future) + _f.add_errback(self._failed_request, self.coordinator_id, request, future) return future def _handle_heartbeat_response(self, future, send_time, response): self.sensors.heartbeat_latency.record((time.time() - send_time) * 1000) error_type = Errors.for_code(response.error_code) if error_type is Errors.NoError: - log.debug("Received successful heartbeat response for group %s", - self.group_id) + log.debug( + "Received successful heartbeat response for group %s", self.group_id + ) future.success(None) - elif error_type in (Errors.GroupCoordinatorNotAvailableError, - Errors.NotCoordinatorForGroupError): - log.warning("Heartbeat failed for group %s: coordinator (node %s)" - " is either not started or not valid", self.group_id, - self.coordinator()) + elif error_type in ( + Errors.GroupCoordinatorNotAvailableError, + 
Errors.NotCoordinatorForGroupError, + ): + log.warning( + "Heartbeat failed for group %s: coordinator (node %s)" + " is either not started or not valid", + self.group_id, + self.coordinator(), + ) self.coordinator_dead(error_type()) future.failure(error_type()) elif error_type is Errors.RebalanceInProgressError: - log.warning("Heartbeat failed for group %s because it is" - " rebalancing", self.group_id) + log.warning( + "Heartbeat failed for group %s because it is" " rebalancing", + self.group_id, + ) self.request_rejoin() future.failure(error_type()) elif error_type is Errors.IllegalGenerationError: - log.warning("Heartbeat failed for group %s: generation id is not " - " current.", self.group_id) + log.warning( + "Heartbeat failed for group %s: generation id is not " " current.", + self.group_id, + ) self.reset_generation() future.failure(error_type()) elif error_type is Errors.UnknownMemberIdError: - log.warning("Heartbeat: local member_id was not recognized;" - " this consumer needs to re-join") + log.warning( + "Heartbeat: local member_id was not recognized;" + " this consumer needs to re-join" + ) self.reset_generation() future.failure(error_type) elif error_type is Errors.GroupAuthorizationFailedError: @@ -856,55 +974,99 @@ def __init__(self, heartbeat, metrics, prefix, tags=None): self.metrics = metrics self.metric_group_name = prefix + "-coordinator-metrics" - self.heartbeat_latency = metrics.sensor('heartbeat-latency') - self.heartbeat_latency.add(metrics.metric_name( - 'heartbeat-response-time-max', self.metric_group_name, - 'The max time taken to receive a response to a heartbeat request', - tags), Max()) - self.heartbeat_latency.add(metrics.metric_name( - 'heartbeat-rate', self.metric_group_name, - 'The average number of heartbeats per second', - tags), Rate(sampled_stat=Count())) - - self.join_latency = metrics.sensor('join-latency') - self.join_latency.add(metrics.metric_name( - 'join-time-avg', self.metric_group_name, - 'The average time taken for a group rejoin', - tags), Avg()) - self.join_latency.add(metrics.metric_name( - 'join-time-max', self.metric_group_name, - 'The max time taken for a group rejoin', - tags), Max()) - self.join_latency.add(metrics.metric_name( - 'join-rate', self.metric_group_name, - 'The number of group joins per second', - tags), Rate(sampled_stat=Count())) - - self.sync_latency = metrics.sensor('sync-latency') - self.sync_latency.add(metrics.metric_name( - 'sync-time-avg', self.metric_group_name, - 'The average time taken for a group sync', - tags), Avg()) - self.sync_latency.add(metrics.metric_name( - 'sync-time-max', self.metric_group_name, - 'The max time taken for a group sync', - tags), Max()) - self.sync_latency.add(metrics.metric_name( - 'sync-rate', self.metric_group_name, - 'The number of group syncs per second', - tags), Rate(sampled_stat=Count())) - - metrics.add_metric(metrics.metric_name( - 'last-heartbeat-seconds-ago', self.metric_group_name, - 'The number of seconds since the last controller heartbeat was sent', - tags), AnonMeasurable( - lambda _, now: (now / 1000) - self.heartbeat.last_send)) + self.heartbeat_latency = metrics.sensor("heartbeat-latency") + self.heartbeat_latency.add( + metrics.metric_name( + "heartbeat-response-time-max", + self.metric_group_name, + "The max time taken to receive a response to a heartbeat request", + tags, + ), + Max(), + ) + self.heartbeat_latency.add( + metrics.metric_name( + "heartbeat-rate", + self.metric_group_name, + "The average number of heartbeats per second", + tags, + ), + 
Rate(sampled_stat=Count()), + ) + + self.join_latency = metrics.sensor("join-latency") + self.join_latency.add( + metrics.metric_name( + "join-time-avg", + self.metric_group_name, + "The average time taken for a group rejoin", + tags, + ), + Avg(), + ) + self.join_latency.add( + metrics.metric_name( + "join-time-max", + self.metric_group_name, + "The max time taken for a group rejoin", + tags, + ), + Max(), + ) + self.join_latency.add( + metrics.metric_name( + "join-rate", + self.metric_group_name, + "The number of group joins per second", + tags, + ), + Rate(sampled_stat=Count()), + ) + + self.sync_latency = metrics.sensor("sync-latency") + self.sync_latency.add( + metrics.metric_name( + "sync-time-avg", + self.metric_group_name, + "The average time taken for a group sync", + tags, + ), + Avg(), + ) + self.sync_latency.add( + metrics.metric_name( + "sync-time-max", + self.metric_group_name, + "The max time taken for a group sync", + tags, + ), + Max(), + ) + self.sync_latency.add( + metrics.metric_name( + "sync-rate", + self.metric_group_name, + "The number of group syncs per second", + tags, + ), + Rate(sampled_stat=Count()), + ) + + metrics.add_metric( + metrics.metric_name( + "last-heartbeat-seconds-ago", + self.metric_group_name, + "The number of seconds since the last controller heartbeat was sent", + tags, + ), + AnonMeasurable(lambda _, now: (now / 1000) - self.heartbeat.last_send), + ) class HeartbeatThread(threading.Thread): def __init__(self, coordinator): super(HeartbeatThread, self).__init__() - self.name = coordinator.group_id + '-heartbeat' + self.name = coordinator.group_id + "-heartbeat" self.coordinator = coordinator self.enabled = False self.closed = False @@ -924,26 +1086,29 @@ def close(self): with self.coordinator._lock: self.coordinator._lock.notify() if self.is_alive(): - self.join(self.coordinator.config['heartbeat_interval_ms'] / 1000) + self.join(self.coordinator.config["heartbeat_interval_ms"] / 1000) if self.is_alive(): log.warning("Heartbeat thread did not fully terminate during close") def run(self): try: - log.debug('Heartbeat thread started') + log.debug("Heartbeat thread started") while not self.closed: self._run_once() except ReferenceError: - log.debug('Heartbeat thread closed due to coordinator gc') + log.debug("Heartbeat thread closed due to coordinator gc") except RuntimeError as e: - log.error("Heartbeat thread for group %s failed due to unexpected error: %s", - self.coordinator.group_id, e) + log.error( + "Heartbeat thread for group %s failed due to unexpected error: %s", + self.coordinator.group_id, + e, + ) self.failed = e finally: - log.debug('Heartbeat thread closed') + log.debug("Heartbeat thread closed") def _run_once(self): with self.coordinator._client._lock, self.coordinator._lock: @@ -957,16 +1122,16 @@ def _run_once(self): with self.coordinator._lock: if not self.enabled: - log.debug('Heartbeat disabled. Waiting') + log.debug("Heartbeat disabled. Waiting") self.coordinator._lock.wait() - log.debug('Heartbeat re-enabled.') + log.debug("Heartbeat re-enabled.") return if self.coordinator.state is not MemberState.STABLE: # the group is not stable (perhaps because we left the # group or because the coordinator kicked us out), so # disable heartbeats and wait for the main thread to rejoin. 
- log.debug('Group state is not stable, disabling heartbeats') + log.debug("Group state is not stable, disabling heartbeats") self.disable() return @@ -976,27 +1141,31 @@ def _run_once(self): # the immediate future check ensures that we backoff # properly in the case that no brokers are available # to connect to (and the future is automatically failed). - self.coordinator._lock.wait(self.coordinator.config['retry_backoff_ms'] / 1000) + self.coordinator._lock.wait( + self.coordinator.config["retry_backoff_ms"] / 1000 + ) elif self.coordinator.heartbeat.session_timeout_expired(): # the session timeout has expired without seeing a # successful heartbeat, so we should probably make sure # the coordinator is still healthy. - log.warning('Heartbeat session expired, marking coordinator dead') - self.coordinator.coordinator_dead('Heartbeat session expired') + log.warning("Heartbeat session expired, marking coordinator dead") + self.coordinator.coordinator_dead("Heartbeat session expired") elif self.coordinator.heartbeat.poll_timeout_expired(): # the poll timeout has expired, which means that the # foreground thread has stalled in between calls to # poll(), so we explicitly leave the group. - log.warning('Heartbeat poll expired, leaving group') + log.warning("Heartbeat poll expired, leaving group") self.coordinator.maybe_leave_group() elif not self.coordinator.heartbeat.should_heartbeat(): # poll again after waiting for the retry backoff in case # the heartbeat failed or the coordinator disconnected - log.log(0, 'Not ready to heartbeat, waiting') - self.coordinator._lock.wait(self.coordinator.config['retry_backoff_ms'] / 1000) + log.log(0, "Not ready to heartbeat, waiting") + self.coordinator._lock.wait( + self.coordinator.config["retry_backoff_ms"] / 1000 + ) else: self.coordinator.heartbeat.sent_heartbeat() diff --git a/aiokafka/coordinator/consumer.py b/aiokafka/coordinator/consumer.py index 58b90a5d..394f8333 100644 --- a/aiokafka/coordinator/consumer.py +++ b/aiokafka/coordinator/consumer.py @@ -27,19 +27,24 @@ class ConsumerCoordinator(BaseCoordinator): """This class manages the coordination process with the consumer coordinator.""" + DEFAULT_CONFIG = { - 'group_id': 'kafka-python-default-group', - 'enable_auto_commit': True, - 'auto_commit_interval_ms': 5000, - 'default_offset_commit_callback': None, - 'assignors': (RangePartitionAssignor, RoundRobinPartitionAssignor, StickyPartitionAssignor), - 'session_timeout_ms': 10000, - 'heartbeat_interval_ms': 3000, - 'max_poll_interval_ms': 300000, - 'retry_backoff_ms': 100, - 'api_version': (0, 10, 1), - 'exclude_internal_topics': True, - 'metric_group_prefix': 'consumer' + "group_id": "kafka-python-default-group", + "enable_auto_commit": True, + "auto_commit_interval_ms": 5000, + "default_offset_commit_callback": None, + "assignors": ( + RangePartitionAssignor, + RoundRobinPartitionAssignor, + StickyPartitionAssignor, + ), + "session_timeout_ms": 10000, + "heartbeat_interval_ms": 3000, + "max_poll_interval_ms": 300000, + "retry_backoff_ms": 100, + "api_version": (0, 10, 1), + "exclude_internal_topics": True, + "metric_group_prefix": "consumer", } def __init__(self, client, subscription, metrics, **configs): @@ -88,46 +93,60 @@ def __init__(self, client, subscription, metrics, **configs): self._subscription = subscription self._is_leader = False self._joined_subscription = set() - self._metadata_snapshot = self._build_metadata_snapshot(subscription, client.cluster) + self._metadata_snapshot = self._build_metadata_snapshot( + subscription, 
client.cluster + ) self._assignment_snapshot = None self._cluster = client.cluster - self.auto_commit_interval = self.config['auto_commit_interval_ms'] / 1000 + self.auto_commit_interval = self.config["auto_commit_interval_ms"] / 1000 self.next_auto_commit_deadline = None self.completed_offset_commits = collections.deque() - if self.config['default_offset_commit_callback'] is None: - self.config['default_offset_commit_callback'] = self._default_offset_commit_callback - - if self.config['group_id'] is not None: - if self.config['api_version'] >= (0, 9): - if not self.config['assignors']: - raise Errors.KafkaConfigurationError('Coordinator requires assignors') - if self.config['api_version'] < (0, 10, 1): - if self.config['max_poll_interval_ms'] != self.config['session_timeout_ms']: - raise Errors.KafkaConfigurationError("Broker version %s does not support " - "different values for max_poll_interval_ms " - "and session_timeout_ms") - - if self.config['enable_auto_commit']: - if self.config['api_version'] < (0, 8, 1): - log.warning('Broker version (%s) does not support offset' - ' commits; disabling auto-commit.', - self.config['api_version']) - self.config['enable_auto_commit'] = False - elif self.config['group_id'] is None: - log.warning('group_id is None: disabling auto-commit.') - self.config['enable_auto_commit'] = False + if self.config["default_offset_commit_callback"] is None: + self.config[ + "default_offset_commit_callback" + ] = self._default_offset_commit_callback + + if self.config["group_id"] is not None: + if self.config["api_version"] >= (0, 9): + if not self.config["assignors"]: + raise Errors.KafkaConfigurationError( + "Coordinator requires assignors" + ) + if self.config["api_version"] < (0, 10, 1): + if ( + self.config["max_poll_interval_ms"] + != self.config["session_timeout_ms"] + ): + raise Errors.KafkaConfigurationError( + "Broker version %s does not support " + "different values for max_poll_interval_ms " + "and session_timeout_ms" + ) + + if self.config["enable_auto_commit"]: + if self.config["api_version"] < (0, 8, 1): + log.warning( + "Broker version (%s) does not support offset" + " commits; disabling auto-commit.", + self.config["api_version"], + ) + self.config["enable_auto_commit"] = False + elif self.config["group_id"] is None: + log.warning("group_id is None: disabling auto-commit.") + self.config["enable_auto_commit"] = False else: self.next_auto_commit_deadline = time.time() + self.auto_commit_interval self.consumer_sensors = ConsumerCoordinatorMetrics( - metrics, self.config['metric_group_prefix'], self._subscription) + metrics, self.config["metric_group_prefix"], self._subscription + ) self._cluster.request_update() self._cluster.add_listener(WeakMethod(self._handle_metadata_update)) def __del__(self): - if hasattr(self, '_cluster') and self._cluster: + if hasattr(self, "_cluster") and self._cluster: self._cluster.remove_listener(WeakMethod(self._handle_metadata_update)) super(ConsumerCoordinator, self).__del__() @@ -137,7 +156,7 @@ def protocol_type(self): def group_protocols(self): """Returns list of preferred (protocols, metadata)""" if self._subscription.subscription is None: - raise Errors.IllegalStateError('Consumer has not subscribed to topics') + raise Errors.IllegalStateError("Consumer has not subscribed to topics") # dpkp note: I really dislike this. # why? 
because we are using this strange method group_protocols, # which is seemingly innocuous, to set internal state (_joined_subscription) @@ -150,7 +169,7 @@ def group_protocols(self): # best I've got for now. self._joined_subscription = set(self._subscription.subscription) metadata_list = [] - for assignor in self.config['assignors']: + for assignor in self.config["assignors"]: metadata = assignor.metadata(self._joined_subscription) group_protocol = (assignor.name, metadata) metadata_list.append(group_protocol) @@ -163,7 +182,7 @@ def _handle_metadata_update(self, cluster): if self._subscription.subscribed_pattern: topics = [] - for topic in cluster.topics(self.config['exclude_internal_topics']): + for topic in cluster.topics(self.config["exclude_internal_topics"]): if self._subscription.subscribed_pattern.match(topic): topics.append(topic) @@ -174,25 +193,29 @@ def _handle_metadata_update(self, cluster): # check if there are any changes to the metadata which should trigger # a rebalance if self._subscription.partitions_auto_assigned(): - metadata_snapshot = self._build_metadata_snapshot(self._subscription, cluster) + metadata_snapshot = self._build_metadata_snapshot( + self._subscription, cluster + ) if self._metadata_snapshot != metadata_snapshot: self._metadata_snapshot = metadata_snapshot # If we haven't got group coordinator support, # just assign all partitions locally if self._auto_assign_all_partitions(): - self._subscription.assign_from_subscribed([ - TopicPartition(topic, partition) - for topic in self._subscription.subscription - for partition in self._metadata_snapshot[topic] - ]) + self._subscription.assign_from_subscribed( + [ + TopicPartition(topic, partition) + for topic in self._subscription.subscription + for partition in self._metadata_snapshot[topic] + ] + ) def _auto_assign_all_partitions(self): # For users that use "subscribe" without group support, # we will simply assign all partitions to this consumer - if self.config['api_version'] < (0, 9): + if self.config["api_version"] < (0, 9): return True - elif self.config['group_id'] is None: + elif self.config["group_id"] is None: return True else: return False @@ -205,20 +228,23 @@ def _build_metadata_snapshot(self, subscription, cluster): return metadata_snapshot def _lookup_assignor(self, name): - for assignor in self.config['assignors']: + for assignor in self.config["assignors"]: if assignor.name == name: return assignor return None - def _on_join_complete(self, generation, member_id, protocol, - member_assignment_bytes): + def _on_join_complete( + self, generation, member_id, protocol, member_assignment_bytes + ): # only the leader is responsible for monitoring for metadata changes # (i.e. 
partition changes) if not self._is_leader: self._assignment_snapshot = None assignor = self._lookup_assignor(protocol) - assert assignor, 'Coordinator selected invalid assignment protocol: %s' % (protocol,) + assert assignor, "Coordinator selected invalid assignment protocol: %s" % ( + protocol, + ) assignment = ConsumerProtocol.ASSIGNMENT.decode(member_assignment_bytes) @@ -235,25 +261,29 @@ def _on_join_complete(self, generation, member_id, protocol, # give the assignor a chance to update internal state # based on the received assignment assignor.on_assignment(assignment) - if assignor.name == 'sticky': + if assignor.name == "sticky": assignor.on_generation_assignment(generation) # reschedule the auto commit starting from now self.next_auto_commit_deadline = time.time() + self.auto_commit_interval assigned = set(self._subscription.assigned_partitions()) - log.info("Setting newly assigned partitions %s for group %s", - assigned, self.group_id) + log.info( + "Setting newly assigned partitions %s for group %s", assigned, self.group_id + ) # execute the user's callback after rebalance if self._subscription.listener: try: self._subscription.listener.on_partitions_assigned(assigned) except Exception: - log.exception("User provided listener %s for group %s" - " failed on partition assignment: %s", - self._subscription.listener, self.group_id, - assigned) + log.exception( + "User provided listener %s for group %s" + " failed on partition assignment: %s", + self._subscription.listener, + self.group_id, + assigned, + ) def poll(self): """ @@ -269,7 +299,10 @@ def poll(self): self._invoke_completed_offset_commit_callbacks() self.ensure_coordinator_ready() - if self.config['api_version'] >= (0, 9) and self._subscription.partitions_auto_assigned(): + if ( + self.config["api_version"] >= (0, 9) + and self._subscription.partitions_auto_assigned() + ): if self.need_rejoin(): # due to a race condition between the initial metadata fetch and the # initial rebalance, we need to ensure that the metadata is fresh @@ -294,24 +327,27 @@ def poll(self): def time_to_next_poll(self): """Return seconds (float) remaining until :meth:`.poll` should be called again""" - if not self.config['enable_auto_commit']: + if not self.config["enable_auto_commit"]: return self.time_to_next_heartbeat() if time.time() > self.next_auto_commit_deadline: return 0 - return min(self.next_auto_commit_deadline - time.time(), - self.time_to_next_heartbeat()) + return min( + self.next_auto_commit_deadline - time.time(), self.time_to_next_heartbeat() + ) def _perform_assignment(self, leader_id, assignment_strategy, members): assignor = self._lookup_assignor(assignment_strategy) - assert assignor, 'Invalid assignment protocol: %s' % (assignment_strategy,) + assert assignor, "Invalid assignment protocol: %s" % (assignment_strategy,) member_metadata = {} all_subscribed_topics = set() for member_id, metadata_bytes in members: metadata = ConsumerProtocol.METADATA.decode(metadata_bytes) member_metadata[member_id] = metadata - all_subscribed_topics.update(metadata.subscription) # pylint: disable-msg=no-member + all_subscribed_topics.update( + metadata.subscription + ) # pylint: disable-msg=no-member # the leader will begin watching for changes to any of the topics # the group is interested in, which ensures that all metadata changes @@ -327,9 +363,13 @@ def _perform_assignment(self, leader_id, assignment_strategy, members): self._is_leader = True self._assignment_snapshot = self._metadata_snapshot - log.debug("Performing assignment for group %s 
using strategy %s" - " with subscriptions %s", self.group_id, assignor.name, - member_metadata) + log.debug( + "Performing assignment for group %s using strategy %s" + " with subscriptions %s", + self.group_id, + assignor.name, + member_metadata, + ) assignments = assignor.assign(self._cluster, member_metadata) @@ -345,16 +385,22 @@ def _on_join_prepare(self, generation, member_id): self._maybe_auto_commit_offsets_sync() # execute the user's callback before rebalance - log.info("Revoking previously assigned partitions %s for group %s", - self._subscription.assigned_partitions(), self.group_id) + log.info( + "Revoking previously assigned partitions %s for group %s", + self._subscription.assigned_partitions(), + self.group_id, + ) if self._subscription.listener: try: revoked = set(self._subscription.assigned_partitions()) self._subscription.listener.on_partitions_revoked(revoked) except Exception: - log.exception("User provided subscription listener %s" - " for group %s failed on_partitions_revoked", - self._subscription.listener, self.group_id) + log.exception( + "User provided subscription listener %s" + " for group %s failed on_partitions_revoked", + self._subscription.listener, + self.group_id, + ) self._is_leader = False self._subscription.reset_group_subscription() @@ -372,13 +418,17 @@ def need_rejoin(self): return False # we need to rejoin if we performed the assignment and metadata has changed - if (self._assignment_snapshot is not None - and self._assignment_snapshot != self._metadata_snapshot): + if ( + self._assignment_snapshot is not None + and self._assignment_snapshot != self._metadata_snapshot + ): return True # we need to join if our subscription has changed since the last join - if (self._joined_subscription is not None - and self._joined_subscription != self._subscription.subscription): + if ( + self._joined_subscription is not None + and self._joined_subscription != self._subscription.subscription + ): return True return super(ConsumerCoordinator, self).need_rejoin() @@ -386,7 +436,9 @@ def need_rejoin(self): def refresh_committed_offsets_if_needed(self): """Fetch committed offsets for assigned partitions.""" if self._subscription.needs_fetch_committed_offsets: - offsets = self.fetch_committed_offsets(self._subscription.assigned_partitions()) + offsets = self.fetch_committed_offsets( + self._subscription.assigned_partitions() + ) for partition, offset in offsets.items(): # verify assignment is still active if self._subscription.is_assigned(partition): @@ -416,9 +468,9 @@ def fetch_committed_offsets(self, partitions): return future.value if not future.retriable(): - raise future.exception # pylint: disable-msg=raising-bad-type + raise future.exception # pylint: disable-msg=raising-bad-type - time.sleep(self.config['retry_backoff_ms'] / 1000) + time.sleep(self.config["retry_backoff_ms"] / 1000) def close(self, autocommit=True): """Close the coordinator, leave the current group, @@ -465,28 +517,39 @@ def commit_offsets_async(self, offsets, callback=None): # same order that they were added. Note also that BaseCoordinator # prevents multiple concurrent coordinator lookup requests. 
future = self.lookup_coordinator() - future.add_callback(lambda r: functools.partial(self._do_commit_offsets_async, offsets, callback)()) + future.add_callback( + lambda r: functools.partial( + self._do_commit_offsets_async, offsets, callback + )() + ) if callback: - future.add_errback(lambda e: self.completed_offset_commits.appendleft((callback, offsets, e))) + future.add_errback( + lambda e: self.completed_offset_commits.appendleft( + (callback, offsets, e) + ) + ) # ensure the commit has a chance to be transmitted (without blocking on # its completion). Note that commits are treated as heartbeats by the # coordinator, so there is no need to explicitly allow heartbeats # through delayed task execution. - self._client.poll(timeout_ms=0) # no wakeup if we add that feature + self._client.poll(timeout_ms=0) # no wakeup if we add that feature return future def _do_commit_offsets_async(self, offsets, callback=None): - assert self.config['api_version'] >= (0, 8, 1), 'Unsupported Broker API' + assert self.config["api_version"] >= (0, 8, 1), "Unsupported Broker API" assert all(map(lambda k: isinstance(k, TopicPartition), offsets)) - assert all(map(lambda v: isinstance(v, OffsetAndMetadata), - offsets.values())) + assert all(map(lambda v: isinstance(v, OffsetAndMetadata), offsets.values())) if callback is None: - callback = self.config['default_offset_commit_callback'] + callback = self.config["default_offset_commit_callback"] self._subscription.needs_fetch_committed_offsets = True future = self._send_offset_commit_request(offsets) - future.add_both(lambda res: self.completed_offset_commits.appendleft((callback, offsets, res))) + future.add_both( + lambda res: self.completed_offset_commits.appendleft( + (callback, offsets, res) + ) + ) return future def commit_offsets_sync(self, offsets): @@ -500,10 +563,9 @@ def commit_offsets_sync(self, offsets): Raises error on failure """ - assert self.config['api_version'] >= (0, 8, 1), 'Unsupported Broker API' + assert self.config["api_version"] >= (0, 8, 1), "Unsupported Broker API" assert all(map(lambda k: isinstance(k, TopicPartition), offsets)) - assert all(map(lambda v: isinstance(v, OffsetAndMetadata), - offsets.values())) + assert all(map(lambda v: isinstance(v, OffsetAndMetadata), offsets.values())) self._invoke_completed_offset_commit_callbacks() if not offsets: return @@ -518,26 +580,32 @@ def commit_offsets_sync(self, offsets): return future.value if not future.retriable(): - raise future.exception # pylint: disable-msg=raising-bad-type + raise future.exception # pylint: disable-msg=raising-bad-type - time.sleep(self.config['retry_backoff_ms'] / 1000) + time.sleep(self.config["retry_backoff_ms"] / 1000) def _maybe_auto_commit_offsets_sync(self): - if self.config['enable_auto_commit']: + if self.config["enable_auto_commit"]: try: self.commit_offsets_sync(self._subscription.all_consumed_offsets()) # The three main group membership errors are known and should not # require a stacktrace -- just a warning - except (Errors.UnknownMemberIdError, - Errors.IllegalGenerationError, - Errors.RebalanceInProgressError): - log.warning("Offset commit failed: group membership out of date" - " This is likely to cause duplicate message" - " delivery.") + except ( + Errors.UnknownMemberIdError, + Errors.IllegalGenerationError, + Errors.RebalanceInProgressError, + ): + log.warning( + "Offset commit failed: group membership out of date" + " This is likely to cause duplicate message" + " delivery." 
+ ) except Exception: - log.exception("Offset commit failed: This is likely to cause" - " duplicate message delivery") + log.exception( + "Offset commit failed: This is likely to cause" + " duplicate message delivery" + ) def _send_offset_commit_request(self, offsets): """Commit offsets for the specified list of topics and partitions. @@ -553,19 +621,17 @@ def _send_offset_commit_request(self, offsets): Returns: Future: indicating whether the commit was successful or not """ - assert self.config['api_version'] >= (0, 8, 1), 'Unsupported Broker API' + assert self.config["api_version"] >= (0, 8, 1), "Unsupported Broker API" assert all(map(lambda k: isinstance(k, TopicPartition), offsets)) - assert all(map(lambda v: isinstance(v, OffsetAndMetadata), - offsets.values())) + assert all(map(lambda v: isinstance(v, OffsetAndMetadata), offsets.values())) if not offsets: - log.debug('No offsets to commit') + log.debug("No offsets to commit") return Future().success(None) node_id = self.coordinator() if node_id is None: return Future().failure(Errors.GroupCoordinatorNotAvailableError) - # create the offset commit request offset_data = collections.defaultdict(dict) for tp, offset in offsets.items(): @@ -579,53 +645,69 @@ def _send_offset_commit_request(self, offsets): # if the generation is None, we are not part of an active group # (and we expect to be). The only thing we can do is fail the commit # and let the user rejoin the group in poll() - if self.config['api_version'] >= (0, 9) and generation is None: + if self.config["api_version"] >= (0, 9) and generation is None: return Future().failure(Errors.CommitFailedError()) - if self.config['api_version'] >= (0, 9): + if self.config["api_version"] >= (0, 9): request = OffsetCommitRequest[2]( self.group_id, generation.generation_id, generation.member_id, OffsetCommitRequest[2].DEFAULT_RETENTION_TIME, - [( - topic, [( - partition, - offset.offset, - offset.metadata - ) for partition, offset in partitions.items()] - ) for topic, partitions in offset_data.items()] + [ + ( + topic, + [ + (partition, offset.offset, offset.metadata) + for partition, offset in partitions.items() + ], + ) + for topic, partitions in offset_data.items() + ], ) - elif self.config['api_version'] >= (0, 8, 2): + elif self.config["api_version"] >= (0, 8, 2): request = OffsetCommitRequest[1]( - self.group_id, -1, '', - [( - topic, [( - partition, - offset.offset, - -1, - offset.metadata - ) for partition, offset in partitions.items()] - ) for topic, partitions in offset_data.items()] + self.group_id, + -1, + "", + [ + ( + topic, + [ + (partition, offset.offset, -1, offset.metadata) + for partition, offset in partitions.items() + ], + ) + for topic, partitions in offset_data.items() + ], ) - elif self.config['api_version'] >= (0, 8, 1): + elif self.config["api_version"] >= (0, 8, 1): request = OffsetCommitRequest[0]( self.group_id, - [( - topic, [( - partition, - offset.offset, - offset.metadata - ) for partition, offset in partitions.items()] - ) for topic, partitions in offset_data.items()] + [ + ( + topic, + [ + (partition, offset.offset, offset.metadata) + for partition, offset in partitions.items() + ], + ) + for topic, partitions in offset_data.items() + ], ) - log.debug("Sending offset-commit request with %s for group %s to %s", - offsets, self.group_id, node_id) + log.debug( + "Sending offset-commit request with %s for group %s to %s", + offsets, + self.group_id, + node_id, + ) future = Future() _f = self._client.send(node_id, request) - 
_f.add_callback(self._handle_offset_commit_response, offsets, future, time.time()) + _f.add_callback( + self._handle_offset_commit_response, offsets, future, time.time() + ) _f.add_errback(self._failed_request, node_id, request, future) return future @@ -641,58 +723,87 @@ def _handle_offset_commit_response(self, offsets, future, send_time, response): error_type = Errors.for_code(error_code) if error_type is Errors.NoError: - log.debug("Group %s committed offset %s for partition %s", - self.group_id, offset, tp) + log.debug( + "Group %s committed offset %s for partition %s", + self.group_id, + offset, + tp, + ) if self._subscription.is_assigned(tp): self._subscription.assignment[tp].committed = offset elif error_type is Errors.GroupAuthorizationFailedError: - log.error("Not authorized to commit offsets for group %s", - self.group_id) + log.error( + "Not authorized to commit offsets for group %s", self.group_id + ) future.failure(error_type(self.group_id)) return elif error_type is Errors.TopicAuthorizationFailedError: unauthorized_topics.add(topic) - elif error_type in (Errors.OffsetMetadataTooLargeError, - Errors.InvalidCommitOffsetSizeError): + elif error_type in ( + Errors.OffsetMetadataTooLargeError, + Errors.InvalidCommitOffsetSizeError, + ): # raise the error to the user - log.debug("OffsetCommit for group %s failed on partition %s" - " %s", self.group_id, tp, error_type.__name__) + log.debug( + "OffsetCommit for group %s failed on partition %s" " %s", + self.group_id, + tp, + error_type.__name__, + ) future.failure(error_type()) return elif error_type is Errors.GroupLoadInProgressError: # just retry - log.debug("OffsetCommit for group %s failed: %s", - self.group_id, error_type.__name__) + log.debug( + "OffsetCommit for group %s failed: %s", + self.group_id, + error_type.__name__, + ) future.failure(error_type(self.group_id)) return - elif error_type in (Errors.GroupCoordinatorNotAvailableError, - Errors.NotCoordinatorForGroupError, - Errors.RequestTimedOutError): - log.debug("OffsetCommit for group %s failed: %s", - self.group_id, error_type.__name__) + elif error_type in ( + Errors.GroupCoordinatorNotAvailableError, + Errors.NotCoordinatorForGroupError, + Errors.RequestTimedOutError, + ): + log.debug( + "OffsetCommit for group %s failed: %s", + self.group_id, + error_type.__name__, + ) self.coordinator_dead(error_type()) future.failure(error_type(self.group_id)) return - elif error_type in (Errors.UnknownMemberIdError, - Errors.IllegalGenerationError, - Errors.RebalanceInProgressError): + elif error_type in ( + Errors.UnknownMemberIdError, + Errors.IllegalGenerationError, + Errors.RebalanceInProgressError, + ): # need to re-join group error = error_type(self.group_id) - log.debug("OffsetCommit for group %s failed: %s", - self.group_id, error) + log.debug( + "OffsetCommit for group %s failed: %s", self.group_id, error + ) self.reset_generation() future.failure(Errors.CommitFailedError()) return else: - log.error("Group %s failed to commit partition %s at offset" - " %s: %s", self.group_id, tp, offset, - error_type.__name__) + log.error( + "Group %s failed to commit partition %s at offset" " %s: %s", + self.group_id, + tp, + offset, + error_type.__name__, + ) future.failure(error_type()) return if unauthorized_topics: - log.error("Not authorized to commit to topics %s for group %s", - unauthorized_topics, self.group_id) + log.error( + "Not authorized to commit to topics %s for group %s", + unauthorized_topics, + self.group_id, + ) 
future.failure(Errors.TopicAuthorizationFailedError(unauthorized_topics)) else: future.success(None) @@ -709,7 +820,7 @@ def _send_offset_fetch_request(self, partitions): Returns: Future: resolves to dict of offsets: {TopicPartition: OffsetAndMetadata} """ - assert self.config['api_version'] >= (0, 8, 1), 'Unsupported Broker API' + assert self.config["api_version"] >= (0, 8, 1), "Unsupported Broker API" assert all(map(lambda k: isinstance(k, TopicPartition), partitions)) if not partitions: return Future().success({}) @@ -720,26 +831,26 @@ def _send_offset_fetch_request(self, partitions): # Verify node is ready if not self._client.ready(node_id): - log.debug("Node %s not ready -- failing offset fetch request", - node_id) + log.debug("Node %s not ready -- failing offset fetch request", node_id) return Future().failure(Errors.NodeNotReadyError) - log.debug("Group %s fetching committed offsets for partitions: %s", - self.group_id, partitions) + log.debug( + "Group %s fetching committed offsets for partitions: %s", + self.group_id, + partitions, + ) # construct the request topic_partitions = collections.defaultdict(set) for tp in partitions: topic_partitions[tp.topic].add(tp.partition) - if self.config['api_version'] >= (0, 8, 2): + if self.config["api_version"] >= (0, 8, 2): request = OffsetFetchRequest[1]( - self.group_id, - list(topic_partitions.items()) + self.group_id, list(topic_partitions.items()) ) else: request = OffsetFetchRequest[0]( - self.group_id, - list(topic_partitions.items()) + self.group_id, list(topic_partitions.items()) ) # send the request with a callback @@ -757,8 +868,12 @@ def _handle_offset_fetch_response(self, future, response): error_type = Errors.for_code(error_code) if error_type is not Errors.NoError: error = error_type() - log.debug("Group %s failed to fetch offset for partition" - " %s: %s", self.group_id, tp, error) + log.debug( + "Group %s failed to fetch offset for partition" " %s: %s", + self.group_id, + tp, + error, + ) if error_type is Errors.GroupLoadInProgressError: # just retry future.failure(error) @@ -767,13 +882,16 @@ def _handle_offset_fetch_response(self, future, response): self.coordinator_dead(error_type()) future.failure(error) elif error_type is Errors.UnknownTopicOrPartitionError: - log.warning("OffsetFetchRequest -- unknown topic %s" - " (have you committed any offsets yet?)", - topic) + log.warning( + "OffsetFetchRequest -- unknown topic %s" + " (have you committed any offsets yet?)", + topic, + ) continue else: - log.error("Unknown error fetching offsets for %s: %s", - tp, error) + log.error( + "Unknown error fetching offsets for %s: %s", tp, error + ) future.failure(error) return elif offset >= 0: @@ -781,8 +899,11 @@ def _handle_offset_fetch_response(self, future, response): # (-1 indicates no committed offset to fetch) offsets[tp] = OffsetAndMetadata(offset, metadata) else: - log.debug("Group %s has no committed offset for partition" - " %s", self.group_id, tp) + log.debug( + "Group %s has no committed offset for partition" " %s", + self.group_id, + tp, + ) future.success(offsets) def _default_offset_commit_callback(self, offsets, exception): @@ -791,43 +912,74 @@ def _default_offset_commit_callback(self, offsets, exception): def _commit_offsets_async_on_complete(self, offsets, exception): if exception is not None: - log.warning("Auto offset commit failed for group %s: %s", - self.group_id, exception) - if getattr(exception, 'retriable', False): - self.next_auto_commit_deadline = min(time.time() + self.config['retry_backoff_ms'] / 1000, 
self.next_auto_commit_deadline) + log.warning( + "Auto offset commit failed for group %s: %s", self.group_id, exception + ) + if getattr(exception, "retriable", False): + self.next_auto_commit_deadline = min( + time.time() + self.config["retry_backoff_ms"] / 1000, + self.next_auto_commit_deadline, + ) else: - log.debug("Completed autocommit of offsets %s for group %s", - offsets, self.group_id) + log.debug( + "Completed autocommit of offsets %s for group %s", + offsets, + self.group_id, + ) def _maybe_auto_commit_offsets_async(self): - if self.config['enable_auto_commit']: + if self.config["enable_auto_commit"]: if self.coordinator_unknown(): - self.next_auto_commit_deadline = time.time() + self.config['retry_backoff_ms'] / 1000 + self.next_auto_commit_deadline = ( + time.time() + self.config["retry_backoff_ms"] / 1000 + ) elif time.time() > self.next_auto_commit_deadline: self.next_auto_commit_deadline = time.time() + self.auto_commit_interval - self.commit_offsets_async(self._subscription.all_consumed_offsets(), - self._commit_offsets_async_on_complete) + self.commit_offsets_async( + self._subscription.all_consumed_offsets(), + self._commit_offsets_async_on_complete, + ) class ConsumerCoordinatorMetrics(object): def __init__(self, metrics, metric_group_prefix, subscription): self.metrics = metrics - self.metric_group_name = '%s-coordinator-metrics' % (metric_group_prefix,) - - self.commit_latency = metrics.sensor('commit-latency') - self.commit_latency.add(metrics.metric_name( - 'commit-latency-avg', self.metric_group_name, - 'The average time taken for a commit request'), Avg()) - self.commit_latency.add(metrics.metric_name( - 'commit-latency-max', self.metric_group_name, - 'The max time taken for a commit request'), Max()) - self.commit_latency.add(metrics.metric_name( - 'commit-rate', self.metric_group_name, - 'The number of commit calls per second'), Rate(sampled_stat=Count())) - - num_parts = AnonMeasurable(lambda config, now: - len(subscription.assigned_partitions())) - metrics.add_metric(metrics.metric_name( - 'assigned-partitions', self.metric_group_name, - 'The number of partitions currently assigned to this consumer'), - num_parts) + self.metric_group_name = "%s-coordinator-metrics" % (metric_group_prefix,) + + self.commit_latency = metrics.sensor("commit-latency") + self.commit_latency.add( + metrics.metric_name( + "commit-latency-avg", + self.metric_group_name, + "The average time taken for a commit request", + ), + Avg(), + ) + self.commit_latency.add( + metrics.metric_name( + "commit-latency-max", + self.metric_group_name, + "The max time taken for a commit request", + ), + Max(), + ) + self.commit_latency.add( + metrics.metric_name( + "commit-rate", + self.metric_group_name, + "The number of commit calls per second", + ), + Rate(sampled_stat=Count()), + ) + + num_parts = AnonMeasurable( + lambda config, now: len(subscription.assigned_partitions()) + ) + metrics.add_metric( + metrics.metric_name( + "assigned-partitions", + self.metric_group_name, + "The number of partitions currently assigned to this consumer", + ), + num_parts, + ) diff --git a/aiokafka/coordinator/heartbeat.py b/aiokafka/coordinator/heartbeat.py index 2f5930b6..dccf9da1 100644 --- a/aiokafka/coordinator/heartbeat.py +++ b/aiokafka/coordinator/heartbeat.py @@ -6,11 +6,11 @@ class Heartbeat(object): DEFAULT_CONFIG = { - 'group_id': None, - 'heartbeat_interval_ms': 3000, - 'session_timeout_ms': 10000, - 'max_poll_interval_ms': 300000, - 'retry_backoff_ms': 100, + "group_id": None, + "heartbeat_interval_ms": 
3000, + "session_timeout_ms": 10000, + "max_poll_interval_ms": 300000, + "retry_backoff_ms": 100, } def __init__(self, **configs): @@ -19,14 +19,15 @@ def __init__(self, **configs): if key in configs: self.config[key] = configs[key] - if self.config['group_id'] is not None: - assert (self.config['heartbeat_interval_ms'] - <= self.config['session_timeout_ms']), ( - 'Heartbeat interval must be lower than the session timeout') + if self.config["group_id"] is not None: + assert ( + self.config["heartbeat_interval_ms"] + <= self.config["session_timeout_ms"] + ), "Heartbeat interval must be lower than the session timeout" - self.last_send = -1 * float('inf') - self.last_receive = -1 * float('inf') - self.last_poll = -1 * float('inf') + self.last_send = -1 * float("inf") + self.last_receive = -1 * float("inf") + self.last_poll = -1 * float("inf") self.last_reset = time.time() self.heartbeat_failed = None @@ -47,9 +48,9 @@ def time_to_next_heartbeat(self): """Returns seconds (float) remaining before next heartbeat should be sent""" time_since_last_heartbeat = time.time() - max(self.last_send, self.last_reset) if self.heartbeat_failed: - delay_to_next_heartbeat = self.config['retry_backoff_ms'] / 1000 + delay_to_next_heartbeat = self.config["retry_backoff_ms"] / 1000 else: - delay_to_next_heartbeat = self.config['heartbeat_interval_ms'] / 1000 + delay_to_next_heartbeat = self.config["heartbeat_interval_ms"] / 1000 return max(0, delay_to_next_heartbeat - time_since_last_heartbeat) def should_heartbeat(self): @@ -57,7 +58,7 @@ def should_heartbeat(self): def session_timeout_expired(self): last_recv = max(self.last_receive, self.last_reset) - return (time.time() - last_recv) > (self.config['session_timeout_ms'] / 1000) + return (time.time() - last_recv) > (self.config["session_timeout_ms"] / 1000) def reset_timeouts(self): self.last_reset = time.time() @@ -65,4 +66,6 @@ def reset_timeouts(self): self.heartbeat_failed = False def poll_timeout_expired(self): - return (time.time() - self.last_poll) > (self.config['max_poll_interval_ms'] / 1000) + return (time.time() - self.last_poll) > ( + self.config["max_poll_interval_ms"] / 1000 + ) diff --git a/aiokafka/coordinator/protocol.py b/aiokafka/coordinator/protocol.py index 56a39015..e3bbd692 100644 --- a/aiokafka/coordinator/protocol.py +++ b/aiokafka/coordinator/protocol.py @@ -7,27 +7,29 @@ class ConsumerProtocolMemberMetadata(Struct): SCHEMA = Schema( - ('version', Int16), - ('subscription', Array(String('utf-8'))), - ('user_data', Bytes)) + ("version", Int16), + ("subscription", Array(String("utf-8"))), + ("user_data", Bytes), + ) class ConsumerProtocolMemberAssignment(Struct): SCHEMA = Schema( - ('version', Int16), - ('assignment', Array( - ('topic', String('utf-8')), - ('partitions', Array(Int32)))), - ('user_data', Bytes)) + ("version", Int16), + ("assignment", Array(("topic", String("utf-8")), ("partitions", Array(Int32)))), + ("user_data", Bytes), + ) def partitions(self): - return [TopicPartition(topic, partition) - for topic, partitions in self.assignment # pylint: disable-msg=no-member - for partition in partitions] + return [ + TopicPartition(topic, partition) + for topic, partitions in self.assignment # pylint: disable-msg=no-member + for partition in partitions + ] class ConsumerProtocol(object): - PROTOCOL_TYPE = 'consumer' - ASSIGNMENT_STRATEGIES = ('range', 'roundrobin') + PROTOCOL_TYPE = "consumer" + ASSIGNMENT_STRATEGIES = ("range", "roundrobin") METADATA = ConsumerProtocolMemberMetadata ASSIGNMENT = ConsumerProtocolMemberAssignment
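
The heartbeat.py hunk above only reformats the timing rules, but they are easy to lose in the diff noise: the heartbeat interval must not exceed the session timeout (asserted in __init__), session expiry is measured from the most recent successful receive or reset, and poll expiry is measured from the last foreground poll(). The following standalone sketch restates those checks with plain stdlib code; the constant values mirror DEFAULT_CONFIG, and the helper names are illustrative, not part of the library.

import time

# Illustrative values matching the defaults shown in the hunk above.
HEARTBEAT_INTERVAL_MS = 3000    # how often a heartbeat should go out
SESSION_TIMEOUT_MS = 10000      # silence longer than this marks the coordinator dead
MAX_POLL_INTERVAL_MS = 300000   # a poll() loop stalled longer than this leaves the group
RETRY_BACKOFF_MS = 100          # delay before retrying after a failed heartbeat

def session_timeout_expired(last_receive: float, last_reset: float) -> bool:
    # Same rule as Heartbeat.session_timeout_expired(): time since the most
    # recent receive (or reset), compared against the session timeout.
    return (time.time() - max(last_receive, last_reset)) > SESSION_TIMEOUT_MS / 1000

def poll_timeout_expired(last_poll: float) -> bool:
    # Same rule as Heartbeat.poll_timeout_expired(): the foreground thread has
    # not called poll() recently enough, so the member should leave the group.
    return (time.time() - last_poll) > MAX_POLL_INTERVAL_MS / 1000

def time_to_next_heartbeat(last_send: float, last_reset: float, failed: bool) -> float:
    # Same rule as Heartbeat.time_to_next_heartbeat(): back off after a failed
    # heartbeat, otherwise wait out the regular interval.
    delay_ms = RETRY_BACKOFF_MS if failed else HEARTBEAT_INTERVAL_MS
    since_last = time.time() - max(last_send, last_reset)
    return max(0, delay_ms / 1000 - since_last)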
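
The protocol.py structs at the end of the diff are plain Struct subclasses, so they round-trip through bytes with the base class's usual encode()/decode() helpers. A quick sketch of that round-trip is below; the topic name and partition numbers are made up for illustration and are not taken from the diff.

from aiokafka.coordinator.protocol import (
    ConsumerProtocolMemberAssignment,
    ConsumerProtocolMemberMetadata,
)

# Metadata a member advertises when joining the group:
# version, subscribed topics, opaque user data.
metadata = ConsumerProtocolMemberMetadata(0, ["example-topic"], b"")
restored = ConsumerProtocolMemberMetadata.decode(metadata.encode())
print(restored.subscription)   # ['example-topic']

# Assignment the leader hands back: (topic, [partitions]) pairs that the
# partitions() helper flattens into TopicPartition tuples.
assignment = ConsumerProtocolMemberAssignment(0, [("example-topic", [0, 1, 2])], b"")
print(assignment.partitions())
# [TopicPartition(topic='example-topic', partition=0), ..., partition=2)]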