From 8e01871d71eb1575e18292ec4f418070890fae4f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=90=D0=BB=D0=B5=D0=BA=D1=81=D0=B0=D0=BD=D0=B4=D1=80=20?= =?UTF-8?q?=D0=9D=D0=BE=D0=B2=D0=BE=D0=B6=D0=B8=D0=BB=D0=BE=D0=B2?= Date: Fri, 27 Dec 2024 19:46:52 +0300 Subject: [PATCH] Allow PG in DQ output/input consumers (#13043) --- .../compute_actor/kqp_compute_actor_impl.h | 2 +- ydb/core/tx/datashard/datashard_kqp.cpp | 5 ++- .../yql/dq/runtime/dq_input_producer.cpp | 15 ++++----- .../yql/dq/runtime/dq_input_producer.h | 4 ++- .../yql/dq/runtime/dq_output_consumer.cpp | 14 ++++++--- .../yql/dq/runtime/dq_output_consumer.h | 5 ++- .../yql/dq/runtime/dq_tasks_runner.cpp | 31 +++++++++++++------ ydb/library/yql/dq/runtime/dq_tasks_runner.h | 12 +++++-- 8 files changed, 59 insertions(+), 29 deletions(-) diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor_impl.h b/ydb/core/kqp/compute_actor/kqp_compute_actor_impl.h index fc68f913aced..62ae9d1fe5fe 100644 --- a/ydb/core/kqp/compute_actor/kqp_compute_actor_impl.h +++ b/ydb/core/kqp/compute_actor/kqp_compute_actor_impl.h @@ -24,7 +24,7 @@ class TKqpTaskRunnerExecutionContext : public TDqTaskRunnerExecutionContext { IDqOutputConsumer::TPtr CreateOutputConsumer(const NDqProto::TTaskOutput& outputDesc, const NMiniKQL::TType* type, NUdf::IApplyContext* applyCtx, const NMiniKQL::TTypeEnvironment& typeEnv, const NKikimr::NMiniKQL::THolderFactory& holderFactory, - TVector&& outputs) const override + TVector&& outputs, NUdf::IPgBuilder* pgBuilder) const override { return KqpBuildOutputConsumer(outputDesc, type, applyCtx, typeEnv, holderFactory, std::move(outputs), MinFillPercentage_); } diff --git a/ydb/core/tx/datashard/datashard_kqp.cpp b/ydb/core/tx/datashard/datashard_kqp.cpp index 2959a0f34e10..80b8ab2e59ed 100644 --- a/ydb/core/tx/datashard/datashard_kqp.cpp +++ b/ydb/core/tx/datashard/datashard_kqp.cpp @@ -1001,7 +1001,10 @@ namespace { class TKqpTaskRunnerExecutionContext: public NDq::IDqTaskRunnerExecutionContext { public: - NDq::IDqOutputConsumer::TPtr CreateOutputConsumer(const NDqProto::TTaskOutput& outputDesc, const NMiniKQL::TType* type, NUdf::IApplyContext* applyCtx, const NMiniKQL::TTypeEnvironment& typeEnv, const NKikimr::NMiniKQL::THolderFactory& holderFactory, TVector&& outputs) const override { + NDq::IDqOutputConsumer::TPtr CreateOutputConsumer(const NDqProto::TTaskOutput& outputDesc, const NMiniKQL::TType* type, + NUdf::IApplyContext* applyCtx, const NMiniKQL::TTypeEnvironment& typeEnv, const NKikimr::NMiniKQL::THolderFactory& holderFactory, + TVector&& outputs, NUdf::IPgBuilder* pgBuilder) const override + { return NKqp::KqpBuildOutputConsumer(outputDesc, type, applyCtx, typeEnv, holderFactory, std::move(outputs), {}); } diff --git a/ydb/library/yql/dq/runtime/dq_input_producer.cpp b/ydb/library/yql/dq/runtime/dq_input_producer.cpp index a346209a5a3f..eb7f45757dee 100644 --- a/ydb/library/yql/dq/runtime/dq_input_producer.cpp +++ b/ydb/library/yql/dq/runtime/dq_input_producer.cpp @@ -345,15 +345,14 @@ TVector> MakeReaders(const TVector> MakeBuilders(ui64 blockLen, const TVector itemTypes) { +TVector> MakeBuilders(ui64 blockLen, const TVector itemTypes, NUdf::IPgBuilder* pgBuilder) { TVector> result; TTypeInfoHelper helper; for (auto& itemType : itemTypes) { if (itemType) { // TODO: pass memory pool - // TODO: IPgBuilder YQL_ENSURE(!itemType->IsPg(), "pg types are not supported yet"); - result.emplace_back(MakeArrayBuilder(helper, itemType, *NYql::NUdf::GetYqlMemoryPool(), blockLen, nullptr)); + result.emplace_back(MakeArrayBuilder(helper, itemType, *NYql::NUdf::GetYqlMemoryPool(), blockLen, pgBuilder)); } else { result.emplace_back(); } @@ -380,13 +379,14 @@ class TDqInputMergeBlockStreamValue : public TComputationValue; public: TDqInputMergeBlockStreamValue(TMemoryUsageInfo* memInfo, const NKikimr::NMiniKQL::TType* type, TVector&& inputs, - TVector&& sortCols, const NKikimr::NMiniKQL::THolderFactory& factory, TDqMeteringStats::TInputStatsMeter stats) + TVector&& sortCols, const NKikimr::NMiniKQL::THolderFactory& factory, TDqMeteringStats::TInputStatsMeter stats, + NUdf::IPgBuilder* pgBuilder) : TBase(memInfo) , SortCols_(std::move(sortCols)) , ItemTypes_(ExtractBlockItemTypes(type)) , MaxOutputBlockLen_(CalcMaxBlockLength(ItemTypes_.begin(), ItemTypes_.end(), TTypeInfoHelper())) , Comparators_(MakeComparators(SortCols_, ItemTypes_)) - , Builders_(MakeBuilders(MaxOutputBlockLen_, ItemTypes_)) + , Builders_(MakeBuilders(MaxOutputBlockLen_, ItemTypes_, pgBuilder)) , Factory_(factory) , Stats_(stats) { @@ -750,7 +750,8 @@ NUdf::TUnboxedValue CreateInputUnionValue(const NKikimr::NMiniKQL::TType* type, } NKikimr::NUdf::TUnboxedValue CreateInputMergeValue(const NKikimr::NMiniKQL::TType* type, TVector&& inputs, - TVector&& sortCols, const NKikimr::NMiniKQL::THolderFactory& factory, TDqMeteringStats::TInputStatsMeter stats) + TVector&& sortCols, const NKikimr::NMiniKQL::THolderFactory& factory, TDqMeteringStats::TInputStatsMeter stats, + NUdf::IPgBuilder* pgBuilder) { ValidateInputTypes(type, inputs); YQL_ENSURE(!inputs.empty()); @@ -761,7 +762,7 @@ NKikimr::NUdf::TUnboxedValue CreateInputMergeValue(const NKikimr::NMiniKQL::TTyp if (sortCols.empty()) { return factory.Create>(type, std::move(inputs), stats); } - return factory.Create(type, std::move(inputs), std::move(sortCols), factory, stats); + return factory.Create(type, std::move(inputs), std::move(sortCols), factory, stats, pgBuilder); } return factory.Create>(type, std::move(inputs), std::move(sortCols), stats); } diff --git a/ydb/library/yql/dq/runtime/dq_input_producer.h b/ydb/library/yql/dq/runtime/dq_input_producer.h index 8c4ac669c89d..4c6dda91d59e 100644 --- a/ydb/library/yql/dq/runtime/dq_input_producer.h +++ b/ydb/library/yql/dq/runtime/dq_input_producer.h @@ -3,6 +3,8 @@ #include "dq_input_channel.h" #include "dq_columns_resolve.h" +#include + namespace NYql::NDq { struct TDqMeteringStats { @@ -33,6 +35,6 @@ NKikimr::NUdf::TUnboxedValue CreateInputUnionValue(const NKikimr::NMiniKQL::TTyp NKikimr::NUdf::TUnboxedValue CreateInputMergeValue(const NKikimr::NMiniKQL::TType* type, TVector&& inputs, TVector&& sortCols, const NKikimr::NMiniKQL::THolderFactory& factory, - TDqMeteringStats::TInputStatsMeter = {}); + TDqMeteringStats::TInputStatsMeter = {}, NUdf::IPgBuilder* pgBuilder = nullptr); } // namespace NYql::NDq diff --git a/ydb/library/yql/dq/runtime/dq_output_consumer.cpp b/ydb/library/yql/dq/runtime/dq_output_consumer.cpp index edf2c285d91a..12d505fb2dc0 100644 --- a/ydb/library/yql/dq/runtime/dq_output_consumer.cpp +++ b/ydb/library/yql/dq/runtime/dq_output_consumer.cpp @@ -328,7 +328,8 @@ class TDqOutputHashPartitionConsumerBlock : public IDqOutputConsumer { TDqOutputHashPartitionConsumerBlock(TVector&& outputs, TVector&& keyColumns, const NKikimr::NMiniKQL::TType* outputType, const NKikimr::NMiniKQL::THolderFactory& holderFactory, - TMaybe minFillPercentage) + TMaybe minFillPercentage, + NUdf::IPgBuilder* pgBuilder) : OutputType_(static_cast(outputType)) , HolderFactory_(holderFactory) , Outputs_(std::move(outputs)) @@ -336,6 +337,7 @@ class TDqOutputHashPartitionConsumerBlock : public IDqOutputConsumer { , ScalarColumnHashes_(KeyColumns_.size()) , OutputWidth_(OutputType_->GetElementsCount()) , MinFillPercentage_(minFillPercentage) + , PgBuilder_(pgBuilder) { TTypeInfoHelper helper; YQL_ENSURE(OutputWidth_ > KeyColumns_.size()); @@ -517,8 +519,7 @@ class TDqOutputHashPartitionConsumerBlock : public IDqOutputConsumer { auto blockType = static_cast(columnType); if (blockType->GetShape() == NMiniKQL::TBlockType::EShape::Many) { auto itemType = blockType->GetItemType(); - YQL_ENSURE(!itemType->IsPg(), "pg types are not supported yet"); - Builders_.emplace_back(MakeArrayBuilder(helper, itemType, *NYql::NUdf::GetYqlMemoryPool(), maxBlockLen, nullptr, {.MinFillPercentage=MinFillPercentage_})); + Builders_.emplace_back(MakeArrayBuilder(helper, itemType, *NYql::NUdf::GetYqlMemoryPool(), maxBlockLen, PgBuilder_, {.MinFillPercentage=MinFillPercentage_})); } else { Builders_.emplace_back(); } @@ -542,6 +543,8 @@ class TDqOutputHashPartitionConsumerBlock : public IDqOutputConsumer { TVector> Builders_; mutable bool IsWaitingFlag_ = false; + + NUdf::IPgBuilder* PgBuilder_; }; class TDqOutputBroadcastConsumer : public IDqOutputConsumer { @@ -605,7 +608,8 @@ IDqOutputConsumer::TPtr CreateOutputHashPartitionConsumer( TVector&& outputs, TVector&& keyColumns, const NKikimr::NMiniKQL::TType* outputType, const NKikimr::NMiniKQL::THolderFactory& holderFactory, - TMaybe minFillPercentage) + TMaybe minFillPercentage, + NUdf::IPgBuilder* pgBuilder) { YQL_ENSURE(!outputs.empty()); YQL_ENSURE(!keyColumns.empty()); @@ -624,7 +628,7 @@ IDqOutputConsumer::TPtr CreateOutputHashPartitionConsumer( return MakeIntrusive(std::move(outputs), std::move(keyColumns), outputType); } - return MakeIntrusive(std::move(outputs), std::move(keyColumns), outputType, holderFactory, minFillPercentage); + return MakeIntrusive(std::move(outputs), std::move(keyColumns), outputType, holderFactory, minFillPercentage, pgBuilder); } IDqOutputConsumer::TPtr CreateOutputBroadcastConsumer(TVector&& outputs, TMaybe outputWidth) { diff --git a/ydb/library/yql/dq/runtime/dq_output_consumer.h b/ydb/library/yql/dq/runtime/dq_output_consumer.h index 17a817dd4419..5a2703912e63 100644 --- a/ydb/library/yql/dq/runtime/dq_output_consumer.h +++ b/ydb/library/yql/dq/runtime/dq_output_consumer.h @@ -3,6 +3,8 @@ #include "dq_columns_resolve.h" #include "dq_output.h" +#include + #include namespace NKikimr::NMiniKQL { @@ -48,7 +50,8 @@ IDqOutputConsumer::TPtr CreateOutputHashPartitionConsumer( TVector&& outputs, TVector&& keyColumns, const NKikimr::NMiniKQL::TType* outputType, const NKikimr::NMiniKQL::THolderFactory& holderFactory, - TMaybe minFillPercentage); + TMaybe minFillPercentage, + NUdf::IPgBuilder* pgBuilder); IDqOutputConsumer::TPtr CreateOutputBroadcastConsumer(TVector&& outputs, TMaybe outputWidth); diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp index dfc062d95933..7d413615c98a 100644 --- a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp +++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp @@ -14,6 +14,7 @@ #include #include +#include #include #include @@ -141,7 +142,8 @@ void ValidateParamValue(std::string_view paramName, const TType* type, const NUd #define LOG(...) do { if (Y_UNLIKELY(LogFunc)) { LogFunc(__VA_ARGS__); } } while (0) NUdf::TUnboxedValue DqBuildInputValue(const NDqProto::TTaskInput& inputDesc, const NKikimr::NMiniKQL::TType* type, - TVector&& inputs, const THolderFactory& holderFactory, TDqMeteringStats::TInputStatsMeter stats) + TVector&& inputs, const THolderFactory& holderFactory, TDqMeteringStats::TInputStatsMeter stats, + NUdf::IPgBuilder* pgBuilder) { switch (inputDesc.GetTypeCase()) { case NYql::NDqProto::TTaskInput::kSource: @@ -155,7 +157,7 @@ NUdf::TUnboxedValue DqBuildInputValue(const NDqProto::TTaskInput& inputDesc, con GetSortColumnsInfo(type, protoSortCols, sortColsInfo); YQL_ENSURE(!sortColsInfo.empty()); - return CreateInputMergeValue(type, std::move(inputs), std::move(sortColsInfo), holderFactory, stats); + return CreateInputMergeValue(type, std::move(inputs), std::move(sortColsInfo), holderFactory, stats, pgBuilder); } default: YQL_ENSURE(false, "Unknown input type: " << (ui32) inputDesc.GetTypeCase()); @@ -164,7 +166,7 @@ NUdf::TUnboxedValue DqBuildInputValue(const NDqProto::TTaskInput& inputDesc, con IDqOutputConsumer::TPtr DqBuildOutputConsumer(const NDqProto::TTaskOutput& outputDesc, const NMiniKQL::TType* type, const NMiniKQL::TTypeEnvironment& typeEnv, const NKikimr::NMiniKQL::THolderFactory& holderFactory, - TVector&& outputs, TMaybe minFillPercentage) + TVector&& outputs,NUdf::IPgBuilder* pgBuilder, TMaybe minFillPercentage) { TMaybe outputWidth; if (type->IsMulti()) { @@ -185,7 +187,7 @@ IDqOutputConsumer::TPtr DqBuildOutputConsumer(const NDqProto::TTaskOutput& outpu GetColumnsInfo(type, outputDesc.GetHashPartition().GetKeyColumns(), keyColumns); YQL_ENSURE(!keyColumns.empty()); YQL_ENSURE(outputDesc.GetHashPartition().GetPartitionsCount() == outputDesc.ChannelsSize()); - return CreateOutputHashPartitionConsumer(std::move(outputs), std::move(keyColumns), type, holderFactory, minFillPercentage); + return CreateOutputHashPartitionConsumer(std::move(outputs), std::move(keyColumns), type, holderFactory, minFillPercentage, pgBuilder); } case NDqProto::TTaskOutput::kBroadcast: { @@ -206,11 +208,18 @@ IDqOutputConsumer::TPtr DqBuildOutputConsumer(const NDqProto::TTaskOutput& outpu } } +IDqOutputConsumer::TPtr DqBuildOutputConsumer(const NDqProto::TTaskOutput& outputDesc, const NKikimr::NMiniKQL::TType* type, + const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv, const NKikimr::NMiniKQL::THolderFactory& holderFactory, + TVector&& channels, TMaybe minFillPercentage) +{ + return DqBuildOutputConsumer(outputDesc, type, typeEnv, holderFactory, std::move(channels), nullptr, minFillPercentage); +} + IDqOutputConsumer::TPtr TDqTaskRunnerExecutionContextBase::CreateOutputConsumer(const TTaskOutput& outputDesc, const NKikimr::NMiniKQL::TType* type, NUdf::IApplyContext*, const TTypeEnvironment& typeEnv, - const NKikimr::NMiniKQL::THolderFactory& holderFactory, TVector&& outputs) const + const NKikimr::NMiniKQL::THolderFactory& holderFactory, TVector&& outputs, NUdf::IPgBuilder* pgBuilder) const { - return DqBuildOutputConsumer(outputDesc, type, typeEnv, holderFactory, std::move(outputs)); + return DqBuildOutputConsumer(outputDesc, type, typeEnv, holderFactory, std::move(outputs), pgBuilder, {}); } inline TCollectStatsLevel StatsModeToCollectStatsLevel(NDqProto::EDqStatsMode statsMode) { @@ -583,7 +592,7 @@ class TDqTaskRunner : public IDqTaskRunner { auto entryNode = AllocatedHolder->ProgramParsed.CompGraph->GetEntryPoint(i, true); if (transform) { - transform->TransformInput = DqBuildInputValue(inputDesc, transform->TransformInputType, std::move(inputs), holderFactory, {}); + transform->TransformInput = DqBuildInputValue(inputDesc, transform->TransformInputType, std::move(inputs), holderFactory, {}, PgBuilder_.get()); inputs.clear(); inputs.emplace_back(transform->TransformOutput); entryNode->SetValue(AllocatedHolder->ProgramParsed.CompGraph->GetContext(), @@ -592,7 +601,7 @@ class TDqTaskRunner : public IDqTaskRunner { } else { entryNode->SetValue(AllocatedHolder->ProgramParsed.CompGraph->GetContext(), DqBuildInputValue(inputDesc, entry->InputItemTypes[i], std::move(inputs), holderFactory, - {inputStats, entry->InputItemTypes[i]})); + {inputStats, entry->InputItemTypes[i]}, PgBuilder_.get())); } } @@ -663,7 +672,7 @@ class TDqTaskRunner : public IDqTaskRunner { if (transform) { auto guard = BindAllocator(); transform->TransformOutput = execCtx.CreateOutputConsumer(outputDesc, transform->TransformOutputType, - Context.ApplyCtx, typeEnv, holderFactory, std::move(outputs)); + Context.ApplyCtx, typeEnv, holderFactory, std::move(outputs), PgBuilder_.get()); outputs.clear(); outputs.emplace_back(transform->TransformInput); @@ -672,7 +681,7 @@ class TDqTaskRunner : public IDqTaskRunner { { auto guard = BindAllocator(); outputConsumers[i] = execCtx.CreateOutputConsumer(outputDesc, entry->OutputItemTypes[i], - Context.ApplyCtx, typeEnv, holderFactory, std::move(outputs)); + Context.ApplyCtx, typeEnv, holderFactory, std::move(outputs), PgBuilder_.get()); } } @@ -1017,6 +1026,8 @@ class TDqTaskRunner : public IDqTaskRunner { std::optional StartWaitInputTime; std::optional StartWaitOutputTime; + std::unique_ptr PgBuilder_ = CreatePgBuilder(); + void StartWaitingInput() { if (!StartWaitInputTime) { StartWaitInputTime = TInstant::Now(); diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.h b/ydb/library/yql/dq/runtime/dq_tasks_runner.h index 413083b5acdb..9b12a8738abc 100644 --- a/ydb/library/yql/dq/runtime/dq_tasks_runner.h +++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.h @@ -20,6 +20,8 @@ #include #include +#include + #include #include @@ -145,7 +147,7 @@ class IDqTaskRunnerExecutionContext { const NKikimr::NMiniKQL::TType* type, NUdf::IApplyContext* applyCtx, const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv, const NKikimr::NMiniKQL::THolderFactory& holderFactory, - TVector&& outputs) const = 0; + TVector&& outputs, NUdf::IPgBuilder* pgBuilder) const = 0; virtual IDqChannelStorage::TPtr CreateChannelStorage(ui64 channelId, bool withSpilling) const = 0; virtual IDqChannelStorage::TPtr CreateChannelStorage(ui64 channelId, bool withSpilling, NActors::TActorSystem* actorSystem) const = 0; @@ -162,7 +164,7 @@ class TDqTaskRunnerExecutionContextBase : public IDqTaskRunnerExecutionContext { const NKikimr::NMiniKQL::TType* type, NUdf::IApplyContext* applyCtx, const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv, const NKikimr::NMiniKQL::THolderFactory& holderFactory, - TVector&& outputs) const override; + TVector&& outputs, NUdf::IPgBuilder* pgBuilder) const override; }; class TDqTaskRunnerExecutionContextDefault : public TDqTaskRunnerExecutionContextBase { @@ -210,7 +212,11 @@ struct TDqTaskRunnerMemoryLimits { }; NUdf::TUnboxedValue DqBuildInputValue(const NDqProto::TTaskInput& inputDesc, const NKikimr::NMiniKQL::TType* type, - TVector&& channels, const NKikimr::NMiniKQL::THolderFactory& holderFactory); + TVector&& channels, const NKikimr::NMiniKQL::THolderFactory& holderFactory, NUdf::IPgBuilder* pgBuilder); + +IDqOutputConsumer::TPtr DqBuildOutputConsumer(const NDqProto::TTaskOutput& outputDesc, const NKikimr::NMiniKQL::TType* type, + const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv, const NKikimr::NMiniKQL::THolderFactory& holderFactory, + TVector&& channels, NUdf::IPgBuilder* pgBuilder, TMaybe minFillPercentage = {}); IDqOutputConsumer::TPtr DqBuildOutputConsumer(const NDqProto::TTaskOutput& outputDesc, const NKikimr::NMiniKQL::TType* type, const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv, const NKikimr::NMiniKQL::THolderFactory& holderFactory,