Skip to content

Commit

Permalink
Add Locks In SchedulerManager Ericsson#768
Browse files Browse the repository at this point in the history
- added locks in schedule manger
  • Loading branch information
sajid riaz committed Nov 15, 2024
1 parent eb4df73 commit 5eb3bf3
Show file tree
Hide file tree
Showing 12 changed files with 177 additions and 100 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## Version 1.0.0 (Not yet Released)

* Add Locks In SchedulerManager - Issue #768
* Cassandra-Based Distributed Locks - Issue #741
* Create New Repair Type Called "VNODE" - Issue #755
* Create ReplicaRepairGroup Class for Grouping Replicas and Token Ranges - Issue #721
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.metadata.Node;
import com.ericsson.bss.cassandra.ecchronos.application.config.Config;
import com.ericsson.bss.cassandra.ecchronos.application.config.lockfactory.CasLockFactoryConfig;
import com.ericsson.bss.cassandra.ecchronos.connection.DistributedJmxConnectionProvider;
import com.ericsson.bss.cassandra.ecchronos.connection.DistributedNativeConnectionProvider;
import com.ericsson.bss.cassandra.ecchronos.core.impl.jmx.DistributedJmxProxyFactoryImpl;
import com.ericsson.bss.cassandra.ecchronos.core.impl.locks.CASLockFactory;
import com.ericsson.bss.cassandra.ecchronos.core.impl.locks.CASLockFactoryBuilder;
import com.ericsson.bss.cassandra.ecchronos.core.impl.metrics.CassandraMetrics;
import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.scheduler.ScheduleManagerImpl;
import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.state.HostStatesImpl;
Expand Down Expand Up @@ -57,13 +60,13 @@ public class ECChronosInternals implements Closeable
private final CassandraMetrics myCassandraMetrics;
private final HostStatesImpl myHostStatesImpl;
private final TableStorageStatesImpl myTableStorageStatesImpl;
private final CASLockFactoryBuilder myLockFactoryBuilder;

public ECChronosInternals(
final Config configuration,
final DistributedNativeConnectionProvider nativeConnectionProvider,
final DistributedJmxConnectionProvider jmxConnectionProvider,
final EccNodesSync eccNodesSync
)
final EccNodesSync eccNodesSync)
{
myJmxProxyFactory = DistributedJmxProxyFactoryImpl.builder()
.withJmxConnectionProvider(jmxConnectionProvider)
Expand Down Expand Up @@ -91,10 +94,22 @@ public ECChronosInternals(
.build();

myCassandraMetrics = new CassandraMetrics(myJmxProxyFactory);

CasLockFactoryConfig casLockFactoryConfig = configuration.getLockFactory()
.getCasLockFactoryConfig();

myLockFactoryBuilder = CASLockFactory.builder()
.withNativeConnectionProvider(nativeConnectionProvider)
.withHostStates(myHostStatesImpl)
.withKeyspaceName(casLockFactoryConfig.getKeyspaceName())
.withCacheExpiryInSeconds(casLockFactoryConfig.getFailureCacheExpiryTimeInSeconds())
.withConsistencySerial(casLockFactoryConfig.getConsistencySerial());

myScheduleManagerImpl = ScheduleManagerImpl.builder()
.withRunInterval(configuration.getSchedulerConfig().getFrequency().getInterval(TimeUnit.MILLISECONDS),
TimeUnit.MILLISECONDS)
.withNodeIDList(jmxConnectionProvider.getJmxConnections().keySet())
.withLockFactoryBuilder(myLockFactoryBuilder)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,13 @@ public final class CASLockFactory implements LockFactory, Closeable
Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat("LockRefresher-%d").build()),
builder.getConsistencyType(),
builder.getNativeConnectionProvider().getCqlSession(),
builder.getStatementDecorator());
builder.getNativeConnectionProvider().getCqlSession());

myHostStates = builder.getHostStates();

verifySchemasExists();

UUID hostId = builder.getNode().getHostId();
UUID hostId = builder.getHostId();

if (hostId == null)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,10 @@
*/
package com.ericsson.bss.cassandra.ecchronos.core.impl.locks;

