Skip to content

Commit

Permalink
Add new parameters CompactionBlobThreshold and CompactionBlobThreshol…
Browse files Browse the repository at this point in the history
…dSSD.

It is thresholds in bytes, if data size less threshold it will be written
 to mixed channel and to merged channel in other case.
  • Loading branch information
agalibin committed Feb 5, 2025
1 parent 2a57ef4 commit 0a68bf1
Show file tree
Hide file tree
Showing 7 changed files with 269 additions and 48 deletions.
7 changes: 7 additions & 0 deletions cloud/blockstore/config/storage.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1113,4 +1113,11 @@ message TStorageServiceConfig

// Timeout for TDestroyVolumeActor (in milliseconds)
optional uint32 DestroyVolumeTimeout = 405;

// Minimum compaction data size (in bytes) that lets us write the data to
// blobstorage (as a merged blob) else we will write it to mixed channel.
optional uint32 CompactionBlobThreshold = 406;

// Overrides CompactionBlobThreshold for SSD volumes.
optional uint32 CompactionBlobThresholdSSD = 407;
}
2 changes: 2 additions & 0 deletions cloud/blockstore/libs/storage/core/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,8 @@ TDuration MSeconds(ui32 value)
xxx(CalculateSplittedUsedQuotaMetric, bool, false )\
\
xxx(DestroyVolumeTimeout, TDuration, Seconds(30) )\
xxx(CompactionBlobThreshold, ui32, 1 )\
xxx(CompactionBlobThresholdSSD, ui32, 1 )\
// BLOCKSTORE_STORAGE_CONFIG_RW

#define BLOCKSTORE_STORAGE_CONFIG(xxx) \
Expand Down
3 changes: 3 additions & 0 deletions cloud/blockstore/libs/storage/core/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -635,6 +635,9 @@ class TStorageConfig
bool GetAutomaticallyEnableBufferCopyingAfterChecksumMismatch() const;
[[nodiscard]] bool GetNonReplicatedVolumeDirectAcquireEnabled() const;
[[nodiscard]] TDuration GetDestroyVolumeTimeout() const;

ui32 GetCompactionBlobThreshold() const;
ui32 GetCompactionBlobThresholdSSD() const;
};

