Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

issue-2277: move copying of the request buffer to TIORequestParserActor #2946

Merged
merged 5 commits into from
Jan 31, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cloud/blockstore/config/disk.proto
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,10 @@ message TDiskAgentConfig

// Settings for traffic shaping.
optional TDiskAgentThrottlingConfig ThrottlingConfig = 38;

// If enabled, IOParserActor allocates a storage buffer and copies the
// request data into it.
optional bool IOParserActorAllocateStorageEnabled = 39;
}

////////////////////////////////////////////////////////////////////////////////
Expand Down
21 changes: 16 additions & 5 deletions cloud/blockstore/libs/storage/core/proto_helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -413,16 +413,22 @@ TBlockRange64 BuildRequestBlockRange(

TBlockRange64 BuildRequestBlockRange(
const TEvDiskAgent::TEvWriteDeviceBlocksRequest& request)
{
return BuildRequestBlockRange(request.Record);
}

TBlockRange64 BuildRequestBlockRange(
const NProto::TWriteDeviceBlocksRequest& request)
{
ui64 totalSize = 0;
for (const auto& buffer: request.Record.GetBlocks().GetBuffers()) {
for (const auto& buffer: request.GetBlocks().GetBuffers()) {
totalSize += buffer.length();
}
Y_ABORT_UNLESS(totalSize % request.Record.GetBlockSize() == 0);
Y_ABORT_UNLESS(totalSize % request.GetBlockSize() == 0);

return TBlockRange64::WithLength(
request.Record.GetStartIndex(),
totalSize / request.Record.GetBlockSize());
request.GetStartIndex(),
totalSize / request.GetBlockSize());
}

TBlockRange64 BuildRequestBlockRange(
Expand All @@ -436,7 +442,12 @@ TBlockRange64 BuildRequestBlockRange(
ui64 GetVolumeRequestId(
const TEvDiskAgent::TEvWriteDeviceBlocksRequest& request)
{
return request.Record.GetVolumeRequestId();
return GetVolumeRequestId(request.Record);
}

ui64 GetVolumeRequestId(const NProto::TWriteDeviceBlocksRequest& request)
{
return request.GetVolumeRequestId();
}

ui64 GetVolumeRequestId(const TEvDiskAgent::TEvZeroDeviceBlocksRequest& request)
Expand Down
4 changes: 4 additions & 0 deletions cloud/blockstore/libs/storage/core/proto_helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -217,10 +217,14 @@ TBlockRange64 BuildRequestBlockRange(
TBlockRange64 BuildRequestBlockRange(
const TEvDiskAgent::TEvWriteDeviceBlocksRequest& request);

TBlockRange64 BuildRequestBlockRange(
const NProto::TWriteDeviceBlocksRequest& request);

TBlockRange64 BuildRequestBlockRange(
const TEvDiskAgent::TEvZeroDeviceBlocksRequest& request);

ui64 GetVolumeRequestId(const TEvDiskAgent::TEvWriteDeviceBlocksRequest& request);
ui64 GetVolumeRequestId(const NProto::TWriteDeviceBlocksRequest& request);
ui64 GetVolumeRequestId(const TEvDiskAgent::TEvZeroDeviceBlocksRequest& request);

TString LogDevices(const TVector<NProto::TDeviceConfig>& devices);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,25 @@ class TIORequestParserActor: public TActor<TIORequestParserActor>
{
private:
const TActorId Owner;
TStorageBufferAllocator Allocator;

public:
explicit TIORequestParserActor(const TActorId& owner)
TIORequestParserActor(
const TActorId& owner,
TStorageBufferAllocator allocator)
: TActor(&TIORequestParserActor::StateWork)
, Owner(owner)
, Allocator(std::move(allocator))
{}

private:
STFUNC(StateWork)
{
switch (ev->GetTypeRewrite()) {
HFunc(NActors::TEvents::TEvPoisonPill, HandlePoisonPill);
HFunc(TEvents::TEvPoisonPill, HandlePoisonPill);

case TEvDiskAgent::EvWriteDeviceBlocksRequest:
HandleRequest<TEvDiskAgent::TEvWriteDeviceBlocksRequest>(
ev,
TEvDiskAgentPrivate::EvParsedWriteDeviceBlocksRequest);
HandleWriteDeviceBlocks(ev);
break;

case TEvDiskAgent::EvReadDeviceBlocksRequest:
Expand Down Expand Up @@ -69,6 +71,48 @@ class TIORequestParserActor: public TActor<TIORequestParserActor>
Die(ctx);
}

void HandleWriteDeviceBlocks(TAutoPtr<IEventHandle>& ev)
{
auto request = std::make_unique<
TEvDiskAgentPrivate::TEvParsedWriteDeviceBlocksRequest>();

// parse protobuf
auto* msg = ev->Get<TEvDiskAgent::TEvWriteDeviceBlocksRequest>();
request->Record.Swap(&msg->Record);

if (Allocator) {
const auto& buffers = request->Record.GetBlocks().GetBuffers();

ui64 bytesCount = 0;
for (const auto& buffer: buffers) {
bytesCount += buffer.size();
}

request->Storage = Allocator(bytesCount);
request->StorageSize = bytesCount;

char* dst = request->Storage.get();
for (const auto& buffer: buffers) {
std::memcpy(dst, buffer.data(), buffer.size());
dst += buffer.size();
}
request->Record.ClearBlocks();
}

auto newEv = std::make_unique<IEventHandle>(
ev->Recipient,
ev->Sender,
request.release(),
ev->Flags,
ev->Cookie,
nullptr, // forwardOnNondelivery
std::move(ev->TraceId));

newEv->Rewrite(newEv->Type, Owner);

ActorContext().Send(std::move(newEv));
}

template <typename TRequest>
void HandleRequest(TAutoPtr<IEventHandle>& ev, ui32 typeRewrite)
{
Expand All @@ -85,9 +129,11 @@ class TIORequestParserActor: public TActor<TIORequestParserActor>

////////////////////////////////////////////////////////////////////////////////

std::unique_ptr<IActor> CreateIORequestParserActor(const TActorId& owner)
std::unique_ptr<IActor> CreateIORequestParserActor(
const TActorId& owner,
TStorageBufferAllocator allocator)
{
return std::make_unique<TIORequestParserActor>(owner);
return std::make_unique<TIORequestParserActor>(owner, std::move(allocator));
}

} // namespace NCloud::NBlockStore::NStorage::NDiskAgent
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,18 @@

#include <contrib/ydb/library/actors/core/actor.h>

#include <functional>
#include <memory>

namespace NCloud::NBlockStore::NStorage::NDiskAgent {

////////////////////////////////////////////////////////////////////////////////

using TStorageBufferAllocator =
std::function<std::shared_ptr<char>(ui64 bytesCount)>;

std::unique_ptr<NActors::IActor> CreateIORequestParserActor(
const NActors::TActorId& owner);
const NActors::TActorId& owner,
TStorageBufferAllocator allocator);

} // namespace NCloud::NBlockStore::NStorage::NDiskAgent
10 changes: 3 additions & 7 deletions cloud/blockstore/libs/storage/disk_agent/disk_agent_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -372,13 +372,9 @@ STFUNC(TDiskAgentActor::StateWork)
TEvDiskAgent::TEvDisableConcreteAgentRequest,
HandleDisableConcreteAgent);

case TEvDiskAgentPrivate::EvParsedWriteDeviceBlocksRequest:
HandleWriteDeviceBlocks(
*reinterpret_cast<
typename TEvDiskAgent::TEvWriteDeviceBlocksRequest::TPtr*>(
&ev),
ActorContext());
break;
HFunc(
TEvDiskAgentPrivate::TEvParsedWriteDeviceBlocksRequest,
HandleParsedWriteDeviceBlocks);

case TEvDiskAgentPrivate::EvParsedReadDeviceBlocksRequest:
HandleReadDeviceBlocks(
Expand Down
8 changes: 6 additions & 2 deletions cloud/blockstore/libs/storage/disk_agent/disk_agent_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,10 @@ class TDiskAgentActor final

void SendRegisterRequest(const NActors::TActorContext& ctx);

template <typename TMethod, typename TOp>
template <typename TMethod, typename TEv, typename TOp>
void PerformIO(
const NActors::TActorContext& ctx,
const typename TMethod::TRequest::TPtr& ev,
const TEv& ev,
TOp operation);

template <typename TMethod, typename TRequestPtr>
Expand Down Expand Up @@ -225,6 +225,10 @@ class TDiskAgentActor final
const TEvDiskAgentPrivate::TEvCancelSuspensionRequest::TPtr& ev,
const NActors::TActorContext& ctx);

void HandleParsedWriteDeviceBlocks(
const TEvDiskAgentPrivate::TEvParsedWriteDeviceBlocksRequest::TPtr& ev,
const NActors::TActorContext& ctx);

bool HandleRequests(STFUNC_SIG);
bool RejectRequests(STFUNC_SIG);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,25 @@ void TDiskAgentActor::HandleInitAgentCompleted(
ctx,
TBlockStoreComponents::DISK_AGENT,
"Create " << count << " IORequestParserActor actors");

NDiskAgent::TStorageBufferAllocator allocator;
if (AgentConfig->GetIOParserActorAllocateStorageEnabled() &&
AgentConfig->GetBackend() == NProto::DISK_AGENT_BACKEND_AIO)
{
allocator = [](ui64 byteCount)
{
return std::shared_ptr<char>(
static_cast<char*>(
std::aligned_alloc(DefaultBlockSize, byteCount)),
std::free);
};
}

IOParserActors.reserve(count);
for (ui32 i = 0; i != count; ++i) {
auto actor = NDiskAgent::CreateIORequestParserActor(ctx.SelfID);
auto actor =
NDiskAgent::CreateIORequestParserActor(ctx.SelfID, allocator);

IOParserActors.push_back(ctx.Register(
actor.release(),
TMailboxType::TinyReadAsFilled,
Expand Down
78 changes: 76 additions & 2 deletions cloud/blockstore/libs/storage/disk_agent/disk_agent_actor_io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,28 @@ namespace {

////////////////////////////////////////////////////////////////////////////////

ui64 GetVolumeRequestId(
const TEvDiskAgentPrivate::TParsedWriteDeviceBlocksRequest& request)
{
return NStorage::GetVolumeRequestId(request.Record);
}

TBlockRange64 BuildRequestBlockRange(
const TEvDiskAgentPrivate::TParsedWriteDeviceBlocksRequest& request)
{
if (!request.StorageSize) {
return NStorage::BuildRequestBlockRange(request.Record);
}

Y_ABORT_UNLESS(request.StorageSize % request.Record.GetBlockSize() == 0);

return TBlockRange64::WithLength(
request.Record.GetStartIndex(),
request.StorageSize / request.Record.GetBlockSize());
}

////////////////////////////////////////////////////////////////////////////////

template <typename T>
constexpr bool IsWriteDeviceMethod =
std::is_same_v<T, TEvDiskAgent::TWriteDeviceBlocksMethod> ||
Expand Down Expand Up @@ -113,10 +135,10 @@ std::pair<ui32, TString> HandleException(

////////////////////////////////////////////////////////////////////////////////

template <typename TMethod, typename TOp>
template <typename TMethod, typename TEv, typename TOp>
void TDiskAgentActor::PerformIO(
const TActorContext& ctx,
const typename TMethod::TRequest::TPtr& ev,
const TEv& ev,
TOp operation)
{
auto* msg = ev->Get();
Expand Down Expand Up @@ -327,6 +349,58 @@ void TDiskAgentActor::HandleWriteDeviceBlocks(
PerformIO<TMethod>(ctx, ev, &TDiskAgentState::Write);
}

void TDiskAgentActor::HandleParsedWriteDeviceBlocks(
const TEvDiskAgentPrivate::TEvParsedWriteDeviceBlocksRequest::TPtr& ev,
const TActorContext& ctx)
{
BLOCKSTORE_DISK_AGENT_COUNTER(WriteDeviceBlocks);

using TMethod = TEvDiskAgent::TWriteDeviceBlocksMethod;

if (CheckIntersection<TMethod>(ctx, ev)) {
return;
}

auto* msg = ev->Get();

if (!msg->Storage) {
PerformIO<TMethod>(ctx, ev, &TDiskAgentState::Write);
return;
}

// Attach storage to NProto::TWriteBlocksRequest
struct TWriteBlocksRequestWithStorage
: NProto::TWriteBlocksRequest
{
TStorageBuffer Storage;
};

PerformIO<TMethod>(
ctx,
ev,
[storage = std::move(msg->Storage), storageSize = msg->StorageSize](
TDiskAgentState& self,
TInstant now,
NProto::TWriteDeviceBlocksRequest request) mutable
{
auto writeRequest =
std::make_shared<TWriteBlocksRequestWithStorage>();
writeRequest->MutableHeaders()->Swap(request.MutableHeaders());
writeRequest->MutableBlocks()->Swap(request.MutableBlocks());
writeRequest->SetStartIndex(request.GetStartIndex());
writeRequest->Storage = std::move(storage);

TStringBuf buffer{writeRequest->Storage.get(), storageSize};

return self.WriteBlocks(
now,
request.GetDeviceUUID(),
std::move(writeRequest),
request.GetBlockSize(),
buffer);
});
}

void TDiskAgentActor::HandleZeroDeviceBlocks(
const TEvDiskAgent::TEvZeroDeviceBlocksRequest::TPtr& ev,
const TActorContext& ctx)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4847,6 +4847,7 @@ Y_UNIT_TEST_SUITE(TDiskAgentTest)
auto config = DiskAgentConfig({deviceId});
config.SetIOParserActorCount(4);
config.SetOffloadAllIORequestsParsingEnabled(true);
config.SetIOParserActorAllocateStorageEnabled(true);

return config;
}();
Expand Down
15 changes: 15 additions & 0 deletions cloud/blockstore/libs/storage/disk_agent/disk_agent_private.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,17 @@ struct TEvDiskAgentPrivate
struct TCancelSuspensionRequest
{};

//
// ParsedWriteDeviceBlocksRequest
//

struct TParsedWriteDeviceBlocksRequest
{
NProto::TWriteDeviceBlocksRequest Record;
TStorageBuffer Storage;
ui64 StorageSize = 0;
};

//
// Events declaration
//
Expand Down Expand Up @@ -207,6 +218,10 @@ struct TEvDiskAgentPrivate
TCancelSuspensionRequest,
EvCancelSuspensionRequest>;

using TEvParsedWriteDeviceBlocksRequest = TRequestEvent<
TParsedWriteDeviceBlocksRequest,
EvParsedWriteDeviceBlocksRequest>;

BLOCKSTORE_DECLARE_EVENTS(UpdateSessionCache)
};

Expand Down
Loading
Loading