diff --git a/cassandra-test-image/pom.xml b/cassandra-test-image/pom.xml
index 2d13cd1a4..bb6575991 100644
--- a/cassandra-test-image/pom.xml
+++ b/cassandra-test-image/pom.xml
@@ -110,6 +110,7 @@
                 <artifactId>maven-jar-plugin</artifactId>
                 <executions>
                     <execution>
+                        <id>test-jar</id>
                         <goals>
                             <goal>test-jar</goal>
diff --git a/pom.xml b/pom.xml
index 05f597f43..a000af16d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -107,6 +107,7 @@
         <module>utils</module>
         <module>fault.manager</module>
         <module>fault.manager.impl</module>
+        <module>standalone-integration</module>
diff --git a/standalone-integration/pom.xml b/standalone-integration/pom.xml
new file mode 100644
index 000000000..03245fb7b
--- /dev/null
+++ b/standalone-integration/pom.xml
@@ -0,0 +1,228 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>com.ericsson.bss.cassandra.ecchronos</groupId>
+        <artifactId>agent</artifactId>
+        <version>1.0.0-SNAPSHOT</version>
+    </parent>
+    <artifactId>standalone-integration</artifactId>
+    <description>Integration tests for a standalone environment</description>
+    <name>EcChronos Standalone Integration</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.ericsson.bss.cassandra.ecchronos</groupId>
+            <artifactId>connection</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.ericsson.bss.cassandra.ecchronos</groupId>
+            <artifactId>connection.impl</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.ericsson.bss.cassandra.ecchronos</groupId>
+            <artifactId>cassandra-test-image</artifactId>
+            <version>${project.version}</version>
+            <classifier>tests</classifier>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.ericsson.bss.cassandra.ecchronos</groupId>
+            <artifactId>data</artifactId>
+            <version>1.0.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.ericsson.bss.cassandra.ecchronos</groupId>
+            <artifactId>core</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.ericsson.bss.cassandra.ecchronos</groupId>
+            <artifactId>core.impl</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.junit.vintage</groupId>
+            <artifactId>junit-vintage-engine</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>net.jcip</groupId>
+            <artifactId>jcip-annotations</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.assertj</groupId>
+            <artifactId>assertj-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.awaitility</groupId>
+            <artifactId>awaitility</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>cassandra</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <profiles>
+        <profile>
+            <id>precommit.tests</id>
+            <properties>
+                <!-- Property name reconstructed; only the value survived extraction -->
+                <test.groups>standalone-integration-tests</test.groups>
+            </properties>
+            <build>
+                <plugins>
+                    <plugin>
+                        <artifactId>maven-failsafe-plugin</artifactId>
+                        <executions>
+                            <execution>
+                                <goals>
+                                    <goal>integration-test</goal>
+                                    <goal>verify</goal>
+                                </goals>
+                            </execution>
+                        </executions>
+                    </plugin>
+                    <plugin>
+                        <groupId>io.fabric8</groupId>
+                        <artifactId>docker-maven-plugin</artifactId>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+        <profile>
+            <id>localprecommit.tests</id>
+            <properties>
+                <!-- Property name reconstructed; only the value survived extraction -->
+                <test.groups>local-standalone-integration-tests</test.groups>
+            </properties>
+            <build>
+                <plugins>
+                    <plugin>
+                        <artifactId>maven-failsafe-plugin</artifactId>
+                        <configuration>
+                            <systemPropertyVariables>
+                                <!-- Variable names reconstructed; only the values survived extraction -->
+                                <it-cassandra.ip>127.0.0.1</it-cassandra.ip>
+                                <it-cassandra.jmx.port>7100</it-cassandra.jmx.port>
+                                <it-cassandra.native.port>9042</it-cassandra.native.port>
+                                <it-local-cassandra>true</it-local-cassandra>
+                            </systemPropertyVariables>
+                        </configuration>
+                        <executions>
+                            <execution>
+                                <goals>
+                                    <goal>integration-test</goal>
+                                    <goal>verify</goal>
+                                </goals>
+                            </execution>
+                        </executions>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+    </profiles>
+
+    <build>
+        <plugins>
+            <plugin>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <configuration>
+                    <release>${java.version}</release>
+                    <generatedSourcesDirectory>${project.build.directory}/generated-sources/</generatedSourcesDirectory>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-deploy-plugin</artifactId>
+                <configuration>
+                    <skip>true</skip>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-install-plugin</artifactId>
+                <configuration>
+                    <skip>true</skip>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/standalone-integration/src/test/java/com/ericsson/bss/cassandra/ecchronos/standalone/ITIncrementalOnDemandRepairJob.java b/standalone-integration/src/test/java/com/ericsson/bss/cassandra/ecchronos/standalone/ITIncrementalOnDemandRepairJob.java
new file mode 100644
index 000000000..c92fd32fa
--- /dev/null
+++ b/standalone-integration/src/test/java/com/ericsson/bss/cassandra/ecchronos/standalone/ITIncrementalOnDemandRepairJob.java
@@ -0,0 +1,281 @@
+/*
+ * Copyright 2024 Telefonaktiebolaget LM Ericsson
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ericsson.bss.cassandra.ecchronos.standalone;
+
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.metadata.Metadata;
+import com.datastax.oss.driver.api.core.metadata.Node;
+import com.datastax.oss.driver.api.querybuilder.QueryBuilder;
+import com.ericsson.bss.cassandra.ecchronos.core.impl.locks.CASLockFactory;
+import com.ericsson.bss.cassandra.ecchronos.core.impl.locks.RepairLockType;
+import com.ericsson.bss.cassandra.ecchronos.core.impl.metadata.NodeResolverImpl;
+import com.ericsson.bss.cassandra.ecchronos.core.impl.metrics.CassandraMetrics;
+import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.OnDemandStatus;
+import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.scheduler.OnDemandRepairSchedulerImpl;
+import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.scheduler.ScheduleManagerImpl;
+import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.state.HostStatesImpl;
+import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.state.ReplicationStateImpl;
+import com.ericsson.bss.cassandra.ecchronos.core.impl.table.TableReferenceFactoryImpl;
+import com.ericsson.bss.cassandra.ecchronos.core.impl.utils.ConsistencyType;
+import com.ericsson.bss.cassandra.ecchronos.core.repair.config.RepairConfiguration;
+import com.ericsson.bss.cassandra.ecchronos.core.repair.scheduler.OnDemandRepairJobView;
+import com.ericsson.bss.cassandra.ecchronos.core.table.TableReference;
+import com.ericsson.bss.cassandra.ecchronos.core.table.TableReferenceFactory;
+import com.ericsson.bss.cassandra.ecchronos.core.table.TableRepairMetrics;
+import com.ericsson.bss.cassandra.ecchronos.utils.enums.repair.RepairType;
+import com.ericsson.bss.cassandra.ecchronos.utils.exceptions.EcChronosException;
+import java.util.ArrayList;
+import net.jcip.annotations.NotThreadSafe;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.mockito.Mock;
+
+import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.literal;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+@NotThreadSafe
+public class ITIncrementalOnDemandRepairJob extends TestBase
+{
+ private static final int DEFAULT_JOB_TIMEOUT_IN_SECONDS = 90;
+
+ @Mock
+ protected static TableRepairMetrics mockTableRepairMetrics;
+
+ protected static Metadata myMetadata;
+
+ protected static HostStatesImpl myHostStates;
+
+ protected static OnDemandRepairSchedulerImpl myOnDemandRepairSchedulerImpl;
+
+ protected static ScheduleManagerImpl myScheduleManagerImpl;
+
+ protected static CASLockFactory myLockFactory;
+
+ protected static TableReferenceFactory myTableReferenceFactory;
+ protected static CassandraMetrics myCassandraMetrics;
+ private Set<TableReference> myRepairs = new HashSet<>();
+
+ @Before
+ public void init()
+ {
+ mockTableRepairMetrics = mock(TableRepairMetrics.class);
+ CqlSession session = getNativeConnectionProvider().getCqlSession();
+ myMetadata = session.getMetadata();
+
+ myTableReferenceFactory = new TableReferenceFactoryImpl(session);
+
+ myHostStates = HostStatesImpl.builder()
+ .withRefreshIntervalInMs(1000)
+ .withJmxProxyFactory(getJmxProxyFactory())
+ .build();
+
+ myLockFactory = CASLockFactory.builder()
+ .withNativeConnectionProvider(getNativeConnectionProvider())
+ .withHostStates(myHostStates)
+ .withConsistencySerial(ConsistencyType.DEFAULT)
+ .build();
+
+ Set<UUID> nodeIds = getNativeConnectionProvider().getNodes().keySet();
+ List<UUID> nodeIdList = new ArrayList<>(nodeIds);
+
+ myScheduleManagerImpl = ScheduleManagerImpl.builder()
+ .withLockFactory(myLockFactory)
+ .withNodeIDList(nodeIdList)
+ .withRunInterval(1, TimeUnit.SECONDS)
+ .build();
+
+ myCassandraMetrics = new CassandraMetrics(getJmxProxyFactory(),
+ Duration.ofSeconds(5), Duration.ofMinutes(30));
+
+ myOnDemandRepairSchedulerImpl = OnDemandRepairSchedulerImpl.builder()
+ .withJmxProxyFactory(getJmxProxyFactory())
+ .withTableRepairMetrics(mockTableRepairMetrics)
+ .withScheduleManager(myScheduleManagerImpl)
+ .withRepairLockType(RepairLockType.VNODE)
+ .withReplicationState(new ReplicationStateImpl(new NodeResolverImpl(session), session))
+ .withSession(session)
+ .withRepairConfiguration(RepairConfiguration.DEFAULT)
+ .withOnDemandStatus(new OnDemandStatus(getNativeConnectionProvider()))
+ .build();
+ }
+
+ @After
+ public void clean()
+ {
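+ // Remove the repair history and on-demand status rows written by the test before resetting mocks and closing connections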
+ for (TableReference tableReference : myRepairs)
+ {
+ getNativeConnectionProvider().getCqlSession()
+ .execute(
+ QueryBuilder.deleteFrom("system_distributed", "repair_history")
+ .whereColumn("keyspace_name")
+ .isEqualTo(literal(tableReference.getKeyspace()))
+ .whereColumn("columnfamily_name")
+ .isEqualTo(literal(tableReference.getTable()))
+ .build());
+ for (Node node : myMetadata.getNodes().values())
+ {
+ getNativeConnectionProvider().getCqlSession()
+ .execute(
+ QueryBuilder.deleteFrom("ecchronos", "on_demand_repair_status")
+ .whereColumn("host_id")
+ .isEqualTo(literal(node.getHostId()))
+ .build());
+ }
+ }
+ myRepairs.clear();
+ reset(mockTableRepairMetrics);
+ closeConnections();
+ }
+
+ public static void closeConnections()
+ {
+ if (myHostStates != null)
+ {
+ myHostStates.close();
+ }
+ if (myOnDemandRepairSchedulerImpl != null)
+ {
+ myOnDemandRepairSchedulerImpl.close();
+ }
+ if (myScheduleManagerImpl != null)
+ {
+ myScheduleManagerImpl.close();
+ }
+ if (myLockFactory != null)
+ {
+ myLockFactory.close();
+ }
+ }
+
+ @Test
+ public void repairSingleTable() throws Exception
+ {
+ TableReference tableReference = myTableReferenceFactory.forTable(ECCHRONOS_KEYSPACE, TEST_TABLE_ONE_NAME);
+ Node node = getNativeConnectionProvider().getNodes().entrySet().iterator().next().getValue();
+ getJmxConnectionProvider().add(node);
+ insertSomeDataAndFlush(tableReference, mySession, node);
+ long startTime = System.currentTimeMillis();
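+ // Wait until Cassandra's metrics report the table as not fully repaired before triggering the on-demand job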
+ await().pollInterval(1, TimeUnit.SECONDS)
+ .atMost(15, TimeUnit.SECONDS)
+ .until(() ->
+ {
+ double percentRepaired = myCassandraMetrics.getPercentRepaired(node.getHostId(), tableReference);
+ long maxRepairedAt = myCassandraMetrics.getMaxRepairedAt(node.getHostId(), tableReference);
+ return maxRepairedAt < startTime && percentRepaired < 100.0d;
+ });
+
+ UUID jobId = triggerRepair(tableReference, node);
+
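+ // Wait for the on-demand job to finish and the node's job queue to drain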
+ await().pollInterval(1, TimeUnit.SECONDS)
+ .atMost(DEFAULT_JOB_TIMEOUT_IN_SECONDS, TimeUnit.SECONDS)
+ .until(() -> myOnDemandRepairSchedulerImpl.getActiveRepairJobs().isEmpty());
+ await().pollInterval(1, TimeUnit.SECONDS)
+ .atMost(DEFAULT_JOB_TIMEOUT_IN_SECONDS, TimeUnit.SECONDS)
+ .until(() -> myScheduleManagerImpl.getQueueSize(node.getHostId()) == 0);
+
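+ // The metrics should now report the table as fully repaired since the job started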
+ await().pollInterval(1, TimeUnit.SECONDS)
+ .atMost(15, TimeUnit.SECONDS)
+ .until(() ->
+ {
+ double percentRepaired = myCassandraMetrics.getPercentRepaired(node.getHostId(), tableReference);
+ long maxRepairedAt = myCassandraMetrics.getMaxRepairedAt(node.getHostId(), tableReference);
+ return maxRepairedAt >= startTime && percentRepaired >= 100.0d;
+ });
+
+ verifyOnDemandCompleted(jobId, startTime, node.getHostId());
+ }
+
+ @Test
+ public void repairMultipleTables() throws Exception
+ {
+ long startTime = System.currentTimeMillis();
+ Node node = getNativeConnectionProvider().getNodes().entrySet().iterator().next().getValue();
+ TableReference tableReference1 = myTableReferenceFactory.forTable(ECCHRONOS_KEYSPACE, TEST_TABLE_ONE_NAME);
+ TableReference tableReference2 = myTableReferenceFactory.forTable(ECCHRONOS_KEYSPACE, TEST_TABLE_TWO_NAME);
+ insertSomeDataAndFlush(tableReference1, mySession, node);
+ insertSomeDataAndFlush(tableReference2, mySession, node);
+ UUID jobId1 = triggerRepair(tableReference1, node);
+ UUID jobId2 = triggerRepair(tableReference2, node);
+
+ await().pollInterval(1, TimeUnit.SECONDS)
+ .atMost(DEFAULT_JOB_TIMEOUT_IN_SECONDS * 2, TimeUnit.SECONDS)
+ .until(() -> myOnDemandRepairSchedulerImpl.getActiveRepairJobs().isEmpty());
+ await().pollInterval(1, TimeUnit.SECONDS)
+ .atMost(DEFAULT_JOB_TIMEOUT_IN_SECONDS * 2, TimeUnit.SECONDS)
+ .until(() -> myScheduleManagerImpl.getQueueSize(node.getHostId()) == 0);
+ verifyOnDemandCompleted(jobId1, startTime, node.getHostId());
+ verify(mockTableRepairMetrics).repairSession(eq(tableReference1), any(long.class), any(TimeUnit.class),
+ eq(true));
+ verifyOnDemandCompleted(jobId2, startTime, node.getHostId());
+ verify(mockTableRepairMetrics).repairSession(eq(tableReference2), any(long.class), any(TimeUnit.class),
+ eq(true));
+ }
+
+ @Test
+ public void repairSameTableTwice() throws Exception
+ {
+ long startTime = System.currentTimeMillis();
+ Node node = getNativeConnectionProvider().getNodes().entrySet().iterator().next().getValue();
+ TableReference tableReference = myTableReferenceFactory.forTable(ECCHRONOS_KEYSPACE, TEST_TABLE_ONE_NAME);
+ getJmxConnectionProvider().add(node);
+ insertSomeDataAndFlush(tableReference, mySession, node);
+ UUID jobId1 = triggerRepair(tableReference, node);
+ UUID jobId2 = triggerRepair(tableReference, node);
+
+ await().pollInterval(1, TimeUnit.SECONDS)
+ .atMost(DEFAULT_JOB_TIMEOUT_IN_SECONDS * 2, TimeUnit.SECONDS)
+ .until(() -> myOnDemandRepairSchedulerImpl.getActiveRepairJobs().isEmpty());
+ await().pollInterval(1, TimeUnit.SECONDS)
+ .atMost(DEFAULT_JOB_TIMEOUT_IN_SECONDS * 2, TimeUnit.SECONDS)
+ .until(() -> myScheduleManagerImpl.getQueueSize(node.getHostId()) == 0);
+ verifyOnDemandCompleted(jobId1, startTime, node.getHostId());
+ verifyOnDemandCompleted(jobId2, startTime, node.getHostId());
+ verify(mockTableRepairMetrics, times(2)).repairSession(eq(tableReference),
+ any(long.class), any(TimeUnit.class), eq(true));
+ }
+
+ private void verifyOnDemandCompleted(UUID jobId, long startTime, UUID hostId)
+ {
+ List<OnDemandRepairJobView> completedJobs = myOnDemandRepairSchedulerImpl.getAllRepairJobs(hostId)
+ .stream()
+ .filter(j -> j.getId().equals(jobId))
+ .collect(Collectors.toList());
+ assertThat(completedJobs).hasSize(1);
+ assertThat(completedJobs.get(0).getCompletionTime()).isGreaterThanOrEqualTo(startTime);
+ }
+
+ private UUID triggerRepair(TableReference tableReference, Node node) throws EcChronosException
+ {
+ myRepairs.add(tableReference);
+ return myOnDemandRepairSchedulerImpl.scheduleJob(tableReference, RepairType.INCREMENTAL, node.getHostId()).getId();
+ }
+}
diff --git a/standalone-integration/src/test/java/com/ericsson/bss/cassandra/ecchronos/standalone/ITIncrementalSchedules.java b/standalone-integration/src/test/java/com/ericsson/bss/cassandra/ecchronos/standalone/ITIncrementalSchedules.java
new file mode 100644
index 000000000..d54add2b5
--- /dev/null
+++ b/standalone-integration/src/test/java/com/ericsson/bss/cassandra/ecchronos/standalone/ITIncrementalSchedules.java
@@ -0,0 +1,244 @@
+/*
+ * Copyright 2024 Telefonaktiebolaget LM Ericsson
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ericsson.bss.cassandra.ecchronos.standalone;
+
+import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
+import com.datastax.oss.driver.api.core.metadata.Metadata;
+import com.datastax.oss.driver.api.core.metadata.Node;
+import com.datastax.oss.driver.api.querybuilder.QueryBuilder;
+import com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures;
+import com.ericsson.bss.cassandra.ecchronos.core.impl.locks.CASLockFactory;
+import com.ericsson.bss.cassandra.ecchronos.core.impl.locks.RepairLockType;
+import com.ericsson.bss.cassandra.ecchronos.core.impl.metadata.NodeResolverImpl;
+import com.ericsson.bss.cassandra.ecchronos.core.impl.metrics.CassandraMetrics;
+import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.scheduler.RepairSchedulerImpl;
+import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.scheduler.ScheduleManagerImpl;
+import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.state.HostStatesImpl;
+import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.state.ReplicationStateImpl;
+import com.ericsson.bss.cassandra.ecchronos.core.impl.table.TableReferenceFactoryImpl;
+import com.ericsson.bss.cassandra.ecchronos.core.impl.utils.ConsistencyType;
+import com.ericsson.bss.cassandra.ecchronos.core.repair.config.RepairConfiguration;
+import com.ericsson.bss.cassandra.ecchronos.core.repair.scheduler.ScheduledRepairJobView;
+import com.ericsson.bss.cassandra.ecchronos.core.table.TableReference;
+import com.ericsson.bss.cassandra.ecchronos.core.table.TableReferenceFactory;
+import com.ericsson.bss.cassandra.ecchronos.core.table.TableRepairMetrics;
+import com.ericsson.bss.cassandra.ecchronos.fm.RepairFaultReporter;
+import com.ericsson.bss.cassandra.ecchronos.utils.enums.repair.RepairType;
+import java.util.UUID;
+import net.jcip.annotations.NotThreadSafe;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.TimeUnit;
+
+import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.literal;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyMap;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.verify;
+
+@NotThreadSafe
+public class ITIncrementalSchedules extends TestBase
+{
+ private static final Logger LOG = LoggerFactory.getLogger(ITIncrementalSchedules.class);
+ private static final int DEFAULT_SCHEDULE_TIMEOUT_IN_SECONDS = 90;
+ private static final int CASSANDRA_METRICS_UPDATE_IN_SECONDS = 5;
+ private static RepairFaultReporter mockFaultReporter;
+ private static TableRepairMetrics mockTableRepairMetrics;
+ private static HostStatesImpl myHostStates;
+ private static RepairSchedulerImpl myRepairSchedulerImpl;
+ private static ScheduleManagerImpl myScheduleManagerImpl;
+ private static CASLockFactory myLockFactory;
+ private static RepairConfiguration myRepairConfiguration;
+ private static TableReferenceFactory myTableReferenceFactory;
+ private static CassandraMetrics myCassandraMetrics;
+ protected static Metadata myMetadata;
+
+ private Set<TableReference> myRepairs = new HashSet<>();
+
+ @Before
+ public void init()
+ {
+ mockFaultReporter = mock(RepairFaultReporter.class);
+ mockTableRepairMetrics = mock(TableRepairMetrics.class);
+ myMetadata = mySession.getMetadata();
+
+ myTableReferenceFactory = new TableReferenceFactoryImpl(mySession);
+
+ myHostStates = HostStatesImpl.builder()
+ .withRefreshIntervalInMs(1000)
+ .withJmxProxyFactory(getJmxProxyFactory())
+ .build();
+
+ myLockFactory = CASLockFactory.builder()
+ .withNativeConnectionProvider(getNativeConnectionProvider())
+ .withHostStates(myHostStates)
+ .withConsistencySerial(ConsistencyType.DEFAULT)
+ .build();
+
+ Set<UUID> nodeIds = getNativeConnectionProvider().getNodes().keySet();
+ List<UUID> nodeIdList = new ArrayList<>(nodeIds);
+
+ myScheduleManagerImpl = ScheduleManagerImpl.builder()
+ .withLockFactory(myLockFactory)
+ .withNodeIDList(nodeIdList)
+ .withRunInterval(1, TimeUnit.SECONDS)
+ .build();
+
+ myCassandraMetrics = new CassandraMetrics(getJmxProxyFactory(),
+ Duration.ofSeconds(CASSANDRA_METRICS_UPDATE_IN_SECONDS), Duration.ofMinutes(30));
+
+ myRepairSchedulerImpl = RepairSchedulerImpl.builder()
+ .withJmxProxyFactory(getJmxProxyFactory())
+ .withTableRepairMetrics(mockTableRepairMetrics)
+ .withFaultReporter(mockFaultReporter)
+ .withScheduleManager(myScheduleManagerImpl)
+ .withRepairLockType(RepairLockType.VNODE)
+ .withCassandraMetrics(myCassandraMetrics)
+ .withReplicationState(new ReplicationStateImpl(new NodeResolverImpl(mySession), mySession))
+ .build();
+
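+ // Use a short repair interval so the incremental schedule runs quickly within the test timeout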
+ myRepairConfiguration = RepairConfiguration.newBuilder()
+ .withRepairInterval(5, TimeUnit.SECONDS)
+ .withRepairType(RepairType.INCREMENTAL)
+ .build();
+ }
+
+ @After
+ public void clean()
+ {
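+ // Remove the schedule for each table used by the test and clear its repair history asynchronously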
+ List<CompletionStage<AsyncResultSet>> stages = new ArrayList<>();
+ for (TableReference tableReference : myRepairs)
+ {
+ Node node = getNativeConnectionProvider().getNodes().entrySet().iterator().next().getValue();
+ myRepairSchedulerImpl.removeConfiguration(node, tableReference);
+
+ stages.add(mySession.executeAsync(QueryBuilder.deleteFrom("system_distributed", "repair_history")
+ .whereColumn("keyspace_name")
+ .isEqualTo(literal(tableReference.getKeyspace()))
+ .whereColumn("columnfamily_name")
+ .isEqualTo(literal(tableReference.getTable()))
+ .build()));
+ }
+ for (CompletionStage<AsyncResultSet> stage : stages)
+ {
+ CompletableFutures.getUninterruptibly(stage);
+ }
+ myRepairs.clear();
+ reset(mockTableRepairMetrics);
+ reset(mockFaultReporter);
+ closeConnections();
+ }
+
+ public static void closeConnections()
+ {
+ if (myHostStates != null)
+ {
+ myHostStates.close();
+ }
+ if (myRepairSchedulerImpl != null)
+ {
+ myRepairSchedulerImpl.close();
+ }
+ if (myScheduleManagerImpl != null)
+ {
+ myScheduleManagerImpl.close();
+ }
+ if (myLockFactory != null)
+ {
+ myLockFactory.close();
+ }
+ }
+
+ @Test
+ public void repairSingleTable() throws Exception
+ {
+ Node node = getNativeConnectionProvider().getNodes().entrySet().iterator().next().getValue();
+ TableReference tableReference = myTableReferenceFactory.forTable(ECCHRONOS_KEYSPACE, TEST_TABLE_ONE_NAME);
+ insertSomeDataAndFlush(tableReference, mySession, node);
+ long startTime = System.currentTimeMillis();
+
+ // Wait for metrics to be updated; allow up to three times the metrics update interval (worst case)
+ await().pollInterval(1, TimeUnit.SECONDS)
+ .atMost(CASSANDRA_METRICS_UPDATE_IN_SECONDS * 3, TimeUnit.SECONDS)
+ .until(() ->
+ {
+ double percentRepaired = myCassandraMetrics.getPercentRepaired(node.getHostId(), tableReference);
+ long maxRepairedAt = myCassandraMetrics.getMaxRepairedAt(node.getHostId(), tableReference);
+ LOG.info("Waiting for metrics to be updated, percentRepaired: {} maxRepairedAt: {}",
+ percentRepaired, maxRepairedAt);
+ return maxRepairedAt < startTime && percentRepaired < 100.0d;
+ });
+
+ // Create a schedule
+ addSchedule(tableReference, node);
+ await().pollInterval(1, TimeUnit.SECONDS)
+ .atMost(DEFAULT_SCHEDULE_TIMEOUT_IN_SECONDS, TimeUnit.SECONDS)
+ .until(() -> getSchedule(tableReference).isPresent());
+
+ await().pollInterval(1, TimeUnit.SECONDS)
+ .atMost(DEFAULT_SCHEDULE_TIMEOUT_IN_SECONDS, TimeUnit.SECONDS)
+ .until(() ->
+ {
+ double percentRepaired = myCassandraMetrics.getPercentRepaired(node.getHostId(), tableReference);
+ long maxRepairedAt = myCassandraMetrics.getMaxRepairedAt(node.getHostId(), tableReference);
+ LOG.info("Waiting for schedule to run, percentRepaired: {} maxRepairedAt: {}",
+ percentRepaired, maxRepairedAt);
+ return maxRepairedAt >= startTime && percentRepaired >= 100.0d;
+ });
+ verify(mockFaultReporter, never())
+ .raise(any(RepairFaultReporter.FaultCode.class), anyMap());
+ verify(mockTableRepairMetrics).repairSession(eq(tableReference),
+ any(long.class), any(TimeUnit.class), eq(true));
+ Optional<ScheduledRepairJobView> view = getSchedule(tableReference);
+ assertThat(view).isPresent();
+ assertThat(view.get().getStatus()).isEqualTo(ScheduledRepairJobView.Status.COMPLETED);
+ assertThat(view.get().getCompletionTime()).isGreaterThanOrEqualTo(startTime);
+ assertThat(view.get().getProgress()).isGreaterThanOrEqualTo(1.0d);
+ }
+
+ private void addSchedule(TableReference tableReference, Node node)
+ {
+ if (myRepairs.add(tableReference))
+ {
+ myRepairSchedulerImpl.putConfigurations(node, tableReference, Collections.singleton(myRepairConfiguration));
+ }
+ }
+
+ private Optional<ScheduledRepairJobView> getSchedule(TableReference tableReference)
+ {
+ return myRepairSchedulerImpl.getCurrentRepairJobs()
+ .stream()
+ .filter(s -> s.getRepairConfiguration().equals(myRepairConfiguration))
+ .filter(s -> s.getTableReference().equals(tableReference))
+ .findFirst();
+ }
+}
diff --git a/standalone-integration/src/test/java/com/ericsson/bss/cassandra/ecchronos/standalone/ITOnDemandRepairJob.java b/standalone-integration/src/test/java/com/ericsson/bss/cassandra/ecchronos/standalone/ITOnDemandRepairJob.java
new file mode 100644
index 000000000..bfa3101c9
--- /dev/null
+++ b/standalone-integration/src/test/java/com/ericsson/bss/cassandra/ecchronos/standalone/ITOnDemandRepairJob.java
@@ -0,0 +1,333 @@
+/*
+ * Copyright 2024 Telefonaktiebolaget LM Ericsson
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ericsson.bss.cassandra.ecchronos.standalone;
+
+import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.literal;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import com.ericsson.bss.cassandra.ecchronos.core.impl.locks.CASLockFactory;
+import com.ericsson.bss.cassandra.ecchronos.core.impl.locks.RepairLockType;
+import com.ericsson.bss.cassandra.ecchronos.core.impl.metadata.NodeResolverImpl;
+import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.OnDemandStatus;
+import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.scheduler.OnDemandRepairSchedulerImpl;
+import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.scheduler.ScheduleManagerImpl;
+import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.state.HostStatesImpl;
+import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.state.ReplicationStateImpl;
+import com.ericsson.bss.cassandra.ecchronos.core.impl.table.TableReferenceFactoryImpl;
+import com.ericsson.bss.cassandra.ecchronos.core.impl.utils.ConsistencyType;
+import com.ericsson.bss.cassandra.ecchronos.core.metadata.NodeResolver;
+import com.ericsson.bss.cassandra.ecchronos.core.repair.config.RepairConfiguration;
+import com.ericsson.bss.cassandra.ecchronos.core.state.LongTokenRange;
+import com.ericsson.bss.cassandra.ecchronos.core.state.RepairEntry;
+import com.ericsson.bss.cassandra.ecchronos.core.state.ReplicationState;
+import com.ericsson.bss.cassandra.ecchronos.core.table.TableReference;
+import com.ericsson.bss.cassandra.ecchronos.core.table.TableReferenceFactory;
+import com.ericsson.bss.cassandra.ecchronos.core.table.TableRepairMetrics;
+import com.ericsson.bss.cassandra.ecchronos.data.repairhistory.RepairHistoryService;
+import com.ericsson.bss.cassandra.ecchronos.utils.enums.repair.RepairStatus;
+import com.ericsson.bss.cassandra.ecchronos.utils.enums.repair.RepairType;
+import com.ericsson.bss.cassandra.ecchronos.utils.exceptions.EcChronosException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import com.datastax.oss.driver.api.core.metadata.Metadata;
+import com.datastax.oss.driver.api.core.metadata.Node;
+import com.datastax.oss.driver.api.core.metadata.token.TokenRange;
+import com.datastax.oss.driver.api.querybuilder.QueryBuilder;
+import com.datastax.oss.driver.internal.core.metadata.token.Murmur3Token;
+import org.assertj.core.util.Lists;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import net.jcip.annotations.NotThreadSafe;
+
+@NotThreadSafe
+public class ITOnDemandRepairJob extends TestBase
+{
+
+ private static TableRepairMetrics mockTableRepairMetrics;
+
+ private static Metadata myMetadata;
+
+ private static HostStatesImpl myHostStates;
+
+ private static RepairHistoryService myEccRepairHistory;
+
+ private static OnDemandRepairSchedulerImpl myRepairSchedulerImpl;
+
+ private static ScheduleManagerImpl myScheduleManagerImpl;
+
+ private static CASLockFactory myLockFactory;
+
+ private static TableReferenceFactory myTableReferenceFactory;
+
+ private Set<TableReference> myRepairs = new HashSet<>();
+
+ @Before
+ public void init() throws IOException
+ {
+ mockTableRepairMetrics = mock(TableRepairMetrics.class);
+ myMetadata = mySession.getMetadata();
+ myTableReferenceFactory = new TableReferenceFactoryImpl(mySession);
+
+ Set<UUID> nodeIds = getNativeConnectionProvider().getNodes().keySet();
+ List<UUID> nodeIdList = new ArrayList<>(nodeIds);
+
+ myHostStates = HostStatesImpl.builder()
+ .withRefreshIntervalInMs(1000)
+ .withJmxProxyFactory(getJmxProxyFactory())
+ .build();
+
+ NodeResolver nodeResolver = new NodeResolverImpl(mySession);
+
+ ReplicationState replicationState = new ReplicationStateImpl(nodeResolver, mySession);
+
+ myEccRepairHistory = new RepairHistoryService(mySession, replicationState, nodeResolver, 2_592_000_000L);
+
+ myLockFactory = CASLockFactory.builder()
+ .withNativeConnectionProvider(getNativeConnectionProvider())
+ .withHostStates(myHostStates)
+ .withConsistencySerial(ConsistencyType.DEFAULT)
+ .build();
+
+ myScheduleManagerImpl = ScheduleManagerImpl.builder()
+ .withLockFactory(myLockFactory)
+ .withNodeIDList(nodeIdList)
+ .withRunInterval(100, TimeUnit.MILLISECONDS)
+ .build();
+
+ myRepairSchedulerImpl = OnDemandRepairSchedulerImpl.builder()
+ .withJmxProxyFactory(getJmxProxyFactory())
+ .withTableRepairMetrics(mockTableRepairMetrics)
+ .withScheduleManager(myScheduleManagerImpl)
+ .withRepairLockType(RepairLockType.VNODE)
+ .withReplicationState(replicationState)
+ .withSession(mySession)
+ .withRepairConfiguration(RepairConfiguration.DEFAULT)
+ .withRepairHistory(myEccRepairHistory)
+ .withOnDemandStatus(new OnDemandStatus(getNativeConnectionProvider()))
+ .build();
+ }
+
+ @After
+ public void clean()
+ {
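+ // Remove rows written during the test from both Cassandra's and ecChronos' repair history tables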
+ for (TableReference tableReference : myRepairs)
+ {
+ mySession.execute(QueryBuilder.deleteFrom("system_distributed", "repair_history")
+ .whereColumn("keyspace_name")
+ .isEqualTo(literal(tableReference.getKeyspace()))
+ .whereColumn("columnfamily_name")
+ .isEqualTo(literal(tableReference.getTable()))
+ .build());
+ for (Node node : myMetadata.getNodes().values())
+ {
+ mySession.execute(QueryBuilder.deleteFrom("ecchronos", "repair_history")
+ .whereColumn("table_id")
+ .isEqualTo(literal(tableReference.getId()))
+ .whereColumn("node_id")
+ .isEqualTo(literal(node.getHostId()))
+ .build());
+ mySession.execute(QueryBuilder.deleteFrom("ecchronos", "on_demand_repair_status")
+ .whereColumn("host_id")
+ .isEqualTo(literal(node.getHostId()))
+ .build());
+ }
+ }
+ myRepairs.clear();
+ reset(mockTableRepairMetrics);
+ closeConnections();
+ }
+
+ private static void closeConnections()
+ {
+ myHostStates.close();
+ myRepairSchedulerImpl.close();
+ myScheduleManagerImpl.close();
+ myLockFactory.close();
+ }
+
+ /**
+ * Create a table that is replicated and schedule a repair on it.
+ */
+ @Test
+ public void repairSingleTable() throws EcChronosException
+ {
+ long startTime = System.currentTimeMillis();
+ Node node = getNativeConnectionProvider().getNodes().entrySet().iterator().next().getValue();
+ TableReference tableReference = myTableReferenceFactory.forTable(ECCHRONOS_KEYSPACE, TEST_TABLE_ONE_NAME);
+
+ schedule(tableReference, node.getHostId());
+
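+ // Wait for the scheduled job to finish and the node's job queue to drain before verifying repair history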
+ await().pollInterval(1, TimeUnit.SECONDS)
+ .atMost(90, TimeUnit.SECONDS)
+ .until(() -> myRepairSchedulerImpl.getActiveRepairJobs().isEmpty());
+ await().pollInterval(1, TimeUnit.SECONDS)
+ .atMost(90, TimeUnit.SECONDS)
+ .until(() -> myScheduleManagerImpl.getQueueSize(node.getHostId()) == 0);
+
+ verifyTableRepairedSince(tableReference, startTime, node);
+ }
+
+ /**
+ * Create two tables that are replicated and repair both.
+ */
+ @Test
+ public void repairMultipleTables() throws EcChronosException
+ {
+ long startTime = System.currentTimeMillis();
+ Node node = getNativeConnectionProvider().getNodes().entrySet().iterator().next().getValue();
+ TableReference tableReference = myTableReferenceFactory.forTable(ECCHRONOS_KEYSPACE, TEST_TABLE_ONE_NAME);
+ TableReference tableReference2 = myTableReferenceFactory.forTable(ECCHRONOS_KEYSPACE, TEST_TABLE_TWO_NAME);
+
+ schedule(tableReference, node.getHostId());
+ schedule(tableReference2, node.getHostId());
+
+ await().pollInterval(1, TimeUnit.SECONDS)
+ .atMost(90, TimeUnit.SECONDS)
+ .until(() -> myRepairSchedulerImpl.getActiveRepairJobs().isEmpty());
+ await().pollInterval(1, TimeUnit.SECONDS)
+ .atMost(90, TimeUnit.SECONDS)
+ .until(() -> myScheduleManagerImpl.getQueueSize(node.getHostId()) == 0);
+
+ verifyTableRepairedSince(tableReference, startTime, node);
+ verifyTableRepairedSince(tableReference2, startTime, node);
+ }
+
+ /**
+ * Schedule two jobs on the same table.
+ */
+ @Test
+ public void repairSameTableTwice() throws EcChronosException
+ {
+ long startTime = System.currentTimeMillis();
+ Node node = getNativeConnectionProvider().getNodes().entrySet().iterator().next().getValue();
+ TableReference tableReference = myTableReferenceFactory.forTable(ECCHRONOS_KEYSPACE, TEST_TABLE_ONE_NAME);
+
+ schedule(tableReference, node.getHostId());
+ schedule(tableReference, node.getHostId());
+
+ await().pollInterval(1, TimeUnit.SECONDS)
+ .atMost(90, TimeUnit.SECONDS)
+ .until(() -> myRepairSchedulerImpl.getActiveRepairJobs().isEmpty());
+ await().pollInterval(1, TimeUnit.SECONDS)
+ .atMost(90, TimeUnit.SECONDS)
+ .until(() -> myScheduleManagerImpl.getQueueSize(node.getHostId()) == 0);
+
+ verifyTableRepairedSince(tableReference, startTime, tokenRangesFor(tableReference.getKeyspace(), node).size() * 2, node);
+ }
+
+ private void schedule(TableReference tableReference, UUID nodeId) throws EcChronosException
+ {
+ myRepairs.add(tableReference);
+ myRepairSchedulerImpl.scheduleJob(tableReference, RepairType.VNODE, nodeId);
+ }
+
+ private void verifyTableRepairedSince(TableReference tableReference, long repairedSince, Node node)
+ {
+ verifyTableRepairedSince(tableReference, repairedSince, tokenRangesFor(tableReference.getKeyspace(), node).size(), node);
+ }
+
+ private void verifyTableRepairedSince(TableReference tableReference, long repairedSince, int expectedTokenRanges, Node node)
+ {
+ OptionalLong repairedAt = lastRepairedSince(tableReference, repairedSince, myEccRepairHistory, node);
+ assertThat(repairedAt.isPresent()).isTrue();
+
+
+ verify(mockTableRepairMetrics, times(expectedTokenRanges)).repairSession(eq(tableReference), anyLong(),
+ any(TimeUnit.class), eq(true));
+ }
+
+ private OptionalLong lastRepairedSince(TableReference tableReference,
+ long repairedSince,
+ RepairHistoryService repairHistoryProvider,
+ Node node)
+ {
+ return lastRepairedSince(tableReference,
+ repairedSince,
+ tokenRangesFor(tableReference.getKeyspace(), node),
+ repairHistoryProvider,
+ node);
+ }
+
+ private OptionalLong lastRepairedSince(TableReference tableReference,
+ long repairedSince,
+ Set<LongTokenRange> expectedRepaired,
+ RepairHistoryService repairHistoryProvider,
+ Node node)
+ {
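+ // Keep only successful repairs of the expected ranges; succeed only if every expected range was seen in the history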
+ Set<LongTokenRange> expectedRepairedCopy = new HashSet<>(expectedRepaired);
+ Iterator<RepairEntry> repairEntryIterator = repairHistoryProvider.iterate(node, tableReference,
+ System.currentTimeMillis(), repairedSince,
+ repairEntry -> fullyRepaired(repairEntry) && expectedRepairedCopy.remove(repairEntry.getRange()));
+
+ List<RepairEntry> repairEntries = Lists.newArrayList(repairEntryIterator);
+
+ Set<LongTokenRange> actuallyRepaired = repairEntries.stream()
+ .map(RepairEntry::getRange)
+ .collect(Collectors.toSet());
+
+ if (expectedRepaired.equals(actuallyRepaired))
+ {
+ return repairEntries.stream().mapToLong(RepairEntry::getStartedAt).min();
+ }
+ else
+ {
+ return OptionalLong.empty();
+ }
+ }
+
+ private Set<LongTokenRange> tokenRangesFor(String keyspace, Node node)
+ {
+ return myMetadata.getTokenMap()
+ .get()
+ .getTokenRanges(keyspace, node)
+ .stream()
+ .map(this::convertTokenRange)
+ .collect(Collectors.toSet());
+ }
+
+ private boolean fullyRepaired(RepairEntry repairEntry)
+ {
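+ // Assumes each range in the test cluster is replicated to exactly two nodes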
+ return repairEntry.getParticipants().size() == 2 && repairEntry.getStatus() == RepairStatus.SUCCESS;
+ }
+
+ private LongTokenRange convertTokenRange(TokenRange range)
+ {
+ // Assuming murmur3 partitioner
+ long start = ((Murmur3Token) range.getStart()).getValue();
+ long end = ((Murmur3Token) range.getEnd()).getValue();
+ return new LongTokenRange(start, end);
+ }
+}
diff --git a/standalone-integration/src/test/java/com/ericsson/bss/cassandra/ecchronos/standalone/ITRepairInfo.java b/standalone-integration/src/test/java/com/ericsson/bss/cassandra/ecchronos/standalone/ITRepairInfo.java
new file mode 100644
index 000000000..f3c3cc368
--- /dev/null
+++ b/standalone-integration/src/test/java/com/ericsson/bss/cassandra/ecchronos/standalone/ITRepairInfo.java
@@ -0,0 +1,372 @@
+/*
+ * Copyright 2022 Telefonaktiebolaget LM Ericsson
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ericsson.bss.cassandra.ecchronos.standalone;
+
+import com.datastax.oss.driver.api.core.ConsistencyLevel;
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
+import com.datastax.oss.driver.api.core.cql.SimpleStatement;
+import com.datastax.oss.driver.api.core.metadata.Metadata;
+import com.datastax.oss.driver.api.core.metadata.Node;
+import com.datastax.oss.driver.api.core.metadata.token.TokenRange;
+import com.datastax.oss.driver.api.core.uuid.Uuids;
+import com.datastax.oss.driver.api.querybuilder.QueryBuilder;
+import com.datastax.oss.driver.internal.core.metadata.token.Murmur3Token;
+import com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures;
+import net.jcip.annotations.NotThreadSafe;
+import org.junit.After;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.literal;
+import static org.assertj.core.api.Assertions.assertThat;
+
+@RunWith(Parameterized.class)
+@NotThreadSafe
+public class ITRepairInfo extends TestBase
+{
+ /* enum RepairHistoryType
+ {
+ CASSANDRA, ECC
+ }
+ @Parameterized.Parameters
+ public static Collection<Object[]> parameters()
+ {
+ return Arrays.asList(new Object[][] {
+ { RepairHistoryType.CASSANDRA, true },
+ { RepairHistoryType.CASSANDRA, false },
+ { RepairHistoryType.ECC, true },
+ { RepairHistoryType.ECC, false }
+ });
+ }
+
+ @Parameterized.Parameter
+ public RepairHistoryType myRepairHistoryType;
+
+ @Parameterized.Parameter(1)
+ public Boolean myRemoteRoutingOption;
+
+ private static CqlSession mySession;
+
+ private static CqlSession myAdminSession;
+
+ private static Node myLocalNode;
+
+ private static DriverNode myLocalDriverNode;
+
+ private static RepairHistoryProvider myRepairHistoryProvider;
+
+ private static NodeResolver myNodeResolver;
+
+ private static TableReferenceFactory myTableReferenceFactory;
+
+ private static RepairStatsProvider myRepairStatsProvider;
+
+ private Set<TableReference> myRepairs = new HashSet<>();
+
+ @Parameterized.BeforeParam
+ public static void init(RepairHistoryType repairHistoryType, Boolean remoteRoutingOption) throws IOException
+ {
+ myRemoteRouting = remoteRoutingOption;
+ initialize();
+
+ myAdminSession = getAdminNativeConnectionProvider().getSession();
+
+ myLocalNode = getNativeConnectionProvider().getLocalNode();
+ mySession = getNativeConnectionProvider().getSession();
+
+ myTableReferenceFactory = new TableReferenceFactoryImpl(mySession);
+
+ myNodeResolver = new NodeResolverImpl(mySession);
+ myLocalDriverNode = myNodeResolver.fromUUID(myLocalNode.getHostId()).orElseThrow(IllegalStateException::new);
+
+ ReplicationState replicationState = new ReplicationStateImpl(myNodeResolver, mySession, myLocalNode);
+
+ EccRepairHistory eccRepairHistory = EccRepairHistory.newBuilder()
+ .withReplicationState(replicationState)
+ .withLookbackTime(30, TimeUnit.DAYS)
+ .withLocalNode(myLocalDriverNode)
+ .withSession(mySession)
+ .withStatementDecorator(s -> s)
+ .build();
+
+ if (repairHistoryType == RepairHistoryType.ECC)
+ {
+ myRepairHistoryProvider = eccRepairHistory;
+ }
+ else if (repairHistoryType == RepairHistoryType.CASSANDRA)
+ {
+ myRepairHistoryProvider = new RepairHistoryProviderImpl(myNodeResolver, mySession, s -> s,
+ TimeUnit.DAYS.toMillis(30));
+ }
+ else
+ {
+ throw new IllegalArgumentException("Unknown repair history type for test");
+ }
+ myRepairStatsProvider = new RepairStatsProviderImpl(
+ new VnodeRepairStateFactoryImpl(replicationState, myRepairHistoryProvider, true));
+ }
+
+ @After
+ public void clean()
+ {
+ List<CompletionStage<AsyncResultSet>> stages = new ArrayList<>();
+
+ Metadata metadata = mySession.getMetadata();
+ for (TableReference tableReference : myRepairs)
+ {
+ stages.add(myAdminSession.executeAsync(QueryBuilder.deleteFrom("system_distributed", "repair_history")
+ .whereColumn("keyspace_name").isEqualTo(literal(tableReference.getKeyspace()))
+ .whereColumn("columnfamily_name").isEqualTo(literal(tableReference.getTable()))
+ .build().setConsistencyLevel(ConsistencyLevel.ALL)));
+ for (Node node : metadata.getNodes().values())
+ {
+ stages.add(myAdminSession.executeAsync(QueryBuilder.deleteFrom("ecchronos", "repair_history")
+ .whereColumn("table_id").isEqualTo(literal(tableReference.getId()))
+ .whereColumn("node_id").isEqualTo(literal(node.getHostId()))
+ .build().setConsistencyLevel(ConsistencyLevel.ALL)));
+ }
+ }
+
+ for (CompletionStage<AsyncResultSet> stage : stages)
+ {
+ CompletableFutures.getUninterruptibly(stage);
+ }
+ myRepairs.clear();
+ }
+
+ @Test
+ public void repairInfoForRepaired()
+ {
+ long maxRepairedTime = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(2);
+ TableReference tableReference = myTableReferenceFactory.forTable("test", "table1");
+ myRepairs.add(tableReference);
+ injectRepairHistory(tableReference, maxRepairedTime);
+ // Get repair stats from three hours ago (now - 3 hours) until now
+ long since = maxRepairedTime - TimeUnit.HOURS.toMillis(1);
+
+ RepairStats repairStatsLocal = myRepairStatsProvider.getRepairStats(tableReference, since,
+ System.currentTimeMillis(), true);
+ assertThat(repairStatsLocal).isNotNull();
+ assertThat(repairStatsLocal.keyspace).isEqualTo(tableReference.getKeyspace());
+ assertThat(repairStatsLocal.table).isEqualTo(tableReference.getTable());
+ assertThat(repairStatsLocal.repairedRatio).isGreaterThanOrEqualTo(1.0);
+
+ RepairStats repairStatsClusterWide = myRepairStatsProvider.getRepairStats(tableReference, since,
+ System.currentTimeMillis(), false);
+ assertThat(repairStatsClusterWide).isNotNull();
+ assertThat(repairStatsClusterWide.keyspace).isEqualTo(tableReference.getKeyspace());
+ assertThat(repairStatsClusterWide.table).isEqualTo(tableReference.getTable());
+ assertThat(repairStatsClusterWide.repairedRatio).isGreaterThan(0.0);
+ assertThat(repairStatsClusterWide.repairedRatio).isLessThan(1.0);
+ }
+
+ @Test
+ public void repairInfoForRepairedInSubRanges()
+ {
+ long maxRepairedTime = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(2);
+ TableReference tableReference = myTableReferenceFactory.forTable("test", "table1");
+ myRepairs.add(tableReference);
+ injectRepairHistory(tableReference, maxRepairedTime, true);
+ // Get repair stats from three hours ago (now - 3 hours) until now
+ long since = maxRepairedTime - TimeUnit.HOURS.toMillis(1);
+
+ RepairStats repairStatsLocal = myRepairStatsProvider.getRepairStats(tableReference, since,
+ System.currentTimeMillis(), true);
+ assertThat(repairStatsLocal).isNotNull();
+ assertThat(repairStatsLocal.keyspace).isEqualTo(tableReference.getKeyspace());
+ assertThat(repairStatsLocal.table).isEqualTo(tableReference.getTable());
+ assertThat(repairStatsLocal.repairedRatio).isGreaterThanOrEqualTo(1.0);
+
+ RepairStats repairStatsClusterWide = myRepairStatsProvider.getRepairStats(tableReference, since,
+ System.currentTimeMillis(), false);
+ assertThat(repairStatsClusterWide).isNotNull();
+ assertThat(repairStatsClusterWide.keyspace).isEqualTo(tableReference.getKeyspace());
+ assertThat(repairStatsClusterWide.table).isEqualTo(tableReference.getTable());
+ assertThat(repairStatsClusterWide.repairedRatio).isGreaterThan(0.0);
+ assertThat(repairStatsClusterWide.repairedRatio).isLessThan(1.0);
+ }
+
+ @Test
+ public void repairInfoForHalfOfRangesRepaired()
+ {
+ long maxRepairedTime = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(2);
+ TableReference tableReference = myTableReferenceFactory.forTable("test", "table1");
+ myRepairs.add(tableReference);
+ Set<TokenRange> expectedRepairedBefore = halfOfTokenRanges(tableReference);
+ injectRepairHistory(tableReference, maxRepairedTime, expectedRepairedBefore);
+ // Get repair stats from three hours ago (now - 3 hours) until now
+ long since = maxRepairedTime - TimeUnit.HOURS.toMillis(1);
+
+ RepairStats repairStatsLocal = myRepairStatsProvider.getRepairStats(tableReference, since,
+ System.currentTimeMillis(), true);
+ assertThat(repairStatsLocal).isNotNull();
+ assertThat(repairStatsLocal.keyspace).isEqualTo(tableReference.getKeyspace());
+ assertThat(repairStatsLocal.table).isEqualTo(tableReference.getTable());
+ assertThat(repairStatsLocal.repairedRatio).isGreaterThan(0.0);
+ assertThat(repairStatsLocal.repairedRatio).isLessThan(1.0);
+
+ RepairStats repairStatsClusterWide = myRepairStatsProvider.getRepairStats(tableReference, since,
+ System.currentTimeMillis(), false);
+ assertThat(repairStatsClusterWide).isNotNull();
+ assertThat(repairStatsClusterWide.keyspace).isEqualTo(tableReference.getKeyspace());
+ assertThat(repairStatsClusterWide.table).isEqualTo(tableReference.getTable());
+ assertThat(repairStatsClusterWide.repairedRatio).isGreaterThan(0.0);
+ assertThat(repairStatsClusterWide.repairedRatio).isLessThan(repairStatsLocal.repairedRatio);
+ }
+
+ private void injectRepairHistory(TableReference tableReference, long timestampMax)
+ {
+ injectRepairHistory(tableReference, timestampMax, false);
+ }
+
+ private void injectRepairHistory(TableReference tableReference, long timestampMax, boolean splitRanges)
+ {
+ injectRepairHistory(tableReference, timestampMax,
+ mySession.getMetadata().getTokenMap().get().getTokenRanges(tableReference.getKeyspace(), myLocalNode), splitRanges);
+ }
+
+ private void injectRepairHistory(TableReference tableReference, long timestampMax, Set<TokenRange> tokenRanges)
+ {
+ injectRepairHistory(tableReference, timestampMax, tokenRanges, false);
+ }
+
+ private void injectRepairHistory(TableReference tableReference, long timestampMax, Set<TokenRange> tokenRanges,
+ boolean splitRanges)
+ {
+ long timestamp = timestampMax - 1;
+
+ for (TokenRange tokenRange : tokenRanges)
+ {
+ Set<InetAddress> participants = mySession.getMetadata().getTokenMap().get()
+ .getReplicas(tableReference.getKeyspace(), tokenRange).stream()
+ .map(node -> ((InetSocketAddress) node.getEndPoint().resolve()).getAddress())
+ .collect(Collectors.toSet());
+
+ if (splitRanges)
+ {
+ LongTokenRange longTokenRange = convertTokenRange(tokenRange);
+ BigInteger tokensPerRange = longTokenRange.rangeSize().divide(BigInteger.TEN);
+ List<LongTokenRange> subRanges = new TokenSubRangeUtil(longTokenRange)
+ .generateSubRanges(tokensPerRange);
+
+ for (LongTokenRange subRange : subRanges)
+ {
+ String start = Long.toString(subRange.start);
+ String end = Long.toString(subRange.end);
+ // Use a unique timestamp for each row, otherwise a later insert would overwrite the previous one
+ injectRepairHistory(tableReference, timestamp, participants, start, end);
+ timestamp--;
+ }
+ }
+ else
+ {
+ String start = Long.toString(((Murmur3Token) tokenRange.getStart()).getValue());
+ String end = Long.toString(((Murmur3Token) tokenRange.getEnd()).getValue());
+ injectRepairHistory(tableReference, timestamp, participants, start, end);
+ timestamp--;
+ }
+ }
+ }
+
+ private void injectRepairHistory(TableReference tableReference, long timestamp, Set<InetAddress> participants,
+ String range_begin, String range_end)
+ {
+ long started_at = timestamp;
+ long finished_at = timestamp + 5;
+
+ SimpleStatement statement;
+
+ if (myRepairHistoryType == RepairHistoryType.CASSANDRA)
+ {
+ statement = QueryBuilder.insertInto("system_distributed", "repair_history")
+ .value("keyspace_name", literal(tableReference.getKeyspace()))
+ .value("columnfamily_name", literal(tableReference.getTable()))
+ .value("participants", literal(participants))
+ .value("coordinator", literal(myLocalDriverNode.getPublicAddress()))
+ .value("id", literal(Uuids.startOf(started_at)))
+ .value("started_at", literal(Instant.ofEpochMilli(started_at)))
+ .value("finished_at", literal(Instant.ofEpochMilli(finished_at)))
+ .value("range_begin", literal(range_begin))
+ .value("range_end", literal(range_end))
+ .value("status", literal("SUCCESS")).build();
+ }
+ else
+ {
+ Set<UUID> nodes = participants.stream()
+ .map(myNodeResolver::fromIp)
+ .filter(Optional::isPresent)
+ .map(Optional::get)
+ .map(DriverNode::getId)
+ .collect(Collectors.toSet());
+
+ statement = QueryBuilder.insertInto("ecchronos", "repair_history")
+ .value("table_id", literal(tableReference.getId()))
+ .value("node_id", literal(myLocalDriverNode.getId()))
+ .value("repair_id", literal(Uuids.startOf(started_at)))
+ .value("job_id", literal(tableReference.getId()))
+ .value("coordinator_id", literal(myLocalDriverNode.getId()))
+ .value("range_begin", literal(range_begin))
+ .value("range_end", literal(range_end))
+ .value("participants", literal(nodes))
+ .value("status", literal("SUCCESS"))
+ .value("started_at", literal(Instant.ofEpochMilli(started_at)))
+ .value("finished_at", literal(Instant.ofEpochMilli(finished_at))).build();
+ }
+ myAdminSession.execute(statement.setConsistencyLevel(ConsistencyLevel.ALL));
+ }
+
+ private Set<TokenRange> halfOfTokenRanges(TableReference tableReference)
+ {
+ Set<TokenRange> halfOfRanges = new HashSet<>();
+ Set<TokenRange> allTokenRanges = mySession.getMetadata().getTokenMap().get()
+ .getTokenRanges(tableReference.getKeyspace(), myLocalNode);
+ Iterator<TokenRange> iterator = allTokenRanges.iterator();
+ for (int i = 0; i < allTokenRanges.size() / 2 && iterator.hasNext(); i++)
+ {
+ halfOfRanges.add(iterator.next());
+ }
+
+ return halfOfRanges;
+ }
+
+ private LongTokenRange convertTokenRange(TokenRange range)
+ {
+ // Assuming murmur3 partitioner
+ long start = ((Murmur3Token) range.getStart()).getValue();
+ long end = ((Murmur3Token) range.getEnd()).getValue();
+ return new LongTokenRange(start, end);
+ }*/
+}
diff --git a/standalone-integration/src/test/java/com/ericsson/bss/cassandra/ecchronos/standalone/ITSchedules.java b/standalone-integration/src/test/java/com/ericsson/bss/cassandra/ecchronos/standalone/ITSchedules.java
new file mode 100644
index 000000000..b2c549bba
--- /dev/null
+++ b/standalone-integration/src/test/java/com/ericsson/bss/cassandra/ecchronos/standalone/ITSchedules.java
@@ -0,0 +1,671 @@
+/*
+ * Copyright 2018 Telefonaktiebolaget LM Ericsson
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ericsson.bss.cassandra.ecchronos.standalone;
+
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
+import com.datastax.oss.driver.api.core.cql.SimpleStatement;
+import com.datastax.oss.driver.api.core.metadata.Metadata;
+import com.datastax.oss.driver.api.core.metadata.Node;
+import com.datastax.oss.driver.api.core.metadata.token.TokenRange;
+import com.datastax.oss.driver.api.core.uuid.Uuids;
+import com.datastax.oss.driver.api.querybuilder.QueryBuilder;
+import com.datastax.oss.driver.internal.core.metadata.token.Murmur3Token;
+import com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures;
+import com.ericsson.bss.cassandra.ecchronos.fm.RepairFaultReporter;
+import com.google.common.collect.Sets;
+import net.jcip.annotations.NotThreadSafe;
+import org.assertj.core.util.Lists;
+import org.junit.After;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.literal;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyMap;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.longThat;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(Parameterized.class)
+@NotThreadSafe
+public class ITSchedules extends TestBase
+{
+ /* enum RepairHistoryType
+ {
+ CASSANDRA, ECC
+ }
+
+ @Parameterized.Parameters
+ public static Collection<Object[]> parameters()
+ {
+ return Arrays.asList(new Object[][] {
+ { RepairHistoryType.CASSANDRA, true },
+ { RepairHistoryType.CASSANDRA, false },
+ { RepairHistoryType.ECC, true },
+ { RepairHistoryType.ECC, false }
+ });
+ }
+
+ @Parameterized.Parameter
+ public RepairHistoryType myRepairHistoryType;
+
+ @Parameterized.Parameter(1)
+ public Boolean myRemoteRoutingOption;
+
+ private static RepairFaultReporter mockFaultReporter;
+
+ private static TableRepairMetrics mockTableRepairMetrics;
+
+ private static TableStorageStates mockTableStorageStates;
+
+ private static Metadata myMetadata;
+
+ private static CqlSession myAdminSession;
+
+ private static Node myLocalHost;
+
+ private static HostStatesImpl myHostStates;
+
+ private static DriverNode myLocalNode;
+
+ private static RepairHistoryProvider myRepairHistoryProvider;
+
+ private static NodeResolver myNodeResolver;
+
+ private static RepairSchedulerImpl myRepairSchedulerImpl;
+
+ private static ScheduleManagerImpl myScheduleManagerImpl;
+
+ private static CASLockFactory myLockFactory;
+
+ private static RepairConfiguration myRepairConfiguration;
+
+ private static TableReferenceFactory myTableReferenceFactory;
+
+ private Set<TableReference> myRepairs = new HashSet<>();
+
+ @Parameterized.BeforeParam
+ public static void init(RepairHistoryType repairHistoryType, Boolean remoteRoutingOption) throws IOException
+ {
+ myRemoteRouting = remoteRoutingOption;
+ initialize();
+
+ mockFaultReporter = mock(RepairFaultReporter.class);
+ mockTableRepairMetrics = mock(TableRepairMetrics.class);
+ mockTableStorageStates = mock(TableStorageStates.class);
+
+ myAdminSession = getAdminNativeConnectionProvider().getSession();
+
+ myLocalHost = getNativeConnectionProvider().getLocalNode();
+ CqlSession session = getNativeConnectionProvider().getSession();
+ myMetadata = session.getMetadata();
+
+ myTableReferenceFactory = new TableReferenceFactoryImpl(session);
+
+ myHostStates = HostStatesImpl.builder()
+ .withRefreshIntervalInMs(1000)
+ .withJmxProxyFactory(getJmxProxyFactory())
+ .build();
+
+ myNodeResolver = new NodeResolverImpl(session);
+ myLocalNode = myNodeResolver.fromUUID(myLocalHost.getHostId()).orElseThrow(IllegalStateException::new);
+
+ ReplicationState replicationState = new ReplicationStateImpl(myNodeResolver, session, myLocalHost);
+
+ EccRepairHistory eccRepairHistory = EccRepairHistory.newBuilder()
+ .withReplicationState(replicationState)
+ .withLookbackTime(30, TimeUnit.DAYS)
+ .withLocalNode(myLocalNode)
+ .withSession(session)
+ .withStatementDecorator(s -> s)
+ .build();
+
+ if (repairHistoryType == RepairHistoryType.ECC)
+ {
+ myRepairHistoryProvider = eccRepairHistory;
+ }
+ else if (repairHistoryType == RepairHistoryType.CASSANDRA)
+ {
+ myRepairHistoryProvider = new RepairHistoryProviderImpl(myNodeResolver, session, s -> s,
+ TimeUnit.DAYS.toMillis(30));
+ }
+ else
+ {
+ throw new IllegalArgumentException("Unknown repair history type for test");
+ }
+
+ myLockFactory = CASLockFactory.builder()
+ .withNativeConnectionProvider(getNativeConnectionProvider())
+ .withHostStates(myHostStates)
+ .withStatementDecorator(s -> s)
+ .withConsistencySerial(ConsistencyType.DEFAULT)
+ .build();
+
+ myScheduleManagerImpl = ScheduleManagerImpl.builder()
+ .withLockFactory(myLockFactory)
+ .withRunInterval(1, TimeUnit.SECONDS)
+ .build();
+
+ RepairStateFactoryImpl repairStateFactory = RepairStateFactoryImpl.builder()
+ .withReplicationState(replicationState)
+ .withHostStates(HostStatesImpl.builder()
+ .withJmxProxyFactory(getJmxProxyFactory())
+ .build())
+ .withRepairHistoryProvider(myRepairHistoryProvider)
+ .withTableRepairMetrics(mockTableRepairMetrics)
+ .build();
+
+ myRepairSchedulerImpl = RepairSchedulerImpl.builder()
+ .withJmxProxyFactory(getJmxProxyFactory())
+ .withTableRepairMetrics(mockTableRepairMetrics)
+ .withFaultReporter(mockFaultReporter)
+ .withScheduleManager(myScheduleManagerImpl)
+ .withRepairStateFactory(repairStateFactory)
+ .withRepairLockType(RepairLockType.VNODE)
+ .withTableStorageStates(mockTableStorageStates)
+ .withRepairHistory(eccRepairHistory)
+ .build();
+
+ myRepairConfiguration = RepairConfiguration.newBuilder()
+ .withRepairInterval(60, TimeUnit.MINUTES)
+ .withTargetRepairSizeInBytes(UnitConverter.toBytes("100m")) // 100MiB
+ .build();
+ }
+
+ @After
+ public void clean()
+ {
+ List<CompletionStage<AsyncResultSet>> stages = new ArrayList<>();
+
+ for (TableReference tableReference : myRepairs)
+ {
+ myRepairSchedulerImpl.removeConfiguration(tableReference);
+
+ stages.add(myAdminSession.executeAsync(QueryBuilder.deleteFrom("system_distributed", "repair_history")
+ .whereColumn("keyspace_name").isEqualTo(literal(tableReference.getKeyspace()))
+ .whereColumn("columnfamily_name").isEqualTo(literal(tableReference.getTable()))
+ .build()));
+ for (Node node : myMetadata.getNodes().values())
+ {
+ stages.add(myAdminSession.executeAsync(QueryBuilder.deleteFrom("ecchronos", "repair_history")
+ .whereColumn("table_id").isEqualTo(literal(tableReference.getId()))
+ .whereColumn("node_id").isEqualTo(literal(node.getHostId()))
+ .build()));
+ }
+ }
+
+ for (CompletionStage<AsyncResultSet> stage : stages)
+ {
+ CompletableFutures.getUninterruptibly(stage);
+ }
+ myRepairs.clear();
+ reset(mockTableRepairMetrics);
+ reset(mockFaultReporter);
+ reset(mockTableStorageStates);
+ }
+
+ @Parameterized.AfterParam
+ public static void closeConnections()
+ {
+ myHostStates.close();
+ myRepairSchedulerImpl.close();
+ myScheduleManagerImpl.close();
+ myLockFactory.close();
+ }
+
+ *//**
+ * Create a table that is replicated and was repaired two hours ago.
+ *
+ * The repair factory should detect the new table automatically and schedule it to run.
+ *//*
+ @Test
+ public void repairSingleTable()
+ {
+ long startTime = System.currentTimeMillis();
+
+ TableReference tableReference = myTableReferenceFactory.forTable("test", "table1");
+
+ injectRepairHistory(tableReference, System.currentTimeMillis() - TimeUnit.HOURS.toMillis(2));
+
+ schedule(tableReference);
+
+ await().pollInterval(1, TimeUnit.SECONDS).atMost(90, TimeUnit.SECONDS)
+ .until(() -> isRepairedSince(tableReference, startTime));
+
+ verifyTableRepairedSince(tableReference, startTime);
+ verifyRepairSessionMetrics(tableReference, tokenRangesFor(tableReference.getKeyspace()).size());
+ verify(mockFaultReporter, never())
+ .raise(any(RepairFaultReporter.FaultCode.class), anyMap());
+ }
+
+ *//**
+ * Create a table that is replicated and was repaired two hours ago using sub ranges.
+ *
+ * The repair factory should detect the new table automatically and schedule it to run. If the sub-ranges are not
+ * detected, the repair will be postponed for one hour based on the repair configuration.
+ *//*
+ @Test
+ public void repairSingleTableRepairedInSubRanges()
+ {
+ long startTime = System.currentTimeMillis();
+
+ TableReference tableReference = myTableReferenceFactory.forTable("test", "table1");
+
+ injectRepairHistory(tableReference, System.currentTimeMillis() - TimeUnit.HOURS.toMillis(2), true);
+
+ schedule(tableReference);
+
+ await().pollInterval(1, TimeUnit.SECONDS).atMost(90, TimeUnit.SECONDS)
+ .until(() -> isRepairedSince(tableReference, startTime));
+
+ verifyTableRepairedSince(tableReference, startTime);
+ verifyRepairSessionMetrics(tableReference, tokenRangesFor(tableReference.getKeyspace()).size());
+ verify(mockFaultReporter, never())
+ .raise(any(RepairFaultReporter.FaultCode.class), anyMap());
+ }
+
+ *//**
+ * Create a table that is replicated and was repaired two hours ago. It also has a simulated size of 10 GiB and a
+ * target repair size of 100 MiB, which should result in around 102 repair sessions.
+ *//*
+ @Test
+ public void repairSingleTableInSubRanges()
+ {
+ long startTime = System.currentTimeMillis();
+
+ TableReference tableReference = myTableReferenceFactory.forTable("test", "table1");
+
+ when(mockTableStorageStates.getDataSize(eq(tableReference))).thenReturn(UnitConverter.toBytes("10g"));
+ injectRepairHistory(tableReference, System.currentTimeMillis() - TimeUnit.HOURS.toMillis(2));
+
+ schedule(tableReference);
+
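+ // Worked out: 10 GiB / 100 MiB = 10240 MiB / 100 MiB = 102 sub-ranges (BigInteger division truncates 102.4)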
+ BigInteger numberOfRanges = BigInteger.valueOf(UnitConverter.toBytes("10g"))
+ .divide(BigInteger.valueOf(UnitConverter.toBytes("100m"))); // 102
+
+ Set<LongTokenRange> expectedRanges = splitTokenRanges(tableReference, numberOfRanges);
+
+ await().pollInterval(1, TimeUnit.SECONDS).atMost(90, TimeUnit.SECONDS)
+ .until(() -> isRepairedSince(tableReference, startTime, expectedRanges));
+
+ verifyTableRepairedSinceWithSubRangeRepair(tableReference, startTime, expectedRanges);
+ verifyRepairSessionMetrics(tableReference, expectedRanges.size());
+ verify(mockFaultReporter, never())
+ .raise(any(RepairFaultReporter.FaultCode.class), anyMap());
+ }
+
+ *//**
+ * Create a table that is replicated and was repaired two hours ago.
+ *
+ * The repair factory should detect the new table automatically and schedule it to run.
+ *//*
+ @Test
+ public void repairSingleTableInParallel()
+ {
+ long startTime = System.currentTimeMillis();
+
+ TableReference tableReference = myTableReferenceFactory.forTable("test", "table1");
+
+ injectRepairHistory(tableReference, System.currentTimeMillis() - TimeUnit.HOURS.toMillis(2));
+
+ RepairConfiguration repairConfiguration = RepairConfiguration.newBuilder()
+ .withRepairInterval(60, TimeUnit.MINUTES)
+ .withRepairType(RepairOptions.RepairType.PARALLEL_VNODE)
+ .build();
+ schedule(tableReference, repairConfiguration);
+
+ await().pollInterval(1, TimeUnit.SECONDS).atMost(90, TimeUnit.SECONDS)
+ .until(() -> isRepairedSince(tableReference, startTime));
+
+ verifyTableRepairedSince(tableReference, startTime);
+ verifyRepairSessionMetrics(tableReference, 3); // Amount of repair groups
+ verify(mockFaultReporter, never())
+ .raise(any(RepairFaultReporter.FaultCode.class), anyMap());
+ }
+
+ *//**
+ * Create two tables that are replicated and were repaired two and four hours ago.
+ *
+ * The repair factory should detect the new tables automatically and schedule them to run.
+ *//*
+ @Test
+ public void repairMultipleTables()
+ {
+ long startTime = System.currentTimeMillis();
+
+ TableReference tableReference = myTableReferenceFactory.forTable("test", "table1");
+ TableReference tableReference2 = myTableReferenceFactory.forTable("test", "table2");
+
+ injectRepairHistory(tableReference, System.currentTimeMillis() - TimeUnit.HOURS.toMillis(2));
+ injectRepairHistory(tableReference2, System.currentTimeMillis() - TimeUnit.HOURS.toMillis(4));
+
+ schedule(tableReference);
+ schedule(tableReference2);
+
+ await().pollInterval(1, TimeUnit.SECONDS).atMost(90, TimeUnit.SECONDS)
+ .until(() -> isRepairedSince(tableReference, startTime));
+ await().pollInterval(1, TimeUnit.SECONDS).atMost(90, TimeUnit.SECONDS)
+ .until(() -> isRepairedSince(tableReference2, startTime));
+
+ verifyTableRepairedSince(tableReference, startTime);
+ verifyRepairSessionMetrics(tableReference, tokenRangesFor(tableReference.getKeyspace()).size());
+ verifyTableRepairedSince(tableReference2, startTime);
+ verifyRepairSessionMetrics(tableReference2, tokenRangesFor(tableReference2.getKeyspace()).size());
+ verify(mockFaultReporter, never())
+ .raise(any(RepairFaultReporter.FaultCode.class), anyMap());
+ }
+
+ *//**
+ * Create a table that is replicated and was fully repaired two hours ago.
+ *
+ * It was also partially repaired by another node.
+ *
+ * The repair factory should detect the table automatically and schedule it to run on the ranges that were not
+ * repaired.
+ *//*
+ @Test
+ public void partialTableRepair()
+ {
+ long startTime = System.currentTimeMillis();
+ long expectedRepairedInterval = startTime - TimeUnit.HOURS.toMillis(1);
+
+ TableReference tableReference = myTableReferenceFactory.forTable("test", "table1");
+
+ injectRepairHistory(tableReference, System.currentTimeMillis() - TimeUnit.HOURS.toMillis(2));
+
+ Set<TokenRange> expectedRepairedBefore = halfOfTokenRanges(tableReference);
+ injectRepairHistory(tableReference, System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(30),
+ expectedRepairedBefore);
+
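+ // Only the ranges that were not part of the recent partial repair should be repaired now;
+ // the half repaired 30 minutes ago is still within the one hour repair interval.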
+ Set<TokenRange> allTokenRanges = myMetadata.getTokenMap().get()
+ .getTokenRanges(tableReference.getKeyspace(), myLocalHost);
+ Set<LongTokenRange> expectedRepairedRanges = Sets.difference(convertTokenRanges(allTokenRanges),
+ convertTokenRanges(expectedRepairedBefore));
+
+ schedule(tableReference);
+
+ await().pollInterval(1, TimeUnit.SECONDS).atMost(90, TimeUnit.SECONDS)
+ .until(() -> isRepairedSince(tableReference, startTime, expectedRepairedRanges));
+
+ verifyTableRepairedSince(tableReference, expectedRepairedInterval);
+ verifyRepairSessionMetrics(tableReference, expectedRepairedRanges.size());
+ verify(mockFaultReporter, never())
+ .raise(any(RepairFaultReporter.FaultCode.class), anyMap());
+ }
+
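+ // schedule() registers the table with the repair scheduler and tracks it in myRepairs so that
+ // clean() can remove the configuration and purge its injected history afterwards.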
+ private void schedule(TableReference tableReference)
+ {
+ schedule(tableReference, myRepairConfiguration);
+ }
+
+ private void schedule(TableReference tableReference, RepairConfiguration repairConfiguration)
+ {
+ if (myRepairs.add(tableReference))
+ {
+ myRepairSchedulerImpl.putConfigurations(tableReference, Collections.singleton(repairConfiguration));
+ }
+ }
+
+ private void verifyRepairSessionMetrics(TableReference tableReference, int times)
+ {
+ verify(mockTableRepairMetrics, atLeast(times))
+ .repairSession(eq(tableReference), anyLong(), any(TimeUnit.class), eq(true));
+ }
+
+ private void verifyTableRepairedSinceWithSubRangeRepair(TableReference tableReference, long repairedSince,
+ Set<LongTokenRange> expectedRepaired)
+ {
+ OptionalLong repairedAt = lastRepairedSince(tableReference, repairedSince, expectedRepaired);
+ assertThat(repairedAt.isPresent()).isTrue();
+
+ verify(mockTableRepairMetrics, timeout(5000)).lastRepairedAt(eq(tableReference), longThat(l -> l >= repairedAt.getAsLong()));
+ }
+
+ private void verifyTableRepairedSince(TableReference tableReference, long repairedSince)
+ {
+ OptionalLong repairedAt = lastRepairedSince(tableReference, repairedSince);
+ assertThat(repairedAt.isPresent()).isTrue();
+
+ verify(mockTableRepairMetrics, timeout(5000)).lastRepairedAt(eq(tableReference), longThat(l -> l >= repairedAt.getAsLong()));
+ }
+
+ private boolean isRepairedSince(TableReference tableReference, long repairedSince)
+ {
+ return lastRepairedSince(tableReference, repairedSince).isPresent();
+ }
+
+ private boolean isRepairedSince(TableReference tableReference, long repairedSince,
+ Set<LongTokenRange> expectedRepaired)
+ {
+ return lastRepairedSince(tableReference, repairedSince, expectedRepaired).isPresent();
+ }
+
+ private OptionalLong lastRepairedSince(TableReference tableReference, long repairedSince)
+ {
+ return lastRepairedSince(tableReference, repairedSince, tokenRangesFor(tableReference.getKeyspace()));
+ }
+
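+ // Walks the repair history between repairedSince and now, keeping only entries that are fully repaired
+ // (three participants with status SUCCESS) for ranges that are still expected. The table counts as
+ // repaired once every expected range has been seen, and the earliest started_at among the matched
+ // entries is returned as the effective repair time.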
+ private OptionalLong lastRepairedSince(TableReference tableReference, long repairedSince,
+ Set<LongTokenRange> expectedRepaired)
+ {
+ Set<LongTokenRange> expectedRepairedCopy = new HashSet<>(expectedRepaired);
+ Iterator<RepairEntry> repairEntryIterator = myRepairHistoryProvider.iterate(tableReference,
+ System.currentTimeMillis(), repairedSince,
+ repairEntry -> fullyRepaired(repairEntry) && expectedRepairedCopy.remove(repairEntry.getRange()));
+
+ List<RepairEntry> repairEntries = Lists.newArrayList(repairEntryIterator);
+
+ Set<LongTokenRange> actuallyRepaired = repairEntries.stream()
+ .map(RepairEntry::getRange)
+ .collect(Collectors.toSet());
+
+ if (expectedRepaired.equals(actuallyRepaired))
+ {
+ return repairEntries.stream().mapToLong(RepairEntry::getStartedAt).min();
+ }
+ else
+ {
+ return OptionalLong.empty();
+ }
+ }
+
+ private Set<LongTokenRange> tokenRangesFor(String keyspace)
+ {
+ return myMetadata.getTokenMap().get().getTokenRanges(keyspace, myLocalHost).stream()
+ .map(this::convertTokenRange)
+ .collect(Collectors.toSet());
+ }
+
+ private boolean fullyRepaired(RepairEntry repairEntry)
+ {
+ return repairEntry.getParticipants().size() == 3 && repairEntry.getStatus() == RepairStatus.SUCCESS;
+ }
+
+ private void injectRepairHistory(TableReference tableReference, long timestampMax)
+ {
+ injectRepairHistory(tableReference, timestampMax, false);
+ }
+
+ private void injectRepairHistory(TableReference tableReference, long timestampMax, boolean splitRanges)
+ {
+ injectRepairHistory(tableReference, timestampMax,
+ myMetadata.getTokenMap().get().getTokenRanges(tableReference.getKeyspace(), myLocalHost), splitRanges);
+ }
+
+ private void injectRepairHistory(TableReference tableReference, long timestampMax, Set<TokenRange> tokenRanges)
+ {
+ injectRepairHistory(tableReference, timestampMax, tokenRanges, false);
+ }
+
+ private void injectRepairHistory(TableReference tableReference, long timestampMax, Set<TokenRange> tokenRanges,
+ boolean splitRanges)
+ {
+ long timestamp = timestampMax - 1;
+
+ for (TokenRange tokenRange : tokenRanges)
+ {
+ Set<InetAddress> participants = myMetadata.getTokenMap().get()
+ .getReplicas(tableReference.getKeyspace(), tokenRange).stream()
+ .map(node -> ((InetSocketAddress) node.getEndPoint().resolve()).getAddress())
+ .collect(Collectors.toSet());
+
+ if (splitRanges)
+ {
+ LongTokenRange longTokenRange = convertTokenRange(tokenRange);
+ BigInteger tokensPerRange = longTokenRange.rangeSize().divide(BigInteger.TEN);
+ List<LongTokenRange> subRanges = new TokenSubRangeUtil(longTokenRange)
+ .generateSubRanges(tokensPerRange);
+
+ for (LongTokenRange subRange : subRanges)
+ {
+ String start = Long.toString(subRange.start);
+ String end = Long.toString(subRange.end);
+ injectRepairHistory(tableReference, timestampMax, participants, start, end);
+ }
+ }
+ else
+ {
+ String start = Long.toString(((Murmur3Token) tokenRange.getStart()).getValue());
+ String end = Long.toString(((Murmur3Token) tokenRange.getEnd()).getValue());
+ injectRepairHistory(tableReference, timestamp, participants, start, end);
+ }
+
+ timestamp--;
+ }
+ }
+
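+ // Inserts one fake repair entry, either into Cassandra's own system_distributed.repair_history table or
+ // into ecchronos.repair_history, depending on which history provider the current parameterization uses.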
+ private void injectRepairHistory(TableReference tableReference, long timestamp, Set<InetAddress> participants,
+ String range_begin, String range_end)
+ {
+ long started_at = timestamp;
+ long finished_at = timestamp + 5;
+
+ SimpleStatement statement;
+
+ if (myRepairHistoryType == RepairHistoryType.CASSANDRA)
+ {
+ statement = QueryBuilder.insertInto("system_distributed", "repair_history")
+ .value("keyspace_name", literal(tableReference.getKeyspace()))
+ .value("columnfamily_name", literal(tableReference.getTable()))
+ .value("participants", literal(participants))
+ .value("coordinator", literal(myLocalNode.getPublicAddress()))
+ .value("id", literal(Uuids.startOf(started_at)))
+ .value("started_at", literal(Instant.ofEpochMilli(started_at)))
+ .value("finished_at", literal(Instant.ofEpochMilli(finished_at)))
+ .value("range_begin", literal(range_begin))
+ .value("range_end", literal(range_end))
+ .value("status", literal("SUCCESS")).build();
+ }
+ else
+ {
+ Set<UUID> nodes = participants.stream()
+ .map(myNodeResolver::fromIp)
+ .filter(Optional::isPresent)
+ .map(Optional::get)
+ .map(DriverNode::getId)
+ .collect(Collectors.toSet());
+
+ statement = QueryBuilder.insertInto("ecchronos", "repair_history")
+ .value("table_id", literal(tableReference.getId()))
+ .value("node_id", literal(myLocalNode.getId()))
+ .value("repair_id", literal(Uuids.startOf(finished_at)))
+ .value("job_id", literal(tableReference.getId()))
+ .value("coordinator_id", literal(myLocalNode.getId()))
+ .value("range_begin", literal(range_begin))
+ .value("range_end", literal(range_end))
+ .value("participants", literal(nodes))
+ .value("status", literal("SUCCESS"))
+ .value("started_at", literal(Instant.ofEpochMilli(started_at)))
+ .value("finished_at", literal(Instant.ofEpochMilli(finished_at))).build();
+ }
+
+ myAdminSession.execute(statement);
+ }
+
+ private Set<TokenRange> halfOfTokenRanges(TableReference tableReference)
+ {
+ Set<TokenRange> halfOfRanges = new HashSet<>();
+ Set<TokenRange> allTokenRanges = myMetadata.getTokenMap().get()
+ .getTokenRanges(tableReference.getKeyspace(), myLocalHost);
+ Iterator<TokenRange> iterator = allTokenRanges.iterator();
+ for (int i = 0; i < allTokenRanges.size() / 2 && iterator.hasNext(); i++)
+ {
+ halfOfRanges.add(iterator.next());
+ }
+
+ return halfOfRanges;
+ }
+
+ private Set<LongTokenRange> convertTokenRanges(Set<TokenRange> tokenRanges)
+ {
+ return tokenRanges.stream().map(this::convertTokenRange).collect(Collectors.toSet());
+ }
+
+ private Set<LongTokenRange> splitTokenRanges(TableReference tableReference, BigInteger numberOfRanges)
+ {
+ Set<LongTokenRange> allRanges = convertTokenRanges(
+ myMetadata.getTokenMap().get().getTokenRanges(tableReference.getKeyspace(), myLocalHost));
+
+ BigInteger totalRangeSize = allRanges.stream()
+ .map(LongTokenRange::rangeSize)
+ .reduce(BigInteger.ZERO, BigInteger::add);
+
+ BigInteger tokensPerSubRange = totalRangeSize.divide(numberOfRanges);
+
+ return allRanges.stream()
+ .flatMap(range -> new TokenSubRangeUtil(range).generateSubRanges(tokensPerSubRange).stream())
+ .collect(Collectors.toSet());
+ }
+
+ private LongTokenRange convertTokenRange(TokenRange range)
+ {
+ // Assuming murmur3 partitioner
+ long start = ((Murmur3Token) range.getStart()).getValue();
+ long end = ((Murmur3Token) range.getEnd()).getValue();
+ return new LongTokenRange(start, end);
+ }*/
+}
diff --git a/standalone-integration/src/test/java/com/ericsson/bss/cassandra/ecchronos/standalone/TestBase.java b/standalone-integration/src/test/java/com/ericsson/bss/cassandra/ecchronos/standalone/TestBase.java
new file mode 100644
index 000000000..d997cff62
--- /dev/null
+++ b/standalone-integration/src/test/java/com/ericsson/bss/cassandra/ecchronos/standalone/TestBase.java
@@ -0,0 +1,469 @@
+/*
+ * Copyright 2018 Telefonaktiebolaget LM Ericsson
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ericsson.bss.cassandra.ecchronos.standalone;
+
+import cassandracluster.AbstractCassandraCluster;
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.cql.Row;
+import com.datastax.oss.driver.api.core.cql.SimpleStatement;
+import com.datastax.oss.driver.api.core.metadata.Node;
+import com.datastax.oss.driver.api.querybuilder.QueryBuilder;
+import com.ericsson.bss.cassandra.ecchronos.connection.DistributedJmxConnectionProvider;
+import com.ericsson.bss.cassandra.ecchronos.connection.DistributedNativeConnectionProvider;
+import com.ericsson.bss.cassandra.ecchronos.core.impl.jmx.DistributedJmxProxyFactoryImpl;
+import com.ericsson.bss.cassandra.ecchronos.core.table.TableReference;
+import com.ericsson.bss.cassandra.ecchronos.data.sync.EccNodesSync;
+import com.ericsson.bss.cassandra.ecchronos.utils.enums.connection.ConnectionType;
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import net.jcip.annotations.NotThreadSafe;
+import org.junit.AfterClass;
+
+import javax.management.AttributeNotFoundException;
+import javax.management.InstanceNotFoundException;
+import javax.management.MBeanException;
+import javax.management.MBeanServerConnection;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import javax.management.ReflectionException;
+import javax.management.remote.JMXConnector;
+import javax.management.remote.JMXConnectorFactory;
+import javax.management.remote.JMXServiceURL;
+
+import java.io.IOException;
+import java.util.UUID;
+import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.CassandraContainer;
+import org.testcontainers.containers.DockerComposeContainer;
+
+import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.literal;
+import static org.mockito.Mockito.mock;
+
+@NotThreadSafe
+public abstract class TestBase extends AbstractCassandraCluster
+{
+ private static final Logger LOG = LoggerFactory.getLogger(TestBase.class);
+ private static final String JMX_FORMAT_URL = "service:jmx:rmi:///jndi/rmi://%s:%d/jmxrmi";
+ protected static final String ECCHRONOS_KEYSPACE = "ecchronos";
+ protected static final String TEST_TABLE_ONE_NAME = "test_table1";
+ protected static final String TEST_TABLE_TWO_NAME = "test_table2";
+ private static final String ON_DEMAND_REPAIR_STATUS_TABLE = "on_demand_repair_status";
+ private static final int DEFAULT_INSERT_DATA_COUNT = 1000;
+
+ private static final List<CassandraContainer<?>> nodes = new ArrayList<>();
+ private static DistributedNativeConnectionProvider myNativeConnectionProvider;
+ private static DistributedJmxConnectionProvider myJmxConnectionProvider;
+ private static DistributedJmxProxyFactoryImpl myJmxProxyFactory;
+
+ @BeforeClass
+ public static void setUpCluster() throws IOException, InterruptedException
+ {
+ mySession.execute(String.format(
+ "CREATE KEYSPACE IF NOT EXISTS %s WITH replication = {'class': 'NetworkTopologyStrategy', 'datacenter1': 2}",
+ ECCHRONOS_KEYSPACE));
+ String query = String.format(
+ "CREATE TABLE IF NOT EXISTS %s.nodes_sync(" +
+ "ecchronos_id TEXT, " +
+ "datacenter_name TEXT, " +
+ "node_id UUID, " +
+ "node_endpoint TEXT, " +
+ "node_status TEXT, " +
+ "last_connection TIMESTAMP, " +
+ "next_connection TIMESTAMP, " +
+ "PRIMARY KEY(ecchronos_id, datacenter_name, node_id)) " +
+ "WITH CLUSTERING ORDER BY( datacenter_name DESC, node_id DESC);",
+ ECCHRONOS_KEYSPACE);
+
+ mySession.execute(query);
+
+ mySession.execute(String.format(
+ "CREATE TABLE IF NOT EXISTS %s.lock (resource text, node uuid, metadata map, PRIMARY KEY(resource)) WITH default_time_to_live = 600 AND gc_grace_seconds = 0",
+ ECCHRONOS_KEYSPACE));
+ mySession.execute(String.format(
+ "CREATE TABLE IF NOT EXISTS %s.lock_priority (resource text, node uuid, priority int, PRIMARY KEY(resource, node)) WITH default_time_to_live = 600 AND gc_grace_seconds = 0",
+ ECCHRONOS_KEYSPACE));
+ mySession.execute(
+ String.format("CREATE TYPE IF NOT EXISTS %s.token_range (start text, end text)", ECCHRONOS_KEYSPACE));
+ mySession.execute(String.format(
+ "CREATE TYPE IF NOT EXISTS %s.table_reference (id uuid, keyspace_name text, table_name text)",
+ ECCHRONOS_KEYSPACE));
+ mySession.execute(String.format(
+ "CREATE TABLE IF NOT EXISTS %s.%s (host_id uuid, job_id uuid, table_reference frozen, token_map_hash int, repaired_tokens frozen>>, status text, completed_time timestamp, repair_type text, PRIMARY KEY(host_id, job_id)) WITH default_time_to_live = 2592000 AND gc_grace_seconds = 0",
+ ECCHRONOS_KEYSPACE, ON_DEMAND_REPAIR_STATUS_TABLE));
+ mySession.execute(String.format(
+ "CREATE TABLE IF NOT EXISTS %s.repair_history(table_id UUID, node_id UUID, repair_id timeuuid, job_id UUID, coordinator_id UUID, range_begin text, range_end text, participants set, status text, started_at timestamp, finished_at timestamp, PRIMARY KEY((table_id,node_id), repair_id)) WITH CLUSTERING ORDER BY (repair_id DESC);",
+ ECCHRONOS_KEYSPACE));
+ mySession.execute(String.format("CREATE TABLE IF NOT EXISTS %s.%s (col1 UUID, col2 int, PRIMARY KEY(col1))", ECCHRONOS_KEYSPACE,
+ TEST_TABLE_ONE_NAME));
+ mySession.execute(
+ String.format("CREATE TABLE IF NOT EXISTS %s.%s (col1 UUID, col2 int, PRIMARY KEY(col1))", ECCHRONOS_KEYSPACE,
+ TEST_TABLE_TWO_NAME));
+ Map<UUID, Node> nodesList = mySession.getMetadata().getNodes();
+
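+ // Minimal DistributedNativeConnectionProvider backed by the test cluster's session; node
+ // add/remove bookkeeping simply mutates the map captured from the driver metadata above.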
+ myNativeConnectionProvider = new DistributedNativeConnectionProvider()
+ {
+ @Override
+ public CqlSession getCqlSession()
+ {
+ return mySession;
+ }
+
+ @Override
+ public Map<UUID, Node> getNodes()
+ {
+ return nodesList;
+ }
+
+ @Override
+ public void addNode(Node myNode)
+ {
+ nodesList.put(myNode.getHostId(), myNode);
+ }
+
+ @Override
+ public void removeNode(Node myNode)
+ {
+ nodesList.remove(myNode.getHostId());
+ }
+
+ @Override
+ public Boolean confirmNodeValid(Node node)
+ {
+ return false;
+ }
+
+ @Override
+ public ConnectionType getConnectionType()
+ {
+ return ConnectionType.datacenterAware;
+ }
+ };
+
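+ // JMX connection provider that lazily creates one authenticated connector per node and caches it,
+ // reconnecting whenever a cached connector turns out to be stale.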
+ myJmxConnectionProvider = new DistributedJmxConnectionProvider()
+ {
+ private static final int DEFAULT_PORT = 7199;
+ private final ConcurrentHashMap<UUID, JMXConnector> connections = new ConcurrentHashMap<>();
+
+ @Override
+ public ConcurrentHashMap<UUID, JMXConnector> getJmxConnections()
+ {
+ return connections;
+ }
+
+ @Override
+ public JMXConnector getJmxConnector(UUID nodeID)
+ {
+ JMXConnector connector = connections.get(nodeID);
+ if (connector == null)
+ {
+ String host = nodesList.get(nodeID).getBroadcastRpcAddress().get().getHostString();
+ connector = createJMXConnectorWithAuth(host, DEFAULT_PORT);
+ connections.put(nodeID, connector);
+ }
+ try
+ {
+ if (connector.getConnectionId() != null)
+ {
+ return connector;
+ }
+ }
+ catch (IOException e)
+ {
+ String host = nodesList.get(nodeID).getBroadcastRpcAddress().get().getHostString();
+ connector = createJMXConnectorWithAuth(host, DEFAULT_PORT);
+ connections.put(nodeID, connector);
+ }
+ return connector;
+ }
+
+ @Override
+ public boolean isConnected(JMXConnector jmxConnector)
+ {
+ try
+ {
+ return jmxConnector != null && jmxConnector.getConnectionId() != null;
+ }
+ catch (IOException e)
+ {
+ return false;
+ }
+ }
+
+ @Override
+ public void close(UUID nodeID) throws IOException
+ {
+ JMXConnector connector = connections.remove(nodeID);
+ if (connector != null)
+ {
+ connector.close();
+ }
+ }
+
+ @Override
+ public void add(Node node) throws IOException
+ {
+ String host = node.getBroadcastRpcAddress().get().getHostString();
+ UUID nodeID = node.getHostId();
+ Integer port = getJMXPort(node);
+ if (!connections.containsKey(nodeID))
+ {
+ JMXConnector connector = createJMXConnectorWithAuth(host, port);
+ connections.put(nodeID, connector);
+ }
+ }
+
+ // Instance method: static methods are not permitted in anonymous classes before Java 16
+ private JMXConnector createJMXConnectorWithAuth(final String host, final Integer port)
+ {
+ try
+ {
+ JMXServiceURL jmxUrl = new JMXServiceURL(String.format(JMX_FORMAT_URL, host, port));
+ String username = "cassandra";
+ String password = "cassandra";
+ // Create an environment map to hold connection properties, including credentials
+ Map<String, Object> env = new HashMap<>();
+ env.put("jmx.remote.x.request.waiting.timeout", 6000000L); // 6,000,000 ms = 100 minutes
+ env.put("jmx.remote.x.notification.fetch.timeout", 6000000L);
+ // Set the credentials in the environment map
+ String[] credentials = new String[] { username, password };
+ env.put(JMXConnector.CREDENTIALS, credentials);
+
+ // Connect to the JMX server using the service URL and environment map with authentication
+ return JMXConnectorFactory.connect(jmxUrl, env);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
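+ // Resolve the node's JMX port from system_views.system_properties: prefer the remote port,
+ // fall back to the local port, and finally to the default 7199.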
+ private Integer getJMXPort(final Node node)
+ {
+ SimpleStatement simpleStatement = SimpleStatement
+ .builder("SELECT value FROM system_views.system_properties WHERE name = 'cassandra.jmx.remote.port';")
+ .setNode(node)
+ .build();
+
+ Row row = mySession.execute(simpleStatement).one();
+ if ((row == null) || (row.getString("value") == null))
+ {
+ simpleStatement = SimpleStatement
+ .builder("SELECT value FROM system_views.system_properties WHERE name = 'cassandra.jmx.local.port';")
+ .setNode(node)
+ .build();
+ row = mySession.execute(simpleStatement).one();
+ }
+ if ((row != null) && (row.getString("value") != null))
+ {
+ return Integer.parseInt(Objects.requireNonNull(row.getString("value")));
+ }
+ else
+ {
+ return DEFAULT_PORT;
+ }
+ }
+ };
+
+ myJmxProxyFactory = DistributedJmxProxyFactoryImpl.builder()
+ .withJmxConnectionProvider(myJmxConnectionProvider)
+ .withEccNodesSync(new EccNodesSync.Builder()
+ .withConnectionDelayValue(10L)
+ .withConnectionDelayUnit(TimeUnit.SECONDS)
+ .withNativeConnection(getNativeConnectionProvider())
+ .withSession(mySession)
+ .withEcchronosID("Id")
+ .build())
+ .withNodesMap(nodesList)
+ .build();
+ }
+
+ @AfterClass
+ public static void tearDownCluster()
+ {
+ // JUnit gives no ordering guarantee between multiple @AfterClass methods, so close the
+ // connection providers and tear down the cluster from a single method, in that order.
+ try
+ {
+ myJmxConnectionProvider.close();
+ myNativeConnectionProvider.close();
+ }
+ catch (IOException e)
+ {
+ LOG.warn("Failed to close connection providers", e);
+ }
+ AbstractCassandraCluster.tearDownCluster();
+ }
+
+ protected static DistributedNativeConnectionProvider getNativeConnectionProvider()
+ {
+ return myNativeConnectionProvider;
+ }
+
+ protected static DistributedJmxConnectionProvider getJmxConnectionProvider()
+ {
+ return myJmxConnectionProvider;
+ }
+
+ protected static DistributedJmxProxyFactoryImpl getJmxProxyFactory()
+ {
+ return myJmxProxyFactory;
+ }
+
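+ // Writes DEFAULT_INSERT_DATA_COUNT rows into the given table and flushes its memtables over JMX so
+ // that SSTables exist on disk for repair to work against.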
+ protected void insertSomeDataAndFlush(TableReference tableReference, CqlSession session, Node node)
+ throws ReflectionException,
+ MalformedObjectNameException,
+ InstanceNotFoundException,
+ MBeanException,
+ IOException
+ {
+ for (int i = 0; i < DEFAULT_INSERT_DATA_COUNT; i++)
+ {
+ UUID randomUUID = UUID.randomUUID();
+ SimpleStatement statement = QueryBuilder.insertInto(tableReference.getKeyspace(), tableReference.getTable())
+ .value("col1", literal(randomUUID))
+ .value("col2", literal(randomUUID.hashCode()))
+ // .value("value", literal(randomUUID.hashCode()))
+ .build();
+ session.execute(statement);
+ }
+ forceFlush(tableReference, node);
+ }
+
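+ // StorageServiceMBean.forceKeyspaceFlush(String keyspace, String... tables) flushes the
+ // table's memtables into SSTables on the target node.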
+ private void forceFlush(TableReference tableReference, Node node)
+ throws IOException,
+ MalformedObjectNameException,
+ ReflectionException,
+ InstanceNotFoundException,
+ MBeanException
+ {
+ try (JMXConnector jmxConnector = getJmxConnectionProvider().getJmxConnector(node.getHostId()))
+ {
+ String[] table = new String[] { tableReference.getTable() };
+ jmxConnector.getMBeanServerConnection()
+ .invoke(new ObjectName("org.apache.cassandra.db:type=StorageService"),
+ "forceKeyspaceFlush",
+ new Object[] {
+ tableReference.getKeyspace(), table
+ },
+ new String[] {
+ String.class.getName(), String[].class.getName()
+ });
+ }
+ }
+
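+ // Debugging helper (not called by the tests): submits a full repair of the table through the
+ // StorageService MBean's repairAsync(String keyspace, Map<String, String> options).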
+ private void runRepair(TableReference tableReference, Node node)
+ throws IOException,
+ MalformedObjectNameException
+ {
+ try (JMXConnector jmxConnector = getJmxConnectionProvider().getJmxConnector(node.getHostId()))
+ {
+ // Trigger the repair
+ // repairAsync is exposed by the StorageService MBean
+ ObjectName storageServiceObjectName = new ObjectName("org.apache.cassandra.db:type=StorageService");
+ final Map<String, String> options = new HashMap<>();
+ options.put("keyspace", tableReference.getKeyspace());
+ options.put("table", tableReference.getTable());
+ options.put("fullRepair", "true");
+ try
+ {
+ // repairAsync returns a command id for the submitted repair session
+ int repairCommand = (int) jmxConnector
+ .getMBeanServerConnection()
+ .invoke(storageServiceObjectName,
+ "repairAsync",
+ new Object[] {
+ tableReference.getKeyspace(), options
+ },
+ new String[] {
+ String.class.getName(), Map.class.getName()
+ });
+ LOG.info("Submitted repair command {} for {}", repairCommand, tableReference);
+ }
+ catch (InstanceNotFoundException | MBeanException | ReflectionException | IOException e)
+ {
+ LOG.error("Unable to repair node {}", node.getHostId(), e);
+ }
+ }
+ }
+
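+ // Debugging helper: reads the mean WriteLatency for ecchronos.test_table from the Cassandra table metrics MBean.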
+ private void getReadMetrics(TableReference tableReference, Node node)
+ throws IOException,
+ MalformedObjectNameException,
+ ReflectionException,
+ InstanceNotFoundException,
+ MBeanException
+ {
+ try (JMXConnector jmxConnector = getJmxConnectionProvider().getJmxConnector(node.getHostId()))
+ {
+ MBeanServerConnection mbeanServerConnection = jmxConnector.getMBeanServerConnection();
+
+ // Access the StorageProxy MBean for write metrics
+ ObjectName writeLatencyObjectName =
+ new ObjectName("org.apache.cassandra.metrics:type=Table,keyspace=ecchronos,scope=test_table,name=WriteLatency");
+ Object writeLatency = mbeanServerConnection.getAttribute(writeLatencyObjectName, "Mean");
+
+ System.out.println("WriteLatency for ecchronos.test_table: " + writeLatency);
+ }
+ catch (AttributeNotFoundException e)
+ {
+ throw new RuntimeException("Mean attribute not found on the WriteLatency MBean", e);
+ }
+ }
+
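+ // Debugging helper: shells out to "nodetool repairstats" and echoes the status of ongoing repairs.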
+ private void execute()
+ {
+ try
+ {
+ // Build the command to execute nodetool repairstats
+ ProcessBuilder processBuilder = new ProcessBuilder("nodetool", "repairstats");
+
+ // Start the process
+ Process process = processBuilder.start();
+
+ // Get the output of the command
+ BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
+ String line;
+ while ((line = reader.readLine()) != null)
+ {
+ System.out.println(line); // Print the status of ongoing repairs
+ }
+
+ // Wait for the command to complete
+ int exitCode = process.waitFor();
+ if (exitCode == 0)
+ {
+ System.out.println("Repair status fetched successfully.");
+ }
+ else
+ {
+ System.out.println("Failed to fetch repair status with exit code " + exitCode);
+ }
+
+ }
+ catch (IOException e)
+ {
+ LOG.error("Unable to run nodetool repairstats", e);
+ }
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ LOG.error("Interrupted while waiting for nodetool repairstats", e);
+ }
+ }
+}
diff --git a/standalone-integration/src/test/resources/logback-test.xml b/standalone-integration/src/test/resources/logback-test.xml
new file mode 100644
index 000000000..d6dcf34cc
--- /dev/null
+++ b/standalone-integration/src/test/resources/logback-test.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Copyright 2024 Telefonaktiebolaget LM Ericsson
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<configuration>
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%-4relative [%thread] %-5level %logger{35} - %msg %n</pattern>
+ </encoder>
+ </appender>
+
+ <!-- Root log level is not recoverable from the stripped source; INFO assumed -->
+ <root level="INFO">
+ <appender-ref ref="STDOUT" />
+ </root>
+</configuration>