Refactor and clean code
sajid riaz committed Dec 7, 2023
1 parent 85f7e56 commit 6bb738d
Showing 1 changed file with 31 additions and 38 deletions.
@@ -46,13 +46,8 @@ public abstract class RepairTask implements NotificationListener
 {
     private static final Logger LOG = LoggerFactory.getLogger(RepairTask.class);
     private static final Pattern RANGE_PATTERN = Pattern.compile("\\((-?[0-9]+),(-?[0-9]+)\\]");
-    private static final long HANG_PREVENT_TIME_IN_MINUTES = 30;
-    private static final long HEALTH_CHECK_TIME_IN_MINUTES = 10;
     private final ScheduledExecutorService myExecutor = Executors.newSingleThreadScheduledExecutor(
             new ThreadFactoryBuilder().setNameFormat("HangPreventingTask-%d").build());
-    private final ScheduledExecutorService myHealthCheckExecutor = Executors.newSingleThreadScheduledExecutor(
-            new ThreadFactoryBuilder().setNameFormat("NodeHealthCheck-%d").build());
-
     private final CountDownLatch myLatch = new CountDownLatch(1);
     private final JmxProxyFactory myJmxProxyFactory;
     private final TableReference myTableReference;
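Net effect of this hunk: the two single-thread executors collapse into one. A minimal sketch of the surviving construction, using Guava's ThreadFactoryBuilder as the kept lines do (the wrapper class name here is illustrative):

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;

    import com.google.common.util.concurrent.ThreadFactoryBuilder;

    public abstract class SingleExecutorTask
    {
        // One scheduled executor now drives both hang prevention and the
        // periodic node-status checks; the "NodeHealthCheck-%d" pool is gone.
        private final ScheduledExecutorService myExecutor = Executors.newSingleThreadScheduledExecutor(
                new ThreadFactoryBuilder().setNameFormat("HangPreventingTask-%d").build());
    }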
@@ -89,11 +84,6 @@ public void execute() throws ScheduledJobException
         onExecute();
         try (JmxProxy proxy = myJmxProxyFactory.connect())
         {
-            if (!isNodeOperational(proxy))
-            {
-                LOG.debug("Local Cassandra node is down, aborting repair task.");
-                new Exception();
-            }
             rescheduleHangPrevention();
             repair(proxy);
             onFinish(RepairStatus.SUCCESS);
@@ -132,17 +122,8 @@ private void repair(final JmxProxy proxy) throws ScheduledJobException
         myCommand = proxy.repairAsync(myTableReference.getKeyspace(), getOptions());
         if (myCommand > 0)
         {
-            ScheduledFuture<?> healthCheckFuture = null;
             try
             {
-                healthCheckFuture = myHealthCheckExecutor.scheduleAtFixedRate(() ->
-                {
-                    if (!isNodeOperational(proxy))
-                    {
-                        myLastError = new ScheduledJobException("Node became non-operational during repair");
-                        myLatch.countDown();
-                    }
-                }, 0, HEALTH_CHECK_TIME_IN_MINUTES, TimeUnit.MINUTES); // Check every 10 minute
                 myLatch.await();
                 proxy.removeStorageServiceListener(this);
                 verifyRepair(proxy);
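repair() stays latch-driven: proxy.repairAsync returns a command id and the calling thread parks on myLatch until a JMX notification, or now the watchdog, counts it down. A standalone sketch of the pattern, with illustrative names (LatchDrivenRepair, asyncStart, and the 40-minute ceiling are not from the commit):

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    public final class LatchDrivenRepair
    {
        private final CountDownLatch latch = new CountDownLatch(1);
        private final ScheduledExecutorService watchdog =
                Executors.newSingleThreadScheduledExecutor();

        public void run(Runnable asyncStart) throws InterruptedException
        {
            asyncStart.run(); // start the asynchronous work, e.g. repairAsync
            // Safety valve: release the latch if nothing else has done so in time.
            watchdog.schedule(latch::countDown, 40, TimeUnit.MINUTES);
            latch.await(); // a completion listener or the watchdog releases this
            watchdog.shutdown();
        }

        public void onRepairFinished()
        {
            latch.countDown(); // what a JMX completion handler would call
        }
    }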
@@ -164,23 +145,9 @@ private void repair(final JmxProxy proxy) throws ScheduledJobException
                 Thread.currentThread().interrupt();
                 throw new ScheduledJobException(e);
             }
-            finally
-            {
-                if (healthCheckFuture != null && !healthCheckFuture.isCancelled())
-                {
-                    healthCheckFuture.cancel(false);
-                }
-            }
         }
     }

-    private boolean isNodeOperational(final JmxProxy proxy)
-    {
-        String nodeStatus = proxy.getNodeStatus();
-        LOG.debug("Node Status {} ", nodeStatus);
-        return "NORMAL".equals(nodeStatus);
-    }
-
     /**
      * Method used to construct options for the repair.
      *
@@ -238,7 +205,6 @@ private void lazySleep(final long executionNanos) throws ScheduledJobException
     public void cleanup()
     {
         myExecutor.shutdown();
-        myHealthCheckExecutor.shutdown();
     }

     /**
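cleanup() is left with a single executor to stop. shutdown() alone does not wait for an in-flight run; a common hardening, sketched here as an assumption rather than anything this commit does, is to await termination briefly and then force:

    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    final class ExecutorShutdown
    {
        // Sketch only; the commit itself calls myExecutor.shutdown() and returns.
        static void shutdownQuietly(ScheduledExecutorService executor)
        {
            executor.shutdown(); // stop accepting new tasks
            try
            {
                if (!executor.awaitTermination(10, TimeUnit.SECONDS))
                {
                    executor.shutdownNow(); // cancel a still-pending HangPreventingTask
                }
            }
            catch (InterruptedException e)
            {
                Thread.currentThread().interrupt();
                executor.shutdownNow();
            }
        }
    }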
@@ -292,8 +258,9 @@ private void rescheduleHangPrevention()
         {
             myHangPreventFuture.cancel(false);
         }
-        myHangPreventFuture = myExecutor.schedule(new HangPreventingTask(),
-                HANG_PREVENT_TIME_IN_MINUTES, TimeUnit.MINUTES);
+        // Schedule the first check to happen after 10 minutes
+        myHangPreventFuture = myExecutor.schedule(new HangPreventingTask(), 10,
+                TimeUnit.MINUTES);
     }

     /**
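rescheduleHangPrevention() cancels any outstanding check before arming the first one at a fixed 10 minutes. The cancel-then-reschedule idiom in isolation (class and field names here are illustrative):

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.ScheduledFuture;
    import java.util.concurrent.TimeUnit;

    final class Rescheduler
    {
        private final ScheduledExecutorService executor =
                Executors.newSingleThreadScheduledExecutor();
        private ScheduledFuture<?> pending;

        synchronized void reschedule(Runnable task)
        {
            if (pending != null && !pending.isCancelled())
            {
                pending.cancel(false); // false: let an in-flight run finish
            }
            pending = executor.schedule(task, 10, TimeUnit.MINUTES);
        }
    }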
@@ -397,20 +364,46 @@ public enum ProgressEventType

     private class HangPreventingTask implements Runnable
     {
+        private static final int MAX_CHECKS = 3;
+        private static final String NORMAL_STATUS = "NORMAL";
+        private int checkCount = 0;
+
         @Override
         public void run()
         {
             try (JmxProxy proxy = myJmxProxyFactory.connect())
             {
-                proxy.forceTerminateAllRepairSessions();
+                if (checkCount < MAX_CHECKS)
+                {
+                    String nodeStatus = proxy.getNodeStatus();
+                    if (!NORMAL_STATUS.equals(nodeStatus))
+                    {
+                        LOG.error("Local Cassandra node is down, aborting repair task.");
+                        myLastError = new ScheduledJobException("Local Cassandra node is down");
+                        proxy.forceTerminateAllRepairSessions();
+                        myLatch.countDown(); // Signal to abort the repair task
+                    }
+                    else
+                    {
+                        checkCount++;
+                        myHangPreventFuture = myExecutor.schedule(this, 10, TimeUnit.MINUTES);
+                    }
+                }
+                else
+                {
+                    // After three healthy checks (30 minutes), if the task is still running, terminate all repair sessions
+                    proxy.forceTerminateAllRepairSessions();
+                    myLatch.countDown();
+                }
             }
             catch (IOException e)
             {
-                LOG.error("Unable to prevent hanging repair task: {}", this, e);
+                LOG.error("Unable to check node status or prevent hanging repair task: {}", this, e);
             }
         }
     }
+
     @VisibleForTesting
     final Set<LongTokenRange> getFailedRanges()
     {
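The reworked HangPreventingTask folds the removed health check into the watchdog itself: it probes node status every 10 minutes, aborts the repair as soon as the node leaves NORMAL, and after three healthy probes, the old 30-minute budget, terminates the repair sessions anyway. The same bounded, self-rescheduling check as a standalone sketch, with java.util.function.Supplier standing in for JmxProxy.getNodeStatus:

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    import java.util.function.Supplier;

    final class BoundedStatusWatchdog implements Runnable
    {
        private static final int MAX_CHECKS = 3;
        private final ScheduledExecutorService executor =
                Executors.newSingleThreadScheduledExecutor();
        private final CountDownLatch latch;
        private final Supplier<String> nodeStatus;
        private int checkCount = 0;

        BoundedStatusWatchdog(CountDownLatch latch, Supplier<String> nodeStatus)
        {
            this.latch = latch;
            this.nodeStatus = nodeStatus;
        }

        @Override
        public void run()
        {
            if (checkCount >= MAX_CHECKS || !"NORMAL".equals(nodeStatus.get()))
            {
                latch.countDown(); // give up: node down, or 30 minutes elapsed
            }
            else
            {
                checkCount++; // healthy; re-arm for another 10-minute wait
                executor.schedule(this, 10, TimeUnit.MINUTES);
            }
        }
    }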
