diff --git a/src/ammonite/core/threadManager.cpp b/src/ammonite/core/threadManager.cpp index 0d5164a..3d00f89 100644 --- a/src/ammonite/core/threadManager.cpp +++ b/src/ammonite/core/threadManager.cpp @@ -110,6 +110,48 @@ namespace ammonite { wakePool.notify_one(); } + //Specialised variants to submit multiple jobs + void submitMultiple(AmmoniteWork work, int newJobs) { + workQueueMutex.lock(); + for (int i = 0; i < newJobs; i++) { + workQueue.push({work, nullptr, nullptr}); + jobCount++; + wakePool.notify_one(); + } + workQueueMutex.unlock(); + } + + void submitMultipleUser(AmmoniteWork work, void** userPtrs, int newJobs) { + workQueueMutex.lock(); + for (int i = 0; i < newJobs; i++) { + workQueue.push({work, userPtrs[i], nullptr}); + jobCount++; + wakePool.notify_one(); + } + workQueueMutex.unlock(); + } + + void submitMultipleComp(AmmoniteWork work, std::atomic_flag* completions, int newJobs) { + workQueueMutex.lock(); + for (int i = 0; i < newJobs; i++) { + workQueue.push({work, nullptr, completions + i}); + jobCount++; + wakePool.notify_one(); + } + workQueueMutex.unlock(); + } + + void submitMultipleUserComp(AmmoniteWork work, void** userPtrs, + std::atomic_flag* completions, int newJobs) { + workQueueMutex.lock(); + for (int i = 0; i < newJobs; i++) { + workQueue.push({work, userPtrs[i], completions + i}); + jobCount++; + wakePool.notify_one(); + } + workQueueMutex.unlock(); + } + //Create thread pool, existing work will begin executing int createThreadPool(unsigned int extraThreads) { //Exit if thread pool already exists diff --git a/src/ammonite/core/threadManager.hpp b/src/ammonite/core/threadManager.hpp index cce0ae3..07767d5 100644 --- a/src/ammonite/core/threadManager.hpp +++ b/src/ammonite/core/threadManager.hpp @@ -15,6 +15,12 @@ namespace ammonite { void destroyThreadPool(); void submitWork(AmmoniteWork work, void* userPtr, std::atomic_flag* completion); + void submitMultiple(AmmoniteWork work, int jobCount); + void submitMultipleUser(AmmoniteWork work, void** userPtrs, int jobCount); + void submitMultipleComp(AmmoniteWork work, std::atomic_flag* completions, int jobCount); + void submitMultipleUserComp(AmmoniteWork work, void** userPtrs, std::atomic_flag* completions, + int jobCount); + void blockThreads(bool sync); void unblockThreads(bool sync); } diff --git a/src/ammonite/thread.cpp b/src/ammonite/thread.cpp index aa1a5b0..fb5f953 100644 --- a/src/ammonite/thread.cpp +++ b/src/ammonite/thread.cpp @@ -16,6 +16,46 @@ namespace ammonite { internal::submitWork(work, userPtr, completion); } + void submitMultiple(AmmoniteWork work, void** userPtrs, + std::atomic_flag* completions, int jobCount) { + const int threadCount = getThreadPoolSize(); + + int offset = 0; + for (int i = 0; i < jobCount / threadCount; i++) { + if (userPtrs == nullptr) { + if (completions == nullptr) { + ammonite::thread::internal::submitMultiple(work, threadCount); + } else { + ammonite::thread::internal::submitMultipleComp(work, completions + offset, threadCount); + } + } else { + if (completions == nullptr) { + ammonite::thread::internal::submitMultipleUser(work, userPtrs + offset, threadCount); + } else { + ammonite::thread::internal::submitMultipleUserComp(work, userPtrs + offset, + completions + offset, threadCount); + } + } + offset += threadCount; + } + + int remainingJobs = jobCount % threadCount; + if (userPtrs == nullptr) { + if (completions == nullptr) { + ammonite::thread::internal::submitMultiple(work, remainingJobs); + } else { + ammonite::thread::internal::submitMultipleComp(work, completions + offset, remainingJobs); + } + } else { + if (completions == nullptr) { + ammonite::thread::internal::submitMultipleUser(work, userPtrs + offset, remainingJobs); + } else { + ammonite::thread::internal::submitMultipleUserComp(work, userPtrs + offset, + completions + offset, remainingJobs); + } + } + } + void waitWorkComplete(std::atomic_flag* completion) { //Wait for completion to become true if (completion != nullptr) { diff --git a/src/ammonite/thread.hpp b/src/ammonite/thread.hpp index 469af5b..eb43dbf 100644 --- a/src/ammonite/thread.hpp +++ b/src/ammonite/thread.hpp @@ -8,6 +8,8 @@ namespace ammonite { unsigned int getThreadPoolSize(); void submitWork(AmmoniteWork work, void* userPtr); void submitWork(AmmoniteWork work, void* userPtr, std::atomic_flag* completion); + void submitMultiple(AmmoniteWork work, void** userPtrs, + std::atomic_flag* completions, int jobCount); void waitWorkComplete(std::atomic_flag* completion); void blockThreadsAsync(); diff --git a/src/threadDemo.cpp b/src/threadDemo.cpp index 4730cef..3895b06 100644 --- a/src/threadDemo.cpp +++ b/src/threadDemo.cpp @@ -27,9 +27,7 @@ int main() { //Submit 300000 fast 'jobs' long long int numJobs = 300000; std::atomic_flag* syncs = (std::atomic_flag*)std::malloc(sizeof(std::atomic_flag) * numJobs); - for (int i = 0; i < numJobs; i++) { - ammonite::thread::submitWork(shortTask, nullptr, &syncs[i]); - } + ammonite::thread::submitMultiple(shortTask, nullptr, syncs, numJobs); std::cout << "Submitted in " << runTimer.getTime() << "s" << std::endl; //Wait for work to complete