From 18a97f3b68bf63dbbf52623570e93218fb4b2043 Mon Sep 17 00:00:00 2001 From: Jack Leow Date: Fri, 15 Nov 2024 17:38:00 -0800 Subject: [PATCH] blackfml. --- aiokafka/__init__.pyi | 18 ++- aiokafka/abc.pyi | 10 +- aiokafka/client.pyi | 1 + aiokafka/cluster.pyi | 96 ++++++++----- aiokafka/consumer/consumer.pyi | 180 +++++++++++++++++------- aiokafka/errors.pyi | 243 +++++++++++++-------------------- aiokafka/structs.pyi | 24 ++-- 7 files changed, 319 insertions(+), 253 deletions(-) diff --git a/aiokafka/__init__.pyi b/aiokafka/__init__.pyi index 38189711..296f5708 100644 --- a/aiokafka/__init__.pyi +++ b/aiokafka/__init__.pyi @@ -1,7 +1,21 @@ from .abc import ConsumerRebalanceListener from .consumer import AIOKafkaConsumer from .errors import ConsumerStoppedError, IllegalOperation -from .structs import ConsumerRecord, OffsetAndMetadata, OffsetAndTimestamp, TopicPartition +from .structs import ( + ConsumerRecord, + OffsetAndMetadata, + OffsetAndTimestamp, + TopicPartition, +) __version__ = ... -__all__ = ["AIOKafkaConsumer", "ConsumerRebalanceListener", "ConsumerStoppedError", "IllegalOperation", "ConsumerRecord", "TopicPartition", "OffsetAndTimestamp", "OffsetAndMetadata"] +__all__ = [ + "AIOKafkaConsumer", + "ConsumerRebalanceListener", + "ConsumerStoppedError", + "IllegalOperation", + "ConsumerRecord", + "TopicPartition", + "OffsetAndTimestamp", + "OffsetAndMetadata", +] diff --git a/aiokafka/abc.pyi b/aiokafka/abc.pyi index 8a258eda..6616a91a 100644 --- a/aiokafka/abc.pyi +++ b/aiokafka/abc.pyi @@ -43,6 +43,7 @@ class ConsumerRebalanceListener(abc.ABC): taking over that partition has their :meth:`on_partitions_assigned` callback called to load the state. """ + @abc.abstractmethod def on_partitions_revoked(self, revoked: list[TopicPartition]) -> None: """ @@ -63,7 +64,7 @@ class ConsumerRebalanceListener(abc.ABC): to the consumer on the last rebalance """ ... - + @abc.abstractmethod def on_partitions_assigned(self, assigned: list[TopicPartition]) -> None: """ @@ -83,8 +84,6 @@ class ConsumerRebalanceListener(abc.ABC): consumer (may include partitions that were previously assigned) """ ... - - class AbstractTokenProvider(abc.ABC): """ @@ -103,6 +102,7 @@ class AbstractTokenProvider(abc.ABC): .. _SASL OAuthBearer: https://docs.confluent.io/platform/current/kafka/authentication_sasl/authentication_sasl_oauth.html """ + @abc.abstractmethod async def token(self) -> None: """ @@ -123,7 +123,7 @@ class AbstractTokenProvider(abc.ABC): # The actual synchronous token callback. """ ... - + def extensions(self) -> dict[str, str]: """ This is an OPTIONAL method that may be implemented. @@ -135,7 +135,5 @@ class AbstractTokenProvider(abc.ABC): This feature is only available in Kafka >= 2.1.0. """ ... - - __all__ = ["ConsumerRebalanceListener", "AbstractTokenProvider"] diff --git a/aiokafka/client.pyi b/aiokafka/client.pyi index ad5d7c70..3fbd3254 100644 --- a/aiokafka/client.pyi +++ b/aiokafka/client.pyi @@ -1,6 +1,7 @@ from enum import IntEnum log = ... + class CoordinationType(IntEnum): GROUP = ... TRANSACTION = ... 
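[Editor's example] The abc.pyi stubs above require AbstractTokenProvider.token() to be non-blocking, and the docstring recommends wrapping a synchronous token callback with asyncio.get_running_loop().run_in_executor. Below is a minimal sketch of a concrete provider under those constraints; the _fetch_token helper and its OAuth endpoint are hypothetical, and the return annotation follows the docstring ("Returns a (str) ID/Access Token") rather than the stub's `-> None`.

import asyncio

from aiokafka.abc import AbstractTokenProvider


class OAuthTokenProvider(AbstractTokenProvider):
    """Sketch: refreshes SASL OAuthBearer tokens off the event loop."""

    async def token(self) -> str:
        # Per the docstring above: run the blocking callback in the
        # default executor so token() itself never blocks the loop.
        return await asyncio.get_running_loop().run_in_executor(
            None, self._fetch_token
        )

    def _fetch_token(self) -> str:
        # Hypothetical synchronous call to an OAuth token endpoint.
        raise NotImplementedError

    def extensions(self) -> dict[str, str]:
        # Optional SASL extensions (Kafka >= 2.1.0); none by default.
        return {}
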
diff --git a/aiokafka/cluster.pyi b/aiokafka/cluster.pyi index c93c8fc3..2c67e97f 100644 --- a/aiokafka/cluster.pyi +++ b/aiokafka/cluster.pyi @@ -1,20 +1,39 @@ from concurrent.futures import Future from typing import Any, Callable, Optional, Sequence, Set, TypedDict, Union from aiokafka.client import CoordinationType -from aiokafka.protocol.commit import GroupCoordinatorResponse_v0, GroupCoordinatorResponse_v1 -from aiokafka.protocol.metadata import MetadataResponse_v0, MetadataResponse_v1, MetadataResponse_v2, MetadataResponse_v3, MetadataResponse_v4, MetadataResponse_v5 +from aiokafka.protocol.commit import ( + GroupCoordinatorResponse_v0, + GroupCoordinatorResponse_v1, +) +from aiokafka.protocol.metadata import ( + MetadataResponse_v0, + MetadataResponse_v1, + MetadataResponse_v2, + MetadataResponse_v3, + MetadataResponse_v4, + MetadataResponse_v5, +) from aiokafka.structs import BrokerMetadata, PartitionMetadata, TopicPartition log = ... -MetadataResponse = Union[MetadataResponse_v0, MetadataResponse_v1, MetadataResponse_v2, MetadataResponse_v3, MetadataResponse_v4, MetadataResponse_v5,] -GroupCoordinatorResponse = Union[GroupCoordinatorResponse_v0, GroupCoordinatorResponse_v1] +MetadataResponse = Union[ + MetadataResponse_v0, + MetadataResponse_v1, + MetadataResponse_v2, + MetadataResponse_v3, + MetadataResponse_v4, + MetadataResponse_v5, +] +GroupCoordinatorResponse = Union[ + GroupCoordinatorResponse_v0, GroupCoordinatorResponse_v1 +] + class ClusterConfig(TypedDict): retry_backoff_ms: int metadata_max_age_ms: int bootstrap_servers: str | list[str] ... - class ClusterMetadata: """ A class to manage kafka cluster metadata. @@ -36,13 +55,10 @@ class ClusterMetadata: Metadata API Request. Default port is 9092. If no servers are specified, will default to localhost:9092. """ + DEFAULT_CONFIG: ClusterConfig = ... - def __init__(self, **configs: int | str | list[str]) -> None: - ... - - def is_bootstrap(self, node_id: str) -> bool: - ... - + def __init__(self, **configs: int | str | list[str]) -> None: ... + def is_bootstrap(self, node_id: str) -> bool: ... def brokers(self) -> set[BrokerMetadata]: """Get all BrokerMetadata @@ -50,7 +66,7 @@ class ClusterMetadata: set: {BrokerMetadata, ...} """ ... - + def broker_metadata(self, broker_id: str) -> BrokerMetadata | None: """Get BrokerMetadata @@ -61,7 +77,7 @@ class ClusterMetadata: BrokerMetadata or None if not found """ ... - + def partitions_for_topic(self, topic: str) -> Optional[Set[int]]: """Return set of all partitions for topic (whether available or not) @@ -72,7 +88,7 @@ class ClusterMetadata: set: {partition (int), ...} """ ... - + def available_partitions_for_topic(self, topic: str) -> Optional[Set[int]]: """Return set of partitions with known leaders @@ -84,11 +100,11 @@ class ClusterMetadata: None if topic not found. """ ... - + def leader_for_partition(self, partition: PartitionMetadata) -> int | None: """Return node_id of leader, -1 unavailable, None if unknown.""" ... - + def partitions_for_broker(self, broker_id: int | str) -> set[TopicPartition] | None: """Return TopicPartitions for which the broker is a leader. @@ -100,7 +116,7 @@ class ClusterMetadata: None if the broker either has no partitions or does not exist. """ ... - + def coordinator_for_group(self, group: str) -> int | str | None: """Return node_id of group coordinator. @@ -112,7 +128,7 @@ class ClusterMetadata: None if the group does not exist. """ ... 
- + def request_update(self) -> Future[ClusterMetadata]: """Flags metadata for update, return Future() @@ -123,7 +139,7 @@ class ClusterMetadata: Future (value will be the cluster object after update) """ ... - + def topics(self, exclude_internal_topics: bool = ...) -> set[str]: """Get set of known topics. @@ -137,11 +153,11 @@ class ClusterMetadata: set: {topic (str), ...} """ ... - + def failed_update(self, exception: BaseException) -> None: """Update cluster state given a failed MetadataRequest.""" ... - + def update_metadata(self, metadata: MetadataResponse) -> None: """Update cluster state given a MetadataResponse. @@ -151,16 +167,18 @@ class ClusterMetadata: Returns: None """ ... - + def add_listener(self, listener: Callable[[ClusterMetadata], Any]) -> None: """Add a callback function to be called on each metadata update""" ... - + def remove_listener(self, listener: Callable[[ClusterMetadata], Any]) -> None: """Remove a previously added listener callback""" ... - - def add_group_coordinator(self, group: str, response: GroupCoordinatorResponse) -> str | None: + + def add_group_coordinator( + self, group: str, response: GroupCoordinatorResponse + ) -> str | None: """Update with metadata for a group coordinator Arguments: @@ -171,23 +189,27 @@ class ClusterMetadata: string: coordinator node_id if metadata is updated, None on error """ ... - - def with_partitions(self, partitions_to_add: Sequence[PartitionMetadata]) -> ClusterMetadata: + + def with_partitions( + self, partitions_to_add: Sequence[PartitionMetadata] + ) -> ClusterMetadata: """Returns a copy of cluster metadata with partitions added""" ... - - def coordinator_metadata(self, node_id: int | str) -> BrokerMetadata | None: - ... - - def add_coordinator(self, node_id: int | str, host: str, port: int, rack: str | None = ..., *, purpose: tuple[CoordinationType, str]) -> None: + + def coordinator_metadata(self, node_id: int | str) -> BrokerMetadata | None: ... + def add_coordinator( + self, + node_id: int | str, + host: str, + port: int, + rack: str | None = ..., + *, + purpose: tuple[CoordinationType, str], + ) -> None: """Keep track of all coordinator nodes separately and remove them if a new one was elected for the same purpose (For example group coordinator for group X). """ ... - - def __str__(self) -> str: - ... - - + def __str__(self) -> str: ... diff --git a/aiokafka/consumer/consumer.pyi b/aiokafka/consumer/consumer.pyi index dc83d6fd..fb32daaf 100644 --- a/aiokafka/consumer/consumer.pyi +++ b/aiokafka/consumer/consumer.pyi @@ -4,12 +4,18 @@ from types import ModuleType, TracebackType from typing import Callable, Dict, Generic, List, Literal, TypeVar from aiokafka.abc import AbstractTokenProvider, ConsumerRebalanceListener from aiokafka.coordinator.assignors.abstract import AbstractPartitionAssignor -from aiokafka.structs import ConsumerRecord, OffsetAndMetadata, OffsetAndTimestamp, TopicPartition +from aiokafka.structs import ( + ConsumerRecord, + OffsetAndMetadata, + OffsetAndTimestamp, + TopicPartition, +) log = ... KT = TypeVar("KT", covariant=True) VT = TypeVar("VT", covariant=True) ET = TypeVar("ET", bound=BaseException) + class AIOKafkaConsumer(Generic[KT, VT]): """ A client that consumes records from a Kafka cluster. @@ -78,7 +84,7 @@ class AIOKafkaConsumer(Generic[KT, VT]): send messages larger than the consumer can fetch. If that happens, the consumer can get stuck trying to fetch a large message on a certain partition. Default: 1048576. 
- max_poll_records (int or None): The maximum number of records + max_poll_records (int or None): The maximum number of records returned in a single call to :meth:`.getmany`. Defaults ``None``, no limit. request_timeout_ms (int): Client request timeout in milliseconds. @@ -207,14 +213,66 @@ class AIOKafkaConsumer(Generic[KT, VT]): https://kafka.apache.org/documentation.html#newconsumerconfigs """ + _closed = ... _source_traceback = ... - def __init__(self, *topics: str, loop: asyncio.AbstractEventLoop | None = ..., bootstrap_servers: str | list[str] = ..., client_id: str = ..., group_id: str | None = ..., group_instance_id: str | None = ..., key_deserializer: Callable[[bytes], KT] = lambda x: x, value_deserializer: Callable[[bytes], VT] = lambda x: x, fetch_max_wait_ms: int = ..., fetch_max_bytes: int = ..., fetch_min_bytes: int = ..., max_partition_fetch_bytes: int = ..., request_timeout_ms: int = ..., retry_backoff_ms: int = ..., auto_offset_reset: Literal["earliest"] | Literal["latest"] | Literal["none"] = ..., enable_auto_commit: bool = ..., auto_commit_interval_ms: int = ..., check_crcs: bool = ..., metadata_max_age_ms: int = ..., partition_assignment_strategy: tuple[type[AbstractPartitionAssignor], ...] = ..., max_poll_interval_ms: int = ..., rebalance_timeout_ms: int | None = ..., session_timeout_ms: int = ..., heartbeat_interval_ms: int = ..., consumer_timeout_ms: int = ..., max_poll_records: int | None = ..., ssl_context: SSLContext | None = ..., security_protocol: Literal["PLAINTEXT"] | Literal["SSL"] | Literal["SASL_PLAINTEXT"] | Literal["SASL_SSL"] = ..., api_version: str = ..., exclude_internal_topics: bool = ..., connections_max_idle_ms: int = ..., isolation_level: Literal["read_committed"] | Literal["read_uncommitted"] = ..., sasl_mechanism: Literal["PLAIN"] | Literal["GSSAPI"] | Literal["SCRAM-SHA-256"] | Literal["SCRAM-SHA-512"] | Literal["OAUTHBEARER"] = ..., sasl_plain_password: str | None = ..., sasl_plain_username: str | None = ..., sasl_kerberos_service_name: str = ..., sasl_kerberos_domain_name: str | None = ..., sasl_oauth_token_provider: AbstractTokenProvider | None = ...) -> None: - ... - - def __del__(self, _warnings: ModuleType = ...) -> None: - ... - + def __init__( + self, + *topics: str, + loop: asyncio.AbstractEventLoop | None = ..., + bootstrap_servers: str | list[str] = ..., + client_id: str = ..., + group_id: str | None = ..., + group_instance_id: str | None = ..., + key_deserializer: Callable[[bytes], KT] = lambda x: x, + value_deserializer: Callable[[bytes], VT] = lambda x: x, + fetch_max_wait_ms: int = ..., + fetch_max_bytes: int = ..., + fetch_min_bytes: int = ..., + max_partition_fetch_bytes: int = ..., + request_timeout_ms: int = ..., + retry_backoff_ms: int = ..., + auto_offset_reset: ( + Literal["earliest"] | Literal["latest"] | Literal["none"] + ) = ..., + enable_auto_commit: bool = ..., + auto_commit_interval_ms: int = ..., + check_crcs: bool = ..., + metadata_max_age_ms: int = ..., + partition_assignment_strategy: tuple[ + type[AbstractPartitionAssignor], ... 
+ ] = ..., + max_poll_interval_ms: int = ..., + rebalance_timeout_ms: int | None = ..., + session_timeout_ms: int = ..., + heartbeat_interval_ms: int = ..., + consumer_timeout_ms: int = ..., + max_poll_records: int | None = ..., + ssl_context: SSLContext | None = ..., + security_protocol: ( + Literal["PLAINTEXT"] + | Literal["SSL"] + | Literal["SASL_PLAINTEXT"] + | Literal["SASL_SSL"] + ) = ..., + api_version: str = ..., + exclude_internal_topics: bool = ..., + connections_max_idle_ms: int = ..., + isolation_level: Literal["read_committed"] | Literal["read_uncommitted"] = ..., + sasl_mechanism: ( + Literal["PLAIN"] + | Literal["GSSAPI"] + | Literal["SCRAM-SHA-256"] + | Literal["SCRAM-SHA-512"] + | Literal["OAUTHBEARER"] + ) = ..., + sasl_plain_password: str | None = ..., + sasl_plain_username: str | None = ..., + sasl_kerberos_service_name: str = ..., + sasl_kerberos_domain_name: str | None = ..., + sasl_oauth_token_provider: AbstractTokenProvider | None = ..., + ) -> None: ... + def __del__(self, _warnings: ModuleType = ...) -> None: ... async def start(self) -> None: """Connect to Kafka cluster. This will: @@ -223,7 +281,7 @@ class AIOKafkaConsumer(Generic[KT, VT]): * Join group if ``group_id`` provided """ ... - + def assign(self, partitions: list[TopicPartition]) -> None: """Manually assign a list of :class:`.TopicPartition` to this consumer. @@ -247,7 +305,7 @@ class AIOKafkaConsumer(Generic[KT, VT]): cluster and topic metadata change. """ ... - + def assignment(self) -> set[TopicPartition]: """Get the set of partitions currently assigned to this consumer. @@ -264,7 +322,7 @@ class AIOKafkaConsumer(Generic[KT, VT]): this consumer """ ... - + async def stop(self) -> None: """Close the consumer, while waiting for finalizers: @@ -272,8 +330,13 @@ class AIOKafkaConsumer(Generic[KT, VT]): * Leave group if used Consumer Groups """ ... - - async def commit(self, offsets: dict[TopicPartition, int | tuple[int, str] | OffsetAndMetadata] | None = ...) -> None: + + async def commit( + self, + offsets: ( + dict[TopicPartition, int | tuple[int, str] | OffsetAndMetadata] | None + ) = ..., + ) -> None: """Commit offsets to Kafka. This commits offsets only to Kafka. The offsets committed using this @@ -328,7 +391,7 @@ class AIOKafkaConsumer(Generic[KT, VT]): .. _kafka-python: https://github.com/dpkp/kafka-python """ ... - + async def committed(self, partition: TopicPartition) -> int | None: """Get the last committed offset for the given partition. (whether the commit happened by this process or another). @@ -350,7 +413,7 @@ class AIOKafkaConsumer(Generic[KT, VT]): IllegalOperation: If used with ``group_id == None`` """ ... - + async def topics(self) -> set[str]: """Get all topics the user is authorized to view. @@ -358,7 +421,7 @@ class AIOKafkaConsumer(Generic[KT, VT]): set: topics """ ... - + def partitions_for_topic(self, topic: str) -> set[int] | None: """Get metadata about the partitions for a given topic. @@ -372,7 +435,7 @@ class AIOKafkaConsumer(Generic[KT, VT]): set: partition ids """ ... - + async def position(self, partition: TopicPartition) -> int: """Get the offset of the *next record* that will be fetched (if a record with that offset exists on broker). @@ -393,7 +456,7 @@ class AIOKafkaConsumer(Generic[KT, VT]): partition """ ... - + def highwater(self, partition: TopicPartition) -> int | None: """Last known highwater offset for a partition. @@ -413,7 +476,7 @@ class AIOKafkaConsumer(Generic[KT, VT]): int or None: offset if available """ ... 
- + def last_stable_offset(self, partition: TopicPartition) -> int | None: """Returns the Last Stable Offset of a topic. It will be the last offset up to which point all transactions were completed. Only @@ -431,7 +494,7 @@ class AIOKafkaConsumer(Generic[KT, VT]): int or None: offset if available """ ... - + def last_poll_timestamp(self, partition: TopicPartition) -> int | None: """Returns the timestamp of the last poll of this partition (in ms). It is the last time :meth:`highwater` and :meth:`last_stable_offset` were @@ -447,7 +510,7 @@ class AIOKafkaConsumer(Generic[KT, VT]): int or None: timestamp if available """ ... - + def seek(self, partition: TopicPartition, offset: int) -> None: """Manually specify the fetch offset for a :class:`.TopicPartition`. @@ -476,7 +539,7 @@ class AIOKafkaConsumer(Generic[KT, VT]): respective cases. """ ... - + async def seek_to_beginning(self, *partitions: TopicPartition) -> bool: """Seek to the oldest available offset for partitions. @@ -492,7 +555,7 @@ class AIOKafkaConsumer(Generic[KT, VT]): """ ... - + async def seek_to_end(self, *partitions: TopicPartition) -> bool: """Seek to the most recent available offset for partitions. @@ -508,8 +571,10 @@ class AIOKafkaConsumer(Generic[KT, VT]): """ ... - - async def seek_to_committed(self, *partitions: TopicPartition) -> dict[TopicPartition, int | None]: + + async def seek_to_committed( + self, *partitions: TopicPartition + ) -> dict[TopicPartition, int | None]: """Seek to the committed offset for partitions. Arguments: @@ -531,8 +596,10 @@ class AIOKafkaConsumer(Generic[KT, VT]): partition """ ... - - async def offsets_for_times(self, timestamps: dict[TopicPartition, int]) -> dict[TopicPartition, OffsetAndTimestamp | None]: + + async def offsets_for_times( + self, timestamps: dict[TopicPartition, int] + ) -> dict[TopicPartition, OffsetAndTimestamp | None]: """ Look up the offsets for the given partitions by timestamp. The returned offset for each partition is the earliest offset whose timestamp is @@ -568,8 +635,10 @@ class AIOKafkaConsumer(Generic[KT, VT]): """ ... - - async def beginning_offsets(self, partitions: list[TopicPartition]) -> dict[TopicPartition, int]: + + async def beginning_offsets( + self, partitions: list[TopicPartition] + ) -> dict[TopicPartition, int]: """Get the first offset for the given partitions. This method does not change the current consumer position of the @@ -595,8 +664,10 @@ class AIOKafkaConsumer(Generic[KT, VT]): """ ... - - async def end_offsets(self, partitions: list[TopicPartition]) -> dict[TopicPartition, int]: + + async def end_offsets( + self, partitions: list[TopicPartition] + ) -> dict[TopicPartition, int]: """Get the last offset for the given partitions. The last offset of a partition is the offset of the upcoming message, i.e. the offset of the last available message + 1. @@ -624,8 +695,13 @@ class AIOKafkaConsumer(Generic[KT, VT]): """ ... - - def subscribe(self, topics: list[str] | tuple[str, ...] = ..., pattern: str | None = ..., listener: ConsumerRebalanceListener | None = ...) -> None: + + def subscribe( + self, + topics: list[str] | tuple[str, ...] = ..., + pattern: str | None = ..., + listener: ConsumerRebalanceListener | None = ..., + ) -> None: """Subscribe to a list of topics, or a topic regex pattern. Partitions will be dynamically assigned via a group coordinator. @@ -666,7 +742,7 @@ class AIOKafkaConsumer(Generic[KT, VT]): TypeError: if listener is not a :class:`.ConsumerRebalanceListener` """ ... 
- + def subscription(self) -> set[str]: """Get the current topics subscription. @@ -674,11 +750,11 @@ class AIOKafkaConsumer(Generic[KT, VT]): set(str): a set of topics """ ... - + def unsubscribe(self) -> None: """Unsubscribe from all topics and clear all assigned partitions.""" ... - + async def getone(self, *partitions: TopicPartition) -> ConsumerRecord[KT, VT]: """ Get one message from Kafka. @@ -713,8 +789,13 @@ class AIOKafkaConsumer(Generic[KT, VT]): """ ... - - async def getmany(self, *partitions: TopicPartition, timeout_ms: int = ..., max_records: int | None = ...) -> Dict[TopicPartition, List[ConsumerRecord[KT, VT]]]: + + async def getmany( + self, + *partitions: TopicPartition, + timeout_ms: int = ..., + max_records: int | None = ..., + ) -> Dict[TopicPartition, List[ConsumerRecord[KT, VT]]]: """Get messages from assigned topics / partitions. Prefetched messages are returned in batches by topic-partition. @@ -749,7 +830,7 @@ class AIOKafkaConsumer(Generic[KT, VT]): """ ... - + def pause(self, *partitions: TopicPartition) -> None: """Suspend fetching from the requested partitions. @@ -764,7 +845,7 @@ class AIOKafkaConsumer(Generic[KT, VT]): *partitions (list[TopicPartition]): Partitions to pause. """ ... - + def paused(self) -> set[TopicPartition]: """Get the partitions that were previously paused using :meth:`.pause`. @@ -773,7 +854,7 @@ class AIOKafkaConsumer(Generic[KT, VT]): set[TopicPartition]: partitions """ ... - + def resume(self, *partitions: TopicPartition) -> None: """Resume fetching from the specified (paused) partitions. @@ -781,10 +862,8 @@ class AIOKafkaConsumer(Generic[KT, VT]): *partitions (tuple[TopicPartition,...]): Partitions to resume. """ ... - - def __aiter__(self) -> AIOKafkaConsumer[KT, VT]: - ... - + + def __aiter__(self) -> AIOKafkaConsumer[KT, VT]: ... async def __anext__(self) -> ConsumerRecord[KT, VT]: """Asyncio iterator interface for consumer @@ -794,9 +873,8 @@ class AIOKafkaConsumer(Generic[KT, VT]): All other KafkaError exceptions will be logged and not raised """ ... - - async def __aenter__(self) -> AIOKafkaConsumer[KT, VT]: - ... - - async def __aexit__(self, exc_type: type[ET] | None, exc: ET | None, tb: TracebackType | None) -> None: - ... + + async def __aenter__(self) -> AIOKafkaConsumer[KT, VT]: ... + async def __aexit__( + self, exc_type: type[ET] | None, exc: ET | None, tb: TracebackType | None + ) -> None: ... 
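[Editor's example] The consumer stubs above document the lifecycle (start/stop), batch fetching (getmany with timeout_ms/max_records), and manual offset commits. The sketch below ties those together in a manual-commit loop; the broker address, topic name, and group id are placeholders.

import asyncio

from aiokafka import AIOKafkaConsumer


async def consume() -> None:
    consumer = AIOKafkaConsumer(
        "my_topic",  # placeholder topic
        bootstrap_servers="localhost:9092",  # placeholder broker
        group_id="my-group",  # placeholder group
        enable_auto_commit=False,  # commit manually after processing
    )
    await consumer.start()  # fetch cluster layout, join the group
    try:
        while True:
            # Batches are keyed by TopicPartition, matching the stub's
            # Dict[TopicPartition, List[ConsumerRecord[KT, VT]]].
            batches = await consumer.getmany(timeout_ms=1000, max_records=500)
            for tp, records in batches.items():
                for record in records:
                    print(tp.topic, tp.partition, record.offset, record.value)
            if batches:
                # Commits the consumed positions for assigned partitions;
                # per the docstring this requires group_id to be set.
                await consumer.commit()
    finally:
        await consumer.stop()  # leave the group, wait for finalizers


asyncio.run(consume())
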
diff --git a/aiokafka/errors.pyi b/aiokafka/errors.pyi index db48223f..b6d1dad9 100644 --- a/aiokafka/errors.pyi +++ b/aiokafka/errors.pyi @@ -1,139 +1,160 @@ from typing import Any, Type, TypeVar -__all__ = ["ConsumerStoppedError", "NoOffsetForPartitionError", "RecordTooLargeError", "ProducerClosed", "KafkaError", "IllegalStateError", "IllegalArgumentError", "NoBrokersAvailable", "NodeNotReadyError", "KafkaProtocolError", "CorrelationIdError", "Cancelled", "TooManyInFlightRequests", "StaleMetadata", "UnrecognizedBrokerVersion", "IncompatibleBrokerVersion", "CommitFailedError", "AuthenticationMethodNotSupported", "AuthenticationFailedError", "BrokerResponseError", "NoError", "UnknownError", "OffsetOutOfRangeError", "CorruptRecordException", "UnknownTopicOrPartitionError", "InvalidFetchRequestError", "LeaderNotAvailableError", "NotLeaderForPartitionError", "RequestTimedOutError", "BrokerNotAvailableError", "ReplicaNotAvailableError", "MessageSizeTooLargeError", "StaleControllerEpochError", "OffsetMetadataTooLargeError", "StaleLeaderEpochCodeError", "GroupLoadInProgressError", "GroupCoordinatorNotAvailableError", "NotCoordinatorForGroupError", "InvalidTopicError", "RecordListTooLargeError", "NotEnoughReplicasError", "NotEnoughReplicasAfterAppendError", "InvalidRequiredAcksError", "IllegalGenerationError", "InconsistentGroupProtocolError", "InvalidGroupIdError", "UnknownMemberIdError", "InvalidSessionTimeoutError", "RebalanceInProgressError", "InvalidCommitOffsetSizeError", "TopicAuthorizationFailedError", "GroupAuthorizationFailedError", "ClusterAuthorizationFailedError", "InvalidTimestampError", "UnsupportedSaslMechanismError", "IllegalSaslStateError", "UnsupportedVersionError", "TopicAlreadyExistsError", "InvalidPartitionsError", "InvalidReplicationFactorError", "InvalidReplicationAssignmentError", "InvalidConfigurationError", "NotControllerError", "InvalidRequestError", "UnsupportedForMessageFormatError", "PolicyViolationError", "KafkaUnavailableError", "KafkaTimeoutError", "KafkaConnectionError", "UnsupportedCodecError"] +__all__ = [ + "ConsumerStoppedError", + "NoOffsetForPartitionError", + "RecordTooLargeError", + "ProducerClosed", + "KafkaError", + "IllegalStateError", + "IllegalArgumentError", + "NoBrokersAvailable", + "NodeNotReadyError", + "KafkaProtocolError", + "CorrelationIdError", + "Cancelled", + "TooManyInFlightRequests", + "StaleMetadata", + "UnrecognizedBrokerVersion", + "IncompatibleBrokerVersion", + "CommitFailedError", + "AuthenticationMethodNotSupported", + "AuthenticationFailedError", + "BrokerResponseError", + "NoError", + "UnknownError", + "OffsetOutOfRangeError", + "CorruptRecordException", + "UnknownTopicOrPartitionError", + "InvalidFetchRequestError", + "LeaderNotAvailableError", + "NotLeaderForPartitionError", + "RequestTimedOutError", + "BrokerNotAvailableError", + "ReplicaNotAvailableError", + "MessageSizeTooLargeError", + "StaleControllerEpochError", + "OffsetMetadataTooLargeError", + "StaleLeaderEpochCodeError", + "GroupLoadInProgressError", + "GroupCoordinatorNotAvailableError", + "NotCoordinatorForGroupError", + "InvalidTopicError", + "RecordListTooLargeError", + "NotEnoughReplicasError", + "NotEnoughReplicasAfterAppendError", + "InvalidRequiredAcksError", + "IllegalGenerationError", + "InconsistentGroupProtocolError", + "InvalidGroupIdError", + "UnknownMemberIdError", + "InvalidSessionTimeoutError", + "RebalanceInProgressError", + "InvalidCommitOffsetSizeError", + "TopicAuthorizationFailedError", + "GroupAuthorizationFailedError", + 
"ClusterAuthorizationFailedError", + "InvalidTimestampError", + "UnsupportedSaslMechanismError", + "IllegalSaslStateError", + "UnsupportedVersionError", + "TopicAlreadyExistsError", + "InvalidPartitionsError", + "InvalidReplicationFactorError", + "InvalidReplicationAssignmentError", + "InvalidConfigurationError", + "NotControllerError", + "InvalidRequestError", + "UnsupportedForMessageFormatError", + "PolicyViolationError", + "KafkaUnavailableError", + "KafkaTimeoutError", + "KafkaConnectionError", + "UnsupportedCodecError", +] + class KafkaError(RuntimeError): retriable = ... invalid_metadata = ... - def __str__(self) -> str: - ... - - - -class IllegalStateError(KafkaError): - ... - - -class IllegalArgumentError(KafkaError): - ... + def __str__(self) -> str: ... +class IllegalStateError(KafkaError): ... +class IllegalArgumentError(KafkaError): ... class NoBrokersAvailable(KafkaError): retriable = ... invalid_metadata = ... - class NodeNotReadyError(KafkaError): retriable = ... - class KafkaProtocolError(KafkaError): retriable = ... - class CorrelationIdError(KafkaProtocolError): retriable = ... - class Cancelled(KafkaError): retriable = ... - class TooManyInFlightRequests(KafkaError): retriable = ... - class StaleMetadata(KafkaError): retriable = ... invalid_metadata = ... - class MetadataEmptyBrokerList(KafkaError): retriable = ... - -class UnrecognizedBrokerVersion(KafkaError): - ... - - -class IncompatibleBrokerVersion(KafkaError): - ... - +class UnrecognizedBrokerVersion(KafkaError): ... +class IncompatibleBrokerVersion(KafkaError): ... class CommitFailedError(KafkaError): - def __init__(self, *args: Any, **kwargs: Any) -> None: - ... - - - -class AuthenticationMethodNotSupported(KafkaError): - ... + def __init__(self, *args: Any, **kwargs: Any) -> None: ... +class AuthenticationMethodNotSupported(KafkaError): ... class AuthenticationFailedError(KafkaError): retriable = ... - -class KafkaUnavailableError(KafkaError): - ... - - -class KafkaTimeoutError(KafkaError): - ... - +class KafkaUnavailableError(KafkaError): ... +class KafkaTimeoutError(KafkaError): ... class KafkaConnectionError(KafkaError): retriable = ... invalid_metadata = ... - -class UnsupportedCodecError(KafkaError): - ... - - -class KafkaConfigurationError(KafkaError): - ... - - -class QuotaViolationError(KafkaError): - ... - +class UnsupportedCodecError(KafkaError): ... +class KafkaConfigurationError(KafkaError): ... +class QuotaViolationError(KafkaError): ... class ConsumerStoppedError(Exception): """Raised on `get*` methods of Consumer if it's cancelled, even pending ones. """ - ... + ... class IllegalOperation(Exception): """Raised if you try to execute an operation, that is not available with current configuration. For example trying to commit if no group_id was given. """ - ... - - -class NoOffsetForPartitionError(KafkaError): - ... - - -class RecordTooLargeError(KafkaError): - ... - -class ProducerClosed(KafkaError): ... +class NoOffsetForPartitionError(KafkaError): ... +class RecordTooLargeError(KafkaError): ... +class ProducerClosed(KafkaError): ... class ProducerFenced(KafkaError): """Another producer with the same transactional ID went online. NOTE: As it seems this will be raised by Broker if transaction timeout occurred also. """ - def __init__(self, msg: str = ...) -> None: - ... - + def __init__(self, msg: str = ...) -> None: ... 
class BrokerResponseError(KafkaError): errno: int @@ -142,34 +163,29 @@ class BrokerResponseError(KafkaError): def __str__(self) -> str: """Add errno to standard KafkaError str""" ... - - class NoError(BrokerResponseError): errno = ... message = ... description = ... - class UnknownError(BrokerResponseError): errno = ... message = ... description = ... - class OffsetOutOfRangeError(BrokerResponseError): errno = ... message = ... description = ... - class CorruptRecordException(BrokerResponseError): errno = ... message = ... description = ... - InvalidMessageError = CorruptRecordException + class UnknownTopicOrPartitionError(BrokerResponseError): errno = ... message = ... @@ -177,13 +193,11 @@ class UnknownTopicOrPartitionError(BrokerResponseError): retriable = ... invalid_metadata = ... - class InvalidFetchRequestError(BrokerResponseError): errno = ... message = ... description = ... - class LeaderNotAvailableError(BrokerResponseError): errno = ... message = ... @@ -191,7 +205,6 @@ class LeaderNotAvailableError(BrokerResponseError): retriable = ... invalid_metadata = ... - class NotLeaderForPartitionError(BrokerResponseError): errno = ... message = ... @@ -199,420 +212,354 @@ class NotLeaderForPartitionError(BrokerResponseError): retriable = ... invalid_metadata = ... - class RequestTimedOutError(BrokerResponseError): errno = ... message = ... description = ... retriable = ... - class BrokerNotAvailableError(BrokerResponseError): errno = ... message = ... description = ... - class ReplicaNotAvailableError(BrokerResponseError): errno = ... message = ... description = ... - class MessageSizeTooLargeError(BrokerResponseError): errno = ... message = ... description = ... - class StaleControllerEpochError(BrokerResponseError): errno = ... message = ... description = ... - class OffsetMetadataTooLargeError(BrokerResponseError): errno = ... message = ... description = ... - class StaleLeaderEpochCodeError(BrokerResponseError): errno = ... message = ... - class GroupLoadInProgressError(BrokerResponseError): errno = ... message = ... description = ... retriable = ... - CoordinatorLoadInProgressError = GroupLoadInProgressError + class GroupCoordinatorNotAvailableError(BrokerResponseError): errno = ... message = ... description = ... retriable = ... - CoordinatorNotAvailableError = GroupCoordinatorNotAvailableError + class NotCoordinatorForGroupError(BrokerResponseError): errno = ... message = ... description = ... retriable = ... - NotCoordinatorError = NotCoordinatorForGroupError + class InvalidTopicError(BrokerResponseError): errno = ... message = ... description = ... - class RecordListTooLargeError(BrokerResponseError): errno = ... message = ... description = ... - class NotEnoughReplicasError(BrokerResponseError): errno = ... message = ... description = ... retriable = ... - class NotEnoughReplicasAfterAppendError(BrokerResponseError): errno = ... message = ... description = ... retriable = ... - class InvalidRequiredAcksError(BrokerResponseError): errno = ... message = ... description = ... - class IllegalGenerationError(BrokerResponseError): errno = ... message = ... description = ... - class InconsistentGroupProtocolError(BrokerResponseError): errno = ... message = ... description = ... - class InvalidGroupIdError(BrokerResponseError): errno = ... message = ... description = ... - class UnknownMemberIdError(BrokerResponseError): errno = ... message = ... description = ... - class InvalidSessionTimeoutError(BrokerResponseError): errno = ... message = ... description = ... 
- class RebalanceInProgressError(BrokerResponseError): errno = ... message = ... description = ... - class InvalidCommitOffsetSizeError(BrokerResponseError): errno = ... message = ... description = ... - class TopicAuthorizationFailedError(BrokerResponseError): errno = ... message = ... description = ... - class GroupAuthorizationFailedError(BrokerResponseError): errno = ... message = ... description = ... - class ClusterAuthorizationFailedError(BrokerResponseError): errno = ... message = ... description = ... - class InvalidTimestampError(BrokerResponseError): errno = ... message = ... description = ... - class UnsupportedSaslMechanismError(BrokerResponseError): errno = ... message = ... description = ... - class IllegalSaslStateError(BrokerResponseError): errno = ... message = ... description = ... - class UnsupportedVersionError(BrokerResponseError): errno = ... message = ... description = ... - class TopicAlreadyExistsError(BrokerResponseError): errno = ... message = ... description = ... - class InvalidPartitionsError(BrokerResponseError): errno = ... message = ... description = ... - class InvalidReplicationFactorError(BrokerResponseError): errno = ... message = ... description = ... - class InvalidReplicationAssignmentError(BrokerResponseError): errno = ... message = ... description = ... - class InvalidConfigurationError(BrokerResponseError): errno = ... message = ... description = ... - class NotControllerError(BrokerResponseError): errno = ... message = ... description = ... retriable = ... - class InvalidRequestError(BrokerResponseError): errno = ... message = ... description = ... - class UnsupportedForMessageFormatError(BrokerResponseError): errno = ... message = ... description = ... - class PolicyViolationError(BrokerResponseError): errno = ... message = ... description = ... - class OutOfOrderSequenceNumber(BrokerResponseError): errno = ... message = ... description = ... - class DuplicateSequenceNumber(BrokerResponseError): errno = ... message = ... description = ... - class InvalidProducerEpoch(BrokerResponseError): errno = ... message = ... description = ... - class InvalidTxnState(BrokerResponseError): errno = ... message = ... description = ... - class InvalidProducerIdMapping(BrokerResponseError): errno = ... message = ... description = ... - class InvalidTransactionTimeout(BrokerResponseError): errno = ... message = ... description = ... - class ConcurrentTransactions(BrokerResponseError): errno = ... message = ... description = ... - class TransactionCoordinatorFenced(BrokerResponseError): errno = ... message = ... description = ... - class TransactionalIdAuthorizationFailed(BrokerResponseError): errno = ... message = ... description = ... - class SecurityDisabled(BrokerResponseError): errno = ... message = ... description = ... - class OperationNotAttempted(BrokerResponseError): errno = ... message = ... description = ... - class KafkaStorageError(BrokerResponseError): errno = ... message = ... description = ... - class LogDirNotFound(BrokerResponseError): errno = ... message = ... description = ... - class SaslAuthenticationFailed(BrokerResponseError): errno = ... message = ... description = ... - class UnknownProducerId(BrokerResponseError): errno = ... message = ... description = ... - class ReassignmentInProgress(BrokerResponseError): errno = ... message = ... description = ... - class DelegationTokenAuthDisabled(BrokerResponseError): errno = ... message = ... description = ... - class DelegationTokenNotFound(BrokerResponseError): errno = ... message = ... 
description = ... - class DelegationTokenOwnerMismatch(BrokerResponseError): errno = ... message = ... description = ... - class DelegationTokenRequestNotAllowed(BrokerResponseError): errno = ... message = ... description = ... - class DelegationTokenAuthorizationFailed(BrokerResponseError): errno = ... message = ... description = ... - class DelegationTokenExpired(BrokerResponseError): errno = ... message = ... description = ... - class InvalidPrincipalType(BrokerResponseError): errno = ... message = ... description = ... - class NonEmptyGroup(BrokerResponseError): errno = ... message = ... description = ... - class GroupIdNotFound(BrokerResponseError): errno = ... message = ... description = ... - class FetchSessionIdNotFound(BrokerResponseError): errno = ... message = ... description = ... - class InvalidFetchSessionEpoch(BrokerResponseError): errno = ... message = ... description = ... - class ListenerNotFound(BrokerResponseError): errno = ... message = ... description = ... - class MemberIdRequired(BrokerResponseError): errno = ... message = ... description = ... - _T = TypeVar("_T", bound=type) kafka_errors = ... -def for_code(error_code: int) -> Type[BrokerResponseError]: - ... +def for_code(error_code: int) -> Type[BrokerResponseError]: ... diff --git a/aiokafka/structs.pyi b/aiokafka/structs.pyi index 82010ec5..c06f3b3f 100644 --- a/aiokafka/structs.pyi +++ b/aiokafka/structs.pyi @@ -2,25 +2,34 @@ from dataclasses import dataclass from typing import Generic, List, NamedTuple, Optional, Sequence, Tuple, TypeVar from aiokafka.errors import KafkaError -__all__ = ["OffsetAndMetadata", "TopicPartition", "RecordMetadata", "ConsumerRecord", "BrokerMetadata", "PartitionMetadata"] +__all__ = [ + "OffsetAndMetadata", + "TopicPartition", + "RecordMetadata", + "ConsumerRecord", + "BrokerMetadata", + "PartitionMetadata", +] + class TopicPartition(NamedTuple): """A topic and partition tuple""" + topic: str partition: int ... - class BrokerMetadata(NamedTuple): """A Kafka broker metadata used by admin tools""" + nodeId: int | str host: str port: int rack: Optional[str] ... - class PartitionMetadata(NamedTuple): """A topic partition metadata describing the state in the MetadataResponse""" + topic: str partition: int leader: int @@ -29,7 +38,6 @@ class PartitionMetadata(NamedTuple): error: Optional[KafkaError] ... - class OffsetAndMetadata(NamedTuple): """The Kafka offset commit API @@ -38,13 +46,14 @@ class OffsetAndMetadata(NamedTuple): (for example) to store information about which node made the commit, what time the commit was made, etc. """ + offset: int metadata: str ... - class RecordMetadata(NamedTuple): """Returned when a :class:`~.AIOKafkaProducer` sends a message""" + topic: str partition: int topic_partition: TopicPartition @@ -54,9 +63,9 @@ class RecordMetadata(NamedTuple): log_start_offset: Optional[int] ... - KT = TypeVar("KT", covariant=True) VT = TypeVar("VT", covariant=True) + @dataclass class ConsumerRecord(Generic[KT, VT]): topic: str @@ -72,10 +81,7 @@ class ConsumerRecord(Generic[KT, VT]): headers: Sequence[Tuple[str, bytes]] ... - class OffsetAndTimestamp(NamedTuple): offset: int timestamp: Optional[int] ... - -
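
[Editor's example] errors.pyi exposes for_code() to map a broker-protocol error code back to its BrokerResponseError subclass, and each subclass carries retriable/invalid_metadata flags; structs.pyi defines the NamedTuples used throughout. A short sketch of how a caller might combine them; the numeric code 3 assumes the standard Kafka protocol numbering for UNKNOWN_TOPIC_OR_PARTITION, which the `errno = ...` stubs leave unspecified.

from aiokafka.errors import BrokerResponseError, for_code
from aiokafka.structs import OffsetAndMetadata, TopicPartition

# Resolve a raw protocol error code to its exception class.
err_cls = for_code(3)  # assumed: UnknownTopicOrPartitionError

if issubclass(err_cls, BrokerResponseError) and err_cls.retriable:
    # Retriable broker errors can be retried after a backoff;
    # invalid_metadata additionally signals a metadata refresh.
    pass

# The structs are NamedTuples, so they behave like plain tuples and
# can key the offsets dict accepted by AIOKafkaConsumer.commit().
tp = TopicPartition("my_topic", 0)  # placeholder topic
offsets = {tp: OffsetAndMetadata(42, "")}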