diff --git a/orm/svc.go b/orm/svc.go index 8fca869..2f0c6df 100644 --- a/orm/svc.go +++ b/orm/svc.go @@ -14,6 +14,7 @@ import ( "context" "database/sql" + "github.com/keepchen/go-sail/v3/lib/db" "github.com/keepchen/go-sail/v3/lib/logger" "go.uber.org/zap" "gorm.io/gorm" @@ -97,6 +98,17 @@ var NewSvcImpl = func(dbr *gorm.DB, dbw *gorm.DB, logger *zap.Logger) Svc { } } +// NewSvcImplSilent 初始化 +// +// 使用默认的读写实例和日志对象 +var NewSvcImplSilent = func() Svc { + return &SvcImpl{ + dbr: db.GetInstance().R, + dbw: db.GetInstance().W, + logger: logger.GetLogger(), + } +} + func (a *SvcImpl) Model(value interface{}) Svc { if a.tx == nil { a.tx = a.dbw diff --git a/schedule/delay.go b/schedule/delay.go index ec933d6..4577507 100644 --- a/schedule/delay.go +++ b/schedule/delay.go @@ -15,7 +15,7 @@ func (j *taskJob) RunAfter(delay time.Duration) (cancel CancelFunc) { timer := time.After(delay) cancel = j.cancelFunc - wrappedTaskFunc := func() { + j.wrappedTaskFunc = func() { j.running = true defer func() { @@ -41,7 +41,7 @@ func (j *taskJob) RunAfter(delay time.Duration) (cancel CancelFunc) { for { select { case <-timer: - go wrappedTaskFunc() + go j.wrappedTaskFunc() break LOOP case <-j.cancelTaskChan: break LOOP diff --git a/schedule/interval.go b/schedule/interval.go index d28f3a1..6024758 100644 --- a/schedule/interval.go +++ b/schedule/interval.go @@ -23,7 +23,7 @@ func (j *taskJob) run() { go func() { ticker := time.NewTicker(j.interval) defer ticker.Stop() - wrappedTaskFunc := func() { + j.wrappedTaskFunc = func() { j.running = true defer func() { @@ -47,7 +47,7 @@ func (j *taskJob) run() { for { select { case <-ticker.C: - go wrappedTaskFunc() + go j.wrappedTaskFunc() //收到退出信号,终止任务 case <-j.cancelTaskChan: if j.withoutOverlapping && j.lockedByMe { diff --git a/schedule/schedule.go b/schedule/schedule.go index b4341db..849846a 100644 --- a/schedule/schedule.go +++ b/schedule/schedule.go @@ -119,6 +119,7 @@ type Scheduler interface { type taskJob struct { name string task func() + wrappedTaskFunc func() interval time.Duration lockerKey string lockedByMe bool @@ -238,3 +239,64 @@ func JobIsRunning(jobName string) bool { return running } + +// Call 手动启动任务 +// +// jobName 任务名称 +// +// mandatory 如果为true,将不检测堆叠状态而直接执行 +// +// # Note +// +// 内部函数将被同步式的调用 +func Call(jobName string, mandatory bool) { + var ( + job *taskJob + lockerKey = generateJobNameKey(jobName) + ) + taskSchedules.mux.RLock() + if jb, ok := taskSchedules.pool[lockerKey]; ok { + job = jb + } + taskSchedules.mux.RUnlock() + if job == nil { + fmt.Printf("[GO-SAIL] call job {%s} failed,cause job not registered.\n", job.name) + return + } + if mandatory { + job.task() + } else { + job.wrappedTaskFunc() + } +} + +// MustCall 手动启动任务 +// +// jobName 任务名称 +// +// mandatory 如果为true,将不检测堆叠状态而直接执行 +// +// # Note +// +// 1.若jobName在任务列表中不存在(如未注册或被取消),将panic +// +// 2.内部函数将被同步式的调用 +func MustCall(jobName string, mandatory bool) { + var ( + job *taskJob + lockerKey = generateJobNameKey(jobName) + ) + taskSchedules.mux.RLock() + if jb, ok := taskSchedules.pool[lockerKey]; ok { + job = jb + } + taskSchedules.mux.RUnlock() + if job == nil { + panic(fmt.Errorf("job name: %s not registered", jobName)) + } + if mandatory { + job.task() + } else { + job.wrappedTaskFunc() + } +} diff --git a/schedule/specs.go b/schedule/specs.go index d45081d..2feb555 100644 --- a/schedule/specs.go +++ b/schedule/specs.go @@ -35,7 +35,7 @@ func (j *taskJob) RunAt(crontabExpr string) (cancel CancelFunc) { }) //因为AddFunc内部是协程启动,因此这里的方法使用同步方式调用 - wrappedTaskFunc := func() { + j.wrappedTaskFunc = func() { j.running = true defer func() { @@ -56,7 +56,7 @@ func (j *taskJob) RunAt(crontabExpr string) (cancel CancelFunc) { } } - jobID, jobErr := cronJob.AddFunc(crontabExpr, wrappedTaskFunc) + jobID, jobErr := cronJob.AddFunc(crontabExpr, j.wrappedTaskFunc) if jobErr != nil { fmt.Printf("[GO-SAIL] add job {%s} failed: %v\n", j.name, jobErr.Error()) }