diff --git a/proto/hummock.proto b/proto/hummock.proto index 383fc1e54e72d..1995adceab959 100644 --- a/proto/hummock.proto +++ b/proto/hummock.proto @@ -669,6 +669,8 @@ message RiseCtlUpdateCompactionConfigRequest { uint32 level0_stop_write_threshold_max_sst_count = 27; // The limitation of the max sst size of the level0 to trigger the write stop uint64 level0_stop_write_threshold_max_size = 28; + // The limitation of the max sst count of the trivial move task + uint32 sst_allowed_trivial_move_max_count = 29; } } repeated uint64 compaction_group_ids = 1; @@ -880,8 +882,12 @@ message CompactionConfig { // The limitation of the max sst count of the level0 to trigger the write stop optional uint32 level0_stop_write_threshold_max_sst_count = 27; + // The limitation of the max sst size of the level0 to trigger the write stop optional uint64 level0_stop_write_threshold_max_size = 28; + + // The limitation of the max sst count of the trivial move task + optional uint32 sst_allowed_trivial_move_max_count = 29; } message TableStats { diff --git a/src/common/src/config.rs b/src/common/src/config.rs index 0a03f3490aab9..76922f4fbeb06 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -2215,6 +2215,7 @@ pub mod default { const DEFAULT_MAX_LEVEL: u32 = 6; const DEFAULT_MAX_L0_COMPACT_LEVEL_COUNT: u32 = 42; const DEFAULT_SST_ALLOWED_TRIVIAL_MOVE_MIN_SIZE: u64 = 4 * MB; + const DEFAULT_SST_ALLOWED_TRIVIAL_MOVE_MAX_COUNT: u32 = 64; const DEFAULT_EMERGENCY_LEVEL0_SST_FILE_COUNT: u32 = 2000; // > 50G / 32M = 1600 const DEFAULT_EMERGENCY_LEVEL0_SUB_LEVEL_PARTITION: u32 = 256; const DEFAULT_LEVEL0_STOP_WRITE_THRESHOLD_MAX_SST_COUNT: u32 = 10000; // 10000 * 32M = 320G @@ -2302,6 +2303,10 @@ pub mod default { 256 * MB } + pub fn sst_allowed_trivial_move_max_count() -> u32 { + DEFAULT_SST_ALLOWED_TRIVIAL_MOVE_MAX_COUNT + } + pub fn emergency_level0_sst_file_count() -> u32 { DEFAULT_EMERGENCY_LEVEL0_SST_FILE_COUNT } @@ -2696,10 +2701,12 @@ pub struct CompactionConfig { pub enable_emergency_picker: bool, #[serde(default = "default::compaction_config::max_level")] pub max_level: u32, - #[serde(default = "default::compaction_config::max_l0_compact_level_count")] - pub max_l0_compact_level_count: u32, #[serde(default = "default::compaction_config::sst_allowed_trivial_move_min_size")] pub sst_allowed_trivial_move_min_size: u64, + #[serde(default = "default::compaction_config::sst_allowed_trivial_move_max_count")] + pub sst_allowed_trivial_move_max_count: u32, + #[serde(default = "default::compaction_config::max_l0_compact_level_count")] + pub max_l0_compact_level_count: u32, #[serde(default = "default::compaction_config::disable_auto_group_scheduling")] pub disable_auto_group_scheduling: bool, #[serde(default = "default::compaction_config::max_overlapping_level_size")] diff --git a/src/config/docs.md b/src/config/docs.md index 79bc0761866db..8a12710a20e5c 100644 --- a/src/config/docs.md +++ b/src/config/docs.md @@ -95,6 +95,7 @@ This page is automatically generated by `./risedev generate-example-config` | max_overlapping_level_size | | 268435456 | | max_space_reclaim_bytes | | 536870912 | | max_sub_compaction | | 4 | +| sst_allowed_trivial_move_max_count | | 64 | | sst_allowed_trivial_move_min_size | | 4194304 | | sub_level_max_compaction_bytes | | 134217728 | | target_file_size_base | | 33554432 | diff --git a/src/config/example.toml b/src/config/example.toml index 6d2664f3b321d..26240ace61538 100644 --- a/src/config/example.toml +++ b/src/config/example.toml @@ -81,8 +81,9 @@ level0_max_compact_file_number = 100 tombstone_reclaim_ratio = 40 enable_emergency_picker = true max_level = 6 -max_l0_compact_level_count = 42 sst_allowed_trivial_move_min_size = 4194304 +sst_allowed_trivial_move_max_count = 64 +max_l0_compact_level_count = 42 disable_auto_group_scheduling = false max_overlapping_level_size = 268435456 emergency_level0_sst_file_count = 2000 diff --git a/src/ctl/src/cmd_impl/hummock/compaction_group.rs b/src/ctl/src/cmd_impl/hummock/compaction_group.rs index 03a2a7e7e10a8..75fa60299ff3e 100644 --- a/src/ctl/src/cmd_impl/hummock/compaction_group.rs +++ b/src/ctl/src/cmd_impl/hummock/compaction_group.rs @@ -69,6 +69,7 @@ pub fn build_compaction_config_vec( sst_allowed_trivial_move_min_size: Option, disable_auto_group_scheduling: Option, max_overlapping_level_size: Option, + sst_allowed_trivial_move_max_count: Option, emergency_level0_sst_file_count: Option, emergency_level0_sub_level_partition: Option, level0_stop_write_threshold_max_sst_count: Option, @@ -135,6 +136,9 @@ pub fn build_compaction_config_vec( if let Some(c) = max_overlapping_level_size { configs.push(MutableConfig::MaxOverlappingLevelSize(c)) } + if let Some(c) = sst_allowed_trivial_move_max_count { + configs.push(MutableConfig::SstAllowedTrivialMoveMaxCount(c)) + } if let Some(c) = emergency_level0_sst_file_count { configs.push(MutableConfig::EmergencyLevel0SstFileCount(c)) } diff --git a/src/ctl/src/lib.rs b/src/ctl/src/lib.rs index 0bd77d2608614..f02566463d0f1 100644 --- a/src/ctl/src/lib.rs +++ b/src/ctl/src/lib.rs @@ -197,6 +197,8 @@ enum HummockCommands { #[clap(long)] max_overlapping_level_size: Option, #[clap(long)] + sst_allowed_trivial_move_max_count: Option, + #[clap(long)] emergency_level0_sst_file_count: Option, #[clap(long)] emergency_level0_sub_level_partition: Option, @@ -613,6 +615,7 @@ async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> { sst_allowed_trivial_move_min_size, disable_auto_group_scheduling, max_overlapping_level_size, + sst_allowed_trivial_move_max_count, emergency_level0_sst_file_count, emergency_level0_sub_level_partition, level0_stop_write_threshold_max_sst_count, @@ -650,6 +653,7 @@ async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> { sst_allowed_trivial_move_min_size, disable_auto_group_scheduling, max_overlapping_level_size, + sst_allowed_trivial_move_max_count, emergency_level0_sst_file_count, emergency_level0_sub_level_partition, level0_stop_write_threshold_max_sst_count, diff --git a/src/meta/src/hummock/compaction/compaction_config.rs b/src/meta/src/hummock/compaction/compaction_config.rs index 07b756f3dfc78..94ec57ec73e15 100644 --- a/src/meta/src/hummock/compaction/compaction_config.rs +++ b/src/meta/src/hummock/compaction/compaction_config.rs @@ -72,6 +72,9 @@ impl CompactionConfigBuilder { compaction_config::disable_auto_group_scheduling(), ), max_overlapping_level_size: Some(compaction_config::max_overlapping_level_size()), + sst_allowed_trivial_move_max_count: Some( + compaction_config::sst_allowed_trivial_move_max_count(), + ), emergency_level0_sst_file_count: Some( compaction_config::emergency_level0_sst_file_count(), ), @@ -114,6 +117,7 @@ impl CompactionConfigBuilder { .tombstone_reclaim_ratio(opt.tombstone_reclaim_ratio) .max_level(opt.max_level as u64) .sst_allowed_trivial_move_min_size(Some(opt.sst_allowed_trivial_move_min_size)) + .sst_allowed_trivial_move_max_count(Some(opt.sst_allowed_trivial_move_max_count)) .max_overlapping_level_size(Some(opt.max_overlapping_level_size)) .emergency_level0_sst_file_count(Some(opt.emergency_level0_sst_file_count)) .emergency_level0_sub_level_partition(Some(opt.emergency_level0_sub_level_partition)) @@ -182,6 +186,7 @@ builder_field! { level0_overlapping_sub_level_compact_level_count: u32, tombstone_reclaim_ratio: u32, sst_allowed_trivial_move_min_size: Option, + sst_allowed_trivial_move_max_count: Option, disable_auto_group_scheduling: Option, max_overlapping_level_size: Option, emergency_level0_sst_file_count: Option, diff --git a/src/meta/src/hummock/compaction/mod.rs b/src/meta/src/hummock/compaction/mod.rs index 0eecdcba7d8c7..e8dd6dd111144 100644 --- a/src/meta/src/hummock/compaction/mod.rs +++ b/src/meta/src/hummock/compaction/mod.rs @@ -19,11 +19,11 @@ mod overlap_strategy; use risingwave_common::catalog::{TableId, TableOption}; use risingwave_hummock_sdk::compact_task::CompactTask; use risingwave_hummock_sdk::level::Levels; -use risingwave_pb::hummock::compact_task::{self, TaskType}; +use risingwave_pb::hummock::compact_task::{self}; mod picker; pub mod selector; -use std::collections::{BTreeSet, HashMap, HashSet}; +use std::collections::{BTreeSet, HashMap}; use std::fmt::{Debug, Formatter}; use std::sync::Arc; @@ -32,7 +32,7 @@ use risingwave_hummock_sdk::table_watermark::TableWatermarks; use risingwave_hummock_sdk::version::HummockVersionStateTableInfo; use risingwave_hummock_sdk::{CompactionGroupId, HummockCompactionTaskId}; use risingwave_pb::hummock::compaction_config::CompactionMode; -use risingwave_pb::hummock::{CompactionConfig, LevelType}; +use risingwave_pb::hummock::CompactionConfig; pub use selector::{CompactionSelector, CompactionSelectorContext}; use self::selector::{EmergencySelector, LocalSelectorStatistic}; @@ -146,47 +146,6 @@ impl CompactStatus { None } - pub fn is_trivial_move_task(task: &CompactTask) -> bool { - if task.task_type != TaskType::Dynamic && task.task_type != TaskType::Emergency { - return false; - } - - if task.input_ssts.len() != 2 || task.input_ssts[0].level_type != LevelType::Nonoverlapping - { - return false; - } - - // it may be a manual compaction task - if task.input_ssts[0].level_idx == task.input_ssts[1].level_idx - && task.input_ssts[0].level_idx > 0 - { - return false; - } - - if task.input_ssts[1].level_idx == task.target_level - && task.input_ssts[1].table_infos.is_empty() - { - return true; - } - - false - } - - pub fn is_trivial_reclaim(task: &CompactTask) -> bool { - // Currently all VnodeWatermark tasks are trivial reclaim. - if task.task_type == TaskType::VnodeWatermark { - return true; - } - let exist_table_ids = HashSet::::from_iter(task.existing_table_ids.clone()); - task.input_ssts.iter().all(|level| { - level.table_infos.iter().all(|sst| { - sst.table_ids - .iter() - .all(|table_id| !exist_table_ids.contains(table_id)) - }) - }) - } - pub fn report_compact_task(&mut self, compact_task: &CompactTask) { for level in &compact_task.input_ssts { self.level_handlers[level.level_idx as usize].remove_task(compact_task.task_id); diff --git a/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs index cb966bdc81fb6..e24e86d80c508 100644 --- a/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs +++ b/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs @@ -142,6 +142,10 @@ impl LevelCompactionPicker { } else { 0 }, + self.config + .sst_allowed_trivial_move_max_count + .unwrap_or(compaction_config::sst_allowed_trivial_move_max_count()) + as usize, ); trivial_move_picker.pick_trivial_move_task( diff --git a/src/meta/src/hummock/compaction/picker/intra_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/intra_compaction_picker.rs index 4f71f9b2780b1..6f201c1ef6acd 100644 --- a/src/meta/src/hummock/compaction/picker/intra_compaction_picker.rs +++ b/src/meta/src/hummock/compaction/picker/intra_compaction_picker.rs @@ -267,51 +267,53 @@ impl IntraCompactionPicker { continue; } - let trivial_move_picker = TrivialMovePicker::new(0, 0, overlap_strategy.clone(), 0); + let trivial_move_picker = TrivialMovePicker::new( + 0, + 0, + overlap_strategy.clone(), + 0, + self.config + .sst_allowed_trivial_move_max_count + .unwrap_or(compaction_config::sst_allowed_trivial_move_max_count()) + as usize, + ); - let select_sst = trivial_move_picker.pick_trivial_move_sst( + if let Some(select_ssts) = trivial_move_picker.pick_multi_trivial_move_ssts( &l0.sub_levels[idx + 1].table_infos, &level.table_infos, level_handlers, stats, - ); - - // only pick tables for trivial move - if select_sst.is_none() { - continue; + ) { + let mut overlap = overlap_strategy.create_overlap_info(); + select_ssts.iter().for_each(|ssts| overlap.update(ssts)); + + assert!(overlap + .check_multiple_overlap(&l0.sub_levels[idx].table_infos) + .is_empty()); + + let select_input_size = select_ssts.iter().map(|sst| sst.sst_size).sum(); + let total_file_count = select_ssts.len() as u64; + let input_levels = vec![ + InputLevel { + level_idx: 0, + level_type: LevelType::Nonoverlapping, + table_infos: select_ssts, + }, + InputLevel { + level_idx: 0, + level_type: LevelType::Nonoverlapping, + table_infos: vec![], + }, + ]; + return Some(CompactionInput { + input_levels, + target_level: 0, + target_sub_level_id: level.sub_level_id, + select_input_size, + total_file_count, + ..Default::default() + }); } - - let select_sst = select_sst.unwrap(); - - // support trivial move cross multi sub_levels - let mut overlap = overlap_strategy.create_overlap_info(); - overlap.update(&select_sst); - - assert!(overlap - .check_multiple_overlap(&l0.sub_levels[idx].table_infos) - .is_empty()); - - let select_input_size = select_sst.sst_size; - let input_levels = vec![ - InputLevel { - level_idx: 0, - level_type: LevelType::Nonoverlapping, - table_infos: vec![select_sst], - }, - InputLevel { - level_idx: 0, - level_type: LevelType::Nonoverlapping, - table_infos: vec![], - }, - ]; - return Some(CompactionInput { - input_levels, - target_level: 0, - target_sub_level_id: level.sub_level_id, - select_input_size, - total_file_count: 1, - ..Default::default() - }); } None } diff --git a/src/meta/src/hummock/compaction/picker/trivial_move_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/trivial_move_compaction_picker.rs index 682628969a65a..0b275ae4af77a 100644 --- a/src/meta/src/hummock/compaction/picker/trivial_move_compaction_picker.rs +++ b/src/meta/src/hummock/compaction/picker/trivial_move_compaction_picker.rs @@ -29,6 +29,8 @@ pub struct TrivialMovePicker { // The minimum size of sst that can be selected as trivial move. sst_allowed_trivial_move_min_size: u64, + + sst_allowed_trivial_move_max_count: usize, } impl TrivialMovePicker { @@ -37,45 +39,85 @@ impl TrivialMovePicker { target_level: usize, overlap_strategy: Arc, sst_allowed_trivial_move_min_size: u64, + sst_allowed_trivial_move_max_count: usize, ) -> Self { Self { level, target_level, overlap_strategy, sst_allowed_trivial_move_min_size, + sst_allowed_trivial_move_max_count, } } - pub fn pick_trivial_move_sst( + pub fn pick_multi_trivial_move_ssts( &self, select_tables: &[SstableInfo], target_tables: &[SstableInfo], level_handlers: &[LevelHandler], stats: &mut LocalPickerStatistic, - ) -> Option { + ) -> Option> { let mut skip_by_pending = false; + + // means we have already found some sst but not match `sst_allowed_trivial_move_max_count`. + let mut skip_by_size = false; + let mut result = vec![]; + + let mut overlap_info = self.overlap_strategy.create_overlap_info(); for sst in select_tables { + if result.len() >= self.sst_allowed_trivial_move_max_count { + break; + } + + // find the first sst that can be trivial moved. if sst.sst_size < self.sst_allowed_trivial_move_min_size { + skip_by_size = true; + + // Stop probing if we have already found some sst to move. And should avoid small sst move to target level. + if !result.is_empty() { + break; + } + continue; } if level_handlers[self.level].is_pending_compact(&sst.sst_id) { skip_by_pending = true; + + if !result.is_empty() { + break; + } + continue; } - let mut overlap_info = self.overlap_strategy.create_overlap_info(); overlap_info.update(sst); let overlap_files_range = overlap_info.check_multiple_overlap(target_tables); if overlap_files_range.is_empty() { - return Some(sst.clone()); + result.push(sst.clone()); + } else { + // stop probing if we have already found some sst to move. + if !result.is_empty() { + break; + } + + // reset overlap_info + overlap_info = self.overlap_strategy.create_overlap_info(); } } + if !result.is_empty() { + return Some(result); + } + if skip_by_pending { stats.skip_by_pending_files += 1; } + if skip_by_size { + stats.skip_by_count_limit += 1; + } + None } @@ -86,17 +128,17 @@ impl TrivialMovePicker { level_handlers: &[LevelHandler], stats: &mut LocalPickerStatistic, ) -> Option { - if let Some(trivial_move_sst) = - self.pick_trivial_move_sst(select_tables, target_tables, level_handlers, stats) + if let Some(trivial_move_ssts) = + self.pick_multi_trivial_move_ssts(select_tables, target_tables, level_handlers, stats) { return Some(CompactionInput { - select_input_size: trivial_move_sst.sst_size, + select_input_size: trivial_move_ssts.iter().map(|s| s.sst_size).sum(), total_file_count: 1, input_levels: vec![ InputLevel { level_idx: self.level as u32, level_type: LevelType::Nonoverlapping, - table_infos: vec![trivial_move_sst], + table_infos: trivial_move_ssts, }, InputLevel { level_idx: self.target_level as u32, @@ -143,8 +185,8 @@ pub mod tests { let overlap_strategy = create_overlap_strategy(config.compaction_mode()); { let trivial_move_picker = - super::TrivialMovePicker::new(0, 1, overlap_strategy.clone(), 200); - let trivial_move_task = trivial_move_picker.pick_trivial_move_task( + super::TrivialMovePicker::new(0, 1, overlap_strategy.clone(), 200, 1); + let trivial_move_task = trivial_move_picker.pick_multi_trivial_move_ssts( &[sst.clone()], &[], &levels_handler, @@ -156,9 +198,9 @@ pub mod tests { { let trivial_move_picker = - super::TrivialMovePicker::new(0, 1, overlap_strategy.clone(), 50); + super::TrivialMovePicker::new(0, 1, overlap_strategy.clone(), 50, 1); - let trivial_move_task = trivial_move_picker.pick_trivial_move_task( + let trivial_move_task = trivial_move_picker.pick_multi_trivial_move_ssts( &[sst.clone()], &[], &levels_handler, @@ -168,4 +210,103 @@ pub mod tests { assert!(trivial_move_task.is_some()); } } + + #[test] + fn test_pick_multi_trivial_move_sst() { + let sst1: SstableInfo = SstableInfoInner { + sst_id: 1, + file_size: 100, + sst_size: 100, + ..Default::default() + } + .into(); + let sst2: SstableInfo = SstableInfoInner { + sst_id: 2, + file_size: 100, + sst_size: 100, + ..Default::default() + } + .into(); + let sst3: SstableInfo = SstableInfoInner { + sst_id: 3, + file_size: 100, + sst_size: 100, + ..Default::default() + } + .into(); + let sst4: SstableInfo = SstableInfoInner { + sst_id: 4, + file_size: 100, + sst_size: 100, + ..Default::default() + } + .into(); + + let config = Arc::new( + CompactionConfigBuilder::new() + .level0_tier_compact_file_number(2) + .level0_sub_level_compact_level_count(1) + .sst_allowed_trivial_move_min_size(Some(50)) + .build(), + ); + let mut levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)]; + let overlap_strategy = create_overlap_strategy(config.compaction_mode()); + { + let trivial_move_picker = + super::TrivialMovePicker::new(0, 1, overlap_strategy.clone(), 200, 10); + let trivial_move_task = trivial_move_picker.pick_multi_trivial_move_ssts( + &[sst1.clone(), sst2.clone(), sst3.clone(), sst4.clone()], + &[], + &levels_handler, + &mut Default::default(), + ); + + assert!(trivial_move_task.is_none()); + } + + { + let trivial_move_picker = + super::TrivialMovePicker::new(0, 1, overlap_strategy.clone(), 50, 10); + + let trivial_move_task = trivial_move_picker.pick_multi_trivial_move_ssts( + &[sst1.clone(), sst2.clone(), sst3.clone(), sst4.clone()], + &[], + &levels_handler, + &mut Default::default(), + ); + + assert!(trivial_move_task.is_some()); + assert_eq!(trivial_move_task.unwrap().len(), 4); + } + + { + let trivial_move_picker = + super::TrivialMovePicker::new(0, 1, overlap_strategy.clone(), 50, 2); + + let trivial_move_task = trivial_move_picker.pick_multi_trivial_move_ssts( + &[sst1.clone(), sst2.clone(), sst3.clone(), sst4.clone()], + &[], + &levels_handler, + &mut Default::default(), + ); + + assert!(trivial_move_task.is_some()); + assert_eq!(trivial_move_task.unwrap().len(), 2); + } + + { + levels_handler[0].test_add_pending_sst(2, 1); + let trivial_move_picker = + super::TrivialMovePicker::new(0, 1, overlap_strategy.clone(), 50, 4); + let trivial_move_task = trivial_move_picker.pick_multi_trivial_move_ssts( + &[sst1.clone(), sst2.clone(), sst3.clone(), sst4.clone()], + &[], + &levels_handler, + &mut Default::default(), + ); + + assert!(trivial_move_task.is_some()); + assert_eq!(trivial_move_task.unwrap().len(), 1); + } + } } diff --git a/src/meta/src/hummock/compaction/picker/vnode_watermark_picker.rs b/src/meta/src/hummock/compaction/picker/vnode_watermark_picker.rs index dcfb4a59daa94..a310ad87d77ed 100644 --- a/src/meta/src/hummock/compaction/picker/vnode_watermark_picker.rs +++ b/src/meta/src/hummock/compaction/picker/vnode_watermark_picker.rs @@ -31,7 +31,7 @@ impl VnodeWatermarkCompactionPicker { } /// The current implementation only picks trivial reclaim task for the bottommost level. - /// Must modify [`crate::hummock::compaction::CompactStatus::is_trivial_reclaim`], if nontrivial reclaim is supported in the future. + /// Must modify `is_trivial_reclaim`, if non-trivial reclaim is supported in the future. pub fn pick_compaction( &mut self, levels: &Levels, diff --git a/src/meta/src/hummock/level_handler.rs b/src/meta/src/hummock/level_handler.rs index 03a17e3dd60d8..60b4b6fac184b 100644 --- a/src/meta/src/hummock/level_handler.rs +++ b/src/meta/src/hummock/level_handler.rs @@ -137,6 +137,11 @@ impl LevelHandler { pub fn compacting_files(&self) -> &HashMap { &self.compacting_files } + + #[cfg(test)] + pub(crate) fn test_add_pending_sst(&mut self, sst_id: HummockSstableId, task_id: u64) { + self.compacting_files.insert(sst_id, task_id); + } } impl From<&LevelHandler> for risingwave_pb::hummock::LevelHandler { diff --git a/src/meta/src/hummock/manager/compaction/compaction_group_manager.rs b/src/meta/src/hummock/manager/compaction/compaction_group_manager.rs index 77d5c66c5042f..087811045df2f 100644 --- a/src/meta/src/hummock/manager/compaction/compaction_group_manager.rs +++ b/src/meta/src/hummock/manager/compaction/compaction_group_manager.rs @@ -605,6 +605,9 @@ fn update_compaction_config(target: &mut CompactionConfig, items: &[MutableConfi MutableConfig::MaxOverlappingLevelSize(c) => { target.max_overlapping_level_size = Some(*c); } + MutableConfig::SstAllowedTrivialMoveMaxCount(c) => { + target.sst_allowed_trivial_move_max_count = Some(*c); + } MutableConfig::EmergencyLevel0SstFileCount(c) => { target.emergency_level0_sst_file_count = Some(*c); } diff --git a/src/meta/src/hummock/manager/compaction/mod.rs b/src/meta/src/hummock/manager/compaction/mod.rs index 8aadc994336a7..3ac75fca7f48e 100644 --- a/src/meta/src/hummock/manager/compaction/mod.rs +++ b/src/meta/src/hummock/manager/compaction/mod.rs @@ -88,7 +88,8 @@ use crate::hummock::manager::transaction::{ }; use crate::hummock::manager::versioning::Versioning; use crate::hummock::metrics_utils::{ - build_compact_task_level_type_metrics_label, trigger_local_table_stat, trigger_sst_stat, + build_compact_task_level_type_metrics_label, trigger_compact_tasks_stat, + trigger_local_table_stat, }; use crate::hummock::model::CompactionGroup; use crate::hummock::sequence::next_compaction_task_id; @@ -148,7 +149,7 @@ fn init_selectors() -> HashMap { fn apply_compact_task(&mut self, compact_task: &CompactTask) { let mut version_delta = self.new_delta(); - let trivial_move = CompactStatus::is_trivial_move_task(compact_task); + let trivial_move = compact_task.is_trivial_move_task(); version_delta.trivial_move = trivial_move; let group_deltas = &mut version_delta @@ -798,8 +799,8 @@ impl HummockManager { ..Default::default() }; - let is_trivial_reclaim = CompactStatus::is_trivial_reclaim(&compact_task); - let is_trivial_move = CompactStatus::is_trivial_move_task(&compact_task); + let is_trivial_reclaim = compact_task.is_trivial_reclaim(); + let is_trivial_move = compact_task.is_trivial_move_task(); if is_trivial_reclaim || (is_trivial_move && can_trivial_move) { let log_label = if is_trivial_reclaim { "TrivialReclaim" @@ -906,6 +907,14 @@ impl HummockManager { .compact_task_batch_count .with_label_values(&["batch_trivial_move"]) .observe(trivial_tasks.len() as f64); + + for trivial_task in &trivial_tasks { + self.metrics + .compact_task_trivial_move_sst_count + .with_label_values(&[&trivial_task.compaction_group_id.to_string()]) + .observe(trivial_task.input_ssts[0].table_infos.len() as _); + } + drop(versioning_guard); } else { // We are using a single transaction to ensure that each task has progress when it is @@ -1045,10 +1054,7 @@ impl HummockManager { .await?; tasks.retain(|task| { if task.task_status == TaskStatus::Success { - debug_assert!( - CompactStatus::is_trivial_reclaim(task) - || CompactStatus::is_trivial_move_task(task) - ); + debug_assert!(task.is_trivial_reclaim() || task.is_trivial_move_task()); false } else { true @@ -1073,10 +1079,7 @@ impl HummockManager { if task.task_status != TaskStatus::Success { return Ok(Some(task)); } - debug_assert!( - CompactStatus::is_trivial_reclaim(&task) - || CompactStatus::is_trivial_move_task(&task) - ); + debug_assert!(task.is_trivial_reclaim() || task.is_trivial_move_task()); } Ok(None) } @@ -1282,40 +1285,17 @@ impl HummockManager { compact_task_assignment )?; } - let mut success_groups = vec![]; - for compact_task in tasks { - let task_status = compact_task.task_status; - let task_status_label = task_status.as_str_name(); - let task_type_label = compact_task.task_type.as_str_name(); + let mut success_groups = vec![]; + for compact_task in &tasks { self.compactor_manager .remove_task_heartbeat(compact_task.task_id); - - self.metrics - .compact_frequency - .with_label_values(&[ - "normal", - &compact_task.compaction_group_id.to_string(), - task_type_label, - task_status_label, - ]) - .inc(); - tracing::trace!( "Reported compaction task. {}. cost time: {:?}", - compact_task_to_string(&compact_task), + compact_task_to_string(compact_task), start_time.elapsed(), ); - trigger_sst_stat( - &self.metrics, - compaction - .compaction_statuses - .get(&compact_task.compaction_group_id), - &versioning_guard.current_version, - compact_task.compaction_group_id, - ); - if !deterministic_mode && (matches!(compact_task.task_type, compact_task::TaskType::Dynamic) || matches!(compact_task.task_type, compact_task::TaskType::Emergency)) @@ -1327,10 +1307,17 @@ impl HummockManager { ); } - if task_status == TaskStatus::Success { + if compact_task.task_status == TaskStatus::Success { success_groups.push(compact_task.compaction_group_id); } } + + trigger_compact_tasks_stat( + &self.metrics, + &tasks, + &compaction.compaction_statuses, + &versioning_guard.current_version, + ); drop(versioning_guard); if !success_groups.is_empty() { self.try_update_write_limits(&success_groups).await; diff --git a/src/meta/src/hummock/metrics_utils.rs b/src/meta/src/hummock/metrics_utils.rs index c4355b0d5a42c..bbc6ecc4c3ee5 100644 --- a/src/meta/src/hummock/metrics_utils.rs +++ b/src/meta/src/hummock/metrics_utils.rs @@ -20,6 +20,7 @@ use std::time::{SystemTime, UNIX_EPOCH}; use itertools::{enumerate, Itertools}; use prometheus::core::{AtomicU64, GenericCounter}; use prometheus::IntGauge; +use risingwave_hummock_sdk::compact_task::CompactTask; use risingwave_hummock_sdk::compaction_group::hummock_version_ext::object_size_map; use risingwave_hummock_sdk::level::Levels; use risingwave_hummock_sdk::table_stats::PbTableStatsMap; @@ -667,3 +668,42 @@ pub fn remove_compact_task_metrics( } } } + +pub fn trigger_compact_tasks_stat( + metrics: &MetaMetrics, + compact_tasks: &[CompactTask], + compact_status: &BTreeMap, + current_version: &HummockVersion, +) { + let mut task_status_label_map = HashMap::new(); + let mut task_type_label_map = HashMap::new(); + let mut group_label_map = HashMap::new(); + + for task in compact_tasks { + let task_status_label = task_status_label_map + .entry(task.task_status) + .or_insert_with(|| task.task_status.as_str_name().to_owned()); + + let task_type_label = task_type_label_map + .entry(task.task_type) + .or_insert_with(|| task.task_type.as_str_name().to_owned()); + + let group_label = group_label_map + .entry(task.compaction_group_id) + .or_insert_with(|| task.compaction_group_id.to_string()); + + metrics + .compact_frequency + .with_label_values(&["normal", group_label, task_type_label, task_status_label]) + .inc(); + } + + group_label_map.keys().for_each(|group_id| { + trigger_sst_stat( + metrics, + compact_status.get(group_id), + current_version, + *group_id, + ); + }); +} diff --git a/src/meta/src/rpc/metrics.rs b/src/meta/src/rpc/metrics.rs index 0b7863ccbdad4..13e423148c2b1 100644 --- a/src/meta/src/rpc/metrics.rs +++ b/src/meta/src/rpc/metrics.rs @@ -169,6 +169,7 @@ pub struct MetaMetrics { pub split_compaction_group_count: IntCounterVec, pub state_table_count: IntGaugeVec, pub branched_sst_count: IntGaugeVec, + pub compact_task_trivial_move_sst_count: HistogramVec, pub compaction_event_consumed_latency: Histogram, pub compaction_event_loop_iteration_latency: Histogram, @@ -772,6 +773,14 @@ impl MetaMetrics { ) .unwrap(); + let opts = histogram_opts!( + "storage_compact_task_trivial_move_sst_count", + "sst count of compact trivial-move task", + exponential_buckets(1.0, 2.0, 8).unwrap() + ); + let compact_task_trivial_move_sst_count = + register_histogram_vec_with_registry!(opts, &["group"], registry).unwrap(); + Self { grpc_latency, barrier_latency, @@ -834,6 +843,7 @@ impl MetaMetrics { compact_task_size, compact_task_file_count, compact_task_batch_count, + compact_task_trivial_move_sst_count, table_write_throughput, split_compaction_group_count, state_table_count, diff --git a/src/storage/hummock_sdk/src/compact_task.rs b/src/storage/hummock_sdk/src/compact_task.rs index d173f6d252725..5096c272b5a91 100644 --- a/src/storage/hummock_sdk/src/compact_task.rs +++ b/src/storage/hummock_sdk/src/compact_task.rs @@ -12,14 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{BTreeMap, HashMap}; +use std::collections::{BTreeMap, HashMap, HashSet}; use std::mem::size_of; use itertools::Itertools; -use risingwave_pb::hummock::compact_task::{PbTaskStatus, PbTaskType, TaskStatus}; +use risingwave_pb::hummock::compact_task::{PbTaskStatus, PbTaskType, TaskStatus, TaskType}; use risingwave_pb::hummock::subscribe_compaction_event_request::PbReportTask; use risingwave_pb::hummock::{ - PbCompactTask, PbKeyRange, PbTableOption, PbTableSchema, PbTableStats, PbValidationTask, + LevelType, PbCompactTask, PbKeyRange, PbTableOption, PbTableSchema, PbTableStats, + PbValidationTask, }; use crate::key_range::KeyRange; @@ -112,6 +113,47 @@ impl CompactTask { .map(|table_watermark| size_of::() + table_watermark.estimated_encode_len()) .sum::() } + + pub fn is_trivial_move_task(&self) -> bool { + if self.task_type != TaskType::Dynamic && self.task_type != TaskType::Emergency { + return false; + } + + if self.input_ssts.len() != 2 || self.input_ssts[0].level_type != LevelType::Nonoverlapping + { + return false; + } + + // it may be a manual compaction task + if self.input_ssts[0].level_idx == self.input_ssts[1].level_idx + && self.input_ssts[0].level_idx > 0 + { + return false; + } + + if self.input_ssts[1].level_idx == self.target_level + && self.input_ssts[1].table_infos.is_empty() + { + return true; + } + + false + } + + pub fn is_trivial_reclaim(&self) -> bool { + // Currently all VnodeWatermark tasks are trivial reclaim. + if self.task_type == TaskType::VnodeWatermark { + return true; + } + let exist_table_ids = HashSet::::from_iter(self.existing_table_ids.clone()); + self.input_ssts.iter().all(|level| { + level.table_infos.iter().all(|sst| { + sst.table_ids + .iter() + .all(|table_id| !exist_table_ids.contains(table_id)) + }) + }) + } } impl From for CompactTask {