From 88daf44f9da55b4b67c13eba5a1afec31dc7f4f2 Mon Sep 17 00:00:00 2001 From: kruall Date: Tue, 3 Dec 2024 20:41:00 +0300 Subject: [PATCH] Revert "Refactor harmonizer in actorsystem" (#12254) --- ydb/library/actors/core/cpu_manager.cpp | 10 +- ydb/library/actors/core/cpu_manager.h | 4 +- ydb/library/actors/core/executor_pool.h | 19 +- .../actors/core/executor_pool_base.cpp | 1 - .../actors/core/executor_pool_basic.cpp | 12 +- ydb/library/actors/core/executor_pool_basic.h | 9 +- ydb/library/actors/core/executor_pool_io.cpp | 2 +- ydb/library/actors/core/executor_pool_io.h | 2 +- .../actors/core/executor_pool_shared.cpp | 158 +--- .../actors/core/executor_pool_shared.h | 56 +- ydb/library/actors/core/executor_thread_ctx.h | 7 +- ydb/library/actors/core/harmonizer.cpp | 861 ++++++++++++++++++ .../actors/core/{harmonizer => }/harmonizer.h | 36 +- .../core/harmonizer/cpu_consumption.cpp | 171 ---- .../actors/core/harmonizer/cpu_consumption.h | 45 - ydb/library/actors/core/harmonizer/debug.h | 23 - ydb/library/actors/core/harmonizer/defs.h | 3 - .../actors/core/harmonizer/harmonizer.cpp | 485 ---------- ydb/library/actors/core/harmonizer/history.h | 149 --- ydb/library/actors/core/harmonizer/pool.cpp | 149 --- ydb/library/actors/core/harmonizer/pool.h | 93 -- .../actors/core/harmonizer/shared_info.cpp | 55 -- .../actors/core/harmonizer/shared_info.h | 20 - .../core/harmonizer/ut/harmonizer_ut.cpp | 673 -------------- ydb/library/actors/core/harmonizer/ut/ya.make | 22 - .../actors/core/harmonizer/waiting_stats.cpp | 48 - .../actors/core/harmonizer/waiting_stats.h | 21 - ydb/library/actors/core/harmonizer/ya.make | 44 - ydb/library/actors/core/mon_stats.h | 8 +- ydb/library/actors/core/probes.h | 4 +- ydb/library/actors/core/ya.make | 7 +- .../actors/helpers/pool_stats_collector.h | 40 +- 32 files changed, 1003 insertions(+), 2234 deletions(-) create mode 100644 ydb/library/actors/core/harmonizer.cpp rename ydb/library/actors/core/{harmonizer => }/harmonizer.h (60%) delete mode 100644 ydb/library/actors/core/harmonizer/cpu_consumption.cpp delete mode 100644 ydb/library/actors/core/harmonizer/cpu_consumption.h delete mode 100644 ydb/library/actors/core/harmonizer/debug.h delete mode 100644 ydb/library/actors/core/harmonizer/defs.h delete mode 100644 ydb/library/actors/core/harmonizer/harmonizer.cpp delete mode 100644 ydb/library/actors/core/harmonizer/history.h delete mode 100644 ydb/library/actors/core/harmonizer/pool.cpp delete mode 100644 ydb/library/actors/core/harmonizer/pool.h delete mode 100644 ydb/library/actors/core/harmonizer/shared_info.cpp delete mode 100644 ydb/library/actors/core/harmonizer/shared_info.h delete mode 100644 ydb/library/actors/core/harmonizer/ut/harmonizer_ut.cpp delete mode 100644 ydb/library/actors/core/harmonizer/ut/ya.make delete mode 100644 ydb/library/actors/core/harmonizer/waiting_stats.cpp delete mode 100644 ydb/library/actors/core/harmonizer/waiting_stats.h delete mode 100644 ydb/library/actors/core/harmonizer/ya.make diff --git a/ydb/library/actors/core/cpu_manager.cpp b/ydb/library/actors/core/cpu_manager.cpp index 047388ce9069..1c286d9bd79b 100644 --- a/ydb/library/actors/core/cpu_manager.cpp +++ b/ydb/library/actors/core/cpu_manager.cpp @@ -37,8 +37,8 @@ namespace NActors { poolsWithSharedThreads.push_back(cfg.PoolId); } } - Shared.reset(CreateSharedExecutorPool(Config.Shared, ExecutorPoolCount, poolsWithSharedThreads)); - auto sharedPool = static_cast(Shared.get()); + Shared.reset(new TSharedExecutorPool(Config.Shared, ExecutorPoolCount, 
poolsWithSharedThreads)); + auto sharedPool = static_cast(Shared.get()); ui64 ts = GetCycleCountFast(); Harmonizer.reset(MakeHarmonizer(ts)); @@ -135,9 +135,11 @@ namespace NActors { for (TBasicExecutorPoolConfig& cfg : Config.Basic) { if (cfg.PoolId == poolId) { if (cfg.HasSharedThread) { - auto *sharedPool = Shared.get(); + auto *sharedPool = static_cast(Shared.get()); auto *pool = new TBasicExecutorPool(cfg, Harmonizer.get(), Jail.get()); - pool->AddSharedThread(sharedPool->GetSharedThread(poolId)); + if (pool) { + pool->AddSharedThread(sharedPool->GetSharedThread(poolId)); + } return pool; } else { return new TBasicExecutorPool(cfg, Harmonizer.get(), Jail.get()); diff --git a/ydb/library/actors/core/cpu_manager.h b/ydb/library/actors/core/cpu_manager.h index 0bd0994f4b61..1fee9467987d 100644 --- a/ydb/library/actors/core/cpu_manager.h +++ b/ydb/library/actors/core/cpu_manager.h @@ -2,10 +2,10 @@ #include "config.h" #include "executor_pool_jail.h" +#include "harmonizer.h" #include "executor_pool.h" #include "executor_pool_shared.h" #include "mon_stats.h" -#include #include namespace NActors { @@ -16,7 +16,7 @@ namespace NActors { const ui32 ExecutorPoolCount; TArrayHolder> Executors; std::unique_ptr Harmonizer; - std::unique_ptr Shared; + std::unique_ptr Shared; std::unique_ptr Jail; TCpuManagerConfig Config; diff --git a/ydb/library/actors/core/executor_pool.h b/ydb/library/actors/core/executor_pool.h index 8fd791133912..67297b428ba4 100644 --- a/ydb/library/actors/core/executor_pool.h +++ b/ydb/library/actors/core/executor_pool.h @@ -14,16 +14,15 @@ namespace NActors { struct TExecutorThreadStats; class TExecutorPoolJail; class ISchedulerCookie; - struct TSharedExecutorThreadCtx; struct TCpuConsumption { - double CpuUs = 0; - double ElapsedUs = 0; + double ConsumedUs = 0; + double BookedUs = 0; ui64 NotEnoughCpuExecutions = 0; void Add(const TCpuConsumption& other) { - CpuUs += other.CpuUs; - ElapsedUs += other.ElapsedUs; + ConsumedUs += other.ConsumedUs; + BookedUs += other.BookedUs; NotEnoughCpuExecutions += other.NotEnoughCpuExecutions; } }; @@ -177,16 +176,6 @@ namespace NActors { return 1; } - virtual TSharedExecutorThreadCtx* ReleaseSharedThread() { - return nullptr; - } - virtual void AddSharedThread(TSharedExecutorThreadCtx*) { - } - - virtual bool IsSharedThreadEnabled() const { - return false; - } - virtual TCpuConsumption GetThreadCpuConsumption(i16 threadIdx) { Y_UNUSED(threadIdx); return TCpuConsumption{0.0, 0.0}; diff --git a/ydb/library/actors/core/executor_pool_base.cpp b/ydb/library/actors/core/executor_pool_base.cpp index e8e504abe201..fe41e9462330 100644 --- a/ydb/library/actors/core/executor_pool_base.cpp +++ b/ydb/library/actors/core/executor_pool_base.cpp @@ -1,5 +1,4 @@ #include "actorsystem.h" -#include "activity_guard.h" #include "actor.h" #include "executor_pool_base.h" #include "executor_pool_basic_feature_flags.h" diff --git a/ydb/library/actors/core/executor_pool_basic.cpp b/ydb/library/actors/core/executor_pool_basic.cpp index dbe3e8987720..bfb72f02b360 100644 --- a/ydb/library/actors/core/executor_pool_basic.cpp +++ b/ydb/library/actors/core/executor_pool_basic.cpp @@ -407,10 +407,10 @@ namespace NActors { poolStats.DecreasingThreadsByHoggishState = stats.DecreasingThreadsByHoggishState; poolStats.DecreasingThreadsByExchange = stats.DecreasingThreadsByExchange; poolStats.PotentialMaxThreadCount = stats.PotentialMaxThreadCount; - poolStats.MaxCpuUs = stats.MaxCpuUs; - poolStats.MinCpuUs = stats.MinCpuUs; - poolStats.MaxElapsedUs = stats.MaxElapsedUs; - 
poolStats.MinElapsedUs = stats.MinElapsedUs; + poolStats.MaxConsumedCpuUs = stats.MaxConsumedCpu; + poolStats.MinConsumedCpuUs = stats.MinConsumedCpu; + poolStats.MaxBookedCpuUs = stats.MaxBookedCpu; + poolStats.MinBookedCpuUs = stats.MinBookedCpu; } statsCopy.resize(MaxFullThreadCount + 1); @@ -429,7 +429,7 @@ namespace NActors { void TBasicExecutorPool::GetExecutorPoolState(TExecutorPoolState &poolState) const { if (Harmonizer) { TPoolHarmonizerStats stats = Harmonizer->GetPoolStats(PoolId); - poolState.UsedCpu = stats.AvgElapsedUs; + poolState.UsedCpu = stats.AvgConsumedCpu; poolState.PossibleMaxLimit = stats.PotentialMaxThreadCount; } else { poolState.PossibleMaxLimit = poolState.MaxLimit; @@ -625,7 +625,7 @@ namespace NActors { TExecutorThreadCtx& threadCtx = Threads[threadIdx]; TExecutorThreadStats stats; threadCtx.Thread->GetCurrentStatsForHarmonizer(stats); - return {static_cast(stats.CpuUs), Ts2Us(stats.SafeElapsedTicks), stats.NotEnoughCpuExecutions}; + return {Ts2Us(stats.SafeElapsedTicks), static_cast(stats.CpuUs), stats.NotEnoughCpuExecutions}; } i16 TBasicExecutorPool::GetBlockingThreadCount() const { diff --git a/ydb/library/actors/core/executor_pool_basic.h b/ydb/library/actors/core/executor_pool_basic.h index 769099975199..c60e2ab8334b 100644 --- a/ydb/library/actors/core/executor_pool_basic.h +++ b/ydb/library/actors/core/executor_pool_basic.h @@ -7,8 +7,8 @@ #include "executor_pool_basic_feature_flags.h" #include "scheduler_queue.h" #include "executor_pool_base.h" +#include "harmonizer.h" #include -#include #include #include #include @@ -278,11 +278,8 @@ namespace NActors { void CalcSpinPerThread(ui64 wakingUpConsumption); void ClearWaitingStats() const; - TSharedExecutorThreadCtx* ReleaseSharedThread() override; - void AddSharedThread(TSharedExecutorThreadCtx* thread) override; - bool IsSharedThreadEnabled() const override { - return true; - } + TSharedExecutorThreadCtx* ReleaseSharedThread(); + void AddSharedThread(TSharedExecutorThreadCtx* thread); private: void AskToGoToSleep(bool *needToWait, bool *needToBlock); diff --git a/ydb/library/actors/core/executor_pool_io.cpp b/ydb/library/actors/core/executor_pool_io.cpp index 089f7057f9a6..310a6f83b359 100644 --- a/ydb/library/actors/core/executor_pool_io.cpp +++ b/ydb/library/actors/core/executor_pool_io.cpp @@ -156,7 +156,7 @@ namespace NActors { void TIOExecutorPool::GetExecutorPoolState(TExecutorPoolState &poolState) const { if (Harmonizer) { TPoolHarmonizerStats stats = Harmonizer->GetPoolStats(PoolId); - poolState.UsedCpu = stats.AvgElapsedUs; + poolState.UsedCpu = stats.AvgConsumedCpu; } poolState.CurrentLimit = PoolThreads; poolState.MaxLimit = PoolThreads; diff --git a/ydb/library/actors/core/executor_pool_io.h b/ydb/library/actors/core/executor_pool_io.h index 16b17127d4c6..316b21d4d2f7 100644 --- a/ydb/library/actors/core/executor_pool_io.h +++ b/ydb/library/actors/core/executor_pool_io.h @@ -3,9 +3,9 @@ #include "actorsystem.h" #include "executor_thread.h" #include "executor_thread_ctx.h" +#include "harmonizer.h" #include "scheduler_queue.h" #include "executor_pool_base.h" -#include #include #include #include diff --git a/ydb/library/actors/core/executor_pool_shared.cpp b/ydb/library/actors/core/executor_pool_shared.cpp index 7d1327f8454e..104e02812cc5 100644 --- a/ydb/library/actors/core/executor_pool_shared.cpp +++ b/ydb/library/actors/core/executor_pool_shared.cpp @@ -12,50 +12,6 @@ namespace NActors { -class TSharedExecutorPool: public ISharedExecutorPool { -public: - TSharedExecutorPool(const 
TSharedExecutorPoolConfig &config, i16 poolCount, std::vector poolsWithThreads); - - // IThreadPool - void Prepare(TActorSystem* actorSystem, NSchedulerQueue::TReader** scheduleReaders, ui32* scheduleSz) override; - void Start() override; - void PrepareStop() override; - void Shutdown() override; - bool Cleanup() override; - - TSharedExecutorThreadCtx *GetSharedThread(i16 poolId) override; - void GetSharedStats(i16 pool, std::vector& statsCopy) override; - void GetSharedStatsForHarmonizer(i16 pool, std::vector& statsCopy) override; - TCpuConsumption GetThreadCpuConsumption(i16 poolId, i16 threadIdx) override; - std::vector GetThreadsCpuConsumption(i16 poolId) override; - - i16 ReturnOwnHalfThread(i16 pool) override; - i16 ReturnBorrowedHalfThread(i16 pool) override; - void GiveHalfThread(i16 from, i16 to) override; - - i16 GetSharedThreadCount() const override; - - TSharedPoolState GetState() const override; - - void Init(const std::vector& pools, bool withThreads) override; - -private: - TSharedPoolState State; - - std::vector Pools; - - i16 PoolCount; - i16 SharedThreadCount; - std::unique_ptr Threads; - - std::unique_ptr ScheduleReaders; - std::unique_ptr ScheduleWriters; - - TDuration TimePerMailbox; - ui32 EventsPerMailbox; - ui64 SoftProcessingDurationTs; -}; // class TSharedExecutorPool - TSharedExecutorPool::TSharedExecutorPool(const TSharedExecutorPoolConfig &config, i16 poolCount, std::vector poolsWithThreads) : State(poolCount, poolsWithThreads.size()) , Pools(poolCount) @@ -73,47 +29,40 @@ TSharedExecutorPool::TSharedExecutorPool(const TSharedExecutorPoolConfig &config } } -void TSharedExecutorPool::Init(const std::vector& pools, bool withThreads) { - std::vector poolByThread(SharedThreadCount); - for (IExecutorPool* pool : pools) { - Pools[pool->PoolId] = pool; - i16 threadIdx = State.ThreadByPool[pool->PoolId]; - if (threadIdx >= 0) { - poolByThread[threadIdx] = pool; +void TSharedExecutorPool::Prepare(TActorSystem* actorSystem, NSchedulerQueue::TReader** scheduleReaders, ui32* scheduleSz) { + // ActorSystem = actorSystem; + + ScheduleReaders.reset(new NSchedulerQueue::TReader[SharedThreadCount]); + ScheduleWriters.reset(new NSchedulerQueue::TWriter[SharedThreadCount]); + + std::vector poolsBasic = actorSystem->GetBasicExecutorPools(); + std::vector poolByThread(SharedThreadCount); + for (IExecutorPool* pool : poolsBasic) { + Pools[pool->PoolId] = dynamic_cast(pool); + i16 threadIdx = State.ThreadByPool[pool->PoolId]; + if (threadIdx >= 0) { + poolByThread[threadIdx] = pool; + } } - } - for (i16 i = 0; i != SharedThreadCount; ++i) { - // !TODO - Threads[i].ExecutorPools[0].store(dynamic_cast(poolByThread[i]), std::memory_order_release); - if (withThreads) { + for (i16 i = 0; i != SharedThreadCount; ++i) { + // !TODO + Threads[i].ExecutorPools[0].store(dynamic_cast(poolByThread[i]), std::memory_order_release); Threads[i].Thread.reset( new TSharedExecutorThread( -1, - nullptr, - &Threads[i], - PoolCount, - "SharedThread", - SoftProcessingDurationTs, - TimePerMailbox, + actorSystem, + &Threads[i], + PoolCount, + "SharedThread", + SoftProcessingDurationTs, + TimePerMailbox, EventsPerMailbox)); + ScheduleWriters[i].Init(ScheduleReaders[i]); } - } -} - -void TSharedExecutorPool::Prepare(TActorSystem* actorSystem, NSchedulerQueue::TReader** scheduleReaders, ui32* scheduleSz) { - ScheduleReaders.reset(new NSchedulerQueue::TReader[SharedThreadCount]); - ScheduleWriters.reset(new NSchedulerQueue::TWriter[SharedThreadCount]); - - std::vector poolsBasic = 
actorSystem->GetBasicExecutorPools(); - Init(poolsBasic, true); - for (i16 i = 0; i != SharedThreadCount; ++i) { - ScheduleWriters[i].Init(ScheduleReaders[i]); - } - - *scheduleReaders = ScheduleReaders.get(); - *scheduleSz = SharedThreadCount; + *scheduleReaders = ScheduleReaders.get(); + *scheduleSz = SharedThreadCount; } void TSharedExecutorPool::Start() { @@ -150,27 +99,24 @@ TSharedExecutorThreadCtx* TSharedExecutorPool::GetSharedThread(i16 pool) { return &Threads[threadIdx]; } -i16 TSharedExecutorPool::ReturnOwnHalfThread(i16 pool) { +void TSharedExecutorPool::ReturnOwnHalfThread(i16 pool) { i16 threadIdx = State.ThreadByPool[pool]; - IExecutorPool* borrowingPool = Threads[threadIdx].ExecutorPools[1].exchange(nullptr, std::memory_order_acq_rel); + TBasicExecutorPool* borrowingPool = Threads[threadIdx].ExecutorPools[1].exchange(nullptr, std::memory_order_acq_rel); Y_ABORT_UNLESS(borrowingPool); - i16 borrowedPool = State.PoolByBorrowedThread[threadIdx]; - State.BorrowedThreadByPool[borrowedPool] = -1; + State.BorrowedThreadByPool[State.PoolByBorrowedThread[threadIdx]] = -1; State.PoolByBorrowedThread[threadIdx] = -1; // TODO(kruall): Check on race borrowingPool->ReleaseSharedThread(); - return borrowedPool; } -i16 TSharedExecutorPool::ReturnBorrowedHalfThread(i16 pool) { +void TSharedExecutorPool::ReturnBorrowedHalfThread(i16 pool) { i16 threadIdx = State.BorrowedThreadByPool[pool]; - IExecutorPool* borrowingPool = Threads[threadIdx].ExecutorPools[1].exchange(nullptr, std::memory_order_acq_rel); + TBasicExecutorPool* borrowingPool = Threads[threadIdx].ExecutorPools[1].exchange(nullptr, std::memory_order_acq_rel); Y_ABORT_UNLESS(borrowingPool); State.BorrowedThreadByPool[State.PoolByBorrowedThread[threadIdx]] = -1; State.PoolByBorrowedThread[threadIdx] = -1; // TODO(kruall): Check on race borrowingPool->ReleaseSharedThread(); - return State.PoolByThread[threadIdx]; } void TSharedExecutorPool::GiveHalfThread(i16 from, i16 to) { @@ -181,14 +127,14 @@ void TSharedExecutorPool::GiveHalfThread(i16 from, i16 to) { if (borrowedThreadIdx != -1) { i16 originalPool = State.PoolByThread[borrowedThreadIdx]; if (originalPool == to) { - ReturnOwnHalfThread(to); + return ReturnOwnHalfThread(to); } else { ReturnOwnHalfThread(originalPool); } from = originalPool; } i16 threadIdx = State.ThreadByPool[from]; - IExecutorPool* borrowingPool = Pools[to]; + TBasicExecutorPool* borrowingPool = Pools[to]; Threads[threadIdx].ExecutorPools[1].store(borrowingPool, std::memory_order_release); State.BorrowedThreadByPool[to] = threadIdx; State.PoolByBorrowedThread[threadIdx] = to; @@ -197,16 +143,16 @@ void TSharedExecutorPool::GiveHalfThread(i16 from, i16 to) { } void TSharedExecutorPool::GetSharedStats(i16 poolId, std::vector& statsCopy) { - statsCopy.resize(SharedThreadCount); + statsCopy.resize(SharedThreadCount + 1); for (i16 i = 0; i < SharedThreadCount; ++i) { - Threads[i].Thread->GetSharedStats(poolId, statsCopy[i]); + Threads[i].Thread->GetSharedStats(poolId, statsCopy[i + 1]); } } void TSharedExecutorPool::GetSharedStatsForHarmonizer(i16 poolId, std::vector& statsCopy) { - statsCopy.resize(SharedThreadCount); + statsCopy.resize(SharedThreadCount + 1); for (i16 i = 0; i < SharedThreadCount; ++i) { - Threads[i].Thread->GetSharedStatsForHarmonizer(poolId, statsCopy[i]); + Threads[i].Thread->GetSharedStatsForHarmonizer(poolId, statsCopy[i + 1]); } } @@ -235,34 +181,4 @@ TSharedPoolState TSharedExecutorPool::GetState() const { return State; } -ISharedExecutorPool *CreateSharedExecutorPool(const 
TSharedExecutorPoolConfig &config, i16 poolCount, std::vector poolsWithThreads) { - return new TSharedExecutorPool(config, poolCount, poolsWithThreads); -} - -TString TSharedPoolState::ToString() const { - TStringBuilder builder; - builder << '{'; - builder << "ThreadByPool: ["; - for (ui32 i = 0; i < ThreadByPool.size(); ++i) { - builder << ThreadByPool[i] << (i == ThreadByPool.size() - 1 ? "" : ", "); - } - builder << "], "; - builder << "PoolByThread: ["; - for (ui32 i = 0; i < PoolByThread.size(); ++i) { - builder << PoolByThread[i] << (i == PoolByThread.size() - 1 ? "" : ", "); - } - builder << "], "; - builder << "BorrowedThreadByPool: ["; - for (ui32 i = 0; i < BorrowedThreadByPool.size(); ++i) { - builder << BorrowedThreadByPool[i] << (i == BorrowedThreadByPool.size() - 1 ? "" : ", "); - } - builder << "], "; - builder << "PoolByBorrowedThread: ["; - for (ui32 i = 0; i < PoolByBorrowedThread.size(); ++i) { - builder << PoolByBorrowedThread[i] << (i == PoolByBorrowedThread.size() - 1 ? "" : ", "); - } - builder << ']'; - return builder << '}'; -} - } diff --git a/ydb/library/actors/core/executor_pool_shared.h b/ydb/library/actors/core/executor_pool_shared.h index f6c79c07a588..c083c21654a0 100644 --- a/ydb/library/actors/core/executor_pool_shared.h +++ b/ydb/library/actors/core/executor_pool_shared.h @@ -1,12 +1,14 @@ #pragma once #include "executor_pool.h" +#include "executor_thread_ctx.h" namespace NActors { + struct TExecutorThreadCtx; struct TSharedExecutorPoolConfig; - struct TSharedExecutorThreadCtx; + class TBasicExecutorPool; struct TSharedPoolState { std::vector ThreadByPool; @@ -20,30 +22,48 @@ namespace NActors { , BorrowedThreadByPool(poolCount, -1) , PoolByBorrowedThread(threadCount, -1) {} - - TString ToString() const; }; - class ISharedExecutorPool : public IActorThreadPool { + class TSharedExecutorPool: public IActorThreadPool { public: - virtual ~ISharedExecutorPool() = default; + TSharedExecutorPool(const TSharedExecutorPoolConfig &config, i16 poolCount, std::vector poolsWithThreads); - virtual TSharedExecutorThreadCtx *GetSharedThread(i16 poolId) = 0; - virtual void GetSharedStats(i16 pool, std::vector& statsCopy) = 0; - virtual void GetSharedStatsForHarmonizer(i16 pool, std::vector& statsCopy) = 0; - virtual TCpuConsumption GetThreadCpuConsumption(i16 poolId, i16 threadIdx) = 0; - virtual std::vector GetThreadsCpuConsumption(i16 poolId) = 0; - virtual void Init(const std::vector& pools, bool withThreads) = 0; + // IThreadPool + void Prepare(TActorSystem* actorSystem, NSchedulerQueue::TReader** scheduleReaders, ui32* scheduleSz) override; + void Start() override; + void PrepareStop() override; + void Shutdown() override; + bool Cleanup() override; - virtual i16 ReturnOwnHalfThread(i16 pool) = 0; - virtual i16 ReturnBorrowedHalfThread(i16 pool) = 0; - virtual void GiveHalfThread(i16 from, i16 to) = 0; + TSharedExecutorThreadCtx *GetSharedThread(i16 poolId); + void GetSharedStats(i16 pool, std::vector& statsCopy); + void GetSharedStatsForHarmonizer(i16 pool, std::vector& statsCopy); + TCpuConsumption GetThreadCpuConsumption(i16 poolId, i16 threadIdx); + std::vector GetThreadsCpuConsumption(i16 poolId); - virtual i16 GetSharedThreadCount() const = 0; + void ReturnOwnHalfThread(i16 pool); + void ReturnBorrowedHalfThread(i16 pool); + void GiveHalfThread(i16 from, i16 to); - virtual TSharedPoolState GetState() const = 0; - }; + i16 GetSharedThreadCount() const; + + TSharedPoolState GetState() const; + + private: + TSharedPoolState State; + + std::vector Pools; - 
ISharedExecutorPool *CreateSharedExecutorPool(const TSharedExecutorPoolConfig &config, i16 poolCount, std::vector poolsWithThreads); + i16 PoolCount; + i16 SharedThreadCount; + std::unique_ptr Threads; + + std::unique_ptr ScheduleReaders; + std::unique_ptr ScheduleWriters; + + TDuration TimePerMailbox; + ui32 EventsPerMailbox; + ui64 SoftProcessingDurationTs; + }; } \ No newline at end of file diff --git a/ydb/library/actors/core/executor_thread_ctx.h b/ydb/library/actors/core/executor_thread_ctx.h index c7d1caa5ed6d..6a5bfaa34be1 100644 --- a/ydb/library/actors/core/executor_thread_ctx.h +++ b/ydb/library/actors/core/executor_thread_ctx.h @@ -11,7 +11,8 @@ namespace NActors { class TGenericExecutorThread; - class IExecutorPool; + class TBasicExecutorPool; + class TIOExecutorPool; enum class EThreadState : ui64 { None, @@ -122,7 +123,7 @@ namespace NActors { struct TExecutorThreadCtx : public TGenericExecutorThreadCtx { using TBase = TGenericExecutorThreadCtx; - IExecutorPool *OwnerExecutorPool = nullptr; + TBasicExecutorPool *OwnerExecutorPool = nullptr; void SetWork() { ExchangeState(EThreadState::Work); @@ -185,7 +186,7 @@ namespace NActors { } }; - std::atomic ExecutorPools[MaxPoolsForSharedThreads]; + std::atomic ExecutorPools[MaxPoolsForSharedThreads]; std::atomic RequestsForWakeUp = 0; ui32 NextPool = 0; diff --git a/ydb/library/actors/core/harmonizer.cpp b/ydb/library/actors/core/harmonizer.cpp new file mode 100644 index 000000000000..df6516421698 --- /dev/null +++ b/ydb/library/actors/core/harmonizer.cpp @@ -0,0 +1,861 @@ +#include "harmonizer.h" + +#include "executor_thread_ctx.h" +#include "executor_thread.h" +#include "probes.h" + +#include "activity_guard.h" +#include "actorsystem.h" +#include "executor_pool_basic.h" +#include "executor_pool_basic_feature_flags.h" +#include "executor_pool_shared.h" + +#include +#include +#include +#include + +#include + +#include + +namespace NActors { + +LWTRACE_USING(ACTORLIB_PROVIDER); + +constexpr bool CheckBinaryPower(ui64 value) { + return !(value & (value - 1)); +} + +template +struct TValueHistory { + static_assert(CheckBinaryPower(HistoryBufferSize)); + + double History[HistoryBufferSize] = {0.0}; + ui64 HistoryIdx = 0; + ui64 LastTs = Max(); + double LastUs = 0.0; + double AccumulatedUs = 0.0; + ui64 AccumulatedTs = 0; + + template + double Accumulate(auto op, auto comb, ui8 seconds) { + double acc = AccumulatedUs; + size_t idx = HistoryIdx; + ui8 leftSeconds = seconds; + if constexpr (!WithTail) { + idx--; + leftSeconds--; + if (idx >= HistoryBufferSize) { + idx = HistoryBufferSize - 1; + } + acc = History[idx]; + } + do { + idx--; + leftSeconds--; + if (idx >= HistoryBufferSize) { + idx = HistoryBufferSize - 1; + } + if constexpr (WithTail) { + acc = op(acc, History[idx]); + } else if (leftSeconds) { + acc = op(acc, History[idx]); + } else { + ui64 tsInSecond = Us2Ts(1'000'000.0); + acc = op(acc, History[idx] * (tsInSecond - AccumulatedTs) / tsInSecond); + } + } while (leftSeconds); + double duration = 1'000'000.0 * seconds; + if constexpr (WithTail) { + duration += Ts2Us(AccumulatedTs); + } + return comb(acc, duration); + } + + template + double GetAvgPartForLastSeconds(ui8 seconds) { + auto sum = [](double acc, double value) { + return acc + value; + }; + auto avg = [](double sum, double duration) { + return sum / duration; + }; + return Accumulate(sum, avg, seconds); + } + + double GetAvgPart() { + return GetAvgPartForLastSeconds(HistoryBufferSize); + } + + double GetMaxForLastSeconds(ui8 seconds) { + auto max = [](const double& 
acc, const double& value) { + return Max(acc, value); + }; + auto fst = [](const double& value, const double&) { return value; }; + return Accumulate(max, fst, seconds); + } + + double GetMax() { + return GetMaxForLastSeconds(HistoryBufferSize); + } + + i64 GetMaxInt() { + return static_cast(GetMax()); + } + + double GetMinForLastSeconds(ui8 seconds) { + auto min = [](const double& acc, const double& value) { + return Min(acc, value); + }; + auto fst = [](const double& value, const double&) { return value; }; + return Accumulate(min, fst, seconds); + } + + double GetMin() { + return GetMinForLastSeconds(HistoryBufferSize); + } + + i64 GetMinInt() { + return static_cast(GetMin()); + } + + void Register(ui64 ts, double valueUs) { + if (ts < LastTs) { + LastTs = ts; + LastUs = valueUs; + AccumulatedUs = 0.0; + AccumulatedTs = 0; + return; + } + ui64 lastTs = std::exchange(LastTs, ts); + ui64 dTs = ts - lastTs; + double lastUs = std::exchange(LastUs, valueUs); + double dUs = valueUs - lastUs; + LWPROBE(RegisterValue, ts, lastTs, dTs, Us2Ts(8'000'000.0), valueUs, lastUs, dUs); + + if (dTs > Us2Ts(8'000'000.0)) { + dUs = dUs * 1'000'000.0 / Ts2Us(dTs); + for (size_t idx = 0; idx < HistoryBufferSize; ++idx) { + History[idx] = dUs; + } + AccumulatedUs = 0.0; + AccumulatedTs = 0; + return; + } + + while (dTs > 0) { + if (AccumulatedTs + dTs < Us2Ts(1'000'000.0)) { + AccumulatedTs += dTs; + AccumulatedUs += dUs; + break; + } else { + ui64 addTs = Us2Ts(1'000'000.0) - AccumulatedTs; + double addUs = dUs * addTs / dTs; + dTs -= addTs; + dUs -= addUs; + History[HistoryIdx] = AccumulatedUs + addUs; + HistoryIdx = (HistoryIdx + 1) % HistoryBufferSize; + AccumulatedUs = 0.0; + AccumulatedTs = 0; + } + } + } +}; + +struct TThreadInfo { + TValueHistory<8> Consumed; + TValueHistory<8> Booked; +}; + +struct TPoolInfo { + std::vector ThreadInfo; + std::vector SharedInfo; + TSharedExecutorPool* Shared = nullptr; + IExecutorPool* Pool = nullptr; + TBasicExecutorPool* BasicPool = nullptr; + + i16 DefaultFullThreadCount = 0; + i16 MinFullThreadCount = 0; + i16 MaxFullThreadCount = 0; + + float DefaultThreadCount = 0; + float MinThreadCount = 0; + float MaxThreadCount = 0; + + i16 Priority = 0; + NMonitoring::TDynamicCounters::TCounterPtr AvgPingCounter; + NMonitoring::TDynamicCounters::TCounterPtr AvgPingCounterWithSmallWindow; + ui32 MaxAvgPingUs = 0; + ui64 LastUpdateTs = 0; + ui64 NotEnoughCpuExecutions = 0; + ui64 NewNotEnoughCpuExecutions = 0; + ui16 LocalQueueSize = NFeatures::TLocalQueuesFeatureFlags::MIN_LOCAL_QUEUE_SIZE; + + TAtomic LastFlags = 0; // 0 - isNeedy; 1 - isStarved; 2 - isHoggish + TAtomic IncreasingThreadsByNeedyState = 0; + TAtomic IncreasingThreadsByExchange = 0; + TAtomic DecreasingThreadsByStarvedState = 0; + TAtomic DecreasingThreadsByHoggishState = 0; + TAtomic DecreasingThreadsByExchange = 0; + TAtomic PotentialMaxThreadCount = 0; + + TValueHistory<16> Consumed; + TValueHistory<16> Booked; + + std::atomic MaxConsumedCpu = 0; + std::atomic MinConsumedCpu = 0; + std::atomic AvgConsumedCpu = 0; + std::atomic MaxBookedCpu = 0; + std::atomic MinBookedCpu = 0; + + std::unique_ptr> WaitingStats; + std::unique_ptr> MovingWaitingStats; + + double GetBooked(i16 threadIdx); + double GetSharedBooked(i16 threadIdx); + double GetLastSecondBooked(i16 threadIdx); + double GetLastSecondSharedBooked(i16 threadIdx); + double GetConsumed(i16 threadIdx); + double GetSharedConsumed(i16 threadIdx); + double GetLastSecondConsumed(i16 threadIdx); + double GetLastSecondSharedConsumed(i16 threadIdx); + 
TCpuConsumption PullStats(ui64 ts); + i16 GetFullThreadCount(); + float GetThreadCount(); + void SetFullThreadCount(i16 threadCount); + bool IsAvgPingGood(); +}; + +double TPoolInfo::GetBooked(i16 threadIdx) { + if ((size_t)threadIdx < ThreadInfo.size()) { + return ThreadInfo[threadIdx].Booked.GetAvgPart(); + } + return 0.0; +} + +double TPoolInfo::GetSharedBooked(i16 threadIdx) { + if ((size_t)threadIdx < SharedInfo.size()) { + return SharedInfo[threadIdx].Booked.GetAvgPart(); + } + return 0.0; +} + +double TPoolInfo::GetLastSecondBooked(i16 threadIdx) { + if ((size_t)threadIdx < ThreadInfo.size()) { + return ThreadInfo[threadIdx].Booked.GetAvgPartForLastSeconds(1); + } + return 0.0; +} + +double TPoolInfo::GetLastSecondSharedBooked(i16 threadIdx) { + if ((size_t)threadIdx < SharedInfo.size()) { + return SharedInfo[threadIdx].Booked.GetAvgPartForLastSeconds(1); + } + return 0.0; +} + +double TPoolInfo::GetConsumed(i16 threadIdx) { + if ((size_t)threadIdx < ThreadInfo.size()) { + return ThreadInfo[threadIdx].Consumed.GetAvgPart(); + } + return 0.0; +} + +double TPoolInfo::GetSharedConsumed(i16 threadIdx) { + if ((size_t)threadIdx < SharedInfo.size()) { + return SharedInfo[threadIdx].Consumed.GetAvgPart(); + } + return 0.0; +} + +double TPoolInfo::GetLastSecondConsumed(i16 threadIdx) { + if ((size_t)threadIdx < ThreadInfo.size()) { + return ThreadInfo[threadIdx].Consumed.GetAvgPartForLastSeconds(1); + } + return 0.0; +} + +double TPoolInfo::GetLastSecondSharedConsumed(i16 threadIdx) { + if ((size_t)threadIdx < SharedInfo.size()) { + return SharedInfo[threadIdx].Consumed.GetAvgPartForLastSeconds(1); + } + return 0.0; +} + +#define UNROLL_HISTORY(history) (history)[0], (history)[1], (history)[2], (history)[3], (history)[4], (history)[5], (history)[6], (history)[7] +TCpuConsumption TPoolInfo::PullStats(ui64 ts) { + TCpuConsumption acc; + for (i16 threadIdx = 0; threadIdx < MaxFullThreadCount; ++threadIdx) { + TThreadInfo &threadInfo = ThreadInfo[threadIdx]; + TCpuConsumption cpuConsumption = Pool->GetThreadCpuConsumption(threadIdx); + acc.Add(cpuConsumption); + threadInfo.Consumed.Register(ts, cpuConsumption.ConsumedUs); + LWPROBE(SavedValues, Pool->PoolId, Pool->GetName(), "consumed", UNROLL_HISTORY(threadInfo.Consumed.History)); + threadInfo.Booked.Register(ts, cpuConsumption.BookedUs); + LWPROBE(SavedValues, Pool->PoolId, Pool->GetName(), "booked", UNROLL_HISTORY(threadInfo.Booked.History)); + } + TVector sharedStats; + if (Shared) { + Shared->GetSharedStatsForHarmonizer(Pool->PoolId, sharedStats); + } + + for (ui32 sharedIdx = 0; sharedIdx < SharedInfo.size(); ++sharedIdx) { + auto stat = sharedStats[sharedIdx]; + TCpuConsumption sharedConsumption{ + Ts2Us(stat.SafeElapsedTicks), + static_cast(stat.CpuUs), + stat.NotEnoughCpuExecutions + }; + acc.Add(sharedConsumption); + SharedInfo[sharedIdx].Consumed.Register(ts, sharedConsumption.ConsumedUs); + LWPROBE(SavedValues, Pool->PoolId, Pool->GetName(), "shared_consumed", UNROLL_HISTORY(SharedInfo[sharedIdx].Consumed.History)); + SharedInfo[sharedIdx].Booked.Register(ts, sharedConsumption.BookedUs); + LWPROBE(SavedValues, Pool->PoolId, Pool->GetName(), "shared_booked", UNROLL_HISTORY(SharedInfo[sharedIdx].Booked.History)); + } + + Consumed.Register(ts, acc.ConsumedUs); + MaxConsumedCpu.store(Consumed.GetMax() / 1'000'000, std::memory_order_relaxed); + MinConsumedCpu.store(Consumed.GetMin() / 1'000'000, std::memory_order_relaxed); + AvgConsumedCpu.store(Consumed.GetAvgPart() / 1'000'000, std::memory_order_relaxed); + Booked.Register(ts, 
acc.BookedUs); + MaxBookedCpu.store(Booked.GetMax() / 1'000'000, std::memory_order_relaxed); + MinBookedCpu.store(Booked.GetMin() / 1'000'000, std::memory_order_relaxed); + NewNotEnoughCpuExecutions = acc.NotEnoughCpuExecutions - NotEnoughCpuExecutions; + NotEnoughCpuExecutions = acc.NotEnoughCpuExecutions; + if (WaitingStats && BasicPool) { + WaitingStats->Clear(); + BasicPool->GetWaitingStats(*WaitingStats); + if constexpr (!NFeatures::TSpinFeatureFlags::CalcPerThread) { + MovingWaitingStats->Add(*WaitingStats, 0.8, 0.2); + } + } + return acc; +} +#undef UNROLL_HISTORY + +float TPoolInfo::GetThreadCount() { + return Pool->GetThreadCount(); +} + +i16 TPoolInfo::GetFullThreadCount() { + return Pool->GetFullThreadCount(); +} + +void TPoolInfo::SetFullThreadCount(i16 threadCount) { + Pool->SetFullThreadCount(threadCount); +} + +bool TPoolInfo::IsAvgPingGood() { + bool res = true; + if (AvgPingCounter) { + res &= *AvgPingCounter > MaxAvgPingUs; + } + if (AvgPingCounterWithSmallWindow) { + res &= *AvgPingCounterWithSmallWindow > MaxAvgPingUs; + } + return res; +} + +class THarmonizer: public IHarmonizer { +private: + std::atomic IsDisabled = false; + TSpinLock Lock; + std::atomic NextHarmonizeTs = 0; + std::vector> Pools; + std::vector PriorityOrder; + + TValueHistory<16> Consumed; + TValueHistory<16> Booked; + + TAtomic MaxConsumedCpu = 0; + TAtomic MinConsumedCpu = 0; + TAtomic MaxBookedCpu = 0; + TAtomic MinBookedCpu = 0; + + TSharedExecutorPool* Shared = nullptr; + + std::atomic AvgAwakeningTimeUs = 0; + std::atomic AvgWakingUpTimeUs = 0; + + void PullStats(ui64 ts); + void HarmonizeImpl(ui64 ts); + void CalculatePriorityOrder(); +public: + THarmonizer(ui64 ts); + virtual ~THarmonizer(); + double Rescale(double value) const; + void Harmonize(ui64 ts) override; + void DeclareEmergency(ui64 ts) override; + void AddPool(IExecutorPool* pool, TSelfPingInfo *pingInfo) override; + void Enable(bool enable) override; + TPoolHarmonizerStats GetPoolStats(i16 poolId) const override; + THarmonizerStats GetStats() const override; + void SetSharedPool(TSharedExecutorPool* pool) override; +}; + +THarmonizer::THarmonizer(ui64 ts) { + NextHarmonizeTs = ts; +} + +THarmonizer::~THarmonizer() { +} + +double THarmonizer::Rescale(double value) const { + return Max(0.0, Min(1.0, value * (1.0/0.9))); +} + +void THarmonizer::PullStats(ui64 ts) { + TCpuConsumption acc; + for (auto &pool : Pools) { + TCpuConsumption consumption = pool->PullStats(ts); + acc.Add(consumption); + } + Consumed.Register(ts, acc.ConsumedUs); + RelaxedStore(&MaxConsumedCpu, Consumed.GetMaxInt()); + RelaxedStore(&MinConsumedCpu, Consumed.GetMinInt()); + Booked.Register(ts, acc.BookedUs); + RelaxedStore(&MaxBookedCpu, Booked.GetMaxInt()); + RelaxedStore(&MinBookedCpu, Booked.GetMinInt()); +} + +Y_FORCE_INLINE bool IsStarved(double consumed, double booked) { + return Max(consumed, booked) > 0.1 && consumed < booked * 0.7; +} + +Y_FORCE_INLINE bool IsHoggish(double booked, double currentThreadCount) { + return booked < currentThreadCount - 0.5; +} + +void THarmonizer::HarmonizeImpl(ui64 ts) { + bool isStarvedPresent = false; + double booked = 0.0; + double consumed = 0.0; + double lastSecondBooked = 0.0; + i64 beingStopped = 0; + double total = 0; + TStackVec needyPools; + TStackVec, 8> hoggishPools; + TStackVec isNeedyByPool; + + size_t sumOfAdditionalThreads = 0; + + ui64 TotalWakingUpTime = 0; + ui64 TotalWakingUps = 0; + ui64 TotalAwakeningTime = 0; + ui64 TotalAwakenings = 0; + for (size_t poolIdx = 0; poolIdx < Pools.size(); ++poolIdx) { + 
TPoolInfo& pool = *Pools[poolIdx]; + if (pool.WaitingStats) { + TotalWakingUpTime += pool.WaitingStats->WakingUpTotalTime; + TotalWakingUps += pool.WaitingStats->WakingUpCount; + TotalAwakeningTime += pool.WaitingStats->AwakingTotalTime; + TotalAwakenings += pool.WaitingStats->AwakingCount; + } + } + + constexpr ui64 knownAvgWakingUpTime = TWaitingStatsConstants::KnownAvgWakingUpTime; + constexpr ui64 knownAvgAwakeningUpTime = TWaitingStatsConstants::KnownAvgAwakeningTime; + + ui64 realAvgWakingUpTime = (TotalWakingUps ? TotalWakingUpTime / TotalWakingUps : knownAvgWakingUpTime); + ui64 avgWakingUpTime = realAvgWakingUpTime; + if (avgWakingUpTime > 2 * knownAvgWakingUpTime || !realAvgWakingUpTime) { + avgWakingUpTime = knownAvgWakingUpTime; + } + AvgWakingUpTimeUs = Ts2Us(avgWakingUpTime); + + ui64 realAvgAwakeningTime = (TotalAwakenings ? TotalAwakeningTime / TotalAwakenings : knownAvgAwakeningUpTime); + ui64 avgAwakeningTime = realAvgAwakeningTime; + if (avgAwakeningTime > 2 * knownAvgAwakeningUpTime || !realAvgAwakeningTime) { + avgAwakeningTime = knownAvgAwakeningUpTime; + } + AvgAwakeningTimeUs = Ts2Us(avgAwakeningTime); + + ui64 avgWakingUpConsumption = avgWakingUpTime + avgAwakeningTime; + LWPROBE(WakingUpConsumption, Ts2Us(avgWakingUpTime), Ts2Us(avgWakingUpTime), Ts2Us(avgAwakeningTime), Ts2Us(realAvgAwakeningTime), Ts2Us(avgWakingUpConsumption)); + + for (size_t poolIdx = 0; poolIdx < Pools.size(); ++poolIdx) { + TPoolInfo& pool = *Pools[poolIdx]; + if (!pool.BasicPool) { + continue; + } + if (pool.BasicPool->ActorSystemProfile != EASProfile::Default) { + if constexpr (NFeatures::TSpinFeatureFlags::CalcPerThread) { + pool.BasicPool->CalcSpinPerThread(avgWakingUpConsumption); + } else if constexpr (NFeatures::TSpinFeatureFlags::UsePseudoMovingWindow) { + ui64 newSpinThreshold = pool.MovingWaitingStats->CalculateGoodSpinThresholdCycles(avgWakingUpConsumption); + pool.BasicPool->SetSpinThresholdCycles(newSpinThreshold); + } else { + ui64 newSpinThreshold = pool.WaitingStats->CalculateGoodSpinThresholdCycles(avgWakingUpConsumption); + pool.BasicPool->SetSpinThresholdCycles(newSpinThreshold); + } + pool.BasicPool->ClearWaitingStats(); + } + } + + std::vector hasSharedThread(Pools.size()); + std::vector hasSharedThreadWhichWasNotBorrowed(Pools.size()); + std::vector hasBorrowedSharedThread(Pools.size()); + std::vector freeHalfThread; + if (Shared) { + auto sharedState = Shared->GetState(); + for (ui32 poolIdx = 0; poolIdx < Pools.size(); ++poolIdx) { + i16 threadIdx = sharedState.ThreadByPool[poolIdx]; + if (threadIdx != -1) { + hasSharedThread[poolIdx] = true; + if (sharedState.PoolByBorrowedThread[threadIdx] == -1) { + hasSharedThreadWhichWasNotBorrowed[poolIdx] = true; + } + + } + if (sharedState.BorrowedThreadByPool[poolIdx] != -1) { + hasBorrowedSharedThread[poolIdx] = true; + } + } + } + + for (size_t poolIdx = 0; poolIdx < Pools.size(); ++poolIdx) { + TPoolInfo& pool = *Pools[poolIdx]; + total += pool.DefaultThreadCount; + + i16 currentFullThreadCount = pool.GetFullThreadCount(); + sumOfAdditionalThreads += Max(0, currentFullThreadCount - pool.DefaultFullThreadCount); + float currentThreadCount = pool.GetThreadCount(); + + double poolBooked = 0.0; + double poolConsumed = 0.0; + double lastSecondPoolBooked = 0.0; + double lastSecondPoolConsumed = 0.0; + beingStopped += pool.Pool->GetBlockingThreadCount(); + + for (i16 threadIdx = 0; threadIdx < pool.MaxThreadCount; ++threadIdx) { + double threadBooked = Rescale(pool.GetBooked(threadIdx)); + double threadLastSecondBooked = 
Rescale(pool.GetLastSecondBooked(threadIdx)); + double threadConsumed = Rescale(pool.GetConsumed(threadIdx)); + double threadLastSecondConsumed = Rescale(pool.GetLastSecondConsumed(threadIdx)); + poolBooked += threadBooked; + lastSecondPoolBooked += threadLastSecondBooked; + poolConsumed += threadConsumed; + lastSecondPoolConsumed += threadLastSecondConsumed; + LWPROBE(HarmonizeCheckPoolByThread, poolIdx, pool.Pool->GetName(), threadIdx, threadBooked, threadConsumed, threadLastSecondBooked, threadLastSecondConsumed); + } + + for (ui32 sharedIdx = 0; sharedIdx < pool.SharedInfo.size(); ++sharedIdx) { + double sharedBooked = Rescale(pool.GetSharedBooked(sharedIdx)); + double sharedLastSecondBooked = Rescale(pool.GetLastSecondSharedBooked(sharedIdx)); + double sharedConsumed = Rescale(pool.GetSharedConsumed(sharedIdx)); + double sharedLastSecondConsumed = Rescale(pool.GetLastSecondSharedConsumed(sharedIdx)); + poolBooked += sharedBooked; + lastSecondPoolBooked += sharedLastSecondBooked; + poolConsumed += sharedConsumed; + lastSecondPoolConsumed += sharedLastSecondConsumed; + LWPROBE(HarmonizeCheckPoolByThread, poolIdx, pool.Pool->GetName(), -1 - sharedIdx, sharedBooked, sharedConsumed, sharedLastSecondBooked, sharedLastSecondConsumed); + } + + bool isStarved = IsStarved(poolConsumed, poolBooked) || IsStarved(lastSecondPoolConsumed, lastSecondPoolBooked); + if (isStarved) { + isStarvedPresent = true; + } + + bool isNeedy = (pool.IsAvgPingGood() || pool.NewNotEnoughCpuExecutions) && (poolBooked >= currentThreadCount); + if (pool.AvgPingCounter) { + if (pool.LastUpdateTs + Us2Ts(3'000'000ull) > ts) { + isNeedy = false; + } else { + pool.LastUpdateTs = ts; + } + } + if (currentThreadCount - poolBooked > 0.5) { + if (hasBorrowedSharedThread[poolIdx] || hasSharedThreadWhichWasNotBorrowed[poolIdx]) { + freeHalfThread.push_back(poolIdx); + } + } + isNeedyByPool.push_back(isNeedy); + if (isNeedy) { + needyPools.push_back(poolIdx); + } + bool isHoggish = IsHoggish(poolBooked, currentThreadCount) + || IsHoggish(lastSecondPoolBooked, currentThreadCount); + if (isHoggish) { + hoggishPools.push_back({poolIdx, std::max(poolBooked - currentThreadCount, lastSecondPoolBooked - currentThreadCount)}); + } + booked += poolBooked; + consumed += poolConsumed; + AtomicSet(pool.LastFlags, (i64)isNeedy | ((i64)isStarved << 1) | ((i64)isHoggish << 2)); + LWPROBE(HarmonizeCheckPool, poolIdx, pool.Pool->GetName(), poolBooked, poolConsumed, lastSecondPoolBooked, lastSecondPoolConsumed, currentThreadCount, pool.MaxFullThreadCount, isStarved, isNeedy, isHoggish); + } + + double budget = total - Max(booked, lastSecondBooked); + i16 budgetInt = static_cast(Max(budget, 0.0)); + if (budget < -0.1) { + isStarvedPresent = true; + } + double overbooked = consumed - booked; + if (overbooked < 0) { + isStarvedPresent = false; + } + + if (needyPools.size()) { + Sort(needyPools.begin(), needyPools.end(), [&] (i16 lhs, i16 rhs) { + if (Pools[lhs]->Priority != Pools[rhs]->Priority) { + return Pools[lhs]->Priority > Pools[rhs]->Priority; + } + return Pools[lhs]->Pool->PoolId < Pools[rhs]->Pool->PoolId; + }); + } + + if (freeHalfThread.size()) { + Sort(freeHalfThread.begin(), freeHalfThread.end(), [&] (i16 lhs, i16 rhs) { + if (Pools[lhs]->Priority != Pools[rhs]->Priority) { + return Pools[lhs]->Priority > Pools[rhs]->Priority; + } + return Pools[lhs]->Pool->PoolId < Pools[rhs]->Pool->PoolId; + }); + } + + if (isStarvedPresent) { + // last_starved_at_consumed_value = сумма по всем пулам consumed; + // TODO(cthulhu): использовать как лимит 
планвно устремлять этот лимит к total, + // использовать вместо total + if (beingStopped && beingStopped >= overbooked) { + // do nothing + } else { + for (ui16 poolIdx : PriorityOrder) { + TPoolInfo &pool = *Pools[poolIdx]; + i64 threadCount = pool.GetFullThreadCount(); + if (hasSharedThread[poolIdx] && !hasSharedThreadWhichWasNotBorrowed[poolIdx]) { + Shared->ReturnOwnHalfThread(poolIdx); + } + while (threadCount > pool.DefaultFullThreadCount) { + pool.SetFullThreadCount(--threadCount); + AtomicIncrement(pool.DecreasingThreadsByStarvedState); + overbooked--; + sumOfAdditionalThreads--; + + LWPROBE(HarmonizeOperation, poolIdx, pool.Pool->GetName(), "decrease by starving", threadCount - 1, pool.DefaultFullThreadCount, pool.MaxFullThreadCount); + if (overbooked < 1) { + break; + } + } + if (overbooked < 1) { + break; + } + } + } + } else { + for (size_t needyPoolIdx : needyPools) { + TPoolInfo &pool = *Pools[needyPoolIdx]; + i64 threadCount = pool.GetFullThreadCount(); + if (budget >= 1.0) { + if (threadCount + 1 <= pool.MaxFullThreadCount) { + AtomicIncrement(pool.IncreasingThreadsByNeedyState); + isNeedyByPool[needyPoolIdx] = false; + sumOfAdditionalThreads++; + pool.SetFullThreadCount(threadCount + 1); + budget -= 1.0; + LWPROBE(HarmonizeOperation, needyPoolIdx, pool.Pool->GetName(), "increase by needs", threadCount + 1, pool.DefaultFullThreadCount, pool.MaxFullThreadCount); + } + } else if (Shared && budget >= 0.5 && !hasBorrowedSharedThread[needyPoolIdx] && freeHalfThread.size()) { + Shared->GiveHalfThread(freeHalfThread.back(), needyPoolIdx); + freeHalfThread.pop_back(); + isNeedyByPool[needyPoolIdx] = false; + budget -= 0.5; + } + if constexpr (NFeatures::IsLocalQueues()) { + bool needToExpandLocalQueue = budget < 1.0 || threadCount >= pool.MaxFullThreadCount; + needToExpandLocalQueue &= (bool)pool.BasicPool; + needToExpandLocalQueue &= (pool.MaxFullThreadCount > 1); + needToExpandLocalQueue &= (pool.LocalQueueSize < NFeatures::TLocalQueuesFeatureFlags::MAX_LOCAL_QUEUE_SIZE); + if (needToExpandLocalQueue) { + pool.BasicPool->SetLocalQueueSize(++pool.LocalQueueSize); + } + } + } + } + + if (budget < 1.0) { + size_t takingAwayThreads = 0; + for (size_t needyPoolIdx : needyPools) { + TPoolInfo &pool = *Pools[needyPoolIdx]; + i64 threadCount = pool.GetFullThreadCount(); + sumOfAdditionalThreads -= threadCount - pool.DefaultFullThreadCount; + if (sumOfAdditionalThreads < takingAwayThreads + 1) { + break; + } + if (!isNeedyByPool[needyPoolIdx]) { + continue; + } + AtomicIncrement(pool.IncreasingThreadsByExchange); + isNeedyByPool[needyPoolIdx] = false; + takingAwayThreads++; + pool.SetFullThreadCount(threadCount + 1); + + LWPROBE(HarmonizeOperation, needyPoolIdx, pool.Pool->GetName(), "increase by exchanging", threadCount + 1, pool.DefaultFullThreadCount, pool.MaxFullThreadCount); + } + + for (ui16 poolIdx : PriorityOrder) { + if (takingAwayThreads <= 0) { + break; + } + + TPoolInfo &pool = *Pools[poolIdx]; + size_t threadCount = pool.GetFullThreadCount(); + size_t additionalThreadsCount = Max(0L, threadCount - pool.DefaultFullThreadCount); + size_t currentTakingAwayThreads = Min(additionalThreadsCount, takingAwayThreads); + + if (!currentTakingAwayThreads) { + continue; + } + takingAwayThreads -= currentTakingAwayThreads; + pool.SetFullThreadCount(threadCount - currentTakingAwayThreads); + + AtomicAdd(pool.DecreasingThreadsByExchange, takingAwayThreads); + LWPROBE(HarmonizeOperation, poolIdx, pool.Pool->GetName(), "decrease by exchanging", threadCount - currentTakingAwayThreads, 
pool.DefaultFullThreadCount, pool.MaxFullThreadCount); + } + } + + for (auto &[hoggishPoolIdx, freeCpu] : hoggishPools) { + TPoolInfo &pool = *Pools[hoggishPoolIdx]; + i64 threadCount = pool.GetFullThreadCount(); + if (hasBorrowedSharedThread[hoggishPoolIdx]) { + Shared->ReturnBorrowedHalfThread(hoggishPoolIdx); + freeCpu -= 0.5; + continue; + } + if (pool.BasicPool && pool.LocalQueueSize > NFeatures::TLocalQueuesFeatureFlags::MIN_LOCAL_QUEUE_SIZE) { + pool.LocalQueueSize = std::min(NFeatures::TLocalQueuesFeatureFlags::MIN_LOCAL_QUEUE_SIZE, pool.LocalQueueSize / 2); + pool.BasicPool->SetLocalQueueSize(pool.LocalQueueSize); + } + if (threadCount > pool.MinFullThreadCount && freeCpu >= 1) { + AtomicIncrement(pool.DecreasingThreadsByHoggishState); + LWPROBE(HarmonizeOperation, hoggishPoolIdx, pool.Pool->GetName(), "decrease by hoggish", threadCount - 1, pool.DefaultFullThreadCount, pool.MaxFullThreadCount); + pool.SetFullThreadCount(threadCount - 1); + } + } + + for (size_t poolIdx = 0; poolIdx < Pools.size(); ++poolIdx) { + TPoolInfo& pool = *Pools[poolIdx]; + AtomicSet(pool.PotentialMaxThreadCount, std::min(pool.MaxThreadCount, pool.GetThreadCount() + budgetInt)); + } +} + +void THarmonizer::CalculatePriorityOrder() { + PriorityOrder.resize(Pools.size()); + Iota(PriorityOrder.begin(), PriorityOrder.end(), 0); + Sort(PriorityOrder.begin(), PriorityOrder.end(), [&] (i16 lhs, i16 rhs) { + if (Pools[lhs]->Priority != Pools[rhs]->Priority) { + return Pools[lhs]->Priority < Pools[rhs]->Priority; + } + return Pools[lhs]->Pool->PoolId > Pools[rhs]->Pool->PoolId; + }); +} + +void THarmonizer::Harmonize(ui64 ts) { + if (IsDisabled || NextHarmonizeTs > ts || !Lock.TryAcquire()) { + LWPROBE(TryToHarmonizeFailed, ts, NextHarmonizeTs, IsDisabled, false); + return; + } + // Check again under the lock + if (IsDisabled) { + LWPROBE(TryToHarmonizeFailed, ts, NextHarmonizeTs, IsDisabled, true); + Lock.Release(); + return; + } + // Will never reach this line disabled + ui64 previousNextHarmonizeTs = NextHarmonizeTs.exchange(ts + Us2Ts(1'000'000ull)); + LWPROBE(TryToHarmonizeSuccess, ts, NextHarmonizeTs, previousNextHarmonizeTs); + + { + TInternalActorTypeGuard activityGuard; + + if (PriorityOrder.empty()) { + CalculatePriorityOrder(); + } + + PullStats(ts); + HarmonizeImpl(ts); + } + + Lock.Release(); +} + +void THarmonizer::DeclareEmergency(ui64 ts) { + NextHarmonizeTs = ts; +} + +void THarmonizer::AddPool(IExecutorPool* pool, TSelfPingInfo *pingInfo) { + TGuard guard(Lock); + Pools.emplace_back(new TPoolInfo); + TPoolInfo &poolInfo = *Pools.back(); + poolInfo.Pool = pool; + poolInfo.Shared = Shared; + poolInfo.BasicPool = dynamic_cast(pool); + poolInfo.DefaultThreadCount = pool->GetDefaultThreadCount(); + poolInfo.MinThreadCount = pool->GetMinThreadCount(); + poolInfo.MaxThreadCount = pool->GetMaxThreadCount(); + + poolInfo.DefaultFullThreadCount = pool->GetDefaultFullThreadCount(); + poolInfo.MinFullThreadCount = pool->GetMinFullThreadCount(); + poolInfo.MaxFullThreadCount = pool->GetMaxFullThreadCount(); + poolInfo.ThreadInfo.resize(poolInfo.MaxFullThreadCount); + poolInfo.SharedInfo.resize(Shared ? 
Shared->GetSharedThreadCount() : 0); + poolInfo.Priority = pool->GetPriority(); + pool->SetFullThreadCount(poolInfo.DefaultFullThreadCount); + if (pingInfo) { + poolInfo.AvgPingCounter = pingInfo->AvgPingCounter; + poolInfo.AvgPingCounterWithSmallWindow = pingInfo->AvgPingCounterWithSmallWindow; + poolInfo.MaxAvgPingUs = pingInfo->MaxAvgPingUs; + } + if (poolInfo.BasicPool) { + poolInfo.WaitingStats.reset(new TWaitingStats()); + poolInfo.MovingWaitingStats.reset(new TWaitingStats()); + } + PriorityOrder.clear(); +} + +void THarmonizer::Enable(bool enable) { + TGuard guard(Lock); + IsDisabled = enable; +} + +IHarmonizer* MakeHarmonizer(ui64 ts) { + return new THarmonizer(ts); +} + +TPoolHarmonizerStats THarmonizer::GetPoolStats(i16 poolId) const { + const TPoolInfo &pool = *Pools[poolId]; + ui64 flags = RelaxedLoad(&pool.LastFlags); + return TPoolHarmonizerStats{ + .IncreasingThreadsByNeedyState = static_cast(RelaxedLoad(&pool.IncreasingThreadsByNeedyState)), + .IncreasingThreadsByExchange = static_cast(RelaxedLoad(&pool.IncreasingThreadsByExchange)), + .DecreasingThreadsByStarvedState = static_cast(RelaxedLoad(&pool.DecreasingThreadsByStarvedState)), + .DecreasingThreadsByHoggishState = static_cast(RelaxedLoad(&pool.DecreasingThreadsByHoggishState)), + .DecreasingThreadsByExchange = static_cast(RelaxedLoad(&pool.DecreasingThreadsByExchange)), + .MaxConsumedCpu = pool.MaxConsumedCpu.load(std::memory_order_relaxed), + .MinConsumedCpu = pool.MinConsumedCpu.load(std::memory_order_relaxed), + .AvgConsumedCpu = pool.AvgConsumedCpu.load(std::memory_order_relaxed), + .MaxBookedCpu = pool.MaxBookedCpu.load(std::memory_order_relaxed), + .MinBookedCpu = pool.MinBookedCpu.load(std::memory_order_relaxed), + .PotentialMaxThreadCount = static_cast(RelaxedLoad(&pool.PotentialMaxThreadCount)), + .IsNeedy = static_cast(flags & 1), + .IsStarved = static_cast(flags & 2), + .IsHoggish = static_cast(flags & 4), + }; +} + +THarmonizerStats THarmonizer::GetStats() const { + return THarmonizerStats{ + .MaxConsumedCpu = static_cast(RelaxedLoad(&MaxConsumedCpu)), + .MinConsumedCpu = static_cast(RelaxedLoad(&MinConsumedCpu)), + .MaxBookedCpu = static_cast(RelaxedLoad(&MaxBookedCpu)), + .MinBookedCpu = static_cast(RelaxedLoad(&MinBookedCpu)), + .AvgAwakeningTimeUs = AvgAwakeningTimeUs, + .AvgWakingUpTimeUs = AvgWakingUpTimeUs, + }; +} + +void THarmonizer::SetSharedPool(TSharedExecutorPool* pool) { + Shared = pool; +} + +} diff --git a/ydb/library/actors/core/harmonizer/harmonizer.h b/ydb/library/actors/core/harmonizer.h similarity index 60% rename from ydb/library/actors/core/harmonizer/harmonizer.h rename to ydb/library/actors/core/harmonizer.h index b5140c963b8b..b4323ef8de13 100644 --- a/ydb/library/actors/core/harmonizer/harmonizer.h +++ b/ydb/library/actors/core/harmonizer.h @@ -1,10 +1,11 @@ #pragma once #include "defs.h" +#include "executor_pool_shared.h" namespace NActors { class IExecutorPool; - class ISharedExecutorPool; + class TSharedExecutorPool; struct TSelfPingInfo; template @@ -16,38 +17,25 @@ namespace NActors { ui64 DecreasingThreadsByStarvedState = 0; ui64 DecreasingThreadsByHoggishState = 0; ui64 DecreasingThreadsByExchange = 0; - - ui64 ReceivedHalfThreadByNeedyState = 0; - ui64 GivenHalfThreadByOtherStarvedState = 0; - ui64 GivenHalfThreadByHoggishState = 0; - ui64 GivenHalfThreadByOtherNeedyState = 0; - ui64 ReturnedHalfThreadByStarvedState = 0; - ui64 ReturnedHalfThreadByOtherHoggishState = 0; - - float MaxCpuUs = 0.0; - float MinCpuUs = 0.0; - float AvgCpuUs = 0.0; - float MaxElapsedUs = 0.0; 
- float MinElapsedUs = 0.0; - float AvgElapsedUs = 0.0; + float MaxConsumedCpu = 0.0; + float MinConsumedCpu = 0.0; + float AvgConsumedCpu = 0.0; + float MaxBookedCpu = 0.0; + float MinBookedCpu = 0.0; i16 PotentialMaxThreadCount = 0; bool IsNeedy = false; bool IsStarved = false; bool IsHoggish = false; - - TString ToString() const; }; struct THarmonizerStats { - i64 MaxCpuUs = 0.0; - i64 MinCpuUs = 0.0; - i64 MaxElapsedUs = 0.0; - i64 MinElapsedUs = 0.0; + i64 MaxConsumedCpu = 0.0; + i64 MinConsumedCpu = 0.0; + i64 MaxBookedCpu = 0.0; + i64 MinBookedCpu = 0.0; double AvgAwakeningTimeUs = 0; double AvgWakingUpTimeUs = 0; - - TString ToString() const; }; // Pool cpu harmonizer @@ -60,7 +48,7 @@ namespace NActors { virtual void Enable(bool enable) = 0; virtual TPoolHarmonizerStats GetPoolStats(i16 poolId) const = 0; virtual THarmonizerStats GetStats() const = 0; - virtual void SetSharedPool(ISharedExecutorPool* pool) = 0; + virtual void SetSharedPool(TSharedExecutorPool* pool) = 0; }; IHarmonizer* MakeHarmonizer(ui64 ts); diff --git a/ydb/library/actors/core/harmonizer/cpu_consumption.cpp b/ydb/library/actors/core/harmonizer/cpu_consumption.cpp deleted file mode 100644 index a2bd5c20e3cf..000000000000 --- a/ydb/library/actors/core/harmonizer/cpu_consumption.cpp +++ /dev/null @@ -1,171 +0,0 @@ -#include "cpu_consumption.h" -#include "debug.h" - -namespace NActors { - -LWTRACE_USING(ACTORLIB_PROVIDER); - -void TCpuConsumptionInfo::Clear() { - Elapsed = 0.0; - Cpu = 0.0; - LastSecondElapsed = 0.0; - LastSecondCpu = 0.0; -} - -void THarmonizerCpuConsumption::Init(i16 poolCount) { - PoolConsumption.resize(poolCount); - IsNeedyByPool.reserve(poolCount); - NeedyPools.reserve(poolCount); - HoggishPools.reserve(poolCount); -} - -namespace { - - float Rescale(float value) { - return Max(0.0, Min(1.0, value * (1.0/0.9))); - } - - void UpdatePoolConsumption(const TPoolInfo& pool, TCpuConsumptionInfo *poolConsumption) { - poolConsumption->Clear(); - for (i16 threadIdx = 0; threadIdx < pool.MaxThreadCount; ++threadIdx) { - float threadElapsed = Rescale(pool.GetElapsed(threadIdx)); - float threadLastSecondElapsed = Rescale(pool.GetLastSecondElapsed(threadIdx)); - float threadCpu = Rescale(pool.GetCpu(threadIdx)); - float threadLastSecondCpu = Rescale(pool.GetLastSecondCpu(threadIdx)); - poolConsumption->Elapsed += threadElapsed; - poolConsumption->LastSecondElapsed += threadLastSecondElapsed; - poolConsumption->Cpu += threadCpu; - poolConsumption->LastSecondCpu += threadLastSecondCpu; - LWPROBE(HarmonizeCheckPoolByThread, pool.Pool->PoolId, pool.Pool->GetName(), threadIdx, threadElapsed, threadCpu, threadLastSecondElapsed, threadLastSecondCpu); - } - for (ui32 sharedIdx = 0; sharedIdx < pool.SharedInfo.size(); ++sharedIdx) { - float sharedElapsed = Rescale(pool.GetSharedElapsed(sharedIdx)); - float sharedLastSecondElapsed = Rescale(pool.GetLastSecondSharedElapsed(sharedIdx)); - float sharedCpu = Rescale(pool.GetSharedCpu(sharedIdx)); - float sharedLastSecondCpu = Rescale(pool.GetLastSecondSharedCpu(sharedIdx)); - poolConsumption->Elapsed += sharedElapsed; - poolConsumption->LastSecondElapsed += sharedLastSecondElapsed; - poolConsumption->Cpu += sharedCpu; - poolConsumption->LastSecondCpu += sharedLastSecondCpu; - LWPROBE(HarmonizeCheckPoolByThread, pool.Pool->PoolId, pool.Pool->GetName(), -1 - sharedIdx, sharedElapsed, sharedCpu, sharedLastSecondElapsed, sharedLastSecondCpu); - } - } - - bool IsStarved(double elapsed, double cpu) { - return Max(elapsed, cpu) > 0.1 && (cpu < elapsed * 0.7 || elapsed - cpu > 
0.5); - } - - bool IsHoggish(double elapsed, double currentThreadCount) { - return elapsed < currentThreadCount - 0.5; - } - -} // namespace - - -void THarmonizerCpuConsumption::Pull(const std::vector> &pools, const TSharedInfo& sharedInfo) { - FreeHalfThread.clear(); - NeedyPools.clear(); - HoggishPools.clear(); - IsNeedyByPool.clear(); - - - TotalCores = 0; - AdditionalThreads = 0; - StoppingThreads = 0; - IsStarvedPresent = false; - Elapsed = 0.0; - Cpu = 0.0; - LastSecondElapsed = 0.0; - LastSecondCpu = 0.0; - for (size_t poolIdx = 0; poolIdx < pools.size(); ++poolIdx) { - TPoolInfo& pool = *pools[poolIdx]; - TotalCores += pool.DefaultThreadCount; - - AdditionalThreads += Max(0, pool.GetFullThreadCount() - pool.DefaultFullThreadCount); - float currentThreadCount = pool.GetThreadCount(); - StoppingThreads += pool.Pool->GetBlockingThreadCount(); - HARMONIZER_DEBUG_PRINT("pool", poolIdx, "pool name", pool.Pool->GetName(), "current thread count", currentThreadCount, "stopping threads", StoppingThreads, "default thread count", pool.DefaultThreadCount); - - UpdatePoolConsumption(pool, &PoolConsumption[poolIdx]); - - HARMONIZER_DEBUG_PRINT("pool", poolIdx, "pool name", pool.Pool->GetName(), "elapsed", PoolConsumption[poolIdx].Elapsed, "cpu", PoolConsumption[poolIdx].Cpu, "last second elapsed", PoolConsumption[poolIdx].LastSecondElapsed, "last second cpu", PoolConsumption[poolIdx].LastSecondCpu); - - bool isStarved = IsStarved(PoolConsumption[poolIdx].Elapsed, PoolConsumption[poolIdx].Cpu) - || IsStarved(PoolConsumption[poolIdx].LastSecondElapsed, PoolConsumption[poolIdx].LastSecondCpu); - if (isStarved) { - IsStarvedPresent = true; - } - - bool isNeedy = (pool.IsAvgPingGood() || pool.NewNotEnoughCpuExecutions) && (PoolConsumption[poolIdx].Cpu >= currentThreadCount); - IsNeedyByPool.push_back(isNeedy); - if (isNeedy) { - NeedyPools.push_back(poolIdx); - } - - if (currentThreadCount - PoolConsumption[poolIdx].Elapsed > 0.5) { - if (sharedInfo.HasBorrowedSharedThread[poolIdx] || sharedInfo.HasSharedThreadWhichWasNotBorrowed[poolIdx]) { - FreeHalfThread.push_back(poolIdx); - } - } - - bool isHoggish = IsHoggish(PoolConsumption[poolIdx].Elapsed, currentThreadCount) - || IsHoggish(PoolConsumption[poolIdx].LastSecondElapsed, currentThreadCount); - if (isHoggish) { - float freeCpu = std::max(currentThreadCount - PoolConsumption[poolIdx].Elapsed, currentThreadCount - PoolConsumption[poolIdx].LastSecondElapsed); - HoggishPools.push_back({poolIdx, freeCpu}); - } - - Elapsed += PoolConsumption[poolIdx].Elapsed; - Cpu += PoolConsumption[poolIdx].Cpu; - LastSecondElapsed += PoolConsumption[poolIdx].LastSecondElapsed; - LastSecondCpu += PoolConsumption[poolIdx].LastSecondCpu; - pool.LastFlags.store((i64)isNeedy | ((i64)isStarved << 1) | ((i64)isHoggish << 2), std::memory_order_relaxed); - LWPROBE_WITH_DEBUG( - HarmonizeCheckPool, - poolIdx, - pool.Pool->GetName(), - PoolConsumption[poolIdx].Elapsed, - PoolConsumption[poolIdx].Cpu, - PoolConsumption[poolIdx].LastSecondElapsed, - PoolConsumption[poolIdx].LastSecondCpu, - currentThreadCount, - pool.MaxFullThreadCount, - isStarved, - isNeedy, - isHoggish - ); - } - - if (NeedyPools.size()) { - Sort(NeedyPools.begin(), NeedyPools.end(), [&] (i16 lhs, i16 rhs) { - if (pools[lhs]->Priority != pools[rhs]->Priority) { - return pools[lhs]->Priority > pools[rhs]->Priority; - } - return pools[lhs]->Pool->PoolId < pools[rhs]->Pool->PoolId; - }); - } - - if (FreeHalfThread.size()) { - Sort(FreeHalfThread.begin(), FreeHalfThread.end(), [&] (i16 lhs, i16 rhs) { - if 
(pools[lhs]->Priority != pools[rhs]->Priority) { - return pools[lhs]->Priority > pools[rhs]->Priority; - } - return pools[lhs]->Pool->PoolId < pools[rhs]->Pool->PoolId; - }); - } - - HARMONIZER_DEBUG_PRINT("NeedyPools", NeedyPools.size(), "FreeHalfThread", FreeHalfThread.size(), "HoggishPools", HoggishPools.size()); - - Budget = TotalCores - Max(Elapsed, LastSecondElapsed); - BudgetInt = static_cast(Max(Budget, 0.0f)); - if (Budget < -0.1) { - IsStarvedPresent = true; - } - Overbooked = Elapsed - Cpu; - if (Overbooked < 0) { - IsStarvedPresent = false; - } - HARMONIZER_DEBUG_PRINT("IsStarvedPresent", IsStarvedPresent, "Budget", Budget, "BudgetInt", BudgetInt, "Overbooked", Overbooked, "TotalCores", TotalCores, "Elapsed", Elapsed, "Cpu", Cpu); -} - -} // namespace NActors diff --git a/ydb/library/actors/core/harmonizer/cpu_consumption.h b/ydb/library/actors/core/harmonizer/cpu_consumption.h deleted file mode 100644 index 0fbfd85b6ee1..000000000000 --- a/ydb/library/actors/core/harmonizer/cpu_consumption.h +++ /dev/null @@ -1,45 +0,0 @@ -#pragma once - -#include "defs.h" -#include "pool.h" -#include "shared_info.h" -namespace NActors { - -struct TPoolInfo; - -struct TCpuConsumptionInfo { - float Elapsed; - float Cpu; - float LastSecondElapsed; - float LastSecondCpu; - - void Clear(); -}; // struct TCpuConsumptionInfo - -struct THarmonizerCpuConsumption { - std::vector PoolConsumption; - - float TotalCores = 0; - i16 AdditionalThreads = 0; - i16 StoppingThreads = 0; - bool IsStarvedPresent = false; - float Budget = 0.0; - i16 BudgetInt = 0; - float Overbooked = 0.0; - - float Elapsed = 0.0; - float Cpu = 0.0; - float LastSecondElapsed = 0.0; - float LastSecondCpu = 0.0; - TStackVec NeedyPools; - TStackVec, 8> HoggishPools; - TStackVec IsNeedyByPool; - std::vector FreeHalfThread; - - void Init(i16 poolCount); - - void Pull(const std::vector> &pools, const TSharedInfo& sharedInfo); - -}; // struct THarmonizerCpuConsumption - -} // namespace NActors diff --git a/ydb/library/actors/core/harmonizer/debug.h b/ydb/library/actors/core/harmonizer/debug.h deleted file mode 100644 index 007d7d801224..000000000000 --- a/ydb/library/actors/core/harmonizer/debug.h +++ /dev/null @@ -1,23 +0,0 @@ -#pragma once - -#include - -namespace NActors { - -constexpr bool DebugHarmonizer = false; - -template -void Print(TArgs&& ... args) { - ((Cerr << std::forward(args) << " "), ...) << Endl; -} - -#define HARMONIZER_DEBUG_PRINT(...) \ - if constexpr (DebugHarmonizer) { Print(__VA_ARGS__); } -// HARMONIZER_DEBUG_PRINT(...) - -#define LWPROBE_WITH_DEBUG(probe, ...) \ - LWPROBE(probe, __VA_ARGS__); \ - HARMONIZER_DEBUG_PRINT(#probe, __VA_ARGS__); \ -// LWPROBE_WITH_DEBUG(probe, ...) 
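The predicates above drive every decision later in the pass, so it helps to see them in isolation. Below is a minimal standalone sketch in plain C++ that mirrors the starved, needy and hoggish rules exactly as they appear in the removed cpu_consumption.cpp; the function and variable names are illustrative rather than the actor-system types, "elapsed" and "cpu" are per-pool core-equivalents over the observation window, and "threads" is the pool's current thread count.

#include <algorithm>
#include <cstdio>

// Restatement of the classification thresholds from the removed
// THarmonizerCpuConsumption::Pull. "elapsed" is wall time spent by the
// pool's threads, "cpu" is the CPU time they actually got, both in cores.
bool IsStarved(double elapsed, double cpu) {
    // The pool had work it could not convert into CPU time.
    return std::max(elapsed, cpu) > 0.1 && (cpu < elapsed * 0.7 || elapsed - cpu > 0.5);
}

bool IsHoggish(double elapsed, double threads) {
    // The pool uses at least half a thread less than it currently owns.
    return elapsed < threads - 0.5;
}

bool IsNeedy(bool avgPingGood, bool notEnoughCpu, double cpu, double threads) {
    // The pool saturates its threads and either pings on time or explicitly
    // reported a CPU shortage.
    return (avgPingGood || notEnoughCpu) && cpu >= threads;
}

int main() {
    // 3.9 cores of elapsed time but only 2.5 cores of CPU on 4 threads:
    // starved, not needy, not hoggish.
    std::printf("starved=%d needy=%d hoggish=%d\n",
                (int)IsStarved(3.9, 2.5),
                (int)IsNeedy(true, false, 2.5, 4.0),
                (int)IsHoggish(3.9, 4.0));
}

Any C++11 compiler builds this as-is; main() prints starved=1 needy=0 hoggish=0 for the sample pool.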
- -} // namespace NActors diff --git a/ydb/library/actors/core/harmonizer/defs.h b/ydb/library/actors/core/harmonizer/defs.h deleted file mode 100644 index 4e185067403d..000000000000 --- a/ydb/library/actors/core/harmonizer/defs.h +++ /dev/null @@ -1,3 +0,0 @@ -#pragma once - -#include diff --git a/ydb/library/actors/core/harmonizer/harmonizer.cpp b/ydb/library/actors/core/harmonizer/harmonizer.cpp deleted file mode 100644 index e6b077b24033..000000000000 --- a/ydb/library/actors/core/harmonizer/harmonizer.cpp +++ /dev/null @@ -1,485 +0,0 @@ -#include "harmonizer.h" -#include "history.h" -#include "pool.h" -#include "waiting_stats.h" -#include "cpu_consumption.h" -#include "shared_info.h" -#include "debug.h" -#include - -#include -#include -#include - -#include -#include -#include -#include -#include - -#include -#include -#include -#include - -#include - -#include - -namespace NActors { - - - -LWTRACE_USING(ACTORLIB_PROVIDER); - - -class THarmonizer: public IHarmonizer { -private: - std::atomic IsDisabled = false; - TSpinLock Lock; - std::atomic NextHarmonizeTs = 0; - std::vector> Pools; - std::vector PriorityOrder; - - TValueHistory<16> CpuUs; - TValueHistory<16> Elapsed; - - std::atomic MaxCpuUs = 0; - std::atomic MinCpuUs = 0; - std::atomic MaxElapsedUs = 0; - std::atomic MinElapsedUs = 0; - - ISharedExecutorPool* Shared = nullptr; - TSharedInfo SharedInfo; - - TWaitingInfo WaitingInfo; - THarmonizerCpuConsumption CpuConsumption; - THarmonizerStats Stats; - float ProcessingBudget = 0.0; - - void PullStats(ui64 ts); - void PullSharedInfo(); - void ProcessWaitingStats(); - void HarmonizeImpl(ui64 ts); - void CalculatePriorityOrder(); - void ProcessStarvedState(); - void ProcessNeedyState(); - void ProcessExchange(); - void ProcessHoggishState(); -public: - THarmonizer(ui64 ts); - virtual ~THarmonizer(); - double Rescale(double value) const; - void Harmonize(ui64 ts) override; - void DeclareEmergency(ui64 ts) override; - void AddPool(IExecutorPool* pool, TSelfPingInfo *pingInfo) override; - void Enable(bool enable) override; - TPoolHarmonizerStats GetPoolStats(i16 poolId) const override; - THarmonizerStats GetStats() const override; - void SetSharedPool(ISharedExecutorPool* pool) override; -}; - -THarmonizer::THarmonizer(ui64 ts) { - NextHarmonizeTs = ts; -} - -THarmonizer::~THarmonizer() { -} - -void THarmonizer::PullStats(ui64 ts) { - HARMONIZER_DEBUG_PRINT("PullStats"); - TCpuConsumption acc; - for (auto &pool : Pools) { - TCpuConsumption consumption = pool->PullStats(ts); - acc.Add(consumption); - } - CpuUs.Register(ts, acc.CpuUs); - MaxCpuUs.store(CpuUs.GetMaxInt(), std::memory_order_relaxed); - MinCpuUs.store(CpuUs.GetMinInt(), std::memory_order_relaxed); - Elapsed.Register(ts, acc.ElapsedUs); - MaxElapsedUs.store(Elapsed.GetMaxInt(), std::memory_order_relaxed); - MinElapsedUs.store(Elapsed.GetMinInt(), std::memory_order_relaxed); - - WaitingInfo.Pull(Pools); - if (Shared) { - SharedInfo.Pull(*Shared); - } - CpuConsumption.Pull(Pools, SharedInfo); - ProcessingBudget = CpuConsumption.Budget; -} - -void THarmonizer::ProcessWaitingStats() { - HARMONIZER_DEBUG_PRINT("ProcessWaitingStats"); - for (size_t poolIdx = 0; poolIdx < Pools.size(); ++poolIdx) { - TPoolInfo& pool = *Pools[poolIdx]; - if (!pool.BasicPool) { - continue; - } - if (pool.BasicPool->ActorSystemProfile != EASProfile::Default) { - if constexpr (NFeatures::TSpinFeatureFlags::CalcPerThread) { - pool.BasicPool->CalcSpinPerThread(WaitingInfo.AvgWakingUpTimeUs); - } else if constexpr 
(NFeatures::TSpinFeatureFlags::UsePseudoMovingWindow) { - ui64 newSpinThreshold = pool.MovingWaitingStats->CalculateGoodSpinThresholdCycles(WaitingInfo.AvgWakingUpTimeUs); - pool.BasicPool->SetSpinThresholdCycles(newSpinThreshold); - } else { - ui64 newSpinThreshold = pool.WaitingStats->CalculateGoodSpinThresholdCycles(WaitingInfo.AvgWakingUpTimeUs); - pool.BasicPool->SetSpinThresholdCycles(newSpinThreshold); - } - pool.BasicPool->ClearWaitingStats(); - } - } -} - -void THarmonizer::ProcessStarvedState() { - HARMONIZER_DEBUG_PRINT("ProcessStarvedState"); - HARMONIZER_DEBUG_PRINT("shared info", SharedInfo.ToString()); - for (ui16 poolIdx : PriorityOrder) { - TPoolInfo &pool = *Pools[poolIdx]; - i64 threadCount = pool.GetFullThreadCount(); - if (SharedInfo.HasSharedThread[poolIdx] && !SharedInfo.HasSharedThreadWhichWasNotBorrowed[poolIdx]) { - HARMONIZER_DEBUG_PRINT("return own half thread", poolIdx); - i16 borrowedPool = Shared->ReturnOwnHalfThread(poolIdx); - pool.ReturnedHalfThreadByStarvedState.fetch_add(1, std::memory_order_relaxed); - Y_ABORT_UNLESS(borrowedPool != -1); - Pools[borrowedPool]->GivenHalfThreadByOtherStarvedState.fetch_add(1, std::memory_order_relaxed); - } else { - HARMONIZER_DEBUG_PRINT("no own half thread", poolIdx, "has shared thread", SharedInfo.HasSharedThread[poolIdx], "has shared thread which was not borrowed", SharedInfo.HasSharedThreadWhichWasNotBorrowed[poolIdx]); - } - HARMONIZER_DEBUG_PRINT("poolIdx", poolIdx, "threadCount", threadCount, "pool.DefaultFullThreadCount", pool.DefaultFullThreadCount); - while (threadCount > pool.DefaultFullThreadCount) { - pool.SetFullThreadCount(--threadCount); - pool.DecreasingThreadsByStarvedState.fetch_add(1, std::memory_order_relaxed); - CpuConsumption.AdditionalThreads--; - CpuConsumption.StoppingThreads++; - - LWPROBE_WITH_DEBUG(HarmonizeOperation, poolIdx, pool.Pool->GetName(), "decrease by starving", threadCount - 1, pool.DefaultFullThreadCount, pool.MaxFullThreadCount); - if (CpuConsumption.Overbooked <= CpuConsumption.StoppingThreads) { - break; - } - } - if (CpuConsumption.Overbooked <= CpuConsumption.StoppingThreads) { - break; - } - } -} - -void THarmonizer::ProcessNeedyState() { - HARMONIZER_DEBUG_PRINT("ProcessNeedyState"); - if (CpuConsumption.NeedyPools.empty()) { - HARMONIZER_DEBUG_PRINT("No needy pools"); - return; - } - for (size_t needyPoolIdx : CpuConsumption.NeedyPools) { - TPoolInfo &pool = *Pools[needyPoolIdx]; - if (!CpuConsumption.IsNeedyByPool[needyPoolIdx]) { - continue; - } - i64 threadCount = pool.GetFullThreadCount(); - if (Shared && ProcessingBudget >= 0.5 && !SharedInfo.HasBorrowedSharedThread[needyPoolIdx] && CpuConsumption.FreeHalfThread.size()) { - i16 poolWithHalfThread = CpuConsumption.FreeHalfThread.back(); - Shared->GiveHalfThread(poolWithHalfThread, needyPoolIdx); - CpuConsumption.FreeHalfThread.pop_back(); - CpuConsumption.IsNeedyByPool[needyPoolIdx] = false; - ProcessingBudget -= 0.5; - pool.ReceivedHalfThreadByNeedyState.fetch_add(1, std::memory_order_relaxed); - Pools[poolWithHalfThread]->GivenHalfThreadByOtherNeedyState.fetch_add(1, std::memory_order_relaxed); - } else if (ProcessingBudget >= 1.0) { - if (threadCount + 1 <= pool.MaxFullThreadCount) { - pool.IncreasingThreadsByNeedyState.fetch_add(1, std::memory_order_relaxed); - CpuConsumption.IsNeedyByPool[needyPoolIdx] = false; - CpuConsumption.AdditionalThreads++; - pool.SetFullThreadCount(threadCount + 1); - ProcessingBudget -= 1.0; - LWPROBE_WITH_DEBUG(HarmonizeOperation, needyPoolIdx, pool.Pool->GetName(), "increase by needs", 
threadCount + 1, pool.DefaultFullThreadCount, pool.MaxFullThreadCount); - } - } - if constexpr (NFeatures::IsLocalQueues()) { - bool needToExpandLocalQueue = ProcessingBudget < 1.0 || threadCount >= pool.MaxFullThreadCount; - needToExpandLocalQueue &= (bool)pool.BasicPool; - needToExpandLocalQueue &= (pool.MaxFullThreadCount > 1); - needToExpandLocalQueue &= (pool.LocalQueueSize < NFeatures::TLocalQueuesFeatureFlags::MAX_LOCAL_QUEUE_SIZE); - if (needToExpandLocalQueue) { - pool.BasicPool->SetLocalQueueSize(++pool.LocalQueueSize); - } - } - } -} - -void THarmonizer::ProcessExchange() { - HARMONIZER_DEBUG_PRINT("ProcessExchange"); - if (CpuConsumption.NeedyPools.empty()) { - HARMONIZER_DEBUG_PRINT("No needy pools"); - return; - } - size_t takingAwayThreads = 0; - size_t sumOfAdditionalThreads = CpuConsumption.AdditionalThreads; - for (size_t needyPoolIdx : CpuConsumption.NeedyPools) { - TPoolInfo &pool = *Pools[needyPoolIdx]; - i64 threadCount = pool.GetFullThreadCount(); - sumOfAdditionalThreads -= threadCount - pool.DefaultFullThreadCount; - if (sumOfAdditionalThreads < takingAwayThreads + 1) { - break; - } - if (!CpuConsumption.IsNeedyByPool[needyPoolIdx]) { - continue; - } - pool.IncreasingThreadsByExchange.fetch_add(1, std::memory_order_relaxed); - CpuConsumption.IsNeedyByPool[needyPoolIdx] = false; - takingAwayThreads++; - pool.SetFullThreadCount(threadCount + 1); - - LWPROBE_WITH_DEBUG(HarmonizeOperation, needyPoolIdx, pool.Pool->GetName(), "increase by exchanging", threadCount + 1, pool.DefaultFullThreadCount, pool.MaxFullThreadCount); - } - - for (ui16 poolIdx : PriorityOrder) { - if (takingAwayThreads <= 0) { - break; - } - - TPoolInfo &pool = *Pools[poolIdx]; - size_t threadCount = pool.GetFullThreadCount(); - size_t additionalThreadsCount = Max(0L, threadCount - pool.DefaultFullThreadCount); - size_t currentTakingAwayThreads = Min(additionalThreadsCount, takingAwayThreads); - - if (!currentTakingAwayThreads) { - continue; - } - takingAwayThreads -= currentTakingAwayThreads; - pool.SetFullThreadCount(threadCount - currentTakingAwayThreads); - - pool.DecreasingThreadsByExchange.fetch_add(currentTakingAwayThreads, std::memory_order_relaxed); - LWPROBE_WITH_DEBUG(HarmonizeOperation, poolIdx, pool.Pool->GetName(), "decrease by exchanging", threadCount - currentTakingAwayThreads, pool.DefaultFullThreadCount, pool.MaxFullThreadCount); - } -} - -void THarmonizer::ProcessHoggishState() { - HARMONIZER_DEBUG_PRINT("ProcessHoggishState"); - for (auto &[hoggishPoolIdx, freeCpu] : CpuConsumption.HoggishPools) { - TPoolInfo &pool = *Pools[hoggishPoolIdx]; - i64 threadCount = pool.GetFullThreadCount(); - if (SharedInfo.HasBorrowedSharedThread[hoggishPoolIdx]) { - i16 originalPool = Shared->ReturnBorrowedHalfThread(hoggishPoolIdx); - HARMONIZER_DEBUG_PRINT("has borrowed shared thread", hoggishPoolIdx, "will return half thread to", originalPool); - freeCpu -= 0.5; - pool.GivenHalfThreadByHoggishState.fetch_add(1, std::memory_order_relaxed); - Y_ABORT_UNLESS(originalPool != -1); - Pools[originalPool]->ReturnedHalfThreadByOtherHoggishState.fetch_add(1, std::memory_order_relaxed); - } - if (pool.BasicPool && pool.LocalQueueSize > NFeatures::TLocalQueuesFeatureFlags::MIN_LOCAL_QUEUE_SIZE) { - pool.LocalQueueSize = std::min(NFeatures::TLocalQueuesFeatureFlags::MIN_LOCAL_QUEUE_SIZE, pool.LocalQueueSize / 2); - pool.BasicPool->SetLocalQueueSize(pool.LocalQueueSize); - } - HARMONIZER_DEBUG_PRINT("poolIdx", hoggishPoolIdx, "threadCount", threadCount, "pool.MinFullThreadCount", pool.MinFullThreadCount, 
"freeCpu", freeCpu); - if (threadCount > pool.MinFullThreadCount && freeCpu >= 1) { - pool.DecreasingThreadsByHoggishState.fetch_add(1, std::memory_order_relaxed); - pool.SetFullThreadCount(threadCount - 1); - LWPROBE_WITH_DEBUG(HarmonizeOperation, hoggishPoolIdx, pool.Pool->GetName(), "decrease by hoggish", threadCount - 1, pool.DefaultFullThreadCount, pool.MaxFullThreadCount); - } - } -} - -void THarmonizer::HarmonizeImpl(ui64 ts) { - HARMONIZER_DEBUG_PRINT("HarmonizeImpl"); - Y_UNUSED(ts); - - ProcessWaitingStats(); - - for (size_t needyPoolIdx : CpuConsumption.NeedyPools) { - TPoolInfo &pool = *Pools[needyPoolIdx]; - if (pool.AvgPingCounter && pool.LastUpdateTs + Us2Ts(3'000'000) > ts) { - CpuConsumption.IsNeedyByPool[needyPoolIdx] = false; - HARMONIZER_DEBUG_PRINT("pool won't be updated because time", needyPoolIdx); - } - } - - HARMONIZER_DEBUG_PRINT("IsStarvedPresent", CpuConsumption.IsStarvedPresent, "Overbooked", CpuConsumption.Overbooked, "StoppingThreads", CpuConsumption.StoppingThreads); - if (CpuConsumption.IsStarvedPresent && CpuConsumption.Overbooked >= CpuConsumption.StoppingThreads) { - ProcessStarvedState(); - } else if (!CpuConsumption.IsStarvedPresent) { - ProcessNeedyState(); - } - - if (ProcessingBudget < 1.0) { - ProcessExchange(); - } - - if (!CpuConsumption.HoggishPools.empty()) { - ProcessHoggishState(); - } - - for (size_t poolIdx = 0; poolIdx < Pools.size(); ++poolIdx) { - TPoolInfo& pool = *Pools[poolIdx]; - pool.PotentialMaxThreadCount.store(std::min(pool.MaxThreadCount, static_cast(pool.GetThreadCount() + CpuConsumption.Budget)), std::memory_order_relaxed); - HARMONIZER_DEBUG_PRINT(poolIdx, pool.Pool->GetName(), "potential max thread count", pool.PotentialMaxThreadCount.load(std::memory_order_relaxed), "budget", CpuConsumption.Budget, "thread count", pool.GetThreadCount()); - } -} - -void THarmonizer::CalculatePriorityOrder() { - PriorityOrder.resize(Pools.size()); - Iota(PriorityOrder.begin(), PriorityOrder.end(), 0); - Sort(PriorityOrder.begin(), PriorityOrder.end(), [&] (i16 lhs, i16 rhs) { - if (Pools[lhs]->Priority != Pools[rhs]->Priority) { - return Pools[lhs]->Priority < Pools[rhs]->Priority; - } - return Pools[lhs]->Pool->PoolId > Pools[rhs]->Pool->PoolId; - }); -} - -void THarmonizer::Harmonize(ui64 ts) { - if (IsDisabled.load(std::memory_order_relaxed) || NextHarmonizeTs.load(std::memory_order_acquire) > ts || !Lock.TryAcquire()) { - LWPROBE_WITH_DEBUG(TryToHarmonizeFailed, ts, NextHarmonizeTs.load(std::memory_order_relaxed), IsDisabled.load(std::memory_order_relaxed), false); - return; - } - - if (NextHarmonizeTs.load(std::memory_order_acquire) > ts) { - Lock.Release(); - return; - } - - // Check again under the lock - if (IsDisabled.load(std::memory_order_relaxed)) { - LWPROBE_WITH_DEBUG(TryToHarmonizeFailed, ts, NextHarmonizeTs.load(std::memory_order_relaxed), IsDisabled.load(std::memory_order_relaxed), true); - Lock.Release(); - return; - } - // Will never reach this line disabled - ui64 previousNextHarmonizeTs = NextHarmonizeTs.exchange(ts + Us2Ts(1'000'000ull), std::memory_order_acquire); - LWPROBE_WITH_DEBUG(TryToHarmonizeSuccess, ts, NextHarmonizeTs.load(std::memory_order_relaxed), previousNextHarmonizeTs); - - { - TInternalActorTypeGuard activityGuard; - - if (PriorityOrder.empty()) { - CalculatePriorityOrder(); - SharedInfo.Init(Pools.size()); - CpuConsumption.Init(Pools.size()); - } - - PullStats(ts); - HarmonizeImpl(ts); - } - - Lock.Release(); -} - -void THarmonizer::DeclareEmergency(ui64 ts) { - NextHarmonizeTs = ts; -} - -void 
THarmonizer::AddPool(IExecutorPool* pool, TSelfPingInfo *pingInfo) { - TGuard guard(Lock); - Pools.emplace_back(new TPoolInfo); - TPoolInfo &poolInfo = *Pools.back(); - poolInfo.Pool = pool; - poolInfo.Shared = Shared; - poolInfo.BasicPool = dynamic_cast(pool); - poolInfo.DefaultThreadCount = pool->GetDefaultThreadCount(); - poolInfo.MinThreadCount = pool->GetMinThreadCount(); - poolInfo.MaxThreadCount = pool->GetMaxThreadCount(); - poolInfo.PotentialMaxThreadCount = poolInfo.MaxThreadCount; - - poolInfo.DefaultFullThreadCount = pool->GetDefaultFullThreadCount(); - poolInfo.MinFullThreadCount = pool->GetMinFullThreadCount(); - poolInfo.MaxFullThreadCount = pool->GetMaxFullThreadCount(); - poolInfo.ThreadInfo.resize(poolInfo.MaxFullThreadCount); - poolInfo.SharedInfo.resize(Shared ? Shared->GetSharedThreadCount() : 0); - poolInfo.Priority = pool->GetPriority(); - pool->SetFullThreadCount(poolInfo.DefaultFullThreadCount); - if (pingInfo) { - poolInfo.AvgPingCounter = pingInfo->AvgPingCounter; - poolInfo.AvgPingCounterWithSmallWindow = pingInfo->AvgPingCounterWithSmallWindow; - poolInfo.MaxAvgPingUs = pingInfo->MaxAvgPingUs; - } - if (poolInfo.BasicPool) { - poolInfo.WaitingStats.reset(new TWaitingStats()); - poolInfo.MovingWaitingStats.reset(new TWaitingStats()); - } - PriorityOrder.clear(); -} - -void THarmonizer::Enable(bool enable) { - TGuard guard(Lock); - IsDisabled = enable; -} - -IHarmonizer* MakeHarmonizer(ui64 ts) { - return new THarmonizer(ts); -} - -TPoolHarmonizerStats THarmonizer::GetPoolStats(i16 poolId) const { - const TPoolInfo &pool = *Pools[poolId]; - ui64 flags = pool.LastFlags.load(std::memory_order_relaxed); - return TPoolHarmonizerStats{ - .IncreasingThreadsByNeedyState = pool.IncreasingThreadsByNeedyState.load(std::memory_order_relaxed), - .IncreasingThreadsByExchange = pool.IncreasingThreadsByExchange.load(std::memory_order_relaxed), - .DecreasingThreadsByStarvedState = pool.DecreasingThreadsByStarvedState.load(std::memory_order_relaxed), - .DecreasingThreadsByHoggishState = pool.DecreasingThreadsByHoggishState.load(std::memory_order_relaxed), - .DecreasingThreadsByExchange = pool.DecreasingThreadsByExchange.load(std::memory_order_relaxed), - .ReceivedHalfThreadByNeedyState = pool.ReceivedHalfThreadByNeedyState.load(std::memory_order_relaxed), - .GivenHalfThreadByOtherStarvedState = pool.GivenHalfThreadByOtherStarvedState.load(std::memory_order_relaxed), - .GivenHalfThreadByHoggishState = pool.GivenHalfThreadByHoggishState.load(std::memory_order_relaxed), - .GivenHalfThreadByOtherNeedyState = pool.GivenHalfThreadByOtherNeedyState.load(std::memory_order_relaxed), - .ReturnedHalfThreadByStarvedState = pool.ReturnedHalfThreadByStarvedState.load(std::memory_order_relaxed), - .ReturnedHalfThreadByOtherHoggishState = pool.ReturnedHalfThreadByOtherHoggishState.load(std::memory_order_relaxed), - .MaxCpuUs = pool.MaxCpuUs.load(std::memory_order_relaxed), - .MinCpuUs = pool.MinCpuUs.load(std::memory_order_relaxed), - .AvgCpuUs = pool.AvgCpuUs.load(std::memory_order_relaxed), - .MaxElapsedUs = pool.MaxElapsedUs.load(std::memory_order_relaxed), - .MinElapsedUs = pool.MinElapsedUs.load(std::memory_order_relaxed), - .PotentialMaxThreadCount = pool.PotentialMaxThreadCount.load(std::memory_order_relaxed), - .IsNeedy = static_cast(flags & 1), - .IsStarved = static_cast(flags & 2), - .IsHoggish = static_cast(flags & 4), - }; -} - -THarmonizerStats THarmonizer::GetStats() const { - return THarmonizerStats{ - .MaxCpuUs = MaxCpuUs.load(std::memory_order_relaxed), - .MinCpuUs = 
MinCpuUs.load(std::memory_order_relaxed), - .MaxElapsedUs = MaxElapsedUs.load(std::memory_order_relaxed), - .MinElapsedUs = MinElapsedUs.load(std::memory_order_relaxed), - .AvgAwakeningTimeUs = WaitingInfo.AvgAwakeningTimeUs.load(std::memory_order_relaxed), - .AvgWakingUpTimeUs = WaitingInfo.AvgWakingUpTimeUs.load(std::memory_order_relaxed), - }; -} - -void THarmonizer::SetSharedPool(ISharedExecutorPool* pool) { - Shared = pool; -} - -TString TPoolHarmonizerStats::ToString() const { - return TStringBuilder() << '{' - << "IncreasingThreadsByNeedyState: " << IncreasingThreadsByNeedyState << ", " - << "IncreasingThreadsByExchange: " << IncreasingThreadsByExchange << ", " - << "DecreasingThreadsByStarvedState: " << DecreasingThreadsByStarvedState << ", " - << "DecreasingThreadsByHoggishState: " << DecreasingThreadsByHoggishState << ", " - << "DecreasingThreadsByExchange: " << DecreasingThreadsByExchange << ", " - << "ReceivedHalfThreadByNeedyState: " << ReceivedHalfThreadByNeedyState << ", " - << "GivenHalfThreadByOtherStarvedState: " << GivenHalfThreadByOtherStarvedState << ", " - << "GivenHalfThreadByHoggishState: " << GivenHalfThreadByHoggishState << ", " - << "GivenHalfThreadByOtherNeedyState: " << GivenHalfThreadByOtherNeedyState << ", " - << "ReturnedHalfThreadByStarvedState: " << ReturnedHalfThreadByStarvedState << ", " - << "ReturnedHalfThreadByOtherHoggishState: " << ReturnedHalfThreadByOtherHoggishState << ", " - << "MaxCpuUs: " << MaxCpuUs << ", " - << "MinCpuUs: " << MinCpuUs << ", " - << "AvgCpuUs: " << AvgCpuUs << ", " - << "MaxElapsedUs: " << MaxElapsedUs << ", " - << "MinElapsedUs: " << MinElapsedUs << ", " - << "PotentialMaxThreadCount: " << PotentialMaxThreadCount << ", " - << "IsNeedy: " << IsNeedy << ", " - << "IsStarved: " << IsStarved << ", " - << "IsHoggish: " << IsHoggish << '}'; -} - -TString THarmonizerStats::ToString() const { - return TStringBuilder() << '{' - << "MaxCpuUs: " << MaxCpuUs << ", " - << "MinCpuUs: " << MinCpuUs << ", " - << "MaxElapsedUs: " << MaxElapsedUs << ", " - << "MinElapsedUs: " << MinElapsedUs << ", " - << "AvgAwakeningTimeUs: " << AvgAwakeningTimeUs << ", " - << "AvgWakingUpTimeUs: " << AvgWakingUpTimeUs << '}'; -} - -} diff --git a/ydb/library/actors/core/harmonizer/history.h b/ydb/library/actors/core/harmonizer/history.h deleted file mode 100644 index c3ac81cf83d5..000000000000 --- a/ydb/library/actors/core/harmonizer/history.h +++ /dev/null @@ -1,149 +0,0 @@ -#pragma once - -#include "defs.h" -#include - - -namespace NActors { - -template -struct TValueHistory { - - static constexpr bool CheckBinaryPower(ui64 value) { - return !(value & (value - 1)); - } - - static_assert(CheckBinaryPower(HistoryBufferSize)); - - double History[HistoryBufferSize] = {0.0}; - ui64 HistoryIdx = 0; - ui64 LastTs = Max(); - double LastUs = 0.0; - double AccumulatedUs = 0.0; - ui64 AccumulatedTs = 0; - - template - double Accumulate(auto op, auto comb, ui8 seconds) const { - double acc = AccumulatedUs; - size_t idx = HistoryIdx; - ui8 leftSeconds = seconds; - if constexpr (!WithTail) { - idx--; - leftSeconds--; - if (idx >= HistoryBufferSize) { - idx = HistoryBufferSize - 1; - } - acc = History[idx]; - } - do { - idx--; - leftSeconds--; - if (idx >= HistoryBufferSize) { - idx = HistoryBufferSize - 1; - } - if constexpr (WithTail) { - acc = op(acc, History[idx]); - } else if (leftSeconds) { - acc = op(acc, History[idx]); - } else { - ui64 tsInSecond = Us2Ts(1'000'000.0); - acc = op(acc, History[idx] * (tsInSecond - AccumulatedTs) / tsInSecond); - } - } while 
(leftSeconds); - double duration = 1'000'000.0 * seconds; - if constexpr (WithTail) { - duration += Ts2Us(AccumulatedTs); - } - return comb(acc, duration); - } - - template - double GetAvgPartForLastSeconds(ui8 seconds) const { - auto sum = [](double acc, double value) { - return acc + value; - }; - auto avg = [](double sum, double duration) { - return sum / duration; - }; - return Accumulate(sum, avg, seconds); - } - - double GetAvgPart() const { - return GetAvgPartForLastSeconds(HistoryBufferSize); - } - - double GetMaxForLastSeconds(ui8 seconds) const { - auto max = [](const double& acc, const double& value) { - return Max(acc, value); - }; - auto fst = [](const double& value, const double&) { return value; }; - return Accumulate(max, fst, seconds); - } - - double GetMax() const { - return GetMaxForLastSeconds(HistoryBufferSize); - } - - i64 GetMaxInt() const { - return static_cast(GetMax()); - } - - double GetMinForLastSeconds(ui8 seconds) const { - auto min = [](const double& acc, const double& value) { - return Min(acc, value); - }; - auto fst = [](const double& value, const double&) { return value; }; - return Accumulate(min, fst, seconds); - } - - double GetMin() const { - return GetMinForLastSeconds(HistoryBufferSize); - } - - i64 GetMinInt() const { - return static_cast(GetMin()); - } - - void Register(ui64 ts, double valueUs) { - if (ts < LastTs) { - LastTs = ts; - LastUs = valueUs; - AccumulatedUs = 0.0; - AccumulatedTs = 0; - return; - } - ui64 lastTs = std::exchange(LastTs, ts); - ui64 dTs = ts - lastTs; - double lastUs = std::exchange(LastUs, valueUs); - double dUs = valueUs - lastUs; - - if (dTs > Us2Ts(8'000'000.0)) { - dUs = dUs * 1'000'000.0 / Ts2Us(dTs); - for (size_t idx = 0; idx < HistoryBufferSize; ++idx) { - History[idx] = dUs; - } - AccumulatedUs = 0.0; - AccumulatedTs = 0; - return; - } - - while (dTs > 0) { - if (AccumulatedTs + dTs < Us2Ts(1'000'000.0)) { - AccumulatedTs += dTs; - AccumulatedUs += dUs; - break; - } else { - ui64 addTs = Us2Ts(1'000'000.0) - AccumulatedTs; - double addUs = dUs * addTs / dTs; - dTs -= addTs; - dUs -= addUs; - History[HistoryIdx] = AccumulatedUs + addUs; - HistoryIdx = (HistoryIdx + 1) % HistoryBufferSize; - AccumulatedUs = 0.0; - AccumulatedTs = 0; - } - } - } -}; // struct TValueHistory - -} // namespace NActors diff --git a/ydb/library/actors/core/harmonizer/pool.cpp b/ydb/library/actors/core/harmonizer/pool.cpp deleted file mode 100644 index 472d8d2592ff..000000000000 --- a/ydb/library/actors/core/harmonizer/pool.cpp +++ /dev/null @@ -1,149 +0,0 @@ -#include "pool.h" - -#include -#include -#include -#include - -#include "debug.h" - -namespace NActors { - -LWTRACE_USING(ACTORLIB_PROVIDER); - -TPoolInfo::TPoolInfo() - : LocalQueueSize(NFeatures::TLocalQueuesFeatureFlags::MIN_LOCAL_QUEUE_SIZE) -{} - -double TPoolInfo::GetCpu(i16 threadIdx) const { - if ((size_t)threadIdx < ThreadInfo.size()) { - return ThreadInfo[threadIdx].CpuUs.GetAvgPart(); - } - return 0.0; -} - -double TPoolInfo::GetSharedCpu(i16 sharedThreadIdx) const { - if ((size_t)sharedThreadIdx < SharedInfo.size()) { - return SharedInfo[sharedThreadIdx].CpuUs.GetAvgPart(); - } - return 0.0; -} - -double TPoolInfo::GetLastSecondCpu(i16 threadIdx) const { - if ((size_t)threadIdx < ThreadInfo.size()) { - return ThreadInfo[threadIdx].CpuUs.GetAvgPartForLastSeconds(1); - } - return 0.0; -} - -double TPoolInfo::GetLastSecondSharedCpu(i16 sharedThreadIdx) const { - if ((size_t)sharedThreadIdx < SharedInfo.size()) { - return 
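The removed TValueHistory above is a per-second ring buffer: each new sample is spread proportionally across whole-second buckets, and the tail that does not fill a bucket stays in an accumulator. A minimal standalone sketch of that bucketing, assuming microsecond units and explicit deltas; the original takes absolute cycle timestamps plus cumulative counters, derives the deltas itself and converts with Us2Ts/Ts2Us:

#include <array>
#include <cstdint>
#include <cstdio>

// Per-second ring buffer in the spirit of the removed TValueHistory<>.
struct TSecondHistory {
    static constexpr size_t BufferSize = 8;      // 8 one-second slots
    std::array<double, BufferSize> History{};    // closed seconds
    size_t HistoryIdx = 0;
    uint64_t AccumulatedTs = 0;                  // partial second, us of wall time
    double AccumulatedUs = 0.0;                  // value gathered within it

    // dTs: wall time since the previous sample, us.
    // dUs: how much of the tracked value (e.g. CPU time) accrued in it, us.
    void Register(uint64_t dTs, double dUs) {
        constexpr uint64_t Second = 1'000'000;
        while (dTs > 0) {
            if (AccumulatedTs + dTs < Second) {   // still inside the current second
                AccumulatedTs += dTs;
                AccumulatedUs += dUs;
                return;
            }
            uint64_t addTs = Second - AccumulatedTs;   // close the current second
            double addUs = dUs * addTs / dTs;
            dTs -= addTs;
            dUs -= addUs;
            History[HistoryIdx] = AccumulatedUs + addUs;
            HistoryIdx = (HistoryIdx + 1) % BufferSize;
            AccumulatedUs = 0.0;
            AccumulatedTs = 0;
        }
    }
};

int main() {
    TSecondHistory h;
    // 2.5 s of wall time during which 1.25 s of CPU were used: two buckets of
    // 0.5 s CPU each are closed, the remaining 0.25 s stays accumulated.
    h.Register(2'500'000, 1'250'000.0);
    std::printf("bucket0=%.0f bucket1=%.0f partial=%.0f\n",
                h.History[0], h.History[1], h.AccumulatedUs);
}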
SharedInfo[sharedThreadIdx].CpuUs.GetAvgPartForLastSeconds(1); - } - return 0.0; -} - -double TPoolInfo::GetElapsed(i16 threadIdx) const { - if ((size_t)threadIdx < ThreadInfo.size()) { - return ThreadInfo[threadIdx].ElapsedUs.GetAvgPart(); - } - return 0.0; -} - -double TPoolInfo::GetSharedElapsed(i16 sharedThreadIdx) const { - if ((size_t)sharedThreadIdx < SharedInfo.size()) { - return SharedInfo[sharedThreadIdx].ElapsedUs.GetAvgPart(); - } - return 0.0; -} - -double TPoolInfo::GetLastSecondElapsed(i16 threadIdx) const { - if ((size_t)threadIdx < ThreadInfo.size()) { - return ThreadInfo[threadIdx].ElapsedUs.GetAvgPartForLastSeconds(1); - } - return 0.0; -} - -double TPoolInfo::GetLastSecondSharedElapsed(i16 sharedThreadIdx) const { - if ((size_t)sharedThreadIdx < SharedInfo.size()) { - return SharedInfo[sharedThreadIdx].ElapsedUs.GetAvgPartForLastSeconds(1); - } - return 0.0; -} - -#define UNROLL_HISTORY(history) (history)[0], (history)[1], (history)[2], (history)[3], (history)[4], (history)[5], (history)[6], (history)[7] -TCpuConsumption TPoolInfo::PullStats(ui64 ts) { - TCpuConsumption acc; - for (i16 threadIdx = 0; threadIdx < MaxFullThreadCount; ++threadIdx) { - TThreadInfo &threadInfo = ThreadInfo[threadIdx]; - TCpuConsumption cpuConsumption = Pool->GetThreadCpuConsumption(threadIdx); - acc.Add(cpuConsumption); - threadInfo.ElapsedUs.Register(ts, cpuConsumption.ElapsedUs); - LWPROBE(SavedValues, Pool->PoolId, Pool->GetName(), "elapsed", UNROLL_HISTORY(threadInfo.ElapsedUs.History)); - threadInfo.CpuUs.Register(ts, cpuConsumption.CpuUs); - LWPROBE(SavedValues, Pool->PoolId, Pool->GetName(), "cpu", UNROLL_HISTORY(threadInfo.CpuUs.History)); - } - TVector sharedStats; - if (Shared) { - Shared->GetSharedStatsForHarmonizer(Pool->PoolId, sharedStats); - } - - for (ui32 sharedIdx = 0; sharedIdx < SharedInfo.size(); ++sharedIdx) { - auto stat = sharedStats[sharedIdx]; - TCpuConsumption sharedConsumption{ - Ts2Us(stat.SafeElapsedTicks), - static_cast(stat.CpuUs), - stat.NotEnoughCpuExecutions - }; - acc.Add(sharedConsumption); - SharedInfo[sharedIdx].ElapsedUs.Register(ts, sharedConsumption.ElapsedUs); - LWPROBE(SavedValues, Pool->PoolId, Pool->GetName(), "elapsed", UNROLL_HISTORY(SharedInfo[sharedIdx].ElapsedUs.History)); - SharedInfo[sharedIdx].CpuUs.Register(ts, sharedConsumption.CpuUs); - LWPROBE(SavedValues, Pool->PoolId, Pool->GetName(), "cpu", UNROLL_HISTORY(SharedInfo[sharedIdx].CpuUs.History)); - } - - CpuUs.Register(ts, acc.CpuUs); - MaxCpuUs.store(CpuUs.GetMax() / 1'000'000, std::memory_order_relaxed); - MinCpuUs.store(CpuUs.GetMin() / 1'000'000, std::memory_order_relaxed); - AvgCpuUs.store(CpuUs.GetAvgPart() / 1'000'000, std::memory_order_relaxed); - ElapsedUs.Register(ts, acc.ElapsedUs); - MaxElapsedUs.store(ElapsedUs.GetMax() / 1'000'000, std::memory_order_relaxed); - MinElapsedUs.store(ElapsedUs.GetMin() / 1'000'000, std::memory_order_relaxed); - NewNotEnoughCpuExecutions = acc.NotEnoughCpuExecutions - NotEnoughCpuExecutions; - NotEnoughCpuExecutions = acc.NotEnoughCpuExecutions; - if (WaitingStats && BasicPool) { - WaitingStats->Clear(); - BasicPool->GetWaitingStats(*WaitingStats); - if constexpr (!NFeatures::TSpinFeatureFlags::CalcPerThread) { - MovingWaitingStats->Add(*WaitingStats, 0.8, 0.2); - } - } - return acc; -} -#undef UNROLL_HISTORY - -float TPoolInfo::GetThreadCount() { - return Pool->GetThreadCount(); -} - -i16 TPoolInfo::GetFullThreadCount() { - return Pool->GetFullThreadCount(); -} - -void TPoolInfo::SetFullThreadCount(i16 threadCount) { - 
HARMONIZER_DEBUG_PRINT(Pool->PoolId, Pool->GetName(), "set full thread count", threadCount); - Pool->SetFullThreadCount(threadCount); -} - -bool TPoolInfo::IsAvgPingGood() { - bool res = true; - if (AvgPingCounter) { - res &= *AvgPingCounter > MaxAvgPingUs; - } - if (AvgPingCounterWithSmallWindow) { - res &= *AvgPingCounterWithSmallWindow > MaxAvgPingUs; - } - return res; -} - -} // namespace NActors diff --git a/ydb/library/actors/core/harmonizer/pool.h b/ydb/library/actors/core/harmonizer/pool.h deleted file mode 100644 index e5e88b71e281..000000000000 --- a/ydb/library/actors/core/harmonizer/pool.h +++ /dev/null @@ -1,93 +0,0 @@ -#pragma once - -#include "defs.h" - -#include "history.h" -#include - -#include - -namespace NActors { - -class ISharedExecutorPool; -class TBasicExecutorPool; -class IExecutorPool; -template -struct TWaitingStats; - -struct TThreadInfo { - TValueHistory<8> CpuUs; - TValueHistory<8> ElapsedUs; -}; // struct TThreadInfo - -struct TPoolInfo { - std::vector ThreadInfo; - std::vector SharedInfo; - ISharedExecutorPool* Shared = nullptr; - IExecutorPool* Pool = nullptr; - TBasicExecutorPool* BasicPool = nullptr; - - i16 DefaultFullThreadCount = 0; - i16 MinFullThreadCount = 0; - i16 MaxFullThreadCount = 0; - - float DefaultThreadCount = 0; - float MinThreadCount = 0; - float MaxThreadCount = 0; - - i16 Priority = 0; - NMonitoring::TDynamicCounters::TCounterPtr AvgPingCounter; - NMonitoring::TDynamicCounters::TCounterPtr AvgPingCounterWithSmallWindow; - ui32 MaxAvgPingUs = 0; - ui64 LastUpdateTs = 0; - ui64 NotEnoughCpuExecutions = 0; - ui64 NewNotEnoughCpuExecutions = 0; - ui16 LocalQueueSize; - - std::atomic LastFlags = 0; // 0 - isNeedy; 1 - isStarved; 2 - isHoggish - std::atomic IncreasingThreadsByNeedyState = 0; - std::atomic IncreasingThreadsByExchange = 0; - std::atomic DecreasingThreadsByStarvedState = 0; - std::atomic DecreasingThreadsByHoggishState = 0; - std::atomic DecreasingThreadsByExchange = 0; - std::atomic PotentialMaxThreadCount = 0; - std::atomic ReceivedHalfThreadByNeedyState = 0; - std::atomic GivenHalfThreadByOtherStarvedState = 0; - std::atomic GivenHalfThreadByHoggishState = 0; - std::atomic GivenHalfThreadByOtherNeedyState = 0; - std::atomic ReturnedHalfThreadByStarvedState = 0; - std::atomic ReturnedHalfThreadByOtherHoggishState = 0; - - TValueHistory<16> CpuUs; - TValueHistory<16> ElapsedUs; - - std::atomic MaxCpuUs = 0; - std::atomic MinCpuUs = 0; - std::atomic AvgCpuUs = 0; - std::atomic MaxElapsedUs = 0; - std::atomic MinElapsedUs = 0; - std::atomic AvgElapsedUs = 0; - - std::unique_ptr> WaitingStats; - std::unique_ptr> MovingWaitingStats; - - TPoolInfo(); - - double GetCpu(i16 threadIdx) const; - double GetElapsed(i16 threadIdx) const; - double GetLastSecondCpu(i16 threadIdx) const; - double GetLastSecondElapsed(i16 threadIdx) const; - - double GetSharedCpu(i16 sharedThreadIdx) const; - double GetSharedElapsed(i16 sharedThreadIdx) const; - double GetLastSecondSharedCpu(i16 sharedThreadIdx) const; - double GetLastSecondSharedElapsed(i16 sharedThreadIdx) const; - - TCpuConsumption PullStats(ui64 ts); - i16 GetFullThreadCount(); - float GetThreadCount(); - void SetFullThreadCount(i16 threadCount); - bool IsAvgPingGood(); -}; // struct TPoolInfo - -} // namespace NActors diff --git a/ydb/library/actors/core/harmonizer/shared_info.cpp b/ydb/library/actors/core/harmonizer/shared_info.cpp deleted file mode 100644 index 6b8b94f0bd88..000000000000 --- a/ydb/library/actors/core/harmonizer/shared_info.cpp +++ /dev/null @@ -1,55 +0,0 @@ -#include 
"shared_info.h" -#include - -#include - -namespace NActors { - -void TSharedInfo::Init(i16 poolCount) { - HasSharedThread.resize(poolCount, false); - HasSharedThreadWhichWasNotBorrowed.resize(poolCount, false); - HasBorrowedSharedThread.resize(poolCount, false); -} - -void TSharedInfo::Pull(const ISharedExecutorPool& shared) { - auto sharedState = shared.GetState(); - for (ui32 poolIdx = 0; poolIdx < HasSharedThread.size(); ++poolIdx) { - i16 threadIdx = sharedState.ThreadByPool[poolIdx]; - if (threadIdx != -1) { - HasSharedThread[poolIdx] = true; - if (sharedState.PoolByBorrowedThread[threadIdx] == -1) { - HasSharedThreadWhichWasNotBorrowed[poolIdx] = true; - } else { - HasSharedThreadWhichWasNotBorrowed[poolIdx] = false; - } - } - if (sharedState.BorrowedThreadByPool[poolIdx] != -1) { - HasBorrowedSharedThread[poolIdx] = true; - } else { - HasBorrowedSharedThread[poolIdx] = false; - } - } -} - -TString TSharedInfo::ToString() const { - TStringBuilder builder; - builder << "{"; - builder << "HasSharedThread: \""; - for (ui32 i = 0; i < HasSharedThread.size(); ++i) { - builder << (HasSharedThread[i] ? "1" : "0"); - } - builder << "\", "; - builder << "HasSharedThreadWhichWasNotBorrowed: \""; - for (ui32 i = 0; i < HasSharedThreadWhichWasNotBorrowed.size(); ++i) { - builder << (HasSharedThreadWhichWasNotBorrowed[i] ? "1" : "0"); - } - builder << "\", "; - builder << "HasBorrowedSharedThread: \""; - for (ui32 i = 0; i < HasBorrowedSharedThread.size(); ++i) { - builder << (HasBorrowedSharedThread[i] ? "1" : "0"); - } - builder << "\"}"; - return builder; -} - -} // namespace NActors diff --git a/ydb/library/actors/core/harmonizer/shared_info.h b/ydb/library/actors/core/harmonizer/shared_info.h deleted file mode 100644 index 41277db1bbb7..000000000000 --- a/ydb/library/actors/core/harmonizer/shared_info.h +++ /dev/null @@ -1,20 +0,0 @@ -#pragma once - -#include "defs.h" - -namespace NActors { - -class ISharedExecutorPool; - -struct TSharedInfo { - std::vector HasSharedThread; - std::vector HasSharedThreadWhichWasNotBorrowed; - std::vector HasBorrowedSharedThread; - - void Init(i16 poolCount); - void Pull(const ISharedExecutorPool& shared); - - TString ToString() const; -}; // struct TSharedInfo - -} // namespace NActors diff --git a/ydb/library/actors/core/harmonizer/ut/harmonizer_ut.cpp b/ydb/library/actors/core/harmonizer/ut/harmonizer_ut.cpp deleted file mode 100644 index 571550c7b1b9..000000000000 --- a/ydb/library/actors/core/harmonizer/ut/harmonizer_ut.cpp +++ /dev/null @@ -1,673 +0,0 @@ -#include "harmonizer.h" -#include "debug.h" -#include -#include -#include -#include - -using namespace NActors; - - -#define CHECK_CHANGING_THREADS(stats, inc_needy, inc_exchange, dec_hoggish, dec_starved, dec_exchange) \ - UNIT_ASSERT_VALUES_EQUAL_C((stats).IncreasingThreadsByNeedyState, inc_needy, (stats).ToString()); \ - UNIT_ASSERT_VALUES_EQUAL_C((stats).IncreasingThreadsByExchange, inc_exchange, (stats).ToString()); \ - UNIT_ASSERT_VALUES_EQUAL_C((stats).DecreasingThreadsByHoggishState, dec_hoggish, (stats).ToString()); \ - UNIT_ASSERT_VALUES_EQUAL_C((stats).DecreasingThreadsByStarvedState, dec_starved, (stats).ToString()); \ - UNIT_ASSERT_VALUES_EQUAL_C((stats).DecreasingThreadsByExchange, dec_exchange, (stats).ToString()); -// end CHECK_CHANGING_THREADS - -#define CHECK_CHANGING_HALF_THREADS(stats, received_needy, given_starved, given_hoggish, given_other_needy, returned_starved, returned_other_hoggish) \ - UNIT_ASSERT_VALUES_EQUAL_C((stats).ReceivedHalfThreadByNeedyState, received_needy, 
(stats).ToString()); \ - UNIT_ASSERT_VALUES_EQUAL_C((stats).GivenHalfThreadByOtherStarvedState, given_starved, (stats).ToString()); \ - UNIT_ASSERT_VALUES_EQUAL_C((stats).GivenHalfThreadByHoggishState, given_hoggish, (stats).ToString()); \ - UNIT_ASSERT_VALUES_EQUAL_C((stats).GivenHalfThreadByOtherNeedyState, given_other_needy, (stats).ToString()); \ - UNIT_ASSERT_VALUES_EQUAL_C((stats).ReturnedHalfThreadByStarvedState, returned_starved, (stats).ToString()); \ - UNIT_ASSERT_VALUES_EQUAL_C((stats).ReturnedHalfThreadByOtherHoggishState, returned_other_hoggish, (stats).ToString()); -// end CHECK_CHANGING_HALF_THREADS - -#define CHECK_IS_NEEDY(stats) \ - UNIT_ASSERT_VALUES_EQUAL_C((stats).IsNeedy, true, (stats).ToString()); \ -// end CHECK_IS_NEEDY - -#define CHECK_IS_NOT_NEEDY(stats) \ - UNIT_ASSERT_VALUES_EQUAL_C((stats).IsNeedy, false, (stats).ToString()); \ -// end CHECK_IS_NOT_NEEDY - -#define CHECK_IS_HOGGISH(stats) \ - UNIT_ASSERT_VALUES_EQUAL_C((stats).IsHoggish, true, (stats).ToString()); \ -// end CHECK_IS_HOGGISH - -#define CHECK_IS_NOT_HOGGISH(stats) \ - UNIT_ASSERT_VALUES_EQUAL_C((stats).IsHoggish, false, (stats).ToString()); \ -// end CHECK_IS_NOT_HOGGISH - -#define CHECK_IS_STARVED(stats) \ - UNIT_ASSERT_VALUES_EQUAL_C((stats).IsStarved, true, (stats).ToString()); \ -// end CHECK_IS_STARVED - -#define CHECK_IS_NOT_STARVED(stats) \ - UNIT_ASSERT_VALUES_EQUAL_C((stats).IsStarved, false, (stats).ToString()); \ -// end CHECK_IS_NOT_STARVED - - -Y_UNIT_TEST_SUITE(HarmonizerTests) { - - struct TMockExecutorPoolParams { - i16 DefaultFullThreadCount = 4; - i16 MinFullThreadCount = 4; - i16 MaxFullThreadCount = 8; - float DefaultThreadCount = 4.0f; - float MinThreadCount = 4.0f; - float MaxThreadCount = 8.0f; - i16 Priority = 0; - TString Name = "MockPool"; - ui32 PoolId = 0; - - TString ToString() const { - return TStringBuilder() << "PoolId: " << PoolId << ", Name: " << Name << ", DefaultFullThreadCount: " << DefaultFullThreadCount << ", MinFullThreadCount: " << MinFullThreadCount << ", MaxFullThreadCount: " << MaxFullThreadCount << ", DefaultThreadCount: " << DefaultThreadCount << ", MinThreadCount: " << MinThreadCount << ", MaxThreadCount: " << MaxThreadCount << ", Priority: " << Priority; - } - }; - - struct TCpuConsumptionModel { - TCpuConsumption value; - TCpuConsumptionModel() : value() {} - TCpuConsumptionModel(const TCpuConsumption& other) : value(other) {} - operator TCpuConsumption() const { - return value; - } - void Increase(const TCpuConsumption& other) { - value.ElapsedUs += other.ElapsedUs; - value.CpuUs += other.CpuUs; - value.NotEnoughCpuExecutions += other.NotEnoughCpuExecutions; - } - }; - - class TMockExecutorPool : public IExecutorPool { - public: - TMockExecutorPool(const TMockExecutorPoolParams& params = TMockExecutorPoolParams()) - : IExecutorPool(params.PoolId) - , Params(params) - , ThreadCount(params.DefaultFullThreadCount) - , ThreadCpuConsumptions(params.MaxFullThreadCount, TCpuConsumption{0.0, 0.0}) - { - - } - - TMockExecutorPoolParams Params; - i16 ThreadCount = 0; - std::vector ThreadCpuConsumptions; - std::vector SharedThreads; - - i16 GetDefaultFullThreadCount() const override { return Params.DefaultFullThreadCount; } - i16 GetMinFullThreadCount() const override { return Params.MinFullThreadCount; } - i16 GetMaxFullThreadCount() const override { return Params.MaxFullThreadCount; } - void SetFullThreadCount(i16 count) override { - HARMONIZER_DEBUG_PRINT(Params.PoolId, Params.Name, "set full thread count", count); - ThreadCount = 
Max(Params.MinFullThreadCount, Min(Params.MaxFullThreadCount, count)); - } - i16 GetFullThreadCount() const override { return ThreadCount; } - float GetDefaultThreadCount() const override { return Params.DefaultThreadCount; } - float GetMinThreadCount() const override { return Params.MinThreadCount; } - float GetMaxThreadCount() const override { return Params.MaxThreadCount; } - i16 GetPriority() const override { return Params.Priority; } - TString GetName() const override { return Params.Name; } - - // Дополнительные методы из IExecutorPool - void Prepare(TActorSystem* /*actorSystem*/, NSchedulerQueue::TReader** /*scheduleReaders*/, ui32* /*scheduleSz*/) override {} - void Start() override {} - void PrepareStop() override {} - void Shutdown() override {} - bool Cleanup() override { return true; } - - TMailbox* GetReadyActivation(TWorkerContext& /*wctx*/, ui64 /*revolvingCounter*/) override { return nullptr; } - TMailbox* ResolveMailbox(ui32 /*hint*/) override { return nullptr; } - - void Schedule(TInstant /*deadline*/, TAutoPtr /*ev*/, ISchedulerCookie* /*cookie*/, TWorkerId /*workerId*/) override {} - void Schedule(TMonotonic /*deadline*/, TAutoPtr /*ev*/, ISchedulerCookie* /*cookie*/, TWorkerId /*workerId*/) override {} - void Schedule(TDuration /*delta*/, TAutoPtr /*ev*/, ISchedulerCookie* /*cookie*/, TWorkerId /*workerId*/) override {} - - bool Send(TAutoPtr& /*ev*/) override { return true; } - bool SpecificSend(TAutoPtr& /*ev*/) override { return true; } - void ScheduleActivation(TMailbox* /*activation*/) override {} - void SpecificScheduleActivation(TMailbox* /*activation*/) override {} - void ScheduleActivationEx(TMailbox* /*activation*/, ui64 /*revolvingCounter*/) override {} - TActorId Register(IActor* /*actor*/, TMailboxType::EType /*mailboxType*/, ui64 /*revolvingCounter*/, const TActorId& /*parentId*/) override { return TActorId(); } - TActorId Register(IActor* /*actor*/, TMailboxCache& /*cache*/, ui64 /*revolvingCounter*/, const TActorId& /*parentId*/) override { return TActorId(); } - TActorId Register(IActor* /*actor*/, TMailbox* /*mailbox*/, const TActorId& /*parentId*/) override { return TActorId(); } - - TAffinity* Affinity() const override { return nullptr; } - - ui32 GetThreads() const override { return static_cast(ThreadCount); } - float GetThreadCount() const override { return static_cast(ThreadCount); } - - TSharedExecutorThreadCtx* ReleaseSharedThread() override { - UNIT_ASSERT(!SharedThreads.empty()); - TSharedExecutorThreadCtx* thread = SharedThreads.back(); - SharedThreads.pop_back(); - return thread; - } - void AddSharedThread(TSharedExecutorThreadCtx* thread) override { - UNIT_ASSERT(SharedThreads.size() < 2); - SharedThreads.push_back(thread); - } - - void IncreaseThreadCpuConsumption(TCpuConsumption consumption, i16 start = 0, i16 count = -1) { - if (count == -1) { - count = Params.MaxFullThreadCount - start; - } - for (i16 i = start; i < start + count; ++i) { - ThreadCpuConsumptions[i].Increase(consumption); - } - } - - void SetThreadCpuConsumption(TCpuConsumption consumption, i16 start = 0, i16 count = -1) { - if (count == -1) { - count = Params.MaxFullThreadCount - start; - } - for (i16 i = start; i < start + count; ++i) { - ThreadCpuConsumptions[i] = consumption; - } - } - - TCpuConsumption GetThreadCpuConsumption(i16 threadIdx) override { - UNIT_ASSERT_GE(threadIdx, 0); - UNIT_ASSERT_LE(static_cast(threadIdx), ThreadCpuConsumptions.size()); - return ThreadCpuConsumptions[threadIdx]; - } - }; - - class TMockSharedExecutorPool : public ISharedExecutorPool 
{ - std::unique_ptr OriginalPool; - std::vector> ThreadCpuConsumptions; - - static std::vector GetPoolIds(const std::vector& pools) { - std::vector poolIds; - for (size_t i = 0; i < pools.size(); ++i) { - poolIds.push_back(static_cast(i)); - } - return poolIds; - } - - public: - TMockSharedExecutorPool(const TSharedExecutorPoolConfig& config, i16 poolCount, std::vector pools) - : OriginalPool(CreateSharedExecutorPool(config, poolCount, GetPoolIds(pools))) - , ThreadCpuConsumptions(poolCount, std::vector(config.Threads, TCpuConsumption{0.0, 0.0})) - { - OriginalPool->Init(pools, false); - } - - void Prepare(TActorSystem* /*actorSystem*/, NSchedulerQueue::TReader** /*scheduleReaders*/, ui32* /*scheduleSz*/) override {} - void Start() override {} - void PrepareStop() override {} - void Shutdown() override {} - bool Cleanup() override { return true; } - - void GetSharedStatsForHarmonizer(i16 pool, std::vector& statsCopy) override { - OriginalPool->GetSharedStatsForHarmonizer(pool, statsCopy); - } - - void GetSharedStats(i16 pool, std::vector& statsCopy) override { - OriginalPool->GetSharedStats(pool, statsCopy); - } - - void Init(const std::vector& pools, bool /*isShared*/) override { - OriginalPool->Init(pools, false); - } - - TSharedExecutorThreadCtx* GetSharedThread(i16 poolId) override { - return OriginalPool->GetSharedThread(poolId); - } - - i16 ReturnOwnHalfThread(i16 pool) override { - return OriginalPool->ReturnOwnHalfThread(pool); - } - - i16 ReturnBorrowedHalfThread(i16 pool) override { - return OriginalPool->ReturnBorrowedHalfThread(pool); - } - - void GiveHalfThread(i16 from, i16 to) override { - OriginalPool->GiveHalfThread(from, to); - } - - i16 GetSharedThreadCount() const override { - return OriginalPool->GetSharedThreadCount(); - } - - TSharedPoolState GetState() const override { - return OriginalPool->GetState(); - } - - TCpuConsumption GetThreadCpuConsumption(i16 poolId, i16 threadIdx) override { - return ThreadCpuConsumptions[poolId][threadIdx]; - } - - std::vector GetThreadsCpuConsumption(i16 poolId) override { - std::vector poolConsumptions(ThreadCpuConsumptions[poolId].size()); - for (size_t i = 0; i < poolConsumptions.size(); ++i) { - poolConsumptions[i] = ThreadCpuConsumptions[poolId][i]; - } - return poolConsumptions; - } - - void IncreaseThreadCpuConsumption(ui32 threadIdx, ui32 poolId, TCpuConsumption consumption) { - ThreadCpuConsumptions[poolId][threadIdx].Increase(consumption); - } - - void SetThreadCpuConsumption(ui32 threadIdx, ui32 poolId, TCpuConsumption consumption) { - ThreadCpuConsumptions[poolId][threadIdx] = consumption; - } - }; - - Y_UNIT_TEST(TestHarmonizerCreation) { - ui64 currentTs = 1000000; - std::unique_ptr harmonizer(MakeHarmonizer(currentTs)); - UNIT_ASSERT(harmonizer != nullptr); - } - - Y_UNIT_TEST(TestAddPool) { - ui64 currentTs = 1000000; - std::unique_ptr harmonizer(MakeHarmonizer(currentTs)); - auto mockPool = std::make_unique(); - harmonizer->AddPool(mockPool.get()); - - auto stats = harmonizer->GetPoolStats(0); - UNIT_ASSERT_VALUES_EQUAL(stats.PotentialMaxThreadCount, 8); - UNIT_ASSERT_VALUES_EQUAL(stats.IncreasingThreadsByNeedyState, 0); - UNIT_ASSERT_VALUES_EQUAL(stats.DecreasingThreadsByStarvedState, 0); - } - - Y_UNIT_TEST(TestHarmonize) { - ui64 currentTs = 1000000; - auto harmonizer = MakeHarmonizer(currentTs); - auto mockPool = new TMockExecutorPool(); - harmonizer->AddPool(mockPool); - - harmonizer->Harmonize(currentTs + 1000000); // 1 second later - - auto stats = harmonizer->GetPoolStats(0); - Y_UNUSED(stats); - 
UNIT_ASSERT_VALUES_EQUAL(mockPool->ThreadCount, 4); // Should start with default - - delete harmonizer; - delete mockPool; - } - - Y_UNIT_TEST(TestToNeedyNextToHoggish) { - ui64 currentTs = 1000000; - auto harmonizer = MakeHarmonizer(currentTs); - TMockExecutorPoolParams params; - std::vector> mockPools; - mockPools.emplace_back(new TMockExecutorPool(params)); - params.PoolId = 1; - mockPools.emplace_back(new TMockExecutorPool(params)); - for (auto& pool : mockPools) { - harmonizer->AddPool(pool.get()); - pool->SetThreadCpuConsumption(TCpuConsumption{0.0, 0.0}, 0, params.MaxFullThreadCount); - } - - TCpuConsumptionModel cpuConsumptionModel; - - currentTs += Us2Ts(1'000'000); - harmonizer->Harmonize(currentTs); - mockPools[0]->SetThreadCpuConsumption({59'000'000.0, 59'000'000.0}, 0, params.DefaultFullThreadCount); - - currentTs += Us2Ts(59'000'000); - harmonizer->Harmonize(currentTs); - - auto stats = harmonizer->GetPoolStats(0); - - CHECK_CHANGING_THREADS(stats, 1, 0, 0, 0, 0); - CHECK_IS_NEEDY(stats); - UNIT_ASSERT_VALUES_EQUAL(mockPools[0]->ThreadCount, 5); - UNIT_ASSERT_VALUES_EQUAL(mockPools[1]->ThreadCount, 4); - - currentTs += Us2Ts(60'000'000); - mockPools[0]->SetThreadCpuConsumption({0.0, 0.0}, 0, params.DefaultFullThreadCount); - harmonizer->Harmonize(currentTs); - - stats = harmonizer->GetPoolStats(0); - - CHECK_CHANGING_THREADS(stats, 1, 0, 1, 0, 0); - CHECK_IS_HOGGISH(stats); - UNIT_ASSERT_VALUES_EQUAL(mockPools[0]->ThreadCount, 4); - UNIT_ASSERT_VALUES_EQUAL(mockPools[1]->ThreadCount, 4); - } - - Y_UNIT_TEST(TestToNeedyNextToStarved) { - ui64 currentTs = 1000000; - auto harmonizer = MakeHarmonizer(currentTs); - TMockExecutorPoolParams params; - std::vector> mockPools; - mockPools.emplace_back(new TMockExecutorPool(params)); - params.PoolId = 1; - mockPools.emplace_back(new TMockExecutorPool(params)); - for (auto& pool : mockPools) { - harmonizer->AddPool(pool.get()); - pool->SetThreadCpuConsumption(TCpuConsumption{0.0, 0.0}, 0, params.MaxFullThreadCount); - } - - TCpuConsumptionModel cpuConsumptionModel; - - currentTs += Us2Ts(1'000'000); - harmonizer->Harmonize(currentTs); - mockPools[0]->IncreaseThreadCpuConsumption({59'000'000.0, 59'000'000.0}, 0, params.DefaultFullThreadCount); - - currentTs += Us2Ts(59'000'000); - harmonizer->Harmonize(currentTs); - - auto stats = harmonizer->GetPoolStats(0); - - CHECK_CHANGING_THREADS(stats, 1, 0, 0, 0, 0); - CHECK_IS_NEEDY(stats); - UNIT_ASSERT_VALUES_EQUAL(mockPools[0]->ThreadCount, 5); - UNIT_ASSERT_VALUES_EQUAL(mockPools[1]->ThreadCount, 4); - - currentTs += Us2Ts(60'000'000); - mockPools[0]->IncreaseThreadCpuConsumption({40'000'000.0, 60'000'000.0}, 0, 5); - mockPools[1]->IncreaseThreadCpuConsumption({60'000'000.0, 60'000'000.0}, 0, 4); - harmonizer->Harmonize(currentTs); - - stats = harmonizer->GetPoolStats(0); - - CHECK_CHANGING_THREADS(stats, 1, 0, 0, 1, 0); - CHECK_IS_STARVED(stats); - UNIT_ASSERT_VALUES_EQUAL(mockPools[0]->ThreadCount, 4); - UNIT_ASSERT_VALUES_EQUAL(mockPools[1]->ThreadCount, 4); - } - - Y_UNIT_TEST(TestExchangeThreads) { - ui64 currentTs = 1000000; - auto harmonizer = MakeHarmonizer(currentTs); - TMockExecutorPoolParams params { - .DefaultFullThreadCount = 1, - .MinFullThreadCount = 1, - .MaxFullThreadCount = 2, - .DefaultThreadCount = 1.0f, - .MinThreadCount = 1.0f, - .MaxThreadCount = 2.0f, - }; - std::vector> mockPools; - mockPools.emplace_back(new TMockExecutorPool(params)); - params.PoolId = 1; - mockPools.emplace_back(new TMockExecutorPool(params)); - params.PoolId = 2; - mockPools.emplace_back(new 
TMockExecutorPool(params));
- for (auto& pool : mockPools) {
- harmonizer->AddPool(pool.get());
- pool->SetThreadCpuConsumption(TCpuConsumption{0.0, 0.0}, 0, params.MaxFullThreadCount);
- }
-
- currentTs += Us2Ts(1'000'000);
- harmonizer->Harmonize(currentTs);
-
- currentTs += Us2Ts(60'000'000);
- mockPools[0]->IncreaseThreadCpuConsumption({0.0, 0.0}, 0, 1);
- mockPools[1]->IncreaseThreadCpuConsumption({60'000'000.0, 60'000'000.0}, 0, 1);
- mockPools[2]->IncreaseThreadCpuConsumption({60'000'000.0, 60'000'000.0}, 0, 1);
- harmonizer->Harmonize(currentTs);
-
- auto stats0 = harmonizer->GetPoolStats(0);
- auto stats1 = harmonizer->GetPoolStats(1);
- auto stats2 = harmonizer->GetPoolStats(2);
-
- CHECK_CHANGING_THREADS(stats0, 0, 0, 0, 0, 0);
- CHECK_CHANGING_THREADS(stats1, 1, 0, 0, 0, 0);
- CHECK_CHANGING_THREADS(stats2, 0, 0, 0, 0, 0);
- CHECK_IS_HOGGISH(stats0);
- CHECK_IS_NEEDY(stats1);
- CHECK_IS_NEEDY(stats2);
- UNIT_ASSERT_VALUES_EQUAL(mockPools[0]->ThreadCount, 1);
- UNIT_ASSERT_VALUES_EQUAL(mockPools[1]->ThreadCount, 2);
- UNIT_ASSERT_VALUES_EQUAL(mockPools[2]->ThreadCount, 1);
-
- currentTs += Us2Ts(60'000'000);
- mockPools[0]->IncreaseThreadCpuConsumption({60'000'000.0, 60'000'000.0}, 0, 1);
- mockPools[1]->IncreaseThreadCpuConsumption({60'000'000.0, 60'000'000.0}, 0, 2);
- mockPools[2]->IncreaseThreadCpuConsumption({0.0, 0.0}, 0, 1);
- harmonizer->Harmonize(currentTs);
-
- stats0 = harmonizer->GetPoolStats(0);
- stats1 = harmonizer->GetPoolStats(1);
- stats2 = harmonizer->GetPoolStats(2);
-
- CHECK_CHANGING_THREADS(stats0, 0, 1, 0, 0, 0);
- CHECK_CHANGING_THREADS(stats1, 1, 0, 0, 0, 1);
- CHECK_CHANGING_THREADS(stats2, 0, 0, 0, 0, 0);
- CHECK_IS_NEEDY(stats0);
- CHECK_IS_NEEDY(stats1);
- CHECK_IS_HOGGISH(stats2);
- UNIT_ASSERT_VALUES_EQUAL(mockPools[0]->ThreadCount, 2);
- UNIT_ASSERT_VALUES_EQUAL(mockPools[1]->ThreadCount, 1);
- UNIT_ASSERT_VALUES_EQUAL(mockPools[2]->ThreadCount, 1);
- }
-
- Y_UNIT_TEST(TestThreadCounts) {
- ui64 currentTs = 1000000;
- std::vector params {
- {
- .DefaultFullThreadCount = 5,
- .MinFullThreadCount = 5,
- .MaxFullThreadCount = 15,
- .DefaultThreadCount = 5.0f,
- .MinThreadCount = 5.0f,
- .MaxThreadCount = 15.0f,
- .PoolId = 0,
- },
- {
- .DefaultFullThreadCount = 5,
- .MinFullThreadCount = 5,
- .MaxFullThreadCount = 15,
- .DefaultThreadCount = 5.0f,
- .MinThreadCount = 5.0f,
- .MaxThreadCount = 15.0f,
- .PoolId = 1,
- },
- {
- .DefaultFullThreadCount = 5,
- .MinFullThreadCount = 5,
- .MaxFullThreadCount = 15,
- .DefaultThreadCount = 5.0f,
- .MinThreadCount = 5.0f,
- .MaxThreadCount = 15.0f,
- .PoolId = 2,
- },
- };
- auto harmonizer = MakeHarmonizer(currentTs);
- std::vector> mockPools;
- i16 budget = 0;
- for (auto& param : params) {
- mockPools.emplace_back(new TMockExecutorPool(param));
- HARMONIZER_DEBUG_PRINT("created pool", mockPools.back()->Params.ToString());
- budget += param.DefaultFullThreadCount;
- }
- for (ui32 poolIdx = 0; poolIdx < params.size(); ++poolIdx) {
- auto &pool = mockPools[poolIdx];
- harmonizer->AddPool(pool.get());
- pool->SetThreadCpuConsumption(TCpuConsumption{0.0, 0.0}, 0, params[poolIdx].MaxFullThreadCount);
- }
- currentTs += Us2Ts(1'000'000);
- harmonizer->Harmonize(currentTs);
-
- for (i16 i = 0; i < params[0].MaxFullThreadCount; ++i) {
- for (i16 ii = 0; ii < params[1].MaxFullThreadCount; ++ii) {
- for (i16 iii = 0; iii < params[2].MaxFullThreadCount; ++iii) {
- if (i + ii + iii > budget) {
- continue;
- }
- ui32 localBudget = budget - (i + ii + iii);
- HARMONIZER_DEBUG_PRINT("first pool", i, "second pool", ii, "third pool", iii, "budget", budget, "local budget", localBudget);
- currentTs += Us2Ts(60'000'000);
- mockPools[0]->SetFullThreadCount(i);
- mockPools[1]->SetFullThreadCount(ii);
- mockPools[2]->SetFullThreadCount(iii);
- mockPools[0]->IncreaseThreadCpuConsumption({60'000'000.0, 60'000'000.0}, 0, std::min(i, mockPools[0]->ThreadCount));
- mockPools[1]->IncreaseThreadCpuConsumption({60'000'000.0, 60'000'000.0}, 0, std::min(ii, mockPools[1]->ThreadCount));
- mockPools[2]->IncreaseThreadCpuConsumption({60'000'000.0, 60'000'000.0}, 0, std::min(iii, mockPools[2]->ThreadCount));
- harmonizer->Harmonize(currentTs);
- std::vector stats;
- for (auto& pool : params) {
- stats.emplace_back(harmonizer->GetPoolStats(pool.PoolId));
- }
- for (ui32 poolIdx = 0; poolIdx < params.size(); ++poolIdx) {
- UNIT_ASSERT_VALUES_EQUAL(stats[poolIdx].PotentialMaxThreadCount, std::min(mockPools[poolIdx]->ThreadCount + localBudget, params[poolIdx].MaxFullThreadCount));
- }
- }
- }
- }
- }
-
- Y_UNIT_TEST(TestSharedHalfThreads) {
- ui64 currentTs = 1000000;
- std::unique_ptr harmonizer(MakeHarmonizer(currentTs));
- TMockExecutorPoolParams params {
- .DefaultFullThreadCount = 2,
- .MinFullThreadCount = 1,
- .MaxFullThreadCount = 3,
- .DefaultThreadCount = 2.0f,
- .MinThreadCount = 1.0f,
- .MaxThreadCount = 3.0f,
- };
- std::vector> mockPools;
- std::vector pools;
- mockPools.emplace_back(new TMockExecutorPool(params));
- pools.push_back(mockPools.back().get());
- params.PoolId = 1;
- mockPools.emplace_back(new TMockExecutorPool(params));
- pools.push_back(mockPools.back().get());
- params.PoolId = 2;
- mockPools.emplace_back(new TMockExecutorPool(params));
- pools.push_back(mockPools.back().get());
-
-
- TSharedExecutorPoolConfig sharedConfig;
- sharedConfig.Threads = 3;
- std::unique_ptr sharedPool(new TMockSharedExecutorPool(sharedConfig, 3, pools));
-
- for (auto& pool : mockPools) {
- harmonizer->AddPool(pool.get());
- }
- harmonizer->SetSharedPool(sharedPool.get());
-
- for (ui32 i = 0; i < mockPools.size(); ++i) {
- mockPools[i]->AddSharedThread(sharedPool->GetSharedThread(i));
- }
-
- currentTs += Us2Ts(1'000'000);
- harmonizer->Harmonize(currentTs);
-
- currentTs += Us2Ts(60'000'000);
- mockPools[0]->IncreaseThreadCpuConsumption({60'000'000.0, 60'000'000.0}, 0, 2);
- mockPools[1]->IncreaseThreadCpuConsumption({0.0, 0.0}, 0, 2);
- mockPools[2]->IncreaseThreadCpuConsumption({60'000'000.0, 60'000'000.0}, 0, 2);
-
- harmonizer->Harmonize(currentTs);
-
- auto stats0 = harmonizer->GetPoolStats(0);
- auto stats1 = harmonizer->GetPoolStats(1);
- auto stats2 = harmonizer->GetPoolStats(2);
- auto sharedState = sharedPool->GetState();
-
- CHECK_CHANGING_HALF_THREADS(stats0, 1, 0, 0, 0, 0, 0);
- CHECK_CHANGING_HALF_THREADS(stats1, 0, 0, 0, 1, 0, 0);
- CHECK_IS_NEEDY(stats0);
- CHECK_IS_HOGGISH(stats1);
- CHECK_IS_NEEDY(stats2);
- UNIT_ASSERT_VALUES_EQUAL(sharedState.BorrowedThreadByPool[0], 1);
- UNIT_ASSERT_VALUES_EQUAL(sharedState.PoolByBorrowedThread[1], 0);
-
- currentTs += Us2Ts(60'000'000);
-
- harmonizer->Harmonize(currentTs);
-
- stats0 = harmonizer->GetPoolStats(0);
- stats1 = harmonizer->GetPoolStats(1);
- stats2 = harmonizer->GetPoolStats(2);
- sharedState = sharedPool->GetState();
-
- CHECK_CHANGING_HALF_THREADS(stats0, 1, 0, 1, 0, 0, 0);
- CHECK_CHANGING_HALF_THREADS(stats1, 0, 0, 0, 1, 0, 1);
- CHECK_IS_HOGGISH(stats0);
- CHECK_IS_HOGGISH(stats1);
- CHECK_IS_HOGGISH(stats2);
- UNIT_ASSERT_VALUES_EQUAL(sharedState.BorrowedThreadByPool[0], -1);
- UNIT_ASSERT_VALUES_EQUAL(sharedState.PoolByBorrowedThread[1], -1);
- }
-
- Y_UNIT_TEST(TestSharedHalfThreadsStarved) {
- ui64 currentTs = 1000000;
- std::unique_ptr harmonizer(MakeHarmonizer(currentTs));
- TMockExecutorPoolParams params {
- .DefaultFullThreadCount = 2,
- .MinFullThreadCount = 1,
- .MaxFullThreadCount = 3,
- .DefaultThreadCount = 2.0f,
- .MinThreadCount = 1.0f,
- .MaxThreadCount = 3.0f,
- };
- std::vector> mockPools;
- std::vector pools;
- mockPools.emplace_back(new TMockExecutorPool(params));
- pools.push_back(mockPools.back().get());
- params.PoolId = 1;
- mockPools.emplace_back(new TMockExecutorPool(params));
- pools.push_back(mockPools.back().get());
-
- TSharedExecutorPoolConfig sharedConfig;
- sharedConfig.Threads = 2;
- std::unique_ptr sharedPool(new TMockSharedExecutorPool(sharedConfig, 2, pools));
-
- for (auto& pool : mockPools) {
- harmonizer->AddPool(pool.get());
- }
- harmonizer->SetSharedPool(sharedPool.get());
-
- currentTs += Us2Ts(1'000'000);
- harmonizer->Harmonize(currentTs);
-
- currentTs += Us2Ts(60'000'000);
- mockPools[0]->IncreaseThreadCpuConsumption({60'000'000.0, 60'000'000.0}, 0, 2);
- mockPools[1]->IncreaseThreadCpuConsumption({0.0, 0.0}, 0, 2);
- harmonizer->Harmonize(currentTs);
-
- auto stats0 = harmonizer->GetPoolStats(0);
- auto stats1 = harmonizer->GetPoolStats(1);
- auto sharedState = sharedPool->GetState();
- CHECK_CHANGING_HALF_THREADS(stats0, 1, 0, 0, 0, 0, 0);
- CHECK_CHANGING_HALF_THREADS(stats1, 0, 0, 0, 1, 0, 0);
- CHECK_IS_NEEDY(stats0);
- CHECK_IS_HOGGISH(stats1);
- UNIT_ASSERT_VALUES_EQUAL_C(sharedState.BorrowedThreadByPool[0], 1, sharedState.ToString());
- UNIT_ASSERT_VALUES_EQUAL_C(sharedState.PoolByBorrowedThread[1], 0, sharedState.ToString());
-
- currentTs += Us2Ts(60'000'000);
- mockPools[0]->IncreaseThreadCpuConsumption({30'000'000.0, 60'000'000.0}, 0, 2);
- mockPools[1]->IncreaseThreadCpuConsumption({0.0, 0.0}, 0, 2);
- harmonizer->Harmonize(currentTs);
-
- stats0 = harmonizer->GetPoolStats(0);
- stats1 = harmonizer->GetPoolStats(1);
- sharedState = sharedPool->GetState();
- UNIT_ASSERT_VALUES_EQUAL_C(sharedState.BorrowedThreadByPool[0], -1, sharedState.ToString());
- UNIT_ASSERT_VALUES_EQUAL_C(sharedState.PoolByBorrowedThread[1], -1, sharedState.ToString());
- CHECK_CHANGING_HALF_THREADS(stats0, 1, 1, 0, 0, 0, 0);
- CHECK_CHANGING_HALF_THREADS(stats1, 0, 0, 0, 1, 1, 0);
- CHECK_IS_STARVED(stats0);
- CHECK_IS_HOGGISH(stats1);
-
- currentTs += Us2Ts(60'000'000);
- mockPools[0]->IncreaseThreadCpuConsumption({60'000'000.0, 60'000'000.0}, 0, 2);
- mockPools[1]->IncreaseThreadCpuConsumption({0.0, 0.0}, 0, 2);
-
- harmonizer->Harmonize(currentTs);
-
- stats0 = harmonizer->GetPoolStats(0);
- stats1 = harmonizer->GetPoolStats(1);
-
- CHECK_CHANGING_HALF_THREADS(stats0, 2, 1, 0, 0, 0, 0);
- CHECK_CHANGING_HALF_THREADS(stats1, 0, 0, 0, 2, 1, 0);
- CHECK_IS_NEEDY(stats0);
- CHECK_IS_HOGGISH(stats1);
- }
-
-}
diff --git a/ydb/library/actors/core/harmonizer/ut/ya.make b/ydb/library/actors/core/harmonizer/ut/ya.make
deleted file mode 100644
index 5838d1bc2181..000000000000
--- a/ydb/library/actors/core/harmonizer/ut/ya.make
+++ /dev/null
@@ -1,22 +0,0 @@
-UNITTEST_FOR(ydb/library/actors/core/harmonizer)
-
-FORK_SUBTESTS()
-IF (SANITIZER_TYPE)
- SIZE(MEDIUM)
- TIMEOUT(600)
-ELSE()
- SIZE(SMALL)
- TIMEOUT(60)
-ENDIF()
-
-
-PEERDIR(
- ydb/library/actors/interconnect
- ydb/library/actors/testlib
-)
-
-SRCS(
- harmonizer_ut.cpp
-)
-
-END()
diff --git a/ydb/library/actors/core/harmonizer/waiting_stats.cpp b/ydb/library/actors/core/harmonizer/waiting_stats.cpp
deleted file mode 100644
index f855055034a3..000000000000
--- a/ydb/library/actors/core/harmonizer/waiting_stats.cpp
+++ /dev/null
@@ -1,48 +0,0 @@
-#include "waiting_stats.h"
-
-#include "pool.h"
-#include
-#include
-
-namespace NActors {
-
-LWTRACE_USING(ACTORLIB_PROVIDER);
-
-void TWaitingInfo::Pull(const std::vector> &pools) {
- WakingUpTotalTime = 0;
- WakingUpCount = 0;
- AwakingTotalTime = 0;
- AwakingCount = 0;
-
- for (size_t poolIdx = 0; poolIdx < pools.size(); ++poolIdx) {
- TPoolInfo& pool = *pools[poolIdx];
- if (pool.WaitingStats) {
- WakingUpTotalTime += pool.WaitingStats->WakingUpTotalTime;
- WakingUpCount += pool.WaitingStats->WakingUpCount;
- AwakingTotalTime += pool.WaitingStats->AwakingTotalTime;
- AwakingCount += pool.WaitingStats->AwakingCount;
- }
- }
-
- constexpr ui64 knownAvgWakingUpTime = TWaitingStatsConstants::KnownAvgWakingUpTime;
- constexpr ui64 knownAvgAwakeningUpTime = TWaitingStatsConstants::KnownAvgAwakeningTime;
-
- ui64 realAvgWakingUpTime = (WakingUpCount ? WakingUpTotalTime / WakingUpCount : knownAvgWakingUpTime);
- ui64 avgWakingUpTime = realAvgWakingUpTime;
- if (avgWakingUpTime > 2 * knownAvgWakingUpTime || !realAvgWakingUpTime) {
- avgWakingUpTime = knownAvgWakingUpTime;
- }
- AvgWakingUpTimeUs.store(Ts2Us(avgWakingUpTime), std::memory_order_relaxed);
-
- ui64 realAvgAwakeningTime = (AwakingCount ? AwakingTotalTime / AwakingCount : knownAvgAwakeningUpTime);
- ui64 avgAwakeningTime = realAvgAwakeningTime;
- if (avgAwakeningTime > 2 * knownAvgAwakeningUpTime || !realAvgAwakeningTime) {
- avgAwakeningTime = knownAvgAwakeningUpTime;
- }
- AvgAwakeningTimeUs.store(Ts2Us(avgAwakeningTime), std::memory_order_relaxed);
-
- ui64 avgWakingUpConsumption = avgWakingUpTime + avgAwakeningTime;
- LWPROBE(WakingUpConsumption, Ts2Us(avgWakingUpTime), Ts2Us(avgWakingUpTime), Ts2Us(avgAwakeningTime), Ts2Us(realAvgAwakeningTime), Ts2Us(avgWakingUpConsumption));
-}
-
-} // namespace NActors
diff --git a/ydb/library/actors/core/harmonizer/waiting_stats.h b/ydb/library/actors/core/harmonizer/waiting_stats.h
deleted file mode 100644
index 76f661bb37b6..000000000000
--- a/ydb/library/actors/core/harmonizer/waiting_stats.h
+++ /dev/null
@@ -1,21 +0,0 @@
-#pragma once
-
-#include "defs.h"
-
-namespace NActors {
-
-struct TPoolInfo;
-
-struct TWaitingInfo {
- ui64 WakingUpTotalTime = 0;
- ui64 WakingUpCount = 0;
- ui64 AwakingTotalTime = 0;
- ui64 AwakingCount = 0;
- std::atomic AvgWakingUpTimeUs = 0;
- std::atomic AvgAwakeningTimeUs = 0;
-
- void Pull(const std::vector> &pools);
-
-}; // struct TWaitingInfo
-
-} // namespace NActors
diff --git a/ydb/library/actors/core/harmonizer/ya.make b/ydb/library/actors/core/harmonizer/ya.make
deleted file mode 100644
index 9797c76c4af0..000000000000
--- a/ydb/library/actors/core/harmonizer/ya.make
+++ /dev/null
@@ -1,44 +0,0 @@
-LIBRARY()
-
-NO_WSHADOW()
-
-IF (PROFILE_MEMORY_ALLOCATIONS)
- CFLAGS(-DPROFILE_MEMORY_ALLOCATIONS)
-ENDIF()
-
-IF (ALLOCATOR == "B" OR ALLOCATOR == "BS" OR ALLOCATOR == "C")
- CXXFLAGS(-DBALLOC)
- PEERDIR(
- library/cpp/balloc/optional
- )
-ENDIF()
-
-SRCS(
- cpu_consumption.cpp
- pool.cpp
- shared_info.cpp
- waiting_stats.cpp
- harmonizer.cpp
-)
-
-PEERDIR(
- ydb/library/actors/util
- ydb/library/actors/protos
- ydb/library/services
- library/cpp/logger
- library/cpp/lwtrace
- library/cpp/monlib/dynamic_counters
- library/cpp/time_provider
-)
-
-IF (SANITIZER_TYPE == "thread")
- SUPPRESSIONS(
- tsan.supp
- )
-ENDIF()
-
-END()
-
-RECURSE_FOR_TESTS(
- ut
-)
diff --git a/ydb/library/actors/core/mon_stats.h b/ydb/library/actors/core/mon_stats.h
index 04a698245709..4bceffa8f373 100644
--- a/ydb/library/actors/core/mon_stats.h
+++ b/ydb/library/actors/core/mon_stats.h
@@ -63,10 +63,10 @@ namespace NActors {
 ui64 DecreasingThreadsByStarvedState = 0;
 ui64 DecreasingThreadsByHoggishState = 0;
 ui64 DecreasingThreadsByExchange = 0;
- i64 MaxCpuUs = 0;
- i64 MinCpuUs = 0;
- i64 MaxElapsedUs = 0;
- i64 MinElapsedUs = 0;
+ i64 MaxConsumedCpuUs = 0;
+ i64 MinConsumedCpuUs = 0;
+ i64 MaxBookedCpuUs = 0;
+ i64 MinBookedCpuUs = 0;
 double SpinningTimeUs = 0;
 double SpinThresholdUs = 0;
 i16 WrongWakenedThreadCount = 0;
diff --git a/ydb/library/actors/core/probes.h b/ydb/library/actors/core/probes.h
index ebf34299e27e..f9394f3273f6 100644
--- a/ydb/library/actors/core/probes.h
+++ b/ydb/library/actors/core/probes.h
@@ -174,11 +174,11 @@
 NAMES("poolId", "pool", "threacCount", "minThreadCount", "maxThreadCount", "defaultThreadCount")) \
 PROBE(HarmonizeCheckPool, GROUPS("Harmonizer"), \
 TYPES(ui32, TString, double, double, double, double, ui32, ui32, bool, bool, bool), \
- NAMES("poolId", "pool", "elapsed", "cpu", "lastSecondElapsed", "lastSecondCpu", "threadCount", "maxThreadCount", \
+ NAMES("poolId", "pool", "booked", "consumed", "lastSecondBooked", "lastSecondConsumed", "threadCount", "maxThreadCount", \
 "isStarved", "isNeedy", "isHoggish")) \
 PROBE(HarmonizeCheckPoolByThread, GROUPS("Harmonizer"), \
 TYPES(ui32, TString, i16, double, double, double, double), \
- NAMES("poolId", "pool", "threadIdx", "elapsed", "cpu", "lastSecondElapsed", "lastSecondCpu")) \
+ NAMES("poolId", "pool", "threadIdx", "booked", "consumed", "lastSecondBooked", "lastSecondConsumed")) \
 PROBE(WakingUpConsumption, GROUPS("Harmonizer"), \
 TYPES(double, double, double, double, double), \
 NAMES("avgWakingUpUs", "realAvgWakingUpUs", "avgAwakeningUs", "realAvgAwakeningUs", "total")) \
diff --git a/ydb/library/actors/core/ya.make b/ydb/library/actors/core/ya.make
index ca44ab7707b8..fbeb0e2b0d98 100644
--- a/ydb/library/actors/core/ya.make
+++ b/ydb/library/actors/core/ya.make
@@ -56,6 +56,8 @@ SRCS(
 executor_pool_shared.h
 executor_thread.cpp
 executor_thread.h
+ harmonizer.cpp
+ harmonizer.h
 hfunc.h
 interconnect.cpp
 interconnect.h
@@ -104,7 +106,6 @@ GENERATE_ENUM_SERIALIZATION(log_iface.h)
 PEERDIR(
 ydb/library/actors/actor_type
- ydb/library/actors/core/harmonizer
 ydb/library/actors/memory_log
 ydb/library/actors/prof
 ydb/library/actors/protos
@@ -128,10 +129,6 @@
 ENDIF()
 END()
-RECURSE(
- harmonizer
-)
-
 RECURSE_FOR_TESTS(
 ut
 ut_fat
diff --git a/ydb/library/actors/helpers/pool_stats_collector.h b/ydb/library/actors/helpers/pool_stats_collector.h
index f1852cb48519..a0790a55c8bf 100644
--- a/ydb/library/actors/helpers/pool_stats_collector.h
+++ b/ydb/library/actors/helpers/pool_stats_collector.h
@@ -189,10 +189,10 @@ class TStatsCollectingActor : public TActorBootstrapped {
 NMonitoring::TDynamicCounters::TCounterPtr DecreasingThreadsByHoggishState;
 NMonitoring::TDynamicCounters::TCounterPtr DecreasingThreadsByExchange;
 NMonitoring::TDynamicCounters::TCounterPtr NotEnoughCpuExecutions;
- NMonitoring::TDynamicCounters::TCounterPtr MaxCpuUs;
- NMonitoring::TDynamicCounters::TCounterPtr MinCpuUs;
- NMonitoring::TDynamicCounters::TCounterPtr MaxElapsedUs;
- NMonitoring::TDynamicCounters::TCounterPtr MinElapsedUs;
+ NMonitoring::TDynamicCounters::TCounterPtr MaxConsumedCpu;
+ NMonitoring::TDynamicCounters::TCounterPtr MinConsumedCpu;
+ NMonitoring::TDynamicCounters::TCounterPtr MaxBookedCpu;
+ NMonitoring::TDynamicCounters::TCounterPtr MinBookedCpu;
 NMonitoring::TDynamicCounters::TCounterPtr SpinningTimeUs;
 NMonitoring::TDynamicCounters::TCounterPtr SpinThresholdUs;
@@ -266,10 +266,10 @@ class TStatsCollectingActor : public TActorBootstrapped {
 DecreasingThreadsByHoggishState = PoolGroup->GetCounter("DecreasingThreadsByHoggishState", true);
 DecreasingThreadsByExchange = PoolGroup->GetCounter("DecreasingThreadsByExchange", true);
 NotEnoughCpuExecutions = PoolGroup->GetCounter("NotEnoughCpuExecutions", true);
- MaxCpuUs = PoolGroup->GetCounter("MaxCpuUs", false);
- MinCpuUs = PoolGroup->GetCounter("MinCpuUs", false);
- MaxElapsedUs = PoolGroup->GetCounter("MaxElapsedUs", false);
- MinElapsedUs = PoolGroup->GetCounter("MinElapsedUs", false);
+ MaxConsumedCpu = PoolGroup->GetCounter("MaxConsumedCpuByPool", false);
+ MinConsumedCpu = PoolGroup->GetCounter("MinConsumedCpuByPool", false);
+ MaxBookedCpu = PoolGroup->GetCounter("MaxBookedCpuByPool", false);
+ MinBookedCpu = PoolGroup->GetCounter("MinBookedCpuByPool", false);
 SpinningTimeUs = PoolGroup->GetCounter("SpinningTimeUs", true);
 SpinThresholdUs = PoolGroup->GetCounter("SpinThresholdUs", false);
@@ -384,10 +384,10 @@ class TStatsCollectingActor : public TActorBootstrapped {
 struct TActorSystemCounters {
 TIntrusivePtr Group;
- NMonitoring::TDynamicCounters::TCounterPtr MaxCpuUs;
- NMonitoring::TDynamicCounters::TCounterPtr MinCpuUs;
- NMonitoring::TDynamicCounters::TCounterPtr MaxElapsedUs;
- NMonitoring::TDynamicCounters::TCounterPtr MinElapsedUs;
+ NMonitoring::TDynamicCounters::TCounterPtr MaxConsumedCpu;
+ NMonitoring::TDynamicCounters::TCounterPtr MinConsumedCpu;
+ NMonitoring::TDynamicCounters::TCounterPtr MaxBookedCpu;
+ NMonitoring::TDynamicCounters::TCounterPtr MinBookedCpu;
 NMonitoring::TDynamicCounters::TCounterPtr AvgAwakeningTimeUs;
 NMonitoring::TDynamicCounters::TCounterPtr AvgWakingUpTimeUs;
@@ -396,20 +396,20 @@ class TStatsCollectingActor : public TActorBootstrapped {
 void Init(NMonitoring::TDynamicCounters* group) {
 Group = group;
- MaxCpuUs = Group->GetCounter("MaxCpuUs", false);
- MinCpuUs = Group->GetCounter("MinCpuUs", false);
- MaxElapsedUs = Group->GetCounter("MaxElapsedUs", false);
- MinElapsedUs = Group->GetCounter("MinElapsedUs", false);
+ MaxConsumedCpu = Group->GetCounter("MaxConsumedCpu", false);
+ MinConsumedCpu = Group->GetCounter("MinConsumedCpu", false);
+ MaxBookedCpu = Group->GetCounter("MaxBookedCpu", false);
+ MinBookedCpu = Group->GetCounter("MinBookedCpu", false);
 AvgAwakeningTimeUs = Group->GetCounter("AvgAwakeningTimeUs", false);
 AvgWakingUpTimeUs = Group->GetCounter("AvgWakingUpTimeUs", false);
 }
 void Set(const THarmonizerStats& harmonizerStats) {
 #ifdef ACTORSLIB_COLLECT_EXEC_STATS
- *MaxCpuUs = harmonizerStats.MaxCpuUs;
- *MinCpuUs = harmonizerStats.MinCpuUs;
- *MaxElapsedUs = harmonizerStats.MaxElapsedUs;
- *MinElapsedUs = harmonizerStats.MinElapsedUs;
+ *MaxConsumedCpu = harmonizerStats.MaxConsumedCpu;
+ *MinConsumedCpu = harmonizerStats.MinConsumedCpu;
+ *MaxBookedCpu = harmonizerStats.MaxBookedCpu;
+ *MinBookedCpu = harmonizerStats.MinBookedCpu;
 *AvgAwakeningTimeUs = harmonizerStats.AvgAwakeningTimeUs;
 *AvgWakingUpTimeUs = harmonizerStats.AvgWakingUpTimeUs;