Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 2709: add cleanup of leaked devices #2745

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <contrib/ydb/core/base/appdata.h>

#include <util/generic/algorithm.h>
#include <util/string/join.h>

namespace NCloud::NBlockStore::NStorage {

Expand Down Expand Up @@ -201,6 +202,17 @@ void TDiskRegistryActor::CompleteLoadState(
ScheduleMakeBackup(ctx, args.LastBackupTime);

ScheduleDiskRegistryAgentListExpiredParamsCleanup(ctx);

if (auto orphanDevices = State->FindOrphanDevices()) {
LOG_INFO(
ctx,
TBlockStoreComponents::DISK_REGISTRY,
"Found devices without agent and try to remove them: "
"DeviceUUIDs=%s",
JoinSeq(" ", orphanDevices).c_str());

ExecuteTx<TRemoveOrphanDevices>(ctx, std::move(orphanDevices));
}
}

} // namespace NCloud::NBlockStore::NStorage
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
#include "disk_registry_actor.h"


namespace NCloud::NBlockStore::NStorage {

using namespace NActors;

using namespace NKikimr;
using namespace NKikimr::NTabletFlatExecutor;

////////////////////////////////////////////////////////////////////////////////

bool TDiskRegistryActor::PrepareRemoveOrphanDevices(
const TActorContext& ctx,
TTransactionContext& tx,
TTxDiskRegistry::TRemoveOrphanDevices& args)
{
Y_UNUSED(ctx);
Y_UNUSED(tx);
Y_UNUSED(args);

return true;
}

void TDiskRegistryActor::ExecuteRemoveOrphanDevices(
const TActorContext& ctx,
TTransactionContext& tx,
TTxDiskRegistry::TRemoveOrphanDevices& args)
{
Y_UNUSED(ctx);

TDiskRegistryDatabase db(tx.DB);
if (!args.OrphanDevices) {
State->RemoveOrphanDevices(db, args.OrphanDevices);
}
}

void TDiskRegistryActor::CompleteRemoveOrphanDevices(
const TActorContext& ctx,
TTxDiskRegistry::TRemoveOrphanDevices& args)
{
Y_UNUSED(args);
Y_UNUSED(ctx);
}

} // namespace NCloud::NBlockStore::NStorage
Original file line number Diff line number Diff line change
Expand Up @@ -3689,6 +3689,8 @@ void TDiskRegistryState::ForgetDevices(
DeviceList.ForgetDevice(id);
db.DeleteSuspendedDevice(id);
db.DeleteDirtyDevice(id);

DeleteAutomaticallyReplacedDevice(db, id);
}
}

Expand Down Expand Up @@ -7506,6 +7508,45 @@ TVector<NProto::TAgentInfo> TDiskRegistryState::QueryAgentsInfo() const
return ret;
}

TVector<TString> TDiskRegistryState::FindOrphanDevices() const
{
THashSet<TString> allKnownDevicesWithAgents;
for (const auto& agent: AgentList.GetAgents()) {
for (const auto& device: agent.GetDevices()) {
const auto& deviceUUID = device.GetDeviceUUID();
allKnownDevicesWithAgents.insert(deviceUUID);
}
}

TVector<TString> orphanDevices;
for (auto& deviceUUID: DeviceList.GetDirtyDevicesId()) {
if (!allKnownDevicesWithAgents.contains(deviceUUID)) {
orphanDevices.emplace_back(std::move(deviceUUID));
}
}
for (auto& device: DeviceList.GetSuspendedDevices()) {
if (!allKnownDevicesWithAgents.contains(device.GetId())) {
orphanDevices.emplace_back(std::move(*device.MutableId()));
}
}
for (const auto& deviceUUID: AutomaticallyReplacedDeviceIds) {
if (!allKnownDevicesWithAgents.contains(deviceUUID)) {
orphanDevices.emplace_back(deviceUUID);
}
}

SortUnique(orphanDevices);

return orphanDevices;
}

void TDiskRegistryState::RemoveOrphanDevices(
TDiskRegistryDatabase& db,
const TVector<TString>& orphanDevicesIds)
{
ForgetDevices(db, orphanDevicesIds);
}

std::optional<ui64> TDiskRegistryState::GetDiskBlockCount(
const TDiskId& diskId) const
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -864,6 +864,12 @@ class TDiskRegistryState

TVector<NProto::TAgentInfo> QueryAgentsInfo() const;

