Skip to content

Commit

Permalink
Add a config property for excluding invalid worker session properties
Browse files Browse the repository at this point in the history
  • Loading branch information
pdabre12 authored and Pratik Joseph Dabre committed Jan 31, 2025
1 parent 14063ed commit f3033e0
Show file tree
Hide file tree
Showing 8 changed files with 163 additions and 13 deletions.
11 changes: 11 additions & 0 deletions presto-docs/src/main/sphinx/admin/properties.rst
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,17 @@ be executed within a single node.

The corresponding session property is :ref:`admin/properties-session:\`\`single_node_execution_enabled\`\``.

``exclude-invalid-worker-session-properties``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

* **Type:** ``boolean``
* **Default value:** ``false``

When ``exclude-invalid-worker-session-properties`` is ``true``, worker session properties that are
incompatible with the cluster type are excluded. For example, when ``native-execution-enabled``
is ``true``, java-worker only session properties are excluded and the native-worker only
session properties are included.

.. _tuning-memory:

Memory Management Properties
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -811,9 +811,19 @@ public ListeningExecutorService createResourceManagerExecutor(ResourceManagerCon
// Worker session property providers
MapBinder<String, WorkerSessionPropertyProvider> mapBinder =
newMapBinder(binder, String.class, WorkerSessionPropertyProvider.class);
mapBinder.addBinding("java-worker").to(JavaWorkerSessionPropertyProvider.class).in(Scopes.SINGLETON);
if (!serverConfig.isCoordinatorSidecarEnabled()) {
mapBinder.addBinding("native-worker").to(NativeWorkerSessionPropertyProvider.class).in(Scopes.SINGLETON);
if (featuresConfig.isNativeExecutionEnabled()) {
if (!serverConfig.isCoordinatorSidecarEnabled()) {
mapBinder.addBinding("native-worker").to(NativeWorkerSessionPropertyProvider.class).in(Scopes.SINGLETON);
}
if (!featuresConfig.isExcludeInvalidWorkerSessionProperties()) {
mapBinder.addBinding("java-worker").to(JavaWorkerSessionPropertyProvider.class).in(Scopes.SINGLETON);
}
}
else {
mapBinder.addBinding("java-worker").to(JavaWorkerSessionPropertyProvider.class).in(Scopes.SINGLETON);
if (!featuresConfig.isExcludeInvalidWorkerSessionProperties()) {
mapBinder.addBinding("native-worker").to(NativeWorkerSessionPropertyProvider.class).in(Scopes.SINGLETON);
}
}

// Node manager binding
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,8 @@ public class FeaturesConfig
private boolean includeValuesNodeInConnectorOptimizer = true;

private boolean eagerPlanValidationEnabled;

private boolean setExcludeInvalidWorkerSessionProperties;
private int eagerPlanValidationThreadPoolSize = 20;

private boolean prestoSparkExecutionEnvironment;
Expand Down Expand Up @@ -2930,4 +2932,17 @@ public FeaturesConfig setExpressionOptimizerName(String expressionOptimizerName)
this.expressionOptimizerName = expressionOptimizerName;
return this;
}

@Config("exclude-invalid-worker-session-properties")
@ConfigDescription("Exclude worker session properties from invalid clusters")
public FeaturesConfig setExcludeInvalidWorkerSessionProperties(boolean setExcludeInvalidWorkerSessionProperties)
{
this.setExcludeInvalidWorkerSessionProperties = setExcludeInvalidWorkerSessionProperties;
return this;
}

public boolean isExcludeInvalidWorkerSessionProperties()
{
return this.setExcludeInvalidWorkerSessionProperties;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,8 @@ public void testDefaults()
.setSingleNodeExecutionEnabled(false)
.setNativeExecutionScaleWritersThreadsEnabled(false)
.setEnhancedCTESchedulingEnabled(true)
.setExpressionOptimizerName("default"));
.setExpressionOptimizerName("default")
.setExcludeInvalidWorkerSessionProperties(false));
}

@Test
Expand Down Expand Up @@ -454,6 +455,7 @@ public void testExplicitPropertyMappings()
.put("native-execution-scale-writer-threads-enabled", "true")
.put("enhanced-cte-scheduling-enabled", "false")
.put("expression-optimizer-name", "custom")
.put("exclude-invalid-worker-session-properties", "true")
.build();

