Skip to content

Commit

Permalink
Insert into nullable PK
Browse files Browse the repository at this point in the history
  • Loading branch information
zverevgeny committed Jan 7, 2025
1 parent dcb70d0 commit 8f7e630
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 25 deletions.
41 changes: 41 additions & 0 deletions ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3074,6 +3074,47 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
Y_UNIT_TEST_TWIN(DeleteAbsentMultipleShards, Reboot) {
TestDeleteAbsent(2, Reboot);
}

Y_UNIT_TEST(InsertIntoNullablePK) {
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableColumnShardConfig()->SetAllowNullableColumnsInPK(true);
auto settings = TKikimrSettings().SetAppConfig(appConfig).SetWithSampleTables(false);
TTestHelper testHelper(settings);

TVector<TTestHelper::TColumnSchema> schema = {
TTestHelper::TColumnSchema().SetName("pk1").SetType(NScheme::NTypeIds::Int64).SetNullable(true),
TTestHelper::TColumnSchema().SetName("pk2").SetType(NScheme::NTypeIds::Int32).SetNullable(true),
TTestHelper::TColumnSchema().SetName("value").SetType(NScheme::NTypeIds::String).SetNullable(true),
};
TTestHelper::TColumnTable testTable;
testTable.SetName("/Root/ttt").SetPrimaryKey({ "pk1", "pk2" }).SetSharding({ "pk1" }).SetSchema(schema);
testHelper.CreateTable(testTable);
auto client = testHelper.GetKikimr().GetQueryClient();
const auto result = client
.ExecuteQuery(
R"(
INSERT INTO `/Root/ttt` (pk1, pk2, value) VALUES
(1, 2, "value"),
(null, 2, "value"),
(1, null, "value"),
(null, null, "value")
)",
NYdb::NQuery::TTxControl::BeginTx().CommitTx())
.GetValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
{
const auto resultSelect = client
.ExecuteQuery(
"SELECT * FROM `/Root/ttt`",
NYdb::NQuery::TTxControl::BeginTx().CommitTx())
.GetValueSync();
UNIT_ASSERT_C(resultSelect.IsSuccess(), resultSelect.GetIssues().ToString());
const auto resultSets = resultSelect.GetResultSets();
UNIT_ASSERT_VALUES_EQUAL(resultSets.size(), 1);
const auto resultSet = resultSets[0];
UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), 4);
}
}
}

}
47 changes: 22 additions & 25 deletions ydb/core/tx/columnshard/engines/scheme/versions/abstract_scheme.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
#include <ydb/core/tx/columnshard/splitter/batch_slice.h>

#include <ydb/library/formats/arrow/simple_arrays_cache.h>
#include <ydb/core/base/appdata_fwd.h>
#include <ydb/core/protos/config.pb.h>

#include <util/string/join.h>

Expand Down Expand Up @@ -92,36 +94,31 @@ TConclusion<std::shared_ptr<arrow::RecordBatch>> ISnapshotSchema::PrepareForModi
if (targetIdx == -1) {
return TConclusionStatus::Success();
}
const auto hasNull = NArrow::HasNulls(incomingBatch->column(incomingIdx));
const std::optional<i32> pkFieldIdx = GetIndexInfo().GetPKColumnIndexByIndexVerified(targetIdx);
if (!NArrow::HasNulls(incomingBatch->column(incomingIdx))) {
if (pkFieldIdx) {
AFL_VERIFY(*pkFieldIdx < (i32)pkColumns.size());
AFL_VERIFY(!pkColumns[*pkFieldIdx]);
pkColumns[*pkFieldIdx] = incomingBatch->column(incomingIdx);
++pkColumnsCount;
}
return TConclusionStatus::Success();
}
if (pkFieldIdx) {
if (pkFieldIdx && hasNull && !AppData()->ColumnShardConfig.GetAllowNullableColumnsInPK()) {
return TConclusionStatus::Fail("null data for pk column is impossible for '" + dstSchema.field(targetIdx)->name() + "'");
}
switch (mType) {
case NEvWrite::EModificationType::Replace:
case NEvWrite::EModificationType::Insert:
case NEvWrite::EModificationType::Upsert: {
if (GetIndexInfo().IsNullableVerifiedByIndex(targetIdx)) {
return TConclusionStatus::Success();
}
if (GetIndexInfo().GetColumnExternalDefaultValueByIndexVerified(targetIdx)) {
return TConclusionStatus::Success();
} else {
return TConclusionStatus::Fail("empty field for non-default column: '" + dstSchema.field(targetIdx)->name() + "'");
}
if (hasNull) {
switch (mType) {
case NEvWrite::EModificationType::Replace:
case NEvWrite::EModificationType::Insert:
case NEvWrite::EModificationType::Upsert: {
if (!GetIndexInfo().IsNullableVerifiedByIndex(targetIdx) && !GetIndexInfo().GetColumnExternalDefaultValueByIndexVerified(targetIdx))
return TConclusionStatus::Fail("empty field for non-default column: '" + dstSchema.field(targetIdx)->name() + "'");
}
case NEvWrite::EModificationType::Delete:
case NEvWrite::EModificationType::Update:
break;
}
case NEvWrite::EModificationType::Delete:
case NEvWrite::EModificationType::Update:
return TConclusionStatus::Success();
}
if (pkFieldIdx) {
AFL_VERIFY(*pkFieldIdx < (i32)pkColumns.size());
AFL_VERIFY(!pkColumns[*pkFieldIdx]);
pkColumns[*pkFieldIdx] = incomingBatch->column(incomingIdx);
++pkColumnsCount;
}
return TConclusionStatus::Success();
};
const auto nameResolver = [&](const std::string& fieldName) -> i32 {
return GetIndexInfo().GetColumnIndexOptional(fieldName).value_or(-1);
Expand Down

0 comments on commit 8f7e630

Please sign in to comment.