Skip to content

Commit

Permalink
Merge protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
ods committed Oct 23, 2023
1 parent b994339 commit 91759b8
Show file tree
Hide file tree
Showing 53 changed files with 4,196 additions and 3,616 deletions.
17 changes: 8 additions & 9 deletions aiokafka/admin/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@
from ssl import SSLContext
from typing import List, Optional, Dict, Tuple, Any

from kafka.protocol.api import Request, Response
from kafka.protocol.metadata import MetadataRequest
from kafka.protocol.commit import OffsetFetchRequest, GroupCoordinatorRequest
from kafka.protocol.admin import (
from aiokafka import __version__
from aiokafka.client import AIOKafkaClient
from aiokafka.errors import IncompatibleBrokerVersion, for_code
from aiokafka.protocol.api import Request, Response
from aiokafka.protocol.metadata import MetadataRequest
from aiokafka.protocol.commit import OffsetFetchRequest, GroupCoordinatorRequest
from aiokafka.protocol.admin import (
CreatePartitionsRequest,
CreateTopicsRequest,
DeleteTopicsRequest,
Expand All @@ -16,10 +19,6 @@
AlterConfigsRequest,
ListGroupsRequest,
ApiVersionRequest_v0)

from aiokafka import __version__
from aiokafka.client import AIOKafkaClient
from aiokafka.errors import IncompatibleBrokerVersion, for_code
from aiokafka.structs import TopicPartition, OffsetAndMetadata

from .config_resource import ConfigResourceType, ConfigResource
Expand Down Expand Up @@ -149,7 +148,7 @@ def _matching_api_version(self, operation: List[Request]) -> int:
supported by the broker.
:param operation: A list of protocol operation versions from
kafka.protocol.
aiokafka.protocol.
:return: The max matching version number between client and broker.
"""
api_key = operation[0].API_KEY
Expand Down
19 changes: 9 additions & 10 deletions aiokafka/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,17 @@
import random
import time

from kafka.protocol.admin import DescribeAclsRequest_v2
from kafka.protocol.commit import OffsetFetchRequest
from kafka.protocol.fetch import FetchRequest
from kafka.protocol.metadata import MetadataRequest
from kafka.protocol.offset import OffsetRequest
from kafka.protocol.produce import ProduceRequest

import aiokafka.errors as Errors
from aiokafka import __version__
from aiokafka.conn import collect_hosts, create_conn, CloseReason
from aiokafka.cluster import ClusterMetadata
from aiokafka.protocol.admin import DescribeAclsRequest_v2
from aiokafka.protocol.commit import OffsetFetchRequest
from aiokafka.protocol.coordination import FindCoordinatorRequest
from aiokafka.protocol.fetch import FetchRequest
from aiokafka.protocol.metadata import MetadataRequest
from aiokafka.protocol.offset import OffsetRequest
from aiokafka.protocol.produce import ProduceRequest
from aiokafka.errors import (
KafkaError,
KafkaConnectionError,
Expand Down Expand Up @@ -525,11 +524,11 @@ async def check_version(self, node_id=None):
assert self.cluster.brokers(), 'no brokers in metadata'
node_id = list(self.cluster.brokers())[0].nodeId

from kafka.protocol.admin import (
from aiokafka.protocol.admin import (
ListGroupsRequest_v0, ApiVersionRequest_v0)
from kafka.protocol.commit import (
from aiokafka.protocol.commit import (
OffsetFetchRequest_v0, GroupCoordinatorRequest_v0)
from kafka.protocol.metadata import MetadataRequest_v0
from aiokafka.protocol.metadata import MetadataRequest_v0
test_cases = [
((0, 10), ApiVersionRequest_v0()),
((0, 9), ListGroupsRequest_v0()),
Expand Down
13 changes: 6 additions & 7 deletions aiokafka/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,17 @@
import weakref

import async_timeout
from kafka.protocol.api import RequestHeader
from kafka.protocol.admin import (

import aiokafka.errors as Errors
from aiokafka.abc import AbstractTokenProvider
from aiokafka.protocol.api import RequestHeader
from aiokafka.protocol.admin import (
SaslHandShakeRequest, SaslAuthenticateRequest, ApiVersionRequest
)
from kafka.protocol.commit import (
from aiokafka.protocol.commit import (
GroupCoordinatorResponse_v0 as GroupCoordinatorResponse)

import aiokafka.errors as Errors
from aiokafka.util import create_future, create_task, get_running_loop, wait_for

from aiokafka.abc import AbstractTokenProvider

try:
import gssapi
except ImportError:
Expand Down
4 changes: 2 additions & 2 deletions aiokafka/consumer/fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@
from itertools import chain

import async_timeout
from kafka.protocol.offset import OffsetRequest
from kafka.protocol.fetch import FetchRequest

import aiokafka.errors as Errors
from aiokafka.errors import (
ConsumerStoppedError, RecordTooLargeError, KafkaTimeoutError)
from aiokafka.protocol.offset import OffsetRequest
from aiokafka.protocol.fetch import FetchRequest
from aiokafka.record.memory_records import MemoryRecords
from aiokafka.record.control_record import ControlRecord, ABORT_MARKER
from aiokafka.structs import OffsetAndTimestamp, TopicPartition, ConsumerRecord
Expand Down
13 changes: 6 additions & 7 deletions aiokafka/consumer/group_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,16 @@
import copy
import time

from kafka.protocol.commit import (
OffsetCommitRequest_v2 as OffsetCommitRequest,
OffsetFetchRequest_v1 as OffsetFetchRequest)
from kafka.protocol.group import (
HeartbeatRequest, JoinGroupRequest, LeaveGroupRequest, SyncGroupRequest)

import aiokafka.errors as Errors
from aiokafka.structs import OffsetAndMetadata, TopicPartition
from aiokafka.client import ConnectionGroup, CoordinationType
from aiokafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
from aiokafka.coordinator.protocol import ConsumerProtocol
from aiokafka.protocol.commit import (
OffsetCommitRequest_v2 as OffsetCommitRequest,
OffsetFetchRequest_v1 as OffsetFetchRequest)
from aiokafka.protocol.group import (
HeartbeatRequest, JoinGroupRequest, LeaveGroupRequest, SyncGroupRequest)
from aiokafka.structs import OffsetAndMetadata, TopicPartition
from aiokafka.util import create_future, create_task

log = logging.getLogger(__name__)
Expand Down
5 changes: 2 additions & 3 deletions aiokafka/coordinator/assignors/sticky/sticky_assignor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@
from collections import defaultdict, namedtuple
from copy import deepcopy

from kafka.protocol.struct import Struct
from kafka.protocol.types import String, Array, Int32

from aiokafka.coordinator.assignors.abstract import AbstractPartitionAssignor
from aiokafka.coordinator.assignors.sticky.partition_movements import PartitionMovements
from aiokafka.coordinator.assignors.sticky.sorted_set import SortedSet
Expand All @@ -13,6 +10,8 @@
ConsumerProtocolMemberAssignment,
)
from aiokafka.coordinator.protocol import Schema
from aiokafka.protocol.struct import Struct
from aiokafka.protocol.types import String, Array, Int32
from aiokafka.structs import TopicPartition

log = logging.getLogger(__name__)
Expand Down
12 changes: 6 additions & 6 deletions aiokafka/coordinator/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,18 @@
import weakref

from kafka.future import Future
from kafka.protocol.commit import GroupCoordinatorRequest, OffsetCommitRequest
from kafka.protocol.group import (

from aiokafka import errors as Errors
from aiokafka.metrics import AnonMeasurable
from aiokafka.metrics.stats import Avg, Count, Max, Rate
from aiokafka.protocol.commit import GroupCoordinatorRequest, OffsetCommitRequest
from aiokafka.protocol.group import (
HeartbeatRequest,
JoinGroupRequest,
LeaveGroupRequest,
SyncGroupRequest,
)

from aiokafka import errors as Errors
from aiokafka.metrics import AnonMeasurable
from aiokafka.metrics.stats import Avg, Count, Max, Rate

from .heartbeat import Heartbeat

log = logging.getLogger("aiokafka.coordinator")
Expand Down
2 changes: 1 addition & 1 deletion aiokafka/coordinator/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@
import time

from kafka.future import Future
from kafka.protocol.commit import OffsetCommitRequest, OffsetFetchRequest
from kafka.util import WeakMethod

import aiokafka.errors as Errors
from aiokafka.metrics import AnonMeasurable
from aiokafka.metrics.stats import Avg, Count, Max, Rate
from aiokafka.protocol.commit import OffsetCommitRequest, OffsetFetchRequest
from aiokafka.structs import OffsetAndMetadata, TopicPartition

from .base import BaseCoordinator, Generation
Expand Down
5 changes: 2 additions & 3 deletions aiokafka/coordinator/protocol.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from kafka.protocol.struct import Struct
from kafka.protocol.types import Array, Bytes, Int16, Int32, Schema, String

from aiokafka.protocol.struct import Struct
from aiokafka.protocol.types import Array, Bytes, Int16, Int32, Schema, String
from aiokafka.structs import TopicPartition


Expand Down
2 changes: 1 addition & 1 deletion aiokafka/producer/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ class AIOKafkaProducer:
brokers or partitions. Default: 300000
request_timeout_ms (int): Produce request timeout in milliseconds.
As it's sent as part of
:class:`~kafka.protocol.produce.ProduceRequest` (it's a blocking
:class:`~aiokafka.protocol.produce.ProduceRequest` (it's a blocking
call), maximum waiting time can be up to ``2 *
request_timeout_ms``.
Default: 40000.
Expand Down
3 changes: 1 addition & 2 deletions aiokafka/producer/sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
import logging
import time

from kafka.protocol.produce import ProduceRequest

import aiokafka.errors as Errors
from aiokafka.client import ConnectionGroup, CoordinationType
from aiokafka.errors import (
Expand All @@ -16,6 +14,7 @@
OutOfOrderSequenceNumber, TopicAuthorizationFailedError,
GroupAuthorizationFailedError, TransactionalIdAuthorizationFailed,
OperationNotAttempted)
from aiokafka.protocol.produce import ProduceRequest
from aiokafka.protocol.transaction import (
InitProducerIdRequest, AddPartitionsToTxnRequest, EndTxnRequest,
AddOffsetsToTxnRequest, TxnOffsetCommitRequest
Expand Down
46 changes: 46 additions & 0 deletions aiokafka/protocol/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
API_KEYS = {
0: "Produce",
1: "Fetch",
2: "ListOffsets",
3: "Metadata",
4: "LeaderAndIsr",
5: "StopReplica",
6: "UpdateMetadata",
7: "ControlledShutdown",
8: "OffsetCommit",
9: "OffsetFetch",
10: "FindCoordinator",
11: "JoinGroup",
12: "Heartbeat",
13: "LeaveGroup",
14: "SyncGroup",
15: "DescribeGroups",
16: "ListGroups",
17: "SaslHandshake",
18: "ApiVersions",
19: "CreateTopics",
20: "DeleteTopics",
21: "DeleteRecords",
22: "InitProducerId",
23: "OffsetForLeaderEpoch",
24: "AddPartitionsToTxn",
25: "AddOffsetsToTxn",
26: "EndTxn",
27: "WriteTxnMarkers",
28: "TxnOffsetCommit",
29: "DescribeAcls",
30: "CreateAcls",
31: "DeleteAcls",
32: "DescribeConfigs",
33: "AlterConfigs",
36: "SaslAuthenticate",
37: "CreatePartitions",
38: "CreateDelegationToken",
39: "RenewDelegationToken",
40: "ExpireDelegationToken",
41: "DescribeDelegationToken",
42: "DeleteGroups",
45: "AlterPartitionReassignments",
46: "ListPartitionReassignments",
48: "DescribeClientQuotas",
}
6 changes: 2 additions & 4 deletions kafka/protocol/abstract.py → aiokafka/protocol/abstract.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
from __future__ import absolute_import

import abc


class AbstractType(object):
__metaclass__ = abc.ABCMeta

@abc.abstractmethod
def encode(cls, value): # pylint: disable=no-self-argument
def encode(cls, value): # pylint: disable=no-self-argument
pass

@abc.abstractmethod
def decode(cls, data): # pylint: disable=no-self-argument
def decode(cls, data): # pylint: disable=no-self-argument
pass

@classmethod
Expand Down
Loading

0 comments on commit 91759b8

Please sign in to comment.