Added support for parallel vnode repair
- Iterates through all token ranges in the replica repair group
- Adds each range to the combinedRanges set
- Creates one vnode repair task covering all combinedRanges
- Creates one repair session per range in the collection and starts each session (see the sketch below)
sajid riaz committed Dec 17, 2024
1 parent 267d72c commit 6f52038
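
The bullet list above is the whole algorithm in compact form. Below is a minimal, self-contained sketch of that flow, assuming hypothetical stand-in types (TokenRange and Session here are placeholders, not the ecchronos classes); the production logic lives in RepairGroup.getRepairTasks and VnodeRepairTask, shown in the diff that follows.

import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Illustrative sketch only: TokenRange and Session are hypothetical stand-ins
// for ecchronos' LongTokenRange and its internal repair-session handling.
public final class ParallelVnodeSketch
{
    record TokenRange(long start, long end) { }

    interface Session
    {
        void start();
    }

    static void repair(final List<TokenRange> replicaGroupRanges)
    {
        // Steps 1-3: iterate the replica repair group's ranges and collect them
        // into one ordered, de-duplicated set that backs a single repair task.
        Set<TokenRange> combinedRanges = new LinkedHashSet<>(replicaGroupRanges);

        // Step 4: one session per combined range, started as soon as it is created.
        for (TokenRange range : combinedRanges)
        {
            Session session = () -> System.out.println("Repairing " + range);
            session.start();
        }
    }

    public static void main(final String[] args)
    {
        repair(List.of(new TokenRange(0, 1), new TokenRange(3, 4)));
    }
}

A LinkedHashSet keeps the deterministic range order that the new testGetCombinedRepairTask below asserts with containsExactlyElementsOf, while still de-duplicating any repeated ranges.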
Showing 4 changed files with 121 additions and 93 deletions.
@@ -42,6 +42,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -110,6 +111,11 @@ public RepairGroup(final int priority, final Builder builder)
myTokensPerRepair = Preconditions
.checkNotNull(builder.myTokensPerRepair, "Tokens per repair must be set");
}
if (RepairType.PARALLEL_VNODE.equals(myRepairConfiguration.getRepairType()))
{
myNode = Preconditions
.checkNotNull(builder.myNode, "Node must be set");
}
myJobId = Preconditions
.checkNotNull(builder.myJobId, "Job id must be set");
}
@@ -125,7 +131,6 @@ public boolean execute(final UUID nodeID)
{
LOG.debug("Table {} running repair job {}", myTableReference, myReplicaRepairGroup);
boolean successful = true;

for (RepairTask repairTask : getRepairTasks(nodeID))
{
if (!shouldContinue())
@@ -223,6 +228,14 @@ else if (myRepairConfiguration.getRepairType().equals(RepairType.VNODE))
}
}
}
else
{
Set<LongTokenRange> combinedRanges = new LinkedHashSet<>();
myReplicaRepairGroup.iterator().forEachRemaining(combinedRanges::add);
tasks.add(new VnodeRepairTask(myNode, myJmxProxyFactory, myTableReference, myRepairConfiguration,
myTableRepairMetrics, myRepairHistory, combinedRanges,
new HashSet<>(myReplicaRepairGroup.getReplicas()), myJobId));
}

return tasks;
}
@@ -167,13 +167,13 @@ final Collection<LongTokenRange> getUnknownRanges()
}

@VisibleForTesting
final Set<LongTokenRange> getTokenRanges()
public final Set<LongTokenRange> getTokenRanges()
{
return Sets.newLinkedHashSet(myTokenRanges);
}

@VisibleForTesting
final Set<DriverNode> getReplicas()
public final Set<DriverNode> getReplicas()
{
return Sets.newHashSet(myReplicas);
}
@@ -32,6 +32,7 @@
import com.datastax.oss.driver.api.core.metadata.Node;
import com.ericsson.bss.cassandra.ecchronos.core.impl.locks.DummyLock;
import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.incremental.IncrementalRepairTask;
import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.vnode.VnodeRepairTask;
import com.ericsson.bss.cassandra.ecchronos.core.jmx.DistributedJmxProxyFactory;
import com.ericsson.bss.cassandra.ecchronos.core.locks.LockFactory;
import com.ericsson.bss.cassandra.ecchronos.core.metadata.DriverNode;
@@ -53,6 +54,8 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
@@ -273,6 +276,46 @@ public void testExecuteSomeTasksFailed() throws ScheduledJobException
assertThat(success).isFalse();
}

@Test
public void testGetCombinedRepairTask()
{
LongTokenRange range1 = new LongTokenRange(0, 1);
LongTokenRange range2 = new LongTokenRange(3, 4);
LongTokenRange range3 = new LongTokenRange(6, 8);
LongTokenRange range4 = new LongTokenRange(10, 11);
Set<LongTokenRange> expectedTokenRanges = new LinkedHashSet<>(
ImmutableList.of(range1, range2, range3, range4)
);

// setup
DriverNode node = mockNode("DC1");

ImmutableSet<DriverNode> nodes = ImmutableSet.of(node);

ReplicaRepairGroup replicaRepairGroup = new ReplicaRepairGroup(nodes, ImmutableList.of(range1, range2, range3, range4), System.currentTimeMillis());

RepairConfiguration repairConfiguration = RepairConfiguration.newBuilder()
.withParallelism(RepairParallelism.PARALLEL)
.withRepairWarningTime(RUN_INTERVAL_IN_DAYS * 2, TimeUnit.DAYS)
.withRepairErrorTime(GC_GRACE_DAYS_IN_DAYS, TimeUnit.DAYS)
.withRepairType(RepairType.PARALLEL_VNODE)
.build();

RepairGroup repairGroup = builderFor(replicaRepairGroup).withRepairConfiguration(repairConfiguration).build(PRIORITY);

Collection<RepairTask> repairTasks = repairGroup.getRepairTasks(myNodeID);

assertThat(repairTasks).hasSize(1);
Iterator<RepairTask> iterator = repairTasks.iterator();
assertThat(iterator.hasNext()).isTrue();
VnodeRepairTask repairTask = (VnodeRepairTask) iterator.next();

assertThat(repairTask.getReplicas()).containsExactlyInAnyOrderElementsOf(nodes);
assertThat(repairTask.getTokenRanges()).containsExactlyElementsOf(expectedTokenRanges);
assertThat(repairTask.getTableReference()).isEqualTo(TABLE_REFERENCE);
assertThat(repairTask.getRepairConfiguration().getRepairParallelism()).isEqualTo(RepairParallelism.PARALLEL);
}

private RepairGroup.Builder builderFor(ReplicaRepairGroup replicaRepairGroup)
{
return RepairGroup.newBuilder()
@@ -38,33 +38,32 @@
import com.ericsson.bss.cassandra.ecchronos.core.repair.config.RepairConfiguration;
import com.ericsson.bss.cassandra.ecchronos.core.state.LongTokenRange;
import com.ericsson.bss.cassandra.ecchronos.core.state.RepairEntry;
import com.ericsson.bss.cassandra.ecchronos.core.state.RepairHistoryProvider;
import com.ericsson.bss.cassandra.ecchronos.core.state.ReplicationState;
import com.ericsson.bss.cassandra.ecchronos.core.state.TokenSubRangeUtil;
import com.ericsson.bss.cassandra.ecchronos.core.table.TableReference;
import com.ericsson.bss.cassandra.ecchronos.core.table.TableReferenceFactory;
import com.ericsson.bss.cassandra.ecchronos.core.table.TableRepairMetrics;
import com.ericsson.bss.cassandra.ecchronos.core.table.TableStorageStates;
import com.ericsson.bss.cassandra.ecchronos.data.repairhistory.CassandraRepairHistoryService;
import com.ericsson.bss.cassandra.ecchronos.data.repairhistory.RepairHistoryService;
import com.ericsson.bss.cassandra.ecchronos.fm.RepairFaultReporter;
import com.ericsson.bss.cassandra.ecchronos.utils.converter.UnitConverter;
import com.ericsson.bss.cassandra.ecchronos.utils.enums.repair.RepairParallelism;
import com.ericsson.bss.cassandra.ecchronos.utils.enums.repair.RepairStatus;
import com.ericsson.bss.cassandra.ecchronos.utils.enums.repair.RepairType;
import com.google.common.collect.Sets;
import net.jcip.annotations.NotThreadSafe;
import org.assertj.core.util.Lists;
import org.junit.After;
import org.junit.Ignore;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.math.BigInteger;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
@@ -92,28 +91,9 @@
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;

@Ignore
@RunWith (Parameterized.class)
@NotThreadSafe
public class ITSchedules extends TestBase
{
enum RepairHistoryType
{
CASSANDRA, ECC
}

@Parameterized.Parameters
public static Collection parameters()
{
return Arrays.asList(new Object[][] {
{ RepairHistoryType.CASSANDRA },
{ RepairHistoryType.ECC }
});
}

@Parameterized.Parameter
public RepairHistoryType myRepairHistoryType;

private static RepairFaultReporter mockFaultReporter;

private static TableRepairMetrics mockTableRepairMetrics;
Expand All @@ -128,7 +108,7 @@ public static Collection parameters()

private static DriverNode myLocalNode;

private static RepairHistoryProvider myRepairHistoryProvider;
private static RepairHistoryService myRepairHistoryService;

private static NodeResolver myNodeResolver;

Expand All @@ -144,8 +124,8 @@ public static Collection parameters()

private final Set<TableReference> myRepairs = new HashSet<>();

@Parameterized.BeforeParam
public static void init(RepairHistoryType repairHistoryType)
@BeforeClass
public static void init()
{
mockFaultReporter = mock(RepairFaultReporter.class);
mockTableRepairMetrics = mock(TableRepairMetrics.class);
Expand All @@ -165,21 +145,7 @@ public static void init(RepairHistoryType repairHistoryType)

ReplicationState replicationState = new ReplicationStateImpl(myNodeResolver, mySession);

RepairHistoryService repairHistoryService =
new RepairHistoryService(mySession, replicationState, myNodeResolver, TimeUnit.DAYS.toMillis(30));

if (repairHistoryType == RepairHistoryType.ECC)
{
myRepairHistoryProvider = repairHistoryService;
}
else if (repairHistoryType == RepairHistoryType.CASSANDRA)
{
myRepairHistoryProvider = new CassandraRepairHistoryService(myNodeResolver, mySession, TimeUnit.DAYS.toMillis(30));
}
else
{
throw new IllegalArgumentException("Unknown repair history type for test");
}
myRepairHistoryService = new RepairHistoryService(mySession, replicationState, myNodeResolver, TimeUnit.DAYS.toMillis(30));

myLockFactory = CASLockFactory.builder()
.withNativeConnectionProvider(getNativeConnectionProvider())
Expand All @@ -199,12 +165,10 @@ else if (repairHistoryType == RepairHistoryType.CASSANDRA)
.withHostStates(HostStatesImpl.builder()
.withJmxProxyFactory(getJmxProxyFactory())
.build())
.withRepairHistoryProvider(myRepairHistoryProvider)
.withRepairHistoryProvider(myRepairHistoryService)
.withTableRepairMetrics(mockTableRepairMetrics)
.build();

RepairHistoryService eccRepairHistory = new RepairHistoryService(mySession, replicationState, myNodeResolver, 2_592_000_000L);

myRepairSchedulerImpl = RepairSchedulerImpl.builder()
.withJmxProxyFactory(getJmxProxyFactory())
.withTableRepairMetrics(mockTableRepairMetrics)
Expand All @@ -213,9 +177,8 @@ else if (repairHistoryType == RepairHistoryType.CASSANDRA)
.withRepairStateFactory(repairStateFactory)
.withRepairLockType(RepairLockType.VNODE)
.withTableStorageStates(mockTableStorageStates)
.withRepairHistory(repairHistoryService)
.withReplicationState(replicationState)
.withRepairHistory(eccRepairHistory)
.withRepairHistory(myRepairHistoryService)
.build();

myRepairConfiguration = RepairConfiguration.newBuilder()
Expand Down Expand Up @@ -260,7 +223,7 @@ public void clean()
reset(mockTableStorageStates);
}

@Parameterized.AfterParam
@AfterClass
public static void closeConnections()
{
myHostStates.close();
Expand Down Expand Up @@ -294,6 +257,36 @@ public void repairSingleTable()
verify(mockFaultReporter, never()).raise(any(RepairFaultReporter.FaultCode.class), anyMap());
}

/**
* Create a table that is replicated and was repaired two hours ago.
* The repair factory should detect the new table automatically and schedule it to run as a parallel vnode repair.
*/
@Test
public void repairSingleTableInParallel()
{
long startTime = System.currentTimeMillis();

TableReference tableReference = myTableReferenceFactory.forTable(TEST_KEYSPACE, TEST_TABLE_ONE_NAME);

injectRepairHistory(tableReference, System.currentTimeMillis() - TimeUnit.HOURS.toMillis(2));

RepairConfiguration repairConfiguration = RepairConfiguration.newBuilder()
.withRepairInterval(60, TimeUnit.MINUTES)
.withRepairType(RepairType.PARALLEL_VNODE)
.withParallelism(RepairParallelism.PARALLEL)
.build();
schedule(tableReference, repairConfiguration);

await().pollInterval(1, TimeUnit.SECONDS)
.atMost(90, TimeUnit.SECONDS)
.until(() -> isRepairedSince(tableReference, startTime));

verifyTableRepairedSince(tableReference, startTime);
verifyRepairSessionMetrics(tableReference, 1); // Number of repair groups
verify(mockFaultReporter, never())
.raise(any(RepairFaultReporter.FaultCode.class), anyMap());
}

/**
* Create a table that is replicated and was repaired two hours ago using sub ranges.
* The repair factory should detect the new table automatically and schedule it to run. If the sub ranges are not
@@ -438,7 +431,7 @@ private OptionalLong lastRepairedSince(TableReference tableReference,
Set<LongTokenRange> expectedRepaired)
{
Set<LongTokenRange> expectedRepairedCopy = new HashSet<>(expectedRepaired);
Iterator<RepairEntry> repairEntryIterator = myRepairHistoryProvider.iterate(myLocalHost, tableReference,
Iterator<RepairEntry> repairEntryIterator = myRepairHistoryService.iterate(myLocalHost, tableReference,
System.currentTimeMillis(), repairedSince,
repairEntry -> fullyRepaired(repairEntry) && expectedRepairedCopy.remove(repairEntry.getRange()));

@@ -538,47 +531,26 @@ private void injectRepairHistory(TableReference tableReference,
{
long started_at = timestamp;
long finished_at = timestamp + 5;
Set<UUID> nodes = participants.stream()
.map(myNodeResolver::fromIp)
.filter(Optional::isPresent)
.map(Optional::get)
.map(DriverNode::getId)
.collect(Collectors.toSet());

SimpleStatement statement;

if (myRepairHistoryType == RepairHistoryType.CASSANDRA)
{
statement = QueryBuilder.insertInto("system_distributed", "repair_history")
.value("keyspace_name", literal(tableReference.getKeyspace()))
.value("columnfamily_name", literal(tableReference.getTable()))
.value("participants", literal(participants))
.value("coordinator", literal(myLocalNode.getPublicAddress()))
.value("id", literal(Uuids.startOf(started_at)))
.value("started_at", literal(Instant.ofEpochMilli(started_at)))
.value("finished_at", literal(Instant.ofEpochMilli(finished_at)))
.value("range_begin", literal(range_begin))
.value("range_end", literal(range_end))
.value("status", literal("SUCCESS"))
.build();
}
else
{
Set<UUID> nodes = participants.stream()
.map(myNodeResolver::fromIp)
.filter(Optional::isPresent)
.map(Optional::get)
.map(DriverNode::getId)
.collect(Collectors.toSet());

statement = QueryBuilder.insertInto("ecchronos", "repair_history")
.value("table_id", literal(tableReference.getId()))
.value("node_id", literal(myLocalNode.getId()))
.value("repair_id", literal(Uuids.startOf(finished_at)))
.value("job_id", literal(tableReference.getId()))
.value("coordinator_id", literal(myLocalNode.getId()))
.value("range_begin", literal(range_begin))
.value("range_end", literal(range_end))
.value("participants", literal(nodes))
.value("status", literal("SUCCESS"))
.value("started_at", literal(Instant.ofEpochMilli(started_at)))
.value("finished_at", literal(Instant.ofEpochMilli(finished_at)))
.build();
}
SimpleStatement statement = QueryBuilder.insertInto("ecchronos", "repair_history")
.value("table_id", literal(tableReference.getId()))
.value("node_id", literal(myLocalNode.getId()))
.value("repair_id", literal(Uuids.startOf(finished_at)))
.value("job_id", literal(tableReference.getId()))
.value("coordinator_id", literal(myLocalNode.getId()))
.value("range_begin", literal(range_begin))
.value("range_end", literal(range_end))
.value("participants", literal(nodes))
.value("status", literal("SUCCESS"))
.value("started_at", literal(Instant.ofEpochMilli(started_at)))
.value("finished_at", literal(Instant.ofEpochMilli(finished_at)))
.build();

mySession.execute(statement);
}
