From fb5b2395be632283224b78561c73a01f6bfd5771 Mon Sep 17 00:00:00 2001 From: sajid riaz Date: Tue, 3 Sep 2024 13:30:05 +0200 Subject: [PATCH] Retry Policy for Jmx Connection #700 --- .../config/connection/ConnectionConfig.java | 8 +- .../connection/DistributedJmxConnection.java | 22 +- .../config/connection/RetryPolicyConfig.java | 122 +++++++++ .../application/spring/BeanConfigurator.java | 9 + .../spring/RetrySchedulerService.java | 236 ++++++++++++++++++ application/src/main/resources/ecc.yml | 12 + .../application/config/TestConfig.java | 59 +++-- application/src/test/resources/all_set.yml | 6 + .../impl/builders/DistributedJmxBuilder.java | 2 +- .../ecchronos/data/enums/NodeStatus.java | 3 +- .../ecchronos/data/sync/EccNodesSync.java | 19 +- .../ecchronos/data/sync/TestEccNodesSync.java | 17 ++ 12 files changed, 487 insertions(+), 28 deletions(-) create mode 100644 application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/config/connection/RetryPolicyConfig.java create mode 100644 application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/spring/RetrySchedulerService.java diff --git a/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/config/connection/ConnectionConfig.java b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/config/connection/ConnectionConfig.java index 7ad8de2e1..e1fac2dbc 100644 --- a/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/config/connection/ConnectionConfig.java +++ b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/config/connection/ConnectionConfig.java @@ -21,19 +21,19 @@ public class ConnectionConfig private DistributedNativeConnection myCqlConnection = new DistributedNativeConnection(); private DistributedJmxConnection myJmxConnection = new DistributedJmxConnection(); - @JsonProperty("cql") + @JsonProperty ("cql") public final DistributedNativeConnection getCqlConnection() { return myCqlConnection; } - @JsonProperty("jmx") + @JsonProperty ("jmx") public final DistributedJmxConnection getJmxConnection() { return myJmxConnection; } - @JsonProperty("cql") + @JsonProperty ("cql") public final void setCqlConnection(final DistributedNativeConnection cqlConnection) { if (cqlConnection != null) @@ -42,7 +42,7 @@ public final void setCqlConnection(final DistributedNativeConnection cqlConnecti } } - @JsonProperty("jmx") + @JsonProperty ("jmx") public final void setJmxConnection(final DistributedJmxConnection jmxConnection) { if (jmxConnection != null) diff --git a/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/config/connection/DistributedJmxConnection.java b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/config/connection/DistributedJmxConnection.java index 1e4548791..4ec831a48 100644 --- a/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/config/connection/DistributedJmxConnection.java +++ b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/config/connection/DistributedJmxConnection.java @@ -19,10 +19,14 @@ import com.ericsson.bss.cassandra.ecchronos.connection.DistributedNativeConnectionProvider; import com.ericsson.bss.cassandra.ecchronos.data.sync.EccNodesSync; +import com.fasterxml.jackson.annotation.JsonProperty; + import java.util.function.Supplier; public class DistributedJmxConnection extends Connection { + private RetryPolicyConfig myRetryPolicyConfig = new RetryPolicyConfig(); + public DistributedJmxConnection() { try @@ -35,6 +39,18 @@ public DistributedJmxConnection() } } + @JsonProperty ("retryPolicy") + public final RetryPolicyConfig getRetryPolicyConfig() + { + return myRetryPolicyConfig; + } + + @JsonProperty ("retryPolicy") + public final void setRetryPolicyConfig(final RetryPolicyConfig retryPolicyConfig) + { + myRetryPolicyConfig = retryPolicyConfig; + } + /** * @return */ @@ -42,9 +58,9 @@ public DistributedJmxConnection() protected Class[] expectedConstructor() { return new Class[] { - Supplier.class, - DistributedNativeConnectionProvider.class, - EccNodesSync.class + Supplier.class, + DistributedNativeConnectionProvider.class, + EccNodesSync.class }; } } diff --git a/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/config/connection/RetryPolicyConfig.java b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/config/connection/RetryPolicyConfig.java new file mode 100644 index 000000000..1a0e4a8f9 --- /dev/null +++ b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/config/connection/RetryPolicyConfig.java @@ -0,0 +1,122 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ericsson.bss.cassandra.ecchronos.application.config.connection; + +import java.util.concurrent.TimeUnit; +import com.fasterxml.jackson.annotation.JsonProperty; + +public final class RetryPolicyConfig +{ + + private static final int DEFAULT_MAX_ATTEMPTS = 5; + private static final long DEFAULT_DELAY = 5000; + private static final long DEFAULT_MAX_DELAY = 30000; + + @JsonProperty ("maxAttempts") + private Integer myMaxAttempts = DEFAULT_MAX_ATTEMPTS; + + @JsonProperty ("delay") + private long myDelay = DEFAULT_DELAY; + + @JsonProperty ("maxDelay") + private long myMaxDelay = DEFAULT_MAX_DELAY; + + @JsonProperty ("unit") + private String myUnit = "seconds"; // Default to seconds + + public RetryPolicyConfig() + { + } + + public RetryPolicyConfig(final Integer maxAttempts, final Integer delay, final Integer maxDelay, final String unit) + { + this.myMaxAttempts = maxAttempts; + this.myDelay = convertToMillis(delay, unit); + this.myMaxDelay = convertToMillis(maxDelay, unit); + this.myUnit = unit; + } + + @JsonProperty ("maxAttempts") + public Integer getMaxAttempts() + { + return myMaxAttempts; + } + + @JsonProperty ("maxAttempts") + public void setMaxAttempts(final Integer maxAttempts) + { + this.myMaxAttempts = maxAttempts; + } + + @JsonProperty ("delay") + public long getDelay() + { + return myDelay; + } + + @JsonProperty ("delay") + public void setDelay(final Integer delay) + { + this.myDelay = convertToMillis(delay, myUnit); + } + + @JsonProperty ("maxDelay") + public long getMaxDelay() + { + return myMaxDelay; + } + + @JsonProperty ("maxDelay") + public void setMaxDelay(final Integer maxDelay) + { + this.myMaxDelay = convertToMillis(maxDelay, myUnit); + } + + @JsonProperty ("unit") + public String getUnit() + { + return myUnit; + } + + @JsonProperty ("unit") + public void setUnit(final String unit) + { + this.myUnit = unit; + // Recalculate delays with the new unit + this.myDelay = convertToMillis((int) TimeUnit.MILLISECONDS.toSeconds(this.myDelay), unit); + this.myMaxDelay = convertToMillis((int) TimeUnit.MILLISECONDS.toSeconds(this.myMaxDelay), unit); + } + + private long convertToMillis(final Integer value, final String unit) + { + return switch (unit.toLowerCase()) + { + case "milliseconds" -> value; + case "seconds" -> TimeUnit.SECONDS.toMillis(value); + case "minutes" -> TimeUnit.MINUTES.toMillis(value); + default -> throw new IllegalArgumentException("Unsupported time unit: " + unit); + }; + } + + public long currentDelay(final Integer count) + { + long currentDelay = myDelay * count; + if (currentDelay > myMaxDelay) + { + currentDelay = myMaxDelay; + } + return currentDelay; + } +} diff --git a/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/spring/BeanConfigurator.java b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/spring/BeanConfigurator.java index 57e449bfb..d40401fd2 100644 --- a/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/spring/BeanConfigurator.java +++ b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/spring/BeanConfigurator.java @@ -208,6 +208,15 @@ public DistributedJmxConnectionProvider distributedJmxConnectionProvider( jmxSecurity::get, distributedNativeConnectionProvider, eccNodesSync); } + @Bean + public RetrySchedulerService retrySchedulerService(final Config config, + final DistributedJmxConnectionProvider jmxConnectionProvider, + final EccNodesSync eccNodesSync, + final DistributedNativeConnectionProvider distributedNativeConnectionProvider) + { + return new RetrySchedulerService(eccNodesSync, config, jmxConnectionProvider, distributedNativeConnectionProvider); + } + private Security getSecurityConfig() throws ConfigurationException { return ConfigurationHelper.DEFAULT_INSTANCE.getConfiguration(SECURITY_FILE, Security.class); diff --git a/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/spring/RetrySchedulerService.java b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/spring/RetrySchedulerService.java new file mode 100644 index 000000000..81eae03bb --- /dev/null +++ b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/spring/RetrySchedulerService.java @@ -0,0 +1,236 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ericsson.bss.cassandra.ecchronos.application.spring; + +import com.datastax.oss.driver.api.core.cql.ResultSet; +import com.datastax.oss.driver.api.core.cql.Row; +import com.datastax.oss.driver.api.core.metadata.Node; +import com.ericsson.bss.cassandra.ecchronos.application.config.Config; +import com.ericsson.bss.cassandra.ecchronos.application.config.connection.RetryPolicyConfig; +import com.ericsson.bss.cassandra.ecchronos.connection.DistributedJmxConnectionProvider; +import com.ericsson.bss.cassandra.ecchronos.connection.DistributedNativeConnectionProvider; +import com.ericsson.bss.cassandra.ecchronos.data.enums.NodeStatus; +import com.ericsson.bss.cassandra.ecchronos.data.sync.EccNodesSync; +import jakarta.annotation.PostConstruct; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.DisposableBean; +import org.springframework.stereotype.Service; + +import javax.management.remote.JMXConnector; +import java.io.IOException; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * Service responsible for managing and scheduling retry attempts to reconnect to Cassandra nodes that have become unavailable. + *

