Skip to content

Commit

Permalink
Separate serial consistency configuration from remoteRouting function…
Browse files Browse the repository at this point in the history
…ality - Issue #633 (#634)

* Separe serial consistency from remoteRouting - Issue #633

To decouple the consistency definition in lightweight transactions,
introduce a new property configurable by ecc.yml that allows
consistency to be overridden by the values DEFAULT/SERIAL/LOCAL.

Closes #633

* Fix PMD Violations

* Use SuppressWarning Annotation to Fix PMD Violations

* Introduce ConsistencyType enum

* Add header in ConsistencyType

* Use consistencySerial as a lock configuration propertie

* Reset unnecessary changes

* Remove unnecessary variables and lines
  • Loading branch information
VictorCavichioli authored Feb 21, 2024
1 parent 8257966 commit 2972fbd
Show file tree
Hide file tree
Showing 10 changed files with 205 additions and 12 deletions.
3 changes: 2 additions & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@

## Version 5.0.1 (Not yet released)

* Separate serial consistency configuration from remoteRouting functionality - Issue #633
* Improve hang preventing task - Issue #544
* Improve Description of unwind_ratio - Issue #628

## Version 5.0.0 (Not yet released)
## Version 5.0.0

* Build Ecchronos with Java 11 - Issue 616
* Bump logback from 1.2.10 to 1.2.13 (CVE-2023-6378) - Issue #622
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ public ECChronosInternals(final Config configuration,
.withStatementDecorator(statementDecorator)
.withKeyspaceName(casLockFactoryConfig.getKeyspaceName())
.withCacheExpiryInSeconds(casLockFactoryConfig.getFailureCacheExpiryTimeInSeconds())
.withConsistencySerial(casLockFactoryConfig.getConsistencySerial())
.build();

Node node = nativeConnectionProvider.getLocalNode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,16 @@
package com.ericsson.bss.cassandra.ecchronos.application.config.lockfactory;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.ericsson.bss.cassandra.ecchronos.core.utils.ConsistencyType;
import java.util.Locale;

public class CasLockFactoryConfig
{
private static final long DEFAULT_EXPIRY_TIME_IN_SECONDS = 30L;
private static final String DEFAULT_KEYSPACE_NAME = "ecchronos";
private String myKeyspaceName = DEFAULT_KEYSPACE_NAME;
private long myExpiryTimeInSeconds = DEFAULT_EXPIRY_TIME_IN_SECONDS;
private ConsistencyType myConsistencySerial = ConsistencyType.DEFAULT;

public final long getFailureCacheExpiryTimeInSeconds()
{
Expand All @@ -44,4 +47,16 @@ public final void setKeyspaceName(final String keyspaceName)
{
myKeyspaceName = keyspaceName;
}

@JsonProperty("consistencySerial")
public final ConsistencyType getConsistencySerial()
{
return myConsistencySerial;
}

@JsonProperty("consistencySerial")
public final void setConsistencySerial(final String consistencySerial)
{
myConsistencySerial = ConsistencyType.valueOf(consistencySerial.toUpperCase(Locale.US));
}
}
14 changes: 12 additions & 2 deletions application/src/main/resources/ecc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,7 @@ connection:
## Allow routing requests directly to a remote datacenter.
## This allows locks for other datacenters to be taken in that datacenter instead of via the local datacenter.
## If clients are prevented from connecting directly to Cassandra nodes in other sites this is not possible.
## If remote routing is disabled its not possible to use LOCAL_SERIAL consistency for the locking,
## instead SERIAL consistency will be used for those request.
## If remote routing is disabled, instead SERIAL consistency will be used for those request.
##
remoteRouting: true
jmx:
Expand Down Expand Up @@ -272,6 +271,17 @@ lock_factory:
## the cache expiration time is reached.
##
cache_expiry_time_in_seconds: 30
##
## Allow to override consistency level for LWT (lightweight transactions). Possible values are:
## "DEFAULT" - Use consistency level based on remoteRouting.
## "SERIAL" - Use SERIAL consistency for LWT regardless of remoteRouting.
## "LOCAL_SERIAL" - Use LOCAL_SERIAL consistency for LWT regardless of remoteRouting.
##
## if you use remoteRouting: false and LOCAL_SERIAL then all locks will be taken locally
## in DC. I.e There's a risk that multiple nodes in different datacenters will be able to lock the
## same nodes causing multiple repairs on the same range/node at the same time.
##
consistencySerial: "DEFAULT"

run_policy:
time_based:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import com.ericsson.bss.cassandra.ecchronos.application.config.metrics.StatisticsConfig;
import com.ericsson.bss.cassandra.ecchronos.application.config.repair.GlobalRepairConfig;
import com.ericsson.bss.cassandra.ecchronos.application.config.repair.RepairHistory;
import com.ericsson.bss.cassandra.ecchronos.application.config.repair.RepairSchedule;
import com.ericsson.bss.cassandra.ecchronos.application.config.rest.RestServerConfig;
import com.ericsson.bss.cassandra.ecchronos.application.config.runpolicy.RunPolicyConfig;
import com.ericsson.bss.cassandra.ecchronos.application.config.scheduler.SchedulerConfig;
Expand All @@ -47,6 +46,7 @@
import com.ericsson.bss.cassandra.ecchronos.core.repair.RepairConfiguration;
import com.ericsson.bss.cassandra.ecchronos.core.repair.RepairLockType;
import com.ericsson.bss.cassandra.ecchronos.core.repair.RepairOptions;
import com.ericsson.bss.cassandra.ecchronos.core.utils.ConsistencyType;
import com.ericsson.bss.cassandra.ecchronos.core.utils.TableReference;
import com.ericsson.bss.cassandra.ecchronos.core.utils.UnitConverter;
import com.ericsson.bss.cassandra.ecchronos.fm.RepairFaultReporter;
Expand Down Expand Up @@ -157,6 +157,7 @@ public void testAllValues() throws Exception

LockFactoryConfig lockFactoryConfig = config.getLockFactory();
assertThat(lockFactoryConfig.getCasLockFactoryConfig().getKeyspaceName()).isEqualTo("ecc");
assertThat(lockFactoryConfig.getCasLockFactoryConfig().getConsistencySerial().equals(ConsistencyType.LOCAL)).isTrue();

RunPolicyConfig runPolicyConfig = config.getRunPolicy();
assertThat(runPolicyConfig.getTimeBasedConfig().getKeyspaceName()).isEqualTo("ecc");
Expand Down Expand Up @@ -243,6 +244,7 @@ public void testWithDefaultFile() throws Exception

LockFactoryConfig lockFactoryConfig = config.getLockFactory();
assertThat(lockFactoryConfig.getCasLockFactoryConfig().getKeyspaceName()).isEqualTo("ecchronos");
assertThat(lockFactoryConfig.getCasLockFactoryConfig().getConsistencySerial().equals(ConsistencyType.DEFAULT)).isTrue();

RunPolicyConfig runPolicyConfig = config.getRunPolicy();
assertThat(runPolicyConfig.getTimeBasedConfig().getKeyspaceName()).isEqualTo("ecchronos");
Expand Down Expand Up @@ -327,6 +329,7 @@ public void testDefault() throws Exception

LockFactoryConfig lockFactoryConfig = config.getLockFactory();
assertThat(lockFactoryConfig.getCasLockFactoryConfig().getKeyspaceName()).isEqualTo("ecchronos");
assertThat(lockFactoryConfig.getCasLockFactoryConfig().getConsistencySerial().equals(ConsistencyType.DEFAULT)).isTrue();

RunPolicyConfig runPolicyConfig = config.getRunPolicy();
assertThat(runPolicyConfig.getTimeBasedConfig().getKeyspaceName()).isEqualTo("ecchronos");
Expand Down
1 change: 1 addition & 0 deletions application/src/test/resources/all_set.yml
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ lock_factory:
cas:
keyspace: ecc
cache_expiry_time_in_seconds: 100
consistencySerial: "LOCAL"


run_policy:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.ericsson.bss.cassandra.ecchronos.connection.StatementDecorator;
import com.ericsson.bss.cassandra.ecchronos.core.exceptions.LockException;
import com.ericsson.bss.cassandra.ecchronos.core.scheduling.LockFactory;
import com.ericsson.bss.cassandra.ecchronos.core.utils.ConsistencyType;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
Expand Down Expand Up @@ -82,6 +83,7 @@
* WITH default_time_to_live = 600 AND gc_grace_seconds = 0;
* </pre>
*/
@SuppressWarnings({"PMD.GodClass", "PMD.TooManyFields", "PMD.SingularField", "PMD.ExcessiveMethodLength"})
public final class CASLockFactory implements LockFactory, Closeable
{
private static final Logger LOG = LoggerFactory.getLogger(CASLockFactory.class);
Expand All @@ -102,7 +104,6 @@ public final class CASLockFactory implements LockFactory, Closeable
private final StatementDecorator myStatementDecorator;
private final HostStates myHostStates;
private final boolean myRemoteRouting;

private final CqlSession mySession;
private final String myKeyspaceName;
private final PreparedStatement myCompeteStatement;
Expand All @@ -113,6 +114,7 @@ public final class CASLockFactory implements LockFactory, Closeable
private final PreparedStatement myUpdateLockStatement;
private final PreparedStatement myRemoveLockPriorityStatement;
private final CASLockFactoryCacheContext myCasLockFactoryCacheContext;
private final ConsistencyLevel mySerialConsistencyLevel;

private CASLockFactory(final Builder builder)
{
Expand All @@ -127,24 +129,34 @@ private CASLockFactory(final Builder builder)

verifySchemasExists();

ConsistencyLevel serialConsistencyLevel = myRemoteRouting
if (ConsistencyType.DEFAULT.equals(builder.myConsistencyType))
{
mySerialConsistencyLevel = myRemoteRouting
? ConsistencyLevel.LOCAL_SERIAL
: ConsistencyLevel.SERIAL;
}
else
{
mySerialConsistencyLevel = ConsistencyType.LOCAL.equals(builder.myConsistencyType)
? ConsistencyLevel.LOCAL_SERIAL
: ConsistencyLevel.SERIAL;
}

SimpleStatement insertLockStatement = QueryBuilder.insertInto(myKeyspaceName, TABLE_LOCK)
.value(COLUMN_RESOURCE, bindMarker())
.value(COLUMN_NODE, bindMarker())
.value(COLUMN_METADATA, bindMarker())
.ifNotExists()
.build()
.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM)
.setSerialConsistencyLevel(serialConsistencyLevel);
.setSerialConsistencyLevel(mySerialConsistencyLevel);

SimpleStatement getLockMetadataStatement = QueryBuilder.selectFrom(myKeyspaceName, TABLE_LOCK)
.column(COLUMN_METADATA)
.whereColumn(COLUMN_RESOURCE)
.isEqualTo(bindMarker())
.build()
.setSerialConsistencyLevel(serialConsistencyLevel);
.setSerialConsistencyLevel(mySerialConsistencyLevel);

SimpleStatement removeLockStatement = QueryBuilder.deleteFrom(myKeyspaceName, TABLE_LOCK)
.whereColumn(COLUMN_RESOURCE)
Expand All @@ -153,7 +165,7 @@ private CASLockFactory(final Builder builder)
.isEqualTo(bindMarker())
.build()
.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM)
.setSerialConsistencyLevel(serialConsistencyLevel);
.setSerialConsistencyLevel(mySerialConsistencyLevel);

SimpleStatement updateLockStatement = QueryBuilder.update(myKeyspaceName, TABLE_LOCK)
.setColumn(COLUMN_NODE, bindMarker())
Expand All @@ -164,7 +176,7 @@ private CASLockFactory(final Builder builder)
.isEqualTo(bindMarker())
.build()
.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM)
.setSerialConsistencyLevel(serialConsistencyLevel);
.setSerialConsistencyLevel(mySerialConsistencyLevel);

SimpleStatement competeStatement = QueryBuilder.insertInto(myKeyspaceName, TABLE_LOCK_PRIORITY)
.value(COLUMN_RESOURCE, bindMarker())
Expand Down Expand Up @@ -320,6 +332,12 @@ UUID getHostId()
return myUuid;
}

