From 4ea59f723865fdf17217b30192957235658618be Mon Sep 17 00:00:00 2001 From: Yuval Ariel Date: Sun, 11 Feb 2024 11:57:09 +0200 Subject: [PATCH] Auto Tuning: Set delayed write rate according to L0 clearance. Then decrease write rate with factor adjust. Calculate the delayed write rate based on the total time L0 was in need to compaction (L0 files above trigger) and the amount of L0 data moved to L1. --- db/column_family.cc | 113 +++++++++++++++++++++++++---- db/column_family.h | 21 ++++++ db/compaction/compaction_job.cc | 9 +++ include/rocksdb/advanced_options.h | 4 + options/cf_options.cc | 6 ++ options/cf_options.h | 3 + options/options.cc | 3 + options/options_helper.cc | 1 + tools/db_bench_tool.cc | 10 +++ 9 files changed, 157 insertions(+), 13 deletions(-) diff --git a/db/column_family.cc b/db/column_family.cc index 7be55f5569..199ff3b043 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -25,6 +25,7 @@ #include #include +#include #include #include #include @@ -901,8 +902,9 @@ int GetL0ThresholdSpeedupCompaction(int level0_file_num_compaction_trigger, } // anonymous namespace namespace { -const int kMemtablePenalty = 10; +const int kMemtablePenalty = 5; const int kNumPendingSteps = 100; +const double kGoalMbs = 5242880.0; } // namespace double ColumnFamilyData::TEST_CalculateWriteDelayDivider( @@ -917,6 +919,7 @@ void ColumnFamilyData::DynamicSetupDelay( uint64_t max_write_rate, uint64_t compaction_needed_bytes, const MutableCFOptions& mutable_cf_options, WriteStallCause& write_stall_cause) { + assert(max_write_rate > 0); const double rate_divider = CalculateWriteDelayDividerAndMaybeUpdateWriteStallCause( compaction_needed_bytes, mutable_cf_options, write_stall_cause); @@ -1009,6 +1012,7 @@ ColumnFamilyData::CalculateWriteDelayDividerAndMaybeUpdateWriteStallCause( 1 / (1 - (static_cast(step_num) / kNumPendingSteps)); } + pending_divider = 1; double biggest_divider = 1; if (memtable_divider > pending_divider) { biggest_divider = memtable_divider; @@ -1024,16 +1028,59 @@ ColumnFamilyData::CalculateWriteDelayDividerAndMaybeUpdateWriteStallCause( } // L0 files + // + // L0 files increase steadily and then drop in a batch when the L0L1 / intra + // compaction ends. We'd like to avoid steadily decreasing the write speed + // along with the L0 accumulation and then increasing it heavily when the + // compaction ends. + // Another consideration is to never reach level0_stop_writes_trigger. + // The initial L0 delay write rate is set according to the speed L0 can + // clear data which is the amount of data moved from L0 to L1 divided by the + // time it was above compaction trigger. + // This is calculated and set when the L0L1 compaction ends in + // CompactionJob::Install. + // Use this rate for the first extra slowdown files and then heavily delay the + // rest of the files. In any case, delay at least 2 + // * trigger. + + // slowdown 12 ; stop 30; - start at 22 + // slowdown 12 ; stop 50; - start at 24 + // slowdown 12 ; stop 16; start at 12 + auto slowdown = mutable_cf_options.level0_slowdown_writes_trigger; + auto stop = mutable_cf_options.level0_stop_writes_trigger; + auto trigger = mutable_cf_options.level0_file_num_compaction_trigger; + + auto file_to_start_delay = slowdown; + auto gap = stop - slowdown; + if ((gap > (slowdown + 2 * trigger))) { + file_to_start_delay = slowdown * 2; + } else if (stop - (2 * trigger) > slowdown) { + file_to_start_delay = stop - (2 * trigger); + } else { + file_to_start_delay = slowdown; + } + // The goal is to reach kGoalMbs 3 steps before stop condition so that we + // never reach it. The formula to decide the delay percentage is: kGoalMbs = + // start_rate * (delay_percent ^ num_steps) where: kGoalMbs = 5 , start_rate + // is the l0_compaction_speed num_steps = num_L0_steps - 3. double l0_divider = 1; - const auto extra_l0_ssts = vstorage->l0_delay_trigger_count() - - mutable_cf_options.level0_slowdown_writes_trigger; + const auto extra_l0_ssts = + vstorage->l0_delay_trigger_count() - file_to_start_delay; if (extra_l0_ssts > 0) { - const auto num_L0_steps = mutable_cf_options.level0_stop_writes_trigger - - mutable_cf_options.level0_slowdown_writes_trigger; + const auto num_L0_steps = stop - file_to_start_delay; assert(num_L0_steps > 0); + double num_steps = num_L0_steps > 6 ? num_L0_steps - 3 : num_L0_steps; + auto base = kGoalMbs / l0_base_compaction_speed(); + // in cases where l0_base_compaction_speed() is lower than our goal Mbs + if (base > 1) { + base = 0.95; + } + double delay_percent = std::pow(base, 1.0 / num_steps); + assert(delay_percent > 0 && delay_percent < 1); // since extra_l0_ssts == num_L0_steps then we're in a stop condition. assert(extra_l0_ssts < num_L0_steps); - l0_divider = 1 / (1 - (static_cast(extra_l0_ssts) / num_L0_steps)); + + l0_divider = 1 / (std::pow(delay_percent, extra_l0_ssts)); } if (l0_divider > biggest_divider) { @@ -1044,6 +1091,35 @@ ColumnFamilyData::CalculateWriteDelayDividerAndMaybeUpdateWriteStallCause( return biggest_divider; } +void ColumnFamilyData::SetL0BaseCompactionSpeed(uint64_t size) { + if (started_l0_timer_ == true) { + assert(l0_start_clearance_time_ > 0); + float l0_clearance_dur = + ioptions_.clock->NowMicros() - l0_start_clearance_time_; + assert(l0_clearance_dur > 0); + const int micros_in_sec = 1000000; + auto cur_speed = (size / l0_clearance_dur) * micros_in_sec; + if (first_l0_comp_) { + first_l0_comp_ = false; + // Since first compaction is more prone to variability in speed we'd + // rather over-delay than under-delay + lo_base_compaction_speed_ = cur_speed / 2; + } else { + lo_base_compaction_speed_ = + mutable_cf_options_.l0_rate_factor * lo_base_compaction_speed_ + + ((1 - mutable_cf_options_.l0_rate_factor) * cur_speed); + } + started_l0_timer_ = false; + l0_start_clearance_time_ = 0; + ROCKS_LOG_INFO(ioptions_.logger, + "L0L1 compaction ended. duration : %f" + ", amount cleared: %" PRIu64 ", current rate: %" PRIu64 + ", avg rate: %" PRIu64, + l0_clearance_dur, size, cur_speed, + lo_base_compaction_speed_); + } +} + WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions( const MutableCFOptions& mutable_cf_options) { auto write_stall_condition = WriteStallCondition::kNormal; @@ -1064,15 +1140,26 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions( bool needed_delay = write_controller->NeedsDelay(); bool dynamic_delay = write_controller->is_dynamic_delay(); - // GetWriteStallConditionAndCause returns the first condition met, so its - // possible that a later condition will require a harder rate limiting. - // calculate all conditions with DynamicSetupDelay and reevaluate the - // write_stall_cause. this is only relevant in the kDelayed case. if (dynamic_delay) { + // start timer for L0 clearance when trigger passed. + if (!started_l0_timer_ && + vstorage->l0_delay_trigger_count() >= + mutable_cf_options.level0_file_num_compaction_trigger) { + started_l0_timer_ = true; + l0_start_clearance_time_ = ioptions_.clock->NowMicros(); + ROCKS_LOG_INFO( + ioptions_.logger, + "Auto tune: Started timer. time: %" PRIu64 " Num L0 files: %d", + l0_start_clearance_time_, vstorage->l0_delay_trigger_count()); + } + + // GetWriteStallConditionAndCause returns the first condition met, so its + // possible that a later condition will require a harder rate limiting. + // calculate all conditions with DynamicSetupDelay and reevaluate the + // write_stall_cause. this is only relevant in the kDelayed case. if (write_stall_condition == WriteStallCondition::kDelayed) { - DynamicSetupDelay(write_controller->max_delayed_write_rate(), - compaction_needed_bytes, mutable_cf_options, - write_stall_cause); + DynamicSetupDelay(l0_base_compaction_speed(), compaction_needed_bytes, + mutable_cf_options, write_stall_cause); write_controller_token_.reset(); } else { write_controller->HandleRemoveDelayReq(this); diff --git a/db/column_family.h b/db/column_family.h index 91aa2c09e0..e84cbd3010 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -507,7 +507,28 @@ class ColumnFamilyData { VersionStorageInfo* TEST_GetCurrentStorageInfo(); + // set in CompactionJob::Install (DB mutex held) + // also read and set in ColumnFamilyData::AutoTuneMaxRate + void SetL0BaseCompactionSpeed(uint64_t size); + private: + // In bytes per sec. same as delayed_write_rate + uint64_t l0_base_compaction_speed() const { + return lo_base_compaction_speed_; + } + + uint64_t l0_start_clearance_time_ = 0; + + // set to false whenever L0L1 compaction ends or L0 files + // reached below compaction trigger + bool started_l0_timer_ = false; + bool first_l0_comp_ = true; + + // Used for Speedb delay write rate auto tuning; + // Init with rate of 200Mb to help with delay until the first L0L1 compaction + // finishes. + uint64_t lo_base_compaction_speed_ = 209715200; + void UpdateCFRate(void* client_id, uint64_t write_rate); void ResetCFRate(void* client_id); diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc index e9a2b969d2..f9587d9026 100644 --- a/db/compaction/compaction_job.cc +++ b/db/compaction/compaction_job.cc @@ -888,6 +888,15 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) { auto vstorage = cfd->current()->storage_info(); const auto& stats = compaction_stats_.stats; + // Set the speed of L0L1 compaction for auto tuning delayed write rate. + if (output_level == vstorage->base_level() && + compact_->compaction->start_level() == 0) { + if (compact_->compaction->immutable_options()->compaction_style == + kCompactionStyleLevel) { + cfd->SetL0BaseCompactionSpeed(stats.bytes_read_non_output_levels); + } + } + double read_write_amp = 0.0; double write_amp = 0.0; double bytes_read_per_sec = 0; diff --git a/include/rocksdb/advanced_options.h b/include/rocksdb/advanced_options.h index 6542ca42a9..ce694e8309 100644 --- a/include/rocksdb/advanced_options.h +++ b/include/rocksdb/advanced_options.h @@ -392,6 +392,10 @@ struct AdvancedColumnFamilyOptions { // Default: false. bool inplace_update_support = false; + // TODO: add description + // TODO: fix to include other cfs and WBM + double l0_rate_factor = 0.7; + // Number of locks used for inplace update // Default: 10000, if inplace_update_support = true, else 0. // diff --git a/options/cf_options.cc b/options/cf_options.cc index 86a524f515..05518d2183 100644 --- a/options/cf_options.cc +++ b/options/cf_options.cc @@ -516,6 +516,10 @@ static std::unordered_map offsetof(struct MutableCFOptions, compression_per_level), OptionVerificationType::kNormal, OptionTypeFlags::kMutable, {0, OptionType::kCompressionType})}, + {"l0_rate_factor", + {offsetof(struct MutableCFOptions, l0_rate_factor), + OptionType::kDouble, OptionVerificationType::kNormal, + OptionTypeFlags::kMutable}}, {"experimental_mempurge_threshold", {offsetof(struct MutableCFOptions, experimental_mempurge_threshold), OptionType::kDouble, OptionVerificationType::kNormal, @@ -1140,6 +1144,8 @@ void MutableCFOptions::Dump(Logger* log) const { report_bg_io_stats); ROCKS_LOG_INFO(log, " compression: %d", static_cast(compression)); + ROCKS_LOG_INFO(log, " l0_rate_factor: %f", + l0_rate_factor); ROCKS_LOG_INFO(log, " experimental_mempurge_threshold: %f", experimental_mempurge_threshold); diff --git a/options/cf_options.h b/options/cf_options.h index ca6511c4a9..deb0a09a92 100644 --- a/options/cf_options.h +++ b/options/cf_options.h @@ -136,6 +136,7 @@ struct MutableCFOptions { max_successive_merges(options.max_successive_merges), inplace_update_num_locks(options.inplace_update_num_locks), prefix_extractor(options.prefix_extractor), + l0_rate_factor(options.l0_rate_factor), experimental_mempurge_threshold( options.experimental_mempurge_threshold), disable_auto_compactions(options.disable_auto_compactions), @@ -208,6 +209,7 @@ struct MutableCFOptions { max_successive_merges(0), inplace_update_num_locks(0), prefix_extractor(nullptr), + l0_rate_factor(0.0), experimental_mempurge_threshold(0.0), disable_auto_compactions(false), soft_pending_compaction_bytes_limit(0), @@ -276,6 +278,7 @@ struct MutableCFOptions { size_t inplace_update_num_locks; std::shared_ptr prefix_extractor; + double l0_rate_factor; // [experimental] // Used to activate or deactive the Mempurge feature (memtable garbage // collection). (deactivated by default). At every flush, the total useful diff --git a/options/options.cc b/options/options.cc index 20cd6d08fc..0c52778ebf 100644 --- a/options/options.cc +++ b/options/options.cc @@ -66,6 +66,7 @@ AdvancedColumnFamilyOptions::AdvancedColumnFamilyOptions(const Options& options) max_write_buffer_size_to_maintain( options.max_write_buffer_size_to_maintain), inplace_update_support(options.inplace_update_support), + l0_rate_factor(options.l0_rate_factor), inplace_update_num_locks(options.inplace_update_num_locks), experimental_mempurge_threshold(options.experimental_mempurge_threshold), inplace_callback(options.inplace_callback), @@ -478,6 +479,8 @@ void ColumnFamilyOptions::Dump(Logger* log) const { ? "flush only" : "disabled"); } + ROCKS_LOG_HEADER(log, " Options.l0_rate_factor: %f", + l0_rate_factor); ROCKS_LOG_HEADER(log, " Options.experimental_mempurge_threshold: %f", experimental_mempurge_threshold); ROCKS_LOG_HEADER(log, " Options.memtable_max_range_deletions: %d", diff --git a/options/options_helper.cc b/options/options_helper.cc index 00672728bc..bc2dca76cc 100644 --- a/options/options_helper.cc +++ b/options/options_helper.cc @@ -261,6 +261,7 @@ void UpdateColumnFamilyOptions(const MutableCFOptions& moptions, cf_opts->max_successive_merges = moptions.max_successive_merges; cf_opts->inplace_update_num_locks = moptions.inplace_update_num_locks; cf_opts->prefix_extractor = moptions.prefix_extractor; + cf_opts->l0_rate_factor = moptions.l0_rate_factor; cf_opts->experimental_mempurge_threshold = moptions.experimental_mempurge_threshold; cf_opts->memtable_protection_bytes_per_key = diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index 4c965d4e4c..3d0d278d2c 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -1645,6 +1645,12 @@ DEFINE_double(experimental_mempurge_threshold, ROCKSDB_NAMESPACE::Options().experimental_mempurge_threshold, "Maximum useful payload ratio estimate that triggers a mempurge " "(memtable garbage collection)."); + +DEFINE_double(l0_rate_factor, ROCKSDB_NAMESPACE::Options().l0_rate_factor, + "A number between 0 and 1 to decide the weight of the previous " + "l0 rate in calculating the current rate " + "e.g. 0.7 is 0.7 * prev_rate + 0.3 * current rate"); + DEFINE_bool(use_spdb_writes, false, "Use optimized Speedb write flow"); DEFINE_bool(inplace_update_support, @@ -4975,6 +4981,10 @@ class Benchmark { options.delayed_write_rate = FLAGS_delayed_write_rate; options.allow_concurrent_memtable_write = FLAGS_allow_concurrent_memtable_write; + if (FLAGS_l0_rate_factor > 1 || FLAGS_l0_rate_factor < 0) { + ErrorExit("l0_rate_factor should be between 0 and 1"); + } + options.l0_rate_factor = FLAGS_l0_rate_factor; options.experimental_mempurge_threshold = FLAGS_experimental_mempurge_threshold; options.use_spdb_writes = FLAGS_use_spdb_writes;