Skip to content

Commit

Permalink
Introduce ConsistencyType and change consistencySerial to lock config
Browse files Browse the repository at this point in the history
  • Loading branch information
VictorCavichioli committed Feb 20, 2024
1 parent 5271060 commit 6ddfa2e
Show file tree
Hide file tree
Showing 15 changed files with 92 additions and 106 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ public DefaultNativeConnectionProvider(final Config config,
String host = nativeConfig.getHost();
int port = nativeConfig.getPort();
boolean remoteRouting = nativeConfig.getRemoteRouting();
String consistencySerial = nativeConfig.getConsistencySerial();
Security.CqlSecurity cqlSecurity = cqlSecuritySupplier.get();
boolean authEnabled = cqlSecurity.getCredentials().isEnabled();
boolean tlsEnabled = cqlSecurity.getTls().isEnabled();
Expand All @@ -73,7 +72,6 @@ public DefaultNativeConnectionProvider(final Config config,
.withLocalhost(host)
.withPort(port)
.withRemoteRouting(remoteRouting)
.withConsistencySerial(consistencySerial)
.withAuthProvider(authProvider)
.withSslEngineFactory(sslEngineFactory)
.withMetricsEnabled(config.getStatistics().isEnabled())
Expand Down Expand Up @@ -146,12 +144,6 @@ public final boolean getRemoteRouting()
return myLocalNativeConnectionProvider.getRemoteRouting();
}

@Override
public final String getSerialConsistency()
{
return myLocalNativeConnectionProvider.getSerialConsistency();
}

@Override
public final void close()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ public ECChronosInternals(final Config configuration,
.withHostStates(myHostStatesImpl)
.withStatementDecorator(statementDecorator)
.withKeyspaceName(configuration.getLockFactory().getCas().getKeyspace())
.withConsistencySerial(configuration.getLockFactory().getCas().getConsistencySerial())
.build();

Node node = nativeConnectionProvider.getLocalNode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import com.ericsson.bss.cassandra.ecchronos.connection.NativeConnectionProvider;
import com.ericsson.bss.cassandra.ecchronos.connection.StatementDecorator;
import com.ericsson.bss.cassandra.ecchronos.core.repair.RepairLockType;
import com.ericsson.bss.cassandra.ecchronos.core.utils.ConsistencyType;

@SuppressWarnings({"checkstyle:methodname", "checkstyle:membername"})
public class Config
Expand Down Expand Up @@ -325,7 +326,6 @@ public static class NativeConnection extends Connection<NativeConnectionProvider

private Class<? extends StatementDecorator> decoratorClass = NoopStatementDecorator.class;
private boolean remoteRouting = true;
private String consistencySerial = "DEFAULT";

public NativeConnection()
{
Expand Down Expand Up @@ -364,16 +364,6 @@ public final void setRemoteRouting(final boolean aRemoteRouting)
this.remoteRouting = aRemoteRouting;
}

public final String getConsistencySerial()
{
return consistencySerial;
}

public final void setConsistencySerial(final String aConsistencySerial)
{
this.consistencySerial = aConsistencySerial;
}

@Override
protected final Class<?>[] expectedConstructor()
{
Expand Down Expand Up @@ -747,6 +737,7 @@ public final void setCas(final CasLockFactoryConfig aCas)
public static class CasLockFactoryConfig
{
private String keyspace = "ecchronos";
private ConsistencyType consistencySerial = ConsistencyType.DEFAULT;

public final String getKeyspace()
{
Expand All @@ -757,7 +748,17 @@ public final void setKeyspace(final String aKeyspace)
{
this.keyspace = aKeyspace;
}
}

public final ConsistencyType getConsistencySerial()
{
return consistencySerial;
}

public final void setConsistencySerial(final String aConsistencySerial)
{
consistencySerial = ConsistencyType.valueOf(aConsistencySerial.toUpperCase(Locale.US));
}
}

public static class RunPolicyConfig
{
Expand Down
21 changes: 11 additions & 10 deletions application/src/main/resources/ecc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,6 @@ connection:
## instead SERIAL consistency will be used for those request.
##
remoteRouting: true
# # Allow to override consistency level for LWT (lightweight transactions). Possible values are:
# # "DEFAULT" - Use consistency level based on remoteRouting.
# # "SERIAL" - Use SERIAL consistency for LWT regardless of remoteRouting.
# # "LOCAL_SERIAL" - Use LOCAL_SERIAL consistency for LWT regardless of remoteRouting.
##
# # if you use remoteRouting: false and LOCAL_SERIAL then all locks will be taken locally
# # in DC. I.e There's a risk that multiple nodes in different datacenters will be able to lock the
# # same nodes causing multiple repairs on the same range/node at the same time.
##
consistencySerial: "DEFAULT"
jmx:
##
## Host and port properties for JMX.
Expand Down Expand Up @@ -231,6 +221,17 @@ lock_factory:
## The keyspace used for the CAS lock factory tables.
##
keyspace: ecchronos
##
## Allow to override consistency level for LWT (lightweight transactions). Possible values are:
## "DEFAULT" - Use consistency level based on remoteRouting.
## "SERIAL" - Use SERIAL consistency for LWT regardless of remoteRouting.
## "LOCAL_SERIAL" - Use LOCAL_SERIAL consistency for LWT regardless of remoteRouting.
##
## if you use remoteRouting: false and LOCAL_SERIAL then all locks will be taken locally
## in DC. I.e There's a risk that multiple nodes in different datacenters will be able to lock the
## same nodes causing multiple repairs on the same range/node at the same time.
##
consistencySerial: "DEFAULT"

run_policy:
time_based:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.ericsson.bss.cassandra.ecchronos.core.repair.RepairConfiguration;
import com.ericsson.bss.cassandra.ecchronos.core.repair.RepairLockType;
import com.ericsson.bss.cassandra.ecchronos.core.repair.RepairOptions;
import com.ericsson.bss.cassandra.ecchronos.core.utils.ConsistencyType;
import com.ericsson.bss.cassandra.ecchronos.core.utils.TableReference;
import com.ericsson.bss.cassandra.ecchronos.core.utils.UnitConverter;
import com.ericsson.bss.cassandra.ecchronos.fm.RepairFaultReporter;
Expand Down Expand Up @@ -71,7 +72,6 @@ public void testAllValues() throws Exception
assertThat(nativeConnection.getHost()).isEqualTo("127.0.0.2");
assertThat(nativeConnection.getPort()).isEqualTo(9100);
assertThat(nativeConnection.getRemoteRouting()).isFalse();
assertThat(nativeConnection.getConsistencySerial().equals("LOCAL")).isTrue();
assertThat(nativeConnection.getTimeout().getConnectionTimeout(TimeUnit.SECONDS)).isEqualTo(5);
assertThat(nativeConnection.getProviderClass()).isEqualTo(TestNativeConnectionProvider.class);
assertThat(nativeConnection.getCertificateHandlerClass()).isEqualTo(TestCertificateHandler.class);
Expand Down Expand Up @@ -136,6 +136,7 @@ public void testAllValues() throws Exception

Config.LockFactoryConfig lockFactoryConfig = config.getLockFactory();
assertThat(lockFactoryConfig.getCas().getKeyspace()).isEqualTo("ecc");
assertThat(lockFactoryConfig.getCas().getConsistencySerial().equals(ConsistencyType.LOCAL)).isTrue();

Config.RunPolicyConfig runPolicyConfig = config.getRunPolicy();
assertThat(runPolicyConfig.getTimeBased().getKeyspace()).isEqualTo("ecc");
Expand Down Expand Up @@ -164,7 +165,6 @@ public void testWithDefaultFile() throws Exception
assertThat(nativeConnection.getHost()).isEqualTo("localhost");
assertThat(nativeConnection.getPort()).isEqualTo(9042);
assertThat(nativeConnection.getRemoteRouting()).isTrue();
assertThat(nativeConnection.getConsistencySerial().equals("DEFAULT")).isTrue();
assertThat(nativeConnection.getTimeout().getConnectionTimeout(TimeUnit.MILLISECONDS)).isEqualTo(0);
assertThat(nativeConnection.getProviderClass()).isEqualTo(DefaultNativeConnectionProvider.class);
assertThat(nativeConnection.getCertificateHandlerClass()).isEqualTo(ReloadingCertificateHandler.class);
Expand Down Expand Up @@ -220,6 +220,7 @@ public void testWithDefaultFile() throws Exception

Config.LockFactoryConfig lockFactoryConfig = config.getLockFactory();
assertThat(lockFactoryConfig.getCas().getKeyspace()).isEqualTo("ecchronos");
assertThat(lockFactoryConfig.getCas().getConsistencySerial().equals(ConsistencyType.DEFAULT)).isTrue();

Config.RunPolicyConfig runPolicyConfig = config.getRunPolicy();
assertThat(runPolicyConfig.getTimeBased().getKeyspace()).isEqualTo("ecchronos");
Expand Down Expand Up @@ -248,7 +249,6 @@ public void testDefault() throws Exception
assertThat(nativeConnection.getHost()).isEqualTo("localhost");
assertThat(nativeConnection.getPort()).isEqualTo(9042);
assertThat(nativeConnection.getRemoteRouting()).isTrue();
assertThat(nativeConnection.getConsistencySerial().equals("DEFAULT")).isTrue();
assertThat(nativeConnection.getTimeout().getConnectionTimeout(TimeUnit.MILLISECONDS)).isEqualTo(0);
assertThat(nativeConnection.getProviderClass()).isEqualTo(DefaultNativeConnectionProvider.class);
assertThat(nativeConnection.getCertificateHandlerClass()).isEqualTo(ReloadingCertificateHandler.class);
Expand Down Expand Up @@ -303,6 +303,7 @@ public void testDefault() throws Exception

Config.LockFactoryConfig lockFactoryConfig = config.getLockFactory();
assertThat(lockFactoryConfig.getCas().getKeyspace()).isEqualTo("ecchronos");
assertThat(lockFactoryConfig.getCas().getConsistencySerial().equals(ConsistencyType.DEFAULT)).isTrue();

Config.RunPolicyConfig runPolicyConfig = config.getRunPolicy();
assertThat(runPolicyConfig.getTimeBased().getKeyspace()).isEqualTo("ecchronos");
Expand Down Expand Up @@ -357,12 +358,6 @@ public boolean getRemoteRouting()
{
throw new UnsupportedOperationException();
}

@Override
public String getSerialConsistency()
{
throw new UnsupportedOperationException();
}
}

public static class TestCertificateHandler implements CertificateHandler
Expand Down
2 changes: 1 addition & 1 deletion application/src/test/resources/all_set.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ connection:
certificateHandler: com.ericsson.bss.cassandra.ecchronos.application.config.TestConfig$TestCertificateHandler
decoratorClass: com.ericsson.bss.cassandra.ecchronos.application.config.TestConfig$TestStatementDecorator
remoteRouting: false
consistencySerial: "LOCAL"
jmx:
host: 127.0.0.3
port: 7100
Expand Down Expand Up @@ -77,6 +76,7 @@ statistics:
lock_factory:
cas:
keyspace: ecc
consistencySerial: "LOCAL"

run_policy:
time_based:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,18 +71,12 @@ public final class LocalNativeConnectionProvider implements NativeConnectionProv
private final CqlSession mySession;
private final Node myLocalNode;
private final boolean myRemoteRouting;
private final String mySerialConsistencyLevel;

private LocalNativeConnectionProvider(
final CqlSession session,
final Node node,
final boolean remoteRouting,
final String serialConsistencyLevel)
private LocalNativeConnectionProvider(final CqlSession session, final Node node, final boolean remoteRouting)
{
mySession = session;
myLocalNode = node;
myRemoteRouting = remoteRouting;
mySerialConsistencyLevel = serialConsistencyLevel;
}

@Override
Expand All @@ -103,12 +97,6 @@ public boolean getRemoteRouting()
return myRemoteRouting;
}

@Override
public String getSerialConsistency()
{
return mySerialConsistencyLevel;
}

@Override
public void close()
{
Expand All @@ -127,7 +115,6 @@ public static class Builder
private String myLocalhost = DEFAULT_LOCAL_HOST;
private int myPort = DEFAULT_NATIVE_PORT;
private boolean myRemoteRouting = true;
private String mySerialConsistency = "DEFAULT";
private boolean myIsMetricsEnabled = true;
private AuthProvider myAuthProvider = null;
private SslEngineFactory mySslEngineFactory = null;
Expand All @@ -152,12 +139,6 @@ public final Builder withRemoteRouting(final boolean remoteRouting)
return this;
}

public final Builder withConsistencySerial(final String serialConsistency)
{
mySerialConsistency = serialConsistency;
return this;
}

public final Builder withAuthProvider(final AuthProvider authProvider)
{
this.myAuthProvider = authProvider;
Expand Down Expand Up @@ -192,7 +173,7 @@ public final LocalNativeConnectionProvider build()
{
CqlSession session = createSession(this);
Node node = resolveLocalhost(session, localEndPoint());
return new LocalNativeConnectionProvider(session, node, myRemoteRouting, mySerialConsistency);
return new LocalNativeConnectionProvider(session, node, myRemoteRouting);
}

private EndPoint localEndPoint()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,11 @@ public final synchronized void activate(final Configuration configuration)
String localhost = configuration.localHost();
int port = configuration.nativePort();
boolean remoteRouting = configuration.remoteRouting();
String serialConsistency = configuration.serialConsistency();

LocalNativeConnectionProvider.Builder builder = LocalNativeConnectionProvider.builder()
.withLocalhost(localhost)
.withPort(port)
.withRemoteRouting(remoteRouting)
.withConsistencySerial(serialConsistency);
.withRemoteRouting(remoteRouting);

if (!configuration.credentialsFile().isEmpty())
{
Expand Down Expand Up @@ -102,12 +100,6 @@ public final boolean getRemoteRouting()
return myDelegateNativeConnectionProvider.getRemoteRouting();
}

@Override
public final String getSerialConsistency()
{
return myDelegateNativeConnectionProvider.getSerialConsistency();
}

@ObjectClassDefinition
public @interface Configuration
{
Expand All @@ -125,8 +117,5 @@ public final String getSerialConsistency()

@AttributeDefinition(name = "Remote routing", description = "Enables remote routing between datacenters")
boolean remoteRouting() default true;

@AttributeDefinition(name = "Serial consistency", description = "Define serial consistency level used")
String serialConsistency() default "DEFAULT";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@ public interface NativeConnectionProvider extends Closeable

boolean getRemoteRouting();

String getSerialConsistency();

@Override
default void close() throws IOException
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.ericsson.bss.cassandra.ecchronos.connection.StatementDecorator;
import com.ericsson.bss.cassandra.ecchronos.core.exceptions.LockException;
import com.ericsson.bss.cassandra.ecchronos.core.scheduling.LockFactory;
import com.ericsson.bss.cassandra.ecchronos.core.utils.ConsistencyType;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
Expand Down Expand Up @@ -80,6 +81,7 @@
* WITH default_time_to_live = 600 AND gc_grace_seconds = 0;
* </pre>
*/
@SuppressWarnings({"PMD.GodClass", "PMD.TooManyFields", "PMD.SingularField", "PMD.ExcessiveMethodLength"})
public final class CASLockFactory implements LockFactory, Closeable
{
private static final Logger LOG = LoggerFactory.getLogger(CASLockFactory.class);
Expand All @@ -104,7 +106,7 @@ public final class CASLockFactory implements LockFactory, Closeable
private final StatementDecorator myStatementDecorator;
private final HostStates myHostStates;
private final boolean myRemoteRouting;
private final String mySerialConsistency;
private final ConsistencyType mySerialConsistency;

private final CqlSession mySession;
private final String myKeyspaceName;
Expand All @@ -130,22 +132,23 @@ private CASLockFactory(final Builder builder)

mySession = builder.myNativeConnectionProvider.getSession();
myRemoteRouting = builder.myNativeConnectionProvider.getRemoteRouting();
mySerialConsistency = builder.myNativeConnectionProvider.getSerialConsistency();
mySerialConsistency = builder.myConsistencyType;

verifySchemasExists();

if ("DEFAULT".equals(mySerialConsistency))
if (ConsistencyType.DEFAULT.equals(mySerialConsistency))
{
serialConsistencyLevel = myRemoteRouting
? ConsistencyLevel.LOCAL_SERIAL
: ConsistencyLevel.SERIAL;
}
else
{
serialConsistencyLevel = "LOCAL".equals(mySerialConsistency)
serialConsistencyLevel = ConsistencyType.LOCAL.equals(mySerialConsistency)
? ConsistencyLevel.LOCAL_SERIAL
: ConsistencyLevel.SERIAL;
}

SimpleStatement insertLockStatement = QueryBuilder.insertInto(myKeyspaceName, TABLE_LOCK)
.value(COLUMN_RESOURCE, bindMarker())
.value(COLUMN_NODE, bindMarker())
Expand Down Expand Up @@ -317,6 +320,7 @@ public static class Builder
private HostStates myHostStates;
private StatementDecorator myStatementDecorator;
private String myKeyspaceName = DEFAULT_KEYSPACE_NAME;
private ConsistencyType myConsistencyType;

public final Builder withNativeConnectionProvider(final NativeConnectionProvider nativeConnectionProvider)
{
Expand All @@ -342,6 +346,12 @@ public final Builder withKeyspaceName(final String keyspaceName)
return this;
}

public final Builder withConsistencySerial(final ConsistencyType consistencyType)
{
myConsistencyType = consistencyType;
return this;
}

public final CASLockFactory build()
{
if (myNativeConnectionProvider == null)
Expand Down
Loading

0 comments on commit 6ddfa2e

Please sign in to comment.