Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support for parallel vnode repair #813

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -110,6 +111,11 @@ public RepairGroup(final int priority, final Builder builder)
myTokensPerRepair = Preconditions
.checkNotNull(builder.myTokensPerRepair, "Tokens per repair must be set");
}
if (RepairType.PARALLEL_VNODE.equals(myRepairConfiguration.getRepairType()))
{
myNode = Preconditions
.checkNotNull(builder.myNode, "Node must be set");
}
myJobId = Preconditions
.checkNotNull(builder.myJobId, "Job id must be set");
}
Expand All @@ -125,7 +131,6 @@ public boolean execute(final UUID nodeID)
{
LOG.debug("Table {} running repair job {}", myTableReference, myReplicaRepairGroup);
boolean successful = true;

for (RepairTask repairTask : getRepairTasks(nodeID))
{
if (!shouldContinue())
Expand Down Expand Up @@ -223,6 +228,14 @@ else if (myRepairConfiguration.getRepairType().equals(RepairType.VNODE))
}
}
}
else
{
Set<LongTokenRange> combinedRanges = new LinkedHashSet<>();
myReplicaRepairGroup.iterator().forEachRemaining(combinedRanges::add);
tasks.add(new VnodeRepairTask(myNode, myJmxProxyFactory, myTableReference, myRepairConfiguration,
myTableRepairMetrics, myRepairHistory, combinedRanges,
new HashSet<>(myReplicaRepairGroup.getReplicas()), myJobId));
}

return tasks;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,13 +167,13 @@ final Collection<LongTokenRange> getUnknownRanges()
}

@VisibleForTesting
final Set<LongTokenRange> getTokenRanges()
public final Set<LongTokenRange> getTokenRanges()
{
return Sets.newLinkedHashSet(myTokenRanges);
}

@VisibleForTesting
final Set<DriverNode> getReplicas()
public final Set<DriverNode> getReplicas()
{
return Sets.newHashSet(myReplicas);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.datastax.oss.driver.api.core.metadata.Node;
import com.ericsson.bss.cassandra.ecchronos.core.impl.locks.DummyLock;
import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.incremental.IncrementalRepairTask;
import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.vnode.VnodeRepairTask;
import com.ericsson.bss.cassandra.ecchronos.core.jmx.DistributedJmxProxyFactory;
import com.ericsson.bss.cassandra.ecchronos.core.locks.LockFactory;
import com.ericsson.bss.cassandra.ecchronos.core.metadata.DriverNode;
Expand All @@ -53,6 +54,8 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
Expand Down Expand Up @@ -273,6 +276,46 @@ public void testExecuteSomeTasksFailed() throws ScheduledJobException
assertThat(success).isFalse();
}

@Test
public void testGetCombinedRepairTask()
{
    // Four disjoint vnode ranges; with PARALLEL_VNODE repair type they must be
    // combined into a single VnodeRepairTask instead of one task per range.
    LongTokenRange range1 = new LongTokenRange(0, 1);
    LongTokenRange range2 = new LongTokenRange(3, 4);
    LongTokenRange range3 = new LongTokenRange(6, 8);
    LongTokenRange range4 = new LongTokenRange(10, 11);
    // Diamond operator keeps this type-safe (no raw-type unchecked warning);
    // LinkedHashSet preserves the expected iteration order for the ordered assertion below.
    Set<LongTokenRange> expectedTokenRanges = new LinkedHashSet<>(
            ImmutableList.of(range1, range2, range3, range4)
    );

    // setup
    DriverNode node = mockNode("DC1");

    ImmutableSet<DriverNode> nodes = ImmutableSet.of(node);

    ReplicaRepairGroup replicaRepairGroup = new ReplicaRepairGroup(nodes, ImmutableList.of(range1, range2, range3, range4), System.currentTimeMillis());

    RepairConfiguration repairConfiguration = RepairConfiguration.newBuilder()
            .withParallelism(RepairParallelism.PARALLEL)
            .withRepairWarningTime(RUN_INTERVAL_IN_DAYS * 2, TimeUnit.DAYS)
            .withRepairErrorTime(GC_GRACE_DAYS_IN_DAYS, TimeUnit.DAYS)
            .withRepairType(RepairType.PARALLEL_VNODE)
            .build();

    RepairGroup repairGroup = builderFor(replicaRepairGroup).withRepairConfiguration(repairConfiguration).build(PRIORITY);

    Collection<RepairTask> repairTasks = repairGroup.getRepairTasks(myNodeID);

    // Exactly one combined task is expected for the whole replica group.
    assertThat(repairTasks).hasSize(1);
    Iterator<RepairTask> iterator = repairTasks.iterator();
    assertThat(iterator.hasNext()).isTrue();
    VnodeRepairTask repairTask = (VnodeRepairTask) iterator.next();

    assertThat(repairTask.getReplicas()).containsExactlyInAnyOrderElementsOf(nodes);
    // Order matters here: the combined task must keep the ranges in insertion order.
    assertThat(repairTask.getTokenRanges()).containsExactlyElementsOf(expectedTokenRanges);
    assertThat(repairTask.getTableReference()).isEqualTo(TABLE_REFERENCE);
    assertThat(repairTask.getRepairConfiguration().getRepairParallelism()).isEqualTo(RepairParallelism.PARALLEL);
}

