Skip to content

Commit

Permalink
Allow PG in DQ output/input consumers (#13043)
Browse files Browse the repository at this point in the history
  • Loading branch information
MrLolthe1st authored Dec 27, 2024
1 parent ad762d3 commit 8e01871
Show file tree
Hide file tree
Showing 8 changed files with 59 additions and 29 deletions.
2 changes: 1 addition & 1 deletion ydb/core/kqp/compute_actor/kqp_compute_actor_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class TKqpTaskRunnerExecutionContext : public TDqTaskRunnerExecutionContext {
// Creates the output consumer for a task output by delegating to KqpBuildOutputConsumer,
// passing this context's MinFillPercentage_ as the last argument.
// NOTE(review): this is a diff hunk - the two consecutive `outputs` parameter lines are the
// before/after versions of the same line. The added pgBuilder parameter is not passed on to
// KqpBuildOutputConsumer in the body shown here; confirm that dropping it is intentional.
IDqOutputConsumer::TPtr CreateOutputConsumer(const NDqProto::TTaskOutput& outputDesc,
const NMiniKQL::TType* type, NUdf::IApplyContext* applyCtx, const NMiniKQL::TTypeEnvironment& typeEnv,
const NKikimr::NMiniKQL::THolderFactory& holderFactory,
TVector<IDqOutput::TPtr>&& outputs) const override
TVector<IDqOutput::TPtr>&& outputs, NUdf::IPgBuilder* pgBuilder) const override
{
return KqpBuildOutputConsumer(outputDesc, type, applyCtx, typeEnv, holderFactory, std::move(outputs), MinFillPercentage_);
}
Expand Down
5 changes: 4 additions & 1 deletion ydb/core/tx/datashard/datashard_kqp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1001,7 +1001,10 @@ namespace {

class TKqpTaskRunnerExecutionContext: public NDq::IDqTaskRunnerExecutionContext {
public:
// Creates the output consumer for a datashard KQP task by delegating to
// NKqp::KqpBuildOutputConsumer, with an empty ({}) value as the final argument.
// NOTE(review): this is a diff hunk - the single-line signature is the removed version and the
// multi-line signature that follows is its replacement. The added pgBuilder parameter is not
// forwarded to KqpBuildOutputConsumer in the body shown here; confirm that is intentional.
NDq::IDqOutputConsumer::TPtr CreateOutputConsumer(const NDqProto::TTaskOutput& outputDesc, const NMiniKQL::TType* type, NUdf::IApplyContext* applyCtx, const NMiniKQL::TTypeEnvironment& typeEnv, const NKikimr::NMiniKQL::THolderFactory& holderFactory, TVector<NDq::IDqOutput::TPtr>&& outputs) const override {
NDq::IDqOutputConsumer::TPtr CreateOutputConsumer(const NDqProto::TTaskOutput& outputDesc, const NMiniKQL::TType* type,
NUdf::IApplyContext* applyCtx, const NMiniKQL::TTypeEnvironment& typeEnv, const NKikimr::NMiniKQL::THolderFactory& holderFactory,
TVector<NDq::IDqOutput::TPtr>&& outputs, NUdf::IPgBuilder* pgBuilder) const override
{
return NKqp::KqpBuildOutputConsumer(outputDesc, type, applyCtx, typeEnv, holderFactory, std::move(outputs), {});
}

Expand Down
15 changes: 8 additions & 7 deletions ydb/library/yql/dq/runtime/dq_input_producer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -345,15 +345,14 @@ TVector<std::unique_ptr<IBlockReader>> MakeReaders(const TVector<NKikimr::NMiniK
return result;
}

TVector<std::unique_ptr<IArrayBuilder>> MakeBuilders(ui64 blockLen, const TVector<NKikimr::NMiniKQL::TType*> itemTypes) {
TVector<std::unique_ptr<IArrayBuilder>> MakeBuilders(ui64 blockLen, const TVector<NKikimr::NMiniKQL::TType*> itemTypes, NUdf::IPgBuilder* pgBuilder) {
TVector<std::unique_ptr<IArrayBuilder>> result;
TTypeInfoHelper helper;
for (auto& itemType : itemTypes) {
if (itemType) {
// TODO: pass memory pool
// TODO: IPgBuilder
YQL_ENSURE(!itemType->IsPg(), "pg types are not supported yet");
result.emplace_back(MakeArrayBuilder(helper, itemType, *NYql::NUdf::GetYqlMemoryPool(), blockLen, nullptr));
result.emplace_back(MakeArrayBuilder(helper, itemType, *NYql::NUdf::GetYqlMemoryPool(), blockLen, pgBuilder));
} else {
result.emplace_back();
}
Expand All @@ -380,13 +379,14 @@ class TDqInputMergeBlockStreamValue : public TComputationValue<TDqInputMergeBloc
using TBase = TComputationValue<TDqInputMergeBlockStreamValue>;
public:
TDqInputMergeBlockStreamValue(TMemoryUsageInfo* memInfo, const NKikimr::NMiniKQL::TType* type, TVector<IDqInput::TPtr>&& inputs,
TVector<TSortColumnInfo>&& sortCols, const NKikimr::NMiniKQL::THolderFactory& factory, TDqMeteringStats::TInputStatsMeter stats)
TVector<TSortColumnInfo>&& sortCols, const NKikimr::NMiniKQL::THolderFactory& factory, TDqMeteringStats::TInputStatsMeter stats,
NUdf::IPgBuilder* pgBuilder)
: TBase(memInfo)
, SortCols_(std::move(sortCols))
, ItemTypes_(ExtractBlockItemTypes(type))
, MaxOutputBlockLen_(CalcMaxBlockLength(ItemTypes_.begin(), ItemTypes_.end(), TTypeInfoHelper()))
, Comparators_(MakeComparators(SortCols_, ItemTypes_))
, Builders_(MakeBuilders(MaxOutputBlockLen_, ItemTypes_))
, Builders_(MakeBuilders(MaxOutputBlockLen_, ItemTypes_, pgBuilder))
, Factory_(factory)
, Stats_(stats)
{
Expand Down Expand Up @@ -750,7 +750,8 @@ NUdf::TUnboxedValue CreateInputUnionValue(const NKikimr::NMiniKQL::TType* type,
}

NKikimr::NUdf::TUnboxedValue CreateInputMergeValue(const NKikimr::NMiniKQL::TType* type, TVector<IDqInput::TPtr>&& inputs,
TVector<TSortColumnInfo>&& sortCols, const NKikimr::NMiniKQL::THolderFactory& factory, TDqMeteringStats::TInputStatsMeter stats)
TVector<TSortColumnInfo>&& sortCols, const NKikimr::NMiniKQL::THolderFactory& factory, TDqMeteringStats::TInputStatsMeter stats,
NUdf::IPgBuilder* pgBuilder)
{
ValidateInputTypes(type, inputs);
YQL_ENSURE(!inputs.empty());
Expand All @@ -761,7 +762,7 @@ NKikimr::NUdf::TUnboxedValue CreateInputMergeValue(const NKikimr::NMiniKQL::TTyp
if (sortCols.empty()) {
return factory.Create<TDqInputUnionStreamValue<true>>(type, std::move(inputs), stats);
}
return factory.Create<TDqInputMergeBlockStreamValue>(type, std::move(inputs), std::move(sortCols), factory, stats);
return factory.Create<TDqInputMergeBlockStreamValue>(type, std::move(inputs), std::move(sortCols), factory, stats, pgBuilder);
}
return factory.Create<TDqInputMergeStreamValue<true>>(type, std::move(inputs), std::move(sortCols), stats);
}
Expand Down
4 changes: 3 additions & 1 deletion ydb/library/yql/dq/runtime/dq_input_producer.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
#include "dq_input_channel.h"
#include "dq_columns_resolve.h"

#include <yql/essentials/public/udf/udf_value_builder.h>

namespace NYql::NDq {

struct TDqMeteringStats {
Expand Down Expand Up @@ -33,6 +35,6 @@ NKikimr::NUdf::TUnboxedValue CreateInputUnionValue(const NKikimr::NMiniKQL::TTyp

NKikimr::NUdf::TUnboxedValue CreateInputMergeValue(const NKikimr::NMiniKQL::TType* type, TVector<IDqInput::TPtr>&& inputs,
TVector<TSortColumnInfo>&& sortCols, const NKikimr::NMiniKQL::THolderFactory& factory,
TDqMeteringStats::TInputStatsMeter = {});
TDqMeteringStats::TInputStatsMeter = {}, NUdf::IPgBuilder* pgBuilder = nullptr);

} // namespace NYql::NDq
14 changes: 9 additions & 5 deletions ydb/library/yql/dq/runtime/dq_output_consumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -328,14 +328,16 @@ class TDqOutputHashPartitionConsumerBlock : public IDqOutputConsumer {
TDqOutputHashPartitionConsumerBlock(TVector<IDqOutput::TPtr>&& outputs, TVector<TColumnInfo>&& keyColumns,
const NKikimr::NMiniKQL::TType* outputType,
const NKikimr::NMiniKQL::THolderFactory& holderFactory,
TMaybe<ui8> minFillPercentage)
TMaybe<ui8> minFillPercentage,
NUdf::IPgBuilder* pgBuilder)
: OutputType_(static_cast<const NMiniKQL::TMultiType*>(outputType))
, HolderFactory_(holderFactory)
, Outputs_(std::move(outputs))
, KeyColumns_(std::move(keyColumns))
, ScalarColumnHashes_(KeyColumns_.size())
, OutputWidth_(OutputType_->GetElementsCount())
, MinFillPercentage_(minFillPercentage)
, PgBuilder_(pgBuilder)
{
TTypeInfoHelper helper;
YQL_ENSURE(OutputWidth_ > KeyColumns_.size());
Expand Down Expand Up @@ -517,8 +519,7 @@ class TDqOutputHashPartitionConsumerBlock : public IDqOutputConsumer {
auto blockType = static_cast<const NMiniKQL::TBlockType*>(columnType);
if (blockType->GetShape() == NMiniKQL::TBlockType::EShape::Many) {
auto itemType = blockType->GetItemType();
YQL_ENSURE(!itemType->IsPg(), "pg types are not supported yet");
Builders_.emplace_back(MakeArrayBuilder(helper, itemType, *NYql::NUdf::GetYqlMemoryPool(), maxBlockLen, nullptr, {.MinFillPercentage=MinFillPercentage_}));
Builders_.emplace_back(MakeArrayBuilder(helper, itemType, *NYql::NUdf::GetYqlMemoryPool(), maxBlockLen, PgBuilder_, {.MinFillPercentage=MinFillPercentage_}));
} else {
Builders_.emplace_back();
}
Expand All @@ -542,6 +543,8 @@ class TDqOutputHashPartitionConsumerBlock : public IDqOutputConsumer {
TVector<std::unique_ptr<IArrayBuilder>> Builders_;

mutable bool IsWaitingFlag_ = false;

NUdf::IPgBuilder* PgBuilder_;
};

class TDqOutputBroadcastConsumer : public IDqOutputConsumer {
Expand Down Expand Up @@ -605,7 +608,8 @@ IDqOutputConsumer::TPtr CreateOutputHashPartitionConsumer(
TVector<IDqOutput::TPtr>&& outputs,
TVector<TColumnInfo>&& keyColumns, const NKikimr::NMiniKQL::TType* outputType,
const NKikimr::NMiniKQL::THolderFactory& holderFactory,
TMaybe<ui8> minFillPercentage)
TMaybe<ui8> minFillPercentage,
NUdf::IPgBuilder* pgBuilder)
{
YQL_ENSURE(!outputs.empty());
YQL_ENSURE(!keyColumns.empty());
Expand All @@ -624,7 +628,7 @@ IDqOutputConsumer::TPtr CreateOutputHashPartitionConsumer(
return MakeIntrusive<TDqOutputHashPartitionConsumerScalar>(std::move(outputs), std::move(keyColumns), outputType);
}

return MakeIntrusive<TDqOutputHashPartitionConsumerBlock>(std::move(outputs), std::move(keyColumns), outputType, holderFactory, minFillPercentage);
return MakeIntrusive<TDqOutputHashPartitionConsumerBlock>(std::move(outputs), std::move(keyColumns), outputType, holderFactory, minFillPercentage, pgBuilder);
}

IDqOutputConsumer::TPtr CreateOutputBroadcastConsumer(TVector<IDqOutput::TPtr>&& outputs, TMaybe<ui32> outputWidth) {
Expand Down
5 changes: 4 additions & 1 deletion ydb/library/yql/dq/runtime/dq_output_consumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
#include "dq_columns_resolve.h"
#include "dq_output.h"

#include <yql/essentials/public/udf/udf_value_builder.h>

#include <yql/essentials/minikql/mkql_alloc.h>

namespace NKikimr::NMiniKQL {
Expand Down Expand Up @@ -48,7 +50,8 @@ IDqOutputConsumer::TPtr CreateOutputHashPartitionConsumer(
TVector<IDqOutput::TPtr>&& outputs,
TVector<TColumnInfo>&& keyColumns, const NKikimr::NMiniKQL::TType* outputType,
const NKikimr::NMiniKQL::THolderFactory& holderFactory,
TMaybe<ui8> minFillPercentage);
TMaybe<ui8> minFillPercentage,
NUdf::IPgBuilder* pgBuilder);

IDqOutputConsumer::TPtr CreateOutputBroadcastConsumer(TVector<IDqOutput::TPtr>&& outputs, TMaybe<ui32> outputWidth);

Expand Down
31 changes: 21 additions & 10 deletions ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include <yql/essentials/minikql/computation/mkql_computation_node.h>
#include <yql/essentials/minikql/computation/mkql_computation_pattern_cache.h>

#include <yql/essentials/parser/pg_wrapper/interface/utils.h>
#include <yql/essentials/parser/pg_wrapper/interface/codec.h>

#include <yql/essentials/providers/common/schema/mkql/yql_mkql_schema.h>
Expand Down Expand Up @@ -141,7 +142,8 @@ void ValidateParamValue(std::string_view paramName, const TType* type, const NUd
#define LOG(...) do { if (Y_UNLIKELY(LogFunc)) { LogFunc(__VA_ARGS__); } } while (0)

NUdf::TUnboxedValue DqBuildInputValue(const NDqProto::TTaskInput& inputDesc, const NKikimr::NMiniKQL::TType* type,
TVector<IDqInput::TPtr>&& inputs, const THolderFactory& holderFactory, TDqMeteringStats::TInputStatsMeter stats)
TVector<IDqInput::TPtr>&& inputs, const THolderFactory& holderFactory, TDqMeteringStats::TInputStatsMeter stats,
NUdf::IPgBuilder* pgBuilder)
{
switch (inputDesc.GetTypeCase()) {
case NYql::NDqProto::TTaskInput::kSource:
Expand All @@ -155,7 +157,7 @@ NUdf::TUnboxedValue DqBuildInputValue(const NDqProto::TTaskInput& inputDesc, con
GetSortColumnsInfo(type, protoSortCols, sortColsInfo);
YQL_ENSURE(!sortColsInfo.empty());

return CreateInputMergeValue(type, std::move(inputs), std::move(sortColsInfo), holderFactory, stats);
return CreateInputMergeValue(type, std::move(inputs), std::move(sortColsInfo), holderFactory, stats, pgBuilder);
}
default:
YQL_ENSURE(false, "Unknown input type: " << (ui32) inputDesc.GetTypeCase());
Expand All @@ -164,7 +166,7 @@ NUdf::TUnboxedValue DqBuildInputValue(const NDqProto::TTaskInput& inputDesc, con

IDqOutputConsumer::TPtr DqBuildOutputConsumer(const NDqProto::TTaskOutput& outputDesc, const NMiniKQL::TType* type,
const NMiniKQL::TTypeEnvironment& typeEnv, const NKikimr::NMiniKQL::THolderFactory& holderFactory,
TVector<IDqOutput::TPtr>&& outputs, TMaybe<ui8> minFillPercentage)
TVector<IDqOutput::TPtr>&& outputs,NUdf::IPgBuilder* pgBuilder, TMaybe<ui8> minFillPercentage)
{
TMaybe<ui32> outputWidth;
if (type->IsMulti()) {
Expand All @@ -185,7 +187,7 @@ IDqOutputConsumer::TPtr DqBuildOutputConsumer(const NDqProto::TTaskOutput& outpu
GetColumnsInfo(type, outputDesc.GetHashPartition().GetKeyColumns(), keyColumns);
YQL_ENSURE(!keyColumns.empty());
YQL_ENSURE(outputDesc.GetHashPartition().GetPartitionsCount() == outputDesc.ChannelsSize());
return CreateOutputHashPartitionConsumer(std::move(outputs), std::move(keyColumns), type, holderFactory, minFillPercentage);
return CreateOutputHashPartitionConsumer(std::move(outputs), std::move(keyColumns), type, holderFactory, minFillPercentage, pgBuilder);
}

case NDqProto::TTaskOutput::kBroadcast: {
Expand All @@ -206,11 +208,18 @@ IDqOutputConsumer::TPtr DqBuildOutputConsumer(const NDqProto::TTaskOutput& outpu
}
}

// Compatibility overload for callers that have no PG builder: every argument is handed
// to the pgBuilder-aware DqBuildOutputConsumer overload, with nullptr as the builder.
IDqOutputConsumer::TPtr DqBuildOutputConsumer(
    const NDqProto::TTaskOutput& outputDesc,
    const NKikimr::NMiniKQL::TType* type,
    const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv,
    const NKikimr::NMiniKQL::THolderFactory& holderFactory,
    TVector<IDqOutput::TPtr>&& channels,
    TMaybe<ui8> minFillPercentage)
{
    return DqBuildOutputConsumer(
        outputDesc, type, typeEnv, holderFactory,
        std::move(channels), /* pgBuilder = */ nullptr, minFillPercentage);
}

// Base implementation of CreateOutputConsumer: the IApplyContext argument is unnamed and
// unused; the remaining arguments are handed to DqBuildOutputConsumer.
// NOTE(review): this is a diff hunk - the paired signature-tail lines and the paired return
// lines are before/after versions of the same line. The newer return forwards pgBuilder and
// passes {} as the trailing minFillPercentage argument.
IDqOutputConsumer::TPtr TDqTaskRunnerExecutionContextBase::CreateOutputConsumer(const TTaskOutput& outputDesc,
const NKikimr::NMiniKQL::TType* type, NUdf::IApplyContext*, const TTypeEnvironment& typeEnv,
const NKikimr::NMiniKQL::THolderFactory& holderFactory, TVector<IDqOutput::TPtr>&& outputs) const
const NKikimr::NMiniKQL::THolderFactory& holderFactory, TVector<IDqOutput::TPtr>&& outputs, NUdf::IPgBuilder* pgBuilder) const
{
return DqBuildOutputConsumer(outputDesc, type, typeEnv, holderFactory, std::move(outputs));
return DqBuildOutputConsumer(outputDesc, type, typeEnv, holderFactory, std::move(outputs), pgBuilder, {});
}

inline TCollectStatsLevel StatsModeToCollectStatsLevel(NDqProto::EDqStatsMode statsMode) {
Expand Down Expand Up @@ -583,7 +592,7 @@ class TDqTaskRunner : public IDqTaskRunner {

auto entryNode = AllocatedHolder->ProgramParsed.CompGraph->GetEntryPoint(i, true);
if (transform) {
transform->TransformInput = DqBuildInputValue(inputDesc, transform->TransformInputType, std::move(inputs), holderFactory, {});
transform->TransformInput = DqBuildInputValue(inputDesc, transform->TransformInputType, std::move(inputs), holderFactory, {}, PgBuilder_.get());
inputs.clear();
inputs.emplace_back(transform->TransformOutput);
entryNode->SetValue(AllocatedHolder->ProgramParsed.CompGraph->GetContext(),
Expand All @@ -592,7 +601,7 @@ class TDqTaskRunner : public IDqTaskRunner {
} else {
entryNode->SetValue(AllocatedHolder->ProgramParsed.CompGraph->GetContext(),
DqBuildInputValue(inputDesc, entry->InputItemTypes[i], std::move(inputs), holderFactory,
{inputStats, entry->InputItemTypes[i]}));
{inputStats, entry->InputItemTypes[i]}, PgBuilder_.get()));
}
}

Expand Down Expand Up @@ -663,7 +672,7 @@ class TDqTaskRunner : public IDqTaskRunner {
if (transform) {
auto guard = BindAllocator();
transform->TransformOutput = execCtx.CreateOutputConsumer(outputDesc, transform->TransformOutputType,
Context.ApplyCtx, typeEnv, holderFactory, std::move(outputs));
Context.ApplyCtx, typeEnv, holderFactory, std::move(outputs), PgBuilder_.get());

outputs.clear();
outputs.emplace_back(transform->TransformInput);
Expand All @@ -672,7 +681,7 @@ class TDqTaskRunner : public IDqTaskRunner {
{
auto guard = BindAllocator();
outputConsumers[i] = execCtx.CreateOutputConsumer(outputDesc, entry->OutputItemTypes[i],
Context.ApplyCtx, typeEnv, holderFactory, std::move(outputs));
Context.ApplyCtx, typeEnv, holderFactory, std::move(outputs), PgBuilder_.get());
}
}

Expand Down Expand Up @@ -1017,6 +1026,8 @@ class TDqTaskRunner : public IDqTaskRunner {
std::optional<TInstant> StartWaitInputTime;
std::optional<TInstant> StartWaitOutputTime;

std::unique_ptr<NUdf::IPgBuilder> PgBuilder_ = CreatePgBuilder();

void StartWaitingInput() {
if (!StartWaitInputTime) {
StartWaitInputTime = TInstant::Now();
Expand Down
12 changes: 9 additions & 3 deletions ydb/library/yql/dq/runtime/dq_tasks_runner.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
#include <yql/essentials/minikql/mkql_node.h>
#include <yql/essentials/minikql/mkql_watermark.h>

#include <yql/essentials/public/udf/udf_value_builder.h>

#include <library/cpp/monlib/metrics/histogram_collector.h>

#include <util/generic/size_literals.h>
Expand Down Expand Up @@ -145,7 +147,7 @@ class IDqTaskRunnerExecutionContext {
const NKikimr::NMiniKQL::TType* type, NUdf::IApplyContext* applyCtx,
const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv,
const NKikimr::NMiniKQL::THolderFactory& holderFactory,
TVector<IDqOutput::TPtr>&& outputs) const = 0;
TVector<IDqOutput::TPtr>&& outputs, NUdf::IPgBuilder* pgBuilder) const = 0;

virtual IDqChannelStorage::TPtr CreateChannelStorage(ui64 channelId, bool withSpilling) const = 0;
virtual IDqChannelStorage::TPtr CreateChannelStorage(ui64 channelId, bool withSpilling, NActors::TActorSystem* actorSystem) const = 0;
Expand All @@ -162,7 +164,7 @@ class TDqTaskRunnerExecutionContextBase : public IDqTaskRunnerExecutionContext {
const NKikimr::NMiniKQL::TType* type, NUdf::IApplyContext* applyCtx,
const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv,
const NKikimr::NMiniKQL::THolderFactory& holderFactory,
TVector<IDqOutput::TPtr>&& outputs) const override;
TVector<IDqOutput::TPtr>&& outputs, NUdf::IPgBuilder* pgBuilder) const override;
};

class TDqTaskRunnerExecutionContextDefault : public TDqTaskRunnerExecutionContextBase {
Expand Down Expand Up @@ -210,7 +212,11 @@ struct TDqTaskRunnerMemoryLimits {
};

NUdf::TUnboxedValue DqBuildInputValue(const NDqProto::TTaskInput& inputDesc, const NKikimr::NMiniKQL::TType* type,
TVector<IDqInputChannel::TPtr>&& channels, const NKikimr::NMiniKQL::THolderFactory& holderFactory);
TVector<IDqInputChannel::TPtr>&& channels, const NKikimr::NMiniKQL::THolderFactory& holderFactory, NUdf::IPgBuilder* pgBuilder);

// Builds the output consumer described by outputDesc for the given output type.
// pgBuilder is passed through to the hash-partition consumer and may be nullptr (the overload
// without a pgBuilder parameter supplies nullptr). minFillPercentage defaults to empty.
IDqOutputConsumer::TPtr DqBuildOutputConsumer(const NDqProto::TTaskOutput& outputDesc, const NKikimr::NMiniKQL::TType* type,
const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv, const NKikimr::NMiniKQL::THolderFactory& holderFactory,
TVector<IDqOutput::TPtr>&& channels, NUdf::IPgBuilder* pgBuilder, TMaybe<ui8> minFillPercentage = {});

IDqOutputConsumer::TPtr DqBuildOutputConsumer(const NDqProto::TTaskOutput& outputDesc, const NKikimr::NMiniKQL::TType* type,
const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv, const NKikimr::NMiniKQL::THolderFactory& holderFactory,
Expand Down

0 comments on commit 8e01871

Please sign in to comment.