Skip to content

Commit

Permalink
clean some stuff
Browse files Browse the repository at this point in the history
  • Loading branch information
sajid riaz committed Dec 7, 2023
1 parent dc241c0 commit 8aebf63
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import javax.management.remote.JMXConnectionNotification;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
Expand All @@ -48,9 +47,11 @@ public abstract class RepairTask implements NotificationListener
private static final Logger LOG = LoggerFactory.getLogger(RepairTask.class);
private static final Pattern RANGE_PATTERN = Pattern.compile("\\((-?[0-9]+),(-?[0-9]+)\\]");
private static final long HANG_PREVENT_TIME_IN_MINUTES = 30;
private static final String NODE_STATUS_ERROR = "Local Cassandra node is down, aborting repair task.";
private final ScheduledExecutorService myExecutor = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat("HangPreventingTask-%d").build());
private final ScheduledExecutorService myHealthCheckExecutor = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat("NodeHealthCheck-%d").build());

private final CountDownLatch myLatch = new CountDownLatch(1);
private final JmxProxyFactory myJmxProxyFactory;
private final TableReference myTableReference;
Expand Down Expand Up @@ -87,10 +88,10 @@ public void execute() throws ScheduledJobException
onExecute();
try (JmxProxy proxy = myJmxProxyFactory.connect())
{
if (isLocalNodeDown(proxy))
if(!isNodeOperational(proxy))
{
LOG.warn(NODE_STATUS_ERROR);
throw new Exception(NODE_STATUS_ERROR);
LOG.debug("Local Cassandra node is down, aborting repair task.");
new Exception();
}
rescheduleHangPrevention();
repair(proxy);
Expand Down Expand Up @@ -124,63 +125,23 @@ protected void onExecute()
// NOOP
}

private boolean isLocalNodeDown(JmxProxy proxy)
{
String status = proxy.getNodeStatus();
LOG.debug("NodeStatus {}", status);
return !"NORMAL".equals(status);
}

private long calculateDynamicHangPreventionTime()
{
try (JmxProxy jmxProxy = myJmxProxyFactory.connect())
{
// Get metrics
long diskSpaceUsed = jmxProxy.liveDiskSpaceUsed(myTableReference);
List<String> unreachableNodes = jmxProxy.getUnreachableNodes();

// Base timeout
long baseTimeoutMinutes = 5;

// Adjustments
long sizeAdjustment = calculateSizeAdjustment(diskSpaceUsed);
long clusterStateAdjustment = unreachableNodes.size() * 2;

// Total timeout
long dynamicTimeout = baseTimeoutMinutes + sizeAdjustment + clusterStateAdjustment;

// Ensure the timeout is within a reasonable range
return Math.min(dynamicTimeout, HANG_PREVENT_TIME_IN_MINUTES);
}
catch (IOException e)
{
LOG.error("Error accessing JMX: {}", e.getMessage());
return HANG_PREVENT_TIME_IN_MINUTES; // Use a default timeout if JMX access fails
}
}

private long calculateSizeAdjustment(long diskSpaceUsed)
{
// Constants for adjustment calculation
double logBase = 2;
double minutesPerLogUnit = 5; // Time added per logarithmic unit
double scaleFactor = 1024 * 1024 * 1024; // Scale factor (1 GB in bytes)
// Normalize disk space used to GB and apply logarithmic scale
double normalizedDiskSpace = Math.log(diskSpaceUsed / scaleFactor) / Math.log(logBase);
// Calculate adjustment based on logarithmic scale
long sizeAdjustment = (long)(normalizedDiskSpace * minutesPerLogUnit);
long maxSizeAdjustment = 15;
return Math.min(Math.max(0, sizeAdjustment), maxSizeAdjustment);
}

