From c4b88cfce6407dbb7b9bbf9464c4875dfee74623 Mon Sep 17 00:00:00 2001 From: Denis Otkidach Date: Sun, 22 Oct 2023 16:03:37 +0300 Subject: [PATCH] Untie ConsumerRebalanceListener --- aiokafka/abc.py | 3 +- aiokafka/consumer/subscription_state.py | 2 +- kafka/__init__.py | 1 - kafka/consumer/__init__.py | 0 kafka/consumer/subscription_state.py | 501 ------------------ tests/kafka/conftest.py | 8 - tests/kafka/test_consumer_integration.py | 297 ----------- tests/kafka/test_coordinator.py | 638 ----------------------- tests/kafka/test_subscription_state.py | 25 - 9 files changed, 2 insertions(+), 1473 deletions(-) delete mode 100644 kafka/consumer/__init__.py delete mode 100644 kafka/consumer/subscription_state.py delete mode 100644 tests/kafka/test_consumer_integration.py delete mode 100644 tests/kafka/test_coordinator.py delete mode 100644 tests/kafka/test_subscription_state.py diff --git a/aiokafka/abc.py b/aiokafka/abc.py index 6d1815a6..d51b986e 100644 --- a/aiokafka/abc.py +++ b/aiokafka/abc.py @@ -1,8 +1,7 @@ import abc -from kafka import ConsumerRebalanceListener as BaseConsumerRebalanceListener -class ConsumerRebalanceListener(BaseConsumerRebalanceListener): +class ConsumerRebalanceListener(abc.ABC): """ A callback interface that the user can implement to trigger custom actions when the set of partitions assigned to the consumer changes. diff --git a/aiokafka/consumer/subscription_state.py b/aiokafka/consumer/subscription_state.py index b2dbd388..0f2351eb 100644 --- a/aiokafka/consumer/subscription_state.py +++ b/aiokafka/consumer/subscription_state.py @@ -5,7 +5,7 @@ from asyncio import shield, Event, Future from enum import Enum -from typing import Dict, FrozenSet, Iterable, List, Pattern, Set +from typing import Dict, Iterable, List, Pattern, Set from aiokafka.errors import IllegalStateError from aiokafka.structs import OffsetAndMetadata, TopicPartition diff --git a/kafka/__init__.py b/kafka/__init__.py index c4308c5e..2a335d23 100644 --- a/kafka/__init__.py +++ b/kafka/__init__.py @@ -20,7 +20,6 @@ def emit(self, record): from kafka.admin import KafkaAdminClient from kafka.client_async import KafkaClient -from kafka.consumer.subscription_state import ConsumerRebalanceListener from kafka.conn import BrokerConnection from kafka.serializer import Serializer, Deserializer from kafka.structs import TopicPartition, OffsetAndMetadata diff --git a/kafka/consumer/__init__.py b/kafka/consumer/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/kafka/consumer/subscription_state.py b/kafka/consumer/subscription_state.py deleted file mode 100644 index 08842d13..00000000 --- a/kafka/consumer/subscription_state.py +++ /dev/null @@ -1,501 +0,0 @@ -from __future__ import absolute_import - -import abc -import logging -import re - -from kafka.vendor import six - -from kafka.errors import IllegalStateError -from kafka.protocol.offset import OffsetResetStrategy -from kafka.structs import OffsetAndMetadata - -log = logging.getLogger(__name__) - - -class SubscriptionState(object): - """ - A class for tracking the topics, partitions, and offsets for the consumer. - A partition is "assigned" either directly with assign_from_user() (manual - assignment) or with assign_from_subscribed() (automatic assignment from - subscription). - - Once assigned, the partition is not considered "fetchable" until its initial - position has been set with seek(). 
Fetchable partitions track a fetch - position which is used to set the offset of the next fetch, and a consumed - position which is the last offset that has been returned to the user. You - can suspend fetching from a partition through pause() without affecting the - fetched/consumed offsets. The partition will remain unfetchable until the - resume() is used. You can also query the pause state independently with - is_paused(). - - Note that pause state as well as fetch/consumed positions are not preserved - when partition assignment is changed whether directly by the user or - through a group rebalance. - - This class also maintains a cache of the latest commit position for each of - the assigned partitions. This is updated through committed() and can be used - to set the initial fetch position (e.g. Fetcher._reset_offset() ). - """ - _SUBSCRIPTION_EXCEPTION_MESSAGE = ( - "You must choose only one way to configure your consumer:" - " (1) subscribe to specific topics by name," - " (2) subscribe to topics matching a regex pattern," - " (3) assign itself specific topic-partitions.") - - # Taken from: https://github.com/apache/kafka/blob/39eb31feaeebfb184d98cc5d94da9148c2319d81/clients/src/main/java/org/apache/kafka/common/internals/Topic.java#L29 - _MAX_NAME_LENGTH = 249 - _TOPIC_LEGAL_CHARS = re.compile('^[a-zA-Z0-9._-]+$') - - def __init__(self, offset_reset_strategy='earliest'): - """Initialize a SubscriptionState instance - - Keyword Arguments: - offset_reset_strategy: 'earliest' or 'latest', otherwise - exception will be raised when fetching an offset that is no - longer available. Default: 'earliest' - """ - try: - offset_reset_strategy = getattr(OffsetResetStrategy, - offset_reset_strategy.upper()) - except AttributeError: - log.warning('Unrecognized offset_reset_strategy, using NONE') - offset_reset_strategy = OffsetResetStrategy.NONE - self._default_offset_reset_strategy = offset_reset_strategy - - self.subscription = None # set() or None - self.subscribed_pattern = None # regex str or None - self._group_subscription = set() - self._user_assignment = set() - self.assignment = dict() - self.listener = None - - # initialize to true for the consumers to fetch offset upon starting up - self.needs_fetch_committed_offsets = True - - def subscribe(self, topics=(), pattern=None, listener=None): - """Subscribe to a list of topics, or a topic regex pattern. - - Partitions will be dynamically assigned via a group coordinator. - Topic subscriptions are not incremental: this list will replace the - current assignment (if there is one). - - This method is incompatible with assign_from_user() - - Arguments: - topics (list): List of topics for subscription. - pattern (str): Pattern to match available topics. You must provide - either topics or pattern, but not both. - listener (ConsumerRebalanceListener): Optionally include listener - callback, which will be called before and after each rebalance - operation. 
- - As part of group management, the consumer will keep track of the - list of consumers that belong to a particular group and will - trigger a rebalance operation if one of the following events - trigger: - - * Number of partitions change for any of the subscribed topics - * Topic is created or deleted - * An existing member of the consumer group dies - * A new member is added to the consumer group - - When any of these events are triggered, the provided listener - will be invoked first to indicate that the consumer's assignment - has been revoked, and then again when the new assignment has - been received. Note that this listener will immediately override - any listener set in a previous call to subscribe. It is - guaranteed, however, that the partitions revoked/assigned - through this interface are from topics subscribed in this call. - """ - if self._user_assignment or (topics and pattern): - raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE) - assert topics or pattern, 'Must provide topics or pattern' - - if pattern: - log.info('Subscribing to pattern: /%s/', pattern) - self.subscription = set() - self.subscribed_pattern = re.compile(pattern) - else: - self.change_subscription(topics) - - if listener and not isinstance(listener, ConsumerRebalanceListener): - raise TypeError('listener must be a ConsumerRebalanceListener') - self.listener = listener - - def _ensure_valid_topic_name(self, topic): - """ Ensures that the topic name is valid according to the kafka source. """ - - # See Kafka Source: - # https://github.com/apache/kafka/blob/39eb31feaeebfb184d98cc5d94da9148c2319d81/clients/src/main/java/org/apache/kafka/common/internals/Topic.java - if topic is None: - raise TypeError('All topics must not be None') - if not isinstance(topic, six.string_types): - raise TypeError('All topics must be strings') - if len(topic) == 0: - raise ValueError('All topics must be non-empty strings') - if topic == '.' or topic == '..': - raise ValueError('Topic name cannot be "." or ".."') - if len(topic) > self._MAX_NAME_LENGTH: - raise ValueError('Topic name is illegal, it can\'t be longer than {0} characters, topic: "{1}"'.format(self._MAX_NAME_LENGTH, topic)) - if not self._TOPIC_LEGAL_CHARS.match(topic): - raise ValueError('Topic name "{0}" is illegal, it contains a character other than ASCII alphanumerics, ".", "_" and "-"'.format(topic)) - - def change_subscription(self, topics): - """Change the topic subscription. - - Arguments: - topics (list of str): topics for subscription - - Raises: - IllegalStateError: if assign_from_user has been used already - TypeError: if a topic is None or a non-str - ValueError: if a topic is an empty string or - - a topic name is '.' or '..' or - - a topic name does not consist of ASCII-characters/'-'/'_'/'.' - """ - if self._user_assignment: - raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE) - - if isinstance(topics, six.string_types): - topics = [topics] - - if self.subscription == set(topics): - log.warning("subscription unchanged by change_subscription(%s)", - topics) - return - - for t in topics: - self._ensure_valid_topic_name(t) - - log.info('Updating subscribed topics to: %s', topics) - self.subscription = set(topics) - self._group_subscription.update(topics) - - # Remove any assigned partitions which are no longer subscribed to - for tp in set(self.assignment.keys()): - if tp.topic not in self.subscription: - del self.assignment[tp] - - def group_subscribe(self, topics): - """Add topics to the current group subscription. 
- - This is used by the group leader to ensure that it receives metadata - updates for all topics that any member of the group is subscribed to. - - Arguments: - topics (list of str): topics to add to the group subscription - """ - if self._user_assignment: - raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE) - self._group_subscription.update(topics) - - def reset_group_subscription(self): - """Reset the group's subscription to only contain topics subscribed by this consumer.""" - if self._user_assignment: - raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE) - assert self.subscription is not None, 'Subscription required' - self._group_subscription.intersection_update(self.subscription) - - def assign_from_user(self, partitions): - """Manually assign a list of TopicPartitions to this consumer. - - This interface does not allow for incremental assignment and will - replace the previous assignment (if there was one). - - Manual topic assignment through this method does not use the consumer's - group management functionality. As such, there will be no rebalance - operation triggered when group membership or cluster and topic metadata - change. Note that it is not possible to use both manual partition - assignment with assign() and group assignment with subscribe(). - - Arguments: - partitions (list of TopicPartition): assignment for this instance. - - Raises: - IllegalStateError: if consumer has already called subscribe() - """ - if self.subscription is not None: - raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE) - - if self._user_assignment != set(partitions): - self._user_assignment = set(partitions) - - for partition in partitions: - if partition not in self.assignment: - self._add_assigned_partition(partition) - - for tp in set(self.assignment.keys()) - self._user_assignment: - del self.assignment[tp] - - self.needs_fetch_committed_offsets = True - - def assign_from_subscribed(self, assignments): - """Update the assignment to the specified partitions - - This method is called by the coordinator to dynamically assign - partitions based on the consumer's topic subscription. This is different - from assign_from_user() which directly sets the assignment from a - user-supplied TopicPartition list. - - Arguments: - assignments (list of TopicPartition): partitions to assign to this - consumer instance. - """ - if not self.partitions_auto_assigned(): - raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE) - - for tp in assignments: - if tp.topic not in self.subscription: - raise ValueError("Assigned partition %s for non-subscribed topic." % (tp,)) - - # after rebalancing, we always reinitialize the assignment state - self.assignment.clear() - for tp in assignments: - self._add_assigned_partition(tp) - self.needs_fetch_committed_offsets = True - log.info("Updated partition assignment: %s", assignments) - - def unsubscribe(self): - """Clear all topic subscriptions and partition assignments""" - self.subscription = None - self._user_assignment.clear() - self.assignment.clear() - self.subscribed_pattern = None - - def group_subscription(self): - """Get the topic subscription for the group. - - For the leader, this will include the union of all member subscriptions. - For followers, it is the member's subscription only. - - This is used when querying topic metadata to detect metadata changes - that would require rebalancing (the leader fetches metadata for all - topics in the group so that it can do partition assignment). 
- - Returns: - set: topics - """ - return self._group_subscription - - def seek(self, partition, offset): - """Manually specify the fetch offset for a TopicPartition. - - Overrides the fetch offsets that the consumer will use on the next - poll(). If this API is invoked for the same partition more than once, - the latest offset will be used on the next poll(). Note that you may - lose data if this API is arbitrarily used in the middle of consumption, - to reset the fetch offsets. - - Arguments: - partition (TopicPartition): partition for seek operation - offset (int): message offset in partition - """ - self.assignment[partition].seek(offset) - - def assigned_partitions(self): - """Return set of TopicPartitions in current assignment.""" - return set(self.assignment.keys()) - - def paused_partitions(self): - """Return current set of paused TopicPartitions.""" - return set(partition for partition in self.assignment - if self.is_paused(partition)) - - def fetchable_partitions(self): - """Return set of TopicPartitions that should be Fetched.""" - fetchable = set() - for partition, state in six.iteritems(self.assignment): - if state.is_fetchable(): - fetchable.add(partition) - return fetchable - - def partitions_auto_assigned(self): - """Return True unless user supplied partitions manually.""" - return self.subscription is not None - - def all_consumed_offsets(self): - """Returns consumed offsets as {TopicPartition: OffsetAndMetadata}""" - all_consumed = {} - for partition, state in six.iteritems(self.assignment): - if state.has_valid_position: - all_consumed[partition] = OffsetAndMetadata(state.position, '') - return all_consumed - - def need_offset_reset(self, partition, offset_reset_strategy=None): - """Mark partition for offset reset using specified or default strategy. 
- - Arguments: - partition (TopicPartition): partition to mark - offset_reset_strategy (OffsetResetStrategy, optional) - """ - if offset_reset_strategy is None: - offset_reset_strategy = self._default_offset_reset_strategy - self.assignment[partition].await_reset(offset_reset_strategy) - - def has_default_offset_reset_policy(self): - """Return True if default offset reset policy is Earliest or Latest""" - return self._default_offset_reset_strategy != OffsetResetStrategy.NONE - - def is_offset_reset_needed(self, partition): - return self.assignment[partition].awaiting_reset - - def has_all_fetch_positions(self): - for state in self.assignment.values(): - if not state.has_valid_position: - return False - return True - - def missing_fetch_positions(self): - missing = set() - for partition, state in six.iteritems(self.assignment): - if not state.has_valid_position: - missing.add(partition) - return missing - - def is_assigned(self, partition): - return partition in self.assignment - - def is_paused(self, partition): - return partition in self.assignment and self.assignment[partition].paused - - def is_fetchable(self, partition): - return partition in self.assignment and self.assignment[partition].is_fetchable() - - def pause(self, partition): - self.assignment[partition].pause() - - def resume(self, partition): - self.assignment[partition].resume() - - def _add_assigned_partition(self, partition): - self.assignment[partition] = TopicPartitionState() - - -class TopicPartitionState(object): - def __init__(self): - self.committed = None # last committed OffsetAndMetadata - self.has_valid_position = False # whether we have valid position - self.paused = False # whether this partition has been paused by the user - self.awaiting_reset = False # whether we are awaiting reset - self.reset_strategy = None # the reset strategy if awaitingReset is set - self._position = None # offset exposed to the user - self.highwater = None - self.drop_pending_message_set = False - # The last message offset hint available from a message batch with - # magic=2 which includes deleted compacted messages - self.last_offset_from_message_batch = None - - def _set_position(self, offset): - assert self.has_valid_position, 'Valid position required' - self._position = offset - - def _get_position(self): - return self._position - - position = property(_get_position, _set_position, None, "last position") - - def await_reset(self, strategy): - self.awaiting_reset = True - self.reset_strategy = strategy - self._position = None - self.last_offset_from_message_batch = None - self.has_valid_position = False - - def seek(self, offset): - self._position = offset - self.awaiting_reset = False - self.reset_strategy = None - self.has_valid_position = True - self.drop_pending_message_set = True - self.last_offset_from_message_batch = None - - def pause(self): - self.paused = True - - def resume(self): - self.paused = False - - def is_fetchable(self): - return not self.paused and self.has_valid_position - - -class ConsumerRebalanceListener(object): - """ - A callback interface that the user can implement to trigger custom actions - when the set of partitions assigned to the consumer changes. - - This is applicable when the consumer is having Kafka auto-manage group - membership. If the consumer's directly assign partitions, those - partitions will never be reassigned and this callback is not applicable. 
- - When Kafka is managing the group membership, a partition re-assignment will - be triggered any time the members of the group changes or the subscription - of the members changes. This can occur when processes die, new process - instances are added or old instances come back to life after failure. - Rebalances can also be triggered by changes affecting the subscribed - topics (e.g. when then number of partitions is administratively adjusted). - - There are many uses for this functionality. One common use is saving offsets - in a custom store. By saving offsets in the on_partitions_revoked(), call we - can ensure that any time partition assignment changes the offset gets saved. - - Another use is flushing out any kind of cache of intermediate results the - consumer may be keeping. For example, consider a case where the consumer is - subscribed to a topic containing user page views, and the goal is to count - the number of page views per users for each five minute window. Let's say - the topic is partitioned by the user id so that all events for a particular - user will go to a single consumer instance. The consumer can keep in memory - a running tally of actions per user and only flush these out to a remote - data store when its cache gets too big. However if a partition is reassigned - it may want to automatically trigger a flush of this cache, before the new - owner takes over consumption. - - This callback will execute in the user thread as part of the Consumer.poll() - whenever partition assignment changes. - - It is guaranteed that all consumer processes will invoke - on_partitions_revoked() prior to any process invoking - on_partitions_assigned(). So if offsets or other state is saved in the - on_partitions_revoked() call, it should be saved by the time the process - taking over that partition has their on_partitions_assigned() callback - called to load the state. - """ - __metaclass__ = abc.ABCMeta - - @abc.abstractmethod - def on_partitions_revoked(self, revoked): - """ - A callback method the user can implement to provide handling of offset - commits to a customized store on the start of a rebalance operation. - This method will be called before a rebalance operation starts and - after the consumer stops fetching data. It is recommended that offsets - should be committed in this callback to either Kafka or a custom offset - store to prevent duplicate data. - - NOTE: This method is only called before rebalances. It is not called - prior to KafkaConsumer.close() - - Arguments: - revoked (list of TopicPartition): the partitions that were assigned - to the consumer on the last rebalance - """ - pass - - @abc.abstractmethod - def on_partitions_assigned(self, assigned): - """ - A callback method the user can implement to provide handling of - customized offsets on completion of a successful partition - re-assignment. This method will be called after an offset re-assignment - completes and before the consumer starts fetching data. - - It is guaranteed that all the processes in a consumer group will execute - their on_partitions_revoked() callback before any instance executes its - on_partitions_assigned() callback. 
- - Arguments: - assigned (list of TopicPartition): the partitions assigned to the - consumer (may include partitions that were previously assigned) - """ - pass diff --git a/tests/kafka/conftest.py b/tests/kafka/conftest.py index 0bbd1a2b..2fd11b40 100644 --- a/tests/kafka/conftest.py +++ b/tests/kafka/conftest.py @@ -42,14 +42,6 @@ def factory(**broker_params): broker.close() -@pytest.fixture -def kafka_client(kafka_broker, request): - """Return a KafkaClient fixture""" - (client,) = kafka_broker.get_clients(cnt=1, client_id='%s_client' % (request.node.name,)) - yield client - client.close() - - @pytest.fixture def kafka_consumer(kafka_consumer_factory): """Return a KafkaConsumer fixture""" diff --git a/tests/kafka/test_consumer_integration.py b/tests/kafka/test_consumer_integration.py deleted file mode 100644 index a2644bae..00000000 --- a/tests/kafka/test_consumer_integration.py +++ /dev/null @@ -1,297 +0,0 @@ -import logging -import time - -from unittest.mock import patch -import pytest -from kafka.vendor.six.moves import range - -import kafka.codec -from kafka.errors import UnsupportedCodecError, UnsupportedVersionError -from kafka.structs import TopicPartition, OffsetAndTimestamp - -from tests.kafka.testutil import Timer, assert_message_count, env_kafka_version, random_string - - -@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set") -def test_kafka_version_infer(kafka_consumer_factory): - consumer = kafka_consumer_factory() - actual_ver_major_minor = env_kafka_version()[:2] - client = consumer._client - conn = list(client._conns.values())[0] - inferred_ver_major_minor = conn.check_version()[:2] - assert actual_ver_major_minor == inferred_ver_major_minor, \ - "Was expecting inferred broker version to be %s but was %s" % (actual_ver_major_minor, inferred_ver_major_minor) - - -@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set") -def test_kafka_consumer(kafka_consumer_factory, send_messages): - """Test KafkaConsumer""" - consumer = kafka_consumer_factory(auto_offset_reset='earliest') - send_messages(range(0, 100), partition=0) - send_messages(range(0, 100), partition=1) - cnt = 0 - messages = {0: [], 1: []} - for message in consumer: - logging.debug("Consumed message %s", repr(message)) - cnt += 1 - messages[message.partition].append(message) - if cnt >= 200: - break - - assert_message_count(messages[0], 100) - assert_message_count(messages[1], 100) - - -@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set") -def test_kafka_consumer_unsupported_encoding( - topic, kafka_producer_factory, kafka_consumer_factory): - # Send a compressed message - producer = kafka_producer_factory(compression_type="gzip") - fut = producer.send(topic, b"simple message" * 200) - fut.get(timeout=5) - producer.close() - - # Consume, but with the related compression codec not available - with patch.object(kafka.codec, "has_gzip") as mocked: - mocked.return_value = False - consumer = kafka_consumer_factory(auto_offset_reset='earliest') - error_msg = "Libraries for gzip compression codec not found" - with pytest.raises(UnsupportedCodecError, match=error_msg): - consumer.poll(timeout_ms=2000) - - -@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set") -def test_kafka_consumer__blocking(kafka_consumer_factory, topic, send_messages): - TIMEOUT_MS = 500 - consumer = kafka_consumer_factory(auto_offset_reset='earliest', - enable_auto_commit=False, - consumer_timeout_ms=TIMEOUT_MS) - - # Manual assignment avoids overhead of consumer 
group mgmt - consumer.unsubscribe() - consumer.assign([TopicPartition(topic, 0)]) - - # Ask for 5 messages, nothing in queue, block 500ms - with Timer() as t: - with pytest.raises(StopIteration): - msg = next(consumer) - assert t.interval >= (TIMEOUT_MS / 1000.0) - - send_messages(range(0, 10)) - - # Ask for 5 messages, 10 in queue. Get 5 back, no blocking - messages = [] - with Timer() as t: - for i in range(5): - msg = next(consumer) - messages.append(msg) - assert_message_count(messages, 5) - assert t.interval < (TIMEOUT_MS / 1000.0) - - # Ask for 10 messages, get 5 back, block 500ms - messages = [] - with Timer() as t: - with pytest.raises(StopIteration): - for i in range(10): - msg = next(consumer) - messages.append(msg) - assert_message_count(messages, 5) - assert t.interval >= (TIMEOUT_MS / 1000.0) - - -@pytest.mark.skipif(env_kafka_version() < (0, 8, 1), reason="Requires KAFKA_VERSION >= 0.8.1") -def test_kafka_consumer__offset_commit_resume(kafka_consumer_factory, send_messages): - GROUP_ID = random_string(10) - - send_messages(range(0, 100), partition=0) - send_messages(range(100, 200), partition=1) - - # Start a consumer and grab the first 180 messages - consumer1 = kafka_consumer_factory( - group_id=GROUP_ID, - enable_auto_commit=True, - auto_commit_interval_ms=100, - auto_offset_reset='earliest', - ) - output_msgs1 = [] - for _ in range(180): - m = next(consumer1) - output_msgs1.append(m) - assert_message_count(output_msgs1, 180) - - # Normally we let the pytest fixture `kafka_consumer_factory` handle - # closing as part of its teardown. Here we manually call close() to force - # auto-commit to occur before the second consumer starts. That way the - # second consumer only consumes previously unconsumed messages. - consumer1.close() - - # Start a second consumer to grab 181-200 - consumer2 = kafka_consumer_factory( - group_id=GROUP_ID, - enable_auto_commit=True, - auto_commit_interval_ms=100, - auto_offset_reset='earliest', - ) - output_msgs2 = [] - for _ in range(20): - m = next(consumer2) - output_msgs2.append(m) - assert_message_count(output_msgs2, 20) - - # Verify the second consumer wasn't reconsuming messages that the first - # consumer already saw - assert_message_count(output_msgs1 + output_msgs2, 200) - - -@pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1") -def test_kafka_consumer_max_bytes_simple(kafka_consumer_factory, topic, send_messages): - send_messages(range(100, 200), partition=0) - send_messages(range(200, 300), partition=1) - - # Start a consumer - consumer = kafka_consumer_factory( - auto_offset_reset='earliest', fetch_max_bytes=300) - seen_partitions = set() - for i in range(90): - poll_res = consumer.poll(timeout_ms=100) - for partition, msgs in poll_res.items(): - for msg in msgs: - seen_partitions.add(partition) - - # Check that we fetched at least 1 message from both partitions - assert seen_partitions == {TopicPartition(topic, 0), TopicPartition(topic, 1)} - - -@pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1") -def test_kafka_consumer_max_bytes_one_msg(kafka_consumer_factory, send_messages): - # We send to only 1 partition so we don't have parallel requests to 2 - # nodes for data. - send_messages(range(100, 200)) - - # Start a consumer. 
FetchResponse_v3 should always include at least 1 - # full msg, so by setting fetch_max_bytes=1 we should get 1 msg at a time - # But 0.11.0.0 returns 1 MessageSet at a time when the messages are - # stored in the new v2 format by the broker. - # - # DP Note: This is a strange test. The consumer shouldn't care - # how many messages are included in a FetchResponse, as long as it is - # non-zero. I would not mind if we deleted this test. It caused - # a minor headache when testing 0.11.0.0. - group = 'test-kafka-consumer-max-bytes-one-msg-' + random_string(5) - consumer = kafka_consumer_factory( - group_id=group, - auto_offset_reset='earliest', - consumer_timeout_ms=5000, - fetch_max_bytes=1) - - fetched_msgs = [next(consumer) for i in range(10)] - assert_message_count(fetched_msgs, 10) - - -@pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1") -def test_kafka_consumer_offsets_for_time(topic, kafka_consumer, kafka_producer): - late_time = int(time.time()) * 1000 - middle_time = late_time - 1000 - early_time = late_time - 2000 - tp = TopicPartition(topic, 0) - - timeout = 10 - early_msg = kafka_producer.send( - topic, partition=0, value=b"first", - timestamp_ms=early_time).get(timeout) - late_msg = kafka_producer.send( - topic, partition=0, value=b"last", - timestamp_ms=late_time).get(timeout) - - consumer = kafka_consumer - offsets = consumer.offsets_for_times({tp: early_time}) - assert len(offsets) == 1 - assert offsets[tp].offset == early_msg.offset - assert offsets[tp].timestamp == early_time - - offsets = consumer.offsets_for_times({tp: middle_time}) - assert offsets[tp].offset == late_msg.offset - assert offsets[tp].timestamp == late_time - - offsets = consumer.offsets_for_times({tp: late_time}) - assert offsets[tp].offset == late_msg.offset - assert offsets[tp].timestamp == late_time - - offsets = consumer.offsets_for_times({}) - assert offsets == {} - - # Out of bound timestamps check - - offsets = consumer.offsets_for_times({tp: 0}) - assert offsets[tp].offset == early_msg.offset - assert offsets[tp].timestamp == early_time - - offsets = consumer.offsets_for_times({tp: 9999999999999}) - assert offsets[tp] is None - - # Beginning/End offsets - - offsets = consumer.beginning_offsets([tp]) - assert offsets == {tp: early_msg.offset} - offsets = consumer.end_offsets([tp]) - assert offsets == {tp: late_msg.offset + 1} - - -@pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1") -def test_kafka_consumer_offsets_search_many_partitions(kafka_consumer, kafka_producer, topic): - tp0 = TopicPartition(topic, 0) - tp1 = TopicPartition(topic, 1) - - send_time = int(time.time() * 1000) - timeout = 10 - p0msg = kafka_producer.send( - topic, partition=0, value=b"XXX", - timestamp_ms=send_time).get(timeout) - p1msg = kafka_producer.send( - topic, partition=1, value=b"XXX", - timestamp_ms=send_time).get(timeout) - - consumer = kafka_consumer - offsets = consumer.offsets_for_times({ - tp0: send_time, - tp1: send_time - }) - - assert offsets == { - tp0: OffsetAndTimestamp(p0msg.offset, send_time), - tp1: OffsetAndTimestamp(p1msg.offset, send_time) - } - - offsets = consumer.beginning_offsets([tp0, tp1]) - assert offsets == { - tp0: p0msg.offset, - tp1: p1msg.offset - } - - offsets = consumer.end_offsets([tp0, tp1]) - assert offsets == { - tp0: p0msg.offset + 1, - tp1: p1msg.offset + 1 - } - - -@pytest.mark.skipif(env_kafka_version() >= (0, 10, 1), reason="Requires KAFKA_VERSION < 0.10.1") -def 
test_kafka_consumer_offsets_for_time_old(kafka_consumer, topic): - consumer = kafka_consumer - tp = TopicPartition(topic, 0) - - with pytest.raises(UnsupportedVersionError): - consumer.offsets_for_times({tp: int(time.time())}) - - -@pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1") -def test_kafka_consumer_offsets_for_times_errors(kafka_consumer_factory, topic): - consumer = kafka_consumer_factory(fetch_max_wait_ms=200, - request_timeout_ms=500) - tp = TopicPartition(topic, 0) - bad_tp = TopicPartition(topic, 100) - - with pytest.raises(ValueError): - consumer.offsets_for_times({tp: -1}) - - assert consumer.offsets_for_times({bad_tp: 0}) == {bad_tp: None} diff --git a/tests/kafka/test_coordinator.py b/tests/kafka/test_coordinator.py deleted file mode 100644 index a35cdd1a..00000000 --- a/tests/kafka/test_coordinator.py +++ /dev/null @@ -1,638 +0,0 @@ -# pylint: skip-file -from __future__ import absolute_import -import time - -import pytest - -from kafka.client_async import KafkaClient -from kafka.consumer.subscription_state import ( - SubscriptionState, ConsumerRebalanceListener) -from kafka.coordinator.assignors.range import RangePartitionAssignor -from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor -from kafka.coordinator.assignors.sticky.sticky_assignor import StickyPartitionAssignor -from kafka.coordinator.base import Generation, MemberState, HeartbeatThread -from kafka.coordinator.consumer import ConsumerCoordinator -from kafka.coordinator.protocol import ( - ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment) -import kafka.errors as Errors -from kafka.future import Future -from kafka.metrics import Metrics -from kafka.protocol.commit import ( - OffsetCommitRequest, OffsetCommitResponse, - OffsetFetchRequest, OffsetFetchResponse) -from kafka.protocol.metadata import MetadataResponse -from kafka.structs import OffsetAndMetadata, TopicPartition -from kafka.util import WeakMethod - - -@pytest.fixture -def client(conn): - return KafkaClient(api_version=(0, 9)) - -@pytest.fixture -def coordinator(client): - return ConsumerCoordinator(client, SubscriptionState(), Metrics()) - - -def test_init(client, coordinator): - # metadata update on init - assert client.cluster._need_update is True - assert WeakMethod(coordinator._handle_metadata_update) in client.cluster._listeners - - -@pytest.mark.parametrize("api_version", [(0, 8, 0), (0, 8, 1), (0, 8, 2), (0, 9)]) -def test_autocommit_enable_api_version(client, api_version): - coordinator = ConsumerCoordinator(client, SubscriptionState(), - Metrics(), - enable_auto_commit=True, - session_timeout_ms=30000, # session_timeout_ms and max_poll_interval_ms - max_poll_interval_ms=30000, # should be the same to avoid KafkaConfigurationError - group_id='foobar', - api_version=api_version) - if api_version < (0, 8, 1): - assert coordinator.config['enable_auto_commit'] is False - else: - assert coordinator.config['enable_auto_commit'] is True - - -def test_protocol_type(coordinator): - assert coordinator.protocol_type() == 'consumer' - - -def test_group_protocols(coordinator): - # Requires a subscription - try: - coordinator.group_protocols() - except Errors.IllegalStateError: - pass - else: - assert False, 'Exception not raised when expected' - - coordinator._subscription.subscribe(topics=['foobar']) - assert coordinator.group_protocols() == [ - ('range', ConsumerProtocolMemberMetadata( - RangePartitionAssignor.version, - ['foobar'], - b'')), - ('roundrobin', 
ConsumerProtocolMemberMetadata( - RoundRobinPartitionAssignor.version, - ['foobar'], - b'')), - ('sticky', ConsumerProtocolMemberMetadata( - StickyPartitionAssignor.version, - ['foobar'], - b'')), - ] - - -@pytest.mark.parametrize('api_version', [(0, 8, 0), (0, 8, 1), (0, 8, 2), (0, 9)]) -def test_pattern_subscription(coordinator, api_version): - coordinator.config['api_version'] = api_version - coordinator._subscription.subscribe(pattern='foo') - assert coordinator._subscription.subscription == set([]) - assert coordinator._metadata_snapshot == coordinator._build_metadata_snapshot(coordinator._subscription, {}) - - cluster = coordinator._client.cluster - cluster.update_metadata(MetadataResponse[0]( - # brokers - [(0, 'foo', 12), (1, 'bar', 34)], - # topics - [(0, 'fizz', []), - (0, 'foo1', [(0, 0, 0, [], [])]), - (0, 'foo2', [(0, 0, 1, [], [])])])) - assert coordinator._subscription.subscription == {'foo1', 'foo2'} - - # 0.9 consumers should trigger dynamic partition assignment - if api_version >= (0, 9): - assert coordinator._subscription.assignment == {} - - # earlier consumers get all partitions assigned locally - else: - assert set(coordinator._subscription.assignment.keys()) == {TopicPartition('foo1', 0), - TopicPartition('foo2', 0)} - - -def test_lookup_assignor(coordinator): - assert coordinator._lookup_assignor('roundrobin') is RoundRobinPartitionAssignor - assert coordinator._lookup_assignor('range') is RangePartitionAssignor - assert coordinator._lookup_assignor('sticky') is StickyPartitionAssignor - assert coordinator._lookup_assignor('foobar') is None - - -def test_join_complete(mocker, coordinator): - coordinator._subscription.subscribe(topics=['foobar']) - assignor = RoundRobinPartitionAssignor() - coordinator.config['assignors'] = (assignor,) - mocker.spy(assignor, 'on_assignment') - assert assignor.on_assignment.call_count == 0 - assignment = ConsumerProtocolMemberAssignment(0, [('foobar', [0, 1])], b'') - coordinator._on_join_complete(0, 'member-foo', 'roundrobin', assignment.encode()) - assert assignor.on_assignment.call_count == 1 - assignor.on_assignment.assert_called_with(assignment) - - -def test_join_complete_with_sticky_assignor(mocker, coordinator): - coordinator._subscription.subscribe(topics=['foobar']) - assignor = StickyPartitionAssignor() - coordinator.config['assignors'] = (assignor,) - mocker.spy(assignor, 'on_assignment') - mocker.spy(assignor, 'on_generation_assignment') - assert assignor.on_assignment.call_count == 0 - assert assignor.on_generation_assignment.call_count == 0 - assignment = ConsumerProtocolMemberAssignment(0, [('foobar', [0, 1])], b'') - coordinator._on_join_complete(0, 'member-foo', 'sticky', assignment.encode()) - assert assignor.on_assignment.call_count == 1 - assert assignor.on_generation_assignment.call_count == 1 - assignor.on_assignment.assert_called_with(assignment) - assignor.on_generation_assignment.assert_called_with(0) - - -def test_subscription_listener(mocker, coordinator): - listener = mocker.MagicMock(spec=ConsumerRebalanceListener) - coordinator._subscription.subscribe( - topics=['foobar'], - listener=listener) - - coordinator._on_join_prepare(0, 'member-foo') - assert listener.on_partitions_revoked.call_count == 1 - listener.on_partitions_revoked.assert_called_with(set([])) - - assignment = ConsumerProtocolMemberAssignment(0, [('foobar', [0, 1])], b'') - coordinator._on_join_complete( - 0, 'member-foo', 'roundrobin', assignment.encode()) - assert listener.on_partitions_assigned.call_count == 1 - 
listener.on_partitions_assigned.assert_called_with({TopicPartition('foobar', 0), TopicPartition('foobar', 1)}) - - -def test_subscription_listener_failure(mocker, coordinator): - listener = mocker.MagicMock(spec=ConsumerRebalanceListener) - coordinator._subscription.subscribe( - topics=['foobar'], - listener=listener) - - # exception raised in listener should not be re-raised by coordinator - listener.on_partitions_revoked.side_effect = Exception('crash') - coordinator._on_join_prepare(0, 'member-foo') - assert listener.on_partitions_revoked.call_count == 1 - - assignment = ConsumerProtocolMemberAssignment(0, [('foobar', [0, 1])], b'') - coordinator._on_join_complete( - 0, 'member-foo', 'roundrobin', assignment.encode()) - assert listener.on_partitions_assigned.call_count == 1 - - -def test_perform_assignment(mocker, coordinator): - member_metadata = { - 'member-foo': ConsumerProtocolMemberMetadata(0, ['foo1'], b''), - 'member-bar': ConsumerProtocolMemberMetadata(0, ['foo1'], b'') - } - assignments = { - 'member-foo': ConsumerProtocolMemberAssignment( - 0, [('foo1', [0])], b''), - 'member-bar': ConsumerProtocolMemberAssignment( - 0, [('foo1', [1])], b'') - } - - mocker.patch.object(RoundRobinPartitionAssignor, 'assign') - RoundRobinPartitionAssignor.assign.return_value = assignments - - ret = coordinator._perform_assignment( - 'member-foo', 'roundrobin', - [(member, metadata.encode()) - for member, metadata in member_metadata.items()]) - - assert RoundRobinPartitionAssignor.assign.call_count == 1 - RoundRobinPartitionAssignor.assign.assert_called_with( - coordinator._client.cluster, member_metadata) - assert ret == assignments - - -def test_on_join_prepare(coordinator): - coordinator._subscription.subscribe(topics=['foobar']) - coordinator._on_join_prepare(0, 'member-foo') - - -def test_need_rejoin(coordinator): - # No subscription - no rejoin - assert coordinator.need_rejoin() is False - - coordinator._subscription.subscribe(topics=['foobar']) - assert coordinator.need_rejoin() is True - - -def test_refresh_committed_offsets_if_needed(mocker, coordinator): - mocker.patch.object(ConsumerCoordinator, 'fetch_committed_offsets', - return_value = { - TopicPartition('foobar', 0): OffsetAndMetadata(123, b''), - TopicPartition('foobar', 1): OffsetAndMetadata(234, b'')}) - coordinator._subscription.assign_from_user([TopicPartition('foobar', 0)]) - assert coordinator._subscription.needs_fetch_committed_offsets is True - coordinator.refresh_committed_offsets_if_needed() - assignment = coordinator._subscription.assignment - assert assignment[TopicPartition('foobar', 0)].committed == OffsetAndMetadata(123, b'') - assert TopicPartition('foobar', 1) not in assignment - assert coordinator._subscription.needs_fetch_committed_offsets is False - - -def test_fetch_committed_offsets(mocker, coordinator): - - # No partitions, no IO polling - mocker.patch.object(coordinator._client, 'poll') - assert coordinator.fetch_committed_offsets([]) == {} - assert coordinator._client.poll.call_count == 0 - - # general case -- send offset fetch request, get successful future - mocker.patch.object(coordinator, 'ensure_coordinator_ready') - mocker.patch.object(coordinator, '_send_offset_fetch_request', - return_value=Future().success('foobar')) - partitions = [TopicPartition('foobar', 0)] - ret = coordinator.fetch_committed_offsets(partitions) - assert ret == 'foobar' - coordinator._send_offset_fetch_request.assert_called_with(partitions) - assert coordinator._client.poll.call_count == 1 - - # Failed future is raised if not 
retriable - coordinator._send_offset_fetch_request.return_value = Future().failure(AssertionError) - coordinator._client.poll.reset_mock() - try: - coordinator.fetch_committed_offsets(partitions) - except AssertionError: - pass - else: - assert False, 'Exception not raised when expected' - assert coordinator._client.poll.call_count == 1 - - coordinator._client.poll.reset_mock() - coordinator._send_offset_fetch_request.side_effect = [ - Future().failure(Errors.RequestTimedOutError), - Future().success('fizzbuzz')] - - ret = coordinator.fetch_committed_offsets(partitions) - assert ret == 'fizzbuzz' - assert coordinator._client.poll.call_count == 2 # call + retry - - -def test_close(mocker, coordinator): - mocker.patch.object(coordinator, '_maybe_auto_commit_offsets_sync') - mocker.patch.object(coordinator, '_handle_leave_group_response') - mocker.patch.object(coordinator, 'coordinator_unknown', return_value=False) - coordinator.coordinator_id = 0 - coordinator._generation = Generation(1, 'foobar', b'') - coordinator.state = MemberState.STABLE - cli = coordinator._client - mocker.patch.object(cli, 'send', return_value=Future().success('foobar')) - mocker.patch.object(cli, 'poll') - - coordinator.close() - assert coordinator._maybe_auto_commit_offsets_sync.call_count == 1 - coordinator._handle_leave_group_response.assert_called_with('foobar') - - assert coordinator.generation() is None - assert coordinator._generation is Generation.NO_GENERATION - assert coordinator.state is MemberState.UNJOINED - assert coordinator.rejoin_needed is True - - -@pytest.fixture -def offsets(): - return { - TopicPartition('foobar', 0): OffsetAndMetadata(123, b''), - TopicPartition('foobar', 1): OffsetAndMetadata(234, b''), - } - - -def test_commit_offsets_async(mocker, coordinator, offsets): - mocker.patch.object(coordinator._client, 'poll') - mocker.patch.object(coordinator, 'coordinator_unknown', return_value=False) - mocker.patch.object(coordinator, 'ensure_coordinator_ready') - mocker.patch.object(coordinator, '_send_offset_commit_request', - return_value=Future().success('fizzbuzz')) - coordinator.commit_offsets_async(offsets) - assert coordinator._send_offset_commit_request.call_count == 1 - - -def test_commit_offsets_sync(mocker, coordinator, offsets): - mocker.patch.object(coordinator, 'ensure_coordinator_ready') - mocker.patch.object(coordinator, '_send_offset_commit_request', - return_value=Future().success('fizzbuzz')) - cli = coordinator._client - mocker.patch.object(cli, 'poll') - - # No offsets, no calls - assert coordinator.commit_offsets_sync({}) is None - assert coordinator._send_offset_commit_request.call_count == 0 - assert cli.poll.call_count == 0 - - ret = coordinator.commit_offsets_sync(offsets) - assert coordinator._send_offset_commit_request.call_count == 1 - assert cli.poll.call_count == 1 - assert ret == 'fizzbuzz' - - # Failed future is raised if not retriable - coordinator._send_offset_commit_request.return_value = Future().failure(AssertionError) - coordinator._client.poll.reset_mock() - try: - coordinator.commit_offsets_sync(offsets) - except AssertionError: - pass - else: - assert False, 'Exception not raised when expected' - assert coordinator._client.poll.call_count == 1 - - coordinator._client.poll.reset_mock() - coordinator._send_offset_commit_request.side_effect = [ - Future().failure(Errors.RequestTimedOutError), - Future().success('fizzbuzz')] - - ret = coordinator.commit_offsets_sync(offsets) - assert ret == 'fizzbuzz' - assert coordinator._client.poll.call_count == 2 # call 
+ retry - - -@pytest.mark.parametrize( - 'api_version,group_id,enable,error,has_auto_commit,commit_offsets,warn,exc', [ - ((0, 8, 0), 'foobar', True, None, False, False, True, False), - ((0, 8, 1), 'foobar', True, None, True, True, False, False), - ((0, 8, 2), 'foobar', True, None, True, True, False, False), - ((0, 9), 'foobar', False, None, False, False, False, False), - ((0, 9), 'foobar', True, Errors.UnknownMemberIdError(), True, True, True, False), - ((0, 9), 'foobar', True, Errors.IllegalGenerationError(), True, True, True, False), - ((0, 9), 'foobar', True, Errors.RebalanceInProgressError(), True, True, True, False), - ((0, 9), 'foobar', True, Exception(), True, True, False, True), - ((0, 9), 'foobar', True, None, True, True, False, False), - ((0, 9), None, True, None, False, False, True, False), - ]) -def test_maybe_auto_commit_offsets_sync(mocker, api_version, group_id, enable, - error, has_auto_commit, commit_offsets, - warn, exc): - mock_warn = mocker.patch('kafka.coordinator.consumer.log.warning') - mock_exc = mocker.patch('kafka.coordinator.consumer.log.exception') - client = KafkaClient(api_version=api_version) - coordinator = ConsumerCoordinator(client, SubscriptionState(), - Metrics(), - api_version=api_version, - session_timeout_ms=30000, - max_poll_interval_ms=30000, - enable_auto_commit=enable, - group_id=group_id) - commit_sync = mocker.patch.object(coordinator, 'commit_offsets_sync', - side_effect=error) - if has_auto_commit: - assert coordinator.next_auto_commit_deadline is not None - else: - assert coordinator.next_auto_commit_deadline is None - - assert coordinator._maybe_auto_commit_offsets_sync() is None - - if has_auto_commit: - assert coordinator.next_auto_commit_deadline is not None - - assert commit_sync.call_count == (1 if commit_offsets else 0) - assert mock_warn.call_count == (1 if warn else 0) - assert mock_exc.call_count == (1 if exc else 0) - - -@pytest.fixture -def patched_coord(mocker, coordinator): - coordinator._subscription.subscribe(topics=['foobar']) - mocker.patch.object(coordinator, 'coordinator_unknown', return_value=False) - coordinator.coordinator_id = 0 - mocker.patch.object(coordinator, 'coordinator', return_value=0) - coordinator._generation = Generation(0, 'foobar', b'') - coordinator.state = MemberState.STABLE - coordinator.rejoin_needed = False - mocker.patch.object(coordinator, 'need_rejoin', return_value=False) - mocker.patch.object(coordinator._client, 'least_loaded_node', - return_value=1) - mocker.patch.object(coordinator._client, 'ready', return_value=True) - mocker.patch.object(coordinator._client, 'send') - mocker.patch.object(coordinator, '_heartbeat_thread') - mocker.spy(coordinator, '_failed_request') - mocker.spy(coordinator, '_handle_offset_commit_response') - mocker.spy(coordinator, '_handle_offset_fetch_response') - return coordinator - - -def test_send_offset_commit_request_fail(mocker, patched_coord, offsets): - patched_coord.coordinator_unknown.return_value = True - patched_coord.coordinator_id = None - patched_coord.coordinator.return_value = None - - # No offsets - ret = patched_coord._send_offset_commit_request({}) - assert isinstance(ret, Future) - assert ret.succeeded() - - # No coordinator - ret = patched_coord._send_offset_commit_request(offsets) - assert ret.failed() - assert isinstance(ret.exception, Errors.GroupCoordinatorNotAvailableError) - - -@pytest.mark.parametrize('api_version,req_type', [ - ((0, 8, 1), OffsetCommitRequest[0]), - ((0, 8, 2), OffsetCommitRequest[1]), - ((0, 9), OffsetCommitRequest[2])]) 
-def test_send_offset_commit_request_versions(patched_coord, offsets, - api_version, req_type): - expect_node = 0 - patched_coord.config['api_version'] = api_version - - patched_coord._send_offset_commit_request(offsets) - (node, request), _ = patched_coord._client.send.call_args - assert node == expect_node, 'Unexpected coordinator node' - assert isinstance(request, req_type) - - -def test_send_offset_commit_request_failure(patched_coord, offsets): - _f = Future() - patched_coord._client.send.return_value = _f - future = patched_coord._send_offset_commit_request(offsets) - (node, request), _ = patched_coord._client.send.call_args - error = Exception() - _f.failure(error) - patched_coord._failed_request.assert_called_with(0, request, future, error) - assert future.failed() - assert future.exception is error - - -def test_send_offset_commit_request_success(mocker, patched_coord, offsets): - _f = Future() - patched_coord._client.send.return_value = _f - future = patched_coord._send_offset_commit_request(offsets) - (node, request), _ = patched_coord._client.send.call_args - response = OffsetCommitResponse[0]([('foobar', [(0, 0), (1, 0)])]) - _f.success(response) - patched_coord._handle_offset_commit_response.assert_called_with( - offsets, future, mocker.ANY, response) - - -@pytest.mark.parametrize('response,error,dead', [ - (OffsetCommitResponse[0]([('foobar', [(0, 30), (1, 30)])]), - Errors.GroupAuthorizationFailedError, False), - (OffsetCommitResponse[0]([('foobar', [(0, 12), (1, 12)])]), - Errors.OffsetMetadataTooLargeError, False), - (OffsetCommitResponse[0]([('foobar', [(0, 28), (1, 28)])]), - Errors.InvalidCommitOffsetSizeError, False), - (OffsetCommitResponse[0]([('foobar', [(0, 14), (1, 14)])]), - Errors.GroupLoadInProgressError, False), - (OffsetCommitResponse[0]([('foobar', [(0, 15), (1, 15)])]), - Errors.GroupCoordinatorNotAvailableError, True), - (OffsetCommitResponse[0]([('foobar', [(0, 16), (1, 16)])]), - Errors.NotCoordinatorForGroupError, True), - (OffsetCommitResponse[0]([('foobar', [(0, 7), (1, 7)])]), - Errors.RequestTimedOutError, True), - (OffsetCommitResponse[0]([('foobar', [(0, 25), (1, 25)])]), - Errors.CommitFailedError, False), - (OffsetCommitResponse[0]([('foobar', [(0, 22), (1, 22)])]), - Errors.CommitFailedError, False), - (OffsetCommitResponse[0]([('foobar', [(0, 27), (1, 27)])]), - Errors.CommitFailedError, False), - (OffsetCommitResponse[0]([('foobar', [(0, 17), (1, 17)])]), - Errors.InvalidTopicError, False), - (OffsetCommitResponse[0]([('foobar', [(0, 29), (1, 29)])]), - Errors.TopicAuthorizationFailedError, False), -]) -def test_handle_offset_commit_response(mocker, patched_coord, offsets, - response, error, dead): - future = Future() - patched_coord._handle_offset_commit_response(offsets, future, time.time(), - response) - assert isinstance(future.exception, error) - assert patched_coord.coordinator_id is (None if dead else 0) - - -@pytest.fixture -def partitions(): - return [TopicPartition('foobar', 0), TopicPartition('foobar', 1)] - - -def test_send_offset_fetch_request_fail(mocker, patched_coord, partitions): - patched_coord.coordinator_unknown.return_value = True - patched_coord.coordinator_id = None - patched_coord.coordinator.return_value = None - - # No partitions - ret = patched_coord._send_offset_fetch_request([]) - assert isinstance(ret, Future) - assert ret.succeeded() - assert ret.value == {} - - # No coordinator - ret = patched_coord._send_offset_fetch_request(partitions) - assert ret.failed() - assert isinstance(ret.exception, 
Errors.GroupCoordinatorNotAvailableError) - - -@pytest.mark.parametrize('api_version,req_type', [ - ((0, 8, 1), OffsetFetchRequest[0]), - ((0, 8, 2), OffsetFetchRequest[1]), - ((0, 9), OffsetFetchRequest[1])]) -def test_send_offset_fetch_request_versions(patched_coord, partitions, - api_version, req_type): - # assuming fixture sets coordinator=0, least_loaded_node=1 - expect_node = 0 - patched_coord.config['api_version'] = api_version - - patched_coord._send_offset_fetch_request(partitions) - (node, request), _ = patched_coord._client.send.call_args - assert node == expect_node, 'Unexpected coordinator node' - assert isinstance(request, req_type) - - -def test_send_offset_fetch_request_failure(patched_coord, partitions): - _f = Future() - patched_coord._client.send.return_value = _f - future = patched_coord._send_offset_fetch_request(partitions) - (node, request), _ = patched_coord._client.send.call_args - error = Exception() - _f.failure(error) - patched_coord._failed_request.assert_called_with(0, request, future, error) - assert future.failed() - assert future.exception is error - - -def test_send_offset_fetch_request_success(patched_coord, partitions): - _f = Future() - patched_coord._client.send.return_value = _f - future = patched_coord._send_offset_fetch_request(partitions) - (node, request), _ = patched_coord._client.send.call_args - response = OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 0), (1, 234, b'', 0)])]) - _f.success(response) - patched_coord._handle_offset_fetch_response.assert_called_with( - future, response) - - -@pytest.mark.parametrize('response,error,dead', [ - (OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 14), (1, 234, b'', 14)])]), - Errors.GroupLoadInProgressError, False), - (OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 16), (1, 234, b'', 16)])]), - Errors.NotCoordinatorForGroupError, True), - (OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 25), (1, 234, b'', 25)])]), - Errors.UnknownMemberIdError, False), - (OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 22), (1, 234, b'', 22)])]), - Errors.IllegalGenerationError, False), - (OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 29), (1, 234, b'', 29)])]), - Errors.TopicAuthorizationFailedError, False), - (OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 0), (1, 234, b'', 0)])]), - None, False), -]) -def test_handle_offset_fetch_response(patched_coord, offsets, - response, error, dead): - future = Future() - patched_coord._handle_offset_fetch_response(future, response) - if error is not None: - assert isinstance(future.exception, error) - else: - assert future.succeeded() - assert future.value == offsets - assert patched_coord.coordinator_id is (None if dead else 0) - - -def test_heartbeat(mocker, patched_coord): - heartbeat = HeartbeatThread(patched_coord) - - assert not heartbeat.enabled and not heartbeat.closed - - heartbeat.enable() - assert heartbeat.enabled - - heartbeat.disable() - assert not heartbeat.enabled - - # heartbeat disables when un-joined - heartbeat.enable() - patched_coord.state = MemberState.UNJOINED - heartbeat._run_once() - assert not heartbeat.enabled - - heartbeat.enable() - patched_coord.state = MemberState.STABLE - mocker.spy(patched_coord, '_send_heartbeat_request') - mocker.patch.object(patched_coord.heartbeat, 'should_heartbeat', return_value=True) - heartbeat._run_once() - assert patched_coord._send_heartbeat_request.call_count == 1 - - heartbeat.close() - assert heartbeat.closed - - -def test_lookup_coordinator_failure(mocker, coordinator): - - 
mocker.patch.object(coordinator, '_send_group_coordinator_request', - return_value=Future().failure(Exception('foobar'))) - future = coordinator.lookup_coordinator() - assert future.failed() - - -def test_ensure_active_group(mocker, coordinator): - coordinator._subscription.subscribe(topics=['foobar']) - mocker.patch.object(coordinator, 'coordinator_unknown', return_value=False) - mocker.patch.object(coordinator, '_send_join_group_request', return_value=Future().success(True)) - mocker.patch.object(coordinator, 'need_rejoin', side_effect=[True, False]) - mocker.patch.object(coordinator, '_on_join_complete') - mocker.patch.object(coordinator, '_heartbeat_thread') - - coordinator.ensure_active_group() - - coordinator._send_join_group_request.assert_called_once_with() diff --git a/tests/kafka/test_subscription_state.py b/tests/kafka/test_subscription_state.py deleted file mode 100644 index 9718f6af..00000000 --- a/tests/kafka/test_subscription_state.py +++ /dev/null @@ -1,25 +0,0 @@ -# pylint: skip-file -from __future__ import absolute_import - -import pytest - -from kafka.consumer.subscription_state import SubscriptionState - -@pytest.mark.parametrize(('topic_name', 'expectation'), [ - (0, pytest.raises(TypeError)), - (None, pytest.raises(TypeError)), - ('', pytest.raises(ValueError)), - ('.', pytest.raises(ValueError)), - ('..', pytest.raises(ValueError)), - ('a' * 250, pytest.raises(ValueError)), - ('abc/123', pytest.raises(ValueError)), - ('/abc/123', pytest.raises(ValueError)), - ('/abc123', pytest.raises(ValueError)), - ('name with space', pytest.raises(ValueError)), - ('name*with*stars', pytest.raises(ValueError)), - ('name+with+plus', pytest.raises(ValueError)), -]) -def test_topic_name_validation(topic_name, expectation): - state = SubscriptionState() - with expectation: - state._ensure_valid_topic_name(topic_name)
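
After this change, user code subclasses aiokafka.abc.ConsumerRebalanceListener directly; the kafka-python base class it previously inherited from is removed along with kafka.consumer.subscription_state. Below is a minimal sketch of the resulting usage, assuming a broker on localhost:9092; the topic name, group id, and the commit-on-revoke behaviour are illustrative assumptions, not part of this patch.

import asyncio

from aiokafka import AIOKafkaConsumer
from aiokafka.abc import ConsumerRebalanceListener


class CommitOnRevoke(ConsumerRebalanceListener):
    """Illustrative listener: commit consumed offsets before partitions are revoked."""

    def __init__(self, consumer):
        self._consumer = consumer

    async def on_partitions_revoked(self, revoked):
        # Commit whatever has been consumed so far, so the next owner of
        # these partitions resumes without reprocessing (sketch only).
        if revoked:
            await self._consumer.commit()

    async def on_partitions_assigned(self, assigned):
        # Nothing to restore in this sketch.
        pass


async def consume():
    consumer = AIOKafkaConsumer(
        bootstrap_servers="localhost:9092",  # assumed broker address
        group_id="demo-group",               # assumed group id
        enable_auto_commit=False,
    )
    await consumer.start()
    try:
        consumer.subscribe(["demo-topic"], listener=CommitOnRevoke(consumer))
        async for msg in consumer:
            print(msg.topic, msg.partition, msg.offset)
    finally:
        await consumer.stop()


if __name__ == "__main__":
    asyncio.run(consume())

aiokafka awaits listener callbacks that are coroutines, so the handlers above can be declared async; plain synchronous methods also satisfy the ABC.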