-
Notifications
You must be signed in to change notification settings - Fork 0
/
master.go
84 lines (75 loc) · 1.55 KB
/
master.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
package factory
import (
"sync"
"sync/atomic"
)
// Master 管理者角色
type Master struct {
sync.Mutex
maxNum int
ingNum int
cursor int64
workers sync.Map
resetGuard *guard
}
func NewMaster(maxNum, initNum int) *Master {
m := new(Master)
m.maxNum = maxNum
m.ingNum = initNum
for i := 0; i < initNum; i++ {
m.workers.Store(i, newWorker())
}
m.resetGuard = newGuard()
return m
}
func (m *Master) Resize(maxNum int) {
m.maxNum = maxNum
if m.ingNum > maxNum {
m.AdjustSize(maxNum)
}
}
func (m *Master) AddLine(action func(interface{})) *Line {
return NewLine(m, action)
}
func (m *Master) AdjustSize(newSize int) {
if newSize > m.maxNum {
newSize = m.maxNum
}
m.Lock()
defer m.Unlock()
if newSize > m.ingNum {
for i := m.ingNum; i < newSize; i++ {
m.workers.Store(i, newWorker())
}
} else if newSize < m.ingNum {
for i := m.ingNum; i > newSize; i-- {
idx := i - 1
if v, ok := m.workers.Load(idx); ok {
m.workers.Delete(idx)
v.(*worker).shutdown()
}
}
}
m.ingNum = newSize
}
func (m *Master) Running() int {
m.Lock()
defer m.Unlock()
return m.ingNum
}
func (m *Master) Shutdown() {
m.AdjustSize(0) // 关闭所有worker
}
func (m *Master) getWorker() *worker {
idx := int(atomic.AddInt64(&m.cursor, 1)) - 1
if w, ok := m.workers.Load(idx); ok && w != nil {
return w.(*worker)
} else if m.ingNum == 0 {
panic("factory: the master has been shutdown")
}
m.resetGuard.run("get-worker", func() (i interface{}, e error) {
atomic.StoreInt64(&m.cursor, 0)
return nil, nil
})
return m.getWorker()
}