Move coordinator
ods committed Oct 22, 2023
1 parent a37e233 commit 3fe52b1
Showing 20 changed files with 1,166 additions and 690 deletions.
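This commit moves the consumer-coordinator code (group protocol, partition assignors, and their helpers) from the vendored kafka-python package into aiokafka itself, dropping the Python 2 six shims and reflowing the touched code in black style along the way. As a sketch of what the move means for import paths (inferred from the hunks below, not from any migration note in the commit itself):

# Before this commit, the assignors were imported from the vendored
# kafka-python package:
#     from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
# After it, the same class lives under aiokafka.coordinator:
from aiokafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor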
aiokafka/consumer/consumer.py (3 changes: 1 addition & 2 deletions)

@@ -6,10 +6,9 @@
 import warnings
 from typing import Dict, List
 
-from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
-
 from aiokafka.abc import ConsumerRebalanceListener
 from aiokafka.client import AIOKafkaClient
+from aiokafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
 from aiokafka.errors import (
     TopicAuthorizationFailedError, OffsetOutOfRangeError,
     ConsumerStoppedError, IllegalOperation, UnsupportedVersionError,
aiokafka/consumer/group_coordinator.py (4 changes: 2 additions & 2 deletions)

@@ -4,8 +4,6 @@
 import copy
 import time
 
-from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
-from kafka.coordinator.protocol import ConsumerProtocol
 from kafka.protocol.commit import (
     OffsetCommitRequest_v2 as OffsetCommitRequest,
     OffsetFetchRequest_v1 as OffsetFetchRequest)
@@ -15,6 +13,8 @@
 import aiokafka.errors as Errors
 from aiokafka.structs import OffsetAndMetadata, TopicPartition
 from aiokafka.client import ConnectionGroup, CoordinationType
+from aiokafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
+from aiokafka.coordinator.protocol import ConsumerProtocol
 from aiokafka.util import create_future, create_task
 
 log = logging.getLogger(__name__)
File renamed without changes.
File renamed without changes.
@@ -7,9 +7,8 @@
 
 
 class AbstractPartitionAssignor(object):
-    """
-    Abstract assignor implementation which does some common grunt work (in particular collecting
-    partition counts which are always needed in assignors).
+    """Abstract assignor implementation which does some common grunt work (in particular
+    collecting partition counts which are always needed in assignors).
     """
 
     @abc.abstractproperty
@@ -3,10 +3,11 @@
 import collections
 import logging
 
-from kafka.vendor import six
-
-from kafka.coordinator.assignors.abstract import AbstractPartitionAssignor
-from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment
+from aiokafka.coordinator.assignors.abstract import AbstractPartitionAssignor
+from aiokafka.coordinator.protocol import (
+    ConsumerProtocolMemberMetadata,
+    ConsumerProtocolMemberAssignment,
+)
 
 log = logging.getLogger(__name__)
 
@@ -28,23 +29,24 @@ class RangePartitionAssignor(AbstractPartitionAssignor):
     C0: [t0p0, t0p1, t1p0, t1p1]
     C1: [t0p2, t1p2]
     """
-    name = 'range'
+
+    name = "range"
     version = 0
 
     @classmethod
     def assign(cls, cluster, member_metadata):
         consumers_per_topic = collections.defaultdict(list)
-        for member, metadata in six.iteritems(member_metadata):
+        for member, metadata in member_metadata.items():
             for topic in metadata.subscription:
                 consumers_per_topic[topic].append(member)
 
         # construct {member_id: {topic: [partition, ...]}}
         assignment = collections.defaultdict(dict)
 
-        for topic, consumers_for_topic in six.iteritems(consumers_per_topic):
+        for topic, consumers_for_topic in consumers_per_topic.items():
             partitions = cluster.partitions_for_topic(topic)
             if partitions is None:
-                log.warning('No partition metadata for topic %s', topic)
+                log.warning("No partition metadata for topic %s", topic)
                 continue
             partitions = sorted(partitions)
             consumers_for_topic.sort()
@@ -58,19 +60,18 @@ def assign(cls, cluster, member_metadata):
                 length = partitions_per_consumer
                 if not i + 1 > consumers_with_extra:
                     length += 1
-                assignment[member][topic] = partitions[start:start+length]
+                assignment[member][topic] = partitions[start : start + length]
 
         protocol_assignment = {}
         for member_id in member_metadata:
             protocol_assignment[member_id] = ConsumerProtocolMemberAssignment(
-                cls.version,
-                sorted(assignment[member_id].items()),
-                b'')
+                cls.version, sorted(assignment[member_id].items()), b""
+            )
         return protocol_assignment
 
     @classmethod
     def metadata(cls, topics):
-        return ConsumerProtocolMemberMetadata(cls.version, list(topics), b'')
+        return ConsumerProtocolMemberMetadata(cls.version, list(topics), b"")
 
     @classmethod
     def on_assignment(cls, assignment):
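To make the range algorithm above concrete, here is a hedged sketch driving the assignor's classmethod API directly. FakeCluster is a hypothetical stand-in (only partitions_for_topic is needed), and the aiokafka.coordinator.assignors.range module path is assumed from the new package layout rather than shown in this diff:

from aiokafka.coordinator.assignors.range import RangePartitionAssignor

class FakeCluster:
    """Hypothetical test double exposing the one method assign() calls."""

    def __init__(self, partitions_by_topic):
        self._partitions = partitions_by_topic

    def partitions_for_topic(self, topic):
        return self._partitions.get(topic)

# Two consumers subscribed to two topics with three partitions each,
# mirroring the docstring example above.
cluster = FakeCluster({"t0": {0, 1, 2}, "t1": {0, 1, 2}})
subscription = RangePartitionAssignor.metadata(["t0", "t1"])
assignment = RangePartitionAssignor.assign(
    cluster, {"C0": subscription, "C1": subscription}
)
for member_id, member_assignment in sorted(assignment.items()):
    # Expected: C0 -> [t0p0, t0p1, t1p0, t1p1], C1 -> [t0p2, t1p2]
    print(member_id, member_assignment.assignment)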
@@ -4,12 +4,14 @@
 import itertools
 import logging
 
-from kafka.vendor import six
-
-from kafka.coordinator.assignors.abstract import AbstractPartitionAssignor
-from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment
 from kafka.structs import TopicPartition
 
+from aiokafka.coordinator.assignors.abstract import AbstractPartitionAssignor
+from aiokafka.coordinator.protocol import (
+    ConsumerProtocolMemberMetadata,
+    ConsumerProtocolMemberAssignment,
+)
+
 log = logging.getLogger(__name__)
 
@@ -45,20 +47,21 @@ class RoundRobinPartitionAssignor(AbstractPartitionAssignor):
     C1: [t1p0]
     C2: [t1p1, t2p0, t2p1, t2p2]
     """
-    name = 'roundrobin'
+
+    name = "roundrobin"
     version = 0
 
     @classmethod
     def assign(cls, cluster, member_metadata):
         all_topics = set()
-        for metadata in six.itervalues(member_metadata):
+        for metadata in member_metadata.values():
            all_topics.update(metadata.subscription)
 
         all_topic_partitions = []
         for topic in all_topics:
             partitions = cluster.partitions_for_topic(topic)
             if partitions is None:
-                log.warning('No partition metadata for topic %s', topic)
+                log.warning("No partition metadata for topic %s", topic)
                 continue
             for partition in partitions:
                 all_topic_partitions.append(TopicPartition(topic, partition))
@@ -82,14 +85,13 @@ def assign(cls, cluster, member_metadata):
         protocol_assignment = {}
         for member_id in member_metadata:
             protocol_assignment[member_id] = ConsumerProtocolMemberAssignment(
-                cls.version,
-                sorted(assignment[member_id].items()),
-                b'')
+                cls.version, sorted(assignment[member_id].items()), b""
+            )
         return protocol_assignment
 
     @classmethod
     def metadata(cls, topics):
-        return ConsumerProtocolMemberMetadata(cls.version, list(topics), b'')
+        return ConsumerProtocolMemberMetadata(cls.version, list(topics), b"")
 
     @classmethod
     def on_assignment(cls, assignment):
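The same pattern works for the round-robin assignor, whose aiokafka.coordinator.assignors.roundrobin path is confirmed by the consumer.py hunk above. A sketch reproducing the docstring's unbalanced-subscription example, reusing the FakeCluster stand-in from the range sketch:

from aiokafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor

cluster = FakeCluster({"t0": {0}, "t1": {0, 1}, "t2": {0, 1, 2}})
members = {
    "C0": RoundRobinPartitionAssignor.metadata(["t0"]),
    "C1": RoundRobinPartitionAssignor.metadata(["t0", "t1"]),
    "C2": RoundRobinPartitionAssignor.metadata(["t0", "t1", "t2"]),
}
assignment = RoundRobinPartitionAssignor.assign(cluster, members)
# Expected, per the docstring above: the cycle skips members not subscribed
# to a partition's topic, giving
#   C0 -> [t0p0], C1 -> [t1p0], C2 -> [t1p1, t2p0, t2p1, t2p2]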
@@ -2,8 +2,6 @@
 from collections import defaultdict, namedtuple
 from copy import deepcopy
 
-from kafka.vendor import six
-
 log = logging.getLogger(__name__)
 
@@ -28,7 +26,7 @@ def is_sublist(source, target):
         true if target is in source; false otherwise
     """
     for index in (i for i, e in enumerate(source) if e == target[0]):
-        if tuple(source[index: index + len(target)]) == target:
+        if tuple(source[index : index + len(target)]) == target:
             return True
     return False
 
@@ -41,9 +39,7 @@ class PartitionMovements:
     """
 
     def __init__(self):
-        self.partition_movements_by_topic = defaultdict(
-            lambda: defaultdict(set)
-        )
+        self.partition_movements_by_topic = defaultdict(lambda: defaultdict(set))
         self.partition_movements = {}
 
     def move_partition(self, partition, old_consumer, new_consumer):
@@ -55,7 +51,11 @@ def move_partition(self, partition, old_consumer, new_consumer):
             if existing_pair.src_member_id != new_consumer:
                 # the partition is not moving back to its previous consumer
                 self._add_partition_movement_record(
-                    partition, ConsumerPair(src_member_id=existing_pair.src_member_id, dst_member_id=new_consumer)
+                    partition,
+                    ConsumerPair(
+                        src_member_id=existing_pair.src_member_id,
+                        dst_member_id=new_consumer,
+                    ),
                 )
         else:
             self._add_partition_movement_record(partition, pair)
@@ -67,14 +67,18 @@ def get_partition_to_be_moved(self, partition, old_consumer, new_consumer):
             # this partition has previously moved
             assert old_consumer == self.partition_movements[partition].dst_member_id
             old_consumer = self.partition_movements[partition].src_member_id
-        reverse_pair = ConsumerPair(src_member_id=new_consumer, dst_member_id=old_consumer)
+        reverse_pair = ConsumerPair(
+            src_member_id=new_consumer, dst_member_id=old_consumer
+        )
         if reverse_pair not in self.partition_movements_by_topic[partition.topic]:
             return partition
 
-        return next(iter(self.partition_movements_by_topic[partition.topic][reverse_pair]))
+        return next(
+            iter(self.partition_movements_by_topic[partition.topic][reverse_pair])
+        )
 
     def are_sticky(self):
-        for topic, movements in six.iteritems(self.partition_movements_by_topic):
+        for topic, movements in self.partition_movements_by_topic.items():
             movement_pairs = set(movements.keys())
             if self._has_cycles(movement_pairs):
                 log.error(
@@ -107,11 +111,13 @@ def _has_cycles(self, consumer_pairs):
             reduced_pairs = deepcopy(consumer_pairs)
             reduced_pairs.remove(pair)
             path = [pair.src_member_id]
-            if self._is_linked(pair.dst_member_id, pair.src_member_id, reduced_pairs, path) and not self._is_subcycle(
-                path, cycles
-            ):
+            if self._is_linked(
+                pair.dst_member_id, pair.src_member_id, reduced_pairs, path
+            ) and not self._is_subcycle(path, cycles):
                 cycles.add(tuple(path))
-                log.error("A cycle of length {} was found: {}".format(len(path) - 1, path))
+                log.error(
+                    "A cycle of length {} was found: {}".format(len(path) - 1, path)
+                )
 
         # for now we want to make sure there is no partition movements of the same topic between a pair of consumers.
         # the odds of finding a cycle among more than two consumers seem to be very low (according to various randomized
@@ -145,5 +151,7 @@ def _is_linked(self, src, dst, pairs, current_path):
                 reduced_set = deepcopy(pairs)
                 reduced_set.remove(pair)
                 current_path.append(pair.src_member_id)
-                return self._is_linked(pair.dst_member_id, dst, reduced_set, current_path)
+                return self._is_linked(
+                    pair.dst_member_id, dst, reduced_set, current_path
+                )
         return False
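A short hedged sketch of the bookkeeping this class performs for the sticky assignor (the sticky/partition_movements module path is assumed from the package move; the consumer ids are illustrative):

from aiokafka.coordinator.assignors.sticky.partition_movements import (
    PartitionMovements,
)
from aiokafka.structs import TopicPartition

movements = PartitionMovements()
# Two opposing moves within the same topic...
movements.move_partition(TopicPartition("t0", 0), "C0", "C1")
movements.move_partition(TopicPartition("t0", 1), "C1", "C0")
# ...form a C0 -> C1 -> C0 cycle, so the proposed reassignment is flagged
# as non-sticky by the cycle check above.
assert movements.are_sticky() is False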
@@ -35,9 +35,13 @@ def pop_last(self):
         return value
 
     def add(self, value):
-        if self._cached_last is not None and self._key(value) > self._key(self._cached_last):
+        if self._cached_last is not None and self._key(value) > self._key(
+            self._cached_last
+        ):
             self._cached_last = value
-        if self._cached_first is not None and self._key(value) < self._key(self._cached_first):
+        if self._cached_first is not None and self._key(value) < self._key(
+            self._cached_first
+        ):
             self._cached_first = value
 
         return self._set.add(value)
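The add() hunk above is a pure black reflow of a cached-extrema update. As a self-contained illustration of the idea (a simplified stand-in, not the module's actual class, whose constructor is not shown in this diff):

class CachedExtremaSet:
    """Simplified sketch: a set that keeps min/max cached for O(1) reads."""

    def __init__(self, key=lambda x: x):
        self._key = key
        self._set = set()
        self._cached_first = None  # smallest element under key
        self._cached_last = None   # largest element under key

    def add(self, value):
        # Same shape as the diff: refresh whichever cached bound the new
        # value displaces (the None checks here also cover the empty set).
        if self._cached_last is None or self._key(value) > self._key(self._cached_last):
            self._cached_last = value
        if self._cached_first is None or self._key(value) < self._key(self._cached_first):
            self._cached_first = value
        self._set.add(value)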
