diff --git a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala index d16ddb4973205..c807685db0f0c 100644 --- a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala +++ b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala @@ -26,7 +26,7 @@ import org.apache.avro.generic.{GenericDatumWriter, GenericRecord, GenericRecord import org.apache.avro.io.EncoderFactory import org.apache.spark.SparkException -import org.apache.spark.sql.{QueryTest, Row} +import org.apache.spark.sql.{AnalysisException, QueryTest, Row} import org.apache.spark.sql.execution.LocalTableScanExec import org.apache.spark.sql.functions.{col, lit, struct} import org.apache.spark.sql.internal.SQLConf @@ -286,4 +286,85 @@ class AvroFunctionsSuite extends QueryTest with SharedSparkSession { assert(msg.contains("Invalid default for field id: null not a \"long\"")) } } + + test("SPARK-48545: from_avro and to_avro SQL functions") { + withTable("t") { + sql( + """ + |create table t as + | select named_struct('u', named_struct('member0', member0, 'member1', member1)) as s + | from values (1, null), (null, 'a') tab(member0, member1) + |""".stripMargin) + val jsonFormatSchema = + """ + |{ + | "type": "record", + | "name": "struct", + | "fields": [{ + | "name": "u", + | "type": ["int","string"] + | }] + |} + |""".stripMargin + val toAvroSql = + s""" + |select to_avro(s, '$jsonFormatSchema') as result from t + |""".stripMargin + val avroResult = spark.sql(toAvroSql).collect() + assert(avroResult != null) + checkAnswer( + spark.sql(s"select from_avro(result, '$jsonFormatSchema', map()).u from ($toAvroSql)"), + Seq(Row(Row(1, null)), + Row(Row(null, "a")))) + + // Negative tests. + checkError( + exception = intercept[AnalysisException](sql( + s""" + |select to_avro(s, 42) as result from t + |""".stripMargin)), + errorClass = "DATATYPE_MISMATCH.TYPE_CHECK_FAILURE_WITH_HINT", + parameters = Map("sqlExpr" -> "\"toavro(s, 42)\"", + "msg" -> ("The second argument of the TO_AVRO SQL function must be a constant string " + + "containing the JSON representation of the schema to use for converting the value to " + + "AVRO format"), + "hint" -> ""), + queryContext = Array(ExpectedContext( + fragment = "to_avro(s, 42)", + start = 8, + stop = 21))) + checkError( + exception = intercept[AnalysisException](sql( + s""" + |select from_avro(s, 42, '') as result from t + |""".stripMargin)), + errorClass = "DATATYPE_MISMATCH.TYPE_CHECK_FAILURE_WITH_HINT", + parameters = Map("sqlExpr" -> "\"fromavro(s, 42, )\"", + "msg" -> ("The second argument of the FROM_AVRO SQL function must be a constant string " + + "containing the JSON representation of the schema to use for converting the value " + + "from AVRO format"), + "hint" -> ""), + queryContext = Array(ExpectedContext( + fragment = "from_avro(s, 42, '')", + start = 8, + stop = 27))) + checkError( + exception = intercept[AnalysisException](sql( + s""" + |select from_avro(s, '$jsonFormatSchema', 42) as result from t + |""".stripMargin)), + errorClass = "DATATYPE_MISMATCH.TYPE_CHECK_FAILURE_WITH_HINT", + parameters = Map( + "sqlExpr" -> + s"\"fromavro(s, $jsonFormatSchema, 42)\"".stripMargin, + "msg" -> ("The third argument of the FROM_AVRO SQL function must be a constant map of " + + "strings to strings containing the options to use for converting the value " + + "from AVRO format"), + "hint" -> ""), + queryContext = Array(ExpectedContext( + fragment = s"from_avro(s, '$jsonFormatSchema', 42)", + start = 8, + stop = 138))) + } + } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala index 3a418497fa537..20da1c030b538 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala @@ -860,7 +860,11 @@ object FunctionRegistry { // Xml expression[XmlToStructs]("from_xml"), expression[SchemaOfXml]("schema_of_xml"), - expression[StructsToXml]("to_xml") + expression[StructsToXml]("to_xml"), + + // Avro + expression[FromAvro]("from_avro"), + expression[ToAvro]("to_avro") ) val builtin: SimpleFunctionRegistry = { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/toFromAvroSqlFunctions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/toFromAvroSqlFunctions.scala new file mode 100644 index 0000000000000..507511a360071 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/toFromAvroSqlFunctions.scala @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.sql.catalyst.analysis.TypeCheckResult +import org.apache.spark.sql.catalyst.util.ArrayBasedMapData +import org.apache.spark.sql.types.{MapType, NullType, StringType} +import org.apache.spark.unsafe.types.UTF8String +import org.apache.spark.util.Utils + +/** + * Converts a binary column of Avro format into its corresponding Catalyst value. + * This is a thin wrapper over the [[AvroDataToCatalyst]] class to create a SQL function. + * + * @param child the Catalyst binary input column. + * @param jsonFormatSchema the Avro schema in JSON string format. + * @param options the options to use when performing the conversion. + * + * @since 4.0.0 + */ +// scalastyle:off line.size.limit +@ExpressionDescription( + usage = """ + _FUNC_(child, jsonFormatSchema, options) - Converts a binary Avro value into a Catalyst value. + """, + examples = """ + Examples: + > SELECT _FUNC_(s, '{"type": "record", "name": "struct", "fields": [{ "name": "u", "type": ["int","string"] }]}', map()) IS NULL AS result FROM (SELECT NAMED_STRUCT('u', NAMED_STRUCT('member0', member0, 'member1', member1)) AS s FROM VALUES (1, NULL), (NULL, 'a') tab(member0, member1)); + [false] + """, + note = """ + The specified schema must match actual schema of the read data, otherwise the behavior + is undefined: it may fail or return arbitrary result. + To deserialize the data with a compatible and evolved schema, the expected Avro schema can be + set via the corresponding option. + """, + group = "misc_funcs", + since = "4.0.0" +) +// scalastyle:on line.size.limit +case class FromAvro(child: Expression, jsonFormatSchema: Expression, options: Expression) + extends TernaryExpression with RuntimeReplaceable { + override def first: Expression = child + override def second: Expression = jsonFormatSchema + override def third: Expression = options + + override def withNewChildrenInternal( + newFirst: Expression, newSecond: Expression, newThird: Expression): Expression = { + copy(child = newFirst, jsonFormatSchema = newSecond, options = newThird) + } + + override def checkInputDataTypes(): TypeCheckResult = { + val schemaCheck = jsonFormatSchema.dataType match { + case _: StringType | + _: NullType + if jsonFormatSchema.foldable => + None + case _ => + Some(TypeCheckResult.TypeCheckFailure( + "The second argument of the FROM_AVRO SQL function must be a constant string " + + "containing the JSON representation of the schema to use for converting the value " + + "from AVRO format")) + } + val optionsCheck = options.dataType match { + case MapType(StringType, StringType, _) | + MapType(NullType, NullType, _) | + _: NullType + if options.foldable => + None + case _ => + Some(TypeCheckResult.TypeCheckFailure( + "The third argument of the FROM_AVRO SQL function must be a constant map of strings to " + + "strings containing the options to use for converting the value from AVRO format")) + } + schemaCheck.getOrElse( + optionsCheck.getOrElse( + TypeCheckResult.TypeCheckSuccess)) + } + + override def replacement: Expression = { + val schemaValue: String = jsonFormatSchema.eval() match { + case s: UTF8String => + s.toString + case null => + "" + } + val optionsValue: Map[String, String] = options.eval() match { + case a: ArrayBasedMapData if a.keyArray.array.nonEmpty => + val keys: Array[String] = a.keyArray.array.map(_.toString) + val values: Array[String] = a.valueArray.array.map(_.toString) + keys.zip(values).toMap + case _ => + Map.empty + } + val constructor = + Utils.classForName("org.apache.spark.sql.avro.AvroDataToCatalyst").getConstructors().head + val expr = constructor.newInstance(child, schemaValue, optionsValue) + expr.asInstanceOf[Expression] + } +} + +/** + * Converts a Catalyst binary input value into its corresponding AvroAvro format result. + * This is a thin wrapper over the [[CatalystDataToAvro]] class to create a SQL function. + * + * @param child the Catalyst binary input column. + * @param jsonFormatSchema the Avro schema in JSON string format. + * + * @since 4.0.0 + */ +// scalastyle:off line.size.limit +@ExpressionDescription( + usage = """ + _FUNC_(child, jsonFormatSchema) - Converts a Catalyst binary input value into its corresponding + Avro format result. + """, + examples = """ + Examples: + > SELECT _FUNC_(s, '{"type": "record", "name": "struct", "fields": [{ "name": "u", "type": ["int","string"] }]}', MAP()) IS NULL FROM (SELECT NULL AS s); + [true] + """, + group = "misc_funcs", + since = "4.0.0" +) +// scalastyle:on line.size.limit +case class ToAvro(child: Expression, jsonFormatSchema: Expression) + extends BinaryExpression with RuntimeReplaceable { + override def left: Expression = child + + override def right: Expression = jsonFormatSchema + + override def withNewChildrenInternal(newLeft: Expression, newRight: Expression): Expression = { + copy(child = newLeft, jsonFormatSchema = newRight) + } + + override def checkInputDataTypes(): TypeCheckResult = { + jsonFormatSchema.dataType match { + case _: StringType if jsonFormatSchema.foldable => + TypeCheckResult.TypeCheckSuccess + case _ => + TypeCheckResult.TypeCheckFailure( + "The second argument of the TO_AVRO SQL function must be a constant string " + + "containing the JSON representation of the schema to use for converting the value " + + "to AVRO format") + } + } + + override def replacement: Expression = { + val schemaValue: Option[String] = jsonFormatSchema.eval() match { + case null => + None + case s: UTF8String => + Some(s.toString) + } + val constructor = + Utils.classForName("org.apache.spark.sql.avro.CatalystDataToAvro").getConstructors().head + val expr = constructor.newInstance(child, schemaValue) + expr.asInstanceOf[Expression] + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ExpressionsSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ExpressionsSchemaSuite.scala index 73b2eba7060d0..443597f10056b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/ExpressionsSchemaSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/ExpressionsSchemaSuite.scala @@ -117,6 +117,10 @@ class ExpressionsSchemaSuite extends QueryTest with SharedSparkSession { // Note: We need to filter out the commands that set the parameters, such as: // SET spark.sql.parser.escapedStringLiterals=true example.split(" > ").tail.filterNot(_.trim.startsWith("SET")).take(1).foreach { + case _ if funcName == "from_avro" || funcName == "to_avro" => + // Skip running the example queries for the from_avro and to_avro functions because + // these functions dynamically load the AvroDataToCatalyst or CatalystDataToAvro classes + // which are not available in this test. case exampleRe(sql, _) => val df = spark.sql(sql) val escapedSql = sql.replaceAll("\\|", "|") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala index e80f4af1dc462..bf5d1b24af219 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala @@ -225,6 +225,9 @@ class ExpressionInfoSuite extends SparkFunSuite with SharedSparkSession { // Throws an error "org.apache.spark.sql.catalyst.expressions.RaiseErrorExpressionBuilder", "org.apache.spark.sql.catalyst.expressions.AssertTrue", + // Requires dynamic class loading not available in this test suite. + "org.apache.spark.sql.catalyst.expressions.FromAvro", + "org.apache.spark.sql.catalyst.expressions.ToAvro", classOf[CurrentUser].getName, // The encrypt expression includes a random initialization vector to its encrypted result classOf[AesEncrypt].getName) diff --git a/sql/gen-sql-functions-docs.py b/sql/gen-sql-functions-docs.py index 053e11d10295b..dc48a5a6155ed 100644 --- a/sql/gen-sql-functions-docs.py +++ b/sql/gen-sql-functions-docs.py @@ -163,7 +163,8 @@ def _make_pretty_examples(jspark, infos): pretty_output = "" for info in infos: - if info.examples.startswith("\n Examples:"): + if (info.examples.startswith("\n Examples:") + and info.name.lower() not in ("from_avro", "to_avro")): output = [] output.append("-- %s" % info.name) query_examples = filter(lambda x: x.startswith(" > "), info.examples.split("\n"))