Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement KIP-345 in aiokafka (rebase of #827) #941

Merged
merged 25 commits into from
Dec 7, 2023
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
2dcf7b7
Implement KIP-345 in aiokafka
patkivikram Nov 3, 2020
d02ddd9
fixing linting errors
patkivikram Nov 25, 2020
fc83b3b
fixing linting errors
patkivikram Nov 25, 2020
052b66f
fixing linting errors
patkivikram Nov 25, 2020
4aebeb1
Update tests.yml
patkivikram Dec 4, 2020
5c55d46
Fix linting errors
g-clef Dec 4, 2020
39f8fa3
Linting fixed, tests still failing
g-clef Dec 5, 2020
0c4cf27
fixed tests.
g-clef Dec 5, 2020
f1ab871
Undoing a lot of linting
g-clef Dec 7, 2020
99cc6f0
last few lints
g-clef Dec 7, 2020
5a90fc7
Update assignors.py
patkivikram Dec 8, 2020
1695902
fix linting
patkivikram Dec 8, 2020
8017671
fix lgtm exception
g-clef Dec 9, 2020
77747f2
fix trailing space
g-clef Dec 11, 2020
5d807f2
add KIP-345 tests, remove broker version check
joshuaherrera Nov 14, 2023
54d6870
Merge 'upstream/master' into kip-345, resolve conflicts
joshuaherrera Nov 15, 2023
a570839
use aiokafka AbstractPartitionAssignor
joshuaherrera Nov 15, 2023
46f0b40
only test KIP-345 mode with valid Kafka versions
joshuaherrera Nov 15, 2023
c0b934f
Update aiokafka/consumer/group_coordinator.py
joshuaherrera Nov 20, 2023
7ffca59
remove AbstractStaticPartitionAssignor
joshuaherrera Nov 20, 2023
b8b7ffd
refactor _perform_assignment to use a JoinGroupResponse class as it's…
joshuaherrera Nov 28, 2023
140d115
Merge remote-tracking branch 'upstream/master' into kip-345
joshuaherrera Nov 29, 2023
d9bc186
poll periodically in kip-345 tests
joshuaherrera Nov 30, 2023
d430920
update tests to use async_timeout
joshuaherrera Dec 1, 2023
5c60f63
update isinstance function, use JoinGroupResponse_v5 for check
joshuaherrera Dec 4, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions aiokafka/consumer/assignors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from aiokafka.coordinator.assignors.abstract import AbstractPartitionAssignor
import abc


class AbstractStaticPartitionAssignor(AbstractPartitionAssignor):
"""
Abstract assignor implementation that also supports static assignments (KIP-345)
"""

@abc.abstractmethod
def assign(self, cluster, members, # lgtm[py/inheritance/signature-mismatch]
member_group_instance_ids):
joshuaherrera marked this conversation as resolved.
Show resolved Hide resolved
"""Perform group assignment given cluster metadata, member subscriptions
and group_instance_ids
Arguments:
cluster (ClusterMetadata): metadata for use in assignment
members (dict of {member_id: MemberMetadata}): decoded metadata for
each member in the group.
member_group_instance_ids (dict of {member_id: MemberMetadata}):
decoded metadata for each member in the group.
Returns:
dict: {member_id: MemberAssignment}
"""
pass
5 changes: 5 additions & 0 deletions aiokafka/consumer/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ class AIOKafkaConsumer:
committing offsets. If None, auto-partition assignment (via
group coordinator) and offset commits are disabled.
Default: None
group_instance_id (str or None): name of the group instance ID used for
static membership (KIP-345)
key_deserializer (Callable): Any callable that takes a
raw message key and returns a deserialized key.
value_deserializer (Callable, Optional): Any callable that takes a
Expand Down Expand Up @@ -229,6 +231,7 @@ def __init__(self, *topics, loop=None,
bootstrap_servers='localhost',
client_id='aiokafka-' + __version__,
group_id=None,
group_instance_id=None,
key_deserializer=None, value_deserializer=None,
fetch_max_wait_ms=500,
fetch_max_bytes=52428800,
Expand Down Expand Up @@ -291,6 +294,7 @@ def __init__(self, *topics, loop=None,
sasl_oauth_token_provider=sasl_oauth_token_provider)

self._group_id = group_id
self._group_instance_id = group_instance_id
self._heartbeat_interval_ms = heartbeat_interval_ms
self._session_timeout_ms = session_timeout_ms
self._retry_backoff_ms = retry_backoff_ms
Expand Down Expand Up @@ -382,6 +386,7 @@ async def start(self):
self._coordinator = GroupCoordinator(
self._client, self._subscription,
group_id=self._group_id,
group_instance_id=self._group_instance_id,
heartbeat_interval_ms=self._heartbeat_interval_ms,
session_timeout_ms=self._session_timeout_ms,
retry_backoff_ms=self._retry_backoff_ms,
Expand Down
166 changes: 114 additions & 52 deletions aiokafka/consumer/group_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@
HeartbeatRequest, JoinGroupRequest, LeaveGroupRequest, SyncGroupRequest)
from aiokafka.structs import OffsetAndMetadata, TopicPartition
from aiokafka.util import create_future, create_task
from aiokafka.consumer.assignors import AbstractStaticPartitionAssignor


log = logging.getLogger(__name__)

UNKNOWN_OFFSET = -1


class BaseCoordinator:
class BaseCoordinator(object):
joshuaherrera marked this conversation as resolved.
Show resolved Hide resolved

def __init__(self, client, subscription, *,
exclude_internal_topics=True):
Expand Down Expand Up @@ -209,6 +211,7 @@ class GroupCoordinator(BaseCoordinator):

def __init__(self, client, subscription, *,
group_id='aiokafka-default-group',
group_instance_id=None,
session_timeout_ms=10000, heartbeat_interval_ms=3000,
retry_backoff_ms=100,
enable_auto_commit=True, auto_commit_interval_ms=5000,
Expand Down Expand Up @@ -240,6 +243,7 @@ def __init__(self, client, subscription, *,
self.generation = OffsetCommitRequest.DEFAULT_GENERATION_ID
self.member_id = JoinGroupRequest[0].UNKNOWN_MEMBER_ID
self.group_id = group_id
self._group_instance_id = group_instance_id
self.coordinator_id = None

# Coordination flags and futures
Expand Down Expand Up @@ -345,9 +349,11 @@ def maybe_leave_group(self):
return task

async def _maybe_leave_group(self):
if self.generation > 0:
if self.generation > 0 and self._group_instance_id is None:
# this is a minimal effort attempt to leave the group. we do not
# attempt any resending if the request fails or times out.
# Note: do not send this leave request if we are running in static
# partition assignment mode (when group_instance_id has been set).
version = 0 if self._client.api_version < (0, 11, 0) else 1
request = LeaveGroupRequest[version](self.group_id, self.member_id)
try:
Expand Down Expand Up @@ -401,8 +407,18 @@ async def _perform_assignment(
'Invalid assignment protocol: %s' % assignment_strategy
member_metadata = {}
all_subscribed_topics = set()
for member_id, metadata_bytes in members:
group_instance_id_mapping = {}
# for member_id, group_instance_id, metadata_bytes in members:
for member in members:
if len(member) == 2:
ods marked this conversation as resolved.
Show resolved Hide resolved
member_id, metadata_bytes = member
group_instance_id = None
elif len(member) == 3:
member_id, group_instance_id, metadata_bytes = member
else:
raise Exception("unknown protocol returned from assignment")
metadata = ConsumerProtocol.METADATA.decode(metadata_bytes)
group_instance_id_mapping = group_instance_id
joshuaherrera marked this conversation as resolved.
Show resolved Hide resolved
member_metadata[member_id] = metadata
all_subscribed_topics.update(metadata.subscription)

Expand All @@ -420,7 +436,12 @@ async def _perform_assignment(
" with subscriptions %s", self.group_id, assignor.name,
member_metadata)

assignments = assignor.assign(self._cluster, member_metadata)
if isinstance(assignor, AbstractStaticPartitionAssignor):
assignments = assignor.assign(
self._cluster, member_metadata, group_instance_id_mapping
)
else:
assignments = assignor.assign(self._cluster, member_metadata)
log.debug("Finished assignment for group %s: %s",
self.group_id, assignments)

Expand Down Expand Up @@ -1202,46 +1223,67 @@ async def perform_group_join(self):
metadata = metadata.encode()
group_protocol = (assignor.name, metadata)
metadata_list.append(group_protocol)
# for KIP-394 we may have to send a second join request
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This sounds like a completely different effort, maybe worth separating out to another PR?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The bulk of this addition is captured in the new while loop wrapping lines 1222-1279, and the newly added MemberIdRequired error. KIP-394 introduces the MemberIdRequired error, and according to the KIP, if the client gets this error they should retry the join with the previous member id.

I'd prefer to leave it in since the original author thought it necessary and the KIP itself calls out that this is "an important complement to KIP-345" but if you think this should be moved to it's own PR (and would delay this PR) I can remove the code related to KIP-394 from this PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tvoinarovskyi Any updates on next steps? If you or @ods think it's best to remove this and leave it for another PR, I'm happy to do so, since it seems like this is the last unresolved thread for this PR. I don't want to hold this up unnecessarily.

try_join = True
while try_join:
try_join = False

if self._api_version < (0, 10, 1):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Could we abstract out this into a function?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This depends on the KIP-394 decision, since this is tied to that. If we move that to another PR than this would revert to how we currently handle this in master.

request = JoinGroupRequest[0](
self.group_id,
self._session_timeout_ms,
self._coordinator.member_id,
ConsumerProtocol.PROTOCOL_TYPE,
metadata_list,
)
elif self._api_version < (0, 11, 0):
request = JoinGroupRequest[1](
self.group_id,
self._session_timeout_ms,
self._rebalance_timeout_ms,
self._coordinator.member_id,
ConsumerProtocol.PROTOCOL_TYPE,
metadata_list,
)
elif self._api_version < (2, 3, 0):
request = JoinGroupRequest[2](
self.group_id,
self._session_timeout_ms,
self._rebalance_timeout_ms,
self._coordinator.member_id,
ConsumerProtocol.PROTOCOL_TYPE,
metadata_list,
)
else:
request = JoinGroupRequest[3](
self.group_id,
self._session_timeout_ms,
self._rebalance_timeout_ms,
self._coordinator.member_id,
self._coordinator._group_instance_id,
ConsumerProtocol.PROTOCOL_TYPE,
metadata_list,
)

if self._api_version < (0, 10, 1):
request = JoinGroupRequest[0](
self.group_id,
self._session_timeout_ms,
self._coordinator.member_id,
ConsumerProtocol.PROTOCOL_TYPE,
metadata_list)
elif self._api_version < (0, 11, 0):
request = JoinGroupRequest[1](
self.group_id,
self._session_timeout_ms,
self._rebalance_timeout_ms,
self._coordinator.member_id,
ConsumerProtocol.PROTOCOL_TYPE,
metadata_list)
else:
request = JoinGroupRequest[2](
self.group_id,
self._session_timeout_ms,
self._rebalance_timeout_ms,
self._coordinator.member_id,
ConsumerProtocol.PROTOCOL_TYPE,
metadata_list)
# create the request for the coordinator
log.debug("Sending JoinGroup (%s) to coordinator %s",
request, self.coordinator_id)
try:
response = await self._coordinator._send_req(request)
except Errors.KafkaError:
# Return right away. It's a connection error, so backoff will be
# handled by coordinator lookup
return None
if not self._subscription.active:
# Subscription changed. Ignore response and restart group join
return None

# create the request for the coordinator
log.debug("Sending JoinGroup (%s) to coordinator %s",
request, self.coordinator_id)
try:
response = await self._coordinator._send_req(request)
except Errors.KafkaError:
# Return right away. It's a connection error, so backoff will be
# handled by coordinator lookup
return None
error_type = Errors.for_code(response.error_code)

if not self._subscription.active:
# Subscription changed. Ignore response and restart group join
return None
if error_type is Errors.MemberIdRequired:
self._coordinator.member_id = response.member_id
try_join = True

error_type = Errors.for_code(response.error_code)
if error_type is Errors.NoError:
log.debug("Join group response %s", response)
self._coordinator.member_id = response.member_id
Expand Down Expand Up @@ -1300,12 +1342,22 @@ async def perform_group_join(self):

async def _on_join_follower(self):
# send follower's sync group with an empty assignment
version = 0 if self._api_version < (0, 11, 0) else 1
request = SyncGroupRequest[version](
self.group_id,
self._coordinator.generation,
self._coordinator.member_id,
[])
if self._api_version < (2, 3, 0):
version = 0 if self._api_version < (0, 11, 0) else 1
request = SyncGroupRequest[version](
self.group_id,
self._coordinator.generation,
self._coordinator.member_id,
[],
)
else:
request = SyncGroupRequest[2](
self.group_id,
self._coordinator.generation,
self._coordinator.member_id,
self._coordinator._group_instance_id,
[],
)
log.debug(
"Sending follower SyncGroup for group %s to coordinator %s: %s",
self.group_id, self.coordinator_id, request)
Expand Down Expand Up @@ -1337,12 +1389,22 @@ async def _on_join_leader(self, response):
assignment = assignment.encode()
assignment_req.append((member_id, assignment))

version = 0 if self._api_version < (0, 11, 0) else 1
request = SyncGroupRequest[version](
self.group_id,
self._coordinator.generation,
self._coordinator.member_id,
assignment_req)
if self._api_version < (2, 3, 0):
version = 0 if self._api_version < (0, 11, 0) else 1
request = SyncGroupRequest[version](
self.group_id,
self._coordinator.generation,
self._coordinator.member_id,
assignment_req,
)
else:
request = SyncGroupRequest[2](
self.group_id,
self._coordinator.generation,
self._coordinator.member_id,
self._coordinator._group_instance_id,
assignment_req,
)

log.debug(
"Sending leader SyncGroup for group %s to coordinator %s: %s",
Expand Down
2 changes: 0 additions & 2 deletions aiokafka/consumer/subscription_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@

log = logging.getLogger(__name__)

(List, Future)


class SubscriptionType(Enum):

Expand Down
9 changes: 9 additions & 0 deletions aiokafka/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -856,6 +856,15 @@ class ListenerNotFound(BrokerResponseError):
)


class MemberIdRequired(BrokerResponseError):
errno = 79
message = 'MEMBER_ID_REQUIRED'
description = (
'Consumer needs to have a valid member '
'id before actually entering group'
)


def _iter_broker_errors():
for name, obj in inspect.getmembers(sys.modules[__name__]):
if (
Expand Down
Loading