Implement KIP-345 in aiokafka (rebase of #682) #827

Closed
wants to merge 14 commits into from
2 changes: 2 additions & 0 deletions aiokafka/client.py
@@ -27,6 +27,7 @@
from aiokafka.util import (
create_future, create_task, parse_kafka_version, get_running_loop
)
from aiokafka.protocol.group import SyncGroupRequest


__all__ = ['AIOKafkaClient']
@@ -587,6 +588,7 @@ def _check_api_version_response(self, response):
# ((2, 6, 0), DescribeClientQuotasRequest[0]),
((2, 5, 0), DescribeAclsRequest_v2),
((2, 4, 0), ProduceRequest[8]),
((2, 3, 0), SyncGroupRequest[3]),
Collaborator commented:
This list is used to determine broker version. One feature is enough for each version, so this line is excessive.

((2, 3, 0), FetchRequest[11]),
((2, 2, 0), OffsetRequest[5]),
((2, 1, 0), FetchRequest[10]),
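For context on that comment: a minimal hedged sketch (not aiokafka's actual _check_api_version_response) of how such a probe table resolves the broker version. Entries are ordered newest-first and the first request type the broker advertises decides the version, so one probe per release suffices and (2, 3, 0) is already covered by FetchRequest[11]. Names and the fallback below are illustrative only.

# Illustrative probe table: (broker version, probe request name).
test_cases = [
    ((2, 4, 0), 'ProduceRequest_v8'),
    ((2, 3, 0), 'FetchRequest_v11'),
    ((2, 2, 0), 'OffsetRequest_v5'),
    ((2, 1, 0), 'FetchRequest_v10'),
]

def guess_broker_version(advertised_apis):
    # advertised_apis stands in for the broker's ApiVersions response,
    # e.g. {'FetchRequest_v11', 'OffsetRequest_v5', ...}.
    for version, probe in test_cases:
        if probe in advertised_apis:
            return version
    return (0, 10, 0)  # illustrative fallback for very old brokers

print(guess_broker_version({'FetchRequest_v11', 'OffsetRequest_v5'}))  # (2, 3, 0)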
24 changes: 24 additions & 0 deletions aiokafka/consumer/assignors.py
@@ -0,0 +1,24 @@
import abc

from kafka.coordinator.assignors.abstract import AbstractPartitionAssignor


class AbstractStaticPartitionAssignor(AbstractPartitionAssignor):
"""
Abstract assignor implementation that also supports static assignments (KIP-345)
"""

@abc.abstractmethod
def assign(self, cluster, members, # lgtm[py/inheritance/signature-mismatch]
member_group_instance_ids):
"""Perform group assignment given cluster metadata, member subscriptions
and group_instance_ids
Arguments:
cluster (ClusterMetadata): metadata for use in assignment
members (dict of {member_id: MemberMetadata}): decoded metadata for
each member in the group.
member_group_instance_ids (dict of {member_id: group_instance_id}):
the group.instance.id (or None) of each member in the group.
Returns:
dict: {member_id: MemberAssignment}
"""
pass
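As a hedged illustration of how this new abstract hook could be used (not part of this PR; every name below except AbstractStaticPartitionAssignor is hypothetical), a concrete assignor might order members by group.instance.id so that a restarted static member gets the same partitions back:

import collections

from kafka.coordinator.protocol import (
    ConsumerProtocolMemberAssignment, ConsumerProtocolMemberMetadata)

from aiokafka.consumer.assignors import AbstractStaticPartitionAssignor


class StickyStaticAssignor(AbstractStaticPartitionAssignor):
    """Sketch: spread partitions over members ordered by group_instance_id."""
    name = 'sticky_static'
    version = 0

    def metadata(self, topics):
        return ConsumerProtocolMemberMetadata(self.version, list(topics), b'')

    def on_assignment(self, assignment):
        pass

    def assign(self, cluster, members, member_group_instance_ids):
        # Every partition of every topic any member subscribed to.
        all_topics = set()
        for member_metadata in members.values():
            all_topics.update(member_metadata.subscription)
        partitions = [
            (topic, p)
            for topic in sorted(all_topics)
            for p in sorted(cluster.partitions_for_topic(topic) or set())
        ]
        # Order members by static instance id (member_id as tie-breaker) so
        # the layout survives member_id churn when a static member restarts.
        order = sorted(
            members,
            key=lambda m: (member_group_instance_ids.get(m) or '', m))
        per_member = {m: collections.defaultdict(list) for m in order}
        for i, (topic, p) in enumerate(partitions):
            per_member[order[i % len(order)]][topic].append(p)
        return {
            m: ConsumerProtocolMemberAssignment(
                self.version, sorted(per_member[m].items()), b'')
            for m in order
        }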
5 changes: 5 additions & 0 deletions aiokafka/consumer/consumer.py
@@ -65,6 +65,8 @@ class AIOKafkaConsumer:
committing offsets. If None, auto-partition assignment (via
group coordinator) and offset commits are disabled.
Default: None
group_instance_id (str or None): unique identifier of the consumer
instance used for static group membership (KIP-345). Requires broker
version 2.3.0 or newer. Default: None
key_deserializer (Callable): Any callable that takes a
raw message key and returns a deserialized key.
value_deserializer (Callable, Optional): Any callable that takes a
@@ -228,6 +230,7 @@ def __init__(self, *topics, loop=None,
bootstrap_servers='localhost',
client_id='aiokafka-' + __version__,
group_id=None,
group_instance_id=None,
key_deserializer=None, value_deserializer=None,
fetch_max_wait_ms=500,
fetch_max_bytes=52428800,
@@ -290,6 +293,7 @@ def __init__(self, *topics, loop=None,
sasl_oauth_token_provider=sasl_oauth_token_provider)

self._group_id = group_id
self._group_instance_id = group_instance_id
self._heartbeat_interval_ms = heartbeat_interval_ms
self._session_timeout_ms = session_timeout_ms
self._retry_backoff_ms = retry_backoff_ms
@@ -381,6 +385,7 @@ async def start(self):
self._coordinator = GroupCoordinator(
self._client, self._subscription,
group_id=self._group_id,
group_instance_id=self._group_instance_id,
heartbeat_interval_ms=self._heartbeat_interval_ms,
session_timeout_ms=self._session_timeout_ms,
retry_backoff_ms=self._retry_backoff_ms,
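For completeness, a hedged usage sketch of the new option (topic, servers, and ids below are made up): a consumer that sets group_instance_id joins as a static member, so restarts that rejoin within session_timeout_ms do not trigger a rebalance.

import asyncio

from aiokafka import AIOKafkaConsumer


async def consume():
    consumer = AIOKafkaConsumer(
        'my_topic',
        bootstrap_servers='localhost:9092',
        group_id='billing-workers',
        # Stable per process/pod, e.g. a StatefulSet ordinal or hostname.
        group_instance_id='billing-worker-1',
        session_timeout_ms=30000,
    )
    await consumer.start()
    try:
        async for msg in consumer:
            print(msg.topic, msg.partition, msg.offset)
    finally:
        await consumer.stop()


asyncio.run(consume())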
172 changes: 118 additions & 54 deletions aiokafka/consumer/group_coordinator.py
@@ -9,20 +9,24 @@
from kafka.protocol.commit import (
OffsetCommitRequest_v2 as OffsetCommitRequest,
OffsetFetchRequest_v1 as OffsetFetchRequest)
from kafka.protocol.group import (
HeartbeatRequest, JoinGroupRequest, LeaveGroupRequest, SyncGroupRequest)
from kafka.protocol.group import HeartbeatRequest, LeaveGroupRequest

from aiokafka.protocol.group import JoinGroupRequest, SyncGroupRequest


import aiokafka.errors as Errors
from aiokafka.structs import OffsetAndMetadata, TopicPartition
from aiokafka.client import ConnectionGroup, CoordinationType
from aiokafka.util import create_future, create_task
from aiokafka.consumer.assignors import AbstractStaticPartitionAssignor


log = logging.getLogger(__name__)

UNKNOWN_OFFSET = -1


class BaseCoordinator:
class BaseCoordinator(object):

def __init__(self, client, subscription, *,
exclude_internal_topics=True):
@@ -210,6 +214,7 @@ class GroupCoordinator(BaseCoordinator):

def __init__(self, client, subscription, *,
group_id='aiokafka-default-group',
group_instance_id=None,
session_timeout_ms=10000, heartbeat_interval_ms=3000,
retry_backoff_ms=100,
enable_auto_commit=True, auto_commit_interval_ms=5000,
@@ -241,6 +246,7 @@ def __init__(self, client, subscription, *,
self.generation = OffsetCommitRequest.DEFAULT_GENERATION_ID
self.member_id = JoinGroupRequest[0].UNKNOWN_MEMBER_ID
self.group_id = group_id
self._group_instance_id = group_instance_id
self.coordinator_id = None

# Coordination flags and futures
@@ -346,9 +352,11 @@ def maybe_leave_group(self):
return task

async def _maybe_leave_group(self):
if self.generation > 0:
if self.generation > 0 and self._group_instance_id is None:
# this is a minimal effort attempt to leave the group. we do not
# attempt any resending if the request fails or times out.
# Note: do not send this leave request if we are running in static
# partition assignment mode (when group_instance_id has been set).
version = 0 if self._client.api_version < (0, 11, 0) else 1
request = LeaveGroupRequest[version](self.group_id, self.member_id)
try:
@@ -402,8 +410,18 @@ async def _perform_assignment(
'Invalid assignment protocol: %s' % assignment_strategy
member_metadata = {}
all_subscribed_topics = set()
for member_id, metadata_bytes in members:
group_instance_id_mapping = {}
for member in members:
if len(member) == 2:
member_id, metadata_bytes = member
group_instance_id = None
elif len(member) == 3:
member_id, group_instance_id, metadata_bytes = member
else:
raise Exception("unknown member format in JoinGroup response")
metadata = ConsumerProtocol.METADATA.decode(metadata_bytes)
group_instance_id_mapping[member_id] = group_instance_id
member_metadata[member_id] = metadata
all_subscribed_topics.update(metadata.subscription)

@@ -421,7 +439,12 @@
" with subscriptions %s", self.group_id, assignor.name,
member_metadata)

assignments = assignor.assign(self._cluster, member_metadata)
if isinstance(assignor, AbstractStaticPartitionAssignor):
assignments = assignor.assign(
self._cluster, member_metadata, group_instance_id_mapping
)
else:
assignments = assignor.assign(self._cluster, member_metadata)
log.debug("Finished assignment for group %s: %s",
self.group_id, assignments)

@@ -1203,46 +1226,67 @@ async def perform_group_join(self):
metadata = metadata.encode()
group_protocol = (assignor.name, metadata)
metadata_list.append(group_protocol)
# for KIP-394 we may have to send a second join request
try_join = True
while try_join:
try_join = False

if self._api_version < (0, 10, 1):
request = JoinGroupRequest[0](
self.group_id,
self._session_timeout_ms,
self._coordinator.member_id,
ConsumerProtocol.PROTOCOL_TYPE,
metadata_list,
)
elif self._api_version < (0, 11, 0):
request = JoinGroupRequest[1](
self.group_id,
self._session_timeout_ms,
self._rebalance_timeout_ms,
self._coordinator.member_id,
ConsumerProtocol.PROTOCOL_TYPE,
metadata_list,
)
elif self._api_version < (2, 3, 0):
request = JoinGroupRequest[2](
self.group_id,
self._session_timeout_ms,
self._rebalance_timeout_ms,
self._coordinator.member_id,
ConsumerProtocol.PROTOCOL_TYPE,
metadata_list,
)
else:
request = JoinGroupRequest[3](
self.group_id,
self._session_timeout_ms,
self._rebalance_timeout_ms,
self._coordinator.member_id,
self._coordinator._group_instance_id,
ConsumerProtocol.PROTOCOL_TYPE,
metadata_list,
)

if self._api_version < (0, 10, 1):
request = JoinGroupRequest[0](
self.group_id,
self._session_timeout_ms,
self._coordinator.member_id,
ConsumerProtocol.PROTOCOL_TYPE,
metadata_list)
elif self._api_version < (0, 11, 0):
request = JoinGroupRequest[1](
self.group_id,
self._session_timeout_ms,
self._rebalance_timeout_ms,
self._coordinator.member_id,
ConsumerProtocol.PROTOCOL_TYPE,
metadata_list)
else:
request = JoinGroupRequest[2](
self.group_id,
self._session_timeout_ms,
self._rebalance_timeout_ms,
self._coordinator.member_id,
ConsumerProtocol.PROTOCOL_TYPE,
metadata_list)
# create the request for the coordinator
log.debug("Sending JoinGroup (%s) to coordinator %s",
request, self.coordinator_id)
try:
response = await self._coordinator._send_req(request)
except Errors.KafkaError:
# Return right away. It's a connection error, so backoff will be
# handled by coordinator lookup
return None
if not self._subscription.active:
# Subscription changed. Ignore response and restart group join
return None

# create the request for the coordinator
log.debug("Sending JoinGroup (%s) to coordinator %s",
request, self.coordinator_id)
try:
response = await self._coordinator._send_req(request)
except Errors.KafkaError:
# Return right away. It's a connection error, so backoff will be
# handled by coordinator lookup
return None
error_type = Errors.for_code(response.error_code)

if not self._subscription.active:
# Subscription changed. Ignore response and restart group join
return None
if error_type is Errors.MemberIdRequired:
self._coordinator.member_id = response.member_id
try_join = True

error_type = Errors.for_code(response.error_code)
if error_type is Errors.NoError:
log.debug("Join group response %s", response)
self._coordinator.member_id = response.member_id
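To make the retry above easier to follow, a minimal hedged sketch of the KIP-394 handshake it implements (join_with_retry and build_request are illustrative names, not part of this PR): the first JoinGroup sent with an empty member_id is rejected with MEMBER_ID_REQUIRED together with a broker-assigned member_id, and the client immediately rejoins with that id.

import aiokafka.errors as Errors


async def join_with_retry(coordinator, build_request):
    # build_request() is a hypothetical callable that builds a
    # JoinGroupRequest from the coordinator's current member_id.
    while True:
        response = await coordinator._send_req(build_request())
        error_type = Errors.for_code(response.error_code)
        if error_type is Errors.MemberIdRequired:
            # KIP-394: adopt the broker-assigned member_id and join again.
            coordinator.member_id = response.member_id
            continue
        return response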
@@ -1301,12 +1345,22 @@

async def _on_join_follower(self):
# send follower's sync group with an empty assignment
version = 0 if self._api_version < (0, 11, 0) else 1
request = SyncGroupRequest[version](
self.group_id,
self._coordinator.generation,
self._coordinator.member_id,
[])
if self._api_version < (2, 3, 0):
version = 0 if self._api_version < (0, 11, 0) else 1
request = SyncGroupRequest[version](
self.group_id,
self._coordinator.generation,
self._coordinator.member_id,
[],
)
else:
request = SyncGroupRequest[2](
self.group_id,
self._coordinator.generation,
self._coordinator.member_id,
self._coordinator._group_instance_id,
[],
)
log.debug(
"Sending follower SyncGroup for group %s to coordinator %s: %s",
self.group_id, self.coordinator_id, request)
@@ -1338,12 +1392,22 @@ async def _on_join_leader(self, response):
assignment = assignment.encode()
assignment_req.append((member_id, assignment))

version = 0 if self._api_version < (0, 11, 0) else 1
request = SyncGroupRequest[version](
self.group_id,
self._coordinator.generation,
self._coordinator.member_id,
assignment_req)
if self._api_version < (2, 3, 0):
version = 0 if self._api_version < (0, 11, 0) else 1
request = SyncGroupRequest[version](
self.group_id,
self._coordinator.generation,
self._coordinator.member_id,
assignment_req,
)
else:
request = SyncGroupRequest[2](
self.group_id,
self._coordinator.generation,
self._coordinator.member_id,
self._coordinator._group_instance_id,
assignment_req,
)

log.debug(
"Sending leader SyncGroup for group %s to coordinator %s: %s",
2 changes: 0 additions & 2 deletions aiokafka/consumer/subscription_state.py
@@ -14,8 +14,6 @@

log = logging.getLogger(__name__)

(List, Future)


class SubscriptionType(Enum):

9 changes: 9 additions & 0 deletions aiokafka/errors.py
@@ -415,6 +415,15 @@ class ListenerNotFound(BrokerResponseError):
)


class MemberIdRequired(BrokerResponseError):
errno = 79
message = 'MEMBER_ID_REQUIRED'
description = (
'Consumer needs to have a valid member '
'id before actually entering group'
)


def _iter_broker_errors():
for name, obj in inspect.getmembers(sys.modules[__name__]):
if inspect.isclass(obj) and issubclass(obj, BrokerResponseError) and \
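A quick hedged sanity check of the wiring, assuming the registry built from _iter_broker_errors() picks the new class up at import time like the other broker errors:

import aiokafka.errors as Errors

# Error code 79 is what the broker returns for MEMBER_ID_REQUIRED; the
# KIP-394 retry in perform_group_join relies on this lookup.
assert Errors.for_code(79) is Errors.MemberIdRequired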