Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Translate PPL dedup Command Part 1: allowedDuplication=1 #521

Merged
merged 4 commits into from
Aug 8, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,40 @@ trait FlintSparkSuite extends QueryTest with FlintSuite with OpenSearchSuite wit
| """.stripMargin)
}

protected def createDuplicationNullableTable(testTable: String): Unit = {
sql(s"""
| CREATE TABLE $testTable
| (
| id INT,
| name STRING,
| category STRING
| )
| USING $tableType $tableOptions
|""".stripMargin)

sql(s"""
| INSERT INTO $testTable
| VALUES (1, "A", "X"),
| (2, "A", "Y"),
| (3, "A", "Y"),
| (4, "B", "Z"),
| (5, "B", "Z"),
| (6, "B", "Z"),
| (7, "C", "X"),
| (8, null, "Y"),
| (9, "D", "Z"),
| (10, "E", null),
| (11, "A", "X"),
| (12, "A", "Y"),
| (13, null, "X"),
| (14, "B", null),
| (15, "B", "Y"),
| (16, null, "Z"),
| (17, "C", "X"),
| (18, null, null)
| """.stripMargin)
}

protected def createTimeSeriesTable(testTable: String): Unit = {
sql(s"""
| CREATE TABLE $testTable
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,310 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.spark.ppl

import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation}
import org.apache.spark.sql.catalyst.expressions.{And, IsNotNull, IsNull, Or}
import org.apache.spark.sql.catalyst.plans.logical.{Deduplicate, Filter, LogicalPlan, Project, Union}
import org.apache.spark.sql.streaming.StreamTest

class FlintSparkPPLDedupITSuite
extends QueryTest
with LogicalPlanTestUtils
with FlintPPLSuite
with StreamTest {

/** Test table and index name */
private val testTable = "spark_catalog.default.flint_ppl_test"

override def beforeAll(): Unit = {
super.beforeAll()

// Create test table
createDuplicationNullableTable(testTable)
}

protected override def afterEach(): Unit = {
super.afterEach()
// Stop all streaming jobs if any
spark.streams.active.foreach { job =>
job.stop()
job.awaitTermination()
}
}

test("test dedupe 1 name") {
val frame = sql(s"""
| source = $testTable | dedup 1 name | fields name
| """.stripMargin)

val results: Array[Row] = frame.collect()
val expectedResults: Array[Row] = Array(Row("A"), Row("B"), Row("C"), Row("D"), Row("E"))
implicit val oneColRowOrdering: Ordering[Row] = Ordering.by[Row, String](_.getAs[String](0))
assert(results.sorted.sameElements(expectedResults.sorted))

val logicalPlan: LogicalPlan = frame.queryExecution.logical
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))
val fieldsProjectList = Seq(UnresolvedAttribute("name"))
val dedupKeys = Seq(UnresolvedAttribute("name"))
val filter = Filter(IsNotNull(UnresolvedAttribute("name")), table)
val expectedPlan = Project(fieldsProjectList, Deduplicate(dedupKeys, filter))
comparePlans(logicalPlan, expectedPlan, checkAnalysis = false)
}

test("test dedupe 1 name, category") {
val frame = sql(s"""
| source = $testTable | dedup 1 name, category | fields name, category
| """.stripMargin)

val results: Array[Row] = frame.collect()
// results.foreach(println(_))
val expectedResults: Array[Row] = Array(
Row("A", "X"),
Row("A", "Y"),
Row("B", "Z"),
Row("C", "X"),
Row("D", "Z"),
Row("B", "Y"))
implicit val twoColsRowOrdering: Ordering[Row] =
Ordering.by[Row, (String, String)](row => (row.getAs(0), row.getAs(1)))
assert(results.sorted.sameElements(expectedResults.sorted))

val logicalPlan: LogicalPlan = frame.queryExecution.logical
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))
val fieldsProjectList = Seq(UnresolvedAttribute("name"), UnresolvedAttribute("category"))
val dedupKeys = Seq(UnresolvedAttribute("name"), UnresolvedAttribute("category"))
val filter = Filter(
And(IsNotNull(UnresolvedAttribute("name")), IsNotNull(UnresolvedAttribute("category"))),
table)
val expectedPlan = Project(fieldsProjectList, Deduplicate(dedupKeys, filter))
comparePlans(logicalPlan, expectedPlan, checkAnalysis = false)
}