+ * This service periodically checks the status of nodes and attempts to reconnect based on a configurable retry policy. + * It uses a scheduled executor service to perform retries at fixed intervals, with the intervals and the retry logic + * configurable via external configurations. + *

+ * + *

+ * The retry logic involves calculating the delay between attempts, which increases with each subsequent retry for a node. + * If the maximum number of retry attempts is reached, the node is marked as unreachable. + *

+ * + *

+ * This service is designed to run continuously in the background, adjusting its behavior based on the state of the + * Cassandra cluster and the provided configurations. It also ensures that resources are properly cleaned up on shutdown. + *

+ */ + +@Service +public final class RetrySchedulerService implements DisposableBean +{ + private static final Logger LOG = LoggerFactory.getLogger(RetrySchedulerService.class); + private static final String COLUMN_DC_NAME = "datacenter_name"; + private static final String COLUMN_NODE_ID = "node_id"; + private static final String COLUMN_NODE_ENDPOINT = "node_endpoint"; + private static final String COLUMN_NODE_STATUS = "node_status"; + private final EccNodesSync myEccNodesSync; + private final DistributedJmxConnectionProvider myJmxConnectionProvider; + private final DistributedNativeConnectionProvider myDistributedNativeConnectionProvider; + private final RetryPolicyConfig myRetryPolicyConfig; + private final Map myRetryAttempts = new ConcurrentHashMap<>(); + private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); + + public RetrySchedulerService(final EccNodesSync eccNodesSync, + final Config config, + final DistributedJmxConnectionProvider jmxConnectionProvider, + final DistributedNativeConnectionProvider distributedNativeConnectionProvider) + { + myEccNodesSync = eccNodesSync; + myJmxConnectionProvider = jmxConnectionProvider; + myDistributedNativeConnectionProvider = distributedNativeConnectionProvider; + myRetryPolicyConfig = config.getConnectionConfig().getJmxConnection().getRetryPolicyConfig(); + } + + @PostConstruct + public void startScheduler() + { + long initialDelay = myRetryPolicyConfig.getDelay(); + long fixedDelay = myRetryPolicyConfig.getMaxDelay(); + + LOG.info("Starting RetrySchedulerService with initialDelay={} ms and fixedDelay={} ms", initialDelay, fixedDelay); + + scheduler.scheduleWithFixedDelay(this::retryNodes, initialDelay, fixedDelay, TimeUnit.MILLISECONDS); + } + + public void retryNodes() + { + LOG.debug("Retrying unavailable nodes"); + List unavailableNodes = new ArrayList<>(); + ResultSet resultSet = myEccNodesSync.getResultSet(); + + // Process the results and filter out AVAILABLE nodes + for (Row row : resultSet) + { + UUID nodeId = row.getUuid(COLUMN_NODE_ID); + String nodeEndpoint = row.getString(COLUMN_NODE_ENDPOINT); + String datacenter = row.getString(COLUMN_DC_NAME); + String status = row.getString(COLUMN_NODE_STATUS); + + // Only add nodes that are not AVAILABLE + if (!NodeStatus.AVAILABLE.name().equals(status)) + { + // Find the corresponding Node object in the existing nodes list + myDistributedNativeConnectionProvider.getNodes() + .stream() + .filter(node -> Objects.equals(node.getHostId(), nodeId)) + .findFirst() + .ifPresent(unavailableNodes::add); + } + } + + if (unavailableNodes.isEmpty()) + { + LOG.info("No unavailable nodes found."); + return; + } + + for (Node node : unavailableNodes) + { + UUID nodeId = node.getHostId(); + RetryAttempt retryAttempt = myRetryAttempts.getOrDefault(nodeId, new RetryAttempt(0, System.currentTimeMillis())); + + LOG.info("Processing node: {}, attempt: {}", node.getHostId(), retryAttempt.attempt()); + + if (retryAttempt.attempt() < myRetryPolicyConfig.getMaxAttempts()) + { + long nextRetryTime = retryAttempt.lastAttemptTime() + calculateDelay(retryAttempt.attempt()); + + if (System.currentTimeMillis() >= nextRetryTime) + { + LOG.info("Attempting to reconnect to node: {}", nodeId); + boolean success = attemptConnection(node); + + if (success) + { + LOG.info("Successfully reconnected to node: {}", nodeId); + myEccNodesSync.updateNodeStatus(NodeStatus.AVAILABLE, node.getDatacenter(), nodeId); + myRetryAttempts.remove(nodeId); // Reset retry attempts on success + } + else + { + LOG.warn("Failed to reconnect to node: {}, incrementing retry attempt.", nodeId); + myRetryAttempts.put(nodeId, new RetryAttempt(retryAttempt.attempt() + 1, System.currentTimeMillis())); + } + } + } + else + { + LOG.error("Max retry attempts reached for node: {}. Marking as UNREACHABLE.", nodeId); + myEccNodesSync.updateNodeStatus(NodeStatus.UNREACHABLE, node.getDatacenter(), nodeId); + myRetryAttempts.remove(nodeId); // Remove entry after max attempts reached + } + } + } + + @Override + public void destroy() + { + LOG.info("Shutting down RetrySchedulerService..."); + + scheduler.shutdown(); + try + { + if (!scheduler.awaitTermination(60, TimeUnit.SECONDS)) + { + LOG.warn("Scheduler did not terminate within the timeout. Attempting to force shutdown..."); + scheduler.shutdownNow(); + if (!scheduler.awaitTermination(60, TimeUnit.SECONDS)) + { + LOG.error("Scheduler did not terminate after force shutdown."); + } + } + LOG.info("RetrySchedulerService shut down complete."); + } + catch (InterruptedException e) + { + LOG.error("Interrupted during shutdown. Forcing shutdown now...", e); + scheduler.shutdownNow(); + Thread.currentThread().interrupt(); + } + } + + private long calculateDelay(int attempt) + { + long calculatedDelay = myRetryPolicyConfig.getDelay() * (attempt + 1) * 2; + LOG.debug("Calculated delay for attempt {}: {} ms", attempt, calculatedDelay); + return Math.min(calculatedDelay, myRetryPolicyConfig.getMaxDelay()); + } + + private boolean attemptConnection(final Node node) + { + UUID nodeId = node.getHostId(); + JMXConnector jmxConnector = myJmxConnectionProvider.getJmxConnector(nodeId); + boolean isConnected = jmxConnector != null && isConnected(jmxConnector); + + if (isConnected) + { + myJmxConnectionProvider.getJmxConnections().put(nodeId, jmxConnector); + LOG.info("Node {} connected successfully.", nodeId); + } + else + { + LOG.warn("Failed to connect to node {}.", nodeId); + } + + return isConnected; + } + + private boolean isConnected(final JMXConnector jmxConnector) + { + try + { + jmxConnector.getConnectionId(); + return true; + } + catch (IOException e) + { + LOG.error("Error while checking connection for JMX connector", e); + return false; + } + } + + // Helper class to track retry attempts and last attempt time + private record RetryAttempt(int attempt, long lastAttemptTime) { + } +} diff --git a/application/src/main/resources/ecc.yml b/application/src/main/resources/ecc.yml index 34a0f9701..c463a327b 100644 --- a/application/src/main/resources/ecc.yml +++ b/application/src/main/resources/ecc.yml @@ -93,6 +93,18 @@ connection: ## The default provider will be used unless another is specified. ## provider: com.ericsson.bss.cassandra.ecchronos.application.providers.AgentJmxConnectionProvider + retryPolicy: + ## Max number of attempts ecChronos will try to connect with Cassandra. + maxAttempts: 5 + ## Delay use to wait between an attempt and another, this value will be multiplied by the current attempt count powered by two. + ## If the current attempt is 4 and the default delay is 5 seconds, so ((4(attempt) x 2) x 5(default delay)) = 40 seconds. + ## If the calculated delay is greater than maxDelay, maxDelay will be used instead of the calculated delay. + delay: 5 + ## Maximum delay before the next connection attempt is made. + ## Setting it as 0 will disable maxDelay and the delay interval will + ## be calculated based on the attempt count and the default delay. + maxDelay: 30 + unit: seconds rest_server: ## diff --git a/application/src/test/java/com/ericsson/bss/cassandra/ecchronos/application/config/TestConfig.java b/application/src/test/java/com/ericsson/bss/cassandra/ecchronos/application/config/TestConfig.java index f299ebe48..e73cfb30c 100644 --- a/application/src/test/java/com/ericsson/bss/cassandra/ecchronos/application/config/TestConfig.java +++ b/application/src/test/java/com/ericsson/bss/cassandra/ecchronos/application/config/TestConfig.java @@ -14,10 +14,9 @@ */ package com.ericsson.bss.cassandra.ecchronos.application.config; -import com.ericsson.bss.cassandra.ecchronos.application.config.connection.AgentConnectionConfig; -import com.ericsson.bss.cassandra.ecchronos.application.config.connection.ConnectionConfig; -import com.ericsson.bss.cassandra.ecchronos.application.config.connection.DistributedNativeConnection; +import com.ericsson.bss.cassandra.ecchronos.application.config.connection.*; import com.ericsson.bss.cassandra.ecchronos.application.exceptions.ConfigurationException; +import com.ericsson.bss.cassandra.ecchronos.application.providers.AgentJmxConnectionProvider; import com.ericsson.bss.cassandra.ecchronos.application.providers.AgentNativeConnectionProvider; import com.ericsson.bss.cassandra.ecchronos.connection.DataCenterAwarePolicy; import com.fasterxml.jackson.core.exc.StreamReadException; @@ -32,6 +31,7 @@ import java.io.IOException; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThrows; public class TestConfig @@ -39,6 +39,7 @@ public class TestConfig private static final String DEFAULT_AGENT_FILE_NAME = "all_set.yml"; private static Config config; private static DistributedNativeConnection nativeConnection; + private static DistributedJmxConnection distributedJmxConnection; @Before public void setup() throws StreamReadException, DatabindException, IOException @@ -54,6 +55,7 @@ public void setup() throws StreamReadException, DatabindException, IOException ConnectionConfig connection = config.getConnectionConfig(); nativeConnection = connection.getCqlConnection(); + distributedJmxConnection = connection.getJmxConnection(); } @Test @@ -85,7 +87,8 @@ public void testDefaultDatacenterAware() .getAgentConnectionConfig() .getDatacenterAware() .getDatacenters() - .get("datacenter1").getName()).isEqualTo("datacenter1"); + .get("datacenter1") + .getName()).isEqualTo("datacenter1"); } @Test @@ -95,9 +98,9 @@ public void testDefaultRackAware() assertThat(nativeConnection .getAgentConnectionConfig() .getRackAware() - .getRacks().get("rack1") - .getDatacenterName() - ).isEqualTo("datacenter1"); + .getRacks() + .get("rack1") + .getDatacenterName()).isEqualTo("datacenter1"); } @Test @@ -106,26 +109,34 @@ public void testDefaultHostAware() assertThat(nativeConnection.getAgentConnectionConfig().getHostAware()).isNotNull(); assertThat(nativeConnection .getAgentConnectionConfig() - .getHostAware().getHosts() - .get("127.0.0.1").getPort()) + .getHostAware() + .getHosts() + .get("127.0.0.1") + .getPort()) .isEqualTo(9042); assertThat(nativeConnection .getAgentConnectionConfig() - .getHostAware().getHosts() - .get("127.0.0.2").getPort()) + .getHostAware() + .getHosts() + .get("127.0.0.2") + .getPort()) .isEqualTo(9042); assertThat(nativeConnection .getAgentConnectionConfig() - .getHostAware().getHosts() - .get("127.0.0.3").getPort()) + .getHostAware() + .getHosts() + .get("127.0.0.3") + .getPort()) .isEqualTo(9042); assertThat(nativeConnection .getAgentConnectionConfig() - .getHostAware().getHosts() - .get("127.0.0.4").getPort()) + .getHostAware() + .getHosts() + .get("127.0.0.4") + .getPort()) .isEqualTo(9042); } @@ -139,7 +150,8 @@ public void testAgentProviderConfig() @Test public void testConfigurationExceptionForWrongAgentType() { - assertThrows(ConfigurationException.class, () -> { + assertThrows(ConfigurationException.class, () -> + { nativeConnection.getAgentConnectionConfig().setType("wrongType"); }); } @@ -156,4 +168,17 @@ public void testDefaultLoadBalancingPolicy() { assertThat(nativeConnection.getAgentConnectionConfig().getDatacenterAwarePolicy()).isEqualTo(DataCenterAwarePolicy.class); } -} \ No newline at end of file + + @Test + public void testRetryPolicyConfig() + { + Class providerClass = distributedJmxConnection.getProviderClass(); + assertThat(providerClass).isEqualTo(AgentJmxConnectionProvider.class); + RetryPolicyConfig retryPolicyConfig = distributedJmxConnection.getRetryPolicyConfig(); + assertNotNull(retryPolicyConfig); + assertThat(5).isEqualTo(retryPolicyConfig.getMaxAttempts()); + assertThat(5000).isEqualTo(retryPolicyConfig.getDelay()); + assertThat(30000).isEqualTo(retryPolicyConfig.getMaxDelay()); + assertThat("seconds").isEqualTo(retryPolicyConfig.getUnit()); + } +} diff --git a/application/src/test/resources/all_set.yml b/application/src/test/resources/all_set.yml index e7bd0aad3..25092c75d 100644 --- a/application/src/test/resources/all_set.yml +++ b/application/src/test/resources/all_set.yml @@ -43,6 +43,12 @@ connection: port: 9042 provider: com.ericsson.bss.cassandra.ecchronos.application.providers.AgentNativeConnectionProvider jmx: + provider: com.ericsson.bss.cassandra.ecchronos.application.providers.AgentJmxConnectionProvider + retryPolicy: + maxAttempts: 5 + delay: 5 + maxDelay: 30 + unit: seconds rest_server: host: 127.0.0.2 diff --git a/connection.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/connection/impl/builders/DistributedJmxBuilder.java b/connection.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/connection/impl/builders/DistributedJmxBuilder.java index 948d2395d..2089c91cb 100644 --- a/connection.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/connection/impl/builders/DistributedJmxBuilder.java +++ b/connection.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/connection/impl/builders/DistributedJmxBuilder.java @@ -245,7 +245,7 @@ private Integer getJMXPort(final Node node) .builder("SELECT value FROM system_views.system_properties WHERE name = 'cassandra.jmx.remote.port';") .setNode(node) .build(); - Row row = mySession.execute(simpleStatement).one(); + Row row = null;// mySession.execute(simpleStatement).one(); if (row != null) { return Integer.parseInt(Objects.requireNonNull(row.getString("value"))); diff --git a/data/src/main/java/com/ericsson/bss/cassandra/ecchronos/data/enums/NodeStatus.java b/data/src/main/java/com/ericsson/bss/cassandra/ecchronos/data/enums/NodeStatus.java index 706a5998a..845b35e1e 100644 --- a/data/src/main/java/com/ericsson/bss/cassandra/ecchronos/data/enums/NodeStatus.java +++ b/data/src/main/java/com/ericsson/bss/cassandra/ecchronos/data/enums/NodeStatus.java @@ -20,5 +20,6 @@ public enum NodeStatus { UNAVAILABLE, - AVAILABLE + AVAILABLE, + UNREACHABLE } diff --git a/data/src/main/java/com/ericsson/bss/cassandra/ecchronos/data/sync/EccNodesSync.java b/data/src/main/java/com/ericsson/bss/cassandra/ecchronos/data/sync/EccNodesSync.java index 4b70b0878..56fb25058 100644 --- a/data/src/main/java/com/ericsson/bss/cassandra/ecchronos/data/sync/EccNodesSync.java +++ b/data/src/main/java/com/ericsson/bss/cassandra/ecchronos/data/sync/EccNodesSync.java @@ -16,9 +16,10 @@ import com.datastax.oss.driver.api.core.ConsistencyLevel; import com.datastax.oss.driver.api.core.CqlSession; -import com.datastax.oss.driver.api.core.cql.BoundStatement; import com.datastax.oss.driver.api.core.cql.PreparedStatement; +import com.datastax.oss.driver.api.core.cql.BoundStatement; import com.datastax.oss.driver.api.core.cql.ResultSet; +import com.datastax.oss.driver.api.core.cql.Row; import com.datastax.oss.driver.api.core.metadata.Node; import com.datastax.oss.driver.api.querybuilder.QueryBuilder; import com.datastax.oss.driver.shaded.guava.common.annotations.VisibleForTesting; @@ -29,14 +30,16 @@ import java.net.UnknownHostException; import java.time.Instant; import java.time.temporal.ChronoUnit; +import java.util.ArrayList; import java.util.List; +import java.util.Objects; import java.util.UUID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.bindMarker; +import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.selectFrom; /** * CQL Definition for nodes_sync table. CREATE TABLE ecchronos_agent.nodes_sync ( ecchronos_id TEXT, datacenter_name @@ -66,6 +69,7 @@ public final class EccNodesSync private final PreparedStatement myCreateStatement; private final PreparedStatement myUpdateStatusStatement; + private final PreparedStatement mySelectStatusStatement; private EccNodesSync(final Builder builder) throws UnknownHostException { @@ -91,9 +95,20 @@ private EccNodesSync(final Builder builder) throws UnknownHostException .whereColumn(COLUMN_NODE_ID).isEqualTo(bindMarker()) .build() .setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM)); + mySelectStatusStatement = mySession.prepare(selectFrom(KEYSPACE_NAME, TABLE_NAME) + .columns(COLUMN_NODE_ID, COLUMN_NODE_ENDPOINT, COLUMN_DC_NAME, COLUMN_NODE_STATUS) + .whereColumn(COLUMN_ECCHRONOS_ID).isEqualTo(bindMarker()) + .build()); ecChronosID = builder.myEcchronosID; } + public ResultSet getResultSet() + { + // Bind the parameters + BoundStatement boundStatement = mySelectStatusStatement.bind(ecChronosID); + return mySession.execute(boundStatement); + } + public void acquireNodes() throws EcChronosException { if (myNodesList.isEmpty()) diff --git a/data/src/test/java/com/ericsson/bss/cassandra/ecchronos/data/sync/TestEccNodesSync.java b/data/src/test/java/com/ericsson/bss/cassandra/ecchronos/data/sync/TestEccNodesSync.java index daf0ea662..8781cbc30 100644 --- a/data/src/test/java/com/ericsson/bss/cassandra/ecchronos/data/sync/TestEccNodesSync.java +++ b/data/src/test/java/com/ericsson/bss/cassandra/ecchronos/data/sync/TestEccNodesSync.java @@ -25,6 +25,7 @@ import net.jcip.annotations.NotThreadSafe; import org.junit.After; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; import java.net.UnknownHostException; @@ -145,4 +146,20 @@ public void testEccNodesWithNullSession() NullPointerException.class, tmpEccNodesSyncBuilder::build); assertEquals("Session cannot be null", exception.getMessage()); } + + @Ignore + @Test + public void testGetUnavailableNodes() + { + // Setup test data in the table with mixed statuses + // Insert some nodes with AVAILABLE status + eccNodesSync.verifyInsertNodeInfo(datacenterName, "127.0.0.1", NodeStatus.AVAILABLE.name(), Instant.now(), Instant.now().plus(30, ChronoUnit.MINUTES), UUID.randomUUID()); + + // Insert some nodes with other statuses + eccNodesSync.verifyInsertNodeInfo(datacenterName, "127.0.0.2", NodeStatus.UNAVAILABLE.name(), Instant.now(), Instant.now().plus(30, ChronoUnit.MINUTES), UUID.randomUUID()); + + // Call the method + ResultSet resultSet = eccNodesSync.getResultSet(); + assertNotNull(resultSet); + } }