Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Serial Consistency Separation for ecChronos 4.0 - Issue #633 #635

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## Version 4.0.6 (Not yet released)

* Separate serial consistency configuration from remoteRouting functionality - Issue #633

### Merged from 1.2

* Fix calculation of tokens per repair - Issue #570
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public DefaultNativeConnectionProvider(final Config config,
String host = nativeConfig.getHost();
int port = nativeConfig.getPort();
boolean remoteRouting = nativeConfig.getRemoteRouting();
String consistencySerial = nativeConfig.getConsistencySerial();
Security.CqlSecurity cqlSecurity = cqlSecuritySupplier.get();
boolean authEnabled = cqlSecurity.getCredentials().isEnabled();
boolean tlsEnabled = cqlSecurity.getTls().isEnabled();
Expand All @@ -72,6 +73,7 @@ public DefaultNativeConnectionProvider(final Config config,
.withLocalhost(host)
.withPort(port)
.withRemoteRouting(remoteRouting)
.withConsistencySerial(consistencySerial)
.withAuthProvider(authProvider)
.withSslEngineFactory(sslEngineFactory)
.withMetricsEnabled(config.getStatistics().isEnabled())
Expand Down Expand Up @@ -144,6 +146,12 @@ public final boolean getRemoteRouting()
return myLocalNativeConnectionProvider.getRemoteRouting();
}

@Override
public final String getSerialConsistency()
{
return myLocalNativeConnectionProvider.getSerialConsistency();
}

@Override
public final void close()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.ericsson.bss.cassandra.ecchronos.core.repair.DefaultRepairConfigurationProvider;
import com.ericsson.bss.cassandra.ecchronos.fm.RepairFaultReporter;
import com.ericsson.bss.cassandra.ecchronos.fm.impl.LoggingFaultReporter;

import io.micrometer.core.instrument.MeterRegistry;
import org.springframework.context.ApplicationContext;

Expand Down Expand Up @@ -324,6 +325,7 @@ public static class NativeConnection extends Connection<NativeConnectionProvider

private Class<? extends StatementDecorator> decoratorClass = NoopStatementDecorator.class;
private boolean remoteRouting = true;
private String consistencySerial = "DEFAULT";

public NativeConnection()
{
Expand Down Expand Up @@ -362,6 +364,16 @@ public final void setRemoteRouting(final boolean aRemoteRouting)
this.remoteRouting = aRemoteRouting;
}

public final String getConsistencySerial()
{
return consistencySerial;
}

public final void setConsistencySerial(final String aConsistencySerial)
{
this.consistencySerial = aConsistencySerial;
}

@Override
protected final Class<?>[] expectedConstructor()
{
Expand Down
10 changes: 10 additions & 0 deletions application/src/main/resources/ecc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,16 @@ connection:
## instead SERIAL consistency will be used for those request.
##
remoteRouting: true
# # Allow to override consistency level for LWT (lightweight transactions). Possible values are:
# # "DEFAULT" - Use consistency level based on remoteRouting.
# # "SERIAL" - Use SERIAL consistency for LWT regardless of remoteRouting.
# # "LOCAL_SERIAL" - Use LOCAL_SERIAL consistency for LWT regardless of remoteRouting.
##
# # if you use remoteRouting: false and LOCAL_SERIAL then all locks will be taken locally
# # in DC. I.e There's a risk that multiple nodes in different datacenters will be able to lock the
# # same nodes causing multiple repairs on the same range/node at the same time.
##
consistencySerial: "DEFAULT"
jmx:
##
## Host and port properties for JMX.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ public void testAllValues() throws Exception
assertThat(nativeConnection.getHost()).isEqualTo("127.0.0.2");
assertThat(nativeConnection.getPort()).isEqualTo(9100);
assertThat(nativeConnection.getRemoteRouting()).isFalse();
assertThat(nativeConnection.getConsistencySerial().equals("LOCAL")).isTrue();
assertThat(nativeConnection.getTimeout().getConnectionTimeout(TimeUnit.SECONDS)).isEqualTo(5);
assertThat(nativeConnection.getProviderClass()).isEqualTo(TestNativeConnectionProvider.class);
assertThat(nativeConnection.getCertificateHandlerClass()).isEqualTo(TestCertificateHandler.class);
Expand Down Expand Up @@ -163,6 +164,7 @@ public void testWithDefaultFile() throws Exception
assertThat(nativeConnection.getHost()).isEqualTo("localhost");
assertThat(nativeConnection.getPort()).isEqualTo(9042);
assertThat(nativeConnection.getRemoteRouting()).isTrue();
assertThat(nativeConnection.getConsistencySerial().equals("DEFAULT")).isTrue();
assertThat(nativeConnection.getTimeout().getConnectionTimeout(TimeUnit.MILLISECONDS)).isEqualTo(0);
assertThat(nativeConnection.getProviderClass()).isEqualTo(DefaultNativeConnectionProvider.class);
assertThat(nativeConnection.getCertificateHandlerClass()).isEqualTo(ReloadingCertificateHandler.class);
Expand Down Expand Up @@ -246,6 +248,7 @@ public void testDefault() throws Exception
assertThat(nativeConnection.getHost()).isEqualTo("localhost");
assertThat(nativeConnection.getPort()).isEqualTo(9042);
assertThat(nativeConnection.getRemoteRouting()).isTrue();
assertThat(nativeConnection.getConsistencySerial().equals("DEFAULT")).isTrue();
assertThat(nativeConnection.getTimeout().getConnectionTimeout(TimeUnit.MILLISECONDS)).isEqualTo(0);
assertThat(nativeConnection.getProviderClass()).isEqualTo(DefaultNativeConnectionProvider.class);
assertThat(nativeConnection.getCertificateHandlerClass()).isEqualTo(ReloadingCertificateHandler.class);
Expand Down Expand Up @@ -354,6 +357,12 @@ public boolean getRemoteRouting()
{
throw new UnsupportedOperationException();
}

@Override
public String getSerialConsistency()
{
throw new UnsupportedOperationException();
}
}

public static class TestCertificateHandler implements CertificateHandler
Expand Down
1 change: 1 addition & 0 deletions application/src/test/resources/all_set.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ connection:
certificateHandler: com.ericsson.bss.cassandra.ecchronos.application.config.TestConfig$TestCertificateHandler
decoratorClass: com.ericsson.bss.cassandra.ecchronos.application.config.TestConfig$TestStatementDecorator
remoteRouting: false
consistencySerial: "LOCAL"
jmx:
host: 127.0.0.3
port: 7100
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,18 @@ public final class LocalNativeConnectionProvider implements NativeConnectionProv
private final CqlSession mySession;
private final Node myLocalNode;
private final boolean myRemoteRouting;
private final String mySerialConsistencyLevel;

private LocalNativeConnectionProvider(final CqlSession session, final Node node, final boolean remoteRouting)
private LocalNativeConnectionProvider(
final CqlSession session,
final Node node,
final boolean remoteRouting,
final String serialConsistencyLevel)
{
mySession = session;
myLocalNode = node;
myRemoteRouting = remoteRouting;
mySerialConsistencyLevel = serialConsistencyLevel;
}

@Override
Expand All @@ -97,6 +103,12 @@ public boolean getRemoteRouting()
return myRemoteRouting;
}

