Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Serial Consistency Separation for ecChronos 4.0 - Issue #633 #635

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## Version 4.0.6 (Not yet released)

* Separate serial consistency configuration from remoteRouting functionality - Issue #633

### Merged from 1.2

* Fix calculation of tokens per repair - Issue #570
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ public ECChronosInternals(final Config configuration,
.withHostStates(myHostStatesImpl)
.withStatementDecorator(statementDecorator)
.withKeyspaceName(configuration.getLockFactory().getCas().getKeyspace())
.withConsistencySerial(configuration.getLockFactory().getCas().getConsistencySerial())
.build();

Node node = nativeConnectionProvider.getLocalNode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,15 @@
import com.ericsson.bss.cassandra.ecchronos.core.repair.DefaultRepairConfigurationProvider;
import com.ericsson.bss.cassandra.ecchronos.fm.RepairFaultReporter;
import com.ericsson.bss.cassandra.ecchronos.fm.impl.LoggingFaultReporter;

import io.micrometer.core.instrument.MeterRegistry;
import org.springframework.context.ApplicationContext;

import com.ericsson.bss.cassandra.ecchronos.connection.JmxConnectionProvider;
import com.ericsson.bss.cassandra.ecchronos.connection.NativeConnectionProvider;
import com.ericsson.bss.cassandra.ecchronos.connection.StatementDecorator;
import com.ericsson.bss.cassandra.ecchronos.core.repair.RepairLockType;
import com.ericsson.bss.cassandra.ecchronos.core.utils.ConsistencyType;

@SuppressWarnings({"checkstyle:methodname", "checkstyle:membername"})
public class Config
Expand Down Expand Up @@ -735,6 +737,7 @@ public final void setCas(final CasLockFactoryConfig aCas)
public static class CasLockFactoryConfig
{
private String keyspace = "ecchronos";
private ConsistencyType consistencySerial = ConsistencyType.DEFAULT;

public final String getKeyspace()
{
Expand All @@ -745,7 +748,17 @@ public final void setKeyspace(final String aKeyspace)
{
this.keyspace = aKeyspace;
}
}

public final ConsistencyType getConsistencySerial()
{
return consistencySerial;
}

public final void setConsistencySerial(final String aConsistencySerial)
{
consistencySerial = ConsistencyType.valueOf(aConsistencySerial.toUpperCase(Locale.US));
}
}

public static class RunPolicyConfig
{
Expand Down
11 changes: 11 additions & 0 deletions application/src/main/resources/ecc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,17 @@ lock_factory:
## The keyspace used for the CAS lock factory tables.
##
keyspace: ecchronos
##
## Allow to override consistency level for LWT (lightweight transactions). Possible values are:
## "DEFAULT" - Use consistency level based on remoteRouting.
## "SERIAL" - Use SERIAL consistency for LWT regardless of remoteRouting.
## "LOCAL_SERIAL" - Use LOCAL_SERIAL consistency for LWT regardless of remoteRouting.
##
## if you use remoteRouting: false and LOCAL_SERIAL then all locks will be taken locally
## in DC. I.e There's a risk that multiple nodes in different datacenters will be able to lock the
## same nodes causing multiple repairs on the same range/node at the same time.
##
consistencySerial: "DEFAULT"

run_policy:
time_based:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.ericsson.bss.cassandra.ecchronos.core.repair.RepairConfiguration;
import com.ericsson.bss.cassandra.ecchronos.core.repair.RepairLockType;
import com.ericsson.bss.cassandra.ecchronos.core.repair.RepairOptions;
import com.ericsson.bss.cassandra.ecchronos.core.utils.ConsistencyType;
import com.ericsson.bss.cassandra.ecchronos.core.utils.TableReference;
import com.ericsson.bss.cassandra.ecchronos.core.utils.UnitConverter;
import com.ericsson.bss.cassandra.ecchronos.fm.RepairFaultReporter;
Expand Down Expand Up @@ -135,6 +136,7 @@ public void testAllValues() throws Exception

Config.LockFactoryConfig lockFactoryConfig = config.getLockFactory();
assertThat(lockFactoryConfig.getCas().getKeyspace()).isEqualTo("ecc");
assertThat(lockFactoryConfig.getCas().getConsistencySerial().equals(ConsistencyType.LOCAL)).isTrue();

Config.RunPolicyConfig runPolicyConfig = config.getRunPolicy();
assertThat(runPolicyConfig.getTimeBased().getKeyspace()).isEqualTo("ecc");
Expand Down Expand Up @@ -218,6 +220,7 @@ public void testWithDefaultFile() throws Exception

Config.LockFactoryConfig lockFactoryConfig = config.getLockFactory();
assertThat(lockFactoryConfig.getCas().getKeyspace()).isEqualTo("ecchronos");
assertThat(lockFactoryConfig.getCas().getConsistencySerial().equals(ConsistencyType.DEFAULT)).isTrue();

Config.RunPolicyConfig runPolicyConfig = config.getRunPolicy();
assertThat(runPolicyConfig.getTimeBased().getKeyspace()).isEqualTo("ecchronos");
Expand Down Expand Up @@ -300,6 +303,7 @@ public void testDefault() throws Exception

Config.LockFactoryConfig lockFactoryConfig = config.getLockFactory();
assertThat(lockFactoryConfig.getCas().getKeyspace()).isEqualTo("ecchronos");
assertThat(lockFactoryConfig.getCas().getConsistencySerial().equals(ConsistencyType.DEFAULT)).isTrue();

Config.RunPolicyConfig runPolicyConfig = config.getRunPolicy();
assertThat(runPolicyConfig.getTimeBased().getKeyspace()).isEqualTo("ecchronos");
Expand Down
1 change: 1 addition & 0 deletions application/src/test/resources/all_set.yml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ statistics:
lock_factory:
cas:
keyspace: ecc
consistencySerial: "LOCAL"

run_policy:
time_based:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.ericsson.bss.cassandra.ecchronos.connection.StatementDecorator;
import com.ericsson.bss.cassandra.ecchronos.core.exceptions.LockException;
import com.ericsson.bss.cassandra.ecchronos.core.scheduling.LockFactory;
import com.ericsson.bss.cassandra.ecchronos.core.utils.ConsistencyType;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
Expand Down Expand Up @@ -80,6 +81,7 @@
* WITH default_time_to_live = 600 AND gc_grace_seconds = 0;
* </pre>
*/
@SuppressWarnings({"PMD.GodClass", "PMD.TooManyFields", "PMD.SingularField", "PMD.ExcessiveMethodLength"})
public final class CASLockFactory implements LockFactory, Closeable
{
private static final Logger LOG = LoggerFactory.getLogger(CASLockFactory.class);
Expand All @@ -104,7 +106,6 @@ public final class CASLockFactory implements LockFactory, Closeable
private final StatementDecorator myStatementDecorator;
private final HostStates myHostStates;
private final boolean myRemoteRouting;

private final CqlSession mySession;
private final String myKeyspaceName;
private final PreparedStatement myCompeteStatement;
Expand All @@ -116,6 +117,8 @@ public final class CASLockFactory implements LockFactory, Closeable
private final PreparedStatement myRemoveLockPriorityStatement;
private final LockCache myLockCache;

private final ConsistencyLevel mySerialConsistencyLevel;

private CASLockFactory(final Builder builder)
{
myStatementDecorator = builder.myStatementDecorator;
Expand All @@ -130,37 +133,47 @@ private CASLockFactory(final Builder builder)

verifySchemasExists();

ConsistencyLevel serialConsistencyLevel = myRemoteRouting
if (ConsistencyType.DEFAULT.equals(builder.myConsistencyType))
{
mySerialConsistencyLevel = myRemoteRouting
? ConsistencyLevel.LOCAL_SERIAL
: ConsistencyLevel.SERIAL;
}
else
{
mySerialConsistencyLevel = ConsistencyType.LOCAL.equals(builder.myConsistencyType)
? ConsistencyLevel.LOCAL_SERIAL
: ConsistencyLevel.SERIAL;
}

SimpleStatement insertLockStatement = QueryBuilder.insertInto(myKeyspaceName, TABLE_LOCK)
.value(COLUMN_RESOURCE, bindMarker())
.value(COLUMN_NODE, bindMarker())
.value(COLUMN_METADATA, bindMarker())
.ifNotExists()
.build()
.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM)
.setSerialConsistencyLevel(serialConsistencyLevel);
.setSerialConsistencyLevel(mySerialConsistencyLevel);

SimpleStatement getLockMetadataStatement = QueryBuilder.selectFrom(myKeyspaceName, TABLE_LOCK)
.column(COLUMN_METADATA)
.whereColumn(COLUMN_RESOURCE).isEqualTo(bindMarker())
.build()
.setSerialConsistencyLevel(serialConsistencyLevel);
.setSerialConsistencyLevel(mySerialConsistencyLevel);

SimpleStatement removeLockStatement = QueryBuilder.deleteFrom(myKeyspaceName, TABLE_LOCK)
.whereColumn(COLUMN_RESOURCE).isEqualTo(bindMarker())
.ifColumn(COLUMN_NODE).isEqualTo(bindMarker())
.build()
.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM).setSerialConsistencyLevel(serialConsistencyLevel);
.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM).setSerialConsistencyLevel(mySerialConsistencyLevel);

SimpleStatement updateLockStatement = QueryBuilder.update(myKeyspaceName, TABLE_LOCK)
.setColumn(COLUMN_NODE, bindMarker())
.setColumn(COLUMN_METADATA, bindMarker())
.whereColumn(COLUMN_RESOURCE).isEqualTo(bindMarker())
.ifColumn(COLUMN_NODE).isEqualTo(bindMarker())
.build()
.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM).setSerialConsistencyLevel(serialConsistencyLevel);
.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM).setSerialConsistencyLevel(mySerialConsistencyLevel);

