Skip to content

Commit

Permalink
Refactor CASLockFactory to eliminate "PMD.GodClass" and Uncouple sub …
Browse files Browse the repository at this point in the history
…classess (#648)
  • Loading branch information
VictorCavichioli authored Mar 13, 2024
1 parent 0e1a373 commit 6b42872
Show file tree
Hide file tree
Showing 16 changed files with 861 additions and 452 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@ dependency-reduced-pom.xml
*.iml
.coverage
*htmlcov
statistics/
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ More details on the underlying infrastructure can be found in [ARCHITECTURE.md](

More information on the REST interface of ecChronos is described in [REST.md](docs/REST.md).

### Prerequisites
## Prerequisites

* JDK 11
* Python 3.8
Expand Down Expand Up @@ -53,6 +53,7 @@ Please read [CONTRIBUTING.md](docs/CONTRIBUTING.md) for details on our code of c
## Versioning

We try to adhere to [SemVer](http://semver.org) for versioning.

* Anything requiring changes to configuration or plugin APIs should be released in a new major version.
* Anything extending configuration or plugins in a backwards compatible way should be released in a new minor version.
* Bug fixes should be made for the first known version and merged forward.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import com.ericsson.bss.cassandra.ecchronos.connection.NativeConnectionProvider;
import com.ericsson.bss.cassandra.ecchronos.connection.StatementDecorator;
import com.ericsson.bss.cassandra.ecchronos.core.scheduling.LockFactory;
import com.ericsson.bss.cassandra.ecchronos.core.utils.ConsistencyType;

import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
Expand Down Expand Up @@ -64,6 +66,7 @@ public final synchronized void activate(final Configuration configuration)
.withHostStates(myHostStates)
.withStatementDecorator(myStatementDecorator)
.withKeyspaceName(configuration.keyspaceName())
.withConsistencySerial(configuration.consistencySerial())
.build();
}

Expand All @@ -84,7 +87,9 @@ public final DistributedLock tryLock(final String dataCenter,
}

@Override
public final Map<String, String> getLockMetadata(final String dataCenter, final String resource)
public final Map<String, String> getLockMetadata(
final String dataCenter,
final String resource) throws LockException
{
return myDelegateLockFactory.getLockMetadata(dataCenter, resource);
}
Expand All @@ -101,5 +106,9 @@ public final boolean sufficientNodesForLocking(final String dataCenter, final St
@AttributeDefinition(name = "The lock factory keyspace to use",
description = "The name of the keyspace containing the lock factory tables")
String keyspaceName() default DEFAULT_KEYSPACE_NAME;

@AttributeDefinition(name = "The serial consistency level to use",
description = "The type of serial consistency level to use")
ConsistencyType consistencySerial() default ConsistencyType.DEFAULT;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
/*
* Copyright 2024 Telefonaktiebolaget LM Ericsson
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ericsson.bss.cassandra.ecchronos.core;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
import com.ericsson.bss.cassandra.ecchronos.core.exceptions.LockException;
import com.ericsson.bss.cassandra.ecchronos.core.scheduling.LockFactory.DistributedLock;

/**
* Represents a container for builder configurations and state for the CASLock.
* This class is used to decouple builder fields from CASLockFactory to avoid excessive field count.
*/
class CASLock implements DistributedLock, Runnable
{
private static final Logger LOG = LoggerFactory.getLogger(CASLock.class);

private final String myDataCenter;
private final String myResource;
private final int myPriority;
private final Map<String, String> myMetadata;

private final AtomicReference<ScheduledFuture<?>> myUpdateFuture = new AtomicReference<>();

private final AtomicInteger myFailedUpdateAttempts = new AtomicInteger();

private final int myLocallyHighestPriority;
private final int globalHighPriority;

private final UUID myUuid;

private final CASLockStatement myCasLockStatement;

CASLock(final String dataCenter,
final String resource,
final int priority,
final Map<String, String> metadata,
final UUID uuid,
final CASLockStatement casLockStatement)
{
myDataCenter = dataCenter;
myResource = resource;
myPriority = priority;
myMetadata = metadata;
myUuid = uuid;
myCasLockStatement = casLockStatement;

List<NodePriority> nodePriorities = computePriorities();

myLocallyHighestPriority = nodePriorities.stream()
.filter(n -> n.getUuid().equals(myUuid))
.map(NodePriority::getPriority)
.findFirst()
.orElse(myPriority);
globalHighPriority = nodePriorities.stream()
.filter(n -> !n.getUuid().equals(myUuid))
.map(NodePriority::getPriority)
.max(Integer::compare)
.orElse(myPriority);
}

public boolean lock()
{
if (compete())
{
LOG.trace("Trying to acquire lock for resource {}", myResource);
if (tryLock())
{
ScheduledExecutorService executor = myCasLockStatement.getCasLockProperties().getExecutor();
LOG.trace("Lock for resource {} acquired", myResource);
ScheduledFuture<?> future = executor.scheduleAtFixedRate(this,
myCasLockStatement.getCasLockFactoryCacheContext().getLockUpdateTimeInSeconds(),
myCasLockStatement.getCasLockFactoryCacheContext().getLockUpdateTimeInSeconds(), TimeUnit.SECONDS);
myUpdateFuture.set(future);

return true;
}
}

return false;
}

@Override
public void run()
{
try
{
updateLock();
myFailedUpdateAttempts.set(0);
}
catch (LockException e)
{
int failedAttempts = myFailedUpdateAttempts.incrementAndGet();

if (failedAttempts >= myCasLockStatement.getCasLockFactoryCacheContext().getFailedLockRetryAttempts())
{
LOG.error("Unable to re-lock resource '{}' after {} failed attempts", myResource, failedAttempts);
}
else
{
LOG.warn("Unable to re-lock resource '{}', {} failed attempts", myResource, failedAttempts, e);
}
}
}

@Override
public void close()
{
ScheduledFuture<?> future = myUpdateFuture.get();
if (future != null)
{
future.cancel(true);
myCasLockStatement.execute(
myDataCenter,
myCasLockStatement.getRemoveLockStatement().bind(myResource, myUuid));

if (myLocallyHighestPriority <= myPriority)
{
myCasLockStatement.execute(
myDataCenter,
myCasLockStatement.getRemoveLockPriorityStatement().bind(myResource, myUuid));
}
else
{
LOG.debug("Locally highest priority ({}) is higher than current ({}), will not remove",
myLocallyHighestPriority, myPriority);
}
}
}

private void updateLock() throws LockException
{
ResultSet resultSet = myCasLockStatement.execute(myDataCenter,
myCasLockStatement.getUpdateLockStatement().bind(myUuid, myMetadata, myResource, myUuid));

if (!resultSet.wasApplied())
{
throw new LockException("CAS query failed");
}
}

private boolean compete()
{
if (myLocallyHighestPriority <= myPriority)
{
insertPriority();
}

LOG.trace("Highest priority for resource {}: {}", myResource, globalHighPriority);
return myPriority >= globalHighPriority;
}

private void insertPriority()
{
myCasLockStatement.execute(
myDataCenter,
myCasLockStatement.getCompeteStatement().bind(myResource, myUuid, myPriority));
}

private boolean tryLock()
{
return myCasLockStatement.execute(
myDataCenter,
myCasLockStatement.getLockStatement().bind(myResource, myUuid, myMetadata)).wasApplied();
}

private List<NodePriority> computePriorities()
{
List<NodePriority> nodePriorities = new ArrayList<>();

ResultSet resultSet = myCasLockStatement.execute(
myDataCenter,
myCasLockStatement.getGetPriorityStatement().bind(myResource));

for (Row row : resultSet)
{
int priority = row.getInt(CASLockStatement.COLUMN_PRIORITY);
UUID hostId = row.getUuid(CASLockStatement.COLUMN_NODE);

nodePriorities.add(new NodePriority(hostId, priority));
}

return nodePriorities;
}

int getFailedAttempts()
{
return myFailedUpdateAttempts.get();
}

}
Loading

0 comments on commit 6b42872

Please sign in to comment.