Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactoring FillTopicDescription & FillChangefeedDescription #13077

Merged
107 changes: 55 additions & 52 deletions ydb/core/ydb_convert/table_description.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1136,64 +1136,67 @@ void FillAttributesImpl(TOutProto& out, const TInProto& in) {
outAttrs[inAttr.GetKey()] = inAttr.GetValue();
}
}
void FillChangefeedDescription(Ydb::Table::ChangefeedDescription& out,
const NKikimrSchemeOp::TCdcStreamDescription& in) {

out.set_name(in.GetName());
out.set_virtual_timestamps(in.GetVirtualTimestamps());
out.set_aws_region(in.GetAwsRegion());

if (const auto value = in.GetResolvedTimestampsIntervalMs()) {
out.mutable_resolved_timestamps_interval()->set_seconds(TDuration::MilliSeconds(value).Seconds());
}

switch (in.GetMode()) {
case NKikimrSchemeOp::ECdcStreamMode::ECdcStreamModeKeysOnly:
case NKikimrSchemeOp::ECdcStreamMode::ECdcStreamModeUpdate:
case NKikimrSchemeOp::ECdcStreamMode::ECdcStreamModeNewImage:
case NKikimrSchemeOp::ECdcStreamMode::ECdcStreamModeOldImage:
case NKikimrSchemeOp::ECdcStreamMode::ECdcStreamModeNewAndOldImages:
out.set_mode(static_cast<Ydb::Table::ChangefeedMode::Mode>(in.GetMode()));
break;
default:
break;
}

void FillChangefeedDescription(Ydb::Table::DescribeTableResult& out,
const NKikimrSchemeOp::TTableDescription& in) {

for (const auto& stream : in.GetCdcStreams()) {
auto changefeed = out.add_changefeeds();

changefeed->set_name(stream.GetName());
changefeed->set_virtual_timestamps(stream.GetVirtualTimestamps());
changefeed->set_aws_region(stream.GetAwsRegion());

if (const auto value = stream.GetResolvedTimestampsIntervalMs()) {
changefeed->mutable_resolved_timestamps_interval()->set_seconds(TDuration::MilliSeconds(value).Seconds());
}
switch (in.GetFormat()) {
case NKikimrSchemeOp::ECdcStreamFormat::ECdcStreamFormatJson:
out.set_format(Ydb::Table::ChangefeedFormat::FORMAT_JSON);
break;
case NKikimrSchemeOp::ECdcStreamFormat::ECdcStreamFormatDynamoDBStreamsJson:
out.set_format(Ydb::Table::ChangefeedFormat::FORMAT_DYNAMODB_STREAMS_JSON);
break;
case NKikimrSchemeOp::ECdcStreamFormat::ECdcStreamFormatDebeziumJson:
out.set_format(Ydb::Table::ChangefeedFormat::FORMAT_DEBEZIUM_JSON);
break;
default:
break;
}

switch (stream.GetMode()) {
case NKikimrSchemeOp::ECdcStreamMode::ECdcStreamModeKeysOnly:
case NKikimrSchemeOp::ECdcStreamMode::ECdcStreamModeUpdate:
case NKikimrSchemeOp::ECdcStreamMode::ECdcStreamModeNewImage:
case NKikimrSchemeOp::ECdcStreamMode::ECdcStreamModeOldImage:
case NKikimrSchemeOp::ECdcStreamMode::ECdcStreamModeNewAndOldImages:
changefeed->set_mode(static_cast<Ydb::Table::ChangefeedMode::Mode>(stream.GetMode()));
break;
default:
break;
}
switch (in.GetState()) {
case NKikimrSchemeOp::ECdcStreamState::ECdcStreamStateReady:
case NKikimrSchemeOp::ECdcStreamState::ECdcStreamStateDisabled:
case NKikimrSchemeOp::ECdcStreamState::ECdcStreamStateScan:
out.set_state(static_cast<Ydb::Table::ChangefeedDescription::State>(in.GetState()));
break;
default:
break;
}

switch (stream.GetFormat()) {
case NKikimrSchemeOp::ECdcStreamFormat::ECdcStreamFormatJson:
changefeed->set_format(Ydb::Table::ChangefeedFormat::FORMAT_JSON);
break;
case NKikimrSchemeOp::ECdcStreamFormat::ECdcStreamFormatDynamoDBStreamsJson:
changefeed->set_format(Ydb::Table::ChangefeedFormat::FORMAT_DYNAMODB_STREAMS_JSON);
break;
case NKikimrSchemeOp::ECdcStreamFormat::ECdcStreamFormatDebeziumJson:
changefeed->set_format(Ydb::Table::ChangefeedFormat::FORMAT_DEBEZIUM_JSON);
break;
default:
break;
}
if (in.HasScanProgress()) {
auto& scanProgress = *out.mutable_initial_scan_progress();
scanProgress.set_parts_total(in.GetScanProgress().GetShardsTotal());
scanProgress.set_parts_completed(in.GetScanProgress().GetShardsCompleted());
}

switch (stream.GetState()) {
case NKikimrSchemeOp::ECdcStreamState::ECdcStreamStateReady:
case NKikimrSchemeOp::ECdcStreamState::ECdcStreamStateDisabled:
case NKikimrSchemeOp::ECdcStreamState::ECdcStreamStateScan:
changefeed->set_state(static_cast<Ydb::Table::ChangefeedDescription::State>(stream.GetState()));
break;
default:
break;
}
FillAttributesImpl(out, in);

if (stream.HasScanProgress()) {
auto& scanProgress = *changefeed->mutable_initial_scan_progress();
scanProgress.set_parts_total(stream.GetScanProgress().GetShardsTotal());
scanProgress.set_parts_completed(stream.GetScanProgress().GetShardsCompleted());
}
}
void FillChangefeedDescription(Ydb::Table::DescribeTableResult& out,
const NKikimrSchemeOp::TTableDescription& in) {

FillAttributesImpl(*changefeed, stream);
for (const auto& stream : in.GetCdcStreams()) {
FillChangefeedDescription(*out.add_changefeeds(), stream);
}
}

Expand Down
2 changes: 2 additions & 0 deletions ydb/core/ydb_convert/table_description.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ bool FillIndexDescription(NKikimrSchemeOp::TIndexedTableCreationConfig& out,
const Ydb::Table::CreateTableRequest& in, Ydb::StatusIds::StatusCode& status, TString& error);

// out
void FillChangefeedDescription(Ydb::Table::ChangefeedDescription& out,
const NKikimrSchemeOp::TCdcStreamDescription& in);
void FillChangefeedDescription(Ydb::Table::DescribeTableResult& out,
const NKikimrSchemeOp::TTableDescription& in);
// in
Expand Down
118 changes: 118 additions & 0 deletions ydb/core/ydb_convert/topic_description.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
#include "topic_description.h"

#include <ydb/library/persqueue/topic_parser_public/topic_parser.h>

#include <ydb/core/protos/flat_scheme_op.pb.h>
#include <ydb/public/api/protos/ydb_table.pb.h>
CyberROFL marked this conversation as resolved.
Show resolved Hide resolved

#include <util/string/builder.h>

namespace NKikimr {

void FillConsumer(Ydb::Topic::Consumer& out, const NKikimrPQ::TPQTabletConfig::TConsumer& in) {
auto consumerName = NPersQueue::ConvertOldConsumerName(in.GetName());
out.set_name(consumerName);
out.mutable_read_from()->set_seconds(in.GetReadFromTimestampsMs() / 1000);
auto version = in.GetVersion();
if (version != 0)
(*out.mutable_attributes())["_version"] = TStringBuilder() << version;
for (const auto &codec : in.GetCodec().GetIds()) {
out.mutable_supported_codecs()->add_codecs((Ydb::Topic::Codec) (codec + 1));
}

out.set_important(in.GetImportant());
TString serviceType = "";
if (in.HasServiceType()) {
serviceType = in.GetServiceType();
}
(*out.mutable_attributes())["_service_type"] = serviceType;
}

void FillTopicDescription(Ydb::Topic::DescribeTopicResult& out, const NKikimrSchemeOp::TPersQueueGroupDescription& in) {
for (auto& sourcePart: in.GetPartitions()) {
auto destPart = out.add_partitions();
destPart->set_partition_id(sourcePart.GetPartitionId());
destPart->set_active(sourcePart.GetStatus() == ::NKikimrPQ::ETopicPartitionStatus::Active);
if (sourcePart.HasKeyRange()) {
if (sourcePart.GetKeyRange().HasFromBound()) {
destPart->mutable_key_range()->set_from_bound(sourcePart.GetKeyRange().GetFromBound());
}
if (sourcePart.GetKeyRange().HasToBound()) {
destPart->mutable_key_range()->set_to_bound(sourcePart.GetKeyRange().GetToBound());
}
}

for (size_t i = 0; i < sourcePart.ChildPartitionIdsSize(); ++i) {
destPart->add_child_partition_ids(static_cast<int64_t>(sourcePart.GetChildPartitionIds(i)));
}

for (size_t i = 0; i < sourcePart.ParentPartitionIdsSize(); ++i) {
destPart->add_parent_partition_ids(static_cast<int64_t>(sourcePart.GetParentPartitionIds(i)));
}
}

const auto &config = in.GetPQTabletConfig();

out.mutable_partitioning_settings()->set_max_active_partitions(config.GetPartitionStrategy().GetMaxPartitionCount());
switch(config.GetPartitionStrategy().GetPartitionStrategyType()) {
case ::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_CAN_SPLIT:
out.mutable_partitioning_settings()->mutable_auto_partitioning_settings()->set_strategy(Ydb::Topic::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_SCALE_UP);
break;
case ::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_CAN_SPLIT_AND_MERGE:
out.mutable_partitioning_settings()->mutable_auto_partitioning_settings()->set_strategy(Ydb::Topic::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_SCALE_UP_AND_DOWN);
break;
case ::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_PAUSED:
out.mutable_partitioning_settings()->mutable_auto_partitioning_settings()->set_strategy(Ydb::Topic::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_PAUSED);
break;
default:
out.mutable_partitioning_settings()->mutable_auto_partitioning_settings()->set_strategy(Ydb::Topic::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_DISABLED);
break;
}
out.mutable_partitioning_settings()->mutable_auto_partitioning_settings()->mutable_partition_write_speed()->mutable_stabilization_window()->set_seconds(config.GetPartitionStrategy().GetScaleThresholdSeconds());
out.mutable_partitioning_settings()->mutable_auto_partitioning_settings()->mutable_partition_write_speed()->set_down_utilization_percent(config.GetPartitionStrategy().GetScaleDownPartitionWriteSpeedThresholdPercent());
out.mutable_partitioning_settings()->mutable_auto_partitioning_settings()->mutable_partition_write_speed()->set_up_utilization_percent(config.GetPartitionStrategy().GetScaleUpPartitionWriteSpeedThresholdPercent());

if (!config.GetRequireAuthWrite()) {
(*out.mutable_attributes())["_allow_unauthenticated_write"] = "true";
}

if (!config.GetRequireAuthRead()) {
(*out.mutable_attributes())["_allow_unauthenticated_read"] = "true";
}

if (in.GetPartitionPerTablet() != 2) {
(*out.mutable_attributes())["_partitions_per_tablet"] =
TStringBuilder() << in.GetPartitionPerTablet();
}
if (config.HasAbcId()) {
(*out.mutable_attributes())["_abc_id"] = TStringBuilder() << config.GetAbcId();
}
if (config.HasAbcSlug()) {
(*out.mutable_attributes())["_abc_slug"] = config.GetAbcSlug();
}
if (config.HasFederationAccount()) {
(*out.mutable_attributes())["_federation_account"] = config.GetFederationAccount();
}

const auto &partConfig = config.GetPartitionConfig();
i64 msip = partConfig.GetMaxSizeInPartition();
if (partConfig.HasMaxSizeInPartition() && msip != Max<i64>()) {
(*out.mutable_attributes())["_max_partition_storage_size"] = TStringBuilder() << msip;
}
out.mutable_retention_period()->set_seconds(partConfig.GetLifetimeSeconds());
out.set_retention_storage_mb(partConfig.GetStorageLimitBytes() / 1024 / 1024);
(*out.mutable_attributes())["_message_group_seqno_retention_period_ms"] = TStringBuilder() << (partConfig.GetSourceIdLifetimeSeconds() * 1000);
(*out.mutable_attributes())["__max_partition_message_groups_seqno_stored"] = TStringBuilder() << partConfig.GetSourceIdMaxCounts();

for (const auto &codec : config.GetCodecs().GetIds()) {
out.mutable_supported_codecs()->add_codecs((Ydb::Topic::Codec)(codec + 1));
}

for (const auto& consumer : config.GetConsumers()) {
auto rr = out.add_consumers();
FillConsumer(*rr, consumer);
}

}

} // namespace NKikimr
21 changes: 21 additions & 0 deletions ydb/core/ydb_convert/topic_description.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#pragma once

#include <ydb/core/protos/pqconfig.pb.h>
CyberROFL marked this conversation as resolved.
Show resolved Hide resolved

namespace Ydb::Topic {
class Consumer;
class DescribeTopicResult;
}

namespace NKikimrSchemeOp {
class TPersQueueGroupDescription;
}

namespace NKikimr {

void FillConsumer(Ydb::Topic::Consumer& out,
const NKikimrPQ::TPQTabletConfig::TConsumer& in);
CyberROFL marked this conversation as resolved.
Show resolved Hide resolved
void FillTopicDescription(Ydb::Topic::DescribeTopicResult& out,
const NKikimrSchemeOp::TPersQueueGroupDescription& in);

} // namespace NKikimr
1 change: 1 addition & 0 deletions ydb/core/ydb_convert/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ SRCS(
table_settings.cpp
table_description.cpp
table_profiles.cpp
topic_description.cpp
ydb_convert.cpp
tx_proxy_status.cpp
)
Expand Down
Loading
Loading