test("test dedupe 1 name KEEPEMPTY=true") {
val frame = sql(s"""
| source = $testTable | dedup 1 name KEEPEMPTY=true | fields name, category
| """.stripMargin)

val results: Array[Row] = frame.collect()
// results.foreach(println(_))
val expectedResults: Array[Row] = Array(
Row("A", "X"),
Row("B", "Z"),
Row("C", "X"),
Row("D", "Z"),
Row("E", null),
Row(null, "Y"),
Row(null, "X"),
Row(null, "Z"),
Row(null, null))
implicit val nullableTwoColsRowOrdering: Ordering[Row] =
Ordering.by[Row, (String, String)](row => {
val value0 = row.getAs[String](0)
val value1 = row.getAs[String](1)
(
if (value0 == null) String.valueOf(Int.MaxValue) else value0,
if (value1 == null) String.valueOf(Int.MaxValue) else value1)
})
assert(
results.sorted
.map(_.getAs[String](0))
.sameElements(expectedResults.sorted.map(_.getAs[String](0))))

val logicalPlan: LogicalPlan = frame.queryExecution.logical
val fieldsProjectList = Seq(UnresolvedAttribute("name"), UnresolvedAttribute("category"))
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))
val isNotNullFilter =
Filter(IsNotNull(UnresolvedAttribute("name")), table)
val deduplicate = Deduplicate(Seq(UnresolvedAttribute("name")), isNotNullFilter)
val isNullFilter = Filter(IsNull(UnresolvedAttribute("name")), table)
val union = Union(deduplicate, isNullFilter)
val expectedPlan = Project(fieldsProjectList, union)
comparePlans(logicalPlan, expectedPlan, checkAnalysis = false)
}

test("test dedupe 1 name, category KEEPEMPTY=true") {
val frame = sql(s"""
| source = $testTable | dedup 1 name, category KEEPEMPTY=true | fields name, category
| """.stripMargin)

val results: Array[Row] = frame.collect()
// results.foreach(println(_))
val expectedResults: Array[Row] = Array(
Row("A", "X"),
Row("A", "Y"),
Row("B", "Z"),
Row("C", "X"),
Row("D", "Z"),
Row("B", "Y"),
Row(null, "Y"),
Row("E", null),
Row(null, "X"),
Row("B", null),
Row(null, "Z"),
Row(null, null))
implicit val nullableTwoColsRowOrdering: Ordering[Row] =
Ordering.by[Row, (String, String)](row => {
val value0 = row.getAs[String](0)
val value1 = row.getAs[String](1)
(
if (value0 == null) String.valueOf(Int.MaxValue) else value0,
if (value1 == null) String.valueOf(Int.MaxValue) else value1)
})
assert(results.sorted.sameElements(expectedResults.sorted))

val logicalPlan: LogicalPlan = frame.queryExecution.logical
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))
val fieldsProjectList = Seq(UnresolvedAttribute("name"), UnresolvedAttribute("category"))
val isNotNullFilter = Filter(
And(IsNotNull(UnresolvedAttribute("name")), IsNotNull(UnresolvedAttribute("category"))),
table)
val deduplicate = Deduplicate(
Seq(UnresolvedAttribute("name"), UnresolvedAttribute("category")),
isNotNullFilter)
val isNullFilter = Filter(
Or(IsNull(UnresolvedAttribute("name")), IsNull(UnresolvedAttribute("category"))),
table)
val union = Union(deduplicate, isNullFilter)
val expectedPlan = Project(fieldsProjectList, union)
comparePlans(logicalPlan, expectedPlan, checkAnalysis = false)
}

