Skip to content

Commit

Permalink
Merge pull request #932 from ods/merge-kafka-python
Browse files Browse the repository at this point in the history
Merge kafka-python
  • Loading branch information
ods authored Nov 8, 2023
2 parents 00349a8 + 66e2999 commit 88c6945
Show file tree
Hide file tree
Showing 110 changed files with 13,657 additions and 513 deletions.
3 changes: 1 addition & 2 deletions .github/ISSUE_TEMPLATE/bug_report.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@ A clear and concise description of what you expected to happen.

**Environment (please complete the following information):**
- aiokafka version (`python -c "import aiokafka; print(aiokafka.__version__)"`):
- kafka-python version (`python -c "import kafka; print(kafka.__version__)"`):
- Kafka Broker version (`kafka-topics.sh --version`):
- Kafka Broker version (`kafka-topics.sh --version`):
- Other information (Confluent Cloud version, etc.):

**Reproducible example**
Expand Down
3 changes: 1 addition & 2 deletions aiokafka/abc.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import abc
from kafka import ConsumerRebalanceListener as BaseConsumerRebalanceListener


class ConsumerRebalanceListener(BaseConsumerRebalanceListener):
class ConsumerRebalanceListener(abc.ABC):
"""
A callback interface that the user can implement to trigger custom actions
when the set of partitions assigned to the consumer changes.
Expand Down
5 changes: 5 additions & 0 deletions aiokafka/admin/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from .client import AIOKafkaAdminClient
from .new_partitions import NewPartitions
from .new_topic import NewTopic

__all__ = ["AIOKafkaAdminClient", "NewPartitions", "NewTopic"]
65 changes: 49 additions & 16 deletions aiokafka/admin.py → aiokafka/admin/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@
from ssl import SSLContext
from typing import List, Optional, Dict, Tuple, Any

from kafka.errors import IncompatibleBrokerVersion, for_code
from kafka.protocol.api import Request, Response
from kafka.protocol.metadata import MetadataRequest
from kafka.protocol.commit import OffsetFetchRequest, GroupCoordinatorRequest
from kafka.protocol.admin import (
from aiokafka import __version__
from aiokafka.client import AIOKafkaClient
from aiokafka.errors import IncompatibleBrokerVersion, for_code
from aiokafka.protocol.api import Request, Response
from aiokafka.protocol.metadata import MetadataRequest
from aiokafka.protocol.commit import OffsetFetchRequest, GroupCoordinatorRequest
from aiokafka.protocol.admin import (
CreatePartitionsRequest,
CreateTopicsRequest,
DeleteTopicsRequest,
Expand All @@ -17,12 +19,10 @@
AlterConfigsRequest,
ListGroupsRequest,
ApiVersionRequest_v0)
from kafka.structs import TopicPartition, OffsetAndMetadata
from kafka.admin import NewTopic, KafkaAdminClient as Admin
from kafka.admin.config_resource import ConfigResourceType, ConfigResource
from aiokafka.structs import TopicPartition, OffsetAndMetadata

from aiokafka import __version__
from aiokafka.client import AIOKafkaClient
from .config_resource import ConfigResourceType, ConfigResource
from .new_topic import NewTopic

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -109,9 +109,9 @@ def __init__(self, *, loop=None,
sasl_oauth_token_provider=sasl_oauth_token_provider)

async def close(self):
"""Close the KafkaAdminClient connection to the Kafka broker."""
"""Close the AIOKafkaAdminClient connection to the Kafka broker."""
if not hasattr(self, '_closed') or self._closed:
log.info("KafkaAdminClient already closed.")
log.info("AIOKafkaAdminClient already closed.")
return

await self._client.close()
Expand Down Expand Up @@ -148,7 +148,7 @@ def _matching_api_version(self, operation: List[Request]) -> int:
supported by the broker.
:param operation: A list of protocol operation versions from
kafka.protocol.
aiokafka.protocol.
:return: The max matching version number between client and broker.
"""
api_key = operation[0].API_KEY
Expand All @@ -165,6 +165,22 @@ def _matching_api_version(self, operation: List[Request]) -> int:
.format(operation[0].__name__))
return version

@staticmethod
def _convert_new_topic_request(new_topic):
return (
new_topic.name,
new_topic.num_partitions,
new_topic.replication_factor,
[
(partition_id, replicas)
for partition_id, replicas in new_topic.replica_assignments.items()
],
[
(config_key, config_value)
for config_key, config_value in new_topic.topic_configs.items()
]
)

