Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
Signed-off-by: Kaituo Li <[email protected]>
  • Loading branch information
kaituo committed Dec 5, 2023
1 parent 06fa45d commit 4420469
Show file tree
Hide file tree
Showing 18 changed files with 189 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ public enum Key {
SESSION_INDEX_TTL("plugins.query.executionengine.spark.session.index.ttl"),
RESULT_INDEX_TTL("plugins.query.executionengine.spark.result.index.ttl"),
AUTO_INDEX_MANAGEMENT_ENABLED(
"plugins.query.executionengine.spark.auto_index_management.enabled");
"plugins.query.executionengine.spark.auto_index_management.enabled"),
SESSION_INACTIVITY_TIMEOUT_MILLIS(
"plugins.query.executionengine.spark.session_inactivity_timeout_millis");

@Getter private final String keyValue;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,13 @@ public class OpenSearchSettings extends Settings {
Setting.Property.NodeScope,
Setting.Property.Dynamic);

public static final Setting<Long> SESSION_INACTIVITY_TIMEOUT_MILLIS_SETTING =
Setting.longSetting(
Key.SESSION_INACTIVITY_TIMEOUT_MILLIS.getKeyValue(),
180000L,
Setting.Property.NodeScope,
Setting.Property.Dynamic);

/** Construct OpenSearchSetting. The OpenSearchSetting must be singleton. */
@SuppressWarnings("unchecked")
public OpenSearchSettings(ClusterSettings clusterSettings) {
Expand Down Expand Up @@ -287,6 +294,12 @@ public OpenSearchSettings(ClusterSettings clusterSettings) {
new Updater(Key.DATASOURCES_LIMIT));
registerNonDynamicSettings(
settingBuilder, clusterSettings, Key.CLUSTER_NAME, ClusterName.CLUSTER_NAME_SETTING);
register(
settingBuilder,
clusterSettings,
Key.SESSION_INACTIVITY_TIMEOUT_MILLIS,
SESSION_INACTIVITY_TIMEOUT_MILLIS_SETTING,
new Updater((Key.SESSION_INACTIVITY_TIMEOUT_MILLIS)));
defaultSettings = settingBuilder.build();
}

Expand Down Expand Up @@ -356,6 +369,7 @@ public static List<Setting<?>> pluginSettings() {
.add(RESULT_INDEX_TTL_SETTING)
.add(AUTO_INDEX_MANAGEMENT_ENABLED_SETTING)
.add(DATASOURCES_LIMIT_SETTING)
.add(SESSION_INACTIVITY_TIMEOUT_MILLIS_SETTING)
.build();
}

Expand Down
9 changes: 9 additions & 0 deletions spark/src/main/antlr/FlintSparkSqlExtensions.g4
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ statement
: skippingIndexStatement
| coveringIndexStatement
| materializedViewStatement
| indexJobManagementStatement
;

skippingIndexStatement
Expand Down Expand Up @@ -109,6 +110,14 @@ dropMaterializedViewStatement
: DROP MATERIALIZED VIEW mvName=multipartIdentifier
;

indexJobManagementStatement
: recoverIndexJobStatement
;

recoverIndexJobStatement
: RECOVER INDEX JOB identifier
;

