Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 2956: add checkRange request #2960

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
1 change: 1 addition & 0 deletions cloud/blockstore/libs/service/auth_scheme.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ TPermissionList GetRequestPermissions(EBlockStoreRequest requestType)
case EBlockStoreRequest::ZeroBlocks:
case EBlockStoreRequest::ReadBlocksLocal:
case EBlockStoreRequest::WriteBlocksLocal:
case EBlockStoreRequest::CheckRange:
return CreatePermissionList({});

case EBlockStoreRequest::MountVolume:
Expand Down
1 change: 1 addition & 0 deletions cloud/blockstore/libs/service/request.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ using TWriteBlocksLocalResponse = TWriteBlocksResponse;
xxx(CreateVolumeFromDevice, __VA_ARGS__) \
xxx(ResumeDevice, __VA_ARGS__) \
xxx(QueryAgentsInfo, __VA_ARGS__) \
xxx(CheckRange, __VA_ARGS__) \
// BLOCKSTORE_GRPC_STORAGE_SERVICE

#define BLOCKSTORE_ENDPOINT_SERVICE(xxx, ...) \
Expand Down
1 change: 1 addition & 0 deletions cloud/blockstore/libs/service/request_helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ constexpr bool IsControlRequest(EBlockStoreRequest requestType)
case EBlockStoreRequest::ReadBlocksLocal:
case EBlockStoreRequest::WriteBlocksLocal:
case EBlockStoreRequest::QueryAvailableStorage:
case EBlockStoreRequest::CheckRange:
return false;
case EBlockStoreRequest::CreateVolume:
case EBlockStoreRequest::DestroyVolume:
Expand Down
1 change: 1 addition & 0 deletions cloud/blockstore/libs/storage/api/partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ namespace NCloud::NBlockStore::NStorage::NPartition {
xxx(GetChangedBlocks, __VA_ARGS__) \
xxx(ReadBlocksLocal, __VA_ARGS__) \
xxx(WriteBlocksLocal, __VA_ARGS__) \
xxx(CheckRange, __VA_ARGS__) \
// BLOCKSTORE_PARTITION_REQUESTS_FWD_SERVICE

// requests forwarded from volume to partion
Expand Down
3 changes: 3 additions & 0 deletions cloud/blockstore/libs/storage/api/service.h
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,9 @@ struct TEvService
EvAddTagsRequest = EvBegin + 91,
EvAddTagsResponse = EvBegin + 92,

EvCheckRangeRequest = EvBegin + 93,
EvCheckRangeResponse = EvBegin + 94,

EvEnd
};

