Skip to content

Commit

Permalink
Merge branch 'master' into issue264
Browse files Browse the repository at this point in the history
  • Loading branch information
DanielwEriksson authored Mar 20, 2024
2 parents afb56a4 + 6b42872 commit 248fd4c
Show file tree
Hide file tree
Showing 19 changed files with 892 additions and 483 deletions.
9 changes: 5 additions & 4 deletions .github/workflows/actions.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,20 +45,20 @@ jobs:
steps:
- uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # v4.1.1
- name: Cache local Maven repository
uses: actions/cache@704facf57e6136b1bc63b828d79edcd491f0ee84 # v3.3.2
uses: actions/cache@13aacd865c20de90d75de3b17ebe84f7a17d57d2 # v4.0.0
with:
path: ~/.m2/repository
key: build-${{ runner.os }}-maven-${{ hashFiles('**/pom.xml') }}
restore-keys: |
build-${{ runner.os }}-maven-
- name: Set up JDK
uses: actions/setup-java@0ab4596768b603586c0de567f2430c30f5b0d2b0 # v3.13.0
uses: actions/setup-java@387ac29b308b003ca37ba93a6cab5eb57c8f5f93 # v4.0.0
with:
java-version: 11
distribution: 'temurin'
- uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # v4.1.1
- name: Set up Python 3.8
uses: actions/setup-python@b64ffcaf5b410884ad320a9cfac8866006a109aa # v4.8.0
uses: actions/setup-python@0a5c61591373683505ea898e09a3ea4f39ef2b9c # v5.0.0
with:
python-version: 3.8
- name: install dependencies
Expand All @@ -75,9 +75,10 @@ jobs:
path: ${{ matrix.artifacts_dir }}/cassandra*.log
if-no-files-found: 'ignore'
- name: Upload coverage to Codecov
uses: codecov/codecov-action@eaaf4bedf32dbdc6b720b63067d99c4d77d6047d # v3.1.4
uses: codecov/codecov-action@54bcd8715eee62d40e33596ef5e8f0f48dbbccab # v4.1.0
with:
token: ${{ secrets.CODECOV_TOKEN }}
fail_ci_if_error: false
files: >
./rest/target/site/jacoco/jacoco.xml,
./core.osgi/target/site/jacoco/jacoco.xml,
Expand Down
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# ecChronos

