Skip to content

Commit

Permalink
Separe serial consistency from remoteRouting - Issue #633
Browse files Browse the repository at this point in the history
To decouple the consistency definition in lightweight transactions,
introduce a new property configurable by ecc.yml that allows
consistency to be overridden by the values DEFAULT/SERIAL/LOCAL.

Closes #633
  • Loading branch information
VictorCavichioli committed Feb 18, 2024
1 parent 8257966 commit 29702cd
Show file tree
Hide file tree
Showing 14 changed files with 242 additions and 40 deletions.
3 changes: 2 additions & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@

## Version 5.0.1 (Not yet released)

* Separate serial consistency configuration from remoteRouting functionality - Issue #633
* Improve hang preventing task - Issue #544
* Improve Description of unwind_ratio - Issue #628

## Version 5.0.0 (Not yet released)
## Version 5.0.0

* Build Ecchronos with Java 11 - Issue 616
* Bump logback from 1.2.10 to 1.2.13 (CVE-2023-6378) - Issue #622
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public DefaultNativeConnectionProvider(final Config config,
String host = nativeConfig.getHost();
int port = nativeConfig.getPort();
boolean remoteRouting = nativeConfig.getRemoteRouting();
String consistencySerial = nativeConfig.getConsistencySerial();
Security.CqlSecurity cqlSecurity = cqlSecuritySupplier.get();
boolean authEnabled = cqlSecurity.getCqlCredentials().isEnabled();
boolean tlsEnabled = cqlSecurity.getCqlTlsConfig().isEnabled();
Expand All @@ -73,6 +74,7 @@ public DefaultNativeConnectionProvider(final Config config,
.withLocalhost(host)
.withPort(port)
.withRemoteRouting(remoteRouting)
.withConsistencySerial(consistencySerial)
.withAuthProvider(authProvider)
.withSslEngineFactory(sslEngineFactory)
.withMetricsEnabled(config.getStatisticsConfig().isEnabled())
Expand Down Expand Up @@ -147,6 +149,12 @@ public final boolean getRemoteRouting()
return myLocalNativeConnectionProvider.getRemoteRouting();
}

@Override
public final String getSerialConsistency()
{
return myLocalNativeConnectionProvider.getSerialConsistency();
}

@Override
public final void close()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public class NativeConnection extends Connection<NativeConnectionProvider>

private Class<? extends StatementDecorator> myDecoratorClass = NoopStatementDecorator.class;
private boolean myRemoteRouting = true;
private String myConsistencySerial = "DEFAULT";

public NativeConnection()
{
Expand Down Expand Up @@ -74,6 +75,18 @@ public final void setRemoteRouting(final boolean remoteRouting)
myRemoteRouting = remoteRouting;
}

@JsonProperty("consistencySerial")
public final String getConsistencySerial()
{
return myConsistencySerial;
}

@JsonProperty("consistencySerial")
public final void setConsistencySerial(final String consistencySerial)
{
myConsistencySerial = consistencySerial;
}

@Override
protected final Class<?>[] expectedConstructor()
{
Expand Down
9 changes: 8 additions & 1 deletion application/src/main/resources/ecc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,17 @@ connection:
## Allow routing requests directly to a remote datacenter.
## This allows locks for other datacenters to be taken in that datacenter instead of via the local datacenter.
## If clients are prevented from connecting directly to Cassandra nodes in other sites this is not possible.
## If remote routing is disabled its not possible to use LOCAL_SERIAL consistency for the locking,
## If remote routing is disabled and consistency serial uses the DEFAULT mode its not possible to use LOCAL_SERIAL consistency for the locking,
## instead SERIAL consistency will be used for those request.
##
remoteRouting: true
##
# consistencySerial value can be DEFAULT, which means that the serial consistency for lightweight transactions is based
# on the remoteRouting configurations. It is possible to define as LOCAL or SERIAL as well, the first one defines serial consistency
# as LOCAL_SERIAL which requires a quorum of replicas in the local DC, also SERIAL defines a serial consistency of SERIAL
# which requires a quorum of replicas in all DCs.
##
consistencySerial: "DEFAULT"
jmx:
##
## Host and port properties for JMX.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public void testAllValues() throws Exception
assertThat(nativeConnection.getHost()).isEqualTo("127.0.0.2");
assertThat(nativeConnection.getPort()).isEqualTo(9100);
assertThat(nativeConnection.getRemoteRouting()).isFalse();
assertThat(nativeConnection.getConsistencySerial().equals("LOCAL")).isTrue();
assertThat(nativeConnection.getTimeout().getConnectionTimeout(TimeUnit.SECONDS)).isEqualTo(5);
assertThat(nativeConnection.getProviderClass()).isEqualTo(TestNativeConnectionProvider.class);
assertThat(nativeConnection.getCertificateHandlerClass()).isEqualTo(TestCertificateHandler.class);
Expand Down Expand Up @@ -185,6 +186,7 @@ public void testWithDefaultFile() throws Exception
assertThat(nativeConnection.getHost()).isEqualTo("localhost");
assertThat(nativeConnection.getPort()).isEqualTo(9042);
assertThat(nativeConnection.getRemoteRouting()).isTrue();
assertThat(nativeConnection.getConsistencySerial().equals("DEFAULT")).isTrue();
assertThat(nativeConnection.getTimeout().getConnectionTimeout(TimeUnit.MILLISECONDS)).isEqualTo(0);
assertThat(nativeConnection.getProviderClass()).isEqualTo(DefaultNativeConnectionProvider.class);
assertThat(nativeConnection.getCertificateHandlerClass()).isEqualTo(ReloadingCertificateHandler.class);
Expand Down Expand Up @@ -271,6 +273,8 @@ public void testDefault() throws Exception
assertThat(nativeConnection.getHost()).isEqualTo("localhost");
assertThat(nativeConnection.getPort()).isEqualTo(9042);
assertThat(nativeConnection.getRemoteRouting()).isTrue();

assertThat(nativeConnection.getConsistencySerial().equals("DEFAULT")).isTrue();
assertThat(nativeConnection.getTimeout().getConnectionTimeout(TimeUnit.MILLISECONDS)).isEqualTo(0);
assertThat(nativeConnection.getProviderClass()).isEqualTo(DefaultNativeConnectionProvider.class);
assertThat(nativeConnection.getCertificateHandlerClass()).isEqualTo(ReloadingCertificateHandler.class);
Expand Down Expand Up @@ -403,6 +407,12 @@ public boolean getRemoteRouting()
{
throw new UnsupportedOperationException();
}

@Override
public String getSerialConsistency()
{
throw new UnsupportedOperationException();
}
}

public static class TestCertificateHandler implements CertificateHandler
Expand Down
1 change: 1 addition & 0 deletions application/src/test/resources/all_set.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ connection:
certificateHandler: com.ericsson.bss.cassandra.ecchronos.application.config.TestConfig$TestCertificateHandler
decoratorClass: com.ericsson.bss.cassandra.ecchronos.application.config.TestConfig$TestStatementDecorator
remoteRouting: false
consistencySerial: "LOCAL"
jmx:
host: 127.0.0.3
port: 7100
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,19 @@ public final class LocalNativeConnectionProvider implements NativeConnectionProv
private final CqlSession mySession;
private final Node myLocalNode;
private final boolean myRemoteRouting;

private LocalNativeConnectionProvider(final CqlSession session, final Node node, final boolean remoteRouting)
private final String mySerialConsistencyLevel;

private LocalNativeConnectionProvider(
final CqlSession session,
final Node node,
final boolean remoteRouting,
final String serialConsistencyLevel
)
{
mySession = session;
myLocalNode = node;
myRemoteRouting = remoteRouting;
mySerialConsistencyLevel = serialConsistencyLevel;
}

@Override
Expand All @@ -98,6 +105,12 @@ public boolean getRemoteRouting()
return myRemoteRouting;
}

