Skip to content

Commit

Permalink
NBS-5637: Asynchronous disks allocation
Browse files Browse the repository at this point in the history
  • Loading branch information
ya-ksgamora committed Dec 27, 2024
1 parent 6d14024 commit e6ba142
Show file tree
Hide file tree
Showing 10 changed files with 238 additions and 58 deletions.
15 changes: 15 additions & 0 deletions cloud/blockstore/libs/storage/disk_registry/disk_registry_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ class TDiskRegistryActor final
// Pending requests
TDeque<TPendingRequest> PendingRequests;

// Allocation requests whose reply is deferred, grouped by disk id.
// Entries are added by AddPendingAllocation() and answered/removed by
// ReplyToPendingAllocations().
THashMap<TDiskId, TVector<TRequestInfoPtr>> PendingDiskAllocationRequests;
THashMap<TDiskId, TVector<TRequestInfoPtr>> PendingDiskDeallocationRequests;

bool BrokenDisksDestructionInProgress = false;
Expand Down Expand Up @@ -227,6 +228,20 @@ class TDiskRegistryActor final
TDiskRegistryDatabase& db,
TDiskRegistryStateSnapshot& args);

// Stores requestInfoPtr in PendingDiskAllocationRequests under diskId so
// that the reply can be sent later. If the number of requests already
// queued for this disk exceeds
// Config->GetMaxNonReplicatedDiskAllocationRequests(), the queued requests
// are first answered with E_REJECTED.
void AddPendingAllocation(
const NActors::TActorContext& ctx,
const TString& diskId,
TRequestInfoPtr requestInfoPtr);

// Answers every allocation request queued for diskId with S_OK and removes
// the disk's entry from PendingDiskAllocationRequests. Does nothing when
// no entry exists for diskId.
void ReplyToPendingAllocations(
const NActors::TActorContext& ctx,
const TString& diskId);

// Sends a TEvAllocateDiskResponse carrying `error` to each request in
// requestInfos and then clears the vector.
void ReplyToPendingAllocations(
const NActors::TActorContext& ctx,
TVector<TRequestInfoPtr>& requestInfos,
NProto::TError error);

void AddPendingDeallocation(
const NActors::TActorContext& ctx,
const TString& diskId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,12 +240,70 @@ void TDiskRegistryActor::CompleteAddDisk(
response->Record.SetMuteIOErrors(args.MuteIOErrors);
}

NCloud::Reply(ctx, *args.RequestInfo, std::move(response));
if (HasError(args.Error) || args.Error.GetCode() == S_ALREADY || !args.Sync) {
NCloud::Reply(ctx, *args.RequestInfo, std::move(response));
} else {
AddPendingAllocation(ctx, args.DiskId, args.RequestInfo);
}

DestroyBrokenDisks(ctx);
SecureErase(ctx); // I think we need this only if allocating dirty devices
NotifyUsers(ctx);
}

// Queues an allocation request for `diskId` so that it can be answered later.
//
// When the queue for this disk has already grown past
// Config->GetMaxNonReplicatedDiskAllocationRequests(), every request queued so
// far is answered with E_REJECTED before the new request is stored.
void TDiskRegistryActor::AddPendingAllocation(
    const NActors::TActorContext& ctx,
    const TString& diskId,
    TRequestInfoPtr requestInfo)
{
    auto& queue = PendingDiskAllocationRequests[diskId];

    // TODO: does it make sense?
    const bool overflow =
        queue.size() > Config->GetMaxNonReplicatedDiskAllocationRequests();

    if (overflow) {
        LOG_WARN(ctx, TBlockStoreComponents::DISK_REGISTRY,
            "Too many pending allocation requests (%lu) for disk %s. "
            "Reject all requests.",
            queue.size(),
            diskId.Quote().c_str());

        // Rejecting also empties the queue, so the new request below
        // becomes its only element.
        ReplyToPendingAllocations(ctx, queue, MakeError(E_REJECTED));
    }

    queue.emplace_back(std::move(requestInfo));
}

// Answers each request in `requestInfos` with a TEvAllocateDiskResponse built
// from `error`, then empties the vector.
void TDiskRegistryActor::ReplyToPendingAllocations(
    const NActors::TActorContext& ctx,
    TVector<TRequestInfoPtr>& requestInfos,
    NProto::TError error)
{
    using TResponse = TEvDiskRegistry::TEvAllocateDiskResponse;

    for (auto& pending: requestInfos) {
        // Every request gets its own response object carrying the same error.
        auto response = std::make_unique<TResponse>(error);
        NCloud::Reply(ctx, *pending, std::move(response));
    }

    requestInfos.clear();
}

