From 62905e4bf01be520cfe6a89215b115c2eccfcc4a Mon Sep 17 00:00:00 2001 From: vajexal <72415539+vajexal@users.noreply.github.com> Date: Sat, 31 Aug 2024 22:07:14 +0300 Subject: [PATCH] read data written with partial flush (#996) * read data written with partial flush --- flate/inflate.go | 74 +++++++++++++++++++++++++++--------- flate/inflate_test.go | 21 ++++++++++ flate/testdata/partial-block | 1 + 3 files changed, 77 insertions(+), 19 deletions(-) create mode 100644 flate/testdata/partial-block diff --git a/flate/inflate.go b/flate/inflate.go index 2f410d64f5..0d7b437f1c 100644 --- a/flate/inflate.go +++ b/flate/inflate.go @@ -298,6 +298,14 @@ const ( huffmanGenericReader ) +// flushMode tells decompressor when to return data +type flushMode uint8 + +const ( + syncFlush flushMode = iota // return data after sync flush block + partialFlush // return data after each block +) + // Decompress state. type decompressor struct { // Input source. @@ -332,6 +340,8 @@ type decompressor struct { nb uint final bool + + flushMode flushMode } func (f *decompressor) nextBlock() { @@ -618,7 +628,10 @@ func (f *decompressor) dataBlock() { } if n == 0 { - f.toRead = f.dict.readFlush() + if f.flushMode == syncFlush { + f.toRead = f.dict.readFlush() + } + f.finishBlock() return } @@ -657,8 +670,12 @@ func (f *decompressor) finishBlock() { if f.dict.availRead() > 0 { f.toRead = f.dict.readFlush() } + f.err = io.EOF + } else if f.flushMode == partialFlush && f.dict.availRead() > 0 { + f.toRead = f.dict.readFlush() } + f.step = nextBlock } @@ -789,15 +806,25 @@ func (f *decompressor) Reset(r io.Reader, dict []byte) error { return nil } -// NewReader returns a new ReadCloser that can be used -// to read the uncompressed version of r. -// If r does not also implement io.ByteReader, -// the decompressor may read more data than necessary from r. -// It is the caller's responsibility to call Close on the ReadCloser -// when finished reading. 
-// -// The ReadCloser returned by NewReader also implements Resetter. -func NewReader(r io.Reader) io.ReadCloser { +type ReaderOpt func(*decompressor) // ReaderOpt configures a decompressor created by NewReaderOpts. + +// WithPartialBlock tells the decompressor to return data after each block, +// so it can read data written with a partial flush. +func WithPartialBlock() ReaderOpt { + return func(f *decompressor) { + f.flushMode = partialFlush + } +} + +// WithDict initializes the reader with a preset dictionary. +func WithDict(dict []byte) ReaderOpt { + return func(f *decompressor) { + f.dict.init(maxMatchOffset, dict) + } +} + +// NewReaderOpts returns a new ReadCloser for r with the provided options applied in order. +func NewReaderOpts(r io.Reader, opts ...ReaderOpt) io.ReadCloser { fixedHuffmanDecoderInit() var f decompressor @@ -806,9 +833,26 @@ func NewReader(r io.Reader) io.ReadCloser { f.codebits = new([numCodes]int) f.step = nextBlock f.dict.init(maxMatchOffset, nil) + + for _, opt := range opts { + opt(&f) + } + return &f } +// NewReader returns a new ReadCloser that can be used +// to read the uncompressed version of r. +// If r does not also implement io.ByteReader, +// the decompressor may read more data than necessary from r. +// It is the caller's responsibility to call Close on the ReadCloser +// when finished reading. +// +// The ReadCloser returned by NewReader also implements Resetter. +func NewReader(r io.Reader) io.ReadCloser { + return NewReaderOpts(r) +} + // NewReaderDict is like NewReader but initializes the reader // with a preset dictionary. The returned Reader behaves as if // the uncompressed data stream started with the given dictionary, @@ -817,13 +861,5 @@ func NewReader(r io.Reader) io.ReadCloser { // // The ReadCloser returned by NewReader also implements Resetter. 
func NewReaderDict(r io.Reader, dict []byte) io.ReadCloser { - fixedHuffmanDecoderInit() - - var f decompressor - f.r = makeReader(r) - f.bits = new([maxNumLit + maxNumDist]int) - f.codebits = new([numCodes]int) - f.step = nextBlock - f.dict.init(maxMatchOffset, dict) - return &f + return NewReaderOpts(r, WithDict(dict)) } diff --git a/flate/inflate_test.go b/flate/inflate_test.go index aac29c6d2b..d018991cdd 100644 --- a/flate/inflate_test.go +++ b/flate/inflate_test.go @@ -8,6 +8,7 @@ import ( "bytes" "crypto/rand" "io" + "os" "strconv" "strings" "testing" @@ -279,3 +280,23 @@ func TestWriteTo(t *testing.T) { t.Fatal("output did not match input") } } + +func TestReaderPartialBlock(t *testing.T) { + data, err := os.ReadFile("testdata/partial-block") + if err != nil { + t.Fatal(err) // stop here: without the fixture the Read below fails for an unrelated reason + } + + r := NewReaderOpts(bytes.NewReader(data), WithPartialBlock()) + rb := make([]byte, 32) + n, err := r.Read(rb) + if err != nil { + t.Fatalf("Read: %v", err) + } + + expected := "hello, world" + actual := string(rb[:n]) + if expected != actual { + t.Fatalf("expected: %v, got: %v", expected, actual) + } +} diff --git a/flate/testdata/partial-block b/flate/testdata/partial-block new file mode 100644 index 0000000000..b14e816aa8 --- /dev/null +++ b/flate/testdata/partial-block @@ -0,0 +1 @@ +ÊHÍÉÉ×Q(Ï/ÊI \ No newline at end of file