Skip to content

Commit

Permalink
Merge pull request #940 from ods/cramjam
Browse files Browse the repository at this point in the history
Replace python-snappy and zstandard with cramjam
  • Loading branch information
ods authored Nov 19, 2023
2 parents 88c6945 + b3d8a3c commit 5266f46
Show file tree
Hide file tree
Showing 23 changed files with 68 additions and 178 deletions.
8 changes: 2 additions & 6 deletions .github/workflows/publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,7 @@ jobs:

strategy:
matrix:
# 3.11 excluded due to problems with python-snappy
python: ["3.8", "3.9", "3.10"]
python: ["3.8", "3.9", "3.10", "3.11"]
include:
- python: "3.8"
aiokafka_whl: dist/aiokafka-*-cp38-cp38-win_amd64.whl
Expand Down Expand Up @@ -139,9 +138,6 @@ jobs:
with:
python-version: ${{ matrix.python }}

- name: Install system dependencies
run: |
brew install snappy
- name: Install python dependencies
run: |
pip install --upgrade pip setuptools wheel
Expand Down Expand Up @@ -236,7 +232,7 @@ jobs:
source .env/bin/activate && \
yum install -y epel-release && \
yum-config-manager --enable epel && \
yum install -y snappy-devel libzstd-devel krb5-devel && \
yum install -y krb5-devel && \
pip install --upgrade pip setuptools wheel && \
pip install -r requirements-ci.txt && \
pip install ${{ matrix.aiokafka_whl }} && \
Expand Down
7 changes: 1 addition & 6 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,7 @@ jobs:

strategy:
matrix:
# 3.11 excluded due to problems with python-snappy
python: ["3.8", "3.9", "3.10"]
python: ["3.8", "3.9", "3.10", "3.11"]

steps:
- uses: actions/checkout@v2
Expand Down Expand Up @@ -168,10 +167,6 @@ jobs:
restore-keys: |
${{ runner.os }}-py-${{ matrix.python }}-
- name: Install system dependencies
run: |
brew install snappy
- name: Install python dependencies
run: |
pip install --upgrade pip setuptools wheel
Expand Down
122 changes: 24 additions & 98 deletions aiokafka/codec.py
Original file line number Diff line number Diff line change
@@ -1,35 +1,24 @@
import gzip
import io
import platform
import struct

_XERIAL_V1_HEADER = (-126, b"S", b"N", b"A", b"P", b"P", b"Y", 0, 1, 1)
_XERIAL_V1_FORMAT = "bccccccBii"
ZSTD_MAX_OUTPUT_SIZE = 1024 * 1024

try:
import snappy
import cramjam
except ImportError:
snappy = None

try:
import zstandard as zstd
except ImportError:
zstd = None
cramjam = None

try:
import lz4.frame as lz4

def _lz4_compress(payload, **kwargs):
# Kafka does not support LZ4 dependent blocks
try:
# For lz4>=0.12.0
kwargs.pop("block_linked", None)
return lz4.compress(payload, block_linked=False, **kwargs)
except TypeError:
# For earlier versions of lz4
kwargs.pop("block_mode", None)
return lz4.compress(payload, block_mode=1, **kwargs)
# https://cwiki.apache.org/confluence/display/KAFKA/KIP-57+-+Interoperable+LZ4+Framing
kwargs.pop("block_linked", None)
return lz4.compress(payload, block_linked=False, **kwargs)

except ImportError:
lz4 = None
Expand All @@ -44,24 +33,17 @@ def _lz4_compress(payload, **kwargs):
except ImportError:
lz4framed = None

try:
import xxhash
except ImportError:
xxhash = None

PYPY = bool(platform.python_implementation() == "PyPy")


def has_gzip():
return True


def has_snappy():
return snappy is not None
return cramjam is not None


def has_zstd():
return zstd is not None
return cramjam is not None


def has_lz4():
Expand Down Expand Up @@ -133,32 +115,22 @@ def snappy_encode(payload, xerial_compatible=True, xerial_blocksize=32 * 1024):
raise NotImplementedError("Snappy codec is not available")

if not xerial_compatible:
return snappy.compress(payload)
return cramjam.snappy.compress_raw(payload)

out = io.BytesIO()
for fmt, dat in zip(_XERIAL_V1_FORMAT, _XERIAL_V1_HEADER):
out.write(struct.pack("!" + fmt, dat))

# Chunk through buffers to avoid creating intermediate slice copies
if PYPY:
# on pypy, snappy.compress() on a sliced buffer consumes the entire
# buffer... likely a python-snappy bug, so just use a slice copy
def chunker(payload, i, size):
return payload[i:size + i]