SimpleStatement competeStatement = QueryBuilder.insertInto(myKeyspaceName, TABLE_LOCK_PRIORITY)
.value(COLUMN_RESOURCE, bindMarker())
Expand Down Expand Up @@ -285,6 +298,12 @@ UUID getHostId()
return myUuid;
}

@VisibleForTesting
ConsistencyLevel getSerialConsistencyLevel()
{
return mySerialConsistencyLevel;
}

public static Builder builder()
{
return new Builder();
Expand All @@ -298,6 +317,7 @@ public static class Builder
private HostStates myHostStates;
private StatementDecorator myStatementDecorator;
private String myKeyspaceName = DEFAULT_KEYSPACE_NAME;
private ConsistencyType myConsistencyType;

public final Builder withNativeConnectionProvider(final NativeConnectionProvider nativeConnectionProvider)
{
Expand All @@ -323,6 +343,12 @@ public final Builder withKeyspaceName(final String keyspaceName)
return this;
}

public final Builder withConsistencySerial(final ConsistencyType consistencyType)
{
myConsistencyType = consistencyType;
return this;
}

public final CASLockFactory build()
{
if (myNativeConnectionProvider == null)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright 2024 Telefonaktiebolaget LM Ericsson
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ericsson.bss.cassandra.ecchronos.core.utils;

public enum ConsistencyType
{
DEFAULT,
LOCAL,
SERIAL
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.bindMarker;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -60,6 +62,7 @@

import com.ericsson.bss.cassandra.ecchronos.core.exceptions.LockException;
import com.ericsson.bss.cassandra.ecchronos.core.scheduling.LockFactory.DistributedLock;
import com.ericsson.bss.cassandra.ecchronos.core.utils.ConsistencyType;

import net.jcip.annotations.NotThreadSafe;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -367,6 +370,88 @@ public boolean getRemoteRouting()
.build());
}

@Test
public void testRemoteRoutingTrueWithDefaultSerialConsistency()
{
Node nodeMock = mock(Node.class);
NativeConnectionProvider connectionProviderMock = mock(NativeConnectionProvider.class);

when(connectionProviderMock.getSession()).thenReturn(mySession);
when(connectionProviderMock.getLocalNode()).thenReturn(nodeMock);
when(connectionProviderMock.getRemoteRouting()).thenReturn(true);

myLockFactory = new CASLockFactory.Builder()
.withNativeConnectionProvider(getNativeConnectionProvider())
.withHostStates(hostStates)
.withStatementDecorator(s -> s)
.withKeyspaceName(myKeyspaceName)
.withConsistencySerial(ConsistencyType.DEFAULT)
.build();

assertEquals(ConsistencyLevel.LOCAL_SERIAL, myLockFactory.getSerialConsistencyLevel());
}

@Test
public void testRemoteRoutingFalseWithDefaultSerialConsistency()
{
Node nodeMock = mock(Node.class);
NativeConnectionProvider connectionProviderMock = mock(NativeConnectionProvider.class);

when(connectionProviderMock.getSession()).thenReturn(mySession);
when(connectionProviderMock.getLocalNode()).thenReturn(nodeMock);
when(connectionProviderMock.getRemoteRouting()).thenReturn(false);

myLockFactory = new CASLockFactory.Builder()
.withNativeConnectionProvider(connectionProviderMock)
.withHostStates(hostStates)
.withStatementDecorator(s -> s)
.withKeyspaceName(myKeyspaceName)
.withConsistencySerial(ConsistencyType.DEFAULT)
.build();

assertEquals(ConsistencyLevel.SERIAL, myLockFactory.getSerialConsistencyLevel());
}

@Test
public void testLocalSerialConsistency()
{
NativeConnectionProvider connectionProviderMock = mock(NativeConnectionProvider.class);
Node nodeMock = mock(Node.class);

when(connectionProviderMock.getSession()).thenReturn(mySession);
when(connectionProviderMock.getLocalNode()).thenReturn(nodeMock);

myLockFactory = new CASLockFactory.Builder()
.withNativeConnectionProvider(connectionProviderMock)
.withHostStates(hostStates)
.withStatementDecorator(s -> s)
.withKeyspaceName(myKeyspaceName)
.withConsistencySerial(ConsistencyType.LOCAL)
.build();

assertEquals(ConsistencyLevel.LOCAL_SERIAL, myLockFactory.getSerialConsistencyLevel());
}

@Test
public void testSerialConsistency()
{
NativeConnectionProvider connectionProviderMock = mock(NativeConnectionProvider.class);
Node nodeMock = mock(Node.class);

when(connectionProviderMock.getSession()).thenReturn(mySession);
when(connectionProviderMock.getLocalNode()).thenReturn(nodeMock);

myLockFactory = new CASLockFactory.Builder()
.withNativeConnectionProvider(connectionProviderMock)
.withHostStates(hostStates)
.withStatementDecorator(s -> s)
.withKeyspaceName(myKeyspaceName)
.withConsistencySerial(ConsistencyType.SERIAL)
.build();

assertEquals(ConsistencyLevel.SERIAL, myLockFactory.getSerialConsistencyLevel());
}

private void assertPriorityListEmpty(String resource)
{
assertThat(getPriorities(resource)).isEmpty();
Expand Down
Loading