Skip to content

Commit

Permalink
Revert "Refactor harmonizer in actorsystem" (#12254)
Browse files Browse the repository at this point in the history
  • Loading branch information
kruall authored Dec 3, 2024
1 parent 5c8e3e6 commit 88daf44
Show file tree
Hide file tree
Showing 32 changed files with 1,003 additions and 2,234 deletions.
10 changes: 6 additions & 4 deletions ydb/library/actors/core/cpu_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ namespace NActors {
poolsWithSharedThreads.push_back(cfg.PoolId);
}
}
Shared.reset(CreateSharedExecutorPool(Config.Shared, ExecutorPoolCount, poolsWithSharedThreads));
auto sharedPool = static_cast<ISharedExecutorPool*>(Shared.get());
Shared.reset(new TSharedExecutorPool(Config.Shared, ExecutorPoolCount, poolsWithSharedThreads));
auto sharedPool = static_cast<TSharedExecutorPool*>(Shared.get());

ui64 ts = GetCycleCountFast();
Harmonizer.reset(MakeHarmonizer(ts));
Expand Down Expand Up @@ -135,9 +135,11 @@ namespace NActors {
for (TBasicExecutorPoolConfig& cfg : Config.Basic) {
if (cfg.PoolId == poolId) {
if (cfg.HasSharedThread) {
auto *sharedPool = Shared.get();
auto *sharedPool = static_cast<TSharedExecutorPool*>(Shared.get());
auto *pool = new TBasicExecutorPool(cfg, Harmonizer.get(), Jail.get());
pool->AddSharedThread(sharedPool->GetSharedThread(poolId));
if (pool) {
pool->AddSharedThread(sharedPool->GetSharedThread(poolId));
}
return pool;
} else {
return new TBasicExecutorPool(cfg, Harmonizer.get(), Jail.get());
Expand Down
4 changes: 2 additions & 2 deletions ydb/library/actors/core/cpu_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@

#include "config.h"
#include "executor_pool_jail.h"
#include "harmonizer.h"
#include "executor_pool.h"
#include "executor_pool_shared.h"
#include "mon_stats.h"
#include <ydb/library/actors/core/harmonizer/harmonizer.h>
#include <memory>

namespace NActors {
Expand All @@ -16,7 +16,7 @@ namespace NActors {
const ui32 ExecutorPoolCount;
TArrayHolder<TAutoPtr<IExecutorPool>> Executors;
std::unique_ptr<IHarmonizer> Harmonizer;
std::unique_ptr<ISharedExecutorPool> Shared;
std::unique_ptr<TSharedExecutorPool> Shared;
std::unique_ptr<TExecutorPoolJail> Jail;
TCpuManagerConfig Config;

Expand Down
19 changes: 4 additions & 15 deletions ydb/library/actors/core/executor_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,15 @@ namespace NActors {
struct TExecutorThreadStats;
class TExecutorPoolJail;
class ISchedulerCookie;
struct TSharedExecutorThreadCtx;

struct TCpuConsumption {
double CpuUs = 0;
double ElapsedUs = 0;
double ConsumedUs = 0;
double BookedUs = 0;
ui64 NotEnoughCpuExecutions = 0;

void Add(const TCpuConsumption& other) {
CpuUs += other.CpuUs;
ElapsedUs += other.ElapsedUs;
ConsumedUs += other.ConsumedUs;
BookedUs += other.BookedUs;
NotEnoughCpuExecutions += other.NotEnoughCpuExecutions;
}
};
Expand Down Expand Up @@ -177,16 +176,6 @@ namespace NActors {
return 1;
}

virtual TSharedExecutorThreadCtx* ReleaseSharedThread() {
return nullptr;
}
virtual void AddSharedThread(TSharedExecutorThreadCtx*) {
}

virtual bool IsSharedThreadEnabled() const {
return false;
}

virtual TCpuConsumption GetThreadCpuConsumption(i16 threadIdx) {
Y_UNUSED(threadIdx);
return TCpuConsumption{0.0, 0.0};
Expand Down
1 change: 0 additions & 1 deletion ydb/library/actors/core/executor_pool_base.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#include "actorsystem.h"
#include "activity_guard.h"
#include "actor.h"
#include "executor_pool_base.h"
#include "executor_pool_basic_feature_flags.h"
Expand Down
12 changes: 6 additions & 6 deletions ydb/library/actors/core/executor_pool_basic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -407,10 +407,10 @@ namespace NActors {
poolStats.DecreasingThreadsByHoggishState = stats.DecreasingThreadsByHoggishState;
poolStats.DecreasingThreadsByExchange = stats.DecreasingThreadsByExchange;
poolStats.PotentialMaxThreadCount = stats.PotentialMaxThreadCount;
poolStats.MaxCpuUs = stats.MaxCpuUs;
poolStats.MinCpuUs = stats.MinCpuUs;
poolStats.MaxElapsedUs = stats.MaxElapsedUs;
poolStats.MinElapsedUs = stats.MinElapsedUs;
poolStats.MaxConsumedCpuUs = stats.MaxConsumedCpu;
poolStats.MinConsumedCpuUs = stats.MinConsumedCpu;
poolStats.MaxBookedCpuUs = stats.MaxBookedCpu;
poolStats.MinBookedCpuUs = stats.MinBookedCpu;
}

statsCopy.resize(MaxFullThreadCount + 1);
Expand All @@ -429,7 +429,7 @@ namespace NActors {
void TBasicExecutorPool::GetExecutorPoolState(TExecutorPoolState &poolState) const {
if (Harmonizer) {
TPoolHarmonizerStats stats = Harmonizer->GetPoolStats(PoolId);
poolState.UsedCpu = stats.AvgElapsedUs;
poolState.UsedCpu = stats.AvgConsumedCpu;
poolState.PossibleMaxLimit = stats.PotentialMaxThreadCount;
} else {
poolState.PossibleMaxLimit = poolState.MaxLimit;
Expand Down Expand Up @@ -625,7 +625,7 @@ namespace NActors {
TExecutorThreadCtx& threadCtx = Threads[threadIdx];
TExecutorThreadStats stats;
threadCtx.Thread->GetCurrentStatsForHarmonizer(stats);
return {static_cast<double>(stats.CpuUs), Ts2Us(stats.SafeElapsedTicks), stats.NotEnoughCpuExecutions};
return {Ts2Us(stats.SafeElapsedTicks), static_cast<double>(stats.CpuUs), stats.NotEnoughCpuExecutions};
}

i16 TBasicExecutorPool::GetBlockingThreadCount() const {
Expand Down
9 changes: 3 additions & 6 deletions ydb/library/actors/core/executor_pool_basic.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
#include "executor_pool_basic_feature_flags.h"
#include "scheduler_queue.h"
#include "executor_pool_base.h"
#include "harmonizer.h"
#include <memory>
#include <ydb/library/actors/core/harmonizer/harmonizer.h>
#include <ydb/library/actors/actor_type/indexes.h>
#include <ydb/library/actors/util/unordered_cache.h>
#include <ydb/library/actors/util/threadparkpad.h>
Expand Down Expand Up @@ -278,11 +278,8 @@ namespace NActors {
void CalcSpinPerThread(ui64 wakingUpConsumption);
void ClearWaitingStats() const;

TSharedExecutorThreadCtx* ReleaseSharedThread() override;
void AddSharedThread(TSharedExecutorThreadCtx* thread) override;
bool IsSharedThreadEnabled() const override {
return true;
}
TSharedExecutorThreadCtx* ReleaseSharedThread();
void AddSharedThread(TSharedExecutorThreadCtx* thread);

private:
void AskToGoToSleep(bool *needToWait, bool *needToBlock);
Expand Down
2 changes: 1 addition & 1 deletion ydb/library/actors/core/executor_pool_io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ namespace NActors {
void TIOExecutorPool::GetExecutorPoolState(TExecutorPoolState &poolState) const {
if (Harmonizer) {
TPoolHarmonizerStats stats = Harmonizer->GetPoolStats(PoolId);
poolState.UsedCpu = stats.AvgElapsedUs;
poolState.UsedCpu = stats.AvgConsumedCpu;
}
poolState.CurrentLimit = PoolThreads;
poolState.MaxLimit = PoolThreads;
Expand Down
2 changes: 1 addition & 1 deletion ydb/library/actors/core/executor_pool_io.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
#include "actorsystem.h"
#include "executor_thread.h"
#include "executor_thread_ctx.h"
#include "harmonizer.h"
#include "scheduler_queue.h"
#include "executor_pool_base.h"
#include <ydb/library/actors/core/harmonizer/harmonizer.h>
#include <ydb/library/actors/actor_type/indexes.h>
#include <ydb/library/actors/util/ticket_lock.h>
#include <ydb/library/actors/util/unordered_cache.h>
Expand Down
158 changes: 37 additions & 121 deletions ydb/library/actors/core/executor_pool_shared.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,50 +12,6 @@

namespace NActors {

class TSharedExecutorPool: public ISharedExecutorPool {
public:
TSharedExecutorPool(const TSharedExecutorPoolConfig &config, i16 poolCount, std::vector<i16> poolsWithThreads);

// IThreadPool
void Prepare(TActorSystem* actorSystem, NSchedulerQueue::TReader** scheduleReaders, ui32* scheduleSz) override;
void Start() override;
void PrepareStop() override;
void Shutdown() override;
bool Cleanup() override;

TSharedExecutorThreadCtx *GetSharedThread(i16 poolId) override;
void GetSharedStats(i16 pool, std::vector<TExecutorThreadStats>& statsCopy) override;
void GetSharedStatsForHarmonizer(i16 pool, std::vector<TExecutorThreadStats>& statsCopy) override;
TCpuConsumption GetThreadCpuConsumption(i16 poolId, i16 threadIdx) override;
std::vector<TCpuConsumption> GetThreadsCpuConsumption(i16 poolId) override;

i16 ReturnOwnHalfThread(i16 pool) override;
i16 ReturnBorrowedHalfThread(i16 pool) override;
void GiveHalfThread(i16 from, i16 to) override;

i16 GetSharedThreadCount() const override;

TSharedPoolState GetState() const override;

void Init(const std::vector<IExecutorPool*>& pools, bool withThreads) override;

private:
TSharedPoolState State;

std::vector<IExecutorPool*> Pools;

i16 PoolCount;
i16 SharedThreadCount;
std::unique_ptr<TSharedExecutorThreadCtx[]> Threads;

std::unique_ptr<NSchedulerQueue::TReader[]> ScheduleReaders;
std::unique_ptr<NSchedulerQueue::TWriter[]> ScheduleWriters;

TDuration TimePerMailbox;
ui32 EventsPerMailbox;
ui64 SoftProcessingDurationTs;
}; // class TSharedExecutorPool

TSharedExecutorPool::TSharedExecutorPool(const TSharedExecutorPoolConfig &config, i16 poolCount, std::vector<i16> poolsWithThreads)
: State(poolCount, poolsWithThreads.size())
, Pools(poolCount)
Expand All @@ -73,47 +29,40 @@ TSharedExecutorPool::TSharedExecutorPool(const TSharedExecutorPoolConfig &config
}
}

void TSharedExecutorPool::Init(const std::vector<IExecutorPool*>& pools, bool withThreads) {
std::vector<IExecutorPool*> poolByThread(SharedThreadCount);
for (IExecutorPool* pool : pools) {
Pools[pool->PoolId] = pool;
i16 threadIdx = State.ThreadByPool[pool->PoolId];
if (threadIdx >= 0) {
poolByThread[threadIdx] = pool;
void TSharedExecutorPool::Prepare(TActorSystem* actorSystem, NSchedulerQueue::TReader** scheduleReaders, ui32* scheduleSz) {
// ActorSystem = actorSystem;

ScheduleReaders.reset(new NSchedulerQueue::TReader[SharedThreadCount]);
ScheduleWriters.reset(new NSchedulerQueue::TWriter[SharedThreadCount]);

std::vector<IExecutorPool*> poolsBasic = actorSystem->GetBasicExecutorPools();
std::vector<IExecutorPool*> poolByThread(SharedThreadCount);
for (IExecutorPool* pool : poolsBasic) {
Pools[pool->PoolId] = dynamic_cast<TBasicExecutorPool*>(pool);
i16 threadIdx = State.ThreadByPool[pool->PoolId];
if (threadIdx >= 0) {
poolByThread[threadIdx] = pool;
}
}
}

for (i16 i = 0; i != SharedThreadCount; ++i) {
// !TODO
Threads[i].ExecutorPools[0].store(dynamic_cast<TBasicExecutorPool*>(poolByThread[i]), std::memory_order_release);
if (withThreads) {
for (i16 i = 0; i != SharedThreadCount; ++i) {
// !TODO
Threads[i].ExecutorPools[0].store(dynamic_cast<TBasicExecutorPool*>(poolByThread[i]), std::memory_order_release);
Threads[i].Thread.reset(
new TSharedExecutorThread(
-1,
nullptr,
&Threads[i],
PoolCount,
"SharedThread",
SoftProcessingDurationTs,
TimePerMailbox,
actorSystem,
&Threads[i],
PoolCount,
"SharedThread",
SoftProcessingDurationTs,
TimePerMailbox,
EventsPerMailbox));
ScheduleWriters[i].Init(ScheduleReaders[i]);
}
}
}

void TSharedExecutorPool::Prepare(TActorSystem* actorSystem, NSchedulerQueue::TReader** scheduleReaders, ui32* scheduleSz) {
ScheduleReaders.reset(new NSchedulerQueue::TReader[SharedThreadCount]);
ScheduleWriters.reset(new NSchedulerQueue::TWriter[SharedThreadCount]);

std::vector<IExecutorPool*> poolsBasic = actorSystem->GetBasicExecutorPools();
Init(poolsBasic, true);

for (i16 i = 0; i != SharedThreadCount; ++i) {
ScheduleWriters[i].Init(ScheduleReaders[i]);
}

*scheduleReaders = ScheduleReaders.get();
*scheduleSz = SharedThreadCount;
*scheduleReaders = ScheduleReaders.get();
*scheduleSz = SharedThreadCount;
}

void TSharedExecutorPool::Start() {
Expand Down Expand Up @@ -150,27 +99,24 @@ TSharedExecutorThreadCtx* TSharedExecutorPool::GetSharedThread(i16 pool) {
return &Threads[threadIdx];
}

i16 TSharedExecutorPool::ReturnOwnHalfThread(i16 pool) {
void TSharedExecutorPool::ReturnOwnHalfThread(i16 pool) {
i16 threadIdx = State.ThreadByPool[pool];
IExecutorPool* borrowingPool = Threads[threadIdx].ExecutorPools[1].exchange(nullptr, std::memory_order_acq_rel);
TBasicExecutorPool* borrowingPool = Threads[threadIdx].ExecutorPools[1].exchange(nullptr, std::memory_order_acq_rel);
Y_ABORT_UNLESS(borrowingPool);
i16 borrowedPool = State.PoolByBorrowedThread[threadIdx];
State.BorrowedThreadByPool[borrowedPool] = -1;
State.BorrowedThreadByPool[State.PoolByBorrowedThread[threadIdx]] = -1;
State.PoolByBorrowedThread[threadIdx] = -1;
// TODO(kruall): Check on race
borrowingPool->ReleaseSharedThread();
return borrowedPool;
}

i16 TSharedExecutorPool::ReturnBorrowedHalfThread(i16 pool) {
void TSharedExecutorPool::ReturnBorrowedHalfThread(i16 pool) {
i16 threadIdx = State.BorrowedThreadByPool[pool];
IExecutorPool* borrowingPool = Threads[threadIdx].ExecutorPools[1].exchange(nullptr, std::memory_order_acq_rel);
TBasicExecutorPool* borrowingPool = Threads[threadIdx].ExecutorPools[1].exchange(nullptr, std::memory_order_acq_rel);
Y_ABORT_UNLESS(borrowingPool);
State.BorrowedThreadByPool[State.PoolByBorrowedThread[threadIdx]] = -1;
State.PoolByBorrowedThread[threadIdx] = -1;
// TODO(kruall): Check on race
borrowingPool->ReleaseSharedThread();
return State.PoolByThread[threadIdx];
}

void TSharedExecutorPool::GiveHalfThread(i16 from, i16 to) {
Expand All @@ -181,14 +127,14 @@ void TSharedExecutorPool::GiveHalfThread(i16 from, i16 to) {
if (borrowedThreadIdx != -1) {
i16 originalPool = State.PoolByThread[borrowedThreadIdx];
if (originalPool == to) {
ReturnOwnHalfThread(to);
return ReturnOwnHalfThread(to);
} else {
ReturnOwnHalfThread(originalPool);
}
from = originalPool;
}
i16 threadIdx = State.ThreadByPool[from];
IExecutorPool* borrowingPool = Pools[to];
TBasicExecutorPool* borrowingPool = Pools[to];
Threads[threadIdx].ExecutorPools[1].store(borrowingPool, std::memory_order_release);
State.BorrowedThreadByPool[to] = threadIdx;
State.PoolByBorrowedThread[threadIdx] = to;
Expand All @@ -197,16 +143,16 @@ void TSharedExecutorPool::GiveHalfThread(i16 from, i16 to) {
}

void TSharedExecutorPool::GetSharedStats(i16 poolId, std::vector<TExecutorThreadStats>& statsCopy) {
statsCopy.resize(SharedThreadCount);
statsCopy.resize(SharedThreadCount + 1);
for (i16 i = 0; i < SharedThreadCount; ++i) {
Threads[i].Thread->GetSharedStats(poolId, statsCopy[i]);
Threads[i].Thread->GetSharedStats(poolId, statsCopy[i + 1]);
}
}

void TSharedExecutorPool::GetSharedStatsForHarmonizer(i16 poolId, std::vector<TExecutorThreadStats>& statsCopy) {
statsCopy.resize(SharedThreadCount);
statsCopy.resize(SharedThreadCount + 1);
for (i16 i = 0; i < SharedThreadCount; ++i) {
Threads[i].Thread->GetSharedStatsForHarmonizer(poolId, statsCopy[i]);
Threads[i].Thread->GetSharedStatsForHarmonizer(poolId, statsCopy[i + 1]);
}
}

Expand Down Expand Up @@ -235,34 +181,4 @@ TSharedPoolState TSharedExecutorPool::GetState() const {
return State;
}

ISharedExecutorPool *CreateSharedExecutorPool(const TSharedExecutorPoolConfig &config, i16 poolCount, std::vector<i16> poolsWithThreads) {
return new TSharedExecutorPool(config, poolCount, poolsWithThreads);
}

TString TSharedPoolState::ToString() const {
TStringBuilder builder;
builder << '{';
builder << "ThreadByPool: [";
for (ui32 i = 0; i < ThreadByPool.size(); ++i) {
builder << ThreadByPool[i] << (i == ThreadByPool.size() - 1 ? "" : ", ");
}
builder << "], ";
builder << "PoolByThread: [";
for (ui32 i = 0; i < PoolByThread.size(); ++i) {
builder << PoolByThread[i] << (i == PoolByThread.size() - 1 ? "" : ", ");
}
builder << "], ";
builder << "BorrowedThreadByPool: [";
for (ui32 i = 0; i < BorrowedThreadByPool.size(); ++i) {
builder << BorrowedThreadByPool[i] << (i == BorrowedThreadByPool.size() - 1 ? "" : ", ");
}
builder << "], ";
builder << "PoolByBorrowedThread: [";
for (ui32 i = 0; i < PoolByBorrowedThread.size(); ++i) {
builder << PoolByBorrowedThread[i] << (i == PoolByBorrowedThread.size() - 1 ? "" : ", ");
}
builder << ']';
return builder << '}';
}

}
Loading

0 comments on commit 88daf44

Please sign in to comment.