Skip to content

Commit

Permalink
Added support for parallel vnode repair
Browse files Browse the repository at this point in the history
- Iterate through all token ranges in the replica repair group
- Add each range to the combinedRanges set
- Create one vnode repair task for all combinedRanges
- Create one repair session per range in the collection and
 start each session
  • Loading branch information
sajid riaz committed Dec 17, 2024
1 parent 267d72c commit e81faf9
Show file tree
Hide file tree
Showing 5 changed files with 351 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -110,6 +111,11 @@ public RepairGroup(final int priority, final Builder builder)
myTokensPerRepair = Preconditions
.checkNotNull(builder.myTokensPerRepair, "Tokens per repair must be set");
}
if (RepairType.PARALLEL_VNODE.equals(myRepairConfiguration.getRepairType()))
{
myNode = Preconditions
.checkNotNull(builder.myNode, "Node must be set");
}
myJobId = Preconditions
.checkNotNull(builder.myJobId, "Job id must be set");
}
Expand All @@ -125,7 +131,6 @@ public boolean execute(final UUID nodeID)
{
LOG.debug("Table {} running repair job {}", myTableReference, myReplicaRepairGroup);
boolean successful = true;

for (RepairTask repairTask : getRepairTasks(nodeID))
{
if (!shouldContinue())
Expand Down Expand Up @@ -223,6 +228,14 @@ else if (myRepairConfiguration.getRepairType().equals(RepairType.VNODE))
}
}
}
else
{
Set<LongTokenRange> combinedRanges = new LinkedHashSet<>();
myReplicaRepairGroup.iterator().forEachRemaining(combinedRanges::add);
tasks.add(new VnodeRepairTask(myNode, myJmxProxyFactory, myTableReference, myRepairConfiguration,
myTableRepairMetrics, myRepairHistory, combinedRanges,
new HashSet<>(myReplicaRepairGroup.getReplicas()), myJobId));
}

return tasks;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,13 +167,13 @@ final Collection<LongTokenRange> getUnknownRanges()
}

/**
 * Returns the token ranges of this task as a new set created with
 * {@code Sets.newLinkedHashSet(myTokenRanges)}, so the caller gets its own copy
 * rather than a reference to {@code myTokenRanges}.
 *
 * @return a newly created set containing the elements of {@code myTokenRanges}
 */
@VisibleForTesting
final Set<LongTokenRange> getTokenRanges()
public final Set<LongTokenRange> getTokenRanges()
{
return Sets.newLinkedHashSet(myTokenRanges);
}

/**
 * Returns the replicas of this task as a new set created with
 * {@code Sets.newHashSet(myReplicas)}, so the caller gets its own copy
 * rather than a reference to {@code myReplicas}.
 *
 * @return a newly created set containing the elements of {@code myReplicas}
 */
@VisibleForTesting
final Set<DriverNode> getReplicas()
public final Set<DriverNode> getReplicas()
{
return Sets.newHashSet(myReplicas);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.datastax.oss.driver.api.core.metadata.Node;
import com.ericsson.bss.cassandra.ecchronos.core.impl.locks.DummyLock;
import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.incremental.IncrementalRepairTask;
import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.vnode.VnodeRepairTask;
import com.ericsson.bss.cassandra.ecchronos.core.jmx.DistributedJmxProxyFactory;
import com.ericsson.bss.cassandra.ecchronos.core.locks.LockFactory;
import com.ericsson.bss.cassandra.ecchronos.core.metadata.DriverNode;
Expand All @@ -53,6 +54,8 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
Expand Down Expand Up @@ -273,6 +276,46 @@ public void testExecuteSomeTasksFailed() throws ScheduledJobException
assertThat(success).isFalse();
}

/**
 * Verifies that a repair group configured with {@code RepairType.PARALLEL_VNODE} produces
 * exactly one {@link VnodeRepairTask} that carries all token ranges of the replica repair
 * group (in order) together with its replicas, table reference and repair parallelism.
 */
