Skip to content

Commit

Permalink
Serial Consistency Separation for ecChronos 4.0 - Issue #633 (#635)
Browse files Browse the repository at this point in the history
* Serial Consistency Separation for ecChronos 4.0 - Issue #633

To decouple the consistency definition in lightweight transactions, introduce a new property configurable by ecc.yml that allows consistency to be overridden by the values DEFAULT/SERIAL/LOCAL.

Closes #633

* Include Exclusions for PMD Rules

* Introduce ConsistencyType and change consistencySerial to lock config

* Remove unnecessary variables and lines
  • Loading branch information
VictorCavichioli authored Feb 21, 2024
1 parent 2b69721 commit f07beda
Show file tree
Hide file tree
Showing 9 changed files with 172 additions and 7 deletions.
2 changes: 2 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## Version 4.0.6 (Not yet released)

* Separate serial consistency configuration from remoteRouting functionality - Issue #633

### Merged from 1.2

* Fix calculation of tokens per repair - Issue #570
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ public ECChronosInternals(final Config configuration,
.withHostStates(myHostStatesImpl)
.withStatementDecorator(statementDecorator)
.withKeyspaceName(configuration.getLockFactory().getCas().getKeyspace())
.withConsistencySerial(configuration.getLockFactory().getCas().getConsistencySerial())
.build();

Node node = nativeConnectionProvider.getLocalNode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,15 @@
import com.ericsson.bss.cassandra.ecchronos.core.repair.DefaultRepairConfigurationProvider;
import com.ericsson.bss.cassandra.ecchronos.fm.RepairFaultReporter;
import com.ericsson.bss.cassandra.ecchronos.fm.impl.LoggingFaultReporter;

import io.micrometer.core.instrument.MeterRegistry;
import org.springframework.context.ApplicationContext;

import com.ericsson.bss.cassandra.ecchronos.connection.JmxConnectionProvider;
import com.ericsson.bss.cassandra.ecchronos.connection.NativeConnectionProvider;
import com.ericsson.bss.cassandra.ecchronos.connection.StatementDecorator;
import com.ericsson.bss.cassandra.ecchronos.core.repair.RepairLockType;
import com.ericsson.bss.cassandra.ecchronos.core.utils.ConsistencyType;

@SuppressWarnings({"checkstyle:methodname", "checkstyle:membername"})
public class Config
Expand Down Expand Up @@ -735,6 +737,7 @@ public final void setCas(final CasLockFactoryConfig aCas)
public static class CasLockFactoryConfig
{
private String keyspace = "ecchronos";
private ConsistencyType consistencySerial = ConsistencyType.DEFAULT;

public final String getKeyspace()
{
Expand All @@ -745,7 +748,17 @@ public final void setKeyspace(final String aKeyspace)
{
this.keyspace = aKeyspace;
}
}

public final ConsistencyType getConsistencySerial()
{
return consistencySerial;
}

public final void setConsistencySerial(final String aConsistencySerial)
{
consistencySerial = ConsistencyType.valueOf(aConsistencySerial.toUpperCase(Locale.US));
}
}

public static class RunPolicyConfig
{
Expand Down
11 changes: 11 additions & 0 deletions application/src/main/resources/ecc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,17 @@ lock_factory:
## The keyspace used for the CAS lock factory tables.
##
keyspace: ecchronos
##
## Allow to override consistency level for LWT (lightweight transactions). Possible values are:
## "DEFAULT" - Use consistency level based on remoteRouting.
## "SERIAL" - Use SERIAL consistency for LWT regardless of remoteRouting.
## "LOCAL_SERIAL" - Use LOCAL_SERIAL consistency for LWT regardless of remoteRouting.
##
## if you use remoteRouting: false and LOCAL_SERIAL then all locks will be taken locally
## in DC. I.e There's a risk that multiple nodes in different datacenters will be able to lock the
## same nodes causing multiple repairs on the same range/node at the same time.
##
consistencySerial: "DEFAULT"

run_policy:
time_based:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.ericsson.bss.cassandra.ecchronos.core.repair.RepairConfiguration;
import com.ericsson.bss.cassandra.ecchronos.core.repair.RepairLockType;
import com.ericsson.bss.cassandra.ecchronos.core.repair.RepairOptions;
import com.ericsson.bss.cassandra.ecchronos.core.utils.ConsistencyType;
import com.ericsson.bss.cassandra.ecchronos.core.utils.TableReference;
import com.ericsson.bss.cassandra.ecchronos.core.utils.UnitConverter;
import com.ericsson.bss.cassandra.ecchronos.fm.RepairFaultReporter;
Expand Down Expand Up @@ -135,6 +136,7 @@ public void testAllValues() throws Exception

Config.LockFactoryConfig lockFactoryConfig = config.getLockFactory();
assertThat(lockFactoryConfig.getCas().getKeyspace()).isEqualTo("ecc");
assertThat(lockFactoryConfig.getCas().getConsistencySerial().equals(ConsistencyType.LOCAL)).isTrue();

Config.RunPolicyConfig runPolicyConfig = config.getRunPolicy();
assertThat(runPolicyConfig.getTimeBased().getKeyspace()).isEqualTo("ecc");
Expand Down Expand Up @@ -218,6 +220,7 @@ public void testWithDefaultFile() throws Exception

Config.LockFactoryConfig lockFactoryConfig = config.getLockFactory();
assertThat(lockFactoryConfig.getCas().getKeyspace()).isEqualTo("ecchronos");
assertThat(lockFactoryConfig.getCas().getConsistencySerial().equals(ConsistencyType.DEFAULT)).isTrue();

Config.RunPolicyConfig runPolicyConfig = config.getRunPolicy();
assertThat(runPolicyConfig.getTimeBased().getKeyspace()).isEqualTo("ecchronos");
Expand Down Expand Up @@ -300,6 +303,7 @@ public void testDefault() throws Exception

Config.LockFactoryConfig lockFactoryConfig = config.getLockFactory();
assertThat(lockFactoryConfig.getCas().getKeyspace()).isEqualTo("ecchronos");
assertThat(lockFactoryConfig.getCas().getConsistencySerial().equals(ConsistencyType.DEFAULT)).isTrue();

Config.RunPolicyConfig runPolicyConfig = config.getRunPolicy();
assertThat(runPolicyConfig.getTimeBased().getKeyspace()).isEqualTo("ecchronos");
Expand Down
1 change: 1 addition & 0 deletions application/src/test/resources/all_set.yml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ statistics:
lock_factory:
cas:
keyspace: ecc
consistencySerial: "LOCAL"

run_policy:
time_based:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.ericsson.bss.cassandra.ecchronos.connection.StatementDecorator;
import com.ericsson.bss.cassandra.ecchronos.core.exceptions.LockException;
import com.ericsson.bss.cassandra.ecchronos.core.scheduling.LockFactory;
import com.ericsson.bss.cassandra.ecchronos.core.utils.ConsistencyType;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
Expand Down Expand Up @@ -80,6 +81,7 @@
* WITH default_time_to_live = 600 AND gc_grace_seconds = 0;
* </pre>
*/
@SuppressWarnings({"PMD.GodClass", "PMD.TooManyFields", "PMD.SingularField", "PMD.ExcessiveMethodLength"})
public final class CASLockFactory implements LockFactory, Closeable
{
private static final Logger LOG = LoggerFactory.getLogger(CASLockFactory.class);
Expand All @@ -104,7 +106,6 @@ public final class CASLockFactory implements LockFactory, Closeable
private final StatementDecorator myStatementDecorator;
private final HostStates myHostStates;
private final boolean myRemoteRouting;

private final CqlSession mySession;
private final String myKeyspaceName;
private final PreparedStatement myCompeteStatement;
Expand All @@ -116,6 +117,8 @@ public final class CASLockFactory implements LockFactory, Closeable
private final PreparedStatement myRemoveLockPriorityStatement;
private final LockCache myLockCache;

private final ConsistencyLevel mySerialConsistencyLevel;

private CASLockFactory(final Builder builder)
{
myStatementDecorator = builder.myStatementDecorator;
Expand All @@ -130,37 +133,47 @@ private CASLockFactory(final Builder builder)

verifySchemasExists();

ConsistencyLevel serialConsistencyLevel = myRemoteRouting
if (ConsistencyType.DEFAULT.equals(builder.myConsistencyType))
{
mySerialConsistencyLevel = myRemoteRouting
? ConsistencyLevel.LOCAL_SERIAL
: ConsistencyLevel.SERIAL;
}
else
{
mySerialConsistencyLevel = ConsistencyType.LOCAL.equals(builder.myConsistencyType)
? ConsistencyLevel.LOCAL_SERIAL
: ConsistencyLevel.SERIAL;
}

SimpleStatement insertLockStatement = QueryBuilder.insertInto(myKeyspaceName, TABLE_LOCK)
.value(COLUMN_RESOURCE, bindMarker())
.value(COLUMN_NODE, bindMarker())
.value(COLUMN_METADATA, bindMarker())
.ifNotExists()
.build()
.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM)
.setSerialConsistencyLevel(serialConsistencyLevel);
.setSerialConsistencyLevel(mySerialConsistencyLevel);

SimpleStatement getLockMetadataStatement = QueryBuilder.selectFrom(myKeyspaceName, TABLE_LOCK)
.column(COLUMN_METADATA)
.whereColumn(COLUMN_RESOURCE).isEqualTo(bindMarker())
.build()
.setSerialConsistencyLevel(serialConsistencyLevel);
.setSerialConsistencyLevel(mySerialConsistencyLevel);

SimpleStatement removeLockStatement = QueryBuilder.deleteFrom(myKeyspaceName, TABLE_LOCK)
.whereColumn(COLUMN_RESOURCE).isEqualTo(bindMarker())
.ifColumn(COLUMN_NODE).isEqualTo(bindMarker())
.build()
.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM).setSerialConsistencyLevel(serialConsistencyLevel);
.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM).setSerialConsistencyLevel(mySerialConsistencyLevel);

SimpleStatement updateLockStatement = QueryBuilder.update(myKeyspaceName, TABLE_LOCK)
.setColumn(COLUMN_NODE, bindMarker())
.setColumn(COLUMN_METADATA, bindMarker())
.whereColumn(COLUMN_RESOURCE).isEqualTo(bindMarker())
.ifColumn(COLUMN_NODE).isEqualTo(bindMarker())
.build()
.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM).setSerialConsistencyLevel(serialConsistencyLevel);
.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM).setSerialConsistencyLevel(mySerialConsistencyLevel);

