From 0699d58ca21758a2c69991b72bf5bbf915a44ebc Mon Sep 17 00:00:00 2001 From: Peng Huo Date: Wed, 25 Oct 2023 13:39:46 -0700 Subject: [PATCH 1/4] Create new session if session is invalid Signed-off-by: Peng Huo --- .../dispatcher/SparkQueryDispatcher.java | 5 ++--- ...AsyncQueryExecutorServiceImplSpecTest.java | 19 ++++++++----------- 2 files changed, 10 insertions(+), 14 deletions(-) diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java index 8feeddcafc..5e80259e09 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java @@ -219,10 +219,9 @@ private DispatchQueryResponse handleNonIndexQuery(DispatchQueryRequest dispatchQ // get session from request SessionId sessionId = new SessionId(dispatchQueryRequest.getSessionId()); Optional createdSession = sessionManager.getSession(sessionId); - if (createdSession.isEmpty()) { - throw new IllegalArgumentException("no session found. " + sessionId); + if (createdSession.isPresent()) { + session = createdSession.get(); } - session = createdSession.get(); } if (session == null || !session.isReady()) { // create session if not exist or session dead/fail diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java index 6bc40c009b..452528b319 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java @@ -460,7 +460,7 @@ public void recreateSessionIfNotReady() { } @Test - public void submitQueryInInvalidSessionThrowException() { + public void submitQueryInInvalidSessionWillCreateNewSession() { LocalEMRSClient emrsClient = new LocalEMRSClient(); AsyncQueryExecutorService asyncQueryExecutorService = createAsyncQueryExecutorService(emrsClient); @@ -468,16 +468,13 @@ public void submitQueryInInvalidSessionThrowException() { // enable session enableSession(true); - // 1. create async query. - SessionId sessionId = SessionId.newSessionId(DATASOURCE); - IllegalArgumentException exception = - assertThrows( - IllegalArgumentException.class, - () -> - asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest( - "select 1", DATASOURCE, LangType.SQL, sessionId.getSessionId()))); - assertEquals("no session found. " + sessionId, exception.getMessage()); + // 1. create async query with invalid sessionId + SessionId invalidSessionId = SessionId.newSessionId(DATASOURCE); + CreateAsyncQueryResponse asyncQuery = asyncQueryExecutorService.createAsyncQuery( + new CreateAsyncQueryRequest( + "select 1", DATASOURCE, LangType.SQL, invalidSessionId.getSessionId())); + assertNotNull(asyncQuery.getSessionId()); + assertNotEquals(invalidSessionId.getSessionId(), asyncQuery.getSessionId()); } private DataSourceServiceImpl createDataSourceService() { From d2579a040266b230f13f97c97a5855f529cadedc Mon Sep 17 00:00:00 2001 From: Peng Huo Date: Wed, 25 Oct 2023 13:41:01 -0700 Subject: [PATCH 2/4] fix code style Signed-off-by: Peng Huo --- .../asyncquery/AsyncQueryExecutorServiceImplSpecTest.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java index 452528b319..54bd253661 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java @@ -470,9 +470,10 @@ public void submitQueryInInvalidSessionWillCreateNewSession() { // 1. create async query with invalid sessionId SessionId invalidSessionId = SessionId.newSessionId(DATASOURCE); - CreateAsyncQueryResponse asyncQuery = asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest( - "select 1", DATASOURCE, LangType.SQL, invalidSessionId.getSessionId())); + CreateAsyncQueryResponse asyncQuery = + asyncQueryExecutorService.createAsyncQuery( + new CreateAsyncQueryRequest( + "select 1", DATASOURCE, LangType.SQL, invalidSessionId.getSessionId())); assertNotNull(asyncQuery.getSessionId()); assertNotEquals(invalidSessionId.getSessionId(), asyncQuery.getSessionId()); } From 50980fa593cd6effb898d88f958c6195f57fbbcb Mon Sep 17 00:00:00 2001 From: Peng Huo Date: Wed, 25 Oct 2023 13:54:07 -0700 Subject: [PATCH 3/4] fix UT Signed-off-by: Peng Huo --- .../dispatcher/SparkQueryDispatcherTest.java | 20 ------------------- 1 file changed, 20 deletions(-) diff --git a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java index 743274d46c..95b6033d12 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java @@ -327,26 +327,6 @@ void testDispatchSelectQueryReuseSession() { Assertions.assertEquals(MOCK_SESSION_ID, dispatchQueryResponse.getSessionId()); } - @Test - void testDispatchSelectQueryInvalidSession() { - String query = "select * from my_glue.default.http_logs"; - DispatchQueryRequest queryRequest = dispatchQueryRequestWithSessionId(query, "invalid"); - - doReturn(true).when(sessionManager).isEnabled(); - doReturn(Optional.empty()).when(sessionManager).getSession(any()); - DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadata(); - when(dataSourceService.getRawDataSourceMetadata("my_glue")).thenReturn(dataSourceMetadata); - doNothing().when(dataSourceUserAuthorizationHelper).authorizeDataSource(dataSourceMetadata); - IllegalArgumentException exception = - Assertions.assertThrows( - IllegalArgumentException.class, () -> sparkQueryDispatcher.dispatch(queryRequest)); - - verifyNoInteractions(emrServerlessClient); - verify(sessionManager, never()).createSession(any()); - Assertions.assertEquals( - "no session found. " + new SessionId("invalid"), exception.getMessage()); - } - @Test void testDispatchSelectQueryFailedCreateSession() { String query = "select * from my_glue.default.http_logs"; From f30079a9aa93906895b5389096f4118da34998e7 Mon Sep 17 00:00:00 2001 From: Peng Huo Date: Wed, 25 Oct 2023 16:54:39 -0700 Subject: [PATCH 4/4] fix error response Signed-off-by: Peng Huo --- .../sql/spark/execution/statement/StatementModel.java | 2 +- .../spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/statement/StatementModel.java b/spark/src/main/java/org/opensearch/sql/spark/execution/statement/StatementModel.java index 2a1043bf73..adc147c905 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/execution/statement/StatementModel.java +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/statement/StatementModel.java @@ -36,7 +36,7 @@ public class StatementModel extends StateModel { public static final String QUERY_ID = "queryId"; public static final String SUBMIT_TIME = "submitTime"; public static final String ERROR = "error"; - public static final String UNKNOWN = "unknown"; + public static final String UNKNOWN = ""; public static final String STATEMENT_DOC_TYPE = "statement"; private final String version; diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java index 54bd253661..cf638effc6 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java @@ -45,6 +45,7 @@ import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; +import org.opensearch.core.common.Strings; import org.opensearch.index.query.QueryBuilder; import org.opensearch.index.query.QueryBuilders; import org.opensearch.plugins.Plugin; @@ -227,6 +228,7 @@ public void withSessionCreateAsyncQueryThenGetResultThenCancel() { // 2. fetch async query result. AsyncQueryExecutionResponse asyncQueryResults = asyncQueryExecutorService.getAsyncQueryResults(response.getQueryId()); + assertTrue(Strings.isEmpty(asyncQueryResults.getError())); assertEquals(StatementState.WAITING.getState(), asyncQueryResults.getStatus()); // 3. cancel async query.