@Override
public String getSerialConsistency()
{
return mySerialConsistencyLevel;
}

@Override
public void close()
{
Expand All @@ -115,6 +127,7 @@ public static class Builder
private String myLocalhost = DEFAULT_LOCAL_HOST;
private int myPort = DEFAULT_NATIVE_PORT;
private boolean myRemoteRouting = true;
private String mySerialConsistency = "DEFAULT";
private boolean myIsMetricsEnabled = true;
private AuthProvider myAuthProvider = null;
private SslEngineFactory mySslEngineFactory = null;
Expand All @@ -139,6 +152,12 @@ public final Builder withRemoteRouting(final boolean remoteRouting)
return this;
}

public final Builder withConsistencySerial(final String serialConsistency)
{
mySerialConsistency = serialConsistency;
return this;
}

public final Builder withAuthProvider(final AuthProvider authProvider)
{
this.myAuthProvider = authProvider;
Expand Down Expand Up @@ -173,7 +192,7 @@ public final LocalNativeConnectionProvider build()
{
CqlSession session = createSession(this);
Node node = resolveLocalhost(session, localEndPoint());
return new LocalNativeConnectionProvider(session, node, myRemoteRouting);
return new LocalNativeConnectionProvider(session, node, myRemoteRouting, mySerialConsistency);
}

private EndPoint localEndPoint()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,13 @@ public final synchronized void activate(final Configuration configuration)
String localhost = configuration.localHost();
int port = configuration.nativePort();
boolean remoteRouting = configuration.remoteRouting();
String serialConsistency = configuration.serialConsistency();

LocalNativeConnectionProvider.Builder builder = LocalNativeConnectionProvider.builder()
.withLocalhost(localhost)
.withPort(port)
.withRemoteRouting(remoteRouting);
.withRemoteRouting(remoteRouting)
.withConsistencySerial(serialConsistency);

if (!configuration.credentialsFile().isEmpty())
{
Expand Down Expand Up @@ -100,6 +102,12 @@ public final boolean getRemoteRouting()
return myDelegateNativeConnectionProvider.getRemoteRouting();
}

@Override
public final String getSerialConsistency()
{
return myDelegateNativeConnectionProvider.getSerialConsistency();
}

@ObjectClassDefinition
public @interface Configuration
{
Expand All @@ -117,5 +125,8 @@ public final boolean getRemoteRouting()

@AttributeDefinition(name = "Remote routing", description = "Enables remote routing between datacenters")
boolean remoteRouting() default true;

@AttributeDefinition(name = "Serial consistency", description = "Define serial consistency level used")
String serialConsistency() default "DEFAULT";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ public interface NativeConnectionProvider extends Closeable

boolean getRemoteRouting();

String getSerialConsistency();

@Override
default void close() throws IOException
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ public final class CASLockFactory implements LockFactory, Closeable
private final StatementDecorator myStatementDecorator;
private final HostStates myHostStates;
private final boolean myRemoteRouting;
private final String mySerialConsistency;

private final CqlSession mySession;
private final String myKeyspaceName;
Expand All @@ -116,6 +117,8 @@ public final class CASLockFactory implements LockFactory, Closeable
private final PreparedStatement myRemoveLockPriorityStatement;
private final LockCache myLockCache;

private final ConsistencyLevel serialConsistencyLevel;

private CASLockFactory(final Builder builder)
{
myStatementDecorator = builder.myStatementDecorator;
Expand All @@ -127,12 +130,22 @@ private CASLockFactory(final Builder builder)

mySession = builder.myNativeConnectionProvider.getSession();
myRemoteRouting = builder.myNativeConnectionProvider.getRemoteRouting();
mySerialConsistency = builder.myNativeConnectionProvider.getSerialConsistency();

verifySchemasExists();

ConsistencyLevel serialConsistencyLevel = myRemoteRouting
if ("DEFAULT".equals(mySerialConsistency))
VictorCavichioli marked this conversation as resolved.
Show resolved Hide resolved
{
serialConsistencyLevel = myRemoteRouting
? ConsistencyLevel.LOCAL_SERIAL
: ConsistencyLevel.SERIAL;
}
else
{
serialConsistencyLevel = "LOCAL".equals(mySerialConsistency)
VictorCavichioli marked this conversation as resolved.
Show resolved Hide resolved
? ConsistencyLevel.LOCAL_SERIAL
: ConsistencyLevel.SERIAL;
}
SimpleStatement insertLockStatement = QueryBuilder.insertInto(myKeyspaceName, TABLE_LOCK)
.value(COLUMN_RESOURCE, bindMarker())
.value(COLUMN_NODE, bindMarker())
Expand Down Expand Up @@ -285,6 +298,12 @@ UUID getHostId()
return myUuid;
}

@VisibleForTesting
ConsistencyLevel getSerialConsistencyLevel()
{
return serialConsistencyLevel;
}

public static Builder builder()
{
return new Builder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ public boolean getRemoteRouting()
{
return true;
}

@Override
public String getSerialConsistency(){
return "DEFAULT";
VictorCavichioli marked this conversation as resolved.
Show resolved Hide resolved
}
};
}

Expand Down
Loading
Loading