diff --git a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/repair/RepairTask.java b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/repair/RepairTask.java index f7ccd7f9a..0fb8061fe 100644 --- a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/repair/RepairTask.java +++ b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/repair/RepairTask.java @@ -46,13 +46,8 @@ public abstract class RepairTask implements NotificationListener { private static final Logger LOG = LoggerFactory.getLogger(RepairTask.class); private static final Pattern RANGE_PATTERN = Pattern.compile("\\((-?[0-9]+),(-?[0-9]+)\\]"); - private static final long HANG_PREVENT_TIME_IN_MINUTES = 30; - private static final long HEALTH_CHECK_TIME_IN_MINUTES = 10; private final ScheduledExecutorService myExecutor = Executors.newSingleThreadScheduledExecutor( new ThreadFactoryBuilder().setNameFormat("HangPreventingTask-%d").build()); - private final ScheduledExecutorService myHealthCheckExecutor = Executors.newSingleThreadScheduledExecutor( - new ThreadFactoryBuilder().setNameFormat("NodeHealthCheck-%d").build()); - private final CountDownLatch myLatch = new CountDownLatch(1); private final JmxProxyFactory myJmxProxyFactory; private final TableReference myTableReference; @@ -89,11 +84,6 @@ public void execute() throws ScheduledJobException onExecute(); try (JmxProxy proxy = myJmxProxyFactory.connect()) { - if (!isNodeOperational(proxy)) - { - LOG.debug("Local Cassandra node is down, aborting repair task."); - new Exception(); - } rescheduleHangPrevention(); repair(proxy); onFinish(RepairStatus.SUCCESS); @@ -132,17 +122,8 @@ private void repair(final JmxProxy proxy) throws ScheduledJobException myCommand = proxy.repairAsync(myTableReference.getKeyspace(), getOptions()); if (myCommand > 0) { - ScheduledFuture healthCheckFuture = null; try { - healthCheckFuture = myHealthCheckExecutor.scheduleAtFixedRate(() -> - { - if (!isNodeOperational(proxy)) - { - myLastError = new ScheduledJobException("Node became non-operational during repair"); - myLatch.countDown(); - } - }, 0, HEALTH_CHECK_TIME_IN_MINUTES, TimeUnit.MINUTES); // Check every 10 minute myLatch.await(); proxy.removeStorageServiceListener(this); verifyRepair(proxy); @@ -164,23 +145,9 @@ private void repair(final JmxProxy proxy) throws ScheduledJobException Thread.currentThread().interrupt(); throw new ScheduledJobException(e); } - finally - { - if (healthCheckFuture != null && !healthCheckFuture.isCancelled()) - { - healthCheckFuture.cancel(false); - } - } } } - private boolean isNodeOperational(final JmxProxy proxy) - { - String nodeStatus = proxy.getNodeStatus(); - LOG.debug("Node Status {} ", nodeStatus); - return "NORMAL".equals(nodeStatus); - } - /** * Method used to construct options for the repair. * @@ -238,7 +205,6 @@ private void lazySleep(final long executionNanos) throws ScheduledJobException public void cleanup() { myExecutor.shutdown(); - myHealthCheckExecutor.shutdown(); } /** @@ -292,8 +258,9 @@ private void rescheduleHangPrevention() { myHangPreventFuture.cancel(false); } - myHangPreventFuture = myExecutor.schedule(new HangPreventingTask(), - HANG_PREVENT_TIME_IN_MINUTES, TimeUnit.MINUTES); + // Schedule the first check to happen after 10 minutes + myHangPreventFuture = myExecutor.schedule(new HangPreventingTask(), 10, + TimeUnit.MINUTES); } /** @@ -397,20 +364,46 @@ public enum ProgressEventType private class HangPreventingTask implements Runnable { + private static final int MAX_CHECKS = 3; + private static final String NORMAL_STATUS = "NORMAL"; + private int checkCount = 0; + @Override public void run() { try (JmxProxy proxy = myJmxProxyFactory.connect()) { - proxy.forceTerminateAllRepairSessions(); + if (checkCount < MAX_CHECKS) + { + String nodeStatus = proxy.getNodeStatus(); + if (!NORMAL_STATUS.equals(nodeStatus)) + { + LOG.error("Local Cassandra node is down, aborting repair task."); + myLastError = new ScheduledJobException("Local Cassandra node is down"); + proxy.forceTerminateAllRepairSessions(); + myLatch.countDown(); // Signal to abort the repair task + } + else + { + checkCount++; + myHangPreventFuture = myExecutor.schedule(this, 10, TimeUnit.MINUTES); + } + } + else + { + // After 3 successful checks or 30 minutes if still task is running terminate all repair sessions + proxy.forceTerminateAllRepairSessions(); + myLatch.countDown(); + } } catch (IOException e) { - LOG.error("Unable to prevent hanging repair task: {}", this, e); + LOG.error("Unable to check node status or prevent hanging repair task: {}", this, e); } } } + @VisibleForTesting final Set getFailedRanges() {