Skip to content

Commit

Permalink
Reformat coordinator with black
Browse files Browse the repository at this point in the history
  • Loading branch information
ods committed Oct 22, 2023
1 parent f509fc9 commit bd32596
Show file tree
Hide file tree
Showing 9 changed files with 982 additions and 525 deletions.
19 changes: 11 additions & 8 deletions aiokafka/coordinator/assignors/range.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@
import logging

from aiokafka.coordinator.assignors.abstract import AbstractPartitionAssignor
from aiokafka.coordinator.protocol import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment
from aiokafka.coordinator.protocol import (
ConsumerProtocolMemberMetadata,
ConsumerProtocolMemberAssignment,
)

log = logging.getLogger(__name__)

Expand All @@ -26,7 +29,8 @@ class RangePartitionAssignor(AbstractPartitionAssignor):
C0: [t0p0, t0p1, t1p0, t1p1]
C1: [t0p2, t1p2]
"""
name = 'range'

name = "range"
version = 0

@classmethod
Expand All @@ -42,7 +46,7 @@ def assign(cls, cluster, member_metadata):
for topic, consumers_for_topic in consumers_per_topic.items():
partitions = cluster.partitions_for_topic(topic)
if partitions is None:
log.warning('No partition metadata for topic %s', topic)
log.warning("No partition metadata for topic %s", topic)
continue
partitions = sorted(partitions)
consumers_for_topic.sort()
Expand All @@ -56,19 +60,18 @@ def assign(cls, cluster, member_metadata):
length = partitions_per_consumer
if not i + 1 > consumers_with_extra:
length += 1
assignment[member][topic] = partitions[start:start+length]
assignment[member][topic] = partitions[start : start + length]

protocol_assignment = {}
for member_id in member_metadata:
protocol_assignment[member_id] = ConsumerProtocolMemberAssignment(
cls.version,
sorted(assignment[member_id].items()),
b'')
cls.version, sorted(assignment[member_id].items()), b""
)
return protocol_assignment

@classmethod
def metadata(cls, topics):
return ConsumerProtocolMemberMetadata(cls.version, list(topics), b'')
return ConsumerProtocolMemberMetadata(cls.version, list(topics), b"")

@classmethod
def on_assignment(cls, assignment):
Expand Down
17 changes: 10 additions & 7 deletions aiokafka/coordinator/assignors/roundrobin.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@
from kafka.structs import TopicPartition

from aiokafka.coordinator.assignors.abstract import AbstractPartitionAssignor
from aiokafka.coordinator.protocol import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment
from aiokafka.coordinator.protocol import (
ConsumerProtocolMemberMetadata,
ConsumerProtocolMemberAssignment,
)

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -44,7 +47,8 @@ class RoundRobinPartitionAssignor(AbstractPartitionAssignor):
C1: [t1p0]
C2: [t1p1, t2p0, t2p1, t2p2]
"""
name = 'roundrobin'

name = "roundrobin"
version = 0

@classmethod
Expand All @@ -57,7 +61,7 @@ def assign(cls, cluster, member_metadata):
for topic in all_topics:
partitions = cluster.partitions_for_topic(topic)
if partitions is None:
log.warning('No partition metadata for topic %s', topic)
log.warning("No partition metadata for topic %s", topic)
continue
for partition in partitions:
all_topic_partitions.append(TopicPartition(topic, partition))
Expand All @@ -81,14 +85,13 @@ def assign(cls, cluster, member_metadata):
protocol_assignment = {}
for member_id in member_metadata:
protocol_assignment[member_id] = ConsumerProtocolMemberAssignment(
cls.version,
sorted(assignment[member_id].items()),
b'')
cls.version, sorted(assignment[member_id].items()), b""
)
return protocol_assignment

@classmethod
def metadata(cls, topics):
return ConsumerProtocolMemberMetadata(cls.version, list(topics), b'')
return ConsumerProtocolMemberMetadata(cls.version, list(topics), b"")