TVector<TString> FindOrphanDevices() const;

void RemoveOrphanDevices(
TDiskRegistryDatabase& db,
const TVector<TString>& orphanDevicesIds);

private:
void ProcessConfig(const NProto::TDiskRegistryConfig& config);
void ProcessDisks(TVector<NProto::TDiskConfig> disks);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11911,6 +11911,126 @@ Y_UNIT_TEST_SUITE(TDiskRegistryStateTest)
// Crit event shouldn't be reported
UNIT_ASSERT_VALUES_EQUAL(2, criticalEvents->Val());
}
}

Y_UNIT_TEST(ShouldRemoveAlreadyLeakedDevices)
{
TTestExecutor executor;

executor.WriteTx([&](TDiskRegistryDatabase db) { db.InitSchema(); });

const auto agent = AgentConfig(
1,
"agent-1",
{
Device("NVMENBS01", "uuid-2.1", "rack-2"),
Device("NVMENBS02", "uuid-2.2", "rack-2"),
Device("NVMENBS03", "uuid-2.3", "rack-2"),
});

const TString leakedDirtyDevice = "uuid-100.1";
const TString leakedSuspendedDevice = "uuid-100.2";
const TString leakedAutomaticallyReplacedDevice = "uuid-100.3";

const TVector<TString> allLeakedDevices = {
leakedDirtyDevice,
leakedSuspendedDevice,
leakedAutomaticallyReplacedDevice};

// Add leaked devices.
executor.WriteTx(
[&](TDiskRegistryDatabase db)
{
db.UpdateDirtyDevice(leakedDirtyDevice, "");
NProto::TSuspendedDevice device;
device.SetId(leakedSuspendedDevice);
db.UpdateSuspendedDevice(device);
db.AddAutomaticallyReplacedDevice(
TAutomaticallyReplacedDeviceInfo{
leakedAutomaticallyReplacedDevice,
Now()});
});

// Register agent.
executor.WriteTx(
[&](TDiskRegistryDatabase db)
{
auto state = TDiskRegistryStateBuilder::LoadState(db).Build();

auto orphanDevices = state.FindOrphanDevices();
UNIT_ASSERT_EQUAL(static_cast<size_t>(3), orphanDevices.size());
for (const auto& leakedDevice: allLeakedDevices) {
UNIT_ASSERT_UNEQUAL(
orphanDevices.end(),
Find(orphanDevices, leakedDevice));
}

state.RemoveOrphanDevices(db, orphanDevices);

// Check that device cleaned up from tables.
const auto dirtyDevicesFromState = state.GetDirtyDevices();
UNIT_ASSERT_EQUAL(
dirtyDevicesFromState.end(),
FindIf(
dirtyDevicesFromState,
[&](const TDeviceConfig& val)
{ return val.deviceuuid() == leakedDirtyDevice; }));

TVector<TDirtyDevice> dirtyDevicesDb;
db.ReadDirtyDevices(dirtyDevicesDb);
UNIT_ASSERT_EQUAL(
dirtyDevicesDb.end(),
FindIf(
dirtyDevicesDb,
[&](const TDirtyDevice& val)
{ return val.Id == leakedDirtyDevice; }));

const auto suspendedDevicesFromState =
state.GetSuspendedDevices();

auto deviceIdPredicateForSuspendDevices =
[&](const NProto::TSuspendedDevice& val)
{
return val.GetId() == leakedSuspendedDevice;
};

UNIT_ASSERT_EQUAL(
suspendedDevicesFromState.end(),
FindIf(
suspendedDevicesFromState,
deviceIdPredicateForSuspendDevices));

TVector<NProto::TSuspendedDevice> suspendedDevicesDb;
db.ReadSuspendedDevices(suspendedDevicesDb);
UNIT_ASSERT_EQUAL(
suspendedDevicesDb.end(),
FindIf(
suspendedDevicesDb,
deviceIdPredicateForSuspendDevices));

auto deviceIdPredicateForAutomaticallyReplacedDevices =
[&](const TAutomaticallyReplacedDeviceInfo& val)
{
return val.DeviceId == leakedAutomaticallyReplacedDevice;
};

const auto automaticallyReplacedDevicesFromState =
state.GetAutomaticallyReplacedDevices();
UNIT_ASSERT_EQUAL(
automaticallyReplacedDevicesFromState.end(),
FindIf(
automaticallyReplacedDevicesFromState,
deviceIdPredicateForAutomaticallyReplacedDevices));

TDeque<TAutomaticallyReplacedDeviceInfo>
automaticallyReplacedDevicesFromDb;
db.ReadAutomaticallyReplacedDevices(
automaticallyReplacedDevicesFromDb);
UNIT_ASSERT_EQUAL(
automaticallyReplacedDevicesFromDb.end(),
FindIf(
automaticallyReplacedDevicesFromDb,
deviceIdPredicateForAutomaticallyReplacedDevices));
});
}
}
} // namespace NCloud::NBlockStore::NStorage
Original file line number Diff line number Diff line change
Expand Up @@ -720,6 +720,114 @@ Y_UNIT_TEST_SUITE(TDiskRegistryStateCMSTest)
});
}

