diff --git a/ydb/core/driver_lib/run/config_helpers.cpp b/ydb/core/driver_lib/run/config_helpers.cpp index 2d0b834eb713..66a37eee15f1 100644 --- a/ydb/core/driver_lib/run/config_helpers.cpp +++ b/ydb/core/driver_lib/run/config_helpers.cpp @@ -1,5 +1,9 @@ #include "config_helpers.h" +#include +#include +#include + #include @@ -114,4 +118,31 @@ NActors::TSchedulerConfig CreateSchedulerConfig(const NKikimrConfig::TActorSyste } // namespace NActorSystemConfigHelpers +namespace NKikimrConfigHelpers { + +NMemory::TResourceBrokerConfig CreateMemoryControllerResourceBrokerConfig(const NKikimrConfig::TAppConfig& config) { + NMemory::TResourceBrokerConfig resourceBrokerSelfConfig; // for backward compatibility + auto mergeResourceBrokerConfigs = [&](const NKikimrResourceBroker::TResourceBrokerConfig& resourceBrokerConfig) { + if (resourceBrokerConfig.HasResourceLimit() && resourceBrokerConfig.GetResourceLimit().HasMemory()) { + resourceBrokerSelfConfig.LimitBytes = resourceBrokerConfig.GetResourceLimit().GetMemory(); + } + for (const auto& queue : resourceBrokerConfig.GetQueues()) { + if (queue.GetName() == NLocalDb::KqpResourceManagerQueue) { + if (queue.HasLimit() && queue.GetLimit().HasMemory()) { + resourceBrokerSelfConfig.QueryExecutionLimitBytes = queue.GetLimit().GetMemory(); + } + } + } + }; + if (config.HasBootstrapConfig() && config.GetBootstrapConfig().HasResourceBroker()) { + mergeResourceBrokerConfigs(config.GetBootstrapConfig().GetResourceBroker()); + } + if (config.HasResourceBrokerConfig()) { + mergeResourceBrokerConfigs(config.GetResourceBrokerConfig()); + } + return resourceBrokerSelfConfig; +} + +} // namespace NKikimrConfigHelpers + } // namespace NKikimr diff --git a/ydb/core/driver_lib/run/config_helpers.h b/ydb/core/driver_lib/run/config_helpers.h index d38e416b6f5b..39a0c6cdb053 100644 --- a/ydb/core/driver_lib/run/config_helpers.h +++ b/ydb/core/driver_lib/run/config_helpers.h @@ -1,5 +1,6 @@ #pragma once +#include #include #include @@ -15,4 +16,10 @@ NActors::TSchedulerConfig CreateSchedulerConfig(const NKikimrConfig::TActorSyste } // namespace NActorSystemConfigHelpers +namespace NKikimrConfigHelpers { + +NMemory::TResourceBrokerConfig CreateMemoryControllerResourceBrokerConfig(const NKikimrConfig::TAppConfig& config); + +} // namespace NKikimrConfigHelpers + } // namespace NKikimr diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp index 218994f506a2..d59d9315b129 100644 --- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp +++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp @@ -2052,28 +2052,8 @@ void TMemoryControllerInitializer::InitializeServices( NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) { - NMemory::TResourceBrokerConfig resourceBrokerSelfConfig; // for backward compatibility - auto mergeResourceBrokerConfigs = [&](const NKikimrResourceBroker::TResourceBrokerConfig& resourceBrokerConfig) { - if (resourceBrokerConfig.HasResourceLimit() && resourceBrokerConfig.GetResourceLimit().HasMemory()) { - resourceBrokerSelfConfig.LimitBytes = resourceBrokerConfig.GetResourceLimit().GetMemory(); - } - for (const auto& queue : resourceBrokerConfig.GetQueues()) { - if (queue.GetName() == NLocalDb::KqpResourceManagerQueue) { - if (queue.HasLimit() && queue.GetLimit().HasMemory()) { - resourceBrokerSelfConfig.QueryExecutionLimitBytes = queue.GetLimit().GetMemory(); - } - } - } - }; - if (Config.HasBootstrapConfig() && Config.GetBootstrapConfig().HasResourceBroker()) { - mergeResourceBrokerConfigs(Config.GetBootstrapConfig().GetResourceBroker()); - } - if (Config.HasResourceBrokerConfig()) { - mergeResourceBrokerConfigs(Config.GetResourceBrokerConfig()); - } - auto* actor = NMemory::CreateMemoryController(TDuration::Seconds(1), ProcessMemoryInfoProvider, - Config.GetMemoryControllerConfig(), resourceBrokerSelfConfig, + Config.GetMemoryControllerConfig(), NKikimrConfigHelpers::CreateMemoryControllerResourceBrokerConfig(Config), appData->Counters); setup->LocalServices.emplace_back( NMemory::MakeMemoryControllerId(0), diff --git a/ydb/core/testlib/basics/helpers.h b/ydb/core/testlib/basics/helpers.h index e6be7bca98ef..24126aca2cc4 100644 --- a/ydb/core/testlib/basics/helpers.h +++ b/ydb/core/testlib/basics/helpers.h @@ -18,6 +18,8 @@ namespace NFake { ui64 SectorSize = 0; ui64 ChunkSize = 0; ui64 DiskSize = 0; + bool FormatDisk = true; + TString DiskPath; }; struct INode { diff --git a/ydb/core/testlib/basics/storage.h b/ydb/core/testlib/basics/storage.h index 65b58a076fb1..e1aac88071d2 100644 --- a/ydb/core/testlib/basics/storage.h +++ b/ydb/core/testlib/basics/storage.h @@ -54,7 +54,7 @@ namespace NKikimr { static ui64 keySalt = 0; ui64 salt = ++keySalt; - TString baseDir = Runtime.GetTempDir(); + TString baseDir = conf.DiskPath ? conf.DiskPath : Runtime.GetTempDir(); if (Conf.UseDisk) { MakeDirIfNotExist(baseDir.c_str()); @@ -62,7 +62,7 @@ namespace NKikimr { PDiskPath = TStringBuilder() << baseDir << "pdisk_1.dat"; - if (!Mock) { + if (!Mock && conf.FormatDisk) { FormatPDisk(PDiskPath, Conf.DiskSize, Conf.SectorSize, Conf.ChunkSize, PDiskGuid, 0x123 + salt, 0x456 + salt, 0x789 + salt, mainKey, diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp index ea5b47e4167c..a4880134e383 100644 --- a/ydb/core/testlib/test_client.cpp +++ b/ydb/core/testlib/test_client.cpp @@ -93,6 +93,7 @@ #include #include #include +#include #include #include #include @@ -613,6 +614,7 @@ namespace Tests { NKikimrBlobStorage::TDefineBox boxConfig; boxConfig.SetBoxId(Settings->BOX_ID); + boxConfig.SetItemConfigGeneration(Settings->StorageGeneration); ui32 nodeId = Runtime->GetNodeId(0); Y_ABORT_UNLESS(nodesInfo->Nodes[0].NodeId == nodeId); @@ -620,11 +622,13 @@ namespace Tests { NKikimrBlobStorage::TDefineHostConfig hostConfig; hostConfig.SetHostConfigId(nodeId); + hostConfig.SetItemConfigGeneration(Settings->StorageGeneration); TString path; if (Settings->UseSectorMap) { path ="SectorMap:test-client[:2000]"; } else { - path = TStringBuilder() << Runtime->GetTempDir() << "pdisk_1.dat"; + TString diskPath = Settings->CustomDiskParams.DiskPath; + path = TStringBuilder() << (diskPath ? diskPath : Runtime->GetTempDir()) << "pdisk_1.dat"; } hostConfig.AddDrive()->SetPath(path); if (Settings->Verbose) { @@ -640,7 +644,9 @@ namespace Tests { for (const auto& [poolKind, storagePool] : Settings->StoragePoolTypes) { if (storagePool.GetNumGroups() > 0) { - bsConfigureRequest->Record.MutableRequest()->AddCommand()->MutableDefineStoragePool()->CopyFrom(storagePool); + auto* command = bsConfigureRequest->Record.MutableRequest()->AddCommand()->MutableDefineStoragePool(); + command->CopyFrom(storagePool); + command->SetItemConfigGeneration(Settings->StorageGeneration); } } @@ -1074,6 +1080,28 @@ namespace Tests { } } + { + if (Settings->NeedStatsCollectors) { + TString filePathPrefix; + if (Settings->AppConfig->HasMonitoringConfig()) { + filePathPrefix = Settings->AppConfig->GetMonitoringConfig().GetMemAllocDumpPathPrefix(); + } + + const TIntrusivePtr processMemoryInfoProvider(MakeIntrusive()); + + IActor* monitorActor = CreateMemProfMonitor(TDuration::Seconds(1), processMemoryInfoProvider, + Runtime->GetAppData(nodeIdx).Counters, filePathPrefix); + const TActorId monitorActorId = Runtime->Register(monitorActor, nodeIdx, Runtime->GetAppData(nodeIdx).BatchPoolId); + Runtime->RegisterService(MakeMemProfMonitorID(Runtime->GetNodeId(nodeIdx)), monitorActorId, nodeIdx); + + IActor* controllerActor = NMemory::CreateMemoryController(TDuration::Seconds(1), processMemoryInfoProvider, + Settings->AppConfig->GetMemoryControllerConfig(), NKikimrConfigHelpers::CreateMemoryControllerResourceBrokerConfig(*Settings->AppConfig), + Runtime->GetAppData(nodeIdx).Counters); + const TActorId controllerActorId = Runtime->Register(controllerActor, nodeIdx, Runtime->GetAppData(nodeIdx).BatchPoolId); + Runtime->RegisterService(NMemory::MakeMemoryControllerId(0), controllerActorId, nodeIdx); + } + } + { IActor* kesusService = NKesus::CreateKesusProxyService(); TActorId kesusServiceId = Runtime->Register(kesusService, nodeIdx, userPoolId); diff --git a/ydb/core/testlib/test_client.h b/ydb/core/testlib/test_client.h index 75e948610709..5db41537eb4e 100644 --- a/ydb/core/testlib/test_client.h +++ b/ydb/core/testlib/test_client.h @@ -124,6 +124,7 @@ namespace Tests { TString DomainName = TestDomainName; ui32 NodeCount = 1; ui32 DynamicNodeCount = 0; + ui64 StorageGeneration = 0; NFake::TStorage CustomDiskParams; TControls Controls; TAppPrepare::TFnReg FrFactory = &DefaultFrFactory; @@ -179,6 +180,7 @@ namespace Tests { TServerSettings& SetDomainName(const TString& value); TServerSettings& SetNodeCount(ui32 value) { NodeCount = value; return *this; } TServerSettings& SetDynamicNodeCount(ui32 value) { DynamicNodeCount = value; return *this; } + TServerSettings& SetStorageGeneration(ui64 value) { StorageGeneration = value; return *this; } TServerSettings& SetCustomDiskParams(const NFake::TStorage& value) { CustomDiskParams = value; return *this; } TServerSettings& SetControls(const TControls& value) { Controls = value; return *this; } TServerSettings& SetFrFactory(const TAppPrepare::TFnReg& value) { FrFactory = value; return *this; } diff --git a/ydb/tests/tools/kqprun/.gitignore b/ydb/tests/tools/kqprun/.gitignore index a4578942a676..86fc65c21b31 100644 --- a/ydb/tests/tools/kqprun/.gitignore +++ b/ydb/tests/tools/kqprun/.gitignore @@ -1,6 +1,8 @@ +storage sync_dir example udfs + *.log *.json *.sql diff --git a/ydb/tests/tools/kqprun/flame_graph.sh b/ydb/tests/tools/kqprun/flame_graph.sh index ead8de30683a..61749c81b21e 100755 --- a/ydb/tests/tools/kqprun/flame_graph.sh +++ b/ydb/tests/tools/kqprun/flame_graph.sh @@ -1,11 +1,12 @@ #!/usr/bin/env bash -# For svg graph download https://github.com/brendangregg/FlameGraph -# and run `FlameGraph/stackcollapse-perf.pl profdata.txt | FlameGraph/flamegraph.pl > profdata.svg` +set -eux -pid=$(pgrep -u $USER kqprun) +kqprun_pid=$(pgrep -u $USER kqprun) -echo "Target process id: ${pid}" - -sudo perf record -F 50 --call-graph dwarf -g --proc-map-timeout=10000 --pid $pid -v -o profdata -- sleep 30 +sudo perf record -F 50 --call-graph dwarf -g --proc-map-timeout=10000 --pid $kqprun_pid -v -o profdata -- sleep ${1:-'30'} sudo perf script -i profdata > profdata.txt + +flame_graph_tool="../../../../contrib/tools/flame-graph/" + +${flame_graph_tool}/stackcollapse-perf.pl profdata.txt | ${flame_graph_tool}/flamegraph.pl > profdata.svg diff --git a/ydb/tests/tools/kqprun/kqprun.cpp b/ydb/tests/tools/kqprun/kqprun.cpp index 69ddc226f7f9..9ac31ccb7b76 100644 --- a/ydb/tests/tools/kqprun/kqprun.cpp +++ b/ydb/tests/tools/kqprun/kqprun.cpp @@ -1,7 +1,5 @@ #include "src/kqp_runner.h" -#include - #include #include @@ -12,16 +10,27 @@ #include #include +#include #include #include + #include +#include + #include #include #include -#include + +#ifdef PROFILE_MEMORY_ALLOCATIONS +#include +#endif +namespace NKqpRun { + +namespace { + struct TExecutionOptions { enum class EExecutionCase { GenericScript, @@ -77,7 +86,7 @@ struct TExecutionOptions { return GetValue(index, ScriptQueryActions, NKikimrKqp::EQueryAction::QUERY_ACTION_EXECUTE); } - NKqpRun::TRequestOptions GetSchemeQueryOptions() const { + TRequestOptions GetSchemeQueryOptions() const { TString sql = SchemeQuery; if (UseTemplates) { ReplaceYqlTokenTemplate(sql); @@ -94,7 +103,7 @@ struct TExecutionOptions { }; } - NKqpRun::TRequestOptions GetScriptQueryOptions(size_t index, size_t queryId, TInstant startTime) const { + TRequestOptions GetScriptQueryOptions(size_t index, size_t queryId, TInstant startTime) const { Y_ABORT_UNLESS(index < ScriptQueries.size()); TString sql = ScriptQueries[index]; @@ -110,27 +119,29 @@ struct TExecutionOptions { .PoolId = GetValue(index, PoolIds, TString()), .UserSID = GetValue(index, UserSIDs, TString(BUILTIN_ACL_ROOT)), .Database = GetValue(index, Databases, TString()), - .Timeout = GetValue(index, Timeouts, TDuration::Zero()) + .Timeout = GetValue(index, Timeouts, TDuration::Zero()), + .QueryId = queryId }; } - void Validate(const NKqpRun::TRunnerOptions& runnerOptions) const { + void Validate(const TRunnerOptions& runnerOptions) const { if (!SchemeQuery && ScriptQueries.empty() && !runnerOptions.YdbSettings.MonitoringEnabled && !runnerOptions.YdbSettings.GrpcEnabled) { ythrow yexception() << "Nothing to execute and is not running as daemon"; } - ValidateOptionsSizes(); + ValidateOptionsSizes(runnerOptions); ValidateSchemeQueryOptions(runnerOptions); ValidateScriptExecutionOptions(runnerOptions); ValidateAsyncOptions(runnerOptions.YdbSettings.AsyncQueriesSettings); - ValidateTraceOpt(runnerOptions.TraceOptType); + ValidateTraceOpt(runnerOptions); + ValidateStorageSettings(runnerOptions.YdbSettings); } private: - void ValidateOptionsSizes() const { + void ValidateOptionsSizes(const TRunnerOptions& runnerOptions) const { const auto checker = [numberQueries = ScriptQueries.size()](size_t checkSize, const TString& optionName) { if (checkSize > numberQueries) { - ythrow yexception() << "Too many " << optionName << ". Specified " << checkSize << ", when number of queries is " << numberQueries; + ythrow yexception() << "Too many " << optionName << ". Specified " << checkSize << ", when number of script queries is " << numberQueries; } }; @@ -141,9 +152,13 @@ struct TExecutionOptions { checker(PoolIds.size(), "pool ids"); checker(UserSIDs.size(), "user SIDs"); checker(Timeouts.size(), "timeouts"); + checker(runnerOptions.ScriptQueryAstOutputs.size(), "ast output files"); + checker(runnerOptions.ScriptQueryPlanOutputs.size(), "plan output files"); + checker(runnerOptions.ScriptQueryTimelineFiles.size(), "timeline files"); + checker(runnerOptions.InProgressStatisticsOutputFiles.size(), "statistics files"); } - void ValidateSchemeQueryOptions(const NKqpRun::TRunnerOptions& runnerOptions) const { + void ValidateSchemeQueryOptions(const TRunnerOptions& runnerOptions) const { if (SchemeQuery) { return; } @@ -152,7 +167,7 @@ struct TExecutionOptions { } } - void ValidateScriptExecutionOptions(const NKqpRun::TRunnerOptions& runnerOptions) const { + void ValidateScriptExecutionOptions(const TRunnerOptions& runnerOptions) const { if (runnerOptions.YdbSettings.SameSession && HasExecutionCase(EExecutionCase::AsyncQuery)) { ythrow yexception() << "Same session can not be used with async quries"; } @@ -175,7 +190,7 @@ struct TExecutionOptions { if (ResultsRowsLimit) { ythrow yexception() << "Result rows limit can not be used without script queries"; } - if (runnerOptions.InProgressStatisticsOutputFile) { + if (!runnerOptions.InProgressStatisticsOutputFiles.empty()) { ythrow yexception() << "Script statistics can not be used without script queries"; } @@ -183,10 +198,10 @@ struct TExecutionOptions { if (HasExecutionCase(EExecutionCase::YqlScript)) { return; } - if (runnerOptions.ScriptQueryAstOutput) { + if (!runnerOptions.ScriptQueryAstOutputs.empty()) { ythrow yexception() << "Script query AST output can not be used without script/yql queries"; } - if (runnerOptions.ScriptQueryPlanOutput) { + if (!runnerOptions.ScriptQueryPlanOutputs.empty()) { ythrow yexception() << "Script query plan output can not be used without script/yql queries"; } if (runnerOptions.YdbSettings.SameSession) { @@ -194,7 +209,7 @@ struct TExecutionOptions { } } - void ValidateAsyncOptions(const NKqpRun::TAsyncQueriesSettings& asyncQueriesSettings) const { + void ValidateAsyncOptions(const TAsyncQueriesSettings& asyncQueriesSettings) const { if (asyncQueriesSettings.InFlightLimit && !HasExecutionCase(EExecutionCase::AsyncQuery)) { ythrow yexception() << "In flight limit can not be used without async queries"; } @@ -205,42 +220,62 @@ struct TExecutionOptions { } } - void ValidateTraceOpt(NKqpRun::TRunnerOptions::ETraceOptType traceOptType) const { - switch (traceOptType) { - case NKqpRun::TRunnerOptions::ETraceOptType::Scheme: { + void ValidateTraceOpt(const TRunnerOptions& runnerOptions) const { + NColorizer::TColors colors = NColorizer::AutoColors(Cout); + switch (runnerOptions.TraceOptType) { + case TRunnerOptions::ETraceOptType::Scheme: { if (!SchemeQuery) { ythrow yexception() << "Trace opt type scheme cannot be used without scheme query"; } break; } - case NKqpRun::TRunnerOptions::ETraceOptType::Script: { + case TRunnerOptions::ETraceOptType::Script: { if (ScriptQueries.empty()) { ythrow yexception() << "Trace opt type script cannot be used without script queries"; } } - case NKqpRun::TRunnerOptions::ETraceOptType::All: { + case TRunnerOptions::ETraceOptType::All: { if (!SchemeQuery && ScriptQueries.empty()) { ythrow yexception() << "Trace opt type all cannot be used without any queries"; } } - case NKqpRun::TRunnerOptions::ETraceOptType::Disabled: { + case TRunnerOptions::ETraceOptType::Disabled: { break; } } + + if (const auto traceOptId = runnerOptions.TraceOptScriptId) { + if (runnerOptions.TraceOptType != TRunnerOptions::ETraceOptType::Script) { + ythrow yexception() << "Trace opt id allowed only for trace opt type script (used " << runnerOptions.TraceOptType << ")"; + } + + const ui64 numberScripts = ScriptQueries.size() * LoopCount; + if (*traceOptId >= numberScripts) { + ythrow yexception() << "Invalid trace opt id " << *traceOptId << ", it should be less than number of scipt queries " << numberScripts; + } + if (numberScripts == 1) { + Cout << colors.Red() << "Warning: trace opt id is not necessary for single script mode" << Endl; + } + } } -private: - template - static TValue GetValue(size_t index, const std::vector& values, TValue defaultValue) { - if (values.empty()) { - return defaultValue; + static void ValidateStorageSettings(const TYdbSetupSettings& ydbSettings) { + if (ydbSettings.DisableDiskMock) { + if (ydbSettings.NodeCount + ydbSettings.SharedTenants.size() + ydbSettings.DedicatedTenants.size() > 1) { + ythrow yexception() << "Disable disk mock cannot be used for multi node clusters (already disabled)"; + } else if (ydbSettings.PDisksPath) { + ythrow yexception() << "Disable disk mock cannot be used with real PDisks (already disabled)"; + } + } + if (ydbSettings.FormatStorage && !ydbSettings.PDisksPath) { + ythrow yexception() << "Cannot format storage without real PDisks, please use --storage-path"; } - return values[std::min(index, values.size() - 1)]; } +private: static void ReplaceYqlTokenTemplate(TString& sql) { - const TString variableName = TStringBuilder() << "${" << NKqpRun::YQL_TOKEN_VARIABLE << "}"; - if (const TString& yqlToken = GetEnv(NKqpRun::YQL_TOKEN_VARIABLE)) { + const TString variableName = TStringBuilder() << "${" << YQL_TOKEN_VARIABLE << "}"; + if (const TString& yqlToken = GetEnv(YQL_TOKEN_VARIABLE)) { SubstGlobal(sql, variableName, yqlToken); } else if (sql.Contains(variableName)) { ythrow yexception() << "Failed to replace ${YQL_TOKEN} template, please specify YQL_TOKEN environment variable\n"; @@ -249,7 +284,7 @@ struct TExecutionOptions { }; -void RunArgumentQuery(size_t index, size_t queryId, TInstant startTime, const TExecutionOptions& executionOptions, NKqpRun::TKqpRunner& runner) { +void RunArgumentQuery(size_t index, size_t queryId, TInstant startTime, const TExecutionOptions& executionOptions, TKqpRunner& runner) { NColorizer::TColors colors = NColorizer::AutoColors(Cout); switch (executionOptions.GetExecutionCase(index)) { @@ -292,7 +327,7 @@ void RunArgumentQuery(size_t index, size_t queryId, TInstant startTime, const TE } -void RunArgumentQueries(const TExecutionOptions& executionOptions, NKqpRun::TKqpRunner& runner) { +void RunArgumentQueries(const TExecutionOptions& executionOptions, TKqpRunner& runner) { NColorizer::TColors colors = NColorizer::AutoColors(Cout); if (executionOptions.SchemeQuery) { @@ -354,11 +389,11 @@ void RunAsDaemon() { } -void RunScript(const TExecutionOptions& executionOptions, const NKqpRun::TRunnerOptions& runnerOptions) { +void RunScript(const TExecutionOptions& executionOptions, const TRunnerOptions& runnerOptions) { NColorizer::TColors colors = NColorizer::AutoColors(Cout); Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Initialization of kqp runner..." << colors.Default() << Endl; - NKqpRun::TKqpRunner runner(runnerOptions); + TKqpRunner runner(runnerOptions); try { RunArgumentQueries(executionOptions, runner); @@ -403,11 +438,13 @@ TIntrusivePtr CreateFunctionRegistr class TMain : public TMainClassArgs { - inline static const TString YqlToken = GetEnv(NKqpRun::YQL_TOKEN_VARIABLE); + inline static const TString YqlToken = GetEnv(YQL_TOKEN_VARIABLE); inline static std::vector> FileHolders; + inline static IOutputStream* ProfileAllocationsOutput = nullptr; + inline static NColorizer::TColors CoutColors = NColorizer::AutoColors(Cout); TExecutionOptions ExecutionOptions; - NKqpRun::TRunnerOptions RunnerOptions; + TRunnerOptions RunnerOptions; THashMap TablesMapping; TVector UdfsPaths; @@ -415,6 +452,9 @@ class TMain : public TMainClassArgs { bool ExcludeLinkedUdfs = false; bool EmulateYt = false; + std::optional DefaultLogPriority; + std::unordered_map LogPriorities; + static TString LoadFile(const TString& file) { return TFileInput(file).ReadAll(); } @@ -450,10 +490,29 @@ class TMain : public TMainClassArgs { return choices; } + bool Contains(const TString& choice) const { + return ChoicesMap.contains(choice); + } + private: const std::map ChoicesMap; }; +#ifdef PROFILE_MEMORY_ALLOCATIONS +public: + static void FinishProfileMemoryAllocations() { + if (ProfileAllocationsOutput) { + NAllocProfiler::StopAllocationSampling(*ProfileAllocationsOutput); + } else { + TString output; + TStringOutput stream(output); + NAllocProfiler::StopAllocationSampling(stream); + + Cout << CoutColors.Red() << "Warning: profile memory allocations output is not specified, please use flag `--profile-output` for writing profile info (dump size " << NKikimr::NBlobDepot::FormatByteSize(output.size()) << ")" << CoutColors.Default() << Endl; + } + } +#endif + protected: void RegisterOptions(NLastGetopt::TOpts& options) override { options.SetTitle("KqpRun -- tool to execute queries by using kikimr provider (instead of dq provider in DQrun tool)"); @@ -467,11 +526,13 @@ class TMain : public TMainClassArgs { .Handler1([this](const NLastGetopt::TOptsParser* option) { ExecutionOptions.SchemeQuery = LoadFile(option->CurVal()); }); + options.AddLongOption('p', "script-query", "Script query to execute (typically DML query)") .RequiredArgument("file") .Handler1([this](const NLastGetopt::TOptsParser* option) { ExecutionOptions.ScriptQueries.emplace_back(LoadFile(option->CurVal())); }); + options.AddLongOption("templates", "Enable templates for -s and -p queries, such as ${YQL_TOKEN} and ${QUERY_ID}") .NoArgument() .SetFlag(&ExecutionOptions.UseTemplates); @@ -483,10 +544,10 @@ class TMain : public TMainClassArgs { TStringBuf filePath; TStringBuf(option->CurVal()).Split('@', tableName, filePath); if (tableName.empty() || filePath.empty()) { - ythrow yexception() << "Incorrect table mapping, expected form table@file, e.g. yt.Root/plato.Input@input.txt"; + ythrow yexception() << "Incorrect table mapping, expected form table@file, e. g. yt.Root/plato.Input@input.txt"; } if (TablesMapping.contains(tableName)) { - ythrow yexception() << "Got duplicate table name: " << tableName; + ythrow yexception() << "Got duplicated table name: " << tableName; } TablesMapping[tableName] = filePath; }); @@ -507,9 +568,11 @@ class TMain : public TMainClassArgs { options.AddLongOption('u', "udf", "Load shared library with UDF by given path") .RequiredArgument("file") .EmplaceTo(&UdfsPaths); + options.AddLongOption("udfs-dir", "Load all shared libraries with UDFs found in given directory") .RequiredArgument("directory") .StoreResult(&UdfsDirectory); + options.AddLongOption("exclude-linked-udfs", "Exclude linked udfs when same udf passed from -u or --udfs-dir") .NoArgument() .SetFlag(&ExcludeLinkedUdfs); @@ -524,13 +587,51 @@ class TMain : public TMainClassArgs { std::remove(file.c_str()); } }); - TChoices traceOpt({ - {"all", NKqpRun::TRunnerOptions::ETraceOptType::All}, - {"scheme", NKqpRun::TRunnerOptions::ETraceOptType::Scheme}, - {"script", NKqpRun::TRunnerOptions::ETraceOptType::Script}, - {"disabled", NKqpRun::TRunnerOptions::ETraceOptType::Disabled} + + TChoices logPriority({ + {"emerg", NActors::NLog::EPriority::PRI_EMERG}, + {"alert", NActors::NLog::EPriority::PRI_ALERT}, + {"crit", NActors::NLog::EPriority::PRI_CRIT}, + {"error", NActors::NLog::EPriority::PRI_ERROR}, + {"warn", NActors::NLog::EPriority::PRI_WARN}, + {"notice", NActors::NLog::EPriority::PRI_NOTICE}, + {"info", NActors::NLog::EPriority::PRI_INFO}, + {"debug", NActors::NLog::EPriority::PRI_DEBUG}, + {"trace", NActors::NLog::EPriority::PRI_TRACE}, + }); + options.AddLongOption("log-default", "Default log priority") + .RequiredArgument("priority") + .Choices(logPriority.GetChoices()) + .StoreMappedResultT(&DefaultLogPriority, logPriority); + + options.AddLongOption("log", "Component log priority in format = (e. g. KQP_YQL=trace)") + .RequiredArgument("component priority") + .Handler1([this, logPriority](const NLastGetopt::TOptsParser* option) { + TStringBuf component; + TStringBuf priority; + TStringBuf(option->CurVal()).Split('=', component, priority); + if (component.empty() || priority.empty()) { + ythrow yexception() << "Incorrect log setting, expected form component=priority, e. g. KQP_YQL=trace"; + } + + const auto service = GetLogService(TString(component)); + if (LogPriorities.contains(service)) { + ythrow yexception() << "Got duplicated log service name: " << component; + } + + if (!logPriority.Contains(TString(priority))) { + ythrow yexception() << "Incorrect log priority: " << priority; + } + LogPriorities[service] = logPriority(TString(priority)); + }); + + TChoices traceOpt({ + {"all", TRunnerOptions::ETraceOptType::All}, + {"scheme", TRunnerOptions::ETraceOptType::Scheme}, + {"script", TRunnerOptions::ETraceOptType::Script}, + {"disabled", TRunnerOptions::ETraceOptType::Disabled} }); - options.AddLongOption('T', "trace-opt", "print AST in the begin of each transformation") + options.AddLongOption('T', "trace-opt", "Print AST in the begin of each transformation (use script@ for tracing one -p query)") .RequiredArgument("trace-opt-query") .DefaultValue("disabled") .Choices(traceOpt.GetChoices()) @@ -539,6 +640,11 @@ class TMain : public TMainClassArgs { RunnerOptions.YdbSettings.TraceOptEnabled = traceOptType != NKqpRun::TRunnerOptions::ETraceOptType::Disabled; return traceOptType; }); + + options.AddLongOption('I', "trace-opt-index", "Index of -p query to use --trace-opt, starts from zero") + .RequiredArgument("uint") + .StoreResult(&RunnerOptions.TraceOptScriptId); + options.AddLongOption("trace-id", "Trace id for -p queries") .RequiredArgument("id") .EmplaceTo(&ExecutionOptions.TraceIds); @@ -547,14 +653,16 @@ class TMain : public TMainClassArgs { .RequiredArgument("file") .DefaultValue("-") .StoreMappedResultT(&RunnerOptions.ResultOutput, &GetDefaultOutput); + options.AddLongOption('L', "result-rows-limit", "Rows limit for script execution results") .RequiredArgument("uint") .DefaultValue(0) .StoreResult(&ExecutionOptions.ResultsRowsLimit); - TChoices resultFormat({ - {"rows", NKqpRun::TRunnerOptions::EResultOutputFormat::RowsJson}, - {"full-json", NKqpRun::TRunnerOptions::EResultOutputFormat::FullJson}, - {"full-proto", NKqpRun::TRunnerOptions::EResultOutputFormat::FullProto} + + TChoices resultFormat({ + {"rows", TRunnerOptions::EResultOutputFormat::RowsJson}, + {"full-json", TRunnerOptions::EResultOutputFormat::FullJson}, + {"full-proto", TRunnerOptions::EResultOutputFormat::FullProto} }); options.AddLongOption('R', "result-format", "Script query result format") .RequiredArgument("result-format") @@ -568,19 +676,26 @@ class TMain : public TMainClassArgs { options.AddLongOption("script-ast-file", "File with script query ast (use '-' to write in stdout)") .RequiredArgument("file") - .StoreMappedResultT(&RunnerOptions.ScriptQueryAstOutput, &GetDefaultOutput); + .Handler1([this](const NLastGetopt::TOptsParser* option) { + RunnerOptions.ScriptQueryAstOutputs.emplace_back(GetDefaultOutput(TString(option->CurValOrDef()))); + }); options.AddLongOption("script-plan-file", "File with script query plan (use '-' to write in stdout)") .RequiredArgument("file") - .StoreMappedResultT(&RunnerOptions.ScriptQueryPlanOutput, &GetDefaultOutput); + .Handler1([this](const NLastGetopt::TOptsParser* option) { + RunnerOptions.ScriptQueryPlanOutputs.emplace_back(GetDefaultOutput(TString(option->CurValOrDef()))); + }); + options.AddLongOption("script-statistics", "File with script inprogress statistics") .RequiredArgument("file") - .StoreMappedResultT(&RunnerOptions.InProgressStatisticsOutputFile, [](const TString& file) { + .Handler1([this](const NLastGetopt::TOptsParser* option) { + const TString file(option->CurValOrDef()); if (file == "-") { ythrow yexception() << "Script in progress statistics cannot be printed to stdout, please specify file name"; } - return file; + RunnerOptions.InProgressStatisticsOutputFiles.emplace_back(file); }); + TChoices planFormat({ {"pretty", NYdb::NConsoleClient::EDataFormat::Pretty}, {"table", NYdb::NConsoleClient::EDataFormat::PrettyTable}, @@ -594,13 +709,18 @@ class TMain : public TMainClassArgs { options.AddLongOption("script-timeline-file", "File with script query timline in svg format") .RequiredArgument("file") - .StoreMappedResultT(&RunnerOptions.ScriptQueryTimelineFile, [](const TString& file) { + .Handler1([this](const NLastGetopt::TOptsParser* option) { + const TString file(option->CurValOrDef()); if (file == "-") { ythrow yexception() << "Script timline cannot be printed to stdout, please specify file name"; } - return file; + RunnerOptions.ScriptQueryTimelineFiles.emplace_back(file); }); + options.AddLongOption("profile-output", "File with profile memory allocations output (use '-' to write in stdout)") + .RequiredArgument("file") + .StoreMappedResultT(&ProfileAllocationsOutput, &GetDefaultOutput); + // Pipeline settings TChoices executionCase({ @@ -616,13 +736,20 @@ class TMain : public TMainClassArgs { TString choice(option->CurValOrDef()); ExecutionOptions.ExecutionCases.emplace_back(executionCase(choice)); }); + options.AddLongOption("inflight-limit", "In flight limit for async queries (use 0 for unlimited)") .RequiredArgument("uint") .DefaultValue(0) .StoreResult(&RunnerOptions.YdbSettings.AsyncQueriesSettings.InFlightLimit); - TChoices verbose({ - {"each-query", NKqpRun::TAsyncQueriesSettings::EVerbose::EachQuery}, - {"final", NKqpRun::TAsyncQueriesSettings::EVerbose::Final} + + options.AddLongOption("verbose", "Common verbose level (max level 2)") + .RequiredArgument("uint") + .DefaultValue(1) + .StoreResult(&RunnerOptions.YdbSettings.VerboseLevel); + + TChoices verbose({ + {"each-query", TAsyncQueriesSettings::EVerbose::EachQuery}, + {"final", TAsyncQueriesSettings::EVerbose::Final} }); options.AddLongOption("async-verbose", "Verbose type for async queries") .RequiredArgument("type") @@ -660,10 +787,12 @@ class TMain : public TMainClassArgs { .RequiredArgument("uint") .DefaultValue(ExecutionOptions.LoopCount) .StoreResult(&ExecutionOptions.LoopCount); + options.AddLongOption("loop-delay", "Delay in milliseconds between loop steps") .RequiredArgument("uint") .DefaultValue(0) .StoreMappedResultT(&ExecutionOptions.LoopDelay, &TDuration::MilliSeconds); + options.AddLongOption("continue-after-fail", "Don't not stop requests execution after fails") .NoArgument() .SetFlag(&ExecutionOptions.ContinueAfterFail); @@ -742,9 +871,13 @@ class TMain : public TMainClassArgs { return static_cast(diskSize) << 30; }); - options.AddLongOption("real-pdisks", "Use real PDisks instead of in memory PDisks (also disable disk mock)") + options.AddLongOption("storage-path", "Use real PDisks by specified path instead of in memory PDisks (also disable disk mock), use '-' to use temp directory") + .RequiredArgument("directory") + .StoreResult(&RunnerOptions.YdbSettings.PDisksPath); + + options.AddLongOption("format-storage", "Clear storage if it exists on --storage-path") .NoArgument() - .SetFlag(&RunnerOptions.YdbSettings.UseRealPDisks); + .SetFlag(&RunnerOptions.YdbSettings.FormatStorage); options.AddLongOption("disable-disk-mock", "Disable disk mock on single node cluster") .NoArgument() @@ -767,18 +900,21 @@ class TMain : public TMainClassArgs { int DoRun(NLastGetopt::TOptsParseResult&&) override { ExecutionOptions.Validate(RunnerOptions); - if (RunnerOptions.YdbSettings.DisableDiskMock && RunnerOptions.YdbSettings.NodeCount + RunnerOptions.YdbSettings.SharedTenants.size() + RunnerOptions.YdbSettings.DedicatedTenants.size() > 1) { - ythrow yexception() << "Disable disk mock cannot be used for multi node clusters"; - } - RunnerOptions.YdbSettings.YqlToken = YqlToken; RunnerOptions.YdbSettings.FunctionRegistry = CreateFunctionRegistry(UdfsDirectory, UdfsPaths, ExcludeLinkedUdfs).Get(); + + auto& appConfig = RunnerOptions.YdbSettings.AppConfig; if (ExecutionOptions.ResultsRowsLimit) { - RunnerOptions.YdbSettings.AppConfig.MutableQueryServiceConfig()->SetScriptResultRowsLimit(ExecutionOptions.ResultsRowsLimit); + appConfig.MutableQueryServiceConfig()->SetScriptResultRowsLimit(ExecutionOptions.ResultsRowsLimit); + } + + if (DefaultLogPriority) { + appConfig.MutableLogConfig()->SetDefaultLevel(*DefaultLogPriority); } + ModifyLogPriorities(LogPriorities, *appConfig.MutableLogConfig()); if (EmulateYt) { - const auto& fileStorageConfig = RunnerOptions.YdbSettings.AppConfig.GetQueryServiceConfig().GetFileStorage(); + const auto& fileStorageConfig = appConfig.GetQueryServiceConfig().GetFileStorage(); auto fileStorage = WithAsync(CreateFileStorage(fileStorageConfig, {MakeYtDownloader(fileStorageConfig)})); auto ytFileServices = NYql::NFile::TYtFileServices::Make(RunnerOptions.YdbSettings.FunctionRegistry.Get(), TablesMapping, fileStorage); RunnerOptions.YdbSettings.YtGateway = NYql::CreateYtFileGateway(ytFileServices); @@ -787,7 +923,26 @@ class TMain : public TMainClassArgs { ythrow yexception() << "Tables mapping is not supported without emulate YT mode"; } +#ifdef PROFILE_MEMORY_ALLOCATIONS + if (RunnerOptions.YdbSettings.VerboseLevel >= 1) { + Cout << CoutColors.Cyan() << "Starting profile memory allocations" << CoutColors.Default() << Endl; + } + NAllocProfiler::StartAllocationSampling(true); +#else + if (ProfileAllocationsOutput) { + ythrow yexception() << "Profile memory allocations disabled, please rebuild kqprun with flag `-D PROFILE_MEMORY_ALLOCATIONS`"; + } +#endif + RunScript(ExecutionOptions, RunnerOptions); + +#ifdef PROFILE_MEMORY_ALLOCATIONS + if (RunnerOptions.YdbSettings.VerboseLevel >= 1) { + Cout << CoutColors.Cyan() << "Finishing profile memory allocations" << CoutColors.Default() << Endl; + } + FinishProfileMemoryAllocations(); +#endif + return 0; } }; @@ -814,13 +969,31 @@ void SegmentationFaultHandler(int) { abort(); } +#ifdef PROFILE_MEMORY_ALLOCATIONS +void InterruptHandler(int) { + NColorizer::TColors colors = NColorizer::AutoColors(Cerr); + + Cout << colors.Red() << "Execution interrupted, finishing profile memory allocations..." << colors.Default() << Endl; + TMain::FinishProfileMemoryAllocations(); + + abort(); +} +#endif + +} // anonymous namespace + +} // namespace NKqpRun int main(int argc, const char* argv[]) { - std::set_terminate(KqprunTerminateHandler); - signal(SIGSEGV, &SegmentationFaultHandler); + std::set_terminate(NKqpRun::KqprunTerminateHandler); + signal(SIGSEGV, &NKqpRun::SegmentationFaultHandler); + +#ifdef PROFILE_MEMORY_ALLOCATIONS + signal(SIGINT, &NKqpRun::InterruptHandler); +#endif try { - TMain().Run(argc, argv); + NKqpRun::TMain().Run(argc, argv); } catch (...) { NColorizer::TColors colors = NColorizer::AutoColors(Cerr); diff --git a/ydb/tests/tools/kqprun/src/actors.cpp b/ydb/tests/tools/kqprun/src/actors.cpp index 66b28b109388..ce9dda414fa9 100644 --- a/ydb/tests/tools/kqprun/src/actors.cpp +++ b/ydb/tests/tools/kqprun/src/actors.cpp @@ -14,6 +14,7 @@ class TRunScriptActorMock : public NActors::TActorBootstrapped promise, TProgressCallback progressCallback) : TargetNode_(request.TargetNode) + , QueryId_(request.QueryId) , Request_(std::move(request.Event)) , Promise_(promise) , ResultRowsLimit_(std::numeric_limits::max()) @@ -83,12 +84,14 @@ class TRunScriptActorMock : public NActors::TActorBootstrappedGet()->Record); + ProgressCallback_(QueryId_, ev->Get()->Record); } } private: - ui32 TargetNode_ = 0; + const ui32 TargetNode_ = 0; + const size_t QueryId_ = 0; + std::unique_ptr Request_; NThreading::TPromise Promise_; ui64 ResultRowsLimit_; @@ -296,6 +299,7 @@ class TSessionHolderActor : public NActors::TActorBootstrapped openPromise, NThreading::TPromise closePromise) : TargetNode_(request.TargetNode) , TraceId_(request.Event->Record.GetTraceId()) + , VerboseLevel_(request.VerboseLevel) , Request_(std::move(request.Event)) , OpenPromise_(openPromise) , ClosePromise_(closePromise) @@ -314,7 +318,9 @@ class TSessionHolderActor : public NActors::TActorBootstrapped= 1) { + Cout << CoutColors_.Cyan() << "Created new session on node " << TargetNode_ << " with id " << SessionId_ << "\n"; + } PingSession(); } @@ -390,6 +396,7 @@ class TSessionHolderActor : public NActors::TActorBootstrapped Request_; diff --git a/ydb/tests/tools/kqprun/src/actors.h b/ydb/tests/tools/kqprun/src/actors.h index cd477239ddb4..abe6be68060f 100644 --- a/ydb/tests/tools/kqprun/src/actors.h +++ b/ydb/tests/tools/kqprun/src/actors.h @@ -18,11 +18,13 @@ struct TQueryRequest { ui32 TargetNode; ui64 ResultRowsLimit; ui64 ResultSizeLimit; + size_t QueryId; }; struct TCreateSessionRequest { std::unique_ptr Event; ui32 TargetNode; + ui8 VerboseLevel; }; struct TEvPrivate { @@ -75,7 +77,7 @@ struct TEvPrivate { }; }; -using TProgressCallback = std::function; +using TProgressCallback = std::function; NActors::IActor* CreateRunScriptActorMock(TQueryRequest request, NThreading::TPromise promise, TProgressCallback progressCallback); diff --git a/ydb/tests/tools/kqprun/src/common.cpp b/ydb/tests/tools/kqprun/src/common.cpp new file mode 100644 index 000000000000..0241ca6a932c --- /dev/null +++ b/ydb/tests/tools/kqprun/src/common.cpp @@ -0,0 +1,29 @@ +#include "common.h" + + +namespace NKqpRun { + +NKikimrServices::EServiceKikimr GetLogService(const TString& serviceName) { + NKikimrServices::EServiceKikimr service; + if (!NKikimrServices::EServiceKikimr_Parse(serviceName, &service)) { + ythrow yexception() << "Invalid kikimr service name " << serviceName; + } + return service; +} + +void ModifyLogPriorities(std::unordered_map logPriorities, NKikimrConfig::TLogConfig& logConfig) { + for (auto& entry : *logConfig.MutableEntry()) { + const auto it = logPriorities.find(GetLogService(entry.GetComponent())); + if (it != logPriorities.end()) { + entry.SetLevel(it->second); + logPriorities.erase(it); + } + } + for (const auto& [service, priority] : logPriorities) { + auto* entry = logConfig.AddEntry(); + entry->SetComponent(NKikimrServices::EServiceKikimr_Name(service)); + entry->SetLevel(priority); + } +} + +} // namespace NKqpRun diff --git a/ydb/tests/tools/kqprun/src/common.h b/ydb/tests/tools/kqprun/src/common.h index 0c55b7bc04de..81de1525889e 100644 --- a/ydb/tests/tools/kqprun/src/common.h +++ b/ydb/tests/tools/kqprun/src/common.h @@ -1,14 +1,18 @@ #pragma once #include -#include #include +#include +#include + +#include +#include + #include #include -#include -#include +#include namespace NKqpRun { @@ -35,7 +39,8 @@ struct TYdbSetupSettings { bool SameSession = false; bool DisableDiskMock = false; - bool UseRealPDisks = false; + bool FormatStorage = false; + std::optional PDisksPath; ui64 DiskSize = 32_GB; bool MonitoringEnabled = false; @@ -46,6 +51,7 @@ struct TYdbSetupSettings { bool TraceOptEnabled = false; TString LogOutputFile; + ui8 VerboseLevel = 1; TString YqlToken; TIntrusivePtr FunctionRegistry; @@ -72,14 +78,15 @@ struct TRunnerOptions { IOutputStream* ResultOutput = nullptr; IOutputStream* SchemeQueryAstOutput = nullptr; - IOutputStream* ScriptQueryAstOutput = nullptr; - IOutputStream* ScriptQueryPlanOutput = nullptr; - TString ScriptQueryTimelineFile; - TString InProgressStatisticsOutputFile; + std::vector ScriptQueryAstOutputs; + std::vector ScriptQueryPlanOutputs; + std::vector ScriptQueryTimelineFiles; + std::vector InProgressStatisticsOutputFiles; EResultOutputFormat ResultOutputFormat = EResultOutputFormat::RowsJson; NYdb::NConsoleClient::EDataFormat PlanOutputFormat = NYdb::NConsoleClient::EDataFormat::Default; ETraceOptType TraceOptType = ETraceOptType::Disabled; + std::optional TraceOptScriptId; TDuration ScriptCancelAfter; @@ -95,6 +102,18 @@ struct TRequestOptions { TString UserSID; TString Database; TDuration Timeout; + size_t QueryId = 0; }; +template +TValue GetValue(size_t index, const std::vector& values, TValue defaultValue) { + if (values.empty()) { + return defaultValue; + } + return values[std::min(index, values.size() - 1)]; +} + +NKikimrServices::EServiceKikimr GetLogService(const TString& serviceName); +void ModifyLogPriorities(std::unordered_map logPriorities, NKikimrConfig::TLogConfig& logConfig); + } // namespace NKqpRun diff --git a/ydb/tests/tools/kqprun/src/kqp_runner.cpp b/ydb/tests/tools/kqprun/src/kqp_runner.cpp index e880554d2c34..f926538af6d3 100644 --- a/ydb/tests/tools/kqprun/src/kqp_runner.cpp +++ b/ydb/tests/tools/kqprun/src/kqp_runner.cpp @@ -120,7 +120,7 @@ class TKqpRunner::TImpl { } bool ExecuteScript(const TRequestOptions& script) { - StartScriptTraceOpt(); + StartScriptTraceOpt(script.QueryId); TRequestResult status = YdbSetup_.ScriptRequest(script, ExecutionOperation_); @@ -132,11 +132,11 @@ class TKqpRunner::TImpl { ExecutionMeta_ = TExecutionMeta(); ExecutionMeta_.Database = script.Database; - return WaitScriptExecutionOperation(); + return WaitScriptExecutionOperation(script.QueryId); } bool ExecuteQuery(const TRequestOptions& query, EQueryType queryType) { - StartScriptTraceOpt(); + StartScriptTraceOpt(query.QueryId); StartTime_ = TInstant::Now(); TString queryTypeStr; @@ -164,9 +164,9 @@ class TKqpRunner::TImpl { meta.Plan = ExecutionMeta_.Plan; } - PrintScriptAst(meta.Ast); - PrintScriptProgress(meta.Plan); - PrintScriptPlan(meta.Plan); + PrintScriptAst(query.QueryId, meta.Ast); + PrintScriptProgress(query.QueryId, meta.Plan); + PrintScriptPlan(query.QueryId, meta.Plan); PrintScriptFinish(meta, queryTypeStr); if (!status.IsSuccess()) { @@ -224,7 +224,7 @@ class TKqpRunner::TImpl { if (Options_.ResultOutput) { Cout << CoutColors_.Yellow() << TInstant::Now().ToIsoStringLocal() << " Writing script query results..." << CoutColors_.Default() << Endl; for (size_t i = 0; i < ResultSets_.size(); ++i) { - if (ResultSets_.size() > 1) { + if (ResultSets_.size() > 1 && Options_.YdbSettings.VerboseLevel >= 1) { *Options_.ResultOutput << CoutColors_.Cyan() << "Result set " << i + 1 << ":" << CoutColors_.Default() << Endl; } PrintScriptResult(ResultSets_[i]); @@ -233,7 +233,7 @@ class TKqpRunner::TImpl { } private: - bool WaitScriptExecutionOperation() { + bool WaitScriptExecutionOperation(ui64 queryId) { StartTime_ = TInstant::Now(); TDuration getOperationPeriod = TDuration::Seconds(1); @@ -244,7 +244,7 @@ class TKqpRunner::TImpl { TRequestResult status; while (true) { status = YdbSetup_.GetScriptExecutionOperationRequest(ExecutionMeta_.Database, ExecutionOperation_, ExecutionMeta_); - PrintScriptProgress(ExecutionMeta_.Plan); + PrintScriptProgress(queryId, ExecutionMeta_.Plan); if (ExecutionMeta_.Ready) { break; @@ -267,9 +267,11 @@ class TKqpRunner::TImpl { Sleep(getOperationPeriod); } - PrintScriptAst(ExecutionMeta_.Ast); - PrintScriptProgress(ExecutionMeta_.Plan); - PrintScriptPlan(ExecutionMeta_.Plan); + TYdbSetup::StopTraceOpt(); + + PrintScriptAst(queryId, ExecutionMeta_.Ast); + PrintScriptProgress(queryId, ExecutionMeta_.Plan); + PrintScriptPlan(queryId, ExecutionMeta_.Plan); PrintScriptFinish(ExecutionMeta_, "Script"); if (!status.IsSuccess() || ExecutionMeta_.ExecutionStatus != NYdb::NQuery::EExecStatus::Completed) { @@ -290,23 +292,33 @@ class TKqpRunner::TImpl { } } - void StartScriptTraceOpt() const { - if (Options_.TraceOptType == TRunnerOptions::ETraceOptType::All || Options_.TraceOptType == TRunnerOptions::ETraceOptType::Script) { + void StartScriptTraceOpt(size_t queryId) const { + bool startTraceOpt = Options_.TraceOptType == TRunnerOptions::ETraceOptType::All; + + if (Options_.TraceOptType == TRunnerOptions::ETraceOptType::Script) { + startTraceOpt |= !Options_.TraceOptScriptId || *Options_.TraceOptScriptId == queryId; + } + + if (startTraceOpt) { YdbSetup_.StartTraceOpt(); } } void PrintSchemeQueryAst(const TString& ast) const { if (Options_.SchemeQueryAstOutput) { - Cout << CoutColors_.Cyan() << "Writing scheme query ast" << CoutColors_.Default() << Endl; + if (Options_.YdbSettings.VerboseLevel >= 1) { + Cout << CoutColors_.Cyan() << "Writing scheme query ast" << CoutColors_.Default() << Endl; + } Options_.SchemeQueryAstOutput->Write(ast); } } - void PrintScriptAst(const TString& ast) const { - if (Options_.ScriptQueryAstOutput) { - Cout << CoutColors_.Cyan() << "Writing script query ast" << CoutColors_.Default() << Endl; - Options_.ScriptQueryAstOutput->Write(ast); + void PrintScriptAst(size_t queryId, const TString& ast) const { + if (const auto output = GetValue(queryId, Options_.ScriptQueryAstOutputs, nullptr)) { + if (Options_.YdbSettings.VerboseLevel >= 1) { + Cout << CoutColors_.Cyan() << "Writing script query ast" << CoutColors_.Default() << Endl; + } + output->Write(ast); } } @@ -325,16 +337,18 @@ class TKqpRunner::TImpl { printer.Print(plan); } - void PrintScriptPlan(const TString& plan) const { - if (Options_.ScriptQueryPlanOutput) { - Cout << CoutColors_.Cyan() << "Writing script query plan" << CoutColors_.Default() << Endl; - PrintPlan(plan, Options_.ScriptQueryPlanOutput); + void PrintScriptPlan(size_t queryId, const TString& plan) const { + if (const auto output = GetValue(queryId, Options_.ScriptQueryPlanOutputs, nullptr)) { + if (Options_.YdbSettings.VerboseLevel >= 1) { + Cout << CoutColors_.Cyan() << "Writing script query plan" << CoutColors_.Default() << Endl; + } + PrintPlan(plan, output); } } - void PrintScriptProgress(const TString& plan) const { - if (Options_.InProgressStatisticsOutputFile) { - TFileOutput outputStream(Options_.InProgressStatisticsOutputFile); + void PrintScriptProgress(size_t queryId, const TString& plan) const { + if (const auto& output = GetValue(queryId, Options_.InProgressStatisticsOutputFiles, {})) { + TFileOutput outputStream(output); outputStream << TInstant::Now().ToIsoStringLocal() << " Script in progress statistics" << Endl; auto convertedPlan = plan; @@ -361,8 +375,8 @@ class TKqpRunner::TImpl { outputStream.Finish(); } - if (Options_.ScriptQueryTimelineFile) { - TFileOutput outputStream(Options_.ScriptQueryTimelineFile); + if (const auto& output = GetValue(queryId, Options_.ScriptQueryTimelineFiles, {})) { + TFileOutput outputStream(output); TPlanVisualizer planVisualizer; planVisualizer.LoadPlans(plan); @@ -373,10 +387,10 @@ class TKqpRunner::TImpl { } TProgressCallback GetProgressCallback() { - return [this](const NKikimrKqp::TEvExecuterProgress& executerProgress) mutable { + return [this](ui64 queryId, const NKikimrKqp::TEvExecuterProgress& executerProgress) mutable { const TString& plan = executerProgress.GetQueryPlan(); ExecutionMeta_.Plan = plan; - PrintScriptProgress(plan); + PrintScriptProgress(queryId, plan); }; } @@ -411,6 +425,9 @@ class TKqpRunner::TImpl { } void PrintScriptFinish(const TQueryMeta& meta, const TString& queryType) const { + if (Options_.YdbSettings.VerboseLevel < 1) { + return; + } Cout << CoutColors_.Cyan() << queryType << " request finished."; if (meta.TotalDuration) { Cout << " Total duration: " << meta.TotalDuration; diff --git a/ydb/tests/tools/kqprun/src/proto/storage_meta.proto b/ydb/tests/tools/kqprun/src/proto/storage_meta.proto new file mode 100644 index 000000000000..fabc54f2ba64 --- /dev/null +++ b/ydb/tests/tools/kqprun/src/proto/storage_meta.proto @@ -0,0 +1,7 @@ +syntax = "proto3"; + +package NKqpRun; + +message TStorageMeta { + uint64 StorageGeneration = 1; +} diff --git a/ydb/tests/tools/kqprun/src/proto/ya.make b/ydb/tests/tools/kqprun/src/proto/ya.make new file mode 100644 index 000000000000..8cbcbe198594 --- /dev/null +++ b/ydb/tests/tools/kqprun/src/proto/ya.make @@ -0,0 +1,9 @@ +PROTO_LIBRARY() + +ONLY_TAGS(CPP_PROTO) + +SRCS( + storage_meta.proto +) + +END() diff --git a/ydb/tests/tools/kqprun/src/ya.make b/ydb/tests/tools/kqprun/src/ya.make index 8b74f61204d0..59097668037a 100644 --- a/ydb/tests/tools/kqprun/src/ya.make +++ b/ydb/tests/tools/kqprun/src/ya.make @@ -2,14 +2,19 @@ LIBRARY() SRCS( actors.cpp + common.cpp kqp_runner.cpp ydb_setup.cpp ) PEERDIR( ydb/core/testlib + + ydb/tests/tools/kqprun/src/proto ) +GENERATE_ENUM_SERIALIZATION(common.h) + YQL_LAST_ABI_VERSION() END() diff --git a/ydb/tests/tools/kqprun/src/ydb_setup.cpp b/ydb/tests/tools/kqprun/src/ydb_setup.cpp index 97ab3a0b9b5f..856ab445ed7d 100644 --- a/ydb/tests/tools/kqprun/src/ydb_setup.cpp +++ b/ydb/tests/tools/kqprun/src/ydb_setup.cpp @@ -9,6 +9,9 @@ #include #include + +#include + #include @@ -65,7 +68,7 @@ class TStaticSecuredCredentialsFactory : public NYql::ISecuredServiceAccountCred class TSessionState { public: - explicit TSessionState(NActors::TTestActorRuntime* runtime, ui32 targetNodeIndex, const TString& database, const TString& traceId) + explicit TSessionState(NActors::TTestActorRuntime* runtime, ui32 targetNodeIndex, const TString& database, const TString& traceId, ui8 verboseLevel) : Runtime_(runtime) , TargetNodeIndex_(targetNodeIndex) { @@ -78,7 +81,8 @@ class TSessionState { auto closePromise = NThreading::NewPromise(); SessionHolderActor_ = Runtime_->Register(CreateSessionHolderActor(TCreateSessionRequest{ .Event = std::move(event), - .TargetNode = Runtime_->GetNodeId(targetNodeIndex) + .TargetNode = Runtime_->GetNodeId(targetNodeIndex), + .VerboseLevel = verboseLevel }, openPromise, closePromise)); SessionId_ = openPromise.GetFuture().GetValueSync(); @@ -149,13 +153,8 @@ class TYdbSetup::TImpl { } } - for (auto setting : Settings_.AppConfig.GetLogConfig().get_arr_entry()) { - NKikimrServices::EServiceKikimr service; - if (!NKikimrServices::EServiceKikimr_Parse(setting.GetComponent(), &service)) { - ythrow yexception() << "Invalid kikimr service name " << setting.GetComponent(); - } - - runtime.SetLogPriority(service, NActors::NLog::EPriority(setting.GetLevel())); + for (const auto& setting : Settings_.AppConfig.GetLogConfig().get_arr_entry()) { + runtime.SetLogPriority(GetLogService(setting.GetComponent()), NActors::NLog::EPriority(setting.GetLevel())); } runtime.SetLogBackendFactory([this]() { return CreateLogBackend(); }); @@ -177,15 +176,46 @@ class TYdbSetup::TImpl { } void SetStorageSettings(NKikimr::Tests::TServerSettings& serverSettings) const { + TString diskPath; + if (Settings_.PDisksPath && *Settings_.PDisksPath != "-") { + diskPath = TStringBuilder() << *Settings_.PDisksPath << "/"; + } + + bool formatDisk = true; + NKqpRun::TStorageMeta storageMeta; + if (diskPath) { + TFsPath storageMetaPath(diskPath); + storageMetaPath.MkDirs(); + + storageMetaPath = storageMetaPath.Child("kqprun_storage_meta.conf"); + if (storageMetaPath.Exists() && !Settings_.FormatStorage) { + if (!google::protobuf::TextFormat::ParseFromString(TFileInput(storageMetaPath.GetPath()).ReadAll(), &storageMeta)) { + ythrow yexception() << "Storage meta is corrupted, please use --format-storage"; + } + storageMeta.SetStorageGeneration(storageMeta.GetStorageGeneration() + 1); + formatDisk = false; + } + + TString storageMetaStr; + google::protobuf::TextFormat::PrintToString(storageMeta, &storageMetaStr); + + TFileOutput storageMetaOutput(storageMetaPath.GetPath()); + storageMetaOutput.Write(storageMetaStr); + storageMetaOutput.Finish(); + } + const NKikimr::NFake::TStorage storage = { - .UseDisk = Settings_.UseRealPDisks, + .UseDisk = !!Settings_.PDisksPath, .SectorSize = NKikimr::TTestStorageFactory::SECTOR_SIZE, - .ChunkSize = Settings_.UseRealPDisks ? NKikimr::TTestStorageFactory::CHUNK_SIZE : NKikimr::TTestStorageFactory::MEM_CHUNK_SIZE, - .DiskSize = Settings_.DiskSize + .ChunkSize = Settings_.PDisksPath ? NKikimr::TTestStorageFactory::CHUNK_SIZE : NKikimr::TTestStorageFactory::MEM_CHUNK_SIZE, + .DiskSize = Settings_.DiskSize, + .FormatDisk = formatDisk, + .DiskPath = diskPath }; - serverSettings.SetEnableMockOnSingleNode(!Settings_.DisableDiskMock && !Settings_.UseRealPDisks); + serverSettings.SetEnableMockOnSingleNode(!Settings_.DisableDiskMock && !Settings_.PDisksPath); serverSettings.SetCustomDiskParams(storage); + serverSettings.SetStorageGeneration(storageMeta.GetStorageGeneration()); } NKikimr::Tests::TServerSettings GetServerSettings(ui32 grpcPort) { @@ -210,7 +240,7 @@ class TYdbSetup::TImpl { serverSettings.SetYtGateway(Settings_.YtGateway); serverSettings.S3ActorsFactory = NYql::NDq::CreateS3ActorsFactory(); serverSettings.SetInitializeFederatedQuerySetupFactory(true); - serverSettings.SetVerbose(false); + serverSettings.SetVerbose(Settings_.VerboseLevel >= 2); SetLoggerSettings(serverSettings); SetFunctionRegistry(serverSettings); @@ -315,21 +345,7 @@ class TYdbSetup::TImpl { return; } - bool found = false; - for (auto& entry : *Settings_.AppConfig.MutableLogConfig()->MutableEntry()) { - if (entry.GetComponent() == "KQP_YQL") { - entry.SetLevel(NActors::NLog::PRI_TRACE); - found = true; - break; - } - } - - if (!found) { - auto entry = Settings_.AppConfig.MutableLogConfig()->AddEntry(); - entry->SetComponent("KQP_YQL"); - entry->SetLevel(NActors::NLog::PRI_TRACE); - } - + ModifyLogPriorities({{NKikimrServices::EServiceKikimr::KQP_YQL, NActors::NLog::PRI_TRACE}}, *Settings_.AppConfig.MutableLogConfig()); NYql::NLog::InitLogger(NActors::CreateNullBackend()); } @@ -355,13 +371,13 @@ class TYdbSetup::TImpl { InitializeServer(grpcPort); WaitResourcesPublishing(); - if (Settings_.MonitoringEnabled) { + if (Settings_.MonitoringEnabled && Settings_.VerboseLevel >= 1) { for (ui32 nodeIndex = 0; nodeIndex < Settings_.NodeCount; ++nodeIndex) { Cout << CoutColors_.Cyan() << "Monitoring port" << (Settings_.NodeCount > 1 ? TStringBuilder() << " for node " << nodeIndex + 1 : TString()) << ": " << CoutColors_.Default() << Server_->GetRuntime()->GetMonPort(nodeIndex) << Endl; } } - if (Settings_.GrpcEnabled) { + if (Settings_.GrpcEnabled && Settings_.VerboseLevel >= 1) { Cout << CoutColors_.Cyan() << "Domain gRPC port: " << CoutColors_.Default() << grpcPort << Endl; } } @@ -516,7 +532,7 @@ class TYdbSetup::TImpl { if (Settings_.SameSession) { if (!SessionState_) { - SessionState_ = TSessionState(GetRuntime(), targetNodeIndex, database, query.TraceId); + SessionState_ = TSessionState(GetRuntime(), targetNodeIndex, database, query.TraceId, Settings_.VerboseLevel); } request->SetSessionId(SessionState_->GetSessionId()); } @@ -535,7 +551,8 @@ class TYdbSetup::TImpl { .Event = std::move(event), .TargetNode = GetRuntime()->GetNodeId(targetNodeIndex), .ResultRowsLimit = Settings_.AppConfig.GetQueryServiceConfig().GetScriptResultRowsLimit(), - .ResultSizeLimit = Settings_.AppConfig.GetQueryServiceConfig().GetScriptResultSizeLimit() + .ResultSizeLimit = Settings_.AppConfig.GetQueryServiceConfig().GetScriptResultSizeLimit(), + .QueryId = query.QueryId }; } diff --git a/ydb/tests/tools/kqprun/ya.make b/ydb/tests/tools/kqprun/ya.make index 1ed5ff21cdd1..05b119d8e099 100644 --- a/ydb/tests/tools/kqprun/ya.make +++ b/ydb/tests/tools/kqprun/ya.make @@ -1,5 +1,11 @@ PROGRAM(kqprun) +IF (PROFILE_MEMORY_ALLOCATIONS) + MESSAGE("Enabled profile memory allocations") + ALLOCATOR(LF_DBG) + CFLAGS(-D PROFILE_MEMORY_ALLOCATIONS) +ENDIF() + SRCS( kqprun.cpp )