Skip to content

Commit

Permalink
Add more validations and Integ Tests
Browse files Browse the repository at this point in the history
Signed-off-by: Gaurav Bafna <[email protected]>
  • Loading branch information
gbbafna committed Oct 9, 2024
1 parent 8d75efd commit 07c95a7
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -381,32 +381,97 @@ public void testMissingUnits() {
}

public void testThreadPoolSettings() {
String key1 = "cluster.thread_pool.snapshot.max";
Settings transientSettings = Settings.builder().put(key1, "-1").build();
// wrong threadpool
try {
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put("cluster.thread_pool.wrong.max", "-1").build())
.get();
fail("bogus value");
} catch (IllegalArgumentException ex) {
assertTrue(ex.getCause().getMessage().contains("illegal thread_pool name : "));
}

String key2 = "cluster.thread_pool.snapshot.max";
Settings persistentSettings = Settings.builder().put(key2, "5").build();
// Scaling threadpool - negative value
try {
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put("cluster.thread_pool.snapshot.max", "-1").build())
.get();
fail("bogus value");
} catch (IllegalArgumentException ex) {
assertEquals(ex.getCause().getMessage(), "illegal value for [cluster.thread_pool.snapshot], has to be positive value");
}

// Scaling threadpool - Other than max and core
try {
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put("cluster.thread_pool.snapshot.wrong", "-1").build())
.get();
fail("bogus value");
} catch (IllegalArgumentException ex) {
assertEquals(ex.getCause().getMessage(), "illegal thread_pool config : [wrong] should only have max and core");
}

// Scaling threadpool - core > max
try {
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(transientSettings)
.setPersistentSettings(persistentSettings)
.setTransientSettings(
Settings.builder().put("cluster.thread_pool.snapshot.core", "2").put("cluster.thread_pool.snapshot.max", "1").build()
)
.get();
fail("bogus value");
} catch (IllegalArgumentException ex) {
assertEquals(ex.getMessage(), "illegal value for [cluster.thread_pool.snapshot], has to be positive value");
assertEquals(ex.getCause().getMessage(), "core threadpool size cannot be greater than max");
}

transientSettings = Settings.builder().put(key1, "1").build();
persistentSettings = Settings.builder().put(key2, "5").build();
client().admin()
// Scaling threadpool - happy case
ClusterUpdateSettingsResponse clusterUpdateSettingsResponse = client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(transientSettings)
.setPersistentSettings(persistentSettings)
.setTransientSettings(Settings.builder().put("cluster.thread_pool.snapshot.max", "1").build())
.setPersistentSettings(Settings.builder().put("cluster.thread_pool.snapshot.max", "5").build())
.get();
assertTrue(clusterUpdateSettingsResponse.isAcknowledged());

// Fixed threadpool - Other than size
try {
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put("cluster.thread_pool.get.wrong", "-1").build())
.get();
fail("bogus value");
} catch (IllegalArgumentException ex) {
assertEquals(ex.getCause().getMessage(), "illegal thread_pool config : [wrong] should only have size");
}

// Fixed threadpool - 0 value
try {
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put("cluster.thread_pool.get.size", "0").build())
.get();
fail("bogus value");
} catch (IllegalArgumentException ex) {
assertEquals(ex.getCause().getMessage(), "illegal value for [cluster.thread_pool.get], has to be positive value");
}

// Fixed threadpool - happy case
clusterUpdateSettingsResponse = client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put("cluster.thread_pool.get.size", "1").build())
.setPersistentSettings(Settings.builder().put("cluster.thread_pool.get.size", "1").build())
.get();
assertTrue(clusterUpdateSettingsResponse.isAcknowledged());
}

