-
Notifications
You must be signed in to change notification settings - Fork 3
/
agentctl.go
164 lines (139 loc) · 4.85 KB
/
agentctl.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
// Copyright 2019 Koninklijke KPN N.V.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"context"
"crypto/tls"
"time"
"github.com/sirupsen/logrus"
)
// agentController spawns load-test agents against a single MQTT broker and,
// via Control, tries to keep `target` of them running.
type agentController struct {
// active holds, for every agent created by spawn, the cancel func of that agent's context.
active map[*agent]context.CancelFunc
// brokerAddr is the broker address handed to mqttOpts for each agent.
brokerAddr string
// target is the number of agents Control tries to keep running.
target int
// creds supplies the user, password and clientID for each new agent.
creds MqttCredentials
// eventFunnel is told about agents being added/removed and is shared with message handlers and agents.
eventFunnel EventFunnel
// refSet, when non-nil, makes spawn use a validating message handler.
refSet ReferenceSet
// steps are used to build each agent's random scenario.
steps []Step
// interval is the delay between spawns when nothing is going wrong.
interval time.Duration
// tlsConfig is handed to mqttOpts for each agent.
tlsConfig *tls.Config
// disableMqttTLS is handed to mqttOpts; newAgentController does not set it, so it is false unless assigned elsewhere.
disableMqttTLS bool
// agentLogFormat is the log format configured through newAgentController.
agentLogFormat string
// targetUpdate is created by newAgentController.
// NOTE(review): nothing in the visible code sends on or receives from it; presumably reserved for SetNumAgents.
targetUpdate chan int
}
// newAgentController returns a controller that will spawn agents against brokerAddr,
// aiming to keep target agents running, spawning one every interval under normal conditions.
// Every other argument is stored as-is for use when agents are spawned.
// disableMqttTLS is not a parameter and is therefore left at its zero value (false).
func newAgentController(brokerAddr string, tlsConfig *tls.Config, target int, agentLogFormat string, interval time.Duration, creds MqttCredentials, eventFunnel EventFunnel, refSet ReferenceSet, steps []Step) *agentController {
	ctl := &agentController{
		brokerAddr:     brokerAddr,
		tlsConfig:      tlsConfig,
		target:         target,
		agentLogFormat: agentLogFormat,
		interval:       interval,
		creds:          creds,
		eventFunnel:    eventFunnel,
		refSet:         refSet,
		steps:          steps,
	}
	// Internal bookkeeping starts out empty but non-nil.
	ctl.active = make(map[*agent]context.CancelFunc)
	ctl.targetUpdate = make(chan int)
	return ctl
}
// spawn creates one agent and runs its scenario asynchronously. It returns as soon as the agent
// has been set up; the outcome is reported later on the channels: the agent is sent on broken if
// performScenario returned an error, and on finished otherwise. The agent is registered with the
// event funnel before its goroutine starts and removed from it after the outcome has been sent.
//
// An error is returned (and no agent is started) when credentials or the agent's logger cannot
// be obtained.
func (s *agentController) spawn(finished, broken chan *agent) (*agent, error) {
	user, pass, clientID, err := s.creds.Get()
	if err != nil {
		return nil, err
	}
	// Use the log format this controller was configured with (set by newAgentController).
	logger, filename, err := newLoggerForAgent(clientID, s.agentLogFormat)
	if err != nil {
		return nil, err
	}
	mqCliOpts := mqttOpts(s.brokerAddr, user, pass, clientID, s.tlsConfig, s.disableMqttTLS)

	var msgHandler MessageHandlerer
	if s.refSet != nil {
		msgHandler = newValidatingHandler(clientID, s.refSet, s.eventFunnel, logger)
	} else {
		msgHandler = newMessageHandler(clientID, s.eventFunnel, logger)
	}

	logrus.WithFields(logrus.Fields{"clientID": clientID, "log": filename, "broker": s.brokerAddr, "disableTls": s.disableMqttTLS}).Infof("new agent")
	// The password is deliberately not logged: per-agent log files must not leak broker credentials.
	logger.Infof("clientID: %s, broker: %s, disableMqttTls: %t, user: %s", clientID, s.brokerAddr, s.disableMqttTLS, user)

	a := newAgent(mqCliOpts, clientID, logger, msgHandler, s.eventFunnel)
	s.eventFunnel.AddAgent(clientID)
	ctx, cancel := context.WithCancel(context.Background())
	s.active[a] = cancel
	go func() {
		scenario := NewInfiniteRandomScenario(ctx, s.steps)
		subs, pubs, discs := scenario.Chans()
		err := a.performScenario(ctx, subs, pubs, discs)
		// The scenario is over either way: cancel ctx so anything still watching it (such as the
		// scenario built above) can stop, and the context's resources are released.
		cancel()
		if err != nil {
			logrus.Warnf("agent %s died: %s", clientID, err)
			logger.Errorf("died: %s", err)
			broken <- a
		} else {
			logrus.Infof("agent %s finished", clientID)
			finished <- a
		}
		s.eventFunnel.RemoveAgent(clientID)
	}()
	return a, nil
}
// SetNumAgents is meant to change how many agents the controller keeps running.
// Not implemented yet: it only logs a warning and ignores n.
func (s *agentController) SetNumAgents(n int) {
	const msg = "SetNumAgents not implemented" // TODO
	logrus.Warnf(msg)
}