@VisibleForTesting
ConsistencyLevel getSerialConsistencyLevel()
{
return mySerialConsistencyLevel;
}

@VisibleForTesting
CASLockFactoryCacheContext getCasLockFactoryCacheContext()
{
Expand All @@ -341,6 +359,7 @@ public static class Builder
private StatementDecorator myStatementDecorator;
private String myKeyspaceName = DEFAULT_KEYSPACE_NAME;
private long myCacheExpiryTimeInSeconds = DEFAULT_EXPIRY_TIME_IN_SECONDS;
private ConsistencyType myConsistencyType;

public final Builder withNativeConnectionProvider(final NativeConnectionProvider nativeConnectionProvider)
{
Expand Down Expand Up @@ -372,6 +391,12 @@ public final Builder withCacheExpiryInSeconds(final long cacheExpiryInSeconds)
return this;
}

public final Builder withConsistencySerial(final ConsistencyType consistencyType)
{
myConsistencyType = consistencyType;
return this;
}

public final CASLockFactory build()
{
if (myNativeConnectionProvider == null)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright 2024 Telefonaktiebolaget LM Ericsson
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ericsson.bss.cassandra.ecchronos.core.utils;

public enum ConsistencyType
{
DEFAULT,
LOCAL,
SERIAL
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,14 @@
import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.bindMarker;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.util.Arrays;
Expand Down Expand Up @@ -60,6 +65,7 @@

import com.ericsson.bss.cassandra.ecchronos.core.exceptions.LockException;
import com.ericsson.bss.cassandra.ecchronos.core.scheduling.LockFactory.DistributedLock;
import com.ericsson.bss.cassandra.ecchronos.core.utils.ConsistencyType;

import net.jcip.annotations.NotThreadSafe;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -382,6 +388,88 @@ public boolean getRemoteRouting()
.build());
}

@Test
public void testRemoteRoutingTrueWithDefaultSerialConsistency()
{
Node nodeMock = mock(Node.class);
NativeConnectionProvider connectionProviderMock = mock(NativeConnectionProvider.class);

when(connectionProviderMock.getSession()).thenReturn(mySession);
when(connectionProviderMock.getLocalNode()).thenReturn(nodeMock);
when(connectionProviderMock.getRemoteRouting()).thenReturn(true);

myLockFactory = new CASLockFactory.Builder()
.withNativeConnectionProvider(getNativeConnectionProvider())
.withHostStates(hostStates)
.withStatementDecorator(s -> s)
.withKeyspaceName(myKeyspaceName)
.withConsistencySerial(ConsistencyType.DEFAULT)
.build();

assertEquals(ConsistencyLevel.LOCAL_SERIAL, myLockFactory.getSerialConsistencyLevel());
}

@Test
public void testRemoteRoutingFalseWithDefaultSerialConsistency()
{
Node nodeMock = mock(Node.class);
NativeConnectionProvider connectionProviderMock = mock(NativeConnectionProvider.class);

when(connectionProviderMock.getSession()).thenReturn(mySession);
when(connectionProviderMock.getLocalNode()).thenReturn(nodeMock);
when(connectionProviderMock.getRemoteRouting()).thenReturn(false);

myLockFactory = new CASLockFactory.Builder()
.withNativeConnectionProvider(connectionProviderMock)
.withHostStates(hostStates)
.withStatementDecorator(s -> s)
.withKeyspaceName(myKeyspaceName)
.withConsistencySerial(ConsistencyType.DEFAULT)
.build();

assertEquals(ConsistencyLevel.SERIAL, myLockFactory.getSerialConsistencyLevel());
}

@Test
public void testLocalSerialConsistency()
{
NativeConnectionProvider connectionProviderMock = mock(NativeConnectionProvider.class);
Node nodeMock = mock(Node.class);

when(connectionProviderMock.getSession()).thenReturn(mySession);
when(connectionProviderMock.getLocalNode()).thenReturn(nodeMock);

myLockFactory = new CASLockFactory.Builder()
.withNativeConnectionProvider(connectionProviderMock)
.withHostStates(hostStates)
.withStatementDecorator(s -> s)
.withKeyspaceName(myKeyspaceName)
.withConsistencySerial(ConsistencyType.LOCAL)
.build();

assertEquals(ConsistencyLevel.LOCAL_SERIAL, myLockFactory.getSerialConsistencyLevel());
}

@Test
public void testSerialConsistency()
{
NativeConnectionProvider connectionProviderMock = mock(NativeConnectionProvider.class);
Node nodeMock = mock(Node.class);

when(connectionProviderMock.getSession()).thenReturn(mySession);
when(connectionProviderMock.getLocalNode()).thenReturn(nodeMock);

myLockFactory = new CASLockFactory.Builder()
.withNativeConnectionProvider(connectionProviderMock)
.withHostStates(hostStates)
.withStatementDecorator(s -> s)
.withKeyspaceName(myKeyspaceName)
.withConsistencySerial(ConsistencyType.SERIAL)
.build();

assertEquals(ConsistencyLevel.SERIAL, myLockFactory.getSerialConsistencyLevel());
}

private void assertPriorityListEmpty(String resource)
{
assertThat(getPriorities(resource)).isEmpty();
Expand Down
Loading

0 comments on commit 2972fbd

Please sign in to comment.