import com.datastax.oss.driver.api.core.metadata.Node;
import com.ericsson.bss.cassandra.ecchronos.connection.DistributedNativeConnectionProvider;
import com.ericsson.bss.cassandra.ecchronos.connection.StatementDecorator;
import com.ericsson.bss.cassandra.ecchronos.core.impl.utils.ConsistencyType;
import com.ericsson.bss.cassandra.ecchronos.core.state.HostStates;
import java.util.UUID;

/**
* Represents a container for builder configurations and state for the CASLockFactory.
Expand All @@ -32,11 +31,10 @@ public class CASLockFactoryBuilder

private DistributedNativeConnectionProvider myNativeConnectionProvider;
private HostStates myHostStates;
private StatementDecorator myStatementDecorator;
private String myKeyspaceName = DEFAULT_KEYSPACE_NAME;
private long myCacheExpiryTimeInSeconds = DEFAULT_EXPIRY_TIME_IN_SECONDS;
private ConsistencyType myConsistencyType = DEFAULT_CONSISTENCY_SERIAL;
private Node myNode;
private UUID myHostId;

public final CASLockFactoryBuilder withNativeConnectionProvider(final DistributedNativeConnectionProvider nativeConnectionProvider)
{
Expand All @@ -50,12 +48,6 @@ public final CASLockFactoryBuilder withHostStates(final HostStates hostStates)
return this;
}

public final CASLockFactoryBuilder withStatementDecorator(final StatementDecorator statementDecorator)
{
myStatementDecorator = statementDecorator;
return this;
}

public final CASLockFactoryBuilder withKeyspaceName(final String keyspaceName)
{
myKeyspaceName = keyspaceName;
Expand All @@ -74,9 +66,9 @@ public final CASLockFactoryBuilder withConsistencySerial(final ConsistencyType c
return this;
}

public final CASLockFactoryBuilder withNode(final Node node)
public final CASLockFactoryBuilder withHostId(final UUID hostId)
{
myNode = node;
myHostId = hostId;
return this;
}

Expand All @@ -92,11 +84,6 @@ public final CASLockFactory build()
throw new IllegalArgumentException("Host states cannot be null");
}

if (myStatementDecorator == null)
{
throw new IllegalArgumentException("Statement decorator cannot be null");
}

return new CASLockFactory(this);
}

Expand All @@ -110,11 +97,6 @@ public final HostStates getHostStates()
return myHostStates;
}

public final StatementDecorator getStatementDecorator()
{
return myStatementDecorator;
}

public final String getKeyspaceName()
{
return myKeyspaceName;
Expand All @@ -130,8 +112,8 @@ public final ConsistencyType getConsistencyType()
return myConsistencyType;
}

public final Node getNode()
public final UUID getHostId()
{
return myNode;
return myHostId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import com.datastax.oss.driver.api.core.ConsistencyLevel;
import com.datastax.oss.driver.api.core.CqlSession;
import com.ericsson.bss.cassandra.ecchronos.connection.StatementDecorator;

/**
* Represents a container for builder configurations and state for the CASLockStatement.
Expand All @@ -33,21 +32,18 @@ public class CASLockProperties
private final ScheduledExecutorService myExecutor;
private final ConsistencyLevel mySerialConsistencyLevel;
private final CqlSession mySession;
private final StatementDecorator myStatementDecorator;

CASLockProperties(final ConnectionType connectionType,
final String keyspaceName,
final ScheduledExecutorService executor,
final ConsistencyType consistencyType,
final CqlSession session,
final StatementDecorator statementDecorator)
final CqlSession session)
{
myConnectionType = connectionType;
myKeyspaceName = keyspaceName;
myExecutor = executor;
mySerialConsistencyLevel = defineSerialConsistencyLevel(consistencyType);
mySession = session;
myStatementDecorator = statementDecorator;
}

public final ConsistencyLevel defineSerialConsistencyLevel(final ConsistencyType consistencyType)
Expand Down Expand Up @@ -88,11 +84,6 @@ public final CqlSession getSession()
return mySession;
}

public final StatementDecorator getStatementDecorator()
{
return myStatementDecorator;
}

public final boolean isDatacenterAwareAgentType()

{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,21 +67,11 @@ public CASLockStatement(

public final ResultSet execute(final String dataCenter, final BoundStatement statement)
{
Statement executeStatement;

if (dataCenter != null && myCasLockProperties.isDatacenterAwareAgentType())
{
executeStatement = new DataCenterAwareStatement(statement, dataCenter);
}
else
{
executeStatement = statement;
}

return myCasLockProperties.getSession()
.execute(myCasLockProperties
.getStatementDecorator()
.apply(executeStatement));
Statement executeStatement = (dataCenter != null
&& myCasLockProperties.isDatacenterAwareAgentType())
? new DataCenterAwareStatement(statement, dataCenter)
: statement;
return myCasLockProperties.getSession().execute(executeStatement);
}

private SimpleStatement insertLockStatement()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
*/
package com.ericsson.bss.cassandra.ecchronos.core.impl.repair.scheduler;

