From 7662b0e30a8341c75074d5b7188b968caef84dd4 Mon Sep 17 00:00:00 2001 From: Jack Leow Date: Sat, 16 Nov 2024 11:10:50 -0800 Subject: [PATCH] Recommendations from ruff, updates to reflect we no longer support Python 3.8. --- aiokafka/abc.pyi | 2 +- aiokafka/cluster.pyi | 43 ++++--------- aiokafka/consumer/consumer.pyi | 33 +--------- aiokafka/coordinator/assignors/abstract.pyi | 5 +- aiokafka/coordinator/protocol.pyi | 12 ++-- aiokafka/errors.pyi | 7 +-- aiokafka/producer/producer.pyi | 14 ++--- aiokafka/protocol/api.pyi | 18 +++--- aiokafka/protocol/commit.pyi | 1 - aiokafka/protocol/metadata.pyi | 1 - aiokafka/protocol/struct.pyi | 5 +- aiokafka/protocol/types.pyi | 70 +++++++++------------ aiokafka/structs.pyi | 33 +++++----- 13 files changed, 88 insertions(+), 156 deletions(-) diff --git a/aiokafka/abc.pyi b/aiokafka/abc.pyi index 6616a91a..40246041 100644 --- a/aiokafka/abc.pyi +++ b/aiokafka/abc.pyi @@ -1,4 +1,5 @@ import abc + from aiokafka.structs import TopicPartition class ConsumerRebalanceListener(abc.ABC): @@ -134,6 +135,5 @@ class AbstractTokenProvider(abc.ABC): This feature is only available in Kafka >= 2.1.0. """ - ... __all__ = ["ConsumerRebalanceListener", "AbstractTokenProvider"] diff --git a/aiokafka/cluster.pyi b/aiokafka/cluster.pyi index 2c67e97f..ae3ecef4 100644 --- a/aiokafka/cluster.pyi +++ b/aiokafka/cluster.pyi @@ -1,5 +1,7 @@ +from collections.abc import Sequence from concurrent.futures import Future -from typing import Any, Callable, Optional, Sequence, Set, TypedDict, Union +from typing import Any, Callable, TypedDict + from aiokafka.client import CoordinationType from aiokafka.protocol.commit import ( GroupCoordinatorResponse_v0, @@ -16,23 +18,18 @@ from aiokafka.protocol.metadata import ( from aiokafka.structs import BrokerMetadata, PartitionMetadata, TopicPartition log = ... -MetadataResponse = Union[ - MetadataResponse_v0, - MetadataResponse_v1, - MetadataResponse_v2, - MetadataResponse_v3, - MetadataResponse_v4, - MetadataResponse_v5, -] -GroupCoordinatorResponse = Union[ - GroupCoordinatorResponse_v0, GroupCoordinatorResponse_v1 -] +MetadataResponse = MetadataResponse_v0 |\ + MetadataResponse_v1 |\ + MetadataResponse_v2 |\ + MetadataResponse_v3 |\ + MetadataResponse_v4 |\ + MetadataResponse_v5 +GroupCoordinatorResponse = GroupCoordinatorResponse_v0 | GroupCoordinatorResponse_v1 class ClusterConfig(TypedDict): retry_backoff_ms: int metadata_max_age_ms: int bootstrap_servers: str | list[str] - ... class ClusterMetadata: """ @@ -65,7 +62,6 @@ class ClusterMetadata: Returns: set: {BrokerMetadata, ...} """ - ... def broker_metadata(self, broker_id: str) -> BrokerMetadata | None: """Get BrokerMetadata @@ -76,9 +72,8 @@ class ClusterMetadata: Returns: BrokerMetadata or None if not found """ - ... - def partitions_for_topic(self, topic: str) -> Optional[Set[int]]: + def partitions_for_topic(self, topic: str) -> set[int] | None: """Return set of all partitions for topic (whether available or not) Arguments: @@ -87,9 +82,8 @@ class ClusterMetadata: Returns: set: {partition (int), ...} """ - ... - def available_partitions_for_topic(self, topic: str) -> Optional[Set[int]]: + def available_partitions_for_topic(self, topic: str) -> set[int] | None: """Return set of partitions with known leaders Arguments: @@ -99,11 +93,9 @@ class ClusterMetadata: set: {partition (int), ...} None if topic not found. """ - ... def leader_for_partition(self, partition: PartitionMetadata) -> int | None: """Return node_id of leader, -1 unavailable, None if unknown.""" - ... def partitions_for_broker(self, broker_id: int | str) -> set[TopicPartition] | None: """Return TopicPartitions for which the broker is a leader. @@ -115,7 +107,6 @@ class ClusterMetadata: set: {TopicPartition, ...} None if the broker either has no partitions or does not exist. """ - ... def coordinator_for_group(self, group: str) -> int | str | None: """Return node_id of group coordinator. @@ -127,7 +118,6 @@ class ClusterMetadata: int: node_id for group coordinator None if the group does not exist. """ - ... def request_update(self) -> Future[ClusterMetadata]: """Flags metadata for update, return Future() @@ -138,7 +128,6 @@ class ClusterMetadata: Returns: Future (value will be the cluster object after update) """ - ... def topics(self, exclude_internal_topics: bool = ...) -> set[str]: """Get set of known topics. @@ -152,11 +141,9 @@ class ClusterMetadata: Returns: set: {topic (str), ...} """ - ... def failed_update(self, exception: BaseException) -> None: """Update cluster state given a failed MetadataRequest.""" - ... def update_metadata(self, metadata: MetadataResponse) -> None: """Update cluster state given a MetadataResponse. @@ -166,15 +153,12 @@ class ClusterMetadata: Returns: None """ - ... def add_listener(self, listener: Callable[[ClusterMetadata], Any]) -> None: """Add a callback function to be called on each metadata update""" - ... def remove_listener(self, listener: Callable[[ClusterMetadata], Any]) -> None: """Remove a previously added listener callback""" - ... def add_group_coordinator( self, group: str, response: GroupCoordinatorResponse @@ -188,13 +172,11 @@ class ClusterMetadata: Returns: string: coordinator node_id if metadata is updated, None on error """ - ... def with_partitions( self, partitions_to_add: Sequence[PartitionMetadata] ) -> ClusterMetadata: """Returns a copy of cluster metadata with partitions added""" - ... def coordinator_metadata(self, node_id: int | str) -> BrokerMetadata | None: ... def add_coordinator( @@ -210,6 +192,5 @@ class ClusterMetadata: a new one was elected for the same purpose (For example group coordinator for group X). """ - ... def __str__(self) -> str: ... diff --git a/aiokafka/consumer/consumer.pyi b/aiokafka/consumer/consumer.pyi index fb32daaf..5011e9af 100644 --- a/aiokafka/consumer/consumer.pyi +++ b/aiokafka/consumer/consumer.pyi @@ -1,7 +1,8 @@ import asyncio from ssl import SSLContext from types import ModuleType, TracebackType -from typing import Callable, Dict, Generic, List, Literal, TypeVar +from typing import Callable, Generic, Literal, TypeVar + from aiokafka.abc import AbstractTokenProvider, ConsumerRebalanceListener from aiokafka.coordinator.assignors.abstract import AbstractPartitionAssignor from aiokafka.structs import ( @@ -280,7 +281,6 @@ class AIOKafkaConsumer(Generic[KT, VT]): * Wait for possible topic autocreation * Join group if ``group_id`` provided """ - ... def assign(self, partitions: list[TopicPartition]) -> None: """Manually assign a list of :class:`.TopicPartition` to this consumer. @@ -304,7 +304,6 @@ class AIOKafkaConsumer(Generic[KT, VT]): **no rebalance operation triggered** when group membership or cluster and topic metadata change. """ - ... def assignment(self) -> set[TopicPartition]: """Get the set of partitions currently assigned to this consumer. @@ -321,7 +320,6 @@ class AIOKafkaConsumer(Generic[KT, VT]): set(TopicPartition): the set of partitions currently assigned to this consumer """ - ... async def stop(self) -> None: """Close the consumer, while waiting for finalizers: @@ -329,7 +327,6 @@ class AIOKafkaConsumer(Generic[KT, VT]): * Commit last consumed message if autocommit enabled * Leave group if used Consumer Groups """ - ... async def commit( self, @@ -390,7 +387,6 @@ class AIOKafkaConsumer(Generic[KT, VT]): .. _kafka-python: https://github.com/dpkp/kafka-python """ - ... async def committed(self, partition: TopicPartition) -> int | None: """Get the last committed offset for the given partition. (whether the @@ -412,7 +408,6 @@ class AIOKafkaConsumer(Generic[KT, VT]): Raises: IllegalOperation: If used with ``group_id == None`` """ - ... async def topics(self) -> set[str]: """Get all topics the user is authorized to view. @@ -420,7 +415,6 @@ class AIOKafkaConsumer(Generic[KT, VT]): Returns: set: topics """ - ... def partitions_for_topic(self, topic: str) -> set[int] | None: """Get metadata about the partitions for a given topic. @@ -434,7 +428,6 @@ class AIOKafkaConsumer(Generic[KT, VT]): Returns: set: partition ids """ - ... async def position(self, partition: TopicPartition) -> int: """Get the offset of the *next record* that will be fetched (if a @@ -455,7 +448,6 @@ class AIOKafkaConsumer(Generic[KT, VT]): :exc:`~aiokafka.errors.IllegalStateError` in case of unassigned partition """ - ... def highwater(self, partition: TopicPartition) -> int | None: """Last known highwater offset for a partition. @@ -475,7 +467,6 @@ class AIOKafkaConsumer(Generic[KT, VT]): Returns: int or None: offset if available """ - ... def last_stable_offset(self, partition: TopicPartition) -> int | None: """Returns the Last Stable Offset of a topic. It will be the last @@ -493,7 +484,6 @@ class AIOKafkaConsumer(Generic[KT, VT]): Returns: int or None: offset if available """ - ... def last_poll_timestamp(self, partition: TopicPartition) -> int | None: """Returns the timestamp of the last poll of this partition (in ms). @@ -509,7 +499,6 @@ class AIOKafkaConsumer(Generic[KT, VT]): Returns: int or None: timestamp if available """ - ... def seek(self, partition: TopicPartition, offset: int) -> None: """Manually specify the fetch offset for a :class:`.TopicPartition`. @@ -538,7 +527,6 @@ class AIOKafkaConsumer(Generic[KT, VT]): :exc:`~aiokafka.errors.IllegalStateError` and :exc:`ValueError` in respective cases. """ - ... async def seek_to_beginning(self, *partitions: TopicPartition) -> bool: """Seek to the oldest available offset for partitions. @@ -554,7 +542,6 @@ class AIOKafkaConsumer(Generic[KT, VT]): .. versionadded:: 0.3.0 """ - ... async def seek_to_end(self, *partitions: TopicPartition) -> bool: """Seek to the most recent available offset for partitions. @@ -570,7 +557,6 @@ class AIOKafkaConsumer(Generic[KT, VT]): .. versionadded:: 0.3.0 """ - ... async def seek_to_committed( self, *partitions: TopicPartition @@ -595,7 +581,6 @@ class AIOKafkaConsumer(Generic[KT, VT]): :exc:`~aiokafka.errors.IllegalStateError` in case of unassigned partition """ - ... async def offsets_for_times( self, timestamps: dict[TopicPartition, int] @@ -634,7 +619,6 @@ class AIOKafkaConsumer(Generic[KT, VT]): .. versionadded:: 0.3.0 """ - ... async def beginning_offsets( self, partitions: list[TopicPartition] @@ -663,7 +647,6 @@ class AIOKafkaConsumer(Generic[KT, VT]): .. versionadded:: 0.3.0 """ - ... async def end_offsets( self, partitions: list[TopicPartition] @@ -694,7 +677,6 @@ class AIOKafkaConsumer(Generic[KT, VT]): .. versionadded:: 0.3.0 """ - ... def subscribe( self, @@ -741,7 +723,6 @@ class AIOKafkaConsumer(Generic[KT, VT]): are provided TypeError: if listener is not a :class:`.ConsumerRebalanceListener` """ - ... def subscription(self) -> set[str]: """Get the current topics subscription. @@ -749,11 +730,9 @@ class AIOKafkaConsumer(Generic[KT, VT]): Returns: set(str): a set of topics """ - ... def unsubscribe(self) -> None: """Unsubscribe from all topics and clear all assigned partitions.""" - ... async def getone(self, *partitions: TopicPartition) -> ConsumerRecord[KT, VT]: """ @@ -788,14 +767,13 @@ class AIOKafkaConsumer(Generic[KT, VT]): print(message.offset, message.key, message.value) """ - ... async def getmany( self, *partitions: TopicPartition, timeout_ms: int = ..., max_records: int | None = ..., - ) -> Dict[TopicPartition, List[ConsumerRecord[KT, VT]]]: + ) -> dict[TopicPartition, list[ConsumerRecord[KT, VT]]]: """Get messages from assigned topics / partitions. Prefetched messages are returned in batches by topic-partition. @@ -829,7 +807,6 @@ class AIOKafkaConsumer(Generic[KT, VT]): print(message.offset, message.key, message.value) """ - ... def pause(self, *partitions: TopicPartition) -> None: """Suspend fetching from the requested partitions. @@ -844,7 +821,6 @@ class AIOKafkaConsumer(Generic[KT, VT]): Arguments: *partitions (list[TopicPartition]): Partitions to pause. """ - ... def paused(self) -> set[TopicPartition]: """Get the partitions that were previously paused using @@ -853,7 +829,6 @@ class AIOKafkaConsumer(Generic[KT, VT]): Returns: set[TopicPartition]: partitions """ - ... def resume(self, *partitions: TopicPartition) -> None: """Resume fetching from the specified (paused) partitions. @@ -861,7 +836,6 @@ class AIOKafkaConsumer(Generic[KT, VT]): Arguments: *partitions (tuple[TopicPartition,...]): Partitions to resume. """ - ... def __aiter__(self) -> AIOKafkaConsumer[KT, VT]: ... async def __anext__(self) -> ConsumerRecord[KT, VT]: @@ -872,7 +846,6 @@ class AIOKafkaConsumer(Generic[KT, VT]): exceptions can be raised in iterator. All other KafkaError exceptions will be logged and not raised """ - ... async def __aenter__(self) -> AIOKafkaConsumer[KT, VT]: ... async def __aexit__( diff --git a/aiokafka/coordinator/assignors/abstract.pyi b/aiokafka/coordinator/assignors/abstract.pyi index b6b624c8..3ee73f4d 100644 --- a/aiokafka/coordinator/assignors/abstract.pyi +++ b/aiokafka/coordinator/assignors/abstract.pyi @@ -1,5 +1,6 @@ import abc -from typing import Dict, Iterable, Mapping +from collections.abc import Iterable, Mapping + from aiokafka.cluster import ClusterMetadata from aiokafka.coordinator.protocol import ( ConsumerProtocolMemberAssignment, @@ -25,7 +26,7 @@ class AbstractPartitionAssignor(abc.ABC): cls, cluster: ClusterMetadata, members: Mapping[str, ConsumerProtocolMemberMetadata], - ) -> Dict[str, ConsumerProtocolMemberAssignment]: + ) -> dict[str, ConsumerProtocolMemberAssignment]: """Perform group assignment given cluster metadata and member subscriptions Arguments: diff --git a/aiokafka/coordinator/protocol.pyi b/aiokafka/coordinator/protocol.pyi index 88f3e83c..2f8e4492 100644 --- a/aiokafka/coordinator/protocol.pyi +++ b/aiokafka/coordinator/protocol.pyi @@ -1,24 +1,24 @@ -from typing import List, NamedTuple +from typing import NamedTuple + from aiokafka.protocol.struct import Struct from aiokafka.structs import TopicPartition class ConsumerProtocolMemberMetadata(Struct): version: int - subscription: List[str] + subscription: list[str] user_data: bytes SCHEMA = ... class ConsumerProtocolMemberAssignment(Struct): class Assignment(NamedTuple): topic: str - partitions: List[int] - ... + partitions: list[int] version: int - assignment: List[Assignment] + assignment: list[Assignment] user_data: bytes SCHEMA = ... - def partitions(self) -> List[TopicPartition]: ... + def partitions(self) -> list[TopicPartition]: ... class ConsumerProtocol: PROTOCOL_TYPE = ... diff --git a/aiokafka/errors.pyi b/aiokafka/errors.pyi index b6d1dad9..a41ad829 100644 --- a/aiokafka/errors.pyi +++ b/aiokafka/errors.pyi @@ -1,4 +1,4 @@ -from typing import Any, Type, TypeVar +from typing import Any, TypeVar __all__ = [ "ConsumerStoppedError", @@ -134,7 +134,6 @@ class ConsumerStoppedError(Exception): ones. """ - ... class IllegalOperation(Exception): """Raised if you try to execute an operation, that is not available with @@ -142,7 +141,6 @@ class IllegalOperation(Exception): given. """ - ... class NoOffsetForPartitionError(KafkaError): ... class RecordTooLargeError(KafkaError): ... @@ -162,7 +160,6 @@ class BrokerResponseError(KafkaError): description: str = ... def __str__(self) -> str: """Add errno to standard KafkaError str""" - ... class NoError(BrokerResponseError): errno = ... @@ -562,4 +559,4 @@ class MemberIdRequired(BrokerResponseError): _T = TypeVar("_T", bound=type) kafka_errors = ... -def for_code(error_code: int) -> Type[BrokerResponseError]: ... +def for_code(error_code: int) -> type[BrokerResponseError]: ... diff --git a/aiokafka/producer/producer.pyi b/aiokafka/producer/producer.pyi index c699e0b8..6a392ab7 100644 --- a/aiokafka/producer/producer.pyi +++ b/aiokafka/producer/producer.pyi @@ -1,9 +1,12 @@ import asyncio +from collections.abc import Iterable from ssl import SSLContext from types import ModuleType, TracebackType -from typing import Callable, Generic, Iterable, Literal, TypeVar +from typing import Callable, Generic, Literal, TypeVar + from aiokafka.abc import AbstractTokenProvider from aiokafka.structs import OffsetAndMetadata, RecordMetadata, TopicPartition + from .message_accumulator import BatchBuilder log = ... @@ -220,19 +223,15 @@ class AIOKafkaProducer(Generic[KT, VT]): def __del__(self, _warnings: ModuleType = ...) -> None: ... async def start(self) -> None: """Connect to Kafka cluster and check server version""" - ... async def flush(self) -> None: """Wait until all batches are Delivered and futures resolved""" - ... async def stop(self) -> None: """Flush all pending data and close all connections to kafka cluster""" - ... async def partitions_for(self, topic: str) -> set[int]: """Returns set of all known partitions for the topic.""" - ... async def send( self, @@ -286,7 +285,6 @@ class AIOKafkaProducer(Generic[KT, VT]): from being sent, but cancelling the :meth:`send` coroutine itself **will**. """ - ... async def send_and_wait( self, @@ -298,7 +296,6 @@ class AIOKafkaProducer(Generic[KT, VT]): headers: Iterable[tuple[str, bytes]] | None = ..., ) -> RecordMetadata: """Publish a message to a topic and wait the result""" - ... def create_batch(self) -> BatchBuilder: """Create and return an empty :class:`.BatchBuilder`. @@ -308,7 +305,6 @@ class AIOKafkaProducer(Generic[KT, VT]): Returns: BatchBuilder: empty batch to be filled and submitted by the caller. """ - ... async def send_batch( self, batch: BatchBuilder, topic: str, *, partition: int @@ -324,14 +320,12 @@ class AIOKafkaProducer(Generic[KT, VT]): asyncio.Future: object that will be set when the batch is delivered. """ - ... async def begin_transaction(self) -> None: ... async def commit_transaction(self) -> None: ... async def abort_transaction(self) -> None: ... def transaction(self) -> TransactionContext: """Start a transaction context""" - ... async def send_offsets_to_transaction( self, diff --git a/aiokafka/protocol/api.pyi b/aiokafka/protocol/api.pyi index 8095924e..ebf9c608 100644 --- a/aiokafka/protocol/api.pyi +++ b/aiokafka/protocol/api.pyi @@ -1,6 +1,7 @@ import abc from io import BytesIO -from typing import Any, ClassVar, Dict, Optional, Type, Union +from typing import Any, ClassVar + from .struct import Struct from .types import Schema @@ -17,7 +18,7 @@ class RequestHeader_v1(Struct): request: Request, correlation_id: int = ..., client_id: str = ..., - tags: Optional[Dict[int, bytes]] = ..., + tags: dict[int, bytes] | None = ..., ) -> None: ... class ResponseHeader_v0(Struct): @@ -42,7 +43,7 @@ class Request(Struct, metaclass=abc.ABCMeta): @property @abc.abstractmethod - def RESPONSE_TYPE(self) -> Type[Response]: + def RESPONSE_TYPE(self) -> type[Response]: """The Response class associated with the api request""" ... @@ -54,15 +55,14 @@ class Request(Struct, metaclass=abc.ABCMeta): def expect_response(self) -> bool: """Override this method if an api request does not always generate a response""" - ... - def to_object(self) -> Dict[str, Any]: ... + def to_object(self) -> dict[str, Any]: ... def build_request_header( self, correlation_id: int, client_id: str - ) -> Union[RequestHeader_v0, RequestHeader_v1]: ... + ) -> RequestHeader_v0 | RequestHeader_v1: ... def parse_response_header( - self, read_buffer: Union[BytesIO, bytes] - ) -> Union[ResponseHeader_v0, ResponseHeader_v1]: ... + self, read_buffer: BytesIO | bytes + ) -> ResponseHeader_v0 | ResponseHeader_v1: ... class Response(Struct, metaclass=abc.ABCMeta): @property @@ -83,4 +83,4 @@ class Response(Struct, metaclass=abc.ABCMeta): """An instance of Schema() representing the response structure""" ... - def to_object(self) -> Dict[str, Any]: ... + def to_object(self) -> dict[str, Any]: ... diff --git a/aiokafka/protocol/commit.pyi b/aiokafka/protocol/commit.pyi index 39535980..6135e8ec 100644 --- a/aiokafka/protocol/commit.pyi +++ b/aiokafka/protocol/commit.pyi @@ -19,7 +19,6 @@ class OffsetCommitRequest_v1(Request): pass class OffsetCommitRequest_v2(Request): - pass DEFAULT_GENERATION_ID = ... DEFAULT_RETENTION_TIME = ... diff --git a/aiokafka/protocol/metadata.pyi b/aiokafka/protocol/metadata.pyi index 1f9f0a16..dda24c38 100644 --- a/aiokafka/protocol/metadata.pyi +++ b/aiokafka/protocol/metadata.pyi @@ -39,7 +39,6 @@ class MetadataRequest_v5(Request): An additional field for offline_replicas has been added to the v5 metadata response """ - pass MetadataRequest = ... MetadataResponse = ... diff --git a/aiokafka/protocol/struct.pyi b/aiokafka/protocol/struct.pyi index 95f2ac03..3049d9a9 100644 --- a/aiokafka/protocol/struct.pyi +++ b/aiokafka/protocol/struct.pyi @@ -1,5 +1,6 @@ from io import BytesIO -from typing import Any, ClassVar, Union +from typing import Any, ClassVar + from typing_extensions import Self class Struct: @@ -7,7 +8,7 @@ class Struct: def __init__(self, *args: Any, **kwargs: Any) -> None: ... def encode(self) -> bytes: ... @classmethod - def decode(cls, data: Union[BytesIO, bytes]) -> Self: ... + def decode(cls, data: BytesIO | bytes) -> Self: ... def get_item(self, name: str) -> Any: ... def __repr__(self) -> str: ... def __eq__(self, other: object) -> bool: ... diff --git a/aiokafka/protocol/types.pyi b/aiokafka/protocol/types.pyi index 5e1d8a98..93d7ea5b 100644 --- a/aiokafka/protocol/types.pyi +++ b/aiokafka/protocol/types.pyi @@ -1,21 +1,13 @@ +from collections.abc import Sequence from io import BytesIO -from typing import ( - Any, - Dict, - List, - Optional, - Sequence, - Tuple, - Type, - TypeVar, - Union, - overload, -) +from typing import Any, TypeVar, overload + from typing_extensions import TypeAlias + from .abstract import AbstractType T = TypeVar("T") -ValueT: TypeAlias = Union[Type[AbstractType[Any]], "String", "Array", "Schema"] +ValueT: TypeAlias = type[AbstractType[Any]] | "String" | "Array" | "Schema" class Int8(AbstractType[int]): _pack = ... @@ -67,18 +59,18 @@ class Float64(AbstractType[float]): class String: def __init__(self, encoding: str = ...) -> None: ... - def encode(self, value: Optional[str]) -> bytes: ... - def decode(self, data: BytesIO) -> Optional[str]: ... + def encode(self, value: str | None) -> bytes: ... + def decode(self, data: BytesIO) -> str| None: ... @classmethod def repr(cls, value: str) -> str: ... -class Bytes(AbstractType[Optional[bytes]]): +class Bytes(AbstractType[bytes | None]): @classmethod - def encode(cls, value: Optional[bytes]) -> bytes: ... + def encode(cls, value: bytes | None) -> bytes: ... @classmethod - def decode(cls, data: BytesIO) -> Optional[bytes]: ... + def decode(cls, data: BytesIO) -> bytes | None: ... @classmethod - def repr(cls, value: Optional[bytes]) -> str: ... + def repr(cls, value: bytes | None) -> str: ... class Boolean(AbstractType[bool]): _pack = ... @@ -89,13 +81,13 @@ class Boolean(AbstractType[bool]): def decode(cls, data: BytesIO) -> bool: ... class Schema: - names: Tuple[str, ...] - fields: Tuple[ValueT, ...] - def __init__(self, *fields: Tuple[str, ValueT]) -> None: ... + names: tuple[str, ...] + fields: tuple[ValueT, ...] + def __init__(self, *fields: tuple[str, ValueT]) -> None: ... def encode(self, item: Sequence[Any]) -> bytes: ... def decode( self, data: BytesIO - ) -> Tuple[Union[Any, str, None, List[Union[Any, Tuple[Any, ...]]]], ...]: ... + ) -> tuple[Any | str | None | list[Any | tuple[Any, ...]], ...]: ... def __len__(self) -> int: ... def repr(self, value: Any) -> str: ... @@ -105,16 +97,16 @@ class Array: def __init__(self, array_of_0: ValueT) -> None: ... @overload def __init__( - self, array_of_0: Tuple[str, ValueT], *array_of: Tuple[str, ValueT] + self, array_of_0: tuple[str, ValueT], *array_of: tuple[str, ValueT] ) -> None: ... def __init__( self, - array_of_0: Union[ValueT, Tuple[str, ValueT]], - *array_of: Tuple[str, ValueT], + array_of_0: ValueT | tuple[str, ValueT], + *array_of: tuple[str, ValueT], ) -> None: ... - def encode(self, items: Optional[Sequence[Any]]) -> bytes: ... - def decode(self, data: BytesIO) -> Optional[List[Union[Any, Tuple[Any, ...]]]]: ... - def repr(self, list_of_items: Optional[Sequence[Any]]) -> str: ... + def encode(self, items: Sequence[Any] | None) -> bytes: ... + def decode(self, data: BytesIO) -> list[Any | tuple[Any, ...]] | None: ... + def repr(self, list_of_items: Sequence[Any] | None) -> str: ... class UnsignedVarInt32(AbstractType[int]): @classmethod @@ -135,21 +127,21 @@ class VarInt64(AbstractType[int]): def encode(cls, value: int) -> bytes: ... class CompactString(String): - def decode(self, data: BytesIO) -> Optional[str]: ... - def encode(self, value: Optional[str]) -> bytes: ... + def decode(self, data: BytesIO) -> str | None: ... + def encode(self, value: str | None) -> bytes: ... -class TaggedFields(AbstractType[Dict[int, bytes]]): +class TaggedFields(AbstractType[dict[int, bytes]]): @classmethod - def decode(cls, data: BytesIO) -> Dict[int, bytes]: ... + def decode(cls, data: BytesIO) -> dict[int, bytes]: ... @classmethod - def encode(cls, value: Dict[int, bytes]) -> bytes: ... + def encode(cls, value: dict[int, bytes]) -> bytes: ... -class CompactBytes(AbstractType[Optional[bytes]]): +class CompactBytes(AbstractType[bytes | None]): @classmethod - def decode(cls, data: BytesIO) -> Optional[bytes]: ... + def decode(cls, data: BytesIO) -> bytes | None: ... @classmethod - def encode(cls, value: Optional[bytes]) -> bytes: ... + def encode(cls, value: bytes | None) -> bytes: ... class CompactArray(Array): - def encode(self, items: Optional[Sequence[Any]]) -> bytes: ... - def decode(self, data: BytesIO) -> Optional[List[Union[Any, Tuple[Any, ...]]]]: ... + def encode(self, items: Sequence[Any] | None) -> bytes: ... + def decode(self, data: BytesIO) -> list[Any | tuple[Any, ...]] | None: ... diff --git a/aiokafka/structs.pyi b/aiokafka/structs.pyi index c06f3b3f..00adfb67 100644 --- a/aiokafka/structs.pyi +++ b/aiokafka/structs.pyi @@ -1,5 +1,7 @@ +from collections.abc import Sequence from dataclasses import dataclass -from typing import Generic, List, NamedTuple, Optional, Sequence, Tuple, TypeVar +from typing import Generic, NamedTuple, TypeVar + from aiokafka.errors import KafkaError __all__ = [ @@ -16,7 +18,6 @@ class TopicPartition(NamedTuple): topic: str partition: int - ... class BrokerMetadata(NamedTuple): """A Kafka broker metadata used by admin tools""" @@ -24,8 +25,7 @@ class BrokerMetadata(NamedTuple): nodeId: int | str host: str port: int - rack: Optional[str] - ... + rack: str | None class PartitionMetadata(NamedTuple): """A topic partition metadata describing the state in the MetadataResponse""" @@ -33,10 +33,9 @@ class PartitionMetadata(NamedTuple): topic: str partition: int leader: int - replicas: List[int] - isr: List[int] - error: Optional[KafkaError] - ... + replicas: list[int] + isr: list[int] + error: KafkaError | None class OffsetAndMetadata(NamedTuple): """The Kafka offset commit API @@ -49,7 +48,6 @@ class OffsetAndMetadata(NamedTuple): offset: int metadata: str - ... class RecordMetadata(NamedTuple): """Returned when a :class:`~.AIOKafkaProducer` sends a message""" @@ -58,10 +56,9 @@ class RecordMetadata(NamedTuple): partition: int topic_partition: TopicPartition offset: int - timestamp: Optional[int] + timestamp: int | None timestamp_type: int - log_start_offset: Optional[int] - ... + log_start_offset: int | None KT = TypeVar("KT", covariant=True) VT = TypeVar("VT", covariant=True) @@ -73,15 +70,13 @@ class ConsumerRecord(Generic[KT, VT]): offset: int timestamp: int timestamp_type: int - key: Optional[KT] - value: Optional[VT] - checksum: Optional[int] + key: KT | None + value: VT | None + checksum: int | None serialized_key_size: int serialized_value_size: int - headers: Sequence[Tuple[str, bytes]] - ... + headers: Sequence[tuple[str, bytes]] class OffsetAndTimestamp(NamedTuple): offset: int - timestamp: Optional[int] - ... + timestamp: int | None