Y_UNIT_TEST(ShouldRemoveDevicesAfterAgentDelete)
{
TTestExecutor executor;
executor.WriteTx([&](TDiskRegistryDatabase db) { db.InitSchema(); });

const auto agent = AgentConfig(
2,
"agent-2",
{
Device("NVMENBS01", "uuid-2.1", "rack-2"),
Device("NVMENBS02", "uuid-2.2", "rack-2"),
Device("NVMENBS03", "uuid-2.3", "rack-2"),
});

// Init state.
{
TDiskRegistryState state =
TDiskRegistryStateBuilder().WithConfig({agent}).Build();

// Register agent.
executor.WriteTx(
[&](TDiskRegistryDatabase db)
{
UNIT_ASSERT_SUCCESS(RegisterAgent(state, db, agent, Now()));
for (const auto& device: agent.GetDevices()) {
state.MarkDeviceAsClean(
Now(),
db,
device.GetDeviceUUID());
}
});

// Mark devices.
executor.WriteTx(
[&](TDiskRegistryDatabase db)
{
state.MarkDeviceAsDirty(
db,
agent.devices()[0].deviceuuid());
state.SuspendDevice(db, agent.devices()[1].deviceuuid());
db.AddAutomaticallyReplacedDevice(
TAutomaticallyReplacedDeviceInfo{
agent.devices()[2].deviceuuid(),
Now()});
});
}

executor.WriteTx(
[&](TDiskRegistryDatabase db)
{
// Load state from db.
auto state = TDiskRegistryStateBuilder::LoadState(db)
.WithConfig({agent})
.Build();

// Remove agent.
TVector<TString> affectedDisks;
TDuration timeout;

UNIT_ASSERT_SUCCESS(state.UpdateCmsHostState(
db,
agent.GetAgentId(),
NProto::AGENT_STATE_WARNING,
Now(),
false, // dryRun
affectedDisks,
timeout));

UNIT_ASSERT_VALUES_EQUAL(1, state.GetAgents().size());

UNIT_ASSERT_SUCCESS(state.PurgeHost(
db,
agent.GetAgentId(),
Now(),
false, // dryRun
affectedDisks));

UNIT_ASSERT_SUCCESS(state.UpdateAgentState(
db,
agent.GetAgentId(),
NProto::AGENT_STATE_UNAVAILABLE,
Now(),
"lost",
affectedDisks));

UNIT_ASSERT_VALUES_EQUAL(0, state.GetAgents().size());

UNIT_ASSERT_VALUES_EQUAL(0, state.GetDirtyDevices().size());
TVector<TDirtyDevice> dirtyDevices;
db.ReadDirtyDevices(dirtyDevices);
UNIT_ASSERT_VALUES_EQUAL(0, dirtyDevices.size());

UNIT_ASSERT_VALUES_EQUAL(0, state.GetSuspendedDevices().size());
TVector<NProto::TSuspendedDevice> suspendedDevices;
db.ReadSuspendedDevices(suspendedDevices);
UNIT_ASSERT_VALUES_EQUAL(0, suspendedDevices.size());

UNIT_ASSERT_VALUES_EQUAL(
0,
state.GetAutomaticallyReplacedDevices().size());
TDeque<TAutomaticallyReplacedDeviceInfo>
automaticalyReplacedDevices;
db.ReadAutomaticallyReplacedDevices(
automaticalyReplacedDevices);
UNIT_ASSERT_VALUES_EQUAL(0, automaticalyReplacedDevices.size());
});
}

Y_UNIT_TEST(ShouldReturnAffectedDisksFromPurgeHost)
{
TTestExecutor executor;
Expand Down
Loading
Loading