-
Notifications
You must be signed in to change notification settings - Fork 0
/
janitor.go
177 lines (158 loc) · 4.8 KB
/
janitor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
package pico
import (
"sync"
"time"
"github.com/tuxdude/zzzlogi"
"golang.org/x/sys/unix"
)
// defaultFailedExitStatus is the non-zero exit status reported when no more
// specific failure status is available: in multi service mode when the
// terminated service itself exited with status 0, and when the termination
// notification channel yields nothing.
const defaultFailedExitStatus = 77
// serviceJanitor reacts to the termination of launched services. It chooses
// the exit status to report when the first service terminates, and it drives
// the shut down of whatever services are still running.
type serviceJanitor struct {
	// log receives the janitor's diagnostic output.
	log zzzlogi.Logger
	// repo holds the launched services; terminated services are removed.
	repo janitorRepo
	// multiServiceMode is true when the service manager is managing more
	// than one service, and false when it manages exactly one.
	multiServiceMode bool
	// shuttingDownMu serializes reads and writes of shuttingDown.
	shuttingDownMu sync.Mutex
	// shuttingDown becomes true once a shut down has started.
	shuttingDown bool
	// termNotificationCh carries the first service termination to wait().
	termNotificationCh chan *terminatedService
}
// janitorRepo is the slice of the service repository that the janitor relies
// on. It removes services as their processes are reaped and reports how many
// services remain.
type janitorRepo interface {
	// removeService removes the service whose process has the given pid.
	// It returns the removed service and whether a matching service was
	// found.
	removeService(pid int) (*launchedServiceOrHook, bool)
	// count reports how many services are still held by the repository.
	count() int
}
// janitorSignalManager is the signal-delivery dependency the janitor uses
// while shutting services down.
type janitorSignalManager interface {
	// multicastSig delivers sig to the managed services and returns the
	// number of services it was sent to (0 means nothing was signalled).
	multicastSig(sig unix.Signal) int
}
// terminatedService is the notification sent to wait() once the first
// launched service terminates.
type terminatedService struct {
	// service is the launched service whose process terminated.
	service *launchedServiceOrHook
	// exitStatus is the exit status the janitor decided to report. This may
	// differ from the service's own exit status in multi service mode.
	exitStatus int
}
// newServiceJanitor builds a janitor that logs through log, tracks services
// in repo, and picks exit statuses according to multiServiceMode.
func newServiceJanitor(
	log zzzlogi.Logger,
	repo janitorRepo,
	multiServiceMode bool,
) *serviceJanitor {
	j := new(serviceJanitor)
	j.log = log
	j.repo = repo
	j.multiServiceMode = multiServiceMode
	// Capacity 1 lets handleServiceTermination deposit its single
	// notification (and then close the channel) without blocking, even if
	// nobody is inside wait() yet.
	j.termNotificationCh = make(chan *terminatedService, 1)
	return j
}
// handleProcTerminaton processes a batch of reaped processes. Every reaped pid
// is traced. A pid is acted on only when it belongs to a service held in the
// repository, in which case the service is removed and its termination is
// handled. Other reaped pids, most likely descendants of the launched
// services, are ignored.
func (s *serviceJanitor) handleProcTerminaton(procs []*reapedProc) {
	for _, reaped := range procs {
		s.log.Tracef("Observed reaped pid: %d wstatus: %v", reaped.pid, reaped.waitStatus)
		serv, found := s.repo.removeService(reaped.pid)
		if !found {
			// Not a process the service manager launched directly.
			continue
		}
		s.handleServiceTermination(serv, reaped.waitStatus.ExitStatus())
	}
}
// wait blocks until the first service termination is reported. It returns
// the terminated service and the exit status chosen for it. If the channel
// yields nil instead (for example, because it was already closed and
// drained), a warning is logged and (nil, defaultFailedExitStatus) is
// returned.
func (s *serviceJanitor) wait() (*launchedServiceOrHook, int) {
	if note := <-s.termNotificationCh; note != nil {
		return note.service, note.exitStatus
	}
	s.log.Warnf("Service termination notification channel closed Unexpectedly, possibly indicates a bug")
	return nil, defaultFailedExitStatus
}
// handleServiceTermination records that serv exited with exitStatus.
//
// Only the first termination is acted on, and only if no shut down is
// already underway. For that termination it chooses the exit status to
// report, hands the result to wait(), and closes the notification channel.
//
// Exit status selection:
//   - single service mode: the service's own exit status, as is;
//   - multi service mode: the service's exit status if non-zero, otherwise
//     defaultFailedExitStatus.
func (s *serviceJanitor) handleServiceTermination(serv *launchedServiceOrHook, exitStatus int) {
	s.log.Infof("Service: %v exited, exit status: %d", serv, exitStatus)

	if !s.markShutDown() {
		// A shut down already started elsewhere; it owns the rest.
		return
	}

	var reported int
	switch {
	case !s.multiServiceMode, exitStatus != 0:
		reported = exitStatus
	default:
		reported = defaultFailedExitStatus
	}

	// The channel has room for exactly this one notification, so the send
	// does not block; closing it afterwards makes further receives yield nil.
	s.termNotificationCh <- &terminatedService{service: serv, exitStatus: reported}
	close(s.termNotificationCh)
}
// markShutDown moves the janitor into the shutting-down state under
// shuttingDownMu. It returns true only for the call that actually performed
// the transition; every later call returns false. While this state is set,
// handleServiceTermination stops sending termination notifications.
func (s *serviceJanitor) markShutDown() bool {
	s.shuttingDownMu.Lock()
	defer s.shuttingDownMu.Unlock()

	wasShuttingDown := s.shuttingDown
	s.shuttingDown = true
	return !wasShuttingDown
}
// shutDown terminates any running services launched by Init.
//
// It first marks the janitor as shutting down, so later service terminations
// no longer produce notifications. It then escalates:
//   - gracefulAttempts rounds of SIGTERM, then
//   - one final round of SIGKILL.
//
// After each round it waits up to attemptTimeout for the repository to
// empty. It returns early if the repository empties, or if a round finds no
// services left to signal.
func (s *serviceJanitor) shutDown(signals janitorSignalManager) {
	const (
		gracefulAttempts = 3
		attemptTimeout   = 5 * time.Second
	)

	// The result is deliberately ignored: even if a service termination has
	// already marked the shut down, the remaining services still need to be
	// signalled.
	s.markShutDown()

	for attempt := 1; attempt <= gracefulAttempts+1; attempt++ {
		final := attempt > gracefulAttempts
		sig := unix.SIGTERM
		if final {
			sig = unix.SIGKILL
		}

		count := signals.multicastSig(sig)
		if count == 0 {
			// Nothing left to signal.
			return
		}

		if final {
			s.log.Infof("All graceful termination attempts exhausted, sent signal %s to %d services", sigInfo(sig), count)
		} else {
			s.log.Infof(
				"Graceful termination Attempt [%d/%d] - Sent signal %s to %d services",
				attempt,
				gracefulAttempts,
				sigInfo(sig),
				count,
			)
		}

		if s.awaitServicesExit(attemptTimeout) {
			return
		}
	}
}

// awaitServicesExit checks the repository every 10ms until it holds no
// services or until timeout elapses. It reports whether the repository
// became empty before the timeout.
func (s *serviceJanitor) awaitServicesExit(timeout time.Duration) bool {
	deadline := time.NewTimer(timeout)
	defer deadline.Stop()
	poll := time.NewTicker(10 * time.Millisecond)
	defer poll.Stop()

	for {
		select {
		case <-poll.C:
			if s.repo.count() == 0 {
				return true
			}
		case <-deadline.C:
			return false
		}
	}
}