-
-
Notifications
You must be signed in to change notification settings - Fork 66
/
result_test.go
111 lines (81 loc) · 2.41 KB
/
result_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package pond
import (
"errors"
"sync/atomic"
"testing"
"github.com/alitto/pond/v2/internal/assert"
)
// TestResultPoolSubmitAndWait submits a single task that returns a value and
// checks that Wait yields that value together with a nil error.
func TestResultPoolSubmitAndWait(t *testing.T) {
	p := NewResultPool[int](1000)
	defer p.StopAndWait()

	const want = 5

	got, waitErr := p.Submit(func() int { return want }).Wait()

	assert.Equal(t, nil, waitErr)
	assert.Equal(t, want, got)
}
// TestResultPoolSubmitTaskWithPanic submits a task that panics and checks that
// Wait reports an error matching ErrPanic, carrying the panic message, with the
// zero value of the result type as output.
func TestResultPoolSubmitTaskWithPanic(t *testing.T) {
	pool := NewResultPool[int](1000)
	// Shut the pool down when the test ends so it does not outlive the test.
	defer pool.StopAndWait()

	task := pool.Submit(func() int {
		panic("dummy panic")
	})

	output, err := task.Wait()

	// Fail fast on a nil error: calling err.Error() below on a nil error would
	// panic inside the test instead of reporting an ordinary test failure.
	if err == nil {
		t.Fatal("expected task.Wait to return an error for a panicking task, got nil")
	}

	assert.True(t, errors.Is(err, ErrPanic))
	assert.Equal(t, "task panicked: dummy panic", err.Error())
	assert.Equal(t, 0, output)
}
// TestResultPoolMetrics checks the pool's counters before any work is
// submitted and again after a batch of tasks has run to completion. Tasks with
// an even index succeed and tasks with an odd index return an error, so the
// batch is split evenly between successful and failed tasks.
func TestResultPoolMetrics(t *testing.T) {
	const taskCount = 10000

	pool := NewResultPool[int](1000)

	// A freshly created pool must report zero for every counter.
	assert.Equal(t, int64(0), pool.RunningWorkers())
	assert.Equal(t, uint64(0), pool.SubmittedTasks())
	assert.Equal(t, uint64(0), pool.CompletedTasks())
	assert.Equal(t, uint64(0), pool.FailedTasks())
	assert.Equal(t, uint64(0), pool.SuccessfulTasks())
	assert.Equal(t, uint64(0), pool.WaitingTasks())

	var ran atomic.Int64

	for idx := 0; idx < taskCount; idx++ {
		idx := idx // per-iteration copy for the closure below
		pool.SubmitErr(func() (int, error) {
			ran.Add(1)
			if idx%2 != 0 {
				return 0, errors.New("sample error")
			}
			return idx, nil
		})
	}

	// Stop the pool and block until the stop task completes before reading the counters.
	pool.Stop().Wait()

	assert.Equal(t, int64(taskCount), ran.Load())
	assert.Equal(t, int64(0), pool.RunningWorkers())
	assert.Equal(t, uint64(taskCount), pool.SubmittedTasks())
	assert.Equal(t, uint64(taskCount), pool.CompletedTasks())
	assert.Equal(t, uint64(taskCount/2), pool.FailedTasks())
	assert.Equal(t, uint64(taskCount/2), pool.SuccessfulTasks())
}
// TestResultPoolSubpool creates a subpool from a result pool, submits a batch
// of tasks to the subpool, and checks that every task has executed once the
// subpool has been stopped and waited on.
func TestResultPoolSubpool(t *testing.T) {
	const submissions = 100

	parent := NewResultPool[int](1000)
	child := parent.NewSubpool(10)

	var ran atomic.Int64

	for n := 0; n < submissions; n++ {
		n := n // per-iteration copy for the closure below
		child.SubmitErr(func() (int, error) {
			ran.Add(1)
			return n, nil
		})
	}

	child.StopAndWait()

	assert.Equal(t, int64(submissions), ran.Load())
}
// TestResultSubpoolMaxConcurrency checks how NewSubpool handles its
// maxConcurrency argument on a parent pool created with a limit of 10: a
// negative value and a value above the parent's limit must each panic with the
// expected message, and 0 must yield a subpool reporting the parent's limit.
func TestResultSubpoolMaxConcurrency(t *testing.T) {
	parent := NewResultPool[int](10)

	createNegative := func() {
		parent.NewSubpool(-1)
	}
	createOversized := func() {
		parent.NewSubpool(11)
	}

	assert.PanicsWithError(t, "maxConcurrency must be greater or equal to 0", createNegative)
	assert.PanicsWithError(t, "maxConcurrency cannot be greater than the parent pool's maxConcurrency (10)", createOversized)

	inherited := parent.NewSubpool(0)
	assert.Equal(t, 10, inherited.MaxConcurrency())
}