diff --git a/7199 b/7199
new file mode 100644
index 00000000..e69de29b
diff --git a/cassandra-test-image/pom.xml b/cassandra-test-image/pom.xml
index 2d13cd1a..bb657599 100644
--- a/cassandra-test-image/pom.xml
+++ b/cassandra-test-image/pom.xml
@@ -110,6 +110,7 @@
maven-jar-plugin
+ test-jar
test-jar
diff --git a/connection.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/connection/impl/providers/DistributedJmxConnectionProviderImpl.java b/connection.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/connection/impl/providers/DistributedJmxConnectionProviderImpl.java
index 32551376..26f45e11 100644
--- a/connection.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/connection/impl/providers/DistributedJmxConnectionProviderImpl.java
+++ b/connection.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/connection/impl/providers/DistributedJmxConnectionProviderImpl.java
@@ -115,7 +115,25 @@ public ConcurrentHashMap getJmxConnections()
@Override
public JMXConnector getJmxConnector(final UUID nodeID)
{
- return myJMXConnections.get(nodeID);
+ JMXConnector connector = myJMXConnections.get(nodeID);
+ if (isConnected(nodeID))
+ {
+ return connector;
+ }
+ LOG.info("Connection expired or disconnected with node Id {}", nodeID);
+ Node node = myNativeConnectionProvider.getNodes().get(nodeID);
+ try
+ {
+ LOG.info("Attempting to create JMX connection with node {}", node.getHostId());
+ myDistributedJmxBuilder.reconnect(node);
+ LOG.info("Connection created successfully with node {}", node.getHostId());
+ connector = myJMXConnections.get(nodeID);
+ }
+ catch (EcChronosException e)
+ {
+ LOG.error("Failed to connect to node {}: {}", node.getHostId(), e.getMessage());
+ }
+ return connector;
}
/**
@@ -144,7 +162,10 @@ public void close() throws IOException
@Override
public void close(final UUID nodeID) throws IOException
{
- myJMXConnections.get(nodeID).close();
+ if (myJMXConnections.get(nodeID) != null)
+ {
+ myJMXConnections.get(nodeID).close();
+ }
}
/**
diff --git a/pom.xml b/pom.xml
index 78dfd3ac..a619d9e7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -108,6 +108,7 @@
fault.manager
fault.manager.impl
rest
+ standalone-integration
diff --git a/standalone-integration/pom.xml b/standalone-integration/pom.xml
new file mode 100644
index 00000000..ea40ad3f
--- /dev/null
+++ b/standalone-integration/pom.xml
@@ -0,0 +1,234 @@
+
+
+
+ 4.0.0
+
+ com.ericsson.bss.cassandra.ecchronos
+ agent
+ 1.0.0-SNAPSHOT
+
+ standalone-integration
+ Integration tests for a standalone environment
+ EcChronos Standalone Integration
+
+
+
+
+ com.ericsson.bss.cassandra.ecchronos
+ connection
+ ${project.version}
+ test
+
+
+
+ com.ericsson.bss.cassandra.ecchronos
+ application
+ ${project.version}
+ test
+
+
+
+ com.ericsson.bss.cassandra.ecchronos
+ connection
+ ${project.version}
+ test
+
+
+
+ com.ericsson.bss.cassandra.ecchronos
+ connection.impl
+ ${project.version}
+ test
+
+
+ com.ericsson.bss.cassandra.ecchronos
+ cassandra-test-image
+ ${project.version}
+ tests
+ test
+
+
+
+ com.ericsson.bss.cassandra.ecchronos
+ data
+ ${project.version}
+ test
+
+
+
+ com.ericsson.bss.cassandra.ecchronos
+ core
+ ${project.version}
+ test
+
+
+
+ com.ericsson.bss.cassandra.ecchronos
+ core.impl
+ ${project.version}
+ test
+
+
+
+
+ org.junit.vintage
+ junit-vintage-engine
+ test
+
+
+
+ org.mockito
+ mockito-core
+ test
+
+
+
+ net.jcip
+ jcip-annotations
+ test
+
+
+
+ org.assertj
+ assertj-core
+ test
+
+
+
+ org.awaitility
+ awaitility
+ test
+
+
+
+ org.testcontainers
+ cassandra
+ test
+
+
+
+
+ org.slf4j
+ slf4j-api
+ test
+
+
+
+ ch.qos.logback
+ logback-classic
+ test
+
+
+
+ ch.qos.logback
+ logback-core
+ test
+
+
+
+
+
+
+
+ precommit.tests
+
+
+ standalone-integration-tests
+
+
+
+
+ maven-failsafe-plugin
+
+
+
+ integration-test
+ verify
+
+
+
+
+
+
+
+ io.fabric8
+ docker-maven-plugin
+
+
+
+
+
+
+
+ localprecommit.tests
+
+
+ local-standalone-integration-tests
+
+
+
+ maven-failsafe-plugin
+
+
+ 127.0.0.1
+ 7100
+ 9042
+ true
+
+
+
+
+
+ integration-test
+ verify
+
+
+
+
+
+
+
+
+
+
+
+
+ maven-compiler-plugin
+
+
+ ${java.version}
+ ${project.build.directory}/generated-sources/
+
+
+
+
+ maven-deploy-plugin
+
+ true
+
+
+
+
+ maven-install-plugin
+
+ true
+
+
+
+
+
diff --git a/standalone-integration/src/test/java/com/ericsson/bss/cassandra/ecchronos/standalone/ITIncrementalOnDemandRepairJob.java b/standalone-integration/src/test/java/com/ericsson/bss/cassandra/ecchronos/standalone/ITIncrementalOnDemandRepairJob.java
new file mode 100644
index 00000000..9673d257
--- /dev/null
+++ b/standalone-integration/src/test/java/com/ericsson/bss/cassandra/ecchronos/standalone/ITIncrementalOnDemandRepairJob.java
@@ -0,0 +1,267 @@
+/*
+ * Copyright 2024 Telefonaktiebolaget LM Ericsson
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ericsson.bss.cassandra.ecchronos.standalone;
+
+import com.datastax.oss.driver.api.core.metadata.Metadata;
+import com.datastax.oss.driver.api.core.metadata.Node;
+import com.datastax.oss.driver.api.querybuilder.QueryBuilder;
+import com.ericsson.bss.cassandra.ecchronos.core.impl.locks.CASLockFactory;
+import com.ericsson.bss.cassandra.ecchronos.core.impl.locks.RepairLockType;
+import com.ericsson.bss.cassandra.ecchronos.core.impl.metadata.NodeResolverImpl;
+import com.ericsson.bss.cassandra.ecchronos.core.impl.metrics.CassandraMetrics;
+import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.OnDemandStatus;
+import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.scheduler.OnDemandRepairSchedulerImpl;
+import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.scheduler.ScheduleManagerImpl;
+import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.state.HostStatesImpl;
+import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.state.ReplicationStateImpl;
+import com.ericsson.bss.cassandra.ecchronos.core.impl.utils.ConsistencyType;
+import com.ericsson.bss.cassandra.ecchronos.core.repair.config.RepairConfiguration;
+import com.ericsson.bss.cassandra.ecchronos.core.repair.scheduler.OnDemandRepairJobView;
+import com.ericsson.bss.cassandra.ecchronos.core.table.TableReference;
+import com.ericsson.bss.cassandra.ecchronos.core.table.TableReferenceFactory;
+import com.ericsson.bss.cassandra.ecchronos.core.table.TableRepairMetrics;
+import com.ericsson.bss.cassandra.ecchronos.utils.enums.repair.RepairType;
+import com.ericsson.bss.cassandra.ecchronos.utils.exceptions.EcChronosException;
+import java.util.ArrayList;
+import net.jcip.annotations.NotThreadSafe;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.mockito.Mock;
+
+import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.literal;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+@NotThreadSafe
+public class ITIncrementalOnDemandRepairJob extends TestBase
+{
+ private static final int DEFAULT_JOB_TIMEOUT_IN_SECONDS = 90;
+
+ @Mock
+ private static TableRepairMetrics mockTableRepairMetrics;
+ private static Metadata myMetadata;
+ private static HostStatesImpl myHostStates;
+ private static OnDemandRepairSchedulerImpl myOnDemandRepairSchedulerImpl;
+ private static ScheduleManagerImpl myScheduleManagerImpl;
+ private static CASLockFactory myLockFactory;
+ private static TableReferenceFactory myTableReferenceFactory;
+ private static CassandraMetrics myCassandraMetrics;
+ private Set myRepairs = new HashSet<>();
+
+ @Before
+ public void init()
+ {
+ mockTableRepairMetrics = mock(TableRepairMetrics.class);
+ myMetadata = mySession.getMetadata();
+
+ myHostStates = HostStatesImpl.builder()
+ .withRefreshIntervalInMs(1000)
+ .withJmxProxyFactory(getJmxProxyFactory())
+ .build();
+
+ myLockFactory = CASLockFactory.builder()
+ .withNativeConnectionProvider(getNativeConnectionProvider())
+ .withHostStates(myHostStates)
+ .withConsistencySerial(ConsistencyType.DEFAULT)
+ .build();
+
+ Set nodeIds = getNativeConnectionProvider().getNodes().keySet();
+ List nodeIdList = new ArrayList<>(nodeIds);
+
+ myScheduleManagerImpl = ScheduleManagerImpl.builder()
+ .withLockFactory(myLockFactory)
+ .withNodeIDList(nodeIdList)
+ .withRunInterval(1, TimeUnit.SECONDS)
+ .build();
+
+ myCassandraMetrics = new CassandraMetrics(getJmxProxyFactory(),
+ Duration.ofSeconds(5), Duration.ofMinutes(30));
+
+ myOnDemandRepairSchedulerImpl = OnDemandRepairSchedulerImpl.builder()
+ .withJmxProxyFactory(getJmxProxyFactory())
+ .withTableRepairMetrics(mockTableRepairMetrics)
+ .withScheduleManager(myScheduleManagerImpl)
+ .withRepairLockType(RepairLockType.VNODE)
+ .withReplicationState(new ReplicationStateImpl(new NodeResolverImpl(mySession), mySession))
+ .withSession(mySession)
+ .withRepairConfiguration(RepairConfiguration.DEFAULT)
+ .withOnDemandStatus(new OnDemandStatus(getNativeConnectionProvider()))
+ .build();
+ }
+
+ @After
+ public void clean()
+ {
+ for (TableReference tableReference : myRepairs)
+ {
+ mySession.execute(QueryBuilder.deleteFrom("system_distributed", "repair_history")
+ .whereColumn("keyspace_name")
+ .isEqualTo(literal(tableReference.getKeyspace()))
+ .whereColumn("columnfamily_name")
+ .isEqualTo(literal(tableReference.getTable()))
+ .build());
+ for (Node node : myMetadata.getNodes().values())
+ {
+ mySession.execute(QueryBuilder.deleteFrom("ecchronos", "on_demand_repair_status")
+ .whereColumn("host_id")
+ .isEqualTo(literal(node.getHostId()))
+ .build());
+ }
+ }
+ myRepairs.clear();
+ reset(mockTableRepairMetrics);
+ closeConnections();
+ }
+
+ public static void closeConnections()
+ {
+ if (myHostStates != null)
+ {
+ myHostStates.close();
+ }
+ if (myOnDemandRepairSchedulerImpl != null)
+ {
+ myOnDemandRepairSchedulerImpl.close();
+ }
+ if (myScheduleManagerImpl != null)
+ {
+ myScheduleManagerImpl.close();
+ }
+ if (myLockFactory != null)
+ {
+ myLockFactory.close();
+ }
+ }
+
+ @Test
+ public void repairSingleTable() throws Exception
+ {
+ TableReference tableReference = myTableReferenceFactory.forTable(TEST_KEYSPACE, TEST_TABLE_ONE_NAME);
+ Node node = getNativeConnectionProvider().getNodes().entrySet().iterator().next().getValue();
+ getJmxConnectionProvider().add(node);
+ insertSomeDataAndFlush(tableReference, mySession, node);
+ long startTime = System.currentTimeMillis();
+ await().pollInterval(1, TimeUnit.SECONDS)
+ .atMost(15, TimeUnit.SECONDS)
+ .until(() ->
+ {
+ double percentRepaired = myCassandraMetrics.getPercentRepaired(node.getHostId(), tableReference);
+ long maxRepairedAt = myCassandraMetrics.getMaxRepairedAt(node.getHostId(), tableReference);
+ return maxRepairedAt < startTime && percentRepaired < 100.0d;
+ });
+
+ UUID jobId = triggerRepair(tableReference, node);
+
+ await().pollInterval(1, TimeUnit.SECONDS)
+ .atMost(DEFAULT_JOB_TIMEOUT_IN_SECONDS, TimeUnit.SECONDS)
+ .until(() -> myOnDemandRepairSchedulerImpl.getActiveRepairJobs().isEmpty());
+ await().pollInterval(1, TimeUnit.SECONDS)
+ .atMost(DEFAULT_JOB_TIMEOUT_IN_SECONDS, TimeUnit.SECONDS)
+ .until(() -> myScheduleManagerImpl.getQueueSize(node.getHostId()) == 0);
+
+ await().pollInterval(1, TimeUnit.SECONDS)
+ .atMost(15, TimeUnit.SECONDS)
+ .until(() ->
+ {
+ double percentRepaired = myCassandraMetrics.getPercentRepaired(node.getHostId(), tableReference);
+ long maxRepairedAt = myCassandraMetrics.getMaxRepairedAt(node.getHostId(), tableReference);
+ return maxRepairedAt >= startTime && percentRepaired >= 100.0d;
+ });
+
+ verifyOnDemandCompleted(jobId, startTime, node.getHostId());
+ }
+
+ @Test
+ public void repairMultipleTables() throws Exception
+ {
+ long startTime = System.currentTimeMillis();
+ Node node = getNativeConnectionProvider().getNodes().entrySet().iterator().next().getValue();
+ getJmxConnectionProvider().add(node);
+ TableReference tableReference1 = myTableReferenceFactory.forTable(TEST_KEYSPACE, TEST_TABLE_ONE_NAME);
+ TableReference tableReference2 = myTableReferenceFactory.forTable(TEST_KEYSPACE, TEST_TABLE_TWO_NAME);
+ insertSomeDataAndFlush(tableReference1, mySession, node);
+ insertSomeDataAndFlush(tableReference2, mySession, node);
+ UUID jobId1 = triggerRepair(tableReference1, node);
+ UUID jobId2 = triggerRepair(tableReference2, node);
+
+ await().pollInterval(1, TimeUnit.SECONDS)
+ .atMost(DEFAULT_JOB_TIMEOUT_IN_SECONDS * 2, TimeUnit.SECONDS)
+ .until(() -> myOnDemandRepairSchedulerImpl.getActiveRepairJobs().isEmpty());
+ await().pollInterval(1, TimeUnit.SECONDS)
+ .atMost(DEFAULT_JOB_TIMEOUT_IN_SECONDS * 2, TimeUnit.SECONDS)
+ .until(() -> myScheduleManagerImpl.getQueueSize(node.getHostId()) == 0);
+ verifyOnDemandCompleted(jobId1, startTime, node.getHostId());
+ verify(mockTableRepairMetrics).repairSession(eq(tableReference1), any(long.class), any(TimeUnit.class),
+ eq(true));
+ verifyOnDemandCompleted(jobId2, startTime, node.getHostId());
+ verify(mockTableRepairMetrics).repairSession(eq(tableReference2), any(long.class), any(TimeUnit.class),
+ eq(true));
+ }
+
+ @Test
+ public void repairSameTableTwice() throws Exception
+ {
+ long startTime = System.currentTimeMillis();
+ Node node = getNativeConnectionProvider().getNodes().entrySet().iterator().next().getValue();
+ getJmxConnectionProvider().add(node);
+ TableReference tableReference = myTableReferenceFactory.forTable(TEST_KEYSPACE, TEST_TABLE_ONE_NAME);
+ insertSomeDataAndFlush(tableReference, mySession, node);
+ UUID jobId1 = triggerRepair(tableReference, node);
+ UUID jobId2 = triggerRepair(tableReference, node);
+
+ await().pollInterval(1, TimeUnit.SECONDS)
+ .atMost(DEFAULT_JOB_TIMEOUT_IN_SECONDS * 2, TimeUnit.SECONDS)
+ .until(() -> myOnDemandRepairSchedulerImpl.getActiveRepairJobs().isEmpty());
+ await().pollInterval(1, TimeUnit.SECONDS)
+ .atMost(DEFAULT_JOB_TIMEOUT_IN_SECONDS * 2, TimeUnit.SECONDS)
+ .until(() -> myScheduleManagerImpl.getQueueSize(node.getHostId()) == 0);
+ verifyOnDemandCompleted(jobId1, startTime, node.getHostId());
+ verifyOnDemandCompleted(jobId2, startTime, node.getHostId());
+ verify(mockTableRepairMetrics, times(2)).repairSession(eq(tableReference),
+ any(long.class), any(TimeUnit.class), eq(true));
+ }
+
+ private void verifyOnDemandCompleted(UUID jobId, long startTime, UUID hostId)
+ {
+ List completedJobs = myOnDemandRepairSchedulerImpl.getAllRepairJobs(hostId)
+ .stream()
+ .filter(j -> j.getId().equals(jobId))
+ .collect(Collectors.toList());
+ assertThat(completedJobs).hasSize(1);
+ assertThat(completedJobs.get(0).getCompletionTime()).isGreaterThanOrEqualTo(startTime);
+ }
+
+ private UUID triggerRepair(TableReference tableReference, Node node) throws EcChronosException
+ {
+ myRepairs.add(tableReference);
+ return myOnDemandRepairSchedulerImpl.scheduleJob(tableReference, RepairType.INCREMENTAL, node.getHostId()).getId();
+ }
+}
diff --git a/standalone-integration/src/test/java/com/ericsson/bss/cassandra/ecchronos/standalone/ITIncrementalSchedules.java b/standalone-integration/src/test/java/com/ericsson/bss/cassandra/ecchronos/standalone/ITIncrementalSchedules.java
new file mode 100644
index 00000000..4ed21e64
--- /dev/null
+++ b/standalone-integration/src/test/java/com/ericsson/bss/cassandra/ecchronos/standalone/ITIncrementalSchedules.java
@@ -0,0 +1,244 @@
+/*
+ * Copyright 2024 Telefonaktiebolaget LM Ericsson
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ericsson.bss.cassandra.ecchronos.standalone;
+
+import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
+import com.datastax.oss.driver.api.core.metadata.Metadata;
+import com.datastax.oss.driver.api.core.metadata.Node;
+import com.datastax.oss.driver.api.querybuilder.QueryBuilder;
+import com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures;
+import com.ericsson.bss.cassandra.ecchronos.core.impl.locks.CASLockFactory;
+import com.ericsson.bss.cassandra.ecchronos.core.impl.locks.RepairLockType;
+import com.ericsson.bss.cassandra.ecchronos.core.impl.metadata.NodeResolverImpl;
+import com.ericsson.bss.cassandra.ecchronos.core.impl.metrics.CassandraMetrics;
+import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.scheduler.RepairSchedulerImpl;
+import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.scheduler.ScheduleManagerImpl;
+import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.state.HostStatesImpl;
+import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.state.ReplicationStateImpl;
+import com.ericsson.bss.cassandra.ecchronos.core.impl.table.TableReferenceFactoryImpl;
+import com.ericsson.bss.cassandra.ecchronos.core.impl.utils.ConsistencyType;
+import com.ericsson.bss.cassandra.ecchronos.core.repair.config.RepairConfiguration;
+import com.ericsson.bss.cassandra.ecchronos.core.repair.scheduler.ScheduledRepairJobView;
+import com.ericsson.bss.cassandra.ecchronos.core.table.TableReference;
+import com.ericsson.bss.cassandra.ecchronos.core.table.TableReferenceFactory;
+import com.ericsson.bss.cassandra.ecchronos.core.table.TableRepairMetrics;
+import com.ericsson.bss.cassandra.ecchronos.fm.RepairFaultReporter;
+import com.ericsson.bss.cassandra.ecchronos.utils.enums.repair.RepairType;
+import java.util.UUID;
+import net.jcip.annotations.NotThreadSafe;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.TimeUnit;
+
+import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.literal;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyMap;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.verify;
+
+@NotThreadSafe
+public class ITIncrementalSchedules extends TestBase
+{
+ private static final Logger LOG = LoggerFactory.getLogger(ITIncrementalSchedules.class);
+ private static final int DEFAULT_SCHEDULE_TIMEOUT_IN_SECONDS = 90;
+ private static final int CASSANDRA_METRICS_UPDATE_IN_SECONDS = 5;
+ private static RepairFaultReporter mockFaultReporter;
+ private static TableRepairMetrics mockTableRepairMetrics;
+ private static HostStatesImpl myHostStates;
+ private static RepairSchedulerImpl myRepairSchedulerImpl;
+ private static ScheduleManagerImpl myScheduleManagerImpl;
+ private static CASLockFactory myLockFactory;
+ private static RepairConfiguration myRepairConfiguration;
+ private static TableReferenceFactory myTableReferenceFactory;
+ private static CassandraMetrics myCassandraMetrics;
+ protected static Metadata myMetadata;
+
+ private Set myRepairs = new HashSet<>();
+
+ @Before
+ public void init()
+ {
+ mockFaultReporter = mock(RepairFaultReporter.class);
+ mockTableRepairMetrics = mock(TableRepairMetrics.class);
+ myMetadata = mySession.getMetadata();
+
+ myTableReferenceFactory = new TableReferenceFactoryImpl(mySession);
+
+ myHostStates = HostStatesImpl.builder()
+ .withRefreshIntervalInMs(1000)
+ .withJmxProxyFactory(getJmxProxyFactory())
+ .build();
+
+ myLockFactory = CASLockFactory.builder()
+ .withNativeConnectionProvider(getNativeConnectionProvider())
+ .withHostStates(myHostStates)
+ .withConsistencySerial(ConsistencyType.DEFAULT)
+ .build();
+
+ Set nodeIds = getNativeConnectionProvider().getNodes().keySet();
+ List nodeIdList = new ArrayList<>(nodeIds);
+
+ myScheduleManagerImpl = ScheduleManagerImpl.builder()
+ .withLockFactory(myLockFactory)
+ .withNodeIDList(nodeIdList)
+ .withRunInterval(1, TimeUnit.SECONDS)
+ .build();
+
+ myCassandraMetrics = new CassandraMetrics(getJmxProxyFactory(),
+ Duration.ofSeconds(CASSANDRA_METRICS_UPDATE_IN_SECONDS), Duration.ofMinutes(30));
+
+ myRepairSchedulerImpl = RepairSchedulerImpl.builder()
+ .withJmxProxyFactory(getJmxProxyFactory())
+ .withTableRepairMetrics(mockTableRepairMetrics)
+ .withFaultReporter(mockFaultReporter)
+ .withScheduleManager(myScheduleManagerImpl)
+ .withRepairLockType(RepairLockType.VNODE)
+ .withCassandraMetrics(myCassandraMetrics)
+ .withReplicationState(new ReplicationStateImpl(new NodeResolverImpl(mySession), mySession))
+ .build();
+
+ myRepairConfiguration = RepairConfiguration.newBuilder()
+ .withRepairInterval(5, TimeUnit.SECONDS)
+ .withRepairType(RepairType.INCREMENTAL)
+ .build();
+ }
+
+ @After
+ public void clean()
+ {
+ List> stages = new ArrayList<>();
+ for (TableReference tableReference : myRepairs)
+ {
+ Node node = getNativeConnectionProvider().getNodes().entrySet().iterator().next().getValue();
+ myRepairSchedulerImpl.removeConfiguration(node, tableReference);
+
+ stages.add(mySession.executeAsync(QueryBuilder.deleteFrom("system_distributed", "repair_history")
+ .whereColumn("keyspace_name")
+ .isEqualTo(literal(tableReference.getKeyspace()))
+ .whereColumn("columnfamily_name")
+ .isEqualTo(literal(tableReference.getTable()))
+ .build()));
+ }
+ for (CompletionStage stage : stages)
+ {
+ CompletableFutures.getUninterruptibly(stage);
+ }
+ myRepairs.clear();
+ reset(mockTableRepairMetrics);
+ reset(mockFaultReporter);
+ closeConnections();
+ }
+
+ public static void closeConnections()
+ {
+ if (myHostStates != null)
+ {
+ myHostStates.close();
+ }
+ if (myRepairSchedulerImpl != null)
+ {
+ myRepairSchedulerImpl.close();
+ }
+ if (myScheduleManagerImpl != null)
+ {
+ myScheduleManagerImpl.close();
+ }
+ if (myLockFactory != null)
+ {
+ myLockFactory.close();
+ }
+ }
+
+ @Test
+ public void repairSingleTable() throws Exception
+ {
+ Node node = getNativeConnectionProvider().getNodes().entrySet().iterator().next().getValue();
+ TableReference tableReference = myTableReferenceFactory.forTable(TEST_KEYSPACE, TEST_TABLE_ONE_NAME);
+ insertSomeDataAndFlush(tableReference, mySession, node);
+ long startTime = System.currentTimeMillis();
+
+ // Wait for metrics to be updated, wait at least 3 times the update time for metrics (worst case scenario)
+ await().pollInterval(1, TimeUnit.SECONDS)
+ .atMost(CASSANDRA_METRICS_UPDATE_IN_SECONDS * 3, TimeUnit.SECONDS)
+ .until(() ->
+ {
+ double percentRepaired = myCassandraMetrics.getPercentRepaired(node.getHostId(), tableReference);
+ long maxRepairedAt = myCassandraMetrics.getMaxRepairedAt(node.getHostId(), tableReference);
+ LOG.info("Waiting for metrics to be updated, percentRepaired: {} maxRepairedAt: {}",
+ percentRepaired, maxRepairedAt);
+ return maxRepairedAt < startTime && percentRepaired < 100.0d;
+ });
+
+ // Create a schedule
+ addSchedule(tableReference, node);
+ await().pollInterval(1, TimeUnit.SECONDS)
+ .atMost(DEFAULT_SCHEDULE_TIMEOUT_IN_SECONDS, TimeUnit.SECONDS)
+ .until(() -> getSchedule(tableReference).isPresent());
+
+ await().pollInterval(1, TimeUnit.SECONDS)
+ .atMost(DEFAULT_SCHEDULE_TIMEOUT_IN_SECONDS, TimeUnit.SECONDS)
+ .until(() ->
+ {
+ double percentRepaired = myCassandraMetrics.getPercentRepaired(node.getHostId(), tableReference);
+ long maxRepairedAt = myCassandraMetrics.getMaxRepairedAt(node.getHostId(), tableReference);
+ LOG.info("Waiting for schedule to run, percentRepaired: {} maxRepairedAt: {}",
+ percentRepaired, maxRepairedAt);
+ return maxRepairedAt >= startTime && percentRepaired >= 100.0d;
+ });
+ verify(mockFaultReporter, never())
+ .raise(any(RepairFaultReporter.FaultCode.class), anyMap());
+ verify(mockTableRepairMetrics).repairSession(eq(tableReference),
+ any(long.class), any(TimeUnit.class), eq(true));
+ Optional view = getSchedule(tableReference);
+ assertThat(view).isPresent();
+ assertThat(view.get().getStatus()).isEqualTo(ScheduledRepairJobView.Status.COMPLETED);
+ assertThat(view.get().getCompletionTime()).isGreaterThanOrEqualTo(startTime);
+ assertThat(view.get().getProgress()).isGreaterThanOrEqualTo(1.0d);
+ }
+
+ private void addSchedule(TableReference tableReference, Node node)
+ {
+ if (myRepairs.add(tableReference))
+ {
+ myRepairSchedulerImpl.putConfigurations(node, tableReference, Collections.singleton(myRepairConfiguration));
+ }
+ }
+
+ private Optional getSchedule(TableReference tableReference)
+ {
+ return myRepairSchedulerImpl.getCurrentRepairJobs()
+ .stream()
+ .filter(s -> s.getRepairConfiguration().equals(myRepairConfiguration))
+ .filter(s -> s.getTableReference().equals(tableReference))
+ .findFirst();
+ }
+}
diff --git a/standalone-integration/src/test/java/com/ericsson/bss/cassandra/ecchronos/standalone/ITOnDemandRepairJob.java b/standalone-integration/src/test/java/com/ericsson/bss/cassandra/ecchronos/standalone/ITOnDemandRepairJob.java
new file mode 100644
index 00000000..3405d189
--- /dev/null
+++ b/standalone-integration/src/test/java/com/ericsson/bss/cassandra/ecchronos/standalone/ITOnDemandRepairJob.java
@@ -0,0 +1,330 @@
+/*
+ * Copyright 2024 Telefonaktiebolaget LM Ericsson
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ericsson.bss.cassandra.ecchronos.standalone;
+
+import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.literal;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import com.ericsson.bss.cassandra.ecchronos.core.impl.locks.CASLockFactory;
+import com.ericsson.bss.cassandra.ecchronos.core.impl.locks.RepairLockType;
+import com.ericsson.bss.cassandra.ecchronos.core.impl.metadata.NodeResolverImpl;
+import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.OnDemandStatus;
+import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.scheduler.OnDemandRepairSchedulerImpl;
+import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.scheduler.ScheduleManagerImpl;
+import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.state.HostStatesImpl;
+import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.state.ReplicationStateImpl;
+import com.ericsson.bss.cassandra.ecchronos.core.impl.table.TableReferenceFactoryImpl;
+import com.ericsson.bss.cassandra.ecchronos.core.impl.utils.ConsistencyType;
+import com.ericsson.bss.cassandra.ecchronos.core.metadata.NodeResolver;
+import com.ericsson.bss.cassandra.ecchronos.core.repair.config.RepairConfiguration;
+import com.ericsson.bss.cassandra.ecchronos.core.state.LongTokenRange;
+import com.ericsson.bss.cassandra.ecchronos.core.state.RepairEntry;
+import com.ericsson.bss.cassandra.ecchronos.core.state.ReplicationState;
+import com.ericsson.bss.cassandra.ecchronos.core.table.TableReference;
+import com.ericsson.bss.cassandra.ecchronos.core.table.TableReferenceFactory;
+import com.ericsson.bss.cassandra.ecchronos.core.table.TableRepairMetrics;
+import com.ericsson.bss.cassandra.ecchronos.data.repairhistory.RepairHistoryService;
+import com.ericsson.bss.cassandra.ecchronos.utils.enums.repair.RepairStatus;
+import com.ericsson.bss.cassandra.ecchronos.utils.enums.repair.RepairType;
+import com.ericsson.bss.cassandra.ecchronos.utils.exceptions.EcChronosException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import com.datastax.oss.driver.api.core.metadata.Metadata;
+import com.datastax.oss.driver.api.core.metadata.Node;
+import com.datastax.oss.driver.api.core.metadata.token.TokenRange;
+import com.datastax.oss.driver.api.querybuilder.QueryBuilder;
+import com.datastax.oss.driver.internal.core.metadata.token.Murmur3Token;
+import org.assertj.core.util.Lists;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import net.jcip.annotations.NotThreadSafe;
+
+@NotThreadSafe
+public class ITOnDemandRepairJob extends TestBase
+{
+
+ private static TableRepairMetrics mockTableRepairMetrics;
+
+ private static Metadata myMetadata;
+
+ private static HostStatesImpl myHostStates;
+
+ private static RepairHistoryService myEccRepairHistory;
+
+ private static OnDemandRepairSchedulerImpl myRepairSchedulerImpl;
+
+ private static ScheduleManagerImpl myScheduleManagerImpl;
+
+ private static CASLockFactory myLockFactory;
+
+ private Set myRepairs = new HashSet<>();
+
+ @Before
+ public void init() throws IOException
+ {
+ mockTableRepairMetrics = mock(TableRepairMetrics.class);
+ myMetadata = mySession.getMetadata();
+
+ Set nodeIds = getNativeConnectionProvider().getNodes().keySet();
+ List nodeIdList = new ArrayList<>(nodeIds);
+
+ myHostStates = HostStatesImpl.builder()
+ .withRefreshIntervalInMs(1000)
+ .withJmxProxyFactory(getJmxProxyFactory())
+ .build();
+
+ NodeResolver nodeResolver = new NodeResolverImpl(mySession);
+
+ ReplicationState replicationState = new ReplicationStateImpl(nodeResolver, mySession);
+
+ myEccRepairHistory = new RepairHistoryService(mySession, replicationState, nodeResolver, 2_592_000_000L);
+
+ myLockFactory = CASLockFactory.builder()
+ .withNativeConnectionProvider(getNativeConnectionProvider())
+ .withHostStates(myHostStates)
+ .withConsistencySerial(ConsistencyType.DEFAULT)
+ .build();
+
+ myScheduleManagerImpl = ScheduleManagerImpl.builder()
+ .withLockFactory(myLockFactory)
+ .withNodeIDList(nodeIdList)
+ .withRunInterval(100, TimeUnit.MILLISECONDS)
+ .build();
+
+ myRepairSchedulerImpl = OnDemandRepairSchedulerImpl.builder()
+ .withJmxProxyFactory(getJmxProxyFactory())
+ .withTableRepairMetrics(mockTableRepairMetrics)
+ .withScheduleManager(myScheduleManagerImpl)
+ .withRepairLockType(RepairLockType.VNODE)
+ .withReplicationState(replicationState)
+ .withSession(mySession)
+ .withRepairConfiguration(RepairConfiguration.DEFAULT)
+ .withRepairHistory(myEccRepairHistory)
+ .withOnDemandStatus(new OnDemandStatus(getNativeConnectionProvider()))
+ .build();
+ }
+
+ @After
+ public void clean()
+ {
+ for (TableReference tableReference : myRepairs)
+ {
+ mySession.execute(QueryBuilder.deleteFrom("system_distributed", "repair_history")
+ .whereColumn("keyspace_name")
+ .isEqualTo(literal(tableReference.getKeyspace()))
+ .whereColumn("columnfamily_name")
+ .isEqualTo(literal(tableReference.getTable()))
+ .build());
+ for (Node node : myMetadata.getNodes().values())
+ {
+ mySession.execute(QueryBuilder.deleteFrom("ecchronos", "repair_history")
+ .whereColumn("table_id")
+ .isEqualTo(literal(tableReference.getId()))
+ .whereColumn("node_id")
+ .isEqualTo(literal(node.getHostId()))
+ .build());
+ mySession.execute(QueryBuilder.deleteFrom("ecchronos", "on_demand_repair_status")
+ .whereColumn("host_id")
+ .isEqualTo(literal(node.getHostId()))
+ .build());
+ }
+ }
+ myRepairs.clear();
+ reset(mockTableRepairMetrics);
+ closeConnections();
+ }
+
+ private static void closeConnections()
+ {
+ myHostStates.close();
+ myRepairSchedulerImpl.close();
+ myScheduleManagerImpl.close();
+ myLockFactory.close();
+ }
+
+ /**
+ * Create a table that is replicated and schedule a repair on it
+ */
+ @Test
+ public void repairSingleTable() throws EcChronosException
+ {
+ long startTime = System.currentTimeMillis();
+ Node node = getNativeConnectionProvider().getNodes().entrySet().iterator().next().getValue();
+ TableReference tableReference = myTableReferenceFactory.forTable(TEST_KEYSPACE, TEST_TABLE_ONE_NAME);
+
+ schedule(tableReference, node.getHostId());
+
+ await().pollInterval(1, TimeUnit.SECONDS)
+ .atMost(90, TimeUnit.SECONDS)
+ .until(() -> myRepairSchedulerImpl.getActiveRepairJobs().isEmpty());
+ await().pollInterval(1, TimeUnit.SECONDS)
+ .atMost(90, TimeUnit.SECONDS)
+ .until(() -> myScheduleManagerImpl.getQueueSize(node.getHostId()) == 0);
+
+ verifyTableRepairedSince(tableReference, startTime, node);
+ }
+
+ /**
+ * Create two tables that are replicated and repair both
+ **/
+
+ @Test
+ public void repairMultipleTables() throws EcChronosException
+ {
+ long startTime = System.currentTimeMillis();
+ Node node = getNativeConnectionProvider().getNodes().entrySet().iterator().next().getValue();
+ TableReference tableReference = myTableReferenceFactory.forTable(TEST_KEYSPACE, TEST_TABLE_ONE_NAME);
+ TableReference tableReference2 = myTableReferenceFactory.forTable(TEST_KEYSPACE, TEST_TABLE_TWO_NAME);
+
+ schedule(tableReference, node.getHostId());
+ schedule(tableReference2, node.getHostId());
+
+ await().pollInterval(1, TimeUnit.SECONDS)
+ .atMost(90, TimeUnit.SECONDS)
+ .until(() -> myRepairSchedulerImpl.getActiveRepairJobs().isEmpty());
+ await().pollInterval(1, TimeUnit.SECONDS)
+ .atMost(90, TimeUnit.SECONDS)
+ .until(() -> myScheduleManagerImpl.getQueueSize(node.getHostId()) == 0);
+
+ verifyTableRepairedSince(tableReference, startTime, node);
+ verifyTableRepairedSince(tableReference2, startTime, node);
+ }
+
+ /**
+ * Schedule two jobs on the same table
+ **/
+
+ @Test
+ public void repairSameTableTwice() throws EcChronosException
+ {
+ long startTime = System.currentTimeMillis();
+ Node node = getNativeConnectionProvider().getNodes().entrySet().iterator().next().getValue();
+ TableReference tableReference = myTableReferenceFactory.forTable(TEST_KEYSPACE, TEST_TABLE_ONE_NAME);
+
+ schedule(tableReference, node.getHostId());
+ schedule(tableReference, node.getHostId());
+
+ await().pollInterval(1, TimeUnit.SECONDS)
+ .atMost(90, TimeUnit.SECONDS)
+ .until(() -> myRepairSchedulerImpl.getActiveRepairJobs().isEmpty());
+ await().pollInterval(1, TimeUnit.SECONDS)
+ .atMost(90, TimeUnit.SECONDS)
+ .until(() -> myScheduleManagerImpl.getQueueSize(node.getHostId()) == 0);
+
+ verifyTableRepairedSince(tableReference, startTime, tokenRangesFor(tableReference.getKeyspace(), node).size() * 2, node);
+ }
+
+ private void schedule(TableReference tableReference, UUID nodeId) throws EcChronosException
+ {
+ myRepairs.add(tableReference);
+ myRepairSchedulerImpl.scheduleJob(tableReference, RepairType.VNODE, nodeId);
+ }
+
+ private void verifyTableRepairedSince(TableReference tableReference, long repairedSince, Node node)
+ {
+ verifyTableRepairedSince(tableReference, repairedSince, tokenRangesFor(tableReference.getKeyspace(), node).size(), node);
+ }
+
+ private void verifyTableRepairedSince(TableReference tableReference, long repairedSince, int expectedTokenRanges, Node node)
+ {
+ OptionalLong repairedAtWithCassandraHistory = lastRepairedSince(tableReference, repairedSince, myEccRepairHistory, node);
+ assertThat(repairedAtWithCassandraHistory.isPresent()).isTrue();
+
+ OptionalLong repairedAtWithEccHistory = lastRepairedSince(tableReference, repairedSince, myEccRepairHistory, node);
+ assertThat(repairedAtWithEccHistory.isPresent()).isTrue();
+
+ verify(mockTableRepairMetrics, times(expectedTokenRanges)).repairSession(eq(tableReference), anyLong(),
+ any(TimeUnit.class), eq(true));
+ }
+
+ private OptionalLong lastRepairedSince(TableReference tableReference,
+ long repairedSince,
+ RepairHistoryService repairHistoryProvider,
+ Node node)
+ {
+ return lastRepairedSince(tableReference,
+ repairedSince,
+ tokenRangesFor(tableReference.getKeyspace(), node),
+ repairHistoryProvider,
+ node);
+ }
+
+ private OptionalLong lastRepairedSince(TableReference tableReference,
+ long repairedSince,
+ Set expectedRepaired,
+ RepairHistoryService repairHistoryProvider,
+ Node node)
+ {
+ Set expectedRepairedCopy = new HashSet<>(expectedRepaired);
+ Iterator repairEntryIterator = repairHistoryProvider.iterate(node, tableReference,
+ System.currentTimeMillis(), repairedSince,
+ repairEntry -> fullyRepaired(repairEntry) && expectedRepairedCopy.remove(repairEntry.getRange()));
+
+ List repairEntries = Lists.newArrayList(repairEntryIterator);
+
+ Set actuallyRepaired = repairEntries.stream()
+ .map(RepairEntry::getRange)
+ .collect(Collectors.toSet());
+
+ if (expectedRepaired.equals(actuallyRepaired))
+ {
+ return repairEntries.stream().mapToLong(RepairEntry::getStartedAt).min();
+ }
+ else
+ {
+ return OptionalLong.empty();
+ }
+ }
+
+ private Set tokenRangesFor(String keyspace, Node node)
+ {
+ return myMetadata.getTokenMap()
+ .get()
+ .getTokenRanges(keyspace, node)
+ .stream()
+ .map(this::convertTokenRange)
+ .collect(Collectors.toSet());
+ }
+
+ private boolean fullyRepaired(RepairEntry repairEntry)
+ {
+ return repairEntry.getParticipants().size() == 2 && repairEntry.getStatus() == RepairStatus.SUCCESS;
+ }
+
+ private LongTokenRange convertTokenRange(TokenRange range)
+ {
+ // Assuming murmur3 partitioner
+ long start = ((Murmur3Token) range.getStart()).getValue();
+ long end = ((Murmur3Token) range.getEnd()).getValue();
+ return new LongTokenRange(start, end);
+ }
+}
diff --git a/standalone-integration/src/test/java/com/ericsson/bss/cassandra/ecchronos/standalone/TestBase.java b/standalone-integration/src/test/java/com/ericsson/bss/cassandra/ecchronos/standalone/TestBase.java
new file mode 100644
index 00000000..28ea8f8f
--- /dev/null
+++ b/standalone-integration/src/test/java/com/ericsson/bss/cassandra/ecchronos/standalone/TestBase.java
@@ -0,0 +1,278 @@
+/*
+ * Copyright 2024 Telefonaktiebolaget LM Ericsson
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ericsson.bss.cassandra.ecchronos.standalone;
+
+import cassandracluster.AbstractCassandraCluster;
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.cql.SimpleStatement;
+import com.datastax.oss.driver.api.core.metadata.Node;
+import com.datastax.oss.driver.api.querybuilder.QueryBuilder;
+import com.ericsson.bss.cassandra.ecchronos.application.config.ConfigurationHelper;
+import com.ericsson.bss.cassandra.ecchronos.application.config.security.Credentials;
+import com.ericsson.bss.cassandra.ecchronos.application.config.security.Security;
+import com.ericsson.bss.cassandra.ecchronos.connection.DistributedJmxConnectionProvider;
+import com.ericsson.bss.cassandra.ecchronos.connection.DistributedNativeConnectionProvider;
+import com.ericsson.bss.cassandra.ecchronos.connection.impl.builders.DistributedNativeBuilder;
+import com.ericsson.bss.cassandra.ecchronos.connection.impl.providers.DistributedJmxConnectionProviderImpl;
+import com.ericsson.bss.cassandra.ecchronos.connection.impl.providers.DistributedNativeConnectionProviderImpl;
+import com.ericsson.bss.cassandra.ecchronos.core.impl.jmx.DistributedJmxProxyFactoryImpl;
+import com.ericsson.bss.cassandra.ecchronos.core.impl.table.TableReferenceFactoryImpl;
+import com.ericsson.bss.cassandra.ecchronos.core.table.TableReference;
+import com.ericsson.bss.cassandra.ecchronos.core.table.TableReferenceFactory;
+import com.ericsson.bss.cassandra.ecchronos.data.sync.EccNodesSync;
+import com.ericsson.bss.cassandra.ecchronos.utils.enums.connection.ConnectionType;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+import net.jcip.annotations.NotThreadSafe;
+import org.junit.AfterClass;
+import javax.management.InstanceNotFoundException;
+import javax.management.MBeanException;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import javax.management.ReflectionException;
+import javax.management.remote.JMXConnector;
+import java.io.IOException;
+import java.util.UUID;
+import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.literal;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@NotThreadSafe
+abstract public class TestBase extends AbstractCassandraCluster
+{
+ protected static final Logger LOG = LoggerFactory.getLogger(TestBase.class);
+ protected static final String SECURITY_FILE = "security.yml";
+ protected static final String ECCHRONOS_KEYSPACE = "ecchronos";
+ protected static final String TEST_KEYSPACE = "test_keyspace";
+ protected static final String TEST_TABLE_ONE_NAME = "test_table1";
+ protected static final String TEST_TABLE_TWO_NAME = "test_table2";
+ private static final String ON_DEMAND_REPAIR_STATUS_TABLE = "on_demand_repair_status";
+ private static final String ECCHRONOS_ID = "EcchronosID";
+ protected static final String ECCHRONOS_KEYSPACE_QUERY =
+ String.format("CREATE KEYSPACE IF NOT EXISTS %s WITH replication = {'class': 'NetworkTopologyStrategy', 'datacenter1': 2}",
+ ECCHRONOS_KEYSPACE);
+ protected static final String TEST_KEYSPACE_QUERY =
+ String.format("CREATE KEYSPACE IF NOT EXISTS %s WITH replication = {'class': 'NetworkTopologyStrategy', 'datacenter1': 2}",
+ TEST_KEYSPACE);
+ private static final String NODE_SYNC_TABLE_QUERY = String.format(
+ "CREATE TABLE IF NOT EXISTS %s.nodes_sync(ecchronos_id TEXT, datacenter_name TEXT, node_id UUID, node_endpoint TEXT, node_status TEXT, last_connection TIMESTAMP, next_connection TIMESTAMP, PRIMARY KEY(ecchronos_id, datacenter_name, node_id)) WITH CLUSTERING ORDER BY(datacenter_name DESC, node_id DESC);",
+ ECCHRONOS_KEYSPACE);
+ private static final String LOCK_TABLE_QUERY = String.format(
+ "CREATE TABLE IF NOT EXISTS %s.lock (resource text, node uuid, metadata map, PRIMARY KEY(resource)) WITH default_time_to_live = 600 AND gc_grace_seconds = 0",
+ ECCHRONOS_KEYSPACE);
+ private static final String LOCK_PRIORITY_TABLE_QUERY = String.format(
+ "CREATE TABLE IF NOT EXISTS %s.lock_priority (resource text, node uuid, priority int, PRIMARY KEY(resource, node)) WITH default_time_to_live = 600 AND gc_grace_seconds = 0",
+ ECCHRONOS_KEYSPACE);
+ private static final String REFERENCE_TABLE_QUERY = String
+ .format("CREATE TYPE IF NOT EXISTS %s.table_reference (id uuid, keyspace_name text, table_name text)", ECCHRONOS_KEYSPACE);
+ private static final String TOKEN_RANGE_TABLE_QUERY =
+ String.format("CREATE TYPE IF NOT EXISTS %s.token_range (start text, end text)", ECCHRONOS_KEYSPACE);
+ private static final String ON_DEMAND_REPAIR_STATUS_TABLE_QUERY = String.format(
+ "CREATE TABLE IF NOT EXISTS %s.%s (host_id uuid, job_id uuid, table_reference frozen, token_map_hash int, repaired_tokens frozen>>, status text, completed_time timestamp, repair_type text, PRIMARY KEY(host_id, job_id)) WITH default_time_to_live = 2592000 AND gc_grace_seconds = 0",
+ ECCHRONOS_KEYSPACE, ON_DEMAND_REPAIR_STATUS_TABLE);
+ private static final String REPAIR_TABLE_QUERY = String.format(
+ "CREATE TABLE IF NOT EXISTS %s.repair_history(table_id UUID, node_id UUID, repair_id timeuuid, job_id UUID, coordinator_id UUID, range_begin text, range_end text, participants set, status text, started_at timestamp, finished_at timestamp, PRIMARY KEY((table_id,node_id), repair_id)) WITH CLUSTERING ORDER BY (repair_id DESC);",
+ ECCHRONOS_KEYSPACE);
+ private static final String TEST_TABLE_QUERY =
+ String.format("CREATE TABLE IF NOT EXISTS %s.%s (col1 UUID, col2 int, PRIMARY KEY(col1))", TEST_KEYSPACE, TEST_TABLE_ONE_NAME);
+ private static final String TEST_TABLE_TWO_QUERY =
+ String.format("CREATE TABLE IF NOT EXISTS %s.%s (col1 UUID, col2 int, PRIMARY KEY(col1))", TEST_KEYSPACE, TEST_TABLE_TWO_NAME);
+ protected static final int DEFAULT_INSERT_DATA_COUNT = 1000;
+ private static final int NUMBER_OF_RETRIES = 100;
+
+ private static DistributedNativeConnectionProvider myNativeConnectionProvider;
+ private static DistributedJmxConnectionProvider myJmxConnectionProvider;
+ private static DistributedJmxProxyFactoryImpl myJmxProxyFactory;
+ protected static EccNodesSync myEccNodesSync;
+ protected static TableReferenceFactory myTableReferenceFactory;
+ protected static final AtomicReference jmxSecurity = new AtomicReference<>();
+
+ @BeforeClass
+ public static void setUpCluster() throws Exception
+ {
+ createKeyspaceAndTables();
+ Security security = ConfigurationHelper.DEFAULT_INSTANCE.getConfiguration(SECURITY_FILE, Security.class);
+ jmxSecurity.set(security.getJmxSecurity());
+ Map nodesList = new HashMap<>(mySession.getMetadata().getNodes());
+ List contactPoints = new ArrayList<>();
+
+ for (Node node : nodesList.values())
+ {
+ String hostname = node.getBroadcastRpcAddress().get().getHostName();
+ int port = node.getBroadcastRpcAddress().get().getPort();
+ contactPoints.add(new InetSocketAddress(hostname, port));
+ }
+
+ DistributedNativeBuilder distributedNativeBuilder = DistributedNativeConnectionProviderImpl.builder()
+ .withInitialContactPoints(contactPoints)
+ .withAgentType(ConnectionType.hostAware)
+ .withHostAware(contactPoints);
+
+ myNativeConnectionProvider =
+ new DistributedNativeConnectionProviderImpl(mySession, nodesList, distributedNativeBuilder, ConnectionType.hostAware);
+ myEccNodesSync = EccNodesSync.newBuilder()
+ .withSession(mySession)
+ .withNativeConnection(myNativeConnectionProvider)
+ .withEcchronosID(ECCHRONOS_ID)
+ .build();
+
+ Supplier credentials = () -> convertCredentials(jmxSecurity::get);
+ Supplier