Skip to content

Commit

Permalink
Add Locks In SchedulerManager Ericsson#768
Browse files Browse the repository at this point in the history
- added locks in schedule manger
  • Loading branch information
sajid riaz committed Nov 18, 2024
1 parent eb4df73 commit 78c64f3
Show file tree
Hide file tree
Showing 20 changed files with 864 additions and 152 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## Version 1.0.0 (Not yet Released)

* Add Locks In SchedulerManager - Issue #768
* Cassandra-Based Distributed Locks - Issue #741
* Create New Repair Type Called "VNODE" - Issue #755
* Create ReplicaRepairGroup Class for Grouping Replicas and Token Ranges - Issue #721
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.metadata.Node;
import com.ericsson.bss.cassandra.ecchronos.application.config.Config;
import com.ericsson.bss.cassandra.ecchronos.application.config.lockfactory.CasLockFactoryConfig;
import com.ericsson.bss.cassandra.ecchronos.connection.DistributedJmxConnectionProvider;
import com.ericsson.bss.cassandra.ecchronos.connection.DistributedNativeConnectionProvider;
import com.ericsson.bss.cassandra.ecchronos.core.impl.jmx.DistributedJmxProxyFactoryImpl;
import com.ericsson.bss.cassandra.ecchronos.core.impl.locks.CASLockFactory;
import com.ericsson.bss.cassandra.ecchronos.core.impl.metrics.CassandraMetrics;
import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.scheduler.ScheduleManagerImpl;
import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.state.HostStatesImpl;
Expand Down Expand Up @@ -57,13 +59,13 @@ public class ECChronosInternals implements Closeable
private final CassandraMetrics myCassandraMetrics;
private final HostStatesImpl myHostStatesImpl;
private final TableStorageStatesImpl myTableStorageStatesImpl;
private final CASLockFactory myLockFactory;

public ECChronosInternals(
final Config configuration,
final DistributedNativeConnectionProvider nativeConnectionProvider,
final DistributedJmxConnectionProvider jmxConnectionProvider,
final EccNodesSync eccNodesSync
)
final EccNodesSync eccNodesSync)
{
myJmxProxyFactory = DistributedJmxProxyFactoryImpl.builder()
.withJmxConnectionProvider(jmxConnectionProvider)
Expand Down Expand Up @@ -91,10 +93,23 @@ public ECChronosInternals(
.build();

myCassandraMetrics = new CassandraMetrics(myJmxProxyFactory);

CasLockFactoryConfig casLockFactoryConfig = configuration.getLockFactory()
.getCasLockFactoryConfig();

myLockFactory = CASLockFactory.builder()
.withNativeConnectionProvider(nativeConnectionProvider)
.withHostStates(myHostStatesImpl)
.withKeyspaceName(casLockFactoryConfig.getKeyspaceName())
.withCacheExpiryInSeconds(casLockFactoryConfig.getFailureCacheExpiryTimeInSeconds())
.withConsistencySerial(casLockFactoryConfig.getConsistencySerial())
.build();

myScheduleManagerImpl = ScheduleManagerImpl.builder()
.withRunInterval(configuration.getSchedulerConfig().getFrequency().getInterval(TimeUnit.MILLISECONDS),
TimeUnit.MILLISECONDS)
.withNodeIDList(jmxConnectionProvider.getJmxConnections().keySet())
.withLockFactory(myLockFactory)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ public final class CASLockFactory implements LockFactory, Closeable
private static final int REFRESH_INTERVAL_RATIO = 10;
private static final int DEFAULT_LOCK_TIME_IN_SECONDS = 600;

private final UUID myUuid;
private final HostStates myHostStates;
private final CASLockFactoryCacheContext myCasLockFactoryCacheContext;

Expand All @@ -90,22 +89,12 @@ public final class CASLockFactory implements LockFactory, Closeable
Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat("LockRefresher-%d").build()),
builder.getConsistencyType(),
builder.getNativeConnectionProvider().getCqlSession(),
builder.getStatementDecorator());
builder.getNativeConnectionProvider().getCqlSession());

myHostStates = builder.getHostStates();

verifySchemasExists();

