Skip to content

Commit

Permalink
configure rdma target thread pool size; issue-2267 (#2268)
Browse files Browse the repository at this point in the history
Also clean up TRdmaTarget interface and unnest Host and Port fields in TRdmaTargetProto
  • Loading branch information
tpashkin authored Oct 19, 2024
1 parent f78cab3 commit cda11ff
Show file tree
Hide file tree
Showing 13 changed files with 180 additions and 121 deletions.
2 changes: 1 addition & 1 deletion cloud/blockstore/config/disk.proto
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ message TDiskAgentConfig
optional TSpdkEnvConfig SpdkEnvConfig = 19;

// RDMA target config.
optional TRdmaTarget RdmaTarget = 20;
optional TRdmaTarget RdmaTarget = 20; // deprecated

// Use flock to lock devices on agent initialization.
optional bool DeviceLockingEnabled = 21;
Expand Down
20 changes: 11 additions & 9 deletions cloud/blockstore/config/rdma.proto
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ message TRdmaServer
uint32 Backlog = 1;
uint32 QueueSize = 2;
uint32 MaxBufferSize = 3;
uint64 KeepAliveTimeout = 4; // in milliseconds
uint64 KeepAliveTimeout = 4; // in milliseconds
EWaitMode WaitMode = 5;
uint32 PollerThreads = 6;
uint32 MaxInflightBytes = 7; // per client
uint64 AdaptiveWaitSleepDelay = 8; // in microseconds
uint64 AdaptiveWaitSleepDuration = 9; // in microseconds
uint32 MaxInflightBytes = 7; // per client
uint64 AdaptiveWaitSleepDelay = 8; // in microseconds
uint64 AdaptiveWaitSleepDuration = 9; // in microseconds
bool AlignedDataEnabled = 10;
}

Expand All @@ -35,24 +35,26 @@ message TRdmaClient
uint32 MaxBufferSize = 2;
uint32 PollerThreads = 3;
EWaitMode WaitMode = 4;
uint64 AdaptiveWaitSleepDelay = 5; // in microseconds
uint64 AdaptiveWaitSleepDuration = 6; // in microseconds
uint64 AdaptiveWaitSleepDelay = 5; // in microseconds
uint64 AdaptiveWaitSleepDuration = 6; // in microseconds
bool AlignedDataEnabled = 7;
}

message TRdmaTarget
{
TRdmaEndpoint Endpoint = 1;
TRdmaServer Server = 2;
TRdmaServer Server = 2; // deprecated
uint32 WorkerThreads = 3;
}

message TRdmaConfig
{
bool ClientEnabled = 1;

TRdmaClient Client = 2;

bool ServerEnabled = 3;

TRdmaServer Server = 4;

bool DiskAgentTargetEnabled = 5;
TRdmaTarget DiskAgentTarget = 6;
}
15 changes: 13 additions & 2 deletions cloud/blockstore/libs/disk_agent/config_initializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -218,12 +218,23 @@ void TConfigInitializer::InitRdmaConfig()

if (Options->RdmaConfig) {
ParseProtoTextFromFileRobust(Options->RdmaConfig, rdmaConfig);

// inherit the target from DiskAgentConfig to smooth out the transition
if (DiskAgentConfig->DeprecatedHasRdmaTarget()) {
rdmaConfig.SetDiskAgentTargetEnabled(true);
}
} else {
// no rdma config file is given fallback to legacy config
if (DiskAgentConfig->DeprecatedHasRdmaTarget()) {
const auto& oldTarget = DiskAgentConfig->DeprecatedGetRdmaTarget();
auto* newTarget = rdmaConfig.MutableDiskAgentTarget();

rdmaConfig.SetServerEnabled(true);
const auto& rdmaTarget = DiskAgentConfig->DeprecatedGetRdmaTarget();
rdmaConfig.MutableServer()->CopyFrom(rdmaTarget.GetServer());
rdmaConfig.MutableServer()->CopyFrom(oldTarget.GetServer());

rdmaConfig.SetDiskAgentTargetEnabled(true);
newTarget->MutableEndpoint()->CopyFrom(oldTarget.GetEndpoint());
newTarget->SetWorkerThreads(oldTarget.GetWorkerThreads());
}
}

Expand Down
10 changes: 10 additions & 0 deletions cloud/blockstore/libs/rdma/iface/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,16 @@ class TRdmaConfig
{
return Config.GetServer();
}

bool GetDiskAgentTargetEnabled() const
{
return Config.GetDiskAgentTargetEnabled();
}

const auto& GetDiskAgentTarget() const
{
return Config.GetDiskAgentTarget();
}
};

} // namespace NCloud::NBlockStore::NRdma
28 changes: 16 additions & 12 deletions cloud/blockstore/libs/rdma_test/rdma_test_environment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,21 +39,25 @@ TRdmaTestEnvironment::TRdmaTestEnvironment(size_t deviceSize, ui32 poolSize)
"vol0",
0);

