Skip to content

Commit

Permalink
Add ReadCloser input. (#8)
Browse files Browse the repository at this point in the history
* Add ReadCloser input.
* Update go version.
  • Loading branch information
klauspost committed Oct 7, 2017
1 parent d2a445e commit 7f90b27
Show file tree
Hide file tree
Showing 3 changed files with 139 additions and 2 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ os:
- osx

go:
- 1.5.x
- 1.6.x
- 1.7.x
- 1.8.x
- 1.9.x
- master

script:
Expand Down
50 changes: 49 additions & 1 deletion reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

type reader struct {
in io.Reader // Input reader
closer io.Closer // Optional closer
ready chan *buffer // Buffers ready to be handed to the reader
reuse chan *buffer // Buffers to reuse for input reading
exit chan struct{} // Closes when finished
Expand Down Expand Up @@ -51,7 +52,30 @@ func NewReader(rd io.Reader) io.ReadCloser {
return ret
}

// NewSize returns a reader with a custom number of buffers and size.
// New returns a reader that will asynchronously read from
// the supplied reader into 4 buffers of 1MB each.
//
// It will start reading from the input at once, maybe even before this
// function has returned.
//
// The input can be read from the returned reader.
// When done use Close() to release the buffers,
// which will also close the supplied closer.
func NewReadCloser(rd io.ReadCloser) io.ReadCloser {
if rd == nil {
return nil
}

ret, err := NewReadCloserSize(rd, 4, 1<<20)

// Should not be possible to trigger from other packages.
if err != nil {
panic("unexpected error:" + err.Error())
}
return ret
}

// NewReaderSize returns a reader with a custom number of buffers and size.
// buffers is the number of queued buffers and size is the size of each
// buffer in bytes.
func NewReaderSize(rd io.Reader, buffers, size int) (io.ReadCloser, error) {
Expand All @@ -69,6 +93,24 @@ func NewReaderSize(rd io.Reader, buffers, size int) (io.ReadCloser, error) {
return a, nil
}

// NewReadCloserSize returns a reader with a custom number of buffers and size.
// buffers is the number of queued buffers and size is the size of each
// buffer in bytes.
func NewReadCloserSize(rc io.ReadCloser, buffers, size int) (io.ReadCloser, error) {
if size <= 0 {
return nil, fmt.Errorf("buffer size too small")
}
if buffers <= 0 {
return nil, fmt.Errorf("number of buffers too small")
}
if rc == nil {
return nil, fmt.Errorf("nil input reader supplied")
}
a := &reader{closer: rc}
a.init(rc, buffers, size)
return a, nil
}

// initialize the reader
func (a *reader) init(rd io.Reader, buffers, size int) {
a.in = rd
Expand Down Expand Up @@ -177,6 +219,12 @@ func (a *reader) Close() (err error) {
case a.exit <- struct{}{}:
<-a.exited
}
if a.closer != nil {
// Only call once
c := a.closer
a.closer = nil
return c.Close()
}
return nil
}

Expand Down
89 changes: 89 additions & 0 deletions reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,95 @@ func TestReader(t *testing.T) {
}
}

type testCloser struct {
io.Reader
closed int
onClose error
}

func (t *testCloser) Close() error {
t.closed++
return t.onClose
}

func TestReadCloser(t *testing.T) {
buf := bytes.NewBufferString("Testbuffer")
cl := &testCloser{Reader: buf}
ar, err := readahead.NewReadCloserSize(cl, 4, 10000)
if err != nil {
t.Fatal("error when creating:", err)
}

var dst = make([]byte, 100)
n, err := ar.Read(dst)
if err != nil {
t.Fatal("error when reading:", err)
}
if n != 10 {
t.Fatal("unexpected length, expected 10, got ", n)
}

n, err = ar.Read(dst)
if err != io.EOF {
t.Fatal("expected io.EOF, got", err)
}
if n != 0 {
t.Fatal("unexpected length, expected 0, got ", n)
}

// Test read after error
n, err = ar.Read(dst)
if err != io.EOF {
t.Fatal("expected io.EOF, got", err)
}
if n != 0 {
t.Fatal("unexpected length, expected 0, got ", n)
}

err = ar.Close()
if err != nil {
t.Fatal("error when closing:", err)
}
if cl.closed != 1 {
t.Fatal("want close count 1, got:", cl.closed)
}
// Test Close without reading everything
buf = bytes.NewBuffer(make([]byte, 50000))
cl = &testCloser{Reader: buf}
ar = readahead.NewReadCloser(cl)
err = ar.Close()
if err != nil {
t.Fatal("error when closing:", err)
}
if cl.closed != 1 {
t.Fatal("want close count 1, got:", cl.closed)
}
// Test error forwarding
cl = &testCloser{Reader: buf, onClose: errors.New("an error")}
ar = readahead.NewReadCloser(cl)
err = ar.Close()
if err != cl.onClose {
t.Fatal("want error when closing, got", err)
}
if cl.closed != 1 {
t.Fatal("want close count 1, got:", cl.closed)
}
// Test multiple closes
cl = &testCloser{Reader: buf}
ar = readahead.NewReadCloser(cl)
err = ar.Close()
if err != nil {
t.Fatal("error when closing:", err)
}
err = ar.Close()
if err != nil {
t.Fatal("error when closing:", err)
}
if cl.closed != 1 {
t.Fatal("want close count 1, got:", cl.closed)
}
}

func TestWriteTo(t *testing.T) {
buf := bytes.NewBufferString("Testbuffer")
ar, err := readahead.NewReaderSize(buf, 4, 10000)
Expand Down

0 comments on commit 7f90b27

Please sign in to comment.