Skip to content

Commit

Permalink
Fixed cycle queue
Browse files Browse the repository at this point in the history
  • Loading branch information
willzhen committed Nov 30, 2023
1 parent 36b0ad2 commit 24528fc
Showing 1 changed file with 15 additions and 12 deletions.
27 changes: 15 additions & 12 deletions task_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,17 @@ func (s *TaskScheduler) checkProcessed(taskId string) bool {
}
s.lock.Lock()
defer s.lock.Unlock()

if _, ok := s.processedTask[taskId]; !ok {
if s.count >= s.bufflen {
// 满了, 清除一个头部数据
delete(s.processedTask, s.taskProcessedTime[s.head].taskId)
s.head++
s.count--
if s.head >= s.bufflen {
s.head = 0
}
}
s.processedTask[taskId] = true
s.taskProcessedTime[s.tail] = processTime{
t: time.Now(),
Expand All @@ -180,24 +190,17 @@ func (s *TaskScheduler) checkProcessed(taskId string) bool {
if s.tail >= s.bufflen {
s.tail = 0
}
if s.count >= s.bufflen {
// 满了, head 被覆盖
s.head++
if s.head >= s.bufflen {
s.head = 0
}
} else {
s.count++
}
s.count++
return true
} else {
return false
}
}

func (s *TaskScheduler) cleanProcessTask() {
ticker := time.NewTicker(3 * time.Second)
ticker := time.NewTicker(time.Second)
for {

select {
case <-s.ctx.Done():
return
Expand All @@ -212,12 +215,12 @@ func (s *TaskScheduler) cleanProcessTask() {
}()

for s.count > 0 {
if s.taskProcessedTime[s.head].t.After(time.Now().Add(-5 * time.Second)) {
if s.taskProcessedTime[s.head].t.Before(time.Now().Add(-time.Second)) {
delete(s.processedTask, s.taskProcessedTime[s.head].taskId)
s.head++
s.count--
if s.head >= s.bufflen {
s.bufflen = 0
s.head = 0
}
} else {
break
Expand Down

0 comments on commit 24528fc

Please sign in to comment.