@Test
public void testGetCombinedRepairTask()
{
    LongTokenRange range1 = new LongTokenRange(0, 1);
    LongTokenRange range2 = new LongTokenRange(3, 4);
    LongTokenRange range3 = new LongTokenRange(6, 8);
    LongTokenRange range4 = new LongTokenRange(10, 11);
    // Insertion-ordered and properly parameterized, so the order-sensitive
    // containsExactlyElementsOf assertion below is type-checked.
    Set<LongTokenRange> expectedTokenRanges = new LinkedHashSet<>(
            ImmutableList.of(range1, range2, range3, range4)
    );

    // setup
    DriverNode node = mockNode("DC1");

    ImmutableSet<DriverNode> nodes = ImmutableSet.of(node);

    ReplicaRepairGroup replicaRepairGroup = new ReplicaRepairGroup(nodes, ImmutableList.of(range1, range2, range3, range4), System.currentTimeMillis());

    RepairConfiguration repairConfiguration = RepairConfiguration.newBuilder()
            .withParallelism(RepairParallelism.PARALLEL)
            .withRepairWarningTime(RUN_INTERVAL_IN_DAYS * 2, TimeUnit.DAYS)
            .withRepairErrorTime(GC_GRACE_DAYS_IN_DAYS, TimeUnit.DAYS)
            .withRepairType(RepairType.PARALLEL_VNODE)
            .build();

    RepairGroup repairGroup = builderFor(replicaRepairGroup).withRepairConfiguration(repairConfiguration).build(PRIORITY);

    Collection<RepairTask> repairTasks = repairGroup.getRepairTasks(myNodeID);

    // All ranges must be combined into a single task.
    assertThat(repairTasks).hasSize(1);
    Iterator<RepairTask> iterator = repairTasks.iterator();
    assertThat(iterator.hasNext()).isTrue();
    VnodeRepairTask repairTask = (VnodeRepairTask) iterator.next();

    assertThat(repairTask.getReplicas()).containsExactlyInAnyOrderElementsOf(nodes);
    assertThat(repairTask.getTokenRanges()).containsExactlyElementsOf(expectedTokenRanges);
    assertThat(repairTask.getTableReference()).isEqualTo(TABLE_REFERENCE);
    assertThat(repairTask.getRepairConfiguration().getRepairParallelism()).isEqualTo(RepairParallelism.PARALLEL);
}

