From e7c573f90847b91511e65ae4b6ae5869bb2c6c22 Mon Sep 17 00:00:00 2001 From: ivanmorozov333 Date: Wed, 8 Jan 2025 18:22:27 +0300 Subject: [PATCH] Merge different schemas (#13192) --- ydb/core/formats/arrow/process_columns.cpp | 30 ++++- ydb/core/formats/arrow/process_columns.h | 112 +++++++++++++++++- ydb/core/kqp/ut/olap/helpers/typed_local.cpp | 3 + ydb/core/kqp/ut/olap/helpers/typed_local.h | 18 ++- ydb/core/kqp/ut/olap/write_ut.cpp | 40 ++++++- ydb/core/testlib/cs_helper.cpp | 10 +- .../columnshard/engines/scheme/index_info.cpp | 30 ++++- .../columnshard/engines/scheme/index_info.h | 5 + .../scheme/versions/abstract_scheme.cpp | 16 ++- .../engines/scheme/versions/abstract_scheme.h | 7 +- .../engines/writer/buffer/actor2.cpp | 3 +- .../engines/writer/buffer/actor2.h | 14 +-- .../engines/writer/buffer/events.h | 5 +- .../operations/batch_builder/builder.cpp | 10 +- .../operations/batch_builder/merger.cpp | 11 +- .../operations/batch_builder/merger.h | 20 ++-- .../operations/batch_builder/restore.cpp | 10 +- .../operations/batch_builder/restore.h | 6 +- .../operations/slice_builder/pack_builder.cpp | 63 +++++++--- .../operations/slice_builder/pack_builder.h | 6 +- ydb/library/accessor/validator_simple.cpp | 1 + ydb/library/accessor/validator_simple.h | 16 +++ ydb/library/accessor/ya.make | 1 + 23 files changed, 356 insertions(+), 81 deletions(-) create mode 100644 ydb/library/accessor/validator_simple.cpp create mode 100644 ydb/library/accessor/validator_simple.h diff --git a/ydb/core/formats/arrow/process_columns.cpp b/ydb/core/formats/arrow/process_columns.cpp index cdc18dd9d1db..89ee7e078cf4 100644 --- a/ydb/core/formats/arrow/process_columns.cpp +++ b/ydb/core/formats/arrow/process_columns.cpp @@ -138,6 +138,27 @@ TConclusion> ReorderImpl( } // namespace +TOrderedColumnIndexesImpl::TOrderedColumnIndexesImpl(const std::vector& columnIndexes) + : ColumnIndexes(columnIndexes) { + for (ui32 i = 0; i + 1 < columnIndexes.size(); ++i) { + AFL_VERIFY(ColumnIndexes[i] < ColumnIndexes[i + 1]); + } +} + +TOrderedColumnIndexesImpl::TOrderedColumnIndexesImpl(std::vector&& columnIndexes) + : ColumnIndexes(std::move(columnIndexes)) { + for (ui32 i = 0; i + 1 < ColumnIndexes.size(); ++i) { + AFL_VERIFY(ColumnIndexes[i] < ColumnIndexes[i + 1]); + } +} + +TOrderedColumnIndexesImpl::TOrderedColumnIndexesImpl(const ui32 columnsCount) { + ColumnIndexes.reserve(columnsCount); + for (ui32 i = 0; i < columnsCount; ++i) { + ColumnIndexes.emplace_back(i); + } +} + std::shared_ptr TColumnOperator::Extract( const std::shared_ptr& incoming, const std::vector& columnNames) { return ExtractImpl(AbsentColumnPolicy, incoming, columnNames); @@ -259,7 +280,7 @@ TConclusion TColumnOperator::BuildSequentialSubset( } namespace { template -TConclusion> AdaptIncomingToDestinationExtImpl(const std::shared_ptr& incoming, +TConclusion> AdaptIncomingToDestinationExtImpl(const std::shared_ptr& incoming, const TSchemaLiteView& dstSchema, const std::function& checker, const std::function& nameResolver, const TColumnOperator::ECheckFieldTypesPolicy differentColumnTypesPolicy, const TColumnOperator::EAbsentFieldPolicy absentColumnPolicy) { @@ -318,14 +339,17 @@ TConclusion> AdaptIncomingToDestinationExtImpl(c std::vector::TColumn>> columns; columns.reserve(resultColumns.size()); fields.reserve(resultColumns.size()); + std::vector indexes; for (auto&& i : resultColumns) { fields.emplace_back(dstSchema.field(i.Index)); columns.emplace_back(i.Column); + indexes.emplace_back(i.Index); } - return NAdapter::TDataBuilderPolicy::Build(std::make_shared(fields), std::move(columns), incoming->num_rows()); + return TContainerWithIndexes(indexes, + NAdapter::TDataBuilderPolicy::Build(std::make_shared(fields), std::move(columns), incoming->num_rows())); } } // namespace -TConclusion> TColumnOperator::AdaptIncomingToDestinationExt( +TConclusion> TColumnOperator::AdaptIncomingToDestinationExt( const std::shared_ptr& incoming, const TSchemaLiteView& dstSchema, const std::function& checker, const std::function& nameResolver) const { return AdaptIncomingToDestinationExtImpl(incoming, dstSchema, checker, nameResolver, DifferentColumnTypesPolicy, AbsentColumnPolicy); diff --git a/ydb/core/formats/arrow/process_columns.h b/ydb/core/formats/arrow/process_columns.h index 2eb7e77330b7..f6e6f3912890 100644 --- a/ydb/core/formats/arrow/process_columns.h +++ b/ydb/core/formats/arrow/process_columns.h @@ -1,7 +1,10 @@ #pragma once +#include +#include #include #include + #include namespace NKikimr::NArrow { @@ -10,6 +13,113 @@ class TSchemaSubset; class TSchemaLite; class TSchemaLiteView; +class TOrderedColumnIndexesImpl { +private: + YDB_READONLY_DEF(std::vector, ColumnIndexes); + +public: + TOrderedColumnIndexesImpl() = default; + + explicit TOrderedColumnIndexesImpl(const ui32 columnsCount); + explicit TOrderedColumnIndexesImpl(const std::vector& columnIndexes); + explicit TOrderedColumnIndexesImpl(std::vector&& columnIndexes); + + template + static std::vector MergeColumnIdxs(const std::vector& sources) { + class TIterator { + private: + std::vector::const_iterator ItCurrent; + std::vector::const_iterator ItFinish; + + public: + TIterator(const std::vector& indexes) + : ItCurrent(indexes.begin()) + , ItFinish(indexes.end()) { + } + + bool operator<(const TIterator& item) const { + return *ItCurrent > *item.ItCurrent; + } + + bool IsValid() const { + return ItCurrent != ItFinish; + } + + ui32 operator*() const { + return *ItCurrent; + } + + bool Next() { + return ++ItCurrent != ItFinish; + } + }; + + std::vector heapToMerge; + for (auto&& i : sources) { + heapToMerge.emplace_back(TIterator(i.GetColumnIndexes())); + if (!heapToMerge.back().IsValid()) { + heapToMerge.pop_back(); + } + } + std::make_heap(heapToMerge.begin(), heapToMerge.end()); + std::vector result; + while (heapToMerge.size()) { + std::pop_heap(heapToMerge.begin(), heapToMerge.end()); + if (result.empty() || result.back() != *heapToMerge.back()) { + result.emplace_back(*heapToMerge.back()); + } + if (!heapToMerge.back().Next()) { + heapToMerge.pop_back(); + } else { + std::push_heap(heapToMerge.begin(), heapToMerge.end()); + } + } + return result; + } +}; + +template +class TContainerWithIndexes: public TOrderedColumnIndexesImpl { +private: + using TBase = TOrderedColumnIndexesImpl; + YDB_ACCESSOR_DEF(std::shared_ptr, Container); + +public: + TContainerWithIndexes() = default; + + TContainerWithIndexes(const std::vector& columnIndexes, const std::shared_ptr& container) + : TBase(columnIndexes) + , Container(container) { + if (Container) { + Y_ABORT_UNLESS((ui32)Container->num_columns() == columnIndexes.size()); + } else { + Y_ABORT_UNLESS(!columnIndexes.size()); + } + } + + explicit TContainerWithIndexes(const std::shared_ptr& container) + : TBase(TSimpleValidator::CheckNotNull(container)->num_columns()) + , Container(container) { + } + + TContainerWithIndexes BuildWithAnotherContainer(const std::shared_ptr& container) const { + return TContainerWithIndexes(GetColumnIndexes(), container); + } + + bool operator!() const { + return !HasContainer(); + } + + bool HasContainer() const { + return !!Container; + } + + const TDataContainer* operator->() const { + return Container.get(); + } + +}; + class TColumnOperator { public: enum class EAbsentFieldPolicy { @@ -59,7 +169,7 @@ class TColumnOperator { return *this; } - TConclusion> AdaptIncomingToDestinationExt(const std::shared_ptr& incoming, + TConclusion> AdaptIncomingToDestinationExt(const std::shared_ptr& incoming, const TSchemaLiteView& dstSchema, const std::function& checker, const std::function& nameResolver) const; diff --git a/ydb/core/kqp/ut/olap/helpers/typed_local.cpp b/ydb/core/kqp/ut/olap/helpers/typed_local.cpp index 492bc7f4d2c0..e9704de25e6c 100644 --- a/ydb/core/kqp/ut/olap/helpers/typed_local.cpp +++ b/ydb/core/kqp/ut/olap/helpers/typed_local.cpp @@ -15,6 +15,9 @@ TString TTypedLocalHelper::GetTestTableSchema() const { if (TypeName) { result = R"(Columns { Name: "field" Type: ")" + TypeName + "\"}"; } + if (TypeName1) { + result += R"(Columns { Name: "field1" Type: ")" + TypeName1 + "\"}"; + } result += R"( Columns { Name: "pk_int" Type: "Int64" NotNull: true } Columns { Name: "ts" Type: "Timestamp" } diff --git a/ydb/core/kqp/ut/olap/helpers/typed_local.h b/ydb/core/kqp/ut/olap/helpers/typed_local.h index a0ead56e8769..9feb72ea3a63 100644 --- a/ydb/core/kqp/ut/olap/helpers/typed_local.h +++ b/ydb/core/kqp/ut/olap/helpers/typed_local.h @@ -17,6 +17,7 @@ class TTypedLocalHelper: public Tests::NCS::THelper { private: using TBase = Tests::NCS::THelper; const TString TypeName; + const TString TypeName1; TKikimrRunner& KikimrRunner; const TString TablePath; const TString TableName; @@ -37,6 +38,18 @@ class TTypedLocalHelper: public Tests::NCS::THelper { SetShardingMethod("HASH_FUNCTION_CONSISTENCY_64"); } + TTypedLocalHelper(const TString& typeName, const TString& typeName1, TKikimrRunner& kikimrRunner, const TString& tableName = "olapTable", + const TString& storeName = "olapStore") + : TBase(kikimrRunner.GetTestServer()) + , TypeName(typeName) + , TypeName1(typeName1) + , KikimrRunner(kikimrRunner) + , TablePath(storeName.empty() ? "/Root/" + tableName : "/Root/" + storeName + "/" + tableName) + , TableName(tableName) + , StoreName(storeName) { + SetShardingMethod("HASH_FUNCTION_CONSISTENCY_64"); + } + class TSimultaneousWritingSession { private: bool Finished = false; @@ -54,13 +67,12 @@ class TTypedLocalHelper: public Tests::NCS::THelper { } template - void FillTable(const TFiller& fillPolicy, const double pkKff = 0, const ui32 numRows = 800000) const { - AFL_VERIFY(!Finished); + void FillTable(const TString& fieldName, const TFiller& fillPolicy, const double pkKff = 0, const ui32 numRows = 800000) const { std::vector builders; builders.emplace_back( NArrow::NConstruction::TSimpleArrayConstructor>::BuildNotNullable( "pk_int", numRows * pkKff)); - builders.emplace_back(std::make_shared>("field", fillPolicy)); + builders.emplace_back(std::make_shared>(fieldName, fillPolicy)); NArrow::NConstruction::TRecordBatchConstructor batchBuilder(builders); std::shared_ptr batch = batchBuilder.BuildBatch(numRows); SendDataViaActorSystem(TablePath, batch, Ydb::StatusIds::SUCCESS); diff --git a/ydb/core/kqp/ut/olap/write_ut.cpp b/ydb/core/kqp/ut/olap/write_ut.cpp index 470bd0e9a0bc..52550aca661c 100644 --- a/ydb/core/kqp/ut/olap/write_ut.cpp +++ b/ydb/core/kqp/ut/olap/write_ut.cpp @@ -202,11 +202,11 @@ Y_UNIT_TEST_SUITE(KqpOlapWrite) { TTypedLocalHelper helper("Utf8", kikimr); helper.CreateTestOlapTable(); auto writeSession = helper.StartWriting("/Root/olapStore/olapTable"); - writeSession.FillTable(NArrow::NConstruction::TStringPoolFiller(1, 1, "aaa", 1), 0, 800000); + writeSession.FillTable("field", NArrow::NConstruction::TStringPoolFiller(1, 1, "aaa", 1), 0, 800000); Sleep(TDuration::Seconds(1)); - writeSession.FillTable(NArrow::NConstruction::TStringPoolFiller(1, 1, "bbb", 1), 0.5, 800000); + writeSession.FillTable("field", NArrow::NConstruction::TStringPoolFiller(1, 1, "bbb", 1), 0.5, 800000); Sleep(TDuration::Seconds(1)); - writeSession.FillTable(NArrow::NConstruction::TStringPoolFiller(1, 1, "ccc", 1), 0.75, 800000); + writeSession.FillTable("field", NArrow::NConstruction::TStringPoolFiller(1, 1, "ccc", 1), 0.75, 800000); Sleep(TDuration::Seconds(1)); writeSession.Finalize(); @@ -228,6 +228,40 @@ Y_UNIT_TEST_SUITE(KqpOlapWrite) { UNIT_ASSERT_VALUES_EQUAL(GetUtf8(rows[2].at("field")), "ccc"); } + Y_UNIT_TEST(MultiWriteInTimeDiffSchemas) { + auto settings = TKikimrSettings().SetWithSampleTables(false); + settings.AppConfig.MutableColumnShardConfig()->SetWritingBufferDurationMs(15000); + TKikimrRunner kikimr(settings); + Tests::NCommon::TLoggerInit(kikimr).Initialize(); + TTypedLocalHelper helper("Utf8", "Utf8", kikimr); + helper.CreateTestOlapTable(); + auto writeGuard = helper.StartWriting("/Root/olapStore/olapTable"); + writeGuard.FillTable("field", NArrow::NConstruction::TStringPoolFiller(1, 1, "aaa", 1), 0, 800000); + Sleep(TDuration::Seconds(1)); + writeGuard.FillTable("field1", NArrow::NConstruction::TStringPoolFiller(1, 1, "bbb", 1), 0.5, 800000); + Sleep(TDuration::Seconds(1)); + writeGuard.FillTable("field", NArrow::NConstruction::TStringPoolFiller(1, 1, "ccc", 1), 0.75, 800000); + Sleep(TDuration::Seconds(1)); + writeGuard.Finalize(); + + auto selectQuery = TString(R"( + SELECT + field, count(*) as count, + FROM `/Root/olapStore/olapTable` + GROUP BY field + ORDER BY field + )"); + + auto tableClient = kikimr.GetTableClient(); + auto rows = ExecuteScanQuery(tableClient, selectQuery); + UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[0].at("count")), 200000); + UNIT_ASSERT_VALUES_EQUAL(GetUtf8(rows[0].at("field")), ""); + UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[1].at("count")), 400000); + UNIT_ASSERT_VALUES_EQUAL(GetUtf8(rows[1].at("field")), "aaa"); + UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[2].at("count")), 800000); + UNIT_ASSERT_VALUES_EQUAL(GetUtf8(rows[2].at("field")), "ccc"); + } + Y_UNIT_TEST(WriteDeleteCleanGC) { auto csController = NKikimr::NYDBTest::TControllers::RegisterCSControllerGuard(); csController->SetSmallSizeDetector(1000000); diff --git a/ydb/core/testlib/cs_helper.cpp b/ydb/core/testlib/cs_helper.cpp index 37a08dd30a2f..a22469415b0f 100644 --- a/ydb/core/testlib/cs_helper.cpp +++ b/ydb/core/testlib/cs_helper.cpp @@ -91,7 +91,7 @@ void THelperSchemaless::SendDataViaActorSystem(TString testTable, std::shared_pt Cerr << "\n"; } UNIT_ASSERT_VALUES_EQUAL(op.status(), expectedStatus); - }); + }); TDispatchOptions options; options.CustomFinalCondition = [&]() { @@ -228,7 +228,7 @@ void THelper::CreateSchemaOlapTablesWithStore(const TString tableSchema, TVector } void THelper::CreateOlapTablesWithStore(TVector tableNames /*= {"olapTable"}*/, TString storeName /*= "olapStore"*/, ui32 storeShardsCount /*= 4*/, ui32 tableShardsCount /*= 3*/) { - CreateSchemaOlapTablesWithStore(GetTestTableSchema(), tableNames, storeName, storeShardsCount, tableShardsCount); + CreateSchemaOlapTablesWithStore(GetTestTableSchema(), tableNames, storeName, storeShardsCount, tableShardsCount); } void THelper::CreateSchemaOlapTables(const TString tableSchema, TVector tableNames, ui32 tableShardsCount) { @@ -254,7 +254,7 @@ void THelper::CreateSchemaOlapTables(const TString tableSchema, TVector } void THelper::CreateOlapTables(TVector tableNames /*= {"olapTable"}*/, ui32 tableShardsCount /*= 3*/) { - CreateSchemaOlapTables(GetTestTableSchema(), tableNames, tableShardsCount); + CreateSchemaOlapTables(GetTestTableSchema(), tableNames, tableShardsCount); } // Clickbench table @@ -262,7 +262,7 @@ void THelper::CreateOlapTables(TVector tableNames /*= {"olapTable"}*/, std::shared_ptr TCickBenchHelper::GetArrowSchema() const { return std::make_shared( std::vector> { - arrow::field("WatchID", arrow::int64(), false), + arrow::field("WatchID", arrow::int64(), false), arrow::field("JavaEnable", arrow::int16(), false), arrow::field("Title", arrow::utf8(), false), arrow::field("GoodEvent", arrow::int16(), false), @@ -430,7 +430,7 @@ std::shared_ptr TCickBenchHelper::TestArrowBatch(ui64, ui64 std::shared_ptr TTableWithNullsHelper::GetArrowSchema() const { return std::make_shared( std::vector>{ - arrow::field("id", arrow::int32(), false), + arrow::field("id", arrow::int32(), false), arrow::field("resource_id", arrow::utf8()), arrow::field("level", arrow::int32()), arrow::field("binary_str", arrow::binary()), diff --git a/ydb/core/tx/columnshard/engines/scheme/index_info.cpp b/ydb/core/tx/columnshard/engines/scheme/index_info.cpp index 62abfcf9be9a..69b63c5fafbb 100644 --- a/ydb/core/tx/columnshard/engines/scheme/index_info.cpp +++ b/ydb/core/tx/columnshard/engines/scheme/index_info.cpp @@ -165,7 +165,7 @@ std::shared_ptr TIndexInfo::GetColumnFieldVerified(const ui32 colu } std::shared_ptr TIndexInfo::GetColumnsSchema(const std::set& columnIds) const { - Y_ABORT_UNLESS(columnIds.size()); + AFL_VERIFY(columnIds.size()); std::vector> fields; for (auto&& i : columnIds) { fields.emplace_back(GetColumnFieldVerified(i)); @@ -173,6 +173,20 @@ std::shared_ptr TIndexInfo::GetColumnsSchema(const std::set return std::make_shared(fields); } +std::shared_ptr TIndexInfo::GetColumnsSchemaByOrderedIndexes(const std::vector& columnIdxs) const { + AFL_VERIFY(columnIdxs.size()); + std::vector> fields; + std::optional predColumnIdx; + for (auto&& i : columnIdxs) { + if (predColumnIdx) { + AFL_VERIFY(*predColumnIdx < i); + } + predColumnIdx = i; + fields.emplace_back(ArrowSchemaWithSpecials()->GetFieldByIndexVerified(i)); + } + return std::make_shared(fields); +} + std::shared_ptr TIndexInfo::GetColumnSchema(const ui32 columnId) const { return GetColumnsSchema({ columnId }); } @@ -621,4 +635,18 @@ TIndexInfo TIndexInfo::BuildDefault() { return result; } +TConclusion> TIndexInfo::BuildDefaultColumn( + const ui32 fieldIndex, const ui32 rowsCount, const bool force) const { + auto defaultValue = GetColumnExternalDefaultValueByIndexVerified(fieldIndex); + auto f = ArrowSchemaWithSpecials()->GetFieldByIndexVerified(fieldIndex); + if (!defaultValue && !IsNullableVerifiedByIndex(fieldIndex)) { + if (force) { + defaultValue = NArrow::DefaultScalar(f->type()); + } else { + return TConclusionStatus::Fail("not nullable field with no default: " + f->name()); + } + } + return NArrow::TThreadSimpleArraysCache::Get(f->type(), defaultValue, rowsCount); +} + } // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/engines/scheme/index_info.h b/ydb/core/tx/columnshard/engines/scheme/index_info.h index eaf4f63eb2ea..ddc655b600d9 100644 --- a/ydb/core/tx/columnshard/engines/scheme/index_info.h +++ b/ydb/core/tx/columnshard/engines/scheme/index_info.h @@ -162,6 +162,10 @@ struct TIndexInfo: public IIndexInfo { AFL_VERIFY(MetadataManagerConstructor); return MetadataManagerConstructor; } + + TConclusion> BuildDefaultColumn(const ui32 fieldIndex, const ui32 rowsCount, const bool force) const; + + bool IsNullableVerifiedByIndex(const ui32 colIndex) const { AFL_VERIFY(colIndex < ColumnFeatures.size()); return ColumnFeatures[colIndex]->GetIsNullable(); @@ -269,6 +273,7 @@ struct TIndexInfo: public IIndexInfo { std::shared_ptr GetColumnFieldVerified(const ui32 columnId) const; std::shared_ptr GetColumnSchema(const ui32 columnId) const; std::shared_ptr GetColumnsSchema(const std::set& columnIds) const; + std::shared_ptr GetColumnsSchemaByOrderedIndexes(const std::vector& columnIds) const; TColumnSaver GetColumnSaver(const ui32 columnId) const; virtual const std::shared_ptr& GetColumnLoaderOptional(const ui32 columnId) const override; std::optional GetColumnNameOptional(const ui32 columnId) const { diff --git a/ydb/core/tx/columnshard/engines/scheme/versions/abstract_scheme.cpp b/ydb/core/tx/columnshard/engines/scheme/versions/abstract_scheme.cpp index d8009b2d8531..c5899e2f9dc0 100644 --- a/ydb/core/tx/columnshard/engines/scheme/versions/abstract_scheme.cpp +++ b/ydb/core/tx/columnshard/engines/scheme/versions/abstract_scheme.cpp @@ -22,6 +22,13 @@ std::shared_ptr ISnapshotSchema::GetFieldByIndex(const int index) } return schema->field(index); } + +std::shared_ptr ISnapshotSchema::GetFieldByIndexVerified(const int index) const { + auto schema = GetSchema(); + AFL_VERIFY(!!schema && index >= 0 && index < schema->num_fields()); + return schema->field(index); +} + std::shared_ptr ISnapshotSchema::GetFieldByColumnIdOptional(const ui32 columnId) const { return GetFieldByIndex(GetFieldIndex(columnId)); } @@ -68,7 +75,7 @@ TConclusion> ISnapshotSchema::Normali return result; } -TConclusion> ISnapshotSchema::PrepareForModification( +TConclusion> ISnapshotSchema::PrepareForModification( const std::shared_ptr& incomingBatch, const NEvWrite::EModificationType mType) const { if (!incomingBatch) { AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("error", "DeserializeBatch() failed"); @@ -134,9 +141,10 @@ TConclusion> ISnapshotSchema::PrepareForModi if (pkColumnsCount < pkColumns.size()) { return TConclusionStatus::Fail("not enough pk fields"); } - auto batch = NArrow::SortBatch(batchConclusion.DetachResult(), pkColumns, true); - Y_DEBUG_ABORT_UNLESS(NArrow::IsSortedAndUnique(batch, GetIndexInfo().GetPrimaryKey())); - return batch; + auto result = batchConclusion.DetachResult(); + result.MutableContainer() = NArrow::SortBatch(result.GetContainer(), pkColumns, true); + Y_DEBUG_ABORT_UNLESS(NArrow::IsSortedAndUnique(result.GetContainer(), GetIndexInfo().GetPrimaryKey())); + return result; } std::set ISnapshotSchema::GetColumnIdsToDelete(const ISnapshotSchema::TPtr& targetSchema) const { diff --git a/ydb/core/tx/columnshard/engines/scheme/versions/abstract_scheme.h b/ydb/core/tx/columnshard/engines/scheme/versions/abstract_scheme.h index 825f3f7e543b..967509d228f8 100644 --- a/ydb/core/tx/columnshard/engines/scheme/versions/abstract_scheme.h +++ b/ydb/core/tx/columnshard/engines/scheme/versions/abstract_scheme.h @@ -1,5 +1,6 @@ #pragma once #include +#include #include #include #include @@ -60,6 +61,7 @@ class ISnapshotSchema { ui32 GetColumnId(const std::string& columnName) const; std::shared_ptr GetFieldByIndex(const int index) const; + std::shared_ptr GetFieldByIndexVerified(const int index) const; std::shared_ptr GetFieldByColumnIdOptional(const ui32 columnId) const; std::shared_ptr GetFieldByColumnIdVerified(const ui32 columnId) const; @@ -80,11 +82,12 @@ class ISnapshotSchema { [[nodiscard]] TConclusion> NormalizeBatch(const ISnapshotSchema& dataSchema, const std::shared_ptr& batch, const std::set& restoreColumnIds) const; - [[nodiscard]] TConclusion> PrepareForModification( + [[nodiscard]] TConclusion> PrepareForModification( const std::shared_ptr& incomingBatch, const NEvWrite::EModificationType mType) const; [[nodiscard]] TConclusion PrepareForWrite(const ISnapshotSchema::TPtr& selfPtr, const ui64 pathId, const std::shared_ptr& incomingBatch, const NEvWrite::EModificationType mType, - const std::shared_ptr& storagesManager, const std::shared_ptr& splitterCounters) const; + const std::shared_ptr& storagesManager, + const std::shared_ptr& splitterCounters) const; void AdaptBatchToSchema(NArrow::TGeneralContainer& batch, const ISnapshotSchema::TPtr& targetSchema) const; std::set GetColumnIdsToDelete(const ISnapshotSchema::TPtr& targetSchema) const; std::vector ConvertColumnIdsToIndexes(const std::set& idxs) const; diff --git a/ydb/core/tx/columnshard/engines/writer/buffer/actor2.cpp b/ydb/core/tx/columnshard/engines/writer/buffer/actor2.cpp index 5252c742bd64..34ad09d3f006 100644 --- a/ydb/core/tx/columnshard/engines/writer/buffer/actor2.cpp +++ b/ydb/core/tx/columnshard/engines/writer/buffer/actor2.cpp @@ -50,8 +50,7 @@ void TActor::Handle(TEvAddInsertedDataToBuffer::TPtr& ev) { SumSize += evBase->GetWriteData()->GetSize(); const ui64 pathId = evBase->GetWriteData()->GetWriteMeta().GetTableId(); const ui64 schemaVersion = evBase->GetContext()->GetActualSchema()->GetVersion(); - TAggregationId aggrId(pathId, schemaVersion, evBase->GetWriteData()->GetWriteMeta().GetModificationType(), - evBase->GetRecordBatch()->schema()->ToString()); + TAggregationId aggrId(pathId, schemaVersion, evBase->GetWriteData()->GetWriteMeta().GetModificationType()); auto it = Aggregations.find(aggrId); if (it == Aggregations.end()) { it = Aggregations diff --git a/ydb/core/tx/columnshard/engines/writer/buffer/actor2.h b/ydb/core/tx/columnshard/engines/writer/buffer/actor2.h index 49f7c481368d..805e1d149f05 100644 --- a/ydb/core/tx/columnshard/engines/writer/buffer/actor2.h +++ b/ydb/core/tx/columnshard/engines/writer/buffer/actor2.h @@ -17,26 +17,20 @@ class TAggregationId { const ui64 PathId; const ui64 SchemaVersion; const NEvWrite::EModificationType ModificationType; - const TString SchemaDescription; - const ui64 SchemaDescriptionHash; public: - TAggregationId(const ui64 pathId, const ui64 schemaVersion, const NEvWrite::EModificationType mType, const TString& schemaDescription) + TAggregationId(const ui64 pathId, const ui64 schemaVersion, const NEvWrite::EModificationType mType) : PathId(pathId) , SchemaVersion(schemaVersion) - , ModificationType(mType) - , SchemaDescription(schemaDescription) - , SchemaDescriptionHash(MurmurHash(SchemaDescription.data(), SchemaDescription.size())) { + , ModificationType(mType) { } bool operator==(const TAggregationId& item) const { - return PathId == item.PathId && SchemaVersion == item.SchemaVersion && ModificationType == item.ModificationType && - SchemaDescription == item.SchemaDescription; + return PathId == item.PathId && SchemaVersion == item.SchemaVersion && ModificationType == item.ModificationType; } operator size_t() const { - return CombineHashes( - CombineHashes(CombineHashes(PathId, SchemaVersion), (ui64)ModificationType), SchemaDescriptionHash); + return CombineHashes(CombineHashes(PathId, SchemaVersion), (ui64)ModificationType); } }; diff --git a/ydb/core/tx/columnshard/engines/writer/buffer/events.h b/ydb/core/tx/columnshard/engines/writer/buffer/events.h index a35bc9e35db2..d880b859aaa2 100644 --- a/ydb/core/tx/columnshard/engines/writer/buffer/events.h +++ b/ydb/core/tx/columnshard/engines/writer/buffer/events.h @@ -1,5 +1,6 @@ #pragma once +#include #include #include #include @@ -42,12 +43,12 @@ class TEvAddInsertedDataToBuffer : public NActors::TEventLocal { private: YDB_READONLY_DEF(std::shared_ptr, WriteData); - YDB_READONLY_DEF(std::shared_ptr, RecordBatch); + YDB_READONLY_DEF(NArrow::TContainerWithIndexes, RecordBatch); YDB_READONLY_DEF(std::shared_ptr, Context); public: explicit TEvAddInsertedDataToBuffer(const std::shared_ptr& writeData, - const std::shared_ptr& recordBatch, const std::shared_ptr& context) + const NArrow::TContainerWithIndexes& recordBatch, const std::shared_ptr& context) : WriteData(writeData) , RecordBatch(recordBatch) , Context(context) { diff --git a/ydb/core/tx/columnshard/operations/batch_builder/builder.cpp b/ydb/core/tx/columnshard/operations/batch_builder/builder.cpp index 80d57ffc9ecd..cf9746640459 100644 --- a/ydb/core/tx/columnshard/operations/batch_builder/builder.cpp +++ b/ydb/core/tx/columnshard/operations/batch_builder/builder.cpp @@ -49,11 +49,11 @@ TConclusionStatus TBuildBatchesTask::DoExecute(const std::shared_ptr& /*t std::shared_ptr merger; switch (WriteData.GetWriteMeta().GetModificationType()) { case NEvWrite::EModificationType::Upsert: { - const std::vector> defaultFields = Context.GetActualSchema()->GetAbsentFields(batch->schema()); + const std::vector> defaultFields = Context.GetActualSchema()->GetAbsentFields(batch.GetContainer()->schema()); if (defaultFields.empty()) { if (!WriteData.GetWritePortions() || !Context.GetNoTxWrite()) { std::shared_ptr task = - std::make_shared(std::move(WriteData), batch, Context); + std::make_shared(std::move(WriteData), batch.GetContainer(), Context); NConveyor::TInsertServiceOperator::AsyncTaskToExecute(task); } else { NActors::TActivationContext::ActorSystem()->Send(Context.GetBufferizationPortionsActorId(), @@ -68,8 +68,8 @@ TConclusionStatus TBuildBatchesTask::DoExecute(const std::shared_ptr& /*t auto batchDefault = conclusion.DetachResult(); NArrow::NMerger::TSortableBatchPosition pos( batchDefault, 0, batchDefault->schema()->field_names(), batchDefault->schema()->field_names(), false); - merger = std::make_shared( - batch, Context.GetActualSchema(), insertionConclusion.IsSuccess() ? "" : insertionConclusion.GetErrorMessage(), pos); + merger = std::make_shared(batch, Context.GetActualSchema(), + insertionConclusion.IsSuccess() ? "" : insertionConclusion.GetErrorMessage(), pos); break; } } @@ -85,7 +85,7 @@ TConclusionStatus TBuildBatchesTask::DoExecute(const std::shared_ptr& /*t case NEvWrite::EModificationType::Delete: { if (!WriteData.GetWritePortions() || !Context.GetNoTxWrite()) { std::shared_ptr task = - std::make_shared(std::move(WriteData), batch, Context); + std::make_shared(std::move(WriteData), batch.GetContainer(), Context); NConveyor::TInsertServiceOperator::AsyncTaskToExecute(task); } else { NActors::TActivationContext::ActorSystem()->Send(Context.GetBufferizationPortionsActorId(), diff --git a/ydb/core/tx/columnshard/operations/batch_builder/merger.cpp b/ydb/core/tx/columnshard/operations/batch_builder/merger.cpp index ceb87eb027e5..8d4ddecc042f 100644 --- a/ydb/core/tx/columnshard/operations/batch_builder/merger.cpp +++ b/ydb/core/tx/columnshard/operations/batch_builder/merger.cpp @@ -60,7 +60,8 @@ NKikimr::TConclusionStatus TUpdateMerger::OnEqualKeys(const NArrow::NMerger::TSo return TConclusionStatus::Success(); } -TUpdateMerger::TUpdateMerger(const std::shared_ptr& incoming, const std::shared_ptr& actualSchema, +TUpdateMerger::TUpdateMerger(const NArrow::TContainerWithIndexes& incoming, + const std::shared_ptr& actualSchema, const TString& insertDenyReason, const std::optional& defaultExists /*= {}*/) : TBase(incoming, actualSchema) , Builder({ actualSchema->GetIndexInfo().ArrowSchema().begin(), actualSchema->GetIndexInfo().ArrowSchema().end() }) @@ -85,4 +86,12 @@ TUpdateMerger::TUpdateMerger(const std::shared_ptr& incoming } } } + +NArrow::TContainerWithIndexes TUpdateMerger::BuildResultBatch() { + auto resultBatch = Builder.Finalize(); + AFL_VERIFY(Schema->GetColumnsCount() == (ui32)resultBatch->num_columns() + IIndexInfo::SpecialColumnsCount)("schema", + Schema->GetColumnsCount())("result", resultBatch->num_columns()); + return NArrow::TContainerWithIndexes(resultBatch); +} + } diff --git a/ydb/core/tx/columnshard/operations/batch_builder/merger.h b/ydb/core/tx/columnshard/operations/batch_builder/merger.h index e503f742b18f..f1d8fcddfa84 100644 --- a/ydb/core/tx/columnshard/operations/batch_builder/merger.h +++ b/ydb/core/tx/columnshard/operations/batch_builder/merger.h @@ -15,11 +15,11 @@ class IMerger { virtual TConclusionStatus OnIncomingOnly(const NArrow::NMerger::TSortableBatchPosition& incoming) = 0; protected: std::shared_ptr Schema; - std::shared_ptr IncomingData; + NArrow::TContainerWithIndexes IncomingData; bool IncomingFinished = false; public: - IMerger(const std::shared_ptr& incoming, const std::shared_ptr& actualSchema) - : IncomingPosition(incoming, 0, actualSchema->GetPKColumnNames(), incoming->schema()->field_names(), false) + IMerger(const NArrow::TContainerWithIndexes& incoming, const std::shared_ptr& actualSchema) + : IncomingPosition(incoming.GetContainer(), 0, actualSchema->GetPKColumnNames(), incoming->schema()->field_names(), false) , Schema(actualSchema) , IncomingData(incoming) { IncomingFinished = !IncomingPosition.InitPosition(0); @@ -27,7 +27,7 @@ class IMerger { virtual ~IMerger() = default; - virtual std::shared_ptr BuildResultBatch() = 0; + virtual NArrow::TContainerWithIndexes BuildResultBatch() = 0; TConclusionStatus Finish(); @@ -45,7 +45,7 @@ class TInsertMerger: public IMerger { } public: using TBase::TBase; - virtual std::shared_ptr BuildResultBatch() override { + virtual NArrow::TContainerWithIndexes BuildResultBatch() override { return IncomingData; } }; @@ -65,9 +65,9 @@ class TReplaceMerger: public IMerger { public: using TBase::TBase; - virtual std::shared_ptr BuildResultBatch() override { + virtual NArrow::TContainerWithIndexes BuildResultBatch() override { auto result = IncomingData; - AFL_VERIFY(Filter.Apply(result)); + AFL_VERIFY(Filter.Apply(result.MutableContainer())); return result; } }; @@ -92,11 +92,9 @@ class TUpdateMerger: public IMerger { } } public: - virtual std::shared_ptr BuildResultBatch() override { - return Builder.Finalize(); - } + virtual NArrow::TContainerWithIndexes BuildResultBatch() override; - TUpdateMerger(const std::shared_ptr& incoming, const std::shared_ptr& actualSchema, + TUpdateMerger(const NArrow::TContainerWithIndexes& incoming, const std::shared_ptr& actualSchema, const TString& insertDenyReason, const std::optional& defaultExists = {}); }; diff --git a/ydb/core/tx/columnshard/operations/batch_builder/restore.cpp b/ydb/core/tx/columnshard/operations/batch_builder/restore.cpp index c7a722979f48..67b8de6f784b 100644 --- a/ydb/core/tx/columnshard/operations/batch_builder/restore.cpp +++ b/ydb/core/tx/columnshard/operations/batch_builder/restore.cpp @@ -11,9 +11,9 @@ std::unique_ptr TModificationRestoreTask::DoBui auto request = std::make_unique(LocalPathId, WriteData.GetWriteMeta().GetLockIdOptional()); request->TaskIdentifier = GetTaskId(); request->ReadToSnapshot = Snapshot; - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_RESTORE)("event", "restore_start")("count", IncomingData ? IncomingData->num_rows() : 0)( + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_RESTORE)("event", "restore_start")("count", IncomingData.HasContainer() ? IncomingData->num_rows() : 0)( "task_id", WriteData.GetWriteMeta().GetId()); - auto pkData = NArrow::TColumnOperator().VerifyIfAbsent().Extract(IncomingData, Context.GetActualSchema()->GetPKColumnNames()); + auto pkData = NArrow::TColumnOperator().VerifyIfAbsent().Extract(IncomingData.GetContainer(), Context.GetActualSchema()->GetPKColumnNames()); request->RangesFilter = TPKRangesFilter::BuildFromRecordBatchLines(pkData, false); for (auto&& i : Context.GetActualSchema()->GetIndexInfo().GetColumnIds(false)) { request->AddColumn(i, Context.GetActualSchema()->GetIndexInfo().GetColumnName(i)); @@ -49,7 +49,7 @@ NKikimr::TConclusionStatus TModificationRestoreTask::DoOnFinished() { auto batchResult = Merger->BuildResultBatch(); if (!WriteData.GetWritePortions() || !Context.GetNoTxWrite()) { std::shared_ptr task = - std::make_shared(std::move(WriteData), batchResult, Context); + std::make_shared(std::move(WriteData), batchResult.GetContainer(), Context); NConveyor::TInsertServiceOperator::AsyncTaskToExecute(task); } else { NActors::TActivationContext::ActorSystem()->Send( @@ -59,8 +59,8 @@ NKikimr::TConclusionStatus TModificationRestoreTask::DoOnFinished() { return TConclusionStatus::Success(); } -TModificationRestoreTask::TModificationRestoreTask(NEvWrite::TWriteData&& writeData, - const std::shared_ptr& merger, const TSnapshot actualSnapshot, const std::shared_ptr& incomingData, +TModificationRestoreTask::TModificationRestoreTask(NEvWrite::TWriteData&& writeData, const std::shared_ptr& merger, + const TSnapshot actualSnapshot, const NArrow::TContainerWithIndexes& incomingData, const TWritingContext& context) : TBase(context.GetTabletId(), context.GetTabletActorId(), writeData.GetWriteMeta().GetId() + "::" + ::ToString(writeData.GetWriteMeta().GetWriteId())) diff --git a/ydb/core/tx/columnshard/operations/batch_builder/restore.h b/ydb/core/tx/columnshard/operations/batch_builder/restore.h index b63145c7807e..67e061e81965 100644 --- a/ydb/core/tx/columnshard/operations/batch_builder/restore.h +++ b/ydb/core/tx/columnshard/operations/batch_builder/restore.h @@ -15,7 +15,7 @@ class TModificationRestoreTask: public NDataReader::IRestoreTask, public NColumn std::shared_ptr Merger; const ui64 LocalPathId; const TSnapshot Snapshot; - std::shared_ptr IncomingData; + NArrow::TContainerWithIndexes IncomingData; const TWritingContext Context; virtual std::unique_ptr DoBuildRequestInitiator() const override; @@ -31,8 +31,8 @@ class TModificationRestoreTask: public NDataReader::IRestoreTask, public NColumn virtual TDuration GetTimeout() const override; - TModificationRestoreTask(NEvWrite::TWriteData&& writeData, const std::shared_ptr& merger, - const TSnapshot actualSnapshot, const std::shared_ptr& incomingData, const TWritingContext& context); + TModificationRestoreTask(NEvWrite::TWriteData&& writeData, const std::shared_ptr& merger, const TSnapshot actualSnapshot, + const NArrow::TContainerWithIndexes& incomingData, const TWritingContext& context); }; } // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/operations/slice_builder/pack_builder.cpp b/ydb/core/tx/columnshard/operations/slice_builder/pack_builder.cpp index bbea6c07d710..e335268253a8 100644 --- a/ydb/core/tx/columnshard/operations/slice_builder/pack_builder.cpp +++ b/ydb/core/tx/columnshard/operations/slice_builder/pack_builder.cpp @@ -70,9 +70,8 @@ class TPortionWriteController: public NColumnShard::IWriteController, class TSliceToMerge { private: - YDB_READONLY_DEF(std::vector>, Batches); + YDB_READONLY_DEF(std::vector>, Batches); std::vector SequentialWriteId; - std::vector> Schemas; const ui64 PathId; const NEvWrite::EModificationType ModificationType; @@ -82,31 +81,57 @@ class TSliceToMerge { , ModificationType(modificationType) { } - void Add(const std::shared_ptr& rb, const std::shared_ptr& data) { + void Add(const NArrow::TContainerWithIndexes& rb, const std::shared_ptr& data) { if (!rb) { return; } Batches.emplace_back(rb); SequentialWriteId.emplace_back(data->GetWriteMeta().GetWriteId()); - Schemas.emplace_back(rb->schema()); } - void Finalize(const NOlap::TWritingContext& context, std::vector& result) { + [[nodiscard]] TConclusionStatus Finalize(const NOlap::TWritingContext& context, std::vector& result) { if (Batches.size() == 0) { - return; + return TConclusionStatus::Success(); } if (Batches.size() == 1) { - auto portionConclusion = context.GetActualSchema()->PrepareForWrite(context.GetActualSchema(), PathId, Batches.front(), + auto portionConclusion = context.GetActualSchema()->PrepareForWrite(context.GetActualSchema(), PathId, Batches.front().GetContainer(), ModificationType, context.GetStoragesManager(), context.GetSplitterCounters()); result.emplace_back(portionConclusion.DetachResult()); } else { ui32 idx = 0; std::vector> containers; ui32 recordsCountSum = 0; - const std::shared_ptr dataSchema = Batches.front()->schema(); + auto indexes = NArrow::TOrderedColumnIndexesImpl::MergeColumnIdxs(Batches); + std::shared_ptr dataSchema; + const auto& indexInfo = context.GetActualSchema()->GetIndexInfo(); for (auto&& i : Batches) { - auto gContainer = std::make_shared(i); -// AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_WRITE)("data", NArrow::DebugJson(i, 5, 5))("write_id", SequentialWriteId[idx]); + std::shared_ptr gContainer; + if (i.GetColumnIndexes().size() == indexes.size()) { + if (!dataSchema) { + dataSchema = i->schema(); + } + gContainer = std::make_shared(i.GetContainer()); + } else { + gContainer = std::make_shared(i->num_rows()); + auto itBatchIndexes = i.GetColumnIndexes().begin(); + AFL_VERIFY(i.GetColumnIndexes().size() < indexes.size()); + for (auto itAllIndexes = indexes.begin(); itAllIndexes != indexes.end(); ++itAllIndexes) { + if (itBatchIndexes == i.GetColumnIndexes().end() || *itAllIndexes < *itBatchIndexes) { + auto defaultColumn = indexInfo.BuildDefaultColumn(*itAllIndexes, i->num_rows(), false); + if (defaultColumn.IsFail()) { + return defaultColumn; + } + gContainer->AddField(context.GetActualSchema()->GetFieldByIndexVerified(*itAllIndexes), defaultColumn.DetachResult()).Validate(); + } else { + AFL_VERIFY(*itAllIndexes == *itBatchIndexes); + gContainer->AddField(context.GetActualSchema()->GetFieldByIndexVerified(*itAllIndexes), + i->column(itBatchIndexes - i.GetColumnIndexes().begin())) + .Validate(); + ++itBatchIndexes; + } + } + } + // AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_WRITE)("data", NArrow::DebugJson(i, 5, 5))("write_id", SequentialWriteId[idx]); recordsCountSum += i->num_rows(); gContainer ->AddField(IIndexInfo::GetWriteIdField(), NArrow::TStatusValidator::GetValid(arrow::MakeArrayFromScalar( @@ -115,6 +140,9 @@ class TSliceToMerge { ++idx; containers.emplace_back(gContainer); } + if (!dataSchema) { + dataSchema = indexInfo.GetColumnsSchemaByOrderedIndexes(indexes); + } NArrow::NMerger::TMergePartialStream stream( context.GetActualSchema()->GetIndexInfo().GetReplaceKey(), dataSchema, false, { IIndexInfo::GetWriteIdField()->name() }); for (auto&& i : containers) { @@ -126,6 +154,7 @@ class TSliceToMerge { ModificationType, context.GetStoragesManager(), context.GetSplitterCounters()); result.emplace_back(portionConclusion.DetachResult()); } + return TConclusionStatus::Success(); } }; @@ -150,25 +179,25 @@ TConclusionStatus TBuildPackSlicesTask::DoExecute(const std::shared_ptr& continue; } auto batches = NArrow::NMerger::TRWSortableBatchPosition::SplitByBordersInIntervalPositions( - originalBatch, Context.GetActualSchema()->GetIndexInfo().GetPrimaryKey()->field_names(), splitPositions); + originalBatch.GetContainer(), Context.GetActualSchema()->GetIndexInfo().GetPrimaryKey()->field_names(), splitPositions); std::shared_ptr pkBatch = - NArrow::TColumnOperator().Extract(originalBatch, Context.GetActualSchema()->GetIndexInfo().GetPrimaryKey()->fields()); + NArrow::TColumnOperator().Extract(originalBatch.GetContainer(), Context.GetActualSchema()->GetIndexInfo().GetPrimaryKey()->fields()); writeResults.emplace_back(unit.GetData()->GetWriteMeta(), unit.GetData()->GetSize(), pkBatch, false, originalBatch->num_rows()); ui32 idx = 0; for (auto&& batch : batches) { if (!!batch) { - slicesToMerge[idx].Add(batch, unit.GetData()); + slicesToMerge[idx].Add(originalBatch.BuildWithAnotherContainer(batch), unit.GetData()); } ++idx; } } std::vector portionsToWrite; for (auto&& i : slicesToMerge) { - i.Finalize(Context, portionsToWrite); + i.Finalize(Context, portionsToWrite).Validate(); } auto actions = WriteUnits.front().GetData()->GetBlobsAction(); - auto writeController = std::make_shared( - Context.GetTabletActorId(), actions, std::move(writeResults), std::move(portionsToWrite)); + auto writeController = + std::make_shared(Context.GetTabletActorId(), actions, std::move(writeResults), std::move(portionsToWrite)); if (actions->NeedDraftTransaction()) { TActorContext::AsActorContext().Send( Context.GetTabletActorId(), std::make_unique(writeController)); @@ -177,4 +206,4 @@ TConclusionStatus TBuildPackSlicesTask::DoExecute(const std::shared_ptr& } return TConclusionStatus::Success(); } -} // namespace NKikimr::NOlap +} // namespace NKikimr::NOlap::NWritingPortions diff --git a/ydb/core/tx/columnshard/operations/slice_builder/pack_builder.h b/ydb/core/tx/columnshard/operations/slice_builder/pack_builder.h index 2e90adebf5f4..fdde60fea882 100644 --- a/ydb/core/tx/columnshard/operations/slice_builder/pack_builder.h +++ b/ydb/core/tx/columnshard/operations/slice_builder/pack_builder.h @@ -11,14 +11,14 @@ namespace NKikimr::NOlap::NWritingPortions { class TWriteUnit { private: YDB_READONLY_DEF(std::shared_ptr, Data); - YDB_READONLY_DEF(std::shared_ptr, Batch); + YDB_READONLY_DEF(NArrow::TContainerWithIndexes, Batch); public: - TWriteUnit(const std::shared_ptr& data, const std::shared_ptr& batch) + TWriteUnit(const std::shared_ptr& data, const NArrow::TContainerWithIndexes& batch) : Data(data) , Batch(batch) { AFL_VERIFY(Data->GetWritePortions()); - AFL_VERIFY(Batch); + AFL_VERIFY(Batch.HasContainer()); } }; diff --git a/ydb/library/accessor/validator_simple.cpp b/ydb/library/accessor/validator_simple.cpp new file mode 100644 index 000000000000..1edc584b2cec --- /dev/null +++ b/ydb/library/accessor/validator_simple.cpp @@ -0,0 +1 @@ +#include "validator_simple.h" diff --git a/ydb/library/accessor/validator_simple.h b/ydb/library/accessor/validator_simple.h new file mode 100644 index 000000000000..54d44743aef1 --- /dev/null +++ b/ydb/library/accessor/validator_simple.h @@ -0,0 +1,16 @@ +#pragma once +#include + +class TSimpleValidator { +public: + template + static const T& CheckNotNull(const T& object) { + Y_ABORT_UNLESS(!!object); + return object; + } + template + static T&& CheckNotNull(T&& object) { + Y_ABORT_UNLESS(!!object); + return std::forward(object); + } +}; diff --git a/ydb/library/accessor/ya.make b/ydb/library/accessor/ya.make index 876a37f1f70b..057baaa27ce9 100644 --- a/ydb/library/accessor/ya.make +++ b/ydb/library/accessor/ya.make @@ -8,6 +8,7 @@ SRCS( accessor.cpp validator.cpp positive_integer.cpp + validator_simple.cpp ) END()