diff --git a/aiokafka/codec.py b/aiokafka/codec.py index aab1975f..8e8553e9 100644 --- a/aiokafka/codec.py +++ b/aiokafka/codec.py @@ -11,28 +11,6 @@ except ImportError: cramjam = None -try: - import lz4.frame as lz4 - - def _lz4_compress(payload, **kwargs): - # Kafka does not support LZ4 dependent blocks - # https://cwiki.apache.org/confluence/display/KAFKA/KIP-57+-+Interoperable+LZ4+Framing - kwargs.pop("block_linked", None) - return lz4.compress(payload, block_linked=False, **kwargs) - -except ImportError: - lz4 = None - -try: - import lz4f -except ImportError: - lz4f = None - -try: - import lz4framed -except ImportError: - lz4framed = None - def has_gzip(): return True @@ -47,13 +25,7 @@ def has_zstd(): def has_lz4(): - if lz4 is not None: - return True - if lz4f is not None: - return True - if lz4framed is not None: - return True - return False + return cramjam is not None def gzip_encode(payload, compresslevel=None): @@ -161,7 +133,7 @@ def _detect_xerial_stream(payload): """ if len(payload) > 16: - header = struct.unpack("!" + _XERIAL_V1_FORMAT, bytes(payload)[:16]) + header = struct.unpack("!" + _XERIAL_V1_FORMAT, memoryview(payload)[:16]) return header == _XERIAL_V1_HEADER return False @@ -191,38 +163,26 @@ def snappy_decode(payload): return bytes(cramjam.snappy.decompress_raw(payload)) -if lz4: - lz4_encode = _lz4_compress # pylint: disable-msg=no-member -elif lz4f: - lz4_encode = lz4f.compressFrame # pylint: disable-msg=no-member -elif lz4framed: - lz4_encode = lz4framed.compress # pylint: disable-msg=no-member -else: - lz4_encode = None - - -def lz4f_decode(payload): - """Decode payload using interoperable LZ4 framing. Requires Kafka >= 0.10""" - # pylint: disable-msg=no-member - ctx = lz4f.createDecompContext() - data = lz4f.decompressFrame(payload, ctx) - lz4f.freeDecompContext(ctx) - - # lz4f python module does not expose how much of the payload was - # actually read if the decompression was only partial. - if data["next"] != 0: - raise RuntimeError("lz4f unable to decompress full payload") - return data["decomp"] - - -if lz4: - lz4_decode = lz4.decompress # pylint: disable-msg=no-member -elif lz4f: - lz4_decode = lz4f_decode -elif lz4framed: - lz4_decode = lz4framed.decompress # pylint: disable-msg=no-member -else: - lz4_decode = None +def lz4_encode(payload, level=9): + # level=9 is used by default by broker itself + # https://cwiki.apache.org/confluence/display/KAFKA/KIP-390%3A+Support+Compression+Level + if not has_lz4(): + raise NotImplementedError("LZ4 codec is not available") + + # Kafka broker doesn't support linked-block compression + # https://cwiki.apache.org/confluence/display/KAFKA/KIP-57+-+Interoperable+LZ4+Framing + compressor = cramjam.lz4.Compressor( + level=level, content_checksum=False, block_linked=False + ) + compressor.compress(payload) + return bytes(compressor.finish()) + + +def lz4_decode(payload): + if not has_lz4(): + raise NotImplementedError("LZ4 codec is not available") + + return bytes(cramjam.lz4.decompress(payload)) def zstd_encode(payload, level=None): diff --git a/pyproject.toml b/pyproject.toml index 191c841a..4852931b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -35,10 +35,11 @@ dependencies = [ [project.optional-dependencies] snappy = ["cramjam"] -lz4 = ["lz4 >=3.1.3"] +# v2.8.0 adds support for independent-block mode +lz4 = ["cramjam >=2.8.0"] zstd = ["cramjam"] gssapi = ["gssapi"] -all = ["cramjam", "lz4 >=3.1.3", "gssapi"] +all = ["cramjam >=2.8.0", "gssapi"] [tool.setuptools.dynamic] version = { attr = "aiokafka.__version__" } diff --git a/requirements-ci.txt b/requirements-ci.txt index 78f2d8bc..bbeb734b 100644 --- a/requirements-ci.txt +++ b/requirements-ci.txt @@ -8,9 +8,8 @@ pytest-cov==4.1.0 pytest-asyncio==0.21.1 pytest-mock==3.12.0 docker==6.1.3 -lz4==3.1.3 -docutils==0.17.1 +docutils==0.20.1 Pygments==2.15.0 gssapi==1.8.3 async-timeout==4.0.1 -cramjam==2.7.0 +cramjam==2.8.0 diff --git a/requirements-win-test.txt b/requirements-win-test.txt index 71c2bff1..a6fb2609 100644 --- a/requirements-win-test.txt +++ b/requirements-win-test.txt @@ -8,5 +8,4 @@ pytest-cov==4.1.0 pytest-asyncio==0.21.1 pytest-mock==3.12.0 docker==6.1.3 -lz4==3.1.3 -cramjam==2.7.0 +cramjam==2.8.0 diff --git a/tests/record/test_default_records.py b/tests/record/test_default_records.py index 0037c76a..a137fdf3 100644 --- a/tests/record/test_default_records.py +++ b/tests/record/test_default_records.py @@ -10,12 +10,15 @@ @pytest.mark.parametrize("compression_type,crc", [ - (DefaultRecordBatch.CODEC_NONE, 3950153926), + pytest.param(DefaultRecordBatch.CODEC_NONE, 3950153926, id="none"), # Gzip header includes timestamp, so checksum varies - (DefaultRecordBatch.CODEC_GZIP, None), - (DefaultRecordBatch.CODEC_SNAPPY, 2171068483), - (DefaultRecordBatch.CODEC_LZ4, 462121143), - (DefaultRecordBatch.CODEC_ZSTD, 1714138923), + pytest.param(DefaultRecordBatch.CODEC_GZIP, None, id="gzip"), + pytest.param(DefaultRecordBatch.CODEC_SNAPPY, 2171068483, id="snappy"), + # Checksum is + # 462121143 with content size (header = 01101000) + # 1260758266 without content size (header = 01100000) + pytest.param(DefaultRecordBatch.CODEC_LZ4, 1260758266, id="lz4"), + pytest.param(DefaultRecordBatch.CODEC_ZSTD, 1714138923, id="zstd"), ]) def test_read_write_serde_v2(compression_type, crc): builder = DefaultRecordBatchBuilder( diff --git a/tests/test_codec.py b/tests/test_codec.py index f047d648..7365cc20 100644 --- a/tests/test_codec.py +++ b/tests/test_codec.py @@ -1,4 +1,3 @@ -import platform import struct import pytest @@ -88,10 +87,7 @@ def test_snappy_encode_xerial(): assert compressed == to_ensure -@pytest.mark.skipif( - not has_lz4() or platform.python_implementation() == "PyPy", - reason="python-lz4 crashes on old versions of pypy", -) +@pytest.mark.skipif(not has_lz4(), reason="LZ4 not available") def test_lz4(): for i in range(1000): b1 = random_string(100) @@ -100,10 +96,7 @@ def test_lz4(): assert b1 == b2 -@pytest.mark.skipif( - not has_lz4() or platform.python_implementation() == "PyPy", - reason="python-lz4 crashes on old versions of pypy", -) +@pytest.mark.skipif(not has_lz4(), reason="LZ4 not available") def test_lz4_incremental(): for i in range(1000): # lz4 max single block size is 4MB