-
Notifications
You must be signed in to change notification settings - Fork 1
/
cupid.go
146 lines (124 loc) · 3.58 KB
/
cupid.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package main
import (
. "./jobQueue"
"./process"
"fmt"
"github.com/jinzhu/configor"
"go.uber.org/zap"
"gopkg.in/urfave/cli.v2"
"os"
"os/signal"
"sort"
"sync"
"syscall"
)
// app is the command-line application definition: its name, usage text,
// version and the two flags (configPath, startId) accepted on the command line.
var (
app = &cli.App{
Name: "cupid-go",
Usage: "简单快速的数据补偿工具,可监听insert和update数据",
Version: "0.0.1",
Flags: []cli.Flag{
// configPath / c: path of the JSON configuration file; defaults to
// /tmp/cupid-go.json.
&cli.StringFlag{
Name: "configPath",
Aliases: []string{"c"},
Value: "/tmp/cupid-go.json",
Usage: "配置json,里面存储的是你对项目的配置和监听表的配置",
},
// startId / s: id in the source table to start from; defaults to 1.
// NOTE(review): the usage text says processing starts from 0 when the
// flag is not given, but the default Value here is 1 - confirm which
// one is intended.
&cli.IntFlag{
Name: "startId",
Aliases: []string{"s"},
Value: 1,
Usage: "起始的源table的id,没有的话从0开始,有的话从当前设置开始",
},
},
// Disabled sub-command scaffold, kept for reference.
//Commands: []*cli.Command{
// {
// Name: "start",
// Aliases: []string{},
// Usage: "start our program",
// Action: func(c *cli.Context) error {
// return nil
// },
//
// },
//},
}
)
func init() {
app.Action = rundeamon
app.Before = func(ctx *cli.Context) error {
fmt.Println("cupid-go 程序已开启")
return nil
}
app.After = func(c *cli.Context) error {
fmt.Println("cupid-go 程序成功关闭")
return nil
}
sort.Sort(cli.FlagsByName(app.Flags))
sort.Sort(cli.CommandsByName(app.Commands))
}
// rundeamon is the default cli action. It rejects any positional argument with
// an "invalid command" error; otherwise it reads the configPath and startId
// flags, stores the start id in the package-level StartId, echoes both values
// to stdout and hands control to start. It returns nil once start returns.
func rundeamon(ctx *cli.Context) error {
	extra := ctx.Args()
	if extra.Len() > 0 {
		// No sub-commands are defined, so any positional argument is a mistake.
		return fmt.Errorf("invalid command: %q", extra.Get(0))
	}

	cfgFile, firstID := ctx.String("configPath"), ctx.Int("startId")
	StartId = firstID

	fmt.Println("使用的configPath", cfgFile)
	fmt.Println("设置的startId", firstID)

	start(cfgFile, firstID)
	return nil
}
// start loads the configuration file at configPath into process.Config,
// initialises the logger, creates the three job queues, starts the callback,
// fail and master processes, and then blocks in InitSignal until a shutdown
// signal has been handled.
//
// If the configuration cannot be loaded the error is written to stderr and the
// program exits with status 1, rather than continuing with a zero-valued
// configuration (zero WorkerNumber, empty LogDir).
//
// startId is accepted for interface compatibility but is not read here; the
// caller publishes it through the package-level StartId before calling start.
func start(configPath string, startId int) {
	if err := configor.Load(&process.Config, configPath); err != nil {
		fmt.Fprintf(os.Stderr, "load config %q: %v\n", configPath, err)
		os.Exit(1)
	}

	fmt.Println("日志输出路径", process.Config.LogDir)
	InitLogger(process.Config.LogDir, "debug")

	// All three queues are created with WorkerNumber as the make() size
	// argument (presumably the channel buffer capacity - JobChan is declared
	// outside this file).
	ProcessJobQueue = make(JobChan, process.Config.WorkerNumber)
	CallbackJobQueue = make(JobChan, process.Config.WorkerNumber)
	FailJobQueue = make(JobChan, process.Config.WorkerNumber)

	// The callback and fail processes are started before the master process
	// is even constructed; this order is kept from the original code.
	callbackProcess := process.NewProcess(&process.CallbackProcess{}, process.Config.WorkerNumber, CallbackJobQueue)
	failProcess := process.NewFailProcess()
	callbackProcess.Run()
	failProcess.Run()

	masterProcess := process.NewProcess(&process.MasterProcess{}, process.Config.WorkerNumber, ProcessJobQueue)
	masterProcess.Run()

	InitSignal(masterProcess, callbackProcess, failProcess)
}
// main runs the cli application with the process arguments. If the run fails,
// the error is written to stderr and the process exits with status 1 so that
// shells and supervisors can detect the failure.
func main() {
	if err := app.Run(os.Args); err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
}
// InitSignal blocks until the process receives SIGHUP, SIGINT, SIGTERM or
// SIGQUIT. It then calls Exit on masterProcess, callbackProcess and
// failProcess, in that order, logging after each step, and returns.
//
// The function returns instead of calling os.Exit from the handler goroutine,
// so the caller unwinds normally: deferred calls and the cli After hook still
// run, and the process exits with status 0 when main returns.
func InitSignal(masterProcess, callbackProcess *process.Process, failProcess *process.FailProcess) {
	// signal.Notify never blocks when delivering, so the channel has to be
	// buffered or a signal that arrives while nobody is receiving is lost.
	// The channel is registered once here rather than on every loop iteration.
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
	defer signal.Stop(sigCh)

	var wg sync.WaitGroup
	wg.Add(1)
	// The closure captures wg by reference. A WaitGroup must not be copied
	// after first use: Done on a copy would never release the Wait below.
	go func() {
		defer wg.Done()
		for s := range sigCh {
			switch s {
			case syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT:
				Logger.Info("进程收到退出信号", zap.String("signal", s.String()))
				masterProcess.Exit()
				Logger.Info("masterProcess退出成功")
				callbackProcess.Exit()
				Logger.Info("callbackProcess退出成功")
				failProcess.Exit()
				Logger.Info("failProcess退出成功")
				return
			// Placeholder kept from the original code: SIGUSR1 (kill -10) and
			// SIGUSR2 (kill -12) were meant to switch the log level to DEBUG
			// and ERROR respectively. To enable them, add the signals to the
			// Notify call above and add the matching cases here.
			default:
				Logger.Info("进程收到其它信号", zap.String("signal", s.String()))
			}
		}
	}()
	wg.Wait()
}