Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/2.x' into backport/backport-27…
Browse files Browse the repository at this point in the history
…59-to-2.x
  • Loading branch information
LantaoJin committed Jul 19, 2024
2 parents af8468a + f7c1f09 commit 882c836
Show file tree
Hide file tree
Showing 54 changed files with 1,817 additions and 888 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/integ-tests-with-security.yml
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ jobs:
strategy:
fail-fast: false
matrix:
os: [ windows-latest, macos-latest ]
os: [ windows-latest, macos-13 ]
java: [ 11, 17, 21 ]

runs-on: ${{ matrix.os }}
Expand Down
6 changes: 3 additions & 3 deletions .github/workflows/sql-test-and-build-workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,11 @@ jobs:
matrix:
entry:
- { os: windows-latest, java: 11, os_build_args: -x doctest -PbuildPlatform=windows }
- { os: macos-latest, java: 11}
- { os: macos-13, java: 11}
- { os: windows-latest, java: 17, os_build_args: -x doctest -PbuildPlatform=windows }
- { os: macos-latest, java: 17 }
- { os: macos-13, java: 17 }
- { os: windows-latest, java: 21, os_build_args: -x doctest -PbuildPlatform=windows }
- { os: macos-latest, java: 21 }
- { os: macos-13, java: 21 }
runs-on: ${{ matrix.entry.os }}

steps:
Expand Down
6 changes: 4 additions & 2 deletions async-query-core/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ Following is the list of extension points where the consumer of the library need
- [QueryIdProvider](src/main/java/org/opensearch/sql/spark/dispatcher/QueryIdProvider.java)
- [SessionIdProvider](src/main/java/org/opensearch/sql/spark/execution/session/SessionIdProvider.java)
- [SessionConfigSupplier](src/main/java/org/opensearch/sql/spark/execution/session/SessionConfigSupplier.java)
- [SparkExecutionEngineConfigSupplier](src/main/java/org/opensearch/sql/spark/config/SparkExecutionEngineConfigSupplier.java)
- [SparkSubmitParameterModifier](src/main/java/org/opensearch/sql/spark/config/SparkSubmitParameterModifier.java)
- [EMRServerlessClientFactory](src/main/java/org/opensearch/sql/spark/client/EMRServerlessClientFactory.java)
- [SparkExecutionEngineConfigSupplier](src/main/java/org/opensearch/sql/spark/config/SparkExecutionEngineConfigSupplier.java)
- [DataSourceSparkParameterComposer](src/main/java/org/opensearch/sql/spark/parameter/DataSourceSparkParameterComposer.java)
- [GeneralSparkParameterComposer](src/main/java/org/opensearch/sql/spark/parameter/GeneralSparkParameterComposer.java)
- [SparkSubmitParameterModifier](src/main/java/org/opensearch/sql/spark/config/SparkSubmitParameterModifier.java) To be deprecated in favor of GeneralSparkParameterComposer

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package org.opensearch.sql.spark.config;

import org.opensearch.sql.spark.asyncquery.model.SparkSubmitParameters;
import org.opensearch.sql.spark.parameter.SparkSubmitParametersBuilder;

