
Commit

minor fix
chenhao-db committed Dec 19, 2024
1 parent 4e41099 commit c017934
Showing 2 changed files with 20 additions and 14 deletions.
@@ -43,7 +43,8 @@ class SparkOptimizer(
     V2ScanRelationPushDown,
     V2ScanPartitioningAndOrdering,
     V2Writes,
-    PruneFileSourcePartitions)
+    PruneFileSourcePartitions,
+    PushVariantIntoScan)
 
   override def preCBORules: Seq[Rule[LogicalPlan]] =
     Seq(OptimizeMetadataOnlyDeleteFromTable)
@@ -95,8 +96,7 @@ class SparkOptimizer(
       EliminateLimits,
       ConstantFolding),
     Batch("User Provided Optimizers", fixedPoint, experimentalMethods.extraOptimizations: _*),
-    Batch("Replace CTE with Repartition", Once, ReplaceCTERefWithRepartition),
-    Batch("Push Variant Into Scan", Once, PushVariantIntoScan)))
+    Batch("Replace CTE with Repartition", Once, ReplaceCTERefWithRepartition)))
 
   override def nonExcludableRules: Seq[String] = super.nonExcludableRules ++
     Seq(
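Note (added for context, not part of the commit): the net effect of the two SparkOptimizer hunks is that PushVariantIntoScan no longer runs as a standalone batch after all other optimizer batches; it now joins the early scan push-down rules, which upstream Catalyst executes once in its "Early Filter and Projection Push-Down" batch. A self-contained toy sketch of that ordering model (toy types, not Catalyst's) showing why moving a rule into an earlier batch changes when it fires:

    // Batches run in order; a rule moved into an earlier batch now sees the
    // plan before later batches have transformed it.
    object BatchOrderSketch extends App {
      type Rule = String => String
      case class Batch(name: String, rules: Rule*)

      def execute(plan: String, batches: Seq[Batch]): String =
        batches.foldLeft(plan)((p, b) => b.rules.foldLeft(p)((q, r) => r(q)))

      val pushVariantIntoScan: Rule = p => s"pushVariant($p)"
      val replaceCte: Rule = p => s"replaceCte($p)"

      // After this commit the variant rule belongs to the early batch:
      val batches = Seq(
        Batch("Early Filter and Projection Push-Down", pushVariantIntoScan),
        Batch("Replace CTE with Repartition", replaceCte))

      println(execute("scan", batches)) // replaceCte(pushVariant(scan))
    }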
@@ -143,8 +143,11 @@ class VariantInRelation {
   // Find eligible variants recursively. `attrId` is the root attribute id.
   // `path` is the current struct access path. `dataType` is the child data type after extracting
   // `path` from the root attribute struct.
-  def addVariantFields(attrId: ExprId, dataType: DataType, defaultValue: Any,
-      path: Seq[Int]): Unit = {
+  def addVariantFields(
+      attrId: ExprId,
+      dataType: DataType,
+      defaultValue: Any,
+      path: Seq[Int]): Unit = {
     dataType match {
       // TODO(SHREDDING): non-null default value is not yet supported.
       case _: VariantType if defaultValue == null =>
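Note: the signature change above is purely stylistic (one parameter per line once the list no longer fits), but the surrounding comment describes a recursive walk worth illustrating. A minimal stand-alone sketch of that traversal, with toy types in place of Catalyst's DataType/VariantType (all names here are illustrative only):

    object VariantPathSketch extends App {
      sealed trait DType
      case object VariantT extends DType
      case class StructT(fields: DType*) extends DType

      // Descend through struct fields, extending `path` with each ordinal,
      // and record the access path of every variant leaf found.
      def collectVariantPaths(dt: DType, path: Seq[Int] = Nil): Seq[Seq[Int]] =
        dt match {
          case VariantT => Seq(path)
          case StructT(fields @ _*) =>
            fields.zipWithIndex.flatMap { case (f, i) =>
              collectVariantPaths(f, path :+ i)
            }
        }

      println(collectVariantPaths(StructT(VariantT, StructT(VariantT))))
      // List(List(0), List(1, 0))
    }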
@@ -195,8 +198,9 @@
   }
 
   // Add a requested field to a variant column.
-  private def addField(map: HashMap[RequestedVariantField, Int],
-      field: RequestedVariantField): Unit = {
+  private def addField(
+      map: HashMap[RequestedVariantField, Int],
+      field: RequestedVariantField): Unit = {
     val idx = map.size
     map.getOrElseUpdate(field, idx)
   }
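Note: the body of addField is unchanged by this commit and worth a gloss: getOrElseUpdate only stores `idx` when the key is new, so each distinct requested field receives a stable, first-come index. A runnable sketch of the same logic, with String standing in for RequestedVariantField:

    import scala.collection.mutable.HashMap

    object AddFieldSketch extends App {
      val map = HashMap.empty[String, Int]

      // A new field gets the next index (the current map size); a duplicate
      // keeps the index it was assigned on first insertion.
      def addField(field: String): Unit = {
        val idx = map.size
        map.getOrElseUpdate(field, idx)
      }

      Seq("$.a::int", "$.b::string", "$.a::int").foreach(addField)
      println(map.toSeq.sortBy(_._2)) // List(($.a::int,0), ($.b::string,1))
    }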
@@ -227,8 +231,9 @@
     case _ => expr.children.foreach(collectRequestedFields)
   }
 
-  def rewriteExpr(expr: Expression,
-      attributeMap: Map[ExprId, AttributeReference]): Expression = {
+  def rewriteExpr(
+      expr: Expression,
+      attributeMap: Map[ExprId, AttributeReference]): Expression = {
     def rewriteAttribute(expr: Expression): Expression = expr.transformDown {
       case a: Attribute => attributeMap.getOrElse(a.exprId, a)
     }
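Note: rewriteAttribute in the context above relies on transformDown, which applies a partial function to a node top-down and then recurses into the children of the result. A self-contained model with a toy expression tree (Attr and Add are stand-ins, not Catalyst classes):

    object RewriteExprSketch extends App {
      sealed trait Expr
      case class Attr(exprId: Long) extends Expr
      case class Add(left: Expr, right: Expr) extends Expr

      // Apply `pf` to the node first, then recurse into the children of the
      // (possibly rewritten) result; the shape rewriteAttribute depends on.
      def transformDown(e: Expr)(pf: PartialFunction[Expr, Expr]): Expr =
        pf.applyOrElse(e, identity[Expr]) match {
          case Add(l, r) => Add(transformDown(l)(pf), transformDown(r)(pf))
          case leaf => leaf
        }

      val attributeMap = Map(1L -> Attr(100L)) // old exprId -> replacement
      val rewritten = transformDown(Add(Attr(1L), Attr(2L))) {
        case a: Attr => attributeMap.getOrElse(a.exprId, a)
      }
      println(rewritten) // Add(Attr(100),Attr(2))
    }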
@@ -275,11 +280,12 @@ object PushVariantIntoScan extends Rule[LogicalPlan] {
     }
   }
 
-  private def rewritePlan(originalPlan: LogicalPlan,
-      projectList: Seq[NamedExpression],
-      filters: Seq[Expression],
-      relation: LogicalRelation,
-      hadoopFsRelation: HadoopFsRelation): LogicalPlan = {
+  private def rewritePlan(
+      originalPlan: LogicalPlan,
+      projectList: Seq[NamedExpression],
+      filters: Seq[Expression],
+      relation: LogicalRelation,
+      hadoopFsRelation: HadoopFsRelation): LogicalPlan = {
     val variants = new VariantInRelation
     val defaultValues = ResolveDefaultColumns.existenceDefaultValues(hadoopFsRelation.schema)
     // I'm not aware of any case that an attribute `relation.output` can have a different data type
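Note: at the query level, the pattern this rule targets looks like the hypothetical snippet below; the path, column name, and whether extraction is actually pushed into the scan depend on the data source and its shredding support. The sketch assumes Spark 4.0's variant_get function and a local session:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.{col, variant_get}

    object VariantQuerySketch extends App {
      val spark = SparkSession.builder().master("local[*]").getOrCreate()

      // Hypothetical table with a VARIANT column named `payload`.
      val events = spark.read.parquet("/tmp/events")

      // Repeated variant_get extractions are what PushVariantIntoScan can
      // rewrite into struct-field reads served directly by the file scan.
      events.select(
        variant_get(col("payload"), "$.user.id", "int"),
        variant_get(col("payload"), "$.ts", "long")).show()
    }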
