Skip to content

Commit

Permalink
Code improvements based on SonarQube analysis (#48)
Browse files Browse the repository at this point in the history
* Code improvements based on SonarQube analysis

* Additional fixes
  • Loading branch information
adejanovski authored Feb 6, 2017
1 parent 61f14f0 commit b80bfc8
Show file tree
Hide file tree
Showing 19 changed files with 197 additions and 174 deletions.
4 changes: 2 additions & 2 deletions resource/cassandra-reaper-postgres.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -47,5 +47,5 @@ database:
# PostgreSQL JDBC settings
driverClass: org.postgresql.Driver
user: postgres
password: postgres
url: jdbc:postgresql://127.0.0.1/reaper
password:
url: jdbc:postgresql://127.0.0.1/reaper
7 changes: 4 additions & 3 deletions src/main/java/com/spotify/reaper/ReaperApplication.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.slf4j.LoggerFactory;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.spotify.reaper.AppContext;
import com.spotify.reaper.ReaperApplicationConfiguration;
import com.spotify.reaper.ReaperApplicationConfiguration.JmxCredentials;
Expand Down Expand Up @@ -128,7 +129,7 @@ public void run(ReaperApplicationConfiguration config,
// read jmx host/port mapping from config and provide to jmx con.factory
Map<String, Integer> jmxPorts = config.getJmxPorts();
if (jmxPorts != null) {
LOG.debug("using JMX ports mapping: " + jmxPorts);
LOG.debug("using JMX ports mapping: {}", jmxPorts);
context.jmxConnectionFactory.setJmxPorts(jmxPorts);
}

Expand Down Expand Up @@ -201,7 +202,7 @@ private IStorage initializeStorage(ReaperApplicationConfiguration config,
LOG.error("invalid storageType: {}", config.getStorageType());
throw new ReaperException("invalid storage type: " + config.getStorageType());
}
assert storage.isStorageConnected() : "Failed to connect storage";
Preconditions.checkState(storage.isStorageConnected(), "Failed to connect storage");
return storage;
}

Expand All @@ -222,7 +223,7 @@ public static void checkRepairParallelismString(String givenRepairParallelism)
} catch (java.lang.IllegalArgumentException ex) {
throw new ReaperException(
"invalid repair parallelism given \"" + givenRepairParallelism
+ "\", must be one of: " + Arrays.toString(RepairParallelism.values()));
+ "\", must be one of: " + Arrays.toString(RepairParallelism.values()), ex);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

import java.util.Map;

import javax.validation.Valid;
import javax.validation.constraints.DecimalMin;
import javax.validation.constraints.Max;
import javax.validation.constraints.NotNull;
Expand Down Expand Up @@ -143,7 +142,7 @@ public String getEnableCrossOrigin() {
}

public boolean isEnableCrossOrigin() {
return this.enableCrossOrigin != null && this.enableCrossOrigin.equalsIgnoreCase("true");
return this.enableCrossOrigin != null && ("true").equalsIgnoreCase(this.enableCrossOrigin);
}

public void setStorageType(String storageType) {
Expand Down
138 changes: 72 additions & 66 deletions src/main/java/com/spotify/reaper/cassandra/JmxProxy.java
Original file line number Diff line number Diff line change
Expand Up @@ -156,11 +156,11 @@ static JmxProxy connect(Optional<RepairStatusHandler> handler, String host, int
// registering a listener throws bunch of exceptions, so we do it here rather than in the
// constructor
mbeanServerConn.addNotificationListener(ssMbeanName, proxy, null, null);
LOG.debug(String.format("JMX connection to %s properly connected: %s",
host, jmxUrl.toString()));
LOG.debug("JMX connection to {} properly connected: {}",
host, jmxUrl.toString());
return proxy;
} catch (IOException | InstanceNotFoundException e) {
LOG.error(String.format("Failed to establish JMX connection to %s:%s", host, port));
LOG.error("Failed to establish JMX connection to {}:{}", host, port);
throw new ReaperException("Failure when establishing JMX connection", e);
}
}
Expand Down Expand Up @@ -189,9 +189,9 @@ public Map<List<String>, List<String>> getRangeToEndpointMap(String keyspace)
checkNotNull(ssProxy, "Looks like the proxy is not connected");
try {
return ((StorageServiceMBean) ssProxy).getRangeToEndpointMap(keyspace);
} catch (AssertionError e) {
} catch (Exception e) {
LOG.error(e.getMessage());
throw new ReaperException(e.getMessage());
throw new ReaperException(e.getMessage(), e);
}
}

