Validate session with flint datasource passed in async job request #2448

Merged 3 commits on Dec 6, 2023. Showing changes from 2 commits.
@@ -43,7 +43,9 @@ public enum Key {
SESSION_INDEX_TTL("plugins.query.executionengine.spark.session.index.ttl"),
RESULT_INDEX_TTL("plugins.query.executionengine.spark.result.index.ttl"),
AUTO_INDEX_MANAGEMENT_ENABLED(
"plugins.query.executionengine.spark.auto_index_management.enabled");
"plugins.query.executionengine.spark.auto_index_management.enabled"),
SESSION_INACTIVITY_TIMEOUT_MILLIS(
"plugins.query.executionengine.spark.session_inactivity_timeout_millis");

@Getter private final String keyValue;

@@ -179,6 +179,13 @@ public class OpenSearchSettings extends Settings {
Setting.Property.NodeScope,
Setting.Property.Dynamic);

public static final Setting<Long> SESSION_INACTIVITY_TIMEOUT_MILLIS_SETTING =
Setting.longSetting(
Key.SESSION_INACTIVITY_TIMEOUT_MILLIS.getKeyValue(),
180000L,
Setting.Property.NodeScope,
Setting.Property.Dynamic);

/** Construct OpenSearchSetting. The OpenSearchSetting must be singleton. */
@SuppressWarnings("unchecked")
public OpenSearchSettings(ClusterSettings clusterSettings) {
@@ -287,6 +294,12 @@ public OpenSearchSettings(ClusterSettings clusterSettings) {
new Updater(Key.DATASOURCES_LIMIT));
registerNonDynamicSettings(
settingBuilder, clusterSettings, Key.CLUSTER_NAME, ClusterName.CLUSTER_NAME_SETTING);
register(
settingBuilder,
clusterSettings,
Key.SESSION_INACTIVITY_TIMEOUT_MILLIS,
SESSION_INACTIVITY_TIMEOUT_MILLIS_SETTING,
new Updater(Key.SESSION_INACTIVITY_TIMEOUT_MILLIS));
defaultSettings = settingBuilder.build();
}

@@ -356,6 +369,7 @@ public static List<Setting<?>> pluginSettings() {
.add(RESULT_INDEX_TTL_SETTING)
.add(AUTO_INDEX_MANAGEMENT_ENABLED_SETTING)
.add(DATASOURCES_LIMIT_SETTING)
.add(SESSION_INACTIVITY_TIMEOUT_MILLIS_SETTING)
.build();
}
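Taken together, the changes above register plugins.query.executionengine.spark.session_inactivity_timeout_millis as a dynamic, node-scoped long setting with a default of 180000 ms (3 minutes). A minimal sketch of how consuming code reads it, mirroring the SessionManager change further down in this diff (the local variable name is illustrative):

// Resolve the configured inactivity timeout; falls back to the registered
// default of 180000L when the cluster setting has not been overridden.
Long sessionInactivityTimeoutMillis =
    settings.getSettingValue(Settings.Key.SESSION_INACTIVITY_TIMEOUT_MILLIS);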

9 changes: 9 additions & 0 deletions spark/src/main/antlr/FlintSparkSqlExtensions.g4
@@ -18,6 +18,7 @@ statement
: skippingIndexStatement
| coveringIndexStatement
| materializedViewStatement
| indexJobManagementStatement
;

skippingIndexStatement
@@ -109,6 +110,14 @@ dropMaterializedViewStatement
: DROP MATERIALIZED VIEW mvName=multipartIdentifier
;

indexJobManagementStatement
: recoverIndexJobStatement
;

recoverIndexJobStatement
: RECOVER INDEX JOB identifier
;

/*
* Match all remaining tokens in non-greedy way
* so WITH clause won't be captured by this rule.
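For context, the new indexJobManagementStatement and recoverIndexJobStatement rules (together with the JOB and RECOVER keywords added to SparkSqlBase.g4 below) extend the Flint SQL grammar with an index-job recovery statement. A hedged sketch of query text these rules are intended to match; the job identifier is hypothetical:

// Illustrative only: a statement matching RECOVER INDEX JOB identifier.
String recoverIndexJobQuery = "RECOVER INDEX JOB my_index_job";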
2 changes: 2 additions & 0 deletions spark/src/main/antlr/SparkSqlBase.g4
@@ -165,10 +165,12 @@ IF: 'IF';
IN: 'IN';
INDEX: 'INDEX';
INDEXES: 'INDEXES';
JOB: 'JOB';
MATERIALIZED: 'MATERIALIZED';
NOT: 'NOT';
ON: 'ON';
PARTITION: 'PARTITION';
RECOVER: 'RECOVER';
REFRESH: 'REFRESH';
SHOW: 'SHOW';
TRUE: 'TRUE';
1 change: 1 addition & 0 deletions spark/src/main/antlr/SqlBaseLexer.g4
@@ -408,6 +408,7 @@ VALUES: 'VALUES';
VARCHAR: 'VARCHAR';
VAR: 'VAR';
VARIABLE: 'VARIABLE';
VARIANT: 'VARIANT';
VERSION: 'VERSION';
VIEW: 'VIEW';
VIEWS: 'VIEWS';
16 changes: 14 additions & 2 deletions spark/src/main/antlr/SqlBaseParser.g4
@@ -298,6 +298,10 @@ replaceTableHeader
: (CREATE OR)? REPLACE TABLE identifierReference
;

clusterBySpec
: CLUSTER BY LEFT_PAREN multipartIdentifierList RIGHT_PAREN
;

bucketSpec
: CLUSTERED BY identifierList
(SORTED BY orderedIdentifierList)?
@@ -383,6 +387,7 @@ createTableClauses
:((OPTIONS options=expressionPropertyList) |
(PARTITIONED BY partitioning=partitionFieldList) |
skewSpec |
clusterBySpec |
bucketSpec |
rowFormat |
createFileFormat |
@@ -582,6 +587,10 @@ notMatchedBySourceAction
| UPDATE SET assignmentList
;

exceptClause
: EXCEPT LEFT_PAREN exceptCols=multipartIdentifierList RIGHT_PAREN
;

assignmentList
: assignment (COMMA assignment)*
;
@@ -964,8 +973,8 @@ primaryExpression
| LAST LEFT_PAREN expression (IGNORE NULLS)? RIGHT_PAREN #last
| POSITION LEFT_PAREN substr=valueExpression IN str=valueExpression RIGHT_PAREN #position
| constant #constantDefault
| ASTERISK #star
| qualifiedName DOT ASTERISK #star
| ASTERISK exceptClause? #star
| qualifiedName DOT ASTERISK exceptClause? #star
| LEFT_PAREN namedExpression (COMMA namedExpression)+ RIGHT_PAREN #rowConstructor
| LEFT_PAREN query RIGHT_PAREN #subqueryExpression
| functionName LEFT_PAREN (setQuantifier? argument+=functionArgument
@@ -1081,6 +1090,7 @@ type
| DECIMAL | DEC | NUMERIC
| VOID
| INTERVAL
| VARIANT
| ARRAY | STRUCT | MAP
| unsupportedType=identifier
;
@@ -1540,6 +1550,7 @@ ansiNonReserved
| VARCHAR
| VAR
| VARIABLE
| VARIANT
| VERSION
| VIEW
| VIEWS
@@ -1888,6 +1899,7 @@ nonReserved
| VARCHAR
| VAR
| VARIABLE
| VARIANT
| VERSION
| VIEW
| VIEWS
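The SqlBaseLexer and SqlBaseParser changes above track upstream Spark grammar additions rather than Flint-specific syntax. A hedged sketch of statement shapes the updated rules are meant to accept (table and column names are hypothetical, and parser acceptance does not by itself imply the plugin executes them):

// clusterBySpec inside createTableClauses: CLUSTER BY (<columns>).
String clusterByDdl = "CREATE TABLE events (id INT, ts TIMESTAMP) CLUSTER BY (id)";
// exceptClause on the star rules: SELECT * EXCEPT (<columns>).
String starExcept = "SELECT * EXCEPT (ts) FROM events";
// VARIANT added as a type keyword and to the non-reserved word lists.
String variantCast = "SELECT CAST(payload AS VARIANT) FROM events";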
@@ -86,7 +86,8 @@ public DispatchQueryResponse submit(
session = createdSession.get();
}
}
if (session == null || !session.isReady()) {
if (session == null
|| !session.isOperationalForDataSource(dispatchQueryRequest.getDatasource())) {
// create session if not exist or session dead/fail
tags.put(JOB_TYPE_TAG_KEY, JobType.INTERACTIVE.getText());
session =
@@ -25,6 +25,7 @@
import org.opensearch.sql.spark.execution.statement.StatementId;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.rest.model.LangType;
import org.opensearch.sql.spark.utils.TimeProvider;

/**
* Interactive session.
@@ -42,6 +43,9 @@ public class InteractiveSession implements Session {
private final StateStore stateStore;
private final EMRServerlessClient serverlessClient;
private SessionModel sessionModel;
// Threshold of elapsed time in milliseconds after which a session is considered stale.
private long sessionInactivityTimeoutMilli;
private TimeProvider timeProvider;

@Override
public void open(CreateSessionRequest createSessionRequest) {
@@ -134,7 +138,14 @@ public Optional<Statement> get(StatementId stID) {
}

@Override
public boolean isReady() {
return sessionModel.getSessionState() != DEAD && sessionModel.getSessionState() != FAIL;
public boolean isOperationalForDataSource(String dataSourceName) {
boolean isSessionStateValid =
sessionModel.getSessionState() != DEAD && sessionModel.getSessionState() != FAIL;
boolean isDataSourceMatch = sessionId.getDataSourceName().equals(dataSourceName);
boolean isSessionUpdatedRecently =
timeProvider.currentEpochMillis() - sessionModel.getLastUpdateTime()
<= sessionInactivityTimeoutMilli;

return isSessionStateValid && isDataSourceMatch && isSessionUpdatedRecently;
}
}
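As a worked illustration of the new check, assume the default timeout of 180000 ms and hypothetical timestamps:

// A session last updated 200000 ms ago exceeds the 180000 ms inactivity
// timeout, so isOperationalForDataSource(...) returns false even when the
// session state and data source match, and the dispatcher starts a new session.
long lastUpdateTime = 1_000_000L;   // sessionModel.getLastUpdateTime()
long now = 1_200_000L;              // timeProvider.currentEpochMillis()
long timeoutMillis = 180_000L;      // SESSION_INACTIVITY_TIMEOUT_MILLIS default
boolean updatedRecently = now - lastUpdateTime <= timeoutMillis;  // false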
@@ -39,5 +39,5 @@ public interface Session {
SessionId getSessionId();

/** Return true if the session is operational for the given data source. */
boolean isReady();
boolean isOperationalForDataSource(String dataSourceName);
}
@@ -5,24 +5,31 @@

package org.opensearch.sql.spark.execution.session;

import static org.opensearch.sql.common.setting.Settings.Key.SESSION_INACTIVITY_TIMEOUT_MILLIS;
import static org.opensearch.sql.spark.execution.session.SessionId.newSessionId;

import java.util.Optional;
import lombok.RequiredArgsConstructor;
import org.opensearch.sql.common.setting.Settings;
import org.opensearch.sql.spark.client.EMRServerlessClient;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.utils.RealTimeProvider;

/**
* Singleton Class
*
* <p>todo. add Session cache and Session sweeper.
*/
@RequiredArgsConstructor
public class SessionManager {
private final StateStore stateStore;
private final EMRServerlessClient emrServerlessClient;
private final Settings settings;
private Settings settings;

public SessionManager(
StateStore stateStore, EMRServerlessClient emrServerlessClient, Settings settings) {
this.stateStore = stateStore;
this.emrServerlessClient = emrServerlessClient;
this.settings = settings;
}

public Session createSession(CreateSessionRequest request) {
InteractiveSession session =
@@ -35,22 +42,59 @@ public Session createSession(CreateSessionRequest request) {
return session;
}

public Optional<Session> getSession(SessionId sid) {
/**
* Retrieves the session associated with the given session ID.
*
* <p>This method is particularly used in scenarios where the data source encoded in the session
* ID is deemed untrustworthy. It allows for the safe retrieval of session details based on a
* known and validated session ID, rather than relying on potentially outdated data source
* information.
*
* <p>For more context on the use case and implementation, refer to the documentation here:
* https://tinyurl.com/bdh6s834
*
* @param sid The unique identifier of the session. It is used to fetch the corresponding session
* details.
* @param dataSourceName The name of the data source used to look up the stored session state,
* rather than the data source encoded in the session ID.
* @return An Optional containing the session associated with the provided session ID. Returns an
* empty Optional if no matching session is found.
*/
public Optional<Session> getSession(SessionId sid, String dataSourceName) {
Optional<SessionModel> model =
StateStore.getSession(stateStore, sid.getDataSourceName()).apply(sid.getSessionId());
StateStore.getSession(stateStore, dataSourceName).apply(sid.getSessionId());
if (model.isPresent()) {
InteractiveSession session =
InteractiveSession.builder()
.sessionId(sid)
.stateStore(stateStore)
.serverlessClient(emrServerlessClient)
.sessionModel(model.get())
.sessionInactivityTimeoutMilli(
settings.getSettingValue(SESSION_INACTIVITY_TIMEOUT_MILLIS))
.timeProvider(new RealTimeProvider())
.build();
return Optional.ofNullable(session);
}
return Optional.empty();
}

/**
* Retrieves the session associated with the provided session ID.
*
* <p>This method is utilized specifically in scenarios where the data source information encoded
* in the session ID is considered trustworthy. It ensures the retrieval of session details based
* on the session ID, relying on the integrity of the data source information contained within it.
*
* @param sid The session ID used to identify and retrieve the corresponding session. It is
* expected to contain valid and trusted data source information.
* @return An Optional containing the session associated with the provided session ID. If no
* session is found that matches the session ID, an empty Optional is returned.
*/
public Optional<Session> getSession(SessionId sid) {
return getSession(sid, sid.getDataSourceName());
}

Review discussion on getSession(SessionId sid):
Collaborator: it is only for testing?
Collaborator: if yes, add an annotation.
Contributor (author): It is used by other src code. Not only for testing.

// todo, keep it only for testing, will remove it later.
public boolean isEnabled() {
return true;
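A brief usage sketch of the two overloads above (variable names are illustrative): callers that receive the data source from an incoming request, such as the dispatcher change earlier in this diff, pass it explicitly so it is validated against the stored session, while callers that trust the session ID use the one-argument form, which delegates to the two-argument one.

// Data source supplied by the async query request: validate it explicitly.
Optional<Session> requested =
    sessionManager.getSession(sessionId, dispatchQueryRequest.getDatasource());

// Data source taken from the (trusted) session ID itself.
Optional<Session> trusted = sessionManager.getSession(sessionId);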
@@ -0,0 +1,13 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.utils;

public class RealTimeProvider implements TimeProvider {
@Override
public long currentEpochMillis() {
return System.currentTimeMillis();
}
}
@@ -0,0 +1,10 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.utils;

public interface TimeProvider {
long currentEpochMillis();
}
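RealTimeProvider backs production code, while the TimeProvider seam lets the inactivity check be exercised deterministically. A hedged sketch of a fixed-clock implementation a test could pass to InteractiveSession.builder().timeProvider(...); the class name is hypothetical:

package org.opensearch.sql.spark.utils;

// Hypothetical test helper: always reports the same instant, so a test can
// position "now" just inside or outside the inactivity window.
public class FixedTimeProvider implements TimeProvider {
  private final long fixedEpochMillis;

  public FixedTimeProvider(long fixedEpochMillis) {
    this.fixedEpochMillis = fixedEpochMillis;
  }

  @Override
  public long currentEpochMillis() {
    return fixedEpochMillis;
  }
}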