From 664236b0795ffa601a28f765dcb5bc499ba8f76e Mon Sep 17 00:00:00 2001 From: Alexander Dejanovski Date: Wed, 18 Dec 2024 07:31:42 +0100 Subject: [PATCH] Memory store segment leader election (#1533) The memory storage backend didn't implement any if the IDistributedStorage leader election methods. This was ok until we started using leader election for segment scheduling, which doesn't require to run in a distributed mode. As a consequence, Reaper using the mem store would schedule one new segment at each poll, even if the replicas were already busy processing another segment. This PR introduces a class which manages locks on replicas for segments and moves the required methods from IDistributedStorage to IStorage so they can be implemented in the memory storage implementation. --- .../service/RepairManager.java | 4 +- .../service/SegmentRunner.java | 23 +-- .../storage/IDistributedStorage.java | 18 -- .../cassandrareaper/storage/IStorageDao.java | 20 ++ .../storage/MemoryStorageFacade.java | 41 ++++- .../memory/ReplicaLockManagerWithTtl.java | 172 ++++++++++++++++++ .../acceptance/BasicSteps.java | 1 + .../acceptance/ReaperMetricsIT.java | 28 +++ .../service/RepairManagerTest.java | 5 +- .../service/RepairRunnerHangingTest.java | 7 +- .../service/RepairRunnerTest.java | 21 ++- .../service/SegmentRunnerTest.java | 9 + .../memory/ReplicaLockManagerWithTtlTest.java | 97 ++++++++++ .../cassandra-reaper-metrics-test.yaml | 2 +- 14 files changed, 395 insertions(+), 53 deletions(-) create mode 100644 src/server/src/main/java/io/cassandrareaper/storage/memory/ReplicaLockManagerWithTtl.java create mode 100644 src/server/src/test/java/io/cassandrareaper/storage/memory/ReplicaLockManagerWithTtlTest.java diff --git a/src/server/src/main/java/io/cassandrareaper/service/RepairManager.java b/src/server/src/main/java/io/cassandrareaper/service/RepairManager.java index fdf5df234..63f461eb2 100644 --- a/src/server/src/main/java/io/cassandrareaper/service/RepairManager.java +++ b/src/server/src/main/java/io/cassandrareaper/service/RepairManager.java @@ -308,9 +308,7 @@ private void abortSegmentsWithNoLeaderNonIncremental(RepairRun repairRun, Collec if (context.storage instanceof IDistributedStorage || !repairRunners.containsKey(repairRun.getId())) { // When multiple Reapers are in use, we can get stuck segments when one instance is rebooted // Any segment in RUNNING or STARTED state but with no leader should be killed - Set leaders = context.storage instanceof IDistributedStorage - ? ((IDistributedStorage) context.storage).getLockedSegmentsForRun(repairRun.getId()) - : Collections.emptySet(); + Set leaders = context.storage.getLockedSegmentsForRun(repairRun.getId()); Collection orphanedSegments = runningSegments .stream() diff --git a/src/server/src/main/java/io/cassandrareaper/service/SegmentRunner.java b/src/server/src/main/java/io/cassandrareaper/service/SegmentRunner.java index 68bf551e8..5bd30fe99 100644 --- a/src/server/src/main/java/io/cassandrareaper/service/SegmentRunner.java +++ b/src/server/src/main/java/io/cassandrareaper/service/SegmentRunner.java @@ -869,10 +869,8 @@ private boolean takeLead(RepairSegment segment) { ? ((IDistributedStorage) context.storage).takeLead(leaderElectionId) : true; } else { - result = context.storage instanceof IDistributedStorage - ? ((IDistributedStorage) context.storage).lockRunningRepairsForNodes(this.repairRunner.getRepairRunId(), - segment.getId(), segment.getReplicas().keySet()) - : true; + result = context.storage.lockRunningRepairsForNodes(this.repairRunner.getRepairRunId(), + segment.getId(), segment.getReplicas().keySet()); } if (!result) { context.metricRegistry.counter(MetricRegistry.name(SegmentRunner.class, "takeLead", "failed")).inc(); @@ -895,10 +893,8 @@ private boolean renewLead(RepairSegment segment) { } return result; } else { - boolean resultLock2 = context.storage instanceof IDistributedStorage - ? ((IDistributedStorage) context.storage).renewRunningRepairsForNodes(this.repairRunner.getRepairRunId(), - segment.getId(), segment.getReplicas().keySet()) - : true; + boolean resultLock2 = context.storage.renewRunningRepairsForNodes(this.repairRunner.getRepairRunId(), + segment.getId(), segment.getReplicas().keySet()); if (!resultLock2) { context.metricRegistry.counter(MetricRegistry.name(SegmentRunner.class, "renewLead", "failed")).inc(); releaseLead(segment); @@ -912,13 +908,14 @@ private boolean renewLead(RepairSegment segment) { private void releaseLead(RepairSegment segment) { try (Timer.Context cx = context.metricRegistry.timer(MetricRegistry.name(SegmentRunner.class, "releaseLead")).time()) { - if (context.storage instanceof IDistributedStorage) { - if (repairUnit.getIncrementalRepair() && !repairUnit.getSubrangeIncrementalRepair()) { + + if (repairUnit.getIncrementalRepair() && !repairUnit.getSubrangeIncrementalRepair()) { + if (context.storage instanceof IDistributedStorage) { ((IDistributedStorage) context.storage).releaseLead(leaderElectionId); - } else { - ((IDistributedStorage) context.storage).releaseRunningRepairsForNodes(this.repairRunner.getRepairRunId(), - segment.getId(), segment.getReplicas().keySet()); } + } else { + context.storage.releaseRunningRepairsForNodes(this.repairRunner.getRepairRunId(), + segment.getId(), segment.getReplicas().keySet()); } } } diff --git a/src/server/src/main/java/io/cassandrareaper/storage/IDistributedStorage.java b/src/server/src/main/java/io/cassandrareaper/storage/IDistributedStorage.java index 298bf825b..fc48dd6b9 100644 --- a/src/server/src/main/java/io/cassandrareaper/storage/IDistributedStorage.java +++ b/src/server/src/main/java/io/cassandrareaper/storage/IDistributedStorage.java @@ -41,7 +41,6 @@ import io.cassandrareaper.storage.operations.IOperationsDao; import java.util.List; -import java.util.Set; import java.util.UUID; @@ -62,23 +61,6 @@ public interface IDistributedStorage extends IDistributedMetrics { void releaseLead(UUID leaderId); - boolean lockRunningRepairsForNodes( - UUID repairId, - UUID segmentId, - Set replicas); - - boolean renewRunningRepairsForNodes( - UUID repairId, - UUID segmentId, - Set replicas); - - boolean releaseRunningRepairsForNodes( - UUID repairId, - UUID segmentId, - Set replicas); - - Set getLockedSegmentsForRun(UUID runId); - int countRunningReapers(); List getRunningReapers(); diff --git a/src/server/src/main/java/io/cassandrareaper/storage/IStorageDao.java b/src/server/src/main/java/io/cassandrareaper/storage/IStorageDao.java index 98e879371..7157ae995 100644 --- a/src/server/src/main/java/io/cassandrareaper/storage/IStorageDao.java +++ b/src/server/src/main/java/io/cassandrareaper/storage/IStorageDao.java @@ -26,6 +26,9 @@ import io.cassandrareaper.storage.repairunit.IRepairUnitDao; import io.cassandrareaper.storage.snapshot.ISnapshotDao; +import java.util.Set; +import java.util.UUID; + import io.dropwizard.lifecycle.Managed; /** @@ -34,6 +37,23 @@ public interface IStorageDao extends Managed, IMetricsDao { + boolean lockRunningRepairsForNodes( + UUID repairId, + UUID segmentId, + Set replicas); + + boolean renewRunningRepairsForNodes( + UUID repairId, + UUID segmentId, + Set replicas); + + boolean releaseRunningRepairsForNodes( + UUID repairId, + UUID segmentId, + Set replicas); + + Set getLockedSegmentsForRun(UUID runId); + boolean isStorageConnected(); IEventsDao getEventsDao(); diff --git a/src/server/src/main/java/io/cassandrareaper/storage/MemoryStorageFacade.java b/src/server/src/main/java/io/cassandrareaper/storage/MemoryStorageFacade.java index f5b775fd4..84facac48 100644 --- a/src/server/src/main/java/io/cassandrareaper/storage/MemoryStorageFacade.java +++ b/src/server/src/main/java/io/cassandrareaper/storage/MemoryStorageFacade.java @@ -29,6 +29,7 @@ import io.cassandrareaper.storage.events.IEventsDao; import io.cassandrareaper.storage.events.MemoryEventsDao; import io.cassandrareaper.storage.memory.MemoryStorageRoot; +import io.cassandrareaper.storage.memory.ReplicaLockManagerWithTtl; import io.cassandrareaper.storage.metrics.MemoryMetricsDao; import io.cassandrareaper.storage.repairrun.IRepairRunDao; import io.cassandrareaper.storage.repairrun.MemoryRepairRunDao; @@ -46,9 +47,11 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.UUID; import java.util.stream.Collectors; +import com.google.common.io.Files; import org.eclipse.serializer.persistence.types.PersistenceFieldEvaluator; import org.eclipse.store.storage.embedded.types.EmbeddedStorage; import org.eclipse.store.storage.embedded.types.EmbeddedStorageManager; @@ -61,8 +64,9 @@ */ public final class MemoryStorageFacade implements IStorageDao { + // Default time to live of leads taken on a segment + private static final long DEFAULT_LEAD_TTL = 90_000; private static final Logger LOG = LoggerFactory.getLogger(MemoryStorageFacade.class); - /** Field evaluator to find transient attributes. This is needed to deal with persisting Guava collections objects * that sometimes use the transient keyword for some of their implementation's backing stores**/ private static final PersistenceFieldEvaluator TRANSIENT_FIELD_EVALUATOR = @@ -85,8 +89,9 @@ public final class MemoryStorageFacade implements IStorageDao { ); private final MemorySnapshotDao memSnapshotDao = new MemorySnapshotDao(); private final MemoryMetricsDao memMetricsDao = new MemoryMetricsDao(); + private final ReplicaLockManagerWithTtl replicaLockManagerWithTtl; - public MemoryStorageFacade(String persistenceStoragePath) { + public MemoryStorageFacade(String persistenceStoragePath, long leadTime) { LOG.info("Using memory storage backend. Persistence storage path: {}", persistenceStoragePath); this.embeddedStorage = EmbeddedStorage.Foundation(Paths.get(persistenceStoragePath)) .onConnectionFoundation( @@ -103,10 +108,19 @@ public MemoryStorageFacade(String persistenceStoragePath) { LOG.info("Loading existing data from persistence storage"); this.memoryStorageRoot = (MemoryStorageRoot) this.embeddedStorage.root(); } + this.replicaLockManagerWithTtl = new ReplicaLockManagerWithTtl(leadTime); } public MemoryStorageFacade() { - this("/tmp/" + UUID.randomUUID().toString()); + this(Files.createTempDir().getAbsolutePath(), DEFAULT_LEAD_TTL); + } + + public MemoryStorageFacade(String persistenceStoragePath) { + this(persistenceStoragePath, DEFAULT_LEAD_TTL); + } + + public MemoryStorageFacade(long leadTime) { + this(Files.createTempDir().getAbsolutePath(), leadTime); } @Override @@ -296,4 +310,25 @@ public Collection getRepairSegmentsByRunId(UUID runId) { public Map getSubscriptionsById() { return this.memoryStorageRoot.getSubscriptionsById(); } + + @Override + public boolean lockRunningRepairsForNodes(UUID runId, UUID segmentId, Set replicas) { + return replicaLockManagerWithTtl.lockRunningRepairsForNodes(runId, segmentId, replicas); + } + + @Override + public boolean renewRunningRepairsForNodes(UUID runId, UUID segmentId, Set replicas) { + return replicaLockManagerWithTtl.renewRunningRepairsForNodes(runId, segmentId, replicas); + } + + @Override + public boolean releaseRunningRepairsForNodes(UUID runId, UUID segmentId, Set replicas) { + LOG.info("Releasing locks for runId: {}, segmentId: {}, replicas: {}", runId, segmentId, replicas); + return replicaLockManagerWithTtl.releaseRunningRepairsForNodes(runId, segmentId, replicas); + } + + @Override + public Set getLockedSegmentsForRun(UUID runId) { + return replicaLockManagerWithTtl.getLockedSegmentsForRun(runId); + } } diff --git a/src/server/src/main/java/io/cassandrareaper/storage/memory/ReplicaLockManagerWithTtl.java b/src/server/src/main/java/io/cassandrareaper/storage/memory/ReplicaLockManagerWithTtl.java new file mode 100644 index 000000000..c614a1621 --- /dev/null +++ b/src/server/src/main/java/io/cassandrareaper/storage/memory/ReplicaLockManagerWithTtl.java @@ -0,0 +1,172 @@ +/* + * Copyright 2024-2024 DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.cassandrareaper.storage.memory; + +import java.util.Collections; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import com.google.common.annotations.VisibleForTesting; + +public class ReplicaLockManagerWithTtl { + + private final ConcurrentHashMap replicaLocks = new ConcurrentHashMap<>(); + private final ConcurrentHashMap> repairRunToSegmentLocks = new ConcurrentHashMap<>(); + private final Lock lock = new ReentrantLock(); + + private final long ttlMilliSeconds; + + public ReplicaLockManagerWithTtl(long ttlMilliSeconds) { + this.ttlMilliSeconds = ttlMilliSeconds; + // Schedule cleanup of expired locks + ScheduledExecutorService lockCleanupScheduler = Executors.newScheduledThreadPool(1); + lockCleanupScheduler.scheduleAtFixedRate(this::cleanupExpiredLocks, 1, 1, TimeUnit.SECONDS); + } + + private String getReplicaLockKey(String replica, UUID runId) { + return replica + runId; + } + + public boolean lockRunningRepairsForNodes(UUID runId, UUID segmentId, Set replicas) { + lock.lock(); + try { + long currentTime = System.currentTimeMillis(); + // Check if any replica is already locked by another runId + boolean anyReplicaLocked = replicas.stream() + .map(replica -> replicaLocks.get(getReplicaLockKey(replica, runId))) + .anyMatch(lockInfo -> lockInfo != null + && lockInfo.expirationTime > currentTime && lockInfo.runId.equals(runId)); + + if (anyReplicaLocked) { + return false; // Replica is locked by another runId and not expired + } + + // Lock the replicas for the given runId and segmentId + long expirationTime = currentTime + ttlMilliSeconds; + replicas.forEach(replica -> + replicaLocks.put(getReplicaLockKey(replica, runId), new LockInfo(runId, expirationTime)) + ); + + // Update runId to segmentId mapping + repairRunToSegmentLocks.computeIfAbsent(runId, k -> ConcurrentHashMap.newKeySet()).add(segmentId); + return true; + } finally { + lock.unlock(); + } + } + + public boolean renewRunningRepairsForNodes(UUID runId, UUID segmentId, Set replicas) { + lock.lock(); + try { + long currentTime = System.currentTimeMillis(); + + // Check if all replicas are already locked by this runId + boolean allReplicasLocked = replicas.stream() + .map(replica -> replicaLocks.get(getReplicaLockKey(replica, runId))) + .allMatch(lockInfo -> lockInfo != null && lockInfo.runId.equals(runId) + && lockInfo.expirationTime > currentTime); + + if (!allReplicasLocked) { + return false; // Some replica is not validly locked by this runId + } + + // Renew the lock by extending the expiration time + long newExpirationTime = currentTime + ttlMilliSeconds; + replicas.forEach(replica -> + replicaLocks.put(getReplicaLockKey(replica, runId), new LockInfo(runId, newExpirationTime)) + ); + + // Ensure the segmentId is linked to the runId + repairRunToSegmentLocks.computeIfAbsent(runId, k -> ConcurrentHashMap.newKeySet()).add(segmentId); + return true; + } finally { + lock.unlock(); + } + } + + public boolean releaseRunningRepairsForNodes(UUID runId, UUID segmentId, Set replicas) { + lock.lock(); + try { + // Remove the lock for replicas + replicas.stream() + .map(replica -> getReplicaLockKey(replica, runId)) + .map(replicaLocks::get) + .filter(lockInfo -> lockInfo != null && lockInfo.runId.equals(runId)) + .forEach(lockInfo -> replicaLocks.remove(getReplicaLockKey(lockInfo.runId.toString(), runId))); + + // Remove the segmentId from the runId mapping + Set segments = repairRunToSegmentLocks.get(runId); + if (segments != null) { + segments.remove(segmentId); + if (segments.isEmpty()) { + repairRunToSegmentLocks.remove(runId); + } + } + return true; + } finally { + lock.unlock(); + } + } + + public Set getLockedSegmentsForRun(UUID runId) { + return repairRunToSegmentLocks.getOrDefault(runId, Collections.emptySet()); + } + + @VisibleForTesting + public void cleanupExpiredLocks() { + lock.lock(); + try { + long currentTime = System.currentTimeMillis(); + + // Remove expired locks from replicaLocks + replicaLocks.entrySet().removeIf(entry -> entry.getValue().expirationTime <= currentTime); + + // Clean up runToSegmentLocks by removing segments with no active replicas + repairRunToSegmentLocks.entrySet().removeIf(entry -> { + UUID runId = entry.getKey(); + Set segments = entry.getValue(); + + // Retain only active segments + segments.removeIf(segmentId -> { + boolean active = replicaLocks.values().stream() + .anyMatch(info -> info.runId.equals(runId)); + return !active; + }); + return segments.isEmpty(); + }); + } finally { + lock.unlock(); + } + } + + // Class to store lock information + private static class LockInfo { + UUID runId; + long expirationTime; + + LockInfo(UUID runId, long expirationTime) { + this.runId = runId; + this.expirationTime = expirationTime; + } + } +} \ No newline at end of file diff --git a/src/server/src/test/java/io/cassandrareaper/acceptance/BasicSteps.java b/src/server/src/test/java/io/cassandrareaper/acceptance/BasicSteps.java index d1cf856b8..2f8eec8dd 100644 --- a/src/server/src/test/java/io/cassandrareaper/acceptance/BasicSteps.java +++ b/src/server/src/test/java/io/cassandrareaper/acceptance/BasicSteps.java @@ -562,6 +562,7 @@ public void a_new_daily_repair_schedule_is_added_for_the_last_added_cluster_and_ params.put("intensity", "0.9"); params.put("scheduleDaysBetween", "1"); params.put("scheduleTriggerTime", DateTime.now().plusSeconds(1).toString()); + params.put("segmentCountPerNode", "1"); ReaperTestJettyRunner runner = RUNNERS.get(RAND.nextInt(RUNNERS.size())); Response response = runner.callReaper("POST", "/repair_schedule", Optional.of(params)); int responseStatus = response.getStatus(); diff --git a/src/server/src/test/java/io/cassandrareaper/acceptance/ReaperMetricsIT.java b/src/server/src/test/java/io/cassandrareaper/acceptance/ReaperMetricsIT.java index 9e2ddcbed..dd4db2995 100644 --- a/src/server/src/test/java/io/cassandrareaper/acceptance/ReaperMetricsIT.java +++ b/src/server/src/test/java/io/cassandrareaper/acceptance/ReaperMetricsIT.java @@ -17,6 +17,13 @@ package io.cassandrareaper.acceptance; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Comparator; +import java.util.stream.Stream; + import cucumber.api.CucumberOptions; import cucumber.api.junit.Cucumber; import org.junit.AfterClass; @@ -25,6 +32,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; + @RunWith(Cucumber.class) @CucumberOptions( features = { @@ -46,6 +54,8 @@ public static void setUp() throws Exception { "setting up testing Reaper runner with {} seed hosts defined and memory storage", TestContext.TEST_CLUSTER_SEED_HOSTS.size()); + // We now have persistence in the memory store, so we need to clean up the storage folder before starting the tests + deleteFolderContents("/tmp/reaper/storage/"); runner = new ReaperTestJettyRunner(MEMORY_CONFIG_FILE); BasicSteps.addReaperRunner(runner); } @@ -56,4 +66,22 @@ public static void tearDown() { runner.runnerInstance.after(); } + public static void deleteFolderContents(String folderPath) throws IOException { + // Check if the path exists + Path path = Paths.get(folderPath); + if (!Files.exists(path)) { + return; + } + try (Stream walk = Files.walk(path)) { + walk.sorted(Comparator.reverseOrder()) + .forEach(p -> { + try { + Files.delete(p); + } catch (IOException e) { + throw new RuntimeException("Failed to delete " + p, e); + } + }); + } + } + } diff --git a/src/server/src/test/java/io/cassandrareaper/service/RepairManagerTest.java b/src/server/src/test/java/io/cassandrareaper/service/RepairManagerTest.java index 412191e9b..f8992240d 100644 --- a/src/server/src/test/java/io/cassandrareaper/service/RepairManagerTest.java +++ b/src/server/src/test/java/io/cassandrareaper/service/RepairManagerTest.java @@ -26,7 +26,6 @@ import io.cassandrareaper.core.RepairUnit; import io.cassandrareaper.core.Segment; import io.cassandrareaper.management.ClusterFacade; -import io.cassandrareaper.storage.IDistributedStorage; import io.cassandrareaper.storage.IStorageDao; import io.cassandrareaper.storage.cassandra.CassandraStorageFacade; import io.cassandrareaper.storage.cluster.IClusterDao; @@ -144,7 +143,7 @@ public void abortRunningSegmentWithNoLeader() throws ReaperException, Interrupte Mockito.doNothing().when(context.repairManager).abortSegments(any(), any()); Mockito.doReturn(run).when(context.repairManager).startRepairRun(run); - when(((IDistributedStorage) context.storage).getLockedSegmentsForRun(any())).thenReturn(Collections.emptySet()); + when(context.storage.getLockedSegmentsForRun(any())).thenReturn(Collections.emptySet()); IRepairUnitDao mockedRepairUnitDao = mock(IRepairUnitDao.class); Mockito.when(((CassandraStorageFacade) context.storage).getRepairUnitDao()).thenReturn(mockedRepairUnitDao); Mockito.when(mockedRepairUnitDao.getRepairUnit(any(UUID.class))).thenReturn(cf); @@ -238,7 +237,7 @@ public void doNotAbortRunningSegmentWithLeader() throws ReaperException, Interru Mockito.when(((CassandraStorageFacade) context.storage).getRepairUnitDao()).thenReturn(mockedRepairUnitDao); Mockito.when(mockedRepairUnitDao.getRepairUnit(any(UUID.class))).thenReturn(cf); - when(((IDistributedStorage) context.storage).getLockedSegmentsForRun(any())).thenReturn( + when(context.storage.getLockedSegmentsForRun(any())).thenReturn( new HashSet(Arrays.asList(segment.getId()))); context.repairManager.resumeRunningRepairRuns(); diff --git a/src/server/src/test/java/io/cassandrareaper/service/RepairRunnerHangingTest.java b/src/server/src/test/java/io/cassandrareaper/service/RepairRunnerHangingTest.java index afbd073f8..577b6520b 100644 --- a/src/server/src/test/java/io/cassandrareaper/service/RepairRunnerHangingTest.java +++ b/src/server/src/test/java/io/cassandrareaper/service/RepairRunnerHangingTest.java @@ -83,6 +83,7 @@ public final class RepairRunnerHangingTest { + private static final long LEAD_TTL = 1000L; private static final Logger LOG = LoggerFactory.getLogger(RepairRunnerHangingTest.class); private static final Set TABLES = ImmutableSet.of("table1"); private static final List THREE_TOKENS = Lists.newArrayList( @@ -242,7 +243,7 @@ public void testHangingRepair() throws InterruptedException, ReaperException, JM final double intensity = 0.5f; final int repairThreadCount = 1; final int segmentTimeout = 1; - final IStorageDao storage = new MemoryStorageFacade(); + final IStorageDao storage = new MemoryStorageFacade(LEAD_TTL); storage.getClusterDao().addCluster(cluster); RepairUnit cf = storage.getRepairUnitDao().addRepairUnit( RepairUnit.builder() @@ -397,7 +398,7 @@ public void testHangingRepairNewApi() throws InterruptedException, ReaperExcepti final double intensity = 0.5f; final int repairThreadCount = 1; final int segmentTimeout = 1; - final IStorageDao storage = new MemoryStorageFacade(); + final IStorageDao storage = new MemoryStorageFacade(LEAD_TTL); storage.getClusterDao().addCluster(cluster); DateTimeUtils.setCurrentMillisFixed(timeRun); RepairUnit cf = storage.getRepairUnitDao().addRepairUnit( @@ -553,7 +554,7 @@ public void testDontFailRepairAfterTopologyChangeIncrementalRepair() throws Inte final int repairThreadCount = 1; final int segmentTimeout = 30; final List tokens = THREE_TOKENS; - final IStorageDao storage = new MemoryStorageFacade(); + final IStorageDao storage = new MemoryStorageFacade(LEAD_TTL); AppContext context = new AppContext(); context.storage = storage; context.config = new ReaperApplicationConfiguration(); diff --git a/src/server/src/test/java/io/cassandrareaper/service/RepairRunnerTest.java b/src/server/src/test/java/io/cassandrareaper/service/RepairRunnerTest.java index b7f16e250..91789d398 100644 --- a/src/server/src/test/java/io/cassandrareaper/service/RepairRunnerTest.java +++ b/src/server/src/test/java/io/cassandrareaper/service/RepairRunnerTest.java @@ -99,6 +99,7 @@ import static org.mockito.Mockito.when; public final class RepairRunnerTest { + private static final long LEAD_TTL = 100L; private static final Set TABLES = ImmutableSet.of("table1"); private static final Duration POLL_INTERVAL = Duration.TWO_SECONDS; private static final List THREE_TOKENS = Lists.newArrayList( @@ -223,7 +224,7 @@ public void testResumeRepair() throws InterruptedException, ReaperException, Mal final int repairThreadCount = 1; final int segmentTimeout = 30; final List tokens = THREE_TOKENS; - final IStorageDao storage = new MemoryStorageFacade(); + final IStorageDao storage = new MemoryStorageFacade(LEAD_TTL); AppContext context = new AppContext(); context.storage = storage; context.config = new ReaperApplicationConfiguration(); @@ -333,8 +334,10 @@ protected JmxCassandraManagementProxy connectImpl(Node host) throws ReaperExcept run.with().runState(RepairRun.RunState.RUNNING).startTime(DateTime.now()).build(runId)); context.repairManager.resumeRunningRepairRuns(); - Thread.sleep(1000); - assertEquals(RepairRun.RunState.DONE, storage.getRepairRunDao().getRepairRun(runId).get().getRunState()); + + await().with().pollInterval(POLL_INTERVAL).atMost(30, SECONDS).until(() -> { + return RepairRun.RunState.DONE == storage.getRepairRunDao().getRepairRun(runId).get().getRunState(); + }); } @Test(expected = ConditionTimeoutException.class) @@ -353,7 +356,7 @@ public void testTooManyPendingCompactions() final int repairThreadCount = 1; final int segmentTimeout = 30; final List tokens = THREE_TOKENS; - final IStorageDao storage = new MemoryStorageFacade(); + final IStorageDao storage = new MemoryStorageFacade(LEAD_TTL); AppContext context = new AppContext(); context.storage = storage; context.config = new ReaperApplicationConfiguration(); @@ -546,7 +549,7 @@ public void testDontFailRepairAfterTopologyChange() throws InterruptedException, final int repairThreadCount = 1; final int segmentTimeout = 30; final List tokens = THREE_TOKENS; - final IStorageDao storage = new MemoryStorageFacade(); + final IStorageDao storage = new MemoryStorageFacade(LEAD_TTL); AppContext context = new AppContext(); context.storage = storage; context.config = new ReaperApplicationConfiguration(); @@ -667,7 +670,7 @@ protected JmxCassandraManagementProxy connectImpl(Node host) throws ReaperExcept context.repairManager.resumeRunningRepairRuns(); // The repair run should succeed despite the topology change. - await().with().atMost(20, TimeUnit.SECONDS).until(() -> { + await().with().atMost(120, TimeUnit.SECONDS).until(() -> { return RepairRun.RunState.DONE == storage.getRepairRunDao().getRepairRun(runId).get().getRunState(); }); } @@ -687,7 +690,7 @@ public void testSubrangeIncrementalRepair() throws InterruptedException, ReaperE final int repairThreadCount = 1; final int segmentTimeout = 30; final List tokens = THREE_TOKENS; - final IStorageDao storage = new MemoryStorageFacade(); + final IStorageDao storage = new MemoryStorageFacade(LEAD_TTL); AppContext context = new AppContext(); context.storage = storage; context.config = new ReaperApplicationConfiguration(); @@ -802,7 +805,7 @@ protected JmxCassandraManagementProxy connectImpl(Node host) throws ReaperExcept run.with().runState(RepairRun.RunState.RUNNING).startTime(DateTime.now()).build(runId)); context.repairManager.resumeRunningRepairRuns(); - await().with().atMost(20, TimeUnit.SECONDS).until(() -> { + await().with().atMost(120, TimeUnit.SECONDS).until(() -> { return RepairRun.RunState.DONE == storage.getRepairRunDao().getRepairRun(runId).get().getRunState(); }); } @@ -962,7 +965,7 @@ public void getNodeMetricsInLocalDcAvailabilityForLocalDcNodeTest() throws Excep final int repairThreadCount = 1; final int segmentTimeout = 30; final List tokens = THREE_TOKENS; - final IStorageDao storage = new MemoryStorageFacade(); + final IStorageDao storage = new MemoryStorageFacade(LEAD_TTL); AppContext context = new AppContext(); context.storage = storage; context.config = new ReaperApplicationConfiguration(); diff --git a/src/server/src/test/java/io/cassandrareaper/service/SegmentRunnerTest.java b/src/server/src/test/java/io/cassandrareaper/service/SegmentRunnerTest.java index c4cdbff11..a533df458 100644 --- a/src/server/src/test/java/io/cassandrareaper/service/SegmentRunnerTest.java +++ b/src/server/src/test/java/io/cassandrareaper/service/SegmentRunnerTest.java @@ -193,6 +193,7 @@ public JmxCassandraManagementProxy connectImpl(Node host) throws ReaperException RepairRunner rr = mock(RepairRunner.class); RepairUnit ru = mock(RepairUnit.class); when(ru.getKeyspaceName()).thenReturn("reaper"); + when(rr.getRepairRunId()).thenReturn(runId); ClusterFacade clusterFacade = mock(ClusterFacade.class); when(clusterFacade.connect(any(Cluster.class), any())).thenReturn(jmx); @@ -348,6 +349,7 @@ protected JmxCassandraManagementProxy connectImpl(Node host) throws ReaperExcept RepairRunner rr = mock(RepairRunner.class); RepairUnit ru = mock(RepairUnit.class); when(ru.getKeyspaceName()).thenReturn("reaper"); + when(rr.getRepairRunId()).thenReturn(runId); ClusterFacade clusterFacade = mock(ClusterFacade.class); when(clusterFacade.connect(any(Cluster.class), any())).thenReturn(jmx); @@ -496,6 +498,7 @@ protected JmxCassandraManagementProxy connectImpl(Node host) throws ReaperExcept RepairRunner rr = mock(RepairRunner.class); RepairUnit ru = mock(RepairUnit.class); when(ru.getKeyspaceName()).thenReturn("reaper"); + when(rr.getRepairRunId()).thenReturn(runId); ClusterFacade clusterFacade = mock(ClusterFacade.class); when(clusterFacade.connect(any(Cluster.class), any())).thenReturn(jmx); @@ -639,6 +642,7 @@ protected JmxCassandraManagementProxy connectImpl(Node host) throws ReaperExcept RepairRunner rr = mock(RepairRunner.class); RepairUnit ru = mock(RepairUnit.class); when(ru.getKeyspaceName()).thenReturn("reaper"); + when(rr.getRepairRunId()).thenReturn(runId); ClusterFacade clusterFacade = mock(ClusterFacade.class); when(clusterFacade.connect(any(Cluster.class), any())).thenReturn(jmx); @@ -782,6 +786,7 @@ protected JmxCassandraManagementProxy connectImpl(Node host) throws ReaperExcept RepairRunner rr = mock(RepairRunner.class); RepairUnit ru = mock(RepairUnit.class); when(ru.getKeyspaceName()).thenReturn("reaper"); + when(rr.getRepairRunId()).thenReturn(runId); ClusterFacade clusterFacade = mock(ClusterFacade.class); when(clusterFacade.connect(any(Cluster.class), any())).thenReturn(jmx); @@ -927,6 +932,7 @@ protected JmxCassandraManagementProxy connectImpl(Node host) throws ReaperExcept RepairRunner rr = mock(RepairRunner.class); RepairUnit ru = mock(RepairUnit.class); when(ru.getKeyspaceName()).thenReturn("reaper"); + when(rr.getRepairRunId()).thenReturn(runId); ClusterFacade clusterFacade = mock(ClusterFacade.class); when(clusterFacade.connect(any(Cluster.class), any())).thenReturn(jmx); @@ -1073,6 +1079,7 @@ protected JmxCassandraManagementProxy connectImpl(Node host) throws ReaperExcept RepairRunner rr = mock(RepairRunner.class); RepairUnit ru = mock(RepairUnit.class); when(ru.getKeyspaceName()).thenReturn("reaper"); + when(rr.getRepairRunId()).thenReturn(runId); ClusterFacade clusterFacade = mock(ClusterFacade.class); when(clusterFacade.connect(any(Cluster.class), any())).thenReturn(jmx); @@ -1205,6 +1212,7 @@ protected JmxCassandraManagementProxy connectImpl(Node host) throws ReaperExcept RepairRunner rr = mock(RepairRunner.class); RepairUnit ru = mock(RepairUnit.class); when(ru.getKeyspaceName()).thenReturn("reaper"); + when(rr.getRepairRunId()).thenReturn(runId); ClusterFacade clusterFacade = mock(ClusterFacade.class); when(clusterFacade.connect(any(Cluster.class), any())).thenReturn(jmx); @@ -1300,6 +1308,7 @@ protected JmxCassandraManagementProxy connectImpl(Node host) throws ReaperExcept RepairRunner rr = mock(RepairRunner.class); RepairUnit ru = mock(RepairUnit.class); when(ru.getKeyspaceName()).thenReturn("reaper"); + when(rr.getRepairRunId()).thenReturn(runId); ClusterFacade clusterFacade = mock(ClusterFacade.class); when(clusterFacade.connect(any(Cluster.class), any())).thenReturn(jmx); diff --git a/src/server/src/test/java/io/cassandrareaper/storage/memory/ReplicaLockManagerWithTtlTest.java b/src/server/src/test/java/io/cassandrareaper/storage/memory/ReplicaLockManagerWithTtlTest.java new file mode 100644 index 000000000..08a5e12be --- /dev/null +++ b/src/server/src/test/java/io/cassandrareaper/storage/memory/ReplicaLockManagerWithTtlTest.java @@ -0,0 +1,97 @@ +/* + * Copyright 2014-2017 Spotify AB + * Copyright 2016-2019 The Last Pickle Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.cassandrareaper.storage.memory; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; +import java.util.UUID; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + + +public class ReplicaLockManagerWithTtlTest { + + private ReplicaLockManagerWithTtl replicaLockManager; + private UUID runId; + private UUID segmentId; + private Set replicas; + private Set replicasOverlap; + + @BeforeEach + public void setUp() { + replicaLockManager = new ReplicaLockManagerWithTtl(1000); + runId = UUID.randomUUID(); + segmentId = UUID.randomUUID(); + replicas = new HashSet<>(Arrays.asList("replica1", "replica2", "replica3")); + replicasOverlap = new HashSet<>(Arrays.asList("replica4", "replica2", "replica5")); + } + + @Test + public void testLockRunningRepairsForNodes() { + assertTrue(replicaLockManager.lockRunningRepairsForNodes(runId, segmentId, replicas)); + } + + @Test + public void testLockRunningRepairsForNodesAlreadyLocked() { + UUID anotherRunId = UUID.randomUUID(); + assertTrue(replicaLockManager.lockRunningRepairsForNodes(runId, segmentId, replicas)); + assertFalse(replicaLockManager.lockRunningRepairsForNodes(runId, segmentId, replicas)); + } + + @Test + public void testRenewRunningRepairsForNodes() { + assertTrue(replicaLockManager.lockRunningRepairsForNodes(runId, segmentId, replicas)); + assertTrue(replicaLockManager.renewRunningRepairsForNodes(runId, segmentId, replicas)); + } + + @Test + public void testRenewRunningRepairsForNodesNotLocked() { + assertFalse(replicaLockManager.renewRunningRepairsForNodes(runId, segmentId, replicas)); + } + + @Test + public void testReleaseRunningRepairsForNodes() { + assertTrue(replicaLockManager.lockRunningRepairsForNodes(runId, segmentId, replicas)); + assertTrue(replicaLockManager.releaseRunningRepairsForNodes(runId, segmentId, replicas)); + } + + @Test + public void testGetLockedSegmentsForRun() { + assertTrue(replicaLockManager.lockRunningRepairsForNodes(runId, segmentId, replicas)); + Set lockedSegments = replicaLockManager.getLockedSegmentsForRun(runId); + assertTrue(lockedSegments.contains(segmentId)); + } + + @Test + public void testCleanupExpiredLocks() throws InterruptedException { + assertTrue(replicaLockManager.lockRunningRepairsForNodes(runId, segmentId, replicas)); + // We can lock the replica for a different run id + assertTrue(replicaLockManager.lockRunningRepairsForNodes(UUID.randomUUID(), UUID.randomUUID(), replicas)); + // The following lock should fail because overlapping replicas are already locked + assertFalse(replicaLockManager.lockRunningRepairsForNodes(runId, UUID.randomUUID(), replicasOverlap)); + Thread.sleep(1000); // Wait for TTL to expire + replicaLockManager.cleanupExpiredLocks(); + // The following lock should succeed as the lock expired + assertTrue(replicaLockManager.lockRunningRepairsForNodes(runId, segmentId, replicasOverlap)); + } +} \ No newline at end of file diff --git a/src/server/src/test/resources/cassandra-reaper-metrics-test.yaml b/src/server/src/test/resources/cassandra-reaper-metrics-test.yaml index e8ed291f1..ecedb61fe 100644 --- a/src/server/src/test/resources/cassandra-reaper-metrics-test.yaml +++ b/src/server/src/test/resources/cassandra-reaper-metrics-test.yaml @@ -28,7 +28,7 @@ enableDynamicSeedList: false jmxConnectionTimeoutInSeconds: 10 datacenterAvailability: LOCAL percentRepairedCheckIntervalMinutes: 1 -repairManagerSchedulingIntervalSeconds: 0 +repairManagerSchedulingIntervalSeconds: 1 logging: level: WARN