Skip to content

Commit

Permalink
issue-1814: passing StorageStatusFlags for written blobs from Storage…
Browse files Browse the repository at this point in the history
…ServiceActor to tablets via AddData requests (#1815)
  • Loading branch information
qkrorlqr authored Aug 19, 2024
1 parent 206ad86 commit 0bcc822
Show file tree
Hide file tree
Showing 6 changed files with 130 additions and 26 deletions.
10 changes: 10 additions & 0 deletions cloud/filestore/libs/storage/service/service_actor_writedata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ class TWriteDataActor final: public TActorBootstrapped<TWriteDataActor>
// in flight
TMaybe<TInFlightRequest> InFlightRequest;
TVector<std::unique_ptr<TInFlightRequest>> InFlightBSRequests;
TVector<ui32> StorageStatusFlags;
const NCloud::NProto::EStorageMediaKind MediaKind;

public:
Expand Down Expand Up @@ -173,6 +174,7 @@ class TWriteDataActor final: public TActorBootstrapped<TWriteDataActor>

RequestInfo->CallContext->RequestType = EFileStoreRequest::WriteBlob;
InFlightBSRequests.reserve(RemainingBlobsToWrite);
StorageStatusFlags.resize(GenerateBlobIdsResponse.BlobsSize());
for (const auto& blob: GenerateBlobIdsResponse.GetBlobs()) {
NKikimr::TLogoBlobID blobId =
LogoBlobIDFromLogoBlobID(blob.GetBlobId());
Expand Down Expand Up @@ -249,11 +251,14 @@ class TWriteDataActor final: public TActorBootstrapped<TWriteDataActor>
ui64 blobIdx = msg->Id.Cookie();
// It is implicitly expected that cookies are generated in increasing
// order starting from 0.
// TODO: replace this TABLET_VERIFY with a critical event + WriteData
// fallback
TABLET_VERIFY(
blobIdx < InFlightBSRequests.size() &&
InFlightBSRequests[blobIdx] &&
!InFlightBSRequests[blobIdx]->IsCompleted());
InFlightBSRequests[blobIdx]->Complete(ctx.Now(), {});
StorageStatusFlags[blobIdx] = msg->StatusFlags.Raw;

--RemainingBlobsToWrite;
if (RemainingBlobsToWrite == 0) {
Expand All @@ -277,6 +282,11 @@ class TWriteDataActor final: public TActorBootstrapped<TWriteDataActor>
request->Record.AddBlobIds()->Swap(blob.MutableBlobId());
}
request->Record.SetCommitId(GenerateBlobIdsResponse.GetCommitId());
request->Record.MutableStorageStatusFlags()->Reserve(
StorageStatusFlags.size());
for (const auto flags: StorageStatusFlags) {
request->Record.AddStorageStatusFlags(flags);
}

if (Range.Offset < BlobRange.Offset) {
auto& unalignedHead = *request->Record.AddUnalignedDataRanges();
Expand Down
64 changes: 64 additions & 0 deletions cloud/filestore/libs/storage/service/service_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2608,6 +2608,70 @@ Y_UNIT_TEST_SUITE(TStorageServiceTest)
// clang-format on
}

Y_UNIT_TEST(ShouldSendBSGroupFlagsToTabletViaAddDataRequests)
{
TTestEnv env;
env.CreateSubDomain("nfs");

ui32 nodeIdx = env.CreateNode("nfs");

TServiceClient service(env.GetRuntime(), nodeIdx);
const TString fs = "test";
service.CreateFileStore(fs, 1000);

{
NProto::TStorageConfig newConfig;
newConfig.SetThreeStageWriteEnabled(true);
const auto response =
ExecuteChangeStorageConfig(std::move(newConfig), service);
env.GetRuntime().DispatchEvents({}, TDuration::Seconds(1));
}

auto headers = service.InitSession(fs, "client");
ui64 nodeId = service
.CreateNode(headers, TCreateNodeArgs::File(RootNodeId, "file"))
->Record.GetNode()
.GetId();
ui64 handle = service
.CreateHandle(headers, fs, nodeId, "", TCreateHandleArgs::RDWR)
->Record.GetHandle();

const auto yellowFlag =
NKikimrBlobStorage::EStatusFlags::StatusDiskSpaceYellowStop;

NProtoPrivate::TAddDataRequest addData;
using TFlags = NKikimr::TStorageStatusFlags;
env.GetRuntime().SetEventFilter(
[&](auto& runtime, auto& event)
{
Y_UNUSED(runtime);

switch (event->GetTypeRewrite()) {
case TEvBlobStorage::EvPutResult: {
auto* msg =
event->template Get<TEvBlobStorage::TEvPutResult>();
const_cast<TFlags&>(msg->StatusFlags).Raw |=
ui32(yellowFlag);
break;
}

case TEvIndexTablet::EvAddDataRequest: {
addData = event->template Get<
TEvIndexTablet::TEvAddDataRequest>()->Record;
break;
}
}
return false;
});

TString data = GenerateValidateData(256_KB);
service.WriteData(headers, fs, nodeId, handle, 0, data);
UNIT_ASSERT_VALUES_EQUAL(1, addData.BlobIdsSize());
UNIT_ASSERT_VALUES_EQUAL(1, addData.StorageStatusFlagsSize());
UNIT_ASSERT(NKikimr::TStorageStatusFlags(
addData.GetStorageStatusFlags(0)).Check(yellowFlag));
}

void ConfigureFollowers(
TServiceClient& service,
const TString& fsId,
Expand Down
5 changes: 5 additions & 0 deletions cloud/filestore/libs/storage/tablet/tablet_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,11 @@ class TIndexTabletActor final
void DefaultSignalTabletActive(const NActors::TActorContext& ctx) override;
void OnActivateExecutor(const NActors::TActorContext& ctx) override;
bool ReassignChannelsEnabled() const override;
void RegisterEvPutResult(
const NActors::TActorContext& ctx,
ui32 generation,
ui32 channel,
const NKikimr::TStorageStatusFlags flags);
void ReassignDataChannelsIfNeeded(const NActors::TActorContext& ctx);
bool OnRenderAppHtmlPage(
NActors::NMon::TEvRemoteHttpInfo::TPtr ev,
Expand Down
10 changes: 10 additions & 0 deletions cloud/filestore/libs/storage/tablet/tablet_actor_adddata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,16 @@ void TIndexTabletActor::HandleAddData(
blobIds.size(),
unalignedMsg().c_str());

const auto evPutResultCount =
Min<ui32>(blobIds.size(), msg->Record.StorageStatusFlagsSize());
for (ui32 i = 0; i < evPutResultCount; ++i) {
RegisterEvPutResult(
ctx,
blobIds[i].Generation(),
blobIds[i].Channel(),
msg->Record.GetStorageStatusFlags(i));
}

AddTransaction<TEvIndexTablet::TAddDataMethod>(*requestInfo);

ExecuteTx<TAddData>(
Expand Down
64 changes: 38 additions & 26 deletions cloud/filestore/libs/storage/tablet/tablet_actor_writeblob.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,39 @@ void TIndexTabletActor::HandleWriteBlob(
WorkerActors.insert(actorId);
}

void TIndexTabletActor::RegisterEvPutResult(
const TActorContext& ctx,
ui32 generation,
ui32 channel,
const NKikimr::TStorageStatusFlags flags)
{
const auto validFlag = NKikimrBlobStorage::EStatusFlags::StatusIsValid;
if (flags.Check(validFlag)) {
ui32 group = Info()->GroupFor(channel, generation);

if (flags.Check(NKikimrBlobStorage::StatusDiskSpaceLightYellowMove)) {
LOG_WARN(ctx, TFileStoreComponents::TABLET,
"%s Yellow move flag received for channel %u and group %u",
LogTag.c_str(),
channel,
group);

RegisterChannelToMove(channel);
}
if (flags.Check(NKikimrBlobStorage::StatusDiskSpaceYellowStop)) {
LOG_WARN(ctx, TFileStoreComponents::TABLET,
"%s Yellow stop flag received for channel %u and group %u",
LogTag.c_str(),
channel,
group);

RegisterUnwritableChannel(channel);
}

ReassignDataChannelsIfNeeded(ctx);
}
}

void TIndexTabletActor::HandleWriteBlobCompleted(
const TEvIndexTabletPrivate::TEvWriteBlobCompleted::TPtr& ev,
const TActorContext& ctx)
Expand All @@ -375,33 +408,12 @@ void TIndexTabletActor::HandleWriteBlobCompleted(

Metrics.WriteBlob.Update(msg->Count, msg->Size, msg->Time);

const auto validFlag = NKikimrBlobStorage::EStatusFlags::StatusIsValid;
for (const auto& result: msg->Results) {
if (result.StorageStatusFlags.Check(validFlag)) {
ui32 channel = result.BlobId.Channel();
ui32 group = Info()->GroupFor(channel, result.BlobId.Generation());

if (result.StorageStatusFlags.Check(NKikimrBlobStorage::StatusDiskSpaceLightYellowMove)) {
LOG_WARN(ctx, TFileStoreComponents::TABLET,
"%s Yellow move flag received for channel %u and group %u",
LogTag.c_str(),
channel,
group);

RegisterChannelToMove(channel);
}
if (result.StorageStatusFlags.Check(NKikimrBlobStorage::StatusDiskSpaceYellowStop)) {
LOG_WARN(ctx, TFileStoreComponents::TABLET,
"%s Yellow stop flag received for channel %u and group %u",
LogTag.c_str(),
channel,
group);

RegisterUnwritableChannel(channel);
}

ReassignDataChannelsIfNeeded(ctx);
}
RegisterEvPutResult(
ctx,
result.BlobId.Generation(),
result.BlobId.Channel(),
result.StorageStatusFlags);
}

if (FAILED(msg->GetStatus())) {
Expand Down
3 changes: 3 additions & 0 deletions cloud/filestore/private/api/protos/tablet.proto
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,9 @@ message TAddDataRequest

// Unaligned data parts - supposed to contain unaligned head and tail.
repeated TFreshDataRange UnalignedDataRanges = 9;

// StorageStatusFlags for the written blobs.
repeated uint32 StorageStatusFlags = 10;
}

message TAddDataResponse
Expand Down

0 comments on commit 0bcc822

Please sign in to comment.