Skip to content

Commit

Permalink
Merge pull request #8 from LIHPC-Computational-Geometry/cpu100_fan2
Browse files Browse the repository at this point in the history
Cpu100 fan2
  • Loading branch information
CharlesPignerol authored Feb 16, 2024
2 parents c23396f + a74a07b commit 59a96e3
Show file tree
Hide file tree
Showing 6 changed files with 108 additions and 156 deletions.
2 changes: 1 addition & 1 deletion cmake/version.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#

set (TK_UTIL_MAJOR_VERSION "6")
set (TK_UTIL_MINOR_VERSION "7")
set (TK_UTIL_MINOR_VERSION "8")
set (TK_UTIL_RELEASE_VERSION "0")
set (TK_UTIL_VERSION ${TK_UTIL_MAJOR_VERSION}.${TK_UTIL_MINOR_VERSION}.${TK_UTIL_RELEASE_VERSION})

Expand Down
158 changes: 56 additions & 102 deletions src/TkUtil/ThreadPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,7 @@ void ThreadPool::TaskIfc::setStatus (STATUS status, const UTF8String& msg)

ThreadPool::WorkerThread::WorkerThread ( )
: _thread (0), _task (0), _haltMutex ( ), _halted (false), _completed (false)
{ // Rem : les bouléens _halted et _barrier ne sont pas initialisés ici pour éviter des accès concurrents détectés par intel inspector.
// Ils seront initialisés dans execute.
{
} // WorkerThread::WorkerThread


Expand Down Expand Up @@ -125,17 +124,7 @@ void ThreadPool::WorkerThread::execute ( )
} // if (true == _halted)
}

if (0 == _task)
{
//cout << "Worker " << (unsigned long)this << " WAITS " << yieldDelay ( ) << " nanoseconds ..." << endl;
const size_t delay = ThreadPool::yieldDelay ( );
if (0 == delay)
this_thread::yield ( );
else
this_thread::sleep_for (std::chrono::nanoseconds (delay)); // v 6.7.0
//cout << "Worker " << (unsigned long)this << " HAS WAITED." << endl;
} // if (0 == _task)
else
if (0 != _task)
{
try
{
Expand Down Expand Up @@ -179,16 +168,25 @@ void ThreadPool::WorkerThread::execute ( )

// Informer le gestionnaire de l'accomplissement de la tache :
ThreadPool::instance ( ).taskCompleted (*task);
} // while (false == _halted)
} // if (0 != _task)

{
unique_lock<mutex> haltLock (_haltMutex);
halted = _halted;
}
} // else if (0 == _task)

unique_lock<mutex> completedLock (_completedMutex);
_completed = true;

{ // v 6.8.0. La tache est considérée achevée :
unique_lock<mutex> completedLock (_completedMutex);
_completed = true;
}

// Attente d'une nouvelle tache à exécuter :
if (false == halted)
{ // v 6.8.0
unique_lock<mutex> sleepingLock (ThreadPool::instance ( ).getWakeUpCondMutex ( ));
ThreadPool::instance ( ).getWakeUpCondition ( ).wait (sleepingLock);
} // if (false == halted)
} // while (false == _halted)
} // WorkerThread::execute


Expand Down Expand Up @@ -229,12 +227,13 @@ void ThreadPool::WorkerThread::join ( )

ThreadPool* ThreadPool::_instance = 0;
bool ThreadPool::_completed = true; // !running
size_t ThreadPool::_yieldDelay = 100000; // v 6.7.0 - 1 milliseconde


ThreadPool::ThreadPool (size_t tasksNum)
: _thread (0), _tasksNum (tasksNum),
: _thread (0), _tasksNum (tasksNum), _halted (false), _barrier (false),
_queuedTasks ( ), _runningTasks ( ), _deadTasks ( ), _workerThreads ( ),
_tasksMutex ( ), _tasksCond ( ), _wakeUpCondMutex ( ), _barrierCondMutex ( )
_tasksMutex ( ), _tasksCond ( ), _wakeUpCondMutex ( ), _wakeUpCond ( ),
_barrierCondMutex ( ), _barrierCond ( ), _joinCond ( )
{ // Rem : les bouléens _halted et _barrier ne sont pas initialisés ici pour éviter des accès concurrents détectés par intel inspector.
// Ils seront initialisés dans init.
} // ThreadPool::ThreadPool
Expand Down Expand Up @@ -267,23 +266,6 @@ void ThreadPool::initialize (size_t tasksNum)
if (0 != _instance)
throw Exception (UTF8String ("ThreadPool::initialize : API déjà initialisée.", charset));

