Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 2709: add cleanup of leaked devices #2745

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
#include "disk_registry_actor.h"

namespace NCloud::NBlockStore::NStorage {

using namespace NActors;

using namespace NKikimr;
using namespace NKikimr::NTabletFlatExecutor;

////////////////////////////////////////////////////////////////////////////////

// Prepare stage of the TCleanupDevicesWithoutAgent transaction.
// Nothing has to be loaded up front, so the stage ignores all of its
// arguments and reports success immediately.
bool TDiskRegistryActor::PrepareCleanupDevicesWithoutAgent(
    const TActorContext& ctx,
    TTransactionContext& tx,
    TTxDiskRegistry::TCleanupDevicesWithoutAgent& args)
{
    Y_UNUSED(ctx, tx, args);
    return true;
}

void TDiskRegistryActor::ExecuteCleanupDevicesWithoutAgent(
const TActorContext& ctx,
TTransactionContext& tx,
TTxDiskRegistry::TCleanupDevicesWithoutAgent& args)
{
Y_UNUSED(ctx);
Y_UNUSED(args);

TDiskRegistryDatabase db(tx.DB);
State->CleanupDevices(db);
}

// Complete stage of the TCleanupDevicesWithoutAgent transaction.
// There is nothing to reply to or schedule, so the stage is a no-op.
void TDiskRegistryActor::CompleteCleanupDevicesWithoutAgent(
    const TActorContext& ctx,
    TTxDiskRegistry::TCleanupDevicesWithoutAgent& args)
{
    Y_UNUSED(ctx, args);
}

} // namespace NCloud::NBlockStore::NStorage
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,8 @@ void TDiskRegistryActor::CompleteLoadState(
ScheduleMakeBackup(ctx, args.LastBackupTime);

ScheduleDiskRegistryAgentListExpiredParamsCleanup(ctx);

ExecuteTx<TCleanupDevicesWithoutAgent>(ctx);
vladstepanyuk marked this conversation as resolved.
Show resolved Hide resolved
}

} // namespace NCloud::NBlockStore::NStorage
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <util/string/builder.h>
#include <util/string/join.h>

#include <ranges>
#include <tuple>

namespace NCloud::NBlockStore::NStorage {
Expand Down Expand Up @@ -3689,7 +3690,27 @@ void TDiskRegistryState::ForgetDevices(
DeviceList.ForgetDevice(id);
db.DeleteSuspendedDevice(id);
db.DeleteDirtyDevice(id);

// Cleanup AutomaticallyReplacedDevice
{
vladstepanyuk marked this conversation as resolved.
Show resolved Hide resolved
auto idIt = AutomaticallyReplacedDeviceIds.find(id);
vladstepanyuk marked this conversation as resolved.
Show resolved Hide resolved

if (idIt != AutomaticallyReplacedDeviceIds.end()) {
vladstepanyuk marked this conversation as resolved.
Show resolved Hide resolved
AutomaticallyReplacedDeviceIds.erase(idIt);

auto deviceItToCleanup = FindIf(
AutomaticallyReplacedDevices,
[&](const TAutomaticallyReplacedDeviceInfo& value)
{ return value.DeviceId == id; });
if (deviceItToCleanup != AutomaticallyReplacedDevices.end()) {
AutomaticallyReplacedDevices.erase(deviceItToCleanup);
}

db.DeleteAutomaticallyReplacedDevice(id);
}
}
}

}

void TDiskRegistryState::RemoveAgent(
Expand Down Expand Up @@ -7506,6 +7527,53 @@ TVector<NProto::TAgentInfo> TDiskRegistryState::QueryAgentsInfo() const
return ret;
}

// Forgets "leaked" devices: ids that are still tracked as dirty, suspended or
// automatically replaced, but are not listed by any agent in AgentList.
//
// db - database wrapper handed to ForgetDevices together with the collected
//      ids.
void TDiskRegistryState::CleanupDevices(TDiskRegistryDatabase& db)
{
    // Every device id that is reported by at least one known agent.
    THashSet<TString> allKnownDevicesWithAgents;
    for (const auto& agent: AgentList.GetAgents()) {
        for (const auto& device: agent.GetDevices()) {
            allKnownDevicesWithAgents.insert(device.GetDeviceUUID());
        }
    }

    TVector<TString> devicesToRemoveVector;

    // True for ids that no known agent owns.
    auto filterLeakedDevices = [&](const TString& id)
    {
        return allKnownDevicesWithAgents.find(id) ==
               allKnownDevicesWithAgents.end();
    };
    auto addIdToVector = [&](TString id)
    {
        devicesToRemoveVector.emplace_back(std::move(id));
    };

    // NOTE(review): the first two loops move ids out of elements seen through
    // a filter view. That is only well-defined while the moved-from (empty) id
    // still satisfies the predicate, i.e. while no agent reports a device with
    // an empty UUID - confirm this invariant holds.
    for (auto& deviceId: DeviceList.GetDirtyDevicesId() |
                             std::views::filter(filterLeakedDevices))
    {
        addIdToVector(std::move(deviceId));
    }
    for (auto& device: DeviceList.GetSuspendedDevices() |
                           std::views::filter(
                               [&](const auto& device)
                               { return filterLeakedDevices(device.GetId()); }))
    {
        addIdToVector(std::move(*device.MutableId()));
    }
    for (const auto& deviceId: AutomaticallyReplacedDeviceIds |
                                   std::views::filter(filterLeakedDevices))
    {
        addIdToVector(deviceId);
    }

    // The same id may have been collected from more than one source above.
    SortUnique(devicesToRemoveVector);
    ForgetDevices(db, devicesToRemoveVector);
}

