Count in-progress flushes towards the L0 write-delay condition.

Every FlushJob increments ColumnFamilyData::running_flushes_ in
ReportStartedFlush() and decrements it in its destructor. The counter is added
to the L0 file count when checking level0_slowdown_writes_trigger and when
computing the L0 write-delay divider, and it is reported in the L0 stall log
message.

diff --git a/db/column_family.cc b/db/column_family.cc
index d21d6bcde..5cbefa800 100644
--- a/db/column_family.cc
+++ b/db/column_family.cc
@@ -937,7 +937,7 @@ ColumnFamilyData::GetWriteStallConditionAndCause(
     int num_unflushed_memtables, int num_l0_files,
     uint64_t num_compaction_needed_bytes,
     const MutableCFOptions& mutable_cf_options,
-    const ImmutableCFOptions& immutable_cf_options) {
+    const ImmutableCFOptions& immutable_cf_options, int running_flushes) {
   if (num_unflushed_memtables >= mutable_cf_options.max_write_buffer_number) {
     return {WriteStallCondition::kStopped, WriteStallCause::kMemtableLimit};
   } else if (!mutable_cf_options.disable_auto_compactions &&
@@ -957,7 +957,7 @@ ColumnFamilyData::GetWriteStallConditionAndCause(
     return {WriteStallCondition::kDelayed, WriteStallCause::kMemtableLimit};
   } else if (!mutable_cf_options.disable_auto_compactions &&
              mutable_cf_options.level0_slowdown_writes_trigger >= 0 &&
-             num_l0_files >=
+             num_l0_files + running_flushes >=
                  mutable_cf_options.level0_slowdown_writes_trigger) {
     return {WriteStallCondition::kDelayed, WriteStallCause::kL0FileCountLimit};
   } else if (!mutable_cf_options.disable_auto_compactions &&
@@ -1064,8 +1064,8 @@ ColumnFamilyData::CalculateWriteDelayDividerAndMaybeUpdateWriteStallCause(
   // linear step, and num_delay_steps is num_steps_till_stop - 1.
   double l0_divider = 1;
   const int kGoalMbStep = 1;
-  const auto extra_l0_ssts =
-      vstorage->l0_delay_trigger_count() - file_to_start_delay;
+  const auto extra_l0_ssts = vstorage->l0_delay_trigger_count() +
+                             running_flushes_ - file_to_start_delay;
   if (extra_l0_ssts > 0) {
     const auto num_steps_till_stop = stop - file_to_start_delay;
     assert(num_steps_till_stop > 0);
@@ -1093,7 +1093,7 @@ ColumnFamilyData::CalculateWriteDelayDividerAndMaybeUpdateWriteStallCause(
       assert(delay_percent > 0 && delay_percent < 1);
       // since extra_l0_ssts == num_steps_till_stop then we're in a stop
       // condition.
-      assert(extra_l0_ssts < num_steps_till_stop);
+      assert(extra_l0_ssts - running_flushes_ < num_steps_till_stop);
       auto l0_rate = last_linear_step_mbs *
                      std::pow(delay_percent, (extra_l0_ssts - linear_steps));
       l0_divider = initial_linear_rate / l0_rate;
@@ -1149,7 +1149,7 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
     auto write_stall_condition_and_cause = GetWriteStallConditionAndCause(
         imm()->NumNotFlushed(), vstorage->l0_delay_trigger_count(),
         vstorage->estimated_compaction_needed_bytes(), mutable_cf_options,
-        *ioptions());
+        *ioptions(), running_flushes_);
     write_stall_condition = write_stall_condition_and_cause.first;
     auto write_stall_cause = write_stall_condition_and_cause.second;
 
@@ -1250,10 +1250,12 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
             1);
       }
       ROCKS_LOG_WARN(ioptions_.logger,
-                     "[%s] Stalling writes because we have %d level-0 files "
+                     "[%s] Stalling writes because we have %d level-0 files, "
+                     "%d running flushes "
                      "rate %" PRIu64,
                      name_.c_str(), vstorage->l0_delay_trigger_count(),
-                     write_controller->delayed_write_rate());
+                     running_flushes_.load(),
+                     write_controller->delayed_write_rate());
     } else if (write_stall_condition == WriteStallCondition::kDelayed &&
                write_stall_cause == WriteStallCause::kPendingCompactionBytes) {
       // If the distance to hard limit is less than 1/4 of the gap between soft
diff --git a/db/column_family.h b/db/column_family.h
index e84cbd301..a67955902 100644
--- a/db/column_family.h
+++ b/db/column_family.h
@@ -484,11 +484,11 @@ class ColumnFamilyData {
   bool queued_for_compaction() { return queued_for_compaction_; }
 
   static std::pair<WriteStallCondition, WriteStallCause>
-  GetWriteStallConditionAndCause(
-      int num_unflushed_memtables, int num_l0_files,
-      uint64_t num_compaction_needed_bytes,
-      const MutableCFOptions& mutable_cf_options,
-      const ImmutableCFOptions& immutable_cf_options);
+  GetWriteStallConditionAndCause(int num_unflushed_memtables, int num_l0_files,
+                                 uint64_t num_compaction_needed_bytes,
+                                 const MutableCFOptions& mutable_cf_options,
+                                 const ImmutableCFOptions& immutable_cf_options,
+                                 int running_flushes = 0);
 
   // Recalculate some stall conditions, which are changed only during
   // compaction, adding new memtable and/or recalculation of compaction score.
@@ -497,6 +497,18 @@ class ColumnFamilyData {
 
   bool IsLastLevelWithData(int level) const;
 
+  // Un-counts a flush job (called from ~FlushJob()).
+  void DecrRunningFlushes() {
+    assert(running_flushes_ > 0);
+    --running_flushes_;
+  }
+
+  // Counts a newly started flush job (see FlushJob::ReportStartedFlush()).
+  void IncrRunningFlushes() {
+    assert(running_flushes_ >= 0);
+    ++running_flushes_;
+  }
+
   // REQUIREMENT: db mutex must be held
   double TEST_CalculateWriteDelayDivider(
       uint64_t compaction_needed_bytes,
@@ -542,6 +554,9 @@ class ColumnFamilyData {
       const MutableCFOptions& mutable_cf_options,
       WriteStallCause& write_stall_cause);
 
+  // Flush jobs started (ReportStartedFlush) and not yet destroyed.
+  std::atomic<int> running_flushes_{0};
+
  public:
   void set_initialized() { initialized_.store(true); }
 
diff --git a/db/flush_job.cc b/db/flush_job.cc
index 827523d90..22d94bc5c 100644
--- a/db/flush_job.cc
+++ b/db/flush_job.cc
@@ -158,9 +158,13 @@ FlushJob::FlushJob(
   TEST_SYNC_POINT("FlushJob::FlushJob()");
 }
 
-FlushJob::~FlushJob() { ThreadStatusUtil::ResetThreadStatus(); }
+FlushJob::~FlushJob() {
+  cfd_->DecrRunningFlushes();
+  ThreadStatusUtil::ResetThreadStatus();
+}
 
 void FlushJob::ReportStartedFlush() {
+  cfd_->IncrRunningFlushes();
   ThreadStatusUtil::SetEnableTracking(db_options_.enable_thread_tracking);
   ThreadStatusUtil::SetColumnFamily(cfd_);
   ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_FLUSH);