diff --git a/aiokafka/client.py b/aiokafka/client.py
index c1bdf6d1..d7999eb2 100644
--- a/aiokafka/client.py
+++ b/aiokafka/client.py
@@ -27,6 +27,7 @@
 from aiokafka.util import (
     create_future, create_task, parse_kafka_version, get_running_loop
 )
+from aiokafka.protocol.group import SyncGroupRequest
 
 __all__ = ['AIOKafkaClient']
 
@@ -587,6 +588,7 @@ def _check_api_version_response(self, response):
             # ((2, 6, 0), DescribeClientQuotasRequest[0]),
             ((2, 5, 0), DescribeAclsRequest_v2),
             ((2, 4, 0), ProduceRequest[8]),
+            ((2, 3, 0), SyncGroupRequest[2]),  # list index 2 is SyncGroup v3
             ((2, 3, 0), FetchRequest[11]),
             ((2, 2, 0), OffsetRequest[5]),
             ((2, 1, 0), FetchRequest[10]),
diff --git a/aiokafka/consumer/assignors.py b/aiokafka/consumer/assignors.py
new file mode 100644
index 00000000..7788575e
--- /dev/null
+++ b/aiokafka/consumer/assignors.py
@@ -0,0 +1,24 @@
+from kafka.coordinator.assignors.abstract import AbstractPartitionAssignor
+import abc
+
+
+class AbstractStaticPartitionAssignor(AbstractPartitionAssignor):
+    """
+    Abstract assignor implementation that also supports static assignments (KIP-345)
+    """
+
+    @abc.abstractmethod
+    def assign(self, cluster, members,  # lgtm[py/inheritance/signature-mismatch]
+               member_group_instance_ids):
+        """Perform group assignment given cluster metadata, member subscriptions
+           and group_instance_ids
+        Arguments:
+            cluster (ClusterMetadata): metadata for use in assignment
+            members (dict of {member_id: MemberMetadata}): decoded metadata for
+                each member in the group.
+            member_group_instance_ids (dict of {member_id: group_instance_id}):
+                static group instance id (or None) of each member in the group.
+        Returns:
+            dict: {member_id: MemberAssignment}
+        """
+        pass
diff --git a/aiokafka/consumer/consumer.py b/aiokafka/consumer/consumer.py
index 2fa2376d..b3a7cb58 100644
--- a/aiokafka/consumer/consumer.py
+++ b/aiokafka/consumer/consumer.py
@@ -65,6 +65,8 @@ class AIOKafkaConsumer:
             committing offsets. If None, auto-partition assignment (via
             group coordinator) and offset commits are disabled.
             Default: None
+        group_instance_id (str or None): name of the group instance ID used for
+            static membership (KIP-345)
         key_deserializer (Callable): Any callable that takes a
             raw message key and returns a deserialized key.
         value_deserializer (Callable, Optional): Any callable that takes a
