From 321355c7efe1520de8de01eb5c776e7ee8d9a5cd Mon Sep 17 00:00:00 2001 From: keepchen Date: Wed, 24 Jan 2024 11:14:33 +0800 Subject: [PATCH] =?UTF-8?q?1.=E6=A1=86=E6=9E=B6=E5=90=AF=E5=8A=A8=E6=96=B9?= =?UTF-8?q?=E6=B3=95=E4=BC=98=E5=8C=96,2.redislock=E6=96=B0=E5=A2=9EtryLoc?= =?UTF-8?q?k=E6=96=B9=E6=B3=95,3.schedule=E6=96=B0=E5=A2=9E=E7=8A=B6?= =?UTF-8?q?=E6=80=81=E6=9F=A5=E8=AF=A2=E6=94=AF=E6=8C=81,4.=E5=85=B6?= =?UTF-8?q?=E4=BB=96=E4=BB=A3=E7=A0=81=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- constants/errors.go | 2 +- examples/pkg/app/user/user.go | 34 +++- http/api/response.go | 26 ++- sail/components.go | 6 +- sail/config/utils.go | 24 +-- sail/httpserver/prometheus.go | 4 +- sail/httpserver/swagger.go | 4 +- sail/sail.go | 162 +++++++++++++++--- schedule/interval.go | 198 ++++++++++++++++++++++ schedule/schedule.go | 298 +++++++++------------------------- schedule/specs.go | 21 +++ utils/redislock.go | 55 ++++++- utils/sm4.go | 5 +- utils/strings.go | 154 ++++++++++++++++-- utils/strings_test.go | 94 ++++++++++- 15 files changed, 795 insertions(+), 292 deletions(-) create mode 100644 schedule/interval.go create mode 100644 schedule/specs.go diff --git a/constants/errors.go b/constants/errors.go index 2dbb1e5..1ca4360 100644 --- a/constants/errors.go +++ b/constants/errors.go @@ -16,7 +16,7 @@ const ( var initErrorCodeMsgMap = map[CodeType]string{ ErrNone: "SUCCESS", ErrRequestParamsInvalid: "Bad request parameters", - ErrAuthorizationTokenInvalid: "Token invalid", + ErrAuthorizationTokenInvalid: "Authorization token invalid", ErrInternalServerError: "Internal server error", } diff --git a/examples/pkg/app/user/user.go b/examples/pkg/app/user/user.go index a733678..93ac143 100644 --- a/examples/pkg/app/user/user.go +++ b/examples/pkg/app/user/user.go @@ -31,13 +31,13 @@ import ( "sync" "time" + "github.com/keepchen/go-sail/v3/examples/pkg/app/user/http/routes" + "github.com/keepchen/go-sail/v3/sail/config" "github.com/keepchen/go-sail/v3/schedule" "github.com/keepchen/go-sail/v3/lib/logger" - "github.com/keepchen/go-sail/v3/examples/pkg/app/user/http/routes" - "github.com/keepchen/go-sail/v3/constants" "github.com/keepchen/go-sail/v3/http/api" @@ -83,24 +83,42 @@ func StartServer(wg *sync.WaitGroup) { ErrNoneCodeMsg: "SUCCEED", ForceHttpCode200: true, } - before = func() { + beforeFunc = func() { fmt.Println("call user function [before] to do something...") } - after = func() { + afterFunc = func() { fmt.Println("call user function [after] to do something...") - cancel0 := schedule.Job("print now datetime", func() { + job0 := "print now datetime" + cancel0 := schedule.Job(job0, func() { fmt.Println("now: ", utils.FormatDate(time.Now(), utils.YYYY_MM_DD_HH_MM_SS_EN)) }).RunAt(schedule.EveryMinute) time.AfterFunc(time.Minute*3, cancel0) - cancel1 := schedule.Job("print hello", func() { + job1 := "print hello" + cancel1 := schedule.Job(job1, func() { + time.Sleep(time.Second * 10) fmt.Println(utils.FormatDate(time.Now(), utils.YYYY_MM_DD_HH_MM_SS_EN), "hello") }).EverySecond() - time.AfterFunc(time.Second*5, cancel1) + time.AfterFunc(time.Second*33, cancel1) + + ticker := time.NewTicker(time.Second) + times := 0 + LOOP: + for range ticker.C { + times++ + fmt.Printf("job: {%s} is running: %t | job: {%s} is running: %t\n", + job0, schedule.JobIsRunning(job0), job1, schedule.JobIsRunning(job1)) + if times > 50 { + break LOOP + } + } } ) - sail.WakeupHttp("go-sail", conf, apiOption).Launch(routes.RegisterRoutes, before, after) + //直接启动 + //sail.WakeupHttp("go-sail", conf).Launch(routes.RegisterRoutes) + //挂载处理方法后启动 + sail.WakeupHttp("go-sail", conf).SetupApiOption(apiOption).Hook(routes.RegisterRoutes, beforeFunc, afterFunc).Launch() } // RegisterServicesToNacos 将服务注册到注册中心 diff --git a/http/api/response.go b/http/api/response.go index 974bef9..447c416 100644 --- a/http/api/response.go +++ b/http/api/response.go @@ -10,14 +10,24 @@ import ( "github.com/keepchen/go-sail/v3/http/pojo/dto" ) +type Emitter interface { + Builder(code constants.ICodeType, resp dto.IResponse, message ...string) Emitter + Assemble(code constants.ICodeType, resp dto.IResponse, message ...string) Emitter + Status(httpCode int) Emitter + SendWithCode(httpCode int) + Send() +} + type API struct { engine *gin.Context httpCode int data interface{} } -func New(c *gin.Context) API { - return API{ +var _ Emitter = &API{} + +func New(c *gin.Context) Emitter { + return &API{ engine: c, } } @@ -31,21 +41,21 @@ func New(c *gin.Context) API { // Response(c).Builder(...).Send() // // New 方法的语法糖 -func Response(c *gin.Context) API { +func Response(c *gin.Context) Emitter { return New(c) } // Builder 组装返回数据 // // Assemble 方法的语法糖 -func (a API) Builder(code constants.ICodeType, resp dto.IResponse, message ...string) API { +func (a *API) Builder(code constants.ICodeType, resp dto.IResponse, message ...string) Emitter { return a.Assemble(code, resp, message...) } // Assemble 组装返回数据 // // 该方法会根据传递的code码自动设置http状态、描述信息、当前系统毫秒时间戳以及请求id(需要在路由配置中调用middleware.Before中间件) -func (a API) Assemble(code constants.ICodeType, resp dto.IResponse, message ...string) API { +func (a *API) Assemble(code constants.ICodeType, resp dto.IResponse, message ...string) Emitter { var ( body dto.Base requestId string @@ -124,18 +134,18 @@ func (a API) Assemble(code constants.ICodeType, resp dto.IResponse, message ...s // Status 指定http状态码 // // 该方法会覆盖 Assemble 解析的http状态码值 -func (a API) Status(httpCode int) API { +func (a *API) Status(httpCode int) Emitter { a.httpCode = httpCode return a } // SendWithCode 以指定http状态码响应请求 -func (a API) SendWithCode(httpCode int) { +func (a *API) SendWithCode(httpCode int) { a.engine.AbortWithStatusJSON(httpCode, a.data) } // Send 响应请求 -func (a API) Send() { +func (a *API) Send() { a.SendWithCode(a.httpCode) } diff --git a/sail/components.go b/sail/components.go index 42ee30a..ec04a20 100644 --- a/sail/components.go +++ b/sail/components.go @@ -56,12 +56,12 @@ func GetNats() *natsLib.Conn { } // GetLogger 获取日志实例 -func GetLogger(modules ...string) *zap.Logger { - return logger.GetLogger(modules...) +func GetLogger(module ...string) *zap.Logger { + return logger.GetLogger(module...) } // Response http响应组件 -func Response(c *gin.Context) api.API { +func Response(c *gin.Context) api.Emitter { return api.New(c) } diff --git a/sail/config/utils.go b/sail/config/utils.go index 5c1e1dd..65f5c95 100644 --- a/sail/config/utils.go +++ b/sail/config/utils.go @@ -18,17 +18,17 @@ import ( func PrintTemplateConfig(format string, writeToFile ...string) { var ( abort bool - configStr []byte - config = Config{} + cfgStr []byte + cfg = Config{} formatList = [...]string{"json", "yaml", "toml"} ) switch format { case formatList[0]: - configStr, _ = json.MarshalIndent(&config, "", " ") + cfgStr, _ = json.MarshalIndent(&cfg, "", " ") case formatList[1]: - configStr, _ = yaml.Marshal(&config) + cfgStr, _ = yaml.Marshal(&cfg) case formatList[2]: - configStr, _ = toml.Marshal(&config) + cfgStr, _ = toml.Marshal(&cfg) default: fmt.Printf("[GO-SAIL] dump config by using unknown format: %s\n", format) abort = true @@ -39,13 +39,13 @@ func PrintTemplateConfig(format string, writeToFile ...string) { } if len(writeToFile) > 0 { - err := utils.FilePutContents(configStr, writeToFile[0]) + err := utils.FilePutContents(cfgStr, writeToFile[0]) if err != nil { fmt.Printf("[GO-SAIL] dump config to file {%s} error: %s\n", writeToFile[0], err.Error()) } } else { fmt.Printf("[GO-SAIL] dump config (%s) to stdout:\n", format) - fmt.Println(string(configStr)) + fmt.Println(string(cfgStr)) } } @@ -57,21 +57,21 @@ func PrintTemplateConfig(format string, writeToFile ...string) { func ParseConfigFromBytes(format string, source []byte) (*Config, error) { var ( formatList = [...]string{"json", "yaml", "toml"} - conf Config + cfg Config err error ) switch format { case formatList[0]: - err = json.Unmarshal(source, &conf) + err = json.Unmarshal(source, &cfg) case formatList[1]: - err = yaml.Unmarshal(source, &conf) + err = yaml.Unmarshal(source, &cfg) case formatList[2]: - err = toml.Unmarshal(source, &conf) + err = toml.Unmarshal(source, &cfg) default: fmt.Printf("[GO-SAIL] dump config by using unknown format: %s\n", format) err = fmt.Errorf("[GO-SAIL] dump config by using unknown format: %s\n", format) } - return &conf, err + return &cfg, err } diff --git a/sail/httpserver/prometheus.go b/sail/httpserver/prometheus.go index ac0dff6..c90fc54 100644 --- a/sail/httpserver/prometheus.go +++ b/sail/httpserver/prometheus.go @@ -8,10 +8,10 @@ import ( "github.com/prometheus/client_golang/prometheus/promhttp" ) -// RunPrometheusServer 启动prometheus指标收集服务 +// RunPrometheusServerOnDebugMode 启动prometheus指标收集服务 // // 当配置文件指明启用时才会启动 -func RunPrometheusServer(conf config.PrometheusConf) { +func RunPrometheusServerOnDebugMode(conf config.PrometheusConf) { if !conf.Enable { return } diff --git a/sail/httpserver/swagger.go b/sail/httpserver/swagger.go index 7982a05..fc7a56c 100644 --- a/sail/httpserver/swagger.go +++ b/sail/httpserver/swagger.go @@ -7,10 +7,10 @@ import ( ginSwagger "github.com/swaggo/gin-swagger" ) -// RunSwaggerServer 启动swagger文档服务 +// RunSwaggerServerOnDebugMode 启动swagger文档服务 // // 当配置文件指明启用时才会启动 -func RunSwaggerServer(conf config.SwaggerConf, ginEngine *gin.Engine) { +func RunSwaggerServerOnDebugMode(conf config.SwaggerConf, ginEngine *gin.Engine) { if !conf.Enable { //如果不是调试模式就不注册swagger路由 return diff --git a/sail/sail.go b/sail/sail.go index adc349f..0506af5 100644 --- a/sail/sail.go +++ b/sail/sail.go @@ -19,6 +19,13 @@ import ( "go.uber.org/zap" ) +// Sailor 船员就位 +type Sailor interface { + SetupApiOption(opt *api.Option) Sailor + Launch(registerRoutes func(ginEngine *gin.Engine)) + Hook(registerRoutes func(ginEngine *gin.Engine), beforeFunc, afterFunc func()) Launchpad +} + // Framework 框架配置 type Framework struct { appName string @@ -26,6 +33,23 @@ type Framework struct { apiOption *api.Option } +var _ Sailor = &Framework{} + +// Launchpad 启动台 +type Launchpad interface { + Launch() +} + +// Launcher 启动器 +type Launcher struct { + fw *Framework + registerRoutesFunc func(ginEngine *gin.Engine) + beforeFunc func() + afterFunc func() +} + +var _ Launchpad = &Launcher{} + // WakeupHttp 唤醒http // // 启动前的配置准备 @@ -35,22 +59,30 @@ type Framework struct { // @param conf 配置文件 // // @param apiOption 统一返回配置(可选) -func WakeupHttp(appName string, conf *config.Config, apiOption *api.Option) *Framework { +func WakeupHttp(appName string, conf *config.Config) Sailor { return &Framework{ - appName: appName, - conf: conf, - apiOption: apiOption, + appName: appName, + conf: conf, } } +// SetupApiOption +// +// 设置统一返回配置 +func (f *Framework) SetupApiOption(opt *api.Option) Sailor { + f.apiOption = opt + + return f +} + // Launch 启动 // // @param registerRoutes 注册路由函数 // -// @param before 前置自定义处理函数(可选),在框架函数之前执行,注意自定义函数是同步执行的 +// # Note: // -// @param after 后置自定义处理函数(可选),在框架函数之后执行,注意自定义函数是同步执行的 -func (f *Framework) Launch(registerRoutes func(ginEngine *gin.Engine), before, after func()) { +// 未设置前置自动函数、未设置后置自定义函数 +func (f *Framework) Launch(registerRoutes func(ginEngine *gin.Engine)) { defer func() { if err := recover(); err != nil { logger.GetLogger().Error("---- Recovered ----", zap.Any("error", err)) @@ -59,12 +91,6 @@ func (f *Framework) Launch(registerRoutes func(ginEngine *gin.Engine), before, a wg := &sync.WaitGroup{} - //:: 根据配置依次初始化组件、启动服务 :: - //- before,自定义前置函数调用 - if before != nil { - before() - } - //- logger logger.Init(f.conf.LoggerConf, f.appName) @@ -105,8 +131,6 @@ func (f *Framework) Launch(registerRoutes func(ginEngine *gin.Engine), before, a //- gin ginEngine := httpserver.InitGinEngine(f.conf.HttpServer) - - //- 注册自定义路由 if registerRoutes != nil { registerRoutes(ginEngine) } @@ -115,10 +139,10 @@ func (f *Framework) Launch(registerRoutes func(ginEngine *gin.Engine), before, a httpserver.EnablePProfOnDebugMode(f.conf.HttpServer, ginEngine) //- prometheus - httpserver.RunPrometheusServer(f.conf.HttpServer.Prometheus) + httpserver.RunPrometheusServerOnDebugMode(f.conf.HttpServer.Prometheus) //- swagger - httpserver.RunSwaggerServer(f.conf.HttpServer.Swagger, ginEngine) + httpserver.RunSwaggerServerOnDebugMode(f.conf.HttpServer.Swagger, ginEngine) //- http server wg.Add(1) @@ -126,9 +150,109 @@ func (f *Framework) Launch(registerRoutes func(ginEngine *gin.Engine), before, a printSummaryInfo(f.conf.HttpServer, ginEngine) + wg.Wait() +} + +// Hook 挂载相关方法 +// +// @param registerRoutes 注册路由函数 +// +// @param beforeFunc 前置自定义处理函数(可选),在框架函数之前执行,注意自定义函数是同步执行的 +// +// @param afterFunc 后置自定义处理函数(可选),在框架函数之后执行,注意自定义函数是同步执行的 +func (f *Framework) Hook(registerRoutes func(ginEngine *gin.Engine), beforeFunc, afterFunc func()) Launchpad { + return &Launcher{ + fw: f, + registerRoutesFunc: registerRoutes, + beforeFunc: beforeFunc, + afterFunc: afterFunc, + } +} + +// Launch 启动 +// +// # Note: +// +// 已注册路由、已设置前置自动函数、已设置后置自定义函数 +func (l *Launcher) Launch() { + defer func() { + if err := recover(); err != nil { + logger.GetLogger().Error("---- Recovered ----", zap.Any("error", err)) + } + }() + + wg := &sync.WaitGroup{} + + //:: 根据配置依次初始化组件、启动服务 :: + //- before,自定义前置函数调用 + if l.beforeFunc != nil { + l.beforeFunc() + } + + //- logger + logger.Init(l.fw.conf.LoggerConf, l.fw.appName) + + //- redis(standalone) + if len(l.fw.conf.RedisConf.Host) > 0 { + redis.InitRedis(l.fw.conf.RedisConf) + } + + //- redis(cluster) + if len(l.fw.conf.RedisClusterConf.AddrList) > 0 { + redis.InitRedisCluster(l.fw.conf.RedisClusterConf) + } + + //- database + if len(l.fw.conf.DBConf.DriverName) > 0 { + db.Init(l.fw.conf.DBConf) + } + + //- jwt + if len(l.fw.conf.JwtConf.PublicKey) > 0 { + l.fw.conf.JwtConf.Load() + } + + //- nats + if len(l.fw.conf.NatsConf.Servers) > 0 { + nats.Init(l.fw.conf.NatsConf) + } + + //- kafka + if len(l.fw.conf.KafkaConf.Conf.AddrList) > 0 { + kafka.Init(l.fw.conf.KafkaConf.Conf, l.fw.conf.KafkaConf.Topic, l.fw.conf.KafkaConf.GroupID) + } + + //- etcd + if len(l.fw.conf.EtcdConf.Endpoints) > 0 { + etcd.Init(l.fw.conf.EtcdConf) + } + + //- gin + ginEngine := httpserver.InitGinEngine(l.fw.conf.HttpServer) + + //- 注册自定义路由 + if l.registerRoutesFunc != nil { + l.registerRoutesFunc(ginEngine) + } + + //- pprof + httpserver.EnablePProfOnDebugMode(l.fw.conf.HttpServer, ginEngine) + + //- prometheus + httpserver.RunPrometheusServerOnDebugMode(l.fw.conf.HttpServer.Prometheus) + + //- swagger + httpserver.RunSwaggerServerOnDebugMode(l.fw.conf.HttpServer.Swagger, ginEngine) + + //- http server + wg.Add(1) + go httpserver.RunHttpServer(l.fw.conf.HttpServer, ginEngine, l.fw.apiOption, wg) + + printSummaryInfo(l.fw.conf.HttpServer, ginEngine) + //- after,自定义后置函数调用 - if after != nil { - after() + if l.afterFunc != nil { + l.afterFunc() } wg.Wait() diff --git a/schedule/interval.go b/schedule/interval.go new file mode 100644 index 0000000..4ca5f6a --- /dev/null +++ b/schedule/interval.go @@ -0,0 +1,198 @@ +package schedule + +import "time" + +// Every 每隔多久执行一次 +// +// Note: interval至少需要大于等于1毫秒,否则将被设置为1毫秒 +func (j *TaskJob) Every(interval time.Duration) (cancel CancelFunc) { + if interval.Milliseconds() < 1 { + interval = time.Millisecond + } + j.interval = interval + j.run() + + cancel = j.cancelFunc + + return +} + +// EverySecond 每秒执行一次 +func (j *TaskJob) EverySecond() (cancel CancelFunc) { + j.interval = time.Second + j.run() + + cancel = j.cancelFunc + + return +} + +// EveryFiveSeconds 每5秒执行一次 +func (j *TaskJob) EveryFiveSeconds() (cancel CancelFunc) { + j.interval = time.Second * 5 + j.run() + + cancel = j.cancelFunc + + return +} + +// EveryTenSeconds 每10秒执行一次 +func (j *TaskJob) EveryTenSeconds() (cancel CancelFunc) { + j.interval = time.Second * 10 + j.run() + + cancel = j.cancelFunc + + return +} + +// EveryTwentySeconds 每20秒执行一次 +func (j *TaskJob) EveryTwentySeconds() (cancel CancelFunc) { + j.interval = time.Second * 20 + j.run() + + cancel = j.cancelFunc + + return +} + +// EveryThirtySeconds 每30秒执行一次 +func (j *TaskJob) EveryThirtySeconds() (cancel CancelFunc) { + j.interval = time.Second * 30 + j.run() + + cancel = j.cancelFunc + + return +} + +// EveryMinute 每分钟执行一次 +func (j *TaskJob) EveryMinute() (cancel CancelFunc) { + j.interval = time.Minute + j.run() + + cancel = j.cancelFunc + + return +} + +// EveryFiveMinutes 每5分钟执行一次 +func (j *TaskJob) EveryFiveMinutes() (cancel CancelFunc) { + j.interval = time.Minute * 5 + j.run() + + cancel = j.cancelFunc + + return +} + +// EveryTenMinutes 每10分钟执行一次 +func (j *TaskJob) EveryTenMinutes() (cancel CancelFunc) { + j.interval = time.Minute * 10 + j.run() + + cancel = j.cancelFunc + + return +} + +// EveryTwentyMinutes 每20分钟执行一次 +func (j *TaskJob) EveryTwentyMinutes() (cancel CancelFunc) { + j.interval = time.Minute * 20 + j.run() + + cancel = j.cancelFunc + + return +} + +// EveryThirtyMinutes 每30分钟执行一次 +func (j *TaskJob) EveryThirtyMinutes() (cancel CancelFunc) { + j.interval = time.Minute * 30 + j.run() + + cancel = j.cancelFunc + + return +} + +// Hourly 每1小时执行一次 +func (j *TaskJob) Hourly() (cancel CancelFunc) { + j.interval = time.Hour + j.run() + + cancel = j.cancelFunc + + return +} + +// EveryFiveHours 每5小时执行一次 +func (j *TaskJob) EveryFiveHours() (cancel CancelFunc) { + j.interval = time.Hour * 5 + j.run() + + cancel = j.cancelFunc + + return +} + +// EveryTenHours 每10小时执行一次 +func (j *TaskJob) EveryTenHours() (cancel CancelFunc) { + j.interval = time.Hour * 10 + j.run() + + cancel = j.cancelFunc + + return +} + +// EveryTwentyHours 每20小时执行一次 +func (j *TaskJob) EveryTwentyHours() (cancel CancelFunc) { + j.interval = time.Hour * 20 + j.run() + + cancel = j.cancelFunc + + return +} + +// Daily 每天执行一次 +func (j *TaskJob) Daily() (cancel CancelFunc) { + j.interval = time.Hour * 24 + j.run() + + cancel = j.cancelFunc + + return +} + +// Weekly 每周执行一次(每7天) +func (j *TaskJob) Weekly() (cancel CancelFunc) { + j.interval = time.Hour * 24 * 7 + j.run() + + cancel = j.cancelFunc + + return +} + +// Monthly 每月执行一次(每30天) +func (j *TaskJob) Monthly() (cancel CancelFunc) { + j.interval = time.Hour * 24 * 30 + j.run() + + cancel = j.cancelFunc + + return +} + +// Yearly 每年执行一次(每365天) +func (j *TaskJob) Yearly() (cancel CancelFunc) { + j.interval = time.Hour * 24 * 365 + j.run() + + cancel = j.cancelFunc + + return +} diff --git a/schedule/schedule.go b/schedule/schedule.go index 02d4c5c..07cc8b3 100644 --- a/schedule/schedule.go +++ b/schedule/schedule.go @@ -9,19 +9,43 @@ import ( "github.com/robfig/cron/v3" ) +type CancelFunc func() + // TaskJob 任务 type TaskJob struct { name string task func() interval time.Duration lockerKey string + lockedByMe bool + running bool withoutOverlapping bool - cancelFunc func() + cancelFunc CancelFunc cancelTaskChan chan struct{} } +type TaskJobPool struct { + mux *sync.RWMutex + pool map[string]*TaskJob +} + +var taskSchedules *TaskJobPool + var cronJob *cron.Cron +func init() { + (&sync.Once{}).Do(func() { + taskSchedules = &TaskJobPool{ + mux: &sync.RWMutex{}, + pool: make(map[string]*TaskJob), + } + }) +} + +func generateJobNameKey(name string) string { + return fmt.Sprintf("go-sail:task-schedule-locker:%s", utils.MD5Encrypt(name)) +} + // Job 实例化任务 // // @param name 任务名称唯一标识 @@ -30,7 +54,7 @@ var cronJob *cron.Cron func Job(name string, task func()) *TaskJob { job := &TaskJob{ name: name, - lockerKey: fmt.Sprintf("go-sail:task-schedule-locker:%s", utils.MD5Encrypt(name)), + lockerKey: generateJobNameKey(name), task: task, cancelTaskChan: make(chan struct{}), } @@ -43,6 +67,10 @@ func Job(name string, task func()) *TaskJob { }() } + taskSchedules.mux.Lock() + taskSchedules.pool[job.lockerKey] = job + taskSchedules.mux.Unlock() + return job } @@ -61,213 +89,28 @@ func (j *TaskJob) WithoutOverlapping() *TaskJob { return j } -// Every 每隔多久执行一次 -// -// Note: interval至少需要大于等于1毫秒,否则将被设置为1毫秒 -func (j *TaskJob) Every(interval time.Duration) (cancel func()) { - if interval.Milliseconds() < 1 { - interval = time.Millisecond - } - j.interval = interval - j.run() - - cancel = j.cancelFunc - - return cancel -} - -// EverySecond 每秒执行一次 -func (j *TaskJob) EverySecond() (cancel func()) { - j.interval = time.Second - j.run() - - cancel = j.cancelFunc - - return cancel -} - -// EveryFiveSeconds 每5秒执行一次 -func (j *TaskJob) EveryFiveSeconds() (cancel func()) { - j.interval = time.Second * 5 - j.run() - - cancel = j.cancelFunc - - return cancel -} - -// EveryTenSeconds 每10秒执行一次 -func (j *TaskJob) EveryTenSeconds() (cancel func()) { - j.interval = time.Second * 10 - j.run() - - cancel = j.cancelFunc - - return cancel -} - -// EveryTwentySeconds 每20秒执行一次 -func (j *TaskJob) EveryTwentySeconds() (cancel func()) { - j.interval = time.Second * 20 - j.run() - - cancel = j.cancelFunc - - return cancel -} - -// EveryThirtySeconds 每30秒执行一次 -func (j *TaskJob) EveryThirtySeconds() (cancel func()) { - j.interval = time.Second * 30 - j.run() - - cancel = j.cancelFunc - - return cancel -} - -// EveryMinute 每分钟执行一次 -func (j *TaskJob) EveryMinute() (cancel func()) { - j.interval = time.Minute - j.run() - - cancel = j.cancelFunc - - return cancel -} - -// EveryFiveMinutes 每5分钟执行一次 -func (j *TaskJob) EveryFiveMinutes() (cancel func()) { - j.interval = time.Minute * 5 - j.run() - - cancel = j.cancelFunc - - return cancel -} - -// EveryTenMinutes 每10分钟执行一次 -func (j *TaskJob) EveryTenMinutes() (cancel func()) { - j.interval = time.Minute * 10 - j.run() - - cancel = j.cancelFunc - - return cancel -} - -// EveryTwentyMinutes 每20分钟执行一次 -func (j *TaskJob) EveryTwentyMinutes() (cancel func()) { - j.interval = time.Minute * 20 - j.run() - - cancel = j.cancelFunc - - return cancel -} - -// EveryThirtyMinutes 每30分钟执行一次 -func (j *TaskJob) EveryThirtyMinutes() (cancel func()) { - j.interval = time.Minute * 30 - j.run() - - cancel = j.cancelFunc - - return cancel -} - -// Hourly 每1小时执行一次 -func (j *TaskJob) Hourly() (cancel func()) { - j.interval = time.Hour - j.run() - - cancel = j.cancelFunc - - return cancel -} - -// EveryFiveHours 每5小时执行一次 -func (j *TaskJob) EveryFiveHours() (cancel func()) { - j.interval = time.Hour * 5 - j.run() - - cancel = j.cancelFunc - - return cancel -} - -// EveryTenHours 每10小时执行一次 -func (j *TaskJob) EveryTenHours() (cancel func()) { - j.interval = time.Hour * 10 - j.run() - - cancel = j.cancelFunc - - return cancel -} - -// EveryTwentyHours 每20小时执行一次 -func (j *TaskJob) EveryTwentyHours() (cancel func()) { - j.interval = time.Hour * 20 - j.run() - - cancel = j.cancelFunc - - return cancel -} - -// Daily 每天执行一次 -func (j *TaskJob) Daily() (cancel func()) { - j.interval = time.Hour * 24 - j.run() - - cancel = j.cancelFunc - - return cancel -} - -// Weekly 每周执行一次(每7天) -func (j *TaskJob) Weekly() (cancel func()) { - j.interval = time.Hour * 24 * 7 - j.run() - - cancel = j.cancelFunc - - return cancel -} - -// Monthly 每月执行一次(每30天) -func (j *TaskJob) Monthly() (cancel func()) { - j.interval = time.Hour * 24 * 30 - j.run() - - cancel = j.cancelFunc - - return cancel -} - -// Yearly 每年执行一次(每365天) -func (j *TaskJob) Yearly() (cancel func()) { - j.interval = time.Hour * 24 * 365 - j.run() - - cancel = j.cancelFunc - - return cancel -} - // 任务执行函数 func (j *TaskJob) run() { go func() { ticker := time.NewTicker(j.interval) defer ticker.Stop() wrappedTaskFunc := func() { + j.running = true + + defer func() { + j.running = false + }() + if !j.withoutOverlapping { j.task() return } - if utils.RedisLock(j.lockerKey) { - defer utils.RedisUnlock(j.lockerKey) + if utils.RedisTryLock(j.lockerKey) { + defer func() { + utils.RedisUnlock(j.lockerKey) + j.lockedByMe = false + }() + j.lockedByMe = true j.task() } } @@ -278,9 +121,14 @@ func (j *TaskJob) run() { go wrappedTaskFunc() //收到退出信号,终止任务 case <-j.cancelTaskChan: - if j.withoutOverlapping { + if j.withoutOverlapping && j.lockedByMe { utils.RedisUnlock(j.lockerKey) } + + taskSchedules.mux.Lock() + delete(taskSchedules.pool, j.lockerKey) + taskSchedules.mux.Unlock() + break LISTEN } } @@ -297,7 +145,7 @@ func (j *TaskJob) run() { // // | | | | | // -// | | | | +----- day of week (0 - 7) (Sunday=0 or 7) OR sun,mon,tue,wed,thu,fri,sat +// | | | | +----- day of week (0 - 7) (Sunday=0 or 7) OR sun...sat // // | | | +---------- month (1 - 12) OR jan,feb,mar,apr ... // @@ -306,7 +154,7 @@ func (j *TaskJob) run() { // | +-------------------- hour (0 - 23) // // +------------------------- minute (0 - 59) -func (j *TaskJob) RunAt(crontabExpr string) (cancel func()) { +func (j *TaskJob) RunAt(crontabExpr string) (cancel CancelFunc) { (&sync.Once{}).Do(func() { cronJob = cron.New() cronJob.Start() @@ -314,12 +162,22 @@ func (j *TaskJob) RunAt(crontabExpr string) (cancel func()) { //因为AddFunc内部是协程启动,因此这里的方法使用同步方式调用 wrappedTaskFunc := func() { + j.running = true + + defer func() { + j.running = false + }() + if !j.withoutOverlapping { j.task() return } - if utils.RedisLock(j.lockerKey) { - defer utils.RedisUnlock(j.lockerKey) + if utils.RedisTryLock(j.lockerKey) { + defer func() { + utils.RedisUnlock(j.lockerKey) + j.lockedByMe = false + }() + j.lockedByMe = true j.task() } } @@ -332,6 +190,9 @@ func (j *TaskJob) RunAt(crontabExpr string) (cancel func()) { cancel = func() { go func() { cronJob.Remove(jobID) + taskSchedules.mux.Lock() + delete(taskSchedules.pool, j.lockerKey) + taskSchedules.mux.Unlock() fmt.Printf("[GO-SAIL] cancel job {%s} successfully\n", j.name) }() } @@ -339,22 +200,17 @@ func (j *TaskJob) RunAt(crontabExpr string) (cancel func()) { return } -// TenClockAtWeekday 每个工作日(周一~周五)上午10点 -func (j *TaskJob) TenClockAtWeekday() (cancel func()) { - return j.RunAt(TenClockAtWeekday) -} - -// TenClockAtWeekend 每个周末(周六和周日)上午10点 -func (j *TaskJob) TenClockAtWeekend() (cancel func()) { - return j.RunAt(TenClockAtWeekend) -} - -// FirstDayOfMonthly 每月1号 -func (j *TaskJob) FirstDayOfMonthly() (cancel func()) { - return j.RunAt(FirstDayOfMonth) -} +// JobIsRunning 查看任务是否正在执行 +func JobIsRunning(jobName string) bool { + var ( + running = false + name = generateJobNameKey(jobName) + ) + taskSchedules.mux.RLock() + if job, ok := taskSchedules.pool[name]; ok { + running = job.running + } + taskSchedules.mux.RUnlock() -// LastDayOfMonthly 每月最后一天 -func (j *TaskJob) LastDayOfMonthly() (cancel func()) { - return j.RunAt(LastDayOfMonth) + return running } diff --git a/schedule/specs.go b/schedule/specs.go new file mode 100644 index 0000000..6a1b38c --- /dev/null +++ b/schedule/specs.go @@ -0,0 +1,21 @@ +package schedule + +// TenClockAtWeekday 每个工作日(周一~周五)上午10点 +func (j *TaskJob) TenClockAtWeekday() (cancel CancelFunc) { + return j.RunAt(TenClockAtWeekday) +} + +// TenClockAtWeekend 每个周末(周六和周日)上午10点 +func (j *TaskJob) TenClockAtWeekend() (cancel CancelFunc) { + return j.RunAt(TenClockAtWeekend) +} + +// FirstDayOfMonthly 每月1号 +func (j *TaskJob) FirstDayOfMonthly() (cancel CancelFunc) { + return j.RunAt(FirstDayOfMonth) +} + +// LastDayOfMonthly 每月最后一天 +func (j *TaskJob) LastDayOfMonthly() (cancel CancelFunc) { + return j.RunAt(LastDayOfMonth) +} diff --git a/utils/redislock.go b/utils/redislock.go index c75c12c..b643f08 100644 --- a/utils/redislock.go +++ b/utils/redislock.go @@ -15,10 +15,16 @@ var ( cancelRenewalFuncChannelCluster = make(chan struct{}) ) -// RedisLock redis锁-上锁(自动推测连接类型) +type CancelFunc func() + +// RedisTryLock redis锁-尝试上锁(自动推测连接类型) // // using SetNX -func RedisLock(key string) bool { +// +// # Note +// +// 该方法会立即返回锁定成功与否的结果 +func RedisTryLock(key string) bool { if redis.GetInstance() != nil { return RedisStandaloneLock(key) } @@ -30,6 +36,51 @@ func RedisLock(key string) bool { panic("using redis lock on nil redis instance") } +// RedisLock redis锁-上锁(自动推测连接类型) +// +// using SetNX +// +// # Note +// +// 该方法会阻塞住线程直到上锁成功或者调用cancel取消 +func RedisLock(key string) (cancel CancelFunc) { + if redis.GetInstance() == nil && redis.GetClusterInstance() == nil { + panic("using redis lock on nil redis instance") + } + + var ( + locked = false + ticker = time.NewTicker(time.Millisecond) + cancelCh = make(chan struct{}) + cancelFunc = func() { + cancelCh <- struct{}{} + close(cancelCh) + } + ) + +LOOP: + for { + select { + case <-ticker.C: + if redis.GetInstance() != nil { + locked = RedisStandaloneLock(key) + } + if redis.GetClusterInstance() != nil { + locked = RedisClusterLock(key) + } + if locked { + break LOOP + } + case <-cancelCh: + break LOOP + } + } + + ticker.Stop() + + return cancelFunc +} + // RedisUnlock redis锁-解锁(自动推测连接类型) // // using SetNX diff --git a/utils/sm4.go b/utils/sm4.go index d656f91..aba8973 100644 --- a/utils/sm4.go +++ b/utils/sm4.go @@ -1,7 +1,6 @@ package utils import ( - "encoding/base64" "encoding/hex" "github.com/tjfoc/gmsm/sm4" @@ -20,7 +19,7 @@ func SM4ECBEncrypt(hexKey, raw string) (string, error) { return "", err } - return base64.StdEncoding.EncodeToString(out), nil + return Base64Encode(out), nil } // SM4ECBDecrypt ECB解密 @@ -32,7 +31,7 @@ func SM4ECBDecrypt(hexKey, base64Raw string) (string, error) { return "", err } - raw, err := base64.StdEncoding.DecodeString(base64Raw) + raw, err := Base64Decode(base64Raw) if err != nil { return "", err } diff --git a/utils/strings.go b/utils/strings.go index f587579..e9f97cc 100644 --- a/utils/strings.go +++ b/utils/strings.go @@ -6,7 +6,7 @@ import ( "strings" ) -//Wordwrap 以给定的字符和长度来打断字符串 +// Wordwrap 以给定的字符和长度来打断字符串 func Wordwrap(rawStr string, length int, split string) string { if len(rawStr) <= length || length < 1 { return rawStr @@ -37,25 +37,26 @@ func Wordwrap(rawStr string, length int, split string) string { return finalStr } -//WrapRedisKey 包装redis键名 +// WrapRedisKey 包装redis键名 // -//给redis的键加入应用名前缀,如: +// 给redis的键加入应用名前缀,如: // -//appName=game key=user +// appName=game key=user // -//最终的redis键名为:game:user +// 最终的redis键名为:game:user // -//此方法的主要作用是按应用来划分redis键名 +// 此方法的主要作用是按应用来划分redis键名 func WrapRedisKey(appName, key string) string { return fmt.Sprintf("%s:%s", appName, key) } const ( - letters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ" - digitalChars = "0123456789" + letters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ" + digitalChars = "0123456789" + specificSymbols = "~`!@#$%^&*-_+=/\\|/<>.:;'\"" ) -//RandomLetters 随机字符串(字母) +// RandomLetters 随机字符串(字母) func RandomLetters(length int) string { if length < 1 { return "" @@ -70,7 +71,7 @@ func RandomLetters(length int) string { return string(b) } -//RandomDigitalChars 随机字符串(数字) +// RandomDigitalChars 随机字符串(数字) func RandomDigitalChars(length int) string { if length < 1 { return "" @@ -84,3 +85,136 @@ func RandomDigitalChars(length int) string { return string(b) } + +// RandomComplexString 随机字符串(可带特殊符号) +func RandomComplexString(length int) string { + var s = fmt.Sprintf("%s%s%s", letters, digitalChars, specificSymbols) + + b := make([]byte, length) + + for i := range b { + b[i] = s[rand.Intn(len(s))] + } + + return string(b) +} + +// StringReverse 翻转字符串 +func StringReverse(s string) string { + length := len(s) + + if length < 1 { + return s + } + + b := make([]byte, length) + + for i := range b { + b[length-1-i] = s[i] + } + + return string(b) +} + +// StringShuffle 打乱字符串 +func StringShuffle(s string) string { + length := len(s) + + if length < 1 { + return s + } + + arr := strings.Split(s, "") + + for range arr { + r0, r1 := rand.Intn(length), rand.Intn(length) + arr[r0], arr[r1] = arr[r1], arr[r0] + } + + return strings.Join(arr, "") +} + +// StringPaddingLeft 向左填充字符串 +// +// @param rawString 原字符 +// +// @param padChar 填充字符 +// +// @param length 最终字符长度 +func StringPaddingLeft(rawString, padChar string, length int) string { + return paddingString(rawString, padChar, length, 0) +} + +// StringPaddingRight 向右填充字符串 +// +// @param rawString 原字符 +// +// @param padChar 填充字符 +// +// @param length 最终字符长度 +func StringPaddingRight(rawString, padChar string, length int) string { + return paddingString(rawString, padChar, length, 1) +} + +// StringPaddingBoth 向两端填充字符串 +// +// @param rawString 原字符 +// +// @param padChar 填充字符 +// +// @param length 最终字符长度 +// +// # Note +// +// 如果填充长度不能均分,那么右侧多填充一个字符,如: +// +// rawString = "a",padChar = "#",length = 4 +// +// 则: +// +// result = "#a##" +func StringPaddingBoth(rawString, padChar string, length int) string { + return paddingString(rawString, padChar, length, 2) +} + +// paddingString 填充字符串 +// +// @param rawString 原字符 +// +// @param padChar 填充字符 +// +// @param length 最终字符长度 +// +// @param padType 0:向左填充,1:向右填充,2:向两端填充 +func paddingString(rawString, padChar string, length, padType int) string { + if length < 1 || len(padChar) == 0 { + return rawString + } + padLength := length - len(rawString) + if padLength < 1 { + return rawString + } + + //如果填充字符长度大于1,则取第一个字符 + if len(padChar) > 1 { + var s = make([]byte, 1) + s[0] = padChar[0] + padChar = string(s) + } + + switch padType { + default: + return fmt.Sprintf("%s%s", strings.Repeat(padChar, padLength), rawString) + case 0: + return fmt.Sprintf("%s%s", strings.Repeat(padChar, padLength), rawString) + case 1: + return fmt.Sprintf("%s%s", rawString, strings.Repeat(padChar, padLength)) + case 2: + left, right := padLength/2, padLength/2 + //如果填充长度不能均分,那么右侧多填充一个字符 + if padLength&1 == 1 { + right += 1 + } + return fmt.Sprintf("%s%s%s", strings.Repeat(padChar, left), rawString, strings.Repeat(padChar, right)) + } +} diff --git a/utils/strings_test.go b/utils/strings_test.go index 3d5bdd7..2a94d1a 100644 --- a/utils/strings_test.go +++ b/utils/strings_test.go @@ -1,8 +1,9 @@ package utils import ( - "github.com/stretchr/testify/assert" "testing" + + "github.com/stretchr/testify/assert" ) var rawStrArr = []string{ @@ -89,3 +90,94 @@ func TestRandomDigitalChars(t *testing.T) { assert.Equal(t, v, len(s)) } } + +func TestRandomComplexString(t *testing.T) { + holders := []int{0, 1, 3, 5, 7, 8, 9, 33, 100} + for _, v := range holders { + s := RandomComplexString(v) + t.Log(s) + assert.Equal(t, v, len(s)) + } +} + +func TestStringReverse(t *testing.T) { + holders := []string{"", "a", "ab", "abc", "1234567890"} + result := []string{"", "a", "ba", "cba", "0987654321"} + for index, v := range holders { + s := StringReverse(v) + t.Log(s) + assert.Equal(t, result[index], s) + } +} + +func TestStringShuffle(t *testing.T) { + holders := []string{"", "a", "ab", "abc", "1234567890"} + for _, v := range holders { + s := StringShuffle(v) + t.Log(s) + } +} + +func TestStringPaddingLeft(t *testing.T) { + holders := []struct { + raw string + padChar string + length int + result string + }{ + {"", "=", 1, "="}, + {"", "=", 2, "=="}, + {"", "=", 3, "==="}, + {"a", "=", 1, "a"}, + {"a", "=", 2, "=a"}, + {"a", "=", 3, "==a"}, + } + for _, v := range holders { + s := StringPaddingLeft(v.raw, v.padChar, v.length) + t.Log(s) + assert.Equal(t, v.result, s) + } +} + +func TestStringPaddingRight(t *testing.T) { + holders := []struct { + raw string + padChar string + length int + result string + }{ + {"", "=", 1, "="}, + {"", "=", 2, "=="}, + {"", "=", 3, "==="}, + {"a", "=", 1, "a"}, + {"a", "=", 2, "a="}, + {"a", "=", 3, "a=="}, + } + for _, v := range holders { + s := StringPaddingRight(v.raw, v.padChar, v.length) + t.Log(s) + assert.Equal(t, v.result, s) + } +} + +func TestStringPaddingBoth(t *testing.T) { + holders := []struct { + raw string + padChar string + length int + result string + }{ + {"", "=", 1, "="}, + {"", "=", 2, "=="}, + {"", "=", 3, "==="}, + {"a", "=", 1, "a"}, + {"a", "=", 2, "a="}, + {"a", "+=", 3, "+a+"}, + {"a", "=+", 4, "=a=="}, + } + for _, v := range holders { + s := StringPaddingBoth(v.raw, v.padChar, v.length) + t.Log(s) + assert.Equal(t, v.result, s) + } +}