Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 2725: mv acquire/release disk actors from DR to libs/storage/core #2855

Closed
wants to merge 11 commits into from

Large diffs are not rendered by default.

106 changes: 106 additions & 0 deletions cloud/blockstore/libs/storage/core/acquire_release_devices_actors.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
#pragma once

#include <cloud/blockstore/libs/kikimr/events.h>
#include <cloud/blockstore/libs/storage/api/disk_agent.h>

#include <contrib/ydb/library/actors/core/actor.h>
#include <contrib/ydb/library/actors/interconnect/types.h>

namespace NCloud::NBlockStore::NStorage::NAcquireReleaseDevices {

struct TAgentAcquireDevicesCachedRequest
{
TString AgentId;
NProto::TAcquireDevicesRequest Request;
TInstant RequestTime;
};

struct TAgentReleaseDevicesCachedRequest
{
TString AgentId;
NProto::TReleaseDevicesRequest Request;
};

struct TDevicesAcquireFinished
{
TString DiskId;
TString ClientId;
TVector<TAgentAcquireDevicesCachedRequest> SentRequests;
TVector<NProto::TDeviceConfig> Devices;
NProto::TError Error;

TDevicesAcquireFinished(
TString diskId,
TString clientId,
TVector<TAgentAcquireDevicesCachedRequest> sentRequests,
TVector<NProto::TDeviceConfig> devices,
NProto::TError error)
: DiskId(std::move(diskId))
, ClientId(std::move(clientId))
, SentRequests(std::move(sentRequests))
, Devices(std::move(devices))
, Error(std::move(error))
{}
};

struct TDevicesReleaseFinished
{
TString DiskId;
TString ClientId;
TVector<TAgentReleaseDevicesCachedRequest> SentRequests;
NProto::TError Error;

TDevicesReleaseFinished(
TString diskId,
TString clientId,
TVector<TAgentReleaseDevicesCachedRequest> sentRequests,
NProto::TError error)
: DiskId(std::move(diskId))
, ClientId(std::move(clientId))
, SentRequests(std::move(sentRequests))
, Error(std::move(error))
{}
};

enum EEvents
{
EvBegin,

EvDevicesAcquireFinished,
EvDevicesReleaseFinished,

EvEnd
};

using TEvDevicesAcquireFinished =
TRequestEvent<TDevicesAcquireFinished, EvDevicesAcquireFinished>;

using TEvDevicesReleaseFinished =
TRequestEvent<TDevicesReleaseFinished, EvDevicesReleaseFinished>;

struct TAcquireReleaseDevicesInfo
{
TVector<NProto::TDeviceConfig> Devices;
TString DiskId;
TString ClientId;
std::optional<NProto::EVolumeAccessMode>
AccessMode; // Only AcquireDevicesActor need it.
std::optional<ui64> MountSeqNumber; // Only AcquireDevicesActor need it.
ui32 VolumeGeneration;
TDuration RequestTimeout;
bool MuteIOErrors;
};

TActorId CreateAcquireDevicesActor(
const NActors::TActorContext& ctx,
const TActorId& owner,
TAcquireReleaseDevicesInfo acquireDevicesInfo,
NActors::NLog::EComponent component);

TActorId CreateReleaseDevicesActor(
const NActors::TActorContext& ctx,
const TActorId& owner,
TAcquireReleaseDevicesInfo releaseDevicesInfo,
NActors::NLog::EComponent component);

} // namespace NCloud::NBlockStore::NStorage::NAcquireReleaseDevices
11 changes: 11 additions & 0 deletions cloud/blockstore/libs/storage/core/proto_helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -444,4 +444,15 @@ ui64 GetVolumeRequestId(const TEvDiskAgent::TEvZeroDeviceBlocksRequest& request)
return request.Record.GetVolumeRequestId();
}