std::optional<ui64> TDiskRegistryState::GetDiskBlockCount(
const TDiskId& diskId) const
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -864,6 +864,8 @@ class TDiskRegistryState

TVector<NProto::TAgentInfo> QueryAgentsInfo() const;

void CleanupDevices(TDiskRegistryDatabase& db);
vladstepanyuk marked this conversation as resolved.
Show resolved Hide resolved

private:
void ProcessConfig(const NProto::TDiskRegistryConfig& config);
void ProcessDisks(TVector<NProto::TDiskConfig> disks);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11911,6 +11911,114 @@ Y_UNIT_TEST_SUITE(TDiskRegistryStateTest)
// Crit event shouldn't be reported
UNIT_ASSERT_VALUES_EQUAL(2, criticalEvents->Val());
}
}

// Verifies that CleanupDevices() removes dirty, suspended and automatically
// replaced device records that were written to the DB without any agent
// owning them, both from the loaded state and from the DB tables.
Y_UNIT_TEST(ShouldCleanupAlreadyLeakedDevices)
{
    TTestExecutor executor;

    executor.WriteTx([&](TDiskRegistryDatabase db) { db.InitSchema(); });

    const TString leakedDirtyDevice = "uuid-100.1";
    const TString leakedSuspendedDevice = "uuid-100.2";
    const TString leakedAutomaticallyReplacedDevice = "uuid-100.3";

    // Add leaked devices directly to the DB; no agent is ever registered.
    executor.WriteTx(
        [&](TDiskRegistryDatabase db)
        {
            db.UpdateDirtyDevice(leakedDirtyDevice, "");
            NProto::TSuspendedDevice device;
            device.SetId(leakedSuspendedDevice);
            db.UpdateSuspendedDevice(device);
            db.AddAutomaticallyReplacedDevice(
                TAutomaticallyReplacedDeviceInfo{
                    leakedAutomaticallyReplacedDevice,
                    Now()});
        });

    // Load the state from the DB, run the cleanup and check the result.
    executor.WriteTx(
        [&](TDiskRegistryDatabase db)
        {
            auto state = TDiskRegistryStateBuilder::LoadState(db)->Build();

            state.CleanupDevices(db);

            const auto dirtyDevicesFromState = state.GetDirtyDevices();

            // Check that the dirty device was removed from the state...
            UNIT_ASSERT_EQUAL(
                dirtyDevicesFromState.end(),
                FindIf(
                    dirtyDevicesFromState,
                    [&](const TDeviceConfig& val)
                    { return val.deviceuuid() == leakedDirtyDevice; }));

            // ...and from the DB table.
            TVector<TDirtyDevice> dirtyDevicesDb;
            db.ReadDirtyDevices(dirtyDevicesDb);
            UNIT_ASSERT_EQUAL(
                dirtyDevicesDb.end(),
                FindIf(
                    dirtyDevicesDb,
                    [&](const TDirtyDevice& val)
                    { return val.Id == leakedDirtyDevice; }));

            const auto suspendedDevicesFromState =
                state.GetSuspendedDevices();

            auto deviceIdPredicateForSuspendDevices =
                [&](const NProto::TSuspendedDevice& val)
            {
                return val.GetId() == leakedSuspendedDevice;
            };

            // Same checks for the suspended device.
            UNIT_ASSERT_EQUAL(
                suspendedDevicesFromState.end(),
                FindIf(
                    suspendedDevicesFromState,
                    deviceIdPredicateForSuspendDevices));

            TVector<NProto::TSuspendedDevice> suspendedDevicesDb;
            db.ReadSuspendedDevices(suspendedDevicesDb);
            UNIT_ASSERT_EQUAL(
                suspendedDevicesDb.end(),
                FindIf(
                    suspendedDevicesDb,
                    deviceIdPredicateForSuspendDevices));

            auto deviceIdPredicateForAutomaticallyReplacedDevices =
                [&](const TAutomaticallyReplacedDeviceInfo& val)
            {
                return val.DeviceId == leakedAutomaticallyReplacedDevice;
            };

            // Same checks for the automatically replaced device.
            const auto automaticallyReplacedDevicesFromState =
                state.GetAutomaticallyReplacedDevices();
            UNIT_ASSERT_EQUAL(
                automaticallyReplacedDevicesFromState.end(),
                FindIf(
                    automaticallyReplacedDevicesFromState,
                    deviceIdPredicateForAutomaticallyReplacedDevices));

            TDeque<TAutomaticallyReplacedDeviceInfo>
                automaticallyReplacedDevicesFromDb;
            db.ReadAutomaticallyReplacedDevices(
                automaticallyReplacedDevicesFromDb);
            UNIT_ASSERT_EQUAL(
                automaticallyReplacedDevicesFromDb.end(),
                FindIf(
                    automaticallyReplacedDevicesFromDb,
                    deviceIdPredicateForAutomaticallyReplacedDevices));
        });
}
}
} // namespace NCloud::NBlockStore::NStorage
Original file line number Diff line number Diff line change
Expand Up @@ -720,6 +720,114 @@ Y_UNIT_TEST_SUITE(TDiskRegistryStateCMSTest)
});
}

