-
Notifications
You must be signed in to change notification settings - Fork 233
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement KIP-345 in aiokafka (rebase of #827) #941
Changes from all commits
2dcf7b7
d02ddd9
fc83b3b
052b66f
4aebeb1
5c55d46
39f8fa3
0c4cf27
f1ab871
99cc6f0
5a90fc7
1695902
8017671
77747f2
5d807f2
54d6870
a570839
46f0b40
c0b934f
7ffca59
b8b7ffd
140d115
d9bc186
d430920
5c60f63
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,14 +8,17 @@ | |
from aiokafka.client import ConnectionGroup, CoordinationType | ||
from aiokafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor | ||
from aiokafka.coordinator.protocol import ConsumerProtocol | ||
from aiokafka.protocol.api import Response | ||
from aiokafka.protocol.commit import ( | ||
OffsetCommitRequest_v2 as OffsetCommitRequest, | ||
OffsetFetchRequest_v1 as OffsetFetchRequest) | ||
from aiokafka.protocol.group import ( | ||
HeartbeatRequest, JoinGroupRequest, LeaveGroupRequest, SyncGroupRequest) | ||
HeartbeatRequest, JoinGroupRequest, LeaveGroupRequest, | ||
SyncGroupRequest, JoinGroupResponse, JoinGroupResponse_v5) | ||
from aiokafka.structs import OffsetAndMetadata, TopicPartition | ||
from aiokafka.util import create_future, create_task | ||
|
||
|
||
log = logging.getLogger(__name__) | ||
|
||
UNKNOWN_OFFSET = -1 | ||
|
@@ -209,6 +212,7 @@ | |
|
||
def __init__(self, client, subscription, *, | ||
group_id='aiokafka-default-group', | ||
group_instance_id=None, | ||
session_timeout_ms=10000, heartbeat_interval_ms=3000, | ||
retry_backoff_ms=100, | ||
enable_auto_commit=True, auto_commit_interval_ms=5000, | ||
|
@@ -240,6 +244,7 @@ | |
self.generation = OffsetCommitRequest.DEFAULT_GENERATION_ID | ||
self.member_id = JoinGroupRequest[0].UNKNOWN_MEMBER_ID | ||
self.group_id = group_id | ||
self._group_instance_id = group_instance_id | ||
self.coordinator_id = None | ||
|
||
# Coordination flags and futures | ||
|
@@ -345,9 +350,11 @@ | |
return task | ||
|
||
async def _maybe_leave_group(self): | ||
if self.generation > 0: | ||
if self.generation > 0 and self._group_instance_id is None: | ||
# this is a minimal effort attempt to leave the group. we do not | ||
# attempt any resending if the request fails or times out. | ||
# Note: do not send this leave request if we are running in static | ||
# partition assignment mode (when group_instance_id has been set). | ||
version = 0 if self._client.api_version < (0, 11, 0) else 1 | ||
request = LeaveGroupRequest[version](self.group_id, self.member_id) | ||
try: | ||
|
@@ -393,15 +400,22 @@ | |
" for group %s failed on_partitions_revoked", | ||
self._subscription.listener, self.group_id) | ||
|
||
async def _perform_assignment( | ||
self, leader_id, assignment_strategy, members | ||
): | ||
async def _perform_assignment(self, response: Response): | ||
assignment_strategy = response.group_protocol | ||
members = response.members | ||
assignor = self._lookup_assignor(assignment_strategy) | ||
assert assignor, \ | ||
'Invalid assignment protocol: %s' % assignment_strategy | ||
member_metadata = {} | ||
all_subscribed_topics = set() | ||
for member_id, metadata_bytes in members: | ||
for member in members: | ||
if isinstance(response, JoinGroupResponse_v5): | ||
member_id, group_instance_id, metadata_bytes = member | ||
elif isinstance(response, (JoinGroupResponse[0], JoinGroupResponse[1], | ||
JoinGroupResponse[2])): | ||
member_id, metadata_bytes = member | ||
else: | ||
raise Exception("unknown protocol returned from assignment") | ||
metadata = ConsumerProtocol.METADATA.decode(metadata_bytes) | ||
member_metadata[member_id] = metadata | ||
all_subscribed_topics.update(metadata.subscription) | ||
|
@@ -1202,46 +1216,67 @@ | |
metadata = metadata.encode() | ||
group_protocol = (assignor.name, metadata) | ||
metadata_list.append(group_protocol) | ||
# for KIP-394 we may have to send a second join request | ||
try_join = True | ||
while try_join: | ||
try_join = False | ||
|
||
if self._api_version < (0, 10, 1): | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. nit: Could we abstract this out into a function? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This depends on the KIP-394 decision, since this is tied to that. If we move that to another PR, then this would revert to how we currently handle this in master. |
||
request = JoinGroupRequest[0]( | ||
self.group_id, | ||
self._session_timeout_ms, | ||
self._coordinator.member_id, | ||
ConsumerProtocol.PROTOCOL_TYPE, | ||
metadata_list, | ||
) | ||
elif self._api_version < (0, 11, 0): | ||
request = JoinGroupRequest[1]( | ||
self.group_id, | ||
self._session_timeout_ms, | ||
self._rebalance_timeout_ms, | ||
self._coordinator.member_id, | ||
ConsumerProtocol.PROTOCOL_TYPE, | ||
metadata_list, | ||
) | ||
elif self._api_version < (2, 3, 0): | ||
request = JoinGroupRequest[2]( | ||
self.group_id, | ||
self._session_timeout_ms, | ||
self._rebalance_timeout_ms, | ||
self._coordinator.member_id, | ||
ConsumerProtocol.PROTOCOL_TYPE, | ||
metadata_list, | ||
) | ||
else: | ||
request = JoinGroupRequest[3]( | ||
self.group_id, | ||
self._session_timeout_ms, | ||
self._rebalance_timeout_ms, | ||
self._coordinator.member_id, | ||
self._coordinator._group_instance_id, | ||
ConsumerProtocol.PROTOCOL_TYPE, | ||
metadata_list, | ||
) | ||
|
||
if self._api_version < (0, 10, 1): | ||
request = JoinGroupRequest[0]( | ||
self.group_id, | ||
self._session_timeout_ms, | ||
self._coordinator.member_id, | ||
ConsumerProtocol.PROTOCOL_TYPE, | ||
metadata_list) | ||
elif self._api_version < (0, 11, 0): | ||
request = JoinGroupRequest[1]( | ||
self.group_id, | ||
self._session_timeout_ms, | ||
self._rebalance_timeout_ms, | ||
self._coordinator.member_id, | ||
ConsumerProtocol.PROTOCOL_TYPE, | ||
metadata_list) | ||
else: | ||
request = JoinGroupRequest[2]( | ||
self.group_id, | ||
self._session_timeout_ms, | ||
self._rebalance_timeout_ms, | ||
self._coordinator.member_id, | ||
ConsumerProtocol.PROTOCOL_TYPE, | ||
metadata_list) | ||
# create the request for the coordinator | ||
log.debug("Sending JoinGroup (%s) to coordinator %s", | ||
request, self.coordinator_id) | ||
try: | ||
response = await self._coordinator._send_req(request) | ||
except Errors.KafkaError: | ||
# Return right away. It's a connection error, so backoff will be | ||
# handled by coordinator lookup | ||
return None | ||
if not self._subscription.active: | ||
# Subscription changed. Ignore response and restart group join | ||
return None | ||
|
||
# create the request for the coordinator | ||
log.debug("Sending JoinGroup (%s) to coordinator %s", | ||
request, self.coordinator_id) | ||
try: | ||
response = await self._coordinator._send_req(request) | ||
except Errors.KafkaError: | ||
# Return right away. It's a connection error, so backoff will be | ||
# handled by coordinator lookup | ||
return None | ||
error_type = Errors.for_code(response.error_code) | ||
|
||
if not self._subscription.active: | ||
# Subscription changed. Ignore response and restart group join | ||
return None | ||
if error_type is Errors.MemberIdRequired: | ||
self._coordinator.member_id = response.member_id | ||
try_join = True | ||
|
||
error_type = Errors.for_code(response.error_code) | ||
if error_type is Errors.NoError: | ||
log.debug("Join group response %s", response) | ||
self._coordinator.member_id = response.member_id | ||
|
@@ -1300,12 +1335,22 @@ | |
|
||
async def _on_join_follower(self): | ||
# send follower's sync group with an empty assignment | ||
version = 0 if self._api_version < (0, 11, 0) else 1 | ||
request = SyncGroupRequest[version]( | ||
self.group_id, | ||
self._coordinator.generation, | ||
self._coordinator.member_id, | ||
[]) | ||
if self._api_version < (2, 3, 0): | ||
version = 0 if self._api_version < (0, 11, 0) else 1 | ||
request = SyncGroupRequest[version]( | ||
self.group_id, | ||
self._coordinator.generation, | ||
self._coordinator.member_id, | ||
[], | ||
) | ||
else: | ||
request = SyncGroupRequest[2]( | ||
self.group_id, | ||
self._coordinator.generation, | ||
self._coordinator.member_id, | ||
self._coordinator._group_instance_id, | ||
[], | ||
) | ||
log.debug( | ||
"Sending follower SyncGroup for group %s to coordinator %s: %s", | ||
self.group_id, self.coordinator_id, request) | ||
|
@@ -1323,11 +1368,7 @@ | |
Future: resolves to member assignment encoded-bytes | ||
""" | ||
try: | ||
group_assignment = \ | ||
await self._coordinator._perform_assignment( | ||
response.leader_id, | ||
response.group_protocol, | ||
response.members) | ||
group_assignment = await self._coordinator._perform_assignment(response) | ||
except Exception as e: | ||
raise Errors.KafkaError(repr(e)) | ||
|
||
|
@@ -1337,12 +1378,22 @@ | |
assignment = assignment.encode() | ||
assignment_req.append((member_id, assignment)) | ||
|
||
version = 0 if self._api_version < (0, 11, 0) else 1 | ||
request = SyncGroupRequest[version]( | ||
self.group_id, | ||
self._coordinator.generation, | ||
self._coordinator.member_id, | ||
assignment_req) | ||
if self._api_version < (2, 3, 0): | ||
version = 0 if self._api_version < (0, 11, 0) else 1 | ||
request = SyncGroupRequest[version]( | ||
self.group_id, | ||
self._coordinator.generation, | ||
self._coordinator.member_id, | ||
assignment_req, | ||
) | ||
else: | ||
request = SyncGroupRequest[2]( | ||
self.group_id, | ||
self._coordinator.generation, | ||
self._coordinator.member_id, | ||
self._coordinator._group_instance_id, | ||
assignment_req, | ||
) | ||
|
||
log.debug( | ||
"Sending leader SyncGroup for group %s to coordinator %s: %s", | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,8 +14,6 @@ | |
|
||
log = logging.getLogger(__name__) | ||
|
||
(List, Future) | ||
|
||
|
||
class SubscriptionType(Enum): | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This sounds like a completely different effort, maybe worth separating out to another PR?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The bulk of this addition is captured in the new `while` loop wrapping lines 1222-1279, and the newly added `MemberIdRequired` error. KIP-394 introduces the `MemberIdRequired` error, and according to the KIP, if the client gets this error they should retry the join with the previous member id.
I'd prefer to leave it in since the original author thought it necessary and the KIP itself calls out that this is "an important complement to KIP-345", but if you think this should be moved to its own PR (and would delay this PR) I can remove the code related to KIP-394 from this PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@tvoinarovskyi Any updates on next steps? If you or @ods think it's best to remove this and leave it for another PR, I'm happy to do so, since it seems like this is the last unresolved thread for this PR. I don't want to hold this up unnecessarily.