test("test 1 name CONSECUTIVE=true") {
val ex = intercept[UnsupportedOperationException](sql(s"""
| source = $testTable | dedup 1 name CONSECUTIVE=true | fields name
| """.stripMargin))
assert(ex.getMessage.contains("Consecutive deduplication is not supported"))
}

test("test 1 name KEEPEMPTY=true CONSECUTIVE=true") {
val ex = intercept[UnsupportedOperationException](sql(s"""
| source = $testTable | dedup 1 name KEEPEMPTY=true CONSECUTIVE=true | fields name
| """.stripMargin))
assert(ex.getMessage.contains("Consecutive deduplication is not supported"))
}

ignore("test dedupe 2 name") {
val frame = sql(s"""
| source = $testTable| dedup 2 name | fields name
| """.stripMargin)

val results: Array[Row] = frame.collect()
// results.foreach(println(_))
val expectedResults: Array[Row] =
Array(Row("A"), Row("A"), Row("B"), Row("B"), Row("C"), Row("C"), Row("D"), Row("E"))
implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, String](_.getAs[String](0))
assert(results.sorted.sameElements(expectedResults.sorted))
}

ignore("test dedupe 2 name, category") {
val frame = sql(s"""
| source = $testTable| dedup 2 name, category | fields name, category
| """.stripMargin)

val results: Array[Row] = frame.collect()
// results.foreach(println(_))
val expectedResults: Array[Row] = Array(
Row("A", "X"),
Row("A", "X"),
Row("A", "Y"),
Row("A", "Y"),
Row("B", "Y"),
Row("B", "Z"),
Row("B", "Z"),
Row("C", "X"),
Row("C", "X"),
Row("D", "Z"))
implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, String](row => {
val value = row.getAs[String](0)
if (value == null) String.valueOf(Int.MaxValue) else value
})
assert(results.sorted.sameElements(expectedResults.sorted))
}

ignore("test dedupe 2 name KEEPEMPTY=true") {
val frame = sql(s"""
| source = $testTable| dedup 2 name KEEPEMPTY=true | fields name, category
| """.stripMargin)

val results: Array[Row] = frame.collect()
// results.foreach(println(_))
val expectedResults: Array[Row] = Array(
Row("A", "X"),
Row("A", "Y"),
Row("B", "Z"),
Row("B", "Z"),
Row("C", "X"),
Row("C", "X"),
Row("D", "Z"),
Row("E", null),
Row(null, "Y"),
Row(null, "X"),
Row(null, "Z"),
Row(null, null))
implicit val nullableTwoColsRowOrdering: Ordering[Row] =
Ordering.by[Row, (String, String)](row => {
val value0 = row.getAs[String](0)
val value1 = row.getAs[String](1)
(
if (value0 == null) String.valueOf(Int.MaxValue) else value0,
if (value1 == null) String.valueOf(Int.MaxValue) else value1)
})
assert(
results.sorted
.map(_.getAs[String](0))
.sameElements(expectedResults.sorted.map(_.getAs[String](0))))
}

ignore("test dedupe 2 name, category KEEPEMPTY=true") {
val frame = sql(s"""
| source = $testTable| dedup 2 name, category KEEPEMPTY=true | fields name, category
| """.stripMargin)

val results: Array[Row] = frame.collect()
// results.foreach(println(_))
val expectedResults: Array[Row] = Array(
Row("A", "X"),
Row("A", "X"),
Row("A", "Y"),
Row("A", "Y"),
Row("B", "Y"),
Row("B", "Z"),
Row("B", "Z"),
Row("C", "X"),
Row("C", "X"),
Row("D", "Z"),
Row(null, "Y"),
Row("E", null),
Row(null, "X"),
Row("B", null),
Row(null, "Z"),
Row(null, null))
implicit val nullableTwoColsRowOrdering: Ordering[Row] =
Ordering.by[Row, (String, String)](row => {
val value0 = row.getAs[String](0)
val value1 = row.getAs[String](1)
(
if (value0 == null) String.valueOf(Int.MaxValue) else value0,
if (value1 == null) String.valueOf(Int.MaxValue) else value1)
})
assert(results.sorted.sameElements(expectedResults.sorted))
}

