Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement ZstdFrameCompressor via endOp #52

Merged
merged 7 commits into from
May 29, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/CodecZstd.jl
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module CodecZstd
export
ZstdCompressor,
ZstdCompressorStream,
ZstdFrameCompressor,
ZstdDecompressor,
ZstdDecompressorStream

Expand Down
64 changes: 57 additions & 7 deletions src/compression.jl
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,15 @@
# Codec type for zstd compression; a subtype of `TranscodingStreams.Codec`.
# The same type backs both the plain compressor and the frame compressor; the
# two are told apart by `endOp`.
struct ZstdCompressor <: TranscodingStreams.Codec
# Compression stream; `process` points its `ibuffer`/`obuffer` at the input and output memory.
cstream::CStream
# Compression level supplied to the constructor.
level::Int
# End directive forwarded to `compress!` by `process`; `show` labels the codec as
# a frame compressor when this equals `LibZstd.ZSTD_e_end`.
endOp::LibZstd.ZSTD_EndDirective
end

# Print a one-line description of `codec` to `io`.
#
# Frame compressors share the `ZstdCompressor` type, so they are recognised by
# their `endOp` and labelled "ZstdFrameCompressor"; every other codec is shown
# with its type summary. Exactly one description is written per call.
function Base.show(io::IO, codec::ZstdCompressor)
    if codec.endOp == LibZstd.ZSTD_e_end
        print(io, "ZstdFrameCompressor(level=$(codec.level))")
    else
        print(io, summary(codec), "(level=$(codec.level))")
    end
end

# Same as the zstd command line tool (v1.2.0).
Expand All @@ -28,6 +33,34 @@
end
return ZstdCompressor(CStream(), level)
end
# Two-argument convenience method: fills in `:continue` as the end directive so
# callers that only pass a stream and a level keep working.
function ZstdCompressor(cstream, level)
    return ZstdCompressor(cstream, level, :continue)
end

"""
ZstdFrameCompressor(;level=$(DEFAULT_COMPRESSION_LEVEL))

Create a new zstd compression codec that reads the available input and then
closes the frame, encoding the decompressed size of that frame.

Arguments
---------
- `level`: compression level (1..$(MAX_CLEVEL))
"""
function ZstdFrameCompressor(;level::Integer=DEFAULT_COMPRESSION_LEVEL)
    # Reject levels outside 1..MAX_CLEVEL with an ArgumentError.
    if !(1 ≤ level ≤ MAX_CLEVEL)
        # Coverage note (Codecov, codecov/patch, src/compression.jl#L50): this
        # added line was reported as not covered by tests.
        throw(ArgumentError("level must be within 1..$(MAX_CLEVEL)"))
    end
    # `:end` is passed as the end directive; presumably it is converted to
    # `LibZstd.ZSTD_e_end` by a conversion defined elsewhere — TODO confirm.
    return ZstdCompressor(CStream(), level, :end)
end
# `ZstdFrameCompressor` is a function, not a type, so this method dispatches on
# the function's own type to let `transcode(ZstdFrameCompressor, args...)` be
# called the same way as with a codec type. A fresh default codec is created,
# initialized, used once, and always finalized.
function TranscodingStreams.transcode(C::typeof(ZstdFrameCompressor), args...)
    frame_codec = C()
    initialize(frame_codec)
    result = try
        transcode(frame_codec, args...)
    finally
        # Runs whether or not transcoding threw.
        finalize(frame_codec)
    end
    return result
end

# Alias for a `TranscodingStream` whose codec is `ZstdCompressor`, parameterised
# by the wrapped stream type `S<:IO`.
const ZstdCompressorStream{S} = TranscodingStream{ZstdCompressor,S} where S<:IO

Expand All @@ -50,6 +83,9 @@
if iserror(code)
zstderror(codec.cstream, code)
end
codec.cstream.ibuffer.src = C_NULL
codec.cstream.ibuffer.size = 0
codec.cstream.ibuffer.pos = 0
nhz2 marked this conversation as resolved.
Show resolved Hide resolved
return
end

Expand All @@ -61,6 +97,9 @@
end
codec.cstream.ptr = C_NULL
end
codec.cstream.ibuffer.src = C_NULL
codec.cstream.ibuffer.size = 0
codec.cstream.ibuffer.pos = 0
return
end

Expand All @@ -75,21 +114,32 @@

