From 7665e43ccc497454351bfcdb6de136fec88f9137 Mon Sep 17 00:00:00 2001 From: Doug Binks Date: Wed, 23 Oct 2019 19:42:24 +0100 Subject: [PATCH 01/19] First draft of new RegisterExternalTaskThread functionality. --- src/TaskScheduler.cpp | 208 ++++++++++++++++++++++++++++++++---------- src/TaskScheduler.h | 71 +++++++++++--- 2 files changed, 216 insertions(+), 63 deletions(-) diff --git a/src/TaskScheduler.cpp b/src/TaskScheduler.cpp index c3448087..d41e5d65 100644 --- a/src/TaskScheduler.cpp +++ b/src/TaskScheduler.cpp @@ -62,19 +62,26 @@ namespace enki // we derive class TaskPipe rather than typedef to get forward declaration working easily class TaskPipe : public LockLessMultiReadPipe {}; - enum ThreadState : uint8_t + enum ThreadState : int32_t { THREAD_STATE_RUNNING, - THREAD_STATE_WAIT_NEW_TASKS, THREAD_STATE_WAIT_TASK_COMPLETION, + THREAD_STATE_EXTERNAL_REGISTERED, + THREAD_STATE_EXTERNAL_UNREGISTERED, + THREAD_STATE_WAIT_NEW_TASKS, THREAD_STATE_STOPPED, }; struct ThreadArgs { - uint32_t threadNum; - TaskScheduler* pTaskScheduler; - ThreadState threadState; + uint32_t threadNum; + TaskScheduler* pTaskScheduler; + }; + + struct ThreadDataStore + { + uint32_t threadNum; + std::atomic threadState; }; class PinnedTaskList : public LocklessMultiWriteIntrusiveList {}; @@ -133,7 +140,42 @@ static void SafeCallback(ProfilerCallbackFunc func_, uint32_t threadnum_) ProfilerCallbacks* TaskScheduler::GetProfilerCallbacks() { - return &m_ProfilerCallbacks; + return &m_Config.profilerCallbacks; +} + +ENKITS_API bool enki::TaskScheduler::RegisterExternalTaskThread() +{ + bool bRegistered = false; + while( !bRegistered && m_NumExternalTaskThreadsRegistered < (int32_t)m_Config.numExternalTaskThreads ) + { + for(uint32_t thread = 1; thread <= m_Config.numExternalTaskThreads; ++thread ) + { + // ignore our thread + ThreadState threadStateExpected = THREAD_STATE_EXTERNAL_UNREGISTERED; + if( m_pThreadDataStore[thread].threadState.compare_exchange_strong( + threadStateExpected, THREAD_STATE_EXTERNAL_REGISTERED ) ) + { + ++m_NumExternalTaskThreadsRegistered; + gtl_threadNum = thread; + bRegistered = true; + break; + } + } + } + return bRegistered; +} + +ENKITS_API void enki::TaskScheduler::DeRegisterExternalTaskThread() +{ + assert( gtl_threadNum ); + --m_NumExternalTaskThreadsRegistered; + m_pThreadDataStore[gtl_threadNum].threadState = THREAD_STATE_EXTERNAL_UNREGISTERED; + gtl_threadNum = 0; +} + +ENKITS_API uint32_t enki::TaskScheduler::GetNumRegisteredExternalTaskThreads() +{ + return m_NumExternalTaskThreadsRegistered; } @@ -143,7 +185,7 @@ void TaskScheduler::TaskingThreadFunction( const ThreadArgs& args_ ) TaskScheduler* pTS = args_.pTaskScheduler; gtl_threadNum = threadNum; - SafeCallback( pTS->m_ProfilerCallbacks.threadStart, threadNum ); + SafeCallback( pTS->m_Config.profilerCallbacks.threadStart, threadNum ); uint32_t spinCount = 0; uint32_t hintPipeToCheck_io = threadNum + 1; // does not need to be clamped. 
@@ -170,9 +212,9 @@ void TaskScheduler::TaskingThreadFunction( const ThreadArgs& args_ ) } } - pTS->m_NumThreadsRunning.fetch_sub( 1, std::memory_order_release ); - SafeCallback( pTS->m_ProfilerCallbacks.threadStop, threadNum ); - pTS->m_pThreadArgStore[threadNum].threadState = THREAD_STATE_STOPPED; + pTS->m_NumInternalTaskThreadsRunning.fetch_sub( 1, std::memory_order_release ); + SafeCallback( pTS->m_Config.profilerCallbacks.threadStop, threadNum ); + pTS->m_pThreadDataStore[threadNum].threadState = THREAD_STATE_STOPPED; return; } @@ -185,6 +227,8 @@ void TaskScheduler::StartThreads() return; } + m_NumThreads = m_Config.numTaskThreadsToCreate + m_Config.numExternalTaskThreads + 1; + for( int priority = 0; priority < TASK_PRIORITY_NUM; ++priority ) { m_pPipesPerThread[ priority ] = new TaskPipe[ m_NumThreads ]; @@ -195,23 +239,27 @@ void TaskScheduler::StartThreads() m_pTaskCompleteSemaphore = SemaphoreCreate(); // we create one less thread than m_NumThreads as the main thread counts as one - m_pThreadArgStore = new ThreadArgs[m_NumThreads]; + m_pThreadDataStore = new ThreadDataStore[m_NumThreads]; m_pThreads = new std::thread*[m_NumThreads]; - m_pThreadArgStore[0].threadNum = 0; - m_pThreadArgStore[0].pTaskScheduler = this; - m_pThreadArgStore[0].threadState = THREAD_STATE_RUNNING; - m_NumThreadsRunning = 1; // account for main thread m_bRunning = 1; - for( uint32_t thread = 1; thread < m_NumThreads; ++thread ) + for( uint32_t thread = 0; thread < m_Config.numExternalTaskThreads + 1; ++thread ) + { + m_pThreadDataStore[thread].threadNum = thread; + m_pThreadDataStore[thread].threadState = THREAD_STATE_EXTERNAL_UNREGISTERED; + m_pThreads[thread] = nullptr; + } + for( uint32_t thread = m_Config.numExternalTaskThreads + 1; thread < m_NumThreads; ++thread ) { - m_pThreadArgStore[thread].threadNum = thread; - m_pThreadArgStore[thread].pTaskScheduler = this; - m_pThreadArgStore[thread].threadState = THREAD_STATE_RUNNING; - m_pThreads[thread] = new std::thread( TaskingThreadFunction, m_pThreadArgStore[thread] ); - ++m_NumThreadsRunning; + m_pThreadDataStore[thread].threadNum = thread; + m_pThreadDataStore[thread].threadState = THREAD_STATE_RUNNING; + m_pThreads[thread] = new std::thread( TaskingThreadFunction, ThreadArgs{ thread, this } ); + ++m_NumInternalTaskThreadsRunning; } + // Thread 0 is intialize thread and registered + m_pThreadDataStore[0].threadState = THREAD_STATE_EXTERNAL_REGISTERED; + // ensure we have sufficient tasks to equally fill either all threads including main // or just the threads we've launched, this is outside the firstinit as we want to be able // to runtime change it @@ -239,22 +287,24 @@ void TaskScheduler::StopThreads( bool bWait_ ) { // wait for them threads quit before deleting data m_bRunning = 0; - while( bWait_ && m_NumThreadsRunning > 1) + while( bWait_ && m_NumInternalTaskThreadsRunning ) { // keep firing event to ensure all threads pick up state of m_bRunning WakeThreadsForNewTasks(); } - for( uint32_t thread = 1; thread < m_NumThreads; ++thread ) + // detach threads starting with thread 1 (as 0 is initialization thread). 
+ for( uint32_t thread = m_Config.numExternalTaskThreads + 1; thread < m_NumThreads; ++thread ) { + assert( m_pThreads[thread] ); m_pThreads[thread]->detach(); delete m_pThreads[thread]; } m_NumThreads = 0; - delete[] m_pThreadArgStore; + delete[] m_pThreadDataStore; delete[] m_pThreads; - m_pThreadArgStore = 0; + m_pThreadDataStore = 0; m_pThreads = 0; SemaphoreDelete( m_pNewTaskSemaphore ); @@ -265,7 +315,8 @@ void TaskScheduler::StopThreads( bool bWait_ ) m_bHaveThreads = false; m_NumThreadsWaitingForNewTasks = 0; m_NumThreadsWaitingForTaskCompletion = 0; - m_NumThreadsRunning = 0; + m_NumInternalTaskThreadsRunning = 0; + m_NumExternalTaskThreadsRegistered = 0; for( int priority = 0; priority < TASK_PRIORITY_NUM; ++priority ) { @@ -375,12 +426,13 @@ void TaskScheduler::WaitForNewTasks( uint32_t threadNum ) bool bHaveTasks = HaveTasks( threadNum ); if( !bHaveTasks ) { - SafeCallback( m_ProfilerCallbacks.waitForNewTaskSuspendStart, threadNum ); - m_pThreadArgStore[threadNum].threadState = THREAD_STATE_WAIT_NEW_TASKS; + SafeCallback( m_Config.profilerCallbacks.waitForNewTaskSuspendStart, threadNum ); + ThreadState prevThreadState = m_pThreadDataStore[threadNum].threadState; + m_pThreadDataStore[threadNum].threadState = THREAD_STATE_WAIT_NEW_TASKS; m_NumThreadsWaitingForNewTasks.fetch_add( 1, std::memory_order_acquire ); SemaphoreWait( *m_pNewTaskSemaphore ); - m_pThreadArgStore[threadNum].threadState = THREAD_STATE_RUNNING; - SafeCallback( m_ProfilerCallbacks.waitForNewTaskSuspendStop, threadNum ); + m_pThreadDataStore[threadNum].threadState = prevThreadState; + SafeCallback( m_Config.profilerCallbacks.waitForNewTaskSuspendStop, threadNum ); } } @@ -395,16 +447,17 @@ void TaskScheduler::WaitForTaskCompletion( const ICompletable* pCompletable_, ui } else { - SafeCallback( m_ProfilerCallbacks.waitForTaskCompleteSuspendStart, threadNum_ ); - m_pThreadArgStore[threadNum_].threadState = THREAD_STATE_WAIT_TASK_COMPLETION; + SafeCallback( m_Config.profilerCallbacks.waitForTaskCompleteSuspendStart, threadNum_ ); + ThreadState prevThreadState = m_pThreadDataStore[threadNum_].threadState; + m_pThreadDataStore[threadNum_].threadState = THREAD_STATE_WAIT_TASK_COMPLETION; SemaphoreWait( *m_pTaskCompleteSemaphore ); if( !pCompletable_->GetIsComplete() ) { // This thread which may not the one which was supposed to be awoken WakeThreadsForTaskCompletion(); } - m_pThreadArgStore[threadNum_].threadState = THREAD_STATE_RUNNING; - SafeCallback( m_ProfilerCallbacks.waitForTaskCompleteSuspendStop, threadNum_ ); + m_pThreadDataStore[threadNum_].threadState = prevThreadState; + SafeCallback( m_Config.profilerCallbacks.waitForTaskCompleteSuspendStop, threadNum_ ); } pCompletable_->m_WaitingForTaskCount.fetch_sub( 1, std::memory_order_release ); @@ -473,9 +526,25 @@ void TaskScheduler::SplitAndAddTask( uint32_t threadNum_, SubTaskSet subTask_, u WakeThreadsForNewTasks(); } +ENKITS_API TaskSchedulerConfig enki::TaskScheduler::GetConfig() const +{ + TaskSchedulerConfig config; + if( m_bHaveThreads ) + { + config.numTaskThreadsToCreate = m_NumThreads; + config.numExternalTaskThreads = 0; + } + return config; +} + void TaskScheduler::AddTaskSetToPipe( ITaskSet* pTaskSet_ ) { assert( pTaskSet_->m_RunningCount == 0 ); + uint32_t threadNum = gtl_threadNum; + + ThreadState prevThreadState = m_pThreadDataStore[threadNum].threadState; + m_pThreadDataStore[threadNum].threadState = THREAD_STATE_RUNNING; + pTaskSet_->m_RunningCount.store( 0, std::memory_order_relaxed ); // divide task up and add to pipe @@ -489,7 +558,10 @@ 
void TaskScheduler::AddTaskSetToPipe( ITaskSet* pTaskSet_ ) subTask.pTask = pTaskSet_; subTask.partition.start = 0; subTask.partition.end = pTaskSet_->m_SetSize; - SplitAndAddTask( gtl_threadNum, subTask, rangeToSplit ); + SplitAndAddTask( threadNum, subTask, rangeToSplit ); + + m_pThreadDataStore[threadNum].threadState = prevThreadState; + } void TaskScheduler::AddPinnedTask( IPinnedTask* pTask_ ) @@ -504,10 +576,13 @@ void TaskScheduler::AddPinnedTask( IPinnedTask* pTask_ ) void TaskScheduler::RunPinnedTasks() { uint32_t threadNum = gtl_threadNum; + ThreadState prevThreadState = m_pThreadDataStore[threadNum].threadState; + m_pThreadDataStore[threadNum].threadState = THREAD_STATE_RUNNING; for( int priority = 0; priority < TASK_PRIORITY_NUM; ++priority ) { RunPinnedTasks( threadNum, priority ); } + m_pThreadDataStore[threadNum].threadState = prevThreadState; } void TaskScheduler::RunPinnedTasks( uint32_t threadNum_, uint32_t priority_ ) @@ -533,9 +608,14 @@ void TaskScheduler::WaitforTask( const ICompletable* pCompletable_, enki::Tas uint32_t threadNum = gtl_threadNum; uint32_t hintPipeToCheck_io = threadNum + 1; // does not need to be clamped. + // waiting for a task is equivalent to 'running' + ThreadState prevThreadState = m_pThreadDataStore[threadNum].threadState; + m_pThreadDataStore[threadNum].threadState = THREAD_STATE_RUNNING; + + if( pCompletable_ && !pCompletable_->GetIsComplete() ) { - SafeCallback( m_ProfilerCallbacks.waitForTaskCompleteStart, threadNum ); + SafeCallback( m_Config.profilerCallbacks.waitForTaskCompleteStart, threadNum ); // We need to ensure that the task we're waiting on can complete even if we're the only thread, // so we clamp the priorityOfLowestToRun_ to no smaller than the task we're waiting for priorityOfLowestToRun_ = std::max( priorityOfLowestToRun_, pCompletable_->m_Priority ); @@ -563,7 +643,7 @@ void TaskScheduler::WaitforTask( const ICompletable* pCompletable_, enki::Tas } } - SafeCallback( m_ProfilerCallbacks.waitForTaskCompleteStop, threadNum ); + SafeCallback( m_Config.profilerCallbacks.waitForTaskCompleteStop, threadNum ); } else { @@ -575,6 +655,9 @@ void TaskScheduler::WaitforTask( const ICompletable* pCompletable_, enki::Tas } } } + + m_pThreadDataStore[threadNum].threadState = prevThreadState; + } class TaskSchedulerWaitTask : public IPinnedTask @@ -590,11 +673,11 @@ void TaskScheduler::WaitforAll() bool bHaveTasks = true; uint32_t threadNum = gtl_threadNum; uint32_t hintPipeToCheck_io = threadNum + 1; // does not need to be clamped. 
- int32_t numThreadsRunning = m_NumThreadsRunning.load( std::memory_order_relaxed ) - 1; // account for this thread + int32_t numOtherThreadsRunning = 0; // account for this thread uint32_t spinCount = 0; TaskSchedulerWaitTask dummyWaitTask; dummyWaitTask.threadNum = 0; - while( bHaveTasks || m_NumThreadsWaitingForNewTasks.load( std::memory_order_relaxed ) < numThreadsRunning ) + while( bHaveTasks || numOtherThreadsRunning ) { bHaveTasks = TryRunTask( threadNum, hintPipeToCheck_io ); if( bHaveTasks ) @@ -613,7 +696,7 @@ void TaskScheduler::WaitforAll() { dummyWaitTask.threadNum = ( dummyWaitTask.threadNum + 1 ) % m_NumThreads; } - } while( countThreadsToCheck && m_pThreadArgStore[ dummyWaitTask.threadNum ].threadState != THREAD_STATE_RUNNING ); + } while( countThreadsToCheck && m_pThreadDataStore[ dummyWaitTask.threadNum ].threadState != THREAD_STATE_RUNNING ); assert( dummyWaitTask.threadNum != threadNum ); AddPinnedTask( &dummyWaitTask ); WaitForTaskCompletion( &dummyWaitTask, threadNum ); @@ -624,6 +707,28 @@ void TaskScheduler::WaitforAll() uint32_t spinBackoffCount = spinCount * SPIN_BACKOFF_MULTIPLIER; SpinWait( spinBackoffCount ); } + + // count threads running + numOtherThreadsRunning = 0; + for(uint32_t thread = 0; thread < m_NumThreads; ++thread ) + { + // ignore our thread + if( thread != threadNum ) + { + switch( m_pThreadDataStore[thread].threadState ) + { + case THREAD_STATE_RUNNING: + case THREAD_STATE_WAIT_TASK_COMPLETION: + ++numOtherThreadsRunning; + break; + case THREAD_STATE_EXTERNAL_REGISTERED: + case THREAD_STATE_EXTERNAL_UNREGISTERED: + case THREAD_STATE_WAIT_NEW_TASKS: + case THREAD_STATE_STOPPED: + break; + }; + } + } } } @@ -652,15 +757,15 @@ TaskScheduler::TaskScheduler() : m_pPipesPerThread() , m_pPinnedTaskListPerThread() , m_NumThreads(0) - , m_pThreadArgStore(NULL) + , m_pThreadDataStore(NULL) , m_pThreads(NULL) , m_bRunning(0) - , m_NumThreadsRunning(0) + , m_NumInternalTaskThreadsRunning(0) , m_NumThreadsWaitingForNewTasks(0) , m_NumThreadsWaitingForTaskCompletion(0) , m_NumPartitions(0) , m_bHaveThreads(false) - , m_ProfilerCallbacks() + , m_NumExternalTaskThreadsRegistered(0) { } @@ -669,23 +774,26 @@ TaskScheduler::~TaskScheduler() StopThreads( true ); // Stops threads, waiting for them. } -void TaskScheduler::Initialize( uint32_t numThreads_ ) +void TaskScheduler::Initialize( uint32_t numThreadsTotal_ ) { - assert( numThreads_ ); + assert( numThreadsTotal_ >= 1 ); + m_Config.numTaskThreadsToCreate = numThreadsTotal_ - 1; + m_Config.numExternalTaskThreads = 0; StopThreads( true ); // Stops threads, waiting for them. + StartThreads();} - m_NumThreads = numThreads_; - +ENKITS_API void enki::TaskScheduler::Initialize( TaskSchedulerConfig config_ ) +{ + m_Config = config_; + StopThreads( true ); // Stops threads, waiting for them. 
StartThreads(); } -void TaskScheduler::Initialize() +void TaskScheduler::Initialize() { Initialize( std::thread::hardware_concurrency() ); } - - // Semaphore implementation #ifdef _WIN32 diff --git a/src/TaskScheduler.h b/src/TaskScheduler.h index e32dca03..0d6d314c 100644 --- a/src/TaskScheduler.h +++ b/src/TaskScheduler.h @@ -57,9 +57,12 @@ namespace enki class TaskPipe; class PinnedTaskList; struct ThreadArgs; + struct ThreadDataStore; struct SubTaskSet; struct semaphoreid_t; + uint32_t GetNumHardwareThreads(); + enum TaskPriority { @@ -194,25 +197,46 @@ namespace enki ProfilerCallbackFunc waitForTaskCompleteSuspendStop; // thread unsuspended }; + // TaskSchedulerConfig - configuration struct for advanced Initialize + struct TaskSchedulerConfig + { + // numTaskThreadsToCreate - Number of tasking threads the task scheduler will create. Must be > 0. + // Defaults to GetNumHardwareThreads()-1 threads as thread which calls initialize is thread 0. + uint32_t numTaskThreadsToCreate = GetNumHardwareThreads()-1; + + // numExternalTaskThreads - Advanced use. Number of external threads which need to use TaskScheduler API. + // See TaskScheduler::RegisterExternalTaskThread() for usage. + // Defaults to 0, the thread used to initialize the TaskScheduler. + uint32_t numExternalTaskThreads = 0; + + ProfilerCallbacks profilerCallbacks = {}; + }; + class TaskScheduler { public: ENKITS_API TaskScheduler(); ENKITS_API ~TaskScheduler(); - // Call either Initialize() or Initialize( numThreads_ ) before adding tasks. + // Call an Initialize function before adding tasks. - // Initialize() will create GetNumHardwareThreads()-1 threads, which is + // Initialize() will create GetNumHardwareThreads()-1 tasking threads, which is // sufficient to fill the system when including the main thread. // Initialize can be called multiple times - it will wait for completion // before re-initializing. ENKITS_API void Initialize(); - // Initialize( numThreads_ ) - numThreads_ (must be > 0) - // will create numThreads_-1 threads, as thread 0 is + // Initialize( numThreadsTotal_ ) + // will create numThreadsTotal_-1 threads, as thread 0 is // the thread on which the initialize was called. - ENKITS_API void Initialize( uint32_t numThreads_ ); + // numThreadsTotal_ must be > 0 + ENKITS_API void Initialize( uint32_t numThreadsTotal_ ); + + // Initialize with advanced TaskSchedulerConfig settings. See TaskSchedulerConfig. + ENKITS_API void Initialize( TaskSchedulerConfig config_ ); + // Get config. Can be called before Initialize to get the defaults. + ENKITS_API TaskSchedulerConfig GetConfig() const; // Adds the TaskSet to pipe and returns if the pipe is not full. // If the pipe is full, pTaskSet is run. @@ -235,7 +259,7 @@ namespace enki // Only wait for child tasks of the current task otherwise a deadlock could occur. ENKITS_API void WaitforTask( const ICompletable* pCompletable_, enki::TaskPriority priorityOfLowestToRun_ = TaskPriority(TASK_PRIORITY_NUM - 1) ); - // WaitforTaskSet, deprecated interface use WaitforTask + // DEPRECATED - WaitforTaskSet, deprecated interface use WaitforTask inline void WaitforTaskSet( const ICompletable* pCompletable_ ) { WaitforTask( pCompletable_ ); } // Waits for all task sets to complete - not guaranteed to work unless we know we @@ -247,18 +271,38 @@ namespace enki // This function can be safely called even if TaskScheduler::Initialize() has not been called. 
        ENKITS_API void            WaitforAllAndShutdown();
 
-        // Returns the number of threads created for running tasks + 1
-        // to account for the main thread.
+        // Returns the number of threads created for running tasks + number of external threads
+        // plus 1 to account for the thread used to initialize the task scheduler.
+        // Equivalent to config values: numTaskThreadsToCreate + numExternalTaskThreads + 1.
+        // It is guaranteed that GetThreadNum() < GetNumTaskThreads()
        ENKITS_API uint32_t        GetNumTaskThreads() const;
 
        // Returns the current task threadNum
-        // Will return 0 for main thread and all other non-enkiTS threads, and < GetNumTaskThreads()
+        // Will return 0 for the thread which initialized the task scheduler,
+        // and for all other non-enkiTS threads which have not been registered ( see RegisterExternalTaskThread() ),
+        // and < GetNumTaskThreads() for all threads.
+        // It is guaranteed that GetThreadNum() < GetNumTaskThreads()
        ENKITS_API uint32_t        GetThreadNum() const;
 
+        // DEPRECATED - GetProfilerCallbacks. Use TaskSchedulerConfig to initialize.
        // Returns the ProfilerCallbacks structure so that it can be modified to
-        // set the callbacks.
+        // set the callbacks. Should be set prior to initialization.
        ENKITS_API ProfilerCallbacks* GetProfilerCallbacks();
 
+        // Call on a thread to register the thread to use the TaskScheduling API.
+        // This is implicitly done for the thread which initializes the TaskScheduler.
+        // Intended for developers who have threads which need to call the TaskScheduler API.
+        // Returns true if successful, false if not.
+        // Can only have numExternalTaskThreads registered at any one time, which must be set
+        // at initialization time.
+        ENKITS_API bool            RegisterExternalTaskThread();
+
+        // Call on a thread on which RegisterExternalTaskThread has been called to deregister that thread.
+        ENKITS_API void            DeRegisterExternalTaskThread();
+
+        // Get the number of registered external task threads.
+        ENKITS_API uint32_t        GetNumRegisteredExternalTaskThreads();
+
    private:
        static void TaskingThreadFunction( const ThreadArgs& args_ );
        bool        HaveTasks( uint32_t threadNum_ );
@@ -277,10 +321,10 @@ namespace enki
        PinnedTaskList*  m_pPinnedTaskListPerThread[ TASK_PRIORITY_NUM ];
 
        uint32_t             m_NumThreads;
-        ThreadArgs*          m_pThreadArgStore;
+        ThreadDataStore*     m_pThreadDataStore;
        std::thread**        m_pThreads;
        std::atomic<int32_t> m_bRunning;
-        std::atomic<int32_t> m_NumThreadsRunning;
+        std::atomic<int32_t> m_NumInternalTaskThreadsRunning;
        std::atomic<int32_t> m_NumThreadsWaitingForNewTasks;
        std::atomic<int32_t> m_NumThreadsWaitingForTaskCompletion;
        uint32_t             m_NumPartitions;
@@ -288,7 +332,8 @@ namespace enki
        semaphoreid_t*       m_pTaskCompleteSemaphore;
        uint32_t             m_NumInitialPartitions;
        bool                 m_bHaveThreads;
-        ProfilerCallbacks    m_ProfilerCallbacks;
+        TaskSchedulerConfig  m_Config;
+        std::atomic<int32_t> m_NumExternalTaskThreadsRegistered;
 
        TaskScheduler( const TaskScheduler& nocopy );
        TaskScheduler& operator=( const TaskScheduler& nocopy );

From 5d732461716cc6d611a9d1c18a60f370ab70313f Mon Sep 17 00:00:00 2001
From: Doug Binks
Date: Thu, 24 Oct 2019 11:04:33 +0100
Subject: [PATCH 02/19] Removed threadNum from ThreadDataStore as not required.
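For reference, a minimal usage sketch of the external-thread registration API declared in PATCH 01
(illustration only, not part of this patch; the complete example is added later in this series as
example/ExternalTaskThread.cpp):

```C++
#include "TaskScheduler.h"
#include <thread>

enki::TaskScheduler g_TS;

void ExternalThreadFunction()
{
    // Returns false if all numExternalTaskThreads registration slots are already in use.
    if( g_TS.RegisterExternalTaskThread() )
    {
        // This thread can now use the TaskScheduler API, e.g. AddTaskSetToPipe / WaitforTask.
        g_TS.DeRegisterExternalTaskThread();
    }
}

int main()
{
    enki::TaskSchedulerConfig config;
    config.numExternalTaskThreads = 1; // reserve one registration slot for an external thread
    g_TS.Initialize( config );

    std::thread externalThread( ExternalThreadFunction );
    externalThread.join();
    return 0;
}
```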
---
 src/TaskScheduler.cpp | 3 ---
 1 file changed, 3 deletions(-)

diff --git a/src/TaskScheduler.cpp b/src/TaskScheduler.cpp
index d41e5d65..951d8e09 100644
--- a/src/TaskScheduler.cpp
+++ b/src/TaskScheduler.cpp
@@ -80,7 +80,6 @@ namespace enki
    struct ThreadDataStore
    {
-        uint32_t                 threadNum;
        std::atomic<ThreadState> threadState;
    };
 
@@ -245,13 +244,11 @@ void TaskScheduler::StartThreads()
    for( uint32_t thread = 0; thread < m_Config.numExternalTaskThreads + 1; ++thread )
    {
-        m_pThreadDataStore[thread].threadNum = thread;
        m_pThreadDataStore[thread].threadState = THREAD_STATE_EXTERNAL_UNREGISTERED;
        m_pThreads[thread] = nullptr;
    }
    for( uint32_t thread = m_Config.numExternalTaskThreads + 1; thread < m_NumThreads; ++thread )
    {
-        m_pThreadDataStore[thread].threadNum = thread;
        m_pThreadDataStore[thread].threadState = THREAD_STATE_RUNNING;
        m_pThreads[thread] = new std::thread( TaskingThreadFunction, ThreadArgs{ thread, this } );
        ++m_NumInternalTaskThreadsRunning;

From 2727f30662abe868cf5e4ff64c55da0578b039bf Mon Sep 17 00:00:00 2001
From: Doug Binks
Date: Thu, 24 Oct 2019 11:04:55 +0100
Subject: [PATCH 03/19] Prevent false sharing for threadstate.

---
 src/TaskScheduler.cpp | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/src/TaskScheduler.cpp b/src/TaskScheduler.cpp
index 951d8e09..7132ae31 100644
--- a/src/TaskScheduler.cpp
+++ b/src/TaskScheduler.cpp
@@ -36,6 +36,7 @@ static const uint32_t PIPESIZE_LOG2 = 8;
 static const uint32_t SPIN_COUNT = 10;
 static const uint32_t SPIN_BACKOFF_MULTIPLIER = 100;
 static const uint32_t MAX_NUM_INITIAL_PARTITIONS = 8;
+static const uint32_t CACHE_LINE_SIZE = 64; // awaiting std::hardware_constructive_interference_size
 
 // thread_local not well supported yet by C++11 compilers.
 #ifdef _MSC_VER
@@ -81,7 +82,9 @@ namespace enki
    struct ThreadDataStore
    {
        std::atomic<ThreadState> threadState;
+        char prevent_false_Share[ CACHE_LINE_SIZE - sizeof(std::atomic<ThreadState>) ];
    };
+    static_assert( sizeof( ThreadDataStore ) >= CACHE_LINE_SIZE, "ThreadDataStore may exhibit false sharing" );
 
    class PinnedTaskList : public LocklessMultiWriteIntrusiveList<IPinnedTask> {};

From 9552470139905a74ec11747e0dc5ad0e86fd2372 Mon Sep 17 00:00:00 2001
From: Doug Binks
Date: Thu, 24 Oct 2019 11:05:21 +0100
Subject: [PATCH 04/19] Set std::memory_order on thread state operations.
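This patch replaces the plain assignments to ThreadDataStore::threadState with explicit atomic
stores and loads. A minimal sketch of the release/acquire pairing the changes rely on
(illustration only, not part of the patch; the names below are placeholders):

```C++
#include <atomic>
#include <cstdint>

std::atomic<int32_t> threadState{ 0 };  // stands in for ThreadDataStore::threadState
int32_t              taskData = 0;      // stands in for work published before a state change

void Worker()                           // e.g. a task thread changing its state
{
    taskData = 42;                                      // written before the release store
    threadState.store( 1, std::memory_order_release );  // publishes taskData
}

void Observer()                         // e.g. WaitforAll() scanning thread states
{
    if( threadState.load( std::memory_order_acquire ) == 1 )
    {
        // acquire pairs with the release above: taskData is guaranteed to read 42 here
    }
}
```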
--- src/TaskScheduler.cpp | 43 ++++++++++++++++++++++++------------------- 1 file changed, 24 insertions(+), 19 deletions(-) diff --git a/src/TaskScheduler.cpp b/src/TaskScheduler.cpp index 7132ae31..f568e663 100644 --- a/src/TaskScheduler.cpp +++ b/src/TaskScheduler.cpp @@ -171,7 +171,7 @@ ENKITS_API void enki::TaskScheduler::DeRegisterExternalTaskThread() { assert( gtl_threadNum ); --m_NumExternalTaskThreadsRegistered; - m_pThreadDataStore[gtl_threadNum].threadState = THREAD_STATE_EXTERNAL_UNREGISTERED; + m_pThreadDataStore[gtl_threadNum].threadState.store( THREAD_STATE_EXTERNAL_UNREGISTERED, std::memory_order_release ); gtl_threadNum = 0; } @@ -216,7 +216,7 @@ void TaskScheduler::TaskingThreadFunction( const ThreadArgs& args_ ) pTS->m_NumInternalTaskThreadsRunning.fetch_sub( 1, std::memory_order_release ); SafeCallback( pTS->m_Config.profilerCallbacks.threadStop, threadNum ); - pTS->m_pThreadDataStore[threadNum].threadState = THREAD_STATE_STOPPED; + pTS->m_pThreadDataStore[threadNum].threadState.store( THREAD_STATE_STOPPED, std::memory_order_release ); return; } @@ -427,11 +427,11 @@ void TaskScheduler::WaitForNewTasks( uint32_t threadNum ) if( !bHaveTasks ) { SafeCallback( m_Config.profilerCallbacks.waitForNewTaskSuspendStart, threadNum ); - ThreadState prevThreadState = m_pThreadDataStore[threadNum].threadState; - m_pThreadDataStore[threadNum].threadState = THREAD_STATE_WAIT_NEW_TASKS; + ThreadState prevThreadState = m_pThreadDataStore[threadNum].threadState.load( std::memory_order_relaxed ); + m_pThreadDataStore[threadNum].threadState.store( THREAD_STATE_WAIT_NEW_TASKS, std::memory_order_relaxed ); // rely on fetch_add acquire for order m_NumThreadsWaitingForNewTasks.fetch_add( 1, std::memory_order_acquire ); SemaphoreWait( *m_pNewTaskSemaphore ); - m_pThreadDataStore[threadNum].threadState = prevThreadState; + m_pThreadDataStore[threadNum].threadState.store( prevThreadState, std::memory_order_release ); SafeCallback( m_Config.profilerCallbacks.waitForNewTaskSuspendStop, threadNum ); } } @@ -448,15 +448,17 @@ void TaskScheduler::WaitForTaskCompletion( const ICompletable* pCompletable_, ui else { SafeCallback( m_Config.profilerCallbacks.waitForTaskCompleteSuspendStart, threadNum_ ); - ThreadState prevThreadState = m_pThreadDataStore[threadNum_].threadState; - m_pThreadDataStore[threadNum_].threadState = THREAD_STATE_WAIT_TASK_COMPLETION; + ThreadState prevThreadState = m_pThreadDataStore[threadNum_].threadState.load( std::memory_order_relaxed ); + m_pThreadDataStore[threadNum_].threadState.store( THREAD_STATE_WAIT_TASK_COMPLETION, std::memory_order_relaxed ); + std::atomic_thread_fence(std::memory_order_acquire); + SemaphoreWait( *m_pTaskCompleteSemaphore ); if( !pCompletable_->GetIsComplete() ) { // This thread which may not the one which was supposed to be awoken WakeThreadsForTaskCompletion(); } - m_pThreadDataStore[threadNum_].threadState = prevThreadState; + m_pThreadDataStore[threadNum_].threadState.store( prevThreadState, std::memory_order_release ); SafeCallback( m_Config.profilerCallbacks.waitForTaskCompleteSuspendStop, threadNum_ ); } @@ -542,10 +544,11 @@ void TaskScheduler::AddTaskSetToPipe( ITaskSet* pTaskSet_ ) assert( pTaskSet_->m_RunningCount == 0 ); uint32_t threadNum = gtl_threadNum; - ThreadState prevThreadState = m_pThreadDataStore[threadNum].threadState; - m_pThreadDataStore[threadNum].threadState = THREAD_STATE_RUNNING; - + ThreadState prevThreadState = m_pThreadDataStore[threadNum].threadState.load( std::memory_order_relaxed ); + 
m_pThreadDataStore[threadNum].threadState.store( THREAD_STATE_RUNNING, std::memory_order_relaxed ); pTaskSet_->m_RunningCount.store( 0, std::memory_order_relaxed ); + std::atomic_thread_fence(std::memory_order_acquire); + // divide task up and add to pipe pTaskSet_->m_RangeToRun = pTaskSet_->m_SetSize / m_NumPartitions; @@ -560,7 +563,7 @@ void TaskScheduler::AddTaskSetToPipe( ITaskSet* pTaskSet_ ) subTask.partition.end = pTaskSet_->m_SetSize; SplitAndAddTask( threadNum, subTask, rangeToSplit ); - m_pThreadDataStore[threadNum].threadState = prevThreadState; + m_pThreadDataStore[threadNum].threadState.store( prevThreadState, std::memory_order_release ); } @@ -576,13 +579,14 @@ void TaskScheduler::AddPinnedTask( IPinnedTask* pTask_ ) void TaskScheduler::RunPinnedTasks() { uint32_t threadNum = gtl_threadNum; - ThreadState prevThreadState = m_pThreadDataStore[threadNum].threadState; - m_pThreadDataStore[threadNum].threadState = THREAD_STATE_RUNNING; + ThreadState prevThreadState = m_pThreadDataStore[threadNum].threadState.load( std::memory_order_relaxed ); + m_pThreadDataStore[threadNum].threadState.store( THREAD_STATE_RUNNING, std::memory_order_relaxed ); + std::atomic_thread_fence(std::memory_order_acquire); for( int priority = 0; priority < TASK_PRIORITY_NUM; ++priority ) { RunPinnedTasks( threadNum, priority ); } - m_pThreadDataStore[threadNum].threadState = prevThreadState; + m_pThreadDataStore[threadNum].threadState.store( prevThreadState, std::memory_order_release ); } void TaskScheduler::RunPinnedTasks( uint32_t threadNum_, uint32_t priority_ ) @@ -608,9 +612,10 @@ void TaskScheduler::WaitforTask( const ICompletable* pCompletable_, enki::Tas uint32_t threadNum = gtl_threadNum; uint32_t hintPipeToCheck_io = threadNum + 1; // does not need to be clamped. 
- // waiting for a task is equivalent to 'running' - ThreadState prevThreadState = m_pThreadDataStore[threadNum].threadState; - m_pThreadDataStore[threadNum].threadState = THREAD_STATE_RUNNING; + // waiting for a task is equivalent to 'running' for thread state purpose as we may run tasks whilst waiting + ThreadState prevThreadState = m_pThreadDataStore[threadNum].threadState.load( std::memory_order_relaxed ); + m_pThreadDataStore[threadNum].threadState.store( THREAD_STATE_RUNNING, std::memory_order_relaxed ); + std::atomic_thread_fence(std::memory_order_acquire); if( pCompletable_ && !pCompletable_->GetIsComplete() ) @@ -656,7 +661,7 @@ void TaskScheduler::WaitforTask( const ICompletable* pCompletable_, enki::Tas } } - m_pThreadDataStore[threadNum].threadState = prevThreadState; + m_pThreadDataStore[threadNum].threadState.store( prevThreadState, std::memory_order_release ); } From 87f48ee68f8a3f874e0cdb725d415522ee3f6de0 Mon Sep 17 00:00:00 2001 From: Doug Binks Date: Thu, 24 Oct 2019 11:32:11 +0100 Subject: [PATCH 05/19] Compile fix for gcc --- src/TaskScheduler.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/TaskScheduler.cpp b/src/TaskScheduler.cpp index f568e663..7000a6d8 100644 --- a/src/TaskScheduler.cpp +++ b/src/TaskScheduler.cpp @@ -720,7 +720,7 @@ void TaskScheduler::WaitforAll() // ignore our thread if( thread != threadNum ) { - switch( m_pThreadDataStore[thread].threadState ) + switch( m_pThreadDataStore[thread].threadState.load( std::memory_order_acquire ) ) { case THREAD_STATE_RUNNING: case THREAD_STATE_WAIT_TASK_COMPLETION: From dfb90bf8a0ea0c6a644968e6bfe5abd5d99abeeb Mon Sep 17 00:00:00 2001 From: Doug Binks Date: Thu, 24 Oct 2019 14:15:27 +0100 Subject: [PATCH 06/19] Added example ExternalTaskThread --- CMakeLists.txt | 3 ++ example/ExternalTaskThread.cpp | 91 ++++++++++++++++++++++++++++++++++ 2 files changed, 94 insertions(+) create mode 100644 example/ExternalTaskThread.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 5200bf33..438d30c5 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -95,6 +95,9 @@ if( ENKITS_BUILD_EXAMPLES ) add_executable( TestWaitforTask example/TestWaitforTask.cpp ) target_link_libraries(TestWaitforTask enkiTS ) + add_executable( ExternalTaskThread example/ExternalTaskThread.cpp ) + target_link_libraries(ExternalTaskThread enkiTS ) + if( ENKITS_BUILD_C_INTERFACE ) add_executable( ParallelSum_c example/ParallelSum_c.c ) target_link_libraries(ParallelSum_c enkiTS ) diff --git a/example/ExternalTaskThread.cpp b/example/ExternalTaskThread.cpp new file mode 100644 index 00000000..35d8fd44 --- /dev/null +++ b/example/ExternalTaskThread.cpp @@ -0,0 +1,91 @@ +// Copyright (c) 2013 Doug Binks +// +// This software is provided 'as-is', without any express or implied +// warranty. In no event will the authors be held liable for any damages +// arising from the use of this software. +// +// Permission is granted to anyone to use this software for any purpose, +// including commercial applications, and to alter it and redistribute it +// freely, subject to the following restrictions: +// +// 1. The origin of this software must not be misrepresented; you must not +// claim that you wrote the original software. If you use this software +// in a product, an acknowledgement in the product documentation would be +// appreciated but is not required. +// 2. Altered source versions must be plainly marked as such, and must not be +// misrepresented as being the original software. +// 3. 
This notice may not be removed or altered from any source distribution. + +#include "TaskScheduler.h" + +#include +#include + +using namespace enki; + +TaskScheduler g_TS; +static std::atomic g_Run; + + +struct ParallelTaskSet : ITaskSet +{ + ParallelTaskSet() { m_SetSize = 100; } + + virtual void ExecuteRange( TaskSetPartition range, uint32_t threadnum ) + { + printf(" Run %d: This could run on any thread, currently thread %d\n", g_Run.load(), threadnum); + + // sleep used as a 'pretend' workload + std::chrono::milliseconds sleepTime( range.end - range.start ); + std::this_thread::sleep_for( sleepTime ); + } +}; + +// Example thread function +// May want to use threads for blocking IO, during which enkiTS task threads can do work +void threadFunction() +{ + bool bRegistered = g_TS.RegisterExternalTaskThread(); + assert( bRegistered ); + if( bRegistered ) + { + // sleep for a while instead of doing something such as file IO + std::this_thread::sleep_for( std::chrono::milliseconds( 100 ) ); + + + ParallelTaskSet task; + g_TS.AddTaskSetToPipe( &task ); + g_TS.WaitforTask( &task); + g_TS.DeRegisterExternalTaskThread(); + } +} + +static const int REPEATS = 5; +static const uint32_t NUMEXTERNALTHREADS = 5; + +int main(int argc, const char * argv[]) +{ + enki::TaskSchedulerConfig config; + config.numExternalTaskThreads = NUMEXTERNALTHREADS; + + std::thread threads[NUMEXTERNALTHREADS]; + g_TS.Initialize( config ); + + + for( g_Run = 0; g_Run< REPEATS; ++g_Run ) + { + printf("Run %d\n", g_Run.load() ); + + for( uint32_t iThread = 0; iThread < NUMEXTERNALTHREADS; ++iThread ) + { + threads[ iThread ] = std::thread( threadFunction ); + } + + for( uint32_t iThread = 0; iThread < NUMEXTERNALTHREADS; ++iThread ) + { + threads[ iThread ].join(); + } + } + + return 0; +} From eb25e81780eb45d80ba57419d491d11e75b735c3 Mon Sep 17 00:00:00 2001 From: Doug Binks Date: Thu, 24 Oct 2019 16:12:05 +0100 Subject: [PATCH 07/19] Capped partitioning to that calculated from maximum available hardware parallelism. --- src/TaskScheduler.cpp | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/TaskScheduler.cpp b/src/TaskScheduler.cpp index 7000a6d8..bb906505 100644 --- a/src/TaskScheduler.cpp +++ b/src/TaskScheduler.cpp @@ -270,8 +270,13 @@ void TaskScheduler::StartThreads() } else { - m_NumPartitions = m_NumThreads * (m_NumThreads - 1); - m_NumInitialPartitions = m_NumThreads - 1; + // There could be more threads than hardware threads if external threads are + // being intended for blocking functionality such as io etc. + // We only need to partition for a maximum of the available processor parallelism. + uint32_t numThreadsToPartitionFor = m_NumThreads; + numThreadsToPartitionFor = std::min( m_NumThreads, GetNumHardwareThreads() ); + m_NumPartitions = numThreadsToPartitionFor * (numThreadsToPartitionFor - 1); + m_NumInitialPartitions = numThreadsToPartitionFor - 1; if( m_NumInitialPartitions > MAX_NUM_INITIAL_PARTITIONS ) { m_NumInitialPartitions = MAX_NUM_INITIAL_PARTITIONS; From 74cf4b5e717da86480af8a075586acfd046316ec Mon Sep 17 00:00:00 2001 From: Doug Binks Date: Fri, 25 Oct 2019 10:52:33 +0100 Subject: [PATCH 08/19] Improved ExternalTaskThread utility as test for functionality. 
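Aside on the partition capping introduced in PATCH 07 above: a worked example of the resulting
numbers (illustrative figures only; the helper PartitionCountFor is hypothetical and not part of
the library):

```C++
#include <algorithm>
#include <cstdint>

// Mirrors the capped partition maths from StartThreads() in PATCH 07.
uint32_t PartitionCountFor( uint32_t numThreads_, uint32_t numHardwareThreads_ )
{
    uint32_t numThreadsToPartitionFor = std::min( numThreads_, numHardwareThreads_ );
    return numThreadsToPartitionFor * ( numThreadsToPartitionFor - 1 );
}

// Example: 7 task threads + 32 external threads + 1 initializing thread = 40 total threads,
// but only 8 hardware threads. PartitionCountFor( 40, 8 ) == 8 * 7 == 56, where the uncapped
// formula would have produced 40 * 39 == 1560 partitions.
// m_NumInitialPartitions becomes min( 8 - 1, MAX_NUM_INITIAL_PARTITIONS ) == 7.
```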
--- example/ExternalTaskThread.cpp | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/example/ExternalTaskThread.cpp b/example/ExternalTaskThread.cpp index 35d8fd44..c5312c2f 100644 --- a/example/ExternalTaskThread.cpp +++ b/example/ExternalTaskThread.cpp @@ -43,14 +43,14 @@ struct ParallelTaskSet : ITaskSet // Example thread function // May want to use threads for blocking IO, during which enkiTS task threads can do work -void threadFunction() +void threadFunction( uint32_t num_ ) { bool bRegistered = g_TS.RegisterExternalTaskThread(); assert( bRegistered ); if( bRegistered ) { // sleep for a while instead of doing something such as file IO - std::this_thread::sleep_for( std::chrono::milliseconds( 100 ) ); + std::this_thread::sleep_for( std::chrono::milliseconds( num_ * 100 ) ); ParallelTaskSet task; @@ -78,9 +78,13 @@ int main(int argc, const char * argv[]) for( uint32_t iThread = 0; iThread < NUMEXTERNALTHREADS; ++iThread ) { - threads[ iThread ] = std::thread( threadFunction ); + threads[ iThread ] = std::thread( threadFunction, iThread ); } + // check that out of order Deregister / Register works... + threads[ 0 ].join(); + threads[ 0 ] = std::thread( threadFunction, 0 ); + for( uint32_t iThread = 0; iThread < NUMEXTERNALTHREADS; ++iThread ) { threads[ iThread ].join(); From 4cd616e7739c259e24123e02561a7a004fe583c9 Mon Sep 17 00:00:00 2001 From: Doug Binks Date: Fri, 25 Oct 2019 11:02:20 +0100 Subject: [PATCH 09/19] Updated Readme for external threads --- README.md | 45 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/README.md b/README.md index af6646c1..cd08a67b 100644 --- a/README.md +++ b/README.md @@ -45,6 +45,7 @@ For cmake, on Windows / Mac OS X / Linux with cmake installed, open a prompt in 1. *Up-front Allocation friendly* - enkiTS is designed for zero allocations during scheduling. 1. *Can pin tasks to a given thread* - enkiTS can schedule a task which will only be run on the specified thread. 1. *Can set task priorities* - Up to 5 task priorities can be configured via define ENKITS_TASK_PRIORITIES_NUM (defaults to 3). Higher priority tasks are run before lower priority ones. +1. **NEW** *Can register external threads to use with enkiTS* - Can configure enkiTS with numExternalTaskThreads which can be registered to use with the enkiTS API. 
 ## Usage
 
@@ -163,6 +164,49 @@ int main(int argc, const char * argv[]) {
 }
 ```
 
+External thread usage in C++ (full example in example/ExternalTaskThread.cpp):
+```C++
+#include "TaskScheduler.h"
+
+enki::TaskScheduler g_TS;
+struct ParallelTaskSet : enki::ITaskSet
+{
+    virtual void ExecuteRange( enki::TaskSetPartition range, uint32_t threadnum )
+    {
+        // Do something
+    }
+};
+
+void threadFunction()
+{
+    bool bRegistered = g_TS.RegisterExternalTaskThread();
+    assert( bRegistered );
+    if( bRegistered )
+    {
+        // sleep for a while instead of doing something such as file IO
+        std::this_thread::sleep_for( std::chrono::milliseconds( 100 ) );
+
+        ParallelTaskSet task;
+        g_TS.AddTaskSetToPipe( &task );
+        g_TS.WaitforTask( &task );
+        g_TS.DeRegisterExternalTaskThread();
+    }
+}
+
+int main(int argc, const char * argv[])
+{
+    enki::TaskSchedulerConfig config;
+    config.numExternalTaskThreads = 1; // we have one extra external thread
+
+    g_TS.Initialize( config );
+
+    std::thread exampleThread( threadFunction );
+
+    exampleThread.join();
+
+    return 0;
+}
+```
 
 C usage:
 ```C
@@ -195,6 +239,7 @@ int main(int argc, const char * argv[]) {
 }
 ```
 
+
 ## Bindings
 
 - C# [EnkiTasks C#](https://github.com/nxrighthere/EnkiTasks-CSharp)

From 4b2d593e3291284da1e6d177b4100e666022611f Mon Sep 17 00:00:00 2001
From: Doug Binks
Date: Fri, 25 Oct 2019 11:29:10 +0100
Subject: [PATCH 10/19] Formatting changes.

---
 example/ExternalTaskThread.cpp |  6 +--
 example/ParallelSum.cpp        | 10 ++--
 example/PinnedTask.cpp         |  4 +-
 example/Priorities.cpp         |  8 +--
 example/TaskOverhead.cpp       |  8 +--
 example/TaskThroughput.cpp     | 10 ++--
 example/TestWaitforTask.cpp    |  7 +--
 src/TaskScheduler.cpp          | 34 ++++++------
 src/TaskScheduler.h            | 92 +++++++++++++++++-----------------
 src/TaskScheduler_c.cpp        |  4 +-
 10 files changed, 92 insertions(+), 91 deletions(-)

diff --git a/example/ExternalTaskThread.cpp b/example/ExternalTaskThread.cpp
index c5312c2f..45293cd9 100644
--- a/example/ExternalTaskThread.cpp
+++ b/example/ExternalTaskThread.cpp
@@ -31,12 +31,12 @@ struct ParallelTaskSet : ITaskSet
 {
     ParallelTaskSet() { m_SetSize = 100; }
 
-    virtual void ExecuteRange( TaskSetPartition range, uint32_t threadnum )
+    virtual void ExecuteRange( TaskSetPartition range_, uint32_t threadnum_ )
     {
-        printf(" Run %d: This could run on any thread, currently thread %d\n", g_Run.load(), threadnum);
+        printf(" Run %d: This could run on any thread, currently thread %d\n", g_Run.load(), threadnum_);
 
         // sleep used as a 'pretend' workload
-        std::chrono::milliseconds sleepTime( range.end - range.start );
+        std::chrono::milliseconds sleepTime( range_.end - range_.start );
         std::this_thread::sleep_for( sleepTime );
     }
 };
diff --git a/example/ParallelSum.cpp b/example/ParallelSum.cpp
index 0fcb23ba..19edccd1 100644
--- a/example/ParallelSum.cpp
+++ b/example/ParallelSum.cpp
@@ -60,15 +60,15 @@ struct ParallelSumTaskSet : ITaskSet
         memset( m_pPartialSums, 0, sizeof(Count)*m_NumPartialSums );
     }
 
-    virtual void ExecuteRange( TaskSetPartition range, uint32_t threadnum )
+    virtual void ExecuteRange( TaskSetPartition range_, uint32_t threadnum_ )
     {
         assert( m_pPartialSums && m_NumPartialSums );
-        uint64_t sum = m_pPartialSums[threadnum].count;
-        for( uint64_t i = range.start; i < range.end; ++i )
+        uint64_t sum = m_pPartialSums[threadnum_].count;
+        for( uint64_t i = range_.start; i < range_.end; ++i )
         {
             sum += i + 1;
         }
-        m_pPartialSums[threadnum].count = sum;
+        m_pPartialSums[threadnum_].count = sum;
     }
 };
 
@@ -83,7 +83,7 @@ struct ParallelReductionSumTaskSet : ITaskSet
         m_ParallelSumTaskSet.Init(
g_TS.GetNumTaskThreads() ); } - virtual void ExecuteRange( TaskSetPartition range, uint32_t threadnum ) + virtual void ExecuteRange( TaskSetPartition range_, uint32_t threadnum_ ) { g_TS.AddTaskSetToPipe( &m_ParallelSumTaskSet ); g_TS.WaitforTask( &m_ParallelSumTaskSet ); diff --git a/example/PinnedTask.cpp b/example/PinnedTask.cpp index f32d16c5..805a6177 100644 --- a/example/PinnedTask.cpp +++ b/example/PinnedTask.cpp @@ -39,11 +39,11 @@ struct PinnedTaskHelloWorld : IPinnedTask struct ParallelTaskSet : ITaskSet { PinnedTaskHelloWorld pinnedTask; - virtual void ExecuteRange( TaskSetPartition range, uint32_t threadnum ) + virtual void ExecuteRange( TaskSetPartition range_, uint32_t threadnum_ ) { g_TS.AddPinnedTask( &pinnedTask ); - printf("This could run on any thread, currently thread %d\n", threadnum); + printf("This could run on any thread, currently thread %d\n", threadnum_); g_TS.WaitforTask( &pinnedTask ); } diff --git a/example/Priorities.cpp b/example/Priorities.cpp index b7866783..2150b67c 100644 --- a/example/Priorities.cpp +++ b/example/Priorities.cpp @@ -27,24 +27,24 @@ struct ExampleTask : enki::ITaskSet { ExampleTask( uint32_t size_ ) { m_SetSize = size_; } - virtual void ExecuteRange( enki::TaskSetPartition range, uint32_t threadnum ) + virtual void ExecuteRange( enki::TaskSetPartition range_, uint32_t threadnum_ ) { if( m_Priority == enki::TASK_PRIORITY_LOW ) { // fake slow task with timer Timer timer; timer.Start(); - double tWaittime = (double)( range.end - range.start ) * 100.; + double tWaittime = (double)( range_.end - range_.start ) * 100.; while( timer.GetTimeMS() < tWaittime ) { } printf( "\tLOW PRIORITY TASK range complete: thread: %d, start: %d, end: %d\n", - threadnum, range.start, range.end ); + threadnum_, range_.start, range_.end ); } else { printf( "HIGH PRIORITY TASK range complete: thread: %d, start: %d, end: %d\n", - threadnum, range.start, range.end ); + threadnum_, range_.start, range_.end ); } } }; diff --git a/example/TaskOverhead.cpp b/example/TaskOverhead.cpp index fd19b077..8660b2e8 100644 --- a/example/TaskOverhead.cpp +++ b/example/TaskOverhead.cpp @@ -62,15 +62,15 @@ struct ParallelSumTaskSet : ITaskSet memset( m_pPartialSums, 0, sizeof(Count)*m_NumPartialSums ); } - virtual void ExecuteRange( TaskSetPartition range, uint32_t threadnum ) + virtual void ExecuteRange( TaskSetPartition range_, uint32_t threadnum_ ) { assert( m_pPartialSums && m_NumPartialSums ); - uint64_t sum = m_pPartialSums[threadnum].count; - for( uint64_t i = range.start; i < range.end; ++i ) + uint64_t sum = m_pPartialSums[threadnum_].count; + for( uint64_t i = range_.start; i < range_.end; ++i ) { sum += i + 1; } - m_pPartialSums[threadnum].count = sum; + m_pPartialSums[threadnum_].count = sum; } }; diff --git a/example/TaskThroughput.cpp b/example/TaskThroughput.cpp index f84d9548..39b47c0a 100644 --- a/example/TaskThroughput.cpp +++ b/example/TaskThroughput.cpp @@ -48,9 +48,9 @@ struct ConsumeTask : ITaskSet static Count* pCount; static uint32_t numCount; - virtual void ExecuteRange( TaskSetPartition range, uint32_t threadnum ) + virtual void ExecuteRange( TaskSetPartition range_, uint32_t threadnum_ ) { - ++pCount[threadnum].count; + ++pCount[threadnum_].count; } static void Init() @@ -62,7 +62,7 @@ struct ConsumeTask : ITaskSet } }; -ConsumeTask ConsumeTask::tasks[numTasks]; +ConsumeTask ConsumeTask::tasks[numTasks]; ConsumeTask::Count* ConsumeTask::pCount = NULL; uint32_t ConsumeTask::numCount = 0; @@ -74,9 +74,9 @@ struct CreateTasks : ITaskSet { m_SetSize = numTasks; 
} - virtual void ExecuteRange( TaskSetPartition range, uint32_t threadnum ) + virtual void ExecuteRange( TaskSetPartition range_, uint32_t threadnum_ ) { - for( uint32_t i=range.start; i g_WaitCount(0); struct SlowTask : enki::ITaskSet { - virtual void ExecuteRange( enki::TaskSetPartition range, uint32_t threadnum ) + virtual void ExecuteRange( enki::TaskSetPartition range_, uint32_t threadnum_ ) { // fake slow task with timer Timer timer; @@ -44,7 +44,7 @@ struct SlowTask : enki::ITaskSet struct WaitingTask : enki::ITaskSet { - virtual void ExecuteRange( enki::TaskSetPartition range, uint32_t threadnum ) + virtual void ExecuteRange( enki::TaskSetPartition range_, uint32_t threadnum_ ) { int numWaitTasks = maxWaitasks - depth; for( int t = 0; t < numWaitTasks; ++t ) @@ -70,7 +70,8 @@ struct WaitingTask : enki::ITaskSet ++g_WaitCount; g_TS.WaitforTask( pWaitingTasks[t] ); } - printf( "\tIteration %d: WaitingTask depth %d complete: thread: %d\n\t\tWaits: %d blocking waits: %d\n", g_Iteration, depth, threadnum, g_WaitCount.load(), g_WaitForTaskCompletion.load() ); + printf( "\tIteration %d: WaitingTask depth %d complete: thread: %d\n\t\tWaits: %d blocking waits: %d\n", + g_Iteration, depth, threadnum_, g_WaitCount.load(), g_WaitForTaskCompletion.load() ); } virtual ~WaitingTask() diff --git a/src/TaskScheduler.cpp b/src/TaskScheduler.cpp index bb906505..a856fec9 100644 --- a/src/TaskScheduler.cpp +++ b/src/TaskScheduler.cpp @@ -96,7 +96,7 @@ namespace enki namespace { - SubTaskSet SplitTask( SubTaskSet& subTask_, uint32_t rangeToSplit_ ) + SubTaskSet SplitTask( SubTaskSet& subTask_, uint32_t rangeToSplit_ ) { SubTaskSet splitTask = subTask_; uint32_t rangeLeft = subTask_.partition.end - subTask_.partition.start; @@ -132,11 +132,11 @@ namespace #endif } -static void SafeCallback(ProfilerCallbackFunc func_, uint32_t threadnum_) +static void SafeCallback( ProfilerCallbackFunc func_, uint32_t threadnum_ ) { if( func_ != nullptr ) { - func_(threadnum_); + func_( threadnum_ ); } } @@ -345,21 +345,21 @@ bool TaskScheduler::TryRunTask( uint32_t threadNum_, uint32_t& hintPipeToCheck_i return false; } -bool TaskScheduler::TryRunTask( uint32_t threadNum, uint32_t priority_, uint32_t& hintPipeToCheck_io_ ) +bool TaskScheduler::TryRunTask( uint32_t threadNum_, uint32_t priority_, uint32_t& hintPipeToCheck_io_ ) { // Run any tasks for this thread - RunPinnedTasks( threadNum, priority_ ); + RunPinnedTasks( threadNum_, priority_ ); // check for tasks SubTaskSet subTask; - bool bHaveTask = m_pPipesPerThread[ priority_ ][ threadNum ].WriterTryReadFront( &subTask ); + bool bHaveTask = m_pPipesPerThread[ priority_ ][ threadNum_ ].WriterTryReadFront( &subTask ); uint32_t threadToCheck = hintPipeToCheck_io_; uint32_t checkCount = 0; while( !bHaveTask && checkCount < m_NumThreads ) { threadToCheck = ( hintPipeToCheck_io_ + checkCount ) % m_NumThreads; - if( threadToCheck != threadNum ) + if( threadToCheck != threadNum_ ) { bHaveTask = m_pPipesPerThread[ priority_ ][ threadToCheck ].ReaderTryReadBack( &subTask ); } @@ -375,8 +375,8 @@ bool TaskScheduler::TryRunTask( uint32_t threadNum, uint32_t priority_, uint32_t if( subTask.pTask->m_RangeToRun < partitionSize ) { SubTaskSet taskToRun = SplitTask( subTask, subTask.pTask->m_RangeToRun ); - SplitAndAddTask( threadNum, subTask, subTask.pTask->m_RangeToRun ); - taskToRun.pTask->ExecuteRange( taskToRun.partition, threadNum ); + SplitAndAddTask( threadNum_, subTask, subTask.pTask->m_RangeToRun ); + taskToRun.pTask->ExecuteRange( taskToRun.partition, threadNum_ ); int 
prevCount = taskToRun.pTask->m_RunningCount.fetch_sub(1,std::memory_order_release ); if( 1 == prevCount && taskToRun.pTask->m_WaitingForTaskCount.load( std::memory_order_acquire ) ) @@ -387,7 +387,7 @@ bool TaskScheduler::TryRunTask( uint32_t threadNum, uint32_t priority_, uint32_t else { // the task has already been divided up by AddTaskSetToPipe, so just run it - subTask.pTask->ExecuteRange( subTask.partition, threadNum ); + subTask.pTask->ExecuteRange( subTask.partition, threadNum_ ); int prevCount = subTask.pTask->m_RunningCount.fetch_sub(1,std::memory_order_release ); if( 1 == prevCount && subTask.pTask->m_WaitingForTaskCount.load( std::memory_order_acquire ) ) @@ -420,7 +420,7 @@ bool TaskScheduler::HaveTasks( uint32_t threadNum_ ) return false; } -void TaskScheduler::WaitForNewTasks( uint32_t threadNum ) +void TaskScheduler::WaitForNewTasks( uint32_t threadNum_ ) { // We incrememt the number of threads waiting here in order // to ensure that the check for tasks occurs after the increment @@ -428,16 +428,16 @@ void TaskScheduler::WaitForNewTasks( uint32_t threadNum ) // This will occasionally result in threads being mistakenly awoken, // but they will then go back to sleep. - bool bHaveTasks = HaveTasks( threadNum ); + bool bHaveTasks = HaveTasks( threadNum_ ); if( !bHaveTasks ) { - SafeCallback( m_Config.profilerCallbacks.waitForNewTaskSuspendStart, threadNum ); - ThreadState prevThreadState = m_pThreadDataStore[threadNum].threadState.load( std::memory_order_relaxed ); - m_pThreadDataStore[threadNum].threadState.store( THREAD_STATE_WAIT_NEW_TASKS, std::memory_order_relaxed ); // rely on fetch_add acquire for order + SafeCallback( m_Config.profilerCallbacks.waitForNewTaskSuspendStart, threadNum_ ); + ThreadState prevThreadState = m_pThreadDataStore[threadNum_].threadState.load( std::memory_order_relaxed ); + m_pThreadDataStore[threadNum_].threadState.store( THREAD_STATE_WAIT_NEW_TASKS, std::memory_order_relaxed ); // rely on fetch_add acquire for order m_NumThreadsWaitingForNewTasks.fetch_add( 1, std::memory_order_acquire ); SemaphoreWait( *m_pNewTaskSemaphore ); - m_pThreadDataStore[threadNum].threadState.store( prevThreadState, std::memory_order_release ); - SafeCallback( m_Config.profilerCallbacks.waitForNewTaskSuspendStop, threadNum ); + m_pThreadDataStore[threadNum_].threadState.store( prevThreadState, std::memory_order_release ); + SafeCallback( m_Config.profilerCallbacks.waitForNewTaskSuspendStop, threadNum_ ); } } diff --git a/src/TaskScheduler.h b/src/TaskScheduler.h index 0d6d314c..07cbe899 100644 --- a/src/TaskScheduler.h +++ b/src/TaskScheduler.h @@ -87,8 +87,8 @@ namespace enki class ICompletable { public: - ICompletable() : m_Priority(TASK_PRIORITY_HIGH), m_RunningCount(0) {} - bool GetIsComplete() const { + ICompletable() : m_Priority(TASK_PRIORITY_HIGH), m_RunningCount(0) {} + bool GetIsComplete() const { return 0 == m_RunningCount.load( std::memory_order_acquire ); } @@ -130,10 +130,10 @@ namespace enki // i.e. neighbouring values should be close together. // threadnum should not be used for changing processing of data, it's intended purpose // is to allow per-thread data buckets for output. - virtual void ExecuteRange( TaskSetPartition range, uint32_t threadnum ) = 0; + virtual void ExecuteRange( TaskSetPartition range_, uint32_t threadnum_ ) = 0; // Size of set - usually the number of data items to be processed, see ExecuteRange. 
Defaults to 1 - uint32_t m_SetSize; + uint32_t m_SetSize; // Minimum size of of TaskSetPartition range when splitting a task set into partitions. // This should be set to a value which results in computation effort of at least 10k @@ -141,11 +141,11 @@ namespace enki // NOTE: The last partition will be smaller than m_MinRange if m_SetSize is not a multiple // of m_MinRange. // Also known as grain size in literature. - uint32_t m_MinRange; + uint32_t m_MinRange; private: - friend class TaskScheduler; - uint32_t m_RangeToRun; + friend class TaskScheduler; + uint32_t m_RangeToRun; }; // Subclass IPinnedTask to create tasks which can be run on a given thread only. @@ -158,10 +158,10 @@ namespace enki // IPinnedTask needs to be non abstract for intrusive list functionality. // Should never be called as should be overridden. - virtual void Execute() { assert(false); } + virtual void Execute() { assert(false); } - uint32_t threadNum; // thread to run this pinned task on + uint32_t threadNum; // thread to run this pinned task on std::atomic pNext; // Do not use. For intrusive list only. }; @@ -175,9 +175,9 @@ namespace enki TaskSet( uint32_t setSize_, TaskSetFunction func_ ) : ITaskSet( setSize_ ), m_Function( func_ ) {} - virtual void ExecuteRange( TaskSetPartition range, uint32_t threadnum ) + virtual void ExecuteRange( TaskSetPartition range_, uint32_t threadnum_ ) { - m_Function( range, threadnum ); + m_Function( range_, threadnum_ ); } TaskSetFunction m_Function; @@ -260,7 +260,7 @@ namespace enki ENKITS_API void WaitforTask( const ICompletable* pCompletable_, enki::TaskPriority priorityOfLowestToRun_ = TaskPriority(TASK_PRIORITY_NUM - 1) ); // DEPRECATED - WaitforTaskSet, deprecated interface use WaitforTask - inline void WaitforTaskSet( const ICompletable* pCompletable_ ) { WaitforTask( pCompletable_ ); } + inline void WaitforTaskSet( const ICompletable* pCompletable_ ) { WaitforTask( pCompletable_ ); } // Waits for all task sets to complete - not guaranteed to work unless we know we // are in a situation where tasks aren't being continuously added. @@ -284,7 +284,7 @@ namespace enki // It is guaranteed that GetThreadNum() < GetNumTaskThreads() ENKITS_API uint32_t GetThreadNum() const; - // DEPRECATED - GetProfilerCallbacks. Use TaskSchedulerConfig to initialize. + // DEPRECATED - GetProfilerCallbacks. // Returns the ProfilerCallbacks structure so that it can be modified to // set the callbacks. Should be set prior to initialization. 
ENKITS_API ProfilerCallbacks* GetProfilerCallbacks(); @@ -304,39 +304,39 @@ namespace enki ENKITS_API uint32_t GetNumRegisteredExternalTaskThreads(); private: - static void TaskingThreadFunction( const ThreadArgs& args_ ); - bool HaveTasks( uint32_t threadNum_ ); - void WaitForNewTasks( uint32_t threadNum_ ); - void WaitForTaskCompletion( const ICompletable* pCompletable_, uint32_t threadNum_ ); - void RunPinnedTasks( uint32_t threadNum_, uint32_t priority_ ); - bool TryRunTask( uint32_t threadNum_, uint32_t& hintPipeToCheck_io_ ); - bool TryRunTask( uint32_t threadNum_, uint32_t priority_, uint32_t& hintPipeToCheck_io_ ); - void StartThreads(); - void StopThreads( bool bWait_ ); - void SplitAndAddTask( uint32_t threadNum_, SubTaskSet subTask_, uint32_t rangeToSplit_ ); - void WakeThreadsForNewTasks(); - void WakeThreadsForTaskCompletion(); - - TaskPipe* m_pPipesPerThread[ TASK_PRIORITY_NUM ]; - PinnedTaskList* m_pPinnedTaskListPerThread[ TASK_PRIORITY_NUM ]; - - uint32_t m_NumThreads; - ThreadDataStore* m_pThreadDataStore; - std::thread** m_pThreads; - std::atomic m_bRunning; - std::atomic m_NumInternalTaskThreadsRunning; - std::atomic m_NumThreadsWaitingForNewTasks; - std::atomic m_NumThreadsWaitingForTaskCompletion; - uint32_t m_NumPartitions; - semaphoreid_t* m_pNewTaskSemaphore; - semaphoreid_t* m_pTaskCompleteSemaphore; - uint32_t m_NumInitialPartitions; - bool m_bHaveThreads; - TaskSchedulerConfig m_Config; - std::atomic m_NumExternalTaskThreadsRegistered; - - TaskScheduler( const TaskScheduler& nocopy ); - TaskScheduler& operator=( const TaskScheduler& nocopy ); + static void TaskingThreadFunction( const ThreadArgs& args_ ); + bool HaveTasks( uint32_t threadNum_ ); + void WaitForNewTasks( uint32_t threadNum_ ); + void WaitForTaskCompletion( const ICompletable* pCompletable_, uint32_t threadNum_ ); + void RunPinnedTasks( uint32_t threadNum_, uint32_t priority_ ); + bool TryRunTask( uint32_t threadNum_, uint32_t& hintPipeToCheck_io_ ); + bool TryRunTask( uint32_t threadNum_, uint32_t priority_, uint32_t& hintPipeToCheck_io_ ); + void StartThreads(); + void StopThreads( bool bWait_ ); + void SplitAndAddTask( uint32_t threadNum_, SubTaskSet subTask_, uint32_t rangeToSplit_ ); + void WakeThreadsForNewTasks(); + void WakeThreadsForTaskCompletion(); + + TaskPipe* m_pPipesPerThread[ TASK_PRIORITY_NUM ]; + PinnedTaskList* m_pPinnedTaskListPerThread[ TASK_PRIORITY_NUM ]; + + uint32_t m_NumThreads; + ThreadDataStore* m_pThreadDataStore; + std::thread** m_pThreads; + std::atomic m_bRunning; + std::atomic m_NumInternalTaskThreadsRunning; + std::atomic m_NumThreadsWaitingForNewTasks; + std::atomic m_NumThreadsWaitingForTaskCompletion; + uint32_t m_NumPartitions; + semaphoreid_t* m_pNewTaskSemaphore; + semaphoreid_t* m_pTaskCompleteSemaphore; + uint32_t m_NumInitialPartitions; + bool m_bHaveThreads; + TaskSchedulerConfig m_Config; + std::atomic m_NumExternalTaskThreadsRegistered; + + TaskScheduler( const TaskScheduler& nocopy_ ); + TaskScheduler& operator=( const TaskScheduler& nocopy_ ); }; inline uint32_t GetNumHardwareThreads() diff --git a/src/TaskScheduler_c.cpp b/src/TaskScheduler_c.cpp index f094336f..e96441ca 100644 --- a/src/TaskScheduler_c.cpp +++ b/src/TaskScheduler_c.cpp @@ -31,9 +31,9 @@ struct enkiTaskSet : ITaskSet { enkiTaskSet( enkiTaskExecuteRange taskFun_ ) : taskFun(taskFun_), pArgs(NULL) {} - virtual void ExecuteRange( TaskSetPartition range, uint32_t threadnum ) + virtual void ExecuteRange( TaskSetPartition range_, uint32_t threadnum_ ) { - taskFun( range.start, range.end, 
threadnum, pArgs ); + taskFun( range_.start, range_.end, threadnum_, pArgs ); } enkiTaskExecuteRange taskFun; From 273bec3a36d6df762f9c543c0d78107adc781299 Mon Sep 17 00:00:00 2001 From: Doug Binks Date: Fri, 25 Oct 2019 12:10:18 +0100 Subject: [PATCH 11/19] Moved away from deprecated interface. --- example/TestWaitforTask.cpp | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/example/TestWaitforTask.cpp b/example/TestWaitforTask.cpp index 6fc79b4a..b399ffd3 100644 --- a/example/TestWaitforTask.cpp +++ b/example/TestWaitforTask.cpp @@ -93,9 +93,10 @@ struct WaitingTask : enki::ITaskSet // which must complete as early as possible using priorities. int main(int argc, const char * argv[]) { - g_TS.GetProfilerCallbacks()->waitForTaskCompleteStart = []( uint32_t threadnum_ ) { ++g_WaitCount; }; - g_TS.GetProfilerCallbacks()->waitForTaskCompleteSuspendStart = []( uint32_t threadnum_ ) { ++g_WaitForTaskCompletion; }; - g_TS.Initialize(); + enki::TaskSchedulerConfig config; + config.profilerCallbacks.waitForTaskCompleteStart = []( uint32_t threadnum_ ) { ++g_WaitCount; }; + config.profilerCallbacks.waitForTaskCompleteSuspendStart = []( uint32_t threadnum_ ) { ++g_WaitForTaskCompletion; }; + g_TS.Initialize( config ); for( g_Iteration = 0; g_Iteration < 1000; ++g_Iteration ) { WaitingTask taskRoot; From c5e3903912cf3238797bf1e2dfc3e1876c8f28bd Mon Sep 17 00:00:00 2001 From: Doug Binks Date: Fri, 25 Oct 2019 14:17:22 +0100 Subject: [PATCH 12/19] Moved deprecated functions. --- src/TaskScheduler.cpp | 5 ----- src/TaskScheduler.h | 22 +++++++++++++--------- 2 files changed, 13 insertions(+), 14 deletions(-) diff --git a/src/TaskScheduler.cpp b/src/TaskScheduler.cpp index a856fec9..3ab84411 100644 --- a/src/TaskScheduler.cpp +++ b/src/TaskScheduler.cpp @@ -140,11 +140,6 @@ static void SafeCallback( ProfilerCallbackFunc func_, uint32_t threadnum_ ) } } -ProfilerCallbacks* TaskScheduler::GetProfilerCallbacks() -{ - return &m_Config.profilerCallbacks; -} - ENKITS_API bool enki::TaskScheduler::RegisterExternalTaskThread() { bool bRegistered = false; diff --git a/src/TaskScheduler.h b/src/TaskScheduler.h index 07cbe899..0bbc7b9e 100644 --- a/src/TaskScheduler.h +++ b/src/TaskScheduler.h @@ -44,6 +44,7 @@ #define ENKITS_API #endif + namespace enki { @@ -259,9 +260,6 @@ namespace enki // Only wait for child tasks of the current task otherwise a deadlock could occur. ENKITS_API void WaitforTask( const ICompletable* pCompletable_, enki::TaskPriority priorityOfLowestToRun_ = TaskPriority(TASK_PRIORITY_NUM - 1) ); - // DEPRECATED - WaitforTaskSet, deprecated interface use WaitforTask - inline void WaitforTaskSet( const ICompletable* pCompletable_ ) { WaitforTask( pCompletable_ ); } - // Waits for all task sets to complete - not guaranteed to work unless we know we // are in a situation where tasks aren't being continuously added. ENKITS_API void WaitforAll(); @@ -284,12 +282,7 @@ namespace enki // It is guaranteed that GetThreadNum() < GetNumTaskThreads() ENKITS_API uint32_t GetThreadNum() const; - // DEPRECATED - GetProfilerCallbacks. - // Returns the ProfilerCallbacks structure so that it can be modified to - // set the callbacks. Should be set prior to initialization. - ENKITS_API ProfilerCallbacks* GetProfilerCallbacks(); - - // Call on a thread to register the thread to use the TaskScheduling API. + // Call on a thread to register the thread to use the TaskScheduling API. 
// This is implicitly done for the thread which initializes the TaskScheduler // Intended for developers who have threads who need to call the TaskScheduler API // Returns true if successfull, false if not. @@ -303,6 +296,17 @@ namespace enki // Get the number of registered external task threads. ENKITS_API uint32_t GetNumRegisteredExternalTaskThreads(); + + // ------------- Start DEPRECATED Functions ------------- + // DEPRECATED - WaitforTaskSet, deprecated interface use WaitforTask + inline void WaitforTaskSet( const ICompletable* pCompletable_ ) { WaitforTask( pCompletable_ ); } + + // DEPRECATED - GetProfilerCallbacks. Use TaskSchedulerConfig instead + // Returns the ProfilerCallbacks structure so that it can be modified to + // set the callbacks. Should be set prior to initialization. + inline ProfilerCallbacks* GetProfilerCallbacks() { return &m_Config.profilerCallbacks; } + // ------------- End DEPRECATED Functions ------------- + private: static void TaskingThreadFunction( const ThreadArgs& args_ ); bool HaveTasks( uint32_t threadNum_ ); From 36b6c2fa2d6483e4abc7bc6c968f5ddc57319f0d Mon Sep 17 00:00:00 2001 From: Doug Binks Date: Fri, 25 Oct 2019 14:17:54 +0100 Subject: [PATCH 13/19] Added C interface for external threads and moved deprecated C functions. --- src/TaskScheduler_c.cpp | 48 +++++++++++++++++++++++++++++ src/TaskScheduler_c.h | 67 ++++++++++++++++++++++++++++++++--------- 2 files changed, 100 insertions(+), 15 deletions(-) diff --git a/src/TaskScheduler_c.cpp b/src/TaskScheduler_c.cpp index e96441ca..40a705fe 100644 --- a/src/TaskScheduler_c.cpp +++ b/src/TaskScheduler_c.cpp @@ -60,6 +60,23 @@ enkiTaskScheduler* enkiNewTaskScheduler() return pETS; } +ENKITS_API struct enkiTaskSchedulerConfig enkiGetTaskSchedulerConfig( enkiTaskScheduler* pETS_ ) +{ + TaskSchedulerConfig config = pETS_->GetConfig(); + enkiTaskSchedulerConfig configC; + configC.numExternalTaskThreads = config.numExternalTaskThreads; + configC.numTaskThreadsToCreate = config.numTaskThreadsToCreate; + configC.profilerCallbacks.threadStart = config.profilerCallbacks.threadStart; + configC.profilerCallbacks.threadStop = config.profilerCallbacks.threadStop; + configC.profilerCallbacks.waitForNewTaskSuspendStart = config.profilerCallbacks.waitForNewTaskSuspendStart; + configC.profilerCallbacks.waitForNewTaskSuspendStop = config.profilerCallbacks.waitForNewTaskSuspendStop; + configC.profilerCallbacks.waitForTaskCompleteStart = config.profilerCallbacks.waitForTaskCompleteStart; + configC.profilerCallbacks.waitForTaskCompleteStop = config.profilerCallbacks.waitForTaskCompleteStop; + configC.profilerCallbacks.waitForTaskCompleteSuspendStart = config.profilerCallbacks.waitForTaskCompleteSuspendStart; + configC.profilerCallbacks.waitForTaskCompleteSuspendStop = config.profilerCallbacks.waitForTaskCompleteSuspendStop; + return configC; +} + void enkiInitTaskScheduler( enkiTaskScheduler* pETS_ ) { pETS_->Initialize(); @@ -70,6 +87,22 @@ void enkiInitTaskSchedulerNumThreads( enkiTaskScheduler* pETS_, uint32_t numThr pETS_->Initialize( numThreads_ ); } +ENKITS_API void enkiInitTaskSchedulerWithConfig( enkiTaskScheduler* pETS_, struct enkiTaskSchedulerConfig config_ ) +{ + TaskSchedulerConfig config; + config.numExternalTaskThreads = config_.numExternalTaskThreads; + config.numTaskThreadsToCreate = config_.numTaskThreadsToCreate; + config.profilerCallbacks.threadStart = config_.profilerCallbacks.threadStart; + config.profilerCallbacks.threadStop = config_.profilerCallbacks.threadStop; + 
config.profilerCallbacks.waitForNewTaskSuspendStart = config_.profilerCallbacks.waitForNewTaskSuspendStart; + config.profilerCallbacks.waitForNewTaskSuspendStop = config_.profilerCallbacks.waitForNewTaskSuspendStop; + config.profilerCallbacks.waitForTaskCompleteStart = config_.profilerCallbacks.waitForTaskCompleteStart; + config.profilerCallbacks.waitForTaskCompleteStop = config_.profilerCallbacks.waitForTaskCompleteStop; + config.profilerCallbacks.waitForTaskCompleteSuspendStart = config_.profilerCallbacks.waitForTaskCompleteSuspendStart; + config.profilerCallbacks.waitForTaskCompleteSuspendStop = config_.profilerCallbacks.waitForTaskCompleteSuspendStop; + pETS_->Initialize( config ); +} + void enkiDeleteTaskScheduler( enkiTaskScheduler* pETS_ ) { delete pETS_; @@ -182,6 +215,21 @@ uint32_t enkiGetNumTaskThreads( enkiTaskScheduler* pETS_ ) return pETS_->GetNumTaskThreads(); } +ENKITS_API int enkiRegisterExternalTaskThread( enkiTaskScheduler* pETS_) +{ + return (int)pETS_->RegisterExternalTaskThread(); +} + +ENKITS_API void enkiDeRegisterExternalTaskThread( enkiTaskScheduler* pETS_) +{ + return pETS_->DeRegisterExternalTaskThread(); +} + +ENKITS_API uint32_t enkiGetNumRegisteredExternalTaskThreads( enkiTaskScheduler* pETS_) +{ + return pETS_->GetNumRegisteredExternalTaskThreads(); +} + enkiProfilerCallbacks* enkiGetProfilerCallbacks( enkiTaskScheduler* pETS_ ) { static_assert( sizeof(enkiProfilerCallbacks) == sizeof(enki::ProfilerCallbacks), "enkiTS profiler callback structs do not match" ); diff --git a/src/TaskScheduler_c.h b/src/TaskScheduler_c.h index 29dcd2ef..3360f6a0 100644 --- a/src/TaskScheduler_c.h +++ b/src/TaskScheduler_c.h @@ -44,10 +44,41 @@ typedef struct enkiPinnedTask enkiPinnedTask; typedef void (* enkiTaskExecuteRange)( uint32_t start_, uint32_t end, uint32_t threadnum_, void* pArgs_ ); typedef void (* enkiPinnedTaskExecute)( void* pArgs_ ); +// TaskScheduler implements several callbacks intended for profilers +typedef void (*enkiProfilerCallbackFunc)( uint32_t threadnum_ ); +struct enkiProfilerCallbacks +{ + enkiProfilerCallbackFunc threadStart; + enkiProfilerCallbackFunc threadStop; + enkiProfilerCallbackFunc waitForNewTaskSuspendStart; // thread suspended waiting for new tasks + enkiProfilerCallbackFunc waitForNewTaskSuspendStop; // thread unsuspended + enkiProfilerCallbackFunc waitForTaskCompleteStart; // thread waiting for task completion + enkiProfilerCallbackFunc waitForTaskCompleteStop; // thread stopped waiting + enkiProfilerCallbackFunc waitForTaskCompleteSuspendStart; // thread suspended waiting task completion + enkiProfilerCallbackFunc waitForTaskCompleteSuspendStop; // thread unsuspended +}; + +struct enkiTaskSchedulerConfig +{ + // numTaskThreadsToCreate - Number of tasking threads the task scheduler will create. Must be > 0. + // Defaults to GetNumHardwareThreads()-1 threads as thread which calls initialize is thread 0. + uint32_t numTaskThreadsToCreate; + + // numExternalTaskThreads - Advanced use. Number of external threads which need to use TaskScheduler API. + // See enkiRegisterExternalTaskThread() for usage. + // Defaults to 0, the thread used to initialize the TaskScheduler. + uint32_t numExternalTaskThreads; + + struct enkiProfilerCallbacks profilerCallbacks; +}; + // Create a new task scheduler ENKITS_API enkiTaskScheduler* enkiNewTaskScheduler(); +// Get config. 
Can be called before enkiInitTaskSchedulerWithConfig to get the defaults +ENKITS_API struct enkiTaskSchedulerConfig enkiGetTaskSchedulerConfig( enkiTaskScheduler* pETS_ ); + // Initialize task scheduler - will create GetNumHardwareThreads()-1 threads, which is // sufficient to fill the system when including the main thread. // Initialize can be called multiple times - it will wait for completion @@ -57,7 +88,10 @@ ENKITS_API void enkiInitTaskScheduler( enkiTaskScheduler* pETS_ // Initialize a task scheduler with numThreads_ (must be > 0) // will create numThreads_-1 threads, as thread 0 is // the thread on which the initialize was called. -ENKITS_API void enkiInitTaskSchedulerNumThreads( enkiTaskScheduler* pETS_, uint32_t numThreads_ ); +ENKITS_API void enkiInitTaskSchedulerNumThreads( enkiTaskScheduler* pETS_, uint32_t numThreads_ ); + +// Initialize a task scheduler with config, see enkiTaskSchedulerConfig for details +ENKITS_API void enkiInitTaskSchedulerWithConfig( enkiTaskScheduler* pETS_, struct enkiTaskSchedulerConfig config_ ); // Delete a task scheduler @@ -134,26 +168,29 @@ ENKITS_API void enkiWaitForPinnedTaskPriority( enkiTaskScheduler* // are in a situation where tasks aren't being continuosly added. ENKITS_API void enkiWaitForAll( enkiTaskScheduler* pETS_ ); - // get number of threads ENKITS_API uint32_t enkiGetNumTaskThreads( enkiTaskScheduler* pETS_ ); -// TaskScheduler implements several callbacks intended for profilers -typedef void (*enkiProfilerCallbackFunc)( uint32_t threadnum_ ); -struct enkiProfilerCallbacks -{ - enkiProfilerCallbackFunc threadStart; - enkiProfilerCallbackFunc threadStop; - enkiProfilerCallbackFunc waitForNewTaskSuspendStart; // thread suspended waiting for new tasks - enkiProfilerCallbackFunc waitForNewTaskSuspendStop; // thread unsuspended - enkiProfilerCallbackFunc waitForTaskCompleteStart; // thread waiting for task completion - enkiProfilerCallbackFunc waitForTaskCompleteStop; // thread stopped waiting - enkiProfilerCallbackFunc waitForTaskCompleteSuspendStart; // thread suspended waiting task completion - enkiProfilerCallbackFunc waitForTaskCompleteSuspendStop; // thread unsuspended -}; +// Call on a thread to register the thread to use the TaskScheduling API. +// This is implicitly done for the thread which initializes the TaskScheduler +// Intended for developers who have threads who need to call the TaskScheduler API +// Returns true if successfull, false if not. +// Can only have numExternalTaskThreads registered at any one time, which must be set +// at initialization time. +ENKITS_API int enkiRegisterExternalTaskThread( enkiTaskScheduler* pETS_); + +// Call on a thread on which RegisterExternalTaskThread has been called to deregister that thread. +ENKITS_API void enkiDeRegisterExternalTaskThread( enkiTaskScheduler* pETS_); +// Get the number of registered external task threads. +ENKITS_API uint32_t enkiGetNumRegisteredExternalTaskThreads( enkiTaskScheduler* pETS_); + +// ------------- Start DEPRECATED Functions ------------- +// DEPRECATED - enkiGetProfilerCallbacks. Use enkiTaskSchedulerConfig instead // Get the callback structure so it can be set ENKITS_API struct enkiProfilerCallbacks* enkiGetProfilerCallbacks( enkiTaskScheduler* pETS_ ); +// ------------- End DEPRECATED Functions ------------- + #ifdef __cplusplus } From 3b39d3df573d2dc4305f3b710b88db5df84f8c12 Mon Sep 17 00:00:00 2001 From: Doug Binks Date: Fri, 25 Oct 2019 14:33:31 +0100 Subject: [PATCH 14/19] Simplified register external thread example in readme. 
--- README.md | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/README.md b/README.md index cd08a67b..88dfc625 100644 --- a/README.md +++ b/README.md @@ -179,18 +179,16 @@ struct ParallelTaskSet : ITaskSet void threadFunction() { - bool bRegistered = g_TS.RegisterExternalTaskThread(); - assert( bRegistered ); - if( bRegistered ) - { - // sleep for a while instead of doing something such as file IO - std::this_thread::sleep_for( std::chrono::milliseconds( num_ * 100 ) ); + g_TS.RegisterExternalTaskThread(); - ParallelTaskSet task; - g_TS.AddTaskSetToPipe( &task ); - g_TS.WaitforTask( &task); - g_TS.DeRegisterExternalTaskThread(); - } + // sleep for a while instead of doing something such as file IO + std::this_thread::sleep_for( std::chrono::milliseconds( num_ * 100 ) ); + + ParallelTaskSet task; + g_TS.AddTaskSetToPipe( &task ); + g_TS.WaitforTask( &task); + + g_TS.DeRegisterExternalTaskThread(); } int main(int argc, const char * argv[]) From 9f0643d9ad390e241823d72cf4270213b63b2b0f Mon Sep 17 00:00:00 2001 From: Doug Binks Date: Fri, 25 Oct 2019 15:30:06 +0100 Subject: [PATCH 15/19] Added enkiGetThreadNum() function to C interface --- src/TaskScheduler_c.cpp | 5 +++++ src/TaskScheduler_c.h | 12 +++++++++++- 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/src/TaskScheduler_c.cpp b/src/TaskScheduler_c.cpp index 40a705fe..4b542419 100644 --- a/src/TaskScheduler_c.cpp +++ b/src/TaskScheduler_c.cpp @@ -215,6 +215,11 @@ uint32_t enkiGetNumTaskThreads( enkiTaskScheduler* pETS_ ) return pETS_->GetNumTaskThreads(); } +ENKITS_API uint32_t enkiGetThreadNum( enkiTaskScheduler* pETS_ ) +{ + return pETS_->GetThreadNum(); +} + ENKITS_API int enkiRegisterExternalTaskThread( enkiTaskScheduler* pETS_) { return (int)pETS_->RegisterExternalTaskThread(); diff --git a/src/TaskScheduler_c.h b/src/TaskScheduler_c.h index 3360f6a0..150bdfeb 100644 --- a/src/TaskScheduler_c.h +++ b/src/TaskScheduler_c.h @@ -168,9 +168,19 @@ ENKITS_API void enkiWaitForPinnedTaskPriority( enkiTaskScheduler* // are in a situation where tasks aren't being continuosly added. ENKITS_API void enkiWaitForAll( enkiTaskScheduler* pETS_ ); -// get number of threads +// Returns the number of threads created for running tasks + number of external threads +// plus 1 to account for the thread used to initialize the task scheduler. +// Equivalent to config values: numTaskThreadsToCreate + numExternalTaskThreads + 1. +// It is guaranteed that enkiGetThreadNum() < enkiGetNumTaskThreads() ENKITS_API uint32_t enkiGetNumTaskThreads( enkiTaskScheduler* pETS_ ); +// Returns the current task threadNum +// Will return 0 for thread which initialized the task scheduler, +// and all other non-enkiTS threads which have not been registered ( see enkiRegisterExternalTaskThread() ), +// and < enkiGetNumTaskThreads() for all threads. +// It is guaranteed that enkiGetThreadNum() < enkiGetNumTaskThreads() +ENKITS_API uint32_t enkiGetThreadNum( enkiTaskScheduler* pETS_ ); + // Call on a thread to register the thread to use the TaskScheduling API. // This is implicitly done for the thread which initializes the TaskScheduler // Intended for developers who have threads who need to call the TaskScheduler API From 9ee456d4574a7ae5e941a56ed08672b335efd472 Mon Sep 17 00:00:00 2001 From: Doug Binks Date: Fri, 25 Oct 2019 15:31:46 +0100 Subject: [PATCH 16/19] Added ExternalTaskThead_c example. 
--- CMakeLists.txt | 6 +- README.md | 3 +- example/ExternalTaskThread.cpp | 2 +- example/ExternalTaskThread_c.c | 122 +++++++++++++++++++++++++++++++++ 4 files changed, 130 insertions(+), 3 deletions(-) create mode 100644 example/ExternalTaskThread_c.c diff --git a/CMakeLists.txt b/CMakeLists.txt index 438d30c5..5f0deb46 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -95,7 +95,7 @@ if( ENKITS_BUILD_EXAMPLES ) add_executable( TestWaitforTask example/TestWaitforTask.cpp ) target_link_libraries(TestWaitforTask enkiTS ) - add_executable( ExternalTaskThread example/ExternalTaskThread.cpp ) + add_executable( ExternalTaskThread example/ExternalTaskThread.cpp ) target_link_libraries(ExternalTaskThread enkiTS ) if( ENKITS_BUILD_C_INTERFACE ) @@ -109,5 +109,9 @@ if( ENKITS_BUILD_C_INTERFACE ) add_executable( Priorities_c example/Priorities_c.c ) target_link_libraries(Priorities_c enkiTS ) endif() + + add_executable( ExternalTaskThread_c example/ExternalTaskThread_c.c ) + target_link_libraries(ExternalTaskThread_c enkiTS ) + endif() endif() diff --git a/README.md b/README.md index 88dfc625..f8a4bd89 100644 --- a/README.md +++ b/README.md @@ -164,7 +164,8 @@ int main(int argc, const char * argv[]) { } ``` -External thread usage in C++ (full example in example/ExternalTaskThread.cpp) +External thread usage in C++ (full example in example/ExternalTaskThread.cpp, +C example in example/ExternalTaskThread_c.c) ```C #include "TaskScheduler.h" diff --git a/example/ExternalTaskThread.cpp b/example/ExternalTaskThread.cpp index 45293cd9..3e9e124f 100644 --- a/example/ExternalTaskThread.cpp +++ b/example/ExternalTaskThread.cpp @@ -1,4 +1,4 @@ -// Copyright (c) 2013 Doug Binks +// Copyright (c) 2019 Doug Binks // // This software is provided 'as-is', without any express or implied // warranty. In no event will the authors be held liable for any damages diff --git a/example/ExternalTaskThread_c.c b/example/ExternalTaskThread_c.c new file mode 100644 index 00000000..11e7fc0a --- /dev/null +++ b/example/ExternalTaskThread_c.c @@ -0,0 +1,122 @@ +// Copyright (c) 2019 Doug Binks +// +// This software is provided 'as-is', without any express or implied +// warranty. In no event will the authors be held liable for any damages +// arising from the use of this software. +// +// Permission is granted to anyone to use this software for any purpose, +// including commercial applications, and to alter it and redistribute it +// freely, subject to the following restrictions: +// +// 1. The origin of this software must not be misrepresented; you must not +// claim that you wrote the original software. If you use this software +// in a product, an acknowledgement in the product documentation would be +// appreciated but is not required. +// 2. Altered source versions must be plainly marked as such, and must not be +// misrepresented as being the original software. +// 3. This notice may not be removed or altered from any source distribution. 
+ +#include "TaskScheduler_c.h" +#include +#include +#include +#include + +// XPLATF Thread handling functions for C +#ifdef _WIN32 + +#define NOMINMAX +#define WIN32_LEAN_AND_MEAN +#include + +typedef HANDLE threadid_t; +#define THREADFUNC_DECL DWORD WINAPI + +// declare the thread start function as: +// THREADFUNC_DECL MyThreadStart( void* pArg ); +int32_t ThreadCreate( threadid_t* returnid, DWORD ( WINAPI *StartFunc) (void* ), void* pArg ) +{ + DWORD threadid; + *returnid = CreateThread( 0, 0, StartFunc, pArg, 0, &threadid ); + return *returnid != NULL; +} + +int32_t ThreadJoin( threadid_t threadid ) +{ + return WaitForSingleObject( threadid, INFINITE ) == 0; +} + +#else // posix +#include +#include + +typedef pthread_t threadid_t; +#define THREADFUNC_DECL void* + +// declare the thread start function as: +// THREADFUNC_DECL MyThreadStart( void* pArg ); +int32_t ThreadCreate( threadid_t* returnid, void* ( *StartFunc) (void* ), void* pArg ) +{ + int32_t retval = pthread_create( returnid, NULL, StartFunc, pArg ); + + return retval == 0; +} + +int32_t ThreadJoin( threadid_t threadid ) +{ + return pthread_join( threadid, NULL ) == 0; +} +#endif + +enkiTaskScheduler* pETS; +enkiTaskSet* pParallelTask; + + +void ParallelFunc( uint32_t start_, uint32_t end, uint32_t threadnum_, void* pArgs_ ) +{ + // do something + printf("ParallelFunc running on thread %d (could be any thread)\n", threadnum_ ); +} + +THREADFUNC_DECL ThreadFunc( void* pArgs_ ) +{ + uint32_t threadNum; + int retVal; + + retVal = enkiRegisterExternalTaskThread( pETS ); + assert( retVal ); + + threadNum = enkiGetThreadNum( pETS ); + assert( threadNum == 1 ); + printf("ThreadFunc running on thread %d (should be thread 1)\n", threadNum ); + + pParallelTask = enkiCreateTaskSet( pETS, ParallelFunc ); + + enkiAddTaskSetToPipe( pETS, pParallelTask, NULL, 1); + enkiWaitForTaskSet( pETS, pParallelTask ); + + enkiDeleteTaskSet( pParallelTask ); + + enkiDeRegisterExternalTaskThread( pETS ); + return 0; +} + +int main(int argc, const char * argv[]) +{ + struct enkiTaskSchedulerConfig config; + + pETS = enkiNewTaskScheduler(); + + // get default config and request one external thread + config = enkiGetTaskSchedulerConfig( pETS ); + config.numExternalTaskThreads = 1; + enkiInitTaskSchedulerWithConfig( pETS, config ); + + threadid_t threadID; + ThreadCreate( &threadID, ThreadFunc, NULL ); + + ThreadJoin( threadID ); + + + enkiDeleteTaskScheduler( pETS ); +} \ No newline at end of file From 216ae9b6a3f5221794fb13c2c246119422cb007b Mon Sep 17 00:00:00 2001 From: Doug Binks Date: Fri, 25 Oct 2019 16:19:50 +0100 Subject: [PATCH 17/19] Readme improvements. 
--- README.md | 66 ++++++++++++++++++++++++++++++++----------------------- 1 file changed, 39 insertions(+), 27 deletions(-) diff --git a/README.md b/README.md index f8a4bd89..444bf648 100644 --- a/README.md +++ b/README.md @@ -50,6 +50,8 @@ For cmake, on Windows / Mac OS X / Linux with cmake installed, open a prompt in ## Usage C++ usage: +- full example in (example/ParallelSum.cpp)[example/ParallelSum.cpp] +- C example in (example/ParallelSum_c.c)[example/ParallelSum_c.c] ```C #include "TaskScheduler.h" @@ -57,23 +59,25 @@ enki::TaskScheduler g_TS; // define a task set, can ignore range if we only do one thing struct ParallelTaskSet : enki::ITaskSet { - virtual void ExecuteRange( enki::TaskSetPartition range, uint32_t threadnum ) { - // do something here, can issue tasks with g_TS - } + virtual void ExecuteRange( enki::TaskSetPartition range, uint32_t threadnum ) { + // do something here, can issue tasks with g_TS + } }; int main(int argc, const char * argv[]) { - g_TS.Initialize(); - ParallelTaskSet task; // default constructor has a set size of 1 - g_TS.AddTaskSetToPipe( &task ); + g_TS.Initialize(); + ParallelTaskSet task; // default constructor has a set size of 1 + g_TS.AddTaskSetToPipe( &task ); - // wait for task set (running tasks if they exist) - since we've just added it and it has no range we'll likely run it. - g_TS.WaitforTask( &task ); - return 0; + // wait for task set (running tasks if they exist) + since we've just added it and it has no range we'll likely run it. + g_TS.WaitforTask( &task ); + return 0; } ``` C++ 11 lambda usage: +- full example in (example/LambdaTask.cpp)[example/LambdaTask.cpp] ```C #include "TaskScheduler.h" @@ -92,7 +96,9 @@ int main(int argc, const char * argv[]) { } ``` -Task priorities usage in C++ (see example/Priorities_c.c for C example). +Task priorities usage in C++: +- full example in (example/Priorities.cpp)[example/Priorities.cpp] +- C example in (example/Priorities_c.c)[example/Priorities_c.c] ```C // See full example in Priorities.cpp #include "TaskScheduler.h" @@ -137,7 +143,9 @@ int main(int argc, const char * argv[]) } ``` -Pinned Tasks usage in C++ (see example/PinnedTask_c.c for C example). +Pinned Tasks usage in C++: +- full example in (example/PinnedTask.cpp)[example/PinnedTask.cpp] +- C example in (example/PinnedTask_c.c)[example/PinnedTask_c.c] ```C #include "TaskScheduler.h" @@ -147,25 +155,28 @@ enki::TaskScheduler g_TS; struct PinnedTask : enki::IPinnedTask { virtual void Execute() { // do something here, can issue tasks with g_TS - } + } }; int main(int argc, const char * argv[]) { - g_TS.Initialize(); - PinnedTask task; //default constructor sets thread for pinned task to 0 (main thread) - g_TS.AddPinnedTask( &task ); - - // RunPinnedTasks must be called on main thread to run any pinned tasks for that thread. - // Tasking threads automatically do this in their task loop. - g_TS.RunPinnedTasks(); - // wait for task set (running tasks if they exist) - since we've just added it and it has no range we'll likely run it. - g_TS.WaitforTask( &task ); - return 0; + g_TS.Initialize(); + PinnedTask task; //default constructor sets thread for pinned task to 0 (main thread) + g_TS.AddPinnedTask( &task ); + + // RunPinnedTasks must be called on main thread to run any pinned tasks for that thread. + // Tasking threads automatically do this in their task loop. + g_TS.RunPinnedTasks(); + + // wait for task set (running tasks if they exist) + // since we've just added it and it has no range we'll likely run it. 
+ g_TS.WaitforTask( &task ); + return 0; } ``` -External thread usage in C++ (full example in example/ExternalTaskThread.cpp, -C example in example/ExternalTaskThread_c.c) +External thread usage in C++: +- full example in (example/ExternalTaskThread.cpp)[example/ExternalTaskThread.cpp] +- C example in (example/ExternalTaskThread_c.c)[example/ExternalTaskThread_c.c] ```C #include "TaskScheduler.h" @@ -221,13 +232,14 @@ int main(int argc, const char * argv[]) { enkiTaskSet* pTask; g_pTS = enkiNewTaskScheduler(); enkiInitTaskScheduler( g_pTS ); - + // create a task, can re-use this to get allocation occurring on startup - pTask = enkiCreateTaskSet( g_pTS, ParalleTaskSetFunc ); + pTask = enkiCreateTaskSet( g_pTS, ParalleTaskSetFunc ); enkiAddTaskSetToPipe( g_pTS, pTask, NULL, 1); // NULL args, setsize of 1 - // wait for task set (running tasks if they exist) - since we've just added it and it has no range we'll likely run it. + // wait for task set (running tasks if they exist) + // since we've just added it and it has no range we'll likely run it. enkiWaitForTaskSet( g_pTS, pTask ); enkiDeleteTaskSet( pTask ); From b247671af380dd015b56cfa732cce22eb490481f Mon Sep 17 00:00:00 2001 From: Doug Binks Date: Fri, 25 Oct 2019 16:23:01 +0100 Subject: [PATCH 18/19] Readme improvements fix --- README.md | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index 444bf648..5c27bc44 100644 --- a/README.md +++ b/README.md @@ -50,8 +50,8 @@ For cmake, on Windows / Mac OS X / Linux with cmake installed, open a prompt in ## Usage C++ usage: -- full example in (example/ParallelSum.cpp)[example/ParallelSum.cpp] -- C example in (example/ParallelSum_c.c)[example/ParallelSum_c.c] +- full example in [example/ParallelSum.cpp](example/ParallelSum.cpp) +- C example in [example/ParallelSum_c.c](example/ParallelSum_c.c) ```C #include "TaskScheduler.h" @@ -77,7 +77,7 @@ int main(int argc, const char * argv[]) { ``` C++ 11 lambda usage: -- full example in (example/LambdaTask.cpp)[example/LambdaTask.cpp] +- full example in [example/LambdaTask.cpp](example/LambdaTask.cpp) ```C #include "TaskScheduler.h" @@ -97,8 +97,8 @@ int main(int argc, const char * argv[]) { ``` Task priorities usage in C++: -- full example in (example/Priorities.cpp)[example/Priorities.cpp] -- C example in (example/Priorities_c.c)[example/Priorities_c.c] +- full example in [example/Priorities.cpp](example/Priorities.cpp) +- C example in [example/Priorities_c.c](example/Priorities_c.c) ```C // See full example in Priorities.cpp #include "TaskScheduler.h" @@ -144,8 +144,8 @@ int main(int argc, const char * argv[]) ``` Pinned Tasks usage in C++: -- full example in (example/PinnedTask.cpp)[example/PinnedTask.cpp] -- C example in (example/PinnedTask_c.c)[example/PinnedTask_c.c] +- full example in [example/PinnedTask.cpp](example/PinnedTask.cpp) +- C example in [example/PinnedTask_c.c](example/PinnedTask_c.c) ```C #include "TaskScheduler.h" @@ -175,8 +175,8 @@ int main(int argc, const char * argv[]) { ``` External thread usage in C++: -- full example in (example/ExternalTaskThread.cpp)[example/ExternalTaskThread.cpp] -- C example in (example/ExternalTaskThread_c.c)[example/ExternalTaskThread_c.c] +- full example in [example/ExternalTaskThread.cpp](example/ExternalTaskThread.cpp) +- C example in [example/ExternalTaskThread_c.c](example/ExternalTaskThread_c.c) ```C #include "TaskScheduler.h" From ba0ded56d4c1a4957dff7f14e89dc59ef636cd30 Mon Sep 17 00:00:00 2001 From: Doug Binks Date: Mon, 28 Oct 2019 
09:52:32 +0000
Subject: [PATCH 19/19] Improved comment documentation for set size and min range to assist issues like #42

---
 src/TaskScheduler.h | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/src/TaskScheduler.h b/src/TaskScheduler.h
index 0bbc7b9e..e1d7d483 100644
--- a/src/TaskScheduler.h
+++ b/src/TaskScheduler.h
@@ -133,10 +133,13 @@ namespace enki
        // is to allow per-thread data buckets for output.
        virtual void            ExecuteRange( TaskSetPartition range_, uint32_t threadnum_ ) = 0;
 
-       // Size of set - usually the number of data items to be processed, see ExecuteRange. Defaults to 1
+       // Set Size - usually the number of data items to be processed, see ExecuteRange. Defaults to 1
        uint32_t                m_SetSize;
 
-       // Minimum size of of TaskSetPartition range when splitting a task set into partitions.
+       // Min Range - Minimum size of a TaskSetPartition range when splitting a task set into partitions.
+       // Designed to reduce scheduling overhead by preventing the set being
+       // divided up too small. Ranges passed to ExecuteRange will *not* be a multiple of this;
+       // the scheduler only attempts to deliver range sizes larger than this most of the time.
        // This should be set to a value which results in computation effort of at least 10k
        // clock cycles to minimize task scheduler overhead.
        // NOTE: The last partition will be smaller than m_MinRange if m_SetSize is not a multiple
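
A minimal sketch of how the m_SetSize / m_MinRange semantics documented in this patch are typically used, assuming only the enki::TaskScheduler API shown earlier in this series; the SumTaskSet type, the 1024-item grain size and the per-thread partial-sum buffer are illustrative choices, not part of the patches.

```C
#include "TaskScheduler.h"
#include <stdint.h>
#include <vector>

enki::TaskScheduler g_TS;

// Sum 1M integers. m_MinRange asks the scheduler not to hand out partitions
// smaller than 1024 items, so each ExecuteRange call does enough work
// (on the order of 10k+ clock cycles) to amortize task scheduling overhead.
struct SumTaskSet : enki::ITaskSet
{
    uint64_t* m_pPartialSums; // one bucket per task thread (illustrative output)

    SumTaskSet( uint32_t setSize_, uint64_t* pPartialSums_ )
        : enki::ITaskSet( setSize_ ), m_pPartialSums( pPartialSums_ )
    {
        m_MinRange = 1024; // grain size; the last partition may be smaller
    }

    virtual void ExecuteRange( enki::TaskSetPartition range_, uint32_t threadnum_ )
    {
        // range_.end - range_.start is usually >= m_MinRange, except for the final partition
        for( uint32_t i = range_.start; i < range_.end; ++i )
        {
            m_pPartialSums[ threadnum_ ] += i;
        }
    }
};

int main(int argc, const char * argv[])
{
    g_TS.Initialize();
    std::vector<uint64_t> partialSums( g_TS.GetNumTaskThreads(), 0 );

    SumTaskSet task( 1024 * 1024, partialSums.data() );
    g_TS.AddTaskSetToPipe( &task );
    g_TS.WaitforTask( &task );

    // partial sums can now be combined on the calling thread
    return 0;
}
```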