Replace python-snappy and zstandard with cramjam #940

Merged · 5 commits · Nov 19, 2023
8 changes: 2 additions & 6 deletions .github/workflows/publish.yml
@@ -74,8 +74,7 @@ jobs:

     strategy:
       matrix:
-        # 3.11 excluded due to problems with python-snappy
-        python: ["3.8", "3.9", "3.10"]
+        python: ["3.8", "3.9", "3.10", "3.11"]
       include:
         - python: "3.8"
           aiokafka_whl: dist/aiokafka-*-cp38-cp38-win_amd64.whl
@@ -139,9 +138,6 @@ jobs:
       with:
         python-version: ${{ matrix.python }}

-      - name: Install system dependencies
-        run: |
-          brew install snappy
       - name: Install python dependencies
         run: |
           pip install --upgrade pip setuptools wheel
@@ -236,7 +232,7 @@ jobs:
             source .env/bin/activate && \
             yum install -y epel-release && \
             yum-config-manager --enable epel && \
-            yum install -y snappy-devel libzstd-devel krb5-devel && \
+            yum install -y krb5-devel && \
             pip install --upgrade pip setuptools wheel && \
             pip install -r requirements-ci.txt && \
             pip install ${{ matrix.aiokafka_whl }} && \
7 changes: 1 addition & 6 deletions .github/workflows/tests.yml
@@ -73,8 +73,7 @@ jobs:

     strategy:
       matrix:
-        # 3.11 excluded due to problems with python-snappy
-        python: ["3.8", "3.9", "3.10"]
+        python: ["3.8", "3.9", "3.10", "3.11"]

     steps:
       - uses: actions/checkout@v2
@@ -168,10 +167,6 @@ jobs:
           restore-keys: |
             ${{ runner.os }}-py-${{ matrix.python }}-

-      - name: Install system dependencies
-        run: |
-          brew install snappy
-
       - name: Install python dependencies
         run: |
           pip install --upgrade pip setuptools wheel
122 changes: 24 additions & 98 deletions aiokafka/codec.py
@@ -1,35 +1,24 @@
 import gzip
 import io
-import platform
 import struct

 _XERIAL_V1_HEADER = (-126, b"S", b"N", b"A", b"P", b"P", b"Y", 0, 1, 1)
 _XERIAL_V1_FORMAT = "bccccccBii"
-ZSTD_MAX_OUTPUT_SIZE = 1024 * 1024

 try:
-    import snappy
+    import cramjam
 except ImportError:
-    snappy = None
-
-try:
-    import zstandard as zstd
-except ImportError:
-    zstd = None
+    cramjam = None

 try:
     import lz4.frame as lz4

     def _lz4_compress(payload, **kwargs):
         # Kafka does not support LZ4 dependent blocks
-        try:
-            # For lz4>=0.12.0
-            kwargs.pop("block_linked", None)
-            return lz4.compress(payload, block_linked=False, **kwargs)
-        except TypeError:
-            # For earlier versions of lz4
-            kwargs.pop("block_mode", None)
-            return lz4.compress(payload, block_mode=1, **kwargs)
+        # https://cwiki.apache.org/confluence/display/KAFKA/KIP-57+-+Interoperable+LZ4+Framing
+        kwargs.pop("block_linked", None)
+        return lz4.compress(payload, block_linked=False, **kwargs)

 except ImportError:
     lz4 = None
@@ -44,24 +33,17 @@
 except ImportError:
     lz4framed = None

-try:
-    import xxhash
-except ImportError:
-    xxhash = None
-
-PYPY = bool(platform.python_implementation() == "PyPy")
-

 def has_gzip():
     return True


 def has_snappy():
-    return snappy is not None
+    return cramjam is not None


 def has_zstd():
-    return zstd is not None
+    return cramjam is not None


 def has_lz4():
@@ -133,32 +115,22 @@
         raise NotImplementedError("Snappy codec is not available")

     if not xerial_compatible:
-        return snappy.compress(payload)
+        return cramjam.snappy.compress_raw(payload)

     out = io.BytesIO()
     for fmt, dat in zip(_XERIAL_V1_FORMAT, _XERIAL_V1_HEADER):
         out.write(struct.pack("!" + fmt, dat))

     # Chunk through buffers to avoid creating intermediate slice copies
-    if PYPY:
-        # on pypy, snappy.compress() on a sliced buffer consumes the entire
-        # buffer... likely a python-snappy bug, so just use a slice copy
-        def chunker(payload, i, size):
-            return payload[i:size + i]
-
-    else:
-        # snappy.compress does not like raw memoryviews, so we have to convert
-        # tobytes, which is a copy... oh well. it's the thought that counts.
-        # pylint: disable-msg=undefined-variable
-        def chunker(payload, i, size):
-            return memoryview(payload)[i:size + i].tobytes()
+    def chunker(payload, i, size):
+        return memoryview(payload)[i:size + i]

     for chunk in (
         chunker(payload, i, xerial_blocksize)
         for i in range(0, len(payload), xerial_blocksize)
     ):
-
-        block = snappy.compress(chunk)
+        block = cramjam.snappy.compress_raw(chunk)
         block_size = len(block)
         out.write(struct.pack("!i", block_size))
         out.write(block)
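For reviewers unfamiliar with the wire format: the Xerial framing written above is independent of which snappy backend compresses the blocks, which is what makes swapping python-snappy for cramjam safe here. A minimal stdlib-only sketch of the layout (compression stubbed as identity, so no codec library is needed; `xerial_frame` is an illustrative name, not an aiokafka function):

```python
# Stdlib-only sketch of Xerial framing: a 16-byte magic header, then each
# block as a 4-byte big-endian length prefix followed by its compressed
# bytes. Compression is stubbed as identity here for illustration.
import io
import struct

_XERIAL_V1_HEADER = (-126, b"S", b"N", b"A", b"P", b"P", b"Y", 0, 1, 1)
_XERIAL_V1_FORMAT = "bccccccBii"


def xerial_frame(chunks):
    out = io.BytesIO()
    for fmt, dat in zip(_XERIAL_V1_FORMAT, _XERIAL_V1_HEADER):
        out.write(struct.pack("!" + fmt, dat))
    for chunk in chunks:
        out.write(struct.pack("!i", len(chunk)))  # block size prefix
        out.write(chunk)                          # "compressed" block body
    return out.getvalue()


framed = xerial_frame([b"hello ", b"world"])
assert framed[1:7] == b"SNAPPY"                 # magic name bytes
assert len(framed) == 16 + (4 + 6) + (4 + 5)    # header + two framed blocks
```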
@@ -210,13 +182,13 @@
             # Skip the block size
             cursor += 4
             end = cursor + block_size
-            out.write(snappy.decompress(byt[cursor:end]))
+            out.write(cramjam.snappy.decompress_raw(byt[cursor:end]))
             cursor = end

         out.seek(0)
         return out.read()
     else:
-        return snappy.decompress(payload)
+        return bytes(cramjam.snappy.decompress_raw(payload))


if lz4:
@@ -253,66 +225,20 @@
 lz4_decode = None


-def lz4_encode_old_kafka(payload):
-    """Encode payload for 0.8/0.9 brokers -- requires an incorrect header checksum."""
-    assert xxhash is not None
-    data = lz4_encode(payload)
-    header_size = 7
-    flg = data[4]
-    if not isinstance(flg, int):
-        flg = ord(flg)
-
-    content_size_bit = (flg >> 3) & 1
-    if content_size_bit:
-        # Old kafka does not accept the content-size field
-        # so we need to discard it and reset the header flag
-        flg -= 8
-        data = bytearray(data)
-        data[4] = flg
-        data = bytes(data)
-        payload = data[header_size + 8:]
-    else:
-        payload = data[header_size:]
-
-    # This is the incorrect hc
-    hc = xxhash.xxh32(data[0:header_size - 1]).digest()[
-        -2:-1
-    ]  # pylint: disable-msg=no-member
-
-    return b"".join([data[0:header_size - 1], hc, payload])
-
-
-def lz4_decode_old_kafka(payload):
-    assert xxhash is not None
-    # Kafka's LZ4 code has a bug in its header checksum implementation
-    header_size = 7
-    if isinstance(payload[4], int):
-        flg = payload[4]
-    else:
-        flg = ord(payload[4])
-    content_size_bit = (flg >> 3) & 1
-    if content_size_bit:
-        header_size += 8
-
-    # This should be the correct hc
-    hc = xxhash.xxh32(payload[4:header_size - 1]).digest()[-2:-1]
-
-    munged_payload = b"".join([payload[0:header_size - 1], hc, payload[header_size:]])
-    return lz4_decode(munged_payload)
-
-
-def zstd_encode(payload):
-    if not zstd:
-        raise NotImplementedError("Zstd codec is not available")
-    return zstd.ZstdCompressor().compress(payload)
+def zstd_encode(payload, level=None):
+    if not has_zstd():
+        raise NotImplementedError("Zstd codec is not available")
+
+    if level is None:
+        # Default for kafka broker
+        # https://cwiki.apache.org/confluence/display/KAFKA/KIP-390%3A+Support+Compression+Level
+        level = 3
+
+    return bytes(cramjam.zstd.compress(payload, level=level))


 def zstd_decode(payload):
-    if not zstd:
+    if not has_zstd():
         raise NotImplementedError("Zstd codec is not available")
-    try:
-        return zstd.ZstdDecompressor().decompress(payload)
-    except zstd.ZstdError:
-        return zstd.ZstdDecompressor().decompress(
-            payload, max_output_size=ZSTD_MAX_OUTPUT_SIZE
-        )
+    return bytes(cramjam.zstd.decompress(payload))
7 changes: 5 additions & 2 deletions aiokafka/protocol/message.py
@@ -11,8 +11,8 @@
     snappy_decode,
     zstd_decode,
     lz4_decode,
-    lz4_decode_old_kafka,
 )
+from aiokafka.errors import UnsupportedCodecError
 from aiokafka.util import WeakMethod

 from .frame import KafkaBytes
@@ -156,7 +156,10 @@
         elif codec == self.CODEC_LZ4:
             assert has_lz4(), "LZ4 decompression unsupported"
             if self.magic == 0:
-                raw_bytes = lz4_decode_old_kafka(self.value)
+                # https://issues.apache.org/jira/browse/KAFKA-3160
+                raise UnsupportedCodecError(
+                    "LZ4 is not supported for broker version 0.8/0.9"
+                )
             else:
                 raw_bytes = lz4_decode(self.value)
         elif codec == self.CODEC_ZSTD:
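The same magic=0 guard recurs in the pure-Python and Cython record paths below. Its observable behavior can be sketched in isolation with the stdlib alone (the exception class and `decompress_lz4` helper here are illustrative stand-ins, not aiokafka's actual definitions):

```python
# Stdlib-only sketch of the new guard: with the old-Kafka LZ4 helpers gone,
# decoding an LZ4 message with magic=0 now raises instead of munging the
# frame checksum. UnsupportedCodecError is a local stand-in for
# aiokafka.errors.UnsupportedCodecError.
class UnsupportedCodecError(Exception):
    pass


def decompress_lz4(magic, value):
    if magic == 0:
        # https://issues.apache.org/jira/browse/KAFKA-3160
        raise UnsupportedCodecError(
            "LZ4 is not supported for broker version 0.8/0.9"
        )
    return value  # the real path would call lz4_decode(value) here


try:
    decompress_lz4(0, b"...")
except UnsupportedCodecError as exc:
    assert "0.8/0.9" in str(exc)
```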
14 changes: 10 additions & 4 deletions aiokafka/record/_crecords/legacy_records.pyx
@@ -2,8 +2,8 @@

 import aiokafka.codec as codecs
 from aiokafka.codec import (
-    gzip_encode, snappy_encode, lz4_encode, lz4_encode_old_kafka,
-    gzip_decode, snappy_decode, lz4_decode, lz4_decode_old_kafka
+    gzip_encode, snappy_encode, lz4_encode,
+    gzip_decode, snappy_decode, lz4_decode,
 )
 from aiokafka.errors import CorruptRecordException, UnsupportedCodecError
 from zlib import crc32 as py_crc32  # needed for windows macro
@@ -141,7 +141,10 @@ cdef class LegacyRecordBatch:
                 uncompressed = snappy_decode(value)
             elif compression_type == _ATTR_CODEC_LZ4:
                 if self._magic == 0:
-                    uncompressed = lz4_decode_old_kafka(value)
+                    # https://issues.apache.org/jira/browse/KAFKA-3160
+                    raise UnsupportedCodecError(
+                        "LZ4 is not supported for broker version 0.8/0.9"
+                    )
                 else:
                     uncompressed = lz4_decode(value)
@@ -437,7 +440,10 @@ cdef class LegacyRecordBatchBuilder:
                 compressed = snappy_encode(self._buffer)
             elif self._compression_type == _ATTR_CODEC_LZ4:
                 if self._magic == 0:
-                    compressed = lz4_encode_old_kafka(bytes(self._buffer))
+                    # https://issues.apache.org/jira/browse/KAFKA-3160
+                    raise UnsupportedCodecError(
+                        "LZ4 is not supported for broker version 0.8/0.9"
+                    )
                 else:
                     compressed = lz4_encode(bytes(self._buffer))
             else:
14 changes: 10 additions & 4 deletions aiokafka/record/legacy_records.py
@@ -5,8 +5,8 @@

 import aiokafka.codec as codecs
 from aiokafka.codec import (
-    gzip_encode, snappy_encode, lz4_encode, lz4_encode_old_kafka,
-    gzip_decode, snappy_decode, lz4_decode, lz4_decode_old_kafka
+    gzip_encode, snappy_encode, lz4_encode,
+    gzip_decode, snappy_decode, lz4_decode,
 )
 from aiokafka.errors import CorruptRecordException, UnsupportedCodecError
 from aiokafka.util import NO_EXTENSIONS
@@ -159,7 +159,10 @@
             uncompressed = snappy_decode(data.tobytes())
         elif compression_type == self.CODEC_LZ4:
             if self._magic == 0:
-                uncompressed = lz4_decode_old_kafka(data.tobytes())
+                # https://issues.apache.org/jira/browse/KAFKA-3160
+                raise UnsupportedCodecError(
+                    "LZ4 is not supported for broker version 0.8/0.9"
+                )
             else:
                 uncompressed = lz4_decode(data.tobytes())
         return uncompressed
@@ -415,7 +418,10 @@
             compressed = snappy_encode(buf)
         elif self._compression_type == self.CODEC_LZ4:
             if self._magic == 0:
-                compressed = lz4_encode_old_kafka(bytes(buf))
+                # https://issues.apache.org/jira/browse/KAFKA-3160
+                raise UnsupportedCodecError(
+                    "LZ4 is not supported for broker version 0.8/0.9"
+                )
             else:
                 compressed = lz4_encode(bytes(buf))
         compressed_size = len(compressed)
28 changes: 1 addition & 27 deletions docs/index.rst
@@ -110,33 +110,7 @@ from http://landinghub.visualstudio.com/visual-cpp-build-tools
 Optional Snappy install
 +++++++++++++++++++++++

-1. Download and build Snappy from http://google.github.io/snappy/
-
-Ubuntu:
-
-.. code:: bash
-
-    apt-get install libsnappy-dev
-
-OSX:
-
-.. code:: bash
-
-    brew install snappy
-
-From Source:
-
-.. code:: bash
-
-    wget https://github.com/google/snappy/tarball/master
-    tar xzvf google-snappy-X.X.X-X-XXXXXXXX.tar.gz
-    cd google-snappy-X.X.X-X-XXXXXXXX
-    ./configure
-    make
-    sudo make install
-
-
-1. Install **aiokafka** with :code:`snappy` extra option
+To enable Snappy compression/decompression, install **aiokafka** with :code:`snappy` extra option

 .. code:: bash

4 changes: 1 addition & 3 deletions requirements-ci.txt
@@ -10,10 +10,8 @@ pytest-mock==3.12.0
 docker==6.1.2
 chardet==4.0.0  # Until fixed requests is released
 lz4==3.1.3
-xxhash==2.0.2
-python-snappy==0.6.1
 docutils==0.17.1
 Pygments==2.15.0
 gssapi==1.8.2
 async-timeout==4.0.1
-zstandard==0.16.0
+cramjam==2.7.0
4 changes: 1 addition & 3 deletions requirements-win-test.txt
@@ -10,6 +10,4 @@ pytest-mock==3.12.0
 docker==6.0.1
 chardet==4.0.0  # Until fixed requests is released
 lz4==3.1.3
-xxhash==2.0.2
-python-snappy==0.6.1
-zstandard==0.16.0
+cramjam==2.7.0