From 1e013de723508691d260ce187e26836ae859b408 Mon Sep 17 00:00:00 2001
From: Stuart Hayhurst
Date: Mon, 23 Sep 2024 18:51:17 +0100
Subject: [PATCH] Replace the flaky atomic queue with a safer mutex-based one

---
 src/ammonite/utils/internal/threadPool.cpp | 75 ++++++-------------------
 1 file changed, 18 insertions(+), 57 deletions(-)

diff --git a/src/ammonite/utils/internal/threadPool.cpp b/src/ammonite/utils/internal/threadPool.cpp
index 9726ff2..02361d9 100644
--- a/src/ammonite/utils/internal/threadPool.cpp
+++ b/src/ammonite/utils/internal/threadPool.cpp
@@ -2,6 +2,7 @@
 #include
 #include
 #include
+#include <queue>
 #include
 
 #include "../../types.hpp"
@@ -17,84 +18,44 @@ namespace {
     AmmoniteGroup* group;
   };
 
-  struct Node {
-    WorkItem workItem;
-    Node* nextNode;
-  };
-
+  //Implements a thread-safe queue to store and retrieve jobs from
   class WorkQueue {
   private:
-    std::mutex readMutex;
-    Node* nextPopped;
-    std::atomic<Node*> nextPushed;
+    std::queue<WorkItem> queue;
+    std::mutex queueLock;
 
   public:
-    WorkQueue() {
-      //Start with an empty queue, 1 'old' node
-      nextPushed = new Node{{nullptr, nullptr, nullptr}, nullptr};
-      nextPopped = nextPushed;
-    }
-
-    ~WorkQueue() {
-      //Clear out any remaining nodes
-      WorkItem workItem;
-      do {
-        this->pop(&workItem);
-      } while (workItem.work != nullptr);
-
-      //Clear up next free node
-      delete nextPopped;
-    }
-
     void push(AmmoniteWork work, void* userPtr, AmmoniteGroup* group) {
-      //Create a new empty node
-      Node* newNode = new Node{{nullptr, nullptr, nullptr}, nullptr};
-
-      //Atomically swap the next node with newNode, then fill in the old new node now it's free
-      *(nextPushed.exchange(newNode)) = {{work, userPtr, group}, newNode};
+      queueLock.lock();
+      queue.push({work, userPtr, group});
+      queueLock.unlock();
     }
 
     void pushMultiple(AmmoniteWork work, void* userBuffer, int stride,
-                      AmmoniteGroup* group, unsigned int count) {
-      //Generate section of linked list to insert
-      Node* newNode = new Node{{nullptr, nullptr, nullptr}, nullptr};
-      Node sectionStart;
-      Node* sectionPtr = &sectionStart;
+                      AmmoniteGroup* group, unsigned int count) {
+      //Add multiple jobs in 1 pass
+      queueLock.lock();
       if (userBuffer == nullptr) {
         for (unsigned int i = 0; i < count; i++) {
-          sectionPtr->nextNode = new Node{{work, nullptr, group}, nullptr};
-          sectionPtr = sectionPtr->nextNode;
+          queue.push({work, nullptr, group});
         }
       } else {
         for (unsigned int i = 0; i < count; i++) {
-          sectionPtr->nextNode = new Node{{
-            work, (void*)((char*)userBuffer + (std::size_t)(i) * stride), group}, nullptr};
-          sectionPtr = sectionPtr->nextNode;
+          queue.push({work, (void*)((char*)userBuffer + (std::size_t)(i) * stride), group});
         }
       }
-
-      //Insert the generated section atomically
-      sectionPtr->nextNode = newNode;
-      *(nextPushed.exchange(newNode)) = *sectionStart.nextNode;
-      delete sectionStart.nextNode;
+      queueLock.unlock();
     }
 
     void pop(WorkItem* workItemPtr) {
-      //Use the most recently popped node to find the next
-      readMutex.lock();
-
-      //Copy the data and free the old node, otherwise return if we don't have a new node
-      Node* currentNode = nextPopped;
-      if (currentNode->nextNode != nullptr) {
-        nextPopped = nextPopped->nextNode;
-        readMutex.unlock();
-
-        *workItemPtr = currentNode->workItem;
-        delete currentNode;
+      queueLock.lock();
+      if (!queue.empty()) {
+        *workItemPtr = queue.front();
+        queue.pop();
       } else {
-        readMutex.unlock();
         workItemPtr->work = nullptr;
       }
+      queueLock.unlock();
     }
   };
 }
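
For reference, below is a minimal standalone sketch (not part of the patch) of the same pattern the new code uses: a std::queue guarded by a single mutex, with an empty pop signalled through a null work pointer. The Job, JobQueue and printJob names and the plain function-pointer job type are illustrative stand-ins for the AmmoniteWork/AmmoniteGroup types defined in types.hpp, which are not shown here. It also uses std::lock_guard rather than the patch's explicit lock()/unlock() calls, so the mutex is released even if a queue operation throws.

//Illustrative sketch only, not part of the patch
//Job, JobQueue and printJob are hypothetical stand-ins for the real types in types.hpp
#include <cstdio>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

namespace {
  struct Job {
    void (*work)(void*); //Function to run, nullptr means the queue was empty
    void* userPtr;       //Opaque argument passed to the job
  };

  class JobQueue {
  private:
    std::queue<Job> queue;
    std::mutex queueLock;

  public:
    void push(void (*work)(void*), void* userPtr) {
      //std::lock_guard releases the mutex on every exit path
      const std::lock_guard<std::mutex> lock(queueLock);
      queue.push({work, userPtr});
    }

    void pop(Job* jobPtr) {
      const std::lock_guard<std::mutex> lock(queueLock);
      if (!queue.empty()) {
        *jobPtr = queue.front();
        queue.pop();
      } else {
        jobPtr->work = nullptr;
      }
    }
  };

  void printJob(void* userPtr) {
    std::printf("Ran job %d\n", *(int*)userPtr);
  }
}

int main() {
  //Queue a few jobs, then drain them from 2 threads until a pop returns no work
  JobQueue jobQueue;
  int jobIds[4] = {0, 1, 2, 3};
  for (int& jobId : jobIds) {
    jobQueue.push(printJob, &jobId);
  }

  std::vector<std::thread> workers;
  for (int i = 0; i < 2; i++) {
    workers.emplace_back([&jobQueue]() {
      Job job;
      do {
        jobQueue.pop(&job);
        if (job.work != nullptr) {
          job.work(job.userPtr);
        }
      } while (job.work != nullptr);
    });
  }

  for (std::thread& worker : workers) {
    worker.join();
  }
  return 0;
}

A single mutex serialises producers and consumers, so it gives up some of the throughput the removed lock-free linked list aimed for, but every push and pop sees a consistent queue and there is no manual node ownership to get wrong.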