PPL support json_delete, append functions #971

Merged
83 changes: 83 additions & 0 deletions docs/ppl-lang/functions/ppl-json.md
@@ -203,6 +203,89 @@ Example:
+----------------+


### `JSON_DELETE`

**Description**

`json_delete(json, [keys list])` deletes elements from a JSON object based on the given keys. Returns the updated JSON object after the keys have been deleted.

**Argument type:** JSON, List<STRING>
> **Review comment (Member):** Seems the input json should be jsonString and type here is STRING, List<STRING>?


**Return type:** JSON

The updated JSON object.

Example:

os> source=people | eval deleted = json_delete('{"a":"valueA", "b":"valueB"}', array('a'))
> **Review comment (Member):** Does this query work? Is it json_delete('{"a":"valueA", "b":"valueB"}', array("a"))?
>
> **@YANG-DB (Member, author), Dec 13, 2024:** I've updated the documentation with the examples to match the actual test

fetched rows / total rows = 1/1
+----------------------------------+
| deleted                          |
+----------------------------------+
| {"b": "valueB"}                  |
+----------------------------------+

os> source=people | eval deleted = json_delete('{"a":[{"b":1},{"b":2},{"c":3}]}', array('a.b'))
fetched rows / total rows = 1/1
+-----------------------------------------------------------+
| deleted                                                   |
+-----------------------------------------------------------+
| {"a":[{"c":3}] }                                          |
+-----------------------------------------------------------+

os> source=people | eval `no_action` = json_delete('{"a":[{"b":1},{"b":2},{"c":3}]}', array('b.b'))
fetched rows / total rows = 1/1
+-----------------------------------+
| no_action                         |
+-----------------------------------+
| {"a":[{"b":1},{"b":2},{"c":3}]}   |
+-----------------------------------+
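
Multiple keys can be removed in a single call. The following is a sketch that mirrors the `multiple keys` integration test added in this change (input and output are taken from that test's fixture):

os> source=people | eval deleted = json_delete('{"account_number":1,"balance":39225,"age":32,"gender":"M"}', array('age','gender'))
fetched rows / total rows = 1/1
+---------------------------------------+
| deleted                               |
+---------------------------------------+
| {"account_number":1,"balance":39225}  |
+---------------------------------------+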

### `JSON_APPEND`

**Description**

`json_append(json, [path_key, list of values to add])` appends values to the end of an array inside the JSON document. Returns the updated JSON object after appending.

**Argument type:** JSON, List<STRING>

**Return type:** JSON

The updated JSON object.

**Note**
Append adds values to the end of an existing array, with the following cases:
- path points to an object value - the append is ignored and the original value is returned
- path points to an existing, non-empty array - the values are appended to the array's tail
- path is not found - the values are added at the root of the JSON tree
- path points to an existing, empty array - a new array is created with the given values

Example:

os> source=people | eval append = json_append('{"a":["valueA", "valueB"]}', array('a', 'valueC', 'valueD'))
fetched rows / total rows = 1/1
+-------------------------------------------------+
| append                                          |
+-------------------------------------------------+
| {"a":["valueA", "valueB", "valueC", "valueD"]}  |
+-------------------------------------------------+

os> source=people | eval append = json_append('{"a":[]}', array('a', 'valueC'))
fetched rows / total rows = 1/1
+-----------------------------------------------+
| append                                        |
+-----------------------------------------------+
| {"a":["valueC"]}                              |
+-----------------------------------------------+

os> source=people | eval append = json_append('{"root":{ "a":["valueA", "valueB"]}}', array('root', 'valueC'))
> **Review comment (Member):** Missing a ) in the end.

fetched rows / total rows = 1/1
+-----------------------------------------------+
| append                                        |
+-----------------------------------------------+
| {"root": {"a":["valueA", "valueB"]}}          |
+-----------------------------------------------+
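
When the path key does not exist, the values are added under that key as a new array at the root of the document. This sketch mirrors the `add single value key not found` integration test added in this change (input and output come from that test's fixture):

os> source=people | eval append = json_append('{"teacher":["Alice"],"student":[{"name":"Bob","rank":1},{"name":"Charlie","rank":2}]}', array('headmaster', 'Tom'))
fetched rows / total rows = 1/1
+--------------------------------------------------------------------------------------------------------------+
| append                                                                                                        |
+--------------------------------------------------------------------------------------------------------------+
| {"teacher":["Alice"],"student":[{"name":"Bob","rank":1},{"name":"Charlie","rank":2}],"headmaster":["Tom"]}    |
+--------------------------------------------------------------------------------------------------------------+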

### `JSON_KEYS`

**Description**
@@ -5,6 +5,10 @@

package org.opensearch.flint.spark.ppl

import java.util

import org.opensearch.sql.expression.function.SerializableUdf.visit

import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar}
import org.apache.spark.sql.catalyst.expressions.{Alias, EqualTo, Literal, Not}
@@ -27,6 +31,11 @@ class FlintSparkPPLJsonFunctionITSuite
private val validJson5 =
"{\"teacher\":\"Alice\",\"student\":[{\"name\":\"Bob\",\"rank\":1},{\"name\":\"Charlie\",\"rank\":2}]}"
private val validJson6 = "[1,2,3]"
private val validJson7 =
"{\"teacher\":[\"Alice\"],\"student\":[{\"name\":\"Bob\",\"rank\":1},{\"name\":\"Charlie\",\"rank\":2}]}"
private val validJson8 =
"{\"school\":{\"teacher\":[\"Alice\"],\"student\":[{\"name\":\"Bob\",\"rank\":1},{\"name\":\"Charlie\",\"rank\":2}]}}"
private val validJson9 = "{\"a\":[\"valueA\", \"valueB\"]}"
private val invalidJson1 = "[1,2"
private val invalidJson2 = "[invalid json]"
private val invalidJson3 = "{\"invalid\": \"json\""
@@ -385,4 +394,278 @@ class FlintSparkPPLJsonFunctionITSuite
null))
assertSameRows(expectedSeq, frame)
}

test("test json_delete() function: one key") {
val frame = sql(s"""
| source = $testTable
| | eval result = json_delete('$validJson1',array('age')) | head 1 | fields result
| """.stripMargin)
assertSameRows(Seq(Row("{\"account_number\":1,\"balance\":39225,\"gender\":\"M\"}")), frame)

val logicalPlan: LogicalPlan = frame.queryExecution.logical
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))
val keysExpression = UnresolvedFunction("array", Seq(Literal("age")), isDistinct = false)
val jsonObjExp =
Literal("{\"account_number\":1,\"balance\":39225,\"age\":32,\"gender\":\"M\"}")
val jsonFunc =
Alias(visit("json_delete", util.List.of(jsonObjExp, keysExpression)), "result")()
val eval = Project(Seq(UnresolvedStar(None), jsonFunc), table)
val limit = GlobalLimit(Literal(1), LocalLimit(Literal(1), eval))
val expectedPlan = Project(Seq(UnresolvedAttribute("result")), limit)
comparePlans(logicalPlan, expectedPlan, checkAnalysis = false)
}

test("test json_delete() function: multiple keys") {
val frame = sql(s"""
| source = $testTable
| | eval result = json_delete('$validJson1',array('age','gender')) | head 1 | fields result
| """.stripMargin)
assertSameRows(Seq(Row("{\"account_number\":1,\"balance\":39225}")), frame)

val logicalPlan: LogicalPlan = frame.queryExecution.logical
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))
val keysExpression =
UnresolvedFunction("array", Seq(Literal("age"), Literal("gender")), isDistinct = false)
val jsonObjExp =
Literal("{\"account_number\":1,\"balance\":39225,\"age\":32,\"gender\":\"M\"}")
val jsonFunc =
Alias(visit("json_delete", util.List.of(jsonObjExp, keysExpression)), "result")()
val eval = Project(Seq(UnresolvedStar(None), jsonFunc), table)
val limit = GlobalLimit(Literal(1), LocalLimit(Literal(1), eval))
val expectedPlan = Project(Seq(UnresolvedAttribute("result")), limit)
comparePlans(logicalPlan, expectedPlan, checkAnalysis = false)
}

test("test json_delete() function: nested key") {
val frame = sql(s"""
| source = $testTable
| | eval result = json_delete('$validJson2',array('f2.f3')) | head 1 | fields result
| """.stripMargin)
assertSameRows(Seq(Row("{\"f1\":\"abc\",\"f2\":{\"f4\":\"b\"}}")), frame)

val logicalPlan: LogicalPlan = frame.queryExecution.logical
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))
val keysExpression =
UnresolvedFunction("array", Seq(Literal("f2.f3")), isDistinct = false)
val jsonObjExp =
Literal("{\"f1\":\"abc\",\"f2\":{\"f3\":\"a\",\"f4\":\"b\"}}")
val jsonFunc =
Alias(visit("json_delete", util.List.of(jsonObjExp, keysExpression)), "result")()
val eval = Project(Seq(UnresolvedStar(None), jsonFunc), table)
val limit = GlobalLimit(Literal(1), LocalLimit(Literal(1), eval))
val expectedPlan = Project(Seq(UnresolvedAttribute("result")), limit)
comparePlans(logicalPlan, expectedPlan, checkAnalysis = false)
}

test("test json_delete() function: multi depth keys ") {
val frame = sql(s"""
| source = $testTable
| | eval result = json_delete('$validJson5',array('teacher', 'student.rank')) | head 1 | fields result
| """.stripMargin)
assertSameRows(Seq(Row("{\"student\":[{\"name\":\"Bob\"},{\"name\":\"Charlie\"}]}")), frame)

val logicalPlan: LogicalPlan = frame.queryExecution.logical
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))
val keysExpression =
UnresolvedFunction(
"array",
Seq(Literal("teacher"), Literal("student.rank")),
isDistinct = false)
val jsonObjExp =
Literal(
"{\"teacher\":\"Alice\",\"student\":[{\"name\":\"Bob\",\"rank\":1},{\"name\":\"Charlie\",\"rank\":2}]}")
val jsonFunc =
Alias(visit("json_delete", util.List.of(jsonObjExp, keysExpression)), "result")()
val eval = Project(Seq(UnresolvedStar(None), jsonFunc), table)
val limit = GlobalLimit(Literal(1), LocalLimit(Literal(1), eval))
val expectedPlan = Project(Seq(UnresolvedAttribute("result")), limit)
comparePlans(logicalPlan, expectedPlan, checkAnalysis = false)
}

test("test json_delete() function: key not found") {
val frame = sql(s"""
| source = $testTable
| | eval result = json_delete('$validJson5',array('none')) | head 1 | fields result
| """.stripMargin)
assertSameRows(
Seq(Row(
"{\"teacher\":\"Alice\",\"student\":[{\"name\":\"Bob\",\"rank\":1},{\"name\":\"Charlie\",\"rank\":2}]}")),
frame)

val logicalPlan: LogicalPlan = frame.queryExecution.logical
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))
val keysExpression =
UnresolvedFunction("array", Seq(Literal("none")), isDistinct = false)
val jsonObjExp =
Literal(
"{\"teacher\":\"Alice\",\"student\":[{\"name\":\"Bob\",\"rank\":1},{\"name\":\"Charlie\",\"rank\":2}]}")
val jsonFunc =
Alias(visit("json_delete", util.List.of(jsonObjExp, keysExpression)), "result")()
val eval = Project(Seq(UnresolvedStar(None), jsonFunc), table)
val limit = GlobalLimit(Literal(1), LocalLimit(Literal(1), eval))
val expectedPlan = Project(Seq(UnresolvedAttribute("result")), limit)
comparePlans(logicalPlan, expectedPlan, checkAnalysis = false)
}

test("test json_append() function: add single value") {
val frame = sql(s"""
| source = $testTable
| | eval result = json_append('$validJson7',array('teacher', 'Tom')) | head 1 | fields result
| """.stripMargin)
assertSameRows(
Seq(Row(
"{\"teacher\":[\"Alice\",\"Tom\"],\"student\":[{\"name\":\"Bob\",\"rank\":1},{\"name\":\"Charlie\",\"rank\":2}]}")),
frame)

val logicalPlan: LogicalPlan = frame.queryExecution.logical
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))
val keysExpression =
UnresolvedFunction("array", Seq(Literal("teacher"), Literal("Tom")), isDistinct = false)
val jsonObjExp =
Literal(
"{\"teacher\":[\"Alice\"],\"student\":[{\"name\":\"Bob\",\"rank\":1},{\"name\":\"Charlie\",\"rank\":2}]}")
val jsonFunc =
Alias(visit("json_append", util.List.of(jsonObjExp, keysExpression)), "result")()
val eval = Project(Seq(UnresolvedStar(None), jsonFunc), table)
val limit = GlobalLimit(Literal(1), LocalLimit(Literal(1), eval))
val expectedPlan = Project(Seq(UnresolvedAttribute("result")), limit)
comparePlans(logicalPlan, expectedPlan, checkAnalysis = false)
}

test("test json_append() function: add single value key not found") {
val frame = sql(s"""
| source = $testTable
| | eval result = json_append('$validJson7',array('headmaster', 'Tom')) | head 1 | fields result
| """.stripMargin)
assertSameRows(
Seq(Row(
"{\"teacher\":[\"Alice\"],\"student\":[{\"name\":\"Bob\",\"rank\":1},{\"name\":\"Charlie\",\"rank\":2}],\"headmaster\":[\"Tom\"]}")),
frame)

val logicalPlan: LogicalPlan = frame.queryExecution.logical
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))
val keysExpression =
UnresolvedFunction("array", Seq(Literal("headmaster"), Literal("Tom")), isDistinct = false)
val jsonObjExp =
Literal(
"{\"teacher\":[\"Alice\"],\"student\":[{\"name\":\"Bob\",\"rank\":1},{\"name\":\"Charlie\",\"rank\":2}]}")
val jsonFunc =
Alias(visit("json_append", util.List.of(jsonObjExp, keysExpression)), "result")()
val eval = Project(Seq(UnresolvedStar(None), jsonFunc), table)
val limit = GlobalLimit(Literal(1), LocalLimit(Literal(1), eval))
val expectedPlan = Project(Seq(UnresolvedAttribute("result")), limit)
comparePlans(logicalPlan, expectedPlan, checkAnalysis = false)
}

test("test json_append() function: add single Object key not found") {
val frame = sql(s"""
| source = $testTable
| | eval result = json_append('$validJson7',array('headmaster', '{"name":"Tomy","rank":1}')) | head 1 | fields result
| """.stripMargin)
assertSameRows(
Seq(Row(
"{\"teacher\":[\"Alice\"],\"student\":[{\"name\":\"Bob\",\"rank\":1},{\"name\":\"Charlie\",\"rank\":2}],\"headmaster\":[{\"name\":\"Tomy\",\"rank\":1}]}")),
frame)

val logicalPlan: LogicalPlan = frame.queryExecution.logical
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))
val keysExpression =
UnresolvedFunction(
"array",
Seq(Literal("headmaster"), Literal("""{"name":"Tomy","rank":1}""")),
isDistinct = false)
val jsonObjExp =
Literal(
"{\"teacher\":[\"Alice\"],\"student\":[{\"name\":\"Bob\",\"rank\":1},{\"name\":\"Charlie\",\"rank\":2}]}")
val jsonFunc =
Alias(visit("json_append", util.List.of(jsonObjExp, keysExpression)), "result")()
val eval = Project(Seq(UnresolvedStar(None), jsonFunc), table)
val limit = GlobalLimit(Literal(1), LocalLimit(Literal(1), eval))
val expectedPlan = Project(Seq(UnresolvedAttribute("result")), limit)
comparePlans(logicalPlan, expectedPlan, checkAnalysis = false)
}

test("test json_append() function: add single Object value") {
val frame = sql(s"""
| source = $testTable
| | eval result = json_append('$validJson7',array('student', '{"name":"Tomy","rank":5}')) | head 1 | fields result
| """.stripMargin)
assertSameRows(
Seq(Row(
"{\"teacher\":[\"Alice\"],\"student\":[{\"name\":\"Bob\",\"rank\":1},{\"name\":\"Charlie\",\"rank\":2},{\"name\":\"Tomy\",\"rank\":5}]}")),
frame)

val logicalPlan: LogicalPlan = frame.queryExecution.logical
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))
val keysExpression =
UnresolvedFunction(
"array",
Seq(Literal("student"), Literal("""{"name":"Tomy","rank":5}""")),
isDistinct = false)
val jsonObjExp =
Literal(
"{\"teacher\":[\"Alice\"],\"student\":[{\"name\":\"Bob\",\"rank\":1},{\"name\":\"Charlie\",\"rank\":2}]}")
val jsonFunc =
Alias(visit("json_append", util.List.of(jsonObjExp, keysExpression)), "result")()
val eval = Project(Seq(UnresolvedStar(None), jsonFunc), table)
val limit = GlobalLimit(Literal(1), LocalLimit(Literal(1), eval))
val expectedPlan = Project(Seq(UnresolvedAttribute("result")), limit)
comparePlans(logicalPlan, expectedPlan, checkAnalysis = false)
}

test("test json_append() function: add multi value") {
val frame = sql(s"""
| source = $testTable
| | eval result = json_append('$validJson7',array('teacher', 'Tom', 'Walt')) | head 1 | fields result
| """.stripMargin)
assertSameRows(
Seq(Row(
"{\"teacher\":[\"Alice\",\"Tom\",\"Walt\"],\"student\":[{\"name\":\"Bob\",\"rank\":1},{\"name\":\"Charlie\",\"rank\":2}]}")),
frame)

val logicalPlan: LogicalPlan = frame.queryExecution.logical
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))
val keysExpression =
UnresolvedFunction(
"array",
Seq(Literal("teacher"), Literal("Tom"), Literal("Walt")),
isDistinct = false)
val jsonObjExp =
Literal(
"{\"teacher\":[\"Alice\"],\"student\":[{\"name\":\"Bob\",\"rank\":1},{\"name\":\"Charlie\",\"rank\":2}]}")
val jsonFunc =
Alias(visit("json_append", util.List.of(jsonObjExp, keysExpression)), "result")()
val eval = Project(Seq(UnresolvedStar(None), jsonFunc), table)
val limit = GlobalLimit(Literal(1), LocalLimit(Literal(1), eval))
val expectedPlan = Project(Seq(UnresolvedAttribute("result")), limit)
comparePlans(logicalPlan, expectedPlan, checkAnalysis = false)
}

test("test json_append() function: add nested value") {
val frame = sql(s"""
| source = $testTable
| | eval result = json_append('$validJson8',array('school.teacher', 'Tom', 'Walt')) | head 1 | fields result
| """.stripMargin)
assertSameRows(
Seq(Row(
"{\"school\":{\"teacher\":[\"Alice\",\"Tom\",\"Walt\"],\"student\":[{\"name\":\"Bob\",\"rank\":1},{\"name\":\"Charlie\",\"rank\":2}]}}")),
frame)

val logicalPlan: LogicalPlan = frame.queryExecution.logical
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))
val keysExpression =
UnresolvedFunction(
"array",
Seq(Literal("school.teacher"), Literal("Tom"), Literal("Walt")),
isDistinct = false)
val jsonObjExp =
Literal(
"{\"school\":{\"teacher\":[\"Alice\"],\"student\":[{\"name\":\"Bob\",\"rank\":1},{\"name\":\"Charlie\",\"rank\":2}]}}")
val jsonFunc =
Alias(visit("json_append", util.List.of(jsonObjExp, keysExpression)), "result")()
val eval = Project(Seq(UnresolvedStar(None), jsonFunc), table)
val limit = GlobalLimit(Literal(1), LocalLimit(Literal(1), eval))
val expectedPlan = Project(Seq(UnresolvedAttribute("result")), limit)
comparePlans(logicalPlan, expectedPlan, checkAnalysis = false)
}
}