From b80bfc883953cc57cb247d152c94c4af72684822 Mon Sep 17 00:00:00 2001 From: Alexander Dejanovski Date: Mon, 6 Feb 2017 13:54:10 +0100 Subject: [PATCH] Code improvements based on SonarQube analysis (#48) * Code improvements based on SonarQube analysis * Additional fixes --- resource/cassandra-reaper-postgres.yaml | 4 +- .../com/spotify/reaper/ReaperApplication.java | 7 +- .../ReaperApplicationConfiguration.java | 3 +- .../spotify/reaper/cassandra/JmxProxy.java | 138 +++++++++--------- .../com/spotify/reaper/core/RepairRun.java | 19 +++ .../reaper/resources/ClusterResource.java | 8 +- .../spotify/reaper/resources/CommonTools.java | 19 +-- .../reaper/resources/PingResource.java | 2 +- .../reaper/resources/RepairRunResource.java | 33 +++-- .../resources/RepairScheduleResource.java | 28 ++-- .../resources/view/RepairRunStatus.java | 4 +- .../spotify/reaper/service/RepairManager.java | 33 +++-- .../spotify/reaper/service/RepairRunner.java | 15 +- .../reaper/service/SchedulingManager.java | 15 +- .../spotify/reaper/service/SegmentRunner.java | 13 +- .../reaper/storage/CassandraStorage.java | 7 +- .../cassandra/002_fault_tolerant_reaper.cql | 9 ++ .../unit/resources/RepairRunResourceTest.java | 10 +- .../reaper/unit/service/RepairRunnerTest.java | 4 +- 19 files changed, 197 insertions(+), 174 deletions(-) create mode 100644 src/main/resources/db/cassandra/002_fault_tolerant_reaper.cql diff --git a/resource/cassandra-reaper-postgres.yaml b/resource/cassandra-reaper-postgres.yaml index d5bf24d78..7830407ab 100644 --- a/resource/cassandra-reaper-postgres.yaml +++ b/resource/cassandra-reaper-postgres.yaml @@ -47,5 +47,5 @@ database: # PostgreSQL JDBC settings driverClass: org.postgresql.Driver user: postgres - password: postgres - url: jdbc:postgresql://127.0.0.1/reaper \ No newline at end of file + password: + url: jdbc:postgresql://127.0.0.1/reaper diff --git a/src/main/java/com/spotify/reaper/ReaperApplication.java b/src/main/java/com/spotify/reaper/ReaperApplication.java index c684e042d..426a0aa76 100644 --- a/src/main/java/com/spotify/reaper/ReaperApplication.java +++ b/src/main/java/com/spotify/reaper/ReaperApplication.java @@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import com.spotify.reaper.AppContext; import com.spotify.reaper.ReaperApplicationConfiguration; import com.spotify.reaper.ReaperApplicationConfiguration.JmxCredentials; @@ -128,7 +129,7 @@ public void run(ReaperApplicationConfiguration config, // read jmx host/port mapping from config and provide to jmx con.factory Map jmxPorts = config.getJmxPorts(); if (jmxPorts != null) { - LOG.debug("using JMX ports mapping: " + jmxPorts); + LOG.debug("using JMX ports mapping: {}", jmxPorts); context.jmxConnectionFactory.setJmxPorts(jmxPorts); } @@ -201,7 +202,7 @@ private IStorage initializeStorage(ReaperApplicationConfiguration config, LOG.error("invalid storageType: {}", config.getStorageType()); throw new ReaperException("invalid storage type: " + config.getStorageType()); } - assert storage.isStorageConnected() : "Failed to connect storage"; + Preconditions.checkState(storage.isStorageConnected(), "Failed to connect storage"); return storage; } @@ -222,7 +223,7 @@ public static void checkRepairParallelismString(String givenRepairParallelism) } catch (java.lang.IllegalArgumentException ex) { throw new ReaperException( "invalid repair parallelism given \"" + givenRepairParallelism - + "\", must be one of: " + Arrays.toString(RepairParallelism.values())); + + "\", must be one of: " + Arrays.toString(RepairParallelism.values()), ex); } } diff --git a/src/main/java/com/spotify/reaper/ReaperApplicationConfiguration.java b/src/main/java/com/spotify/reaper/ReaperApplicationConfiguration.java index c16cbf01e..2aa61ee0e 100644 --- a/src/main/java/com/spotify/reaper/ReaperApplicationConfiguration.java +++ b/src/main/java/com/spotify/reaper/ReaperApplicationConfiguration.java @@ -15,7 +15,6 @@ import java.util.Map; -import javax.validation.Valid; import javax.validation.constraints.DecimalMin; import javax.validation.constraints.Max; import javax.validation.constraints.NotNull; @@ -143,7 +142,7 @@ public String getEnableCrossOrigin() { } public boolean isEnableCrossOrigin() { - return this.enableCrossOrigin != null && this.enableCrossOrigin.equalsIgnoreCase("true"); + return this.enableCrossOrigin != null && ("true").equalsIgnoreCase(this.enableCrossOrigin); } public void setStorageType(String storageType) { diff --git a/src/main/java/com/spotify/reaper/cassandra/JmxProxy.java b/src/main/java/com/spotify/reaper/cassandra/JmxProxy.java index f180a8553..a42be8a0a 100644 --- a/src/main/java/com/spotify/reaper/cassandra/JmxProxy.java +++ b/src/main/java/com/spotify/reaper/cassandra/JmxProxy.java @@ -156,11 +156,11 @@ static JmxProxy connect(Optional handler, String host, int // registering a listener throws bunch of exceptions, so we do it here rather than in the // constructor mbeanServerConn.addNotificationListener(ssMbeanName, proxy, null, null); - LOG.debug(String.format("JMX connection to %s properly connected: %s", - host, jmxUrl.toString())); + LOG.debug("JMX connection to {} properly connected: {}", + host, jmxUrl.toString()); return proxy; } catch (IOException | InstanceNotFoundException e) { - LOG.error(String.format("Failed to establish JMX connection to %s:%s", host, port)); + LOG.error("Failed to establish JMX connection to {}:{}", host, port); throw new ReaperException("Failure when establishing JMX connection", e); } } @@ -189,9 +189,9 @@ public Map, List> getRangeToEndpointMap(String keyspace) checkNotNull(ssProxy, "Looks like the proxy is not connected"); try { return ((StorageServiceMBean) ssProxy).getRangeToEndpointMap(keyspace); - } catch (AssertionError e) { + } catch (Exception e) { LOG.error(e.getMessage()); - throw new ReaperException(e.getMessage()); + throw new ReaperException(e.getMessage(), e); } } @@ -243,8 +243,7 @@ public Set getTableNamesForKeyspace(String keyspace) throws ReaperExcept try { proxies = ColumnFamilyStoreMBeanIterator.getColumnFamilyStoreMBeanProxies(mbeanServer); } catch (IOException | MalformedObjectNameException e) { - e.printStackTrace(); - throw new ReaperException("failed to get ColumnFamilyStoreMBean instances from JMX"); + throw new ReaperException("failed to get ColumnFamilyStoreMBean instances from JMX", e); } while (proxies.hasNext()) { Map.Entry proxyEntry = proxies.next(); @@ -267,12 +266,13 @@ public int getPendingCompactions() { int pendingCount = (int) mbeanServer.getAttribute(name, "PendingTasks"); return pendingCount; } catch (IOException ignored) { - LOG.warn("Failed to connect to " + host + " using JMX"); + LOG.warn("Failed to connect to " + host + " using JMX", ignored); } catch (MalformedObjectNameException ignored) { - LOG.error("Internal error, malformed name"); + LOG.error("Internal error, malformed name", ignored); } catch (InstanceNotFoundException e) { // This happens if no repair has yet been run on the node // The AntiEntropySessions object is created on the first repair + LOG.debug("No compaction has run yet on the node. Ignoring exception.", e); return 0; } catch (Exception e) { LOG.error("Error getting attribute from JMX", e); @@ -292,12 +292,13 @@ public boolean isRepairRunning() { long pendingCount = (Long) mbeanServer.getAttribute(name, "PendingTasks"); return activeCount + pendingCount != 0; } catch (IOException ignored) { - LOG.warn("Failed to connect to " + host + " using JMX"); + LOG.warn("Failed to connect to " + host + " using JMX", ignored); } catch (MalformedObjectNameException ignored) { - LOG.error("Internal error, malformed name"); + LOG.error("Internal error, malformed name", ignored); } catch (InstanceNotFoundException e) { // This happens if no repair has yet been run on the node // The AntiEntropySessions object is created on the first repair + LOG.debug("No repair has run yet on the node. Ignoring exception.", e); return false; } catch (Exception e) { LOG.error("Error getting attribute from JMX", e); @@ -337,7 +338,7 @@ public boolean tableExists(String ks, String cf) { } catch (MalformedObjectNameException | IOException e) { String errMsg = String.format("ColumnFamilyStore for %s/%s not found: %s", ks, cf, e.getMessage()); - LOG.warn(errMsg); + LOG.warn(errMsg, e); return false; } return true; @@ -363,7 +364,7 @@ public int triggerRepair(BigInteger beginToken, BigInteger endToken, String keys try { canUseDatacenterAware = versionCompare(cassandraVersion, "2.0.12") >= 0; } catch (ReaperException e) { - LOG.warn("failed on version comparison, not using dc aware repairs by default"); + LOG.warn("failed on version comparison, not using dc aware repairs by default", e); } String msg = String.format("Triggering repair of range (%s,%s] for keyspace \"%s\" on " + "host %s, with repair parallelism %s, in cluster with Cassandra " @@ -373,48 +374,16 @@ public int triggerRepair(BigInteger beginToken, BigInteger endToken, String keys repairParallelism, cassandraVersion, canUseDatacenterAware, columnFamilies); LOG.info(msg); - + if (repairParallelism.equals(RepairParallelism.DATACENTER_AWARE) && !canUseDatacenterAware) { + LOG.info("Cannot use DATACENTER_AWARE repair policy for Cassandra cluster with version {}," + + " falling back to SEQUENTIAL repair.", cassandraVersion); + repairParallelism = RepairParallelism.SEQUENTIAL; + } try { - if (!cassandraVersion.startsWith("2.0") && !cassandraVersion.startsWith("1.")) { - if (fullRepair) { - if (repairParallelism.equals(RepairParallelism.DATACENTER_AWARE)) { - if (canUseDatacenterAware) { - return ((StorageServiceMBean) ssProxy).forceRepairRangeAsync(beginToken.toString(), endToken.toString(), - keyspace, repairParallelism.ordinal(), cassandraVersion.startsWith("2.2")?new HashSet():null, cassandraVersion.startsWith("2.2")?new HashSet():null, fullRepair, - columnFamilies.toArray(new String[columnFamilies.size()])); - } else { - LOG.info("Cannot use DATACENTER_AWARE repair policy for Cassandra cluster with version {}," - + " falling back to SEQUENTIAL repair.", cassandraVersion); - repairParallelism = RepairParallelism.SEQUENTIAL; - } - } - boolean snapshotRepair = repairParallelism.equals(RepairParallelism.SEQUENTIAL); - - return ((StorageServiceMBean) ssProxy).forceRepairRangeAsync(beginToken.toString(), endToken.toString(), - keyspace, snapshotRepair ? RepairParallelism.SEQUENTIAL.ordinal() : RepairParallelism.PARALLEL.ordinal(), - cassandraVersion.startsWith("2.2")?new HashSet():null, cassandraVersion.startsWith("2.2")?new HashSet():null, fullRepair, - columnFamilies.toArray(new String[columnFamilies.size()])); - - } else { - return ((StorageServiceMBean) ssProxy).forceRepairAsync(keyspace, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, - fullRepair, columnFamilies.toArray(new String[columnFamilies.size()])); - } + if (cassandraVersion.startsWith("2.0") || cassandraVersion.startsWith("1.")) { + return triggerRepairPre2dot1(repairParallelism, keyspace, columnFamilies, beginToken, endToken); } else { - // Cassandra 2.0 compatibility - if (repairParallelism.equals(RepairParallelism.DATACENTER_AWARE)) { - if (canUseDatacenterAware) { - return ((StorageServiceMBean20) ssProxy).forceRepairRangeAsync(beginToken.toString(), endToken.toString(), - keyspace, repairParallelism.ordinal(), null, null, - columnFamilies.toArray(new String[columnFamilies.size()])); - } else { - LOG.info("Cannot use DATACENTER_AWARE repair policy for Cassandra cluster with version {}," - + " falling back to SEQUENTIAL repair.", cassandraVersion); - repairParallelism = RepairParallelism.SEQUENTIAL; - } - } - boolean snapshotRepair = repairParallelism.equals(RepairParallelism.SEQUENTIAL); - return ((StorageServiceMBean20) ssProxy).forceRepairRangeAsync(beginToken.toString(), endToken.toString(), - keyspace, snapshotRepair, false, columnFamilies.toArray(new String[columnFamilies.size()])); + return triggerRepairPost2dot1(fullRepair, repairParallelism, keyspace, columnFamilies, beginToken, endToken, cassandraVersion); } } catch (Exception e) { LOG.error("Segment repair failed", e); @@ -422,6 +391,45 @@ public int triggerRepair(BigInteger beginToken, BigInteger endToken, String keys } } + + + public int triggerRepairPost2dot1(boolean fullRepair, RepairParallelism repairParallelism, String keyspace, Collection columnFamilies, BigInteger beginToken, BigInteger endToken, String cassandraVersion) { + if (fullRepair) { + // full repair + if (repairParallelism.equals(RepairParallelism.DATACENTER_AWARE)) { + return ((StorageServiceMBean) ssProxy).forceRepairRangeAsync(beginToken.toString(), endToken.toString(), + keyspace, repairParallelism.ordinal(), cassandraVersion.startsWith("2.2")?new HashSet():null, cassandraVersion.startsWith("2.2")?new HashSet():null, fullRepair, + columnFamilies.toArray(new String[columnFamilies.size()])); + } + + boolean snapshotRepair = repairParallelism.equals(RepairParallelism.SEQUENTIAL); + + return ((StorageServiceMBean) ssProxy).forceRepairRangeAsync(beginToken.toString(), endToken.toString(), + keyspace, snapshotRepair ? RepairParallelism.SEQUENTIAL.ordinal() : RepairParallelism.PARALLEL.ordinal(), + cassandraVersion.startsWith("2.2")?new HashSet():null, cassandraVersion.startsWith("2.2")?new HashSet():null, fullRepair, + columnFamilies.toArray(new String[columnFamilies.size()])); + + } + + // incremental repair + return ((StorageServiceMBean) ssProxy).forceRepairAsync(keyspace, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, + fullRepair, columnFamilies.toArray(new String[columnFamilies.size()])); + + } + + + public int triggerRepairPre2dot1(RepairParallelism repairParallelism, String keyspace, Collection columnFamilies, BigInteger beginToken, BigInteger endToken) { + // Cassandra 1.2 and 2.0 compatibility + if (repairParallelism.equals(RepairParallelism.DATACENTER_AWARE)) { + return ((StorageServiceMBean20) ssProxy).forceRepairRangeAsync(beginToken.toString(), endToken.toString(), + keyspace, repairParallelism.ordinal(), null, null, + columnFamilies.toArray(new String[columnFamilies.size()])); + } + boolean snapshotRepair = repairParallelism.equals(RepairParallelism.SEQUENTIAL); + return ((StorageServiceMBean20) ssProxy).forceRepairRangeAsync(beginToken.toString(), endToken.toString(), + keyspace, snapshotRepair, false, columnFamilies.toArray(new String[columnFamilies.size()])); + + } /** @@ -436,8 +444,8 @@ public void handleNotification(Notification notification, Object handback) { Thread.currentThread().setName(clusterName); // we're interested in "repair" String type = notification.getType(); - LOG.debug("Received notification: {}", notification.toString()); - if (repairStatusHandler.isPresent() && type.equals("repair")) { + LOG.debug("Received notification: {}", notification); + if (repairStatusHandler.isPresent() && ("repair").equals(type)) { int[] data = (int[]) notification.getUserData(); // get the repair sequence number int repairNo = data[0]; @@ -459,7 +467,7 @@ public boolean isConnectionAlive() { String connectionId = getConnectionId(); return null != connectionId && connectionId.length() > 0; } catch (IOException e) { - e.printStackTrace(); + LOG.error("Couldn't get Connection Id", e); } return false; } @@ -469,18 +477,16 @@ public boolean isConnectionAlive() { */ @Override public void close() throws ReaperException { - LOG.debug(String.format("close JMX connection to '%s': %s", host, jmxUrl)); + LOG.debug("close JMX connection to '{}': {}", host, jmxUrl); try { mbeanServer.removeNotificationListener(ssMbeanName, this); } catch (InstanceNotFoundException | ListenerNotFoundException | IOException e) { - LOG.warn("failed on removing notification listener"); - e.printStackTrace(); + LOG.warn("failed on removing notification listener", e); } try { jmxConnector.close(); } catch (IOException e) { - LOG.warn("failed closing a JMX connection"); - e.printStackTrace(); + LOG.warn("failed closing a JMX connection", e); } } @@ -502,10 +508,10 @@ public void close() throws ReaperException { */ public static Integer versionCompare(String str1, String str2) throws ReaperException { try { - str1 = str1.split(" ")[0].replaceAll("[-_~]", "."); - str2 = str2.split(" ")[0].replaceAll("[-_~]", "."); - String[] parts1 = str1.split("\\."); - String[] parts2 = str2.split("\\."); + String cleanedUpStr1 = str1.split(" ")[0].replaceAll("[-_~]", "."); + String cleanedUpStr2 = str2.split(" ")[0].replaceAll("[-_~]", "."); + String[] parts1 = cleanedUpStr1.split("\\."); + String[] parts2 = cleanedUpStr2.split("\\."); int i = 0; // set index to first non-equal ordinal or length of shortest version string while (i < parts1.length && i < parts2.length) { @@ -543,7 +549,7 @@ public static Integer versionCompare(String str1, String str2) throws ReaperExce } public void clearSnapshot(String repairId, String keyspaceName) throws ReaperException { - if (repairId == null || repairId.equals("")) { + if (repairId == null || ("").equals(repairId)) { // Passing in null or empty string will clear all snapshots on the host throw new IllegalArgumentException("repairId cannot be null or empty string"); } diff --git a/src/main/java/com/spotify/reaper/core/RepairRun.java b/src/main/java/com/spotify/reaper/core/RepairRun.java index 0baccd1df..39b6007a8 100644 --- a/src/main/java/com/spotify/reaper/core/RepairRun.java +++ b/src/main/java/com/spotify/reaper/core/RepairRun.java @@ -13,6 +13,8 @@ */ package com.spotify.reaper.core; +import java.util.Objects; + import org.apache.cassandra.repair.RepairParallelism; import org.joda.time.DateTime; import org.joda.time.DateTimeComparator; @@ -133,6 +135,23 @@ public int compareTo(RepairRun other) { return -comparator.compare(startTime, other.startTime); } } + + @Override + public boolean equals(Object other) { + if (other == this) + return true; + if (!(other instanceof RepairRun)) { + return false; + } + RepairRun run = (RepairRun) other; + return this.id == run.id + && this.repairUnitId == run.repairUnitId; + } + + @Override + public int hashCode() { + return Objects.hash(this.id, this.repairUnitId); + } public enum RunState { NOT_STARTED, diff --git a/src/main/java/com/spotify/reaper/resources/ClusterResource.java b/src/main/java/com/spotify/reaper/resources/ClusterResource.java index b17c4b429..522ab8aaa 100644 --- a/src/main/java/com/spotify/reaper/resources/ClusterResource.java +++ b/src/main/java/com/spotify/reaper/resources/ClusterResource.java @@ -121,10 +121,12 @@ public Response addCluster( try { newCluster = createClusterWithSeedHost(seedHost.get()); } catch (java.lang.SecurityException e) { + LOG.error(e.getMessage(), e); return Response.status(400) .entity("seed host \"" + seedHost.get() + "\" JMX threw security exception: " + e.getMessage()).build(); } catch (ReaperException e) { + LOG.error(e.getMessage(), e); return Response.status(400) .entity("failed to create cluster with seed host: " + seedHost.get()).build(); } @@ -144,8 +146,7 @@ public Response addCluster( createdURI = new URL(uriInfo.getAbsolutePath().toURL(), newCluster.getName()).toURI(); } catch (Exception e) { String errMsg = "failed creating target URI for cluster: " + newCluster.getName(); - LOG.error(errMsg); - e.printStackTrace(); + LOG.error(errMsg, e); return Response.status(400).entity(errMsg).build(); } @@ -160,8 +161,7 @@ public Cluster createClusterWithSeedHost(String seedHost) clusterName = jmxProxy.getClusterName(); partitioner = jmxProxy.getPartitioner(); } catch (ReaperException e) { - LOG.error("failed to create cluster with seed host: " + seedHost); - e.printStackTrace(); + LOG.error("failed to create cluster with seed host: {}", seedHost, e); throw e; } return new Cluster(clusterName, partitioner, Collections.singleton(seedHost)); diff --git a/src/main/java/com/spotify/reaper/resources/CommonTools.java b/src/main/java/com/spotify/reaper/resources/CommonTools.java index 008089bc7..be85ace76 100644 --- a/src/main/java/com/spotify/reaper/resources/CommonTools.java +++ b/src/main/java/com/spotify/reaper/resources/CommonTools.java @@ -21,6 +21,7 @@ import com.google.common.base.CharMatcher; import com.google.common.base.Optional; +import com.google.common.base.Preconditions; import com.google.common.base.Splitter; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; @@ -97,8 +98,8 @@ private static List generateSegments(AppContext context, Cluster targ int segmentCount, Boolean incrementalRepair) throws ReaperException { List segments = null; - assert targetCluster.getPartitioner() != null : - "no partitioner for cluster: " + targetCluster.getName(); + Preconditions.checkState(targetCluster.getPartitioner() != null, + "no partitioner for cluster: " + targetCluster.getName()); SegmentGenerator sg = new SegmentGenerator(targetCluster.getPartitioner()); Set seedHosts = targetCluster.getSeedHosts(); if (seedHosts.isEmpty()) { @@ -113,7 +114,7 @@ private static List generateSegments(AppContext context, Cluster targ segments = sg.generateSegments(segmentCount, tokens, incrementalRepair); break; } catch (ReaperException e) { - LOG.warn("couldn't connect to host: {}, will try next one", host); + LOG.warn("couldn't connect to host: {}, will try next one", host, e); } } @@ -216,7 +217,7 @@ private static Map getClusterNodes(AppContext context, Clust rangeToEndpoint = jmxProxy.getRangeToEndpointMap(repairUnit.getKeyspaceName()); break; } catch (ReaperException e) { - LOG.warn("couldn't connect to host: {}, will try next one", host); + LOG.warn("couldn't connect to host: {}, will try next one", host, e); } } @@ -282,10 +283,10 @@ public static RepairSchedule storeNewRepairSchedule( } private static final boolean aConflictingScheduleAlreadyExists(RepairUnit newRepairUnit, RepairUnit existingRepairUnit){ - return (newRepairUnit.getColumnFamilies().size()==0 && existingRepairUnit.getColumnFamilies().size()==0) - || newRepairUnit.getColumnFamilies().size()==0 && existingRepairUnit.getColumnFamilies().size()!=0 - || newRepairUnit.getColumnFamilies().size()!=0 && existingRepairUnit.getColumnFamilies().size()==0 - || Sets.intersection(existingRepairUnit.getColumnFamilies(),newRepairUnit.getColumnFamilies()).size() >0; + return (newRepairUnit.getColumnFamilies().isEmpty() && existingRepairUnit.getColumnFamilies().isEmpty()) + || newRepairUnit.getColumnFamilies().isEmpty() && !existingRepairUnit.getColumnFamilies().isEmpty() + || !newRepairUnit.getColumnFamilies().isEmpty() && existingRepairUnit.getColumnFamilies().isEmpty() + || !Sets.intersection(existingRepairUnit.getColumnFamilies(),newRepairUnit.getColumnFamilies()).isEmpty(); } @@ -331,7 +332,7 @@ public static RepairUnit getNewOrExistingRepairUnit(AppContext context, Cluster cassandraVersion = Optional.fromNullable(jmxProxy.getCassandraVersion()); break; } catch (ReaperException e) { - LOG.warn("couldn't connect to host: {}, will try next one", host); + LOG.warn("couldn't connect to host: {}, will try next one", host, e); } } if(cassandraVersion.isPresent() && cassandraVersion.get().startsWith("2.0") && incrementalRepair){ diff --git a/src/main/java/com/spotify/reaper/resources/PingResource.java b/src/main/java/com/spotify/reaper/resources/PingResource.java index 025d3bf9b..37cf452fd 100644 --- a/src/main/java/com/spotify/reaper/resources/PingResource.java +++ b/src/main/java/com/spotify/reaper/resources/PingResource.java @@ -30,7 +30,7 @@ public class PingResource { @GET public String answerPing() { LOG.debug("ping called"); - return String.format("Cassandra Reaper ping resource: PONG"); + return "Cassandra Reaper ping resource: PONG"; } } diff --git a/src/main/java/com/spotify/reaper/resources/RepairRunResource.java b/src/main/java/com/spotify/reaper/resources/RepairRunResource.java index 58495f451..1afa08f62 100644 --- a/src/main/java/com/spotify/reaper/resources/RepairRunResource.java +++ b/src/main/java/com/spotify/reaper/resources/RepairRunResource.java @@ -44,6 +44,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; +import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.spotify.reaper.AppContext; @@ -109,7 +110,7 @@ public Response addRepairRun( intensity = Double.parseDouble(intensityStr.get()); } else { intensity = context.config.getRepairIntensity(); - LOG.debug("no intensity given, so using default value: " + intensity); + LOG.debug("no intensity given, so using default value: {}", intensity); } Boolean incrementalRepair; @@ -117,7 +118,7 @@ public Response addRepairRun( incrementalRepair = Boolean.parseBoolean(incrementalRepairStr.get()); } else { incrementalRepair = context.config.getIncrementalRepair(); - LOG.debug("no incremental repair given, so using default value: " + incrementalRepair); + LOG.debug("no incremental repair given, so using default value: {}", incrementalRepair); } @@ -143,6 +144,7 @@ public Response addRepairRun( tableNames = CommonTools.getTableNamesBasedOnParam(context, cluster, keyspace.get(), tableNamesParam); } catch (IllegalArgumentException ex) { + LOG.error(ex.getMessage(), ex); return Response.status(Response.Status.NOT_FOUND).entity(ex.getMessage()).build(); } @@ -174,8 +176,7 @@ public Response addRepairRun( .entity(new RepairRunStatus(newRepairRun, theRepairUnit, 0)).build(); } catch (ReaperException e) { - LOG.error(e.getMessage()); - e.printStackTrace(); + LOG.error(e.getMessage(), e); return Response.status(500).entity(e.getMessage()).build(); } } @@ -208,6 +209,7 @@ public static Response checkRequestForAddRepair( try { ReaperApplication.checkRepairParallelismString(repairParallelism.get()); } catch (ReaperException ex) { + LOG.error(ex.getMessage(), ex); return Response.status(Response.Status.BAD_REQUEST).entity(ex.getMessage()).build(); } } @@ -220,6 +222,7 @@ public static Response checkRequestForAddRepair( + intensityStr.get()).build(); } } catch (NumberFormatException ex) { + LOG.error(ex.getMessage(), ex); return Response.status(Response.Status.BAD_REQUEST).entity( "invalid value for query parameter \"intensity\": " + intensityStr.get()).build(); } @@ -244,13 +247,14 @@ public static Response checkRequestForAddRepair( * * @return OK if all goes well NOT_MODIFIED if new state is the same as the old one, and 501 * (NOT_IMPLEMENTED) if transition is not supported. + * @throws ReaperException */ @PUT @Path("/{id}") public Response modifyRunState( @Context UriInfo uriInfo, @PathParam("id") Long repairRunId, - @QueryParam("state") Optional state) { + @QueryParam("state") Optional state) throws ReaperException { LOG.info("modify repair run state called with: id = {}, state = {}", repairRunId, state); @@ -306,9 +310,7 @@ public Response modifyRunState( return startRun(repairRun.get(), repairUnit.get(), segmentsRepaired); } else if (isPausing(oldState, newState)) { return pauseRun(repairRun.get(), repairUnit.get(), segmentsRepaired); - } else if (isResuming(oldState, newState)) { - return resumeRun(repairRun.get(), repairUnit.get(), segmentsRepaired); - } else if (isRetrying(oldState, newState)) { + } else if (isResuming(oldState, newState) || isRetrying(oldState, newState)) { return resumeRun(repairRun.get(), repairUnit.get(), segmentsRepaired); } else if (isAborting(oldState, newState)) { return abortRun(repairRun.get(), repairUnit.get(), segmentsRepaired); @@ -340,7 +342,7 @@ private boolean isAborting(RepairRun.RunState oldState, RepairRun.RunState newSt return oldState != RepairRun.RunState.ERROR && newState == RepairRun.RunState.ABORTED; } - private Response startRun(RepairRun repairRun, RepairUnit repairUnit, int segmentsRepaired) { + private Response startRun(RepairRun repairRun, RepairUnit repairUnit, int segmentsRepaired) throws ReaperException { LOG.info("Starting run {}", repairRun.getId()); RepairRun newRun = context.repairManager.startRepairRun(context, repairRun); return Response.status(Response.Status.OK).entity( @@ -348,19 +350,19 @@ private Response startRun(RepairRun repairRun, RepairUnit repairUnit, int segmen .build(); } - private Response pauseRun(RepairRun repairRun, RepairUnit repairUnit, int segmentsRepaired) { + private Response pauseRun(RepairRun repairRun, RepairUnit repairUnit, int segmentsRepaired) throws ReaperException { LOG.info("Pausing run {}", repairRun.getId()); RepairRun newRun = context.repairManager.pauseRepairRun(context, repairRun); return Response.ok().entity(new RepairRunStatus(newRun, repairUnit, segmentsRepaired)).build(); } - private Response resumeRun(RepairRun repairRun, RepairUnit repairUnit, int segmentsRepaired) { + private Response resumeRun(RepairRun repairRun, RepairUnit repairUnit, int segmentsRepaired) throws ReaperException { LOG.info("Resuming run {}", repairRun.getId()); RepairRun newRun = context.repairManager.startRepairRun(context, repairRun); return Response.ok().entity(new RepairRunStatus(newRun, repairUnit, segmentsRepaired)).build(); } - private Response abortRun(RepairRun repairRun, RepairUnit repairUnit, int segmentsRepaired) { + private Response abortRun(RepairRun repairRun, RepairUnit repairUnit, int segmentsRepaired) throws ReaperException { LOG.info("Aborting run {}", repairRun.getId()); RepairRun newRun = context.repairManager.abortRepairRun(context, repairRun); return Response.ok().entity(new RepairRunStatus(newRun, repairUnit, segmentsRepaired)).build(); @@ -402,7 +404,7 @@ public Response getRepairRunsForCluster(@PathParam("cluster_name") String cluste */ private RepairRunStatus getRepairRunStatus(RepairRun repairRun) { Optional repairUnit = context.storage.getRepairUnit(repairRun.getRepairUnitId()); - assert repairUnit.isPresent() : "no repair unit found with id: " + repairRun.getRepairUnitId(); + Preconditions.checkState(repairUnit.isPresent(), "no repair unit found with id: " + repairRun.getRepairUnitId()); int segmentsRepaired = context.storage.getSegmentAmountForRepairRunWithState(repairRun.getId(), RepairSegment.State.DONE); @@ -420,8 +422,7 @@ private URI buildRepairRunURI(UriInfo uriInfo, RepairRun repairRun) { try { runUri = new URL(uriInfo.getBaseUri().toURL(), newRepairRunPathPart).toURI(); } catch (MalformedURLException | URISyntaxException e) { - LOG.error(e.getMessage()); - e.printStackTrace(); + LOG.error(e.getMessage(), e); } checkNotNull(runUri, "failed to build repair run uri"); return runUri; @@ -474,7 +475,7 @@ public Set splitStateParam(Optional state) { try { RepairRun.RunState.valueOf(chunk.toUpperCase()); } catch (IllegalArgumentException e) { - LOG.warn("Listing repair runs called with erroneous states: {}", state.get()); + LOG.warn("Listing repair runs called with erroneous states: {}", state.get(), e); return null; } } diff --git a/src/main/java/com/spotify/reaper/resources/RepairScheduleResource.java b/src/main/java/com/spotify/reaper/resources/RepairScheduleResource.java index 746d5034a..21b943642 100644 --- a/src/main/java/com/spotify/reaper/resources/RepairScheduleResource.java +++ b/src/main/java/com/spotify/reaper/resources/RepairScheduleResource.java @@ -43,6 +43,7 @@ import org.slf4j.LoggerFactory; import com.google.common.base.Optional; +import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.spotify.reaper.AppContext; import com.spotify.reaper.ReaperException; @@ -106,21 +107,18 @@ public Response addRepairSchedule( try { nextActivation = DateTime.parse(scheduleTriggerTime.get()); } catch (IllegalArgumentException ex) { - LOG.info("cannot parse data string: " + scheduleTriggerTime.get()); + LOG.info("cannot parse data string: {}", scheduleTriggerTime.get(), ex); return Response.status(Response.Status.BAD_REQUEST).entity( "invalid schedule_trigger_time").build(); } - LOG.info("first schedule activation will be: " - + CommonTools.dateTimeToISO8601(nextActivation)); + LOG.info("first schedule activation will be: {}", CommonTools.dateTimeToISO8601(nextActivation)); } else { nextActivation = DateTime.now().plusDays(1).withTimeAtStartOfDay(); - LOG.info("no schedule_trigger_time given, so setting first scheduling next night: " - + CommonTools.dateTimeToISO8601(nextActivation)); + LOG.info("no schedule_trigger_time given, so setting first scheduling next night: {}", CommonTools.dateTimeToISO8601(nextActivation)); } if (nextActivation.isBeforeNow()) { return Response.status(Response.Status.BAD_REQUEST).entity( - "given schedule_trigger_time is in the past: " - + CommonTools.dateTimeToISO8601(nextActivation)).build(); + "given schedule_trigger_time is in the past: " + CommonTools.dateTimeToISO8601(nextActivation)).build(); } if(!scheduleDaysBetween.isPresent()) { @@ -133,7 +131,7 @@ public Response addRepairSchedule( intensity = Double.parseDouble(intensityStr.get()); } else { intensity = context.config.getRepairIntensity(); - LOG.debug("no intensity given, so using default value: " + intensity); + LOG.debug("no intensity given, so using default value: {}", intensity); } Boolean incrementalRepair; @@ -141,7 +139,7 @@ public Response addRepairSchedule( incrementalRepair = Boolean.parseBoolean(incrementalRepairStr.get()); } else { incrementalRepair = context.config.getIncrementalRepair(); - LOG.debug("no incremental repair given, so using default value: " + incrementalRepair); + LOG.debug("no incremental repair given, so using default value: {}", incrementalRepair); } int segments = context.config.getSegmentCount(); @@ -164,6 +162,7 @@ public Response addRepairSchedule( tableNames = CommonTools.getTableNamesBasedOnParam(context, cluster, keyspace.get(), tableNamesParam); } catch (IllegalArgumentException ex) { + LOG.error(ex.getMessage(), ex); return Response.status(Response.Status.NOT_FOUND).entity(ex.getMessage()).build(); } @@ -197,8 +196,7 @@ public Response addRepairSchedule( .entity(new RepairScheduleStatus(newRepairSchedule, theRepairUnit)).build(); } catch (ReaperException e) { - LOG.error(e.getMessage()); - e.printStackTrace(); + LOG.error(e.getMessage(), e); return Response.status(500).entity(e.getMessage()).build(); } } @@ -245,6 +243,7 @@ public Response modifyState( try { newState = RepairSchedule.State.valueOf(state.get().toUpperCase()); } catch (IllegalArgumentException ex) { + LOG.error(ex.getMessage(), ex); return Response.status(Response.Status.BAD_REQUEST.getStatusCode()) .entity("invalid \"state\" argument: " + state.get()).build(); } @@ -325,8 +324,8 @@ public Response getRepairSchedulesForCluster(@PathParam("cluster_name") String c private RepairScheduleStatus getRepairScheduleStatus(RepairSchedule repairSchedule) { Optional repairUnit = context.storage.getRepairUnit(repairSchedule.getRepairUnitId()); - assert repairUnit.isPresent() : "no repair unit found with id: " - + repairSchedule.getRepairUnitId(); + Preconditions.checkState(repairUnit.isPresent(), "no repair unit found with id: " + + repairSchedule.getRepairUnitId()); return new RepairScheduleStatus(repairSchedule, repairUnit.get()); } @@ -341,8 +340,7 @@ private URI buildRepairScheduleURI(UriInfo uriInfo, RepairSchedule repairSchedul try { scheduleUri = new URL(uriInfo.getBaseUri().toURL(), newRepairSchedulePathPart).toURI(); } catch (MalformedURLException | URISyntaxException e) { - LOG.error(e.getMessage()); - e.printStackTrace(); + LOG.error(e.getMessage(), e); } checkNotNull(scheduleUri, "failed to build repair schedule uri"); return scheduleUri; diff --git a/src/main/java/com/spotify/reaper/resources/view/RepairRunStatus.java b/src/main/java/com/spotify/reaper/resources/view/RepairRunStatus.java index 6fb84268b..d11d32e38 100644 --- a/src/main/java/com/spotify/reaper/resources/view/RepairRunStatus.java +++ b/src/main/java/com/spotify/reaper/resources/view/RepairRunStatus.java @@ -130,9 +130,7 @@ public RepairRunStatus(long runId, String clusterName, String keyspaceName, estimatedTimeOfArrival = null; } else { if (state == RepairRun.RunState.ERROR || state == RepairRun.RunState.DELETED || - state == RepairRun.RunState.ABORTED ) { - estimatedTimeOfArrival = null; - } else if (segmentsRepaired == 0) { + state == RepairRun.RunState.ABORTED || segmentsRepaired == 0) { estimatedTimeOfArrival = null; } else { diff --git a/src/main/java/com/spotify/reaper/service/RepairManager.java b/src/main/java/com/spotify/reaper/service/RepairManager.java index 87ef18242..c32af3f3f 100644 --- a/src/main/java/com/spotify/reaper/service/RepairManager.java +++ b/src/main/java/com/spotify/reaper/service/RepairManager.java @@ -11,6 +11,7 @@ import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import com.google.common.collect.Maps; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningScheduledExecutorService; @@ -51,8 +52,9 @@ public void initializeThreadPool(int threadAmount, long repairTimeout, * Consult storage to see if any repairs are running, and resume those repair runs. * * @param context Reaper's application context. + * @throws ReaperException */ - public void resumeRunningRepairRuns(AppContext context) { + public void resumeRunningRepairRuns(AppContext context) throws ReaperException { Collection running = context.storage.getRepairRunsWithState(RepairRun.RunState.RUNNING); for (RepairRun repairRun : running) { @@ -64,7 +66,7 @@ public void resumeRunningRepairRuns(AppContext context) { SegmentRunner.abort(context, segment, jmxProxy); } catch (ReaperException e) { LOG.debug("Tried to abort repair on segment {} marked as RUNNING, but the host was down" - + " (so abortion won't be needed)", segment.getId()); + + " (so abortion won't be needed)", segment.getId(), e); SegmentRunner.postpone(context, segment, context.storage.getRepairUnit(repairRun.getId())); } } @@ -77,7 +79,7 @@ public void resumeRunningRepairRuns(AppContext context) { } } - public RepairRun startRepairRun(AppContext context, RepairRun runToBeStarted) { + public RepairRun startRepairRun(AppContext context, RepairRun runToBeStarted) throws ReaperException { assert null != executor : "you need to initialize the thread pool first"; long runId = runToBeStarted.getId(); LOG.info("Starting a run with id #{} with current state '{}'", @@ -89,7 +91,7 @@ public RepairRun startRepairRun(AppContext context, RepairRun runToBeStarted) { .startTime(DateTime.now()) .build(runToBeStarted.getId()); if (!context.storage.updateRepairRun(updatedRun)) { - throw new RuntimeException("failed updating repair run " + updatedRun.getId()); + throw new ReaperException("failed updating repair run " + updatedRun.getId()); } startRunner(context, runId); return updatedRun; @@ -100,14 +102,14 @@ public RepairRun startRepairRun(AppContext context, RepairRun runToBeStarted) { .pauseTime(null) .build(runToBeStarted.getId()); if (!context.storage.updateRepairRun(updatedRun)) { - throw new RuntimeException("failed updating repair run " + updatedRun.getId()); + throw new ReaperException("failed updating repair run " + updatedRun.getId()); } return updatedRun; } case RUNNING: - assert !repairRunners.containsKey(runId) : - "trying to re-trigger run that is already running, with id " + runId; - LOG.info("re-trigger a running run after restart, with id " + runId); + Preconditions.checkState(!repairRunners.containsKey(runId), + "trying to re-trigger run that is already running, with id " + runId); + LOG.info("re-trigger a running run after restart, with id {}", runId); startRunner(context, runId); return runToBeStarted; case ERROR: { @@ -116,13 +118,13 @@ public RepairRun startRepairRun(AppContext context, RepairRun runToBeStarted) { .endTime(null) .build(runToBeStarted.getId()); if (!context.storage.updateRepairRun(updatedRun)) { - throw new RuntimeException("failed updating repair run " + updatedRun.getId()); + throw new ReaperException("failed updating repair run " + updatedRun.getId()); } startRunner(context, runId); return updatedRun; } default: - throw new RuntimeException("cannot start run with state: " + runToBeStarted.getRunState()); + throw new ReaperException("cannot start run with state: " + runToBeStarted.getRunState()); } } @@ -134,8 +136,7 @@ private void startRunner(AppContext context, long runId) { repairRunners.put(runId, newRunner); executor.submit(newRunner); } catch (ReaperException e) { - e.printStackTrace(); - LOG.warn("Failed to schedule repair for repair run #{}", runId); + LOG.warn("Failed to schedule repair for repair run #{}", runId, e); } } else { LOG.error( @@ -144,24 +145,24 @@ private void startRunner(AppContext context, long runId) { } } - public RepairRun pauseRepairRun(AppContext context, RepairRun runToBePaused) { + public RepairRun pauseRepairRun(AppContext context, RepairRun runToBePaused) throws ReaperException { RepairRun updatedRun = runToBePaused.with() .runState(RepairRun.RunState.PAUSED) .pauseTime(DateTime.now()) .build(runToBePaused.getId()); if (!context.storage.updateRepairRun(updatedRun)) { - throw new RuntimeException("failed updating repair run " + updatedRun.getId()); + throw new ReaperException("failed updating repair run " + updatedRun.getId()); } return updatedRun; } - public RepairRun abortRepairRun(AppContext context, RepairRun runToBeAborted) { + public RepairRun abortRepairRun(AppContext context, RepairRun runToBeAborted) throws ReaperException { RepairRun updatedRun = runToBeAborted.with() .runState(RepairRun.RunState.ABORTED) .pauseTime(DateTime.now()) .build(runToBeAborted.getId()); if (!context.storage.updateRepairRun(updatedRun)) { - throw new RuntimeException("failed updating repair run " + updatedRun.getId()); + throw new ReaperException("failed updating repair run " + updatedRun.getId()); } return updatedRun; } diff --git a/src/main/java/com/spotify/reaper/service/RepairRunner.java b/src/main/java/com/spotify/reaper/service/RepairRunner.java index 7c658905a..5c3dd9f96 100644 --- a/src/main/java/com/spotify/reaper/service/RepairRunner.java +++ b/src/main/java/com/spotify/reaper/service/RepairRunner.java @@ -145,8 +145,8 @@ public void run() { if ((!repairRun.isPresent() || repairRun.get().getRunState().isTerminated()) && context.repairManager.repairRunners.containsKey(repairRunId)) { // this might happen if a run is deleted while paused etc. - LOG.warn("RepairRun \"" + repairRunId + "\" does not exist. Killing " - + "RepairRunner for this run instance."); + LOG.warn("RepairRun \"{}\" does not exist. Killing " + + "RepairRunner for this run instance.", repairRunId); killAndCleanupRunner(); return; } @@ -164,9 +164,7 @@ public void run() { break; } } catch (RuntimeException | ReaperException e) { - LOG.error("RepairRun FAILURE, scheduling retry"); - LOG.error(e.toString()); - LOG.error(Arrays.toString(e.getStackTrace())); + LOG.error("RepairRun FAILURE, scheduling retry", e); context.repairManager.scheduleRetry(this); } // Adding this here to catch a deadlock @@ -312,8 +310,7 @@ private boolean repairSegment(final int rangeIndex, final long segmentId, RingRa try { confirmJMXConnectionIsOpen(); } catch (ReaperException e) { - e.printStackTrace(); - LOG.warn("Failed to reestablish JMX connection in runner {}, retrying", repairRunId); + LOG.warn("Failed to reestablish JMX connection in runner {}, retrying", repairRunId, e); currentlyRunningSegments.set(rangeIndex, -1); return true; } @@ -336,7 +333,7 @@ private boolean repairSegment(final int rangeIndex, final long segmentId, RingRa context.storage.updateRepairRun(repairRun .with() .runState(RepairRun.RunState.ERROR) - .lastEvent(String.format("No coordinators for range %s", tokenRange.toString())) + .lastEvent(String.format("No coordinators for range %s", tokenRange)) .endTime(DateTime.now()) .build(repairRunId)); killAndCleanupRunner(); @@ -363,7 +360,7 @@ public void onSuccess(Object ignored) { @Override public void onFailure(Throwable t) { currentlyRunningSegments.set(rangeIndex, -1); - LOG.error("Executing SegmentRunner failed: " + t.getMessage()); + LOG.error("Executing SegmentRunner failed: {}", t.getMessage()); } }); diff --git a/src/main/java/com/spotify/reaper/service/SchedulingManager.java b/src/main/java/com/spotify/reaper/service/SchedulingManager.java index 17de80e87..a2197d58d 100644 --- a/src/main/java/com/spotify/reaper/service/SchedulingManager.java +++ b/src/main/java/com/spotify/reaper/service/SchedulingManager.java @@ -30,7 +30,7 @@ public static void start(AppContext context) { LOG.info("Starting new SchedulingManager instance"); schedulingManager = new SchedulingManager(context); Timer timer = new Timer("SchedulingManagerTimer"); - timer.schedule(schedulingManager, 1000, 1000 * 60); + timer.schedule(schedulingManager, 1000l, 1000l * 60); } else { LOG.warn("there is already one instance of SchedulingManager running, not starting new one"); } @@ -42,7 +42,7 @@ public static RepairSchedule pauseRepairSchedule(AppContext context, RepairSched .pauseTime(DateTime.now()) .build(schedule.getId()); if (!context.storage.updateRepairSchedule(updatedSchedule)) { - LOG.error("failed updating repair schedule " + updatedSchedule.getId()); + LOG.error("failed updating repair schedule {}", updatedSchedule.getId()); } return updatedSchedule; } @@ -53,7 +53,7 @@ public static RepairSchedule resumeRepairSchedule(AppContext context, RepairSche .pauseTime(null) .build(schedule.getId()); if (!context.storage.updateRepairSchedule(updatedSchedule)) { - LOG.error("failed updating repair schedule " + updatedSchedule.getId()); + LOG.error("failed updating repair schedule {}", updatedSchedule.getId()); } return updatedSchedule; } @@ -110,7 +110,7 @@ private boolean manageSchedule(RepairSchedule schedule) { Optional fetchedUnit = context.storage.getRepairUnit(schedule.getRepairUnitId()); if (!fetchedUnit.isPresent()) { - LOG.warn("RepairUnit with id " + schedule.getRepairUnitId() + " not found"); + LOG.warn("RepairUnit with id {} not found", schedule.getRepairUnitId()); return false; } repairUnit = fetchedUnit.get(); @@ -138,8 +138,7 @@ private boolean manageSchedule(RepairSchedule schedule) { .build(schedule.getId())); return true; } catch (ReaperException e) { - LOG.error(e.getMessage()); - e.printStackTrace(); + LOG.error(e.getMessage(), e); skipScheduling(schedule); return false; } @@ -148,9 +147,7 @@ private boolean manageSchedule(RepairSchedule schedule) { return false; } } else { - if (nextActivatedSchedule == null) { - nextActivatedSchedule = schedule; - } else if (nextActivatedSchedule.getNextActivation().isAfter(schedule.getNextActivation())) { + if (nextActivatedSchedule == null || nextActivatedSchedule.getNextActivation().isAfter(schedule.getNextActivation())) { nextActivatedSchedule = schedule; } } diff --git a/src/main/java/com/spotify/reaper/service/SegmentRunner.java b/src/main/java/com/spotify/reaper/service/SegmentRunner.java index c83a95a08..834f5c899 100644 --- a/src/main/java/com/spotify/reaper/service/SegmentRunner.java +++ b/src/main/java/com/spotify/reaper/service/SegmentRunner.java @@ -147,7 +147,7 @@ private void runRepair() { .connectAny(Optional.of(this), potentialCoordinators)) { if (segmentRunners.containsKey(segmentId)) { - LOG.error("SegmentRunner already exists for segment with ID: " + segmentId); + LOG.error("SegmentRunner already exists for segment with ID: {}", segmentId); throw new ReaperException("SegmentRunner already exists for segment with ID: " + segmentId); } segmentRunners.put(segmentId, this); @@ -208,7 +208,7 @@ protected Set initialize() { try { condition.await(timeoutMillis, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { - LOG.warn("Repair command {} on segment {} interrupted", commandId, segmentId); + LOG.warn("Repair command {} on segment {} interrupted", commandId, segmentId, e); } finally { RepairSegment resultingSegment = context.storage.getRepairSegment(segmentId).get(); LOG.info("Repair command {} on segment {} returned with state {}", commandId, segmentId, @@ -228,8 +228,7 @@ protected Set initialize() { } } } catch (ReaperException e) { - LOG.warn("Failed to connect to a coordinator node for segment {}", segmentId); - LOG.warn(e.getMessage()); + LOG.warn("Failed to connect to a coordinator node for segment {}", segmentId, e); String msg = "Postponed a segment because no coordinator was reachable"; repairRunner.updateLastEvent(msg); postponeCurrentSegment(); @@ -246,7 +245,7 @@ boolean canRepair(RepairSegment segment, String keyspace, JmxProxy coordinator, if(repairHasSegmentRunning(segment.getRunId())) { LOG.info("SegmentRunner declined to repair segment {} because only one segment is allowed " + "at once for incremental repairs", segmentId); - String msg = String.format("Postponed due to already running segment"); + String msg = "Postponed due to already running segment"; repairRunner.updateLastEvent(msg); return false; } @@ -292,7 +291,7 @@ boolean canRepair(RepairSegment segment, String keyspace, JmxProxy coordinator, } catch (ReaperException e) { if(!context.config.getAllowUnreachableNodes()) { LOG.warn("SegmentRunner declined to repair segment {} because one of the hosts ({}) could " - + "not be connected with", segmentId, hostName); + + "not be connected with", segmentId, hostName, e); String msg = String.format("Postponed due to inability to connect host %s", hostName); repairRunner.updateLastEvent(msg); return false; @@ -417,7 +416,7 @@ protected void tryClearSnapshots(String message) { jmx.clearSnapshot(repairId, keyspace); } catch (ReaperException e) { LOG.warn("Failed to clear snapshot after failed session for host {}, keyspace {}: {}", - involvedNode, keyspace, e.getMessage()); + involvedNode, keyspace, e.getMessage(), e); } } } diff --git a/src/main/java/com/spotify/reaper/storage/CassandraStorage.java b/src/main/java/com/spotify/reaper/storage/CassandraStorage.java index aa242c194..e5e88865c 100644 --- a/src/main/java/com/spotify/reaper/storage/CassandraStorage.java +++ b/src/main/java/com/spotify/reaper/storage/CassandraStorage.java @@ -149,7 +149,7 @@ public boolean addCluster(Cluster cluster) { try { session.execute(insertClusterPrepStmt.bind(cluster.getName(), cluster.getPartitioner(), cluster.getSeedHosts())); } catch (Exception e) { - LOG.warn("failed inserting cluster with name: {}", cluster.getName()); + LOG.warn("failed inserting cluster with name: {}", cluster.getName(), e); return false; } return true; @@ -220,7 +220,6 @@ public Optional getRepairRun(long id) { @Override public Collection getRepairRunsForCluster(String clusterName) { - Collection repairRuns = Lists.newArrayList(); List repairRunFutures = Lists.newArrayList(); // Grab all ids for the given cluster name @@ -230,9 +229,7 @@ public Collection getRepairRunsForCluster(String clusterName) { repairRunFutures.add(session.executeAsync(getRepairRunPrepStmt.bind(repairRunId))); } - repairRuns = getRepairRunsAsync(repairRunFutures); - - return repairRuns; + return getRepairRunsAsync(repairRunFutures); } @Override diff --git a/src/main/resources/db/cassandra/002_fault_tolerant_reaper.cql b/src/main/resources/db/cassandra/002_fault_tolerant_reaper.cql new file mode 100644 index 000000000..fcabb2c8c --- /dev/null +++ b/src/main/resources/db/cassandra/002_fault_tolerant_reaper.cql @@ -0,0 +1,9 @@ +-- +-- Upgrade for fault tolerance addons + +CREATE TABLE IF NOT EXISTS segment_leader ( + segment_id bigint PRIMARY KEY, + reaper_instance_id uuid, + reaper_instance_host text, + last_heartbeat timestamp +) WITH default_time_to_live = 180; diff --git a/src/test/java/com/spotify/reaper/unit/resources/RepairRunResourceTest.java b/src/test/java/com/spotify/reaper/unit/resources/RepairRunResourceTest.java index e3fefb2d4..aa02fcf75 100644 --- a/src/test/java/com/spotify/reaper/unit/resources/RepairRunResourceTest.java +++ b/src/test/java/com/spotify/reaper/unit/resources/RepairRunResourceTest.java @@ -172,7 +172,7 @@ public void testAddRepairRun() throws Exception { } @Test - public void testTriggerNotExistingRun() { + public void testTriggerNotExistingRun() throws ReaperException { RepairRunResource resource = new RepairRunResource(context); Optional newState = Optional.of(RepairRun.RunState.RUNNING.toString()); Response response = resource.modifyRunState(uriInfo, 42l, newState); @@ -182,7 +182,7 @@ public void testTriggerNotExistingRun() { } @Test - public void testTriggerAlreadyRunningRun() throws InterruptedException { + public void testTriggerAlreadyRunningRun() throws InterruptedException, ReaperException { DateTimeUtils.setCurrentMillisFixed(TIME_CREATE); context.repairManager.initializeThreadPool(THREAD_CNT, REPAIR_TIMEOUT_S, TimeUnit.SECONDS, RETRY_DELAY_S, TimeUnit.SECONDS); @@ -200,7 +200,7 @@ public void testTriggerAlreadyRunningRun() throws InterruptedException { } @Test - public void testTriggerNewRunAlreadyRunningRun() throws InterruptedException { + public void testTriggerNewRunAlreadyRunningRun() throws InterruptedException, ReaperException { DateTimeUtils.setCurrentMillisFixed(TIME_CREATE); context.repairManager.initializeThreadPool(THREAD_CNT, REPAIR_TIMEOUT_S, TimeUnit.SECONDS, RETRY_DELAY_S, TimeUnit.SECONDS); @@ -261,7 +261,7 @@ public void testTriggerRunMissingArgument() { } @Test - public void testPauseNotRunningRun() throws InterruptedException { + public void testPauseNotRunningRun() throws InterruptedException, ReaperException { DateTimeUtils.setCurrentMillisFixed(TIME_CREATE); context.repairManager.initializeThreadPool(THREAD_CNT, REPAIR_TIMEOUT_S, TimeUnit.SECONDS, RETRY_DELAY_S, TimeUnit.SECONDS); @@ -285,7 +285,7 @@ public void testPauseNotRunningRun() throws InterruptedException { } @Test - public void testPauseNotExistingRun() throws InterruptedException { + public void testPauseNotExistingRun() throws InterruptedException, ReaperException { RepairRunResource resource = new RepairRunResource(context); Response response = resource.modifyRunState(uriInfo, 42l, Optional.of(RepairRun.RunState.PAUSED.toString())); diff --git a/src/test/java/com/spotify/reaper/unit/service/RepairRunnerTest.java b/src/test/java/com/spotify/reaper/unit/service/RepairRunnerTest.java index 0789aa5ae..8f1e67128 100644 --- a/src/test/java/com/spotify/reaper/unit/service/RepairRunnerTest.java +++ b/src/test/java/com/spotify/reaper/unit/service/RepairRunnerTest.java @@ -71,7 +71,7 @@ public void setUp() throws Exception { } @Test - public void testHangingRepair() throws InterruptedException { + public void testHangingRepair() throws InterruptedException, ReaperException { final String CLUSTER_NAME = "reaper"; final String KS_NAME = "reaper"; final Set CF_NAMES = Sets.newHashSet("reaper"); @@ -177,7 +177,7 @@ public void run() { } @Test - public void testResumeRepair() throws InterruptedException { + public void testResumeRepair() throws InterruptedException, ReaperException { final String CLUSTER_NAME = "reaper"; final String KS_NAME = "reaper"; final Set CF_NAMES = Sets.newHashSet("reaper");