-
Notifications
You must be signed in to change notification settings - Fork 23
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
42 changed files
with
2,171 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
218 changes: 218 additions & 0 deletions
218
cloud/blockstore/libs/storage/partition/part_actor_checkrange.cpp
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,218 @@ | ||
#include "part_actor.h" | ||
|
||
#include <cloud/blockstore/libs/service/context.h> | ||
#include <cloud/blockstore/libs/storage/core/config.h> | ||
#include <cloud/blockstore/libs/storage/core/probes.h> | ||
|
||
#include <contrib/ydb/library/actors/core/actor_bootstrapped.h> | ||
|
||
#include <util/datetime/base.h> | ||
#include <util/generic/algorithm.h> | ||
#include <util/generic/guid.h> | ||
#include <util/generic/string.h> | ||
#include <util/generic/vector.h> | ||
#include <util/generic/xrange.h> | ||
#include <util/stream/str.h> | ||
|
||
namespace NCloud::NBlockStore::NStorage::NPartition { | ||
|
||
using namespace NActors; | ||
|
||
using namespace NKikimr; | ||
|
||
LWTRACE_USING(BLOCKSTORE_STORAGE_PROVIDER); | ||
|
||
namespace { | ||
|
||
//////////////////////////////////////////////////////////////////////////////// | ||
|
||
class TCheckRangeActor final: public TActorBootstrapped<TCheckRangeActor> | ||
{ | ||
private: | ||
const TActorId Tablet; | ||
const ui64 StartIndex; | ||
const ui64 BlocksCount; | ||
const TEvVolume::TEvCheckRangeRequest::TPtr Ev; | ||
|
||
public: | ||
TCheckRangeActor( | ||
const TActorId& tablet, | ||
ui64 startIndex, | ||
ui64 blocksCount, | ||
TEvVolume::TEvCheckRangeRequest::TPtr&& ev); | ||
|
||
void Bootstrap(const TActorContext& ctx); | ||
|
||
private: | ||
void ReplyAndDie( | ||
const TActorContext& ctx, | ||
const NProto::TError& error = {}); | ||
|
||
void HandleReadBlocksResponse( | ||
const TEvService::TEvReadBlocksResponse::TPtr& ev, | ||
const TActorContext& ctx); | ||
|
||
void SendReadBlocksRequest(const TActorContext& ctx); | ||
|
||
private: | ||
STFUNC(StateWork); | ||
|
||
void HandleWakeup( | ||
const TEvents::TEvWakeup::TPtr& ev, | ||
const TActorContext& ctx); | ||
|
||
void HandlePoisonPill( | ||
const TEvents::TEvPoisonPill::TPtr& ev, | ||
const TActorContext& ctx); | ||
}; | ||
|
||
//////////////////////////////////////////////////////////////////////////////// | ||
|
||
TCheckRangeActor::TCheckRangeActor( | ||
const TActorId& tablet, | ||
ui64 startIndex, | ||
ui64 blocksCount, | ||
TEvVolume::TEvCheckRangeRequest::TPtr&& ev) | ||
: Tablet(tablet) | ||
, StartIndex(startIndex) | ||
, BlocksCount(blocksCount) | ||
, Ev(std::move(ev)) | ||
{} | ||
|
||
void TCheckRangeActor::Bootstrap(const TActorContext& ctx) | ||
{ | ||
SendReadBlocksRequest(ctx); | ||
Become(&TThis::StateWork); | ||
} | ||
|
||
void TCheckRangeActor::SendReadBlocksRequest(const TActorContext& ctx) | ||
{ | ||
auto request = std::make_unique<TEvService::TEvReadBlocksRequest>(); | ||
|
||
request->Record.SetStartIndex(StartIndex); | ||
request->Record.SetBlocksCount(BlocksCount); | ||
|
||
auto* headers = request->Record.MutableHeaders(); | ||
|
||
headers->SetIsBackgroundRequest(true); | ||
NCloud::Send(ctx, Tablet, std::move(request)); | ||
} | ||
|
||
void TCheckRangeActor::ReplyAndDie( | ||
const TActorContext& ctx, | ||
const NProto::TError& error) | ||
{ | ||
auto response = | ||
std::make_unique<TEvVolume::TEvCheckRangeResponse>(std::move(error)); | ||
|
||
NCloud::Reply(ctx, *Ev, std::move(response)); | ||
|
||
Die(ctx); | ||
} | ||
|
||
//////////////////////////////////////////////////////////////////////////////// | ||
|
||
STFUNC(TCheckRangeActor::StateWork) | ||
{ | ||
switch (ev->GetTypeRewrite()) { | ||
HFunc(TEvents::TEvPoisonPill, HandlePoisonPill); | ||
HFunc(TEvents::TEvWakeup, HandleWakeup); | ||
HFunc(TEvService::TEvReadBlocksResponse, HandleReadBlocksResponse); | ||
default: | ||
HandleUnexpectedEvent(ev, TBlockStoreComponents::PARTITION_WORKER); | ||
break; | ||
} | ||
} | ||
|
||
void TCheckRangeActor::HandleWakeup( | ||
const TEvents::TEvWakeup::TPtr& ev, | ||
const TActorContext& ctx) | ||
{ | ||
Y_UNUSED(ev); | ||
SendReadBlocksRequest(ctx); | ||
} | ||
|
||
void TCheckRangeActor::HandlePoisonPill( | ||
const TEvents::TEvPoisonPill::TPtr& ev, | ||
const TActorContext& ctx) | ||
{ | ||
Y_UNUSED(ev); | ||
|
||
auto error = MakeError(E_REJECTED, "tablet is shutting down"); | ||
|
||
ReplyAndDie(ctx, error); | ||
} | ||
|
||
void TCheckRangeActor::HandleReadBlocksResponse( | ||
const TEvService::TEvReadBlocksResponse::TPtr& ev, | ||
const TActorContext& ctx) | ||
{ | ||
const auto* msg = ev->Get(); | ||
auto error = MakeError(S_OK); | ||
|
||
if (HasError(msg->Record.GetError())) { | ||
auto errorMessage = msg->Record.GetError().GetMessage(); | ||
LOG_ERROR( | ||
ctx, | ||
TBlockStoreComponents::VOLUME, | ||
"reading error has occurred: " + errorMessage + " message " + | ||
msg->Record.GetError().message()); | ||
auto errorCode = | ||
msg->Record.GetError().code() == E_ARGUMENT ? E_ARGUMENT : E_IO; | ||
error = MakeError(errorCode, msg->Record.GetError().GetMessage()); | ||
} | ||
|
||
ReplyAndDie(ctx, error); | ||
} | ||
|
||
} // namespace | ||
|
||
////////////////////////////////////////////////////////////////// | ||
|
||
NActors::IActorPtr TPartitionActor::CreateCheckRangeActor( | ||
NActors::TActorId tablet, | ||
ui64 startIndex, | ||
ui64 blocksCount, | ||
TEvVolume::TEvCheckRangeRequest::TPtr ev) | ||
{ | ||
return std::make_unique<NPartition::TCheckRangeActor>( | ||
std::move(tablet), | ||
startIndex, | ||
blocksCount, | ||
std::move(ev)); | ||
} | ||
|
||
void NPartition::TPartitionActor::HandleCheckRange( | ||
const TEvVolume::TEvCheckRangeRequest::TPtr& ev, | ||
const TActorContext& ctx) | ||
{ | ||
const auto* msg = ev->Get(); | ||
|
||
if (msg->Record.GetBlocksCount() > | ||
Config->GetBytesPerStripe() / State->GetBlockSize()) | ||
{ | ||
auto err = MakeError( | ||
E_ARGUMENT, | ||
"Too many blocks requested: " + | ||
std::to_string(msg->Record.GetBlocksCount()) + | ||
" Max blocks per request : " + | ||
std::to_string( | ||
Config->GetBytesPerStripe() / State->GetBlockSize())); | ||
auto response = | ||
std::make_unique<TEvVolume::TEvCheckRangeResponse>(std::move(err)); | ||
NCloud::Reply(ctx, *ev, std::move(response)); | ||
return; | ||
} | ||
|
||
const auto actorId = NCloud::Register( | ||
ctx, | ||
CreateCheckRangeActor( | ||
SelfId(), | ||
msg->Record.GetStartIndex(), | ||
msg->Record.GetBlocksCount(), | ||
std::move(ev))); | ||
|
||
Actors.Insert(actorId); | ||
} | ||
|
||
} // namespace NCloud::NBlockStore::NStorage::NPartition |
Oops, something went wrong.