// GetNumAgents is meant to report the actual and desired number of agents.
// Not implemented yet: it logs a warning and reports zero for both.
func (s *agentController) GetNumAgents() (actual, desired int) {
	logrus.Warnf("GetNumAgents not implemented") // TODO
	return actual, desired
}
// Control tries to spawn as many agents as requested. When agents start to fail, the spawning
// rate is reduced.
//
// Control never returns. Each round it spawns at most one agent (only while fewer than s.target
// are running) and then, for the chosen sleep duration, collects agents reporting finished or
// broken. The sleep is s.interval after a successful spawn. It is s.interval+2.5s after a failed
// spawn, when the target is already reached, and for gracePeriod after any agent broke.
//
// Agents that terminate are removed from s.active and their context is cancelled, so the active
// set does not grow without bound over a long run.
func (s *agentController) Control() {
	tgt := s.target
	act := 0
	finished, broken := make(chan *agent), make(chan *agent)
	lastBrokenAgentAt := time.Time{} // zero time: nothing broke yet, so we start outside the grace period

	normalSpawn := s.interval                        // spawn agents every normalSpawn
	slowSpawn := 2500*time.Millisecond + normalSpawn // in case of problems, spawn slower
	const gracePeriod = 15 * time.Second             // stay slow during this period

	// release forgets a terminated agent: cancel its context and drop it from the active set.
	release := func(a *agent) {
		if cancel, ok := s.active[a]; ok {
			cancel()
			delete(s.active, a)
		}
	}

	for {
		sleep := slowSpawn // assume we're slow unless proven otherwise
		if act < tgt {
			if _, err := s.spawn(finished, broken); err == nil {
				// good spawn? spawn normal
				sleep = normalSpawn
				act++
			} else {
				// bad spawn? go slow
				logrus.Warnf("cannot spawn agent: %s", err)
				sleep = slowSpawn
			}
		}
		if time.Since(lastBrokenAgentAt) < gracePeriod {
			// in the grace period? go slow
			sleep = slowSpawn
		}

		// now process incoming agent reports for at most sleep
		deadline := time.After(sleep)
	drain:
		for {
			select {
			case a := <-finished:
				release(a)
				act--
			case a := <-broken:
				release(a)
				act--
				if time.Since(lastBrokenAgentAt) > gracePeriod {
					logrus.Infof("agents are breaking, we'll spawn new agents slower (every %s instead of %s) for the next %s", slowSpawn, normalSpawn, gracePeriod)
				}
				lastBrokenAgentAt = time.Now()
			case <-deadline:
				break drain
			}
		}
	}
}