Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Separate serial consistency configuration from remoteRouting functionality - Issue #633 #634

Merged
merged 8 commits into from
Feb 21, 2024
3 changes: 2 additions & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@

## Version 5.0.1 (Not yet released)

* Separate serial consistency configuration from remoteRouting functionality - Issue #633
VictorCavichioli marked this conversation as resolved.
Show resolved Hide resolved
* Improve hang preventing task - Issue #544
* Improve Description of unwind_ratio - Issue #628

## Version 5.0.0 (Not yet released)
## Version 5.0.0

* Build Ecchronos with Java 11 - Issue 616
* Bump logback from 1.2.10 to 1.2.13 (CVE-2023-6378) - Issue #622
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ public ECChronosInternals(final Config configuration,
.withStatementDecorator(statementDecorator)
.withKeyspaceName(casLockFactoryConfig.getKeyspaceName())
.withCacheExpiryInSeconds(casLockFactoryConfig.getFailureCacheExpiryTimeInSeconds())
.withConsistencySerial(casLockFactoryConfig.getConsistencySerial())
.build();

Node node = nativeConnectionProvider.getLocalNode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,16 @@
package com.ericsson.bss.cassandra.ecchronos.application.config.lockfactory;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.ericsson.bss.cassandra.ecchronos.core.utils.ConsistencyType;
import java.util.Locale;

public class CasLockFactoryConfig
{
private static final long DEFAULT_EXPIRY_TIME_IN_SECONDS = 30L;
private static final String DEFAULT_KEYSPACE_NAME = "ecchronos";
private String myKeyspaceName = DEFAULT_KEYSPACE_NAME;
private long myExpiryTimeInSeconds = DEFAULT_EXPIRY_TIME_IN_SECONDS;
private ConsistencyType myConsistencySerial = ConsistencyType.DEFAULT;

public final long getFailureCacheExpiryTimeInSeconds()
{
Expand All @@ -44,4 +47,16 @@ public final void setKeyspaceName(final String keyspaceName)
{
myKeyspaceName = keyspaceName;
}

@JsonProperty("consistencySerial")
public final ConsistencyType getConsistencySerial()
{
return myConsistencySerial;
}

@JsonProperty("consistencySerial")
public final void setConsistencySerial(final String consistencySerial)
{
myConsistencySerial = ConsistencyType.valueOf(consistencySerial.toUpperCase(Locale.US));
}
}
14 changes: 12 additions & 2 deletions application/src/main/resources/ecc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,7 @@ connection:
## Allow routing requests directly to a remote datacenter.
## This allows locks for other datacenters to be taken in that datacenter instead of via the local datacenter.
## If clients are prevented from connecting directly to Cassandra nodes in other sites this is not possible.
## If remote routing is disabled its not possible to use LOCAL_SERIAL consistency for the locking,
## instead SERIAL consistency will be used for those request.
## If remote routing is disabled, instead SERIAL consistency will be used for those request.
##
remoteRouting: true
jmx:
Expand Down Expand Up @@ -272,6 +271,17 @@ lock_factory:
## the cache expiration time is reached.
##
cache_expiry_time_in_seconds: 30
##
## Allow to override consistency level for LWT (lightweight transactions). Possible values are:
## "DEFAULT" - Use consistency level based on remoteRouting.
## "SERIAL" - Use SERIAL consistency for LWT regardless of remoteRouting.
## "LOCAL_SERIAL" - Use LOCAL_SERIAL consistency for LWT regardless of remoteRouting.
##
## if you use remoteRouting: false and LOCAL_SERIAL then all locks will be taken locally
## in DC. I.e There's a risk that multiple nodes in different datacenters will be able to lock the
## same nodes causing multiple repairs on the same range/node at the same time.
##
consistencySerial: "DEFAULT"

run_policy:
time_based:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import com.ericsson.bss.cassandra.ecchronos.application.config.metrics.StatisticsConfig;
import com.ericsson.bss.cassandra.ecchronos.application.config.repair.GlobalRepairConfig;
import com.ericsson.bss.cassandra.ecchronos.application.config.repair.RepairHistory;
import com.ericsson.bss.cassandra.ecchronos.application.config.repair.RepairSchedule;
import com.ericsson.bss.cassandra.ecchronos.application.config.rest.RestServerConfig;
import com.ericsson.bss.cassandra.ecchronos.application.config.runpolicy.RunPolicyConfig;
import com.ericsson.bss.cassandra.ecchronos.application.config.scheduler.SchedulerConfig;
Expand All @@ -47,6 +46,7 @@
import com.ericsson.bss.cassandra.ecchronos.core.repair.RepairConfiguration;
import com.ericsson.bss.cassandra.ecchronos.core.repair.RepairLockType;
import com.ericsson.bss.cassandra.ecchronos.core.repair.RepairOptions;
import com.ericsson.bss.cassandra.ecchronos.core.utils.ConsistencyType;
import com.ericsson.bss.cassandra.ecchronos.core.utils.TableReference;
import com.ericsson.bss.cassandra.ecchronos.core.utils.UnitConverter;
import com.ericsson.bss.cassandra.ecchronos.fm.RepairFaultReporter;
Expand Down Expand Up @@ -157,6 +157,7 @@ public void testAllValues() throws Exception

LockFactoryConfig lockFactoryConfig = config.getLockFactory();
assertThat(lockFactoryConfig.getCasLockFactoryConfig().getKeyspaceName()).isEqualTo("ecc");
assertThat(lockFactoryConfig.getCasLockFactoryConfig().getConsistencySerial().equals(ConsistencyType.LOCAL)).isTrue();

RunPolicyConfig runPolicyConfig = config.getRunPolicy();
assertThat(runPolicyConfig.getTimeBasedConfig().getKeyspaceName()).isEqualTo("ecc");
Expand Down Expand Up @@ -243,6 +244,7 @@ public void testWithDefaultFile() throws Exception

LockFactoryConfig lockFactoryConfig = config.getLockFactory();
assertThat(lockFactoryConfig.getCasLockFactoryConfig().getKeyspaceName()).isEqualTo("ecchronos");
assertThat(lockFactoryConfig.getCasLockFactoryConfig().getConsistencySerial().equals(ConsistencyType.DEFAULT)).isTrue();

RunPolicyConfig runPolicyConfig = config.getRunPolicy();
assertThat(runPolicyConfig.getTimeBasedConfig().getKeyspaceName()).isEqualTo("ecchronos");
Expand Down Expand Up @@ -327,6 +329,7 @@ public void testDefault() throws Exception

LockFactoryConfig lockFactoryConfig = config.getLockFactory();
assertThat(lockFactoryConfig.getCasLockFactoryConfig().getKeyspaceName()).isEqualTo("ecchronos");
assertThat(lockFactoryConfig.getCasLockFactoryConfig().getConsistencySerial().equals(ConsistencyType.DEFAULT)).isTrue();

RunPolicyConfig runPolicyConfig = config.getRunPolicy();
assertThat(runPolicyConfig.getTimeBasedConfig().getKeyspaceName()).isEqualTo("ecchronos");
Expand Down
1 change: 1 addition & 0 deletions application/src/test/resources/all_set.yml
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ lock_factory:
cas:
keyspace: ecc
cache_expiry_time_in_seconds: 100
consistencySerial: "LOCAL"


run_policy:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.ericsson.bss.cassandra.ecchronos.connection.StatementDecorator;
import com.ericsson.bss.cassandra.ecchronos.core.exceptions.LockException;
import com.ericsson.bss.cassandra.ecchronos.core.scheduling.LockFactory;
import com.ericsson.bss.cassandra.ecchronos.core.utils.ConsistencyType;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
Expand Down Expand Up @@ -82,6 +83,7 @@
* WITH default_time_to_live = 600 AND gc_grace_seconds = 0;
* </pre>
*/
@SuppressWarnings({"PMD.GodClass", "PMD.TooManyFields", "PMD.SingularField", "PMD.ExcessiveMethodLength"})
public final class CASLockFactory implements LockFactory, Closeable
{
private static final Logger LOG = LoggerFactory.getLogger(CASLockFactory.class);
Expand All @@ -102,7 +104,6 @@ public final class CASLockFactory implements LockFactory, Closeable
private final StatementDecorator myStatementDecorator;
private final HostStates myHostStates;
private final boolean myRemoteRouting;

private final CqlSession mySession;
private final String myKeyspaceName;
private final PreparedStatement myCompeteStatement;
Expand All @@ -113,6 +114,7 @@ public final class CASLockFactory implements LockFactory, Closeable
private final PreparedStatement myUpdateLockStatement;
private final PreparedStatement myRemoveLockPriorityStatement;
private final CASLockFactoryCacheContext myCasLockFactoryCacheContext;
private final ConsistencyLevel mySerialConsistencyLevel;

private CASLockFactory(final Builder builder)
{
Expand All @@ -127,24 +129,34 @@ private CASLockFactory(final Builder builder)

verifySchemasExists();

ConsistencyLevel serialConsistencyLevel = myRemoteRouting
if (ConsistencyType.DEFAULT.equals(builder.myConsistencyType))
{
mySerialConsistencyLevel = myRemoteRouting
? ConsistencyLevel.LOCAL_SERIAL
: ConsistencyLevel.SERIAL;
}
else
{
mySerialConsistencyLevel = ConsistencyType.LOCAL.equals(builder.myConsistencyType)
? ConsistencyLevel.LOCAL_SERIAL
: ConsistencyLevel.SERIAL;
}

SimpleStatement insertLockStatement = QueryBuilder.insertInto(myKeyspaceName, TABLE_LOCK)
.value(COLUMN_RESOURCE, bindMarker())
.value(COLUMN_NODE, bindMarker())
.value(COLUMN_METADATA, bindMarker())
.ifNotExists()
.build()
.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM)
.setSerialConsistencyLevel(serialConsistencyLevel);
.setSerialConsistencyLevel(mySerialConsistencyLevel);

SimpleStatement getLockMetadataStatement = QueryBuilder.selectFrom(myKeyspaceName, TABLE_LOCK)
.column(COLUMN_METADATA)
.whereColumn(COLUMN_RESOURCE)
.isEqualTo(bindMarker())
.build()
.setSerialConsistencyLevel(serialConsistencyLevel);
.setSerialConsistencyLevel(mySerialConsistencyLevel);

SimpleStatement removeLockStatement = QueryBuilder.deleteFrom(myKeyspaceName, TABLE_LOCK)
.whereColumn(COLUMN_RESOURCE)
Expand All @@ -153,7 +165,7 @@ private CASLockFactory(final Builder builder)
.isEqualTo(bindMarker())
.build()
.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM)
.setSerialConsistencyLevel(serialConsistencyLevel);
.setSerialConsistencyLevel(mySerialConsistencyLevel);

SimpleStatement updateLockStatement = QueryBuilder.update(myKeyspaceName, TABLE_LOCK)
.setColumn(COLUMN_NODE, bindMarker())
Expand All @@ -164,7 +176,7 @@ private CASLockFactory(final Builder builder)
.isEqualTo(bindMarker())
.build()
.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM)
.setSerialConsistencyLevel(serialConsistencyLevel);
.setSerialConsistencyLevel(mySerialConsistencyLevel);

SimpleStatement competeStatement = QueryBuilder.insertInto(myKeyspaceName, TABLE_LOCK_PRIORITY)
.value(COLUMN_RESOURCE, bindMarker())
Expand Down Expand Up @@ -320,6 +332,12 @@ UUID getHostId()
return myUuid;
}

