Skip to content

Commit

Permalink
fix(crontab): Fixed that when sending SIGINT signal, it could not b…
Browse files Browse the repository at this point in the history
…e stopped properly (#44)
  • Loading branch information
flc1125 authored Dec 25, 2023
1 parent 555f0d5 commit e4b6231
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 45 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,5 @@
# Dependency directories (remove the comment below to include it)
vendor/
.idea
_backup
_backup
_example
46 changes: 23 additions & 23 deletions crontab/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
package main

import (
"fmt"
"log"

"github.com/go-kratos/kratos/v2"
"github.com/redis/go-redis/v9"
Expand All @@ -17,35 +17,35 @@ import (
)

func main() {
rdb := redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379",
})

if err := kratos.New(
kratos.Server(
NewCrontabServer(rdb),
),
).Run(); err != nil {
panic(err)
}
}

func NewCrontabServer(rdb redis.Cmdable) *crontab.Server {
c := cron.New(
cron.WithSeconds(),
)

c.AddFunc("*/1 * * * * *", func() {
fmt.Println("Every hour on the half hour")
c.AddFunc("* * * * * *", func() {
log.Println("Hello world")
})

return crontab.NewServer(
c,
crontab.WithName("crontab:server"),
crontab.WithDebug(),
crontab.WithMutex(
redisMutex.New(rdb),
app := kratos.New(
kratos.Server(
crontab.NewServer(c,
crontab.WithMutex(redisMutex.New(redis.NewClient(&redis.Options{
Addr: "localhost:6379",
}))),
crontab.WithDebug(),
),
),
)

app.Run()
}
```

output:

```bash
2023/12/25 14:25:56 crontab: server started
2023/12/25 14:25:57 Hello world
2023/12/25 14:25:58 Hello world
2023/12/25 14:25:59 Hello world
2023/12/25 14:26:00 Hello world
```
33 changes: 12 additions & 21 deletions crontab/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,38 +65,37 @@ func NewServer(c *cron.Cron, opts ...Option) *Server {
}

func (s *Server) Start(ctx context.Context) error {
go s.run(ctx)

return nil
}

func (s *Server) run(ctx context.Context) {
timer := time.NewTicker(time.Second)

defer func() {
_ = s.mutex.Unlock(ctx, s.name)
if s.running {
s.cron.Stop()
}

s.mutex.Unlock(ctx, s.name)
timer.Stop()
}()

for {
select {
case <-ctx.Done():
s.log("crontab: server done")
return
return ctx.Err()
case <-s.stoped:
s.log("crontab: server stoped")
return
return nil
case <-timer.C:
if err := s.mutex.Lock(ctx, s.name); err != nil {
s.log(err)
continue
}

s.start(ctx)
s.start()
}
}
}

func (s *Server) start(ctx context.Context) {
func (s *Server) start() {
s.runningMu.Lock()
defer s.runningMu.Unlock()

Expand All @@ -111,19 +110,11 @@ func (s *Server) start(ctx context.Context) {
}

func (s *Server) Stop(ctx context.Context) error {
s.runningMu.Lock()
defer s.runningMu.Unlock()

if !s.running {
return nil
}

s.running = false
s.cron.Stop()
s.log("crontab: server stopping")

close(s.stoped)

return s.mutex.Unlock(ctx, s.name)
return nil
}

func (s *Server) log(v ...interface{}) {
Expand Down

0 comments on commit e4b6231

Please sign in to comment.