TString LogDevices(const TVector<NProto::TDeviceConfig>& devices)
{
TStringBuilder sb;
sb << "( ";
for (const auto& d: devices) {
sb << d.GetDeviceUUID() << "@" << d.GetAgentId() << " ";
}
sb << ")";
return sb;
}

} // namespace NCloud::NBlockStore::NStorage
1 change: 1 addition & 0 deletions cloud/blockstore/libs/storage/core/proto_helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -223,4 +223,5 @@ TBlockRange64 BuildRequestBlockRange(
ui64 GetVolumeRequestId(const TEvDiskAgent::TEvWriteDeviceBlocksRequest& request);
ui64 GetVolumeRequestId(const TEvDiskAgent::TEvZeroDeviceBlocksRequest& request);

TString LogDevices(const TVector<NProto::TDeviceConfig>& devices);
} // namespace NCloud::NBlockStore::NStorage
256 changes: 256 additions & 0 deletions cloud/blockstore/libs/storage/core/release_devices_actor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,256 @@
#include "acquire_release_devices_actors.h"

#include <cloud/blockstore/libs/storage/core/proto_helpers.h>
#include <cloud/storage/core/libs/actors/helpers.h>

#include <contrib/ydb/library/actors/core/actor.h>
#include <contrib/ydb/library/actors/core/actor_bootstrapped.h>
#include <contrib/ydb/library/actors/core/log.h>

#include <util/string/join.h>

