Skip to content

Commit

Permalink
fix (#1748): add Kafka publish no_confirm option (#1749)
Browse files Browse the repository at this point in the history
* fix (#1748): add Kafka publish no_wait option

* feat: add no_confirm option to Confluent

* chore: fix CI

* chore: fix CI

* docs: gen API

* refactor: mv confluent thread to asyncio task

* fix: add no_confirm option to Confluent publish batch method

* chore: fix precommit

---------

Co-authored-by: Kumaran Rajendhiran <kumaran@airt.ai>
  • Loading branch information
Lancetnik and kumaranvpl authored Sep 6, 2024
1 parent f1a537b commit 54223e1
Show file tree
Hide file tree
Showing 16 changed files with 117 additions and 60 deletions.
32 changes: 0 additions & 32 deletions .github/workflows/pr_tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ jobs:
env:
COVERAGE_FILE: coverage/.coverage.${{ runner.os }}-py${{ matrix.python-version }}-${{ matrix.pydantic-version }}
CONTEXT: ${{ runner.os }}-py${{ matrix.python-version }}-${{ matrix.pydantic-version }}
- run: ls coverage
- name: Store coverage files
uses: actions/upload-artifact@v4
with:
Expand All @@ -90,36 +89,6 @@ jobs:
if-no-files-found: error
include-hidden-files: true

test-orjson:
if: github.event.pull_request.draft == false
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: "3.12"
- name: Install Dependencies
if: steps.cache.outputs.cache-hit != 'true'
run: |
python -m pip install uv
uv pip install --system .[optionals,testing] orjson
- run: mkdir coverage
- name: Test
run: >
bash scripts/test.sh
-m "(slow and (${{env.ALL_PYTEST_MARKERS}})) or (${{env.ALL_PYTEST_MARKERS}})"
env:
COVERAGE_FILE: coverage/.coverage.orjson
CONTEXT: orjson
- name: Store coverage files
uses: actions/upload-artifact@v4
with:
name: .coverage.orjson
path: coverage
if-no-files-found: error
include-hidden-files: true

test-macos-latest:
if: github.event.pull_request.draft == false
runs-on: macos-latest
Expand Down Expand Up @@ -519,7 +488,6 @@ jobs:
- coverage-combine
- test-macos-latest
- test-windows-latest
- test-orjson

runs-on: ubuntu-latest

Expand Down
2 changes: 1 addition & 1 deletion .secrets.baseline
Original file line number Diff line number Diff line change
Expand Up @@ -178,5 +178,5 @@
}
]
},
"generated_at": "2024-09-05T19:20:25Z"
"generated_at": "2024-09-06T15:06:47Z"
}
2 changes: 0 additions & 2 deletions docs/mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,6 @@ plugins:
import:
- https://docs.python.org/3/objects.inv
options:
# extensions:
# - griffe_typingdoc
preload_modules:
- httpx
- starlette
Expand Down
4 changes: 4 additions & 0 deletions faststream/confluent/broker/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,7 @@ async def publish( # type: ignore[override]
correlation_id: Optional[str] = None,
*,
reply_to: str = "",
no_confirm: bool = False,
# extra options to be compatible with test client
**kwargs: Any,
) -> Optional[Any]:
Expand All @@ -497,6 +498,7 @@ async def publish( # type: ignore[override]
headers=headers,
correlation_id=correlation_id,
reply_to=reply_to,
no_confirm=no_confirm,
**kwargs,
)

Expand Down Expand Up @@ -535,6 +537,7 @@ async def publish_batch(
headers: Optional[Dict[str, str]] = None,
reply_to: str = "",
correlation_id: Optional[str] = None,
no_confirm: bool = False,
) -> None:
assert self._producer, NOT_CONNECTED_YET # nosec B101

Expand All @@ -552,6 +555,7 @@ async def publish_batch(
headers=headers,
reply_to=reply_to,
correlation_id=correlation_id,
no_confirm=no_confirm,
)

@override
Expand Down
53 changes: 49 additions & 4 deletions faststream/confluent/client.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import asyncio
import logging
from contextlib import suppress
from time import time
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
Iterable,
List,
Expand All @@ -24,8 +27,18 @@
from faststream.utils.functions import call_or_await

if TYPE_CHECKING:
from typing_extensions import NotRequired, TypedDict

from faststream.types import AnyDict, LoggerProto