import com.ericsson.bss.cassandra.ecchronos.core.impl.locks.CASLockFactoryBuilder;
import com.ericsson.bss.cassandra.ecchronos.core.locks.LockFactory;
import com.ericsson.bss.cassandra.ecchronos.core.repair.scheduler.RunPolicy;
import com.ericsson.bss.cassandra.ecchronos.core.repair.scheduler.ScheduleManager;
import com.ericsson.bss.cassandra.ecchronos.core.repair.scheduler.ScheduledJob;
Expand Down Expand Up @@ -54,13 +56,15 @@ public final class ScheduleManagerImpl implements ScheduleManager, Closeable
private final Set<RunPolicy> myRunPolicies = Sets.newConcurrentHashSet();
private final Map<UUID, ScheduledFuture<?>> myRunFuture = new ConcurrentHashMap<>();
private final Map<UUID, JobRunTask> myRunTasks = new ConcurrentHashMap<>();
private final CASLockFactoryBuilder myLockFactoryBuilder;

private final ScheduledExecutorService myExecutor = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat("TaskExecutor-%d").build());

private ScheduleManagerImpl(final Builder builder)
{
myNodeIDList = builder.myNodeIDList;
myLockFactoryBuilder = builder.myLockFactoryBuilder;
createScheduleFutureForNodeIDList(builder);
}

Expand Down Expand Up @@ -267,9 +271,11 @@ private boolean tryRunTask(
final ScheduledTask task)
{
LOG.debug("Trying to run task {} in node {}", task, nodeID);
// TODO need to implement lock mechanism
try
LOG.debug("Trying to acquire lock for {}", task);
LockFactory lockFactory = myLockFactoryBuilder.withHostId(nodeID).build();
try (LockFactory.DistributedLock lock = task.getLock(lockFactory))
{
LOG.debug("Lock has been acquired on node with Id {} with lock {}", nodeID, lock);
boolean successful = runTask(task);
job.postExecute(successful);
return true;
Expand Down Expand Up @@ -317,6 +323,7 @@ public static Builder builder()
public static class Builder
{
private Collection<UUID> myNodeIDList;
private CASLockFactoryBuilder myLockFactoryBuilder;
private long myRunIntervalInMs = DEFAULT_RUN_DELAY_IN_MS;

/**
Expand Down Expand Up @@ -344,6 +351,19 @@ public Builder withNodeIDList(final Collection<UUID> nodeIDList)
return this;
}

/**
* Sets the {@link CASLockFactoryBuilder} to be used by this builder.
*
* @param lockFactoryBuilder The {@link CASLockFactoryBuilder} to be used.
* Must not be {@code null}.
* @return The current {@code Builder} instance, allowing for method chaining.
*/
public final Builder withLockFactoryBuilder(final CASLockFactoryBuilder lockFactoryBuilder)
{
myLockFactoryBuilder = lockFactoryBuilder;
return this;
}

/**
* Build SchedulerManager with the provided configuration.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ericsson.bss.cassandra.ecchronos.connection;
package com.ericsson.bss.cassandra.ecchronos.core.impl.locks;

import com.datastax.oss.driver.api.core.cql.Statement;
import com.ericsson.bss.cassandra.ecchronos.core.locks.LockFactory;

public interface StatementDecorator
public class DummyLock implements LockFactory.DistributedLock
{
/**
* Decorates a statement before sending it over to the server.
* @param statement The original statement
* @return The decorated statement
*/
Statement apply(Statement statement);
public volatile boolean closed = false;

@Override
public void close() {
closed = true;
}
}
Loading

0 comments on commit 5eb3bf3

Please sign in to comment.