Skip to content

Commit

Permalink
dev
Browse files Browse the repository at this point in the history
  • Loading branch information
RusovDmitriy committed Jan 15, 2024
1 parent 789c87a commit 0af7652
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 45 deletions.
1 change: 1 addition & 0 deletions rust/cubestore/cubestore/src/queryplanner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ mod partition_filter;
mod planning;
pub use planning::PlanningMeta;
mod check_memory;
pub mod physical_plan_flags;
pub mod pretty_printers;
pub mod query_executor;
pub mod serialized_plan;
Expand Down
44 changes: 44 additions & 0 deletions rust/cubestore/cubestore/src/queryplanner/physical_plan_flags.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
use datafusion::physical_plan::merge_sort::MergeSortExec;
use datafusion::physical_plan::ExecutionPlan;

use serde::Serialize;
use serde_json::{json, Value};

#[derive(Serialize)]
pub struct PhysicalPlanFlags {
pub merge_sort_node: bool,
}

impl PhysicalPlanFlags {
pub fn enough_to_fill(&self) -> bool {
self.merge_sort_node
}

pub fn is_suboptimal_query(&self) -> bool {
!self.merge_sort_node
}

pub fn to_json(&self) -> Value {
json!(self)
}

pub fn with_execution_plan(p: &dyn ExecutionPlan) -> Self {
let mut flags = PhysicalPlanFlags {
merge_sort_node: false,
};
PhysicalPlanFlags::physical_plan_flags_fill(p, &mut flags);
flags
}

fn physical_plan_flags_fill(p: &dyn ExecutionPlan, flags: &mut PhysicalPlanFlags) {
if p.as_any().is::<MergeSortExec>() {
flags.merge_sort_node = true;
}
if flags.enough_to_fill() {
return;
}
for child in p.children() {
PhysicalPlanFlags::physical_plan_flags_fill(child.as_ref(), flags);
}
}
}
43 changes: 0 additions & 43 deletions rust/cubestore/cubestore/src/queryplanner/pretty_printers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@ use datafusion::physical_plan::parquet::ParquetExec;
use datafusion::physical_plan::projection::ProjectionExec;
use datafusion::physical_plan::skip::SkipExec;
use datafusion::physical_plan::union::UnionExec;
use serde::Serialize;
use serde_json::{json, Value};

#[derive(Default, Clone, Copy)]
pub struct PPOptions {
Expand Down Expand Up @@ -495,44 +493,3 @@ fn pp_row_range(r: &RowRange) -> String {
};
format!("[{},{})", s, e)
}

#[derive(Serialize)]
pub struct PhysPlanFlags {
pub merge_sort_node: bool,
}

impl PhysPlanFlags {
pub fn enough_to_fill(&self) -> bool {
self.merge_sort_node
}

pub fn is_suboptimal_query(&self) -> bool {
!self.merge_sort_node
}

pub fn to_json(&self) -> Value {
json!(self)
}
}

pub fn phys_plan_flags(p: &dyn ExecutionPlan) -> PhysPlanFlags {
let mut flags = PhysPlanFlags {
merge_sort_node: false,
};
phys_plan_flags_fill(p, &mut flags);
flags
}

fn phys_plan_flags_fill(p: &dyn ExecutionPlan, flags: &mut PhysPlanFlags) {
if p.as_any().is::<MergeSortExec>() {
flags.merge_sort_node = true;
}

if flags.enough_to_fill() {
return;
}

for child in p.children() {
phys_plan_flags_fill(child.as_ref(), flags);
}
}
5 changes: 3 additions & 2 deletions rust/cubestore/cubestore/src/queryplanner/query_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ use crate::metastore::table::Table;
use crate::metastore::{Column, ColumnType, IdRow, Index, Partition};
use crate::queryplanner::filter_by_key_range::FilterByKeyRangeExec;
use crate::queryplanner::optimizations::CubeQueryPlanner;
use crate::queryplanner::physical_plan_flags::PhysicalPlanFlags;
use crate::queryplanner::planning::{get_worker_plan, Snapshot, Snapshots};
use crate::queryplanner::pretty_printers::{phys_plan_flags, pp_phys_plan, pp_plan};
use crate::queryplanner::pretty_printers::{pp_phys_plan, pp_plan};
use crate::queryplanner::serialized_plan::{IndexSnapshot, RowFilter, RowRange, SerializedPlan};
use crate::queryplanner::trace_data_loaded::DataLoadedSize;
use crate::store::DataFrame;
Expand Down Expand Up @@ -128,7 +129,7 @@ impl QueryExecutor for QueryExecutorImpl {
pp_phys_plan(split_plan.as_ref())
);

let flags = phys_plan_flags(split_plan.as_ref());
let flags = PhysicalPlanFlags::with_execution_plan(split_plan.as_ref());
if flags.is_suboptimal_query() {
if let Some(trace_obj) = trace_obj.as_ref() {
suboptimal_query_plan_event(trace_obj, flags.to_json())?;
Expand Down

0 comments on commit 0af7652

Please sign in to comment.