@classmethod
def on_assignment(cls, assignment):
Expand Down
34 changes: 22 additions & 12 deletions aiokafka/coordinator/assignors/sticky/partition_movements.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def is_sublist(source, target):
true if target is in source; false otherwise
"""
for index in (i for i, e in enumerate(source) if e == target[0]):
if tuple(source[index: index + len(target)]) == target:
if tuple(source[index : index + len(target)]) == target:
return True
return False

Expand All @@ -39,9 +39,7 @@ class PartitionMovements:
"""

def __init__(self):
self.partition_movements_by_topic = defaultdict(
lambda: defaultdict(set)
)
self.partition_movements_by_topic = defaultdict(lambda: defaultdict(set))
self.partition_movements = {}

def move_partition(self, partition, old_consumer, new_consumer):
Expand All @@ -53,7 +51,11 @@ def move_partition(self, partition, old_consumer, new_consumer):
if existing_pair.src_member_id != new_consumer:
# the partition is not moving back to its previous consumer
self._add_partition_movement_record(
partition, ConsumerPair(src_member_id=existing_pair.src_member_id, dst_member_id=new_consumer)
partition,
ConsumerPair(
src_member_id=existing_pair.src_member_id,
dst_member_id=new_consumer,
),
)
else:
self._add_partition_movement_record(partition, pair)
Expand All @@ -65,11 +67,15 @@ def get_partition_to_be_moved(self, partition, old_consumer, new_consumer):
# this partition has previously moved
assert old_consumer == self.partition_movements[partition].dst_member_id
old_consumer = self.partition_movements[partition].src_member_id
reverse_pair = ConsumerPair(src_member_id=new_consumer, dst_member_id=old_consumer)
reverse_pair = ConsumerPair(
src_member_id=new_consumer, dst_member_id=old_consumer
)
if reverse_pair not in self.partition_movements_by_topic[partition.topic]:
return partition

return next(iter(self.partition_movements_by_topic[partition.topic][reverse_pair]))
return next(
iter(self.partition_movements_by_topic[partition.topic][reverse_pair])
)

def are_sticky(self):
for topic, movements in self.partition_movements_by_topic.items():
Expand Down Expand Up @@ -105,11 +111,13 @@ def _has_cycles(self, consumer_pairs):
reduced_pairs = deepcopy(consumer_pairs)
reduced_pairs.remove(pair)
path = [pair.src_member_id]
if self._is_linked(pair.dst_member_id, pair.src_member_id, reduced_pairs, path) and not self._is_subcycle(
path, cycles
):
if self._is_linked(
pair.dst_member_id, pair.src_member_id, reduced_pairs, path
) and not self._is_subcycle(path, cycles):
cycles.add(tuple(path))
log.error("A cycle of length {} was found: {}".format(len(path) - 1, path))
log.error(
"A cycle of length {} was found: {}".format(len(path) - 1, path)
)

# for now we want to make sure there is no partition movements of the same topic between a pair of consumers.
# the odds of finding a cycle among more than two consumers seem to be very low (according to various randomized
Expand Down Expand Up @@ -143,5 +151,7 @@ def _is_linked(self, src, dst, pairs, current_path):
reduced_set = deepcopy(pairs)
reduced_set.remove(pair)
current_path.append(pair.src_member_id)
return self._is_linked(pair.dst_member_id, dst, reduced_set, current_path)
return self._is_linked(
pair.dst_member_id, dst, reduced_set, current_path
)
return False
8 changes: 6 additions & 2 deletions aiokafka/coordinator/assignors/sticky/sorted_set.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,13 @@ def pop_last(self):
return value

def add(self, value):
if self._cached_last is not None and self._key(value) > self._key(self._cached_last):
if self._cached_last is not None and self._key(value) > self._key(
self._cached_last
):
self._cached_last = value
if self._cached_first is not None and self._key(value) < self._key(self._cached_first):
if self._cached_first is not None and self._key(value) < self._key(
self._cached_first
):
self._cached_first = value

return self._set.add(value)
Expand Down
Loading

0 comments on commit bd32596

Please sign in to comment.