-
Notifications
You must be signed in to change notification settings - Fork 0
/
WorkerPool.h
117 lines (106 loc) · 3.41 KB
/
WorkerPool.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
#pragma once

#include <atomic>        // std::atomic_size_t, std::atomic<bool> (was missing)
#include <cstddef>
#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <unordered_map>  // std::unordered_map (was missing; only <map> was included)

#include "FlowLog.h"
#include "MultiSemaphore.h"
#include "Worker.h"
class WorkerPool {
public:
WorkerPool(const size_t &workerCount = std::thread::hardware_concurrency()) : workerCount(workerCount) {
for (int i = 0; i < workerCount; ++i) {
auto worker = std::make_shared<Worker>(workerId);
workerMap[workerId] = worker;
++workerId;
const auto onIdleCallback = std::make_shared<std::function<void(Worker *worker)>>([&](Worker *worker) {
std::unique_lock<std::mutex> lock(poolMutex);
toWaitFor.unlock();
if (!isStopping)
idleWorker.push(workerMap.at(worker->getId()));
startWorker();
// LOG_INFO << "Idle: "<< "Locks " << toWaitFor.count() << " Tasks " << tasks.size();
});
worker->onIdle(onIdleCallback);
const auto onStopCallback = std::make_shared<std::function<void(Worker *worker)>>([&](Worker *worker) {
toWaitFor.unlock();
startWorker();
LOG_INFO << "Stop: " << "Locks " << toWaitFor.count() << " Tasks " << tasks.size();
});
worker->onStop(onStopCallback);
idleWorker.push(worker);
}
}
~WorkerPool() {
stop();
}
void wait() {
toWaitFor.wait();
}
void addTask(std::shared_ptr<std::function<void()>> function) {
std::lock_guard guard(tasksMutex);
tasks.emplace(function);
toWaitFor.addLock();
}
void stop() {
isStopping = true;
for (const auto entry : workerMap) {
entry.second->stop();
}
}
void start() {
std::unique_lock<std::mutex> lock(poolMutex, std::try_to_lock);
if (lock.owns_lock()) {
startWorker();
}
}
size_t size() const {
return tasks.size();
}
void join() {
isJoin = true;
while (!tasks.empty()) {
start();
toWaitFor.wait();
for (const auto entry : workerMap) {
entry.second->join();
}
}
}
private:
void startWorker() {
std::lock_guard guard(tasksMutex);
while (!tasks.empty() && !idleWorker.empty()) {
try {
const auto toRun = tasks.front();
tasks.pop();
const auto worker = idleWorker.front();
idleWorker.pop();
worker->assignTask(toRun);
} catch (const std::system_error &e) {
LOG_WARNING << "Code " << e.code()
<< " meaning " << e.what() << '\n';
tasks.pop();
toWaitFor.unlock();
}
}
if (isJoin && tasks.empty() && toWaitFor.count() == 0) {
for (const auto entry : workerMap) {
entry.second->stop();
}
}
}
bool isStopping = false;
bool isJoin = false;
std::atomic_size_t workerId;
std::queue<std::shared_ptr<std::function<void()>>> tasks;
MultiSemaphore toWaitFor;
std::mutex poolMutex;
std::mutex tasksMutex;
size_t workerCount;
std::queue<std::shared_ptr<Worker>> idleWorker;
std::unordered_map<size_t, std::shared_ptr<Worker>> workerMap;
};