From f400d0330898bb58ca7b3ba1abfcc43c4c8afc65 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Wed, 29 Nov 2023 14:30:59 -0800 Subject: [PATCH 01/12] Add IT to reproduce the bug first Signed-off-by: Chen Dai --- .../AsyncQueryExecutorServiceSpec.java | 6 +- .../InteractiveQueryNullResultFixTest.java | 89 +++++++++++++++++++ 2 files changed, 94 insertions(+), 1 deletion(-) create mode 100644 spark/src/test/java/org/opensearch/sql/spark/asyncquery/InteractiveQueryNullResultFixTest.java diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java index 663e5db852..aa65efa87b 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java @@ -192,7 +192,7 @@ protected AsyncQueryExecutorService createAsyncQueryExecutorService( StateStore stateStore = new StateStore(client, clusterService); AsyncQueryJobMetadataStorageService asyncQueryJobMetadataStorageService = new OpensearchAsyncQueryJobMetadataStorageService(stateStore); - JobExecutionResponseReader jobExecutionResponseReader = new JobExecutionResponseReader(client); + JobExecutionResponseReader jobExecutionResponseReader = createJobExecutionResponseReader(); SparkQueryDispatcher sparkQueryDispatcher = new SparkQueryDispatcher( emrServerlessClient, @@ -210,6 +210,10 @@ protected AsyncQueryExecutorService createAsyncQueryExecutorService( this::sparkExecutionEngineConfig); } + protected JobExecutionResponseReader createJobExecutionResponseReader() { + return new JobExecutionResponseReader(client); + } + public static class LocalEMRSClient implements EMRServerlessClient { private int startJobRunCalled = 0; diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/InteractiveQueryNullResultFixTest.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/InteractiveQueryNullResultFixTest.java new file mode 100644 index 0000000000..57d845bffd --- /dev/null +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/InteractiveQueryNullResultFixTest.java @@ -0,0 +1,89 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.asyncquery; + +import static org.opensearch.action.support.WriteRequest.RefreshPolicy.WAIT_UNTIL; +import static org.opensearch.sql.datasource.model.DataSourceMetadata.DEFAULT_RESULT_INDEX; +import static org.opensearch.sql.spark.execution.statestore.StateStore.getStatement; +import static org.opensearch.sql.spark.execution.statestore.StateStore.updateStatementState; + +import java.util.HashMap; +import java.util.Map; +import org.json.JSONObject; +import org.junit.Assert; +import org.junit.Test; +import org.opensearch.action.index.IndexRequest; +import org.opensearch.sql.spark.asyncquery.model.AsyncQueryExecutionResponse; +import org.opensearch.sql.spark.execution.statement.StatementModel; +import org.opensearch.sql.spark.execution.statement.StatementState; +import org.opensearch.sql.spark.response.JobExecutionResponseReader; +import org.opensearch.sql.spark.rest.model.CreateAsyncQueryRequest; +import org.opensearch.sql.spark.rest.model.CreateAsyncQueryResponse; +import org.opensearch.sql.spark.rest.model.LangType; + +/** IT for bug fix in https://github.com/opensearch-project/sql/issues/2436. */ +public class InteractiveQueryNullResultFixTest extends AsyncQueryExecutorServiceSpec { + + @Test + public void queryResult() { + LocalEMRSClient emrClient = new LocalEMRSClient(); + AsyncQueryExecutorService queryService = createAsyncQueryExecutorService(emrClient); + + CreateAsyncQueryResponse response = + queryService.createAsyncQuery( + new CreateAsyncQueryRequest("SELECT 1", DATASOURCE, LangType.SQL, null)); + + AsyncQueryExecutionResponse results = queryService.getAsyncQueryResults(response.getQueryId()); + + System.out.println(results); + } + + @Override + protected JobExecutionResponseReader createJobExecutionResponseReader() { + // Reproduce the bug with the following steps executed by intercepting response reader + return new JobExecutionResponseReader(client) { + @Override + public JSONObject getResultWithQueryId(String queryId, String resultIndex) { + // 1) PPL plugin searches query_execution_result and return empty + JSONObject result = super.getResultWithQueryId(queryId, resultIndex); + + // 2) EMR-S bulk writes query_execution_result with refresh = wait_for + Map document = new HashMap<>(); + document.put("result", new String[] {"{'1':1}"}); + document.put("schema", new String[] {"{'column_name':'1','data_type':'integer'}"}); + document.put("jobRunId", "XXX"); + document.put("applicationId", "YYY"); + document.put("dataSourceName", DATASOURCE); + document.put("status", "SUCCESS"); + document.put("error", ""); + document.put("queryId", queryId); + document.put("queryText", "SELECT 1"); + document.put("sessionId", "ZZZ"); + document.put("updateTime", 1699124602715L); + document.put("queryRunTime", 123); + + String resultIndexName = resultIndex == null ? DEFAULT_RESULT_INDEX : resultIndex; + try { + IndexRequest request = + new IndexRequest() + .index(resultIndexName) + .setRefreshPolicy(WAIT_UNTIL) + .source(document); + client.index(request).get(); + } catch (Exception e) { + Assert.fail("Failed to write result doc: " + e.getMessage()); + } + + // 3) EMR-S updates query_execution_request with state=success + StatementModel stmt = getStatement(stateStore, DATASOURCE).apply(queryId).get(); + updateStatementState(stateStore, DATASOURCE).apply(stmt, StatementState.SUCCESS); + + // 4) PPL plugin reads query_execution_request and return state=success with null result + return result; + } + }; + } +} From 7d11c1282b296bc5fc84c3908445b8377a391ec6 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Wed, 29 Nov 2023 15:29:31 -0800 Subject: [PATCH 02/12] Modify IT as temporary reproduce commit Signed-off-by: Chen Dai --- .../InteractiveQueryNullResultFixTest.java | 43 ++++++++++--------- 1 file changed, 22 insertions(+), 21 deletions(-) diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/InteractiveQueryNullResultFixTest.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/InteractiveQueryNullResultFixTest.java index 57d845bffd..138fed5f51 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/InteractiveQueryNullResultFixTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/InteractiveQueryNullResultFixTest.java @@ -28,17 +28,15 @@ public class InteractiveQueryNullResultFixTest extends AsyncQueryExecutorServiceSpec { @Test - public void queryResult() { - LocalEMRSClient emrClient = new LocalEMRSClient(); - AsyncQueryExecutorService queryService = createAsyncQueryExecutorService(emrClient); - + public void reproduceSuccessStatementWithNullResult() { + AsyncQueryExecutorService queryService = createAsyncQueryExecutorService(new LocalEMRSClient()); CreateAsyncQueryResponse response = queryService.createAsyncQuery( new CreateAsyncQueryRequest("SELECT 1", DATASOURCE, LangType.SQL, null)); - AsyncQueryExecutionResponse results = queryService.getAsyncQueryResults(response.getQueryId()); - System.out.println(results); + assertEquals(StatementState.SUCCESS.getState(), results.getStatus()); + assertNull(results.getResults()); } @Override @@ -51,27 +49,13 @@ public JSONObject getResultWithQueryId(String queryId, String resultIndex) { JSONObject result = super.getResultWithQueryId(queryId, resultIndex); // 2) EMR-S bulk writes query_execution_result with refresh = wait_for - Map document = new HashMap<>(); - document.put("result", new String[] {"{'1':1}"}); - document.put("schema", new String[] {"{'column_name':'1','data_type':'integer'}"}); - document.put("jobRunId", "XXX"); - document.put("applicationId", "YYY"); - document.put("dataSourceName", DATASOURCE); - document.put("status", "SUCCESS"); - document.put("error", ""); - document.put("queryId", queryId); - document.put("queryText", "SELECT 1"); - document.put("sessionId", "ZZZ"); - document.put("updateTime", 1699124602715L); - document.put("queryRunTime", 123); - String resultIndexName = resultIndex == null ? DEFAULT_RESULT_INDEX : resultIndex; try { IndexRequest request = new IndexRequest() .index(resultIndexName) .setRefreshPolicy(WAIT_UNTIL) - .source(document); + .source(createResultDoc(queryId)); client.index(request).get(); } catch (Exception e) { Assert.fail("Failed to write result doc: " + e.getMessage()); @@ -86,4 +70,21 @@ public JSONObject getResultWithQueryId(String queryId, String resultIndex) { } }; } + + private Map createResultDoc(String queryId) { + Map document = new HashMap<>(); + document.put("result", new String[] {"{'1':1}"}); + document.put("schema", new String[] {"{'column_name':'1','data_type':'integer'}"}); + document.put("jobRunId", "XXX"); + document.put("applicationId", "YYY"); + document.put("dataSourceName", DATASOURCE); + document.put("status", "SUCCESS"); + document.put("error", ""); + document.put("queryId", queryId); + document.put("queryText", "SELECT 1"); + document.put("sessionId", "ZZZ"); + document.put("updateTime", 1699124602715L); + document.put("queryRunTime", 123); + return document; + } } From 7eb0a05b1120273c13d316276672a6b9eb8786f1 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Thu, 30 Nov 2023 11:32:01 -0800 Subject: [PATCH 03/12] Fix issue by preferred option and modify IT Signed-off-by: Chen Dai --- .../spark/dispatcher/AsyncQueryHandler.java | 13 +- .../AsyncQueryExecutorServiceSpec.java | 13 +- .../InteractiveQueryGetResultTest.java | 243 ++++++++++++++++++ .../InteractiveQueryNullResultFixTest.java | 90 ------- 4 files changed, 263 insertions(+), 96 deletions(-) create mode 100644 spark/src/test/java/org/opensearch/sql/spark/asyncquery/InteractiveQueryGetResultTest.java delete mode 100644 spark/src/test/java/org/opensearch/sql/spark/asyncquery/InteractiveQueryNullResultFixTest.java diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/AsyncQueryHandler.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/AsyncQueryHandler.java index b3d2cdd289..d61ac17aa3 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/AsyncQueryHandler.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/AsyncQueryHandler.java @@ -15,6 +15,7 @@ import org.opensearch.sql.spark.dispatcher.model.DispatchQueryContext; import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest; import org.opensearch.sql.spark.dispatcher.model.DispatchQueryResponse; +import org.opensearch.sql.spark.execution.statement.StatementState; /** Process async query request. */ public abstract class AsyncQueryHandler { @@ -33,10 +34,20 @@ public JSONObject getQueryResponse(AsyncQueryJobMetadata asyncQueryJobMetadata) result.put(ERROR_FIELD, error); return result; } else { - return getResponseFromExecutor(asyncQueryJobMetadata); + JSONObject statement = getResponseFromExecutor(asyncQueryJobMetadata); + + // Consider statement still running if state is success but query result unavailable + if (isSuccessState(statement)) { + statement.put(STATUS_FIELD, StatementState.RUNNING.getState()); + } + return statement; } } + private boolean isSuccessState(JSONObject statement) { + return StatementState.SUCCESS.getState().equalsIgnoreCase(statement.optString(STATUS_FIELD)); + } + protected abstract JSONObject getResponseFromResultIndex( AsyncQueryJobMetadata asyncQueryJobMetadata); diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java index aa65efa87b..be49a559a2 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java @@ -189,10 +189,17 @@ private DataSourceServiceImpl createDataSourceService() { protected AsyncQueryExecutorService createAsyncQueryExecutorService( EMRServerlessClient emrServerlessClient) { + return createAsyncQueryExecutorService( + emrServerlessClient, new JobExecutionResponseReader(client)); + } + + /** Pass a custom response reader which can mock interaction between PPL plugin and EMR-S job. */ + protected AsyncQueryExecutorService createAsyncQueryExecutorService( + EMRServerlessClient emrServerlessClient, + JobExecutionResponseReader jobExecutionResponseReader) { StateStore stateStore = new StateStore(client, clusterService); AsyncQueryJobMetadataStorageService asyncQueryJobMetadataStorageService = new OpensearchAsyncQueryJobMetadataStorageService(stateStore); - JobExecutionResponseReader jobExecutionResponseReader = createJobExecutionResponseReader(); SparkQueryDispatcher sparkQueryDispatcher = new SparkQueryDispatcher( emrServerlessClient, @@ -210,10 +217,6 @@ protected AsyncQueryExecutorService createAsyncQueryExecutorService( this::sparkExecutionEngineConfig); } - protected JobExecutionResponseReader createJobExecutionResponseReader() { - return new JobExecutionResponseReader(client); - } - public static class LocalEMRSClient implements EMRServerlessClient { private int startJobRunCalled = 0; diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/InteractiveQueryGetResultTest.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/InteractiveQueryGetResultTest.java new file mode 100644 index 0000000000..f8ecc2bc12 --- /dev/null +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/InteractiveQueryGetResultTest.java @@ -0,0 +1,243 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.asyncquery; + +import static org.opensearch.action.support.WriteRequest.RefreshPolicy.WAIT_UNTIL; +import static org.opensearch.sql.data.model.ExprValueUtils.integerValue; +import static org.opensearch.sql.data.model.ExprValueUtils.tupleValue; +import static org.opensearch.sql.datasource.model.DataSourceMetadata.DEFAULT_RESULT_INDEX; +import static org.opensearch.sql.spark.execution.statestore.StateStore.getStatement; +import static org.opensearch.sql.spark.execution.statestore.StateStore.updateStatementState; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.json.JSONObject; +import org.junit.Assert; +import org.junit.Ignore; +import org.junit.Test; +import org.opensearch.action.index.IndexRequest; +import org.opensearch.sql.data.model.ExprValue; +import org.opensearch.sql.spark.asyncquery.model.AsyncQueryExecutionResponse; +import org.opensearch.sql.spark.execution.statement.StatementModel; +import org.opensearch.sql.spark.execution.statement.StatementState; +import org.opensearch.sql.spark.execution.statestore.StateStore; +import org.opensearch.sql.spark.response.JobExecutionResponseReader; +import org.opensearch.sql.spark.rest.model.CreateAsyncQueryRequest; +import org.opensearch.sql.spark.rest.model.CreateAsyncQueryResponse; +import org.opensearch.sql.spark.rest.model.LangType; + +public class InteractiveQueryGetResultTest extends AsyncQueryExecutorServiceSpec { + + @Test + public void testSuccessfulStatementWithNullResult() { + /* + AsyncQueryExecutorService queryService = + createAsyncQueryExecutorService(new LocalEMRSClient(), createJobExecutionResponseReader()); + CreateAsyncQueryResponse response = + queryService.createAsyncQuery( + new CreateAsyncQueryRequest("SELECT 1", DATASOURCE, LangType.SQL, null)); + AsyncQueryExecutionResponse results = queryService.getAsyncQueryResults(response.getQueryId()); + + assertEquals(StatementState.RUNNING.getState(), results.getStatus()); + // assertEquals(List.of(integerValue(1)), results.getResults()); + */ + + createAsyncQuery("SELECT 1") + /* + .withInteraction( + new JobExecutionResponseReader(client) { + @Override + public JSONObject getResultWithQueryId(String queryId, String resultIndex) { + // 1) PPL plugin searches query_execution_result and return empty + JSONObject result = super.getResultWithQueryId(queryId, resultIndex); + + // 2) EMR-S bulk writes query_execution_result with refresh = wait_for + String resultIndexName = resultIndex == null ? DEFAULT_RESULT_INDEX : resultIndex; + try { + IndexRequest request = + new IndexRequest() + .index(resultIndexName) + .setRefreshPolicy(WAIT_UNTIL) + .source(createResultDoc(queryId)); + client.index(request).get(); + } catch (Exception e) { + Assert.fail("Failed to write result doc: " + e.getMessage()); + } + + // 3) EMR-S updates query_execution_request with state=success + StatementModel stmt = getStatement(stateStore, DATASOURCE).apply(queryId).get(); + updateStatementState(stateStore, DATASOURCE).apply(stmt, StatementState.SUCCESS); + + // 4) PPL plugin later reads query_execution_request and return state=success with + // null + // result + return result; + } + }) + */ + .withInteraction( + new PluginAndEmrJobInteraction() { + @Override + JSONObject interact(String queryId, String resultIndex) { + JSONObject result = searchQueryResult(queryId, resultIndex); + writeResultDoc(queryId, resultIndex); + updateStatementState(queryId, StatementState.SUCCESS); + return result; + } + }) + .assertQueryResults("running", null) + .withInteraction( + new PluginAndEmrJobInteraction() { + @Override + JSONObject interact(String queryId, String resultIndex) { + return searchQueryResult(queryId, resultIndex); + } + }) + .assertQueryResults("SUCCESS", List.of(tupleValue(Map.of("1", 1)))); + } + + @Ignore + public void testSuccessfulStatementWithResult() { + AsyncQueryExecutorService queryService = + createAsyncQueryExecutorService(new LocalEMRSClient(), createJobExecutionResponseReader()); + CreateAsyncQueryResponse response = + queryService.createAsyncQuery( + new CreateAsyncQueryRequest("SELECT 1", DATASOURCE, LangType.SQL, null)); + AsyncQueryExecutionResponse results = queryService.getAsyncQueryResults(response.getQueryId()); + + assertEquals(StatementState.RUNNING.getState(), results.getStatus()); + // assertEquals(List.of(integerValue(1)), results.getResults()); + } + + private JobExecutionResponseReader createJobExecutionResponseReader() { + // Reproduce the bug with the following steps executed by intercepting response reader + return new JobExecutionResponseReader(client) { + @Override + public JSONObject getResultWithQueryId(String queryId, String resultIndex) { + // 1) PPL plugin searches query_execution_result and return empty + JSONObject result = super.getResultWithQueryId(queryId, resultIndex); + + // 2) EMR-S bulk writes query_execution_result with refresh = wait_for + String resultIndexName = resultIndex == null ? DEFAULT_RESULT_INDEX : resultIndex; + try { + IndexRequest request = + new IndexRequest() + .index(resultIndexName) + .setRefreshPolicy(WAIT_UNTIL) + .source(createResultDoc(queryId)); + client.index(request).get(); + } catch (Exception e) { + Assert.fail("Failed to write result doc: " + e.getMessage()); + } + + // 3) EMR-S updates query_execution_request with state=success + StatementModel stmt = getStatement(stateStore, DATASOURCE).apply(queryId).get(); + updateStatementState(stateStore, DATASOURCE).apply(stmt, StatementState.SUCCESS); + + // 4) PPL plugin later reads query_execution_request and return state=success with null + // result + return result; + } + }; + } + + private AssertionHelper createAsyncQuery(String query) { + return new AssertionHelper(query); + } + + private class AssertionHelper { + private JobExecutionResponseReader responseReader; + + private final AsyncQueryExecutorService queryService; + private final CreateAsyncQueryResponse createQueryResponse; + + private PluginAndEmrJobInteraction interaction; + + AssertionHelper(String query) { + this.queryService = + createAsyncQueryExecutorService( + new LocalEMRSClient(), + new JobExecutionResponseReader(client) { + @Override + public JSONObject getResultWithQueryId(String queryId, String resultIndex) { + // Get results with current interaction + return interaction.interact(queryId, resultIndex); + } + }); + this.createQueryResponse = + queryService.createAsyncQuery( + new CreateAsyncQueryRequest(query, DATASOURCE, LangType.SQL, null)); + } + + /* + AssertionHelper withInteraction(JobExecutionResponseReader responseReader) { + this.responseReader = responseReader; + return this; + } + */ + + AssertionHelper withInteraction(PluginAndEmrJobInteraction interaction) { + this.interaction = interaction; + return this; + } + + AssertionHelper assertQueryResults(String status, List data) { + AsyncQueryExecutionResponse results = + queryService.getAsyncQueryResults(createQueryResponse.getQueryId()); + assertEquals(status, results.getStatus()); + assertEquals(data, results.getResults()); + return this; + } + } + + private abstract class PluginAndEmrJobInteraction { + + abstract JSONObject interact(String queryId, String resultIndex); + + JSONObject searchQueryResult(String queryId, String resultIndex) { + return new JobExecutionResponseReader(client).getResultWithQueryId(queryId, resultIndex); + } + + void writeResultDoc(String queryId, String resultIndex) { + // 2) EMR-S bulk writes query_execution_result with refresh = wait_for + String resultIndexName = resultIndex == null ? DEFAULT_RESULT_INDEX : resultIndex; + try { + IndexRequest request = + new IndexRequest() + .index(resultIndexName) + .setRefreshPolicy(WAIT_UNTIL) + .source(createResultDoc(queryId)); + client.index(request).get(); + } catch (Exception e) { + Assert.fail("Failed to write result doc: " + e.getMessage()); + } + } + + void updateStatementState(String queryId, StatementState newState) { + // 3) EMR-S updates query_execution_request with state=success + StatementModel stmt = getStatement(stateStore, DATASOURCE).apply(queryId).get(); + StateStore.updateStatementState(stateStore, DATASOURCE).apply(stmt, newState); + } + } + + private Map createResultDoc(String queryId) { + Map document = new HashMap<>(); + document.put("result", new String[] {"{'1':1}"}); + document.put("schema", new String[] {"{'column_name':'1','data_type':'integer'}"}); + document.put("jobRunId", "XXX"); + document.put("applicationId", "YYY"); + document.put("dataSourceName", DATASOURCE); + document.put("status", "SUCCESS"); + document.put("error", ""); + document.put("queryId", queryId); + document.put("queryText", "SELECT 1"); + document.put("sessionId", "ZZZ"); + document.put("updateTime", 1699124602715L); + document.put("queryRunTime", 123); + return document; + } +} diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/InteractiveQueryNullResultFixTest.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/InteractiveQueryNullResultFixTest.java deleted file mode 100644 index 138fed5f51..0000000000 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/InteractiveQueryNullResultFixTest.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.spark.asyncquery; - -import static org.opensearch.action.support.WriteRequest.RefreshPolicy.WAIT_UNTIL; -import static org.opensearch.sql.datasource.model.DataSourceMetadata.DEFAULT_RESULT_INDEX; -import static org.opensearch.sql.spark.execution.statestore.StateStore.getStatement; -import static org.opensearch.sql.spark.execution.statestore.StateStore.updateStatementState; - -import java.util.HashMap; -import java.util.Map; -import org.json.JSONObject; -import org.junit.Assert; -import org.junit.Test; -import org.opensearch.action.index.IndexRequest; -import org.opensearch.sql.spark.asyncquery.model.AsyncQueryExecutionResponse; -import org.opensearch.sql.spark.execution.statement.StatementModel; -import org.opensearch.sql.spark.execution.statement.StatementState; -import org.opensearch.sql.spark.response.JobExecutionResponseReader; -import org.opensearch.sql.spark.rest.model.CreateAsyncQueryRequest; -import org.opensearch.sql.spark.rest.model.CreateAsyncQueryResponse; -import org.opensearch.sql.spark.rest.model.LangType; - -/** IT for bug fix in https://github.com/opensearch-project/sql/issues/2436. */ -public class InteractiveQueryNullResultFixTest extends AsyncQueryExecutorServiceSpec { - - @Test - public void reproduceSuccessStatementWithNullResult() { - AsyncQueryExecutorService queryService = createAsyncQueryExecutorService(new LocalEMRSClient()); - CreateAsyncQueryResponse response = - queryService.createAsyncQuery( - new CreateAsyncQueryRequest("SELECT 1", DATASOURCE, LangType.SQL, null)); - AsyncQueryExecutionResponse results = queryService.getAsyncQueryResults(response.getQueryId()); - - assertEquals(StatementState.SUCCESS.getState(), results.getStatus()); - assertNull(results.getResults()); - } - - @Override - protected JobExecutionResponseReader createJobExecutionResponseReader() { - // Reproduce the bug with the following steps executed by intercepting response reader - return new JobExecutionResponseReader(client) { - @Override - public JSONObject getResultWithQueryId(String queryId, String resultIndex) { - // 1) PPL plugin searches query_execution_result and return empty - JSONObject result = super.getResultWithQueryId(queryId, resultIndex); - - // 2) EMR-S bulk writes query_execution_result with refresh = wait_for - String resultIndexName = resultIndex == null ? DEFAULT_RESULT_INDEX : resultIndex; - try { - IndexRequest request = - new IndexRequest() - .index(resultIndexName) - .setRefreshPolicy(WAIT_UNTIL) - .source(createResultDoc(queryId)); - client.index(request).get(); - } catch (Exception e) { - Assert.fail("Failed to write result doc: " + e.getMessage()); - } - - // 3) EMR-S updates query_execution_request with state=success - StatementModel stmt = getStatement(stateStore, DATASOURCE).apply(queryId).get(); - updateStatementState(stateStore, DATASOURCE).apply(stmt, StatementState.SUCCESS); - - // 4) PPL plugin reads query_execution_request and return state=success with null result - return result; - } - }; - } - - private Map createResultDoc(String queryId) { - Map document = new HashMap<>(); - document.put("result", new String[] {"{'1':1}"}); - document.put("schema", new String[] {"{'column_name':'1','data_type':'integer'}"}); - document.put("jobRunId", "XXX"); - document.put("applicationId", "YYY"); - document.put("dataSourceName", DATASOURCE); - document.put("status", "SUCCESS"); - document.put("error", ""); - document.put("queryId", queryId); - document.put("queryText", "SELECT 1"); - document.put("sessionId", "ZZZ"); - document.put("updateTime", 1699124602715L); - document.put("queryRunTime", 123); - return document; - } -} From 5a966d340c34faf12d6cf9ce1c2f7fc29bbce502 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Thu, 30 Nov 2023 11:50:12 -0800 Subject: [PATCH 04/12] Refactor IT with fluent assertion Signed-off-by: Chen Dai --- .../AsyncQueryGetResultSpecTest.java | 147 +++++++++++ .../InteractiveQueryGetResultTest.java | 243 ------------------ 2 files changed, 147 insertions(+), 243 deletions(-) create mode 100644 spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryGetResultSpecTest.java delete mode 100644 spark/src/test/java/org/opensearch/sql/spark/asyncquery/InteractiveQueryGetResultTest.java diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryGetResultSpecTest.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryGetResultSpecTest.java new file mode 100644 index 0000000000..697edad159 --- /dev/null +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryGetResultSpecTest.java @@ -0,0 +1,147 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.asyncquery; + +import static org.opensearch.action.support.WriteRequest.RefreshPolicy.WAIT_UNTIL; +import static org.opensearch.sql.data.model.ExprValueUtils.tupleValue; +import static org.opensearch.sql.datasource.model.DataSourceMetadata.DEFAULT_RESULT_INDEX; +import static org.opensearch.sql.spark.execution.statestore.StateStore.getStatement; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.json.JSONObject; +import org.junit.Assert; +import org.junit.Test; +import org.opensearch.action.index.IndexRequest; +import org.opensearch.sql.data.model.ExprValue; +import org.opensearch.sql.spark.asyncquery.model.AsyncQueryExecutionResponse; +import org.opensearch.sql.spark.execution.statement.StatementModel; +import org.opensearch.sql.spark.execution.statement.StatementState; +import org.opensearch.sql.spark.execution.statestore.StateStore; +import org.opensearch.sql.spark.response.JobExecutionResponseReader; +import org.opensearch.sql.spark.rest.model.CreateAsyncQueryRequest; +import org.opensearch.sql.spark.rest.model.CreateAsyncQueryResponse; +import org.opensearch.sql.spark.rest.model.LangType; + +public class AsyncQueryGetResultSpecTest extends AsyncQueryExecutorServiceSpec { + + @Test + public void testInteractiveQueryGetResult() { + createAsyncQuery("SELECT 1") + .withInteraction( + interaction -> { + JSONObject result = interaction.pluginSearchQueryResult(); + interaction.emrJobWriteResultDoc(); + interaction.emrJobUpdateStatementState(StatementState.SUCCESS); + return result; + }) + .assertQueryResults("running", null) + .withInteraction(InteractionStep::pluginSearchQueryResult) + .assertQueryResults("SUCCESS", List.of(tupleValue(Map.of("1", 1)))); + } + + private AssertionHelper createAsyncQuery(String query) { + return new AssertionHelper(query); + } + + private class AssertionHelper { + private final AsyncQueryExecutorService queryService; + private final CreateAsyncQueryResponse createQueryResponse; + private Interaction interaction; + + AssertionHelper(String query) { + this.queryService = + createAsyncQueryExecutorService( + new LocalEMRSClient(), + new JobExecutionResponseReader(client) { + @Override + public JSONObject getResultWithQueryId(String queryId, String resultIndex) { + // Get results with extra steps defined in current interaction + return interaction.interact(new InteractionStep(queryId, resultIndex)); + } + }); + this.createQueryResponse = + queryService.createAsyncQuery( + new CreateAsyncQueryRequest(query, DATASOURCE, LangType.SQL, null)); + } + + AssertionHelper withInteraction(Interaction interaction) { + this.interaction = interaction; + return this; + } + + AssertionHelper assertQueryResults(String status, List data) { + AsyncQueryExecutionResponse results = + queryService.getAsyncQueryResults(createQueryResponse.getQueryId()); + assertEquals(status, results.getStatus()); + assertEquals(data, results.getResults()); + return this; + } + } + + /** Define an interaction between PPL plugin and EMR-S job. */ + private interface Interaction { + + JSONObject interact(InteractionStep interaction); + } + + /** + * Each method in this class is one step that can happen in an interaction. These methods are + * called in any order to simulate concurrent scenario. + */ + private class InteractionStep { + private final String queryId; + private final String resultIndex; + + private InteractionStep(String queryId, String resultIndex) { + this.queryId = queryId; + this.resultIndex = resultIndex == null ? DEFAULT_RESULT_INDEX : resultIndex; + } + + /** Simulate PPL plugin search query_execution_result */ + JSONObject pluginSearchQueryResult() { + return new JobExecutionResponseReader(client).getResultWithQueryId(queryId, resultIndex); + } + + /** Simulate EMR-S bulk writes query_execution_result with refresh = wait_for */ + void emrJobWriteResultDoc() { + try { + IndexRequest request = + new IndexRequest() + .index(resultIndex) + .setRefreshPolicy(WAIT_UNTIL) + .source(createResultDoc(queryId)); + client.index(request).get(); + } catch (Exception e) { + Assert.fail("Failed to write result doc: " + e.getMessage()); + } + } + + /** Simulate EMR-S updates query_execution_request with state */ + void emrJobUpdateStatementState(StatementState newState) { + StatementModel stmt = getStatement(stateStore, DATASOURCE).apply(queryId).get(); + StateStore.updateStatementState(stateStore, DATASOURCE).apply(stmt, newState); + } + } + + private Map createResultDoc(String queryId) { + Map document = new HashMap<>(); + document.put("result", new String[] {"{'1':1}"}); + document.put("schema", new String[] {"{'column_name':'1','data_type':'integer'}"}); + document.put("jobRunId", "XXX"); + document.put("applicationId", "YYY"); + document.put("dataSourceName", DATASOURCE); + document.put("status", "SUCCESS"); + document.put("error", ""); + document.put("queryId", queryId); + document.put("queryText", "SELECT 1"); + document.put("sessionId", "ZZZ"); + document.put("updateTime", 1699124602715L); + document.put("queryRunTime", 123); + return document; + } +} diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/InteractiveQueryGetResultTest.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/InteractiveQueryGetResultTest.java deleted file mode 100644 index f8ecc2bc12..0000000000 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/InteractiveQueryGetResultTest.java +++ /dev/null @@ -1,243 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.spark.asyncquery; - -import static org.opensearch.action.support.WriteRequest.RefreshPolicy.WAIT_UNTIL; -import static org.opensearch.sql.data.model.ExprValueUtils.integerValue; -import static org.opensearch.sql.data.model.ExprValueUtils.tupleValue; -import static org.opensearch.sql.datasource.model.DataSourceMetadata.DEFAULT_RESULT_INDEX; -import static org.opensearch.sql.spark.execution.statestore.StateStore.getStatement; -import static org.opensearch.sql.spark.execution.statestore.StateStore.updateStatementState; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import org.json.JSONObject; -import org.junit.Assert; -import org.junit.Ignore; -import org.junit.Test; -import org.opensearch.action.index.IndexRequest; -import org.opensearch.sql.data.model.ExprValue; -import org.opensearch.sql.spark.asyncquery.model.AsyncQueryExecutionResponse; -import org.opensearch.sql.spark.execution.statement.StatementModel; -import org.opensearch.sql.spark.execution.statement.StatementState; -import org.opensearch.sql.spark.execution.statestore.StateStore; -import org.opensearch.sql.spark.response.JobExecutionResponseReader; -import org.opensearch.sql.spark.rest.model.CreateAsyncQueryRequest; -import org.opensearch.sql.spark.rest.model.CreateAsyncQueryResponse; -import org.opensearch.sql.spark.rest.model.LangType; - -public class InteractiveQueryGetResultTest extends AsyncQueryExecutorServiceSpec { - - @Test - public void testSuccessfulStatementWithNullResult() { - /* - AsyncQueryExecutorService queryService = - createAsyncQueryExecutorService(new LocalEMRSClient(), createJobExecutionResponseReader()); - CreateAsyncQueryResponse response = - queryService.createAsyncQuery( - new CreateAsyncQueryRequest("SELECT 1", DATASOURCE, LangType.SQL, null)); - AsyncQueryExecutionResponse results = queryService.getAsyncQueryResults(response.getQueryId()); - - assertEquals(StatementState.RUNNING.getState(), results.getStatus()); - // assertEquals(List.of(integerValue(1)), results.getResults()); - */ - - createAsyncQuery("SELECT 1") - /* - .withInteraction( - new JobExecutionResponseReader(client) { - @Override - public JSONObject getResultWithQueryId(String queryId, String resultIndex) { - // 1) PPL plugin searches query_execution_result and return empty - JSONObject result = super.getResultWithQueryId(queryId, resultIndex); - - // 2) EMR-S bulk writes query_execution_result with refresh = wait_for - String resultIndexName = resultIndex == null ? DEFAULT_RESULT_INDEX : resultIndex; - try { - IndexRequest request = - new IndexRequest() - .index(resultIndexName) - .setRefreshPolicy(WAIT_UNTIL) - .source(createResultDoc(queryId)); - client.index(request).get(); - } catch (Exception e) { - Assert.fail("Failed to write result doc: " + e.getMessage()); - } - - // 3) EMR-S updates query_execution_request with state=success - StatementModel stmt = getStatement(stateStore, DATASOURCE).apply(queryId).get(); - updateStatementState(stateStore, DATASOURCE).apply(stmt, StatementState.SUCCESS); - - // 4) PPL plugin later reads query_execution_request and return state=success with - // null - // result - return result; - } - }) - */ - .withInteraction( - new PluginAndEmrJobInteraction() { - @Override - JSONObject interact(String queryId, String resultIndex) { - JSONObject result = searchQueryResult(queryId, resultIndex); - writeResultDoc(queryId, resultIndex); - updateStatementState(queryId, StatementState.SUCCESS); - return result; - } - }) - .assertQueryResults("running", null) - .withInteraction( - new PluginAndEmrJobInteraction() { - @Override - JSONObject interact(String queryId, String resultIndex) { - return searchQueryResult(queryId, resultIndex); - } - }) - .assertQueryResults("SUCCESS", List.of(tupleValue(Map.of("1", 1)))); - } - - @Ignore - public void testSuccessfulStatementWithResult() { - AsyncQueryExecutorService queryService = - createAsyncQueryExecutorService(new LocalEMRSClient(), createJobExecutionResponseReader()); - CreateAsyncQueryResponse response = - queryService.createAsyncQuery( - new CreateAsyncQueryRequest("SELECT 1", DATASOURCE, LangType.SQL, null)); - AsyncQueryExecutionResponse results = queryService.getAsyncQueryResults(response.getQueryId()); - - assertEquals(StatementState.RUNNING.getState(), results.getStatus()); - // assertEquals(List.of(integerValue(1)), results.getResults()); - } - - private JobExecutionResponseReader createJobExecutionResponseReader() { - // Reproduce the bug with the following steps executed by intercepting response reader - return new JobExecutionResponseReader(client) { - @Override - public JSONObject getResultWithQueryId(String queryId, String resultIndex) { - // 1) PPL plugin searches query_execution_result and return empty - JSONObject result = super.getResultWithQueryId(queryId, resultIndex); - - // 2) EMR-S bulk writes query_execution_result with refresh = wait_for - String resultIndexName = resultIndex == null ? DEFAULT_RESULT_INDEX : resultIndex; - try { - IndexRequest request = - new IndexRequest() - .index(resultIndexName) - .setRefreshPolicy(WAIT_UNTIL) - .source(createResultDoc(queryId)); - client.index(request).get(); - } catch (Exception e) { - Assert.fail("Failed to write result doc: " + e.getMessage()); - } - - // 3) EMR-S updates query_execution_request with state=success - StatementModel stmt = getStatement(stateStore, DATASOURCE).apply(queryId).get(); - updateStatementState(stateStore, DATASOURCE).apply(stmt, StatementState.SUCCESS); - - // 4) PPL plugin later reads query_execution_request and return state=success with null - // result - return result; - } - }; - } - - private AssertionHelper createAsyncQuery(String query) { - return new AssertionHelper(query); - } - - private class AssertionHelper { - private JobExecutionResponseReader responseReader; - - private final AsyncQueryExecutorService queryService; - private final CreateAsyncQueryResponse createQueryResponse; - - private PluginAndEmrJobInteraction interaction; - - AssertionHelper(String query) { - this.queryService = - createAsyncQueryExecutorService( - new LocalEMRSClient(), - new JobExecutionResponseReader(client) { - @Override - public JSONObject getResultWithQueryId(String queryId, String resultIndex) { - // Get results with current interaction - return interaction.interact(queryId, resultIndex); - } - }); - this.createQueryResponse = - queryService.createAsyncQuery( - new CreateAsyncQueryRequest(query, DATASOURCE, LangType.SQL, null)); - } - - /* - AssertionHelper withInteraction(JobExecutionResponseReader responseReader) { - this.responseReader = responseReader; - return this; - } - */ - - AssertionHelper withInteraction(PluginAndEmrJobInteraction interaction) { - this.interaction = interaction; - return this; - } - - AssertionHelper assertQueryResults(String status, List data) { - AsyncQueryExecutionResponse results = - queryService.getAsyncQueryResults(createQueryResponse.getQueryId()); - assertEquals(status, results.getStatus()); - assertEquals(data, results.getResults()); - return this; - } - } - - private abstract class PluginAndEmrJobInteraction { - - abstract JSONObject interact(String queryId, String resultIndex); - - JSONObject searchQueryResult(String queryId, String resultIndex) { - return new JobExecutionResponseReader(client).getResultWithQueryId(queryId, resultIndex); - } - - void writeResultDoc(String queryId, String resultIndex) { - // 2) EMR-S bulk writes query_execution_result with refresh = wait_for - String resultIndexName = resultIndex == null ? DEFAULT_RESULT_INDEX : resultIndex; - try { - IndexRequest request = - new IndexRequest() - .index(resultIndexName) - .setRefreshPolicy(WAIT_UNTIL) - .source(createResultDoc(queryId)); - client.index(request).get(); - } catch (Exception e) { - Assert.fail("Failed to write result doc: " + e.getMessage()); - } - } - - void updateStatementState(String queryId, StatementState newState) { - // 3) EMR-S updates query_execution_request with state=success - StatementModel stmt = getStatement(stateStore, DATASOURCE).apply(queryId).get(); - StateStore.updateStatementState(stateStore, DATASOURCE).apply(stmt, newState); - } - } - - private Map createResultDoc(String queryId) { - Map document = new HashMap<>(); - document.put("result", new String[] {"{'1':1}"}); - document.put("schema", new String[] {"{'column_name':'1','data_type':'integer'}"}); - document.put("jobRunId", "XXX"); - document.put("applicationId", "YYY"); - document.put("dataSourceName", DATASOURCE); - document.put("status", "SUCCESS"); - document.put("error", ""); - document.put("queryId", queryId); - document.put("queryText", "SELECT 1"); - document.put("sessionId", "ZZZ"); - document.put("updateTime", 1699124602715L); - document.put("queryRunTime", 123); - return document; - } -} From 51cee706f9f2e9de088847d17de3ecdfa53e7c20 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Thu, 30 Nov 2023 13:58:22 -0800 Subject: [PATCH 05/12] Add IT for batch query handler Signed-off-by: Chen Dai --- .../AsyncQueryGetResultSpecTest.java | 65 +++++++++++++++++-- 1 file changed, 60 insertions(+), 5 deletions(-) diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryGetResultSpecTest.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryGetResultSpecTest.java index 697edad159..f7c6fc6d05 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryGetResultSpecTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryGetResultSpecTest.java @@ -10,6 +10,8 @@ import static org.opensearch.sql.datasource.model.DataSourceMetadata.DEFAULT_RESULT_INDEX; import static org.opensearch.sql.spark.execution.statestore.StateStore.getStatement; +import com.amazonaws.services.emrserverless.model.GetJobRunResult; +import com.amazonaws.services.emrserverless.model.JobRunState; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -32,6 +34,8 @@ public class AsyncQueryGetResultSpecTest extends AsyncQueryExecutorServiceSpec { @Test public void testInteractiveQueryGetResult() { createAsyncQuery("SELECT 1") + .withoutInteraction() + .assertQueryResults("waiting", null) .withInteraction( interaction -> { JSONObject result = interaction.pluginSearchQueryResult(); @@ -40,7 +44,22 @@ public void testInteractiveQueryGetResult() { return result; }) .assertQueryResults("running", null) - .withInteraction(InteractionStep::pluginSearchQueryResult) + .withoutInteraction() + .assertQueryResults("SUCCESS", List.of(tupleValue(Map.of("1", 1)))); + } + + @Test + public void testBatchQueryGetResult() { + createAsyncQuery("REFRESH SKIPPING INDEX ON test") + .withInteraction( + interaction -> { + JSONObject result = interaction.pluginSearchQueryResult(); + interaction.emrJobWriteResultDoc(); + interaction.emrJobUpdateJobState(JobRunState.SUCCESS); + return result; + }) + .assertQueryResults("running", null) + .withoutInteraction() .assertQueryResults("SUCCESS", List.of(tupleValue(Map.of("1", 1)))); } @@ -54,14 +73,24 @@ private class AssertionHelper { private Interaction interaction; AssertionHelper(String query) { + CustomEMRSClient emrClient = new CustomEMRSClient(); this.queryService = createAsyncQueryExecutorService( - new LocalEMRSClient(), + emrClient, + /* + * Custom reader that intercepts get results call and inject extra steps defined in + * current interaction. Intercept both get methods for different query handler which + * will only call either of them. + */ new JobExecutionResponseReader(client) { + @Override + public JSONObject getResultFromOpensearchIndex(String jobId, String resultIndex) { + return interaction.interact(new InteractionStep(emrClient, jobId, resultIndex)); + } + @Override public JSONObject getResultWithQueryId(String queryId, String resultIndex) { - // Get results with extra steps defined in current interaction - return interaction.interact(new InteractionStep(queryId, resultIndex)); + return interaction.interact(new InteractionStep(emrClient, queryId, resultIndex)); } }); this.createQueryResponse = @@ -69,6 +98,11 @@ public JSONObject getResultWithQueryId(String queryId, String resultIndex) { new CreateAsyncQueryRequest(query, DATASOURCE, LangType.SQL, null)); } + AssertionHelper withoutInteraction() { + // No interaction with EMR-S job. Plugin searches query result only. + return withInteraction(InteractionStep::pluginSearchQueryResult); + } + AssertionHelper withInteraction(Interaction interaction) { this.interaction = interaction; return this; @@ -83,6 +117,21 @@ AssertionHelper assertQueryResults(String status, List data) { } } + private class CustomEMRSClient extends LocalEMRSClient { + private String jobState; + + @Override + public GetJobRunResult getJobRunResult(String applicationId, String jobId) { + GetJobRunResult result = super.getJobRunResult(applicationId, jobId); + result.getJobRun().setState(jobState); + return result; + } + + void setJobState(String jobState) { + this.jobState = jobState; + } + } + /** Define an interaction between PPL plugin and EMR-S job. */ private interface Interaction { @@ -94,10 +143,12 @@ private interface Interaction { * called in any order to simulate concurrent scenario. */ private class InteractionStep { + private final CustomEMRSClient emrClient; private final String queryId; private final String resultIndex; - private InteractionStep(String queryId, String resultIndex) { + private InteractionStep(CustomEMRSClient emrClient, String queryId, String resultIndex) { + this.emrClient = emrClient; this.queryId = queryId; this.resultIndex = resultIndex == null ? DEFAULT_RESULT_INDEX : resultIndex; } @@ -126,6 +177,10 @@ void emrJobUpdateStatementState(StatementState newState) { StatementModel stmt = getStatement(stateStore, DATASOURCE).apply(queryId).get(); StateStore.updateStatementState(stateStore, DATASOURCE).apply(stmt, newState); } + + void emrJobUpdateJobState(JobRunState jobState) { + emrClient.setJobState(jobState.toString()); + } } private Map createResultDoc(String queryId) { From f2de7b6f8e7e3907db1d788af25014d7add93b62 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Thu, 30 Nov 2023 14:54:41 -0800 Subject: [PATCH 06/12] Add IT for streaming query handler Signed-off-by: Chen Dai --- .../AsyncQueryExecutorServiceSpec.java | 118 +++++++++++++++++- .../AsyncQueryGetResultSpecTest.java | 110 +++++++++++----- .../spark/asyncquery/IndexQuerySpecTest.java | 111 ---------------- 3 files changed, 196 insertions(+), 143 deletions(-) diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java index be49a559a2..ef96a0d6da 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java @@ -27,11 +27,13 @@ import java.util.List; import java.util.Optional; import lombok.Getter; +import lombok.RequiredArgsConstructor; import lombok.SneakyThrows; import org.apache.commons.lang3.StringUtils; import org.junit.After; import org.junit.Before; import org.opensearch.action.admin.indices.create.CreateIndexRequest; +import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; import org.opensearch.action.search.SearchRequest; import org.opensearch.action.search.SearchResponse; import org.opensearch.client.node.NodeClient; @@ -41,6 +43,7 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.xcontent.XContentType; import org.opensearch.index.query.QueryBuilder; +import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.plugins.Plugin; import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.sql.datasource.model.DataSourceMetadata; @@ -63,6 +66,9 @@ import org.opensearch.sql.spark.execution.session.SessionState; import org.opensearch.sql.spark.execution.statestore.StateStore; import org.opensearch.sql.spark.flint.FlintIndexMetadataReaderImpl; +import org.opensearch.sql.spark.flint.FlintIndexState; +import org.opensearch.sql.spark.flint.FlintIndexStateModel; +import org.opensearch.sql.spark.flint.FlintIndexType; import org.opensearch.sql.spark.leasemanager.DefaultLeaseManager; import org.opensearch.sql.spark.response.JobExecutionResponseReader; import org.opensearch.sql.storage.DataSourceFactory; @@ -222,6 +228,7 @@ public static class LocalEMRSClient implements EMRServerlessClient { private int startJobRunCalled = 0; private int cancelJobRunCalled = 0; private int getJobResult = 0; + private String jobState = "RUNNING"; @Getter private StartJobRequest jobRequest; @@ -236,7 +243,7 @@ public String startJobRun(StartJobRequest startJobRequest) { public GetJobRunResult getJobRunResult(String applicationId, String jobId) { getJobResult++; JobRun jobRun = new JobRun(); - jobRun.setState("RUNNING"); + jobRun.setState(jobState); return new GetJobRunResult().withJobRun(jobRun); } @@ -257,6 +264,10 @@ public void cancelJobRunCalled(int expectedTimes) { public void getJobRunResultCalled(int expectedTimes) { assertEquals(expectedTimes, getJobResult); } + + public void setJobState(String jobState) { + this.jobState = jobState; + } } public SparkExecutionEngineConfig sparkExecutionEngineConfig() { @@ -313,6 +324,111 @@ public String loadResultIndexMappings() { return Resources.toString(url, Charsets.UTF_8); } + public class MockFlintSparkJob { + + private FlintIndexStateModel stateModel; + + public MockFlintSparkJob(String latestId) { + assertNotNull(latestId); + stateModel = + new FlintIndexStateModel( + FlintIndexState.EMPTY, + "mockAppId", + "mockJobId", + latestId, + DATASOURCE, + System.currentTimeMillis(), + "", + SequenceNumbers.UNASSIGNED_SEQ_NO, + SequenceNumbers.UNASSIGNED_PRIMARY_TERM); + stateModel = StateStore.createFlintIndexState(stateStore, DATASOURCE).apply(stateModel); + } + + public void refreshing() { + stateModel = + StateStore.updateFlintIndexState(stateStore, DATASOURCE) + .apply(stateModel, FlintIndexState.REFRESHING); + } + + public void cancelling() { + stateModel = + StateStore.updateFlintIndexState(stateStore, DATASOURCE) + .apply(stateModel, FlintIndexState.CANCELLING); + } + + public void active() { + stateModel = + StateStore.updateFlintIndexState(stateStore, DATASOURCE) + .apply(stateModel, FlintIndexState.ACTIVE); + } + + public void deleting() { + stateModel = + StateStore.updateFlintIndexState(stateStore, DATASOURCE) + .apply(stateModel, FlintIndexState.DELETING); + } + + public void deleted() { + stateModel = + StateStore.updateFlintIndexState(stateStore, DATASOURCE) + .apply(stateModel, FlintIndexState.DELETED); + } + + void assertState(FlintIndexState expected) { + Optional stateModelOpt = + StateStore.getFlintIndexState(stateStore, DATASOURCE).apply(stateModel.getId()); + assertTrue((stateModelOpt.isPresent())); + assertEquals(expected, stateModelOpt.get().getIndexState()); + } + } + + @RequiredArgsConstructor + public class FlintDatasetMock { + final String query; + final FlintIndexType indexType; + final String indexName; + boolean isLegacy = false; + String latestId; + + FlintDatasetMock isLegacy(boolean isLegacy) { + this.isLegacy = isLegacy; + return this; + } + + FlintDatasetMock latestId(String latestId) { + this.latestId = latestId; + return this; + } + + public void createIndex() { + String pathPrefix = isLegacy ? "flint-index-mappings" : "flint-index-mappings/0.1.1"; + switch (indexType) { + case SKIPPING: + createIndexWithMappings( + indexName, loadMappings(pathPrefix + "/" + "flint_skipping_index.json")); + break; + case COVERING: + createIndexWithMappings( + indexName, loadMappings(pathPrefix + "/" + "flint_covering_index.json")); + break; + case MATERIALIZED_VIEW: + createIndexWithMappings(indexName, loadMappings(pathPrefix + "/" + "flint_mv.json")); + break; + } + } + + @SneakyThrows + public void deleteIndex() { + client().admin().indices().delete(new DeleteIndexRequest().indices(indexName)).get(); + } + } + + @SneakyThrows + public static String loadMappings(String path) { + URL url = Resources.getResource(path); + return Resources.toString(url, Charsets.UTF_8); + } + public void createIndexWithMappings(String indexName, String metadata) { CreateIndexRequest request = new CreateIndexRequest(indexName); request.mapping(metadata, XContentType.JSON); diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryGetResultSpecTest.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryGetResultSpecTest.java index f7c6fc6d05..ea4b81149a 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryGetResultSpecTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryGetResultSpecTest.java @@ -10,13 +10,14 @@ import static org.opensearch.sql.datasource.model.DataSourceMetadata.DEFAULT_RESULT_INDEX; import static org.opensearch.sql.spark.execution.statestore.StateStore.getStatement; -import com.amazonaws.services.emrserverless.model.GetJobRunResult; import com.amazonaws.services.emrserverless.model.JobRunState; +import com.google.common.collect.ImmutableList; import java.util.HashMap; import java.util.List; import java.util.Map; import org.json.JSONObject; import org.junit.Assert; +import org.junit.Ignore; import org.junit.Test; import org.opensearch.action.index.IndexRequest; import org.opensearch.sql.data.model.ExprValue; @@ -24,6 +25,7 @@ import org.opensearch.sql.spark.execution.statement.StatementModel; import org.opensearch.sql.spark.execution.statement.StatementState; import org.opensearch.sql.spark.execution.statestore.StateStore; +import org.opensearch.sql.spark.flint.FlintIndexType; import org.opensearch.sql.spark.response.JobExecutionResponseReader; import org.opensearch.sql.spark.rest.model.CreateAsyncQueryRequest; import org.opensearch.sql.spark.rest.model.CreateAsyncQueryResponse; @@ -31,6 +33,13 @@ public class AsyncQueryGetResultSpecTest extends AsyncQueryExecutorServiceSpec { + private final FlintDatasetMock mockIndex = + new FlintDatasetMock( + "DROP SKIPPING INDEX ON mys3.default.http_logs", + FlintIndexType.SKIPPING, + "flint_mys3_default_http_logs_skipping_index") + .latestId("skippingindexid"); + @Test public void testInteractiveQueryGetResult() { createAsyncQuery("SELECT 1") @@ -39,13 +48,13 @@ public void testInteractiveQueryGetResult() { .withInteraction( interaction -> { JSONObject result = interaction.pluginSearchQueryResult(); - interaction.emrJobWriteResultDoc(); + interaction.emrJobWriteResultDoc(createResultDoc(interaction.queryId)); interaction.emrJobUpdateStatementState(StatementState.SUCCESS); return result; }) .assertQueryResults("running", null) .withoutInteraction() - .assertQueryResults("SUCCESS", List.of(tupleValue(Map.of("1", 1)))); + .assertQueryResults("SUCCESS", ImmutableList.of(tupleValue(Map.of("1", 1)))); } @Test @@ -54,7 +63,47 @@ public void testBatchQueryGetResult() { .withInteraction( interaction -> { JSONObject result = interaction.pluginSearchQueryResult(); - interaction.emrJobWriteResultDoc(); + interaction.emrJobWriteResultDoc(createEmptyResultDoc(interaction.queryId)); + interaction.emrJobUpdateJobState(JobRunState.SUCCESS); + return result; + }) + .assertQueryResults("running", null) + .withoutInteraction() + .assertQueryResults("SUCCESS", ImmutableList.of()); + } + + @Test + public void testStreamingQueryGetResult() { + // Create mock index with index state refreshing + mockIndex.createIndex(); + new MockFlintSparkJob(mockIndex.latestId).refreshing(); + + createAsyncQuery( + "CREATE SKIPPING INDEX ON mys3.default.http_logs " + + "(l_orderkey VALUE_SET) WITH (auto_refresh = true)") + .withInteraction( + interaction -> { + JSONObject result = interaction.pluginSearchQueryResult(); + interaction.emrJobWriteResultDoc(createEmptyResultDoc(interaction.queryId)); + interaction.emrJobUpdateJobState(JobRunState.SUCCESS); + return result; + }) + .assertQueryResults("running", null) + .withoutInteraction() + .assertQueryResults("SUCCESS", ImmutableList.of()); + } + + @Ignore + public void testDropIndexQueryGetResult() { + // Create mock index with index state refreshing + mockIndex.createIndex(); + new MockFlintSparkJob(mockIndex.latestId).refreshing(); + + createAsyncQuery(mockIndex.query) + .withInteraction( + interaction -> { + JSONObject result = interaction.pluginSearchQueryResult(); + interaction.emrJobWriteResultDoc(createEmptyResultDoc(interaction.queryId)); interaction.emrJobUpdateJobState(JobRunState.SUCCESS); return result; }) @@ -73,7 +122,7 @@ private class AssertionHelper { private Interaction interaction; AssertionHelper(String query) { - CustomEMRSClient emrClient = new CustomEMRSClient(); + LocalEMRSClient emrClient = new LocalEMRSClient(); this.queryService = createAsyncQueryExecutorService( emrClient, @@ -117,21 +166,6 @@ AssertionHelper assertQueryResults(String status, List data) { } } - private class CustomEMRSClient extends LocalEMRSClient { - private String jobState; - - @Override - public GetJobRunResult getJobRunResult(String applicationId, String jobId) { - GetJobRunResult result = super.getJobRunResult(applicationId, jobId); - result.getJobRun().setState(jobState); - return result; - } - - void setJobState(String jobState) { - this.jobState = jobState; - } - } - /** Define an interaction between PPL plugin and EMR-S job. */ private interface Interaction { @@ -143,11 +177,11 @@ private interface Interaction { * called in any order to simulate concurrent scenario. */ private class InteractionStep { - private final CustomEMRSClient emrClient; - private final String queryId; - private final String resultIndex; + private final LocalEMRSClient emrClient; + final String queryId; + final String resultIndex; - private InteractionStep(CustomEMRSClient emrClient, String queryId, String resultIndex) { + private InteractionStep(LocalEMRSClient emrClient, String queryId, String resultIndex) { this.emrClient = emrClient; this.queryId = queryId; this.resultIndex = resultIndex == null ? DEFAULT_RESULT_INDEX : resultIndex; @@ -159,13 +193,10 @@ JSONObject pluginSearchQueryResult() { } /** Simulate EMR-S bulk writes query_execution_result with refresh = wait_for */ - void emrJobWriteResultDoc() { + void emrJobWriteResultDoc(Map resultDoc) { try { IndexRequest request = - new IndexRequest() - .index(resultIndex) - .setRefreshPolicy(WAIT_UNTIL) - .source(createResultDoc(queryId)); + new IndexRequest().index(resultIndex).setRefreshPolicy(WAIT_UNTIL).source(resultDoc); client.index(request).get(); } catch (Exception e) { Assert.fail("Failed to write result doc: " + e.getMessage()); @@ -183,10 +214,27 @@ void emrJobUpdateJobState(JobRunState jobState) { } } + private Map createEmptyResultDoc(String queryId) { + Map document = new HashMap<>(); + document.put("result", ImmutableList.of()); + document.put("schema", ImmutableList.of()); + document.put("jobRunId", "XXX"); + document.put("applicationId", "YYY"); + document.put("dataSourceName", DATASOURCE); + document.put("status", "SUCCESS"); + document.put("error", ""); + document.put("queryId", queryId); + document.put("queryText", "SELECT 1"); + document.put("sessionId", "ZZZ"); + document.put("updateTime", 1699124602715L); + document.put("queryRunTime", 123); + return document; + } + private Map createResultDoc(String queryId) { Map document = new HashMap<>(); - document.put("result", new String[] {"{'1':1}"}); - document.put("schema", new String[] {"{'column_name':'1','data_type':'integer'}"}); + document.put("result", ImmutableList.of("{'1':1}")); + document.put("schema", ImmutableList.of("{'column_name':'1','data_type':'integer'}")); document.put("jobRunId", "XXX"); document.put("applicationId", "YYY"); document.put("dataSourceName", DATASOURCE); diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecTest.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecTest.java index 45a83b3296..d6085277ff 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecTest.java @@ -8,16 +8,10 @@ import com.amazonaws.services.emrserverless.model.CancelJobRunResult; import com.amazonaws.services.emrserverless.model.GetJobRunResult; import com.amazonaws.services.emrserverless.model.JobRun; -import com.google.common.base.Charsets; import com.google.common.collect.ImmutableList; -import com.google.common.io.Resources; -import java.net.URL; import java.util.Optional; -import lombok.RequiredArgsConstructor; -import lombok.SneakyThrows; import org.junit.Assert; import org.junit.Test; -import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.sql.spark.asyncquery.model.AsyncQueryExecutionResponse; import org.opensearch.sql.spark.execution.statestore.StateStore; @@ -685,109 +679,4 @@ public void concurrentRefreshJobLimitNotAppliedToDDL() { new CreateAsyncQueryRequest(query, DATASOURCE, LangType.SQL, null)); assertNotNull(asyncQueryResponse.getSessionId()); } - - public class MockFlintSparkJob { - - private FlintIndexStateModel stateModel; - - public MockFlintSparkJob(String latestId) { - assertNotNull(latestId); - stateModel = - new FlintIndexStateModel( - FlintIndexState.EMPTY, - "mockAppId", - "mockJobId", - latestId, - DATASOURCE, - System.currentTimeMillis(), - "", - SequenceNumbers.UNASSIGNED_SEQ_NO, - SequenceNumbers.UNASSIGNED_PRIMARY_TERM); - stateModel = StateStore.createFlintIndexState(stateStore, DATASOURCE).apply(stateModel); - } - - public void refreshing() { - stateModel = - StateStore.updateFlintIndexState(stateStore, DATASOURCE) - .apply(stateModel, FlintIndexState.REFRESHING); - } - - public void cancelling() { - stateModel = - StateStore.updateFlintIndexState(stateStore, DATASOURCE) - .apply(stateModel, FlintIndexState.CANCELLING); - } - - public void active() { - stateModel = - StateStore.updateFlintIndexState(stateStore, DATASOURCE) - .apply(stateModel, FlintIndexState.ACTIVE); - } - - public void deleting() { - stateModel = - StateStore.updateFlintIndexState(stateStore, DATASOURCE) - .apply(stateModel, FlintIndexState.DELETING); - } - - public void deleted() { - stateModel = - StateStore.updateFlintIndexState(stateStore, DATASOURCE) - .apply(stateModel, FlintIndexState.DELETED); - } - - void assertState(FlintIndexState expected) { - Optional stateModelOpt = - StateStore.getFlintIndexState(stateStore, DATASOURCE).apply(stateModel.getId()); - assertTrue((stateModelOpt.isPresent())); - assertEquals(expected, stateModelOpt.get().getIndexState()); - } - } - - @RequiredArgsConstructor - public class FlintDatasetMock { - private final String query; - private final FlintIndexType indexType; - private final String indexName; - private boolean isLegacy = false; - private String latestId; - - FlintDatasetMock isLegacy(boolean isLegacy) { - this.isLegacy = isLegacy; - return this; - } - - FlintDatasetMock latestId(String latestId) { - this.latestId = latestId; - return this; - } - - public void createIndex() { - String pathPrefix = isLegacy ? "flint-index-mappings" : "flint-index-mappings/0.1.1"; - switch (indexType) { - case SKIPPING: - createIndexWithMappings( - indexName, loadMappings(pathPrefix + "/" + "flint_skipping_index.json")); - break; - case COVERING: - createIndexWithMappings( - indexName, loadMappings(pathPrefix + "/" + "flint_covering_index.json")); - break; - case MATERIALIZED_VIEW: - createIndexWithMappings(indexName, loadMappings(pathPrefix + "/" + "flint_mv.json")); - break; - } - } - - @SneakyThrows - public void deleteIndex() { - client().admin().indices().delete(new DeleteIndexRequest().indices(indexName)).get(); - } - } - - @SneakyThrows - public static String loadMappings(String path) { - URL url = Resources.getResource(path); - return Resources.toString(url, Charsets.UTF_8); - } } From faba787fbc27bab0950d02959cfa10b70feb7548 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Thu, 30 Nov 2023 15:26:28 -0800 Subject: [PATCH 07/12] Add more IT for normal case Signed-off-by: Chen Dai --- .../AsyncQueryGetResultSpecTest.java | 85 ++++++++++++++++--- 1 file changed, 73 insertions(+), 12 deletions(-) diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryGetResultSpecTest.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryGetResultSpecTest.java index ea4b81149a..05f851585c 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryGetResultSpecTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryGetResultSpecTest.java @@ -17,6 +17,7 @@ import java.util.Map; import org.json.JSONObject; import org.junit.Assert; +import org.junit.Before; import org.junit.Ignore; import org.junit.Test; import org.opensearch.action.index.IndexRequest; @@ -33,6 +34,7 @@ public class AsyncQueryGetResultSpecTest extends AsyncQueryExecutorServiceSpec { + /** Mock Flint index and index state */ private final FlintDatasetMock mockIndex = new FlintDatasetMock( "DROP SKIPPING INDEX ON mys3.default.http_logs", @@ -40,8 +42,29 @@ public class AsyncQueryGetResultSpecTest extends AsyncQueryExecutorServiceSpec { "flint_mys3_default_http_logs_skipping_index") .latestId("skippingindexid"); + private MockFlintSparkJob mockIndexState; + + @Before + public void doSetUp() { + mockIndexState = new MockFlintSparkJob(mockIndex.latestId); + } + @Test public void testInteractiveQueryGetResult() { + createAsyncQuery("SELECT 1") + .withoutInteraction() + .assertQueryResults("waiting", null) + .withInteraction( + interaction -> { + interaction.emrJobWriteResultDoc(createResultDoc(interaction.queryId)); + interaction.emrJobUpdateStatementState(StatementState.SUCCESS); + return interaction.pluginSearchQueryResult(); + }) + .assertQueryResults("SUCCESS", ImmutableList.of(tupleValue(Map.of("1", 1)))); + } + + @Test + public void testInteractiveQueryGetResultWithSearchResultBeforeEmrJobUpdate() { createAsyncQuery("SELECT 1") .withoutInteraction() .assertQueryResults("waiting", null) @@ -62,25 +85,16 @@ public void testBatchQueryGetResult() { createAsyncQuery("REFRESH SKIPPING INDEX ON test") .withInteraction( interaction -> { - JSONObject result = interaction.pluginSearchQueryResult(); interaction.emrJobWriteResultDoc(createEmptyResultDoc(interaction.queryId)); interaction.emrJobUpdateJobState(JobRunState.SUCCESS); - return result; + return interaction.pluginSearchQueryResult(); }) - .assertQueryResults("running", null) - .withoutInteraction() .assertQueryResults("SUCCESS", ImmutableList.of()); } @Test - public void testStreamingQueryGetResult() { - // Create mock index with index state refreshing - mockIndex.createIndex(); - new MockFlintSparkJob(mockIndex.latestId).refreshing(); - - createAsyncQuery( - "CREATE SKIPPING INDEX ON mys3.default.http_logs " - + "(l_orderkey VALUE_SET) WITH (auto_refresh = true)") + public void testBatchQueryGetResultWithSearchResultBeforeEmrJobUpdate() { + createAsyncQuery("REFRESH SKIPPING INDEX ON test") .withInteraction( interaction -> { JSONObject result = interaction.pluginSearchQueryResult(); @@ -93,6 +107,53 @@ public void testStreamingQueryGetResult() { .assertQueryResults("SUCCESS", ImmutableList.of()); } + @Test + public void testStreamingQueryGetResult() { + // Create mock index with index state refreshing + mockIndex.createIndex(); + mockIndexState.refreshing(); + try { + createAsyncQuery( + "CREATE SKIPPING INDEX ON mys3.default.http_logs " + + "(l_orderkey VALUE_SET) WITH (auto_refresh = true)") + .withInteraction( + interaction -> { + interaction.emrJobWriteResultDoc(createEmptyResultDoc(interaction.queryId)); + interaction.emrJobUpdateJobState(JobRunState.SUCCESS); + return interaction.pluginSearchQueryResult(); + }) + .assertQueryResults("SUCCESS", ImmutableList.of()); + } finally { + mockIndex.deleteIndex(); + mockIndexState.deleted(); + } + } + + @Test + public void testStreamingQueryGetResultWithSearchResultBeforeEmrJobUpdate() { + // Create mock index with index state refreshing + mockIndex.createIndex(); + mockIndexState.refreshing(); + try { + createAsyncQuery( + "CREATE SKIPPING INDEX ON mys3.default.http_logs " + + "(l_orderkey VALUE_SET) WITH (auto_refresh = true)") + .withInteraction( + interaction -> { + JSONObject result = interaction.pluginSearchQueryResult(); + interaction.emrJobWriteResultDoc(createEmptyResultDoc(interaction.queryId)); + interaction.emrJobUpdateJobState(JobRunState.SUCCESS); + return result; + }) + .assertQueryResults("running", null) + .withoutInteraction() + .assertQueryResults("SUCCESS", ImmutableList.of()); + } finally { + mockIndex.deleteIndex(); + mockIndexState.deleted(); + } + } + @Ignore public void testDropIndexQueryGetResult() { // Create mock index with index state refreshing From 55c370b3aa550f3eccacdc3aa741e9550921f5b5 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Thu, 30 Nov 2023 15:56:17 -0800 Subject: [PATCH 08/12] Add IT for drop index Signed-off-by: Chen Dai --- .../AsyncQueryGetResultSpecTest.java | 26 +++++++++---------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryGetResultSpecTest.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryGetResultSpecTest.java index 05f851585c..3be29d041b 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryGetResultSpecTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryGetResultSpecTest.java @@ -18,7 +18,6 @@ import org.json.JSONObject; import org.junit.Assert; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; import org.opensearch.action.index.IndexRequest; import org.opensearch.sql.data.model.ExprValue; @@ -154,23 +153,22 @@ public void testStreamingQueryGetResultWithSearchResultBeforeEmrJobUpdate() { } } - @Ignore + @Test public void testDropIndexQueryGetResult() { // Create mock index with index state refreshing mockIndex.createIndex(); - new MockFlintSparkJob(mockIndex.latestId).refreshing(); + mockIndexState.refreshing(); - createAsyncQuery(mockIndex.query) - .withInteraction( - interaction -> { - JSONObject result = interaction.pluginSearchQueryResult(); - interaction.emrJobWriteResultDoc(createEmptyResultDoc(interaction.queryId)); - interaction.emrJobUpdateJobState(JobRunState.SUCCESS); - return result; - }) - .assertQueryResults("running", null) - .withoutInteraction() - .assertQueryResults("SUCCESS", List.of(tupleValue(Map.of("1", 1)))); + LocalEMRSClient emrClient = new LocalEMRSClient(); + emrClient.setJobState("Cancelled"); + AsyncQueryExecutorService queryService = createAsyncQueryExecutorService(emrClient); + CreateAsyncQueryResponse response = + queryService.createAsyncQuery( + new CreateAsyncQueryRequest(mockIndex.query, DATASOURCE, LangType.SQL, null)); + + AsyncQueryExecutionResponse results = queryService.getAsyncQueryResults(response.getQueryId()); + assertEquals("SUCCESS", results.getStatus()); + assertNull(results.getError()); } private AssertionHelper createAsyncQuery(String query) { From 54aef87bbc6f09ff1a686705885d2e7674a1ca10 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Thu, 30 Nov 2023 17:21:16 -0800 Subject: [PATCH 09/12] Consider drop statement running if result doc unavailable Signed-off-by: Chen Dai --- .../sql/spark/dispatcher/IndexDMLHandler.java | 9 +++- .../AsyncQueryGetResultSpecTest.java | 41 ++++++++++++------- .../spark/asyncquery/IndexQuerySpecTest.java | 4 -- 3 files changed, 35 insertions(+), 19 deletions(-) diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandler.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandler.java index 3ab5439ad5..a03cd64986 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandler.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandler.java @@ -5,6 +5,8 @@ package org.opensearch.sql.spark.dispatcher; +import static org.opensearch.sql.spark.data.constants.SparkConstants.ERROR_FIELD; +import static org.opensearch.sql.spark.data.constants.SparkConstants.STATUS_FIELD; import static org.opensearch.sql.spark.execution.statestore.StateStore.createIndexDMLResult; import com.amazonaws.services.emrserverless.model.JobRunState; @@ -24,6 +26,7 @@ import org.opensearch.sql.spark.dispatcher.model.DispatchQueryResponse; import org.opensearch.sql.spark.dispatcher.model.IndexDMLResult; import org.opensearch.sql.spark.dispatcher.model.IndexQueryDetails; +import org.opensearch.sql.spark.execution.statement.StatementState; import org.opensearch.sql.spark.execution.statestore.StateStore; import org.opensearch.sql.spark.flint.FlintIndexMetadata; import org.opensearch.sql.spark.flint.FlintIndexMetadataReader; @@ -106,7 +109,11 @@ protected JSONObject getResponseFromResultIndex(AsyncQueryJobMetadata asyncQuery @Override protected JSONObject getResponseFromExecutor(AsyncQueryJobMetadata asyncQueryJobMetadata) { - throw new IllegalStateException("[BUG] can't fetch result of index DML query form server"); + // Consider statement still running if result doc created in submit() is not available yet + JSONObject result = new JSONObject(); + result.put(STATUS_FIELD, StatementState.RUNNING.getState()); + result.put(ERROR_FIELD, ""); + return result; } @Override diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryGetResultSpecTest.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryGetResultSpecTest.java index 3be29d041b..683e443909 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryGetResultSpecTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryGetResultSpecTest.java @@ -63,7 +63,7 @@ public void testInteractiveQueryGetResult() { } @Test - public void testInteractiveQueryGetResultWithSearchResultBeforeEmrJobUpdate() { + public void testInteractiveQueryGetResultWithConcurrentEmrJobUpdate() { createAsyncQuery("SELECT 1") .withoutInteraction() .assertQueryResults("waiting", null) @@ -92,7 +92,7 @@ public void testBatchQueryGetResult() { } @Test - public void testBatchQueryGetResultWithSearchResultBeforeEmrJobUpdate() { + public void testBatchQueryGetResultWithConcurrentEmrJobUpdate() { createAsyncQuery("REFRESH SKIPPING INDEX ON test") .withInteraction( interaction -> { @@ -129,7 +129,7 @@ public void testStreamingQueryGetResult() { } @Test - public void testStreamingQueryGetResultWithSearchResultBeforeEmrJobUpdate() { + public void testStreamingQueryGetResultWithConcurrentEmrJobUpdate() { // Create mock index with index state refreshing mockIndex.createIndex(); mockIndexState.refreshing(); @@ -161,18 +161,32 @@ public void testDropIndexQueryGetResult() { LocalEMRSClient emrClient = new LocalEMRSClient(); emrClient.setJobState("Cancelled"); - AsyncQueryExecutorService queryService = createAsyncQueryExecutorService(emrClient); - CreateAsyncQueryResponse response = - queryService.createAsyncQuery( - new CreateAsyncQueryRequest(mockIndex.query, DATASOURCE, LangType.SQL, null)); - - AsyncQueryExecutionResponse results = queryService.getAsyncQueryResults(response.getQueryId()); - assertEquals("SUCCESS", results.getStatus()); - assertNull(results.getError()); + createAsyncQuery(mockIndex.query, emrClient) + .withoutInteraction() + .assertQueryResults("SUCCESS", ImmutableList.of()); + } + + @Test + public void testDropIndexQueryGetResultWithResultDocRefreshDelay() { + // Create mock index with index state refreshing + mockIndex.createIndex(); + mockIndexState.refreshing(); + + LocalEMRSClient emrClient = new LocalEMRSClient(); + emrClient.setJobState("Cancelled"); + createAsyncQuery(mockIndex.query, emrClient) + .withInteraction(interaction -> new JSONObject()) // simulate result index refresh delay + .assertQueryResults("running", null) + .withoutInteraction() + .assertQueryResults("SUCCESS", ImmutableList.of()); } private AssertionHelper createAsyncQuery(String query) { - return new AssertionHelper(query); + return new AssertionHelper(query, new LocalEMRSClient()); + } + + private AssertionHelper createAsyncQuery(String query, LocalEMRSClient emrClient) { + return new AssertionHelper(query, emrClient); } private class AssertionHelper { @@ -180,8 +194,7 @@ private class AssertionHelper { private final CreateAsyncQueryResponse createQueryResponse; private Interaction interaction; - AssertionHelper(String query) { - LocalEMRSClient emrClient = new LocalEMRSClient(); + AssertionHelper(String query, LocalEMRSClient emrClient) { this.queryService = createAsyncQueryExecutorService( emrClient, diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecTest.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecTest.java index d6085277ff..49ac538e65 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecTest.java @@ -9,14 +9,10 @@ import com.amazonaws.services.emrserverless.model.GetJobRunResult; import com.amazonaws.services.emrserverless.model.JobRun; import com.google.common.collect.ImmutableList; -import java.util.Optional; import org.junit.Assert; import org.junit.Test; -import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.sql.spark.asyncquery.model.AsyncQueryExecutionResponse; -import org.opensearch.sql.spark.execution.statestore.StateStore; import org.opensearch.sql.spark.flint.FlintIndexState; -import org.opensearch.sql.spark.flint.FlintIndexStateModel; import org.opensearch.sql.spark.flint.FlintIndexType; import org.opensearch.sql.spark.leasemanager.ConcurrencyLimitExceededException; import org.opensearch.sql.spark.rest.model.CreateAsyncQueryRequest; From 3582e4f88fd06adc585e93fad59567cbfb80bbea Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Fri, 1 Dec 2023 08:16:08 -0800 Subject: [PATCH 10/12] Fix broken UT Signed-off-by: Chen Dai --- .../sql/spark/dispatcher/IndexDMLHandlerTest.java | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandlerTest.java b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandlerTest.java index 8419d50ae1..01c46c3c0b 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandlerTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandlerTest.java @@ -6,16 +6,19 @@ package org.opensearch.sql.spark.dispatcher; import static org.junit.jupiter.api.Assertions.*; +import static org.opensearch.sql.spark.data.constants.SparkConstants.ERROR_FIELD; +import static org.opensearch.sql.spark.data.constants.SparkConstants.STATUS_FIELD; +import org.json.JSONObject; import org.junit.jupiter.api.Test; class IndexDMLHandlerTest { @Test public void getResponseFromExecutor() { - assertThrows( - IllegalStateException.class, - () -> - new IndexDMLHandler(null, null, null, null, null, null, null) - .getResponseFromExecutor(null)); + JSONObject result = + new IndexDMLHandler(null, null, null, null, null, null, null).getResponseFromExecutor(null); + + assertEquals("running", result.getString(STATUS_FIELD)); + assertEquals("", result.getString(ERROR_FIELD)); } } From 0dc8b4d94ecaef6c5bb05259c85196359584440c Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Fri, 1 Dec 2023 11:14:03 -0800 Subject: [PATCH 11/12] Address PR comments Signed-off-by: Chen Dai --- .../AsyncQueryGetResultSpecTest.java | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryGetResultSpecTest.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryGetResultSpecTest.java index 683e443909..caafa96e8b 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryGetResultSpecTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryGetResultSpecTest.java @@ -51,7 +51,7 @@ public void doSetUp() { @Test public void testInteractiveQueryGetResult() { createAsyncQuery("SELECT 1") - .withoutInteraction() + .withInteraction(InteractionStep::pluginSearchQueryResult) .assertQueryResults("waiting", null) .withInteraction( interaction -> { @@ -65,7 +65,7 @@ public void testInteractiveQueryGetResult() { @Test public void testInteractiveQueryGetResultWithConcurrentEmrJobUpdate() { createAsyncQuery("SELECT 1") - .withoutInteraction() + .withInteraction(InteractionStep::pluginSearchQueryResult) .assertQueryResults("waiting", null) .withInteraction( interaction -> { @@ -75,7 +75,7 @@ public void testInteractiveQueryGetResultWithConcurrentEmrJobUpdate() { return result; }) .assertQueryResults("running", null) - .withoutInteraction() + .withInteraction(InteractionStep::pluginSearchQueryResult) .assertQueryResults("SUCCESS", ImmutableList.of(tupleValue(Map.of("1", 1)))); } @@ -102,7 +102,7 @@ public void testBatchQueryGetResultWithConcurrentEmrJobUpdate() { return result; }) .assertQueryResults("running", null) - .withoutInteraction() + .withInteraction(InteractionStep::pluginSearchQueryResult) .assertQueryResults("SUCCESS", ImmutableList.of()); } @@ -145,7 +145,7 @@ public void testStreamingQueryGetResultWithConcurrentEmrJobUpdate() { return result; }) .assertQueryResults("running", null) - .withoutInteraction() + .withInteraction(InteractionStep::pluginSearchQueryResult) .assertQueryResults("SUCCESS", ImmutableList.of()); } finally { mockIndex.deleteIndex(); @@ -162,7 +162,7 @@ public void testDropIndexQueryGetResult() { LocalEMRSClient emrClient = new LocalEMRSClient(); emrClient.setJobState("Cancelled"); createAsyncQuery(mockIndex.query, emrClient) - .withoutInteraction() + .withInteraction(InteractionStep::pluginSearchQueryResult) .assertQueryResults("SUCCESS", ImmutableList.of()); } @@ -177,7 +177,7 @@ public void testDropIndexQueryGetResultWithResultDocRefreshDelay() { createAsyncQuery(mockIndex.query, emrClient) .withInteraction(interaction -> new JSONObject()) // simulate result index refresh delay .assertQueryResults("running", null) - .withoutInteraction() + .withInteraction(InteractionStep::pluginSearchQueryResult) .assertQueryResults("SUCCESS", ImmutableList.of()); } @@ -219,11 +219,6 @@ public JSONObject getResultWithQueryId(String queryId, String resultIndex) { new CreateAsyncQueryRequest(query, DATASOURCE, LangType.SQL, null)); } - AssertionHelper withoutInteraction() { - // No interaction with EMR-S job. Plugin searches query result only. - return withInteraction(InteractionStep::pluginSearchQueryResult); - } - AssertionHelper withInteraction(Interaction interaction) { this.interaction = interaction; return this; From 52499afeb7de50d3f85e4a589fdab4d823f49381 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Fri, 1 Dec 2023 11:26:37 -0800 Subject: [PATCH 12/12] Address PR comments Signed-off-by: Chen Dai --- .../spark/asyncquery/AsyncQueryExecutorServiceSpec.java | 7 ++++--- .../sql/spark/asyncquery/AsyncQueryGetResultSpecTest.java | 6 +++--- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java index ef96a0d6da..c7054dd200 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java @@ -15,6 +15,7 @@ import com.amazonaws.services.emrserverless.model.CancelJobRunResult; import com.amazonaws.services.emrserverless.model.GetJobRunResult; import com.amazonaws.services.emrserverless.model.JobRun; +import com.amazonaws.services.emrserverless.model.JobRunState; import com.google.common.base.Charsets; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -228,7 +229,7 @@ public static class LocalEMRSClient implements EMRServerlessClient { private int startJobRunCalled = 0; private int cancelJobRunCalled = 0; private int getJobResult = 0; - private String jobState = "RUNNING"; + private JobRunState jobState = JobRunState.RUNNING; @Getter private StartJobRequest jobRequest; @@ -243,7 +244,7 @@ public String startJobRun(StartJobRequest startJobRequest) { public GetJobRunResult getJobRunResult(String applicationId, String jobId) { getJobResult++; JobRun jobRun = new JobRun(); - jobRun.setState(jobState); + jobRun.setState(jobState.toString()); return new GetJobRunResult().withJobRun(jobRun); } @@ -265,7 +266,7 @@ public void getJobRunResultCalled(int expectedTimes) { assertEquals(expectedTimes, getJobResult); } - public void setJobState(String jobState) { + public void setJobState(JobRunState jobState) { this.jobState = jobState; } } diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryGetResultSpecTest.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryGetResultSpecTest.java index caafa96e8b..bba38693cd 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryGetResultSpecTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryGetResultSpecTest.java @@ -160,7 +160,7 @@ public void testDropIndexQueryGetResult() { mockIndexState.refreshing(); LocalEMRSClient emrClient = new LocalEMRSClient(); - emrClient.setJobState("Cancelled"); + emrClient.setJobState(JobRunState.CANCELLED); createAsyncQuery(mockIndex.query, emrClient) .withInteraction(InteractionStep::pluginSearchQueryResult) .assertQueryResults("SUCCESS", ImmutableList.of()); @@ -173,7 +173,7 @@ public void testDropIndexQueryGetResultWithResultDocRefreshDelay() { mockIndexState.refreshing(); LocalEMRSClient emrClient = new LocalEMRSClient(); - emrClient.setJobState("Cancelled"); + emrClient.setJobState(JobRunState.CANCELLED); createAsyncQuery(mockIndex.query, emrClient) .withInteraction(interaction -> new JSONObject()) // simulate result index refresh delay .assertQueryResults("running", null) @@ -277,7 +277,7 @@ void emrJobUpdateStatementState(StatementState newState) { } void emrJobUpdateJobState(JobRunState jobState) { - emrClient.setJobState(jobState.toString()); + emrClient.setJobState(jobState); } }