@Override
public String getSerialConsistency()
{
return mySerialConsistencyLevel;
}

@Override
public void close()
{
Expand All @@ -116,6 +129,7 @@ public static class Builder
private String myLocalhost = DEFAULT_LOCAL_HOST;
private int myPort = DEFAULT_NATIVE_PORT;
private boolean myRemoteRouting = true;
private String mySerialConsistency = "DEFAULT";
private boolean myIsMetricsEnabled = true;
private AuthProvider myAuthProvider = null;
private SslEngineFactory mySslEngineFactory = null;
Expand All @@ -141,6 +155,12 @@ public final Builder withRemoteRouting(final boolean remoteRouting)
return this;
}

public final Builder withConsistencySerial(final String serialConsistency)
{
mySerialConsistency = serialConsistency;
return this;
}

public final Builder withAuthProvider(final AuthProvider authProvider)
{
this.myAuthProvider = authProvider;
Expand Down Expand Up @@ -181,7 +201,7 @@ public final LocalNativeConnectionProvider build()
{
CqlSession session = createSession(this);
Node node = resolveLocalhost(session, localEndPoint());
return new LocalNativeConnectionProvider(session, node, myRemoteRouting);
return new LocalNativeConnectionProvider(session, node, myRemoteRouting, mySerialConsistency);
}

private EndPoint localEndPoint()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,13 @@ public final synchronized void activate(final Configuration configuration)
String localhost = configuration.localHost();
int port = configuration.nativePort();
boolean remoteRouting = configuration.remoteRouting();
String serialConsistency = configuration.serialConsistency();

LocalNativeConnectionProvider.Builder builder = LocalNativeConnectionProvider.builder()
.withLocalhost(localhost)
.withPort(port)
.withRemoteRouting(remoteRouting);
.withRemoteRouting(remoteRouting)
.withConsistencySerial(serialConsistency);

if (!configuration.credentialsFile().isEmpty())
{
Expand Down Expand Up @@ -100,6 +102,12 @@ public final boolean getRemoteRouting()
return myDelegateNativeConnectionProvider.getRemoteRouting();
}

@Override
public final String getSerialConsistency()
{
return myDelegateNativeConnectionProvider.getSerialConsistency();
}

@ObjectClassDefinition
public @interface Configuration
{
Expand All @@ -117,5 +125,8 @@ public final boolean getRemoteRouting()

@AttributeDefinition(name = "Remote routing", description = "Enables remote routing between datacenters")
boolean remoteRouting() default true;

@AttributeDefinition(name = "Serial consistency", description = "Define serial consistency level used")
String serialConsistency() default "DEFAULT";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ public interface NativeConnectionProvider extends Closeable

boolean getRemoteRouting();

String getSerialConsistency();

@Override
default void close() throws IOException
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ public final class CASLockFactory implements LockFactory, Closeable
private final StatementDecorator myStatementDecorator;
private final HostStates myHostStates;
private final boolean myRemoteRouting;
private final String mySerialConsistency;

private final CqlSession mySession;
private final String myKeyspaceName;
Expand All @@ -114,6 +115,8 @@ public final class CASLockFactory implements LockFactory, Closeable
private final PreparedStatement myRemoveLockPriorityStatement;
private final CASLockFactoryCacheContext myCasLockFactoryCacheContext;

private final ConsistencyLevel serialConsistencyLevel;

private CASLockFactory(final Builder builder)
{
myStatementDecorator = builder.myStatementDecorator;
Expand All @@ -124,12 +127,23 @@ private CASLockFactory(final Builder builder)

mySession = builder.myNativeConnectionProvider.getSession();
myRemoteRouting = builder.myNativeConnectionProvider.getRemoteRouting();
mySerialConsistency = builder.myNativeConnectionProvider.getSerialConsistency();

verifySchemasExists();

ConsistencyLevel serialConsistencyLevel = myRemoteRouting
if (mySerialConsistency.equals("DEFAULT"))
{
serialConsistencyLevel = myRemoteRouting
? ConsistencyLevel.LOCAL_SERIAL
: ConsistencyLevel.SERIAL;
}
else
{
serialConsistencyLevel = mySerialConsistency.equals("LOCAL")
? ConsistencyLevel.LOCAL_SERIAL
: ConsistencyLevel.SERIAL;
}

SimpleStatement insertLockStatement = QueryBuilder.insertInto(myKeyspaceName, TABLE_LOCK)
.value(COLUMN_RESOURCE, bindMarker())
.value(COLUMN_NODE, bindMarker())
Expand Down Expand Up @@ -320,6 +334,12 @@ UUID getHostId()
return myUuid;
}

@VisibleForTesting
ConsistencyLevel getSerialConsistencyLevel()
{
return serialConsistencyLevel;
}

@VisibleForTesting
CASLockFactoryCacheContext getCasLockFactoryCacheContext()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ public boolean getRemoteRouting()
{
return true;
}

@Override
public String getSerialConsistency(){
return "DEFAULT";
}
};
}

Expand Down
Loading

0 comments on commit 29702cd

Please sign in to comment.