Skip to content

Commit

Permalink
Resolve CheckStyle errors
Browse files Browse the repository at this point in the history
  • Loading branch information
andresbeckruiz committed Nov 6, 2024
1 parent 590f803 commit 28f33ae
Show file tree
Hide file tree
Showing 5 changed files with 876 additions and 32 deletions.
2 changes: 2 additions & 0 deletions src/server/src/main/docker/cassandra-reaper.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ repairManagerSchedulingIntervalSeconds: ${REAPER_REPAIR_MANAGER_SCHEDULING_INTER
jmxConnectionTimeoutInSeconds: ${REAPER_JMX_CONNECTION_TIMEOUT_IN_SECONDS}
useAddressTranslator: ${REAPER_USE_ADDRESS_TRANSLATOR}
maxParallelRepairs: ${REAPER_MAX_PARALLEL_REPAIRS}
# When true, a schedule whose run ends in ERROR gets its next activation pulled
# forward (see scheduleRetryDelay). Defaults to false when the env var is unset.
scheduleRetryOnError: ${REAPER_SCHEDULE_RETRY_ON_ERROR:-false}
# Delay before the retried run, as an ISO-8601 duration (PT1H = 1 hour).
# Only applied if it is sooner than the schedule's existing next activation.
scheduleRetryDelay: ${REAPER_SCHEDULE_RETRY_DELAY:-PT1H}

# datacenterAvailability has three possible values: ALL | LOCAL | EACH
# the correct value to use depends on whether jmx ports to C* nodes in remote datacenters are accessible
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,12 @@ public final class ReaperApplicationConfiguration extends Configuration {
@Nullable
private String persistenceStoragePath;

// Whether an errored repair run should pull the owning schedule's next
// activation forward; treated as false when left unset (see accessor).
@JsonProperty
private Boolean scheduleRetryOnError;

// Delay before the retried run after an error; treated as 1 hour when unset.
@JsonProperty
private Duration scheduleRetryDelay;

public HttpManagement getHttpManagement() {
return httpManagement;
}
Expand Down Expand Up @@ -533,6 +539,22 @@ public String getPersistenceStoragePath() {
return persistenceStoragePath;
}

/**
 * Whether schedules should be retried after a repair run ends in error.
 *
 * @return the configured flag, or {@code false} when it was never set
 */
public Boolean isScheduleRetryOnError() {
  return Boolean.TRUE.equals(scheduleRetryOnError);
}

/**
 * Sets whether schedules are retried after an errored repair run.
 *
 * @param retryOnError the flag value; may be {@code null} to mean "unset"
 */
public void setScheduleRetryOnError(Boolean retryOnError) {
  this.scheduleRetryOnError = retryOnError;
}

/**
 * Delay to wait before retrying a schedule whose run errored.
 *
 * @return the configured delay, or one hour when it was never set
 */
public Duration getScheduleRetryDelay() {
  return scheduleRetryDelay == null ? Duration.ofHours(1) : scheduleRetryDelay;
}

/**
 * Sets the delay used when retrying a schedule after an errored run.
 *
 * @param retryDelay the delay; may be {@code null} to mean "unset"
 */
public void setScheduleRetryDelay(Duration retryDelay) {
  this.scheduleRetryDelay = retryDelay;
}

public enum DatacenterAvailability {
/* We require direct JMX access to all nodes across all datacenters */
ALL,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,42 @@ private void endRepairRun() {
}
}

/**
 * If retry-on-error is enabled, pulls the owning schedule's next activation
 * forward by the configured retry delay — but only when that is sooner than
 * the activation already planned. No-op when the feature is disabled or no
 * schedule matches this run's repair unit.
 */
private void maybeScheduleRetryOnError() {
  if (!context.config.isScheduleRetryOnError()) {
    return;
  }
  // Find the schedule(s) owning this run's repair unit within its cluster/keyspace.
  List<RepairSchedule> matchingSchedules
      = context.storage.getRepairScheduleDao()
          .getRepairSchedulesForClusterAndKeyspace(clusterName, repairUnit.getKeyspaceName())
          .stream()
          .filter(sched -> sched.getRepairUnitId().equals(repairUnit.getId()))
          .collect(Collectors.toList());

  if (matchingSchedules.isEmpty()) {
    return;
  }
  // Set precondition that only a single schedule should match
  Preconditions.checkArgument(matchingSchedules.size() == 1,
      String.format("Update for repair run %s and unit %s "
          + "should impact a single schedule. %d were found",
          repairRunId,
          repairUnit.getId(),
          matchingSchedules.size())
  );
  RepairSchedule scheduleToTune = matchingSchedules.get(0);

  int retryDelayMinutes = (int) context.config.getScheduleRetryDelay().toMinutes();
  DateTime retryActivation = DateTime.now().plusMinutes(retryDelayMinutes);

  // Only move the activation if the retry would happen earlier than planned.
  if (retryActivation.isBefore(scheduleToTune.getNextActivation())) {
    LOG.debug("Scheduling next repair run at {} for repair schedule {}", retryActivation,
        scheduleToTune.getId());

    RepairSchedule updatedSchedule
        = scheduleToTune.with().nextActivation(retryActivation).build(scheduleToTune.getId());
    context.storage.getRepairScheduleDao().updateRepairSchedule(updatedSchedule);
  }
}

/**
* Tune segment timeout and number of segments for adaptive schedules.
* Checks that the run was triggered by an adaptive schedule and gathers info on the run to apply tunings.
Expand Down Expand Up @@ -508,6 +544,9 @@ private void startNextSegment() throws ReaperException, InterruptedException {
} else {
potentialReplicas.addAll(potentialReplicaMap.keySet());
}
if (potentialReplicas.isEmpty()) {
failRepairDueToOutdatedSegment(segment.getId(), segment.getTokenRange());
}
LOG.debug("Potential replicas for segment {}: {}", segment.getId(), potentialReplicas);
ICassandraManagementProxy coordinator = clusterFacade.connect(cluster, potentialReplicas);
if (nodesReadyForNewRepair(coordinator, segment, potentialReplicaMap, repairRunId)) {
Expand Down Expand Up @@ -684,26 +723,7 @@ private boolean repairSegment(final UUID segmentId, Segment segment, Collection<
return true;
}
if (potentialCoordinators.isEmpty()) {
LOG.warn(
"Segment #{} is faulty, no potential coordinators for range: {}",
segmentId,
segment.toString());
// This segment has a faulty token range. Abort the entire repair run.
synchronized (this) {
repairRunDao.updateRepairRun(
repairRunDao.getRepairRun(repairRunId).get()
.with()
.runState(RepairRun.RunState.ERROR)
.lastEvent(String.format("No coordinators for range %s", segment))
.endTime(DateTime.now())
.build(repairRunId));

context.metricRegistry.counter(
MetricRegistry.name(RepairManager.class, "repairDone", RepairRun.RunState.ERROR.toString())).inc();

killAndCleanupRunner();
}

failRepairDueToOutdatedSegment(segmentId, segment);
return false;
}
} else {
Expand Down Expand Up @@ -754,6 +774,35 @@ public void onFailure(Throwable throwable) {
return true;
}

/**
 * Aborts the entire repair run because a segment has a faulty token range
 * (possibly due to an additive change in cluster topology during repair):
 * the run is moved to ERROR, the error metric is bumped, a retry may be
 * scheduled, and the runner is always killed and cleaned up.
 *
 * <p>If the run has been removed from storage, this is a no-op. Should only
 * happen in tests on backends that delete repair segments.
 *
 * @param segmentId id of the faulty segment (used for logging only)
 * @param segment   the segment whose token range has no coordinators
 */
private synchronized void failRepairDueToOutdatedSegment(UUID segmentId, Segment segment) {
  // SLF4J formats lazily; no need for an explicit toString().
  LOG.warn("Segment #{} is faulty, no potential coordinators for range: {}", segmentId, segment);

  Optional<RepairRun> repairRun = repairRunDao.getRepairRun(repairRunId);
  if (repairRun.isPresent()) {
    try {
      // Reuse the fetched run instead of re-querying storage: avoids a second
      // round-trip and a race where the run disappears between the isPresent()
      // check and a second get().
      repairRunDao.updateRepairRun(
          repairRun.get()
              .with()
              .runState(RepairRun.RunState.ERROR)
              .lastEvent(String.format("No coordinators for range %s", segment))
              .endTime(DateTime.now())
              .build(repairRunId));

      context.metricRegistry.counter(
          MetricRegistry.name(RepairManager.class, "repairDone",
              RepairRun.RunState.ERROR.toString())).inc();

      maybeScheduleRetryOnError();
    } finally {
      // Always tear the runner down, even if the storage update throws.
      killAndCleanupRunner();
    }
  }
}

private List<String> filterPotentialCoordinatorsByDatacenters(
Collection<String> datacenters,
List<String> potentialCoordinators) throws ReaperException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public final class RepairRunnerHangingTest {
Expand Down Expand Up @@ -245,12 +247,9 @@ public void testHangingRepair() throws InterruptedException, ReaperException, JM
final IStorageDao storage = new MemoryStorageFacade();
storage.getClusterDao().addCluster(cluster);
RepairUnit cf = storage.getRepairUnitDao().addRepairUnit(
RepairUnit.builder()
.clusterName(cluster.getName())
.keyspaceName("reaper")
.columnFamilies(cfNames)
.incrementalRepair(false)
.subrangeIncrementalRepair(false)
RepairUnit.builder().clusterName(cluster.getName())
.keyspaceName("reaper").columnFamilies(cfNames)
.incrementalRepair(false).subrangeIncrementalRepair(false)
.nodes(nodeSet)
.datacenters(datacenters)
.blacklistedTables(blacklistedTables)
Expand All @@ -266,7 +265,7 @@ public void testHangingRepair() throws InterruptedException, ReaperException, JM
Collections.singleton(
RepairSegment.builder(
Segment.builder()
.withTokenRange(new RingRange(BigInteger.ZERO, new BigInteger("100")))
.withTokenRange(new RingRange(BigInteger.ZERO, new BigInteger("50")))
.withReplicas(replicas)
.build(),
cf.getId())));
Expand Down Expand Up @@ -382,6 +381,9 @@ protected JmxCassandraManagementProxy connectImpl(Node host) throws ReaperExcept
}
});
assertEquals(RepairRun.RunState.DONE, storage.getRepairRunDao().getRepairRun(runId).get().getRunState());
verify(jmx, times(2)).triggerRepair(
any(), any(), any(), any(), any(), any(), any(), anyInt()
);
}