private RepairGroup.Builder builderFor(ReplicaRepairGroup replicaRepairGroup)
{
return RepairGroup.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,33 +38,32 @@
import com.ericsson.bss.cassandra.ecchronos.core.repair.config.RepairConfiguration;
import com.ericsson.bss.cassandra.ecchronos.core.state.LongTokenRange;
import com.ericsson.bss.cassandra.ecchronos.core.state.RepairEntry;
import com.ericsson.bss.cassandra.ecchronos.core.state.RepairHistoryProvider;
import com.ericsson.bss.cassandra.ecchronos.core.state.ReplicationState;
import com.ericsson.bss.cassandra.ecchronos.core.state.TokenSubRangeUtil;
import com.ericsson.bss.cassandra.ecchronos.core.table.TableReference;
import com.ericsson.bss.cassandra.ecchronos.core.table.TableReferenceFactory;
import com.ericsson.bss.cassandra.ecchronos.core.table.TableRepairMetrics;
import com.ericsson.bss.cassandra.ecchronos.core.table.TableStorageStates;
import com.ericsson.bss.cassandra.ecchronos.data.repairhistory.CassandraRepairHistoryService;
import com.ericsson.bss.cassandra.ecchronos.data.repairhistory.RepairHistoryService;
import com.ericsson.bss.cassandra.ecchronos.fm.RepairFaultReporter;
import com.ericsson.bss.cassandra.ecchronos.utils.converter.UnitConverter;
import com.ericsson.bss.cassandra.ecchronos.utils.enums.repair.RepairParallelism;
import com.ericsson.bss.cassandra.ecchronos.utils.enums.repair.RepairStatus;
import com.ericsson.bss.cassandra.ecchronos.utils.enums.repair.RepairType;
import com.google.common.collect.Sets;
import net.jcip.annotations.NotThreadSafe;
import org.assertj.core.util.Lists;
import org.junit.After;
import org.junit.Ignore;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.math.BigInteger;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
Expand Down Expand Up @@ -92,28 +91,9 @@
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;

@Ignore
@RunWith (Parameterized.class)
@NotThreadSafe
public class ITSchedules extends TestBase
{
enum RepairHistoryType
{
CASSANDRA, ECC
}

@Parameterized.Parameters
public static Collection parameters()
{
return Arrays.asList(new Object[][] {
{ RepairHistoryType.CASSANDRA },
{ RepairHistoryType.ECC }
});
}

@Parameterized.Parameter
public RepairHistoryType myRepairHistoryType;

private static RepairFaultReporter mockFaultReporter;

private static TableRepairMetrics mockTableRepairMetrics;
Expand All @@ -128,7 +108,7 @@ public static Collection parameters()

private static DriverNode myLocalNode;

private static RepairHistoryProvider myRepairHistoryProvider;
private static RepairHistoryService myRepairHistoryService;

private static NodeResolver myNodeResolver;

Expand All @@ -144,8 +124,8 @@ public static Collection parameters()

private final Set<TableReference> myRepairs = new HashSet<>();

@Parameterized.BeforeParam
public static void init(RepairHistoryType repairHistoryType)
@BeforeClass
public static void init()
{
mockFaultReporter = mock(RepairFaultReporter.class);
mockTableRepairMetrics = mock(TableRepairMetrics.class);
Expand All @@ -165,21 +145,7 @@ public static void init(RepairHistoryType repairHistoryType)

ReplicationState replicationState = new ReplicationStateImpl(myNodeResolver, mySession);

RepairHistoryService repairHistoryService =
new RepairHistoryService(mySession, replicationState, myNodeResolver, TimeUnit.DAYS.toMillis(30));

if (repairHistoryType == RepairHistoryType.ECC)
{
myRepairHistoryProvider = repairHistoryService;
}
else if (repairHistoryType == RepairHistoryType.CASSANDRA)
{
myRepairHistoryProvider = new CassandraRepairHistoryService(myNodeResolver, mySession, TimeUnit.DAYS.toMillis(30));
}
else
{
throw new IllegalArgumentException("Unknown repair history type for test");
}
myRepairHistoryService = new RepairHistoryService(mySession, replicationState, myNodeResolver, TimeUnit.DAYS.toMillis(30));

myLockFactory = CASLockFactory.builder()
.withNativeConnectionProvider(getNativeConnectionProvider())
Expand All @@ -199,12 +165,10 @@ else if (repairHistoryType == RepairHistoryType.CASSANDRA)
.withHostStates(HostStatesImpl.builder()
.withJmxProxyFactory(getJmxProxyFactory())
.build())
.withRepairHistoryProvider(myRepairHistoryProvider)
.withRepairHistoryProvider(myRepairHistoryService)
.withTableRepairMetrics(mockTableRepairMetrics)
.build();

RepairHistoryService eccRepairHistory = new RepairHistoryService(mySession, replicationState, myNodeResolver, 2_592_000_000L);

myRepairSchedulerImpl = RepairSchedulerImpl.builder()
.withJmxProxyFactory(getJmxProxyFactory())
.withTableRepairMetrics(mockTableRepairMetrics)
Expand All @@ -213,9 +177,8 @@ else if (repairHistoryType == RepairHistoryType.CASSANDRA)
.withRepairStateFactory(repairStateFactory)
.withRepairLockType(RepairLockType.VNODE)
.withTableStorageStates(mockTableStorageStates)
.withRepairHistory(repairHistoryService)
.withReplicationState(replicationState)
.withRepairHistory(eccRepairHistory)
.withRepairHistory(myRepairHistoryService)
.build();

myRepairConfiguration = RepairConfiguration.newBuilder()
Expand Down Expand Up @@ -260,7 +223,7 @@ public void clean()
reset(mockTableStorageStates);
}

@Parameterized.AfterParam
@AfterClass
public static void closeConnections()
{
myHostStates.close();
Expand Down Expand Up @@ -294,6 +257,36 @@ public void repairSingleTable()
verify(mockFaultReporter, never()).raise(any(RepairFaultReporter.FaultCode.class), anyMap());
}

/**
 * Create a table that is replicated and was repaired two hours ago.
 * The repair factory should detect the new table automatically and schedule it to run.
 */
@Test
public void repairSingleTableInParallel()
{
    long startTime = System.currentTimeMillis();

    // Parallel-vnode configuration: all vnode ranges of a replica group repaired as one session.
    RepairConfiguration parallelVnodeConfiguration = RepairConfiguration.newBuilder()
            .withRepairInterval(60, TimeUnit.MINUTES)
            .withRepairType(RepairType.PARALLEL_VNODE)
            .withParallelism(RepairParallelism.PARALLEL)
            .build();

    TableReference tableReference = myTableReferenceFactory.forTable(TEST_KEYSPACE, TEST_TABLE_ONE_NAME);

    // Seed history so the table looks last-repaired two hours ago and becomes eligible.
    injectRepairHistory(tableReference, System.currentTimeMillis() - TimeUnit.HOURS.toMillis(2));

    schedule(tableReference, parallelVnodeConfiguration);

    // Poll until the scheduler has actually repaired the table since this test started.
    await().pollInterval(1, TimeUnit.SECONDS)
            .atMost(90, TimeUnit.SECONDS)
            .until(() -> isRepairedSince(tableReference, startTime));

    verifyTableRepairedSince(tableReference, startTime);
    verifyRepairSessionMetrics(tableReference, 1); // Amount of repair groups
    verify(mockFaultReporter, never())
            .raise(any(RepairFaultReporter.FaultCode.class), anyMap());
}

/**
* Create a table that is replicated and was repaired two hours ago using sub ranges.
* The repair factory should detect the new table automatically and schedule it to run. If the sub ranges are not
Expand Down Expand Up @@ -438,7 +431,7 @@ private OptionalLong lastRepairedSince(TableReference tableReference,
Set<LongTokenRange> expectedRepaired)
{
Set<LongTokenRange> expectedRepairedCopy = new HashSet<>(expectedRepaired);
Iterator<RepairEntry> repairEntryIterator = myRepairHistoryProvider.iterate(myLocalHost, tableReference,
Iterator<RepairEntry> repairEntryIterator = myRepairHistoryService.iterate(myLocalHost, tableReference,
System.currentTimeMillis(), repairedSince,
repairEntry -> fullyRepaired(repairEntry) && expectedRepairedCopy.remove(repairEntry.getRange()));

Expand Down Expand Up @@ -538,47 +531,26 @@ private void injectRepairHistory(TableReference tableReference,
{
long started_at = timestamp;
long finished_at = timestamp + 5;
Set<UUID> nodes = participants.stream()
.map(myNodeResolver::fromIp)
.filter(Optional::isPresent)
.map(Optional::get)
.map(DriverNode::getId)
.collect(Collectors.toSet());

SimpleStatement statement;

if (myRepairHistoryType == RepairHistoryType.CASSANDRA)
{
statement = QueryBuilder.insertInto("system_distributed", "repair_history")
.value("keyspace_name", literal(tableReference.getKeyspace()))
.value("columnfamily_name", literal(tableReference.getTable()))
.value("participants", literal(participants))
.value("coordinator", literal(myLocalNode.getPublicAddress()))
.value("id", literal(Uuids.startOf(started_at)))
.value("started_at", literal(Instant.ofEpochMilli(started_at)))
.value("finished_at", literal(Instant.ofEpochMilli(finished_at)))
.value("range_begin", literal(range_begin))
.value("range_end", literal(range_end))
.value("status", literal("SUCCESS"))
.build();
}
else
{
Set<UUID> nodes = participants.stream()
.map(myNodeResolver::fromIp)
.filter(Optional::isPresent)
.map(Optional::get)
.map(DriverNode::getId)
.collect(Collectors.toSet());

statement = QueryBuilder.insertInto("ecchronos", "repair_history")
.value("table_id", literal(tableReference.getId()))
.value("node_id", literal(myLocalNode.getId()))
.value("repair_id", literal(Uuids.startOf(finished_at)))
.value("job_id", literal(tableReference.getId()))
.value("coordinator_id", literal(myLocalNode.getId()))
.value("range_begin", literal(range_begin))
.value("range_end", literal(range_end))
.value("participants", literal(nodes))
.value("status", literal("SUCCESS"))
.value("started_at", literal(Instant.ofEpochMilli(started_at)))
.value("finished_at", literal(Instant.ofEpochMilli(finished_at)))
.build();
}
SimpleStatement statement = QueryBuilder.insertInto("ecchronos", "repair_history")
.value("table_id", literal(tableReference.getId()))
.value("node_id", literal(myLocalNode.getId()))
.value("repair_id", literal(Uuids.startOf(finished_at)))
.value("job_id", literal(tableReference.getId()))
.value("coordinator_id", literal(myLocalNode.getId()))
.value("range_begin", literal(range_begin))
.value("range_end", literal(range_end))
.value("participants", literal(nodes))
.value("status", literal("SUCCESS"))
.value("started_at", literal(Instant.ofEpochMilli(started_at)))
.value("finished_at", literal(Instant.ofEpochMilli(finished_at)))
.build();

mySession.execute(statement);
}
Expand Down