/**
* Interface for extension point to allow modification of spark submit parameter. modifyParameter
* method is called after the default spark submit parameter is build.
* method is called after the default spark submit parameter is build. To be deprecated in favor of
* {@link org.opensearch.sql.spark.parameter.GeneralSparkParameterComposer}
*/
public interface SparkSubmitParameterModifier {
void modifyParameters(SparkSubmitParameters parameters);
void modifyParameters(SparkSubmitParametersBuilder parametersBuilder);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import org.json.JSONObject;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata;
import org.opensearch.sql.spark.asyncquery.model.SparkSubmitParameters;
import org.opensearch.sql.spark.client.EMRServerlessClient;
import org.opensearch.sql.spark.client.StartJobRequest;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryContext;
Expand All @@ -25,6 +24,7 @@
import org.opensearch.sql.spark.dispatcher.model.JobType;
import org.opensearch.sql.spark.leasemanager.LeaseManager;
import org.opensearch.sql.spark.metrics.MetricsService;
import org.opensearch.sql.spark.parameter.SparkSubmitParametersBuilderProvider;
import org.opensearch.sql.spark.response.JobExecutionResponseReader;

/**
Expand All @@ -37,6 +37,7 @@ public class BatchQueryHandler extends AsyncQueryHandler {
protected final JobExecutionResponseReader jobExecutionResponseReader;
protected final LeaseManager leaseManager;
protected final MetricsService metricsService;
protected final SparkSubmitParametersBuilderProvider sparkSubmitParametersBuilderProvider;

@Override
protected JSONObject getResponseFromResultIndex(AsyncQueryJobMetadata asyncQueryJobMetadata) {
Expand Down Expand Up @@ -80,12 +81,16 @@ public DispatchQueryResponse submit(
dispatchQueryRequest.getAccountId(),
dispatchQueryRequest.getApplicationId(),
dispatchQueryRequest.getExecutionRoleARN(),
SparkSubmitParameters.builder()
sparkSubmitParametersBuilderProvider
.getSparkSubmitParametersBuilder()
.clusterName(clusterName)
.dataSource(context.getDataSourceMetadata())
.query(dispatchQueryRequest.getQuery())
.build()
.dataSource(
context.getDataSourceMetadata(),
dispatchQueryRequest,
context.getAsyncQueryRequestContext())
.acceptModifier(dispatchQueryRequest.getSparkSubmitParameterModifier())
.acceptComposers(dispatchQueryRequest, context.getAsyncQueryRequestContext())
.toString(),
tags,
false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import org.json.JSONObject;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata;
import org.opensearch.sql.spark.asyncquery.model.SparkSubmitParameters;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryContext;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryResponse;
Expand All @@ -32,6 +31,7 @@
import org.opensearch.sql.spark.leasemanager.model.LeaseRequest;
import org.opensearch.sql.spark.metrics.EmrMetrics;
import org.opensearch.sql.spark.metrics.MetricsService;
import org.opensearch.sql.spark.parameter.SparkSubmitParametersBuilderProvider;
import org.opensearch.sql.spark.response.JobExecutionResponseReader;

/**
Expand All @@ -46,6 +46,7 @@ public class InteractiveQueryHandler extends AsyncQueryHandler {
private final JobExecutionResponseReader jobExecutionResponseReader;
private final LeaseManager leaseManager;
private final MetricsService metricsService;
protected final SparkSubmitParametersBuilderProvider sparkSubmitParametersBuilderProvider;

@Override
protected JSONObject getResponseFromResultIndex(AsyncQueryJobMetadata asyncQueryJobMetadata) {
Expand Down Expand Up @@ -112,12 +113,16 @@ public DispatchQueryResponse submit(
dispatchQueryRequest.getAccountId(),
dispatchQueryRequest.getApplicationId(),
dispatchQueryRequest.getExecutionRoleARN(),
SparkSubmitParameters.builder()
sparkSubmitParametersBuilderProvider
.getSparkSubmitParametersBuilder()
.className(FLINT_SESSION_CLASS_NAME)
.clusterName(clusterName)
.dataSource(dataSourceMetadata)
.build()
.acceptModifier(dispatchQueryRequest.getSparkSubmitParameterModifier()),
.dataSource(
dataSourceMetadata,
dispatchQueryRequest,
context.getAsyncQueryRequestContext())
.acceptModifier(dispatchQueryRequest.getSparkSubmitParameterModifier())
.acceptComposers(dispatchQueryRequest, context.getAsyncQueryRequestContext()),
tags,
dataSourceMetadata.getResultIndex(),
dataSourceMetadata.getName()),
Expand Down
Loading

0 comments on commit 882c836

Please sign in to comment.