[![codecov](https://codecov.io/gh/ericsson/ecchronos/branch/master/graph/badge.svg)](https://codecov.io/gh/ericsson/ecchronos/tree/master)
[![maven central](https://img.shields.io/maven-central/v/com.ericsson.bss.cassandra.ecchronos/ecchronos-binary.svg?label=maven%20central&versionPrefix=4.0)](https://central.sonatype.com/artifact/com.ericsson.bss.cassandra.ecchronos/ecchronos-binary/versions)
[![maven central](https://img.shields.io/maven-central/v/com.ericsson.bss.cassandra.ecchronos/ecchronos-binary.svg?label=maven%20central&versionPrefix=5.0)](https://central.sonatype.com/artifact/com.ericsson.bss.cassandra.ecchronos/ecchronos-binary/versions)

ecChronos is a decentralized scheduling framework primarily focused on performing automatic repairs in Apache Cassandra.

Expand All @@ -20,7 +20,7 @@ More details on the underlying infrastructure can be found in [ARCHITECTURE.md](

More information on the REST interface of ecChronos is described in [REST.md](docs/REST.md).

### Prerequisites
## Prerequisites

* JDK 11
* Python 3.8
Expand Down Expand Up @@ -53,6 +53,7 @@ Please read [CONTRIBUTING.md](docs/CONTRIBUTING.md) for details on our code of c
## Versioning

We try to adhere to [SemVer](http://semver.org) for versioning.

* Anything requiring changes to configuration or plugin APIs should be released in a new major version.
* Anything extending configuration or plugins in a backwards compatible way should be released in a new minor version.
* Bug fixes should be made for the first known version and merged forward.
Expand Down
44 changes: 22 additions & 22 deletions application/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -189,25 +189,25 @@
</dependency>
</dependencies>
<build>
<plugins>
<!-- Exec Maven Plugin to run YamlToMarkdown during package -->
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<executions>
<execution>
<id>yaml-to-markdown</id>
<phase>package</phase>
<goals>
<goal>java</goal>
</goals>
<configuration>
<mainClass>com.ericsson.bss.cassandra.ecchronos.application.EccYamlToMarkdownConverter</mainClass>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

</project>
<plugins>
<!-- Exec Maven Plugin to run YamlToMarkdown during package -->
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<executions>
<execution>
<id>yaml-to-markdown</id>
<phase>package</phase>
<goals>
<goal>java</goal>
</goals>
<configuration>
<mainClass>com.ericsson.bss.cassandra.ecchronos.application.EccYamlToMarkdownConverter</mainClass>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

</project>
4 changes: 2 additions & 2 deletions application/src/main/resources/ecc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -282,9 +282,9 @@ lock_factory:
## Allow to override consistency level for LWT (lightweight transactions). Possible values are:
## "DEFAULT" - Use consistency level based on remoteRouting.
## "SERIAL" - Use SERIAL consistency for LWT regardless of remoteRouting.
## "LOCAL_SERIAL" - Use LOCAL_SERIAL consistency for LWT regardless of remoteRouting.
## "LOCAL" - Use LOCAL_SERIAL consistency for LWT regardless of remoteRouting.
##
## if you use remoteRouting: false and LOCAL_SERIAL then all locks will be taken locally
## if you use remoteRouting: false and LOCAL then all locks will be taken locally
## in DC. I.e There's a risk that multiple nodes in different datacenters will be able to lock the
## same nodes causing multiple repairs on the same range/node at the same time.
##
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import com.ericsson.bss.cassandra.ecchronos.connection.NativeConnectionProvider;
import com.ericsson.bss.cassandra.ecchronos.connection.StatementDecorator;
import com.ericsson.bss.cassandra.ecchronos.core.scheduling.LockFactory;
import com.ericsson.bss.cassandra.ecchronos.core.utils.ConsistencyType;

import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
Expand Down Expand Up @@ -64,6 +66,7 @@ public final synchronized void activate(final Configuration configuration)
.withHostStates(myHostStates)
.withStatementDecorator(myStatementDecorator)
.withKeyspaceName(configuration.keyspaceName())
.withConsistencySerial(configuration.consistencySerial())
.build();
}

Expand All @@ -84,7 +87,9 @@ public final DistributedLock tryLock(final String dataCenter,
}

@Override
public final Map<String, String> getLockMetadata(final String dataCenter, final String resource)
public final Map<String, String> getLockMetadata(
final String dataCenter,
final String resource) throws LockException
{
return myDelegateLockFactory.getLockMetadata(dataCenter, resource);
}
Expand All @@ -101,5 +106,9 @@ public final boolean sufficientNodesForLocking(final String dataCenter, final St
@AttributeDefinition(name = "The lock factory keyspace to use",
description = "The name of the keyspace containing the lock factory tables")
String keyspaceName() default DEFAULT_KEYSPACE_NAME;

@AttributeDefinition(name = "The serial consistency level to use",
description = "The type of serial consistency level to use")
ConsistencyType consistencySerial() default ConsistencyType.DEFAULT;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
/*
* Copyright 2024 Telefonaktiebolaget LM Ericsson
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ericsson.bss.cassandra.ecchronos.core;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
import com.ericsson.bss.cassandra.ecchronos.core.exceptions.LockException;
import com.ericsson.bss.cassandra.ecchronos.core.scheduling.LockFactory.DistributedLock;

/**
* Represents a container for builder configurations and state for the CASLock.
* This class is used to decouple builder fields from CASLockFactory to avoid excessive field count.
*/
class CASLock implements DistributedLock, Runnable
{
private static final Logger LOG = LoggerFactory.getLogger(CASLock.class);

private final String myDataCenter;
private final String myResource;
private final int myPriority;
private final Map<String, String> myMetadata;

private final AtomicReference<ScheduledFuture<?>> myUpdateFuture = new AtomicReference<>();

private final AtomicInteger myFailedUpdateAttempts = new AtomicInteger();

private final int myLocallyHighestPriority;
private final int globalHighPriority;

private final UUID myUuid;

private final CASLockStatement myCasLockStatement;

CASLock(final String dataCenter,
final String resource,
final int priority,
final Map<String, String> metadata,
final UUID uuid,
final CASLockStatement casLockStatement)
{
myDataCenter = dataCenter;
myResource = resource;
myPriority = priority;
myMetadata = metadata;
myUuid = uuid;
myCasLockStatement = casLockStatement;

List<NodePriority> nodePriorities = computePriorities();

myLocallyHighestPriority = nodePriorities.stream()
.filter(n -> n.getUuid().equals(myUuid))
.map(NodePriority::getPriority)
.findFirst()
.orElse(myPriority);
globalHighPriority = nodePriorities.stream()
.filter(n -> !n.getUuid().equals(myUuid))
.map(NodePriority::getPriority)
.max(Integer::compare)
.orElse(myPriority);
}

public boolean lock()
{
if (compete())
{
LOG.trace("Trying to acquire lock for resource {}", myResource);
if (tryLock())
{
ScheduledExecutorService executor = myCasLockStatement.getCasLockProperties().getExecutor();
LOG.trace("Lock for resource {} acquired", myResource);
ScheduledFuture<?> future = executor.scheduleAtFixedRate(this,
myCasLockStatement.getCasLockFactoryCacheContext().getLockUpdateTimeInSeconds(),
myCasLockStatement.getCasLockFactoryCacheContext().getLockUpdateTimeInSeconds(), TimeUnit.SECONDS);
myUpdateFuture.set(future);

return true;
}
}

return false;
}

@Override
public void run()
{
try
{
updateLock();
myFailedUpdateAttempts.set(0);
}
catch (LockException e)
{
int failedAttempts = myFailedUpdateAttempts.incrementAndGet();

if (failedAttempts >= myCasLockStatement.getCasLockFactoryCacheContext().getFailedLockRetryAttempts())
{
LOG.error("Unable to re-lock resource '{}' after {} failed attempts", myResource, failedAttempts);
}
else
{
LOG.warn("Unable to re-lock resource '{}', {} failed attempts", myResource, failedAttempts, e);
}
}
}

@Override
public void close()
{
ScheduledFuture<?> future = myUpdateFuture.get();
if (future != null)
{
future.cancel(true);
myCasLockStatement.execute(
myDataCenter,
myCasLockStatement.getRemoveLockStatement().bind(myResource, myUuid));

if (myLocallyHighestPriority <= myPriority)
{
myCasLockStatement.execute(
myDataCenter,
myCasLockStatement.getRemoveLockPriorityStatement().bind(myResource, myUuid));
}
else
{
LOG.debug("Locally highest priority ({}) is higher than current ({}), will not remove",
myLocallyHighestPriority, myPriority);
}
}
}

private void updateLock() throws LockException
{
ResultSet resultSet = myCasLockStatement.execute(myDataCenter,
myCasLockStatement.getUpdateLockStatement().bind(myUuid, myMetadata, myResource, myUuid));

if (!resultSet.wasApplied())
{
throw new LockException("CAS query failed");
}
}

private boolean compete()
{
if (myLocallyHighestPriority <= myPriority)
{
insertPriority();
}

LOG.trace("Highest priority for resource {}: {}", myResource, globalHighPriority);
return myPriority >= globalHighPriority;
}

private void insertPriority()
{
myCasLockStatement.execute(
myDataCenter,
myCasLockStatement.getCompeteStatement().bind(myResource, myUuid, myPriority));
}

private boolean tryLock()
{
return myCasLockStatement.execute(
myDataCenter,
myCasLockStatement.getLockStatement().bind(myResource, myUuid, myMetadata)).wasApplied();
}

private List<NodePriority> computePriorities()
{
List<NodePriority> nodePriorities = new ArrayList<>();

ResultSet resultSet = myCasLockStatement.execute(
myDataCenter,
myCasLockStatement.getGetPriorityStatement().bind(myResource));

for (Row row : resultSet)
{
int priority = row.getInt(CASLockStatement.COLUMN_PRIORITY);
UUID hostId = row.getUuid(CASLockStatement.COLUMN_NODE);

nodePriorities.add(new NodePriority(hostId, priority));
}

return nodePriorities;
}

int getFailedAttempts()
{
return myFailedUpdateAttempts.get();
}

}
Loading

0 comments on commit 248fd4c

Please sign in to comment.