Skip to content

Commit

Permalink
Merge different schemas (#13192)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Jan 8, 2025
1 parent 96abd52 commit e7c573f
Show file tree
Hide file tree
Showing 23 changed files with 356 additions and 81 deletions.
30 changes: 27 additions & 3 deletions ydb/core/formats/arrow/process_columns.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,27 @@ TConclusion<std::shared_ptr<TDataContainer>> ReorderImpl(

} // namespace

// Copies the given column indexes and verifies that they are strictly ascending
// (every element is less than its successor). Empty and single-element lists pass trivially.
TOrderedColumnIndexesImpl::TOrderedColumnIndexesImpl(const std::vector<ui32>& columnIndexes)
    : ColumnIndexes(columnIndexes) {
    const auto count = ColumnIndexes.size();
    ui32 i = 0;
    while (i + 1 < count) {
        // Adjacent-pair check: equal neighbours (duplicates) are rejected as well.
        AFL_VERIFY(ColumnIndexes[i] < ColumnIndexes[i + 1]);
        ++i;
    }
}

// Takes ownership of the given column indexes and verifies that they are strictly ascending
// (every element is less than its successor).
TOrderedColumnIndexesImpl::TOrderedColumnIndexesImpl(std::vector<ui32>&& columnIndexes)
    : ColumnIndexes(std::move(columnIndexes)) {
    // Fewer than two elements means there is no adjacent pair to compare.
    if (ColumnIndexes.size() < 2) {
        return;
    }
    const auto lastPosition = ColumnIndexes.size() - 1;
    for (ui32 i = 0; i < lastPosition; ++i) {
        AFL_VERIFY(ColumnIndexes[i] < ColumnIndexes[i + 1]);
    }
}

// Builds the identity index list 0, 1, ..., columnsCount - 1.
TOrderedColumnIndexesImpl::TOrderedColumnIndexesImpl(const ui32 columnsCount) {
    ColumnIndexes.reserve(columnsCount);
    ui32 nextIndex = 0;
    while (nextIndex != columnsCount) {
        ColumnIndexes.push_back(nextIndex);
        ++nextIndex;
    }
}

std::shared_ptr<arrow::RecordBatch> TColumnOperator::Extract(
const std::shared_ptr<arrow::RecordBatch>& incoming, const std::vector<std::string>& columnNames) {
return ExtractImpl(AbsentColumnPolicy, incoming, columnNames);
Expand Down Expand Up @@ -259,7 +280,7 @@ TConclusion<TSchemaSubset> TColumnOperator::BuildSequentialSubset(
}
namespace {
template <class TDataContainer>
TConclusion<std::shared_ptr<TDataContainer>> AdaptIncomingToDestinationExtImpl(const std::shared_ptr<TDataContainer>& incoming,
TConclusion<TContainerWithIndexes<TDataContainer>> AdaptIncomingToDestinationExtImpl(const std::shared_ptr<TDataContainer>& incoming,
const TSchemaLiteView& dstSchema, const std::function<TConclusionStatus(const ui32, const i32)>& checker,
const std::function<i32(const std::string&)>& nameResolver, const TColumnOperator::ECheckFieldTypesPolicy differentColumnTypesPolicy,
const TColumnOperator::EAbsentFieldPolicy absentColumnPolicy) {
Expand Down Expand Up @@ -318,14 +339,17 @@ TConclusion<std::shared_ptr<TDataContainer>> AdaptIncomingToDestinationExtImpl(c
std::vector<std::shared_ptr<typename NAdapter::TDataBuilderPolicy<TDataContainer>::TColumn>> columns;
columns.reserve(resultColumns.size());
fields.reserve(resultColumns.size());
std::vector<ui32> indexes;
for (auto&& i : resultColumns) {
fields.emplace_back(dstSchema.field(i.Index));
columns.emplace_back(i.Column);
indexes.emplace_back(i.Index);
}
return NAdapter::TDataBuilderPolicy<TDataContainer>::Build(std::make_shared<arrow::Schema>(fields), std::move(columns), incoming->num_rows());
return TContainerWithIndexes<TDataContainer>(indexes,
NAdapter::TDataBuilderPolicy<TDataContainer>::Build(std::make_shared<arrow::Schema>(fields), std::move(columns), incoming->num_rows()));
}
} // namespace
TConclusion<std::shared_ptr<arrow::RecordBatch>> TColumnOperator::AdaptIncomingToDestinationExt(
TConclusion<TContainerWithIndexes<arrow::RecordBatch>> TColumnOperator::AdaptIncomingToDestinationExt(
const std::shared_ptr<arrow::RecordBatch>& incoming, const TSchemaLiteView& dstSchema,
const std::function<TConclusionStatus(const ui32, const i32)>& checker, const std::function<i32(const std::string&)>& nameResolver) const {
return AdaptIncomingToDestinationExtImpl(incoming, dstSchema, checker, nameResolver, DifferentColumnTypesPolicy, AbsentColumnPolicy);
Expand Down
112 changes: 111 additions & 1 deletion ydb/core/formats/arrow/process_columns.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
#pragma once
#include <ydb/library/accessor/accessor.h>
#include <ydb/library/accessor/validator_simple.h>
#include <ydb/library/conclusion/result.h>

#include <contrib/libs/apache/arrow/cpp/src/arrow/type.h>

#include <algorithm>
#include <functional>
#include <memory>
#include <vector>

namespace NKikimr::NArrow {
Expand All @@ -10,6 +13,113 @@ class TSchemaSubset;
class TSchemaLite;
class TSchemaLiteView;

class TOrderedColumnIndexesImpl {
private:
YDB_READONLY_DEF(std::vector<ui32>, ColumnIndexes);

public:
TOrderedColumnIndexesImpl() = default;

explicit TOrderedColumnIndexesImpl(const ui32 columnsCount);
explicit TOrderedColumnIndexesImpl(const std::vector<ui32>& columnIndexes);
explicit TOrderedColumnIndexesImpl(std::vector<ui32>&& columnIndexes);

template <class TContainerWithIndexes>
static std::vector<ui32> MergeColumnIdxs(const std::vector<TContainerWithIndexes>& sources) {
class TIterator {
private:
std::vector<ui32>::const_iterator ItCurrent;
std::vector<ui32>::const_iterator ItFinish;

public:
TIterator(const std::vector<ui32>& indexes)
: ItCurrent(indexes.begin())
, ItFinish(indexes.end()) {
}

bool operator<(const TIterator& item) const {
return *ItCurrent > *item.ItCurrent;
}

bool IsValid() const {
return ItCurrent != ItFinish;
}

ui32 operator*() const {
return *ItCurrent;
}

bool Next() {
return ++ItCurrent != ItFinish;
}
};

std::vector<TIterator> heapToMerge;
for (auto&& i : sources) {
heapToMerge.emplace_back(TIterator(i.GetColumnIndexes()));
if (!heapToMerge.back().IsValid()) {
heapToMerge.pop_back();
}
}
std::make_heap(heapToMerge.begin(), heapToMerge.end());
std::vector<ui32> result;
while (heapToMerge.size()) {
std::pop_heap(heapToMerge.begin(), heapToMerge.end());
if (result.empty() || result.back() != *heapToMerge.back()) {
result.emplace_back(*heapToMerge.back());
}
if (!heapToMerge.back().Next()) {
heapToMerge.pop_back();
} else {
std::push_heap(heapToMerge.begin(), heapToMerge.end());
}
}
return result;
}
};

// Couples a data container (shared pointer) with the list of column indexes kept by
// TOrderedColumnIndexesImpl; the constructors check that both agree on the column count.
template <class TDataContainer>
class TContainerWithIndexes: public TOrderedColumnIndexesImpl {
private:
    using TBase = TOrderedColumnIndexesImpl;
    YDB_ACCESSOR_DEF(std::shared_ptr<TDataContainer>, Container);

public:
    TContainerWithIndexes() = default;

    // Stores the container together with explicit indexes.
    // Aborts unless the container has exactly columnIndexes.size() columns;
    // a null container is accepted only with an empty index list.
    TContainerWithIndexes(const std::vector<ui32>& columnIndexes, const std::shared_ptr<TDataContainer>& container)
        : TBase(columnIndexes)
        , Container(container) {
        if (!Container) {
            Y_ABORT_UNLESS(!columnIndexes.size());
        } else {
            Y_ABORT_UNLESS((ui32)Container->num_columns() == columnIndexes.size());
        }
    }

    // Wraps a container (validated by TSimpleValidator::CheckNotNull) with the
    // identity index list 0..num_columns()-1.
    explicit TContainerWithIndexes(const std::shared_ptr<TDataContainer>& container)
        : TBase(TSimpleValidator::CheckNotNull(container)->num_columns())
        , Container(container) {
    }

    // Returns a new wrapper that reuses the current indexes with another container.
    TContainerWithIndexes<TDataContainer> BuildWithAnotherContainer(const std::shared_ptr<TDataContainer>& container) const {
        return TContainerWithIndexes(GetColumnIndexes(), container);
    }

    // True when a container is attached.
    bool HasContainer() const {
        return Container != nullptr;
    }

    // True when no container is attached.
    bool operator!() const {
        return !Container;
    }

    // Raw access to the container; yields nullptr when no container is attached.
    const TDataContainer* operator->() const {
        return Container.get();
    }
};

class TColumnOperator {
public:
enum class EAbsentFieldPolicy {
Expand Down Expand Up @@ -59,7 +169,7 @@ class TColumnOperator {
return *this;
}

TConclusion<std::shared_ptr<arrow::RecordBatch>> AdaptIncomingToDestinationExt(const std::shared_ptr<arrow::RecordBatch>& incoming,
TConclusion<TContainerWithIndexes<arrow::RecordBatch>> AdaptIncomingToDestinationExt(const std::shared_ptr<arrow::RecordBatch>& incoming,
const TSchemaLiteView& dstSchema, const std::function<TConclusionStatus(const ui32, const i32)>& checker,
const std::function<i32(const std::string&)>& nameResolver) const;

Expand Down
3 changes: 3 additions & 0 deletions ydb/core/kqp/ut/olap/helpers/typed_local.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ TString TTypedLocalHelper::GetTestTableSchema() const {
if (TypeName) {
result = R"(Columns { Name: "field" Type: ")" + TypeName + "\"}";
}
if (TypeName1) {
result += R"(Columns { Name: "field1" Type: ")" + TypeName1 + "\"}";
}
result += R"(
Columns { Name: "pk_int" Type: "Int64" NotNull: true }
Columns { Name: "ts" Type: "Timestamp" }
Expand Down
18 changes: 15 additions & 3 deletions ydb/core/kqp/ut/olap/helpers/typed_local.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ class TTypedLocalHelper: public Tests::NCS::THelper {
private:
using TBase = Tests::NCS::THelper;
const TString TypeName;
const TString TypeName1;
TKikimrRunner& KikimrRunner;
const TString TablePath;
const TString TableName;
Expand All @@ -37,6 +38,18 @@ class TTypedLocalHelper: public Tests::NCS::THelper {
SetShardingMethod("HASH_FUNCTION_CONSISTENCY_64");
}

// Variant of the helper that also stores a second type name (TypeName1).
// The table path is "/Root/<table>" when storeName is empty and
// "/Root/<store>/<table>" otherwise.
TTypedLocalHelper(const TString& typeName, const TString& typeName1, TKikimrRunner& kikimrRunner, const TString& tableName = "olapTable",
    const TString& storeName = "olapStore")
    : TBase(kikimrRunner.GetTestServer())
    , TypeName(typeName)
    , TypeName1(typeName1)
    , KikimrRunner(kikimrRunner)
    , TablePath("/Root/" + (storeName.empty() ? tableName : storeName + "/" + tableName))
    , TableName(tableName)
    , StoreName(storeName) {
    SetShardingMethod("HASH_FUNCTION_CONSISTENCY_64");
}

class TSimultaneousWritingSession {
private:
bool Finished = false;
Expand All @@ -54,13 +67,12 @@ class TTypedLocalHelper: public Tests::NCS::THelper {
}

template <class TFiller>
void FillTable(const TFiller& fillPolicy, const double pkKff = 0, const ui32 numRows = 800000) const {
AFL_VERIFY(!Finished);
void FillTable(const TString& fieldName, const TFiller& fillPolicy, const double pkKff = 0, const ui32 numRows = 800000) const {
std::vector<NArrow::NConstruction::IArrayBuilder::TPtr> builders;
builders.emplace_back(
NArrow::NConstruction::TSimpleArrayConstructor<NArrow::NConstruction::TIntSeqFiller<arrow::Int64Type>>::BuildNotNullable(
"pk_int", numRows * pkKff));
builders.emplace_back(std::make_shared<NArrow::NConstruction::TSimpleArrayConstructor<TFiller>>("field", fillPolicy));
builders.emplace_back(std::make_shared<NArrow::NConstruction::TSimpleArrayConstructor<TFiller>>(fieldName, fillPolicy));
NArrow::NConstruction::TRecordBatchConstructor batchBuilder(builders);
std::shared_ptr<arrow::RecordBatch> batch = batchBuilder.BuildBatch(numRows);
SendDataViaActorSystem(TablePath, batch, Ydb::StatusIds::SUCCESS);
Expand Down
40 changes: 37 additions & 3 deletions ydb/core/kqp/ut/olap/write_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -202,11 +202,11 @@ Y_UNIT_TEST_SUITE(KqpOlapWrite) {
TTypedLocalHelper helper("Utf8", kikimr);
helper.CreateTestOlapTable();
auto writeSession = helper.StartWriting("/Root/olapStore/olapTable");
writeSession.FillTable(NArrow::NConstruction::TStringPoolFiller(1, 1, "aaa", 1), 0, 800000);
writeSession.FillTable("field", NArrow::NConstruction::TStringPoolFiller(1, 1, "aaa", 1), 0, 800000);
Sleep(TDuration::Seconds(1));
writeSession.FillTable(NArrow::NConstruction::TStringPoolFiller(1, 1, "bbb", 1), 0.5, 800000);
writeSession.FillTable("field", NArrow::NConstruction::TStringPoolFiller(1, 1, "bbb", 1), 0.5, 800000);
Sleep(TDuration::Seconds(1));
writeSession.FillTable(NArrow::NConstruction::TStringPoolFiller(1, 1, "ccc", 1), 0.75, 800000);
writeSession.FillTable("field", NArrow::NConstruction::TStringPoolFiller(1, 1, "ccc", 1), 0.75, 800000);
Sleep(TDuration::Seconds(1));
writeSession.Finalize();

Expand All @@ -228,6 +228,40 @@ Y_UNIT_TEST_SUITE(KqpOlapWrite) {
UNIT_ASSERT_VALUES_EQUAL(GetUtf8(rows[2].at("field")), "ccc");
}

// Writes three batches inside one writing session, where the second batch fills
// "field1" instead of "field", and then checks the per-value row counts of "field".
// Expected groups (as asserted below): 200000 rows with an empty "field",
// 400000 rows with "aaa" and 800000 rows with "ccc".
Y_UNIT_TEST(MultiWriteInTimeDiffSchemas) {
    auto settings = TKikimrSettings().SetWithSampleTables(false);
    settings.AppConfig.MutableColumnShardConfig()->SetWritingBufferDurationMs(15000);
    TKikimrRunner kikimr(settings);
    Tests::NCommon::TLoggerInit(kikimr).Initialize();
    // Two Utf8 type names: the helper adds a "field1" column next to "field".
    TTypedLocalHelper helper("Utf8", "Utf8", kikimr);
    helper.CreateTestOlapTable();
    auto writeGuard = helper.StartWriting("/Root/olapStore/olapTable");
    writeGuard.FillTable("field", NArrow::NConstruction::TStringPoolFiller(1, 1, "aaa", 1), 0, 800000);
    Sleep(TDuration::Seconds(1));
    writeGuard.FillTable("field1", NArrow::NConstruction::TStringPoolFiller(1, 1, "bbb", 1), 0.5, 800000);
    Sleep(TDuration::Seconds(1));
    writeGuard.FillTable("field", NArrow::NConstruction::TStringPoolFiller(1, 1, "ccc", 1), 0.75, 800000);
    Sleep(TDuration::Seconds(1));
    writeGuard.Finalize();

    auto selectQuery = TString(R"(
SELECT
field, count(*) as count,
FROM `/Root/olapStore/olapTable`
GROUP BY field
ORDER BY field
)");

    auto tableClient = kikimr.GetTableClient();
    auto rows = ExecuteScanQuery(tableClient, selectQuery);
    // Guard the indexed accesses below: an unexpected number of groups must be
    // reported as a test failure, not as an out-of-bounds read.
    UNIT_ASSERT_VALUES_EQUAL(rows.size(), 3u);
    UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[0].at("count")), 200000);
    UNIT_ASSERT_VALUES_EQUAL(GetUtf8(rows[0].at("field")), "");
    UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[1].at("count")), 400000);
    UNIT_ASSERT_VALUES_EQUAL(GetUtf8(rows[1].at("field")), "aaa");
    UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[2].at("count")), 800000);
    UNIT_ASSERT_VALUES_EQUAL(GetUtf8(rows[2].at("field")), "ccc");
}

Y_UNIT_TEST(WriteDeleteCleanGC) {
auto csController = NKikimr::NYDBTest::TControllers::RegisterCSControllerGuard<NKikimr::NOlap::TWaitCompactionController>();
csController->SetSmallSizeDetector(1000000);
Expand Down
10 changes: 5 additions & 5 deletions ydb/core/testlib/cs_helper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ void THelperSchemaless::SendDataViaActorSystem(TString testTable, std::shared_pt
Cerr << "\n";
}
UNIT_ASSERT_VALUES_EQUAL(op.status(), expectedStatus);
});
});

TDispatchOptions options;
options.CustomFinalCondition = [&]() {
Expand Down Expand Up @@ -228,7 +228,7 @@ void THelper::CreateSchemaOlapTablesWithStore(const TString tableSchema, TVector
}

void THelper::CreateOlapTablesWithStore(TVector<TString> tableNames /*= {"olapTable"}*/, TString storeName /*= "olapStore"*/, ui32 storeShardsCount /*= 4*/, ui32 tableShardsCount /*= 3*/) {
CreateSchemaOlapTablesWithStore(GetTestTableSchema(), tableNames, storeName, storeShardsCount, tableShardsCount);
CreateSchemaOlapTablesWithStore(GetTestTableSchema(), tableNames, storeName, storeShardsCount, tableShardsCount);
}

void THelper::CreateSchemaOlapTables(const TString tableSchema, TVector<TString> tableNames, ui32 tableShardsCount) {
Expand All @@ -254,15 +254,15 @@ void THelper::CreateSchemaOlapTables(const TString tableSchema, TVector<TString>
}

void THelper::CreateOlapTables(TVector<TString> tableNames /*= {"olapTable"}*/, ui32 tableShardsCount /*= 3*/) {
CreateSchemaOlapTables(GetTestTableSchema(), tableNames, tableShardsCount);
CreateSchemaOlapTables(GetTestTableSchema(), tableNames, tableShardsCount);
}

// Clickbench table

std::shared_ptr<arrow::Schema> TCickBenchHelper::GetArrowSchema() const {
return std::make_shared<arrow::Schema>(
std::vector<std::shared_ptr<arrow::Field>> {
arrow::field("WatchID", arrow::int64(), false),
arrow::field("WatchID", arrow::int64(), false),
arrow::field("JavaEnable", arrow::int16(), false),
arrow::field("Title", arrow::utf8(), false),
arrow::field("GoodEvent", arrow::int16(), false),
Expand Down Expand Up @@ -430,7 +430,7 @@ std::shared_ptr<arrow::RecordBatch> TCickBenchHelper::TestArrowBatch(ui64, ui64
std::shared_ptr<arrow::Schema> TTableWithNullsHelper::GetArrowSchema() const {
return std::make_shared<arrow::Schema>(
std::vector<std::shared_ptr<arrow::Field>>{
arrow::field("id", arrow::int32(), false),
arrow::field("id", arrow::int32(), false),
arrow::field("resource_id", arrow::utf8()),
arrow::field("level", arrow::int32()),
arrow::field("binary_str", arrow::binary()),
Expand Down
30 changes: 29 additions & 1 deletion ydb/core/tx/columnshard/engines/scheme/index_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -165,14 +165,28 @@ std::shared_ptr<arrow::Field> TIndexInfo::GetColumnFieldVerified(const ui32 colu
}

std::shared_ptr<arrow::Schema> TIndexInfo::GetColumnsSchema(const std::set<ui32>& columnIds) const {
Y_ABORT_UNLESS(columnIds.size());
AFL_VERIFY(columnIds.size());
std::vector<std::shared_ptr<arrow::Field>> fields;
for (auto&& i : columnIds) {
fields.emplace_back(GetColumnFieldVerified(i));
}
return std::make_shared<arrow::Schema>(fields);
}

std::shared_ptr<arrow::Schema> TIndexInfo::GetColumnsSchemaByOrderedIndexes(const std::vector<ui32>& columnIdxs) const {
AFL_VERIFY(columnIdxs.size());
std::vector<std::shared_ptr<arrow::Field>> fields;
std::optional<ui32> predColumnIdx;
for (auto&& i : columnIdxs) {
if (predColumnIdx) {
AFL_VERIFY(*predColumnIdx < i);
}
predColumnIdx = i;
fields.emplace_back(ArrowSchemaWithSpecials()->GetFieldByIndexVerified(i));
}
return std::make_shared<arrow::Schema>(fields);
}

std::shared_ptr<arrow::Schema> TIndexInfo::GetColumnSchema(const ui32 columnId) const {
return GetColumnsSchema({ columnId });
}
Expand Down Expand Up @@ -621,4 +635,18 @@ TIndexInfo TIndexInfo::BuildDefault() {
return result;
}

// Produces an array of rowsCount values for the column at fieldIndex, filled with the
// column's external default value.
// When the column has no default and is not nullable: with force == true the value from
// NArrow::DefaultScalar for the field type is used, otherwise a failure status is returned.
TConclusion<std::shared_ptr<arrow::Array>> TIndexInfo::BuildDefaultColumn(
    const ui32 fieldIndex, const ui32 rowsCount, const bool force) const {
    auto defaultValue = GetColumnExternalDefaultValueByIndexVerified(fieldIndex);
    auto field = ArrowSchemaWithSpecials()->GetFieldByIndexVerified(fieldIndex);
    const bool missingRequiredDefault = !defaultValue && !IsNullableVerifiedByIndex(fieldIndex);
    if (missingRequiredDefault && !force) {
        return TConclusionStatus::Fail("not nullable field with no default: " + field->name());
    }
    if (missingRequiredDefault) {
        // Forced mode: substitute the type's default scalar.
        defaultValue = NArrow::DefaultScalar(field->type());
    }
    return NArrow::TThreadSimpleArraysCache::Get(field->type(), defaultValue, rowsCount);
}

} // namespace NKikimr::NOlap
Loading

0 comments on commit e7c573f

Please sign in to comment.