Skip to content

Commit

Permalink
Move partitioner
Browse files Browse the repository at this point in the history
  • Loading branch information
ods committed Oct 22, 2023
1 parent 7a2106b commit 497c857
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 47 deletions.
64 changes: 29 additions & 35 deletions kafka/partitioner/default.py → aiokafka/partitioner.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
from __future__ import absolute_import

import random

from kafka.vendor import six


class DefaultPartitioner(object):
"""Default partitioner.
Expand All @@ -12,6 +8,7 @@ class DefaultPartitioner(object):
If key is None, selects partition randomly from available,
or from all partitions if none are currently available
"""

@classmethod
def __call__(cls, key, all_partitions, available):
"""
Expand All @@ -27,7 +24,7 @@ def __call__(cls, key, all_partitions, available):
return random.choice(all_partitions)

idx = murmur2(key)
idx &= 0x7fffffff
idx &= 0x7FFFFFFF
idx %= len(all_partitions)
return all_partitions[idx]

Expand All @@ -43,16 +40,11 @@ def murmur2(data):
Returns: MurmurHash2 of data
"""
# Python2 bytes is really a str, causing the bitwise operations below to fail
# so convert to bytearray.
if six.PY2:
data = bytearray(bytes(data))

length = len(data)
seed = 0x9747b28c
seed = 0x9747B28C
# 'm' and 'r' are mixing constants generated offline.
# They're not really 'magic', they just happen to work well.
m = 0x5bd1e995
m = 0x5BD1E995
r = 24

# Initialize the hash to a random value
Expand All @@ -61,42 +53,44 @@ def murmur2(data):

for i in range(length4):
i4 = i * 4
k = ((data[i4 + 0] & 0xff) +
((data[i4 + 1] & 0xff) << 8) +
((data[i4 + 2] & 0xff) << 16) +
((data[i4 + 3] & 0xff) << 24))
k &= 0xffffffff
k = (
(data[i4 + 0] & 0xFF)
+ ((data[i4 + 1] & 0xFF) << 8)
+ ((data[i4 + 2] & 0xFF) << 16)
+ ((data[i4 + 3] & 0xFF) << 24)
)
k &= 0xFFFFFFFF
k *= m
k &= 0xffffffff
k ^= (k % 0x100000000) >> r # k ^= k >>> r
k &= 0xffffffff
k &= 0xFFFFFFFF
k ^= (k % 0x100000000) >> r # k ^= k >>> r
k &= 0xFFFFFFFF
k *= m
k &= 0xffffffff
k &= 0xFFFFFFFF

h *= m
h &= 0xffffffff
h &= 0xFFFFFFFF
h ^= k
h &= 0xffffffff
h &= 0xFFFFFFFF

# Handle the last few bytes of the input array
extra_bytes = length % 4
if extra_bytes >= 3:
h ^= (data[(length & ~3) + 2] & 0xff) << 16
h &= 0xffffffff
h ^= (data[(length & ~3) + 2] & 0xFF) << 16
h &= 0xFFFFFFFF
if extra_bytes >= 2:
h ^= (data[(length & ~3) + 1] & 0xff) << 8
h &= 0xffffffff
h ^= (data[(length & ~3) + 1] & 0xFF) << 8
h &= 0xFFFFFFFF
if extra_bytes >= 1:
h ^= (data[length & ~3] & 0xff)
h &= 0xffffffff
h ^= data[length & ~3] & 0xFF
h &= 0xFFFFFFFF
h *= m
h &= 0xffffffff
h &= 0xFFFFFFFF

h ^= (h % 0x100000000) >> 13 # h >>> 13;
h &= 0xffffffff
h ^= (h % 0x100000000) >> 13 # h >>> 13;
h &= 0xFFFFFFFF
h *= m
h &= 0xffffffff
h ^= (h % 0x100000000) >> 15 # h >>> 15;
h &= 0xffffffff
h &= 0xFFFFFFFF
h ^= (h % 0x100000000) >> 15 # h >>> 15;
h &= 0xFFFFFFFF

return h
2 changes: 1 addition & 1 deletion aiokafka/producer/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@
import traceback
import warnings

from kafka.partitioner.default import DefaultPartitioner
from kafka.codec import has_gzip, has_snappy, has_lz4, has_zstd

from aiokafka.client import AIOKafkaClient
from aiokafka.errors import (
MessageSizeTooLargeError, UnsupportedVersionError, IllegalOperation)
from aiokafka.partitioner import DefaultPartitioner
from aiokafka.record.default_records import DefaultRecordBatch
from aiokafka.record.legacy_records import LegacyRecordBatchBuilder
from aiokafka.structs import TopicPartition
Expand Down
27 changes: 16 additions & 11 deletions tests/kafka/test_partitioner.py → tests/test_partitioner.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
from __future__ import absolute_import

import pytest

from kafka.partitioner import DefaultPartitioner, murmur2
from aiokafka.partitioner import DefaultPartitioner, murmur2


def test_default_partitioner():
partitioner = DefaultPartitioner()
all_partitions = available = list(range(100))
# partitioner should return the same partition for the same key
p1 = partitioner(b'foo', all_partitions, available)
p2 = partitioner(b'foo', all_partitions, available)
p1 = partitioner(b"foo", all_partitions, available)
p2 = partitioner(b"foo", all_partitions, available)
assert p1 == p2
assert p1 in all_partitions

Expand All @@ -21,10 +19,17 @@ def test_default_partitioner():
assert partitioner(None, all_partitions, []) in all_partitions


@pytest.mark.parametrize("bytes_payload,partition_number", [
(b'', 681), (b'a', 524), (b'ab', 434), (b'abc', 107), (b'123456789', 566),
(b'\x00 ', 742)
])
@pytest.mark.parametrize(
"bytes_payload,partition_number",
[
(b"", 681),
(b"a", 524),
(b"ab", 434),
(b"abc", 107),
(b"123456789", 566),
(b"\x00 ", 742),
],
)
def test_murmur2_java_compatibility(bytes_payload, partition_number):
partitioner = DefaultPartitioner()
all_partitions = available = list(range(1000))
Expand All @@ -34,5 +39,5 @@ def test_murmur2_java_compatibility(bytes_payload, partition_number):

def test_murmur2_not_ascii():
# Verify no regression of murmur2() bug encoding py2 bytes that don't ascii encode
murmur2(b'\xa4')
murmur2(b'\x81' * 1000)
murmur2(b"\xa4")
murmur2(b"\x81" * 1000)

0 comments on commit 497c857

Please sign in to comment.