#ifdef __INTEL_COMPILER
if (__INTEL_COMPILER < 1500)
{
ConsoleOutput::cerr ( )
<< "++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++"
<< co_endl
<< "ATTENTION : LE MULTITHREADING AVEC L'UTILISATION DE L'API "
<< "ThreadPool RISQUE DE PROVOQUER DES PLANTAGES." << co_endl
<< "Cette API ThreadPool est incompatible avec les versions du "
<< "compilateur Intel antérieures à la version 15.0 (version "
<< "courante : " << (unsigned long)__INTEL_COMPILER << ")."
<< co_endl
<< "++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++"
<< co_endl;
} // if (__INTEL_COMPILER < 1500)
#endif // __INTEL_COMPILER

_instance = new ThreadPool (0 == tasksNum ? MachineData::instance ( ).getProcessorsNum ( ) : tasksNum);
CHECK_NULL_PTR_ERROR (_instance)

Expand Down Expand Up @@ -317,18 +299,6 @@ ThreadPool& ThreadPool::instance ( )
} // ThreadPool::instance


size_t ThreadPool::yieldDelay ( ) // v 6.7.0
{
return _yieldDelay;
} // ThreadPool::yieldDelay


void ThreadPool::setYieldDelay (size_t delay) // v 6.7.0
{
_yieldDelay = delay;
} // ThreadPool::setYieldDelay


void ThreadPool::stop ( )
{
unique_lock<mutex> haltLock (_haltMutex);
Expand All @@ -338,35 +308,17 @@ void ThreadPool::stop ( )

void ThreadPool::stopWorkers ( )
{
// On demande l'arrêt des travailleurs. */
// On demande l'arrêt des travailleurs.
for (vector<WorkerThread*>::iterator itw1 = _workerThreads.begin ( ); _workerThreads.end ( ) != itw1; itw1++)
(*itw1)->stop ( );

// On attend l'arrêt des travailleurs. */
// On réveille les travailleurs s'ils étaient au chômage pour qu'ils
// puissent être joignables :
// On attend l'arrêt des travailleurs.
// On réveille les travailleurs s'ils étaient au chômage pour qu'ils puissent être joignables :
{
unique_lock<mutex> wakeUpCondLock (_wakeUpCondMutex);
_wakeUpCond.notify_all ( );
}

bool completed = false;
while (false == completed)
{
// Pour une raison inconnue le réveil ci-dessus ne fonctionne pas toujours bien, on remet donc ici une couche tant que tous les threads
// ne sont pas achevés.
// 12/2017 - gcc 4.4.6
{
unique_lock<mutex> wakeUpCondLock (_wakeUpCondMutex);
_wakeUpCond.notify_all ( );
}
this_thread::yield ( );
completed = true;
for (vector<WorkerThread*>::iterator itw3 = _workerThreads.begin ( ); _workerThreads.end ( ) != itw3; itw3++)
if (false == (*itw3)->completed ( ))
completed = false;
} // while (false == completed)

for (vector<WorkerThread*>::iterator itw2 = _workerThreads.begin ( ); _workerThreads.end ( ) != itw2; itw2++)
(*itw2)->join ( );
} // ThreadPool::stopWorkers
Expand Down Expand Up @@ -428,10 +380,9 @@ void ThreadPool::barrier ( )
_barrier = true;
}
unique_lock<mutex> barrierCondLock (_barrierCondMutex);
// Patch du 09/04/18 : Faut il se mettre en situation d'attente ?
// ATTENTION : on peut avoir _queuedTasks et _runningTasks vides.
unique_lock<mutex> tasksLock (_tasksMutex); // 09/04/18
if ((0 != _queuedTasks.size( )) || (0 != _runningTasks.size( )))// 09/04/18
unique_lock<mutex> tasksLock (_tasksMutex);
if ((0 != _queuedTasks.size( )) || (0 != _runningTasks.size( )))
{
tasksLock.unlock ( ); // 09/04/18
_barrierCond.wait (barrierCondLock);
Expand Down Expand Up @@ -488,8 +439,12 @@ void ThreadPool::execute ( )
unique_lock<mutex> completedLock (_tasksMutex);
_completed = false;
}

// On met les travailleurs en marche. */
{ // v 6.8.0
unique_lock<mutex> haltLock (_barrierMutex);
_barrier = false;
}

