Skip to content

Commit

Permalink
Catch panics (#6)
Browse files Browse the repository at this point in the history
Recover from panics and return them as errors.

Fixes Issue #5.

Also: 

* Update travis script
* Minor stylish adjustments
  • Loading branch information
klauspost committed Jul 16, 2017
1 parent e2cfa52 commit d2a445e
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 15 deletions.
23 changes: 18 additions & 5 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,12 +1,25 @@
language: go

sudo: false

os:
- linux
- osx

go:
- 1.3
- 1.4
- 1.5
- 1.6
- tip
- 1.5.x
- 1.6.x
- 1.7.x
- 1.8.x
- master

script:
- go vet ./...
- go test -v -cpu=2 ./...
- go test -cpu=2 -short -race ./...
- diff <(gofmt -d .) <("")

matrix:
allow_failures:
- go: 'master'
fast_finish: true
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ This package will allow you to add readhead to any reader. This means a separate

This is helpful for splitting an input stream into concurrent processing, and also helps smooth out **bursts** of input or output.

This should be fully transparent, except that once an error has been returned from the Reader, it will not recover.
This should be fully transparent, except that once an error has been returned from the Reader, it will not recover. A panic will be caught and returned as an error.

The readahead object also fulfills the [`io.WriterTo`](https://golang.org/pkg/io/#WriterTo) interface, which is likely to speed up `io.Copy` and other code that use the interface.

Expand Down
20 changes: 13 additions & 7 deletions reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func (a *reader) Read(p []byte) (n int, err error) {

// Copy what we can
n = copy(p, a.cur.buffer())
a.cur.increment(n)
a.cur.inc(n)

// If at end of buffer, return any error, if present
if a.cur.isEmpty() {
Expand All @@ -152,7 +152,7 @@ func (a *reader) WriteTo(w io.Writer) (n int64, err error) {
return n, err
}
n2, err := w.Write(a.cur.buffer())
a.cur.increment(n2)
a.cur.inc(n2)
n += int64(n2)
if err != nil {
return n, err
Expand Down Expand Up @@ -180,7 +180,7 @@ func (a *reader) Close() (err error) {
return nil
}

// Internal buffer
// Internal buffer representing a single read.
// If an error is present, it must be returned
// once all buffer content has been served.
type buffer struct {
Expand All @@ -195,7 +195,7 @@ func newBuffer(size int) *buffer {
}

// isEmpty returns true is offset is at end of
// buffer, or
// buffer, or if the buffer is nil
func (b *buffer) isEmpty() bool {
if b == nil {
return true
Expand All @@ -209,7 +209,13 @@ func (b *buffer) isEmpty() bool {
// read into start of the buffer from the supplied reader,
// resets the offset and updates the size of the buffer.
// Any error encountered during the read is returned.
func (b *buffer) read(rd io.Reader) error {
func (b *buffer) read(rd io.Reader) (err error) {
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("panic reading: %v", r)
b.err = err
}
}()
var n int
n, b.err = rd.Read(b.buf[0:b.size])
b.buf = b.buf[0:n]
Expand All @@ -222,7 +228,7 @@ func (b *buffer) buffer() []byte {
return b.buf[b.offset:]
}

// increment the offset
func (b *buffer) increment(n int) {
// inc will increment the read offset
func (b *buffer) inc(n int) {
b.offset += n
}
85 changes: 83 additions & 2 deletions reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ package readahead_test
import (
"bufio"
"bytes"
"errors"
"fmt"
"io"
"io/ioutil"
"strings"
"sync"
"testing"
"testing/iotest"

"fmt"

"github.com/klauspost/readahead"
)

Expand Down Expand Up @@ -172,6 +173,86 @@ func reads(buf io.Reader, m int) string {
return string(b[0:nb])
}

type dummyReader struct {
readFN func([]byte) (int, error)
}

func (d dummyReader) Read(dst []byte) (int, error) {
return d.readFN(dst)
}

func TestReaderPanic(t *testing.T) {
r := dummyReader{readFN: func(dst []byte) (int, error) {
panic("some underlying panic")
}}
reader := readahead.NewReader(r)
defer reader.Close()

// Copy the content to dst
var dst = &bytes.Buffer{}
_, err := io.Copy(dst, reader)
if err == nil {
t.Fatal("Want error, got nil")
}
}

func TestReaderLatePanic(t *testing.T) {
var n int
var mu sync.Mutex
r := dummyReader{readFN: func(dst []byte) (int, error) {
mu.Lock()
defer mu.Unlock()
if n >= 10 {
panic("some underlying panic")
}
n++
return len(dst), nil
}}
reader := readahead.NewReader(r)
defer reader.Close()

// Copy the content to dst
var dst = &bytes.Buffer{}
_, err := io.Copy(dst, reader)
if err == nil {
t.Fatal("Want error, got nil")
}
mu.Lock()
if n < 10 {
t.Fatalf("Want at least 10 calls, got %v", n)
}
mu.Unlock()
}

func TestReaderLateError(t *testing.T) {
var n int
var mu sync.Mutex
theErr := errors.New("some error")
r := dummyReader{readFN: func(dst []byte) (int, error) {
mu.Lock()
defer mu.Unlock()
if n >= 10 {
return 0, theErr
}
n++
return len(dst), nil
}}
reader := readahead.NewReader(r)
defer reader.Close()

// Copy the content to dst
var dst = &bytes.Buffer{}
_, err := io.Copy(dst, reader)
if err != theErr {
t.Fatalf("Want %#v, got %#v", theErr, err)
}
mu.Lock()
if n < 10 {
t.Fatalf("Want at least 10 calls, got %v", n)
}
mu.Unlock()
}

type bufReader struct {
name string
fn func(io.Reader) string
Expand Down

0 comments on commit d2a445e

Please sign in to comment.