Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
Dandandan committed Nov 4, 2023
1 parent 781eed4 commit a864d61
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 25 deletions.
6 changes: 3 additions & 3 deletions datafusion-examples/examples/rewrite_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ impl MyAnalyzerRule {
Transformed::Yes(LogicalPlan::Filter(Filter::try_new(
predicate,
filter.input,
filter.projection.clone(),
filter.projected_schema.clone(),
)?))
}
_ => Transformed::No(plan),
Expand Down Expand Up @@ -143,7 +143,7 @@ impl OptimizerRule for MyOptimizerRule {
Ok(Some(LogicalPlan::Filter(Filter::try_new(
predicate,
filter.input,
filter.projection.clone(),
filter.projected_schema.clone(),
)?)))
}
Some(optimized_plan) => Ok(Some(optimized_plan)),
Expand All @@ -153,7 +153,7 @@ impl OptimizerRule for MyOptimizerRule {
Ok(Some(LogicalPlan::Filter(Filter::try_new(
predicate,
filter.input.clone(),
filter.projection.clone(),
filter.projected_schema.clone(),
)?)))
}
_ => Ok(None),
Expand Down
45 changes: 28 additions & 17 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,10 @@ impl LogicalPlan {
}) => projected_schema,
LogicalPlan::Projection(Projection { schema, .. }) => schema,
LogicalPlan::Filter(Filter {
projected_schema, ..
}) => projected_schema,
projected_schema,
input,
..
}) => projected_schema.as_ref().unwrap_or(input.schema()),
LogicalPlan::Distinct(Distinct { input }) => input.schema(),
LogicalPlan::Window(Window { schema, .. }) => schema,
LogicalPlan::Aggregate(Aggregate { schema, .. }) => schema,
Expand Down Expand Up @@ -1549,14 +1551,14 @@ impl LogicalPlan {
}
LogicalPlan::Filter(Filter {
predicate: ref expr,
projection,
projected_schema,
..
}) => {
if let Some(indices) = projection {
let names: Vec<&str> = indices
if let Some(projected_schema) = projected_schema {
let names: Vec<&str> = projected_schema
.fields()
.iter()
.map(|i| projected_schema.field(*i).name().as_str())
.map(|i| i.name().as_str())
.collect();

write!(f, "Filter: {expr}, projection=[{}]", names.join(", "))
Expand Down Expand Up @@ -1853,18 +1855,16 @@ pub struct Filter {
pub predicate: Expr,
/// The incoming logical plan
pub input: Arc<LogicalPlan>,
/// Optional projection
pub projection: Option<Vec<usize>>,
/// Schema representing the data after the optional projection is applied
pub projected_schema: DFSchemaRef,
pub projected_schema: Option<DFSchemaRef>,
}

impl Filter {
/// Create a new filter operator.
pub fn try_new(
predicate: Expr,
input: Arc<LogicalPlan>,
projection: Option<Vec<usize>>,
projected_schema: Option<DFSchemaRef>,
) -> Result<Self> {
// Filter predicates must return a boolean value so we try and validate that here.
// Note that it is not always possible to resolve the predicate expression during plan
Expand All @@ -1889,20 +1889,31 @@ impl Filter {

let func_dependencies = input.schema().functional_dependencies();

let projected_schema = projection
let projected_schema = projected_schema
.as_ref()
.map(|p| {
let projected_func_dependencies =
func_dependencies.project_functional_dependencies(p, p.len());
.map(|p| -> Result<_> {
let indices: Vec<usize> = p
.fields()
.iter()
.filter_map(|x| {
input
.schema()
.index_of_column_by_name(x.qualifier(), x.name())
.transpose()
})
.collect::<Result<_>>()?;
let projected_func_dependencies = func_dependencies
.project_functional_dependencies(&indices, indices.len());
let schema = input.schema().as_ref().clone();
Arc::new(schema.with_functional_dependencies(projected_func_dependencies))
Result::Ok(Arc::new(
schema.with_functional_dependencies(projected_func_dependencies),
))
})
.unwrap_or_else(|| input.schema().clone());
.transpose()?;

Ok(Self {
predicate,
input,
projection,
projected_schema,
})
}
Expand Down
6 changes: 4 additions & 2 deletions datafusion/optimizer/src/analyzer/count_wildcard_rule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,14 @@ fn analyze_internal(plan: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
LogicalPlan::Filter(Filter {
predicate,
input,
projection,
projected_schema,
..
}) => {
let predicate = rewrite_preserving_name(predicate, &mut rewriter)?;
Ok(Transformed::Yes(LogicalPlan::Filter(Filter::try_new(
predicate, input, projection,
predicate,
input,
projected_schema.clone(),
)?)))
}

Expand Down
2 changes: 1 addition & 1 deletion datafusion/optimizer/src/analyzer/inline_table_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ fn analyze_internal(plan: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
Transformed::Yes(LogicalPlan::Filter(Filter::try_new(
new_expr,
filter.input,
filter.projection,
filter.projected_schema.clone(),
)?))
}
_ => Transformed::No(plan),
Expand Down
10 changes: 9 additions & 1 deletion datafusion/optimizer/src/push_down_projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -471,10 +471,18 @@ fn push_down_filter(
})
.collect::<Result<Vec<usize>>>()?;

let new_schema = DFSchema::new_with_metadata(
projection
.into_iter()
.map(|i| schema.fields()[i].clone())
.collect(),
schema.metadata().clone(),
)?;

Filter::try_new(
filter.predicate.clone(),
filter.input.clone(),
Some(projection),
Some(Arc::new(new_schema)),
)
.map(LogicalPlan::Filter)
}
Expand Down
2 changes: 1 addition & 1 deletion datafusion/optimizer/src/rewrite_disjunctive_predicate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ impl OptimizerRule for RewriteDisjunctivePredicate {
Ok(Some(LogicalPlan::Filter(Filter::try_new(
rewritten_expr,
filter.input.clone(),
filter.projection.clone(),
filter.projected_schema.clone(),
)?)))
}
_ => Ok(None),
Expand Down

0 comments on commit a864d61

Please sign in to comment.