Skip to content

Commit

Permalink
Merge admin
Browse files Browse the repository at this point in the history
  • Loading branch information
ods committed Oct 22, 2023
1 parent ee1f564 commit 819015e
Show file tree
Hide file tree
Showing 17 changed files with 56 additions and 3,697 deletions.
5 changes: 5 additions & 0 deletions aiokafka/admin/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from .client import AIOKafkaAdminClient
from .new_partitions import NewPartitions
from .new_topic import NewTopic

__all__ = ["AIOKafkaAdminClient", "NewPartitions", "NewTopic"]
50 changes: 43 additions & 7 deletions aiokafka/admin.py → aiokafka/admin/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@
ListGroupsRequest,
ApiVersionRequest_v0)
from kafka.structs import TopicPartition, OffsetAndMetadata
from kafka.admin import NewTopic, KafkaAdminClient as Admin
from kafka.admin.config_resource import ConfigResourceType, ConfigResource

from aiokafka import __version__
from aiokafka.client import AIOKafkaClient

from .config_resource import ConfigResourceType, ConfigResource
from .new_topic import NewTopic

log = logging.getLogger(__name__)


Expand Down Expand Up @@ -109,9 +110,9 @@ def __init__(self, *, loop=None,
sasl_oauth_token_provider=sasl_oauth_token_provider)

async def close(self):
"""Close the KafkaAdminClient connection to the Kafka broker."""
"""Close the AIOKafkaAdminClient connection to the Kafka broker."""
if not hasattr(self, '_closed') or self._closed:
log.info("KafkaAdminClient already closed.")
log.info("AIOKafkaAdminClient already closed.")
return

await self._client.close()
Expand Down Expand Up @@ -165,6 +166,20 @@ def _matching_api_version(self, operation: List[Request]) -> int:
.format(operation[0].__name__))
return version

@staticmethod
def _convert_new_topic_request(new_topic):
return (
new_topic.name,
new_topic.num_partitions,
new_topic.replication_factor,
[
(partition_id, replicas) for partition_id, replicas in new_topic.replica_assignments.items()
],
[
(config_key, config_value) for config_key, config_value in new_topic.topic_configs.items()
]
)

async def create_topics(
self,
new_topics: List[NewTopic],
Expand All @@ -181,7 +196,7 @@ async def create_topics(
:return: Appropriate version of CreateTopicResponse class.
"""
version = self._matching_api_version(CreateTopicsRequest)
topics = [Admin._convert_new_topic_request(nt) for nt in new_topics]
topics = [self._convert_new_topic_request(nt) for nt in new_topics]
log.debug("Attempting to send create topic request for %r", new_topics)
timeout_ms = timeout_ms or self._request_timeout_ms
if version == 0:
Expand Down Expand Up @@ -320,15 +335,36 @@ async def alter_configs(self, config_resources: List[ConfigResource]) -> Respons
return await asyncio.gather(*futures)

@staticmethod
def _convert_describe_config_resource_request(config_resource):
return (
config_resource.resource_type,
config_resource.name,
[
config_key for config_key, config_value in config_resource.configs.items()
] if config_resource.configs else None
)

@staticmethod
def _convert_alter_config_resource_request(config_resource):
return (
config_resource.resource_type,
config_resource.name,
[
(config_key, config_value) for config_key, config_value in config_resource.configs.items()
]
)

@classmethod
def _convert_config_resources(
cls,
config_resources: List[ConfigResource],
op_type: str = "describe") -> Tuple[Dict[int, Any], List[Any]]:
broker_resources = defaultdict(list)
topic_resources = []
if op_type == "describe":
convert_func = Admin._convert_describe_config_resource_request
convert_func = cls._convert_describe_config_resource_request
else:
convert_func = Admin._convert_alter_config_resource_request
convert_func = cls._convert_alter_config_resource_request
for config_resource in config_resources:
resource = convert_func(config_resource)
if config_resource.resource_type == ConfigResourceType.BROKER:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,4 @@
from __future__ import absolute_import

# enum in stdlib as of py3.4
try:
from enum import IntEnum # pylint: disable=import-error
except ImportError:
# vendored backport module
from kafka.vendor.enum34 import IntEnum
from enum import IntEnum


class ConfigResourceType(IntEnum):
Expand All @@ -15,7 +8,7 @@ class ConfigResourceType(IntEnum):
TOPIC = 2


class ConfigResource(object):
class ConfigResource:
"""A class for specifying config resources.
Arguments:
resource_type (ConfigResourceType): the type of kafka resource
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
from __future__ import absolute_import


class NewPartitions(object):
class NewPartitions:
"""A class for new partition creation on existing topics. Note that the length of new_assignments, if specified,
must be the difference between the new total number of partitions and the existing number of partitions.
Arguments:
Expand Down
4 changes: 1 addition & 3 deletions kafka/admin/new_topic.py → aiokafka/admin/new_topic.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
from __future__ import absolute_import

from kafka.errors import IllegalArgumentError


class NewTopic(object):
class NewTopic:
""" A class for new topic creation
Arguments:
name (string): name of the topic
Expand Down
5 changes: 1 addition & 4 deletions kafka/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,11 @@ def emit(self, record):
logging.getLogger(__name__).addHandler(NullHandler())


from kafka.admin import KafkaAdminClient
from kafka.client_async import KafkaClient
from kafka.conn import BrokerConnection
from kafka.serializer import Serializer, Deserializer
from kafka.structs import TopicPartition, OffsetAndMetadata


__all__ = [
'BrokerConnection', 'ConsumerRebalanceListener', 'KafkaAdminClient',
'KafkaClient', 'KafkaConsumer', 'KafkaProducer',
'BrokerConnection', 'ConsumerRebalanceListener',
]
14 changes: 0 additions & 14 deletions kafka/admin/__init__.py

This file was deleted.

Loading

0 comments on commit 819015e

Please sign in to comment.