Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Add Statement #2318

Merged
merged 1 commit into from
Oct 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions spark/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,9 @@ jacocoTestCoverageVerification {
'org.opensearch.sql.spark.dispatcher.model.*',
'org.opensearch.sql.spark.flint.FlintIndexType',
// ignore because XContext IOException
'org.opensearch.sql.spark.execution.statestore.SessionStateStore',
'org.opensearch.sql.spark.execution.session.SessionModel'
'org.opensearch.sql.spark.execution.statestore.StateStore',
'org.opensearch.sql.spark.execution.session.SessionModel',
'org.opensearch.sql.spark.execution.statement.StatementModel'
]
limit {
counter = 'LINE'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@
package org.opensearch.sql.spark.execution.session;

import static org.opensearch.sql.spark.execution.session.SessionModel.initInteractiveSession;
import static org.opensearch.sql.spark.execution.session.SessionState.END_STATE;
import static org.opensearch.sql.spark.execution.statement.StatementId.newStatementId;
import static org.opensearch.sql.spark.execution.statestore.StateStore.createSession;
import static org.opensearch.sql.spark.execution.statestore.StateStore.getSession;

import java.util.Optional;
import lombok.Builder;
Expand All @@ -14,7 +18,11 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.index.engine.VersionConflictEngineException;
import org.opensearch.sql.spark.client.EMRServerlessClient;
import org.opensearch.sql.spark.execution.statestore.SessionStateStore;
import org.opensearch.sql.spark.execution.statement.QueryRequest;
import org.opensearch.sql.spark.execution.statement.Statement;
import org.opensearch.sql.spark.execution.statement.StatementId;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.rest.model.LangType;

/**
* Interactive session.
Expand All @@ -27,9 +35,8 @@ public class InteractiveSession implements Session {
private static final Logger LOG = LogManager.getLogger();

private final SessionId sessionId;
private final SessionStateStore sessionStateStore;
private final StateStore stateStore;
private final EMRServerlessClient serverlessClient;

private SessionModel sessionModel;

@Override
Expand All @@ -41,21 +48,75 @@ public void open(CreateSessionRequest createSessionRequest) {
sessionModel =
initInteractiveSession(
applicationId, jobID, sessionId, createSessionRequest.getDatasourceName());
sessionStateStore.create(sessionModel);
createSession(stateStore).apply(sessionModel);
} catch (VersionConflictEngineException e) {
String errorMsg = "session already exist. " + sessionId;
LOG.error(errorMsg);
throw new IllegalStateException(errorMsg);
}
}

/** todo. StatementSweeper will delete doc. */
@Override
public void close() {
Optional<SessionModel> model = sessionStateStore.get(sessionModel.getSessionId());
Optional<SessionModel> model = getSession(stateStore).apply(sessionModel.getId());
if (model.isEmpty()) {
throw new IllegalStateException("session not exist. " + sessionModel.getSessionId());
throw new IllegalStateException("session does not exist. " + sessionModel.getSessionId());
} else {
serverlessClient.cancelJobRun(sessionModel.getApplicationId(), sessionModel.getJobId());
}
}

/** Submit statement. If submit successfully, Statement in waiting state. */
public StatementId submit(QueryRequest request) {
Optional<SessionModel> model = getSession(stateStore).apply(sessionModel.getId());
if (model.isEmpty()) {
throw new IllegalStateException("session does not exist. " + sessionModel.getSessionId());
} else {
sessionModel = model.get();
if (!END_STATE.contains(sessionModel.getSessionState())) {
StatementId statementId = newStatementId();
Statement st =
Statement.builder()
.sessionId(sessionId)
.applicationId(sessionModel.getApplicationId())
.jobId(sessionModel.getJobId())
.stateStore(stateStore)
.statementId(statementId)
.langType(LangType.SQL)
.query(request.getQuery())
.queryId(statementId.getId())
.build();
st.open();
return statementId;
} else {
String errMsg =
String.format(
"can't submit statement, session should not be in end state, "
+ "current session state is: %s",
sessionModel.getSessionState().getSessionState());
LOG.debug(errMsg);
throw new IllegalStateException(errMsg);
}
}
}

@Override
public Optional<Statement> get(StatementId stID) {
return StateStore.getStatement(stateStore)
.apply(stID.getId())
.map(
model ->
Statement.builder()
.sessionId(sessionId)
.applicationId(model.getApplicationId())
.jobId(model.getJobId())
.statementId(model.getStatementId())
.langType(model.getLangType())
.query(model.getQuery())
.queryId(model.getQueryId())
.stateStore(stateStore)
.statementModel(model)
.build());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@

package org.opensearch.sql.spark.execution.session;

import java.util.Optional;
import org.opensearch.sql.spark.execution.statement.QueryRequest;
import org.opensearch.sql.spark.execution.statement.Statement;
import org.opensearch.sql.spark.execution.statement.StatementId;

/** Session define the statement execution context. Each session is binding to one Spark Job. */
public interface Session {
/** open session. */
Expand All @@ -13,6 +18,22 @@ public interface Session {
/** close session. */
void close();

/**
* submit {@link QueryRequest}.
*
* @param request {@link QueryRequest}
* @return {@link StatementId}
*/
StatementId submit(QueryRequest request);

/**
* get {@link Statement}.
*
* @param stID {@link StatementId}
* @return {@link Statement}
*/
Optional<Statement> get(StatementId stID);

SessionModel getSessionModel();

SessionId getSessionId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import java.util.Optional;
import lombok.RequiredArgsConstructor;
import org.opensearch.sql.spark.client.EMRServerlessClient;
import org.opensearch.sql.spark.execution.statestore.SessionStateStore;
import org.opensearch.sql.spark.execution.statestore.StateStore;

/**
* Singleton Class
Expand All @@ -19,27 +19,27 @@
*/
@RequiredArgsConstructor
public class SessionManager {
private final SessionStateStore stateStore;
private final StateStore stateStore;
private final EMRServerlessClient emrServerlessClient;

public Session createSession(CreateSessionRequest request) {
InteractiveSession session =
InteractiveSession.builder()
.sessionId(newSessionId())
.sessionStateStore(stateStore)
.stateStore(stateStore)
.serverlessClient(emrServerlessClient)
.build();
session.open(request);
return session;
}

public Optional<Session> getSession(SessionId sid) {
Optional<SessionModel> model = stateStore.get(sid);
Optional<SessionModel> model = StateStore.getSession(stateStore).apply(sid.getSessionId());
if (model.isPresent()) {
InteractiveSession session =
InteractiveSession.builder()
.sessionId(sid)
.sessionStateStore(stateStore)
.stateStore(stateStore)
.serverlessClient(emrServerlessClient)
.sessionModel(model.get())
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,16 @@
import lombok.Builder;
import lombok.Data;
import lombok.SneakyThrows;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.core.xcontent.XContentParserUtils;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.sql.spark.execution.statestore.StateModel;

/** Session data in flint.ql.sessions index. */
@Data
@Builder
public class SessionModel implements ToXContentObject {
public class SessionModel extends StateModel {
public static final String VERSION = "version";
public static final String TYPE = "type";
public static final String SESSION_TYPE = "sessionType";
Expand Down Expand Up @@ -73,6 +73,27 @@ public static SessionModel of(SessionModel copy, long seqNo, long primaryTerm) {
.sessionId(new SessionId(copy.sessionId.getSessionId()))
.sessionState(copy.sessionState)
.datasourceName(copy.datasourceName)
.applicationId(copy.getApplicationId())
.jobId(copy.jobId)
.error(UNKNOWN)
.lastUpdateTime(copy.getLastUpdateTime())
.seqNo(seqNo)
.primaryTerm(primaryTerm)
.build();
}

public static SessionModel copyWithState(
SessionModel copy, SessionState state, long seqNo, long primaryTerm) {
return builder()
.version(copy.version)
.sessionType(copy.sessionType)
.sessionId(new SessionId(copy.sessionId.getSessionId()))
.sessionState(state)
.datasourceName(copy.datasourceName)
.applicationId(copy.getApplicationId())
.jobId(copy.jobId)
.error(UNKNOWN)
.lastUpdateTime(copy.getLastUpdateTime())
.seqNo(seqNo)
.primaryTerm(primaryTerm)
.build();
Expand Down Expand Up @@ -140,4 +161,9 @@ public static SessionModel initInteractiveSession(
.primaryTerm(SequenceNumbers.UNASSIGNED_PRIMARY_TERM)
.build();
}

@Override
public String getId() {
return sessionId.getSessionId();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@

package org.opensearch.sql.spark.execution.session;

import com.google.common.collect.ImmutableList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import lombok.Getter;
Expand All @@ -17,6 +19,8 @@ public enum SessionState {
DEAD("dead"),
FAIL("fail");

public static List<SessionState> END_STATE = ImmutableList.of(DEAD, FAIL);

private final String sessionState;

SessionState(String sessionState) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.execution.statement;

import lombok.Data;
import org.opensearch.sql.spark.rest.model.LangType;

@Data
public class QueryRequest {
private final LangType langType;
private final String query;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.execution.statement;

import static org.opensearch.sql.spark.execution.statement.StatementModel.submitStatement;
import static org.opensearch.sql.spark.execution.statestore.StateStore.createStatement;
import static org.opensearch.sql.spark.execution.statestore.StateStore.getStatement;
import static org.opensearch.sql.spark.execution.statestore.StateStore.updateStatementState;

import lombok.Builder;
import lombok.Getter;
import lombok.Setter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.index.engine.DocumentMissingException;
import org.opensearch.index.engine.VersionConflictEngineException;
import org.opensearch.sql.spark.execution.session.SessionId;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.rest.model.LangType;

/** Statement represent query to execute in session. One statement map to one session. */
@Getter
@Builder
public class Statement {
private static final Logger LOG = LogManager.getLogger();

private final SessionId sessionId;
private final String applicationId;
private final String jobId;
private final StatementId statementId;
private final LangType langType;
private final String query;
private final String queryId;
private final StateStore stateStore;

@Setter private StatementModel statementModel;

/** Open a statement. */
public void open() {
try {
statementModel =
submitStatement(sessionId, applicationId, jobId, statementId, langType, query, queryId);
statementModel = createStatement(stateStore).apply(statementModel);
} catch (VersionConflictEngineException e) {
String errorMsg = "statement already exist. " + statementId;
LOG.error(errorMsg);
throw new IllegalStateException(errorMsg);
}
}

/** Cancel a statement. */
public void cancel() {
if (statementModel.getStatementState().equals(StatementState.RUNNING)) {
String errorMsg =
String.format("can't cancel statement in waiting state. statement: %s.", statementId);
LOG.error(errorMsg);
throw new IllegalStateException(errorMsg);
}
try {
this.statementModel =
updateStatementState(stateStore).apply(this.statementModel, StatementState.CANCELLED);
} catch (DocumentMissingException e) {
String errorMsg =
String.format("cancel statement failed. no statement found. statement: %s.", statementId);
LOG.error(errorMsg);
throw new IllegalStateException(errorMsg);
} catch (VersionConflictEngineException e) {
this.statementModel =
getStatement(stateStore).apply(statementModel.getId()).orElse(this.statementModel);
String errorMsg =
String.format(
"cancel statement failed. current statementState: %s " + "statement: %s.",
this.statementModel.getStatementState(), statementId);
LOG.error(errorMsg);
throw new IllegalStateException(errorMsg);
}
}

public StatementState getStatementState() {
return statementModel.getStatementState();
}
}
Loading
Loading