Skip to content

Commit

Permalink
Fixed pmd warning
Browse files Browse the repository at this point in the history
  • Loading branch information
sajid riaz committed Sep 5, 2024
1 parent 50a98f3 commit fa7d2e0
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
*/
package com.ericsson.bss.cassandra.ecchronos.application.config.connection;

import java.util.Locale;
import java.util.concurrent.TimeUnit;
import com.fasterxml.jackson.annotation.JsonProperty;

Expand Down Expand Up @@ -135,7 +136,7 @@ public void setFixedDelay(final Integer fixedDelay)

private long convertToMillis(final Integer value, final String unit)
{
return switch (unit.toLowerCase())
return switch (unit.toLowerCase(Locale.ENGLISH))
{
case "milliseconds" -> value;
case "seconds" -> TimeUnit.SECONDS.toMillis(value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,19 @@
import com.ericsson.bss.cassandra.ecchronos.data.sync.EccNodesSync;
import com.google.common.annotations.VisibleForTesting;
import jakarta.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.stereotype.Service;

import javax.management.remote.JMXConnector;
import java.io.IOException;
import java.util.Map;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.ArrayList;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -70,6 +71,7 @@ public final class RetrySchedulerService implements DisposableBean
private static final int PERCENTAGE_OF_DELAY = 10;
private static final int MINIMUM_DELAY_LIMIT_MS = 5000;
private static final int SCHEDULER_AWAIT_TERMINATION = 60;

private final EccNodesSync myEccNodesSync;
private final DistributedJmxConnectionProvider myJmxConnectionProvider;
private final DistributedNativeConnectionProvider myDistributedNativeConnectionProvider;
Expand Down Expand Up @@ -102,19 +104,29 @@ public void startScheduler()
public void retryNodes()
{
LOG.debug("Retrying unavailable nodes");

List<Node> unavailableNodes = findUnavailableNodes();
if (unavailableNodes.isEmpty())
{
LOG.info("No unavailable nodes found.");
return;
}

unavailableNodes.forEach(this::processNode);
}

private List<Node> findUnavailableNodes()
{
List<Node> unavailableNodes = new ArrayList<>();
ResultSet resultSet = myEccNodesSync.getResultSet();

// Process the results and filter out AVAILABLE nodes
for (Row row : resultSet)
{
UUID nodeId = row.getUuid(COLUMN_NODE_ID);
String status = row.getString(COLUMN_NODE_STATUS);
String status = Objects.requireNonNull(row.getString(COLUMN_NODE_STATUS)).toUpperCase(Locale.ENGLISH);

// Only add nodes that are not AVAILABLE
if (NodeStatus.UNAVAILABLE.name().equals(status))
{
// Find the corresponding Node object in the existing nodes list
myDistributedNativeConnectionProvider.getNodes()
.stream()
.filter(node -> Objects.equals(node.getHostId(), nodeId))
Expand All @@ -123,71 +135,84 @@ public void retryNodes()
}
}

if (unavailableNodes.isEmpty())
return unavailableNodes;
}

private void processNode(final Node node)
{
UUID nodeId = node.getHostId();
boolean nodeProcessed = false;

while (!nodeProcessed)
{
LOG.info("No unavailable nodes found.");
return;
nodeProcessed = handleNodeRetry(node, nodeId);
}
}

// Process each unavailable node
for (Node node : unavailableNodes)
private boolean handleNodeRetry(final Node node, final UUID nodeId)
{
long currentTime = System.currentTimeMillis();
RetryAttempt retryAttempt = myRetryAttempts.getOrDefault(nodeId, new RetryAttempt(0, currentTime));

LOG.info("Processing node: {}, attempt: {}", nodeId, retryAttempt.attempt());

if (retryAttempt.attempt() < myRetryPolicyConfig.getMaxAttempts())
{
return attemptReconnection(node, nodeId, retryAttempt, currentTime);
}
else
{
UUID nodeId = node.getHostId();
boolean nodeProcessed = false;
markNodeUnreachable(node, nodeId);
return true;
}
}

private boolean attemptReconnection(final Node node,
final UUID nodeId,
final RetryAttempt retryAttempt,
final long currentTime)
{
long delayMillis = calculateDelay(retryAttempt.attempt());
long nextRetryTime = retryAttempt.lastAttemptTime() + delayMillis;
long retryBufferMillis = Math.max(delayMillis / PERCENTAGE_OF_DELAY, MINIMUM_DELAY_LIMIT_MS);

while (!nodeProcessed)
if (currentTime >= (nextRetryTime - retryBufferMillis))
{
LOG.info("Attempting to reconnect to node: {}", nodeId);
if (attemptConnection(node))
{
long currentTime = System.currentTimeMillis();
RetryAttempt retryAttempt = myRetryAttempts.getOrDefault(nodeId, new RetryAttempt(0, currentTime));
LOG.info("Successfully reconnected to node: {}", nodeId);
myEccNodesSync.updateNodeStatus(NodeStatus.AVAILABLE, node.getDatacenter(), nodeId);
myRetryAttempts.remove(nodeId); // Reset retry attempts on success
return true; // Node processed
}
else
{
LOG.warn("Failed to reconnect to node: {}, incrementing retry attempt.", nodeId);
myRetryAttempts.put(nodeId, new RetryAttempt(retryAttempt.attempt() + 1, currentTime));
sleepBeforeNextRetry(delayMillis);
}
}
return false; // Node not processed
}

LOG.info("Processing node: {}, attempt: {}", node.getHostId(), retryAttempt.attempt());
private void markNodeUnreachable(final Node node, final UUID nodeId)
{
LOG.error("Max retry attempts reached for node: {}. Marking as UNREACHABLE.", nodeId);
myEccNodesSync.updateNodeStatus(NodeStatus.UNREACHABLE, node.getDatacenter(), nodeId);
myRetryAttempts.remove(nodeId); // Remove entry after max attempts reached
}

if (retryAttempt.attempt() < myRetryPolicyConfig.getMaxAttempts())
{
long delayMillis = calculateDelay(retryAttempt.attempt());
long nextRetryTime = retryAttempt.lastAttemptTime() + delayMillis;

// Buffer to handle execution delays, minimum of 5000 ms or 10% of delay
long retryBufferMillis = Math.max(delayMillis / PERCENTAGE_OF_DELAY, MINIMUM_DELAY_LIMIT_MS);

if (currentTime >= (nextRetryTime - retryBufferMillis))
{
LOG.info("Attempting to reconnect to node: {}", nodeId);
boolean success = attemptConnection(node);

if (success)
{
LOG.info("Successfully reconnected to node: {}", nodeId);
myEccNodesSync.updateNodeStatus(NodeStatus.AVAILABLE, node.getDatacenter(), nodeId);
myRetryAttempts.remove(nodeId); // Reset retry attempts on success
nodeProcessed = true; // Exit while loop
}
else
{
LOG.warn("Failed to reconnect to node: {}, incrementing retry attempt.", nodeId);
myRetryAttempts.put(nodeId, new RetryAttempt(retryAttempt.attempt() + 1, currentTime));

// Sleep for the delay before the next retry attempt
try
{
Thread.sleep(delayMillis);
}
catch (InterruptedException e)
{
LOG.error("Interrupted during sleep between retry attempts", e);
Thread.currentThread().interrupt(); // Preserve interruption status
}
}
}
}
else
{
LOG.error("Max retry attempts reached for node: {}. Marking as UNREACHABLE.", nodeId);
myEccNodesSync.updateNodeStatus(NodeStatus.UNREACHABLE, node.getDatacenter(), nodeId);
myRetryAttempts.remove(nodeId); // Remove entry after max attempts reached
nodeProcessed = true; // Exit while loop
}
}
private void sleepBeforeNextRetry(final long delayMillis)
{
try
{
Thread.sleep(delayMillis);
}
catch (InterruptedException e)
{
LOG.error("Interrupted during sleep between retry attempts", e);
Thread.currentThread().interrupt(); // Preserve interruption status
}
}

Expand Down

0 comments on commit fa7d2e0

Please sign in to comment.