diff --git a/.github/workflows/actions.yml b/.github/workflows/actions.yml
index c41b0f639..320e5271c 100644
--- a/.github/workflows/actions.yml
+++ b/.github/workflows/actions.yml
@@ -45,20 +45,20 @@ jobs:
steps:
- uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # v4.1.1
- name: Cache local Maven repository
- uses: actions/cache@704facf57e6136b1bc63b828d79edcd491f0ee84 # v3.3.2
+ uses: actions/cache@13aacd865c20de90d75de3b17ebe84f7a17d57d2 # v4.0.0
with:
path: ~/.m2/repository
key: build-${{ runner.os }}-maven-${{ hashFiles('**/pom.xml') }}
restore-keys: |
build-${{ runner.os }}-maven-
- name: Set up JDK
- uses: actions/setup-java@0ab4596768b603586c0de567f2430c30f5b0d2b0 # v3.13.0
+ uses: actions/setup-java@387ac29b308b003ca37ba93a6cab5eb57c8f5f93 # v4.0.0
with:
java-version: 11
distribution: 'temurin'
- uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # v4.1.1
- name: Set up Python 3.8
- uses: actions/setup-python@b64ffcaf5b410884ad320a9cfac8866006a109aa # v4.8.0
+ uses: actions/setup-python@0a5c61591373683505ea898e09a3ea4f39ef2b9c # v5.0.0
with:
python-version: 3.8
- name: install dependencies
@@ -75,9 +75,10 @@ jobs:
path: ${{ matrix.artifacts_dir }}/cassandra*.log
if-no-files-found: 'ignore'
- name: Upload coverage to Codecov
- uses: codecov/codecov-action@eaaf4bedf32dbdc6b720b63067d99c4d77d6047d # v3.1.4
+ uses: codecov/codecov-action@54bcd8715eee62d40e33596ef5e8f0f48dbbccab # v4.1.0
with:
token: ${{ secrets.CODECOV_TOKEN }}
+ fail_ci_if_error: false
files: >
./rest/target/site/jacoco/jacoco.xml,
./core.osgi/target/site/jacoco/jacoco.xml,
diff --git a/README.md b/README.md
index bc79be2b6..2824caf1a 100644
--- a/README.md
+++ b/README.md
@@ -1,7 +1,7 @@
# ecChronos
[![codecov](https://codecov.io/gh/ericsson/ecchronos/branch/master/graph/badge.svg)](https://codecov.io/gh/ericsson/ecchronos/tree/master)
-[![maven central](https://img.shields.io/maven-central/v/com.ericsson.bss.cassandra.ecchronos/ecchronos-binary.svg?label=maven%20central&versionPrefix=4.0)](https://central.sonatype.com/artifact/com.ericsson.bss.cassandra.ecchronos/ecchronos-binary/versions)
+[![maven central](https://img.shields.io/maven-central/v/com.ericsson.bss.cassandra.ecchronos/ecchronos-binary.svg?label=maven%20central&versionPrefix=5.0)](https://central.sonatype.com/artifact/com.ericsson.bss.cassandra.ecchronos/ecchronos-binary/versions)
ecChronos is a decentralized scheduling framework primarily focused on performing automatic repairs in Apache Cassandra.
@@ -20,7 +20,7 @@ More details on the underlying infrastructure can be found in [ARCHITECTURE.md](
More information on the REST interface of ecChronos is described in [REST.md](docs/REST.md).
-### Prerequisites
+## Prerequisites
* JDK 11
* Python 3.8
@@ -53,6 +53,7 @@ Please read [CONTRIBUTING.md](docs/CONTRIBUTING.md) for details on our code of c
## Versioning
We try to adhere to [SemVer](http://semver.org) for versioning.
+
* Anything requiring changes to configuration or plugin APIs should be released in a new major version.
* Anything extending configuration or plugins in a backwards compatible way should be released in a new minor version.
* Bug fixes should be made for the first known version and merged forward.
diff --git a/application/pom.xml b/application/pom.xml
index 5e732f6bd..2a73b7aba 100644
--- a/application/pom.xml
+++ b/application/pom.xml
@@ -189,25 +189,25 @@
-
-
-
- org.codehaus.mojo
- exec-maven-plugin
-
-
- yaml-to-markdown
- package
-
- java
-
-
- com.ericsson.bss.cassandra.ecchronos.application.EccYamlToMarkdownConverter
-
-
-
-
-
-
-
-
+
+
+
+ org.codehaus.mojo
+ exec-maven-plugin
+
+
+ yaml-to-markdown
+ package
+
+ java
+
+
+ com.ericsson.bss.cassandra.ecchronos.application.EccYamlToMarkdownConverter
+
+
+
+
+
+
+
+
diff --git a/application/src/main/resources/ecc.yml b/application/src/main/resources/ecc.yml
index d84b5a5fe..242a52921 100644
--- a/application/src/main/resources/ecc.yml
+++ b/application/src/main/resources/ecc.yml
@@ -282,9 +282,9 @@ lock_factory:
## Allow to override consistency level for LWT (lightweight transactions). Possible values are:
## "DEFAULT" - Use consistency level based on remoteRouting.
## "SERIAL" - Use SERIAL consistency for LWT regardless of remoteRouting.
- ## "LOCAL_SERIAL" - Use LOCAL_SERIAL consistency for LWT regardless of remoteRouting.
+ ## "LOCAL" - Use LOCAL_SERIAL consistency for LWT regardless of remoteRouting.
##
- ## if you use remoteRouting: false and LOCAL_SERIAL then all locks will be taken locally
+ ## if you use remoteRouting: false and LOCAL then all locks will be taken locally
## in DC. I.e There's a risk that multiple nodes in different datacenters will be able to lock the
## same nodes causing multiple repairs on the same range/node at the same time.
##
diff --git a/core.osgi/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/osgi/CASLockFactoryService.java b/core.osgi/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/osgi/CASLockFactoryService.java
index 0655e5ff6..025991f89 100644
--- a/core.osgi/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/osgi/CASLockFactoryService.java
+++ b/core.osgi/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/osgi/CASLockFactoryService.java
@@ -21,6 +21,8 @@
import com.ericsson.bss.cassandra.ecchronos.connection.NativeConnectionProvider;
import com.ericsson.bss.cassandra.ecchronos.connection.StatementDecorator;
import com.ericsson.bss.cassandra.ecchronos.core.scheduling.LockFactory;
+import com.ericsson.bss.cassandra.ecchronos.core.utils.ConsistencyType;
+
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
@@ -64,6 +66,7 @@ public final synchronized void activate(final Configuration configuration)
.withHostStates(myHostStates)
.withStatementDecorator(myStatementDecorator)
.withKeyspaceName(configuration.keyspaceName())
+ .withConsistencySerial(configuration.consistencySerial())
.build();
}
@@ -84,7 +87,9 @@ public final DistributedLock tryLock(final String dataCenter,
}
@Override
- public final Map getLockMetadata(final String dataCenter, final String resource)
+ public final Map getLockMetadata(
+ final String dataCenter,
+ final String resource) throws LockException
{
return myDelegateLockFactory.getLockMetadata(dataCenter, resource);
}
@@ -101,5 +106,9 @@ public final boolean sufficientNodesForLocking(final String dataCenter, final St
@AttributeDefinition(name = "The lock factory keyspace to use",
description = "The name of the keyspace containing the lock factory tables")
String keyspaceName() default DEFAULT_KEYSPACE_NAME;
+
+ @AttributeDefinition(name = "The serial consistency level to use",
+ description = "The type of serial consistency level to use")
+ ConsistencyType consistencySerial() default ConsistencyType.DEFAULT;
}
}
diff --git a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/CASLock.java b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/CASLock.java
new file mode 100644
index 000000000..8eaba3b09
--- /dev/null
+++ b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/CASLock.java
@@ -0,0 +1,216 @@
+/*
+ * Copyright 2024 Telefonaktiebolaget LM Ericsson
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ericsson.bss.cassandra.ecchronos.core;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.oss.driver.api.core.cql.ResultSet;
+import com.datastax.oss.driver.api.core.cql.Row;
+import com.ericsson.bss.cassandra.ecchronos.core.exceptions.LockException;
+import com.ericsson.bss.cassandra.ecchronos.core.scheduling.LockFactory.DistributedLock;
+
+/**
+ * Represents a container for builder configurations and state for the CASLock.
+ * This class is used to decouple builder fields from CASLockFactory to avoid excessive field count.
+ */
+class CASLock implements DistributedLock, Runnable
+{
+ private static final Logger LOG = LoggerFactory.getLogger(CASLock.class);
+
+ private final String myDataCenter;
+ private final String myResource;
+ private final int myPriority;
+ private final Map myMetadata;
+
+ private final AtomicReference> myUpdateFuture = new AtomicReference<>();
+
+ private final AtomicInteger myFailedUpdateAttempts = new AtomicInteger();
+
+ private final int myLocallyHighestPriority;
+ private final int globalHighPriority;
+
+ private final UUID myUuid;
+
+ private final CASLockStatement myCasLockStatement;
+
+ CASLock(final String dataCenter,
+ final String resource,
+ final int priority,
+ final Map metadata,
+ final UUID uuid,
+ final CASLockStatement casLockStatement)
+ {
+ myDataCenter = dataCenter;
+ myResource = resource;
+ myPriority = priority;
+ myMetadata = metadata;
+ myUuid = uuid;
+ myCasLockStatement = casLockStatement;
+
+ List nodePriorities = computePriorities();
+
+ myLocallyHighestPriority = nodePriorities.stream()
+ .filter(n -> n.getUuid().equals(myUuid))
+ .map(NodePriority::getPriority)
+ .findFirst()
+ .orElse(myPriority);
+ globalHighPriority = nodePriorities.stream()
+ .filter(n -> !n.getUuid().equals(myUuid))
+ .map(NodePriority::getPriority)
+ .max(Integer::compare)
+ .orElse(myPriority);
+ }
+
+ public boolean lock()
+ {
+ if (compete())
+ {
+ LOG.trace("Trying to acquire lock for resource {}", myResource);
+ if (tryLock())
+ {
+ ScheduledExecutorService executor = myCasLockStatement.getCasLockProperties().getExecutor();
+ LOG.trace("Lock for resource {} acquired", myResource);
+ ScheduledFuture> future = executor.scheduleAtFixedRate(this,
+ myCasLockStatement.getCasLockFactoryCacheContext().getLockUpdateTimeInSeconds(),
+ myCasLockStatement.getCasLockFactoryCacheContext().getLockUpdateTimeInSeconds(), TimeUnit.SECONDS);
+ myUpdateFuture.set(future);
+
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ @Override
+ public void run()
+ {
+ try
+ {
+ updateLock();
+ myFailedUpdateAttempts.set(0);
+ }
+ catch (LockException e)
+ {
+ int failedAttempts = myFailedUpdateAttempts.incrementAndGet();
+
+ if (failedAttempts >= myCasLockStatement.getCasLockFactoryCacheContext().getFailedLockRetryAttempts())
+ {
+ LOG.error("Unable to re-lock resource '{}' after {} failed attempts", myResource, failedAttempts);
+ }
+ else
+ {
+ LOG.warn("Unable to re-lock resource '{}', {} failed attempts", myResource, failedAttempts, e);
+ }
+ }
+ }
+
+ @Override
+ public void close()
+ {
+ ScheduledFuture> future = myUpdateFuture.get();
+ if (future != null)
+ {
+ future.cancel(true);
+ myCasLockStatement.execute(
+ myDataCenter,
+ myCasLockStatement.getRemoveLockStatement().bind(myResource, myUuid));
+
+ if (myLocallyHighestPriority <= myPriority)
+ {
+ myCasLockStatement.execute(
+ myDataCenter,
+ myCasLockStatement.getRemoveLockPriorityStatement().bind(myResource, myUuid));
+ }
+ else
+ {
+ LOG.debug("Locally highest priority ({}) is higher than current ({}), will not remove",
+ myLocallyHighestPriority, myPriority);
+ }
+ }
+ }
+
+ private void updateLock() throws LockException
+ {
+ ResultSet resultSet = myCasLockStatement.execute(myDataCenter,
+ myCasLockStatement.getUpdateLockStatement().bind(myUuid, myMetadata, myResource, myUuid));
+
+ if (!resultSet.wasApplied())
+ {
+ throw new LockException("CAS query failed");
+ }
+ }
+
+ private boolean compete()
+ {
+ if (myLocallyHighestPriority <= myPriority)
+ {
+ insertPriority();
+ }
+
+ LOG.trace("Highest priority for resource {}: {}", myResource, globalHighPriority);
+ return myPriority >= globalHighPriority;
+ }
+
+ private void insertPriority()
+ {
+ myCasLockStatement.execute(
+ myDataCenter,
+ myCasLockStatement.getCompeteStatement().bind(myResource, myUuid, myPriority));
+ }
+
+ private boolean tryLock()
+ {
+ return myCasLockStatement.execute(
+ myDataCenter,
+ myCasLockStatement.getLockStatement().bind(myResource, myUuid, myMetadata)).wasApplied();
+ }
+
+ private List computePriorities()
+ {
+ List nodePriorities = new ArrayList<>();
+
+ ResultSet resultSet = myCasLockStatement.execute(
+ myDataCenter,
+ myCasLockStatement.getGetPriorityStatement().bind(myResource));
+
+ for (Row row : resultSet)
+ {
+ int priority = row.getInt(CASLockStatement.COLUMN_PRIORITY);
+ UUID hostId = row.getUuid(CASLockStatement.COLUMN_NODE);
+
+ nodePriorities.add(new NodePriority(hostId, priority));
+ }
+
+ return nodePriorities;
+ }
+
+ int getFailedAttempts()
+ {
+ return myFailedUpdateAttempts.get();
+ }
+
+}
diff --git a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/CASLockFactory.java b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/CASLockFactory.java
index 07ddc431c..69a3aaa4b 100644
--- a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/CASLockFactory.java
+++ b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/CASLockFactory.java
@@ -16,25 +16,15 @@
import com.datastax.oss.driver.api.core.ConsistencyLevel;
import com.datastax.oss.driver.api.core.CqlIdentifier;
-import com.datastax.oss.driver.api.core.CqlSession;
-import com.datastax.oss.driver.api.core.cql.BoundStatement;
-import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
-import com.datastax.oss.driver.api.core.cql.SimpleStatement;
-import com.datastax.oss.driver.api.core.cql.Statement;
import com.datastax.oss.driver.api.core.metadata.Metadata;
import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.api.core.metadata.TokenMap;
import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata;
import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata;
-import com.datastax.oss.driver.api.querybuilder.QueryBuilder;
-import com.ericsson.bss.cassandra.ecchronos.connection.DataCenterAwareStatement;
-import com.ericsson.bss.cassandra.ecchronos.connection.NativeConnectionProvider;
-import com.ericsson.bss.cassandra.ecchronos.connection.StatementDecorator;
import com.ericsson.bss.cassandra.ecchronos.core.exceptions.LockException;
import com.ericsson.bss.cassandra.ecchronos.core.scheduling.LockFactory;
-import com.ericsson.bss.cassandra.ecchronos.core.utils.ConsistencyType;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
@@ -43,23 +33,15 @@
import java.io.Closeable;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
-import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.bindMarker;
/**
* Lock factory using Cassandras LWT (Compare-And-Set operations) to create and maintain locks.
@@ -83,132 +65,38 @@
* WITH default_time_to_live = 600 AND gc_grace_seconds = 0;
*
*/
-@SuppressWarnings({"PMD.GodClass", "PMD.TooManyFields", "PMD.SingularField", "PMD.ExcessiveMethodLength"})
public final class CASLockFactory implements LockFactory, Closeable
{
private static final Logger LOG = LoggerFactory.getLogger(CASLockFactory.class);
- private static final String COLUMN_RESOURCE = "resource";
- private static final String COLUMN_NODE = "node";
- private static final String COLUMN_METADATA = "metadata";
- private static final String COLUMN_PRIORITY = "priority";
private static final String TABLE_LOCK = "lock";
private static final String TABLE_LOCK_PRIORITY = "lock_priority";
private static final int REFRESH_INTERVAL_RATIO = 10;
private static final int DEFAULT_LOCK_TIME_IN_SECONDS = 600;
private final UUID myUuid;
-
- private final ScheduledExecutorService myExecutor;
-
- private final StatementDecorator myStatementDecorator;
private final HostStates myHostStates;
- private final boolean myRemoteRouting;
- private final CqlSession mySession;
- private final String myKeyspaceName;
- private final PreparedStatement myCompeteStatement;
- private final PreparedStatement myGetPriorityStatement;
- private final PreparedStatement myLockStatement;
- private final PreparedStatement myGetLockMetadataStatement;
- private final PreparedStatement myRemoveLockStatement;
- private final PreparedStatement myUpdateLockStatement;
- private final PreparedStatement myRemoveLockPriorityStatement;
private final CASLockFactoryCacheContext myCasLockFactoryCacheContext;
- private final ConsistencyLevel mySerialConsistencyLevel;
- private CASLockFactory(final Builder builder)
+ private final CASLockProperties myCasLockProperties;
+ private final CASLockStatement myCasLockStatement;
+
+ CASLockFactory(final CASLockFactoryBuilder builder)
{
- myStatementDecorator = builder.myStatementDecorator;
- myHostStates = builder.myHostStates;
- myKeyspaceName = builder.myKeyspaceName;
- myExecutor = Executors.newSingleThreadScheduledExecutor(
- new ThreadFactoryBuilder().setNameFormat("LockRefresher-%d").build());
+ myCasLockProperties = new CASLockProperties(
+ builder.getNativeConnectionProvider().getRemoteRouting(),
+ builder.getKeyspaceName(),
+ Executors.newSingleThreadScheduledExecutor(
+ new ThreadFactoryBuilder().setNameFormat("LockRefresher-%d").build()),
+ builder.getConsistencyType(),
+ builder.getNativeConnectionProvider().getSession(),
+ builder.getStatementDecorator());
- mySession = builder.myNativeConnectionProvider.getSession();
- myRemoteRouting = builder.myNativeConnectionProvider.getRemoteRouting();
+ myHostStates = builder.getHostStates();
verifySchemasExists();
- if (ConsistencyType.DEFAULT.equals(builder.myConsistencyType))
- {
- mySerialConsistencyLevel = myRemoteRouting
- ? ConsistencyLevel.LOCAL_SERIAL
- : ConsistencyLevel.SERIAL;
- }
- else
- {
- mySerialConsistencyLevel = ConsistencyType.LOCAL.equals(builder.myConsistencyType)
- ? ConsistencyLevel.LOCAL_SERIAL
- : ConsistencyLevel.SERIAL;
- }
-
- SimpleStatement insertLockStatement = QueryBuilder.insertInto(myKeyspaceName, TABLE_LOCK)
- .value(COLUMN_RESOURCE, bindMarker())
- .value(COLUMN_NODE, bindMarker())
- .value(COLUMN_METADATA, bindMarker())
- .ifNotExists()
- .build()
- .setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM)
- .setSerialConsistencyLevel(mySerialConsistencyLevel);
-
- SimpleStatement getLockMetadataStatement = QueryBuilder.selectFrom(myKeyspaceName, TABLE_LOCK)
- .column(COLUMN_METADATA)
- .whereColumn(COLUMN_RESOURCE)
- .isEqualTo(bindMarker())
- .build()
- .setSerialConsistencyLevel(mySerialConsistencyLevel);
-
- SimpleStatement removeLockStatement = QueryBuilder.deleteFrom(myKeyspaceName, TABLE_LOCK)
- .whereColumn(COLUMN_RESOURCE)
- .isEqualTo(bindMarker())
- .ifColumn(COLUMN_NODE)
- .isEqualTo(bindMarker())
- .build()
- .setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM)
- .setSerialConsistencyLevel(mySerialConsistencyLevel);
-
- SimpleStatement updateLockStatement = QueryBuilder.update(myKeyspaceName, TABLE_LOCK)
- .setColumn(COLUMN_NODE, bindMarker())
- .setColumn(COLUMN_METADATA, bindMarker())
- .whereColumn(COLUMN_RESOURCE)
- .isEqualTo(bindMarker())
- .ifColumn(COLUMN_NODE)
- .isEqualTo(bindMarker())
- .build()
- .setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM)
- .setSerialConsistencyLevel(mySerialConsistencyLevel);
-
- SimpleStatement competeStatement = QueryBuilder.insertInto(myKeyspaceName, TABLE_LOCK_PRIORITY)
- .value(COLUMN_RESOURCE, bindMarker())
- .value(COLUMN_NODE, bindMarker())
- .value(COLUMN_PRIORITY, bindMarker())
- .build()
- .setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
-
- SimpleStatement getPriorityStatement = QueryBuilder.selectFrom(myKeyspaceName, TABLE_LOCK_PRIORITY)
- .columns(COLUMN_PRIORITY, COLUMN_NODE)
- .whereColumn(COLUMN_RESOURCE)
- .isEqualTo(bindMarker())
- .build()
- .setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
-
- SimpleStatement removeLockPriorityStatement = QueryBuilder.deleteFrom(myKeyspaceName, TABLE_LOCK_PRIORITY)
- .whereColumn(COLUMN_RESOURCE)
- .isEqualTo(bindMarker())
- .whereColumn(COLUMN_NODE)
- .isEqualTo(bindMarker())
- .build()
- .setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
-
- myLockStatement = mySession.prepare(insertLockStatement);
- myGetLockMetadataStatement = mySession.prepare(getLockMetadataStatement);
- myRemoveLockStatement = mySession.prepare(removeLockStatement);
- myUpdateLockStatement = mySession.prepare(updateLockStatement);
- myCompeteStatement = mySession.prepare(competeStatement);
- myGetPriorityStatement = mySession.prepare(getPriorityStatement);
- myRemoveLockPriorityStatement = mySession.prepare(removeLockPriorityStatement);
-
- UUID hostId = builder.myNativeConnectionProvider.getLocalNode().getHostId();
+ UUID hostId = builder.getNativeConnectionProvider().getLocalNode().getHostId();
if (hostId == null)
{
@@ -217,7 +105,9 @@ private CASLockFactory(final Builder builder)
}
myUuid = hostId;
- myCasLockFactoryCacheContext = buildCasLockFactoryCacheContext(builder.myCacheExpiryTimeInSeconds);
+ myCasLockFactoryCacheContext = buildCasLockFactoryCacheContext(builder.getCacheExpiryTimeInSecond());
+
+ myCasLockStatement = new CASLockStatement(myCasLockProperties, myCasLockFactoryCacheContext);
}
private CASLockFactoryCacheContext buildCasLockFactoryCacheContext(final long cacheExpiryTimeInSeconds)
@@ -235,13 +125,13 @@ private CASLockFactoryCacheContext buildCasLockFactoryCacheContext(final long ca
private int getDefaultTimeToLiveFromLockTable()
{
- TableMetadata tableMetadata = mySession.getMetadata()
- .getKeyspace(myKeyspaceName)
+ TableMetadata tableMetadata = myCasLockProperties.getSession().getMetadata()
+ .getKeyspace(myCasLockProperties.getKeyspaceName())
.flatMap(ks -> ks.getTable(TABLE_LOCK))
.orElse(null);
if (tableMetadata == null || tableMetadata.getOptions() == null)
{
- LOG.warn("Could not parse default ttl of {}.{}", myKeyspaceName, TABLE_LOCK);
+ LOG.warn("Could not parse default ttl of {}.{}", myCasLockProperties.getKeyspaceName(), TABLE_LOCK);
return DEFAULT_LOCK_TIME_IN_SECONDS;
}
Map tableOptions = tableMetadata.getOptions();
@@ -260,25 +150,21 @@ public DistributedLock tryLock(final String dataCenter,
}
@Override
- public Map getLockMetadata(final String dataCenter, final String resource)
+ public Map getLockMetadata(final String dataCenter, final String resource) throws LockException
{
- try
- {
- ResultSet resultSet = execute(dataCenter, myGetLockMetadataStatement.bind(resource));
+ ResultSet resultSet = myCasLockStatement.execute(
+ dataCenter, myCasLockStatement.getLockMetadataStatement().bind(resource));
- Row row = resultSet.one();
+ Row row = resultSet.one();
- if (row != null)
- {
- return row.getMap("metadata", String.class, String.class);
- }
+ if (row != null)
+ {
+ return row.getMap("metadata", String.class, String.class);
}
- catch (Exception e)
+ else
{
- LOG.warn("Unable to retrieve metadata for resource {}", resource, e);
+ throw new LockException("Unable to retrieve metadata for resource " + resource);
}
-
- return null;
}
@Override
@@ -312,10 +198,10 @@ public Optional getCachedFailure(final String dataCenter, final S
@Override
public void close()
{
- myExecutor.shutdown();
+ myCasLockProperties.getExecutor().shutdown();
try
{
- if (!myExecutor.awaitTermination(1, TimeUnit.SECONDS))
+ if (!myCasLockProperties.getExecutor().awaitTermination(1, TimeUnit.SECONDS))
{
LOG.warn("Executing tasks did not finish within one second");
}
@@ -333,89 +219,26 @@ UUID getHostId()
}
@VisibleForTesting
- ConsistencyLevel getSerialConsistencyLevel()
+ CASLockFactoryCacheContext getCasLockFactoryCacheContext()
{
- return mySerialConsistencyLevel;
+ return myCasLockFactoryCacheContext;
}
@VisibleForTesting
- CASLockFactoryCacheContext getCasLockFactoryCacheContext()
+ CASLockStatement getCasLockStatement()
{
- return myCasLockFactoryCacheContext;
+ return myCasLockStatement;
}
- public static Builder builder()
+ @VisibleForTesting
+ ConsistencyLevel getSerialConsistencyLevel()
{
- return new Builder();
+ return myCasLockProperties.getSerialConsistencyLevel();
}
- public static class Builder
+ public static CASLockFactoryBuilder builder()
{
- private static final String DEFAULT_KEYSPACE_NAME = "ecchronos";
- private static final long DEFAULT_EXPIRY_TIME_IN_SECONDS = 30L;
-
- private NativeConnectionProvider myNativeConnectionProvider;
- private HostStates myHostStates;
- private StatementDecorator myStatementDecorator;
- private String myKeyspaceName = DEFAULT_KEYSPACE_NAME;
- private long myCacheExpiryTimeInSeconds = DEFAULT_EXPIRY_TIME_IN_SECONDS;
- private ConsistencyType myConsistencyType;
-
- public final Builder withNativeConnectionProvider(final NativeConnectionProvider nativeConnectionProvider)
- {
- myNativeConnectionProvider = nativeConnectionProvider;
- return this;
- }
-
- public final Builder withHostStates(final HostStates hostStates)
- {
- myHostStates = hostStates;
- return this;
- }
-
- public final Builder withStatementDecorator(final StatementDecorator statementDecorator)
- {
- myStatementDecorator = statementDecorator;
- return this;
- }
-
- public final Builder withKeyspaceName(final String keyspaceName)
- {
- myKeyspaceName = keyspaceName;
- return this;
- }
-
- public final Builder withCacheExpiryInSeconds(final long cacheExpiryInSeconds)
- {
- myCacheExpiryTimeInSeconds = cacheExpiryInSeconds;
- return this;
- }
-
- public final Builder withConsistencySerial(final ConsistencyType consistencyType)
- {
- myConsistencyType = consistencyType;
- return this;
- }
-
- public final CASLockFactory build()
- {
- if (myNativeConnectionProvider == null)
- {
- throw new IllegalArgumentException("Native connection provider cannot be null");
- }
-
- if (myHostStates == null)
- {
- throw new IllegalArgumentException("Host states cannot be null");
- }
-
- if (myStatementDecorator == null)
- {
- throw new IllegalArgumentException("Statement decorator cannot be null");
- }
-
- return new CASLockFactory(this);
- }
+ return new CASLockFactoryBuilder();
}
private DistributedLock doTryLock(final String dataCenter,
@@ -430,22 +253,15 @@ private DistributedLock doTryLock(final String dataCenter,
LOG.warn("Not sufficient nodes to lock resource {} in datacenter {}", resource, dataCenter);
throw new LockException("Not sufficient nodes to lock");
}
-
- try
+ CASLock casLock = new CASLock(dataCenter, resource, priority, metadata, myUuid, myCasLockStatement); // NOSONAR
+ if (casLock.lock())
{
- CASLock casLock = new CASLock(dataCenter, resource, priority, metadata); // NOSONAR
- if (casLock.lock())
- {
- return casLock;
- }
+ return casLock;
}
- catch (Exception e)
+ else
{
- LOG.warn("Unable to lock resource {} in datacenter {} - {}", resource, dataCenter, e.getMessage());
- throw new LockException(e);
+ throw new LockException(String.format("Unable to lock resource %s in datacenter %s", resource, dataCenter));
}
-
- throw new LockException(String.format("Unable to lock resource %s in datacenter %s", resource, dataCenter));
}
private Set getNodesForResource(final String dataCenter,
@@ -453,10 +269,11 @@ private Set getNodesForResource(final String dataCenter,
{
Set dataCenterNodes = new HashSet<>();
- Metadata metadata = mySession.getMetadata();
+ Metadata metadata = myCasLockProperties.getSession().getMetadata();
TokenMap tokenMap = metadata.getTokenMap()
.orElseThrow(() -> new IllegalStateException("Couldn't get token map, is it disabled?"));
- Set nodes = tokenMap.getReplicas(myKeyspaceName, ByteBuffer.wrap(resource.getBytes("UTF-8")));
+ Set nodes = tokenMap.getReplicas(
+ myCasLockProperties.getKeyspaceName(), ByteBuffer.wrap(resource.getBytes("UTF-8")));
if (dataCenter != null)
{
@@ -491,221 +308,37 @@ private int liveNodes(final Collection nodes)
return live;
}
- private ResultSet execute(final String dataCenter, final BoundStatement statement)
- {
- Statement executeStatement;
-
- if (dataCenter != null && myRemoteRouting)
- {
- executeStatement = new DataCenterAwareStatement(statement, dataCenter);
- }
- else
- {
- executeStatement = statement;
- }
-
- return mySession.execute(myStatementDecorator.apply(executeStatement));
- }
-
private void verifySchemasExists()
{
- Optional keyspaceMetadata = mySession.getMetadata().getKeyspace(myKeyspaceName);
+ Optional keyspaceMetadata = myCasLockProperties
+ .getSession().getMetadata()
+ .getKeyspace(myCasLockProperties.getKeyspaceName());
if (!keyspaceMetadata.isPresent())
{
- LOG.error("Keyspace {} does not exist, it needs to be created", myKeyspaceName);
+ LOG.error("Keyspace {} does not exist, it needs to be created",
+ myCasLockProperties.getKeyspaceName());
throw new IllegalStateException(
- String.format("Keyspace %s does not exist, it needs to be created", myKeyspaceName));
+ String.format("Keyspace %s does not exist, it needs to be created",
+ myCasLockProperties.getKeyspaceName()));
}
if (!keyspaceMetadata.get().getTable(TABLE_LOCK).isPresent())
{
- LOG.error("Table {}.{} does not exist, it needs to be created", myKeyspaceName, TABLE_LOCK);
+ LOG.error("Table {}.{} does not exist, it needs to be created",
+ myCasLockProperties.getKeyspaceName(), TABLE_LOCK);
throw new IllegalStateException(
- String.format("Table %s.%s does not exist, it needs to be created", myKeyspaceName, TABLE_LOCK));
+ String.format("Table %s.%s does not exist, it needs to be created",
+ myCasLockProperties.getKeyspaceName(), TABLE_LOCK));
}
if (!keyspaceMetadata.get().getTable(TABLE_LOCK_PRIORITY).isPresent())
{
- LOG.error("Table {}.{} does not exist, it needs to be created", myKeyspaceName, TABLE_LOCK_PRIORITY);
+ LOG.error("Table {}.{} does not exist, it needs to be created",
+ myCasLockProperties.getKeyspaceName(), TABLE_LOCK_PRIORITY);
throw new IllegalStateException(
- String.format("Table %s.%s does not exist, it needs to be created", myKeyspaceName,
- TABLE_LOCK_PRIORITY));
+ String.format("Table %s.%s does not exist, it needs to be created",
+ myCasLockProperties.getKeyspaceName(), TABLE_LOCK_PRIORITY));
}
}
- class CASLock implements DistributedLock, Runnable
- {
- private final String myDataCenter;
- private final String myResource;
- private final int myPriority;
- private final Map myMetadata;
-
- private final AtomicReference> myUpdateFuture = new AtomicReference<>();
-
- private final AtomicInteger myFailedUpdateAttempts = new AtomicInteger();
-
- private final int myLocallyHighestPriority;
- private final int globalHighPriority;
-
- CASLock(final String dataCenter, final String resource, final int priority, final Map metadata)
- {
- myDataCenter = dataCenter;
- myResource = resource;
- myPriority = priority;
- myMetadata = metadata;
-
- List nodePriorities = computePriorities();
-
- myLocallyHighestPriority = nodePriorities.stream()
- .filter(n -> n.getUuid().equals(myUuid))
- .map(NodePriority::getPriority)
- .findFirst()
- .orElse(myPriority);
- globalHighPriority = nodePriorities.stream()
- .filter(n -> !n.getUuid().equals(myUuid))
- .map(NodePriority::getPriority)
- .max(Integer::compare)
- .orElse(myPriority);
- }
-
- public boolean lock()
- {
- if (compete())
- {
- LOG.trace("Trying to acquire lock for resource {}", myResource);
- if (tryLock())
- {
- LOG.trace("Lock for resource {} acquired", myResource);
- ScheduledFuture> future = myExecutor.scheduleAtFixedRate(this,
- myCasLockFactoryCacheContext.getLockUpdateTimeInSeconds(),
- myCasLockFactoryCacheContext.getLockUpdateTimeInSeconds(), TimeUnit.SECONDS);
- myUpdateFuture.set(future);
-
- return true;
- }
- }
-
- return false;
- }
-
- @Override
- public void run()
- {
- try
- {
- updateLock();
- myFailedUpdateAttempts.set(0);
- }
- catch (Exception e)
- {
- int failedAttempts = myFailedUpdateAttempts.incrementAndGet();
-
- if (failedAttempts >= myCasLockFactoryCacheContext.getFailedLockRetryAttempts())
- {
- LOG.error("Unable to re-lock resource '{}' after {} failed attempts", myResource, failedAttempts);
- }
- else
- {
- LOG.warn("Unable to re-lock resource '{}', {} failed attempts", myResource, failedAttempts, e);
- }
- }
- }
-
- @Override
- public void close()
- {
- ScheduledFuture> future = myUpdateFuture.get();
- if (future != null)
- {
- future.cancel(true);
- execute(myDataCenter, myRemoveLockStatement.bind(myResource, myUuid));
-
- if (myLocallyHighestPriority <= myPriority)
- {
- execute(myDataCenter, myRemoveLockPriorityStatement.bind(myResource, myUuid));
- }
- else
- {
- LOG.debug("Locally highest priority ({}) is higher than current ({}), will not remove",
- myLocallyHighestPriority, myPriority);
- }
- }
- }
-
- private void updateLock() throws LockException
- {
- ResultSet resultSet = execute(myDataCenter,
- myUpdateLockStatement.bind(myUuid, myMetadata, myResource, myUuid));
-
- if (!resultSet.wasApplied())
- {
- throw new LockException("CAS query failed");
- }
- }
-
- private boolean compete()
- {
- if (myLocallyHighestPriority <= myPriority)
- {
- insertPriority();
- }
-
- LOG.trace("Highest priority for resource {}: {}", myResource, globalHighPriority);
- return myPriority >= globalHighPriority;
- }
-
- private void insertPriority()
- {
- execute(myDataCenter, myCompeteStatement.bind(myResource, myUuid, myPriority));
- }
-
- private boolean tryLock()
- {
- return execute(myDataCenter, myLockStatement.bind(myResource, myUuid, myMetadata)).wasApplied();
- }
-
- private List computePriorities()
- {
- List nodePriorities = new ArrayList<>();
-
- ResultSet resultSet = execute(myDataCenter, myGetPriorityStatement.bind(myResource));
-
- for (Row row : resultSet)
- {
- int priority = row.getInt(COLUMN_PRIORITY);
- UUID hostId = row.getUuid(COLUMN_NODE);
-
- nodePriorities.add(new NodePriority(hostId, priority));
- }
-
- return nodePriorities;
- }
-
- int getFailedAttempts()
- {
- return myFailedUpdateAttempts.get();
- }
- }
-
- public static final class NodePriority
- {
- private final UUID myNode;
- private final int myPriority;
-
- public NodePriority(final UUID node, final int priority)
- {
- myNode = node;
- myPriority = priority;
- }
-
- public UUID getUuid()
- {
- return myNode;
- }
-
- public int getPriority()
- {
- return myPriority;
- }
- }
}
diff --git a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/CASLockFactoryBuilder.java b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/CASLockFactoryBuilder.java
new file mode 100644
index 000000000..5223c42ed
--- /dev/null
+++ b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/CASLockFactoryBuilder.java
@@ -0,0 +1,125 @@
+/*
+ * Copyright 2024 Telefonaktiebolaget LM Ericsson
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ericsson.bss.cassandra.ecchronos.core;
+
+import com.ericsson.bss.cassandra.ecchronos.connection.NativeConnectionProvider;
+import com.ericsson.bss.cassandra.ecchronos.connection.StatementDecorator;
+import com.ericsson.bss.cassandra.ecchronos.core.utils.ConsistencyType;
+
+/**
+ * Represents a container for builder configurations and state for the CASLockFactory.
+ * This class is used to decouple builder fields from CASLockFactory to avoid excessive field count.
+ */
+public class CASLockFactoryBuilder
+{
+ private static final String DEFAULT_KEYSPACE_NAME = "ecchronos";
+ private static final long DEFAULT_EXPIRY_TIME_IN_SECONDS = 30L;
+ private static final ConsistencyType DEFAULT_CONSISTENCY_SERIAL = ConsistencyType.DEFAULT;
+
+ private NativeConnectionProvider myNativeConnectionProvider;
+ private HostStates myHostStates;
+ private StatementDecorator myStatementDecorator;
+ private String myKeyspaceName = DEFAULT_KEYSPACE_NAME;
+ private long myCacheExpiryTimeInSeconds = DEFAULT_EXPIRY_TIME_IN_SECONDS;
+ private ConsistencyType myConsistencyType = DEFAULT_CONSISTENCY_SERIAL;
+
+ public final CASLockFactoryBuilder withNativeConnectionProvider(
+ final NativeConnectionProvider nativeConnectionProvider)
+ {
+ myNativeConnectionProvider = nativeConnectionProvider;
+ return this;
+ }
+
+ public final CASLockFactoryBuilder withHostStates(final HostStates hostStates)
+ {
+ myHostStates = hostStates;
+ return this;
+ }
+
+ public final CASLockFactoryBuilder withStatementDecorator(final StatementDecorator statementDecorator)
+ {
+ myStatementDecorator = statementDecorator;
+ return this;
+ }
+
+ public final CASLockFactoryBuilder withKeyspaceName(final String keyspaceName)
+ {
+ myKeyspaceName = keyspaceName;
+ return this;
+ }
+
+ public final CASLockFactoryBuilder withCacheExpiryInSeconds(final long cacheExpiryInSeconds)
+ {
+ myCacheExpiryTimeInSeconds = cacheExpiryInSeconds;
+ return this;
+ }
+
+ public final CASLockFactoryBuilder withConsistencySerial(final ConsistencyType consistencyType)
+ {
+ myConsistencyType = consistencyType;
+ return this;
+ }
+
+ public final CASLockFactory build()
+ {
+ if (myNativeConnectionProvider == null)
+ {
+ throw new IllegalArgumentException("Native connection provider cannot be null");
+ }
+
+ if (myHostStates == null)
+ {
+ throw new IllegalArgumentException("Host states cannot be null");
+ }
+
+ if (myStatementDecorator == null)
+ {
+ throw new IllegalArgumentException("Statement decorator cannot be null");
+ }
+
+ return new CASLockFactory(this);
+ }
+
+ public final NativeConnectionProvider getNativeConnectionProvider()
+ {
+ return myNativeConnectionProvider;
+ }
+
+ public final HostStates getHostStates()
+ {
+ return myHostStates;
+ }
+
+ public final StatementDecorator getStatementDecorator()
+ {
+ return myStatementDecorator;
+ }
+
+ public final String getKeyspaceName()
+ {
+ return myKeyspaceName;
+ }
+
+ public final long getCacheExpiryTimeInSecond()
+ {
+ return myCacheExpiryTimeInSeconds;
+ }
+
+ public final ConsistencyType getConsistencyType()
+ {
+ return myConsistencyType;
+ }
+
+}
diff --git a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/CASLockProperties.java b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/CASLockProperties.java
new file mode 100644
index 000000000..a071a96d5
--- /dev/null
+++ b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/CASLockProperties.java
@@ -0,0 +1,102 @@
+/*
+ * Copyright 2024 Telefonaktiebolaget LM Ericsson
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ericsson.bss.cassandra.ecchronos.core;
+
+import java.util.concurrent.ScheduledExecutorService;
+
+import com.datastax.oss.driver.api.core.ConsistencyLevel;
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.ericsson.bss.cassandra.ecchronos.connection.StatementDecorator;
+import com.ericsson.bss.cassandra.ecchronos.core.utils.ConsistencyType;
+
+/**
+ * Represents a container for builder configurations and state for the CASLockStatement.
+ * This class is used to decouple builder fields from CASLock to avoid excessive field count.
+ */
+public class CASLockProperties
+{
+ private final boolean myRemoteRouting;
+ private final String myKeyspaceName;
+ private final ScheduledExecutorService myExecutor;
+ private final ConsistencyLevel mySerialConsistencyLevel;
+ private final CqlSession mySession;
+ private final StatementDecorator myStatementDecorator;
+
+ CASLockProperties(
+ final Boolean remoteRouting,
+ final String keyspaceName,
+ final ScheduledExecutorService executor,
+ final ConsistencyType consistencyType,
+ final CqlSession session,
+ final StatementDecorator statementDecorator)
+ {
+ myRemoteRouting = remoteRouting;
+ myKeyspaceName = keyspaceName;
+ myExecutor = executor;
+ mySerialConsistencyLevel = defineSerialConsistencyLevel(consistencyType);
+ mySession = session;
+ myStatementDecorator = statementDecorator;
+ }
+
+ public final ConsistencyLevel defineSerialConsistencyLevel(final ConsistencyType consistencyType)
+ {
+ ConsistencyLevel serialConsistencyLevel;
+
+ if (ConsistencyType.DEFAULT.equals(consistencyType))
+ {
+ serialConsistencyLevel = myRemoteRouting
+ ? ConsistencyLevel.LOCAL_SERIAL
+ : ConsistencyLevel.SERIAL;
+ }
+ else
+ {
+ serialConsistencyLevel = ConsistencyType.LOCAL.equals(consistencyType)
+ ? ConsistencyLevel.LOCAL_SERIAL
+ : ConsistencyLevel.SERIAL;
+ }
+ return serialConsistencyLevel;
+ }
+
+ public final boolean isRemoteRouting()
+ {
+ return myRemoteRouting;
+ }
+
+ public final String getKeyspaceName()
+ {
+ return myKeyspaceName;
+ }
+
+ public final ScheduledExecutorService getExecutor()
+ {
+ return myExecutor;
+ }
+
+ public final ConsistencyLevel getSerialConsistencyLevel()
+ {
+ return mySerialConsistencyLevel;
+ }
+
+ public final CqlSession getSession()
+ {
+ return mySession;
+ }
+
+ public final StatementDecorator getStatementDecorator()
+ {
+ return myStatementDecorator;
+ }
+
+}
diff --git a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/CASLockStatement.java b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/CASLockStatement.java
new file mode 100644
index 000000000..0de65da1e
--- /dev/null
+++ b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/CASLockStatement.java
@@ -0,0 +1,224 @@
+/*
+ * Copyright 2024 Telefonaktiebolaget LM Ericsson
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ericsson.bss.cassandra.ecchronos.core;
+
+import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.bindMarker;
+
+import com.datastax.oss.driver.api.core.ConsistencyLevel;
+import com.datastax.oss.driver.api.core.cql.BoundStatement;
+import com.datastax.oss.driver.api.core.cql.PreparedStatement;
+import com.datastax.oss.driver.api.core.cql.ResultSet;
+import com.datastax.oss.driver.api.core.cql.SimpleStatement;
+import com.datastax.oss.driver.api.core.cql.Statement;
+import com.datastax.oss.driver.api.querybuilder.QueryBuilder;
+import com.ericsson.bss.cassandra.ecchronos.connection.DataCenterAwareStatement;
+
+/**
+ * Represents a container for builder configurations and state for the CASLockStatement.
+ * This class is used to decouple builder fields from CASLock to avoid excessive field count.
+ */
+public class CASLockStatement
+{
+ static final String COLUMN_RESOURCE = "resource";
+ static final String COLUMN_NODE = "node";
+ static final String COLUMN_METADATA = "metadata";
+ static final String COLUMN_PRIORITY = "priority";
+
+ private static final String TABLE_LOCK = "lock";
+ private static final String TABLE_LOCK_PRIORITY = "lock_priority";
+
+ private final PreparedStatement myCompeteStatement;
+ private final PreparedStatement myLockStatement;
+ private final PreparedStatement myRemoveLockStatement;
+ private final PreparedStatement myUpdateLockStatement;
+ private final PreparedStatement myRemoveLockPriorityStatement;
+ private final PreparedStatement myGetPriorityStatement;
+ private final PreparedStatement myGetLockMetadataStatement;
+
+ private final CASLockProperties myCasLockProperties;
+ private final CASLockFactoryCacheContext myCasLockFactoryCacheContext;
+
+ public CASLockStatement(
+ final CASLockProperties casLockProperties,
+ final CASLockFactoryCacheContext casLockFactoryCacheContext)
+ {
+ myCasLockProperties = casLockProperties;
+ myCasLockFactoryCacheContext = casLockFactoryCacheContext;
+ myCompeteStatement = myCasLockProperties.getSession().prepare(competeStatement());
+ myLockStatement = myCasLockProperties.getSession().prepare((insertLockStatement()));
+ myRemoveLockStatement = myCasLockProperties.getSession().prepare(removeLockStatement());
+ myUpdateLockStatement = myCasLockProperties.getSession().prepare((updateLockStatement()));
+ myRemoveLockPriorityStatement = myCasLockProperties.getSession().prepare(removeLockPriorityStatement());
+ myGetPriorityStatement = myCasLockProperties.getSession().prepare(getPriorityStatement());
+ myGetLockMetadataStatement = myCasLockProperties.getSession().prepare(lockMetadataStatement());
+ }
+
+ public final ResultSet execute(final String dataCenter, final BoundStatement statement)
+ {
+ Statement executeStatement;
+
+ if (dataCenter != null && myCasLockProperties.isRemoteRouting())
+ {
+ executeStatement = new DataCenterAwareStatement(statement, dataCenter);
+ }
+ else
+ {
+ executeStatement = statement;
+ }
+
+ return myCasLockProperties.getSession()
+ .execute(myCasLockProperties
+ .getStatementDecorator().apply(executeStatement));
+ }
+
+ private SimpleStatement insertLockStatement()
+ {
+ SimpleStatement insertLockStatement = QueryBuilder
+ .insertInto(myCasLockProperties.getKeyspaceName(), TABLE_LOCK)
+ .value(COLUMN_RESOURCE, bindMarker())
+ .value(COLUMN_NODE, bindMarker())
+ .value(COLUMN_METADATA, bindMarker())
+ .ifNotExists()
+ .build()
+ .setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM)
+ .setSerialConsistencyLevel(myCasLockProperties.getSerialConsistencyLevel());
+ return insertLockStatement;
+ }
+
+ private SimpleStatement removeLockStatement()
+ {
+ SimpleStatement removeLockStatement = QueryBuilder
+ .deleteFrom(myCasLockProperties.getKeyspaceName(), TABLE_LOCK)
+ .whereColumn(COLUMN_RESOURCE)
+ .isEqualTo(bindMarker())
+ .ifColumn(COLUMN_NODE)
+ .isEqualTo(bindMarker())
+ .build()
+ .setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM)
+ .setSerialConsistencyLevel(myCasLockProperties.getSerialConsistencyLevel());
+ return removeLockStatement;
+ }
+
+ private SimpleStatement updateLockStatement()
+ {
+ SimpleStatement updateLockStatement = QueryBuilder
+ .update(myCasLockProperties.getKeyspaceName(), TABLE_LOCK)
+ .setColumn(COLUMN_NODE, bindMarker())
+ .setColumn(COLUMN_METADATA, bindMarker())
+ .whereColumn(COLUMN_RESOURCE)
+ .isEqualTo(bindMarker())
+ .ifColumn(COLUMN_NODE)
+ .isEqualTo(bindMarker())
+ .build()
+ .setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM)
+ .setSerialConsistencyLevel(myCasLockProperties.getSerialConsistencyLevel());
+ return updateLockStatement;
+ }
+
+ private SimpleStatement competeStatement()
+ {
+ SimpleStatement competeStatement = QueryBuilder
+ .insertInto(myCasLockProperties.getKeyspaceName(), TABLE_LOCK_PRIORITY)
+ .value(COLUMN_RESOURCE, bindMarker())
+ .value(COLUMN_NODE, bindMarker())
+ .value(COLUMN_PRIORITY, bindMarker())
+ .build()
+ .setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
+ return competeStatement;
+ }
+
+ private SimpleStatement getPriorityStatement()
+ {
+ SimpleStatement priorityStatement = QueryBuilder
+ .selectFrom(myCasLockProperties.getKeyspaceName(), TABLE_LOCK_PRIORITY)
+ .columns(COLUMN_PRIORITY, COLUMN_NODE)
+ .whereColumn(COLUMN_RESOURCE)
+ .isEqualTo(bindMarker())
+ .build()
+ .setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
+ return priorityStatement;
+ }
+
+ private SimpleStatement removeLockPriorityStatement()
+ {
+ SimpleStatement removeLockPriorityStatement = QueryBuilder
+ .deleteFrom(myCasLockProperties.getKeyspaceName(), TABLE_LOCK_PRIORITY)
+ .whereColumn(COLUMN_RESOURCE)
+ .isEqualTo(bindMarker())
+ .whereColumn(COLUMN_NODE)
+ .isEqualTo(bindMarker())
+ .build()
+ .setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
+ return removeLockPriorityStatement;
+ }
+
+ private SimpleStatement lockMetadataStatement()
+ {
+ SimpleStatement lockMetadataStatement = QueryBuilder
+ .selectFrom(myCasLockProperties.getKeyspaceName(), TABLE_LOCK)
+ .column(COLUMN_METADATA)
+ .whereColumn(COLUMN_RESOURCE)
+ .isEqualTo(bindMarker())
+ .build()
+ .setSerialConsistencyLevel(myCasLockProperties.getSerialConsistencyLevel());
+ return lockMetadataStatement;
+ }
+
+ public final PreparedStatement getCompeteStatement()
+ {
+ return myCompeteStatement;
+ }
+
+ public final PreparedStatement getLockStatement()
+ {
+ return myLockStatement;
+ }
+
+ public final PreparedStatement getRemoveLockStatement()
+ {
+ return myRemoveLockStatement;
+ }
+
+ public final PreparedStatement getUpdateLockStatement()
+ {
+ return myUpdateLockStatement;
+ }
+
+ public final PreparedStatement getRemoveLockPriorityStatement()
+ {
+ return myRemoveLockPriorityStatement;
+ }
+
+ public final PreparedStatement getGetPriorityStatement()
+ {
+ return myGetPriorityStatement;
+ }
+
+ public final PreparedStatement getLockMetadataStatement()
+ {
+ return myGetLockMetadataStatement;
+ }
+
+ public final CASLockFactoryCacheContext getCasLockFactoryCacheContext()
+ {
+ return myCasLockFactoryCacheContext;
+ }
+
+ public final CASLockProperties getCasLockProperties()
+ {
+ return myCasLockProperties;
+ }
+
+}
diff --git a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/JmxProxyFactoryImpl.java b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/JmxProxyFactoryImpl.java
index 3e457eca2..d8a137bfc 100644
--- a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/JmxProxyFactoryImpl.java
+++ b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/JmxProxyFactoryImpl.java
@@ -316,7 +316,11 @@ public String getNodeStatus()
{
return (String) myMbeanServerConnection.getAttribute(myStorageServiceObject, "OperationMode");
}
- catch (Exception e)
+ catch (InstanceNotFoundException
+ | AttributeNotFoundException
+ | MBeanException
+ | ReflectionException
+ | IOException e)
{
LOG.error("Unable to retrieve node status {}", e.getMessage());
return "Unknown";
diff --git a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/NodePriority.java b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/NodePriority.java
new file mode 100644
index 000000000..8fc123399
--- /dev/null
+++ b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/NodePriority.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2024 Telefonaktiebolaget LM Ericsson
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ericsson.bss.cassandra.ecchronos.core;
+
+import java.util.UUID;
+
+/**
+ * Represents a container for node priority configurations and state for the CASLockFactory.
+ * This class is used to decouple node priority fields from CASLockFactory to avoid excessive field count.
+ */
+public final class NodePriority
+{
+ private final UUID myNode;
+ private final int myPriority;
+
+ public NodePriority(final UUID node, final int priority)
+ {
+ myNode = node;
+ myPriority = priority;
+ }
+
+ public UUID getUuid()
+ {
+ return myNode;
+ }
+
+ public int getPriority()
+ {
+ return myPriority;
+ }
+}
diff --git a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/scheduling/LockFactory.java b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/scheduling/LockFactory.java
index 89ee24c0c..03791ed8d 100644
--- a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/scheduling/LockFactory.java
+++ b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/scheduling/LockFactory.java
@@ -38,8 +38,6 @@ public interface LockFactory
* @param metadata
* The metadata of the lock.
* @return The lock if able to lock the resource.
- * @throws LockException
- * Thrown when unable to lock a resource
*/
DistributedLock tryLock(String dataCenter, String resource, int priority, Map metadata)
throws LockException;
@@ -54,8 +52,9 @@ DistributedLock tryLock(String dataCenter, String resource, int priority, Map getLockMetadata(String dataCenter, String resource);
+ Map getLockMetadata(String dataCenter, String resource) throws LockException;
/**
* Checks if local_quorum is met.
diff --git a/core/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/TestCASLockFactory.java b/core/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/TestCASLockFactory.java
index 1bf6eacee..417c249bd 100644
--- a/core/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/TestCASLockFactory.java
+++ b/core/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/TestCASLockFactory.java
@@ -19,12 +19,8 @@
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.Arrays;
@@ -104,7 +100,7 @@ public void startup()
hostStates = mock(HostStates.class);
when(hostStates.isUp(any(Node.class))).thenReturn(true);
- myLockFactory = new CASLockFactory.Builder()
+ myLockFactory = new CASLockFactoryBuilder()
.withNativeConnectionProvider(getNativeConnectionProvider())
.withHostStates(hostStates)
.withStatementDecorator(s -> s)
@@ -138,7 +134,7 @@ public void testGetDefaultTimeToLiveFromLockTable() throws LockException
{
String alterLockTable = String.format("ALTER TABLE %s.%s WITH default_time_to_live = 1200;", myKeyspaceName, TABLE_LOCK);
mySession.execute(alterLockTable);
- myLockFactory = new CASLockFactory.Builder()
+ myLockFactory = new CASLockFactoryBuilder()
.withNativeConnectionProvider(getNativeConnectionProvider())
.withHostStates(hostStates)
.withStatementDecorator(s -> s)
@@ -297,7 +293,16 @@ public void testInterruptCasLockUpdate() throws InterruptedException
try
{
- Future> future = executorService.submit(myLockFactory.new CASLock(DATA_CENTER, "lock", 1, metadata));
+ Future> future = executorService.submit(
+ new CASLock(
+ DATA_CENTER,
+ "lock",
+ 1,
+ metadata,
+ myLockFactory.getHostId(),
+ myLockFactory.getCasLockStatement()
+ )
+ );
Thread.sleep(100);
@@ -319,7 +324,14 @@ public void testInterruptCasLockUpdate() throws InterruptedException
public void testFailedLockRetryAttempts()
{
Map metadata = new HashMap<>();
- try (CASLockFactory.CASLock lockUpdateTask = myLockFactory.new CASLock(DATA_CENTER, "lock", 1, metadata))
+ try (CASLock lockUpdateTask = new CASLock(
+ DATA_CENTER,
+ "lock",
+ 1,
+ metadata,
+ myLockFactory.getHostId(),
+ myLockFactory.getCasLockStatement()
+ ))
{
for (int i = 0; i < 10; i++)
{
@@ -336,12 +348,29 @@ public void testFailedLockRetryAttempts()
}
@Test
- public void testActivateWithoutAllTablesCausesIllegalStateException()
+ public void testActivateWithoutKeyspaceCausesIllegalStateException()
+ {
+ mySession.execute(String.format("DROP KEYSPACE %s", myKeyspaceName));
+
+ assertThatExceptionOfType(IllegalStateException.class)
+ .isThrownBy(() -> new CASLockFactoryBuilder()
+ .withNativeConnectionProvider(getNativeConnectionProvider())
+ .withHostStates(hostStates)
+ .withStatementDecorator(s -> s)
+ .withKeyspaceName(myKeyspaceName)
+ .build());
+
+ mySession.execute(String.format("CREATE KEYSPACE IF NOT EXISTS %s WITH replication = {'class': 'NetworkTopologyStrategy', 'DC1': 1}", myKeyspaceName));
+ mySession.execute(String.format("CREATE TABLE IF NOT EXISTS %s.%s (resource text, node uuid, metadata map, PRIMARY KEY(resource)) WITH default_time_to_live = 600 AND gc_grace_seconds = 0", myKeyspaceName, TABLE_LOCK));
+ mySession.execute(String.format("CREATE TABLE IF NOT EXISTS %s.%s (resource text, node uuid, priority int, PRIMARY KEY(resource, node)) WITH default_time_to_live = 600 AND gc_grace_seconds = 0", myKeyspaceName, TABLE_LOCK_PRIORITY)); }
+
+ @Test
+ public void testActivateWithoutLockTableCausesIllegalStateException()
{
mySession.execute(String.format("DROP TABLE %s.%s", myKeyspaceName, TABLE_LOCK));
assertThatExceptionOfType(IllegalStateException.class)
- .isThrownBy(() -> new CASLockFactory.Builder()
+ .isThrownBy(() -> new CASLockFactoryBuilder()
.withNativeConnectionProvider(getNativeConnectionProvider())
.withHostStates(hostStates)
.withStatementDecorator(s -> s)
@@ -351,6 +380,22 @@ public void testActivateWithoutAllTablesCausesIllegalStateException()
mySession.execute(String.format("CREATE TABLE IF NOT EXISTS %s.%s (resource text, node uuid, metadata map, PRIMARY KEY(resource)) WITH default_time_to_live = 600 AND gc_grace_seconds = 0", myKeyspaceName, TABLE_LOCK));
}
+ @Test
+ public void testActivateWithoutLockPriorityTableCausesIllegalStateException()
+ {
+ mySession.execute(String.format("DROP TABLE %s.%s", myKeyspaceName, TABLE_LOCK_PRIORITY));
+
+ assertThatExceptionOfType(IllegalStateException.class)
+ .isThrownBy(() -> new CASLockFactoryBuilder()
+ .withNativeConnectionProvider(getNativeConnectionProvider())
+ .withHostStates(hostStates)
+ .withStatementDecorator(s -> s)
+ .withKeyspaceName(myKeyspaceName)
+ .build());
+
+ mySession.execute(String.format("CREATE TABLE IF NOT EXISTS %s.%s (resource text, node uuid, priority int, PRIMARY KEY(resource, node)) WITH default_time_to_live = 600 AND gc_grace_seconds = 0", myKeyspaceName, TABLE_LOCK_PRIORITY));
+ }
+
@Test
public void testActivateWithoutCassandraCausesIllegalStateException()
{
@@ -361,7 +406,7 @@ public void testActivateWithoutCassandraCausesIllegalStateException()
// test
assertThatExceptionOfType(AllNodesFailedException.class)
- .isThrownBy(() -> new CASLockFactory.Builder()
+ .isThrownBy(() -> new CASLockFactoryBuilder()
.withNativeConnectionProvider(new NativeConnectionProvider()
{
@Override
@@ -398,7 +443,7 @@ public void testRemoteRoutingTrueWithDefaultSerialConsistency()
when(connectionProviderMock.getLocalNode()).thenReturn(nodeMock);
when(connectionProviderMock.getRemoteRouting()).thenReturn(true);
- myLockFactory = new CASLockFactory.Builder()
+ myLockFactory = new CASLockFactoryBuilder()
.withNativeConnectionProvider(getNativeConnectionProvider())
.withHostStates(hostStates)
.withStatementDecorator(s -> s)
@@ -419,7 +464,7 @@ public void testRemoteRoutingFalseWithDefaultSerialConsistency()
when(connectionProviderMock.getLocalNode()).thenReturn(nodeMock);
when(connectionProviderMock.getRemoteRouting()).thenReturn(false);
- myLockFactory = new CASLockFactory.Builder()
+ myLockFactory = new CASLockFactoryBuilder()
.withNativeConnectionProvider(connectionProviderMock)
.withHostStates(hostStates)
.withStatementDecorator(s -> s)
@@ -439,7 +484,7 @@ public void testLocalSerialConsistency()
when(connectionProviderMock.getSession()).thenReturn(mySession);
when(connectionProviderMock.getLocalNode()).thenReturn(nodeMock);
- myLockFactory = new CASLockFactory.Builder()
+ myLockFactory = new CASLockFactoryBuilder()
.withNativeConnectionProvider(connectionProviderMock)
.withHostStates(hostStates)
.withStatementDecorator(s -> s)
@@ -459,7 +504,7 @@ public void testSerialConsistency()
when(connectionProviderMock.getSession()).thenReturn(mySession);
when(connectionProviderMock.getLocalNode()).thenReturn(nodeMock);
- myLockFactory = new CASLockFactory.Builder()
+ myLockFactory = new CASLockFactoryBuilder()
.withNativeConnectionProvider(connectionProviderMock)
.withHostStates(hostStates)
.withStatementDecorator(s -> s)
diff --git a/docs/autogenerated/EccYamlFile.md b/docs/autogenerated/EccYamlFile.md
index 8b69c7d31..bbd0b3be4 100644
--- a/docs/autogenerated/EccYamlFile.md
+++ b/docs/autogenerated/EccYamlFile.md
@@ -261,9 +261,9 @@
# Allow to override consistency level for LWT (lightweight transactions). Possible values are:
# "DEFAULT" - Use consistency level based on remoteRouting.
# "SERIAL" - Use SERIAL consistency for LWT regardless of remoteRouting.
-# "LOCAL_SERIAL" - Use LOCAL_SERIAL consistency for LWT regardless of remoteRouting.
+# "LOCAL" - Use LOCAL_SERIAL consistency for LWT regardless of remoteRouting.
#
-# if you use remoteRouting: false and LOCAL_SERIAL then all locks will be taken locally
+# if you use remoteRouting: false and LOCAL then all locks will be taken locally
# in DC. I.e There's a risk that multiple nodes in different datacenters will be able to lock the
# same nodes causing multiple repairs on the same range/node at the same time.
#
diff --git a/standalone-integration/src/test/java/com/ericsson/bss/cassandra/ecchronos/standalone/ITIncrementalOnDemandRepairJob.java b/standalone-integration/src/test/java/com/ericsson/bss/cassandra/ecchronos/standalone/ITIncrementalOnDemandRepairJob.java
index 9775a4045..db9ed714e 100644
--- a/standalone-integration/src/test/java/com/ericsson/bss/cassandra/ecchronos/standalone/ITIncrementalOnDemandRepairJob.java
+++ b/standalone-integration/src/test/java/com/ericsson/bss/cassandra/ecchronos/standalone/ITIncrementalOnDemandRepairJob.java
@@ -32,6 +32,7 @@
import com.ericsson.bss.cassandra.ecchronos.core.repair.RepairOptions;
import com.ericsson.bss.cassandra.ecchronos.core.repair.state.ReplicationStateImpl;
import com.ericsson.bss.cassandra.ecchronos.core.scheduling.ScheduleManagerImpl;
+import com.ericsson.bss.cassandra.ecchronos.core.utils.ConsistencyType;
import com.ericsson.bss.cassandra.ecchronos.core.utils.NodeResolverImpl;
import com.ericsson.bss.cassandra.ecchronos.core.utils.ReplicatedTableProviderImpl;
import com.ericsson.bss.cassandra.ecchronos.core.utils.TableReference;
@@ -119,7 +120,7 @@ public static void init(Boolean remoteRoutingOption) throws IOException
.build();
myLockFactory = CASLockFactory.builder().withNativeConnectionProvider(getNativeConnectionProvider())
- .withHostStates(myHostStates).withStatementDecorator(s -> s).build();
+ .withHostStates(myHostStates).withStatementDecorator(s -> s).withConsistencySerial(ConsistencyType.DEFAULT).build();
myScheduleManagerImpl = ScheduleManagerImpl.builder().withLockFactory(myLockFactory)
.withRunInterval(100, TimeUnit.MILLISECONDS).build();
diff --git a/standalone-integration/src/test/java/com/ericsson/bss/cassandra/ecchronos/standalone/ITIncrementalSchedules.java b/standalone-integration/src/test/java/com/ericsson/bss/cassandra/ecchronos/standalone/ITIncrementalSchedules.java
index 81dea9d79..184144259 100644
--- a/standalone-integration/src/test/java/com/ericsson/bss/cassandra/ecchronos/standalone/ITIncrementalSchedules.java
+++ b/standalone-integration/src/test/java/com/ericsson/bss/cassandra/ecchronos/standalone/ITIncrementalSchedules.java
@@ -31,6 +31,7 @@
import com.ericsson.bss.cassandra.ecchronos.core.repair.ScheduledRepairJobView;
import com.ericsson.bss.cassandra.ecchronos.core.repair.state.ReplicationStateImpl;
import com.ericsson.bss.cassandra.ecchronos.core.scheduling.ScheduleManagerImpl;
+import com.ericsson.bss.cassandra.ecchronos.core.utils.ConsistencyType;
import com.ericsson.bss.cassandra.ecchronos.core.utils.NodeResolverImpl;
import com.ericsson.bss.cassandra.ecchronos.core.utils.ReplicatedTableProviderImpl;
import com.ericsson.bss.cassandra.ecchronos.core.utils.TableReference;
@@ -127,6 +128,7 @@ public static void init(Boolean remoteRoutingOption) throws IOException
.withNativeConnectionProvider(getNativeConnectionProvider())
.withHostStates(myHostStates)
.withStatementDecorator(s -> s)
+ .withConsistencySerial(ConsistencyType.DEFAULT)
.build();
myScheduleManagerImpl = ScheduleManagerImpl.builder()
diff --git a/standalone-integration/src/test/java/com/ericsson/bss/cassandra/ecchronos/standalone/ITOnDemandRepairJob.java b/standalone-integration/src/test/java/com/ericsson/bss/cassandra/ecchronos/standalone/ITOnDemandRepairJob.java
index b102710cb..e79f3e5b0 100644
--- a/standalone-integration/src/test/java/com/ericsson/bss/cassandra/ecchronos/standalone/ITOnDemandRepairJob.java
+++ b/standalone-integration/src/test/java/com/ericsson/bss/cassandra/ecchronos/standalone/ITOnDemandRepairJob.java
@@ -67,6 +67,7 @@
import com.ericsson.bss.cassandra.ecchronos.core.repair.state.ReplicationStateImpl;
import com.ericsson.bss.cassandra.ecchronos.core.scheduling.ScheduleManagerImpl;
import com.ericsson.bss.cassandra.ecchronos.core.utils.LongTokenRange;
+import com.ericsson.bss.cassandra.ecchronos.core.utils.ConsistencyType;
import com.ericsson.bss.cassandra.ecchronos.core.utils.DriverNode;
import com.ericsson.bss.cassandra.ecchronos.core.utils.NodeResolver;
import com.ericsson.bss.cassandra.ecchronos.core.utils.NodeResolverImpl;
@@ -149,6 +150,7 @@ public static void init(Boolean remoteRoutingOption) throws IOException
.withNativeConnectionProvider(getNativeConnectionProvider())
.withHostStates(myHostStates)
.withStatementDecorator(s -> s)
+ .withConsistencySerial(ConsistencyType.DEFAULT)
.build();
myScheduleManagerImpl = ScheduleManagerImpl.builder()
diff --git a/standalone-integration/src/test/java/com/ericsson/bss/cassandra/ecchronos/standalone/ITSchedules.java b/standalone-integration/src/test/java/com/ericsson/bss/cassandra/ecchronos/standalone/ITSchedules.java
index a64cdf88c..56a362fc5 100644
--- a/standalone-integration/src/test/java/com/ericsson/bss/cassandra/ecchronos/standalone/ITSchedules.java
+++ b/standalone-integration/src/test/java/com/ericsson/bss/cassandra/ecchronos/standalone/ITSchedules.java
@@ -42,6 +42,7 @@
import com.ericsson.bss.cassandra.ecchronos.core.repair.state.ReplicationStateImpl;
import com.ericsson.bss.cassandra.ecchronos.core.scheduling.ScheduleManagerImpl;
import com.ericsson.bss.cassandra.ecchronos.core.utils.LongTokenRange;
+import com.ericsson.bss.cassandra.ecchronos.core.utils.ConsistencyType;
import com.ericsson.bss.cassandra.ecchronos.core.utils.DriverNode;
import com.ericsson.bss.cassandra.ecchronos.core.utils.NodeResolver;
import com.ericsson.bss.cassandra.ecchronos.core.utils.NodeResolverImpl;
@@ -208,6 +209,7 @@ else if (repairHistoryType == RepairHistoryType.CASSANDRA)
.withNativeConnectionProvider(getNativeConnectionProvider())
.withHostStates(myHostStates)
.withStatementDecorator(s -> s)
+ .withConsistencySerial(ConsistencyType.DEFAULT)
.build();
myScheduleManagerImpl = ScheduleManagerImpl.builder()