From e81faf9d0595aae22c22ed3c6cd1c81c84e3178e Mon Sep 17 00:00:00 2001 From: sajid riaz Date: Tue, 17 Dec 2024 14:11:31 +0100 Subject: [PATCH] Added support for parallel vnode repair - Iterates through all token ranges in the replica repair group - Adds each range to the combinedRanges set - Creates one vnode repair task for all combinedRanges - Creates one repair session per range in the collection and starts each session (see the flow sketch appended after the diff) --- .../core/impl/repair/RepairGroup.java | 15 +- .../impl/repair/vnode/VnodeRepairTask.java | 4 +- .../core/impl/repair/TestRepairGroup.java | 43 ++++ .../repair/vnode/TestVnodeRepairTask.java | 233 ++++++++++++++++++ .../ecchronos/standalone/ITSchedules.java | 152 +++++------- 5 files changed, 354 insertions(+), 93 deletions(-) create mode 100644 core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/vnode/TestVnodeRepairTask.java diff --git a/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/RepairGroup.java b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/RepairGroup.java index 7558f7f9..99bf4e3e 100644 --- a/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/RepairGroup.java +++ b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/RepairGroup.java @@ -42,6 +42,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -110,6 +111,11 @@ public RepairGroup(final int priority, final Builder builder) myTokensPerRepair = Preconditions .checkNotNull(builder.myTokensPerRepair, "Tokens per repair must be set"); } + if (RepairType.PARALLEL_VNODE.equals(myRepairConfiguration.getRepairType())) + { + myNode = Preconditions + .checkNotNull(builder.myNode, "Node must be set"); + } myJobId = Preconditions .checkNotNull(builder.myJobId, "Job id must be set"); } @@ -125,7 +131,6 @@ public boolean execute(final UUID nodeID) { LOG.debug("Table {} running repair job {}", myTableReference, myReplicaRepairGroup); boolean successful = true; - for (RepairTask repairTask : getRepairTasks(nodeID)) { if (!shouldContinue()) @@ -223,6 +228,14 @@ else if (myRepairConfiguration.getRepairType().equals(RepairType.VNODE)) } } } + else + { + Set<LongTokenRange> combinedRanges = new LinkedHashSet<>(); + myReplicaRepairGroup.iterator().forEachRemaining(combinedRanges::add); + tasks.add(new VnodeRepairTask(myNode, myJmxProxyFactory, myTableReference, myRepairConfiguration, + myTableRepairMetrics, myRepairHistory, combinedRanges, + new HashSet<>(myReplicaRepairGroup.getReplicas()), myJobId)); + } return tasks; } diff --git a/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/vnode/VnodeRepairTask.java b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/vnode/VnodeRepairTask.java index 97a92e2b..de2bf450 100644 --- a/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/vnode/VnodeRepairTask.java +++ b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/vnode/VnodeRepairTask.java @@ -167,13 +167,13 @@ final Collection<LongTokenRange> getUnknownRanges() } @VisibleForTesting - final Set<LongTokenRange> getTokenRanges() + public final Set<LongTokenRange> getTokenRanges() { return Sets.newLinkedHashSet(myTokenRanges); } @VisibleForTesting - final Set<DriverNode> getReplicas() + public final Set<DriverNode> getReplicas() { return Sets.newHashSet(myReplicas); } diff --git 
a/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/TestRepairGroup.java b/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/TestRepairGroup.java index efe89e1d..89362011 100644 --- a/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/TestRepairGroup.java +++ b/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/TestRepairGroup.java @@ -32,6 +32,7 @@ import com.datastax.oss.driver.api.core.metadata.Node; import com.ericsson.bss.cassandra.ecchronos.core.impl.locks.DummyLock; import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.incremental.IncrementalRepairTask; +import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.vnode.VnodeRepairTask; import com.ericsson.bss.cassandra.ecchronos.core.jmx.DistributedJmxProxyFactory; import com.ericsson.bss.cassandra.ecchronos.core.locks.LockFactory; import com.ericsson.bss.cassandra.ecchronos.core.metadata.DriverNode; @@ -53,6 +54,8 @@ import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedHashSet; import java.util.Map; import java.util.Set; import java.util.UUID; @@ -273,6 +276,46 @@ public void testExecuteSomeTasksFailed() throws ScheduledJobException assertThat(success).isFalse(); } + @Test + public void testGetCombinedRepairTask() + { + LongTokenRange range1 = new LongTokenRange(0, 1); + LongTokenRange range2 = new LongTokenRange(3, 4); + LongTokenRange range3 = new LongTokenRange(6, 8); + LongTokenRange range4 = new LongTokenRange(10, 11); + Set<LongTokenRange> expectedTokenRanges = new LinkedHashSet<>( + ImmutableList.of(range1, range2, range3, range4) + ); + + // setup + DriverNode node = mockNode("DC1"); + + ImmutableSet<DriverNode> nodes = ImmutableSet.of(node); + + ReplicaRepairGroup replicaRepairGroup = new ReplicaRepairGroup(nodes, ImmutableList.of(range1, range2, range3, range4), System.currentTimeMillis()); + + RepairConfiguration repairConfiguration = RepairConfiguration.newBuilder() + .withParallelism(RepairParallelism.PARALLEL) + .withRepairWarningTime(RUN_INTERVAL_IN_DAYS * 2, TimeUnit.DAYS) + .withRepairErrorTime(GC_GRACE_DAYS_IN_DAYS, TimeUnit.DAYS) + .withRepairType(RepairType.PARALLEL_VNODE) + .build(); + + RepairGroup repairGroup = builderFor(replicaRepairGroup).withRepairConfiguration(repairConfiguration).build(PRIORITY); + + Collection<RepairTask> repairTasks = repairGroup.getRepairTasks(myNodeID); + + assertThat(repairTasks).hasSize(1); + Iterator<RepairTask> iterator = repairTasks.iterator(); + assertThat(iterator.hasNext()).isTrue(); + VnodeRepairTask repairTask = (VnodeRepairTask) iterator.next(); + + assertThat(repairTask.getReplicas()).containsExactlyInAnyOrderElementsOf(nodes); + assertThat(repairTask.getTokenRanges()).containsExactlyElementsOf(expectedTokenRanges); + assertThat(repairTask.getTableReference()).isEqualTo(TABLE_REFERENCE); + assertThat(repairTask.getRepairConfiguration().getRepairParallelism()).isEqualTo(RepairParallelism.PARALLEL); + } + private RepairGroup.Builder builderFor(ReplicaRepairGroup replicaRepairGroup) { return RepairGroup.newBuilder() diff --git a/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/vnode/TestVnodeRepairTask.java b/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/vnode/TestVnodeRepairTask.java new file mode 100644 index 00000000..5d7bf553 --- /dev/null +++ b/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/vnode/TestVnodeRepairTask.java @@ -0,0 +1,233 
@@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ericsson.bss.cassandra.ecchronos.core.impl.repair.vnode; + +import com.datastax.oss.driver.api.core.metadata.Node; +import com.ericsson.bss.cassandra.ecchronos.core.jmx.DistributedJmxProxy; +import com.ericsson.bss.cassandra.ecchronos.core.jmx.DistributedJmxProxyFactory; +import com.ericsson.bss.cassandra.ecchronos.core.metadata.DriverNode; +import com.ericsson.bss.cassandra.ecchronos.core.repair.config.RepairConfiguration; +import com.ericsson.bss.cassandra.ecchronos.core.state.LongTokenRange; +import com.ericsson.bss.cassandra.ecchronos.core.state.RepairHistory; +import com.ericsson.bss.cassandra.ecchronos.core.table.TableReference; +import com.ericsson.bss.cassandra.ecchronos.core.table.TableRepairMetrics; +import com.ericsson.bss.cassandra.ecchronos.data.repairhistory.RepairHistoryService; +import com.ericsson.bss.cassandra.ecchronos.utils.enums.repair.RepairStatus; +import com.ericsson.bss.cassandra.ecchronos.utils.enums.repair.RepairType; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; +import java.util.UUID; + +import static org.mockito.Mockito.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +@RunWith(MockitoJUnitRunner.class) +public class TestVnodeRepairTask { + @Mock + private Node mockNode; + + @Mock + private DistributedJmxProxyFactory mockJmxProxyFactory; + + @Mock + private TableReference mockTableReference; + + @Mock + private RepairConfiguration mockRepairConfiguration; + + @Mock + private TableRepairMetrics mockTableRepairMetrics; + + @Mock + private RepairHistory mockRepairHistory; + + @Mock + private DistributedJmxProxy mockJmxProxy; + + @Mock + private RepairHistoryService.RepairSession mockRepairSession; + + private Set<LongTokenRange> tokenRanges; + private Set<DriverNode> replicas; + private UUID jobId; + private VnodeRepairTask vnodeRepairTask; + + @Before + public void setup() { + UUID nodeId = UUID.randomUUID(); + jobId = UUID.randomUUID(); + tokenRanges = new HashSet<>(); + replicas = new HashSet<>(); + + when(mockNode.getHostId()).thenReturn(nodeId); + when(mockRepairHistory.newSession(eq(mockNode), eq(mockTableReference), eq(jobId), any(), any())) + .thenReturn(mockRepairSession); + } + + @Test + public void testParallelVnodeRepairExecution() { + // Setup + LongTokenRange range1 = new LongTokenRange(0, 100); + LongTokenRange range2 = new LongTokenRange(100, 200); + tokenRanges.add(range1); + tokenRanges.add(range2); + + when(mockRepairConfiguration.getRepairType()).thenReturn(RepairType.PARALLEL_VNODE); + when(mockJmxProxyFactory.create(any())).thenReturn(mockJmxProxy); + + RepairHistoryService.RepairSession mockSession1 = mock(RepairHistoryService.RepairSession.class); + RepairHistoryService.RepairSession mockSession2 = mock(RepairHistoryService.RepairSession.class); + + when(mockRepairHistory.newSession(eq(mockNode), eq(mockTableReference), eq(jobId), eq(range1), 
eq(replicas))) + .thenReturn(mockSession1); + when(mockRepairHistory.newSession(eq(mockNode), eq(mockTableReference), eq(jobId), eq(range2), eq(replicas))) + .thenReturn(mockSession2); + + // Create task + vnodeRepairTask = new VnodeRepairTask(mockNode, mockJmxProxyFactory, mockTableReference, + mockRepairConfiguration, mockTableRepairMetrics, mockRepairHistory, + tokenRanges, replicas, jobId); + + // Execute repair + vnodeRepairTask.execute(); + + // Verify both sessions were started + verify(mockSession1, timeout(1000)).start(); + verify(mockSession2, timeout(1000)).start(); + + // Verify sessions are properly finished + vnodeRepairTask.onFinish(RepairStatus.SUCCESS); + + verify(mockSession1).finish(RepairStatus.SUCCESS); + verify(mockSession2).finish(RepairStatus.SUCCESS); + } + + @Test + public void testParallelVnodeRepairWithFailure() { + // Setup + LongTokenRange range1 = new LongTokenRange(0, 100); + LongTokenRange range2 = new LongTokenRange(100, 200); + LongTokenRange range3 = new LongTokenRange(200, 300); + tokenRanges.add(range1); + tokenRanges.add(range2); + tokenRanges.add(range3); + + when(mockRepairConfiguration.getRepairType()).thenReturn(RepairType.PARALLEL_VNODE); + when(mockJmxProxyFactory.create(any())).thenReturn(mockJmxProxy); + + RepairHistoryService.RepairSession mockSession1 = mock(RepairHistoryService.RepairSession.class); + RepairHistoryService.RepairSession mockSession2 = mock(RepairHistoryService.RepairSession.class); + RepairHistoryService.RepairSession mockSession3 = mock(RepairHistoryService.RepairSession.class); + + when(mockRepairHistory.newSession(eq(mockNode), eq(mockTableReference), eq(jobId), eq(range1), eq(replicas))) + .thenReturn(mockSession1); + when(mockRepairHistory.newSession(eq(mockNode), eq(mockTableReference), eq(jobId), eq(range2), eq(replicas))) + .thenReturn(mockSession2); + when(mockRepairHistory.newSession(eq(mockNode), eq(mockTableReference), eq(jobId), eq(range3), eq(replicas))) + .thenReturn(mockSession3); + + // Simulate session2 failing during start + doThrow(new RuntimeException("Simulated repair failure")).when(mockSession2).start(); + + // Create task + vnodeRepairTask = new VnodeRepairTask(mockNode, mockJmxProxyFactory, mockTableReference, + mockRepairConfiguration, mockTableRepairMetrics, mockRepairHistory, + tokenRanges, replicas, jobId); + + // Execute repair + vnodeRepairTask.execute(); + + // Verify an attempt was made to start each session + verify(mockSession1, timeout(1000)).start(); + verify(mockSession2, timeout(1000)).start(); + verify(mockSession3, timeout(1000)).start(); + + // Verify session2 was marked as failed + verify(mockSession2, timeout(1000)).finish(RepairStatus.FAILED); + + // Finish the repair task with failure status due to partial failure + vnodeRepairTask.onFinish(RepairStatus.FAILED); + + // Verify all sessions are properly finished + verify(mockSession1).finish(RepairStatus.FAILED); + verify(mockSession3).finish(RepairStatus.FAILED); + + // Verify the failed ranges are tracked + Collection<LongTokenRange> failedRanges = vnodeRepairTask.getUnknownRanges(); + assertTrue("Failed range should be tracked", failedRanges != null && failedRanges.contains(range2)); + } + + @Test + public void testParallelVnodeRepairProgress() { + // Setup + LongTokenRange range1 = new LongTokenRange(0, 100); + LongTokenRange range2 = new LongTokenRange(100, 200); + LongTokenRange range3 = new LongTokenRange(200, 300); + tokenRanges.add(range1); + tokenRanges.add(range2); + tokenRanges.add(range3); + + 
when(mockRepairConfiguration.getRepairType()).thenReturn(RepairType.PARALLEL_VNODE); + when(mockJmxProxyFactory.create(any())).thenReturn(mockJmxProxy); + + RepairHistoryService.RepairSession mockSession1 = mock(RepairHistoryService.RepairSession.class); + RepairHistoryService.RepairSession mockSession2 = mock(RepairHistoryService.RepairSession.class); + RepairHistoryService.RepairSession mockSession3 = mock(RepairHistoryService.RepairSession.class); + + when(mockRepairHistory.newSession(eq(mockNode), eq(mockTableReference), eq(jobId), eq(range1), eq(replicas))) + .thenReturn(mockSession1); + when(mockRepairHistory.newSession(eq(mockNode), eq(mockTableReference), eq(jobId), eq(range2), eq(replicas))) + .thenReturn(mockSession2); + when(mockRepairHistory.newSession(eq(mockNode), eq(mockTableReference), eq(jobId), eq(range3), eq(replicas))) + .thenReturn(mockSession3); + + // Create task + vnodeRepairTask = new VnodeRepairTask(mockNode, mockJmxProxyFactory, mockTableReference, + mockRepairConfiguration, mockTableRepairMetrics, mockRepairHistory, + tokenRanges, replicas, jobId); + + // Execute repair + vnodeRepairTask.execute(); + + // Verify all sessions are started + verify(mockSession1, timeout(1000)).start(); + verify(mockSession2, timeout(1000)).start(); + verify(mockSession3, timeout(1000)).start(); + + // Simulate progress by completing repairs one by one + vnodeRepairTask.onRangeFinished(range1, RepairStatus.SUCCESS); + assertEquals("Progress should be ~33% after first range", 0.33d, vnodeRepairTask.getProgress(), 0.01d); + + vnodeRepairTask.onRangeFinished(range2, RepairStatus.SUCCESS); + assertEquals("Progress should be ~66% after second range", 0.66d, vnodeRepairTask.getProgress(), 0.01d); + + vnodeRepairTask.onRangeFinished(range3, RepairStatus.SUCCESS); + assertEquals("Progress should be 100% after all ranges", 1.0d, vnodeRepairTask.getProgress(), 0.01d); + + // Verify final state + vnodeRepairTask.onFinish(RepairStatus.SUCCESS); + verify(mockSession1).finish(RepairStatus.SUCCESS); + verify(mockSession2).finish(RepairStatus.SUCCESS); + verify(mockSession3).finish(RepairStatus.SUCCESS); + } +} \ No newline at end of file diff --git a/standalone-integration/src/test/java/com/ericsson/bss/cassandra/ecchronos/standalone/ITSchedules.java b/standalone-integration/src/test/java/com/ericsson/bss/cassandra/ecchronos/standalone/ITSchedules.java index bc5c5d7e..ad528368 100644 --- a/standalone-integration/src/test/java/com/ericsson/bss/cassandra/ecchronos/standalone/ITSchedules.java +++ b/standalone-integration/src/test/java/com/ericsson/bss/cassandra/ecchronos/standalone/ITSchedules.java @@ -38,33 +38,32 @@ import com.ericsson.bss.cassandra.ecchronos.core.repair.config.RepairConfiguration; import com.ericsson.bss.cassandra.ecchronos.core.state.LongTokenRange; import com.ericsson.bss.cassandra.ecchronos.core.state.RepairEntry; -import com.ericsson.bss.cassandra.ecchronos.core.state.RepairHistoryProvider; import com.ericsson.bss.cassandra.ecchronos.core.state.ReplicationState; import com.ericsson.bss.cassandra.ecchronos.core.state.TokenSubRangeUtil; import com.ericsson.bss.cassandra.ecchronos.core.table.TableReference; import com.ericsson.bss.cassandra.ecchronos.core.table.TableReferenceFactory; import com.ericsson.bss.cassandra.ecchronos.core.table.TableRepairMetrics; import com.ericsson.bss.cassandra.ecchronos.core.table.TableStorageStates; -import com.ericsson.bss.cassandra.ecchronos.data.repairhistory.CassandraRepairHistoryService; import 
com.ericsson.bss.cassandra.ecchronos.data.repairhistory.RepairHistoryService; import com.ericsson.bss.cassandra.ecchronos.fm.RepairFaultReporter; import com.ericsson.bss.cassandra.ecchronos.utils.converter.UnitConverter; +import com.ericsson.bss.cassandra.ecchronos.utils.enums.repair.RepairParallelism; import com.ericsson.bss.cassandra.ecchronos.utils.enums.repair.RepairStatus; +import com.ericsson.bss.cassandra.ecchronos.utils.enums.repair.RepairType; import com.google.common.collect.Sets; import net.jcip.annotations.NotThreadSafe; import org.assertj.core.util.Lists; import org.junit.After; -import org.junit.Ignore; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Test; -import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import java.math.BigInteger; import java.net.InetAddress; import java.net.InetSocketAddress; import java.time.Instant; import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.Iterator; @@ -92,28 +91,9 @@ import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.verify; -@Ignore -@RunWith (Parameterized.class) @NotThreadSafe public class ITSchedules extends TestBase { - enum RepairHistoryType - { - CASSANDRA, ECC - } - - @Parameterized.Parameters - public static Collection<Object[]> parameters() - { - return Arrays.asList(new Object[][] { - { RepairHistoryType.CASSANDRA }, - { RepairHistoryType.ECC } - }); - } - - @Parameterized.Parameter - public RepairHistoryType myRepairHistoryType; - private static RepairFaultReporter mockFaultReporter; private static TableRepairMetrics mockTableRepairMetrics; @@ -128,7 +108,7 @@ public static Collection<Object[]> parameters() private static DriverNode myLocalNode; - private static RepairHistoryProvider myRepairHistoryProvider; + private static RepairHistoryService myRepairHistoryService; private static NodeResolver myNodeResolver; @@ -144,8 +124,8 @@ public static Collection<Object[]> parameters() private final Set<TableReference> myRepairs = new HashSet<>(); - @Parameterized.BeforeParam - public static void init(RepairHistoryType repairHistoryType) + @BeforeClass + public static void init() { mockFaultReporter = mock(RepairFaultReporter.class); mockTableRepairMetrics = mock(TableRepairMetrics.class); @@ -165,21 +145,7 @@ public static void init(RepairHistoryType repairHistoryType) ReplicationState replicationState = new ReplicationStateImpl(myNodeResolver, mySession); - RepairHistoryService repairHistoryService = - new RepairHistoryService(mySession, replicationState, myNodeResolver, TimeUnit.DAYS.toMillis(30)); - - if (repairHistoryType == RepairHistoryType.ECC) - { - myRepairHistoryProvider = repairHistoryService; - } - else if (repairHistoryType == RepairHistoryType.CASSANDRA) - { - myRepairHistoryProvider = new CassandraRepairHistoryService(myNodeResolver, mySession, TimeUnit.DAYS.toMillis(30)); - } - else - { - throw new IllegalArgumentException("Unknown repair history type for test"); - } + myRepairHistoryService = new RepairHistoryService(mySession, replicationState, myNodeResolver, TimeUnit.DAYS.toMillis(30)); myLockFactory = CASLockFactory.builder() .withNativeConnectionProvider(getNativeConnectionProvider()) @@ -199,12 +165,10 @@ else if (repairHistoryType == RepairHistoryType.CASSANDRA) .withHostStates(HostStatesImpl.builder() .withJmxProxyFactory(getJmxProxyFactory()) .build()) - .withRepairHistoryProvider(myRepairHistoryProvider) + 
.withRepairHistoryProvider(myRepairHistoryService) .withTableRepairMetrics(mockTableRepairMetrics) .build(); - RepairHistoryService eccRepairHistory = new RepairHistoryService(mySession, replicationState, myNodeResolver, 2_592_000_000L); - myRepairSchedulerImpl = RepairSchedulerImpl.builder() .withJmxProxyFactory(getJmxProxyFactory()) .withTableRepairMetrics(mockTableRepairMetrics) @@ -213,9 +177,8 @@ else if (repairHistoryType == RepairHistoryType.CASSANDRA) .withRepairStateFactory(repairStateFactory) .withRepairLockType(RepairLockType.VNODE) .withTableStorageStates(mockTableStorageStates) - .withRepairHistory(repairHistoryService) .withReplicationState(replicationState) - .withRepairHistory(eccRepairHistory) + .withRepairHistory(myRepairHistoryService) .build(); myRepairConfiguration = RepairConfiguration.newBuilder() @@ -260,7 +223,7 @@ public void clean() reset(mockTableStorageStates); } - @Parameterized.AfterParam + @AfterClass public static void closeConnections() { myHostStates.close(); @@ -294,6 +257,36 @@ public void repairSingleTable() verify(mockFaultReporter, never()).raise(any(RepairFaultReporter.FaultCode.class), anyMap()); } + /** + * Create a table that is replicated and was repaired two hours ago. + * The repair factory should detect the new table automatically and schedule it to run using parallel vnode repair. + */ + @Test + public void repairSingleTableInParallel() + { + long startTime = System.currentTimeMillis(); + + TableReference tableReference = myTableReferenceFactory.forTable(TEST_KEYSPACE, TEST_TABLE_ONE_NAME); + + injectRepairHistory(tableReference, System.currentTimeMillis() - TimeUnit.HOURS.toMillis(2)); + + RepairConfiguration repairConfiguration = RepairConfiguration.newBuilder() + .withRepairInterval(60, TimeUnit.MINUTES) + .withRepairType(RepairType.PARALLEL_VNODE) + .withParallelism(RepairParallelism.PARALLEL) + .build(); + schedule(tableReference, repairConfiguration); + + await().pollInterval(1, TimeUnit.SECONDS) + .atMost(90, TimeUnit.SECONDS) + .until(() -> isRepairedSince(tableReference, startTime)); + + verifyTableRepairedSince(tableReference, startTime); + verifyRepairSessionMetrics(tableReference, 1); // Number of repair groups + verify(mockFaultReporter, never()) + .raise(any(RepairFaultReporter.FaultCode.class), anyMap()); + } + /** * Create a table that is replicated and was repaired two hours ago using sub ranges. * The repair factory should detect the new table automatically and schedule it to run. 
If the sub ranges are not @@ -438,7 +431,7 @@ private OptionalLong lastRepairedSince(TableReference tableReference, Set<LongTokenRange> expectedRepaired) { Set<LongTokenRange> expectedRepairedCopy = new HashSet<>(expectedRepaired); - Iterator<RepairEntry> repairEntryIterator = myRepairHistoryProvider.iterate(myLocalHost, tableReference, + Iterator<RepairEntry> repairEntryIterator = myRepairHistoryService.iterate(myLocalHost, tableReference, System.currentTimeMillis(), repairedSince, repairEntry -> fullyRepaired(repairEntry) && expectedRepairedCopy.remove(repairEntry.getRange())); @@ -538,47 +531,26 @@ private void injectRepairHistory(TableReference tableReference, { long started_at = timestamp; long finished_at = timestamp + 5; + Set<UUID> nodes = participants.stream() + .map(myNodeResolver::fromIp) + .filter(Optional::isPresent) + .map(Optional::get) + .map(DriverNode::getId) + .collect(Collectors.toSet()); - SimpleStatement statement; - - if (myRepairHistoryType == RepairHistoryType.CASSANDRA) - { - statement = QueryBuilder.insertInto("system_distributed", "repair_history") - .value("keyspace_name", literal(tableReference.getKeyspace())) - .value("columnfamily_name", literal(tableReference.getTable())) - .value("participants", literal(participants)) - .value("coordinator", literal(myLocalNode.getPublicAddress())) - .value("id", literal(Uuids.startOf(started_at))) - .value("started_at", literal(Instant.ofEpochMilli(started_at))) - .value("finished_at", literal(Instant.ofEpochMilli(finished_at))) - .value("range_begin", literal(range_begin)) - .value("range_end", literal(range_end)) - .value("status", literal("SUCCESS")) - .build(); - } - else - { - Set<UUID> nodes = participants.stream() - .map(myNodeResolver::fromIp) - .filter(Optional::isPresent) - .map(Optional::get) - .map(DriverNode::getId) - .collect(Collectors.toSet()); - - statement = QueryBuilder.insertInto("ecchronos", "repair_history") - .value("table_id", literal(tableReference.getId())) - .value("node_id", literal(myLocalNode.getId())) - .value("repair_id", literal(Uuids.startOf(finished_at))) - .value("job_id", literal(tableReference.getId())) - .value("coordinator_id", literal(myLocalNode.getId())) - .value("range_begin", literal(range_begin)) - .value("range_end", literal(range_end)) - .value("participants", literal(nodes)) - .value("status", literal("SUCCESS")) - .value("started_at", literal(Instant.ofEpochMilli(started_at))) - .value("finished_at", literal(Instant.ofEpochMilli(finished_at))) - .build(); - } + SimpleStatement statement = QueryBuilder.insertInto("ecchronos", "repair_history") + .value("table_id", literal(tableReference.getId())) + .value("node_id", literal(myLocalNode.getId())) + .value("repair_id", literal(Uuids.startOf(finished_at))) + .value("job_id", literal(tableReference.getId())) + .value("coordinator_id", literal(myLocalNode.getId())) + .value("range_begin", literal(range_begin)) + .value("range_end", literal(range_end)) + .value("participants", literal(nodes)) + .value("status", literal("SUCCESS")) + .value("started_at", literal(Instant.ofEpochMilli(started_at))) + .value("finished_at", literal(Instant.ofEpochMilli(finished_at))) + .build(); mySession.execute(statement); }
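
Reviewer note (not part of the diff): the flow sketch referenced in the commit message. It is a minimal, self-contained illustration of the steps the patch adds, under simplified assumptions: Range and RangeSession are hypothetical stand-ins for LongTokenRange and RepairHistory.RepairSession, and the real logic lives in RepairGroup.getRepairTasks and VnodeRepairTask.

import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public final class ParallelVnodeRepairSketch
{
    // Hypothetical stand-in for LongTokenRange.
    static final class Range
    {
        final long start;
        final long end;

        Range(final long start, final long end)
        {
            this.start = start;
            this.end = end;
        }

        @Override
        public String toString()
        {
            return "(" + start + ", " + end + "]";
        }
    }

    // Hypothetical stand-in for a per-range repair session.
    interface RangeSession
    {
        void start();
    }

    public static void main(final String[] args)
    {
        // 1. Iterate through all token ranges in the replica repair group
        //    and add each range to the combinedRanges set (order preserved).
        List<Range> replicaRepairGroupRanges = List.of(new Range(0, 100), new Range(100, 200));
        Set<Range> combinedRanges = new LinkedHashSet<>(replicaRepairGroupRanges);

        // 2. One vnode repair task is created for all of combinedRanges
        //    (the patch builds a single VnodeRepairTask instead of one task per range).
        // 3. That task creates one repair session per range and starts each one,
        //    so the sessions of a replica repair group run in parallel.
        for (Range range : combinedRanges)
        {
            RangeSession session = () -> System.out.println("repairing " + range);
            session.start();
        }
    }
}

The LinkedHashSet mirrors the patch's choice in RepairGroup: it deduplicates ranges while keeping iteration order stable, which is why the new TestRepairGroup test can assert the task's token ranges with containsExactlyElementsOf.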