@VisibleForTesting
ConsistencyLevel getSerialConsistencyLevel()
{
return mySerialConsistencyLevel;
}

@VisibleForTesting
CASLockFactoryCacheContext getCasLockFactoryCacheContext()
{
Expand All @@ -341,6 +359,7 @@ public static class Builder
private StatementDecorator myStatementDecorator;
private String myKeyspaceName = DEFAULT_KEYSPACE_NAME;
private long myCacheExpiryTimeInSeconds = DEFAULT_EXPIRY_TIME_IN_SECONDS;
private ConsistencyType myConsistencyType;

public final Builder withNativeConnectionProvider(final NativeConnectionProvider nativeConnectionProvider)
{
Expand Down Expand Up @@ -372,6 +391,12 @@ public final Builder withCacheExpiryInSeconds(final long cacheExpiryInSeconds)
return this;
}

public final Builder withConsistencySerial(final ConsistencyType consistencyType)
{
myConsistencyType = consistencyType;
return this;
}

public final CASLockFactory build()
{
if (myNativeConnectionProvider == null)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright 2024 Telefonaktiebolaget LM Ericsson
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ericsson.bss.cassandra.ecchronos.core.utils;

public enum ConsistencyType
{
DEFAULT,
LOCAL,
SERIAL
}
VictorCavichioli marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,14 @@
import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.bindMarker;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.util.Arrays;
Expand Down Expand Up @@ -60,6 +65,7 @@

import com.ericsson.bss.cassandra.ecchronos.core.exceptions.LockException;
import com.ericsson.bss.cassandra.ecchronos.core.scheduling.LockFactory.DistributedLock;
import com.ericsson.bss.cassandra.ecchronos.core.utils.ConsistencyType;

import net.jcip.annotations.NotThreadSafe;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -382,6 +388,88 @@ public boolean getRemoteRouting()
.build());
}

