diff --git a/aiokafka/consumer/consumer.py b/aiokafka/consumer/consumer.py index 27421690..c82f6fb0 100644 --- a/aiokafka/consumer/consumer.py +++ b/aiokafka/consumer/consumer.py @@ -6,10 +6,9 @@ import warnings from typing import Dict, List -from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor - from aiokafka.abc import ConsumerRebalanceListener from aiokafka.client import AIOKafkaClient +from aiokafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor from aiokafka.errors import ( TopicAuthorizationFailedError, OffsetOutOfRangeError, ConsumerStoppedError, IllegalOperation, UnsupportedVersionError, diff --git a/aiokafka/consumer/group_coordinator.py b/aiokafka/consumer/group_coordinator.py index 8d244288..8a8c76f4 100644 --- a/aiokafka/consumer/group_coordinator.py +++ b/aiokafka/consumer/group_coordinator.py @@ -4,8 +4,6 @@ import copy import time -from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor -from kafka.coordinator.protocol import ConsumerProtocol from kafka.protocol.commit import ( OffsetCommitRequest_v2 as OffsetCommitRequest, OffsetFetchRequest_v1 as OffsetFetchRequest) @@ -15,6 +13,8 @@ import aiokafka.errors as Errors from aiokafka.structs import OffsetAndMetadata, TopicPartition from aiokafka.client import ConnectionGroup, CoordinationType +from aiokafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor +from aiokafka.coordinator.protocol import ConsumerProtocol from aiokafka.util import create_future, create_task log = logging.getLogger(__name__) diff --git a/kafka/coordinator/__init__.py b/aiokafka/coordinator/__init__.py similarity index 100% rename from kafka/coordinator/__init__.py rename to aiokafka/coordinator/__init__.py diff --git a/kafka/coordinator/assignors/__init__.py b/aiokafka/coordinator/assignors/__init__.py similarity index 100% rename from kafka/coordinator/assignors/__init__.py rename to aiokafka/coordinator/assignors/__init__.py diff --git a/kafka/coordinator/assignors/abstract.py b/aiokafka/coordinator/assignors/abstract.py similarity index 89% rename from kafka/coordinator/assignors/abstract.py rename to aiokafka/coordinator/assignors/abstract.py index a1fef384..d569c809 100644 --- a/kafka/coordinator/assignors/abstract.py +++ b/aiokafka/coordinator/assignors/abstract.py @@ -7,9 +7,8 @@ class AbstractPartitionAssignor(object): - """ - Abstract assignor implementation which does some common grunt work (in particular collecting - partition counts which are always needed in assignors). + """Abstract assignor implementation which does some common grunt work (in particular + collecting partition counts which are always needed in assignors). 
""" @abc.abstractproperty diff --git a/kafka/coordinator/assignors/range.py b/aiokafka/coordinator/assignors/range.py similarity index 78% rename from kafka/coordinator/assignors/range.py rename to aiokafka/coordinator/assignors/range.py index 299e39c4..19a58755 100644 --- a/kafka/coordinator/assignors/range.py +++ b/aiokafka/coordinator/assignors/range.py @@ -3,10 +3,11 @@ import collections import logging -from kafka.vendor import six - -from kafka.coordinator.assignors.abstract import AbstractPartitionAssignor -from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment +from aiokafka.coordinator.assignors.abstract import AbstractPartitionAssignor +from aiokafka.coordinator.protocol import ( + ConsumerProtocolMemberMetadata, + ConsumerProtocolMemberAssignment, +) log = logging.getLogger(__name__) @@ -28,23 +29,24 @@ class RangePartitionAssignor(AbstractPartitionAssignor): C0: [t0p0, t0p1, t1p0, t1p1] C1: [t0p2, t1p2] """ - name = 'range' + + name = "range" version = 0 @classmethod def assign(cls, cluster, member_metadata): consumers_per_topic = collections.defaultdict(list) - for member, metadata in six.iteritems(member_metadata): + for member, metadata in member_metadata.items(): for topic in metadata.subscription: consumers_per_topic[topic].append(member) # construct {member_id: {topic: [partition, ...]}} assignment = collections.defaultdict(dict) - for topic, consumers_for_topic in six.iteritems(consumers_per_topic): + for topic, consumers_for_topic in consumers_per_topic.items(): partitions = cluster.partitions_for_topic(topic) if partitions is None: - log.warning('No partition metadata for topic %s', topic) + log.warning("No partition metadata for topic %s", topic) continue partitions = sorted(partitions) consumers_for_topic.sort() @@ -58,19 +60,18 @@ def assign(cls, cluster, member_metadata): length = partitions_per_consumer if not i + 1 > consumers_with_extra: length += 1 - assignment[member][topic] = partitions[start:start+length] + assignment[member][topic] = partitions[start : start + length] protocol_assignment = {} for member_id in member_metadata: protocol_assignment[member_id] = ConsumerProtocolMemberAssignment( - cls.version, - sorted(assignment[member_id].items()), - b'') + cls.version, sorted(assignment[member_id].items()), b"" + ) return protocol_assignment @classmethod def metadata(cls, topics): - return ConsumerProtocolMemberMetadata(cls.version, list(topics), b'') + return ConsumerProtocolMemberMetadata(cls.version, list(topics), b"") @classmethod def on_assignment(cls, assignment): diff --git a/kafka/coordinator/assignors/roundrobin.py b/aiokafka/coordinator/assignors/roundrobin.py similarity index 87% rename from kafka/coordinator/assignors/roundrobin.py rename to aiokafka/coordinator/assignors/roundrobin.py index 2d24a5c8..e5926a17 100644 --- a/kafka/coordinator/assignors/roundrobin.py +++ b/aiokafka/coordinator/assignors/roundrobin.py @@ -4,12 +4,14 @@ import itertools import logging -from kafka.vendor import six - -from kafka.coordinator.assignors.abstract import AbstractPartitionAssignor -from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment from kafka.structs import TopicPartition +from aiokafka.coordinator.assignors.abstract import AbstractPartitionAssignor +from aiokafka.coordinator.protocol import ( + ConsumerProtocolMemberMetadata, + ConsumerProtocolMemberAssignment, +) + log = logging.getLogger(__name__) @@ -45,20 +47,21 @@ class 
RoundRobinPartitionAssignor(AbstractPartitionAssignor): C1: [t1p0] C2: [t1p1, t2p0, t2p1, t2p2] """ - name = 'roundrobin' + + name = "roundrobin" version = 0 @classmethod def assign(cls, cluster, member_metadata): all_topics = set() - for metadata in six.itervalues(member_metadata): + for metadata in member_metadata.values(): all_topics.update(metadata.subscription) all_topic_partitions = [] for topic in all_topics: partitions = cluster.partitions_for_topic(topic) if partitions is None: - log.warning('No partition metadata for topic %s', topic) + log.warning("No partition metadata for topic %s", topic) continue for partition in partitions: all_topic_partitions.append(TopicPartition(topic, partition)) @@ -82,14 +85,13 @@ def assign(cls, cluster, member_metadata): protocol_assignment = {} for member_id in member_metadata: protocol_assignment[member_id] = ConsumerProtocolMemberAssignment( - cls.version, - sorted(assignment[member_id].items()), - b'') + cls.version, sorted(assignment[member_id].items()), b"" + ) return protocol_assignment @classmethod def metadata(cls, topics): - return ConsumerProtocolMemberMetadata(cls.version, list(topics), b'') + return ConsumerProtocolMemberMetadata(cls.version, list(topics), b"") @classmethod def on_assignment(cls, assignment): diff --git a/kafka/coordinator/assignors/sticky/__init__.py b/aiokafka/coordinator/assignors/sticky/__init__.py similarity index 100% rename from kafka/coordinator/assignors/sticky/__init__.py rename to aiokafka/coordinator/assignors/sticky/__init__.py diff --git a/kafka/coordinator/assignors/sticky/partition_movements.py b/aiokafka/coordinator/assignors/sticky/partition_movements.py similarity index 83% rename from kafka/coordinator/assignors/sticky/partition_movements.py rename to aiokafka/coordinator/assignors/sticky/partition_movements.py index 8851e4cd..3deb35d5 100644 --- a/kafka/coordinator/assignors/sticky/partition_movements.py +++ b/aiokafka/coordinator/assignors/sticky/partition_movements.py @@ -2,8 +2,6 @@ from collections import defaultdict, namedtuple from copy import deepcopy -from kafka.vendor import six - log = logging.getLogger(__name__) @@ -28,7 +26,7 @@ def is_sublist(source, target): true if target is in source; false otherwise """ for index in (i for i, e in enumerate(source) if e == target[0]): - if tuple(source[index: index + len(target)]) == target: + if tuple(source[index : index + len(target)]) == target: return True return False @@ -41,9 +39,7 @@ class PartitionMovements: """ def __init__(self): - self.partition_movements_by_topic = defaultdict( - lambda: defaultdict(set) - ) + self.partition_movements_by_topic = defaultdict(lambda: defaultdict(set)) self.partition_movements = {} def move_partition(self, partition, old_consumer, new_consumer): @@ -55,7 +51,11 @@ def move_partition(self, partition, old_consumer, new_consumer): if existing_pair.src_member_id != new_consumer: # the partition is not moving back to its previous consumer self._add_partition_movement_record( - partition, ConsumerPair(src_member_id=existing_pair.src_member_id, dst_member_id=new_consumer) + partition, + ConsumerPair( + src_member_id=existing_pair.src_member_id, + dst_member_id=new_consumer, + ), ) else: self._add_partition_movement_record(partition, pair) @@ -67,14 +67,18 @@ def get_partition_to_be_moved(self, partition, old_consumer, new_consumer): # this partition has previously moved assert old_consumer == self.partition_movements[partition].dst_member_id old_consumer = self.partition_movements[partition].src_member_id - 
reverse_pair = ConsumerPair(src_member_id=new_consumer, dst_member_id=old_consumer) + reverse_pair = ConsumerPair( + src_member_id=new_consumer, dst_member_id=old_consumer + ) if reverse_pair not in self.partition_movements_by_topic[partition.topic]: return partition - return next(iter(self.partition_movements_by_topic[partition.topic][reverse_pair])) + return next( + iter(self.partition_movements_by_topic[partition.topic][reverse_pair]) + ) def are_sticky(self): - for topic, movements in six.iteritems(self.partition_movements_by_topic): + for topic, movements in self.partition_movements_by_topic.items(): movement_pairs = set(movements.keys()) if self._has_cycles(movement_pairs): log.error( @@ -107,11 +111,13 @@ def _has_cycles(self, consumer_pairs): reduced_pairs = deepcopy(consumer_pairs) reduced_pairs.remove(pair) path = [pair.src_member_id] - if self._is_linked(pair.dst_member_id, pair.src_member_id, reduced_pairs, path) and not self._is_subcycle( - path, cycles - ): + if self._is_linked( + pair.dst_member_id, pair.src_member_id, reduced_pairs, path + ) and not self._is_subcycle(path, cycles): cycles.add(tuple(path)) - log.error("A cycle of length {} was found: {}".format(len(path) - 1, path)) + log.error( + "A cycle of length {} was found: {}".format(len(path) - 1, path) + ) # for now we want to make sure there is no partition movements of the same topic between a pair of consumers. # the odds of finding a cycle among more than two consumers seem to be very low (according to various randomized @@ -145,5 +151,7 @@ def _is_linked(self, src, dst, pairs, current_path): reduced_set = deepcopy(pairs) reduced_set.remove(pair) current_path.append(pair.src_member_id) - return self._is_linked(pair.dst_member_id, dst, reduced_set, current_path) + return self._is_linked( + pair.dst_member_id, dst, reduced_set, current_path + ) return False diff --git a/kafka/coordinator/assignors/sticky/sorted_set.py b/aiokafka/coordinator/assignors/sticky/sorted_set.py similarity index 94% rename from kafka/coordinator/assignors/sticky/sorted_set.py rename to aiokafka/coordinator/assignors/sticky/sorted_set.py index 6a454a42..7903f6ca 100644 --- a/kafka/coordinator/assignors/sticky/sorted_set.py +++ b/aiokafka/coordinator/assignors/sticky/sorted_set.py @@ -35,9 +35,13 @@ def pop_last(self): return value def add(self, value): - if self._cached_last is not None and self._key(value) > self._key(self._cached_last): + if self._cached_last is not None and self._key(value) > self._key( + self._cached_last + ): self._cached_last = value - if self._cached_first is not None and self._key(value) < self._key(self._cached_first): + if self._cached_first is not None and self._key(value) < self._key( + self._cached_first + ): self._cached_first = value return self._set.add(value) diff --git a/kafka/coordinator/assignors/sticky/sticky_assignor.py b/aiokafka/coordinator/assignors/sticky/sticky_assignor.py similarity index 69% rename from kafka/coordinator/assignors/sticky/sticky_assignor.py rename to aiokafka/coordinator/assignors/sticky/sticky_assignor.py index dce714f1..f3302908 100644 --- a/kafka/coordinator/assignors/sticky/sticky_assignor.py +++ b/aiokafka/coordinator/assignors/sticky/sticky_assignor.py @@ -2,20 +2,24 @@ from collections import defaultdict, namedtuple from copy import deepcopy -from kafka.cluster import ClusterMetadata -from kafka.coordinator.assignors.abstract import AbstractPartitionAssignor -from kafka.coordinator.assignors.sticky.partition_movements import PartitionMovements -from 
kafka.coordinator.assignors.sticky.sorted_set import SortedSet -from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment -from kafka.coordinator.protocol import Schema from kafka.protocol.struct import Struct from kafka.protocol.types import String, Array, Int32 from kafka.structs import TopicPartition -from kafka.vendor import six + +from aiokafka.coordinator.assignors.abstract import AbstractPartitionAssignor +from aiokafka.coordinator.assignors.sticky.partition_movements import PartitionMovements +from aiokafka.coordinator.assignors.sticky.sorted_set import SortedSet +from aiokafka.coordinator.protocol import ( + ConsumerProtocolMemberMetadata, + ConsumerProtocolMemberAssignment, +) +from aiokafka.coordinator.protocol import Schema log = logging.getLogger(__name__) -ConsumerGenerationPair = namedtuple("ConsumerGenerationPair", ["consumer", "generation"]) +ConsumerGenerationPair = namedtuple( + "ConsumerGenerationPair", ["consumer", "generation"] +) def has_identical_list_elements(list_): @@ -50,8 +54,9 @@ def remove_if_present(collection, element): pass -StickyAssignorMemberMetadataV1 = namedtuple("StickyAssignorMemberMetadataV1", - ["subscription", "partitions", "generation"]) +StickyAssignorMemberMetadataV1 = namedtuple( + "StickyAssignorMemberMetadataV1", ["subscription", "partitions", "generation"] +) class StickyAssignorUserDataV1(Struct): @@ -61,14 +66,19 @@ class StickyAssignorUserDataV1(Struct): """ SCHEMA = Schema( - ("previous_assignment", Array(("topic", String("utf-8")), ("partitions", Array(Int32)))), ("generation", Int32) + ( + "previous_assignment", + Array(("topic", String("utf-8")), ("partitions", Array(Int32))), + ), + ("generation", Int32), ) class StickyAssignmentExecutor: def __init__(self, cluster, members): self.members = members - # a mapping between consumers and their assigned partitions that is updated during assignment procedure + # a mapping between consumers and their assigned partitions that is updated + # during assignment procedure self.current_assignment = defaultdict(list) # an assignment from a previous generation self.previous_assignment = {} @@ -76,18 +86,22 @@ def __init__(self, cluster, members): self.current_partition_consumer = {} # a flag indicating that there were no previous assignments performed ever self.is_fresh_assignment = False - # a mapping of all topic partitions to all consumers that can be assigned to them + # a mapping of all topic partitions to all consumers that can be assigned to + # them self.partition_to_all_potential_consumers = {} - # a mapping of all consumers to all potential topic partitions that can be assigned to them + # a mapping of all consumers to all potential topic partitions that can be + # assigned to them self.consumer_to_all_potential_partitions = {} - # an ascending sorted set of consumers based on how many topic partitions are already assigned to them + # an ascending sorted set of consumers based on how many topic partitions are + # already assigned to them self.sorted_current_subscriptions = SortedSet() - # an ascending sorted list of topic partitions based on how many consumers can potentially use them + # an ascending sorted list of topic partitions based on how many consumers can + # potentially use them self.sorted_partitions = [] # all partitions that need to be assigned self.unassigned_partitions = [] - # a flag indicating that a certain partition cannot remain assigned to its current consumer because the consumer - # is no longer subscribed to its topic 
+ # a flag indicating that a certain partition cannot remain assigned to its + # current consumer because the consumer is no longer subscribed to its topic self.revocation_required = False self.partition_movements = PartitionMovements() @@ -99,7 +113,10 @@ def perform_initial_assignment(self): def balance(self): self._initialize_current_subscriptions() - initializing = len(self.current_assignment[self._get_consumer_with_most_subscriptions()]) == 0 + initializing = ( + len(self.current_assignment[self._get_consumer_with_most_subscriptions()]) + == 0 + ) # assign all unassigned partitions for partition in self.unassigned_partitions: @@ -108,20 +125,24 @@ def balance(self): continue self._assign_partition(partition) - # narrow down the reassignment scope to only those partitions that can actually be reassigned + # narrow down the reassignment scope to only those partitions that can actually + # be reassigned fixed_partitions = set() - for partition in six.iterkeys(self.partition_to_all_potential_consumers): + for partition in self.partition_to_all_potential_consumers.keys(): if not self._can_partition_participate_in_reassignment(partition): fixed_partitions.add(partition) for fixed_partition in fixed_partitions: remove_if_present(self.sorted_partitions, fixed_partition) remove_if_present(self.unassigned_partitions, fixed_partition) - # narrow down the reassignment scope to only those consumers that are subject to reassignment + # narrow down the reassignment scope to only those consumers that are subject to + # reassignment fixed_assignments = {} - for consumer in six.iterkeys(self.consumer_to_all_potential_partitions): + for consumer in self.consumer_to_all_potential_partitions.keys(): if not self._can_consumer_participate_in_reassignment(consumer): - self._remove_consumer_from_current_subscriptions_and_maintain_order(consumer) + self._remove_consumer_from_current_subscriptions_and_maintain_order( + consumer + ) fixed_assignments[consumer] = self.current_assignment[consumer] del self.current_assignment[consumer] @@ -136,19 +157,21 @@ def balance(self): self._perform_reassignments(self.unassigned_partitions) reassignment_performed = self._perform_reassignments(self.sorted_partitions) - # if we are not preserving existing assignments and we have made changes to the current assignment - # make sure we are getting a more balanced assignment; otherwise, revert to previous assignment + # if we are not preserving existing assignments and we have made changes to the + # current assignment make sure we are getting a more balanced assignment; + # otherwise, revert to previous assignment if ( not initializing and reassignment_performed - and self._get_balance_score(self.current_assignment) >= self._get_balance_score(prebalance_assignment) + and self._get_balance_score(self.current_assignment) + >= self._get_balance_score(prebalance_assignment) ): self.current_assignment = prebalance_assignment self.current_partition_consumer.clear() self.current_partition_consumer.update(prebalance_partition_consumers) # add the fixed assignments (those that could not change) back - for consumer, partitions in six.iteritems(fixed_assignments): + for consumer, partitions in fixed_assignments.items(): self.current_assignment[consumer] = partitions self._add_consumer_to_current_subscriptions_and_maintain_order(consumer) @@ -156,8 +179,8 @@ def get_final_assignment(self, member_id): assignment = defaultdict(list) for topic_partition in self.current_assignment[member_id]: 
assignment[topic_partition.topic].append(topic_partition.partition) - assignment = {k: sorted(v) for k, v in six.iteritems(assignment)} - return six.viewitems(assignment) + assignment = {k: sorted(v) for k, v in assignment.items()} + return assignment.items() def _initialize(self, cluster): self._init_current_assignments(self.members) @@ -170,7 +193,7 @@ def _initialize(self, cluster): for p in partitions: partition = TopicPartition(topic=topic, partition=p) self.partition_to_all_potential_consumers[partition] = [] - for consumer_id, member_metadata in six.iteritems(self.members): + for consumer_id, member_metadata in self.members.items(): self.consumer_to_all_potential_partitions[consumer_id] = [] for topic in member_metadata.subscription: if cluster.partitions_for_topic(topic) is None: @@ -178,38 +201,51 @@ def _initialize(self, cluster): continue for p in cluster.partitions_for_topic(topic): partition = TopicPartition(topic=topic, partition=p) - self.consumer_to_all_potential_partitions[consumer_id].append(partition) - self.partition_to_all_potential_consumers[partition].append(consumer_id) + self.consumer_to_all_potential_partitions[consumer_id].append( + partition + ) + self.partition_to_all_potential_consumers[partition].append( + consumer_id + ) if consumer_id not in self.current_assignment: self.current_assignment[consumer_id] = [] def _init_current_assignments(self, members): - # we need to process subscriptions' user data with each consumer's reported generation in mind - # higher generations overwrite lower generations in case of a conflict - # note that a conflict could exists only if user data is for different generations + # we need to process subscriptions' user data with each consumer's reported + # generation in mind higher generations overwrite lower generations in case of + # a conflict note that a conflict could exists only if user data is for + # different generations # for each partition we create a map of its consumers by generation sorted_partition_consumers_by_generation = {} - for consumer, member_metadata in six.iteritems(members): + for consumer, member_metadata in members.items(): for partitions in member_metadata.partitions: if partitions in sorted_partition_consumers_by_generation: consumers = sorted_partition_consumers_by_generation[partitions] - if member_metadata.generation and member_metadata.generation in consumers: - # same partition is assigned to two consumers during the same rebalance. - # log a warning and skip this record + if ( + member_metadata.generation + and member_metadata.generation in consumers + ): + # same partition is assigned to two consumers during the same + # rebalance. 
log a warning and skip this record log.warning( "Partition {} is assigned to multiple consumers " - "following sticky assignment generation {}.".format(partitions, member_metadata.generation) + "following sticky assignment generation {}.".format( + partitions, member_metadata.generation + ) ) else: consumers[member_metadata.generation] = consumer else: sorted_consumers = {member_metadata.generation: consumer} - sorted_partition_consumers_by_generation[partitions] = sorted_consumers - - # previous_assignment holds the prior ConsumerGenerationPair (before current) of each partition - # current and previous consumers are the last two consumers of each partition in the above sorted map - for partitions, consumers in six.iteritems(sorted_partition_consumers_by_generation): + sorted_partition_consumers_by_generation[ + partitions + ] = sorted_consumers + + # previous_assignment holds the prior ConsumerGenerationPair (before current) of + # each partition current and previous consumers are the last two consumers of + # each partition in the above sorted map + for partitions, consumers in sorted_partition_consumers_by_generation.items(): generations = sorted(consumers.keys(), reverse=True) self.current_assignment[consumers[generations[0]]].append(partitions) # now update previous assignment if any @@ -220,33 +256,42 @@ def _init_current_assignments(self, members): self.is_fresh_assignment = len(self.current_assignment) == 0 - for consumer_id, partitions in six.iteritems(self.current_assignment): + for consumer_id, partitions in self.current_assignment.items(): for partition in partitions: self.current_partition_consumer[partition] = consumer_id def _are_subscriptions_identical(self): """ Returns: - true, if both potential consumers of partitions and potential partitions that consumers can - consume are the same + true, if both potential consumers of partitions and potential partitions + that consumers can consume are the same """ - if not has_identical_list_elements(list(six.itervalues(self.partition_to_all_potential_consumers))): + if not has_identical_list_elements( + list(self.partition_to_all_potential_consumers.values()) + ): return False - return has_identical_list_elements(list(six.itervalues(self.consumer_to_all_potential_partitions))) + return has_identical_list_elements( + list(self.consumer_to_all_potential_partitions.values()) + ) def _populate_sorted_partitions(self): # set of topic partitions with their respective potential consumers - all_partitions = set((tp, tuple(consumers)) - for tp, consumers in six.iteritems(self.partition_to_all_potential_consumers)) - partitions_sorted_by_num_of_potential_consumers = sorted(all_partitions, key=partitions_comparator_key) + all_partitions = set( + (tp, tuple(consumers)) + for tp, consumers in self.partition_to_all_potential_consumers.items() + ) + partitions_sorted_by_num_of_potential_consumers = sorted( + all_partitions, key=partitions_comparator_key + ) self.sorted_partitions = [] if not self.is_fresh_assignment and self._are_subscriptions_identical(): - # if this is a reassignment and the subscriptions are identical (all consumers can consumer from all topics) - # then we just need to simply list partitions in a round robin fashion (from consumers with - # most assigned partitions to those with least) + # if this is a reassignment and the subscriptions are identical (all + # consumers can consumer from all topics) then we just need to simply list + # partitions in a round robin fashion (from consumers with most assigned + # partitions to 
those with least) assignments = deepcopy(self.current_assignment) - for consumer_id, partitions in six.iteritems(assignments): + for partitions in assignments.values(): to_remove = [] for partition in partitions: if partition not in self.partition_to_all_potential_consumers: @@ -255,11 +300,15 @@ def _populate_sorted_partitions(self): partitions.remove(partition) sorted_consumers = SortedSet( - iterable=[(consumer, tuple(partitions)) for consumer, partitions in six.iteritems(assignments)], + iterable=[ + (consumer, tuple(partitions)) + for consumer, partitions in assignments.items() + ], key=subscriptions_comparator_key, ) - # at this point, sorted_consumers contains an ascending-sorted list of consumers based on - # how many valid partitions are currently assigned to them + # at this point, sorted_consumers contains an ascending-sorted list of + # consumers based on how many valid partitions are currently assigned to + # them while sorted_consumers: # take the consumer with the most partitions consumer, _ = sorted_consumers.pop_last() @@ -267,16 +316,19 @@ def _populate_sorted_partitions(self): remaining_partitions = assignments[consumer] # from partitions that had a different consumer before, # keep only those that are assigned to this consumer now - previous_partitions = set(six.iterkeys(self.previous_assignment)).intersection(set(remaining_partitions)) + previous_partitions = set(self.previous_assignment.keys()).intersection( + set(remaining_partitions) + ) if previous_partitions: - # if there is a partition of this consumer that was assigned to another consumer before - # mark it as good options for reassignment + # if there is a partition of this consumer that was assigned to + # another consumer before mark it as good options for reassignment partition = previous_partitions.pop() remaining_partitions.remove(partition) self.sorted_partitions.append(partition) sorted_consumers.add((consumer, tuple(assignments[consumer]))) elif remaining_partitions: - # otherwise, mark any other one of the current partitions as a reassignment candidate + # otherwise, mark any other one of the current partitions as a + # reassignment candidate self.sorted_partitions.append(remaining_partitions.pop()) sorted_consumers.add((consumer, tuple(assignments[consumer]))) @@ -286,16 +338,18 @@ def _populate_sorted_partitions(self): self.sorted_partitions.append(partition) else: while partitions_sorted_by_num_of_potential_consumers: - self.sorted_partitions.append(partitions_sorted_by_num_of_potential_consumers.pop(0)[0]) + self.sorted_partitions.append( + partitions_sorted_by_num_of_potential_consumers.pop(0)[0] + ) def _populate_partitions_to_reassign(self): self.unassigned_partitions = deepcopy(self.sorted_partitions) assignments_to_remove = [] - for consumer_id, partitions in six.iteritems(self.current_assignment): + for consumer_id, partitions in self.current_assignment.items(): if consumer_id not in self.members: - # if a consumer that existed before (and had some partition assignments) is now removed, - # remove it from current_assignment + # if a consumer that existed before (and had some partition assignments) + # is now removed, remove it from current_assignment for partition in partitions: del self.current_partition_consumer[partition] assignments_to_remove.append(consumer_id) @@ -308,14 +362,16 @@ def _populate_partitions_to_reassign(self): # remove it from current_assignment of the consumer partitions_to_remove.append(partition) elif partition.topic not in self.members[consumer_id].subscription: - # 
if this partition cannot remain assigned to its current consumer because the consumer - # is no longer subscribed to its topic remove it from current_assignment of the consumer + # if this partition cannot remain assigned to its current + # consumer because the consumer is no longer subscribed to its + # topic remove it from current_assignment of the consumer partitions_to_remove.append(partition) self.revocation_required = True else: - # otherwise, remove the topic partition from those that need to be assigned only if - # its current consumer is still subscribed to its topic (because it is already assigned - # and we would want to preserve that assignment as much as possible) + # otherwise, remove the topic partition from those that need to + # be assigned only if its current consumer is still subscribed + # to its topic (because it is already assigned and we would want + # to preserve that assignment as much as possible) self.unassigned_partitions.remove(partition) for partition in partitions_to_remove: self.current_assignment[consumer_id].remove(partition) @@ -325,7 +381,10 @@ def _populate_partitions_to_reassign(self): def _initialize_current_subscriptions(self): self.sorted_current_subscriptions = SortedSet( - iterable=[(consumer, tuple(partitions)) for consumer, partitions in six.iteritems(self.current_assignment)], + iterable=[ + (consumer, tuple(partitions)) + for consumer, partitions in self.current_assignment.items() + ], key=subscriptions_comparator_key, ) @@ -336,42 +395,56 @@ def _get_consumer_with_most_subscriptions(self): return self.sorted_current_subscriptions.last()[0] def _remove_consumer_from_current_subscriptions_and_maintain_order(self, consumer): - self.sorted_current_subscriptions.remove((consumer, tuple(self.current_assignment[consumer]))) + self.sorted_current_subscriptions.remove( + (consumer, tuple(self.current_assignment[consumer])) + ) def _add_consumer_to_current_subscriptions_and_maintain_order(self, consumer): - self.sorted_current_subscriptions.add((consumer, tuple(self.current_assignment[consumer]))) + self.sorted_current_subscriptions.add( + (consumer, tuple(self.current_assignment[consumer])) + ) def _is_balanced(self): """Determines if the current assignment is a balanced one""" if ( len(self.current_assignment[self._get_consumer_with_least_subscriptions()]) - >= len(self.current_assignment[self._get_consumer_with_most_subscriptions()]) - 1 + >= len( + self.current_assignment[self._get_consumer_with_most_subscriptions()] + ) + - 1 ): - # if minimum and maximum numbers of partitions assigned to consumers differ by at most one return true + # if minimum and maximum numbers of partitions assigned to consumers differ + # by at most one return true return True # create a mapping from partitions to the consumer assigned to them all_assigned_partitions = {} - for consumer_id, consumer_partitions in six.iteritems(self.current_assignment): + for consumer_id, consumer_partitions in self.current_assignment.items(): for partition in consumer_partitions: if partition in all_assigned_partitions: - log.error("{} is assigned to more than one consumer.".format(partition)) + log.error( + "{} is assigned to more than one consumer.".format(partition) + ) all_assigned_partitions[partition] = consumer_id # for each consumer that does not have all the topic partitions it can get - # make sure none of the topic partitions it could but did not get cannot be moved to it - # (because that would break the balance) + # make sure none of the topic partitions it could but did not 
get cannot be + # moved to it (because that would break the balance) for consumer, _ in self.sorted_current_subscriptions: consumer_partition_count = len(self.current_assignment[consumer]) # skip if this consumer already has all the topic partitions it can get - if consumer_partition_count == len(self.consumer_to_all_potential_partitions[consumer]): + if consumer_partition_count == len( + self.consumer_to_all_potential_partitions[consumer] + ): continue # otherwise make sure it cannot get any more for partition in self.consumer_to_all_potential_partitions[consumer]: if partition not in self.current_assignment[consumer]: other_consumer = all_assigned_partitions[partition] - other_consumer_partition_count = len(self.current_assignment[other_consumer]) + other_consumer_partition_count = len( + self.current_assignment[other_consumer] + ) if consumer_partition_count < other_consumer_partition_count: return False return True @@ -379,7 +452,9 @@ def _is_balanced(self): def _assign_partition(self, partition): for consumer, _ in self.sorted_current_subscriptions: if partition in self.consumer_to_all_potential_partitions[consumer]: - self._remove_consumer_from_current_subscriptions_and_maintain_order(consumer) + self._remove_consumer_from_current_subscriptions_and_maintain_order( + consumer + ) self.current_assignment[consumer].append(partition) self.current_partition_consumer[partition] = consumer self._add_consumer_to_current_subscriptions_and_maintain_order(consumer) @@ -393,13 +468,18 @@ def _can_consumer_participate_in_reassignment(self, consumer): current_assignment_size = len(current_partitions) max_assignment_size = len(self.consumer_to_all_potential_partitions[consumer]) if current_assignment_size > max_assignment_size: - log.error("The consumer {} is assigned more partitions than the maximum possible.".format(consumer)) + log.error( + "The consumer {} is assigned more partitions than the maximum possible.".format( + consumer + ) + ) if current_assignment_size < max_assignment_size: - # if a consumer is not assigned all its potential partitions it is subject to reassignment + # if a consumer is not assigned all its potential partitions it is subject + # to reassignment return True for partition in current_partitions: - # if any of the partitions assigned to a consumer is subject to reassignment the consumer itself - # is subject to reassignment + # if any of the partitions assigned to a consumer is subject to reassignment + # the consumer itself is subject to reassignment if self._can_partition_participate_in_reassignment(partition): return True return False @@ -410,34 +490,54 @@ def _perform_reassignments(self, reassignable_partitions): # repeat reassignment until no partition can be moved to improve the balance while True: modified = False - # reassign all reassignable partitions until the full list is processed or a balance is achieved - # (starting from the partition with least potential consumers and if needed) + # reassign all reassignable partitions until the full list is processed or + # a balance is achieved (starting from the partition with least potential + # consumers and if needed) for partition in reassignable_partitions: if self._is_balanced(): break # the partition must have at least two potential consumers if len(self.partition_to_all_potential_consumers[partition]) <= 1: - log.error("Expected more than one potential consumer for partition {}".format(partition)) + log.error( + "Expected more than one potential consumer for partition " + "{}".format(partition) + ) # the 
partition must have a current consumer consumer = self.current_partition_consumer.get(partition) if consumer is None: - log.error("Expected partition {} to be assigned to a consumer".format(partition)) + log.error( + "Expected partition {} to be assigned to a consumer".format( + partition + ) + ) if ( partition in self.previous_assignment and len(self.current_assignment[consumer]) - > len(self.current_assignment[self.previous_assignment[partition].consumer]) + 1 + > len( + self.current_assignment[ + self.previous_assignment[partition].consumer + ] + ) + + 1 ): self._reassign_partition_to_consumer( - partition, self.previous_assignment[partition].consumer, + partition, + self.previous_assignment[partition].consumer, ) reassignment_performed = True modified = True continue - # check if a better-suited consumer exist for the partition; if so, reassign it - for other_consumer in self.partition_to_all_potential_consumers[partition]: - if len(self.current_assignment[consumer]) > len(self.current_assignment[other_consumer]) + 1: + # check if a better-suited consumer exist for the partition; if so, + # reassign it + for other_consumer in self.partition_to_all_potential_consumers[ + partition + ]: + if ( + len(self.current_assignment[consumer]) + > len(self.current_assignment[other_consumer]) + 1 + ): self._reassign_partition(partition) reassignment_performed = True modified = True @@ -459,13 +559,19 @@ def _reassign_partition(self, partition): def _reassign_partition_to_consumer(self, partition, new_consumer): consumer = self.current_partition_consumer[partition] # find the correct partition movement considering the stickiness requirement - partition_to_be_moved = self.partition_movements.get_partition_to_be_moved(partition, consumer, new_consumer) + partition_to_be_moved = self.partition_movements.get_partition_to_be_moved( + partition, consumer, new_consumer + ) self._move_partition(partition_to_be_moved, new_consumer) def _move_partition(self, partition, new_consumer): old_consumer = self.current_partition_consumer[partition] - self._remove_consumer_from_current_subscriptions_and_maintain_order(old_consumer) - self._remove_consumer_from_current_subscriptions_and_maintain_order(new_consumer) + self._remove_consumer_from_current_subscriptions_and_maintain_order( + old_consumer + ) + self._remove_consumer_from_current_subscriptions_and_maintain_order( + new_consumer + ) self.partition_movements.move_partition(partition, old_consumer, new_consumer) @@ -480,8 +586,9 @@ def _move_partition(self, partition, new_consumer): def _get_balance_score(assignment): """Calculates a balance score of a give assignment as the sum of assigned partitions size difference of all consumer pairs. - A perfectly balanced assignment (with all consumers getting the same number of partitions) - has a balance score of 0. Lower balance score indicates a more balanced assignment. + A perfectly balanced assignment (with all consumers getting the same number of + partitions) has a balance score of 0. Lower balance score indicates a more + balanced assignment. 
Arguments: assignment (dict): {consumer: list of assigned topic partitions} @@ -491,7 +598,7 @@ def _get_balance_score(assignment): """ score = 0 consumer_to_assignment = {} - for consumer_id, partitions in six.iteritems(assignment): + for consumer_id, partitions in assignment.items(): consumer_to_assignment[consumer_id] = len(partitions) consumers_to_explore = set(consumer_to_assignment.keys()) @@ -499,50 +606,58 @@ def _get_balance_score(assignment): if consumer_id in consumers_to_explore: consumers_to_explore.remove(consumer_id) for other_consumer_id in consumers_to_explore: - score += abs(consumer_to_assignment[consumer_id] - consumer_to_assignment[other_consumer_id]) + score += abs( + consumer_to_assignment[consumer_id] + - consumer_to_assignment[other_consumer_id] + ) return score class StickyPartitionAssignor(AbstractPartitionAssignor): """ https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy - - The sticky assignor serves two purposes. First, it guarantees an assignment that is as balanced as possible, meaning either: + + The sticky assignor serves two purposes. First, it guarantees an assignment that is + as balanced as possible, meaning either: - the numbers of topic partitions assigned to consumers differ by at most one; or - - each consumer that has 2+ fewer topic partitions than some other consumer cannot get any of those topic partitions transferred to it. - - Second, it preserved as many existing assignment as possible when a reassignment occurs. - This helps in saving some of the overhead processing when topic partitions move from one consumer to another. - - Starting fresh it would work by distributing the partitions over consumers as evenly as possible. - Even though this may sound similar to how round robin assignor works, the second example below shows that it is not. - During a reassignment it would perform the reassignment in such a way that in the new assignment + - each consumer that has 2+ fewer topic partitions than some other consumer cannot + get any of those topic partitions transferred to it. + + Second, it preserved as many existing assignment as possible when a reassignment + occurs. This helps in saving some of the overhead processing when topic partitions + move from one consumer to another. + + Starting fresh it would work by distributing the partitions over consumers as evenly + as possible. Even though this may sound similar to how round robin assignor works, + the second example below shows that it is not. During a reassignment it would + perform the reassignment in such a way that in the new assignment - topic partitions are still distributed as evenly as possible, and - topic partitions stay with their previously assigned consumers as much as possible. - + The first goal above takes precedence over the second one. - + Example 1. Suppose there are three consumers C0, C1, C2, four topics t0, t1, t2, t3, and each topic has 2 partitions, resulting in partitions t0p0, t0p1, t1p0, t1p1, t2p0, t2p1, t3p0, t3p1. Each consumer is subscribed to all three topics. - + The assignment with both sticky and round robin assignors will be: - C0: [t0p0, t1p1, t3p0] - C1: [t0p1, t2p0, t3p1] - C2: [t1p0, t2p1] - - Now, let's assume C1 is removed and a reassignment is about to happen. The round robin assignor would produce: + + Now, let's assume C1 is removed and a reassignment is about to happen. 
The round + robin assignor would produce: - C0: [t0p0, t1p0, t2p0, t3p0] - C2: [t0p1, t1p1, t2p1, t3p1] - + while the sticky assignor would result in: - C0 [t0p0, t1p1, t3p0, t2p0] - C2 [t1p0, t2p1, t0p1, t3p1] preserving all the previous assignments (unlike the round robin assignor). - - + + Example 2. There are three consumers C0, C1, C2, and three topics t0, t1, t2, with 1, 2, and 3 partitions respectively. @@ -550,22 +665,22 @@ class StickyPartitionAssignor(AbstractPartitionAssignor): C0 is subscribed to t0; C1 is subscribed to t0, t1; and C2 is subscribed to t0, t1, t2. - + The round robin assignor would come up with the following assignment: - C0 [t0p0] - C1 [t1p0] - C2 [t1p1, t2p0, t2p1, t2p2] - + which is not as balanced as the assignment suggested by sticky assignor: - C0 [t0p0] - C1 [t1p0, t1p1] - C2 [t2p0, t2p1, t2p2] - - Now, if consumer C0 is removed, these two assignors would produce the following assignments. - Round Robin (preserves 3 partition assignments): + + Now, if consumer C0 is removed, these two assignors would produce the following + assignments. Round Robin (preserves 3 partition assignments): - C1 [t0p0, t1p1] - C2 [t1p0, t2p0, t2p1, t2p2] - + Sticky (preserves 5 partition assignments): - C1 [t1p0, t1p1, t0p0] - C2 [t2p0, t2p1, t2p2] @@ -587,13 +702,14 @@ def assign(cls, cluster, members): Arguments: cluster (ClusterMetadata): cluster metadata - members (dict of {member_id: MemberMetadata}): decoded metadata for each member in the group. + members (dict of {member_id: MemberMetadata}): decoded metadata for each + member in the group. Returns: dict: {member_id: MemberAssignment} """ members_metadata = {} - for consumer, member_metadata in six.iteritems(members): + for consumer, member_metadata in members.items(): members_metadata[consumer] = cls.parse_member_metadata(member_metadata) executor = StickyAssignmentExecutor(cluster, members_metadata) @@ -605,7 +721,7 @@ def assign(cls, cluster, members): assignment = {} for member_id in members: assignment[member_id] = ConsumerProtocolMemberAssignment( - cls.version, sorted(executor.get_final_assignment(member_id)), b'' + cls.version, sorted(executor.get_final_assignment(member_id)), b"" ) return assignment @@ -613,9 +729,10 @@ def assign(cls, cluster, members): def parse_member_metadata(cls, metadata): """ Parses member metadata into a python object. - This implementation only serializes and deserializes the StickyAssignorMemberMetadataV1 user data, - since no StickyAssignor written in Python was deployed ever in the wild with version V0, meaning that - there is no need to support backward compatibility with V0. + This implementation only serializes and deserializes the + StickyAssignorMemberMetadataV1 user data, since no StickyAssignor written in + Python was deployed ever in the wild with version V0, meaning that there is no + need to support backward compatibility with V0. Arguments: metadata (MemberMetadata): decoded metadata for a member of the group. 
@@ -626,24 +743,37 @@ def parse_member_metadata(cls, metadata): user_data = metadata.user_data if not user_data: return StickyAssignorMemberMetadataV1( - partitions=[], generation=cls.DEFAULT_GENERATION_ID, subscription=metadata.subscription + partitions=[], + generation=cls.DEFAULT_GENERATION_ID, + subscription=metadata.subscription, ) try: decoded_user_data = StickyAssignorUserDataV1.decode(user_data) except Exception as e: # ignore the consumer's previous assignment if it cannot be parsed - log.error("Could not parse member data", e) # pylint: disable=logging-too-many-args + log.error( + "Could not parse member data", e + ) # pylint: disable=logging-too-many-args return StickyAssignorMemberMetadataV1( - partitions=[], generation=cls.DEFAULT_GENERATION_ID, subscription=metadata.subscription + partitions=[], + generation=cls.DEFAULT_GENERATION_ID, + subscription=metadata.subscription, ) member_partitions = [] - for topic, partitions in decoded_user_data.previous_assignment: # pylint: disable=no-member - member_partitions.extend([TopicPartition(topic, partition) for partition in partitions]) + for ( + topic, + partitions, + ) in decoded_user_data.previous_assignment: # pylint: disable=no-member + member_partitions.extend( + [TopicPartition(topic, partition) for partition in partitions] + ) return StickyAssignorMemberMetadataV1( # pylint: disable=no-member - partitions=member_partitions, generation=decoded_user_data.generation, subscription=metadata.subscription + partitions=member_partitions, + generation=decoded_user_data.generation, + subscription=metadata.subscription, ) @classmethod @@ -654,13 +784,18 @@ def metadata(cls, topics): def _metadata(cls, topics, member_assignment_partitions, generation=-1): if member_assignment_partitions is None: log.debug("No member assignment available") - user_data = b'' + user_data = b"" else: - log.debug("Member assignment is available, generating the metadata: generation {}".format(cls.generation)) + log.debug( + "Member assignment is available, generating the metadata: " + "generation {}".format(cls.generation) + ) partitions_by_topic = defaultdict(list) for topic_partition in member_assignment_partitions: - partitions_by_topic[topic_partition.topic].append(topic_partition.partition) - data = StickyAssignorUserDataV1(six.viewitems(partitions_by_topic), generation) + partitions_by_topic[topic_partition.topic].append( + topic_partition.partition + ) + data = StickyAssignorUserDataV1(partitions_by_topic.items(), generation) user_data = data.encode() return ConsumerProtocolMemberMetadata(cls.version, list(topics), user_data) diff --git a/kafka/coordinator/base.py b/aiokafka/coordinator/base.py similarity index 70% rename from kafka/coordinator/base.py rename to aiokafka/coordinator/base.py index e7198410..5f15bd57 100644 --- a/kafka/coordinator/base.py +++ b/aiokafka/coordinator/base.py @@ -7,24 +7,28 @@ import time import weakref -from kafka.vendor import six - -from kafka.coordinator.heartbeat import Heartbeat -from kafka import errors as Errors from kafka.future import Future from kafka.metrics import AnonMeasurable from kafka.metrics.stats import Avg, Count, Max, Rate from kafka.protocol.commit import GroupCoordinatorRequest, OffsetCommitRequest -from kafka.protocol.group import (HeartbeatRequest, JoinGroupRequest, - LeaveGroupRequest, SyncGroupRequest) +from kafka.protocol.group import ( + HeartbeatRequest, + JoinGroupRequest, + LeaveGroupRequest, + SyncGroupRequest, +) + +from aiokafka import errors as Errors + +from .heartbeat import Heartbeat 
-log = logging.getLogger('kafka.coordinator') +log = logging.getLogger("aiokafka.coordinator") class MemberState(object): - UNJOINED = '<unjoined>' # the client is not part of a group - REBALANCING = '<rebalancing>' # the client has begun rebalancing - STABLE = '<stable>' # the client has joined and is sending heartbeats + UNJOINED = "<unjoined>" # the client is not part of a group + REBALANCING = "<rebalancing>" # the client has begun rebalancing + STABLE = "<stable>" # the client has joined and is sending heartbeats class Generation(object): @@ -33,10 +37,12 @@ def __init__(self, generation_id, member_id, protocol): self.member_id = member_id self.protocol = protocol + Generation.NO_GENERATION = Generation( OffsetCommitRequest[2].DEFAULT_GENERATION_ID, JoinGroupRequest[0].UNKNOWN_MEMBER_ID, - None) + None, +) class UnjoinedGroupException(Errors.KafkaError): @@ -81,13 +87,13 @@ class BaseCoordinator(object): """ DEFAULT_CONFIG = { - 'group_id': 'kafka-python-default-group', - 'session_timeout_ms': 10000, - 'heartbeat_interval_ms': 3000, - 'max_poll_interval_ms': 300000, - 'retry_backoff_ms': 100, - 'api_version': (0, 10, 1), - 'metric_group_prefix': '', + "group_id": "kafka-python-default-group", + "session_timeout_ms": 10000, + "heartbeat_interval_ms": 3000, + "max_poll_interval_ms": 300000, + "retry_backoff_ms": 100, + "api_version": (0, 10, 1), + "metric_group_prefix": "", } def __init__(self, client, metrics, **configs): @@ -115,14 +121,16 @@ def __init__(self, client, metrics, **configs): if key in configs: self.config[key] = configs[key] - if self.config['api_version'] < (0, 10, 1): - if self.config['max_poll_interval_ms'] != self.config['session_timeout_ms']: - raise Errors.KafkaConfigurationError("Broker version %s does not support " - "different values for max_poll_interval_ms " - "and session_timeout_ms") + if self.config["api_version"] < (0, 10, 1): + if self.config["max_poll_interval_ms"] != self.config["session_timeout_ms"]: + raise Errors.KafkaConfigurationError( + "Broker version %s does not support " + "different values for max_poll_interval_ms " + "and session_timeout_ms" + ) self._client = client - self.group_id = self.config['group_id'] + self.group_id = self.config["group_id"] self.heartbeat = Heartbeat(**self.config) self._heartbeat_thread = None self._lock = threading.Condition() @@ -133,8 +141,9 @@ def __init__(self, client, metrics, **configs): self.coordinator_id = None self._find_coordinator_future = None self._generation = Generation.NO_GENERATION - self.sensors = GroupCoordinatorMetrics(self.heartbeat, metrics, - self.config['metric_group_prefix']) + self.sensors = GroupCoordinatorMetrics( + self.heartbeat, metrics, self.config["metric_group_prefix"] + ) @abc.abstractmethod def protocol_type(self): @@ -201,8 +210,9 @@ def _perform_assignment(self, leader_id, protocol, members): pass @abc.abstractmethod - def _on_join_complete(self, generation, member_id, protocol, - member_assignment_bytes): + def _on_join_complete( + self, generation, member_id, protocol, member_assignment_bytes + ): """Invoked when a group member has successfully joined a group.
Arguments: @@ -233,7 +243,7 @@ def coordinator(self): if self.coordinator_id is None: return None elif self._client.is_disconnected(self.coordinator_id): - self.coordinator_dead('Node Disconnected') + self.coordinator_dead("Node Disconnected") return None else: return self.coordinator_id @@ -248,7 +258,7 @@ def ensure_coordinator_ready(self): # Prior to 0.8.2 there was no group coordinator # so we will just pick a node at random and treat # it as the "coordinator" - if self.config['api_version'] < (0, 8, 2): + if self.config["api_version"] < (0, 8, 2): self.coordinator_id = self._client.least_loaded_node() if self.coordinator_id is not None: self._client.maybe_connect(self.coordinator_id) @@ -259,12 +269,15 @@ def ensure_coordinator_ready(self): if future.failed(): if future.retriable(): - if getattr(future.exception, 'invalid_metadata', False): - log.debug('Requesting metadata for group coordinator request: %s', future.exception) + if getattr(future.exception, "invalid_metadata", False): + log.debug( + "Requesting metadata for group coordinator request: %s", + future.exception, + ) metadata_update = self._client.cluster.request_update() self._client.poll(future=metadata_update) else: - time.sleep(self.config['retry_backoff_ms'] / 1000) + time.sleep(self.config["retry_backoff_ms"] / 1000) else: raise future.exception # pylint: disable-msg=raising-bad-type @@ -327,13 +340,16 @@ def time_to_next_heartbeat(self): with self._lock: # if we have not joined the group, we don't need to send heartbeats if self.state is MemberState.UNJOINED: - return float('inf') + return float("inf") return self.heartbeat.time_to_next_heartbeat() def _handle_join_success(self, member_assignment_bytes): with self._lock: - log.info("Successfully joined group %s with generation %s", - self.group_id, self._generation.generation_id) + log.info( + "Successfully joined group %s with generation %s", + self.group_id, + self._generation.generation_id, + ) self.state = MemberState.STABLE self.rejoin_needed = False if self._heartbeat_thread: @@ -361,8 +377,9 @@ def ensure_active_group(self): # changes the matched subscription set) can occur # while another rebalance is still in progress. if not self.rejoining: - self._on_join_prepare(self._generation.generation_id, - self._generation.member_id) + self._on_join_prepare( + self._generation.generation_id, self._generation.member_id + ) self.rejoining = True # ensure that there are no pending requests to the coordinator. 
@@ -389,7 +406,9 @@ def ensure_active_group(self): self.state = MemberState.REBALANCING future = self._send_join_group_request() - self.join_future = future # this should happen before adding callbacks + self.join_future = ( + future # this should happen before adding callbacks + ) # handle join completion in the callback so that the # callback will be invoked even if the consumer is woken up @@ -407,23 +426,30 @@ def ensure_active_group(self): self._client.poll(future=future) if future.succeeded(): - self._on_join_complete(self._generation.generation_id, - self._generation.member_id, - self._generation.protocol, - future.value) + self._on_join_complete( + self._generation.generation_id, + self._generation.member_id, + self._generation.protocol, + future.value, + ) self.join_future = None self.rejoining = False else: self.join_future = None exception = future.exception - if isinstance(exception, (Errors.UnknownMemberIdError, - Errors.RebalanceInProgressError, - Errors.IllegalGenerationError)): + if isinstance( + exception, + ( + Errors.UnknownMemberIdError, + Errors.RebalanceInProgressError, + Errors.IllegalGenerationError, + ), + ): continue elif not future.retriable(): raise exception # pylint: disable-msg=raising-bad-type - time.sleep(self.config['retry_backoff_ms'] / 1000) + time.sleep(self.config["retry_backoff_ms"] / 1000) def _rejoin_incomplete(self): return self.join_future is not None @@ -452,59 +478,75 @@ def _send_join_group_request(self): (protocol, metadata if isinstance(metadata, bytes) else metadata.encode()) for protocol, metadata in self.group_protocols() ] - if self.config['api_version'] < (0, 9): - raise Errors.KafkaError('JoinGroupRequest api requires 0.9+ brokers') - elif (0, 9) <= self.config['api_version'] < (0, 10, 1): + if self.config["api_version"] < (0, 9): + raise Errors.KafkaError("JoinGroupRequest api requires 0.9+ brokers") + elif (0, 9) <= self.config["api_version"] < (0, 10, 1): request = JoinGroupRequest[0]( self.group_id, - self.config['session_timeout_ms'], + self.config["session_timeout_ms"], self._generation.member_id, self.protocol_type(), - member_metadata) - elif (0, 10, 1) <= self.config['api_version'] < (0, 11, 0): + member_metadata, + ) + elif (0, 10, 1) <= self.config["api_version"] < (0, 11, 0): request = JoinGroupRequest[1]( self.group_id, - self.config['session_timeout_ms'], - self.config['max_poll_interval_ms'], + self.config["session_timeout_ms"], + self.config["max_poll_interval_ms"], self._generation.member_id, self.protocol_type(), - member_metadata) + member_metadata, + ) else: request = JoinGroupRequest[2]( self.group_id, - self.config['session_timeout_ms'], - self.config['max_poll_interval_ms'], + self.config["session_timeout_ms"], + self.config["max_poll_interval_ms"], self._generation.member_id, self.protocol_type(), - member_metadata) + member_metadata, + ) # create the request for the coordinator - log.debug("Sending JoinGroup (%s) to coordinator %s", request, self.coordinator_id) + log.debug( + "Sending JoinGroup (%s) to coordinator %s", request, self.coordinator_id + ) future = Future() _f = self._client.send(self.coordinator_id, request) _f.add_callback(self._handle_join_group_response, future, time.time()) - _f.add_errback(self._failed_request, self.coordinator_id, - request, future) + _f.add_errback(self._failed_request, self.coordinator_id, request, future) return future def _failed_request(self, node_id, request, future, error): # Marking coordinator dead # unless the error is caused by internal client pipelining - if not 
isinstance(error, (Errors.NodeNotReadyError, - Errors.TooManyInFlightRequests)): - log.error('Error sending %s to node %s [%s]', - request.__class__.__name__, node_id, error) + if not isinstance( + error, (Errors.NodeNotReadyError, Errors.TooManyInFlightRequests) + ): + log.error( + "Error sending %s to node %s [%s]", + request.__class__.__name__, + node_id, + error, + ) self.coordinator_dead(error) else: - log.debug('Error sending %s to node %s [%s]', - request.__class__.__name__, node_id, error) + log.debug( + "Error sending %s to node %s [%s]", + request.__class__.__name__, + node_id, + error, + ) future.failure(error) def _handle_join_group_response(self, future, send_time, response): error_type = Errors.for_code(response.error_code) if error_type is Errors.NoError: - log.debug("Received successful JoinGroup response for group %s: %s", - self.group_id, response) + log.debug( + "Received successful JoinGroup response for group %s: %s", + self.group_id, + response, + ) self.sensors.join_latency.record((time.time() - send_time) * 1000) with self._lock: if self.state is not MemberState.REBALANCING: @@ -513,44 +555,65 @@ def _handle_join_group_response(self, future, send_time, response): # not want to continue with the sync group. future.failure(UnjoinedGroupException()) else: - self._generation = Generation(response.generation_id, - response.member_id, - response.group_protocol) + self._generation = Generation( + response.generation_id, + response.member_id, + response.group_protocol, + ) if response.leader_id == response.member_id: - log.info("Elected group leader -- performing partition" - " assignments using %s", self._generation.protocol) + log.info( + "Elected group leader -- performing partition" + " assignments using %s", + self._generation.protocol, + ) self._on_join_leader(response).chain(future) else: self._on_join_follower().chain(future) elif error_type is Errors.GroupLoadInProgressError: - log.debug("Attempt to join group %s rejected since coordinator %s" - " is loading the group.", self.group_id, self.coordinator_id) + log.debug( + "Attempt to join group %s rejected since coordinator %s" + " is loading the group.", + self.group_id, + self.coordinator_id, + ) # backoff and retry future.failure(error_type(response)) elif error_type is Errors.UnknownMemberIdError: # reset the member id and retry immediately error = error_type(self._generation.member_id) self.reset_generation() - log.debug("Attempt to join group %s failed due to unknown member id", - self.group_id) + log.debug( + "Attempt to join group %s failed due to unknown member id", + self.group_id, + ) future.failure(error) - elif error_type in (Errors.GroupCoordinatorNotAvailableError, - Errors.NotCoordinatorForGroupError): + elif error_type in ( + Errors.GroupCoordinatorNotAvailableError, + Errors.NotCoordinatorForGroupError, + ): # re-discover the coordinator and retry with backoff self.coordinator_dead(error_type()) - log.debug("Attempt to join group %s failed due to obsolete " - "coordinator information: %s", self.group_id, - error_type.__name__) + log.debug( + "Attempt to join group %s failed due to obsolete " + "coordinator information: %s", + self.group_id, + error_type.__name__, + ) future.failure(error_type()) - elif error_type in (Errors.InconsistentGroupProtocolError, - Errors.InvalidSessionTimeoutError, - Errors.InvalidGroupIdError): + elif error_type in ( + Errors.InconsistentGroupProtocolError, + Errors.InvalidSessionTimeoutError, + Errors.InvalidGroupIdError, + ): # log the error and re-throw the exception 
error = error_type(response) - log.error("Attempt to join group %s failed due to fatal error: %s", - self.group_id, error) + log.error( + "Attempt to join group %s failed due to fatal error: %s", + self.group_id, + error, + ) future.failure(error) elif error_type is Errors.GroupAuthorizationFailedError: future.failure(error_type(self.group_id)) @@ -562,14 +625,19 @@ def _handle_join_group_response(self, future, send_time, response): def _on_join_follower(self): # send follower's sync group with an empty assignment - version = 0 if self.config['api_version'] < (0, 11, 0) else 1 + version = 0 if self.config["api_version"] < (0, 11, 0) else 1 request = SyncGroupRequest[version]( self.group_id, self._generation.generation_id, self._generation.member_id, - {}) - log.debug("Sending follower SyncGroup for group %s to coordinator %s: %s", - self.group_id, self.coordinator_id, request) + {}, + ) + log.debug( + "Sending follower SyncGroup for group %s to coordinator %s: %s", + self.group_id, + self.coordinator_id, + request, + ) return self._send_sync_group_request(request) def _on_join_leader(self, response): @@ -584,23 +652,34 @@ def _on_join_leader(self, response): Future: resolves to member assignment encoded-bytes """ try: - group_assignment = self._perform_assignment(response.leader_id, - response.group_protocol, - response.members) + group_assignment = self._perform_assignment( + response.leader_id, response.group_protocol, response.members + ) except Exception as e: return Future().failure(e) - version = 0 if self.config['api_version'] < (0, 11, 0) else 1 + version = 0 if self.config["api_version"] < (0, 11, 0) else 1 request = SyncGroupRequest[version]( self.group_id, self._generation.generation_id, self._generation.member_id, - [(member_id, - assignment if isinstance(assignment, bytes) else assignment.encode()) - for member_id, assignment in six.iteritems(group_assignment)]) - - log.debug("Sending leader SyncGroup for group %s to coordinator %s: %s", - self.group_id, self.coordinator_id, request) + [ + ( + member_id, + assignment + if isinstance(assignment, bytes) + else assignment.encode(), + ) + for member_id, assignment in group_assignment.items() + ], + ) + + log.debug( + "Sending leader SyncGroup for group %s to coordinator %s: %s", + self.group_id, + self.coordinator_id, + request, + ) return self._send_sync_group_request(request) def _send_sync_group_request(self, request): @@ -617,8 +696,7 @@ def _send_sync_group_request(self, request): future = Future() _f = self._client.send(self.coordinator_id, request) _f.add_callback(self._handle_sync_group_response, future, time.time()) - _f.add_errback(self._failed_request, self.coordinator_id, - request, future) + _f.add_errback(self._failed_request, self.coordinator_id, request, future) return future def _handle_sync_group_response(self, future, send_time, response): @@ -633,17 +711,20 @@ def _handle_sync_group_response(self, future, send_time, response): if error_type is Errors.GroupAuthorizationFailedError: future.failure(error_type(self.group_id)) elif error_type is Errors.RebalanceInProgressError: - log.debug("SyncGroup for group %s failed due to coordinator" - " rebalance", self.group_id) + log.debug( + "SyncGroup for group %s failed due to coordinator" " rebalance", + self.group_id, + ) future.failure(error_type(self.group_id)) - elif error_type in (Errors.UnknownMemberIdError, - Errors.IllegalGenerationError): + elif error_type in (Errors.UnknownMemberIdError, Errors.IllegalGenerationError): error = error_type() log.debug("SyncGroup 
for group %s failed due to %s", self.group_id, error) self.reset_generation() future.failure(error) - elif error_type in (Errors.GroupCoordinatorNotAvailableError, - Errors.NotCoordinatorForGroupError): + elif error_type in ( + Errors.GroupCoordinatorNotAvailableError, + Errors.NotCoordinatorForGroupError, + ): error = error_type() log.debug("SyncGroup for group %s failed due to %s", self.group_id, error) self.coordinator_dead(error) @@ -667,8 +748,11 @@ def _send_group_coordinator_request(self): e = Errors.NodeNotReadyError(node_id) return Future().failure(e) - log.debug("Sending group coordinator request for group %s to broker %s", - self.group_id, node_id) + log.debug( + "Sending group coordinator request for group %s to broker %s", + self.group_id, + node_id, + ) request = GroupCoordinatorRequest[0](self.group_id) future = Future() _f = self._client.send(node_id, request) @@ -682,7 +766,9 @@ def _handle_group_coordinator_response(self, future, response): error_type = Errors.for_code(response.error_code) if error_type is Errors.NoError: with self._lock: - coordinator_id = self._client.cluster.add_group_coordinator(self.group_id, response) + coordinator_id = self._client.cluster.add_group_coordinator( + self.group_id, response + ) if not coordinator_id: # This could happen if coordinator metadata is different # than broker metadata @@ -690,8 +776,11 @@ def _handle_group_coordinator_response(self, future, response): return self.coordinator_id = coordinator_id - log.info("Discovered coordinator %s for group %s", - self.coordinator_id, self.group_id) + log.info( + "Discovered coordinator %s for group %s", + self.coordinator_id, + self.group_id, + ) self._client.maybe_connect(self.coordinator_id) self.heartbeat.reset_timeouts() future.success(self.coordinator_id) @@ -705,15 +794,20 @@ def _handle_group_coordinator_response(self, future, response): future.failure(error) else: error = error_type() - log.error("Group coordinator lookup for group %s failed: %s", - self.group_id, error) + log.error( + "Group coordinator lookup for group %s failed: %s", self.group_id, error + ) future.failure(error) def coordinator_dead(self, error): """Mark the current coordinator as dead.""" if self.coordinator_id is not None: - log.warning("Marking the coordinator dead (node %s) for group %s: %s.", - self.coordinator_id, self.group_id, error) + log.warning( + "Marking the coordinator dead (node %s) for group %s: %s.", + self.coordinator_id, + self.group_id, + error, + ) self.coordinator_id = None def generation(self): @@ -738,14 +832,14 @@ def request_rejoin(self): def _start_heartbeat_thread(self): if self._heartbeat_thread is None: - log.info('Starting new heartbeat thread') + log.info("Starting new heartbeat thread") self._heartbeat_thread = HeartbeatThread(weakref.proxy(self)) self._heartbeat_thread.daemon = True self._heartbeat_thread.start() def _close_heartbeat_thread(self): if self._heartbeat_thread is not None: - log.info('Stopping heartbeat thread') + log.info("Stopping heartbeat thread") try: self._heartbeat_thread.close() except ReferenceError: @@ -764,15 +858,19 @@ def close(self): def maybe_leave_group(self): """Leave the current group and reset local generation/memberId.""" with self._client._lock, self._lock: - if (not self.coordinator_unknown() + if ( + not self.coordinator_unknown() and self.state is not MemberState.UNJOINED - and self._generation is not Generation.NO_GENERATION): + and self._generation is not Generation.NO_GENERATION + ): # this is a minimal effort attempt to leave the group. 
we do not # attempt any resending if the request fails or times out. - log.info('Leaving consumer group (%s).', self.group_id) - version = 0 if self.config['api_version'] < (0, 11, 0) else 1 - request = LeaveGroupRequest[version](self.group_id, self._generation.member_id) + log.info("Leaving consumer group (%s).", self.group_id) + version = 0 if self.config["api_version"] < (0, 11, 0) else 1 + request = LeaveGroupRequest[version]( + self.group_id, self._generation.member_id + ) future = self._client.send(self.coordinator_id, request) future.add_callback(self._handle_leave_group_response) future.add_errback(log.error, "LeaveGroup request failed: %s") @@ -783,11 +881,15 @@ def maybe_leave_group(self): def _handle_leave_group_response(self, response): error_type = Errors.for_code(response.error_code) if error_type is Errors.NoError: - log.debug("LeaveGroup request for group %s returned successfully", - self.group_id) + log.debug( + "LeaveGroup request for group %s returned successfully", self.group_id + ) else: - log.error("LeaveGroup request for group %s failed with error: %s", - self.group_id, error_type()) + log.error( + "LeaveGroup request for group %s failed with error: %s", + self.group_id, + error_type(), + ) def _send_heartbeat_request(self): """Send a heartbeat request""" @@ -799,45 +901,61 @@ def _send_heartbeat_request(self): e = Errors.NodeNotReadyError(self.coordinator_id) return Future().failure(e) - version = 0 if self.config['api_version'] < (0, 11, 0) else 1 - request = HeartbeatRequest[version](self.group_id, - self._generation.generation_id, - self._generation.member_id) - log.debug("Heartbeat: %s[%s] %s", request.group, request.generation_id, request.member_id) # pylint: disable-msg=no-member + version = 0 if self.config["api_version"] < (0, 11, 0) else 1 + request = HeartbeatRequest[version]( + self.group_id, self._generation.generation_id, self._generation.member_id + ) + log.debug( + "Heartbeat: %s[%s] %s", + request.group, + request.generation_id, + request.member_id, + ) # pylint: disable-msg=no-member future = Future() _f = self._client.send(self.coordinator_id, request) _f.add_callback(self._handle_heartbeat_response, future, time.time()) - _f.add_errback(self._failed_request, self.coordinator_id, - request, future) + _f.add_errback(self._failed_request, self.coordinator_id, request, future) return future def _handle_heartbeat_response(self, future, send_time, response): self.sensors.heartbeat_latency.record((time.time() - send_time) * 1000) error_type = Errors.for_code(response.error_code) if error_type is Errors.NoError: - log.debug("Received successful heartbeat response for group %s", - self.group_id) + log.debug( + "Received successful heartbeat response for group %s", self.group_id + ) future.success(None) - elif error_type in (Errors.GroupCoordinatorNotAvailableError, - Errors.NotCoordinatorForGroupError): - log.warning("Heartbeat failed for group %s: coordinator (node %s)" - " is either not started or not valid", self.group_id, - self.coordinator()) + elif error_type in ( + Errors.GroupCoordinatorNotAvailableError, + Errors.NotCoordinatorForGroupError, + ): + log.warning( + "Heartbeat failed for group %s: coordinator (node %s)" + " is either not started or not valid", + self.group_id, + self.coordinator(), + ) self.coordinator_dead(error_type()) future.failure(error_type()) elif error_type is Errors.RebalanceInProgressError: - log.warning("Heartbeat failed for group %s because it is" - " rebalancing", self.group_id) + log.warning( + "Heartbeat failed for 
group %s because it is" " rebalancing", + self.group_id, + ) self.request_rejoin() future.failure(error_type()) elif error_type is Errors.IllegalGenerationError: - log.warning("Heartbeat failed for group %s: generation id is not " - " current.", self.group_id) + log.warning( + "Heartbeat failed for group %s: generation id is not " " current.", + self.group_id, + ) self.reset_generation() future.failure(error_type()) elif error_type is Errors.UnknownMemberIdError: - log.warning("Heartbeat: local member_id was not recognized;" - " this consumer needs to re-join") + log.warning( + "Heartbeat: local member_id was not recognized;" + " this consumer needs to re-join" + ) self.reset_generation() future.failure(error_type) elif error_type is Errors.GroupAuthorizationFailedError: @@ -856,55 +974,99 @@ def __init__(self, heartbeat, metrics, prefix, tags=None): self.metrics = metrics self.metric_group_name = prefix + "-coordinator-metrics" - self.heartbeat_latency = metrics.sensor('heartbeat-latency') - self.heartbeat_latency.add(metrics.metric_name( - 'heartbeat-response-time-max', self.metric_group_name, - 'The max time taken to receive a response to a heartbeat request', - tags), Max()) - self.heartbeat_latency.add(metrics.metric_name( - 'heartbeat-rate', self.metric_group_name, - 'The average number of heartbeats per second', - tags), Rate(sampled_stat=Count())) - - self.join_latency = metrics.sensor('join-latency') - self.join_latency.add(metrics.metric_name( - 'join-time-avg', self.metric_group_name, - 'The average time taken for a group rejoin', - tags), Avg()) - self.join_latency.add(metrics.metric_name( - 'join-time-max', self.metric_group_name, - 'The max time taken for a group rejoin', - tags), Max()) - self.join_latency.add(metrics.metric_name( - 'join-rate', self.metric_group_name, - 'The number of group joins per second', - tags), Rate(sampled_stat=Count())) - - self.sync_latency = metrics.sensor('sync-latency') - self.sync_latency.add(metrics.metric_name( - 'sync-time-avg', self.metric_group_name, - 'The average time taken for a group sync', - tags), Avg()) - self.sync_latency.add(metrics.metric_name( - 'sync-time-max', self.metric_group_name, - 'The max time taken for a group sync', - tags), Max()) - self.sync_latency.add(metrics.metric_name( - 'sync-rate', self.metric_group_name, - 'The number of group syncs per second', - tags), Rate(sampled_stat=Count())) - - metrics.add_metric(metrics.metric_name( - 'last-heartbeat-seconds-ago', self.metric_group_name, - 'The number of seconds since the last controller heartbeat was sent', - tags), AnonMeasurable( - lambda _, now: (now / 1000) - self.heartbeat.last_send)) + self.heartbeat_latency = metrics.sensor("heartbeat-latency") + self.heartbeat_latency.add( + metrics.metric_name( + "heartbeat-response-time-max", + self.metric_group_name, + "The max time taken to receive a response to a heartbeat request", + tags, + ), + Max(), + ) + self.heartbeat_latency.add( + metrics.metric_name( + "heartbeat-rate", + self.metric_group_name, + "The average number of heartbeats per second", + tags, + ), + Rate(sampled_stat=Count()), + ) + + self.join_latency = metrics.sensor("join-latency") + self.join_latency.add( + metrics.metric_name( + "join-time-avg", + self.metric_group_name, + "The average time taken for a group rejoin", + tags, + ), + Avg(), + ) + self.join_latency.add( + metrics.metric_name( + "join-time-max", + self.metric_group_name, + "The max time taken for a group rejoin", + tags, + ), + Max(), + ) + self.join_latency.add( + 
metrics.metric_name( + "join-rate", + self.metric_group_name, + "The number of group joins per second", + tags, + ), + Rate(sampled_stat=Count()), + ) + + self.sync_latency = metrics.sensor("sync-latency") + self.sync_latency.add( + metrics.metric_name( + "sync-time-avg", + self.metric_group_name, + "The average time taken for a group sync", + tags, + ), + Avg(), + ) + self.sync_latency.add( + metrics.metric_name( + "sync-time-max", + self.metric_group_name, + "The max time taken for a group sync", + tags, + ), + Max(), + ) + self.sync_latency.add( + metrics.metric_name( + "sync-rate", + self.metric_group_name, + "The number of group syncs per second", + tags, + ), + Rate(sampled_stat=Count()), + ) + + metrics.add_metric( + metrics.metric_name( + "last-heartbeat-seconds-ago", + self.metric_group_name, + "The number of seconds since the last controller heartbeat was sent", + tags, + ), + AnonMeasurable(lambda _, now: (now / 1000) - self.heartbeat.last_send), + ) class HeartbeatThread(threading.Thread): def __init__(self, coordinator): super(HeartbeatThread, self).__init__() - self.name = coordinator.group_id + '-heartbeat' + self.name = coordinator.group_id + "-heartbeat" self.coordinator = coordinator self.enabled = False self.closed = False @@ -924,26 +1086,29 @@ def close(self): with self.coordinator._lock: self.coordinator._lock.notify() if self.is_alive(): - self.join(self.coordinator.config['heartbeat_interval_ms'] / 1000) + self.join(self.coordinator.config["heartbeat_interval_ms"] / 1000) if self.is_alive(): log.warning("Heartbeat thread did not fully terminate during close") def run(self): try: - log.debug('Heartbeat thread started') + log.debug("Heartbeat thread started") while not self.closed: self._run_once() except ReferenceError: - log.debug('Heartbeat thread closed due to coordinator gc') + log.debug("Heartbeat thread closed due to coordinator gc") except RuntimeError as e: - log.error("Heartbeat thread for group %s failed due to unexpected error: %s", - self.coordinator.group_id, e) + log.error( + "Heartbeat thread for group %s failed due to unexpected error: %s", + self.coordinator.group_id, + e, + ) self.failed = e finally: - log.debug('Heartbeat thread closed') + log.debug("Heartbeat thread closed") def _run_once(self): with self.coordinator._client._lock, self.coordinator._lock: @@ -951,22 +1116,22 @@ def _run_once(self): # TODO: When consumer.wakeup() is implemented, we need to # disable here to prevent propagating an exception to this # heartbeat thread - # must get client._lock, or maybe deadlock at heartbeat + # must get client._lock, or maybe deadlock at heartbeat # failure callback in consumer poll self.coordinator._client.poll(timeout_ms=0) with self.coordinator._lock: if not self.enabled: - log.debug('Heartbeat disabled. Waiting') + log.debug("Heartbeat disabled. Waiting") self.coordinator._lock.wait() - log.debug('Heartbeat re-enabled.') + log.debug("Heartbeat re-enabled.") return if self.coordinator.state is not MemberState.STABLE: # the group is not stable (perhaps because we left the # group or because the coordinator kicked us out), so # disable heartbeats and wait for the main thread to rejoin. - log.debug('Group state is not stable, disabling heartbeats') + log.debug("Group state is not stable, disabling heartbeats") self.disable() return @@ -976,27 +1141,31 @@ def _run_once(self): # the immediate future check ensures that we backoff # properly in the case that no brokers are available # to connect to (and the future is automatically failed). 
- self.coordinator._lock.wait(self.coordinator.config['retry_backoff_ms'] / 1000) + self.coordinator._lock.wait( + self.coordinator.config["retry_backoff_ms"] / 1000 + ) elif self.coordinator.heartbeat.session_timeout_expired(): # the session timeout has expired without seeing a # successful heartbeat, so we should probably make sure # the coordinator is still healthy. - log.warning('Heartbeat session expired, marking coordinator dead') - self.coordinator.coordinator_dead('Heartbeat session expired') + log.warning("Heartbeat session expired, marking coordinator dead") + self.coordinator.coordinator_dead("Heartbeat session expired") elif self.coordinator.heartbeat.poll_timeout_expired(): # the poll timeout has expired, which means that the # foreground thread has stalled in between calls to # poll(), so we explicitly leave the group. - log.warning('Heartbeat poll expired, leaving group') + log.warning("Heartbeat poll expired, leaving group") self.coordinator.maybe_leave_group() elif not self.coordinator.heartbeat.should_heartbeat(): # poll again after waiting for the retry backoff in case # the heartbeat failed or the coordinator disconnected - log.log(0, 'Not ready to heartbeat, waiting') - self.coordinator._lock.wait(self.coordinator.config['retry_backoff_ms'] / 1000) + log.log(0, "Not ready to heartbeat, waiting") + self.coordinator._lock.wait( + self.coordinator.config["retry_backoff_ms"] / 1000 + ) else: self.coordinator.heartbeat.sent_heartbeat() diff --git a/kafka/coordinator/consumer.py b/aiokafka/coordinator/consumer.py similarity index 62% rename from kafka/coordinator/consumer.py rename to aiokafka/coordinator/consumer.py index 6f0de2db..d6ce1920 100644 --- a/kafka/coordinator/consumer.py +++ b/aiokafka/coordinator/consumer.py @@ -6,14 +6,6 @@ import logging import time -from kafka.vendor import six - -from kafka.coordinator.base import BaseCoordinator, Generation -from kafka.coordinator.assignors.range import RangePartitionAssignor -from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor -from kafka.coordinator.assignors.sticky.sticky_assignor import StickyPartitionAssignor -from kafka.coordinator.protocol import ConsumerProtocol -import aiokafka.errors as Errors from kafka.future import Future from kafka.metrics import AnonMeasurable from kafka.metrics.stats import Avg, Count, Max, Rate @@ -21,25 +13,38 @@ from kafka.structs import OffsetAndMetadata, TopicPartition from kafka.util import WeakMethod +import aiokafka.errors as Errors + +from .base import BaseCoordinator, Generation +from .assignors.range import RangePartitionAssignor +from .assignors.roundrobin import RoundRobinPartitionAssignor +from .assignors.sticky.sticky_assignor import StickyPartitionAssignor +from .protocol import ConsumerProtocol + log = logging.getLogger(__name__) class ConsumerCoordinator(BaseCoordinator): """This class manages the coordination process with the consumer coordinator.""" + DEFAULT_CONFIG = { - 'group_id': 'kafka-python-default-group', - 'enable_auto_commit': True, - 'auto_commit_interval_ms': 5000, - 'default_offset_commit_callback': None, - 'assignors': (RangePartitionAssignor, RoundRobinPartitionAssignor, StickyPartitionAssignor), - 'session_timeout_ms': 10000, - 'heartbeat_interval_ms': 3000, - 'max_poll_interval_ms': 300000, - 'retry_backoff_ms': 100, - 'api_version': (0, 10, 1), - 'exclude_internal_topics': True, - 'metric_group_prefix': 'consumer' + "group_id": "kafka-python-default-group", + "enable_auto_commit": True, + "auto_commit_interval_ms": 5000, + 
"default_offset_commit_callback": None, + "assignors": ( + RangePartitionAssignor, + RoundRobinPartitionAssignor, + StickyPartitionAssignor, + ), + "session_timeout_ms": 10000, + "heartbeat_interval_ms": 3000, + "max_poll_interval_ms": 300000, + "retry_backoff_ms": 100, + "api_version": (0, 10, 1), + "exclude_internal_topics": True, + "metric_group_prefix": "consumer", } def __init__(self, client, subscription, metrics, **configs): @@ -88,46 +93,60 @@ def __init__(self, client, subscription, metrics, **configs): self._subscription = subscription self._is_leader = False self._joined_subscription = set() - self._metadata_snapshot = self._build_metadata_snapshot(subscription, client.cluster) + self._metadata_snapshot = self._build_metadata_snapshot( + subscription, client.cluster + ) self._assignment_snapshot = None self._cluster = client.cluster - self.auto_commit_interval = self.config['auto_commit_interval_ms'] / 1000 + self.auto_commit_interval = self.config["auto_commit_interval_ms"] / 1000 self.next_auto_commit_deadline = None self.completed_offset_commits = collections.deque() - if self.config['default_offset_commit_callback'] is None: - self.config['default_offset_commit_callback'] = self._default_offset_commit_callback - - if self.config['group_id'] is not None: - if self.config['api_version'] >= (0, 9): - if not self.config['assignors']: - raise Errors.KafkaConfigurationError('Coordinator requires assignors') - if self.config['api_version'] < (0, 10, 1): - if self.config['max_poll_interval_ms'] != self.config['session_timeout_ms']: - raise Errors.KafkaConfigurationError("Broker version %s does not support " - "different values for max_poll_interval_ms " - "and session_timeout_ms") - - if self.config['enable_auto_commit']: - if self.config['api_version'] < (0, 8, 1): - log.warning('Broker version (%s) does not support offset' - ' commits; disabling auto-commit.', - self.config['api_version']) - self.config['enable_auto_commit'] = False - elif self.config['group_id'] is None: - log.warning('group_id is None: disabling auto-commit.') - self.config['enable_auto_commit'] = False + if self.config["default_offset_commit_callback"] is None: + self.config[ + "default_offset_commit_callback" + ] = self._default_offset_commit_callback + + if self.config["group_id"] is not None: + if self.config["api_version"] >= (0, 9): + if not self.config["assignors"]: + raise Errors.KafkaConfigurationError( + "Coordinator requires assignors" + ) + if self.config["api_version"] < (0, 10, 1): + if ( + self.config["max_poll_interval_ms"] + != self.config["session_timeout_ms"] + ): + raise Errors.KafkaConfigurationError( + "Broker version %s does not support " + "different values for max_poll_interval_ms " + "and session_timeout_ms" + ) + + if self.config["enable_auto_commit"]: + if self.config["api_version"] < (0, 8, 1): + log.warning( + "Broker version (%s) does not support offset" + " commits; disabling auto-commit.", + self.config["api_version"], + ) + self.config["enable_auto_commit"] = False + elif self.config["group_id"] is None: + log.warning("group_id is None: disabling auto-commit.") + self.config["enable_auto_commit"] = False else: self.next_auto_commit_deadline = time.time() + self.auto_commit_interval self.consumer_sensors = ConsumerCoordinatorMetrics( - metrics, self.config['metric_group_prefix'], self._subscription) + metrics, self.config["metric_group_prefix"], self._subscription + ) self._cluster.request_update() self._cluster.add_listener(WeakMethod(self._handle_metadata_update)) def 
__del__(self): - if hasattr(self, '_cluster') and self._cluster: + if hasattr(self, "_cluster") and self._cluster: self._cluster.remove_listener(WeakMethod(self._handle_metadata_update)) super(ConsumerCoordinator, self).__del__() @@ -137,20 +156,20 @@ def protocol_type(self): def group_protocols(self): """Returns list of preferred (protocols, metadata)""" if self._subscription.subscription is None: - raise Errors.IllegalStateError('Consumer has not subscribed to topics') + raise Errors.IllegalStateError("Consumer has not subscribed to topics") # dpkp note: I really dislike this. # why? because we are using this strange method group_protocols, # which is seemingly innocuous, to set internal state (_joined_subscription) - # that is later used to check whether metadata has changed since we joined a group - # but there is no guarantee that this method, group_protocols, will get called - # in the correct sequence or that it will only be called when we want it to be. - # So this really should be moved elsewhere, but I don't have the energy to - # work that out right now. If you read this at some later date after the mutable - # state has bitten you... I'm sorry! It mimics the java client, and that's the - # best I've got for now. + # that is later used to check whether metadata has changed since we joined a + # group but there is no guarantee that this method, group_protocols, will get + # called in the correct sequence or that it will only be called when we want it + # to be. So this really should be moved elsewhere, but I don't have the energy + # to work that out right now. If you read this at some later date after the + # mutable state has bitten you... I'm sorry! It mimics the java client, and + # that's the best I've got for now. self._joined_subscription = set(self._subscription.subscription) metadata_list = [] - for assignor in self.config['assignors']: + for assignor in self.config["assignors"]: metadata = assignor.metadata(self._joined_subscription) group_protocol = (assignor.name, metadata) metadata_list.append(group_protocol) @@ -163,7 +182,7 @@ def _handle_metadata_update(self, cluster): if self._subscription.subscribed_pattern: topics = [] - for topic in cluster.topics(self.config['exclude_internal_topics']): + for topic in cluster.topics(self.config["exclude_internal_topics"]): if self._subscription.subscribed_pattern.match(topic): topics.append(topic) @@ -174,25 +193,29 @@ def _handle_metadata_update(self, cluster): # check if there are any changes to the metadata which should trigger # a rebalance if self._subscription.partitions_auto_assigned(): - metadata_snapshot = self._build_metadata_snapshot(self._subscription, cluster) + metadata_snapshot = self._build_metadata_snapshot( + self._subscription, cluster + ) if self._metadata_snapshot != metadata_snapshot: self._metadata_snapshot = metadata_snapshot # If we haven't got group coordinator support, # just assign all partitions locally if self._auto_assign_all_partitions(): - self._subscription.assign_from_subscribed([ - TopicPartition(topic, partition) - for topic in self._subscription.subscription - for partition in self._metadata_snapshot[topic] - ]) + self._subscription.assign_from_subscribed( + [ + TopicPartition(topic, partition) + for topic in self._subscription.subscription + for partition in self._metadata_snapshot[topic] + ] + ) def _auto_assign_all_partitions(self): # For users that use "subscribe" without group support, # we will simply assign all partitions to this consumer - if self.config['api_version'] < (0, 9): + 
if self.config["api_version"] < (0, 9): return True - elif self.config['group_id'] is None: + elif self.config["group_id"] is None: return True else: return False @@ -205,20 +228,23 @@ def _build_metadata_snapshot(self, subscription, cluster): return metadata_snapshot def _lookup_assignor(self, name): - for assignor in self.config['assignors']: + for assignor in self.config["assignors"]: if assignor.name == name: return assignor return None - def _on_join_complete(self, generation, member_id, protocol, - member_assignment_bytes): + def _on_join_complete( + self, generation, member_id, protocol, member_assignment_bytes + ): # only the leader is responsible for monitoring for metadata changes # (i.e. partition changes) if not self._is_leader: self._assignment_snapshot = None assignor = self._lookup_assignor(protocol) - assert assignor, 'Coordinator selected invalid assignment protocol: %s' % (protocol,) + assert assignor, "Coordinator selected invalid assignment protocol: %s" % ( + protocol, + ) assignment = ConsumerProtocol.ASSIGNMENT.decode(member_assignment_bytes) @@ -235,25 +261,29 @@ def _on_join_complete(self, generation, member_id, protocol, # give the assignor a chance to update internal state # based on the received assignment assignor.on_assignment(assignment) - if assignor.name == 'sticky': + if assignor.name == "sticky": assignor.on_generation_assignment(generation) # reschedule the auto commit starting from now self.next_auto_commit_deadline = time.time() + self.auto_commit_interval assigned = set(self._subscription.assigned_partitions()) - log.info("Setting newly assigned partitions %s for group %s", - assigned, self.group_id) + log.info( + "Setting newly assigned partitions %s for group %s", assigned, self.group_id + ) # execute the user's callback after rebalance if self._subscription.listener: try: self._subscription.listener.on_partitions_assigned(assigned) except Exception: - log.exception("User provided listener %s for group %s" - " failed on partition assignment: %s", - self._subscription.listener, self.group_id, - assigned) + log.exception( + "User provided listener %s for group %s" + " failed on partition assignment: %s", + self._subscription.listener, + self.group_id, + assigned, + ) def poll(self): """ @@ -269,7 +299,10 @@ def poll(self): self._invoke_completed_offset_commit_callbacks() self.ensure_coordinator_ready() - if self.config['api_version'] >= (0, 9) and self._subscription.partitions_auto_assigned(): + if ( + self.config["api_version"] >= (0, 9) + and self._subscription.partitions_auto_assigned() + ): if self.need_rejoin(): # due to a race condition between the initial metadata fetch and the # initial rebalance, we need to ensure that the metadata is fresh @@ -293,25 +326,30 @@ def poll(self): self._maybe_auto_commit_offsets_async() def time_to_next_poll(self): - """Return seconds (float) remaining until :meth:`.poll` should be called again""" - if not self.config['enable_auto_commit']: + """Return seconds (float) remaining until :meth:`.poll` should be called + again + """ + if not self.config["enable_auto_commit"]: return self.time_to_next_heartbeat() if time.time() > self.next_auto_commit_deadline: return 0 - return min(self.next_auto_commit_deadline - time.time(), - self.time_to_next_heartbeat()) + return min( + self.next_auto_commit_deadline - time.time(), self.time_to_next_heartbeat() + ) def _perform_assignment(self, leader_id, assignment_strategy, members): assignor = self._lookup_assignor(assignment_strategy) - assert assignor, 'Invalid assignment 
protocol: %s' % (assignment_strategy,) + assert assignor, "Invalid assignment protocol: %s" % (assignment_strategy,) member_metadata = {} all_subscribed_topics = set() for member_id, metadata_bytes in members: metadata = ConsumerProtocol.METADATA.decode(metadata_bytes) member_metadata[member_id] = metadata - all_subscribed_topics.update(metadata.subscription) # pylint: disable-msg=no-member + all_subscribed_topics.update( + metadata.subscription + ) # pylint: disable-msg=no-member # the leader will begin watching for changes to any of the topics # the group is interested in, which ensures that all metadata changes @@ -327,16 +365,20 @@ def _perform_assignment(self, leader_id, assignment_strategy, members): self._is_leader = True self._assignment_snapshot = self._metadata_snapshot - log.debug("Performing assignment for group %s using strategy %s" - " with subscriptions %s", self.group_id, assignor.name, - member_metadata) + log.debug( + "Performing assignment for group %s using strategy %s" + " with subscriptions %s", + self.group_id, + assignor.name, + member_metadata, + ) assignments = assignor.assign(self._cluster, member_metadata) log.debug("Finished assignment for group %s: %s", self.group_id, assignments) group_assignment = {} - for member_id, assignment in six.iteritems(assignments): + for member_id, assignment in assignments.items(): group_assignment[member_id] = assignment return group_assignment @@ -345,16 +387,22 @@ def _on_join_prepare(self, generation, member_id): self._maybe_auto_commit_offsets_sync() # execute the user's callback before rebalance - log.info("Revoking previously assigned partitions %s for group %s", - self._subscription.assigned_partitions(), self.group_id) + log.info( + "Revoking previously assigned partitions %s for group %s", + self._subscription.assigned_partitions(), + self.group_id, + ) if self._subscription.listener: try: revoked = set(self._subscription.assigned_partitions()) self._subscription.listener.on_partitions_revoked(revoked) except Exception: - log.exception("User provided subscription listener %s" - " for group %s failed on_partitions_revoked", - self._subscription.listener, self.group_id) + log.exception( + "User provided subscription listener %s" + " for group %s failed on_partitions_revoked", + self._subscription.listener, + self.group_id, + ) self._is_leader = False self._subscription.reset_group_subscription() @@ -372,13 +420,17 @@ def need_rejoin(self): return False # we need to rejoin if we performed the assignment and metadata has changed - if (self._assignment_snapshot is not None - and self._assignment_snapshot != self._metadata_snapshot): + if ( + self._assignment_snapshot is not None + and self._assignment_snapshot != self._metadata_snapshot + ): return True # we need to join if our subscription has changed since the last join - if (self._joined_subscription is not None - and self._joined_subscription != self._subscription.subscription): + if ( + self._joined_subscription is not None + and self._joined_subscription != self._subscription.subscription + ): return True return super(ConsumerCoordinator, self).need_rejoin() @@ -386,8 +438,10 @@ def need_rejoin(self): def refresh_committed_offsets_if_needed(self): """Fetch committed offsets for assigned partitions.""" if self._subscription.needs_fetch_committed_offsets: - offsets = self.fetch_committed_offsets(self._subscription.assigned_partitions()) - for partition, offset in six.iteritems(offsets): + offsets = self.fetch_committed_offsets( + self._subscription.assigned_partitions() + 
) + for partition, offset in offsets.items(): # verify assignment is still active if self._subscription.is_assigned(partition): self._subscription.assignment[partition].committed = offset @@ -416,9 +470,9 @@ def fetch_committed_offsets(self, partitions): return future.value if not future.retriable(): - raise future.exception # pylint: disable-msg=raising-bad-type + raise future.exception # pylint: disable-msg=raising-bad-type - time.sleep(self.config['retry_backoff_ms'] / 1000) + time.sleep(self.config["retry_backoff_ms"] / 1000) def close(self, autocommit=True): """Close the coordinator, leave the current group, @@ -465,28 +519,39 @@ def commit_offsets_async(self, offsets, callback=None): # same order that they were added. Note also that BaseCoordinator # prevents multiple concurrent coordinator lookup requests. future = self.lookup_coordinator() - future.add_callback(lambda r: functools.partial(self._do_commit_offsets_async, offsets, callback)()) + future.add_callback( + lambda r: functools.partial( + self._do_commit_offsets_async, offsets, callback + )() + ) if callback: - future.add_errback(lambda e: self.completed_offset_commits.appendleft((callback, offsets, e))) + future.add_errback( + lambda e: self.completed_offset_commits.appendleft( + (callback, offsets, e) + ) + ) # ensure the commit has a chance to be transmitted (without blocking on # its completion). Note that commits are treated as heartbeats by the # coordinator, so there is no need to explicitly allow heartbeats # through delayed task execution. - self._client.poll(timeout_ms=0) # no wakeup if we add that feature + self._client.poll(timeout_ms=0) # no wakeup if we add that feature return future def _do_commit_offsets_async(self, offsets, callback=None): - assert self.config['api_version'] >= (0, 8, 1), 'Unsupported Broker API' + assert self.config["api_version"] >= (0, 8, 1), "Unsupported Broker API" assert all(map(lambda k: isinstance(k, TopicPartition), offsets)) - assert all(map(lambda v: isinstance(v, OffsetAndMetadata), - offsets.values())) + assert all(map(lambda v: isinstance(v, OffsetAndMetadata), offsets.values())) if callback is None: - callback = self.config['default_offset_commit_callback'] + callback = self.config["default_offset_commit_callback"] self._subscription.needs_fetch_committed_offsets = True future = self._send_offset_commit_request(offsets) - future.add_both(lambda res: self.completed_offset_commits.appendleft((callback, offsets, res))) + future.add_both( + lambda res: self.completed_offset_commits.appendleft( + (callback, offsets, res) + ) + ) return future def commit_offsets_sync(self, offsets): @@ -500,10 +565,9 @@ def commit_offsets_sync(self, offsets): Raises error on failure """ - assert self.config['api_version'] >= (0, 8, 1), 'Unsupported Broker API' + assert self.config["api_version"] >= (0, 8, 1), "Unsupported Broker API" assert all(map(lambda k: isinstance(k, TopicPartition), offsets)) - assert all(map(lambda v: isinstance(v, OffsetAndMetadata), - offsets.values())) + assert all(map(lambda v: isinstance(v, OffsetAndMetadata), offsets.values())) self._invoke_completed_offset_commit_callbacks() if not offsets: return @@ -518,26 +582,32 @@ def commit_offsets_sync(self, offsets): return future.value if not future.retriable(): - raise future.exception # pylint: disable-msg=raising-bad-type + raise future.exception # pylint: disable-msg=raising-bad-type - time.sleep(self.config['retry_backoff_ms'] / 1000) + time.sleep(self.config["retry_backoff_ms"] / 1000) def 
_maybe_auto_commit_offsets_sync(self): - if self.config['enable_auto_commit']: + if self.config["enable_auto_commit"]: try: self.commit_offsets_sync(self._subscription.all_consumed_offsets()) # The three main group membership errors are known and should not # require a stacktrace -- just a warning - except (Errors.UnknownMemberIdError, - Errors.IllegalGenerationError, - Errors.RebalanceInProgressError): - log.warning("Offset commit failed: group membership out of date" - " This is likely to cause duplicate message" - " delivery.") + except ( + Errors.UnknownMemberIdError, + Errors.IllegalGenerationError, + Errors.RebalanceInProgressError, + ): + log.warning( + "Offset commit failed: group membership out of date" + " This is likely to cause duplicate message" + " delivery." + ) except Exception: - log.exception("Offset commit failed: This is likely to cause" - " duplicate message delivery") + log.exception( + "Offset commit failed: This is likely to cause" + " duplicate message delivery" + ) def _send_offset_commit_request(self, offsets): """Commit offsets for the specified list of topics and partitions. @@ -553,22 +623,20 @@ def _send_offset_commit_request(self, offsets): Returns: Future: indicating whether the commit was successful or not """ - assert self.config['api_version'] >= (0, 8, 1), 'Unsupported Broker API' + assert self.config["api_version"] >= (0, 8, 1), "Unsupported Broker API" assert all(map(lambda k: isinstance(k, TopicPartition), offsets)) - assert all(map(lambda v: isinstance(v, OffsetAndMetadata), - offsets.values())) + assert all(map(lambda v: isinstance(v, OffsetAndMetadata), offsets.values())) if not offsets: - log.debug('No offsets to commit') + log.debug("No offsets to commit") return Future().success(None) node_id = self.coordinator() if node_id is None: return Future().failure(Errors.GroupCoordinatorNotAvailableError) - # create the offset commit request offset_data = collections.defaultdict(dict) - for tp, offset in six.iteritems(offsets): + for tp, offset in offsets.items(): offset_data[tp.topic][tp.partition] = offset if self._subscription.partitions_auto_assigned(): @@ -579,53 +647,69 @@ def _send_offset_commit_request(self, offsets): # if the generation is None, we are not part of an active group # (and we expect to be). 
The only thing we can do is fail the commit # and let the user rejoin the group in poll() - if self.config['api_version'] >= (0, 9) and generation is None: + if self.config["api_version"] >= (0, 9) and generation is None: return Future().failure(Errors.CommitFailedError()) - if self.config['api_version'] >= (0, 9): + if self.config["api_version"] >= (0, 9): request = OffsetCommitRequest[2]( self.group_id, generation.generation_id, generation.member_id, OffsetCommitRequest[2].DEFAULT_RETENTION_TIME, - [( - topic, [( - partition, - offset.offset, - offset.metadata - ) for partition, offset in six.iteritems(partitions)] - ) for topic, partitions in six.iteritems(offset_data)] + [ + ( + topic, + [ + (partition, offset.offset, offset.metadata) + for partition, offset in partitions.items() + ], + ) + for topic, partitions in offset_data.items() + ], ) - elif self.config['api_version'] >= (0, 8, 2): + elif self.config["api_version"] >= (0, 8, 2): request = OffsetCommitRequest[1]( - self.group_id, -1, '', - [( - topic, [( - partition, - offset.offset, - -1, - offset.metadata - ) for partition, offset in six.iteritems(partitions)] - ) for topic, partitions in six.iteritems(offset_data)] + self.group_id, + -1, + "", + [ + ( + topic, + [ + (partition, offset.offset, -1, offset.metadata) + for partition, offset in partitions.items() + ], + ) + for topic, partitions in offset_data.items() + ], ) - elif self.config['api_version'] >= (0, 8, 1): + elif self.config["api_version"] >= (0, 8, 1): request = OffsetCommitRequest[0]( self.group_id, - [( - topic, [( - partition, - offset.offset, - offset.metadata - ) for partition, offset in six.iteritems(partitions)] - ) for topic, partitions in six.iteritems(offset_data)] + [ + ( + topic, + [ + (partition, offset.offset, offset.metadata) + for partition, offset in partitions.items() + ], + ) + for topic, partitions in offset_data.items() + ], ) - log.debug("Sending offset-commit request with %s for group %s to %s", - offsets, self.group_id, node_id) + log.debug( + "Sending offset-commit request with %s for group %s to %s", + offsets, + self.group_id, + node_id, + ) future = Future() _f = self._client.send(node_id, request) - _f.add_callback(self._handle_offset_commit_response, offsets, future, time.time()) + _f.add_callback( + self._handle_offset_commit_response, offsets, future, time.time() + ) _f.add_errback(self._failed_request, node_id, request, future) return future @@ -641,58 +725,87 @@ def _handle_offset_commit_response(self, offsets, future, send_time, response): error_type = Errors.for_code(error_code) if error_type is Errors.NoError: - log.debug("Group %s committed offset %s for partition %s", - self.group_id, offset, tp) + log.debug( + "Group %s committed offset %s for partition %s", + self.group_id, + offset, + tp, + ) if self._subscription.is_assigned(tp): self._subscription.assignment[tp].committed = offset elif error_type is Errors.GroupAuthorizationFailedError: - log.error("Not authorized to commit offsets for group %s", - self.group_id) + log.error( + "Not authorized to commit offsets for group %s", self.group_id + ) future.failure(error_type(self.group_id)) return elif error_type is Errors.TopicAuthorizationFailedError: unauthorized_topics.add(topic) - elif error_type in (Errors.OffsetMetadataTooLargeError, - Errors.InvalidCommitOffsetSizeError): + elif error_type in ( + Errors.OffsetMetadataTooLargeError, + Errors.InvalidCommitOffsetSizeError, + ): # raise the error to the user - log.debug("OffsetCommit for group %s failed on partition %s" - " 
%s", self.group_id, tp, error_type.__name__) + log.debug( + "OffsetCommit for group %s failed on partition %s" " %s", + self.group_id, + tp, + error_type.__name__, + ) future.failure(error_type()) return elif error_type is Errors.GroupLoadInProgressError: # just retry - log.debug("OffsetCommit for group %s failed: %s", - self.group_id, error_type.__name__) + log.debug( + "OffsetCommit for group %s failed: %s", + self.group_id, + error_type.__name__, + ) future.failure(error_type(self.group_id)) return - elif error_type in (Errors.GroupCoordinatorNotAvailableError, - Errors.NotCoordinatorForGroupError, - Errors.RequestTimedOutError): - log.debug("OffsetCommit for group %s failed: %s", - self.group_id, error_type.__name__) + elif error_type in ( + Errors.GroupCoordinatorNotAvailableError, + Errors.NotCoordinatorForGroupError, + Errors.RequestTimedOutError, + ): + log.debug( + "OffsetCommit for group %s failed: %s", + self.group_id, + error_type.__name__, + ) self.coordinator_dead(error_type()) future.failure(error_type(self.group_id)) return - elif error_type in (Errors.UnknownMemberIdError, - Errors.IllegalGenerationError, - Errors.RebalanceInProgressError): + elif error_type in ( + Errors.UnknownMemberIdError, + Errors.IllegalGenerationError, + Errors.RebalanceInProgressError, + ): # need to re-join group error = error_type(self.group_id) - log.debug("OffsetCommit for group %s failed: %s", - self.group_id, error) + log.debug( + "OffsetCommit for group %s failed: %s", self.group_id, error + ) self.reset_generation() future.failure(Errors.CommitFailedError()) return else: - log.error("Group %s failed to commit partition %s at offset" - " %s: %s", self.group_id, tp, offset, - error_type.__name__) + log.error( + "Group %s failed to commit partition %s at offset" " %s: %s", + self.group_id, + tp, + offset, + error_type.__name__, + ) future.failure(error_type()) return if unauthorized_topics: - log.error("Not authorized to commit to topics %s for group %s", - unauthorized_topics, self.group_id) + log.error( + "Not authorized to commit to topics %s for group %s", + unauthorized_topics, + self.group_id, + ) future.failure(Errors.TopicAuthorizationFailedError(unauthorized_topics)) else: future.success(None) @@ -709,7 +822,7 @@ def _send_offset_fetch_request(self, partitions): Returns: Future: resolves to dict of offsets: {TopicPartition: OffsetAndMetadata} """ - assert self.config['api_version'] >= (0, 8, 1), 'Unsupported Broker API' + assert self.config["api_version"] >= (0, 8, 1), "Unsupported Broker API" assert all(map(lambda k: isinstance(k, TopicPartition), partitions)) if not partitions: return Future().success({}) @@ -720,26 +833,26 @@ def _send_offset_fetch_request(self, partitions): # Verify node is ready if not self._client.ready(node_id): - log.debug("Node %s not ready -- failing offset fetch request", - node_id) + log.debug("Node %s not ready -- failing offset fetch request", node_id) return Future().failure(Errors.NodeNotReadyError) - log.debug("Group %s fetching committed offsets for partitions: %s", - self.group_id, partitions) + log.debug( + "Group %s fetching committed offsets for partitions: %s", + self.group_id, + partitions, + ) # construct the request topic_partitions = collections.defaultdict(set) for tp in partitions: topic_partitions[tp.topic].add(tp.partition) - if self.config['api_version'] >= (0, 8, 2): + if self.config["api_version"] >= (0, 8, 2): request = OffsetFetchRequest[1]( - self.group_id, - list(topic_partitions.items()) + self.group_id, 
list(topic_partitions.items()) ) else: request = OffsetFetchRequest[0]( - self.group_id, - list(topic_partitions.items()) + self.group_id, list(topic_partitions.items()) ) # send the request with a callback @@ -757,8 +870,12 @@ def _handle_offset_fetch_response(self, future, response): error_type = Errors.for_code(error_code) if error_type is not Errors.NoError: error = error_type() - log.debug("Group %s failed to fetch offset for partition" - " %s: %s", self.group_id, tp, error) + log.debug( + "Group %s failed to fetch offset for partition" " %s: %s", + self.group_id, + tp, + error, + ) if error_type is Errors.GroupLoadInProgressError: # just retry future.failure(error) @@ -767,13 +884,16 @@ def _handle_offset_fetch_response(self, future, response): self.coordinator_dead(error_type()) future.failure(error) elif error_type is Errors.UnknownTopicOrPartitionError: - log.warning("OffsetFetchRequest -- unknown topic %s" - " (have you committed any offsets yet?)", - topic) + log.warning( + "OffsetFetchRequest -- unknown topic %s" + " (have you committed any offsets yet?)", + topic, + ) continue else: - log.error("Unknown error fetching offsets for %s: %s", - tp, error) + log.error( + "Unknown error fetching offsets for %s: %s", tp, error + ) future.failure(error) return elif offset >= 0: @@ -781,8 +901,11 @@ def _handle_offset_fetch_response(self, future, response): # (-1 indicates no committed offset to fetch) offsets[tp] = OffsetAndMetadata(offset, metadata) else: - log.debug("Group %s has no committed offset for partition" - " %s", self.group_id, tp) + log.debug( + "Group %s has no committed offset for partition" " %s", + self.group_id, + tp, + ) future.success(offsets) def _default_offset_commit_callback(self, offsets, exception): @@ -791,43 +914,74 @@ def _default_offset_commit_callback(self, offsets, exception): def _commit_offsets_async_on_complete(self, offsets, exception): if exception is not None: - log.warning("Auto offset commit failed for group %s: %s", - self.group_id, exception) - if getattr(exception, 'retriable', False): - self.next_auto_commit_deadline = min(time.time() + self.config['retry_backoff_ms'] / 1000, self.next_auto_commit_deadline) + log.warning( + "Auto offset commit failed for group %s: %s", self.group_id, exception + ) + if getattr(exception, "retriable", False): + self.next_auto_commit_deadline = min( + time.time() + self.config["retry_backoff_ms"] / 1000, + self.next_auto_commit_deadline, + ) else: - log.debug("Completed autocommit of offsets %s for group %s", - offsets, self.group_id) + log.debug( + "Completed autocommit of offsets %s for group %s", + offsets, + self.group_id, + ) def _maybe_auto_commit_offsets_async(self): - if self.config['enable_auto_commit']: + if self.config["enable_auto_commit"]: if self.coordinator_unknown(): - self.next_auto_commit_deadline = time.time() + self.config['retry_backoff_ms'] / 1000 + self.next_auto_commit_deadline = ( + time.time() + self.config["retry_backoff_ms"] / 1000 + ) elif time.time() > self.next_auto_commit_deadline: self.next_auto_commit_deadline = time.time() + self.auto_commit_interval - self.commit_offsets_async(self._subscription.all_consumed_offsets(), - self._commit_offsets_async_on_complete) + self.commit_offsets_async( + self._subscription.all_consumed_offsets(), + self._commit_offsets_async_on_complete, + ) class ConsumerCoordinatorMetrics(object): def __init__(self, metrics, metric_group_prefix, subscription): self.metrics = metrics - self.metric_group_name = '%s-coordinator-metrics' % 
(metric_group_prefix,)
-
-        self.commit_latency = metrics.sensor('commit-latency')
-        self.commit_latency.add(metrics.metric_name(
-            'commit-latency-avg', self.metric_group_name,
-            'The average time taken for a commit request'), Avg())
-        self.commit_latency.add(metrics.metric_name(
-            'commit-latency-max', self.metric_group_name,
-            'The max time taken for a commit request'), Max())
-        self.commit_latency.add(metrics.metric_name(
-            'commit-rate', self.metric_group_name,
-            'The number of commit calls per second'), Rate(sampled_stat=Count()))
-
-        num_parts = AnonMeasurable(lambda config, now:
-                                   len(subscription.assigned_partitions()))
-        metrics.add_metric(metrics.metric_name(
-            'assigned-partitions', self.metric_group_name,
-            'The number of partitions currently assigned to this consumer'),
-            num_parts)
+        self.metric_group_name = "%s-coordinator-metrics" % (metric_group_prefix,)
+
+        self.commit_latency = metrics.sensor("commit-latency")
+        self.commit_latency.add(
+            metrics.metric_name(
+                "commit-latency-avg",
+                self.metric_group_name,
+                "The average time taken for a commit request",
+            ),
+            Avg(),
+        )
+        self.commit_latency.add(
+            metrics.metric_name(
+                "commit-latency-max",
+                self.metric_group_name,
+                "The max time taken for a commit request",
+            ),
+            Max(),
+        )
+        self.commit_latency.add(
+            metrics.metric_name(
+                "commit-rate",
+                self.metric_group_name,
+                "The number of commit calls per second",
+            ),
+            Rate(sampled_stat=Count()),
+        )
+
+        num_parts = AnonMeasurable(
+            lambda config, now: len(subscription.assigned_partitions())
+        )
+        metrics.add_metric(
+            metrics.metric_name(
+                "assigned-partitions",
+                self.metric_group_name,
+                "The number of partitions currently assigned to this consumer",
+            ),
+            num_parts,
+        )
diff --git a/kafka/coordinator/heartbeat.py b/aiokafka/coordinator/heartbeat.py
similarity index 60%
rename from kafka/coordinator/heartbeat.py
rename to aiokafka/coordinator/heartbeat.py
index 2f5930b6..dccf9da1 100644
--- a/kafka/coordinator/heartbeat.py
+++ b/aiokafka/coordinator/heartbeat.py
@@ -6,11 +6,11 @@
 
 class Heartbeat(object):
     DEFAULT_CONFIG = {
-        'group_id': None,
-        'heartbeat_interval_ms': 3000,
-        'session_timeout_ms': 10000,
-        'max_poll_interval_ms': 300000,
-        'retry_backoff_ms': 100,
+        "group_id": None,
+        "heartbeat_interval_ms": 3000,
+        "session_timeout_ms": 10000,
+        "max_poll_interval_ms": 300000,
+        "retry_backoff_ms": 100,
     }
 
     def __init__(self, **configs):
@@ -19,14 +19,15 @@ def __init__(self, **configs):
             if key in configs:
                 self.config[key] = configs[key]
 
-        if self.config['group_id'] is not None:
-            assert (self.config['heartbeat_interval_ms']
-                    <= self.config['session_timeout_ms']), (
-                'Heartbeat interval must be lower than the session timeout')
+        if self.config["group_id"] is not None:
+            assert (
+                self.config["heartbeat_interval_ms"]
+                <= self.config["session_timeout_ms"]
+            ), "Heartbeat interval must be lower than the session timeout"
 
-        self.last_send = -1 * float('inf')
-        self.last_receive = -1 * float('inf')
-        self.last_poll = -1 * float('inf')
+        self.last_send = -1 * float("inf")
+        self.last_receive = -1 * float("inf")
+        self.last_poll = -1 * float("inf")
         self.last_reset = time.time()
         self.heartbeat_failed = None
 
@@ -47,9 +48,9 @@ def time_to_next_heartbeat(self):
         """Returns seconds (float) remaining before next heartbeat should be sent"""
         time_since_last_heartbeat = time.time() - max(self.last_send, self.last_reset)
         if self.heartbeat_failed:
-            delay_to_next_heartbeat = self.config['retry_backoff_ms'] / 1000
+            delay_to_next_heartbeat = self.config["retry_backoff_ms"] / 1000
         else:
-            delay_to_next_heartbeat = self.config['heartbeat_interval_ms'] / 1000
+            delay_to_next_heartbeat = self.config["heartbeat_interval_ms"] / 1000
         return max(0, delay_to_next_heartbeat - time_since_last_heartbeat)
 
     def should_heartbeat(self):
@@ -57,7 +58,7 @@ def should_heartbeat(self):
 
     def session_timeout_expired(self):
         last_recv = max(self.last_receive, self.last_reset)
-        return (time.time() - last_recv) > (self.config['session_timeout_ms'] / 1000)
+        return (time.time() - last_recv) > (self.config["session_timeout_ms"] / 1000)
 
     def reset_timeouts(self):
         self.last_reset = time.time()
@@ -65,4 +66,6 @@ def reset_timeouts(self):
         self.heartbeat_failed = False
 
     def poll_timeout_expired(self):
-        return (time.time() - self.last_poll) > (self.config['max_poll_interval_ms'] / 1000)
+        return (time.time() - self.last_poll) > (
+            self.config["max_poll_interval_ms"] / 1000
+        )
diff --git a/aiokafka/coordinator/protocol.py b/aiokafka/coordinator/protocol.py
new file mode 100644
index 00000000..e3bbd692
--- /dev/null
+++ b/aiokafka/coordinator/protocol.py
@@ -0,0 +1,35 @@
+from __future__ import absolute_import
+
+from kafka.protocol.struct import Struct
+from kafka.protocol.types import Array, Bytes, Int16, Int32, Schema, String
+from kafka.structs import TopicPartition
+
+
+class ConsumerProtocolMemberMetadata(Struct):
+    SCHEMA = Schema(
+        ("version", Int16),
+        ("subscription", Array(String("utf-8"))),
+        ("user_data", Bytes),
+    )
+
+
+class ConsumerProtocolMemberAssignment(Struct):
+    SCHEMA = Schema(
+        ("version", Int16),
+        ("assignment", Array(("topic", String("utf-8")), ("partitions", Array(Int32)))),
+        ("user_data", Bytes),
+    )
+
+    def partitions(self):
+        return [
+            TopicPartition(topic, partition)
+            for topic, partitions in self.assignment  # pylint: disable-msg=no-member
+            for partition in partitions
+        ]
+
+
+class ConsumerProtocol(object):
+    PROTOCOL_TYPE = "consumer"
+    ASSIGNMENT_STRATEGIES = ("range", "roundrobin")
+    METADATA = ConsumerProtocolMemberMetadata
+    ASSIGNMENT = ConsumerProtocolMemberAssignment
diff --git a/docs/api.rst b/docs/api.rst
index 441ba01a..c6b18061 100644
--- a/docs/api.rst
+++ b/docs/api.rst
@@ -132,7 +132,7 @@ Other references
 .. autoclass:: aiokafka.producer.message_accumulator.BatchBuilder
 
 .. autoclass:: aiokafka.consumer.group_coordinator.GroupCoordinator
-.. autoclass:: kafka.coordinator.assignors.roundrobin.RoundRobinPartitionAssignor
+.. autoclass:: aiokafka.coordinator.assignors.roundrobin.RoundRobinPartitionAssignor
 
 
 Errors
diff --git a/kafka/coordinator/protocol.py b/kafka/coordinator/protocol.py
deleted file mode 100644
index 56a39015..00000000
--- a/kafka/coordinator/protocol.py
+++ /dev/null
@@ -1,33 +0,0 @@
-from __future__ import absolute_import
-
-from kafka.protocol.struct import Struct
-from kafka.protocol.types import Array, Bytes, Int16, Int32, Schema, String
-from kafka.structs import TopicPartition
-
-
-class ConsumerProtocolMemberMetadata(Struct):
-    SCHEMA = Schema(
-        ('version', Int16),
-        ('subscription', Array(String('utf-8'))),
-        ('user_data', Bytes))
-
-
-class ConsumerProtocolMemberAssignment(Struct):
-    SCHEMA = Schema(
-        ('version', Int16),
-        ('assignment', Array(
-            ('topic', String('utf-8')),
-            ('partitions', Array(Int32)))),
-        ('user_data', Bytes))
-
-    def partitions(self):
-        return [TopicPartition(topic, partition)
-                for topic, partitions in self.assignment # pylint: disable-msg=no-member
-                for partition in partitions]
-
-
-class ConsumerProtocol(object):
-    PROTOCOL_TYPE = 'consumer'
-    ASSIGNMENT_STRATEGIES = ('range', 'roundrobin')
-    METADATA = ConsumerProtocolMemberMetadata
-    ASSIGNMENT = ConsumerProtocolMemberAssignment
diff --git a/tests/coordinator/__init__.py b/tests/coordinator/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/tests/kafka/test_assignors.py b/tests/coordinator/test_assignors.py
similarity index 98%
rename from tests/kafka/test_assignors.py
rename to tests/coordinator/test_assignors.py
index 858ef426..5d159aa2 100644
--- a/tests/kafka/test_assignors.py
+++ b/tests/coordinator/test_assignors.py
@@ -7,10 +7,10 @@
 import pytest
 
 from kafka.structs import TopicPartition
-from kafka.coordinator.assignors.range import RangePartitionAssignor
-from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
-from kafka.coordinator.assignors.sticky.sticky_assignor import StickyPartitionAssignor, StickyAssignorUserDataV1
-from kafka.coordinator.protocol import ConsumerProtocolMemberAssignment, ConsumerProtocolMemberMetadata
+from aiokafka.coordinator.assignors.range import RangePartitionAssignor
+from aiokafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
+from aiokafka.coordinator.assignors.sticky.sticky_assignor import StickyPartitionAssignor, StickyAssignorUserDataV1
+from aiokafka.coordinator.protocol import ConsumerProtocolMemberAssignment, ConsumerProtocolMemberMetadata
 from kafka.vendor import six
 
 
diff --git a/tests/kafka/test_partition_movements.py b/tests/coordinator/test_partition_movements.py
similarity index 90%
rename from tests/kafka/test_partition_movements.py
rename to tests/coordinator/test_partition_movements.py
index bc990bf3..5cf21283 100644
--- a/tests/kafka/test_partition_movements.py
+++ b/tests/coordinator/test_partition_movements.py
@@ -1,6 +1,6 @@
 from kafka.structs import TopicPartition
 
-from kafka.coordinator.assignors.sticky.partition_movements import PartitionMovements
+from aiokafka.coordinator.assignors.sticky.partition_movements import PartitionMovements
 
 
 def test_empty_movements_are_sticky():
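
Usage sketch (not part of the patch): a minimal example of the protocol classes this
change relocates to aiokafka.coordinator.protocol, assuming the new import path from
the diff above. The topic name "my-topic" and the partition numbers are illustrative
only.

# Illustrative sketch, not part of the diff above.
from aiokafka.coordinator.protocol import (
    ConsumerProtocolMemberAssignment,
    ConsumerProtocolMemberMetadata,
)

# A member subscription in the shape the assignors' metadata() classmethods produce:
# (version, subscribed topics, opaque user data).
metadata = ConsumerProtocolMemberMetadata(0, ["my-topic"], b"")

# An assignment of three partitions of "my-topic" to one member;
# partitions() flattens it into TopicPartition tuples.
assignment = ConsumerProtocolMemberAssignment(0, [("my-topic", [0, 1, 2])], b"")
print(assignment.partitions())
# [TopicPartition(topic='my-topic', partition=0), ..., TopicPartition(topic='my-topic', partition=2)]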