Move codec
ods committed Oct 23, 2023
1 parent 91759b8 commit 8c4cd40
Showing 12 changed files with 214 additions and 209 deletions.
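The commit moves the compression codec module out of the kafka package and into aiokafka itself, updating every internal import to match. For downstream code the change is a one-line import swap; a minimal sketch (the gzip helpers are chosen purely for illustration):

# Before: helpers were imported from the kafka package
# from kafka.codec import has_gzip, gzip_encode, gzip_decode

# After this commit:
from aiokafka.codec import has_gzip, gzip_encode, gzip_decode

if has_gzip():  # always true: gzip ships with the standard library
    compressed = gzip_encode(b"hello kafka")
    assert gzip_decode(compressed) == b"hello kafka"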
124 changes: 58 additions & 66 deletions kafka/codec.py → aiokafka/codec.py
@@ -1,15 +1,10 @@
-from __future__ import absolute_import
-
 import gzip
 import io
 import platform
 import struct
 
-from kafka.vendor import six
-from kafka.vendor.six.moves import range
-
-_XERIAL_V1_HEADER = (-126, b'S', b'N', b'A', b'P', b'P', b'Y', 0, 1, 1)
-_XERIAL_V1_FORMAT = 'bccccccBii'
+_XERIAL_V1_HEADER = (-126, b"S", b"N", b"A", b"P", b"P", b"Y", 0, 1, 1)
+_XERIAL_V1_FORMAT = "bccccccBii"
 ZSTD_MAX_OUTPUT_SIZE = 1024 * 1024
 
 try:
@@ -29,11 +24,11 @@ def _lz4_compress(payload, **kwargs):
         # Kafka does not support LZ4 dependent blocks
         try:
             # For lz4>=0.12.0
-            kwargs.pop('block_linked', None)
+            kwargs.pop("block_linked", None)
             return lz4.compress(payload, block_linked=False, **kwargs)
         except TypeError:
             # For earlier versions of lz4
-            kwargs.pop('block_mode', None)
+            kwargs.pop("block_mode", None)
             return lz4.compress(payload, block_mode=1, **kwargs)
 
 except ImportError:
@@ -54,7 +49,8 @@ def _lz4_compress(payload, **kwargs):
 except ImportError:
     xxhash = None
 
-PYPY = bool(platform.python_implementation() == 'PyPy')
+PYPY = bool(platform.python_implementation() == "PyPy")
+
 
 def has_gzip():
     return True
@@ -100,14 +96,14 @@ def gzip_decode(payload):
 
     # Gzip context manager introduced in python 2.7
     # so old-fashioned way until we decide to not support 2.6
-    gzipper = gzip.GzipFile(fileobj=buf, mode='r')
+    gzipper = gzip.GzipFile(fileobj=buf, mode="r")
     try:
         return gzipper.read()
     finally:
         gzipper.close()
 
 
-def snappy_encode(payload, xerial_compatible=True, xerial_blocksize=32*1024):
+def snappy_encode(payload, xerial_compatible=True, xerial_blocksize=32 * 1024):
     """Encodes the given data with snappy compression.
     If xerial_compatible is set then the stream is encoded in a fashion
@@ -141,59 +137,59 @@ def snappy_encode(payload, xerial_compatible=True, xerial_blocksize=32*1024):
 
     out = io.BytesIO()
     for fmt, dat in zip(_XERIAL_V1_FORMAT, _XERIAL_V1_HEADER):
-        out.write(struct.pack('!' + fmt, dat))
+        out.write(struct.pack("!" + fmt, dat))
 
     # Chunk through buffers to avoid creating intermediate slice copies
     if PYPY:
         # on pypy, snappy.compress() on a sliced buffer consumes the entire
         # buffer... likely a python-snappy bug, so just use a slice copy
-        chunker = lambda payload, i, size: payload[i:size+i]
+        def chunker(payload, i, size):
+            return payload[i:size + i]
 
-    elif six.PY2:
-        # Sliced buffer avoids additional copies
-        # pylint: disable-msg=undefined-variable
-        chunker = lambda payload, i, size: buffer(payload, i, size)
     else:
         # snappy.compress does not like raw memoryviews, so we have to convert
         # tobytes, which is a copy... oh well. it's the thought that counts.
-        # pylint: disable-msg=undefined-variable
-        chunker = lambda payload, i, size: memoryview(payload)[i:size+i].tobytes()
+        def chunker(payload, i, size):
+            return memoryview(payload)[i:size + i].tobytes()
 
-    for chunk in (chunker(payload, i, xerial_blocksize)
-                  for i in range(0, len(payload), xerial_blocksize)):
+    for chunk in (
+        chunker(payload, i, xerial_blocksize)
+        for i in range(0, len(payload), xerial_blocksize)
+    ):
 
         block = snappy.compress(chunk)
         block_size = len(block)
-        out.write(struct.pack('!i', block_size))
+        out.write(struct.pack("!i", block_size))
         out.write(block)
 
     return out.getvalue()
 
 
 def _detect_xerial_stream(payload):
     """Detects if the data given might have been encoded with the blocking mode
-        of the xerial snappy library.
-        This mode writes a magic header of the format:
-            +--------+--------------+------------+---------+--------+
-            | Marker | Magic String | Null / Pad | Version | Compat |
-            +--------+--------------+------------+---------+--------+
-            |  byte  |   c-string   |    byte    |  int32  | int32  |
-            +--------+--------------+------------+---------+--------+
-            |  -126  |   'SNAPPY'   |     \0     |         |        |
-            +--------+--------------+------------+---------+--------+
-        The pad appears to be to ensure that SNAPPY is a valid cstring
-        The version is the version of this format as written by xerial,
-        in the wild this is currently 1 as such we only support v1.
-        Compat is there to claim the minimum supported version that
-        can read a xerial block stream, presently in the wild this is
-        1.
+    of the xerial snappy library.
+    This mode writes a magic header of the format:
+        +--------+--------------+------------+---------+--------+
+        | Marker | Magic String | Null / Pad | Version | Compat |
+        +--------+--------------+------------+---------+--------+
+        |  byte  |   c-string   |    byte    |  int32  | int32  |
+        +--------+--------------+------------+---------+--------+
+        |  -126  |   'SNAPPY'   |     \0     |         |        |
+        +--------+--------------+------------+---------+--------+
+    The pad appears to be to ensure that SNAPPY is a valid cstring
+    The version is the version of this format as written by xerial,
+    in the wild this is currently 1 as such we only support v1.
+    Compat is there to claim the minimum supported version that
+    can read a xerial block stream, presently in the wild this is
+    1.
     """
 
     if len(payload) > 16:
-        header = struct.unpack('!' + _XERIAL_V1_FORMAT, bytes(payload)[:16])
+        header = struct.unpack("!" + _XERIAL_V1_FORMAT, bytes(payload)[:16])
         return header == _XERIAL_V1_HEADER
     return False

@@ -210,7 +206,7 @@ def snappy_decode(payload):
         cursor = 0
 
         while cursor < length:
-            block_size = struct.unpack_from('!i', byt[cursor:])[0]
+            block_size = struct.unpack_from("!i", byt[cursor:])[0]
             # Skip the block size
             cursor += 4
             end = cursor + block_size
@@ -224,11 +220,11 @@
 
 
 if lz4:
-    lz4_encode = _lz4_compress # pylint: disable-msg=no-member
+    lz4_encode = _lz4_compress  # pylint: disable-msg=no-member
 elif lz4f:
-    lz4_encode = lz4f.compressFrame # pylint: disable-msg=no-member
+    lz4_encode = lz4f.compressFrame  # pylint: disable-msg=no-member
 elif lz4framed:
-    lz4_encode = lz4framed.compress # pylint: disable-msg=no-member
+    lz4_encode = lz4framed.compress  # pylint: disable-msg=no-member
 else:
     lz4_encode = None

@@ -242,17 +238,17 @@ def lz4f_decode(payload):
 
     # lz4f python module does not expose how much of the payload was
     # actually read if the decompression was only partial.
-    if data['next'] != 0:
-        raise RuntimeError('lz4f unable to decompress full payload')
-    return data['decomp']
+    if data["next"] != 0:
+        raise RuntimeError("lz4f unable to decompress full payload")
+    return data["decomp"]
 
 
 if lz4:
-    lz4_decode = lz4.decompress # pylint: disable-msg=no-member
+    lz4_decode = lz4.decompress  # pylint: disable-msg=no-member
 elif lz4f:
     lz4_decode = lz4f_decode
 elif lz4framed:
-    lz4_decode = lz4framed.decompress # pylint: disable-msg=no-member
+    lz4_decode = lz4framed.decompress  # pylint: disable-msg=no-member
 else:
     lz4_decode = None

@@ -266,26 +262,24 @@ def lz4_encode_old_kafka(payload):
     if not isinstance(flg, int):
         flg = ord(flg)
 
-    content_size_bit = ((flg >> 3) & 1)
+    content_size_bit = (flg >> 3) & 1
     if content_size_bit:
         # Old kafka does not accept the content-size field
         # so we need to discard it and reset the header flag
         flg -= 8
         data = bytearray(data)
         data[4] = flg
         data = bytes(data)
-        payload = data[header_size+8:]
+        payload = data[header_size + 8:]
     else:
         payload = data[header_size:]
 
     # This is the incorrect hc
-    hc = xxhash.xxh32(data[0:header_size-1]).digest()[-2:-1] # pylint: disable-msg=no-member
+    hc = xxhash.xxh32(data[0:header_size - 1]).digest()[
+        -2:-1
+    ]  # pylint: disable-msg=no-member
 
-    return b''.join([
-        data[0:header_size-1],
-        hc,
-        payload
-    ])
+    return b"".join([data[0:header_size - 1], hc, payload])
 
 
 def lz4_decode_old_kafka(payload):
@@ -296,18 +290,14 @@ def lz4_decode_old_kafka(payload):
         flg = payload[4]
     else:
         flg = ord(payload[4])
-    content_size_bit = ((flg >> 3) & 1)
+    content_size_bit = (flg >> 3) & 1
     if content_size_bit:
         header_size += 8
 
     # This should be the correct hc
-    hc = xxhash.xxh32(payload[4:header_size-1]).digest()[-2:-1] # pylint: disable-msg=no-member
+    hc = xxhash.xxh32(payload[4:header_size - 1]).digest()[-2:-1]
 
-    munged_payload = b''.join([
-        payload[0:header_size-1],
-        hc,
-        payload[header_size:]
-    ])
+    munged_payload = b"".join([payload[0:header_size - 1], hc, payload[header_size:]])
     return lz4_decode(munged_payload)


@@ -323,4 +313,6 @@ def zstd_decode(payload):
     try:
         return zstd.ZstdDecompressor().decompress(payload)
     except zstd.ZstdError:
-        return zstd.ZstdDecompressor().decompress(payload, max_output_size=ZSTD_MAX_OUTPUT_SIZE)
+        return zstd.ZstdDecompressor().decompress(
+            payload, max_output_size=ZSTD_MAX_OUTPUT_SIZE
+        )
3 changes: 1 addition & 2 deletions aiokafka/producer/producer.py
@@ -4,9 +4,8 @@
 import traceback
 import warnings
 
-from kafka.codec import has_gzip, has_snappy, has_lz4, has_zstd
-
 from aiokafka.client import AIOKafkaClient
+from aiokafka.codec import has_gzip, has_snappy, has_lz4, has_zstd
 from aiokafka.errors import (
     MessageSizeTooLargeError, UnsupportedVersionError, IllegalOperation)
 from aiokafka.partitioner import DefaultPartitioner
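The producer now takes its has_* availability checks from aiokafka.codec. A plausible sketch of how such helpers gate a compression_type setting (the producer's actual validation logic is not part of this diff, and check_codec is a hypothetical name):

from aiokafka.codec import has_gzip, has_lz4, has_snappy, has_zstd

_AVAILABLE = {
    "gzip": has_gzip,
    "snappy": has_snappy,
    "lz4": has_lz4,
    "zstd": has_zstd,
}


def check_codec(compression_type):
    # Fail fast if the optional library backing the requested codec
    # is not installed, instead of erroring on the first send.
    if not _AVAILABLE[compression_type]():
        raise RuntimeError(
            f"Libraries for {compression_type} compression codec not found"
        )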
2 changes: 1 addition & 1 deletion aiokafka/protocol/message.py
@@ -1,7 +1,7 @@
 import io
 import time
 
-from kafka.codec import (
+from aiokafka.codec import (
     has_gzip,
     has_snappy,
     has_lz4,
6 changes: 3 additions & 3 deletions aiokafka/record/_crecords/default_records.pyx
@@ -55,12 +55,12 @@
 # * Timestamp Type (3)
 # * Compression Type (0-2)
 
-from aiokafka.errors import CorruptRecordException, UnsupportedCodecError
-from kafka.codec import (
+import aiokafka.codec as codecs
+from aiokafka.codec import (
     gzip_encode, snappy_encode, lz4_encode, zstd_encode,
     gzip_decode, snappy_decode, lz4_decode, zstd_decode
 )
-import kafka.codec as codecs
+from aiokafka.errors import CorruptRecordException, UnsupportedCodecError
 
 from cpython cimport PyObject_GetBuffer, PyBuffer_Release, PyBUF_WRITABLE, \
     PyBUF_SIMPLE, PyBUF_READ, Py_buffer, \
4 changes: 2 additions & 2 deletions aiokafka/record/_crecords/legacy_records.pyx
@@ -1,10 +1,10 @@
 #cython: language_level=3
 
-from kafka.codec import (
+import aiokafka.codec as codecs
+from aiokafka.codec import (
     gzip_encode, snappy_encode, lz4_encode, lz4_encode_old_kafka,
     gzip_decode, snappy_decode, lz4_decode, lz4_decode_old_kafka
 )
-import kafka.codec as codecs
 from aiokafka.errors import CorruptRecordException, UnsupportedCodecError
 from zlib import crc32 as py_crc32  # needed for windows macro
 
7 changes: 4 additions & 3 deletions aiokafka/record/default_records.py
@@ -56,15 +56,16 @@
 
 import struct
 import time
-from .util import decode_varint, encode_varint, calc_crc32c, size_of_varint
 
+import aiokafka.codec as codecs
 from aiokafka.errors import CorruptRecordException, UnsupportedCodecError
 from aiokafka.util import NO_EXTENSIONS
-from kafka.codec import (
+from aiokafka.codec import (
     gzip_encode, snappy_encode, lz4_encode, zstd_encode,
     gzip_decode, snappy_decode, lz4_decode, zstd_decode
 )
-import kafka.codec as codecs
+
+from .util import decode_varint, encode_varint, calc_crc32c, size_of_varint
 
 
 class DefaultRecordBase:
8 changes: 4 additions & 4 deletions aiokafka/record/legacy_records.py
@@ -3,13 +3,13 @@
 
 from binascii import crc32
 
-from aiokafka.errors import CorruptRecordException, UnsupportedCodecError
-from aiokafka.util import NO_EXTENSIONS
-from kafka.codec import (
+import aiokafka.codec as codecs
+from aiokafka.codec import (
     gzip_encode, snappy_encode, lz4_encode, lz4_encode_old_kafka,
     gzip_decode, snappy_decode, lz4_decode, lz4_decode_old_kafka
 )
-import kafka.codec as codecs
+from aiokafka.errors import CorruptRecordException, UnsupportedCodecError
+from aiokafka.util import NO_EXTENSIONS
 
 
 NoneType = type(None)
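The record modules import both the module alias (import aiokafka.codec as codecs) and the individual encode/decode functions. A hedged reading of that split: the alias serves runtime availability checks such as codecs.has_lz4(), while the direct imports bind the hot-path functions once at import time. A sketch under that assumption (maybe_decompress is a hypothetical helper, not from the diff):

import aiokafka.codec as codecs
from aiokafka.codec import lz4_decode
from aiokafka.errors import UnsupportedCodecError


def maybe_decompress(raw):
    # Availability is checked through the module alias ...
    if not codecs.has_lz4():
        raise UnsupportedCodecError("Libraries for lz4 compression codec not found")
    # ... while the decode itself uses the directly bound function.
    return lz4_decode(raw)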
1 change: 1 addition & 0 deletions tests/conftest.py
@@ -15,6 +15,7 @@
 from aiokafka.record.default_records import (
     DefaultRecordBatchBuilder, _DefaultRecordBatchBuilderPy)
 from aiokafka.util import NO_EXTENSIONS
+
 from ._testutil import wait_kafka
 
 