ui64 GetAllocationUnit(
Expand Down
11 changes: 11 additions & 0 deletions cloud/blockstore/libs/storage/core/proto_helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,17 @@ ui32 GetWriteBlobThreshold(
return config.GetWriteBlobThreshold();
}

ui32 GetCompactionBlobThreshold(
const TStorageConfig& config,
const NCloud::NProto::EStorageMediaKind mediaKind)
{
if (mediaKind == NCloud::NProto::STORAGE_MEDIA_SSD) {
return config.GetCompactionBlobThresholdSSD();
}

return config.GetCompactionBlobThreshold();
}

bool CompareVolumeConfigs(
const NKikimrBlockStore::TVolumeConfig& prevConfig,
const NKikimrBlockStore::TVolumeConfig& newConfig)
Expand Down
4 changes: 4 additions & 0 deletions cloud/blockstore/libs/storage/core/proto_helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,10 @@ ui32 GetWriteBlobThreshold(
const TStorageConfig& config,
const NCloud::NProto::EStorageMediaKind mediaKind);

ui32 GetCompactionBlobThreshold(
const TStorageConfig& config,
const NCloud::NProto::EStorageMediaKind mediaKind);

inline bool RequiresCheckpointSupport(const NProto::TReadBlocksRequest& request)
{
return !request.GetCheckpointId().empty();
Expand Down
123 changes: 75 additions & 48 deletions cloud/blockstore/libs/storage/partition/part_actor_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <cloud/blockstore/libs/service/request_helpers.h>
#include <cloud/blockstore/libs/storage/core/config.h>
#include <cloud/blockstore/libs/storage/core/probes.h>
#include <cloud/blockstore/libs/storage/core/proto_helpers.h>

#include <cloud/storage/core/libs/common/alloc.h>
#include <cloud/storage/core/libs/common/block_buffer.h>
Expand Down Expand Up @@ -40,6 +41,7 @@ struct TRangeCompactionInfo
const ui32 BlobsSkippedByCompaction;
const ui32 BlocksSkippedByCompaction;
const TVector<ui32> BlockChecksums;
const EChannelDataKind ChannelDataKind;

TGuardedBuffer<TBlockBuffer> BlobContent;
TVector<ui32> ZeroBlocks;
Expand All @@ -59,6 +61,7 @@ struct TRangeCompactionInfo
ui32 blobsSkippedByCompaction,
ui32 blocksSkippedByCompaction,
TVector<ui32> blockChecksums,
EChannelDataKind channelDataKind,
TBlockBuffer blobContent,
TVector<ui32> zeroBlocks,
TAffectedBlobs affectedBlobs,
Expand All @@ -72,6 +75,7 @@ struct TRangeCompactionInfo
, BlobsSkippedByCompaction(blobsSkippedByCompaction)
, BlocksSkippedByCompaction(blocksSkippedByCompaction)
, BlockChecksums(std::move(blockChecksums))
, ChannelDataKind(channelDataKind)
, BlobContent(std::move(blobContent))
, ZeroBlocks(std::move(zeroBlocks))
, AffectedBlobs(std::move(affectedBlobs))
Expand Down Expand Up @@ -549,6 +553,7 @@ void TCompactionActor::WriteBlobs(const TActorContext& ctx)

void TCompactionActor::AddBlobs(const TActorContext& ctx)
{
TVector<TAddMixedBlob> mixedBlobs;
TVector<TAddMergedBlob> mergedBlobs;
TVector<TMergedBlobCompactionInfo> blobCompactionInfos;
TAffectedBlobs affectedBlobs;
Expand All @@ -560,7 +565,8 @@ void TCompactionActor::AddBlobs(const TActorContext& ctx)
TBlockMask skipMask,
const TVector<ui32>& blockChecksums,
ui32 blobsSkipped,
ui32 blocksSkipped)
ui32 blocksSkipped,
EChannelDataKind channelDataKind)
{
while (skipMask.Get(range.End - range.Start)) {
Y_ABORT_UNLESS(range.End > range.Start);
Expand All @@ -571,9 +577,25 @@ void TCompactionActor::AddBlobs(const TActorContext& ctx)
--range.End;
}

mergedBlobs.emplace_back(blobId, range, skipMask, blockChecksums);

blobCompactionInfos.push_back({blobsSkipped, blocksSkipped});
if (channelDataKind == EChannelDataKind::Merged) {
mergedBlobs.emplace_back(blobId, range, skipMask, blockChecksums);
blobCompactionInfos.push_back({blobsSkipped, blocksSkipped});
} else if (channelDataKind == EChannelDataKind::Mixed) {
TVector<ui32> blockIndecies(Reserve(range.Size()));
for (auto blockIndex = range.Start; blockIndex <= range.End;
++blockIndex)
{
blockIndecies.emplace_back(blockIndex);
}
mixedBlobs.emplace_back(blobId, blockIndecies, blockChecksums);
} else {
LOG_ERROR(
ctx,
TBlockStoreComponents::PARTITION,
"[%lu] unexpected channel data kind %u",
TabletId,
static_cast<int>(channelDataKind));
}
};

for (auto& rc: RangeCompactionInfos) {
Expand All @@ -584,7 +606,8 @@ void TCompactionActor::AddBlobs(const TActorContext& ctx)
rc.DataBlobSkipMask,
rc.BlockChecksums,
rc.BlobsSkippedByCompaction,
rc.BlocksSkippedByCompaction);
rc.BlocksSkippedByCompaction,
rc.ChannelDataKind);
}

if (rc.ZeroBlobId) {
Expand All @@ -602,7 +625,8 @@ void TCompactionActor::AddBlobs(const TActorContext& ctx)
rc.ZeroBlobSkipMask,
rc.BlockChecksums,
blobsSkipped,
blocksSkipped);
blocksSkipped,
rc.ChannelDataKind);
}

