Skip to content

Commit

Permalink
refactor _perform_assignment to use a JoinGroupResponse class as it's…
Browse files Browse the repository at this point in the history
… argument
  • Loading branch information
joshuaherrera committed Nov 28, 2023
1 parent 7ffca59 commit b8b7ffd
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 26 deletions.
26 changes: 12 additions & 14 deletions aiokafka/consumer/group_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@
from aiokafka.client import ConnectionGroup, CoordinationType
from aiokafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
from aiokafka.coordinator.protocol import ConsumerProtocol
from aiokafka.protocol.api import Response
from aiokafka.protocol.commit import (
OffsetCommitRequest_v2 as OffsetCommitRequest,
OffsetFetchRequest_v1 as OffsetFetchRequest)
from aiokafka.protocol.group import (
HeartbeatRequest, JoinGroupRequest, LeaveGroupRequest, SyncGroupRequest)
HeartbeatRequest, JoinGroupRequest, LeaveGroupRequest,
SyncGroupRequest, JoinGroupResponse)
from aiokafka.structs import OffsetAndMetadata, TopicPartition
from aiokafka.util import create_future, create_task

Expand Down Expand Up @@ -398,21 +400,21 @@ async def _on_join_prepare(self, previous_assignment):
" for group %s failed on_partitions_revoked",
self._subscription.listener, self.group_id)

async def _perform_assignment(
self, leader_id, assignment_strategy, members
):
async def _perform_assignment(self, response: Response):
assignment_strategy = response.group_protocol
members = response.members
assignor = self._lookup_assignor(assignment_strategy)
assert assignor, \
'Invalid assignment protocol: %s' % assignment_strategy
member_metadata = {}
all_subscribed_topics = set()
# for member_id, group_instance_id, metadata_bytes in members:
for member in members:
if len(member) == 2:
member_id, metadata_bytes = member
group_instance_id = None
elif len(member) == 3:
if isinstance(response, JoinGroupResponse[3]):
member_id, group_instance_id, metadata_bytes = member
elif (isinstance(response, JoinGroupResponse[0]) or
isinstance(response, JoinGroupResponse[1]) or
isinstance(response, JoinGroupResponse[2])):
member_id, metadata_bytes = member
else:
raise Exception("unknown protocol returned from assignment")

Check warning on line 419 in aiokafka/consumer/group_coordinator.py

View check run for this annotation

Codecov / codecov/patch

aiokafka/consumer/group_coordinator.py#L419

Added line #L419 was not covered by tests
metadata = ConsumerProtocol.METADATA.decode(metadata_bytes)
Expand Down Expand Up @@ -1367,11 +1369,7 @@ async def _on_join_leader(self, response):
Future: resolves to member assignment encoded-bytes
"""
try:
group_assignment = \
await self._coordinator._perform_assignment(
response.leader_id,
response.group_protocol,
response.members)
group_assignment = await self._coordinator._perform_assignment(response)
except Exception as e:
raise Errors.KafkaError(repr(e))

Expand Down
21 changes: 11 additions & 10 deletions aiokafka/coordinator/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from aiokafka import errors as Errors
from aiokafka.metrics import AnonMeasurable
from aiokafka.metrics.stats import Avg, Count, Max, Rate
from aiokafka.protocol.api import Response

Check warning on line 12 in aiokafka/coordinator/base.py

View check run for this annotation

Codecov / codecov/patch

aiokafka/coordinator/base.py#L12

Added line #L12 was not covered by tests
from aiokafka.protocol.commit import GroupCoordinatorRequest, OffsetCommitRequest
from aiokafka.protocol.group import (
HeartbeatRequest,
Expand Down Expand Up @@ -186,19 +187,21 @@ def _on_join_prepare(self, generation, member_id):
pass

@abc.abstractmethod
def _perform_assignment(self, leader_id, protocol, members):
def _perform_assignment(self, response: Response):

Check warning on line 190 in aiokafka/coordinator/base.py

View check run for this annotation

Codecov / codecov/patch

aiokafka/coordinator/base.py#L190

Added line #L190 was not covered by tests
"""Perform assignment for the group.
This is used by the leader to push state to all the members of the group
(e.g. to push partition assignments in the case of the new consumer)
Arguments:
leader_id (str): The id of the leader (which is this member)
protocol (str): the chosen group protocol (assignment strategy)
members (list): [(member_id, metadata_bytes)] from
JoinGroupResponse. metadata_bytes are associated with the chosen
group protocol, and the Coordinator subclass is responsible for
decoding metadata_bytes based on that protocol.
response (Response): A JoinGroupResponse class that implements the
Response abstract class. The following attributes of this class
are used within the function:
protocol (str): the chosen group protocol (assignment strategy)
members (list): [(member_id, metadata_bytes)] from
JoinGroupResponse. metadata_bytes are associated with
the chosen group protocol, and the Coordinator subclass is
responsible for decoding metadata_bytes based on that protocol.
Returns:
dict: {member_id: assignment}; assignment must either be bytes
Expand Down Expand Up @@ -649,9 +652,7 @@ def _on_join_leader(self, response):
Future: resolves to member assignment encoded-bytes
"""
try:
group_assignment = self._perform_assignment(
response.leader_id, response.group_protocol, response.members
)
group_assignment = self._perform_assignment(response)

Check warning on line 655 in aiokafka/coordinator/base.py

View check run for this annotation

Codecov / codecov/patch

aiokafka/coordinator/base.py#L655

Added line #L655 was not covered by tests
except Exception as e:
return Future().failure(e)

Expand Down
5 changes: 4 additions & 1 deletion aiokafka/coordinator/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import aiokafka.errors as Errors
from aiokafka.metrics import AnonMeasurable
from aiokafka.metrics.stats import Avg, Count, Max, Rate
from aiokafka.protocol.api import Response

Check warning on line 11 in aiokafka/coordinator/consumer.py

View check run for this annotation

Codecov / codecov/patch

aiokafka/coordinator/consumer.py#L11

Added line #L11 was not covered by tests
from aiokafka.protocol.commit import OffsetCommitRequest, OffsetFetchRequest
from aiokafka.structs import OffsetAndMetadata, TopicPartition
from aiokafka.util import WeakMethod
Expand Down Expand Up @@ -336,7 +337,9 @@ def time_to_next_poll(self):
self.next_auto_commit_deadline - time.time(), self.time_to_next_heartbeat()
)

def _perform_assignment(self, leader_id, assignment_strategy, members):
def _perform_assignment(self, response: Response):
assignment_strategy = response.group_protocol
members = response.members

Check warning on line 342 in aiokafka/coordinator/consumer.py

View check run for this annotation

Codecov / codecov/patch

aiokafka/coordinator/consumer.py#L340-L342

Added lines #L340 - L342 were not covered by tests
assignor = self._lookup_assignor(assignment_strategy)
assert assignor, "Invalid assignment protocol: %s" % (assignment_strategy,)
member_metadata = {}
Expand Down
2 changes: 1 addition & 1 deletion aiokafka/protocol/group.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ class JoinGroupRequest_v5(Request):
JoinGroupResponse_v0,
JoinGroupResponse_v1,
JoinGroupResponse_v2,
JoinGroupRequest_v5,
JoinGroupResponse_v5,
]


Expand Down

0 comments on commit b8b7ffd

Please sign in to comment.