-
Notifications
You must be signed in to change notification settings - Fork 1
/
threadPool.h
158 lines (136 loc) · 2.86 KB
/
threadPool.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
#include <iostream>
#include <string>
#include <queue>
#include <stdio.h>
#ifdef _WIN32
#include <windows.h>
#else
#include <unistd.h>
#endif
#include <pthread.h>
#include <stdlib.h>
// NOTE(review): this using-directive is at file scope, so every file that
// includes this one also gets all of namespace std in unqualified lookup.
using namespace std;
// Global trace switch: when true, the queue and worker code in this file
// prints progress messages to cout. Defaults to off.
// NOTE(review): this is a definition, not a declaration; if this file is a
// header included from more than one translation unit, confirm it does not
// produce duplicate-symbol link errors.
bool DEBUG = false;
/**
 * Abstract unit of work executed by a worker thread.
 *
 * Concrete tasks derive from Task and implement run().
 */
class Task{
    public:
        // Virtual destructor: tasks are handled through Task* pointers, so
        // deleting one through the base pointer must run the derived
        // destructor (without this, such a delete is undefined behaviour).
        virtual ~Task() {}

        // Performs the task's work; called once by the worker that dequeued it.
        virtual void run()=0;
};
class TaskQ{
private:
std::queue<Task*> vTask;
pthread_mutex_t task_mutex;
pthread_cond_t cond_push;
public:
bool shouldStop;
TaskQ(){
shouldStop=false;
pthread_mutex_init(&task_mutex,0);
pthread_cond_init(&cond_push, 0);
}
void pushTask(Task* task)
{
if (!shouldStop) {
pthread_mutex_lock(&task_mutex);
vTask.push(task);
pthread_cond_signal(&cond_push);
pthread_mutex_unlock(&task_mutex);
}
}
Task* getTask() {
if (DEBUG)
cout<<"trying to get task , is stop ?"<< shouldStop<< "Q size: "<<vTask.size() <<endl;
pthread_mutex_lock(&task_mutex);
if ((shouldStop == true) && (vTask.size() == 0))
{
if (DEBUG)
cout<<"lets exit!!"<<endl;
pthread_mutex_unlock(&task_mutex);
return NULL;
}
else{
Task* task = NULL;
if (vTask.size() ==0){
if (DEBUG)
cout<<"On wait"<<endl;
pthread_cond_wait(&cond_push, &task_mutex);
}
//pop only if queue has something
if (vTask.size() !=0 ){
task = vTask.front();
vTask.pop();
}
pthread_mutex_unlock(&task_mutex);
return task;
}
}
void wait()
{
while(vTask.size() !=0){
sleep(1);
}
}
void finish(int numThreads)
{
shouldStop=true;
pthread_mutex_unlock(&task_mutex);
if (DEBUG)
cout<<"took lock "<<endl;
int rc = pthread_cond_broadcast(&cond_push);
if (DEBUG)
cout<<"signaled : "<<rc<<endl;
// pthread_mutex_unlock(&task_mutex);
}
};
void* assignTask(void * param){
// get the tasks from task q
if (DEBUG)
cout<<"Assign task"<<endl;
TaskQ* Q = (TaskQ*)param;
Task* t = NULL;
while((t = Q->getTask()) && t !=NULL){
if (DEBUG)
cout<<"Got task"<<endl;
if (t == NULL)
break;
t->run();
}
if (DEBUG)
cout<<"Done"<<endl;
}
class ThreadPool{
private:
unsigned int numThr;
pthread_t* workerThreads;
TaskQ* taskQ;
public:
ThreadPool(unsigned int numThreads):numThr(numThreads)
{
//ceate a array of threads
workerThreads = new pthread_t[numThr];
//create a task Q
taskQ = new TaskQ();
// assign a global function
for (int i=0; i< numThr; ++i) {
// cout<<"DEBUG: "<<"Creating thread"<<endl;
pthread_create(&(workerThreads[i]),0,assignTask,taskQ);
}
}
~ThreadPool()
{
wait();
for (int i=0; i< numThr; ++i){
pthread_join(workerThreads[i],0);
}
}
void addTask(Task* t)
{
taskQ->pushTask(t);
}
void wait(){
taskQ->wait();
}
void stop(){
//check if there are
taskQ->finish(numThr);
}
};