diff --git a/rust/cubestore/cubestore/src/queryplanner/mod.rs b/rust/cubestore/cubestore/src/queryplanner/mod.rs index 4a210e14c8489..04406f407ae38 100644 --- a/rust/cubestore/cubestore/src/queryplanner/mod.rs +++ b/rust/cubestore/cubestore/src/queryplanner/mod.rs @@ -5,6 +5,7 @@ mod partition_filter; mod planning; pub use planning::PlanningMeta; mod check_memory; +pub mod physical_plan_flags; pub mod pretty_printers; pub mod query_executor; pub mod serialized_plan; diff --git a/rust/cubestore/cubestore/src/queryplanner/physical_plan_flags.rs b/rust/cubestore/cubestore/src/queryplanner/physical_plan_flags.rs new file mode 100644 index 0000000000000..59e251106f493 --- /dev/null +++ b/rust/cubestore/cubestore/src/queryplanner/physical_plan_flags.rs @@ -0,0 +1,44 @@ +use datafusion::physical_plan::merge_sort::MergeSortExec; +use datafusion::physical_plan::ExecutionPlan; + +use serde::Serialize; +use serde_json::{json, Value}; + +#[derive(Serialize)] +pub struct PhysicalPlanFlags { + pub merge_sort_node: bool, +} + +impl PhysicalPlanFlags { + pub fn enough_to_fill(&self) -> bool { + self.merge_sort_node + } + + pub fn is_suboptimal_query(&self) -> bool { + !self.merge_sort_node + } + + pub fn to_json(&self) -> Value { + json!(self) + } + + pub fn with_execution_plan(p: &dyn ExecutionPlan) -> Self { + let mut flags = PhysicalPlanFlags { + merge_sort_node: false, + }; + PhysicalPlanFlags::physical_plan_flags_fill(p, &mut flags); + flags + } + + fn physical_plan_flags_fill(p: &dyn ExecutionPlan, flags: &mut PhysicalPlanFlags) { + if p.as_any().is::() { + flags.merge_sort_node = true; + } + if flags.enough_to_fill() { + return; + } + for child in p.children() { + PhysicalPlanFlags::physical_plan_flags_fill(child.as_ref(), flags); + } + } +} diff --git a/rust/cubestore/cubestore/src/queryplanner/pretty_printers.rs b/rust/cubestore/cubestore/src/queryplanner/pretty_printers.rs index d4ffb0f6fd166..49c21f53f213f 100644 --- a/rust/cubestore/cubestore/src/queryplanner/pretty_printers.rs +++ b/rust/cubestore/cubestore/src/queryplanner/pretty_printers.rs @@ -43,8 +43,6 @@ use datafusion::physical_plan::parquet::ParquetExec; use datafusion::physical_plan::projection::ProjectionExec; use datafusion::physical_plan::skip::SkipExec; use datafusion::physical_plan::union::UnionExec; -use serde::Serialize; -use serde_json::{json, Value}; #[derive(Default, Clone, Copy)] pub struct PPOptions { @@ -495,44 +493,3 @@ fn pp_row_range(r: &RowRange) -> String { }; format!("[{},{})", s, e) } - -#[derive(Serialize)] -pub struct PhysPlanFlags { - pub merge_sort_node: bool, -} - -impl PhysPlanFlags { - pub fn enough_to_fill(&self) -> bool { - self.merge_sort_node - } - - pub fn is_suboptimal_query(&self) -> bool { - !self.merge_sort_node - } - - pub fn to_json(&self) -> Value { - json!(self) - } -} - -pub fn phys_plan_flags(p: &dyn ExecutionPlan) -> PhysPlanFlags { - let mut flags = PhysPlanFlags { - merge_sort_node: false, - }; - phys_plan_flags_fill(p, &mut flags); - flags -} - -fn phys_plan_flags_fill(p: &dyn ExecutionPlan, flags: &mut PhysPlanFlags) { - if p.as_any().is::() { - flags.merge_sort_node = true; - } - - if flags.enough_to_fill() { - return; - } - - for child in p.children() { - phys_plan_flags_fill(child.as_ref(), flags); - } -} diff --git a/rust/cubestore/cubestore/src/queryplanner/query_executor.rs b/rust/cubestore/cubestore/src/queryplanner/query_executor.rs index 2685a5f1ff9d6..6db62961e9842 100644 --- a/rust/cubestore/cubestore/src/queryplanner/query_executor.rs +++ b/rust/cubestore/cubestore/src/queryplanner/query_executor.rs @@ -6,8 +6,9 @@ use crate::metastore::table::Table; use crate::metastore::{Column, ColumnType, IdRow, Index, Partition}; use crate::queryplanner::filter_by_key_range::FilterByKeyRangeExec; use crate::queryplanner::optimizations::CubeQueryPlanner; +use crate::queryplanner::physical_plan_flags::PhysicalPlanFlags; use crate::queryplanner::planning::{get_worker_plan, Snapshot, Snapshots}; -use crate::queryplanner::pretty_printers::{phys_plan_flags, pp_phys_plan, pp_plan}; +use crate::queryplanner::pretty_printers::{pp_phys_plan, pp_plan}; use crate::queryplanner::serialized_plan::{IndexSnapshot, RowFilter, RowRange, SerializedPlan}; use crate::queryplanner::trace_data_loaded::DataLoadedSize; use crate::store::DataFrame; @@ -128,7 +129,7 @@ impl QueryExecutor for QueryExecutorImpl { pp_phys_plan(split_plan.as_ref()) ); - let flags = phys_plan_flags(split_plan.as_ref()); + let flags = PhysicalPlanFlags::with_execution_plan(split_plan.as_ref()); if flags.is_suboptimal_query() { if let Some(trace_obj) = trace_obj.as_ref() { suboptimal_query_plan_event(trace_obj, flags.to_json())?;