Skip to content

Commit

Permalink
Add warning for unassigned consumers
Browse files Browse the repository at this point in the history
  • Loading branch information
Arseniy-Popov committed Jan 9, 2025
1 parent a25bea6 commit 571f7aa
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 1 deletion.
6 changes: 6 additions & 0 deletions faststream/kafka/subscriber/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -601,6 +601,12 @@ async def start(self) -> None:
f"{consumer.assignment()}",
logging.INFO,
)
if any(not consumer.assignment() for consumer in self.consumer_subgroup):
self._log(
f"Consumer in group {self.group_id} has no partition assignments - topic "
f"{self.topics[0]} may have fewer partitions than consumers",
logging.WARNING,
)

self.running = True

Expand Down
54 changes: 53 additions & 1 deletion tests/brokers/kafka/test_consume.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio
from unittest.mock import MagicMock, patch
import logging
from unittest.mock import MagicMock, Mock, patch

import pytest
from aiokafka import AIOKafkaConsumer
Expand All @@ -8,6 +9,7 @@
from faststream.exceptions import AckMessage
from faststream.kafka import KafkaBroker, TopicPartition
from faststream.kafka.annotations import KafkaMessage
from faststream.kafka.subscriber.usecase import ConcurrentBetweenPartitionsSubscriber
from tests.brokers.base.consume import BrokerRealConsumeTestcase
from tests.tools import spy_decorator

Expand Down Expand Up @@ -457,6 +459,56 @@ async def handler(msg: KafkaMessage):

await broker.close()

@pytest.mark.asyncio
@pytest.mark.slow
@pytest.mark.parametrize(
("partitions", "warning"),
[
pytest.param(2, True, id="unassigned consumers"),
pytest.param(3, False, id="no unassigned consumers"),
],
)
async def test_concurrent_consume_between_partitions_assignment_warning(
self,
queue: str,
partitions: int,
warning: bool,
):
admin_client = AIOKafkaAdminClient()
try:
await admin_client.start()
await admin_client.create_topics([NewTopic(queue, partitions, 1)])
finally:
await admin_client.close()

consume_broker = self.get_broker()

@consume_broker.subscriber(
queue,
max_workers=3,
auto_commit=False,
group_id="service_1",
)
async def handler(msg: str):
pass

with patch.object(
ConcurrentBetweenPartitionsSubscriber, "_log", Mock()
) as mock:
async with self.patch_broker(consume_broker) as broker:
await broker.start()
await broker.close()
if warning:
assert (
len([x for x in mock.call_args_list if x[0][1] == logging.WARNING])
== 1
)
else:
assert (
len([x for x in mock.call_args_list if x[0][1] == logging.WARNING])
== 0
)

@pytest.mark.asyncio
async def test_consume_without_value(
self,
Expand Down

0 comments on commit 571f7aa

Please sign in to comment.