Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DelayQueen 延迟队列 #12

Open
JemmyH opened this issue Jul 8, 2021 · 2 comments
Open

DelayQueen 延迟队列 #12

JemmyH opened this issue Jul 8, 2021 · 2 comments
Assignees
Labels
documentation Improvements or additions to documentation done Implementation by Go

Comments

@JemmyH
Copy link
Owner

JemmyH commented Jul 8, 2021

延迟队列是管理延时任务的一种方式。你可以往里面添加定时任务(Offer),它会一直轮训等待(Poll)同时将指定时间点到期的任务告诉调用方。
它的实现依赖于优先级队列,关于优先级队列的实现,可以看 最小堆 以及 优先级队列的 Golang 实现

本 issue 的目的是使用 Golang 实现一个 DelayQueen,为后续的 时间轮 的实现做准备。

@JemmyH JemmyH self-assigned this Jul 8, 2021
@JemmyH JemmyH added documentation Improvements or additions to documentation Implementation by Go labels Jul 8, 2021
Repository owner locked and limited conversation to collaborators Jul 8, 2021
@JemmyH
Copy link
Owner Author

JemmyH commented Jul 8, 2021

以 expiration 为优先级的优先级队列

我们先回顾一下优先级队列的实现,其中有一个方法 PopWithPriority

type item struct {
	Value    interface{}
	Priority int64
	Index    int
}

// this is a priority queue as implemented by a min heap
// ie. the 0th element is the *lowest* value
type priorityQueue []*item

// ........

// PopWithPriority 弹出优先级小于 maxP 的堆顶元素,如果没有,返回 nil 和 当前堆顶和maxP的距离
func (mh2 *PriorityQueen) PopWithPriority(maxP int64) (*Item, int64) {
	if mh2.Len() == 0 {
		return nil, 0
	}
	item := (*mh2)[0]
	if item.priority > maxP {
		return nil, item.priority - maxP
	}

	heap.Remove(mh2, 0)

	return item, 0
}

这个方法的效果是从优先级队列中弹出 优先级小于maxP的堆顶元素,如果堆顶元素的优先级大于给定的 maxP,则不弹出,返回 nil堆顶元素优先级与maxP的距离。这个场景我们更新一下:item 表示一个定时任务,item.Priority 表示这个任务的过期时间(时间戳),加上最小堆的性质,优先级队列就变成了以“哪个任务的执行时间最早(时间戳最小)”为优先级的队列,当我们调用这个方法时,传入当前时间戳,如果堆顶的元素的时间戳大于当前时间戳(item.Priority > maxP),说明最小堆中的最小的元素还没到执行时间,那么不弹出这个定时任务,返回 nil还剩多长时间才会到达,如此对调用方而言,就可以根据返回值进行一系列的逻辑判断,这正好应用于我们今天要实现的延迟队列:

  • 从最小堆中去拿距离现在最近的任务,如果拿到了,说明到期该执行了,则执行之;
  • 如果返回空的任务,但是时间间隔大于 0,说明队列中有任务,但是还没到期,这个时候调度程序可以等待这个任务到期;
  • 如果返回空任务并且时间间隔等于 0,说明队列中没有任务,这个时候可以放心去休眠,等待新的任务到来。

@JemmyH
Copy link
Owner Author

JemmyH commented Jul 8, 2021

DelayQueen 的实现

// DelayQueue is an unbounded blocking queue of *Delayed* elements, in which
// an element can only be taken when its delay has expired. The head of the
// queue is the *Delayed* element whose delay expired furthest in the past.
type DelayQueue struct {
	// 出口,当最小堆中有元素弹出(到期该执行)的时候,从这个 channel 发送出去
	C chan interface{}

	mu sync.Mutex
	// 优先级队列
	pq priorityQueue

	// Similar to the sleeping state of runtime.timers。delay_queen 状态,1:休眠,0:工作
	sleeping int32
	// 当有新的任务添加进来并且需要有可能会立即执行时,用于唤醒 delay_queen
	wakeupC chan struct{}
}

