diff --git a/CHANGES.md b/CHANGES.md index 221761b39..d49d76deb 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -2,10 +2,11 @@ ## Version 5.0.1 (Not yet released) +* Separate serial consistency configuration from remoteRouting functionality - Issue #633 * Improve hang preventing task - Issue #544 * Improve Description of unwind_ratio - Issue #628 -## Version 5.0.0 (Not yet released) +## Version 5.0.0 * Build Ecchronos with Java 11 - Issue 616 * Bump logback from 1.2.10 to 1.2.13 (CVE-2023-6378) - Issue #622 diff --git a/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/ECChronosInternals.java b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/ECChronosInternals.java index a48acecb7..44b4267ba 100644 --- a/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/ECChronosInternals.java +++ b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/ECChronosInternals.java @@ -85,6 +85,7 @@ public ECChronosInternals(final Config configuration, .withStatementDecorator(statementDecorator) .withKeyspaceName(casLockFactoryConfig.getKeyspaceName()) .withCacheExpiryInSeconds(casLockFactoryConfig.getFailureCacheExpiryTimeInSeconds()) + .withConsistencySerial(casLockFactoryConfig.getConsistencySerial()) .build(); Node node = nativeConnectionProvider.getLocalNode(); diff --git a/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/config/lockfactory/CasLockFactoryConfig.java b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/config/lockfactory/CasLockFactoryConfig.java index 7150dea35..611fedf22 100644 --- a/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/config/lockfactory/CasLockFactoryConfig.java +++ b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/config/lockfactory/CasLockFactoryConfig.java @@ -15,6 +15,8 @@ package com.ericsson.bss.cassandra.ecchronos.application.config.lockfactory; import com.fasterxml.jackson.annotation.JsonProperty; +import com.ericsson.bss.cassandra.ecchronos.core.utils.ConsistencyType; +import java.util.Locale; public class CasLockFactoryConfig { @@ -22,6 +24,7 @@ public class CasLockFactoryConfig private static final String DEFAULT_KEYSPACE_NAME = "ecchronos"; private String myKeyspaceName = DEFAULT_KEYSPACE_NAME; private long myExpiryTimeInSeconds = DEFAULT_EXPIRY_TIME_IN_SECONDS; + private ConsistencyType myConsistencySerial = ConsistencyType.DEFAULT; public final long getFailureCacheExpiryTimeInSeconds() { @@ -44,4 +47,16 @@ public final void setKeyspaceName(final String keyspaceName) { myKeyspaceName = keyspaceName; } + + @JsonProperty("consistencySerial") + public final ConsistencyType getConsistencySerial() + { + return myConsistencySerial; + } + + @JsonProperty("consistencySerial") + public final void setConsistencySerial(final String consistencySerial) + { + myConsistencySerial = ConsistencyType.valueOf(consistencySerial.toUpperCase(Locale.US)); + } } diff --git a/application/src/main/resources/ecc.yml b/application/src/main/resources/ecc.yml index 6b8600155..92ba478a3 100644 --- a/application/src/main/resources/ecc.yml +++ b/application/src/main/resources/ecc.yml @@ -53,8 +53,7 @@ connection: ## Allow routing requests directly to a remote datacenter. ## This allows locks for other datacenters to be taken in that datacenter instead of via the local datacenter. ## If clients are prevented from connecting directly to Cassandra nodes in other sites this is not possible. - ## If remote routing is disabled its not possible to use LOCAL_SERIAL consistency for the locking, - ## instead SERIAL consistency will be used for those request. + ## If remote routing is disabled, instead SERIAL consistency will be used for those request. ## remoteRouting: true jmx: @@ -272,6 +271,17 @@ lock_factory: ## the cache expiration time is reached. ## cache_expiry_time_in_seconds: 30 + ## + ## Allow to override consistency level for LWT (lightweight transactions). Possible values are: + ## "DEFAULT" - Use consistency level based on remoteRouting. + ## "SERIAL" - Use SERIAL consistency for LWT regardless of remoteRouting. + ## "LOCAL_SERIAL" - Use LOCAL_SERIAL consistency for LWT regardless of remoteRouting. + ## + ## if you use remoteRouting: false and LOCAL_SERIAL then all locks will be taken locally + ## in DC. I.e There's a risk that multiple nodes in different datacenters will be able to lock the + ## same nodes causing multiple repairs on the same range/node at the same time. + ## + consistencySerial: "DEFAULT" run_policy: time_based: diff --git a/application/src/test/java/com/ericsson/bss/cassandra/ecchronos/application/config/TestConfig.java b/application/src/test/java/com/ericsson/bss/cassandra/ecchronos/application/config/TestConfig.java index 504f75a8e..745005682 100644 --- a/application/src/test/java/com/ericsson/bss/cassandra/ecchronos/application/config/TestConfig.java +++ b/application/src/test/java/com/ericsson/bss/cassandra/ecchronos/application/config/TestConfig.java @@ -33,7 +33,6 @@ import com.ericsson.bss.cassandra.ecchronos.application.config.metrics.StatisticsConfig; import com.ericsson.bss.cassandra.ecchronos.application.config.repair.GlobalRepairConfig; import com.ericsson.bss.cassandra.ecchronos.application.config.repair.RepairHistory; -import com.ericsson.bss.cassandra.ecchronos.application.config.repair.RepairSchedule; import com.ericsson.bss.cassandra.ecchronos.application.config.rest.RestServerConfig; import com.ericsson.bss.cassandra.ecchronos.application.config.runpolicy.RunPolicyConfig; import com.ericsson.bss.cassandra.ecchronos.application.config.scheduler.SchedulerConfig; @@ -47,6 +46,7 @@ import com.ericsson.bss.cassandra.ecchronos.core.repair.RepairConfiguration; import com.ericsson.bss.cassandra.ecchronos.core.repair.RepairLockType; import com.ericsson.bss.cassandra.ecchronos.core.repair.RepairOptions; +import com.ericsson.bss.cassandra.ecchronos.core.utils.ConsistencyType; import com.ericsson.bss.cassandra.ecchronos.core.utils.TableReference; import com.ericsson.bss.cassandra.ecchronos.core.utils.UnitConverter; import com.ericsson.bss.cassandra.ecchronos.fm.RepairFaultReporter; @@ -157,6 +157,7 @@ public void testAllValues() throws Exception LockFactoryConfig lockFactoryConfig = config.getLockFactory(); assertThat(lockFactoryConfig.getCasLockFactoryConfig().getKeyspaceName()).isEqualTo("ecc"); + assertThat(lockFactoryConfig.getCasLockFactoryConfig().getConsistencySerial().equals(ConsistencyType.LOCAL)).isTrue(); RunPolicyConfig runPolicyConfig = config.getRunPolicy(); assertThat(runPolicyConfig.getTimeBasedConfig().getKeyspaceName()).isEqualTo("ecc"); @@ -243,6 +244,7 @@ public void testWithDefaultFile() throws Exception LockFactoryConfig lockFactoryConfig = config.getLockFactory(); assertThat(lockFactoryConfig.getCasLockFactoryConfig().getKeyspaceName()).isEqualTo("ecchronos"); + assertThat(lockFactoryConfig.getCasLockFactoryConfig().getConsistencySerial().equals(ConsistencyType.DEFAULT)).isTrue(); RunPolicyConfig runPolicyConfig = config.getRunPolicy(); assertThat(runPolicyConfig.getTimeBasedConfig().getKeyspaceName()).isEqualTo("ecchronos"); @@ -327,6 +329,7 @@ public void testDefault() throws Exception LockFactoryConfig lockFactoryConfig = config.getLockFactory(); assertThat(lockFactoryConfig.getCasLockFactoryConfig().getKeyspaceName()).isEqualTo("ecchronos"); + assertThat(lockFactoryConfig.getCasLockFactoryConfig().getConsistencySerial().equals(ConsistencyType.DEFAULT)).isTrue(); RunPolicyConfig runPolicyConfig = config.getRunPolicy(); assertThat(runPolicyConfig.getTimeBasedConfig().getKeyspaceName()).isEqualTo("ecchronos"); diff --git a/application/src/test/resources/all_set.yml b/application/src/test/resources/all_set.yml index f72fb74ff..7d5c4f9c7 100644 --- a/application/src/test/resources/all_set.yml +++ b/application/src/test/resources/all_set.yml @@ -82,6 +82,7 @@ lock_factory: cas: keyspace: ecc cache_expiry_time_in_seconds: 100 + consistencySerial: "LOCAL" run_policy: diff --git a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/CASLockFactory.java b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/CASLockFactory.java index 1009d1077..07ddc431c 100644 --- a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/CASLockFactory.java +++ b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/CASLockFactory.java @@ -34,6 +34,7 @@ import com.ericsson.bss.cassandra.ecchronos.connection.StatementDecorator; import com.ericsson.bss.cassandra.ecchronos.core.exceptions.LockException; import com.ericsson.bss.cassandra.ecchronos.core.scheduling.LockFactory; +import com.ericsson.bss.cassandra.ecchronos.core.utils.ConsistencyType; import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.slf4j.Logger; @@ -82,6 +83,7 @@ * WITH default_time_to_live = 600 AND gc_grace_seconds = 0; * */ +@SuppressWarnings({"PMD.GodClass", "PMD.TooManyFields", "PMD.SingularField", "PMD.ExcessiveMethodLength"}) public final class CASLockFactory implements LockFactory, Closeable { private static final Logger LOG = LoggerFactory.getLogger(CASLockFactory.class); @@ -102,7 +104,6 @@ public final class CASLockFactory implements LockFactory, Closeable private final StatementDecorator myStatementDecorator; private final HostStates myHostStates; private final boolean myRemoteRouting; - private final CqlSession mySession; private final String myKeyspaceName; private final PreparedStatement myCompeteStatement; @@ -113,6 +114,7 @@ public final class CASLockFactory implements LockFactory, Closeable private final PreparedStatement myUpdateLockStatement; private final PreparedStatement myRemoveLockPriorityStatement; private final CASLockFactoryCacheContext myCasLockFactoryCacheContext; + private final ConsistencyLevel mySerialConsistencyLevel; private CASLockFactory(final Builder builder) { @@ -127,9 +129,19 @@ private CASLockFactory(final Builder builder) verifySchemasExists(); - ConsistencyLevel serialConsistencyLevel = myRemoteRouting + if (ConsistencyType.DEFAULT.equals(builder.myConsistencyType)) + { + mySerialConsistencyLevel = myRemoteRouting ? ConsistencyLevel.LOCAL_SERIAL : ConsistencyLevel.SERIAL; + } + else + { + mySerialConsistencyLevel = ConsistencyType.LOCAL.equals(builder.myConsistencyType) + ? ConsistencyLevel.LOCAL_SERIAL + : ConsistencyLevel.SERIAL; + } + SimpleStatement insertLockStatement = QueryBuilder.insertInto(myKeyspaceName, TABLE_LOCK) .value(COLUMN_RESOURCE, bindMarker()) .value(COLUMN_NODE, bindMarker()) @@ -137,14 +149,14 @@ private CASLockFactory(final Builder builder) .ifNotExists() .build() .setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM) - .setSerialConsistencyLevel(serialConsistencyLevel); + .setSerialConsistencyLevel(mySerialConsistencyLevel); SimpleStatement getLockMetadataStatement = QueryBuilder.selectFrom(myKeyspaceName, TABLE_LOCK) .column(COLUMN_METADATA) .whereColumn(COLUMN_RESOURCE) .isEqualTo(bindMarker()) .build() - .setSerialConsistencyLevel(serialConsistencyLevel); + .setSerialConsistencyLevel(mySerialConsistencyLevel); SimpleStatement removeLockStatement = QueryBuilder.deleteFrom(myKeyspaceName, TABLE_LOCK) .whereColumn(COLUMN_RESOURCE) @@ -153,7 +165,7 @@ private CASLockFactory(final Builder builder) .isEqualTo(bindMarker()) .build() .setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM) - .setSerialConsistencyLevel(serialConsistencyLevel); + .setSerialConsistencyLevel(mySerialConsistencyLevel); SimpleStatement updateLockStatement = QueryBuilder.update(myKeyspaceName, TABLE_LOCK) .setColumn(COLUMN_NODE, bindMarker()) @@ -164,7 +176,7 @@ private CASLockFactory(final Builder builder) .isEqualTo(bindMarker()) .build() .setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM) - .setSerialConsistencyLevel(serialConsistencyLevel); + .setSerialConsistencyLevel(mySerialConsistencyLevel); SimpleStatement competeStatement = QueryBuilder.insertInto(myKeyspaceName, TABLE_LOCK_PRIORITY) .value(COLUMN_RESOURCE, bindMarker()) @@ -320,6 +332,12 @@ UUID getHostId() return myUuid; } + @VisibleForTesting + ConsistencyLevel getSerialConsistencyLevel() + { + return mySerialConsistencyLevel; + } + @VisibleForTesting CASLockFactoryCacheContext getCasLockFactoryCacheContext() { @@ -341,6 +359,7 @@ public static class Builder private StatementDecorator myStatementDecorator; private String myKeyspaceName = DEFAULT_KEYSPACE_NAME; private long myCacheExpiryTimeInSeconds = DEFAULT_EXPIRY_TIME_IN_SECONDS; + private ConsistencyType myConsistencyType; public final Builder withNativeConnectionProvider(final NativeConnectionProvider nativeConnectionProvider) { @@ -372,6 +391,12 @@ public final Builder withCacheExpiryInSeconds(final long cacheExpiryInSeconds) return this; } + public final Builder withConsistencySerial(final ConsistencyType consistencyType) + { + myConsistencyType = consistencyType; + return this; + } + public final CASLockFactory build() { if (myNativeConnectionProvider == null) diff --git a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/utils/ConsistencyType.java b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/utils/ConsistencyType.java new file mode 100644 index 000000000..341f484e6 --- /dev/null +++ b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/utils/ConsistencyType.java @@ -0,0 +1,22 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ericsson.bss.cassandra.ecchronos.core.utils; + +public enum ConsistencyType +{ + DEFAULT, + LOCAL, + SERIAL +} diff --git a/core/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/TestCASLockFactory.java b/core/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/TestCASLockFactory.java index db42000f0..1bf6eacee 100644 --- a/core/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/TestCASLockFactory.java +++ b/core/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/TestCASLockFactory.java @@ -17,9 +17,14 @@ import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.bindMarker; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static org.junit.Assert.assertEquals; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Matchers.any; +import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.util.Arrays; @@ -60,6 +65,7 @@ import com.ericsson.bss.cassandra.ecchronos.core.exceptions.LockException; import com.ericsson.bss.cassandra.ecchronos.core.scheduling.LockFactory.DistributedLock; +import com.ericsson.bss.cassandra.ecchronos.core.utils.ConsistencyType; import net.jcip.annotations.NotThreadSafe; import org.junit.runner.RunWith; @@ -382,6 +388,88 @@ public boolean getRemoteRouting() .build()); } + @Test + public void testRemoteRoutingTrueWithDefaultSerialConsistency() + { + Node nodeMock = mock(Node.class); + NativeConnectionProvider connectionProviderMock = mock(NativeConnectionProvider.class); + + when(connectionProviderMock.getSession()).thenReturn(mySession); + when(connectionProviderMock.getLocalNode()).thenReturn(nodeMock); + when(connectionProviderMock.getRemoteRouting()).thenReturn(true); + + myLockFactory = new CASLockFactory.Builder() + .withNativeConnectionProvider(getNativeConnectionProvider()) + .withHostStates(hostStates) + .withStatementDecorator(s -> s) + .withKeyspaceName(myKeyspaceName) + .withConsistencySerial(ConsistencyType.DEFAULT) + .build(); + + assertEquals(ConsistencyLevel.LOCAL_SERIAL, myLockFactory.getSerialConsistencyLevel()); + } + + @Test + public void testRemoteRoutingFalseWithDefaultSerialConsistency() + { + Node nodeMock = mock(Node.class); + NativeConnectionProvider connectionProviderMock = mock(NativeConnectionProvider.class); + + when(connectionProviderMock.getSession()).thenReturn(mySession); + when(connectionProviderMock.getLocalNode()).thenReturn(nodeMock); + when(connectionProviderMock.getRemoteRouting()).thenReturn(false); + + myLockFactory = new CASLockFactory.Builder() + .withNativeConnectionProvider(connectionProviderMock) + .withHostStates(hostStates) + .withStatementDecorator(s -> s) + .withKeyspaceName(myKeyspaceName) + .withConsistencySerial(ConsistencyType.DEFAULT) + .build(); + + assertEquals(ConsistencyLevel.SERIAL, myLockFactory.getSerialConsistencyLevel()); + } + + @Test + public void testLocalSerialConsistency() + { + NativeConnectionProvider connectionProviderMock = mock(NativeConnectionProvider.class); + Node nodeMock = mock(Node.class); + + when(connectionProviderMock.getSession()).thenReturn(mySession); + when(connectionProviderMock.getLocalNode()).thenReturn(nodeMock); + + myLockFactory = new CASLockFactory.Builder() + .withNativeConnectionProvider(connectionProviderMock) + .withHostStates(hostStates) + .withStatementDecorator(s -> s) + .withKeyspaceName(myKeyspaceName) + .withConsistencySerial(ConsistencyType.LOCAL) + .build(); + + assertEquals(ConsistencyLevel.LOCAL_SERIAL, myLockFactory.getSerialConsistencyLevel()); + } + + @Test + public void testSerialConsistency() + { + NativeConnectionProvider connectionProviderMock = mock(NativeConnectionProvider.class); + Node nodeMock = mock(Node.class); + + when(connectionProviderMock.getSession()).thenReturn(mySession); + when(connectionProviderMock.getLocalNode()).thenReturn(nodeMock); + + myLockFactory = new CASLockFactory.Builder() + .withNativeConnectionProvider(connectionProviderMock) + .withHostStates(hostStates) + .withStatementDecorator(s -> s) + .withKeyspaceName(myKeyspaceName) + .withConsistencySerial(ConsistencyType.SERIAL) + .build(); + + assertEquals(ConsistencyLevel.SERIAL, myLockFactory.getSerialConsistencyLevel()); + } + private void assertPriorityListEmpty(String resource) { assertThat(getPriorities(resource)).isEmpty(); diff --git a/docs/autogenerated/EccYamlFile.md b/docs/autogenerated/EccYamlFile.md index e6a0bd28d..8b69c7d31 100644 --- a/docs/autogenerated/EccYamlFile.md +++ b/docs/autogenerated/EccYamlFile.md @@ -39,8 +39,7 @@ # Allow routing requests directly to a remote datacenter. # This allows locks for other datacenters to be taken in that datacenter instead of via the local datacenter. # If clients are prevented from connecting directly to Cassandra nodes in other sites this is not possible. -# If remote routing is disabled its not possible to use LOCAL_SERIAL consistency for the locking, -# instead SERIAL consistency will be used for those request. +# If remote routing is disabled, instead SERIAL consistency will be used for those request. # * remoteRouting: true **jmx:** @@ -124,6 +123,23 @@ # This value is a ratio between 0 -> 100% of the execution time of a repair session. # # 100% means that the executor will wait to run the next session for as long time as the previous session took. +# The 'unwind_ratio' setting configures the wait time between repair tasks as a proportion of the previous task's execution time. +# +# Examples: +# - unwind_ratio: 0 +# Explanation: No wait time between tasks. The next task starts immediately after the previous one finishes. +# Total Repair Time: T1 (10s) + T2 (20s) = 30 seconds. +# +# - unwind_ratio: 1.0 (100%) +# Explanation: The wait time after each task equals its duration. +# Total Repair Time: T1 (10s + 10s wait) + T2 (20s + 20s wait) = 60 seconds. +# +# - unwind_ratio: 0.5 (50%) +# Explanation: The wait time is half of the task's duration. +# Total Repair Time: T1 (10s + 5s wait) + T2 (20s + 10s wait) = 45 seconds. +# +# A higher 'unwind_ratio' reduces system load by adding longer waits, but increases total repair time. +# A lower 'unwind_ratio' speeds up repairs but may increase system load. # * unwind_ratio: 0.0 # @@ -241,6 +257,17 @@ # the cache expiration time is reached. # * cache_expiry_time_in_seconds: 30 +# +# Allow to override consistency level for LWT (lightweight transactions). Possible values are: +# "DEFAULT" - Use consistency level based on remoteRouting. +# "SERIAL" - Use SERIAL consistency for LWT regardless of remoteRouting. +# "LOCAL_SERIAL" - Use LOCAL_SERIAL consistency for LWT regardless of remoteRouting. +# +# if you use remoteRouting: false and LOCAL_SERIAL then all locks will be taken locally +# in DC. I.e There's a risk that multiple nodes in different datacenters will be able to lock the +# same nodes causing multiple repairs on the same range/node at the same time. +# +* consistencySerial: "DEFAULT" **run_policy:** **time_based:**