From 95580a71f2263eede02efe83689ba8b4f1ddcd6a Mon Sep 17 00:00:00 2001 From: Stepanyuk Vladislav Date: Wed, 15 Jan 2025 17:27:52 +0700 Subject: [PATCH 01/11] issue-2725: mv acquire release logic from DR --- .../acquire_devices_actor.cpp} | 259 +++++------- .../storage/core/acquire_release_devices.h | 107 +++++ .../libs/storage/core/proto_helpers.cpp | 11 + .../libs/storage/core/proto_helpers.h | 1 + .../storage/core/release_devices_actor.cpp | 271 +++++++++++++ cloud/blockstore/libs/storage/core/ya.make | 2 + .../disk_registry/disk_registry_actor.cpp | 45 ++- .../disk_registry/disk_registry_actor.h | 18 +- .../disk_registry_actor_acquire_release.cpp | 245 ++++++++++++ .../disk_registry_actor_release.cpp | 373 ------------------ .../disk_registry/disk_registry_private.h | 67 +--- .../disk_registry/disk_registry_state.cpp | 6 + .../disk_registry/disk_registry_state.h | 1 + .../disk_registry_ut_session.cpp | 8 +- .../storage/disk_registry/testlib/test_env.h | 12 - .../libs/storage/disk_registry/ya.make | 3 +- 16 files changed, 788 insertions(+), 641 deletions(-) rename cloud/blockstore/libs/storage/{disk_registry/disk_registry_actor_acquire.cpp => core/acquire_devices_actor.cpp} (58%) create mode 100644 cloud/blockstore/libs/storage/core/acquire_release_devices.h create mode 100644 cloud/blockstore/libs/storage/core/release_devices_actor.cpp create mode 100644 cloud/blockstore/libs/storage/disk_registry/disk_registry_actor_acquire_release.cpp delete mode 100644 cloud/blockstore/libs/storage/disk_registry/disk_registry_actor_release.cpp diff --git a/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor_acquire.cpp b/cloud/blockstore/libs/storage/core/acquire_devices_actor.cpp similarity index 58% rename from cloud/blockstore/libs/storage/disk_registry/disk_registry_actor_acquire.cpp rename to cloud/blockstore/libs/storage/core/acquire_devices_actor.cpp index f7d78f8ef3..88eb853c88 100644 --- a/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor_acquire.cpp +++ b/cloud/blockstore/libs/storage/core/acquire_devices_actor.cpp @@ -1,53 +1,52 @@ -#include "disk_registry_actor.h" +#include "acquire_release_devices.h" -#include +#include +#include +#include -#include +#include +#include +#include #include -#include - -namespace NCloud::NBlockStore::NStorage { +namespace NCloud::NBlockStore::NStorage::NAcquireReleaseDevices { using namespace NActors; -using namespace NKikimr::NTabletFlatExecutor; - namespace { //////////////////////////////////////////////////////////////////////////////// -class TAcquireDiskActor final - : public TActorBootstrapped +class TAcquireDevicesActor final: public TActorBootstrapped { private: const TActorId Owner; - TRequestInfoPtr RequestInfo; TVector Devices; - const ui32 LogicalBlockSize = 0; - const TString DiskId; - const TString ClientId; + TString DiskId; + TString ClientId; const NProto::EVolumeAccessMode AccessMode; const ui64 MountSeqNumber; const ui32 VolumeGeneration; const TDuration RequestTimeout; + const bool MuteIOErrors; + NLog::EComponent Component; int PendingRequests = 0; TVector SentAcquireRequests; public: - TAcquireDiskActor( + TAcquireDevicesActor( const TActorId& owner, - TRequestInfoPtr requestInfo, TVector devices, - ui32 logicalBlockSize, TString diskId, TString clientId, NProto::EVolumeAccessMode accessMode, ui64 mountSeqNumber, ui32 volumeGeneration, - TDuration requestTimeout); + TDuration requestTimeout, + bool muteIOErrors, + NLog::EComponent component); void Bootstrap(const TActorContext& ctx); @@ -55,8 +54,6 @@ class TAcquireDiskActor final void PrepareRequest(NProto::TAcquireDevicesRequest& request) const; void PrepareRequest(NProto::TReleaseDevicesRequest& request) const; - void FinishAcquireDisk(const TActorContext& ctx, NProto::TError error); - void ReplyAndDie(const TActorContext& ctx, NProto::TError error); void OnAcquireResponse( @@ -104,45 +101,47 @@ class TAcquireDiskActor final //////////////////////////////////////////////////////////////////////////////// -TAcquireDiskActor::TAcquireDiskActor( +TAcquireDevicesActor::TAcquireDevicesActor( const TActorId& owner, - TRequestInfoPtr requestInfo, TVector devices, - ui32 logicalBlockSize, TString diskId, TString clientId, NProto::EVolumeAccessMode accessMode, ui64 mountSeqNumber, ui32 volumeGeneration, - TDuration requestTimeout) + TDuration requestTimeout, + bool muteIOErrors, + NLog::EComponent component) : Owner(owner) - , RequestInfo(std::move(requestInfo)) , Devices(std::move(devices)) - , LogicalBlockSize(logicalBlockSize) , DiskId(std::move(diskId)) , ClientId(std::move(clientId)) , AccessMode(accessMode) , MountSeqNumber(mountSeqNumber) , VolumeGeneration(volumeGeneration) , RequestTimeout(requestTimeout) + , MuteIOErrors(muteIOErrors) + , Component(component) { SortBy(Devices, [] (auto& d) { return d.GetNodeId(); }); } -void TAcquireDiskActor::Bootstrap(const TActorContext& ctx) +void TAcquireDevicesActor::Bootstrap(const TActorContext& ctx) { Become(&TThis::StateAcquire); if (Devices.empty()) { - FinishAcquireDisk(ctx, {}); + ReplyAndDie(ctx, {}); return; } ctx.Schedule(RequestTimeout, new TEvents::TEvWakeup()); - LOG_DEBUG(ctx, TBlockStoreComponents::DISK_REGISTRY_WORKER, + LOG_DEBUG( + ctx, + Component, "[%s] Sending acquire devices requests for disk %s, targets %s", ClientId.c_str(), DiskId.c_str(), @@ -156,29 +155,13 @@ void TAcquireDiskActor::Bootstrap(const TActorContext& ctx) TInstant now = ctx.Now(); for (auto& x: sentRequests) { SentAcquireRequests.push_back(TAgentAcquireDevicesCachedRequest{ - std::move(x.AgentId), - std::move(x.Record), - now}); + .AgentId = std::move(x.AgentId), + .Request = std::move(x.Record), + .RequestTime = now}); } } -void TAcquireDiskActor::FinishAcquireDisk( - const TActorContext& ctx, - NProto::TError error) -{ - using TType = TEvDiskRegistryPrivate::TEvFinishAcquireDiskRequest; - NCloud::Send( - ctx, - Owner, - std::make_unique( - DiskId, - ClientId, - std::move(SentAcquireRequests))); - - ReplyAndDie(ctx, std::move(error)); -} - -void TAcquireDiskActor::PrepareRequest( +void TAcquireDevicesActor::PrepareRequest( NProto::TAcquireDevicesRequest& request) const { request.MutableHeaders()->SetClientId(ClientId); @@ -188,14 +171,14 @@ void TAcquireDiskActor::PrepareRequest( request.SetVolumeGeneration(VolumeGeneration); } -void TAcquireDiskActor::PrepareRequest( +void TAcquireDevicesActor::PrepareRequest( NProto::TReleaseDevicesRequest& request) const { request.MutableHeaders()->SetClientId(ClientId); } template -auto TAcquireDiskActor::CreateRequests() const +auto TAcquireDevicesActor::CreateRequests() const -> TVector> { auto it = Devices.begin(); @@ -216,7 +199,7 @@ auto TAcquireDiskActor::CreateRequests() const } template -void TAcquireDiskActor::SendRequests( +void TAcquireDevicesActor::SendRequests( const TActorContext& ctx, const TVector>& requests) { @@ -227,7 +210,9 @@ void TAcquireDiskActor::SendRequests( TCallContextPtr {}, r.Record); - LOG_DEBUG(ctx, TBlockStoreComponents::DISK_REGISTRY_WORKER, + LOG_DEBUG( + ctx, + Component, "[%s] Send an acquire request to node #%d. Devices: %s", ClientId.c_str(), r.NodeId, @@ -248,40 +233,34 @@ void TAcquireDiskActor::SendRequests( } } -void TAcquireDiskActor::ReplyAndDie( +void TAcquireDevicesActor::ReplyAndDie( const TActorContext& ctx, NProto::TError error) { - auto response = std::make_unique( - std::move(error)); - - if (HasError(response->GetError())) { - LOG_ERROR(ctx, TBlockStoreComponents::DISK_REGISTRY_WORKER, - "[%s] AcquireDisk %s targets %s error: %s", + if (HasError(error)) { + LOG_ERROR( + ctx, + Component, + "[%s] AcquireDevices %s targets %s error: %s", ClientId.c_str(), DiskId.c_str(), LogTargets().c_str(), - FormatError(response->GetError()).c_str()); - } else { - response->Record.MutableDevices()->Reserve(Devices.size()); - - for (auto& device: Devices) { - ToLogicalBlocks(device, LogicalBlockSize); - *response->Record.AddDevices() = std::move(device); - } + FormatError(error).c_str()); } - NCloud::Reply(ctx, *RequestInfo, std::move(response)); + auto response = std::make_unique( + std::move(DiskId), + std::move(ClientId), + std::move(SentAcquireRequests), + std::move(Devices), + std::move(error)); - NCloud::Send( - ctx, - Owner, - std::make_unique()); + NCloud::Send(ctx, Owner, std::move(response)); Die(ctx); } //////////////////////////////////////////////////////////////////////////////// -void TAcquireDiskActor::HandlePoisonPill( +void TAcquireDevicesActor::HandlePoisonPill( const TEvents::TEvPoisonPill::TPtr& ev, const TActorContext& ctx) { @@ -290,15 +269,17 @@ void TAcquireDiskActor::HandlePoisonPill( ReplyAndDie(ctx, MakeTabletIsDeadError(E_REJECTED, __LOCATION__)); } -void TAcquireDiskActor::OnAcquireResponse( +void TAcquireDevicesActor::OnAcquireResponse( const TActorContext& ctx, ui32 nodeId, NProto::TError error) { Y_ABORT_UNLESS(PendingRequests > 0); - if (HasError(error)) { - LOG_ERROR(ctx, TBlockStoreComponents::DISK_REGISTRY_WORKER, + if (HasError(error) && !MuteIOErrors) { + LOG_ERROR( + ctx, + Component, "[%s] AcquireDevices on the node #%d %s error: %s", ClientId.c_str(), nodeId, @@ -306,7 +287,9 @@ void TAcquireDiskActor::OnAcquireResponse( FormatError(error).c_str()); if (GetErrorKind(error) != EErrorKind::ErrorRetriable) { - LOG_DEBUG(ctx, TBlockStoreComponents::DISK_REGISTRY_WORKER, + LOG_DEBUG( + ctx, + Component, "[%s] Canceling acquire operation for disk %s, targets %s", ClientId.c_str(), DiskId.c_str(), @@ -318,17 +301,17 @@ void TAcquireDiskActor::OnAcquireResponse( } SentAcquireRequests.clear(); - FinishAcquireDisk(ctx, std::move(error)); + ReplyAndDie(ctx, std::move(error)); return; } if (--PendingRequests == 0) { - FinishAcquireDisk(ctx, {}); + ReplyAndDie(ctx, {}); } } -void TAcquireDiskActor::HandleAcquireDevicesResponse( +void TAcquireDevicesActor::HandleAcquireDevicesResponse( const TEvDiskAgent::TEvAcquireDevicesResponse::TPtr& ev, const TActorContext& ctx) { @@ -338,7 +321,7 @@ void TAcquireDiskActor::HandleAcquireDevicesResponse( ev->Get()->GetError()); } -void TAcquireDiskActor::HandleAcquireDevicesUndelivery( +void TAcquireDevicesActor::HandleAcquireDevicesUndelivery( const TEvDiskAgent::TEvAcquireDevicesRequest::TPtr& ev, const TActorContext& ctx) { @@ -348,7 +331,7 @@ void TAcquireDiskActor::HandleAcquireDevicesUndelivery( MakeError(E_REJECTED, "not delivered")); } -void TAcquireDiskActor::HandleWakeup( +void TAcquireDevicesActor::HandleWakeup( const TEvents::TEvWakeup::TPtr& ev, const TActorContext& ctx) { @@ -360,14 +343,14 @@ void TAcquireDiskActor::HandleWakeup( //////////////////////////////////////////////////////////////////////////////// -TString TAcquireDiskActor::LogTargets() const +TString TAcquireDevicesActor::LogTargets() const { return LogDevices(Devices); } //////////////////////////////////////////////////////////////////////////////// -STFUNC(TAcquireDiskActor::StateAcquire) +STFUNC(TAcquireDevicesActor::StateAcquire) { switch (ev->GetTypeRewrite()) { HFunc(TEvents::TEvPoisonPill, HandlePoisonPill); @@ -380,7 +363,7 @@ STFUNC(TAcquireDiskActor::StateAcquire) HFunc(TEvents::TEvWakeup, HandleWakeup); default: - HandleUnexpectedEvent(ev, TBlockStoreComponents::DISK_REGISTRY_WORKER); + HandleUnexpectedEvent(ev, Component); break; } } @@ -389,91 +372,31 @@ STFUNC(TAcquireDiskActor::StateAcquire) //////////////////////////////////////////////////////////////////////////////// -void TDiskRegistryActor::HandleAcquireDisk( - const TEvDiskRegistry::TEvAcquireDiskRequest::TPtr& ev, - const TActorContext& ctx) +TActorId AcquireDevices( + const NActors::TActorContext& ctx, + const TActorId& owner, + TVector devices, + TString diskId, + TString clientId, + NProto::EVolumeAccessMode accessMode, + ui64 mountSeqNumber, + ui32 volumeGeneration, + TDuration requestTimeout, + bool muteIOErrors, + NLog::EComponent component) { - BLOCKSTORE_DISK_REGISTRY_COUNTER(AcquireDisk); - - const auto* msg = ev->Get(); - - auto clientId = msg->Record.GetHeaders().GetClientId(); - auto diskId = msg->Record.GetDiskId(); - - LOG_DEBUG(ctx, TBlockStoreComponents::DISK_REGISTRY, - "[%lu] Received AcquireDisk request: " - "DiskId=%s, ClientId=%s, AccessMode=%u, MountSeqNumber=%lu" - ", VolumeGeneration=%u", - TabletID(), - diskId.c_str(), - clientId.c_str(), - static_cast(msg->Record.GetAccessMode()), - msg->Record.GetMountSeqNumber(), - msg->Record.GetVolumeGeneration()); - - TDiskInfo diskInfo; - auto error = State->StartAcquireDisk(diskId, diskInfo); - - if (HasError(error)) { - LOG_ERROR(ctx, TBlockStoreComponents::DISK_REGISTRY_WORKER, - "[%s] AcquireDisk %s error: %s", - clientId.c_str(), - diskId.c_str(), - FormatError(error).c_str()); - - NCloud::Reply( - ctx, - *ev, - std::make_unique( - std::move(error))); - return; - } - - State->FilterDevicesAtUnavailableAgents(diskInfo); - - TVector devices = std::move(diskInfo.Devices); - for (auto& migration: diskInfo.Migrations) { - devices.push_back(std::move(*migration.MutableTargetDevice())); - } - for (auto& replica: diskInfo.Replicas) { - devices.insert(devices.end(), - std::make_move_iterator(replica.begin()), - std::make_move_iterator(replica.end())); - } - - auto actor = NCloud::Register( + return NCloud::Register( ctx, - ctx.SelfID, - CreateRequestInfo( - ev->Sender, - ev->Cookie, - msg->CallContext), + owner, std::move(devices), - diskInfo.LogicalBlockSize, - std::move(diskId), + diskId, std::move(clientId), - msg->Record.GetAccessMode(), - msg->Record.GetMountSeqNumber(), - msg->Record.GetVolumeGeneration(), - Config->GetAgentRequestTimeout()); - Actors.insert(actor); -} - -//////////////////////////////////////////////////////////////////////////////// - -void TDiskRegistryActor::HandleFinishAcquireDisk( - const TEvDiskRegistryPrivate::TEvFinishAcquireDiskRequest::TPtr& ev, - const TActorContext& ctx) -{ - auto* msg = ev->Get(); - - State->FinishAcquireDisk(msg->DiskId); - - OnDiskAcquired(std::move(msg->SentRequests)); - - auto response = std::make_unique< - TEvDiskRegistryPrivate::TEvFinishAcquireDiskResponse>(); - NCloud::Reply(ctx, *ev, std::move(response)); + accessMode, + mountSeqNumber, + volumeGeneration, + requestTimeout, + muteIOErrors, + component); } -} // namespace NCloud::NBlockStore::NStorage +} // namespace NCloud::NBlockStore::NStorage::NAcquireReleaseDevices \ No newline at end of file diff --git a/cloud/blockstore/libs/storage/core/acquire_release_devices.h b/cloud/blockstore/libs/storage/core/acquire_release_devices.h new file mode 100644 index 0000000000..93ebe41823 --- /dev/null +++ b/cloud/blockstore/libs/storage/core/acquire_release_devices.h @@ -0,0 +1,107 @@ + +#pragma once + +#include + +#include +#include + +#include + +namespace NCloud::NBlockStore::NStorage::NAcquireReleaseDevices { + +struct TAgentAcquireDevicesCachedRequest +{ + TString AgentId; + NProto::TAcquireDevicesRequest Request; + TInstant RequestTime; +}; + +struct TAgentReleaseDevicesCachedRequest +{ + TString AgentId; + NProto::TReleaseDevicesRequest Request; +}; + +struct TDevicesAcquireFinished +{ + TString DiskId; + TString ClientId; + TVector SentRequests; + TVector Devices; + NProto::TError Error; + + TDevicesAcquireFinished( + TString diskId, + TString clientId, + TVector sentRequests, + TVector devices, + NProto::TError error) + : DiskId(std::move(diskId)) + , ClientId(std::move(clientId)) + , SentRequests(std::move(sentRequests)) + , Devices(std::move(devices)) + , Error(std::move(error)) + {} +}; + +struct TDevicesReleaseFinished +{ + TString DiskId; + TString ClientId; + TVector SentRequests; + NProto::TError Error; + + TDevicesReleaseFinished( + TString diskId, + TString clientId, + TVector sentRequests, + NProto::TError error) + : DiskId(std::move(diskId)) + , ClientId(std::move(clientId)) + , SentRequests(std::move(sentRequests)) + , Error(std::move(error)) + {} +}; + +enum EEvents +{ + EvBegin, + + EvDevicesAcquireFinished, + EvDevicesReleaseFinished, + + EvEnd +}; + +using TEvDevicesAcquireFinished = + TRequestEvent; + +using TEvDevicesReleaseFinished = + TRequestEvent; + +TActorId AcquireDevices( + const NActors::TActorContext& ctx, + const TActorId& owner, + TVector devices, + TString diskId, + TString clientId, + NProto::EVolumeAccessMode accessMode, + ui64 mountSeqNumber, + ui32 volumeGeneration, + TDuration requestTimeout, + bool muteIOErrors, + NActors::NLog::EComponent component); + +TActorId ReleaseDevices( + const NActors::TActorContext& ctx, + const TActorId& owner, + TString diskId, + TString clientId, + ui32 volumeGeneration, + TDuration requestTimeout, + TVector devices, + bool muteIOErrors, + NActors::NLog::EComponent component); + +} // namespace NCloud::NBlockStore::NStorage::NAcquireReleaseDevices \ No newline at end of file diff --git a/cloud/blockstore/libs/storage/core/proto_helpers.cpp b/cloud/blockstore/libs/storage/core/proto_helpers.cpp index 4f1c472d4f..51a23fb1e7 100644 --- a/cloud/blockstore/libs/storage/core/proto_helpers.cpp +++ b/cloud/blockstore/libs/storage/core/proto_helpers.cpp @@ -444,4 +444,15 @@ ui64 GetVolumeRequestId(const TEvDiskAgent::TEvZeroDeviceBlocksRequest& request) return request.Record.GetVolumeRequestId(); } +TString LogDevices(const TVector& devices) +{ + TStringBuilder sb; + sb << "( "; + for (const auto& d: devices) { + sb << d.GetDeviceUUID() << "@" << d.GetAgentId() << " "; + } + sb << ")"; + return sb; +} + } // namespace NCloud::NBlockStore::NStorage diff --git a/cloud/blockstore/libs/storage/core/proto_helpers.h b/cloud/blockstore/libs/storage/core/proto_helpers.h index 0fbecf7cbc..f8ecfb9032 100644 --- a/cloud/blockstore/libs/storage/core/proto_helpers.h +++ b/cloud/blockstore/libs/storage/core/proto_helpers.h @@ -223,4 +223,5 @@ TBlockRange64 BuildRequestBlockRange( ui64 GetVolumeRequestId(const TEvDiskAgent::TEvWriteDeviceBlocksRequest& request); ui64 GetVolumeRequestId(const TEvDiskAgent::TEvZeroDeviceBlocksRequest& request); +TString LogDevices(const TVector& devices); } // namespace NCloud::NBlockStore::NStorage diff --git a/cloud/blockstore/libs/storage/core/release_devices_actor.cpp b/cloud/blockstore/libs/storage/core/release_devices_actor.cpp new file mode 100644 index 0000000000..931724ae91 --- /dev/null +++ b/cloud/blockstore/libs/storage/core/release_devices_actor.cpp @@ -0,0 +1,271 @@ +#include "acquire_release_devices.h" + +#include +#include + +#include +#include +#include + +#include + +namespace NCloud::NBlockStore::NStorage::NAcquireReleaseDevices { +using namespace NActors; + +namespace { + +//////////////////////////////////////////////////////////////////////////////// + +class TReleaseDevicesActor final + : public TActorBootstrapped +{ +private: + const TActorId Owner; + const TString DiskId; + const TString ClientId; + const ui32 VolumeGeneration; + const TDuration RequestTimeout; + TVector Devices; + bool MuteIOErrors; + NLog::EComponent Component; + + int PendingRequests = 0; + + TVector SentReleaseRequests; + +public: + TReleaseDevicesActor( + const TActorId& owner, + TString diskId, + TString clientId, + ui32 volumeGeneration, + TDuration requestTimeout, + TVector devices, + bool muteIoErrors, + NLog::EComponent component); + + void Bootstrap(const TActorContext& ctx); + +private: + void PrepareRequest(NProto::TReleaseDevicesRequest& request); + void ReplyAndDie(const TActorContext& ctx, NProto::TError error); + + void OnReleaseResponse( + const TActorContext& ctx, + ui64 cookie, + NProto::TError error); + +private: + STFUNC(StateWork); + + void HandlePoisonPill( + const TEvents::TEvPoisonPill::TPtr& ev, + const TActorContext& ctx); + + void HandleReleaseDevicesResponse( + const TEvDiskAgent::TEvReleaseDevicesResponse::TPtr& ev, + const TActorContext& ctx); + + void HandleReleaseDevicesUndelivery( + const TEvDiskAgent::TEvReleaseDevicesRequest::TPtr& ev, + const TActorContext& ctx); + + void HandleTimeout( + const TEvents::TEvWakeup::TPtr& ev, + const TActorContext& ctx); + + TString LogTargets() const; +}; + +//////////////////////////////////////////////////////////////////////////////// + +TReleaseDevicesActor::TReleaseDevicesActor( + const TActorId& owner, + TString diskId, + TString clientId, + ui32 volumeGeneration, + TDuration requestTimeout, + TVector devices, + bool muteIOErrors, + NLog::EComponent component) + : Owner(owner) + , DiskId(std::move(diskId)) + , ClientId(std::move(clientId)) + , VolumeGeneration(volumeGeneration) + , RequestTimeout(requestTimeout) + , Devices(std::move(devices)) + , MuteIOErrors(muteIOErrors) + , Component(component) +{} + +void TReleaseDevicesActor::PrepareRequest(NProto::TReleaseDevicesRequest& request) +{ + request.MutableHeaders()->SetClientId(ClientId); + request.SetDiskId(DiskId); + request.SetVolumeGeneration(VolumeGeneration); +} + +void TReleaseDevicesActor::Bootstrap(const TActorContext& ctx) +{ + Become(&TThis::StateWork); + + SortBy(Devices, [] (auto& d) { + return d.GetNodeId(); + }); + + auto it = Devices.begin(); + while (it != Devices.end()) { + auto request = + std::make_unique(); + NProto::TReleaseDevicesRequest requestCopy; + PrepareRequest(request->Record); + PrepareRequest(requestCopy); + + const ui32 nodeId = it->GetNodeId(); + const TString& agentId = it->GetAgentId(); + + for (; it != Devices.end() && it->GetNodeId() == nodeId; ++it) { + *request->Record.AddDeviceUUIDs() = it->GetDeviceUUID(); + *requestCopy.AddDeviceUUIDs() = it->GetDeviceUUID(); + } + + ++PendingRequests; + SentReleaseRequests.emplace_back(agentId, std::move(requestCopy)); + NCloud::Send( + ctx, + MakeDiskAgentServiceId(nodeId), + std::move(request), + nodeId); + } + + ctx.Schedule(RequestTimeout, new TEvents::TEvWakeup()); +} + +void TReleaseDevicesActor::ReplyAndDie(const TActorContext& ctx, NProto::TError error) +{ + NCloud::Send( + ctx, + Owner, + std::make_unique( + DiskId, + ClientId, + std::move(SentReleaseRequests), + std::move(error))); + + Die(ctx); +} + +void TReleaseDevicesActor::OnReleaseResponse( + const TActorContext& ctx, + ui64 cookie, + NProto::TError error) +{ + Y_ABORT_UNLESS(PendingRequests > 0); + + if (HasError(error)) { + LOG_LOG( + ctx, + MuteIOErrors ? NLog::PRI_WARN : NLog::PRI_ERROR, + Component, + "ReleaseDevices %s error: %s, %llu", + LogTargets().c_str(), + FormatError(error).c_str(), + cookie); + } + + if (--PendingRequests == 0) { + ReplyAndDie(ctx, {}); + } +} + +void TReleaseDevicesActor::HandleReleaseDevicesResponse( + const TEvDiskAgent::TEvReleaseDevicesResponse::TPtr& ev, + const TActorContext& ctx) +{ + OnReleaseResponse(ctx, ev->Cookie, ev->Get()->GetError()); +} + +void TReleaseDevicesActor::HandleReleaseDevicesUndelivery( + const TEvDiskAgent::TEvReleaseDevicesRequest::TPtr& ev, + const TActorContext& ctx) +{ + OnReleaseResponse(ctx, ev->Cookie, MakeError(E_REJECTED, "not delivered")); +} + +void TReleaseDevicesActor::HandlePoisonPill( + const TEvents::TEvPoisonPill::TPtr& ev, + const TActorContext& ctx) +{ + Y_UNUSED(ev); + + ReplyAndDie(ctx, MakeTabletIsDeadError(E_REJECTED, __LOCATION__)); +} + +void TReleaseDevicesActor::HandleTimeout( + const TEvents::TEvWakeup::TPtr& ev, + const TActorContext& ctx) +{ + Y_UNUSED(ev); + + const auto err = TStringBuilder() + << "TReleaseDevicesActor timeout." + << " DiskId: " << DiskId + << " ClientId: " << ClientId + << " Targets: " << LogTargets() + << " VolumeGeneration: " << VolumeGeneration + << " PendingRequests: " << PendingRequests; + + LOG_WARN(ctx, Component, err); + + ReplyAndDie(ctx, MakeError(E_TIMEOUT, err)); +} + +STFUNC(TReleaseDevicesActor::StateWork) +{ + switch (ev->GetTypeRewrite()) { + HFunc(TEvents::TEvPoisonPill, HandlePoisonPill); + HFunc(TEvents::TEvWakeup, HandleTimeout); + + HFunc(TEvDiskAgent::TEvReleaseDevicesResponse, + HandleReleaseDevicesResponse); + HFunc(TEvDiskAgent::TEvReleaseDevicesRequest, + HandleReleaseDevicesUndelivery); + + default: + HandleUnexpectedEvent(ev, Component); + break; + } +} + +//////////////////////////////////////////////////////////////////////////////// + +TString TReleaseDevicesActor::LogTargets() const +{ + return LogDevices(Devices); +} + +} // namespace + +TActorId ReleaseDevices( + const TActorContext& ctx, + const TActorId& owner, + TString diskId, + TString clientId, + ui32 volumeGeneration, + TDuration requestTimeout, + TVector devices, + bool muteIOErrors, + NActors::NLog::EComponent component) +{ + return NCloud::Register( + ctx, + owner, + std::move(diskId), + std::move(clientId), + volumeGeneration, + requestTimeout, + std::move(devices), + muteIOErrors, + component); +} +} // namespace NCloud::NBlockStore::NStorage::NAcquireReleaseDevices \ No newline at end of file diff --git a/cloud/blockstore/libs/storage/core/ya.make b/cloud/blockstore/libs/storage/core/ya.make index 6467cd9719..1ca815aaee 100644 --- a/cloud/blockstore/libs/storage/core/ya.make +++ b/cloud/blockstore/libs/storage/core/ya.make @@ -3,6 +3,7 @@ LIBRARY() GENERATE_ENUM_SERIALIZATION(mount_token.h) SRCS( + acquire_devices_actor.cpp block_handler.cpp compaction_map.cpp compaction_options.cpp @@ -19,6 +20,7 @@ SRCS( pending_request.cpp probes.cpp proto_helpers.cpp + release_devices_actor.cpp request_buffer.cpp request_info.cpp storage_request_counters.cpp diff --git a/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor.cpp b/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor.cpp index 5f251d85cb..efd2a4ca6d 100644 --- a/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor.cpp +++ b/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor.cpp @@ -188,6 +188,24 @@ void TDiskRegistryActor::BeforeDie(const NActors::TActorContext& ctx) MakeTabletIsDeadError(E_REJECTED, __LOCATION__)); } PendingDiskDeallocationRequests.clear(); + + for (auto& [actorId, requestInfo]: PendingAcquireDiskRequests) { + NCloud::Reply( + ctx, + *requestInfo, + std::make_unique( + MakeTabletIsDeadError(E_REJECTED, __LOCATION__))); + } + PendingAcquireDiskRequests.clear(); + + for (auto& [actorId, requestInfo]: PendingReleaseDiskRequests) { + NCloud::Reply( + ctx, + *requestInfo, + std::make_unique( + MakeTabletIsDeadError(E_REJECTED, __LOCATION__))); + } + PendingReleaseDiskRequests.clear(); } void TDiskRegistryActor::OnDetach(const TActorContext& ctx) @@ -708,6 +726,14 @@ STFUNC(TDiskRegistryActor::StateWork) TEvDiskRegistryPrivate::TEvDiskRegistryAgentListExpiredParamsCleanup, TDiskRegistryActor::HandleDiskRegistryAgentListExpiredParamsCleanup); + HFunc( + NAcquireReleaseDevices::TEvDevicesAcquireFinished, + HandleDevicesAcquireFinished); + + HFunc( + NAcquireReleaseDevices::TEvDevicesReleaseFinished, + HandleDevicesReleaseFinished); + default: if (!HandleRequests(ev) && !HandleDefaultEvents(ev, SelfId())) { HandleUnexpectedEvent(ev, TBlockStoreComponents::DISK_REGISTRY); @@ -879,21 +905,9 @@ bool ToLogicalBlocks(NProto::TDeviceConfig& device, ui32 logicalBlockSize) //////////////////////////////////////////////////////////////////////////////// -TString LogDevices(const TVector& devices) -{ - TStringBuilder sb; - sb << "( "; - for (const auto& d: devices) { - sb << d.GetDeviceUUID() << "@" << d.GetAgentId() << " "; - } - sb << ")"; - return sb; -} - -//////////////////////////////////////////////////////////////////////////////// - void TDiskRegistryActor::OnDiskAcquired( - TVector sentAcquireRequests) + TVector + sentAcquireRequests) { for (auto& sentRequest: sentAcquireRequests) { TCachedAcquireRequests& cachedRequests = @@ -906,7 +920,8 @@ void TDiskRegistryActor::OnDiskAcquired( } void TDiskRegistryActor::OnDiskReleased( - const TVector& sentReleaseRequests) + const TVector& + sentReleaseRequests) { auto& acquireCacheByAgentId = State->GetAcquireCacheByAgentId(); for (const auto& [agentId, releaseRequest]: sentReleaseRequests) { diff --git a/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor.h b/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor.h index 32a19889b3..b3739a29a0 100644 --- a/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor.h +++ b/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor.h @@ -15,6 +15,7 @@ #include #include #include +#include #include #include #include @@ -85,6 +86,9 @@ class TDiskRegistryActor final THashMap> PendingDiskDeallocationRequests; + THashMap PendingAcquireDiskRequests; + THashMap PendingReleaseDiskRequests; + bool BrokenDisksDestructionInProgress = false; bool DisksNotificationInProgress = false; bool UsersNotificationInProgress = false; @@ -243,10 +247,19 @@ class TDiskRegistryActor final void ProcessAutomaticallyReplacedDevices(const NActors::TActorContext& ctx); + void HandleDevicesAcquireFinished( + const NAcquireReleaseDevices::TEvDevicesAcquireFinished::TPtr& ev, + const NActors::TActorContext& ctx); void OnDiskAcquired( - TVector sentAcquireRequests); + TVector + sentAcquireRequests); + void HandleDevicesReleaseFinished( + const NAcquireReleaseDevices::TEvDevicesReleaseFinished::TPtr& ev, + const NActors::TActorContext& ctx); void OnDiskReleased( - const TVector& sentReleaseRequests); + const TVector< + NAcquireReleaseDevices::TAgentReleaseDevicesCachedRequest>& + sentReleaseRequests); void OnDiskDeallocated(const TDiskId& diskId); void SendCachedAcquireRequestsToAgent( const NActors::TActorContext& ctx, @@ -503,6 +516,5 @@ class TDiskRegistryActor final // BLOCKSTORE_DISK_REGISTRY_COUNTER bool ToLogicalBlocks(NProto::TDeviceConfig& device, ui32 logicalBlockSize); -TString LogDevices(const TVector& devices); } // namespace NCloud::NBlockStore::NStorage diff --git a/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor_acquire_release.cpp b/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor_acquire_release.cpp new file mode 100644 index 0000000000..715774a235 --- /dev/null +++ b/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor_acquire_release.cpp @@ -0,0 +1,245 @@ +#include "disk_registry_actor.h" + +#include +#include +#include + +#include +#include + +namespace NCloud::NBlockStore::NStorage { + +using namespace NActors; + +using namespace NKikimr::NTabletFlatExecutor; + +//////////////////////////////////////////////////////////////////////////////// + +void TDiskRegistryActor::HandleAcquireDisk( + const TEvDiskRegistry::TEvAcquireDiskRequest::TPtr& ev, + const TActorContext& ctx) +{ + BLOCKSTORE_DISK_REGISTRY_COUNTER(AcquireDisk); + + const auto* msg = ev->Get(); + + auto clientId = msg->Record.GetHeaders().GetClientId(); + auto diskId = msg->Record.GetDiskId(); + + LOG_DEBUG( + ctx, + TBlockStoreComponents::DISK_REGISTRY, + "[%lu] Received AcquireDisk request: " + "DiskId=%s, ClientId=%s, AccessMode=%u, MountSeqNumber=%lu" + ", VolumeGeneration=%u", + TabletID(), + diskId.c_str(), + clientId.c_str(), + static_cast(msg->Record.GetAccessMode()), + msg->Record.GetMountSeqNumber(), + msg->Record.GetVolumeGeneration()); + + TDiskInfo diskInfo; + auto error = State->StartAcquireDisk(diskId, diskInfo); + + if (HasError(error)) { + LOG_ERROR( + ctx, + TBlockStoreComponents::DISK_REGISTRY_WORKER, + "[%s] AcquireDisk %s error: %s", + clientId.c_str(), + diskId.c_str(), + FormatError(error).c_str()); + + NCloud::Reply( + ctx, + *ev, + std::make_unique( + std::move(error))); + return; + } + + State->FilterDevicesAtUnavailableAgents(diskInfo); + + TVector devices = std::move(diskInfo.Devices); + for (auto& migration: diskInfo.Migrations) { + devices.push_back(std::move(*migration.MutableTargetDevice())); + } + for (auto& replica: diskInfo.Replicas) { + devices.insert( + devices.end(), + std::make_move_iterator(replica.begin()), + std::make_move_iterator(replica.end())); + } + + auto actor = NAcquireReleaseDevices::AcquireDevices( + ctx, + ctx.SelfID, + std::move(devices), + std::move(diskId), + std::move(clientId), + msg->Record.GetAccessMode(), + msg->Record.GetMountSeqNumber(), + msg->Record.GetVolumeGeneration(), + Config->GetAgentRequestTimeout(), + /*muteIOErrors=*/false, + TBlockStoreComponents::DISK_REGISTRY); + Actors.insert(actor); + PendingAcquireDiskRequests[actor] = + CreateRequestInfo(ev->Sender, ev->Cookie, msg->CallContext); +} + +void TDiskRegistryActor::HandleDevicesAcquireFinished( + const NAcquireReleaseDevices::TEvDevicesAcquireFinished::TPtr& ev, + const NActors::TActorContext& ctx) +{ + auto* msg = ev->Get(); + + State->FinishAcquireDisk(msg->DiskId); + + OnDiskAcquired(std::move(msg->SentRequests)); + + auto reqInfo = PendingAcquireDiskRequests.at(ev->Sender); + + auto response = std::make_unique( + std::move(msg->Error)); + + const auto* disk = State->GetDisk(msg->DiskId); + + if (HasError(response->GetError())) { + LOG_ERROR( + ctx, + TBlockStoreComponents::DISK_REGISTRY_WORKER, + "[%s] AcquireDisk %s targets %s error: %s", + msg->ClientId.c_str(), + msg->DiskId.c_str(), + LogDevices(msg->Devices).c_str(), + FormatError(response->GetError()).c_str()); + } else { + response->Record.MutableDevices()->Reserve(msg->Devices.size()); + + for (auto& device: msg->Devices) { + if (disk) { + ToLogicalBlocks(device, disk->LogicalBlockSize); + } + *response->Record.AddDevices() = std::move(device); + } + } + + NCloud::Reply(ctx, *reqInfo, std::move(response)); + Actors.erase(ev->Sender); + PendingAcquireDiskRequests.erase(ev->Sender); +} + +//////////////////////////////////////////////////////////////////////////////// + +void TDiskRegistryActor::HandleReleaseDisk( + const TEvDiskRegistry::TEvReleaseDiskRequest::TPtr& ev, + const TActorContext& ctx) +{ + BLOCKSTORE_DISK_REGISTRY_COUNTER(ReleaseDisk); + + auto replyWithError = [&](auto error) + { + auto response = + std::make_unique( + std::move(error)); + NCloud::Reply(ctx, *ev, std::move(response)); + }; + + auto* msg = ev->Get(); + TString& diskId = *msg->Record.MutableDiskId(); + TString& clientId = *msg->Record.MutableHeaders()->MutableClientId(); + ui32 volumeGeneration = msg->Record.GetVolumeGeneration(); + + LOG_DEBUG( + ctx, + TBlockStoreComponents::DISK_REGISTRY, + "[%lu] Received ReleaseDisk request: DiskId=%s, ClientId=%s" + ", VolumeGeneration=%u", + TabletID(), + diskId.c_str(), + clientId.c_str(), + volumeGeneration); + + if (!clientId) { + replyWithError(MakeError(E_ARGUMENT, "empty client id")); + return; + } + + if (!diskId) { + replyWithError(MakeError(E_ARGUMENT, "empty disk id")); + return; + } + + TDiskInfo diskInfo; + const auto error = State->GetDiskInfo(diskId, diskInfo); + if (HasError(error)) { + LOG_ERROR( + ctx, + TBlockStoreComponents::DISK_REGISTRY, + "ReleaseDisk %s. GetDiskInfo error: %s", + diskId.c_str(), + FormatError(error).c_str()); + + replyWithError(error); + return; + } + + if (!State->FilterDevicesAtUnavailableAgents(diskInfo)) { + LOG_WARN( + ctx, + TBlockStoreComponents::DISK_REGISTRY, + "ReleaseDisk %s. Nothing to release", + diskId.c_str()); + + replyWithError(MakeError(S_ALREADY, {})); + return; + } + + TVector devices = std::move(diskInfo.Devices); + for (auto& migration: diskInfo.Migrations) { + devices.push_back(std::move(*migration.MutableTargetDevice())); + } + for (auto& replica: diskInfo.Replicas) { + for (auto& device: replica) { + devices.push_back(std::move(device)); + } + } + + auto actor = NAcquireReleaseDevices::ReleaseDevices( + ctx, + ctx.SelfID, + std::move(diskId), + std::move(clientId), + volumeGeneration, + Config->GetAgentRequestTimeout(), + std::move(devices), + /*muteIOErrors=*/false, + TBlockStoreComponents::DISK_REGISTRY); + + Actors.insert(actor); + PendingReleaseDiskRequests[actor] = + CreateRequestInfo(ev->Sender, ev->Cookie, msg->CallContext); +} + +void TDiskRegistryActor::HandleDevicesReleaseFinished( + const NAcquireReleaseDevices::TEvDevicesReleaseFinished::TPtr& ev, + const NActors::TActorContext& ctx) +{ + const auto* msg = ev->Get(); + + OnDiskReleased(msg->SentRequests); + + State->FinishAcquireDisk(msg->DiskId); + auto reqInfo = PendingReleaseDiskRequests.at(ev->Sender); + + auto response = + std::make_unique(msg->Error); + NCloud::Reply(ctx, *reqInfo, std::move(response)); + + Actors.erase(ev->Sender); + PendingReleaseDiskRequests.erase(ev->Sender); +} + +} // namespace NCloud::NBlockStore::NStorage \ No newline at end of file diff --git a/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor_release.cpp b/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor_release.cpp deleted file mode 100644 index 2cff5a6d1a..0000000000 --- a/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor_release.cpp +++ /dev/null @@ -1,373 +0,0 @@ -#include "disk_registry_actor.h" - -#include - -#include - -namespace NCloud::NBlockStore::NStorage { - -using namespace NActors; - -using namespace NKikimr::NTabletFlatExecutor; - -namespace { - -//////////////////////////////////////////////////////////////////////////////// - -class TReleaseDiskActor final - : public TActorBootstrapped -{ -private: - const TActorId Owner; - const TRequestInfoPtr RequestInfo; - const TString DiskId; - const TString ClientId; - const ui32 VolumeGeneration; - const TDuration RequestTimeout; - - TVector Devices; - int PendingRequests = 0; - - TVector SentReleaseRequests; - -public: - TReleaseDiskActor( - const TActorId& owner, - TRequestInfoPtr requestInfo, - TString diskId, - TString clientId, - ui32 volumeGeneration, - TDuration requestTimeout, - TVector devices); - - void Bootstrap(const TActorContext& ctx); - -private: - void PrepareRequest(NProto::TReleaseDevicesRequest& request); - void RemoveDiskSession(const TActorContext& ctx); - void ReplyAndDie(const TActorContext& ctx, NProto::TError error); - - void OnReleaseResponse( - const TActorContext& ctx, - ui64 cookie, - NProto::TError error); - -private: - STFUNC(StateWork); - - void HandlePoisonPill( - const TEvents::TEvPoisonPill::TPtr& ev, - const TActorContext& ctx); - - void HandleReleaseDevicesResponse( - const TEvDiskAgent::TEvReleaseDevicesResponse::TPtr& ev, - const TActorContext& ctx); - - void HandleReleaseDevicesUndelivery( - const TEvDiskAgent::TEvReleaseDevicesRequest::TPtr& ev, - const TActorContext& ctx); - - void HandleRemoveDiskSessionResponse( - const TEvDiskRegistryPrivate::TEvRemoveDiskSessionResponse::TPtr& ev, - const TActorContext& ctx); - - void HandleTimeout( - const TEvents::TEvWakeup::TPtr& ev, - const TActorContext& ctx); - - TString LogTargets() const; -}; - -//////////////////////////////////////////////////////////////////////////////// - -TReleaseDiskActor::TReleaseDiskActor( - const TActorId& owner, - TRequestInfoPtr requestInfo, - TString diskId, - TString clientId, - ui32 volumeGeneration, - TDuration requestTimeout, - TVector devices) - : Owner(owner) - , RequestInfo(std::move(requestInfo)) - , DiskId(std::move(diskId)) - , ClientId(std::move(clientId)) - , VolumeGeneration(volumeGeneration) - , RequestTimeout(requestTimeout) - , Devices(std::move(devices)) -{} - -void TReleaseDiskActor::PrepareRequest(NProto::TReleaseDevicesRequest& request) -{ - request.MutableHeaders()->SetClientId(ClientId); - request.SetDiskId(DiskId); - request.SetVolumeGeneration(VolumeGeneration); -} - -void TReleaseDiskActor::Bootstrap(const TActorContext& ctx) -{ - Become(&TThis::StateWork); - - SortBy(Devices, [] (auto& d) { - return d.GetNodeId(); - }); - - auto it = Devices.begin(); - while (it != Devices.end()) { - auto request = - std::make_unique(); - NProto::TReleaseDevicesRequest requestCopy; - PrepareRequest(request->Record); - PrepareRequest(requestCopy); - - const ui32 nodeId = it->GetNodeId(); - const TString& agentId = it->GetAgentId(); - - for (; it != Devices.end() && it->GetNodeId() == nodeId; ++it) { - *request->Record.AddDeviceUUIDs() = it->GetDeviceUUID(); - *requestCopy.AddDeviceUUIDs() = it->GetDeviceUUID(); - } - - ++PendingRequests; - SentReleaseRequests.emplace_back(agentId, std::move(requestCopy)); - NCloud::Send( - ctx, - MakeDiskAgentServiceId(nodeId), - std::move(request), - nodeId); - } - - ctx.Schedule(RequestTimeout, new TEvents::TEvWakeup()); -} - -void TReleaseDiskActor::RemoveDiskSession(const TActorContext& ctx) -{ - auto request = - std::make_unique( - DiskId, - ClientId, - std::move(SentReleaseRequests)); - - NCloud::Send(ctx, Owner, std::move(request)); -} - -void TReleaseDiskActor::ReplyAndDie(const TActorContext& ctx, NProto::TError error) -{ - auto response = std::make_unique( - std::move(error)); - NCloud::Reply(ctx, *RequestInfo, std::move(response)); - - NCloud::Send( - ctx, - Owner, - std::make_unique()); - - Die(ctx); -} - -void TReleaseDiskActor::OnReleaseResponse( - const TActorContext& ctx, - ui64 cookie, - NProto::TError error) -{ - Y_ABORT_UNLESS(PendingRequests > 0); - - if (HasError(error)) { - LOG_ERROR(ctx, TBlockStoreComponents::DISK_REGISTRY_WORKER, - "ReleaseDevices %s error: %s, %llu", - LogTargets().c_str(), - FormatError(error).c_str(), - cookie); - } - - if (--PendingRequests == 0) { - RemoveDiskSession(ctx); - } -} - -void TReleaseDiskActor::HandleReleaseDevicesResponse( - const TEvDiskAgent::TEvReleaseDevicesResponse::TPtr& ev, - const TActorContext& ctx) -{ - OnReleaseResponse(ctx, ev->Cookie, ev->Get()->GetError()); -} - -void TReleaseDiskActor::HandleReleaseDevicesUndelivery( - const TEvDiskAgent::TEvReleaseDevicesRequest::TPtr& ev, - const TActorContext& ctx) -{ - OnReleaseResponse(ctx, ev->Cookie, MakeError(E_REJECTED, "not delivered")); -} - -void TReleaseDiskActor::HandleRemoveDiskSessionResponse( - const TEvDiskRegistryPrivate::TEvRemoveDiskSessionResponse::TPtr& ev, - const TActorContext& ctx) -{ - const auto* msg = ev->Get(); - - ReplyAndDie(ctx, msg->GetError()); -} - -void TReleaseDiskActor::HandlePoisonPill( - const TEvents::TEvPoisonPill::TPtr& ev, - const TActorContext& ctx) -{ - Y_UNUSED(ev); - - ReplyAndDie(ctx, MakeTabletIsDeadError(E_REJECTED, __LOCATION__)); -} - -void TReleaseDiskActor::HandleTimeout( - const TEvents::TEvWakeup::TPtr& ev, - const TActorContext& ctx) -{ - Y_UNUSED(ev); - - const auto err = TStringBuilder() - << "TReleaseDiskActor timeout." - << " DiskId: " << DiskId - << " ClientId: " << ClientId - << " Targets: " << LogTargets() - << " VolumeGeneration: " << VolumeGeneration - << " PendingRequests: " << PendingRequests; - - LOG_WARN(ctx, TBlockStoreComponents::DISK_REGISTRY_WORKER, err); - - ReplyAndDie(ctx, MakeError(E_TIMEOUT, err)); -} - -STFUNC(TReleaseDiskActor::StateWork) -{ - switch (ev->GetTypeRewrite()) { - HFunc(TEvents::TEvPoisonPill, HandlePoisonPill); - HFunc(TEvents::TEvWakeup, HandleTimeout); - - HFunc(TEvDiskAgent::TEvReleaseDevicesResponse, - HandleReleaseDevicesResponse); - HFunc(TEvDiskAgent::TEvReleaseDevicesRequest, - HandleReleaseDevicesUndelivery); - - HFunc(TEvDiskRegistryPrivate::TEvRemoveDiskSessionResponse, - HandleRemoveDiskSessionResponse); - - default: - HandleUnexpectedEvent(ev, TBlockStoreComponents::DISK_REGISTRY_WORKER); - break; - } -} - -//////////////////////////////////////////////////////////////////////////////// - -TString TReleaseDiskActor::LogTargets() const -{ - return LogDevices(Devices); -} - -} // namespace - -//////////////////////////////////////////////////////////////////////////////// - -void TDiskRegistryActor::HandleReleaseDisk( - const TEvDiskRegistry::TEvReleaseDiskRequest::TPtr& ev, - const TActorContext& ctx) -{ - BLOCKSTORE_DISK_REGISTRY_COUNTER(ReleaseDisk); - - auto replyWithError = [&] (auto error) { - auto response = std::make_unique( - std::move(error)); - NCloud::Reply(ctx, *ev, std::move(response)); - }; - - auto* msg = ev->Get(); - TString& diskId = *msg->Record.MutableDiskId(); - TString& clientId = *msg->Record.MutableHeaders()->MutableClientId(); - ui32 volumeGeneration = msg->Record.GetVolumeGeneration(); - - LOG_DEBUG(ctx, TBlockStoreComponents::DISK_REGISTRY, - "[%lu] Received ReleaseDisk request: DiskId=%s, ClientId=%s" - ", VolumeGeneration=%u", - TabletID(), - diskId.c_str(), - clientId.c_str(), - volumeGeneration); - - if (!clientId) { - replyWithError(MakeError(E_ARGUMENT, "empty client id")); - return; - } - - if (!diskId) { - replyWithError(MakeError(E_ARGUMENT, "empty disk id")); - return; - } - - TDiskInfo diskInfo; - const auto error = State->GetDiskInfo(diskId, diskInfo); - if (HasError(error)) { - LOG_ERROR(ctx, TBlockStoreComponents::DISK_REGISTRY, - "ReleaseDisk %s. GetDiskInfo error: %s", - diskId.c_str(), - FormatError(error).c_str()); - - replyWithError(error); - return; - } - - if (!State->FilterDevicesAtUnavailableAgents(diskInfo)) { - LOG_WARN(ctx, TBlockStoreComponents::DISK_REGISTRY, - "ReleaseDisk %s. Nothing to release", - diskId.c_str()); - - replyWithError(MakeError(S_ALREADY, {})); - return; - } - - TVector devices = std::move(diskInfo.Devices); - for (auto& migration: diskInfo.Migrations) { - devices.push_back(std::move(*migration.MutableTargetDevice())); - } - for (auto& replica: diskInfo.Replicas) { - for (auto& device: replica) { - devices.push_back(std::move(device)); - } - } - - auto requestInfo = CreateRequestInfo( - ev->Sender, - ev->Cookie, - msg->CallContext); - - auto actor = NCloud::Register( - ctx, - ctx.SelfID, - std::move(requestInfo), - std::move(diskId), - std::move(clientId), - volumeGeneration, - Config->GetAgentRequestTimeout(), - std::move(devices)); - - Actors.insert(actor); -} - -void TDiskRegistryActor::HandleRemoveDiskSession( - const TEvDiskRegistryPrivate::TEvRemoveDiskSessionRequest::TPtr& ev, - const TActorContext& ctx) -{ - const auto* msg = ev->Get(); - - OnDiskReleased(msg->SentRequests); - - auto requestInfo = CreateRequestInfo( - ev->Sender, - ev->Cookie, - msg->CallContext); - - State->FinishAcquireDisk(msg->DiskId); - auto response = - std::make_unique(); - NCloud::Reply(ctx, *ev, std::move(response)); -} - -} // namespace NCloud::NBlockStore::NStorage diff --git a/cloud/blockstore/libs/storage/disk_registry/disk_registry_private.h b/cloud/blockstore/libs/storage/disk_registry/disk_registry_private.h index 1c51f0d77b..be27a49619 100644 --- a/cloud/blockstore/libs/storage/disk_registry/disk_registry_private.h +++ b/cloud/blockstore/libs/storage/disk_registry/disk_registry_private.h @@ -5,6 +5,7 @@ #include #include #include +#include #include #include @@ -74,19 +75,6 @@ struct TUserNotificationKey //////////////////////////////////////////////////////////////////////////////// -struct TAgentAcquireDevicesCachedRequest -{ - TString AgentId; - NProto::TAcquireDevicesRequest Request; - TInstant RequestTime; -}; - -struct TAgentReleaseDevicesCachedRequest -{ - TString AgentId; - NProto::TReleaseDevicesRequest Request; -}; - struct TCachedAcquireKey { TString DiskId; @@ -104,8 +92,9 @@ struct TCachedAcquireKey } }; -using TCachedAcquireRequests = - TMap; +using TCachedAcquireRequests = TMap< + TCachedAcquireKey, + NAcquireReleaseDevices::TAgentAcquireDevicesCachedRequest>; //////////////////////////////////////////////////////////////////////////////// @@ -179,8 +168,6 @@ using TVolumeConfig = NKikimrBlockStore::TVolumeConfig; xxx(CleanupDisks, __VA_ARGS__) \ xxx(SecureErase, __VA_ARGS__) \ xxx(CleanupDevices, __VA_ARGS__) \ - xxx(FinishAcquireDisk, __VA_ARGS__) \ - xxx(RemoveDiskSession, __VA_ARGS__) \ xxx(DestroyBrokenDisks, __VA_ARGS__) \ xxx(ListBrokenDisks, __VA_ARGS__) \ xxx(NotifyDisks, __VA_ARGS__) \ @@ -204,52 +191,6 @@ using TVolumeConfig = NKikimrBlockStore::TVolumeConfig; struct TEvDiskRegistryPrivate { - // - // FinishAcquireDisk - // - - struct TFinishAcquireDiskRequest - { - TString DiskId; - TString ClientId; - TVector SentRequests; - - TFinishAcquireDiskRequest( - TString diskId, - TString clientId, - TVector sentRequests) - : DiskId(std::move(diskId)) - , ClientId(std::move(clientId)) - , SentRequests(std::move(sentRequests)) - {} - }; - - struct TFinishAcquireDiskResponse - {}; - - // - // RemoveDiskSession - // - - struct TRemoveDiskSessionRequest - { - TString DiskId; - TString ClientId; - TVector SentRequests; - - TRemoveDiskSessionRequest( - TString diskId, - TString clientId, - TVector sentRequests) - : DiskId(std::move(diskId)) - , ClientId(std::move(clientId)) - , SentRequests(std::move(sentRequests)) - {} - }; - - struct TRemoveDiskSessionResponse - {}; - // // CleanupDisks // diff --git a/cloud/blockstore/libs/storage/disk_registry/disk_registry_state.cpp b/cloud/blockstore/libs/storage/disk_registry/disk_registry_state.cpp index 51714d32e5..d581a013bd 100644 --- a/cloud/blockstore/libs/storage/disk_registry/disk_registry_state.cpp +++ b/cloud/blockstore/libs/storage/disk_registry/disk_registry_state.cpp @@ -3363,6 +3363,12 @@ NProto::EDiskState TDiskRegistryState::GetDiskState(const TDiskId& diskId) const return disk->State; } +const TDiskRegistryState::TDiskState* TDiskRegistryState::GetDisk( + const TDiskId& diskId) const +{ + return Disks.FindPtr(diskId); +} + NProto::TError TDiskRegistryState::GetShadowDiskId( const TDiskId& sourceDiskId, const TCheckpointId& checkpointId, diff --git a/cloud/blockstore/libs/storage/disk_registry/disk_registry_state.h b/cloud/blockstore/libs/storage/disk_registry/disk_registry_state.h index 3b78b8786f..99c65de006 100644 --- a/cloud/blockstore/libs/storage/disk_registry/disk_registry_state.h +++ b/cloud/blockstore/libs/storage/disk_registry/disk_registry_state.h @@ -447,6 +447,7 @@ class TDiskRegistryState NProto::TError GetDiskInfo(const TDiskId& diskId, TDiskInfo& diskInfo) const; NProto::EDiskState GetDiskState(const TDiskId& diskId) const; + const TDiskState* GetDisk(const TDiskId& diskId) const; NProto::TError GetShadowDiskId( const TDiskId& sourceDiskId, const TCheckpointId& checkpointId, diff --git a/cloud/blockstore/libs/storage/disk_registry/disk_registry_ut_session.cpp b/cloud/blockstore/libs/storage/disk_registry/disk_registry_ut_session.cpp index a8446be4a7..bc592ea2e9 100644 --- a/cloud/blockstore/libs/storage/disk_registry/disk_registry_ut_session.cpp +++ b/cloud/blockstore/libs/storage/disk_registry/disk_registry_ut_session.cpp @@ -507,7 +507,8 @@ Y_UNIT_TEST_SUITE(TDiskRegistryTest) runtime->SetObserverFunc( [&] (TAutoPtr& event) { switch (event->GetTypeRewrite()) { - case TEvDiskRegistryPrivate::EvFinishAcquireDiskResponse: { + case NAcquireReleaseDevices::TEvDevicesAcquireFinished:: + EventType: { finished = true; break; } @@ -676,10 +677,7 @@ Y_UNIT_TEST_SUITE(TDiskRegistryTest) } { - auto response = diskRegistry.RemoveDiskSession( - "disk-1", - "session-1", - TVector()); + auto response = diskRegistry.ReleaseDisk("disk-1", "session-1"); UNIT_ASSERT(!HasError(response->GetError())); } } diff --git a/cloud/blockstore/libs/storage/disk_registry/testlib/test_env.h b/cloud/blockstore/libs/storage/disk_registry/testlib/test_env.h index 1539d257c0..2c5d82fe82 100644 --- a/cloud/blockstore/libs/storage/disk_registry/testlib/test_env.h +++ b/cloud/blockstore/libs/storage/disk_registry/testlib/test_env.h @@ -780,18 +780,6 @@ class TDiskRegistryClient std::move(devices)); } - auto CreateRemoveDiskSessionRequest( - TString diskId, - TString clientId, - TVector sentRequests) - { - return std::make_unique< - TEvDiskRegistryPrivate::TEvRemoveDiskSessionRequest>( - std::move(diskId), - std::move(clientId), - std::move(sentRequests)); - } - auto CreateUpdateAgentStatsRequest(NProto::TAgentStats stats) { auto request = std::make_unique(); diff --git a/cloud/blockstore/libs/storage/disk_registry/ya.make b/cloud/blockstore/libs/storage/disk_registry/ya.make index fc8d924270..49000e240d 100644 --- a/cloud/blockstore/libs/storage/disk_registry/ya.make +++ b/cloud/blockstore/libs/storage/disk_registry/ya.make @@ -1,7 +1,7 @@ LIBRARY() SRCS( - disk_registry_actor_acquire.cpp + disk_registry_actor_acquire_release.cpp disk_registry_actor_allocate.cpp disk_registry_actor_backup_state.cpp disk_registry_actor_change_disk_device.cpp @@ -34,7 +34,6 @@ SRCS( disk_registry_actor_query_available_storage.cpp disk_registry_actor_register.cpp disk_registry_actor_regular.cpp - disk_registry_actor_release.cpp disk_registry_actor_replace.cpp disk_registry_actor_restore_state.cpp disk_registry_actor_resume_device.cpp From d06b1f4353e886501147937ef69f7dce9c19b64a Mon Sep 17 00:00:00 2001 From: Stepanyuk Vladislav Date: Wed, 15 Jan 2025 17:38:07 +0700 Subject: [PATCH 02/11] issue-2725: add new line at eof --- cloud/blockstore/libs/storage/core/acquire_release_devices.h | 2 +- cloud/blockstore/libs/storage/core/release_devices_actor.cpp | 2 +- .../disk_registry/disk_registry_actor_acquire_release.cpp | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/cloud/blockstore/libs/storage/core/acquire_release_devices.h b/cloud/blockstore/libs/storage/core/acquire_release_devices.h index 93ebe41823..d9dcdb4021 100644 --- a/cloud/blockstore/libs/storage/core/acquire_release_devices.h +++ b/cloud/blockstore/libs/storage/core/acquire_release_devices.h @@ -104,4 +104,4 @@ TActorId ReleaseDevices( bool muteIOErrors, NActors::NLog::EComponent component); -} // namespace NCloud::NBlockStore::NStorage::NAcquireReleaseDevices \ No newline at end of file +} // namespace NCloud::NBlockStore::NStorage::NAcquireReleaseDevices diff --git a/cloud/blockstore/libs/storage/core/release_devices_actor.cpp b/cloud/blockstore/libs/storage/core/release_devices_actor.cpp index 931724ae91..1f361cd3cb 100644 --- a/cloud/blockstore/libs/storage/core/release_devices_actor.cpp +++ b/cloud/blockstore/libs/storage/core/release_devices_actor.cpp @@ -268,4 +268,4 @@ TActorId ReleaseDevices( muteIOErrors, component); } -} // namespace NCloud::NBlockStore::NStorage::NAcquireReleaseDevices \ No newline at end of file +} // namespace NCloud::NBlockStore::NStorage::NAcquireReleaseDevices diff --git a/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor_acquire_release.cpp b/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor_acquire_release.cpp index 715774a235..355c21b8f0 100644 --- a/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor_acquire_release.cpp +++ b/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor_acquire_release.cpp @@ -242,4 +242,4 @@ void TDiskRegistryActor::HandleDevicesReleaseFinished( PendingReleaseDiskRequests.erase(ev->Sender); } -} // namespace NCloud::NBlockStore::NStorage \ No newline at end of file +} // namespace NCloud::NBlockStore::NStorage From eca306b8ec7ecd2dbf990299a8666597f040df40 Mon Sep 17 00:00:00 2001 From: Stepanyuk Vladislav Date: Thu, 16 Jan 2025 14:26:05 +0700 Subject: [PATCH 03/11] issue-2725: correct issues --- .../storage/core/acquire_devices_actor.cpp | 24 ++-- .../storage/core/acquire_release_devices.h | 8 +- .../storage/core/release_devices_actor.cpp | 34 +++--- .../disk_registry/disk_registry_actor.cpp | 4 +- .../disk_registry_actor_acquire_release.cpp | 115 +++++++++++------- .../disk_registry_ut_session.cpp | 3 +- 6 files changed, 108 insertions(+), 80 deletions(-) diff --git a/cloud/blockstore/libs/storage/core/acquire_devices_actor.cpp b/cloud/blockstore/libs/storage/core/acquire_devices_actor.cpp index 88eb853c88..8d11435fb2 100644 --- a/cloud/blockstore/libs/storage/core/acquire_devices_actor.cpp +++ b/cloud/blockstore/libs/storage/core/acquire_devices_actor.cpp @@ -17,7 +17,8 @@ namespace { //////////////////////////////////////////////////////////////////////////////// -class TAcquireDevicesActor final: public TActorBootstrapped +class TAcquireDevicesActor final + : public TActorBootstrapped { private: const TActorId Owner; @@ -123,9 +124,7 @@ TAcquireDevicesActor::TAcquireDevicesActor( , MuteIOErrors(muteIOErrors) , Component(component) { - SortBy(Devices, [] (auto& d) { - return d.GetNodeId(); - }); + SortBy(Devices, [](auto& d) { return d.GetNodeId(); }); } void TAcquireDevicesActor::Bootstrap(const TActorContext& ctx) @@ -147,7 +146,8 @@ void TAcquireDevicesActor::Bootstrap(const TActorContext& ctx) DiskId.c_str(), LogTargets().c_str()); - auto sentRequests = CreateRequests(); + auto sentRequests = + CreateRequests(); SendRequests(ctx, sentRequests); Y_ABORT_UNLESS(SentAcquireRequests.empty()); @@ -206,9 +206,7 @@ void TAcquireDevicesActor::SendRequests( PendingRequests = 0; for (const auto& r: requests) { - auto request = std::make_unique( - TCallContextPtr {}, - r.Record); + auto request = std::make_unique(TCallContextPtr{}, r.Record); LOG_DEBUG( ctx, @@ -355,9 +353,11 @@ STFUNC(TAcquireDevicesActor::StateAcquire) switch (ev->GetTypeRewrite()) { HFunc(TEvents::TEvPoisonPill, HandlePoisonPill); - HFunc(TEvDiskAgent::TEvAcquireDevicesResponse, + HFunc( + TEvDiskAgent::TEvAcquireDevicesResponse, HandleAcquireDevicesResponse); - HFunc(TEvDiskAgent::TEvAcquireDevicesRequest, + HFunc( + TEvDiskAgent::TEvAcquireDevicesRequest, HandleAcquireDevicesUndelivery); HFunc(TEvents::TEvWakeup, HandleWakeup); @@ -372,7 +372,7 @@ STFUNC(TAcquireDevicesActor::StateAcquire) //////////////////////////////////////////////////////////////////////////////// -TActorId AcquireDevices( +TActorId CreateAcquireDevicesActor( const NActors::TActorContext& ctx, const TActorId& owner, TVector devices, @@ -399,4 +399,4 @@ TActorId AcquireDevices( component); } -} // namespace NCloud::NBlockStore::NStorage::NAcquireReleaseDevices \ No newline at end of file +} // namespace NCloud::NBlockStore::NStorage::NAcquireReleaseDevices diff --git a/cloud/blockstore/libs/storage/core/acquire_release_devices.h b/cloud/blockstore/libs/storage/core/acquire_release_devices.h index d9dcdb4021..ab4493dea2 100644 --- a/cloud/blockstore/libs/storage/core/acquire_release_devices.h +++ b/cloud/blockstore/libs/storage/core/acquire_release_devices.h @@ -1,12 +1,10 @@ - #pragma once -#include - #include #include #include +#include namespace NCloud::NBlockStore::NStorage::NAcquireReleaseDevices { @@ -80,7 +78,7 @@ using TEvDevicesAcquireFinished = using TEvDevicesReleaseFinished = TRequestEvent; -TActorId AcquireDevices( +TActorId CreateAcquireDevicesActor( const NActors::TActorContext& ctx, const TActorId& owner, TVector devices, @@ -93,7 +91,7 @@ TActorId AcquireDevices( bool muteIOErrors, NActors::NLog::EComponent component); -TActorId ReleaseDevices( +TActorId CreateReleaseDevicesActor( const NActors::TActorContext& ctx, const TActorId& owner, TString diskId, diff --git a/cloud/blockstore/libs/storage/core/release_devices_actor.cpp b/cloud/blockstore/libs/storage/core/release_devices_actor.cpp index 1f361cd3cb..fbd8445c67 100644 --- a/cloud/blockstore/libs/storage/core/release_devices_actor.cpp +++ b/cloud/blockstore/libs/storage/core/release_devices_actor.cpp @@ -98,7 +98,8 @@ TReleaseDevicesActor::TReleaseDevicesActor( , Component(component) {} -void TReleaseDevicesActor::PrepareRequest(NProto::TReleaseDevicesRequest& request) +void TReleaseDevicesActor::PrepareRequest( + NProto::TReleaseDevicesRequest& request) { request.MutableHeaders()->SetClientId(ClientId); request.SetDiskId(DiskId); @@ -109,9 +110,7 @@ void TReleaseDevicesActor::Bootstrap(const TActorContext& ctx) { Become(&TThis::StateWork); - SortBy(Devices, [] (auto& d) { - return d.GetNodeId(); - }); + SortBy(Devices, [](auto& d) { return d.GetNodeId(); }); auto it = Devices.begin(); while (it != Devices.end()) { @@ -141,7 +140,9 @@ void TReleaseDevicesActor::Bootstrap(const TActorContext& ctx) ctx.Schedule(RequestTimeout, new TEvents::TEvWakeup()); } -void TReleaseDevicesActor::ReplyAndDie(const TActorContext& ctx, NProto::TError error) +void TReleaseDevicesActor::ReplyAndDie( + const TActorContext& ctx, + NProto::TError error) { NCloud::Send( ctx, @@ -208,16 +209,15 @@ void TReleaseDevicesActor::HandleTimeout( Y_UNUSED(ev); const auto err = TStringBuilder() - << "TReleaseDevicesActor timeout." - << " DiskId: " << DiskId - << " ClientId: " << ClientId - << " Targets: " << LogTargets() - << " VolumeGeneration: " << VolumeGeneration - << " PendingRequests: " << PendingRequests; + << "TReleaseDevicesActor timeout." << " DiskId: " << DiskId + << " ClientId: " << ClientId + << " Targets: " << LogTargets() + << " VolumeGeneration: " << VolumeGeneration + << " PendingRequests: " << PendingRequests; LOG_WARN(ctx, Component, err); - ReplyAndDie(ctx, MakeError(E_TIMEOUT, err)); + ReplyAndDie(ctx, MakeError(E_TIMEOUT, std::move(err))); } STFUNC(TReleaseDevicesActor::StateWork) @@ -226,9 +226,11 @@ STFUNC(TReleaseDevicesActor::StateWork) HFunc(TEvents::TEvPoisonPill, HandlePoisonPill); HFunc(TEvents::TEvWakeup, HandleTimeout); - HFunc(TEvDiskAgent::TEvReleaseDevicesResponse, + HFunc( + TEvDiskAgent::TEvReleaseDevicesResponse, HandleReleaseDevicesResponse); - HFunc(TEvDiskAgent::TEvReleaseDevicesRequest, + HFunc( + TEvDiskAgent::TEvReleaseDevicesRequest, HandleReleaseDevicesUndelivery); default: @@ -246,7 +248,7 @@ TString TReleaseDevicesActor::LogTargets() const } // namespace -TActorId ReleaseDevices( +TActorId CreateReleaseDevicesActor( const TActorContext& ctx, const TActorId& owner, TString diskId, @@ -268,4 +270,4 @@ TActorId ReleaseDevices( muteIOErrors, component); } -} // namespace NCloud::NBlockStore::NStorage::NAcquireReleaseDevices +} // namespace NCloud::NBlockStore::NStorage::NAcquireReleaseDevices diff --git a/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor.cpp b/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor.cpp index efd2a4ca6d..c18eb9f275 100644 --- a/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor.cpp +++ b/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor.cpp @@ -189,7 +189,7 @@ void TDiskRegistryActor::BeforeDie(const NActors::TActorContext& ctx) } PendingDiskDeallocationRequests.clear(); - for (auto& [actorId, requestInfo]: PendingAcquireDiskRequests) { + for (auto& [_, requestInfo]: PendingAcquireDiskRequests) { NCloud::Reply( ctx, *requestInfo, @@ -198,7 +198,7 @@ void TDiskRegistryActor::BeforeDie(const NActors::TActorContext& ctx) } PendingAcquireDiskRequests.clear(); - for (auto& [actorId, requestInfo]: PendingReleaseDiskRequests) { + for (auto& [_, requestInfo]: PendingReleaseDiskRequests) { NCloud::Reply( ctx, *requestInfo, diff --git a/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor_acquire_release.cpp b/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor_acquire_release.cpp index 355c21b8f0..6c5f9dc726 100644 --- a/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor_acquire_release.cpp +++ b/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor_acquire_release.cpp @@ -13,6 +13,35 @@ using namespace NActors; using namespace NKikimr::NTabletFlatExecutor; +namespace { + + +TVector ExtractDevicesFromDiskInfo(TDiskInfo &diskInfo) { + TVector devices = std::move(diskInfo.Devices); + + auto devicesToAdd = diskInfo.Migrations.size(); + for (const auto& replica: diskInfo.Replicas) { + devicesToAdd += replica.size(); + } + + devices.reserve(devices.size() + devicesToAdd); + + for (auto& migration: diskInfo.Migrations) { + devices.emplace_back(std::move(*migration.MutableTargetDevice())); + } + for (auto& replica: diskInfo.Replicas) { + devices.insert( + devices.end(), + std::make_move_iterator(replica.begin()), + std::make_move_iterator(replica.end())); + } + + return devices; +} + +} // namespace + + //////////////////////////////////////////////////////////////////////////////// void TDiskRegistryActor::HandleAcquireDisk( @@ -21,6 +50,14 @@ void TDiskRegistryActor::HandleAcquireDisk( { BLOCKSTORE_DISK_REGISTRY_COUNTER(AcquireDisk); + auto replyWithError = [&](auto error) + { + auto response = + std::make_unique( + std::move(error)); + NCloud::Reply(ctx, *ev, std::move(response)); + }; + const auto* msg = ev->Get(); auto clientId = msg->Record.GetHeaders().GetClientId(); @@ -45,34 +82,30 @@ void TDiskRegistryActor::HandleAcquireDisk( if (HasError(error)) { LOG_ERROR( ctx, - TBlockStoreComponents::DISK_REGISTRY_WORKER, + TBlockStoreComponents::DISK_REGISTRY, "[%s] AcquireDisk %s error: %s", clientId.c_str(), diskId.c_str(), FormatError(error).c_str()); - NCloud::Reply( - ctx, - *ev, - std::make_unique( - std::move(error))); + replyWithError(std::move(error)); return; } - State->FilterDevicesAtUnavailableAgents(diskInfo); + if (!State->FilterDevicesAtUnavailableAgents(diskInfo)) { + LOG_DEBUG( + ctx, + TBlockStoreComponents::DISK_REGISTRY, + "AcquireeDisk %s. Nothing to acquire", + diskId.c_str()); - TVector devices = std::move(diskInfo.Devices); - for (auto& migration: diskInfo.Migrations) { - devices.push_back(std::move(*migration.MutableTargetDevice())); - } - for (auto& replica: diskInfo.Replicas) { - devices.insert( - devices.end(), - std::make_move_iterator(replica.begin()), - std::make_move_iterator(replica.end())); + replyWithError(std::move(error)); + return; } - auto actor = NAcquireReleaseDevices::AcquireDevices( + TVector devices = ExtractDevicesFromDiskInfo(diskInfo); + + auto actor = NAcquireReleaseDevices::CreateAcquireDevicesActor( ctx, ctx.SelfID, std::move(devices), @@ -83,7 +116,7 @@ void TDiskRegistryActor::HandleAcquireDisk( msg->Record.GetVolumeGeneration(), Config->GetAgentRequestTimeout(), /*muteIOErrors=*/false, - TBlockStoreComponents::DISK_REGISTRY); + TBlockStoreComponents::DISK_REGISTRY_WORKER); Actors.insert(actor); PendingAcquireDiskRequests[actor] = CreateRequestInfo(ev->Sender, ev->Cookie, msg->CallContext); @@ -95,18 +128,7 @@ void TDiskRegistryActor::HandleDevicesAcquireFinished( { auto* msg = ev->Get(); - State->FinishAcquireDisk(msg->DiskId); - - OnDiskAcquired(std::move(msg->SentRequests)); - - auto reqInfo = PendingAcquireDiskRequests.at(ev->Sender); - - auto response = std::make_unique( - std::move(msg->Error)); - - const auto* disk = State->GetDisk(msg->DiskId); - - if (HasError(response->GetError())) { + if (HasError(msg->Error)) { LOG_ERROR( ctx, TBlockStoreComponents::DISK_REGISTRY_WORKER, @@ -114,8 +136,22 @@ void TDiskRegistryActor::HandleDevicesAcquireFinished( msg->ClientId.c_str(), msg->DiskId.c_str(), LogDevices(msg->Devices).c_str(), - FormatError(response->GetError()).c_str()); - } else { + FormatError(msg->Error).c_str()); + } + + State->FinishAcquireDisk(msg->DiskId); + + OnDiskAcquired(std::move(msg->SentRequests)); + + auto* reqInfo = PendingAcquireDiskRequests.FindPtr(ev->Sender); + if (!reqInfo) { + return; + } + + auto response = std::make_unique( + std::move(msg->Error)); + if (!HasError(response->GetError())) { + const auto* disk = State->GetDisk(msg->DiskId); response->Record.MutableDevices()->Reserve(msg->Devices.size()); for (auto& device: msg->Devices) { @@ -126,7 +162,7 @@ void TDiskRegistryActor::HandleDevicesAcquireFinished( } } - NCloud::Reply(ctx, *reqInfo, std::move(response)); + NCloud::Reply(ctx, **reqInfo, std::move(response)); Actors.erase(ev->Sender); PendingAcquireDiskRequests.erase(ev->Sender); } @@ -197,17 +233,10 @@ void TDiskRegistryActor::HandleReleaseDisk( return; } - TVector devices = std::move(diskInfo.Devices); - for (auto& migration: diskInfo.Migrations) { - devices.push_back(std::move(*migration.MutableTargetDevice())); - } - for (auto& replica: diskInfo.Replicas) { - for (auto& device: replica) { - devices.push_back(std::move(device)); - } - } + TVector devices = + ExtractDevicesFromDiskInfo(diskInfo); - auto actor = NAcquireReleaseDevices::ReleaseDevices( + auto actor = NAcquireReleaseDevices::CreateReleaseDevicesActor( ctx, ctx.SelfID, std::move(diskId), diff --git a/cloud/blockstore/libs/storage/disk_registry/disk_registry_ut_session.cpp b/cloud/blockstore/libs/storage/disk_registry/disk_registry_ut_session.cpp index bc592ea2e9..068171733f 100644 --- a/cloud/blockstore/libs/storage/disk_registry/disk_registry_ut_session.cpp +++ b/cloud/blockstore/libs/storage/disk_registry/disk_registry_ut_session.cpp @@ -507,8 +507,7 @@ Y_UNIT_TEST_SUITE(TDiskRegistryTest) runtime->SetObserverFunc( [&] (TAutoPtr& event) { switch (event->GetTypeRewrite()) { - case NAcquireReleaseDevices::TEvDevicesAcquireFinished:: - EventType: { + case NAcquireReleaseDevices::EvDevicesAcquireFinished: { finished = true; break; } From 4a1b1419278fe87bfc3fcb4ff1fb451cbe2f1434 Mon Sep 17 00:00:00 2001 From: Stepanyuk Vladislav Date: Thu, 16 Jan 2025 14:40:05 +0700 Subject: [PATCH 04/11] issue-2725: correct issues --- .../disk_registry_actor_acquire_release.cpp | 8 +++----- .../storage/disk_registry/disk_registry_state.cpp | 12 ++++-------- .../libs/storage/disk_registry/disk_registry_state.h | 3 +-- 3 files changed, 8 insertions(+), 15 deletions(-) diff --git a/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor_acquire_release.cpp b/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor_acquire_release.cpp index 6c5f9dc726..aae4536b8b 100644 --- a/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor_acquire_release.cpp +++ b/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor_acquire_release.cpp @@ -131,7 +131,7 @@ void TDiskRegistryActor::HandleDevicesAcquireFinished( if (HasError(msg->Error)) { LOG_ERROR( ctx, - TBlockStoreComponents::DISK_REGISTRY_WORKER, + TBlockStoreComponents::DISK_REGISTRY, "[%s] AcquireDisk %s targets %s error: %s", msg->ClientId.c_str(), msg->DiskId.c_str(), @@ -139,8 +139,6 @@ void TDiskRegistryActor::HandleDevicesAcquireFinished( FormatError(msg->Error).c_str()); } - State->FinishAcquireDisk(msg->DiskId); - OnDiskAcquired(std::move(msg->SentRequests)); auto* reqInfo = PendingAcquireDiskRequests.FindPtr(ev->Sender); @@ -150,8 +148,8 @@ void TDiskRegistryActor::HandleDevicesAcquireFinished( auto response = std::make_unique( std::move(msg->Error)); + const auto* disk = State->FinishAcquireDisk(msg->DiskId); if (!HasError(response->GetError())) { - const auto* disk = State->GetDisk(msg->DiskId); response->Record.MutableDevices()->Reserve(msg->Devices.size()); for (auto& device: msg->Devices) { @@ -245,7 +243,7 @@ void TDiskRegistryActor::HandleReleaseDisk( Config->GetAgentRequestTimeout(), std::move(devices), /*muteIOErrors=*/false, - TBlockStoreComponents::DISK_REGISTRY); + TBlockStoreComponents::DISK_REGISTRY_WORKER); Actors.insert(actor); PendingReleaseDiskRequests[actor] = diff --git a/cloud/blockstore/libs/storage/disk_registry/disk_registry_state.cpp b/cloud/blockstore/libs/storage/disk_registry/disk_registry_state.cpp index d581a013bd..76aa6c7d2e 100644 --- a/cloud/blockstore/libs/storage/disk_registry/disk_registry_state.cpp +++ b/cloud/blockstore/libs/storage/disk_registry/disk_registry_state.cpp @@ -3363,12 +3363,6 @@ NProto::EDiskState TDiskRegistryState::GetDiskState(const TDiskId& diskId) const return disk->State; } -const TDiskRegistryState::TDiskState* TDiskRegistryState::GetDisk( - const TDiskId& diskId) const -{ - return Disks.FindPtr(diskId); -} - NProto::TError TDiskRegistryState::GetShadowDiskId( const TDiskId& sourceDiskId, const TCheckpointId& checkpointId, @@ -3474,17 +3468,19 @@ NProto::TError TDiskRegistryState::StartAcquireDisk( return {}; } -void TDiskRegistryState::FinishAcquireDisk(const TString& diskId) +const TDiskRegistryState::TDiskState* TDiskRegistryState::FinishAcquireDisk( + const TString& diskId) { auto it = Disks.find(diskId); if (it == Disks.end()) { - return; + return nullptr; } auto& disk = it->second; disk.AcquireInProgress = false; + return &disk; } bool TDiskRegistryState::IsAcquireInProgress(const TString& diskId) const diff --git a/cloud/blockstore/libs/storage/disk_registry/disk_registry_state.h b/cloud/blockstore/libs/storage/disk_registry/disk_registry_state.h index 99c65de006..435b00b521 100644 --- a/cloud/blockstore/libs/storage/disk_registry/disk_registry_state.h +++ b/cloud/blockstore/libs/storage/disk_registry/disk_registry_state.h @@ -447,7 +447,6 @@ class TDiskRegistryState NProto::TError GetDiskInfo(const TDiskId& diskId, TDiskInfo& diskInfo) const; NProto::EDiskState GetDiskState(const TDiskId& diskId) const; - const TDiskState* GetDisk(const TDiskId& diskId) const; NProto::TError GetShadowDiskId( const TDiskId& sourceDiskId, const TCheckpointId& checkpointId, @@ -465,7 +464,7 @@ class TDiskRegistryState bool HasPendingCleanup(const TDiskId& diskId) const; - void FinishAcquireDisk(const TString& diskId); + const TDiskState* FinishAcquireDisk(const TString& diskId); bool IsAcquireInProgress(const TString& diskId) const; From fbeefebcf888957d4b2019473500cbcaff741743 Mon Sep 17 00:00:00 2001 From: Stepanyuk Vladislav Date: Thu, 16 Jan 2025 14:43:18 +0700 Subject: [PATCH 05/11] issue-2725: find() -> FindPtr() --- .../libs/storage/disk_registry/disk_registry_state.cpp | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/cloud/blockstore/libs/storage/disk_registry/disk_registry_state.cpp b/cloud/blockstore/libs/storage/disk_registry/disk_registry_state.cpp index 76aa6c7d2e..b18965c0dd 100644 --- a/cloud/blockstore/libs/storage/disk_registry/disk_registry_state.cpp +++ b/cloud/blockstore/libs/storage/disk_registry/disk_registry_state.cpp @@ -3471,16 +3471,14 @@ NProto::TError TDiskRegistryState::StartAcquireDisk( const TDiskRegistryState::TDiskState* TDiskRegistryState::FinishAcquireDisk( const TString& diskId) { - auto it = Disks.find(diskId); + auto* diskPtr = Disks.FindPtr(diskId); - if (it == Disks.end()) { + if (!diskPtr) { return nullptr; } - auto& disk = it->second; - - disk.AcquireInProgress = false; - return &disk; + diskPtr->AcquireInProgress = false; + return diskPtr; } bool TDiskRegistryState::IsAcquireInProgress(const TString& diskId) const From 010bdce3aa67c5ed058a9660bb4c6cf8acb0f01d Mon Sep 17 00:00:00 2001 From: Stepanyuk Vladislav Date: Thu, 16 Jan 2025 17:41:12 +0700 Subject: [PATCH 06/11] issue-2725: refactor file and fix responce --- .../disk_registry_actor_acquire_release.cpp | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor_acquire_release.cpp b/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor_acquire_release.cpp index aae4536b8b..6a7cb7467d 100644 --- a/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor_acquire_release.cpp +++ b/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor_acquire_release.cpp @@ -15,16 +15,12 @@ using namespace NKikimr::NTabletFlatExecutor; namespace { - -TVector ExtractDevicesFromDiskInfo(TDiskInfo &diskInfo) { +TVector ExtractDevicesFromDiskInfo(TDiskInfo& diskInfo) +{ TVector devices = std::move(diskInfo.Devices); - - auto devicesToAdd = diskInfo.Migrations.size(); - for (const auto& replica: diskInfo.Replicas) { - devicesToAdd += replica.size(); - } - - devices.reserve(devices.size() + devicesToAdd); + devices.reserve( + devices.size() * (diskInfo.Replicas.size() + 1) + + diskInfo.Migrations.size()); for (auto& migration: diskInfo.Migrations) { devices.emplace_back(std::move(*migration.MutableTargetDevice())); @@ -39,8 +35,7 @@ TVector ExtractDevicesFromDiskInfo(TDiskInfo &diskInfo) { return devices; } -} // namespace - +} // namespace //////////////////////////////////////////////////////////////////////////////// @@ -53,7 +48,7 @@ void TDiskRegistryActor::HandleAcquireDisk( auto replyWithError = [&](auto error) { auto response = - std::make_unique( + std::make_unique( std::move(error)); NCloud::Reply(ctx, *ev, std::move(response)); }; From bfa8c07944a8fba8ef1257222f18511f21ecc718 Mon Sep 17 00:00:00 2001 From: Stepanyuk Vladislav Date: Fri, 17 Jan 2025 17:37:12 +0700 Subject: [PATCH 07/11] issue-2725: correct issues --- .../storage/core/acquire_devices_actor.cpp | 59 +-- ...ces.h => acquire_release_devices_actors.h} | 29 +- .../storage/core/release_devices_actor.cpp | 49 +-- .../disk_registry/disk_registry_actor.cpp | 26 -- .../disk_registry/disk_registry_actor.h | 12 +- .../disk_registry_actor_acquire_release.cpp | 396 ++++++++++++++---- .../disk_registry/disk_registry_private.h | 54 ++- .../disk_registry/disk_registry_state.cpp | 6 +- .../disk_registry/disk_registry_state.h | 2 +- 9 files changed, 423 insertions(+), 210 deletions(-) rename cloud/blockstore/libs/storage/core/{acquire_release_devices.h => acquire_release_devices_actors.h} (82%) diff --git a/cloud/blockstore/libs/storage/core/acquire_devices_actor.cpp b/cloud/blockstore/libs/storage/core/acquire_devices_actor.cpp index 8d11435fb2..f267dcbb3c 100644 --- a/cloud/blockstore/libs/storage/core/acquire_devices_actor.cpp +++ b/cloud/blockstore/libs/storage/core/acquire_devices_actor.cpp @@ -1,4 +1,4 @@ -#include "acquire_release_devices.h" +#include "acquire_release_devices_actors.h" #include #include @@ -11,6 +11,9 @@ #include namespace NCloud::NBlockStore::NStorage::NAcquireReleaseDevices { + +//////////////////////////////////////////////////////////////////////////////// + using namespace NActors; namespace { @@ -39,14 +42,7 @@ class TAcquireDevicesActor final public: TAcquireDevicesActor( const TActorId& owner, - TVector devices, - TString diskId, - TString clientId, - NProto::EVolumeAccessMode accessMode, - ui64 mountSeqNumber, - ui32 volumeGeneration, - TDuration requestTimeout, - bool muteIOErrors, + TAcquireReleaseDevicesInfo acquireDevicesInfo, NLog::EComponent component); void Bootstrap(const TActorContext& ctx); @@ -104,24 +100,17 @@ class TAcquireDevicesActor final TAcquireDevicesActor::TAcquireDevicesActor( const TActorId& owner, - TVector devices, - TString diskId, - TString clientId, - NProto::EVolumeAccessMode accessMode, - ui64 mountSeqNumber, - ui32 volumeGeneration, - TDuration requestTimeout, - bool muteIOErrors, + TAcquireReleaseDevicesInfo acquireDevicesInfo, NLog::EComponent component) : Owner(owner) - , Devices(std::move(devices)) - , DiskId(std::move(diskId)) - , ClientId(std::move(clientId)) - , AccessMode(accessMode) - , MountSeqNumber(mountSeqNumber) - , VolumeGeneration(volumeGeneration) - , RequestTimeout(requestTimeout) - , MuteIOErrors(muteIOErrors) + , Devices(std::move(acquireDevicesInfo.Devices)) + , DiskId(std::move(acquireDevicesInfo.DiskId)) + , ClientId(std::move(acquireDevicesInfo.ClientId)) + , AccessMode(acquireDevicesInfo.AccessMode.value()) + , MountSeqNumber(acquireDevicesInfo.MountSeqNumber.value()) + , VolumeGeneration(acquireDevicesInfo.VolumeGeneration) + , RequestTimeout(acquireDevicesInfo.RequestTimeout) + , MuteIOErrors(acquireDevicesInfo.MuteIOErrors) , Component(component) { SortBy(Devices, [](auto& d) { return d.GetNodeId(); }); @@ -375,27 +364,13 @@ STFUNC(TAcquireDevicesActor::StateAcquire) TActorId CreateAcquireDevicesActor( const NActors::TActorContext& ctx, const TActorId& owner, - TVector devices, - TString diskId, - TString clientId, - NProto::EVolumeAccessMode accessMode, - ui64 mountSeqNumber, - ui32 volumeGeneration, - TDuration requestTimeout, - bool muteIOErrors, - NLog::EComponent component) + TAcquireReleaseDevicesInfo acquireDevicesInfo, + NActors::NLog::EComponent component) { return NCloud::Register( ctx, owner, - std::move(devices), - diskId, - std::move(clientId), - accessMode, - mountSeqNumber, - volumeGeneration, - requestTimeout, - muteIOErrors, + std::move(acquireDevicesInfo), component); } diff --git a/cloud/blockstore/libs/storage/core/acquire_release_devices.h b/cloud/blockstore/libs/storage/core/acquire_release_devices_actors.h similarity index 82% rename from cloud/blockstore/libs/storage/core/acquire_release_devices.h rename to cloud/blockstore/libs/storage/core/acquire_release_devices_actors.h index ab4493dea2..582c87b78a 100644 --- a/cloud/blockstore/libs/storage/core/acquire_release_devices.h +++ b/cloud/blockstore/libs/storage/core/acquire_release_devices_actors.h @@ -78,28 +78,29 @@ using TEvDevicesAcquireFinished = using TEvDevicesReleaseFinished = TRequestEvent; +struct TAcquireReleaseDevicesInfo +{ + TVector Devices; + TString DiskId; + TString ClientId; + std::optional + AccessMode; // Only AcquireDevicesActor need it. + std::optional MountSeqNumber; // Only AcquireDevicesActor need it. + ui32 VolumeGeneration; + TDuration RequestTimeout; + bool MuteIOErrors; +}; + TActorId CreateAcquireDevicesActor( const NActors::TActorContext& ctx, const TActorId& owner, - TVector devices, - TString diskId, - TString clientId, - NProto::EVolumeAccessMode accessMode, - ui64 mountSeqNumber, - ui32 volumeGeneration, - TDuration requestTimeout, - bool muteIOErrors, + TAcquireReleaseDevicesInfo acquireDevicesInfo, NActors::NLog::EComponent component); TActorId CreateReleaseDevicesActor( const NActors::TActorContext& ctx, const TActorId& owner, - TString diskId, - TString clientId, - ui32 volumeGeneration, - TDuration requestTimeout, - TVector devices, - bool muteIOErrors, + TAcquireReleaseDevicesInfo releaseDevicesInfo, NActors::NLog::EComponent component); } // namespace NCloud::NBlockStore::NStorage::NAcquireReleaseDevices diff --git a/cloud/blockstore/libs/storage/core/release_devices_actor.cpp b/cloud/blockstore/libs/storage/core/release_devices_actor.cpp index fbd8445c67..de97e1f539 100644 --- a/cloud/blockstore/libs/storage/core/release_devices_actor.cpp +++ b/cloud/blockstore/libs/storage/core/release_devices_actor.cpp @@ -1,4 +1,4 @@ -#include "acquire_release_devices.h" +#include "acquire_release_devices_actors.h" #include #include @@ -10,6 +10,9 @@ #include namespace NCloud::NBlockStore::NStorage::NAcquireReleaseDevices { + +//////////////////////////////////////////////////////////////////////////////// + using namespace NActors; namespace { @@ -21,11 +24,11 @@ class TReleaseDevicesActor final { private: const TActorId Owner; + TVector Devices; const TString DiskId; const TString ClientId; const ui32 VolumeGeneration; const TDuration RequestTimeout; - TVector Devices; bool MuteIOErrors; NLog::EComponent Component; @@ -36,12 +39,7 @@ class TReleaseDevicesActor final public: TReleaseDevicesActor( const TActorId& owner, - TString diskId, - TString clientId, - ui32 volumeGeneration, - TDuration requestTimeout, - TVector devices, - bool muteIoErrors, + TAcquireReleaseDevicesInfo releaseDevicesInfo, NLog::EComponent component); void Bootstrap(const TActorContext& ctx); @@ -81,20 +79,15 @@ class TReleaseDevicesActor final TReleaseDevicesActor::TReleaseDevicesActor( const TActorId& owner, - TString diskId, - TString clientId, - ui32 volumeGeneration, - TDuration requestTimeout, - TVector devices, - bool muteIOErrors, + TAcquireReleaseDevicesInfo releaseDevicesInfo, NLog::EComponent component) : Owner(owner) - , DiskId(std::move(diskId)) - , ClientId(std::move(clientId)) - , VolumeGeneration(volumeGeneration) - , RequestTimeout(requestTimeout) - , Devices(std::move(devices)) - , MuteIOErrors(muteIOErrors) + , Devices(std::move(releaseDevicesInfo.Devices)) + , DiskId(std::move(releaseDevicesInfo.DiskId)) + , ClientId(std::move(releaseDevicesInfo.ClientId)) + , VolumeGeneration(releaseDevicesInfo.VolumeGeneration) + , RequestTimeout(releaseDevicesInfo.RequestTimeout) + , MuteIOErrors(releaseDevicesInfo.MuteIOErrors) , Component(component) {} @@ -249,25 +242,15 @@ TString TReleaseDevicesActor::LogTargets() const } // namespace TActorId CreateReleaseDevicesActor( - const TActorContext& ctx, + const NActors::TActorContext& ctx, const TActorId& owner, - TString diskId, - TString clientId, - ui32 volumeGeneration, - TDuration requestTimeout, - TVector devices, - bool muteIOErrors, + TAcquireReleaseDevicesInfo releaseDevicesInfo, NActors::NLog::EComponent component) { return NCloud::Register( ctx, owner, - std::move(diskId), - std::move(clientId), - volumeGeneration, - requestTimeout, - std::move(devices), - muteIOErrors, + releaseDevicesInfo, component); } } // namespace NCloud::NBlockStore::NStorage::NAcquireReleaseDevices diff --git a/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor.cpp b/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor.cpp index c18eb9f275..e80bdafa9a 100644 --- a/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor.cpp +++ b/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor.cpp @@ -188,24 +188,6 @@ void TDiskRegistryActor::BeforeDie(const NActors::TActorContext& ctx) MakeTabletIsDeadError(E_REJECTED, __LOCATION__)); } PendingDiskDeallocationRequests.clear(); - - for (auto& [_, requestInfo]: PendingAcquireDiskRequests) { - NCloud::Reply( - ctx, - *requestInfo, - std::make_unique( - MakeTabletIsDeadError(E_REJECTED, __LOCATION__))); - } - PendingAcquireDiskRequests.clear(); - - for (auto& [_, requestInfo]: PendingReleaseDiskRequests) { - NCloud::Reply( - ctx, - *requestInfo, - std::make_unique( - MakeTabletIsDeadError(E_REJECTED, __LOCATION__))); - } - PendingReleaseDiskRequests.clear(); } void TDiskRegistryActor::OnDetach(const TActorContext& ctx) @@ -726,14 +708,6 @@ STFUNC(TDiskRegistryActor::StateWork) TEvDiskRegistryPrivate::TEvDiskRegistryAgentListExpiredParamsCleanup, TDiskRegistryActor::HandleDiskRegistryAgentListExpiredParamsCleanup); - HFunc( - NAcquireReleaseDevices::TEvDevicesAcquireFinished, - HandleDevicesAcquireFinished); - - HFunc( - NAcquireReleaseDevices::TEvDevicesReleaseFinished, - HandleDevicesReleaseFinished); - default: if (!HandleRequests(ev) && !HandleDefaultEvents(ev, SelfId())) { HandleUnexpectedEvent(ev, TBlockStoreComponents::DISK_REGISTRY); diff --git a/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor.h b/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor.h index b3739a29a0..56a67c0493 100644 --- a/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor.h +++ b/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor.h @@ -15,7 +15,7 @@ #include #include #include -#include +#include #include #include #include @@ -86,9 +86,6 @@ class TDiskRegistryActor final THashMap> PendingDiskDeallocationRequests; - THashMap PendingAcquireDiskRequests; - THashMap PendingReleaseDiskRequests; - bool BrokenDisksDestructionInProgress = false; bool DisksNotificationInProgress = false; bool UsersNotificationInProgress = false; @@ -246,16 +243,9 @@ class TDiskRegistryActor final NProto::TError error); void ProcessAutomaticallyReplacedDevices(const NActors::TActorContext& ctx); - - void HandleDevicesAcquireFinished( - const NAcquireReleaseDevices::TEvDevicesAcquireFinished::TPtr& ev, - const NActors::TActorContext& ctx); void OnDiskAcquired( TVector sentAcquireRequests); - void HandleDevicesReleaseFinished( - const NAcquireReleaseDevices::TEvDevicesReleaseFinished::TPtr& ev, - const NActors::TActorContext& ctx); void OnDiskReleased( const TVector< NAcquireReleaseDevices::TAgentReleaseDevicesCachedRequest>& diff --git a/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor_acquire_release.cpp b/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor_acquire_release.cpp index 6a7cb7467d..4a2e158e3c 100644 --- a/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor_acquire_release.cpp +++ b/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor_acquire_release.cpp @@ -7,6 +7,8 @@ #include #include +#include + namespace NCloud::NBlockStore::NStorage { using namespace NActors; @@ -15,6 +17,8 @@ using namespace NKikimr::NTabletFlatExecutor; namespace { +//////////////////////////////////////////////////////////////////////////////// + TVector ExtractDevicesFromDiskInfo(TDiskInfo& diskInfo) { TVector devices = std::move(diskInfo.Devices); @@ -35,6 +39,269 @@ TVector ExtractDevicesFromDiskInfo(TDiskInfo& diskInfo) return devices; } +//////////////////////////////////////////////////////////////////////////////// + +class TAcquireReleaseDiskProxyActor final + : public TActorBootstrapped +{ +public: + enum EOperationType { + ACQUIRE_DISK, + RELEASE_DISK, + }; + +private: + const TActorId Owner; + + NAcquireReleaseDevices::TAcquireReleaseDevicesInfo AcquireReleaseInfo; + + const ui32 LogicalBlockSize; + + TRequestInfoPtr RequestInfo; + + EOperationType OperationType; + + std::optional WorkerId; + + std::optional> + OperationFinishedResponce; + + +public: + TAcquireReleaseDiskProxyActor( + const TActorId& owner, + NAcquireReleaseDevices::TAcquireReleaseDevicesInfo acquireReleaseInfo, + ui32 logicalBlockSize, + TRequestInfoPtr requestInfo, + EOperationType operationType); + + void Bootstrap(const TActorContext& ctx); + +private: + template + void SendOperationFinishedToOwner( + const TActorContext& ctx, + const TEventType& ev) + { + auto* msg = ev->Get(); + + WorkerId = std::nullopt; + + OperationFinishedResponce = *msg; + auto request = std::make_unique( + msg->DiskId, + msg->ClientId, + msg->SentRequests); + NCloud::Send(ctx, Owner, std::move(request)); + } + + void ReplyAndDie(const TActorContext& ctx, NProto::TError error); + + void ReplyAndDieAcquire(const TActorContext& ctx, NProto::TError error); + + void ReplyAndDieRelease(const TActorContext& ctx, NProto::TError error); + +private: + STFUNC(StateWork); + + void HandlePoisonPill( + const TEvents::TEvPoisonPill::TPtr& ev, + const TActorContext& ctx); + + void HandleDevicesAcquireFinished( + const NAcquireReleaseDevices::TEvDevicesAcquireFinished::TPtr& ev, + const NActors::TActorContext& ctx); + + void HandleFinishAcquireDiskResponse( + const TEvDiskRegistryPrivate::TEvFinishAcquireDiskResponse::TPtr& ev, + const TActorContext& ctx); + + void HandleDevicesReleaseFinished( + const NAcquireReleaseDevices::TEvDevicesReleaseFinished::TPtr& ev, + const NActors::TActorContext& ctx); + + void HandleRemoveDiskSessionResponse( + const TEvDiskRegistryPrivate::TEvRemoveDiskSessionResponse::TPtr& ev, + const TActorContext& ctx); +}; + +//////////////////////////////////////////////////////////////////////////////// + +TAcquireReleaseDiskProxyActor::TAcquireReleaseDiskProxyActor( + const TActorId& owner, + NAcquireReleaseDevices::TAcquireReleaseDevicesInfo acquireReleaseInfo, + ui32 logicalBlockSize, + TRequestInfoPtr requestInfo, + EOperationType operationType) + : Owner(owner) + , AcquireReleaseInfo(std::move(acquireReleaseInfo)) + , LogicalBlockSize(logicalBlockSize) + , RequestInfo(std::move(requestInfo)) + , OperationType(operationType) +{ +} + +void TAcquireReleaseDiskProxyActor::Bootstrap(const TActorContext& ctx) +{ + Become(&TThis::StateWork); + + WorkerId = [&]() + { + switch (OperationType) { + case ACQUIRE_DISK: + return NAcquireReleaseDevices::CreateAcquireDevicesActor( + ctx, + ctx.SelfID, + std::move(AcquireReleaseInfo), + TBlockStoreComponents::DISK_REGISTRY_WORKER); + case RELEASE_DISK: + return NAcquireReleaseDevices::CreateReleaseDevicesActor( + ctx, + ctx.SelfID, + std::move(AcquireReleaseInfo), + TBlockStoreComponents::DISK_REGISTRY_WORKER); + } + }(); +} + +void TAcquireReleaseDiskProxyActor::ReplyAndDie( + const TActorContext& ctx, + NProto::TError error) +{ + switch (OperationType) { + case ACQUIRE_DISK: + ReplyAndDieAcquire(ctx, std::move(error)); + return; + case RELEASE_DISK: + ReplyAndDieRelease(ctx, std::move(error)); + return; + } +} + +void TAcquireReleaseDiskProxyActor::ReplyAndDieAcquire( + const TActorContext& ctx, + NProto::TError error) +{ + auto* msg = + OperationFinishedResponce.has_value() + ? &std::get( + OperationFinishedResponce.value()) + : nullptr; + + auto response = std::make_unique( + !HasError(error) && msg ? std::move(msg->Error) : std::move(error)); + + if (!HasError(response->GetError()) && msg) { + response->Record.MutableDevices()->Reserve(msg->Devices.size()); + + for (auto& device: msg->Devices) { + ToLogicalBlocks(device, LogicalBlockSize); + *response->Record.AddDevices() = std::move(device); + } + } + + NCloud::Reply(ctx, *RequestInfo, std::move(response)); + NCloud::Send( + ctx, + Owner, + std::make_unique()); + Die(ctx); +} + +void TAcquireReleaseDiskProxyActor::ReplyAndDieRelease( + const TActorContext& ctx, + NProto::TError error) +{ + auto response = std::make_unique( + std::move(error)); + NCloud::Reply(ctx, *RequestInfo, std::move(response)); + + NCloud::Send( + ctx, + Owner, + std::make_unique()); + Die(ctx); +} + +STFUNC(TAcquireReleaseDiskProxyActor::StateWork) +{ + switch (ev->GetTypeRewrite()) { + HFunc(TEvents::TEvPoisonPill, HandlePoisonPill); + + HFunc( + NAcquireReleaseDevices::TEvDevicesAcquireFinished, + HandleDevicesAcquireFinished); + HFunc( + TEvDiskRegistryPrivate::TEvFinishAcquireDiskResponse, + HandleFinishAcquireDiskResponse); + + HFunc( + NAcquireReleaseDevices::TEvDevicesReleaseFinished, + HandleDevicesReleaseFinished); + HFunc( + TEvDiskRegistryPrivate::TEvRemoveDiskSessionResponse, + HandleRemoveDiskSessionResponse); + + default: + HandleUnexpectedEvent(ev, TBlockStoreComponents::DISK_REGISTRY_WORKER); + break; + } +} + +void TAcquireReleaseDiskProxyActor::HandlePoisonPill( + const TEvents::TEvPoisonPill::TPtr& ev, + const TActorContext& ctx) +{ + Y_UNUSED(ev); + + if (WorkerId) { + NCloud::Send( + ctx, + WorkerId.value(), + std::make_unique()); + } + + ReplyAndDie(ctx, MakeTabletIsDeadError(E_REJECTED, __LOCATION__)); +} + +void TAcquireReleaseDiskProxyActor::HandleDevicesAcquireFinished( + const NAcquireReleaseDevices::TEvDevicesAcquireFinished::TPtr& ev, + const TActorContext& ctx) +{ + Y_ABORT_UNLESS(OperationType == ACQUIRE_DISK); + SendOperationFinishedToOwner< + TEvDiskRegistryPrivate::TEvFinishAcquireDiskRequest>(ctx, ev); +} + +void TAcquireReleaseDiskProxyActor::HandleFinishAcquireDiskResponse( + const TEvDiskRegistryPrivate::TEvFinishAcquireDiskResponse::TPtr& ev, + const TActorContext& ctx) +{ + Y_ABORT_UNLESS(OperationType == ACQUIRE_DISK); + Y_UNUSED(ev); + + ReplyAndDie(ctx, {}); +} + +void TAcquireReleaseDiskProxyActor::HandleDevicesReleaseFinished( + const NAcquireReleaseDevices::TEvDevicesReleaseFinished::TPtr& ev, + const NActors::TActorContext& ctx) +{ + Y_ABORT_UNLESS(OperationType == RELEASE_DISK); + SendOperationFinishedToOwner< + TEvDiskRegistryPrivate::TEvRemoveDiskSessionRequest>(ctx, ev); +} + +void TAcquireReleaseDiskProxyActor::HandleRemoveDiskSessionResponse( + const TEvDiskRegistryPrivate::TEvRemoveDiskSessionResponse::TPtr& ev, + const TActorContext& ctx) +{ + Y_ABORT_UNLESS(OperationType == RELEASE_DISK); + ReplyAndDie(ctx, ev->Get()->GetError()); +} + } // namespace //////////////////////////////////////////////////////////////////////////////// @@ -72,9 +339,8 @@ void TDiskRegistryActor::HandleAcquireDisk( msg->Record.GetVolumeGeneration()); TDiskInfo diskInfo; - auto error = State->StartAcquireDisk(diskId, diskInfo); - - if (HasError(error)) { + if (auto error = State->StartAcquireDisk(diskId, diskInfo); HasError(error)) + { LOG_ERROR( ctx, TBlockStoreComponents::DISK_REGISTRY, @@ -94,70 +360,43 @@ void TDiskRegistryActor::HandleAcquireDisk( "AcquireeDisk %s. Nothing to acquire", diskId.c_str()); - replyWithError(std::move(error)); + replyWithError(MakeError(S_ALREADY, {})); return; } TVector devices = ExtractDevicesFromDiskInfo(diskInfo); - - auto actor = NAcquireReleaseDevices::CreateAcquireDevicesActor( + auto actor = NCloud::Register( ctx, ctx.SelfID, - std::move(devices), - std::move(diskId), - std::move(clientId), - msg->Record.GetAccessMode(), - msg->Record.GetMountSeqNumber(), - msg->Record.GetVolumeGeneration(), - Config->GetAgentRequestTimeout(), - /*muteIOErrors=*/false, - TBlockStoreComponents::DISK_REGISTRY_WORKER); + NAcquireReleaseDevices::TAcquireReleaseDevicesInfo{ + .Devices = std::move(devices), + .DiskId = std::move(diskId), + .ClientId = std::move(clientId), + .AccessMode = msg->Record.GetAccessMode(), + .MountSeqNumber = msg->Record.GetMountSeqNumber(), + .VolumeGeneration = msg->Record.GetVolumeGeneration(), + .RequestTimeout = Config->GetAgentRequestTimeout(), + .MuteIOErrors = false, + }, + diskInfo.LogicalBlockSize, + TBlockStoreComponents::DISK_REGISTRY_WORKER, + TAcquireReleaseDiskProxyActor::ACQUIRE_DISK); Actors.insert(actor); - PendingAcquireDiskRequests[actor] = - CreateRequestInfo(ev->Sender, ev->Cookie, msg->CallContext); } -void TDiskRegistryActor::HandleDevicesAcquireFinished( - const NAcquireReleaseDevices::TEvDevicesAcquireFinished::TPtr& ev, - const NActors::TActorContext& ctx) +void TDiskRegistryActor::HandleFinishAcquireDisk( + const TEvDiskRegistryPrivate::TEvFinishAcquireDiskRequest::TPtr& ev, + const TActorContext& ctx) { auto* msg = ev->Get(); - if (HasError(msg->Error)) { - LOG_ERROR( - ctx, - TBlockStoreComponents::DISK_REGISTRY, - "[%s] AcquireDisk %s targets %s error: %s", - msg->ClientId.c_str(), - msg->DiskId.c_str(), - LogDevices(msg->Devices).c_str(), - FormatError(msg->Error).c_str()); - } + State->FinishAcquireDisk(msg->DiskId); OnDiskAcquired(std::move(msg->SentRequests)); - auto* reqInfo = PendingAcquireDiskRequests.FindPtr(ev->Sender); - if (!reqInfo) { - return; - } - - auto response = std::make_unique( - std::move(msg->Error)); - const auto* disk = State->FinishAcquireDisk(msg->DiskId); - if (!HasError(response->GetError())) { - response->Record.MutableDevices()->Reserve(msg->Devices.size()); - - for (auto& device: msg->Devices) { - if (disk) { - ToLogicalBlocks(device, disk->LogicalBlockSize); - } - *response->Record.AddDevices() = std::move(device); - } - } - - NCloud::Reply(ctx, **reqInfo, std::move(response)); - Actors.erase(ev->Sender); - PendingAcquireDiskRequests.erase(ev->Sender); + auto response = std::make_unique< + TEvDiskRegistryPrivate::TEvFinishAcquireDiskResponse>(); + NCloud::Reply(ctx, *ev, std::move(response)); } //////////////////////////////////////////////////////////////////////////////// @@ -202,8 +441,10 @@ void TDiskRegistryActor::HandleReleaseDisk( } TDiskInfo diskInfo; - const auto error = State->GetDiskInfo(diskId, diskInfo); - if (HasError(error)) { + + if (const auto error = State->GetDiskInfo(diskId, diskInfo); + HasError(error)) + { LOG_ERROR( ctx, TBlockStoreComponents::DISK_REGISTRY, @@ -226,42 +467,41 @@ void TDiskRegistryActor::HandleReleaseDisk( return; } - TVector devices = - ExtractDevicesFromDiskInfo(diskInfo); - - auto actor = NAcquireReleaseDevices::CreateReleaseDevicesActor( + auto actor = NCloud::Register( ctx, ctx.SelfID, - std::move(diskId), - std::move(clientId), - volumeGeneration, - Config->GetAgentRequestTimeout(), - std::move(devices), - /*muteIOErrors=*/false, - TBlockStoreComponents::DISK_REGISTRY_WORKER); - + NAcquireReleaseDevices::TAcquireReleaseDevicesInfo{ + .Devices = ExtractDevicesFromDiskInfo(diskInfo), + .DiskId = std::move(diskId), + .ClientId = std::move(clientId), + .AccessMode = std::nullopt, + .MountSeqNumber = std::nullopt, + .VolumeGeneration = msg->Record.GetVolumeGeneration(), + .RequestTimeout = Config->GetAgentRequestTimeout(), + .MuteIOErrors = false, + }, + TBlockStoreComponents::DISK_REGISTRY_WORKER, + TAcquireReleaseDiskProxyActor::RELEASE_DISK); Actors.insert(actor); - PendingReleaseDiskRequests[actor] = - CreateRequestInfo(ev->Sender, ev->Cookie, msg->CallContext); } -void TDiskRegistryActor::HandleDevicesReleaseFinished( - const NAcquireReleaseDevices::TEvDevicesReleaseFinished::TPtr& ev, - const NActors::TActorContext& ctx) +void TDiskRegistryActor::HandleRemoveDiskSession( + const TEvDiskRegistryPrivate::TEvRemoveDiskSessionRequest::TPtr& ev, + const TActorContext& ctx) { const auto* msg = ev->Get(); OnDiskReleased(msg->SentRequests); - State->FinishAcquireDisk(msg->DiskId); - auto reqInfo = PendingReleaseDiskRequests.at(ev->Sender); + auto requestInfo = CreateRequestInfo( + ev->Sender, + ev->Cookie, + msg->CallContext); + State->FinishAcquireDisk(msg->DiskId); auto response = - std::make_unique(msg->Error); - NCloud::Reply(ctx, *reqInfo, std::move(response)); - - Actors.erase(ev->Sender); - PendingReleaseDiskRequests.erase(ev->Sender); + std::make_unique(); + NCloud::Reply(ctx, *ev, std::move(response)); } } // namespace NCloud::NBlockStore::NStorage diff --git a/cloud/blockstore/libs/storage/disk_registry/disk_registry_private.h b/cloud/blockstore/libs/storage/disk_registry/disk_registry_private.h index be27a49619..f91bee7b1a 100644 --- a/cloud/blockstore/libs/storage/disk_registry/disk_registry_private.h +++ b/cloud/blockstore/libs/storage/disk_registry/disk_registry_private.h @@ -5,7 +5,7 @@ #include #include #include -#include +#include #include #include @@ -168,6 +168,8 @@ using TVolumeConfig = NKikimrBlockStore::TVolumeConfig; xxx(CleanupDisks, __VA_ARGS__) \ xxx(SecureErase, __VA_ARGS__) \ xxx(CleanupDevices, __VA_ARGS__) \ + xxx(FinishAcquireDisk, __VA_ARGS__) \ + xxx(RemoveDiskSession, __VA_ARGS__) \ xxx(DestroyBrokenDisks, __VA_ARGS__) \ xxx(ListBrokenDisks, __VA_ARGS__) \ xxx(NotifyDisks, __VA_ARGS__) \ @@ -191,6 +193,56 @@ using TVolumeConfig = NKikimrBlockStore::TVolumeConfig; struct TEvDiskRegistryPrivate { + // + // FinishAcquireDisk + // + + struct TFinishAcquireDiskRequest + { + TString DiskId; + TString ClientId; + TVector + SentRequests; + + TFinishAcquireDiskRequest( + TString diskId, + TString clientId, + TVector + sentRequests) + : DiskId(std::move(diskId)) + , ClientId(std::move(clientId)) + , SentRequests(std::move(sentRequests)) + {} + }; + + struct TFinishAcquireDiskResponse + {}; + + // + // RemoveDiskSession + // + + struct TRemoveDiskSessionRequest + { + TString DiskId; + TString ClientId; + TVector + SentRequests; + + TRemoveDiskSessionRequest( + TString diskId, + TString clientId, + TVector + sentRequests) + : DiskId(std::move(diskId)) + , ClientId(std::move(clientId)) + , SentRequests(std::move(sentRequests)) + {} + }; + + struct TRemoveDiskSessionResponse + {}; + // // CleanupDisks // diff --git a/cloud/blockstore/libs/storage/disk_registry/disk_registry_state.cpp b/cloud/blockstore/libs/storage/disk_registry/disk_registry_state.cpp index b18965c0dd..7c2fc1d3d7 100644 --- a/cloud/blockstore/libs/storage/disk_registry/disk_registry_state.cpp +++ b/cloud/blockstore/libs/storage/disk_registry/disk_registry_state.cpp @@ -3468,17 +3468,15 @@ NProto::TError TDiskRegistryState::StartAcquireDisk( return {}; } -const TDiskRegistryState::TDiskState* TDiskRegistryState::FinishAcquireDisk( - const TString& diskId) +void TDiskRegistryState::FinishAcquireDisk(const TString& diskId) { auto* diskPtr = Disks.FindPtr(diskId); if (!diskPtr) { - return nullptr; + return; } diskPtr->AcquireInProgress = false; - return diskPtr; } bool TDiskRegistryState::IsAcquireInProgress(const TString& diskId) const diff --git a/cloud/blockstore/libs/storage/disk_registry/disk_registry_state.h b/cloud/blockstore/libs/storage/disk_registry/disk_registry_state.h index 435b00b521..3b78b8786f 100644 --- a/cloud/blockstore/libs/storage/disk_registry/disk_registry_state.h +++ b/cloud/blockstore/libs/storage/disk_registry/disk_registry_state.h @@ -464,7 +464,7 @@ class TDiskRegistryState bool HasPendingCleanup(const TDiskId& diskId) const; - const TDiskState* FinishAcquireDisk(const TString& diskId); + void FinishAcquireDisk(const TString& diskId); bool IsAcquireInProgress(const TString& diskId) const; From 710e3a8fb7e8a3ab35f0838a62e20e7643f48deb Mon Sep 17 00:00:00 2001 From: Stepanyuk Vladislav Date: Fri, 17 Jan 2025 17:40:45 +0700 Subject: [PATCH 08/11] issue-2725: fmt code --- .../disk_registry_actor_acquire_release.cpp | 21 +++++++++---------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor_acquire_release.cpp b/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor_acquire_release.cpp index 4a2e158e3c..a9d820a09c 100644 --- a/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor_acquire_release.cpp +++ b/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor_acquire_release.cpp @@ -45,7 +45,8 @@ class TAcquireReleaseDiskProxyActor final : public TActorBootstrapped { public: - enum EOperationType { + enum EOperationType + { ACQUIRE_DISK, RELEASE_DISK, }; @@ -68,7 +69,6 @@ class TAcquireReleaseDiskProxyActor final NAcquireReleaseDevices::TDevicesReleaseFinished>> OperationFinishedResponce; - public: TAcquireReleaseDiskProxyActor( const TActorId& owner, @@ -140,8 +140,7 @@ TAcquireReleaseDiskProxyActor::TAcquireReleaseDiskProxyActor( , LogicalBlockSize(logicalBlockSize) , RequestInfo(std::move(requestInfo)) , OperationType(operationType) -{ -} +{} void TAcquireReleaseDiskProxyActor::Bootstrap(const TActorContext& ctx) { @@ -245,7 +244,9 @@ STFUNC(TAcquireReleaseDiskProxyActor::StateWork) HandleRemoveDiskSessionResponse); default: - HandleUnexpectedEvent(ev, TBlockStoreComponents::DISK_REGISTRY_WORKER); + HandleUnexpectedEvent( + ev, + TBlockStoreComponents::DISK_REGISTRY_WORKER); break; } } @@ -493,14 +494,12 @@ void TDiskRegistryActor::HandleRemoveDiskSession( OnDiskReleased(msg->SentRequests); - auto requestInfo = CreateRequestInfo( - ev->Sender, - ev->Cookie, - msg->CallContext); + auto requestInfo = + CreateRequestInfo(ev->Sender, ev->Cookie, msg->CallContext); State->FinishAcquireDisk(msg->DiskId); - auto response = - std::make_unique(); + auto response = std::make_unique< + TEvDiskRegistryPrivate::TEvRemoveDiskSessionResponse>(); NCloud::Reply(ctx, *ev, std::move(response)); } From ead4ce686e41eb186472ec7202146584a7e6a218 Mon Sep 17 00:00:00 2001 From: Stepanyuk Vladislav Date: Fri, 17 Jan 2025 18:12:28 +0700 Subject: [PATCH 09/11] issue-2725: fix build issue --- .../disk_registry/disk_registry_actor_acquire_release.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor_acquire_release.cpp b/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor_acquire_release.cpp index a9d820a09c..4e2e59b9ac 100644 --- a/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor_acquire_release.cpp +++ b/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor_acquire_release.cpp @@ -380,7 +380,7 @@ void TDiskRegistryActor::HandleAcquireDisk( .MuteIOErrors = false, }, diskInfo.LogicalBlockSize, - TBlockStoreComponents::DISK_REGISTRY_WORKER, + CreateRequestInfo(ev->Sender, ev->Cookie, msg->CallContext), TAcquireReleaseDiskProxyActor::ACQUIRE_DISK); Actors.insert(actor); } @@ -481,7 +481,7 @@ void TDiskRegistryActor::HandleReleaseDisk( .RequestTimeout = Config->GetAgentRequestTimeout(), .MuteIOErrors = false, }, - TBlockStoreComponents::DISK_REGISTRY_WORKER, + CreateRequestInfo(ev->Sender, ev->Cookie, msg->CallContext), TAcquireReleaseDiskProxyActor::RELEASE_DISK); Actors.insert(actor); } From 71294f7e353205a58fc48cb3cb08e2dac5d1f25b Mon Sep 17 00:00:00 2001 From: Stepanyuk Vladislav Date: Fri, 17 Jan 2025 18:41:12 +0700 Subject: [PATCH 10/11] issue-2725: fix build issue --- .../disk_registry/disk_registry_actor_acquire_release.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor_acquire_release.cpp b/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor_acquire_release.cpp index 4e2e59b9ac..351f068e1a 100644 --- a/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor_acquire_release.cpp +++ b/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor_acquire_release.cpp @@ -481,6 +481,7 @@ void TDiskRegistryActor::HandleReleaseDisk( .RequestTimeout = Config->GetAgentRequestTimeout(), .MuteIOErrors = false, }, + diskInfo.LogicalBlockSize, CreateRequestInfo(ev->Sender, ev->Cookie, msg->CallContext), TAcquireReleaseDiskProxyActor::RELEASE_DISK); Actors.insert(actor); From a42ac5d731a555ea2747e6d970798d88bcb1936c Mon Sep 17 00:00:00 2001 From: Stepanyuk Vladislav Date: Sat, 18 Jan 2025 17:35:08 +0000 Subject: [PATCH 11/11] issue-2725: correct issues --- .../disk_registry_actor_acquire_release.cpp | 53 +++++++++++-------- .../disk_registry_ut_session.cpp | 4 +- 2 files changed, 32 insertions(+), 25 deletions(-) diff --git a/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor_acquire_release.cpp b/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor_acquire_release.cpp index 351f068e1a..2870b91781 100644 --- a/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor_acquire_release.cpp +++ b/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor_acquire_release.cpp @@ -45,10 +45,10 @@ class TAcquireReleaseDiskProxyActor final : public TActorBootstrapped { public: - enum EOperationType + enum class EOperationType { - ACQUIRE_DISK, - RELEASE_DISK, + AcquireDisk, + ReleaseDisk, }; private: @@ -97,10 +97,16 @@ class TAcquireReleaseDiskProxyActor final NCloud::Send(ctx, Owner, std::move(request)); } - void ReplyAndDie(const TActorContext& ctx, NProto::TError error); + template + TEventType* GetFinishedOperationResponce() + { + return OperationFinishedResponce.has_value() + ? &std::get(OperationFinishedResponce.value()) + : nullptr; + } + void ReplyAndDie(const TActorContext& ctx, NProto::TError error); void ReplyAndDieAcquire(const TActorContext& ctx, NProto::TError error); - void ReplyAndDieRelease(const TActorContext& ctx, NProto::TError error); private: @@ -149,13 +155,13 @@ void TAcquireReleaseDiskProxyActor::Bootstrap(const TActorContext& ctx) WorkerId = [&]() { switch (OperationType) { - case ACQUIRE_DISK: + case EOperationType::AcquireDisk: return NAcquireReleaseDevices::CreateAcquireDevicesActor( ctx, ctx.SelfID, std::move(AcquireReleaseInfo), TBlockStoreComponents::DISK_REGISTRY_WORKER); - case RELEASE_DISK: + case EOperationType::ReleaseDisk: return NAcquireReleaseDevices::CreateReleaseDevicesActor( ctx, ctx.SelfID, @@ -170,10 +176,10 @@ void TAcquireReleaseDiskProxyActor::ReplyAndDie( NProto::TError error) { switch (OperationType) { - case ACQUIRE_DISK: + case EOperationType::AcquireDisk: ReplyAndDieAcquire(ctx, std::move(error)); return; - case RELEASE_DISK: + case EOperationType::ReleaseDisk: ReplyAndDieRelease(ctx, std::move(error)); return; } @@ -183,11 +189,8 @@ void TAcquireReleaseDiskProxyActor::ReplyAndDieAcquire( const TActorContext& ctx, NProto::TError error) { - auto* msg = - OperationFinishedResponce.has_value() - ? &std::get( - OperationFinishedResponce.value()) - : nullptr; + auto* msg = GetFinishedOperationResponce< + NAcquireReleaseDevices::TDevicesAcquireFinished>(); auto response = std::make_unique( !HasError(error) && msg ? std::move(msg->Error) : std::move(error)); @@ -213,8 +216,11 @@ void TAcquireReleaseDiskProxyActor::ReplyAndDieRelease( const TActorContext& ctx, NProto::TError error) { + auto* msg = GetFinishedOperationResponce< + NAcquireReleaseDevices::TDevicesReleaseFinished>(); + auto response = std::make_unique( - std::move(error)); + !HasError(error) && msg ? std::move(msg->Error) : std::move(error)); NCloud::Reply(ctx, *RequestInfo, std::move(response)); NCloud::Send( @@ -271,7 +277,7 @@ void TAcquireReleaseDiskProxyActor::HandleDevicesAcquireFinished( const NAcquireReleaseDevices::TEvDevicesAcquireFinished::TPtr& ev, const TActorContext& ctx) { - Y_ABORT_UNLESS(OperationType == ACQUIRE_DISK); + Y_ABORT_UNLESS(OperationType == EOperationType::AcquireDisk); SendOperationFinishedToOwner< TEvDiskRegistryPrivate::TEvFinishAcquireDiskRequest>(ctx, ev); } @@ -280,7 +286,7 @@ void TAcquireReleaseDiskProxyActor::HandleFinishAcquireDiskResponse( const TEvDiskRegistryPrivate::TEvFinishAcquireDiskResponse::TPtr& ev, const TActorContext& ctx) { - Y_ABORT_UNLESS(OperationType == ACQUIRE_DISK); + Y_ABORT_UNLESS(OperationType == EOperationType::AcquireDisk); Y_UNUSED(ev); ReplyAndDie(ctx, {}); @@ -290,7 +296,7 @@ void TAcquireReleaseDiskProxyActor::HandleDevicesReleaseFinished( const NAcquireReleaseDevices::TEvDevicesReleaseFinished::TPtr& ev, const NActors::TActorContext& ctx) { - Y_ABORT_UNLESS(OperationType == RELEASE_DISK); + Y_ABORT_UNLESS(OperationType == EOperationType::ReleaseDisk); SendOperationFinishedToOwner< TEvDiskRegistryPrivate::TEvRemoveDiskSessionRequest>(ctx, ev); } @@ -299,7 +305,7 @@ void TAcquireReleaseDiskProxyActor::HandleRemoveDiskSessionResponse( const TEvDiskRegistryPrivate::TEvRemoveDiskSessionResponse::TPtr& ev, const TActorContext& ctx) { - Y_ABORT_UNLESS(OperationType == RELEASE_DISK); + Y_ABORT_UNLESS(OperationType == EOperationType::ReleaseDisk); ReplyAndDie(ctx, ev->Get()->GetError()); } @@ -361,7 +367,8 @@ void TDiskRegistryActor::HandleAcquireDisk( "AcquireeDisk %s. Nothing to acquire", diskId.c_str()); - replyWithError(MakeError(S_ALREADY, {})); + State->FinishAcquireDisk(diskId); + replyWithError(MakeError(S_ALREADY, "Nothing to acquire")); return; } @@ -381,7 +388,7 @@ void TDiskRegistryActor::HandleAcquireDisk( }, diskInfo.LogicalBlockSize, CreateRequestInfo(ev->Sender, ev->Cookie, msg->CallContext), - TAcquireReleaseDiskProxyActor::ACQUIRE_DISK); + TAcquireReleaseDiskProxyActor::EOperationType::AcquireDisk); Actors.insert(actor); } @@ -464,7 +471,7 @@ void TDiskRegistryActor::HandleReleaseDisk( "ReleaseDisk %s. Nothing to release", diskId.c_str()); - replyWithError(MakeError(S_ALREADY, {})); + replyWithError(MakeError(S_ALREADY, "Nothing to acquire")); return; } @@ -483,7 +490,7 @@ void TDiskRegistryActor::HandleReleaseDisk( }, diskInfo.LogicalBlockSize, CreateRequestInfo(ev->Sender, ev->Cookie, msg->CallContext), - TAcquireReleaseDiskProxyActor::RELEASE_DISK); + TAcquireReleaseDiskProxyActor::EOperationType::ReleaseDisk); Actors.insert(actor); } diff --git a/cloud/blockstore/libs/storage/disk_registry/disk_registry_ut_session.cpp b/cloud/blockstore/libs/storage/disk_registry/disk_registry_ut_session.cpp index 068171733f..13f731e31c 100644 --- a/cloud/blockstore/libs/storage/disk_registry/disk_registry_ut_session.cpp +++ b/cloud/blockstore/libs/storage/disk_registry/disk_registry_ut_session.cpp @@ -618,7 +618,7 @@ Y_UNIT_TEST_SUITE(TDiskRegistryTest) { diskRegistry.SendAcquireDiskRequest("disk-1", "session-1"); auto response = diskRegistry.RecvAcquireDiskResponse(); - UNIT_ASSERT_VALUES_EQUAL(S_OK, response->GetStatus()); + UNIT_ASSERT_VALUES_EQUAL(S_ALREADY, response->GetStatus()); UNIT_ASSERT_VALUES_EQUAL(0, response->Record.DevicesSize()); } @@ -631,7 +631,7 @@ Y_UNIT_TEST_SUITE(TDiskRegistryTest) NProto::VOLUME_ACCESS_READ_ONLY); auto response = diskRegistry.RecvAcquireDiskResponse(); - UNIT_ASSERT_VALUES_EQUAL(S_OK, response->GetStatus()); + UNIT_ASSERT_VALUES_EQUAL(S_ALREADY, response->GetStatus()); UNIT_ASSERT_VALUES_EQUAL(0, response->Record.DevicesSize()); }