Skip to content

Commit

Permalink
Replace the flaky atomic queue with a safer mutex-based one
Browse files Browse the repository at this point in the history
  • Loading branch information
stuarthayhurst committed Oct 3, 2024
1 parent 0bcde65 commit b76f197
Showing 1 changed file with 18 additions and 57 deletions.
75 changes: 18 additions & 57 deletions src/ammonite/utils/internal/threadPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include <cstddef>
#include <cstdint>
#include <mutex>
#include <queue>
#include <thread>

#include "../../types.hpp"
Expand All @@ -17,84 +18,44 @@ namespace {
AmmoniteGroup* group;
};

struct Node {
WorkItem workItem;
Node* nextNode;
};

//Implements a thread-safe queue to store and retrieve jobs from
class WorkQueue {
private:
std::mutex readMutex;
Node* nextPopped;
std::atomic<Node*> nextPushed;
std::queue<WorkItem> queue;
std::mutex queueLock;

public:
WorkQueue() {
//Start with an empty queue, 1 'old' node
nextPushed = new Node{{nullptr, nullptr, nullptr}, nullptr};
nextPopped = nextPushed;
}

~WorkQueue() {
//Clear out any remaining nodes
WorkItem workItem;
do {
this->pop(&workItem);
} while (workItem.work != nullptr);

//Clear up next free node
delete nextPopped;
}

void push(AmmoniteWork work, void* userPtr, AmmoniteGroup* group) {
//Create a new empty node
Node* newNode = new Node{{nullptr, nullptr, nullptr}, nullptr};

//Atomically swap the next node with newNode, then fill in the old new node now it's free
*(nextPushed.exchange(newNode)) = {{work, userPtr, group}, newNode};
queueLock.lock();
queue.push({work, userPtr, group});
queueLock.unlock();
}

void pushMultiple(AmmoniteWork work, void* userBuffer, int stride,
AmmoniteGroup* group, unsigned int count) {
//Generate section of linked list to insert
Node* newNode = new Node{{nullptr, nullptr, nullptr}, nullptr};
Node sectionStart;
Node* sectionPtr = &sectionStart;
AmmoniteGroup* group, unsigned int count) {
//Add multiple jobs in 1 pass
queueLock.lock();
if (userBuffer == nullptr) {
for (unsigned int i = 0; i < count; i++) {
sectionPtr->nextNode = new Node{{work, nullptr, group}, nullptr};
sectionPtr = sectionPtr->nextNode;
queue.push({work, nullptr, group});
}
} else {
for (unsigned int i = 0; i < count; i++) {
sectionPtr->nextNode = new Node{{
work, (void*)((char*)userBuffer + (std::size_t)(i) * stride), group}, nullptr};
sectionPtr = sectionPtr->nextNode;
queue.push({work, (void*)((char*)userBuffer + (std::size_t)(i) * stride), group});
}
}

//Insert the generated section atomically
sectionPtr->nextNode = newNode;
*(nextPushed.exchange(newNode)) = *sectionStart.nextNode;
delete sectionStart.nextNode;
queueLock.unlock();
}

void pop(WorkItem* workItemPtr) {
//Use the most recently popped node to find the next
readMutex.lock();

//Copy the data and free the old node, otherwise return if we don't have a new node
Node* currentNode = nextPopped;
if (currentNode->nextNode != nullptr) {
nextPopped = nextPopped->nextNode;
readMutex.unlock();

*workItemPtr = currentNode->workItem;
delete currentNode;
queueLock.lock();
if (!queue.empty()) {
*workItemPtr = queue.front();
queue.pop();
} else {
readMutex.unlock();
workItemPtr->work = nullptr;
}
queueLock.unlock();
}
};
}
Expand Down

0 comments on commit b76f197

Please sign in to comment.