NProto::TRdmaEndpoint config;
config.SetHost(Host);
config.SetPort(Port);

TRdmaTargetConfig rdmaTargetConfig{
true,
TOldRequestCounters{
Counters->GetCounter("Delayed"),
Counters->GetCounter("Rejected"),
Counters->GetCounter("Already")},
poolSize};
NProto::TRdmaTarget target;
target.MutableEndpoint()->SetHost(Host);
target.MutableEndpoint()->SetPort(Port);
target.SetWorkerThreads(poolSize);

constexpr bool rejectLateRequests = true;

auto rdmaTargetConfig = std::make_shared<TRdmaTargetConfig>(
rejectLateRequests,
target);

TOldRequestCounters oldRequestCounters{
Counters->GetCounter("Delayed"),
Counters->GetCounter("Rejected"),
Counters->GetCounter("Already")};

RdmaTarget = CreateRdmaTarget(
std::move(config),
std::move(rdmaTargetConfig),
std::move(oldRequestCounters),
Logging,
Server,
std::move(deviceClient),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <cloud/blockstore/libs/kikimr/helpers.h>
#include <cloud/blockstore/libs/nvme/public.h>
#include <cloud/blockstore/libs/spdk/iface/env.h>
#include <cloud/blockstore/libs/rdma/iface/config.h>
#include <cloud/blockstore/libs/storage/api/disk_agent.h>
#include <cloud/blockstore/libs/storage/api/disk_registry.h>
#include <cloud/blockstore/libs/storage/api/disk_registry_proxy.h>
Expand Down
27 changes: 15 additions & 12 deletions cloud/blockstore/libs/storage/disk_agent/disk_agent_actor_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,32 +22,35 @@ using namespace NActors;

void TDiskAgentActor::InitAgent(const TActorContext& ctx)
{
Y_DEBUG_ABORT_UNLESS(
OldRequestCounters.Delayed && OldRequestCounters.Rejected &&
OldRequestCounters.Already);

TRdmaTargetConfigPtr rdmaTargetConfig = nullptr;
if (RdmaConfig && RdmaConfig->GetDiskAgentTargetEnabled()) {
rdmaTargetConfig = std::make_shared<TRdmaTargetConfig>(
Config->GetRejectLateRequestsAtDiskAgentEnabled(),
RdmaConfig->GetDiskAgentTarget());
}

State = std::make_unique<TDiskAgentState>(
Config,
AgentConfig,
RdmaConfig,
Spdk,
Allocator,
StorageProvider,
ProfileLog,
BlockDigestGenerator,
Logging,
RdmaServer,
NvmeManager);

Y_DEBUG_ABORT_UNLESS(
OldRequestCounters.Delayed && OldRequestCounters.Rejected &&
OldRequestCounters.Already);
TRdmaTargetConfig rdmaTargetConfig{
RejectLateRequestsAtDiskAgentEnabled,
OldRequestCounters};

auto result = State->Initialize(std::move(rdmaTargetConfig));
NvmeManager,
std::move(rdmaTargetConfig),
OldRequestCounters);

auto* actorSystem = ctx.ActorSystem();
auto replyTo = ctx.SelfID;

result.Subscribe([=] (auto future) {
State->Initialize().Subscribe([=] (auto future) {
using TCompletionEvent = TEvDiskAgentPrivate::TEvInitAgentCompleted;

NProto::TError error;
Expand Down
33 changes: 15 additions & 18 deletions cloud/blockstore/libs/storage/disk_agent/disk_agent_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -239,18 +239,18 @@ TVector<IProfileLog::TBlockInfo> ComputeDigest(
TDiskAgentState::TDiskAgentState(
TStorageConfigPtr storageConfig,
TDiskAgentConfigPtr agentConfig,
NRdma::TRdmaConfigPtr rdmaConfig,
NSpdk::ISpdkEnvPtr spdk,
ICachingAllocatorPtr allocator,
IStorageProviderPtr storageProvider,
IProfileLogPtr profileLog,
IBlockDigestGeneratorPtr blockDigestGenerator,
ILoggingServicePtr logging,
NRdma::IServerPtr rdmaServer,
NNvme::INvmeManagerPtr nvmeManager)
NNvme::INvmeManagerPtr nvmeManager,
TRdmaTargetConfigPtr rdmaTargetConfig,
TOldRequestCounters oldRequestCounters)
: StorageConfig(std::move(storageConfig))
, AgentConfig(std::move(agentConfig))
, RdmaConfig(std::move(rdmaConfig))
, Spdk(std::move(spdk))
, Allocator(std::move(allocator))
, StorageProvider(std::move(storageProvider))
Expand All @@ -260,6 +260,8 @@ TDiskAgentState::TDiskAgentState(
, Log(Logging->CreateLog("BLOCKSTORE_DISK_AGENT"))
, RdmaServer(std::move(rdmaServer))
, NvmeManager(std::move(nvmeManager))
, RdmaTargetConfig(std::move(rdmaTargetConfig))
, OldRequestCounters(std::move(oldRequestCounters))
{
}

Expand Down Expand Up @@ -419,25 +421,21 @@ TFuture<TInitializeResult> TDiskAgentState::InitAioStorage()
});
}

void TDiskAgentState::InitRdmaTarget(TRdmaTargetConfig rdmaTargetConfig)
void TDiskAgentState::InitRdmaTarget()
{
if (RdmaServer) {
if (RdmaServer && RdmaTargetConfig) {
THashMap<TString, TStorageAdapterPtr> devices;

auto endpoint = AgentConfig->GetRdmaEndpoint();

if (endpoint.GetHost().empty()) {
endpoint.SetHost(FQDNHostName());
}

for (auto& [uuid, state]: Devices) {
state.Config.MutableRdmaEndpoint()->CopyFrom(endpoint);
auto* endpoint = state.Config.MutableRdmaEndpoint();
endpoint->SetHost(RdmaTargetConfig->Host);
endpoint->SetPort(RdmaTargetConfig->Port);
devices.emplace(uuid, state.StorageAdapter);
}

RdmaTarget = CreateRdmaTarget(
endpoint,
std::move(rdmaTargetConfig),
RdmaTargetConfig,
OldRequestCounters,
Logging,
RdmaServer,
DeviceClient,
Expand All @@ -447,13 +445,12 @@ void TDiskAgentState::InitRdmaTarget(TRdmaTargetConfig rdmaTargetConfig)
}
}

TFuture<TInitializeResult> TDiskAgentState::Initialize(
TRdmaTargetConfig rdmaTargetConfig)
TFuture<TInitializeResult> TDiskAgentState::Initialize()
{
auto future = Spdk ? InitSpdkStorage() : InitAioStorage();

return future.Subscribe(
[this, rdmaTargetConfig = std::move(rdmaTargetConfig)](auto) mutable
[this](auto) mutable
{
TVector<TString> uuids(Reserve(Devices.size()));
for (const auto& x: Devices) {
Expand All @@ -465,7 +462,7 @@ TFuture<TInitializeResult> TDiskAgentState::Initialize(
std::move(uuids),
Logging->CreateLog("BLOCKSTORE_DISK_AGENT"));

InitRdmaTarget(std::move(rdmaTargetConfig));
InitRdmaTarget();

RestoreSessions(*DeviceClient);
});
Expand Down
14 changes: 8 additions & 6 deletions cloud/blockstore/libs/storage/disk_agent/disk_agent_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ class TDiskAgentState
private:
const TStorageConfigPtr StorageConfig;
const TDiskAgentConfigPtr AgentConfig;
const NRdma::TRdmaConfigPtr RdmaConfig;
const NSpdk::ISpdkEnvPtr Spdk;
const ICachingAllocatorPtr Allocator;
const IStorageProviderPtr StorageProvider;
Expand All @@ -64,19 +63,23 @@ class TDiskAgentState
ui32 InitErrorsCount = 0;
bool PartiallySuspended = false;

TRdmaTargetConfigPtr RdmaTargetConfig;
TOldRequestCounters OldRequestCounters;

public:
TDiskAgentState(
TStorageConfigPtr storageConfig,
TDiskAgentConfigPtr agentConfig,
NRdma::TRdmaConfigPtr rdmaConfig,
NSpdk::ISpdkEnvPtr spdk,
ICachingAllocatorPtr allocator,
IStorageProviderPtr storageProvider,
IProfileLogPtr profileLog,
IBlockDigestGeneratorPtr blockDigestGenerator,
ILoggingServicePtr logging,
NRdma::IServerPtr rdmaServer,
NNvme::INvmeManagerPtr nvmeManager);
NNvme::INvmeManagerPtr nvmeManager,
TRdmaTargetConfigPtr rdmaTargetConfig,
TOldRequestCounters oldRequestCounters);

struct TInitializeResult
{
Expand All @@ -86,8 +89,7 @@ class TDiskAgentState
TDeviceGuard Guard;
};

NThreading::TFuture<TInitializeResult> Initialize(
TRdmaTargetConfig rdmaTargetConfig);
NThreading::TFuture<TInitializeResult> Initialize();

NThreading::TFuture<NProto::TAgentStats> CollectStats();

Expand Down Expand Up @@ -178,7 +180,7 @@ class TDiskAgentState
NThreading::TFuture<TInitializeResult> InitSpdkStorage();
NThreading::TFuture<TInitializeResult> InitAioStorage();

void InitRdmaTarget(TRdmaTargetConfig rdmaTargetConfig);
void InitRdmaTarget();

void RestoreSessions(TDeviceClient& client) const;
};
Expand Down
Loading

0 comments on commit cda11ff

Please sign in to comment.