-
Notifications
You must be signed in to change notification settings - Fork 10
/
Copy pathhedged_bench_test.go
107 lines (91 loc) · 2.14 KB
/
hedged_bench_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
package hedgedhttp_test
import (
"fmt"
"io"
"math/rand"
"net/http"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/cristalhq/hedgedhttp"
)
// localRandom pools *rand.Rand generators so that concurrent benchmark
// workers can each borrow a private generator instead of sharing one
// (a *rand.Rand built with rand.New is not safe for concurrent use).
//
// Every new generator gets its own seed: the nanosecond clock is mixed with a
// draw from the package-level source. Seeding from a one-second timestamp
// alone would hand the same seed, and therefore the same sequence, to every
// generator created within that second.
var localRandom = sync.Pool{
	New: func() interface{} {
		seed := time.Now().UnixNano() ^ rand.Int63()
		return rand.New(rand.NewSource(seed))
	},
}
// getLocalRand borrows a generator from the localRandom pool. The caller is
// expected to hand it back with returnLocalRand once finished. It panics if
// the pool yields anything other than a *rand.Rand.
func getLocalRand() *rand.Rand {
	pooled := localRandom.Get()
	return pooled.(*rand.Rand)
}
// returnLocalRand gives a generator obtained from getLocalRand back to the
// localRandom pool so a later caller can reuse it.
func returnLocalRand(rnd *rand.Rand) {
	var pooled interface{} = rnd
	localRandom.Put(pooled)
}
// FuncRoundTripper adapts a plain function to the RoundTrip method shape used
// by http.RoundTripper, which lets tests supply a transport inline.
type FuncRoundTripper struct {
	// f is invoked for every RoundTrip call and produces its result.
	f func(request *http.Request) (*http.Response, error)
}

// RoundTrip forwards req to the wrapped function and returns whatever that
// function returns, unchanged.
func (rt *FuncRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
	handler := rt.f
	return handler(req)
}
func BenchmarkHedgedRequest(b *testing.B) {
benchmarks := []struct {
concurrency int
}{
{concurrency: 1},
{concurrency: 2},
{concurrency: 4},
{concurrency: 8},
{concurrency: 12},
{concurrency: 16},
{concurrency: 24},
{concurrency: 32},
}
for _, bm := range benchmarks {
b.Run(fmt.Sprintf("concurrency-%v", bm.concurrency), func(b *testing.B) {
b.ReportAllocs()
target := &FuncRoundTripper{
f: func(request *http.Request) (*http.Response, error) {
rnd := getLocalRand()
defer returnLocalRand(rnd)
if rnd.Float32() < 0.3 {
return &http.Response{}, nil
}
return nil, io.EOF
},
}
errors := uint64(0)
var snapshot atomic.Value
hedgedTarget, metrics, err := hedgedhttp.NewRoundTripperAndStats(10*time.Nanosecond, 10, target)
mustOk(b, err)
initialSnapshot := metrics.ActualRoundTrips()
snapshot.Store(&initialSnapshot)
go func() {
ticker := time.NewTicker(1 * time.Millisecond)
defer ticker.Stop()
for range ticker.C {
currentSnapshot := metrics.ActualRoundTrips()
snapshot.Store(¤tSnapshot)
}
}()
req := newGetReq("whatever")
mustOk(b, err)
var wg sync.WaitGroup
wg.Add(bm.concurrency)
for i := 0; i < bm.concurrency; i++ {
go func() {
for i := 0; i < b.N/bm.concurrency; i++ {
_, err := hedgedTarget.RoundTrip(req)
if err != nil {
atomic.AddUint64(&errors, 1)
}
}
wg.Done()
}()
}
wg.Wait()
if rand.Float32() < 0.001 {
b.Logf("Snapshot: %+v\n", snapshot.Load())
}
})
}
}