async def create_topics(
self,
new_topics: List[NewTopic],
Expand All @@ -181,7 +197,7 @@ async def create_topics(
:return: Appropriate version of CreateTopicResponse class.
"""
version = self._matching_api_version(CreateTopicsRequest)
topics = [Admin._convert_new_topic_request(nt) for nt in new_topics]
topics = [self._convert_new_topic_request(nt) for nt in new_topics]
log.debug("Attempting to send create topic request for %r", new_topics)
timeout_ms = timeout_ms or self._request_timeout_ms
if version == 0:
Expand Down Expand Up @@ -320,15 +336,32 @@ async def alter_configs(self, config_resources: List[ConfigResource]) -> Respons
return await asyncio.gather(*futures)

@staticmethod
def _convert_describe_config_resource_request(config_resource):
return (
config_resource.resource_type,
config_resource.name,
list(config_resource.configs.keys()) if config_resource.configs else None
)

@staticmethod
def _convert_alter_config_resource_request(config_resource):
return (
config_resource.resource_type,
config_resource.name,
list(config_resource.configs.items())
)

@classmethod
def _convert_config_resources(
cls,
config_resources: List[ConfigResource],
op_type: str = "describe") -> Tuple[Dict[int, Any], List[Any]]:
broker_resources = defaultdict(list)
topic_resources = []
if op_type == "describe":
convert_func = Admin._convert_describe_config_resource_request
convert_func = cls._convert_describe_config_resource_request
else:
convert_func = Admin._convert_alter_config_resource_request
convert_func = cls._convert_alter_config_resource_request
for config_resource in config_resources:
resource = convert_func(config_resource)
if config_resource.resource_type == ConfigResourceType.BROKER:
Expand Down
29 changes: 29 additions & 0 deletions aiokafka/admin/config_resource.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from enum import IntEnum


class ConfigResourceType(IntEnum):
"""An enumerated type of config resources"""

BROKER = 4,
TOPIC = 2


class ConfigResource:
"""A class for specifying config resources.
Arguments:
resource_type (ConfigResourceType): the type of kafka resource
name (string): The name of the kafka resource
configs ({key : value}): A maps of config keys to values.
"""

def __init__(
self,
resource_type,
name,
configs=None
):
if not isinstance(resource_type, (ConfigResourceType)):
resource_type = ConfigResourceType[str(resource_type).upper()]
self.resource_type = resource_type
self.name = name
self.configs = configs
19 changes: 19 additions & 0 deletions aiokafka/admin/new_partitions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
class NewPartitions:
"""A class for new partition creation on existing topics. Note that the
length of new_assignments, if specified, must be the difference between the
new total number of partitions and the existing number of partitions.
Arguments:
total_count (int):
the total number of partitions that should exist on the topic
new_assignments ([[int]]):
an array of arrays of replica assignments for new partitions.
If not set, broker assigns replicas per an internal algorithm.
"""

def __init__(
self,
total_count,
new_assignments=None
):
self.total_count = total_count
self.new_assignments = new_assignments
38 changes: 38 additions & 0 deletions aiokafka/admin/new_topic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from aiokafka.errors import IllegalArgumentError


class NewTopic:
""" A class for new topic creation
Arguments:
name (string): name of the topic
num_partitions (int): number of partitions
or -1 if replica_assignment has been specified
replication_factor (int): replication factor or -1 if
replica assignment is specified
replica_assignment (dict of int: [int]): A mapping containing
partition id and replicas to assign to it.
topic_configs (dict of str: str): A mapping of config key
and value for the topic.
"""

def __init__(
self,
name,
num_partitions,
replication_factor,
replica_assignments=None,
topic_configs=None,
):
if not (
(num_partitions == -1 or replication_factor == -1)
^ (replica_assignments is None)
):
raise IllegalArgumentError(
"either num_partitions/replication_factor or replica_assignment "
"must be specified"
)
self.name = name
self.num_partitions = num_partitions
self.replication_factor = replication_factor
self.replica_assignments = replica_assignments or {}
self.topic_configs = topic_configs or {}
30 changes: 14 additions & 16 deletions aiokafka/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,17 @@
import random
import time

from kafka.conn import collect_hosts
from kafka.protocol.admin import DescribeAclsRequest_v2
from kafka.protocol.commit import OffsetFetchRequest
from kafka.protocol.fetch import FetchRequest
from kafka.protocol.metadata import MetadataRequest
from kafka.protocol.offset import OffsetRequest
from kafka.protocol.produce import ProduceRequest

import aiokafka.errors as Errors
from aiokafka import __version__
from aiokafka.conn import create_conn, CloseReason
from aiokafka.conn import collect_hosts, create_conn, CloseReason
from aiokafka.cluster import ClusterMetadata
from aiokafka.protocol.admin import DescribeAclsRequest_v2
from aiokafka.protocol.commit import OffsetFetchRequest
from aiokafka.protocol.coordination import FindCoordinatorRequest
from aiokafka.protocol.fetch import FetchRequest
from aiokafka.protocol.metadata import MetadataRequest
from aiokafka.protocol.offset import OffsetRequest
from aiokafka.protocol.produce import ProduceRequest
from aiokafka.errors import (
KafkaError,
KafkaConnectionError,
Expand Down Expand Up @@ -482,10 +480,10 @@ async def send(self, node_id, request, *, group=ConnectionGroup.DEFAULT):
request (Struct): request object (not-encoded)
Raises:
kafka.errors.RequestTimedOutError
kafka.errors.NodeNotReadyError
kafka.errors.KafkaConnectionError
kafka.errors.CorrelationIdError
aiokafka.errors.RequestTimedOutError
aiokafka.errors.NodeNotReadyError
aiokafka.errors.KafkaConnectionError
aiokafka.errors.CorrelationIdError
Returns:
Future: resolves to Response struct
Expand Down Expand Up @@ -526,11 +524,11 @@ async def check_version(self, node_id=None):
assert self.cluster.brokers(), 'no brokers in metadata'
node_id = list(self.cluster.brokers())[0].nodeId

from kafka.protocol.admin import (
from aiokafka.protocol.admin import (
ListGroupsRequest_v0, ApiVersionRequest_v0)
from kafka.protocol.commit import (
from aiokafka.protocol.commit import (
OffsetFetchRequest_v0, GroupCoordinatorRequest_v0)
from kafka.protocol.metadata import MetadataRequest_v0
from aiokafka.protocol.metadata import MetadataRequest_v0
test_cases = [
((0, 10), ApiVersionRequest_v0()),
((0, 9), ListGroupsRequest_v0()),
Expand Down
Loading

0 comments on commit 88c6945

Please sign in to comment.