Skip to content

Commit

Permalink
Move coordinator
Browse files Browse the repository at this point in the history
  • Loading branch information
ods committed Oct 22, 2023
1 parent 3bab6d4 commit f509fc9
Show file tree
Hide file tree
Showing 19 changed files with 83 additions and 90 deletions.
3 changes: 1 addition & 2 deletions aiokafka/consumer/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,9 @@
import warnings
from typing import Dict, List

from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor

from aiokafka.abc import ConsumerRebalanceListener
from aiokafka.client import AIOKafkaClient
from aiokafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
from aiokafka.errors import (
TopicAuthorizationFailedError, OffsetOutOfRangeError,
ConsumerStoppedError, IllegalOperation, UnsupportedVersionError,
Expand Down
4 changes: 2 additions & 2 deletions aiokafka/consumer/group_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
import copy
import time

from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
from kafka.coordinator.protocol import ConsumerProtocol
from kafka.protocol.commit import (
OffsetCommitRequest_v2 as OffsetCommitRequest,
OffsetFetchRequest_v1 as OffsetFetchRequest)
Expand All @@ -15,6 +13,8 @@
import aiokafka.errors as Errors
from aiokafka.structs import OffsetAndMetadata, TopicPartition
from aiokafka.client import ConnectionGroup, CoordinationType
from aiokafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
from aiokafka.coordinator.protocol import ConsumerProtocol
from aiokafka.util import create_future, create_task

log = logging.getLogger(__name__)
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,8 @@
import collections
import logging

from kafka.vendor import six

from kafka.coordinator.assignors.abstract import AbstractPartitionAssignor
from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment
from aiokafka.coordinator.assignors.abstract import AbstractPartitionAssignor
from aiokafka.coordinator.protocol import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -34,14 +32,14 @@ class RangePartitionAssignor(AbstractPartitionAssignor):
@classmethod
def assign(cls, cluster, member_metadata):
consumers_per_topic = collections.defaultdict(list)
for member, metadata in six.iteritems(member_metadata):
for member, metadata in member_metadata.items():
for topic in metadata.subscription:
consumers_per_topic[topic].append(member)

# construct {member_id: {topic: [partition, ...]}}
assignment = collections.defaultdict(dict)

for topic, consumers_for_topic in six.iteritems(consumers_per_topic):
for topic, consumers_for_topic in consumers_per_topic.items():
partitions = cluster.partitions_for_topic(topic)
if partitions is None:
log.warning('No partition metadata for topic %s', topic)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,11 @@
import itertools
import logging

from kafka.vendor import six

from kafka.coordinator.assignors.abstract import AbstractPartitionAssignor
from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment
from kafka.structs import TopicPartition

from aiokafka.coordinator.assignors.abstract import AbstractPartitionAssignor
from aiokafka.coordinator.protocol import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment

log = logging.getLogger(__name__)


Expand Down Expand Up @@ -51,7 +50,7 @@ class RoundRobinPartitionAssignor(AbstractPartitionAssignor):
@classmethod
def assign(cls, cluster, member_metadata):
all_topics = set()
for metadata in six.itervalues(member_metadata):
for metadata in member_metadata.values():
all_topics.update(metadata.subscription)

all_topic_partitions = []
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@
from collections import defaultdict, namedtuple
from copy import deepcopy

from kafka.vendor import six

log = logging.getLogger(__name__)


Expand Down Expand Up @@ -74,7 +72,7 @@ def get_partition_to_be_moved(self, partition, old_consumer, new_consumer):
return next(iter(self.partition_movements_by_topic[partition.topic][reverse_pair]))

def are_sticky(self):
for topic, movements in six.iteritems(self.partition_movements_by_topic):
for topic, movements in self.partition_movements_by_topic.items():
movement_pairs = set(movements.keys())
if self._has_cycles(movement_pairs):
log.error(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,15 @@
from collections import defaultdict, namedtuple
from copy import deepcopy

from kafka.cluster import ClusterMetadata
from kafka.coordinator.assignors.abstract import AbstractPartitionAssignor
from kafka.coordinator.assignors.sticky.partition_movements import PartitionMovements
from kafka.coordinator.assignors.sticky.sorted_set import SortedSet
from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment
from kafka.coordinator.protocol import Schema
from kafka.protocol.struct import Struct
from kafka.protocol.types import String, Array, Int32
from kafka.structs import TopicPartition
from kafka.vendor import six

from aiokafka.coordinator.assignors.abstract import AbstractPartitionAssignor
from aiokafka.coordinator.assignors.sticky.partition_movements import PartitionMovements
from aiokafka.coordinator.assignors.sticky.sorted_set import SortedSet
from aiokafka.coordinator.protocol import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment
from aiokafka.coordinator.protocol import Schema

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -110,7 +109,7 @@ def balance(self):

# narrow down the reassignment scope to only those partitions that can actually be reassigned
fixed_partitions = set()
for partition in six.iterkeys(self.partition_to_all_potential_consumers):
for partition in self.partition_to_all_potential_consumers.keys():
if not self._can_partition_participate_in_reassignment(partition):
fixed_partitions.add(partition)
for fixed_partition in fixed_partitions:
Expand All @@ -119,7 +118,7 @@ def balance(self):

# narrow down the reassignment scope to only those consumers that are subject to reassignment
fixed_assignments = {}
for consumer in six.iterkeys(self.consumer_to_all_potential_partitions):
for consumer in self.consumer_to_all_potential_partitions.keys():
if not self._can_consumer_participate_in_reassignment(consumer):
self._remove_consumer_from_current_subscriptions_and_maintain_order(consumer)
fixed_assignments[consumer] = self.current_assignment[consumer]
Expand Down Expand Up @@ -148,16 +147,16 @@ def balance(self):
self.current_partition_consumer.update(prebalance_partition_consumers)

# add the fixed assignments (those that could not change) back
for consumer, partitions in six.iteritems(fixed_assignments):
for consumer, partitions in fixed_assignments.items():
self.current_assignment[consumer] = partitions
self._add_consumer_to_current_subscriptions_and_maintain_order(consumer)

def get_final_assignment(self, member_id):
assignment = defaultdict(list)
for topic_partition in self.current_assignment[member_id]:
assignment[topic_partition.topic].append(topic_partition.partition)
assignment = {k: sorted(v) for k, v in six.iteritems(assignment)}
return six.viewitems(assignment)
assignment = {k: sorted(v) for k, v in assignment.items()}
return assignment.items()

def _initialize(self, cluster):
self._init_current_assignments(self.members)
Expand All @@ -170,7 +169,7 @@ def _initialize(self, cluster):
for p in partitions:
partition = TopicPartition(topic=topic, partition=p)
self.partition_to_all_potential_consumers[partition] = []
for consumer_id, member_metadata in six.iteritems(self.members):
for consumer_id, member_metadata in self.members.items():
self.consumer_to_all_potential_partitions[consumer_id] = []
for topic in member_metadata.subscription:
if cluster.partitions_for_topic(topic) is None:
Expand All @@ -190,7 +189,7 @@ def _init_current_assignments(self, members):

# for each partition we create a map of its consumers by generation
sorted_partition_consumers_by_generation = {}
for consumer, member_metadata in six.iteritems(members):
for consumer, member_metadata in members.items():
for partitions in member_metadata.partitions:
if partitions in sorted_partition_consumers_by_generation:
consumers = sorted_partition_consumers_by_generation[partitions]
Expand All @@ -209,7 +208,7 @@ def _init_current_assignments(self, members):

# previous_assignment holds the prior ConsumerGenerationPair (before current) of each partition
# current and previous consumers are the last two consumers of each partition in the above sorted map
for partitions, consumers in six.iteritems(sorted_partition_consumers_by_generation):
for partitions, consumers in sorted_partition_consumers_by_generation.items():
generations = sorted(consumers.keys(), reverse=True)
self.current_assignment[consumers[generations[0]]].append(partitions)
# now update previous assignment if any
Expand All @@ -220,7 +219,7 @@ def _init_current_assignments(self, members):

self.is_fresh_assignment = len(self.current_assignment) == 0

for consumer_id, partitions in six.iteritems(self.current_assignment):
for consumer_id, partitions in self.current_assignment.items():
for partition in partitions:
self.current_partition_consumer[partition] = consumer_id

Expand All @@ -230,14 +229,14 @@ def _are_subscriptions_identical(self):
true, if both potential consumers of partitions and potential partitions that consumers can
consume are the same
"""
if not has_identical_list_elements(list(six.itervalues(self.partition_to_all_potential_consumers))):
if not has_identical_list_elements(list(self.partition_to_all_potential_consumers.values())):
return False
return has_identical_list_elements(list(six.itervalues(self.consumer_to_all_potential_partitions)))
return has_identical_list_elements(list(self.consumer_to_all_potential_partitions.values()))

def _populate_sorted_partitions(self):
# set of topic partitions with their respective potential consumers
all_partitions = set((tp, tuple(consumers))
for tp, consumers in six.iteritems(self.partition_to_all_potential_consumers))
for tp, consumers in self.partition_to_all_potential_consumers.items())
partitions_sorted_by_num_of_potential_consumers = sorted(all_partitions, key=partitions_comparator_key)

self.sorted_partitions = []
Expand All @@ -246,7 +245,7 @@ def _populate_sorted_partitions(self):
# then we just need to simply list partitions in a round robin fashion (from consumers with
# most assigned partitions to those with least)
assignments = deepcopy(self.current_assignment)
for consumer_id, partitions in six.iteritems(assignments):
for partitions in assignments.values():
to_remove = []
for partition in partitions:
if partition not in self.partition_to_all_potential_consumers:
Expand All @@ -255,7 +254,7 @@ def _populate_sorted_partitions(self):
partitions.remove(partition)

sorted_consumers = SortedSet(
iterable=[(consumer, tuple(partitions)) for consumer, partitions in six.iteritems(assignments)],
iterable=[(consumer, tuple(partitions)) for consumer, partitions in assignments.items()],
key=subscriptions_comparator_key,
)
# at this point, sorted_consumers contains an ascending-sorted list of consumers based on
Expand All @@ -267,7 +266,7 @@ def _populate_sorted_partitions(self):
remaining_partitions = assignments[consumer]
# from partitions that had a different consumer before,
# keep only those that are assigned to this consumer now
previous_partitions = set(six.iterkeys(self.previous_assignment)).intersection(set(remaining_partitions))
previous_partitions = set(self.previous_assignment.keys()).intersection(set(remaining_partitions))
if previous_partitions:
# if there is a partition of this consumer that was assigned to another consumer before
# mark it as a good option for reassignment
Expand All @@ -292,7 +291,7 @@ def _populate_partitions_to_reassign(self):
self.unassigned_partitions = deepcopy(self.sorted_partitions)

assignments_to_remove = []
for consumer_id, partitions in six.iteritems(self.current_assignment):
for consumer_id, partitions in self.current_assignment.items():
if consumer_id not in self.members:
# if a consumer that existed before (and had some partition assignments) is now removed,
# remove it from current_assignment
Expand Down Expand Up @@ -325,7 +324,7 @@ def _populate_partitions_to_reassign(self):

def _initialize_current_subscriptions(self):
self.sorted_current_subscriptions = SortedSet(
iterable=[(consumer, tuple(partitions)) for consumer, partitions in six.iteritems(self.current_assignment)],
iterable=[(consumer, tuple(partitions)) for consumer, partitions in self.current_assignment.items()],
key=subscriptions_comparator_key,
)

Expand All @@ -352,7 +351,7 @@ def _is_balanced(self):

# create a mapping from partitions to the consumer assigned to them
all_assigned_partitions = {}
for consumer_id, consumer_partitions in six.iteritems(self.current_assignment):
for consumer_id, consumer_partitions in self.current_assignment.items():
for partition in consumer_partitions:
if partition in all_assigned_partitions:
log.error("{} is assigned to more than one consumer.".format(partition))
Expand Down Expand Up @@ -491,7 +490,7 @@ def _get_balance_score(assignment):
"""
score = 0
consumer_to_assignment = {}
for consumer_id, partitions in six.iteritems(assignment):
for consumer_id, partitions in assignment.items():
consumer_to_assignment[consumer_id] = len(partitions)

consumers_to_explore = set(consumer_to_assignment.keys())
Expand All @@ -506,66 +505,66 @@ def _get_balance_score(assignment):
class StickyPartitionAssignor(AbstractPartitionAssignor):
"""
https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy
The sticky assignor serves two purposes. First, it guarantees an assignment that is as balanced as possible, meaning either:
- the numbers of topic partitions assigned to consumers differ by at most one; or
- each consumer that has 2+ fewer topic partitions than some other consumer cannot get any of those topic partitions transferred to it.
Second, it preserves as many existing assignments as possible when a reassignment occurs.
This helps in saving some of the overhead processing when topic partitions move from one consumer to another.
Starting fresh it would work by distributing the partitions over consumers as evenly as possible.
Even though this may sound similar to how round robin assignor works, the second example below shows that it is not.
During a reassignment it would perform the reassignment in such a way that in the new assignment
- topic partitions are still distributed as evenly as possible, and
- topic partitions stay with their previously assigned consumers as much as possible.
The first goal above takes precedence over the second one.
Example 1.
Suppose there are three consumers C0, C1, C2,
four topics t0, t1, t2, t3, and each topic has 2 partitions,
resulting in partitions t0p0, t0p1, t1p0, t1p1, t2p0, t2p1, t3p0, t3p1.
Each consumer is subscribed to all four topics.
The assignment with both sticky and round robin assignors will be:
- C0: [t0p0, t1p1, t3p0]
- C1: [t0p1, t2p0, t3p1]
- C2: [t1p0, t2p1]
Now, let's assume C1 is removed and a reassignment is about to happen. The round robin assignor would produce:
- C0: [t0p0, t1p0, t2p0, t3p0]
- C2: [t0p1, t1p1, t2p1, t3p1]
while the sticky assignor would result in:
- C0 [t0p0, t1p1, t3p0, t2p0]
- C2 [t1p0, t2p1, t0p1, t3p1]
preserving all the previous assignments (unlike the round robin assignor).
Example 2.
There are three consumers C0, C1, C2,
and three topics t0, t1, t2, with 1, 2, and 3 partitions respectively.
Therefore, the partitions are t0p0, t1p0, t1p1, t2p0, t2p1, t2p2.
C0 is subscribed to t0;
C1 is subscribed to t0, t1;
and C2 is subscribed to t0, t1, t2.
The round robin assignor would come up with the following assignment:
- C0 [t0p0]
- C1 [t1p0]
- C2 [t1p1, t2p0, t2p1, t2p2]
which is not as balanced as the assignment suggested by sticky assignor:
- C0 [t0p0]
- C1 [t1p0, t1p1]
- C2 [t2p0, t2p1, t2p2]
Now, if consumer C0 is removed, these two assignors would produce the following assignments.
Round Robin (preserves 3 partition assignments):
- C1 [t0p0, t1p1]
- C2 [t1p0, t2p0, t2p1, t2p2]
Sticky (preserves 5 partition assignments):
- C1 [t1p0, t1p1, t0p0]
- C2 [t2p0, t2p1, t2p2]
Expand Down Expand Up @@ -593,7 +592,7 @@ def assign(cls, cluster, members):
dict: {member_id: MemberAssignment}
"""
members_metadata = {}
for consumer, member_metadata in six.iteritems(members):
for consumer, member_metadata in members.items():
members_metadata[consumer] = cls.parse_member_metadata(member_metadata)

executor = StickyAssignmentExecutor(cluster, members_metadata)
Expand Down Expand Up @@ -660,7 +659,7 @@ def _metadata(cls, topics, member_assignment_partitions, generation=-1):
partitions_by_topic = defaultdict(list)
for topic_partition in member_assignment_partitions:
partitions_by_topic[topic_partition.topic].append(topic_partition.partition)
data = StickyAssignorUserDataV1(six.viewitems(partitions_by_topic), generation)
data = StickyAssignorUserDataV1(partitions_by_topic.items(), generation)
user_data = data.encode()
return ConsumerProtocolMemberMetadata(cls.version, list(topics), user_data)

Expand Down
Loading

0 comments on commit f509fc9

Please sign in to comment.