Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge kafka-python #932

Merged
merged 22 commits into from
Nov 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions .github/ISSUE_TEMPLATE/bug_report.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@ A clear and concise description of what you expected to happen.

**Environment (please complete the following information):**
- aiokafka version (`python -c "import aiokafka; print(aiokafka.__version__)"`):
- kafka-python version (`python -c "import kafka; print(kafka.__version__)"`):
- Kafka Broker version (`kafka-topics.sh --version`):
- Kafka Broker version (`kafka-topics.sh --version`):
- Other information (Confluent Cloud version, etc.):

**Reproducible example**
Expand Down
3 changes: 1 addition & 2 deletions aiokafka/abc.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import abc
from kafka import ConsumerRebalanceListener as BaseConsumerRebalanceListener


class ConsumerRebalanceListener(BaseConsumerRebalanceListener):
class ConsumerRebalanceListener(abc.ABC):
"""
A callback interface that the user can implement to trigger custom actions
when the set of partitions assigned to the consumer changes.
Expand Down
5 changes: 5 additions & 0 deletions aiokafka/admin/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from .client import AIOKafkaAdminClient
from .new_partitions import NewPartitions
from .new_topic import NewTopic

__all__ = ["AIOKafkaAdminClient", "NewPartitions", "NewTopic"]
65 changes: 49 additions & 16 deletions aiokafka/admin.py → aiokafka/admin/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@
from ssl import SSLContext
from typing import List, Optional, Dict, Tuple, Any

from kafka.errors import IncompatibleBrokerVersion, for_code
from kafka.protocol.api import Request, Response
from kafka.protocol.metadata import MetadataRequest
from kafka.protocol.commit import OffsetFetchRequest, GroupCoordinatorRequest
from kafka.protocol.admin import (
from aiokafka import __version__
from aiokafka.client import AIOKafkaClient
from aiokafka.errors import IncompatibleBrokerVersion, for_code
from aiokafka.protocol.api import Request, Response
from aiokafka.protocol.metadata import MetadataRequest
from aiokafka.protocol.commit import OffsetFetchRequest, GroupCoordinatorRequest
from aiokafka.protocol.admin import (
CreatePartitionsRequest,
CreateTopicsRequest,
DeleteTopicsRequest,
Expand All @@ -17,12 +19,10 @@
AlterConfigsRequest,
ListGroupsRequest,
ApiVersionRequest_v0)
from kafka.structs import TopicPartition, OffsetAndMetadata
from kafka.admin import NewTopic, KafkaAdminClient as Admin
from kafka.admin.config_resource import ConfigResourceType, ConfigResource
from aiokafka.structs import TopicPartition, OffsetAndMetadata

from aiokafka import __version__
from aiokafka.client import AIOKafkaClient
from .config_resource import ConfigResourceType, ConfigResource
from .new_topic import NewTopic

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -109,9 +109,9 @@ def __init__(self, *, loop=None,
sasl_oauth_token_provider=sasl_oauth_token_provider)

async def close(self):
"""Close the KafkaAdminClient connection to the Kafka broker."""
"""Close the AIOKafkaAdminClient connection to the Kafka broker."""
if not hasattr(self, '_closed') or self._closed:
log.info("KafkaAdminClient already closed.")
log.info("AIOKafkaAdminClient already closed.")
return

await self._client.close()
Expand Down Expand Up @@ -148,7 +148,7 @@ def _matching_api_version(self, operation: List[Request]) -> int:
supported by the broker.

:param operation: A list of protocol operation versions from
kafka.protocol.
aiokafka.protocol.
:return: The max matching version number between client and broker.
"""
api_key = operation[0].API_KEY
Expand All @@ -165,6 +165,22 @@ def _matching_api_version(self, operation: List[Request]) -> int:
.format(operation[0].__name__))
return version

@staticmethod
def _convert_new_topic_request(new_topic):
return (
new_topic.name,
new_topic.num_partitions,
new_topic.replication_factor,
[
(partition_id, replicas)
for partition_id, replicas in new_topic.replica_assignments.items()
],
[
(config_key, config_value)
for config_key, config_value in new_topic.topic_configs.items()
]
)

