-
Notifications
You must be signed in to change notification settings - Fork 0
/
pool_test.go
100 lines (87 loc) · 2.7 KB
/
pool_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
//go:build integration
package goomerang_test
import (
"testing"
"github.com/stretchr/testify/require"
"go.eloylp.dev/goomerang/client"
"go.eloylp.dev/goomerang/internal/test"
"go.eloylp.dev/goomerang/message"
"go.eloylp.dev/goomerang/server"
)
// TestWorkerPoolUsage runs WorkerPoolTest once per max concurrency setting.
// The cases expect no worker pool activity for a max concurrency of 0 and 1,
// and worker start/end hooks to fire for 2 and 3.
func TestWorkerPoolUsage(t *testing.T) {
	cases := []struct {
		name           string
		maxConcurrency int
		shouldBeActive bool
	}{
		{"Pool not running with concurrency 0", 0, false},
		{"Pool not running with concurrency 1", 1, false},
		{"Pool running with concurrency 2", 2, true},
		{"Pool running with concurrency 3", 3, true},
	}
	for _, tc := range cases {
		t.Run(tc.name, WorkerPoolTest(tc.maxConcurrency, tc.shouldBeActive))
	}
}
// WorkerPoolTest builds a subtest that wires a server and a client with the
// given maxConcurrency, each with worker start/end hooks that record a fact
// in a shared arbiter. It sends one message from the client and broadcasts
// one from the server, then checks the recorded facts:
//
//   - shouldBeActive == true: the STARTED fact must precede the ENDED fact,
//     on both the server and the client side.
//   - shouldBeActive == false: the arbiter must have recorded no events.
func WorkerPoolTest(maxConcurrency int, shouldBeActive bool) func(t *testing.T) {
	return func(t *testing.T) {
		facts := test.NewArbiter(t)

		// Hooks only record that they were invoked; the assertions at the end
		// of the test inspect what was recorded.
		onServerWorkerStart := func() { facts.ItsAFactThat("SERVER_POOL_WORKER_STARTED") }
		onServerWorkerEnd := func() { facts.ItsAFactThat("SERVER_POOL_WORKER_ENDED") }
		onClientWorkerStart := func() { facts.ItsAFactThat("CLIENT_POOL_WORKER_STARTED") }
		onClientWorkerEnd := func() { facts.ItsAFactThat("CLIENT_POOL_WORKER_ENDED") }

		srv, startServer := Server(t,
			server.WithMaxConcurrency(maxConcurrency),
			server.WithOnWorkerStart(onServerWorkerStart),
			server.WithOnWorkerEnd(onServerWorkerEnd),
		)
		srv.Handle(defaultMsg().Payload, nilHandler)
		startServer()
		defer srv.Shutdown(defaultCtx)

		cli, connect := Client(t,
			client.WithServerAddr(srv.Addr()),
			client.WithMaxConcurrency(maxConcurrency),
			client.WithOnWorkerStart(onClientWorkerStart),
			client.WithOnWorkerEnd(onClientWorkerEnd),
		)
		cli.Handle(defaultMsg().Payload, nilHandler)
		connect()
		defer cli.Close(defaultCtx)

		// One message in each direction, so both sides get to handle one.
		_, sendErr := cli.Send(defaultMsg())
		require.NoError(t, sendErr)
		_, broadcastErr := srv.Broadcast(defaultCtx, defaultMsg())
		require.NoError(t, broadcastErr)

		if !shouldBeActive {
			facts.RequireNoEvents()
			return
		}
		facts.RequireHappenedInOrder(
			"SERVER_POOL_WORKER_STARTED",
			"SERVER_POOL_WORKER_ENDED",
		)
		facts.RequireHappenedInOrder(
			"CLIENT_POOL_WORKER_STARTED",
			"CLIENT_POOL_WORKER_ENDED",
		)
	}
}
// TestSendVariousMessagesWithNoConcurrency configures both the server and the
// client with a max concurrency of 0 and sends two messages from the client.
// The server handler records each received message and sends it back; the
// client handler records each message it receives. The test then requires no
// arbiter errors and exactly two receptions on each side.
func TestSendVariousMessagesWithNoConcurrency(t *testing.T) {
	facts := test.NewArbiter(t)

	srv, startServer := Server(t, server.WithMaxConcurrency(0))
	defer srv.Shutdown(defaultCtx)
	echoHandler := message.HandlerFunc(func(sender message.Sender, msg *message.Message) {
		facts.ItsAFactThat("SERVER_RECEIVED_MSG")
		// The send result is discarded here; the client-side count asserted
		// at the end of the test is what checks that the echoes arrived.
		_, _ = sender.Send(msg)
	})
	srv.Handle(defaultMsg().Payload, echoHandler)
	startServer()

	cli, connect := Client(t,
		client.WithServerAddr(srv.Addr()),
		client.WithMaxConcurrency(0),
	)
	defer cli.Close(defaultCtx)
	recordHandler := message.HandlerFunc(func(_ message.Sender, _ *message.Message) {
		facts.ItsAFactThat("CLIENT_RECEIVED_MSG")
	})
	cli.Handle(defaultMsg().Payload, recordHandler)
	connect()

	// Send the same message twice, failing fast on the first send error.
	for i := 0; i < 2; i++ {
		_, err := cli.Send(defaultMsg())
		require.NoError(t, err)
	}

	facts.RequireNoErrors()
	facts.RequireHappenedTimes("CLIENT_RECEIVED_MSG", 2)
	facts.RequireHappenedTimes("SERVER_RECEIVED_MSG", 2)
}