-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
DelayQueen 延迟队列 #12
Labels
Comments
JemmyH
added
documentation
Improvements or additions to documentation
Implementation by Go
labels
Jul 8, 2021
Repository owner
locked and limited conversation to collaborators
Jul 8, 2021
以 expiration 为优先级的优先级队列我们先回顾一下优先级队列的实现,其中有一个方法 type item struct {
Value interface{}
Priority int64
Index int
}
// this is a priority queue as implemented by a min heap
// ie. the 0th element is the *lowest* value
type priorityQueue []*item
// ........
// PopWithPriority 弹出优先级小于 maxP 的堆顶元素,如果没有,返回 nil 和 当前堆顶和maxP的距离
func (mh2 *PriorityQueen) PopWithPriority(maxP int64) (*Item, int64) {
if mh2.Len() == 0 {
return nil, 0
}
item := (*mh2)[0]
if item.priority > maxP {
return nil, item.priority - maxP
}
heap.Remove(mh2, 0)
return item, 0
} 这个方法的效果是从优先级队列中弹出 优先级小于maxP的堆顶元素,如果堆顶元素的优先级大于给定的
|
DelayQueen 的实现// DelayQueue is an unbounded blocking queue of *Delayed* elements, in which
// an element can only be taken when its delay has expired. The head of the
// queue is the *Delayed* element whose delay expired furthest in the past.
type DelayQueue struct {
// 出口,当最小堆中有元素弹出(到期该执行)的时候,从这个 channel 发送出去
C chan interface{}
mu sync.Mutex
// 优先级队列
pq priorityQueue
// Similar to the sleeping state of runtime.timers。delay_queen 状态,1:休眠,0:工作
sleeping int32
// 当有新的任务添加进来并且需要有可能会立即执行时,用于唤醒 delay_queen
wakeupC chan struct{}
}
// NewDelayQueen creates an instance of delayQueue with the specified size.
func NewDelayQueen(size int) *DelayQueue {
return &DelayQueue{
C: make(chan interface{}),
pq: newPriorityQueue(size),
wakeupC: make(chan struct{}),
}
}
Offer// Offer 将一个元素添加到当前的队列中
func (dq *DelayQueue) Offer(elem interface{}, expiration int64) {
item := &item{Value: elem, Priority: expiration}
dq.mu.Lock()
heap.Push(&dq.pq, item)
index := item.Index
dq.mu.Unlock()
if index == 0 {
// index 表示这个 item 添加到队列中的 index,0 表示数组第一个元素,此时有可能需要马上去检查这个任务是否到期,
// 因此需要启动已经休眠的 delay_queen 进行处理(已经启动的话就忽略)
// CompareAndSwapInt32(addr *int32, old int32, new int32) ==> 如果 addr 处的值和 old 相同,则将 addr 的值设置为 new,并返回 true
if atomic.CompareAndSwapInt32(&dq.sleeping, 1, 0) {
dq.wakeupC <- struct{}{}
}
}
} Poll// Poll 启动一个死循环,不断地从优先级队列中取出优先级最高的那个元素,并将元素对应的 value 通过 dq.C channel 发送出去
func (dq *DelayQueue) Poll(
exitC chan struct{} /*如果从这个 channel 中收到消息,则停止循环*/,
nowF func() int64 /*获取当前时间的函数,一般是 time.Now().Unix() 相关*/) {
for {
now := nowF()
dq.mu.Lock()
item, delta := dq.pq.PopWithPriority(now)
if item == nil {
// 没有到期的任务需要弹出,让 loop 休眠
// No items left or at least one item is pending.
// We must ensure the atomicity of the whole operation, which is
// composed of the above PopWithPriority and the following StoreInt32,
// to avoid possible race conditions between Offer and Poll.
atomic.StoreInt32(&dq.sleeping, 1)
}
dq.mu.Unlock()
if item == nil {
if delta == 0 {
// 优先级队列是空的,此时等待 Offer 调用,插入新的 item
select {
case <-dq.wakeupC:
// Wait until a new item is added.
// 当有元素加入时,继续循环,否则阻塞在此处
continue
case <-exitC:
goto exit
}
} else if delta > 0 {
// 优先级队列中有元素,但是优先级最高的那个还没到执行时间,还有 delta 的时间
// 此时,可以一直等 delta 后,唤醒 loop,进行调度弹出;这个等待的过程中有可能就又有新的元素通过 Offer 进来,也会触发调度
select {
case <-dq.wakeupC:
// A new item with an "earlier" expiration than the current "earliest" one is added.
continue
case <-time.After(time.Duration(delta) * time.Millisecond):
// The current "earliest" item expires.
// Reset the sleeping state since there's no need to receive from wakeupC.
// 一直等到了堆顶元素执行,此时直接更新 loop 的状态为未休眠,并唤醒之
if atomic.SwapInt32(&dq.sleeping, 0) == 0 {
// A caller of Offer() is being blocked on sending to wakeupC,
// drain wakeupC to unblock the caller.
<-dq.wakeupC
}
continue
case <-exitC:
goto exit
}
}
}
select {
case dq.C <- item.Value:
// 将到期的 item 的 Value 发送出去
// The expired element has been sent out successfully.
case <-exitC:
goto exit
}
}
exit:
// Reset the states
atomic.StoreInt32(&dq.sleeping, 0)
} |
Sign up for free
to subscribe to this conversation on GitHub.
Already have an account?
Sign in.
Labels
延迟队列是管理延时任务的一种方式。你可以往里面添加定时任务(Offer),它会一直轮训等待(Poll)同时将指定时间点到期的任务告诉调用方。
它的实现依赖于优先级队列,关于优先级队列的实现,可以看 最小堆 以及 优先级队列的 Golang 实现。
本 issue 的目的是使用
Golang
实现一个DelayQueen
,为后续的 时间轮 的实现做准备。The text was updated successfully, but these errors were encountered: