Skip to content

Commit

Permalink
0.4 release with threadly 5.8 upgrade
Browse files Browse the repository at this point in the history
As part of this upgrade we are switching to the new `CentralThreadlyPool` rather than having our own pool for monitoring the cluster.  This will hopefully reduce thread churn for applications using both.
The only downside to this is that we lost our priority semantics.  Before, checking a potentially unhealthy server was done as a low-priority task (as was adding in read slaves on startup).  Adding in read slaves is not very important, but the unhealthy-server check is a more interesting case.
I went back and improved the code where we check potentially unhealthy servers, solving the minor race condition that was there, so that if multiple tasks get submitted the impact is lower.  This compensates for the advantage we previously got from using low-priority tasks there.
  • Loading branch information
jentfoo committed Dec 28, 2017
1 parent bf9ee2f commit 6feca0d
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 29 deletions.
2 changes: 1 addition & 1 deletion build.shared
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ dependencies {
)

compile (
"org.threadly:threadly:5.4",
"org.threadly:threadly:5.8",
"mysql:mysql-connector-java:6.0.6"
)
}
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
group = org.threadly
version = 0.3
version = 0.4
org.gradle.daemon = false
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>org.threadly</groupId>
<artifactId>auroraArc</artifactId>
<version>0.3</version>
<version>0.4</version>
<packaging>jar</packaging>

<name>AuroraArc</name>
Expand Down
44 changes: 18 additions & 26 deletions src/main/java/org/threadly/db/aurora/AuroraClusterMonitor.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,9 @@
import java.util.logging.Level;
import java.util.logging.Logger;

import org.threadly.concurrent.ConfigurableThreadFactory;
import org.threadly.concurrent.PriorityScheduler;
import org.threadly.concurrent.PrioritySchedulerService;
import org.threadly.concurrent.CentralThreadlyPool;
import org.threadly.concurrent.ReschedulingOperation;
import org.threadly.concurrent.TaskPriority;
import org.threadly.concurrent.SubmitterScheduler;

/**
* Class which monitors a "cluster" of aurora servers. It is expected that for each given cluster
Expand All @@ -37,19 +35,12 @@ public class AuroraClusterMonitor {
private static final Logger LOG = Logger.getLogger(AuroraClusterMonitor.class.getSimpleName());

protected static final int CHECK_FREQUENCY_MILLIS = 500; // TODO - make configurable
protected static final int MINIMUM_THREAD_POOL_SIZE = 4;
protected static final int MAXIMUM_THREAD_POOL_SIZE = 32;
protected static final PrioritySchedulerService MONITOR_SCHEDULER;
protected static final int MAXIMUM_THREAD_POOL_SIZE = 64;
protected static final SubmitterScheduler MONITOR_SCHEDULER;
protected static final ConcurrentMap<Set<AuroraServer>, AuroraClusterMonitor> MONITORS;

static {
PriorityScheduler ps =
new PriorityScheduler(MINIMUM_THREAD_POOL_SIZE, TaskPriority.High, 1000,
new ConfigurableThreadFactory("auroraMonitor-", false, true,
Thread.NORM_PRIORITY, null, null));
ps.prestartAllThreads();
ps.setPoolSize(MAXIMUM_THREAD_POOL_SIZE);
MONITOR_SCHEDULER = ps;
MONITOR_SCHEDULER = CentralThreadlyPool.threadPool(MAXIMUM_THREAD_POOL_SIZE, "auroraMonitor");

MONITORS = new ConcurrentHashMap<>();
}
Expand Down Expand Up @@ -79,7 +70,7 @@ public static AuroraClusterMonitor getMonitor(Set<AuroraServer> servers) {
protected final ClusterChecker clusterStateChecker;
private final AtomicLong replicaIndex; // used to distribute replica reads

protected AuroraClusterMonitor(PrioritySchedulerService scheduler, long checkIntervalMillis,
protected AuroraClusterMonitor(SubmitterScheduler scheduler, long checkIntervalMillis,
Set<AuroraServer> clusterServers) {
clusterStateChecker = new ClusterChecker(scheduler, checkIntervalMillis, clusterServers);
replicaIndex = new AtomicLong();
Expand Down Expand Up @@ -133,14 +124,14 @@ public void expediteServerCheck(AuroraServer auroraServer) {
* (and thus will witness the final cluster state).
*/
protected static class ClusterChecker extends ReschedulingOperation {
protected final PrioritySchedulerService scheduler;
protected final SubmitterScheduler scheduler;
protected final Map<AuroraServer, ServerMonitor> allServers;
protected final List<AuroraServer> secondaryServers;
protected final AtomicReference<AuroraServer> masterServer;
protected final List<AuroraServer> serversWaitingExpeditiedCheck;
protected final CopyOnWriteArrayList<AuroraServer> serversWaitingExpeditiedCheck;
private volatile boolean initialized = false; // starts false to avoid updates while constructor is running

protected ClusterChecker(PrioritySchedulerService scheduler, long checkIntervalMillis,
protected ClusterChecker(SubmitterScheduler scheduler, long checkIntervalMillis,
Set<AuroraServer> clusterServers) {
super(scheduler, 0);

Expand All @@ -164,7 +155,7 @@ protected ClusterChecker(PrioritySchedulerService scheduler, long checkIntervalM
}
}
} else { // all other checks can be async, adding in as secondary servers as they complete
scheduler.execute(monitor, TaskPriority.Low);
scheduler.execute(monitor);
}

scheduler.scheduleAtFixedRate(monitor,
Expand All @@ -182,7 +173,7 @@ protected ClusterChecker(PrioritySchedulerService scheduler, long checkIntervalM
}

// used in testing
protected ClusterChecker(PrioritySchedulerService scheduler, long checkIntervalMillis,
protected ClusterChecker(SubmitterScheduler scheduler, long checkIntervalMillis,
Map<AuroraServer, ServerMonitor> clusterServers) {
super(scheduler, 0);

Expand All @@ -196,13 +187,14 @@ protected ClusterChecker(PrioritySchedulerService scheduler, long checkIntervalM
}

protected void expediteServerCheck(ServerMonitor serverMonitor) {
// check is not exactly thread safe, but will stop the worst of it
if (! serversWaitingExpeditiedCheck.contains(serverMonitor.server)) {
serversWaitingExpeditiedCheck.add(serverMonitor.server);
if (serversWaitingExpeditiedCheck.addIfAbsent(serverMonitor.server)) {
scheduler.execute(() -> {
serversWaitingExpeditiedCheck.remove(serverMonitor.server);
serverMonitor.run();
}, TaskPriority.Low);
try {
serverMonitor.run();
} finally {
serversWaitingExpeditiedCheck.remove(serverMonitor.server);
}
});
}
}

Expand Down

0 comments on commit 6feca0d

Please sign in to comment.