// Answers all allocation requests queued for `diskId` with S_OK and drops the
// disk's entry from PendingDiskAllocationRequests. No-op when nothing is
// queued for that disk.
void TDiskRegistryActor::ReplyToPendingAllocations(
    const NActors::TActorContext& ctx,
    const TString& diskId)
{
    auto found = PendingDiskAllocationRequests.find(diskId);

    if (found != PendingDiskAllocationRequests.end()) {
        auto& waiting = found->second;

        LOG_INFO(ctx, TBlockStoreComponents::DISK_REGISTRY,
            "Reply to pending allocation requests. DiskId=%s PendingRequests=%d",
            diskId.Quote().c_str(),
            static_cast<int>(waiting.size()));

        ReplyToPendingAllocations(ctx, waiting, MakeError(S_OK));

        PendingDiskAllocationRequests.erase(found);
    }
}

////////////////////////////////////////////////////////////////////////////////

void TDiskRegistryActor::HandleDeallocateDisk(
Expand Down Expand Up @@ -379,7 +437,7 @@ void TDiskRegistryActor::ReplyToPendingDeallocations(
}

LOG_INFO(ctx, TBlockStoreComponents::DISK_REGISTRY,
"Reply to pending deallocation requests. DiskId=%s PendingRquests=%d",
"Reply to pending deallocation requests. DiskId=%s PendingRequests=%d",
diskId.Quote().c_str(),
static_cast<int>(it->second.size()));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,8 +279,8 @@ void TDiskRegistryActor::ExecuteCleanupDevices(
TTxDiskRegistry::TCleanupDevices& args)
{
TDiskRegistryDatabase db(tx.DB);
args.SyncDeallocatedDisks =
State->MarkDevicesAsClean(ctx.Now(), db, args.Devices);
std::tie(args.SyncAllocatedDisks, args.SyncDeallocatedDisks) =
std::move(State->MarkDevicesAsClean(ctx.Now(), db, args.Devices));
}

void TDiskRegistryActor::CompleteCleanupDevices(
Expand All @@ -293,6 +293,10 @@ void TDiskRegistryActor::CompleteCleanupDevices(
for (const auto& diskId: args.SyncDeallocatedDisks) {
ReplyToPendingDeallocations(ctx, diskId);
}

for (const auto& diskId: args.SyncAllocatedDisks) {
ReplyToPendingAllocations(ctx, diskId);
}
}

////////////////////////////////////////////////////////////////////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3855,16 +3855,20 @@ bool TDiskRegistryState::MarkDeviceAsDirty(
return true;
}

TDiskRegistryState::TDiskId TDiskRegistryState::MarkDeviceAsClean(
TDiskRegistryState::TOpt2Disk TDiskRegistryState::MarkDeviceAsClean(
TInstant now,
TDiskRegistryDatabase& db,
const TDeviceId& uuid)
{
auto ret = MarkDevicesAsClean(now, db, TVector<TDeviceId>{uuid});
return ret.empty() ? "" : ret[0];
auto [alloc, dealloc] = MarkDevicesAsClean(now, db, TVector<TDeviceId>{uuid});
return {
alloc.empty() ? std::nullopt : std::make_optional(std::move(alloc[0])),
dealloc.empty() ? std::nullopt
: std::make_optional(std::move(dealloc[0]))};
}

TVector<TDiskRegistryState::TDiskId> TDiskRegistryState::MarkDevicesAsClean(
std::pair<TDiskRegistryState::TAllocatedDisksList, TDiskRegistryState::TDellocatedDisksList>
TDiskRegistryState::MarkDevicesAsClean(
TInstant now,
TDiskRegistryDatabase& db,
const TVector<TDeviceId>& uuids)
Expand All @@ -3878,14 +3882,19 @@ TVector<TDiskRegistryState::TDiskId> TDiskRegistryState::MarkDevicesAsClean(
}
}

TVector<TDiskId> ret;
TAllocatedDisksList allocatedDisks;
TDellocatedDisksList dellocatedDisks;
for (const auto& uuid: TryUpdateDevices(now, db, uuids)) {
if (auto diskId = PendingCleanup.EraseDevice(uuid); !diskId.empty()) {
ret.push_back(std::move(diskId));
auto [allocatedDisk, deallocatedDisk] = PendingCleanup.EraseDevice(uuid);
if (allocatedDisk) {
allocatedDisks.push_back(std::move(*allocatedDisk));
}
if (deallocatedDisk) {
dellocatedDisks.push_back(std::move(*deallocatedDisk));
}
}

return ret;
return {std::move(allocatedDisks), std::move(dellocatedDisks)};
}

bool TDiskRegistryState::TryUpdateDevice(
Expand Down
18 changes: 14 additions & 4 deletions cloud/blockstore/libs/storage/disk_registry/disk_registry_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,15 @@ class TDiskRegistryState
using TCheckpoints = THashMap<TCheckpointId, TCheckpointInfo>;
using TPlacementGroups = THashMap<TString, TPlacementGroupInfo>;

// Both id aliases name the same underlying TDiskId type; the distinct names
// only indicate which role (allocating vs. deallocating) a value plays.
using TAllocatingDiskId = TDiskRegistryState::TDiskId;
using TDeallocatingDiskId = TDiskRegistryState::TDiskId;
using TAllocatedDisksList = TVector<TAllocatingDiskId>;
// NOTE(review): "Dellocated" looks like a typo for "Deallocated"; renaming
// would require updating every user of this alias.
using TDellocatedDisksList = TVector<TDeallocatingDiskId>;

// Pair of two independently optional values.
template<typename T, typename U>
using TOpt2 = std::pair<std::optional<T>, std::optional<U>>;
// {allocating disk id, deallocating disk id}, each optional.
using TOpt2Disk = TOpt2<TAllocatingDiskId, TDeallocatingDiskId>;

private:
TLog Log;

Expand Down Expand Up @@ -503,16 +512,17 @@ class TDiskRegistryState

/// Mark selected device as clean and remove it
/// from lists of suspended/dirty/pending cleanup devices
/// @return disk id where selected device was allocated
TDiskId MarkDeviceAsClean(
/// @return pair of optional disk ids {allocating, deallocating} reported for the selected device; an element is empty when no such disk was reported
TOpt2Disk MarkDeviceAsClean(
TInstant now,
TDiskRegistryDatabase& db,
const TDeviceId& uuid);

/// Mark selected devices as clean and remove them
/// from lists of suspended/dirty/pending cleanup devices
/// @return vector of disk ids where selected devices were allocated
TVector<TDiskId> MarkDevicesAsClean(
/// @return pair of disk id vectors {allocating, deallocating} collected from PendingCleanup.EraseDevice() for the selected devices
std::pair<TAllocatedDisksList, TDellocatedDisksList>
MarkDevicesAsClean(
TInstant now,
TDiskRegistryDatabase& db,
const TVector<TDeviceId>& uuids);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,7 @@ struct TTxDiskRegistry
const TRequestInfoPtr RequestInfo;
const TVector<TString> Devices;

// Disk ids taken from the first element of the pair returned by
// State->MarkDevicesAsClean(); CompleteCleanupDevices() calls
// ReplyToPendingAllocations() for each of them.
TVector<TString> SyncAllocatedDisks;
TVector<TString> SyncDeallocatedDisks;

explicit TCleanupDevices(
Expand All @@ -316,6 +317,7 @@ struct TTxDiskRegistry

void Clear()
{
SyncAllocatedDisks.clear();
SyncDeallocatedDisks.clear();
}
};
Expand Down
48 changes: 37 additions & 11 deletions cloud/blockstore/libs/storage/disk_registry/model/device_list.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ void TDeviceList::UpdateDevices(

auto& nodeDevices = NodeDevices[agent.GetNodeId()];
nodeDevices.FreeDevices.clear();
nodeDevices.FreeDevicesIncludingDirty.clear();
nodeDevices.Rack.clear();
nodeDevices.TotalSize = 0;

Expand Down Expand Up @@ -187,6 +188,15 @@ void TDeviceList::UpdateDevices(
nodeDevices.FreeDevices.push_back(device);
}

const bool isFreeIncludingDirty =
DevicesAllocationAllowed(device.GetPoolKind(), agent.GetState()) &&
device.GetState() == NProto::DEVICE_STATE_ONLINE &&
!AllocatedDevices.contains(uuid) &&
!SuspendedDevices.contains(uuid);
if (isFreeIncludingDirty) {
nodeDevices.FreeDevicesIncludingDirty.push_back(device);
}

auto& poolNames = PoolKind2PoolNames[device.GetPoolKind()];
auto it = Find(poolNames, device.GetPoolName());
if (it == poolNames.end()) {
Expand All @@ -197,6 +207,7 @@ void TDeviceList::UpdateDevices(
}

SortBy(nodeDevices.FreeDevices, TBySortQueryKey());
SortBy(nodeDevices.FreeDevicesIncludingDirty, TBySortQueryKey());
}

void TDeviceList::RemoveDevices(const NProto::TAgentConfig& agent)
Expand Down Expand Up @@ -396,19 +407,20 @@ bool TDeviceList::ValidateAllocationQuery(
return false;
}

const TNodeDevices& nodeDevices = nodeItr->second;
TNodeDevices& nodeDevices = nodeItr->second;

if (query.ForbiddenRacks.contains(nodeDevices.Rack)) {
return false;
}

const auto& availableDevices = GetAvailableDevices(&nodeDevices, query);
const auto freeDeviceItr = FindIf(
nodeDevices.FreeDevices,
availableDevices,
[&targetDeviceId] (const NProto::TDeviceConfig& device) {
return device.GetDeviceUUID() == targetDeviceId;
});

if (freeDeviceItr == nodeDevices.FreeDevices.end()) {
if (freeDeviceItr == availableDevices.end()) {
return false;
}

Expand Down Expand Up @@ -526,6 +538,15 @@ auto TDeviceList::SelectRacks(
return result;
}

// Picks the per-node device list an allocation query should draw from:
// FreeDevicesIncludingDirty for DEVICE_POOL_KIND_LOCAL queries, FreeDevices
// for every other pool kind.
TVector<NProto::TDeviceConfig>& TDeviceList::GetAvailableDevices(
    TNodeDevices* nodeDevices,
    const TAllocationQuery& query) const
{
    // TODO: query.allowDirtyLocalDevices;
    // (until that flag exists, dirty devices are always allowed for local
    // pools)
    if (query.PoolKind == NProto::DEVICE_POOL_KIND_LOCAL) {
        return nodeDevices->FreeDevicesIncludingDirty;
    }

    return nodeDevices->FreeDevices;
}

TVector<TDeviceList::TDeviceRange> TDeviceList::CollectDevices(
const TAllocationQuery& query,
const TString& poolName)
Expand All @@ -545,7 +566,7 @@ TVector<TDeviceList::TDeviceRange> TDeviceList::CollectDevices(
// finding free devices belonging to this node that match our
// query
auto [begin, end] =
FindDeviceRange(query, poolName, nodeDevices->FreeDevices);
FindDeviceRange(query, poolName, GetAvailableDevices(nodeDevices, query));

using TDeviceIter = decltype(begin);
struct TDeviceInfo
Expand Down Expand Up @@ -670,9 +691,9 @@ TVector<NProto::TDeviceConfig> TDeviceList::AllocateDevices(
});

auto& nodeDevices = NodeDevices[nodeId];

auto& availableDevices = GetAvailableDevices(&nodeDevices, query);
for (const auto& arange: aranges) {
nodeDevices.FreeDevices.erase(arange.first, arange.second);
availableDevices.erase(arange.first, arange.second);
}
}

Expand Down Expand Up @@ -716,15 +737,20 @@ void TDeviceList::MarkDeviceAsDirty(const TDeviceId& id)
void TDeviceList::RemoveDeviceFromFreeList(const TDeviceId& id)
{
auto nodeId = FindNodeId(id);

const auto& predicate = [&](const auto& x)
{
return x.GetDeviceUUID() == id;
};
if (nodeId) {
auto& devices = NodeDevices[nodeId].FreeDevices;
auto& devicesIncludingDirty =
NodeDevices[nodeId].FreeDevicesIncludingDirty;

auto it = FindIf(devices, [&] (const auto& x) {
return x.GetDeviceUUID() == id;
});
if (auto* it = FindIf(devices, predicate); it != devices.end()) {
devices.erase(it);
}

if (it != devices.end()) {
// Search and erase within the same list. The iterator returned for
// devicesIncludingDirty must be compared against that list's end() and
// erased from that list; using `devices` here mixes iterators of two
// different vectors (undefined behaviour) and leaves the device in
// FreeDevicesIncludingDirty.
if (auto* it = FindIf(devicesIncludingDirty, predicate);
    it != devicesIncludingDirty.end())
{
    devicesIncludingDirty.erase(it);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ class TDeviceList

// sorted by {PoolKind, BlockSize}
TVector<NProto::TDeviceConfig> FreeDevices;
// Devices of this node that are online, allowed for allocation, not
// allocated and not suspended (filled in UpdateDevices); presumably differs
// from FreeDevices only by also containing dirty devices - TODO confirm.
// sorted by {PoolKind, BlockSize}
TVector<NProto::TDeviceConfig> FreeDevicesIncludingDirty;

ui64 TotalSize = 0;
};
Expand Down Expand Up @@ -182,6 +184,10 @@ class TDeviceList
const TDeviceId& id,
const NProto::TDeviceConfig& device);
void RemoveFromAllDevices(const TDeviceId& id);
// Returns the list of nodeDevices an allocation query may use:
// FreeDevicesIncludingDirty for DEVICE_POOL_KIND_LOCAL queries, FreeDevices
// otherwise.
// TODO: this could be a member function of TNodeDevices
TVector<NProto::TDeviceConfig>& GetAvailableDevices(
TNodeDevices* nodeDevices,
const TAllocationQuery& query) const;
};

////////////////////////////////////////////////////////////////////////////////
Expand Down
Loading

0 comments on commit e6ba142

Please sign in to comment.