From 141e857efe21f90dcc3c58a9f72577184e24a85b Mon Sep 17 00:00:00 2001 From: Nicholas Wiersma Date: Mon, 22 Apr 2024 19:03:29 +0200 Subject: [PATCH] fix: reader returns errors on unexpected EOF (#382) --- codec_skip_test.go | 2 +- decoder_native_test.go | 56 ++++++++++++++++++++++++++++++++++++++++++ decoder_test.go | 13 ---------- ocf/ocf.go | 8 ++++++ reader.go | 18 ++++++++++++++ reader_test.go | 27 ++++++++++++++++++-- 6 files changed, 108 insertions(+), 16 deletions(-) diff --git a/codec_skip_test.go b/codec_skip_test.go index e5ad484f..54c41448 100644 --- a/codec_skip_test.go +++ b/codec_skip_test.go @@ -194,7 +194,7 @@ func TestDecoder_SkipRecord(t *testing.T) { func TestDecoder_SkipRef(t *testing.T) { defer ConfigTeardown() - data := []byte{0x02, 0x66, 0x06, 0x66, 0x6f, 0x6f} + data := []byte{0x02, 0x66, 0x06, 0x66, 0x6f, 0x6f, 0x02, 0x66} schema := `{ "type": "record", "name": "test", diff --git a/decoder_native_test.go b/decoder_native_test.go index 398bed14..ce6dbf91 100644 --- a/decoder_native_test.go +++ b/decoder_native_test.go @@ -69,6 +69,20 @@ func TestDecoder_Int(t *testing.T) { assert.Equal(t, 27, i) } +func TestDecoder_IntShortRead(t *testing.T) { + defer ConfigTeardown() + + data := []byte{0xe6} + schema := "int" + dec, err := avro.NewDecoder(schema, bytes.NewReader(data)) + require.NoError(t, err) + + var i int + err = dec.Decode(&i) + + assert.Error(t, err) +} + func TestDecoder_IntInvalidSchema(t *testing.T) { defer ConfigTeardown() @@ -272,6 +286,20 @@ func TestDecoder_Int64(t *testing.T) { assert.Equal(t, int64(27), i) } +func TestDecoder_Int64ShortRead(t *testing.T) { + defer ConfigTeardown() + + data := []byte{0xe6} + schema := "long" + dec, err := avro.NewDecoder(schema, bytes.NewReader(data)) + require.NoError(t, err) + + var i int64 + err = dec.Decode(&i) + + assert.Error(t, err) +} + func TestDecoder_Int64InvalidSchema(t *testing.T) { defer ConfigTeardown() @@ -359,6 +387,20 @@ func TestDecoder_String(t *testing.T) { assert.Equal(t, "foo", str) } +func TestDecoder_StringShortRead(t *testing.T) { + defer ConfigTeardown() + + data := []byte{0x08} + schema := "string" + dec, err := avro.NewDecoder(schema, bytes.NewReader(data)) + require.NoError(t, err) + + var str string + err = dec.Decode(&str) + + require.Error(t, err) +} + func TestDecoder_StringInvalidSchema(t *testing.T) { defer ConfigTeardown() @@ -388,6 +430,20 @@ func TestDecoder_Bytes(t *testing.T) { assert.Equal(t, []byte{0xEC, 0xAB, 0x44, 0x00}, b) } +func TestDecoder_BytesShortRead(t *testing.T) { + defer ConfigTeardown() + + data := []byte{0x08, 0xEC} + schema := "bytes" + dec, err := avro.NewDecoder(schema, bytes.NewReader(data)) + require.NoError(t, err) + + var b []byte + err = dec.Decode(&b) + + assert.Error(t, err) +} + func TestDecoder_BytesInvalidSchema(t *testing.T) { defer ConfigTeardown() diff --git a/decoder_test.go b/decoder_test.go index bda9d27d..5ea727b4 100644 --- a/decoder_test.go +++ b/decoder_test.go @@ -68,19 +68,6 @@ func TestDecoder_DecodeNilPtr(t *testing.T) { assert.Error(t, err) } -func TestDecoder_DecodeEOFDoesntReturnError(t *testing.T) { - defer ConfigTeardown() - - data := []byte{0xE2} - schema := "int" - dec, _ := avro.NewDecoder(schema, bytes.NewReader(data)) - - var i int - err := dec.Decode(&i) - - assert.NoError(t, err) -} - func TestUnmarshal(t *testing.T) { defer ConfigTeardown() diff --git a/ocf/ocf.go b/ocf/ocf.go index 75216656..f258bde0 100644 --- a/ocf/ocf.go +++ b/ocf/ocf.go @@ -114,9 +114,16 @@ func (d *Decoder) Error() error { } func (d *Decoder) readBlock() int64 { + _ = d.reader.Peek() + if errors.Is(d.reader.Error, io.EOF) { + // There is no next block + return 0 + } + count := d.reader.ReadLong() size := d.reader.ReadLong() + // Read the blocks data if count > 0 { data := make([]byte, size) d.reader.Read(data) @@ -129,6 +136,7 @@ func (d *Decoder) readBlock() int64 { d.resetReader.Reset(data) } + // Read the sync. var sync [16]byte d.reader.Read(sync[:]) if d.sync != sync && !errors.Is(d.reader.Error, io.EOF) { diff --git a/reader.go b/reader.go index 698a6846..80cdb550 100644 --- a/reader.go +++ b/reader.go @@ -109,6 +109,17 @@ func (r *Reader) readByte() byte { return b } +// Peek returns the next byte in the buffer. +// The Reader Error will be io.EOF if no next byte exists. +func (r *Reader) Peek() byte { + if r.head == r.tail { + if !r.loadMore() { + return 0 + } + } + return r.buf[r.head] +} + // Read reads data into the given bytes. func (r *Reader) Read(b []byte) { size := len(b) @@ -117,6 +128,7 @@ func (r *Reader) Read(b []byte) { for read < size { if r.head == r.tail { if !r.loadMore() { + r.Error = io.ErrUnexpectedEOF return } } @@ -138,6 +150,8 @@ func (r *Reader) ReadBool() bool { } // ReadInt reads an Int from the Reader. +// +//nolint:dupl func (r *Reader) ReadInt() int32 { if r.Error != nil { return 0 @@ -174,12 +188,15 @@ func (r *Reader) ReadInt() int32 { // We ran out of buffer and are not at the end of the int, // Read more into the buffer. if !r.loadMore() { + r.Error = fmt.Errorf("reading int: %w", r.Error) return 0 } } } // ReadLong reads a Long from the Reader. +// +//nolint:dupl func (r *Reader) ReadLong() int64 { if r.Error != nil { return 0 @@ -216,6 +233,7 @@ func (r *Reader) ReadLong() int64 { // We ran out of buffer and are not at the end of the long, // Read more into the buffer. if !r.loadMore() { + r.Error = fmt.Errorf("reading long: %w", r.Error) return 0 } } diff --git a/reader_test.go b/reader_test.go index ec8ef4e0..b9760216 100644 --- a/reader_test.go +++ b/reader_test.go @@ -3,6 +3,7 @@ package avro_test import ( "bytes" "errors" + "io" "strconv" "testing" @@ -44,6 +45,28 @@ func TestReader_ReportErrorExistingError(t *testing.T) { assert.Equal(t, err, r.Error) } +func TestReader_Peek(t *testing.T) { + r := (&avro.Reader{}).Reset([]byte{0x36}) + + b := r.Peek() + + i := r.ReadInt() + + require.NoError(t, r.Error) + assert.Equal(t, byte(0x36), b) + assert.Equal(t, int32(27), i) +} + +func TestReader_PeekNoData(t *testing.T) { + r := (&avro.Reader{}).Reset([]byte{0x36}) + + _ = r.ReadInt() + + _ = r.Peek() + + assert.ErrorIs(t, r.Error, io.EOF) +} + func TestReader_ReadPastBuffer(t *testing.T) { r := (&avro.Reader{}).Reset([]byte{0xE2}) @@ -77,7 +100,7 @@ func TestReader_Read(t *testing.T) { }, { name: "eof", - data: []byte{0xAC}, // io.EOF + data: []byte{0xAC}, // io.ErrUnexpectedEOF want: []byte{0xAC, 0x00, 0x00, 0x00, 0x00, 0x00}, wantErr: require.Error, }, @@ -124,7 +147,7 @@ func TestReader_ReadBool(t *testing.T) { }, { name: "eof", - data: []byte(nil), // io.EOF + data: []byte(nil), // io.ErrUnexpectedEOF want: false, wantErr: require.Error, },