Skip to content

Commit

Permalink
Recommendations from ruff, updates to reflect we no longer support Py…
Browse files Browse the repository at this point in the history
…thon 3.8.
  • Loading branch information
jackgene committed Nov 16, 2024
1 parent 551ea69 commit 7662b0e
Show file tree
Hide file tree
Showing 13 changed files with 88 additions and 156 deletions.
2 changes: 1 addition & 1 deletion aiokafka/abc.pyi
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import abc

from aiokafka.structs import TopicPartition

class ConsumerRebalanceListener(abc.ABC):
Expand Down Expand Up @@ -134,6 +135,5 @@ class AbstractTokenProvider(abc.ABC):
This feature is only available in Kafka >= 2.1.0.
"""
...

__all__ = ["ConsumerRebalanceListener", "AbstractTokenProvider"]
43 changes: 12 additions & 31 deletions aiokafka/cluster.pyi
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from collections.abc import Sequence
from concurrent.futures import Future
from typing import Any, Callable, Optional, Sequence, Set, TypedDict, Union
from typing import Any, Callable, TypedDict

from aiokafka.client import CoordinationType
from aiokafka.protocol.commit import (
GroupCoordinatorResponse_v0,
Expand All @@ -16,23 +18,18 @@ from aiokafka.protocol.metadata import (
from aiokafka.structs import BrokerMetadata, PartitionMetadata, TopicPartition

log = ...
MetadataResponse = Union[
MetadataResponse_v0,
MetadataResponse_v1,
MetadataResponse_v2,
MetadataResponse_v3,
MetadataResponse_v4,
MetadataResponse_v5,
]
GroupCoordinatorResponse = Union[
GroupCoordinatorResponse_v0, GroupCoordinatorResponse_v1
]
MetadataResponse = MetadataResponse_v0 |\
MetadataResponse_v1 |\
MetadataResponse_v2 |\
MetadataResponse_v3 |\
MetadataResponse_v4 |\
MetadataResponse_v5
GroupCoordinatorResponse = GroupCoordinatorResponse_v0 | GroupCoordinatorResponse_v1

class ClusterConfig(TypedDict):
retry_backoff_ms: int
metadata_max_age_ms: int
bootstrap_servers: str | list[str]
...

class ClusterMetadata:
"""
Expand Down Expand Up @@ -65,7 +62,6 @@ class ClusterMetadata:
Returns:
set: {BrokerMetadata, ...}
"""
...

def broker_metadata(self, broker_id: str) -> BrokerMetadata | None:
"""Get BrokerMetadata
Expand All @@ -76,9 +72,8 @@ class ClusterMetadata:
Returns:
BrokerMetadata or None if not found
"""
...

def partitions_for_topic(self, topic: str) -> Optional[Set[int]]:
def partitions_for_topic(self, topic: str) -> set[int] | None:
"""Return set of all partitions for topic (whether available or not)
Arguments:
Expand All @@ -87,9 +82,8 @@ class ClusterMetadata:
Returns:
set: {partition (int), ...}
"""
...

def available_partitions_for_topic(self, topic: str) -> Optional[Set[int]]:
def available_partitions_for_topic(self, topic: str) -> set[int] | None:
"""Return set of partitions with known leaders
Arguments:
Expand All @@ -99,11 +93,9 @@ class ClusterMetadata:
set: {partition (int), ...}
None if topic not found.
"""
...

def leader_for_partition(self, partition: PartitionMetadata) -> int | None:
"""Return node_id of leader, -1 unavailable, None if unknown."""
...

def partitions_for_broker(self, broker_id: int | str) -> set[TopicPartition] | None:
"""Return TopicPartitions for which the broker is a leader.
Expand All @@ -115,7 +107,6 @@ class ClusterMetadata:
set: {TopicPartition, ...}
None if the broker either has no partitions or does not exist.
"""
...

def coordinator_for_group(self, group: str) -> int | str | None:
"""Return node_id of group coordinator.
Expand All @@ -127,7 +118,6 @@ class ClusterMetadata:
int: node_id for group coordinator
None if the group does not exist.
"""
...

def request_update(self) -> Future[ClusterMetadata]:
"""Flags metadata for update, return Future()
Expand All @@ -138,7 +128,6 @@ class ClusterMetadata:
Returns:
Future (value will be the cluster object after update)
"""
...

def topics(self, exclude_internal_topics: bool = ...) -> set[str]:
"""Get set of known topics.
Expand All @@ -152,11 +141,9 @@ class ClusterMetadata:
Returns:
set: {topic (str), ...}
"""
...

def failed_update(self, exception: BaseException) -> None:
"""Update cluster state given a failed MetadataRequest."""
...

def update_metadata(self, metadata: MetadataResponse) -> None:
"""Update cluster state given a MetadataResponse.
Expand All @@ -166,15 +153,12 @@ class ClusterMetadata:
Returns: None
"""
...

def add_listener(self, listener: Callable[[ClusterMetadata], Any]) -> None:
"""Add a callback function to be called on each metadata update"""
...

def remove_listener(self, listener: Callable[[ClusterMetadata], Any]) -> None:
"""Remove a previously added listener callback"""
...

def add_group_coordinator(
self, group: str, response: GroupCoordinatorResponse
Expand All @@ -188,13 +172,11 @@ class ClusterMetadata:
Returns:
string: coordinator node_id if metadata is updated, None on error
"""
...

def with_partitions(
self, partitions_to_add: Sequence[PartitionMetadata]
) -> ClusterMetadata:
"""Returns a copy of cluster metadata with partitions added"""
...

def coordinator_metadata(self, node_id: int | str) -> BrokerMetadata | None: ...
def add_coordinator(
Expand All @@ -210,6 +192,5 @@ class ClusterMetadata:
a new one was elected for the same purpose (For example group
coordinator for group X).
"""
...

def __str__(self) -> str: ...
Loading

0 comments on commit 7662b0e

Please sign in to comment.