@Test
Expand All @@ -402,10 +404,8 @@ public void testHangingRepairNewApi() throws InterruptedException, ReaperExcepti
DateTimeUtils.setCurrentMillisFixed(timeRun);
RepairUnit cf = storage.getRepairUnitDao().addRepairUnit(
RepairUnit.builder()
.clusterName(cluster.getName())
.keyspaceName(ksName)
.columnFamilies(cfNames)
.incrementalRepair(incrementalRepair)
.clusterName(cluster.getName()).keyspaceName(ksName)
.columnFamilies(cfNames).incrementalRepair(incrementalRepair)
.subrangeIncrementalRepair(incrementalRepair)
.nodes(nodeSet)
.datacenters(datacenters)
Expand All @@ -420,7 +420,7 @@ public void testHangingRepairNewApi() throws InterruptedException, ReaperExcepti
Collections.singleton(
RepairSegment.builder(
Segment.builder()
.withTokenRange(new RingRange(BigInteger.ZERO, new BigInteger("100")))
.withTokenRange(new RingRange(BigInteger.ZERO, new BigInteger("50")))
.withReplicas(replicas)
.build(),
cf.getId())));
Expand Down Expand Up @@ -534,6 +534,9 @@ protected JmxCassandraManagementProxy connectImpl(Node host) throws ReaperExcept
}
});
assertEquals(RepairRun.RunState.DONE, storage.getRepairRunDao().getRepairRun(runId).get().getRunState());
verify(jmx, times(2)).triggerRepair(
any(), any(), any(), any(), any(), any(), any(), anyInt()
);
}

@Test
Expand Down
Loading

0 comments on commit 28f33ae

Please sign in to comment.