From 34e58f773e246246316b2314333e71141331beca Mon Sep 17 00:00:00 2001 From: sajid riaz Date: Thu, 5 Dec 2024 18:41:37 +0100 Subject: [PATCH] Added stand alonde integration tests - Integration tests for on demand repairs --- .github/workflows/actions.yml | 2 +- cassandra-test-image/pom.xml | 11 +- .../DistributedJmxConnectionProviderImpl.java | 25 +- pom.xml | 2 + standalone-integration/pom.xml | 170 +++++++++ .../ITIncrementalOnDemandRepairJob.java | 267 ++++++++++++++ .../standalone/ITIncrementalSchedules.java | 244 +++++++++++++ .../standalone/ITOnDemandRepairJob.java | 330 ++++++++++++++++++ .../ecchronos/standalone/TestBase.java | 278 +++++++++++++++ .../src/test/resources/logback-test.xml | 32 ++ .../src/test/resources/security.yml | 49 +++ 11 files changed, 1405 insertions(+), 5 deletions(-) create mode 100644 standalone-integration/pom.xml create mode 100644 standalone-integration/src/test/java/com/ericsson/bss/cassandra/ecchronos/standalone/ITIncrementalOnDemandRepairJob.java create mode 100644 standalone-integration/src/test/java/com/ericsson/bss/cassandra/ecchronos/standalone/ITIncrementalSchedules.java create mode 100644 standalone-integration/src/test/java/com/ericsson/bss/cassandra/ecchronos/standalone/ITOnDemandRepairJob.java create mode 100644 standalone-integration/src/test/java/com/ericsson/bss/cassandra/ecchronos/standalone/TestBase.java create mode 100644 standalone-integration/src/test/resources/logback-test.xml create mode 100644 standalone-integration/src/test/resources/security.yml diff --git a/.github/workflows/actions.yml b/.github/workflows/actions.yml index 6216db73c..3c82a4841 100644 --- a/.github/workflows/actions.yml +++ b/.github/workflows/actions.yml @@ -42,7 +42,7 @@ jobs: java-version: 17 distribution: 'temurin' - name: install dependencies - run: mvn install -DskipTests=true + run: mvn install -Pbuild-cassandra-test-jar -DskipTests=true - run: mvn $TEST_SUITE -B id: tests env: diff --git a/cassandra-test-image/pom.xml b/cassandra-test-image/pom.xml index 2d13cd1a4..8b3897e05 100644 --- a/cassandra-test-image/pom.xml +++ b/cassandra-test-image/pom.xml @@ -87,14 +87,14 @@ maven-deploy-plugin - true + false maven-install-plugin - true + false @@ -103,6 +103,12 @@ build-cassandra-test-jar + + + build-cassandra-test-jar + true + + @@ -110,6 +116,7 @@ maven-jar-plugin + test-jar test-jar diff --git a/connection.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/connection/impl/providers/DistributedJmxConnectionProviderImpl.java b/connection.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/connection/impl/providers/DistributedJmxConnectionProviderImpl.java index 325513765..26f45e111 100644 --- a/connection.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/connection/impl/providers/DistributedJmxConnectionProviderImpl.java +++ b/connection.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/connection/impl/providers/DistributedJmxConnectionProviderImpl.java @@ -115,7 +115,25 @@ public ConcurrentHashMap getJmxConnections() @Override public JMXConnector getJmxConnector(final UUID nodeID) { - return myJMXConnections.get(nodeID); + JMXConnector connector = myJMXConnections.get(nodeID); + if (isConnected(nodeID)) + { + return connector; + } + LOG.info("Connection expired or disconnected with node Id {}", nodeID); + Node node = myNativeConnectionProvider.getNodes().get(nodeID); + try + { + LOG.info("Attempting to create JMX connection with node {}", node.getHostId()); + myDistributedJmxBuilder.reconnect(node); + LOG.info("Connection created successfully with node {}", node.getHostId()); + connector = myJMXConnections.get(nodeID); + } + catch (EcChronosException e) + { + LOG.error("Failed to connect to node {}: {}", node.getHostId(), e.getMessage()); + } + return connector; } /** @@ -144,7 +162,10 @@ public void close() throws IOException @Override public void close(final UUID nodeID) throws IOException { - myJMXConnections.get(nodeID).close(); + if (myJMXConnections.get(nodeID) != null) + { + myJMXConnections.get(nodeID).close(); + } } /** diff --git a/pom.xml b/pom.xml index 78dfd3acf..d6bbb9e78 100644 --- a/pom.xml +++ b/pom.xml @@ -108,6 +108,7 @@ fault.manager fault.manager.impl rest + standalone-integration @@ -463,6 +464,7 @@ pmd-rules.xml + true diff --git a/standalone-integration/pom.xml b/standalone-integration/pom.xml new file mode 100644 index 000000000..8bbc9bdb2 --- /dev/null +++ b/standalone-integration/pom.xml @@ -0,0 +1,170 @@ + + + + 4.0.0 + + com.ericsson.bss.cassandra.ecchronos + agent + 1.0.0-SNAPSHOT + + standalone-integration + Integration tests for a standalone environment + EcChronos Standalone Integration + + + + + com.ericsson.bss.cassandra.ecchronos + connection + ${project.version} + test + + + + com.ericsson.bss.cassandra.ecchronos + application + ${project.version} + test + + + + com.ericsson.bss.cassandra.ecchronos + connection + ${project.version} + test + + + + com.ericsson.bss.cassandra.ecchronos + connection.impl + ${project.version} + test + + + com.ericsson.bss.cassandra.ecchronos + cassandra-test-image + ${project.version} + tests + test + + + + com.ericsson.bss.cassandra.ecchronos + data + ${project.version} + test + + + + com.ericsson.bss.cassandra.ecchronos + core + ${project.version} + test + + + + com.ericsson.bss.cassandra.ecchronos + core.impl + ${project.version} + test + + + + + org.junit.vintage + junit-vintage-engine + test + + + + org.mockito + mockito-core + test + + + + net.jcip + jcip-annotations + test + + + + org.assertj + assertj-core + test + + + + org.awaitility + awaitility + test + + + + org.testcontainers + cassandra + test + + + + + org.slf4j + slf4j-api + test + + + + ch.qos.logback + logback-classic + test + + + + ch.qos.logback + logback-core + test + + + + + + maven-compiler-plugin + + ${java.version} + ${java.version} + ${project.build.directory}/generated-sources/ + + + + + maven-deploy-plugin + + true + + + + + maven-install-plugin + + true + + + + + diff --git a/standalone-integration/src/test/java/com/ericsson/bss/cassandra/ecchronos/standalone/ITIncrementalOnDemandRepairJob.java b/standalone-integration/src/test/java/com/ericsson/bss/cassandra/ecchronos/standalone/ITIncrementalOnDemandRepairJob.java new file mode 100644 index 000000000..9673d2571 --- /dev/null +++ b/standalone-integration/src/test/java/com/ericsson/bss/cassandra/ecchronos/standalone/ITIncrementalOnDemandRepairJob.java @@ -0,0 +1,267 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ericsson.bss.cassandra.ecchronos.standalone; + +import com.datastax.oss.driver.api.core.metadata.Metadata; +import com.datastax.oss.driver.api.core.metadata.Node; +import com.datastax.oss.driver.api.querybuilder.QueryBuilder; +import com.ericsson.bss.cassandra.ecchronos.core.impl.locks.CASLockFactory; +import com.ericsson.bss.cassandra.ecchronos.core.impl.locks.RepairLockType; +import com.ericsson.bss.cassandra.ecchronos.core.impl.metadata.NodeResolverImpl; +import com.ericsson.bss.cassandra.ecchronos.core.impl.metrics.CassandraMetrics; +import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.OnDemandStatus; +import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.scheduler.OnDemandRepairSchedulerImpl; +import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.scheduler.ScheduleManagerImpl; +import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.state.HostStatesImpl; +import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.state.ReplicationStateImpl; +import com.ericsson.bss.cassandra.ecchronos.core.impl.utils.ConsistencyType; +import com.ericsson.bss.cassandra.ecchronos.core.repair.config.RepairConfiguration; +import com.ericsson.bss.cassandra.ecchronos.core.repair.scheduler.OnDemandRepairJobView; +import com.ericsson.bss.cassandra.ecchronos.core.table.TableReference; +import com.ericsson.bss.cassandra.ecchronos.core.table.TableReferenceFactory; +import com.ericsson.bss.cassandra.ecchronos.core.table.TableRepairMetrics; +import com.ericsson.bss.cassandra.ecchronos.utils.enums.repair.RepairType; +import com.ericsson.bss.cassandra.ecchronos.utils.exceptions.EcChronosException; +import java.util.ArrayList; +import net.jcip.annotations.NotThreadSafe; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.time.Duration; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import org.mockito.Mock; + +import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.literal; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +@NotThreadSafe +public class ITIncrementalOnDemandRepairJob extends TestBase +{ + private static final int DEFAULT_JOB_TIMEOUT_IN_SECONDS = 90; + + @Mock + private static TableRepairMetrics mockTableRepairMetrics; + private static Metadata myMetadata; + private static HostStatesImpl myHostStates; + private static OnDemandRepairSchedulerImpl myOnDemandRepairSchedulerImpl; + private static ScheduleManagerImpl myScheduleManagerImpl; + private static CASLockFactory myLockFactory; + private static TableReferenceFactory myTableReferenceFactory; + private static CassandraMetrics myCassandraMetrics; + private Set myRepairs = new HashSet<>(); + + @Before + public void init() + { + mockTableRepairMetrics = mock(TableRepairMetrics.class); + myMetadata = mySession.getMetadata(); + + myHostStates = HostStatesImpl.builder() + .withRefreshIntervalInMs(1000) + .withJmxProxyFactory(getJmxProxyFactory()) + .build(); + + myLockFactory = CASLockFactory.builder() + .withNativeConnectionProvider(getNativeConnectionProvider()) + .withHostStates(myHostStates) + .withConsistencySerial(ConsistencyType.DEFAULT) + .build(); + + Set nodeIds = getNativeConnectionProvider().getNodes().keySet(); + List nodeIdList = new ArrayList<>(nodeIds); + + myScheduleManagerImpl = ScheduleManagerImpl.builder() + .withLockFactory(myLockFactory) + .withNodeIDList(nodeIdList) + .withRunInterval(1, TimeUnit.SECONDS) + .build(); + + myCassandraMetrics = new CassandraMetrics(getJmxProxyFactory(), + Duration.ofSeconds(5), Duration.ofMinutes(30)); + + myOnDemandRepairSchedulerImpl = OnDemandRepairSchedulerImpl.builder() + .withJmxProxyFactory(getJmxProxyFactory()) + .withTableRepairMetrics(mockTableRepairMetrics) + .withScheduleManager(myScheduleManagerImpl) + .withRepairLockType(RepairLockType.VNODE) + .withReplicationState(new ReplicationStateImpl(new NodeResolverImpl(mySession), mySession)) + .withSession(mySession) + .withRepairConfiguration(RepairConfiguration.DEFAULT) + .withOnDemandStatus(new OnDemandStatus(getNativeConnectionProvider())) + .build(); + } + + @After + public void clean() + { + for (TableReference tableReference : myRepairs) + { + mySession.execute(QueryBuilder.deleteFrom("system_distributed", "repair_history") + .whereColumn("keyspace_name") + .isEqualTo(literal(tableReference.getKeyspace())) + .whereColumn("columnfamily_name") + .isEqualTo(literal(tableReference.getTable())) + .build()); + for (Node node : myMetadata.getNodes().values()) + { + mySession.execute(QueryBuilder.deleteFrom("ecchronos", "on_demand_repair_status") + .whereColumn("host_id") + .isEqualTo(literal(node.getHostId())) + .build()); + } + } + myRepairs.clear(); + reset(mockTableRepairMetrics); + closeConnections(); + } + + public static void closeConnections() + { + if (myHostStates != null) + { + myHostStates.close(); + } + if (myOnDemandRepairSchedulerImpl != null) + { + myOnDemandRepairSchedulerImpl.close(); + } + if (myScheduleManagerImpl != null) + { + myScheduleManagerImpl.close(); + } + if (myLockFactory != null) + { + myLockFactory.close(); + } + } + + @Test + public void repairSingleTable() throws Exception + { + TableReference tableReference = myTableReferenceFactory.forTable(TEST_KEYSPACE, TEST_TABLE_ONE_NAME); + Node node = getNativeConnectionProvider().getNodes().entrySet().iterator().next().getValue(); + getJmxConnectionProvider().add(node); + insertSomeDataAndFlush(tableReference, mySession, node); + long startTime = System.currentTimeMillis(); + await().pollInterval(1, TimeUnit.SECONDS) + .atMost(15, TimeUnit.SECONDS) + .until(() -> + { + double percentRepaired = myCassandraMetrics.getPercentRepaired(node.getHostId(), tableReference); + long maxRepairedAt = myCassandraMetrics.getMaxRepairedAt(node.getHostId(), tableReference); + return maxRepairedAt < startTime && percentRepaired < 100.0d; + }); + + UUID jobId = triggerRepair(tableReference, node); + + await().pollInterval(1, TimeUnit.SECONDS) + .atMost(DEFAULT_JOB_TIMEOUT_IN_SECONDS, TimeUnit.SECONDS) + .until(() -> myOnDemandRepairSchedulerImpl.getActiveRepairJobs().isEmpty()); + await().pollInterval(1, TimeUnit.SECONDS) + .atMost(DEFAULT_JOB_TIMEOUT_IN_SECONDS, TimeUnit.SECONDS) + .until(() -> myScheduleManagerImpl.getQueueSize(node.getHostId()) == 0); + + await().pollInterval(1, TimeUnit.SECONDS) + .atMost(15, TimeUnit.SECONDS) + .until(() -> + { + double percentRepaired = myCassandraMetrics.getPercentRepaired(node.getHostId(), tableReference); + long maxRepairedAt = myCassandraMetrics.getMaxRepairedAt(node.getHostId(), tableReference); + return maxRepairedAt >= startTime && percentRepaired >= 100.0d; + }); + + verifyOnDemandCompleted(jobId, startTime, node.getHostId()); + } + + @Test + public void repairMultipleTables() throws Exception + { + long startTime = System.currentTimeMillis(); + Node node = getNativeConnectionProvider().getNodes().entrySet().iterator().next().getValue(); + getJmxConnectionProvider().add(node); + TableReference tableReference1 = myTableReferenceFactory.forTable(TEST_KEYSPACE, TEST_TABLE_ONE_NAME); + TableReference tableReference2 = myTableReferenceFactory.forTable(TEST_KEYSPACE, TEST_TABLE_TWO_NAME); + insertSomeDataAndFlush(tableReference1, mySession, node); + insertSomeDataAndFlush(tableReference2, mySession, node); + UUID jobId1 = triggerRepair(tableReference1, node); + UUID jobId2 = triggerRepair(tableReference2, node); + + await().pollInterval(1, TimeUnit.SECONDS) + .atMost(DEFAULT_JOB_TIMEOUT_IN_SECONDS * 2, TimeUnit.SECONDS) + .until(() -> myOnDemandRepairSchedulerImpl.getActiveRepairJobs().isEmpty()); + await().pollInterval(1, TimeUnit.SECONDS) + .atMost(DEFAULT_JOB_TIMEOUT_IN_SECONDS * 2, TimeUnit.SECONDS) + .until(() -> myScheduleManagerImpl.getQueueSize(node.getHostId()) == 0); + verifyOnDemandCompleted(jobId1, startTime, node.getHostId()); + verify(mockTableRepairMetrics).repairSession(eq(tableReference1), any(long.class), any(TimeUnit.class), + eq(true)); + verifyOnDemandCompleted(jobId2, startTime, node.getHostId()); + verify(mockTableRepairMetrics).repairSession(eq(tableReference2), any(long.class), any(TimeUnit.class), + eq(true)); + } + + @Test + public void repairSameTableTwice() throws Exception + { + long startTime = System.currentTimeMillis(); + Node node = getNativeConnectionProvider().getNodes().entrySet().iterator().next().getValue(); + getJmxConnectionProvider().add(node); + TableReference tableReference = myTableReferenceFactory.forTable(TEST_KEYSPACE, TEST_TABLE_ONE_NAME); + insertSomeDataAndFlush(tableReference, mySession, node); + UUID jobId1 = triggerRepair(tableReference, node); + UUID jobId2 = triggerRepair(tableReference, node); + + await().pollInterval(1, TimeUnit.SECONDS) + .atMost(DEFAULT_JOB_TIMEOUT_IN_SECONDS * 2, TimeUnit.SECONDS) + .until(() -> myOnDemandRepairSchedulerImpl.getActiveRepairJobs().isEmpty()); + await().pollInterval(1, TimeUnit.SECONDS) + .atMost(DEFAULT_JOB_TIMEOUT_IN_SECONDS * 2, TimeUnit.SECONDS) + .until(() -> myScheduleManagerImpl.getQueueSize(node.getHostId()) == 0); + verifyOnDemandCompleted(jobId1, startTime, node.getHostId()); + verifyOnDemandCompleted(jobId2, startTime, node.getHostId()); + verify(mockTableRepairMetrics, times(2)).repairSession(eq(tableReference), + any(long.class), any(TimeUnit.class), eq(true)); + } + + private void verifyOnDemandCompleted(UUID jobId, long startTime, UUID hostId) + { + List completedJobs = myOnDemandRepairSchedulerImpl.getAllRepairJobs(hostId) + .stream() + .filter(j -> j.getId().equals(jobId)) + .collect(Collectors.toList()); + assertThat(completedJobs).hasSize(1); + assertThat(completedJobs.get(0).getCompletionTime()).isGreaterThanOrEqualTo(startTime); + } + + private UUID triggerRepair(TableReference tableReference, Node node) throws EcChronosException + { + myRepairs.add(tableReference); + return myOnDemandRepairSchedulerImpl.scheduleJob(tableReference, RepairType.INCREMENTAL, node.getHostId()).getId(); + } +} diff --git a/standalone-integration/src/test/java/com/ericsson/bss/cassandra/ecchronos/standalone/ITIncrementalSchedules.java b/standalone-integration/src/test/java/com/ericsson/bss/cassandra/ecchronos/standalone/ITIncrementalSchedules.java new file mode 100644 index 000000000..4ed21e64c --- /dev/null +++ b/standalone-integration/src/test/java/com/ericsson/bss/cassandra/ecchronos/standalone/ITIncrementalSchedules.java @@ -0,0 +1,244 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ericsson.bss.cassandra.ecchronos.standalone; + +import com.datastax.oss.driver.api.core.cql.AsyncResultSet; +import com.datastax.oss.driver.api.core.metadata.Metadata; +import com.datastax.oss.driver.api.core.metadata.Node; +import com.datastax.oss.driver.api.querybuilder.QueryBuilder; +import com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures; +import com.ericsson.bss.cassandra.ecchronos.core.impl.locks.CASLockFactory; +import com.ericsson.bss.cassandra.ecchronos.core.impl.locks.RepairLockType; +import com.ericsson.bss.cassandra.ecchronos.core.impl.metadata.NodeResolverImpl; +import com.ericsson.bss.cassandra.ecchronos.core.impl.metrics.CassandraMetrics; +import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.scheduler.RepairSchedulerImpl; +import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.scheduler.ScheduleManagerImpl; +import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.state.HostStatesImpl; +import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.state.ReplicationStateImpl; +import com.ericsson.bss.cassandra.ecchronos.core.impl.table.TableReferenceFactoryImpl; +import com.ericsson.bss.cassandra.ecchronos.core.impl.utils.ConsistencyType; +import com.ericsson.bss.cassandra.ecchronos.core.repair.config.RepairConfiguration; +import com.ericsson.bss.cassandra.ecchronos.core.repair.scheduler.ScheduledRepairJobView; +import com.ericsson.bss.cassandra.ecchronos.core.table.TableReference; +import com.ericsson.bss.cassandra.ecchronos.core.table.TableReferenceFactory; +import com.ericsson.bss.cassandra.ecchronos.core.table.TableRepairMetrics; +import com.ericsson.bss.cassandra.ecchronos.fm.RepairFaultReporter; +import com.ericsson.bss.cassandra.ecchronos.utils.enums.repair.RepairType; +import java.util.UUID; +import net.jcip.annotations.NotThreadSafe; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.TimeUnit; + +import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.literal; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyMap; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.verify; + +@NotThreadSafe +public class ITIncrementalSchedules extends TestBase +{ + private static final Logger LOG = LoggerFactory.getLogger(ITIncrementalSchedules.class); + private static final int DEFAULT_SCHEDULE_TIMEOUT_IN_SECONDS = 90; + private static final int CASSANDRA_METRICS_UPDATE_IN_SECONDS = 5; + private static RepairFaultReporter mockFaultReporter; + private static TableRepairMetrics mockTableRepairMetrics; + private static HostStatesImpl myHostStates; + private static RepairSchedulerImpl myRepairSchedulerImpl; + private static ScheduleManagerImpl myScheduleManagerImpl; + private static CASLockFactory myLockFactory; + private static RepairConfiguration myRepairConfiguration; + private static TableReferenceFactory myTableReferenceFactory; + private static CassandraMetrics myCassandraMetrics; + protected static Metadata myMetadata; + + private Set myRepairs = new HashSet<>(); + + @Before + public void init() + { + mockFaultReporter = mock(RepairFaultReporter.class); + mockTableRepairMetrics = mock(TableRepairMetrics.class); + myMetadata = mySession.getMetadata(); + + myTableReferenceFactory = new TableReferenceFactoryImpl(mySession); + + myHostStates = HostStatesImpl.builder() + .withRefreshIntervalInMs(1000) + .withJmxProxyFactory(getJmxProxyFactory()) + .build(); + + myLockFactory = CASLockFactory.builder() + .withNativeConnectionProvider(getNativeConnectionProvider()) + .withHostStates(myHostStates) + .withConsistencySerial(ConsistencyType.DEFAULT) + .build(); + + Set nodeIds = getNativeConnectionProvider().getNodes().keySet(); + List nodeIdList = new ArrayList<>(nodeIds); + + myScheduleManagerImpl = ScheduleManagerImpl.builder() + .withLockFactory(myLockFactory) + .withNodeIDList(nodeIdList) + .withRunInterval(1, TimeUnit.SECONDS) + .build(); + + myCassandraMetrics = new CassandraMetrics(getJmxProxyFactory(), + Duration.ofSeconds(CASSANDRA_METRICS_UPDATE_IN_SECONDS), Duration.ofMinutes(30)); + + myRepairSchedulerImpl = RepairSchedulerImpl.builder() + .withJmxProxyFactory(getJmxProxyFactory()) + .withTableRepairMetrics(mockTableRepairMetrics) + .withFaultReporter(mockFaultReporter) + .withScheduleManager(myScheduleManagerImpl) + .withRepairLockType(RepairLockType.VNODE) + .withCassandraMetrics(myCassandraMetrics) + .withReplicationState(new ReplicationStateImpl(new NodeResolverImpl(mySession), mySession)) + .build(); + + myRepairConfiguration = RepairConfiguration.newBuilder() + .withRepairInterval(5, TimeUnit.SECONDS) + .withRepairType(RepairType.INCREMENTAL) + .build(); + } + + @After + public void clean() + { + List> stages = new ArrayList<>(); + for (TableReference tableReference : myRepairs) + { + Node node = getNativeConnectionProvider().getNodes().entrySet().iterator().next().getValue(); + myRepairSchedulerImpl.removeConfiguration(node, tableReference); + + stages.add(mySession.executeAsync(QueryBuilder.deleteFrom("system_distributed", "repair_history") + .whereColumn("keyspace_name") + .isEqualTo(literal(tableReference.getKeyspace())) + .whereColumn("columnfamily_name") + .isEqualTo(literal(tableReference.getTable())) + .build())); + } + for (CompletionStage stage : stages) + { + CompletableFutures.getUninterruptibly(stage); + } + myRepairs.clear(); + reset(mockTableRepairMetrics); + reset(mockFaultReporter); + closeConnections(); + } + + public static void closeConnections() + { + if (myHostStates != null) + { + myHostStates.close(); + } + if (myRepairSchedulerImpl != null) + { + myRepairSchedulerImpl.close(); + } + if (myScheduleManagerImpl != null) + { + myScheduleManagerImpl.close(); + } + if (myLockFactory != null) + { + myLockFactory.close(); + } + } + + @Test + public void repairSingleTable() throws Exception + { + Node node = getNativeConnectionProvider().getNodes().entrySet().iterator().next().getValue(); + TableReference tableReference = myTableReferenceFactory.forTable(TEST_KEYSPACE, TEST_TABLE_ONE_NAME); + insertSomeDataAndFlush(tableReference, mySession, node); + long startTime = System.currentTimeMillis(); + + // Wait for metrics to be updated, wait at least 3 times the update time for metrics (worst case scenario) + await().pollInterval(1, TimeUnit.SECONDS) + .atMost(CASSANDRA_METRICS_UPDATE_IN_SECONDS * 3, TimeUnit.SECONDS) + .until(() -> + { + double percentRepaired = myCassandraMetrics.getPercentRepaired(node.getHostId(), tableReference); + long maxRepairedAt = myCassandraMetrics.getMaxRepairedAt(node.getHostId(), tableReference); + LOG.info("Waiting for metrics to be updated, percentRepaired: {} maxRepairedAt: {}", + percentRepaired, maxRepairedAt); + return maxRepairedAt < startTime && percentRepaired < 100.0d; + }); + + // Create a schedule + addSchedule(tableReference, node); + await().pollInterval(1, TimeUnit.SECONDS) + .atMost(DEFAULT_SCHEDULE_TIMEOUT_IN_SECONDS, TimeUnit.SECONDS) + .until(() -> getSchedule(tableReference).isPresent()); + + await().pollInterval(1, TimeUnit.SECONDS) + .atMost(DEFAULT_SCHEDULE_TIMEOUT_IN_SECONDS, TimeUnit.SECONDS) + .until(() -> + { + double percentRepaired = myCassandraMetrics.getPercentRepaired(node.getHostId(), tableReference); + long maxRepairedAt = myCassandraMetrics.getMaxRepairedAt(node.getHostId(), tableReference); + LOG.info("Waiting for schedule to run, percentRepaired: {} maxRepairedAt: {}", + percentRepaired, maxRepairedAt); + return maxRepairedAt >= startTime && percentRepaired >= 100.0d; + }); + verify(mockFaultReporter, never()) + .raise(any(RepairFaultReporter.FaultCode.class), anyMap()); + verify(mockTableRepairMetrics).repairSession(eq(tableReference), + any(long.class), any(TimeUnit.class), eq(true)); + Optional view = getSchedule(tableReference); + assertThat(view).isPresent(); + assertThat(view.get().getStatus()).isEqualTo(ScheduledRepairJobView.Status.COMPLETED); + assertThat(view.get().getCompletionTime()).isGreaterThanOrEqualTo(startTime); + assertThat(view.get().getProgress()).isGreaterThanOrEqualTo(1.0d); + } + + private void addSchedule(TableReference tableReference, Node node) + { + if (myRepairs.add(tableReference)) + { + myRepairSchedulerImpl.putConfigurations(node, tableReference, Collections.singleton(myRepairConfiguration)); + } + } + + private Optional getSchedule(TableReference tableReference) + { + return myRepairSchedulerImpl.getCurrentRepairJobs() + .stream() + .filter(s -> s.getRepairConfiguration().equals(myRepairConfiguration)) + .filter(s -> s.getTableReference().equals(tableReference)) + .findFirst(); + } +} diff --git a/standalone-integration/src/test/java/com/ericsson/bss/cassandra/ecchronos/standalone/ITOnDemandRepairJob.java b/standalone-integration/src/test/java/com/ericsson/bss/cassandra/ecchronos/standalone/ITOnDemandRepairJob.java new file mode 100644 index 000000000..3405d1896 --- /dev/null +++ b/standalone-integration/src/test/java/com/ericsson/bss/cassandra/ecchronos/standalone/ITOnDemandRepairJob.java @@ -0,0 +1,330 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ericsson.bss.cassandra.ecchronos.standalone; + +import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.literal; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import com.ericsson.bss.cassandra.ecchronos.core.impl.locks.CASLockFactory; +import com.ericsson.bss.cassandra.ecchronos.core.impl.locks.RepairLockType; +import com.ericsson.bss.cassandra.ecchronos.core.impl.metadata.NodeResolverImpl; +import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.OnDemandStatus; +import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.scheduler.OnDemandRepairSchedulerImpl; +import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.scheduler.ScheduleManagerImpl; +import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.state.HostStatesImpl; +import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.state.ReplicationStateImpl; +import com.ericsson.bss.cassandra.ecchronos.core.impl.table.TableReferenceFactoryImpl; +import com.ericsson.bss.cassandra.ecchronos.core.impl.utils.ConsistencyType; +import com.ericsson.bss.cassandra.ecchronos.core.metadata.NodeResolver; +import com.ericsson.bss.cassandra.ecchronos.core.repair.config.RepairConfiguration; +import com.ericsson.bss.cassandra.ecchronos.core.state.LongTokenRange; +import com.ericsson.bss.cassandra.ecchronos.core.state.RepairEntry; +import com.ericsson.bss.cassandra.ecchronos.core.state.ReplicationState; +import com.ericsson.bss.cassandra.ecchronos.core.table.TableReference; +import com.ericsson.bss.cassandra.ecchronos.core.table.TableReferenceFactory; +import com.ericsson.bss.cassandra.ecchronos.core.table.TableRepairMetrics; +import com.ericsson.bss.cassandra.ecchronos.data.repairhistory.RepairHistoryService; +import com.ericsson.bss.cassandra.ecchronos.utils.enums.repair.RepairStatus; +import com.ericsson.bss.cassandra.ecchronos.utils.enums.repair.RepairType; +import com.ericsson.bss.cassandra.ecchronos.utils.exceptions.EcChronosException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.OptionalLong; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import com.datastax.oss.driver.api.core.metadata.Metadata; +import com.datastax.oss.driver.api.core.metadata.Node; +import com.datastax.oss.driver.api.core.metadata.token.TokenRange; +import com.datastax.oss.driver.api.querybuilder.QueryBuilder; +import com.datastax.oss.driver.internal.core.metadata.token.Murmur3Token; +import org.assertj.core.util.Lists; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import net.jcip.annotations.NotThreadSafe; + +@NotThreadSafe +public class ITOnDemandRepairJob extends TestBase +{ + + private static TableRepairMetrics mockTableRepairMetrics; + + private static Metadata myMetadata; + + private static HostStatesImpl myHostStates; + + private static RepairHistoryService myEccRepairHistory; + + private static OnDemandRepairSchedulerImpl myRepairSchedulerImpl; + + private static ScheduleManagerImpl myScheduleManagerImpl; + + private static CASLockFactory myLockFactory; + + private Set myRepairs = new HashSet<>(); + + @Before + public void init() throws IOException + { + mockTableRepairMetrics = mock(TableRepairMetrics.class); + myMetadata = mySession.getMetadata(); + + Set nodeIds = getNativeConnectionProvider().getNodes().keySet(); + List nodeIdList = new ArrayList<>(nodeIds); + + myHostStates = HostStatesImpl.builder() + .withRefreshIntervalInMs(1000) + .withJmxProxyFactory(getJmxProxyFactory()) + .build(); + + NodeResolver nodeResolver = new NodeResolverImpl(mySession); + + ReplicationState replicationState = new ReplicationStateImpl(nodeResolver, mySession); + + myEccRepairHistory = new RepairHistoryService(mySession, replicationState, nodeResolver, 2_592_000_000L); + + myLockFactory = CASLockFactory.builder() + .withNativeConnectionProvider(getNativeConnectionProvider()) + .withHostStates(myHostStates) + .withConsistencySerial(ConsistencyType.DEFAULT) + .build(); + + myScheduleManagerImpl = ScheduleManagerImpl.builder() + .withLockFactory(myLockFactory) + .withNodeIDList(nodeIdList) + .withRunInterval(100, TimeUnit.MILLISECONDS) + .build(); + + myRepairSchedulerImpl = OnDemandRepairSchedulerImpl.builder() + .withJmxProxyFactory(getJmxProxyFactory()) + .withTableRepairMetrics(mockTableRepairMetrics) + .withScheduleManager(myScheduleManagerImpl) + .withRepairLockType(RepairLockType.VNODE) + .withReplicationState(replicationState) + .withSession(mySession) + .withRepairConfiguration(RepairConfiguration.DEFAULT) + .withRepairHistory(myEccRepairHistory) + .withOnDemandStatus(new OnDemandStatus(getNativeConnectionProvider())) + .build(); + } + + @After + public void clean() + { + for (TableReference tableReference : myRepairs) + { + mySession.execute(QueryBuilder.deleteFrom("system_distributed", "repair_history") + .whereColumn("keyspace_name") + .isEqualTo(literal(tableReference.getKeyspace())) + .whereColumn("columnfamily_name") + .isEqualTo(literal(tableReference.getTable())) + .build()); + for (Node node : myMetadata.getNodes().values()) + { + mySession.execute(QueryBuilder.deleteFrom("ecchronos", "repair_history") + .whereColumn("table_id") + .isEqualTo(literal(tableReference.getId())) + .whereColumn("node_id") + .isEqualTo(literal(node.getHostId())) + .build()); + mySession.execute(QueryBuilder.deleteFrom("ecchronos", "on_demand_repair_status") + .whereColumn("host_id") + .isEqualTo(literal(node.getHostId())) + .build()); + } + } + myRepairs.clear(); + reset(mockTableRepairMetrics); + closeConnections(); + } + + private static void closeConnections() + { + myHostStates.close(); + myRepairSchedulerImpl.close(); + myScheduleManagerImpl.close(); + myLockFactory.close(); + } + + /** + * Create a table that is replicated and schedule a repair on it + */ + @Test + public void repairSingleTable() throws EcChronosException + { + long startTime = System.currentTimeMillis(); + Node node = getNativeConnectionProvider().getNodes().entrySet().iterator().next().getValue(); + TableReference tableReference = myTableReferenceFactory.forTable(TEST_KEYSPACE, TEST_TABLE_ONE_NAME); + + schedule(tableReference, node.getHostId()); + + await().pollInterval(1, TimeUnit.SECONDS) + .atMost(90, TimeUnit.SECONDS) + .until(() -> myRepairSchedulerImpl.getActiveRepairJobs().isEmpty()); + await().pollInterval(1, TimeUnit.SECONDS) + .atMost(90, TimeUnit.SECONDS) + .until(() -> myScheduleManagerImpl.getQueueSize(node.getHostId()) == 0); + + verifyTableRepairedSince(tableReference, startTime, node); + } + + /** + * Create two tables that are replicated and repair both + **/ + + @Test + public void repairMultipleTables() throws EcChronosException + { + long startTime = System.currentTimeMillis(); + Node node = getNativeConnectionProvider().getNodes().entrySet().iterator().next().getValue(); + TableReference tableReference = myTableReferenceFactory.forTable(TEST_KEYSPACE, TEST_TABLE_ONE_NAME); + TableReference tableReference2 = myTableReferenceFactory.forTable(TEST_KEYSPACE, TEST_TABLE_TWO_NAME); + + schedule(tableReference, node.getHostId()); + schedule(tableReference2, node.getHostId()); + + await().pollInterval(1, TimeUnit.SECONDS) + .atMost(90, TimeUnit.SECONDS) + .until(() -> myRepairSchedulerImpl.getActiveRepairJobs().isEmpty()); + await().pollInterval(1, TimeUnit.SECONDS) + .atMost(90, TimeUnit.SECONDS) + .until(() -> myScheduleManagerImpl.getQueueSize(node.getHostId()) == 0); + + verifyTableRepairedSince(tableReference, startTime, node); + verifyTableRepairedSince(tableReference2, startTime, node); + } + + /** + * Schedule two jobs on the same table + **/ + + @Test + public void repairSameTableTwice() throws EcChronosException + { + long startTime = System.currentTimeMillis(); + Node node = getNativeConnectionProvider().getNodes().entrySet().iterator().next().getValue(); + TableReference tableReference = myTableReferenceFactory.forTable(TEST_KEYSPACE, TEST_TABLE_ONE_NAME); + + schedule(tableReference, node.getHostId()); + schedule(tableReference, node.getHostId()); + + await().pollInterval(1, TimeUnit.SECONDS) + .atMost(90, TimeUnit.SECONDS) + .until(() -> myRepairSchedulerImpl.getActiveRepairJobs().isEmpty()); + await().pollInterval(1, TimeUnit.SECONDS) + .atMost(90, TimeUnit.SECONDS) + .until(() -> myScheduleManagerImpl.getQueueSize(node.getHostId()) == 0); + + verifyTableRepairedSince(tableReference, startTime, tokenRangesFor(tableReference.getKeyspace(), node).size() * 2, node); + } + + private void schedule(TableReference tableReference, UUID nodeId) throws EcChronosException + { + myRepairs.add(tableReference); + myRepairSchedulerImpl.scheduleJob(tableReference, RepairType.VNODE, nodeId); + } + + private void verifyTableRepairedSince(TableReference tableReference, long repairedSince, Node node) + { + verifyTableRepairedSince(tableReference, repairedSince, tokenRangesFor(tableReference.getKeyspace(), node).size(), node); + } + + private void verifyTableRepairedSince(TableReference tableReference, long repairedSince, int expectedTokenRanges, Node node) + { + OptionalLong repairedAtWithCassandraHistory = lastRepairedSince(tableReference, repairedSince, myEccRepairHistory, node); + assertThat(repairedAtWithCassandraHistory.isPresent()).isTrue(); + + OptionalLong repairedAtWithEccHistory = lastRepairedSince(tableReference, repairedSince, myEccRepairHistory, node); + assertThat(repairedAtWithEccHistory.isPresent()).isTrue(); + + verify(mockTableRepairMetrics, times(expectedTokenRanges)).repairSession(eq(tableReference), anyLong(), + any(TimeUnit.class), eq(true)); + } + + private OptionalLong lastRepairedSince(TableReference tableReference, + long repairedSince, + RepairHistoryService repairHistoryProvider, + Node node) + { + return lastRepairedSince(tableReference, + repairedSince, + tokenRangesFor(tableReference.getKeyspace(), node), + repairHistoryProvider, + node); + } + + private OptionalLong lastRepairedSince(TableReference tableReference, + long repairedSince, + Set expectedRepaired, + RepairHistoryService repairHistoryProvider, + Node node) + { + Set expectedRepairedCopy = new HashSet<>(expectedRepaired); + Iterator repairEntryIterator = repairHistoryProvider.iterate(node, tableReference, + System.currentTimeMillis(), repairedSince, + repairEntry -> fullyRepaired(repairEntry) && expectedRepairedCopy.remove(repairEntry.getRange())); + + List repairEntries = Lists.newArrayList(repairEntryIterator); + + Set actuallyRepaired = repairEntries.stream() + .map(RepairEntry::getRange) + .collect(Collectors.toSet()); + + if (expectedRepaired.equals(actuallyRepaired)) + { + return repairEntries.stream().mapToLong(RepairEntry::getStartedAt).min(); + } + else + { + return OptionalLong.empty(); + } + } + + private Set tokenRangesFor(String keyspace, Node node) + { + return myMetadata.getTokenMap() + .get() + .getTokenRanges(keyspace, node) + .stream() + .map(this::convertTokenRange) + .collect(Collectors.toSet()); + } + + private boolean fullyRepaired(RepairEntry repairEntry) + { + return repairEntry.getParticipants().size() == 2 && repairEntry.getStatus() == RepairStatus.SUCCESS; + } + + private LongTokenRange convertTokenRange(TokenRange range) + { + // Assuming murmur3 partitioner + long start = ((Murmur3Token) range.getStart()).getValue(); + long end = ((Murmur3Token) range.getEnd()).getValue(); + return new LongTokenRange(start, end); + } +} diff --git a/standalone-integration/src/test/java/com/ericsson/bss/cassandra/ecchronos/standalone/TestBase.java b/standalone-integration/src/test/java/com/ericsson/bss/cassandra/ecchronos/standalone/TestBase.java new file mode 100644 index 000000000..28ea8f8f6 --- /dev/null +++ b/standalone-integration/src/test/java/com/ericsson/bss/cassandra/ecchronos/standalone/TestBase.java @@ -0,0 +1,278 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ericsson.bss.cassandra.ecchronos.standalone; + +import cassandracluster.AbstractCassandraCluster; +import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.cql.SimpleStatement; +import com.datastax.oss.driver.api.core.metadata.Node; +import com.datastax.oss.driver.api.querybuilder.QueryBuilder; +import com.ericsson.bss.cassandra.ecchronos.application.config.ConfigurationHelper; +import com.ericsson.bss.cassandra.ecchronos.application.config.security.Credentials; +import com.ericsson.bss.cassandra.ecchronos.application.config.security.Security; +import com.ericsson.bss.cassandra.ecchronos.connection.DistributedJmxConnectionProvider; +import com.ericsson.bss.cassandra.ecchronos.connection.DistributedNativeConnectionProvider; +import com.ericsson.bss.cassandra.ecchronos.connection.impl.builders.DistributedNativeBuilder; +import com.ericsson.bss.cassandra.ecchronos.connection.impl.providers.DistributedJmxConnectionProviderImpl; +import com.ericsson.bss.cassandra.ecchronos.connection.impl.providers.DistributedNativeConnectionProviderImpl; +import com.ericsson.bss.cassandra.ecchronos.core.impl.jmx.DistributedJmxProxyFactoryImpl; +import com.ericsson.bss.cassandra.ecchronos.core.impl.table.TableReferenceFactoryImpl; +import com.ericsson.bss.cassandra.ecchronos.core.table.TableReference; +import com.ericsson.bss.cassandra.ecchronos.core.table.TableReferenceFactory; +import com.ericsson.bss.cassandra.ecchronos.data.sync.EccNodesSync; +import com.ericsson.bss.cassandra.ecchronos.utils.enums.connection.ConnectionType; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; +import net.jcip.annotations.NotThreadSafe; +import org.junit.AfterClass; +import javax.management.InstanceNotFoundException; +import javax.management.MBeanException; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; +import javax.management.ReflectionException; +import javax.management.remote.JMXConnector; +import java.io.IOException; +import java.util.UUID; +import org.junit.BeforeClass; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.literal; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +@NotThreadSafe +abstract public class TestBase extends AbstractCassandraCluster +{ + protected static final Logger LOG = LoggerFactory.getLogger(TestBase.class); + protected static final String SECURITY_FILE = "security.yml"; + protected static final String ECCHRONOS_KEYSPACE = "ecchronos"; + protected static final String TEST_KEYSPACE = "test_keyspace"; + protected static final String TEST_TABLE_ONE_NAME = "test_table1"; + protected static final String TEST_TABLE_TWO_NAME = "test_table2"; + private static final String ON_DEMAND_REPAIR_STATUS_TABLE = "on_demand_repair_status"; + private static final String ECCHRONOS_ID = "EcchronosID"; + protected static final String ECCHRONOS_KEYSPACE_QUERY = + String.format("CREATE KEYSPACE IF NOT EXISTS %s WITH replication = {'class': 'NetworkTopologyStrategy', 'datacenter1': 2}", + ECCHRONOS_KEYSPACE); + protected static final String TEST_KEYSPACE_QUERY = + String.format("CREATE KEYSPACE IF NOT EXISTS %s WITH replication = {'class': 'NetworkTopologyStrategy', 'datacenter1': 2}", + TEST_KEYSPACE); + private static final String NODE_SYNC_TABLE_QUERY = String.format( + "CREATE TABLE IF NOT EXISTS %s.nodes_sync(ecchronos_id TEXT, datacenter_name TEXT, node_id UUID, node_endpoint TEXT, node_status TEXT, last_connection TIMESTAMP, next_connection TIMESTAMP, PRIMARY KEY(ecchronos_id, datacenter_name, node_id)) WITH CLUSTERING ORDER BY(datacenter_name DESC, node_id DESC);", + ECCHRONOS_KEYSPACE); + private static final String LOCK_TABLE_QUERY = String.format( + "CREATE TABLE IF NOT EXISTS %s.lock (resource text, node uuid, metadata map, PRIMARY KEY(resource)) WITH default_time_to_live = 600 AND gc_grace_seconds = 0", + ECCHRONOS_KEYSPACE); + private static final String LOCK_PRIORITY_TABLE_QUERY = String.format( + "CREATE TABLE IF NOT EXISTS %s.lock_priority (resource text, node uuid, priority int, PRIMARY KEY(resource, node)) WITH default_time_to_live = 600 AND gc_grace_seconds = 0", + ECCHRONOS_KEYSPACE); + private static final String REFERENCE_TABLE_QUERY = String + .format("CREATE TYPE IF NOT EXISTS %s.table_reference (id uuid, keyspace_name text, table_name text)", ECCHRONOS_KEYSPACE); + private static final String TOKEN_RANGE_TABLE_QUERY = + String.format("CREATE TYPE IF NOT EXISTS %s.token_range (start text, end text)", ECCHRONOS_KEYSPACE); + private static final String ON_DEMAND_REPAIR_STATUS_TABLE_QUERY = String.format( + "CREATE TABLE IF NOT EXISTS %s.%s (host_id uuid, job_id uuid, table_reference frozen, token_map_hash int, repaired_tokens frozen>>, status text, completed_time timestamp, repair_type text, PRIMARY KEY(host_id, job_id)) WITH default_time_to_live = 2592000 AND gc_grace_seconds = 0", + ECCHRONOS_KEYSPACE, ON_DEMAND_REPAIR_STATUS_TABLE); + private static final String REPAIR_TABLE_QUERY = String.format( + "CREATE TABLE IF NOT EXISTS %s.repair_history(table_id UUID, node_id UUID, repair_id timeuuid, job_id UUID, coordinator_id UUID, range_begin text, range_end text, participants set, status text, started_at timestamp, finished_at timestamp, PRIMARY KEY((table_id,node_id), repair_id)) WITH CLUSTERING ORDER BY (repair_id DESC);", + ECCHRONOS_KEYSPACE); + private static final String TEST_TABLE_QUERY = + String.format("CREATE TABLE IF NOT EXISTS %s.%s (col1 UUID, col2 int, PRIMARY KEY(col1))", TEST_KEYSPACE, TEST_TABLE_ONE_NAME); + private static final String TEST_TABLE_TWO_QUERY = + String.format("CREATE TABLE IF NOT EXISTS %s.%s (col1 UUID, col2 int, PRIMARY KEY(col1))", TEST_KEYSPACE, TEST_TABLE_TWO_NAME); + protected static final int DEFAULT_INSERT_DATA_COUNT = 1000; + private static final int NUMBER_OF_RETRIES = 100; + + private static DistributedNativeConnectionProvider myNativeConnectionProvider; + private static DistributedJmxConnectionProvider myJmxConnectionProvider; + private static DistributedJmxProxyFactoryImpl myJmxProxyFactory; + protected static EccNodesSync myEccNodesSync; + protected static TableReferenceFactory myTableReferenceFactory; + protected static final AtomicReference jmxSecurity = new AtomicReference<>(); + + @BeforeClass + public static void setUpCluster() throws Exception + { + createKeyspaceAndTables(); + Security security = ConfigurationHelper.DEFAULT_INSTANCE.getConfiguration(SECURITY_FILE, Security.class); + jmxSecurity.set(security.getJmxSecurity()); + Map nodesList = new HashMap<>(mySession.getMetadata().getNodes()); + List contactPoints = new ArrayList<>(); + + for (Node node : nodesList.values()) + { + String hostname = node.getBroadcastRpcAddress().get().getHostName(); + int port = node.getBroadcastRpcAddress().get().getPort(); + contactPoints.add(new InetSocketAddress(hostname, port)); + } + + DistributedNativeBuilder distributedNativeBuilder = DistributedNativeConnectionProviderImpl.builder() + .withInitialContactPoints(contactPoints) + .withAgentType(ConnectionType.hostAware) + .withHostAware(contactPoints); + + myNativeConnectionProvider = + new DistributedNativeConnectionProviderImpl(mySession, nodesList, distributedNativeBuilder, ConnectionType.hostAware); + myEccNodesSync = EccNodesSync.newBuilder() + .withSession(mySession) + .withNativeConnection(myNativeConnectionProvider) + .withEcchronosID(ECCHRONOS_ID) + .build(); + + Supplier credentials = () -> convertCredentials(jmxSecurity::get); + Supplier> tls = mock(Supplier.class); + when(tls.get()).thenReturn(Map.of()); + myJmxConnectionProvider = DistributedJmxConnectionProviderImpl.builder() + .withCqlSession(mySession) + .withCredentials(credentials) + .withTLS(tls) + .withNativeConnection(myNativeConnectionProvider) + .withJolokiaEnabled(false) + .withEccNodesSync(myEccNodesSync) + .build(); + + myJmxProxyFactory = DistributedJmxProxyFactoryImpl.builder() + .withJmxConnectionProvider(myJmxConnectionProvider) + .withEccNodesSync(new EccNodesSync.Builder() + .withConnectionDelayValue(10L) + .withConnectionDelayUnit(TimeUnit.SECONDS) + .withNativeConnection(getNativeConnectionProvider()) + .withSession(mySession) + .withEcchronosID(ECCHRONOS_ID) + .build()) + .withNodesMap(nodesList) + .build(); + } + + private static void createKeyspaceAndTables() + { + for (int i = 0; i <= NUMBER_OF_RETRIES; i++) + { + try + { + mySession.execute(ECCHRONOS_KEYSPACE_QUERY); + mySession.execute(TEST_KEYSPACE_QUERY); + mySession.execute(NODE_SYNC_TABLE_QUERY); + mySession.execute(LOCK_TABLE_QUERY); + mySession.execute(LOCK_PRIORITY_TABLE_QUERY); + mySession.execute(TOKEN_RANGE_TABLE_QUERY); + mySession.execute(REFERENCE_TABLE_QUERY); + mySession.execute(ON_DEMAND_REPAIR_STATUS_TABLE_QUERY); + mySession.execute(REPAIR_TABLE_QUERY); + mySession.execute(TEST_TABLE_QUERY); + mySession.execute(TEST_TABLE_TWO_QUERY); + myTableReferenceFactory = new TableReferenceFactoryImpl(mySession); + TableReference tableReference = myTableReferenceFactory.forTable(TEST_KEYSPACE, TEST_TABLE_ONE_NAME); + if (tableReference == null) + { + throw new Exception("Still table is not available in keyspace"); + } + return; + } + catch (Exception exception) + { + LOG.warn("Cluster was not ready to serve the request so try again to set up keyspaces and tables correctly"); + } + } + } + + @AfterClass + public static void cleanup() throws IOException + { + if (myJmxConnectionProvider != null && myNativeConnectionProvider != null) + { + myJmxConnectionProvider.close(); + myNativeConnectionProvider.close(); + } + + } + + protected static DistributedNativeConnectionProvider getNativeConnectionProvider() + { + return myNativeConnectionProvider; + } + + protected static DistributedJmxConnectionProvider getJmxConnectionProvider() + { + return myJmxConnectionProvider; + } + + protected static DistributedJmxProxyFactoryImpl getJmxProxyFactory() + { + return myJmxProxyFactory; + } + + protected void insertSomeDataAndFlush(TableReference tableReference, CqlSession session, Node node) + throws ReflectionException, + MalformedObjectNameException, + InstanceNotFoundException, + MBeanException, + IOException + { + for (int i = 0; i < DEFAULT_INSERT_DATA_COUNT; i++) + { + UUID randomUUID = UUID.randomUUID(); + SimpleStatement statement = QueryBuilder.insertInto(tableReference.getKeyspace(), tableReference.getTable()) + .value("col1", literal(randomUUID)) + .value("col2", literal(randomUUID.hashCode())) + .build(); + session.execute(statement); + } + forceFlush(tableReference, node); + } + + private void forceFlush(TableReference tableReference, Node node) + throws IOException, + MalformedObjectNameException, + ReflectionException, + InstanceNotFoundException, + MBeanException + { + try (JMXConnector jmxConnector = getJmxConnectionProvider().getJmxConnector(node.getHostId())) + { + String[] table = new String[] { tableReference.getTable() }; + boolean isConnected = jmxConnector.getConnectionId() != null; + System.out.println(isConnected); + jmxConnector.getMBeanServerConnection() + .invoke(new ObjectName("org.apache.cassandra.db:type=StorageService"), + "forceKeyspaceFlush", + new Object[] { + tableReference.getKeyspace(), table + }, + new String[] { + String.class.getName(), String[].class.getName() + }); + } + } + + protected static String[] convertCredentials(final Supplier jmxSecurity) + { + Credentials credentials = jmxSecurity.get().getJmxCredentials(); + if (!credentials.isEnabled()) + { + return null; + } + return new String[] { + credentials.getUsername(), credentials.getPassword() + }; + } +} diff --git a/standalone-integration/src/test/resources/logback-test.xml b/standalone-integration/src/test/resources/logback-test.xml new file mode 100644 index 000000000..d863bf575 --- /dev/null +++ b/standalone-integration/src/test/resources/logback-test.xml @@ -0,0 +1,32 @@ + + + + + + + %-4relative [%thread] %-5level %logger{35} - %msg %n + + + + + + + + + + diff --git a/standalone-integration/src/test/resources/security.yml b/standalone-integration/src/test/resources/security.yml new file mode 100644 index 000000000..64b4b43d4 --- /dev/null +++ b/standalone-integration/src/test/resources/security.yml @@ -0,0 +1,49 @@ +# +# Copyright 2024 Telefonaktiebolaget LM Ericsson +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +cql: + credentials: + enabled: true + username: cassandra + password: cassandra + tls: + enabled: false + keystore: /path/to/keystore + keystore_password: ecchronos + truststore: /path/to/truststore + truststore_password: ecchronos + certificate: + certificate_private_key: + trust_certificate: + protocol: TLSv1.2 + algorithm: + store_type: JKS + cipher_suites: + require_endpoint_verification: false + + +jmx: + credentials: + enabled: true + username: cassandra + password: cassandra + tls: + enabled: false + keystore: /path/to/keystore + keystore_password: ecchronos + truststore: /path/to/truststore + truststore_password: ecchronos + protocol: TLSv1.2 + cipher_suites: