diff --git a/aiokafka/consumer/consumer.py b/aiokafka/consumer/consumer.py
index aba21b9b..bbf1f75e 100644
--- a/aiokafka/consumer/consumer.py
+++ b/aiokafka/consumer/consumer.py
@@ -65,6 +65,8 @@ class AIOKafkaConsumer:
             committing offsets. If None, auto-partition assignment (via
             group coordinator) and offset commits are disabled.
             Default: None
+        group_instance_id (str or None): name of the group instance ID used for
+            static membership (KIP-345)
         key_deserializer (Callable): Any callable that takes a
             raw message key and returns a deserialized key.
         value_deserializer (Callable, Optional): Any callable that takes a
@@ -229,6 +231,7 @@ def __init__(self, *topics, loop=None,
                  bootstrap_servers='localhost',
                  client_id='aiokafka-' + __version__,
                  group_id=None,
+                 group_instance_id=None,
                  key_deserializer=None, value_deserializer=None,
                  fetch_max_wait_ms=500, fetch_max_bytes=52428800,
@@ -291,6 +294,7 @@ def __init__(self, *topics, loop=None,
             sasl_oauth_token_provider=sasl_oauth_token_provider)
 
         self._group_id = group_id
+        self._group_instance_id = group_instance_id
         self._heartbeat_interval_ms = heartbeat_interval_ms
         self._session_timeout_ms = session_timeout_ms
         self._retry_backoff_ms = retry_backoff_ms
@@ -382,6 +386,7 @@ async def start(self):
             self._coordinator = GroupCoordinator(
                 self._client, self._subscription,
                 group_id=self._group_id,
+                group_instance_id=self._group_instance_id,
                 heartbeat_interval_ms=self._heartbeat_interval_ms,
                 session_timeout_ms=self._session_timeout_ms,
                 retry_backoff_ms=self._retry_backoff_ms,
diff --git a/aiokafka/consumer/group_coordinator.py b/aiokafka/consumer/group_coordinator.py
index 963d558c..9fdbd26b 100644
--- a/aiokafka/consumer/group_coordinator.py
+++ b/aiokafka/consumer/group_coordinator.py
@@ -8,14 +8,17 @@
 from aiokafka.client import ConnectionGroup, CoordinationType
 from aiokafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
 from aiokafka.coordinator.protocol import ConsumerProtocol
+from aiokafka.protocol.api import Response
 from aiokafka.protocol.commit import (
     OffsetCommitRequest_v2 as OffsetCommitRequest,
     OffsetFetchRequest_v1 as OffsetFetchRequest)
 from aiokafka.protocol.group import (
-    HeartbeatRequest, JoinGroupRequest, LeaveGroupRequest, SyncGroupRequest)
+    HeartbeatRequest, JoinGroupRequest, LeaveGroupRequest,
+    SyncGroupRequest, JoinGroupResponse, JoinGroupResponse_v5)
 from aiokafka.structs import OffsetAndMetadata, TopicPartition
 from aiokafka.util import create_future, create_task
 
+
 log = logging.getLogger(__name__)
 
 UNKNOWN_OFFSET = -1
@@ -209,6 +212,7 @@ class GroupCoordinator(BaseCoordinator):
     def __init__(self, client, subscription, *,
                  group_id='aiokafka-default-group',
+                 group_instance_id=None,
                  session_timeout_ms=10000, heartbeat_interval_ms=3000,
                  retry_backoff_ms=100, enable_auto_commit=True,
                  auto_commit_interval_ms=5000,
@@ -240,6 +244,7 @@ def __init__(self, client, subscription, *,
         self.generation = OffsetCommitRequest.DEFAULT_GENERATION_ID
         self.member_id = JoinGroupRequest[0].UNKNOWN_MEMBER_ID
         self.group_id = group_id
+        self._group_instance_id = group_instance_id
         self.coordinator_id = None
 
         # Coordination flags and futures
@@ -345,9 +350,11 @@ def maybe_leave_group(self):
         return task
 
     async def _maybe_leave_group(self):
-        if self.generation > 0:
+        if self.generation > 0 and self._group_instance_id is None:
             # this is a minimal effort attempt to leave the group. we do not
             # attempt any resending if the request fails or times out.
+            # Note: do not send this leave request if we are running in static
+            # partition assignment mode (when group_instance_id has been set).
             version = 0 if self._client.api_version < (0, 11, 0) else 1
             request = LeaveGroupRequest[version](self.group_id, self.member_id)
             try:
@@ -393,15 +400,22 @@ async def _on_join_prepare(self, previous_assignment):
                 " for group %s failed on_partitions_revoked",
                 self._subscription.listener, self.group_id)
 
-    async def _perform_assignment(
-        self, leader_id, assignment_strategy, members
-    ):
+    async def _perform_assignment(self, response: Response):
+        assignment_strategy = response.group_protocol
+        members = response.members
         assignor = self._lookup_assignor(assignment_strategy)
         assert assignor, \
             'Invalid assignment protocol: %s' % assignment_strategy
         member_metadata = {}
         all_subscribed_topics = set()
-        for member_id, metadata_bytes in members:
+        for member in members:
+            if isinstance(response, JoinGroupResponse_v5):
+                member_id, group_instance_id, metadata_bytes = member
+            elif isinstance(response, (JoinGroupResponse[0], JoinGroupResponse[1],
+                                       JoinGroupResponse[2])):
+                member_id, metadata_bytes = member
+            else:
+                raise Exception("unknown protocol returned from assignment")
             metadata = ConsumerProtocol.METADATA.decode(metadata_bytes)
             member_metadata[member_id] = metadata
             all_subscribed_topics.update(metadata.subscription)
@@ -1202,46 +1216,67 @@ async def perform_group_join(self):
             metadata = metadata.encode()
             group_protocol = (assignor.name, metadata)
             metadata_list.append(group_protocol)
 
+        # for KIP-394 we may have to send a second join request
+        try_join = True
+        while try_join:
+            try_join = False
+
+            if self._api_version < (0, 10, 1):
+                request = JoinGroupRequest[0](
+                    self.group_id,
+                    self._session_timeout_ms,
+                    self._coordinator.member_id,
+                    ConsumerProtocol.PROTOCOL_TYPE,
+                    metadata_list,
+                )
+            elif self._api_version < (0, 11, 0):
+                request = JoinGroupRequest[1](
+                    self.group_id,
+                    self._session_timeout_ms,
+                    self._rebalance_timeout_ms,
+                    self._coordinator.member_id,
+                    ConsumerProtocol.PROTOCOL_TYPE,
+                    metadata_list,
+                )
+            elif self._api_version < (2, 3, 0):
+                request = JoinGroupRequest[2](
+                    self.group_id,
+                    self._session_timeout_ms,
+                    self._rebalance_timeout_ms,
+                    self._coordinator.member_id,
+                    ConsumerProtocol.PROTOCOL_TYPE,
+                    metadata_list,
+                )
+            else:
+                request = JoinGroupRequest[3](
+                    self.group_id,
+                    self._session_timeout_ms,
+                    self._rebalance_timeout_ms,
+                    self._coordinator.member_id,
+                    self._coordinator._group_instance_id,
+                    ConsumerProtocol.PROTOCOL_TYPE,
+                    metadata_list,
+                )
-        if self._api_version < (0, 10, 1):
-            request = JoinGroupRequest[0](
-                self.group_id,
-                self._session_timeout_ms,
-                self._coordinator.member_id,
-                ConsumerProtocol.PROTOCOL_TYPE,
-                metadata_list)
-        elif self._api_version < (0, 11, 0):
-            request = JoinGroupRequest[1](
-                self.group_id,
-                self._session_timeout_ms,
-                self._rebalance_timeout_ms,
-                self._coordinator.member_id,
-                ConsumerProtocol.PROTOCOL_TYPE,
-                metadata_list)
-        else:
-            request = JoinGroupRequest[2](
-                self.group_id,
-                self._session_timeout_ms,
-                self._rebalance_timeout_ms,
-                self._coordinator.member_id,
-                ConsumerProtocol.PROTOCOL_TYPE,
-                metadata_list)
+            # create the request for the coordinator
+            log.debug("Sending JoinGroup (%s) to coordinator %s",
+                      request, self.coordinator_id)
+            try:
+                response = await self._coordinator._send_req(request)
+            except Errors.KafkaError:
+                # Return right away. It's a connection error, so backoff will be
+                # handled by coordinator lookup
+                return None
+            if not self._subscription.active:
+                # Subscription changed. Ignore response and restart group join
+                return None
-        # create the request for the coordinator
-        log.debug("Sending JoinGroup (%s) to coordinator %s",
-                  request, self.coordinator_id)
-        try:
-            response = await self._coordinator._send_req(request)
-        except Errors.KafkaError:
-            # Return right away. It's a connection error, so backoff will be
-            # handled by coordinator lookup
-            return None
+            error_type = Errors.for_code(response.error_code)
-        if not self._subscription.active:
-            # Subscription changed. Ignore response and restart group join
-            return None
+            if error_type is Errors.MemberIdRequired:
+                self._coordinator.member_id = response.member_id
+                try_join = True
-        error_type = Errors.for_code(response.error_code)
 
         if error_type is Errors.NoError:
             log.debug("Join group response %s", response)
             self._coordinator.member_id = response.member_id
@@ -1300,12 +1335,22 @@ async def perform_group_join(self):
 
     async def _on_join_follower(self):
         # send follower's sync group with an empty assignment
-        version = 0 if self._api_version < (0, 11, 0) else 1
-        request = SyncGroupRequest[version](
-            self.group_id,
-            self._coordinator.generation,
-            self._coordinator.member_id,
-            [])
+        if self._api_version < (2, 3, 0):
+            version = 0 if self._api_version < (0, 11, 0) else 1
+            request = SyncGroupRequest[version](
+                self.group_id,
+                self._coordinator.generation,
+                self._coordinator.member_id,
+                [],
+            )
+        else:
+            request = SyncGroupRequest[2](
+                self.group_id,
+                self._coordinator.generation,
+                self._coordinator.member_id,
+                self._coordinator._group_instance_id,
+                [],
+            )
         log.debug(
             "Sending follower SyncGroup for group %s to coordinator %s: %s",
             self.group_id, self.coordinator_id, request)
@@ -1323,11 +1368,7 @@ async def _on_join_leader(self, response):
             Future: resolves to member assignment encoded-bytes
         """
         try:
-            group_assignment = \
-                await self._coordinator._perform_assignment(
-                    response.leader_id,
-                    response.group_protocol,
-                    response.members)
+            group_assignment = await self._coordinator._perform_assignment(response)
         except Exception as e:
             raise Errors.KafkaError(repr(e))
@@ -1337,12 +1378,22 @@ async def _on_join_leader(self, response):
             assignment = assignment.encode()
             assignment_req.append((member_id, assignment))
 
-        version = 0 if self._api_version < (0, 11, 0) else 1
-        request = SyncGroupRequest[version](
-            self.group_id,
-            self._coordinator.generation,
-            self._coordinator.member_id,
-            assignment_req)
+        if self._api_version < (2, 3, 0):
+            version = 0 if self._api_version < (0, 11, 0) else 1
+            request = SyncGroupRequest[version](
+                self.group_id,
+                self._coordinator.generation,
+                self._coordinator.member_id,
+                assignment_req,
+            )
+        else:
+            request = SyncGroupRequest[2](
+                self.group_id,
+                self._coordinator.generation,
+                self._coordinator.member_id,
+                self._coordinator._group_instance_id,
+                assignment_req,
+            )
 
         log.debug(
             "Sending leader SyncGroup for group %s to coordinator %s: %s",
diff --git a/aiokafka/consumer/subscription_state.py b/aiokafka/consumer/subscription_state.py
index b2dbd388..9631b008 100644
--- a/aiokafka/consumer/subscription_state.py
+++ b/aiokafka/consumer/subscription_state.py
@@ -14,8 +14,6 @@
 log = logging.getLogger(__name__)
 
-(List, Future)
-
 
 class SubscriptionType(Enum):
diff --git a/aiokafka/errors.py b/aiokafka/errors.py
index d35c04b2..1642e4db 100644
--- a/aiokafka/errors.py
+++ b/aiokafka/errors.py
@@ -856,6 +856,15 @@ class ListenerNotFound(BrokerResponseError):
     )
 
 
+class MemberIdRequired(BrokerResponseError):
+    errno = 79
+    message = 'MEMBER_ID_REQUIRED'
+    description = (
+        'Consumer needs to have a valid member '
+        'id before actually entering group'
+    )
+
+
 def _iter_broker_errors():
     for name, obj in inspect.getmembers(sys.modules[__name__]):
         if (
diff --git a/aiokafka/protocol/group.py b/aiokafka/protocol/group.py
index a809738a..9e4efb41 100644
--- a/aiokafka/protocol/group.py
+++ b/aiokafka/protocol/group.py
@@ -36,6 +36,27 @@ class JoinGroupResponse_v2(Response):
     )
 
 
+class JoinGroupResponse_v5(Response):
+    API_KEY = 11
+    API_VERSION = 5
+    SCHEMA = Schema(
+        ("throttle_time_ms", Int32),
+        ("error_code", Int16),
+        ("generation_id", Int32),
+        ("group_protocol", String("utf-8")),
+        ("leader_id", String("utf-8")),
+        ("member_id", String("utf-8")),
+        (
+            "members",
+            Array(
+                ("member_id", String("utf-8")),
+                ("group_instance_id", String("utf-8")),
+                ("member_metadata", Bytes),
+            ),
+        ),
+    )
+
+
 class JoinGroupRequest_v0(Request):
     API_KEY = 11
     API_VERSION = 0
@@ -79,8 +100,37 @@ class JoinGroupRequest_v2(Request):
     UNKNOWN_MEMBER_ID = ""
 
 
-JoinGroupRequest = [JoinGroupRequest_v0, JoinGroupRequest_v1, JoinGroupRequest_v2]
-JoinGroupResponse = [JoinGroupResponse_v0, JoinGroupResponse_v1, JoinGroupResponse_v2]
+class JoinGroupRequest_v5(Request):
+    API_KEY = 11
+    API_VERSION = 5
+    RESPONSE_TYPE = JoinGroupResponse_v5
+    SCHEMA = Schema(
+        ("group", String("utf-8")),
+        ("session_timeout", Int32),
+        ("rebalance_timeout", Int32),
+        ("member_id", String("utf-8")),
+        ("group_instance_id", String("utf-8")),
+        ("protocol_type", String("utf-8")),
+        (
+            "group_protocols",
+            Array(("protocol_name", String("utf-8")), ("protocol_metadata", Bytes)),
+        ),
+    )
+    UNKNOWN_MEMBER_ID = ""
+
+
+JoinGroupRequest = [
+    JoinGroupRequest_v0,
+    JoinGroupRequest_v1,
+    JoinGroupRequest_v2,
+    JoinGroupRequest_v5,
+]
+JoinGroupResponse = [
+    JoinGroupResponse_v0,
+    JoinGroupResponse_v1,
+    JoinGroupResponse_v2,
+    JoinGroupResponse_v5,
+]
 
 
 class ProtocolMetadata(Struct):
@@ -105,6 +155,12 @@ class SyncGroupResponse_v1(Response):
     )
 
 
+class SyncGroupResponse_v3(Response):
+    API_KEY = 14
+    API_VERSION = 3
+    SCHEMA = SyncGroupResponse_v1.SCHEMA
+
+
 class SyncGroupRequest_v0(Request):
     API_KEY = 14
     API_VERSION = 0
@@ -127,8 +183,24 @@ class SyncGroupRequest_v1(Request):
     SCHEMA = SyncGroupRequest_v0.SCHEMA
 
 
-SyncGroupRequest = [SyncGroupRequest_v0, SyncGroupRequest_v1]
-SyncGroupResponse = [SyncGroupResponse_v0, SyncGroupResponse_v1]
+class SyncGroupRequest_v3(Request):
+    API_KEY = 14
+    API_VERSION = 3
+    RESPONSE_TYPE = SyncGroupResponse_v3
+    SCHEMA = Schema(
+        ("group", String("utf-8")),
+        ("generation_id", Int32),
+        ("member_id", String("utf-8")),
+        ("group_instance_id", String("utf-8")),
+        (
+            "group_assignment",
+            Array(("member_id", String("utf-8")), ("member_metadata", Bytes)),
+        ),
+    )
+
+
+SyncGroupRequest = [SyncGroupRequest_v0, SyncGroupRequest_v1, SyncGroupRequest_v3]
+SyncGroupResponse = [SyncGroupResponse_v0, SyncGroupResponse_v1, SyncGroupResponse_v3]
 
 
 class MemberAssignment(Struct):
diff --git a/tests/_testutil.py b/tests/_testutil.py
index 3dd5087b..894b5266 100644
--- a/tests/_testutil.py
+++ b/tests/_testutil.py
@@ -15,6 +15,7 @@
 from concurrent import futures
 from contextlib import contextmanager
 from functools import wraps
+from unittest.mock import Mock
 
 from aiokafka import ConsumerRebalanceListener
 from aiokafka.client import AIOKafkaClient
@@ -131,6 +132,18 @@ def on_partitions_assigned(self, assigned):
         self.assigns.put_nowait(assigned)
 
 
+class DetectRebalanceListener(ConsumerRebalanceListener):
+    def __init__(self):
+        self.revoke_mock = Mock()
+        self.assign_mock = Mock()
+
+    async def on_partitions_revoked(self, revoked):
+        self.revoke_mock(revoked)
+
+    async def on_partitions_assigned(self, assigned):
+        self.assign_mock(assigned)
+
+
 class ACLManager:
 
     def __init__(self, docker, tag):
@@ -457,3 +470,11 @@ async def _wait_kafka(kafka_host, kafka_port, timeout):
         time.sleep(0.5)
         if loop.time() - start > timeout:
             return False
+
+
+async def _wait_mock_count(listener, cnt):
+    while True:
+        if (listener.revoke_mock.call_count > cnt
+                and listener.assign_mock.call_count > cnt):
+            return
+        await asyncio.sleep(1)
diff --git a/tests/test_consumer.py b/tests/test_consumer.py
index 5bd51b9e..616d7db9 100644
--- a/tests/test_consumer.py
+++ b/tests/test_consumer.py
@@ -6,6 +6,7 @@
 from contextlib import contextmanager
 
 import pytest
+import async_timeout
 
 from aiokafka.abc import ConsumerRebalanceListener
 from aiokafka.consumer import AIOKafkaConsumer
@@ -26,8 +27,8 @@
 )
 
 from ._testutil import (
-    KafkaIntegrationTestCase, StubRebalanceListener,
-    run_until_complete, run_in_thread, random_string, kafka_versions)
+    KafkaIntegrationTestCase, StubRebalanceListener, DetectRebalanceListener,
+    run_until_complete, run_in_thread, random_string, kafka_versions, _wait_mock_count)
 
 
 class TestConsumerIntegration(KafkaIntegrationTestCase):
@@ -2053,3 +2054,205 @@ async def on_partitions_assigned(self, assigned):
         for i in range(10):
             msg = await consumer1.getone()
             self.assertGreaterEqual(int(msg.value), 20)
+
+    @kafka_versions('>=2.3.0')
+    @run_until_complete
+    async def test_kip_345_enabled(self):
+        await self.send_messages(0, list(range(0, 10)))
+        await self.send_messages(1, list(range(10, 20)))
+
+        # group_instance_id key enables KIP-345 mode
+        # default session_timeout_ms is 10s
+        consumer1 = AIOKafkaConsumer(
+            group_id='test-kip-345-group',
+            bootstrap_servers=self.hosts,
+            enable_auto_commit=True,
+            auto_offset_reset='earliest',
+            group_instance_id="consumer_1_static_group_member")
+        consumer2 = AIOKafkaConsumer(
+            group_id='test-kip-345-group',
+            bootstrap_servers=self.hosts,
+            enable_auto_commit=True,
+            auto_offset_reset='earliest',
+            group_instance_id="consumer_2_static_group_member")
+        tp0 = TopicPartition(self.topic, 0)
+        tp1 = TopicPartition(self.topic, 1)
+
+        listener1 = DetectRebalanceListener()
+        listener2 = DetectRebalanceListener()
+        consumer1.subscribe([self.topic], listener=listener1)
+        consumer2.subscribe([self.topic], listener=listener2)
+        # initial rebalance that assigns two partitions to consumer1
+        await consumer1.start()
+        self.add_cleanup(consumer1.stop)
+        await consumer1.seek_to_committed()
+        listener1.revoke_mock.assert_called_with(set())
+        self.assertEqual(listener1.revoke_mock.call_count, 1)
+        listener1.assign_mock.assert_called_with({tp0, tp1})
+        self.assertEqual(listener1.assign_mock.call_count, 1)
+        # second rebalance for consumer1, first rebalance for
+        # consumer2 that assigns 1 partition to both consumer1
+        # and consumer2
+        await consumer2.start()
+        self.add_cleanup(consumer2.stop)
+        await consumer2.seek_to_committed()
+
+        assert consumer1._group_instance_id == "consumer_1_static_group_member"
+        assert consumer2._group_instance_id == "consumer_2_static_group_member"
+
+        c1_partitions = consumer1.assignment()
+        c2_partitions = consumer2.assignment()
+        # take note of assigned partitions
+        if c1_partitions == {tp0}:
+            c1_assignment = {tp0}
+            c2_assignment = {tp1}
+        else:
+            c1_assignment = {tp1}
+            c2_assignment = {tp0}
+
+        listener1.revoke_mock.assert_called_with({tp0, tp1})
+        self.assertEqual(listener1.revoke_mock.call_count, 2)
+        listener1.assign_mock.assert_called_with(c1_assignment)
+        self.assertEqual(listener1.assign_mock.call_count, 2)
+        listener2.revoke_mock.assert_called_with(set())
+        self.assertEqual(listener2.revoke_mock.call_count, 1)
+        listener2.assign_mock.assert_called_with(c2_assignment)
+        self.assertEqual(listener2.assign_mock.call_count, 1)
+
+        # confirm diff partitions
+        assert c2_partitions != c1_partitions
+        # unsubscribe from topic, check if
+        # a third rebalance occurs for consumer1.
+        # It should not since KIP-345 is active.
+        consumer2.unsubscribe()
+        self.assertEqual(listener1.revoke_mock.call_count, 2)
+        self.assertEqual(listener1.assign_mock.call_count, 2)
+        # ensure that consumer2's assigned partitions
+        # are not re-assigned to consumer1
+        for p in c2_partitions:
+            assert p not in c1_partitions
+        # reassign, ensure that it goes back to the same partition
+        consumer2.subscribe(topics=[self.topic], listener=listener2)
+        await consumer2._subscription.wait_for_assignment()
+        # since consumer2 rejoins the group, it receives its old
+        # assigned partitions. We assert that a rebalance for all
+        # partitions was not triggered by checking that the listener2
+        # call count is 2, which is the initial rebalance that assigned the
+        # initial partitions, and this assignment that reassigns the partitions
+        self.assertEqual(listener2.revoke_mock.call_count, 2)
+        self.assertEqual(listener2.assign_mock.call_count, 2)
+        # we check the other consumer's listener to make sure
+        # it did not undergo a rebalance. The call count should still
+        # be 2.
+        self.assertEqual(listener1.revoke_mock.call_count, 2)
+        self.assertEqual(listener1.assign_mock.call_count, 2)
+        assert c2_partitions == consumer2.assignment()
+
+        # wait for timeout, consumer1 should get all
+        # partitions after rebalance
+        all_partitions = frozenset(list(c1_partitions) + list(c2_partitions))
+        await consumer2.stop()
+        async with async_timeout.timeout(15):
+            await _wait_mock_count(listener1, 2)
+        # this is the last rebalance for consumer1, so the count should now be
+        # 3.
+        self.assertEqual(listener1.revoke_mock.call_count, 3)
+        self.assertEqual(listener1.assign_mock.call_count, 3)
+        # since the timeout has passed, a rebalance should occur
+        listener1.revoke_mock.assert_called_with(c1_assignment)
+        listener1.assign_mock.assert_called_with(c1_assignment.union(c2_assignment))
+        assert all_partitions == consumer1.assignment()
+
+    @kafka_versions('>=2.3.0')
+    @run_until_complete
+    async def test_kip_345_disabled(self):
+        await self.send_messages(0, list(range(0, 10)))
+        await self.send_messages(1, list(range(10, 20)))
+
+        # default session_timeout_ms is 10s
+        consumer1 = AIOKafkaConsumer(
+            group_id='test-kip-345-group',
+            bootstrap_servers=self.hosts,
+            enable_auto_commit=True,
+            auto_offset_reset='earliest')
+        consumer2 = AIOKafkaConsumer(
+            group_id='test-kip-345-group',
+            bootstrap_servers=self.hosts,
+            enable_auto_commit=True,
+            auto_offset_reset='earliest')
+        tp0 = TopicPartition(self.topic, 0)
+        tp1 = TopicPartition(self.topic, 1)
+
+        listener1 = DetectRebalanceListener()
+        listener2 = DetectRebalanceListener()
+        consumer1.subscribe([self.topic], listener=listener1)
+        consumer2.subscribe([self.topic], listener=listener2)
+        # initial rebalance that assigns two partitions to consumer1
+        await consumer1.start()
+        self.add_cleanup(consumer1.stop)
+        await consumer1.seek_to_committed()
+        listener1.revoke_mock.assert_called_with(set())
+        self.assertEqual(listener1.revoke_mock.call_count, 1)
+        listener1.assign_mock.assert_called_with({tp0, tp1})
+        self.assertEqual(listener1.assign_mock.call_count, 1)
+        # second rebalance for consumer1, first rebalance for
+        # consumer2 that assigns 1 partition to both consumer1
+        # and consumer2
+        await consumer2.start()
+        self.add_cleanup(consumer2.stop)
+        await consumer2.seek_to_committed()
+
+        c1_partitions = consumer1.assignment()
+        c2_partitions = consumer2.assignment()
+        # take note of assigned partitions
+        if c1_partitions == {tp0}:
+            c1_assignment = {tp0}
+            c2_assignment = {tp1}
+        else:
+            c1_assignment = {tp1}
+            c2_assignment = {tp0}
+
+        listener1.revoke_mock.assert_called_with({tp0, tp1})
+        self.assertEqual(listener1.revoke_mock.call_count, 2)
+        listener1.assign_mock.assert_called_with(c1_assignment)
+        self.assertEqual(listener1.assign_mock.call_count, 2)
+        listener2.revoke_mock.assert_called_with(set())
+        self.assertEqual(listener2.revoke_mock.call_count, 1)
+        listener2.assign_mock.assert_called_with(c2_assignment)
+        self.assertEqual(listener2.assign_mock.call_count, 1)
+
+        # confirm diff partitions
+        assert c2_partitions != c1_partitions
+        # unsubscribe from topic, check if
+        # a third rebalance occurs for consumer1.
+        # It should since KIP-345 is inactive.
+        consumer2.unsubscribe()
+        # need to wait for rebalance
+        async with async_timeout.timeout(15):
+            await _wait_mock_count(listener1, 2)
+        self.assertEqual(listener1.revoke_mock.call_count, 3)
+        self.assertEqual(listener1.assign_mock.call_count, 3)
+        # ensure that consumer2's assigned partitions
+        # are re-assigned to consumer1
+        for p in c2_partitions:
+            assert p in consumer1.assignment()
+
+        consumer2.subscribe(topics=[self.topic], listener=listener2)
+        await consumer2._subscription.wait_for_assignment()
+        # since consumer2 rejoins the group, a rebalance should occur
+        # for both consumers
+        async with async_timeout.timeout(15):
+            await _wait_mock_count(listener1, 3)
+        async with async_timeout.timeout(15):
+            await _wait_mock_count(listener2, 1)
+        self.assertEqual(listener2.revoke_mock.call_count, 2)
+        self.assertEqual(listener2.assign_mock.call_count, 2)
+        self.assertEqual(listener1.revoke_mock.call_count, 4)
+        self.assertEqual(listener1.assign_mock.call_count, 4)
+
+        # stop consumer2, which will trigger yet another rebalance
+        await consumer2.stop()
+        async with async_timeout.timeout(15):
+            await _wait_mock_count(listener1, 4)
+        self.assertEqual(listener1.revoke_mock.call_count, 5)
+        self.assertEqual(listener1.assign_mock.call_count, 5)
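For reviewers, a minimal usage sketch of the option this patch adds. It is an illustration, not part of the patch: the broker address (localhost:9092), topic (my_topic), group (my-group) and instance id (worker-1) are placeholders, while AIOKafkaConsumer, group_id, group_instance_id and session_timeout_ms are the parameters touched by this change. Per KIP-345, group_instance_id must be unique within the group and stable across restarts of the same worker, so a member that reconnects within session_timeout_ms keeps its partitions without triggering a rebalance.

import asyncio

from aiokafka import AIOKafkaConsumer


async def consume():
    # Static membership: reuse the same group_instance_id every time this
    # worker restarts (e.g. derive it from the host or pod name).
    consumer = AIOKafkaConsumer(
        "my_topic",                          # placeholder topic
        bootstrap_servers="localhost:9092",  # placeholder broker
        group_id="my-group",
        group_instance_id="worker-1",        # enables KIP-345 static membership
        session_timeout_ms=30000,            # member may be gone this long before a rebalance
        enable_auto_commit=True,
    )
    await consumer.start()
    try:
        async for msg in consumer:
            print(msg.partition, msg.offset, msg.value)
    finally:
        await consumer.stop()


asyncio.run(consume())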