
Commit

fix: add ConfluentRouter FastAPI missed init options (#1664)
Lancetnik authored Aug 11, 2024
1 parent 593734f commit ffaf6a0
Showing 2 changed files with 196 additions and 4 deletions.
4 changes: 2 additions & 2 deletions faststream/confluent/broker/broker.py
@@ -26,7 +26,6 @@
AsyncConfluentConsumer,
AsyncConfluentProducer,
)
from faststream.confluent.config import ConfluentConfig
from faststream.confluent.publisher.producer import AsyncConfluentFastProducer
from faststream.confluent.schemas.params import ConsumerConnectionParams
from faststream.confluent.security import parse_security
@@ -45,6 +44,7 @@
BrokerMiddleware,
CustomCallable,
)
from faststream.confluent.config import ConfluentConfig
from faststream.security import BaseSecurity
from faststream.types import (
AnyDict,
@@ -131,7 +131,7 @@ def __init__(
),
] = True,
config: Annotated[
Optional[ConfluentConfig],
Optional["ConfluentConfig"],
Doc(
"""
Extra configuration for the confluent-kafka-python
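The broker.py hunks above move the `ConfluentConfig` import under the `if TYPE_CHECKING:` block, which is why the annotation becomes the quoted forward reference `Optional["ConfluentConfig"]`. A minimal, self-contained sketch of that deferral pattern (the `heavy_config` module and `HeavyConfig` name are hypothetical stand-ins, not part of FastStream):

from typing import TYPE_CHECKING, Optional

if TYPE_CHECKING:
    # Evaluated only by type checkers; skipped at runtime, which avoids
    # importing a heavy (or circularly dependent) module eagerly.
    from heavy_config import HeavyConfig  # hypothetical module


def make_client(config: Optional["HeavyConfig"] = None) -> None:
    # The quoted annotation is a forward reference, so HeavyConfig does
    # not need to be importable when this function is defined.
    ...

The same trick is used in fastapi.py below, where `ConfluentConfig` is likewise imported only for type checking.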
196 changes: 194 additions & 2 deletions faststream/confluent/fastapi/fastapi.py
@@ -11,6 +11,7 @@
Sequence,
Tuple,
Type,
TypeVar,
Union,
cast,
overload,
@@ -45,6 +46,7 @@
PublisherMiddleware,
SubscriberMiddleware,
)
from faststream.confluent.config import ConfluentConfig
from faststream.confluent.message import KafkaMessage
from faststream.confluent.publisher.asyncapi import (
AsyncAPIBatchPublisher,
@@ -59,6 +61,9 @@
from faststream.types import AnyDict, LoggerProto


Partition = TypeVar("Partition")


class KafkaRouter(StreamRouter[Union[Message, Tuple[Message, ...]]]):
"""A class to represent a Kafka router."""

@@ -67,12 +72,185 @@ class KafkaRouter(StreamRouter[Union[Message, Tuple[Message, ...]]]):

def __init__(
self,
bootstrap_servers: Union[str, Iterable[str]] = "localhost",
bootstrap_servers: Annotated[
Union[str, Iterable[str]],
Doc(
"""
A `host[:port]` string (or list of `host[:port]` strings) that the consumer should contact to bootstrap
initial cluster metadata.
This does not have to be the full node list.
It just needs to have at least one broker that will respond to a
Metadata API Request. Default port is 9092.
"""
),
] = "localhost",
*,
# both
request_timeout_ms: Annotated[
int,
Doc("Client request timeout in milliseconds."),
] = 40 * 1000,
retry_backoff_ms: Annotated[
int,
Doc(" Milliseconds to backoff when retrying on errors."),
] = 100,
metadata_max_age_ms: Annotated[
int,
Doc(
"""
The period of time in milliseconds after
which we force a refresh of metadata even if we haven't seen any
partition leadership changes to proactively discover any new
brokers or partitions.
"""
),
] = 5 * 60 * 1000,
connections_max_idle_ms: Annotated[
int,
Doc(
"""
Close idle connections after the number
of milliseconds specified by this config. Specifying `None` will
disable idle checks.
"""
),
] = 9 * 60 * 1000,
client_id: Annotated[
Optional[str],
Doc("Application name to mark connections by."),
Doc(
"""
A name for this client. This string is passed in
each request to servers and can be used to identify specific
server-side log entries that correspond to this client. Also
submitted to :class:`~.consumer.group_coordinator.GroupCoordinator`
for logging with respect to consumer group administration.
"""
),
] = SERVICE_NAME,
allow_auto_create_topics: Annotated[
bool,
Doc(
"""
Allow automatic topic creation on the broker when subscribing to or assigning non-existent topics.
"""
),
] = True,
config: Annotated[
Optional["ConfluentConfig"],
Doc(
"""
Extra configuration for the confluent-kafka-python
producer/consumer. See `confluent_kafka.Config <https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#kafka-client-configuration>`_.
"""
),
] = None,
# publisher args
acks: Annotated[
Literal[0, 1, -1, "all"],
Doc(
"""
One of ``0``, ``1``, ``all``. The number of acknowledgments
the producer requires the leader to have received before considering a
request complete. This controls the durability of records that are
sent. The following settings are common:
* ``0``: Producer will not wait for any acknowledgment from the server
at all. The message will immediately be added to the socket
buffer and considered sent. No guarantee can be made that the
server has received the record in this case, and the retries
configuration will not take effect (as the client won't
generally know of any failures). The offset given back for each
record will always be set to -1.
* ``1``: The broker leader will write the record to its local log but
will respond without awaiting full acknowledgement from all
followers. In this case should the leader fail immediately
after acknowledging the record but before the followers have
replicated it then the record will be lost.
* ``all``: The broker leader will wait for the full set of in-sync
replicas to acknowledge the record. This guarantees that the
record will not be lost as long as at least one in-sync replica
remains alive. This is the strongest available guarantee.
If unset, defaults to ``acks=1``. If `enable_idempotence` is
:data:`True` defaults to ``acks=all``.
"""
),
] = EMPTY,
compression_type: Annotated[
Optional[Literal["gzip", "snappy", "lz4", "zstd"]],
Doc(
"""
The compression type for all data generated by the producer.
Compression is of full batches of data, so the efficacy of batching
will also impact the compression ratio (more batching means better
compression).
"""
),
] = None,
partitioner: Annotated[
Union[
str,
Callable[
[bytes, List[Partition], List[Partition]],
Partition,
],
],
Doc(
"""
Callable used to determine which partition
each message is assigned to. Called (after key serialization):
``partitioner(key_bytes, all_partitions, available_partitions)``.
The default partitioner implementation hashes each non-None key
using the same murmur2 algorithm as the Java client so that
messages with the same key are assigned to the same partition.
When a key is :data:`None`, the message is delivered to a random partition
(filtered to partitions with available leaders only, if possible).
"""
),
] = "consistent_random",
max_request_size: Annotated[
int,
Doc(
"""
The maximum size of a request. This is also
effectively a cap on the maximum record size. Note that the server
has its own cap on record size which may be different from this.
This setting will limit the number of record batches the producer
will send in a single request to avoid sending huge requests.
"""
),
] = 1024 * 1024,
linger_ms: Annotated[
int,
Doc(
"""
The producer groups together any records that arrive
in between request transmissions into a single batched request.
Normally this occurs only under load when records arrive faster
than they can be sent out. However in some circumstances the client
may want to reduce the number of requests even under moderate load.
This setting accomplishes this by adding a small amount of
artificial delay; that is, if the first request is processed faster
than `linger_ms`, the producer will wait ``linger_ms - process_time``.
"""
),
] = 0,
enable_idempotence: Annotated[
bool,
Doc(
"""
When set to `True`, the producer will
ensure that exactly one copy of each message is written in the
stream. If `False`, producer retries due to broker failures,
etc., may write duplicates of the retried message in the stream.
Note that enabling idempotence requires ``acks`` to be set to ``all``.
If it is not explicitly set by the user, it will be chosen automatically.
"""
),
] = False,
transactional_id: Optional[str] = None,
transaction_timeout_ms: int = 60 * 1000,
# broker base args
graceful_timeout: Annotated[
Optional[float],
@@ -368,6 +546,20 @@ def __init__(
super().__init__(
bootstrap_servers=bootstrap_servers,
client_id=client_id,
request_timeout_ms=request_timeout_ms,
retry_backoff_ms=retry_backoff_ms,
metadata_max_age_ms=metadata_max_age_ms,
connections_max_idle_ms=connections_max_idle_ms,
allow_auto_create_topics=allow_auto_create_topics,
acks=acks,
config=config,
compression_type=compression_type,
partitioner=partitioner,
max_request_size=max_request_size,
linger_ms=linger_ms,
enable_idempotence=enable_idempotence,
transactional_id=transactional_id,
transaction_timeout_ms=transaction_timeout_ms,
# broker args
graceful_timeout=graceful_timeout,
decoder=decoder,
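Taken together, the fastapi.py changes make the producer and consumer options documented above actually reach the underlying broker, since they are now forwarded through `super().__init__()`. A rough usage sketch in the usual FastStream FastAPI plugin style (broker address, topic name, and handler are illustrative assumptions):

from fastapi import FastAPI
from faststream.confluent.fastapi import KafkaRouter

# Options such as acks, linger_ms, compression_type, and enable_idempotence
# were previously missing from this router's __init__; this commit adds them
# and forwards them to the underlying broker.
router = KafkaRouter(
    "localhost:9092",
    acks="all",
    enable_idempotence=True,
    linger_ms=5,
    compression_type="gzip",
)


@router.subscriber("in-topic")
async def handle(msg: str) -> None:
    print(msg)


app = FastAPI()
app.include_router(router)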
