# [SPARK-48343][SQL] Introduction of SQL Scripting interpreter
### What changes were proposed in this pull request?

A previous [PR](apache#46665) introduced parser changes for SQL Scripting. This PR is a follow-up that introduces the interpreter for the SQL Scripting language and proposes the following changes:

- `SqlScriptingExecutionNode` - introduces execution nodes for SQL scripting, used during the interpretation phase:
  - `SingleStatementExec` - executable node for the `SingleStatement` logical node; wraps the logical plan of a single statement.
  - `CompoundNestedStatementIteratorExec` - implements the base recursive iterator logic for all nesting statements.
  - `CompoundBodyExec` - concrete implementation of `CompoundNestedStatementIteratorExec` for the `CompoundBody` logical node.
- `SqlScriptingInterpreter` - introduces the interpreter for SQL scripts. The product of interpretation is an iterator over the statements that should be executed (see the illustrative sketch after this description).

Follow-up PRs will introduce further statements, support for exceptions thrown from the parser/interpreter, exception handling in SQL, etc. More details can be found in the [Jira item](https://issues.apache.org/jira/browse/SPARK-48343) for this task and its parent (where the design doc is uploaded as well).

### Why are the changes needed?

The intent is to add support for SQL scripting (and stored procedures down the line). It gives users the ability to develop complex logic and ETL entirely in SQL. Until now, users had to write verbose SQL statements or combine SQL + Python to express such logic. This is an effort to bridge that gap and enable complex logic to be written entirely in SQL.

### Does this PR introduce _any_ user-facing change?

No. This PR is the second in a series of PRs that will introduce changes to the `sql()` API to add support for SQL scripting; for now, the API remains unchanged. In the future the API will remain the same as well, but it will gain the new ability to execute SQL scripts.

### How was this patch tested?

Tests for the newly introduced interpreter changes:

- `SqlScriptingExecutionNodeSuite` - unit tests for the execution nodes.
- `SqlScriptingInterpreterSuite` - tests for the interpreter (with parser integration).

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#47026 from davidm-db/sql_scripting_interpreter.

Authored-by: David Milicevic <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
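As the illustrative sketch referenced above: for a small nested script, the interpreter builds a `CompoundBodyExec` tree and exposes its leaf statements through a single flat iterator. The script text and tree below are hypothetical examples, not part of this diff; the surface syntax comes from the parser PR (apache#46665):

```scala
// Illustrative script only — an example of what the interpreter consumes.
val script =
  """BEGIN
    |  SELECT 1;
    |  BEGIN
    |    SELECT 2;
    |  END;
    |END""".stripMargin

// Conceptual execution tree the interpreter builds for this script:
//   CompoundBodyExec(Seq(
//     SingleStatementExec(<plan for SELECT 1>, ...),
//     CompoundBodyExec(Seq(SingleStatementExec(<plan for SELECT 2>, ...)))))
// Its getTreeIterator yields the two SingleStatementExec leaves in script order.
```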
Showing 8 changed files with 535 additions and 13 deletions.
#### sql/core/src/main/scala/org/apache/spark/sql/scripting/SqlScriptingExecutionNode.scala (157 additions & 0 deletions)

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.scripting

import org.apache.spark.SparkException
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.trees.{Origin, WithOrigin}

/**
 * Trait for all SQL scripting execution nodes used during interpretation phase.
 */
sealed trait CompoundStatementExec extends Logging {

  /**
   * Whether the statement originates from the SQL script or is created during the interpretation.
   * Example: DropVariable statements are automatically created at the end of each compound.
   */
  val isInternal: Boolean = false

  /**
   * Reset execution of the current node.
   */
  def reset(): Unit
}

/**
 * Leaf node in the execution tree.
 */
trait LeafStatementExec extends CompoundStatementExec

/**
 * Non-leaf node in the execution tree. It is an iterator over executable child nodes.
 */
trait NonLeafStatementExec extends CompoundStatementExec {

  /**
   * Construct the iterator to traverse the tree rooted at this node in an in-order traversal.
   * @return
   *   Tree iterator.
   */
  def getTreeIterator: Iterator[CompoundStatementExec]
}

/**
 * Executable node for SingleStatement.
 * @param parsedPlan
 *   Logical plan of the parsed statement.
 * @param origin
 *   Origin descriptor for the statement.
 * @param isInternal
 *   Whether the statement originates from the SQL script or it is created during the
 *   interpretation. Example: DropVariable statements are automatically created at the end of each
 *   compound.
 */
class SingleStatementExec(
    var parsedPlan: LogicalPlan,
    override val origin: Origin,
    override val isInternal: Boolean)
  extends LeafStatementExec with WithOrigin {

  /**
   * Whether this statement has been executed during the interpretation phase.
   * Example: Statements in conditions of If/Else, While, etc.
   */
  var isExecuted = false

  /**
   * Get the SQL query text corresponding to this statement.
   * @return
   *   SQL query text.
   */
  def getText: String = {
    assert(origin.sqlText.isDefined && origin.startIndex.isDefined && origin.stopIndex.isDefined)
    origin.sqlText.get.substring(origin.startIndex.get, origin.stopIndex.get + 1)
  }

  override def reset(): Unit = isExecuted = false
}

/**
 * Abstract class for all statements that contain nested statements.
 * Implements recursive iterator logic over all child execution nodes.
 * @param collection
 *   Collection of child execution nodes.
 */
abstract class CompoundNestedStatementIteratorExec(collection: Seq[CompoundStatementExec])
  extends NonLeafStatementExec {

  private var localIterator = collection.iterator
  private var curr = if (localIterator.hasNext) Some(localIterator.next()) else None

  private lazy val treeIterator: Iterator[CompoundStatementExec] =
    new Iterator[CompoundStatementExec] {
      override def hasNext: Boolean = {
        val childHasNext = curr match {
          case Some(body: NonLeafStatementExec) => body.getTreeIterator.hasNext
          case Some(_: LeafStatementExec) => true
          case None => false
          case _ => throw SparkException.internalError(
            "Unknown statement type encountered during SQL script interpretation.")
        }
        localIterator.hasNext || childHasNext
      }

      @scala.annotation.tailrec
      override def next(): CompoundStatementExec = {
        curr match {
          case None => throw SparkException.internalError(
            "No more elements to iterate through in the current SQL compound statement.")
          case Some(statement: LeafStatementExec) =>
            curr = if (localIterator.hasNext) Some(localIterator.next()) else None
            statement
          case Some(body: NonLeafStatementExec) =>
            if (body.getTreeIterator.hasNext) {
              body.getTreeIterator.next()
            } else {
              curr = if (localIterator.hasNext) Some(localIterator.next()) else None
              next()
            }
          case _ => throw SparkException.internalError(
            "Unknown statement type encountered during SQL script interpretation.")
        }
      }
    }

  override def getTreeIterator: Iterator[CompoundStatementExec] = treeIterator

  override def reset(): Unit = {
    collection.foreach(_.reset())
    localIterator = collection.iterator
    curr = if (localIterator.hasNext) Some(localIterator.next()) else None
  }
}

/**
 * Executable node for CompoundBody.
 * @param statements
 *   Executable nodes for nested statements within the CompoundBody.
 */
class CompoundBodyExec(statements: Seq[CompoundStatementExec])
  extends CompoundNestedStatementIteratorExec(statements)
```
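To see the flattening behavior in isolation, here is a hypothetical sketch in the spirit of `SqlScriptingExecutionNodeSuite`; the `TestLeafExec` helper is invented for illustration and is not part of this diff:

```scala
// Hypothetical helper, not part of this commit: a trivial leaf node so the
// iterator can be exercised without building real logical plans.
case class TestLeafExec(name: String) extends LeafStatementExec {
  override def reset(): Unit = ()
}

// A compound with another compound nested inside it...
val tree = new CompoundBodyExec(Seq(
  TestLeafExec("stmt1"),
  new CompoundBodyExec(Seq(TestLeafExec("stmt2"), TestLeafExec("stmt3")))))

// ...iterates as a flat, in-order sequence of leaf statements only;
// the nested CompoundBodyExec itself is never emitted.
assert(tree.getTreeIterator.map(_.asInstanceOf[TestLeafExec].name).toSeq ==
  Seq("stmt1", "stmt2", "stmt3"))
```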
#### sql/core/src/main/scala/org/apache/spark/sql/scripting/SqlScriptingInterpreter.scala (83 additions & 0 deletions)

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.scripting

import org.apache.spark.sql.catalyst.analysis.UnresolvedIdentifier
import org.apache.spark.sql.catalyst.parser.{CompoundBody, CompoundPlanStatement, SingleStatement}
import org.apache.spark.sql.catalyst.plans.logical.{CreateVariable, DropVariable, LogicalPlan}
import org.apache.spark.sql.catalyst.trees.Origin

/**
 * SQL scripting interpreter - builds SQL script execution plan.
 */
case class SqlScriptingInterpreter() {

  /**
   * Build execution plan and return statements that need to be executed,
   * wrapped in the execution node.
   *
   * @param compound
   *   CompoundBody for which to build the plan.
   * @return
   *   Iterator through collection of statements to be executed.
   */
  def buildExecutionPlan(compound: CompoundBody): Iterator[CompoundStatementExec] = {
    transformTreeIntoExecutable(compound).asInstanceOf[CompoundBodyExec].getTreeIterator
  }

  /**
   * Fetch the name of the Create Variable plan.
   * @param plan
   *   Plan to fetch the name from.
   * @return
   *   Name of the variable.
   */
  private def getDeclareVarNameFromPlan(plan: LogicalPlan): Option[UnresolvedIdentifier] =
    plan match {
      case CreateVariable(name: UnresolvedIdentifier, _, _) => Some(name)
      case _ => None
    }

  /**
   * Transform the parsed tree to the executable node.
   * @param node
   *   Root node of the parsed tree.
   * @return
   *   Executable statement.
   */
  private def transformTreeIntoExecutable(node: CompoundPlanStatement): CompoundStatementExec =
    node match {
      case body: CompoundBody =>
        // TODO [SPARK-48530]: Current logic doesn't support scoped variables and shadowing.
        val variables = body.collection.flatMap {
          case st: SingleStatement => getDeclareVarNameFromPlan(st.parsedPlan)
          case _ => None
        }
        val dropVariables = variables
          .map(varName => DropVariable(varName, ifExists = true))
          .map(new SingleStatementExec(_, Origin(), isInternal = true))
          .reverse
        new CompoundBodyExec(
          body.collection.map(st => transformTreeIntoExecutable(st)) ++ dropVariables)
      case sparkStatement: SingleStatement =>
        new SingleStatementExec(
          sparkStatement.parsedPlan,
          sparkStatement.origin,
          isInternal = false)
    }
}
```
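For context, a hypothetical end-to-end sketch in the spirit of `SqlScriptingInterpreterSuite` (not part of this diff; it assumes the `parseScript` entry point added by the parser PR apache#46665, and the script text and variable names are illustrative):

```scala
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser

// Parse a script into a CompoundBody (parseScript comes from the earlier parser PR).
val compound = CatalystSqlParser.parseScript(
  """BEGIN
    |  DECLARE x INT;
    |  SELECT x;
    |END""".stripMargin)

// Build the execution plan: a flat iterator over executable statement nodes.
val executable = SqlScriptingInterpreter().buildExecutionPlan(compound)

// Keep only statements that came from the script text itself. The interpreter
// also appends an internal DropVariable statement for the declared variable,
// which is marked isInternal = true and therefore filtered out here.
executable.collect {
  case s: SingleStatementExec if !s.isInternal => s.getText
}.foreach(println)
```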