function TranscodingStreams.process(codec::ZstdCompressor, input::Memory, output::Memory, error::Error)
cstream = codec.cstream
cstream.ibuffer.src = input.ptr
cstream.ibuffer.size = input.size
cstream.ibuffer.pos = 0
ibuffer_starting_pos = UInt(0)
if codec.endOp == LibZstd.ZSTD_e_end &&
cstream.ibuffer.size != cstream.ibuffer.pos
# While saving a frame, the prior process run did not complete processing.
# Re-run with the same size and pos
ibuffer_starting_pos = cstream.ibuffer.pos
# Set the pointer relative to input.ptr, but keep size and pos
cstream.ibuffer.src = input.ptr - cstream.ibuffer.pos
nhz2 marked this conversation as resolved.
Show resolved Hide resolved
else
cstream.ibuffer.src = input.ptr
cstream.ibuffer.size = input.size
cstream.ibuffer.pos = 0
end
cstream.obuffer.dst = output.ptr
cstream.obuffer.size = output.size
cstream.obuffer.pos = 0
if input.size == 0
code = finish!(cstream)
else
code = compress!(cstream)
code = compress!(cstream; endOp = codec.endOp)
end
Δin = Int(cstream.ibuffer.pos)
Δin = Int(cstream.ibuffer.pos - ibuffer_starting_pos)
Δout = Int(cstream.obuffer.pos)
if iserror(code)
error[] = ErrorException("zstd error")
ptr = LibZstd.ZSTD_getErrorName(code)
error[] = ErrorException("zstd error: " * unsafe_string(ptr))

Check warning on line 142 in src/compression.jl

View check run for this annotation

Codecov / codecov/patch

src/compression.jl#L141-L142

Added lines #L141 - L142 were not covered by tests
return Δin, Δout, :error
else
return Δin, Δout, input.size == 0 && code == 0 ? :end : :ok
Expand Down
12 changes: 12 additions & 0 deletions test/compress_endOp.jl
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,15 @@ end
Base.Libc.free(cstream.obuffer.dst)
end
end

@testset "ZstdFrameCompressor" begin
    data = rand(1:100, 1024*1024)
    raw_bytes = copy(reinterpret(UInt8, data))
    compressed = transcode(ZstdFrameCompressor, raw_bytes)

    # The size reported for the compressed frame must match the input size.
    GC.@preserve compressed begin
        frame_size = CodecZstd.find_decompressed_size(pointer(compressed), sizeof(compressed))
        @test frame_size == sizeof(data)
    end

    # Decompressing must give back the original values.
    restored = transcode(ZstdDecompressor, compressed)
    @test reinterpret(Int, restored) == data

    # Printing a frame compressor must identify it as such.
    shown = sprint(print, ZstdFrameCompressor())
    @test startswith(shown, "ZstdFrameCompressor")
end
38 changes: 38 additions & 0 deletions test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -44,5 +44,43 @@ Random.seed!(1234)
TranscodingStreams.test_roundtrip_lines(ZstdCompressorStream, ZstdDecompressorStream)
TranscodingStreams.test_roundtrip_transcode(ZstdCompressor, ZstdDecompressor)

# Run the shared roundtrip checks with a frame compressor on the encoding side.
frame_encoder = io -> TranscodingStream(ZstdFrameCompressor(), io)
for roundtrip_check in (
    TranscodingStreams.test_roundtrip_read,
    TranscodingStreams.test_roundtrip_write,
    TranscodingStreams.test_roundtrip_lines,
)
    roundtrip_check(frame_encoder, ZstdDecompressorStream)
end
TranscodingStreams.test_roundtrip_transcode(ZstdFrameCompressor, ZstdDecompressor)

@testset "ZstdFrameCompressor streaming edge case" begin
    # Drive the codec by hand with large input chunks and a tiny output window,
    # so a single `process` call cannot finish the chunk it was given.
    frame_codec = ZstdFrameCompressor()
    TranscodingStreams.initialize(frame_codec)
    err = TranscodingStreams.Error()
    status = TranscodingStreams.startproc(frame_codec, :write, err)
    @test status == :ok

    # data buffers
    data = rand(UInt8, 32*1024*1024)
    src = copy(data)
    dst = zeros(UInt8, length(data)*2)

    GC.@preserve src dst begin
        produced = 0
        consumed = 0
        # Keep going until all input is consumed AND the codec reports :end.
        while consumed < length(data) || status != :end
            in_len = min(length(src) - consumed, 1024*1024)
            out_len = min(length(dst) - produced, 1024)
            input = TranscodingStreams.Memory(pointer(src, consumed + 1), UInt(in_len))
            output = TranscodingStreams.Memory(pointer(dst, produced + 1), UInt(out_len))
            Δin, Δout, status = TranscodingStreams.process(frame_codec, input, output, err)
            status == :error && throw(err[])
            produced += Δout
            consumed += Δin
        end
        @test status == :end
    end

    TranscodingStreams.finalize(frame_codec)
    # Trim the output buffer to the bytes actually written before decoding.
    resize!(dst, produced)
    @test transcode(ZstdDecompressor, dst) == data
end

include("compress_endOp.jl")
end