if (rc.DataBlobId && rc.ZeroBlobId) {
Expand Down Expand Up @@ -664,7 +688,7 @@ void TCompactionActor::AddBlobs(const TActorContext& ctx)
auto request = std::make_unique<TEvPartitionPrivate::TEvAddBlobsRequest>(
RequestInfo->CallContext,
CommitId,
TVector<TAddMixedBlob>(),
std::move(mixedBlobs),
std::move(mergedBlobs),
TVector<TAddFreshBlob>(),
ADD_COMPACTION_RESULT,
Expand Down Expand Up @@ -1481,9 +1505,7 @@ namespace {

void PrepareRangeCompaction(
const TStorageConfig& config,
const TString& cloudId,
const TString& folderId,
const TString& diskId,
const bool incrementalCompactionEnabled,
const ui64 commitId,
const bool fullCompaction,
const TActorContext& ctx,
Expand All @@ -1494,19 +1516,10 @@ void PrepareRangeCompaction(
TPartitionState& state,
TTxPartition::TRangeCompaction& args)
{
const bool incrementalCompactionEnabledForCloud =
config.IsIncrementalCompactionFeatureEnabled(cloudId, folderId, diskId);
const bool incrementalCompactionEnabled =
config.GetIncrementalCompactionEnabled()
|| incrementalCompactionEnabledForCloud;

TCompactionBlockVisitor visitor(args, commitId);
state.FindFreshBlocks(visitor, args.BlockRange, commitId);
visitor.KeepTrackOfAffectedBlocks = true;
ready &= state.FindMixedBlocksForCompaction(
db,
visitor,
args.RangeIdx);
ready &= state.FindMixedBlocksForCompaction(db, visitor, args.RangeIdx);
visitor.KeepTrackOfAffectedBlocks = false;
ready &= db.FindMergedBlocks(
visitor,
Expand All @@ -1528,10 +1541,7 @@ void PrepareRangeCompaction(
}
}

if (ready
&& incrementalCompactionEnabled
&& !fullCompaction)
{
if (ready && incrementalCompactionEnabled && !fullCompaction) {
THashMap<TPartialBlobId, ui32, TPartialBlobIdHash> liveBlocks;
for (const auto& m: args.BlockMarks) {
if (m.CommitId && m.BlobId) {
Expand All @@ -1546,21 +1556,19 @@ void PrepareRangeCompaction(
}

Sort(
blobIds.begin(),
blobIds.end(),
[&] (const TPartialBlobId& l, const TPartialBlobId& r) {
return liveBlocks[l] < liveBlocks[r];
}
);
blobIds,
[&](const TPartialBlobId& l, const TPartialBlobId& r)
{ return liveBlocks[l] < liveBlocks[r]; });

auto it = blobIds.begin();
args.BlobsSkipped = blobIds.size();
ui32 blocks = 0;

while (it != blobIds.end()) {
const auto bytes = blocks * state.GetBlockSize();
const auto blobCountOk = args.BlobsSkipped
<= config.GetMaxSkippedBlobsDuringCompaction();
const auto blobCountOk =
args.BlobsSkipped <=
config.GetMaxSkippedBlobsDuringCompaction();
const auto byteCountOk =
bytes >= config.GetTargetCompactionBytesPerOp();

Expand Down Expand Up @@ -1608,6 +1616,7 @@ void PrepareRangeCompaction(
}

if (liveBlocks.size()) {
// TODO: need make UTs
TAffectedBlocks affectedBlocks;
for (const auto& b: args.AffectedBlocks) {
if (!skippedBlockIndices.contains(b.BlockIndex)) {
Expand Down Expand Up @@ -1652,6 +1661,7 @@ void PrepareRangeCompaction(

void CompleteRangeCompaction(
const bool blobPatchingEnabled,
const ui32 compactionThreshold,
const ui64 commitId,
TTabletStorageInfo& tabletStorageInfo,
TPartitionState& state,
Expand All @@ -1668,7 +1678,7 @@ void CompleteRangeCompaction(
size_t dataBlocksCount = 0, zeroBlocksCount = 0;
for (const auto& mark: args.BlockMarks) {
if (mark.CommitId) {
// there could be fresh block OR merged/mixed block
// there could be fresh block (BlockContent not empty) OR merged/mixed block (deletion mark is not set)
Y_ABORT_UNLESS(!(mark.BlockContent && !IsDeletionMarker(mark.BlobId)));
if (mark.BlockContent || !IsDeletionMarker(mark.BlobId)) {
++dataBlocksCount;
Expand All @@ -1682,6 +1692,7 @@ void CompleteRangeCompaction(
TPartialBlobId dataBlobId, zeroBlobId;
TBlockMask dataBlobSkipMask, zeroBlobSkipMask;

auto channelDataKind = EChannelDataKind::Merged;
if (dataBlocksCount) {
ui32 skipped = 0;
for (const auto& mark: args.BlockMarks) {
Expand All @@ -1690,11 +1701,15 @@ void CompleteRangeCompaction(
}
}

const auto blobSize = (args.BlockRange.Size() - skipped) * state.GetBlockSize();
if (blobSize < compactionThreshold) {
channelDataKind = EChannelDataKind::Mixed;
}
dataBlobId = state.GenerateBlobId(
EChannelDataKind::Merged,
channelDataKind,
compactionPermissions,
commitId,
(args.BlockRange.Size() - skipped) * state.GetBlockSize(),
blobSize,
result.size());
}

Expand All @@ -1706,7 +1721,7 @@ void CompleteRangeCompaction(
// compaction range but for the last actual block that's referenced by
// the corresponding blob
zeroBlobId = state.GenerateBlobId(
EChannelDataKind::Merged,
channelDataKind,
compactionPermissions,
commitId,
0,
Expand Down Expand Up @@ -1789,14 +1804,15 @@ void CompleteRangeCompaction(
zeroBlobSkipMask.Set(blockIndex - args.BlockRange.Start);
}

if (!patchingCandidate
&& blobPatchingEnabled
&& mark.BlobId.BlobSize() == dataBlobId.BlobSize())
{
patchingCandidate = mark.BlobId;
++patchingCandidateChangedBlockCount;
} else if (patchingCandidate == mark.BlobId) {
++patchingCandidateChangedBlockCount;
if (blobPatchingEnabled) {
if (!patchingCandidate &&
mark.BlobId.BlobSize() == dataBlobId.BlobSize())
{
patchingCandidate = mark.BlobId;
++patchingCandidateChangedBlockCount;
} else if (patchingCandidate == mark.BlobId) {
++patchingCandidateChangedBlockCount;
}
}
} else {
dataBlobSkipMask.Set(blockIndex - args.BlockRange.Start);
Expand Down Expand Up @@ -1877,6 +1893,7 @@ void CompleteRangeCompaction(
args.BlobsSkipped,
args.BlocksSkipped,
std::move(blockChecksums),
channelDataKind,
std::move(blobContent),
std::move(zeroBlocks),
std::move(args.AffectedBlobs),
Expand All @@ -1901,18 +1918,25 @@ bool TPartitionActor::PrepareCompaction(
TRequestScope timer(*args.RequestInfo);
TPartitionDatabase db(tx.DB);

const bool incrementalCompactionEnabled =
Config->GetIncrementalCompactionEnabled() ||
Config->IsIncrementalCompactionFeatureEnabled(
PartitionConfig.GetCloudId(),
PartitionConfig.GetFolderId(),
PartitionConfig.GetDiskId());
const bool fullCompaction =
args.CompactionOptions.test(ToBit(ECompactionOption::Full));

bool ready = true;

THashSet<TPartialBlobId, TPartialBlobIdHash> affectedBlobIds;

for (auto& rangeCompaction: args.RangeCompactions) {
PrepareRangeCompaction(
*Config,
PartitionConfig.GetCloudId(),
PartitionConfig.GetFolderId(),
PartitionConfig.GetDiskId(),
incrementalCompactionEnabled,
args.CommitId,
args.CompactionOptions.test(ToBit(ECompactionOption::Full)),
fullCompaction,
ctx,
TabletID(),
affectedBlobIds,
Expand Down Expand Up @@ -1967,6 +1991,9 @@ void TPartitionActor::CompleteCompaction(

CompleteRangeCompaction(
blobPatchingEnabled,
GetCompactionBlobThreshold(
*Config,
PartitionConfig.GetStorageMediaKind()),
args.CommitId,
*Info(),
*State,
Expand Down
Loading

0 comments on commit 0a68bf1

Please sign in to comment.