Skip to content

Commit

Permalink
feat: add BatchBufferOverflowException (#1990)
Browse files Browse the repository at this point in the history
* feat: add BatchBufferOverflowException

* docs: generate API References

* fix: use pre-commit

* Update producer.py

* chore: fix CI

* chore: fix CI

---------

Co-authored-by: spataphore1337 <spataphore1337@users.noreply.github.com>
Co-authored-by: Pastukhov Nikita <nikita@pastukhov-dev.ru>
Co-authored-by: Nikita Pastukhov <diementros@yandex.ru>
  • Loading branch information
4 people authored Dec 16, 2024
1 parent 001aedd commit 684ef1a
Show file tree
Hide file tree
Showing 7 changed files with 57 additions and 7 deletions.
2 changes: 2 additions & 0 deletions docs/docs/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -618,6 +618,8 @@ search:
- [KafkaLoggingBroker](api/faststream/kafka/broker/logging/KafkaLoggingBroker.md)
- registrator
- [KafkaRegistrator](api/faststream/kafka/broker/registrator/KafkaRegistrator.md)
- exceptions
- [BatchBufferOverflowException](api/faststream/kafka/exceptions/BatchBufferOverflowException.md)
- fastapi
- [Context](api/faststream/kafka/fastapi/Context.md)
- [KafkaRouter](api/faststream/kafka/fastapi/KafkaRouter.md)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.kafka.exceptions.BatchBufferOverflowException
2 changes: 1 addition & 1 deletion docs/docs/en/public_api
14 changes: 14 additions & 0 deletions faststream/kafka/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from faststream.exceptions import FastStreamException


class BatchBufferOverflowException(FastStreamException):
"""Exception raised when a buffer overflow occurs when adding a new message to the batches."""

def __init__(self, message_position: int) -> None:
self.message_position = message_position

def __str__(self) -> str:
return (
f"The batch buffer is full. The position of the message"
f" in the transferred collection at which the overflow occurred: {self.message_position}"
)
7 changes: 5 additions & 2 deletions faststream/kafka/publisher/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from faststream.broker.publisher.proto import ProducerProto
from faststream.broker.utils import resolve_custom_func
from faststream.exceptions import OperationForbiddenError
from faststream.kafka.exceptions import BatchBufferOverflowException
from faststream.kafka.message import KafkaMessage
from faststream.kafka.parser import AioKafkaParser

Expand Down Expand Up @@ -100,7 +101,7 @@ async def publish_batch(
reply_to,
)

for msg in msgs:
for message_position, msg in enumerate(msgs):
message, content_type = encode_message(msg)

if content_type:
Expand All @@ -111,12 +112,14 @@ async def publish_batch(
else:
final_headers = headers_to_send.copy()

batch.append(
metadata = batch.append(
key=None,
value=message,
timestamp=timestamp_ms,
headers=[(i, j.encode()) for i, j in final_headers.items()],
)
if metadata is None:
raise BatchBufferOverflowException(message_position=message_position)

send_future = await self._producer.send_batch(batch, topic, partition=partition)
if not no_confirm:
Expand Down
22 changes: 20 additions & 2 deletions tests/brokers/kafka/test_publish.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
import asyncio
from typing import Any
from unittest.mock import Mock

import pytest

from faststream import Context
from faststream.kafka import KafkaBroker, KafkaResponse
from faststream.kafka.exceptions import BatchBufferOverflowException
from tests.brokers.base.publish import BrokerPublishTestcase


@pytest.mark.kafka
class TestPublish(BrokerPublishTestcase):
def get_broker(self, apply_types: bool = False):
return KafkaBroker(apply_types=apply_types)
def get_broker(self, apply_types: bool = False, **kwargs: Any) -> KafkaBroker:
return KafkaBroker(apply_types=apply_types, **kwargs)

@pytest.mark.asyncio
async def test_publish_batch(self, queue: str):
Expand Down Expand Up @@ -133,3 +135,19 @@ async def handle_next(msg=Context("message")):
body=b"1",
key=b"1",
)

@pytest.mark.asyncio
async def test_raise_buffer_overflow_exception(
self, queue: str, mock: Mock
) -> None:
pub_broker = self.get_broker(max_batch_size=16)

@pub_broker.subscriber(queue)
async def handler(m) -> None:
pass

async with self.patch_broker(pub_broker) as br:
await br.start()
with pytest.raises(BatchBufferOverflowException) as e:
await br.publish_batch(1, "Hello, world!", topic=queue, no_confirm=True)
assert e.value.message_position == 1
6 changes: 4 additions & 2 deletions tests/opentelemetry/kafka/test_kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,17 +261,19 @@ async def handler(m, baggage: CurrentBaggage):

@pytest.mark.kafka
class TestPublishWithTelemetry(TestPublish):
def get_broker(self, apply_types: bool = False):
def get_broker(self, apply_types: bool = False, **kwargs):
return KafkaBroker(
middlewares=(KafkaTelemetryMiddleware(),),
apply_types=apply_types,
**kwargs,
)


@pytest.mark.kafka
class TestConsumeWithTelemetry(TestConsume):
def get_broker(self, apply_types: bool = False):
def get_broker(self, apply_types: bool = False, **kwargs):
return KafkaBroker(
middlewares=(KafkaTelemetryMiddleware(),),
apply_types=apply_types,
**kwargs,
)

0 comments on commit 684ef1a

Please sign in to comment.