Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cubesql): Parameterized cache #7670

Merged
merged 2 commits into from
Jan 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions rust/cubesql/cubesql/src/compile/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3878,6 +3878,38 @@ ORDER BY \"COUNT(count)\" DESC"
);
}

#[tokio::test]
async fn tableau_mul_null_by_timestamp() {
init_logger();

let query_plan = convert_select_to_query_plan(
"SELECT ((CAST('1900-01-01 00:00:00' AS TIMESTAMP) + NULL * INTERVAL '1 DAY') + 1 * INTERVAL '1 DAY') AS \"TEMP(Test)(4169571243)(0)\" FROM \"public\".\"KibanaSampleDataEcommerce\" \"KibanaSampleDataEcommerce\" HAVING (COUNT(1) > 0)".to_string(),
DatabaseProtocol::PostgreSQL,
).await;

let logical_plan = query_plan.as_logical_plan();
assert_eq!(
logical_plan.find_cube_scan().request,
V1LoadRequestQuery {
measures: Some(vec!["KibanaSampleDataEcommerce.count".to_string()]),
segments: Some(vec![]),
dimensions: Some(vec![]),
time_dimensions: None,
order: None,
limit: None,
offset: None,
filters: Some(vec![V1LoadRequestQueryFilterItem {
member: Some("KibanaSampleDataEcommerce.count".to_string()),
operator: Some("gt".to_string()),
values: Some(vec!["0".to_string()]),
or: None,
and: None,
}]),
ungrouped: None,
}
);
}

#[tokio::test]
async fn measure_used_on_dimension() {
init_logger();
Expand Down
106 changes: 93 additions & 13 deletions rust/cubesql/cubesql/src/compile/rewrite/analysis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@

#[derive(Clone, Debug)]
pub struct LogicalPlanData {
pub original_expr: Option<Expr>,
pub original_expr: Option<OriginalExpr>,
pub member_name_to_expr: Option<Vec<(Option<String>, Member, Expr)>>,
pub trivial_push_down: Option<usize>,
pub column: Option<Column>,
Expand All @@ -46,6 +46,12 @@
pub is_empty_list: Option<bool>,
}

#[derive(Debug, Clone)]
pub enum OriginalExpr {
Expr(Expr),
List(Vec<Expr>),
}

#[derive(Debug, Clone)]
pub enum ConstantFolding {
Scalar(ScalarValue),
Expand Down Expand Up @@ -230,7 +236,13 @@
// "Single node expected but {:?} found",
// self.egraph.index(index).nodes
// );
&self.egraph.index(index).nodes[0]
&self
.egraph
.index(index)
.nodes
.iter()

Check warning on line 243 in rust/cubesql/cubesql/src/compile/rewrite/analysis.rs

View check run for this annotation

Codecov / codecov/patch

rust/cubesql/cubesql/src/compile/rewrite/analysis.rs#L240-L243

Added lines #L240 - L243 were not covered by tests
.find(|n| !matches!(n, LogicalPlanLanguage::QueryParam(_)))
.unwrap_or(&self.egraph.index(index).nodes[0])
}
}

