Skip to content

Commit

Permalink
[WIP] fix(cubesql): Make cube join check stricter
Browse files Browse the repository at this point in the history
Now it should disallow any plans with ungrouped CubeScan inside, like Join(CubeScan, Projection(CubeScan(ungrouped=true)))
  • Loading branch information
mcheshkov committed Feb 25, 2025
1 parent cd90d56 commit 8890e20
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 20 deletions.
38 changes: 38 additions & 0 deletions rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,44 @@ macro_rules! generate_sql_for_timestamp {
}

impl CubeScanWrapperNode {
/// Returns `true` when the plan wrapped by this node contains an ungrouped scan.
///
/// This is a thin public entry point: it delegates to
/// `Self::has_ungrouped_wrapped_node`, starting from `self.wrapped_plan`.
pub fn has_ungrouped_scan(&self) -> bool {
Self::has_ungrouped_wrapped_node(self.wrapped_plan.as_ref())
}

Check warning on line 558 in rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs

View check run for this annotation

Codecov / codecov/patch

rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs#L556-L558

Added lines #L556 - L558 were not covered by tests

fn has_ungrouped_wrapped_node(node: &LogicalPlan) -> bool {
match node {
LogicalPlan::Extension(Extension { node }) => {
if let Some(cube_scan) = node.as_any().downcast_ref::<CubeScanNode>() {
cube_scan.request.ungrouped == Some(true)
} else if let Some(wrapped_select) =
node.as_any().downcast_ref::<WrappedSelectNode>()

Check warning on line 566 in rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs

View check run for this annotation

Codecov / codecov/patch

rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs#L560-L566

Added lines #L560 - L566 were not covered by tests
{
// Don't really care if push-to-Cube or not, any aggregation should be ok here from execution perspective
if wrapped_select.select_type == WrappedSelectType::Aggregate {
false

Check warning on line 570 in rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs

View check run for this annotation

Codecov / codecov/patch

rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs#L569-L570

Added lines #L569 - L570 were not covered by tests
} else {
Self::has_ungrouped_wrapped_node(wrapped_select.from.as_ref())
|| wrapped_select
.joins
.iter()
.map(|(join, _, _)| join.as_ref())
.any(Self::has_ungrouped_wrapped_node)
|| wrapped_select
.subqueries
.iter()
.map(|subq| subq.as_ref())
.any(Self::has_ungrouped_wrapped_node)

Check warning on line 582 in rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs

View check run for this annotation

Codecov / codecov/patch

rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs#L572-L582

Added lines #L572 - L582 were not covered by tests
}
} else {
false

Check warning on line 585 in rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs

View check run for this annotation

Codecov / codecov/patch

rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs#L585

Added line #L585 was not covered by tests
}
}
LogicalPlan::EmptyRelation(_) => false,

Check warning on line 588 in rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs

View check run for this annotation

Codecov / codecov/patch

rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs#L588

Added line #L588 was not covered by tests
// Everything else is unexpected actually
_ => false,

Check warning on line 590 in rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs

View check run for this annotation

Codecov / codecov/patch

rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs#L590

Added line #L590 was not covered by tests
}
}

Check warning on line 592 in rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs

View check run for this annotation

Codecov / codecov/patch

rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs#L592

Added line #L592 was not covered by tests

pub async fn generate_sql(
&self,
transport: Arc<dyn TransportService>,
Expand Down
83 changes: 65 additions & 18 deletions rust/cubesql/cubesql/src/compile/rewrite/converter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ use datafusion::{
plan::{Aggregate, Extension, Filter, Join, Projection, Sort, TableUDFs, Window},
replace_col_to_expr, Column, CrossJoin, DFField, DFSchema, DFSchemaRef, Distinct,
EmptyRelation, Expr, ExprRewritable, ExprRewriter, GroupingSet, Like, Limit, LogicalPlan,
LogicalPlanBuilder, TableScan, Union,
LogicalPlanBuilder, Repartition, Subquery, TableScan, Union,
},
physical_plan::planner::DefaultPhysicalPlanner,
scalar::ScalarValue,
Expand Down Expand Up @@ -1350,10 +1350,18 @@ impl LanguageToLogicalPlanConverter {
LogicalPlanLanguage::Join(params) => {
let left_on = match_data_node!(node_by_id, params[2], JoinLeftOn);
let right_on = match_data_node!(node_by_id, params[3], JoinRightOn);
let left = self.to_logical_plan(params[0]);
let right = self.to_logical_plan(params[1]);

if self.is_cube_scan_node(params[0]) && self.is_cube_scan_node(params[1]) {
let left = self.to_logical_plan(params[0])?;
let right = self.to_logical_plan(params[1])?;

// It's OK to join two grouped queries: expected row count is not that high, so
// SQL API can, potentially, evaluate it completely
// We don't really want it, so cost function should make WrappedSelect preferable
// but still, we don't want to hard error on that
// But if either of the join sides is ungrouped, SQL API does not have much of a choice
// but to process every row from ungrouped query, and that's Not Good
if Self::have_ungrouped_cube_scan_inside(&left)
|| Self::have_ungrouped_cube_scan_inside(&right)
{
if left_on.iter().any(|c| c.name == "__cubeJoinField")
|| right_on.iter().any(|c| c.name == "__cubeJoinField")
{
Expand All @@ -1370,8 +1378,8 @@ impl LanguageToLogicalPlanConverter {
}
}

let left = Arc::new(left?);
let right = Arc::new(right?);
let left = Arc::new(left);
let right = Arc::new(right);

let join_type = match_data_node!(node_by_id, params[4], JoinJoinType);
let join_constraint = match_data_node!(node_by_id, params[5], JoinJoinConstraint);
Expand All @@ -1394,7 +1402,18 @@ impl LanguageToLogicalPlanConverter {
})
}
LogicalPlanLanguage::CrossJoin(params) => {
if self.is_cube_scan_node(params[0]) && self.is_cube_scan_node(params[1]) {
let left = self.to_logical_plan(params[0])?;
let right = self.to_logical_plan(params[1])?;

// See comment in Join conversion
// Note that DF can generate Filter(CrossJoin(...)) for complex join conditions
// But, from memory or dataset perspective it's the same: DF would buffer left side completely
// And then iterate over the right side, evaluating the predicate
// Regular join would use hash partitioning here, so it would be quicker, and utilize less CPU,
// but transfer and buffering will be the same
if Self::have_ungrouped_cube_scan_inside(&left)
|| Self::have_ungrouped_cube_scan_inside(&right)
{
return Err(CubeError::internal(
"Can not join Cubes. This is most likely due to one of the following reasons:\n\
• one of the cubes contains a group by\n\
Expand All @@ -1403,8 +1422,8 @@ impl LanguageToLogicalPlanConverter {
));
}

let left = Arc::new(self.to_logical_plan(params[0])?);
let right = Arc::new(self.to_logical_plan(params[1])?);
let left = Arc::new(left);
let right = Arc::new(right);
let schema = Arc::new(left.schema().join(right.schema())?);

LogicalPlan::CrossJoin(CrossJoin {
Expand Down Expand Up @@ -2304,16 +2323,44 @@ impl LanguageToLogicalPlanConverter {
})
}

fn is_cube_scan_node(&self, node_id: Id) -> bool {
let node_by_id = &self.best_expr;
match node_by_id.index(node_id) {
LogicalPlanLanguage::CubeScan(_) | LogicalPlanLanguage::CubeScanWrapper(_) => {
return true
fn have_ungrouped_cube_scan_inside(node: &LogicalPlan) -> bool {
match node {
LogicalPlan::Projection(Projection { input, .. })
| LogicalPlan::Filter(Filter { input, .. })
| LogicalPlan::Window(Window { input, .. })

Check warning on line 2330 in rust/cubesql/cubesql/src/compile/rewrite/converter.rs

View check run for this annotation

Codecov / codecov/patch

rust/cubesql/cubesql/src/compile/rewrite/converter.rs#L2330

Added line #L2330 was not covered by tests
| LogicalPlan::Aggregate(Aggregate { input, .. })
| LogicalPlan::Sort(Sort { input, .. })
| LogicalPlan::Repartition(Repartition { input, .. })
| LogicalPlan::Limit(Limit { input, .. }) => {

Check warning on line 2334 in rust/cubesql/cubesql/src/compile/rewrite/converter.rs

View check run for this annotation

Codecov / codecov/patch

rust/cubesql/cubesql/src/compile/rewrite/converter.rs#L2332-L2334

Added lines #L2332 - L2334 were not covered by tests
Self::have_ungrouped_cube_scan_inside(input)
}
LogicalPlan::Join(Join { left, right, .. })
| LogicalPlan::CrossJoin(CrossJoin { left, right, .. }) => {
Self::have_ungrouped_cube_scan_inside(left)
|| Self::have_ungrouped_cube_scan_inside(right)
}
LogicalPlan::Union(Union { inputs, .. }) => {
inputs.iter().any(Self::have_ungrouped_cube_scan_inside)
}
LogicalPlan::Subquery(Subquery {
input, subqueries, ..
}) => {
Self::have_ungrouped_cube_scan_inside(input)
|| subqueries.iter().any(Self::have_ungrouped_cube_scan_inside)

Check warning on line 2349 in rust/cubesql/cubesql/src/compile/rewrite/converter.rs

View check run for this annotation

Codecov / codecov/patch

rust/cubesql/cubesql/src/compile/rewrite/converter.rs#L2346-L2349

Added lines #L2346 - L2349 were not covered by tests
}
LogicalPlan::Extension(Extension { node }) => {
if let Some(cube_scan) = node.as_any().downcast_ref::<CubeScanNode>() {
cube_scan.request.ungrouped == Some(true)
} else if let Some(cube_scan_wrapper) =
node.as_any().downcast_ref::<CubeScanWrapperNode>()

Check warning on line 2355 in rust/cubesql/cubesql/src/compile/rewrite/converter.rs

View check run for this annotation

Codecov / codecov/patch

rust/cubesql/cubesql/src/compile/rewrite/converter.rs#L2354-L2355

Added lines #L2354 - L2355 were not covered by tests
{
cube_scan_wrapper.has_ungrouped_scan()

Check warning on line 2357 in rust/cubesql/cubesql/src/compile/rewrite/converter.rs

View check run for this annotation

Codecov / codecov/patch

rust/cubesql/cubesql/src/compile/rewrite/converter.rs#L2357

Added line #L2357 was not covered by tests
} else {
false

Check warning on line 2359 in rust/cubesql/cubesql/src/compile/rewrite/converter.rs

View check run for this annotation

Codecov / codecov/patch

rust/cubesql/cubesql/src/compile/rewrite/converter.rs#L2359

Added line #L2359 was not covered by tests
}
}
_ => (),
_ => false,
}

return false;
}
}

Expand Down
8 changes: 6 additions & 2 deletions rust/cubesql/cubesql/src/compile/test/test_cube_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -497,8 +497,8 @@ async fn test_join_cubes_on_wrong_field_error() {
let query = convert_sql_to_cube_query(
&r#"
SELECT *
FROM KibanaSampleDataEcommerce
LEFT JOIN Logs ON (KibanaSampleDataEcommerce.has_subscription = Logs.read)
FROM (SELECT customer_gender, has_subscription FROM KibanaSampleDataEcommerce) kibana
LEFT JOIN (SELECT read, content FROM Logs) logs ON (kibana.has_subscription = logs.read)
"#
.to_string(),
meta.clone(),
Expand Down Expand Up @@ -567,7 +567,9 @@ async fn test_join_cubes_with_aggr_error() {
)
}

// TODO it seems this query should not execute: it has join of grouped CubeScan with ungrouped CubeScan by __cubeJoinField
#[tokio::test]
#[ignore]
async fn test_join_cubes_with_postprocessing() {
if !Rewriter::sql_push_down_enabled() {
return;
Expand Down Expand Up @@ -621,7 +623,9 @@ async fn test_join_cubes_with_postprocessing() {
)
}

// TODO it seems this test is not necessary: this case is covered by the ungrouped-grouped join, and we explicitly forbid executing joins with ungrouped scans in DF
#[tokio::test]
#[ignore]
async fn test_join_cubes_with_postprocessing_and_no_cubejoinfield() {
if !Rewriter::sql_push_down_enabled() {
return;
Expand Down

0 comments on commit 8890e20

Please sign in to comment.