From ffaf6a0577f6e2ad92fae051547d58503937784e Mon Sep 17 00:00:00 2001 From: Pastukhov Nikita Date: Sun, 11 Aug 2024 19:43:28 +0300 Subject: [PATCH] fix: add ConfuelntRouter FastAPI missed init options (#1664) --- faststream/confluent/broker/broker.py | 4 +- faststream/confluent/fastapi/fastapi.py | 196 +++++++++++++++++++++++- 2 files changed, 196 insertions(+), 4 deletions(-) diff --git a/faststream/confluent/broker/broker.py b/faststream/confluent/broker/broker.py index 0239981133..dc45b12f85 100644 --- a/faststream/confluent/broker/broker.py +++ b/faststream/confluent/broker/broker.py @@ -26,7 +26,6 @@ AsyncConfluentConsumer, AsyncConfluentProducer, ) -from faststream.confluent.config import ConfluentConfig from faststream.confluent.publisher.producer import AsyncConfluentFastProducer from faststream.confluent.schemas.params import ConsumerConnectionParams from faststream.confluent.security import parse_security @@ -45,6 +44,7 @@ BrokerMiddleware, CustomCallable, ) + from faststream.confluent.config import ConfluentConfig from faststream.security import BaseSecurity from faststream.types import ( AnyDict, @@ -131,7 +131,7 @@ def __init__( ), ] = True, config: Annotated[ - Optional[ConfluentConfig], + Optional["ConfluentConfig"], Doc( """ Extra configuration for the confluent-kafka-python diff --git a/faststream/confluent/fastapi/fastapi.py b/faststream/confluent/fastapi/fastapi.py index 8017680250..01138b84f9 100644 --- a/faststream/confluent/fastapi/fastapi.py +++ b/faststream/confluent/fastapi/fastapi.py @@ -11,6 +11,7 @@ Sequence, Tuple, Type, + TypeVar, Union, cast, overload, @@ -45,6 +46,7 @@ PublisherMiddleware, SubscriberMiddleware, ) + from faststream.confluent.config import ConfluentConfig from faststream.confluent.message import KafkaMessage from faststream.confluent.publisher.asyncapi import ( AsyncAPIBatchPublisher, @@ -59,6 +61,9 @@ from faststream.types import AnyDict, LoggerProto +Partition = TypeVar("Partition") + + class KafkaRouter(StreamRouter[Union[Message, Tuple[Message, ...]]]): """A class to represent a Kafka router.""" @@ -67,12 +72,185 @@ class KafkaRouter(StreamRouter[Union[Message, Tuple[Message, ...]]]): def __init__( self, - bootstrap_servers: Union[str, Iterable[str]] = "localhost", + bootstrap_servers: Annotated[ + Union[str, Iterable[str]], + Doc( + """ + A `host[:port]` string (or list of `host[:port]` strings) that the consumer should contact to bootstrap + initial cluster metadata. + + This does not have to be the full node list. + It just needs to have at least one broker that will respond to a + Metadata API Request. Default port is 9092. + """ + ), + ] = "localhost", *, + # both + request_timeout_ms: Annotated[ + int, + Doc("Client request timeout in milliseconds."), + ] = 40 * 1000, + retry_backoff_ms: Annotated[ + int, + Doc(" Milliseconds to backoff when retrying on errors."), + ] = 100, + metadata_max_age_ms: Annotated[ + int, + Doc( + """ + The period of time in milliseconds after + which we force a refresh of metadata even if we haven't seen any + partition leadership changes to proactively discover any new + brokers or partitions. + """ + ), + ] = 5 * 60 * 1000, + connections_max_idle_ms: Annotated[ + int, + Doc( + """ + Close idle connections after the number + of milliseconds specified by this config. Specifying `None` will + disable idle checks. + """ + ), + ] = 9 * 60 * 1000, client_id: Annotated[ Optional[str], - Doc("Application name to mark connections by."), + Doc( + """ + A name for this client. This string is passed in + each request to servers and can be used to identify specific + server-side log entries that correspond to this client. Also + submitted to :class:`~.consumer.group_coordinator.GroupCoordinator` + for logging with respect to consumer group administration. + """ + ), ] = SERVICE_NAME, + allow_auto_create_topics: Annotated[ + bool, + Doc( + """ + Allow automatic topic creation on the broker when subscribing to or assigning non-existent topics. + """ + ), + ] = True, + config: Annotated[ + Optional["ConfluentConfig"], + Doc( + """ + Extra configuration for the confluent-kafka-python + producer/consumer. See `confluent_kafka.Config `_. + """ + ), + ] = None, + # publisher args + acks: Annotated[ + Literal[0, 1, -1, "all"], + Doc( + """ + One of ``0``, ``1``, ``all``. The number of acknowledgments + the producer requires the leader to have received before considering a + request complete. This controls the durability of records that are + sent. The following settings are common: + + * ``0``: Producer will not wait for any acknowledgment from the server + at all. The message will immediately be added to the socket + buffer and considered sent. No guarantee can be made that the + server has received the record in this case, and the retries + configuration will not take effect (as the client won't + generally know of any failures). The offset given back for each + record will always be set to -1. + * ``1``: The broker leader will write the record to its local log but + will respond without awaiting full acknowledgement from all + followers. In this case should the leader fail immediately + after acknowledging the record but before the followers have + replicated it then the record will be lost. + * ``all``: The broker leader will wait for the full set of in-sync + replicas to acknowledge the record. This guarantees that the + record will not be lost as long as at least one in-sync replica + remains alive. This is the strongest available guarantee. + + If unset, defaults to ``acks=1``. If `enable_idempotence` is + :data:`True` defaults to ``acks=all``. + """ + ), + ] = EMPTY, + compression_type: Annotated[ + Optional[Literal["gzip", "snappy", "lz4", "zstd"]], + Doc( + """ + The compression type for all data generated bythe producer. + Compression is of full batches of data, so the efficacy of batching + will also impact the compression ratio (more batching means better + compression). + """ + ), + ] = None, + partitioner: Annotated[ + Union[ + str, + Callable[ + [bytes, List[Partition], List[Partition]], + Partition, + ], + ], + Doc( + """ + Callable used to determine which partition + each message is assigned to. Called (after key serialization): + ``partitioner(key_bytes, all_partitions, available_partitions)``. + The default partitioner implementation hashes each non-None key + using the same murmur2 algorithm as the Java client so that + messages with the same key are assigned to the same partition. + When a key is :data:`None`, the message is delivered to a random partition + (filtered to partitions with available leaders only, if possible). + """ + ), + ] = "consistent_random", + max_request_size: Annotated[ + int, + Doc( + """ + The maximum size of a request. This is also + effectively a cap on the maximum record size. Note that the server + has its own cap on record size which may be different from this. + This setting will limit the number of record batches the producer + will send in a single request to avoid sending huge requests. + """ + ), + ] = 1024 * 1024, + linger_ms: Annotated[ + int, + Doc( + """ + The producer groups together any records that arrive + in between request transmissions into a single batched request. + Normally this occurs only under load when records arrive faster + than they can be sent out. However in some circumstances the client + may want to reduce the number of requests even under moderate load. + This setting accomplishes this by adding a small amount of + artificial delay; that is, if first request is processed faster, + than `linger_ms`, producer will wait ``linger_ms - process_time``. + """ + ), + ] = 0, + enable_idempotence: Annotated[ + bool, + Doc( + """ + When set to `True`, the producer will + ensure that exactly one copy of each message is written in the + stream. If `False`, producer retries due to broker failures, + etc., may write duplicates of the retried message in the stream. + Note that enabling idempotence acks to set to ``all``. If it is not + explicitly set by the user it will be chosen. + """ + ), + ] = False, + transactional_id: Optional[str] = None, + transaction_timeout_ms: int = 60 * 1000, # broker base args graceful_timeout: Annotated[ Optional[float], @@ -368,6 +546,20 @@ def __init__( super().__init__( bootstrap_servers=bootstrap_servers, client_id=client_id, + request_timeout_ms=request_timeout_ms, + retry_backoff_ms=retry_backoff_ms, + metadata_max_age_ms=metadata_max_age_ms, + connections_max_idle_ms=connections_max_idle_ms, + allow_auto_create_topics=allow_auto_create_topics, + acks=acks, + config=config, + compression_type=compression_type, + partitioner=partitioner, + max_request_size=max_request_size, + linger_ms=linger_ms, + enable_idempotence=enable_idempotence, + transactional_id=transactional_id, + transaction_timeout_ms=transaction_timeout_ms, # broker args graceful_timeout=graceful_timeout, decoder=decoder,