// On met les travailleurs en marche.
for (vector<WorkerThread*>::iterator itw1 = _workerThreads.begin ( ); _workerThreads.end ( ) != itw1; itw1++)
(*itw1)->start ( );

Expand All @@ -498,17 +453,25 @@ void ThreadPool::execute ( )
{
deleteDeadTasks ( );

//cout << "Worker " << (unsigned long)this << " WAITS " << yieldDelay ( ) << " nanoseconds ..." << endl;
if (0 == _yieldDelay)
this_thread::yield ( );
else
this_thread::sleep_for (std::chrono::nanoseconds (_yieldDelay)); // v 6.7.0
//cout << "Worker " << (unsigned long)this << " HAS WAITED." << endl;

// Si nécessaire, relancer des taches :
{ // v 6.8.0
unique_lock<mutex> runLock (_tasksMutex);
const size_t queued = _queuedTasks.size ( );
if (0 != queued)
{
unique_lock<mutex> wakeUpLock (_wakeUpCondMutex);
if (1 == queued)
_wakeUpCond.notify_one ( );
else
_wakeUpCond.notify_all ( );
} // if (0 != queued)
}

checkBarrier ( );

{ // Mise en sommeil si absence de travail :
unique_lock<mutex> sleepLock (_tasksMutex);

// Pas de mutex sur _barrierCondMutex : volontaire
if ((false == _barrier) && (0 == _queuedTasks.size ( )) && (0 == _runningTasks.size ( )))
_tasksCond.wait (sleepLock);
Expand All @@ -520,11 +483,12 @@ void ThreadPool::execute ( )
}
} // while (false == halted)

// stopWorkers ( );
{
unique_lock<mutex> completedLock (_tasksMutex);
_completed = true;
}

_joinCond.notify_one ( ); // v 6.8.0
} // ThreadPool::execute


Expand All @@ -549,7 +513,6 @@ void ThreadPool::taskCompleted (ThreadPool::TaskIfc& task)
// On réveille les travailleurs s'ils étaient au chômage. Peut être y a t'il maintenant une tache pouvant être lancée en concurrence avec celles
// actives :
unique_lock<mutex> wakeUpCondLock (_wakeUpCondMutex);
// _wakeUpCond.notify_all ( );
_wakeUpCond.notify_one ( ); // v 6.7.0
} // ThreadPool::taskCompleted

Expand All @@ -575,7 +538,6 @@ void ThreadPool::deleteWorkers ( )
unique_lock<mutex> tasksLock (_tasksMutex);

vector<WorkerThread*> workers = _workerThreads;
int step = 0;
while (false == _workerThreads.empty ( ))
{
for (vector<WorkerThread*>::iterator it = _workerThreads.begin ( ); _workerThreads.end ( ) != it; it++)
Expand Down Expand Up @@ -639,21 +601,19 @@ void ThreadPool::join ( )
_barrier = true;
}

// Idem ThreadPool::stopWorkers : pour une raison inconnue le réveil ne fonctionne pas forcément du premier coup, on remet donc ici plusieurs couches.
// 12/2017 - gcc 4.4.6
// for (int i = 0; i < 10; i++)
// Réveiller a minima l'instance de cette classe qui est éventuellement en sommeil dans la fonction execute ( ). Pour ce
// on réveille les travailleurs afin qu'ils soient eux-mêmes joignables :
{
unique_lock<mutex> sleepLock (_tasksMutex);
_tasksCond.notify_all ( );
this_thread::yield ( );
}
bool completed = false;
while (false == completed)
{
unique_lock<mutex> completionLock (_tasksMutex);
completed = _completed;
this_thread::yield ( );
} // while (false == completed)

// v 6.8.0. Attendre si nécessaire la fin d'execute ( ) :
std::mutex joinMutex;
unique_lock<mutex> joinLock (joinMutex);
_joinCond.wait (joinLock);

_thread->join ( );
} // ThreadPool::join

Expand Down Expand Up @@ -685,12 +645,6 @@ ThreadPool::TaskIfc* ThreadPool::getTask ( )
} // if (0 != _queuedTasks.size ( ))
}

/* if (0 == task) v 6.7.0 : appel à ThreadPool::getTask ( ) non bloquant
{
unique_lock<mutex> wakeUpCondLock (_wakeUpCondMutex);
_wakeUpCond.wait (wakeUpCondLock);
} */

return task;
} // ThreadPool::getTask

Expand Down
Loading

0 comments on commit 59a96e3

Please sign in to comment.