-
Notifications
You must be signed in to change notification settings - Fork 0
/
scheduler.go
150 lines (127 loc) · 3 KB
/
scheduler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
package schedly
import (
"sync"
"time"
)
/*Scheduler is a simple task scheduling tool.
*/
type Scheduler interface {
Tick() time.Duration
Aligned() bool
SetAligned(aligned bool) Scheduler
NewSchedule(every time.Duration) *schedule
Schedule(every time.Duration, name string, jobFunc func()) *job
AddJob(schedule *schedule, name string, jobFunc func()) *job
Start()
Stop()
WaitForRunningTasks()
WaitUntilStopped()
}
/*NewScheduler creates a new Scheduler instance with provided precision.
Higher precision means more frequent evaluation of Schedules.
To reduce overhead please avoid using tick time smaller than 1 second unless you really need it.
*/
func NewScheduler(tick time.Duration) Scheduler {
return &scheduler{
tick: tick,
jobs: make(map[*schedule][]*job),
toFinish: make(chan int),
globalStop: make(chan bool),
wg: new(sync.WaitGroup),
}
}
type scheduler struct {
jobs map[*schedule][]*job
toFinish chan int
globalStop chan bool
tick time.Duration
aligned bool
ticker *time.Ticker
wg *sync.WaitGroup
}
func (s *scheduler) Tick() time.Duration {
return s.tick
}
func (s *scheduler) Aligned() bool {
return s.aligned
}
func (s *scheduler) SetAligned(aligned bool) Scheduler {
s.aligned = aligned
return s
}
func (s *scheduler) NewSchedule(every time.Duration) *schedule {
schedule := newSchedule(s.tick).SetEvery(every).SetAligned(s.aligned)
return schedule
}
func (s *scheduler) Schedule(every time.Duration, name string, jobFunc func()) *job {
schedule := s.NewSchedule(every)
return s.AddJob(schedule, name, jobFunc)
}
func (s *scheduler) AddJob(schedule *schedule, name string, jobFunc func()) *job {
// TODO: Check that job name is unique
// TODO: Create an interface for retrieving jobs state
if schedule.aligned {
s.SetAligned(true)
}
j := &job{
jobFunc: jobFunc,
name: name,
}
if jList, ok := s.jobs[schedule]; ok {
s.jobs[schedule] = append(jList, j)
} else {
s.jobs[schedule] = []*job{j}
}
return j
}
func (s *scheduler) Start() {
if s.aligned {
curTime := time.Now()
time.Sleep(s.tick - curTime.Sub(curTime.Truncate(s.tick)))
}
s.ticker = time.NewTicker(s.tick)
go func() {
for {
select {
case tick := <-s.ticker.C:
s.runPending(tick)
case <-s.toFinish:
s.ticker.Stop()
s.globalStop <- true
return
}
}
}()
}
func (s *scheduler) Stop() {
s.toFinish <- 1
}
func (s *scheduler) WaitUntilStopped() {
<-s.globalStop
}
func (s *scheduler) WaitForRunningTasks() {
s.wg.Wait()
}
func (s *scheduler) runPending(tick time.Time) {
for schedule, jobs := range s.jobs {
for _, j := range jobs {
if schedule.CanRun(tick, j) {
s.runJob(tick, j)
}
}
}
}
func (s *scheduler) runJob(tick time.Time, j *job) {
s.wg.Add(1)
go func() {
defer func() {
if r := recover(); r != nil {
// TODO: provide an interface for task state change listener
} else {
// TODO: provide an interface for task state change listener
}
s.wg.Done()
}()
j.Run(tick)
}()
}