Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Bug Fix, support cancel query in running state #2352

Merged
merged 1 commit into from
Oct 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,15 @@ public void open() {

/** Cancel a statement. */
public void cancel() {
if (statementModel.getStatementState().equals(StatementState.RUNNING)) {
StatementState statementState = statementModel.getStatementState();

if (statementState.equals(StatementState.SUCCESS)
|| statementState.equals(StatementState.FAILED)
|| statementState.equals(StatementState.CANCELLED)) {
String errorMsg =
String.format("can't cancel statement in waiting state. statement: %s.", statementId);
String.format(
"can't cancel statement in %s state. statement: %s.",
statementState.getState(), statementId);
LOG.error(errorMsg);
throw new IllegalStateException(errorMsg);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import static org.opensearch.sql.spark.execution.session.InteractiveSessionTest.createSessionRequest;
import static org.opensearch.sql.spark.execution.session.SessionManagerTest.sessionSetting;
import static org.opensearch.sql.spark.execution.statement.StatementState.CANCELLED;
import static org.opensearch.sql.spark.execution.statement.StatementState.RUNNING;
import static org.opensearch.sql.spark.execution.statement.StatementState.WAITING;
import static org.opensearch.sql.spark.execution.statement.StatementTest.TestStatement.testStatement;
import static org.opensearch.sql.spark.execution.statestore.StateStore.DATASOURCE_TO_REQUEST_INDEX;
Expand Down Expand Up @@ -168,38 +169,93 @@ public void cancelFailedBecauseOfConflict() {
}

@Test
public void cancelRunningStatementFailed() {
public void cancelSuccessStatementFailed() {
StatementId stId = new StatementId("statementId");
Statement st =
Statement.builder()
.sessionId(new SessionId("sessionId"))
.applicationId("appId")
.jobId("jobId")
.statementId(stId)
.langType(LangType.SQL)
.datasourceName(DS_NAME)
.query("query")
.queryId("statementId")
.stateStore(stateStore)
.build();
st.open();
Statement st = createStatement(stId);

// update to running state
StatementModel model = st.getStatementModel();
st.setStatementModel(
StatementModel.copyWithState(
st.getStatementModel(),
StatementState.SUCCESS,
model.getSeqNo(),
model.getPrimaryTerm()));

// cancel conflict
IllegalStateException exception = assertThrows(IllegalStateException.class, st::cancel);
assertEquals(
String.format("can't cancel statement in success state. statement: %s.", stId),
exception.getMessage());
}

@Test
public void cancelFailedStatementFailed() {
StatementId stId = new StatementId("statementId");
Statement st = createStatement(stId);

// update to running state
StatementModel model = st.getStatementModel();
st.setStatementModel(
StatementModel.copyWithState(
st.getStatementModel(),
StatementState.RUNNING,
StatementState.FAILED,
model.getSeqNo(),
model.getPrimaryTerm()));

// cancel conflict
IllegalStateException exception = assertThrows(IllegalStateException.class, st::cancel);
assertEquals(
String.format("can't cancel statement in waiting state. statement: %s.", stId),
String.format("can't cancel statement in failed state. statement: %s.", stId),
exception.getMessage());
}

@Test
public void cancelCancelledStatementFailed() {
StatementId stId = new StatementId("statementId");
Statement st = createStatement(stId);

// update to running state
StatementModel model = st.getStatementModel();
st.setStatementModel(
StatementModel.copyWithState(
st.getStatementModel(), CANCELLED, model.getSeqNo(), model.getPrimaryTerm()));

// cancel conflict
IllegalStateException exception = assertThrows(IllegalStateException.class, st::cancel);
assertEquals(
String.format("can't cancel statement in cancelled state. statement: %s.", stId),
exception.getMessage());
}

@Test
public void cancelRunningStatementSuccess() {
Statement st =
Statement.builder()
.sessionId(new SessionId("sessionId"))
.applicationId("appId")
.jobId("jobId")
.statementId(new StatementId("statementId"))
.langType(LangType.SQL)
.datasourceName(DS_NAME)
.query("query")
.queryId("statementId")
.stateStore(stateStore)
.build();

// submit statement
TestStatement testStatement = testStatement(st, stateStore);
testStatement
.open()
.assertSessionState(WAITING)
.assertStatementId(new StatementId("statementId"));

testStatement.run();

// close statement
testStatement.cancel().assertSessionState(CANCELLED);
}

@Test
public void submitStatementInRunningSession() {
Session session =
Expand Down Expand Up @@ -355,9 +411,33 @@ public TestStatement cancel() {
st.cancel();
return this;
}

public TestStatement run() {
StatementModel model =
updateStatementState(stateStore, DS_NAME).apply(st.getStatementModel(), RUNNING);
st.setStatementModel(model);
return this;
}
}

private QueryRequest queryRequest() {
return new QueryRequest(AsyncQueryId.newAsyncQueryId(DS_NAME), LangType.SQL, "select 1");
}

private Statement createStatement(StatementId stId) {
Statement st =
Statement.builder()
.sessionId(new SessionId("sessionId"))
.applicationId("appId")
.jobId("jobId")
.statementId(stId)
.langType(LangType.SQL)
.datasourceName(DS_NAME)
.query("query")
.queryId("statementId")
.stateStore(stateStore)
.build();
st.open();
return st;
}
}