class _SendKwargs(TypedDict):
    """Typed keyword arguments forwarded to ``Producer.produce``.

    ``partition``, ``timestamp`` and ``on_delivery`` are ``NotRequired``
    because they are only passed when explicitly provided by the caller.
    """

    # Message payload; may be omitted as ``None`` for tombstone-style records.
    value: Optional[Union[str, bytes]]
    # Partitioning key; ``None`` lets the partitioner decide.
    key: Optional[Union[str, bytes]]
    # Kafka record headers as (name, value) pairs.
    headers: Optional[List[Tuple[str, Union[str, bytes]]]]
    # Explicit target partition — only set when the caller requests one.
    partition: NotRequired[int]
    # Message timestamp in milliseconds — only set when provided.
    timestamp: NotRequired[int]
    # Delivery-report callback — only set when confirmation is awaited.
    on_delivery: NotRequired[Callable[..., None]]


class AsyncConfluentProducer:
"""An asynchronous Python Kafka client using the "confluent-kafka" package."""
Expand Down Expand Up @@ -101,9 +114,21 @@ def __init__(

self.producer = Producer(final_config, logger=self.logger)

self.__running = True
self._poll_task = asyncio.create_task(self._poll_loop())

async def _poll_loop(self) -> None:
    """Background task that polls the confluent producer while running.

    NOTE(review): per confluent-kafka docs, ``poll`` is what dispatches
    queued delivery callbacks — this loop keeps them flowing; confirm
    against the library version in use.
    """
    # Loop until ``stop`` flips the running flag.
    while self.__running:
        # Swallow all errors so a transient poll failure cannot kill
        # the background task; the next iteration retries.
        with suppress(Exception):
            # 0.1s timeout keeps the task responsive to cancellation.
            await call_or_await(self.producer.poll, 0.1)

async def stop(self) -> None:
"""Stop the Kafka producer and flush remaining messages."""
await call_or_await(self.producer.flush)
if self.__running:
self.__running = False
if not self._poll_task.done():
self._poll_task.cancel()
await call_or_await(self.producer.flush)

async def send(
self,
Expand All @@ -113,9 +138,10 @@ async def send(
partition: Optional[int] = None,
timestamp_ms: Optional[int] = None,
headers: Optional[List[Tuple[str, Union[str, bytes]]]] = None,
no_confirm: bool = False,
) -> None:
"""Sends a single message to a Kafka topic."""
kwargs: AnyDict = {
kwargs: _SendKwargs = {
"value": value,
"key": key,
"headers": headers,
Expand All @@ -127,16 +153,34 @@ async def send(
if timestamp_ms is not None:
kwargs["timestamp"] = timestamp_ms

if not no_confirm:
result_future: asyncio.Future[Optional[Message]] = asyncio.Future()

def ack_callback(err: Any, msg: Optional[Message]) -> None:
    """Delivery report: resolve ``result_future`` with the outcome.

    Called by the confluent producer once the broker acks (or rejects)
    the message; ``err`` is a producer-level error, while ``msg.error()``
    carries a per-message error.
    """
    # The walrus reassigns ``err`` to the message-level error when the
    # producer-level one is falsy, so either failure path is reported.
    if err or (msg is not None and (err := msg.error())):
        result_future.set_exception(KafkaException(err))
    else:
        result_future.set_result(msg)

kwargs["on_delivery"] = ack_callback

# should be sync to prevent segfault
self.producer.produce(topic, **kwargs)
self.producer.poll(0)

if not no_confirm:
await result_future

def create_batch(self) -> "BatchBuilder":
"""Creates a batch for sending multiple messages."""
return BatchBuilder()

async def send_batch(
self, batch: "BatchBuilder", topic: str, *, partition: Optional[int]
self,
batch: "BatchBuilder",
topic: str,
*,
partition: Optional[int],
no_confirm: bool = False,
) -> None:
"""Sends a batch of messages to a Kafka topic."""
async with anyio.create_task_group() as tg:
Expand All @@ -149,6 +193,7 @@ async def send_batch(
partition,
msg["timestamp_ms"],
msg["headers"],
no_confirm,
)

async def ping(
Expand Down
10 changes: 9 additions & 1 deletion faststream/confluent/publisher/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ async def publish( # type: ignore[override]
headers: Optional[Dict[str, str]] = None,
correlation_id: str = "",
reply_to: str = "",
no_confirm: bool = False,
) -> None:
"""Publish a message to a topic."""
message, content_type = encode_message(message)
Expand All @@ -65,6 +66,7 @@ async def publish( # type: ignore[override]
partition=partition,
timestamp_ms=timestamp_ms,
headers=[(i, (j or "").encode()) for i, j in headers_to_send.items()],
no_confirm=no_confirm,
)

async def stop(self) -> None:
Expand All @@ -79,6 +81,7 @@ async def publish_batch(
headers: Optional[Dict[str, str]] = None,
reply_to: str = "",
correlation_id: str = "",
no_confirm: bool = False,
) -> None:
"""Publish a batch of messages to a topic."""
batch = self._producer.create_batch()
Expand Down Expand Up @@ -109,7 +112,12 @@ async def publish_batch(
headers=[(i, j.encode()) for i, j in final_headers.items()],
)

await self._producer.send_batch(batch, topic, partition=partition)
await self._producer.send_batch(
batch,
topic,
partition=partition,
no_confirm=no_confirm,
)

@override
async def request(self, *args: Any, **kwargs: Any) -> Optional[Any]:
Expand Down
6 changes: 5 additions & 1 deletion faststream/confluent/publisher/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ async def publish(
headers: Optional[Dict[str, str]] = None,
correlation_id: Optional[str] = None,
reply_to: str = "",
no_confirm: bool = False,
# publisher specific
_extra_middlewares: Iterable["PublisherMiddleware"] = (),
) -> Optional[Any]:
Expand All @@ -184,6 +185,7 @@ async def publish(
kwargs: AnyDict = {
"key": key or self.key,
# basic args
"no_confirm": no_confirm,
"topic": topic or self.topic,
"partition": partition or self.partition,
"timestamp_ms": timestamp_ms,
Expand Down Expand Up @@ -243,8 +245,9 @@ async def publish(
partition: Optional[int] = None,
timestamp_ms: Optional[int] = None,
headers: Optional[Dict[str, str]] = None,
reply_to: str = "",
correlation_id: Optional[str] = None,
reply_to: str = "",
no_confirm: bool = False,
# publisher specific
_extra_middlewares: Iterable["PublisherMiddleware"] = (),
) -> None:
Expand All @@ -258,6 +261,7 @@ async def publish(

kwargs: AnyDict = {
"topic": topic or self.topic,
"no_confirm": no_confirm,
"partition": partition or self.partition,
"timestamp_ms": timestamp_ms,
"headers": headers or self.headers,
Expand Down
17 changes: 9 additions & 8 deletions faststream/confluent/schemas/partition.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
from typing import Optional
from typing import TYPE_CHECKING, Optional

from confluent_kafka import TopicPartition as ConfluentPartition
from typing_extensions import NotRequired, TypedDict

if TYPE_CHECKING:
from typing_extensions import NotRequired, TypedDict

class _TopicKwargs(TypedDict):
topic: str
partition: int
offset: int
metadata: NotRequired[str]
leader_epoch: NotRequired[int]
class _TopicKwargs(TypedDict):
    """Typed keyword arguments for building a confluent ``TopicPartition``.

    ``metadata`` and ``leader_epoch`` are ``NotRequired`` so they are
    only forwarded when explicitly supplied.
    """

    # Topic name the partition belongs to.
    topic: str
    # Zero-based partition index within the topic.
    partition: int
    # Consumer offset within the partition.
    offset: int
    # Optional partition metadata — only set when provided.
    metadata: NotRequired[str]
    # Optional leader epoch — only set when provided.
    leader_epoch: NotRequired[int]


class TopicPartition:
Expand Down
2 changes: 2 additions & 0 deletions faststream/confluent/testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ async def publish( # type: ignore[override]
headers: Optional[Dict[str, str]] = None,
correlation_id: Optional[str] = None,
*,
no_confirm: bool = False,
reply_to: str = "",
rpc: bool = False,
rpc_timeout: Optional[float] = None,
Expand Down Expand Up @@ -147,6 +148,7 @@ async def publish_batch(
headers: Optional[Dict[str, str]] = None,
reply_to: str = "",
correlation_id: Optional[str] = None,
no_confirm: bool = False,
) -> None:
"""Publish a batch of messages to the Kafka broker."""
for handler in self.broker._subscribers.values(): # pragma: no branch
Expand Down
10 changes: 10 additions & 0 deletions faststream/kafka/broker/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -719,6 +719,10 @@ async def publish( # type: ignore[override]
str,
Doc("Reply message topic name to send response."),
] = "",
no_confirm: Annotated[
bool,
Doc("Do not wait for Kafka publish confirmation."),
] = False,
# extra options to be compatible with test client
**kwargs: Any,
) -> Optional[Any]:
Expand All @@ -741,6 +745,7 @@ async def publish( # type: ignore[override]
headers=headers,
correlation_id=correlation_id,
reply_to=reply_to,
no_confirm=no_confirm,
**kwargs,
)

Expand Down Expand Up @@ -861,6 +866,10 @@ async def publish_batch(
"**correlation_id** is a useful option to trace messages."
),
] = None,
no_confirm: Annotated[
bool,
Doc("Do not wait for Kafka publish confirmation."),
] = False,
) -> None:
assert self._producer, NOT_CONNECTED_YET # nosec B101

Expand All @@ -879,6 +888,7 @@ async def publish_batch(
headers=headers,
reply_to=reply_to,
correlation_id=correlation_id,
no_confirm=no_confirm,
)

@override
Expand Down
Loading

0 comments on commit 54223e1

Please sign in to comment.