From 86a0da144feda231169b9173fccc5868017ddd39 Mon Sep 17 00:00:00 2001 From: Pavel Misko Date: Wed, 29 Jan 2025 18:10:27 +0100 Subject: [PATCH 1/5] issue-2277: move copying of the request buffer to TIORequestParserActor --- .../disk_agent/actors/io_request_parser.cpp | 48 +++++++++++-- .../storage/disk_agent/disk_agent_actor.cpp | 10 +-- .../storage/disk_agent/disk_agent_actor.h | 8 ++- .../disk_agent/disk_agent_actor_io.cpp | 68 ++++++++++++++++++- .../storage/disk_agent/disk_agent_private.h | 15 ++++ .../storage/disk_agent/disk_agent_state.cpp | 46 ++++++++----- .../storage/disk_agent/disk_agent_state.h | 7 ++ .../storage/disk_agent/hash_table_storage.cpp | 10 ++- 8 files changed, 178 insertions(+), 34 deletions(-) diff --git a/cloud/blockstore/libs/storage/disk_agent/actors/io_request_parser.cpp b/cloud/blockstore/libs/storage/disk_agent/actors/io_request_parser.cpp index dba36fbf162..a58062f90d9 100644 --- a/cloud/blockstore/libs/storage/disk_agent/actors/io_request_parser.cpp +++ b/cloud/blockstore/libs/storage/disk_agent/actors/io_request_parser.cpp @@ -32,12 +32,10 @@ class TIORequestParserActor: public TActor STFUNC(StateWork) { switch (ev->GetTypeRewrite()) { - HFunc(NActors::TEvents::TEvPoisonPill, HandlePoisonPill); + HFunc(TEvents::TEvPoisonPill, HandlePoisonPill); case TEvDiskAgent::EvWriteDeviceBlocksRequest: - HandleRequest( - ev, - TEvDiskAgentPrivate::EvParsedWriteDeviceBlocksRequest); + HandleWriteDeviceBlocks(ev); break; case TEvDiskAgent::EvReadDeviceBlocksRequest: @@ -69,6 +67,48 @@ class TIORequestParserActor: public TActor Die(ctx); } + void HandleWriteDeviceBlocks(TAutoPtr& ev) + { + auto request = std::make_unique< + TEvDiskAgentPrivate::TEvParsedWriteDeviceBlocksRequest>(); + + // parse protobuf + auto* msg = ev->Get(); + request->Record.Swap(&msg->Record); + + ui64 bytesCount = 0; + for (const auto& buffer: request->Record.GetBlocks().GetBuffers()) { + bytesCount += buffer.size(); + } + + request->Storage.reset( + static_cast( + std::aligned_alloc(request->Record.GetBlockSize(), bytesCount)), + std::free); + + char* dst = request->Storage.get(); + for (const auto& buffer: request->Record.GetBlocks().GetBuffers()) { + std::memcpy(dst, buffer.data(), buffer.size()); + dst += buffer.size(); + } + + request->ByteCount = bytesCount; + request->Record.ClearBlocks(); + + auto newEv = std::make_unique( + ev->Recipient, + ev->Sender, + request.release(), + ev->Flags, + ev->Cookie, + nullptr, // forwardOnNondelivery + std::move(ev->TraceId)); + + newEv->Rewrite(newEv->Type, Owner); + + ActorContext().Send(std::move(newEv)); + } + template void HandleRequest(TAutoPtr& ev, ui32 typeRewrite) { diff --git a/cloud/blockstore/libs/storage/disk_agent/disk_agent_actor.cpp b/cloud/blockstore/libs/storage/disk_agent/disk_agent_actor.cpp index c10c5de69c3..9feac6ccbc8 100644 --- a/cloud/blockstore/libs/storage/disk_agent/disk_agent_actor.cpp +++ b/cloud/blockstore/libs/storage/disk_agent/disk_agent_actor.cpp @@ -372,13 +372,9 @@ STFUNC(TDiskAgentActor::StateWork) TEvDiskAgent::TEvDisableConcreteAgentRequest, HandleDisableConcreteAgent); - case TEvDiskAgentPrivate::EvParsedWriteDeviceBlocksRequest: - HandleWriteDeviceBlocks( - *reinterpret_cast< - typename TEvDiskAgent::TEvWriteDeviceBlocksRequest::TPtr*>( - &ev), - ActorContext()); - break; + HFunc( + TEvDiskAgentPrivate::TEvParsedWriteDeviceBlocksRequest, + HandleParsedWriteDeviceBlocks); case TEvDiskAgentPrivate::EvParsedReadDeviceBlocksRequest: HandleReadDeviceBlocks( diff --git a/cloud/blockstore/libs/storage/disk_agent/disk_agent_actor.h b/cloud/blockstore/libs/storage/disk_agent/disk_agent_actor.h index 5854307216c..768c5abce1d 100644 --- a/cloud/blockstore/libs/storage/disk_agent/disk_agent_actor.h +++ b/cloud/blockstore/libs/storage/disk_agent/disk_agent_actor.h @@ -137,10 +137,10 @@ class TDiskAgentActor final void SendRegisterRequest(const NActors::TActorContext& ctx); - template + template void PerformIO( const NActors::TActorContext& ctx, - const typename TMethod::TRequest::TPtr& ev, + const TEv& ev, TOp operation); template @@ -225,6 +225,10 @@ class TDiskAgentActor final const TEvDiskAgentPrivate::TEvCancelSuspensionRequest::TPtr& ev, const NActors::TActorContext& ctx); + void HandleParsedWriteDeviceBlocks( + const TEvDiskAgentPrivate::TEvParsedWriteDeviceBlocksRequest::TPtr& ev, + const NActors::TActorContext& ctx); + bool HandleRequests(STFUNC_SIG); bool RejectRequests(STFUNC_SIG); diff --git a/cloud/blockstore/libs/storage/disk_agent/disk_agent_actor_io.cpp b/cloud/blockstore/libs/storage/disk_agent/disk_agent_actor_io.cpp index d5f55290426..f9b51e9e9fe 100644 --- a/cloud/blockstore/libs/storage/disk_agent/disk_agent_actor_io.cpp +++ b/cloud/blockstore/libs/storage/disk_agent/disk_agent_actor_io.cpp @@ -17,6 +17,24 @@ namespace { //////////////////////////////////////////////////////////////////////////////// +ui64 GetVolumeRequestId( + const TEvDiskAgentPrivate::TParsedWriteDeviceBlocksRequest& request) +{ + return request.Record.GetVolumeRequestId(); +} + +TBlockRange64 BuildRequestBlockRange( + const TEvDiskAgentPrivate::TParsedWriteDeviceBlocksRequest& request) +{ + Y_ABORT_UNLESS(request.ByteCount % request.Record.GetBlockSize() == 0); + + return TBlockRange64::WithLength( + request.Record.GetStartIndex(), + request.ByteCount / request.Record.GetBlockSize()); +} + +//////////////////////////////////////////////////////////////////////////////// + template constexpr bool IsWriteDeviceMethod = std::is_same_v || @@ -113,10 +131,10 @@ std::pair HandleException( //////////////////////////////////////////////////////////////////////////////// -template +template void TDiskAgentActor::PerformIO( const TActorContext& ctx, - const typename TMethod::TRequest::TPtr& ev, + const TEv& ev, TOp operation) { auto* msg = ev->Get(); @@ -327,6 +345,52 @@ void TDiskAgentActor::HandleWriteDeviceBlocks( PerformIO(ctx, ev, &TDiskAgentState::Write); } +void TDiskAgentActor::HandleParsedWriteDeviceBlocks( + const TEvDiskAgentPrivate::TEvParsedWriteDeviceBlocksRequest::TPtr& ev, + const TActorContext& ctx) +{ + BLOCKSTORE_DISK_AGENT_COUNTER(WriteDeviceBlocks); + + using TMethod = TEvDiskAgent::TWriteDeviceBlocksMethod; + + if (CheckIntersection(ctx, ev)) { + return; + } + + // Attach storage to NProto::TWriteBlocksRequest + struct TWriteBlocksRequestWithStorage + : NProto::TWriteBlocksRequest + { + TStorageBuffer Storage; + }; + + auto* msg = ev->Get(); + + PerformIO( + ctx, + ev, + [storage = std::move(msg->Storage), byteCount = msg->ByteCount]( + TDiskAgentState& self, + TInstant now, + NProto::TWriteDeviceBlocksRequest request) mutable + { + auto writeRequest = + std::make_shared(); + writeRequest->Storage = std::move(storage); + writeRequest->MutableHeaders()->Swap(request.MutableHeaders()); + writeRequest->SetStartIndex(request.GetStartIndex()); + + TStringBuf buffer {writeRequest->Storage.get(), byteCount}; + + return self.WriteBlocks( + now, + request.GetDeviceUUID(), + std::move(writeRequest), + request.GetBlockSize(), + buffer); + }); +} + void TDiskAgentActor::HandleZeroDeviceBlocks( const TEvDiskAgent::TEvZeroDeviceBlocksRequest::TPtr& ev, const TActorContext& ctx) diff --git a/cloud/blockstore/libs/storage/disk_agent/disk_agent_private.h b/cloud/blockstore/libs/storage/disk_agent/disk_agent_private.h index 94cfa9f17d0..7fba588a1c1 100644 --- a/cloud/blockstore/libs/storage/disk_agent/disk_agent_private.h +++ b/cloud/blockstore/libs/storage/disk_agent/disk_agent_private.h @@ -157,6 +157,17 @@ struct TEvDiskAgentPrivate struct TCancelSuspensionRequest {}; + // + // ParsedWriteDeviceBlocksRequest + // + + struct TParsedWriteDeviceBlocksRequest + { + NProto::TWriteDeviceBlocksRequest Record; + TStorageBuffer Storage; + ui64 ByteCount = 0; + }; + // // Events declaration // @@ -207,6 +218,10 @@ struct TEvDiskAgentPrivate TCancelSuspensionRequest, EvCancelSuspensionRequest>; + using TEvParsedWriteDeviceBlocksRequest = TRequestEvent< + TParsedWriteDeviceBlocksRequest, + EvParsedWriteDeviceBlocksRequest>; + BLOCKSTORE_DECLARE_EVENTS(UpdateSessionCache) }; diff --git a/cloud/blockstore/libs/storage/disk_agent/disk_agent_state.cpp b/cloud/blockstore/libs/storage/disk_agent/disk_agent_state.cpp index 0c955b57742..44d79e8be6d 100644 --- a/cloud/blockstore/libs/storage/disk_agent/disk_agent_state.cpp +++ b/cloud/blockstore/libs/storage/disk_agent/disk_agent_state.cpp @@ -645,35 +645,49 @@ TFuture TDiskAgentState::Write( TInstant now, NProto::TWriteDeviceBlocksRequest request) { - CheckIfDeviceIsDisabled( - request.GetDeviceUUID(), - request.GetHeaders().GetClientId()); - - const auto& device = GetDeviceState( - request.GetDeviceUUID(), - request.GetHeaders().GetClientId(), - NProto::VOLUME_ACCESS_READ_WRITE); - auto writeRequest = std::make_shared(); writeRequest->MutableHeaders()->CopyFrom(request.GetHeaders()); writeRequest->SetStartIndex(request.GetStartIndex()); writeRequest->MutableBlocks()->Swap(request.MutableBlocks()); - WriteProfileLog( + return WriteBlocks( now, request.GetDeviceUUID(), - *writeRequest, + std::move(writeRequest), request.GetBlockSize(), - ESysRequestType::WriteDeviceBlocks + {} // buffer ); +} + +TFuture TDiskAgentState::WriteBlocks( + TInstant now, + const TString& deviceUUID, + std::shared_ptr request, + ui32 blockSize, + TStringBuf buffer) +{ + CheckIfDeviceIsDisabled( + deviceUUID, + request->GetHeaders().GetClientId()); + + const auto& device = GetDeviceState( + deviceUUID, + request->GetHeaders().GetClientId(), + NProto::VOLUME_ACCESS_READ_WRITE); + + WriteProfileLog( + now, + deviceUUID, + *request, + blockSize, + ESysRequestType::WriteDeviceBlocks); auto result = device.StorageAdapter->WriteBlocks( now, MakeIntrusive(), - std::move(writeRequest), - request.GetBlockSize(), - {} // no data buffer - ); + std::move(request), + blockSize, + buffer); return result.Apply( [] (const auto& future) { diff --git a/cloud/blockstore/libs/storage/disk_agent/disk_agent_state.h b/cloud/blockstore/libs/storage/disk_agent/disk_agent_state.h index e36ce10810b..8937d5538ed 100644 --- a/cloud/blockstore/libs/storage/disk_agent/disk_agent_state.h +++ b/cloud/blockstore/libs/storage/disk_agent/disk_agent_state.h @@ -103,6 +103,13 @@ class TDiskAgentState TInstant now, NProto::TWriteDeviceBlocksRequest request); + NThreading::TFuture WriteBlocks( + TInstant now, + const TString& deviceUUID, + std::shared_ptr request, + ui32 blockSize, + TStringBuf buffer); + NThreading::TFuture WriteZeroes( TInstant now, NProto::TZeroDeviceBlocksRequest request); diff --git a/cloud/blockstore/libs/storage/disk_agent/hash_table_storage.cpp b/cloud/blockstore/libs/storage/disk_agent/hash_table_storage.cpp index 7cf9cc2e135..7500f974a9c 100644 --- a/cloud/blockstore/libs/storage/disk_agent/hash_table_storage.cpp +++ b/cloud/blockstore/libs/storage/disk_agent/hash_table_storage.cpp @@ -109,7 +109,7 @@ struct THashTableStorage final return MakeFuture(std::move(response)); } - auto sglist = guard.Get(); + const auto& sglist = guard.Get(); auto b = request->GetStartIndex(); auto e = request->GetStartIndex() + request->BlocksCount; @@ -120,11 +120,15 @@ struct THashTableStorage final return MakeFuture(std::move(response)); } - while (b < e) { - Blocks[b] = sglist[b - request->GetStartIndex()].AsStringBuf(); + TSgList dst(request->BlocksCount); + while (b < e) { + auto& block = Blocks[b]; + block.resize(request->BlockSize); + dst[b - request->GetStartIndex()] = {block.data(), block.size()}; ++b; } + SgListCopy(sglist, dst); return MakeFuture(std::move(response)); } From ffeebf5dd8b6364be549e9e1206378425e2887da Mon Sep 17 00:00:00 2001 From: Pavel Misko Date: Wed, 29 Jan 2025 21:46:27 +0100 Subject: [PATCH 2/5] allocator --- .../disk_agent/actors/io_request_parser.cpp | 18 +++++++++++------- .../disk_agent/actors/io_request_parser.h | 7 ++++++- .../disk_agent/disk_agent_actor_init.cpp | 11 ++++++++++- .../storage/disk_agent/disk_agent_actor_io.cpp | 2 +- 4 files changed, 28 insertions(+), 10 deletions(-) diff --git a/cloud/blockstore/libs/storage/disk_agent/actors/io_request_parser.cpp b/cloud/blockstore/libs/storage/disk_agent/actors/io_request_parser.cpp index a58062f90d9..b618ef46cef 100644 --- a/cloud/blockstore/libs/storage/disk_agent/actors/io_request_parser.cpp +++ b/cloud/blockstore/libs/storage/disk_agent/actors/io_request_parser.cpp @@ -21,11 +21,15 @@ class TIORequestParserActor: public TActor { private: const TActorId Owner; + TStorageBufferAllocator Allocator; public: - explicit TIORequestParserActor(const TActorId& owner) + explicit TIORequestParserActor( + const TActorId& owner, + TStorageBufferAllocator allocator) : TActor(&TIORequestParserActor::StateWork) , Owner(owner) + , Allocator(std::move(allocator)) {} private: @@ -81,10 +85,8 @@ class TIORequestParserActor: public TActor bytesCount += buffer.size(); } - request->Storage.reset( - static_cast( - std::aligned_alloc(request->Record.GetBlockSize(), bytesCount)), - std::free); + request->Storage = + Allocator(request->Record.GetBlockSize(), bytesCount); char* dst = request->Storage.get(); for (const auto& buffer: request->Record.GetBlocks().GetBuffers()) { @@ -125,9 +127,11 @@ class TIORequestParserActor: public TActor //////////////////////////////////////////////////////////////////////////////// -std::unique_ptr CreateIORequestParserActor(const TActorId& owner) +std::unique_ptr CreateIORequestParserActor( + const TActorId& owner, + TStorageBufferAllocator allocator) { - return std::make_unique(owner); + return std::make_unique(owner, std::move(allocator)); } } // namespace NCloud::NBlockStore::NStorage::NDiskAgent diff --git a/cloud/blockstore/libs/storage/disk_agent/actors/io_request_parser.h b/cloud/blockstore/libs/storage/disk_agent/actors/io_request_parser.h index 1f674e4a4a2..b1941e88bab 100644 --- a/cloud/blockstore/libs/storage/disk_agent/actors/io_request_parser.h +++ b/cloud/blockstore/libs/storage/disk_agent/actors/io_request_parser.h @@ -2,13 +2,18 @@ #include +#include #include namespace NCloud::NBlockStore::NStorage::NDiskAgent { //////////////////////////////////////////////////////////////////////////////// +using TStorageBufferAllocator = + std::function(ui32 blockSize, ui64 bytesCount)>; + std::unique_ptr CreateIORequestParserActor( - const NActors::TActorId& owner); + const NActors::TActorId& owner, + TStorageBufferAllocator allocator); } // namespace NCloud::NBlockStore::NStorage::NDiskAgent diff --git a/cloud/blockstore/libs/storage/disk_agent/disk_agent_actor_init.cpp b/cloud/blockstore/libs/storage/disk_agent/disk_agent_actor_init.cpp index 1f30059eece..d675d721f4b 100644 --- a/cloud/blockstore/libs/storage/disk_agent/disk_agent_actor_init.cpp +++ b/cloud/blockstore/libs/storage/disk_agent/disk_agent_actor_init.cpp @@ -144,7 +144,16 @@ void TDiskAgentActor::HandleInitAgentCompleted( "Create " << count << " IORequestParserActor actors"); IOParserActors.reserve(count); for (ui32 i = 0; i != count; ++i) { - auto actor = NDiskAgent::CreateIORequestParserActor(ctx.SelfID); + auto actor = NDiskAgent::CreateIORequestParserActor( + ctx.SelfID, + [](ui32 blockSize, ui64 byteCount) + { + return std::shared_ptr( + static_cast( + std::aligned_alloc(blockSize, byteCount)), + std::free); + }); + IOParserActors.push_back(ctx.Register( actor.release(), TMailboxType::TinyReadAsFilled, diff --git a/cloud/blockstore/libs/storage/disk_agent/disk_agent_actor_io.cpp b/cloud/blockstore/libs/storage/disk_agent/disk_agent_actor_io.cpp index f9b51e9e9fe..6412b019967 100644 --- a/cloud/blockstore/libs/storage/disk_agent/disk_agent_actor_io.cpp +++ b/cloud/blockstore/libs/storage/disk_agent/disk_agent_actor_io.cpp @@ -380,7 +380,7 @@ void TDiskAgentActor::HandleParsedWriteDeviceBlocks( writeRequest->MutableHeaders()->Swap(request.MutableHeaders()); writeRequest->SetStartIndex(request.GetStartIndex()); - TStringBuf buffer {writeRequest->Storage.get(), byteCount}; + TStringBuf buffer{writeRequest->Storage.get(), byteCount}; return self.WriteBlocks( now, From 4780db420c32c30cfc6e4bcadd68d6f8dbf8b018 Mon Sep 17 00:00:00 2001 From: Pavel Misko Date: Thu, 30 Jan 2025 12:00:20 +0100 Subject: [PATCH 3/5] IOParserActorAllocateStorageEnabled --- cloud/blockstore/config/disk.proto | 4 +++ .../libs/storage/core/proto_helpers.cpp | 21 +++++++++---- .../libs/storage/core/proto_helpers.h | 4 +++ .../disk_agent/actors/io_request_parser.cpp | 30 +++++++++---------- .../disk_agent/actors/io_request_parser.h | 2 +- .../disk_agent/disk_agent_actor_init.cpp | 25 ++++++++++------ .../disk_agent/disk_agent_actor_io.cpp | 24 ++++++++++----- .../disk_agent/disk_agent_actor_ut.cpp | 1 + .../storage/disk_agent/disk_agent_private.h | 2 +- .../libs/storage/disk_agent/model/config.cpp | 1 + .../libs/storage/disk_agent/model/config.h | 1 + .../storage/disk_agent/testlib/test_env.cpp | 1 + cloud/blockstore/tests/python/lib/config.py | 1 + .../tests/python/lib/nonreplicated_setup.py | 1 + 14 files changed, 80 insertions(+), 38 deletions(-) diff --git a/cloud/blockstore/config/disk.proto b/cloud/blockstore/config/disk.proto index 8b715760fa9..09d5ffffc19 100644 --- a/cloud/blockstore/config/disk.proto +++ b/cloud/blockstore/config/disk.proto @@ -294,6 +294,10 @@ message TDiskAgentConfig // Settings for traffic shaping. optional TDiskAgentThrottlingConfig ThrottlingConfig = 38; + + // If enabled, IOParserActor allocates a storage buffer and copies the + // request data into it. + optional uint32 IOParserActorAllocateStorageEnabled = 39; } //////////////////////////////////////////////////////////////////////////////// diff --git a/cloud/blockstore/libs/storage/core/proto_helpers.cpp b/cloud/blockstore/libs/storage/core/proto_helpers.cpp index 51a23fb1e7d..c56b9c4e04a 100644 --- a/cloud/blockstore/libs/storage/core/proto_helpers.cpp +++ b/cloud/blockstore/libs/storage/core/proto_helpers.cpp @@ -413,16 +413,22 @@ TBlockRange64 BuildRequestBlockRange( TBlockRange64 BuildRequestBlockRange( const TEvDiskAgent::TEvWriteDeviceBlocksRequest& request) +{ + return BuildRequestBlockRange(request.Record); +} + +TBlockRange64 BuildRequestBlockRange( + const NProto::TWriteDeviceBlocksRequest& request) { ui64 totalSize = 0; - for (const auto& buffer: request.Record.GetBlocks().GetBuffers()) { + for (const auto& buffer: request.GetBlocks().GetBuffers()) { totalSize += buffer.length(); } - Y_ABORT_UNLESS(totalSize % request.Record.GetBlockSize() == 0); + Y_ABORT_UNLESS(totalSize % request.GetBlockSize() == 0); return TBlockRange64::WithLength( - request.Record.GetStartIndex(), - totalSize / request.Record.GetBlockSize()); + request.GetStartIndex(), + totalSize / request.GetBlockSize()); } TBlockRange64 BuildRequestBlockRange( @@ -436,7 +442,12 @@ TBlockRange64 BuildRequestBlockRange( ui64 GetVolumeRequestId( const TEvDiskAgent::TEvWriteDeviceBlocksRequest& request) { - return request.Record.GetVolumeRequestId(); + return GetVolumeRequestId(request.Record); +} + +ui64 GetVolumeRequestId(const NProto::TWriteDeviceBlocksRequest& request) +{ + return request.GetVolumeRequestId(); } ui64 GetVolumeRequestId(const TEvDiskAgent::TEvZeroDeviceBlocksRequest& request) diff --git a/cloud/blockstore/libs/storage/core/proto_helpers.h b/cloud/blockstore/libs/storage/core/proto_helpers.h index f8ecfb90327..86c4f5f5e58 100644 --- a/cloud/blockstore/libs/storage/core/proto_helpers.h +++ b/cloud/blockstore/libs/storage/core/proto_helpers.h @@ -217,10 +217,14 @@ TBlockRange64 BuildRequestBlockRange( TBlockRange64 BuildRequestBlockRange( const TEvDiskAgent::TEvWriteDeviceBlocksRequest& request); +TBlockRange64 BuildRequestBlockRange( + const NProto::TWriteDeviceBlocksRequest& request); + TBlockRange64 BuildRequestBlockRange( const TEvDiskAgent::TEvZeroDeviceBlocksRequest& request); ui64 GetVolumeRequestId(const TEvDiskAgent::TEvWriteDeviceBlocksRequest& request); +ui64 GetVolumeRequestId(const NProto::TWriteDeviceBlocksRequest& request); ui64 GetVolumeRequestId(const TEvDiskAgent::TEvZeroDeviceBlocksRequest& request); TString LogDevices(const TVector& devices); diff --git a/cloud/blockstore/libs/storage/disk_agent/actors/io_request_parser.cpp b/cloud/blockstore/libs/storage/disk_agent/actors/io_request_parser.cpp index b618ef46cef..8ce583c248b 100644 --- a/cloud/blockstore/libs/storage/disk_agent/actors/io_request_parser.cpp +++ b/cloud/blockstore/libs/storage/disk_agent/actors/io_request_parser.cpp @@ -80,23 +80,23 @@ class TIORequestParserActor: public TActor auto* msg = ev->Get(); request->Record.Swap(&msg->Record); - ui64 bytesCount = 0; - for (const auto& buffer: request->Record.GetBlocks().GetBuffers()) { - bytesCount += buffer.size(); + if (Allocator) { + ui64 bytesCount = 0; + for (const auto& buffer: request->Record.GetBlocks().GetBuffers()) { + bytesCount += buffer.size(); + } + + request->Storage = Allocator(bytesCount); + request->StorageSize = bytesCount; + + char* dst = request->Storage.get(); + for (const auto& buffer: request->Record.GetBlocks().GetBuffers()) { + std::memcpy(dst, buffer.data(), buffer.size()); + dst += buffer.size(); + } + request->Record.ClearBlocks(); } - request->Storage = - Allocator(request->Record.GetBlockSize(), bytesCount); - - char* dst = request->Storage.get(); - for (const auto& buffer: request->Record.GetBlocks().GetBuffers()) { - std::memcpy(dst, buffer.data(), buffer.size()); - dst += buffer.size(); - } - - request->ByteCount = bytesCount; - request->Record.ClearBlocks(); - auto newEv = std::make_unique( ev->Recipient, ev->Sender, diff --git a/cloud/blockstore/libs/storage/disk_agent/actors/io_request_parser.h b/cloud/blockstore/libs/storage/disk_agent/actors/io_request_parser.h index b1941e88bab..9485c0abd44 100644 --- a/cloud/blockstore/libs/storage/disk_agent/actors/io_request_parser.h +++ b/cloud/blockstore/libs/storage/disk_agent/actors/io_request_parser.h @@ -10,7 +10,7 @@ namespace NCloud::NBlockStore::NStorage::NDiskAgent { //////////////////////////////////////////////////////////////////////////////// using TStorageBufferAllocator = - std::function(ui32 blockSize, ui64 bytesCount)>; + std::function(ui64 bytesCount)>; std::unique_ptr CreateIORequestParserActor( const NActors::TActorId& owner, diff --git a/cloud/blockstore/libs/storage/disk_agent/disk_agent_actor_init.cpp b/cloud/blockstore/libs/storage/disk_agent/disk_agent_actor_init.cpp index d675d721f4b..50963f521fb 100644 --- a/cloud/blockstore/libs/storage/disk_agent/disk_agent_actor_init.cpp +++ b/cloud/blockstore/libs/storage/disk_agent/disk_agent_actor_init.cpp @@ -142,17 +142,24 @@ void TDiskAgentActor::HandleInitAgentCompleted( ctx, TBlockStoreComponents::DISK_AGENT, "Create " << count << " IORequestParserActor actors"); + + NDiskAgent::TStorageBufferAllocator allocator; + if (AgentConfig->GetIOParserActorAllocateStorageEnabled() && + AgentConfig->GetBackend() == NProto::DISK_AGENT_BACKEND_AIO) + { + allocator = [](ui64 byteCount) + { + return std::shared_ptr( + static_cast( + std::aligned_alloc(DefaultBlockSize, byteCount)), + std::free); + }; + } + IOParserActors.reserve(count); for (ui32 i = 0; i != count; ++i) { - auto actor = NDiskAgent::CreateIORequestParserActor( - ctx.SelfID, - [](ui32 blockSize, ui64 byteCount) - { - return std::shared_ptr( - static_cast( - std::aligned_alloc(blockSize, byteCount)), - std::free); - }); + auto actor = + NDiskAgent::CreateIORequestParserActor(ctx.SelfID, allocator); IOParserActors.push_back(ctx.Register( actor.release(), diff --git a/cloud/blockstore/libs/storage/disk_agent/disk_agent_actor_io.cpp b/cloud/blockstore/libs/storage/disk_agent/disk_agent_actor_io.cpp index 6412b019967..73ef99f9569 100644 --- a/cloud/blockstore/libs/storage/disk_agent/disk_agent_actor_io.cpp +++ b/cloud/blockstore/libs/storage/disk_agent/disk_agent_actor_io.cpp @@ -26,11 +26,15 @@ ui64 GetVolumeRequestId( TBlockRange64 BuildRequestBlockRange( const TEvDiskAgentPrivate::TParsedWriteDeviceBlocksRequest& request) { - Y_ABORT_UNLESS(request.ByteCount % request.Record.GetBlockSize() == 0); + if (!request.StorageSize) { + return NStorage::BuildRequestBlockRange(request.Record); + } + + Y_ABORT_UNLESS(request.StorageSize % request.Record.GetBlockSize() == 0); return TBlockRange64::WithLength( request.Record.GetStartIndex(), - request.ByteCount / request.Record.GetBlockSize()); + request.StorageSize / request.Record.GetBlockSize()); } //////////////////////////////////////////////////////////////////////////////// @@ -357,6 +361,13 @@ void TDiskAgentActor::HandleParsedWriteDeviceBlocks( return; } + auto* msg = ev->Get(); + + if (!msg->Storage) { + PerformIO(ctx, ev, &TDiskAgentState::Write); + return; + } + // Attach storage to NProto::TWriteBlocksRequest struct TWriteBlocksRequestWithStorage : NProto::TWriteBlocksRequest @@ -364,23 +375,22 @@ void TDiskAgentActor::HandleParsedWriteDeviceBlocks( TStorageBuffer Storage; }; - auto* msg = ev->Get(); - PerformIO( ctx, ev, - [storage = std::move(msg->Storage), byteCount = msg->ByteCount]( + [storage = std::move(msg->Storage), storageSize = msg->StorageSize]( TDiskAgentState& self, TInstant now, NProto::TWriteDeviceBlocksRequest request) mutable { auto writeRequest = std::make_shared(); - writeRequest->Storage = std::move(storage); writeRequest->MutableHeaders()->Swap(request.MutableHeaders()); + writeRequest->MutableBlocks()->Swap(request.MutableBlocks()); writeRequest->SetStartIndex(request.GetStartIndex()); + writeRequest->Storage = std::move(storage); - TStringBuf buffer{writeRequest->Storage.get(), byteCount}; + TStringBuf buffer{writeRequest->Storage.get(), storageSize}; return self.WriteBlocks( now, diff --git a/cloud/blockstore/libs/storage/disk_agent/disk_agent_actor_ut.cpp b/cloud/blockstore/libs/storage/disk_agent/disk_agent_actor_ut.cpp index 6afadf5edf9..10a2def21e4 100644 --- a/cloud/blockstore/libs/storage/disk_agent/disk_agent_actor_ut.cpp +++ b/cloud/blockstore/libs/storage/disk_agent/disk_agent_actor_ut.cpp @@ -4847,6 +4847,7 @@ Y_UNIT_TEST_SUITE(TDiskAgentTest) auto config = DiskAgentConfig({deviceId}); config.SetIOParserActorCount(4); config.SetOffloadAllIORequestsParsingEnabled(true); + config.SetIOParserActorAllocateStorageEnabled(true); return config; }(); diff --git a/cloud/blockstore/libs/storage/disk_agent/disk_agent_private.h b/cloud/blockstore/libs/storage/disk_agent/disk_agent_private.h index 7fba588a1c1..034a110abfc 100644 --- a/cloud/blockstore/libs/storage/disk_agent/disk_agent_private.h +++ b/cloud/blockstore/libs/storage/disk_agent/disk_agent_private.h @@ -165,7 +165,7 @@ struct TEvDiskAgentPrivate { NProto::TWriteDeviceBlocksRequest Record; TStorageBuffer Storage; - ui64 ByteCount = 0; + ui64 StorageSize = 0; }; // diff --git a/cloud/blockstore/libs/storage/disk_agent/model/config.cpp b/cloud/blockstore/libs/storage/disk_agent/model/config.cpp index 270a9516cca..bf7c665bd1c 100644 --- a/cloud/blockstore/libs/storage/disk_agent/model/config.cpp +++ b/cloud/blockstore/libs/storage/disk_agent/model/config.cpp @@ -49,6 +49,7 @@ namespace { xxx(MaxAIOContextEvents, ui32, 1024 )\ xxx(PathsPerFileIOService, ui32, 0 )\ xxx(DisableBrokenDevices, bool, 0 )\ + xxx(IOParserActorAllocateStorageEnabled, bool, 0 )\ // BLOCKSTORE_AGENT_CONFIG #define BLOCKSTORE_DECLARE_CONFIG(name, type, value) \ diff --git a/cloud/blockstore/libs/storage/disk_agent/model/config.h b/cloud/blockstore/libs/storage/disk_agent/model/config.h index 8a7f7c7c18e..01cd5c69b3e 100644 --- a/cloud/blockstore/libs/storage/disk_agent/model/config.h +++ b/cloud/blockstore/libs/storage/disk_agent/model/config.h @@ -108,6 +108,7 @@ class TDiskAgentConfig ui32 GetIOParserActorCount() const; bool GetOffloadAllIORequestsParsingEnabled() const; + bool GetIOParserActorAllocateStorageEnabled() const; bool GetDisableNodeBrokerRegistrationOnDevicelessAgent() const; ui32 GetMaxAIOContextEvents() const; ui32 GetPathsPerFileIOService() const; diff --git a/cloud/blockstore/libs/storage/disk_agent/testlib/test_env.cpp b/cloud/blockstore/libs/storage/disk_agent/testlib/test_env.cpp index 97f5fabdc1a..188c320a48e 100644 --- a/cloud/blockstore/libs/storage/disk_agent/testlib/test_env.cpp +++ b/cloud/blockstore/libs/storage/disk_agent/testlib/test_env.cpp @@ -721,6 +721,7 @@ NProto::TDiskAgentConfig CreateDefaultAgentConfig() config.SetIOParserActorCount(4); config.SetOffloadAllIORequestsParsingEnabled(true); + config.SetIOParserActorAllocateStorageEnabled(true); return config; } diff --git a/cloud/blockstore/tests/python/lib/config.py b/cloud/blockstore/tests/python/lib/config.py index 6ace6561900..25e9f9d89fc 100644 --- a/cloud/blockstore/tests/python/lib/config.py +++ b/cloud/blockstore/tests/python/lib/config.py @@ -316,6 +316,7 @@ def generate_disk_agent_txt( config.ShutdownTimeout = 0 config.IOParserActorCount = 4 config.OffloadAllIORequestsParsingEnabled = True + config.IOParserActorAllocateStorageEnabled = True config.PathsPerFileIOService = 1 if device_erase_method is not None: diff --git a/cloud/blockstore/tests/python/lib/nonreplicated_setup.py b/cloud/blockstore/tests/python/lib/nonreplicated_setup.py index 4c55db1eb6d..681516042fa 100644 --- a/cloud/blockstore/tests/python/lib/nonreplicated_setup.py +++ b/cloud/blockstore/tests/python/lib/nonreplicated_setup.py @@ -222,6 +222,7 @@ def setup_disk_agent_config( config.ShutdownTimeout = get_shutdown_agent_interval() config.IOParserActorCount = 4 config.OffloadAllIORequestsParsingEnabled = True + config.IOParserActorAllocateStorageEnabled = True config.PathsPerFileIOService = 2 if cached_sessions_path is not None: From e20922cec454896ba4bae6ca57c0e6eb1e1db8a5 Mon Sep 17 00:00:00 2001 From: Pavel Misko Date: Thu, 30 Jan 2025 12:06:26 +0100 Subject: [PATCH 4/5] tweak --- .../libs/storage/disk_agent/actors/io_request_parser.cpp | 8 +++++--- .../libs/storage/disk_agent/disk_agent_actor_io.cpp | 2 +- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/cloud/blockstore/libs/storage/disk_agent/actors/io_request_parser.cpp b/cloud/blockstore/libs/storage/disk_agent/actors/io_request_parser.cpp index 8ce583c248b..77e467187fa 100644 --- a/cloud/blockstore/libs/storage/disk_agent/actors/io_request_parser.cpp +++ b/cloud/blockstore/libs/storage/disk_agent/actors/io_request_parser.cpp @@ -24,7 +24,7 @@ class TIORequestParserActor: public TActor TStorageBufferAllocator Allocator; public: - explicit TIORequestParserActor( + TIORequestParserActor( const TActorId& owner, TStorageBufferAllocator allocator) : TActor(&TIORequestParserActor::StateWork) @@ -81,8 +81,10 @@ class TIORequestParserActor: public TActor request->Record.Swap(&msg->Record); if (Allocator) { + const auto& buffers = request->Record.GetBlocks().GetBuffers(); + ui64 bytesCount = 0; - for (const auto& buffer: request->Record.GetBlocks().GetBuffers()) { + for (const auto& buffer: buffers) { bytesCount += buffer.size(); } @@ -90,7 +92,7 @@ class TIORequestParserActor: public TActor request->StorageSize = bytesCount; char* dst = request->Storage.get(); - for (const auto& buffer: request->Record.GetBlocks().GetBuffers()) { + for (const auto& buffer: buffers) { std::memcpy(dst, buffer.data(), buffer.size()); dst += buffer.size(); } diff --git a/cloud/blockstore/libs/storage/disk_agent/disk_agent_actor_io.cpp b/cloud/blockstore/libs/storage/disk_agent/disk_agent_actor_io.cpp index 73ef99f9569..3b7f782d443 100644 --- a/cloud/blockstore/libs/storage/disk_agent/disk_agent_actor_io.cpp +++ b/cloud/blockstore/libs/storage/disk_agent/disk_agent_actor_io.cpp @@ -20,7 +20,7 @@ namespace { ui64 GetVolumeRequestId( const TEvDiskAgentPrivate::TParsedWriteDeviceBlocksRequest& request) { - return request.Record.GetVolumeRequestId(); + return NStorage::GetVolumeRequestId(request.Record); } TBlockRange64 BuildRequestBlockRange( From 9dedd4a71a071ea97d59887db0eeb19cffc65ea0 Mon Sep 17 00:00:00 2001 From: Pavel Misko Date: Thu, 30 Jan 2025 13:52:23 +0100 Subject: [PATCH 5/5] fix proto --- cloud/blockstore/config/disk.proto | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cloud/blockstore/config/disk.proto b/cloud/blockstore/config/disk.proto index 09d5ffffc19..cc290743eae 100644 --- a/cloud/blockstore/config/disk.proto +++ b/cloud/blockstore/config/disk.proto @@ -297,7 +297,7 @@ message TDiskAgentConfig // If enabled, IOParserActor allocates a storage buffer and copies the // request data into it. - optional uint32 IOParserActorAllocateStorageEnabled = 39; + optional bool IOParserActorAllocateStorageEnabled = 39; } ////////////////////////////////////////////////////////////////////////////////