diff --git a/.travis.yml b/.travis.yml index b62a50a..9291252 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,12 +1,25 @@ language: go +sudo: false + +os: + - linux + - osx + go: - - 1.3 - - 1.4 - - 1.5 - - 1.6 - - tip + - 1.5.x + - 1.6.x + - 1.7.x + - 1.8.x + - master script: + - go vet ./... - go test -v -cpu=2 ./... - go test -cpu=2 -short -race ./... + - diff <(gofmt -d .) <("") + +matrix: + allow_failures: + - go: 'master' + fast_finish: true diff --git a/README.md b/README.md index e90149f..e0f7c38 100644 --- a/README.md +++ b/README.md @@ -5,7 +5,7 @@ This package will allow you to add readhead to any reader. This means a separate This is helpful for splitting an input stream into concurrent processing, and also helps smooth out **bursts** of input or output. -This should be fully transparent, except that once an error has been returned from the Reader, it will not recover. +This should be fully transparent, except that once an error has been returned from the Reader, it will not recover. A panic will be caught and returned as an error. The readahead object also fulfills the [`io.WriterTo`](https://golang.org/pkg/io/#WriterTo) interface, which is likely to speed up `io.Copy` and other code that use the interface. diff --git a/reader.go b/reader.go index 5436eb2..1ad8801 100644 --- a/reader.go +++ b/reader.go @@ -131,7 +131,7 @@ func (a *reader) Read(p []byte) (n int, err error) { // Copy what we can n = copy(p, a.cur.buffer()) - a.cur.increment(n) + a.cur.inc(n) // If at end of buffer, return any error, if present if a.cur.isEmpty() { @@ -152,7 +152,7 @@ func (a *reader) WriteTo(w io.Writer) (n int64, err error) { return n, err } n2, err := w.Write(a.cur.buffer()) - a.cur.increment(n2) + a.cur.inc(n2) n += int64(n2) if err != nil { return n, err @@ -180,7 +180,7 @@ func (a *reader) Close() (err error) { return nil } -// Internal buffer +// Internal buffer representing a single read. // If an error is present, it must be returned // once all buffer content has been served. type buffer struct { @@ -195,7 +195,7 @@ func newBuffer(size int) *buffer { } // isEmpty returns true is offset is at end of -// buffer, or +// buffer, or if the buffer is nil func (b *buffer) isEmpty() bool { if b == nil { return true @@ -209,7 +209,13 @@ func (b *buffer) isEmpty() bool { // read into start of the buffer from the supplied reader, // resets the offset and updates the size of the buffer. // Any error encountered during the read is returned. -func (b *buffer) read(rd io.Reader) error { +func (b *buffer) read(rd io.Reader) (err error) { + defer func() { + if r := recover(); r != nil { + err = fmt.Errorf("panic reading: %v", r) + b.err = err + } + }() var n int n, b.err = rd.Read(b.buf[0:b.size]) b.buf = b.buf[0:n] @@ -222,7 +228,7 @@ func (b *buffer) buffer() []byte { return b.buf[b.offset:] } -// increment the offset -func (b *buffer) increment(n int) { +// inc will increment the read offset +func (b *buffer) inc(n int) { b.offset += n } diff --git a/reader_test.go b/reader_test.go index 666a3dd..ee95515 100644 --- a/reader_test.go +++ b/reader_test.go @@ -3,14 +3,15 @@ package readahead_test import ( "bufio" "bytes" + "errors" + "fmt" "io" "io/ioutil" "strings" + "sync" "testing" "testing/iotest" - "fmt" - "github.com/klauspost/readahead" ) @@ -172,6 +173,86 @@ func reads(buf io.Reader, m int) string { return string(b[0:nb]) } +type dummyReader struct { + readFN func([]byte) (int, error) +} + +func (d dummyReader) Read(dst []byte) (int, error) { + return d.readFN(dst) +} + +func TestReaderPanic(t *testing.T) { + r := dummyReader{readFN: func(dst []byte) (int, error) { + panic("some underlying panic") + }} + reader := readahead.NewReader(r) + defer reader.Close() + + // Copy the content to dst + var dst = &bytes.Buffer{} + _, err := io.Copy(dst, reader) + if err == nil { + t.Fatal("Want error, got nil") + } +} + +func TestReaderLatePanic(t *testing.T) { + var n int + var mu sync.Mutex + r := dummyReader{readFN: func(dst []byte) (int, error) { + mu.Lock() + defer mu.Unlock() + if n >= 10 { + panic("some underlying panic") + } + n++ + return len(dst), nil + }} + reader := readahead.NewReader(r) + defer reader.Close() + + // Copy the content to dst + var dst = &bytes.Buffer{} + _, err := io.Copy(dst, reader) + if err == nil { + t.Fatal("Want error, got nil") + } + mu.Lock() + if n < 10 { + t.Fatalf("Want at least 10 calls, got %v", n) + } + mu.Unlock() +} + +func TestReaderLateError(t *testing.T) { + var n int + var mu sync.Mutex + theErr := errors.New("some error") + r := dummyReader{readFN: func(dst []byte) (int, error) { + mu.Lock() + defer mu.Unlock() + if n >= 10 { + return 0, theErr + } + n++ + return len(dst), nil + }} + reader := readahead.NewReader(r) + defer reader.Close() + + // Copy the content to dst + var dst = &bytes.Buffer{} + _, err := io.Copy(dst, reader) + if err != theErr { + t.Fatalf("Want %#v, got %#v", theErr, err) + } + mu.Lock() + if n < 10 { + t.Fatalf("Want at least 10 calls, got %v", n) + } + mu.Unlock() +} + type bufReader struct { name string fn func(io.Reader) string