-
Notifications
You must be signed in to change notification settings - Fork 233
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement KIP-345 in aiokafka (rebase of #827) #941
Conversation
suppressing lgtm warning
@ods Here's the PR for the tests, I'll work on getting merge conflicts fixed on my end since it's only two files, hopefully by EOD today. Edit: Merge conflicts resolved. |
Co-authored-by: Denis Otkidach <denis.otkidach@gmail.com>
Codecov ReportAttention:
Additional details and impacted files@@ Coverage Diff @@
## master #941 +/- ##
==========================================
+ Coverage 94.89% 94.91% +0.01%
==========================================
Files 106 106
Lines 16136 16318 +182
Branches 2592 2615 +23
==========================================
+ Hits 15313 15488 +175
- Misses 549 554 +5
- Partials 274 276 +2
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. |
Hi joshuaherrera, Thanks for the PR, will be joining the review. Really nice, clean PR, thanks for the contribution. Added a few small points to consider. |
while try_join: | ||
try_join = False | ||
|
||
if self._api_version < (0, 10, 1): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Could we abstract out this into a function?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This depends on the KIP-394 decision, since this is tied to that. If we move that to another PR than this would revert to how we currently handle this in master.
@@ -1202,46 +1217,67 @@ async def perform_group_join(self): | |||
metadata = metadata.encode() | |||
group_protocol = (assignor.name, metadata) | |||
metadata_list.append(group_protocol) | |||
# for KIP-394 we may have to send a second join request |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This sounds like a completely different effort, maybe worth separating out to another PR?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The bulk of this addition is captured in the new while
loop wrapping lines 1222-1279, and the newly added MemberIdRequired error. KIP-394 introduces the MemberIdRequired
error, and according to the KIP, if the client gets this error they should retry the join with the previous member id.
I'd prefer to leave it in since the original author thought it necessary and the KIP itself calls out that this is "an important complement to KIP-345" but if you think this should be moved to it's own PR (and would delay this PR) I can remove the code related to KIP-394 from this PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@tvoinarovskyi Any updates on next steps? If you or @ods think it's best to remove this and leave it for another PR, I'm happy to do so, since it seems like this is the last unresolved thread for this PR. I don't want to hold this up unnecessarily.
tests/test_consumer.py
Outdated
# partitions after rebalance | ||
all_partitions = frozenset(list(c1_partitions) + list(c2_partitions)) | ||
await consumer2.stop() | ||
await asyncio.sleep(25) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we document here exactly the reason this is 25s? As I understand it is derived from default timeouts, maybe we can make it as low as 10s?
The reason I am asking is that CI has limited time to execute and stops like those increase runtime constantly. A better approach would be to poll for results, for example, wait for every second till listener1.revoke_mock.call_count > 2.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll make the polling change as recommended.
tests/test_consumer.py
Outdated
|
||
# stop consumer2, which will trigger yet another rebalance | ||
await consumer2.stop() | ||
await asyncio.sleep(15) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as above
This is very exciting ! Thanks a lot for taking care of this. Do you have any idea when this could be released? |
@tartieret Please try it out and provide feedback whether it works as expected. |
I'll have someone in my team look at this over the next few days |
hey @ods , replying on behalf of @tartieret, I just tested these changes and they seems to be working for me. |
Changes
Fixes #680
This is a rebase of #827 that adds test for KIP-345 functionality - See #827 (comment)
Checklist
CHANGES
folder<issue_id>.<type>
(e.g.588.bugfix
)issue_id
change it to the pr id after creating the PR.feature
: Signifying a new feature..bugfix
: Signifying a bug fix..doc
: Signifying a documentation improvement..removal
: Signifying a deprecation or removal of public API..misc
: A ticket has been closed, but it is not of interest to users.Fix issue with non-ascii contents in doctest text files.