diff --git a/cloud/blockstore/config/storage.proto b/cloud/blockstore/config/storage.proto
index 60be1fe613..7f02780c3e 100644
--- a/cloud/blockstore/config/storage.proto
+++ b/cloud/blockstore/config/storage.proto
@@ -1113,4 +1113,11 @@ message TStorageServiceConfig
 
     // Timeout for TDestroyVolumeActor (in milliseconds)
    optional uint32 DestroyVolumeTimeout = 405;
+
+    // Minimum compaction data size (in bytes) required to write the data to
+    // blobstorage as a merged blob; smaller data is written to the mixed channel.
+    optional uint32 CompactionBlobThreshold = 406;
+
+    // Overrides CompactionBlobThreshold for SSD volumes.
+    optional uint32 CompactionBlobThresholdSSD = 407;
 }
diff --git a/cloud/blockstore/libs/storage/core/config.cpp b/cloud/blockstore/libs/storage/core/config.cpp
index bf5ebda950..2d723ca9f5 100644
--- a/cloud/blockstore/libs/storage/core/config.cpp
+++ b/cloud/blockstore/libs/storage/core/config.cpp
@@ -531,6 +531,8 @@ TDuration MSeconds(ui32 value)
     xxx(CalculateSplittedUsedQuotaMetric, bool, false )\
                                                        \
     xxx(DestroyVolumeTimeout, TDuration, Seconds(30) )\
+    xxx(CompactionBlobThreshold, ui32, 1 )\
+    xxx(CompactionBlobThresholdSSD, ui32, 1 )\
 // BLOCKSTORE_STORAGE_CONFIG_RW
 
 #define BLOCKSTORE_STORAGE_CONFIG(xxx) \
diff --git a/cloud/blockstore/libs/storage/core/config.h b/cloud/blockstore/libs/storage/core/config.h
index bf48eab356..e34fc951bb 100644
--- a/cloud/blockstore/libs/storage/core/config.h
+++ b/cloud/blockstore/libs/storage/core/config.h
@@ -635,6 +635,9 @@ class TStorageConfig
     bool GetAutomaticallyEnableBufferCopyingAfterChecksumMismatch() const;
     [[nodiscard]] bool GetNonReplicatedVolumeDirectAcquireEnabled() const;
     [[nodiscard]] TDuration GetDestroyVolumeTimeout() const;