FeaturesConfig expected = new FeaturesConfig()
Expand Down Expand Up @@ -652,7 +654,8 @@ public void testExplicitPropertyMappings()
.setSingleNodeExecutionEnabled(true)
.setNativeExecutionScaleWritersThreadsEnabled(true)
.setEnhancedCTESchedulingEnabled(false)
.setExpressionOptimizerName("custom");
.setExpressionOptimizerName("custom")
.setExcludeInvalidWorkerSessionProperties(true);
assertFullMapping(properties, expected);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public static Map<String, String> getNativeSidecarProperties()
{
return ImmutableMap.<String, String>builder()
.put("coordinator-sidecar-enabled", "true")
.put("exclude-invalid-worker-session-properties", "true")
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,14 +72,7 @@ public void testShowSession()
@Test
public void testSetJavaWorkerSessionProperty()
{
@Language("SQL") String setSession = "SET SESSION aggregation_spill_enabled=false";
MaterializedResult setSessionResult = computeActual(setSession);
assertEquals(
setSessionResult.toString(),
"MaterializedResult{rows=[[true]], " +
"types=[boolean], " +
"setSessionProperties={aggregation_spill_enabled=false}, " +
"resetSessionProperties=[], updateType=SET SESSION}");
assertQueryFails("SET SESSION aggregation_spill_enabled=false", "line 1:1: Session property aggregation_spill_enabled does not exist");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.tests;

import com.facebook.presto.testing.MaterializedResult;
import com.facebook.presto.testing.QueryRunner;
import org.intellij.lang.annotations.Language;
import org.testng.annotations.Test;

import static com.facebook.presto.testing.TestingSession.testSessionBuilder;
import static org.testng.Assert.assertEquals;

public class TestSetWorkerSessionPropertiesExcludingInvalidProperties
extends AbstractTestQueryFramework
{
@Override
protected QueryRunner createQueryRunner()
throws Exception
{
return DistributedQueryRunner.builder(testSessionBuilder().build())
.setSingleCoordinatorProperty("exclude-invalid-worker-session-properties", "true")
.build();
}

@Test
public void testSetSessionInvalidNativeWorkerSessionProperty()
{
// SET SESSION on a native-worker session property
assertQueryFails("SET SESSION native_expression_max_array_size_in_reduce=50000", "line 1:1: Session property native_expression_max_array_size_in_reduce does not exist");
}

@Test
public void testSetSessionValidJavaWorkerSessionProperty()
{
// SET SESSION on a java-worker session property
@Language("SQL") String setSession = "SET SESSION distinct_aggregation_spill_enabled=false";
MaterializedResult setSessionResult = computeActual(setSession);
assertEquals(
setSessionResult.toString(),
"MaterializedResult{rows=[[true]], " +
"types=[boolean], " +
"setSessionProperties={distinct_aggregation_spill_enabled=false}, " +
"resetSessionProperties=[], updateType=SET SESSION}");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.tests;

import com.facebook.presto.testing.MaterializedResult;
import com.facebook.presto.testing.QueryRunner;
import org.intellij.lang.annotations.Language;
import org.testng.annotations.Test;

import static com.facebook.presto.testing.TestingSession.testSessionBuilder;
import static org.testng.Assert.assertEquals;

public class TestSetWorkerSessionPropertiesIncludingInvalidProperties
extends AbstractTestQueryFramework
{
@Override
protected QueryRunner createQueryRunner()
throws Exception
{
return DistributedQueryRunner.builder(testSessionBuilder().build()).build();
}

@Test
public void testSetSessionValidNativeWorkerSessionProperty()
{
// SET SESSION on a native-worker session property
@Language("SQL") String setSession = "SET SESSION native_expression_max_array_size_in_reduce=50000";
MaterializedResult setSessionResult = computeActual(setSession);
assertEquals(
setSessionResult.toString(),
"MaterializedResult{rows=[[true]], " +
"types=[boolean], " +
"setSessionProperties={native_expression_max_array_size_in_reduce=50000}, " +
"resetSessionProperties=[], updateType=SET SESSION}");
}

@Test
public void testSetSessionValidJavaWorkerSessionProperty()
{
// SET SESSION on a java-worker session property
@Language("SQL") String setSession = "SET SESSION distinct_aggregation_spill_enabled=false";
MaterializedResult setSessionResult = computeActual(setSession);
assertEquals(
setSessionResult.toString(),
"MaterializedResult{rows=[[true]], " +
"types=[boolean], " +
"setSessionProperties={distinct_aggregation_spill_enabled=false}, " +
"resetSessionProperties=[], updateType=SET SESSION}");
}
}

0 comments on commit f3033e0

Please sign in to comment.