Skip to content

Commit

Permalink
Validate session with flint datasource passed in async job request
Browse files Browse the repository at this point in the history
Currently, if a session is running with Datasource1 and the user makes a request to the async API with the same session ID but a different catalog, Datasource2, the SQL plugin does not return a new session for Datasource2. This PR creates a new session when there is a mismatch between the datasource and the session_id.

Testing done:
1. manual testing
2. added IT.

Signed-off-by: Kaituo Li <[email protected]>
  • Loading branch information
kaituo committed Dec 1, 2023
1 parent 60058ce commit dfb523d
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public DispatchQueryResponse submit(
if (dispatchQueryRequest.getSessionId() != null) {
// get session from request
SessionId sessionId = new SessionId(dispatchQueryRequest.getSessionId());
Optional<Session> createdSession = sessionManager.getSession(sessionId);
Optional<Session> createdSession = sessionManager.getSession(sessionId, dispatchQueryRequest.getDatasource());
if (createdSession.isPresent()) {
session = createdSession.get();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,21 @@ public Session createSession(CreateSessionRequest request) {
return session;
}

public Optional<Session> getSession(SessionId sid) {
/**
* Retrieves the session associated with the given session ID.
*
* This method is particularly used in scenarios where the data source encoded in the session ID is deemed untrustworthy.
* It allows for the safe retrieval of session details based on a known and validated session ID, rather than relying on potentially outdated data source information.
*
* For more context on the use case and implementation, refer to the documentation here: https://tinyurl.com/bdh6s834
*
* @param sid The unique identifier of the session. It is used to fetch the corresponding session details.
* @param dataSourceName The name of the data source to query for the session. It is used for the state store lookup instead of the data source name encoded in {@code sid}.
* @return An Optional containing the session associated with the provided session ID. Returns an empty Optional if no matching session is found.
*/
public Optional<Session> getSession(SessionId sid, String dataSourceName) {
Optional<SessionModel> model =
StateStore.getSession(stateStore, sid.getDataSourceName()).apply(sid.getSessionId());
StateStore.getSession(stateStore, dataSourceName).apply(sid.getSessionId());
if (model.isPresent()) {
InteractiveSession session =
InteractiveSession.builder()
Expand All @@ -51,6 +63,18 @@ public Optional<Session> getSession(SessionId sid) {
return Optional.empty();
}

/**
 * Looks up a session by its session ID alone.
 *
 * <p>Intended for callers that trust the data source information encoded in the session ID: the
 * data source name is read from {@code sid} and the lookup is delegated to
 * {@link #getSession(SessionId, String)}.
 *
 * @param sid The session ID identifying the session to retrieve. Its encoded data source name
 *     is expected to be valid and trusted, because it is used for the lookup.
 * @return An Optional holding the session that matches the session ID, or an empty Optional if
 *     no matching session is found.
 */
public Optional<Session> getSession(SessionId sid) {
  final String encodedDataSourceName = sid.getDataSourceName();
  return getSession(sid, encodedDataSourceName);
}

// todo, keep it only for testing, will remove it later.
public boolean isEnabled() {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,43 @@ public void recreateSessionIfNotReady() {
assertNotEquals(second.getSessionId(), third.getSessionId());
}

/**
 * Verifies that a session ID is reused for requests against the same data source, and that a
 * request carrying the same session ID but a different data source gets a different session ID.
 */
@Test
public void submitQueryWithDifferentDataSourceSessionWillCreateNewSession() {
  AsyncQueryExecutorService queryService =
      createAsyncQueryExecutorService(new LocalEMRSClient());

  // Turn the session feature on.
  enableSession(true);

  final String sameSourceQuery = "SHOW SCHEMAS IN " + DATASOURCE;
  final String otherSourceQuery = "SHOW SCHEMAS IN " + DSOTHER;

  // Step 1: submit a query without a session ID; a session ID must come back.
  CreateAsyncQueryResponse initialResponse =
      queryService.createAsyncQuery(
          new CreateAsyncQueryRequest(sameSourceQuery, DATASOURCE, LangType.SQL, null));
  assertNotNull(initialResponse.getSessionId());

  // Mark the session as RUNNING.
  setSessionState(initialResponse.getSessionId(), SessionState.RUNNING);

  // Step 2: same data source plus the existing session ID -> the session ID is reused.
  CreateAsyncQueryResponse reusedResponse =
      queryService.createAsyncQuery(
          new CreateAsyncQueryRequest(
              sameSourceQuery, DATASOURCE, LangType.SQL, initialResponse.getSessionId()));
  assertEquals(initialResponse.getSessionId(), reusedResponse.getSessionId());

  // Mark the (reused) session as RUNNING again.
  setSessionState(reusedResponse.getSessionId(), SessionState.RUNNING);

  // Step 3: different data source with the same session ID -> a different session ID.
  CreateAsyncQueryResponse otherSourceResponse =
      queryService.createAsyncQuery(
          new CreateAsyncQueryRequest(
              otherSourceQuery, DSOTHER, LangType.SQL, reusedResponse.getSessionId()));
  assertNotEquals(reusedResponse.getSessionId(), otherSourceResponse.getSessionId());
}

@Test
public void submitQueryInInvalidSessionWillCreateNewSession() {
LocalEMRSClient emrsClient = new LocalEMRSClient();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ void testDispatchSelectQueryReuseSession() {
doReturn(true).when(sessionManager).isEnabled();
doReturn(Optional.of(session))
.when(sessionManager)
.getSession(eq(new SessionId(MOCK_SESSION_ID)));
.getSession(eq(new SessionId(MOCK_SESSION_ID)), eq("my_glue"));
doReturn(new SessionId(MOCK_SESSION_ID)).when(session).getSessionId();
doReturn(new StatementId(MOCK_STATEMENT_ID)).when(session).submit(any());
when(session.getSessionModel().getJobId()).thenReturn(EMR_JOB_ID);
Expand Down

0 comments on commit dfb523d

Please sign in to comment.