+
+    ui32 GetCompactionBlobThreshold() const;
+    ui32 GetCompactionBlobThresholdSSD() const;
 };
 
 ui64 GetAllocationUnit(
diff --git a/cloud/blockstore/libs/storage/core/proto_helpers.cpp b/cloud/blockstore/libs/storage/core/proto_helpers.cpp
index c56b9c4e04..09a6a618ee 100644
--- a/cloud/blockstore/libs/storage/core/proto_helpers.cpp
+++ b/cloud/blockstore/libs/storage/core/proto_helpers.cpp
@@ -287,6 +287,17 @@ ui32 GetWriteBlobThreshold(
     return config.GetWriteBlobThreshold();
 }
 
+ui32 GetCompactionBlobThreshold(
+    const TStorageConfig& config,
+    const NCloud::NProto::EStorageMediaKind mediaKind)
+{
+    if (mediaKind == NCloud::NProto::STORAGE_MEDIA_SSD) {
+        return config.GetCompactionBlobThresholdSSD();
+    }
+
+    return config.GetCompactionBlobThreshold();
+}
+
 bool CompareVolumeConfigs(
     const NKikimrBlockStore::TVolumeConfig& prevConfig,
     const NKikimrBlockStore::TVolumeConfig& newConfig)
diff --git a/cloud/blockstore/libs/storage/core/proto_helpers.h b/cloud/blockstore/libs/storage/core/proto_helpers.h
index 86c4f5f5e5..45fc4f44fe 100644
--- a/cloud/blockstore/libs/storage/core/proto_helpers.h
+++ b/cloud/blockstore/libs/storage/core/proto_helpers.h
@@ -152,6 +152,10 @@ ui32 GetWriteBlobThreshold(
     const TStorageConfig& config,
     const NCloud::NProto::EStorageMediaKind mediaKind);
 
+ui32 GetCompactionBlobThreshold(
+    const TStorageConfig& config,
+    const NCloud::NProto::EStorageMediaKind mediaKind);
+
 inline bool RequiresCheckpointSupport(const NProto::TReadBlocksRequest& request)
 {
     return !request.GetCheckpointId().empty();
diff --git a/cloud/blockstore/libs/storage/partition/part_actor_compaction.cpp b/cloud/blockstore/libs/storage/partition/part_actor_compaction.cpp
index 7b07df8745..b66a19a551 100644
--- a/cloud/blockstore/libs/storage/partition/part_actor_compaction.cpp
+++ b/cloud/blockstore/libs/storage/partition/part_actor_compaction.cpp
@@ -6,6 +6,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
@@ -40,6 +41,7 @@ struct TRangeCompactionInfo
     const ui32 BlobsSkippedByCompaction;
     const ui32 BlocksSkippedByCompaction;
     const TVector BlockChecksums;
+    const EChannelDataKind ChannelDataKind;
 
     TGuardedBuffer BlobContent;
     TVector ZeroBlocks;
@@ -59,6 +61,7 @@ struct TRangeCompactionInfo
         ui32 blobsSkippedByCompaction,
         ui32 blocksSkippedByCompaction,
         TVector blockChecksums,
+        EChannelDataKind channelDataKind,
         TBlockBuffer blobContent,
         TVector zeroBlocks,
         TAffectedBlobs affectedBlobs,
@@ -72,6 +75,7 @@ struct TRangeCompactionInfo
         , BlobsSkippedByCompaction(blobsSkippedByCompaction)
         , BlocksSkippedByCompaction(blocksSkippedByCompaction)
         , BlockChecksums(std::move(blockChecksums))
+        , ChannelDataKind(channelDataKind)
         , BlobContent(std::move(blobContent))
         , ZeroBlocks(std::move(zeroBlocks))
         , AffectedBlobs(std::move(affectedBlobs))
@@ -549,6 +553,7 @@ void TCompactionActor::WriteBlobs(const TActorContext& ctx)
 
 void TCompactionActor::AddBlobs(const TActorContext& ctx)
 {
+    TVector mixedBlobs;
     TVector mergedBlobs;
     TVector blobCompactionInfos;
     TAffectedBlobs affectedBlobs;
@@ -560,7 +565,8 @@ void TCompactionActor::AddBlobs(const TActorContext& ctx)
         TBlockMask skipMask,
         const TVector& blockChecksums,
         ui32 blobsSkipped,
-        ui32 blocksSkipped)
+        ui32 blocksSkipped,
+        EChannelDataKind channelDataKind)
     {
         while (skipMask.Get(range.End - range.Start)) {
             Y_ABORT_UNLESS(range.End > range.Start);
@@ -571,9 +577,25 @@ void TCompactionActor::AddBlobs(const TActorContext& ctx)
             --range.End;
         }
 
-        mergedBlobs.emplace_back(blobId, range, skipMask, blockChecksums);
-
-        blobCompactionInfos.push_back({blobsSkipped, blocksSkipped});
+        if (channelDataKind == EChannelDataKind::Merged) {
+            mergedBlobs.emplace_back(blobId, range, skipMask, blockChecksums);
+            blobCompactionInfos.push_back({blobsSkipped, blocksSkipped});
+        } else if (channelDataKind == EChannelDataKind::Mixed) {
+            TVector blockIndices(Reserve(range.Size()));
+            for (auto blockIndex = range.Start; blockIndex <= range.End;
+                 ++blockIndex)
+            {
+                blockIndices.emplace_back(blockIndex);
+            }
+            mixedBlobs.emplace_back(blobId, blockIndices, blockChecksums);
+        } else {
+            LOG_ERROR(
+                ctx,
+                TBlockStoreComponents::PARTITION,
+                "[%lu] unexpected channel data kind %u",
+                TabletId,
+                static_cast(channelDataKind));
+        }
     };
 
     for (auto& rc: RangeCompactionInfos) {
@@ -584,7 +606,8 @@ void TCompactionActor::AddBlobs(const TActorContext& ctx)
                 rc.DataBlobSkipMask,
                 rc.BlockChecksums,
                 rc.BlobsSkippedByCompaction,
-                rc.BlocksSkippedByCompaction);
+                rc.BlocksSkippedByCompaction,
+                rc.ChannelDataKind);
         }
 
         if (rc.ZeroBlobId) {
@@ -602,7 +625,8 @@ void TCompactionActor::AddBlobs(const TActorContext& ctx)
                 rc.ZeroBlobSkipMask,
                 rc.BlockChecksums,
                 blobsSkipped,
-                blocksSkipped);
+                blocksSkipped,
+                rc.ChannelDataKind);
         }
 
         if (rc.DataBlobId && rc.ZeroBlobId) {
@@ -664,7 +688,7 @@ void TCompactionActor::AddBlobs(const TActorContext& ctx)
     auto request = std::make_unique(
         RequestInfo->CallContext,
         CommitId,
-        TVector(),
+        std::move(mixedBlobs),
         std::move(mergedBlobs),
         TVector(),
         ADD_COMPACTION_RESULT,
@@ -1481,9 +1505,7 @@ namespace {
 
 void PrepareRangeCompaction(
     const TStorageConfig& config,
-    const TString& cloudId,
-    const TString& folderId,
-    const TString& diskId,
+    const bool incrementalCompactionEnabled,
     const ui64 commitId,
     const bool fullCompaction,
     const TActorContext& ctx,
@@ -1494,19 +1516,10 @@ void PrepareRangeCompaction(
     TPartitionState& state,
     TTxPartition::TRangeCompaction& args)
 {
-    const bool incrementalCompactionEnabledForCloud =
-        config.IsIncrementalCompactionFeatureEnabled(cloudId, folderId, diskId);
-    const bool incrementalCompactionEnabled =
-        config.GetIncrementalCompactionEnabled()
-        || incrementalCompactionEnabledForCloud;
-
     TCompactionBlockVisitor visitor(args, commitId);
     state.FindFreshBlocks(visitor, args.BlockRange, commitId);
     visitor.KeepTrackOfAffectedBlocks = true;
-    ready &= state.FindMixedBlocksForCompaction(
-        db,
-        visitor,
-        args.RangeIdx);
+    ready &= state.FindMixedBlocksForCompaction(db, visitor, args.RangeIdx);
     visitor.KeepTrackOfAffectedBlocks = false;
     ready &= db.FindMergedBlocks(
         visitor,
@@ -1528,10 +1541,7 @@ void PrepareRangeCompaction(
         }
     }
 
-    if (ready
-        && incrementalCompactionEnabled
-        && !fullCompaction)
-    {
+    if (ready && incrementalCompactionEnabled && !fullCompaction) {
         THashMap liveBlocks;
         for (const auto& m: args.BlockMarks) {
             if (m.CommitId && m.BlobId) {
@@ -1546,12 +1556,9 @@ void PrepareRangeCompaction(
         }
 
         Sort(
-            blobIds.begin(),
-            blobIds.end(),
-            [&] (const TPartialBlobId& l, const TPartialBlobId& r) {
-                return liveBlocks[l] < liveBlocks[r];
-            }
-        );
+            blobIds,
+            [&](const TPartialBlobId& l, const TPartialBlobId& r)
+            { return liveBlocks[l] < liveBlocks[r]; });
 
         auto it = blobIds.begin();
         args.BlobsSkipped = blobIds.size();
@@ -1559,8 +1566,9 @@ void PrepareRangeCompaction(
 
         while (it != blobIds.end()) {
             const auto bytes = blocks * state.GetBlockSize();
-            const auto blobCountOk = args.BlobsSkipped
-                <= config.GetMaxSkippedBlobsDuringCompaction();
+            const auto blobCountOk =
+                args.BlobsSkipped <=
+                config.GetMaxSkippedBlobsDuringCompaction();
 
             const auto byteCountOk =
                 bytes >= config.GetTargetCompactionBytesPerOp();
@@ -1608,6 +1616,7 @@ void PrepareRangeCompaction(
         }
 
         if (liveBlocks.size()) {
+            // TODO: add unit tests covering this branch
             TAffectedBlocks affectedBlocks;
             for (const auto& b: args.AffectedBlocks) {
                 if (!skippedBlockIndices.contains(b.BlockIndex)) {
@@ -1652,6 +1661,7 @@
 
 void CompleteRangeCompaction(
     const bool blobPatchingEnabled,
+    const ui32 compactionThreshold,
     const ui64 commitId,
     TTabletStorageInfo& tabletStorageInfo,
     TPartitionState& state,
@@ -1668,7 +1678,7 @@ void CompleteRangeCompaction(
     size_t dataBlocksCount = 0, zeroBlocksCount = 0;
     for (const auto& mark: args.BlockMarks) {
         if (mark.CommitId) {
-            // there could be fresh block OR merged/mixed block
+            // there could be a fresh block (BlockContent is not empty) OR a merged/mixed block (deletion marker is not set)
             Y_ABORT_UNLESS(!(mark.BlockContent && !IsDeletionMarker(mark.BlobId)));
             if (mark.BlockContent || !IsDeletionMarker(mark.BlobId)) {
                 ++dataBlocksCount;
@@ -1682,6 +1692,7 @@ void CompleteRangeCompaction(
 
     TPartialBlobId dataBlobId, zeroBlobId;
     TBlockMask dataBlobSkipMask, zeroBlobSkipMask;
+    auto channelDataKind = EChannelDataKind::Merged;
     if (dataBlocksCount) {
         ui32 skipped = 0;
         for (const auto& mark: args.BlockMarks) {
@@ -1690,11 +1701,15 @@ void CompleteRangeCompaction(
             }
         }
 
+        const auto blobSize = (args.BlockRange.Size() - skipped) * state.GetBlockSize();
+        if (blobSize < compactionThreshold) {
+            channelDataKind = EChannelDataKind::Mixed;
+        }
         dataBlobId = state.GenerateBlobId(
-            EChannelDataKind::Merged,
+            channelDataKind,
             compactionPermissions,
             commitId,
-            (args.BlockRange.Size() - skipped) * state.GetBlockSize(),
+            blobSize,
             result.size());
     }
 
@@ -1706,7 +1721,7 @@ void CompleteRangeCompaction(
         // compaction range but for the last actual block that's referenced by
         // the corresponding blob
         zeroBlobId = state.GenerateBlobId(
-            EChannelDataKind::Merged,
+            channelDataKind,
             compactionPermissions,
             commitId,
             0,
@@ -1789,14 +1804,15 @@ void CompleteRangeCompaction(
                 zeroBlobSkipMask.Set(blockIndex - args.BlockRange.Start);
             }
 
-            if (!patchingCandidate
-                && blobPatchingEnabled
-                && mark.BlobId.BlobSize() == dataBlobId.BlobSize())
-            {
-                patchingCandidate = mark.BlobId;
-                ++patchingCandidateChangedBlockCount;
-            } else if (patchingCandidate == mark.BlobId) {
-                ++patchingCandidateChangedBlockCount;
+            if (blobPatchingEnabled) {
+                if (!patchingCandidate &&
+                    mark.BlobId.BlobSize() == dataBlobId.BlobSize())
+                {
+                    patchingCandidate = mark.BlobId;
+                    ++patchingCandidateChangedBlockCount;
+                } else if (patchingCandidate == mark.BlobId) {
+                    ++patchingCandidateChangedBlockCount;
+                }
             }
         } else {
             dataBlobSkipMask.Set(blockIndex - args.BlockRange.Start);
@@ -1877,6 +1893,7 @@ void CompleteRangeCompaction(
         args.BlobsSkipped,
         args.BlocksSkipped,
         std::move(blockChecksums),
+        channelDataKind,
         std::move(blobContent),
         std::move(zeroBlocks),
         std::move(args.AffectedBlobs),
@@ -1901,6 +1918,15 @@ bool TPartitionActor::PrepareCompaction(
     TRequestScope timer(*args.RequestInfo);
     TPartitionDatabase db(tx.DB);
 
+    const bool incrementalCompactionEnabled =
+        Config->GetIncrementalCompactionEnabled() ||
+        Config->IsIncrementalCompactionFeatureEnabled(
+            PartitionConfig.GetCloudId(),
+            PartitionConfig.GetFolderId(),
+            PartitionConfig.GetDiskId());
+    const bool fullCompaction =
+        args.CompactionOptions.test(ToBit(ECompactionOption::Full));
+
     bool ready = true;
 
     THashSet affectedBlobIds;
@@ -1908,11 +1934,9 @@ bool TPartitionActor::PrepareCompaction(
     for (auto& rangeCompaction: args.RangeCompactions) {
         PrepareRangeCompaction(
             *Config,
-            PartitionConfig.GetCloudId(),
-            PartitionConfig.GetFolderId(),
-            PartitionConfig.GetDiskId(),
+            incrementalCompactionEnabled,
             args.CommitId,
-            args.CompactionOptions.test(ToBit(ECompactionOption::Full)),
+            fullCompaction,
             ctx,
             TabletID(),
             affectedBlobIds,
@@ -1967,6 +1991,9 @@ void TPartitionActor::CompleteCompaction(
 
         CompleteRangeCompaction(
             blobPatchingEnabled,
+            GetCompactionBlobThreshold(
+                *Config,
+                PartitionConfig.GetStorageMediaKind()),
             args.CommitId,
             *Info(),
             *State,
diff --git a/cloud/blockstore/libs/storage/partition/part_ut.cpp b/cloud/blockstore/libs/storage/partition/part_ut.cpp
index e2c97ce07b..f266ea2909 100644
--- a/cloud/blockstore/libs/storage/partition/part_ut.cpp
+++ b/cloud/blockstore/libs/storage/partition/part_ut.cpp
@@ -11541,6 +11541,173 @@ Y_UNIT_TEST_SUITE(TPartitionTest)
         // checking that drain-related counters are in a consistent state
         partition.Drain();
     }
+
+    Y_UNIT_TEST(ShouldAutomaticallyRunCompactionToMixed)
+    {
+        static constexpr ui32 compactionThreshold = 4;
+
+        auto config = DefaultConfig();
+        config.SetSSDMaxBlobsPerRange(compactionThreshold);
+        config.SetHDDMaxBlobsPerRange(compactionThreshold);
+        config.SetCompactionBlobThreshold(17_KB);
+        config.SetCompactionBlobThresholdSSD(17_KB);
+
+        auto runtime = PrepareTestActorRuntime(config);
+
+        TPartitionClient partition(*runtime);
+        partition.WaitReady();
+
+        for (size_t i = 1; i < compactionThreshold; ++i) {
+            partition.WriteBlocks(i, i);
+            partition.Flush();
+        }
+
+        {
+            auto response = partition.StatPartition();
+            const auto& stats = response->Record.GetStats();
+            UNIT_ASSERT_VALUES_EQUAL(
+                compactionThreshold - 1,
+                stats.GetMixedBlobsCount());
+            UNIT_ASSERT_VALUES_EQUAL(
+                compactionThreshold - 1,
+                stats.GetMixedBlocksCount());
+            UNIT_ASSERT_VALUES_EQUAL(0, stats.GetMergedBlobsCount());
+        }
+
+        partition.WriteBlocks(0, 0);
+        partition.Flush();
+
+        // wait for background operations to complete
+        runtime->DispatchEvents(TDispatchOptions(), TDuration::Seconds(1));
+
+        {
+            auto response = partition.StatPartition();
+            const auto& stats = response->Record.GetStats();
+            UNIT_ASSERT_VALUES_EQUAL(
+                compactionThreshold + 1,
+                stats.GetMixedBlobsCount());
+            UNIT_ASSERT_VALUES_EQUAL(
+                compactionThreshold * 2,
+                stats.GetMixedBlocksCount());
+            UNIT_ASSERT_VALUES_EQUAL(0, stats.GetMergedBlobsCount());
+        }
+
+        partition.Cleanup();
+
+        {
+            auto response = partition.StatPartition();
+            const auto& stats = response->Record.GetStats();
+            UNIT_ASSERT_VALUES_EQUAL(1, stats.GetMixedBlobsCount());
+            UNIT_ASSERT_VALUES_EQUAL(
+                compactionThreshold,
+                stats.GetMixedBlocksCount());
+            UNIT_ASSERT_VALUES_EQUAL(0, stats.GetMergedBlobsCount());
+        }
+    }
+
+    Y_UNIT_TEST(CompactionShouldWriteToMixedInCaseThreshold)
+    {
+        auto config = DefaultConfig();
+        config.SetCompactionBlobThreshold(17_KB);
+        config.SetCompactionBlobThresholdSSD(17_KB);
+
+        auto runtime = PrepareTestActorRuntime(config);
+
+        TPartitionClient partition(*runtime);
+        partition.WaitReady();
+
+        partition.WriteBlocks(1, 1);
+        partition.Flush();
+        partition.WriteBlocks(2, 2);
+        partition.Flush();
+
+        {
+            auto response = partition.StatPartition();
+            const auto& stats = response->Record.GetStats();
+            UNIT_ASSERT_VALUES_EQUAL(2, stats.GetMixedBlobsCount());
+            UNIT_ASSERT_VALUES_EQUAL(0, stats.GetMergedBlobsCount());
+        }
+
+        partition.WriteBlocks(0, 0);
+
+        // compaction with data size below the threshold
+        partition.Compaction();
+
+        {
+            auto response = partition.StatPartition();
+            const auto& stats = response->Record.GetStats();
+            UNIT_ASSERT_VALUES_EQUAL(3, stats.GetMixedBlobsCount());
+            UNIT_ASSERT_VALUES_EQUAL(0, stats.GetMergedBlobsCount());
+        }
+
+        partition.WriteBlocks(TBlockRange32::WithLength(3, 5), 3);
+        partition.Flush();
+
+        // compaction with data size at or above the threshold
+        partition.Compaction();
+
+        {
+            auto response = partition.StatPartition();
+            const auto& stats = response->Record.GetStats();
+            UNIT_ASSERT_VALUES_EQUAL(4, stats.GetMixedBlobsCount());
+            UNIT_ASSERT_VALUES_EQUAL(1, stats.GetMergedBlobsCount());
+        }
+
+        partition.Cleanup();
+
+        {
+            auto response = partition.StatPartition();
+            const auto& stats = response->Record.GetStats();
+            UNIT_ASSERT_VALUES_EQUAL(0, stats.GetMixedBlobsCount());
+            UNIT_ASSERT_VALUES_EQUAL(1, stats.GetMergedBlobsCount());
+        }
+    }
+
+    Y_UNIT_TEST(CompactionShouldMoveDataFromMergedToMixedInCaseThreshold)
+    {
+        auto config = DefaultConfig();
+        config.SetWriteBlobThreshold(3_KB);
+        config.SetWriteBlobThresholdSSD(3_KB);
+        config.SetCompactionBlobThreshold(17_KB);
+        config.SetCompactionBlobThresholdSSD(17_KB);
+
+        auto runtime = PrepareTestActorRuntime(config);
+
+        TPartitionClient partition(*runtime);
+        partition.WaitReady();
+
+        for (int i = 0; i < 4; ++i) {
+            partition.WriteBlocks(i, i);
+        }
+
+        {
+            auto response = partition.StatPartition();
+            const auto& stats = response->Record.GetStats();
+            UNIT_ASSERT_VALUES_EQUAL(0, stats.GetFreshBlocksCount());
+            UNIT_ASSERT_VALUES_EQUAL(0, stats.GetMixedBlobsCount());
+            UNIT_ASSERT_VALUES_EQUAL(4, stats.GetMergedBlobsCount());
+        }
+
+        partition.Compaction();
+
+        {
+            auto response = partition.StatPartition();
+            const auto& stats = response->Record.GetStats();
+            UNIT_ASSERT_VALUES_EQUAL(0, stats.GetFreshBlocksCount());
+            UNIT_ASSERT_VALUES_EQUAL(1, stats.GetMixedBlobsCount());
+            UNIT_ASSERT_VALUES_EQUAL(4, stats.GetMergedBlobsCount());
+        }
+
+        partition.Cleanup();
+
+        {
+            auto response = partition.StatPartition();
+            const auto& stats = response->Record.GetStats();
+            UNIT_ASSERT_VALUES_EQUAL(0, stats.GetFreshBlocksCount());
+            UNIT_ASSERT_VALUES_EQUAL(1, stats.GetMixedBlobsCount());
+            UNIT_ASSERT_VALUES_EQUAL(0, stats.GetMergedBlobsCount());
+        }
+    }
 }
 
 }   // namespace NCloud::NBlockStore::NStorage::NPartition