private RepairGroup.Builder builderFor(ReplicaRepairGroup replicaRepairGroup)
{
return RepairGroup.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,230 @@
/*
* Copyright 2024 Telefonaktiebolaget LM Ericsson
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ericsson.bss.cassandra.ecchronos.core.impl.repair.vnode;

import com.datastax.oss.driver.api.core.metadata.Node;
import com.ericsson.bss.cassandra.ecchronos.core.jmx.DistributedJmxProxy;
import com.ericsson.bss.cassandra.ecchronos.core.jmx.DistributedJmxProxyFactory;
import com.ericsson.bss.cassandra.ecchronos.core.metadata.DriverNode;
import com.ericsson.bss.cassandra.ecchronos.core.repair.config.RepairConfiguration;
import com.ericsson.bss.cassandra.ecchronos.core.state.LongTokenRange;
import com.ericsson.bss.cassandra.ecchronos.core.state.RepairHistory;
import com.ericsson.bss.cassandra.ecchronos.core.table.TableReference;
import com.ericsson.bss.cassandra.ecchronos.core.table.TableRepairMetrics;
import com.ericsson.bss.cassandra.ecchronos.data.repairhistory.RepairHistoryService;
import com.ericsson.bss.cassandra.ecchronos.utils.enums.repair.RepairStatus;
import com.ericsson.bss.cassandra.ecchronos.utils.enums.repair.RepairType;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

import static org.mockito.Mockito.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

/**
 * Unit tests for {@link VnodeRepairTask} when the repair configuration reports
 * {@code RepairType.PARALLEL_VNODE}.
 *
 * <p>Each test registers a number of token ranges, stubs one mock repair session per range on
 * the mocked {@link RepairHistory}, executes the task and then verifies how the sessions were
 * started and finished.
 */
@RunWith(MockitoJUnitRunner.class)
public class TestVnodeRepairTask {
    @Mock
    private Node mockNode;

    @Mock
    private DistributedJmxProxyFactory mockJmxProxyFactory;

    @Mock
    private TableReference mockTableReference;

    @Mock
    private RepairConfiguration mockRepairConfiguration;

    @Mock
    private TableRepairMetrics mockTableRepairMetrics;

    @Mock
    private RepairHistory mockRepairHistory;

    @Mock
    private DistributedJmxProxy mockJmxProxy;

    @Mock
    private RepairHistoryService.RepairSession mockRepairSession;

    private Set<LongTokenRange> tokenRanges;
    private Set<DriverNode> replicas;
    private UUID jobId;
    private VnodeRepairTask vnodeRepairTask;

    /**
     * Creates fresh (empty) range and replica sets plus a new job id, and installs the default
     * stubs: a host id for the mocked node and a catch-all repair session for any range/replicas.
     */
    @Before
    public void setup() {
        UUID nodeId = UUID.randomUUID();
        jobId = UUID.randomUUID();
        tokenRanges = new HashSet<>();
        replicas = new HashSet<>();

        when(mockNode.getHostId()).thenReturn(nodeId);
        when(mockRepairHistory.newSession(eq(mockNode), eq(mockTableReference), eq(jobId), any(), any()))
                .thenReturn(mockRepairSession);
    }

    /**
     * Starts one session per range and checks that finishing the task with SUCCESS finishes
     * every session with SUCCESS.
     */
    @Test
    public void testParallelVnodeRepairExecution() {
        // Setup
        LongTokenRange range1 = new LongTokenRange(0, 100);
        LongTokenRange range2 = new LongTokenRange(100, 200);

        stubParallelVnodeConfiguration();
        RepairHistoryService.RepairSession mockSession1 = registerRange(range1);
        RepairHistoryService.RepairSession mockSession2 = registerRange(range2);

        // Create task
        vnodeRepairTask = createTask();

        // Execute repair
        vnodeRepairTask.execute();

        // Verify both sessions were started (each within the timeout)
        verify(mockSession1, timeout(1000)).start();
        verify(mockSession2, timeout(1000)).start();

        // Verify sessions are properly finished
        vnodeRepairTask.onFinish(RepairStatus.SUCCESS);

        verify(mockSession1).finish(RepairStatus.SUCCESS);
        verify(mockSession2).finish(RepairStatus.SUCCESS);
    }

    /**
     * Makes the session of the second range throw on start and checks that all sessions are
     * still attempted, that the failing one is finished as FAILED, and that its range is
     * reported among the unknown ranges.
     */
    @Test
    public void testParallelVnodeRepairWithFailure() {
        // Setup
        LongTokenRange range1 = new LongTokenRange(0, 100);
        LongTokenRange range2 = new LongTokenRange(100, 200);
        LongTokenRange range3 = new LongTokenRange(200, 300);

        stubParallelVnodeConfiguration();
        RepairHistoryService.RepairSession mockSession1 = registerRange(range1);
        RepairHistoryService.RepairSession mockSession2 = registerRange(range2);
        RepairHistoryService.RepairSession mockSession3 = registerRange(range3);

        // Simulate session2 failing during start
        doThrow(new RuntimeException("Simulated repair failure")).when(mockSession2).start();

        // Create task
        vnodeRepairTask = createTask();

        // Execute repair
        vnodeRepairTask.execute();

        // Verify all sessions were attempted to start
        verify(mockSession1, timeout(1000)).start();
        verify(mockSession2, timeout(1000)).start();
        verify(mockSession3, timeout(1000)).start();

        // Verify session2 was marked as failed
        verify(mockSession2, timeout(1000)).finish(RepairStatus.FAILED);

        // Finish the repair task with failure status due to partial failure
        vnodeRepairTask.onFinish(RepairStatus.FAILED);

        // Verify all sessions are properly finished
        verify(mockSession1).finish(RepairStatus.FAILED);
        verify(mockSession3).finish(RepairStatus.FAILED);

        // Verify the failed ranges are tracked. A JUnit assertion is used instead of the
        // `assert` keyword so the check runs even when the JVM is started without -ea.
        Collection<LongTokenRange> failedRanges = vnodeRepairTask.getUnknownRanges();
        assertTrue("Failed range should be reported as unknown",
                failedRanges != null && failedRanges.contains(range2));
    }

    /**
     * Completes three ranges one at a time and checks the reported progress after each step
     * (roughly 1/3, 2/3 and 1.0, with a tolerance of 0.01).
     */
    @Test
    public void testParallelVnodeRepairProgress() {
        // Setup
        LongTokenRange range1 = new LongTokenRange(0, 100);
        LongTokenRange range2 = new LongTokenRange(100, 200);
        LongTokenRange range3 = new LongTokenRange(200, 300);

        stubParallelVnodeConfiguration();
        RepairHistoryService.RepairSession mockSession1 = registerRange(range1);
        RepairHistoryService.RepairSession mockSession2 = registerRange(range2);
        RepairHistoryService.RepairSession mockSession3 = registerRange(range3);

        // Create task
        vnodeRepairTask = createTask();

        // Execute repair
        vnodeRepairTask.execute();

        // Verify all sessions are started
        verify(mockSession1, timeout(1000)).start();
        verify(mockSession2, timeout(1000)).start();
        verify(mockSession3, timeout(1000)).start();

        // Simulate progress by completing repairs one by one
        vnodeRepairTask.onRangeFinished(range1, RepairStatus.SUCCESS);
        assertEquals("Progress should be ~33% after first range", 0.33d, vnodeRepairTask.getProgress(), 0.01d);

        vnodeRepairTask.onRangeFinished(range2, RepairStatus.SUCCESS);
        assertEquals("Progress should be ~66% after second range", 0.66d, vnodeRepairTask.getProgress(), 0.01d);

        vnodeRepairTask.onRangeFinished(range3, RepairStatus.SUCCESS);
        assertEquals("Progress should be 100% after all ranges", 1.0d, vnodeRepairTask.getProgress(), 0.01d);

        // Verify final state
        vnodeRepairTask.onFinish(RepairStatus.SUCCESS);
        verify(mockSession1).finish(RepairStatus.SUCCESS);
        verify(mockSession2).finish(RepairStatus.SUCCESS);
        verify(mockSession3).finish(RepairStatus.SUCCESS);
    }

    /** Stubs the repair type as PARALLEL_VNODE and lets the JMX proxy factory hand out the mock proxy. */
    private void stubParallelVnodeConfiguration() {
        when(mockRepairConfiguration.getRepairType()).thenReturn(RepairType.PARALLEL_VNODE);
        when(mockJmxProxyFactory.create(any())).thenReturn(mockJmxProxy);
    }

    /** Adds {@code range} to the ranges under test and stubs a dedicated mock session for exactly that range. */
    private RepairHistoryService.RepairSession registerRange(final LongTokenRange range) {
        tokenRanges.add(range);
        RepairHistoryService.RepairSession session = mock(RepairHistoryService.RepairSession.class);
        when(mockRepairHistory.newSession(eq(mockNode), eq(mockTableReference), eq(jobId), eq(range), eq(replicas)))
                .thenReturn(session);
        return session;
    }

    /** Builds the task under test from the mocks and the currently registered ranges and replicas. */
    private VnodeRepairTask createTask() {
        return new VnodeRepairTask(mockNode, mockJmxProxyFactory, mockTableReference,
                mockRepairConfiguration, mockTableRepairMetrics, mockRepairHistory,
                tokenRanges, replicas, jobId);
    }
}
Loading

0 comments on commit e81faf9

Please sign in to comment.