Expand Down Expand Up @@ -243,8 +243,7 @@ public Set<String> getTableNamesForKeyspace(String keyspace) throws ReaperExcept
try {
proxies = ColumnFamilyStoreMBeanIterator.getColumnFamilyStoreMBeanProxies(mbeanServer);
} catch (IOException | MalformedObjectNameException e) {
e.printStackTrace();
throw new ReaperException("failed to get ColumnFamilyStoreMBean instances from JMX");
throw new ReaperException("failed to get ColumnFamilyStoreMBean instances from JMX", e);
}
while (proxies.hasNext()) {
Map.Entry<String, ColumnFamilyStoreMBean> proxyEntry = proxies.next();
Expand All @@ -267,12 +266,13 @@ public int getPendingCompactions() {
int pendingCount = (int) mbeanServer.getAttribute(name, "PendingTasks");
return pendingCount;
} catch (IOException ignored) {
LOG.warn("Failed to connect to " + host + " using JMX");
LOG.warn("Failed to connect to " + host + " using JMX", ignored);
} catch (MalformedObjectNameException ignored) {
LOG.error("Internal error, malformed name");
LOG.error("Internal error, malformed name", ignored);
} catch (InstanceNotFoundException e) {
// This happens if no repair has yet been run on the node
// The AntiEntropySessions object is created on the first repair
LOG.debug("No compaction has run yet on the node. Ignoring exception.", e);
return 0;
} catch (Exception e) {
LOG.error("Error getting attribute from JMX", e);
Expand All @@ -292,12 +292,13 @@ public boolean isRepairRunning() {
long pendingCount = (Long) mbeanServer.getAttribute(name, "PendingTasks");
return activeCount + pendingCount != 0;
} catch (IOException ignored) {
LOG.warn("Failed to connect to " + host + " using JMX");
LOG.warn("Failed to connect to " + host + " using JMX", ignored);
} catch (MalformedObjectNameException ignored) {
LOG.error("Internal error, malformed name");
LOG.error("Internal error, malformed name", ignored);
} catch (InstanceNotFoundException e) {
// This happens if no repair has yet been run on the node
// The AntiEntropySessions object is created on the first repair
LOG.debug("No repair has run yet on the node. Ignoring exception.", e);
return false;
} catch (Exception e) {
LOG.error("Error getting attribute from JMX", e);
Expand Down Expand Up @@ -337,7 +338,7 @@ public boolean tableExists(String ks, String cf) {
} catch (MalformedObjectNameException | IOException e) {
String errMsg = String.format("ColumnFamilyStore for %s/%s not found: %s", ks, cf,
e.getMessage());
LOG.warn(errMsg);
LOG.warn(errMsg, e);
return false;
}
return true;
Expand All @@ -363,7 +364,7 @@ public int triggerRepair(BigInteger beginToken, BigInteger endToken, String keys
try {
canUseDatacenterAware = versionCompare(cassandraVersion, "2.0.12") >= 0;
} catch (ReaperException e) {
LOG.warn("failed on version comparison, not using dc aware repairs by default");
LOG.warn("failed on version comparison, not using dc aware repairs by default", e);
}
String msg = String.format("Triggering repair of range (%s,%s] for keyspace \"%s\" on "
+ "host %s, with repair parallelism %s, in cluster with Cassandra "
Expand All @@ -373,55 +374,62 @@ public int triggerRepair(BigInteger beginToken, BigInteger endToken, String keys
repairParallelism, cassandraVersion, canUseDatacenterAware,
columnFamilies);
LOG.info(msg);

if (repairParallelism.equals(RepairParallelism.DATACENTER_AWARE) && !canUseDatacenterAware) {
LOG.info("Cannot use DATACENTER_AWARE repair policy for Cassandra cluster with version {},"
+ " falling back to SEQUENTIAL repair.", cassandraVersion);
repairParallelism = RepairParallelism.SEQUENTIAL;
}
try {
if (!cassandraVersion.startsWith("2.0") && !cassandraVersion.startsWith("1.")) {
if (fullRepair) {
if (repairParallelism.equals(RepairParallelism.DATACENTER_AWARE)) {
if (canUseDatacenterAware) {
return ((StorageServiceMBean) ssProxy).forceRepairRangeAsync(beginToken.toString(), endToken.toString(),
keyspace, repairParallelism.ordinal(), cassandraVersion.startsWith("2.2")?new HashSet<String>():null, cassandraVersion.startsWith("2.2")?new HashSet<String>():null, fullRepair,
columnFamilies.toArray(new String[columnFamilies.size()]));
} else {
LOG.info("Cannot use DATACENTER_AWARE repair policy for Cassandra cluster with version {},"
+ " falling back to SEQUENTIAL repair.", cassandraVersion);
repairParallelism = RepairParallelism.SEQUENTIAL;
}
}
boolean snapshotRepair = repairParallelism.equals(RepairParallelism.SEQUENTIAL);

return ((StorageServiceMBean) ssProxy).forceRepairRangeAsync(beginToken.toString(), endToken.toString(),
keyspace, snapshotRepair ? RepairParallelism.SEQUENTIAL.ordinal() : RepairParallelism.PARALLEL.ordinal(),
cassandraVersion.startsWith("2.2")?new HashSet<String>():null, cassandraVersion.startsWith("2.2")?new HashSet<String>():null, fullRepair,
columnFamilies.toArray(new String[columnFamilies.size()]));

} else {
return ((StorageServiceMBean) ssProxy).forceRepairAsync(keyspace, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE,
fullRepair, columnFamilies.toArray(new String[columnFamilies.size()]));
}
if (cassandraVersion.startsWith("2.0") || cassandraVersion.startsWith("1.")) {
return triggerRepairPre2dot1(repairParallelism, keyspace, columnFamilies, beginToken, endToken);
} else {
// Cassandra 2.0 compatibility
if (repairParallelism.equals(RepairParallelism.DATACENTER_AWARE)) {
if (canUseDatacenterAware) {
return ((StorageServiceMBean20) ssProxy).forceRepairRangeAsync(beginToken.toString(), endToken.toString(),
keyspace, repairParallelism.ordinal(), null, null,
columnFamilies.toArray(new String[columnFamilies.size()]));
} else {
LOG.info("Cannot use DATACENTER_AWARE repair policy for Cassandra cluster with version {},"
+ " falling back to SEQUENTIAL repair.", cassandraVersion);
repairParallelism = RepairParallelism.SEQUENTIAL;
}
}
boolean snapshotRepair = repairParallelism.equals(RepairParallelism.SEQUENTIAL);
return ((StorageServiceMBean20) ssProxy).forceRepairRangeAsync(beginToken.toString(), endToken.toString(),
keyspace, snapshotRepair, false, columnFamilies.toArray(new String[columnFamilies.size()]));
return triggerRepairPost2dot1(fullRepair, repairParallelism, keyspace, columnFamilies, beginToken, endToken, cassandraVersion);
}
} catch (Exception e) {
LOG.error("Segment repair failed", e);
throw new ReaperException(e);
}

}


public int triggerRepairPost2dot1(boolean fullRepair, RepairParallelism repairParallelism, String keyspace, Collection<String> columnFamilies, BigInteger beginToken, BigInteger endToken, String cassandraVersion) {
if (fullRepair) {
// full repair
if (repairParallelism.equals(RepairParallelism.DATACENTER_AWARE)) {
return ((StorageServiceMBean) ssProxy).forceRepairRangeAsync(beginToken.toString(), endToken.toString(),
keyspace, repairParallelism.ordinal(), cassandraVersion.startsWith("2.2")?new HashSet<String>():null, cassandraVersion.startsWith("2.2")?new HashSet<String>():null, fullRepair,
columnFamilies.toArray(new String[columnFamilies.size()]));
}

boolean snapshotRepair = repairParallelism.equals(RepairParallelism.SEQUENTIAL);

return ((StorageServiceMBean) ssProxy).forceRepairRangeAsync(beginToken.toString(), endToken.toString(),
keyspace, snapshotRepair ? RepairParallelism.SEQUENTIAL.ordinal() : RepairParallelism.PARALLEL.ordinal(),
cassandraVersion.startsWith("2.2")?new HashSet<String>():null, cassandraVersion.startsWith("2.2")?new HashSet<String>():null, fullRepair,
columnFamilies.toArray(new String[columnFamilies.size()]));

}

// incremental repair
return ((StorageServiceMBean) ssProxy).forceRepairAsync(keyspace, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE,
fullRepair, columnFamilies.toArray(new String[columnFamilies.size()]));

}


public int triggerRepairPre2dot1(RepairParallelism repairParallelism, String keyspace, Collection<String> columnFamilies, BigInteger beginToken, BigInteger endToken) {
// Cassandra 1.2 and 2.0 compatibility
if (repairParallelism.equals(RepairParallelism.DATACENTER_AWARE)) {
return ((StorageServiceMBean20) ssProxy).forceRepairRangeAsync(beginToken.toString(), endToken.toString(),
keyspace, repairParallelism.ordinal(), null, null,
columnFamilies.toArray(new String[columnFamilies.size()]));
}
boolean snapshotRepair = repairParallelism.equals(RepairParallelism.SEQUENTIAL);
return ((StorageServiceMBean20) ssProxy).forceRepairRangeAsync(beginToken.toString(), endToken.toString(),
keyspace, snapshotRepair, false, columnFamilies.toArray(new String[columnFamilies.size()]));

}


/**
Expand All @@ -436,8 +444,8 @@ public void handleNotification(Notification notification, Object handback) {
Thread.currentThread().setName(clusterName);
// we're interested in "repair"
String type = notification.getType();
LOG.debug("Received notification: {}", notification.toString());
if (repairStatusHandler.isPresent() && type.equals("repair")) {
LOG.debug("Received notification: {}", notification);
if (repairStatusHandler.isPresent() && ("repair").equals(type)) {
int[] data = (int[]) notification.getUserData();
// get the repair sequence number
int repairNo = data[0];
Expand All @@ -459,7 +467,7 @@ public boolean isConnectionAlive() {
String connectionId = getConnectionId();
return null != connectionId && connectionId.length() > 0;
} catch (IOException e) {
e.printStackTrace();
LOG.error("Couldn't get Connection Id", e);
}
return false;
}
Expand All @@ -469,18 +477,16 @@ public boolean isConnectionAlive() {
*/
@Override
public void close() throws ReaperException {
LOG.debug(String.format("close JMX connection to '%s': %s", host, jmxUrl));
LOG.debug("close JMX connection to '{}': {}", host, jmxUrl);
try {
mbeanServer.removeNotificationListener(ssMbeanName, this);
} catch (InstanceNotFoundException | ListenerNotFoundException | IOException e) {
LOG.warn("failed on removing notification listener");
e.printStackTrace();
LOG.warn("failed on removing notification listener", e);
}
try {
jmxConnector.close();
} catch (IOException e) {
LOG.warn("failed closing a JMX connection");
e.printStackTrace();
LOG.warn("failed closing a JMX connection", e);
}
}

Expand All @@ -502,10 +508,10 @@ public void close() throws ReaperException {
*/
public static Integer versionCompare(String str1, String str2) throws ReaperException {
try {
str1 = str1.split(" ")[0].replaceAll("[-_~]", ".");
str2 = str2.split(" ")[0].replaceAll("[-_~]", ".");
String[] parts1 = str1.split("\\.");
String[] parts2 = str2.split("\\.");
String cleanedUpStr1 = str1.split(" ")[0].replaceAll("[-_~]", ".");
String cleanedUpStr2 = str2.split(" ")[0].replaceAll("[-_~]", ".");
String[] parts1 = cleanedUpStr1.split("\\.");
String[] parts2 = cleanedUpStr2.split("\\.");
int i = 0;
// set index to first non-equal ordinal or length of shortest version string
while (i < parts1.length && i < parts2.length) {
Expand Down Expand Up @@ -543,7 +549,7 @@ public static Integer versionCompare(String str1, String str2) throws ReaperExce
}

public void clearSnapshot(String repairId, String keyspaceName) throws ReaperException {
if (repairId == null || repairId.equals("")) {
if (repairId == null || ("").equals(repairId)) {
// Passing in null or empty string will clear all snapshots on the host
throw new IllegalArgumentException("repairId cannot be null or empty string");
}
Expand Down
19 changes: 19 additions & 0 deletions src/main/java/com/spotify/reaper/core/RepairRun.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
*/
package com.spotify.reaper.core;

import java.util.Objects;

import org.apache.cassandra.repair.RepairParallelism;
import org.joda.time.DateTime;
import org.joda.time.DateTimeComparator;
Expand Down Expand Up @@ -133,6 +135,23 @@ public int compareTo(RepairRun other) {
return -comparator.compare(startTime, other.startTime);
}
}

@Override
public boolean equals(Object other) {
if (other == this)
return true;
if (!(other instanceof RepairRun)) {
return false;
}
RepairRun run = (RepairRun) other;
return this.id == run.id
&& this.repairUnitId == run.repairUnitId;
}

@Override
public int hashCode() {
return Objects.hash(this.id, this.repairUnitId);
}

public enum RunState {
NOT_STARTED,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,12 @@ public Response addCluster(
try {
newCluster = createClusterWithSeedHost(seedHost.get());
} catch (java.lang.SecurityException e) {
LOG.error(e.getMessage(), e);
return Response.status(400)
.entity("seed host \"" + seedHost.get() + "\" JMX threw security exception: "
+ e.getMessage()).build();
} catch (ReaperException e) {
LOG.error(e.getMessage(), e);
return Response.status(400)
.entity("failed to create cluster with seed host: " + seedHost.get()).build();
}
Expand All @@ -144,8 +146,7 @@ public Response addCluster(
createdURI = new URL(uriInfo.getAbsolutePath().toURL(), newCluster.getName()).toURI();
} catch (Exception e) {
String errMsg = "failed creating target URI for cluster: " + newCluster.getName();
LOG.error(errMsg);
e.printStackTrace();
LOG.error(errMsg, e);
return Response.status(400).entity(errMsg).build();
}

Expand All @@ -160,8 +161,7 @@ public Cluster createClusterWithSeedHost(String seedHost)
clusterName = jmxProxy.getClusterName();
partitioner = jmxProxy.getPartitioner();
} catch (ReaperException e) {
LOG.error("failed to create cluster with seed host: " + seedHost);
e.printStackTrace();
LOG.error("failed to create cluster with seed host: {}", seedHost, e);
throw e;
}
return new Cluster(clusterName, partitioner, Collections.singleton(seedHost));
Expand Down
Loading

0 comments on commit b80bfc8

Please sign in to comment.