else:
# snappy.compress does not like raw memoryviews, so we have to convert
# tobytes, which is a copy... oh well. it's the thought that counts.
# pylint: disable-msg=undefined-variable
def chunker(payload, i, size):
return memoryview(payload)[i:size + i].tobytes()
def chunker(payload, i, size):
return memoryview(payload)[i:size + i]

for chunk in (
chunker(payload, i, xerial_blocksize)
for i in range(0, len(payload), xerial_blocksize)
):

block = snappy.compress(chunk)
block = cramjam.snappy.compress_raw(chunk)
block_size = len(block)
out.write(struct.pack("!i", block_size))
out.write(block)
Expand Down Expand Up @@ -210,13 +182,13 @@ def snappy_decode(payload):
# Skip the block size
cursor += 4
end = cursor + block_size
out.write(snappy.decompress(byt[cursor:end]))
out.write(cramjam.snappy.decompress_raw(byt[cursor:end]))
cursor = end

out.seek(0)
return out.read()
else:
return snappy.decompress(payload)
return bytes(cramjam.snappy.decompress_raw(payload))


if lz4:
Expand Down Expand Up @@ -253,66 +225,20 @@ def lz4f_decode(payload):
lz4_decode = None


def lz4_encode_old_kafka(payload):
"""Encode payload for 0.8/0.9 brokers -- requires an incorrect header checksum."""
assert xxhash is not None
data = lz4_encode(payload)
header_size = 7
flg = data[4]
if not isinstance(flg, int):
flg = ord(flg)

content_size_bit = (flg >> 3) & 1
if content_size_bit:
# Old kafka does not accept the content-size field
# so we need to discard it and reset the header flag
flg -= 8
data = bytearray(data)
data[4] = flg
data = bytes(data)
payload = data[header_size + 8:]
else:
payload = data[header_size:]

# This is the incorrect hc
hc = xxhash.xxh32(data[0:header_size - 1]).digest()[
-2:-1
] # pylint: disable-msg=no-member

return b"".join([data[0:header_size - 1], hc, payload])


def lz4_decode_old_kafka(payload):
assert xxhash is not None
# Kafka's LZ4 code has a bug in its header checksum implementation
header_size = 7
if isinstance(payload[4], int):
flg = payload[4]
else:
flg = ord(payload[4])
content_size_bit = (flg >> 3) & 1
if content_size_bit:
header_size += 8

# This should be the correct hc
hc = xxhash.xxh32(payload[4:header_size - 1]).digest()[-2:-1]

munged_payload = b"".join([payload[0:header_size - 1], hc, payload[header_size:]])
return lz4_decode(munged_payload)
def zstd_encode(payload, level=None):
if not has_zstd():
raise NotImplementedError("Zstd codec is not available")

if level is None:
# Default for kafka broker
# https://cwiki.apache.org/confluence/display/KAFKA/KIP-390%3A+Support+Compression+Level
level = 3

def zstd_encode(payload):
if not zstd:
raise NotImplementedError("Zstd codec is not available")
return zstd.ZstdCompressor().compress(payload)
return bytes(cramjam.zstd.compress(payload, level=level))


def zstd_decode(payload):
if not zstd:
if not has_zstd():
raise NotImplementedError("Zstd codec is not available")
try:
return zstd.ZstdDecompressor().decompress(payload)
except zstd.ZstdError:
return zstd.ZstdDecompressor().decompress(
payload, max_output_size=ZSTD_MAX_OUTPUT_SIZE
)

return bytes(cramjam.zstd.decompress(payload))
7 changes: 5 additions & 2 deletions aiokafka/protocol/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
snappy_decode,
zstd_decode,
lz4_decode,
lz4_decode_old_kafka,
)
from aiokafka.errors import UnsupportedCodecError
from aiokafka.util import WeakMethod

from .frame import KafkaBytes
Expand Down Expand Up @@ -156,7 +156,10 @@ def decompress(self):
elif codec == self.CODEC_LZ4:
assert has_lz4(), "LZ4 decompression unsupported"
if self.magic == 0:
raw_bytes = lz4_decode_old_kafka(self.value)
# https://issues.apache.org/jira/browse/KAFKA-3160
raise UnsupportedCodecError(
"LZ4 is not supported for broker version 0.8/0.9"
)
else:
raw_bytes = lz4_decode(self.value)
elif codec == self.CODEC_ZSTD:
Expand Down
14 changes: 10 additions & 4 deletions aiokafka/record/_crecords/legacy_records.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