public void testLoggerLevelUpdate() {
Expand Down
74 changes: 53 additions & 21 deletions server/src/main/java/org/opensearch/threadpool/ThreadPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,14 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
Expand Down Expand Up @@ -225,22 +228,10 @@ public Collection<ExecutorBuilder> builders() {
Setting.Property.NodeScope
);

public static final Setting<Settings> CLUSTER_THREAD_POOL_SIZE_SETTING = Setting.groupSetting("cluster.thread_pool.", (tpSettings) -> {
Map<String, Settings> tpGroups = tpSettings.getAsGroups();
for (Map.Entry<String, Settings> entry : tpGroups.entrySet()) {
String tpName = entry.getKey();
Settings tpGroup = entry.getValue();
int max = tpGroup.getAsInt("max", 1);
int core = tpGroup.getAsInt("core", 1);
int size = tpGroup.getAsInt("size", 1);
if (max <= 0 || core <= 0 || size <= 0) {
throw new IllegalArgumentException("illegal value for [cluster.thread_pool." + tpName + "], has to be positive value");
}
}
},
public static final Setting<Settings> CLUSTER_THREAD_POOL_SIZE_SETTING = Setting.groupSetting(
"cluster.thread_pool.",
Setting.Property.Dynamic,
Setting.Property.NodeScope

);

public ThreadPool(final Settings settings, final ExecutorBuilder<?>... customBuilders) {
Expand All @@ -258,12 +249,11 @@ public void setThreadPool(Settings tpSettings) {
if (holder.info.type == ThreadPoolType.SCALING) {
int max = tpGroup.getAsInt("max", o.getMaximumPoolSize());
int core = tpGroup.getAsInt("core", o.getCorePoolSize());
if (core > max) {
// Can we do better than silently ignoring this as this can't be caught in static validation ?
logger.error("Thread pool {} core {} is higher than maximum value {}. Ignoring it", tpName, core, max);
continue;
}
// Below check makes sure we adhere to the constraint that cores <= max at all the time.
/*
If we are decreasing, core pool size has to be decreased first.
If we are increasing ,max pool size has to be increased first
This ensures that core pool is always smaller than max pool size .
*/
if (core < o.getCorePoolSize()) {
o.setCorePoolSize(core);
o.setMaximumPoolSize(max);
Expand All @@ -286,7 +276,49 @@ public void setThreadPool(Settings tpSettings) {

public void setClusterSettings(ClusterSettings clusterSettings) {
this.clusterSettings = clusterSettings;
this.clusterSettings.addSettingsUpdateConsumer(CLUSTER_THREAD_POOL_SIZE_SETTING, this::setThreadPool);
this.clusterSettings.addSettingsUpdateConsumer(CLUSTER_THREAD_POOL_SIZE_SETTING, this::setThreadPool, this::validateSetting);
}

private void validateSetting(Settings tpSettings) {
Map<String, Settings> tpGroups = tpSettings.getAsGroups();
for (Map.Entry<String, Settings> entry : tpGroups.entrySet()) {
String tpName = entry.getKey();
if (THREAD_POOL_TYPES.get(tpName) == null) {
throw new IllegalArgumentException("illegal thread_pool name : " + tpName);
}
Settings tpGroup = entry.getValue();
ExecutorHolder holder = executors.get(tpName);
assert holder.executor instanceof OpenSearchThreadPoolExecutor;
OpenSearchThreadPoolExecutor o = (OpenSearchThreadPoolExecutor) holder.executor;
if (holder.info.type == ThreadPoolType.SCALING) {
Set<String> so = new HashSet<>(Arrays.asList("max", "core"));
if (tpGroup.keySet().stream().allMatch(so::contains) == false) {
throw new IllegalArgumentException(
"illegal thread_pool config : " + tpGroup.keySet() + " should only have max and core"
);
}
int max = tpGroup.getAsInt("max", o.getMaximumPoolSize());
int core = tpGroup.getAsInt("core", o.getCorePoolSize());
if (core < 1 || max < 1) {
throw new IllegalArgumentException("illegal value for [cluster.thread_pool." + tpName + "], has to be positive value");
} else if (core > max) {
throw new IllegalArgumentException("core threadpool size cannot be greater than max");
}
} else {
if (tpGroup.keySet().size() != 1) {
throw new IllegalArgumentException("illegal thread_pool config : " + tpGroup.keySet());
} else if (tpGroup.keySet().contains("size") == false) {
throw new IllegalArgumentException("illegal thread_pool config : " + tpGroup.keySet() + " should only have size");
} else {
int size = tpGroup.getAsInt("size", o.getMaximumPoolSize());
if (size < 1) {
throw new IllegalArgumentException(
"illegal value for [cluster.thread_pool." + tpName + "], has to be positive value"
);
}
}
}
}
}

public ThreadPool(
Expand Down

0 comments on commit 07c95a7

Please sign in to comment.