Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KqpRun improved multi query mode and storage settings #13177

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions ydb/core/driver_lib/run/config_helpers.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
#include "config_helpers.h"

#include <ydb/core/base/localdb.h>
#include <ydb/core/protos/bootstrap.pb.h>
#include <ydb/core/protos/resource_broker.pb.h>

#include <ydb/library/actors/util/affinity.h>


Expand Down Expand Up @@ -114,4 +118,31 @@ NActors::TSchedulerConfig CreateSchedulerConfig(const NKikimrConfig::TActorSyste

} // namespace NActorSystemConfigHelpers

namespace NKikimrConfigHelpers {

NMemory::TResourceBrokerConfig CreateMemoryControllerResourceBrokerConfig(const NKikimrConfig::TAppConfig& config) {
NMemory::TResourceBrokerConfig resourceBrokerSelfConfig; // for backward compatibility
auto mergeResourceBrokerConfigs = [&](const NKikimrResourceBroker::TResourceBrokerConfig& resourceBrokerConfig) {
if (resourceBrokerConfig.HasResourceLimit() && resourceBrokerConfig.GetResourceLimit().HasMemory()) {
resourceBrokerSelfConfig.LimitBytes = resourceBrokerConfig.GetResourceLimit().GetMemory();
}
for (const auto& queue : resourceBrokerConfig.GetQueues()) {
if (queue.GetName() == NLocalDb::KqpResourceManagerQueue) {
if (queue.HasLimit() && queue.GetLimit().HasMemory()) {
resourceBrokerSelfConfig.QueryExecutionLimitBytes = queue.GetLimit().GetMemory();
}
}
}
};
if (config.HasBootstrapConfig() && config.GetBootstrapConfig().HasResourceBroker()) {
mergeResourceBrokerConfigs(config.GetBootstrapConfig().GetResourceBroker());
}
if (config.HasResourceBrokerConfig()) {
mergeResourceBrokerConfigs(config.GetResourceBrokerConfig());
}
return resourceBrokerSelfConfig;
}

} // namespace NKikimrConfigHelpers

} // namespace NKikimr
7 changes: 7 additions & 0 deletions ydb/core/driver_lib/run/config_helpers.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include <ydb/core/memory_controller/memory_controller.h>
#include <ydb/core/protos/config.pb.h>

#include <ydb/library/actors/core/config.h>
Expand All @@ -15,4 +16,10 @@ NActors::TSchedulerConfig CreateSchedulerConfig(const NKikimrConfig::TActorSyste

} // namespace NActorSystemConfigHelpers