SimpleStatement competeStatement = QueryBuilder.insertInto(myKeyspaceName, TABLE_LOCK_PRIORITY)
.value(COLUMN_RESOURCE, bindMarker())
Expand Down Expand Up @@ -285,6 +298,12 @@ UUID getHostId()
return myUuid;
}

@VisibleForTesting
ConsistencyLevel getSerialConsistencyLevel()
{
return mySerialConsistencyLevel;
}

public static Builder builder()
{
return new Builder();
Expand All @@ -298,6 +317,7 @@ public static class Builder
private HostStates myHostStates;
private StatementDecorator myStatementDecorator;
private String myKeyspaceName = DEFAULT_KEYSPACE_NAME;
private ConsistencyType myConsistencyType;

public final Builder withNativeConnectionProvider(final NativeConnectionProvider nativeConnectionProvider)
{
Expand All @@ -323,6 +343,12 @@ public final Builder withKeyspaceName(final String keyspaceName)
return this;
}

public final Builder withConsistencySerial(final ConsistencyType consistencyType)
{
myConsistencyType = consistencyType;
return this;
}

public final CASLockFactory build()
{
if (myNativeConnectionProvider == null)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright 2024 Telefonaktiebolaget LM Ericsson
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ericsson.bss.cassandra.ecchronos.core.utils;

public enum ConsistencyType
{
DEFAULT,
LOCAL,
SERIAL
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.bindMarker;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -60,6 +62,7 @@

import com.ericsson.bss.cassandra.ecchronos.core.exceptions.LockException;
import com.ericsson.bss.cassandra.ecchronos.core.scheduling.LockFactory.DistributedLock;
import com.ericsson.bss.cassandra.ecchronos.core.utils.ConsistencyType;

import net.jcip.annotations.NotThreadSafe;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -367,6 +370,88 @@ public boolean getRemoteRouting()
.build());
}

@Test
public void testRemoteRoutingTrueWithDefaultSerialConsistency()
{
Node nodeMock = mock(Node.class);
NativeConnectionProvider connectionProviderMock = mock(NativeConnectionProvider.class);

when(connectionProviderMock.getSession()).thenReturn(mySession);
when(connectionProviderMock.getLocalNode()).thenReturn(nodeMock);
when(connectionProviderMock.getRemoteRouting()).thenReturn(true);

myLockFactory = new CASLockFactory.Builder()
.withNativeConnectionProvider(getNativeConnectionProvider())
.withHostStates(hostStates)
.withStatementDecorator(s -> s)
.withKeyspaceName(myKeyspaceName)
.withConsistencySerial(ConsistencyType.DEFAULT)
.build();

assertEquals(ConsistencyLevel.LOCAL_SERIAL, myLockFactory.getSerialConsistencyLevel());
}

@Test
public void testRemoteRoutingFalseWithDefaultSerialConsistency()
{
Node nodeMock = mock(Node.class);
NativeConnectionProvider connectionProviderMock = mock(NativeConnectionProvider.class);

when(connectionProviderMock.getSession()).thenReturn(mySession);
when(connectionProviderMock.getLocalNode()).thenReturn(nodeMock);
when(connectionProviderMock.getRemoteRouting()).thenReturn(false);

myLockFactory = new CASLockFactory.Builder()
.withNativeConnectionProvider(connectionProviderMock)
.withHostStates(hostStates)
.withStatementDecorator(s -> s)
.withKeyspaceName(myKeyspaceName)
.withConsistencySerial(ConsistencyType.DEFAULT)
.build();

assertEquals(ConsistencyLevel.SERIAL, myLockFactory.getSerialConsistencyLevel());
}

@Test
public void testLocalSerialConsistency()
{
NativeConnectionProvider connectionProviderMock = mock(NativeConnectionProvider.class);
Node nodeMock = mock(Node.class);

when(connectionProviderMock.getSession()).thenReturn(mySession);
when(connectionProviderMock.getLocalNode()).thenReturn(nodeMock);

myLockFactory = new CASLockFactory.Builder()
.withNativeConnectionProvider(connectionProviderMock)
.withHostStates(hostStates)
.withStatementDecorator(s -> s)
.withKeyspaceName(myKeyspaceName)
.withConsistencySerial(ConsistencyType.LOCAL)
.build();

assertEquals(ConsistencyLevel.LOCAL_SERIAL, myLockFactory.getSerialConsistencyLevel());
}

@Test
public void testSerialConsistency()
{
NativeConnectionProvider connectionProviderMock = mock(NativeConnectionProvider.class);
Node nodeMock = mock(Node.class);

when(connectionProviderMock.getSession()).thenReturn(mySession);
when(connectionProviderMock.getLocalNode()).thenReturn(nodeMock);

myLockFactory = new CASLockFactory.Builder()
.withNativeConnectionProvider(connectionProviderMock)
.withHostStates(hostStates)
.withStatementDecorator(s -> s)
.withKeyspaceName(myKeyspaceName)
.withConsistencySerial(ConsistencyType.SERIAL)
.build();

assertEquals(ConsistencyLevel.SERIAL, myLockFactory.getSerialConsistencyLevel());
}

private void assertPriorityListEmpty(String resource)
{
assertThat(getPriorities(resource)).isEmpty();
Expand Down

0 comments on commit f07beda

Please sign in to comment.