Skip to content

Commit

Permalink
Merge util
Browse files Browse the repository at this point in the history
  • Loading branch information
ods committed Oct 23, 2023
1 parent 8c4cd40 commit 6d70164
Show file tree
Hide file tree
Showing 6 changed files with 59 additions and 73 deletions.
2 changes: 1 addition & 1 deletion aiokafka/coordinator/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@
import time

from kafka.future import Future
from kafka.util import WeakMethod

import aiokafka.errors as Errors
from aiokafka.metrics import AnonMeasurable
from aiokafka.metrics.stats import Avg, Count, Max, Rate
from aiokafka.protocol.commit import OffsetCommitRequest, OffsetFetchRequest
from aiokafka.structs import OffsetAndMetadata, TopicPartition
from aiokafka.util import WeakMethod

from .base import BaseCoordinator, Generation
from .assignors.range import RangePartitionAssignor
Expand Down
10 changes: 6 additions & 4 deletions aiokafka/protocol/message.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import io
import time
from binascii import crc32

from aiokafka.codec import (
has_gzip,
Expand All @@ -12,23 +13,24 @@
lz4_decode,
lz4_decode_old_kafka,
)
from aiokafka.util import WeakMethod

from .frame import KafkaBytes
from .struct import Struct
from .types import Int8, Int32, Int64, Bytes, Schema, AbstractType
from kafka.util import crc32, WeakMethod
from .types import Int8, Int32, UInt32, Int64, Bytes, Schema, AbstractType


class Message(Struct):
SCHEMAS = [
Schema(
("crc", Int32),
("crc", UInt32),
("magic", Int8),
("attributes", Int8),
("key", Bytes),
("value", Bytes),
),
Schema(
("crc", Int32),
("crc", UInt32),
("magic", Int8),
("attributes", Int8),
("timestamp", Int64),
Expand Down
4 changes: 2 additions & 2 deletions aiokafka/protocol/struct.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
from io import BytesIO

from aiokafka.util import WeakMethod

from .abstract import AbstractType
from .types import Schema

from kafka.util import WeakMethod


class Struct(AbstractType):
SCHEMA = Schema()
Expand Down
13 changes: 13 additions & 0 deletions aiokafka/protocol/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,19 @@ def decode(cls, data):
return _unpack(cls._unpack, data.read(4))


class UInt32(AbstractType):
_pack = struct.Struct(">I").pack
_unpack = struct.Struct(">I").unpack

@classmethod
def encode(cls, value):
return _pack(cls._pack, value)

@classmethod
def decode(cls, data):
return _unpack(cls._unpack, data.read(4))


class Int64(AbstractType):
_pack = struct.Struct(">q").pack
_unpack = struct.Struct(">q").unpack
Expand Down
37 changes: 37 additions & 0 deletions aiokafka/util.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import asyncio
import os
import weakref
from asyncio import AbstractEventLoop
from types import MethodType
from typing import Any, Awaitable, Coroutine, Dict, Tuple, TypeVar, Union, cast

import async_timeout
Expand Down Expand Up @@ -91,3 +93,38 @@ def get_running_loop() -> asyncio.AbstractEventLoop:

INTEGER_MAX_VALUE = 2**31 - 1
INTEGER_MIN_VALUE = -(2**31)


class WeakMethod(object):
"""
Callable that weakly references a method and the object it is bound to. It
is based on https://stackoverflow.com/a/24287465.
Arguments:
object_dot_method: A bound instance method (i.e. 'object.method').
"""

def __init__(self, object_dot_method: MethodType) -> None:
self.target = weakref.ref(object_dot_method.__self__)
self._target_id = id(self.target())
self.method = weakref.ref(object_dot_method.__func__)
self._method_id = id(self.method())

def __call__(self, *args: Any, **kwargs: Any) -> Any:
"""
Calls the method on target with args and kwargs.
"""
method = self.method()
assert method is not None
return method(self.target(), *args, **kwargs)

def __hash__(self) -> int:
return hash(self.target) ^ hash(self.method)

def __eq__(self, other: Any) -> bool:
if not isinstance(other, WeakMethod):
return False
return (
self._target_id == other._target_id and self._method_id == other._method_id
)
66 changes: 0 additions & 66 deletions kafka/util.py

This file was deleted.

0 comments on commit 6d70164

Please sign in to comment.