namespace NKikimrConfigHelpers {

NMemory::TResourceBrokerConfig CreateMemoryControllerResourceBrokerConfig(const NKikimrConfig::TAppConfig& config);

} // namespace NKikimrConfigHelpers

} // namespace NKikimr
22 changes: 1 addition & 21 deletions ydb/core/driver_lib/run/kikimr_services_initializers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2052,28 +2052,8 @@ void TMemoryControllerInitializer::InitializeServices(
NActors::TActorSystemSetup* setup,
const NKikimr::TAppData* appData)
{
NMemory::TResourceBrokerConfig resourceBrokerSelfConfig; // for backward compatibility
auto mergeResourceBrokerConfigs = [&](const NKikimrResourceBroker::TResourceBrokerConfig& resourceBrokerConfig) {
if (resourceBrokerConfig.HasResourceLimit() && resourceBrokerConfig.GetResourceLimit().HasMemory()) {
resourceBrokerSelfConfig.LimitBytes = resourceBrokerConfig.GetResourceLimit().GetMemory();
}
for (const auto& queue : resourceBrokerConfig.GetQueues()) {
if (queue.GetName() == NLocalDb::KqpResourceManagerQueue) {
if (queue.HasLimit() && queue.GetLimit().HasMemory()) {
resourceBrokerSelfConfig.QueryExecutionLimitBytes = queue.GetLimit().GetMemory();
}
}
}
};
if (Config.HasBootstrapConfig() && Config.GetBootstrapConfig().HasResourceBroker()) {
mergeResourceBrokerConfigs(Config.GetBootstrapConfig().GetResourceBroker());
}
if (Config.HasResourceBrokerConfig()) {
mergeResourceBrokerConfigs(Config.GetResourceBrokerConfig());
}

auto* actor = NMemory::CreateMemoryController(TDuration::Seconds(1), ProcessMemoryInfoProvider,
Config.GetMemoryControllerConfig(), resourceBrokerSelfConfig,
Config.GetMemoryControllerConfig(), NKikimrConfigHelpers::CreateMemoryControllerResourceBrokerConfig(Config),
appData->Counters);
setup->LocalServices.emplace_back(
NMemory::MakeMemoryControllerId(0),
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/testlib/basics/helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ namespace NFake {
ui64 SectorSize = 0;
ui64 ChunkSize = 0;
ui64 DiskSize = 0;
bool FormatDisk = true;
TString DiskPath;
};

struct INode {
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/testlib/basics/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,15 @@ namespace NKikimr {

static ui64 keySalt = 0;
ui64 salt = ++keySalt;
TString baseDir = Runtime.GetTempDir();
TString baseDir = conf.DiskPath ? conf.DiskPath : Runtime.GetTempDir();

if (Conf.UseDisk) {
MakeDirIfNotExist(baseDir.c_str());
}

PDiskPath = TStringBuilder() << baseDir << "pdisk_1.dat";

if (!Mock) {
if (!Mock && conf.FormatDisk) {
FormatPDisk(PDiskPath,
Conf.DiskSize, Conf.SectorSize, Conf.ChunkSize, PDiskGuid,
0x123 + salt, 0x456 + salt, 0x789 + salt, mainKey,
Expand Down
32 changes: 30 additions & 2 deletions ydb/core/testlib/test_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@
#include <ydb/core/mind/tenant_slot_broker.h>
#include <ydb/core/mind/tenant_node_enumeration.h>
#include <ydb/core/mind/node_broker.h>
#include <ydb/core/mon_alloc/monitor.h>
#include <ydb/core/kesus/tablet/events.h>
#include <ydb/core/sys_view/service/sysview_service.h>
#include <yql/essentials/minikql/mkql_function_registry.h>
Expand Down Expand Up @@ -613,18 +614,21 @@ namespace Tests {

NKikimrBlobStorage::TDefineBox boxConfig;
boxConfig.SetBoxId(Settings->BOX_ID);
boxConfig.SetItemConfigGeneration(Settings->StorageGeneration);

ui32 nodeId = Runtime->GetNodeId(0);
Y_ABORT_UNLESS(nodesInfo->Nodes[0].NodeId == nodeId);
auto& nodeInfo = nodesInfo->Nodes[0];

NKikimrBlobStorage::TDefineHostConfig hostConfig;
hostConfig.SetHostConfigId(nodeId);
hostConfig.SetItemConfigGeneration(Settings->StorageGeneration);
TString path;
if (Settings->UseSectorMap) {
path ="SectorMap:test-client[:2000]";
} else {
path = TStringBuilder() << Runtime->GetTempDir() << "pdisk_1.dat";
TString diskPath = Settings->CustomDiskParams.DiskPath;
path = TStringBuilder() << (diskPath ? diskPath : Runtime->GetTempDir()) << "pdisk_1.dat";
}
hostConfig.AddDrive()->SetPath(path);
if (Settings->Verbose) {
Expand All @@ -640,7 +644,9 @@ namespace Tests {

for (const auto& [poolKind, storagePool] : Settings->StoragePoolTypes) {
if (storagePool.GetNumGroups() > 0) {
bsConfigureRequest->Record.MutableRequest()->AddCommand()->MutableDefineStoragePool()->CopyFrom(storagePool);
auto* command = bsConfigureRequest->Record.MutableRequest()->AddCommand()->MutableDefineStoragePool();
command->CopyFrom(storagePool);
command->SetItemConfigGeneration(Settings->StorageGeneration);
}
}

Expand Down Expand Up @@ -1074,6 +1080,28 @@ namespace Tests {
}
}

{
if (Settings->NeedStatsCollectors) {
TString filePathPrefix;
if (Settings->AppConfig->HasMonitoringConfig()) {
filePathPrefix = Settings->AppConfig->GetMonitoringConfig().GetMemAllocDumpPathPrefix();
}

const TIntrusivePtr<NMemory::IProcessMemoryInfoProvider> processMemoryInfoProvider(MakeIntrusive<NMemory::TProcessMemoryInfoProvider>());

IActor* monitorActor = CreateMemProfMonitor(TDuration::Seconds(1), processMemoryInfoProvider,
Runtime->GetAppData(nodeIdx).Counters, filePathPrefix);
const TActorId monitorActorId = Runtime->Register(monitorActor, nodeIdx, Runtime->GetAppData(nodeIdx).BatchPoolId);
Runtime->RegisterService(MakeMemProfMonitorID(Runtime->GetNodeId(nodeIdx)), monitorActorId, nodeIdx);

IActor* controllerActor = NMemory::CreateMemoryController(TDuration::Seconds(1), processMemoryInfoProvider,
Settings->AppConfig->GetMemoryControllerConfig(), NKikimrConfigHelpers::CreateMemoryControllerResourceBrokerConfig(*Settings->AppConfig),
Runtime->GetAppData(nodeIdx).Counters);
const TActorId controllerActorId = Runtime->Register(controllerActor, nodeIdx, Runtime->GetAppData(nodeIdx).BatchPoolId);
Runtime->RegisterService(NMemory::MakeMemoryControllerId(0), controllerActorId, nodeIdx);
}
}

{
IActor* kesusService = NKesus::CreateKesusProxyService();
TActorId kesusServiceId = Runtime->Register(kesusService, nodeIdx, userPoolId);
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/testlib/test_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ namespace Tests {
TString DomainName = TestDomainName;
ui32 NodeCount = 1;
ui32 DynamicNodeCount = 0;
ui64 StorageGeneration = 0;
NFake::TStorage CustomDiskParams;
TControls Controls;
TAppPrepare::TFnReg FrFactory = &DefaultFrFactory;
Expand Down Expand Up @@ -179,6 +180,7 @@ namespace Tests {
TServerSettings& SetDomainName(const TString& value);
TServerSettings& SetNodeCount(ui32 value) { NodeCount = value; return *this; }
TServerSettings& SetDynamicNodeCount(ui32 value) { DynamicNodeCount = value; return *this; }
TServerSettings& SetStorageGeneration(ui64 value) { StorageGeneration = value; return *this; }
TServerSettings& SetCustomDiskParams(const NFake::TStorage& value) { CustomDiskParams = value; return *this; }
TServerSettings& SetControls(const TControls& value) { Controls = value; return *this; }
TServerSettings& SetFrFactory(const TAppPrepare::TFnReg& value) { FrFactory = value; return *this; }
Expand Down
2 changes: 2 additions & 0 deletions ydb/tests/tools/kqprun/.gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
storage
sync_dir
example
udfs

*.log
*.json
*.sql
Expand Down
13 changes: 7 additions & 6 deletions ydb/tests/tools/kqprun/flame_graph.sh
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
#!/usr/bin/env bash

# For svg graph download https://github.com/brendangregg/FlameGraph
# and run `FlameGraph/stackcollapse-perf.pl profdata.txt | FlameGraph/flamegraph.pl > profdata.svg`
set -eux

pid=$(pgrep -u $USER kqprun)
kqprun_pid=$(pgrep -u $USER kqprun)

echo "Target process id: ${pid}"

sudo perf record -F 50 --call-graph dwarf -g --proc-map-timeout=10000 --pid $pid -v -o profdata -- sleep 30
sudo perf record -F 50 --call-graph dwarf -g --proc-map-timeout=10000 --pid $kqprun_pid -v -o profdata -- sleep ${1:-'30'}
sudo perf script -i profdata > profdata.txt

flame_graph_tool="../../../../contrib/tools/flame-graph/"

${flame_graph_tool}/stackcollapse-perf.pl profdata.txt | ${flame_graph_tool}/flamegraph.pl > profdata.svg
Loading
Loading