Skip to content

Commit

Permalink
issue-2277: move copying of the request buffer to TIORequestParserActor
Browse files Browse the repository at this point in the history
  • Loading branch information
sharpeye committed Jan 29, 2025
1 parent 8dd2375 commit 86a0da1
Show file tree
Hide file tree
Showing 8 changed files with 178 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,10 @@ class TIORequestParserActor: public TActor<TIORequestParserActor>
STFUNC(StateWork)
{
switch (ev->GetTypeRewrite()) {
HFunc(NActors::TEvents::TEvPoisonPill, HandlePoisonPill);
HFunc(TEvents::TEvPoisonPill, HandlePoisonPill);

case TEvDiskAgent::EvWriteDeviceBlocksRequest:
HandleRequest<TEvDiskAgent::TEvWriteDeviceBlocksRequest>(
ev,
TEvDiskAgentPrivate::EvParsedWriteDeviceBlocksRequest);
HandleWriteDeviceBlocks(ev);
break;

case TEvDiskAgent::EvReadDeviceBlocksRequest:
Expand Down Expand Up @@ -69,6 +67,48 @@ class TIORequestParserActor: public TActor<TIORequestParserActor>
Die(ctx);
}

void HandleWriteDeviceBlocks(TAutoPtr<IEventHandle>& ev)
{
auto request = std::make_unique<
TEvDiskAgentPrivate::TEvParsedWriteDeviceBlocksRequest>();

// parse protobuf
auto* msg = ev->Get<TEvDiskAgent::TEvWriteDeviceBlocksRequest>();
request->Record.Swap(&msg->Record);

ui64 bytesCount = 0;
for (const auto& buffer: request->Record.GetBlocks().GetBuffers()) {
bytesCount += buffer.size();
}

request->Storage.reset(
static_cast<char*>(
std::aligned_alloc(request->Record.GetBlockSize(), bytesCount)),
std::free);

char* dst = request->Storage.get();
for (const auto& buffer: request->Record.GetBlocks().GetBuffers()) {
std::memcpy(dst, buffer.data(), buffer.size());
dst += buffer.size();
}

request->ByteCount = bytesCount;
request->Record.ClearBlocks();

auto newEv = std::make_unique<IEventHandle>(
ev->Recipient,
ev->Sender,
request.release(),
ev->Flags,
ev->Cookie,
nullptr, // forwardOnNondelivery
std::move(ev->TraceId));

newEv->Rewrite(newEv->Type, Owner);

ActorContext().Send(std::move(newEv));
}

template <typename TRequest>
void HandleRequest(TAutoPtr<IEventHandle>& ev, ui32 typeRewrite)
{
Expand Down
10 changes: 3 additions & 7 deletions cloud/blockstore/libs/storage/disk_agent/disk_agent_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -372,13 +372,9 @@ STFUNC(TDiskAgentActor::StateWork)
TEvDiskAgent::TEvDisableConcreteAgentRequest,
HandleDisableConcreteAgent);

case TEvDiskAgentPrivate::EvParsedWriteDeviceBlocksRequest:
HandleWriteDeviceBlocks(
*reinterpret_cast<
typename TEvDiskAgent::TEvWriteDeviceBlocksRequest::TPtr*>(
&ev),
ActorContext());
break;
HFunc(
TEvDiskAgentPrivate::TEvParsedWriteDeviceBlocksRequest,
HandleParsedWriteDeviceBlocks);

case TEvDiskAgentPrivate::EvParsedReadDeviceBlocksRequest:
HandleReadDeviceBlocks(
Expand Down
8 changes: 6 additions & 2 deletions cloud/blockstore/libs/storage/disk_agent/disk_agent_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,10 @@ class TDiskAgentActor final

void SendRegisterRequest(const NActors::TActorContext& ctx);

template <typename TMethod, typename TOp>
template <typename TMethod, typename TEv, typename TOp>
void PerformIO(
const NActors::TActorContext& ctx,
const typename TMethod::TRequest::TPtr& ev,
const TEv& ev,
TOp operation);

template <typename TMethod, typename TRequestPtr>
Expand Down Expand Up @@ -225,6 +225,10 @@ class TDiskAgentActor final
const TEvDiskAgentPrivate::TEvCancelSuspensionRequest::TPtr& ev,
const NActors::TActorContext& ctx);

void HandleParsedWriteDeviceBlocks(
const TEvDiskAgentPrivate::TEvParsedWriteDeviceBlocksRequest::TPtr& ev,
const NActors::TActorContext& ctx);

bool HandleRequests(STFUNC_SIG);
bool RejectRequests(STFUNC_SIG);

Expand Down
68 changes: 66 additions & 2 deletions cloud/blockstore/libs/storage/disk_agent/disk_agent_actor_io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,24 @@ namespace {

////////////////////////////////////////////////////////////////////////////////

ui64 GetVolumeRequestId(
const TEvDiskAgentPrivate::TParsedWriteDeviceBlocksRequest& request)
{
return request.Record.GetVolumeRequestId();
}

TBlockRange64 BuildRequestBlockRange(
const TEvDiskAgentPrivate::TParsedWriteDeviceBlocksRequest& request)
{
Y_ABORT_UNLESS(request.ByteCount % request.Record.GetBlockSize() == 0);

return TBlockRange64::WithLength(
request.Record.GetStartIndex(),
request.ByteCount / request.Record.GetBlockSize());
}

////////////////////////////////////////////////////////////////////////////////

template <typename T>
constexpr bool IsWriteDeviceMethod =
std::is_same_v<T, TEvDiskAgent::TWriteDeviceBlocksMethod> ||
Expand Down Expand Up @@ -113,10 +131,10 @@ std::pair<ui32, TString> HandleException(

////////////////////////////////////////////////////////////////////////////////

template <typename TMethod, typename TOp>
template <typename TMethod, typename TEv, typename TOp>
void TDiskAgentActor::PerformIO(
const TActorContext& ctx,
const typename TMethod::TRequest::TPtr& ev,
const TEv& ev,
TOp operation)
{
auto* msg = ev->Get();
Expand Down Expand Up @@ -327,6 +345,52 @@ void TDiskAgentActor::HandleWriteDeviceBlocks(
PerformIO<TMethod>(ctx, ev, &TDiskAgentState::Write);
}

void TDiskAgentActor::HandleParsedWriteDeviceBlocks(
const TEvDiskAgentPrivate::TEvParsedWriteDeviceBlocksRequest::TPtr& ev,
const TActorContext& ctx)
{
BLOCKSTORE_DISK_AGENT_COUNTER(WriteDeviceBlocks);

using TMethod = TEvDiskAgent::TWriteDeviceBlocksMethod;

if (CheckIntersection<TMethod>(ctx, ev)) {
return;
}

// Attach storage to NProto::TWriteBlocksRequest
struct TWriteBlocksRequestWithStorage
: NProto::TWriteBlocksRequest
{
TStorageBuffer Storage;
};

auto* msg = ev->Get();

PerformIO<TMethod>(
ctx,
ev,
[storage = std::move(msg->Storage), byteCount = msg->ByteCount](
TDiskAgentState& self,
TInstant now,
NProto::TWriteDeviceBlocksRequest request) mutable
{
auto writeRequest =
std::make_shared<TWriteBlocksRequestWithStorage>();
writeRequest->Storage = std::move(storage);
writeRequest->MutableHeaders()->Swap(request.MutableHeaders());
writeRequest->SetStartIndex(request.GetStartIndex());

TStringBuf buffer {writeRequest->Storage.get(), byteCount};

return self.WriteBlocks(
now,
request.GetDeviceUUID(),
std::move(writeRequest),
request.GetBlockSize(),
buffer);
});
}

void TDiskAgentActor::HandleZeroDeviceBlocks(
const TEvDiskAgent::TEvZeroDeviceBlocksRequest::TPtr& ev,
const TActorContext& ctx)
Expand Down
15 changes: 15 additions & 0 deletions cloud/blockstore/libs/storage/disk_agent/disk_agent_private.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,17 @@ struct TEvDiskAgentPrivate
struct TCancelSuspensionRequest
{};

//
// ParsedWriteDeviceBlocksRequest
//

struct TParsedWriteDeviceBlocksRequest
{
NProto::TWriteDeviceBlocksRequest Record;
TStorageBuffer Storage;
ui64 ByteCount = 0;
};

//
// Events declaration
//
Expand Down Expand Up @@ -207,6 +218,10 @@ struct TEvDiskAgentPrivate
TCancelSuspensionRequest,
EvCancelSuspensionRequest>;

using TEvParsedWriteDeviceBlocksRequest = TRequestEvent<
TParsedWriteDeviceBlocksRequest,
EvParsedWriteDeviceBlocksRequest>;

BLOCKSTORE_DECLARE_EVENTS(UpdateSessionCache)
};

Expand Down
46 changes: 30 additions & 16 deletions cloud/blockstore/libs/storage/disk_agent/disk_agent_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -645,35 +645,49 @@ TFuture<NProto::TWriteDeviceBlocksResponse> TDiskAgentState::Write(
TInstant now,
NProto::TWriteDeviceBlocksRequest request)
{
CheckIfDeviceIsDisabled(
request.GetDeviceUUID(),
request.GetHeaders().GetClientId());

const auto& device = GetDeviceState(
request.GetDeviceUUID(),
request.GetHeaders().GetClientId(),
NProto::VOLUME_ACCESS_READ_WRITE);

auto writeRequest = std::make_shared<NProto::TWriteBlocksRequest>();
writeRequest->MutableHeaders()->CopyFrom(request.GetHeaders());
writeRequest->SetStartIndex(request.GetStartIndex());
writeRequest->MutableBlocks()->Swap(request.MutableBlocks());

WriteProfileLog(
return WriteBlocks(
now,
request.GetDeviceUUID(),
*writeRequest,
std::move(writeRequest),
request.GetBlockSize(),
ESysRequestType::WriteDeviceBlocks
{} // buffer
);
}

TFuture<NProto::TWriteDeviceBlocksResponse> TDiskAgentState::WriteBlocks(
TInstant now,
const TString& deviceUUID,
std::shared_ptr<NProto::TWriteBlocksRequest> request,
ui32 blockSize,
TStringBuf buffer)
{
CheckIfDeviceIsDisabled(
deviceUUID,
request->GetHeaders().GetClientId());

const auto& device = GetDeviceState(
deviceUUID,
request->GetHeaders().GetClientId(),
NProto::VOLUME_ACCESS_READ_WRITE);

WriteProfileLog(
now,
deviceUUID,
*request,
blockSize,
ESysRequestType::WriteDeviceBlocks);

auto result = device.StorageAdapter->WriteBlocks(
now,
MakeIntrusive<TCallContext>(),
std::move(writeRequest),
request.GetBlockSize(),
{} // no data buffer
);
std::move(request),
blockSize,
buffer);

return result.Apply(
[] (const auto& future) {
Expand Down
7 changes: 7 additions & 0 deletions cloud/blockstore/libs/storage/disk_agent/disk_agent_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,13 @@ class TDiskAgentState
TInstant now,
NProto::TWriteDeviceBlocksRequest request);

NThreading::TFuture<NProto::TWriteDeviceBlocksResponse> WriteBlocks(
TInstant now,
const TString& deviceUUID,
std::shared_ptr<NProto::TWriteBlocksRequest> request,
ui32 blockSize,
TStringBuf buffer);

NThreading::TFuture<NProto::TZeroDeviceBlocksResponse> WriteZeroes(
TInstant now,
NProto::TZeroDeviceBlocksRequest request);
Expand Down
10 changes: 7 additions & 3 deletions cloud/blockstore/libs/storage/disk_agent/hash_table_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ struct THashTableStorage final
return MakeFuture(std::move(response));
}

auto sglist = guard.Get();
const auto& sglist = guard.Get();
auto b = request->GetStartIndex();
auto e = request->GetStartIndex() + request->BlocksCount;

Expand All @@ -120,11 +120,15 @@ struct THashTableStorage final
return MakeFuture(std::move(response));
}

while (b < e) {
Blocks[b] = sglist[b - request->GetStartIndex()].AsStringBuf();
TSgList dst(request->BlocksCount);

while (b < e) {
auto& block = Blocks[b];
block.resize(request->BlockSize);
dst[b - request->GetStartIndex()] = {block.data(), block.size()};
++b;
}
SgListCopy(sglist, dst);

return MakeFuture(std::move(response));
}
Expand Down

0 comments on commit 86a0da1

Please sign in to comment.