Skip to content

Commit

Permalink
issue-2277: move copying of the request buffer to TIORequestParserAct…
Browse files Browse the repository at this point in the history
…or (#2946)
  • Loading branch information
sharpeye authored Jan 31, 2025
1 parent 8a66b78 commit c43ddfe
Show file tree
Hide file tree
Showing 19 changed files with 250 additions and 44 deletions.
4 changes: 4 additions & 0 deletions cloud/blockstore/config/disk.proto
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,10 @@ message TDiskAgentConfig

// Settings for traffic shaping.
optional TDiskAgentThrottlingConfig ThrottlingConfig = 38;

// If enabled, IOParserActor allocates a storage buffer and copies the
// request data into it.
optional bool IOParserActorAllocateStorageEnabled = 39;
}

////////////////////////////////////////////////////////////////////////////////
Expand Down
21 changes: 16 additions & 5 deletions cloud/blockstore/libs/storage/core/proto_helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -413,16 +413,22 @@ TBlockRange64 BuildRequestBlockRange(

TBlockRange64 BuildRequestBlockRange(
const TEvDiskAgent::TEvWriteDeviceBlocksRequest& request)
{
return BuildRequestBlockRange(request.Record);
}

TBlockRange64 BuildRequestBlockRange(
const NProto::TWriteDeviceBlocksRequest& request)
{
ui64 totalSize = 0;
for (const auto& buffer: request.Record.GetBlocks().GetBuffers()) {
for (const auto& buffer: request.GetBlocks().GetBuffers()) {
totalSize += buffer.length();
}
Y_ABORT_UNLESS(totalSize % request.Record.GetBlockSize() == 0);
Y_ABORT_UNLESS(totalSize % request.GetBlockSize() == 0);

return TBlockRange64::WithLength(
request.Record.GetStartIndex(),
totalSize / request.Record.GetBlockSize());
request.GetStartIndex(),
totalSize / request.GetBlockSize());
}

TBlockRange64 BuildRequestBlockRange(
Expand All @@ -436,7 +442,12 @@ TBlockRange64 BuildRequestBlockRange(
ui64 GetVolumeRequestId(
const TEvDiskAgent::TEvWriteDeviceBlocksRequest& request)
{
return request.Record.GetVolumeRequestId();
return GetVolumeRequestId(request.Record);
}

ui64 GetVolumeRequestId(const NProto::TWriteDeviceBlocksRequest& request)
{
return request.GetVolumeRequestId();
}

ui64 GetVolumeRequestId(const TEvDiskAgent::TEvZeroDeviceBlocksRequest& request)
Expand Down
4 changes: 4 additions & 0 deletions cloud/blockstore/libs/storage/core/proto_helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -217,10 +217,14 @@ TBlockRange64 BuildRequestBlockRange(
TBlockRange64 BuildRequestBlockRange(
const TEvDiskAgent::TEvWriteDeviceBlocksRequest& request);

TBlockRange64 BuildRequestBlockRange(
const NProto::TWriteDeviceBlocksRequest& request);

TBlockRange64 BuildRequestBlockRange(
const TEvDiskAgent::TEvZeroDeviceBlocksRequest& request);

ui64 GetVolumeRequestId(const TEvDiskAgent::TEvWriteDeviceBlocksRequest& request);
ui64 GetVolumeRequestId(const NProto::TWriteDeviceBlocksRequest& request);
ui64 GetVolumeRequestId(const TEvDiskAgent::TEvZeroDeviceBlocksRequest& request);

TString LogDevices(const TVector<NProto::TDeviceConfig>& devices);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,25 @@ class TIORequestParserActor: public TActor<TIORequestParserActor>
{
private:
const TActorId Owner;
TStorageBufferAllocator Allocator;

public:
explicit TIORequestParserActor(const TActorId& owner)
TIORequestParserActor(
const TActorId& owner,
TStorageBufferAllocator allocator)
: TActor(&TIORequestParserActor::StateWork)
, Owner(owner)
, Allocator(std::move(allocator))
{}

private:
STFUNC(StateWork)
{
switch (ev->GetTypeRewrite()) {
HFunc(NActors::TEvents::TEvPoisonPill, HandlePoisonPill);
HFunc(TEvents::TEvPoisonPill, HandlePoisonPill);

case TEvDiskAgent::EvWriteDeviceBlocksRequest:
HandleRequest<TEvDiskAgent::TEvWriteDeviceBlocksRequest>(
ev,
TEvDiskAgentPrivate::EvParsedWriteDeviceBlocksRequest);
HandleWriteDeviceBlocks(ev);
break;

case TEvDiskAgent::EvReadDeviceBlocksRequest:
Expand Down Expand Up @@ -69,6 +71,48 @@ class TIORequestParserActor: public TActor<TIORequestParserActor>
Die(ctx);
}

void HandleWriteDeviceBlocks(TAutoPtr<IEventHandle>& ev)
{
auto request = std::make_unique<
TEvDiskAgentPrivate::TEvParsedWriteDeviceBlocksRequest>();

// parse protobuf
auto* msg = ev->Get<TEvDiskAgent::TEvWriteDeviceBlocksRequest>();
request->Record.Swap(&msg->Record);

if (Allocator) {
const auto& buffers = request->Record.GetBlocks().GetBuffers();

ui64 bytesCount = 0;
for (const auto& buffer: buffers) {
bytesCount += buffer.size();
}

request->Storage = Allocator(bytesCount);
request->StorageSize = bytesCount;

char* dst = request->Storage.get();
for (const auto& buffer: buffers) {
std::memcpy(dst, buffer.data(), buffer.size());
dst += buffer.size();
}
request->Record.ClearBlocks();
}

auto newEv = std::make_unique<IEventHandle>(
ev->Recipient,
ev->Sender,
request.release(),
ev->Flags,
ev->Cookie,
nullptr, // forwardOnNondelivery
std::move(ev->TraceId));

newEv->Rewrite(newEv->Type, Owner);

ActorContext().Send(std::move(newEv));
}

template <typename TRequest>
void HandleRequest(TAutoPtr<IEventHandle>& ev, ui32 typeRewrite)
{
Expand All @@ -85,9 +129,11 @@ class TIORequestParserActor: public TActor<TIORequestParserActor>

////////////////////////////////////////////////////////////////////////////////

std::unique_ptr<IActor> CreateIORequestParserActor(const TActorId& owner)
std::unique_ptr<IActor> CreateIORequestParserActor(
const TActorId& owner,
TStorageBufferAllocator allocator)
{
return std::make_unique<TIORequestParserActor>(owner);
return std::make_unique<TIORequestParserActor>(owner, std::move(allocator));
}

} // namespace NCloud::NBlockStore::NStorage::NDiskAgent
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,18 @@

#include <contrib/ydb/library/actors/core/actor.h>

#include <functional>
#include <memory>

namespace NCloud::NBlockStore::NStorage::NDiskAgent {

////////////////////////////////////////////////////////////////////////////////

using TStorageBufferAllocator =
std::function<std::shared_ptr<char>(ui64 bytesCount)>;

std::unique_ptr<NActors::IActor> CreateIORequestParserActor(
const NActors::TActorId& owner);
const NActors::TActorId& owner,
TStorageBufferAllocator allocator);

} // namespace NCloud::NBlockStore::NStorage::NDiskAgent
10 changes: 3 additions & 7 deletions cloud/blockstore/libs/storage/disk_agent/disk_agent_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -372,13 +372,9 @@ STFUNC(TDiskAgentActor::StateWork)
TEvDiskAgent::TEvDisableConcreteAgentRequest,
HandleDisableConcreteAgent);

case TEvDiskAgentPrivate::EvParsedWriteDeviceBlocksRequest:
HandleWriteDeviceBlocks(
*reinterpret_cast<
typename TEvDiskAgent::TEvWriteDeviceBlocksRequest::TPtr*>(
&ev),
ActorContext());
break;
HFunc(
TEvDiskAgentPrivate::TEvParsedWriteDeviceBlocksRequest,
HandleParsedWriteDeviceBlocks);

case TEvDiskAgentPrivate::EvParsedReadDeviceBlocksRequest:
HandleReadDeviceBlocks(
Expand Down
8 changes: 6 additions & 2 deletions cloud/blockstore/libs/storage/disk_agent/disk_agent_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,10 @@ class TDiskAgentActor final

void SendRegisterRequest(const NActors::TActorContext& ctx);

template <typename TMethod, typename TOp>
template <typename TMethod, typename TEv, typename TOp>
void PerformIO(
const NActors::TActorContext& ctx,
const typename TMethod::TRequest::TPtr& ev,
const TEv& ev,
TOp operation);

template <typename TMethod, typename TRequestPtr>
Expand Down Expand Up @@ -225,6 +225,10 @@ class TDiskAgentActor final
const TEvDiskAgentPrivate::TEvCancelSuspensionRequest::TPtr& ev,
const NActors::TActorContext& ctx);

void HandleParsedWriteDeviceBlocks(
const TEvDiskAgentPrivate::TEvParsedWriteDeviceBlocksRequest::TPtr& ev,
const NActors::TActorContext& ctx);

bool HandleRequests(STFUNC_SIG);
bool RejectRequests(STFUNC_SIG);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,25 @@ void TDiskAgentActor::HandleInitAgentCompleted(
ctx,
TBlockStoreComponents::DISK_AGENT,
"Create " << count << " IORequestParserActor actors");

NDiskAgent::TStorageBufferAllocator allocator;
if (AgentConfig->GetIOParserActorAllocateStorageEnabled() &&
AgentConfig->GetBackend() == NProto::DISK_AGENT_BACKEND_AIO)
{
allocator = [](ui64 byteCount)
{
return std::shared_ptr<char>(
static_cast<char*>(
std::aligned_alloc(DefaultBlockSize, byteCount)),
std::free);
};
}

IOParserActors.reserve(count);
for (ui32 i = 0; i != count; ++i) {
auto actor = NDiskAgent::CreateIORequestParserActor(ctx.SelfID);
auto actor =
NDiskAgent::CreateIORequestParserActor(ctx.SelfID, allocator);

IOParserActors.push_back(ctx.Register(
actor.release(),
TMailboxType::TinyReadAsFilled,
Expand Down
78 changes: 76 additions & 2 deletions cloud/blockstore/libs/storage/disk_agent/disk_agent_actor_io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,28 @@ namespace {

////////////////////////////////////////////////////////////////////////////////

ui64 GetVolumeRequestId(
const TEvDiskAgentPrivate::TParsedWriteDeviceBlocksRequest& request)
{
return NStorage::GetVolumeRequestId(request.Record);
}

TBlockRange64 BuildRequestBlockRange(
const TEvDiskAgentPrivate::TParsedWriteDeviceBlocksRequest& request)
{
if (!request.StorageSize) {
return NStorage::BuildRequestBlockRange(request.Record);
}

Y_ABORT_UNLESS(request.StorageSize % request.Record.GetBlockSize() == 0);

return TBlockRange64::WithLength(
request.Record.GetStartIndex(),
request.StorageSize / request.Record.GetBlockSize());
}

////////////////////////////////////////////////////////////////////////////////

template <typename T>
constexpr bool IsWriteDeviceMethod =
std::is_same_v<T, TEvDiskAgent::TWriteDeviceBlocksMethod> ||
Expand Down Expand Up @@ -113,10 +135,10 @@ std::pair<ui32, TString> HandleException(

////////////////////////////////////////////////////////////////////////////////

template <typename TMethod, typename TOp>
template <typename TMethod, typename TEv, typename TOp>
void TDiskAgentActor::PerformIO(
const TActorContext& ctx,
const typename TMethod::TRequest::TPtr& ev,
const TEv& ev,
TOp operation)
{
auto* msg = ev->Get();
Expand Down Expand Up @@ -327,6 +349,58 @@ void TDiskAgentActor::HandleWriteDeviceBlocks(
PerformIO<TMethod>(ctx, ev, &TDiskAgentState::Write);
}

void TDiskAgentActor::HandleParsedWriteDeviceBlocks(
const TEvDiskAgentPrivate::TEvParsedWriteDeviceBlocksRequest::TPtr& ev,
const TActorContext& ctx)
{
BLOCKSTORE_DISK_AGENT_COUNTER(WriteDeviceBlocks);

using TMethod = TEvDiskAgent::TWriteDeviceBlocksMethod;

if (CheckIntersection<TMethod>(ctx, ev)) {
return;
}

auto* msg = ev->Get();

if (!msg->Storage) {
PerformIO<TMethod>(ctx, ev, &TDiskAgentState::Write);
return;
}

// Attach storage to NProto::TWriteBlocksRequest
struct TWriteBlocksRequestWithStorage
: NProto::TWriteBlocksRequest
{
TStorageBuffer Storage;
};

PerformIO<TMethod>(
ctx,
ev,
[storage = std::move(msg->Storage), storageSize = msg->StorageSize](
TDiskAgentState& self,
TInstant now,
NProto::TWriteDeviceBlocksRequest request) mutable
{
auto writeRequest =
std::make_shared<TWriteBlocksRequestWithStorage>();
writeRequest->MutableHeaders()->Swap(request.MutableHeaders());
writeRequest->MutableBlocks()->Swap(request.MutableBlocks());
writeRequest->SetStartIndex(request.GetStartIndex());
writeRequest->Storage = std::move(storage);

TStringBuf buffer{writeRequest->Storage.get(), storageSize};

return self.WriteBlocks(
now,
request.GetDeviceUUID(),
std::move(writeRequest),
request.GetBlockSize(),
buffer);
});
}

void TDiskAgentActor::HandleZeroDeviceBlocks(
const TEvDiskAgent::TEvZeroDeviceBlocksRequest::TPtr& ev,
const TActorContext& ctx)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4847,6 +4847,7 @@ Y_UNIT_TEST_SUITE(TDiskAgentTest)
auto config = DiskAgentConfig({deviceId});
config.SetIOParserActorCount(4);
config.SetOffloadAllIORequestsParsingEnabled(true);
config.SetIOParserActorAllocateStorageEnabled(true);

return config;
}();
Expand Down
15 changes: 15 additions & 0 deletions cloud/blockstore/libs/storage/disk_agent/disk_agent_private.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,17 @@ struct TEvDiskAgentPrivate
struct TCancelSuspensionRequest
{};

//
// ParsedWriteDeviceBlocksRequest
//

struct TParsedWriteDeviceBlocksRequest
{
NProto::TWriteDeviceBlocksRequest Record;
TStorageBuffer Storage;
ui64 StorageSize = 0;
};

//
// Events declaration
//
Expand Down Expand Up @@ -207,6 +218,10 @@ struct TEvDiskAgentPrivate
TCancelSuspensionRequest,
EvCancelSuspensionRequest>;

using TEvParsedWriteDeviceBlocksRequest = TRequestEvent<
TParsedWriteDeviceBlocksRequest,
EvParsedWriteDeviceBlocksRequest>;

BLOCKSTORE_DECLARE_EVENTS(UpdateSessionCache)
};

Expand Down
Loading

0 comments on commit c43ddfe

Please sign in to comment.