diff --git a/CHANGES.md b/CHANGES.md index 2c513d71c..ac817be85 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -2,6 +2,8 @@ ## Version 4.0.6 (Not yet released) +* Separate serial consistency configuration from remoteRouting functionality - Issue #633 + ### Merged from 1.2 * Fix calculation of tokens per repair - Issue #570 diff --git a/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/ECChronosInternals.java b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/ECChronosInternals.java index 18c88cf93..0c49cb246 100644 --- a/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/ECChronosInternals.java +++ b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/ECChronosInternals.java @@ -82,6 +82,7 @@ public ECChronosInternals(final Config configuration, .withHostStates(myHostStatesImpl) .withStatementDecorator(statementDecorator) .withKeyspaceName(configuration.getLockFactory().getCas().getKeyspace()) + .withConsistencySerial(configuration.getLockFactory().getCas().getConsistencySerial()) .build(); Node node = nativeConnectionProvider.getLocalNode(); diff --git a/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/config/Config.java b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/config/Config.java index 39fffd870..59c057c9d 100644 --- a/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/config/Config.java +++ b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/config/Config.java @@ -34,6 +34,7 @@ import com.ericsson.bss.cassandra.ecchronos.core.repair.DefaultRepairConfigurationProvider; import com.ericsson.bss.cassandra.ecchronos.fm.RepairFaultReporter; import com.ericsson.bss.cassandra.ecchronos.fm.impl.LoggingFaultReporter; + import io.micrometer.core.instrument.MeterRegistry; import org.springframework.context.ApplicationContext; @@ -41,6 +42,7 @@ import com.ericsson.bss.cassandra.ecchronos.connection.NativeConnectionProvider; import com.ericsson.bss.cassandra.ecchronos.connection.StatementDecorator; import com.ericsson.bss.cassandra.ecchronos.core.repair.RepairLockType; +import com.ericsson.bss.cassandra.ecchronos.core.utils.ConsistencyType; @SuppressWarnings({"checkstyle:methodname", "checkstyle:membername"}) public class Config @@ -735,6 +737,7 @@ public final void setCas(final CasLockFactoryConfig aCas) public static class CasLockFactoryConfig { private String keyspace = "ecchronos"; + private ConsistencyType consistencySerial = ConsistencyType.DEFAULT; public final String getKeyspace() { @@ -745,7 +748,17 @@ public final void setKeyspace(final String aKeyspace) { this.keyspace = aKeyspace; } - } + + public final ConsistencyType getConsistencySerial() + { + return consistencySerial; + } + + public final void setConsistencySerial(final String aConsistencySerial) + { + consistencySerial = ConsistencyType.valueOf(aConsistencySerial.toUpperCase(Locale.US)); + } + } public static class RunPolicyConfig { diff --git a/application/src/main/resources/ecc.yml b/application/src/main/resources/ecc.yml index 1c9454e8c..0e9a9b02b 100644 --- a/application/src/main/resources/ecc.yml +++ b/application/src/main/resources/ecc.yml @@ -221,6 +221,17 @@ lock_factory: ## The keyspace used for the CAS lock factory tables. ## keyspace: ecchronos + ## + ## Allow to override consistency level for LWT (lightweight transactions). Possible values are: + ## "DEFAULT" - Use consistency level based on remoteRouting. + ## "SERIAL" - Use SERIAL consistency for LWT regardless of remoteRouting. + ## "LOCAL_SERIAL" - Use LOCAL_SERIAL consistency for LWT regardless of remoteRouting. + ## + ## if you use remoteRouting: false and LOCAL_SERIAL then all locks will be taken locally + ## in DC. I.e There's a risk that multiple nodes in different datacenters will be able to lock the + ## same nodes causing multiple repairs on the same range/node at the same time. + ## + consistencySerial: "DEFAULT" run_policy: time_based: diff --git a/application/src/test/java/com/ericsson/bss/cassandra/ecchronos/application/config/TestConfig.java b/application/src/test/java/com/ericsson/bss/cassandra/ecchronos/application/config/TestConfig.java index 5fdf1d903..933ac5f99 100644 --- a/application/src/test/java/com/ericsson/bss/cassandra/ecchronos/application/config/TestConfig.java +++ b/application/src/test/java/com/ericsson/bss/cassandra/ecchronos/application/config/TestConfig.java @@ -32,6 +32,7 @@ import com.ericsson.bss.cassandra.ecchronos.core.repair.RepairConfiguration; import com.ericsson.bss.cassandra.ecchronos.core.repair.RepairLockType; import com.ericsson.bss.cassandra.ecchronos.core.repair.RepairOptions; +import com.ericsson.bss.cassandra.ecchronos.core.utils.ConsistencyType; import com.ericsson.bss.cassandra.ecchronos.core.utils.TableReference; import com.ericsson.bss.cassandra.ecchronos.core.utils.UnitConverter; import com.ericsson.bss.cassandra.ecchronos.fm.RepairFaultReporter; @@ -135,6 +136,7 @@ public void testAllValues() throws Exception Config.LockFactoryConfig lockFactoryConfig = config.getLockFactory(); assertThat(lockFactoryConfig.getCas().getKeyspace()).isEqualTo("ecc"); + assertThat(lockFactoryConfig.getCas().getConsistencySerial().equals(ConsistencyType.LOCAL)).isTrue(); Config.RunPolicyConfig runPolicyConfig = config.getRunPolicy(); assertThat(runPolicyConfig.getTimeBased().getKeyspace()).isEqualTo("ecc"); @@ -218,6 +220,7 @@ public void testWithDefaultFile() throws Exception Config.LockFactoryConfig lockFactoryConfig = config.getLockFactory(); assertThat(lockFactoryConfig.getCas().getKeyspace()).isEqualTo("ecchronos"); + assertThat(lockFactoryConfig.getCas().getConsistencySerial().equals(ConsistencyType.DEFAULT)).isTrue(); Config.RunPolicyConfig runPolicyConfig = config.getRunPolicy(); assertThat(runPolicyConfig.getTimeBased().getKeyspace()).isEqualTo("ecchronos"); @@ -300,6 +303,7 @@ public void testDefault() throws Exception Config.LockFactoryConfig lockFactoryConfig = config.getLockFactory(); assertThat(lockFactoryConfig.getCas().getKeyspace()).isEqualTo("ecchronos"); + assertThat(lockFactoryConfig.getCas().getConsistencySerial().equals(ConsistencyType.DEFAULT)).isTrue(); Config.RunPolicyConfig runPolicyConfig = config.getRunPolicy(); assertThat(runPolicyConfig.getTimeBased().getKeyspace()).isEqualTo("ecchronos"); diff --git a/application/src/test/resources/all_set.yml b/application/src/test/resources/all_set.yml index 5cc2bc0cf..3eaf551bb 100644 --- a/application/src/test/resources/all_set.yml +++ b/application/src/test/resources/all_set.yml @@ -76,6 +76,7 @@ statistics: lock_factory: cas: keyspace: ecc + consistencySerial: "LOCAL" run_policy: time_based: diff --git a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/CASLockFactory.java b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/CASLockFactory.java index 4de046053..4babbd23e 100644 --- a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/CASLockFactory.java +++ b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/CASLockFactory.java @@ -32,6 +32,7 @@ import com.ericsson.bss.cassandra.ecchronos.connection.StatementDecorator; import com.ericsson.bss.cassandra.ecchronos.core.exceptions.LockException; import com.ericsson.bss.cassandra.ecchronos.core.scheduling.LockFactory; +import com.ericsson.bss.cassandra.ecchronos.core.utils.ConsistencyType; import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.slf4j.Logger; @@ -80,6 +81,7 @@ * WITH default_time_to_live = 600 AND gc_grace_seconds = 0; * */ +@SuppressWarnings({"PMD.GodClass", "PMD.TooManyFields", "PMD.SingularField", "PMD.ExcessiveMethodLength"}) public final class CASLockFactory implements LockFactory, Closeable { private static final Logger LOG = LoggerFactory.getLogger(CASLockFactory.class); @@ -104,7 +106,6 @@ public final class CASLockFactory implements LockFactory, Closeable private final StatementDecorator myStatementDecorator; private final HostStates myHostStates; private final boolean myRemoteRouting; - private final CqlSession mySession; private final String myKeyspaceName; private final PreparedStatement myCompeteStatement; @@ -116,6 +117,8 @@ public final class CASLockFactory implements LockFactory, Closeable private final PreparedStatement myRemoveLockPriorityStatement; private final LockCache myLockCache; + private final ConsistencyLevel mySerialConsistencyLevel; + private CASLockFactory(final Builder builder) { myStatementDecorator = builder.myStatementDecorator; @@ -130,9 +133,19 @@ private CASLockFactory(final Builder builder) verifySchemasExists(); - ConsistencyLevel serialConsistencyLevel = myRemoteRouting + if (ConsistencyType.DEFAULT.equals(builder.myConsistencyType)) + { + mySerialConsistencyLevel = myRemoteRouting ? ConsistencyLevel.LOCAL_SERIAL : ConsistencyLevel.SERIAL; + } + else + { + mySerialConsistencyLevel = ConsistencyType.LOCAL.equals(builder.myConsistencyType) + ? ConsistencyLevel.LOCAL_SERIAL + : ConsistencyLevel.SERIAL; + } + SimpleStatement insertLockStatement = QueryBuilder.insertInto(myKeyspaceName, TABLE_LOCK) .value(COLUMN_RESOURCE, bindMarker()) .value(COLUMN_NODE, bindMarker()) @@ -140,19 +153,19 @@ private CASLockFactory(final Builder builder) .ifNotExists() .build() .setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM) - .setSerialConsistencyLevel(serialConsistencyLevel); + .setSerialConsistencyLevel(mySerialConsistencyLevel); SimpleStatement getLockMetadataStatement = QueryBuilder.selectFrom(myKeyspaceName, TABLE_LOCK) .column(COLUMN_METADATA) .whereColumn(COLUMN_RESOURCE).isEqualTo(bindMarker()) .build() - .setSerialConsistencyLevel(serialConsistencyLevel); + .setSerialConsistencyLevel(mySerialConsistencyLevel); SimpleStatement removeLockStatement = QueryBuilder.deleteFrom(myKeyspaceName, TABLE_LOCK) .whereColumn(COLUMN_RESOURCE).isEqualTo(bindMarker()) .ifColumn(COLUMN_NODE).isEqualTo(bindMarker()) .build() - .setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM).setSerialConsistencyLevel(serialConsistencyLevel); + .setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM).setSerialConsistencyLevel(mySerialConsistencyLevel); SimpleStatement updateLockStatement = QueryBuilder.update(myKeyspaceName, TABLE_LOCK) .setColumn(COLUMN_NODE, bindMarker()) @@ -160,7 +173,7 @@ private CASLockFactory(final Builder builder) .whereColumn(COLUMN_RESOURCE).isEqualTo(bindMarker()) .ifColumn(COLUMN_NODE).isEqualTo(bindMarker()) .build() - .setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM).setSerialConsistencyLevel(serialConsistencyLevel); + .setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM).setSerialConsistencyLevel(mySerialConsistencyLevel); SimpleStatement competeStatement = QueryBuilder.insertInto(myKeyspaceName, TABLE_LOCK_PRIORITY) .value(COLUMN_RESOURCE, bindMarker()) @@ -285,6 +298,12 @@ UUID getHostId() return myUuid; } + @VisibleForTesting + ConsistencyLevel getSerialConsistencyLevel() + { + return mySerialConsistencyLevel; + } + public static Builder builder() { return new Builder(); @@ -298,6 +317,7 @@ public static class Builder private HostStates myHostStates; private StatementDecorator myStatementDecorator; private String myKeyspaceName = DEFAULT_KEYSPACE_NAME; + private ConsistencyType myConsistencyType; public final Builder withNativeConnectionProvider(final NativeConnectionProvider nativeConnectionProvider) { @@ -323,6 +343,12 @@ public final Builder withKeyspaceName(final String keyspaceName) return this; } + public final Builder withConsistencySerial(final ConsistencyType consistencyType) + { + myConsistencyType = consistencyType; + return this; + } + public final CASLockFactory build() { if (myNativeConnectionProvider == null) diff --git a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/utils/ConsistencyType.java b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/utils/ConsistencyType.java new file mode 100644 index 000000000..341f484e6 --- /dev/null +++ b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/utils/ConsistencyType.java @@ -0,0 +1,22 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ericsson.bss.cassandra.ecchronos.core.utils; + +public enum ConsistencyType +{ + DEFAULT, + LOCAL, + SERIAL +} diff --git a/core/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/TestCASLockFactory.java b/core/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/TestCASLockFactory.java index 60307fff9..af481f8e0 100644 --- a/core/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/TestCASLockFactory.java +++ b/core/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/TestCASLockFactory.java @@ -17,6 +17,8 @@ import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.bindMarker; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static org.junit.Assert.assertEquals; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Matchers.any; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; @@ -60,6 +62,7 @@ import com.ericsson.bss.cassandra.ecchronos.core.exceptions.LockException; import com.ericsson.bss.cassandra.ecchronos.core.scheduling.LockFactory.DistributedLock; +import com.ericsson.bss.cassandra.ecchronos.core.utils.ConsistencyType; import net.jcip.annotations.NotThreadSafe; import org.junit.runner.RunWith; @@ -367,6 +370,88 @@ public boolean getRemoteRouting() .build()); } + @Test + public void testRemoteRoutingTrueWithDefaultSerialConsistency() + { + Node nodeMock = mock(Node.class); + NativeConnectionProvider connectionProviderMock = mock(NativeConnectionProvider.class); + + when(connectionProviderMock.getSession()).thenReturn(mySession); + when(connectionProviderMock.getLocalNode()).thenReturn(nodeMock); + when(connectionProviderMock.getRemoteRouting()).thenReturn(true); + + myLockFactory = new CASLockFactory.Builder() + .withNativeConnectionProvider(getNativeConnectionProvider()) + .withHostStates(hostStates) + .withStatementDecorator(s -> s) + .withKeyspaceName(myKeyspaceName) + .withConsistencySerial(ConsistencyType.DEFAULT) + .build(); + + assertEquals(ConsistencyLevel.LOCAL_SERIAL, myLockFactory.getSerialConsistencyLevel()); + } + + @Test + public void testRemoteRoutingFalseWithDefaultSerialConsistency() + { + Node nodeMock = mock(Node.class); + NativeConnectionProvider connectionProviderMock = mock(NativeConnectionProvider.class); + + when(connectionProviderMock.getSession()).thenReturn(mySession); + when(connectionProviderMock.getLocalNode()).thenReturn(nodeMock); + when(connectionProviderMock.getRemoteRouting()).thenReturn(false); + + myLockFactory = new CASLockFactory.Builder() + .withNativeConnectionProvider(connectionProviderMock) + .withHostStates(hostStates) + .withStatementDecorator(s -> s) + .withKeyspaceName(myKeyspaceName) + .withConsistencySerial(ConsistencyType.DEFAULT) + .build(); + + assertEquals(ConsistencyLevel.SERIAL, myLockFactory.getSerialConsistencyLevel()); + } + + @Test + public void testLocalSerialConsistency() + { + NativeConnectionProvider connectionProviderMock = mock(NativeConnectionProvider.class); + Node nodeMock = mock(Node.class); + + when(connectionProviderMock.getSession()).thenReturn(mySession); + when(connectionProviderMock.getLocalNode()).thenReturn(nodeMock); + + myLockFactory = new CASLockFactory.Builder() + .withNativeConnectionProvider(connectionProviderMock) + .withHostStates(hostStates) + .withStatementDecorator(s -> s) + .withKeyspaceName(myKeyspaceName) + .withConsistencySerial(ConsistencyType.LOCAL) + .build(); + + assertEquals(ConsistencyLevel.LOCAL_SERIAL, myLockFactory.getSerialConsistencyLevel()); + } + + @Test + public void testSerialConsistency() + { + NativeConnectionProvider connectionProviderMock = mock(NativeConnectionProvider.class); + Node nodeMock = mock(Node.class); + + when(connectionProviderMock.getSession()).thenReturn(mySession); + when(connectionProviderMock.getLocalNode()).thenReturn(nodeMock); + + myLockFactory = new CASLockFactory.Builder() + .withNativeConnectionProvider(connectionProviderMock) + .withHostStates(hostStates) + .withStatementDecorator(s -> s) + .withKeyspaceName(myKeyspaceName) + .withConsistencySerial(ConsistencyType.SERIAL) + .build(); + + assertEquals(ConsistencyLevel.SERIAL, myLockFactory.getSerialConsistencyLevel()); + } + private void assertPriorityListEmpty(String resource) { assertThat(getPriorities(resource)).isEmpty();