Skip to content

Commit

Permalink
blackfml.
Browse files Browse the repository at this point in the history
  • Loading branch information
jackgene committed Nov 16, 2024
1 parent 4d714f7 commit 18a97f3
Show file tree
Hide file tree
Showing 7 changed files with 319 additions and 253 deletions.
18 changes: 16 additions & 2 deletions aiokafka/__init__.pyi
Original file line number Diff line number Diff line change
@@ -1,7 +1,21 @@
from .abc import ConsumerRebalanceListener
from .consumer import AIOKafkaConsumer
from .errors import ConsumerStoppedError, IllegalOperation
from .structs import ConsumerRecord, OffsetAndMetadata, OffsetAndTimestamp, TopicPartition
from .structs import (
ConsumerRecord,
OffsetAndMetadata,
OffsetAndTimestamp,
TopicPartition,
)

__version__ = ...
__all__ = ["AIOKafkaConsumer", "ConsumerRebalanceListener", "ConsumerStoppedError", "IllegalOperation", "ConsumerRecord", "TopicPartition", "OffsetAndTimestamp", "OffsetAndMetadata"]
__all__ = [
"AIOKafkaConsumer",
"ConsumerRebalanceListener",
"ConsumerStoppedError",
"IllegalOperation",
"ConsumerRecord",
"TopicPartition",
"OffsetAndTimestamp",
"OffsetAndMetadata",
]
10 changes: 4 additions & 6 deletions aiokafka/abc.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class ConsumerRebalanceListener(abc.ABC):
taking over that partition has their :meth:`on_partitions_assigned` callback
called to load the state.
"""

@abc.abstractmethod
def on_partitions_revoked(self, revoked: list[TopicPartition]) -> None:
"""
Expand All @@ -63,7 +64,7 @@ class ConsumerRebalanceListener(abc.ABC):
to the consumer on the last rebalance
"""
...

@abc.abstractmethod
def on_partitions_assigned(self, assigned: list[TopicPartition]) -> None:
"""
Expand All @@ -83,8 +84,6 @@ class ConsumerRebalanceListener(abc.ABC):
consumer (may include partitions that were previously assigned)
"""
...



class AbstractTokenProvider(abc.ABC):
"""
Expand All @@ -103,6 +102,7 @@ class AbstractTokenProvider(abc.ABC):
.. _SASL OAuthBearer:
https://docs.confluent.io/platform/current/kafka/authentication_sasl/authentication_sasl_oauth.html
"""

@abc.abstractmethod
async def token(self) -> None:
"""
Expand All @@ -123,7 +123,7 @@ class AbstractTokenProvider(abc.ABC):
# The actual synchronous token callback.
"""
...

def extensions(self) -> dict[str, str]:
"""
This is an OPTIONAL method that may be implemented.
Expand All @@ -135,7 +135,5 @@ class AbstractTokenProvider(abc.ABC):
This feature is only available in Kafka >= 2.1.0.
"""
...



__all__ = ["ConsumerRebalanceListener", "AbstractTokenProvider"]
1 change: 1 addition & 0 deletions aiokafka/client.pyi
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from enum import IntEnum

log = ...

class CoordinationType(IntEnum):
GROUP = ...
TRANSACTION = ...
96 changes: 59 additions & 37 deletions aiokafka/cluster.pyi
Original file line number Diff line number Diff line change
@@ -1,20 +1,39 @@
from concurrent.futures import Future
from typing import Any, Callable, Optional, Sequence, Set, TypedDict, Union
from aiokafka.client import CoordinationType
from aiokafka.protocol.commit import GroupCoordinatorResponse_v0, GroupCoordinatorResponse_v1
from aiokafka.protocol.metadata import MetadataResponse_v0, MetadataResponse_v1, MetadataResponse_v2, MetadataResponse_v3, MetadataResponse_v4, MetadataResponse_v5
from aiokafka.protocol.commit import (
GroupCoordinatorResponse_v0,
GroupCoordinatorResponse_v1,
)
from aiokafka.protocol.metadata import (
MetadataResponse_v0,
MetadataResponse_v1,
MetadataResponse_v2,
MetadataResponse_v3,
MetadataResponse_v4,
MetadataResponse_v5,
)
from aiokafka.structs import BrokerMetadata, PartitionMetadata, TopicPartition

log = ...
MetadataResponse = Union[MetadataResponse_v0, MetadataResponse_v1, MetadataResponse_v2, MetadataResponse_v3, MetadataResponse_v4, MetadataResponse_v5,]
GroupCoordinatorResponse = Union[GroupCoordinatorResponse_v0, GroupCoordinatorResponse_v1]
MetadataResponse = Union[
MetadataResponse_v0,
MetadataResponse_v1,
MetadataResponse_v2,
MetadataResponse_v3,
MetadataResponse_v4,
MetadataResponse_v5,
]
GroupCoordinatorResponse = Union[
GroupCoordinatorResponse_v0, GroupCoordinatorResponse_v1
]

class ClusterConfig(TypedDict):
retry_backoff_ms: int
metadata_max_age_ms: int
bootstrap_servers: str | list[str]
...


class ClusterMetadata:
"""
A class to manage kafka cluster metadata.
Expand All @@ -36,21 +55,18 @@ class ClusterMetadata:
Metadata API Request. Default port is 9092. If no servers are
specified, will default to localhost:9092.
"""

DEFAULT_CONFIG: ClusterConfig = ...
def __init__(self, **configs: int | str | list[str]) -> None:
...

def is_bootstrap(self, node_id: str) -> bool:
...

def __init__(self, **configs: int | str | list[str]) -> None: ...
def is_bootstrap(self, node_id: str) -> bool: ...
def brokers(self) -> set[BrokerMetadata]:
"""Get all BrokerMetadata
Returns:
set: {BrokerMetadata, ...}
"""
...

def broker_metadata(self, broker_id: str) -> BrokerMetadata | None:
"""Get BrokerMetadata
Expand All @@ -61,7 +77,7 @@ class ClusterMetadata:
BrokerMetadata or None if not found
"""
...

def partitions_for_topic(self, topic: str) -> Optional[Set[int]]:
"""Return set of all partitions for topic (whether available or not)
Expand All @@ -72,7 +88,7 @@ class ClusterMetadata:
set: {partition (int), ...}
"""
...

def available_partitions_for_topic(self, topic: str) -> Optional[Set[int]]:
"""Return set of partitions with known leaders
Expand All @@ -84,11 +100,11 @@ class ClusterMetadata:
None if topic not found.
"""
...

def leader_for_partition(self, partition: PartitionMetadata) -> int | None:
"""Return node_id of leader, -1 unavailable, None if unknown."""
...

def partitions_for_broker(self, broker_id: int | str) -> set[TopicPartition] | None:
"""Return TopicPartitions for which the broker is a leader.
Expand All @@ -100,7 +116,7 @@ class ClusterMetadata:
None if the broker either has no partitions or does not exist.
"""
...

def coordinator_for_group(self, group: str) -> int | str | None:
"""Return node_id of group coordinator.
Expand All @@ -112,7 +128,7 @@ class ClusterMetadata:
None if the group does not exist.
"""
...

def request_update(self) -> Future[ClusterMetadata]:
"""Flags metadata for update, return Future()
Expand All @@ -123,7 +139,7 @@ class ClusterMetadata:
Future (value will be the cluster object after update)
"""
...

def topics(self, exclude_internal_topics: bool = ...) -> set[str]:
"""Get set of known topics.
Expand All @@ -137,11 +153,11 @@ class ClusterMetadata:
set: {topic (str), ...}
"""
...

def failed_update(self, exception: BaseException) -> None:
"""Update cluster state given a failed MetadataRequest."""
...

def update_metadata(self, metadata: MetadataResponse) -> None:
"""Update cluster state given a MetadataResponse.
Expand All @@ -151,16 +167,18 @@ class ClusterMetadata:
Returns: None
"""
...

def add_listener(self, listener: Callable[[ClusterMetadata], Any]) -> None:
"""Add a callback function to be called on each metadata update"""
...

def remove_listener(self, listener: Callable[[ClusterMetadata], Any]) -> None:
"""Remove a previously added listener callback"""
...

def add_group_coordinator(self, group: str, response: GroupCoordinatorResponse) -> str | None:

def add_group_coordinator(
self, group: str, response: GroupCoordinatorResponse
) -> str | None:
"""Update with metadata for a group coordinator
Arguments:
Expand All @@ -171,23 +189,27 @@ class ClusterMetadata:
string: coordinator node_id if metadata is updated, None on error
"""
...

def with_partitions(self, partitions_to_add: Sequence[PartitionMetadata]) -> ClusterMetadata:

def with_partitions(
self, partitions_to_add: Sequence[PartitionMetadata]
) -> ClusterMetadata:
"""Returns a copy of cluster metadata with partitions added"""
...

def coordinator_metadata(self, node_id: int | str) -> BrokerMetadata | None:
...

def add_coordinator(self, node_id: int | str, host: str, port: int, rack: str | None = ..., *, purpose: tuple[CoordinationType, str]) -> None:

def coordinator_metadata(self, node_id: int | str) -> BrokerMetadata | None: ...
def add_coordinator(
self,
node_id: int | str,
host: str,
port: int,
rack: str | None = ...,
*,
purpose: tuple[CoordinationType, str],
) -> None:
"""Keep track of all coordinator nodes separately and remove them if
a new one was elected for the same purpose (For example group
coordinator for group X).
"""
...

def __str__(self) -> str:
...



def __str__(self) -> str: ...
Loading

0 comments on commit 18a97f3

Please sign in to comment.