repairUnit = context.storage.getRepairUnitDao().getRepairUnit(params);
if (repairUnit.isPresent()) {
return repairUnit;
@@ -211,7 +224,12 @@ boolean identicalUnits(Cluster cluster, RepairUnit unit, RepairUnit.Builder buil
// if incremental repair is not the same, the units are not identical
if (unit.getIncrementalRepair() != builder.incrementalRepair.booleanValue()) {
- // incremental reapir is not the same
+ // incremental repair is not the same
+ return false;
+ }
+
+ if (unit.getSubrangeIncrementalRepair() != builder.subrangeIncrementalRepair.booleanValue()) {
+ // subrange incremental repair is not the same
return false;
}
diff --git a/src/server/src/main/java/io/cassandrareaper/service/SegmentRunner.java b/src/server/src/main/java/io/cassandrareaper/service/SegmentRunner.java
index f22e765e4..68bf551e8 100644
--- a/src/server/src/main/java/io/cassandrareaper/service/SegmentRunner.java
+++ b/src/server/src/main/java/io/cassandrareaper/service/SegmentRunner.java
@@ -22,6 +22,7 @@
import io.cassandrareaper.ReaperException;
import io.cassandrareaper.core.Cluster;
import io.cassandrareaper.core.RepairSegment;
+import io.cassandrareaper.core.RepairType;
import io.cassandrareaper.core.RepairUnit;
import io.cassandrareaper.management.ClusterFacade;
import io.cassandrareaper.management.ICassandraManagementProxy;
@@ -121,7 +122,9 @@ private SegmentRunner(
this.repairUnit = repairUnit;
this.repairRunner = repairRunner;
this.segmentFailed = new AtomicBoolean(false);
- this.leaderElectionId = repairUnit.getIncrementalRepair() ? repairRunner.getRepairRunId() : segmentId;
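+ // only classic incremental repairs elect a leader per run; subrange incremental locks per segment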
+ this.leaderElectionId = repairUnit.getIncrementalRepair() && !repairUnit.getSubrangeIncrementalRepair()
+ ? repairRunner.getRepairRunId()
+ : segmentId;
this.tablesToRepair = tablesToRepair;
}
@@ -159,7 +162,9 @@ static void postponeSegment(AppContext context, RepairSegment segment) {
= segment
.reset()
- // set coordinator host to null only for full repairs
+ // keep the coordinator host only for classic incremental repairs
- .withCoordinatorHost(unit.getIncrementalRepair() ? segment.getCoordinatorHost() : null)
+ .withCoordinatorHost(unit.getIncrementalRepair() && !unit.getSubrangeIncrementalRepair()
+ ? segment.getCoordinatorHost()
+ : null)
.withFailCount(segment.getFailCount() + 1)
.withId(segment.getId())
.build();
@@ -174,7 +179,9 @@ private static void postpone(AppContext context, RepairSegment segment, RepairUn
segment
.reset()
- // set coordinator host to null only for full repairs
+ // keep the coordinator host only for classic incremental repairs
- .withCoordinatorHost(repairUnit.getIncrementalRepair() ? segment.getCoordinatorHost() : null)
+ .withCoordinatorHost(repairUnit.getIncrementalRepair() && !repairUnit.getSubrangeIncrementalRepair()
+ ? segment.getCoordinatorHost()
+ : null)
.withFailCount(segment.getFailCount() + 1)
.withId(segment.getId())
.build());
@@ -298,7 +305,8 @@ private boolean runRepair() {
Cluster cluster = context.storage.getClusterDao().getCluster(clusterName);
ICassandraManagementProxy coordinator = clusterFacade.connect(cluster, potentialCoordinators);
String keyspace = repairUnit.getKeyspaceName();
- boolean fullRepair = !repairUnit.getIncrementalRepair();
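+ // derive the repair type to pass down from the unit's incremental and subrange flags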
+ RepairType repairType = repairUnit.getSubrangeIncrementalRepair() ? RepairType.SUBRANGE_INCREMENTAL
+ : repairUnit.getIncrementalRepair() ? RepairType.INCREMENTAL : RepairType.SUBRANGE_FULL;
try (Timer.Context cxt1 = context.metricRegistry.timer(metricNameForRepairing(segment)).time()) {
try {
@@ -321,7 +329,7 @@ private boolean runRepair() {
keyspace,
validationParallelism,
tablesToRepair,
- fullRepair,
+ repairType,
repairUnit.getDatacenters(),
this,
segment.getTokenRange().getTokenRanges(),
@@ -371,7 +379,7 @@ private void processTriggeredSegment(final RepairSegment segment, final ICassand
// Timeout is extended for each attempt to prevent repairs from blocking if settings aren't accurate
int attempt = segment.getFailCount() + 1;
- long segmentTimeout = repairUnit.getIncrementalRepair()
+ long segmentTimeout = repairUnit.getIncrementalRepair() && !repairUnit.getSubrangeIncrementalRepair()
? timeoutMillis * MAX_TIMEOUT_EXTENSIONS * attempt
: timeoutMillis * attempt;
LOG.info("Repair for segment {} started, status wait will timeout in {} millis", segmentId, segmentTimeout);
@@ -856,7 +864,7 @@ private boolean takeLead(RepairSegment segment) {
= context.metricRegistry.timer(MetricRegistry.name(SegmentRunner.class, "takeLead")).time()) {
boolean result = false;
- if (repairUnit.getIncrementalRepair()) {
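+ // run-wide leadership only applies to classic incremental repairs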
+ if (repairUnit.getIncrementalRepair() && !repairUnit.getSubrangeIncrementalRepair()) {
result = context.storage instanceof IDistributedStorage
? ((IDistributedStorage) context.storage).takeLead(leaderElectionId)
: true;
@@ -877,7 +885,7 @@ private boolean renewLead(RepairSegment segment) {
try (Timer.Context cx
= context.metricRegistry.timer(MetricRegistry.name(SegmentRunner.class, "renewLead")).time()) {
- if (repairUnit.getIncrementalRepair()) {
+ if (repairUnit.getIncrementalRepair() && !repairUnit.getSubrangeIncrementalRepair()) {
boolean result = context.storage instanceof IDistributedStorage
? ((IDistributedStorage) context.storage).renewLead(leaderElectionId)
: true;
@@ -905,7 +913,7 @@ private void releaseLead(RepairSegment segment) {
try (Timer.Context cx
= context.metricRegistry.timer(MetricRegistry.name(SegmentRunner.class, "releaseLead")).time()) {
if (context.storage instanceof IDistributedStorage) {
- if (repairUnit.getIncrementalRepair()) {
+ if (repairUnit.getIncrementalRepair() && !repairUnit.getSubrangeIncrementalRepair()) {
((IDistributedStorage) context.storage).releaseLead(leaderElectionId);
} else {
((IDistributedStorage) context.storage).releaseRunningRepairsForNodes(this.repairRunner.getRepairRunId(),
diff --git a/src/server/src/main/java/io/cassandrareaper/storage/cassandra/CassandraStorageFacade.java b/src/server/src/main/java/io/cassandrareaper/storage/cassandra/CassandraStorageFacade.java
index 7d2906865..13fb1c89e 100644
--- a/src/server/src/main/java/io/cassandrareaper/storage/cassandra/CassandraStorageFacade.java
+++ b/src/server/src/main/java/io/cassandrareaper/storage/cassandra/CassandraStorageFacade.java
@@ -454,7 +454,7 @@ public enum CassandraMode {
*
* Writes keep retrying forever.
*/
- private static class RetryPolicyImpl implements RetryPolicy {
+ private static final class RetryPolicyImpl implements RetryPolicy {
@Override
public RetryDecision onReadTimeout(
diff --git a/src/server/src/main/java/io/cassandrareaper/storage/metrics/CassandraMetricsDao.java b/src/server/src/main/java/io/cassandrareaper/storage/metrics/CassandraMetricsDao.java
index 73da3fa50..6f81174a1 100644
--- a/src/server/src/main/java/io/cassandrareaper/storage/metrics/CassandraMetricsDao.java
+++ b/src/server/src/main/java/io/cassandrareaper/storage/metrics/CassandraMetricsDao.java
@@ -89,7 +89,8 @@ void prepareMetricStatements() {
storePercentRepairedForSchedulePrepStmt = session
.prepare(
"INSERT INTO percent_repaired_by_schedule"
- + " (cluster_name, repair_schedule_id, time_bucket, node, keyspace_name, table_name, percent_repaired, ts)"
+ + " (cluster_name, repair_schedule_id, time_bucket, node, keyspace_name,"
+ + " table_name, percent_repaired, ts)"
+ " values(?, ?, ?, ?, ?, ?, ?, ?)"
);
@@ -207,7 +208,6 @@ public void purgeMetrics() {
@Override
public List<PercentRepairedMetric> getPercentRepairedMetrics(String clusterName, UUID repairScheduleId, Long since) {
- List<PercentRepairedMetric> metrics = Lists.newArrayList();
List<ResultSetFuture> futures = Lists.newArrayList();
List<String> timeBuckets = Lists.newArrayList();
long now = DateTime.now().getMillis();
@@ -229,6 +229,7 @@ public List<PercentRepairedMetric> getPercentRepairedMetrics(String clusterName,
timeBucket)));
}
+ List<PercentRepairedMetric> metrics = Lists.newArrayList();
long maxTimeBucket = 0;
for (ResultSetFuture future : futures) {
for (Row row : future.getUninterruptibly()) {
diff --git a/src/server/src/main/java/io/cassandrareaper/storage/repairrun/CassandraRepairRunDao.java b/src/server/src/main/java/io/cassandrareaper/storage/repairrun/CassandraRepairRunDao.java
index f1671d78d..fdd025c25 100644
--- a/src/server/src/main/java/io/cassandrareaper/storage/repairrun/CassandraRepairRunDao.java
+++ b/src/server/src/main/java/io/cassandrareaper/storage/repairrun/CassandraRepairRunDao.java
@@ -215,7 +215,6 @@ public RepairRun addRepairRun(
nbRanges = 0;
}
}
- assert cassRepairUnitDao.getRepairUnit(newRepairRun.getRepairUnitId()).getIncrementalRepair() == isIncremental;
futures.add(this.session.executeAsync(repairRunBatch));
futures.add(
diff --git a/src/server/src/main/java/io/cassandrareaper/storage/repairrun/MemoryRepairRunDao.java b/src/server/src/main/java/io/cassandrareaper/storage/repairrun/MemoryRepairRunDao.java
index 0f1dbffdb..ee3ccd366 100644
--- a/src/server/src/main/java/io/cassandrareaper/storage/repairrun/MemoryRepairRunDao.java
+++ b/src/server/src/main/java/io/cassandrareaper/storage/repairrun/MemoryRepairRunDao.java
@@ -84,6 +84,7 @@ public Collection<RepairRunStatus> getClusterRunStatuses(String clusterName, int
run.getPauseTime(),
run.getIntensity(),
unit.getIncrementalRepair(),
+ unit.getSubrangeIncrementalRepair(),
run.getRepairParallelism(),
unit.getNodes(),
unit.getDatacenters(),
diff --git a/src/server/src/main/java/io/cassandrareaper/storage/repairunit/CassandraRepairUnitDao.java b/src/server/src/main/java/io/cassandrareaper/storage/repairunit/CassandraRepairUnitDao.java
index f9a1eff2c..8ee41e81a 100644
--- a/src/server/src/main/java/io/cassandrareaper/storage/repairunit/CassandraRepairUnitDao.java
+++ b/src/server/src/main/java/io/cassandrareaper/storage/repairunit/CassandraRepairUnitDao.java
@@ -64,8 +64,9 @@ private void prepareStatements() {
insertRepairUnitPrepStmt = session
.prepare(
"INSERT INTO repair_unit_v1(id, cluster_name, keyspace_name, column_families, "
- + "incremental_repair, nodes, \"datacenters\", blacklisted_tables, repair_thread_count, timeout) "
- + "VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
+ + "incremental_repair, subrange_incremental, nodes, \"datacenters\", blacklisted_tables,"
+ + "repair_thread_count, timeout) "
+ + "VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
.setConsistencyLevel(ConsistencyLevel.QUORUM);
getRepairUnitPrepStmt = session
.prepare("SELECT * FROM repair_unit_v1 WHERE id = ?")
@@ -91,6 +92,7 @@ public void updateRepairUnit(RepairUnit updatedRepairUnit) {
updatedRepairUnit.getKeyspaceName(),
updatedRepairUnit.getColumnFamilies(),
updatedRepairUnit.getIncrementalRepair(),
+ updatedRepairUnit.getSubrangeIncrementalRepair(),
updatedRepairUnit.getNodes(),
updatedRepairUnit.getDatacenters(),
updatedRepairUnit.getBlacklistedTables(),
@@ -106,6 +108,7 @@ private RepairUnit getRepairUnitImpl(UUID id) {
.keyspaceName(repairUnitRow.getString("keyspace_name"))
.columnFamilies(repairUnitRow.getSet("column_families", String.class))
.incrementalRepair(repairUnitRow.getBool("incremental_repair"))
+ .subrangeIncrementalRepair(repairUnitRow.getBool("subrange_incremental"))
.nodes(repairUnitRow.getSet("nodes", String.class))
.datacenters(repairUnitRow.getSet("datacenters", String.class))
.blacklistedTables(repairUnitRow.getSet("blacklisted_tables", String.class))
@@ -134,6 +137,7 @@ public Optional<RepairUnit> getRepairUnit(RepairUnit.Builder params) {
.keyspaceName(repairUnitRow.getString("keyspace_name"))
.columnFamilies(repairUnitRow.getSet("column_families", String.class))
.incrementalRepair(repairUnitRow.getBool("incremental_repair"))
+ .subrangeIncrementalRepair(repairUnitRow.getBool("subrange_incremental"))
.nodes(repairUnitRow.getSet("nodes", String.class))
.datacenters(repairUnitRow.getSet("datacenters", String.class))
.blacklistedTables(repairUnitRow.getSet("blacklisted_tables", String.class))
diff --git a/src/server/src/main/java/io/cassandrareaper/storage/repairunit/MemoryRepairUnitDao.java b/src/server/src/main/java/io/cassandrareaper/storage/repairunit/MemoryRepairUnitDao.java
index 16a9964e1..d845b03ad 100644
--- a/src/server/src/main/java/io/cassandrareaper/storage/repairunit/MemoryRepairUnitDao.java
+++ b/src/server/src/main/java/io/cassandrareaper/storage/repairunit/MemoryRepairUnitDao.java
@@ -43,7 +43,8 @@ public MemoryRepairUnitDao(MemoryStorageFacade storage) {
@Override
public RepairUnit addRepairUnit(RepairUnit.Builder repairUnitBuilder) {
Optional<RepairUnit> existing = getRepairUnit(repairUnitBuilder);
- if (existing.isPresent() && repairUnitBuilder.incrementalRepair == existing.get().getIncrementalRepair()) {
+ if (existing.isPresent() && repairUnitBuilder.incrementalRepair == existing.get().getIncrementalRepair()
+ && repairUnitBuilder.subrangeIncrementalRepair == existing.get().getSubrangeIncrementalRepair()) {
return existing.get();
} else {
RepairUnit newRepairUnit = repairUnitBuilder.build(UUIDs.timeBased());
diff --git a/src/server/src/main/resources/db/astra/009_subrange_incremental.cql b/src/server/src/main/resources/db/astra/009_subrange_incremental.cql
new file mode 100644
index 000000000..49d40eb13
--- /dev/null
+++ b/src/server/src/main/resources/db/astra/009_subrange_incremental.cql
@@ -0,0 +1,18 @@
+--
+-- Copyright 2024-2024 Datastax inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+-- Add a new column to the repair_unit_v1 table to enable subrange incremental repairs
+
+ALTER TABLE repair_unit_v1 ADD subrange_incremental boolean;
diff --git a/src/server/src/main/resources/db/cassandra/033_subrange_incremental.cql b/src/server/src/main/resources/db/cassandra/033_subrange_incremental.cql
new file mode 100644
index 000000000..49d40eb13
--- /dev/null
+++ b/src/server/src/main/resources/db/cassandra/033_subrange_incremental.cql
@@ -0,0 +1,18 @@
+--
+-- Copyright 2024-2024 Datastax inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+-- Add a new column to the repair_unit_v1 table to enable subrange incremental repairs
+
+ALTER TABLE repair_unit_v1 ADD subrange_incremental boolean;
diff --git a/src/server/src/test/java/io/cassandrareaper/ReaperApplicationConfigurationTest.java b/src/server/src/test/java/io/cassandrareaper/ReaperApplicationConfigurationTest.java
index 4645e4b0c..2eb079bdf 100644
--- a/src/server/src/test/java/io/cassandrareaper/ReaperApplicationConfigurationTest.java
+++ b/src/server/src/test/java/io/cassandrareaper/ReaperApplicationConfigurationTest.java
@@ -51,6 +51,7 @@ public void setUp() {
config.setScheduleDaysBetween(7);
config.setStorageType("foo");
config.setIncrementalRepair(false);
+ config.setSubrangeIncrementalRepair(false);
config.setBlacklistTwcsTables(true);
}
diff --git a/src/server/src/test/java/io/cassandrareaper/SimpleReaperClient.java b/src/server/src/test/java/io/cassandrareaper/SimpleReaperClient.java
index 3a2eea87f..211f77d75 100644
--- a/src/server/src/test/java/io/cassandrareaper/SimpleReaperClient.java
+++ b/src/server/src/test/java/io/cassandrareaper/SimpleReaperClient.java
@@ -34,7 +34,6 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
-
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.Entity;
@@ -48,7 +47,6 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.joda.JodaModule;
import com.google.common.collect.Maps;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/src/server/src/test/java/io/cassandrareaper/acceptance/BasicSteps.java b/src/server/src/test/java/io/cassandrareaper/acceptance/BasicSteps.java
index cb40d02a5..d1cf856b8 100644
--- a/src/server/src/test/java/io/cassandrareaper/acceptance/BasicSteps.java
+++ b/src/server/src/test/java/io/cassandrareaper/acceptance/BasicSteps.java
@@ -1336,6 +1336,27 @@ public void a_new_incremental_repair_is_added_for_the_last_added_cluster_and_key
}
}
+ @When("^a new subrange incremental repair is added for the last added cluster and keyspace \"([^\"]*)\"$")
+ public void a_new_subrange_incremental_repair_is_added_for_the_last_added_cluster_and_keyspace(String keyspace)
+ throws Throwable {
+ synchronized (BasicSteps.class) {
+ ReaperTestJettyRunner runner = RUNNERS.get(RAND.nextInt(RUNNERS.size()));
+ Map params = Maps.newHashMap();
+ params.put("clusterName", TestContext.TEST_CLUSTER);
+ params.put("keyspace", keyspace);
+ params.put("owner", TestContext.TEST_USER);
+ params.put("incrementalRepair", Boolean.FALSE.toString());
+ params.put("subrangeIncrementalRepair", Boolean.TRUE.toString());
+ Response response = runner.callReaper("POST", "/repair_run", Optional.of(params));
+ String responseData = response.readEntity(String.class);
+ assertEquals(responseData, Response.Status.CREATED.getStatusCode(), response.getStatus());
+ Assertions.assertThat(responseData).isNotBlank();
+ RepairRunStatus run = SimpleReaperClient.parseRepairRunStatusJSON(responseData);
+ Assertions.assertThat(run.getTotalSegments()).isGreaterThan(3);
+ testContext.addCurrentRepairId(run.getId());
+ }
+ }
+
@Then("^reaper has (\\d+) repairs for cluster called \"([^\"]*)\"$")
public void reaper_has_repairs_for_cluster_called(int expected, String clusterName) throws Throwable {
synchronized (BasicSteps.class) {
diff --git a/src/server/src/test/java/io/cassandrareaper/acceptance/ReaperCassandraEachIT.java b/src/server/src/test/java/io/cassandrareaper/acceptance/ReaperCassandraEachIT.java
index dc7519e94..f4598ef7d 100644
--- a/src/server/src/test/java/io/cassandrareaper/acceptance/ReaperCassandraEachIT.java
+++ b/src/server/src/test/java/io/cassandrareaper/acceptance/ReaperCassandraEachIT.java
@@ -45,10 +45,10 @@ public class ReaperCassandraEachIT {
private static final Logger LOG = LoggerFactory.getLogger(ReaperCassandraSidecarIT.class);
private static final List<ReaperTestJettyRunner> RUNNER_INSTANCES = new CopyOnWriteArrayList<>();
private static final String[] CASS_CONFIG_FILE
- = {
- "reaper-cassandra-each1-at.yaml",
- "reaper-cassandra-each2-at.yaml"
- };
+ = {
+ "reaper-cassandra-each1-at.yaml",
+ "reaper-cassandra-each2-at.yaml"
+ };
protected ReaperCassandraEachIT() {}
diff --git a/src/server/src/test/java/io/cassandrareaper/acceptance/ReaperCassandraIT.java b/src/server/src/test/java/io/cassandrareaper/acceptance/ReaperCassandraIT.java
index 0baaea5bd..d6be6a992 100644
--- a/src/server/src/test/java/io/cassandrareaper/acceptance/ReaperCassandraIT.java
+++ b/src/server/src/test/java/io/cassandrareaper/acceptance/ReaperCassandraIT.java
@@ -25,7 +25,6 @@
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.SocketOptions;
-
import cucumber.api.CucumberOptions;
import cucumber.api.junit.Cucumber;
import org.junit.AfterClass;
diff --git a/src/server/src/test/java/io/cassandrareaper/acceptance/ReaperCassandraSidecarIT.java b/src/server/src/test/java/io/cassandrareaper/acceptance/ReaperCassandraSidecarIT.java
index 834ec36db..59c6865d8 100644
--- a/src/server/src/test/java/io/cassandrareaper/acceptance/ReaperCassandraSidecarIT.java
+++ b/src/server/src/test/java/io/cassandrareaper/acceptance/ReaperCassandraSidecarIT.java
@@ -46,12 +46,12 @@ public class ReaperCassandraSidecarIT {
private static final Logger LOG = LoggerFactory.getLogger(ReaperCassandraSidecarIT.class);
private static final List<ReaperTestJettyRunner> RUNNER_INSTANCES = new CopyOnWriteArrayList<>();
private static final String[] CASS_CONFIG_FILE
- = {
- "reaper-cassandra-sidecar1-at.yaml",
- "reaper-cassandra-sidecar2-at.yaml",
- "reaper-cassandra-sidecar3-at.yaml",
- "reaper-cassandra-sidecar4-at.yaml",
- };
+ = {
+ "reaper-cassandra-sidecar1-at.yaml",
+ "reaper-cassandra-sidecar2-at.yaml",
+ "reaper-cassandra-sidecar3-at.yaml",
+ "reaper-cassandra-sidecar4-at.yaml",
+ };
protected ReaperCassandraSidecarIT() {}
diff --git a/src/server/src/test/java/io/cassandrareaper/acceptance/ReaperHttpIT.java b/src/server/src/test/java/io/cassandrareaper/acceptance/ReaperHttpIT.java
index cf6069766..c7a8e3ef7 100644
--- a/src/server/src/test/java/io/cassandrareaper/acceptance/ReaperHttpIT.java
+++ b/src/server/src/test/java/io/cassandrareaper/acceptance/ReaperHttpIT.java
@@ -25,7 +25,6 @@
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.SocketOptions;
-
import cucumber.api.CucumberOptions;
import cucumber.api.junit.Cucumber;
import org.junit.AfterClass;
diff --git a/src/server/src/test/java/io/cassandrareaper/acceptance/ReaperTestJettyRunner.java b/src/server/src/test/java/io/cassandrareaper/acceptance/ReaperTestJettyRunner.java
index 56ebcffd0..78e232f70 100644
--- a/src/server/src/test/java/io/cassandrareaper/acceptance/ReaperTestJettyRunner.java
+++ b/src/server/src/test/java/io/cassandrareaper/acceptance/ReaperTestJettyRunner.java
@@ -27,11 +27,9 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
-
import javax.ws.rs.core.Response;
import com.google.common.collect.Sets;
-
import io.dropwizard.testing.ConfigOverride;
import io.dropwizard.testing.DropwizardTestSupport;
import io.dropwizard.testing.ResourceHelpers;
diff --git a/src/server/src/test/java/io/cassandrareaper/acceptance/TestContext.java b/src/server/src/test/java/io/cassandrareaper/acceptance/TestContext.java
index 8f15db987..6979b1d08 100644
--- a/src/server/src/test/java/io/cassandrareaper/acceptance/TestContext.java
+++ b/src/server/src/test/java/io/cassandrareaper/acceptance/TestContext.java
@@ -27,7 +27,6 @@
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-
import javax.ws.rs.sse.InboundSseEvent;
import javax.ws.rs.sse.SseEventSource;
diff --git a/src/server/src/test/java/io/cassandrareaper/management/http/HttpCassandraManagementProxyTest.java b/src/server/src/test/java/io/cassandrareaper/management/http/HttpCassandraManagementProxyTest.java
index 8be9b697e..19f1ea7a7 100644
--- a/src/server/src/test/java/io/cassandrareaper/management/http/HttpCassandraManagementProxyTest.java
+++ b/src/server/src/test/java/io/cassandrareaper/management/http/HttpCassandraManagementProxyTest.java
@@ -22,6 +22,7 @@
import io.cassandrareaper.ReaperException;
import io.cassandrareaper.core.GenericMetric;
import io.cassandrareaper.core.Node;
+import io.cassandrareaper.core.RepairType;
import io.cassandrareaper.core.Snapshot;
import io.cassandrareaper.core.Table;
import io.cassandrareaper.management.RepairStatusHandler;
@@ -155,7 +156,12 @@ public void testRepairProcessMapHandlers() throws Exception {
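+ // full repairs are passed to the proxy as RepairType.SUBRANGE_FULL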
int repairNo = httpCassandraManagementProxy.triggerRepair("ks",
RepairParallelism.PARALLEL,
- Collections.singleton("table"), true, Collections.emptyList(), repairStatusHandler, Collections.emptyList(), 1);
+ Collections.singleton("table"),
+ RepairType.SUBRANGE_FULL,
+ Collections.emptyList(),
+ repairStatusHandler,
+ Collections.emptyList(),
+ 1);
assertEquals(123456789, repairNo);
assertEquals(1, httpCassandraManagementProxy.jobTracker.size());
@@ -192,7 +198,7 @@ public void testNotificationsTracker() throws Exception {
int repairNo = httpCassandraManagementProxy.triggerRepair("ks",
RepairParallelism.PARALLEL,
- Collections.singleton("table"), true, Collections.emptyList(), workAroundHandler,
+ Collections.singleton("table"), RepairType.SUBRANGE_FULL, Collections.emptyList(), workAroundHandler,
Collections.emptyList(), 1);
verify(mockClient).putRepairV2(eq(
diff --git a/src/server/src/test/java/io/cassandrareaper/management/http/HttpMetricsProxyTest.java b/src/server/src/test/java/io/cassandrareaper/management/http/HttpMetricsProxyTest.java
index ba224fd21..3a4f52573 100644
--- a/src/server/src/test/java/io/cassandrareaper/management/http/HttpMetricsProxyTest.java
+++ b/src/server/src/test/java/io/cassandrareaper/management/http/HttpMetricsProxyTest.java
@@ -28,7 +28,6 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
-
import javax.management.JMException;
import com.datastax.mgmtapi.client.api.DefaultApi;
@@ -38,7 +37,6 @@
import okhttp3.OkHttpClient;
import okhttp3.Response;
import okhttp3.ResponseBody;
-
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.internal.util.collections.Sets;
@@ -261,12 +259,29 @@ public void testParseThreadPoolMetric_withoutMatch() {
@SuppressWarnings("checkstyle:LineLength")
@Test
public void testCollectTpStats() throws IOException, JMException {
- String responseBodyStr = "org_apache_cassandra_metrics_thread_pools_pending_tasks{host=\"eae3481e-f803-49b5-8516-7ff271104db5\",instance=\"172.18.0.3\",cluster=\"test\",datacenter=\"dc1\",rack=\"default\",pod_name=\"test-dc1-default-sts-0\",node_name=\"mc-0-worker3\",pool_type=\"internal\",pool_name=\"TPC\",} 0.0\n"
- + "org_apache_cassandra_metrics_thread_pools_pending_tasks{host=\"eae3481e-f803-49b5-8516-7ff271104db5\",instance=\"172.18.0.3\",cluster=\"test\",datacenter=\"dc1\",rack=\"default\",pod_name=\"test-dc1-default-sts-0\",node_name=\"mc-0-worker3\",pool_type=\"internal\",pool_name=\"ValidationExecutor\",} 0.0\n"
- + "org_apache_cassandra_metrics_thread_pools_pending_tasks{host=\"eae3481e-f803-49b5-8516-7ff271104db5\",instance=\"172.18.0.3\",cluster=\"test\",datacenter=\"dc1\",rack=\"default\",pod_name=\"test-dc1-default-sts-0\",node_name=\"mc-0-worker3\",pool_type=\"internal\",pool_name=\"AntiCompactionExecutor\",} 0.0\n"
- + "org_apache_cassandra_metrics_thread_pools_pending_tasks{host=\"eae3481e-f803-49b5-8516-7ff271104db5\",instance=\"172.18.0.3\",cluster=\"test\",datacenter=\"dc1\",rack=\"default\",pod_name=\"test-dc1-default-sts-0\",node_name=\"mc-0-worker3\",pool_type=\"internal\",pool_name=\"AntiEntropyStage\",} 0.0\n"
- + "org_apache_cassandra_metrics_thread_pools_pending_tasks{host=\"eae3481e-f803-49b5-8516-7ff271104db5\",instance=\"172.18.0.3\",cluster=\"test\",datacenter=\"dc1\",rack=\"default\",pod_name=\"test-dc1-default-sts-0\",node_name=\"mc-0-worker3\",pool_type=\"internal\",pool_name=\"TPC\",} 0.0\n"
- + "org_apache_cassandra_metrics_keyspace_view_read_time_solr_admin_bucket{host=\"eae3481e-f803-49b5-8516-7ff271104db5\",instance=\"172.18.0.3\",cluster=\"test\",datacenter=\"dc1\",rack=\"default\",pod_name=\"test-dc1-default-sts-0\",node_name=\"mc-0-worker3\",le=\"3379391\",} 0.0\n";
+ String responseBodyStr = "org_apache_cassandra_metrics_thread_pools_pending_tasks{"
+ + "host=\"eae3481e-f803-49b5-8516-7ff271104db5\",instance=\"172.18.0.3\",cluster=\"test\","
+ + "datacenter=\"dc1\",rack=\"default\",pod_name=\"test-dc1-default-sts-0\",node_name=\"mc-0-worker3\","
+ + "pool_type=\"internal\",pool_name=\"TPC\",} 0.0\n"
+ + "org_apache_cassandra_metrics_thread_pools_pending_tasks{host=\"eae3481e-f803-49b5-8516-7ff271104db5\","
+ + "instance=\"172.18.0.3\",cluster=\"test\",datacenter=\"dc1\",rack=\"default\","
+ + "pod_name=\"test-dc1-default-sts-0\",node_name=\"mc-0-worker3\",pool_type=\"internal\","
+ + "pool_name=\"ValidationExecutor\",} 0.0\n"
+ + "org_apache_cassandra_metrics_thread_pools_pending_tasks{host=\"eae3481e-f803-49b5-8516-7ff271104db5\","
+ + "instance=\"172.18.0.3\",cluster=\"test\",datacenter=\"dc1\",rack=\"default\","
+ + "pod_name=\"test-dc1-default-sts-0\",node_name=\"mc-0-worker3\",pool_type=\"internal\","
+ + "pool_name=\"AntiCompactionExecutor\",} 0.0\n"
+ + "org_apache_cassandra_metrics_thread_pools_pending_tasks{host=\"eae3481e-f803-49b5-8516-7ff271104db5\","
+ + "instance=\"172.18.0.3\",cluster=\"test\",datacenter=\"dc1\",rack=\"default\","
+ + "pod_name=\"test-dc1-default-sts-0\",node_name=\"mc-0-worker3\",pool_type=\"internal\","
+ + "pool_name=\"AntiEntropyStage\",} 0.0\n"
+ + "org_apache_cassandra_metrics_thread_pools_pending_tasks{host=\"eae3481e-f803-49b5-8516-7ff271104db5\","
+ + "instance=\"172.18.0.3\",cluster=\"test\",datacenter=\"dc1\",rack=\"default\","
+ + "pod_name=\"test-dc1-default-sts-0\",node_name=\"mc-0-worker3\",pool_type=\"internal\","
+ + "pool_name=\"TPC\",} 0.0\n"
+ + "org_apache_cassandra_metrics_keyspace_view_read_time_solr_admin_bucket{"
+ + "host=\"eae3481e-f803-49b5-8516-7ff271104db5\",instance=\"172.18.0.3\",cluster=\"test\",datacenter=\"dc1\","
+ + "rack=\"default\",pod_name=\"test-dc1-default-sts-0\",node_name=\"mc-0-worker3\",le=\"3379391\",} 0.0\n";
OkHttpClient httpClient = Mockito.mock(OkHttpClient.class);
Call call = Mockito.mock(Call.class);
@@ -304,13 +319,34 @@ public void testCollectTpStats() throws IOException, JMException {
@SuppressWarnings("checkstyle:LineLength")
@Test
public void testCollectTpPendingTasks() throws JMException, IOException, ReaperException {
- String responseBodyStr = "org_apache_cassandra_metrics_thread_pools_pending_tasks{host=\"eae3481e-f803-49b5-8516-7ff271104db5\",instance=\"172.18.0.3\",cluster=\"test\",datacenter=\"dc1\",rack=\"default\",pod_name=\"test-dc1-default-sts-0\",node_name=\"mc-0-worker3\",pool_type=\"internal\",pool_name=\"TPC\",} 0.0\n"
- + "org_apache_cassandra_metrics_thread_pools_pending_tasks{host=\"eae3481e-f803-49b5-8516-7ff271104db5\",instance=\"172.18.0.3\",cluster=\"test\",datacenter=\"dc1\",rack=\"default\",pod_name=\"test-dc1-default-sts-0\",node_name=\"mc-0-worker3\",pool_type=\"internal\",pool_name=\"ValidationExecutor\",} 0.0\n"
- + "org_apache_cassandra_metrics_thread_pools_pending_tasks{host=\"eae3481e-f803-49b5-8516-7ff271104db5\",instance=\"172.18.0.3\",cluster=\"test\",datacenter=\"dc1\",rack=\"default\",pod_name=\"test-dc1-default-sts-0\",node_name=\"mc-0-worker3\",pool_type=\"internal\",pool_name=\"AntiCompactionExecutor\",} 0.0\n"
- + "org_apache_cassandra_metrics_thread_pools_pending_tasks{host=\"eae3481e-f803-49b5-8516-7ff271104db5\",instance=\"172.18.0.3\",cluster=\"test\",datacenter=\"dc1\",rack=\"default\",pod_name=\"test-dc1-default-sts-0\",node_name=\"mc-0-worker3\",pool_type=\"internal\",pool_name=\"AntiEntropyStage\",} 0.0\n"
- + "org_apache_cassandra_metrics_thread_pools_pending_tasks{host=\"eae3481e-f803-49b5-8516-7ff271104db5\",instance=\"172.18.0.3\",cluster=\"test\",datacenter=\"dc1\",rack=\"default\",pod_name=\"test-dc1-default-sts-0\",node_name=\"mc-0-worker3\",pool_type=\"internal\",pool_name=\"TPC\",} 0.0\n"
- + "org_apache_cassandra_metrics_thread_pools_pending_tasks{host=\"eae3481e-f803-49b5-8516-7ff271104db5\",instance=\"172.18.0.3\",cluster=\"test\",datacenter=\"dc1\",rack=\"default\",pod_name=\"test-dc1-default-sts-0\",node_name=\"mc-0-worker3\",pool_type=\"internal\",pool_name=\"CompactionExecutor\",} 0.0\n"
- + "org_apache_cassandra_metrics_keyspace_view_read_time_solr_admin_bucket{host=\"eae3481e-f803-49b5-8516-7ff271104db5\",instance=\"172.18.0.3\",cluster=\"test\",datacenter=\"dc1\",rack=\"default\",pod_name=\"test-dc1-default-sts-0\",node_name=\"mc-0-worker3\",le=\"3379391\",} 0.0\n";
+ String responseBodyStr = "org_apache_cassandra_metrics_thread_pools_pending_tasks{"
+ + "host=\"eae3481e-f803-49b5-8516-7ff271104db5\",instance=\"172.18.0.3\",cluster=\"test\",datacenter=\"dc1\","
+ + "rack=\"default\",pod_name=\"test-dc1-default-sts-0\",node_name=\"mc-0-worker3\",pool_type=\"internal\","
+ + "pool_name=\"TPC\",} 0.0\n"
+ + "org_apache_cassandra_metrics_thread_pools_pending_tasks{host=\"eae3481e-f803-49b5-8516-7ff271104db5\","
+ + "instance=\"172.18.0.3\",cluster=\"test\",datacenter=\"dc1\",rack=\"default\","
+ + "pod_name=\"test-dc1-default-sts-0\",node_name=\"mc-0-worker3\",pool_type=\"internal\","
+ + "pool_name=\"ValidationExecutor\",} 0.0\n"
+ + "org_apache_cassandra_metrics_thread_pools_pending_tasks{host=\"eae3481e-f803-49b5-8516-7ff271104db5\","
+ + "instance=\"172.18.0.3\",cluster=\"test\",datacenter=\"dc1\",rack=\"default\","
+ + "pod_name=\"test-dc1-default-sts-0\",node_name=\"mc-0-worker3\",pool_type=\"internal\","
+ + "pool_name=\"AntiCompactionExecutor\",} 0.0\n"
+ + "org_apache_cassandra_metrics_thread_pools_pending_tasks{host=\"eae3481e-f803-49b5-8516-7ff271104db5\","
+ + "instance=\"172.18.0.3\",cluster=\"test\",datacenter=\"dc1\",rack=\"default\","
+ + "pod_name=\"test-dc1-default-sts-0\",node_name=\"mc-0-worker3\",pool_type=\"internal\","
+ + "pool_name=\"AntiEntropyStage\",} 0.0\n"
+ + "org_apache_cassandra_metrics_thread_pools_pending_tasks{host=\"eae3481e-f803-49b5-8516-7ff271104db5\","
+ + "instance=\"172.18.0.3\",cluster=\"test\",datacenter=\"dc1\",rack=\"default\","
+ + "pod_name=\"test-dc1-default-sts-0\",node_name=\"mc-0-worker3\",pool_type=\"internal\","
+ + "pool_name=\"TPC\",} 0.0\n"
+ + "org_apache_cassandra_metrics_thread_pools_pending_tasks{host=\"eae3481e-f803-49b5-8516-7ff271104db5\","
+ + "instance=\"172.18.0.3\",cluster=\"test\",datacenter=\"dc1\",rack=\"default\","
+ + "pod_name=\"test-dc1-default-sts-0\",node_name=\"mc-0-worker3\",pool_type=\"internal\","
+ + "pool_name=\"CompactionExecutor\",} 0.0\n"
+ + "org_apache_cassandra_metrics_keyspace_view_read_time_solr_admin_bucket{"
+ + "host=\"eae3481e-f803-49b5-8516-7ff271104db5\",instance=\"172.18.0.3\",cluster=\"test\","
+ + "datacenter=\"dc1\",rack=\"default\",pod_name=\"test-dc1-default-sts-0\",node_name=\"mc-0-worker3\","
+ + "le=\"3379391\",} 0.0\n";
OkHttpClient httpClient = Mockito.mock(OkHttpClient.class);
Call call = Mockito.mock(Call.class);
diff --git a/src/server/src/test/java/io/cassandrareaper/resources/RepairRunResourceTest.java b/src/server/src/test/java/io/cassandrareaper/resources/RepairRunResourceTest.java
index e6d99bf3f..af9a5ddae 100644
--- a/src/server/src/test/java/io/cassandrareaper/resources/RepairRunResourceTest.java
+++ b/src/server/src/test/java/io/cassandrareaper/resources/RepairRunResourceTest.java
@@ -71,7 +71,6 @@
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyCollection;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
@@ -159,7 +158,7 @@ public void setUp() throws Exception {
anyString(),
any(RepairParallelism.class),
anyCollection(),
- anyBoolean(),
+ any(),
anyCollection(),
any(),
any(),
@@ -186,6 +185,7 @@ public void setUp() throws Exception {
.keyspaceName(keyspace)
.columnFamilies(TABLES)
.incrementalRepair(INCREMENTAL)
+ .subrangeIncrementalRepair(INCREMENTAL)
.nodes(NODES)
.datacenters(DATACENTERS)
.blacklistedTables(BLACKLISTED_TABLES)
@@ -238,6 +238,7 @@ private Response addRepairRun(
Optional.of(REPAIR_PARALLELISM.name()),
Optional.empty(),
Optional.empty(),
+ Optional.empty(),
nodes.isEmpty() ? Optional.empty() : Optional.of(StringUtils.join(nodes, ',')),
Optional.empty(),
blacklistedTables.isEmpty() ? Optional.empty() : Optional.of(StringUtils.join(blacklistedTables, ',')),
@@ -562,6 +563,7 @@ private Response addRepairRunWithForceParam(
Optional.of(REPAIR_PARALLELISM.name()),
Optional.empty(),
Optional.empty(),
+ Optional.empty(),
nodes.isEmpty() ? Optional.empty() : Optional.of(StringUtils.join(nodes, ',')),
Optional.empty(),
blacklistedTables.isEmpty() ? Optional.empty() : Optional.of(StringUtils.join(blacklistedTables, ',')),
@@ -593,4 +595,53 @@ public void testAddRunMalformedForceParam() {
assertTrue(response.getEntity() instanceof String);
assertEquals("invalid query parameter \"force\", expecting [True,False]", response.getEntity());
}
+
+ @Test
+ public void testGetSegments() {
+ RepairRunResource repairRunResource = new RepairRunResource(context, context.storage.getRepairRunDao());
+ // subrange incremental
+ int segments = repairRunResource.getSegments(Optional.of(10), true, true);
+ assertEquals(10, segments);
+ // non-subrange incremental
+ segments = repairRunResource.getSegments(Optional.of(10), false, true);
+ assertEquals(-1, segments);
+ // non-incremental
+ segments = repairRunResource.getSegments(Optional.of(10), false, false);
+ assertEquals(10, segments);
+ }
+
+ @Test
+ public void testGetIncrementalRepair() {
+ AppContext ctx = mock(AppContext.class);
+ ctx.config = mock(ReaperApplicationConfiguration.class);
+ ctx.storage = new MemoryStorageFacade();
+ RepairRunResource repairRunResource = new RepairRunResource(ctx, ctx.storage.getRepairRunDao());
+
+ boolean incrementalRepair = repairRunResource.getIncrementalRepair(Optional.of("false"));
+ assertFalse(incrementalRepair);
+ incrementalRepair = repairRunResource.getIncrementalRepair(Optional.of("true"));
+ assertTrue(incrementalRepair);
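+ // an absent parameter falls back to the configuration default, which is false for the mocked config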
+ incrementalRepair = repairRunResource.getIncrementalRepair(Optional.empty());
+ assertFalse(incrementalRepair);
+ }
+
+ @Test
+ public void testGetSubrangeIncrementalRepair() {
+ AppContext ctx = mock(AppContext.class);
+ ctx.config = mock(ReaperApplicationConfiguration.class);
+ ctx.storage = new MemoryStorageFacade();
+ RepairRunResource repairRunResource = new RepairRunResource(ctx, ctx.storage.getRepairRunDao());
+ boolean subrangeIncremental = repairRunResource.getSubrangeIncrementalRepair(Optional.of("false"));
+ assertFalse(subrangeIncremental);
+ }
+
+ @Test
+ public void testGetIntensity() {
+ AppContext ctx = mock(AppContext.class);
+ ctx.config = mock(ReaperApplicationConfiguration.class);
+ ctx.storage = new MemoryStorageFacade();
+ RepairRunResource repairRunResource = new RepairRunResource(ctx, ctx.storage.getRepairRunDao());
+ Double intensity = repairRunResource.getIntensity(Optional.of("0.8"));
+ assertEquals(0.8f, intensity.floatValue(), 0.0f);
+ }
}
\ No newline at end of file
diff --git a/src/server/src/test/java/io/cassandrareaper/resources/RepairScheduleResourceTest.java b/src/server/src/test/java/io/cassandrareaper/resources/RepairScheduleResourceTest.java
index 11e385030..a4e14ff1b 100644
--- a/src/server/src/test/java/io/cassandrareaper/resources/RepairScheduleResourceTest.java
+++ b/src/server/src/test/java/io/cassandrareaper/resources/RepairScheduleResourceTest.java
@@ -92,6 +92,7 @@ private static MockObjects initInMemoryMocks(URI uri) {
RepairUnit.Builder mockRepairUnitBuilder = RepairUnit.builder()
.incrementalRepair(false)
+ .subrangeIncrementalRepair(false)
.repairThreadCount(1)
.clusterName("cluster-test")
.keyspaceName("keyspace-test")
diff --git a/src/server/src/test/java/io/cassandrareaper/resources/view/RepairRunStatusTest.java b/src/server/src/test/java/io/cassandrareaper/resources/view/RepairRunStatusTest.java
index e43a41a7a..8c358b521 100644
--- a/src/server/src/test/java/io/cassandrareaper/resources/view/RepairRunStatusTest.java
+++ b/src/server/src/test/java/io/cassandrareaper/resources/view/RepairRunStatusTest.java
@@ -73,6 +73,7 @@ public void testRunningRepairDuration() {
null, // pauseTime
0.9, // intensity
false, // incremental
+ false, // subrange incremental
RepairParallelism.PARALLEL, // repairParallelism
Collections.EMPTY_LIST, // nodes
Collections.EMPTY_LIST, // datacenters
@@ -104,6 +105,7 @@ public void testFinishedRepairDuration() {
null, // pauseTime
0.9, // intensity
false, // incremental
+ false, // subrange incremental
RepairParallelism.PARALLEL, // repairParallelism
Collections.EMPTY_LIST, // nodes
Collections.EMPTY_LIST, // datacenters
@@ -135,6 +137,7 @@ public void testPausedRepairDuration() {
new DateTime().now().minusMinutes(1), // pauseTime
0.9, // intensity
false, // incremental
+ false, // subrange incremental
RepairParallelism.PARALLEL, // repairParallelism
Collections.EMPTY_LIST, // nodes
Collections.EMPTY_LIST, // datacenters
@@ -166,6 +169,7 @@ public void testAbortedRepairDuration() {
new DateTime().now().minusMinutes(1), // pauseTime
0.9, // intensity
false, // incremental
+ false, // subrange incremental
RepairParallelism.PARALLEL, // repairParallelism
Collections.EMPTY_LIST, // nodes
Collections.EMPTY_LIST, // datacenters
diff --git a/src/server/src/test/java/io/cassandrareaper/resources/view/RepairScheduleStatusTest.java b/src/server/src/test/java/io/cassandrareaper/resources/view/RepairScheduleStatusTest.java
index 43f3adf17..c98e94675 100644
--- a/src/server/src/test/java/io/cassandrareaper/resources/view/RepairScheduleStatusTest.java
+++ b/src/server/src/test/java/io/cassandrareaper/resources/view/RepairScheduleStatusTest.java
@@ -45,6 +45,7 @@ public void testJacksonJSONParsing() throws Exception {
data.setId(UUIDs.timeBased());
data.setIntensity(0.75);
data.setIncrementalRepair(false);
+ data.setSubrangeIncrementalRepair(false);
data.setKeyspaceName("testKeyspace");
data.setOwner("testuser");
data.setRepairParallelism(RepairParallelism.PARALLEL);
@@ -63,6 +64,7 @@ public void testJacksonJSONParsing() throws Exception {
assertEquals(data.getId(), dataAfter.getId());
assertEquals(data.getIntensity(), dataAfter.getIntensity(), 0.0);
assertEquals(data.getIncrementalRepair(), dataAfter.getIncrementalRepair());
+ assertEquals(data.getSubrangeIncrementalRepair(), dataAfter.getSubrangeIncrementalRepair());
assertEquals(data.getKeyspaceName(), dataAfter.getKeyspaceName());
assertEquals(data.getRepairParallelism(), dataAfter.getRepairParallelism());
assertEquals(data.getState(), dataAfter.getState());
diff --git a/src/server/src/test/java/io/cassandrareaper/service/ClusterRepairSchedulerTest.java b/src/server/src/test/java/io/cassandrareaper/service/ClusterRepairSchedulerTest.java
index 778a004c4..e4aede3cc 100644
--- a/src/server/src/test/java/io/cassandrareaper/service/ClusterRepairSchedulerTest.java
+++ b/src/server/src/test/java/io/cassandrareaper/service/ClusterRepairSchedulerTest.java
@@ -271,6 +271,7 @@ private RepairUnit.Builder oneRepair(Cluster cluster, String keyspace) {
.clusterName(cluster.getName())
.keyspaceName(keyspace)
.incrementalRepair(Boolean.FALSE)
+ .subrangeIncrementalRepair(Boolean.FALSE)
.repairThreadCount(1)
.timeout(30);
}
diff --git a/src/server/src/test/java/io/cassandrareaper/service/RepairManagerTest.java b/src/server/src/test/java/io/cassandrareaper/service/RepairManagerTest.java
index 0c06d7ced..412191e9b 100644
--- a/src/server/src/test/java/io/cassandrareaper/service/RepairManagerTest.java
+++ b/src/server/src/test/java/io/cassandrareaper/service/RepairManagerTest.java
@@ -82,6 +82,7 @@ public void abortRunningSegmentWithNoLeader() throws ReaperException, Interrupte
final String ksName = "reaper";
final Set<String> cfNames = Sets.newHashSet("reaper");
final boolean incrementalRepair = false;
+ final boolean subrangeIncremental = false;
final Set<String> nodes = Sets.newHashSet("127.0.0.1");
final Set<String> datacenters = Collections.emptySet();
final double intensity = 0.5f;
@@ -96,6 +97,7 @@ public void abortRunningSegmentWithNoLeader() throws ReaperException, Interrupte
.keyspaceName(ksName)
.columnFamilies(cfNames)
.incrementalRepair(incrementalRepair)
+ .subrangeIncrementalRepair(subrangeIncremental)
.nodes(nodes)
.datacenters(datacenters)
.repairThreadCount(repairThreadCount)
@@ -167,6 +169,7 @@ public void doNotAbortRunningSegmentWithLeader() throws ReaperException, Interru
final String ksName = "reaper";
final Set<String> cfNames = Sets.newHashSet("reaper");
final boolean incrementalRepair = false;
+ final boolean subrangeIncremental = false;
final Set<String> nodes = Sets.newHashSet("127.0.0.1");
final Set<String> datacenters = Collections.emptySet();
final double intensity = 0.5f;
@@ -187,7 +190,6 @@ public void doNotAbortRunningSegmentWithLeader() throws ReaperException, Interru
AppContext context = new AppContext();
context.storage = storage;
context.config = new ReaperApplicationConfiguration();
-
RepairManager repairManager = RepairManager.create(
context,
Executors.newScheduledThreadPool(1),
@@ -195,15 +197,14 @@ public void doNotAbortRunningSegmentWithLeader() throws ReaperException, Interru
TimeUnit.MILLISECONDS,
1,
context.storage.getRepairRunDao());
-
repairManager = Mockito.spy(repairManager);
context.repairManager = repairManager;
-
final RepairUnit cf = RepairUnit.builder()
.clusterName(clusterName)
.keyspaceName(ksName)
.columnFamilies(cfNames)
.incrementalRepair(incrementalRepair)
+ .subrangeIncrementalRepair(subrangeIncremental)
.nodes(nodes)
.datacenters(datacenters)
.repairThreadCount(repairThreadCount)
@@ -260,6 +261,7 @@ public void doNotAbortRunningSegmentWithRepairRunnerAndNoDistributedStorage()
final String ksName = "reaper";
final Set<String> cfNames = Sets.newHashSet("reaper");
final boolean incrementalRepair = false;
+ final boolean subrangeIncremental = false;
final Set<String> nodes = Sets.newHashSet("127.0.0.1");
final Set<String> datacenters = Collections.emptySet();
final double intensity = 0.5f;
@@ -279,6 +281,7 @@ public void doNotAbortRunningSegmentWithRepairRunnerAndNoDistributedStorage()
.keyspaceName(ksName)
.columnFamilies(cfNames)
.incrementalRepair(incrementalRepair)
+ .subrangeIncrementalRepair(subrangeIncremental)
.nodes(nodes)
.datacenters(datacenters)
.repairThreadCount(repairThreadCount)
@@ -348,6 +351,7 @@ public void abortRunningSegmentWithNoRepairRunnerAndNoDistributedStorage()
final String ksName = "reaper";
final Set<String> cfNames = Sets.newHashSet("reaper");
final boolean incrementalRepair = false;
+ final boolean subrangeIncremental = false;
final Set<String> nodes = Sets.newHashSet("127.0.0.1");
final Set<String> datacenters = Collections.emptySet();
final double intensity = 0.5f;
@@ -382,6 +386,7 @@ public void abortRunningSegmentWithNoRepairRunnerAndNoDistributedStorage()
.keyspaceName(ksName)
.columnFamilies(cfNames)
.incrementalRepair(incrementalRepair)
+ .subrangeIncrementalRepair(subrangeIncremental)
.nodes(nodes)
.datacenters(datacenters)
.repairThreadCount(repairThreadCount)
@@ -451,6 +456,7 @@ public void updateRepairRunIntensityTest() throws ReaperException, InterruptedEx
final String ksName = "reaper";
final Set<String> cfNames = Sets.newHashSet("reaper");
final boolean incrementalRepair = false;
+ final boolean subrangeIncremental = false;
final Set<String> nodes = Sets.newHashSet("127.0.0.1");
final Set<String> datacenters = Collections.emptySet();
final int repairThreadCount = 1;
@@ -461,6 +467,7 @@ public void updateRepairRunIntensityTest() throws ReaperException, InterruptedEx
.keyspaceName(ksName)
.columnFamilies(cfNames)
.incrementalRepair(incrementalRepair)
+ .subrangeIncrementalRepair(subrangeIncremental)
.nodes(nodes)
.datacenters(datacenters)
.repairThreadCount(repairThreadCount)
@@ -568,6 +575,7 @@ private RepairUnit createRepairUnit(String clusterName) {
final String ksName = "reaper";
final Set<String> cfNames = Sets.newHashSet("reaper");
final boolean incrementalRepair = false;
+ final boolean subrangeIncremental = false;
final Set<String> nodes = Sets.newHashSet("127.0.0.1");
final Set<String> datacenters = Collections.emptySet();
final int repairThreadCount = 1;
@@ -578,6 +586,7 @@ private RepairUnit createRepairUnit(String clusterName) {
.keyspaceName(ksName)
.columnFamilies(cfNames)
.incrementalRepair(incrementalRepair)
+ .subrangeIncrementalRepair(subrangeIncremental)
.nodes(nodes)
.datacenters(datacenters)
.repairThreadCount(repairThreadCount)
@@ -603,14 +612,14 @@ private RepairRun createRepairRun(
return run;
}
- private static class NotEmptyList implements ArgumentMatcher<Collection<RepairSegment>> {
+ private static final class NotEmptyList implements ArgumentMatcher<Collection<RepairSegment>> {
@Override
public boolean matches(Collection<RepairSegment> segments) {
return !segments.isEmpty();
}
}
- private static class EmptyList implements ArgumentMatcher<Collection<RepairSegment>> {
+ private static final class EmptyList implements ArgumentMatcher<Collection<RepairSegment>> {
@Override
public boolean matches(Collection<RepairSegment> segments) {
return segments.isEmpty();
diff --git a/src/server/src/test/java/io/cassandrareaper/service/RepairRunServiceTest.java b/src/server/src/test/java/io/cassandrareaper/service/RepairRunServiceTest.java
index 20af6113f..5b8d720e4 100644
--- a/src/server/src/test/java/io/cassandrareaper/service/RepairRunServiceTest.java
+++ b/src/server/src/test/java/io/cassandrareaper/service/RepairRunServiceTest.java
@@ -259,6 +259,7 @@ public void generateSegmentsFail1Test() throws ReaperException, UnknownHostExcep
.keyspaceName(KS_NAME)
.columnFamilies(CF_NAMES)
.incrementalRepair(INCREMENTAL_REPAIR)
+ .subrangeIncrementalRepair(INCREMENTAL_REPAIR)
.nodes(NODES)
.datacenters(DATACENTERS)
.blacklistedTables(BLACKLISTED_TABLES)
@@ -305,6 +306,7 @@ protected JmxCassandraManagementProxy connectImpl(Node host) throws ReaperExcept
.keyspaceName("test")
.blacklistedTables(Sets.newHashSet("table1"))
.incrementalRepair(false)
+ .subrangeIncrementalRepair(false)
.repairThreadCount(4)
.timeout(segmentTimeout)
.build(UUIDs.timeBased());
@@ -345,6 +347,7 @@ public void generateSegmentsTest() throws ReaperException, UnknownHostException
.keyspaceName(KS_NAME)
.columnFamilies(CF_NAMES)
.incrementalRepair(INCREMENTAL_REPAIR)
+ .subrangeIncrementalRepair(INCREMENTAL_REPAIR)
.nodes(NODES)
.datacenters(DATACENTERS)
.blacklistedTables(BLACKLISTED_TABLES)
@@ -392,6 +395,7 @@ protected JmxCassandraManagementProxy connectImpl(Node host) throws ReaperExcept
.keyspaceName("test")
.blacklistedTables(Sets.newHashSet("table1"))
.incrementalRepair(false)
+ .subrangeIncrementalRepair(false)
.repairThreadCount(4)
.timeout(segmentTimeout)
.build(UUIDs.timeBased());
@@ -406,6 +410,7 @@ public void failRepairRunCreationTest() throws ReaperException, UnknownHostExcep
final String KS_NAME = "reaper";
final Set<String> CF_NAMES = Sets.newHashSet("reaper");
final boolean INCREMENTAL_REPAIR = false;
+ final boolean SUBRANGE_INCREMENTAL_REPAIR = false;
final Set<String> NODES = Sets.newHashSet("127.0.0.1", "127.0.0.2", "127.0.0.3");
final Set<String> DATACENTERS = Collections.emptySet();
final Set<String> BLACKLISTED_TABLES = Collections.emptySet();
@@ -450,6 +455,7 @@ public void failRepairRunCreationTest() throws ReaperException, UnknownHostExcep
.keyspaceName(KS_NAME)
.columnFamilies(CF_NAMES)
.incrementalRepair(INCREMENTAL_REPAIR)
+ .subrangeIncrementalRepair(SUBRANGE_INCREMENTAL_REPAIR)
.nodes(NODES)
.datacenters(DATACENTERS)
.blacklistedTables(BLACKLISTED_TABLES)
@@ -465,6 +471,7 @@ public void failIncrRepairRunCreationTest() throws ReaperException, UnknownHostE
final String KS_NAME = "reaper";
final Set<String> CF_NAMES = Sets.newHashSet("reaper");
final boolean INCREMENTAL_REPAIR = false;
+ final boolean SUBRANGE_INCREMENTAL_REPAIR = false;
final Set<String> NODES = Sets.newHashSet("127.0.0.1", "127.0.0.2", "127.0.0.3");
final Set<String> DATACENTERS = Collections.emptySet();
final Set<String> BLACKLISTED_TABLES = Collections.emptySet();
@@ -515,6 +522,7 @@ public void failIncrRepairRunCreationTest() throws ReaperException, UnknownHostE
.keyspaceName(KS_NAME)
.columnFamilies(CF_NAMES)
.incrementalRepair(INCREMENTAL_REPAIR)
+ .subrangeIncrementalRepair(SUBRANGE_INCREMENTAL_REPAIR)
.nodes(NODES)
.datacenters(DATACENTERS)
.blacklistedTables(BLACKLISTED_TABLES)
@@ -531,6 +539,7 @@ public void getDCsByNodeForRepairSegmentPath1Test() throws ReaperException, Unkn
final String KS_NAME = "reaper";
final Set<String> CF_NAMES = Sets.newHashSet("reaper");
final boolean INCREMENTAL_REPAIR = false;
+ final boolean SUBRANGE_INCREMENTAL_REPAIR = false;
final Set<String> NODES = Sets.newHashSet("127.0.0.1", "127.0.0.2", "127.0.0.3");
final Set<String> DATACENTERS = Sets.newHashSet("dc1");
final Set<String> BLACKLISTED_TABLES = Collections.emptySet();
@@ -575,6 +584,7 @@ public JmxCassandraManagementProxy connectImpl(Node host) throws ReaperException
.keyspaceName(KS_NAME)
.columnFamilies(CF_NAMES)
.incrementalRepair(INCREMENTAL_REPAIR)
+ .subrangeIncrementalRepair(SUBRANGE_INCREMENTAL_REPAIR)
.nodes(NODES)
.datacenters(DATACENTERS)
.blacklistedTables(BLACKLISTED_TABLES)
@@ -592,6 +602,7 @@ public void getDCsByNodeForRepairSegmentFailPathTest() throws ReaperException, U
final String KS_NAME = "reaper";
final Set<String> CF_NAMES = Sets.newHashSet("reaper");
final boolean INCREMENTAL_REPAIR = false;
+ final boolean SUBRANGE_INCREMENTAL_REPAIR = false;
final Set<String> NODES = Sets.newHashSet("127.0.0.1", "127.0.0.2", "127.0.0.3");
final Set<String> DATACENTERS = Sets.newHashSet("dc1");
final Set<String> BLACKLISTED_TABLES = Collections.emptySet();
@@ -639,6 +650,7 @@ public JmxCassandraManagementProxy connectImpl(Node host) throws ReaperException
.keyspaceName(KS_NAME)
.columnFamilies(CF_NAMES)
.incrementalRepair(INCREMENTAL_REPAIR)
+ .subrangeIncrementalRepair(SUBRANGE_INCREMENTAL_REPAIR)
.nodes(NODES)
.datacenters(DATACENTERS)
.blacklistedTables(BLACKLISTED_TABLES)
@@ -655,6 +667,7 @@ public void createRepairSegmentsForIncrementalRepairTest() throws ReaperExceptio
final String KS_NAME = "reaper";
final Set<String> CF_NAMES = Sets.newHashSet("reaper");
final boolean INCREMENTAL_REPAIR = false;
+ final boolean SUBRANGE_INCREMENTAL_REPAIR = false;
final Set<String> NODES = Sets.newHashSet("127.0.0.1", "127.0.0.2", "127.0.0.3");
final Set<String> DATACENTERS = Sets.newHashSet("dc1");
final Set<String> BLACKLISTED_TABLES = Collections.emptySet();
@@ -682,6 +695,7 @@ public void createRepairSegmentsForIncrementalRepairTest() throws ReaperExceptio
.keyspaceName(KS_NAME)
.columnFamilies(CF_NAMES)
.incrementalRepair(INCREMENTAL_REPAIR)
+ .subrangeIncrementalRepair(SUBRANGE_INCREMENTAL_REPAIR)
.nodes(NODES)
.datacenters(DATACENTERS)
.blacklistedTables(BLACKLISTED_TABLES)
@@ -821,6 +835,7 @@ public void generateSegmentsTestEmpty() throws ReaperException, UnknownHostExcep
final String KS_NAME = "reaper";
final Set<String> CF_NAMES = Sets.newHashSet("reaper");
final boolean INCREMENTAL_REPAIR = false;
+ final boolean SUBRANGE_INCREMENTAL_REPAIR = false;
final Set<String> NODES = Sets.newHashSet("127.0.0.1", "127.0.0.2", "127.0.0.3");
final Set<String> DATACENTERS = Collections.emptySet();
final Set<String> BLACKLISTED_TABLES = Collections.emptySet();
@@ -841,6 +856,7 @@ public void generateSegmentsTestEmpty() throws ReaperException, UnknownHostExcep
.keyspaceName(KS_NAME)
.columnFamilies(CF_NAMES)
.incrementalRepair(INCREMENTAL_REPAIR)
+ .subrangeIncrementalRepair(SUBRANGE_INCREMENTAL_REPAIR)
.nodes(NODES)
.datacenters(DATACENTERS)
.blacklistedTables(BLACKLISTED_TABLES)
@@ -888,6 +904,7 @@ protected JmxCassandraManagementProxy connectImpl(Node host) throws ReaperExcept
.keyspaceName("test")
.blacklistedTables(Sets.newHashSet("table1"))
.incrementalRepair(false)
+ .subrangeIncrementalRepair(false)
.repairThreadCount(4)
.timeout(segmentTimeout)
.build(UUIDs.timeBased());
diff --git a/src/server/src/test/java/io/cassandrareaper/service/RepairRunnerHangingTest.java b/src/server/src/test/java/io/cassandrareaper/service/RepairRunnerHangingTest.java
new file mode 100644
index 000000000..afbd073f8
--- /dev/null
+++ b/src/server/src/test/java/io/cassandrareaper/service/RepairRunnerHangingTest.java
@@ -0,0 +1,683 @@
+/*
+ * Copyright 2015-2017 Spotify AB
+ * Copyright 2016-2019 The Last Pickle Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.cassandrareaper.service;
+
+import io.cassandrareaper.AppContext;
+import io.cassandrareaper.ReaperApplicationConfiguration;
+import io.cassandrareaper.ReaperException;
+import io.cassandrareaper.core.Cluster;
+import io.cassandrareaper.core.CompactionStats;
+import io.cassandrareaper.core.Node;
+import io.cassandrareaper.core.RepairRun;
+import io.cassandrareaper.core.RepairSegment;
+import io.cassandrareaper.core.RepairUnit;
+import io.cassandrareaper.core.Segment;
+import io.cassandrareaper.crypto.NoopCrypotograph;
+import io.cassandrareaper.management.ClusterFacade;
+import io.cassandrareaper.management.RepairStatusHandler;
+import io.cassandrareaper.management.jmx.CassandraManagementProxyTest;
+import io.cassandrareaper.management.jmx.JmxCassandraManagementProxy;
+import io.cassandrareaper.management.jmx.JmxManagementConnectionFactory;
+import io.cassandrareaper.storage.IStorageDao;
+import io.cassandrareaper.storage.MemoryStorageFacade;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.net.UnknownHostException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.management.JMException;
+import javax.management.MalformedObjectNameException;
+import javax.management.ReflectionException;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.cassandra.locator.EndpointSnitchInfoMBean;
+import org.apache.cassandra.repair.RepairParallelism;
+import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.utils.progress.ProgressEventType;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.awaitility.Awaitility.await;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public final class RepairRunnerHangingTest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(RepairRunnerHangingTest.class);
+ private static final Set<String> TABLES = ImmutableSet.of("table1");
+ private static final List<BigInteger> THREE_TOKENS = Lists.newArrayList(
+ BigInteger.valueOf(0L),
+ BigInteger.valueOf(100L),
+ BigInteger.valueOf(200L));
+
+ private final Cluster cluster = Cluster.builder()
+ .withName("test_" + RandomStringUtils.randomAlphabetic(12))
+ .withSeedHosts(ImmutableSet.of("127.0.0.1"))
+ .withState(Cluster.State.ACTIVE)
+ .build();
+ private Map<String, String> replicas = ImmutableMap.of(
+ "127.0.0.1", "dc1"
+ );
+ private Map<UUID, RepairRunner> currentRunners;
+
+ @Before
+ public void setUp() throws Exception {
+ SegmentRunner.SEGMENT_RUNNERS.clear();
+ currentRunners = new HashMap<>();
+ // Create 3 runners for cluster1 and 1 for cluster2
+ for (int i = 0; i < 3; i++) {
+ RepairRunner runner = mock(RepairRunner.class);
+ when(runner.getClusterName()).thenReturn("cluster1");
+ when(runner.isRunning()).thenReturn(true);
+ currentRunners.put(UUID.randomUUID(), runner);
+ }
+
+ RepairRunner runner = mock(RepairRunner.class);
+ when(runner.getClusterName()).thenReturn("cluster2");
+ when(runner.isRunning()).thenReturn(true);
+ currentRunners.put(UUID.randomUUID(), runner);
+ }
+
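+ // Token-range fixtures: each entry maps a (start, end) token pair to the ordered
+ // list of replica endpoints, matching the shape returned by getRangeToEndpointMap
+ // for the corresponding cluster topology.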
+ public static Map<List<String>, List<String>> threeNodeCluster() {
+ Map<List<String>, List<String>> map = new HashMap<List<String>, List<String>>();
+ map = addRangeToMap(map, "0", "50", "a1", "a2", "a3");
+ map = addRangeToMap(map, "50", "100", "a2", "a3", "a1");
+ map = addRangeToMap(map, "100", "0", "a3", "a1", "a2");
+ return map;
+ }
+
+ public static Map<List<String>, List<String>> threeNodeClusterWithIps() {
+ Map<List<String>, List<String>> map = new HashMap<List<String>, List<String>>();
+ map = addRangeToMap(map, "0", "100", "127.0.0.1", "127.0.0.2", "127.0.0.3");
+ map = addRangeToMap(map, "100", "200", "127.0.0.2", "127.0.0.3", "127.0.0.1");
+ map = addRangeToMap(map, "200", "0", "127.0.0.3", "127.0.0.1", "127.0.0.2");
+ return map;
+ }
+
+ public static Map<List<String>, List<String>> scyllaThreeNodeClusterWithIps() {
+ Map<List<String>, List<String>> map = new HashMap<List<String>, List<String>>();
+ map = addRangeToMap(map, "", "0", "127.0.0.3", "127.0.0.1", "127.0.0.2");
+ map = addRangeToMap(map, "0", "100", "127.0.0.1", "127.0.0.2", "127.0.0.3");
+ map = addRangeToMap(map, "100", "200", "127.0.0.2", "127.0.0.3", "127.0.0.1");
+ map = addRangeToMap(map, "200", "", "127.0.0.3", "127.0.0.1", "127.0.0.2");
+ return map;
+ }
+
+ public static Map<List<String>, List<String>> sixNodeCluster() {
+ Map<List<String>, List<String>> map = new HashMap<List<String>, List<String>>();
+ map = addRangeToMap(map, "0", "50", "a1", "a2", "a3");
+ map = addRangeToMap(map, "50", "100", "a2", "a3", "a4");
+ map = addRangeToMap(map, "100", "150", "a3", "a4", "a5");
+ map = addRangeToMap(map, "150", "200", "a4", "a5", "a6");
+ map = addRangeToMap(map, "200", "250", "a5", "a6", "a1");
+ map = addRangeToMap(map, "250", "0", "a6", "a1", "a2");
+ return map;
+ }
+
+ public static Map<String, String> threeNodeClusterEndpoint() {
+ Map<String, String> map = new HashMap<String, String>();
+ map.put("host1", "hostId1");
+ map.put("host2", "hostId2");
+ map.put("host3", "hostId3");
+ return map;
+ }
+
+ public static Map<String, String> sixNodeClusterEndpoint() {
+ Map<String, String> map = new HashMap<String, String>();
+ map.put("host1", "hostId1");
+ map.put("host2", "hostId2");
+ map.put("host3", "hostId3");
+ map.put("host4", "hostId4");
+ map.put("host5", "hostId5");
+ map.put("host6", "hostId6");
+ return map;
+ }
+
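+ // Assigns a random host ID to each endpoint, mimicking getEndpointToHostId().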
+ private Map<String, String> endpointToHostIDMap() {
+ Map<String, String> endpointToHostIDMap = new HashMap<String, String>();
+ endpointToHostIDMap.put("127.0.0.1", UUID.randomUUID().toString());
+ endpointToHostIDMap.put("127.0.0.2", UUID.randomUUID().toString());
+ endpointToHostIDMap.put("127.0.0.3", UUID.randomUUID().toString());
+
+ return endpointToHostIDMap;
+ }
+
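+ // Stores a repair run with two segments; the first is already RUNNING with a
+ // coordinator and host ID, so tests can resume an in-flight incremental repair.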
+ private RepairRun addNewRepairRun(
+ final Map<String, String> nodeMap,
+ final double intensity,
+ final IStorageDao storage,
+ UUID cf,
+ UUID hostID
+ ) {
+ return storage.getRepairRunDao().addRepairRun(
+ RepairRun.builder(cluster.getName(), cf)
+ .intensity(intensity)
+ .segmentCount(1)
+ .repairParallelism(RepairParallelism.PARALLEL)
+ .tables(TABLES),
+ Lists.newArrayList(
+ RepairSegment.builder(
+ Segment.builder()
+ .withTokenRange(new RingRange(BigInteger.ZERO, new BigInteger("100")))
+ .withReplicas(nodeMap)
+ .build(), cf)
+ .withState(RepairSegment.State.RUNNING)
+ .withStartTime(DateTime.now())
+ .withCoordinatorHost("reaper")
+ .withHostID(hostID),
+ RepairSegment.builder(
+ Segment.builder()
+ .withTokenRange(new RingRange(new BigInteger("100"), new BigInteger("200")))
+ .withReplicas(nodeMap)
+ .build(), cf)
+ .withHostID(hostID)
+ )
+ );
+ }
+
+ private static Map<List<String>, List<String>> addRangeToMap(
+ Map<List<String>, List<String>> map,
+ String start,
+ String end,
+ String... hosts) {
+
+ List<String> range = Lists.newArrayList(start, end);
+ List<String> endPoints = Lists.newArrayList(hosts);
+ map.put(range, endPoints);
+ return map;
+ }
+
+ @After
+ public void tearDown() {
+ DateTimeUtils.setCurrentMillisSystem();
+ }
+
+ @Test
+ public void testHangingRepair() throws InterruptedException, ReaperException, JMException, IOException {
+ final Set<String> cfNames = Sets.newHashSet("reaper");
+ final Set<String> nodeSet = Sets.newHashSet("127.0.0.1");
+ final Set<String> datacenters = Collections.emptySet();
+ final Set<String> blacklistedTables = Collections.emptySet();
+ final long timeRun = 41L;
+ final double intensity = 0.5f;
+ final int repairThreadCount = 1;
+ final int segmentTimeout = 1;
+ final IStorageDao storage = new MemoryStorageFacade();
+ storage.getClusterDao().addCluster(cluster);
+ RepairUnit cf = storage.getRepairUnitDao().addRepairUnit(
+ RepairUnit.builder()
+ .clusterName(cluster.getName())
+ .keyspaceName("reaper")
+ .columnFamilies(cfNames)
+ .incrementalRepair(false)
+ .subrangeIncrementalRepair(false)
+ .nodes(nodeSet)
+ .datacenters(datacenters)
+ .blacklistedTables(blacklistedTables)
+ .repairThreadCount(repairThreadCount)
+ .timeout(segmentTimeout));
+ DateTimeUtils.setCurrentMillisFixed(timeRun);
+ RepairRun run = storage.getRepairRunDao().addRepairRun(
+ RepairRun.builder(cluster.getName(), cf.getId())
+ .intensity(intensity)
+ .segmentCount(1)
+ .repairParallelism(RepairParallelism.PARALLEL)
+ .tables(TABLES),
+ Collections.singleton(
+ RepairSegment.builder(
+ Segment.builder()
+ .withTokenRange(new RingRange(BigInteger.ZERO, new BigInteger("100")))
+ .withReplicas(replicas)
+ .build(),
+ cf.getId())));
+ final UUID runId = run.getId();
+ final UUID segmentId = storage.getRepairSegmentDao().getNextFreeSegments(run.getId()).get(0).getId();
+ assertEquals(storage.getRepairSegmentDao().getRepairSegment(runId, segmentId).get().getState(),
+ RepairSegment.State.NOT_STARTED);
+ AppContext context = new AppContext();
+ context.storage = storage;
+ context.config = new ReaperApplicationConfiguration();
+ final Semaphore mutex = new Semaphore(0);
+ final JmxCassandraManagementProxy jmx = CassandraManagementProxyTest.mockJmxProxyImpl();
+ Map<String, String> endpointToHostIDMap = endpointToHostIDMap();
+ when(jmx.getEndpointToHostId()).thenReturn(endpointToHostIDMap);
+ when(jmx.getClusterName()).thenReturn(cluster.getName());
+ when(jmx.getRangeToEndpointMap(anyString())).thenReturn(RepairRunnerHangingTest.sixNodeCluster());
+ EndpointSnitchInfoMBean endpointSnitchInfoMBean = mock(EndpointSnitchInfoMBean.class);
+ when(endpointSnitchInfoMBean.getDatacenter()).thenReturn("dc1");
+ try {
+ when(endpointSnitchInfoMBean.getDatacenter(anyString())).thenReturn("dc1");
+ } catch (UnknownHostException ex) {
+ throw new AssertionError(ex);
+ }
+ final AtomicInteger repairAttempts = new AtomicInteger(1);
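+ // Simulate a hanging repair: the first triggerRepair call only ever reports
+ // STARTED, so the segment times out and gets postponed; the second call completes
+ // normally (STARTED -> SESSION_SUCCESS -> FINISHED) and releases the mutex.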
+ when(jmx.triggerRepair(any(), any(), any(), any(), any(), any(), any(), anyInt()))
+ .then(
+ (invocation) -> {
+ assertEquals(RepairSegment.State.STARTED,
+ storage.getRepairSegmentDao().getRepairSegment(runId, segmentId).get().getState());
+ final int repairNumber = repairAttempts.getAndIncrement();
+ switch (repairNumber) {
+ case 1:
+ new Thread() {
+ @Override
+ public void run() {
+ ((RepairStatusHandler) invocation.getArgument(5))
+ .handle(repairNumber,
+ Optional.of(ActiveRepairService.Status.STARTED),
+ Optional.empty(), null, jmx);
+ assertEquals(
+ RepairSegment.State.RUNNING,
+ storage.getRepairSegmentDao().getRepairSegment(runId, segmentId).get().getState());
+ }
+ }.start();
+ break;
+ case 2:
+ new Thread() {
+ @Override
+ public void run() {
+ ((RepairStatusHandler) invocation.getArgument(5))
+ .handle(
+ repairNumber,
+ Optional.of(ActiveRepairService.Status.STARTED),
+ Optional.empty(), null, jmx);
+ assertEquals(
+ RepairSegment.State.RUNNING,
+ storage.getRepairSegmentDao().getRepairSegment(runId, segmentId).get().getState());
+ ((RepairStatusHandler) invocation.getArgument(5))
+ .handle(
+ repairNumber,
+ Optional.of(ActiveRepairService.Status.SESSION_SUCCESS),
+ Optional.empty(), null, jmx);
+ assertEquals(
+ RepairSegment.State.DONE,
+ storage.getRepairSegmentDao().getRepairSegment(runId, segmentId).get().getState());
+ ((RepairStatusHandler) invocation.getArgument(5))
+ .handle(repairNumber,
+ Optional.of(ActiveRepairService.Status.FINISHED),
+ Optional.empty(), null, jmx);
+
+ mutex.release();
+ LOG.info("MUTEX RELEASED");
+ }
+ }.start();
+ break;
+ default:
+ fail("triggerRepair should only have been called twice");
+ }
+ LOG.info("repair number : " + repairNumber);
+ return repairNumber;
+ });
+ ClusterFacade clusterFacade = mock(ClusterFacade.class);
+ when(clusterFacade.connect(any(Cluster.class), any())).thenReturn(jmx);
+ when(clusterFacade.nodeIsDirectlyAccessible(any(), any())).thenReturn(true);
+ when(clusterFacade.tokenRangeToEndpoint(any(), anyString(), any())).thenReturn(Lists.newArrayList(nodeSet));
+ when(clusterFacade.getEndpointToHostId(any())).thenReturn(endpointToHostIDMap);
+ when(clusterFacade.listActiveCompactions(any())).thenReturn(CompactionStats.builder().withActiveCompactions(
+ Collections.emptyList()).withPendingCompactions(Optional.of(0)).build());
+ when(clusterFacade.getRangeToEndpointMap(any(), anyString()))
+ .thenReturn((Map) ImmutableMap.of(Lists.newArrayList("0", "100"), Lists.newArrayList(nodeSet)));
+ context.repairManager = RepairManager.create(
+ context,
+ clusterFacade,
+ Executors.newScheduledThreadPool(1),
+ 1,
+ TimeUnit.MILLISECONDS,
+ 1,
+ context.storage.getRepairRunDao());
+ context.managementConnectionFactory = new JmxManagementConnectionFactory(context, new NoopCrypotograph()) {
+ @Override
+ protected JmxCassandraManagementProxy connectImpl(Node host) throws ReaperException {
+ return jmx;
+ }
+ };
+ context.repairManager.startRepairRun(run);
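+ // Wait for the successful second attempt to release the mutex, then allow the
+ // runner a moment to persist the final run state before asserting DONE.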
+ await().with().atMost(2, TimeUnit.MINUTES).until(() -> {
+ try {
+ mutex.acquire();
+ Thread.sleep(1000);
+ return true;
+ } catch (InterruptedException ex) {
+ throw new IllegalStateException(ex);
+ }
+ });
+ assertEquals(RepairRun.RunState.DONE, storage.getRepairRunDao().getRepairRun(runId).get().getRunState());
+ }
+
+ @Test
+ public void testHangingRepairNewApi() throws InterruptedException, ReaperException, MalformedObjectNameException,
+ ReflectionException, IOException {
+ final String ksName = "reaper";
+ final Set<String> cfNames = Sets.newHashSet("reaper");
+ final boolean incrementalRepair = false;
+ final Set<String> nodeSet = Sets.newHashSet("127.0.0.1");
+ final Set<String> datacenters = Collections.emptySet();
+ final Set<String> blacklistedTables = Collections.emptySet();
+ final long timeRun = 41L;
+ final double intensity = 0.5f;
+ final int repairThreadCount = 1;
+ final int segmentTimeout = 1;
+ final IStorageDao storage = new MemoryStorageFacade();
+ storage.getClusterDao().addCluster(cluster);
+ DateTimeUtils.setCurrentMillisFixed(timeRun);
+ RepairUnit cf = storage.getRepairUnitDao().addRepairUnit(
+ RepairUnit.builder()
+ .clusterName(cluster.getName())
+ .keyspaceName(ksName)
+ .columnFamilies(cfNames)
+ .incrementalRepair(incrementalRepair)
+ .subrangeIncrementalRepair(incrementalRepair)
+ .nodes(nodeSet)
+ .datacenters(datacenters)
+ .blacklistedTables(blacklistedTables)
+ .repairThreadCount(repairThreadCount)
+ .timeout(segmentTimeout));
+ RepairRun run = storage.getRepairRunDao().addRepairRun(
+ RepairRun.builder(cluster.getName(), cf.getId())
+ .intensity(intensity).segmentCount(1)
+ .repairParallelism(RepairParallelism.PARALLEL)
+ .tables(TABLES),
+ Collections.singleton(
+ RepairSegment.builder(
+ Segment.builder()
+ .withTokenRange(new RingRange(BigInteger.ZERO, new BigInteger("100")))
+ .withReplicas(replicas)
+ .build(),
+ cf.getId())));
+ final UUID runId = run.getId();
+ final UUID segmentId = storage.getRepairSegmentDao().getNextFreeSegments(run.getId()).get(0).getId();
+ assertEquals(storage.getRepairSegmentDao().getRepairSegment(runId, segmentId).get().getState(),
+ RepairSegment.State.NOT_STARTED);
+ AppContext context = new AppContext();
+ context.storage = storage;
+ context.config = new ReaperApplicationConfiguration();
+ final Semaphore mutex = new Semaphore(0);
+ final JmxCassandraManagementProxy jmx = CassandraManagementProxyTest.mockJmxProxyImpl();
+ when(jmx.getClusterName()).thenReturn(cluster.getName());
+ when(jmx.getRangeToEndpointMap(anyString())).thenReturn(RepairRunnerHangingTest.sixNodeCluster());
+ EndpointSnitchInfoMBean endpointSnitchInfoMBean = mock(EndpointSnitchInfoMBean.class);
+ when(endpointSnitchInfoMBean.getDatacenter()).thenReturn("dc1");
+ try {
+ when(endpointSnitchInfoMBean.getDatacenter(anyString())).thenReturn("dc1");
+ } catch (UnknownHostException ex) {
+ throw new AssertionError(ex);
+ }
+ final AtomicInteger repairAttempts = new AtomicInteger(1);
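+ // Same hang simulation as testHangingRepair, but driven through the newer
+ // ProgressEventType notifications: attempt one only reports START and is left to
+ // time out; attempt two runs START -> SUCCESS -> COMPLETE and releases the mutex.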
+ when(jmx.triggerRepair(any(), any(), any(), any(), any(), any(), any(), anyInt()))
+ .then(
+ (invocation) -> {
+ assertEquals(
+ RepairSegment.State.STARTED,
+ storage.getRepairSegmentDao().getRepairSegment(runId, segmentId).get().getState());
+ final int repairNumber = repairAttempts.getAndIncrement();
+ switch (repairNumber) {
+ case 1:
+ new Thread() {
+ @Override
+ public void run() {
+ ((RepairStatusHandler) invocation.getArgument(5))
+ .handle(
+ repairNumber, Optional.empty(),
+ Optional.of(ProgressEventType.START), null, jmx);
+ assertEquals(
+ RepairSegment.State.RUNNING,
+ storage.getRepairSegmentDao().getRepairSegment(runId, segmentId).get().getState());
+ }
+ }.start();
+ break;
+ case 2:
+ new Thread() {
+ @Override
+ public void run() {
+ ((RepairStatusHandler) invocation.getArgument(5))
+ .handle(
+ repairNumber, Optional.empty(),
+ Optional.of(ProgressEventType.START), null, jmx);
+ assertEquals(
+ RepairSegment.State.RUNNING,
+ storage.getRepairSegmentDao().getRepairSegment(runId, segmentId).get().getState());
+ ((RepairStatusHandler) invocation.getArgument(5))
+ .handle(
+ repairNumber, Optional.empty(),
+ Optional.of(ProgressEventType.SUCCESS), null, jmx);
+ assertEquals(
+ RepairSegment.State.DONE,
+ storage.getRepairSegmentDao().getRepairSegment(runId, segmentId).get().getState());
+ ((RepairStatusHandler) invocation.getArgument(5))
+ .handle(
+ repairNumber, Optional.empty(),
+ Optional.of(ProgressEventType.COMPLETE), null, jmx);
+ mutex.release();
+ LOG.info("MUTEX RELEASED");
+ }
+ }.start();
+ break;
+ default:
+ fail("triggerRepair should only have been called twice");
+ }
+ return repairNumber;
+ });
+ ClusterFacade clusterFacade = mock(ClusterFacade.class);
+ when(clusterFacade.connect(any(Cluster.class), any())).thenReturn(jmx);
+ when(clusterFacade.nodeIsDirectlyAccessible(any(), any())).thenReturn(true);
+ when(clusterFacade.tokenRangeToEndpoint(any(), anyString(), any()))
+ .thenReturn(Lists.newArrayList(nodeSet));
+ when(clusterFacade.getRangeToEndpointMap(any(), anyString()))
+ .thenReturn((Map) ImmutableMap.of(Lists.newArrayList("0", "100"), Lists.newArrayList(nodeSet)));
+ when(clusterFacade.listActiveCompactions(any())).thenReturn(CompactionStats.builder().withActiveCompactions(
+ Collections.emptyList()).withPendingCompactions(Optional.of(0)).build());
+ context.repairManager
+ = RepairManager.create(
+ context,
+ clusterFacade,
+ Executors.newScheduledThreadPool(1),
+ 1,
+ TimeUnit.MILLISECONDS,
+ 1,
+ context.storage.getRepairRunDao());
+ context.managementConnectionFactory = new JmxManagementConnectionFactory(context, new NoopCrypotograph()) {
+ @Override
+ protected JmxCassandraManagementProxy connectImpl(Node host) throws ReaperException {
+ return jmx;
+ }
+ };
+ context.repairManager.startRepairRun(run);
+ await().with().atMost(2, TimeUnit.MINUTES).until(() -> {
+ try {
+ mutex.acquire();
+ LOG.info("MUTEX ACQUIRED");
+ // TODO: refactor so that we can properly wait for the repair runner to finish rather than using this sleep()
+ Thread.sleep(1000);
+ return true;
+ } catch (InterruptedException ex) {
+ throw new IllegalStateException(ex);
+ }
+ });
+ assertEquals(RepairRun.RunState.DONE, storage.getRepairRunDao().getRepairRun(runId).get().getRunState());
+ }
+
+ @Test
+ public void testDontFailRepairAfterTopologyChangeIncrementalRepair() throws InterruptedException, ReaperException,
+ MalformedObjectNameException, ReflectionException, IOException {
+ final String ksName = "reaper";
+ final Set<String> cfNames = Sets.newHashSet("reaper");
+ final boolean incrementalRepair = true;
+ final Set<String> nodeSet = Sets.newHashSet("127.0.0.1", "127.0.0.2", "127.0.0.3");
+ final List<String> nodeSetAfterTopologyChange = Lists.newArrayList("127.0.0.3", "127.0.0.2", "127.0.0.4");
+ final Map<String, String> nodeMap = ImmutableMap.of("127.0.0.1", "dc1", "127.0.0.2", "dc1", "127.0.0.3", "dc1");
+ final Map<String, String> nodeMapAfterTopologyChange = ImmutableMap.of(
+ "127.0.0.3", "dc1", "127.0.0.2", "dc1", "127.0.0.4", "dc1");
+ final Set<String> datacenters = Collections.emptySet();
+ final Set<String> blacklistedTables = Collections.emptySet();
+ final double intensity = 0.5f;
+ final int repairThreadCount = 1;
+ final int segmentTimeout = 30;
+ final List<BigInteger> tokens = THREE_TOKENS;
+ final IStorageDao storage = new MemoryStorageFacade();
+ AppContext context = new AppContext();
+ context.storage = storage;
+ context.config = new ReaperApplicationConfiguration();
+ storage.getClusterDao().addCluster(cluster);
+ UUID cf = storage.getRepairUnitDao().addRepairUnit(
+ RepairUnit.builder()
+ .clusterName(cluster.getName())
+ .keyspaceName(ksName)
+ .columnFamilies(cfNames)
+ .incrementalRepair(incrementalRepair)
+ .subrangeIncrementalRepair(false)
+ .nodes(nodeSet)
+ .datacenters(datacenters)
+ .blacklistedTables(blacklistedTables)
+ .repairThreadCount(repairThreadCount)
+ .timeout(segmentTimeout))
+ .getId();
+ Map<String, String> endpointToHostIDMap = endpointToHostIDMap();
+ RepairRun run = addNewRepairRun(nodeMap,
+ intensity,
+ storage,
+ cf,
+ UUID.fromString(endpointToHostIDMap.get("127.0.0.1")));
+ final UUID runId = run.getId();
+ final UUID segmentId = storage.getRepairSegmentDao().getNextFreeSegments(run.getId()).get(0).getId();
+ assertEquals(storage.getRepairSegmentDao().getRepairSegment(runId, segmentId).get().getState(),
+ RepairSegment.State.NOT_STARTED);
+ final JmxCassandraManagementProxy jmx = CassandraManagementProxyTest.mockJmxProxyImpl();
+ when(jmx.getClusterName()).thenReturn(cluster.getName());
+ when(jmx.getRangeToEndpointMap(anyString())).thenReturn(RepairRunnerTest.threeNodeClusterWithIps());
+ when(jmx.getEndpointToHostId()).thenReturn(endpointToHostIDMap);
+ when(jmx.getTokens()).thenReturn(tokens);
+ EndpointSnitchInfoMBean endpointSnitchInfoMBean = mock(EndpointSnitchInfoMBean.class);
+ when(endpointSnitchInfoMBean.getDatacenter()).thenReturn("dc1");
+ try {
+ when(endpointSnitchInfoMBean.getDatacenter(anyString())).thenReturn("dc1");
+ } catch (UnknownHostException ex) {
+ throw new AssertionError(ex);
+ }
+
+ ClusterFacade clusterFacade = mock(ClusterFacade.class);
+ when(clusterFacade.connect(any(Cluster.class), any())).thenReturn(jmx);
+ when(clusterFacade.nodeIsDirectlyAccessible(any(), any())).thenReturn(true);
+ when(clusterFacade.tokenRangeToEndpoint(any(), anyString(), any()))
+ .thenReturn(Lists.newArrayList(nodeSet));
+ when(clusterFacade.getRangeToEndpointMap(any(), anyString()))
+ .thenReturn((Map) ImmutableMap.of(
+ Lists.newArrayList("0", "100"), Lists.newArrayList(nodeSet),
+ Lists.newArrayList("100", "200"), Lists.newArrayList(nodeSet)));
+ when(clusterFacade.getEndpointToHostId(any())).thenReturn(endpointToHostIDMap);
+ when(clusterFacade.listActiveCompactions(any())).thenReturn(
+ CompactionStats.builder()
+ .withActiveCompactions(Collections.emptyList())
+ .withPendingCompactions(Optional.of(0))
+ .build());
+ context.repairManager = RepairManager.create(
+ context,
+ clusterFacade,
+ Executors.newScheduledThreadPool(10),
+ 1,
+ TimeUnit.MILLISECONDS,
+ 1,
+ context.storage.getRepairRunDao());
+ AtomicInteger repairNumberCounter = new AtomicInteger(1);
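+ // Every repair attempt succeeds immediately; this test exercises the topology
+ // change below, not the repair outcome itself.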
+ when(jmx.triggerRepair(any(), any(), any(), any(), any(), any(), any(), anyInt()))
+ .then(
+ (invocation) -> {
+ final int repairNumber = repairNumberCounter.getAndIncrement();
+ new Thread() {
+ @Override
+ public void run() {
+ ((RepairStatusHandler) invocation.getArgument(5))
+ .handle(
+ repairNumber,
+ Optional.of(ActiveRepairService.Status.STARTED),
+ Optional.empty(),
+ null,
+ jmx);
+ ((RepairStatusHandler) invocation.getArgument(5))
+ .handle(
+ repairNumber,
+ Optional.of(ActiveRepairService.Status.SESSION_SUCCESS),
+ Optional.empty(),
+ null,
+ jmx);
+ ((RepairStatusHandler) invocation.getArgument(5))
+ .handle(
+ repairNumber,
+ Optional.of(ActiveRepairService.Status.FINISHED),
+ Optional.empty(),
+ null,
+ jmx);
+ }
+ }.start();
+ return repairNumber;
+ });
+ context.managementConnectionFactory = new JmxManagementConnectionFactory(context, new NoopCrypotograph()) {
+ @Override
+ protected JmxCassandraManagementProxy connectImpl(Node host) throws ReaperException {
+ return jmx;
+ }
+ };
+ ClusterFacade clusterProxy = ClusterFacade.create(context);
+ ClusterFacade clusterProxySpy = Mockito.spy(clusterProxy);
+ Mockito.doReturn(nodeSetAfterTopologyChange).when(clusterProxySpy).tokenRangeToEndpoint(any(), any(), any());
+ assertEquals(RepairRun.RunState.NOT_STARTED, storage.getRepairRunDao().getRepairRun(runId).get().getRunState());
+ storage.getRepairRunDao().updateRepairRun(
+ run.with().runState(RepairRun.RunState.RUNNING).startTime(DateTime.now()).build(runId));
+ // We'll now change the list of replicas for any segment, making the stored ones obsolete
+ when(clusterFacade.getRangeToEndpointMap(any(), anyString()))
+ .thenReturn((Map) ImmutableMap.of(
+ Lists.newArrayList("0", "100"), Lists.newArrayList(nodeSetAfterTopologyChange),
+ Lists.newArrayList("100", "200"), Lists.newArrayList(nodeSetAfterTopologyChange)));
+ String hostIdToChange = endpointToHostIDMap.get("127.0.0.1");
+ endpointToHostIDMap.remove("127.0.0.1");
+ endpointToHostIDMap.put("127.0.0.4", hostIdToChange);
+ when(clusterFacade.getEndpointToHostId(any())).thenReturn(endpointToHostIDMap);
+ when(clusterFacade.tokenRangeToEndpoint(any(), anyString(), any()))
+ .thenReturn(Lists.newArrayList(nodeSetAfterTopologyChange));
+ context.repairManager.resumeRunningRepairRuns();
+
+ // The repair run should succeed despite the topology change.
+ await().with().atMost(60, TimeUnit.SECONDS).until(() -> {
+ return RepairRun.RunState.DONE == storage.getRepairRunDao().getRepairRun(runId).get().getRunState();
+ });
+ }
+}
\ No newline at end of file
diff --git a/src/server/src/test/java/io/cassandrareaper/service/RepairRunnerTest.java b/src/server/src/test/java/io/cassandrareaper/service/RepairRunnerTest.java
index 100e6fc0a..b7f16e250 100644
--- a/src/server/src/test/java/io/cassandrareaper/service/RepairRunnerTest.java
+++ b/src/server/src/test/java/io/cassandrareaper/service/RepairRunnerTest.java
@@ -27,6 +27,7 @@
import io.cassandrareaper.core.RepairRun;
import io.cassandrareaper.core.RepairSchedule;
import io.cassandrareaper.core.RepairSegment;
+import io.cassandrareaper.core.RepairType;
import io.cassandrareaper.core.RepairUnit;
import io.cassandrareaper.core.Segment;
import io.cassandrareaper.crypto.NoopCrypotograph;
@@ -58,10 +59,8 @@
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
-import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import javax.management.JMException;
import javax.management.MalformedObjectNameException;
import javax.management.ReflectionException;
@@ -73,7 +72,6 @@
import org.apache.cassandra.locator.EndpointSnitchInfoMBean;
import org.apache.cassandra.repair.RepairParallelism;
import org.apache.cassandra.service.ActiveRepairService;
-import org.apache.cassandra.utils.progress.ProgressEventType;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.awaitility.Duration;
@@ -84,28 +82,23 @@
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.awaitility.Awaitility.await;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyDouble;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public final class RepairRunnerTest {
-
- private static final Logger LOG = LoggerFactory.getLogger(RepairRunnerTest.class);
private static final Set<String> TABLES = ImmutableSet.of("table1");
private static final Duration POLL_INTERVAL = Duration.TWO_SECONDS;
private static final List<BigInteger> THREE_TOKENS = Lists.newArrayList(
@@ -213,312 +206,6 @@ public void tearDown() {
DateTimeUtils.setCurrentMillisSystem();
}
- @Test
- @SuppressWarnings("checkstyle:methodlength")
- public void testHangingRepair() throws InterruptedException, ReaperException, JMException, IOException {
- final String ksName = "reaper";
- final Set<String> cfNames = Sets.newHashSet("reaper");
- final boolean incrementalRepair = false;
- final Set<String> nodeSet = Sets.newHashSet("127.0.0.1");
- final Set