Skip to content

Commit

Permalink
Support submitting multiple jobs at once to thread manager
Browse files Browse the repository at this point in the history
  • Loading branch information
stuarthayhurst committed Feb 5, 2024
1 parent 8250d82 commit 927b982
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 3 deletions.
42 changes: 42 additions & 0 deletions src/ammonite/core/threadManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,48 @@ namespace ammonite {
wakePool.notify_one();
}

//Specialised variants to submit multiple jobs
void submitMultiple(AmmoniteWork work, int newJobs) {
workQueueMutex.lock();
for (int i = 0; i < newJobs; i++) {
workQueue.push({work, nullptr, nullptr});
jobCount++;
wakePool.notify_one();
}
workQueueMutex.unlock();
}

void submitMultipleUser(AmmoniteWork work, void** userPtrs, int newJobs) {
workQueueMutex.lock();
for (int i = 0; i < newJobs; i++) {
workQueue.push({work, userPtrs[i], nullptr});
jobCount++;
wakePool.notify_one();
}
workQueueMutex.unlock();
}

void submitMultipleComp(AmmoniteWork work, std::atomic_flag* completions, int newJobs) {
workQueueMutex.lock();
for (int i = 0; i < newJobs; i++) {
workQueue.push({work, nullptr, completions + i});
jobCount++;
wakePool.notify_one();
}
workQueueMutex.unlock();
}

void submitMultipleUserComp(AmmoniteWork work, void** userPtrs,
std::atomic_flag* completions, int newJobs) {
workQueueMutex.lock();
for (int i = 0; i < newJobs; i++) {
workQueue.push({work, userPtrs[i], completions + i});
jobCount++;
wakePool.notify_one();
}
workQueueMutex.unlock();
}

//Create thread pool, existing work will begin executing
int createThreadPool(unsigned int extraThreads) {
//Exit if thread pool already exists
Expand Down
6 changes: 6 additions & 0 deletions src/ammonite/core/threadManager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@ namespace ammonite {
void destroyThreadPool();

void submitWork(AmmoniteWork work, void* userPtr, std::atomic_flag* completion);
void submitMultiple(AmmoniteWork work, int jobCount);
void submitMultipleUser(AmmoniteWork work, void** userPtrs, int jobCount);
void submitMultipleComp(AmmoniteWork work, std::atomic_flag* completions, int jobCount);
void submitMultipleUserComp(AmmoniteWork work, void** userPtrs, std::atomic_flag* completions,
int jobCount);

void blockThreads(bool sync);
void unblockThreads(bool sync);
}
Expand Down
40 changes: 40 additions & 0 deletions src/ammonite/thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,46 @@ namespace ammonite {
internal::submitWork(work, userPtr, completion);
}

void submitMultiple(AmmoniteWork work, void** userPtrs,
std::atomic_flag* completions, int jobCount) {
const int threadCount = getThreadPoolSize();

int offset = 0;
for (int i = 0; i < jobCount / threadCount; i++) {
if (userPtrs == nullptr) {
if (completions == nullptr) {
ammonite::thread::internal::submitMultiple(work, threadCount);
} else {
ammonite::thread::internal::submitMultipleComp(work, completions + offset, threadCount);
}
} else {
if (completions == nullptr) {
ammonite::thread::internal::submitMultipleUser(work, userPtrs + offset, threadCount);
} else {
ammonite::thread::internal::submitMultipleUserComp(work, userPtrs + offset,
completions + offset, threadCount);
}
}
offset += threadCount;
}

int remainingJobs = jobCount % threadCount;
if (userPtrs == nullptr) {
if (completions == nullptr) {
ammonite::thread::internal::submitMultiple(work, remainingJobs);
} else {
ammonite::thread::internal::submitMultipleComp(work, completions + offset, remainingJobs);
}
} else {
if (completions == nullptr) {
ammonite::thread::internal::submitMultipleUser(work, userPtrs + offset, remainingJobs);
} else {
ammonite::thread::internal::submitMultipleUserComp(work, userPtrs + offset,
completions + offset, remainingJobs);
}
}
}

void waitWorkComplete(std::atomic_flag* completion) {
//Wait for completion to become true
if (completion != nullptr) {
Expand Down
2 changes: 2 additions & 0 deletions src/ammonite/thread.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ namespace ammonite {
unsigned int getThreadPoolSize();
void submitWork(AmmoniteWork work, void* userPtr);
void submitWork(AmmoniteWork work, void* userPtr, std::atomic_flag* completion);
void submitMultiple(AmmoniteWork work, void** userPtrs,
std::atomic_flag* completions, int jobCount);
void waitWorkComplete(std::atomic_flag* completion);

void blockThreadsAsync();
Expand Down
4 changes: 1 addition & 3 deletions src/threadDemo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@ int main() {
//Submit 300000 fast 'jobs'
long long int numJobs = 300000;
std::atomic_flag* syncs = (std::atomic_flag*)std::malloc(sizeof(std::atomic_flag) * numJobs);
for (int i = 0; i < numJobs; i++) {
ammonite::thread::submitWork(shortTask, nullptr, &syncs[i]);
}
ammonite::thread::submitMultiple(shortTask, nullptr, syncs, numJobs);
std::cout << "Submitted in " << runTimer.getTime() << "s" << std::endl;

//Wait for work to complete
Expand Down

0 comments on commit 927b982

Please sign in to comment.