UUID hostId = builder.getNode().getHostId();

if (hostId == null)
{
hostId = UUID.randomUUID();
LOG.warn("Unable to determine local nodes host id, using {} instead", hostId);
}

myUuid = hostId;
myCasLockFactoryCacheContext = buildCasLockFactoryCacheContext(builder.getCacheExpiryTimeInSecond());

myCasLockStatement = new CASLockStatement(myCasLockProperties, myCasLockFactoryCacheContext);
Expand Down Expand Up @@ -144,11 +133,11 @@ private int getDefaultTimeToLiveFromLockTable()
public DistributedLock tryLock(final String dataCenter,
final String resource,
final int priority,
final Map<String, String> metadata)
throws LockException
final Map<String, String> metadata,
final UUID nodeId) throws LockException
{
return myCasLockFactoryCacheContext.getLockCache()
.getLock(dataCenter, resource, priority, metadata);
.getLock(dataCenter, resource, priority, metadata, nodeId);
}

@Override
Expand Down Expand Up @@ -214,12 +203,6 @@ public void close()
}
}

@VisibleForTesting
UUID getHostId()
{
return myUuid;
}

@VisibleForTesting
CASLockFactoryCacheContext getCasLockFactoryCacheContext()
{
Expand All @@ -246,7 +229,8 @@ public static CASLockFactoryBuilder builder()
private DistributedLock doTryLock(final String dataCenter,
final String resource,
final int priority,
final Map<String, String> metadata) throws LockException
final Map<String, String> metadata,
final UUID nodeId) throws LockException
{
LOG.trace("Trying lock for {} - {}", dataCenter, resource);

Expand All @@ -255,7 +239,7 @@ private DistributedLock doTryLock(final String dataCenter,
LOG.warn("Not sufficient nodes to lock resource {} in datacenter {}", resource, dataCenter);
throw new LockException("Not sufficient nodes to lock");
}
CASLock casLock = new CASLock(dataCenter, resource, priority, metadata, myUuid, myCasLockStatement); // NOSONAR
CASLock casLock = new CASLock(dataCenter, resource, priority, metadata, nodeId, myCasLockStatement); // NOSONAR
if (casLock.lock())
{
return casLock;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@
*/
package com.ericsson.bss.cassandra.ecchronos.core.impl.locks;

import com.datastax.oss.driver.api.core.metadata.Node;
import com.ericsson.bss.cassandra.ecchronos.connection.DistributedNativeConnectionProvider;
import com.ericsson.bss.cassandra.ecchronos.connection.StatementDecorator;
import com.ericsson.bss.cassandra.ecchronos.core.impl.utils.ConsistencyType;
import com.ericsson.bss.cassandra.ecchronos.core.state.HostStates;

Expand All @@ -32,11 +30,9 @@ public class CASLockFactoryBuilder

private DistributedNativeConnectionProvider myNativeConnectionProvider;
private HostStates myHostStates;
private StatementDecorator myStatementDecorator;
private String myKeyspaceName = DEFAULT_KEYSPACE_NAME;
private long myCacheExpiryTimeInSeconds = DEFAULT_EXPIRY_TIME_IN_SECONDS;
private ConsistencyType myConsistencyType = DEFAULT_CONSISTENCY_SERIAL;
private Node myNode;

public final CASLockFactoryBuilder withNativeConnectionProvider(final DistributedNativeConnectionProvider nativeConnectionProvider)
{
Expand All @@ -50,12 +46,6 @@ public final CASLockFactoryBuilder withHostStates(final HostStates hostStates)
return this;
}

public final CASLockFactoryBuilder withStatementDecorator(final StatementDecorator statementDecorator)
{
myStatementDecorator = statementDecorator;
return this;
}

public final CASLockFactoryBuilder withKeyspaceName(final String keyspaceName)
{
myKeyspaceName = keyspaceName;
Expand All @@ -74,12 +64,6 @@ public final CASLockFactoryBuilder withConsistencySerial(final ConsistencyType c
return this;
}

public final CASLockFactoryBuilder withNode(final Node node)
{
myNode = node;
return this;
}

public final CASLockFactory build()
{
if (myNativeConnectionProvider == null)
Expand All @@ -92,11 +76,6 @@ public final CASLockFactory build()
throw new IllegalArgumentException("Host states cannot be null");
}

if (myStatementDecorator == null)
{
throw new IllegalArgumentException("Statement decorator cannot be null");
}

return new CASLockFactory(this);
}

Expand All @@ -110,11 +89,6 @@ public final HostStates getHostStates()
return myHostStates;
}

public final StatementDecorator getStatementDecorator()
{
return myStatementDecorator;
}

public final String getKeyspaceName()
{
return myKeyspaceName;
Expand All @@ -129,9 +103,4 @@ public final ConsistencyType getConsistencyType()
{
return myConsistencyType;
}

public final Node getNode()
{
return myNode;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import com.datastax.oss.driver.api.core.ConsistencyLevel;
import com.datastax.oss.driver.api.core.CqlSession;
import com.ericsson.bss.cassandra.ecchronos.connection.StatementDecorator;

/**
* Represents a container for builder configurations and state for the CASLockStatement.
Expand All @@ -33,21 +32,18 @@ public class CASLockProperties
private final ScheduledExecutorService myExecutor;
private final ConsistencyLevel mySerialConsistencyLevel;
private final CqlSession mySession;
private final StatementDecorator myStatementDecorator;

CASLockProperties(final ConnectionType connectionType,
final String keyspaceName,
final ScheduledExecutorService executor,
final ConsistencyType consistencyType,
final CqlSession session,
final StatementDecorator statementDecorator)
final CqlSession session)
{
myConnectionType = connectionType;
myKeyspaceName = keyspaceName;
myExecutor = executor;
mySerialConsistencyLevel = defineSerialConsistencyLevel(consistencyType);
mySession = session;
myStatementDecorator = statementDecorator;
}

public final ConsistencyLevel defineSerialConsistencyLevel(final ConsistencyType consistencyType)
Expand Down Expand Up @@ -88,11 +84,6 @@ public final CqlSession getSession()
return mySession;
}

public final StatementDecorator getStatementDecorator()
{
return myStatementDecorator;
}

public final boolean isDatacenterAwareAgentType()

{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,21 +67,11 @@ public CASLockStatement(

public final ResultSet execute(final String dataCenter, final BoundStatement statement)
{
Statement executeStatement;

if (dataCenter != null && myCasLockProperties.isDatacenterAwareAgentType())
{
executeStatement = new DataCenterAwareStatement(statement, dataCenter);
}
else
{
executeStatement = statement;
}

return myCasLockProperties.getSession()
.execute(myCasLockProperties
.getStatementDecorator()
.apply(executeStatement));
Statement executeStatement = (dataCenter != null
&& myCasLockProperties.isDatacenterAwareAgentType())
? new DataCenterAwareStatement(statement, dataCenter)
: statement;
return myCasLockProperties.getSession().execute(executeStatement);
}

private SimpleStatement insertLockStatement()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.ericsson.bss.cassandra.ecchronos.utils.exceptions.LockException;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -58,7 +59,8 @@ public Optional<LockException> getCachedFailure(final String dataCenter, final S
public DistributedLock getLock(final String dataCenter,
final String resource,
final int priority,
final Map<String, String> metadata)
final Map<String, String> metadata,
final UUID hostId)
throws LockException
{
LockKey lockKey = new LockKey(dataCenter, resource);
Expand All @@ -72,7 +74,7 @@ public DistributedLock getLock(final String dataCenter,

try
{
return myLockSupplier.getLock(dataCenter, resource, priority, metadata);
return myLockSupplier.getLock(dataCenter, resource, priority, metadata, hostId);
}
catch (LockException e)
{
Expand All @@ -95,7 +97,7 @@ private Optional<LockException> getCachedFailure(final LockKey lockKey)
@FunctionalInterface
public interface LockSupplier
{
DistributedLock getLock(String dataCenter, String resource, int priority, Map<String, String> metadata)
DistributedLock getLock(String dataCenter, String resource, int priority, Map<String, String> metadata, UUID hostId)
throws LockException;
}

Expand Down
Loading

0 comments on commit 78c64f3

Please sign in to comment.