Skip to content

Commit

Permalink
Validate session with flint datasource passed in async job request
Browse files Browse the repository at this point in the history
Currently, if there's a session running with Datasource1 and the user makes a request to the async API with the same session but a different catalog Datasource2 then SQL plugin doesn't return a new session for Datasource2. This PR creates a new session when there’s a mismatch between datasource and session_id.

Testing done:
1. manual testing
2. added IT.

Signed-off-by: Kaituo Li <[email protected]>
  • Loading branch information
kaituo committed Dec 1, 2023
1 parent 60058ce commit 06fa45d
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ public DispatchQueryResponse submit(
if (dispatchQueryRequest.getSessionId() != null) {
// get session from request
SessionId sessionId = new SessionId(dispatchQueryRequest.getSessionId());
Optional<Session> createdSession = sessionManager.getSession(sessionId);
Optional<Session> createdSession =
sessionManager.getSession(sessionId, dispatchQueryRequest.getDatasource());
if (createdSession.isPresent()) {
session = createdSession.get();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,27 @@ public Session createSession(CreateSessionRequest request) {
return session;
}

public Optional<Session> getSession(SessionId sid) {
/**
* Retrieves the session associated with the given session ID.
*
* <p>This method is particularly used in scenarios where the data source encoded in the session
* ID is deemed untrustworthy. It allows for the safe retrieval of session details based on a
* known and validated session ID, rather than relying on potentially outdated data source
* information.
*
* <p>For more context on the use case and implementation, refer to the documentation here:
* https://tinyurl.com/bdh6s834
*
* @param sid The unique identifier of the session. It is used to fetch the corresponding session
* details.
* @param dataSourceName The name of the data source. This parameter is utilized in the session
* retrieval process.
* @return An Optional containing the session associated with the provided session ID. Returns an
* empty Optional if no matching session is found.
*/
public Optional<Session> getSession(SessionId sid, String dataSourceName) {
Optional<SessionModel> model =
StateStore.getSession(stateStore, sid.getDataSourceName()).apply(sid.getSessionId());
StateStore.getSession(stateStore, dataSourceName).apply(sid.getSessionId());
if (model.isPresent()) {
InteractiveSession session =
InteractiveSession.builder()
Expand All @@ -51,6 +69,22 @@ public Optional<Session> getSession(SessionId sid) {
return Optional.empty();
}

/**
* Retrieves the session associated with the provided session ID.
*
* <p>This method is utilized specifically in scenarios where the data source information encoded
* in the session ID is considered trustworthy. It ensures the retrieval of session details based
* on the session ID, relying on the integrity of the data source information contained within it.
*
* @param sid The session ID used to identify and retrieve the corresponding session. It is
* expected to contain valid and trusted data source information.
* @return An Optional containing the session associated with the provided session ID. If no
* session is found that matches the session ID, an empty Optional is returned.
*/
public Optional<Session> getSession(SessionId sid) {
return getSession(sid, sid.getDataSourceName());
}

// todo, keep it only for testing, will remove it later.
public boolean isEnabled() {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,44 @@ public void recreateSessionIfNotReady() {
assertNotEquals(second.getSessionId(), third.getSessionId());
}

@Test
public void submitQueryWithDifferentDataSourceSessionWillCreateNewSession() {
LocalEMRSClient emrsClient = new LocalEMRSClient();
AsyncQueryExecutorService asyncQueryExecutorService =
createAsyncQueryExecutorService(emrsClient);

// enable session
enableSession(true);

// 1. create async query.
CreateAsyncQueryResponse first =
asyncQueryExecutorService.createAsyncQuery(
new CreateAsyncQueryRequest(
"SHOW SCHEMAS IN " + DATASOURCE, DATASOURCE, LangType.SQL, null));
assertNotNull(first.getSessionId());

// set sessionState to RUNNING
setSessionState(first.getSessionId(), SessionState.RUNNING);

// 2. reuse session id
CreateAsyncQueryResponse second =
asyncQueryExecutorService.createAsyncQuery(
new CreateAsyncQueryRequest(
"SHOW SCHEMAS IN " + DATASOURCE, DATASOURCE, LangType.SQL, first.getSessionId()));

assertEquals(first.getSessionId(), second.getSessionId());

// set sessionState to RUNNING
setSessionState(second.getSessionId(), SessionState.RUNNING);

// 3. given different source, create a new session id
CreateAsyncQueryResponse third =
asyncQueryExecutorService.createAsyncQuery(
new CreateAsyncQueryRequest(
"SHOW SCHEMAS IN " + DSOTHER, DSOTHER, LangType.SQL, second.getSessionId()));
assertNotEquals(second.getSessionId(), third.getSessionId());
}

@Test
public void submitQueryInInvalidSessionWillCreateNewSession() {
LocalEMRSClient emrsClient = new LocalEMRSClient();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ void testDispatchSelectQueryReuseSession() {
doReturn(true).when(sessionManager).isEnabled();
doReturn(Optional.of(session))
.when(sessionManager)
.getSession(eq(new SessionId(MOCK_SESSION_ID)));
.getSession(eq(new SessionId(MOCK_SESSION_ID)), eq("my_glue"));
doReturn(new SessionId(MOCK_SESSION_ID)).when(session).getSessionId();
doReturn(new StatementId(MOCK_STATEMENT_ID)).when(session).submit(any());
when(session.getSessionModel().getJobId()).thenReturn(EMR_JOB_ID);
Expand Down

0 comments on commit 06fa45d

Please sign in to comment.