Skip to content

Commit

Permalink
Support Console interoperation through distconf
Browse files Browse the repository at this point in the history
  • Loading branch information
alexvru committed Dec 28, 2024
1 parent ada1a64 commit 94d09aa
Show file tree
Hide file tree
Showing 15 changed files with 495 additions and 95 deletions.
2 changes: 1 addition & 1 deletion ydb/core/blobstorage/base/blobstorage_console_events.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ namespace NKikimr {
NKikimrBlobStorage::TEvControllerProposeConfigRequest, TEvBlobStorage::EvControllerProposeConfigRequest> {
TEvControllerProposeConfigRequest() = default;

TEvControllerProposeConfigRequest(const ui32 configHash, const ui32 configVersion) {
TEvControllerProposeConfigRequest(ui64 configHash, ui32 configVersion) {
Record.SetConfigHash(configHash);
Record.SetConfigVersion(configVersion);
}
Expand Down
48 changes: 48 additions & 0 deletions ydb/core/blobstorage/nodewarden/distconf.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
#include "distconf.h"
#include "node_warden_impl.h"
#include <ydb/core/mind/dynamic_nameserver.h>
#include <ydb/library/yaml_config/yaml_config_helpers.h>
#include <ydb/library/yaml_config/yaml_config.h>
#include <library/cpp/streams/zstd/zstd.h>

namespace NKikimr::NStorage {

Expand Down Expand Up @@ -61,16 +64,56 @@ namespace NKikimr::NStorage {

bool TDistributedConfigKeeper::ApplyStorageConfig(const NKikimrBlobStorage::TStorageConfig& config) {
if (!StorageConfig || StorageConfig->GetGeneration() < config.GetGeneration()) {
const bool stateStorageAppeared = (!StorageConfig || !StorageConfig->HasStateStorageConfig()) &&
config.HasStateStorageConfig();

if (config.HasConfigComposite()) {
try {
// parse the composite stream
TStringInput ss(config.GetConfigComposite());
TZstdDecompress zstd(&ss);
StorageConfigYaml = TString::Uninitialized(LoadSize(&zstd));
zstd.LoadOrFail(StorageConfigYaml.Detach(), StorageConfigYaml.size());
StorageConfigFetchYaml = TString::Uninitialized(LoadSize(&zstd));
zstd.LoadOrFail(StorageConfigFetchYaml.Detach(), StorageConfigFetchYaml.size());

// extract _current_ config version
auto metadata = NYamlConfig::GetMetadata(StorageConfigYaml);
Y_DEBUG_ABORT_UNLESS(metadata.Version.has_value());
StorageConfigYamlVersion = metadata.Version.value_or(0);

// and _fetched_ config hash
StorageConfigFetchYamlHash = NYaml::GetConfigHash(StorageConfigFetchYaml);
} catch (const std::exception& ex) {
Y_ABORT("ConfigComposite format incorrect: %s", ex.what());
}
} else {
StorageConfigYaml = StorageConfigFetchYaml = {};
StorageConfigFetchYamlHash = 0;
StorageConfigYamlVersion = 0;
}

StorageConfig.emplace(config);
if (ProposedStorageConfig && ProposedStorageConfig->GetGeneration() <= StorageConfig->GetGeneration()) {
ProposedStorageConfig.reset();
}

Send(MakeBlobStorageNodeWardenID(SelfId().NodeId()), new TEvNodeWardenStorageConfig(*StorageConfig,
ProposedStorageConfig ? &ProposedStorageConfig.value() : nullptr));

if (IsSelfStatic) {
PersistConfig({});
ApplyConfigUpdateToDynamicNodes(false);
}

if (Scepter && stateStorageAppeared) {
ConnectToConsole();
}

if (Scepter) {
SendConfigProposeRequest();
}

return true;
} else if (StorageConfig->GetGeneration() && StorageConfig->GetGeneration() == config.GetGeneration() &&
StorageConfig->GetFingerprint() != config.GetFingerprint()) {
Expand Down Expand Up @@ -280,6 +323,11 @@ namespace NKikimr::NStorage {
fFunc(TEvents::TSystem::Gone, HandleGone);
cFunc(TEvents::TSystem::Wakeup, HandleWakeup);
cFunc(TEvents::TSystem::Poison, PassAway);
hFunc(TEvTabletPipe::TEvClientConnected, Handle);
hFunc(TEvTabletPipe::TEvClientDestroyed, Handle);
hFunc(TEvBlobStorage::TEvControllerValidateConfigResponse, Handle);
hFunc(TEvBlobStorage::TEvControllerProposeConfigResponse, Handle);
hFunc(TEvBlobStorage::TEvControllerConsoleCommitResponse, Handle);
)
for (ui32 nodeId : std::exchange(UnsubscribeQueue, {})) {
UnsubscribeInterconnect(nodeId);
Expand Down
32 changes: 32 additions & 0 deletions ydb/core/blobstorage/nodewarden/distconf.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,10 @@ namespace NKikimr::NStorage {

// currently active storage config
std::optional<NKikimrBlobStorage::TStorageConfig> StorageConfig;
// The fields below are derived from StorageConfig's ConfigComposite when a config is applied; they are
// cleared (empty strings / zeros) when the applied config carries no composite.
TString StorageConfigYaml; // the part we have to push (unless this is storage-only) to console
TString StorageConfigFetchYaml; // the part we would get if we fetch from console
ui64 StorageConfigFetchYamlHash = 0; // NYaml::GetConfigHash() of StorageConfigFetchYaml
ui32 StorageConfigYamlVersion = 0; // metadata version extracted from StorageConfigYaml (0 if absent)

// base config from config file
NKikimrBlobStorage::TStorageConfig BaseConfig;
Expand Down Expand Up @@ -261,6 +265,18 @@ namespace NKikimr::NStorage {
// child actors
THashSet<TActorId> ChildActors;

// pipe to Console
TActorId ConsolePipeId; // tablet pipe client registered by ConnectToConsole(); empty when there is no pipe
bool ConsoleConnected = false; // set once TEvClientConnected with OK status arrives for ConsolePipeId
bool ConfigCommittedToConsole = false; // set when Console answers CommitIsNotNeeded or Committed
// per-actor cancellation callbacks; OnConsolePipeError() invokes every one of them and clears the map
THashMap<TActorId, std::function<void()>> PendingConsoleCancellation;
// request cookie -> actor that receives the forwarded TEvControllerValidateConfigResponse
THashMap<ui64, TActorId> PendingValidation;
ui64 ValidateRequestCookie = 0; // NOTE(review): presumably the cookie source for PendingValidation keys -- confirm at the use site
ui64 ProposeRequestCookie = 0; // incremented for every propose request sent; responses with another cookie are ignored
ui64 CommitRequestCookie = 0; // incremented for every commit request sent and on pipe error to invalidate stale responses
bool ProposeRequestInFlight = false; // a propose request has been sent and its response has not been processed yet
// (StorageConfigFetchYamlHash, StorageConfigYamlVersion) recorded when the propose request was sent
std::optional<std::tuple<ui64, ui32>> ProposedConfigHashVersion;

friend void ::Out<ERootState>(IOutputStream&, ERootState);

public:
Expand Down Expand Up @@ -421,6 +437,22 @@ namespace NKikimr::NStorage {

void Handle(NMon::TEvHttpInfo::TPtr ev);

////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Console interaction

// Registers a tablet pipe client to Console; aborts if a pipe is already registered.
void ConnectToConsole();
// Closes the Console pipe and marks the keeper as not connected.
void DisconnectFromConsole();
// Proposes the current (fetch-yaml hash, yaml version) pair to Console; does nothing when not connected,
// when a propose request is already in flight, or when no config composite is available.
void SendConfigProposeRequest();
// Forwards the validation response to the actor registered under the event cookie.
void Handle(TEvBlobStorage::TEvControllerValidateConfigResponse::TPtr ev);
// Processes Console's answer to the propose request and sends a commit request when one is needed.
void Handle(TEvBlobStorage::TEvControllerProposeConfigResponse::TPtr ev);
// Processes the outcome of a commit request; reconnects to Console on session mismatch.
void Handle(TEvBlobStorage::TEvControllerConsoleCommitResponse::TPtr ev);
// Pipe connection result; events for pipes other than ConsolePipeId are ignored.
void Handle(TEvTabletPipe::TEvClientConnected::TPtr ev);
// Pipe destruction notification; events for pipes other than ConsolePipeId are ignored.
void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr ev);
// Resets all Console-related state, cancels pending requests and reconnects.
void OnConsolePipeError();

// Packs 'yaml' and the fetched counterpart into config's ConfigComposite (zstd-compressed). When 'fetched'
// is not provided, it is generated from 'yaml' with the metadata version incremented by one.
// Returns an error description on failure and an empty optional on success.
static std::optional<TString> UpdateConfigComposite(NKikimrBlobStorage::TStorageConfig& config, const TString& yaml,
const std::optional<TString>& fetched);

////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Consistency checking

Expand Down
211 changes: 211 additions & 0 deletions ydb/core/blobstorage/nodewarden/distconf_console.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
#include "distconf.h"

#include <ydb/library/yaml_config/yaml_config.h>
#include <library/cpp/streams/zstd/zstd.h>

namespace NKikimr::NStorage {

// Starts a new tablet pipe client towards the Console tablet and remembers its actor id.
// A pipe must not be registered already -- this is enforced with a hard abort.
void TDistributedConfigKeeper::ConnectToConsole() {
    STLOG(PRI_DEBUG, BS_NODE, NWDC66, "ConnectToConsole");
    Y_ABORT_UNLESS(!ConsolePipeId);

    auto* const pipeClient = NTabletPipe::CreateClient(SelfId(), MakeConsoleID(),
        NTabletPipe::TClientRetryPolicy::WithRetries());
    ConsolePipeId = Register(pipeClient);
}

void TDistributedConfigKeeper::DisconnectFromConsole() {
NTabletPipe::CloseAndForgetClient(SelfId(), ConsolePipeId);
ConsoleConnected = false;
}

void TDistributedConfigKeeper::SendConfigProposeRequest() {
auto& sc = StorageConfig;

STLOG(PRI_DEBUG, BS_NODE, NWDC67, "SendConfigProposeRequest",
(ConsoleConnected, ConsoleConnected),
(ProposeRequestInFlight, ProposeRequestInFlight),
(HasStorageConfig, sc.has_value()),
(StorageConfigFetchYamlHash, StorageConfigFetchYamlHash),
(StorageConfigYamlVersion, StorageConfigYamlVersion),
(ProposedConfigHashVersion, ProposedConfigHashVersion),
(ProposeRequestCookie, ProposeRequestCookie),
(NextProposeRequestCookie, ProposeRequestCookie + 1));

if (!ConsoleConnected) {
return;
}

if (ProposeRequestInFlight) {
return; // still waiting for previous one
}

if (!sc || !sc->HasConfigComposite()) {
return; // no config yet
}

Y_DEBUG_ABORT_UNLESS(!ProposedConfigHashVersion || ProposedConfigHashVersion == std::make_tuple(
StorageConfigFetchYamlHash, StorageConfigYamlVersion));
ProposedConfigHashVersion.emplace(StorageConfigFetchYamlHash, StorageConfigYamlVersion);
NTabletPipe::SendData(SelfId(), ConsolePipeId, new TEvBlobStorage::TEvControllerProposeConfigRequest(
StorageConfigFetchYamlHash, StorageConfigYamlVersion), ++ProposeRequestCookie);
ProposeRequestInFlight = true;
}

// Relays Console's validation response to the actor registered under the event's cookie.
// Responses are dropped when the Console connection is down or when the cookie is unknown.
void TDistributedConfigKeeper::Handle(TEvBlobStorage::TEvControllerValidateConfigResponse::TPtr ev) {
    if (ConsoleConnected) {
        if (const auto pending = PendingValidation.find(ev->Cookie); pending != PendingValidation.end()) {
            TActivationContext::Send(ev->Forward(pending->second));
            PendingValidation.erase(pending);
        }
    }
}

// Handles Console's verdict on the propose request sent by SendConfigProposeRequest().
//
// The response is ignored unless the pipe is connected, a propose request is in flight and the cookie matches
// the latest request. Outcomes:
//   * HashMismatch / UnexpectedConfig -- the proposal is dropped;
//   * CommitIsNeeded -- the current yaml is sent to Console in a commit request (the proposal stays recorded
//     until the commit response arrives);
//   * CommitIsNotNeeded -- Console already has this config; the proposal round is finished.
void TDistributedConfigKeeper::Handle(TEvBlobStorage::TEvControllerProposeConfigResponse::TPtr ev) {
    STLOG(PRI_DEBUG, BS_NODE, NWDC68, "received TEvControllerProposeConfigResponse",
        (ConsoleConnected, ConsoleConnected),
        (ProposeRequestInFlight, ProposeRequestInFlight),
        (Cookie, ev->Cookie),
        (ProposeRequestCookie, ProposeRequestCookie),
        (ProposedConfigHashVersion, ProposedConfigHashVersion),
        (Record, ev->Get()->Record));

    if (!ConsoleConnected || !ProposeRequestInFlight || ev->Cookie != ProposeRequestCookie) {
        return;
    }
    ProposeRequestInFlight = false;

    const auto& record = ev->Get()->Record;
    switch (record.GetStatus()) {
        case NKikimrBlobStorage::TEvControllerProposeConfigResponse::HashMismatch:
        case NKikimrBlobStorage::TEvControllerProposeConfigResponse::UnexpectedConfig:
            // TODO: error condition; restart?
            ProposedConfigHashVersion.reset();
            break;

        case NKikimrBlobStorage::TEvControllerProposeConfigResponse::CommitIsNeeded: {
            // the config we are about to commit must be exactly the one we have proposed
            if (!StorageConfig || !StorageConfig->HasConfigComposite() || ProposedConfigHashVersion !=
                    std::make_tuple(StorageConfigFetchYamlHash, StorageConfigYamlVersion)) {
                const char *err = "proposed config, but something has gone awfully wrong";
                STLOG(PRI_CRIT, BS_NODE, NWDC69, err, (StorageConfig, StorageConfig),
                    (ProposedConfigHashVersion, ProposedConfigHashVersion),
                    (StorageConfigFetchYamlHash, StorageConfigFetchYamlHash),
                    (StorageConfigYamlVersion, StorageConfigYamlVersion));
                Y_DEBUG_ABORT("%s", err);
                return;
            }

            NTabletPipe::SendData(SelfId(), ConsolePipeId, new TEvBlobStorage::TEvControllerConsoleCommitRequest(
                StorageConfigYaml), ++CommitRequestCookie);
            break;
        }

        case NKikimrBlobStorage::TEvControllerProposeConfigResponse::CommitIsNotNeeded:
            // it's okay, just wait for another configuration change or something like that
            ConfigCommittedToConsole = true;
            // No commit response will follow, so the proposal ends here; keeping the stale (hash, version)
            // pair would trip the consistency check in SendConfigProposeRequest() on the next config change.
            ProposedConfigHashVersion.reset();
            break;
    }
}

// Handles the outcome of the commit request issued after Console reported CommitIsNeeded.
// Stale responses (connection down or cookie mismatch) are ignored. Whatever the status is, the recorded
// proposal is dropped at the end.
void TDistributedConfigKeeper::Handle(TEvBlobStorage::TEvControllerConsoleCommitResponse::TPtr ev) {
    STLOG(PRI_DEBUG, BS_NODE, NWDC70, "received TEvControllerConsoleCommitResponse",
        (ConsoleConnected, ConsoleConnected),
        (Cookie, ev->Cookie),
        (CommitRequestCookie, CommitRequestCookie),
        (Record, ev->Get()->Record));

    const bool relevant = ConsoleConnected && ev->Cookie == CommitRequestCookie;
    if (!relevant) {
        return;
    }

    using TResponse = NKikimrBlobStorage::TEvControllerConsoleCommitResponse;
    const auto status = ev->Get()->Record.GetStatus();

    if (status == TResponse::SessionMismatch) {
        // start over with a fresh pipe
        DisconnectFromConsole();
        ConnectToConsole();
    } else if (status == TResponse::Committed) {
        ConfigCommittedToConsole = true;
    }
    // TResponse::NotCommitted requires no action

    ProposedConfigHashVersion.reset();
}

// Pipe connection result. For the Console pipe: on success mark the connection as established and propose
// the current config right away; on failure go through the common pipe error path.
void TDistributedConfigKeeper::Handle(TEvTabletPipe::TEvClientConnected::TPtr ev) {
    const auto& msg = *ev->Get();

    STLOG(PRI_DEBUG, BS_NODE, NWDC71, "received TEvClientConnected", (ConsolePipeId, ConsolePipeId),
        (TabletId, msg.TabletId), (Status, msg.Status), (ClientId, msg.ClientId),
        (ServerId, msg.ServerId));

    if (msg.ClientId != ConsolePipeId) {
        return; // not our pipe
    }

    if (msg.Status != NKikimrProto::OK) {
        OnConsolePipeError();
        return;
    }

    ConsoleConnected = true;
    SendConfigProposeRequest();
}

// Pipe destruction notification; only the currently registered Console pipe triggers error handling.
void TDistributedConfigKeeper::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr ev) {
    const auto& msg = *ev->Get();

    STLOG(PRI_DEBUG, BS_NODE, NWDC72, "received TEvClientDestroyed", (ConsolePipeId, ConsolePipeId),
        (TabletId, msg.TabletId), (ClientId, msg.ClientId), (ServerId, msg.ServerId));

    if (msg.ClientId != ConsolePipeId) {
        return; // some other (e.g. already forgotten) pipe
    }
    OnConsolePipeError();
}

void TDistributedConfigKeeper::OnConsolePipeError() {
ConsolePipeId = {};
ConsoleConnected = false;
ConfigCommittedToConsole = false;
ProposedConfigHashVersion.reset();
ProposeRequestInFlight = false;
++CommitRequestCookie; // to prevent processing any messages

// cancel any pending requests
for (const auto& [actorId, cancellation] : PendingConsoleCancellation) {
cancellation();
}
PendingConsoleCancellation.clear();
PendingValidation.clear();

ConnectToConsole();
}

// Builds the ConfigComposite field of 'config': a zstd-compressed stream holding two size-prefixed strings,
// first 'yaml' and then the "fetched" counterpart.
//
// When 'fetched' is not supplied, the counterpart is produced from 'yaml' by replacing its metadata: the
// cluster defaults to "unknown" when absent and the version is incremented by one (0 is assumed when absent).
//
// Returns the exception text when the metadata processing throws std::exception; an empty optional on success.
std::optional<TString> TDistributedConfigKeeper::UpdateConfigComposite(NKikimrBlobStorage::TStorageConfig& config,
        const TString& yaml, const std::optional<TString>& fetched) {
    TString generated;

    if (!fetched.has_value()) { // fill in 'to-be-fetched' version of config with version incremented by one
        try {
            auto meta = NYamlConfig::GetMetadata(yaml);
            meta.Cluster = meta.Cluster.value_or("unknown"); // TODO: fix this
            meta.Version = meta.Version.value_or(0) + 1;
            generated = NYamlConfig::ReplaceMetadata(yaml, meta);
        } catch (const std::exception& ex) {
            return ex.what();
        }
    }

    const TString& fetchedPart = fetched.has_value() ? *fetched : generated;

    TStringStream packed;
    {
        // scoped so that the compressor is destroyed before the stream contents are read
        TZstdCompress compressor(&packed);
        for (const TString* part : {&yaml, &fetchedPart}) {
            SaveSize(&compressor, part->size());
            compressor.Write(*part);
        }
    }
    config.SetConfigComposite(packed.Str());

    return std::nullopt;
}

}
4 changes: 4 additions & 0 deletions ydb/core/blobstorage/nodewarden/distconf_fsm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,13 @@ namespace NKikimr::NStorage {
task.SetTaskId(RandomNumber<ui64>());
task.MutableCollectConfigs();
IssueScatterTask(TActorId(), std::move(task));
if (StorageConfig && StorageConfig->HasStateStorageConfig()) {
ConnectToConsole();
}
}

// Leaves the root role (as the name suggests); the only action performed here is tearing down the Console
// connection via DisconnectFromConsole().
void TDistributedConfigKeeper::UnbecomeRoot() {
DisconnectFromConsole();
}

void TDistributedConfigKeeper::SwitchToError(const TString& reason) {
Expand Down
Loading

0 comments on commit 94d09aa

Please sign in to comment.