private void repair(final JmxProxy proxy) throws ScheduledJobException
{
proxy.addStorageServiceListener(this);
myCommand = proxy.repairAsync(myTableReference.getKeyspace(), getOptions());
if (myCommand > 0)
{
ScheduledFuture<?> healthCheckFuture = null;
try
{
healthCheckFuture = myHealthCheckExecutor.scheduleAtFixedRate(() ->
{
if (!isNodeOperational(proxy))
{
myLastError = new ScheduledJobException("Node became non-operational during repair");
myLatch.countDown();
}
}, 0, 10, TimeUnit.MINUTES); // Check every 10 minute
myLatch.await();
proxy.removeStorageServiceListener(this);
verifyRepair(proxy);
Expand All @@ -202,9 +163,23 @@ private void repair(final JmxProxy proxy) throws ScheduledJobException
Thread.currentThread().interrupt();
throw new ScheduledJobException(e);
}
finally
{
if (healthCheckFuture != null && !healthCheckFuture.isCancelled())
{
healthCheckFuture.cancel(false);
}
}
}
}

private boolean isNodeOperational(JmxProxy proxy)
{
String nodeStatus = proxy.getNodeStatus();
LOG.debug("Node Status {} ", nodeStatus);
return "NORMAL".equals(nodeStatus);
}

/**
* Method used to construct options for the repair.
*
Expand Down Expand Up @@ -262,6 +237,7 @@ private void lazySleep(final long executionNanos) throws ScheduledJobException
public void cleanup()
{
myExecutor.shutdown();
myHealthCheckExecutor.shutdown();
}

/**
Expand Down Expand Up @@ -315,7 +291,7 @@ private void rescheduleHangPrevention()
{
myHangPreventFuture.cancel(false);
}
myHangPreventFuture = myExecutor.schedule(new HangPreventingTask(), calculateDynamicHangPreventionTime(), TimeUnit.MINUTES);
myHangPreventFuture = myExecutor.schedule(new HangPreventingTask(), HANG_PREVENT_TIME_IN_MINUTES, TimeUnit.MINUTES);
}

/**
Expand Down Expand Up @@ -424,17 +400,12 @@ public void run()
{
try (JmxProxy proxy = myJmxProxyFactory.connect())
{
if (isLocalNodeDown(proxy))
{
throw new IOException(NODE_STATUS_ERROR);
}
proxy.forceTerminateAllRepairSessions();
}
catch (IOException e)
{
LOG.error("Unable to prevent hanging repair task: {}", this, e);
}
myLatch.countDown();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,11 @@
import static com.ericsson.bss.cassandra.ecchronos.core.repair.TestUtils.startRepair;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.ignoreStubs;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
Expand All @@ -47,11 +44,8 @@
import javax.management.Notification;
import javax.management.remote.JMXConnectionNotification;

import com.ericsson.bss.cassandra.ecchronos.core.JmxProxy;
import com.ericsson.bss.cassandra.ecchronos.core.exceptions.ScheduledJobException;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
Expand Down Expand Up @@ -82,9 +76,6 @@ public class TestVnodeRepairTask
@Mock
private RepairHistory repairHistory;

@Mock
private JmxProxy myJmxProxy;

private RepairConfiguration myRepairConfiguration = RepairConfiguration.DEFAULT;

private UUID jobId = UUID.randomUUID();
Expand Down Expand Up @@ -117,30 +108,6 @@ public void finalVerification()
verifyNoMoreInteractions(ignoreStubs(myTableRepairMetrics));
}


@Test
public void testWhenLocalNodeIsDown() throws Exception
{
when(jmxProxyFactory.connect()).thenReturn(myJmxProxy);
when(myJmxProxy.getNodeStatus()).thenReturn("DOWN");
Set<LongTokenRange> ranges = new HashSet<>();
LongTokenRange range1 = new LongTokenRange(1, 2);
LongTokenRange range2 = new LongTokenRange(3, 4);

ranges.add(range1);
ranges.add(range2);

RepairTask repairTask = new VnodeRepairTask(jmxProxyFactory, myTableReference, myRepairConfiguration,
myTableRepairMetrics, repairHistory, ranges, participants, jobId);

Exception exception = assertThrows(ScheduledJobException.class, repairTask::execute);
String expectedMessage = "Unable to repair 'Vnode repairTask of keyspace.table (mock)";
String actualMessage = exception.getMessage();
assertTrue(actualMessage.contains(expectedMessage));

verify(myTableRepairMetrics).repairSession(eq(myTableReference), anyLong(), any(TimeUnit.class), eq(false));
}

@Test
public void testRepairSuccessfully() throws InterruptedException
{
Expand Down

0 comments on commit 8aebf63

Please sign in to comment.