diff --git a/.travis.yml b/.travis.yml index 86b80c47..7619aae8 100644 --- a/.travis.yml +++ b/.travis.yml @@ -7,11 +7,11 @@ os: - osx go: - - 1.3 - - 1.4 - 1.5 - 1.6 - - tip + - 1.7 + - 1.8 + - master install: - go get ./... @@ -26,3 +26,8 @@ script: - go build examples/stream-decoder.go - go build examples/stream-encoder.go - diff <(gofmt -d .) <("") + +matrix: + allow_failures: + - go: 'master' + fast_finish: true diff --git a/README.md b/README.md index 3f5577f3..1016256d 100644 --- a/README.md +++ b/README.md @@ -162,6 +162,18 @@ There is no buffering or timeouts/retry specified. If you want to add that, you For complete examples of a streaming encoder and decoder see the [examples folder](https://github.com/klauspost/reedsolomon/tree/master/examples). +#Advanced Options + +You can modify internal options which affects how jobs are split between and processed by goroutines. + +To create options, use the WithXXX functions. You can supply options to `New`, `NewStream` and `NewStreamC`. If no Options are supplied, default options are used. + +Example of how to supply options: + + ```Go + enc, err := reedsolomon.New(10, 3, WithMaxGoroutines(25)) + ``` + # Performance Performance depends mainly on the number of parity shards. In rough terms, doubling the number of parity shards will double the encoding time. diff --git a/galois_amd64.go b/galois_amd64.go index e4d686e7..bb99ea65 100644 --- a/galois_amd64.go +++ b/galois_amd64.go @@ -5,10 +5,6 @@ package reedsolomon -import ( - "github.com/klauspost/cpuid" -) - //go:noescape func galMulSSSE3(low, high, in, out []byte) @@ -40,12 +36,12 @@ func galMulSSSE3Xor(low, high, in, out []byte) { } */ -func galMulSlice(c byte, in, out []byte) { +func galMulSlice(c byte, in, out []byte, ssse3, avx2 bool) { var done int - if cpuid.CPU.AVX2() { + if avx2 { galMulAVX2(mulTableLow[c][:], mulTableHigh[c][:], in, out) done = (len(in) >> 5) << 5 - } else if cpuid.CPU.SSSE3() { + } else if ssse3 { galMulSSSE3(mulTableLow[c][:], mulTableHigh[c][:], in, out) done = (len(in) >> 4) << 4 } @@ -58,12 +54,12 @@ func galMulSlice(c byte, in, out []byte) { } } -func galMulSliceXor(c byte, in, out []byte) { +func galMulSliceXor(c byte, in, out []byte, ssse3, avx2 bool) { var done int - if cpuid.CPU.AVX2() { + if avx2 { galMulAVX2Xor(mulTableLow[c][:], mulTableHigh[c][:], in, out) done = (len(in) >> 5) << 5 - } else if cpuid.CPU.SSSE3() { + } else if ssse3 { galMulSSSE3Xor(mulTableLow[c][:], mulTableHigh[c][:], in, out) done = (len(in) >> 4) << 4 } diff --git a/galois_noasm.go b/galois_noasm.go index 1c6b8c4d..be90a331 100644 --- a/galois_noasm.go +++ b/galois_noasm.go @@ -4,14 +4,14 @@ package reedsolomon -func galMulSlice(c byte, in, out []byte) { +func galMulSlice(c byte, in, out []byte, ssse3, avx2 bool) { mt := mulTable[c] for n, input := range in { out[n] = mt[input] } } -func galMulSliceXor(c byte, in, out []byte) { +func galMulSliceXor(c byte, in, out []byte, ssse3, avx2 bool) { mt := mulTable[c] for n, input := range in { out[n] ^= mt[input] diff --git a/galois_test.go b/galois_test.go index be12aecc..2b91f686 100644 --- a/galois_test.go +++ b/galois_test.go @@ -131,13 +131,13 @@ func TestGalois(t *testing.T) { // Test slices (>16 entries to test assembler) in := []byte{0, 1, 2, 3, 4, 5, 6, 10, 50, 100, 150, 174, 201, 255, 99, 32, 67, 85} out := make([]byte, len(in)) - galMulSlice(25, in, out) + galMulSlice(25, in, out, false, false) expect := []byte{0x0, 0x19, 0x32, 0x2b, 0x64, 0x7d, 0x56, 0xfa, 0xb8, 0x6d, 0xc7, 0x85, 0xc3, 0x1f, 0x22, 0x7, 0x25, 0xfe} if 0 != bytes.Compare(out, expect) { t.Errorf("got %#v, expected %#v", out, expect) } - galMulSlice(177, in, out) + galMulSlice(177, in, out, false, false) expect = []byte{0x0, 0xb1, 0x7f, 0xce, 0xfe, 0x4f, 0x81, 0x9e, 0x3, 0x6, 0xe8, 0x75, 0xbd, 0x40, 0x36, 0xa3, 0x95, 0xcb} if 0 != bytes.Compare(out, expect) { t.Errorf("got %#v, expected %#v", out, expect) diff --git a/options.go b/options.go new file mode 100644 index 00000000..9f285fce --- /dev/null +++ b/options.go @@ -0,0 +1,67 @@ +package reedsolomon + +import ( + "runtime" + + "github.com/klauspost/cpuid" +) + +// Option allows to override processing parameters. +type Option func(*options) + +type options struct { + maxGoroutines int + minSplitSize int + useAVX2, useSSSE3 bool +} + +var defaultOptions = options{ + maxGoroutines: 50, + minSplitSize: 512, +} + +func init() { + if runtime.GOMAXPROCS(0) <= 1 { + defaultOptions.maxGoroutines = 1 + } + // Detect CPU capabilities. + defaultOptions.useSSSE3 = cpuid.CPU.SSSE3() + defaultOptions.useAVX2 = cpuid.CPU.AVX2() +} + +// WithMaxGoroutines is the maximum number of goroutines number for encoding & decoding. +// Jobs will be split into this many parts, unless each goroutine would have to process +// less than minSplitSize bytes (set with WithMinSplitSize). +// For the best speed, keep this well above the GOMAXPROCS number for more fine grained +// scheduling. +// If n <= 0, it is ignored. +func WithMaxGoroutines(n int) Option { + return func(o *options) { + if n > 0 { + o.maxGoroutines = n + } + } +} + +// MinSplitSize Is the minimum encoding size in bytes per goroutine. +// See WithMaxGoroutines on how jobs are split. +// If n <= 0, it is ignored. +func WithMinSplitSize(n int) Option { + return func(o *options) { + if n > 0 { + o.maxGoroutines = n + } + } +} + +func withSSE3(enabled bool) Option { + return func(o *options) { + o.useSSSE3 = enabled + } +} + +func withAVX2(enabled bool) Option { + return func(o *options) { + o.useAVX2 = enabled + } +} diff --git a/reedsolomon.go b/reedsolomon.go index 914ebe0a..33b5c2de 100644 --- a/reedsolomon.go +++ b/reedsolomon.go @@ -15,7 +15,6 @@ import ( "bytes" "errors" "io" - "runtime" "sync" ) @@ -83,6 +82,7 @@ type reedSolomon struct { m matrix tree inversionTree parity [][]byte + o options } // ErrInvShardNum will be returned by New, if you attempt to create @@ -98,13 +98,18 @@ var ErrMaxShardNum = errors.New("cannot create Encoder with 255 or more data+par // the number of data shards and parity shards that // you want to use. You can reuse this encoder. // Note that the maximum number of data shards is 256. -func New(dataShards, parityShards int) (Encoder, error) { +// If no options are supplied, default options are used. +func New(dataShards, parityShards int, opts ...Option) (Encoder, error) { r := reedSolomon{ DataShards: dataShards, ParityShards: parityShards, Shards: dataShards + parityShards, + o: defaultOptions, } + for _, opt := range opts { + opt(&r.o) + } if dataShards <= 0 || parityShards <= 0 { return nil, ErrInvShardNum } @@ -201,7 +206,7 @@ func (r reedSolomon) Verify(shards [][]byte) (bool, error) { // number of matrix rows used, is determined by // outputCount, which is the number of outputs to compute. func (r reedSolomon) codeSomeShards(matrixRows, inputs, outputs [][]byte, outputCount, byteCount int) { - if runtime.GOMAXPROCS(0) > 1 && len(inputs[0]) > minSplitSize { + if r.o.maxGoroutines > 1 && byteCount > r.o.minSplitSize { r.codeSomeShardsP(matrixRows, inputs, outputs, outputCount, byteCount) return } @@ -209,26 +214,21 @@ func (r reedSolomon) codeSomeShards(matrixRows, inputs, outputs [][]byte, output in := inputs[c] for iRow := 0; iRow < outputCount; iRow++ { if c == 0 { - galMulSlice(matrixRows[iRow][c], in, outputs[iRow]) + galMulSlice(matrixRows[iRow][c], in, outputs[iRow], r.o.useSSSE3, r.o.useAVX2) } else { - galMulSliceXor(matrixRows[iRow][c], in, outputs[iRow]) + galMulSliceXor(matrixRows[iRow][c], in, outputs[iRow], r.o.useSSSE3, r.o.useAVX2) } } } } -const ( - minSplitSize = 512 // min split size per goroutine - maxGoroutines = 50 // max goroutines number for encoding & decoding -) - // Perform the same as codeSomeShards, but split the workload into // several goroutines. func (r reedSolomon) codeSomeShardsP(matrixRows, inputs, outputs [][]byte, outputCount, byteCount int) { var wg sync.WaitGroup - do := byteCount / maxGoroutines - if do < minSplitSize { - do = minSplitSize + do := byteCount / r.o.maxGoroutines + if do < r.o.minSplitSize { + do = r.o.minSplitSize } start := 0 for start < byteCount { @@ -241,9 +241,9 @@ func (r reedSolomon) codeSomeShardsP(matrixRows, inputs, outputs [][]byte, outpu in := inputs[c] for iRow := 0; iRow < outputCount; iRow++ { if c == 0 { - galMulSlice(matrixRows[iRow][c], in[start:stop], outputs[iRow][start:stop]) + galMulSlice(matrixRows[iRow][c], in[start:stop], outputs[iRow][start:stop], r.o.useSSSE3, r.o.useAVX2) } else { - galMulSliceXor(matrixRows[iRow][c], in[start:stop], outputs[iRow][start:stop]) + galMulSliceXor(matrixRows[iRow][c], in[start:stop], outputs[iRow][start:stop], r.o.useSSSE3, r.o.useAVX2) } } } @@ -258,13 +258,36 @@ func (r reedSolomon) codeSomeShardsP(matrixRows, inputs, outputs [][]byte, outpu // except this will check values and return // as soon as a difference is found. func (r reedSolomon) checkSomeShards(matrixRows, inputs, toCheck [][]byte, outputCount, byteCount int) bool { + if r.o.maxGoroutines > 1 && byteCount > r.o.minSplitSize { + return r.checkSomeShardsP(matrixRows, inputs, toCheck, outputCount, byteCount) + } + outputs := make([][]byte, len(toCheck)) + for i := range outputs { + outputs[i] = make([]byte, byteCount) + } + for c := 0; c < r.DataShards; c++ { + in := inputs[c] + for iRow := 0; iRow < outputCount; iRow++ { + galMulSliceXor(matrixRows[iRow][c], in, outputs[iRow], r.o.useSSSE3, r.o.useAVX2) + } + } + + for i, calc := range outputs { + if !bytes.Equal(calc, toCheck[i]) { + return false + } + } + return true +} + +func (r reedSolomon) checkSomeShardsP(matrixRows, inputs, toCheck [][]byte, outputCount, byteCount int) bool { same := true var mu sync.RWMutex // For above var wg sync.WaitGroup - do := byteCount / maxGoroutines - if do < minSplitSize { - do = minSplitSize + do := byteCount / r.o.maxGoroutines + if do < r.o.minSplitSize { + do = r.o.minSplitSize } start := 0 for start < byteCount { @@ -287,7 +310,7 @@ func (r reedSolomon) checkSomeShards(matrixRows, inputs, toCheck [][]byte, outpu mu.RUnlock() in := inputs[c][start : start+do] for iRow := 0; iRow < outputCount; iRow++ { - galMulSliceXor(matrixRows[iRow][c], in, outputs[iRow]) + galMulSliceXor(matrixRows[iRow][c], in, outputs[iRow], r.o.useSSSE3, r.o.useAVX2) } } diff --git a/reedsolomon_test.go b/reedsolomon_test.go index d9c876e5..82ac7a4c 100644 --- a/reedsolomon_test.go +++ b/reedsolomon_test.go @@ -14,9 +14,43 @@ import ( "testing" ) +func testOpts() [][]Option { + if !testing.Short() { + return [][]Option{} + } + opts := [][]Option{ + {WithMaxGoroutines(1), WithMinSplitSize(500), withSSE3(false), withAVX2(false)}, + {WithMaxGoroutines(5000), WithMinSplitSize(50), withSSE3(false), withAVX2(false)}, + {WithMaxGoroutines(5000), WithMinSplitSize(500000), withSSE3(false), withAVX2(false)}, + {WithMaxGoroutines(1), WithMinSplitSize(500000), withSSE3(false), withAVX2(false)}, + } + for _, o := range opts[:] { + if defaultOptions.useSSSE3 { + n := make([]Option, len(o), len(o)+1) + copy(n, o) + n = append(n, withSSE3(true)) + opts = append(opts, n) + } + if defaultOptions.useAVX2 { + n := make([]Option, len(o), len(o)+1) + copy(n, o) + n = append(n, withAVX2(true)) + opts = append(opts, n) + } + } + return opts +} + func TestEncoding(t *testing.T) { + testEncoding(t) + for _, o := range testOpts() { + testEncoding(t, o...) + } +} + +func testEncoding(t *testing.T, o ...Option) { perShard := 50000 - r, err := New(10, 3) + r, err := New(10, 3, o...) if err != nil { t.Fatal(err) } @@ -56,8 +90,15 @@ func TestEncoding(t *testing.T) { } func TestReconstruct(t *testing.T) { + testReconstruct(t) + for _, o := range testOpts() { + testReconstruct(t, o...) + } +} + +func testReconstruct(t *testing.T, o ...Option) { perShard := 50000 - r, err := New(10, 3) + r, err := New(10, 3, o...) if err != nil { t.Fatal(err) } @@ -122,8 +163,15 @@ func TestReconstruct(t *testing.T) { } func TestVerify(t *testing.T) { + testVerify(t) + for _, o := range testOpts() { + testVerify(t, o...) + } +} + +func testVerify(t *testing.T, o ...Option) { perShard := 33333 - r, err := New(10, 4) + r, err := New(10, 4, o...) if err != nil { t.Fatal(err) } @@ -536,14 +584,27 @@ func BenchmarkReconstructP10x4x16M(b *testing.B) { } func TestEncoderReconstruct(t *testing.T) { + testEncoderReconstruct(t) + for _, o := range testOpts() { + testEncoderReconstruct(t, o...) + } +} + +func testEncoderReconstruct(t *testing.T, o ...Option) { // Create some sample data var data = make([]byte, 250000) fillRandom(data) // Create 5 data slices of 50000 elements each - enc, _ := New(5, 3) - shards, _ := enc.Split(data) - err := enc.Encode(shards) + enc, err := New(5, 3, o...) + if err != nil { + t.Fatal(err) + } + shards, err := enc.Split(data) + if err != nil { + t.Fatal(err) + } + err = enc.Encode(shards) if err != nil { t.Fatal(err) } diff --git a/streaming.go b/streaming.go index 293a8b12..0db01ceb 100644 --- a/streaming.go +++ b/streaming.go @@ -145,8 +145,8 @@ type rsStream struct { // the number of data shards and parity shards that // you want to use. You can reuse this encoder. // Note that the maximum number of data shards is 256. -func NewStream(dataShards, parityShards int) (StreamEncoder, error) { - enc, err := New(dataShards, parityShards) +func NewStream(dataShards, parityShards int, o ...Option) (StreamEncoder, error) { + enc, err := New(dataShards, parityShards, o...) if err != nil { return nil, err } @@ -161,8 +161,8 @@ func NewStream(dataShards, parityShards int) (StreamEncoder, error) { // the number of data shards and parity shards given. // // This functions as 'NewStream', but allows you to enable CONCURRENT reads and writes. -func NewStreamC(dataShards, parityShards int, conReads, conWrites bool) (StreamEncoder, error) { - enc, err := New(dataShards, parityShards) +func NewStreamC(dataShards, parityShards int, conReads, conWrites bool, o ...Option) (StreamEncoder, error) { + enc, err := New(dataShards, parityShards, o...) if err != nil { return nil, err }