Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactoring FillTopicDescription & FillChangefeedDescription #13077

Merged
107 changes: 55 additions & 52 deletions ydb/core/ydb_convert/table_description.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1136,64 +1136,67 @@ void FillAttributesImpl(TOutProto& out, const TInProto& in) {
outAttrs[inAttr.GetKey()] = inAttr.GetValue();
}
}
void FillChangefeedDescription(Ydb::Table::ChangefeedDescription& out,
const NKikimrSchemeOp::TCdcStreamDescription& in) {

out.set_name(in.GetName());
out.set_virtual_timestamps(in.GetVirtualTimestamps());
out.set_aws_region(in.GetAwsRegion());

if (const auto value = in.GetResolvedTimestampsIntervalMs()) {
out.mutable_resolved_timestamps_interval()->set_seconds(TDuration::MilliSeconds(value).Seconds());
}

switch (in.GetMode()) {
case NKikimrSchemeOp::ECdcStreamMode::ECdcStreamModeKeysOnly:
case NKikimrSchemeOp::ECdcStreamMode::ECdcStreamModeUpdate:
case NKikimrSchemeOp::ECdcStreamMode::ECdcStreamModeNewImage:
case NKikimrSchemeOp::ECdcStreamMode::ECdcStreamModeOldImage:
case NKikimrSchemeOp::ECdcStreamMode::ECdcStreamModeNewAndOldImages:
out.set_mode(static_cast<Ydb::Table::ChangefeedMode::Mode>(in.GetMode()));
break;
default:
break;
}

void FillChangefeedDescription(Ydb::Table::DescribeTableResult& out,
const NKikimrSchemeOp::TTableDescription& in) {

for (const auto& stream : in.GetCdcStreams()) {
auto changefeed = out.add_changefeeds();

changefeed->set_name(stream.GetName());
changefeed->set_virtual_timestamps(stream.GetVirtualTimestamps());
changefeed->set_aws_region(stream.GetAwsRegion());

if (const auto value = stream.GetResolvedTimestampsIntervalMs()) {
changefeed->mutable_resolved_timestamps_interval()->set_seconds(TDuration::MilliSeconds(value).Seconds());
}
switch (in.GetFormat()) {
case NKikimrSchemeOp::ECdcStreamFormat::ECdcStreamFormatJson:
out.set_format(Ydb::Table::ChangefeedFormat::FORMAT_JSON);
break;
case NKikimrSchemeOp::ECdcStreamFormat::ECdcStreamFormatDynamoDBStreamsJson:
out.set_format(Ydb::Table::ChangefeedFormat::FORMAT_DYNAMODB_STREAMS_JSON);
break;
case NKikimrSchemeOp::ECdcStreamFormat::ECdcStreamFormatDebeziumJson:
out.set_format(Ydb::Table::ChangefeedFormat::FORMAT_DEBEZIUM_JSON);
break;
default:
break;
}

switch (stream.GetMode()) {
case NKikimrSchemeOp::ECdcStreamMode::ECdcStreamModeKeysOnly:
case NKikimrSchemeOp::ECdcStreamMode::ECdcStreamModeUpdate:
case NKikimrSchemeOp::ECdcStreamMode::ECdcStreamModeNewImage:
case NKikimrSchemeOp::ECdcStreamMode::ECdcStreamModeOldImage:
case NKikimrSchemeOp::ECdcStreamMode::ECdcStreamModeNewAndOldImages:
changefeed->set_mode(static_cast<Ydb::Table::ChangefeedMode::Mode>(stream.GetMode()));
break;
default:
break;
}
switch (in.GetState()) {
case NKikimrSchemeOp::ECdcStreamState::ECdcStreamStateReady:
case NKikimrSchemeOp::ECdcStreamState::ECdcStreamStateDisabled:
case NKikimrSchemeOp::ECdcStreamState::ECdcStreamStateScan:
out.set_state(static_cast<Ydb::Table::ChangefeedDescription::State>(in.GetState()));
break;
default:
break;
}

switch (stream.GetFormat()) {
case NKikimrSchemeOp::ECdcStreamFormat::ECdcStreamFormatJson:
changefeed->set_format(Ydb::Table::ChangefeedFormat::FORMAT_JSON);
break;
case NKikimrSchemeOp::ECdcStreamFormat::ECdcStreamFormatDynamoDBStreamsJson:
changefeed->set_format(Ydb::Table::ChangefeedFormat::FORMAT_DYNAMODB_STREAMS_JSON);
break;
case NKikimrSchemeOp::ECdcStreamFormat::ECdcStreamFormatDebeziumJson:
changefeed->set_format(Ydb::Table::ChangefeedFormat::FORMAT_DEBEZIUM_JSON);
break;
default:
break;
}
if (in.HasScanProgress()) {
auto& scanProgress = *out.mutable_initial_scan_progress();
scanProgress.set_parts_total(in.GetScanProgress().GetShardsTotal());
scanProgress.set_parts_completed(in.GetScanProgress().GetShardsCompleted());
}

switch (stream.GetState()) {
case NKikimrSchemeOp::ECdcStreamState::ECdcStreamStateReady:
case NKikimrSchemeOp::ECdcStreamState::ECdcStreamStateDisabled:
case NKikimrSchemeOp::ECdcStreamState::ECdcStreamStateScan:
changefeed->set_state(static_cast<Ydb::Table::ChangefeedDescription::State>(stream.GetState()));
break;
default:
break;
}
FillAttributesImpl(out, in);

if (stream.HasScanProgress()) {
auto& scanProgress = *changefeed->mutable_initial_scan_progress();
scanProgress.set_parts_total(stream.GetScanProgress().GetShardsTotal());
scanProgress.set_parts_completed(stream.GetScanProgress().GetShardsCompleted());
}
}
void FillChangefeedDescription(Ydb::Table::DescribeTableResult& out,
const NKikimrSchemeOp::TTableDescription& in) {

FillAttributesImpl(*changefeed, stream);
for (const auto& stream : in.GetCdcStreams()) {
FillChangefeedDescription(*out.add_changefeeds(), stream);
}
}

Expand Down
2 changes: 2 additions & 0 deletions ydb/core/ydb_convert/table_description.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ bool FillIndexDescription(NKikimrSchemeOp::TIndexedTableCreationConfig& out,
const Ydb::Table::CreateTableRequest& in, Ydb::StatusIds::StatusCode& status, TString& error);

// out
void FillChangefeedDescription(Ydb::Table::ChangefeedDescription& out,
const NKikimrSchemeOp::TCdcStreamDescription& in);
void FillChangefeedDescription(Ydb::Table::DescribeTableResult& out,
const NKikimrSchemeOp::TTableDescription& in);
// in
Expand Down
207 changes: 207 additions & 0 deletions ydb/core/ydb_convert/topic_description.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
#include "topic_description.h"

#include <ydb/library/persqueue/topic_parser_public/topic_parser.h>

#include <ydb/core/protos/flat_scheme_op.pb.h>
#include <ydb/core/base/appdata_fwd.h>
#include <ydb/core/ydb_convert/ydb_convert.h>
#include <ydb/core/persqueue/utils.h>
#include <ydb/public/api/protos/ydb_table.pb.h>
#include <ydb/services/lib/actors/pq_schema_actor.h>
#include <yql/essentials/public/issue/yql_issue.h>

#include <util/string/builder.h>

namespace NKikimr {

bool FillConsumer(Ydb::Topic::Consumer *rr, const NKikimrPQ::TPQTabletConfig::TConsumer& consumer,
const TString& consumerName,
const NKikimrPQ::TPQConfig& pqConfig,
Ydb::StatusIds_StatusCode& status, TString& error)
{
rr->set_name(consumerName);
rr->mutable_read_from()->set_seconds(consumer.GetReadFromTimestampsMs() / 1000);
auto version = consumer.GetVersion();
if (version != 0)
(*rr->mutable_attributes())["_version"] = TStringBuilder() << version;
for (const auto &codec : consumer.GetCodec().GetIds()) {
rr->mutable_supported_codecs()->add_codecs((Ydb::Topic::Codec) (codec + 1));
}

rr->set_important(consumer.GetImportant());
TString serviceType = "";
if (consumer.HasServiceType()) {
serviceType = consumer.GetServiceType();
} else {
if (pqConfig.GetDisallowDefaultClientServiceType()) {
error = "service type must be set for all read rules";
status = Ydb::StatusIds::INTERNAL_ERROR;
return false;
}
serviceType = pqConfig.GetDefaultClientServiceType().GetName();
}
(*rr->mutable_attributes())["_service_type"] = serviceType;
return true;
}

bool FillConsumer(Ydb::Topic::Consumer* rr, const NKikimrPQ::TPQTabletConfig::TConsumer& consumer,
CyberROFL marked this conversation as resolved.
Show resolved Hide resolved
const NActors::TActorContext& ctx, Ydb::StatusIds_StatusCode& status, TString& error)
{
const auto& pqConfig = AppData(ctx)->PQConfig;
auto consumerName = NPersQueue::ConvertOldConsumerName(consumer.GetName(), ctx);
return FillConsumer(rr, consumer, consumerName, pqConfig, status, error);
}

bool FillConsumer(Ydb::Topic::Consumer *rr, const NKikimrPQ::TPQTabletConfig::TConsumer& consumer,
CyberROFL marked this conversation as resolved.
Show resolved Hide resolved
Ydb::StatusIds_StatusCode& status, TString& error)
{
const auto& pqConfig = AppData()->PQConfig;
auto consumerName = NPersQueue::ConvertOldConsumerName(consumer.GetName());
return FillConsumer(rr, consumer, consumerName, pqConfig, status, error);
}

bool FillTopicDescription(Ydb::Topic::DescribeTopicResult& out, const NKikimrSchemeOp::TPersQueueGroupDescription& in,
const NKikimrPQ::TPQConfig& pqConfig, const NKikimrSchemeOp::TDirEntry &fromDirEntry, const TMaybe<TString>& cdcName,
CyberROFL marked this conversation as resolved.
Show resolved Hide resolved
bool EnableTopicSplitMerge, NYql::TIssue& issue, const TActorContext& ctx, const TString& consumer = "", bool includeStats = false, bool includeLocation = false) {
CyberROFL marked this conversation as resolved.
Show resolved Hide resolved
CyberROFL marked this conversation as resolved.
Show resolved Hide resolved

Ydb::Scheme::Entry *selfEntry = out.mutable_self();
ConvertDirectoryEntry(fromDirEntry, selfEntry, true);
if (cdcName) {
selfEntry->set_name(*cdcName);
}

for (auto& sourcePart: in.GetPartitions()) {
auto destPart = out.add_partitions();
destPart->set_partition_id(sourcePart.GetPartitionId());
destPart->set_active(sourcePart.GetStatus() == ::NKikimrPQ::ETopicPartitionStatus::Active);
if (sourcePart.HasKeyRange()) {
if (sourcePart.GetKeyRange().HasFromBound()) {
destPart->mutable_key_range()->set_from_bound(sourcePart.GetKeyRange().GetFromBound());
}
if (sourcePart.GetKeyRange().HasToBound()) {
destPart->mutable_key_range()->set_to_bound(sourcePart.GetKeyRange().GetToBound());
}
}

for (size_t i = 0; i < sourcePart.ChildPartitionIdsSize(); ++i) {
destPart->add_child_partition_ids(static_cast<int64_t>(sourcePart.GetChildPartitionIds(i)));
}

for (size_t i = 0; i < sourcePart.ParentPartitionIdsSize(); ++i) {
destPart->add_parent_partition_ids(static_cast<int64_t>(sourcePart.GetParentPartitionIds(i)));
}
}

const auto &config = in.GetPQTabletConfig();
if (EnableTopicSplitMerge && NPQ::SplitMergeEnabled(config)) {
out.mutable_partitioning_settings()->set_min_active_partitions(config.GetPartitionStrategy().GetMinPartitionCount());
} else {
out.mutable_partitioning_settings()->set_min_active_partitions(in.GetTotalGroupCount());
}

out.mutable_partitioning_settings()->set_max_active_partitions(config.GetPartitionStrategy().GetMaxPartitionCount());
switch(config.GetPartitionStrategy().GetPartitionStrategyType()) {
case ::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_CAN_SPLIT:
out.mutable_partitioning_settings()->mutable_auto_partitioning_settings()->set_strategy(Ydb::Topic::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_SCALE_UP);
break;
case ::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_CAN_SPLIT_AND_MERGE:
out.mutable_partitioning_settings()->mutable_auto_partitioning_settings()->set_strategy(Ydb::Topic::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_SCALE_UP_AND_DOWN);
break;
case ::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_PAUSED:
out.mutable_partitioning_settings()->mutable_auto_partitioning_settings()->set_strategy(Ydb::Topic::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_PAUSED);
break;
default:
out.mutable_partitioning_settings()->mutable_auto_partitioning_settings()->set_strategy(Ydb::Topic::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_DISABLED);
break;
}
out.mutable_partitioning_settings()->mutable_auto_partitioning_settings()->mutable_partition_write_speed()->mutable_stabilization_window()->set_seconds(config.GetPartitionStrategy().GetScaleThresholdSeconds());
out.mutable_partitioning_settings()->mutable_auto_partitioning_settings()->mutable_partition_write_speed()->set_down_utilization_percent(config.GetPartitionStrategy().GetScaleDownPartitionWriteSpeedThresholdPercent());
out.mutable_partitioning_settings()->mutable_auto_partitioning_settings()->mutable_partition_write_speed()->set_up_utilization_percent(config.GetPartitionStrategy().GetScaleUpPartitionWriteSpeedThresholdPercent());

if (!config.GetRequireAuthWrite()) {
(*out.mutable_attributes())["_allow_unauthenticated_write"] = "true";
}

if (!config.GetRequireAuthRead()) {
(*out.mutable_attributes())["_allow_unauthenticated_read"] = "true";
}

if (in.GetPartitionPerTablet() != 2) {
(*out.mutable_attributes())["_partitions_per_tablet"] =
TStringBuilder() << in.GetPartitionPerTablet();
}
if (config.HasAbcId()) {
(*out.mutable_attributes())["_abc_id"] = TStringBuilder() << config.GetAbcId();
}
if (config.HasAbcSlug()) {
(*out.mutable_attributes())["_abc_slug"] = config.GetAbcSlug();
}
if (config.HasFederationAccount()) {
(*out.mutable_attributes())["_federation_account"] = config.GetFederationAccount();
}
bool local = config.GetLocalDC();
const auto &partConfig = config.GetPartitionConfig();
i64 msip = partConfig.GetMaxSizeInPartition();
if (partConfig.HasMaxSizeInPartition() && msip != Max<i64>()) {
(*out.mutable_attributes())["_max_partition_storage_size"] = TStringBuilder() << msip;
}
out.mutable_retention_period()->set_seconds(partConfig.GetLifetimeSeconds());
out.set_retention_storage_mb(partConfig.GetStorageLimitBytes() / 1024 / 1024);
(*out.mutable_attributes())["_message_group_seqno_retention_period_ms"] = TStringBuilder() << (partConfig.GetSourceIdLifetimeSeconds() * 1000);
(*out.mutable_attributes())["__max_partition_message_groups_seqno_stored"] = TStringBuilder() << partConfig.GetSourceIdMaxCounts();

if (local || pqConfig.GetTopicsAreFirstClassCitizen()) {
out.set_partition_write_speed_bytes_per_second(partConfig.GetWriteSpeedInBytesPerSecond());
out.set_partition_write_burst_bytes(partConfig.GetBurstSize());
}

if (pqConfig.GetQuotingConfig().GetPartitionReadQuotaIsTwiceWriteQuota()) {
auto readSpeedPerConsumer = partConfig.GetWriteSpeedInBytesPerSecond() * 2;
out.set_partition_total_read_speed_bytes_per_second(readSpeedPerConsumer * pqConfig.GetQuotingConfig().GetMaxParallelConsumersPerPartition());
out.set_partition_consumer_read_speed_bytes_per_second(readSpeedPerConsumer);
}

for (const auto &codec : config.GetCodecs().GetIds()) {
out.mutable_supported_codecs()->add_codecs((Ydb::Topic::Codec)(codec + 1));
}

if (pqConfig.GetBillingMeteringConfig().GetEnabled()) {
switch (config.GetMeteringMode()) {
case NKikimrPQ::TPQTabletConfig::METERING_MODE_RESERVED_CAPACITY:
out.set_metering_mode(Ydb::Topic::METERING_MODE_RESERVED_CAPACITY);
break;
case NKikimrPQ::TPQTabletConfig::METERING_MODE_REQUEST_UNITS:
out.set_metering_mode(Ydb::Topic::METERING_MODE_REQUEST_UNITS);
break;
default:
break;
}
}

bool found = false;
auto consumerName = NPersQueue::ConvertNewConsumerName(consumer, ctx);
for (const auto& consumer : config.GetConsumers()) {
if (consumerName == consumer.GetName()) {
found = true;
CyberROFL marked this conversation as resolved.
Show resolved Hide resolved
}
auto rr = out.add_consumers();
Ydb::StatusIds::StatusCode status;
TString error;
if (!FillConsumer(rr, consumer, ctx, status, error)) {
issue = NGRpcProxy::V1::FillIssue(error, status);
return false;
}
}
if (includeStats || includeLocation) {
if (consumer && !found) {
issue = NGRpcProxy::V1::FillIssue(
TStringBuilder() << "no consumer '" << consumer << "' in topic",
Ydb::PersQueue::ErrorCode::BAD_REQUEST
);
return false;
}
}
return true;
}

} // namespace NKikimr
37 changes: 37 additions & 0 deletions ydb/core/ydb_convert/topic_description.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
#pragma once

#include <ydb/core/protos/pqconfig.pb.h>
CyberROFL marked this conversation as resolved.
Show resolved Hide resolved

namespace Ydb {
namespace Topic {
class Consumer;
class DescribeTopicResult;
}
class StatusIds;
enum StatusIds_StatusCode : int;
}

namespace NKikimrSchemeOp {
class TPersQueueGroupDescription;
class TDirEntry;
}

namespace NYql {
class TIssue;
}

namespace NActors {
struct TActorContext;
CyberROFL marked this conversation as resolved.
Show resolved Hide resolved
CyberROFL marked this conversation as resolved.
Show resolved Hide resolved
}

namespace NKikimr {

bool FillConsumer(Ydb::Topic::Consumer *rr, const NKikimrPQ::TPQTabletConfig::TConsumer& consumer,
CyberROFL marked this conversation as resolved.
Show resolved Hide resolved
Ydb::StatusIds_StatusCode& status, TString& error);
bool FillConsumer(Ydb::Topic::Consumer* rr, const NKikimrPQ::TPQTabletConfig::TConsumer& consumer,
const NActors::TActorContext& ctx, Ydb::StatusIds_StatusCode& status, TString& error);
bool FillTopicDescription(Ydb::Topic::DescribeTopicResult& out, const NKikimrSchemeOp::TPersQueueGroupDescription& in, const NKikimrPQ::TPQConfig& pqConfig,
const NKikimrSchemeOp::TDirEntry &fromDirEntry, const TMaybe<TString>& cdcName,
bool EnableTopicSplitMerge, NYql::TIssue& issue, const NActors::TActorContext& ctx, const TString& consumer, bool includeStats, bool includeLocation);

} // namespace NKikimr
4 changes: 4 additions & 0 deletions ydb/core/ydb_convert/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ SRCS(
table_settings.cpp
table_description.cpp
table_profiles.cpp
topic_description.cpp
ydb_convert.cpp
tx_proxy_status.cpp
)
Expand All @@ -25,6 +26,9 @@ PEERDIR(
yql/essentials/minikql/dom
yql/essentials/public/udf
ydb/public/api/protos
ydb/public/api/grpc/draft
ydb/core/grpc_services/cancelation/protos
ydb/public/lib/operation_id/protos
CyberROFL marked this conversation as resolved.
Show resolved Hide resolved
)

YQL_LAST_ABI_VERSION()
Expand Down
Loading
Loading