diff --git a/src/ammonite/core/threadManager.cpp b/src/ammonite/core/threadManager.cpp index e7da74c..e2703aa 100644 --- a/src/ammonite/core/threadManager.cpp +++ b/src/ammonite/core/threadManager.cpp @@ -35,6 +35,9 @@ namespace ammonite { //If transitioning between the two, it'll remain at its old value until complete std::atomic_flag threadsUnblockedFlag; std::atomic blockedThreadCount; + //0 when an unblock starts, 1 when a block starts + //Any other value means the system is broken + std::atomic blockBalance; std::queue workQueue; std::mutex workQueueMutex; @@ -181,6 +184,7 @@ namespace ammonite { unblockThreadsTrigger.test_and_set(); threadsUnblockedFlag.test_and_set(); blockedThreadCount = 0; + blockBalance = 0; extraThreadCount = extraThreads; return 0; } @@ -188,6 +192,12 @@ namespace ammonite { //Jobs submitted at the same time may execute, but the threads will block after //Guarantees work submitted after won't begin yet void blockThreads(bool sync) { + //Skip blocking if it's already blocked / going to block + if (blockBalance > 0) { + return; + } + blockBalance++; + //Submit a job for each thread that waits for the trigger unblockThreadsTrigger.clear(); workQueueMutex.lock(); @@ -206,6 +216,12 @@ namespace ammonite { } void unblockThreads(bool sync) { + //Only unblock if it's already blocked / blocking + if (blockBalance == 0) { + return; + } + blockBalance--; + //Unblock threads and wake them up unblockThreadsTrigger.test_and_set(); unblockThreadsTrigger.notify_all(); @@ -243,12 +259,13 @@ namespace ammonite { //Finish work already in the queue and kill the threads void destroyThreadPool() { //Finish existing work and block new work from starting + unblockThreads(true); blockThreads(true); - //Kill all threads when they wake up + //Kill all threads when the're unblocked wake up stayAlive = false; - //Unlock threads and wake them up + //Unblock threads and wake them up unblockThreads(true); //Wait until all threads are done