Skip to content

Commit

Permalink
Implement random queue scheduler strategy.
Browse files Browse the repository at this point in the history
  • Loading branch information
marianogappa committed Oct 2, 2024
1 parent 215ffbe commit 0a60b22
Show file tree
Hide file tree
Showing 10 changed files with 315 additions and 420 deletions.
69 changes: 69 additions & 0 deletions scheduler/queue/active_work_signal.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package queue

import (
"sync"
"sync/atomic"
)

// activeWorkSignal is a thread-safe coordinator for awaiting a worker pool
// that relies on a queue that might be temporarily empty.
//
// If queue is empty and workers idle, done!
//
// If the queue is empty but a worker is working on a task, we must wait and check
// if it's empty after that worker finishes. That's why we need this.
//
// Use it like this:
//
// - When a worker picks up a task, call `Add()` (like a WaitGroup)
// - When a worker finishes a task, call `Done()` (like a WaitGroup)
//
// - If the queue is empty, check `IsIdle()` to check if no workers are active.
// - If workers are still active, call `Wait()` to block until state changes.
type activeWorkSignal struct {
countChangeSignal *sync.Cond
activeWorkUnitCount *atomic.Int32
isStarted *atomic.Bool
}

func newActiveWorkSignal() *activeWorkSignal {
return &activeWorkSignal{
countChangeSignal: sync.NewCond(&sync.Mutex{}),
activeWorkUnitCount: &atomic.Int32{},
isStarted: &atomic.Bool{},
}
}

// Add means a worker has started working on a task.
//
// Wake up the work queuing goroutine.
func (s *activeWorkSignal) Add() {
s.activeWorkUnitCount.Add(1)
s.isStarted.Store(true)
s.countChangeSignal.Signal()
}

// Done means a worker has finished working on a task.
//
// If the count became zero, wake up the work queuing goroutine (might have finished).
func (s *activeWorkSignal) Done() {
s.activeWorkUnitCount.Add(-1)
// if s.activeWorkUnitCount.Load() == 0 {
s.countChangeSignal.Signal()
// }
}

// IsIdle returns true if no workers are active. If queue is empty and workers idle, done!
func (s *activeWorkSignal) IsIdle() bool {
return s.isStarted.Load() && s.activeWorkUnitCount.Load() <= 0
}

// Wait blocks until the count of active workers changes.
func (s *activeWorkSignal) Wait() {
if s.activeWorkUnitCount.Load() <= 0 {
return
}
s.countChangeSignal.L.Lock()
defer s.countChangeSignal.L.Unlock()
s.countChangeSignal.Wait()
}
40 changes: 40 additions & 0 deletions scheduler/queue/concurrent_random_queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package queue

import (
"math/rand"
"sync"
)

// ConcurrentRandomQueue is a generic, thread-safe queue
// that pops random elements in O(1) time.
type ConcurrentRandomQueue[T any] struct {
mu sync.Mutex
queue []T
}

func NewConcurrentRandomQueue[T any](capacityHint int) *ConcurrentRandomQueue[T] {
return &ConcurrentRandomQueue[T]{queue: make([]T, 0, capacityHint)}
}

func (q *ConcurrentRandomQueue[T]) Push(item T) {
q.mu.Lock()
defer q.mu.Unlock()

q.queue = append(q.queue, item)
}

func (q *ConcurrentRandomQueue[T]) Pop() *T {
q.mu.Lock()
defer q.mu.Unlock()

if len(q.queue) == 0 {
return nil
}
idx := rand.Intn(len(q.queue))
lastIdx := len(q.queue) - 1
q.queue[idx], q.queue[lastIdx] = q.queue[lastIdx], q.queue[idx]
item := q.queue[lastIdx]
q.queue = q.queue[:lastIdx]

return &item
}
129 changes: 0 additions & 129 deletions scheduler/queue/priorityqueue.go

This file was deleted.

93 changes: 0 additions & 93 deletions scheduler/queue/priorityqueue_test.go

This file was deleted.

Loading

0 comments on commit 0a60b22

Please sign in to comment.