import aiokafka.codec as codecs
from aiokafka.codec import (
gzip_encode, snappy_encode, lz4_encode, lz4_encode_old_kafka,
gzip_decode, snappy_decode, lz4_decode, lz4_decode_old_kafka
gzip_encode, snappy_encode, lz4_encode,
gzip_decode, snappy_decode, lz4_decode,
)
from aiokafka.errors import CorruptRecordException, UnsupportedCodecError
from zlib import crc32 as py_crc32 # needed for windows macro
Expand Down Expand Up @@ -141,7 +141,10 @@ cdef class LegacyRecordBatch:
uncompressed = snappy_decode(value)
elif compression_type == _ATTR_CODEC_LZ4:
if self._magic == 0:
uncompressed = lz4_decode_old_kafka(value)
# https://issues.apache.org/jira/browse/KAFKA-3160
raise UnsupportedCodecError(
"LZ4 is not supported for broker version 0.8/0.9"
)
else:
uncompressed = lz4_decode(value)

Expand Down Expand Up @@ -437,7 +440,10 @@ cdef class LegacyRecordBatchBuilder:
compressed = snappy_encode(self._buffer)
elif self._compression_type == _ATTR_CODEC_LZ4:
if self._magic == 0:
compressed = lz4_encode_old_kafka(bytes(self._buffer))
# https://issues.apache.org/jira/browse/KAFKA-3160
raise UnsupportedCodecError(
"LZ4 is not supported for broker version 0.8/0.9"
)
else:
compressed = lz4_encode(bytes(self._buffer))
else:
Expand Down
14 changes: 10 additions & 4 deletions aiokafka/record/legacy_records.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@

import aiokafka.codec as codecs
from aiokafka.codec import (
gzip_encode, snappy_encode, lz4_encode, lz4_encode_old_kafka,
gzip_decode, snappy_decode, lz4_decode, lz4_decode_old_kafka
gzip_encode, snappy_encode, lz4_encode,
gzip_decode, snappy_decode, lz4_decode,
)
from aiokafka.errors import CorruptRecordException, UnsupportedCodecError
from aiokafka.util import NO_EXTENSIONS
Expand Down Expand Up @@ -159,7 +159,10 @@ def _decompress(self, key_offset):
uncompressed = snappy_decode(data.tobytes())
elif compression_type == self.CODEC_LZ4:
if self._magic == 0:
uncompressed = lz4_decode_old_kafka(data.tobytes())
# https://issues.apache.org/jira/browse/KAFKA-3160
raise UnsupportedCodecError(
"LZ4 is not supported for broker version 0.8/0.9"
)
else:
uncompressed = lz4_decode(data.tobytes())
return uncompressed
Expand Down Expand Up @@ -415,7 +418,10 @@ def _maybe_compress(self):
compressed = snappy_encode(buf)
elif self._compression_type == self.CODEC_LZ4:
if self._magic == 0:
compressed = lz4_encode_old_kafka(bytes(buf))
# https://issues.apache.org/jira/browse/KAFKA-3160
raise UnsupportedCodecError(
"LZ4 is not supported for broker version 0.8/0.9"
)
else:
compressed = lz4_encode(bytes(buf))
compressed_size = len(compressed)
Expand Down
28 changes: 1 addition & 27 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -110,33 +110,7 @@ from http://landinghub.visualstudio.com/visual-cpp-build-tools
Optional Snappy install
+++++++++++++++++++++++

1. Download and build Snappy from http://google.github.io/snappy/

Ubuntu:

.. code:: bash
apt-get install libsnappy-dev
OSX:

.. code:: bash
brew install snappy
From Source:

.. code:: bash
wget https://github.com/google/snappy/tarball/master
tar xzvf google-snappy-X.X.X-X-XXXXXXXX.tar.gz
cd google-snappy-X.X.X-X-XXXXXXXX
./configure
make
sudo make install
1. Install **aiokafka** with :code:`snappy` extra option
To enable Snappy compression/decompression, install **aiokafka** with the :code:`snappy` extra option

.. code:: bash
Expand Down
4 changes: 1 addition & 3 deletions requirements-ci.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,8 @@ pytest-mock==3.12.0
docker==6.1.2
chardet==4.0.0 # Until fixed requests is released
lz4==3.1.3
xxhash==2.0.2
python-snappy==0.6.1
docutils==0.17.1
Pygments==2.15.0
gssapi==1.8.2
async-timeout==4.0.1
zstandard==0.16.0
cramjam==2.7.0
4 changes: 1 addition & 3 deletions requirements-win-test.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,4 @@ pytest-mock==3.12.0
docker==6.0.1
chardet==4.0.0 # Until fixed requests is released
lz4==3.1.3
xxhash==2.0.2
python-snappy==0.6.1
zstandard==0.16.0
cramjam==2.7.0
Loading

0 comments on commit 5266f46

Please sign in to comment.