Skip to content

Commit

Permalink
Improve JobSystem
Browse files Browse the repository at this point in the history
  • Loading branch information
halx99 committed Jun 7, 2024
1 parent 92143c0 commit 292125f
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 60 deletions.
2 changes: 1 addition & 1 deletion core/base/Director.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ bool Director::init()
_lastUpdate = std::chrono::steady_clock::now();

auto concurrency = Configuration::getInstance()->getValue("axmol.concurrency", Value{-1}).asInt();
_jobSystem = JobSystem::create(concurrency);
_jobSystem = new JobSystem(concurrency);

#ifdef AX_ENABLE_CONSOLE
_console = new Console();
Expand Down
109 changes: 55 additions & 54 deletions core/base/JobSystem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,105 +105,106 @@ class JobExecutor

#pragma region JobSystem

JobSystem* JobSystem::create(int nthread)
static int clampThreads(int nThreads)
{
auto inst = new JobSystem();
inst->start(nthread);
return inst;
}

void JobSystem::destroy(JobSystem* inst)
{
if (inst)
if (nThreads <= 0)
{
inst->stop();
delete inst;
#if !defined(__EMSCRIPTEN__) || defined(__EMSCRIPTEN_PTHREADS__)
# if !defined(__EMSCRIPTEN__)
nThreads = (std::max)(static_cast<int>(std::thread::hardware_concurrency() * 3 / 2), 2);
# else
nThreads = (std::clamp)(static_cast<int>(std::thread::hardware_concurrency()), 2, 8);
# endif
#else
AXLOGW("The emscripten pthread not enabled, JobSystem not working");
nThreads = 0;
#endif
}
}

JobSystem* JobSystem::create(std::span<std::shared_ptr<JobThreadData>> tdds)
{
if (!tdds.empty())
{
auto inst = new JobSystem();
inst->start(tdds);
return inst;
}
return nullptr;
return nThreads;
}

JobSystem::~JobSystem()
class MainThreadData : public JobThreadData
{
stop();
}
public:
const char* name() override { return "axmol-main"; }
};

// Call at task collect thread
void JobSystem::start(int nThreads)
JobSystem::JobSystem(int nThreads)
{
if (_executor)
return;

if (nThreads < 0)
{
#if !defined(__EMSCRIPTEN_PTHREADS__)
nThreads = (std::max)(static_cast<int>(std::thread::hardware_concurrency() * 3 / 2), 1);
#else
nThreads = 4;
#endif
}
nThreads = clampThreads(nThreads);
std::vector<std::shared_ptr<JobThreadData>> tdds;
for (auto i = 0; i < nThreads; ++i)
tdds.emplace_back(std::make_shared<JobThreadData>());

_executor = new JobExecutor(tdds);
init(tdds);
}

void JobSystem::start(std::span<std::shared_ptr<JobThreadData>> tdds)
JobSystem::JobSystem(std::span<std::shared_ptr<JobThreadData>> tdds)
{
if (_executor)
return;
init(tdds);
}

_executor = new JobExecutor(tdds);
void JobSystem::init(const std::span<std::shared_ptr<JobThreadData>>& tdds)
{
_mainThreadData = new MainThreadData();
if (!tdds.empty())
_executor = new JobExecutor(tdds);
}

// Call at task collect thread
void JobSystem::stop()
JobSystem::~JobSystem()
{
if (_executor != nullptr)
{
if (_executor)
delete _executor;
}
delete _mainThreadData;
}

void JobSystem::enqueue_v(std::function<void(JobThreadData*)> task)
{
_executor->enqueue_v(std::move(task));
if (_executor)
_executor->enqueue_v(std::move(task));
else
task(_mainThreadData);
}

void JobSystem::enqueue(std::function<void()> task)
{
_executor->enqueue_v([task_ = std::move(task)](JobThreadData*) { task_(); });
if (_executor)
this->enqueue(task, nullptr);
else
task();
}

void JobSystem::enqueue(std::shared_ptr<JobThreadTask> task)
{
_executor->enqueue_v([task](JobThreadData* thread_data) {
auto taskw = [task](JobThreadData* thread_data) {
if (!task->isRequestCancel())
{
task->setThreadData(thread_data);
task->setState(JobThreadTask::State::Inprogress);
task->execute();
task->setState(JobThreadTask::State::Idle);
}
});
};
if (_executor)
_executor->enqueue_v(std::move(taskw));
else
taskw(_mainThreadData);
}

void JobSystem::enqueue(std::function<void()> task, std::function<void()> done)
{
_executor->enqueue_v([task_ = std::move(task), done_ = std::move(done)](JobThreadData*) {
if (!task)
return;
auto taskw = [task_ = std::move(task), done_ = std::move(done)](JobThreadData*) {
task_();
Director::getInstance()->getScheduler()->runOnAxmolThread(done_);
});
if (done_)
Director::getInstance()->getScheduler()->runOnAxmolThread(done_);
};
if (_executor)
_executor->enqueue_v(taskw);
else
taskw(_mainThreadData);
}

#pragma endregion
Expand Down
11 changes: 6 additions & 5 deletions core/base/JobSystem.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,21 +96,22 @@ class AX_API JobSystem
static JobSystem* create(std::span<std::shared_ptr<JobThreadData>> tdds);
static void destroy(JobSystem* system);

JobSystem(int nThreads = -1);
JobSystem(std::span<std::shared_ptr<JobThreadData>> tdds);
~JobSystem();

void start(int nThreads = -1);
void start(std::span<std::shared_ptr<JobThreadData>> tdds);

void stop();

void enqueue_v(std::function<void(JobThreadData*)> task);

void enqueue(std::function<void()> task);
void enqueue(std::function<void()> task, std::function<void()> done);
void enqueue(std::shared_ptr<JobThreadTask> task);

protected:
void init(const std::span<std::shared_ptr<JobThreadData>>& tdds);

private:
JobExecutor* _executor{nullptr};
JobThreadData* _mainThreadData{nullptr};
};

NS_AX_END

0 comments on commit 292125f

Please sign in to comment.