Expand All @@ -245,15 +257,24 @@
fn make_original_expr(
egraph: &EGraph<LogicalPlanLanguage, Self>,
enode: &LogicalPlanLanguage,
) -> Option<Expr> {
let id_to_expr = |id| {
) -> Option<OriginalExpr> {
let id_to_original_expr = |id| {
egraph[id].data.original_expr.clone().ok_or_else(|| {
CubeError::internal(format!(
"Original expr wasn't prepared for {:?}",
egraph[id]
))
})
};
let id_to_expr = |id| {
id_to_original_expr(id).and_then(|e| match e {
OriginalExpr::Expr(expr) => Ok(expr),
OriginalExpr::List(_) => Err(CubeError::internal(format!(
"Original expr list can't be used in expr eval {:?}",
egraph[id]
))),
})
};
let original_expr = if is_expr_node(enode) {
node_to_expr(
enode,
Expand All @@ -262,8 +283,29 @@
&SingleNodeIndex { egraph },
)
.ok()
.map(|expr| OriginalExpr::Expr(expr))
} else {
None
// While not used directly in expression evaluation OriginalExpr::List is used to trigger parent data invalidation
// Going forward should be used for expression evaluation as well
match enode {
LogicalPlanLanguage::CaseExprWhenThenExpr(params)
| LogicalPlanLanguage::CaseExprElseExpr(params)
| LogicalPlanLanguage::CaseExprExpr(params)
| LogicalPlanLanguage::AggregateFunctionExprArgs(params)
| LogicalPlanLanguage::AggregateUDFExprArgs(params)
| LogicalPlanLanguage::ScalarFunctionExprArgs(params)
| LogicalPlanLanguage::ScalarUDFExprArgs(params) => {
let mut list = Vec::new();
for id in params {
match id_to_original_expr(*id).ok()? {
OriginalExpr::Expr(expr) => list.push(expr),
OriginalExpr::List(exprs) => list.extend(exprs),
}
}
Some(OriginalExpr::List(list))
}
_ => None,
}
};
original_expr
}
Expand All @@ -276,6 +318,7 @@
match enode {
LogicalPlanLanguage::ColumnExpr(_) => Some(0),
LogicalPlanLanguage::LiteralExpr(_) => Some(0),
LogicalPlanLanguage::QueryParam(_) => Some(0),
LogicalPlanLanguage::AliasExpr(params) => trivial_push_down(params[0]),
LogicalPlanLanguage::ProjectionExpr(params)
| LogicalPlanLanguage::AggregateAggrExpr(params)
Expand Down Expand Up @@ -333,7 +376,17 @@
) -> Option<Vec<(Option<String>, Member, Expr)>> {
let column_name = |id| egraph.index(id).data.column.clone();
let id_to_column_name_to_expr = |id| egraph.index(id).data.member_name_to_expr.clone();
let original_expr = |id| egraph.index(id).data.original_expr.clone();
let original_expr = |id| {
egraph
.index(id)
.data
.original_expr
.clone()
.and_then(|e| match e {
OriginalExpr::Expr(expr) => Some(expr),
OriginalExpr::List(_) => None,

Check warning on line 387 in rust/cubesql/cubesql/src/compile/rewrite/analysis.rs

View check run for this annotation

Codecov / codecov/patch

rust/cubesql/cubesql/src/compile/rewrite/analysis.rs#L387

Added line #L387 was not covered by tests
})
};
let literal_member_relation = |id| {
egraph
.index(id)
Expand Down Expand Up @@ -641,7 +694,17 @@
egraph: &EGraph<LogicalPlanLanguage, Self>,
enode: &LogicalPlanLanguage,
) -> Option<Vec<(Expr, String, Option<bool>)>> {
let original_expr = |id| egraph.index(id).data.original_expr.clone();
let original_expr = |id| {
egraph
.index(id)
.data
.original_expr
.clone()
.and_then(|e| match e {
OriginalExpr::Expr(expr) => Some(expr),
OriginalExpr::List(_) => None,

Check warning on line 705 in rust/cubesql/cubesql/src/compile/rewrite/analysis.rs

View check run for this annotation

Codecov / codecov/patch

rust/cubesql/cubesql/src/compile/rewrite/analysis.rs#L705

Added line #L705 was not covered by tests
})
};
let id_to_column_name = |id| egraph.index(id).data.column.clone();
let column_name_to_alias = |id| egraph.index(id).data.expr_to_alias.clone();
let mut map = Vec::new();
Expand Down Expand Up @@ -686,7 +749,17 @@
enode: &LogicalPlanLanguage,
) -> Option<Vec<Expr>> {
let referenced_columns = |id| egraph.index(id).data.referenced_expr.clone();
let original_expr = |id| egraph.index(id).data.original_expr.clone();
let original_expr = |id| {
egraph
.index(id)
.data
.original_expr
.clone()
.and_then(|e| match e {
OriginalExpr::Expr(expr) => Some(expr),
OriginalExpr::List(_) => None,

Check warning on line 760 in rust/cubesql/cubesql/src/compile/rewrite/analysis.rs

View check run for this annotation

Codecov / codecov/patch

rust/cubesql/cubesql/src/compile/rewrite/analysis.rs#L760

Added line #L760 was not covered by tests
})
};
let column_name = |id| egraph.index(id).data.column.clone();
let push_referenced_columns = |id, columns: &mut Vec<Expr>| -> Option<()> {
if let Some(col) = column_name(id) {
Expand Down Expand Up @@ -769,6 +842,7 @@
Some(vec)
}
LogicalPlanLanguage::LiteralExpr(_) => Some(vec),
LogicalPlanLanguage::QueryParam(_) => Some(vec),
LogicalPlanLanguage::SortExpr(params) => {
if column_name(params[0]).is_some() {
let expr = original_expr(params[0])?;
Expand Down Expand Up @@ -821,13 +895,13 @@
};
match enode {
LogicalPlanLanguage::LiteralExpr(_) => {
let expr = node_to_expr(
let result = node_to_expr(
enode,
&egraph.analysis.cube_context,
&constant_expr,
&SingleNodeIndex { egraph },
)
.ok()?;
);
let expr = result.ok()?;
match expr {
Expr::Literal(value) => Some(ConstantFolding::Scalar(value)),
_ => panic!("Expected Literal but got: {:?}", expr),
Expand Down Expand Up @@ -1126,7 +1200,7 @@
}
}

fn merge_option_field<T: Clone>(
fn merge_option_field<T: Clone + Debug>(
&mut self,
a: &mut LogicalPlanData,
mut b: LogicalPlanData,
Expand Down Expand Up @@ -1193,7 +1267,9 @@
if let Some(ConstantFolding::Scalar(c)) = &egraph[id].data.constant {
// As ConstantFolding goes through Alias we can't add LiteralExpr at this level otherwise it gets dropped.
// In case there's wrapping node on top of Alias that can be evaluated to LiteralExpr further it gets replaced instead.
if let Some(Expr::Alias(_, _)) = egraph[id].data.original_expr.as_ref() {
if let Some(OriginalExpr::Expr(Expr::Alias(_, _))) =
egraph[id].data.original_expr.as_ref()
{
return;
}
// TODO: ideally all constants should be aliased, but this requires
Expand All @@ -1210,6 +1286,10 @@
.data
.original_expr
.as_ref()
.and_then(|e| match e {
OriginalExpr::Expr(expr) => Some(expr),
OriginalExpr::List(_) => None,

Check warning on line 1291 in rust/cubesql/cubesql/src/compile/rewrite/analysis.rs

View check run for this annotation

Codecov / codecov/patch

rust/cubesql/cubesql/src/compile/rewrite/analysis.rs#L1291

Added line #L1291 was not covered by tests
})
.map(|expr| expr.name(&DFSchema::empty()).unwrap())
} else {
None
Expand Down
18 changes: 13 additions & 5 deletions rust/cubesql/cubesql/src/compile/rewrite/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
pub mod rules;

use crate::{
compile::rewrite::analysis::{LogicalPlanAnalysis, Member},
compile::rewrite::analysis::{LogicalPlanAnalysis, Member, OriginalExpr},
CubeError,
};
use datafusion::{
Expand Down Expand Up @@ -1361,10 +1361,18 @@
egraph: &EGraph<LogicalPlanLanguage, LogicalPlanAnalysis>,
id: Id,
) -> Option<String> {
egraph[id].data.original_expr.as_ref().map(|e| match e {
Expr::Column(c) => c.name.to_string(),
_ => e.name(&DFSchema::empty()).unwrap(),
})
egraph[id]
.data
.original_expr
.as_ref()
.and_then(|e| match e {
OriginalExpr::Expr(e) => Some(e),
_ => None,

Check warning on line 1370 in rust/cubesql/cubesql/src/compile/rewrite/mod.rs

View check run for this annotation

Codecov / codecov/patch

rust/cubesql/cubesql/src/compile/rewrite/mod.rs#L1370

Added line #L1370 was not covered by tests
})
.map(|e| match e {
Expr::Column(c) => c.name.to_string(),
_ => e.name(&DFSchema::empty()).unwrap(),
})
}

fn search_match_chained<'a>(
Expand Down
20 changes: 11 additions & 9 deletions rust/cubesql/cubesql/src/compile/rewrite/rewriter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@

let (plan, qtrace_egraph_iterations) = tokio::task::spawn_blocking(move || {
let (runner, qtrace_egraph_iterations) =
Self::run_rewrites(&cube_context, egraph, rules)?;
Self::run_rewrites(&cube_context, egraph, rules, "intermediate")?;

Ok::<_, CubeError>((runner.egraph, qtrace_egraph_iterations))
})
Expand Down Expand Up @@ -329,7 +329,7 @@
let (plan, qtrace_egraph_iterations, qtrace_best_graph) =
tokio::task::spawn_blocking(move || {
let (runner, qtrace_egraph_iterations) =
Self::run_rewrites(&cube_context, egraph, rules)?;
Self::run_rewrites(&cube_context, egraph, rules, "final")?;

let extractor = Extractor::new(&runner.egraph, BestCubePlan);
let (best_cost, best) = extractor.find_best(root);
Expand Down Expand Up @@ -374,6 +374,7 @@
cube_context: &Arc<CubeContext>,
egraph: EGraph<LogicalPlanLanguage, LogicalPlanAnalysis>,
rules: Arc<Vec<Rewrite<LogicalPlanLanguage, LogicalPlanAnalysis>>>,
stage: &str,
) -> Result<(CubeRunner, Vec<QtraceEgraphIteration>), CubeError> {
let runner = Self::rewrite_runner(cube_context.clone(), egraph);
let runner = runner.run(rules.iter());
Expand All @@ -394,20 +395,21 @@
}
};
if IterInfo::egraph_debug_enabled() {
let _ = fs::create_dir_all("egraph-debug");
let _ = fs::create_dir_all("egraph-debug/public");
let _ = fs::create_dir_all("egraph-debug/src");
let dir = format!("egraph-debug-{}", stage);
let _ = fs::create_dir_all(dir.clone());
let _ = fs::create_dir_all(format!("{}/public", dir));
let _ = fs::create_dir_all(format!("{}/src", dir));

Check warning on line 401 in rust/cubesql/cubesql/src/compile/rewrite/rewriter.rs

View check run for this annotation

Codecov / codecov/patch

rust/cubesql/cubesql/src/compile/rewrite/rewriter.rs#L398-L401

Added lines #L398 - L401 were not covered by tests
fs::copy(
"egraph-debug-template/public/index.html",
"egraph-debug/public/index.html",
format!("{}/public/index.html", dir),

Check warning on line 404 in rust/cubesql/cubesql/src/compile/rewrite/rewriter.rs

View check run for this annotation

Codecov / codecov/patch

rust/cubesql/cubesql/src/compile/rewrite/rewriter.rs#L404

Added line #L404 was not covered by tests
)?;
fs::copy(
"egraph-debug-template/package.json",
"egraph-debug/package.json",
format!("{}/package.json", dir),

Check warning on line 408 in rust/cubesql/cubesql/src/compile/rewrite/rewriter.rs

View check run for this annotation

Codecov / codecov/patch

rust/cubesql/cubesql/src/compile/rewrite/rewriter.rs#L408

Added line #L408 was not covered by tests
)?;
fs::copy(
"egraph-debug-template/src/index.js",
"egraph-debug/src/index.js",
format!("{}/src/index.js", dir),

Check warning on line 412 in rust/cubesql/cubesql/src/compile/rewrite/rewriter.rs

View check run for this annotation

Codecov / codecov/patch

rust/cubesql/cubesql/src/compile/rewrite/rewriter.rs#L412

Added line #L412 was not covered by tests
)?;

let mut iterations = Vec::new();
Expand Down Expand Up @@ -451,7 +453,7 @@
last_debug_data = Some(debug_data_clone);
}
fs::write(
"egraph-debug/src/iterations.js",
format!("{}/src/iterations.js", dir),

Check warning on line 456 in rust/cubesql/cubesql/src/compile/rewrite/rewriter.rs

View check run for this annotation

Codecov / codecov/patch

rust/cubesql/cubesql/src/compile/rewrite/rewriter.rs#L456

Added line #L456 was not covered by tests
&format!(
"export const iterations = {};",
serde_json::to_string_pretty(&iterations)?
Expand Down
10 changes: 7 additions & 3 deletions rust/cubesql/cubesql/src/compile/rewrite/rules/dates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use super::utils;
use crate::{
compile::rewrite::{
agg_fun_expr, alias_expr,
analysis::{ConstantFolding, LogicalPlanAnalysis},
analysis::{ConstantFolding, LogicalPlanAnalysis, OriginalExpr},
binary_expr, cast_expr, cast_expr_explicit, column_expr, fun_expr, literal_expr,
literal_int, literal_string, negative_expr, rewrite,
rewriter::RewriteRules,
Expand Down Expand Up @@ -611,7 +611,9 @@ impl DateRules {
let alias_var = var!(alias_var);
move |egraph, root, subst| {
for data_type in var_iter!(egraph[subst[data_type_var]], CastExprDataType) {
if let Some(original_expr) = egraph[root].data.original_expr.as_ref() {
if let Some(OriginalExpr::Expr(original_expr)) =
egraph[root].data.original_expr.as_ref()
{
let alias = original_expr.name(&DFSchema::empty()).unwrap();
match data_type {
DataType::Timestamp(TimeUnit::Nanosecond, None) => {
Expand Down Expand Up @@ -663,7 +665,9 @@ impl DateRules {
{
let alias_var = var!(alias_var);
move |egraph, root, subst| {
if let Some(original_expr) = egraph[root].data.original_expr.as_ref() {
if let Some(OriginalExpr::Expr(original_expr)) =
egraph[root].data.original_expr.as_ref()
{
let alias = original_expr.name(&DFSchema::empty()).unwrap();
subst.insert(
alias_var,
Expand Down
Loading
Loading