From de1d7231b0098a28166aea5e3af5caef88ed0972 Mon Sep 17 00:00:00 2001 From: sajid riaz Date: Wed, 13 Nov 2024 12:26:32 +0100 Subject: [PATCH] Add Locks In SchedulerManager #768 - added locks in schedule manger --- CHANGES.md | 1 + .../spring/ECChronosInternals.java | 19 +- .../core/impl/locks/CASLockFactory.java | 30 +- .../impl/locks/CASLockFactoryBuilder.java | 32 +- .../core/impl/locks/CASLockProperties.java | 11 +- .../core/impl/locks/CASLockStatement.java | 20 +- .../ecchronos/core/impl/locks/LockCache.java | 8 +- .../impl/repair/RepairLockFactoryImpl.java | 187 ++++++++++++ .../repair/scheduler/ScheduleManagerImpl.java | 23 +- .../ecchronos/core/impl/locks/DummyLock.java | 18 +- .../core/impl/locks/TestCASLockFactory.java | 30 +- .../core/impl/locks/TestLockCollection.java | 11 - .../repair/TestRepairLockFactoryImpl.java | 286 ++++++++++++++++++ .../repair/scheduler/TestScheduleManager.java | 90 +++++- .../ecchronos/core/locks/LockFactory.java | 7 +- .../core/repair/RepairLockFactory.java | 44 +++ .../ecchronos/core/repair/RepairResource.java | 104 +++++++ .../core/repair/RepairResourceFactory.java | 34 +++ .../core/repair/scheduler/ScheduledTask.java | 20 ++ 19 files changed, 843 insertions(+), 132 deletions(-) create mode 100644 core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/RepairLockFactoryImpl.java rename connection/src/main/java/com/ericsson/bss/cassandra/ecchronos/connection/StatementDecorator.java => core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/DummyLock.java (63%) create mode 100644 core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/TestRepairLockFactoryImpl.java create mode 100644 core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/repair/RepairLockFactory.java create mode 100644 core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/repair/RepairResource.java create mode 100644 core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/repair/RepairResourceFactory.java diff --git a/CHANGES.md b/CHANGES.md index 4fe0e8c7e..447f753d0 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -2,6 +2,7 @@ ## Version 1.0.0 (Not yet Released) +* Add Locks In SchedulerManager - Issue #768 * Cassandra-Based Distributed Locks - Issue #741 * Create New Repair Type Called "VNODE" - Issue #755 * Create ReplicaRepairGroup Class for Grouping Replicas and Token Ranges - Issue #721 diff --git a/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/spring/ECChronosInternals.java b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/spring/ECChronosInternals.java index 8c137f46d..7fb9f7a20 100644 --- a/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/spring/ECChronosInternals.java +++ b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/spring/ECChronosInternals.java @@ -17,9 +17,11 @@ import com.datastax.oss.driver.api.core.CqlSession; import com.datastax.oss.driver.api.core.metadata.Node; import com.ericsson.bss.cassandra.ecchronos.application.config.Config; +import com.ericsson.bss.cassandra.ecchronos.application.config.lockfactory.CasLockFactoryConfig; import com.ericsson.bss.cassandra.ecchronos.connection.DistributedJmxConnectionProvider; import com.ericsson.bss.cassandra.ecchronos.connection.DistributedNativeConnectionProvider; import com.ericsson.bss.cassandra.ecchronos.core.impl.jmx.DistributedJmxProxyFactoryImpl; +import com.ericsson.bss.cassandra.ecchronos.core.impl.locks.CASLockFactory; import com.ericsson.bss.cassandra.ecchronos.core.impl.metrics.CassandraMetrics; import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.scheduler.ScheduleManagerImpl; import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.state.HostStatesImpl; @@ -57,13 +59,13 @@ public class ECChronosInternals implements Closeable private final CassandraMetrics myCassandraMetrics; private final HostStatesImpl myHostStatesImpl; private final TableStorageStatesImpl myTableStorageStatesImpl; + private final CASLockFactory myLockFactory; public ECChronosInternals( final Config configuration, final DistributedNativeConnectionProvider nativeConnectionProvider, final DistributedJmxConnectionProvider jmxConnectionProvider, - final EccNodesSync eccNodesSync - ) + final EccNodesSync eccNodesSync) { myJmxProxyFactory = DistributedJmxProxyFactoryImpl.builder() .withJmxConnectionProvider(jmxConnectionProvider) @@ -91,10 +93,23 @@ public ECChronosInternals( .build(); myCassandraMetrics = new CassandraMetrics(myJmxProxyFactory); + + CasLockFactoryConfig casLockFactoryConfig = configuration.getLockFactory() + .getCasLockFactoryConfig(); + + myLockFactory = CASLockFactory.builder() + .withNativeConnectionProvider(nativeConnectionProvider) + .withHostStates(myHostStatesImpl) + .withKeyspaceName(casLockFactoryConfig.getKeyspaceName()) + .withCacheExpiryInSeconds(casLockFactoryConfig.getFailureCacheExpiryTimeInSeconds()) + .withConsistencySerial(casLockFactoryConfig.getConsistencySerial()) + .build(); + myScheduleManagerImpl = ScheduleManagerImpl.builder() .withRunInterval(configuration.getSchedulerConfig().getFrequency().getInterval(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS) .withNodeIDList(jmxConnectionProvider.getJmxConnections().keySet()) + .withLockFactory(myLockFactory) .build(); } diff --git a/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/CASLockFactory.java b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/CASLockFactory.java index 4ad9727cd..28091c503 100644 --- a/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/CASLockFactory.java +++ b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/CASLockFactory.java @@ -75,7 +75,6 @@ public final class CASLockFactory implements LockFactory, Closeable private static final int REFRESH_INTERVAL_RATIO = 10; private static final int DEFAULT_LOCK_TIME_IN_SECONDS = 600; - private final UUID myUuid; private final HostStates myHostStates; private final CASLockFactoryCacheContext myCasLockFactoryCacheContext; @@ -90,22 +89,12 @@ public final class CASLockFactory implements LockFactory, Closeable Executors.newSingleThreadScheduledExecutor( new ThreadFactoryBuilder().setNameFormat("LockRefresher-%d").build()), builder.getConsistencyType(), - builder.getNativeConnectionProvider().getCqlSession(), - builder.getStatementDecorator()); + builder.getNativeConnectionProvider().getCqlSession()); myHostStates = builder.getHostStates(); verifySchemasExists(); - UUID hostId = builder.getNode().getHostId(); - - if (hostId == null) - { - hostId = UUID.randomUUID(); - LOG.warn("Unable to determine local nodes host id, using {} instead", hostId); - } - - myUuid = hostId; myCasLockFactoryCacheContext = buildCasLockFactoryCacheContext(builder.getCacheExpiryTimeInSecond()); myCasLockStatement = new CASLockStatement(myCasLockProperties, myCasLockFactoryCacheContext); @@ -144,11 +133,11 @@ private int getDefaultTimeToLiveFromLockTable() public DistributedLock tryLock(final String dataCenter, final String resource, final int priority, - final Map metadata) - throws LockException + final Map metadata, + final UUID nodeId) throws LockException { return myCasLockFactoryCacheContext.getLockCache() - .getLock(dataCenter, resource, priority, metadata); + .getLock(dataCenter, resource, priority, metadata, nodeId); } @Override @@ -214,12 +203,6 @@ public void close() } } - @VisibleForTesting - UUID getHostId() - { - return myUuid; - } - @VisibleForTesting CASLockFactoryCacheContext getCasLockFactoryCacheContext() { @@ -246,7 +229,8 @@ public static CASLockFactoryBuilder builder() private DistributedLock doTryLock(final String dataCenter, final String resource, final int priority, - final Map metadata) throws LockException + final Map metadata, + final UUID nodeId) throws LockException { LOG.trace("Trying lock for {} - {}", dataCenter, resource); @@ -255,7 +239,7 @@ private DistributedLock doTryLock(final String dataCenter, LOG.warn("Not sufficient nodes to lock resource {} in datacenter {}", resource, dataCenter); throw new LockException("Not sufficient nodes to lock"); } - CASLock casLock = new CASLock(dataCenter, resource, priority, metadata, myUuid, myCasLockStatement); // NOSONAR + CASLock casLock = new CASLock(dataCenter, resource, priority, metadata, nodeId, myCasLockStatement); // NOSONAR if (casLock.lock()) { return casLock; diff --git a/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/CASLockFactoryBuilder.java b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/CASLockFactoryBuilder.java index 0e60d4960..ab901b755 100644 --- a/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/CASLockFactoryBuilder.java +++ b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/CASLockFactoryBuilder.java @@ -14,11 +14,10 @@ */ package com.ericsson.bss.cassandra.ecchronos.core.impl.locks; -import com.datastax.oss.driver.api.core.metadata.Node; import com.ericsson.bss.cassandra.ecchronos.connection.DistributedNativeConnectionProvider; -import com.ericsson.bss.cassandra.ecchronos.connection.StatementDecorator; import com.ericsson.bss.cassandra.ecchronos.core.impl.utils.ConsistencyType; import com.ericsson.bss.cassandra.ecchronos.core.state.HostStates; +import java.util.UUID; /** * Represents a container for builder configurations and state for the CASLockFactory. @@ -32,11 +31,9 @@ public class CASLockFactoryBuilder private DistributedNativeConnectionProvider myNativeConnectionProvider; private HostStates myHostStates; - private StatementDecorator myStatementDecorator; private String myKeyspaceName = DEFAULT_KEYSPACE_NAME; private long myCacheExpiryTimeInSeconds = DEFAULT_EXPIRY_TIME_IN_SECONDS; private ConsistencyType myConsistencyType = DEFAULT_CONSISTENCY_SERIAL; - private Node myNode; public final CASLockFactoryBuilder withNativeConnectionProvider(final DistributedNativeConnectionProvider nativeConnectionProvider) { @@ -50,12 +47,6 @@ public final CASLockFactoryBuilder withHostStates(final HostStates hostStates) return this; } - public final CASLockFactoryBuilder withStatementDecorator(final StatementDecorator statementDecorator) - { - myStatementDecorator = statementDecorator; - return this; - } - public final CASLockFactoryBuilder withKeyspaceName(final String keyspaceName) { myKeyspaceName = keyspaceName; @@ -74,12 +65,6 @@ public final CASLockFactoryBuilder withConsistencySerial(final ConsistencyType c return this; } - public final CASLockFactoryBuilder withNode(final Node node) - { - myNode = node; - return this; - } - public final CASLockFactory build() { if (myNativeConnectionProvider == null) @@ -92,11 +77,6 @@ public final CASLockFactory build() throw new IllegalArgumentException("Host states cannot be null"); } - if (myStatementDecorator == null) - { - throw new IllegalArgumentException("Statement decorator cannot be null"); - } - return new CASLockFactory(this); } @@ -110,11 +90,6 @@ public final HostStates getHostStates() return myHostStates; } - public final StatementDecorator getStatementDecorator() - { - return myStatementDecorator; - } - public final String getKeyspaceName() { return myKeyspaceName; @@ -129,9 +104,4 @@ public final ConsistencyType getConsistencyType() { return myConsistencyType; } - - public final Node getNode() - { - return myNode; - } } diff --git a/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/CASLockProperties.java b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/CASLockProperties.java index 5395eaf36..009e06b88 100644 --- a/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/CASLockProperties.java +++ b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/CASLockProperties.java @@ -20,7 +20,6 @@ import com.datastax.oss.driver.api.core.ConsistencyLevel; import com.datastax.oss.driver.api.core.CqlSession; -import com.ericsson.bss.cassandra.ecchronos.connection.StatementDecorator; /** * Represents a container for builder configurations and state for the CASLockStatement. @@ -33,21 +32,18 @@ public class CASLockProperties private final ScheduledExecutorService myExecutor; private final ConsistencyLevel mySerialConsistencyLevel; private final CqlSession mySession; - private final StatementDecorator myStatementDecorator; CASLockProperties(final ConnectionType connectionType, final String keyspaceName, final ScheduledExecutorService executor, final ConsistencyType consistencyType, - final CqlSession session, - final StatementDecorator statementDecorator) + final CqlSession session) { myConnectionType = connectionType; myKeyspaceName = keyspaceName; myExecutor = executor; mySerialConsistencyLevel = defineSerialConsistencyLevel(consistencyType); mySession = session; - myStatementDecorator = statementDecorator; } public final ConsistencyLevel defineSerialConsistencyLevel(final ConsistencyType consistencyType) @@ -88,11 +84,6 @@ public final CqlSession getSession() return mySession; } - public final StatementDecorator getStatementDecorator() - { - return myStatementDecorator; - } - public final boolean isDatacenterAwareAgentType() { diff --git a/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/CASLockStatement.java b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/CASLockStatement.java index cb121c153..55d6d0be0 100644 --- a/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/CASLockStatement.java +++ b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/CASLockStatement.java @@ -67,21 +67,11 @@ public CASLockStatement( public final ResultSet execute(final String dataCenter, final BoundStatement statement) { - Statement executeStatement; - - if (dataCenter != null && myCasLockProperties.isDatacenterAwareAgentType()) - { - executeStatement = new DataCenterAwareStatement(statement, dataCenter); - } - else - { - executeStatement = statement; - } - - return myCasLockProperties.getSession() - .execute(myCasLockProperties - .getStatementDecorator() - .apply(executeStatement)); + Statement executeStatement = (dataCenter != null + && myCasLockProperties.isDatacenterAwareAgentType()) + ? new DataCenterAwareStatement(statement, dataCenter) + : statement; + return myCasLockProperties.getSession().execute(executeStatement); } private SimpleStatement insertLockStatement() diff --git a/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/LockCache.java b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/LockCache.java index 7f8f76a61..d69d10d15 100644 --- a/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/LockCache.java +++ b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/LockCache.java @@ -18,6 +18,7 @@ import com.ericsson.bss.cassandra.ecchronos.utils.exceptions.LockException; import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; +import java.util.UUID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,7 +59,8 @@ public Optional getCachedFailure(final String dataCenter, final S public DistributedLock getLock(final String dataCenter, final String resource, final int priority, - final Map metadata) + final Map metadata, + final UUID hostId) throws LockException { LockKey lockKey = new LockKey(dataCenter, resource); @@ -72,7 +74,7 @@ public DistributedLock getLock(final String dataCenter, try { - return myLockSupplier.getLock(dataCenter, resource, priority, metadata); + return myLockSupplier.getLock(dataCenter, resource, priority, metadata, hostId); } catch (LockException e) { @@ -95,7 +97,7 @@ private Optional getCachedFailure(final LockKey lockKey) @FunctionalInterface public interface LockSupplier { - DistributedLock getLock(String dataCenter, String resource, int priority, Map metadata) + DistributedLock getLock(String dataCenter, String resource, int priority, Map metadata, UUID hostId) throws LockException; } diff --git a/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/RepairLockFactoryImpl.java b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/RepairLockFactoryImpl.java new file mode 100644 index 000000000..ef47485d4 --- /dev/null +++ b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/RepairLockFactoryImpl.java @@ -0,0 +1,187 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ericsson.bss.cassandra.ecchronos.core.impl.repair; + +import com.ericsson.bss.cassandra.ecchronos.core.impl.locks.LockCollection; +import com.ericsson.bss.cassandra.ecchronos.core.locks.LockFactory; +import com.ericsson.bss.cassandra.ecchronos.core.repair.RepairLockFactory; +import com.ericsson.bss.cassandra.ecchronos.core.repair.RepairResource; +import com.ericsson.bss.cassandra.ecchronos.utils.exceptions.LockException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import java.util.UUID; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class RepairLockFactoryImpl implements RepairLockFactory +{ + private static final Logger LOG = LoggerFactory.getLogger(RepairLockFactoryImpl.class); + + private static final int LOCKS_PER_RESOURCE = 1; + + @Override + public final LockFactory.DistributedLock getLock(final LockFactory lockFactory, + final Set repairResources, + final Map metadata, + final int priority, + final UUID nodeId) throws LockException + { + for (RepairResource repairResource : repairResources) + { + if (!lockFactory.sufficientNodesForLocking(repairResource.getDataCenter(), + repairResource.getResourceName(LOCKS_PER_RESOURCE))) + { + throw new LockException(repairResource + " not lockable. Repair will be retried later."); + } + } + + if (repairResources.isEmpty()) + { + String msg = String.format("No datacenters to lock for %s", this); + LOG.warn(msg); + throw new LockException(msg); + } + + validateNoCachedFailures(lockFactory, repairResources); + + Collection locks = getRepairResourceLocks(lockFactory, + repairResources, + metadata, + priority, + nodeId); + + return new LockCollection(locks); + } + + private void validateNoCachedFailures(final LockFactory lockFactory, final Set repairResources) + throws LockException + { + for (RepairResource repairResource : repairResources) + { + Optional cachedException = lockFactory.getCachedFailure(repairResource.getDataCenter(), + repairResource.getResourceName(LOCKS_PER_RESOURCE)); + if (cachedException.isPresent()) + { + LockException e = cachedException.get(); + LOG.debug("Found cached locking failure for {}, rethrowing", repairResource, e); + throw e; + } + } + } + + private Collection getRepairResourceLocks( + final LockFactory lockFactory, + final Collection repairResources, + final Map metadata, + final int priority, + final UUID nodeId) + throws LockException + { + try (TemporaryLockHolder lockHolder = new TemporaryLockHolder()) + { + for (RepairResource repairResource : repairResources) + { + try + { + lockHolder.add(getLockForRepairResource(lockFactory, repairResource, metadata, priority, nodeId)); + } + catch (LockException e) + { + LOG.debug("{} - Unable to get repair resource lock '{}', releasing previously acquired locks - {}", + this, + repairResource, + e.getMessage()); + throw e; + } + } + + return lockHolder.getAndClear(); + } + } + + private LockFactory.DistributedLock getLockForRepairResource( + final LockFactory lockFactory, + final RepairResource repairResource, + final Map metadata, + final int priority, + final UUID nodeId) + throws LockException + { + LockFactory.DistributedLock myLock; + + String dataCenter = repairResource.getDataCenter(); + + String resource = repairResource.getResourceName(LOCKS_PER_RESOURCE); + try + { + myLock = lockFactory.tryLock(dataCenter, resource, priority, metadata, nodeId); + + if (myLock != null) + { + return myLock; + } + + String msg = String.format("Lock resources exhausted for %s", repairResource); + LOG.warn(msg); + throw new LockException(msg); + } + catch (LockException e) + { + LOG.debug("Lock ({} in datacenter {}) got error {}", + resource, + dataCenter, + e.getMessage()); + throw e; + } + } + + static class TemporaryLockHolder implements AutoCloseable + { + private final List temporaryLocks = new ArrayList<>(); + + void add(final LockFactory.DistributedLock lock) + { + temporaryLocks.add(lock); + } + + Collection getAndClear() + { + Collection allLocks = new ArrayList<>(temporaryLocks); + temporaryLocks.clear(); + return allLocks; + } + + @Override + public void close() + { + for (LockFactory.DistributedLock lock : temporaryLocks) + { + try + { + lock.close(); + } + catch (Exception e) + { + LOG.warn("Unable to release temporary lock {} for {} ", lock, this, e); + } + } + } + } +} diff --git a/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/scheduler/ScheduleManagerImpl.java b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/scheduler/ScheduleManagerImpl.java index 8d88807ee..6d20e248b 100644 --- a/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/scheduler/ScheduleManagerImpl.java +++ b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/scheduler/ScheduleManagerImpl.java @@ -14,6 +14,8 @@ */ package com.ericsson.bss.cassandra.ecchronos.core.impl.repair.scheduler; +import com.ericsson.bss.cassandra.ecchronos.core.impl.locks.CASLockFactory; +import com.ericsson.bss.cassandra.ecchronos.core.locks.LockFactory; import com.ericsson.bss.cassandra.ecchronos.core.repair.scheduler.RunPolicy; import com.ericsson.bss.cassandra.ecchronos.core.repair.scheduler.ScheduleManager; import com.ericsson.bss.cassandra.ecchronos.core.repair.scheduler.ScheduledJob; @@ -54,6 +56,7 @@ public final class ScheduleManagerImpl implements ScheduleManager, Closeable private final Set myRunPolicies = Sets.newConcurrentHashSet(); private final Map> myRunFuture = new ConcurrentHashMap<>(); private final Map myRunTasks = new ConcurrentHashMap<>(); + private final CASLockFactory myLockFactory; private final ScheduledExecutorService myExecutor = Executors.newSingleThreadScheduledExecutor( new ThreadFactoryBuilder().setNameFormat("TaskExecutor-%d").build()); @@ -61,6 +64,7 @@ public final class ScheduleManagerImpl implements ScheduleManager, Closeable private ScheduleManagerImpl(final Builder builder) { myNodeIDList = builder.myNodeIDList; + myLockFactory = builder.myLockFactory; createScheduleFutureForNodeIDList(builder); } @@ -267,9 +271,10 @@ private boolean tryRunTask( final ScheduledTask task) { LOG.debug("Trying to run task {} in node {}", task, nodeID); - // TODO need to implement lock mechanism - try + LOG.debug("Trying to acquire lock for {}", task); + try (LockFactory.DistributedLock lock = task.getLock(myLockFactory, nodeID)) { + LOG.debug("Lock has been acquired on node with Id {} with lock {}", nodeID, lock); boolean successful = runTask(task); job.postExecute(successful); return true; @@ -317,6 +322,7 @@ public static Builder builder() public static class Builder { private Collection myNodeIDList; + private CASLockFactory myLockFactory; private long myRunIntervalInMs = DEFAULT_RUN_DELAY_IN_MS; /** @@ -344,6 +350,19 @@ public Builder withNodeIDList(final Collection nodeIDList) return this; } + /** + * Sets the {@link CASLockFactory}. + * + * @param lockFactory The {@link CASLockFactory} to be used. + * Must not be {@code null}. + * @return The current {@code Builder} instance, allowing for method chaining. + */ + public final Builder withLockFactory(final CASLockFactory lockFactory) + { + myLockFactory = lockFactory; + return this; + } + /** * Build SchedulerManager with the provided configuration. * diff --git a/connection/src/main/java/com/ericsson/bss/cassandra/ecchronos/connection/StatementDecorator.java b/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/DummyLock.java similarity index 63% rename from connection/src/main/java/com/ericsson/bss/cassandra/ecchronos/connection/StatementDecorator.java rename to core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/DummyLock.java index f3c0a49c5..4af6c40f9 100644 --- a/connection/src/main/java/com/ericsson/bss/cassandra/ecchronos/connection/StatementDecorator.java +++ b/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/DummyLock.java @@ -12,16 +12,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.ericsson.bss.cassandra.ecchronos.connection; +package com.ericsson.bss.cassandra.ecchronos.core.impl.locks; -import com.datastax.oss.driver.api.core.cql.Statement; +import com.ericsson.bss.cassandra.ecchronos.core.locks.LockFactory; -public interface StatementDecorator +public class DummyLock implements LockFactory.DistributedLock { - /** - * Decorates a statement before sending it over to the server. - * @param statement The original statement - * @return The decorated statement - */ - Statement apply(Statement statement); + public volatile boolean closed = false; + + @Override + public void close() { + closed = true; + } } diff --git a/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/TestCASLockFactory.java b/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/TestCASLockFactory.java index ba6069adb..e38114ee5 100644 --- a/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/TestCASLockFactory.java +++ b/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/TestCASLockFactory.java @@ -114,14 +114,11 @@ public void startup() hostStates = mock(HostStates.class); when(hostStates.isUp(any(Node.class))).thenReturn(true); - Node node = mock(Node.class); - when(node.getHostId()).thenReturn(UUID.randomUUID()); myLockFactory = new CASLockFactoryBuilder() .withNativeConnectionProvider(getNativeConnectionProvider()) .withHostStates(hostStates) - .withStatementDecorator(s -> s) .withKeyspaceName(myKeyspaceName) - .withNode(node) + .withHostId(UUID.randomUUID()) .build(); myLockStatement = mySession.prepare(QueryBuilder.insertInto(myKeyspaceName, TABLE_LOCK) @@ -159,9 +156,8 @@ public void testGetDefaultTimeToLiveFromLockTable() throws LockException myLockFactory = new CASLockFactoryBuilder() .withNativeConnectionProvider(getNativeConnectionProvider()) .withHostStates(hostStates) - .withStatementDecorator(s -> s) .withKeyspaceName(myKeyspaceName) - .withNode(node) + .withHostId(node.getHostId()) .build(); assertThat(myLockFactory.getCasLockFactoryCacheContext().getFailedLockRetryAttempts()).isEqualTo(9); assertThat(myLockFactory.getCasLockFactoryCacheContext().getLockUpdateTimeInSeconds()).isEqualTo(120); @@ -276,9 +272,8 @@ public void testGetLockWithLocallyHigherPriority() throws LockException CASLockFactory lockFactory = new CASLockFactoryBuilder() .withNativeConnectionProvider(getNativeConnectionProvider()) .withHostStates(hostStates) - .withStatementDecorator(s -> s) .withKeyspaceName(myKeyspaceName) - .withNode(getNativeConnectionProvider().getNodes().get(0)) + .withHostId(localHostId) .build(); try (LockFactory.DistributedLock lock = lockFactory.tryLock(DATA_CENTER, "lock", 1, new HashMap<>())) @@ -297,9 +292,8 @@ public void testGetLockWithLocallyLowerPriority() throws LockException CASLockFactory lockFactory = new CASLockFactoryBuilder() .withNativeConnectionProvider(getNativeConnectionProvider()) .withHostStates(hostStates) - .withStatementDecorator(s -> s) .withKeyspaceName(myKeyspaceName) - .withNode(getNativeConnectionProvider().getNodes().get(0)) + .withHostId(localHostId) .build(); try (LockFactory.DistributedLock lock = lockFactory.tryLock(DATA_CENTER, "lock", 2, new HashMap<>())) { @@ -395,7 +389,6 @@ public void testActivateWithoutKeyspaceCausesIllegalStateException() .isThrownBy(() -> new CASLockFactoryBuilder() .withNativeConnectionProvider(getNativeConnectionProvider()) .withHostStates(hostStates) - .withStatementDecorator(s -> s) .withKeyspaceName(myKeyspaceName) .build()); @@ -418,7 +411,6 @@ public void testActivateWithoutLockTableCausesIllegalStateException() .isThrownBy(() -> new CASLockFactoryBuilder() .withNativeConnectionProvider(getNativeConnectionProvider()) .withHostStates(hostStates) - .withStatementDecorator(s -> s) .withKeyspaceName(myKeyspaceName) .build()); @@ -436,7 +428,6 @@ public void testActivateWithoutLockPriorityTableCausesIllegalStateException() .isThrownBy(() -> new CASLockFactoryBuilder() .withNativeConnectionProvider(getNativeConnectionProvider()) .withHostStates(hostStates) - .withStatementDecorator(s -> s) .withKeyspaceName(myKeyspaceName) .build()); @@ -493,7 +484,6 @@ public ConnectionType getConnectionType() } }) .withHostStates(hostStates) - .withStatementDecorator(s -> s) .withKeyspaceName(myKeyspaceName) .build()); } @@ -512,10 +502,9 @@ public void testDataCenterAwareAgentTypeWithDefaultSerialConsistency() myLockFactory = new CASLockFactoryBuilder() .withNativeConnectionProvider(getDataCenterAwareConnectionTypeProvider()) .withHostStates(hostStates) - .withStatementDecorator(s -> s) .withKeyspaceName(myKeyspaceName) .withConsistencySerial(ConsistencyType.DEFAULT) - .withNode(nodeMock) + .withHostId(nodeMock.getHostId()) .build(); assertEquals(ConsistencyLevel.LOCAL_SERIAL, myLockFactory.getSerialConsistencyLevel()); @@ -534,10 +523,9 @@ public void testOtherThanDataCenterAwareAgentTypeWithDefaultSerialConsistency() myLockFactory = new CASLockFactoryBuilder() .withNativeConnectionProvider(connectionProviderMock) .withHostStates(hostStates) - .withStatementDecorator(s -> s) .withKeyspaceName(myKeyspaceName) .withConsistencySerial(ConsistencyType.DEFAULT) - .withNode(nodeMock) + .withHostId(nodeMock.getHostId()) .build(); assertEquals(ConsistencyLevel.SERIAL, myLockFactory.getSerialConsistencyLevel()); @@ -555,10 +543,9 @@ public void testLocalSerialConsistency() myLockFactory = new CASLockFactoryBuilder() .withNativeConnectionProvider(connectionProviderMock) .withHostStates(hostStates) - .withStatementDecorator(s -> s) .withKeyspaceName(myKeyspaceName) .withConsistencySerial(ConsistencyType.LOCAL) - .withNode(nodeMock) + .withHostId(nodeMock.getHostId()) .build(); assertEquals(ConsistencyLevel.LOCAL_SERIAL, myLockFactory.getSerialConsistencyLevel()); @@ -576,10 +563,9 @@ public void testSerialConsistency() myLockFactory = new CASLockFactoryBuilder() .withNativeConnectionProvider(connectionProviderMock) .withHostStates(hostStates) - .withStatementDecorator(s -> s) .withKeyspaceName(myKeyspaceName) .withConsistencySerial(ConsistencyType.SERIAL) - .withNode(nodeMock) + .withHostId(nodeMock.getHostId()) .build(); assertEquals(ConsistencyLevel.SERIAL, myLockFactory.getSerialConsistencyLevel()); diff --git a/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/TestLockCollection.java b/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/TestLockCollection.java index 160096b44..2a49cbf1d 100644 --- a/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/TestLockCollection.java +++ b/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/TestLockCollection.java @@ -78,14 +78,3 @@ public void close() } } } - -class DummyLock implements LockFactory.DistributedLock -{ - public volatile boolean closed = false; - - @Override - public void close() - { - closed = true; - } -} diff --git a/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/TestRepairLockFactoryImpl.java b/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/TestRepairLockFactoryImpl.java new file mode 100644 index 000000000..f87684840 --- /dev/null +++ b/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/TestRepairLockFactoryImpl.java @@ -0,0 +1,286 @@ +/* + * Copyright 2018 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ericsson.bss.cassandra.ecchronos.core.impl.repair; + +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyMap; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.ericsson.bss.cassandra.ecchronos.core.locks.LockFactory; +import com.ericsson.bss.cassandra.ecchronos.core.repair.RepairLockFactory; +import com.ericsson.bss.cassandra.ecchronos.core.repair.RepairResource; +import com.ericsson.bss.cassandra.ecchronos.utils.exceptions.LockException; +import java.util.Arrays; +import java.util.Collections; +import java.util.Map; +import java.util.Optional; + +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; +import com.google.common.collect.Sets; + +@RunWith(MockitoJUnitRunner.class) +public class TestRepairLockFactoryImpl +{ + private static final int LOCKS_PER_RESOURCE = 1; + + @Mock + private LockFactory mockLockFactory; + + @Mock + private LockFactory.DistributedLock mockLock; + + @Before + public void setup() + { + when(mockLockFactory.getCachedFailure(anyString(), anyString())).thenReturn(Optional.empty()); + } + + @Test + public void testNothingToLockThrowsException() + { + RepairLockFactoryImpl repairLockFactory = new RepairLockFactoryImpl(); + Map metadata = Collections.singletonMap("metadatakey", "metadatavalue"); + int priority = 1; + + verifyExceptionIsThrownWhenGettingLock(repairLockFactory, priority, metadata); + verify(mockLock, never()).close(); + } + + @Test + public void testSingleLock() throws LockException + { + RepairResource repairResource = new RepairResource("DC1", "my-resource"); + RepairLockFactoryImpl repairLockFactory = new RepairLockFactoryImpl(); + Map metadata = Collections.singletonMap("metadatakey", "metadatavalue"); + int priority = 1; + + withSufficientNodesForLocking(repairResource); + withSuccessfulLocking(repairResource, priority, metadata); + + verifyLocksAreTriedWhenGettingLock(repairLockFactory, priority, metadata, repairResource); + verify(mockLock, never()).close(); + } + + @Test + public void testSingleLockNotSufficientNodes() throws LockException + { + RepairResource repairResource = new RepairResource("DC1", "my-resource"); + RepairLockFactoryImpl repairLockFactory = new RepairLockFactoryImpl(); + Map metadata = Collections.singletonMap("metadatakey", "metadatavalue"); + int priority = 1; + + withoutSufficientNodesForLocking(repairResource); + withSuccessfulLocking(repairResource, priority, metadata); + + verifyExceptionIsThrownWhenGettingLock(repairLockFactory, priority, metadata, repairResource); + verify(mockLock, never()).close(); + } + + @Test + public void testSingleLockFailing() throws LockException + { + RepairResource repairResource = new RepairResource("DC1", "my-resource"); + RepairLockFactoryImpl repairLockFactory = new RepairLockFactoryImpl(); + Map metadata = Collections.singletonMap("metadatakey", "metadatavalue"); + int priority = 1; + + withSufficientNodesForLocking(repairResource); + withUnsuccessfulLocking(repairResource, priority, metadata); + + verifyExceptionIsThrownWhenGettingLock(repairLockFactory, priority, metadata, repairResource); + verify(mockLock, never()).close(); + } + + @Test + public void testUnexpectedException() throws LockException + { + RepairResource repairResource = new RepairResource("DC1", "my-resource-dc1"); + RepairResource repairResource2 = new RepairResource("DC2", "my-resource-dc2"); + RepairLockFactoryImpl repairLockFactory = new RepairLockFactoryImpl(); + Map metadata = Collections.singletonMap("metadatakey", "metadatavalue"); + int priority = 1; + + withSufficientNodesForLocking(repairResource); + withSufficientNodesForLocking(repairResource2); + withSuccessfulLocking(repairResource, priority, metadata); + withUnexpectedLockingFailure(repairResource2, priority, metadata, NullPointerException.class); + + verifyExceptionIsThrownWhenGettingLock(repairLockFactory, priority, metadata, NullPointerException.class, repairResource, repairResource2); + verify(mockLock).close(); + } + + @Test + public void testMultipleLocks() throws LockException + { + RepairResource repairResourceDc1 = new RepairResource("DC1", "my-resource-dc1"); + RepairResource repairResourceDc2 = new RepairResource("DC2", "my-resource-dc2"); + RepairLockFactoryImpl repairLockFactory = new RepairLockFactoryImpl(); + Map metadata = Collections.singletonMap("metadatakey", "metadatavalue"); + int priority = 1; + + withSufficientNodesForLocking(repairResourceDc1); + withSufficientNodesForLocking(repairResourceDc2); + + withSuccessfulLocking(repairResourceDc1, priority, metadata); + withSuccessfulLocking(repairResourceDc2, priority, metadata); + + verifyLocksAreTriedWhenGettingLock(repairLockFactory, priority, metadata, repairResourceDc1, repairResourceDc2); + verify(mockLock, never()).close(); + } + + @Test + public void testMultipleLocksNotSufficientNodes() + { + RepairResource repairResourceDc1 = new RepairResource("DC1", "my-resource-dc1"); + RepairResource repairResourceDc2 = new RepairResource("DC2", "my-resource-dc2"); + RepairLockFactoryImpl repairLockFactory = new RepairLockFactoryImpl(); + Map metadata = Collections.singletonMap("metadatakey", "metadatavalue"); + int priority = 1; + + withSufficientNodesForLocking(repairResourceDc1); + withoutSufficientNodesForLocking(repairResourceDc2); + + verifyExceptionIsThrownWhenGettingLock(repairLockFactory, priority, metadata, repairResourceDc1, repairResourceDc2); + verify(mockLock, never()).close(); + } + + @Test + public void testMultipleLocksOneFailing() throws LockException + { + RepairResource repairResourceDc1 = new RepairResource("DC1", "my-resource-dc1"); + RepairResource repairResourceDc2 = new RepairResource("DC2", "my-resource-dc2"); + RepairLockFactoryImpl repairLockFactory = new RepairLockFactoryImpl(); + Map metadata = Collections.singletonMap("metadatakey", "metadatavalue"); + int priority = 1; + + withSufficientNodesForLocking(repairResourceDc1); + withSuccessfulLocking(repairResourceDc1, priority, metadata); + + withSufficientNodesForLocking(repairResourceDc2); + withUnsuccessfulLocking(repairResourceDc2, priority, metadata); + + verifyExceptionIsThrownWhenGettingLock(repairLockFactory, priority, metadata, repairResourceDc1, repairResourceDc2); + verify(mockLock).close(); + } + + @Test + public void testMultipleLocksOneHasCachedFailure() throws LockException + { + RepairResource repairResourceDc1 = new RepairResource("DC1", "my-resource-dc1"); + RepairResource repairResourceDc2 = new RepairResource("DC2", "my-resource-dc2"); + RepairLockFactoryImpl repairLockFactory = new RepairLockFactoryImpl(); + Map metadata = Collections.singletonMap("metadatakey", "metadatavalue"); + int priority = 1; + + withUnsuccessfulCachedLock(repairResourceDc1); + + withSufficientNodesForLocking(repairResourceDc1); + withSuccessfulLocking(repairResourceDc1, priority, metadata); + + withSufficientNodesForLocking(repairResourceDc2); + withSuccessfulLocking(repairResourceDc2, priority, metadata); + + verifyExceptionIsThrownWhenGettingLock(repairLockFactory, priority, metadata, repairResourceDc1, repairResourceDc2); + verifyNoLockWasTried(); + verify(mockLock, never()).close(); + } + + @Test + public void testMultipleLocksTheOtherHasCachedFailure() throws LockException + { + RepairResource repairResourceDc1 = new RepairResource("DC1", "my-resource-dc1"); + RepairResource repairResourceDc2 = new RepairResource("DC2", "my-resource-dc2"); + RepairLockFactoryImpl repairLockFactory = new RepairLockFactoryImpl(); + Map metadata = Collections.singletonMap("metadatakey", "metadatavalue"); + int priority = 1; + + withUnsuccessfulCachedLock(repairResourceDc2); + + withSufficientNodesForLocking(repairResourceDc1); + withSuccessfulLocking(repairResourceDc1, priority, metadata); + + withSufficientNodesForLocking(repairResourceDc2); + withSuccessfulLocking(repairResourceDc2, priority, metadata); + + verifyExceptionIsThrownWhenGettingLock(repairLockFactory, priority, metadata, repairResourceDc1, repairResourceDc2); + verifyNoLockWasTried(); + verify(mockLock, never()).close(); + } + + private void verifyNoLockWasTried() throws LockException + { + verify(mockLockFactory, never()).tryLock(anyString(), anyString(), anyInt(), anyMap()); + } + + private void verifyLocksAreTriedWhenGettingLock(RepairLockFactory repairLockFactory, int priority, Map metadata, RepairResource... repairResources) throws LockException + { + repairLockFactory.getLock(mockLockFactory, Sets.newHashSet(repairResources), metadata, priority); + + for (RepairResource repairResource : repairResources) + { + verify(mockLockFactory).tryLock(eq(repairResource.getDataCenter()), eq(repairResource.getResourceName(LOCKS_PER_RESOURCE)), eq(priority), eq(metadata)); + } + } + + private void verifyExceptionIsThrownWhenGettingLock(RepairLockFactory repairLockFactory, int priority, Map metadata, RepairResource... repairResources) + { + verifyExceptionIsThrownWhenGettingLock(repairLockFactory, priority, metadata, LockException.class, repairResources); + } + + private void verifyExceptionIsThrownWhenGettingLock(RepairLockFactory repairLockFactory, int priority, Map metadata, Class exceptionType, RepairResource... repairResources) + { + assertThatExceptionOfType(exceptionType) + .isThrownBy(() -> repairLockFactory.getLock(mockLockFactory, Sets.newLinkedHashSet(Arrays.asList(repairResources)), metadata, priority)); + } + + private void withUnsuccessfulCachedLock(RepairResource repairResource) + { + when(mockLockFactory.getCachedFailure(eq(repairResource.getDataCenter()), eq(repairResource.getResourceName(LOCKS_PER_RESOURCE)))).thenReturn(Optional.of(new LockException(""))); + } + + private void withSuccessfulLocking(RepairResource repairResource, int priority, Map metadata) throws LockException + { + when(mockLockFactory.tryLock(eq(repairResource.getDataCenter()), eq(repairResource.getResourceName(LOCKS_PER_RESOURCE)), eq(priority), eq(metadata))).thenReturn(mockLock); + } + + private void withUnsuccessfulLocking(RepairResource repairResource, int priority, Map metadata) throws LockException + { + when(mockLockFactory.tryLock(eq(repairResource.getDataCenter()), eq(repairResource.getResourceName(LOCKS_PER_RESOURCE)), eq(priority), eq(metadata))).thenThrow(new LockException("")); + } + + private void withUnexpectedLockingFailure(RepairResource repairResource, int priority, Map metadata, Class exceptionClass) throws LockException + { + when(mockLockFactory.tryLock(eq(repairResource.getDataCenter()), eq(repairResource.getResourceName(LOCKS_PER_RESOURCE)), eq(priority), eq(metadata))).thenThrow(exceptionClass); + } + + private void withSufficientNodesForLocking(RepairResource repairResource) + { + when(mockLockFactory.sufficientNodesForLocking(eq(repairResource.getDataCenter()), eq(repairResource.getResourceName(LOCKS_PER_RESOURCE)))).thenReturn(true); + } + + private void withoutSufficientNodesForLocking(RepairResource repairResource) + { + when(mockLockFactory.sufficientNodesForLocking(eq(repairResource.getDataCenter()), eq(repairResource.getResourceName(LOCKS_PER_RESOURCE)))).thenReturn(false); + } +} diff --git a/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/scheduler/TestScheduleManager.java b/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/scheduler/TestScheduleManager.java index 6fd5eb1cd..e17f5ccce 100644 --- a/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/scheduler/TestScheduleManager.java +++ b/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/scheduler/TestScheduleManager.java @@ -16,13 +16,24 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyMap; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import com.ericsson.bss.cassandra.ecchronos.connection.DistributedNativeConnectionProvider; +import com.ericsson.bss.cassandra.ecchronos.core.impl.locks.CASLockFactory; +import com.ericsson.bss.cassandra.ecchronos.core.impl.locks.CASLockFactoryBuilder; +import com.ericsson.bss.cassandra.ecchronos.core.impl.locks.DummyLock; import com.ericsson.bss.cassandra.ecchronos.core.repair.scheduler.RunPolicy; import com.ericsson.bss.cassandra.ecchronos.core.repair.scheduler.ScheduledJob; import com.ericsson.bss.cassandra.ecchronos.core.repair.scheduler.ScheduledTask; +import com.ericsson.bss.cassandra.ecchronos.core.state.HostStates; +import com.ericsson.bss.cassandra.ecchronos.utils.exceptions.LockException; import java.util.*; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -38,6 +49,15 @@ @RunWith (MockitoJUnitRunner.Silent.class) public class TestScheduleManager { + @Mock + private CASLockFactoryBuilder myLockFactoryBuilder; + @Mock + private CASLockFactory myLockFactory; + @Mock + private DistributedNativeConnectionProvider myNativeConnectionProvider; + @Mock + private HostStates myHostStates; + @Mock private RunPolicy myRunPolicy; @@ -50,14 +70,19 @@ public class TestScheduleManager private final Collection myNodes = Arrays.asList(nodeID1, nodeID2); @Before - public void startup() + public void startup() throws LockException { myScheduler = ScheduleManagerImpl.builder() .withNodeIDList(myNodes) + .withLockFactoryBuilder(myLockFactoryBuilder) .build(); myScheduler.addRunPolicy(job -> myRunPolicy.validate(job)); when(myRunPolicy.validate(any(ScheduledJob.class))).thenReturn(-1L); + doReturn(myLockFactoryBuilder).when(myLockFactoryBuilder).withNativeConnectionProvider(myNativeConnectionProvider); + doReturn(myLockFactoryBuilder).when(myLockFactoryBuilder).withHostStates(myHostStates); + doReturn(myLockFactoryBuilder).when(myLockFactoryBuilder).withHostId(nodeID1); + doReturn(myLockFactory).when(myLockFactoryBuilder).build(); } @After @@ -66,6 +91,14 @@ public void cleanup() myScheduler.close(); } + @Test + public void testRunningNoJobs() throws LockException + { + myScheduler.run(nodeID1); + + verify(myLockFactory, never()).tryLock(any(), anyString(), anyInt(), anyMap()); + } + @Test public void testRunningOneJob() { @@ -93,17 +126,19 @@ public void testRunningJobWithFailingRunPolicy() } @Test - public void testRunningTwoTasksStoppedAfterFirstByPolicy() + public void testRunningTwoTasksStoppedAfterFirstByPolicy() throws LockException { TestJob job1 = new TestJob(ScheduledJob.Priority.LOW, 2, () -> { when(myRunPolicy.validate(any(ScheduledJob.class))).thenReturn(1L); }); myScheduler.schedule(nodeID1, job1); + when(myLockFactory.tryLock(any(), anyString(), anyInt(), anyMap())).thenReturn(new DummyLock()); myScheduler.run(nodeID1); assertThat(job1.getTaskRuns()).isEqualTo(1); assertThat(myScheduler.getQueueSize(nodeID1)).isEqualTo(1); + verify(myLockFactory).tryLock(any(), anyString(), anyInt(), anyMap()); } @Test @@ -192,6 +227,57 @@ public void testGetCurrentJobStatusNoRunning() throws InterruptedException assertThat(myScheduler.getCurrentJobStatus()).isNotEqualTo("Job ID: " + jobId.toString() + ", Status: Running"); latch.countDown(); } + + @Test + public void testRunningOneJobWithThrowingLock() throws LockException + { + DummyJob job = new DummyJob(ScheduledJob.Priority.LOW); + myScheduler.schedule(nodeID1, job); + + when(myLockFactory.tryLock(any(), anyString(), anyInt(), anyMap())).thenThrow(new LockException("")); + + myScheduler.run(nodeID1); + + assertThat(job.hasRun()).isFalse(); + assertThat(myScheduler.getQueueSize(nodeID1)).isEqualTo(1); + } + + @Test + public void testTwoJobsThrowingLock() throws LockException + { + DummyJob job1 = new DummyJob(ScheduledJob.Priority.LOW); + DummyJob job2 = new DummyJob(ScheduledJob.Priority.LOW); + myScheduler.schedule(nodeID1, job1); + myScheduler.schedule(nodeID1, job2); + + when(myLockFactory.tryLock(any(), anyString(), anyInt(), anyMap())).thenThrow(new LockException("")); + + myScheduler.run(nodeID1); + + assertThat(job1.hasRun()).isFalse(); + assertThat(job2.hasRun()).isFalse(); + assertThat(myScheduler.getQueueSize(nodeID1)).isEqualTo(2); + verify(myLockFactory, times(2)).tryLock(any(), anyString(), anyInt(), anyMap()); + } + + @Test + public void testThreeTasksOneThrowing() throws LockException + { + TestJob job = new TestJob(ScheduledJob.Priority.LOW, 3); + myScheduler.schedule(nodeID1, job); + + when(myLockFactory.tryLock(any(), anyString(), anyInt(), anyMap())) + .thenReturn(new DummyLock()) + .thenThrow(new LockException("")) + .thenReturn(new DummyLock()); + + myScheduler.run(nodeID1); + + assertThat(job.getTaskRuns()).isEqualTo(2); + assertThat(myScheduler.getQueueSize(nodeID1)).isEqualTo(1); + verify(myLockFactory, times(3)).tryLock(any(), anyString(), anyInt(), anyMap()); + } + private void waitForJobStarted(TestJob job) throws InterruptedException { while(!job.hasStarted()) diff --git a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/locks/LockFactory.java b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/locks/LockFactory.java index 39afa9ac8..17efb2590 100644 --- a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/locks/LockFactory.java +++ b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/locks/LockFactory.java @@ -18,6 +18,7 @@ import java.util.Map; import java.util.Optional; import com.ericsson.bss.cassandra.ecchronos.utils.exceptions.LockException; +import java.util.UUID; /** * Interface for distributed lock factories. @@ -35,10 +36,12 @@ public interface LockFactory * @param priority * The priority of the lock. * @param metadata - * The metadata of the lock. + * The metadata of the lock. + * @param hostId + * The hostId. * @return The lock if able to lock the resource. */ - DistributedLock tryLock(String dataCenter, String resource, int priority, Map metadata) + DistributedLock tryLock(String dataCenter, String resource, int priority, Map metadata, UUID hostId) throws LockException; /** diff --git a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/repair/RepairLockFactory.java b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/repair/RepairLockFactory.java new file mode 100644 index 000000000..e763013df --- /dev/null +++ b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/repair/RepairLockFactory.java @@ -0,0 +1,44 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ericsson.bss.cassandra.ecchronos.core.repair; + +import com.ericsson.bss.cassandra.ecchronos.core.locks.LockFactory; +import com.ericsson.bss.cassandra.ecchronos.utils.exceptions.LockException; +import java.util.Map; +import java.util.Set; +import java.util.UUID; + +/** + * A locking factory for repair jobs. + */ +public interface RepairLockFactory +{ + /** + * Take a collected lock based on the repair resources provided. + * + * @param lockFactory The base lock factory to use. + * @param repairResources The repair resources to lock. + * @param metadata The metadata to add to the locks. + * @param priority The priority of the repair resources. + * @param nodeId + * @return The collected lock for the repair resources. + * @throws LockException Thrown in case there is an issue with taking the locks for the repair resources. + */ + LockFactory.DistributedLock getLock(LockFactory lockFactory, + Set repairResources, + Map metadata, + int priority, + UUID nodeId) throws LockException; +} diff --git a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/repair/RepairResource.java b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/repair/RepairResource.java new file mode 100644 index 000000000..d8f297aae --- /dev/null +++ b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/repair/RepairResource.java @@ -0,0 +1,104 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ericsson.bss.cassandra.ecchronos.core.repair; + +import java.util.Objects; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * A lock resource for repair. + */ +public class RepairResource +{ + private final String myDataCenter; + private final String myResourceName; + + /** + * Constructor. + * + * @param dataCenter The data center. + * @param resourceName Resource name. + */ + public RepairResource(final String dataCenter, final String resourceName) + { + myDataCenter = dataCenter; + myResourceName = checkNotNull(resourceName); + } + + /** + * Get datacenter. + * + * @return String + */ + public String getDataCenter() + { + return myDataCenter; + } + + /** + * Get resource name. + * + * @param n Resource number. + * @return String + */ + public String getResourceName(final int n) + { + return String.format("RepairResource-%s-%d", myResourceName, n); + } + + /** + * String representation. + * + * @return String + */ + @Override + public String toString() + { + return String.format("RepairResource(dc=%s,resource=%s)", myDataCenter, myResourceName); + } + + /** + * Checks equality. + * + * @param o Object to check equality with. + * @return boolean + */ + @Override + public boolean equals(final Object o) + { + if (this == o) + { + return true; + } + if (o == null || getClass() != o.getClass()) + { + return false; + } + RepairResource that = (RepairResource) o; + return Objects.equals(myDataCenter, that.myDataCenter) && Objects.equals(myResourceName, that.myResourceName); + } + + /** + * Hash representation. + * + * @return int + */ + @Override + public int hashCode() + { + return Objects.hash(myDataCenter, myResourceName); + } +} diff --git a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/repair/RepairResourceFactory.java b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/repair/RepairResourceFactory.java new file mode 100644 index 000000000..93f8f0e6d --- /dev/null +++ b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/repair/RepairResourceFactory.java @@ -0,0 +1,34 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ericsson.bss.cassandra.ecchronos.core.repair; + +import com.ericsson.bss.cassandra.ecchronos.core.state.ReplicaRepairGroup; +import java.util.Set; + +/** + * Interface for generating repair resources to lock when running repair. + */ +public interface RepairResourceFactory +{ + /** + * Generate repair resources to lock based on the provided {@link ReplicaRepairGroup}. + * + * It is up to the implementation to decide which repair resources needs to be locked for the repair group. + * + * @param replicaRepairGroup The replica repair group. + * @return The repair resources that needs to be locked for the repair to run. + */ + Set getRepairResources(ReplicaRepairGroup replicaRepairGroup); +} diff --git a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/repair/scheduler/ScheduledTask.java b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/repair/scheduler/ScheduledTask.java index 1070cb03c..f21698cee 100644 --- a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/repair/scheduler/ScheduledTask.java +++ b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/repair/scheduler/ScheduledTask.java @@ -14,12 +14,17 @@ */ package com.ericsson.bss.cassandra.ecchronos.core.repair.scheduler; +import com.ericsson.bss.cassandra.ecchronos.core.locks.LockFactory; +import com.ericsson.bss.cassandra.ecchronos.utils.exceptions.LockException; import com.ericsson.bss.cassandra.ecchronos.utils.exceptions.ScheduledJobException; +import java.util.HashMap; import java.util.UUID; @SuppressWarnings("VisibilityModifier") public abstract class ScheduledTask { + private static final String DEFAULT_SCHEDULE_RESOURCE = "SCHEDULE_LOCK"; + protected final int myPriority; protected ScheduledTask() @@ -53,5 +58,20 @@ public void cleanup() { // Let subclasses override } + + /** + * Get the lock used by this scheduled job. + * + * @param lockFactory + * The lock factory to use. + @param nodeId + * The nodeId. + * @return The lock used by this scheduled job. + * @throws LockException Thrown when it's not possible to get the lock. + */ + public LockFactory.DistributedLock getLock(final LockFactory lockFactory, final UUID nodeId) throws LockException + { + return lockFactory.tryLock(null, DEFAULT_SCHEDULE_RESOURCE, myPriority, new HashMap<>(), nodeId); + } }