Skip to content

Commit

Permalink
Use cramjam for LZ4
Browse files Browse the repository at this point in the history
  • Loading branch information
ods committed Jan 17, 2024
1 parent 425ce26 commit c4c5cd8
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 83 deletions.
84 changes: 22 additions & 62 deletions aiokafka/codec.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,28 +11,6 @@
except ImportError:
cramjam = None

try:
import lz4.frame as lz4

def _lz4_compress(payload, **kwargs):
# Kafka does not support LZ4 dependent blocks
# https://cwiki.apache.org/confluence/display/KAFKA/KIP-57+-+Interoperable+LZ4+Framing
kwargs.pop("block_linked", None)
return lz4.compress(payload, block_linked=False, **kwargs)

except ImportError:
lz4 = None

try:
import lz4f
except ImportError:
lz4f = None

try:
import lz4framed
except ImportError:
lz4framed = None


def has_gzip():
    """Report whether the gzip codec can be used; unconditionally ``True``."""
    return True
Expand All @@ -47,13 +25,7 @@ def has_zstd():


def has_lz4():
if lz4 is not None:
return True
if lz4f is not None:
return True
if lz4framed is not None:
return True
return False
return cramjam is not None


def gzip_encode(payload, compresslevel=None):
Expand Down Expand Up @@ -161,7 +133,7 @@ def _detect_xerial_stream(payload):
"""

if len(payload) > 16:
header = struct.unpack("!" + _XERIAL_V1_FORMAT, bytes(payload)[:16])
header = struct.unpack("!" + _XERIAL_V1_FORMAT, memoryview(payload)[:16])
return header == _XERIAL_V1_HEADER
return False

Expand Down Expand Up @@ -191,38 +163,26 @@ def snappy_decode(payload):
return bytes(cramjam.snappy.decompress_raw(payload))


if lz4:
lz4_encode = _lz4_compress # pylint: disable-msg=no-member
elif lz4f:
lz4_encode = lz4f.compressFrame # pylint: disable-msg=no-member
elif lz4framed:
lz4_encode = lz4framed.compress # pylint: disable-msg=no-member
else:
lz4_encode = None


def lz4f_decode(payload):
"""Decode payload using interoperable LZ4 framing. Requires Kafka >= 0.10"""
# pylint: disable-msg=no-member
ctx = lz4f.createDecompContext()
data = lz4f.decompressFrame(payload, ctx)
lz4f.freeDecompContext(ctx)

# lz4f python module does not expose how much of the payload was
# actually read if the decompression was only partial.
if data["next"] != 0:
raise RuntimeError("lz4f unable to decompress full payload")
return data["decomp"]


if lz4:
lz4_decode = lz4.decompress # pylint: disable-msg=no-member
elif lz4f:
lz4_decode = lz4f_decode
elif lz4framed:
lz4_decode = lz4framed.decompress # pylint: disable-msg=no-member
else:
lz4_decode = None
def lz4_encode(payload, level=9):
    """Compress ``payload`` with cramjam's LZ4 compressor and return ``bytes``.

    The default ``level=9`` is the level the broker itself uses by default:
    https://cwiki.apache.org/confluence/display/KAFKA/KIP-390%3A+Support+Compression+Level

    Raises:
        NotImplementedError: if ``has_lz4()`` reports the codec is unavailable.
    """
    if not has_lz4():
        # NOTE: the Codecov report on this commit flags this line as not
        # covered by tests.
        raise NotImplementedError("LZ4 codec is not available")

    # Kafka broker doesn't support linked-block compression, so blocks are
    # always written independently.
    # https://cwiki.apache.org/confluence/display/KAFKA/KIP-57+-+Interoperable+LZ4+Framing
    options = {
        "level": level,
        "content_checksum": False,
        "block_linked": False,
    }
    encoder = cramjam.lz4.Compressor(**options)
    encoder.compress(payload)
    frame = encoder.finish()
    return bytes(frame)


def lz4_decode(payload):
    """Decompress an LZ4 ``payload`` with cramjam and return ``bytes``.

    Raises:
        NotImplementedError: if ``has_lz4()`` reports the codec is unavailable.
    """
    if not has_lz4():
        # NOTE: the Codecov report on this commit flags this line as not
        # covered by tests.
        raise NotImplementedError("LZ4 codec is not available")

    decompressed = cramjam.lz4.decompress(payload)
    return bytes(decompressed)


def zstd_encode(payload, level=None):
Expand Down
5 changes: 3 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,11 @@ dependencies = [

[project.optional-dependencies]
snappy = ["cramjam"]
lz4 = ["lz4 >=3.1.3"]
# cramjam v2.8.0 adds support for LZ4 independent-block mode
lz4 = ["cramjam >=2.8.0"]
zstd = ["cramjam"]
gssapi = ["gssapi"]
all = ["cramjam", "lz4 >=3.1.3", "gssapi"]
all = ["cramjam >=2.8.0", "gssapi"]

[tool.setuptools.dynamic]
version = { attr = "aiokafka.__version__" }
Expand Down
5 changes: 2 additions & 3 deletions requirements-ci.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,8 @@ pytest-cov==4.1.0
pytest-asyncio==0.21.1
pytest-mock==3.12.0
docker==6.1.3
lz4==3.1.3
docutils==0.17.1
docutils==0.20.1
Pygments==2.15.0
gssapi==1.8.3
async-timeout==4.0.1
cramjam==2.7.0
cramjam==2.8.0
3 changes: 1 addition & 2 deletions requirements-win-test.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,4 @@ pytest-cov==4.1.0
pytest-asyncio==0.21.1
pytest-mock==3.12.0
docker==6.1.3
lz4==3.1.3
cramjam==2.7.0
cramjam==2.8.0
13 changes: 8 additions & 5 deletions tests/record/test_default_records.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,15 @@


@pytest.mark.parametrize("compression_type,crc", [
(DefaultRecordBatch.CODEC_NONE, 3950153926),
pytest.param(DefaultRecordBatch.CODEC_NONE, 3950153926, id="none"),
# Gzip header includes timestamp, so checksum varies
(DefaultRecordBatch.CODEC_GZIP, None),
(DefaultRecordBatch.CODEC_SNAPPY, 2171068483),
(DefaultRecordBatch.CODEC_LZ4, 462121143),
(DefaultRecordBatch.CODEC_ZSTD, 1714138923),
pytest.param(DefaultRecordBatch.CODEC_GZIP, None, id="gzip"),
pytest.param(DefaultRecordBatch.CODEC_SNAPPY, 2171068483, id="snappy"),
# Expected CRC depends on whether the LZ4 frame header has the content size:
#   462121143 with content size (header = 01101000)
#   1260758266 without content size (header = 01100000)
pytest.param(DefaultRecordBatch.CODEC_LZ4, 1260758266, id="lz4"),
pytest.param(DefaultRecordBatch.CODEC_ZSTD, 1714138923, id="zstd"),
])
def test_read_write_serde_v2(compression_type, crc):
builder = DefaultRecordBatchBuilder(
Expand Down
11 changes: 2 additions & 9 deletions tests/test_codec.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import platform
import struct

import pytest
Expand Down Expand Up @@ -88,10 +87,7 @@ def test_snappy_encode_xerial():
assert compressed == to_ensure


@pytest.mark.skipif(
not has_lz4() or platform.python_implementation() == "PyPy",
reason="python-lz4 crashes on old versions of pypy",
)
@pytest.mark.skipif(not has_lz4(), reason="LZ4 not available")
def test_lz4():
for i in range(1000):
b1 = random_string(100)
Expand All @@ -100,10 +96,7 @@ def test_lz4():
assert b1 == b2


@pytest.mark.skipif(
not has_lz4() or platform.python_implementation() == "PyPy",
reason="python-lz4 crashes on old versions of pypy",
)
@pytest.mark.skipif(not has_lz4(), reason="LZ4 not available")
def test_lz4_incremental():
for i in range(1000):
# lz4 max single block size is 4MB
Expand Down

0 comments on commit c4c5cd8

Please sign in to comment.