Expand Down
5 changes: 5 additions & 0 deletions cloud/blockstore/libs/storage/api/volume.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ namespace NCloud::NBlockStore::NStorage {
xxx(GetCheckpointStatus, __VA_ARGS__) \
xxx(ReadBlocksLocal, __VA_ARGS__) \
xxx(WriteBlocksLocal, __VA_ARGS__) \
xxx(CheckRange, __VA_ARGS__) \
// BLOCKSTORE_VOLUME_REQUESTS_FWD_SERVICE

// responses which are forwarded back via volume (volume has handlers for these)
Expand Down Expand Up @@ -79,6 +80,7 @@ namespace NCloud::NBlockStore::NStorage {
xxx(GetCheckpointStatus, __VA_ARGS__) \
xxx(ReadBlocksLocal, __VA_ARGS__) \
xxx(WriteBlocksLocal, __VA_ARGS__) \
xxx(CheckRange, __VA_ARGS__) \
// BLOCKSTORE_VOLUME_HANDLED_RESPONSES_FWD_SERVICE

////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -335,6 +337,9 @@ struct TEvVolume
EvGracefulShutdownRequest = EvBegin + 60,
EvGracefulShutdownResponse = EvBegin + 61,

EvCheckRangeRequest = EvBegin + 62,
EvCheckRangeResponse = EvBegin + 63,

EvEnd
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ NCloud::NProto::TError TDeviceClient::AccessDevice(
// migration might be in progress even for an unmounted volume
acquired = !IsReadWriteMode(accessMode)
|| deviceState->WriterSession.Id.empty();
} else if (clientId == CheckHealthClientId) {
} else if (clientId == CheckHealthClientId || clientId == CheckRangeClientId) {
acquired = accessMode == NProto::VOLUME_ACCESS_READ_ONLY;
} else {
acquired = clientId == deviceState->WriterSession.Id;
Expand Down
1 change: 1 addition & 0 deletions cloud/blockstore/libs/storage/disk_agent/model/public.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,6 @@ constexpr TStringBuf BackgroundOpsClientId = "migration";
constexpr TStringBuf CheckHealthClientId = "check-health";
constexpr TStringBuf AnyWriterClientId = "any-writer";
constexpr TStringBuf ShadowDiskClientId = "shadow-disk-client";
constexpr TStringBuf CheckRangeClientId = "check-range";

} // namespace NCloud::NBlockStore::NStorage
6 changes: 6 additions & 0 deletions cloud/blockstore/libs/storage/partition/part_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,12 @@ class TPartitionActor final
TDuration retryTimeout,
TBlockBuffer blockBuffer);

NActors::IActorPtr CreateCheckRangeActor(
NActors::TActorId tablet,
ui64 startIndex,
ui64 blocksCount,
TEvService::TEvCheckRangeRequest::TPtr ev);

private:
STFUNC(StateBoot);
STFUNC(StateInit);
Expand Down
218 changes: 218 additions & 0 deletions cloud/blockstore/libs/storage/partition/part_actor_checkrange.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
#include "part_actor.h"

#include <cloud/blockstore/libs/service/context.h>
#include <cloud/blockstore/libs/storage/core/config.h>
#include <cloud/blockstore/libs/storage/core/probes.h>

#include <contrib/ydb/library/actors/core/actor_bootstrapped.h>

#include <util/datetime/base.h>
#include <util/generic/algorithm.h>
#include <util/generic/guid.h>
#include <util/generic/string.h>
#include <util/generic/vector.h>
#include <util/generic/xrange.h>
#include <util/stream/str.h>

namespace NCloud::NBlockStore::NStorage::NPartition {

using namespace NActors;

using namespace NKikimr;

LWTRACE_USING(BLOCKSTORE_STORAGE_PROVIDER);

namespace {

////////////////////////////////////////////////////////////////////////////////

class TCheckRangeActor final: public TActorBootstrapped<TCheckRangeActor>
{
private:
const TActorId Tablet;
const ui64 StartIndex;
const ui64 BlocksCount;
const TEvService::TEvCheckRangeRequest::TPtr Ev;

public:
TCheckRangeActor(
const TActorId& tablet,
ui64 startIndex,
ui64 blocksCount,
TEvService::TEvCheckRangeRequest::TPtr&& ev);

void Bootstrap(const TActorContext& ctx);

private:
void ReplyAndDie(
const TActorContext& ctx,
const NProto::TError& error = {});

void HandleReadBlocksResponse(
const TEvService::TEvReadBlocksResponse::TPtr& ev,
const TActorContext& ctx);

void SendReadBlocksRequest(const TActorContext& ctx);

private:
STFUNC(StateWork);

void HandleWakeup(
const TEvents::TEvWakeup::TPtr& ev,
const TActorContext& ctx);

void HandlePoisonPill(
const TEvents::TEvPoisonPill::TPtr& ev,
const TActorContext& ctx);
};

////////////////////////////////////////////////////////////////////////////////

TCheckRangeActor::TCheckRangeActor(
const TActorId& tablet,
ui64 startIndex,
ui64 blocksCount,
TEvService::TEvCheckRangeRequest::TPtr&& ev)
: Tablet(tablet)
, StartIndex(startIndex)
, BlocksCount(blocksCount)
, Ev(std::move(ev))
{}

void TCheckRangeActor::Bootstrap(const TActorContext& ctx)
{
SendReadBlocksRequest(ctx);
Become(&TThis::StateWork);
}

void TCheckRangeActor::SendReadBlocksRequest(const TActorContext& ctx)
{
auto request = std::make_unique<TEvService::TEvReadBlocksRequest>();

request->Record.SetStartIndex(StartIndex);
request->Record.SetBlocksCount(BlocksCount);

auto* headers = request->Record.MutableHeaders();

headers->SetIsBackgroundRequest(true);
NCloud::Send(ctx, Tablet, std::move(request));
}

void TCheckRangeActor::ReplyAndDie(
const TActorContext& ctx,
const NProto::TError& error)
{
auto response =
std::make_unique<TEvService::TEvCheckRangeResponse>(std::move(error));

NCloud::Reply(ctx, *Ev, std::move(response));

Die(ctx);
}

////////////////////////////////////////////////////////////////////////////////

STFUNC(TCheckRangeActor::StateWork)
{
switch (ev->GetTypeRewrite()) {
HFunc(TEvents::TEvPoisonPill, HandlePoisonPill);
HFunc(TEvents::TEvWakeup, HandleWakeup);
HFunc(TEvService::TEvReadBlocksResponse, HandleReadBlocksResponse);
default:
HandleUnexpectedEvent(ev, TBlockStoreComponents::PARTITION_WORKER);
break;
}
}

void TCheckRangeActor::HandleWakeup(
const TEvents::TEvWakeup::TPtr& ev,
const TActorContext& ctx)
{
Y_UNUSED(ev);
SendReadBlocksRequest(ctx);
}

void TCheckRangeActor::HandlePoisonPill(
const TEvents::TEvPoisonPill::TPtr& ev,
const TActorContext& ctx)
{
Y_UNUSED(ev);

auto error = MakeError(E_REJECTED, "tablet is shutting down");

ReplyAndDie(ctx, error);
}

void TCheckRangeActor::HandleReadBlocksResponse(
const TEvService::TEvReadBlocksResponse::TPtr& ev,
const TActorContext& ctx)
{
const auto* msg = ev->Get();
auto error = MakeError(S_OK);

if (HasError(msg->Record.GetError())) {
auto errorMessage = msg->Record.GetError().GetMessage();
LOG_ERROR(
ctx,
TBlockStoreComponents::VOLUME,
"reading error has occurred: " + errorMessage + " message " +
msg->Record.GetError().message());
auto errorCode =
msg->Record.GetError().code() == E_ARGUMENT ? E_ARGUMENT : E_IO;
error = MakeError(errorCode, msg->Record.GetError().GetMessage());
}

ReplyAndDie(ctx, error);
}

} // namespace

//////////////////////////////////////////////////////////////////

NActors::IActorPtr TPartitionActor::CreateCheckRangeActor(
NActors::TActorId tablet,
ui64 startIndex,
ui64 blocksCount,
TEvService::TEvCheckRangeRequest::TPtr ev)
{
return std::make_unique<NPartition::TCheckRangeActor>(
std::move(tablet),
startIndex,
blocksCount,
std::move(ev));
}

void NPartition::TPartitionActor::HandleCheckRange(
const TEvService::TEvCheckRangeRequest::TPtr& ev,
const TActorContext& ctx)
{
const auto* msg = ev->Get();

if (msg->Record.GetBlocksCount() >
Config->GetBytesPerStripe() / State->GetBlockSize())
{
auto err = MakeError(
E_ARGUMENT,
"Too many blocks requested: " +
std::to_string(msg->Record.GetBlocksCount()) +
" Max blocks per request : " +
std::to_string(
Config->GetBytesPerStripe() / State->GetBlockSize()));
auto response =
std::make_unique<TEvService::TEvCheckRangeResponse>(std::move(err));
NCloud::Reply(ctx, *ev, std::move(response));
return;
}

const auto actorId = NCloud::Register(
ctx,
CreateCheckRangeActor(
SelfId(),
msg->Record.GetStartIndex(),
msg->Record.GetBlocksCount(),
std::move(ev)));

Actors.Insert(actorId);
}

} // namespace NCloud::NBlockStore::NStorage::NPartition
Loading
Loading