From 4ac07d90d1d98b564091c71de686d499cef1aada Mon Sep 17 00:00:00 2001 From: Denis Otkidach Date: Sun, 22 Oct 2023 18:52:30 +0300 Subject: [PATCH] Move partitioner --- .../default.py => aiokafka/partitioner.py | 64 +++++++++---------- aiokafka/producer/producer.py | 2 +- tests/{kafka => }/test_partitioner.py | 27 ++++---- 3 files changed, 46 insertions(+), 47 deletions(-) rename kafka/partitioner/default.py => aiokafka/partitioner.py (63%) rename tests/{kafka => }/test_partitioner.py (67%) diff --git a/kafka/partitioner/default.py b/aiokafka/partitioner.py similarity index 63% rename from kafka/partitioner/default.py rename to aiokafka/partitioner.py index d0914c68..dabc3def 100644 --- a/kafka/partitioner/default.py +++ b/aiokafka/partitioner.py @@ -1,9 +1,5 @@ -from __future__ import absolute_import - import random -from kafka.vendor import six - class DefaultPartitioner(object): """Default partitioner. @@ -12,6 +8,7 @@ class DefaultPartitioner(object): If key is None, selects partition randomly from available, or from all partitions if none are currently available """ + @classmethod def __call__(cls, key, all_partitions, available): """ @@ -27,7 +24,7 @@ def __call__(cls, key, all_partitions, available): return random.choice(all_partitions) idx = murmur2(key) - idx &= 0x7fffffff + idx &= 0x7FFFFFFF idx %= len(all_partitions) return all_partitions[idx] @@ -43,16 +40,11 @@ def murmur2(data): Returns: MurmurHash2 of data """ - # Python2 bytes is really a str, causing the bitwise operations below to fail - # so convert to bytearray. - if six.PY2: - data = bytearray(bytes(data)) - length = len(data) - seed = 0x9747b28c + seed = 0x9747B28C # 'm' and 'r' are mixing constants generated offline. # They're not really 'magic', they just happen to work well. - m = 0x5bd1e995 + m = 0x5BD1E995 r = 24 # Initialize the hash to a random value @@ -61,42 +53,44 @@ def murmur2(data): for i in range(length4): i4 = i * 4 - k = ((data[i4 + 0] & 0xff) + - ((data[i4 + 1] & 0xff) << 8) + - ((data[i4 + 2] & 0xff) << 16) + - ((data[i4 + 3] & 0xff) << 24)) - k &= 0xffffffff + k = ( + (data[i4 + 0] & 0xFF) + + ((data[i4 + 1] & 0xFF) << 8) + + ((data[i4 + 2] & 0xFF) << 16) + + ((data[i4 + 3] & 0xFF) << 24) + ) + k &= 0xFFFFFFFF k *= m - k &= 0xffffffff - k ^= (k % 0x100000000) >> r # k ^= k >>> r - k &= 0xffffffff + k &= 0xFFFFFFFF + k ^= (k % 0x100000000) >> r # k ^= k >>> r + k &= 0xFFFFFFFF k *= m - k &= 0xffffffff + k &= 0xFFFFFFFF h *= m - h &= 0xffffffff + h &= 0xFFFFFFFF h ^= k - h &= 0xffffffff + h &= 0xFFFFFFFF # Handle the last few bytes of the input array extra_bytes = length % 4 if extra_bytes >= 3: - h ^= (data[(length & ~3) + 2] & 0xff) << 16 - h &= 0xffffffff + h ^= (data[(length & ~3) + 2] & 0xFF) << 16 + h &= 0xFFFFFFFF if extra_bytes >= 2: - h ^= (data[(length & ~3) + 1] & 0xff) << 8 - h &= 0xffffffff + h ^= (data[(length & ~3) + 1] & 0xFF) << 8 + h &= 0xFFFFFFFF if extra_bytes >= 1: - h ^= (data[length & ~3] & 0xff) - h &= 0xffffffff + h ^= data[length & ~3] & 0xFF + h &= 0xFFFFFFFF h *= m - h &= 0xffffffff + h &= 0xFFFFFFFF - h ^= (h % 0x100000000) >> 13 # h >>> 13; - h &= 0xffffffff + h ^= (h % 0x100000000) >> 13 # h >>> 13; + h &= 0xFFFFFFFF h *= m - h &= 0xffffffff - h ^= (h % 0x100000000) >> 15 # h >>> 15; - h &= 0xffffffff + h &= 0xFFFFFFFF + h ^= (h % 0x100000000) >> 15 # h >>> 15; + h &= 0xFFFFFFFF return h diff --git a/aiokafka/producer/producer.py b/aiokafka/producer/producer.py index 7bfd1089..c8d763b6 100644 --- a/aiokafka/producer/producer.py +++ b/aiokafka/producer/producer.py @@ -4,12 +4,12 @@ import traceback import warnings -from kafka.partitioner.default import DefaultPartitioner from kafka.codec import has_gzip, has_snappy, has_lz4, has_zstd from aiokafka.client import AIOKafkaClient from aiokafka.errors import ( MessageSizeTooLargeError, UnsupportedVersionError, IllegalOperation) +from aiokafka.partitioner import DefaultPartitioner from aiokafka.record.default_records import DefaultRecordBatch from aiokafka.record.legacy_records import LegacyRecordBatchBuilder from aiokafka.structs import TopicPartition diff --git a/tests/kafka/test_partitioner.py b/tests/test_partitioner.py similarity index 67% rename from tests/kafka/test_partitioner.py rename to tests/test_partitioner.py index 853fbf69..af0a7cb2 100644 --- a/tests/kafka/test_partitioner.py +++ b/tests/test_partitioner.py @@ -1,16 +1,14 @@ -from __future__ import absolute_import - import pytest -from kafka.partitioner import DefaultPartitioner, murmur2 +from aiokafka.partitioner import DefaultPartitioner, murmur2 def test_default_partitioner(): partitioner = DefaultPartitioner() all_partitions = available = list(range(100)) # partitioner should return the same partition for the same key - p1 = partitioner(b'foo', all_partitions, available) - p2 = partitioner(b'foo', all_partitions, available) + p1 = partitioner(b"foo", all_partitions, available) + p2 = partitioner(b"foo", all_partitions, available) assert p1 == p2 assert p1 in all_partitions @@ -21,10 +19,17 @@ def test_default_partitioner(): assert partitioner(None, all_partitions, []) in all_partitions -@pytest.mark.parametrize("bytes_payload,partition_number", [ - (b'', 681), (b'a', 524), (b'ab', 434), (b'abc', 107), (b'123456789', 566), - (b'\x00 ', 742) -]) +@pytest.mark.parametrize( + "bytes_payload,partition_number", + [ + (b"", 681), + (b"a", 524), + (b"ab", 434), + (b"abc", 107), + (b"123456789", 566), + (b"\x00 ", 742), + ], +) def test_murmur2_java_compatibility(bytes_payload, partition_number): partitioner = DefaultPartitioner() all_partitions = available = list(range(1000)) @@ -34,5 +39,5 @@ def test_murmur2_java_compatibility(bytes_payload, partition_number): def test_murmur2_not_ascii(): # Verify no regression of murmur2() bug encoding py2 bytes that don't ascii encode - murmur2(b'\xa4') - murmur2(b'\x81' * 1000) + murmur2(b"\xa4") + murmur2(b"\x81" * 1000)