Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix (#1748): add Kafka publish no_confirm option #1749

Merged
merged 13 commits
Sep 6, 2024
4 changes: 4 additions & 0 deletions faststream/confluent/broker/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,7 @@ async def publish( # type: ignore[override]
correlation_id: Optional[str] = None,
*,
reply_to: str = "",
no_confirm: bool = False,
# extra options to be compatible with test client
**kwargs: Any,
) -> Optional[Any]:
Expand All @@ -496,6 +497,7 @@ async def publish( # type: ignore[override]
headers=headers,
correlation_id=correlation_id,
reply_to=reply_to,
no_confirm=no_confirm,
**kwargs,
)

Expand Down Expand Up @@ -534,6 +536,7 @@ async def publish_batch(
headers: Optional[Dict[str, str]] = None,
reply_to: str = "",
correlation_id: Optional[str] = None,
no_confirm: bool = False,
) -> None:
assert self._producer, NOT_CONNECTED_YET # nosec B101

Expand All @@ -551,6 +554,7 @@ async def publish_batch(
headers=headers,
reply_to=reply_to,
correlation_id=correlation_id,
no_confirm=no_confirm,
)

@override
Expand Down
42 changes: 38 additions & 4 deletions faststream/confluent/client.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
import asyncio
import logging
from contextlib import suppress
from threading import Thread
from time import time
from typing import (
TYPE_CHECKING,
Expand All @@ -24,8 +27,17 @@
from faststream.utils.functions import call_or_await

if TYPE_CHECKING:
from typing_extensions import NotRequired, TypedDict

from faststream.types import AnyDict, LoggerProto

# Keyword arguments forwarded to confluent_kafka.Producer.produce().
# Declared under TYPE_CHECKING, so this class exists only for static
# type checkers and is not available at runtime.
class _SendKwargs(TypedDict):
    value: Optional[Union[str, bytes]]
    key: Optional[Union[str, bytes]]
    headers: Optional[List[Tuple[str, Union[str, bytes]]]]
    # NotRequired: only present when the caller explicitly supplies them,
    # so produce() falls back to its own defaults otherwise.
    partition: NotRequired[int]
    timestamp: NotRequired[int]


class AsyncConfluentProducer:
"""An asynchronous Python Kafka client using the "confluent-kafka" package."""
Expand Down Expand Up @@ -102,9 +114,21 @@ def __init__(

self.producer = Producer(self.config, logger=self.logger)

self.__running = True
self._poll_thread = Thread(target=self._poll_loop)
Copy link
Member Author

@Lancetnik Lancetnik Sep 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

self._poll_thread.start()

def _poll_loop(self) -> None:
    """Background-thread loop that services producer delivery callbacks.

    Runs until ``stop()`` clears ``self.__running``.  Each ``poll(0.1)``
    call drives confluent-kafka's event queue so that ``on_delivery``
    callbacks (used by ``send`` to resolve its confirmation future) fire.
    """
    while self.__running:
        # Exceptions are deliberately swallowed so a transient poll error
        # cannot kill the thread before shutdown; poll is retried on the
        # next iteration.
        with suppress(Exception):
            self.producer.poll(0.1)

async def stop(self) -> None:
    # NOTE(review): this span is a diff hunk rendered without +/- markers.
    # The first, unguarded flush line is the PRE-change code; the
    # ``if self.__running:`` form below is its replacement.  Both appear
    # here verbatim because the scrape preserved removed and added lines.
    """Stop the Kafka producer and flush remaining messages."""
    await call_or_await(self.producer.flush)
    if self.__running:
        # Guard makes stop() idempotent: flush/join run only on the first
        # call; clearing __running lets _poll_loop's thread exit.
        await call_or_await(self.producer.flush)
        self.__running = False
        self._poll_thread.join()

async def send(
self,
Expand All @@ -114,9 +138,10 @@ async def send(
partition: Optional[int] = None,
timestamp_ms: Optional[int] = None,
headers: Optional[List[Tuple[str, Union[str, bytes]]]] = None,
no_confirm: bool = False,
) -> None:
"""Sends a single message to a Kafka topic."""
kwargs: AnyDict = {
kwargs: _SendKwargs = {
"value": value,
"key": key,
"headers": headers,
Expand All @@ -128,9 +153,18 @@ async def send(
if timestamp_ms is not None:
kwargs["timestamp"] = timestamp_ms

result_future: asyncio.Future[Optional[Message]] = asyncio.Future()

def ack_callback(err: Any, msg: Optional[Message]) -> None:
if err or (msg is not None and (err := msg.error())):
result_future.set_exception(KafkaException(err))
else:
result_future.set_result(msg)

# should be sync to prevent segfault
self.producer.produce(topic, **kwargs)
self.producer.poll(0)
self.producer.produce(topic, **kwargs, on_delivery=ack_callback)
if not no_confirm:
await result_future

def create_batch(self) -> "BatchBuilder":
"""Creates a batch for sending multiple messages."""
Expand Down
6 changes: 5 additions & 1 deletion faststream/confluent/publisher/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ async def publish(
headers: Optional[Dict[str, str]] = None,
correlation_id: Optional[str] = None,
reply_to: str = "",
no_confirm: bool = False,
# publisher specific
_extra_middlewares: Iterable["PublisherMiddleware"] = (),
) -> Optional[Any]:
Expand All @@ -184,6 +185,7 @@ async def publish(
kwargs: AnyDict = {
"key": key or self.key,
# basic args
"no_confirm": no_confirm,
"topic": topic or self.topic,
"partition": partition or self.partition,
"timestamp_ms": timestamp_ms,
Expand Down Expand Up @@ -243,8 +245,9 @@ async def publish(
partition: Optional[int] = None,
timestamp_ms: Optional[int] = None,
headers: Optional[Dict[str, str]] = None,
reply_to: str = "",
correlation_id: Optional[str] = None,
reply_to: str = "",
no_confirm: bool = False,
# publisher specific
_extra_middlewares: Iterable["PublisherMiddleware"] = (),
) -> None:
Expand All @@ -258,6 +261,7 @@ async def publish(

kwargs: AnyDict = {
"topic": topic or self.topic,
"no_confirm": no_confirm,
"partition": partition or self.partition,
"timestamp_ms": timestamp_ms,
"headers": headers or self.headers,
Expand Down
17 changes: 9 additions & 8 deletions faststream/confluent/schemas/partition.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
from typing import Optional
from typing import TYPE_CHECKING, Optional

from confluent_kafka import TopicPartition as ConfluentPartition
from typing_extensions import NotRequired, TypedDict

if TYPE_CHECKING:
from typing_extensions import NotRequired, TypedDict

class _TopicKwargs(TypedDict):
topic: str
partition: int
offset: int
metadata: NotRequired[str]
leader_epoch: NotRequired[int]
# Keyword arguments accepted by confluent_kafka.TopicPartition.
# This is the post-change copy, moved under a TYPE_CHECKING guard so the
# typing_extensions import is needed only for static analysis.
class _TopicKwargs(TypedDict):
    topic: str
    partition: int
    offset: int
    # NotRequired: forwarded only when the caller supplies a value.
    metadata: NotRequired[str]
    leader_epoch: NotRequired[int]


class TopicPartition:
Expand Down
2 changes: 2 additions & 0 deletions faststream/confluent/testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ async def publish( # type: ignore[override]
headers: Optional[Dict[str, str]] = None,
correlation_id: Optional[str] = None,
*,
no_confirm: bool = False,
reply_to: str = "",
rpc: bool = False,
rpc_timeout: Optional[float] = None,
Expand Down Expand Up @@ -147,6 +148,7 @@ async def publish_batch(
headers: Optional[Dict[str, str]] = None,
reply_to: str = "",
correlation_id: Optional[str] = None,
no_confirm: bool = False,
) -> None:
"""Publish a batch of messages to the Kafka broker."""
for handler in self.broker._subscribers.values(): # pragma: no branch
Expand Down
10 changes: 10 additions & 0 deletions faststream/kafka/broker/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -719,6 +719,10 @@ async def publish( # type: ignore[override]
str,
Doc("Reply message topic name to send response."),
] = "",
no_confirm: Annotated[
bool,
Doc("Do not wait for Kafka publish confirmation."),
] = False,
# extra options to be compatible with test client
**kwargs: Any,
) -> Optional[Any]:
Expand All @@ -741,6 +745,7 @@ async def publish( # type: ignore[override]
headers=headers,
correlation_id=correlation_id,
reply_to=reply_to,
no_confirm=no_confirm,
**kwargs,
)

Expand Down Expand Up @@ -861,6 +866,10 @@ async def publish_batch(
"**correlation_id** is a useful option to trace messages."
),
] = None,
no_confirm: Annotated[
bool,
Doc("Do not wait for Kafka publish confirmation."),
] = False,
) -> None:
assert self._producer, NOT_CONNECTED_YET # nosec B101

Expand All @@ -879,6 +888,7 @@ async def publish_batch(
headers=headers,
reply_to=reply_to,
correlation_id=correlation_id,
no_confirm=no_confirm,
)

@override
Expand Down
10 changes: 8 additions & 2 deletions faststream/kafka/publisher/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ async def publish( # type: ignore[override]
timestamp_ms: Optional[int] = None,
headers: Optional[Dict[str, str]] = None,
reply_to: str = "",
no_confirm: bool = False,
) -> None:
"""Publish a message to a topic."""
message, content_type = encode_message(message)
Expand All @@ -63,14 +64,16 @@ async def publish( # type: ignore[override]
reply_to,
)

await self._producer.send(
send_future = await self._producer.send(
topic=topic,
value=message,
key=key,
partition=partition,
timestamp_ms=timestamp_ms,
headers=[(i, (j or "").encode()) for i, j in headers_to_send.items()],
)
if not no_confirm:
await send_future

async def stop(self) -> None:
await self._producer.stop()
Expand All @@ -84,6 +87,7 @@ async def publish_batch(
timestamp_ms: Optional[int] = None,
headers: Optional[Dict[str, str]] = None,
reply_to: str = "",
no_confirm: bool = False,
) -> None:
"""Publish a batch of messages to a topic."""
batch = self._producer.create_batch()
Expand Down Expand Up @@ -114,7 +118,9 @@ async def publish_batch(
headers=[(i, j.encode()) for i, j in final_headers.items()],
)

await self._producer.send_batch(batch, topic, partition=partition)
send_future = await self._producer.send_batch(batch, topic, partition=partition)
if not no_confirm:
await send_future

@override
async def request(self, *args: Any, **kwargs: Any) -> Optional[Any]:
Expand Down
10 changes: 10 additions & 0 deletions faststream/kafka/publisher/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,10 @@ async def publish(
str,
Doc("Reply message topic name to send response."),
] = "",
no_confirm: Annotated[
bool,
Doc("Do not wait for Kafka publish confirmation."),
] = False,
# publisher specific
_extra_middlewares: Annotated[
Iterable["PublisherMiddleware"],
Expand Down Expand Up @@ -311,6 +315,7 @@ async def publish(
reply_to=reply_to,
correlation_id=correlation_id,
timestamp_ms=timestamp_ms,
no_confirm=no_confirm,
)

@override
Expand Down Expand Up @@ -440,6 +445,10 @@ async def publish(
"**correlation_id** is a useful option to trace messages."
),
] = None,
no_confirm: Annotated[
bool,
Doc("Do not wait for Kafka publish confirmation."),
] = False,
# publisher specific
_extra_middlewares: Annotated[
Iterable["PublisherMiddleware"],
Expand Down Expand Up @@ -479,4 +488,5 @@ async def publish(
reply_to=reply_to,
correlation_id=correlation_id,
timestamp_ms=timestamp_ms,
no_confirm=no_confirm,
)
2 changes: 2 additions & 0 deletions faststream/kafka/testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ async def publish( # type: ignore[override]
rpc: bool = False,
rpc_timeout: Optional[float] = None,
raise_timeout: bool = False,
no_confirm: bool = False,
) -> Optional[Any]:
"""Publish a message to the Kafka broker."""
incoming = build_message(
Expand Down Expand Up @@ -184,6 +185,7 @@ async def publish_batch(
headers: Optional[Dict[str, str]] = None,
reply_to: str = "",
correlation_id: Optional[str] = None,
no_confirm: bool = False,
) -> None:
"""Publish a batch of messages to the Kafka broker."""
for handler in self.broker._subscribers.values(): # pragma: no branch
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ minversion = "7.0"
addopts = "-q -m 'not slow'"
testpaths = ["tests"]
markers = ["rabbit", "kafka", "confluent", "nats", "redis", "slow", "all"]
asyncio_default_fixture_loop_scope = "function"

[tool.coverage.run]
parallel = true
Expand Down
10 changes: 5 additions & 5 deletions tests/brokers/base/middlewares.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ async def test_subscriber_middleware(
raw_broker,
):
async def mid(call_next, msg):
mock.start(msg._decoded_body)
mock.start(await msg.decode())
result = await call_next(msg)
mock.end()
event.set()
Expand Down Expand Up @@ -468,7 +468,7 @@ async def subscriber1(m):

@broker.subscriber(*args, **kwargs)
async def subscriber2(msg=Context("message")):
mock(msg.decoded_body)
mock(await msg.decode())
event.set()

broker = self.patch_broker(raw_broker, broker)
Expand Down Expand Up @@ -509,7 +509,7 @@ async def subscriber1(m):

@broker.subscriber(*args2, **kwargs2)
async def subscriber2(msg=Context("message")):
mock(msg.decoded_body)
mock(await msg.decode())

broker = self.patch_broker(raw_broker, broker)

Expand Down Expand Up @@ -581,7 +581,7 @@ async def subscriber1(m):

@broker.subscriber(*args2, **kwargs2)
async def subscriber2(msg=Context("message")):
mock(msg.decoded_body)
mock(await msg.decode())

broker = self.patch_broker(raw_broker, broker)

Expand Down Expand Up @@ -632,7 +632,7 @@ async def subscriber2(m):

@broker.subscriber(*args3, **kwargs3)
async def subscriber3(msg=Context("message")):
mock(msg.decoded_body)
mock(await msg.decode())
if mock.call_count > 1:
event.set()

Expand Down
Loading
Loading