diff --git a/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor.h b/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor.h index aa372f426e0..6542e883990 100644 --- a/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor.h +++ b/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor.h @@ -83,6 +83,7 @@ class TDiskRegistryActor final // Pending requests TDeque PendingRequests; + THashMap> PendingDiskAllocationRequests; THashMap> PendingDiskDeallocationRequests; bool BrokenDisksDestructionInProgress = false; @@ -227,6 +228,20 @@ class TDiskRegistryActor final TDiskRegistryDatabase& db, TDiskRegistryStateSnapshot& args); + void AddPendingAllocation( + const NActors::TActorContext& ctx, + const TString& diskId, + TRequestInfoPtr requestInfoPtr); + + void ReplyToPendingAllocations( + const NActors::TActorContext& ctx, + const TString& diskId); + + void ReplyToPendingAllocations( + const NActors::TActorContext& ctx, + TVector& requestInfos, + NProto::TError error); + void AddPendingDeallocation( const NActors::TActorContext& ctx, const TString& diskId, diff --git a/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor_allocate.cpp b/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor_allocate.cpp index 06ad674e2a9..31a25b1a8e3 100644 --- a/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor_allocate.cpp +++ b/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor_allocate.cpp @@ -240,12 +240,70 @@ void TDiskRegistryActor::CompleteAddDisk( response->Record.SetMuteIOErrors(args.MuteIOErrors); } - NCloud::Reply(ctx, *args.RequestInfo, std::move(response)); + if (HasError(args.Error) || args.Error.GetCode() == S_ALREADY || !args.Sync) { + NCloud::Reply(ctx, *args.RequestInfo, std::move(response)); + } else { + AddPendingAllocation(ctx, args.DiskId, args.RequestInfo); + } DestroyBrokenDisks(ctx); + SecureErase(ctx); // I think we need this only if allocating dirty devices NotifyUsers(ctx); } +void TDiskRegistryActor::AddPendingAllocation( + const NActors::TActorContext& ctx, + const TString& diskId, + TRequestInfoPtr requestInfo) +{ + auto& requestInfos = PendingDiskAllocationRequests[diskId]; + + if (requestInfos.size() > Config->GetMaxNonReplicatedDiskAllocationRequests()) { // TODO: does it make sense? + LOG_WARN(ctx, TBlockStoreComponents::DISK_REGISTRY, + "Too many pending allocation requests (%lu) for disk %s. " + "Reject all requests.", + requestInfos.size(), + diskId.Quote().c_str()); + + ReplyToPendingAllocations(ctx, requestInfos, MakeError(E_REJECTED)); + } + + requestInfos.emplace_back(std::move(requestInfo)); +} + +void TDiskRegistryActor::ReplyToPendingAllocations( + const NActors::TActorContext& ctx, + TVector& requestInfos, + NProto::TError error) +{ + for (auto& requestInfo: requestInfos) { + NCloud::Reply( + ctx, + *requestInfo, + std::make_unique(error)); + } + requestInfos.clear(); +} + +void TDiskRegistryActor::ReplyToPendingAllocations( + const NActors::TActorContext& ctx, + const TString& diskId) +{ + auto it = PendingDiskAllocationRequests.find(diskId); + if (it == PendingDiskAllocationRequests.end()) { + return; + } + + LOG_INFO(ctx, TBlockStoreComponents::DISK_REGISTRY, + "Reply to pending allocation requests. DiskId=%s PendingRequests=%d", + diskId.Quote().c_str(), + static_cast(it->second.size())); + + ReplyToPendingAllocations(ctx, it->second, MakeError(S_OK)); + + PendingDiskAllocationRequests.erase(it); +} + //////////////////////////////////////////////////////////////////////////////// void TDiskRegistryActor::HandleDeallocateDisk( @@ -379,7 +437,7 @@ void TDiskRegistryActor::ReplyToPendingDeallocations( } LOG_INFO(ctx, TBlockStoreComponents::DISK_REGISTRY, - "Reply to pending deallocation requests. DiskId=%s PendingRquests=%d", + "Reply to pending deallocation requests. DiskId=%s PendingRequests=%d", diskId.Quote().c_str(), static_cast(it->second.size())); diff --git a/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor_secure_erase.cpp b/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor_secure_erase.cpp index e65ac54d5d7..34995de5c35 100644 --- a/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor_secure_erase.cpp +++ b/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor_secure_erase.cpp @@ -279,8 +279,8 @@ void TDiskRegistryActor::ExecuteCleanupDevices( TTxDiskRegistry::TCleanupDevices& args) { TDiskRegistryDatabase db(tx.DB); - args.SyncDeallocatedDisks = - State->MarkDevicesAsClean(ctx.Now(), db, args.Devices); + std::tie(args.SyncAllocatedDisks, args.SyncDeallocatedDisks) = + std::move(State->MarkDevicesAsClean(ctx.Now(), db, args.Devices)); } void TDiskRegistryActor::CompleteCleanupDevices( @@ -293,6 +293,10 @@ void TDiskRegistryActor::CompleteCleanupDevices( for (const auto& diskId: args.SyncDeallocatedDisks) { ReplyToPendingDeallocations(ctx, diskId); } + + for (const auto& diskId: args.SyncAllocatedDisks) { + ReplyToPendingAllocations(ctx, diskId); + } } //////////////////////////////////////////////////////////////////////////////// diff --git a/cloud/blockstore/libs/storage/disk_registry/disk_registry_state.cpp b/cloud/blockstore/libs/storage/disk_registry/disk_registry_state.cpp index 0ee39952938..11c0febd313 100644 --- a/cloud/blockstore/libs/storage/disk_registry/disk_registry_state.cpp +++ b/cloud/blockstore/libs/storage/disk_registry/disk_registry_state.cpp @@ -3855,16 +3855,20 @@ bool TDiskRegistryState::MarkDeviceAsDirty( return true; } -TDiskRegistryState::TDiskId TDiskRegistryState::MarkDeviceAsClean( +TDiskRegistryState::TOpt2Disk TDiskRegistryState::MarkDeviceAsClean( TInstant now, TDiskRegistryDatabase& db, const TDeviceId& uuid) { - auto ret = MarkDevicesAsClean(now, db, TVector{uuid}); - return ret.empty() ? "" : ret[0]; + auto [alloc, dealloc] = MarkDevicesAsClean(now, db, TVector{uuid}); + return { + alloc.empty() ? std::nullopt : std::make_optional(std::move(alloc[0])), + dealloc.empty() ? std::nullopt + : std::make_optional(std::move(dealloc[0]))}; } -TVector TDiskRegistryState::MarkDevicesAsClean( +std::pair +TDiskRegistryState::MarkDevicesAsClean( TInstant now, TDiskRegistryDatabase& db, const TVector& uuids) @@ -3878,14 +3882,19 @@ TVector TDiskRegistryState::MarkDevicesAsClean( } } - TVector ret; + TAllocatedDisksList allocatedDisks; + TDellocatedDisksList dellocatedDisks; for (const auto& uuid: TryUpdateDevices(now, db, uuids)) { - if (auto diskId = PendingCleanup.EraseDevice(uuid); !diskId.empty()) { - ret.push_back(std::move(diskId)); + auto [allocatedDisk, deallocatedDisk] = PendingCleanup.EraseDevice(uuid); + if (allocatedDisk) { + allocatedDisks.push_back(std::move(*allocatedDisk)); + } + if (deallocatedDisk) { + dellocatedDisks.push_back(std::move(*deallocatedDisk)); } } - return ret; + return {std::move(allocatedDisks), std::move(dellocatedDisks)}; } bool TDiskRegistryState::TryUpdateDevice( diff --git a/cloud/blockstore/libs/storage/disk_registry/disk_registry_state.h b/cloud/blockstore/libs/storage/disk_registry/disk_registry_state.h index 3d806eb2189..d655d0f691c 100644 --- a/cloud/blockstore/libs/storage/disk_registry/disk_registry_state.h +++ b/cloud/blockstore/libs/storage/disk_registry/disk_registry_state.h @@ -281,6 +281,15 @@ class TDiskRegistryState using TCheckpoints = THashMap; using TPlacementGroups = THashMap; + using TAllocatingDiskId = TDiskRegistryState::TDiskId; + using TDeallocatingDiskId = TDiskRegistryState::TDiskId; + using TAllocatedDisksList = TVector; + using TDellocatedDisksList = TVector; + + template + using TOpt2 = std::pair, std::optional>; + using TOpt2Disk = TOpt2; + private: TLog Log; @@ -503,16 +512,17 @@ class TDiskRegistryState /// Mark selected device as clean and remove it /// from lists of suspended/dirty/pending cleanup devices - /// @return disk id where selected device was allocated - TDiskId MarkDeviceAsClean( + /// @return allocated/deallocated disk id of where selected device was allocated/deallocated + TOpt2Disk MarkDeviceAsClean( TInstant now, TDiskRegistryDatabase& db, const TDeviceId& uuid); /// Mark selected devices as clean and remove them /// from lists of suspended/dirty/pending cleanup devices - /// @return vector of disk ids where selected devices were allocated - TVector MarkDevicesAsClean( + /// @return vector of allocated/deallocated disk ids where selected devices were allocated/deallocated + std::pair + MarkDevicesAsClean( TInstant now, TDiskRegistryDatabase& db, const TVector& uuids); diff --git a/cloud/blockstore/libs/storage/disk_registry/disk_registry_tx.h b/cloud/blockstore/libs/storage/disk_registry/disk_registry_tx.h index 43619484ef4..c29c1d96ad8 100644 --- a/cloud/blockstore/libs/storage/disk_registry/disk_registry_tx.h +++ b/cloud/blockstore/libs/storage/disk_registry/disk_registry_tx.h @@ -305,6 +305,7 @@ struct TTxDiskRegistry const TRequestInfoPtr RequestInfo; const TVector Devices; + TVector SyncAllocatedDisks; TVector SyncDeallocatedDisks; explicit TCleanupDevices( @@ -316,6 +317,7 @@ struct TTxDiskRegistry void Clear() { + SyncAllocatedDisks.clear(); SyncDeallocatedDisks.clear(); } }; diff --git a/cloud/blockstore/libs/storage/disk_registry/model/pending_cleanup.cpp b/cloud/blockstore/libs/storage/disk_registry/model/pending_cleanup.cpp index d96bdf1211f..d680292b35f 100644 --- a/cloud/blockstore/libs/storage/disk_registry/model/pending_cleanup.cpp +++ b/cloud/blockstore/libs/storage/disk_registry/model/pending_cleanup.cpp @@ -10,9 +10,10 @@ namespace NCloud::NBlockStore::NStorage { NProto::TError TPendingCleanup::Insert( const TString& diskId, - TVector uuids) + TVector uuids, + bool allocation) { - auto error = ValidateInsertion(diskId, uuids); + auto error = ValidateInsertion(diskId, uuids, allocation); if (HasError(error)) { return error; } @@ -21,8 +22,8 @@ NProto::TError TPendingCleanup::Insert( for (auto& uuid: uuids) { Y_DEBUG_ABORT_UNLESS(!uuid.empty()); - auto [_, success] = DeviceToDisk.emplace(std::move(uuid), diskId); - Y_DEBUG_ABORT_UNLESS(success); + auto& [allocatingDiskId, deallocatingDiskId] = DeviceToDisk[uuid]; + (allocation ? allocatingDiskId : deallocatingDiskId) = diskId; } return {}; @@ -35,7 +36,8 @@ NProto::TError TPendingCleanup::Insert(const TString& diskId, TString uuid) [[nodiscard]] NProto::TError TPendingCleanup::ValidateInsertion( const TString& diskId, - const TVector& uuids) const + const TVector& uuids, + bool allocation) const { if (diskId.empty() || uuids.empty()) { return MakeError( @@ -55,42 +57,57 @@ NProto::TError TPendingCleanup::Insert(const TString& diskId, TString uuid) << JoinStrings(uuids, ", ") << "]"); } - const auto* foundDiskId = DeviceToDisk.FindPtr(uuid); - if (foundDiskId) { - return MakeError( - E_ARGUMENT, - TStringBuilder() << "Could not insert device: " << uuid - << "; diskId: " << diskId - << " to PendingCleanup. It was already " - "inserted with the diskId: " - << *foundDiskId); + const auto* foundDisks = DeviceToDisk.FindPtr(uuid); + if (foundDisks) { + const auto& foundDiskId = allocation ? foundDisks->first : foundDisks->second; + if (foundDiskId) { + return MakeError( + E_ARGUMENT, + TStringBuilder() << "Could not insert device: " << uuid + << "; diskId: " << diskId + << " to PendingCleanup. It was already " + "inserted with the diskId: " + << *foundDiskId); + } } } return {}; } -TString TPendingCleanup::EraseDevice(const TString& uuid) +TPendingCleanup::TOpt2Disk TPendingCleanup::EraseDevice(const TString& uuid) { auto it = DeviceToDisk.find(uuid); if (it == DeviceToDisk.end()) { return {}; } - auto diskId = std::move(it->second); - DeviceToDisk.erase(it); + auto& [allocatingDisk, deallocatingDisk] = it->second; + TOpt2Disk ret; - Y_DEBUG_ABORT_UNLESS(DiskToDeviceCount.contains(diskId)); - if (--DiskToDeviceCount[diskId] > 0) { - return {}; + Y_DEBUG_ABORT_UNLESS(allocatingDisk || deallocatingDisk); + Y_DEBUG_ABORT_UNLESS(!allocatingDisk || DiskToDeviceCount.contains(*allocatingDisk)); + Y_DEBUG_ABORT_UNLESS(!deallocatingDisk || DiskToDeviceCount.contains(*deallocatingDisk)); + if (allocatingDisk && --DiskToDeviceCount[*allocatingDisk] <= 0) { + DiskToDeviceCount.erase(*allocatingDisk); + ret.first = std::move(allocatingDisk); + allocatingDisk.reset(); } - DiskToDeviceCount.erase(diskId); + if (deallocatingDisk && --DiskToDeviceCount[*deallocatingDisk] <= 0) { + DiskToDeviceCount.erase(*deallocatingDisk); + ret.second = std::move(deallocatingDisk); + deallocatingDisk.reset(); + } - return diskId; + if (!allocatingDisk && !deallocatingDisk) { + DeviceToDisk.erase(it); + } + + return ret; } -TString TPendingCleanup::FindDiskId(const TString& uuid) const +TPendingCleanup::TOpt2Disk TPendingCleanup::FindDiskId(const TString& uuid) const { auto it = DeviceToDisk.find(uuid); if (it == DeviceToDisk.end()) { @@ -106,9 +123,31 @@ bool TPendingCleanup::EraseDisk(const TString& diskId) return false; } - EraseNodesIf(DeviceToDisk, [&] (auto& x) { - return x.second == diskId; - }); + bool isAllocatingDisk = false; + bool isDellocatingDisk = false; + auto it = std::find_if( + DeviceToDisk.begin(), + DeviceToDisk.end(), + [&](const auto& elem) + { + auto& [allocating, deallocating] = elem.second; + isAllocatingDisk = allocating && *allocating == diskId; + isDellocatingDisk = deallocating && *deallocating == diskId; + return isAllocatingDisk || isDellocatingDisk; + }); + + Y_ABORT_IF(isAllocatingDisk && isDellocatingDisk); + + auto& [allocating, deallocating] = it->second; + if (isAllocatingDisk) { + allocating.reset(); + } else { + deallocating.reset(); + } + + if (!allocating && !deallocating) { + DiskToDeviceCount.erase(it); + } return true; } diff --git a/cloud/blockstore/libs/storage/disk_registry/model/pending_cleanup.h b/cloud/blockstore/libs/storage/disk_registry/model/pending_cleanup.h index 841184ce555..3430a0897ac 100644 --- a/cloud/blockstore/libs/storage/disk_registry/model/pending_cleanup.h +++ b/cloud/blockstore/libs/storage/disk_registry/model/pending_cleanup.h @@ -15,27 +15,38 @@ namespace NCloud::NBlockStore::NStorage { class TPendingCleanup { +public: + using TAllocatingDiskId = TString; + using TDeallocatingDiskId = TString; + + template + using TOpt2 = std::pair, std::optional>; + using TOpt2Disk = TOpt2; private: THashMap DiskToDeviceCount; - THashMap DeviceToDisk; + THashMap DeviceToDisk; public: [[nodiscard]] NProto::TError Insert( const TString& diskId, - TVector uuids); + TVector uuids, + bool allocation = false); [[nodiscard]] NProto::TError Insert(const TString& diskId, TString uuid); - TString EraseDevice(const TString& uuid); + /// Removes the device from deallocating disk (for pending deallocation disks) + /// @return diskId of allocated/deallocated disk if the device with the given UUID was the last to complete its allocation/deallocation + TOpt2Disk EraseDevice(const TString& uuid); bool EraseDisk(const TString& diskId); - [[nodiscard]] TString FindDiskId(const TString& uuid) const; + [[nodiscard]] TOpt2Disk FindDiskId(const TString& uuid) const; [[nodiscard]] bool IsEmpty() const; [[nodiscard]] bool Contains(const TString& diskId) const; private: [[nodiscard]] NProto::TError ValidateInsertion( const TString& diskId, - const TVector& uuids) const; + const TVector& uuids, + bool allocation) const; }; } // namespace NCloud::NBlockStore::NStorage