-
Notifications
You must be signed in to change notification settings - Fork 0
/
factory.go
114 lines (105 loc) · 3.08 KB
/
factory.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package beans
import (
"fmt"
"github.com/beanstalkd/go-beanstalk"
"github.com/chenbo29/go-beanstalkd-client/loglocal"
"strconv"
"time"
)
// job pairs the ID of a reserved beanstalkd job with its raw payload so both
// can travel together over a channel.
type job struct {
	// id is the job ID returned by a reserve call.
	id uint64
	// body is the job payload exactly as received.
	body []byte
}
// TubeFactory groups everything needed to process one beanstalkd tube: the
// tube name, the connection, the job handler and the number of workers.
type TubeFactory struct {
	// workerNum is how many workers Run starts.
	workerNum int
	// name is the tube name; it is also used as a prefix in log messages.
	name string
	// conn is the beanstalkd connection used to reserve, delete and bury jobs.
	conn *beanstalk.Conn
	// executeFunc is the handler invoked for every reserved job.
	executeFunc *JobExecuteFunc
	// tubeSet is the tube set ReserveJob reserves from.
	tubeSet *beanstalk.TubeSet
	// jobChan receives the jobs reserved by ReserveJob.
	jobChan chan job
}
// NewTubeFactory builds a TubeFactory for the tube called name.
//
// num is the number of workers Run will start, conn is the beanstalkd
// connection the factory works on, and executeFunc is the handler applied to
// each job. The factory's tube set is created on conn for name, and its job
// channel is unbuffered.
func NewTubeFactory(name string, num int, conn *beanstalk.Conn, executeFunc *JobExecuteFunc) *TubeFactory {
	return &TubeFactory{
		workerNum:   num,
		name:        name,
		conn:        conn,
		executeFunc: executeFunc,
		tubeSet:     beanstalk.NewTubeSet(conn, name),
		jobChan:     make(chan job),
	}
}
// Run starts the factory. It launches ReserveJob in a goroutine and then
// starts tf.workerNum workers, each in its own goroutine. Run returns once the
// goroutines have been started; it does not wait for them.
//
// Every worker loops forever: it reserves a job from the tube, passes it to
// the job handler, deletes the job when the handler reports success and buries
// it otherwise, then pauses for one second before reserving again. A reserve
// that merely timed out is ignored; any other reserve error is logged.
func (tf *TubeFactory) Run() {
	loglocal.Info(fmt.Sprintf("TubeFactory(%s) Running, %d`s Worker", tf.name, tf.workerNum))

	// NOTE(review): ReserveJob reserves from the same tube as the workers
	// below and forwards what it gets to tf.jobChan, which nothing in this
	// file reads. Confirm a consumer exists elsewhere; if none does, the job
	// it reserves is never processed.
	go tf.ReserveJob()

	// All workers share one function value; it only captures tf. The
	// connection argument is ignored because workers use the factory's own
	// connection.
	work := func(name string, _ *beanstalk.Conn) error {
		executeFunc := *tf.executeFunc
		tubeSet := beanstalk.NewTubeSet(tf.conn, tf.name)
		// The loop has no exit, so no return statement is needed after it.
		for {
			jobID, jobBody, err := tubeSet.Reserve(reserveTime)
			if err != nil {
				// A timed-out reserve is the normal "tube is empty" case.
				// Everything else (for example a broken connection) is
				// reported instead of being silently dropped.
				if connErr, ok := err.(beanstalk.ConnError); !ok || connErr.Err != beanstalk.ErrTimeout {
					loglocal.Error(fmt.Errorf("%s Worker(%s) reserve: %w", tf.name, name, err))
				}
			} else {
				loglocal.Info(fmt.Sprintf("%s Worker(%s) Get JobId [%d] JobBody [%s] And Start To Do", tf.name, name, jobID, string(jobBody)))
				if executeFunc.Execute(jobID, jobBody) {
					// The handler succeeded: remove the job from the queue.
					DeleteJob(tf, name, jobID)
				} else {
					// The handler failed: bury the job.
					BuryJob(tf, name, jobID)
				}
			}
			time.Sleep(time.Second * 1)
		}
	}

	for i := 0; i < tf.workerNum; i++ {
		w := NewWorker(strconv.Itoa(i), work)
		go w.Execute(tf)
	}
}
// DeleteJob deletes jobID from beanstalkd after the job handler has succeeded.
//
// tf supplies the connection and the tube name used in log messages;
// workerName identifies the calling worker in those messages.
//
// A failed delete is logged and retried once per second until it succeeds.
// The one exception is a "not found" reply: the server no longer holds the job
// for this client (it was already deleted, or its reservation expired), so no
// retry can ever succeed and DeleteJob returns instead of looping forever.
func DeleteJob(tf *TubeFactory, workerName string, jobID uint64) {
	for {
		err := tf.conn.Delete(jobID)
		if err == nil {
			loglocal.Info(fmt.Sprintf("%s Worker(%s) Start To Do Job(%d) Finish ✔ !", tf.name, workerName, jobID))
			return
		}

		loglocal.Error(fmt.Errorf("%s Worker(%s) delete job %d: %w", tf.name, workerName, jobID, err))

		// NOT_FOUND is permanent; retrying it would block this worker forever.
		if connErr, ok := err.(beanstalk.ConnError); ok && connErr.Err == beanstalk.ErrNotFound {
			return
		}

		time.Sleep(1 * time.Second)
	}
}
// BuryJob buries jobID with priority 0 on the factory's connection and logs
// the outcome.
//
// tf supplies the connection and the tube name used in log messages;
// workerName identifies the calling worker in those messages. When the bury
// command itself fails, the failure is logged and the error is reported through
// the error log; nothing is returned to the caller.
func BuryJob(tf *TubeFactory, workerName string, jobID uint64) {
	buryErr := tf.conn.Bury(jobID, 0)
	if buryErr == nil {
		loglocal.Info(fmt.Sprintf("%s Worker(%s) Start To Do Job(%d) Failed And Buried ⚠ !", tf.name, workerName, jobID))
		return
	}

	// The job could not be buried: record both the job failure and the error.
	loglocal.Info(fmt.Sprintf("%s Worker(%s) Start To Do Job(%d) Failed ⚠ !", tf.name, workerName, jobID))
	loglocal.Error(buryErr)
}
// ReserveJob loops forever reserving jobs from the factory's tube set and
// sending each one to tf.jobChan. It never returns, so callers run it in its
// own goroutine.
//
// The send blocks until the job is received (the channel created by
// NewTubeFactory is unbuffered). A reserve that timed out is retried
// immediately. Any other error is logged and followed by a one-second pause,
// so a persistent failure such as a dead connection does not turn this loop
// into a busy spin.
func (tf *TubeFactory) ReserveJob() {
	for {
		jobID, jobBody, err := tf.tubeSet.Reserve(reserveTime)
		if err != nil {
			// A timeout only means no job became ready within reserveTime.
			if connErr, ok := err.(beanstalk.ConnError); ok && connErr.Err == beanstalk.ErrTimeout {
				continue
			}
			loglocal.Error(fmt.Errorf("%s reserve: %w", tf.name, err))
			time.Sleep(time.Second)
			continue
		}

		loglocal.Info(fmt.Sprintf("%s Get JobId [%d] JobBody [%s]", tf.name, jobID, string(jobBody)))
		tf.jobChan <- job{id: jobID, body: jobBody}
	}
}