async def create_topics(
self,
new_topics: List[NewTopic],
Expand All @@ -181,7 +197,7 @@ async def create_topics(
:return: Appropriate version of CreateTopicResponse class.
"""
version = self._matching_api_version(CreateTopicsRequest)
topics = [Admin._convert_new_topic_request(nt) for nt in new_topics]
topics = [self._convert_new_topic_request(nt) for nt in new_topics]
log.debug("Attempting to send create topic request for %r", new_topics)
timeout_ms = timeout_ms or self._request_timeout_ms
if version == 0:
Expand Down Expand Up @@ -320,15 +336,32 @@ async def alter_configs(self, config_resources: List[ConfigResource]) -> Respons
return await asyncio.gather(*futures)

@staticmethod
def _convert_describe_config_resource_request(config_resource):
return (
config_resource.resource_type,
config_resource.name,
list(config_resource.configs.keys()) if config_resource.configs else None
)

@staticmethod
def _convert_alter_config_resource_request(config_resource):
return (
config_resource.resource_type,
config_resource.name,
list(config_resource.configs.items())
)

@classmethod
def _convert_config_resources(
cls,
config_resources: List[ConfigResource],
op_type: str = "describe") -> Tuple[Dict[int, Any], List[Any]]:
broker_resources = defaultdict(list)
topic_resources = []
if op_type == "describe":
convert_func = Admin._convert_describe_config_resource_request
convert_func = cls._convert_describe_config_resource_request
else:
convert_func = Admin._convert_alter_config_resource_request
convert_func = cls._convert_alter_config_resource_request
for config_resource in config_resources:
resource = convert_func(config_resource)
if config_resource.resource_type == ConfigResourceType.BROKER:
Expand Down
29 changes: 29 additions & 0 deletions aiokafka/admin/config_resource.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from enum import IntEnum


class ConfigResourceType(IntEnum):
"""An enumerated type of config resources"""

BROKER = 4,
TOPIC = 2


class ConfigResource:
"""A class for specifying config resources.
Arguments:
resource_type (ConfigResourceType): the type of kafka resource
name (string): The name of the kafka resource
configs ({key : value}): A maps of config keys to values.
"""

def __init__(
self,
resource_type,
name,
configs=None
):
if not isinstance(resource_type, (ConfigResourceType)):
resource_type = ConfigResourceType[str(resource_type).upper()]
self.resource_type = resource_type
self.name = name
self.configs = configs
19 changes: 19 additions & 0 deletions aiokafka/admin/new_partitions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
class NewPartitions:
"""A class for new partition creation on existing topics. Note that the
length of new_assignments, if specified, must be the difference between the
new total number of partitions and the existing number of partitions.
Arguments:
total_count (int):
the total number of partitions that should exist on the topic
new_assignments ([[int]]):
an array of arrays of replica assignments for new partitions.
If not set, broker assigns replicas per an internal algorithm.
"""

def __init__(
self,
total_count,
new_assignments=None
):
self.total_count = total_count
self.new_assignments = new_assignments
38 changes: 38 additions & 0 deletions aiokafka/admin/new_topic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from aiokafka.errors import IllegalArgumentError


class NewTopic:
""" A class for new topic creation
Arguments:
name (string): name of the topic
num_partitions (int): number of partitions
or -1 if replica_assignment has been specified
replication_factor (int): replication factor or -1 if
replica assignment is specified
replica_assignment (dict of int: [int]): A mapping containing
partition id and replicas to assign to it.
topic_configs (dict of str: str): A mapping of config key
and value for the topic.
"""

def __init__(
self,
name,
num_partitions,
replication_factor,
replica_assignments=None,
topic_configs=None,
):
if not (
(num_partitions == -1 or replication_factor == -1)
^ (replica_assignments is None)
):
raise IllegalArgumentError(
"either num_partitions/replication_factor or replica_assignment "
"must be specified"
)
self.name = name
self.num_partitions = num_partitions
self.replication_factor = replication_factor
self.replica_assignments = replica_assignments or {}
self.topic_configs = topic_configs or {}
30 changes: 14 additions & 16 deletions aiokafka/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,17 @@
import random
import time

from kafka.conn import collect_hosts
from kafka.protocol.admin import DescribeAclsRequest_v2
from kafka.protocol.commit import OffsetFetchRequest
from kafka.protocol.fetch import FetchRequest
from kafka.protocol.metadata import MetadataRequest
from kafka.protocol.offset import OffsetRequest
from kafka.protocol.produce import ProduceRequest

import aiokafka.errors as Errors
from aiokafka import __version__
from aiokafka.conn import create_conn, CloseReason
from aiokafka.conn import collect_hosts, create_conn, CloseReason
from aiokafka.cluster import ClusterMetadata
from aiokafka.protocol.admin import DescribeAclsRequest_v2
from aiokafka.protocol.commit import OffsetFetchRequest
from aiokafka.protocol.coordination import FindCoordinatorRequest
from aiokafka.protocol.fetch import FetchRequest
from aiokafka.protocol.metadata import MetadataRequest
from aiokafka.protocol.offset import OffsetRequest
from aiokafka.protocol.produce import ProduceRequest
from aiokafka.errors import (
KafkaError,
KafkaConnectionError,
Expand Down Expand Up @@ -482,10 +480,10 @@ async def send(self, node_id, request, *, group=ConnectionGroup.DEFAULT):
request (Struct): request object (not-encoded)

Raises:
kafka.errors.RequestTimedOutError
kafka.errors.NodeNotReadyError
kafka.errors.KafkaConnectionError
kafka.errors.CorrelationIdError
aiokafka.errors.RequestTimedOutError
aiokafka.errors.NodeNotReadyError
aiokafka.errors.KafkaConnectionError
aiokafka.errors.CorrelationIdError

Returns:
Future: resolves to Response struct
Expand Down Expand Up @@ -526,11 +524,11 @@ async def check_version(self, node_id=None):
assert self.cluster.brokers(), 'no brokers in metadata'
node_id = list(self.cluster.brokers())[0].nodeId

from kafka.protocol.admin import (
from aiokafka.protocol.admin import (
ListGroupsRequest_v0, ApiVersionRequest_v0)
from kafka.protocol.commit import (
from aiokafka.protocol.commit import (
OffsetFetchRequest_v0, GroupCoordinatorRequest_v0)
from kafka.protocol.metadata import MetadataRequest_v0
from aiokafka.protocol.metadata import MetadataRequest_v0
test_cases = [
((0, 10), ApiVersionRequest_v0()),
((0, 9), ListGroupsRequest_v0()),
Expand Down
Loading