/*
* Match all remaining tokens in non-greedy way
* so WITH clause won't be captured by this rule.
Expand Down
2 changes: 2 additions & 0 deletions spark/src/main/antlr/SparkSqlBase.g4
Original file line number Diff line number Diff line change
Expand Up @@ -165,10 +165,12 @@ IF: 'IF';
IN: 'IN';
INDEX: 'INDEX';
INDEXES: 'INDEXES';
JOB: 'JOB';
MATERIALIZED: 'MATERIALIZED';
NOT: 'NOT';
ON: 'ON';
PARTITION: 'PARTITION';
RECOVER: 'RECOVER';
REFRESH: 'REFRESH';
SHOW: 'SHOW';
TRUE: 'TRUE';
Expand Down
1 change: 1 addition & 0 deletions spark/src/main/antlr/SqlBaseLexer.g4
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,7 @@ VALUES: 'VALUES';
VARCHAR: 'VARCHAR';
VAR: 'VAR';
VARIABLE: 'VARIABLE';
VARIANT: 'VARIANT';
VERSION: 'VERSION';
VIEW: 'VIEW';
VIEWS: 'VIEWS';
Expand Down
16 changes: 14 additions & 2 deletions spark/src/main/antlr/SqlBaseParser.g4
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,10 @@ replaceTableHeader
: (CREATE OR)? REPLACE TABLE identifierReference
;

clusterBySpec
: CLUSTER BY LEFT_PAREN multipartIdentifierList RIGHT_PAREN
;

bucketSpec
: CLUSTERED BY identifierList
(SORTED BY orderedIdentifierList)?
Expand Down Expand Up @@ -383,6 +387,7 @@ createTableClauses
:((OPTIONS options=expressionPropertyList) |
(PARTITIONED BY partitioning=partitionFieldList) |
skewSpec |
clusterBySpec |
bucketSpec |
rowFormat |
createFileFormat |
Expand Down Expand Up @@ -582,6 +587,10 @@ notMatchedBySourceAction
| UPDATE SET assignmentList
;

exceptClause
: EXCEPT LEFT_PAREN exceptCols=multipartIdentifierList RIGHT_PAREN
;

assignmentList
: assignment (COMMA assignment)*
;
Expand Down Expand Up @@ -964,8 +973,8 @@ primaryExpression
| LAST LEFT_PAREN expression (IGNORE NULLS)? RIGHT_PAREN #last
| POSITION LEFT_PAREN substr=valueExpression IN str=valueExpression RIGHT_PAREN #position
| constant #constantDefault
| ASTERISK #star
| qualifiedName DOT ASTERISK #star
| ASTERISK exceptClause? #star
| qualifiedName DOT ASTERISK exceptClause? #star
| LEFT_PAREN namedExpression (COMMA namedExpression)+ RIGHT_PAREN #rowConstructor
| LEFT_PAREN query RIGHT_PAREN #subqueryExpression
| functionName LEFT_PAREN (setQuantifier? argument+=functionArgument
Expand Down Expand Up @@ -1081,6 +1090,7 @@ type
| DECIMAL | DEC | NUMERIC
| VOID
| INTERVAL
| VARIANT
| ARRAY | STRUCT | MAP
| unsupportedType=identifier
;
Expand Down Expand Up @@ -1540,6 +1550,7 @@ ansiNonReserved
| VARCHAR
| VAR
| VARIABLE
| VARIANT
| VERSION
| VIEW
| VIEWS
Expand Down Expand Up @@ -1888,6 +1899,7 @@ nonReserved
| VARCHAR
| VAR
| VARIABLE
| VARIANT
| VERSION
| VIEW
| VIEWS
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,13 @@ public DispatchQueryResponse submit(
if (dispatchQueryRequest.getSessionId() != null) {
// get session from request
SessionId sessionId = new SessionId(dispatchQueryRequest.getSessionId());
Optional<Session> createdSession =
sessionManager.getSession(sessionId, dispatchQueryRequest.getDatasource());
Optional<Session> createdSession = sessionManager.getSession(sessionId);
if (createdSession.isPresent()) {
session = createdSession.get();
}
}
if (session == null || !session.isReady()) {
if (session == null
|| !session.isOperationalForDataSource(dispatchQueryRequest.getDatasource())) {
// create session if not exist or session dead/fail
tags.put(JOB_TYPE_TAG_KEY, JobType.INTERACTIVE.getText());
session =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.opensearch.sql.spark.execution.statement.StatementId;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.rest.model.LangType;
import org.opensearch.sql.spark.utils.TimeProvider;

/**
* Interactive session.
Expand All @@ -42,6 +43,9 @@ public class InteractiveSession implements Session {
private final StateStore stateStore;
private final EMRServerlessClient serverlessClient;
private SessionModel sessionModel;
// the threshold of elapsed time in milliseconds before we say a session is stale
private long sessionInactivityTimeoutMilli;
private TimeProvider timeProvider;

@Override
public void open(CreateSessionRequest createSessionRequest) {
Expand Down Expand Up @@ -134,7 +138,14 @@ public Optional<Statement> get(StatementId stID) {
}

@Override
public boolean isReady() {
return sessionModel.getSessionState() != DEAD && sessionModel.getSessionState() != FAIL;
public boolean isOperationalForDataSource(String dataSourceName) {
boolean isSessionStateValid =
sessionModel.getSessionState() != DEAD && sessionModel.getSessionState() != FAIL;
boolean isDataSourceMatch = sessionId.getDataSourceName().equals(dataSourceName);
boolean isSessionUpdatedRecently =
timeProvider.currentEpochMillis() - sessionModel.getLastUpdateTime()
<= sessionInactivityTimeoutMilli;

return isSessionStateValid && isDataSourceMatch && isSessionUpdatedRecently;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,5 @@ public interface Session {
SessionId getSessionId();

/** return true if session is ready to use. */
boolean isReady();
boolean isOperationalForDataSource(String dataSourceName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,31 @@

package org.opensearch.sql.spark.execution.session;

import static org.opensearch.sql.common.setting.Settings.Key.SESSION_INACTIVITY_TIMEOUT_MILLIS;
import static org.opensearch.sql.spark.execution.session.SessionId.newSessionId;

import java.util.Optional;
import lombok.RequiredArgsConstructor;
import org.opensearch.sql.common.setting.Settings;
import org.opensearch.sql.spark.client.EMRServerlessClient;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.utils.RealTimeProvider;

/**
* Singleton Class
*
* <p>todo. add Session cache and Session sweeper.
*/
@RequiredArgsConstructor
public class SessionManager {
private final StateStore stateStore;
private final EMRServerlessClient emrServerlessClient;
private final Settings settings;
private Settings settings;

public SessionManager(
StateStore stateStore, EMRServerlessClient emrServerlessClient, Settings settings) {
this.stateStore = stateStore;
this.emrServerlessClient = emrServerlessClient;
this.settings = settings;
}

public Session createSession(CreateSessionRequest request) {
InteractiveSession session =
Expand Down Expand Up @@ -63,6 +70,9 @@ public Optional<Session> getSession(SessionId sid, String dataSourceName) {
.stateStore(stateStore)
.serverlessClient(emrServerlessClient)
.sessionModel(model.get())
.sessionInactivityTimeoutMilli(
settings.getSettingValue(SESSION_INACTIVITY_TIMEOUT_MILLIS))
.timeProvider(new RealTimeProvider())
.build();
return Optional.ofNullable(session);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.utils;

public class RealTimeProvider implements TimeProvider {
@Override
public long currentEpochMillis() {
return System.currentTimeMillis();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.utils;

public interface TimeProvider {
long currentEpochMillis();
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@
import org.junit.Ignore;
import org.junit.Test;
import org.junit.jupiter.api.Disabled;
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.core.common.Strings;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.sql.common.setting.Settings;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.datasource.model.DataSourceType;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryExecutionResponse;
Expand Down Expand Up @@ -421,6 +423,60 @@ public void submitQueryWithDifferentDataSourceSessionWillCreateNewSession() {
assertNotEquals(second.getSessionId(), third.getSessionId());
}

@Test
public void recreateSessionIfStale() {
LocalEMRSClient emrsClient = new LocalEMRSClient();
AsyncQueryExecutorService asyncQueryExecutorService =
createAsyncQueryExecutorService(emrsClient);

// enable session
enableSession(true);

// 1. create async query.
CreateAsyncQueryResponse first =
asyncQueryExecutorService.createAsyncQuery(
new CreateAsyncQueryRequest("select 1", DATASOURCE, LangType.SQL, null));
assertNotNull(first.getSessionId());

// set sessionState to RUNNING
setSessionState(first.getSessionId(), SessionState.RUNNING);

// 2. reuse session id
CreateAsyncQueryResponse second =
asyncQueryExecutorService.createAsyncQuery(
new CreateAsyncQueryRequest(
"select 1", DATASOURCE, LangType.SQL, first.getSessionId()));

assertEquals(first.getSessionId(), second.getSessionId());

try {
// set timeout setting to 0
ClusterUpdateSettingsRequest request = new ClusterUpdateSettingsRequest();
org.opensearch.common.settings.Settings settings =
org.opensearch.common.settings.Settings.builder()
.put(Settings.Key.SESSION_INACTIVITY_TIMEOUT_MILLIS.getKeyValue(), 0)
.build();
request.transientSettings(settings);
client().admin().cluster().updateSettings(request).actionGet(60000);

// 3. not reuse session id
CreateAsyncQueryResponse third =
asyncQueryExecutorService.createAsyncQuery(
new CreateAsyncQueryRequest(
"select 1", DATASOURCE, LangType.SQL, second.getSessionId()));
assertNotEquals(second.getSessionId(), third.getSessionId());
} finally {
// set timeout setting to 0
ClusterUpdateSettingsRequest request = new ClusterUpdateSettingsRequest();
org.opensearch.common.settings.Settings settings =
org.opensearch.common.settings.Settings.builder()
.putNull(Settings.Key.SESSION_INACTIVITY_TIMEOUT_MILLIS.getKeyValue())
.build();
request.transientSettings(settings);
client().admin().cluster().updateSettings(request).actionGet(60000);
}
}

@Test
public void submitQueryInInvalidSessionWillCreateNewSession() {
LocalEMRSClient emrsClient = new LocalEMRSClient();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -314,11 +314,11 @@ void testDispatchSelectQueryReuseSession() {
doReturn(true).when(sessionManager).isEnabled();
doReturn(Optional.of(session))
.when(sessionManager)
.getSession(eq(new SessionId(MOCK_SESSION_ID)), eq("my_glue"));
.getSession(eq(new SessionId(MOCK_SESSION_ID)));
doReturn(new SessionId(MOCK_SESSION_ID)).when(session).getSessionId();
doReturn(new StatementId(MOCK_STATEMENT_ID)).when(session).submit(any());
when(session.getSessionModel().getJobId()).thenReturn(EMR_JOB_ID);
when(session.isReady()).thenReturn(true);
when(session.isOperationalForDataSource(any())).thenReturn(true);
DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadata();
when(dataSourceService.getRawDataSourceMetadata("my_glue")).thenReturn(dataSourceMetadata);
doNothing().when(dataSourceUserAuthorizationHelper).authorizeDataSource(dataSourceMetadata);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public void closeNotExistSession() {
@Test
public void sessionManagerCreateSession() {
Session session =
new SessionManager(stateStore, emrsClient, sessionSetting(false))
new SessionManager(stateStore, emrsClient, sessionSetting())
.createSession(createSessionRequest());

TestSession testSession = testSession(session, stateStore);
Expand All @@ -127,8 +127,7 @@ public void sessionManagerCreateSession() {

@Test
public void sessionManagerGetSession() {
SessionManager sessionManager =
new SessionManager(stateStore, emrsClient, sessionSetting(false));
SessionManager sessionManager = new SessionManager(stateStore, emrsClient, sessionSetting());
Session session = sessionManager.createSession(createSessionRequest());

Optional<Session> managerSession = sessionManager.getSession(session.getSessionId());
Expand All @@ -138,8 +137,7 @@ public void sessionManagerGetSession() {

@Test
public void sessionManagerGetSessionNotExist() {
SessionManager sessionManager =
new SessionManager(stateStore, emrsClient, sessionSetting(false));
SessionManager sessionManager = new SessionManager(stateStore, emrsClient, sessionSetting());

Optional<Session> managerSession =
sessionManager.getSession(SessionId.newSessionId("no-exist"));
Expand Down
Loading

0 comments on commit 4420469

Please sign in to comment.