Skip to content

Commit

Permalink
Auto Tuning: Set delayed write rate according to L0 clearance.
Browse files Browse the repository at this point in the history
Then decrease write rate with factor adjust.

Calculate the delayed write rate based on the total time L0 was in need of compaction
(L0 files above trigger) and the amount of L0 data moved to L1.
  • Loading branch information
Yuval-Ariel committed Feb 11, 2024
1 parent c328f0e commit 4ea59f7
Show file tree
Hide file tree
Showing 9 changed files with 157 additions and 13 deletions.
113 changes: 100 additions & 13 deletions db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

#include <algorithm>
#include <cinttypes>
#include <cmath>
#include <functional>
#include <limits>
#include <memory>
Expand Down Expand Up @@ -901,8 +902,9 @@ int GetL0ThresholdSpeedupCompaction(int level0_file_num_compaction_trigger,
} // anonymous namespace

namespace {
const int kMemtablePenalty = 10;
const int kMemtablePenalty = 5;
const int kNumPendingSteps = 100;
const double kGoalMbs = 5242880.0;
} // namespace

double ColumnFamilyData::TEST_CalculateWriteDelayDivider(
Expand All @@ -917,6 +919,7 @@ void ColumnFamilyData::DynamicSetupDelay(
uint64_t max_write_rate, uint64_t compaction_needed_bytes,
const MutableCFOptions& mutable_cf_options,
WriteStallCause& write_stall_cause) {
assert(max_write_rate > 0);
const double rate_divider =
CalculateWriteDelayDividerAndMaybeUpdateWriteStallCause(
compaction_needed_bytes, mutable_cf_options, write_stall_cause);
Expand Down Expand Up @@ -1009,6 +1012,7 @@ ColumnFamilyData::CalculateWriteDelayDividerAndMaybeUpdateWriteStallCause(
1 / (1 - (static_cast<double>(step_num) / kNumPendingSteps));
}

pending_divider = 1;
double biggest_divider = 1;
if (memtable_divider > pending_divider) {
biggest_divider = memtable_divider;
Expand All @@ -1024,16 +1028,59 @@ ColumnFamilyData::CalculateWriteDelayDividerAndMaybeUpdateWriteStallCause(
}

// L0 files
//
// L0 files increase steadily and then drop in a batch when the L0L1 / intra
// compaction ends. We'd like to avoid steadily decreasing the write speed
// along with the L0 accumulation and then increasing it heavily when the
// compaction ends.
// Another consideration is to never reach level0_stop_writes_trigger.
// The initial L0 delay write rate is set according to the speed L0 can
// clear data which is the amount of data moved from L0 to L1 divided by the
// time it was above compaction trigger.
// This is calculated and set when the L0L1 compaction ends in
// CompactionJob::Install.
// Use this rate for the first extra slowdown files and then heavily delay the
// rest of the files. In any case, delay at least 2
// * trigger.

// slowdown 12 ; stop 30; - start at 22
// slowdown 12 ; stop 50; - start at 24
// slowdown 12 ; stop 16; start at 12
auto slowdown = mutable_cf_options.level0_slowdown_writes_trigger;
auto stop = mutable_cf_options.level0_stop_writes_trigger;
auto trigger = mutable_cf_options.level0_file_num_compaction_trigger;

auto file_to_start_delay = slowdown;
auto gap = stop - slowdown;
if ((gap > (slowdown + 2 * trigger))) {
file_to_start_delay = slowdown * 2;
} else if (stop - (2 * trigger) > slowdown) {
file_to_start_delay = stop - (2 * trigger);
} else {
file_to_start_delay = slowdown;
}
// The goal is to reach kGoalMbs 3 steps before the stop condition so that we
// never reach it. The formula to decide the delay percentage is:
//   kGoalMbs = start_rate * (delay_percent ^ num_steps)
// where:
//   kGoalMbs   = 5 MB/s (5242880 bytes per second),
//   start_rate = l0_base_compaction_speed(),
//   num_steps  = num_L0_steps - 3 when num_L0_steps > 6, else num_L0_steps.
double l0_divider = 1;
const auto extra_l0_ssts = vstorage->l0_delay_trigger_count() -
mutable_cf_options.level0_slowdown_writes_trigger;
const auto extra_l0_ssts =
vstorage->l0_delay_trigger_count() - file_to_start_delay;
if (extra_l0_ssts > 0) {
const auto num_L0_steps = mutable_cf_options.level0_stop_writes_trigger -
mutable_cf_options.level0_slowdown_writes_trigger;
const auto num_L0_steps = stop - file_to_start_delay;
assert(num_L0_steps > 0);
double num_steps = num_L0_steps > 6 ? num_L0_steps - 3 : num_L0_steps;
auto base = kGoalMbs / l0_base_compaction_speed();
// in cases where l0_base_compaction_speed() is lower than our goal Mbs
if (base > 1) {
base = 0.95;
}
double delay_percent = std::pow(base, 1.0 / num_steps);
assert(delay_percent > 0 && delay_percent < 1);
// since extra_l0_ssts == num_L0_steps then we're in a stop condition.
assert(extra_l0_ssts < num_L0_steps);
l0_divider = 1 / (1 - (static_cast<double>(extra_l0_ssts) / num_L0_steps));

l0_divider = 1 / (std::pow(delay_percent, extra_l0_ssts));
}

if (l0_divider > biggest_divider) {
Expand All @@ -1044,6 +1091,35 @@ ColumnFamilyData::CalculateWriteDelayDividerAndMaybeUpdateWriteStallCause(
return biggest_divider;
}

// Updates the L0 base compaction speed (bytes per second) used for auto tuning
// the delayed write rate, given that `size` bytes were just cleared from L0.
//
// Does nothing unless the L0 clearance timer is running (started_l0_timer_).
// The speed of this clearance is `size` divided by the time elapsed since
// l0_start_clearance_time_. The first measurement is stored halved; later
// measurements are blended into the stored rate, with l0_rate_factor as the
// weight of the previous value. The timer is stopped and reset afterwards.
void ColumnFamilyData::SetL0BaseCompactionSpeed(uint64_t size) {
  if (!started_l0_timer_) {
    return;
  }
  assert(l0_start_clearance_time_ > 0);

  const uint64_t now_micros = ioptions_.clock->NowMicros();
  assert(now_micros > l0_start_clearance_time_);
  // Asserts vanish in release builds: treat a clock that did not advance as
  // one microsecond so the division below never divides by zero.
  const uint64_t elapsed_micros = now_micros > l0_start_clearance_time_
                                      ? now_micros - l0_start_clearance_time_
                                      : 1;
  // double, not float: a float's 24-bit mantissa cannot hold microsecond
  // durations (or byte counts) without losing precision.
  const double l0_clearance_dur = static_cast<double>(elapsed_micros);
  constexpr double kMicrosInSec = 1000000.0;
  const double cur_speed_exact =
      (static_cast<double>(size) / l0_clearance_dur) * kMicrosInSec;
  // Integral copy of the rate: it is logged below with a PRIu64 conversion,
  // so the argument must really be a uint64_t.
  const uint64_t cur_speed = static_cast<uint64_t>(cur_speed_exact);

  double new_speed = 0.0;
  if (first_l0_comp_) {
    first_l0_comp_ = false;
    // Since first compaction is more prone to variability in speed we'd
    // rather over-delay than under-delay
    new_speed = cur_speed_exact / 2;
  } else {
    const double prev_weight = mutable_cf_options_.l0_rate_factor;
    new_speed = prev_weight * static_cast<double>(lo_base_compaction_speed_) +
                (1 - prev_weight) * cur_speed_exact;
  }
  // The stored rate is later used as a divisor and as the max write rate
  // (which must be > 0), so never let it drop below 1 byte per second. The
  // negated comparison also catches NaN.
  lo_base_compaction_speed_ =
      !(new_speed >= 1.0) ? 1 : static_cast<uint64_t>(new_speed);

  started_l0_timer_ = false;
  l0_start_clearance_time_ = 0;
  ROCKS_LOG_INFO(ioptions_.logger,
                 "L0L1 compaction ended. duration : %f"
                 ", amount cleared: %" PRIu64 ", current rate: %" PRIu64
                 ", avg rate: %" PRIu64,
                 l0_clearance_dur, size, cur_speed,
                 lo_base_compaction_speed_);
}

WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
const MutableCFOptions& mutable_cf_options) {
auto write_stall_condition = WriteStallCondition::kNormal;
Expand All @@ -1064,15 +1140,26 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
bool needed_delay = write_controller->NeedsDelay();
bool dynamic_delay = write_controller->is_dynamic_delay();

// GetWriteStallConditionAndCause returns the first condition met, so its
// possible that a later condition will require a harder rate limiting.
// calculate all conditions with DynamicSetupDelay and reevaluate the
// write_stall_cause. this is only relevant in the kDelayed case.
if (dynamic_delay) {
// start timer for L0 clearance when trigger passed.
if (!started_l0_timer_ &&
vstorage->l0_delay_trigger_count() >=
mutable_cf_options.level0_file_num_compaction_trigger) {
started_l0_timer_ = true;
l0_start_clearance_time_ = ioptions_.clock->NowMicros();
ROCKS_LOG_INFO(
ioptions_.logger,
"Auto tune: Started timer. time: %" PRIu64 " Num L0 files: %d",
l0_start_clearance_time_, vstorage->l0_delay_trigger_count());
}

// GetWriteStallConditionAndCause returns the first condition met, so its
// possible that a later condition will require a harder rate limiting.
// calculate all conditions with DynamicSetupDelay and reevaluate the
// write_stall_cause. this is only relevant in the kDelayed case.
if (write_stall_condition == WriteStallCondition::kDelayed) {
DynamicSetupDelay(write_controller->max_delayed_write_rate(),
compaction_needed_bytes, mutable_cf_options,
write_stall_cause);
DynamicSetupDelay(l0_base_compaction_speed(), compaction_needed_bytes,
mutable_cf_options, write_stall_cause);
write_controller_token_.reset();
} else {
write_controller->HandleRemoveDelayReq(this);
Expand Down
21 changes: 21 additions & 0 deletions db/column_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,28 @@ class ColumnFamilyData {

VersionStorageInfo* TEST_GetCurrentStorageInfo();

// set in CompactionJob::Install (DB mutex held)
// also read and set in ColumnFamilyData::AutoTuneMaxRate
void SetL0BaseCompactionSpeed(uint64_t size);

private:
// Returns the stored L0 base compaction speed (lo_base_compaction_speed_).
// In bytes per sec. same as delayed_write_rate
uint64_t l0_base_compaction_speed() const {
return lo_base_compaction_speed_;
}

uint64_t l0_start_clearance_time_ = 0;

// set to false whenever L0L1 compaction ends or L0 files
// reached below compaction trigger
bool started_l0_timer_ = false;
bool first_l0_comp_ = true;

// Used for Speedb delay write rate auto tuning;
// Init with rate of 200Mb to help with delay until the first L0L1 compaction
// finishes.
uint64_t lo_base_compaction_speed_ = 209715200;

void UpdateCFRate(void* client_id, uint64_t write_rate);
void ResetCFRate(void* client_id);

Expand Down
9 changes: 9 additions & 0 deletions db/compaction/compaction_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -888,6 +888,15 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) {
auto vstorage = cfd->current()->storage_info();
const auto& stats = compaction_stats_.stats;

// Set the speed of L0L1 compaction for auto tuning delayed write rate.
if (output_level == vstorage->base_level() &&
compact_->compaction->start_level() == 0) {
if (compact_->compaction->immutable_options()->compaction_style ==
kCompactionStyleLevel) {
cfd->SetL0BaseCompactionSpeed(stats.bytes_read_non_output_levels);
}
}

double read_write_amp = 0.0;
double write_amp = 0.0;
double bytes_read_per_sec = 0;
Expand Down
4 changes: 4 additions & 0 deletions include/rocksdb/advanced_options.h
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,10 @@ struct AdvancedColumnFamilyOptions {
// Default: false.
bool inplace_update_support = false;

// TODO: add description
// TODO: fix to include other cfs and WBM
double l0_rate_factor = 0.7;

// Number of locks used for inplace update
// Default: 10000, if inplace_update_support = true, else 0.
//
Expand Down
6 changes: 6 additions & 0 deletions options/cf_options.cc
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,10 @@ static std::unordered_map<std::string, OptionTypeInfo>
offsetof(struct MutableCFOptions, compression_per_level),
OptionVerificationType::kNormal, OptionTypeFlags::kMutable,
{0, OptionType::kCompressionType})},
{"l0_rate_factor",
{offsetof(struct MutableCFOptions, l0_rate_factor),
OptionType::kDouble, OptionVerificationType::kNormal,
OptionTypeFlags::kMutable}},
{"experimental_mempurge_threshold",
{offsetof(struct MutableCFOptions, experimental_mempurge_threshold),
OptionType::kDouble, OptionVerificationType::kNormal,
Expand Down Expand Up @@ -1140,6 +1144,8 @@ void MutableCFOptions::Dump(Logger* log) const {
report_bg_io_stats);
ROCKS_LOG_INFO(log, " compression: %d",
static_cast<int>(compression));
ROCKS_LOG_INFO(log, " l0_rate_factor: %f",
l0_rate_factor);
ROCKS_LOG_INFO(log,
" experimental_mempurge_threshold: %f",
experimental_mempurge_threshold);
Expand Down
3 changes: 3 additions & 0 deletions options/cf_options.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ struct MutableCFOptions {
max_successive_merges(options.max_successive_merges),
inplace_update_num_locks(options.inplace_update_num_locks),
prefix_extractor(options.prefix_extractor),
l0_rate_factor(options.l0_rate_factor),
experimental_mempurge_threshold(
options.experimental_mempurge_threshold),
disable_auto_compactions(options.disable_auto_compactions),
Expand Down Expand Up @@ -208,6 +209,7 @@ struct MutableCFOptions {
max_successive_merges(0),
inplace_update_num_locks(0),
prefix_extractor(nullptr),
l0_rate_factor(0.0),
experimental_mempurge_threshold(0.0),
disable_auto_compactions(false),
soft_pending_compaction_bytes_limit(0),
Expand Down Expand Up @@ -276,6 +278,7 @@ struct MutableCFOptions {
size_t inplace_update_num_locks;

std::shared_ptr<const SliceTransform> prefix_extractor;
double l0_rate_factor;
// [experimental]
// Used to activate or deactive the Mempurge feature (memtable garbage
// collection). (deactivated by default). At every flush, the total useful
Expand Down
3 changes: 3 additions & 0 deletions options/options.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ AdvancedColumnFamilyOptions::AdvancedColumnFamilyOptions(const Options& options)
max_write_buffer_size_to_maintain(
options.max_write_buffer_size_to_maintain),
inplace_update_support(options.inplace_update_support),
l0_rate_factor(options.l0_rate_factor),
inplace_update_num_locks(options.inplace_update_num_locks),
experimental_mempurge_threshold(options.experimental_mempurge_threshold),
inplace_callback(options.inplace_callback),
Expand Down Expand Up @@ -478,6 +479,8 @@ void ColumnFamilyOptions::Dump(Logger* log) const {
? "flush only"
: "disabled");
}
ROCKS_LOG_HEADER(log, " Options.l0_rate_factor: %f",
l0_rate_factor);
ROCKS_LOG_HEADER(log, " Options.experimental_mempurge_threshold: %f",
experimental_mempurge_threshold);
ROCKS_LOG_HEADER(log, " Options.memtable_max_range_deletions: %d",
Expand Down
1 change: 1 addition & 0 deletions options/options_helper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ void UpdateColumnFamilyOptions(const MutableCFOptions& moptions,
cf_opts->max_successive_merges = moptions.max_successive_merges;
cf_opts->inplace_update_num_locks = moptions.inplace_update_num_locks;
cf_opts->prefix_extractor = moptions.prefix_extractor;
cf_opts->l0_rate_factor = moptions.l0_rate_factor;
cf_opts->experimental_mempurge_threshold =
moptions.experimental_mempurge_threshold;
cf_opts->memtable_protection_bytes_per_key =
Expand Down
10 changes: 10 additions & 0 deletions tools/db_bench_tool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1645,6 +1645,12 @@ DEFINE_double(experimental_mempurge_threshold,
ROCKSDB_NAMESPACE::Options().experimental_mempurge_threshold,
"Maximum useful payload ratio estimate that triggers a mempurge "
"(memtable garbage collection).");

DEFINE_double(l0_rate_factor, ROCKSDB_NAMESPACE::Options().l0_rate_factor,
"A number between 0 and 1 to decide the weight of the previous "
"l0 rate in calculating the current rate "
"e.g. 0.7 is 0.7 * prev_rate + 0.3 * current rate");

DEFINE_bool(use_spdb_writes, false, "Use optimized Speedb write flow");

DEFINE_bool(inplace_update_support,
Expand Down Expand Up @@ -4975,6 +4981,10 @@ class Benchmark {
options.delayed_write_rate = FLAGS_delayed_write_rate;
options.allow_concurrent_memtable_write =
FLAGS_allow_concurrent_memtable_write;
if (FLAGS_l0_rate_factor > 1 || FLAGS_l0_rate_factor < 0) {
ErrorExit("l0_rate_factor should be between 0 and 1");
}
options.l0_rate_factor = FLAGS_l0_rate_factor;
options.experimental_mempurge_threshold =
FLAGS_experimental_mempurge_threshold;
options.use_spdb_writes = FLAGS_use_spdb_writes;
Expand Down

0 comments on commit 4ea59f7

Please sign in to comment.