namespace NCloud::NBlockStore::NStorage::NAcquireReleaseDevices {

////////////////////////////////////////////////////////////////////////////////

using namespace NActors;

namespace {

////////////////////////////////////////////////////////////////////////////////

class TReleaseDevicesActor final
: public TActorBootstrapped<TReleaseDevicesActor>
{
private:
const TActorId Owner;
TVector<NProto::TDeviceConfig> Devices;
const TString DiskId;
const TString ClientId;
const ui32 VolumeGeneration;
const TDuration RequestTimeout;
bool MuteIOErrors;
NLog::EComponent Component;

int PendingRequests = 0;

TVector<TAgentReleaseDevicesCachedRequest> SentReleaseRequests;

public:
TReleaseDevicesActor(
const TActorId& owner,
TAcquireReleaseDevicesInfo releaseDevicesInfo,
NLog::EComponent component);

void Bootstrap(const TActorContext& ctx);

private:
void PrepareRequest(NProto::TReleaseDevicesRequest& request);
void ReplyAndDie(const TActorContext& ctx, NProto::TError error);

void OnReleaseResponse(
const TActorContext& ctx,
ui64 cookie,
NProto::TError error);

private:
STFUNC(StateWork);

void HandlePoisonPill(
const TEvents::TEvPoisonPill::TPtr& ev,
const TActorContext& ctx);

void HandleReleaseDevicesResponse(
const TEvDiskAgent::TEvReleaseDevicesResponse::TPtr& ev,
const TActorContext& ctx);

void HandleReleaseDevicesUndelivery(
const TEvDiskAgent::TEvReleaseDevicesRequest::TPtr& ev,
const TActorContext& ctx);

void HandleTimeout(
const TEvents::TEvWakeup::TPtr& ev,
const TActorContext& ctx);

TString LogTargets() const;
};

////////////////////////////////////////////////////////////////////////////////

TReleaseDevicesActor::TReleaseDevicesActor(
const TActorId& owner,
TAcquireReleaseDevicesInfo releaseDevicesInfo,
NLog::EComponent component)
: Owner(owner)
, Devices(std::move(releaseDevicesInfo.Devices))
, DiskId(std::move(releaseDevicesInfo.DiskId))
, ClientId(std::move(releaseDevicesInfo.ClientId))
, VolumeGeneration(releaseDevicesInfo.VolumeGeneration)
, RequestTimeout(releaseDevicesInfo.RequestTimeout)
, MuteIOErrors(releaseDevicesInfo.MuteIOErrors)
, Component(component)
{}

void TReleaseDevicesActor::PrepareRequest(
NProto::TReleaseDevicesRequest& request)
{
request.MutableHeaders()->SetClientId(ClientId);
request.SetDiskId(DiskId);
request.SetVolumeGeneration(VolumeGeneration);
}

void TReleaseDevicesActor::Bootstrap(const TActorContext& ctx)
{
Become(&TThis::StateWork);

SortBy(Devices, [](auto& d) { return d.GetNodeId(); });

auto it = Devices.begin();
while (it != Devices.end()) {
auto request =
std::make_unique<TEvDiskAgent::TEvReleaseDevicesRequest>();
NProto::TReleaseDevicesRequest requestCopy;
PrepareRequest(request->Record);
PrepareRequest(requestCopy);

const ui32 nodeId = it->GetNodeId();
const TString& agentId = it->GetAgentId();

for (; it != Devices.end() && it->GetNodeId() == nodeId; ++it) {
*request->Record.AddDeviceUUIDs() = it->GetDeviceUUID();
*requestCopy.AddDeviceUUIDs() = it->GetDeviceUUID();
}

++PendingRequests;
SentReleaseRequests.emplace_back(agentId, std::move(requestCopy));
NCloud::Send(
ctx,
MakeDiskAgentServiceId(nodeId),
std::move(request),
nodeId);
}

ctx.Schedule(RequestTimeout, new TEvents::TEvWakeup());
}

void TReleaseDevicesActor::ReplyAndDie(
const TActorContext& ctx,
NProto::TError error)
{
NCloud::Send(
ctx,
Owner,
std::make_unique<TEvDevicesReleaseFinished>(
DiskId,
ClientId,
std::move(SentReleaseRequests),
std::move(error)));

Die(ctx);
}

void TReleaseDevicesActor::OnReleaseResponse(
const TActorContext& ctx,
ui64 cookie,
NProto::TError error)
{
Y_ABORT_UNLESS(PendingRequests > 0);

if (HasError(error)) {
LOG_LOG(
ctx,
MuteIOErrors ? NLog::PRI_WARN : NLog::PRI_ERROR,
Component,
"ReleaseDevices %s error: %s, %llu",
LogTargets().c_str(),
FormatError(error).c_str(),
cookie);
}

if (--PendingRequests == 0) {
ReplyAndDie(ctx, {});
}
}

void TReleaseDevicesActor::HandleReleaseDevicesResponse(
const TEvDiskAgent::TEvReleaseDevicesResponse::TPtr& ev,
const TActorContext& ctx)
{
OnReleaseResponse(ctx, ev->Cookie, ev->Get()->GetError());
}

void TReleaseDevicesActor::HandleReleaseDevicesUndelivery(
const TEvDiskAgent::TEvReleaseDevicesRequest::TPtr& ev,
const TActorContext& ctx)
{
OnReleaseResponse(ctx, ev->Cookie, MakeError(E_REJECTED, "not delivered"));
}

void TReleaseDevicesActor::HandlePoisonPill(
const TEvents::TEvPoisonPill::TPtr& ev,
const TActorContext& ctx)
{
Y_UNUSED(ev);

ReplyAndDie(ctx, MakeTabletIsDeadError(E_REJECTED, __LOCATION__));
}

void TReleaseDevicesActor::HandleTimeout(
const TEvents::TEvWakeup::TPtr& ev,
const TActorContext& ctx)
{
Y_UNUSED(ev);

const auto err = TStringBuilder()
<< "TReleaseDevicesActor timeout." << " DiskId: " << DiskId
<< " ClientId: " << ClientId
<< " Targets: " << LogTargets()
<< " VolumeGeneration: " << VolumeGeneration
<< " PendingRequests: " << PendingRequests;

LOG_WARN(ctx, Component, err);

ReplyAndDie(ctx, MakeError(E_TIMEOUT, std::move(err)));
}

STFUNC(TReleaseDevicesActor::StateWork)
{
switch (ev->GetTypeRewrite()) {
HFunc(TEvents::TEvPoisonPill, HandlePoisonPill);
HFunc(TEvents::TEvWakeup, HandleTimeout);

HFunc(
TEvDiskAgent::TEvReleaseDevicesResponse,
HandleReleaseDevicesResponse);
HFunc(
TEvDiskAgent::TEvReleaseDevicesRequest,
HandleReleaseDevicesUndelivery);

default:
HandleUnexpectedEvent(ev, Component);
break;
}
}

////////////////////////////////////////////////////////////////////////////////

TString TReleaseDevicesActor::LogTargets() const
{
return LogDevices(Devices);
}

} // namespace

TActorId CreateReleaseDevicesActor(
const NActors::TActorContext& ctx,
const TActorId& owner,
TAcquireReleaseDevicesInfo releaseDevicesInfo,
NActors::NLog::EComponent component)
{
return NCloud::Register<TReleaseDevicesActor>(
ctx,
owner,
releaseDevicesInfo,
component);
}
} // namespace NCloud::NBlockStore::NStorage::NAcquireReleaseDevices
Loading
Loading