-
Notifications
You must be signed in to change notification settings - Fork 0
/
reader.go
147 lines (127 loc) · 3.86 KB
/
reader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
// Copyright 2015 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package recordio
import (
"bytes"
"encoding/binary"
"fmt"
"io"
)
var (
	// ErrFrameTooLarge reports a frame whose payload size (not counting the
	// frame header) exceeds the permitted maximum. Split also returns it when a
	// record's declared size is larger than the data remaining in its buffer.
	ErrFrameTooLarge = fmt.Errorf("frame: frame size exceeds maximum")
)
// Reader reads individual frames from a frame-formatted input stream.
type Reader interface {
	// ReadFrame reads the next frame, returning the frame's size and an io.Reader
	// for that frame's data. The io.Reader is restricted such that it cannot read
	// past the frame.
	//
	// The frame must be fully read before another Reader call can be made.
	// Failure to do so will cause the Reader to become unsynchronized.
	ReadFrame() (int64, io.Reader, error)

	// ReadFrameAll reads the next frame and returns its contents. If there are
	// no more frames available, ReadFrameAll will return io.EOF.
	ReadFrameAll() ([]byte, error)
}
// reader is the Reader implementation returned by NewReader. Frame-size
// headers are decoded through the embedded io.ByteReader, and frame payloads
// are read through the embedded io.Reader.
//
// Both embedded values must draw from the same underlying source; otherwise
// headers and payloads would be taken from different stream positions.
type reader struct {
	// Reader supplies frame payload bytes.
	io.Reader
	// ByteReader supplies the bytes of each uvarint frame-size header.
	io.ByteReader

	// maxSize is the largest frame payload, in bytes, that ReadFrame accepts.
	maxSize int64
}
// NewReader returns a Reader that decodes frames from r. ReadFrame on the
// result rejects any frame whose payload exceeds maxSize bytes with
// ErrFrameTooLarge.
//
// When r also implements io.ByteReader, its ReadByte method is used directly
// to decode frame headers; otherwise r is wrapped in a one-byte-at-a-time
// adapter for that purpose.
func NewReader(r io.Reader, maxSize int64) Reader {
	fr := &reader{Reader: r, maxSize: maxSize}
	switch br := r.(type) {
	case io.ByteReader:
		fr.ByteReader = br
	default:
		fr.ByteReader = &simpleByteReader{Reader: r}
	}
	return fr
}
// ReadFrame implements Reader. It decodes the uvarint size header of the next
// frame, rejects sizes above maxSize with ErrFrameTooLarge, and returns a
// reader bounded to exactly that many bytes of the underlying io.Reader.
func (r *reader) ReadFrame() (int64, io.Reader, error) {
	size, err := binary.ReadUvarint(r)
	switch {
	case err != nil:
		return 0, nil, err
	case size > uint64(r.maxSize):
		return 0, nil, ErrFrameTooLarge
	}

	frameLen := int64(size)
	return frameLen, io.LimitReader(r.Reader, frameLen), nil
}
// ReadFrameAll implements Reader. It reads the next frame in its entirety.
//
// A zero-length frame yields a nil slice and a nil error. If there are no more
// frames, the error from ReadFrame (io.EOF) is returned. If the stream ends
// before the number of bytes announced by the frame header has been read,
// io.ErrUnexpectedEOF is returned. The header was already consumed, so io.EOF,
// which signals a clean end of the frame stream, would be misleading.
func (r *reader) ReadFrameAll() ([]byte, error) {
	count, fr, err := r.ReadFrame()
	if err != nil {
		return nil, err
	}
	if count == 0 {
		return nil, nil
	}

	// A single Read may legally return fewer bytes than requested. io.ReadFull
	// keeps reading until the whole frame is consumed. That avoids returning a
	// zero-padded frame and keeps the Reader aligned with the next header.
	data := make([]byte, count)
	if _, err := io.ReadFull(fr, data); err != nil {
		if err == io.EOF {
			err = io.ErrUnexpectedEOF
		}
		return nil, err
	}
	return data, nil
}
// simpleByteReader implements the io.ByteReader interface for an io.Reader.
// It never asks the underlying reader for more than one byte, so it consumes
// no input beyond the byte it returns.
type simpleByteReader struct {
	io.Reader
	buf [1]byte // scratch space reused across ReadByte calls
}

// ReadByte reads and returns the next byte from the underlying reader.
//
// It honors the io.ByteReader contract that a non-nil error means no byte was
// consumed:
//   - A Read that delivers the final byte together with io.EOF yields that
//     byte and a nil error; io.EOF is reported on the following call.
//   - Reads that return (0, nil) are retried instead of yielding a stale byte.
func (r *simpleByteReader) ReadByte() (byte, error) {
	if _, err := io.ReadFull(r.Reader, r.buf[:]); err != nil {
		return 0, err
	}
	return r.buf[0], nil
}
// Split splits the supplied buffer into its component records.
//
// This method implements zero-copy segmentation, so the individual records are
// slices of the original data set. Each record's capacity is clipped to its
// own length, so appending to a returned record reallocates rather than
// overwriting the bytes that follow it in data.
//
// If data is malformed, Split returns the records decoded before the problem
// together with the error:
//   - A record whose declared size exceeds the bytes remaining in data yields
//     ErrFrameTooLarge.
//   - A bad size prefix yields the error from binary.ReadUvarint.
func Split(data []byte) (records [][]byte, err error) {
	br := bytes.NewReader(data)
	for br.Len() > 0 {
		var size uint64
		if size, err = binary.ReadUvarint(br); err != nil {
			return records, err
		}
		if size > uint64(br.Len()) {
			return records, ErrFrameTooLarge
		}

		// Pull the record out of the original byte stream without copying.
		// Converting size to int is safe here: it is at most br.Len(), an int.
		start := len(data) - br.Len()
		end := start + int(size)
		records = append(records, data[start:end:end])

		if _, err := br.Seek(int64(size), io.SeekCurrent); err != nil {
			// The bounds check above guarantees this seek stays in range.
			panic(err)
		}
	}
	return records, nil
}