@Test
public void testRemoteRoutingTrueWithDefaultSerialConsistency()
{
Node nodeMock = mock(Node.class);
NativeConnectionProvider connectionProviderMock = mock(NativeConnectionProvider.class);

when(connectionProviderMock.getSession()).thenReturn(mySession);
when(connectionProviderMock.getLocalNode()).thenReturn(nodeMock);
when(connectionProviderMock.getRemoteRouting()).thenReturn(true);

myLockFactory = new CASLockFactory.Builder()
.withNativeConnectionProvider(getNativeConnectionProvider())
.withHostStates(hostStates)
.withStatementDecorator(s -> s)
.withKeyspaceName(myKeyspaceName)
.withConsistencySerial(ConsistencyType.DEFAULT)
.build();

assertEquals(ConsistencyLevel.LOCAL_SERIAL, myLockFactory.getSerialConsistencyLevel());
}

@Test
public void testRemoteRoutingFalseWithDefaultSerialConsistency()
{
Node nodeMock = mock(Node.class);
NativeConnectionProvider connectionProviderMock = mock(NativeConnectionProvider.class);

when(connectionProviderMock.getSession()).thenReturn(mySession);
when(connectionProviderMock.getLocalNode()).thenReturn(nodeMock);
when(connectionProviderMock.getRemoteRouting()).thenReturn(false);

myLockFactory = new CASLockFactory.Builder()
.withNativeConnectionProvider(connectionProviderMock)
.withHostStates(hostStates)
.withStatementDecorator(s -> s)
.withKeyspaceName(myKeyspaceName)
.withConsistencySerial(ConsistencyType.DEFAULT)
.build();

assertEquals(ConsistencyLevel.SERIAL, myLockFactory.getSerialConsistencyLevel());
}

