Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactoring FillTopicDescription & FillChangefeedDescription #13077

Merged
107 changes: 55 additions & 52 deletions ydb/core/ydb_convert/table_description.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1136,64 +1136,67 @@ void FillAttributesImpl(TOutProto& out, const TInProto& in) {
outAttrs[inAttr.GetKey()] = inAttr.GetValue();
}
}
void FillChangefeedDescription(Ydb::Table::ChangefeedDescription& out,
const NKikimrSchemeOp::TCdcStreamDescription& in) {

out.set_name(in.GetName());
out.set_virtual_timestamps(in.GetVirtualTimestamps());
out.set_aws_region(in.GetAwsRegion());

if (const auto value = in.GetResolvedTimestampsIntervalMs()) {
out.mutable_resolved_timestamps_interval()->set_seconds(TDuration::MilliSeconds(value).Seconds());
}

switch (in.GetMode()) {
case NKikimrSchemeOp::ECdcStreamMode::ECdcStreamModeKeysOnly:
case NKikimrSchemeOp::ECdcStreamMode::ECdcStreamModeUpdate:
case NKikimrSchemeOp::ECdcStreamMode::ECdcStreamModeNewImage:
case NKikimrSchemeOp::ECdcStreamMode::ECdcStreamModeOldImage:
case NKikimrSchemeOp::ECdcStreamMode::ECdcStreamModeNewAndOldImages:
out.set_mode(static_cast<Ydb::Table::ChangefeedMode::Mode>(in.GetMode()));
break;
default:
break;
}

void FillChangefeedDescription(Ydb::Table::DescribeTableResult& out,
const NKikimrSchemeOp::TTableDescription& in) {

for (const auto& stream : in.GetCdcStreams()) {
auto changefeed = out.add_changefeeds();

changefeed->set_name(stream.GetName());
changefeed->set_virtual_timestamps(stream.GetVirtualTimestamps());
changefeed->set_aws_region(stream.GetAwsRegion());

if (const auto value = stream.GetResolvedTimestampsIntervalMs()) {
changefeed->mutable_resolved_timestamps_interval()->set_seconds(TDuration::MilliSeconds(value).Seconds());
}
switch (in.GetFormat()) {
case NKikimrSchemeOp::ECdcStreamFormat::ECdcStreamFormatJson:
out.set_format(Ydb::Table::ChangefeedFormat::FORMAT_JSON);
break;
case NKikimrSchemeOp::ECdcStreamFormat::ECdcStreamFormatDynamoDBStreamsJson:
out.set_format(Ydb::Table::ChangefeedFormat::FORMAT_DYNAMODB_STREAMS_JSON);
break;
case NKikimrSchemeOp::ECdcStreamFormat::ECdcStreamFormatDebeziumJson:
out.set_format(Ydb::Table::ChangefeedFormat::FORMAT_DEBEZIUM_JSON);
break;
default:
break;
}

switch (stream.GetMode()) {
case NKikimrSchemeOp::ECdcStreamMode::ECdcStreamModeKeysOnly:
case NKikimrSchemeOp::ECdcStreamMode::ECdcStreamModeUpdate:
case NKikimrSchemeOp::ECdcStreamMode::ECdcStreamModeNewImage:
case NKikimrSchemeOp::ECdcStreamMode::ECdcStreamModeOldImage:
case NKikimrSchemeOp::ECdcStreamMode::ECdcStreamModeNewAndOldImages:
changefeed->set_mode(static_cast<Ydb::Table::ChangefeedMode::Mode>(stream.GetMode()));
break;
default:
break;
}
switch (in.GetState()) {
case NKikimrSchemeOp::ECdcStreamState::ECdcStreamStateReady:
case NKikimrSchemeOp::ECdcStreamState::ECdcStreamStateDisabled:
case NKikimrSchemeOp::ECdcStreamState::ECdcStreamStateScan:
out.set_state(static_cast<Ydb::Table::ChangefeedDescription::State>(in.GetState()));
break;
default:
break;
}

switch (stream.GetFormat()) {
case NKikimrSchemeOp::ECdcStreamFormat::ECdcStreamFormatJson:
changefeed->set_format(Ydb::Table::ChangefeedFormat::FORMAT_JSON);
break;
case NKikimrSchemeOp::ECdcStreamFormat::ECdcStreamFormatDynamoDBStreamsJson:
changefeed->set_format(Ydb::Table::ChangefeedFormat::FORMAT_DYNAMODB_STREAMS_JSON);
break;
case NKikimrSchemeOp::ECdcStreamFormat::ECdcStreamFormatDebeziumJson:
changefeed->set_format(Ydb::Table::ChangefeedFormat::FORMAT_DEBEZIUM_JSON);
break;
default:
break;
}
if (in.HasScanProgress()) {
auto& scanProgress = *out.mutable_initial_scan_progress();
scanProgress.set_parts_total(in.GetScanProgress().GetShardsTotal());
scanProgress.set_parts_completed(in.GetScanProgress().GetShardsCompleted());
}

switch (stream.GetState()) {
case NKikimrSchemeOp::ECdcStreamState::ECdcStreamStateReady:
case NKikimrSchemeOp::ECdcStreamState::ECdcStreamStateDisabled:
case NKikimrSchemeOp::ECdcStreamState::ECdcStreamStateScan:
changefeed->set_state(static_cast<Ydb::Table::ChangefeedDescription::State>(stream.GetState()));
break;
default:
break;
}
FillAttributesImpl(out, in);

if (stream.HasScanProgress()) {
auto& scanProgress = *changefeed->mutable_initial_scan_progress();
scanProgress.set_parts_total(stream.GetScanProgress().GetShardsTotal());
scanProgress.set_parts_completed(stream.GetScanProgress().GetShardsCompleted());
}
}
void FillChangefeedDescription(Ydb::Table::DescribeTableResult& out,
const NKikimrSchemeOp::TTableDescription& in) {

FillAttributesImpl(*changefeed, stream);
for (const auto& stream : in.GetCdcStreams()) {
FillChangefeedDescription(*out.add_changefeeds(), stream);
}
}

Expand Down
2 changes: 2 additions & 0 deletions ydb/core/ydb_convert/table_description.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ bool FillIndexDescription(NKikimrSchemeOp::TIndexedTableCreationConfig& out,
const Ydb::Table::CreateTableRequest& in, Ydb::StatusIds::StatusCode& status, TString& error);

// out
void FillChangefeedDescription(Ydb::Table::ChangefeedDescription& out,
const NKikimrSchemeOp::TCdcStreamDescription& in);
void FillChangefeedDescription(Ydb::Table::DescribeTableResult& out,
const NKikimrSchemeOp::TTableDescription& in);
// in
Expand Down
171 changes: 171 additions & 0 deletions ydb/core/ydb_convert/topic_description.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
#include "topic_description.h"
#include "ydb_convert.h"

#include <ydb/core/base/appdata_fwd.h>
#include <ydb/core/base/feature_flags.h>
#include <ydb/core/persqueue/utils.h>
#include <ydb/core/protos/feature_flags.pb.h>
#include <ydb/core/protos/pqconfig.pb.h>
#include <ydb/library/persqueue/topic_parser/topic_parser.h>

namespace NKikimr {

bool FillConsumer(Ydb::Topic::Consumer& out, const NKikimrPQ::TPQTabletConfig_TConsumer& in,
Ydb::StatusIds_StatusCode& status, TString& error)
{
const NKikimrPQ::TPQConfig pqConfig = AppData()->PQConfig;
auto consumerName = NPersQueue::ConvertOldConsumerName(in.GetName(), pqConfig);
out.set_name(consumerName);
out.mutable_read_from()->set_seconds(in.GetReadFromTimestampsMs() / 1000);
auto version = in.GetVersion();
if (version != 0)
(*out.mutable_attributes())["_version"] = TStringBuilder() << version;
for (const auto &codec : in.GetCodec().GetIds()) {
out.mutable_supported_codecs()->add_codecs((Ydb::Topic::Codec) (codec + 1));
}

out.set_important(in.GetImportant());
TString serviceType = "";
if (in.HasServiceType()) {
serviceType = in.GetServiceType();
} else {
if (pqConfig.GetDisallowDefaultClientServiceType()) {
error = "service type must be set for all read rules";
status = Ydb::StatusIds::INTERNAL_ERROR;
return false;
}
serviceType = pqConfig.GetDefaultClientServiceType().GetName();
}
(*out.mutable_attributes())["_service_type"] = serviceType;
return true;
}

bool FillTopicDescription(Ydb::Topic::DescribeTopicResult& out, const NKikimrSchemeOp::TPersQueueGroupDescription& inDesc,
const NKikimrSchemeOp::TDirEntry& inDirEntry, const TMaybe<TString>& cdcName,
Ydb::StatusIds_StatusCode& status, TString& error) {

const NKikimrPQ::TPQConfig pqConfig = AppData()->PQConfig;

Ydb::Scheme::Entry *selfEntry = out.mutable_self();
ConvertDirectoryEntry(inDirEntry, selfEntry, true);
if (cdcName) {
selfEntry->set_name(*cdcName);
}

for (auto& sourcePart: inDesc.GetPartitions()) {
auto destPart = out.add_partitions();
destPart->set_partition_id(sourcePart.GetPartitionId());
destPart->set_active(sourcePart.GetStatus() == ::NKikimrPQ::ETopicPartitionStatus::Active);
if (sourcePart.HasKeyRange()) {
if (sourcePart.GetKeyRange().HasFromBound()) {
destPart->mutable_key_range()->set_from_bound(sourcePart.GetKeyRange().GetFromBound());
}
if (sourcePart.GetKeyRange().HasToBound()) {
destPart->mutable_key_range()->set_to_bound(sourcePart.GetKeyRange().GetToBound());
}
}

for (size_t i = 0; i < sourcePart.ChildPartitionIdsSize(); ++i) {
destPart->add_child_partition_ids(static_cast<int64_t>(sourcePart.GetChildPartitionIds(i)));
}

for (size_t i = 0; i < sourcePart.ParentPartitionIdsSize(); ++i) {
destPart->add_parent_partition_ids(static_cast<int64_t>(sourcePart.GetParentPartitionIds(i)));
}
}

const auto &config = inDesc.GetPQTabletConfig();
if (AppData()->FeatureFlags.GetEnableTopicSplitMerge() && NPQ::SplitMergeEnabled(config)) {
out.mutable_partitioning_settings()->set_min_active_partitions(config.GetPartitionStrategy().GetMinPartitionCount());
} else {
out.mutable_partitioning_settings()->set_min_active_partitions(inDesc.GetTotalGroupCount());
}

out.mutable_partitioning_settings()->set_max_active_partitions(config.GetPartitionStrategy().GetMaxPartitionCount());
switch(config.GetPartitionStrategy().GetPartitionStrategyType()) {
case ::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_CAN_SPLIT:
out.mutable_partitioning_settings()->mutable_auto_partitioning_settings()->set_strategy(Ydb::Topic::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_SCALE_UP);
break;
case ::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_CAN_SPLIT_AND_MERGE:
out.mutable_partitioning_settings()->mutable_auto_partitioning_settings()->set_strategy(Ydb::Topic::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_SCALE_UP_AND_DOWN);
break;
case ::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_PAUSED:
out.mutable_partitioning_settings()->mutable_auto_partitioning_settings()->set_strategy(Ydb::Topic::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_PAUSED);
break;
default:
out.mutable_partitioning_settings()->mutable_auto_partitioning_settings()->set_strategy(Ydb::Topic::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_DISABLED);
break;
}
out.mutable_partitioning_settings()->mutable_auto_partitioning_settings()->mutable_partition_write_speed()->mutable_stabilization_window()->set_seconds(config.GetPartitionStrategy().GetScaleThresholdSeconds());
out.mutable_partitioning_settings()->mutable_auto_partitioning_settings()->mutable_partition_write_speed()->set_down_utilization_percent(config.GetPartitionStrategy().GetScaleDownPartitionWriteSpeedThresholdPercent());
out.mutable_partitioning_settings()->mutable_auto_partitioning_settings()->mutable_partition_write_speed()->set_up_utilization_percent(config.GetPartitionStrategy().GetScaleUpPartitionWriteSpeedThresholdPercent());

if (!config.GetRequireAuthWrite()) {
(*out.mutable_attributes())["_allow_unauthenticated_write"] = "true";
}

if (!config.GetRequireAuthRead()) {
(*out.mutable_attributes())["_allow_unauthenticated_read"] = "true";
}

if (inDesc.GetPartitionPerTablet() != 2) {
(*out.mutable_attributes())["_partitions_per_tablet"] =
TStringBuilder() << inDesc.GetPartitionPerTablet();
}
if (config.HasAbcId()) {
(*out.mutable_attributes())["_abc_id"] = TStringBuilder() << config.GetAbcId();
}
if (config.HasAbcSlug()) {
(*out.mutable_attributes())["_abc_slug"] = config.GetAbcSlug();
}
if (config.HasFederationAccount()) {
(*out.mutable_attributes())["_federation_account"] = config.GetFederationAccount();
}
bool local = config.GetLocalDC();
const auto &partConfig = config.GetPartitionConfig();
i64 msip = partConfig.GetMaxSizeInPartition();
if (partConfig.HasMaxSizeInPartition() && msip != Max<i64>()) {
(*out.mutable_attributes())["_max_partition_storage_size"] = TStringBuilder() << msip;
}
out.mutable_retention_period()->set_seconds(partConfig.GetLifetimeSeconds());
out.set_retention_storage_mb(partConfig.GetStorageLimitBytes() / 1024 / 1024);
(*out.mutable_attributes())["_message_group_seqno_retention_period_ms"] = TStringBuilder() << (partConfig.GetSourceIdLifetimeSeconds() * 1000);
(*out.mutable_attributes())["__max_partition_message_groups_seqno_stored"] = TStringBuilder() << partConfig.GetSourceIdMaxCounts();

if (local || pqConfig.GetTopicsAreFirstClassCitizen()) {
out.set_partition_write_speed_bytes_per_second(partConfig.GetWriteSpeedInBytesPerSecond());
out.set_partition_write_burst_bytes(partConfig.GetBurstSize());
}

if (pqConfig.GetQuotingConfig().GetPartitionReadQuotaIsTwiceWriteQuota()) {
auto readSpeedPerConsumer = partConfig.GetWriteSpeedInBytesPerSecond() * 2;
out.set_partition_total_read_speed_bytes_per_second(readSpeedPerConsumer * pqConfig.GetQuotingConfig().GetMaxParallelConsumersPerPartition());
out.set_partition_consumer_read_speed_bytes_per_second(readSpeedPerConsumer);
}

for (const auto &codec : config.GetCodecs().GetIds()) {
out.mutable_supported_codecs()->add_codecs((Ydb::Topic::Codec)(codec + 1));
}

if (pqConfig.GetBillingMeteringConfig().GetEnabled()) {
switch (config.GetMeteringMode()) {
case NKikimrPQ::TPQTabletConfig::METERING_MODE_RESERVED_CAPACITY:
out.set_metering_mode(Ydb::Topic::METERING_MODE_RESERVED_CAPACITY);
break;
case NKikimrPQ::TPQTabletConfig::METERING_MODE_REQUEST_UNITS:
out.set_metering_mode(Ydb::Topic::METERING_MODE_REQUEST_UNITS);
break;
default:
break;
}
}

for (const auto& consumer : config.GetConsumers()) {
if (!FillConsumer(*out.add_consumers(), consumer, status, error)) {
return false;
}
}
return true;
}

} // namespace NKikimr
35 changes: 35 additions & 0 deletions ydb/core/ydb_convert/topic_description.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#pragma once

#include <util/generic/fwd.h>

namespace Ydb {
namespace Topic {
class Consumer;
class DescribeTopicResult;
}
class StatusIds;
enum StatusIds_StatusCode : int;
}

namespace NKikimrSchemeOp {
class TPersQueueGroupDescription;
class TDirEntry;
}

namespace NYql {
class TIssue;
}

namespace NKikimrPQ {
class TPQTabletConfig_TConsumer;
class TPQConfig;
}

namespace NKikimr {

bool FillConsumer(Ydb::Topic::Consumer& out, const NKikimrPQ::TPQTabletConfig_TConsumer& in, Ydb::StatusIds_StatusCode& status, TString& error);
bool FillTopicDescription(Ydb::Topic::DescribeTopicResult& out, const NKikimrSchemeOp::TPersQueueGroupDescription& inDesc,
const NKikimrSchemeOp::TDirEntry& inDirEntry, const TMaybe<TString>& cdcName,
Ydb::StatusIds_StatusCode& status, TString& error);

} // namespace NKikimr
1 change: 1 addition & 0 deletions ydb/core/ydb_convert/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ SRCS(
table_settings.cpp
table_description.cpp
table_profiles.cpp
topic_description.cpp
ydb_convert.cpp
tx_proxy_status.cpp
)
Expand Down
Loading
Loading