diff --git a/CMakeLists.txt b/CMakeLists.txt index 5200bf33..5f0deb46 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -95,6 +95,9 @@ if( ENKITS_BUILD_EXAMPLES ) add_executable( TestWaitforTask example/TestWaitforTask.cpp ) target_link_libraries(TestWaitforTask enkiTS ) + add_executable( ExternalTaskThread example/ExternalTaskThread.cpp ) + target_link_libraries(ExternalTaskThread enkiTS ) + if( ENKITS_BUILD_C_INTERFACE ) add_executable( ParallelSum_c example/ParallelSum_c.c ) target_link_libraries(ParallelSum_c enkiTS ) @@ -106,5 +109,9 @@ if( ENKITS_BUILD_C_INTERFACE ) add_executable( Priorities_c example/Priorities_c.c ) target_link_libraries(Priorities_c enkiTS ) endif() + + add_executable( ExternalTaskThread_c example/ExternalTaskThread_c.c ) + target_link_libraries(ExternalTaskThread_c enkiTS ) + endif() endif() diff --git a/README.md b/README.md index f806861e..248d0be8 100644 --- a/README.md +++ b/README.md @@ -45,10 +45,13 @@ For cmake, on Windows / Mac OS X / Linux with cmake installed, open a prompt in 1. *Up-front Allocation friendly* - enkiTS is designed for zero allocations during scheduling. 1. *Can pin tasks to a given thread* - enkiTS can schedule a task which will only be run on the specified thread. 1. *Can set task priorities* - Up to 5 task priorities can be configured via define ENKITS_TASK_PRIORITIES_NUM (defaults to 3). Higher priority tasks are run before lower priority ones. +1. **NEW** *Can register external threads to use with enkiTS* - Can configure enkiTS with numExternalTaskThreads which can be registered to use with the enkiTS API. 
## Usage C++ usage: +- full example in [example/ParallelSum.cpp](example/ParallelSum.cpp) +- C example in [example/ParallelSum_c.c](example/ParallelSum_c.c) ```C #include "TaskScheduler.h" @@ -56,23 +59,25 @@ enki::TaskScheduler g_TS; // define a task set, can ignore range if we only do one thing struct ParallelTaskSet : enki::ITaskSet { - virtual void ExecuteRange( enki::TaskSetPartition range, uint32_t threadnum ) { - // do something here, can issue tasks with g_TS - } + virtual void ExecuteRange( enki::TaskSetPartition range, uint32_t threadnum ) { + // do something here, can issue tasks with g_TS + } }; int main(int argc, const char * argv[]) { - g_TS.Initialize(); - ParallelTaskSet task; // default constructor has a set size of 1 - g_TS.AddTaskSetToPipe( &task ); + g_TS.Initialize(); + ParallelTaskSet task; // default constructor has a set size of 1 + g_TS.AddTaskSetToPipe( &task ); - // wait for task set (running tasks if they exist) - since we've just added it and it has no range we'll likely run it. - g_TS.WaitforTask( &task ); - return 0; + // wait for task set (running tasks if they exist) + // since we've just added it and it has no range we'll likely run it. + g_TS.WaitforTask( &task ); + return 0; } ``` C++ 11 lambda usage: +- full example in [example/LambdaTask.cpp](example/LambdaTask.cpp) ```C #include "TaskScheduler.h" @@ -91,7 +96,9 @@ int main(int argc, const char * argv[]) { } ``` -Task priorities usage in C++ (see example/Priorities_c.c for C example). +Task priorities usage in C++: +- full example in [example/Priorities.cpp](example/Priorities.cpp) +- C example in [example/Priorities_c.c](example/Priorities_c.c) ```C // See full example in Priorities.cpp #include "TaskScheduler.h" @@ -136,7 +143,9 @@ int main(int argc, const char * argv[]) } ``` -Pinned Tasks usage in C++ (see example/PinnedTask_c.c for C example). 
+Pinned Tasks usage in C++: +- full example in [example/PinnedTask.cpp](example/PinnedTask.cpp) +- C example in [example/PinnedTask_c.c](example/PinnedTask_c.c) ```C #include "TaskScheduler.h" @@ -146,23 +155,68 @@ enki::TaskScheduler g_TS; struct PinnedTask : enki::IPinnedTask { virtual void Execute() { // do something here, can issue tasks with g_TS - } + } }; int main(int argc, const char * argv[]) { - g_TS.Initialize(); - PinnedTask task; //default constructor sets thread for pinned task to 0 (main thread) - g_TS.AddPinnedTask( &task ); - - // RunPinnedTasks must be called on main thread to run any pinned tasks for that thread. - // Tasking threads automatically do this in their task loop. - g_TS.RunPinnedTasks(); - // wait for task set (running tasks if they exist) - since we've just added it and it has no range we'll likely run it. - g_TS.WaitforTask( &task ); - return 0; + g_TS.Initialize(); + PinnedTask task; //default constructor sets thread for pinned task to 0 (main thread) + g_TS.AddPinnedTask( &task ); + + // RunPinnedTasks must be called on main thread to run any pinned tasks for that thread. + // Tasking threads automatically do this in their task loop. + g_TS.RunPinnedTasks(); + + // wait for task set (running tasks if they exist) + // since we've just added it and it has no range we'll likely run it. 
+ g_TS.WaitforTask( &task ); + return 0; } ``` +External thread usage in C++: +- full example in [example/ExternalTaskThread.cpp](example/ExternalTaskThread.cpp) +- C example in [example/ExternalTaskThread_c.c](example/ExternalTaskThread_c.c) +```C +#include "TaskScheduler.h" + +enki::TaskScheduler g_TS; +struct ParallelTaskSet : enki::ITaskSet +{ + virtual void ExecuteRange( enki::TaskSetPartition range, uint32_t threadnum ) + { + // Do something + } +}; + +void threadFunction() +{ + g_TS.RegisterExternalTaskThread(); + + // sleep for a while instead of doing something such as file IO + std::this_thread::sleep_for( std::chrono::milliseconds( 100 ) ); + + ParallelTaskSet task; + g_TS.AddTaskSetToPipe( &task ); + g_TS.WaitforTask( &task); + + g_TS.DeRegisterExternalTaskThread(); +} + +int main(int argc, const char * argv[]) +{ + enki::TaskSchedulerConfig config; + config.numExternalTaskThreads = 1; // we have one extra external thread + + g_TS.Initialize( config ); + + std::thread exampleThread( threadFunction ); + + exampleThread.join(); + + return 0; +} +``` C usage: ```C @@ -178,13 +232,14 @@ int main(int argc, const char * argv[]) { enkiTaskSet* pTask; g_pTS = enkiNewTaskScheduler(); enkiInitTaskScheduler( g_pTS ); - + // create a task, can re-use this to get allocation occurring on startup - pTask = enkiCreateTaskSet( g_pTS, ParalleTaskSetFunc ); + pTask = enkiCreateTaskSet( g_pTS, ParalleTaskSetFunc ); enkiAddTaskSetToPipe( g_pTS, pTask, NULL, 1); // NULL args, setsize of 1 - // wait for task set (running tasks if they exist) - since we've just added it and it has no range we'll likely run it. + // wait for task set (running tasks if they exist) + // since we've just added it and it has no range we'll likely run it. 
enkiWaitForTaskSet( g_pTS, pTask ); enkiDeleteTaskSet( pTask ); @@ -195,6 +250,7 @@ int main(int argc, const char * argv[]) { } ``` + ## Bindings - C# [EnkiTasks C#](https://github.com/nxrighthere/EnkiTasks-CSharp) diff --git a/example/ExternalTaskThread.cpp b/example/ExternalTaskThread.cpp new file mode 100644 index 00000000..3e9e124f --- /dev/null +++ b/example/ExternalTaskThread.cpp @@ -0,0 +1,95 @@ +// Copyright (c) 2019 Doug Binks +// +// This software is provided 'as-is', without any express or implied +// warranty. In no event will the authors be held liable for any damages +// arising from the use of this software. +// +// Permission is granted to anyone to use this software for any purpose, +// including commercial applications, and to alter it and redistribute it +// freely, subject to the following restrictions: +// +// 1. The origin of this software must not be misrepresented; you must not +// claim that you wrote the original software. If you use this software +// in a product, an acknowledgement in the product documentation would be +// appreciated but is not required. +// 2. Altered source versions must be plainly marked as such, and must not be +// misrepresented as being the original software. +// 3. This notice may not be removed or altered from any source distribution. 
+ +#include "TaskScheduler.h" + +#include +#include + +using namespace enki; + +TaskScheduler g_TS; +static std::atomic g_Run; + + +struct ParallelTaskSet : ITaskSet +{ + ParallelTaskSet() { m_SetSize = 100; } + + virtual void ExecuteRange( TaskSetPartition range_, uint32_t threadnum_ ) + { + printf(" Run %d: This could run on any thread, currently thread %d\n", g_Run.load(), threadnum_); + + // sleep used as a 'pretend' workload + std::chrono::milliseconds sleepTime( range_.end - range_.start ); + std::this_thread::sleep_for( sleepTime ); + } +}; + +// Example thread function +// May want to use threads for blocking IO, during which enkiTS task threads can do work +void threadFunction( uint32_t num_ ) +{ + bool bRegistered = g_TS.RegisterExternalTaskThread(); + assert( bRegistered ); + if( bRegistered ) + { + // sleep for a while instead of doing something such as file IO + std::this_thread::sleep_for( std::chrono::milliseconds( num_ * 100 ) ); + + + ParallelTaskSet task; + g_TS.AddTaskSetToPipe( &task ); + g_TS.WaitforTask( &task); + g_TS.DeRegisterExternalTaskThread(); + } +} + +static const int REPEATS = 5; +static const uint32_t NUMEXTERNALTHREADS = 5; + +int main(int argc, const char * argv[]) +{ + enki::TaskSchedulerConfig config; + config.numExternalTaskThreads = NUMEXTERNALTHREADS; + + std::thread threads[NUMEXTERNALTHREADS]; + g_TS.Initialize( config ); + + + for( g_Run = 0; g_Run< REPEATS; ++g_Run ) + { + printf("Run %d\n", g_Run.load() ); + + for( uint32_t iThread = 0; iThread < NUMEXTERNALTHREADS; ++iThread ) + { + threads[ iThread ] = std::thread( threadFunction, iThread ); + } + + // check that out of order Deregister / Register works... 
+ threads[ 0 ].join(); + threads[ 0 ] = std::thread( threadFunction, 0 ); + + for( uint32_t iThread = 0; iThread < NUMEXTERNALTHREADS; ++iThread ) + { + threads[ iThread ].join(); + } + } + + return 0; +} diff --git a/example/ExternalTaskThread_c.c b/example/ExternalTaskThread_c.c new file mode 100644 index 00000000..11e7fc0a --- /dev/null +++ b/example/ExternalTaskThread_c.c @@ -0,0 +1,122 @@ +// Copyright (c) 2019 Doug Binks +// +// This software is provided 'as-is', without any express or implied +// warranty. In no event will the authors be held liable for any damages +// arising from the use of this software. +// +// Permission is granted to anyone to use this software for any purpose, +// including commercial applications, and to alter it and redistribute it +// freely, subject to the following restrictions: +// +// 1. The origin of this software must not be misrepresented; you must not +// claim that you wrote the original software. If you use this software +// in a product, an acknowledgement in the product documentation would be +// appreciated but is not required. +// 2. Altered source versions must be plainly marked as such, and must not be +// misrepresented as being the original software. +// 3. This notice may not be removed or altered from any source distribution. 
+ +#include "TaskScheduler_c.h" +#include +#include +#include +#include + +// XPLATF Thread handling functions for C +#ifdef _WIN32 + +#define NOMINMAX +#define WIN32_LEAN_AND_MEAN +#include + +typedef HANDLE threadid_t; +#define THREADFUNC_DECL DWORD WINAPI + +// declare the thread start function as: +// THREADFUNC_DECL MyThreadStart( void* pArg ); +int32_t ThreadCreate( threadid_t* returnid, DWORD ( WINAPI *StartFunc) (void* ), void* pArg ) +{ + DWORD threadid; + *returnid = CreateThread( 0, 0, StartFunc, pArg, 0, &threadid ); + return *returnid != NULL; +} + +int32_t ThreadJoin( threadid_t threadid ) +{ + return WaitForSingleObject( threadid, INFINITE ) == 0; +} + +#else // posix +#include +#include + +typedef pthread_t threadid_t; +#define THREADFUNC_DECL void* + +// declare the thread start function as: +// THREADFUNC_DECL MyThreadStart( void* pArg ); +int32_t ThreadCreate( threadid_t* returnid, void* ( *StartFunc) (void* ), void* pArg ) +{ + int32_t retval = pthread_create( returnid, NULL, StartFunc, pArg ); + + return retval == 0; +} + +int32_t ThreadJoin( threadid_t threadid ) +{ + return pthread_join( threadid, NULL ) == 0; +} +#endif + +enkiTaskScheduler* pETS; +enkiTaskSet* pParallelTask; + + +void ParallelFunc( uint32_t start_, uint32_t end, uint32_t threadnum_, void* pArgs_ ) +{ + // do something + printf("ParallelFunc running on thread %d (could be any thread)\n", threadnum_ ); +} + +THREADFUNC_DECL ThreadFunc( void* pArgs_ ) +{ + uint32_t threadNum; + int retVal; + + retVal = enkiRegisterExternalTaskThread( pETS ); + assert( retVal ); + + threadNum = enkiGetThreadNum( pETS ); + assert( threadNum == 1 ); + printf("ThreadFunc running on thread %d (should be thread 1)\n", threadNum ); + + pParallelTask = enkiCreateTaskSet( pETS, ParallelFunc ); + + enkiAddTaskSetToPipe( pETS, pParallelTask, NULL, 1); + enkiWaitForTaskSet( pETS, pParallelTask ); + + enkiDeleteTaskSet( pParallelTask ); + + enkiDeRegisterExternalTaskThread( pETS ); + return 0; +} + +int 
main(int argc, const char * argv[]) +{ + struct enkiTaskSchedulerConfig config; + + pETS = enkiNewTaskScheduler(); + + // get default config and request one external thread + config = enkiGetTaskSchedulerConfig( pETS ); + config.numExternalTaskThreads = 1; + enkiInitTaskSchedulerWithConfig( pETS, config ); + + threadid_t threadID; + ThreadCreate( &threadID, ThreadFunc, NULL ); + + ThreadJoin( threadID ); + + + enkiDeleteTaskScheduler( pETS ); +} \ No newline at end of file diff --git a/example/ParallelSum.cpp b/example/ParallelSum.cpp index 0fcb23ba..19edccd1 100644 --- a/example/ParallelSum.cpp +++ b/example/ParallelSum.cpp @@ -60,15 +60,15 @@ struct ParallelSumTaskSet : ITaskSet memset( m_pPartialSums, 0, sizeof(Count)*m_NumPartialSums ); } - virtual void ExecuteRange( TaskSetPartition range, uint32_t threadnum ) + virtual void ExecuteRange( TaskSetPartition range_, uint32_t threadnum_ ) { assert( m_pPartialSums && m_NumPartialSums ); - uint64_t sum = m_pPartialSums[threadnum].count; - for( uint64_t i = range.start; i < range.end; ++i ) + uint64_t sum = m_pPartialSums[threadnum_].count; + for( uint64_t i = range_.start; i < range_.end; ++i ) { sum += i + 1; } - m_pPartialSums[threadnum].count = sum; + m_pPartialSums[threadnum_].count = sum; } }; @@ -83,7 +83,7 @@ struct ParallelReductionSumTaskSet : ITaskSet m_ParallelSumTaskSet.Init( g_TS.GetNumTaskThreads() ); } - virtual void ExecuteRange( TaskSetPartition range, uint32_t threadnum ) + virtual void ExecuteRange( TaskSetPartition range_, uint32_t threadnum_ ) { g_TS.AddTaskSetToPipe( &m_ParallelSumTaskSet ); g_TS.WaitforTask( &m_ParallelSumTaskSet ); diff --git a/example/PinnedTask.cpp b/example/PinnedTask.cpp index f32d16c5..805a6177 100644 --- a/example/PinnedTask.cpp +++ b/example/PinnedTask.cpp @@ -39,11 +39,11 @@ struct PinnedTaskHelloWorld : IPinnedTask struct ParallelTaskSet : ITaskSet { PinnedTaskHelloWorld pinnedTask; - virtual void ExecuteRange( TaskSetPartition range, uint32_t threadnum ) + virtual 
void ExecuteRange( TaskSetPartition range_, uint32_t threadnum_ ) { g_TS.AddPinnedTask( &pinnedTask ); - printf("This could run on any thread, currently thread %d\n", threadnum); + printf("This could run on any thread, currently thread %d\n", threadnum_); g_TS.WaitforTask( &pinnedTask ); } diff --git a/example/Priorities.cpp b/example/Priorities.cpp index b7866783..2150b67c 100644 --- a/example/Priorities.cpp +++ b/example/Priorities.cpp @@ -27,24 +27,24 @@ struct ExampleTask : enki::ITaskSet { ExampleTask( uint32_t size_ ) { m_SetSize = size_; } - virtual void ExecuteRange( enki::TaskSetPartition range, uint32_t threadnum ) + virtual void ExecuteRange( enki::TaskSetPartition range_, uint32_t threadnum_ ) { if( m_Priority == enki::TASK_PRIORITY_LOW ) { // fake slow task with timer Timer timer; timer.Start(); - double tWaittime = (double)( range.end - range.start ) * 100.; + double tWaittime = (double)( range_.end - range_.start ) * 100.; while( timer.GetTimeMS() < tWaittime ) { } printf( "\tLOW PRIORITY TASK range complete: thread: %d, start: %d, end: %d\n", - threadnum, range.start, range.end ); + threadnum_, range_.start, range_.end ); } else { printf( "HIGH PRIORITY TASK range complete: thread: %d, start: %d, end: %d\n", - threadnum, range.start, range.end ); + threadnum_, range_.start, range_.end ); } } }; diff --git a/example/TaskOverhead.cpp b/example/TaskOverhead.cpp index fd19b077..8660b2e8 100644 --- a/example/TaskOverhead.cpp +++ b/example/TaskOverhead.cpp @@ -62,15 +62,15 @@ struct ParallelSumTaskSet : ITaskSet memset( m_pPartialSums, 0, sizeof(Count)*m_NumPartialSums ); } - virtual void ExecuteRange( TaskSetPartition range, uint32_t threadnum ) + virtual void ExecuteRange( TaskSetPartition range_, uint32_t threadnum_ ) { assert( m_pPartialSums && m_NumPartialSums ); - uint64_t sum = m_pPartialSums[threadnum].count; - for( uint64_t i = range.start; i < range.end; ++i ) + uint64_t sum = m_pPartialSums[threadnum_].count; + for( uint64_t i = range_.start; i 
< range_.end; ++i ) { sum += i + 1; } - m_pPartialSums[threadnum].count = sum; + m_pPartialSums[threadnum_].count = sum; } }; diff --git a/example/TaskThroughput.cpp b/example/TaskThroughput.cpp index f84d9548..39b47c0a 100644 --- a/example/TaskThroughput.cpp +++ b/example/TaskThroughput.cpp @@ -48,9 +48,9 @@ struct ConsumeTask : ITaskSet static Count* pCount; static uint32_t numCount; - virtual void ExecuteRange( TaskSetPartition range, uint32_t threadnum ) + virtual void ExecuteRange( TaskSetPartition range_, uint32_t threadnum_ ) { - ++pCount[threadnum].count; + ++pCount[threadnum_].count; } static void Init() @@ -62,7 +62,7 @@ struct ConsumeTask : ITaskSet } }; -ConsumeTask ConsumeTask::tasks[numTasks]; +ConsumeTask ConsumeTask::tasks[numTasks]; ConsumeTask::Count* ConsumeTask::pCount = NULL; uint32_t ConsumeTask::numCount = 0; @@ -74,9 +74,9 @@ struct CreateTasks : ITaskSet { m_SetSize = numTasks; } - virtual void ExecuteRange( TaskSetPartition range, uint32_t threadnum ) + virtual void ExecuteRange( TaskSetPartition range_, uint32_t threadnum_ ) { - for( uint32_t i=range.start; i g_WaitCount(0); struct SlowTask : enki::ITaskSet { - virtual void ExecuteRange( enki::TaskSetPartition range, uint32_t threadnum ) + virtual void ExecuteRange( enki::TaskSetPartition range_, uint32_t threadnum_ ) { // fake slow task with timer Timer timer; @@ -44,7 +44,7 @@ struct SlowTask : enki::ITaskSet struct WaitingTask : enki::ITaskSet { - virtual void ExecuteRange( enki::TaskSetPartition range, uint32_t threadnum ) + virtual void ExecuteRange( enki::TaskSetPartition range_, uint32_t threadnum_ ) { int numWaitTasks = maxWaitasks - depth; for( int t = 0; t < numWaitTasks; ++t ) @@ -70,7 +70,8 @@ struct WaitingTask : enki::ITaskSet ++g_WaitCount; g_TS.WaitforTask( pWaitingTasks[t] ); } - printf( "\tIteration %d: WaitingTask depth %d complete: thread: %d\n\t\tWaits: %d blocking waits: %d\n", g_Iteration, depth, threadnum, g_WaitCount.load(), g_WaitForTaskCompletion.load() ); + 
printf( "\tIteration %d: WaitingTask depth %d complete: thread: %d\n\t\tWaits: %d blocking waits: %d\n", + g_Iteration, depth, threadnum_, g_WaitCount.load(), g_WaitForTaskCompletion.load() ); } virtual ~WaitingTask() @@ -92,9 +93,10 @@ struct WaitingTask : enki::ITaskSet // which must complete as early as possible using priorities. int main(int argc, const char * argv[]) { - g_TS.GetProfilerCallbacks()->waitForTaskCompleteStart = []( uint32_t threadnum_ ) { ++g_WaitCount; }; - g_TS.GetProfilerCallbacks()->waitForTaskCompleteSuspendStart = []( uint32_t threadnum_ ) { ++g_WaitForTaskCompletion; }; - g_TS.Initialize(); + enki::TaskSchedulerConfig config; + config.profilerCallbacks.waitForTaskCompleteStart = []( uint32_t threadnum_ ) { ++g_WaitCount; }; + config.profilerCallbacks.waitForTaskCompleteSuspendStart = []( uint32_t threadnum_ ) { ++g_WaitForTaskCompletion; }; + g_TS.Initialize( config ); for( g_Iteration = 0; g_Iteration < 1000; ++g_Iteration ) { WaitingTask taskRoot; diff --git a/src/TaskScheduler.cpp b/src/TaskScheduler.cpp index c3448087..3ab84411 100644 --- a/src/TaskScheduler.cpp +++ b/src/TaskScheduler.cpp @@ -36,6 +36,7 @@ static const uint32_t PIPESIZE_LOG2 = 8; static const uint32_t SPIN_COUNT = 10; static const uint32_t SPIN_BACKOFF_MULTIPLIER = 100; static const uint32_t MAX_NUM_INITIAL_PARTITIONS = 8; +static const uint32_t CACHE_LINE_SIZE = 64; // awaiting std::hardware_constructive_interference_size // thread_local not well supported yet by C++11 compilers. 
#ifdef _MSC_VER @@ -62,21 +63,29 @@ namespace enki // we derive class TaskPipe rather than typedef to get forward declaration working easily class TaskPipe : public LockLessMultiReadPipe {}; - enum ThreadState : uint8_t + enum ThreadState : int32_t { THREAD_STATE_RUNNING, - THREAD_STATE_WAIT_NEW_TASKS, THREAD_STATE_WAIT_TASK_COMPLETION, + THREAD_STATE_EXTERNAL_REGISTERED, + THREAD_STATE_EXTERNAL_UNREGISTERED, + THREAD_STATE_WAIT_NEW_TASKS, THREAD_STATE_STOPPED, }; struct ThreadArgs { - uint32_t threadNum; - TaskScheduler* pTaskScheduler; - ThreadState threadState; + uint32_t threadNum; + TaskScheduler* pTaskScheduler; }; + struct ThreadDataStore + { + std::atomic threadState; + char prevent_false_Share[ CACHE_LINE_SIZE - sizeof(std::atomic) ]; + }; + static_assert( sizeof( ThreadDataStore ) >= CACHE_LINE_SIZE, "ThreadDataStore may exhibit false sharing" ); + class PinnedTaskList : public LocklessMultiWriteIntrusiveList {}; semaphoreid_t* SemaphoreCreate(); @@ -87,7 +96,7 @@ namespace enki namespace { - SubTaskSet SplitTask( SubTaskSet& subTask_, uint32_t rangeToSplit_ ) + SubTaskSet SplitTask( SubTaskSet& subTask_, uint32_t rangeToSplit_ ) { SubTaskSet splitTask = subTask_; uint32_t rangeLeft = subTask_.partition.end - subTask_.partition.start; @@ -123,17 +132,47 @@ namespace #endif } -static void SafeCallback(ProfilerCallbackFunc func_, uint32_t threadnum_) +static void SafeCallback( ProfilerCallbackFunc func_, uint32_t threadnum_ ) { if( func_ != nullptr ) { - func_(threadnum_); + func_( threadnum_ ); } } -ProfilerCallbacks* TaskScheduler::GetProfilerCallbacks() +ENKITS_API bool enki::TaskScheduler::RegisterExternalTaskThread() { - return &m_ProfilerCallbacks; + bool bRegistered = false; + while( !bRegistered && m_NumExternalTaskThreadsRegistered < (int32_t)m_Config.numExternalTaskThreads ) + { + for(uint32_t thread = 1; thread <= m_Config.numExternalTaskThreads; ++thread ) + { + // ignore our thread + ThreadState threadStateExpected = 
THREAD_STATE_EXTERNAL_UNREGISTERED; + if( m_pThreadDataStore[thread].threadState.compare_exchange_strong( + threadStateExpected, THREAD_STATE_EXTERNAL_REGISTERED ) ) + { + ++m_NumExternalTaskThreadsRegistered; + gtl_threadNum = thread; + bRegistered = true; + break; + } + } + } + return bRegistered; +} + +ENKITS_API void enki::TaskScheduler::DeRegisterExternalTaskThread() +{ + assert( gtl_threadNum ); + --m_NumExternalTaskThreadsRegistered; + m_pThreadDataStore[gtl_threadNum].threadState.store( THREAD_STATE_EXTERNAL_UNREGISTERED, std::memory_order_release ); + gtl_threadNum = 0; +} + +ENKITS_API uint32_t enki::TaskScheduler::GetNumRegisteredExternalTaskThreads() +{ + return m_NumExternalTaskThreadsRegistered; } @@ -143,7 +182,7 @@ void TaskScheduler::TaskingThreadFunction( const ThreadArgs& args_ ) TaskScheduler* pTS = args_.pTaskScheduler; gtl_threadNum = threadNum; - SafeCallback( pTS->m_ProfilerCallbacks.threadStart, threadNum ); + SafeCallback( pTS->m_Config.profilerCallbacks.threadStart, threadNum ); uint32_t spinCount = 0; uint32_t hintPipeToCheck_io = threadNum + 1; // does not need to be clamped. 
@@ -170,9 +209,9 @@ void TaskScheduler::TaskingThreadFunction( const ThreadArgs& args_ ) } } - pTS->m_NumThreadsRunning.fetch_sub( 1, std::memory_order_release ); - SafeCallback( pTS->m_ProfilerCallbacks.threadStop, threadNum ); - pTS->m_pThreadArgStore[threadNum].threadState = THREAD_STATE_STOPPED; + pTS->m_NumInternalTaskThreadsRunning.fetch_sub( 1, std::memory_order_release ); + SafeCallback( pTS->m_Config.profilerCallbacks.threadStop, threadNum ); + pTS->m_pThreadDataStore[threadNum].threadState.store( THREAD_STATE_STOPPED, std::memory_order_release ); return; } @@ -185,6 +224,8 @@ void TaskScheduler::StartThreads() return; } + m_NumThreads = m_Config.numTaskThreadsToCreate + m_Config.numExternalTaskThreads + 1; + for( int priority = 0; priority < TASK_PRIORITY_NUM; ++priority ) { m_pPipesPerThread[ priority ] = new TaskPipe[ m_NumThreads ]; @@ -195,23 +236,25 @@ void TaskScheduler::StartThreads() m_pTaskCompleteSemaphore = SemaphoreCreate(); // we create one less thread than m_NumThreads as the main thread counts as one - m_pThreadArgStore = new ThreadArgs[m_NumThreads]; + m_pThreadDataStore = new ThreadDataStore[m_NumThreads]; m_pThreads = new std::thread*[m_NumThreads]; - m_pThreadArgStore[0].threadNum = 0; - m_pThreadArgStore[0].pTaskScheduler = this; - m_pThreadArgStore[0].threadState = THREAD_STATE_RUNNING; - m_NumThreadsRunning = 1; // account for main thread m_bRunning = 1; - for( uint32_t thread = 1; thread < m_NumThreads; ++thread ) + for( uint32_t thread = 0; thread < m_Config.numExternalTaskThreads + 1; ++thread ) + { + m_pThreadDataStore[thread].threadState = THREAD_STATE_EXTERNAL_UNREGISTERED; + m_pThreads[thread] = nullptr; + } + for( uint32_t thread = m_Config.numExternalTaskThreads + 1; thread < m_NumThreads; ++thread ) { - m_pThreadArgStore[thread].threadNum = thread; - m_pThreadArgStore[thread].pTaskScheduler = this; - m_pThreadArgStore[thread].threadState = THREAD_STATE_RUNNING; - m_pThreads[thread] = new std::thread( TaskingThreadFunction, 
m_pThreadArgStore[thread] ); - ++m_NumThreadsRunning; + m_pThreadDataStore[thread].threadState = THREAD_STATE_RUNNING; + m_pThreads[thread] = new std::thread( TaskingThreadFunction, ThreadArgs{ thread, this } ); + ++m_NumInternalTaskThreadsRunning; } + // Thread 0 is intialize thread and registered + m_pThreadDataStore[0].threadState = THREAD_STATE_EXTERNAL_REGISTERED; + // ensure we have sufficient tasks to equally fill either all threads including main // or just the threads we've launched, this is outside the firstinit as we want to be able // to runtime change it @@ -222,8 +265,13 @@ void TaskScheduler::StartThreads() } else { - m_NumPartitions = m_NumThreads * (m_NumThreads - 1); - m_NumInitialPartitions = m_NumThreads - 1; + // There could be more threads than hardware threads if external threads are + // being intended for blocking functionality such as io etc. + // We only need to partition for a maximum of the available processor parallelism. + uint32_t numThreadsToPartitionFor = m_NumThreads; + numThreadsToPartitionFor = std::min( m_NumThreads, GetNumHardwareThreads() ); + m_NumPartitions = numThreadsToPartitionFor * (numThreadsToPartitionFor - 1); + m_NumInitialPartitions = numThreadsToPartitionFor - 1; if( m_NumInitialPartitions > MAX_NUM_INITIAL_PARTITIONS ) { m_NumInitialPartitions = MAX_NUM_INITIAL_PARTITIONS; @@ -239,22 +287,24 @@ void TaskScheduler::StopThreads( bool bWait_ ) { // wait for them threads quit before deleting data m_bRunning = 0; - while( bWait_ && m_NumThreadsRunning > 1) + while( bWait_ && m_NumInternalTaskThreadsRunning ) { // keep firing event to ensure all threads pick up state of m_bRunning WakeThreadsForNewTasks(); } - for( uint32_t thread = 1; thread < m_NumThreads; ++thread ) + // detach threads starting with thread 1 (as 0 is initialization thread). 
+ for( uint32_t thread = m_Config.numExternalTaskThreads + 1; thread < m_NumThreads; ++thread ) { + assert( m_pThreads[thread] ); m_pThreads[thread]->detach(); delete m_pThreads[thread]; } m_NumThreads = 0; - delete[] m_pThreadArgStore; + delete[] m_pThreadDataStore; delete[] m_pThreads; - m_pThreadArgStore = 0; + m_pThreadDataStore = 0; m_pThreads = 0; SemaphoreDelete( m_pNewTaskSemaphore ); @@ -265,7 +315,8 @@ void TaskScheduler::StopThreads( bool bWait_ ) m_bHaveThreads = false; m_NumThreadsWaitingForNewTasks = 0; m_NumThreadsWaitingForTaskCompletion = 0; - m_NumThreadsRunning = 0; + m_NumInternalTaskThreadsRunning = 0; + m_NumExternalTaskThreadsRegistered = 0; for( int priority = 0; priority < TASK_PRIORITY_NUM; ++priority ) { @@ -289,21 +340,21 @@ bool TaskScheduler::TryRunTask( uint32_t threadNum_, uint32_t& hintPipeToCheck_i return false; } -bool TaskScheduler::TryRunTask( uint32_t threadNum, uint32_t priority_, uint32_t& hintPipeToCheck_io_ ) +bool TaskScheduler::TryRunTask( uint32_t threadNum_, uint32_t priority_, uint32_t& hintPipeToCheck_io_ ) { // Run any tasks for this thread - RunPinnedTasks( threadNum, priority_ ); + RunPinnedTasks( threadNum_, priority_ ); // check for tasks SubTaskSet subTask; - bool bHaveTask = m_pPipesPerThread[ priority_ ][ threadNum ].WriterTryReadFront( &subTask ); + bool bHaveTask = m_pPipesPerThread[ priority_ ][ threadNum_ ].WriterTryReadFront( &subTask ); uint32_t threadToCheck = hintPipeToCheck_io_; uint32_t checkCount = 0; while( !bHaveTask && checkCount < m_NumThreads ) { threadToCheck = ( hintPipeToCheck_io_ + checkCount ) % m_NumThreads; - if( threadToCheck != threadNum ) + if( threadToCheck != threadNum_ ) { bHaveTask = m_pPipesPerThread[ priority_ ][ threadToCheck ].ReaderTryReadBack( &subTask ); } @@ -319,8 +370,8 @@ bool TaskScheduler::TryRunTask( uint32_t threadNum, uint32_t priority_, uint32_t if( subTask.pTask->m_RangeToRun < partitionSize ) { SubTaskSet taskToRun = SplitTask( subTask, 
subTask.pTask->m_RangeToRun ); - SplitAndAddTask( threadNum, subTask, subTask.pTask->m_RangeToRun ); - taskToRun.pTask->ExecuteRange( taskToRun.partition, threadNum ); + SplitAndAddTask( threadNum_, subTask, subTask.pTask->m_RangeToRun ); + taskToRun.pTask->ExecuteRange( taskToRun.partition, threadNum_ ); int prevCount = taskToRun.pTask->m_RunningCount.fetch_sub(1,std::memory_order_release ); if( 1 == prevCount && taskToRun.pTask->m_WaitingForTaskCount.load( std::memory_order_acquire ) ) @@ -331,7 +382,7 @@ bool TaskScheduler::TryRunTask( uint32_t threadNum, uint32_t priority_, uint32_t else { // the task has already been divided up by AddTaskSetToPipe, so just run it - subTask.pTask->ExecuteRange( subTask.partition, threadNum ); + subTask.pTask->ExecuteRange( subTask.partition, threadNum_ ); int prevCount = subTask.pTask->m_RunningCount.fetch_sub(1,std::memory_order_release ); if( 1 == prevCount && subTask.pTask->m_WaitingForTaskCount.load( std::memory_order_acquire ) ) @@ -364,7 +415,7 @@ bool TaskScheduler::HaveTasks( uint32_t threadNum_ ) return false; } -void TaskScheduler::WaitForNewTasks( uint32_t threadNum ) +void TaskScheduler::WaitForNewTasks( uint32_t threadNum_ ) { // We incrememt the number of threads waiting here in order // to ensure that the check for tasks occurs after the increment @@ -372,15 +423,16 @@ void TaskScheduler::WaitForNewTasks( uint32_t threadNum ) // This will occasionally result in threads being mistakenly awoken, // but they will then go back to sleep. 
- bool bHaveTasks = HaveTasks( threadNum ); + bool bHaveTasks = HaveTasks( threadNum_ ); if( !bHaveTasks ) { - SafeCallback( m_ProfilerCallbacks.waitForNewTaskSuspendStart, threadNum ); - m_pThreadArgStore[threadNum].threadState = THREAD_STATE_WAIT_NEW_TASKS; + SafeCallback( m_Config.profilerCallbacks.waitForNewTaskSuspendStart, threadNum_ ); + ThreadState prevThreadState = m_pThreadDataStore[threadNum_].threadState.load( std::memory_order_relaxed ); + m_pThreadDataStore[threadNum_].threadState.store( THREAD_STATE_WAIT_NEW_TASKS, std::memory_order_relaxed ); // rely on fetch_add acquire for order m_NumThreadsWaitingForNewTasks.fetch_add( 1, std::memory_order_acquire ); SemaphoreWait( *m_pNewTaskSemaphore ); - m_pThreadArgStore[threadNum].threadState = THREAD_STATE_RUNNING; - SafeCallback( m_ProfilerCallbacks.waitForNewTaskSuspendStop, threadNum ); + m_pThreadDataStore[threadNum_].threadState.store( prevThreadState, std::memory_order_release ); + SafeCallback( m_Config.profilerCallbacks.waitForNewTaskSuspendStop, threadNum_ ); } } @@ -395,16 +447,19 @@ void TaskScheduler::WaitForTaskCompletion( const ICompletable* pCompletable_, ui } else { - SafeCallback( m_ProfilerCallbacks.waitForTaskCompleteSuspendStart, threadNum_ ); - m_pThreadArgStore[threadNum_].threadState = THREAD_STATE_WAIT_TASK_COMPLETION; + SafeCallback( m_Config.profilerCallbacks.waitForTaskCompleteSuspendStart, threadNum_ ); + ThreadState prevThreadState = m_pThreadDataStore[threadNum_].threadState.load( std::memory_order_relaxed ); + m_pThreadDataStore[threadNum_].threadState.store( THREAD_STATE_WAIT_TASK_COMPLETION, std::memory_order_relaxed ); + std::atomic_thread_fence(std::memory_order_acquire); + SemaphoreWait( *m_pTaskCompleteSemaphore ); if( !pCompletable_->GetIsComplete() ) { // This thread which may not the one which was supposed to be awoken WakeThreadsForTaskCompletion(); } - m_pThreadArgStore[threadNum_].threadState = THREAD_STATE_RUNNING; - SafeCallback( 
m_ProfilerCallbacks.waitForTaskCompleteSuspendStop, threadNum_ ); + m_pThreadDataStore[threadNum_].threadState.store( prevThreadState, std::memory_order_release ); + SafeCallback( m_Config.profilerCallbacks.waitForTaskCompleteSuspendStop, threadNum_ ); } pCompletable_->m_WaitingForTaskCount.fetch_sub( 1, std::memory_order_release ); @@ -473,10 +528,27 @@ void TaskScheduler::SplitAndAddTask( uint32_t threadNum_, SubTaskSet subTask_, u WakeThreadsForNewTasks(); } +ENKITS_API TaskSchedulerConfig enki::TaskScheduler::GetConfig() const +{ + TaskSchedulerConfig config; + if( m_bHaveThreads ) + { + config.numTaskThreadsToCreate = m_NumThreads; + config.numExternalTaskThreads = 0; + } + return config; +} + void TaskScheduler::AddTaskSetToPipe( ITaskSet* pTaskSet_ ) { assert( pTaskSet_->m_RunningCount == 0 ); + uint32_t threadNum = gtl_threadNum; + + ThreadState prevThreadState = m_pThreadDataStore[threadNum].threadState.load( std::memory_order_relaxed ); + m_pThreadDataStore[threadNum].threadState.store( THREAD_STATE_RUNNING, std::memory_order_relaxed ); pTaskSet_->m_RunningCount.store( 0, std::memory_order_relaxed ); + std::atomic_thread_fence(std::memory_order_acquire); + // divide task up and add to pipe pTaskSet_->m_RangeToRun = pTaskSet_->m_SetSize / m_NumPartitions; @@ -489,7 +561,10 @@ void TaskScheduler::AddTaskSetToPipe( ITaskSet* pTaskSet_ ) subTask.pTask = pTaskSet_; subTask.partition.start = 0; subTask.partition.end = pTaskSet_->m_SetSize; - SplitAndAddTask( gtl_threadNum, subTask, rangeToSplit ); + SplitAndAddTask( threadNum, subTask, rangeToSplit ); + + m_pThreadDataStore[threadNum].threadState.store( prevThreadState, std::memory_order_release ); + } void TaskScheduler::AddPinnedTask( IPinnedTask* pTask_ ) @@ -504,10 +579,14 @@ void TaskScheduler::AddPinnedTask( IPinnedTask* pTask_ ) void TaskScheduler::RunPinnedTasks() { uint32_t threadNum = gtl_threadNum; + ThreadState prevThreadState = m_pThreadDataStore[threadNum].threadState.load( 
std::memory_order_relaxed ); + m_pThreadDataStore[threadNum].threadState.store( THREAD_STATE_RUNNING, std::memory_order_relaxed ); + std::atomic_thread_fence(std::memory_order_acquire); for( int priority = 0; priority < TASK_PRIORITY_NUM; ++priority ) { RunPinnedTasks( threadNum, priority ); } + m_pThreadDataStore[threadNum].threadState.store( prevThreadState, std::memory_order_release ); } void TaskScheduler::RunPinnedTasks( uint32_t threadNum_, uint32_t priority_ ) @@ -533,9 +612,15 @@ void TaskScheduler::WaitforTask( const ICompletable* pCompletable_, enki::Tas uint32_t threadNum = gtl_threadNum; uint32_t hintPipeToCheck_io = threadNum + 1; // does not need to be clamped. + // waiting for a task is equivalent to 'running' for thread state purpose as we may run tasks whilst waiting + ThreadState prevThreadState = m_pThreadDataStore[threadNum].threadState.load( std::memory_order_relaxed ); + m_pThreadDataStore[threadNum].threadState.store( THREAD_STATE_RUNNING, std::memory_order_relaxed ); + std::atomic_thread_fence(std::memory_order_acquire); + + if( pCompletable_ && !pCompletable_->GetIsComplete() ) { - SafeCallback( m_ProfilerCallbacks.waitForTaskCompleteStart, threadNum ); + SafeCallback( m_Config.profilerCallbacks.waitForTaskCompleteStart, threadNum ); // We need to ensure that the task we're waiting on can complete even if we're the only thread, // so we clamp the priorityOfLowestToRun_ to no smaller than the task we're waiting for priorityOfLowestToRun_ = std::max( priorityOfLowestToRun_, pCompletable_->m_Priority ); @@ -563,7 +648,7 @@ void TaskScheduler::WaitforTask( const ICompletable* pCompletable_, enki::Tas } } - SafeCallback( m_ProfilerCallbacks.waitForTaskCompleteStop, threadNum ); + SafeCallback( m_Config.profilerCallbacks.waitForTaskCompleteStop, threadNum ); } else { @@ -575,6 +660,9 @@ void TaskScheduler::WaitforTask( const ICompletable* pCompletable_, enki::Tas } } } + + m_pThreadDataStore[threadNum].threadState.store( prevThreadState, 
std::memory_order_release ); + } class TaskSchedulerWaitTask : public IPinnedTask @@ -590,11 +678,11 @@ void TaskScheduler::WaitforAll() bool bHaveTasks = true; uint32_t threadNum = gtl_threadNum; uint32_t hintPipeToCheck_io = threadNum + 1; // does not need to be clamped. - int32_t numThreadsRunning = m_NumThreadsRunning.load( std::memory_order_relaxed ) - 1; // account for this thread + int32_t numOtherThreadsRunning = 0; // account for this thread uint32_t spinCount = 0; TaskSchedulerWaitTask dummyWaitTask; dummyWaitTask.threadNum = 0; - while( bHaveTasks || m_NumThreadsWaitingForNewTasks.load( std::memory_order_relaxed ) < numThreadsRunning ) + while( bHaveTasks || numOtherThreadsRunning ) { bHaveTasks = TryRunTask( threadNum, hintPipeToCheck_io ); if( bHaveTasks ) @@ -613,7 +701,7 @@ void TaskScheduler::WaitforAll() { dummyWaitTask.threadNum = ( dummyWaitTask.threadNum + 1 ) % m_NumThreads; } - } while( countThreadsToCheck && m_pThreadArgStore[ dummyWaitTask.threadNum ].threadState != THREAD_STATE_RUNNING ); + } while( countThreadsToCheck && m_pThreadDataStore[ dummyWaitTask.threadNum ].threadState != THREAD_STATE_RUNNING ); assert( dummyWaitTask.threadNum != threadNum ); AddPinnedTask( &dummyWaitTask ); WaitForTaskCompletion( &dummyWaitTask, threadNum ); @@ -624,6 +712,28 @@ void TaskScheduler::WaitforAll() uint32_t spinBackoffCount = spinCount * SPIN_BACKOFF_MULTIPLIER; SpinWait( spinBackoffCount ); } + + // count threads running + numOtherThreadsRunning = 0; + for(uint32_t thread = 0; thread < m_NumThreads; ++thread ) + { + // ignore our thread + if( thread != threadNum ) + { + switch( m_pThreadDataStore[thread].threadState.load( std::memory_order_acquire ) ) + { + case THREAD_STATE_RUNNING: + case THREAD_STATE_WAIT_TASK_COMPLETION: + ++numOtherThreadsRunning; + break; + case THREAD_STATE_EXTERNAL_REGISTERED: + case THREAD_STATE_EXTERNAL_UNREGISTERED: + case THREAD_STATE_WAIT_NEW_TASKS: + case THREAD_STATE_STOPPED: + break; + }; + } + } } } @@ -652,15 
+762,15 @@ TaskScheduler::TaskScheduler() : m_pPipesPerThread() , m_pPinnedTaskListPerThread() , m_NumThreads(0) - , m_pThreadArgStore(NULL) + , m_pThreadDataStore(NULL) , m_pThreads(NULL) , m_bRunning(0) - , m_NumThreadsRunning(0) + , m_NumInternalTaskThreadsRunning(0) , m_NumThreadsWaitingForNewTasks(0) , m_NumThreadsWaitingForTaskCompletion(0) , m_NumPartitions(0) , m_bHaveThreads(false) - , m_ProfilerCallbacks() + , m_NumExternalTaskThreadsRegistered(0) { } @@ -669,23 +779,26 @@ TaskScheduler::~TaskScheduler() StopThreads( true ); // Stops threads, waiting for them. } -void TaskScheduler::Initialize( uint32_t numThreads_ ) +void TaskScheduler::Initialize( uint32_t numThreadsTotal_ ) { - assert( numThreads_ ); + assert( numThreadsTotal_ >= 1 ); + m_Config.numTaskThreadsToCreate = numThreadsTotal_ - 1; + m_Config.numExternalTaskThreads = 0; StopThreads( true ); // Stops threads, waiting for them. + StartThreads();} - m_NumThreads = numThreads_; - +ENKITS_API void enki::TaskScheduler::Initialize( TaskSchedulerConfig config_ ) +{ + m_Config = config_; + StopThreads( true ); // Stops threads, waiting for them. 
StartThreads(); } -void TaskScheduler::Initialize() +void TaskScheduler::Initialize() { Initialize( std::thread::hardware_concurrency() ); } - - // Semaphore implementation #ifdef _WIN32 diff --git a/src/TaskScheduler.h b/src/TaskScheduler.h index e32dca03..e1d7d483 100644 --- a/src/TaskScheduler.h +++ b/src/TaskScheduler.h @@ -44,6 +44,7 @@ #define ENKITS_API #endif + namespace enki { @@ -57,9 +58,12 @@ namespace enki class TaskPipe; class PinnedTaskList; struct ThreadArgs; + struct ThreadDataStore; struct SubTaskSet; struct semaphoreid_t; + uint32_t GetNumHardwareThreads(); + enum TaskPriority { @@ -84,8 +88,8 @@ namespace enki class ICompletable { public: - ICompletable() : m_Priority(TASK_PRIORITY_HIGH), m_RunningCount(0) {} - bool GetIsComplete() const { + ICompletable() : m_Priority(TASK_PRIORITY_HIGH), m_RunningCount(0) {} + bool GetIsComplete() const { return 0 == m_RunningCount.load( std::memory_order_acquire ); } @@ -127,22 +131,25 @@ namespace enki // i.e. neighbouring values should be close together. // threadnum should not be used for changing processing of data, it's intended purpose // is to allow per-thread data buckets for output. - virtual void ExecuteRange( TaskSetPartition range, uint32_t threadnum ) = 0; + virtual void ExecuteRange( TaskSetPartition range_, uint32_t threadnum_ ) = 0; - // Size of set - usually the number of data items to be processed, see ExecuteRange. Defaults to 1 - uint32_t m_SetSize; + // Set Size - usually the number of data items to be processed, see ExecuteRange. Defaults to 1 + uint32_t m_SetSize; - // Minimum size of of TaskSetPartition range when splitting a task set into partitions. + // Min Range - Minimum size of TaskSetPartition range when splitting a task set into partitions. + // Designed for reducing scheduling overhead by preventing set being + // divided up too small. Ranges passed to ExecuteRange will *not* be a multiple of this, + // only attempts to deliver range sizes larger than this most of the time.
// This should be set to a value which results in computation effort of at least 10k // clock cycles to minimize tast scheduler overhead. // NOTE: The last partition will be smaller than m_MinRange if m_SetSize is not a multiple // of m_MinRange. // Also known as grain size in literature. - uint32_t m_MinRange; + uint32_t m_MinRange; private: - friend class TaskScheduler; - uint32_t m_RangeToRun; + friend class TaskScheduler; + uint32_t m_RangeToRun; }; // Subclass IPinnedTask to create tasks which can be run on a given thread only. @@ -155,10 +162,10 @@ namespace enki // IPinnedTask needs to be non abstract for intrusive list functionality. // Should never be called as should be overridden. - virtual void Execute() { assert(false); } + virtual void Execute() { assert(false); } - uint32_t threadNum; // thread to run this pinned task on + uint32_t threadNum; // thread to run this pinned task on std::atomic pNext; // Do not use. For intrusive list only. }; @@ -172,9 +179,9 @@ namespace enki TaskSet( uint32_t setSize_, TaskSetFunction func_ ) : ITaskSet( setSize_ ), m_Function( func_ ) {} - virtual void ExecuteRange( TaskSetPartition range, uint32_t threadnum ) + virtual void ExecuteRange( TaskSetPartition range_, uint32_t threadnum_ ) { - m_Function( range, threadnum ); + m_Function( range_, threadnum_ ); } TaskSetFunction m_Function; @@ -194,25 +201,46 @@ namespace enki ProfilerCallbackFunc waitForTaskCompleteSuspendStop; // thread unsuspended }; + // TaskSchedulerConfig - configuration struct for advanced Initialize + struct TaskSchedulerConfig + { + // numTaskThreadsToCreate - Number of tasking threads the task scheduler will create. Must be > 0. + // Defaults to GetNumHardwareThreads()-1 threads as thread which calls initialize is thread 0. + uint32_t numTaskThreadsToCreate = GetNumHardwareThreads()-1; + + // numExternalTaskThreads - Advanced use. Number of external threads which need to use TaskScheduler API. 
+ // See TaskScheduler::RegisterExternalTaskThread() for usage. + // Defaults to 0, the thread used to initialize the TaskScheduler. + uint32_t numExternalTaskThreads = 0; + + ProfilerCallbacks profilerCallbacks = {}; + }; + class TaskScheduler { public: ENKITS_API TaskScheduler(); ENKITS_API ~TaskScheduler(); - // Call either Initialize() or Initialize( numThreads_ ) before adding tasks. + // Call an Initialize function before adding tasks. - // Initialize() will create GetNumHardwareThreads()-1 threads, which is + // Initialize() will create GetNumHardwareThreads()-1 tasking threads, which is // sufficient to fill the system when including the main thread. // Initialize can be called multiple times - it will wait for completion // before re-initializing. ENKITS_API void Initialize(); - // Initialize( numThreads_ ) - numThreads_ (must be > 0) - // will create numThreads_-1 threads, as thread 0 is + // Initialize( numThreadsTotal_ ) + // will create numThreadsTotal_-1 threads, as thread 0 is // the thread on which the initialize was called. - ENKITS_API void Initialize( uint32_t numThreads_ ); + // numThreadsTotal_ must be > 0 + ENKITS_API void Initialize( uint32_t numThreadsTotal_ ); + // Initialize with advanced TaskSchedulerConfig settings. See TaskSchedulerConfig. + ENKITS_API void Initialize( TaskSchedulerConfig config_ ); + + // Get config. Can be called before Initialize to get the defaults. + ENKITS_API TaskSchedulerConfig GetConfig() const; // Adds the TaskSet to pipe and returns if the pipe is not full. // If the pipe is full, pTaskSet is run. @@ -235,9 +263,6 @@ namespace enki // Only wait for child tasks of the current task otherwise a deadlock could occur. 
ENKITS_API void WaitforTask( const ICompletable* pCompletable_, enki::TaskPriority priorityOfLowestToRun_ = TaskPriority(TASK_PRIORITY_NUM - 1) ); - // WaitforTaskSet, deprecated interface use WaitforTask - inline void WaitforTaskSet( const ICompletable* pCompletable_ ) { WaitforTask( pCompletable_ ); } - // Waits for all task sets to complete - not guaranteed to work unless we know we // are in a situation where tasks aren't being continuously added. ENKITS_API void WaitforAll(); @@ -247,51 +272,78 @@ namespace enki // This function can be safely called even if TaskScheduler::Initialize() has not been called. ENKITS_API void WaitforAllAndShutdown(); - // Returns the number of threads created for running tasks + 1 - // to account for the main thread. + // Returns the number of threads created for running tasks + number of external threads + // plus 1 to account for the thread used to initialize the task scheduler. + // Equivalent to config values: numTaskThreadsToCreate + numExternalTaskThreads + 1. + // It is guaranteed that GetThreadNum() < GetNumTaskThreads() ENKITS_API uint32_t GetNumTaskThreads() const; // Returns the current task threadNum - // Will return 0 for main thread and all other non-enkiTS threads, and < GetNumTaskThreads() + // Will return 0 for thread which initialized the task scheduler, + // and all other non-enkiTS threads which have not been registered ( see RegisterExternalTaskThread() ), + // and < GetNumTaskThreads() for all threads. + // It is guaranteed that GetThreadNum() < GetNumTaskThreads() ENKITS_API uint32_t GetThreadNum() const; + // Call on a thread to register the thread to use the TaskScheduling API. + // This is implicitly done for the thread which initializes the TaskScheduler + // Intended for developers who have threads which need to call the TaskScheduler API + // Returns true if successful, false if not. + // Can only have numExternalTaskThreads registered at any one time, which must be set + // at initialization time.
+ ENKITS_API bool RegisterExternalTaskThread(); + + // Call on a thread on which RegisterExternalTaskThread has been called to deregister that thread. + ENKITS_API void DeRegisterExternalTaskThread(); + + // Get the number of registered external task threads. + ENKITS_API uint32_t GetNumRegisteredExternalTaskThreads(); + + + // ------------- Start DEPRECATED Functions ------------- + // DEPRECATED - WaitforTaskSet, deprecated interface use WaitforTask + inline void WaitforTaskSet( const ICompletable* pCompletable_ ) { WaitforTask( pCompletable_ ); } + + // DEPRECATED - GetProfilerCallbacks. Use TaskSchedulerConfig instead // Returns the ProfilerCallbacks structure so that it can be modified to - // set the callbacks. - ENKITS_API ProfilerCallbacks* GetProfilerCallbacks(); + // set the callbacks. Should be set prior to initialization. + inline ProfilerCallbacks* GetProfilerCallbacks() { return &m_Config.profilerCallbacks; } + // ------------- End DEPRECATED Functions ------------- private: - static void TaskingThreadFunction( const ThreadArgs& args_ ); - bool HaveTasks( uint32_t threadNum_ ); - void WaitForNewTasks( uint32_t threadNum_ ); - void WaitForTaskCompletion( const ICompletable* pCompletable_, uint32_t threadNum_ ); - void RunPinnedTasks( uint32_t threadNum_, uint32_t priority_ ); - bool TryRunTask( uint32_t threadNum_, uint32_t& hintPipeToCheck_io_ ); - bool TryRunTask( uint32_t threadNum_, uint32_t priority_, uint32_t& hintPipeToCheck_io_ ); - void StartThreads(); - void StopThreads( bool bWait_ ); - void SplitAndAddTask( uint32_t threadNum_, SubTaskSet subTask_, uint32_t rangeToSplit_ ); - void WakeThreadsForNewTasks(); - void WakeThreadsForTaskCompletion(); - - TaskPipe* m_pPipesPerThread[ TASK_PRIORITY_NUM ]; - PinnedTaskList* m_pPinnedTaskListPerThread[ TASK_PRIORITY_NUM ]; - - uint32_t m_NumThreads; - ThreadArgs* m_pThreadArgStore; - std::thread** m_pThreads; - std::atomic m_bRunning; - std::atomic m_NumThreadsRunning; - std::atomic 
m_NumThreadsWaitingForNewTasks; - std::atomic m_NumThreadsWaitingForTaskCompletion; - uint32_t m_NumPartitions; - semaphoreid_t* m_pNewTaskSemaphore; - semaphoreid_t* m_pTaskCompleteSemaphore; - uint32_t m_NumInitialPartitions; - bool m_bHaveThreads; - ProfilerCallbacks m_ProfilerCallbacks; - - TaskScheduler( const TaskScheduler& nocopy ); - TaskScheduler& operator=( const TaskScheduler& nocopy ); + static void TaskingThreadFunction( const ThreadArgs& args_ ); + bool HaveTasks( uint32_t threadNum_ ); + void WaitForNewTasks( uint32_t threadNum_ ); + void WaitForTaskCompletion( const ICompletable* pCompletable_, uint32_t threadNum_ ); + void RunPinnedTasks( uint32_t threadNum_, uint32_t priority_ ); + bool TryRunTask( uint32_t threadNum_, uint32_t& hintPipeToCheck_io_ ); + bool TryRunTask( uint32_t threadNum_, uint32_t priority_, uint32_t& hintPipeToCheck_io_ ); + void StartThreads(); + void StopThreads( bool bWait_ ); + void SplitAndAddTask( uint32_t threadNum_, SubTaskSet subTask_, uint32_t rangeToSplit_ ); + void WakeThreadsForNewTasks(); + void WakeThreadsForTaskCompletion(); + + TaskPipe* m_pPipesPerThread[ TASK_PRIORITY_NUM ]; + PinnedTaskList* m_pPinnedTaskListPerThread[ TASK_PRIORITY_NUM ]; + + uint32_t m_NumThreads; + ThreadDataStore* m_pThreadDataStore; + std::thread** m_pThreads; + std::atomic m_bRunning; + std::atomic m_NumInternalTaskThreadsRunning; + std::atomic m_NumThreadsWaitingForNewTasks; + std::atomic m_NumThreadsWaitingForTaskCompletion; + uint32_t m_NumPartitions; + semaphoreid_t* m_pNewTaskSemaphore; + semaphoreid_t* m_pTaskCompleteSemaphore; + uint32_t m_NumInitialPartitions; + bool m_bHaveThreads; + TaskSchedulerConfig m_Config; + std::atomic m_NumExternalTaskThreadsRegistered; + + TaskScheduler( const TaskScheduler& nocopy_ ); + TaskScheduler& operator=( const TaskScheduler& nocopy_ ); }; inline uint32_t GetNumHardwareThreads() diff --git a/src/TaskScheduler_c.cpp b/src/TaskScheduler_c.cpp index f094336f..4b542419 100644 --- 
a/src/TaskScheduler_c.cpp +++ b/src/TaskScheduler_c.cpp @@ -31,9 +31,9 @@ struct enkiTaskSet : ITaskSet { enkiTaskSet( enkiTaskExecuteRange taskFun_ ) : taskFun(taskFun_), pArgs(NULL) {} - virtual void ExecuteRange( TaskSetPartition range, uint32_t threadnum ) + virtual void ExecuteRange( TaskSetPartition range_, uint32_t threadnum_ ) { - taskFun( range.start, range.end, threadnum, pArgs ); + taskFun( range_.start, range_.end, threadnum_, pArgs ); } enkiTaskExecuteRange taskFun; @@ -60,6 +60,23 @@ enkiTaskScheduler* enkiNewTaskScheduler() return pETS; } +ENKITS_API struct enkiTaskSchedulerConfig enkiGetTaskSchedulerConfig( enkiTaskScheduler* pETS_ ) +{ + TaskSchedulerConfig config = pETS_->GetConfig(); + enkiTaskSchedulerConfig configC; + configC.numExternalTaskThreads = config.numExternalTaskThreads; + configC.numTaskThreadsToCreate = config.numTaskThreadsToCreate; + configC.profilerCallbacks.threadStart = config.profilerCallbacks.threadStart; + configC.profilerCallbacks.threadStop = config.profilerCallbacks.threadStop; + configC.profilerCallbacks.waitForNewTaskSuspendStart = config.profilerCallbacks.waitForNewTaskSuspendStart; + configC.profilerCallbacks.waitForNewTaskSuspendStop = config.profilerCallbacks.waitForNewTaskSuspendStop; + configC.profilerCallbacks.waitForTaskCompleteStart = config.profilerCallbacks.waitForTaskCompleteStart; + configC.profilerCallbacks.waitForTaskCompleteStop = config.profilerCallbacks.waitForTaskCompleteStop; + configC.profilerCallbacks.waitForTaskCompleteSuspendStart = config.profilerCallbacks.waitForTaskCompleteSuspendStart; + configC.profilerCallbacks.waitForTaskCompleteSuspendStop = config.profilerCallbacks.waitForTaskCompleteSuspendStop; + return configC; +} + void enkiInitTaskScheduler( enkiTaskScheduler* pETS_ ) { pETS_->Initialize(); @@ -70,6 +87,22 @@ void enkiInitTaskSchedulerNumThreads( enkiTaskScheduler* pETS_, uint32_t numThr pETS_->Initialize( numThreads_ ); } +ENKITS_API void enkiInitTaskSchedulerWithConfig( 
enkiTaskScheduler* pETS_, struct enkiTaskSchedulerConfig config_ ) +{ + TaskSchedulerConfig config; + config.numExternalTaskThreads = config_.numExternalTaskThreads; + config.numTaskThreadsToCreate = config_.numTaskThreadsToCreate; + config.profilerCallbacks.threadStart = config_.profilerCallbacks.threadStart; + config.profilerCallbacks.threadStop = config_.profilerCallbacks.threadStop; + config.profilerCallbacks.waitForNewTaskSuspendStart = config_.profilerCallbacks.waitForNewTaskSuspendStart; + config.profilerCallbacks.waitForNewTaskSuspendStop = config_.profilerCallbacks.waitForNewTaskSuspendStop; + config.profilerCallbacks.waitForTaskCompleteStart = config_.profilerCallbacks.waitForTaskCompleteStart; + config.profilerCallbacks.waitForTaskCompleteStop = config_.profilerCallbacks.waitForTaskCompleteStop; + config.profilerCallbacks.waitForTaskCompleteSuspendStart = config_.profilerCallbacks.waitForTaskCompleteSuspendStart; + config.profilerCallbacks.waitForTaskCompleteSuspendStop = config_.profilerCallbacks.waitForTaskCompleteSuspendStop; + pETS_->Initialize( config ); +} + void enkiDeleteTaskScheduler( enkiTaskScheduler* pETS_ ) { delete pETS_; @@ -182,6 +215,26 @@ uint32_t enkiGetNumTaskThreads( enkiTaskScheduler* pETS_ ) return pETS_->GetNumTaskThreads(); } +ENKITS_API uint32_t enkiGetThreadNum( enkiTaskScheduler* pETS_ ) +{ + return pETS_->GetThreadNum(); +} + +ENKITS_API int enkiRegisterExternalTaskThread( enkiTaskScheduler* pETS_) +{ + return (int)pETS_->RegisterExternalTaskThread(); +} + +ENKITS_API void enkiDeRegisterExternalTaskThread( enkiTaskScheduler* pETS_) +{ + return pETS_->DeRegisterExternalTaskThread(); +} + +ENKITS_API uint32_t enkiGetNumRegisteredExternalTaskThreads( enkiTaskScheduler* pETS_) +{ + return pETS_->GetNumRegisteredExternalTaskThreads(); +} + enkiProfilerCallbacks* enkiGetProfilerCallbacks( enkiTaskScheduler* pETS_ ) { static_assert( sizeof(enkiProfilerCallbacks) == sizeof(enki::ProfilerCallbacks), "enkiTS profiler callback structs 
do not match" ); diff --git a/src/TaskScheduler_c.h b/src/TaskScheduler_c.h index 29dcd2ef..150bdfeb 100644 --- a/src/TaskScheduler_c.h +++ b/src/TaskScheduler_c.h @@ -44,10 +44,41 @@ typedef struct enkiPinnedTask enkiPinnedTask; typedef void (* enkiTaskExecuteRange)( uint32_t start_, uint32_t end, uint32_t threadnum_, void* pArgs_ ); typedef void (* enkiPinnedTaskExecute)( void* pArgs_ ); +// TaskScheduler implements several callbacks intended for profilers +typedef void (*enkiProfilerCallbackFunc)( uint32_t threadnum_ ); +struct enkiProfilerCallbacks +{ + enkiProfilerCallbackFunc threadStart; + enkiProfilerCallbackFunc threadStop; + enkiProfilerCallbackFunc waitForNewTaskSuspendStart; // thread suspended waiting for new tasks + enkiProfilerCallbackFunc waitForNewTaskSuspendStop; // thread unsuspended + enkiProfilerCallbackFunc waitForTaskCompleteStart; // thread waiting for task completion + enkiProfilerCallbackFunc waitForTaskCompleteStop; // thread stopped waiting + enkiProfilerCallbackFunc waitForTaskCompleteSuspendStart; // thread suspended waiting task completion + enkiProfilerCallbackFunc waitForTaskCompleteSuspendStop; // thread unsuspended +}; + +struct enkiTaskSchedulerConfig +{ + // numTaskThreadsToCreate - Number of tasking threads the task scheduler will create. Must be > 0. + // Defaults to GetNumHardwareThreads()-1 threads as thread which calls initialize is thread 0. + uint32_t numTaskThreadsToCreate; + + // numExternalTaskThreads - Advanced use. Number of external threads which need to use TaskScheduler API. + // See enkiRegisterExternalTaskThread() for usage. + // Defaults to 0, the thread used to initialize the TaskScheduler. + uint32_t numExternalTaskThreads; + + struct enkiProfilerCallbacks profilerCallbacks; +}; + // Create a new task scheduler ENKITS_API enkiTaskScheduler* enkiNewTaskScheduler(); +// Get config. 
Can be called before enkiInitTaskSchedulerWithConfig to get the defaults +ENKITS_API struct enkiTaskSchedulerConfig enkiGetTaskSchedulerConfig( enkiTaskScheduler* pETS_ ); + // Initialize task scheduler - will create GetNumHardwareThreads()-1 threads, which is // sufficient to fill the system when including the main thread. // Initialize can be called multiple times - it will wait for completion @@ -57,7 +88,10 @@ ENKITS_API void enkiInitTaskScheduler( enkiTaskScheduler* pETS_ // Initialize a task scheduler with numThreads_ (must be > 0) // will create numThreads_-1 threads, as thread 0 is // the thread on which the initialize was called. -ENKITS_API void enkiInitTaskSchedulerNumThreads( enkiTaskScheduler* pETS_, uint32_t numThreads_ ); +ENKITS_API void enkiInitTaskSchedulerNumThreads( enkiTaskScheduler* pETS_, uint32_t numThreads_ ); + +// Initialize a task scheduler with config, see enkiTaskSchedulerConfig for details +ENKITS_API void enkiInitTaskSchedulerWithConfig( enkiTaskScheduler* pETS_, struct enkiTaskSchedulerConfig config_ ); // Delete a task scheduler @@ -134,26 +168,39 @@ ENKITS_API void enkiWaitForPinnedTaskPriority( enkiTaskScheduler* // are in a situation where tasks aren't being continuosly added. ENKITS_API void enkiWaitForAll( enkiTaskScheduler* pETS_ ); - -// get number of threads +// Returns the number of threads created for running tasks + number of external threads +// plus 1 to account for the thread used to initialize the task scheduler. +// Equivalent to config values: numTaskThreadsToCreate + numExternalTaskThreads + 1. 
+// It is guaranteed that enkiGetThreadNum() < enkiGetNumTaskThreads() ENKITS_API uint32_t enkiGetNumTaskThreads( enkiTaskScheduler* pETS_ ); -// TaskScheduler implements several callbacks intended for profilers -typedef void (*enkiProfilerCallbackFunc)( uint32_t threadnum_ ); -struct enkiProfilerCallbacks -{ - enkiProfilerCallbackFunc threadStart; - enkiProfilerCallbackFunc threadStop; - enkiProfilerCallbackFunc waitForNewTaskSuspendStart; // thread suspended waiting for new tasks - enkiProfilerCallbackFunc waitForNewTaskSuspendStop; // thread unsuspended - enkiProfilerCallbackFunc waitForTaskCompleteStart; // thread waiting for task completion - enkiProfilerCallbackFunc waitForTaskCompleteStop; // thread stopped waiting - enkiProfilerCallbackFunc waitForTaskCompleteSuspendStart; // thread suspended waiting task completion - enkiProfilerCallbackFunc waitForTaskCompleteSuspendStop; // thread unsuspended -}; - +// Returns the current task threadNum +// Will return 0 for thread which initialized the task scheduler, +// and all other non-enkiTS threads which have not been registered ( see enkiRegisterExternalTaskThread() ), +// and < enkiGetNumTaskThreads() for all threads. +// It is guaranteed that enkiGetThreadNum() < enkiGetNumTaskThreads() +ENKITS_API uint32_t enkiGetThreadNum( enkiTaskScheduler* pETS_ ); + +// Call on a thread to register the thread to use the TaskScheduling API. +// This is implicitly done for the thread which initializes the TaskScheduler +// Intended for developers who have threads which need to call the TaskScheduler API +// Returns non-zero (true) if successful, 0 (false) if not. +// Can only have numExternalTaskThreads registered at any one time, which must be set +// at initialization time. +ENKITS_API int enkiRegisterExternalTaskThread( enkiTaskScheduler* pETS_); + +// Call on a thread on which enkiRegisterExternalTaskThread has been called to deregister that thread.
+ENKITS_API void enkiDeRegisterExternalTaskThread( enkiTaskScheduler* pETS_); + +// Get the number of registered external task threads. +ENKITS_API uint32_t enkiGetNumRegisteredExternalTaskThreads( enkiTaskScheduler* pETS_); + +// ------------- Start DEPRECATED Functions ------------- +// DEPRECATED - enkiGetProfilerCallbacks. Use enkiTaskSchedulerConfig instead // Get the callback structure so it can be set ENKITS_API struct enkiProfilerCallbacks* enkiGetProfilerCallbacks( enkiTaskScheduler* pETS_ ); +// ------------- End DEPRECATED Functions ------------- + #ifdef __cplusplus }