Skip to content

Commit

Permalink
STORM-3407 Storm 2.x ConstraintSolverStrategy backward compatibility
Browse files Browse the repository at this point in the history
  • Loading branch information
dandsager1 committed Jul 1, 2019
1 parent 994a484 commit 26c471a
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 10 deletions.
2 changes: 2 additions & 0 deletions conf/defaults.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,8 @@ topology.component.cpu.pcore.percent: 10.0
topology.worker.max.heap.size.mb: 768.0
topology.scheduler.strategy: "org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy"
resource.aware.scheduler.priority.strategy: "org.apache.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy"
topology.ras.constraint.max.state.search: 10_000 # The maximum number of states that will be searched looking for a solution in the constraint solver strategy
resource.aware.scheduler.constraint.max.state.search: 100_000 # Daemon limit on maximum number of states that will be searched looking for a solution in the constraint solver strategy

blacklist.scheduler.tolerance.time.secs: 300
blacklist.scheduler.tolerance.count: 3
Expand Down
7 changes: 7 additions & 0 deletions storm-client/src/jvm/org/apache/storm/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,13 @@ public class Config extends HashMap<String, Object> {
@isInteger
@isPositiveNumber
public static final String TOPOLOGY_RAS_CONSTRAINT_MAX_STATE_SEARCH = "topology.ras.constraint.max.state.search";
/**
* The maximum number of states that will be searched looking for a solution in the constraint solver strategy.
* Backward compatibility config value for old topologies
*/
@isInteger
@isPositiveNumber
public static final String TOPOLOGY_RAS_CONSTRAINT_MAX_STATE_TRAVERSAL = "topology.ras.constraint.max.state.traversal";
/**
* The maximum number of seconds to spend scheduling a topology using the constraint solver. Null means no limit.
*/
Expand Down
7 changes: 7 additions & 0 deletions storm-server/src/main/java/org/apache/storm/DaemonConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -936,6 +936,13 @@ public class DaemonConfig implements Validated {
public static final String RESOURCE_AWARE_SCHEDULER_MAX_TOPOLOGY_SCHEDULING_ATTEMPTS =
"resource.aware.scheduler.max.topology.scheduling.attempts";

/*
* The maximum number of states that will be searched looking for a solution in the constraint solver strategy
*/
@isInteger
@isPositiveNumber
public static final String RESOURCE_AWARE_SCHEDULER_MAX_STATE_SEARCH = "resource.aware.scheduler.constraint.max.state.search";

/**
* How often nimbus's background thread to sync code for missing topologies should run.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.TreeSet;
import java.util.stream.Collectors;
import org.apache.storm.Config;
import org.apache.storm.DaemonConfig;
import org.apache.storm.scheduler.Cluster;
import org.apache.storm.scheduler.ExecutorDetails;
import org.apache.storm.scheduler.SchedulerAssignment;
Expand All @@ -42,8 +43,6 @@

public class ConstraintSolverStrategy extends BaseResourceAwareStrategy {
//hard coded max number of states to search
public static final int MAX_STATE_SEARCH = 100_000;
public static final int DEFAULT_STATE_SEARCH = 10_000;
private static final Logger LOG = LoggerFactory.getLogger(ConstraintSolverStrategy.class);

//constraints and spreads
Expand Down Expand Up @@ -248,10 +247,22 @@ public SchedulingResult schedule(Cluster cluster, TopologyDetails td) {
nodes = RAS_Nodes.getAllNodesFrom(cluster);
Map<WorkerSlot, Set<String>> workerCompAssignment = new HashMap<>();
Map<RAS_Node, Set<String>> nodeCompAssignment = new HashMap<>();
//set max number of states to search
final int maxStateSearch = Math.min(MAX_STATE_SEARCH,
ObjectReader.getInt(td.getConf().get(Config.TOPOLOGY_RAS_CONSTRAINT_MAX_STATE_SEARCH),
DEFAULT_STATE_SEARCH));

//set max number of states to search maintaining backward compatibility for old topologies
String stormVersionString = td.getTopology().get_storm_version();
boolean is2xTopology = stormVersionString != null && stormVersionString.startsWith("2");

Object confMaxStateSearch = null;
if (is2xTopology == false) {
//backward compatibility
confMaxStateSearch = td.getConf().get(Config.TOPOLOGY_RAS_CONSTRAINT_MAX_STATE_TRAVERSAL);
}
if (confMaxStateSearch == null) {
//new topology or old topology using new config
confMaxStateSearch = td.getConf().get(Config.TOPOLOGY_RAS_CONSTRAINT_MAX_STATE_SEARCH);
}
int daemonMaxStateSearch = ObjectReader.getInt(td.getConf().get(DaemonConfig.RESOURCE_AWARE_SCHEDULER_MAX_STATE_SEARCH));
final int maxStateSearch = Math.min(daemonMaxStateSearch, ObjectReader.getInt(confMaxStateSearch));

final long maxTimeMs =
ObjectReader.getInt(td.getConf().get(Config.TOPOLOGY_RAS_CONSTRAINT_MAX_TIME_SECS), -1).intValue() * 1000L;
Expand Down Expand Up @@ -290,15 +301,15 @@ public SchedulingResult schedule(Cluster cluster, TopologyDetails td) {
}

//early detection/early fail
if (!checkSchedulingFeasibility()) {
if (!checkSchedulingFeasibility(maxStateSearch)) {
//Scheduling Status set to FAIL_OTHER so no eviction policy will be attempted to make space for this topology
return SchedulingResult.failure(SchedulingStatus.FAIL_OTHER, "Scheduling not feasible!");
}
return backtrackSearch(new SearcherState(workerCompAssignment, nodeCompAssignment, maxStateSearch, maxTimeMs, sortedExecs, td))
.asSchedulingResult();
}

private boolean checkSchedulingFeasibility() {
private boolean checkSchedulingFeasibility(int maxStateSearch) {
for (String comp : spreadComps) {
int numExecs = compToExecs.get(comp).size();
if (numExecs > nodes.size()) {
Expand All @@ -307,9 +318,9 @@ private boolean checkSchedulingFeasibility() {
return false;
}
}
if (execToComp.size() >= MAX_STATE_SEARCH) {
if (execToComp.size() >= maxStateSearch) {
LOG.error("Number of executors is greater than the maximum number of states allowed to be searched. "
+ "# of executors: {} Max states to search: {}", execToComp.size(), MAX_STATE_SEARCH);
+ "# of executors: {} Max states to search: {}", execToComp.size(), maxStateSearch);
return false;
}
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public Map<String, Object> makeTestTopoConf() {
spread.add("spout-0");

Map<String, Object> config = Utils.readDefaultConfig();
config.put(DaemonConfig.RESOURCE_AWARE_SCHEDULER_MAX_STATE_SEARCH, MAX_TRAVERSAL_DEPTH);
config.put(Config.TOPOLOGY_SPREAD_COMPONENTS, spread);
config.put(Config.TOPOLOGY_RAS_CONSTRAINTS, constraints);
config.put(Config.TOPOLOGY_RAS_CONSTRAINT_MAX_STATE_SEARCH, MAX_TRAVERSAL_DEPTH);
Expand Down Expand Up @@ -206,6 +207,7 @@ public void testIntegrationWithRAS() {

Map<String, Object> config = Utils.readDefaultConfig();
config.put(DaemonConfig.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY, DefaultSchedulingPriorityStrategy.class.getName());
config.put(DaemonConfig.RESOURCE_AWARE_SCHEDULER_MAX_STATE_SEARCH, MAX_TRAVERSAL_DEPTH);
config.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, ConstraintSolverStrategy.class.getName());
config.put(Config.TOPOLOGY_SPREAD_COMPONENTS, spread);
config.put(Config.TOPOLOGY_RAS_CONSTRAINTS, constraints);
Expand Down

0 comments on commit 26c471a

Please sign in to comment.