From 9de4fa5cc1a8c4f620ec6c9129ebedf9aeb25e01 Mon Sep 17 00:00:00 2001
From: YANGDB
Date: Wed, 4 Dec 2024 14:14:20 -0800
Subject: [PATCH 01/12] add the following JSON functions
 - json_delete
 - json_append
 - json_extend

Signed-off-by: YANGDB
---
 docs/ppl-lang/functions/ppl-json.md            | 117 ++++++++++++++++++
 .../src/main/antlr4/OpenSearchPPLLexer.g4      |   6 +-
 .../src/main/antlr4/OpenSearchPPLParser.g4     |   3 +
 3 files changed, 123 insertions(+), 3 deletions(-)

diff --git a/docs/ppl-lang/functions/ppl-json.md b/docs/ppl-lang/functions/ppl-json.md
index 2c0c0ca67..27cc33d30 100644
--- a/docs/ppl-lang/functions/ppl-json.md
+++ b/docs/ppl-lang/functions/ppl-json.md
@@ -203,6 +203,123 @@ Example:
     +----------------+
 
+### `JSON_DELETE`
+
+**Description**
+
+`json_delete(json, [keys list])` deletes JSON elements from a JSON object for the given list of keys. Returns the updated object after the keys have been deleted.
+
+**Argument type:** JSON, List
+
+**Return type:** JSON
+
+A JSON object format.
+
+Example:
+
+    os> source=people | eval deleted = json_delete({"a":"valueA", "b":"valueB"}, ["a"])
+    fetched rows / total rows = 1/1
+    +----------------------------------+
+    | deleted                          |
+    +----------------------------------+
+    | {"b": "valueB" }                 |
+    +----------------------------------+
+
+    os> source=people | eval deleted = json_delete({"a":[{"b":1},{"b":2},{"c":3}]}, ["a.b"])
+    fetched rows / total rows = 1/1
+    +-----------------------------------------------------------+
+    | deleted                                                    |
+    +-----------------------------------------------------------+
+    | {"a":[{"c":3}] }                                           |
+    +-----------------------------------------------------------+
+
+    os> source=people | eval `no_action` = json_delete({"a":[{"b":1},{"b":2},{"c":3}]}, ["b.b"])
+    fetched rows / total rows = 1/1
+    +-----------------------------------+
+    | no_action                         |
+    +-----------------------------------+
+    | {"a":[{"b":1},{"b":2},{"c":3}]}   |
+    +-----------------------------------+
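+
+As a usage sketch (mirroring the integration tests added later in this series), the keys list can also be built dynamically with `json_array`, e.g. `source=people | eval deleted = json_delete(doc, json_array('a', 'b.c'))`, where `doc` is a placeholder field holding a JSON string.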
+
+### `JSON_APPEND`
+
+**Description**
+
+`json_append(json, [path_value list])` appends values to the end of an array within the JSON elements. Returns the updated JSON object after appending.
+
+**Argument type:** JSON, List<[(STRING, STRING)]>
+
+**Return type:** JSON
+
+A JSON object format.
+
+**Note**
+Append adds the value to the end of the existing array, with the following cases:
+ - path is an object value - append is ignored and the value is returned
+ - path is an existing non-empty array - the value is added to the array's tail
+ - path is an existing empty array - a new array is created with the given value
+
+Example:
+
+    os> source=people | eval append = json_append(`{"a":["valueA", "valueB"]}`, ["a","valueC"])
+    fetched rows / total rows = 1/1
+    +-------------------------------------------------+
+    | append                                          |
+    +-------------------------------------------------+
+    | {"a":["valueA", "valueB", "valueC"]}            |
+    +-------------------------------------------------+
+
+    os> source=people | eval append = json_append(`{"a":["valueA", "valueB"]}`, {"a":["valueC"]})
+    fetched rows / total rows = 1/1
+    +-----------------------------------------------+
+    | append                                        |
+    +-----------------------------------------------+
+    | {"a":["valueA", "valueB", ["valueC"]]}        |
+    +-----------------------------------------------+
+
+    os> source=people | eval append = json_append(`{"root":{ "a":["valueA", "valueB"]}}`, {"root.a":"valueC"})
+    fetched rows / total rows = 1/1
+    +-----------------------------------------------+
+    | append                                        |
+    +-----------------------------------------------+
+    |{"root": {"a":["valueA", "valueB", "valueC"]}} |
+    +-----------------------------------------------+
+
+
+
+### `JSON_EXTEND`
+
+**Description**
+
+`json_extend(json, [path_value_pairs list])` appends multiple (array) values to an existing array inside the JSON elements. Returns the updated object after extending.
+
+**Argument type:** JSON, List<[(STRING, List)]>
+
+**Return type:** JSON
+
+A JSON object format.
+
+**Note**
+Extending arrays with individual values is what separates the `json_extend` functionality from `json_append` - a similar function that appends the `` as a single element.
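+
+For instance (a sketch consistent with the examples below): extending path `a` in `{"a":[1]}` with `[2,3]` yields `{"a":[1,2,3]}`, whereas `json_append` would add the same list as a single element, yielding `{"a":[1,[2,3]]}`.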
+
+Example:
+
+    os> source=people | eval extend = json_extend(`{"a":["valueA", "valueB"]}`, ["valueC","valueD"])
+    fetched rows / total rows = 1/1
+    +-------------------------------------------------+
+    | extend                                          |
+    +-------------------------------------------------+
+    | {"a":["valueA", "valueB", "valueC", "valueD"]}  |
+    +-------------------------------------------------+
+
+    os> source=people | eval extend = json_extend(`{"a":["valueA", "valueB"]}`, {"b":["valueC","valueD"]})
+    fetched rows / total rows = 1/1
+    +-------------------------------------------------------------+
+    | extend                                                      |
+    +-------------------------------------------------------------+
+    | {"a":["valueA", "valueB", {"b":"valueC"}, {"b":"valueD"}]}  |
+    +-------------------------------------------------------------+
+
 ### `JSON_KEYS`
 
 **Description**

diff --git a/ppl-spark-integration/src/main/antlr4/OpenSearchPPLLexer.g4 b/ppl-spark-integration/src/main/antlr4/OpenSearchPPLLexer.g4
index d15f5c8e3..a8efffe71 100644
--- a/ppl-spark-integration/src/main/antlr4/OpenSearchPPLLexer.g4
+++ b/ppl-spark-integration/src/main/antlr4/OpenSearchPPLLexer.g4
@@ -385,11 +385,11 @@ JSON_ARRAY: 'JSON_ARRAY';
 JSON_ARRAY_LENGTH: 'JSON_ARRAY_LENGTH';
 TO_JSON_STRING: 'TO_JSON_STRING';
 JSON_EXTRACT: 'JSON_EXTRACT';
+JSON_DELETE : 'JSON_DELETE';
+JSON_EXTEND : 'JSON_EXTEND';
 JSON_KEYS: 'JSON_KEYS';
 JSON_VALID: 'JSON_VALID';
-//JSON_APPEND: 'JSON_APPEND';
-//JSON_DELETE: 'JSON_DELETE';
-//JSON_EXTEND: 'JSON_EXTEND';
+JSON_APPEND: 'JSON_APPEND';
 //JSON_SET: 'JSON_SET';
 //JSON_ARRAY_ALL_MATCH: 'JSON_ARRAY_ALL_MATCH';
 //JSON_ARRAY_ANY_MATCH: 'JSON_ARRAY_ANY_MATCH';

diff --git a/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4 b/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4
index eb6cd1a35..9a0926adf 100644
--- a/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4
+++ b/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4
@@ -880,6 +880,9 @@ jsonFunctionName
     | JSON_ARRAY_LENGTH
     | TO_JSON_STRING
     | JSON_EXTRACT
+    | JSON_DELETE
+    | JSON_EXTEND
+    | JSON_APPEND
     | JSON_KEYS
     | JSON_VALID
 // | JSON_APPEND
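
A quick sketch (not part of any patch) of how the three new lambdas behave when called directly; it mirrors the unit tests added in the next commit and assumes the patch-02 signatures that take plain `java.util.List` (a later commit in this series switches `json_delete` to Scala's `WrappedArray`). The class name `JsonUdfSketch` is illustrative only:

```java
import java.util.List;
import java.util.Map;

import static org.opensearch.sql.expression.function.SerializableUdf.*;

public class JsonUdfSketch {
    public static void main(String[] args) {
        // json_delete: drop a top-level key and a nested key ("b.c")
        String deleted = jsonDeleteFunction.apply(
                "{\"a\":1,\"b\":{\"c\":2,\"d\":3}}", List.of("a", "b.c"));
        System.out.println(deleted); // {"b":{"d":3}}

        // json_append: add one scalar value to the tail of an existing array
        String appended = jsonAppendFunction.apply(
                "{\"arr\":[\"x\"]}", List.of(Map.entry("arr", "y")));
        System.out.println(appended); // {"arr":["x","y"]}

        // json_extend: splice several values into an existing array
        String extended = jsonExtendFunction.apply(
                "{\"arr\":[\"x\"]}", List.of(Map.entry("arr", List.of("y", "z"))));
        System.out.println(extended); // {"arr":["x","y","z"]}
    }
}
```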
From b7b071302ded2f1c642085eafb4fc3afeee2bc9b Mon Sep 17 00:00:00 2001
From: YANGDB
Date: Sun, 8 Dec 2024 19:12:26 -0800
Subject: [PATCH 02/12] add the following JSON functions & tests for
 - json_delete
 - json_append
 - json_extend

Signed-off-by: YANGDB
---
 .../expression/function/SerializableUdf.java  | 203 +++++++++++++++++-
 .../sql/ppl/CatalystExpressionVisitor.java    |  24 +--
 .../ppl/utils/BuiltinFunctionTransformer.java |  16 +-
 ...dfTest.java => SerializableIPUdfTest.java} |  13 +-
 .../function/SerializableJsonUdfTest.java     | 186 ++++++++++++++++
 ...PlanJsonFunctionsTranslatorTestSuite.scala |  44 +++-
 ...PLLogicalPlanParseCidrmatchTestSuite.scala |  44 +---
 7 files changed, 455 insertions(+), 75 deletions(-)
 rename ppl-spark-integration/src/test/java/org/opensearch/sql/expression/function/{SerializableUdfTest.java => SerializableIPUdfTest.java} (87%)
 create mode 100644 ppl-spark-integration/src/test/java/org/opensearch/sql/expression/function/SerializableJsonUdfTest.java

diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/SerializableUdf.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/SerializableUdf.java
index 2541b3743..ea7b7d2dc 100644
--- a/ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/SerializableUdf.java
+++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/SerializableUdf.java
@@ -5,17 +5,161 @@
 package org.opensearch.sql.expression.function;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.module.scala.DefaultScalaModule;
 import inet.ipaddr.AddressStringException;
 import inet.ipaddr.IPAddressString;
 import inet.ipaddr.IPAddressStringParameters;
+import org.apache.spark.sql.catalyst.expressions.Expression;
+import org.apache.spark.sql.catalyst.expressions.ScalaUDF;
+import org.apache.spark.sql.types.DataTypes;
 import scala.Function2;
+import scala.Option;
 import scala.Serializable;
 import scala.runtime.AbstractFunction2;
 
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.opensearch.sql.ppl.utils.DataTypeTransformer.seq;
+
 public interface SerializableUdf {
-    Function2 cidrFunction = new SerializableAbstractFunction2<>() {
+    ObjectMapper objectMapper = new ObjectMapper();
+
+    abstract class SerializableAbstractFunction2 extends AbstractFunction2
+            implements Serializable {
+    }
+
+    /**
+     * Remove specified keys from a JSON string.
+     *
+     * @param jsonStr      The input JSON string.
+     * @param keysToRemove The list of keys to remove.
+     * @return A new JSON string without the specified keys.
+     */
+    Function2, String> jsonDeleteFunction = new SerializableAbstractFunction2<>() {
+        @Override
+        public String apply(String jsonStr, List keysToRemove) {
+            if (jsonStr == null) {
+                return null;
+            }
+            try {
+                Map jsonMap = objectMapper.readValue(jsonStr, Map.class);
+                removeKeys(jsonMap, keysToRemove);
+                return objectMapper.writeValueAsString(jsonMap);
+            } catch (Exception e) {
+                return null;
+            }
+        }
+
+        private void removeKeys(Map map, List keysToRemove) {
+            for (String key : keysToRemove) {
+                String[] keyParts = key.split("\\.");
+                Map currentMap = map;
+                for (int i = 0; i < keyParts.length - 1; i++) {
+                    String currentKey = keyParts[i];
+                    if (currentMap.containsKey(currentKey) && currentMap.get(currentKey) instanceof Map) {
+                        currentMap = (Map) currentMap.get(currentKey);
+                    } else {
+                        return; // Path not found, exit
+                    }
+                }
+                // Remove the final key if it exists
+                currentMap.remove(keyParts[keyParts.length - 1]);
+            }
+        }
+    };
+
+    Function2>, String> jsonAppendFunction = new SerializableAbstractFunction2<>() {
+
+        /**
+         * Append values to JSON arrays based on specified path-value pairs.
+         *
+         * @param jsonStr        The input JSON string.
+         * @param pathValuePairs A list of path-value pairs to append.
+         * @return The updated JSON string.
+         */
+        public String apply(String jsonStr, List> pathValuePairs) {
+            if (jsonStr == null) {
+                return null;
+            }
+            try {
+                Map jsonMap = objectMapper.readValue(jsonStr, Map.class);
+
+                for (Map.Entry pathValuePair : pathValuePairs) {
+                    String path = pathValuePair.getKey();
+                    String value = pathValuePair.getValue();
+
+                    if (jsonMap.containsKey(path) && jsonMap.get(path) instanceof List) {
+                        List existingList = (List) jsonMap.get(path);
+                        // Append the value to the tail of the existing (deserialized) list
+                        existingList.add(value);
+                        jsonMap.put(path, existingList);
+                    } else if (jsonMap.containsKey(path)) {
+                        // Ignore appending if the path is not an array
+                    } else {
+                        jsonMap.put(path, List.of(value));
+                    }
+                }
+
+                return objectMapper.writeValueAsString(jsonMap);
+            } catch (Exception e) {
+                return null; // Return null if parsing fails
+            }
+        }
+    };
+
+    /**
+     * Extend JSON arrays with new values based on specified path-value pairs.
+     *
+     * @param jsonStr        The input JSON string.
+     * @param pathValuePairs A list of path-value pairs to extend.
+     * @return The updated JSON string.
+ */ + Function2>>, String> jsonExtendFunction = new SerializableAbstractFunction2<>() { + + @Override + public String apply(String jsonStr, List>> pathValuePairs) { + if (jsonStr == null) { + return null; + } + try { + Map jsonMap = objectMapper.readValue(jsonStr, Map.class); + + for (Map.Entry> pathValuePair : pathValuePairs) { + String path = pathValuePair.getKey(); + List values = pathValuePair.getValue(); + + if (jsonMap.containsKey(path) && jsonMap.get(path) instanceof List) { + List existingList = (List) jsonMap.get(path); + existingList.addAll(values); + } else { + jsonMap.put(path, values); + } + } + + return objectMapper.writeValueAsString(jsonMap); + } catch (Exception e) { + return null; // Return null if parsing fails + } + } + }; + + /** + * Check if a key matches the given path expression. + * + * @param key The key to check. + * @param path The path expression (e.g., "a.b"). + * @return True if the key matches, false otherwise. + */ + private static boolean matchesKey(String key, String path) { + return key.equals(path) || key.startsWith(path + "."); + } + + Function2 cidrFunction = new SerializableAbstractFunction2<>() { IPAddressStringParameters valOptions = new IPAddressStringParameters.Builder() .allowEmpty(false) @@ -32,7 +176,7 @@ public Boolean apply(String ipAddress, String cidrBlock) { try { parsedIpAddress.validate(); } catch (AddressStringException e) { - throw new RuntimeException("The given ipAddress '"+ipAddress+"' is invalid. It must be a valid IPv4 or IPv6 address. Error details: "+e.getMessage()); + throw new RuntimeException("The given ipAddress '" + ipAddress + "' is invalid. It must be a valid IPv4 or IPv6 address. Error details: " + e.getMessage()); } IPAddressString parsedCidrBlock = new IPAddressString(cidrBlock, valOptions); @@ -40,18 +184,63 @@ public Boolean apply(String ipAddress, String cidrBlock) { try { parsedCidrBlock.validate(); } catch (AddressStringException e) { - throw new RuntimeException("The given cidrBlock '"+cidrBlock+"' is invalid. It must be a valid CIDR or netmask. Error details: "+e.getMessage()); + throw new RuntimeException("The given cidrBlock '" + cidrBlock + "' is invalid. It must be a valid CIDR or netmask. Error details: " + e.getMessage()); } - if(parsedIpAddress.isIPv4() && parsedCidrBlock.isIPv6() || parsedIpAddress.isIPv6() && parsedCidrBlock.isIPv4()) { - throw new RuntimeException("The given ipAddress '"+ipAddress+"' and cidrBlock '"+cidrBlock+"' are not compatible. Both must be either IPv4 or IPv6."); + if (parsedIpAddress.isIPv4() && parsedCidrBlock.isIPv6() || parsedIpAddress.isIPv6() && parsedCidrBlock.isIPv4()) { + throw new RuntimeException("The given ipAddress '" + ipAddress + "' and cidrBlock '" + cidrBlock + "' are not compatible. 
Both must be either IPv4 or IPv6."); } return parsedCidrBlock.contains(parsedIpAddress); } }; - abstract class SerializableAbstractFunction2 extends AbstractFunction2 - implements Serializable { + /** + * get the function reference according to its name + * + * @param funcName + * @return + */ + static ScalaUDF visit(String funcName, List expressions) { + switch (funcName) { + case "cidr": + return new ScalaUDF(cidrFunction, + DataTypes.BooleanType, + seq(expressions), + seq(), + Option.empty(), + Option.apply("cidr"), + false, + true); + case "json_delete": + return new ScalaUDF(jsonDeleteFunction, + DataTypes.StringType, + seq(expressions), + seq(), + Option.empty(), + Option.apply("json_delete"), + false, + true); + case "json_extend": + return new ScalaUDF(jsonExtendFunction, + DataTypes.StringType, + seq(expressions), + seq(), + Option.empty(), + Option.apply("json_extend"), + false, + true); + case "json_append": + return new ScalaUDF(jsonAppendFunction, + DataTypes.StringType, + seq(expressions), + seq(), + Option.empty(), + Option.apply("json_append"), + false, + true); + default: + return null; + } } } diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystExpressionVisitor.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystExpressionVisitor.java index 35ac7ed47..bababe79e 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystExpressionVisitor.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystExpressionVisitor.java @@ -11,29 +11,21 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedStar$; import org.apache.spark.sql.catalyst.expressions.CaseWhen; import org.apache.spark.sql.catalyst.expressions.Cast$; -import org.apache.spark.sql.catalyst.expressions.CurrentRow$; import org.apache.spark.sql.catalyst.expressions.Exists$; import org.apache.spark.sql.catalyst.expressions.Expression; import org.apache.spark.sql.catalyst.expressions.GreaterThanOrEqual; import org.apache.spark.sql.catalyst.expressions.In$; import org.apache.spark.sql.catalyst.expressions.InSubquery$; import org.apache.spark.sql.catalyst.expressions.LambdaFunction$; -import org.apache.spark.sql.catalyst.expressions.LessThan; import org.apache.spark.sql.catalyst.expressions.LessThanOrEqual; import org.apache.spark.sql.catalyst.expressions.ListQuery$; import org.apache.spark.sql.catalyst.expressions.MakeInterval$; import org.apache.spark.sql.catalyst.expressions.NamedExpression; import org.apache.spark.sql.catalyst.expressions.Predicate; -import org.apache.spark.sql.catalyst.expressions.RowFrame$; -import org.apache.spark.sql.catalyst.expressions.ScalaUDF; import org.apache.spark.sql.catalyst.expressions.ScalarSubquery$; import org.apache.spark.sql.catalyst.expressions.UnresolvedNamedLambdaVariable; import org.apache.spark.sql.catalyst.expressions.UnresolvedNamedLambdaVariable$; -import org.apache.spark.sql.catalyst.expressions.SpecifiedWindowFrame; -import org.apache.spark.sql.catalyst.expressions.WindowExpression; -import org.apache.spark.sql.catalyst.expressions.WindowSpecDefinition; import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan; -import org.apache.spark.sql.types.DataTypes; import org.opensearch.sql.ast.AbstractNodeVisitor; import org.opensearch.sql.ast.expression.AggregateFunction; import org.opensearch.sql.ast.expression.Alias; @@ -44,7 +36,6 @@ import org.opensearch.sql.ast.expression.Case; import org.opensearch.sql.ast.expression.Cast; import org.opensearch.sql.ast.expression.Compare; 
-import org.opensearch.sql.ast.expression.DataType; import org.opensearch.sql.ast.expression.FieldsMapping; import org.opensearch.sql.ast.expression.Function; import org.opensearch.sql.ast.expression.In; @@ -68,9 +59,7 @@ import org.opensearch.sql.ast.tree.FillNull; import org.opensearch.sql.ast.tree.Kmeans; import org.opensearch.sql.ast.tree.RareTopN; -import org.opensearch.sql.ast.tree.Trendline; import org.opensearch.sql.ast.tree.UnresolvedPlan; -import org.opensearch.sql.expression.function.BuiltinFunctionName; import org.opensearch.sql.expression.function.SerializableUdf; import org.opensearch.sql.ppl.utils.AggregatorTransformer; import org.opensearch.sql.ppl.utils.BuiltinFunctionTransformer; @@ -89,6 +78,7 @@ import java.util.stream.Collectors; import static java.util.Collections.emptyList; +import static java.util.List.of; import static org.opensearch.sql.expression.function.BuiltinFunctionName.EQUAL; import static org.opensearch.sql.ppl.CatalystPlanContext.findRelation; import static org.opensearch.sql.ppl.utils.BuiltinFunctionTransformer.createIntervalArgs; @@ -438,17 +428,7 @@ public Expression visitCidr(org.opensearch.sql.ast.expression.Cidr node, Catalys Expression ipAddressExpression = context.getNamedParseExpressions().pop(); analyze(node.getCidrBlock(), context); Expression cidrBlockExpression = context.getNamedParseExpressions().pop(); - - ScalaUDF udf = new ScalaUDF(SerializableUdf.cidrFunction, - DataTypes.BooleanType, - seq(ipAddressExpression,cidrBlockExpression), - seq(), - Option.empty(), - Option.apply("cidr"), - false, - true); - - return context.getNamedParseExpressions().push(udf); + return context.getNamedParseExpressions().push(SerializableUdf.visit("cidr", of(ipAddressExpression,cidrBlockExpression))); } @Override diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/BuiltinFunctionTransformer.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/BuiltinFunctionTransformer.java index 0a4f19b53..f73a1c491 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/BuiltinFunctionTransformer.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/BuiltinFunctionTransformer.java @@ -13,12 +13,15 @@ import org.apache.spark.sql.catalyst.expressions.DateAddInterval$; import org.apache.spark.sql.catalyst.expressions.Expression; import org.apache.spark.sql.catalyst.expressions.Literal$; +import org.apache.spark.sql.catalyst.expressions.ScalaUDF; import org.apache.spark.sql.catalyst.expressions.TimestampAdd$; import org.apache.spark.sql.catalyst.expressions.TimestampDiff$; import org.apache.spark.sql.catalyst.expressions.ToUTCTimestamp$; import org.apache.spark.sql.catalyst.expressions.UnaryMinus$; import org.opensearch.sql.ast.expression.IntervalUnit; import org.opensearch.sql.expression.function.BuiltinFunctionName; +import org.opensearch.sql.expression.function.SerializableUdf; +import org.opensearch.sql.ppl.CatalystPlanContext; import scala.Option; import java.util.Arrays; @@ -26,7 +29,6 @@ import java.util.Map; import java.util.function.Function; -import static org.opensearch.flint.spark.ppl.OpenSearchPPLLexer.DISTINCT_COUNT_APPROX; import static org.opensearch.sql.expression.function.BuiltinFunctionName.ADD; import static org.opensearch.sql.expression.function.BuiltinFunctionName.ADDDATE; import static org.opensearch.sql.expression.function.BuiltinFunctionName.APPROX_COUNT_DISTINCT; @@ -76,7 +78,7 @@ public interface BuiltinFunctionTransformer { * This is only used for the built-in 
functions between PPL and Spark with different names. * If the built-in function names are the same in PPL and Spark, add it to {@link BuiltinFunctionName} only. */ - static final Map SPARK_BUILTIN_FUNCTION_NAME_MAPPING + Map SPARK_BUILTIN_FUNCTION_NAME_MAPPING = ImmutableMap.builder() // arithmetic operators .put(ADD, "+") @@ -117,7 +119,7 @@ public interface BuiltinFunctionTransformer { /** * The name mapping between PPL builtin functions to Spark builtin functions. */ - static final Map, Expression>> PPL_TO_SPARK_FUNC_MAPPING + Map, Expression>> PPL_TO_SPARK_FUNC_MAPPING = ImmutableMap., Expression>>builder() // json functions .put( @@ -176,9 +178,11 @@ public interface BuiltinFunctionTransformer { static Expression builtinFunction(org.opensearch.sql.ast.expression.Function function, List args) { if (BuiltinFunctionName.of(function.getFuncName()).isEmpty()) { - // TODO change it when UDF is supported - // TODO should we support more functions which are not PPL builtin functions. E.g Spark builtin functions - throw new UnsupportedOperationException(function.getFuncName() + " is not a builtin function of PPL"); + ScalaUDF udf = SerializableUdf.visit(function.getFuncName(), args); + if(udf == null) { + throw new UnsupportedOperationException(function.getFuncName() + " is not a builtin function of PPL"); + } + return udf; } else { BuiltinFunctionName builtin = BuiltinFunctionName.of(function.getFuncName()).get(); String name = SPARK_BUILTIN_FUNCTION_NAME_MAPPING.get(builtin); diff --git a/ppl-spark-integration/src/test/java/org/opensearch/sql/expression/function/SerializableUdfTest.java b/ppl-spark-integration/src/test/java/org/opensearch/sql/expression/function/SerializableIPUdfTest.java similarity index 87% rename from ppl-spark-integration/src/test/java/org/opensearch/sql/expression/function/SerializableUdfTest.java rename to ppl-spark-integration/src/test/java/org/opensearch/sql/expression/function/SerializableIPUdfTest.java index 3d3940730..c11c832c3 100644 --- a/ppl-spark-integration/src/test/java/org/opensearch/sql/expression/function/SerializableUdfTest.java +++ b/ppl-spark-integration/src/test/java/org/opensearch/sql/expression/function/SerializableIPUdfTest.java @@ -1,9 +1,20 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ package org.opensearch.sql.expression.function; import org.junit.Assert; import org.junit.Test; -public class SerializableUdfTest { +import java.util.Arrays; +import java.util.Collections; + +import static java.util.Collections.singletonList; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +public class SerializableIPUdfTest { @Test(expected = RuntimeException.class) public void cidrNullIpTest() { diff --git a/ppl-spark-integration/src/test/java/org/opensearch/sql/expression/function/SerializableJsonUdfTest.java b/ppl-spark-integration/src/test/java/org/opensearch/sql/expression/function/SerializableJsonUdfTest.java new file mode 100644 index 000000000..2c7080622 --- /dev/null +++ b/ppl-spark-integration/src/test/java/org/opensearch/sql/expression/function/SerializableJsonUdfTest.java @@ -0,0 +1,186 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.sql.expression.function; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import static java.util.Collections.singletonList; +import static 
org.apache.derby.vti.XmlVTI.asList; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.opensearch.sql.expression.function.SerializableUdf.jsonAppendFunction; +import static org.opensearch.sql.expression.function.SerializableUdf.jsonDeleteFunction; +import static org.opensearch.sql.expression.function.SerializableUdf.jsonExtendFunction; + +public class SerializableJsonUdfTest { + + @Test + public void testJsonDeleteFunctionRemoveSingleKey() { + String jsonStr = "{\"key1\":\"value1\",\"key2\":\"value2\",\"key3\":\"value3\"}"; + String expectedJson = "{\"key1\":\"value1\",\"key3\":\"value3\"}"; + String result = jsonDeleteFunction.apply(jsonStr, singletonList("key2")); + assertEquals(expectedJson, result); + } + + @Test + public void testJsonDeleteFunctionRemoveNestedKey() { + // Correctly escape double quotes within the JSON string + String jsonStr = "{\"key1\":\"value1\",\"key2\":{ \"key3\":\"value3\",\"key4\":\"value4\" }}"; + String expectedJson = "{\"key1\":\"value1\",\"key2\":{\"key4\":\"value4\"}}"; + String result = jsonDeleteFunction.apply(jsonStr, singletonList("key2.key3")); + assertEquals(expectedJson, result); + } + + @Test + public void testJsonDeleteFunctionRemoveSingleArrayedKey() { + String jsonStr = "{\"key1\":\"value1\",\"key2\":\"value2\",\"keyArray\":[\"value1\",\"value2\"]}"; + String expectedJson = "{\"key1\":\"value1\",\"key2\":\"value2\"}"; + String result = jsonDeleteFunction.apply(jsonStr, singletonList("keyArray")); + assertEquals(expectedJson, result); + } + + @Test + public void testJsonDeleteFunctionRemoveMultipleKeys() { + String jsonStr = "{\"key1\":\"value1\",\"key2\":\"value2\",\"key3\":\"value3\"}"; + String expectedJson = "{\"key3\":\"value3\"}"; + String result = jsonDeleteFunction.apply(jsonStr, Arrays.asList("key1", "key2")); + assertEquals(expectedJson, result); + } + + @Test + public void testJsonDeleteFunctionRemoveMultipleSomeAreNestedKeys() { + String jsonStr = "{\"key1\":\"value1\",\"key2\":{ \"key3\":\"value3\",\"key4\":\"value4\" }}"; + String expectedJson = "{\"key2\":{\"key3\":\"value3\"}}"; + String result = jsonDeleteFunction.apply(jsonStr, Arrays.asList("key1", "key2.key4")); + assertEquals(expectedJson, result); + } + + @Test + public void testJsonDeleteFunctionNoKeysRemoved() { + String jsonStr = "{\"key1\":\"value1\",\"key2\":\"value2\"}"; + String result = jsonDeleteFunction.apply(jsonStr, Collections.emptyList()); + assertEquals(jsonStr, result); + } + + @Test + public void testJsonDeleteFunctionNullJson() { + String result = jsonDeleteFunction.apply(null, Collections.singletonList("key1")); + assertNull(result); + } + + @Test + public void testJsonDeleteFunctionInvalidJson() { + String invalidJson = "invalid_json"; + String result = jsonDeleteFunction.apply(invalidJson, Collections.singletonList("key1")); + assertNull(result); + } + + @Test + public void testJsonAppendFunctionAppendToExistingArray() { + String jsonStr = "{\"arrayKey\":[\"value1\",\"value2\"]}"; + String expectedJson = "{\"arrayKey\":[\"value1\",\"value2\",\"value3\"]}"; + Map.Entry pair = Map.entry("arrayKey", "value3"); + String result = jsonAppendFunction.apply(jsonStr, Collections.singletonList(pair)); + assertEquals(expectedJson, result); + } + + @Test + public void testJsonAppendFunctionAddNewArray() { + String jsonStr = "{\"key1\":\"value1\"}"; + String expectedJson = "{\"key1\":\"value1\",\"newArray\":[\"newValue\"]}"; + Map.Entry pair = Map.entry("newArray", "newValue"); + String result = 
jsonAppendFunction.apply(jsonStr, Collections.singletonList(pair)); + assertEquals(expectedJson, result); + } + + @Test + public void testJsonAppendFunctionIgnoreNonArrayKey() { + String jsonStr = "{\"key1\":\"value1\"}"; + String expectedJson = jsonStr; + Map.Entry pair = Map.entry("key1", "newValue"); + String result = jsonAppendFunction.apply(jsonStr, Collections.singletonList(pair)); + assertEquals(expectedJson, result); + } + + @Test + public void testJsonAppendFunctionMultipleAppends() { + String jsonStr = "{\"arrayKey\":[\"value1\"]}"; + String expectedJson = "{\"arrayKey\":[\"value1\",\"value2\",\"value3\"],\"newKey\":[\"newValue\"]}"; + List> pairs = Arrays.asList( + Map.entry("arrayKey", "value2"), + Map.entry("arrayKey", "value3"), + Map.entry("newKey", "newValue") + ); + String result = jsonAppendFunction.apply(jsonStr, pairs); + assertEquals(expectedJson, result); + } + + @Test + public void testJsonAppendFunctionNullJson() { + String result = jsonAppendFunction.apply(null, Collections.singletonList(Map.entry("key", "value"))); + assertNull(result); + } + + @Test + public void testJsonAppendFunctionInvalidJson() { + String invalidJson = "invalid_json"; + String result = jsonAppendFunction.apply(invalidJson, Collections.singletonList(Map.entry("key", "value"))); + assertNull(result); + } + + @Test + public void testJsonExtendFunctionWithExistingPath() { + String jsonStr = "{\"path1\": [\"value1\", \"value2\"]}"; + List>> pathValuePairs = new ArrayList<>(); + pathValuePairs.add(Map.entry("path1", asList("value3", "value4"))); + + String result = jsonExtendFunction.apply(jsonStr, pathValuePairs); + String expectedJson = "{\"path1\":[\"value1\",\"value2\",\"value3\",\"value4\"]}"; + + assertEquals(expectedJson, result); + } + + @Test + public void testJsonExtendFunctionWithNewPath() { + String jsonStr = "{\"path1\": [\"value1\"]}"; + List>> pathValuePairs = new ArrayList<>(); + pathValuePairs.add(Map.entry("path2", asList("newValue1", "newValue2"))); + + String result = jsonExtendFunction.apply(jsonStr, pathValuePairs); + String expectedJson = "{\"path1\":[\"value1\"],\"path2\":[\"newValue1\",\"newValue2\"]}"; + + assertEquals(expectedJson, result); + } + + @Test + public void testJsonExtendFunctionWithNullInput() { + String result = jsonExtendFunction.apply(null, Collections.emptyList()); + assertNull(result); + } + + @Test + public void testJsonExtendFunctionWithInvalidJson() { + String result = jsonExtendFunction.apply("invalid json", Collections.emptyList()); + assertNull(result); + } + + @Test + public void testJsonExtendFunctionWithNonArrayPath() { + String jsonStr = "{\"path1\":\"value1\"}"; + List>> pathValuePairs = new ArrayList<>(); + pathValuePairs.add(Map.entry("path1", asList("value2"))); + + String result = jsonExtendFunction.apply(jsonStr, pathValuePairs); + String expectedJson = "{\"path1\":[\"value2\"]}"; + + assertEquals(expectedJson, result); + } +} diff --git a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanJsonFunctionsTranslatorTestSuite.scala b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanJsonFunctionsTranslatorTestSuite.scala index 6193bc43f..34d0133e0 100644 --- a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanJsonFunctionsTranslatorTestSuite.scala +++ b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanJsonFunctionsTranslatorTestSuite.scala @@ -8,12 +8,17 @@ package org.opensearch.flint.spark.ppl import 
org.opensearch.flint.spark.ppl.PlaneUtils.plan import org.opensearch.sql.ppl.{CatalystPlanContext, CatalystQueryPlanVisitor} import org.scalatest.matchers.should.Matchers - import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar} import org.apache.spark.sql.catalyst.expressions.{EqualTo, Literal} import org.apache.spark.sql.catalyst.plans.PlanTest import org.apache.spark.sql.catalyst.plans.logical.{Filter, Project} +import org.apache.spark.sql.types.DataTypes +import org.opensearch.sql.expression.function.SerializableUdf +import org.opensearch.sql.expression.function.SerializableUdf.visit +import org.opensearch.sql.ppl.utils.DataTypeTransformer.seq + +import java.util class PPLLogicalPlanJsonFunctionsTranslatorTestSuite extends SparkFunSuite @@ -185,6 +190,43 @@ class PPLLogicalPlanJsonFunctionsTranslatorTestSuite comparePlans(expectedPlan, logPlan, false) } + test("test json_delete()") { + val context = new CatalystPlanContext + val logPlan = + planTransformer.visit( + plan(pplParser, """source=t a = json_delete('{"a":[{"b":1},{"c":2}]}', ["a.b"])"""), + context) + + val table = UnresolvedRelation(Seq("t")) + val keysExpression = Literal("[a.b]") + val jsonObjExp = Literal("""{"a":[{"b":1},{"c":2}]}""") + val jsonFunc = visit("json_delete", util.List.of(jsonObjExp, keysExpression)) + val filterExpr = EqualTo(UnresolvedAttribute("a"), jsonFunc) + val filterPlan = Filter(filterExpr, table) + val projectList = Seq(UnresolvedStar(None)) + val expectedPlan = Project(projectList, filterPlan) + comparePlans(expectedPlan, logPlan, false) + } + + + test("test json_append()") { + val context = new CatalystPlanContext + val logPlan = + planTransformer.visit( + plan(pplParser, """source=t a = json_append('{"a":[{"b":1},{"c":2}]}', 'a.b')"""), + context) + + val table = UnresolvedRelation(Seq("t")) + val keysExpression = Literal("a.b") + val jsonObjExp = Literal("""{"a":[{"b":1},{"c":2}]}""") + val jsonFunc = visit("json_delete", util.List.of(jsonObjExp, keysExpression)) + val filterExpr = EqualTo(UnresolvedAttribute("a"), jsonFunc) + val filterPlan = Filter(filterExpr, table) + val projectList = Seq(UnresolvedStar(None)) + val expectedPlan = Project(projectList, filterPlan) + comparePlans(expectedPlan, logPlan, false) + } + test("test json_keys()") { val context = new CatalystPlanContext val logPlan = diff --git a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanParseCidrmatchTestSuite.scala b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanParseCidrmatchTestSuite.scala index 213f201cc..d14b0fee1 100644 --- a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanParseCidrmatchTestSuite.scala +++ b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanParseCidrmatchTestSuite.scala @@ -10,13 +10,13 @@ import org.opensearch.sql.expression.function.SerializableUdf import org.opensearch.sql.ppl.{CatalystPlanContext, CatalystQueryPlanVisitor} import org.opensearch.sql.ppl.utils.DataTypeTransformer.seq import org.scalatest.matchers.should.Matchers - import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar} -import org.apache.spark.sql.catalyst.expressions.{Alias, And, Ascending, CaseWhen, Descending, EqualTo, GreaterThan, Literal, NullsFirst, NullsLast, RegExpExtract, ScalaUDF, 
SortOrder} +import org.apache.spark.sql.catalyst.expressions.{Alias, And, Ascending, CaseWhen, Descending, EqualTo, GreaterThan, Literal, NullsFirst, NullsLast, RegExpExtract, SortOrder} import org.apache.spark.sql.catalyst.plans.PlanTest import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.types.DataTypes +import org.opensearch.sql.expression.function.SerializableUdf.visit class PPLLogicalPlanParseCidrmatchTestSuite extends SparkFunSuite @@ -41,15 +41,7 @@ class PPLLogicalPlanParseCidrmatchTestSuite val filterIpv6 = EqualTo(UnresolvedAttribute("isV6"), Literal(false)) val filterIsValid = EqualTo(UnresolvedAttribute("isValid"), Literal(true)) - val cidr = ScalaUDF( - SerializableUdf.cidrFunction, - DataTypes.BooleanType, - seq(ipAddress, cidrExpression), - seq(), - Option.empty, - Option.apply("cidr"), - false, - true) + val cidr = visit("cidr", java.util.List.of(ipAddress, cidrExpression)) val expectedPlan = Project( Seq(UnresolvedStar(None)), @@ -71,15 +63,7 @@ class PPLLogicalPlanParseCidrmatchTestSuite val filterIpv6 = EqualTo(UnresolvedAttribute("isV6"), Literal(true)) val filterIsValid = EqualTo(UnresolvedAttribute("isValid"), Literal(false)) - val cidr = ScalaUDF( - SerializableUdf.cidrFunction, - DataTypes.BooleanType, - seq(ipAddress, cidrExpression), - seq(), - Option.empty, - Option.apply("cidr"), - false, - true) + val cidr = visit("cidr", java.util.List.of(ipAddress, cidrExpression)) val expectedPlan = Project( Seq(UnresolvedStar(None)), @@ -100,15 +84,7 @@ class PPLLogicalPlanParseCidrmatchTestSuite val cidrExpression = Literal("2003:db8::/32") val filterIpv6 = EqualTo(UnresolvedAttribute("isV6"), Literal(true)) - val cidr = ScalaUDF( - SerializableUdf.cidrFunction, - DataTypes.BooleanType, - seq(ipAddress, cidrExpression), - seq(), - Option.empty, - Option.apply("cidr"), - false, - true) + val cidr = visit("cidr", java.util.List.of(ipAddress, cidrExpression)) val expectedPlan = Project( Seq(UnresolvedAttribute("ip")), @@ -130,15 +106,7 @@ class PPLLogicalPlanParseCidrmatchTestSuite val filterIpv6 = EqualTo(UnresolvedAttribute("isV6"), Literal(true)) val filterClause = Filter(filterIpv6, UnresolvedRelation(Seq("t"))) - val cidr = ScalaUDF( - SerializableUdf.cidrFunction, - DataTypes.BooleanType, - seq(ipAddress, cidrExpression), - seq(), - Option.empty, - Option.apply("cidr"), - false, - true) + val cidr = visit("cidr", java.util.List.of(ipAddress, cidrExpression)) val equalTo = EqualTo(Literal(true), cidr) val caseFunction = CaseWhen(Seq((equalTo, Literal("in"))), Literal("out")) From af8144a3005d879cb9991c739099f91b66c04187 Mon Sep 17 00:00:00 2001 From: YANGDB Date: Sun, 8 Dec 2024 20:08:06 -0800 Subject: [PATCH 03/12] update tests & scala-fmt Signed-off-by: YANGDB --- docs/ppl-lang/functions/ppl-json.md | 6 +- .../function/SerializableJsonUdfTest.java | 66 +++++++++++++++++++ ...PlanJsonFunctionsTranslatorTestSuite.scala | 42 ++++++++---- ...PLLogicalPlanParseCidrmatchTestSuite.scala | 3 +- 4 files changed, 102 insertions(+), 15 deletions(-) diff --git a/docs/ppl-lang/functions/ppl-json.md b/docs/ppl-lang/functions/ppl-json.md index 27cc33d30..627eb110c 100644 --- a/docs/ppl-lang/functions/ppl-json.md +++ b/docs/ppl-lang/functions/ppl-json.md @@ -269,7 +269,7 @@ Example: | {"a":["valueA", "valueB", "valueC"]} | +-------------------------------------------------+ - os> source=people | eval append = json_append(`{"a":["valueA", "valueB"]}`, {"a":["valueC"]}) + os> source=people | eval append = json_append(`{"a":["valueA", "valueB"]}`, ['a', 
{"a":["valueC"]}]) fetched rows / total rows = 1/1 +-----------------------------------------------+ | append | @@ -304,7 +304,7 @@ Extend arrays as individual values separates the `json_extend` functionality fro Example: - os> source=people | eval extend = json_extend(`{"a":["valueA", "valueB"]}`, ["valueC","valueD"]) + os> source=people | eval extend = json_extend(`{"a":["valueA", "valueB"]}`,['a', ["valueC","valueD"]]) fetched rows / total rows = 1/1 +-------------------------------------------------+ | extend | @@ -312,7 +312,7 @@ Example: | {"a":["valueA", "valueB", "valueC", "valueD"]} | +-------------------------------------------------+ - os> source=people | eval extend = json_extend(`{"a":["valueA", "valueB"]}`, {"b":["valueC","valueD"]}) + os> source=people | eval extend = json_extend(`{"a":["valueA", "valueB"]}`,['a',[{"b":["valueC","valueD"]}]]) fetched rows / total rows = 1/1 +-------------------------------------------------------------+ | extend | diff --git a/ppl-spark-integration/src/test/java/org/opensearch/sql/expression/function/SerializableJsonUdfTest.java b/ppl-spark-integration/src/test/java/org/opensearch/sql/expression/function/SerializableJsonUdfTest.java index 2c7080622..996569611 100644 --- a/ppl-spark-integration/src/test/java/org/opensearch/sql/expression/function/SerializableJsonUdfTest.java +++ b/ppl-spark-integration/src/test/java/org/opensearch/sql/expression/function/SerializableJsonUdfTest.java @@ -183,4 +183,70 @@ public void testJsonExtendFunctionWithNonArrayPath() { assertEquals(expectedJson, result); } + + @Test + public void testJsonExtendFunctionAddValuesToExistingArray() { + // Initial JSON string + String jsonStr = "{\"key1\":\"value1\",\"key2\":[\"value2\"]}"; + + // Path-value pairs to extend + List>> pathValuePairs = new ArrayList<>(); + pathValuePairs.add( Map.entry("key2", Arrays.asList("value3", "value4"))); + + // Expected JSON after extension + String expectedJson = "{\"key1\":\"value1\",\"key2\":[\"value2\",\"value3\",\"value4\"]}"; + + // Apply the function + String result = jsonExtendFunction.apply(jsonStr, pathValuePairs); + + // Assert that the result matches the expected JSON + assertEquals(expectedJson, result); + } + + @Test + public void testJsonExtendFunctionAddNewArray() { + // Initial JSON string + String jsonStr = "{\"key1\":\"value1\"}"; + + // Path-value pairs to add + List>> pathValuePairs = new ArrayList<>(); + pathValuePairs.add( Map.entry("key2", Arrays.asList("value2", "value3"))); + + // Expected JSON after adding new array + String expectedJson = "{\"key1\":\"value1\",\"key2\":[\"value2\",\"value3\"]}"; + + // Apply the function + String result = jsonExtendFunction.apply(jsonStr, pathValuePairs); + + // Assert that the result matches the expected JSON + assertEquals(expectedJson, result); + } + + @Test + public void testJsonExtendFunctionHandleEmptyValues() { + // Initial JSON string + String jsonStr = "{\"key1\":\"value1\",\"key2\":[\"value2\"]}"; + + // Path-value pairs with an empty list of values to add + List>> pathValuePairs = new ArrayList<>(); + pathValuePairs.add( Map.entry("key2", Collections.emptyList())); + + // Expected JSON should remain unchanged + String expectedJson = "{\"key1\":\"value1\",\"key2\":[\"value2\"]}"; + + // Apply the function + String result = jsonExtendFunction.apply(jsonStr, pathValuePairs); + + // Assert that the result matches the expected JSON + assertEquals(expectedJson, result); + } + + @Test + public void testJsonExtendFunctionHandleNullInput() { + // Apply the function with 
null input + String result = jsonExtendFunction.apply(null, Collections.singletonList( Map.entry("key2", List.of("value2")))); + + // Assert that the result is null + assertEquals(null, result); + } } diff --git a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanJsonFunctionsTranslatorTestSuite.scala b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanJsonFunctionsTranslatorTestSuite.scala index 34d0133e0..8e36395b3 100644 --- a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanJsonFunctionsTranslatorTestSuite.scala +++ b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanJsonFunctionsTranslatorTestSuite.scala @@ -5,20 +5,21 @@ package org.opensearch.flint.spark.ppl +import java.util + import org.opensearch.flint.spark.ppl.PlaneUtils.plan +import org.opensearch.sql.expression.function.SerializableUdf +import org.opensearch.sql.expression.function.SerializableUdf.visit import org.opensearch.sql.ppl.{CatalystPlanContext, CatalystQueryPlanVisitor} +import org.opensearch.sql.ppl.utils.DataTypeTransformer.seq import org.scalatest.matchers.should.Matchers + import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar} import org.apache.spark.sql.catalyst.expressions.{EqualTo, Literal} import org.apache.spark.sql.catalyst.plans.PlanTest import org.apache.spark.sql.catalyst.plans.logical.{Filter, Project} import org.apache.spark.sql.types.DataTypes -import org.opensearch.sql.expression.function.SerializableUdf -import org.opensearch.sql.expression.function.SerializableUdf.visit -import org.opensearch.sql.ppl.utils.DataTypeTransformer.seq - -import java.util class PPLLogicalPlanJsonFunctionsTranslatorTestSuite extends SparkFunSuite @@ -194,11 +195,11 @@ class PPLLogicalPlanJsonFunctionsTranslatorTestSuite val context = new CatalystPlanContext val logPlan = planTransformer.visit( - plan(pplParser, """source=t a = json_delete('{"a":[{"b":1},{"c":2}]}', ["a.b"])"""), + plan(pplParser, """source=t a = json_delete('{"a":[{"b":1},{"c":2}]}', '["a.b"]')"""), context) val table = UnresolvedRelation(Seq("t")) - val keysExpression = Literal("[a.b]") + val keysExpression = Literal("""["a.b"]""") val jsonObjExp = Literal("""{"a":[{"b":1},{"c":2}]}""") val jsonFunc = visit("json_delete", util.List.of(jsonObjExp, keysExpression)) val filterExpr = EqualTo(UnresolvedAttribute("a"), jsonFunc) @@ -208,18 +209,37 @@ class PPLLogicalPlanJsonFunctionsTranslatorTestSuite comparePlans(expectedPlan, logPlan, false) } - test("test json_append()") { val context = new CatalystPlanContext val logPlan = planTransformer.visit( - plan(pplParser, """source=t a = json_append('{"a":[{"b":1},{"c":2}]}', 'a.b')"""), + plan(pplParser, """source=t a = json_append('{"a":[1,2]}', '["a",3]')"""), + context) + + val table = UnresolvedRelation(Seq("t")) + val keysExpression = Literal("""["a",3]""") + val jsonObjExp = Literal("""{"a":[1,2]}""") + val jsonFunc = visit("json_append", util.List.of(jsonObjExp, keysExpression)) + val filterExpr = EqualTo(UnresolvedAttribute("a"), jsonFunc) + val filterPlan = Filter(filterExpr, table) + val projectList = Seq(UnresolvedStar(None)) + val expectedPlan = Project(projectList, filterPlan) + comparePlans(expectedPlan, logPlan, false) + } + + test("test json_extend()") { + val context = new CatalystPlanContext + val logPlan = + planTransformer.visit( + plan( + pplParser, + 
"""source=t a = json_extend('{"a":[{"b":1},{"c":2}]}', '["a",{"c":2}]')"""), context) val table = UnresolvedRelation(Seq("t")) - val keysExpression = Literal("a.b") + val keysExpression = Literal("""["a",{"c":2}]""") val jsonObjExp = Literal("""{"a":[{"b":1},{"c":2}]}""") - val jsonFunc = visit("json_delete", util.List.of(jsonObjExp, keysExpression)) + val jsonFunc = visit("json_extend", util.List.of(jsonObjExp, keysExpression)) val filterExpr = EqualTo(UnresolvedAttribute("a"), jsonFunc) val filterPlan = Filter(filterExpr, table) val projectList = Seq(UnresolvedStar(None)) diff --git a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanParseCidrmatchTestSuite.scala b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanParseCidrmatchTestSuite.scala index d14b0fee1..c8a8a67ad 100644 --- a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanParseCidrmatchTestSuite.scala +++ b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanParseCidrmatchTestSuite.scala @@ -7,16 +7,17 @@ package org.opensearch.flint.spark.ppl import org.opensearch.flint.spark.ppl.PlaneUtils.plan import org.opensearch.sql.expression.function.SerializableUdf +import org.opensearch.sql.expression.function.SerializableUdf.visit import org.opensearch.sql.ppl.{CatalystPlanContext, CatalystQueryPlanVisitor} import org.opensearch.sql.ppl.utils.DataTypeTransformer.seq import org.scalatest.matchers.should.Matchers + import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar} import org.apache.spark.sql.catalyst.expressions.{Alias, And, Ascending, CaseWhen, Descending, EqualTo, GreaterThan, Literal, NullsFirst, NullsLast, RegExpExtract, SortOrder} import org.apache.spark.sql.catalyst.plans.PlanTest import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.types.DataTypes -import org.opensearch.sql.expression.function.SerializableUdf.visit class PPLLogicalPlanParseCidrmatchTestSuite extends SparkFunSuite From bb8bd30a455ceefb8f5a134c2d14555320c4f668 Mon Sep 17 00:00:00 2001 From: YANGDB Date: Mon, 9 Dec 2024 13:03:41 -0800 Subject: [PATCH 04/12] update IT tests for `json_delete` Signed-off-by: YANGDB --- .../FlintSparkPPLJsonFunctionITSuite.scala | 47 +++++++++++++++++++ .../expression/function/SerializableUdf.java | 34 +++++--------- .../function/SerializableJsonUdfTest.java | 42 ++++------------- 3 files changed, 68 insertions(+), 55 deletions(-) diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLJsonFunctionITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLJsonFunctionITSuite.scala index fca758101..ed652275d 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLJsonFunctionITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLJsonFunctionITSuite.scala @@ -5,11 +5,16 @@ package org.opensearch.flint.spark.ppl +import java.util + +import org.opensearch.sql.expression.function.SerializableUdf.visit + import org.apache.spark.sql.{AnalysisException, QueryTest, Row} import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar} import org.apache.spark.sql.catalyst.expressions.{Alias, EqualTo, Literal, Not} import org.apache.spark.sql.catalyst.plans.logical._ import 
org.apache.spark.sql.streaming.StreamTest +import org.apache.spark.sql.types.StringType class FlintSparkPPLJsonFunctionITSuite extends QueryTest @@ -385,4 +390,46 @@ class FlintSparkPPLJsonFunctionITSuite null)) assertSameRows(expectedSeq, frame) } + + test("test json_delete() function: one key") { + val frame = sql(s""" + | source = $testTable + | | eval result = json_delete('$validJson1',json_array('age')) | head 1 | fields result + | """.stripMargin) + assertSameRows(Seq(Row("{\"account_number\":1,\"balance\":39225,\"gender\":\"M\"}")), frame) + + val logicalPlan: LogicalPlan = frame.queryExecution.logical + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")) + val keysExpression = UnresolvedFunction("array", Seq(Literal("age")), isDistinct = false) + val jsonObjExp = + Literal("{\"account_number\":1,\"balance\":39225,\"age\":32,\"gender\":\"M\"}") + val jsonFunc = + Alias(visit("json_delete", util.List.of(jsonObjExp, keysExpression)), "result")() + val eval = Project(Seq(UnresolvedStar(None), jsonFunc), table) + val limit = GlobalLimit(Literal(1), LocalLimit(Literal(1), eval)) + val expectedPlan = Project(Seq(UnresolvedAttribute("result")), limit) + comparePlans(logicalPlan, expectedPlan, checkAnalysis = false) + } + + test("test json_delete() function: multiple keys") { + val frame = sql(s""" + | source = $testTable + | | eval result = json_delete('$validJson1',json_array('age','gender')) | head 1 | fields result + | """.stripMargin) + assertSameRows(Seq(Row("{\"account_number\":1,\"balance\":39225}")), frame) + + val logicalPlan: LogicalPlan = frame.queryExecution.logical + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")) + val keysExpression = + UnresolvedFunction("array", Seq(Literal("age"), Literal("gender")), isDistinct = false) + val jsonObjExp = + Literal("{\"account_number\":1,\"balance\":39225,\"age\":32,\"gender\":\"M\"}") + val jsonFunc = + Alias(visit("json_delete", util.List.of(jsonObjExp, keysExpression)), "result")() + val eval = Project(Seq(UnresolvedStar(None), jsonFunc), table) + val limit = GlobalLimit(Literal(1), LocalLimit(Literal(1), eval)) + val expectedPlan = Project(Seq(UnresolvedAttribute("result")), limit) + comparePlans(logicalPlan, expectedPlan, checkAnalysis = false) + } + } diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/SerializableUdf.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/SerializableUdf.java index ea7b7d2dc..89888a4e8 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/SerializableUdf.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/SerializableUdf.java @@ -6,7 +6,6 @@ package org.opensearch.sql.expression.function; import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.module.scala.DefaultScalaModule; import inet.ipaddr.AddressStringException; import inet.ipaddr.IPAddressString; import inet.ipaddr.IPAddressStringParameters; @@ -16,11 +15,13 @@ import scala.Function2; import scala.Option; import scala.Serializable; +import scala.collection.JavaConverters; +import scala.collection.mutable.WrappedArray; import scala.runtime.AbstractFunction2; +import java.util.Collection; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; import static org.opensearch.sql.ppl.utils.DataTypeTransformer.seq; @@ -40,9 +41,9 @@ abstract class SerializableAbstractFunction2 extends AbstractFunction * 
@param keysToRemove The list of keys to remove. * @return A new JSON string without the specified keys. */ - Function2, String> jsonDeleteFunction = new SerializableAbstractFunction2<>() { + Function2, String> jsonDeleteFunction = new SerializableAbstractFunction2<>() { @Override - public String apply(String jsonStr, List keysToRemove) { + public String apply(String jsonStr, WrappedArray keysToRemove) { if (jsonStr == null) { return null; } @@ -55,8 +56,9 @@ public String apply(String jsonStr, List keysToRemove) { } } - private void removeKeys(Map map, List keysToRemove) { - for (String key : keysToRemove) { + private void removeKeys(Map map, WrappedArray keysToRemove) { + Collection keys = JavaConverters.asJavaCollection(keysToRemove); + for (String key : keys) { String[] keyParts = key.split("\\."); Map currentMap = map; for (int i = 0; i < keyParts.length - 1; i++) { @@ -64,10 +66,9 @@ private void removeKeys(Map map, List keysToRemove) { if (currentMap.containsKey(currentKey) && currentMap.get(currentKey) instanceof Map) { currentMap = (Map) currentMap.get(currentKey); } else { - return; // Path not found, exit + return; } } - // Remove the final key if it exists currentMap.remove(keyParts[keyParts.length - 1]); } } @@ -107,7 +108,7 @@ public String apply(String jsonStr, List> pathValuePai return objectMapper.writeValueAsString(jsonMap); } catch (Exception e) { - return null; // Return null if parsing fails + return null; } } }; @@ -143,22 +144,11 @@ public String apply(String jsonStr, List>> pathVa return objectMapper.writeValueAsString(jsonMap); } catch (Exception e) { - return null; // Return null if parsing fails + return null; } } }; - - /** - * Check if a key matches the given path expression. - * - * @param key The key to check. - * @param path The path expression (e.g., "a.b"). - * @return True if the key matches, false otherwise. 
- */ - private static boolean matchesKey(String key, String path) { - return key.equals(path) || key.startsWith(path + "."); - } - + Function2 cidrFunction = new SerializableAbstractFunction2<>() { IPAddressStringParameters valOptions = new IPAddressStringParameters.Builder() diff --git a/ppl-spark-integration/src/test/java/org/opensearch/sql/expression/function/SerializableJsonUdfTest.java b/ppl-spark-integration/src/test/java/org/opensearch/sql/expression/function/SerializableJsonUdfTest.java index 996569611..8572e01fb 100644 --- a/ppl-spark-integration/src/test/java/org/opensearch/sql/expression/function/SerializableJsonUdfTest.java +++ b/ppl-spark-integration/src/test/java/org/opensearch/sql/expression/function/SerializableJsonUdfTest.java @@ -5,6 +5,7 @@ package org.opensearch.sql.expression.function; import org.junit.Test; +import scala.collection.mutable.WrappedArray; import java.util.ArrayList; import java.util.Arrays; @@ -12,7 +13,6 @@ import java.util.List; import java.util.Map; -import static java.util.Collections.singletonList; import static org.apache.derby.vti.XmlVTI.asList; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; @@ -26,7 +26,7 @@ public class SerializableJsonUdfTest { public void testJsonDeleteFunctionRemoveSingleKey() { String jsonStr = "{\"key1\":\"value1\",\"key2\":\"value2\",\"key3\":\"value3\"}"; String expectedJson = "{\"key1\":\"value1\",\"key3\":\"value3\"}"; - String result = jsonDeleteFunction.apply(jsonStr, singletonList("key2")); + String result = jsonDeleteFunction.apply(jsonStr, WrappedArray.make(new String[]{"key2"})); assertEquals(expectedJson, result); } @@ -35,7 +35,7 @@ public void testJsonDeleteFunctionRemoveNestedKey() { // Correctly escape double quotes within the JSON string String jsonStr = "{\"key1\":\"value1\",\"key2\":{ \"key3\":\"value3\",\"key4\":\"value4\" }}"; String expectedJson = "{\"key1\":\"value1\",\"key2\":{\"key4\":\"value4\"}}"; - String result = jsonDeleteFunction.apply(jsonStr, singletonList("key2.key3")); + String result = jsonDeleteFunction.apply(jsonStr, WrappedArray.make(new String[]{"key2.key3"})); assertEquals(expectedJson, result); } @@ -43,7 +43,7 @@ public void testJsonDeleteFunctionRemoveNestedKey() { public void testJsonDeleteFunctionRemoveSingleArrayedKey() { String jsonStr = "{\"key1\":\"value1\",\"key2\":\"value2\",\"keyArray\":[\"value1\",\"value2\"]}"; String expectedJson = "{\"key1\":\"value1\",\"key2\":\"value2\"}"; - String result = jsonDeleteFunction.apply(jsonStr, singletonList("keyArray")); + String result = jsonDeleteFunction.apply(jsonStr, WrappedArray.make(new String[]{"keyArray"})); assertEquals(expectedJson, result); } @@ -51,7 +51,7 @@ public void testJsonDeleteFunctionRemoveSingleArrayedKey() { public void testJsonDeleteFunctionRemoveMultipleKeys() { String jsonStr = "{\"key1\":\"value1\",\"key2\":\"value2\",\"key3\":\"value3\"}"; String expectedJson = "{\"key3\":\"value3\"}"; - String result = jsonDeleteFunction.apply(jsonStr, Arrays.asList("key1", "key2")); + String result = jsonDeleteFunction.apply(jsonStr, WrappedArray.make(new String[]{"key1", "key2"})); assertEquals(expectedJson, result); } @@ -59,27 +59,27 @@ public void testJsonDeleteFunctionRemoveMultipleKeys() { public void testJsonDeleteFunctionRemoveMultipleSomeAreNestedKeys() { String jsonStr = "{\"key1\":\"value1\",\"key2\":{ \"key3\":\"value3\",\"key4\":\"value4\" }}"; String expectedJson = "{\"key2\":{\"key3\":\"value3\"}}"; - String result = jsonDeleteFunction.apply(jsonStr, 
Arrays.asList("key1", "key2.key4")); + String result = jsonDeleteFunction.apply(jsonStr, WrappedArray.make(new String[]{"key1", "key2.key4"})); assertEquals(expectedJson, result); } @Test public void testJsonDeleteFunctionNoKeysRemoved() { String jsonStr = "{\"key1\":\"value1\",\"key2\":\"value2\"}"; - String result = jsonDeleteFunction.apply(jsonStr, Collections.emptyList()); + String result = jsonDeleteFunction.apply(jsonStr, WrappedArray.make(new String[0])); assertEquals(jsonStr, result); } @Test public void testJsonDeleteFunctionNullJson() { - String result = jsonDeleteFunction.apply(null, Collections.singletonList("key1")); + String result = jsonDeleteFunction.apply(null, WrappedArray.make(new String[]{"key1"})); assertNull(result); } @Test public void testJsonDeleteFunctionInvalidJson() { String invalidJson = "invalid_json"; - String result = jsonDeleteFunction.apply(invalidJson, Collections.singletonList("key1")); + String result = jsonDeleteFunction.apply(invalidJson, WrappedArray.make(new String[]{"key1"})); assertNull(result); } @@ -186,67 +186,43 @@ public void testJsonExtendFunctionWithNonArrayPath() { @Test public void testJsonExtendFunctionAddValuesToExistingArray() { - // Initial JSON string String jsonStr = "{\"key1\":\"value1\",\"key2\":[\"value2\"]}"; - - // Path-value pairs to extend List>> pathValuePairs = new ArrayList<>(); pathValuePairs.add( Map.entry("key2", Arrays.asList("value3", "value4"))); - // Expected JSON after extension String expectedJson = "{\"key1\":\"value1\",\"key2\":[\"value2\",\"value3\",\"value4\"]}"; - - // Apply the function String result = jsonExtendFunction.apply(jsonStr, pathValuePairs); - // Assert that the result matches the expected JSON assertEquals(expectedJson, result); } @Test public void testJsonExtendFunctionAddNewArray() { - // Initial JSON string String jsonStr = "{\"key1\":\"value1\"}"; - - // Path-value pairs to add List>> pathValuePairs = new ArrayList<>(); pathValuePairs.add( Map.entry("key2", Arrays.asList("value2", "value3"))); - // Expected JSON after adding new array String expectedJson = "{\"key1\":\"value1\",\"key2\":[\"value2\",\"value3\"]}"; - - // Apply the function String result = jsonExtendFunction.apply(jsonStr, pathValuePairs); - // Assert that the result matches the expected JSON assertEquals(expectedJson, result); } @Test public void testJsonExtendFunctionHandleEmptyValues() { - // Initial JSON string String jsonStr = "{\"key1\":\"value1\",\"key2\":[\"value2\"]}"; - - // Path-value pairs with an empty list of values to add List>> pathValuePairs = new ArrayList<>(); pathValuePairs.add( Map.entry("key2", Collections.emptyList())); - // Expected JSON should remain unchanged String expectedJson = "{\"key1\":\"value1\",\"key2\":[\"value2\"]}"; - - // Apply the function String result = jsonExtendFunction.apply(jsonStr, pathValuePairs); - // Assert that the result matches the expected JSON assertEquals(expectedJson, result); } @Test public void testJsonExtendFunctionHandleNullInput() { - // Apply the function with null input String result = jsonExtendFunction.apply(null, Collections.singletonList( Map.entry("key2", List.of("value2")))); - - // Assert that the result is null assertEquals(null, result); } } From 9dbc89e94859771f598063b431318cc38ddfce19 Mon Sep 17 00:00:00 2001 From: YANGDB Date: Mon, 9 Dec 2024 13:57:44 -0800 Subject: [PATCH 05/12] update IT tests for `json_delete` Signed-off-by: YANGDB --- .../FlintSparkPPLJsonFunctionITSuite.scala | 72 ++++++++++++++++++- .../expression/function/SerializableUdf.java | 39 
+++++++--- .../function/SerializableJsonUdfTest.java | 8 +++ 3 files changed, 110 insertions(+), 9 deletions(-) diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLJsonFunctionITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLJsonFunctionITSuite.scala index ed652275d..c438f5d56 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLJsonFunctionITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLJsonFunctionITSuite.scala @@ -14,7 +14,6 @@ import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFu import org.apache.spark.sql.catalyst.expressions.{Alias, EqualTo, Literal, Not} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.streaming.StreamTest -import org.apache.spark.sql.types.StringType class FlintSparkPPLJsonFunctionITSuite extends QueryTest @@ -432,4 +431,75 @@ class FlintSparkPPLJsonFunctionITSuite comparePlans(logicalPlan, expectedPlan, checkAnalysis = false) } + test("test json_delete() function: nested key") { + val frame = sql(s""" + | source = $testTable + | | eval result = json_delete('$validJson2',json_array('f2.f3')) | head 1 | fields result + | """.stripMargin) + assertSameRows(Seq(Row("{\"f1\":\"abc\",\"f2\":{\"f4\":\"b\"}}")), frame) + + val logicalPlan: LogicalPlan = frame.queryExecution.logical + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")) + val keysExpression = + UnresolvedFunction("array", Seq(Literal("f2.f3")), isDistinct = false) + val jsonObjExp = + Literal("{\"f1\":\"abc\",\"f2\":{\"f3\":\"a\",\"f4\":\"b\"}}") + val jsonFunc = + Alias(visit("json_delete", util.List.of(jsonObjExp, keysExpression)), "result")() + val eval = Project(Seq(UnresolvedStar(None), jsonFunc), table) + val limit = GlobalLimit(Literal(1), LocalLimit(Literal(1), eval)) + val expectedPlan = Project(Seq(UnresolvedAttribute("result")), limit) + comparePlans(logicalPlan, expectedPlan, checkAnalysis = false) + } + + test("test json_delete() function: multi depth keys ") { + val frame = sql(s""" + | source = $testTable + | | eval result = json_delete('$validJson5',json_array('teacher', 'student.rank')) | head 1 | fields result + | """.stripMargin) + assertSameRows(Seq(Row("{\"student\":[{\"name\":\"Bob\"},{\"name\":\"Charlie\"}]}")), frame) + + val logicalPlan: LogicalPlan = frame.queryExecution.logical + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")) + val keysExpression = + UnresolvedFunction( + "array", + Seq(Literal("teacher"), Literal("student.rank")), + isDistinct = false) + val jsonObjExp = + Literal( + "{\"teacher\":\"Alice\",\"student\":[{\"name\":\"Bob\",\"rank\":1},{\"name\":\"Charlie\",\"rank\":2}]}") + val jsonFunc = + Alias(visit("json_delete", util.List.of(jsonObjExp, keysExpression)), "result")() + val eval = Project(Seq(UnresolvedStar(None), jsonFunc), table) + val limit = GlobalLimit(Literal(1), LocalLimit(Literal(1), eval)) + val expectedPlan = Project(Seq(UnresolvedAttribute("result")), limit) + comparePlans(logicalPlan, expectedPlan, checkAnalysis = false) + } + + test("test json_delete() function: key not found") { + val frame = sql(s""" + | source = $testTable + | | eval result = json_delete('$validJson5',json_array('none')) | head 1 | fields result + | """.stripMargin) + assertSameRows( + Seq(Row( + 
"{\"teacher\":\"Alice\",\"student\":[{\"name\":\"Bob\",\"rank\":1},{\"name\":\"Charlie\",\"rank\":2}]}")), + frame) + + val logicalPlan: LogicalPlan = frame.queryExecution.logical + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")) + val keysExpression = + UnresolvedFunction("array", Seq(Literal("none")), isDistinct = false) + val jsonObjExp = + Literal( + "{\"teacher\":\"Alice\",\"student\":[{\"name\":\"Bob\",\"rank\":1},{\"name\":\"Charlie\",\"rank\":2}]}") + val jsonFunc = + Alias(visit("json_delete", util.List.of(jsonObjExp, keysExpression)), "result")() + val eval = Project(Seq(UnresolvedStar(None), jsonFunc), table) + val limit = GlobalLimit(Literal(1), LocalLimit(Literal(1), eval)) + val expectedPlan = Project(Seq(UnresolvedAttribute("result")), limit) + comparePlans(logicalPlan, expectedPlan, checkAnalysis = false) + } + } diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/SerializableUdf.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/SerializableUdf.java index 89888a4e8..0b1c3b210 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/SerializableUdf.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/SerializableUdf.java @@ -60,16 +60,39 @@ private void removeKeys(Map map, WrappedArray keysToRemo Collection keys = JavaConverters.asJavaCollection(keysToRemove); for (String key : keys) { String[] keyParts = key.split("\\."); - Map currentMap = map; - for (int i = 0; i < keyParts.length - 1; i++) { - String currentKey = keyParts[i]; - if (currentMap.containsKey(currentKey) && currentMap.get(currentKey) instanceof Map) { - currentMap = (Map) currentMap.get(currentKey); - } else { - return; + removeNestedKey(map, keyParts, 0); + } + } + + private void removeNestedKey(Object currentObj, String[] keyParts, int depth) { + if (currentObj == null || depth >= keyParts.length) { + return; + } + + if (currentObj instanceof Map) { + Map currentMap = (Map) currentObj; + String currentKey = keyParts[depth]; + + if (depth == keyParts.length - 1) { + // If it's the last key, remove it from the map + currentMap.remove(currentKey); + } else { + // If not the last key, continue traversing + if (currentMap.containsKey(currentKey)) { + Object nextObj = currentMap.get(currentKey); + + if (nextObj instanceof List) { + // If the value is a list, process each item in the list + List list = (List) nextObj; + for (int i = 0; i < list.size(); i++) { + removeNestedKey(list.get(i), keyParts, depth + 1); + } + } else { + // Continue traversing if it's a map + removeNestedKey(nextObj, keyParts, depth + 1); + } } } - currentMap.remove(keyParts[keyParts.length - 1]); } } }; diff --git a/ppl-spark-integration/src/test/java/org/opensearch/sql/expression/function/SerializableJsonUdfTest.java b/ppl-spark-integration/src/test/java/org/opensearch/sql/expression/function/SerializableJsonUdfTest.java index 8572e01fb..3f54c4dc9 100644 --- a/ppl-spark-integration/src/test/java/org/opensearch/sql/expression/function/SerializableJsonUdfTest.java +++ b/ppl-spark-integration/src/test/java/org/opensearch/sql/expression/function/SerializableJsonUdfTest.java @@ -63,6 +63,14 @@ public void testJsonDeleteFunctionRemoveMultipleSomeAreNestedKeys() { assertEquals(expectedJson, result); } + @Test + public void testJsonDeleteFunctionRemoveMultipleKeysNestedArrayKeys() { + String jsonStr = "{\"key1\":\"value1\",\"key2\":[{ \"a\":\"valueA\",\"key3\":\"value3\"}, 
{\"a\":\"valueA\",\"key4\":\"value4\"}]}"; + String expectedJson = "{\"key2\":[{\"key3\":\"value3\"},{\"key4\":\"value4\"}]}"; + String result = jsonDeleteFunction.apply(jsonStr, WrappedArray.make(new String[]{"key1", "key2.a"})); + assertEquals(expectedJson, result); + } + @Test public void testJsonDeleteFunctionNoKeysRemoved() { String jsonStr = "{\"key1\":\"value1\",\"key2\":\"value2\"}"; From a46494ccb02aa06ed548d55e2b24a50642bbd146 Mon Sep 17 00:00:00 2001 From: YANGDB Date: Mon, 9 Dec 2024 16:18:44 -0800 Subject: [PATCH 06/12] update IT tests for `json_append` Signed-off-by: YANGDB --- docs/ppl-lang/functions/ppl-json.md | 23 +-- .../FlintSparkPPLJsonFunctionITSuite.scala | 166 ++++++++++++++++++ .../expression/function/SerializableUdf.java | 88 +++++++--- .../function/SerializableJsonUdfTest.java | 51 ++++-- 4 files changed, 278 insertions(+), 50 deletions(-) diff --git a/docs/ppl-lang/functions/ppl-json.md b/docs/ppl-lang/functions/ppl-json.md index 627eb110c..75f8940f9 100644 --- a/docs/ppl-lang/functions/ppl-json.md +++ b/docs/ppl-lang/functions/ppl-json.md @@ -217,7 +217,7 @@ A JSON object format. Example: - os> source=people | eval deleted = json_delete({"a":"valueA", "b":"valueB"}, ["a"]) + os> source=people | eval deleted = json_delete({"a":"valueA", "b":"valueB"}, json_array("a")) fetched rows / total rows = 1/1 +----------------------------------+ | deleted | @@ -225,7 +225,7 @@ Example: | {"a": "valueA" } | +----------------------------------+ - os> source=people | eval eval deleted = json_delete({"a":[{"b":1},{"b":2},{"c":3}]}, ["a.b"]) + os> source=people | eval eval deleted = json_delete({"a":[{"b":1},{"b":2},{"c":3}]}, json_array("a.b")) fetched rows / total rows = 1/1 +-----------------------------------------------------------+ | deleted | @@ -233,7 +233,7 @@ Example: | {"a":[{"c":3}] } | +-----------------------------------------------------------+ - os> source=people | eval `no_action` = json_delete({"a":[{"b":1},{"b":2},{"c":3}]}, ["b.b"]) + os> source=people | eval `no_action` = json_delete({"a":[{"b":1},{"b":2},{"c":3}]}, json_array("b.b")) fetched rows / total rows = 1/1 +-----------------------------------+ | no_action | @@ -245,9 +245,9 @@ Example: **Description** -`json_append(json, [path_value list])` appends values to end of an array within the json elements. Return the updated json object after appending . +`json_append(json, [path_key, list of values to add ])` appends values to end of an array within the json elements. Return the updated json object after appending . -**Argument type:** JSON, List<[(STRING, STRING>)]> +**Argument type:** JSON, List **Return type:** JSON @@ -257,32 +257,33 @@ A JSON object format. 
Append adds the value to the end of the existing array with the following cases: - path is an object value - append is ignored and the value is returned - path is an existing array not empty - the value are added to the array's tail + - path not found - the value are added to the root of the json tree - path is an existing array is empty - create a new array with the given value Example: - os> source=people | eval append = json_append(`{"a":["valueA", "valueB"]}`, ["a","valueC"]) + os> source=people | eval append = json_append(`{"a":["valueA", "valueB"]}`, json_array('a', 'valueC', 'valueD')) fetched rows / total rows = 1/1 +-------------------------------------------------+ | append | +-------------------------------------------------+ - | {"a":["valueA", "valueB", "valueC"]} | + | {"a":["valueA", "valueB", "valueC", "valueD"]} | +-------------------------------------------------+ - os> source=people | eval append = json_append(`{"a":["valueA", "valueB"]}`, ['a', {"a":["valueC"]}]) + os> source=people | eval append = json_append(`{"a":[]}`, json_array('a', 'valueC')) fetched rows / total rows = 1/1 +-----------------------------------------------+ | append | +-----------------------------------------------+ - | {"a":["valueA", "valueB", ["valueC"]]} | + | {"a":["valueC"]} | +-----------------------------------------------+ - os> source=people | eval append = json_append(`{"root":{ "a":["valueA", "valueB"]}}`, {"root.a":"valueC"}) + os> source=people | eval append = json_append(`{"root":{ "a":["valueA", "valueB"]}}`, json_array('root', 'valueC') fetched rows / total rows = 1/1 +-----------------------------------------------+ | append | +-----------------------------------------------+ - |{"root": {"a":["valueA", "valueB", "valueC"]}} | + |{"root": {"a":["valueA", "valueB"]}} | +-----------------------------------------------+ diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLJsonFunctionITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLJsonFunctionITSuite.scala index c438f5d56..9e0e6a1b3 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLJsonFunctionITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLJsonFunctionITSuite.scala @@ -31,6 +31,10 @@ class FlintSparkPPLJsonFunctionITSuite private val validJson5 = "{\"teacher\":\"Alice\",\"student\":[{\"name\":\"Bob\",\"rank\":1},{\"name\":\"Charlie\",\"rank\":2}]}" private val validJson6 = "[1,2,3]" + private val validJson7 = + "{\"teacher\":[\"Alice\"],\"student\":[{\"name\":\"Bob\",\"rank\":1},{\"name\":\"Charlie\",\"rank\":2}]}" + private val validJson8 = + "{\"school\":{\"teacher\":[\"Alice\"],\"student\":[{\"name\":\"Bob\",\"rank\":1},{\"name\":\"Charlie\",\"rank\":2}]}}" private val invalidJson1 = "[1,2" private val invalidJson2 = "[invalid json]" private val invalidJson3 = "{\"invalid\": \"json\"" @@ -502,4 +506,166 @@ class FlintSparkPPLJsonFunctionITSuite comparePlans(logicalPlan, expectedPlan, checkAnalysis = false) } + test("test json_append() function: add single value") { + val frame = sql(s""" + | source = $testTable + | | eval result = json_append('$validJson7',json_array('teacher', 'Tom')) | head 1 | fields result + | """.stripMargin) + assertSameRows( + Seq(Row( + "{\"teacher\":[\"Alice\",\"Tom\"],\"student\":[{\"name\":\"Bob\",\"rank\":1},{\"name\":\"Charlie\",\"rank\":2}]}")), + frame) + + val logicalPlan: LogicalPlan = frame.queryExecution.logical + val table = 
UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")) + val keysExpression = + UnresolvedFunction("array", Seq(Literal("teacher"), Literal("Tom")), isDistinct = false) + val jsonObjExp = + Literal( + "{\"teacher\":[\"Alice\"],\"student\":[{\"name\":\"Bob\",\"rank\":1},{\"name\":\"Charlie\",\"rank\":2}]}") + val jsonFunc = + Alias(visit("json_append", util.List.of(jsonObjExp, keysExpression)), "result")() + val eval = Project(Seq(UnresolvedStar(None), jsonFunc), table) + val limit = GlobalLimit(Literal(1), LocalLimit(Literal(1), eval)) + val expectedPlan = Project(Seq(UnresolvedAttribute("result")), limit) + comparePlans(logicalPlan, expectedPlan, checkAnalysis = false) + } + + test("test json_append() function: add single value key not found") { + val frame = sql(s""" + | source = $testTable + | | eval result = json_append('$validJson7',json_array('headmaster', 'Tom')) | head 1 | fields result + | """.stripMargin) + assertSameRows( + Seq(Row( + "{\"teacher\":[\"Alice\"],\"student\":[{\"name\":\"Bob\",\"rank\":1},{\"name\":\"Charlie\",\"rank\":2}],\"headmaster\":[\"Tom\"]}")), + frame) + + val logicalPlan: LogicalPlan = frame.queryExecution.logical + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")) + val keysExpression = + UnresolvedFunction("array", Seq(Literal("headmaster"), Literal("Tom")), isDistinct = false) + val jsonObjExp = + Literal( + "{\"teacher\":[\"Alice\"],\"student\":[{\"name\":\"Bob\",\"rank\":1},{\"name\":\"Charlie\",\"rank\":2}]}") + val jsonFunc = + Alias(visit("json_append", util.List.of(jsonObjExp, keysExpression)), "result")() + val eval = Project(Seq(UnresolvedStar(None), jsonFunc), table) + val limit = GlobalLimit(Literal(1), LocalLimit(Literal(1), eval)) + val expectedPlan = Project(Seq(UnresolvedAttribute("result")), limit) + comparePlans(logicalPlan, expectedPlan, checkAnalysis = false) + } + + test("test json_append() function: add single Object key not found") { + val frame = sql(s""" + | source = $testTable + | | eval result = json_append('$validJson7',json_array('headmaster', '{"name":"Tomy","rank":1}')) | head 1 | fields result + | """.stripMargin) + assertSameRows( + Seq(Row( + "{\"teacher\":[\"Alice\"],\"student\":[{\"name\":\"Bob\",\"rank\":1},{\"name\":\"Charlie\",\"rank\":2}],\"headmaster\":[{\"name\":\"Tomy\",\"rank\":1}]}")), + frame) + + val logicalPlan: LogicalPlan = frame.queryExecution.logical + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")) + val keysExpression = + UnresolvedFunction( + "array", + Seq(Literal("headmaster"), Literal("""{"name":"Tomy","rank":1}""")), + isDistinct = false) + val jsonObjExp = + Literal( + "{\"teacher\":[\"Alice\"],\"student\":[{\"name\":\"Bob\",\"rank\":1},{\"name\":\"Charlie\",\"rank\":2}]}") + val jsonFunc = + Alias(visit("json_append", util.List.of(jsonObjExp, keysExpression)), "result")() + val eval = Project(Seq(UnresolvedStar(None), jsonFunc), table) + val limit = GlobalLimit(Literal(1), LocalLimit(Literal(1), eval)) + val expectedPlan = Project(Seq(UnresolvedAttribute("result")), limit) + comparePlans(logicalPlan, expectedPlan, checkAnalysis = false) + } + + test("test json_append() function: add single Object value") { + val frame = sql(s""" + | source = $testTable + | | eval result = json_append('$validJson7',json_array('student', '{"name":"Tomy","rank":5}')) | head 1 | fields result + | """.stripMargin) + assertSameRows( + Seq(Row( + 
"{\"teacher\":[\"Alice\"],\"student\":[{\"name\":\"Bob\",\"rank\":1},{\"name\":\"Charlie\",\"rank\":2},{\"name\":\"Tomy\",\"rank\":5}]}")), + frame) + + val logicalPlan: LogicalPlan = frame.queryExecution.logical + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")) + val keysExpression = + UnresolvedFunction( + "array", + Seq(Literal("student"), Literal("""{"name":"Tomy","rank":5}""")), + isDistinct = false) + val jsonObjExp = + Literal( + "{\"teacher\":[\"Alice\"],\"student\":[{\"name\":\"Bob\",\"rank\":1},{\"name\":\"Charlie\",\"rank\":2}]}") + val jsonFunc = + Alias(visit("json_append", util.List.of(jsonObjExp, keysExpression)), "result")() + val eval = Project(Seq(UnresolvedStar(None), jsonFunc), table) + val limit = GlobalLimit(Literal(1), LocalLimit(Literal(1), eval)) + val expectedPlan = Project(Seq(UnresolvedAttribute("result")), limit) + comparePlans(logicalPlan, expectedPlan, checkAnalysis = false) + } + + test("test json_append() function: add multi value") { + val frame = sql(s""" + | source = $testTable + | | eval result = json_append('$validJson7',json_array('teacher', 'Tom', 'Walt')) | head 1 | fields result + | """.stripMargin) + assertSameRows( + Seq(Row( + "{\"teacher\":[\"Alice\",\"Tom\",\"Walt\"],\"student\":[{\"name\":\"Bob\",\"rank\":1},{\"name\":\"Charlie\",\"rank\":2}]}")), + frame) + + val logicalPlan: LogicalPlan = frame.queryExecution.logical + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")) + val keysExpression = + UnresolvedFunction( + "array", + Seq(Literal("teacher"), Literal("Tom"), Literal("Walt")), + isDistinct = false) + val jsonObjExp = + Literal( + "{\"teacher\":[\"Alice\"],\"student\":[{\"name\":\"Bob\",\"rank\":1},{\"name\":\"Charlie\",\"rank\":2}]}") + val jsonFunc = + Alias(visit("json_append", util.List.of(jsonObjExp, keysExpression)), "result")() + val eval = Project(Seq(UnresolvedStar(None), jsonFunc), table) + val limit = GlobalLimit(Literal(1), LocalLimit(Literal(1), eval)) + val expectedPlan = Project(Seq(UnresolvedAttribute("result")), limit) + comparePlans(logicalPlan, expectedPlan, checkAnalysis = false) + } + + test("test json_append() function: add nested value") { + val frame = sql(s""" + | source = $testTable + | | eval result = json_append('$validJson8',json_array('school.teacher', 'Tom', 'Walt')) | head 1 | fields result + | """.stripMargin) + assertSameRows( + Seq(Row( + "{\"school\":{\"teacher\":[\"Alice\",\"Tom\",\"Walt\"],\"student\":[{\"name\":\"Bob\",\"rank\":1},{\"name\":\"Charlie\",\"rank\":2}]}}")), + frame) + + val logicalPlan: LogicalPlan = frame.queryExecution.logical + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")) + val keysExpression = + UnresolvedFunction( + "array", + Seq(Literal("school.teacher"), Literal("Tom"), Literal("Walt")), + isDistinct = false) + val jsonObjExp = + Literal( + "{\"school\":{\"teacher\":[\"Alice\"],\"student\":[{\"name\":\"Bob\",\"rank\":1},{\"name\":\"Charlie\",\"rank\":2}]}}") + val jsonFunc = + Alias(visit("json_append", util.List.of(jsonObjExp, keysExpression)), "result")() + val eval = Project(Seq(UnresolvedStar(None), jsonFunc), table) + val limit = GlobalLimit(Literal(1), LocalLimit(Literal(1), eval)) + val expectedPlan = Project(Seq(UnresolvedAttribute("result")), limit) + comparePlans(logicalPlan, expectedPlan, checkAnalysis = false) + } + } diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/SerializableUdf.java 
b/ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/SerializableUdf.java index 0b1c3b210..7b5e10ffe 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/SerializableUdf.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/SerializableUdf.java @@ -19,7 +19,9 @@ import scala.collection.mutable.WrappedArray; import scala.runtime.AbstractFunction2; +import java.util.ArrayList; import java.util.Collection; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -97,43 +99,87 @@ private void removeNestedKey(Object currentObj, String[] keyParts, int depth) { } }; - Function2>, String> jsonAppendFunction = new SerializableAbstractFunction2<>() { - + Function2, String> jsonAppendFunction = new SerializableAbstractFunction2<>() { /** - * Append values to JSON arrays based on specified path-value pairs. + * Append values to JSON arrays based on specified path-values. * - * @param jsonStr The input JSON string. - * @param pathValuePairs A list of path-value pairs to append. + * @param jsonStr The input JSON string. + * @param elements A list of path-values where the first item is the path and subsequent items are values to append. * @return The updated JSON string. */ - public String apply(String jsonStr, List> pathValuePairs) { + public String apply(String jsonStr, WrappedArray elements) { if (jsonStr == null) { return null; } try { - Map jsonMap = objectMapper.readValue(jsonStr, Map.class); + List pathValues = JavaConverters.mutableSeqAsJavaList(elements); + if (pathValues.isEmpty()) { + return jsonStr; + } - for (Map.Entry pathValuePair : pathValuePairs) { - String path = pathValuePair.getKey(); - String value = pathValuePair.getValue(); + String path = pathValues.get(0); + String[] pathParts = path.split("\\."); + List values = pathValues.subList(1, pathValues.size()); - if (jsonMap.containsKey(path) && jsonMap.get(path) instanceof List) { - List existingList = (List) jsonMap.get(path); - // Append value to the end of the existing Scala List - existingList.add(value); - jsonMap.put(path, existingList); - } else if (jsonMap.containsKey(path)) { - // Ignore appending if the path is not an array - } else { - jsonMap.put(path, List.of(value)); - } + // Parse the JSON string into a Map + Map jsonMap = objectMapper.readValue(jsonStr, Map.class); + + // Append each value at the specified path + for (String value : values) { + Object parsedValue = parseValue(value); // Parse the value + appendNestedValue(jsonMap, pathParts, 0, parsedValue); } + // Convert the updated map back to JSON return objectMapper.writeValueAsString(jsonMap); } catch (Exception e) { return null; } } + + private Object parseValue(String value) { + // Try parsing the value as JSON, fallback to primitive if parsing fails + try { + return objectMapper.readValue(value, Object.class); + } catch (Exception e) { + // Primitive value, return as is + return value; + } + } + + private void appendNestedValue(Object currentObj, String[] pathParts, int depth, Object valueToAppend) { + if (currentObj == null || depth >= pathParts.length) { + return; + } + + if (currentObj instanceof Map) { + Map currentMap = (Map) currentObj; + String currentKey = pathParts[depth]; + + if (depth == pathParts.length - 1) { + // If it's the last key, append to the array + currentMap.computeIfAbsent(currentKey, k -> new ArrayList<>()); // Create list if not present + Object existingValue = currentMap.get(currentKey); + + if (existingValue instanceof 
List) { + List existingList = (List) existingValue; + existingList.add(valueToAppend); + } + } else { + // Continue traversing + currentMap.computeIfAbsent(currentKey, k -> new LinkedHashMap<>()); // Create map if not present + appendNestedValue(currentMap.get(currentKey), pathParts, depth + 1, valueToAppend); + } + } else if (currentObj instanceof List) { + // If the current object is a list, process each map in the list + List list = (List) currentObj; + for (Object item : list) { + if (item instanceof Map) { + appendNestedValue(item, pathParts, depth, valueToAppend); + } + } + } + } }; /** @@ -171,7 +217,7 @@ public String apply(String jsonStr, List>> pathVa } } }; - + Function2 cidrFunction = new SerializableAbstractFunction2<>() { IPAddressStringParameters valOptions = new IPAddressStringParameters.Builder() diff --git a/ppl-spark-integration/src/test/java/org/opensearch/sql/expression/function/SerializableJsonUdfTest.java b/ppl-spark-integration/src/test/java/org/opensearch/sql/expression/function/SerializableJsonUdfTest.java index 3f54c4dc9..884d3d5a2 100644 --- a/ppl-spark-integration/src/test/java/org/opensearch/sql/expression/function/SerializableJsonUdfTest.java +++ b/ppl-spark-integration/src/test/java/org/opensearch/sql/expression/function/SerializableJsonUdfTest.java @@ -95,17 +95,30 @@ public void testJsonDeleteFunctionInvalidJson() { public void testJsonAppendFunctionAppendToExistingArray() { String jsonStr = "{\"arrayKey\":[\"value1\",\"value2\"]}"; String expectedJson = "{\"arrayKey\":[\"value1\",\"value2\",\"value3\"]}"; - Map.Entry pair = Map.entry("arrayKey", "value3"); - String result = jsonAppendFunction.apply(jsonStr, Collections.singletonList(pair)); + String result = jsonAppendFunction.apply(jsonStr, WrappedArray.make(new String[]{"arrayKey", "value3"})); + assertEquals(expectedJson, result); + } + + @Test + public void testJsonAppendFunctionAppendObjectToExistingArray() { + String jsonStr = "{\"key1\":\"value1\",\"key2\":[{\"a\":\"valueA\",\"key3\":\"value3\"}]}"; + String expectedJson = "{\"key1\":\"value1\",\"key2\":[{\"a\":\"valueA\",\"key3\":\"value3\"},{\"a\":\"valueA\",\"key4\":\"value4\"}]}"; + String result = jsonAppendFunction.apply(jsonStr, WrappedArray.make(new String[]{"key2", "{\"a\":\"valueA\",\"key4\":\"value4\"}"})); assertEquals(expectedJson, result); } @Test public void testJsonAppendFunctionAddNewArray() { - String jsonStr = "{\"key1\":\"value1\"}"; + String jsonStr = "{\"key1\":\"value1\",\"newArray\":[]}"; String expectedJson = "{\"key1\":\"value1\",\"newArray\":[\"newValue\"]}"; - Map.Entry pair = Map.entry("newArray", "newValue"); - String result = jsonAppendFunction.apply(jsonStr, Collections.singletonList(pair)); + String result = jsonAppendFunction.apply(jsonStr, WrappedArray.make(new String[]{"newArray", "newValue"})); + assertEquals(expectedJson, result); + } + @Test + public void testJsonAppendFunctionNoSuchKey() { + String jsonStr = "{\"key1\":\"value1\"}"; + String expectedJson = "{\"key1\":\"value1\",\"newKey\":[\"newValue\"]}"; + String result = jsonAppendFunction.apply(jsonStr, WrappedArray.make(new String[]{"newKey", "newValue"})); assertEquals(expectedJson, result); } @@ -113,34 +126,36 @@ public void testJsonAppendFunctionAddNewArray() { public void testJsonAppendFunctionIgnoreNonArrayKey() { String jsonStr = "{\"key1\":\"value1\"}"; String expectedJson = jsonStr; - Map.Entry pair = Map.entry("key1", "newValue"); - String result = jsonAppendFunction.apply(jsonStr, Collections.singletonList(pair)); + String result = 
jsonAppendFunction.apply(jsonStr, WrappedArray.make(new String[]{"key1", "newValue"})); assertEquals(expectedJson, result); } @Test - public void testJsonAppendFunctionMultipleAppends() { - String jsonStr = "{\"arrayKey\":[\"value1\"]}"; - String expectedJson = "{\"arrayKey\":[\"value1\",\"value2\",\"value3\"],\"newKey\":[\"newValue\"]}"; - List> pairs = Arrays.asList( - Map.entry("arrayKey", "value2"), - Map.entry("arrayKey", "value3"), - Map.entry("newKey", "newValue") - ); - String result = jsonAppendFunction.apply(jsonStr, pairs); + public void testJsonAppendFunctionWithNestedArrayKeys() { + String jsonStr = "{\"key2\":[{\"a\":[\"Value1\"],\"key3\":\"Value3\"},{\"a\":[\"Value1\"],\"key4\":\"Value4\"}]}"; + String expectedJson = "{\"key2\":[{\"a\":[\"Value1\",\"Value2\"],\"key3\":\"Value3\"},{\"a\":[\"Value1\",\"Value2\"],\"key4\":\"Value4\"}]}"; + String result = jsonAppendFunction.apply(jsonStr, WrappedArray.make(new String[]{"key2.a","Value2"})); + assertEquals(expectedJson, result); + } + + @Test + public void testJsonAppendFunctionWithObjectKey() { + String jsonStr = "{\"key2\":[{\"a\":[\"Value1\"],\"key3\":\"Value3\"},{\"a\":[\"Value1\"],\"key4\":\"Value4\"}]}"; + String expectedJson = "{\"key2\":[{\"a\":[\"Value1\"],\"key3\":\"Value3\"},{\"a\":[\"Value1\"],\"key4\":\"Value4\"},\"Value2\"]}"; + String result = jsonAppendFunction.apply(jsonStr, WrappedArray.make(new String[]{"key2","Value2"})); assertEquals(expectedJson, result); } @Test public void testJsonAppendFunctionNullJson() { - String result = jsonAppendFunction.apply(null, Collections.singletonList(Map.entry("key", "value"))); + String result = jsonAppendFunction.apply(null, WrappedArray.make(new String[]{"key1", "newValue"})); assertNull(result); } @Test public void testJsonAppendFunctionInvalidJson() { String invalidJson = "invalid_json"; - String result = jsonAppendFunction.apply(invalidJson, Collections.singletonList(Map.entry("key", "value"))); + String result = jsonAppendFunction.apply(invalidJson, WrappedArray.make(new String[]{"key1", "newValue"})); assertNull(result); } From 3fe3adbf8b72f0d96c2f8c0120ffe6cd017c6cf4 Mon Sep 17 00:00:00 2001 From: YANGDB Date: Tue, 10 Dec 2024 13:05:58 -0800 Subject: [PATCH 07/12] update IT tests according to PR's comments Signed-off-by: YANGDB --- docs/ppl-lang/functions/ppl-json.md | 12 ++-- .../FlintSparkPPLJsonFunctionITSuite.scala | 22 +++---- ...PlanJsonFunctionsTranslatorTestSuite.scala | 57 +++++++++++-------- 3 files changed, 49 insertions(+), 42 deletions(-) diff --git a/docs/ppl-lang/functions/ppl-json.md b/docs/ppl-lang/functions/ppl-json.md index 75f8940f9..16af1e31e 100644 --- a/docs/ppl-lang/functions/ppl-json.md +++ b/docs/ppl-lang/functions/ppl-json.md @@ -217,7 +217,7 @@ A JSON object format. 
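Before the refreshed `json_delete` examples, a short sketch of the dotted-path removal they exercise may be useful. This is an illustrative reconstruction only, assuming Jackson and the same fan-out over arrays as the recursive helper added earlier in this series; `JsonDeleteSketch` is an invented name, not a class in this PR.

    import com.fasterxml.jackson.databind.ObjectMapper;

    import java.util.List;
    import java.util.Map;

    public final class JsonDeleteSketch {
        private static final ObjectMapper MAPPER = new ObjectMapper();

        @SuppressWarnings("unchecked")
        static void remove(Object node, String[] parts, int depth) {
            if (node instanceof List) {
                // arrays are transparent: apply the same path segment to every element
                for (Object item : (List<Object>) node) {
                    remove(item, parts, depth);
                }
            } else if (node instanceof Map) {
                Map<String, Object> map = (Map<String, Object>) node;
                if (depth == parts.length - 1) {
                    map.remove(parts[depth]);          // last segment: drop the key
                } else {
                    remove(map.get(parts[depth]), parts, depth + 1);
                }
            }
        }

        public static void main(String[] args) throws Exception {
            Map<String, Object> root =
                    MAPPER.readValue("{\"a\":[{\"b\":1,\"c\":3}]}", Map.class);
            remove(root, "a.b".split("\\."), 0);
            System.out.println(MAPPER.writeValueAsString(root)); // {"a":[{"c":3}]}
        }
    }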
Example: - os> source=people | eval deleted = json_delete({"a":"valueA", "b":"valueB"}, json_array("a")) + os> source=people | eval deleted = json_delete({"a":"valueA", "b":"valueB"}, array("a")) fetched rows / total rows = 1/1 +----------------------------------+ | deleted | @@ -225,7 +225,7 @@ Example: | {"a": "valueA" } | +----------------------------------+ - os> source=people | eval eval deleted = json_delete({"a":[{"b":1},{"b":2},{"c":3}]}, json_array("a.b")) + os> source=people | eval deleted = json_delete({"a":[{"b":1},{"b":2},{"c":3}]}, array("a.b")) fetched rows / total rows = 1/1 +-----------------------------------------------------------+ | deleted | @@ -233,7 +233,7 @@ Example: | {"a":[{"c":3}] } | +-----------------------------------------------------------+ - os> source=people | eval `no_action` = json_delete({"a":[{"b":1},{"b":2},{"c":3}]}, json_array("b.b")) + os> source=people | eval `no_action` = json_delete({"a":[{"b":1},{"b":2},{"c":3}]}, array("b.b")) fetched rows / total rows = 1/1 +-----------------------------------+ | no_action | @@ -262,7 +262,7 @@ Append adds the value to the end of the existing array with the following cases: Example: - os> source=people | eval append = json_append(`{"a":["valueA", "valueB"]}`, json_array('a', 'valueC', 'valueD')) + os> source=people | eval append = json_append(`{"a":["valueA", "valueB"]}`, array('a', 'valueC', 'valueD')) fetched rows / total rows = 1/1 +-------------------------------------------------+ | append | @@ -270,7 +270,7 @@ Example: | {"a":["valueA", "valueB", "valueC", "valueD"]} | +-------------------------------------------------+ - os> source=people | eval append = json_append(`{"a":[]}`, json_array('a', 'valueC')) + os> source=people | eval append = json_append(`{"a":[]}`, array('a', 'valueC')) fetched rows / total rows = 1/1 +-----------------------------------------------+ | append | @@ -278,7 +278,7 @@ Example: | {"a":["valueC"]} | +-----------------------------------------------+ - os> source=people | eval append = json_append(`{"root":{ "a":["valueA", "valueB"]}}`, json_array('root', 'valueC') + os> source=people | eval append = json_append(`{"root":{ "a":["valueA", "valueB"]}}`, array('root', 'valueC') fetched rows / total rows = 1/1 +-----------------------------------------------+ | append | diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLJsonFunctionITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLJsonFunctionITSuite.scala index 9e0e6a1b3..a1ac52de0 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLJsonFunctionITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLJsonFunctionITSuite.scala @@ -397,7 +397,7 @@ class FlintSparkPPLJsonFunctionITSuite test("test json_delete() function: one key") { val frame = sql(s""" | source = $testTable - | | eval result = json_delete('$validJson1',json_array('age')) | head 1 | fields result + | | eval result = json_delete('$validJson1',array('age')) | head 1 | fields result | """.stripMargin) assertSameRows(Seq(Row("{\"account_number\":1,\"balance\":39225,\"gender\":\"M\"}")), frame) @@ -417,7 +417,7 @@ class FlintSparkPPLJsonFunctionITSuite test("test json_delete() function: multiple keys") { val frame = sql(s""" | source = $testTable - | | eval result = json_delete('$validJson1',json_array('age','gender')) | head 1 | fields result + | | eval result = json_delete('$validJson1',array('age','gender')) | 
head 1 | fields result | """.stripMargin) assertSameRows(Seq(Row("{\"account_number\":1,\"balance\":39225}")), frame) @@ -438,7 +438,7 @@ class FlintSparkPPLJsonFunctionITSuite test("test json_delete() function: nested key") { val frame = sql(s""" | source = $testTable - | | eval result = json_delete('$validJson2',json_array('f2.f3')) | head 1 | fields result + | | eval result = json_delete('$validJson2',array('f2.f3')) | head 1 | fields result | """.stripMargin) assertSameRows(Seq(Row("{\"f1\":\"abc\",\"f2\":{\"f4\":\"b\"}}")), frame) @@ -459,7 +459,7 @@ class FlintSparkPPLJsonFunctionITSuite test("test json_delete() function: multi depth keys ") { val frame = sql(s""" | source = $testTable - | | eval result = json_delete('$validJson5',json_array('teacher', 'student.rank')) | head 1 | fields result + | | eval result = json_delete('$validJson5',array('teacher', 'student.rank')) | head 1 | fields result | """.stripMargin) assertSameRows(Seq(Row("{\"student\":[{\"name\":\"Bob\"},{\"name\":\"Charlie\"}]}")), frame) @@ -484,7 +484,7 @@ class FlintSparkPPLJsonFunctionITSuite test("test json_delete() function: key not found") { val frame = sql(s""" | source = $testTable - | | eval result = json_delete('$validJson5',json_array('none')) | head 1 | fields result + | | eval result = json_delete('$validJson5',array('none')) | head 1 | fields result | """.stripMargin) assertSameRows( Seq(Row( @@ -509,7 +509,7 @@ class FlintSparkPPLJsonFunctionITSuite test("test json_append() function: add single value") { val frame = sql(s""" | source = $testTable - | | eval result = json_append('$validJson7',json_array('teacher', 'Tom')) | head 1 | fields result + | | eval result = json_append('$validJson7',array('teacher', 'Tom')) | head 1 | fields result | """.stripMargin) assertSameRows( Seq(Row( @@ -534,7 +534,7 @@ class FlintSparkPPLJsonFunctionITSuite test("test json_append() function: add single value key not found") { val frame = sql(s""" | source = $testTable - | | eval result = json_append('$validJson7',json_array('headmaster', 'Tom')) | head 1 | fields result + | | eval result = json_append('$validJson7',array('headmaster', 'Tom')) | head 1 | fields result | """.stripMargin) assertSameRows( Seq(Row( @@ -559,7 +559,7 @@ class FlintSparkPPLJsonFunctionITSuite test("test json_append() function: add single Object key not found") { val frame = sql(s""" | source = $testTable - | | eval result = json_append('$validJson7',json_array('headmaster', '{"name":"Tomy","rank":1}')) | head 1 | fields result + | | eval result = json_append('$validJson7',array('headmaster', '{"name":"Tomy","rank":1}')) | head 1 | fields result | """.stripMargin) assertSameRows( Seq(Row( @@ -587,7 +587,7 @@ class FlintSparkPPLJsonFunctionITSuite test("test json_append() function: add single Object value") { val frame = sql(s""" | source = $testTable - | | eval result = json_append('$validJson7',json_array('student', '{"name":"Tomy","rank":5}')) | head 1 | fields result + | | eval result = json_append('$validJson7',array('student', '{"name":"Tomy","rank":5}')) | head 1 | fields result | """.stripMargin) assertSameRows( Seq(Row( @@ -615,7 +615,7 @@ class FlintSparkPPLJsonFunctionITSuite test("test json_append() function: add multi value") { val frame = sql(s""" | source = $testTable - | | eval result = json_append('$validJson7',json_array('teacher', 'Tom', 'Walt')) | head 1 | fields result + | | eval result = json_append('$validJson7',array('teacher', 'Tom', 'Walt')) | head 1 | fields result | """.stripMargin) assertSameRows( Seq(Row( @@ 
-643,7 +643,7 @@ class FlintSparkPPLJsonFunctionITSuite test("test json_append() function: add nested value") { val frame = sql(s""" | source = $testTable - | | eval result = json_append('$validJson8',json_array('school.teacher', 'Tom', 'Walt')) | head 1 | fields result + | | eval result = json_append('$validJson8',array('school.teacher', 'Tom', 'Walt')) | head 1 | fields result | """.stripMargin) assertSameRows( Seq(Row( diff --git a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanJsonFunctionsTranslatorTestSuite.scala b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanJsonFunctionsTranslatorTestSuite.scala index 8e36395b3..70a633024 100644 --- a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanJsonFunctionsTranslatorTestSuite.scala +++ b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanJsonFunctionsTranslatorTestSuite.scala @@ -16,7 +16,7 @@ import org.scalatest.matchers.should.Matchers import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar} -import org.apache.spark.sql.catalyst.expressions.{EqualTo, Literal} +import org.apache.spark.sql.catalyst.expressions.{Alias, EqualTo, Literal} import org.apache.spark.sql.catalyst.plans.PlanTest import org.apache.spark.sql.catalyst.plans.logical.{Filter, Project} import org.apache.spark.sql.types.DataTypes @@ -195,17 +195,20 @@ class PPLLogicalPlanJsonFunctionsTranslatorTestSuite val context = new CatalystPlanContext val logPlan = planTransformer.visit( - plan(pplParser, """source=t a = json_delete('{"a":[{"b":1},{"c":2}]}', '["a.b"]')"""), + plan( + pplParser, + """source=t | eval result = json_delete('{"a":[{"b":1},{"c":2}]}', array('a.b'))"""), context) val table = UnresolvedRelation(Seq("t")) - val keysExpression = Literal("""["a.b"]""") - val jsonObjExp = Literal("""{"a":[{"b":1},{"c":2}]}""") - val jsonFunc = visit("json_delete", util.List.of(jsonObjExp, keysExpression)) - val filterExpr = EqualTo(UnresolvedAttribute("a"), jsonFunc) - val filterPlan = Filter(filterExpr, table) - val projectList = Seq(UnresolvedStar(None)) - val expectedPlan = Project(projectList, filterPlan) + val keysExpression = + UnresolvedFunction("array", Seq(Literal("a.b")), isDistinct = false) + val jsonObjExp = + Literal("""{"a":[{"b":1},{"c":2}]}""") + val jsonFunc = + Alias(visit("json_delete", util.List.of(jsonObjExp, keysExpression)), "result")() + val eval = Project(Seq(UnresolvedStar(None), jsonFunc), table) + val expectedPlan = Project(Seq(UnresolvedStar(None)), eval) comparePlans(expectedPlan, logPlan, false) } @@ -213,17 +216,20 @@ class PPLLogicalPlanJsonFunctionsTranslatorTestSuite val context = new CatalystPlanContext val logPlan = planTransformer.visit( - plan(pplParser, """source=t a = json_append('{"a":[1,2]}', '["a",3]')"""), + plan( + pplParser, + """source=t | eval result = json_append('{"a":[{"b":1},{"c":2}]}', array('a.b'))"""), context) val table = UnresolvedRelation(Seq("t")) - val keysExpression = Literal("""["a",3]""") - val jsonObjExp = Literal("""{"a":[1,2]}""") - val jsonFunc = visit("json_append", util.List.of(jsonObjExp, keysExpression)) - val filterExpr = EqualTo(UnresolvedAttribute("a"), jsonFunc) - val filterPlan = Filter(filterExpr, table) - val projectList = Seq(UnresolvedStar(None)) - val expectedPlan = Project(projectList, filterPlan) + val keysExpression = + UnresolvedFunction("array", 
Seq(Literal("a.b")), isDistinct = false) + val jsonObjExp = + Literal("""{"a":[{"b":1},{"c":2}]}""") + val jsonFunc = + Alias(visit("json_append", util.List.of(jsonObjExp, keysExpression)), "result")() + val eval = Project(Seq(UnresolvedStar(None), jsonFunc), table) + val expectedPlan = Project(Seq(UnresolvedStar(None)), eval) comparePlans(expectedPlan, logPlan, false) } @@ -233,17 +239,18 @@ class PPLLogicalPlanJsonFunctionsTranslatorTestSuite planTransformer.visit( plan( pplParser, - """source=t a = json_extend('{"a":[{"b":1},{"c":2}]}', '["a",{"c":2}]')"""), + """source=t | eval result = json_extend('{"a":[{"b":1},{"c":2}]}', array('a.b'))"""), context) val table = UnresolvedRelation(Seq("t")) - val keysExpression = Literal("""["a",{"c":2}]""") - val jsonObjExp = Literal("""{"a":[{"b":1},{"c":2}]}""") - val jsonFunc = visit("json_extend", util.List.of(jsonObjExp, keysExpression)) - val filterExpr = EqualTo(UnresolvedAttribute("a"), jsonFunc) - val filterPlan = Filter(filterExpr, table) - val projectList = Seq(UnresolvedStar(None)) - val expectedPlan = Project(projectList, filterPlan) + val keysExpression = + UnresolvedFunction("array", Seq(Literal("a.b")), isDistinct = false) + val jsonObjExp = + Literal("""{"a":[{"b":1},{"c":2}]}""") + val jsonFunc = + Alias(visit("json_extend", util.List.of(jsonObjExp, keysExpression)), "result")() + val eval = Project(Seq(UnresolvedStar(None), jsonFunc), table) + val expectedPlan = Project(Seq(UnresolvedStar(None)), eval) comparePlans(expectedPlan, logPlan, false) } From 88bcaea9c6f4d93a00ffdaa82b3286f520978b1f Mon Sep 17 00:00:00 2001 From: YANGDB Date: Wed, 11 Dec 2024 14:50:49 -0800 Subject: [PATCH 08/12] update PR to only inclide json_delete json_append Signed-off-by: YANGDB --- docs/ppl-lang/functions/ppl-json.md | 35 ----- .../FlintSparkPPLJsonFunctionITSuite.scala | 2 +- .../src/main/antlr4/OpenSearchPPLLexer.g4 | 2 +- .../src/main/antlr4/OpenSearchPPLParser.g4 | 3 - .../sql/expression/function/JsonUtils.java | 106 ++++++++++++++ .../expression/function/SerializableUdf.java | 131 +----------------- ...PlanJsonFunctionsTranslatorTestSuite.scala | 14 +- 7 files changed, 123 insertions(+), 170 deletions(-) create mode 100644 ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/JsonUtils.java diff --git a/docs/ppl-lang/functions/ppl-json.md b/docs/ppl-lang/functions/ppl-json.md index 16af1e31e..c3955941b 100644 --- a/docs/ppl-lang/functions/ppl-json.md +++ b/docs/ppl-lang/functions/ppl-json.md @@ -286,41 +286,6 @@ Example: |{"root": {"a":["valueA", "valueB"]}} | +-----------------------------------------------+ - - -### `JSON_EXTEND` - -**Description** - -`json_extend(json, [path_value_pairs list])` extend appends multiple (array) values to an existing array json elements. Return the updated object after extending. - -**Argument type:** JSON, List<[(STRING, List)]> - -**Return type:** JSON - -A JSON object format. - -**Note** -Extend arrays as individual values separates the `json_extend` functionality from the `json_append` - which is a similar function that appends the `` as a single element. 
- -Example: - - os> source=people | eval extend = json_extend(`{"a":["valueA", "valueB"]}`,['a', ["valueC","valueD"]]) - fetched rows / total rows = 1/1 - +-------------------------------------------------+ - | extend | - +-------------------------------------------------+ - | {"a":["valueA", "valueB", "valueC", "valueD"]} | - +-------------------------------------------------+ - - os> source=people | eval extend = json_extend(`{"a":["valueA", "valueB"]}`,['a',[{"b":["valueC","valueD"]}]]) - fetched rows / total rows = 1/1 - +-------------------------------------------------------------+ - | extend | - +-------------------------------------------------------------+ - | {"a":["valueA", "valueB", {"b":"valueC"}, {"b":"valueD"}]} | - +-------------------------------------------------------------+ - ### `JSON_KEYS` **Description** diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLJsonFunctionITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLJsonFunctionITSuite.scala index a1ac52de0..7a00d9a07 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLJsonFunctionITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLJsonFunctionITSuite.scala @@ -35,6 +35,7 @@ class FlintSparkPPLJsonFunctionITSuite "{\"teacher\":[\"Alice\"],\"student\":[{\"name\":\"Bob\",\"rank\":1},{\"name\":\"Charlie\",\"rank\":2}]}" private val validJson8 = "{\"school\":{\"teacher\":[\"Alice\"],\"student\":[{\"name\":\"Bob\",\"rank\":1},{\"name\":\"Charlie\",\"rank\":2}]}}" + private val validJson9 = "{\"a\":[\"valueA\", \"valueB\"]}" private val invalidJson1 = "[1,2" private val invalidJson2 = "[invalid json]" private val invalidJson3 = "{\"invalid\": \"json\"" @@ -667,5 +668,4 @@ class FlintSparkPPLJsonFunctionITSuite val expectedPlan = Project(Seq(UnresolvedAttribute("result")), limit) comparePlans(logicalPlan, expectedPlan, checkAnalysis = false) } - } diff --git a/ppl-spark-integration/src/main/antlr4/OpenSearchPPLLexer.g4 b/ppl-spark-integration/src/main/antlr4/OpenSearchPPLLexer.g4 index a8efffe71..b7d615980 100644 --- a/ppl-spark-integration/src/main/antlr4/OpenSearchPPLLexer.g4 +++ b/ppl-spark-integration/src/main/antlr4/OpenSearchPPLLexer.g4 @@ -386,10 +386,10 @@ JSON_ARRAY_LENGTH: 'JSON_ARRAY_LENGTH'; TO_JSON_STRING: 'TO_JSON_STRING'; JSON_EXTRACT: 'JSON_EXTRACT'; JSON_DELETE : 'JSON_DELETE'; -JSON_EXTEND : 'JSON_EXTEND'; JSON_KEYS: 'JSON_KEYS'; JSON_VALID: 'JSON_VALID'; JSON_APPEND: 'JSON_APPEND'; +//JSON_EXTEND : 'JSON_EXTEND'; //JSON_SET: 'JSON_SET'; //JSON_ARRAY_ALL_MATCH: 'JSON_ARRAY_ALL_MATCH'; //JSON_ARRAY_ANY_MATCH: 'JSON_ARRAY_ANY_MATCH'; diff --git a/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4 b/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4 index 54926e05e..b990fd549 100644 --- a/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4 +++ b/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4 @@ -876,12 +876,9 @@ jsonFunctionName | TO_JSON_STRING | JSON_EXTRACT | JSON_DELETE - | JSON_EXTEND | JSON_APPEND | JSON_KEYS | JSON_VALID -// | JSON_APPEND -// | JSON_DELETE // | JSON_EXTEND // | JSON_SET // | JSON_ARRAY_ALL_MATCH diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/JsonUtils.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/JsonUtils.java new file mode 100644 index 000000000..9ca6732c6 --- /dev/null +++ 
b/ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/JsonUtils.java
@@ -0,0 +1,106 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.sql.expression.function;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+public interface JsonUtils {
+    ObjectMapper objectMapper = new ObjectMapper();
+
+    static Object parseValue(String value) {
+        // Try parsing the value as JSON, fallback to primitive if parsing fails
+        try {
+            return objectMapper.readValue(value, Object.class);
+        } catch (Exception e) {
+            // Primitive value, return as is
+            return value;
+        }
+    }
+
+    /**
+     * Append a nested value to the JSON object.
+     *
+     * @param currentObj    the current node of the parsed JSON tree
+     * @param pathParts     the dot-separated target path, split into its parts
+     * @param depth         the index of the path part currently being resolved
+     * @param valueToAppend the parsed value to append to the target array
+     */
+    static void appendNestedValue(Object currentObj, String[] pathParts, int depth, Object valueToAppend) {
+        if (currentObj == null || depth >= pathParts.length) {
+            return;
+        }
+
+        if (currentObj instanceof Map) {
+            Map<String, Object> currentMap = (Map<String, Object>) currentObj;
+            String currentKey = pathParts[depth];
+
+            if (depth == pathParts.length - 1) {
+                // If it's the last key, append to the array
+                currentMap.computeIfAbsent(currentKey, k -> new ArrayList<>()); // Create list if not present
+                Object existingValue = currentMap.get(currentKey);
+
+                if (existingValue instanceof List) {
+                    List<Object> existingList = (List<Object>) existingValue;
+                    existingList.add(valueToAppend);
+                }
+            } else {
+                // Continue traversing
+                currentMap.computeIfAbsent(currentKey, k -> new LinkedHashMap<>()); // Create map if not present
+                appendNestedValue(currentMap.get(currentKey), pathParts, depth + 1, valueToAppend);
+            }
+        } else if (currentObj instanceof List) {
+            // If the current object is a list, process each map in the list
+            List<Object> list = (List<Object>) currentObj;
+            for (Object item : list) {
+                if (item instanceof Map) {
+                    appendNestedValue(item, pathParts, depth, valueToAppend);
+                }
+            }
+        }
+    }
+
+    /**
+     * Remove a nested JSON element, identified by the parts of its dot-separated key.
+     *
+     * @param currentObj the current node of the parsed JSON tree
+     * @param keyParts   the dot-separated key, split into its parts
+     * @param depth      the index of the key part currently being resolved
+     */
+    static void removeNestedKey(Object currentObj, String[] keyParts, int depth) {
+        if (currentObj == null || depth >= keyParts.length) {
+            return;
+        }
+
+        if (currentObj instanceof Map) {
+            Map<String, Object> currentMap = (Map<String, Object>) currentObj;
+            String currentKey = keyParts[depth];
+
+            if (depth == keyParts.length - 1) {
+                // If it's the last key, remove it from the map
+                currentMap.remove(currentKey);
+            } else {
+                // If not the last key, continue traversing
+                if (currentMap.containsKey(currentKey)) {
+                    Object nextObj = currentMap.get(currentKey);
+
+                    if (nextObj instanceof List) {
+                        // If the value is a list, process each item in the list
+                        List<Object> list = (List<Object>) nextObj;
+                        for (int i = 0; i < list.size(); i++) {
+                            removeNestedKey(list.get(i), keyParts, depth + 1);
+                        }
+                    } else {
+                        // Continue traversing if it's a map
+                        removeNestedKey(nextObj, keyParts, depth + 1);
+                    }
+                }
+            }
+        }
+    }
+}
diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/SerializableUdf.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/SerializableUdf.java
index 7b5e10ffe..e80a26bc4 100644
--- a/ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/SerializableUdf.java
+++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/SerializableUdf.java
@@ -5,7 +5,6 @@ package
org.opensearch.sql.expression.function;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
 import inet.ipaddr.AddressStringException;
 import inet.ipaddr.IPAddressString;
 import inet.ipaddr.IPAddressStringParameters;
@@ -19,18 +18,19 @@
 import scala.collection.mutable.WrappedArray;
 import scala.runtime.AbstractFunction2;
 
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.opensearch.sql.expression.function.JsonUtils.appendNestedValue;
+import static org.opensearch.sql.expression.function.JsonUtils.objectMapper;
+import static org.opensearch.sql.expression.function.JsonUtils.parseValue;
+import static org.opensearch.sql.expression.function.JsonUtils.removeNestedKey;
 import static org.opensearch.sql.ppl.utils.DataTypeTransformer.seq;
 
 public interface SerializableUdf {
-    ObjectMapper objectMapper = new ObjectMapper();
 
     abstract class SerializableAbstractFunction2<T1, T2, R> extends AbstractFunction2<T1, T2, R>
             implements Serializable {
@@ -65,38 +65,6 @@ private void removeKeys(Map<String, Object> map, WrappedArray<String> keysToRemove) {
             removeNestedKey(map, keyParts, 0);
         }
     }
-
-    private void removeNestedKey(Object currentObj, String[] keyParts, int depth) {
-        if (currentObj == null || depth >= keyParts.length) {
-            return;
-        }
-
-        if (currentObj instanceof Map) {
-            Map<String, Object> currentMap = (Map<String, Object>) currentObj;
-            String currentKey = keyParts[depth];
-
-            if (depth == keyParts.length - 1) {
-                // If it's the last key, remove it from the map
-                currentMap.remove(currentKey);
-            } else {
-                // If not the last key, continue traversing
-                if (currentMap.containsKey(currentKey)) {
-                    Object nextObj = currentMap.get(currentKey);
-
-                    if (nextObj instanceof List) {
-                        // If the value is a list, process each item in the list
-                        List<Object> list = (List<Object>) nextObj;
-                        for (int i = 0; i < list.size(); i++) {
-                            removeNestedKey(list.get(i), keyParts, depth + 1);
-                        }
-                    } else {
-                        // Continue traversing if it's a map
-                        removeNestedKey(nextObj, keyParts, depth + 1);
-                    }
-                }
-            }
-        }
-    }
     };
 
     Function2<String, WrappedArray<String>, String> jsonAppendFunction = new SerializableAbstractFunction2<>() {
@@ -136,88 +104,8 @@ public String apply(String jsonStr, WrappedArray<String> elements) {
             return null;
         }
     }
-
-    private Object parseValue(String value) {
-        // Try parsing the value as JSON, fallback to primitive if parsing fails
-        try {
-            return objectMapper.readValue(value, Object.class);
-        } catch (Exception e) {
-            // Primitive value, return as is
-            return value;
-        }
-    }
-
-    private void appendNestedValue(Object currentObj, String[] pathParts, int depth, Object valueToAppend) {
-        if (currentObj == null || depth >= pathParts.length) {
-            return;
-        }
-
-        if (currentObj instanceof Map) {
-            Map<String, Object> currentMap = (Map<String, Object>) currentObj;
-            String currentKey = pathParts[depth];
-
-            if (depth == pathParts.length - 1) {
-                // If it's the last key, append to the array
-                currentMap.computeIfAbsent(currentKey, k -> new ArrayList<>()); // Create list if not present
-                Object existingValue = currentMap.get(currentKey);
-
-                if (existingValue instanceof List) {
-                    List<Object> existingList = (List<Object>) existingValue;
-                    existingList.add(valueToAppend);
-                }
-            } else {
-                // Continue traversing
-                currentMap.computeIfAbsent(currentKey, k -> new LinkedHashMap<>()); // Create map if not present
-                appendNestedValue(currentMap.get(currentKey), pathParts, depth + 1, valueToAppend);
-            }
-        } else if (currentObj instanceof List) {
-            // If the current object is a list, process each map in the list
-            List<Object> list = (List<Object>) currentObj;
-            for (Object item : list) {
-                if (item instanceof Map) {
-                    appendNestedValue(item, pathParts, depth, valueToAppend);
-                }
-            }
-        }
-    }
     };
-
-    /**
-     * Extend JSON arrays with new values based on specified path-value pairs.
-     *
-     * @param jsonStr The input JSON string.
-     * @param pathValuePairs A list of path-value pairs to extend.
-     * @return The updated JSON string.
-     */
-    Function2<String, List<Map.Entry<String, List<String>>>, String> jsonExtendFunction = new SerializableAbstractFunction2<>() {
-
-        @Override
-        public String apply(String jsonStr, List<Map.Entry<String, List<String>>> pathValuePairs) {
-            if (jsonStr == null) {
-                return null;
-            }
-            try {
-                Map<String, Object> jsonMap = objectMapper.readValue(jsonStr, Map.class);
-
-                for (Map.Entry<String, List<String>> pathValuePair : pathValuePairs) {
-                    String path = pathValuePair.getKey();
-                    List<String> values = pathValuePair.getValue();
-
-                    if (jsonMap.containsKey(path) && jsonMap.get(path) instanceof List) {
-                        List<String> existingList = (List<String>) jsonMap.get(path);
-                        existingList.addAll(values);
-                    } else {
-                        jsonMap.put(path, values);
-                    }
-                }
-
-                return objectMapper.writeValueAsString(jsonMap);
-            } catch (Exception e) {
-                return null;
-            }
-        }
-    };
-
+
     Function2<String, String, Boolean> cidrFunction = new SerializableAbstractFunction2<>() {
 
         IPAddressStringParameters valOptions = new IPAddressStringParameters.Builder()
@@ -280,15 +168,6 @@ static ScalaUDF visit(String funcName, List<Expression> expressions) {
                     Option.apply("json_delete"),
                     false,
                     true);
-            case "json_extend":
-                return new ScalaUDF(jsonExtendFunction,
-                        DataTypes.StringType,
-                        seq(expressions),
-                        seq(),
-                        Option.empty(),
-                        Option.apply("json_extend"),
-                        false,
-                        true);
             case "json_append":
                 return new ScalaUDF(jsonAppendFunction,
                         DataTypes.StringType,
diff --git a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanJsonFunctionsTranslatorTestSuite.scala b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanJsonFunctionsTranslatorTestSuite.scala
index 70a633024..23f6ba5d2 100644
--- a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanJsonFunctionsTranslatorTestSuite.scala
+++ b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanJsonFunctionsTranslatorTestSuite.scala
@@ -218,12 +218,15 @@ class PPLLogicalPlanJsonFunctionsTranslatorTestSuite
       planTransformer.visit(
         plan(
           pplParser,
-          """source=t | eval result = json_append('{"a":[{"b":1},{"c":2}]}', array('a.b'))"""),
+          """source=t | eval result = json_append('{"a":[{"b":1},{"c":2}]}', array('a.b','c','d'))"""),
         context)
 
     val table = UnresolvedRelation(Seq("t"))
     val keysExpression =
-      UnresolvedFunction("array", Seq(Literal("a.b")), isDistinct = false)
+      UnresolvedFunction(
+        "array",
+        Seq(Literal("a.b"), Literal("c"), Literal("d")),
+        isDistinct = false)
     val jsonObjExp =
       Literal("""{"a":[{"b":1},{"c":2}]}""")
     val jsonFunc =
@@ -239,12 +242,15 @@ class PPLLogicalPlanJsonFunctionsTranslatorTestSuite
       planTransformer.visit(
         plan(
           pplParser,
-          """source=t | eval result = json_extend('{"a":[{"b":1},{"c":2}]}', array('a.b'))"""),
+          """source=t | eval result = json_extend('{"a":[{"b":1},{"c":2}]}', array('a.b', 'c','d'))"""),
         context)
 
     val table = UnresolvedRelation(Seq("t"))
     val keysExpression =
-      UnresolvedFunction("array", Seq(Literal("a.b")), isDistinct = false)
+      UnresolvedFunction(
+        "array",
+        Seq(Literal("a.b"), Literal("c"), Literal("d")),
+        isDistinct = false)
     val jsonObjExp =
       Literal("""{"a":[{"b":1},{"c":2}]}""")
     val jsonFunc =
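Before the test-only patches that follow, it may help to see how the two surviving UDFs are driven. A minimal sketch of calling them directly as Scala `Function2` values, in the same style as the unit tests shown later — the wrapper class is hypothetical, and the expected outputs in the comments are taken from the documentation examples in patch 12:

    import scala.collection.mutable.WrappedArray;

    import static org.opensearch.sql.expression.function.SerializableUdf.jsonAppendFunction;
    import static org.opensearch.sql.expression.function.SerializableUdf.jsonDeleteFunction;

    // Hypothetical driver, not part of the patch series.
    public class JsonUdfUsageSketch {
        public static void main(String[] args) {
            // json_delete: remove the top-level "age" and "gender" keys.
            String deleted = jsonDeleteFunction.apply(
                    "{\"account_number\":1,\"balance\":39225,\"age\":32,\"gender\":\"M\"}",
                    WrappedArray.make(new String[]{"age", "gender"}));
            System.out.println(deleted); // {"account_number":1,"balance":39225}

            // json_append: the first element is the path, the rest are values to append.
            String appended = jsonAppendFunction.apply(
                    "{\"teacher\":[\"Alice\"]}",
                    WrappedArray.make(new String[]{"teacher", "Tom", "Walt"}));
            System.out.println(appended); // {"teacher":["Alice","Tom","Walt"]}
        }
    }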
From 09b7b5cbf84058c92f22e1de814552f64468835c Mon Sep 17 00:00:00 2001
From: YANGDB
Date: Wed, 11 Dec 2024 14:51:26 -0800
Subject: [PATCH 09/12] update PR to only include json_delete and json_append

Signed-off-by: YANGDB

---
 ...PlanJsonFunctionsTranslatorTestSuite.scala | 26 +-------------------
 1 file changed, 1 insertion(+), 25 deletions(-)

diff --git a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanJsonFunctionsTranslatorTestSuite.scala b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanJsonFunctionsTranslatorTestSuite.scala
index 23f6ba5d2..5a09af992 100644
--- a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanJsonFunctionsTranslatorTestSuite.scala
+++ b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanJsonFunctionsTranslatorTestSuite.scala
@@ -235,31 +235,7 @@ class PPLLogicalPlanJsonFunctionsTranslatorTestSuite
     val expectedPlan = Project(Seq(UnresolvedStar(None)), eval)
     comparePlans(expectedPlan, logPlan, false)
   }
-
-  test("test json_extend()") {
-    val context = new CatalystPlanContext
-    val logPlan =
-      planTransformer.visit(
-        plan(
-          pplParser,
-          """source=t | eval result = json_extend('{"a":[{"b":1},{"c":2}]}', array('a.b', 'c','d'))"""),
-        context)
-
-    val table = UnresolvedRelation(Seq("t"))
-    val keysExpression =
-      UnresolvedFunction(
-        "array",
-        Seq(Literal("a.b"), Literal("c"), Literal("d")),
-        isDistinct = false)
-    val jsonObjExp =
-      Literal("""{"a":[{"b":1},{"c":2}]}""")
-    val jsonFunc =
-      Alias(visit("json_extend", util.List.of(jsonObjExp, keysExpression)), "result")()
-    val eval = Project(Seq(UnresolvedStar(None), jsonFunc), table)
-    val expectedPlan = Project(Seq(UnresolvedStar(None)), eval)
-    comparePlans(expectedPlan, logPlan, false)
-  }
-
+  
   test("test json_keys()") {
     val context = new CatalystPlanContext
     val logPlan =

From f76ee3711cb9855f9fc43cfd802275a6dffe9560 Mon Sep 17 00:00:00 2001
From: YANGDB
Date: Wed, 11 Dec 2024 15:22:17 -0800
Subject: [PATCH 10/12] fix scalaFmt

Signed-off-by: YANGDB

---
 .../ppl/PPLLogicalPlanJsonFunctionsTranslatorTestSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanJsonFunctionsTranslatorTestSuite.scala b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanJsonFunctionsTranslatorTestSuite.scala
index 5a09af992..fae070a75 100644
--- a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanJsonFunctionsTranslatorTestSuite.scala
+++ b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanJsonFunctionsTranslatorTestSuite.scala
@@ -235,7 +235,7 @@ class PPLLogicalPlanJsonFunctionsTranslatorTestSuite
     val expectedPlan = Project(Seq(UnresolvedStar(None)), eval)
     comparePlans(expectedPlan, logPlan, false)
   }
-  
+
   test("test json_keys()") {
     val context = new CatalystPlanContext
     val logPlan =

From fe8d600f60e2bf70f924c352445414c61787f8a1 Mon Sep 17 00:00:00 2001
From: YANGDB
Date: Wed, 11 Dec 2024 15:43:32 -0800
Subject: [PATCH 11/12] fix tests

Signed-off-by: YANGDB

---
 .../function/SerializableJsonUdfTest.java     | 98 ------------------
 1 file changed, 98 deletions(-)

diff --git a/ppl-spark-integration/src/test/java/org/opensearch/sql/expression/function/SerializableJsonUdfTest.java b/ppl-spark-integration/src/test/java/org/opensearch/sql/expression/function/SerializableJsonUdfTest.java
index 884d3d5a2..fb47803cf 100644
--- a/ppl-spark-integration/src/test/java/org/opensearch/sql/expression/function/SerializableJsonUdfTest.java
+++ b/ppl-spark-integration/src/test/java/org/opensearch/sql/expression/function/SerializableJsonUdfTest.java
@@ -7,18 +7,10 @@
 import org.junit.Test;
 import scala.collection.mutable.WrappedArray;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.derby.vti.XmlVTI.asList;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.opensearch.sql.expression.function.SerializableUdf.jsonAppendFunction;
 import static org.opensearch.sql.expression.function.SerializableUdf.jsonDeleteFunction;
-import static org.opensearch.sql.expression.function.SerializableUdf.jsonExtendFunction;
 
 public class SerializableJsonUdfTest {
 
@@ -158,94 +150,4 @@ public void testJsonAppendFunctionInvalidJson() {
         String result = jsonAppendFunction.apply(invalidJson, WrappedArray.make(new String[]{"key1", "newValue"}));
         assertNull(result);
     }
-
-    @Test
-    public void testJsonExtendFunctionWithExistingPath() {
-        String jsonStr = "{\"path1\": [\"value1\", \"value2\"]}";
-        List<Map.Entry<String, List<String>>> pathValuePairs = new ArrayList<>();
-        pathValuePairs.add(Map.entry("path1", asList("value3", "value4")));
-
-        String result = jsonExtendFunction.apply(jsonStr, pathValuePairs);
-        String expectedJson = "{\"path1\":[\"value1\",\"value2\",\"value3\",\"value4\"]}";
-
-        assertEquals(expectedJson, result);
-    }
-
-    @Test
-    public void testJsonExtendFunctionWithNewPath() {
-        String jsonStr = "{\"path1\": [\"value1\"]}";
-        List<Map.Entry<String, List<String>>> pathValuePairs = new ArrayList<>();
-        pathValuePairs.add(Map.entry("path2", asList("newValue1", "newValue2")));
-
-        String result = jsonExtendFunction.apply(jsonStr, pathValuePairs);
-        String expectedJson = "{\"path1\":[\"value1\"],\"path2\":[\"newValue1\",\"newValue2\"]}";
-
-        assertEquals(expectedJson, result);
-    }
-
-    @Test
-    public void testJsonExtendFunctionWithNullInput() {
-        String result = jsonExtendFunction.apply(null, Collections.emptyList());
-        assertNull(result);
-    }
-
-    @Test
-    public void testJsonExtendFunctionWithInvalidJson() {
-        String result = jsonExtendFunction.apply("invalid json", Collections.emptyList());
-        assertNull(result);
-    }
-
-    @Test
-    public void testJsonExtendFunctionWithNonArrayPath() {
-        String jsonStr = "{\"path1\":\"value1\"}";
-        List<Map.Entry<String, List<String>>> pathValuePairs = new ArrayList<>();
-        pathValuePairs.add(Map.entry("path1", asList("value2")));
-
-        String result = jsonExtendFunction.apply(jsonStr, pathValuePairs);
-        String expectedJson = "{\"path1\":[\"value2\"]}";
-
-        assertEquals(expectedJson, result);
-    }
-
-    @Test
-    public void testJsonExtendFunctionAddValuesToExistingArray() {
-        String jsonStr = "{\"key1\":\"value1\",\"key2\":[\"value2\"]}";
-        List<Map.Entry<String, List<String>>> pathValuePairs = new ArrayList<>();
-        pathValuePairs.add(Map.entry("key2", Arrays.asList("value3", "value4")));
-
-        String expectedJson = "{\"key1\":\"value1\",\"key2\":[\"value2\",\"value3\",\"value4\"]}";
-        String result = jsonExtendFunction.apply(jsonStr, pathValuePairs);
-
-        assertEquals(expectedJson, result);
-    }
-
-    @Test
-    public void testJsonExtendFunctionAddNewArray() {
-        String jsonStr = "{\"key1\":\"value1\"}";
-        List<Map.Entry<String, List<String>>> pathValuePairs = new ArrayList<>();
-        pathValuePairs.add(Map.entry("key2", Arrays.asList("value2", "value3")));
-
-        String expectedJson = "{\"key1\":\"value1\",\"key2\":[\"value2\",\"value3\"]}";
-        String result = jsonExtendFunction.apply(jsonStr, pathValuePairs);
-
-        assertEquals(expectedJson, result);
-    }
-
-    @Test
-    public void testJsonExtendFunctionHandleEmptyValues() {
-        String jsonStr = "{\"key1\":\"value1\",\"key2\":[\"value2\"]}";
-        List<Map.Entry<String, List<String>>> pathValuePairs = new ArrayList<>();
-        pathValuePairs.add(Map.entry("key2", Collections.emptyList()));
-
-        String expectedJson = "{\"key1\":\"value1\",\"key2\":[\"value2\"]}";
-        String result = jsonExtendFunction.apply(jsonStr, pathValuePairs);
-
-        assertEquals(expectedJson, result);
-    }
-
-    @Test
-    public void testJsonExtendFunctionHandleNullInput() {
-        String result = jsonExtendFunction.apply(null, Collections.singletonList(Map.entry("key2", List.of("value2"))));
-        assertEquals(null, result);
-    }
 }
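Patch 08 moved the private JSON helpers out of `SerializableUdf` into a shared `JsonUtils` class, but that file does not appear in the hunks above. A minimal sketch of the relocated helpers, inferred from the static imports and from the inline implementations the refactor removed — the bodies here are a reconstruction, not the committed code, and `appendNestedValue` (which follows the same recursive pattern) is omitted for brevity:

    package org.opensearch.sql.expression.function;

    import com.fasterxml.jackson.databind.ObjectMapper;

    import java.util.List;
    import java.util.Map;

    public interface JsonUtils {
        ObjectMapper objectMapper = new ObjectMapper();

        // Try parsing the value as JSON; fall back to the raw string for primitives.
        static Object parseValue(String value) {
            try {
                return objectMapper.readValue(value, Object.class);
            } catch (Exception e) {
                return value;
            }
        }

        // Recursively walk the parsed tree and remove the key addressed by keyParts.
        @SuppressWarnings("unchecked")
        static void removeNestedKey(Object currentObj, String[] keyParts, int depth) {
            if (currentObj == null || depth >= keyParts.length) {
                return;
            }
            if (currentObj instanceof Map) {
                Map<String, Object> currentMap = (Map<String, Object>) currentObj;
                String currentKey = keyParts[depth];
                if (depth == keyParts.length - 1) {
                    // Last path segment: drop the key from this map.
                    currentMap.remove(currentKey);
                } else if (currentMap.get(currentKey) instanceof List) {
                    // Intermediate arrays: descend into every element.
                    for (Object item : (List<Object>) currentMap.get(currentKey)) {
                        removeNestedKey(item, keyParts, depth + 1);
                    }
                } else {
                    removeNestedKey(currentMap.get(currentKey), keyParts, depth + 1);
                }
            }
        }
    }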
From 2124fadc1d403dfdcd405c9c4845edf169c88bd1 Mon Sep 17 00:00:00 2001
From: YANGDB
Date: Fri, 13 Dec 2024 10:17:34 -0800
Subject: [PATCH 12/12] fix documentation

Signed-off-by: YANGDB

---
 docs/ppl-lang/functions/ppl-json.md | 79 +++++++++++++++--------------
 1 file changed, 40 insertions(+), 39 deletions(-)

diff --git a/docs/ppl-lang/functions/ppl-json.md b/docs/ppl-lang/functions/ppl-json.md
index c3955941b..3eb952cb7 100644
--- a/docs/ppl-lang/functions/ppl-json.md
+++ b/docs/ppl-lang/functions/ppl-json.md
@@ -207,51 +207,51 @@ Example:
 
 **Description**
 
-`json_delete(json, [keys list])` Deletes json elements from a json object based on json specific keys. Return the updated object after keys deletion .
+`json_delete(json_string, [keys list])` deletes JSON elements from a JSON object based on the given keys. Returns the updated object after the keys have been deleted.
 
-**Argument type:** JSON, List
+**Argument type:** JSON_STRING, List<STRING>
 
-**Return type:** JSON
+**Return type:** JSON_STRING
 
 A JSON object format.
 
 Example:
 
     os> source=people | eval deleted = json_delete('{"account_number":1,"balance":39225,"age":32,"gender":"M"}', array('age','gender')) | head 1 | fields deleted
     fetched rows / total rows = 1/1
     +---------------------------------------+
     | deleted                               |
     +---------------------------------------+
     | {"account_number":1,"balance":39225}  |
     +---------------------------------------+
 
     os> source=people | eval deleted = json_delete('{"f1":"abc","f2":{"f3":"a","f4":"b"}}', array('f2.f3')) | head 1 | fields deleted
     fetched rows / total rows = 1/1
     +-----------------------------------------------------------+
     | deleted                                                   |
     +-----------------------------------------------------------+
-    | {"a":[{"c":3}] }                                          |
+    | {"f1":"abc","f2":{"f4":"b"}}                              |
     +-----------------------------------------------------------+
 
     os> source=people | eval deleted = json_delete('{"teacher":"Alice","student":[{"name":"Bob","rank":1},{"name":"Charlie","rank":2}]}', array('teacher', 'student.rank')) | head 1 | fields deleted
     fetched rows / total rows = 1/1
     +--------------------------------------------------+
     | deleted                                          |
     +--------------------------------------------------+
     | {"student":[{"name":"Bob"},{"name":"Charlie"}]}  |
     +--------------------------------------------------+
 
 ### `JSON_APPEND`
 
 **Description**
 
-`json_append(json, [path_key, list of values to add ])` appends values to end of an array within the json elements. Return the updated json object after appending .
+`json_append(json_string, [path_key, list of values to add])` appends values to the end of an array inside a JSON document. Returns the updated JSON object after appending.
 
-**Argument type:** JSON, List
+**Argument type:** JSON_STRING, List<STRING>
 
-**Return type:** JSON
+**Return type:** JSON_STRING
 
-A JSON object format.
+A string in JSON object format.
 
 **Note**
 Append adds the value to the end of the existing array with the following cases:
@@ -262,29 +262,30 @@ Append adds the value to the end of the existing array with the following cases:
 
 Example:
 
-    os> source=people | eval append = json_append(`{"a":["valueA", "valueB"]}`, array('a', 'valueC', 'valueD'))
+    os> source=people | eval append = json_append(`{"teacher":["Alice"],"student":[{"name":"Bob","rank":1},{"name":"Charlie","rank":2}]}`, array('student', '{"name":"Tomy","rank":5}')) | head 1 | fields append
     fetched rows / total rows = 1/1
-    +-------------------------------------------------+
-    | append                                          |
-    +-------------------------------------------------+
-    | {"a":["valueA", "valueB", "valueC", "valueD"]}  |
-    +-------------------------------------------------+
+    +----------------------------------------------------------------------------------------------------------------+
+    | append                                                                                                         |
+    +----------------------------------------------------------------------------------------------------------------+
+    | {"teacher":["Alice"],"student":[{"name":"Bob","rank":1},{"name":"Charlie","rank":2},{"name":"Tomy","rank":5}]} |
+    +----------------------------------------------------------------------------------------------------------------+
 
-    os> source=people | eval append = json_append(`{"a":[]}`, array('a', 'valueC'))
+    os> source=people | eval append = json_append(`{"teacher":["Alice"],"student":[{"name":"Bob","rank":1},{"name":"Charlie","rank":2}]}`, array('teacher', 'Tom', 'Walt')) | head 1 | fields append
     fetched rows / total rows = 1/1
-    +-----------------------------------------------+
-    | append                                        |
-    +-----------------------------------------------+
-    | {"a":["valueC"]}                              |
-    +-----------------------------------------------+
+    +----------------------------------------------------------------------------------------------------+
+    | append                                                                                             |
+    +----------------------------------------------------------------------------------------------------+
+    | {"teacher":["Alice","Tom","Walt"],"student":[{"name":"Bob","rank":1},{"name":"Charlie","rank":2}]} |
+    +----------------------------------------------------------------------------------------------------+
 
-    os> source=people | eval append = json_append(`{"root":{ "a":["valueA", "valueB"]}}`, array('root', 'valueC')
+    os> source=people | eval append = json_append(`{"school":{"teacher":["Alice"],"student":[{"name":"Bob","rank":1},{"name":"Charlie","rank":2}]}}`, array('school.teacher', 'Tom', 'Walt')) | head 1 | fields append
     fetched rows / total rows = 1/1
-    +-----------------------------------------------+
-    | append                                        |
-    +-----------------------------------------------+
-    | {"root": {"a":["valueA", "valueB"]}}          |
-    +-----------------------------------------------+
+    +---------------------------------------------------------------------------------------------------------------+
+    | append                                                                                                        |
+    +---------------------------------------------------------------------------------------------------------------+
+    | {"school":{"teacher":["Alice","Tom","Walt"],"student":[{"name":"Bob","rank":1},{"name":"Charlie","rank":2}]}} |
+    +---------------------------------------------------------------------------------------------------------------+
 
 ### `JSON_KEYS`