@Test
public void testLocalSerialConsistency()
{
NativeConnectionProvider connectionProviderMock = mock(NativeConnectionProvider.class);
Node nodeMock = mock(Node.class);

when(connectionProviderMock.getSession()).thenReturn(mySession);
when(connectionProviderMock.getLocalNode()).thenReturn(nodeMock);

myLockFactory = new CASLockFactory.Builder()
.withNativeConnectionProvider(connectionProviderMock)
.withHostStates(hostStates)
.withStatementDecorator(s -> s)
.withKeyspaceName(myKeyspaceName)
.withConsistencySerial(ConsistencyType.LOCAL)
.build();

assertEquals(ConsistencyLevel.LOCAL_SERIAL, myLockFactory.getSerialConsistencyLevel());
}

@Test
public void testSerialConsistency()
{
NativeConnectionProvider connectionProviderMock = mock(NativeConnectionProvider.class);
Node nodeMock = mock(Node.class);

when(connectionProviderMock.getSession()).thenReturn(mySession);
when(connectionProviderMock.getLocalNode()).thenReturn(nodeMock);

myLockFactory = new CASLockFactory.Builder()
.withNativeConnectionProvider(connectionProviderMock)
.withHostStates(hostStates)
.withStatementDecorator(s -> s)
.withKeyspaceName(myKeyspaceName)
.withConsistencySerial(ConsistencyType.SERIAL)
.build();

assertEquals(ConsistencyLevel.SERIAL, myLockFactory.getSerialConsistencyLevel());
}

private void assertPriorityListEmpty(String resource)
{
assertThat(getPriorities(resource)).isEmpty();
Expand Down
Loading
Loading