test("test 2 name CONSECUTIVE=true") {
val ex = intercept[UnsupportedOperationException](sql(s"""
| source = $testTable | dedup 2 name CONSECUTIVE=true | fields name
| """.stripMargin))
assert(ex.getMessage.contains("Consecutive deduplication is not supported"))
}

test("test 2 name KEEPEMPTY=true CONSECUTIVE=true") {
val ex = intercept[UnsupportedOperationException](sql(s"""
| source = $testTable | dedup 2 name KEEPEMPTY=true CONSECUTIVE=true | fields name
| """.stripMargin))
assert(ex.getMessage.contains("Consecutive deduplication is not supported"))
}
}
18 changes: 16 additions & 2 deletions ppl-spark-integration/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,21 @@ Limitation: Overriding existing field is unsupported, following queries throw ex

> For additional details, review [FlintSparkPPLTimeWindowITSuite.scala](../integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLTimeWindowITSuite.scala)

**Dedup**

- `source = table | dedup a | fields a,b,c`
- `source = table | dedup a,b | fields a,b,c`
- `source = table | dedup a keepempty=true | fields a,b,c`
- `source = table | dedup a,b keepempty=true | fields a,b,c`
- `source = table | dedup 1 a | fields a,b,c`
- `source = table | dedup 1 a,b | fields a,b,c`
- `source = table | dedup 1 a keepempty=true | fields a,b,c`
- `source = table | dedup 1 a,b keepempty=true | fields a,b,c`
- `source = table | dedup 1 a consecutive=true| fields a,b,c` (Unsupported)
- `source = table | dedup 2 a | fields a,b,c` (Unsupported)

> For additional details, review [FlintSparkPPLDedupITSuite.scala](../integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLDedupITSuite.scala)

#### Supported Commands:
- `search` - [See details](https://github.com/opensearch-project/sql/blob/main/docs/user/ppl/cmd/search.rst)
- `where` - [See details](https://github.com/opensearch-project/sql/blob/main/docs/user/ppl/cmd/where.rst)
Expand All @@ -283,6 +298,7 @@ Limitation: Overriding existing field is unsupported, following queries throw ex
- `stats` - [See details](https://github.com/opensearch-project/sql/blob/main/docs/user/ppl/cmd/stats.rst) (supports AVG, COUNT, DISTINCT_COUNT, MAX, MIN and SUM aggregation functions)
- `sort` - [See details](https://github.com/opensearch-project/sql/blob/main/docs/user/ppl/cmd/sort.rst)
- `correlation` - [See details](../docs/PPL-Correlation-command.md)
- `dedup` - [See details](https://github.com/opensearch-project/sql/blob/main/docs/user/ppl/cmd/dedup.rst)

> For additional details, review [Integration Tests](../integ-test/src/test/scala/org/opensearch/flint/spark/)

Expand All @@ -298,5 +314,3 @@ Limitation: Overriding existing field is unsupported, following queries throw ex
- add [conditions](https://github.com/opensearch-project/sql/blob/main/docs/user/ppl/functions/condition.rst) support
- add [top](https://github.com/opensearch-project/sql/blob/main/docs/user/ppl/cmd/top.rst) support
- add [cast](https://github.com/opensearch-project/sql/blob/main/docs/user/ppl/functions/conversion.rst) support
- add [math](https://github.com/opensearch-project/sql/blob/main/docs/user/ppl/functions/math.rst) support
- add [deduplicate](https://github.com/opensearch-project/sql/blob/main/docs/user/ppl/cmd/dedup.rst) support
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ commands
| correlateCommand
| fieldsCommand
| statsCommand
| dedupCommand
| sortCommand
| headCommand
| evalCommand
Expand Down
Loading
Loading