// Checks that once a host is purged and its agent is marked unavailable, no
// dirty, suspended or automatically replaced device records remain, neither
// in the in-memory state nor in the DB tables.
Y_UNIT_TEST(ShouldCleanupDevicesFromOtherTables)
{
    TTestExecutor executor;
    executor.WriteTx([&](TDiskRegistryDatabase db) { db.InitSchema(); });

    const auto agent = AgentConfig(
        2,
        "agent-2",
        {
            Device("NVMENBS01", "uuid-2.1", "rack-2"),
            Device("NVMENBS02", "uuid-2.2", "rack-2"),
            Device("NVMENBS03", "uuid-2.3", "rack-2"),
        });

    // Populate the DB through a short-lived state instance.
    {
        TDiskRegistryState state =
            TDiskRegistryStateBuilder().WithConfig({agent}).Build();

        // Register the agent and mark each of its devices as clean.
        executor.WriteTx(
            [&](TDiskRegistryDatabase db)
            {
                UNIT_ASSERT_SUCCESS(RegisterAgent(state, db, agent, Now()));
                for (const auto& device: agent.GetDevices()) {
                    state.MarkDeviceAsClean(Now(), db, device.GetDeviceUUID());
                }
            });

        // Put one device into each of the three auxiliary tables.
        executor.WriteTx(
            [&](TDiskRegistryDatabase db)
            {
                const auto& devices = agent.devices();

                state.MarkDeviceAsDirty(db, devices[0].deviceuuid());
                state.SuspendDevice(db, devices[1].deviceuuid());
                db.AddAutomaticallyReplacedDevice(
                    TAutomaticallyReplacedDeviceInfo{
                        devices[2].deviceuuid(),
                        Now()});
            });
    }

    executor.WriteTx(
        [&](TDiskRegistryDatabase db)
        {
            // Start from whatever has been persisted.
            auto state = TDiskRegistryStateBuilder::LoadState(db)
                             ->WithConfig({agent})
                             .Build();

            TVector<TString> affected;
            TDuration cmsTimeout;

            // Step 1: move the host to the warning state.
            UNIT_ASSERT_SUCCESS(state.UpdateCmsHostState(
                db,
                agent.GetAgentId(),
                NProto::AGENT_STATE_WARNING,
                Now(),
                false,   // dryRun
                affected,
                cmsTimeout));

            UNIT_ASSERT_VALUES_EQUAL(1, state.GetAgents().size());

            // Step 2: purge the host.
            UNIT_ASSERT_SUCCESS(state.PurgeHost(
                db,
                agent.GetAgentId(),
                Now(),
                false,   // dryRun
                affected));

            // Step 3: report the agent as unavailable.
            UNIT_ASSERT_SUCCESS(state.UpdateAgentState(
                db,
                agent.GetAgentId(),
                NProto::AGENT_STATE_UNAVAILABLE,
                Now(),
                "lost",
                affected));

            UNIT_ASSERT_VALUES_EQUAL(0, state.GetAgents().size());

            // Dirty devices: nothing left in the state or in the DB.
            UNIT_ASSERT_VALUES_EQUAL(0, state.GetDirtyDevices().size());
            TVector<TDirtyDevice> dirtyInDb;
            db.ReadDirtyDevices(dirtyInDb);
            UNIT_ASSERT_VALUES_EQUAL(0, dirtyInDb.size());

            // Suspended devices: nothing left in the state or in the DB.
            UNIT_ASSERT_VALUES_EQUAL(0, state.GetSuspendedDevices().size());
            TVector<NProto::TSuspendedDevice> suspendedInDb;
            db.ReadSuspendedDevices(suspendedInDb);
            UNIT_ASSERT_VALUES_EQUAL(0, suspendedInDb.size());

            // Automatically replaced devices: nothing left either.
            UNIT_ASSERT_VALUES_EQUAL(
                0,
                state.GetAutomaticallyReplacedDevices().size());
            TDeque<TAutomaticallyReplacedDeviceInfo> replacedInDb;
            db.ReadAutomaticallyReplacedDevices(replacedInDb);
            UNIT_ASSERT_VALUES_EQUAL(0, replacedInDb.size());
        });
}

Y_UNIT_TEST(ShouldReturnAffectedDisksFromPurgeHost)
{
TTestExecutor executor;
Expand Down
Loading
Loading