// NewDelayQueen creates an instance of delayQueue with the specified size.
func NewDelayQueen(size int) *DelayQueue {
	return &DelayQueue{
		C:       make(chan interface{}),
		pq:      newPriorityQueue(size),
		wakeupC: make(chan struct{}),
	}
}

DelayQueen 只有两个方法:

  • Offer:将 task 添加到队列中;
  • Poll :持续轮训等待,将到期该执行的任务通过 channel C 发送出去。

Offer

// Offer 将一个元素添加到当前的队列中
func (dq *DelayQueue) Offer(elem interface{}, expiration int64) {
	item := &item{Value: elem, Priority: expiration}

	dq.mu.Lock()
	heap.Push(&dq.pq, item)
	index := item.Index
	dq.mu.Unlock()

	if index == 0 {
		// index 表示这个 item 添加到队列中的 index,0 表示数组第一个元素,此时有可能需要马上去检查这个任务是否到期,
		// 因此需要启动已经休眠的 delay_queen 进行处理(已经启动的话就忽略)
		// CompareAndSwapInt32(addr *int32, old int32, new int32) ==> 如果 addr 处的值和 old 相同,则将 addr 的值设置为 new,并返回 true
		if atomic.CompareAndSwapInt32(&dq.sleeping, 1, 0) {
			dq.wakeupC <- struct{}{}
		}
	}
}

Poll

// Poll 启动一个死循环,不断地从优先级队列中取出优先级最高的那个元素,并将元素对应的 value 通过 dq.C channel 发送出去
func (dq *DelayQueue) Poll(
	exitC chan struct{} /*如果从这个 channel 中收到消息,则停止循环*/,
	nowF func() int64   /*获取当前时间的函数,一般是 time.Now().Unix() 相关*/) {
	for {
		now := nowF()

		dq.mu.Lock()
		item, delta := dq.pq.PopWithPriority(now)
		if item == nil {
			// 没有到期的任务需要弹出,让 loop 休眠
			// No items left or at least one item is pending.

			// We must ensure the atomicity of the whole operation, which is
			// composed of the above PopWithPriority and the following StoreInt32,
			// to avoid possible race conditions between Offer and Poll.
			atomic.StoreInt32(&dq.sleeping, 1)
		}
		dq.mu.Unlock()

		if item == nil {
			if delta == 0 {
				// 优先级队列是空的,此时等待 Offer 调用,插入新的 item
				select {
				case <-dq.wakeupC:
					// Wait until a new item is added.
					// 当有元素加入时,继续循环,否则阻塞在此处
					continue
				case <-exitC:
					goto exit
				}
			} else if delta > 0 {
				// 优先级队列中有元素,但是优先级最高的那个还没到执行时间,还有 delta 的时间
				// 此时,可以一直等 delta 后,唤醒 loop,进行调度弹出;这个等待的过程中有可能就又有新的元素通过 Offer 进来,也会触发调度
				select {
				case <-dq.wakeupC:
					// A new item with an "earlier" expiration than the current "earliest" one is added.
					continue
				case <-time.After(time.Duration(delta) * time.Millisecond):
					// The current "earliest" item expires.

					// Reset the sleeping state since there's no need to receive from wakeupC.
					// 一直等到了堆顶元素执行,此时直接更新 loop 的状态为未休眠,并唤醒之
					if atomic.SwapInt32(&dq.sleeping, 0) == 0 {
						// A caller of Offer() is being blocked on sending to wakeupC,
						// drain wakeupC to unblock the caller.
						<-dq.wakeupC
					}
					continue
				case <-exitC:
					goto exit
				}
			}
		}

		select {
		case dq.C <- item.Value:
			// 将到期的 item 的 Value 发送出去
			// The expired element has been sent out successfully.
		case <-exitC:
			goto exit
		}
	}

exit:
	// Reset the states
	atomic.StoreInt32(&dq.sleeping, 0)
}

@JemmyH JemmyH added the done label Jul 8, 2021
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
documentation Improvements or additions to documentation done Implementation by Go
Projects
None yet
Development

No branches or pull requests

1 participant