Skip to content

Commit

Permalink
fix: reader returns errors on unexpected EOF (#382)
Browse files Browse the repository at this point in the history
  • Loading branch information
nrwiersma authored Apr 22, 2024
1 parent f138d7f commit 141e857
Show file tree
Hide file tree
Showing 6 changed files with 108 additions and 16 deletions.
2 changes: 1 addition & 1 deletion codec_skip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func TestDecoder_SkipRecord(t *testing.T) {
func TestDecoder_SkipRef(t *testing.T) {
defer ConfigTeardown()

data := []byte{0x02, 0x66, 0x06, 0x66, 0x6f, 0x6f}
data := []byte{0x02, 0x66, 0x06, 0x66, 0x6f, 0x6f, 0x02, 0x66}
schema := `{
"type": "record",
"name": "test",
Expand Down
56 changes: 56 additions & 0 deletions decoder_native_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,20 @@ func TestDecoder_Int(t *testing.T) {
assert.Equal(t, 27, i)
}

func TestDecoder_IntShortRead(t *testing.T) {
defer ConfigTeardown()

data := []byte{0xe6}
schema := "int"
dec, err := avro.NewDecoder(schema, bytes.NewReader(data))
require.NoError(t, err)

var i int
err = dec.Decode(&i)

assert.Error(t, err)
}

func TestDecoder_IntInvalidSchema(t *testing.T) {
defer ConfigTeardown()

Expand Down Expand Up @@ -272,6 +286,20 @@ func TestDecoder_Int64(t *testing.T) {
assert.Equal(t, int64(27), i)
}

func TestDecoder_Int64ShortRead(t *testing.T) {
defer ConfigTeardown()

data := []byte{0xe6}
schema := "long"
dec, err := avro.NewDecoder(schema, bytes.NewReader(data))
require.NoError(t, err)

var i int64
err = dec.Decode(&i)

assert.Error(t, err)
}

func TestDecoder_Int64InvalidSchema(t *testing.T) {
defer ConfigTeardown()

Expand Down Expand Up @@ -359,6 +387,20 @@ func TestDecoder_String(t *testing.T) {
assert.Equal(t, "foo", str)
}

func TestDecoder_StringShortRead(t *testing.T) {
defer ConfigTeardown()

data := []byte{0x08}
schema := "string"
dec, err := avro.NewDecoder(schema, bytes.NewReader(data))
require.NoError(t, err)

var str string
err = dec.Decode(&str)

require.Error(t, err)
}

func TestDecoder_StringInvalidSchema(t *testing.T) {
defer ConfigTeardown()

Expand Down Expand Up @@ -388,6 +430,20 @@ func TestDecoder_Bytes(t *testing.T) {
assert.Equal(t, []byte{0xEC, 0xAB, 0x44, 0x00}, b)
}

func TestDecoder_BytesShortRead(t *testing.T) {
defer ConfigTeardown()

data := []byte{0x08, 0xEC}
schema := "bytes"
dec, err := avro.NewDecoder(schema, bytes.NewReader(data))
require.NoError(t, err)

var b []byte
err = dec.Decode(&b)

assert.Error(t, err)
}

func TestDecoder_BytesInvalidSchema(t *testing.T) {
defer ConfigTeardown()

Expand Down
13 changes: 0 additions & 13 deletions decoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,19 +68,6 @@ func TestDecoder_DecodeNilPtr(t *testing.T) {
assert.Error(t, err)
}

func TestDecoder_DecodeEOFDoesntReturnError(t *testing.T) {
defer ConfigTeardown()

data := []byte{0xE2}
schema := "int"
dec, _ := avro.NewDecoder(schema, bytes.NewReader(data))

var i int
err := dec.Decode(&i)

assert.NoError(t, err)
}

func TestUnmarshal(t *testing.T) {
defer ConfigTeardown()

Expand Down
8 changes: 8 additions & 0 deletions ocf/ocf.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,16 @@ func (d *Decoder) Error() error {
}

func (d *Decoder) readBlock() int64 {
_ = d.reader.Peek()
if errors.Is(d.reader.Error, io.EOF) {
// There is no next block
return 0
}

count := d.reader.ReadLong()
size := d.reader.ReadLong()

// Read the blocks data
if count > 0 {
data := make([]byte, size)
d.reader.Read(data)
Expand All @@ -129,6 +136,7 @@ func (d *Decoder) readBlock() int64 {
d.resetReader.Reset(data)
}

// Read the sync.
var sync [16]byte
d.reader.Read(sync[:])
if d.sync != sync && !errors.Is(d.reader.Error, io.EOF) {
Expand Down
18 changes: 18 additions & 0 deletions reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,17 @@ func (r *Reader) readByte() byte {
return b
}

// Peek returns the next byte in the buffer.
// The Reader Error will be io.EOF if no next byte exists.
func (r *Reader) Peek() byte {
if r.head == r.tail {
if !r.loadMore() {
return 0
}
}
return r.buf[r.head]
}

// Read reads data into the given bytes.
func (r *Reader) Read(b []byte) {
size := len(b)
Expand All @@ -117,6 +128,7 @@ func (r *Reader) Read(b []byte) {
for read < size {
if r.head == r.tail {
if !r.loadMore() {
r.Error = io.ErrUnexpectedEOF
return
}
}
Expand All @@ -138,6 +150,8 @@ func (r *Reader) ReadBool() bool {
}

// ReadInt reads an Int from the Reader.
//
//nolint:dupl
func (r *Reader) ReadInt() int32 {
if r.Error != nil {
return 0
Expand Down Expand Up @@ -174,12 +188,15 @@ func (r *Reader) ReadInt() int32 {
// We ran out of buffer and are not at the end of the int,
// Read more into the buffer.
if !r.loadMore() {
r.Error = fmt.Errorf("reading int: %w", r.Error)
return 0
}
}
}

// ReadLong reads a Long from the Reader.
//
//nolint:dupl
func (r *Reader) ReadLong() int64 {
if r.Error != nil {
return 0
Expand Down Expand Up @@ -216,6 +233,7 @@ func (r *Reader) ReadLong() int64 {
// We ran out of buffer and are not at the end of the long,
// Read more into the buffer.
if !r.loadMore() {
r.Error = fmt.Errorf("reading long: %w", r.Error)
return 0
}
}
Expand Down
27 changes: 25 additions & 2 deletions reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package avro_test
import (
"bytes"
"errors"
"io"
"strconv"
"testing"

Expand Down Expand Up @@ -44,6 +45,28 @@ func TestReader_ReportErrorExistingError(t *testing.T) {
assert.Equal(t, err, r.Error)
}

func TestReader_Peek(t *testing.T) {
r := (&avro.Reader{}).Reset([]byte{0x36})

b := r.Peek()

i := r.ReadInt()

require.NoError(t, r.Error)
assert.Equal(t, byte(0x36), b)
assert.Equal(t, int32(27), i)
}

func TestReader_PeekNoData(t *testing.T) {
r := (&avro.Reader{}).Reset([]byte{0x36})

_ = r.ReadInt()

_ = r.Peek()

assert.ErrorIs(t, r.Error, io.EOF)
}

func TestReader_ReadPastBuffer(t *testing.T) {
r := (&avro.Reader{}).Reset([]byte{0xE2})

Expand Down Expand Up @@ -77,7 +100,7 @@ func TestReader_Read(t *testing.T) {
},
{
name: "eof",
data: []byte{0xAC}, // io.EOF
data: []byte{0xAC}, // io.ErrUnexpectedEOF
want: []byte{0xAC, 0x00, 0x00, 0x00, 0x00, 0x00},
wantErr: require.Error,
},
Expand Down Expand Up @@ -124,7 +147,7 @@ func TestReader_ReadBool(t *testing.T) {
},
{
name: "eof",
data: []byte(nil), // io.EOF
data: []byte(nil), // io.ErrUnexpectedEOF
want: false,
wantErr: require.Error,
},
Expand Down

0 comments on commit 141e857

Please sign in to comment.