Skip to content

Commit

Permalink
fix slotqueue worker starvation (#1081)
Browse files Browse the repository at this point in the history
* fix slotqueue worker starvation

* improve slotqueue tests

Co-Authored-By: Marcin Czenko <marcin.czenko@pm.me>

* slotqueue nph formatting

---------

Co-authored-by: Marcin Czenko <marcin.czenko@pm.me>
  • Loading branch information
markspanbroek and marcinczenko authored Jan 23, 2025
1 parent 1c4184f commit f6c792d
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 7 deletions.
2 changes: 2 additions & 0 deletions codex/sales/slotqueue.nim
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,8 @@ proc run(self: SlotQueue) {.async: (raises: []).} =
trace "readding seen item back into the queue"
discard self.push(item) # on error, drop the item and continue
worker.doneProcessing.complete()
if err =? self.addWorker().errorOption:
error "error adding new worker", error = err.msg
await sleepAsync(1.millis) # poll
continue

Expand Down
50 changes: 43 additions & 7 deletions tests/codex/sales/testslotqueue.nim
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,11 @@ suite "Slot queue":
check queue.len == 0
check $queue == "[]"

test "starts with 0 active workers":
newSlotQueue(maxSize = 2, maxWorkers = 2)
check eventually queue.running
check queue.activeWorkers == 0

test "reports correct size":
newSlotQueue(maxSize = 2, maxWorkers = 2)
check queue.size == 2
Expand Down Expand Up @@ -506,14 +511,9 @@ suite "Slot queue":
]
)

test "processing a 'seen' item pauses the queue":
test "queue starts paused":
newSlotQueue(maxSize = 4, maxWorkers = 4)
let request = StorageRequest.example
let item =
SlotQueueItem.init(request.id, 0'u16, request.ask, request.expiry, seen = true)
check queue.push(item).isOk
check eventually queue.paused
check onProcessSlotCalledWith.len == 0
check queue.paused

test "pushing items to queue unpauses queue":
newSlotQueue(maxSize = 4, maxWorkers = 4)
Expand Down Expand Up @@ -546,6 +546,42 @@ suite "Slot queue":
check eventually onProcessSlotCalledWith == @[(item.requestId, item.slotIndex)]
check eventually queue.len == 0

test "processing a 'seen' item pauses the queue":
newSlotQueue(maxSize = 4, maxWorkers = 4)
let request = StorageRequest.example
let unseen =
SlotQueueItem.init(request.id, 0'u16, request.ask, request.expiry, seen = false)
let seen =
SlotQueueItem.init(request.id, 1'u16, request.ask, request.expiry, seen = true)
# push causes unpause
check queue.push(unseen).isSuccess
# check all items processed
check eventually queue.len == 0
# push seen item
check queue.push(seen).isSuccess
# queue should be paused
check eventually queue.paused

test "processing a 'seen' item does not decrease the number of workers":
newSlotQueue(maxSize = 4, maxWorkers = 4)
let request = StorageRequest.example
let unseen =
SlotQueueItem.init(request.id, 0'u16, request.ask, request.expiry, seen = false)
let seen =
SlotQueueItem.init(request.id, 1'u16, request.ask, request.expiry, seen = true)
# push seen item to ensure that queue is pausing
check queue.push(seen).isSuccess
# unpause and pause a number of times
for _ in 0 ..< 10:
# push unseen item to unpause the queue
check queue.push(unseen).isSuccess
# wait for unseen item to be processed
check eventually queue.len == 1
# wait for queue to pause because of seen item
check eventually queue.paused
# check that the number of workers equals maximimum workers
check eventually queue.activeWorkers == 0

test "item 'seen' flags can be cleared":
newSlotQueue(maxSize = 4, maxWorkers = 1)
let request = StorageRequest.example
Expand Down

0 comments on commit f6c792d

Please sign in to comment.