diff --git a/README.md b/README.md
index 8d91806fb..b2709e348 100644
--- a/README.md
+++ b/README.md
@@ -18,6 +18,7 @@ Version compatibility:
 | Flint version | JDK version | Spark version | Scala version | OpenSearch |
 |---------------|-------------|---------------|---------------|------------|
 | 0.1.0 | 11+ | 3.3.1 | 2.12.14 | 2.6+ |
+| 0.2.0 | 11+ | 3.3.1 | 2.12.14 | 2.6+ |

 ## Flint Extension Usage

@@ -49,7 +50,7 @@ sbt clean standaloneCosmetic/publishM2
 ```
 then add org.opensearch:opensearch-spark_2.12 when run spark application, for example,
 ```
-bin/spark-shell --packages "org.opensearch:opensearch-spark_2.12:0.1.0-SNAPSHOT"
+bin/spark-shell --packages "org.opensearch:opensearch-spark_2.12:0.2.0-SNAPSHOT"
 ```

 ### PPL Build & Run
@@ -61,7 +62,7 @@ sbt clean sparkPPLCosmetic/publishM2
 ```
 then add org.opensearch:opensearch-spark_2.12 when run spark application, for example,
 ```
-bin/spark-shell --packages "org.opensearch:opensearch-spark-ppl_2.12:0.1.0-SNAPSHOT"
+bin/spark-shell --packages "org.opensearch:opensearch-spark-ppl_2.12:0.2.0-SNAPSHOT"
 ```

 ## Code of Conduct
diff --git a/build.sbt b/build.sbt
index 058f622c2..469d57223 100644
--- a/build.sbt
+++ b/build.sbt
@@ -10,7 +10,7 @@ lazy val opensearchVersion = "2.6.0"

 ThisBuild / organization := "org.opensearch"

-ThisBuild / version := "0.1.0-SNAPSHOT"
+ThisBuild / version := "0.2.0-SNAPSHOT"

 ThisBuild / scalaVersion := scala212

diff --git a/docs/PPL-on-Spark.md b/docs/PPL-on-Spark.md
index d94ee8037..f421d5679 100644
--- a/docs/PPL-on-Spark.md
+++ b/docs/PPL-on-Spark.md
@@ -34,7 +34,7 @@ sbt clean sparkPPLCosmetic/publishM2
 ```
 then add org.opensearch:opensearch-spark_2.12 when run spark application, for example,
 ```
-bin/spark-shell --packages "org.opensearch:opensearch-spark-ppl_2.12:0.1.0-SNAPSHOT"
+bin/spark-shell --packages "org.opensearch:opensearch-spark-ppl_2.12:0.2.0-SNAPSHOT"
 ```

 ### PPL Extension Usage
@@ -46,7 +46,7 @@ spark-sql --conf "spark.sql.extensions=org.opensearch.flint.FlintPPLSparkExtensions"
 ```

 ### Running With both Flint & PPL Extensions
-In order to make use of both flint and ppl extension, one can simply add both jars (`org.opensearch:opensearch-spark-ppl_2.12:0.1.0-SNAPSHOT`,`org.opensearch:opensearch-spark_2.12:0.1.0-SNAPSHOT`) to the cluster's
+In order to make use of both flint and ppl extension, one can simply add both jars (`org.opensearch:opensearch-spark-ppl_2.12:0.2.0-SNAPSHOT`,`org.opensearch:opensearch-spark_2.12:0.2.0-SNAPSHOT`) to the cluster's
 classpath.
 Next need to configure both extensions :
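A sketch of what that combined launch can look like (the PPL extension class and package coordinates come from the docs above; the Flint extension class `org.opensearch.flint.spark.FlintSparkExtensions` and the comma-separated `spark.sql.extensions` value are assumptions based on standard Spark extension configuration):

```
bin/spark-shell \
  --packages "org.opensearch:opensearch-spark-ppl_2.12:0.2.0-SNAPSHOT,org.opensearch:opensearch-spark_2.12:0.2.0-SNAPSHOT" \
  --conf "spark.sql.extensions=org.opensearch.flint.FlintPPLSparkExtensions,org.opensearch.flint.spark.FlintSparkExtensions"
```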
diff --git a/docs/img/flint-core-index-state-transition.png b/docs/img/flint-core-index-state-transition.png
new file mode 100644
index 000000000..57ef31e54
Binary files /dev/null and b/docs/img/flint-core-index-state-transition.png differ
diff --git a/docs/img/flint-spark-index-state-transition.png b/docs/img/flint-spark-index-state-transition.png
new file mode 100644
index 000000000..19c0b0c80
Binary files /dev/null and b/docs/img/flint-spark-index-state-transition.png differ
diff --git a/docs/index.md b/docs/index.md
index cc9110fab..88c2bc5e6 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -33,7 +33,7 @@ Currently, Flint metadata is only static configuration without version control a
 ```json
 {
-  "version": "0.1.0",
+  "version": "0.2.0",
   "name": "...",
   "kind": "skipping",
   "source": "...",
@@ -110,6 +110,12 @@ writer.close()
 ```

+### Index State Transition
+
+Flint index state transition:
+
+![FlintCoreIndexState](./img/flint-core-index-state-transition.png)
+
 ### API

 High level API is dependent on query engine implementation. Please see Query Engine Integration section for details.
@@ -437,8 +443,17 @@ flint.materializedView()
     .create()

 flint.refreshIndex("flint_spark_catalog_default_alb_logs_metrics")
+
+flint.deleteIndex("flint_spark_catalog_default_alb_logs_skipping_index")
+flint.vacuumIndex("flint_spark_catalog_default_alb_logs_skipping_index")
 ```

+#### Index State Transition
+
+Flint Spark index state transition:
+
+![FlintSparkIndexState](./img/flint-spark-index-state-transition.png)
+
 #### Skipping Index Provider SPI

 ```scala
@@ -509,7 +524,7 @@ Manual refreshing a table which already has skipping index being auto-refreshed,
 ### AWS EMR Spark Integration - Using execution role
 Flint use [DefaultAWSCredentialsProviderChain](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html). When running in EMR Spark, Flint use executionRole credentials
 ```
---conf spark.jars.packages=org.opensearch:opensearch-spark-standalone_2.12:0.1.0-SNAPSHOT \
+--conf spark.jars.packages=org.opensearch:opensearch-spark-standalone_2.12:0.2.0-SNAPSHOT \
 --conf spark.jars.repositories=https://aws.oss.sonatype.org/content/repositories/snapshots \
 --conf spark.emr-serverless.driverEnv.JAVA_HOME=/usr/lib/jvm/java-17-amazon-corretto.x86_64 \
 --conf spark.executorEnv.JAVA_HOME=/usr/lib/jvm/java-17-amazon-corretto.x86_64 \
@@ -551,7 +566,7 @@ Flint use [DefaultAWSCredentialsProviderChain](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html)
 ```
 3. Set the spark.datasource.flint.customAWSCredentialsProvider property with value as com.amazonaws.emr.AssumeRoleAWSCredentialsProvider. Set the environment variable ASSUME_ROLE_CREDENTIALS_ROLE_ARN with the ARN value of CrossAccountRoleB.
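For instance, the provider property and environment variable might be wired up like this on EMR Serverless (a sketch: the role ARN is a placeholder, and the `driverEnv`/`executorEnv` prefixes mirror the JAVA_HOME settings shown above):

```
--conf spark.datasource.flint.customAWSCredentialsProvider=com.amazonaws.emr.AssumeRoleAWSCredentialsProvider \
--conf spark.emr-serverless.driverEnv.ASSUME_ROLE_CREDENTIALS_ROLE_ARN=arn:aws:iam::123456789012:role/CrossAccountRoleB \
--conf spark.executorEnv.ASSUME_ROLE_CREDENTIALS_ROLE_ARN=arn:aws:iam::123456789012:role/CrossAccountRoleB \
```

The docs' full submit example then continues: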
 ```
---conf spark.jars.packages=org.opensearch:opensearch-spark-standalone_2.12:0.1.0-SNAPSHOT \
+--conf spark.jars.packages=org.opensearch:opensearch-spark-standalone_2.12:0.2.0-SNAPSHOT \
 --conf spark.jars.repositories=https://aws.oss.sonatype.org/content/repositories/snapshots \
 --conf spark.emr-serverless.driverEnv.JAVA_HOME=/usr/lib/jvm/java-17-amazon-corretto.x86_64 \
 --conf spark.executorEnv.JAVA_HOME=/usr/lib/jvm/java-17-amazon-corretto.x86_64 \
diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/FlintVersion.scala b/flint-core/src/main/scala/org/opensearch/flint/core/FlintVersion.scala
index 0af93b179..2ff83d4b4 100644
--- a/flint-core/src/main/scala/org/opensearch/flint/core/FlintVersion.scala
+++ b/flint-core/src/main/scala/org/opensearch/flint/core/FlintVersion.scala
@@ -15,6 +15,7 @@ case class FlintVersion(version: String) {
 object FlintVersion {
   val V_0_1_0: FlintVersion = FlintVersion("0.1.0")
+  val V_0_2_0: FlintVersion = FlintVersion("0.2.0")

-  def current(): FlintVersion = V_0_1_0
+  def current(): FlintVersion = V_0_2_0
 }
diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/DefaultOptimisticTransaction.java b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/DefaultOptimisticTransaction.java
index 48782a303..f3ef364b3 100644
--- a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/DefaultOptimisticTransaction.java
+++ b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/DefaultOptimisticTransaction.java
@@ -104,8 +104,13 @@ public T commit(Function<FlintMetadataLogEntry, T> operation) {
     try {
       T result = operation.apply(latest);

-      // Append final log
-      metadataLog.add(finalAction.apply(latest));
+      // Append final log or purge log entries
+      FlintMetadataLogEntry finalLog = finalAction.apply(latest);
+      if (finalLog == NO_LOG_ENTRY) {
+        metadataLog.purge();
+      } else {
+        metadataLog.add(finalLog);
+      }
       return result;
     } catch (Exception e) {
       LOG.log(SEVERE, "Rolling back transient log due to transaction operation failure", e);
diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLog.java b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLog.java
index 278d078df..bbbfd86b2 100644
--- a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLog.java
+++ b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLog.java
@@ -26,4 +26,9 @@ public interface FlintMetadataLog<T> {
    * @return latest log entry
    */
   Optional<T> getLatest();
+
+  /**
+   * Remove all log entries.
+   */
+  void purge();
 }
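Because the log currently stores a single document, the interface contract is small: `add` writes the latest entry, `getLatest` reads it, and the new `purge` removes it. A minimal in-memory sketch of that contract (a hypothetical test double; the `add` signature is an assumption inferred from its use in `DefaultOptimisticTransaction` above):

```scala
import java.util.Optional

import org.opensearch.flint.core.metadata.log.{FlintMetadataLog, FlintMetadataLogEntry}

/** Hypothetical single-slot metadata log, mirroring the "single doc" model. */
class InMemoryMetadataLog extends FlintMetadataLog[FlintMetadataLogEntry] {
  private var latest: Option[FlintMetadataLogEntry] = None

  override def add(logEntry: FlintMetadataLogEntry): FlintMetadataLogEntry = {
    latest = Some(logEntry) // single doc: add overwrites the previous entry
    logEntry
  }

  override def getLatest(): Optional[FlintMetadataLogEntry] =
    latest.map(e => Optional.of(e)).getOrElse(Optional.empty[FlintMetadataLogEntry]())

  override def purge(): Unit = {
    latest = None // vacuum path: erase the log instead of appending a final state
  }
}
```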
diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLogEntry.scala b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLogEntry.scala
index eb93c7fde..e6ae565d2 100644
--- a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLogEntry.scala
+++ b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLogEntry.scala
@@ -85,6 +85,7 @@ object FlintMetadataLogEntry {
     val DELETED: IndexState.Value = Value("deleted")
     val FAILED: IndexState.Value = Value("failed")
     val RECOVERING: IndexState.Value = Value("recovering")
+    val VACUUMING: IndexState.Value = Value("vacuuming")
     val UNKNOWN: IndexState.Value = Value("unknown")

     def from(s: String): IndexState.Value = {
diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/OptimisticTransaction.java b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/OptimisticTransaction.java
index 3a490a87b..d1992959c 100644
--- a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/OptimisticTransaction.java
+++ b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/OptimisticTransaction.java
@@ -20,6 +20,11 @@
  */
 public interface OptimisticTransaction<T> {

+  /**
+   * Constant that indicates the log entry should be purged.
+   */
+  FlintMetadataLogEntry NO_LOG_ENTRY = null;
+
   /**
    * @param initialCondition initial precondition that the subsequent transition and action can proceed
    * @return this transaction
@@ -33,7 +38,7 @@ OptimisticTransaction<T> transientLog(Function<FlintMetadataLogEntry, FlintMetadataLogEntry> action);

   /**
-   * @param action action to generate final log entry
+   * @param action action to generate final log entry (will delete entire metadata log if NO_LOG_ENTRY)
    * @return this transaction
    */
   OptimisticTransaction<T> finalLog(Function<FlintMetadataLogEntry, FlintMetadataLogEntry> action);
@@ -45,29 +50,4 @@
    * @return result
    */
   T commit(Function<FlintMetadataLogEntry, T> operation);
-
-  /**
-   * No optimistic transaction.
-   */
-  class NoOptimisticTransaction<T> implements OptimisticTransaction<T> {
-    @Override
-    public OptimisticTransaction<T> initialLog(Predicate<FlintMetadataLogEntry> initialCondition) {
-      return this;
-    }
-
-    @Override
-    public OptimisticTransaction<T> transientLog(Function<FlintMetadataLogEntry, FlintMetadataLogEntry> action) {
-      return this;
-    }
-
-    @Override
-    public OptimisticTransaction<T> finalLog(Function<FlintMetadataLogEntry, FlintMetadataLogEntry> action) {
-      return this;
-    }
-
-    @Override
-    public T commit(Function<FlintMetadataLogEntry, T> operation) {
-      return operation.apply(null);
-    }
-  };
 }
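With the sentinel in place, a final-log action can opt out of writing any terminal state. A condensed Scala view of the pattern, matching the `vacuumIndex` flow added to `FlintSpark` later in this diff (`flintClient`, `indexName`, and `dataSourceName` are assumed to be in scope):

```scala
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState._
import org.opensearch.flint.core.metadata.log.OptimisticTransaction.NO_LOG_ENTRY

flintClient
  .startTransaction(indexName, dataSourceName)
  .initialLog(latest => latest.state == DELETED)          // only a logically deleted index qualifies
  .transientLog(latest => latest.copy(state = VACUUMING)) // observable in-progress state
  .finalLog(_ => NO_LOG_ENTRY)                            // commit() purges the log instead of appending
  .commit(_ => flintClient.deleteIndex(indexName))        // physically remove the index data
```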
diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java
index f51e8a628..ab38a5f60 100644
--- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java
+++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java
@@ -5,8 +5,17 @@

 package org.opensearch.flint.core.storage;

+import static java.util.logging.Level.SEVERE;
+import static org.opensearch.action.support.WriteRequest.RefreshPolicy;
+
+import java.io.IOException;
+import java.util.Base64;
+import java.util.Optional;
+import java.util.logging.Logger;
 import org.opensearch.OpenSearchException;
 import org.opensearch.action.DocWriteResponse;
+import org.opensearch.action.delete.DeleteRequest;
+import org.opensearch.action.delete.DeleteResponse;
 import org.opensearch.action.get.GetRequest;
 import org.opensearch.action.get.GetResponse;
 import org.opensearch.action.index.IndexRequest;
@@ -19,14 +28,6 @@ import org.opensearch.flint.core.metadata.log.FlintMetadataLog;
 import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry;

-import java.io.IOException;
-import java.util.Base64;
-import java.util.Optional;
-import java.util.logging.Logger;
-
-import static java.util.logging.Level.SEVERE;
-import static org.opensearch.action.support.WriteRequest.RefreshPolicy;
-
 /**
  * Flint metadata log in OpenSearch store. For now use single doc instead of maintaining history
  * of metadata log.
 */
@@ -98,6 +99,20 @@ public Optional<FlintMetadataLogEntry> getLatest() {
     }
   }

+  @Override
+  public void purge() {
+    LOG.info("Purging log entry with id " + latestId);
+    try (RestHighLevelClient client = flintClient.createClient()) {
+      DeleteResponse response =
+          client.delete(
+              new DeleteRequest(metaLogIndexName, latestId), RequestOptions.DEFAULT);
+
+      LOG.info("Purged log entry with result " + response.getResult());
+    } catch (Exception e) {
+      throw new IllegalStateException("Failed to purge log entry", e);
+    }
+  }
+
   private FlintMetadataLogEntry createLogEntry(FlintMetadataLogEntry logEntry) {
     LOG.info("Creating log entry " + logEntry);
     // Assign doc ID here
diff --git a/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 b/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4
index cb2e14144..a5f0f993b 100644
--- a/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4
+++ b/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4
@@ -26,6 +26,7 @@ skippingIndexStatement
     | refreshSkippingIndexStatement
     | describeSkippingIndexStatement
     | dropSkippingIndexStatement
+    | vacuumSkippingIndexStatement
     ;

 createSkippingIndexStatement
@@ -48,12 +49,17 @@ dropSkippingIndexStatement
     : DROP SKIPPING INDEX ON tableName
     ;

+vacuumSkippingIndexStatement
+    : VACUUM SKIPPING INDEX ON tableName
+    ;
+
 coveringIndexStatement
     : createCoveringIndexStatement
     | refreshCoveringIndexStatement
     | showCoveringIndexStatement
     | describeCoveringIndexStatement
     | dropCoveringIndexStatement
+    | vacuumCoveringIndexStatement
     ;

 createCoveringIndexStatement
@@ -80,6 +86,10 @@ dropCoveringIndexStatement
     : DROP INDEX indexName ON tableName
     ;

+vacuumCoveringIndexStatement
+    : VACUUM INDEX indexName ON tableName
+    ;
+
 materializedViewStatement
     : createMaterializedViewStatement
     | refreshMaterializedViewStatement
@@ -110,6 +120,10 @@ dropMaterializedViewStatement
     : DROP MATERIALIZED VIEW mvName=multipartIdentifier
     ;

+vacuumMaterializedViewStatement
+    : VACUUM MATERIALIZED VIEW mvName=multipartIdentifier
+    ;
+
 indexJobManagementStatement
     : recoverIndexJobStatement
     ;
diff --git a/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4 b/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4
index fe6fd3c66..82c890a61 100644
--- a/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4
+++ b/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4
@@ -174,6 +174,7 @@ RECOVER: 'RECOVER';
 REFRESH: 'REFRESH';
 SHOW: 'SHOW';
 TRUE: 'TRUE';
+VACUUM: 'VACUUM';
 VIEW: 'VIEW';
 VIEWS: 'VIEWS';
 WHERE: 'WHERE';
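Taken together, the grammar changes surface one `VACUUM` statement per index kind. For example (index, table, and view names below are placeholders):

```sql
VACUUM SKIPPING INDEX ON alb_logs;
VACUUM INDEX request_uri_idx ON alb_logs;
VACUUM MATERIALIZED VIEW alb_logs_metrics;
```

Each statement resolves the corresponding Flint index name and delegates to the new `FlintSpark.vacuumIndex` API below.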
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
index 47ade0f87..122fea601 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
@@ -11,6 +11,7 @@ import org.json4s.{Formats, NoTypeHints}
 import org.json4s.native.Serialization
 import org.opensearch.flint.core.{FlintClient, FlintClientBuilder}
 import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState._
+import org.opensearch.flint.core.metadata.log.OptimisticTransaction.NO_LOG_ENTRY
 import org.opensearch.flint.spark.FlintSpark.RefreshMode.{FULL, INCREMENTAL, RefreshMode}
 import org.opensearch.flint.spark.FlintSparkIndex.{ID_COLUMN, StreamingRefresh}
 import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
@@ -239,6 +240,38 @@ class FlintSpark(val spark: SparkSession) extends Logging {
     }
   }

+  /**
+   * Delete a Flint index physically.
+   *
+   * @param indexName
+   *   index name
+   * @return
+   *   true if the index existed and was deleted, otherwise false
+   */
+  def vacuumIndex(indexName: String): Boolean = {
+    logInfo(s"Vacuuming Flint index $indexName")
+    if (flintClient.exists(indexName)) {
+      try {
+        flintClient
+          .startTransaction(indexName, dataSourceName)
+          .initialLog(latest => latest.state == DELETED)
+          .transientLog(latest => latest.copy(state = VACUUMING))
+          .finalLog(_ => NO_LOG_ENTRY)
+          .commit(_ => {
+            flintClient.deleteIndex(indexName)
+            true
+          })
+      } catch {
+        case e: Exception =>
+          logError("Failed to vacuum Flint index", e)
+          throw new IllegalStateException("Failed to vacuum Flint index")
+      }
+    } else {
+      logInfo("Flint index to vacuum doesn't exist")
+      false
+    }
+  }
+
   /**
    * Recover index job.
    *
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala
index 83a816a58..9b4816e71 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala
@@ -112,6 +112,15 @@ trait FlintSparkCoveringIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[AnyRef] {
     }
   }

+  override def visitVacuumCoveringIndexStatement(
+      ctx: VacuumCoveringIndexStatementContext): Command = {
+    FlintSparkSqlCommand() { flint =>
+      val flintIndexName = getFlintIndexName(flint, ctx.indexName, ctx.tableName)
+      flint.vacuumIndex(flintIndexName)
+      Seq.empty
+    }
+  }
+
   private def getFlintIndexName(
       flint: FlintSpark,
       indexNameCtx: RuleNode,
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/mv/FlintSparkMaterializedViewAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/mv/FlintSparkMaterializedViewAstBuilder.scala
index 1a990b5b0..3ab164023 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/mv/FlintSparkMaterializedViewAstBuilder.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/mv/FlintSparkMaterializedViewAstBuilder.scala
@@ -108,6 +108,14 @@ trait FlintSparkMaterializedViewAstBuilder extends FlintSparkSqlExtensionsVisitor[AnyRef] {
     }
   }

+  override def visitVacuumMaterializedViewStatement(
+      ctx: VacuumMaterializedViewStatementContext): Command = {
+    FlintSparkSqlCommand() { flint =>
+      flint.vacuumIndex(getFlintIndexName(flint, ctx.mvName))
+      Seq.empty
+    }
+  }
+
   private def getFlintIndexName(flint: FlintSpark, mvNameCtx: RuleNode): String = {
     val fullMvName = getFullTableName(flint, mvNameCtx)
     FlintSparkMaterializedView.getFlintIndexName(fullMvName)
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala
index 2b0bb6c48..46cf7eebd 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala
@@ -98,6 +98,15 @@ trait FlintSparkSkippingIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[AnyRef] {
     Seq.empty
   }

+  override def visitVacuumSkippingIndexStatement(
+      ctx: VacuumSkippingIndexStatementContext): Command = {
+    FlintSparkSqlCommand() { flint =>
+      val indexName = getSkippingIndexName(flint, ctx.tableName)
+      flint.vacuumIndex(indexName)
+      Seq.empty
+    }
+  }
+
   private def getSkippingIndexName(flint: FlintSpark, tableNameCtx: RuleNode): String =
     FlintSparkSkippingIndex.getSkippingIndexName(getFullTableName(flint, tableNameCtx))
 }
diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkTransactionITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkTransactionITSuite.scala
index 56227533a..643a35516 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkTransactionITSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkTransactionITSuite.scala
@@ -119,6 +119,24 @@ class FlintSparkTransactionITSuite extends OpenSearchTransactionSuite with Matchers {
     latestLogEntry(testLatestId) should contain("state" -> "deleted")
   }

+  test("vacuum index") {
+    flint
+      .skippingIndex()
+      .onTable(testTable)
+      .addPartitions("year", "month")
+      .create()
+    deleteLogically(testLatestId)
+    flint.vacuumIndex(testFlintIndex)
+
+    // Both index data and metadata log should be vacuumed
+    openSearchClient
+      .indices()
+      .exists(new GetIndexRequest(testFlintIndex), RequestOptions.DEFAULT) shouldBe false
+    openSearchClient.exists(
+      new GetRequest(testMetaLogIndex, testLatestId),
+      RequestOptions.DEFAULT) shouldBe false
+  }
+
   test("should recreate index if logical deleted") {
     flint
       .skippingIndex()
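The new statements can also be exercised end to end through SQL. A sketch of such a test, assuming the same fixtures as above (`testTable`, `testFlintIndex`, `testLatestId`, `deleteLogically`) and that the suite exposes Spark's `sql` helper:

```scala
test("vacuum index via SQL statement") {
  flint
    .skippingIndex()
    .onTable(testTable)
    .addPartitions("year", "month")
    .create()
  deleteLogically(testLatestId)

  // Statement defined by vacuumSkippingIndexStatement in FlintSparkSqlExtensions.g4
  sql(s"VACUUM SKIPPING INDEX ON $testTable")

  // Same postcondition as the API-level test above: index data is gone
  openSearchClient
    .indices()
    .exists(new GetIndexRequest(testFlintIndex), RequestOptions.DEFAULT) shouldBe false
}
```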