diff --git a/cloud/blockstore/libs/service/auth_scheme.cpp b/cloud/blockstore/libs/service/auth_scheme.cpp index 196bd8149bf..21b83795b1c 100644 --- a/cloud/blockstore/libs/service/auth_scheme.cpp +++ b/cloud/blockstore/libs/service/auth_scheme.cpp @@ -74,6 +74,7 @@ TPermissionList GetRequestPermissions(EBlockStoreRequest requestType) case EBlockStoreRequest::ZeroBlocks: case EBlockStoreRequest::ReadBlocksLocal: case EBlockStoreRequest::WriteBlocksLocal: + case EBlockStoreRequest::CheckRange: return CreatePermissionList({}); case EBlockStoreRequest::MountVolume: diff --git a/cloud/blockstore/libs/service/request.h b/cloud/blockstore/libs/service/request.h index 872254e3f53..cda967b4cf9 100644 --- a/cloud/blockstore/libs/service/request.h +++ b/cloud/blockstore/libs/service/request.h @@ -89,6 +89,7 @@ using TWriteBlocksLocalResponse = TWriteBlocksResponse; xxx(CreateVolumeFromDevice, __VA_ARGS__) \ xxx(ResumeDevice, __VA_ARGS__) \ xxx(QueryAgentsInfo, __VA_ARGS__) \ + xxx(CheckRange, __VA_ARGS__) \ // BLOCKSTORE_GRPC_STORAGE_SERVICE #define BLOCKSTORE_ENDPOINT_SERVICE(xxx, ...) \ diff --git a/cloud/blockstore/libs/service/request_helpers.h b/cloud/blockstore/libs/service/request_helpers.h index 6694ca2bd16..f2de82795ad 100644 --- a/cloud/blockstore/libs/service/request_helpers.h +++ b/cloud/blockstore/libs/service/request_helpers.h @@ -280,6 +280,7 @@ constexpr bool IsControlRequest(EBlockStoreRequest requestType) case EBlockStoreRequest::ReadBlocksLocal: case EBlockStoreRequest::WriteBlocksLocal: case EBlockStoreRequest::QueryAvailableStorage: + case EBlockStoreRequest::CheckRange: return false; case EBlockStoreRequest::CreateVolume: case EBlockStoreRequest::DestroyVolume: diff --git a/cloud/blockstore/libs/storage/api/partition.h b/cloud/blockstore/libs/storage/api/partition.h index d1a11c40452..4a9446ec07b 100644 --- a/cloud/blockstore/libs/storage/api/partition.h +++ b/cloud/blockstore/libs/storage/api/partition.h @@ -25,6 +25,7 @@ namespace NCloud::NBlockStore::NStorage::NPartition { xxx(GetChangedBlocks, __VA_ARGS__) \ xxx(ReadBlocksLocal, __VA_ARGS__) \ xxx(WriteBlocksLocal, __VA_ARGS__) \ + xxx(CheckRange, __VA_ARGS__) \ // BLOCKSTORE_PARTITION_REQUESTS_FWD_SERVICE // requests forwarded from volume to partion diff --git a/cloud/blockstore/libs/storage/api/service.h b/cloud/blockstore/libs/storage/api/service.h index f56a9f9a4b0..570d4271098 100644 --- a/cloud/blockstore/libs/storage/api/service.h +++ b/cloud/blockstore/libs/storage/api/service.h @@ -343,6 +343,9 @@ struct TEvService EvAddTagsRequest = EvBegin + 91, EvAddTagsResponse = EvBegin + 92, + EvCheckRangeRequest = EvBegin + 93, + EvCheckRangeResponse = EvBegin + 94, + EvEnd }; diff --git a/cloud/blockstore/libs/storage/api/volume.h b/cloud/blockstore/libs/storage/api/volume.h index 515bf915ae1..dc618679be4 100644 --- a/cloud/blockstore/libs/storage/api/volume.h +++ b/cloud/blockstore/libs/storage/api/volume.h @@ -51,6 +51,7 @@ namespace NCloud::NBlockStore::NStorage { xxx(GetCheckpointStatus, __VA_ARGS__) \ xxx(ReadBlocksLocal, __VA_ARGS__) \ xxx(WriteBlocksLocal, __VA_ARGS__) \ + xxx(CheckRange, __VA_ARGS__) \ // BLOCKSTORE_VOLUME_REQUESTS_FWD_SERVICE // responses which are forwarded back via volume (volume has handlers for these) @@ -79,6 +80,7 @@ namespace NCloud::NBlockStore::NStorage { xxx(GetCheckpointStatus, __VA_ARGS__) \ xxx(ReadBlocksLocal, __VA_ARGS__) \ xxx(WriteBlocksLocal, __VA_ARGS__) \ + xxx(CheckRange, __VA_ARGS__) \ // BLOCKSTORE_VOLUME_HANDLED_RESPONSES_FWD_SERVICE //////////////////////////////////////////////////////////////////////////////// @@ -335,6 +337,9 @@ struct TEvVolume EvGracefulShutdownRequest = EvBegin + 60, EvGracefulShutdownResponse = EvBegin + 61, + EvCheckRangeRequest = EvBegin + 62, + EvCheckRangeResponse = EvBegin + 63, + EvEnd }; diff --git a/cloud/blockstore/libs/storage/disk_agent/model/device_client.cpp b/cloud/blockstore/libs/storage/disk_agent/model/device_client.cpp index fa94027f2b5..b68cc8a4996 100644 --- a/cloud/blockstore/libs/storage/disk_agent/model/device_client.cpp +++ b/cloud/blockstore/libs/storage/disk_agent/model/device_client.cpp @@ -286,7 +286,7 @@ NCloud::NProto::TError TDeviceClient::AccessDevice( // migration might be in progress even for an unmounted volume acquired = !IsReadWriteMode(accessMode) || deviceState->WriterSession.Id.empty(); - } else if (clientId == CheckHealthClientId) { + } else if (clientId == CheckHealthClientId || clientId == CheckRangeClientId) { acquired = accessMode == NProto::VOLUME_ACCESS_READ_ONLY; } else { acquired = clientId == deviceState->WriterSession.Id; diff --git a/cloud/blockstore/libs/storage/disk_agent/model/public.h b/cloud/blockstore/libs/storage/disk_agent/model/public.h index 5cdc3ccd062..ab6239703e0 100644 --- a/cloud/blockstore/libs/storage/disk_agent/model/public.h +++ b/cloud/blockstore/libs/storage/disk_agent/model/public.h @@ -18,5 +18,6 @@ constexpr TStringBuf BackgroundOpsClientId = "migration"; constexpr TStringBuf CheckHealthClientId = "check-health"; constexpr TStringBuf AnyWriterClientId = "any-writer"; constexpr TStringBuf ShadowDiskClientId = "shadow-disk-client"; +constexpr TStringBuf CheckRangeClientId = "check-range"; } // namespace NCloud::NBlockStore::NStorage diff --git a/cloud/blockstore/libs/storage/partition/part_actor.h b/cloud/blockstore/libs/storage/partition/part_actor.h index e3ded32352e..01cff7ad309 100644 --- a/cloud/blockstore/libs/storage/partition/part_actor.h +++ b/cloud/blockstore/libs/storage/partition/part_actor.h @@ -431,6 +431,12 @@ class TPartitionActor final TDuration retryTimeout, TBlockBuffer blockBuffer); + NActors::IActorPtr CreateCheckRangeActor( + NActors::TActorId tablet, + ui64 startIndex, + ui64 blocksCount, + TEvService::TEvCheckRangeRequest::TPtr ev); + private: STFUNC(StateBoot); STFUNC(StateInit); diff --git a/cloud/blockstore/libs/storage/partition/part_actor_checkrange.cpp b/cloud/blockstore/libs/storage/partition/part_actor_checkrange.cpp new file mode 100644 index 00000000000..bd9de5f7c80 --- /dev/null +++ b/cloud/blockstore/libs/storage/partition/part_actor_checkrange.cpp @@ -0,0 +1,222 @@ +#include "part_actor.h" + +#include +#include +#include + +#include + +#include +#include +#include +#include +#include +#include +#include + +namespace NCloud::NBlockStore::NStorage::NPartition { + +using namespace NActors; + +using namespace NKikimr; + +LWTRACE_USING(BLOCKSTORE_STORAGE_PROVIDER); + +namespace { + +//////////////////////////////////////////////////////////////////////////////// + +class TCheckRangeActor final: public TActorBootstrapped +{ +private: + const TActorId Tablet; + const ui64 StartIndex; + const ui64 BlocksCount; + const TEvService::TEvCheckRangeRequest::TPtr Ev; + +public: + TCheckRangeActor( + const TActorId& tablet, + ui64 startIndex, + ui64 blocksCount, + TEvService::TEvCheckRangeRequest::TPtr&& ev); + + void Bootstrap(const TActorContext& ctx); + +private: + void ReplyAndDie( + const TActorContext& ctx, + const NProto::TError& status, + const NProto::TError& error = {}); + + void HandleReadBlocksResponse( + const TEvService::TEvReadBlocksResponse::TPtr& ev, + const TActorContext& ctx); + + void SendReadBlocksRequest(const TActorContext& ctx); + +private: + STFUNC(StateWork); + + void HandleWakeup( + const TEvents::TEvWakeup::TPtr& ev, + const TActorContext& ctx); + + void HandlePoisonPill( + const TEvents::TEvPoisonPill::TPtr& ev, + const TActorContext& ctx); +}; + +//////////////////////////////////////////////////////////////////////////////// + +TCheckRangeActor::TCheckRangeActor( + const TActorId& tablet, + ui64 startIndex, + ui64 blocksCount, + TEvService::TEvCheckRangeRequest::TPtr&& ev) + : Tablet(tablet) + , StartIndex(startIndex) + , BlocksCount(blocksCount) + , Ev(std::move(ev)) +{} + +void TCheckRangeActor::Bootstrap(const TActorContext& ctx) +{ + SendReadBlocksRequest(ctx); + Become(&TThis::StateWork); +} + +void TCheckRangeActor::SendReadBlocksRequest(const TActorContext& ctx) +{ + auto request = std::make_unique(); + + request->Record.SetStartIndex(StartIndex); + request->Record.SetBlocksCount(BlocksCount); + + auto* headers = request->Record.MutableHeaders(); + + headers->SetIsBackgroundRequest(true); + NCloud::Send(ctx, Tablet, std::move(request)); +} + +void TCheckRangeActor::ReplyAndDie( + const TActorContext& ctx, + const NProto::TError& status, + const NProto::TError& error) +{ + auto response = + std::make_unique(std::move(error)); + response->Record.MutableStatus()->CopyFrom(status); + + NCloud::Reply(ctx, *Ev, std::move(response)); + + Die(ctx); +} + +//////////////////////////////////////////////////////////////////////////////// + +STFUNC(TCheckRangeActor::StateWork) +{ + switch (ev->GetTypeRewrite()) { + HFunc(TEvents::TEvPoisonPill, HandlePoisonPill); + HFunc(TEvents::TEvWakeup, HandleWakeup); + HFunc(TEvService::TEvReadBlocksResponse, HandleReadBlocksResponse); + default: + HandleUnexpectedEvent(ev, TBlockStoreComponents::PARTITION_WORKER); + break; + } +} + +void TCheckRangeActor::HandleWakeup( + const TEvents::TEvWakeup::TPtr& ev, + const TActorContext& ctx) +{ + Y_UNUSED(ev); + SendReadBlocksRequest(ctx); +} + +void TCheckRangeActor::HandlePoisonPill( + const TEvents::TEvPoisonPill::TPtr& ev, + const TActorContext& ctx) +{ + Y_UNUSED(ev); + + auto error = MakeError(E_REJECTED, "tablet is shutting down"); + + ReplyAndDie(ctx, error); +} + +void TCheckRangeActor::HandleReadBlocksResponse( + const TEvService::TEvReadBlocksResponse::TPtr& ev, + const TActorContext& ctx) +{ + const auto* msg = ev->Get(); + auto status = MakeError(S_OK); + + if (HasError(msg->Record.GetError())) { + const auto& errorMessage = msg->Record.GetError().GetMessage(); + LOG_ERROR( + ctx, + TBlockStoreComponents::PARTITION, + "reading error has occurred: " + errorMessage); + const auto& errorCode = + msg->Record.GetError().code() == E_ARGUMENT ? E_ARGUMENT : E_IO; + status = MakeError(errorCode, msg->Record.GetError().GetMessage()); + } + + ReplyAndDie(ctx, status); +} + +} // namespace + +////////////////////////////////////////////////////////////////// + +NActors::IActorPtr TPartitionActor::CreateCheckRangeActor( + NActors::TActorId tablet, + ui64 startIndex, + ui64 blocksCount, + TEvService::TEvCheckRangeRequest::TPtr ev) +{ + return std::make_unique( + std::move(tablet), + startIndex, + blocksCount, + std::move(ev)); +} + +void NPartition::TPartitionActor::HandleCheckRange( + const TEvService::TEvCheckRangeRequest::TPtr& ev, + const TActorContext& ctx) +{ + const auto* msg = ev->Get(); + + ui64 BlocksPerStripe = Config->GetBytesPerStripe() / State->GetBlockSize(); + // We process 4 MB of data at a time. + const ui64 maxBlocksPerRequest = + std::min(BlocksPerStripe, 4_MB / State->GetBlockSize()); + + if (msg->Record.GetBlocksCount() > maxBlocksPerRequest) { + auto err = MakeError( + E_ARGUMENT, + "Too many blocks requested: " + + std::to_string(msg->Record.GetBlocksCount()) + + " Max blocks per request : " + + std::to_string(maxBlocksPerRequest)); + auto response = + std::make_unique(std::move(err)); + NCloud::Reply(ctx, *ev, std::move(response)); + return; + } + + const auto actorId = NCloud::Register( + ctx, + CreateCheckRangeActor( + SelfId(), + msg->Record.GetStartIndex(), + msg->Record.GetBlocksCount(), + std::move(ev))); + + Actors.Insert(actorId); +} + +} // namespace NCloud::NBlockStore::NStorage::NPartition diff --git a/cloud/blockstore/libs/storage/partition/part_ut.cpp b/cloud/blockstore/libs/storage/partition/part_ut.cpp index e2c97ce07b6..b1eae657792 100644 --- a/cloud/blockstore/libs/storage/partition/part_ut.cpp +++ b/cloud/blockstore/libs/storage/partition/part_ut.cpp @@ -939,6 +939,16 @@ class TPartitionClient return std::make_unique(); } + std::unique_ptr + CreateCheckRangeRequest(TString id, ui32 startIndex, ui32 size) + { + auto request = std::make_unique(); + request->Record.SetDiskId(id); + request->Record.SetStartIndex(startIndex); + request->Record.SetBlocksCount(size); + return request; + } + #define BLOCKSTORE_DECLARE_METHOD(name, ns) \ template \ void Send##name##Request(Args&&... args) \ @@ -11541,6 +11551,210 @@ Y_UNIT_TEST_SUITE(TPartitionTest) // checking that drain-related counters are in a consistent state partition.Drain(); } + + Y_UNIT_TEST(ShouldCheckRange) + { + constexpr ui32 blockCount = 1024 * 1024; + auto runtime = PrepareTestActorRuntime(DefaultConfig(), blockCount); + + TPartitionClient partition(*runtime); + partition.WaitReady(); + + partition.WriteBlocks( + TBlockRange32::MakeClosedInterval(0, 1024 * 10), + 1); + partition.WriteBlocks( + TBlockRange32::MakeClosedInterval(1024 * 5, 1024 * 11), + 1); + + const auto step = 16; + for (ui32 i = 1024 * 10; i < 1024 * 12; i += step) { + partition.WriteBlocks(TBlockRange32::WithLength(i, step), 1); + } + + for (ui32 i = 1024 * 20; i < 1024 * 21; i += step) { + partition.WriteBlocks(TBlockRange32::WithLength(i, step + 1), 1); + } + + partition.WriteBlocks( + TBlockRange32::MakeClosedInterval(1001111, 1001210), + 1); + + partition.ZeroBlocks(TBlockRange32::MakeClosedInterval(1024, 3023)); + partition.ZeroBlocks(TBlockRange32::MakeClosedInterval(5024, 5033)); + + ui32 status = -1; + ui32 error = -1; + + runtime->SetObserverFunc( + [&](TAutoPtr& event) + { + switch (event->GetTypeRewrite()) { + case TEvService::EvCheckRangeResponse: { + using TEv = TEvService::TEvCheckRangeResponse; + const auto* msg = event->Get(); + error = msg->GetStatus(); + status = msg->Record.GetStatus().GetCode(); + break; + } + } + return TTestActorRuntime::DefaultObserverFunc(event); + }); + + const auto checkRange = [&](ui32 idx, ui32 size) + { + status = -1; + + const auto response = partition.CheckRange("id", idx, size); + + TDispatchOptions options; + options.FinalEvents.emplace_back(TEvService::EvCheckRangeResponse); + runtime->DispatchEvents(options, TDuration::Seconds(3)); + + UNIT_ASSERT_VALUES_EQUAL(S_OK, status); + UNIT_ASSERT_VALUES_EQUAL(S_OK, error); + }; + + checkRange(0, 1024); + checkRange(1024, 512); + checkRange(1, 1); + checkRange(1000, 1000); + } + + Y_UNIT_TEST(ShouldCheckRangeWithBrokenBlocks) + { + constexpr ui32 blockCount = 1024 * 1024; + auto runtime = PrepareTestActorRuntime(DefaultConfig(), blockCount); + + TPartitionClient partition(*runtime); + partition.WaitReady(); + + partition.WriteBlocks( + TBlockRange32::MakeClosedInterval(0, 1024 * 10), + 1); + partition.WriteBlocks( + TBlockRange32::MakeClosedInterval(1024 * 5, 1024 * 11), + 1); + + const auto step = 16; + for (ui32 i = 1024 * 10; i < 1024 * 12; i += step) { + partition.WriteBlocks(TBlockRange32::WithLength(i, step), 1); + } + + for (ui32 i = 1024 * 20; i < 1024 * 21; i += step) { + partition.WriteBlocks(TBlockRange32::WithLength(i, step + 1), 1); + } + + partition.WriteBlocks( + TBlockRange32::MakeClosedInterval(1001111, 1001210), + 1); + + ui32 status = -1; + ui32 error = -1; + + runtime->SetObserverFunc( + [&](TAutoPtr& event) + { + switch (event->GetTypeRewrite()) { + case TEvService::EvCheckRangeResponse: { + using TEv = TEvService::TEvCheckRangeResponse; + const auto* msg = event->Get(); + status = msg->Record.GetStatus().GetCode(); + error = msg->Record.GetError().GetCode(); + + break; + } + case TEvService::EvReadBlocksResponse: { + using TEv = TEvService::TEvReadBlocksResponse; + + auto response = std::make_unique( + MakeError(E_IO, "block is broken")); + + runtime->Send( + new IEventHandle( + event->Recipient, + event->Sender, + response.release(), + 0, // flags + event->Cookie), + 0); + + return TTestActorRuntime::EEventAction::DROP; + + break; + } + } + return TTestActorRuntime::DefaultObserverFunc(event); + }); + + const auto checkRange = [&](ui32 idx, ui32 size) + { + status = -1; + + partition.SendCheckRangeRequest("id", idx, size); + const auto response = + partition.RecvResponse(); + + TDispatchOptions options; + options.FinalEvents.emplace_back(TEvService::EvCheckRangeResponse); + + UNIT_ASSERT_VALUES_EQUAL(E_IO, status); + UNIT_ASSERT_VALUES_EQUAL(S_OK, error); + }; + checkRange(0, 1024); + checkRange(1024, 512); + checkRange(1, 1); + checkRange(1000, 1000); + } + + Y_UNIT_TEST(ShouldSuccessfullyCheckRangeIfDiskIsEmpty) + { + constexpr ui32 blockCount = 1024 * 1024; + auto runtime = PrepareTestActorRuntime(DefaultConfig(), blockCount); + + TPartitionClient partition(*runtime); + partition.WaitReady(); + + const ui32 idx = 0; + const ui32 size = 1; + const auto response = partition.CheckRange("id", idx, size); + + TDispatchOptions options; + options.FinalEvents.emplace_back(TEvService::EvCheckRangeResponse); + + runtime->DispatchEvents(options, TDuration::Seconds(1)); + + UNIT_ASSERT_VALUES_EQUAL(S_OK, response->GetStatus()); + UNIT_ASSERT_VALUES_EQUAL(S_OK, response->Record.GetStatus().GetCode()); + } + + Y_UNIT_TEST(ShouldntCheckRangeWithBigBlockCount) + { + constexpr ui32 blockCount = 1024 * 1024; + constexpr ui32 bytesPerStripe = 1024; + NProto::TStorageServiceConfig config; + config.SetBytesPerStripe(bytesPerStripe); + auto runtime = PrepareTestActorRuntime(config, blockCount); + + TPartitionClient partition(*runtime); + partition.WaitReady(); + + const ui32 idx = 0; + + partition.SendCheckRangeRequest( + "id", + idx, + bytesPerStripe / DefaultBlockSize + 1); + const auto response = + partition.RecvResponse(); + + TDispatchOptions options; + options.FinalEvents.emplace_back(TEvService::EvCheckRangeResponse); + + runtime->DispatchEvents(options, TDuration::Seconds(1)); + + UNIT_ASSERT_VALUES_EQUAL(E_ARGUMENT, response->GetStatus()); + } } } // namespace NCloud::NBlockStore::NStorage::NPartition diff --git a/cloud/blockstore/libs/storage/partition/ya.make b/cloud/blockstore/libs/storage/partition/ya.make index 99e69d9cc74..64223a2fe58 100644 --- a/cloud/blockstore/libs/storage/partition/ya.make +++ b/cloud/blockstore/libs/storage/partition/ya.make @@ -9,6 +9,7 @@ SRCS( part_actor_addunconfirmedblobs.cpp part_actor_changedblocks.cpp part_actor_checkpoint.cpp + part_actor_checkrange.cpp part_actor_cleanup.cpp part_actor_collectgarbage.cpp part_actor_compaction.cpp diff --git a/cloud/blockstore/libs/storage/partition2/part2_actor.h b/cloud/blockstore/libs/storage/partition2/part2_actor.h index fbcfca59b14..d174775550a 100644 --- a/cloud/blockstore/libs/storage/partition2/part2_actor.h +++ b/cloud/blockstore/libs/storage/partition2/part2_actor.h @@ -554,6 +554,12 @@ class TPartitionActor final const NActors::TActorContext& ctx, TEvPartitionPrivate::TOperationCompleted operation); + NActors::IActorPtr CreateCheckRangeActor( + NActors::TActorId tablet, + ui64 startIndex, + ui64 blocksCount, + TEvService::TEvCheckRangeRequest::TPtr ev); + bool HandleRequests(STFUNC_SIG); bool RejectRequests(STFUNC_SIG); diff --git a/cloud/blockstore/libs/storage/partition2/part2_actor_checkrange.cpp b/cloud/blockstore/libs/storage/partition2/part2_actor_checkrange.cpp new file mode 100644 index 00000000000..a0e738378ec --- /dev/null +++ b/cloud/blockstore/libs/storage/partition2/part2_actor_checkrange.cpp @@ -0,0 +1,222 @@ +#include "part2_actor.h" + +#include +#include +#include + +#include + +#include +#include +#include +#include +#include +#include +#include + +namespace NCloud::NBlockStore::NStorage::NPartition2 { + +using namespace NActors; + +using namespace NKikimr; + +LWTRACE_USING(BLOCKSTORE_STORAGE_PROVIDER); + +namespace { + +//////////////////////////////////////////////////////////////////////////////// + +class TCheckRangeActor final: public TActorBootstrapped +{ +private: + const TActorId Tablet; + const ui64 StartIndex; + const ui64 BlocksCount; + const TEvService::TEvCheckRangeRequest::TPtr Ev; + +public: + TCheckRangeActor( + const TActorId& tablet, + ui64 startIndex, + ui64 blocksCount, + TEvService::TEvCheckRangeRequest::TPtr&& ev); + + void Bootstrap(const TActorContext& ctx); + +private: + void ReplyAndDie( + const TActorContext& ctx, + const NProto::TError& status, + const NProto::TError& error = {}); + + void HandleReadBlocksResponse( + const TEvService::TEvReadBlocksResponse::TPtr& ev, + const TActorContext& ctx); + + void SendReadBlocksRequest(const TActorContext& ctx); + +private: + STFUNC(StateWork); + + void HandleWakeup( + const TEvents::TEvWakeup::TPtr& ev, + const TActorContext& ctx); + + void HandlePoisonPill( + const TEvents::TEvPoisonPill::TPtr& ev, + const TActorContext& ctx); +}; + +//////////////////////////////////////////////////////////////////////////////// + +TCheckRangeActor::TCheckRangeActor( + const TActorId& tablet, + ui64 startIndex, + ui64 blocksCount, + TEvService::TEvCheckRangeRequest::TPtr&& ev) + : Tablet(tablet) + , StartIndex(startIndex) + , BlocksCount(blocksCount) + , Ev(std::move(ev)) +{} + +void TCheckRangeActor::Bootstrap(const TActorContext& ctx) +{ + SendReadBlocksRequest(ctx); + Become(&TThis::StateWork); +} + +void TCheckRangeActor::SendReadBlocksRequest(const TActorContext& ctx) +{ + auto request = std::make_unique(); + + request->Record.SetStartIndex(StartIndex); + request->Record.SetBlocksCount(BlocksCount); + + auto* headers = request->Record.MutableHeaders(); + + headers->SetIsBackgroundRequest(true); + NCloud::Send(ctx, Tablet, std::move(request)); +} + +void TCheckRangeActor::ReplyAndDie( + const TActorContext& ctx, + const NProto::TError& status, + const NProto::TError& error) +{ + auto response = + std::make_unique(std::move(error)); + response->Record.MutableStatus()->CopyFrom(status); + + NCloud::Reply(ctx, *Ev, std::move(response)); + + Die(ctx); +} + +//////////////////////////////////////////////////////////////////////////////// + +STFUNC(TCheckRangeActor::StateWork) +{ + switch (ev->GetTypeRewrite()) { + HFunc(TEvents::TEvPoisonPill, HandlePoisonPill); + HFunc(TEvents::TEvWakeup, HandleWakeup); + HFunc(TEvService::TEvReadBlocksResponse, HandleReadBlocksResponse); + default: + HandleUnexpectedEvent(ev, TBlockStoreComponents::PARTITION_WORKER); + break; + } +} + +void TCheckRangeActor::HandleWakeup( + const TEvents::TEvWakeup::TPtr& ev, + const TActorContext& ctx) +{ + Y_UNUSED(ev); + SendReadBlocksRequest(ctx); +} + +void TCheckRangeActor::HandlePoisonPill( + const TEvents::TEvPoisonPill::TPtr& ev, + const TActorContext& ctx) +{ + Y_UNUSED(ev); + + auto error = MakeError(E_REJECTED, "tablet is shutting down"); + + ReplyAndDie(ctx, error); +} + +void TCheckRangeActor::HandleReadBlocksResponse( + const TEvService::TEvReadBlocksResponse::TPtr& ev, + const TActorContext& ctx) +{ + const auto* msg = ev->Get(); + auto status = MakeError(S_OK); + + if (HasError(msg->Record.GetError())) { + const auto& errorMessage = msg->Record.GetError().GetMessage(); + LOG_ERROR( + ctx, + TBlockStoreComponents::PARTITION, + "reading error has occurred: " + errorMessage); + const auto& errorCode = + msg->Record.GetError().code() == E_ARGUMENT ? E_ARGUMENT : E_IO; + status = MakeError(errorCode, msg->Record.GetError().GetMessage()); + } + + ReplyAndDie(ctx, status); +} + +} // namespace + +////////////////////////////////////////////////////////////////// + +NActors::IActorPtr TPartitionActor::CreateCheckRangeActor( + NActors::TActorId tablet, + ui64 startIndex, + ui64 blocksCount, + TEvService::TEvCheckRangeRequest::TPtr ev) +{ + return std::make_unique( + std::move(tablet), + startIndex, + blocksCount, + std::move(ev)); +} + +void NPartition2::TPartitionActor::HandleCheckRange( + const TEvService::TEvCheckRangeRequest::TPtr& ev, + const TActorContext& ctx) +{ + const auto* msg = ev->Get(); + + ui64 BlocksPerStripe = Config->GetBytesPerStripe() / State->GetBlockSize(); + // We process 4 MB of data at a time. + const ui64 maxBlocksPerRequest = + std::min(BlocksPerStripe, 4_MB / State->GetBlockSize()); + + if (msg->Record.GetBlocksCount() > maxBlocksPerRequest) { + auto err = MakeError( + E_ARGUMENT, + "Too many blocks requested: " + + std::to_string(msg->Record.GetBlocksCount()) + + " Max blocks per request : " + + std::to_string(maxBlocksPerRequest)); + + auto response = + std::make_unique(std::move(err)); + NCloud::Reply(ctx, *ev, std::move(response)); + return; + } + + const auto actorId = NCloud::Register( + ctx, + CreateCheckRangeActor( + SelfId(), + msg->Record.GetStartIndex(), + msg->Record.GetBlocksCount(), + std::move(ev))); + Actors.insert(actorId); +} + +} // namespace NCloud::NBlockStore::NStorage::NPartition2 diff --git a/cloud/blockstore/libs/storage/partition2/part2_ut.cpp b/cloud/blockstore/libs/storage/partition2/part2_ut.cpp index 3e631ab56d7..1b375a6478f 100644 --- a/cloud/blockstore/libs/storage/partition2/part2_ut.cpp +++ b/cloud/blockstore/libs/storage/partition2/part2_ut.cpp @@ -849,6 +849,16 @@ class TPartitionClient return RemoteHttpInfo(params, HTTP_METHOD::HTTP_METHOD_GET); } + std::unique_ptr + CreateCheckRangeRequest(TString id, ui32 startIndex, ui32 size) + { + auto request = std::make_unique(); + request->Record.SetDiskId(id); + request->Record.SetStartIndex(startIndex); + request->Record.SetBlocksCount(size); + return request; + } + #define BLOCKSTORE_PARTITION2_COMMON_REQUESTS_PRIVATE(xxx, ...) \ xxx(TrimFreshLog, __VA_ARGS__) \ // BLOCKSTORE_PARTITION2_COMMON_REQUESTS_PRIVATE @@ -7163,6 +7173,207 @@ Y_UNIT_TEST_SUITE(TPartition2Test) auto response = partition.RecvCompactionResponse(); UNIT_ASSERT_VALUES_EQUAL(E_TRY_AGAIN, response->GetStatus()); } + + Y_UNIT_TEST(ShouldCheckRange) + { + constexpr ui32 blockCount = 1024 * 1024; + auto runtime = PrepareTestActorRuntime(DefaultConfig(), blockCount); + + TPartitionClient partition(*runtime); + partition.WaitReady(); + + partition.WriteBlocks( + TBlockRange32::MakeClosedInterval(0, 1024 * 10), + 1); + partition.WriteBlocks( + TBlockRange32::MakeClosedInterval(1024 * 5, 1024 * 11), + 1); + + const auto step = 16; + for (ui32 i = 1024 * 10; i < 1024 * 12; i += step) { + partition.WriteBlocks(TBlockRange32::WithLength(i, step), 1); + } + + for (ui32 i = 1024 * 20; i < 1024 * 21; i += step) { + partition.WriteBlocks(TBlockRange32::WithLength(i, step + 1), 1); + } + + partition.WriteBlocks( + TBlockRange32::MakeClosedInterval(1001111, 1001210), + 1); + + partition.ZeroBlocks(TBlockRange32::MakeClosedInterval(1024, 3023)); + partition.ZeroBlocks(TBlockRange32::MakeClosedInterval(5024, 5033)); + + ui32 status = -1; + ui32 error = -1; + + runtime->SetObserverFunc( + [&](TAutoPtr& event) + { + switch (event->GetTypeRewrite()) { + case TEvService::EvCheckRangeResponse: { + using TEv = TEvService::TEvCheckRangeResponse; + const auto* msg = event->Get(); + error = msg->GetStatus(); + status = msg->Record.GetStatus().GetCode(); + break; + } + } + return TTestActorRuntime::DefaultObserverFunc(event); + }); + + const auto checkRange = [&](ui32 idx, ui32 size) + { + status = -1; + + const auto response = partition.CheckRange("id", idx, size); + + TDispatchOptions options; + options.FinalEvents.emplace_back(TEvService::EvCheckRangeResponse); + runtime->DispatchEvents(options, TDuration::Seconds(3)); + + UNIT_ASSERT_VALUES_EQUAL(S_OK, status); + UNIT_ASSERT_VALUES_EQUAL(S_OK, error); + }; + + checkRange(0, 1024); + checkRange(1024, 512); + checkRange(1, 1); + checkRange(1000, 1000); + } + + Y_UNIT_TEST(ShouldCheckRangeWithBrokenBlocks) + { + constexpr ui32 blockCount = 1024 * 1024; + auto runtime = PrepareTestActorRuntime(DefaultConfig(), blockCount); + + TPartitionClient partition(*runtime); + partition.WaitReady(); + + partition.WriteBlocks( + TBlockRange32::MakeClosedInterval(0, 1024 * 10), + 1); + partition.WriteBlocks( + TBlockRange32::MakeClosedInterval(1024 * 5, 1024 * 11), + 1); + + const auto step = 16; + for (ui32 i = 1024 * 10; i < 1024 * 12; i += step) { + partition.WriteBlocks(TBlockRange32::WithLength(i, step), 1); + } + + for (ui32 i = 1024 * 20; i < 1024 * 21; i += step) { + partition.WriteBlocks(TBlockRange32::WithLength(i, step + 1), 1); + } + + partition.WriteBlocks( + TBlockRange32::MakeClosedInterval(1001111, 1001210), + 1); + + ui32 status = -1; + ui32 error = -1; + + runtime->SetObserverFunc( + [&](TAutoPtr& event) + { + switch (event->GetTypeRewrite()) { + case TEvService::EvCheckRangeResponse: { + using TEv = TEvService::TEvCheckRangeResponse; + const auto* msg = event->Get(); + status = msg->Record.GetStatus().GetCode(); + error = msg->Record.GetError().GetCode(); + + break; + } + case TEvService::EvReadBlocksResponse: { + using TEv = TEvService::TEvReadBlocksResponse; + + auto response = std::make_unique( + MakeError(E_IO, "block is broken")); + + runtime->Send( + new IEventHandle( + event->Recipient, + event->Sender, + response.release(), + 0, // flags + event->Cookie), + 0); + + return TTestActorRuntime::EEventAction::DROP; + + break; + } + } + return TTestActorRuntime::DefaultObserverFunc(event); + }); + + const auto checkRange = [&](ui32 idx, ui32 size) + { + status = -1; + + partition.SendCheckRangeRequest("id", idx, size); + const auto response = + partition.RecvResponse(); + + TDispatchOptions options; + options.FinalEvents.emplace_back(TEvService::EvCheckRangeResponse); + + UNIT_ASSERT_VALUES_EQUAL(E_IO, status); + UNIT_ASSERT_VALUES_EQUAL(S_OK, error); + }; + checkRange(0, 1024); + checkRange(1024, 512); + checkRange(1, 1); + checkRange(1000, 1000); + } + + Y_UNIT_TEST(ShouldSuccessfullyCheckRangeIfDiskIsEmpty) + { + constexpr ui32 blockCount = 1024 * 1024; + auto runtime = PrepareTestActorRuntime(DefaultConfig(), blockCount); + + TPartitionClient partition(*runtime); + partition.WaitReady(); + + const ui32 idx = 0; + const ui32 size = 1; + const auto response = partition.CheckRange("id", idx, size); + + TDispatchOptions options; + options.FinalEvents.emplace_back(TEvService::EvCheckRangeResponse); + + runtime->DispatchEvents(options, TDuration::Seconds(1)); + + UNIT_ASSERT_VALUES_EQUAL(S_OK, response->GetStatus()); + UNIT_ASSERT_VALUES_EQUAL(S_OK, response->Record.GetStatus().GetCode()); + } + + Y_UNIT_TEST(ShouldntCheckRangeWithBigBlockCount) + { + constexpr ui32 blockCount = 1024 * 1024; + constexpr ui32 bytesPerStripe = 1024; + NProto::TStorageServiceConfig config; + config.SetBytesPerStripe(bytesPerStripe); + auto runtime = PrepareTestActorRuntime(config, blockCount); + + TPartitionClient partition(*runtime); + partition.WaitReady(); + + const ui32 idx = 0; + + partition.SendCheckRangeRequest("id", idx, bytesPerStripe/DefaultBlockSize + 1); + const auto response = + partition.RecvResponse(); + + TDispatchOptions options; + options.FinalEvents.emplace_back(TEvService::EvCheckRangeResponse); + + runtime->DispatchEvents(options, TDuration::Seconds(1)); + + UNIT_ASSERT_VALUES_EQUAL(E_ARGUMENT, response->GetStatus()); + } } } // namespace NCloud::NBlockStore::NStorage::NPartition2 diff --git a/cloud/blockstore/libs/storage/partition2/ya.make b/cloud/blockstore/libs/storage/partition2/ya.make index efb6fe9a418..5fdf76a768a 100644 --- a/cloud/blockstore/libs/storage/partition2/ya.make +++ b/cloud/blockstore/libs/storage/partition2/ya.make @@ -7,6 +7,7 @@ SRCS( part2_actor_addblobs.cpp part2_actor_addgarbage.cpp part2_actor_changedblocks.cpp + part2_actor_checkrange.cpp part2_actor_cleanup.cpp part2_actor_collectgarbage.cpp part2_actor_compaction.cpp diff --git a/cloud/blockstore/libs/storage/partition_nonrepl/part_mirror_actor.cpp b/cloud/blockstore/libs/storage/partition_nonrepl/part_mirror_actor.cpp index 1931a9da47f..7346a52426d 100644 --- a/cloud/blockstore/libs/storage/partition_nonrepl/part_mirror_actor.cpp +++ b/cloud/blockstore/libs/storage/partition_nonrepl/part_mirror_actor.cpp @@ -594,6 +594,7 @@ STFUNC(TMirrorPartitionActor::StateWork) HFunc(TEvVolume::TEvGetRebuildMetadataStatusRequest, HandleGetRebuildMetadataStatus); HFunc(TEvVolume::TEvScanDiskRequest, HandleScanDisk); HFunc(TEvVolume::TEvGetScanDiskStatusRequest, HandleGetScanDiskStatus); + HFunc(TEvService::TEvCheckRangeRequest, HandleCheckRange); HFunc( TEvNonreplPartitionPrivate::TEvWriteOrZeroCompleted, diff --git a/cloud/blockstore/libs/storage/partition_nonrepl/part_mirror_actor.h b/cloud/blockstore/libs/storage/partition_nonrepl/part_mirror_actor.h index f73c933f045..ee8ffd972a2 100644 --- a/cloud/blockstore/libs/storage/partition_nonrepl/part_mirror_actor.h +++ b/cloud/blockstore/libs/storage/partition_nonrepl/part_mirror_actor.h @@ -185,11 +185,18 @@ class TMirrorPartitionActor final TBlockRange64 blockRange, const TStringBuf& methodName); + NActors::IActorPtr CreateCheckRangeActor( + NActors::TActorId tablet, + ui64 startIndex, + ui64 blocksCount, + TEvService::TEvCheckRangeRequest::TPtr ev); + BLOCKSTORE_IMPLEMENT_REQUEST(ReadBlocks, TEvService); BLOCKSTORE_IMPLEMENT_REQUEST(WriteBlocks, TEvService); BLOCKSTORE_IMPLEMENT_REQUEST(ReadBlocksLocal, TEvService); BLOCKSTORE_IMPLEMENT_REQUEST(WriteBlocksLocal, TEvService); BLOCKSTORE_IMPLEMENT_REQUEST(ZeroBlocks, TEvService); + BLOCKSTORE_IMPLEMENT_REQUEST(CheckRange, TEvService); BLOCKSTORE_IMPLEMENT_REQUEST(Drain, NPartition::TEvPartition); BLOCKSTORE_IMPLEMENT_REQUEST(DescribeBlocks, TEvVolume); diff --git a/cloud/blockstore/libs/storage/partition_nonrepl/part_mirror_actor_checkrange.cpp b/cloud/blockstore/libs/storage/partition_nonrepl/part_mirror_actor_checkrange.cpp new file mode 100644 index 00000000000..89d35e562dc --- /dev/null +++ b/cloud/blockstore/libs/storage/partition_nonrepl/part_mirror_actor_checkrange.cpp @@ -0,0 +1,234 @@ +#include "part_mirror_actor.h" + +#include "cloud/blockstore/libs/storage/disk_agent/model/public.h" + +#include +#include +#include + +#include + +#include +#include +#include +#include +#include +#include +#include + +namespace NCloud::NBlockStore::NStorage::NPartition { + +using namespace NActors; + +using namespace NKikimr; + +LWTRACE_USING(BLOCKSTORE_STORAGE_PROVIDER); + +namespace { + +//////////////////////////////////////////////////////////////////////////////// + +class TCheckRangeActor final: public TActorBootstrapped +{ +private: + const TActorId Tablet; + const ui64 StartIndex; + const ui64 BlocksCount; + const TActorId Sender; + const TEvService::TEvCheckRangeRequest::TPtr Ev; + +public: + TCheckRangeActor( + const TActorId& tablet, + ui64 startIndex, + ui64 blocksCount, + TEvService::TEvCheckRangeRequest::TPtr&& ev); + + void Bootstrap(const TActorContext& ctx); + +private: + void ReplyAndDie( + const TActorContext& ctx, + const NProto::TError& status, + const NProto::TError& error = {}); + + void HandleReadBlocksResponse( + const TEvService::TEvReadBlocksResponse::TPtr& ev, + const TActorContext& ctx); + + void SendReadBlocksRequest(const TActorContext& ctx); + +private: + STFUNC(StateWork); + + void HandleWakeup( + const TEvents::TEvWakeup::TPtr& ev, + const TActorContext& ctx); + + void HandlePoisonPill( + const TEvents::TEvPoisonPill::TPtr& ev, + const TActorContext& ctx); +}; + +//////////////////////////////////////////////////////////////////////////////// + +TCheckRangeActor::TCheckRangeActor( + const TActorId& tablet, + ui64 startIndex, + ui64 blocksCount, + TEvService::TEvCheckRangeRequest::TPtr&& ev) + : Tablet(tablet) + , StartIndex(startIndex) + , BlocksCount(blocksCount) + , Ev(std::move(ev)) +{} + +void TCheckRangeActor::Bootstrap(const TActorContext& ctx) +{ + SendReadBlocksRequest(ctx); + Become(&TThis::StateWork); +} + +void TCheckRangeActor::SendReadBlocksRequest(const TActorContext& ctx) +{ + const TString clientId = TString(CheckRangeClientId); + auto request = std::make_unique(); + + request->Record.SetStartIndex(StartIndex); + request->Record.SetBlocksCount(BlocksCount); + + auto* headers = request->Record.MutableHeaders(); + + headers->SetClientId(clientId); + headers->SetIsBackgroundRequest(true); + NCloud::Send(ctx, Tablet, std::move(request)); +} + +void TCheckRangeActor::ReplyAndDie( + const TActorContext& ctx, + const NProto::TError& status, + const NProto::TError& error) +{ + auto response = + std::make_unique(std::move(error)); + response->Record.MutableStatus()->CopyFrom(status); + + NCloud::Reply(ctx, *Ev, std::move(response)); + + Die(ctx); +} + +//////////////////////////////////////////////////////////////////////////////// + +STFUNC(TCheckRangeActor::StateWork) +{ + switch (ev->GetTypeRewrite()) { + HFunc(TEvents::TEvPoisonPill, HandlePoisonPill); + HFunc(TEvents::TEvWakeup, HandleWakeup); + HFunc(TEvService::TEvReadBlocksResponse, HandleReadBlocksResponse); + default: + HandleUnexpectedEvent(ev, TBlockStoreComponents::PARTITION_WORKER); + break; + } +} + +void TCheckRangeActor::HandleWakeup( + const TEvents::TEvWakeup::TPtr& ev, + const TActorContext& ctx) +{ + Y_UNUSED(ev); + SendReadBlocksRequest(ctx); +} + +void TCheckRangeActor::HandlePoisonPill( + const TEvents::TEvPoisonPill::TPtr& ev, + const TActorContext& ctx) +{ + Y_UNUSED(ev); + + auto error = MakeError(E_REJECTED, "tablet is shutting down"); + + ReplyAndDie(ctx, error); +} + +void TCheckRangeActor::HandleReadBlocksResponse( + const TEvService::TEvReadBlocksResponse::TPtr& ev, + const TActorContext& ctx) +{ + const auto* msg = ev->Get(); + auto status = MakeError(S_OK); + + if (HasError(msg->Record.GetError())) { + const auto& errorMessage = msg->Record.GetError().GetMessage(); + LOG_ERROR( + ctx, + TBlockStoreComponents::PARTITION, + "reading error has occurred: " + errorMessage); + const auto& errorCode = + msg->Record.GetError().code() == E_ARGUMENT ? E_ARGUMENT : E_IO; + status = MakeError(errorCode, msg->Record.GetError().GetMessage()); + } + + ReplyAndDie(ctx, status); +} + +} // namespace + +} // namespace NCloud::NBlockStore::NStorage::NPartition + +namespace NCloud::NBlockStore::NStorage { + +NActors::IActorPtr TMirrorPartitionActor::CreateCheckRangeActor( + NActors::TActorId tablet, + ui64 startIndex, + ui64 blocksCount, + TEvService::TEvCheckRangeRequest::TPtr ev) +{ + return std::make_unique( + std::move(tablet), + startIndex, + blocksCount, + std::move(ev)); +} + +void TMirrorPartitionActor::HandleCheckRange( + const TEvService::TEvCheckRangeRequest::TPtr& ev, + const NActors::TActorContext& ctx) +{ + const auto* msg = ev->Get(); + + ui64 BlocksPerStripe = Config->GetBytesPerStripe() / State.GetBlockSize(); + // We process 4 MB of data at a time. + const ui64 maxBlocksPerRequest = + std::min(BlocksPerStripe, 4_MB / State.GetBlockSize()); + + if (msg->Record.GetBlocksCount() > maxBlocksPerRequest) { + auto err = MakeError( + E_ARGUMENT, + "Too many blocks requested: " + + std::to_string(msg->Record.GetBlocksCount()) + + " Max blocks per request : " + + std::to_string(maxBlocksPerRequest)); + auto response = + std::make_unique(std::move(err)); + NCloud::Reply(ctx, *ev, std::move(response)); + return; + } + + const auto blockRange = TBlockRange64::WithLength( + msg->Record.GetStartIndex(), + msg->Record.GetBlocksCount()); + + const auto requestIdentityKey = ev->Cookie; + RequestsInProgress.AddReadRequest(requestIdentityKey, blockRange); + + NCloud::Register( + ctx, + CreateCheckRangeActor( + SelfId(), + msg->Record.GetStartIndex(), + msg->Record.GetBlocksCount(), + std::move(ev))); +} + +} // namespace NCloud::NBlockStore::NStorage diff --git a/cloud/blockstore/libs/storage/partition_nonrepl/part_mirror_ut.cpp b/cloud/blockstore/libs/storage/partition_nonrepl/part_mirror_ut.cpp index edc6fa5c818..4766ef13823 100644 --- a/cloud/blockstore/libs/storage/partition_nonrepl/part_mirror_ut.cpp +++ b/cloud/blockstore/libs/storage/partition_nonrepl/part_mirror_ut.cpp @@ -2005,6 +2005,171 @@ Y_UNIT_TEST_SUITE(TMirrorPartitionTest) } } + Y_UNIT_TEST(ShouldCheckRange) + { + TTestBasicRuntime runtime; + + TTestEnv env(runtime); + TPartitionClient client(runtime, env.ActorId); + + client.WriteBlocks( + TBlockRange64::MakeClosedInterval(0, 1024 * 1024), + 1); + + ui32 status = -1; + ui32 error = -1; + + runtime.SetObserverFunc( + [&](TAutoPtr& event) + { + switch (event->GetTypeRewrite()) { + case TEvService::EvCheckRangeResponse: { + using TEv = TEvService::TEvCheckRangeResponse; + const auto* msg = event->Get(); + error = msg->GetStatus(); + status = msg->Record.GetStatus().GetCode(); + + break; + } + } + return TTestActorRuntime::DefaultObserverFunc(event); + }); + + const auto checkRange = [&](ui32 idx, ui32 size) + { + status = -1; + error = -1; + + const auto response = client.CheckRange("id", idx, size); + + TDispatchOptions options; + options.FinalEvents.emplace_back(TEvService::EvCheckRangeResponse); + runtime.DispatchEvents(options, TDuration::Seconds(3)); + + UNIT_ASSERT_VALUES_EQUAL(S_OK, status); + UNIT_ASSERT_VALUES_EQUAL(S_OK, error); + }; + + checkRange(0, 1024); + checkRange(1024, 512); + checkRange(1, 1); + checkRange(1000, 1000); + } + + Y_UNIT_TEST(ShouldCheckRangeWithBrokenBlocks) + { + TTestBasicRuntime runtime; + + TTestEnv env(runtime); + TPartitionClient client(runtime, env.ActorId); + + client.WriteBlocks( + TBlockRange64::MakeClosedInterval(0, 1024 * 1024), + 1); + + ui32 status = -1; + ui32 error = -1; + + runtime.SetObserverFunc( + [&](TAutoPtr& event) + { + switch (event->GetTypeRewrite()) { + case TEvService::EvCheckRangeResponse: { + using TEv = TEvService::TEvCheckRangeResponse; + const auto* msg = event->Get(); + error = msg->GetStatus(); + status = msg->Record.GetStatus().GetCode(); + + break; + } + case TEvService::EvReadBlocksResponse: { + using TEv = TEvService::TEvReadBlocksResponse; + + auto response = std::make_unique( + MakeError(E_IO, "block is broken")); + + runtime.Send( + new IEventHandle( + event->Recipient, + event->Sender, + response.release(), + 0, // flags + event->Cookie), + 0); + + return TTestActorRuntime::EEventAction::DROP; + + break; + } + } + + return TTestActorRuntime::DefaultObserverFunc(event); + }); + + const auto checkRange = [&](ui32 idx, ui32 size) + { + status = -1; + error = -1; + + client.SendCheckRangeRequest("id", idx, size); + const auto response = + client.RecvResponse(); + + TDispatchOptions options; + options.FinalEvents.emplace_back(TEvService::EvCheckRangeResponse); + runtime.DispatchEvents(options, TDuration::Seconds(3)); + + UNIT_ASSERT_VALUES_EQUAL(E_IO, status); + UNIT_ASSERT_VALUES_EQUAL(S_OK, error); + + }; + + checkRange(0, 1024); + checkRange(1024, 512); + checkRange(1, 1); + checkRange(1000, 1000); + } + + Y_UNIT_TEST(ShouldSuccessfullyCheckRangeIfDiskIsEmpty) + { + TTestBasicRuntime runtime; + + TTestEnv env(runtime); + TPartitionClient client(runtime, env.ActorId); + + const ui32 idx = 0; + const ui32 size = 1; + const auto response = client.CheckRange("id", idx, size); + + TDispatchOptions options; + options.FinalEvents.emplace_back(TEvService::EvCheckRangeResponse); + + runtime.DispatchEvents(options, TDuration::Seconds(1)); + + UNIT_ASSERT_VALUES_EQUAL(S_OK, response->GetStatus()); + UNIT_ASSERT_VALUES_EQUAL(S_OK, response->Record.GetStatus().GetCode()); + } + + Y_UNIT_TEST(ShouldntCheckRangeWithBigBlockCount) + { + TTestBasicRuntime runtime; + + TTestEnv env(runtime); + TPartitionClient client(runtime, env.ActorId); + + const ui32 idx = 0; + + client.SendCheckRangeRequest("id", idx, 16_MB/DefaultBlockSize + 1); + const auto response = + client.RecvResponse(); + + TDispatchOptions options; + options.FinalEvents.emplace_back(TEvService::EvCheckRangeResponse); + + runtime.DispatchEvents(options, TDuration::Seconds(1)); + + UNIT_ASSERT_VALUES_EQUAL(E_ARGUMENT, response->GetStatus()); + } } } // namespace NCloud::NBlockStore::NStorage diff --git a/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_actor.cpp b/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_actor.cpp index 5adec4da8fe..4174e19fc36 100644 --- a/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_actor.cpp +++ b/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_actor.cpp @@ -558,6 +558,7 @@ STFUNC(TNonreplicatedPartitionActor::StateWork) HFunc(TEvVolume::TEvGetRebuildMetadataStatusRequest, HandleGetRebuildMetadataStatus); HFunc(TEvVolume::TEvScanDiskRequest, HandleScanDisk); HFunc(TEvVolume::TEvGetScanDiskStatusRequest, HandleGetScanDiskStatus); + HFunc(TEvService::TEvCheckRangeRequest, HandleCheckRange); HFunc(TEvents::TEvPoisonPill, HandlePoisonPill); diff --git a/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_actor.h b/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_actor.h index c4ef1576699..2fae2b4ddef 100644 --- a/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_actor.h +++ b/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_actor.h @@ -181,6 +181,12 @@ class TNonreplicatedPartitionActor final const TEvNonreplPartitionPrivate::TEvChecksumBlocksCompleted::TPtr& ev, const NActors::TActorContext& ctx); + NActors::IActorPtr CreateCheckRangeActor( + NActors::TActorId tablet, + ui64 startIndex, + ui64 blocksCount, + TEvService::TEvCheckRangeRequest::TPtr ev); + bool HandleRequests(STFUNC_SIG); BLOCKSTORE_IMPLEMENT_REQUEST(ReadBlocks, TEvService); @@ -188,6 +194,7 @@ class TNonreplicatedPartitionActor final BLOCKSTORE_IMPLEMENT_REQUEST(ReadBlocksLocal, TEvService); BLOCKSTORE_IMPLEMENT_REQUEST(WriteBlocksLocal, TEvService); BLOCKSTORE_IMPLEMENT_REQUEST(ZeroBlocks, TEvService); + BLOCKSTORE_IMPLEMENT_REQUEST(CheckRange, TEvService); BLOCKSTORE_IMPLEMENT_REQUEST(DescribeBlocks, TEvVolume); BLOCKSTORE_IMPLEMENT_REQUEST(ChecksumBlocks, TEvNonreplPartitionPrivate); BLOCKSTORE_IMPLEMENT_REQUEST(Drain, NPartition::TEvPartition); diff --git a/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_actor_checkrange.cpp b/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_actor_checkrange.cpp new file mode 100644 index 00000000000..47d0afd64d7 --- /dev/null +++ b/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_actor_checkrange.cpp @@ -0,0 +1,229 @@ +#include "part_nonrepl_actor.h" + +#include "cloud/blockstore/libs/storage/disk_agent/model/public.h" + +#include +#include +#include + +#include + +#include +#include +#include +#include +#include +#include +#include + +namespace NCloud::NBlockStore::NStorage::NPartition { + +using namespace NActors; + +using namespace NKikimr; + +LWTRACE_USING(BLOCKSTORE_STORAGE_PROVIDER); + +namespace { + +//////////////////////////////////////////////////////////////////////////////// + +class TCheckRangeActor final: public TActorBootstrapped +{ +private: + const TActorId Tablet; + const ui64 StartIndex; + const ui64 BlocksCount; + const TEvService::TEvCheckRangeRequest::TPtr Ev; + +public: + TCheckRangeActor( + const TActorId& tablet, + ui64 blockId, + ui64 blocksCount, + TEvService::TEvCheckRangeRequest::TPtr&& ev); + + void Bootstrap(const TActorContext& ctx); + +private: + void ReplyAndDie( + const TActorContext& ctx, + const NProto::TError& status, + const NProto::TError& error = {}); + + void HandleReadBlocksResponse( + const TEvService::TEvReadBlocksResponse::TPtr& ev, + const TActorContext& ctx); + + void SendReadBlocksRequest(const TActorContext& ctx); + +private: + STFUNC(StateWork); + + void HandleWakeup( + const TEvents::TEvWakeup::TPtr& ev, + const TActorContext& ctx); + + void HandlePoisonPill( + const TEvents::TEvPoisonPill::TPtr& ev, + const TActorContext& ctx); +}; + +//////////////////////////////////////////////////////////////////////////////// + +TCheckRangeActor::TCheckRangeActor( + const TActorId& tablet, + ui64 startIndex, + ui64 blocksCount, + TEvService::TEvCheckRangeRequest::TPtr&& ev) + : Tablet(tablet) + , StartIndex(startIndex) + , BlocksCount(blocksCount) + , Ev(std::move(ev)) +{} + +void TCheckRangeActor::Bootstrap(const TActorContext& ctx) +{ + SendReadBlocksRequest(ctx); + Become(&TThis::StateWork); +} + +void TCheckRangeActor::SendReadBlocksRequest(const TActorContext& ctx) +{ + const TString clientId = TString(CheckRangeClientId); + auto request = std::make_unique(); + + request->Record.SetStartIndex(StartIndex); + request->Record.SetBlocksCount(BlocksCount); + + auto* headers = request->Record.MutableHeaders(); + + headers->SetClientId(clientId); + headers->SetIsBackgroundRequest(true); + NCloud::Send(ctx, Tablet, std::move(request)); +} + +void TCheckRangeActor::ReplyAndDie( + const TActorContext& ctx, + const NProto::TError& status, + const NProto::TError& error) +{ + auto response = + std::make_unique(std::move(error)); + response->Record.MutableStatus()->CopyFrom(status); + + NCloud::Reply(ctx, *Ev, std::move(response)); + + Die(ctx); +} + +//////////////////////////////////////////////////////////////////////////////// + +STFUNC(TCheckRangeActor::StateWork) +{ + switch (ev->GetTypeRewrite()) { + HFunc(TEvents::TEvPoisonPill, HandlePoisonPill); + HFunc(TEvents::TEvWakeup, HandleWakeup); + HFunc(TEvService::TEvReadBlocksResponse, HandleReadBlocksResponse); + default: + HandleUnexpectedEvent(ev, TBlockStoreComponents::PARTITION_WORKER); + break; + } +} + +void TCheckRangeActor::HandleWakeup( + const TEvents::TEvWakeup::TPtr& ev, + const TActorContext& ctx) +{ + Y_UNUSED(ev); + SendReadBlocksRequest(ctx); +} + +void TCheckRangeActor::HandlePoisonPill( + const TEvents::TEvPoisonPill::TPtr& ev, + const TActorContext& ctx) +{ + Y_UNUSED(ev); + + auto error = MakeError(E_REJECTED, "tablet is shutting down"); + + ReplyAndDie(ctx, error); +} + +void TCheckRangeActor::HandleReadBlocksResponse( + const TEvService::TEvReadBlocksResponse::TPtr& ev, + const TActorContext& ctx) +{ + const auto* msg = ev->Get(); + auto status = MakeError(S_OK); + + if (HasError(msg->Record.GetError())) { + const auto& errorMessage = msg->Record.GetError().GetMessage(); + LOG_ERROR( + ctx, + TBlockStoreComponents::PARTITION, + "reading error has occurred: " + errorMessage); + const auto& errorCode = + msg->Record.GetError().code() == E_ARGUMENT ? E_ARGUMENT : E_IO; + status = MakeError(errorCode, msg->Record.GetError().GetMessage()); + } + + ReplyAndDie(ctx, status); +} + +} // namespace + +} // namespace NCloud::NBlockStore::NStorage::NPartition + +namespace NCloud::NBlockStore::NStorage { + +NActors::IActorPtr TNonreplicatedPartitionActor::CreateCheckRangeActor( + NActors::TActorId tablet, + ui64 startIndex, + ui64 blocksCount, + TEvService::TEvCheckRangeRequest::TPtr ev) +{ + return std::make_unique( + std::move(tablet), + startIndex, + blocksCount, + std::move(ev)); +} + +void TNonreplicatedPartitionActor::HandleCheckRange( + const TEvService::TEvCheckRangeRequest::TPtr& ev, + const NActors::TActorContext& ctx) +{ + const auto* msg = ev->Get(); + + ui64 BlocksPerStripe = + Config->GetBytesPerStripe() / PartConfig->GetBlockSize(); + // We process 4 MB of data at a time. + const ui64 maxBlocksPerRequest = + std::min(BlocksPerStripe, 4_MB / PartConfig->GetBlockSize()); + + if (msg->Record.GetBlocksCount() > maxBlocksPerRequest) { + auto err = MakeError( + E_ARGUMENT, + "Too many blocks requested: " + + std::to_string(msg->Record.GetBlocksCount()) + + " Max blocks per request : " + + std::to_string(maxBlocksPerRequest)); + auto response = + std::make_unique(std::move(err)); + NCloud::Reply(ctx, *ev, std::move(response)); + return; + } + + const auto actorId = NCloud::Register( + ctx, + CreateCheckRangeActor( + SelfId(), + msg->Record.GetStartIndex(), + msg->Record.GetBlocksCount(), + std::move(ev))); + + RequestsInProgress.AddReadRequest(actorId); +} + +} // namespace NCloud::NBlockStore::NStorage diff --git a/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_ut.cpp b/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_ut.cpp index 963728f612e..8df07ccb246 100644 --- a/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_ut.cpp +++ b/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_ut.cpp @@ -1979,6 +1979,167 @@ Y_UNIT_TEST_SUITE(TNonreplicatedPartitionTest) UNIT_ASSERT_VALUES_EQUAL(E_ABORTED, response->Error.GetCode()); } } + + Y_UNIT_TEST(ShouldCheckRange) + { + TTestBasicRuntime runtime; + + TTestEnv env(runtime); + TPartitionClient client(runtime, env.ActorId); + + client.WriteBlocks( + TBlockRange64::MakeClosedInterval(0, 1024 * 1024), + 1); + + ui32 status = -1; + ui32 error = -1; + + runtime.SetObserverFunc( + [&](TAutoPtr& event) + { + switch (event->GetTypeRewrite()) { + case TEvService::EvCheckRangeResponse: { + using TEv = TEvService::TEvCheckRangeResponse; + const auto* msg = event->Get(); + error = msg->GetStatus(); + status = msg->Record.GetStatus().GetCode(); + + break; + } + } + return TTestActorRuntime::DefaultObserverFunc(event); + }); + + const auto checkRange = [&](ui32 idx, ui32 size) + { + status = -1; + error = -1; + + const auto response = client.CheckRange("id", idx, size); + + TDispatchOptions options; + options.FinalEvents.emplace_back(TEvService::EvCheckRangeResponse); + runtime.DispatchEvents(options, TDuration::Seconds(3)); + + UNIT_ASSERT_VALUES_EQUAL(S_OK, status); + UNIT_ASSERT_VALUES_EQUAL(S_OK, error); + }; + + checkRange(0, 1024); + checkRange(1024, 512); + checkRange(1, 1); + checkRange(1000, 1000); + } + + Y_UNIT_TEST(ShouldCheckRangeWithBrokenBlocks) + { + TTestBasicRuntime runtime; + + TTestEnv env(runtime); + TPartitionClient client(runtime, env.ActorId); + + client.WriteBlocks( + TBlockRange64::MakeClosedInterval(0, 1024 * 1024), + 1); + + ui32 status = -1; + ui32 error = -1; + + runtime.SetObserverFunc( + [&](TAutoPtr& event) + { + switch (event->GetTypeRewrite()) { + case TEvService::EvCheckRangeResponse: { + using TEv = TEvService::TEvCheckRangeResponse; + const auto* msg = event->Get(); + error = msg->GetStatus(); + break; + } + case TEvService::EvReadBlocksResponse: { + using TEv = TEvService::TEvReadBlocksResponse; + + auto response = std::make_unique( + MakeError(E_IO, "block is broken")); + + runtime.Send( + new IEventHandle( + event->Recipient, + event->Sender, + response.release(), + 0, // flags + event->Cookie), + 0); + + return TTestActorRuntime::EEventAction::DROP; + + break; + } + } + return TTestActorRuntime::DefaultObserverFunc(event); + }); + + const auto checkRange = [&](ui32 idx, ui32 size) + { + status = -1; + + client.SendCheckRangeRequest("id", idx, size); + const auto response = + client.RecvResponse(); + + TDispatchOptions options; + options.FinalEvents.emplace_back(TEvService::EvCheckRangeResponse); + runtime.DispatchEvents(options, TDuration::Seconds(3)); + + UNIT_ASSERT_VALUES_EQUAL(E_IO, response->Record.GetStatus().GetCode()); + UNIT_ASSERT_VALUES_EQUAL(S_OK, error); + }; + + checkRange(0, 1024); + checkRange(1024, 512); + checkRange(1, 1); + checkRange(1000, 1000); + } + + Y_UNIT_TEST(ShouldSuccessfullyCheckRangeIfDiskIsEmpty) + { + TTestBasicRuntime runtime; + + TTestEnv env(runtime); + TPartitionClient client(runtime, env.ActorId); + + const ui32 idx = 0; + const ui32 size = 1; + const auto response = client.CheckRange("id", idx, size); + + TDispatchOptions options; + options.FinalEvents.emplace_back(TEvService::EvCheckRangeResponse); + + runtime.DispatchEvents(options, TDuration::Seconds(1)); + + UNIT_ASSERT_VALUES_EQUAL(S_OK, response->GetStatus()); + UNIT_ASSERT_VALUES_EQUAL(S_OK, response->Record.GetStatus().GetCode()); + } + + Y_UNIT_TEST(ShouldntCheckRangeWithBigBlockCount) + { + TTestBasicRuntime runtime; + + TTestEnv env(runtime); + TPartitionClient client(runtime, env.ActorId); + + const ui32 idx = 0; + + client.SendCheckRangeRequest("id", idx, 16_MB/DefaultBlockSize + 1); + const auto response = + client.RecvResponse(); + + TDispatchOptions options; + options.FinalEvents.emplace_back(TEvService::EvCheckRangeResponse); + + runtime.DispatchEvents(options, TDuration::Seconds(1)); + + UNIT_ASSERT_VALUES_EQUAL(E_ARGUMENT, response->GetStatus()); + } } } // namespace NCloud::NBlockStore::NStorage diff --git a/cloud/blockstore/libs/storage/partition_nonrepl/ut_env.h b/cloud/blockstore/libs/storage/partition_nonrepl/ut_env.h index 0adc542ab40..009b5785d03 100644 --- a/cloud/blockstore/libs/storage/partition_nonrepl/ut_env.h +++ b/cloud/blockstore/libs/storage/partition_nonrepl/ut_env.h @@ -410,6 +410,16 @@ class TPartitionClient return request; } + std::unique_ptr + CreateCheckRangeRequest(TString id, ui32 startIndex, ui32 size) + { + auto request = std::make_unique(); + request->Record.SetDiskId(id); + request->Record.SetStartIndex(startIndex); + request->Record.SetBlocksCount(size); + return request; + } + #define BLOCKSTORE_DECLARE_METHOD(name, ns) \ template \ @@ -443,6 +453,7 @@ class TPartitionClient BLOCKSTORE_DECLARE_METHOD(ReadBlocksLocal, TEvService); BLOCKSTORE_DECLARE_METHOD(WriteBlocksLocal, TEvService); BLOCKSTORE_DECLARE_METHOD(ZeroBlocks, TEvService); + BLOCKSTORE_DECLARE_METHOD(CheckRange, TEvService); BLOCKSTORE_DECLARE_METHOD(ChecksumBlocks, TEvNonreplPartitionPrivate); #undef BLOCKSTORE_DECLARE_METHOD diff --git a/cloud/blockstore/libs/storage/partition_nonrepl/ya.make b/cloud/blockstore/libs/storage/partition_nonrepl/ya.make index 419ac81e784..faaebc81026 100644 --- a/cloud/blockstore/libs/storage/partition_nonrepl/ya.make +++ b/cloud/blockstore/libs/storage/partition_nonrepl/ya.make @@ -12,6 +12,7 @@ SRCS( part_mirror.cpp part_mirror_actor.cpp + part_mirror_actor_checkrange.cpp part_mirror_actor_get_device_for_range.cpp part_mirror_actor_mirror.cpp part_mirror_actor_readblocks.cpp @@ -30,6 +31,7 @@ SRCS( part_nonrepl.cpp part_nonrepl_actor.cpp + part_nonrepl_actor_checkrange.cpp part_nonrepl_actor_base_request.cpp part_nonrepl_actor_checksumblocks.cpp part_nonrepl_actor_readblocks.cpp diff --git a/cloud/blockstore/libs/storage/service/service_actor.h b/cloud/blockstore/libs/storage/service/service_actor.h index fafda2279b3..e0e95d31947 100644 --- a/cloud/blockstore/libs/storage/service/service_actor.h +++ b/cloud/blockstore/libs/storage/service/service_actor.h @@ -387,6 +387,10 @@ class TServiceActor final TRequestInfoPtr requestInfo, TString input); + TResultOrError CreateCheckRangeActionActor( + TRequestInfoPtr requestInfo, + TString input); + TResultOrError CreateFlushProfileLogActor( TRequestInfoPtr requestInfo, TString input); diff --git a/cloud/blockstore/libs/storage/service/service_actor_actions.cpp b/cloud/blockstore/libs/storage/service/service_actor_actions.cpp index 8f60820f3e9..9f4880e5f14 100644 --- a/cloud/blockstore/libs/storage/service/service_actor_actions.cpp +++ b/cloud/blockstore/libs/storage/service/service_actor_actions.cpp @@ -41,6 +41,7 @@ void TServiceActor::HandleExecuteAction( {"diskregistrysetwritablestate", &TServiceActor::CreateWritableStateActionActor }, {"backupdiskregistrystate", &TServiceActor::CreateBackupDiskRegistryStateActor }, {"checkblob", &TServiceActor::CreateCheckBlobActionActor }, + {"checkrange", &TServiceActor::CreateCheckRangeActionActor }, {"compactrange", &TServiceActor::CreateCompactRangeActionActor }, {"configurevolumebalancer", &TServiceActor::CreateConfigureVolumeBalancerActionActor }, {"deletecheckpointdata", &TServiceActor::CreateDeleteCheckpointDataActionActor }, diff --git a/cloud/blockstore/libs/storage/service/service_actor_actions_check_range.cpp b/cloud/blockstore/libs/storage/service/service_actor_actions_check_range.cpp new file mode 100644 index 00000000000..f6097e995e7 --- /dev/null +++ b/cloud/blockstore/libs/storage/service/service_actor_actions_check_range.cpp @@ -0,0 +1,180 @@ +#include "service_actor.h" + +#include +#include +#include +#include + +#include + +#include + +namespace NCloud::NBlockStore::NStorage { + +using namespace NActors; + +using namespace NKikimr; + +LWTRACE_USING(BLOCKSTORE_STORAGE_PROVIDER) + +namespace { + +//////////////////////////////////////////////////////////////////////////////// + +class TCheckRangeActor final: public TActorBootstrapped +{ +private: + const TRequestInfoPtr RequestInfo; + const TString Input; + + NPrivateProto::TCheckRangeRequest Request; + +public: + TCheckRangeActor(TRequestInfoPtr requestInfo, TString input); + + void Bootstrap(const TActorContext& ctx); + +private: + void ReplyAndDie( + const TActorContext& ctx, + NProto::TError error, + NPrivateProto::TCheckRangeResponse response); + void ReplyAndDie(const TActorContext& ctx, NProto::TError error); + +private: + STFUNC(StateWork); + + void HandleCheckRangeResponse( + const TEvService::TEvCheckRangeResponse::TPtr& ev, + const TActorContext& ctx); +}; + +//////////////////////////////////////////////////////////////////////////////// + +TCheckRangeActor::TCheckRangeActor(TRequestInfoPtr requestInfo, TString input) + : RequestInfo(std::move(requestInfo)) + , Input(std::move(input)) +{} + +void TCheckRangeActor::Bootstrap(const TActorContext& ctx) +{ + if (!google::protobuf::util::JsonStringToMessage(Input, &Request).ok()) { + ReplyAndDie(ctx, MakeError(E_ARGUMENT, "Failed to parse input")); + return; + } + + if (!Request.GetDiskId()) { + ReplyAndDie(ctx, MakeError(E_ARGUMENT, "DiskId should be supplied")); + return; + } + + if (!Request.GetBlocksCount()) { + ReplyAndDie( + ctx, + MakeError(E_ARGUMENT, "Blocks count should be supplied")); + return; + } + + auto request = std::make_unique(); + request->Record.SetDiskId(Request.GetDiskId()); + request->Record.SetStartIndex(Request.GetStartIndex()); + request->Record.SetBlocksCount(Request.GetBlocksCount()); + + LOG_INFO( + ctx, + TBlockStoreComponents::SERVICE, + "Start check disk range for %s", + Request.GetDiskId().c_str()); + + NCloud::Send( + ctx, + MakeVolumeProxyServiceId(), + std::move(request), + RequestInfo->Cookie); + + Become(&TThis::StateWork); +} + +void TCheckRangeActor::ReplyAndDie( + const TActorContext& ctx, + NProto::TError error) +{ + auto msg = std::make_unique(error); + + google::protobuf::util::MessageToJsonString( + NPrivateProto::TCheckRangeResponse(), + msg->Record.MutableOutput()); + + LWTRACK( + ResponseSent_Service, + RequestInfo->CallContext->LWOrbit, + "ExecuteAction_CheckRange", + RequestInfo->CallContext->RequestId); + + NCloud::Reply(ctx, *RequestInfo, std::move(msg)); + Die(ctx); +} + +void TCheckRangeActor::ReplyAndDie( + const TActorContext& ctx, + NProto::TError error, + NPrivateProto::TCheckRangeResponse response) +{ + auto msg = std::make_unique(error); + + google::protobuf::util::MessageToJsonString( + response, + msg->Record.MutableOutput()); + + LWTRACK( + ResponseSent_Service, + RequestInfo->CallContext->LWOrbit, + "ExecuteAction_CheckRange", + RequestInfo->CallContext->RequestId); + + NCloud::Reply(ctx, *RequestInfo, std::move(msg)); + Die(ctx); +} + +//////////////////////////////////////////////////////////////////////////////// + +void TCheckRangeActor::HandleCheckRangeResponse( + const TEvService::TEvCheckRangeResponse::TPtr& ev, + const TActorContext& ctx) +{ + auto response = NPrivateProto::TCheckRangeResponse(); + response.MutableStatus()->CopyFrom(ev->Get()->Record.GetStatus()); + + return ReplyAndDie( + ctx, + std::move(ev->Get()->Record.GetError()), + std::move(response)); +} + +//////////////////////////////////////////////////////////////////////////////// + +STFUNC(TCheckRangeActor::StateWork) +{ + switch (ev->GetTypeRewrite()) { + HFunc(TEvService::TEvCheckRangeResponse, HandleCheckRangeResponse); + + default: + HandleUnexpectedEvent(ev, TBlockStoreComponents::SERVICE); + break; + } +} + +} // namespace + +//////////////////////////////////////////////////////////////////////////////// + +TResultOrError TServiceActor::CreateCheckRangeActionActor( + TRequestInfoPtr requestInfo, + TString input) +{ + return {std::make_unique( + std::move(requestInfo), + std::move(input))}; +} + +} // namespace NCloud::NBlockStore::NStorage diff --git a/cloud/blockstore/libs/storage/service/service_actor_checkrange.cpp b/cloud/blockstore/libs/storage/service/service_actor_checkrange.cpp new file mode 100644 index 00000000000..a1e06bcf437 --- /dev/null +++ b/cloud/blockstore/libs/storage/service/service_actor_checkrange.cpp @@ -0,0 +1,184 @@ +#include "service_actor.h" + +#include +#include +#include +#include +#include +#include +#include + +#include + +namespace NCloud::NBlockStore::NStorage { + +using namespace NActors; + +using namespace NKikimr; + +namespace { + +//////////////////////////////////////////////////////////////////////////////// + +class TCheckRangeActor final: public TActorBootstrapped +{ +private: + const TRequestInfoPtr RequestInfo; + const TStorageConfigPtr Config; + const TString DiskId; + const ui64 StartIndex; + const ui64 BlocksCount; + +public: + TCheckRangeActor( + TRequestInfoPtr requestInfo, + TStorageConfigPtr config, + TString diskId, + ui64 startIndex, + ui64 blocksCount); + + void Bootstrap(const TActorContext& ctx); + +private: + void CheckRange(const TActorContext& ctx); + void DescribeDiskRegistryVolume(const TActorContext& ctx); + + void HandleCheckRangeResponse( + const TEvService::TEvCheckRangeResponse::TPtr& ev, + const TActorContext& ctx); + + void ReplyAndDie( + const TActorContext& ctx, + std::unique_ptr response); + +private: + STFUNC(StateWork); +}; + +//////////////////////////////////////////////////////////////////////////////// + +TCheckRangeActor::TCheckRangeActor( + TRequestInfoPtr requestInfo, + TStorageConfigPtr config, + TString diskId, + ui64 startIndex, + ui64 blocksCount) + : RequestInfo(std::move(requestInfo)) + , Config(std::move(config)) + , DiskId(std::move(diskId)) + , StartIndex(startIndex) + , BlocksCount(blocksCount) +{} + +void TCheckRangeActor::Bootstrap(const TActorContext& ctx) +{ + CheckRange(ctx); +} + +void TCheckRangeActor::CheckRange(const TActorContext& ctx) +{ + Become(&TThis::StateWork); + + auto request = std::make_unique(); + request->Record.SetDiskId(DiskId); + request->Record.SetStartIndex(StartIndex); + request->Record.SetBlocksCount(BlocksCount); + + NCloud::Send( + ctx, + MakeVolumeProxyServiceId(), + std::move(request), + RequestInfo->Cookie); +} + +void TCheckRangeActor::HandleCheckRangeResponse( + const TEvService::TEvCheckRangeResponse::TPtr& ev, + const TActorContext& ctx) +{ + const auto& error = ev->Get()->GetError(); + auto response = std::make_unique(error); + response->Record.MutableStatus()->CopyFrom(ev->Get()->Record.GetStatus()); + + ReplyAndDie( + ctx, + std::move(response)); +} + +void TCheckRangeActor::ReplyAndDie( + const TActorContext& ctx, + std::unique_ptr response) +{ + NCloud::Reply(ctx, *RequestInfo, std::move(response)); + Die(ctx); +} + +//////////////////////////////////////////////////////////////////////////////// + +STFUNC(TCheckRangeActor::StateWork) +{ + switch (ev->GetTypeRewrite()) { + HFunc(TEvService::TEvCheckRangeResponse, HandleCheckRangeResponse); + + default: + HandleUnexpectedEvent(ev, TBlockStoreComponents::SERVICE); + break; + } +} + +} // namespace + +//////////////////////////////////////////////////////////////////////////////// + +void TServiceActor::HandleCheckRange( + const TEvService::TEvCheckRangeRequest::TPtr& ev, + const TActorContext& ctx) +{ + const auto* msg = ev->Get(); + + auto requestInfo = + CreateRequestInfo(ev->Sender, ev->Cookie, msg->CallContext); + + const auto& request = msg->Record; + + if (request.GetDiskId().empty()) { + LOG_ERROR( + ctx, + TBlockStoreComponents::SERVICE, + "Empty DiskId in CheckRange"); + + auto response = std::make_unique( + MakeError(E_ARGUMENT, "Volume id cannot be empty")); + + NCloud::Reply(ctx, *ev, std::move(response)); + return; + } + + if (request.GetBlocksCount() == 0) { + LOG_ERROR( + ctx, + TBlockStoreComponents::SERVICE, + "Zero BlockCounts in CheckRange"); + + auto response = std::make_unique( + MakeError(E_ARGUMENT, "BlocksCounts shoud be more than zero")); + + NCloud::Reply(ctx, *ev, std::move(response)); + return; + } + + LOG_DEBUG( + ctx, + TBlockStoreComponents::SERVICE, + "CheckRange volume: %s ", + request.GetDiskId().Quote().data()); + + NCloud::Register( + ctx, + std::move(requestInfo), + Config, + request.GetDiskId(), + request.GetStartIndex(), + request.GetBlocksCount()); +} + +} // namespace NCloud::NBlockStore::NStorage diff --git a/cloud/blockstore/libs/storage/service/service_ut_actions.cpp b/cloud/blockstore/libs/storage/service/service_ut_actions.cpp index b07032f06ca..092af3b3b3f 100644 --- a/cloud/blockstore/libs/storage/service/service_ut_actions.cpp +++ b/cloud/blockstore/libs/storage/service/service_ut_actions.cpp @@ -1701,6 +1701,43 @@ Y_UNIT_TEST_SUITE(TServiceActionsTest) 1000); } } + + Y_UNIT_TEST(ShouldCheckRange) + { + TTestEnv env; + NProto::TStorageServiceConfig config; + const ui32 nodeIdx = SetupTestEnv(env, std::move(config)); + + TServiceClient service(env.GetRuntime(), nodeIdx); + service.CreateVolume("vol0"); + + const auto sessionId = + service.MountVolume("vol0")->Record.GetSessionId(); + + service.WriteBlocks( + "vol0", + TBlockRange64::WithLength(0, 1024), + sessionId, + char(1)); + + { + NPrivateProto::TCheckRangeRequest request; + request.SetDiskId("vol0"); + request.SetStartIndex(0); + request.SetBlocksCount(1000); + + TString buf; + google::protobuf::util::MessageToJsonString(request, &buf); + + const auto response = service.ExecuteAction("CheckRange", buf); + NPrivateProto::TCheckRangeResponse checkRangeResponse; + + UNIT_ASSERT(google::protobuf::util::JsonStringToMessage( + response->Record.GetOutput(), + &checkRangeResponse) + .ok()); + } + } } } // namespace NCloud::NBlockStore::NStorage diff --git a/cloud/blockstore/libs/storage/service/service_ut_checkrange.cpp b/cloud/blockstore/libs/storage/service/service_ut_checkrange.cpp new file mode 100644 index 00000000000..3965e8e3076 --- /dev/null +++ b/cloud/blockstore/libs/storage/service/service_ut_checkrange.cpp @@ -0,0 +1,75 @@ +#include "service_ut.h" + +#include + +namespace NCloud::NBlockStore::NStorage { + +using namespace NActors; + +//////////////////////////////////////////////////////////////////////////////// + +Y_UNIT_TEST_SUITE(TServiceCheckRangeTest) +{ + Y_UNIT_TEST(ShouldCheckRange) + { + TTestEnv env; + NProto::TStorageServiceConfig config; + ui32 nodeIdx = SetupTestEnv(env, std::move(config)); + + TServiceClient service(env.GetRuntime(), nodeIdx); + service.CreateVolume( + DefaultDiskId, + 512, + DefaultBlockSize, + "test_folder", + "test_cloud"); + ui64 size = 512; + + auto response = service.CheckRange(DefaultDiskId, 0, size); + UNIT_ASSERT(response->GetStatus() == S_OK); + } + + Y_UNIT_TEST(ShouldFailCheckRangeWithEmptyDiskId) + { + TTestEnv env; + NProto::TStorageServiceConfig config; + ui32 nodeIdx = SetupTestEnv(env, std::move(config)); + + TServiceClient service(env.GetRuntime(), nodeIdx); + service.CreateVolume( + DefaultDiskId, + 512, + DefaultBlockSize, + "test_folder", + "test_cloud"); + ui64 size = 512; + + service.SendCheckRangeRequest(TString(), 0, size); + auto response = service.RecvCheckRangeResponse(); + + UNIT_ASSERT(response->GetStatus() == E_ARGUMENT); + } + + Y_UNIT_TEST(ShouldFailCheckRangeWithZeroSize) + { + TTestEnv env; + NProto::TStorageServiceConfig config; + ui32 nodeIdx = SetupTestEnv(env, std::move(config)); + + TServiceClient service(env.GetRuntime(), nodeIdx); + service.CreateVolume( + DefaultDiskId, + 512, + DefaultBlockSize, + "test_folder", + "test_cloud"); + ui64 size = 0; + + service.SendCheckRangeRequest(DefaultDiskId, 0, size); + auto response = service.RecvCheckRangeResponse(); + + UNIT_ASSERT(response->GetStatus() == E_ARGUMENT); + } +} + +} // namespace NCloud::NBlockStore::NStorage diff --git a/cloud/blockstore/libs/storage/service/ut/ya.make b/cloud/blockstore/libs/storage/service/ut/ya.make index b054d52530e..29083f82495 100644 --- a/cloud/blockstore/libs/storage/service/ut/ya.make +++ b/cloud/blockstore/libs/storage/service/ut/ya.make @@ -6,6 +6,7 @@ SRCS( service_state_ut.cpp service_ut_actions.cpp service_ut_alter.cpp + service_ut_checkrange.cpp service_ut_create_from_device.cpp service_ut_create.cpp service_ut_describe_model.cpp diff --git a/cloud/blockstore/libs/storage/service/ya.make b/cloud/blockstore/libs/storage/service/ya.make index 21820ee4b38..f526e0d2c09 100644 --- a/cloud/blockstore/libs/storage/service/ya.make +++ b/cloud/blockstore/libs/storage/service/ya.make @@ -9,6 +9,7 @@ SRCS( service_actor_actions_change_disk_device.cpp service_actor_actions_change_storage_config.cpp service_actor_actions_check_blob.cpp + service_actor_actions_check_range.cpp service_actor_actions_cms.cpp service_actor_actions_compact_range.cpp service_actor_actions_configure_volume_balancer.cpp @@ -56,6 +57,7 @@ SRCS( service_actor_assign.cpp service_actor_balancer_stats.cpp service_actor_check_liveness.cpp + service_actor_checkrange.cpp service_actor_client_stats.cpp service_actor_create.cpp service_actor_create_from_device.cpp diff --git a/cloud/blockstore/libs/storage/testlib/service_client.cpp b/cloud/blockstore/libs/storage/testlib/service_client.cpp index 914d9222c5f..8a35e5b42da 100644 --- a/cloud/blockstore/libs/storage/testlib/service_client.cpp +++ b/cloud/blockstore/libs/storage/testlib/service_client.cpp @@ -507,6 +507,19 @@ std::unique_ptr TServiceClient::CreateGetV return request; } +std::unique_ptr TServiceClient::CreateCheckRangeRequest( + const TString& diskId, + const ui64 startIndex, + const ui64 blocksCount) +{ + auto request = std::make_unique(); + request->Record.SetDiskId(diskId); + request->Record.SetStartIndex(startIndex); + request->Record.SetBlocksCount(blocksCount); + + return request; +} + void TServiceClient::WaitForVolume(const TString& diskId) { auto request = std::make_unique(); diff --git a/cloud/blockstore/libs/storage/testlib/service_client.h b/cloud/blockstore/libs/storage/testlib/service_client.h index ff857c21665..19c46710c70 100644 --- a/cloud/blockstore/libs/storage/testlib/service_client.h +++ b/cloud/blockstore/libs/storage/testlib/service_client.h @@ -248,6 +248,11 @@ class TServiceClient std::unique_ptr CreateGetVolumeStatsRequest(); + std::unique_ptr CreateCheckRangeRequest( + const TString& diskId, + const ui64 blockIdx, + const ui64 blockCount); + void WaitForVolume(const TString& diskId = DefaultDiskId); #define BLOCKSTORE_DECLARE_METHOD(name, ns) \ diff --git a/cloud/blockstore/libs/storage/volume/multi_partition_requests.cpp b/cloud/blockstore/libs/storage/volume/multi_partition_requests.cpp index 90ba2e9b6e8..1e187087c0f 100644 --- a/cloud/blockstore/libs/storage/volume/multi_partition_requests.cpp +++ b/cloud/blockstore/libs/storage/volume/multi_partition_requests.cpp @@ -617,4 +617,22 @@ bool ToPartitionRequests( return false; } +template <> +bool ToPartitionRequests( + const TPartitionInfoList& partitions, + const ui32 blockSize, + const ui32 blocksPerStripe, + const TEvService::TCheckRangeMethod::TRequest::TPtr& ev, + TVector>* requests, + TBlockRange64* blockRange) +{ + return ToPartitionRequestsSimple( + partitions, + blockSize, + blocksPerStripe, + ev, + requests, + blockRange); +} + } // namespace NCloud::NBlockStore::NStorage diff --git a/cloud/blockstore/libs/storage/volume/multi_partition_requests.h b/cloud/blockstore/libs/storage/volume/multi_partition_requests.h index 828943c6317..a7e65b92da0 100644 --- a/cloud/blockstore/libs/storage/volume/multi_partition_requests.h +++ b/cloud/blockstore/libs/storage/volume/multi_partition_requests.h @@ -263,6 +263,21 @@ class TMultiPartitionRequestActor final PartitionRequests[requestNo].PartitionId); } + void Merge( + NProto::TCheckRangeResponse& src, + ui32 requestNo, + NProto::TCheckRangeResponse& dst) + { + Y_UNUSED(requestNo); + if (src.GetError().GetCode() > dst.GetError().GetCode()) { + dst.MutableError()->CopyFrom(src.GetError()); + } + + if (src.GetStatus().GetCode() > dst.GetStatus().GetCode()) { + dst.MutableStatus()->CopyFrom(src.GetStatus()); + } + } + template void Merge( T& src, diff --git a/cloud/blockstore/libs/storage/volume/volume_actor.h b/cloud/blockstore/libs/storage/volume/volume_actor.h index 2c461382958..4bf751c6d42 100644 --- a/cloud/blockstore/libs/storage/volume/volume_actor.h +++ b/cloud/blockstore/libs/storage/volume/volume_actor.h @@ -1013,6 +1013,10 @@ class TVolumeActor final const TEvVolumePrivate::TEvUpdateSmartMigrationState::TPtr& ev, const NActors::TActorContext& ctx); + void HandleCheckRangeResponse( + const TEvService::TEvCheckRangeResponse::TPtr& ev, + const NActors::TActorContext& ctx); + void CreateCheckpointLightRequest( const NActors::TActorContext& ctx, ui64 requestId, diff --git a/cloud/blockstore/libs/storage/volume/volume_actor_forward.cpp b/cloud/blockstore/libs/storage/volume/volume_actor_forward.cpp index cec719ed843..3f66c2262c4 100644 --- a/cloud/blockstore/libs/storage/volume/volume_actor_forward.cpp +++ b/cloud/blockstore/libs/storage/volume/volume_actor_forward.cpp @@ -867,6 +867,7 @@ BLOCKSTORE_FORWARD_REQUEST(GetChangedBlocks, TEvService) BLOCKSTORE_FORWARD_REQUEST(GetCheckpointStatus, TEvService) BLOCKSTORE_FORWARD_REQUEST(ReadBlocksLocal, TEvService) BLOCKSTORE_FORWARD_REQUEST(WriteBlocksLocal, TEvService) +BLOCKSTORE_FORWARD_REQUEST(CheckRange, TEvService) BLOCKSTORE_FORWARD_REQUEST(DescribeBlocks, TEvVolume) BLOCKSTORE_FORWARD_REQUEST(GetUsedBlocks, TEvVolume) diff --git a/cloud/blockstore/libs/storage/volume/volume_actor_forward_trackused.cpp b/cloud/blockstore/libs/storage/volume/volume_actor_forward_trackused.cpp index 10b46d36116..35014f52485 100644 --- a/cloud/blockstore/libs/storage/volume/volume_actor_forward_trackused.cpp +++ b/cloud/blockstore/libs/storage/volume/volume_actor_forward_trackused.cpp @@ -208,6 +208,7 @@ GENERATE_IMPL(WriteBlocksLocal, TEvService) GENERATE_NO_IMPL(CreateCheckpoint, TEvService) GENERATE_NO_IMPL(DeleteCheckpoint, TEvService) GENERATE_NO_IMPL(GetCheckpointStatus, TEvService) +GENERATE_NO_IMPL(CheckRange, TEvService) GENERATE_NO_IMPL(DescribeBlocks, TEvVolume) GENERATE_NO_IMPL(GetUsedBlocks, TEvVolume) diff --git a/cloud/blockstore/libs/storage/volume/volume_actor_throttling.cpp b/cloud/blockstore/libs/storage/volume/volume_actor_throttling.cpp index f9b7a76a0fe..b048038e1e8 100644 --- a/cloud/blockstore/libs/storage/volume/volume_actor_throttling.cpp +++ b/cloud/blockstore/libs/storage/volume/volume_actor_throttling.cpp @@ -234,6 +234,7 @@ GENERATE_IMPL(GetChangedBlocks, TEvService) GENERATE_IMPL(GetCheckpointStatus, TEvService) GENERATE_IMPL(ReadBlocksLocal, TEvService) GENERATE_IMPL(WriteBlocksLocal, TEvService) +GENERATE_IMPL(CheckRange, TEvService) GENERATE_IMPL(DescribeBlocks, TEvVolume) GENERATE_IMPL(GetUsedBlocks, TEvVolume) diff --git a/cloud/blockstore/private/api/protos/volume.proto b/cloud/blockstore/private/api/protos/volume.proto index d65918850b9..9d6d3318c42 100644 --- a/cloud/blockstore/private/api/protos/volume.proto +++ b/cloud/blockstore/private/api/protos/volume.proto @@ -3,6 +3,7 @@ syntax = "proto3"; package NCloud.NBlockStore.NPrivateProto; import "cloud/blockstore/private/api/protos/blob.proto"; +import "cloud/storage/core/protos/error.proto"; option go_package = "github.com/ydb-platform/nbs/cloud/blockstore/private/api/protos"; @@ -294,3 +295,23 @@ message TFinishFillDiskRequest message TFinishFillDiskResponse { } + +//////////////////////////////////////////////////////////////////////////////// +// CheckRange request/response. + +message TCheckRangeRequest +{ + string DiskId = 1; + + // First block index. + uint32 StartIndex = 2; + + // Number of blobs per batch. + uint32 BlocksCount = 3; +} + +message TCheckRangeResponse +{ + // Status of ReadBlocks operation. + NCloud.NProto.TError Status = 1; +} diff --git a/cloud/blockstore/public/api/grpc/service.proto b/cloud/blockstore/public/api/grpc/service.proto index b64dd0033da..f2229e68528 100644 --- a/cloud/blockstore/public/api/grpc/service.proto +++ b/cloud/blockstore/public/api/grpc/service.proto @@ -157,6 +157,17 @@ service TBlockStoreService }; } + // + //CheckRange operation + // + + rpc CheckRange(TCheckRangeRequest) returns (TCheckRangeResponse) { + option (google.api.http) = { + post: "/check_range" + body: "*" + }; + } + // // Checkpoint operations. // diff --git a/cloud/blockstore/public/api/protos/volume.proto b/cloud/blockstore/public/api/protos/volume.proto index f32ae794f5f..a9632c48a67 100644 --- a/cloud/blockstore/public/api/protos/volume.proto +++ b/cloud/blockstore/public/api/protos/volume.proto @@ -743,3 +743,32 @@ message TDescribeVolumeModelResponse // Volume model information. TVolumeModel VolumeModel = 2; } + +//////////////////////////////////////////////////////////////////////////////// +// CheckRange +message TCheckRangeRequest +{ + // Optional request headers. + THeaders Headers = 1; + + // Label of volume. + string DiskId = 2; + + // First block index. + uint64 StartIndex = 3; + + // Number of blobs per batch. + uint64 BlocksCount = 4; +} + +message TCheckRangeResponse +{ + // Optional error, set only if error happened. + NCloud.NProto.TError Error = 1; + + // Status of ReadBlocks operation. + NCloud.NProto.TError Status = 2; + + // Request traces. + NCloud.NProto.TTraceInfo Trace = 3; +} diff --git a/cloud/blockstore/tools/http_proxy/main_test.go b/cloud/blockstore/tools/http_proxy/main_test.go index c0a21e205de..da3a08278c0 100644 --- a/cloud/blockstore/tools/http_proxy/main_test.go +++ b/cloud/blockstore/tools/http_proxy/main_test.go @@ -521,6 +521,16 @@ func (s *mockBlockstoreServer) QueryAgentsInfo( return res, args.Error(1) } +func (s *mockBlockstoreServer) CheckRange( + ctx context.Context, + req *blockstore_protos.TCheckRangeRequest, +) (*blockstore_protos.TCheckRangeResponse, error) { + + args := s.Called(ctx, req) + res, _ := args.Get(0).(*blockstore_protos.TCheckRangeResponse) + return res, args.Error(1) +} + //////////////////////////////////////////////////////////////////////////////// func runMockBlockstoreServer( diff --git a/cloud/blockstore/tools/testing/chaos-monkey/monkey_test.go b/cloud/blockstore/tools/testing/chaos-monkey/monkey_test.go index 8e1f2e0ed92..1d2609296d5 100644 --- a/cloud/blockstore/tools/testing/chaos-monkey/monkey_test.go +++ b/cloud/blockstore/tools/testing/chaos-monkey/monkey_test.go @@ -388,6 +388,14 @@ func (n nbsService) CmsAction( panic("implement me") } +func (n nbsService) CheckRange( + ctx context.Context, + request *protos.TCheckRangeRequest, +) (*protos.TCheckRangeResponse, error) { + //TODO implement me + panic("implement me") +} + //////////////////////////////////////////////////////////////////////////////// type TestContext struct {