diff --git a/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor_acquire.cpp b/cloud/blockstore/libs/storage/core/acquire_devices_actor.cpp similarity index 50% rename from cloud/blockstore/libs/storage/disk_registry/disk_registry_actor_acquire.cpp rename to cloud/blockstore/libs/storage/core/acquire_devices_actor.cpp index f7d78f8ef3..f267dcbb3c 100644 --- a/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor_acquire.cpp +++ b/cloud/blockstore/libs/storage/core/acquire_devices_actor.cpp @@ -1,53 +1,49 @@ -#include "disk_registry_actor.h" +#include "acquire_release_devices_actors.h" -#include +#include +#include +#include -#include +#include +#include +#include #include -#include -namespace NCloud::NBlockStore::NStorage { +namespace NCloud::NBlockStore::NStorage::NAcquireReleaseDevices { -using namespace NActors; +//////////////////////////////////////////////////////////////////////////////// -using namespace NKikimr::NTabletFlatExecutor; +using namespace NActors; namespace { //////////////////////////////////////////////////////////////////////////////// -class TAcquireDiskActor final - : public TActorBootstrapped +class TAcquireDevicesActor final + : public TActorBootstrapped { private: const TActorId Owner; - TRequestInfoPtr RequestInfo; TVector Devices; - const ui32 LogicalBlockSize = 0; - const TString DiskId; - const TString ClientId; + TString DiskId; + TString ClientId; const NProto::EVolumeAccessMode AccessMode; const ui64 MountSeqNumber; const ui32 VolumeGeneration; const TDuration RequestTimeout; + const bool MuteIOErrors; + NLog::EComponent Component; int PendingRequests = 0; TVector SentAcquireRequests; public: - TAcquireDiskActor( + TAcquireDevicesActor( const TActorId& owner, - TRequestInfoPtr requestInfo, - TVector devices, - ui32 logicalBlockSize, - TString diskId, - TString clientId, - NProto::EVolumeAccessMode accessMode, - ui64 mountSeqNumber, - ui32 volumeGeneration, - TDuration requestTimeout); + 
TAcquireReleaseDevicesInfo acquireDevicesInfo, + NLog::EComponent component); void Bootstrap(const TActorContext& ctx); @@ -55,8 +51,6 @@ class TAcquireDiskActor final void PrepareRequest(NProto::TAcquireDevicesRequest& request) const; void PrepareRequest(NProto::TReleaseDevicesRequest& request) const; - void FinishAcquireDisk(const TActorContext& ctx, NProto::TError error); - void ReplyAndDie(const TActorContext& ctx, NProto::TError error); void OnAcquireResponse( @@ -104,51 +98,45 @@ class TAcquireDiskActor final //////////////////////////////////////////////////////////////////////////////// -TAcquireDiskActor::TAcquireDiskActor( +TAcquireDevicesActor::TAcquireDevicesActor( const TActorId& owner, - TRequestInfoPtr requestInfo, - TVector devices, - ui32 logicalBlockSize, - TString diskId, - TString clientId, - NProto::EVolumeAccessMode accessMode, - ui64 mountSeqNumber, - ui32 volumeGeneration, - TDuration requestTimeout) + TAcquireReleaseDevicesInfo acquireDevicesInfo, + NLog::EComponent component) : Owner(owner) - , RequestInfo(std::move(requestInfo)) - , Devices(std::move(devices)) - , LogicalBlockSize(logicalBlockSize) - , DiskId(std::move(diskId)) - , ClientId(std::move(clientId)) - , AccessMode(accessMode) - , MountSeqNumber(mountSeqNumber) - , VolumeGeneration(volumeGeneration) - , RequestTimeout(requestTimeout) + , Devices(std::move(acquireDevicesInfo.Devices)) + , DiskId(std::move(acquireDevicesInfo.DiskId)) + , ClientId(std::move(acquireDevicesInfo.ClientId)) + , AccessMode(acquireDevicesInfo.AccessMode.value()) + , MountSeqNumber(acquireDevicesInfo.MountSeqNumber.value()) + , VolumeGeneration(acquireDevicesInfo.VolumeGeneration) + , RequestTimeout(acquireDevicesInfo.RequestTimeout) + , MuteIOErrors(acquireDevicesInfo.MuteIOErrors) + , Component(component) { - SortBy(Devices, [] (auto& d) { - return d.GetNodeId(); - }); + SortBy(Devices, [](auto& d) { return d.GetNodeId(); }); } -void TAcquireDiskActor::Bootstrap(const TActorContext& ctx) +void 
TAcquireDevicesActor::Bootstrap(const TActorContext& ctx) { Become(&TThis::StateAcquire); if (Devices.empty()) { - FinishAcquireDisk(ctx, {}); + ReplyAndDie(ctx, {}); return; } ctx.Schedule(RequestTimeout, new TEvents::TEvWakeup()); - LOG_DEBUG(ctx, TBlockStoreComponents::DISK_REGISTRY_WORKER, + LOG_DEBUG( + ctx, + Component, "[%s] Sending acquire devices requests for disk %s, targets %s", ClientId.c_str(), DiskId.c_str(), LogTargets().c_str()); - auto sentRequests = CreateRequests(); + auto sentRequests = + CreateRequests(); SendRequests(ctx, sentRequests); Y_ABORT_UNLESS(SentAcquireRequests.empty()); @@ -156,29 +144,13 @@ void TAcquireDiskActor::Bootstrap(const TActorContext& ctx) TInstant now = ctx.Now(); for (auto& x: sentRequests) { SentAcquireRequests.push_back(TAgentAcquireDevicesCachedRequest{ - std::move(x.AgentId), - std::move(x.Record), - now}); + .AgentId = std::move(x.AgentId), + .Request = std::move(x.Record), + .RequestTime = now}); } } -void TAcquireDiskActor::FinishAcquireDisk( - const TActorContext& ctx, - NProto::TError error) -{ - using TType = TEvDiskRegistryPrivate::TEvFinishAcquireDiskRequest; - NCloud::Send( - ctx, - Owner, - std::make_unique( - DiskId, - ClientId, - std::move(SentAcquireRequests))); - - ReplyAndDie(ctx, std::move(error)); -} - -void TAcquireDiskActor::PrepareRequest( +void TAcquireDevicesActor::PrepareRequest( NProto::TAcquireDevicesRequest& request) const { request.MutableHeaders()->SetClientId(ClientId); @@ -188,14 +160,14 @@ void TAcquireDiskActor::PrepareRequest( request.SetVolumeGeneration(VolumeGeneration); } -void TAcquireDiskActor::PrepareRequest( +void TAcquireDevicesActor::PrepareRequest( NProto::TReleaseDevicesRequest& request) const { request.MutableHeaders()->SetClientId(ClientId); } template -auto TAcquireDiskActor::CreateRequests() const +auto TAcquireDevicesActor::CreateRequests() const -> TVector> { auto it = Devices.begin(); @@ -216,18 +188,18 @@ auto TAcquireDiskActor::CreateRequests() const } template 
-void TAcquireDiskActor::SendRequests( +void TAcquireDevicesActor::SendRequests( const TActorContext& ctx, const TVector>& requests) { PendingRequests = 0; for (const auto& r: requests) { - auto request = std::make_unique( - TCallContextPtr {}, - r.Record); + auto request = std::make_unique(TCallContextPtr{}, r.Record); - LOG_DEBUG(ctx, TBlockStoreComponents::DISK_REGISTRY_WORKER, + LOG_DEBUG( + ctx, + Component, "[%s] Send an acquire request to node #%d. Devices: %s", ClientId.c_str(), r.NodeId, @@ -248,40 +220,34 @@ void TAcquireDiskActor::SendRequests( } } -void TAcquireDiskActor::ReplyAndDie( +void TAcquireDevicesActor::ReplyAndDie( const TActorContext& ctx, NProto::TError error) { - auto response = std::make_unique( - std::move(error)); - - if (HasError(response->GetError())) { - LOG_ERROR(ctx, TBlockStoreComponents::DISK_REGISTRY_WORKER, - "[%s] AcquireDisk %s targets %s error: %s", + if (HasError(error)) { + LOG_ERROR( + ctx, + Component, + "[%s] AcquireDevices %s targets %s error: %s", ClientId.c_str(), DiskId.c_str(), LogTargets().c_str(), - FormatError(response->GetError()).c_str()); - } else { - response->Record.MutableDevices()->Reserve(Devices.size()); - - for (auto& device: Devices) { - ToLogicalBlocks(device, LogicalBlockSize); - *response->Record.AddDevices() = std::move(device); - } + FormatError(error).c_str()); } - NCloud::Reply(ctx, *RequestInfo, std::move(response)); + auto response = std::make_unique( + std::move(DiskId), + std::move(ClientId), + std::move(SentAcquireRequests), + std::move(Devices), + std::move(error)); - NCloud::Send( - ctx, - Owner, - std::make_unique()); + NCloud::Send(ctx, Owner, std::move(response)); Die(ctx); } //////////////////////////////////////////////////////////////////////////////// -void TAcquireDiskActor::HandlePoisonPill( +void TAcquireDevicesActor::HandlePoisonPill( const TEvents::TEvPoisonPill::TPtr& ev, const TActorContext& ctx) { @@ -290,15 +256,17 @@ void TAcquireDiskActor::HandlePoisonPill( 
ReplyAndDie(ctx, MakeTabletIsDeadError(E_REJECTED, __LOCATION__)); } -void TAcquireDiskActor::OnAcquireResponse( +void TAcquireDevicesActor::OnAcquireResponse( const TActorContext& ctx, ui32 nodeId, NProto::TError error) { Y_ABORT_UNLESS(PendingRequests > 0); - if (HasError(error)) { - LOG_ERROR(ctx, TBlockStoreComponents::DISK_REGISTRY_WORKER, + if (HasError(error) && !MuteIOErrors) { + LOG_ERROR( + ctx, + Component, "[%s] AcquireDevices on the node #%d %s error: %s", ClientId.c_str(), nodeId, @@ -306,7 +274,9 @@ void TAcquireDiskActor::OnAcquireResponse( FormatError(error).c_str()); if (GetErrorKind(error) != EErrorKind::ErrorRetriable) { - LOG_DEBUG(ctx, TBlockStoreComponents::DISK_REGISTRY_WORKER, + LOG_DEBUG( + ctx, + Component, "[%s] Canceling acquire operation for disk %s, targets %s", ClientId.c_str(), DiskId.c_str(), @@ -318,17 +288,17 @@ void TAcquireDiskActor::OnAcquireResponse( } SentAcquireRequests.clear(); - FinishAcquireDisk(ctx, std::move(error)); + ReplyAndDie(ctx, std::move(error)); return; } if (--PendingRequests == 0) { - FinishAcquireDisk(ctx, {}); + ReplyAndDie(ctx, {}); } } -void TAcquireDiskActor::HandleAcquireDevicesResponse( +void TAcquireDevicesActor::HandleAcquireDevicesResponse( const TEvDiskAgent::TEvAcquireDevicesResponse::TPtr& ev, const TActorContext& ctx) { @@ -338,7 +308,7 @@ void TAcquireDiskActor::HandleAcquireDevicesResponse( ev->Get()->GetError()); } -void TAcquireDiskActor::HandleAcquireDevicesUndelivery( +void TAcquireDevicesActor::HandleAcquireDevicesUndelivery( const TEvDiskAgent::TEvAcquireDevicesRequest::TPtr& ev, const TActorContext& ctx) { @@ -348,7 +318,7 @@ void TAcquireDiskActor::HandleAcquireDevicesUndelivery( MakeError(E_REJECTED, "not delivered")); } -void TAcquireDiskActor::HandleWakeup( +void TAcquireDevicesActor::HandleWakeup( const TEvents::TEvWakeup::TPtr& ev, const TActorContext& ctx) { @@ -360,27 +330,29 @@ void TAcquireDiskActor::HandleWakeup( 
//////////////////////////////////////////////////////////////////////////////// -TString TAcquireDiskActor::LogTargets() const +TString TAcquireDevicesActor::LogTargets() const { return LogDevices(Devices); } //////////////////////////////////////////////////////////////////////////////// -STFUNC(TAcquireDiskActor::StateAcquire) +STFUNC(TAcquireDevicesActor::StateAcquire) { switch (ev->GetTypeRewrite()) { HFunc(TEvents::TEvPoisonPill, HandlePoisonPill); - HFunc(TEvDiskAgent::TEvAcquireDevicesResponse, + HFunc( + TEvDiskAgent::TEvAcquireDevicesResponse, HandleAcquireDevicesResponse); - HFunc(TEvDiskAgent::TEvAcquireDevicesRequest, + HFunc( + TEvDiskAgent::TEvAcquireDevicesRequest, HandleAcquireDevicesUndelivery); HFunc(TEvents::TEvWakeup, HandleWakeup); default: - HandleUnexpectedEvent(ev, TBlockStoreComponents::DISK_REGISTRY_WORKER); + HandleUnexpectedEvent(ev, Component); break; } } @@ -389,91 +361,17 @@ STFUNC(TAcquireDiskActor::StateAcquire) //////////////////////////////////////////////////////////////////////////////// -void TDiskRegistryActor::HandleAcquireDisk( - const TEvDiskRegistry::TEvAcquireDiskRequest::TPtr& ev, - const TActorContext& ctx) +TActorId CreateAcquireDevicesActor( + const NActors::TActorContext& ctx, + const TActorId& owner, + TAcquireReleaseDevicesInfo acquireDevicesInfo, + NActors::NLog::EComponent component) { - BLOCKSTORE_DISK_REGISTRY_COUNTER(AcquireDisk); - - const auto* msg = ev->Get(); - - auto clientId = msg->Record.GetHeaders().GetClientId(); - auto diskId = msg->Record.GetDiskId(); - - LOG_DEBUG(ctx, TBlockStoreComponents::DISK_REGISTRY, - "[%lu] Received AcquireDisk request: " - "DiskId=%s, ClientId=%s, AccessMode=%u, MountSeqNumber=%lu" - ", VolumeGeneration=%u", - TabletID(), - diskId.c_str(), - clientId.c_str(), - static_cast(msg->Record.GetAccessMode()), - msg->Record.GetMountSeqNumber(), - msg->Record.GetVolumeGeneration()); - - TDiskInfo diskInfo; - auto error = State->StartAcquireDisk(diskId, diskInfo); - - if 
(HasError(error)) { - LOG_ERROR(ctx, TBlockStoreComponents::DISK_REGISTRY_WORKER, - "[%s] AcquireDisk %s error: %s", - clientId.c_str(), - diskId.c_str(), - FormatError(error).c_str()); - - NCloud::Reply( - ctx, - *ev, - std::make_unique( - std::move(error))); - return; - } - - State->FilterDevicesAtUnavailableAgents(diskInfo); - - TVector devices = std::move(diskInfo.Devices); - for (auto& migration: diskInfo.Migrations) { - devices.push_back(std::move(*migration.MutableTargetDevice())); - } - for (auto& replica: diskInfo.Replicas) { - devices.insert(devices.end(), - std::make_move_iterator(replica.begin()), - std::make_move_iterator(replica.end())); - } - - auto actor = NCloud::Register( + return NCloud::Register( ctx, - ctx.SelfID, - CreateRequestInfo( - ev->Sender, - ev->Cookie, - msg->CallContext), - std::move(devices), - diskInfo.LogicalBlockSize, - std::move(diskId), - std::move(clientId), - msg->Record.GetAccessMode(), - msg->Record.GetMountSeqNumber(), - msg->Record.GetVolumeGeneration(), - Config->GetAgentRequestTimeout()); - Actors.insert(actor); -} - -//////////////////////////////////////////////////////////////////////////////// - -void TDiskRegistryActor::HandleFinishAcquireDisk( - const TEvDiskRegistryPrivate::TEvFinishAcquireDiskRequest::TPtr& ev, - const TActorContext& ctx) -{ - auto* msg = ev->Get(); - - State->FinishAcquireDisk(msg->DiskId); - - OnDiskAcquired(std::move(msg->SentRequests)); - - auto response = std::make_unique< - TEvDiskRegistryPrivate::TEvFinishAcquireDiskResponse>(); - NCloud::Reply(ctx, *ev, std::move(response)); + owner, + std::move(acquireDevicesInfo), + component); } -} // namespace NCloud::NBlockStore::NStorage +} // namespace NCloud::NBlockStore::NStorage::NAcquireReleaseDevices diff --git a/cloud/blockstore/libs/storage/core/acquire_release_devices_actors.h b/cloud/blockstore/libs/storage/core/acquire_release_devices_actors.h new file mode 100644 index 0000000000..582c87b78a --- /dev/null +++ 
b/cloud/blockstore/libs/storage/core/acquire_release_devices_actors.h @@ -0,0 +1,106 @@ +#pragma once + +#include +#include + +#include +#include + +namespace NCloud::NBlockStore::NStorage::NAcquireReleaseDevices { + +struct TAgentAcquireDevicesCachedRequest +{ + TString AgentId; + NProto::TAcquireDevicesRequest Request; + TInstant RequestTime; +}; + +struct TAgentReleaseDevicesCachedRequest +{ + TString AgentId; + NProto::TReleaseDevicesRequest Request; +}; + +struct TDevicesAcquireFinished +{ + TString DiskId; + TString ClientId; + TVector SentRequests; + TVector Devices; + NProto::TError Error; + + TDevicesAcquireFinished( + TString diskId, + TString clientId, + TVector sentRequests, + TVector devices, + NProto::TError error) + : DiskId(std::move(diskId)) + , ClientId(std::move(clientId)) + , SentRequests(std::move(sentRequests)) + , Devices(std::move(devices)) + , Error(std::move(error)) + {} +}; + +struct TDevicesReleaseFinished +{ + TString DiskId; + TString ClientId; + TVector SentRequests; + NProto::TError Error; + + TDevicesReleaseFinished( + TString diskId, + TString clientId, + TVector sentRequests, + NProto::TError error) + : DiskId(std::move(diskId)) + , ClientId(std::move(clientId)) + , SentRequests(std::move(sentRequests)) + , Error(std::move(error)) + {} +}; + +enum EEvents +{ + EvBegin, + + EvDevicesAcquireFinished, + EvDevicesReleaseFinished, + + EvEnd +}; + +using TEvDevicesAcquireFinished = + TRequestEvent; + +using TEvDevicesReleaseFinished = + TRequestEvent; + +struct TAcquireReleaseDevicesInfo +{ + TVector Devices; + TString DiskId; + TString ClientId; + std::optional + AccessMode; // Only AcquireDevicesActor need it. + std::optional MountSeqNumber; // Only AcquireDevicesActor need it. 
+ ui32 VolumeGeneration; + TDuration RequestTimeout; + bool MuteIOErrors; +}; + +TActorId CreateAcquireDevicesActor( + const NActors::TActorContext& ctx, + const TActorId& owner, + TAcquireReleaseDevicesInfo acquireDevicesInfo, + NActors::NLog::EComponent component); + +TActorId CreateReleaseDevicesActor( + const NActors::TActorContext& ctx, + const TActorId& owner, + TAcquireReleaseDevicesInfo releaseDevicesInfo, + NActors::NLog::EComponent component); + +} // namespace NCloud::NBlockStore::NStorage::NAcquireReleaseDevices diff --git a/cloud/blockstore/libs/storage/core/proto_helpers.cpp b/cloud/blockstore/libs/storage/core/proto_helpers.cpp index 4f1c472d4f..51a23fb1e7 100644 --- a/cloud/blockstore/libs/storage/core/proto_helpers.cpp +++ b/cloud/blockstore/libs/storage/core/proto_helpers.cpp @@ -444,4 +444,15 @@ ui64 GetVolumeRequestId(const TEvDiskAgent::TEvZeroDeviceBlocksRequest& request) return request.Record.GetVolumeRequestId(); } +TString LogDevices(const TVector& devices) +{ + TStringBuilder sb; + sb << "( "; + for (const auto& d: devices) { + sb << d.GetDeviceUUID() << "@" << d.GetAgentId() << " "; + } + sb << ")"; + return sb; +} + } // namespace NCloud::NBlockStore::NStorage diff --git a/cloud/blockstore/libs/storage/core/proto_helpers.h b/cloud/blockstore/libs/storage/core/proto_helpers.h index 0fbecf7cbc..f8ecfb9032 100644 --- a/cloud/blockstore/libs/storage/core/proto_helpers.h +++ b/cloud/blockstore/libs/storage/core/proto_helpers.h @@ -223,4 +223,5 @@ TBlockRange64 BuildRequestBlockRange( ui64 GetVolumeRequestId(const TEvDiskAgent::TEvWriteDeviceBlocksRequest& request); ui64 GetVolumeRequestId(const TEvDiskAgent::TEvZeroDeviceBlocksRequest& request); +TString LogDevices(const TVector& devices); } // namespace NCloud::NBlockStore::NStorage diff --git a/cloud/blockstore/libs/storage/core/release_devices_actor.cpp b/cloud/blockstore/libs/storage/core/release_devices_actor.cpp new file mode 100644 index 0000000000..de97e1f539 --- /dev/null +++ 
b/cloud/blockstore/libs/storage/core/release_devices_actor.cpp @@ -0,0 +1,256 @@ +#include "acquire_release_devices_actors.h" + +#include +#include + +#include +#include +#include + +#include + +namespace NCloud::NBlockStore::NStorage::NAcquireReleaseDevices { + +//////////////////////////////////////////////////////////////////////////////// + +using namespace NActors; + +namespace { + +//////////////////////////////////////////////////////////////////////////////// + +class TReleaseDevicesActor final + : public TActorBootstrapped +{ +private: + const TActorId Owner; + TVector Devices; + const TString DiskId; + const TString ClientId; + const ui32 VolumeGeneration; + const TDuration RequestTimeout; + bool MuteIOErrors; + NLog::EComponent Component; + + int PendingRequests = 0; + + TVector SentReleaseRequests; + +public: + TReleaseDevicesActor( + const TActorId& owner, + TAcquireReleaseDevicesInfo releaseDevicesInfo, + NLog::EComponent component); + + void Bootstrap(const TActorContext& ctx); + +private: + void PrepareRequest(NProto::TReleaseDevicesRequest& request); + void ReplyAndDie(const TActorContext& ctx, NProto::TError error); + + void OnReleaseResponse( + const TActorContext& ctx, + ui64 cookie, + NProto::TError error); + +private: + STFUNC(StateWork); + + void HandlePoisonPill( + const TEvents::TEvPoisonPill::TPtr& ev, + const TActorContext& ctx); + + void HandleReleaseDevicesResponse( + const TEvDiskAgent::TEvReleaseDevicesResponse::TPtr& ev, + const TActorContext& ctx); + + void HandleReleaseDevicesUndelivery( + const TEvDiskAgent::TEvReleaseDevicesRequest::TPtr& ev, + const TActorContext& ctx); + + void HandleTimeout( + const TEvents::TEvWakeup::TPtr& ev, + const TActorContext& ctx); + + TString LogTargets() const; +}; + +//////////////////////////////////////////////////////////////////////////////// + +TReleaseDevicesActor::TReleaseDevicesActor( + const TActorId& owner, + TAcquireReleaseDevicesInfo releaseDevicesInfo, + NLog::EComponent component) + : 
Owner(owner) + , Devices(std::move(releaseDevicesInfo.Devices)) + , DiskId(std::move(releaseDevicesInfo.DiskId)) + , ClientId(std::move(releaseDevicesInfo.ClientId)) + , VolumeGeneration(releaseDevicesInfo.VolumeGeneration) + , RequestTimeout(releaseDevicesInfo.RequestTimeout) + , MuteIOErrors(releaseDevicesInfo.MuteIOErrors) + , Component(component) +{} + +void TReleaseDevicesActor::PrepareRequest( + NProto::TReleaseDevicesRequest& request) +{ + request.MutableHeaders()->SetClientId(ClientId); + request.SetDiskId(DiskId); + request.SetVolumeGeneration(VolumeGeneration); +} + +void TReleaseDevicesActor::Bootstrap(const TActorContext& ctx) +{ + Become(&TThis::StateWork); + + SortBy(Devices, [](auto& d) { return d.GetNodeId(); }); + + auto it = Devices.begin(); + while (it != Devices.end()) { + auto request = + std::make_unique(); + NProto::TReleaseDevicesRequest requestCopy; + PrepareRequest(request->Record); + PrepareRequest(requestCopy); + + const ui32 nodeId = it->GetNodeId(); + const TString& agentId = it->GetAgentId(); + + for (; it != Devices.end() && it->GetNodeId() == nodeId; ++it) { + *request->Record.AddDeviceUUIDs() = it->GetDeviceUUID(); + *requestCopy.AddDeviceUUIDs() = it->GetDeviceUUID(); + } + + ++PendingRequests; + SentReleaseRequests.emplace_back(agentId, std::move(requestCopy)); + NCloud::Send( + ctx, + MakeDiskAgentServiceId(nodeId), + std::move(request), + nodeId); + } + + ctx.Schedule(RequestTimeout, new TEvents::TEvWakeup()); +} + +void TReleaseDevicesActor::ReplyAndDie( + const TActorContext& ctx, + NProto::TError error) +{ + NCloud::Send( + ctx, + Owner, + std::make_unique( + DiskId, + ClientId, + std::move(SentReleaseRequests), + std::move(error))); + + Die(ctx); +} + +void TReleaseDevicesActor::OnReleaseResponse( + const TActorContext& ctx, + ui64 cookie, + NProto::TError error) +{ + Y_ABORT_UNLESS(PendingRequests > 0); + + if (HasError(error)) { + LOG_LOG( + ctx, + MuteIOErrors ? 
NLog::PRI_WARN : NLog::PRI_ERROR, + Component, + "ReleaseDevices %s error: %s, %llu", + LogTargets().c_str(), + FormatError(error).c_str(), + cookie); + } + + if (--PendingRequests == 0) { + ReplyAndDie(ctx, {}); + } +} + +void TReleaseDevicesActor::HandleReleaseDevicesResponse( + const TEvDiskAgent::TEvReleaseDevicesResponse::TPtr& ev, + const TActorContext& ctx) +{ + OnReleaseResponse(ctx, ev->Cookie, ev->Get()->GetError()); +} + +void TReleaseDevicesActor::HandleReleaseDevicesUndelivery( + const TEvDiskAgent::TEvReleaseDevicesRequest::TPtr& ev, + const TActorContext& ctx) +{ + OnReleaseResponse(ctx, ev->Cookie, MakeError(E_REJECTED, "not delivered")); +} + +void TReleaseDevicesActor::HandlePoisonPill( + const TEvents::TEvPoisonPill::TPtr& ev, + const TActorContext& ctx) +{ + Y_UNUSED(ev); + + ReplyAndDie(ctx, MakeTabletIsDeadError(E_REJECTED, __LOCATION__)); +} + +void TReleaseDevicesActor::HandleTimeout( + const TEvents::TEvWakeup::TPtr& ev, + const TActorContext& ctx) +{ + Y_UNUSED(ev); + + const auto err = TStringBuilder() + << "TReleaseDevicesActor timeout." 
<< " DiskId: " << DiskId + << " ClientId: " << ClientId + << " Targets: " << LogTargets() + << " VolumeGeneration: " << VolumeGeneration + << " PendingRequests: " << PendingRequests; + + LOG_WARN(ctx, Component, err); + + ReplyAndDie(ctx, MakeError(E_TIMEOUT, std::move(err))); +} + +STFUNC(TReleaseDevicesActor::StateWork) +{ + switch (ev->GetTypeRewrite()) { + HFunc(TEvents::TEvPoisonPill, HandlePoisonPill); + HFunc(TEvents::TEvWakeup, HandleTimeout); + + HFunc( + TEvDiskAgent::TEvReleaseDevicesResponse, + HandleReleaseDevicesResponse); + HFunc( + TEvDiskAgent::TEvReleaseDevicesRequest, + HandleReleaseDevicesUndelivery); + + default: + HandleUnexpectedEvent(ev, Component); + break; + } +} + +//////////////////////////////////////////////////////////////////////////////// + +TString TReleaseDevicesActor::LogTargets() const +{ + return LogDevices(Devices); +} + +} // namespace + +TActorId CreateReleaseDevicesActor( + const NActors::TActorContext& ctx, + const TActorId& owner, + TAcquireReleaseDevicesInfo releaseDevicesInfo, + NActors::NLog::EComponent component) +{ + return NCloud::Register( + ctx, + owner, + releaseDevicesInfo, + component); +} +} // namespace NCloud::NBlockStore::NStorage::NAcquireReleaseDevices diff --git a/cloud/blockstore/libs/storage/core/ya.make b/cloud/blockstore/libs/storage/core/ya.make index 6467cd9719..1ca815aaee 100644 --- a/cloud/blockstore/libs/storage/core/ya.make +++ b/cloud/blockstore/libs/storage/core/ya.make @@ -3,6 +3,7 @@ LIBRARY() GENERATE_ENUM_SERIALIZATION(mount_token.h) SRCS( + acquire_devices_actor.cpp block_handler.cpp compaction_map.cpp compaction_options.cpp @@ -19,6 +20,7 @@ SRCS( pending_request.cpp probes.cpp proto_helpers.cpp + release_devices_actor.cpp request_buffer.cpp request_info.cpp storage_request_counters.cpp diff --git a/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor.cpp b/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor.cpp index 5f251d85cb..e80bdafa9a 100644 --- 
a/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor.cpp +++ b/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor.cpp @@ -879,21 +879,9 @@ bool ToLogicalBlocks(NProto::TDeviceConfig& device, ui32 logicalBlockSize) //////////////////////////////////////////////////////////////////////////////// -TString LogDevices(const TVector& devices) -{ - TStringBuilder sb; - sb << "( "; - for (const auto& d: devices) { - sb << d.GetDeviceUUID() << "@" << d.GetAgentId() << " "; - } - sb << ")"; - return sb; -} - -//////////////////////////////////////////////////////////////////////////////// - void TDiskRegistryActor::OnDiskAcquired( - TVector sentAcquireRequests) + TVector + sentAcquireRequests) { for (auto& sentRequest: sentAcquireRequests) { TCachedAcquireRequests& cachedRequests = @@ -906,7 +894,8 @@ void TDiskRegistryActor::OnDiskAcquired( } void TDiskRegistryActor::OnDiskReleased( - const TVector& sentReleaseRequests) + const TVector& + sentReleaseRequests) { auto& acquireCacheByAgentId = State->GetAcquireCacheByAgentId(); for (const auto& [agentId, releaseRequest]: sentReleaseRequests) { diff --git a/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor.h b/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor.h index 32a19889b3..56a67c0493 100644 --- a/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor.h +++ b/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor.h @@ -15,6 +15,7 @@ #include #include #include +#include #include #include #include @@ -242,11 +243,13 @@ class TDiskRegistryActor final NProto::TError error); void ProcessAutomaticallyReplacedDevices(const NActors::TActorContext& ctx); - void OnDiskAcquired( - TVector sentAcquireRequests); + TVector + sentAcquireRequests); void OnDiskReleased( - const TVector& sentReleaseRequests); + const TVector< + NAcquireReleaseDevices::TAgentReleaseDevicesCachedRequest>& + sentReleaseRequests); void OnDiskDeallocated(const TDiskId& diskId); void 
SendCachedAcquireRequestsToAgent( const NActors::TActorContext& ctx, @@ -503,6 +506,5 @@ class TDiskRegistryActor final // BLOCKSTORE_DISK_REGISTRY_COUNTER bool ToLogicalBlocks(NProto::TDeviceConfig& device, ui32 logicalBlockSize); -TString LogDevices(const TVector& devices); } // namespace NCloud::NBlockStore::NStorage diff --git a/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor_acquire_release.cpp b/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor_acquire_release.cpp new file mode 100644 index 0000000000..2870b91781 --- /dev/null +++ b/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor_acquire_release.cpp @@ -0,0 +1,514 @@ +#include "disk_registry_actor.h" + +#include +#include +#include + +#include +#include + +#include + +namespace NCloud::NBlockStore::NStorage { + +using namespace NActors; + +using namespace NKikimr::NTabletFlatExecutor; + +namespace { + +//////////////////////////////////////////////////////////////////////////////// + +TVector ExtractDevicesFromDiskInfo(TDiskInfo& diskInfo) +{ + TVector devices = std::move(diskInfo.Devices); + devices.reserve( + devices.size() * (diskInfo.Replicas.size() + 1) + + diskInfo.Migrations.size()); + + for (auto& migration: diskInfo.Migrations) { + devices.emplace_back(std::move(*migration.MutableTargetDevice())); + } + for (auto& replica: diskInfo.Replicas) { + devices.insert( + devices.end(), + std::make_move_iterator(replica.begin()), + std::make_move_iterator(replica.end())); + } + + return devices; +} + +//////////////////////////////////////////////////////////////////////////////// + +class TAcquireReleaseDiskProxyActor final + : public TActorBootstrapped +{ +public: + enum class EOperationType + { + AcquireDisk, + ReleaseDisk, + }; + +private: + const TActorId Owner; + + NAcquireReleaseDevices::TAcquireReleaseDevicesInfo AcquireReleaseInfo; + + const ui32 LogicalBlockSize; + + TRequestInfoPtr RequestInfo; + + EOperationType OperationType; + + std::optional 
WorkerId;
+
+    // Result reported by the worker; empty until a worker "finished" event
+    // has been handled by SendOperationFinishedToOwner.
+    std::optional>
+        OperationFinishedResponse;
+
+public:
+    TAcquireReleaseDiskProxyActor(
+        const TActorId& owner,
+        NAcquireReleaseDevices::TAcquireReleaseDevicesInfo acquireReleaseInfo,
+        ui32 logicalBlockSize,
+        TRequestInfoPtr requestInfo,
+        EOperationType operationType);
+
+    void Bootstrap(const TActorContext& ctx);
+
+private:
+    // Handles a worker "finished" event: forgets the worker id, keeps a copy
+    // of the worker's message for the final reply and forwards DiskId,
+    // ClientId and SentRequests to Owner.
+    template
+    void SendOperationFinishedToOwner(
+        const TActorContext& ctx,
+        const TEventType& ev)
+    {
+        auto* msg = ev->Get();
+
+        WorkerId = std::nullopt;
+
+        OperationFinishedResponse = *msg;
+        auto request = std::make_unique(
+            msg->DiskId,
+            msg->ClientId,
+            msg->SentRequests);
+        NCloud::Send(ctx, Owner, std::move(request));
+    }
+
+    // Returns a pointer to the stored worker result, or nullptr when no
+    // result has been stored yet.
+    template
+    TEventType* GetFinishedOperationResponse()
+    {
+        return OperationFinishedResponse.has_value()
+                   ? &std::get(OperationFinishedResponse.value())
+                   : nullptr;
+    }
+
+    void ReplyAndDie(const TActorContext& ctx, NProto::TError error);
+    void ReplyAndDieAcquire(const TActorContext& ctx, NProto::TError error);
+    void ReplyAndDieRelease(const TActorContext& ctx, NProto::TError error);
+
+private:
+    STFUNC(StateWork);
+
+    void HandlePoisonPill(
+        const TEvents::TEvPoisonPill::TPtr& ev,
+        const TActorContext& ctx);
+
+    void HandleDevicesAcquireFinished(
+        const NAcquireReleaseDevices::TEvDevicesAcquireFinished::TPtr& ev,
+        const NActors::TActorContext& ctx);
+
+    void HandleFinishAcquireDiskResponse(
+        const TEvDiskRegistryPrivate::TEvFinishAcquireDiskResponse::TPtr& ev,
+        const TActorContext& ctx);
+
+    void HandleDevicesReleaseFinished(
+        const NAcquireReleaseDevices::TEvDevicesReleaseFinished::TPtr& ev,
+        const NActors::TActorContext& ctx);
+
+    void HandleRemoveDiskSessionResponse(
+        const TEvDiskRegistryPrivate::TEvRemoveDiskSessionResponse::TPtr& ev,
+        const TActorContext& ctx);
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+TAcquireReleaseDiskProxyActor::TAcquireReleaseDiskProxyActor(
+        const TActorId& owner,
+        NAcquireReleaseDevices::TAcquireReleaseDevicesInfo acquireReleaseInfo,
+        ui32 logicalBlockSize,
+        TRequestInfoPtr requestInfo,
+        EOperationType operationType)
+    : Owner(owner)
+    , AcquireReleaseInfo(std::move(acquireReleaseInfo))
+    , LogicalBlockSize(logicalBlockSize)
+    , RequestInfo(std::move(requestInfo))
+    , OperationType(operationType)
+{}
+
+// Switches to StateWork and starts the acquire or release worker selected by
+// OperationType; the value returned by the factory is kept in WorkerId.
+// AcquireReleaseInfo is moved into the worker and must not be read afterwards.
+void TAcquireReleaseDiskProxyActor::Bootstrap(const TActorContext& ctx)
+{
+    Become(&TThis::StateWork);
+
+    WorkerId = [&]()
+    {
+        switch (OperationType) {
+            case EOperationType::AcquireDisk:
+                return NAcquireReleaseDevices::CreateAcquireDevicesActor(
+                    ctx,
+                    ctx.SelfID,
+                    std::move(AcquireReleaseInfo),
+                    TBlockStoreComponents::DISK_REGISTRY_WORKER);
+            case EOperationType::ReleaseDisk:
+                return NAcquireReleaseDevices::CreateReleaseDevicesActor(
+                    ctx,
+                    ctx.SelfID,
+                    std::move(AcquireReleaseInfo),
+                    TBlockStoreComponents::DISK_REGISTRY_WORKER);
+        }
+    }();
+}
+
+// Dispatches to the acquire- or release-specific reply routine.
+void TAcquireReleaseDiskProxyActor::ReplyAndDie(
+    const TActorContext& ctx,
+    NProto::TError error)
+{
+    switch (OperationType) {
+        case EOperationType::AcquireDisk:
+            ReplyAndDieAcquire(ctx, std::move(error));
+            return;
+        case EOperationType::ReleaseDisk:
+            ReplyAndDieRelease(ctx, std::move(error));
+            return;
+    }
+}
+
+// Replies to the original acquire request and terminates the actor.
+// The worker's own error is reported when `error` carries no error and a
+// worker result is stored; otherwise `error` is reported as is.
+void TAcquireReleaseDiskProxyActor::ReplyAndDieAcquire(
+    const TActorContext& ctx,
+    NProto::TError error)
+{
+    auto* msg = GetFinishedOperationResponse<
+        NAcquireReleaseDevices::TDevicesAcquireFinished>();
+
+    auto response = std::make_unique(
+        !HasError(error) && msg ? std::move(msg->Error) : std::move(error));
+
+    // Devices are returned only when the final status is not an error.
+    if (!HasError(response->GetError()) && msg) {
+        response->Record.MutableDevices()->Reserve(msg->Devices.size());
+
+        for (auto& device: msg->Devices) {
+            ToLogicalBlocks(device, LogicalBlockSize);
+            *response->Record.AddDevices() = std::move(device);
+        }
+    }
+
+    NCloud::Reply(ctx, *RequestInfo, std::move(response));
+    NCloud::Send(
+        ctx,
+        Owner,
+        std::make_unique());
+    Die(ctx);
+}
+
+// Replies to the original release request and terminates the actor; the
+// error is selected the same way as in ReplyAndDieAcquire.
+void TAcquireReleaseDiskProxyActor::ReplyAndDieRelease(
+    const TActorContext& ctx,
+    NProto::TError error)
+{
+    auto* msg = GetFinishedOperationResponse<
+        NAcquireReleaseDevices::TDevicesReleaseFinished>();
+
+    auto response = std::make_unique(
+        !HasError(error) && msg ? std::move(msg->Error) : std::move(error));
+    NCloud::Reply(ctx, *RequestInfo, std::move(response));
+
+    NCloud::Send(
+        ctx,
+        Owner,
+        std::make_unique());
+    Die(ctx);
+}
+
+STFUNC(TAcquireReleaseDiskProxyActor::StateWork)
+{
+    switch (ev->GetTypeRewrite()) {
+        HFunc(TEvents::TEvPoisonPill, HandlePoisonPill);
+
+        HFunc(
+            NAcquireReleaseDevices::TEvDevicesAcquireFinished,
+            HandleDevicesAcquireFinished);
+        HFunc(
+            TEvDiskRegistryPrivate::TEvFinishAcquireDiskResponse,
+            HandleFinishAcquireDiskResponse);
+
+        HFunc(
+            NAcquireReleaseDevices::TEvDevicesReleaseFinished,
+            HandleDevicesReleaseFinished);
+        HFunc(
+            TEvDiskRegistryPrivate::TEvRemoveDiskSessionResponse,
+            HandleRemoveDiskSessionResponse);
+
+        default:
+            HandleUnexpectedEvent(
+                ev,
+                TBlockStoreComponents::DISK_REGISTRY_WORKER);
+            break;
+    }
+}
+
+// On poison pill: notifies the worker if one is still tracked in WorkerId,
+// then replies with a "tablet is dead" E_REJECTED error and dies.
+void TAcquireReleaseDiskProxyActor::HandlePoisonPill(
+    const TEvents::TEvPoisonPill::TPtr& ev,
+    const TActorContext& ctx)
+{
+    Y_UNUSED(ev);
+
+    if (WorkerId) {
+        NCloud::Send(
+            ctx,
+            WorkerId.value(),
+            std::make_unique());
+    }
+
+    ReplyAndDie(ctx, MakeTabletIsDeadError(E_REJECTED, __LOCATION__));
+}
+
+void TAcquireReleaseDiskProxyActor::HandleDevicesAcquireFinished(
+    const NAcquireReleaseDevices::TEvDevicesAcquireFinished::TPtr& ev,
+    const TActorContext&
ctx)
+{
+    Y_ABORT_UNLESS(OperationType == EOperationType::AcquireDisk);
+    SendOperationFinishedToOwner<
+        TEvDiskRegistryPrivate::TEvFinishAcquireDiskRequest>(ctx, ev);
+}
+
+// Owner has processed the finish-acquire request: reply with the stored
+// worker result (an empty error lets ReplyAndDie pick the worker's status).
+void TAcquireReleaseDiskProxyActor::HandleFinishAcquireDiskResponse(
+    const TEvDiskRegistryPrivate::TEvFinishAcquireDiskResponse::TPtr& ev,
+    const TActorContext& ctx)
+{
+    Y_ABORT_UNLESS(OperationType == EOperationType::AcquireDisk);
+    Y_UNUSED(ev);
+
+    ReplyAndDie(ctx, {});
+}
+
+void TAcquireReleaseDiskProxyActor::HandleDevicesReleaseFinished(
+    const NAcquireReleaseDevices::TEvDevicesReleaseFinished::TPtr& ev,
+    const NActors::TActorContext& ctx)
+{
+    Y_ABORT_UNLESS(OperationType == EOperationType::ReleaseDisk);
+    SendOperationFinishedToOwner<
+        TEvDiskRegistryPrivate::TEvRemoveDiskSessionRequest>(ctx, ev);
+}
+
+// Owner has removed the disk session: finish with the error it reported.
+void TAcquireReleaseDiskProxyActor::HandleRemoveDiskSessionResponse(
+    const TEvDiskRegistryPrivate::TEvRemoveDiskSessionResponse::TPtr& ev,
+    const TActorContext& ctx)
+{
+    Y_ABORT_UNLESS(OperationType == EOperationType::ReleaseDisk);
+    ReplyAndDie(ctx, ev->Get()->GetError());
+}
+
+} // namespace
+
+////////////////////////////////////////////////////////////////////////////////
+
+// Handles an AcquireDisk request. Marks the acquire as started in State,
+// drops devices located at unavailable agents and, if anything is left,
+// registers a proxy actor (EOperationType::AcquireDisk) that performs the
+// acquire and replies to the sender. Replies S_ALREADY right away when no
+// device is left to acquire.
+void TDiskRegistryActor::HandleAcquireDisk(
+    const TEvDiskRegistry::TEvAcquireDiskRequest::TPtr& ev,
+    const TActorContext& ctx)
+{
+    BLOCKSTORE_DISK_REGISTRY_COUNTER(AcquireDisk);
+
+    auto replyWithError = [&](auto error)
+    {
+        auto response =
+            std::make_unique(
+                std::move(error));
+        NCloud::Reply(ctx, *ev, std::move(response));
+    };
+
+    const auto* msg = ev->Get();
+
+    auto clientId = msg->Record.GetHeaders().GetClientId();
+    auto diskId = msg->Record.GetDiskId();
+
+    LOG_DEBUG(
+        ctx,
+        TBlockStoreComponents::DISK_REGISTRY,
+        "[%lu] Received AcquireDisk request: "
+        "DiskId=%s, ClientId=%s, AccessMode=%u, MountSeqNumber=%lu"
+        ", VolumeGeneration=%u",
+        TabletID(),
+        diskId.c_str(),
+        clientId.c_str(),
+        static_cast(msg->Record.GetAccessMode()),
+        msg->Record.GetMountSeqNumber(),
+        msg->Record.GetVolumeGeneration());
+
+    TDiskInfo diskInfo;
+    if (auto error = State->StartAcquireDisk(diskId, diskInfo); HasError(error))
+    {
+        LOG_ERROR(
+            ctx,
+            TBlockStoreComponents::DISK_REGISTRY,
+            "[%s] AcquireDisk %s error: %s",
+            clientId.c_str(),
+            diskId.c_str(),
+            FormatError(error).c_str());
+
+        replyWithError(std::move(error));
+        return;
+    }
+
+    if (!State->FilterDevicesAtUnavailableAgents(diskInfo)) {
+        LOG_DEBUG(
+            ctx,
+            TBlockStoreComponents::DISK_REGISTRY,
+            "AcquireDisk %s. Nothing to acquire",
+            diskId.c_str());
+
+        // No worker will run, so the in-progress mark set by
+        // StartAcquireDisk has to be cleared here.
+        State->FinishAcquireDisk(diskId);
+        replyWithError(MakeError(S_ALREADY, "Nothing to acquire"));
+        return;
+    }
+
+    TVector devices = ExtractDevicesFromDiskInfo(diskInfo);
+    auto actor = NCloud::Register(
+        ctx,
+        ctx.SelfID,
+        NAcquireReleaseDevices::TAcquireReleaseDevicesInfo{
+            .Devices = std::move(devices),
+            .DiskId = std::move(diskId),
+            .ClientId = std::move(clientId),
+            .AccessMode = msg->Record.GetAccessMode(),
+            .MountSeqNumber = msg->Record.GetMountSeqNumber(),
+            .VolumeGeneration = msg->Record.GetVolumeGeneration(),
+            .RequestTimeout = Config->GetAgentRequestTimeout(),
+            .MuteIOErrors = false,
+        },
+        diskInfo.LogicalBlockSize,
+        CreateRequestInfo(ev->Sender, ev->Cookie, msg->CallContext),
+        TAcquireReleaseDiskProxyActor::EOperationType::AcquireDisk);
+    Actors.insert(actor);
+}
+
+// Completes an acquire on behalf of the proxy actor: clears the in-progress
+// mark, hands the sent agent requests to OnDiskAcquired and acknowledges.
+void TDiskRegistryActor::HandleFinishAcquireDisk(
+    const TEvDiskRegistryPrivate::TEvFinishAcquireDiskRequest::TPtr& ev,
+    const TActorContext& ctx)
+{
+    auto* msg = ev->Get();
+
+    State->FinishAcquireDisk(msg->DiskId);
+
+    OnDiskAcquired(std::move(msg->SentRequests));
+
+    auto response = std::make_unique<
+        TEvDiskRegistryPrivate::TEvFinishAcquireDiskResponse>();
+    NCloud::Reply(ctx, *ev, std::move(response));
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+// Handles a ReleaseDisk request. Rejects empty client/disk ids with
+// E_ARGUMENT, looks the disk up in State, drops devices located at
+// unavailable agents and, if anything is left, registers a proxy actor
+// (EOperationType::ReleaseDisk). Replies S_ALREADY when no device is left.
+void TDiskRegistryActor::HandleReleaseDisk(
+    const TEvDiskRegistry::TEvReleaseDiskRequest::TPtr& ev,
+    const TActorContext& ctx)
+{
+    BLOCKSTORE_DISK_REGISTRY_COUNTER(ReleaseDisk);
+
+    auto replyWithError = [&](auto error)
+    {
+        auto response =
+            std::make_unique(
+                std::move(error));
+        NCloud::Reply(ctx, *ev, std::move(response));
+    };
+
+    auto* msg = ev->Get();
+    // References into the request record: both strings are moved out of the
+    // record when the proxy actor is registered below.
+    TString& diskId = *msg->Record.MutableDiskId();
+    TString& clientId = *msg->Record.MutableHeaders()->MutableClientId();
+    ui32 volumeGeneration = msg->Record.GetVolumeGeneration();
+
+    LOG_DEBUG(
+        ctx,
+        TBlockStoreComponents::DISK_REGISTRY,
+        "[%lu] Received ReleaseDisk request: DiskId=%s, ClientId=%s"
+        ", VolumeGeneration=%u",
+        TabletID(),
+        diskId.c_str(),
+        clientId.c_str(),
+        volumeGeneration);
+
+    if (!clientId) {
+        replyWithError(MakeError(E_ARGUMENT, "empty client id"));
+        return;
+    }
+
+    if (!diskId) {
+        replyWithError(MakeError(E_ARGUMENT, "empty disk id"));
+        return;
+    }
+
+    TDiskInfo diskInfo;
+
+    if (const auto error = State->GetDiskInfo(diskId, diskInfo);
+        HasError(error))
+    {
+        LOG_ERROR(
+            ctx,
+            TBlockStoreComponents::DISK_REGISTRY,
+            "ReleaseDisk %s. GetDiskInfo error: %s",
+            diskId.c_str(),
+            FormatError(error).c_str());
+
+        replyWithError(error);
+        return;
+    }
+
+    if (!State->FilterDevicesAtUnavailableAgents(diskInfo)) {
+        LOG_WARN(
+            ctx,
+            TBlockStoreComponents::DISK_REGISTRY,
+            "ReleaseDisk %s. Nothing to release",
+            diskId.c_str());
+
+        replyWithError(MakeError(S_ALREADY, "Nothing to release"));
+        return;
+    }
+
+    auto actor = NCloud::Register(
+        ctx,
+        ctx.SelfID,
+        NAcquireReleaseDevices::TAcquireReleaseDevicesInfo{
+            .Devices = ExtractDevicesFromDiskInfo(diskInfo),
+            .DiskId = std::move(diskId),
+            .ClientId = std::move(clientId),
+            .AccessMode = std::nullopt,
+            .MountSeqNumber = std::nullopt,
+            .VolumeGeneration = volumeGeneration,
+            .RequestTimeout = Config->GetAgentRequestTimeout(),
+            .MuteIOErrors = false,
+        },
+        diskInfo.LogicalBlockSize,
+        CreateRequestInfo(ev->Sender, ev->Cookie, msg->CallContext),
+        TAcquireReleaseDiskProxyActor::EOperationType::ReleaseDisk);
+    Actors.insert(actor);
+}
+
+// Completes a release on behalf of the proxy actor: hands the sent agent
+// requests to OnDiskReleased, clears the in-progress mark and acknowledges.
+void TDiskRegistryActor::HandleRemoveDiskSession(
+    const TEvDiskRegistryPrivate::TEvRemoveDiskSessionRequest::TPtr& ev,
+    const TActorContext& ctx)
+{
+    const auto* msg = ev->Get();
+
+    OnDiskReleased(msg->SentRequests);
+
+    State->FinishAcquireDisk(msg->DiskId);
+    auto response = std::make_unique<
+        TEvDiskRegistryPrivate::TEvRemoveDiskSessionResponse>();
+    NCloud::Reply(ctx, *ev, std::move(response));
+}
+
+} // namespace NCloud::NBlockStore::NStorage
diff --git a/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor_release.cpp b/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor_release.cpp
deleted file mode 100644
index 2cff5a6d1a..0000000000
--- a/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor_release.cpp
+++ /dev/null
@@ -1,373 +0,0 @@
-#include "disk_registry_actor.h"
-
-#include
-
-#include
-
-namespace NCloud::NBlockStore::NStorage {
-
-using namespace NActors;
-
-using namespace NKikimr::NTabletFlatExecutor;
-
-namespace {
-
-////////////////////////////////////////////////////////////////////////////////
-
-class TReleaseDiskActor final
-    : public TActorBootstrapped
-{
-private:
-    const TActorId Owner; -
const TRequestInfoPtr RequestInfo; - const TString DiskId; - const TString ClientId; - const ui32 VolumeGeneration; - const TDuration RequestTimeout; - - TVector Devices; - int PendingRequests = 0; - - TVector SentReleaseRequests; - -public: - TReleaseDiskActor( - const TActorId& owner, - TRequestInfoPtr requestInfo, - TString diskId, - TString clientId, - ui32 volumeGeneration, - TDuration requestTimeout, - TVector devices); - - void Bootstrap(const TActorContext& ctx); - -private: - void PrepareRequest(NProto::TReleaseDevicesRequest& request); - void RemoveDiskSession(const TActorContext& ctx); - void ReplyAndDie(const TActorContext& ctx, NProto::TError error); - - void OnReleaseResponse( - const TActorContext& ctx, - ui64 cookie, - NProto::TError error); - -private: - STFUNC(StateWork); - - void HandlePoisonPill( - const TEvents::TEvPoisonPill::TPtr& ev, - const TActorContext& ctx); - - void HandleReleaseDevicesResponse( - const TEvDiskAgent::TEvReleaseDevicesResponse::TPtr& ev, - const TActorContext& ctx); - - void HandleReleaseDevicesUndelivery( - const TEvDiskAgent::TEvReleaseDevicesRequest::TPtr& ev, - const TActorContext& ctx); - - void HandleRemoveDiskSessionResponse( - const TEvDiskRegistryPrivate::TEvRemoveDiskSessionResponse::TPtr& ev, - const TActorContext& ctx); - - void HandleTimeout( - const TEvents::TEvWakeup::TPtr& ev, - const TActorContext& ctx); - - TString LogTargets() const; -}; - -//////////////////////////////////////////////////////////////////////////////// - -TReleaseDiskActor::TReleaseDiskActor( - const TActorId& owner, - TRequestInfoPtr requestInfo, - TString diskId, - TString clientId, - ui32 volumeGeneration, - TDuration requestTimeout, - TVector devices) - : Owner(owner) - , RequestInfo(std::move(requestInfo)) - , DiskId(std::move(diskId)) - , ClientId(std::move(clientId)) - , VolumeGeneration(volumeGeneration) - , RequestTimeout(requestTimeout) - , Devices(std::move(devices)) -{} - -void 
TReleaseDiskActor::PrepareRequest(NProto::TReleaseDevicesRequest& request) -{ - request.MutableHeaders()->SetClientId(ClientId); - request.SetDiskId(DiskId); - request.SetVolumeGeneration(VolumeGeneration); -} - -void TReleaseDiskActor::Bootstrap(const TActorContext& ctx) -{ - Become(&TThis::StateWork); - - SortBy(Devices, [] (auto& d) { - return d.GetNodeId(); - }); - - auto it = Devices.begin(); - while (it != Devices.end()) { - auto request = - std::make_unique(); - NProto::TReleaseDevicesRequest requestCopy; - PrepareRequest(request->Record); - PrepareRequest(requestCopy); - - const ui32 nodeId = it->GetNodeId(); - const TString& agentId = it->GetAgentId(); - - for (; it != Devices.end() && it->GetNodeId() == nodeId; ++it) { - *request->Record.AddDeviceUUIDs() = it->GetDeviceUUID(); - *requestCopy.AddDeviceUUIDs() = it->GetDeviceUUID(); - } - - ++PendingRequests; - SentReleaseRequests.emplace_back(agentId, std::move(requestCopy)); - NCloud::Send( - ctx, - MakeDiskAgentServiceId(nodeId), - std::move(request), - nodeId); - } - - ctx.Schedule(RequestTimeout, new TEvents::TEvWakeup()); -} - -void TReleaseDiskActor::RemoveDiskSession(const TActorContext& ctx) -{ - auto request = - std::make_unique( - DiskId, - ClientId, - std::move(SentReleaseRequests)); - - NCloud::Send(ctx, Owner, std::move(request)); -} - -void TReleaseDiskActor::ReplyAndDie(const TActorContext& ctx, NProto::TError error) -{ - auto response = std::make_unique( - std::move(error)); - NCloud::Reply(ctx, *RequestInfo, std::move(response)); - - NCloud::Send( - ctx, - Owner, - std::make_unique()); - - Die(ctx); -} - -void TReleaseDiskActor::OnReleaseResponse( - const TActorContext& ctx, - ui64 cookie, - NProto::TError error) -{ - Y_ABORT_UNLESS(PendingRequests > 0); - - if (HasError(error)) { - LOG_ERROR(ctx, TBlockStoreComponents::DISK_REGISTRY_WORKER, - "ReleaseDevices %s error: %s, %llu", - LogTargets().c_str(), - FormatError(error).c_str(), - cookie); - } - - if (--PendingRequests == 0) { - 
RemoveDiskSession(ctx); - } -} - -void TReleaseDiskActor::HandleReleaseDevicesResponse( - const TEvDiskAgent::TEvReleaseDevicesResponse::TPtr& ev, - const TActorContext& ctx) -{ - OnReleaseResponse(ctx, ev->Cookie, ev->Get()->GetError()); -} - -void TReleaseDiskActor::HandleReleaseDevicesUndelivery( - const TEvDiskAgent::TEvReleaseDevicesRequest::TPtr& ev, - const TActorContext& ctx) -{ - OnReleaseResponse(ctx, ev->Cookie, MakeError(E_REJECTED, "not delivered")); -} - -void TReleaseDiskActor::HandleRemoveDiskSessionResponse( - const TEvDiskRegistryPrivate::TEvRemoveDiskSessionResponse::TPtr& ev, - const TActorContext& ctx) -{ - const auto* msg = ev->Get(); - - ReplyAndDie(ctx, msg->GetError()); -} - -void TReleaseDiskActor::HandlePoisonPill( - const TEvents::TEvPoisonPill::TPtr& ev, - const TActorContext& ctx) -{ - Y_UNUSED(ev); - - ReplyAndDie(ctx, MakeTabletIsDeadError(E_REJECTED, __LOCATION__)); -} - -void TReleaseDiskActor::HandleTimeout( - const TEvents::TEvWakeup::TPtr& ev, - const TActorContext& ctx) -{ - Y_UNUSED(ev); - - const auto err = TStringBuilder() - << "TReleaseDiskActor timeout." 
- << " DiskId: " << DiskId - << " ClientId: " << ClientId - << " Targets: " << LogTargets() - << " VolumeGeneration: " << VolumeGeneration - << " PendingRequests: " << PendingRequests; - - LOG_WARN(ctx, TBlockStoreComponents::DISK_REGISTRY_WORKER, err); - - ReplyAndDie(ctx, MakeError(E_TIMEOUT, err)); -} - -STFUNC(TReleaseDiskActor::StateWork) -{ - switch (ev->GetTypeRewrite()) { - HFunc(TEvents::TEvPoisonPill, HandlePoisonPill); - HFunc(TEvents::TEvWakeup, HandleTimeout); - - HFunc(TEvDiskAgent::TEvReleaseDevicesResponse, - HandleReleaseDevicesResponse); - HFunc(TEvDiskAgent::TEvReleaseDevicesRequest, - HandleReleaseDevicesUndelivery); - - HFunc(TEvDiskRegistryPrivate::TEvRemoveDiskSessionResponse, - HandleRemoveDiskSessionResponse); - - default: - HandleUnexpectedEvent(ev, TBlockStoreComponents::DISK_REGISTRY_WORKER); - break; - } -} - -//////////////////////////////////////////////////////////////////////////////// - -TString TReleaseDiskActor::LogTargets() const -{ - return LogDevices(Devices); -} - -} // namespace - -//////////////////////////////////////////////////////////////////////////////// - -void TDiskRegistryActor::HandleReleaseDisk( - const TEvDiskRegistry::TEvReleaseDiskRequest::TPtr& ev, - const TActorContext& ctx) -{ - BLOCKSTORE_DISK_REGISTRY_COUNTER(ReleaseDisk); - - auto replyWithError = [&] (auto error) { - auto response = std::make_unique( - std::move(error)); - NCloud::Reply(ctx, *ev, std::move(response)); - }; - - auto* msg = ev->Get(); - TString& diskId = *msg->Record.MutableDiskId(); - TString& clientId = *msg->Record.MutableHeaders()->MutableClientId(); - ui32 volumeGeneration = msg->Record.GetVolumeGeneration(); - - LOG_DEBUG(ctx, TBlockStoreComponents::DISK_REGISTRY, - "[%lu] Received ReleaseDisk request: DiskId=%s, ClientId=%s" - ", VolumeGeneration=%u", - TabletID(), - diskId.c_str(), - clientId.c_str(), - volumeGeneration); - - if (!clientId) { - replyWithError(MakeError(E_ARGUMENT, "empty client id")); - return; - } - - if 
(!diskId) { - replyWithError(MakeError(E_ARGUMENT, "empty disk id")); - return; - } - - TDiskInfo diskInfo; - const auto error = State->GetDiskInfo(diskId, diskInfo); - if (HasError(error)) { - LOG_ERROR(ctx, TBlockStoreComponents::DISK_REGISTRY, - "ReleaseDisk %s. GetDiskInfo error: %s", - diskId.c_str(), - FormatError(error).c_str()); - - replyWithError(error); - return; - } - - if (!State->FilterDevicesAtUnavailableAgents(diskInfo)) { - LOG_WARN(ctx, TBlockStoreComponents::DISK_REGISTRY, - "ReleaseDisk %s. Nothing to release", - diskId.c_str()); - - replyWithError(MakeError(S_ALREADY, {})); - return; - } - - TVector devices = std::move(diskInfo.Devices); - for (auto& migration: diskInfo.Migrations) { - devices.push_back(std::move(*migration.MutableTargetDevice())); - } - for (auto& replica: diskInfo.Replicas) { - for (auto& device: replica) { - devices.push_back(std::move(device)); - } - } - - auto requestInfo = CreateRequestInfo( - ev->Sender, - ev->Cookie, - msg->CallContext); - - auto actor = NCloud::Register( - ctx, - ctx.SelfID, - std::move(requestInfo), - std::move(diskId), - std::move(clientId), - volumeGeneration, - Config->GetAgentRequestTimeout(), - std::move(devices)); - - Actors.insert(actor); -} - -void TDiskRegistryActor::HandleRemoveDiskSession( - const TEvDiskRegistryPrivate::TEvRemoveDiskSessionRequest::TPtr& ev, - const TActorContext& ctx) -{ - const auto* msg = ev->Get(); - - OnDiskReleased(msg->SentRequests); - - auto requestInfo = CreateRequestInfo( - ev->Sender, - ev->Cookie, - msg->CallContext); - - State->FinishAcquireDisk(msg->DiskId); - auto response = - std::make_unique(); - NCloud::Reply(ctx, *ev, std::move(response)); -} - -} // namespace NCloud::NBlockStore::NStorage diff --git a/cloud/blockstore/libs/storage/disk_registry/disk_registry_private.h b/cloud/blockstore/libs/storage/disk_registry/disk_registry_private.h index 1c51f0d77b..f91bee7b1a 100644 --- a/cloud/blockstore/libs/storage/disk_registry/disk_registry_private.h +++ 
b/cloud/blockstore/libs/storage/disk_registry/disk_registry_private.h @@ -5,6 +5,7 @@ #include #include #include +#include #include #include @@ -74,19 +75,6 @@ struct TUserNotificationKey //////////////////////////////////////////////////////////////////////////////// -struct TAgentAcquireDevicesCachedRequest -{ - TString AgentId; - NProto::TAcquireDevicesRequest Request; - TInstant RequestTime; -}; - -struct TAgentReleaseDevicesCachedRequest -{ - TString AgentId; - NProto::TReleaseDevicesRequest Request; -}; - struct TCachedAcquireKey { TString DiskId; @@ -104,8 +92,9 @@ struct TCachedAcquireKey } }; -using TCachedAcquireRequests = - TMap; +using TCachedAcquireRequests = TMap< + TCachedAcquireKey, + NAcquireReleaseDevices::TAgentAcquireDevicesCachedRequest>; //////////////////////////////////////////////////////////////////////////////// @@ -212,12 +201,14 @@ struct TEvDiskRegistryPrivate { TString DiskId; TString ClientId; - TVector SentRequests; + TVector + SentRequests; TFinishAcquireDiskRequest( - TString diskId, - TString clientId, - TVector sentRequests) + TString diskId, + TString clientId, + TVector + sentRequests) : DiskId(std::move(diskId)) , ClientId(std::move(clientId)) , SentRequests(std::move(sentRequests)) @@ -235,12 +226,14 @@ struct TEvDiskRegistryPrivate { TString DiskId; TString ClientId; - TVector SentRequests; + TVector + SentRequests; TRemoveDiskSessionRequest( - TString diskId, - TString clientId, - TVector sentRequests) + TString diskId, + TString clientId, + TVector + sentRequests) : DiskId(std::move(diskId)) , ClientId(std::move(clientId)) , SentRequests(std::move(sentRequests)) diff --git a/cloud/blockstore/libs/storage/disk_registry/disk_registry_state.cpp b/cloud/blockstore/libs/storage/disk_registry/disk_registry_state.cpp index 51714d32e5..7c2fc1d3d7 100644 --- a/cloud/blockstore/libs/storage/disk_registry/disk_registry_state.cpp +++ b/cloud/blockstore/libs/storage/disk_registry/disk_registry_state.cpp @@ -3470,15 +3470,13 @@ 
NProto::TError TDiskRegistryState::StartAcquireDisk( void TDiskRegistryState::FinishAcquireDisk(const TString& diskId) { - auto it = Disks.find(diskId); + auto* diskPtr = Disks.FindPtr(diskId); - if (it == Disks.end()) { + if (!diskPtr) { return; } - auto& disk = it->second; - - disk.AcquireInProgress = false; + diskPtr->AcquireInProgress = false; } bool TDiskRegistryState::IsAcquireInProgress(const TString& diskId) const diff --git a/cloud/blockstore/libs/storage/disk_registry/disk_registry_ut_session.cpp b/cloud/blockstore/libs/storage/disk_registry/disk_registry_ut_session.cpp index a8446be4a7..13f731e31c 100644 --- a/cloud/blockstore/libs/storage/disk_registry/disk_registry_ut_session.cpp +++ b/cloud/blockstore/libs/storage/disk_registry/disk_registry_ut_session.cpp @@ -507,7 +507,7 @@ Y_UNIT_TEST_SUITE(TDiskRegistryTest) runtime->SetObserverFunc( [&] (TAutoPtr& event) { switch (event->GetTypeRewrite()) { - case TEvDiskRegistryPrivate::EvFinishAcquireDiskResponse: { + case NAcquireReleaseDevices::EvDevicesAcquireFinished: { finished = true; break; } @@ -618,7 +618,7 @@ Y_UNIT_TEST_SUITE(TDiskRegistryTest) { diskRegistry.SendAcquireDiskRequest("disk-1", "session-1"); auto response = diskRegistry.RecvAcquireDiskResponse(); - UNIT_ASSERT_VALUES_EQUAL(S_OK, response->GetStatus()); + UNIT_ASSERT_VALUES_EQUAL(S_ALREADY, response->GetStatus()); UNIT_ASSERT_VALUES_EQUAL(0, response->Record.DevicesSize()); } @@ -631,7 +631,7 @@ Y_UNIT_TEST_SUITE(TDiskRegistryTest) NProto::VOLUME_ACCESS_READ_ONLY); auto response = diskRegistry.RecvAcquireDiskResponse(); - UNIT_ASSERT_VALUES_EQUAL(S_OK, response->GetStatus()); + UNIT_ASSERT_VALUES_EQUAL(S_ALREADY, response->GetStatus()); UNIT_ASSERT_VALUES_EQUAL(0, response->Record.DevicesSize()); } @@ -676,10 +676,7 @@ Y_UNIT_TEST_SUITE(TDiskRegistryTest) } { - auto response = diskRegistry.RemoveDiskSession( - "disk-1", - "session-1", - TVector()); + auto response = diskRegistry.ReleaseDisk("disk-1", "session-1"); 
UNIT_ASSERT(!HasError(response->GetError())); } } diff --git a/cloud/blockstore/libs/storage/disk_registry/testlib/test_env.h b/cloud/blockstore/libs/storage/disk_registry/testlib/test_env.h index 1539d257c0..2c5d82fe82 100644 --- a/cloud/blockstore/libs/storage/disk_registry/testlib/test_env.h +++ b/cloud/blockstore/libs/storage/disk_registry/testlib/test_env.h @@ -780,18 +780,6 @@ class TDiskRegistryClient std::move(devices)); } - auto CreateRemoveDiskSessionRequest( - TString diskId, - TString clientId, - TVector sentRequests) - { - return std::make_unique< - TEvDiskRegistryPrivate::TEvRemoveDiskSessionRequest>( - std::move(diskId), - std::move(clientId), - std::move(sentRequests)); - } - auto CreateUpdateAgentStatsRequest(NProto::TAgentStats stats) { auto request = std::make_unique(); diff --git a/cloud/blockstore/libs/storage/disk_registry/ya.make b/cloud/blockstore/libs/storage/disk_registry/ya.make index fc8d924270..49000e240d 100644 --- a/cloud/blockstore/libs/storage/disk_registry/ya.make +++ b/cloud/blockstore/libs/storage/disk_registry/ya.make @@ -1,7 +1,7 @@ LIBRARY() SRCS( - disk_registry_actor_acquire.cpp + disk_registry_actor_acquire_release.cpp disk_registry_actor_allocate.cpp disk_registry_actor_backup_state.cpp disk_registry_actor_change_disk_device.cpp @@ -34,7 +34,6 @@ SRCS( disk_registry_actor_query_available_storage.cpp disk_registry_actor_register.cpp disk_registry_actor_regular.cpp - disk_registry_actor_release.cpp disk_registry_actor_replace.cpp disk_registry_actor_restore_state.cpp disk_registry_actor_resume_device.cpp