Add concurrent-between-partitions kafka subscriber (#2017)
* Add concurrent between partitions kafka subscriber

* Add warning for unassigned consumers

* docs: generate API References

* Fix ConcurrentBetweenPartitionsSubscriber logging

* docs: generate API References

* Add default logging consumer rebalance listener factory

* docs: generate API References

* Move unassigned consumer warning to rebalance listener

* Tweak constructor and tests

* Switch to LoggingListenerProxy and return f-strings in logs

* docs: generate API References

---------

Co-authored-by: Pastukhov Nikita <nikita@pastukhov-dev.ru>
Co-authored-by: Arseniy-Popov <Arseniy-Popov@users.noreply.github.com>
3 people authored Jan 12, 2025
1 parent db1033d commit dfd7b9a
Showing 19 changed files with 646 additions and 68 deletions.
2 changes: 1 addition & 1 deletion .secrets.baseline
@@ -178,5 +178,5 @@
}
]
},
"generated_at": "2025-01-08T14:22:57Z"
"generated_at": "2025-01-10T14:53:01Z"
}
5 changes: 5 additions & 0 deletions docs/docs/SUMMARY.md
@@ -626,11 +626,14 @@ search:
- [KafkaRouter](api/faststream/kafka/fastapi/KafkaRouter.md)
- fastapi
- [KafkaRouter](api/faststream/kafka/fastapi/fastapi/KafkaRouter.md)
- listener
- [LoggingListenerProxy](api/faststream/kafka/listener/LoggingListenerProxy.md)
- message
- [ConsumerProtocol](api/faststream/kafka/message/ConsumerProtocol.md)
- [FakeConsumer](api/faststream/kafka/message/FakeConsumer.md)
- [KafkaAckableMessage](api/faststream/kafka/message/KafkaAckableMessage.md)
- [KafkaMessage](api/faststream/kafka/message/KafkaMessage.md)
- [KafkaRawMessage](api/faststream/kafka/message/KafkaRawMessage.md)
- opentelemetry
- [KafkaTelemetryMiddleware](api/faststream/kafka/opentelemetry/KafkaTelemetryMiddleware.md)
- middleware
@@ -677,13 +680,15 @@ search:
- subscriber
- asyncapi
- [AsyncAPIBatchSubscriber](api/faststream/kafka/subscriber/asyncapi/AsyncAPIBatchSubscriber.md)
- [AsyncAPIConcurrentBetweenPartitionsSubscriber](api/faststream/kafka/subscriber/asyncapi/AsyncAPIConcurrentBetweenPartitionsSubscriber.md)
- [AsyncAPIConcurrentDefaultSubscriber](api/faststream/kafka/subscriber/asyncapi/AsyncAPIConcurrentDefaultSubscriber.md)
- [AsyncAPIDefaultSubscriber](api/faststream/kafka/subscriber/asyncapi/AsyncAPIDefaultSubscriber.md)
- [AsyncAPISubscriber](api/faststream/kafka/subscriber/asyncapi/AsyncAPISubscriber.md)
- factory
- [create_subscriber](api/faststream/kafka/subscriber/factory/create_subscriber.md)
- usecase
- [BatchSubscriber](api/faststream/kafka/subscriber/usecase/BatchSubscriber.md)
- [ConcurrentBetweenPartitionsSubscriber](api/faststream/kafka/subscriber/usecase/ConcurrentBetweenPartitionsSubscriber.md)
- [ConcurrentDefaultSubscriber](api/faststream/kafka/subscriber/usecase/ConcurrentDefaultSubscriber.md)
- [DefaultSubscriber](api/faststream/kafka/subscriber/usecase/DefaultSubscriber.md)
- [LogicSubscriber](api/faststream/kafka/subscriber/usecase/LogicSubscriber.md)
11 changes: 11 additions & 0 deletions docs/docs/en/api/faststream/kafka/listener/LoggingListenerProxy.md
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.kafka.listener.LoggingListenerProxy
11 changes: 11 additions & 0 deletions docs/docs/en/api/faststream/kafka/message/KafkaRawMessage.md
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.kafka.message.KafkaRawMessage
11 changes: 11 additions & 0 deletions docs/docs/en/api/faststream/kafka/subscriber/asyncapi/AsyncAPIConcurrentBetweenPartitionsSubscriber.md
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.kafka.subscriber.asyncapi.AsyncAPIConcurrentBetweenPartitionsSubscriber
11 changes: 11 additions & 0 deletions docs/docs/en/api/faststream/kafka/subscriber/usecase/ConcurrentBetweenPartitionsSubscriber.md
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.kafka.subscriber.usecase.ConcurrentBetweenPartitionsSubscriber
7 changes: 7 additions & 0 deletions docs/docs/en/kafka/Subscriber/index.md
@@ -72,3 +72,10 @@ async def base_handler(
):
...
```


## Concurrent processing

There are two possible modes of concurrent message processing:
- With `auto_commit=True` and `max_workers` > 1, a handler processes all messages concurrently with at-most-once semantics.
- With `auto_commit=False` and `max_workers` > 1, processing is concurrent between topic partitions and sequential within a partition, ensuring reliable at-least-once processing. Maximum concurrency is achieved when the total number of workers across all application instances in the same consumer group equals the number of partitions in the topic. Increasing the worker count beyond that leaves workers idle, since no more than one consumer in a consumer group can consume from a given partition.
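The two bullets above can be illustrated with a toy, self-contained model of the concurrent-between-partitions mode (this sketch uses plain `asyncio` and does not touch Kafka; all names in it are illustrative, not FastStream API):

```python
import asyncio
from collections import defaultdict


async def consume_between_partitions(records, handler):
    """Toy model of concurrent-between-partitions processing:
    records from the same partition are handled strictly in order,
    while distinct partitions are drained in parallel."""
    queues = defaultdict(asyncio.Queue)
    for partition, value in records:
        queues[partition].put_nowait(value)

    async def drain(queue):
        while not queue.empty():
            await handler(await queue.get())

    # One sequential worker per partition, all running concurrently.
    await asyncio.gather(*(drain(q) for q in queues.values()))


processed = []


async def handler(value):
    await asyncio.sleep(0)  # yield control so partitions can interleave
    processed.append(value)


records = [(0, "a0"), (1, "b0"), (0, "a1"), (1, "b1")]
asyncio.run(consume_between_partitions(records, handler))
```

Per-partition order (`a0` before `a1`, `b0` before `b1`) is always preserved, while the relative order across partitions is not guaranteed — exactly the trade-off the real subscriber makes.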
17 changes: 17 additions & 0 deletions faststream/broker/subscriber/usecase.py
@@ -87,6 +87,7 @@ class SubscriberUsecase(
extra_watcher_options: "AnyDict"
extra_context: "AnyDict"
graceful_timeout: Optional[float]
logger: Optional["LoggerProto"]

_broker_dependencies: Iterable["Depends"]
_call_options: Optional["_CallOptions"]
@@ -162,6 +163,7 @@ def setup( # type: ignore[override]
self._producer = producer
self.graceful_timeout = graceful_timeout
self.extra_context = extra_context
self.logger = logger

self.watcher = get_watcher_context(logger, self._no_ack, self._retry)

@@ -470,3 +472,18 @@ def get_payloads(self) -> List[Tuple["AnyDict", str]]:
)

return payloads

def _log(
self,
log_level: int,
message: str,
extra: Optional["AnyDict"] = None,
exc_info: Optional[Exception] = None,
) -> None:
if self.logger is not None:
self.logger.log(
log_level,
message,
extra=extra,
exc_info=exc_info,
)
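The `_log` helper added above silently no-ops when no logger was configured during setup. The same guard pattern in isolation (the `LoggingMixin` class and names here are illustrative, not part of the diff):

```python
import logging
from typing import Optional


class LoggingMixin:
    def __init__(self, logger: Optional[logging.Logger] = None) -> None:
        self.logger = logger

    def _log(
        self,
        log_level: int,
        message: str,
        extra: Optional[dict] = None,
        exc_info: Optional[Exception] = None,
    ) -> None:
        # Forward to the logger only when one is configured.
        if self.logger is not None:
            self.logger.log(log_level, message, extra=extra, exc_info=exc_info)
```

This lets subscriber code call `self._log(...)` unconditionally instead of sprinkling `if self.logger` checks at every call site.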
48 changes: 36 additions & 12 deletions faststream/kafka/broker/registrator.py
@@ -2,6 +2,7 @@
TYPE_CHECKING,
Any,
Callable,
Collection,
Dict,
Iterable,
Literal,
@@ -41,6 +42,7 @@
)
from faststream.kafka.subscriber.asyncapi import (
AsyncAPIBatchSubscriber,
AsyncAPIConcurrentBetweenPartitionsSubscriber,
AsyncAPIConcurrentDefaultSubscriber,
AsyncAPIDefaultSubscriber,
)
@@ -62,6 +64,7 @@ class KafkaRegistrator(
"AsyncAPIBatchSubscriber",
"AsyncAPIDefaultSubscriber",
"AsyncAPIConcurrentDefaultSubscriber",
"AsyncAPIConcurrentBetweenPartitionsSubscriber",
],
]
_publishers: Dict[
@@ -382,7 +385,7 @@ def subscriber(
),
] = None,
partitions: Annotated[
Iterable["TopicPartition"],
Collection["TopicPartition"],
Doc(
"""
An explicit partitions list to assign.
@@ -763,7 +766,7 @@ def subscriber(
),
] = None,
partitions: Annotated[
Iterable["TopicPartition"],
Collection["TopicPartition"],
Doc(
"""
An explicit partitions list to assign.
@@ -1144,7 +1147,7 @@ def subscriber(
),
] = None,
partitions: Annotated[
Iterable["TopicPartition"],
Collection["TopicPartition"],
Doc(
"""
An explicit partitions list to assign.
@@ -1528,7 +1531,7 @@ def subscriber(
),
] = None,
partitions: Annotated[
Iterable["TopicPartition"],
Collection["TopicPartition"],
Doc(
"""
An explicit partitions list to assign.
@@ -1555,7 +1558,14 @@
] = (),
max_workers: Annotated[
int,
Doc("Number of workers to process messages concurrently."),
Doc(
"Maximum number of messages being processed concurrently. With "
"`auto_commit=False`, processing is concurrent between partitions and "
"sequential within a partition; maximum concurrency is then achieved "
"when the total number of workers across all application instances "
"in the same consumer group equals the number of partitions in the topic."
),
] = 1,
filter: Annotated[
"Filter[KafkaMessage]",
@@ -1602,6 +1612,7 @@ def subscriber(
"AsyncAPIDefaultSubscriber",
"AsyncAPIBatchSubscriber",
"AsyncAPIConcurrentDefaultSubscriber",
"AsyncAPIConcurrentBetweenPartitionsSubscriber",
]:
subscriber = super().subscriber(
create_subscriber(
@@ -1660,13 +1671,26 @@

else:
if max_workers > 1:
return cast("AsyncAPIConcurrentDefaultSubscriber", subscriber).add_call(
filter_=filter,
parser_=parser or self._parser,
decoder_=decoder or self._decoder,
dependencies_=dependencies,
middlewares_=middlewares,
)
if not auto_commit:
return cast(
"AsyncAPIConcurrentBetweenPartitionsSubscriber", subscriber
).add_call(
filter_=filter,
parser_=parser or self._parser,
decoder_=decoder or self._decoder,
dependencies_=dependencies,
middlewares_=middlewares,
)
else:
return cast(
"AsyncAPIConcurrentDefaultSubscriber", subscriber
).add_call(
filter_=filter,
parser_=parser or self._parser,
decoder_=decoder or self._decoder,
dependencies_=dependencies,
middlewares_=middlewares,
)
else:
return cast("AsyncAPIDefaultSubscriber", subscriber).add_call(
filter_=filter,
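The branch added in this hunk selects the subscriber class from `max_workers` and `auto_commit`. That decision table can be sketched as a pure function (the class names are taken from the diff; the helper itself is illustrative, not part of the codebase):

```python
def pick_subscriber(batch: bool, max_workers: int, auto_commit: bool) -> str:
    """Mirror of the dispatch logic in KafkaRegistrator.subscriber."""
    if batch:
        return "AsyncAPIBatchSubscriber"
    if max_workers > 1:
        # auto_commit=False gets the new partition-aware subscriber,
        # preserving per-partition ordering for at-least-once delivery.
        if not auto_commit:
            return "AsyncAPIConcurrentBetweenPartitionsSubscriber"
        return "AsyncAPIConcurrentDefaultSubscriber"
    return "AsyncAPIDefaultSubscriber"
```

The key point of the change: before this commit, `max_workers > 1` always yielded `AsyncAPIConcurrentDefaultSubscriber`, regardless of `auto_commit`.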
18 changes: 16 additions & 2 deletions faststream/kafka/fastapi/fastapi.py
@@ -60,6 +60,7 @@
)
from faststream.kafka.subscriber.asyncapi import (
AsyncAPIBatchSubscriber,
AsyncAPIConcurrentBetweenPartitionsSubscriber,
AsyncAPIConcurrentDefaultSubscriber,
AsyncAPIDefaultSubscriber,
)
@@ -2621,12 +2622,20 @@ def subscriber(
] = False,
max_workers: Annotated[
int,
Doc("Number of workers to process messages concurrently."),
Doc(
"Maximum number of messages being processed concurrently. With "
"`auto_commit=False`, processing is concurrent between partitions and "
"sequential within a partition; maximum concurrency is then achieved "
"when the total number of workers across all application instances "
"in the same consumer group equals the number of partitions in the topic."
),
] = 1,
) -> Union[
"AsyncAPIBatchSubscriber",
"AsyncAPIDefaultSubscriber",
"AsyncAPIConcurrentDefaultSubscriber",
"AsyncAPIConcurrentBetweenPartitionsSubscriber",
]:
subscriber = super().subscriber(
*topics,
@@ -2683,7 +2692,12 @@ def subscriber(
return cast("AsyncAPIBatchSubscriber", subscriber)
else:
if max_workers > 1:
return cast("AsyncAPIConcurrentDefaultSubscriber", subscriber)
if not auto_commit:
return cast(
"AsyncAPIConcurrentBetweenPartitionsSubscriber", subscriber
)
else:
return cast("AsyncAPIConcurrentDefaultSubscriber", subscriber)
else:
return cast("AsyncAPIDefaultSubscriber", subscriber)

63 changes: 63 additions & 0 deletions faststream/kafka/listener.py
@@ -0,0 +1,63 @@
import asyncio
import logging
from typing import TYPE_CHECKING, Optional, Set

from aiokafka import ConsumerRebalanceListener

if TYPE_CHECKING:
from aiokafka import AIOKafkaConsumer, TopicPartition

from faststream.types import AnyDict, LoggerProto


class LoggingListenerProxy(ConsumerRebalanceListener): # type: ignore[misc]
"""Logs partition assignments and passes calls to user-supplied listener."""

def __init__(
self,
consumer: "AIOKafkaConsumer",
logger: Optional["LoggerProto"],
listener: Optional[ConsumerRebalanceListener],
):
self.consumer = consumer
self.logger = logger
self.listener = listener

async def on_partitions_revoked(self, revoked: Set["TopicPartition"]) -> None:
if self.listener:
call_result = self.listener.on_partitions_revoked(revoked)
if asyncio.iscoroutine(call_result):
await call_result

async def on_partitions_assigned(self, assigned: Set["TopicPartition"]) -> None:
self._log(
logging.INFO,
f"Consumer {self.consumer._coordinator.member_id} assigned to partitions: "
f"{assigned}",
)
if not assigned:
self._log(
logging.WARNING,
f"Consumer in group {self.consumer._group_id} has no partition assignments - topics "
f"{self.consumer._subscription.topics} may have fewer partitions than consumers",
)

if self.listener:
call_result = self.listener.on_partitions_assigned(assigned)
if asyncio.iscoroutine(call_result):
await call_result

def _log(
self,
log_level: int,
message: str,
extra: Optional["AnyDict"] = None,
exc_info: Optional[Exception] = None,
) -> None:
if self.logger is not None:
self.logger.log(
log_level,
message,
extra=extra,
exc_info=exc_info,
)
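`LoggingListenerProxy` must forward to a user-supplied listener whose callbacks may be either sync or async, hence the `asyncio.iscoroutine` check after each call. A minimal standalone sketch of that pattern (all names here are illustrative, not aiokafka's API):

```python
import asyncio


class ListenerProxy:
    """Forward a callback to a wrapped listener, awaiting only if needed."""

    def __init__(self, listener=None):
        self.listener = listener

    async def on_partitions_assigned(self, assigned):
        if self.listener:
            result = self.listener.on_partitions_assigned(assigned)
            if asyncio.iscoroutine(result):  # async listener: await it
                await result


calls = []


class SyncListener:
    def on_partitions_assigned(self, assigned):
        calls.append(("sync", assigned))


class AsyncListener:
    async def on_partitions_assigned(self, assigned):
        calls.append(("async", assigned))


async def main():
    await ListenerProxy(SyncListener()).on_partitions_assigned({0})
    await ListenerProxy(AsyncListener()).on_partitions_assigned({1})


asyncio.run(main())
```

Calling an async method without awaiting returns a coroutine object, so `iscoroutine` cleanly distinguishes the two cases at runtime without inspecting the listener's type up front.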
12 changes: 8 additions & 4 deletions faststream/kafka/message.py
@@ -1,12 +1,11 @@
from typing import TYPE_CHECKING, Any, Protocol, Tuple, Union
from dataclasses import dataclass
from typing import Any, Protocol, Tuple, Union

from aiokafka import AIOKafkaConsumer, ConsumerRecord
from aiokafka import TopicPartition as AIOKafkaTopicPartition

from faststream.broker.message import StreamMessage

if TYPE_CHECKING:
from aiokafka import ConsumerRecord


class ConsumerProtocol(Protocol):
"""A protocol for Kafka consumers."""
@@ -38,6 +37,11 @@ def seek(
FAKE_CONSUMER = FakeConsumer()


@dataclass
class KafkaRawMessage(ConsumerRecord): # type: ignore[misc]
consumer: AIOKafkaConsumer


class KafkaMessage(
StreamMessage[
Union[
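The new `KafkaRawMessage` extends aiokafka's `ConsumerRecord` dataclass with a `consumer` back-reference. The subclassing pattern in isolation (the `Record` stand-in below is illustrative, not aiokafka's actual class):

```python
from dataclasses import dataclass


@dataclass
class Record:  # stand-in for aiokafka's ConsumerRecord
    topic: str
    value: bytes


@dataclass
class RawMessage(Record):  # adds a consumer back-reference, like KafkaRawMessage
    consumer: object


msg = RawMessage(topic="t", value=b"payload", consumer="fake-consumer")
```

Dataclass inheritance appends the subclass's fields after the parent's, so the raw record keeps its original fields while gaining direct access to the consumer that fetched it.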