diff --git a/aiokafka/coordinator/consumer.py b/aiokafka/coordinator/consumer.py index 8f6cdaba..7604e051 100644 --- a/aiokafka/coordinator/consumer.py +++ b/aiokafka/coordinator/consumer.py @@ -5,13 +5,13 @@ import time from kafka.future import Future -from kafka.util import WeakMethod import aiokafka.errors as Errors from aiokafka.metrics import AnonMeasurable from aiokafka.metrics.stats import Avg, Count, Max, Rate from aiokafka.protocol.commit import OffsetCommitRequest, OffsetFetchRequest from aiokafka.structs import OffsetAndMetadata, TopicPartition +from aiokafka.util import WeakMethod from .base import BaseCoordinator, Generation from .assignors.range import RangePartitionAssignor diff --git a/aiokafka/protocol/message.py b/aiokafka/protocol/message.py index 3fc665e2..a305f419 100644 --- a/aiokafka/protocol/message.py +++ b/aiokafka/protocol/message.py @@ -1,5 +1,6 @@ import io import time +from binascii import crc32 from aiokafka.codec import ( has_gzip, @@ -12,23 +13,24 @@ lz4_decode, lz4_decode_old_kafka, ) +from aiokafka.util import WeakMethod + from .frame import KafkaBytes from .struct import Struct -from .types import Int8, Int32, Int64, Bytes, Schema, AbstractType -from kafka.util import crc32, WeakMethod +from .types import Int8, Int32, UInt32, Int64, Bytes, Schema, AbstractType class Message(Struct): SCHEMAS = [ Schema( - ("crc", Int32), + ("crc", UInt32), ("magic", Int8), ("attributes", Int8), ("key", Bytes), ("value", Bytes), ), Schema( - ("crc", Int32), + ("crc", UInt32), ("magic", Int8), ("attributes", Int8), ("timestamp", Int64), diff --git a/aiokafka/protocol/struct.py b/aiokafka/protocol/struct.py index d7faa327..b24d7b2b 100644 --- a/aiokafka/protocol/struct.py +++ b/aiokafka/protocol/struct.py @@ -1,10 +1,10 @@ from io import BytesIO +from aiokafka.util import WeakMethod + from .abstract import AbstractType from .types import Schema -from kafka.util import WeakMethod - class Struct(AbstractType): SCHEMA = Schema() diff --git a/aiokafka/protocol/types.py b/aiokafka/protocol/types.py index 56613905..f1e106c5 100644 --- a/aiokafka/protocol/types.py +++ b/aiokafka/protocol/types.py @@ -64,6 +64,19 @@ def decode(cls, data): return _unpack(cls._unpack, data.read(4)) +class UInt32(AbstractType): + _pack = struct.Struct(">I").pack + _unpack = struct.Struct(">I").unpack + + @classmethod + def encode(cls, value): + return _pack(cls._pack, value) + + @classmethod + def decode(cls, data): + return _unpack(cls._unpack, data.read(4)) + + class Int64(AbstractType): _pack = struct.Struct(">q").pack _unpack = struct.Struct(">q").unpack diff --git a/aiokafka/util.py b/aiokafka/util.py index 38a08baf..9d01b899 100644 --- a/aiokafka/util.py +++ b/aiokafka/util.py @@ -1,5 +1,7 @@ import asyncio import os +from types import MethodType +import weakref from asyncio import AbstractEventLoop from typing import Any, Awaitable, Coroutine, Dict, Tuple, TypeVar, Union, cast @@ -91,3 +93,38 @@ def get_running_loop() -> asyncio.AbstractEventLoop: INTEGER_MAX_VALUE = 2**31 - 1 INTEGER_MIN_VALUE = -(2**31) + + +class WeakMethod(object): + """ + Callable that weakly references a method and the object it is bound to. It + is based on https://stackoverflow.com/a/24287465. + + Arguments: + + object_dot_method: A bound instance method (i.e. 'object.method'). + """ + + def __init__(self, object_dot_method: MethodType) -> None: + self.target = weakref.ref(object_dot_method.__self__) + self._target_id = id(self.target()) + self.method = weakref.ref(object_dot_method.__func__) + self._method_id = id(self.method()) + + def __call__(self, *args: Any, **kwargs: Any) -> Any: + """ + Calls the method on target with args and kwargs. + """ + method = self.method() + assert method is not None + return method(self.target(), *args, **kwargs) + + def __hash__(self) -> int: + return hash(self.target) ^ hash(self.method) + + def __eq__(self, other: Any) -> bool: + if not isinstance(other, WeakMethod): + return False + return ( + self._target_id == other._target_id and self._method_id == other._method_id + ) diff --git a/kafka/util.py b/kafka/util.py deleted file mode 100644 index e31d9930..00000000 --- a/kafka/util.py +++ /dev/null @@ -1,66 +0,0 @@ -from __future__ import absolute_import - -import binascii -import weakref - -from kafka.vendor import six - - -if six.PY3: - MAX_INT = 2 ** 31 - TO_SIGNED = 2 ** 32 - - def crc32(data): - crc = binascii.crc32(data) - # py2 and py3 behave a little differently - # CRC is encoded as a signed int in kafka protocol - # so we'll convert the py3 unsigned result to signed - if crc >= MAX_INT: - crc -= TO_SIGNED - return crc -else: - from binascii import crc32 - - -class WeakMethod(object): - """ - Callable that weakly references a method and the object it is bound to. It - is based on https://stackoverflow.com/a/24287465. - - Arguments: - - object_dot_method: A bound instance method (i.e. 'object.method'). - """ - def __init__(self, object_dot_method): - try: - self.target = weakref.ref(object_dot_method.__self__) - except AttributeError: - self.target = weakref.ref(object_dot_method.im_self) - self._target_id = id(self.target()) - try: - self.method = weakref.ref(object_dot_method.__func__) - except AttributeError: - self.method = weakref.ref(object_dot_method.im_func) - self._method_id = id(self.method()) - - def __call__(self, *args, **kwargs): - """ - Calls the method on target with args and kwargs. - """ - return self.method()(self.target(), *args, **kwargs) - - def __hash__(self): - return hash(self.target) ^ hash(self.method) - - def __eq__(self, other): - if not isinstance(other, WeakMethod): - return False - return self._target_id == other._target_id and self._method_id == other._method_id - - -class Dict(dict): - """Utility class to support passing weakrefs to dicts - - See: https://docs.python.org/2/library/weakref.html - """ - pass