@@ -228,6 +230,7 @@ def __init__(self, *topics, loop=None,
                  bootstrap_servers='localhost',
                  client_id='aiokafka-' + __version__,
                  group_id=None,
+                 group_instance_id=None,
                  key_deserializer=None, value_deserializer=None,
                  fetch_max_wait_ms=500,
                  fetch_max_bytes=52428800,
@@ -290,6 +293,7 @@ def __init__(self, *topics, loop=None,
             sasl_oauth_token_provider=sasl_oauth_token_provider)
 
         self._group_id = group_id
+        self._group_instance_id = group_instance_id
         self._heartbeat_interval_ms = heartbeat_interval_ms
         self._session_timeout_ms = session_timeout_ms
         self._retry_backoff_ms = retry_backoff_ms
@@ -381,6 +385,7 @@ async def start(self):
             self._coordinator = GroupCoordinator(
                 self._client, self._subscription,
                 group_id=self._group_id,
+                group_instance_id=self._group_instance_id,
                 heartbeat_interval_ms=self._heartbeat_interval_ms,
                 session_timeout_ms=self._session_timeout_ms,
                 retry_backoff_ms=self._retry_backoff_ms,
diff --git a/aiokafka/consumer/group_coordinator.py b/aiokafka/consumer/group_coordinator.py
index 8d244288..e60fe62d 100644
--- a/aiokafka/consumer/group_coordinator.py
+++ b/aiokafka/consumer/group_coordinator.py
@@ -9,14 +9,18 @@
 from kafka.protocol.commit import (
     OffsetCommitRequest_v2 as OffsetCommitRequest,
     OffsetFetchRequest_v1 as OffsetFetchRequest)
-from kafka.protocol.group import (
-    HeartbeatRequest, JoinGroupRequest, LeaveGroupRequest, SyncGroupRequest)
+from kafka.protocol.group import HeartbeatRequest, LeaveGroupRequest
+
+from aiokafka.protocol.group import JoinGroupRequest, SyncGroupRequest
+
 
 import aiokafka.errors as Errors
 from aiokafka.structs import OffsetAndMetadata, TopicPartition
 from aiokafka.client import ConnectionGroup, CoordinationType
 from aiokafka.util import create_future, create_task
 
+from aiokafka.consumer.assignors import AbstractStaticPartitionAssignor
+
 log = logging.getLogger(__name__)
 
 UNKNOWN_OFFSET = -1
@@ -210,6 +214,7 @@ class GroupCoordinator(BaseCoordinator):
 
     def __init__(self, client, subscription, *,
                  group_id='aiokafka-default-group',
+                 group_instance_id=None,
                  session_timeout_ms=10000, heartbeat_interval_ms=3000,
                  retry_backoff_ms=100,
                  enable_auto_commit=True, auto_commit_interval_ms=5000,
@@ -241,6 +246,7 @@ def __init__(self, client, subscription, *,
         self.generation = OffsetCommitRequest.DEFAULT_GENERATION_ID
         self.member_id = JoinGroupRequest[0].UNKNOWN_MEMBER_ID
         self.group_id = group_id
+        self._group_instance_id = group_instance_id
         self.coordinator_id = None
 
         # Coordination flags and futures
@@ -346,9 +352,11 @@ def maybe_leave_group(self):
         return task
 
     async def _maybe_leave_group(self):
-        if self.generation > 0:
+        if self.generation > 0 and self._group_instance_id is None:
             # this is a minimal effort attempt to leave the group. we do not
             # attempt any resending if the request fails or times out.
+            # Note: do not send this leave request if we are running in static
+            # partition assignment mode (when group_instance_id has been set).
             version = 0 if self._client.api_version < (0, 11, 0) else 1
             request = LeaveGroupRequest[version](self.group_id, self.member_id)
             try:
@@ -402,8 +410,18 @@ async def _perform_assignment(
             'Invalid assignment protocol: %s' % assignment_strategy
         member_metadata = {}
         all_subscribed_topics = set()
-        for member_id, metadata_bytes in members:
+        group_instance_id_mapping = {}
+        # JoinGroup v5 adds group_instance_id to each member tuple (KIP-345)
+        for member in members:
+            if len(member) == 2:
+                member_id, metadata_bytes = member
+                group_instance_id = None
+            elif len(member) == 3:
+                member_id, group_instance_id, metadata_bytes = member
+            else:
+                raise Exception("unknown protocol returned from assignment")
             metadata = ConsumerProtocol.METADATA.decode(metadata_bytes)
+            group_instance_id_mapping[member_id] = group_instance_id
             member_metadata[member_id] = metadata
             all_subscribed_topics.update(metadata.subscription)
 
@@ -421,7 +439,12 @@ async def _perform_assignment(
                   " with subscriptions %s", self.group_id, assignor.name,
                   member_metadata)
 
-        assignments = assignor.assign(self._cluster, member_metadata)
+        if isinstance(assignor, AbstractStaticPartitionAssignor):
+            assignments = assignor.assign(
+                self._cluster, member_metadata, group_instance_id_mapping
+            )
+        else:
+            assignments = assignor.assign(self._cluster, member_metadata)
 
         log.debug("Finished assignment for group %s: %s",
                   self.group_id, assignments)
@@ -1203,46 +1226,67 @@ async def perform_group_join(self):
                 metadata = metadata.encode()
             group_protocol = (assignor.name, metadata)
             metadata_list.append(group_protocol)
+        # for KIP-394 we may have to send a second join request
+        try_join = True
+        while try_join:
+            try_join = False
+
+            if self._api_version < (0, 10, 1):
+                request = JoinGroupRequest[0](
+                    self.group_id,
+                    self._session_timeout_ms,
+                    self._coordinator.member_id,
+                    ConsumerProtocol.PROTOCOL_TYPE,
+                    metadata_list,
+                )
+            elif self._api_version < (0, 11, 0):
+                request = JoinGroupRequest[1](
+                    self.group_id,
+                    self._session_timeout_ms,
+                    self._rebalance_timeout_ms,
+                    self._coordinator.member_id,
+                    ConsumerProtocol.PROTOCOL_TYPE,
+                    metadata_list,
+                )
+            elif self._api_version < (2, 3, 0):
+                request = JoinGroupRequest[2](
+                    self.group_id,
+                    self._session_timeout_ms,
+                    self._rebalance_timeout_ms,
+                    self._coordinator.member_id,
+                    ConsumerProtocol.PROTOCOL_TYPE,
+                    metadata_list,
+                )
+            else:
+                request = JoinGroupRequest[3](
+                    self.group_id,
+                    self._session_timeout_ms,
+                    self._rebalance_timeout_ms,
+                    self._coordinator.member_id,
+                    self._coordinator._group_instance_id,
+                    ConsumerProtocol.PROTOCOL_TYPE,
+                    metadata_list,
+                )
 
-        if self._api_version < (0, 10, 1):
-            request = JoinGroupRequest[0](
-                self.group_id,
-                self._session_timeout_ms,
-                self._coordinator.member_id,
-                ConsumerProtocol.PROTOCOL_TYPE,
-                metadata_list)
-        elif self._api_version < (0, 11, 0):
-            request = JoinGroupRequest[1](
-                self.group_id,
-                self._session_timeout_ms,
-                self._rebalance_timeout_ms,
-                self._coordinator.member_id,
-                ConsumerProtocol.PROTOCOL_TYPE,
-                metadata_list)
-        else:
-            request = JoinGroupRequest[2](
-                self.group_id,
-                self._session_timeout_ms,
-                self._rebalance_timeout_ms,
-                self._coordinator.member_id,
-                ConsumerProtocol.PROTOCOL_TYPE,
-                metadata_list)
+            # create the request for the coordinator
+            log.debug("Sending JoinGroup (%s) to coordinator %s",
+                      request, self.coordinator_id)
+            try:
+                response = await self._coordinator._send_req(request)
+            except Errors.KafkaError:
+                # Return right away. It's a connection error, so backoff will be
+                # handled by coordinator lookup
+                return None
+            if not self._subscription.active:
+                # Subscription changed. Ignore response and restart group join
+                return None
 
-        # create the request for the coordinator
-        log.debug("Sending JoinGroup (%s) to coordinator %s",
-                  request, self.coordinator_id)
-        try:
-            response = await self._coordinator._send_req(request)
-        except Errors.KafkaError:
-            # Return right away. It's a connection error, so backoff will be
-            # handled by coordinator lookup
-            return None
+            error_type = Errors.for_code(response.error_code)
 
-        if not self._subscription.active:
-            # Subscription changed. Ignore response and restart group join
-            return None
+            if error_type is Errors.MemberIdRequired:
+                self._coordinator.member_id = response.member_id
+                try_join = True
 
-        error_type = Errors.for_code(response.error_code)
         if error_type is Errors.NoError:
             log.debug("Join group response %s", response)
             self._coordinator.member_id = response.member_id
@@ -1301,12 +1345,22 @@ async def perform_group_join(self):
 
     async def _on_join_follower(self):
         # send follower's sync group with an empty assignment
-        version = 0 if self._api_version < (0, 11, 0) else 1
-        request = SyncGroupRequest[version](
-            self.group_id,
-            self._coordinator.generation,
-            self._coordinator.member_id,
-            [])
+        if self._api_version < (2, 3, 0):
+            version = 0 if self._api_version < (0, 11, 0) else 1
+            request = SyncGroupRequest[version](
+                self.group_id,
+                self._coordinator.generation,
+                self._coordinator.member_id,
+                [],
+            )
+        else:
+            request = SyncGroupRequest[2](
+                self.group_id,
+                self._coordinator.generation,
+                self._coordinator.member_id,
+                self._coordinator._group_instance_id,
+                [],
+            )
         log.debug(
             "Sending follower SyncGroup for group %s to coordinator %s: %s",
             self.group_id, self.coordinator_id, request)
@@ -1338,12 +1392,22 @@ async def _on_join_leader(self, response):
                 assignment = assignment.encode()
             assignment_req.append((member_id, assignment))
 
-        version = 0 if self._api_version < (0, 11, 0) else 1
-        request = SyncGroupRequest[version](
-            self.group_id,
-            self._coordinator.generation,
-            self._coordinator.member_id,
-            assignment_req)
+        if self._api_version < (2, 3, 0):
+            version = 0 if self._api_version < (0, 11, 0) else 1
+            request = SyncGroupRequest[version](
+                self.group_id,
+                self._coordinator.generation,
+                self._coordinator.member_id,
+                assignment_req,
+            )
+        else:
+            request = SyncGroupRequest[2](
+                self.group_id,
+                self._coordinator.generation,
+                self._coordinator.member_id,
+                self._coordinator._group_instance_id,
+                assignment_req,
+            )
 
         log.debug(
             "Sending leader SyncGroup for group %s to coordinator %s: %s",
diff --git a/aiokafka/consumer/subscription_state.py b/aiokafka/consumer/subscription_state.py
index b2dbd388..9631b008 100644
--- a/aiokafka/consumer/subscription_state.py
+++ b/aiokafka/consumer/subscription_state.py
@@ -14,8 +14,6 @@
 
 log = logging.getLogger(__name__)
 
-(List, Future)
-
 
 class SubscriptionType(Enum):
 
diff --git a/aiokafka/errors.py b/aiokafka/errors.py
index c000369b..d067958e 100644
--- a/aiokafka/errors.py
+++ b/aiokafka/errors.py
@@ -415,6 +415,15 @@ class ListenerNotFound(BrokerResponseError):
     )
 
 
+class MemberIdRequired(BrokerResponseError):
+    errno = 79
+    message = 'MEMBER_ID_REQUIRED'
+    description = (
+        'Consumer needs to have a valid member '
+        'id before actually entering group'
+    )
+
+
 def _iter_broker_errors():
     for name, obj in inspect.getmembers(sys.modules[__name__]):
         if inspect.isclass(obj) and issubclass(obj, BrokerResponseError) and \
diff --git a/aiokafka/protocol/group.py b/aiokafka/protocol/group.py
new file mode 100644
index 00000000..4bda2b17
--- /dev/null
+++ b/aiokafka/protocol/group.py
@@ -0,0 +1,96 @@
+from __future__ import absolute_import
+
+from kafka.protocol.api import Request, Response
+from kafka.protocol.types import Array, Bytes, Int16, Int32, Schema, String
+from kafka.protocol.group import (
+    JoinGroupRequest_v0,
+    JoinGroupRequest_v1,
+    JoinGroupRequest_v2,
+    JoinGroupResponse_v0,
+    JoinGroupResponse_v1,
+    JoinGroupResponse_v2,
+    SyncGroupRequest_v0,
+    SyncGroupRequest_v1,
+    SyncGroupResponse_v0,
+    SyncGroupResponse_v1,
+)
+
+
+class JoinGroupResponse_v5(Response):
+    API_KEY = 11
+    API_VERSION = 5
+    SCHEMA = Schema(
+        ("throttle_time_ms", Int32),
+        ("error_code", Int16),
+        ("generation_id", Int32),
+        ("group_protocol", String("utf-8")),
+        ("leader_id", String("utf-8")),
+        ("member_id", String("utf-8")),
+        (
+            "members",
+            Array(
+                ("member_id", String("utf-8")),
+                ("group_instance_id", String("utf-8")),
+                ("member_metadata", Bytes),
+            ),
+        ),
+    )
+
+
+class JoinGroupRequest_v5(Request):
+    API_KEY = 11
+    API_VERSION = 5
+    RESPONSE_TYPE = JoinGroupResponse_v5
+    SCHEMA = Schema(
+        ("group", String("utf-8")),
+        ("session_timeout", Int32),
+        ("rebalance_timeout", Int32),
+        ("member_id", String("utf-8")),
+        ("group_instance_id", String("utf-8")),
+        ("protocol_type", String("utf-8")),
+        (
+            "group_protocols",
+            Array(("protocol_name", String("utf-8")), ("protocol_metadata", Bytes)),
+        ),
+    )
+    UNKNOWN_MEMBER_ID = ""
+
+
+JoinGroupRequest = [
+    JoinGroupRequest_v0,
+    JoinGroupRequest_v1,
+    JoinGroupRequest_v2,
+    JoinGroupRequest_v5,
+]
+JoinGroupResponse = [
+    JoinGroupResponse_v0,
+    JoinGroupResponse_v1,
+    JoinGroupResponse_v2,
+    JoinGroupResponse_v5,
+]
+
+
+class SyncGroupResponse_v3(Response):
+    API_KEY = 14
+    API_VERSION = 3
+    SCHEMA = SyncGroupResponse_v1.SCHEMA
+
+
+class SyncGroupRequest_v3(Request):
+    API_KEY = 14
+    API_VERSION = 3
+    RESPONSE_TYPE = SyncGroupResponse_v3
+    SCHEMA = Schema(
+        ("group", String("utf-8")),
+        ("generation_id", Int32),
+        ("member_id", String("utf-8")),
+        ("group_instance_id", String("utf-8")),
+        (
+            "group_assignment",
+            Array(("member_id", String("utf-8")), ("member_metadata", Bytes)),
+        ),
+    )
+
+
+SyncGroupRequest = [SyncGroupRequest_v0, SyncGroupRequest_v1, SyncGroupRequest_v3]
+SyncGroupResponse = [SyncGroupResponse_v0, SyncGroupResponse_v1, SyncGroupResponse_v3]