Skip to content

Commit

Permalink
Ericsson#699 Reload nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
Paul Chandler committed Oct 21, 2024
1 parent e32e352 commit 41b3823
Show file tree
Hide file tree
Showing 16 changed files with 292 additions and 844 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.auth.AuthProvider;
import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.api.core.metadata.NodeStateListener;
import com.datastax.oss.driver.api.core.ssl.SslEngineFactory;
import com.ericsson.bss.cassandra.ecchronos.application.config.Config;
import com.ericsson.bss.cassandra.ecchronos.application.config.connection.AgentConnectionConfig;
import com.ericsson.bss.cassandra.ecchronos.application.config.security.ReloadingAuthProvider;
import com.ericsson.bss.cassandra.ecchronos.application.config.security.Security;
import com.ericsson.bss.cassandra.ecchronos.application.spring.EccNodeStateListener;
import com.ericsson.bss.cassandra.ecchronos.connection.CertificateHandler;
import com.ericsson.bss.cassandra.ecchronos.connection.DistributedNativeConnectionProvider;
import com.ericsson.bss.cassandra.ecchronos.connection.impl.builders.DistributedNativeBuilder;
Expand All @@ -49,6 +51,8 @@ public class AgentNativeConnectionProvider implements DistributedNativeConnectio

private final DistributedNativeConnectionProviderImpl myDistributedNativeConnectionProviderImpl;

private final EccNodeStateListener myNodeStateListener;

/**
* Constructs an {@code AgentNativeConnectionProvider} with the specified configuration, security supplier, and
* certificate handler.
Expand All @@ -64,6 +68,7 @@ public AgentNativeConnectionProvider(
final Config config,
final Supplier<Security.CqlSecurity> cqlSecuritySupplier,
final CertificateHandler certificateHandler

)
{
AgentConnectionConfig agentConnectionConfig = config.getConnectionConfig().getCqlConnection()
Expand All @@ -83,12 +88,14 @@ public AgentNativeConnectionProvider(
sslEngineFactory = certificateHandler;
}

myNodeStateListener = new EccNodeStateListener();
DistributedNativeBuilder nativeConnectionBuilder =
DistributedNativeConnectionProviderImpl.builder()
.withInitialContactPoints(resolveInitialContactPoints(agentConnectionConfig.getContactPoints()))
.withAgentType(agentConnectionConfig.getType().toString())
.withLocalDatacenter(agentConnectionConfig.getLocalDatacenter())
.withAuthProvider(authProvider)
.withNodeStateListener(myNodeStateListener)
.withSslEngineFactory(sslEngineFactory);
LOG.info("Preparing Agent Connection Config");
nativeConnectionBuilder = resolveAgentProviderBuilder(nativeConnectionBuilder, agentConnectionConfig);
Expand Down Expand Up @@ -300,4 +307,19 @@ public void close() throws IOException
{
myDistributedNativeConnectionProviderImpl.close();
}

@Override
public void addNode(Node myNode) {
myDistributedNativeConnectionProviderImpl.addNode(myNode);
}

@Override
public void removeNode(Node myNode) {
myDistributedNativeConnectionProviderImpl.removeNode(myNode);
}

public EccNodeStateListener getNodeStateListener()
{
return myNodeStateListener;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
*/
package com.ericsson.bss.cassandra.ecchronos.application.spring;

import com.datastax.oss.driver.api.core.metadata.NodeStateListener;
import com.ericsson.bss.cassandra.ecchronos.application.config.Interval;
import com.ericsson.bss.cassandra.ecchronos.application.config.security.CqlTLSConfig;
import com.ericsson.bss.cassandra.ecchronos.application.config.security.ReloadingCertificateHandler;
Expand Down Expand Up @@ -237,7 +238,13 @@ private DistributedNativeConnectionProvider getDistributedNativeConnection(
{
Supplier<CqlTLSConfig> tlsSupplier = () -> securitySupplier.get().getCqlTlsConfig();
CertificateHandler certificateHandler = createCertificateHandler(tlsSupplier);
return new AgentNativeConnectionProvider(config, securitySupplier, certificateHandler);

AgentNativeConnectionProvider distributedNativeConnectionProvider = new AgentNativeConnectionProvider(config, securitySupplier, certificateHandler);


distributedNativeConnectionProvider.getNodeStateListener().setAgentNativeConnectionProvider(distributedNativeConnectionProvider);

return distributedNativeConnectionProvider;
}

private DistributedJmxConnectionProvider getDistributedJmxConnection(
Expand All @@ -246,8 +253,14 @@ private DistributedJmxConnectionProvider getDistributedJmxConnection(
final EccNodesSync eccNodesSync
) throws IOException
{
return new AgentJmxConnectionProvider(
AgentJmxConnectionProvider agentJmxConnectionProvider = new AgentJmxConnectionProvider(
securitySupplier, distributedNativeConnectionProvider, eccNodesSync);
EccNodeStateListener nodeStateListener = ((AgentNativeConnectionProvider)distributedNativeConnectionProvider).getNodeStateListener();

nodeStateListener.setJmxConnectionProvider(agentJmxConnectionProvider);
nodeStateListener.setEccNodesSync(eccNodesSync);

return agentJmxConnectionProvider;
}

private void refreshSecurityConfig(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Copyright 2024 Telefonaktiebolaget LM Ericsson
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ericsson.bss.cassandra.ecchronos.application.spring;

import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.api.core.metadata.NodeStateListener;
import com.ericsson.bss.cassandra.ecchronos.application.providers.AgentNativeConnectionProvider;
import com.ericsson.bss.cassandra.ecchronos.connection.DistributedJmxConnectionProvider;
import com.ericsson.bss.cassandra.ecchronos.connection.DistributedNativeConnectionProvider;
import com.ericsson.bss.cassandra.ecchronos.data.sync.EccNodesSync;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class EccNodeStateListener implements NodeStateListener
{
private EccNodesSync myEccNodesSync;
private DistributedJmxConnectionProvider myJmxConnectionProvider;
ExecutorService service ;



AgentNativeConnectionProvider myAgentNativeConnectionProvider = null;
public EccNodeStateListener ()
{
service = Executors.newFixedThreadPool(4);
}
private static final Logger LOG = LoggerFactory.getLogger(EccNodeStateListener.class);

@Override
public void onAdd(Node node)
{

LOG.info("Node added {}", node.getHostId());
NodeAddedAction callable = new NodeAddedAction(myEccNodesSync, myJmxConnectionProvider, myAgentNativeConnectionProvider, node );
service.submit(callable);

}

@Override
public void onUp(Node node)
{
LOG.info("Node up {}", node.getHostId());
}

@Override
public void onDown(Node node)
{
LOG.info("Node Down {}", node.getHostId());
}

@Override
public void onRemove(Node node)
{
LOG.info("Node removed {}", node.getHostId());
NodeRemovedAction callable = new NodeRemovedAction(myEccNodesSync, myJmxConnectionProvider, myAgentNativeConnectionProvider, node );
service.submit(callable);
}

@Override
public void close() throws Exception
{
}

public void setJmxConnectionProvider(DistributedJmxConnectionProvider myJmxConnectionProvider)
{
this.myJmxConnectionProvider = myJmxConnectionProvider;
}

public void setAgentNativeConnectionProvider(AgentNativeConnectionProvider myAgentNativeConnectionProvider)
{
this.myAgentNativeConnectionProvider = myAgentNativeConnectionProvider;
}

public void setEccNodesSync(EccNodesSync eccNodesSync) {
this.myEccNodesSync = eccNodesSync;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright 2024 Telefonaktiebolaget LM Ericsson
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ericsson.bss.cassandra.ecchronos.application.spring;

import com.ericsson.bss.cassandra.ecchronos.connection.DistributedJmxConnectionProvider;
import com.ericsson.bss.cassandra.ecchronos.connection.DistributedNativeConnectionProvider;
import com.ericsson.bss.cassandra.ecchronos.data.sync.EccNodesSync;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import com.datastax.oss.driver.api.core.metadata.Node;


public class NodeAddedAction implements Callable<Boolean> {


private static final Logger LOG = LoggerFactory.getLogger(com.ericsson.bss.cassandra.ecchronos.application.spring.NodeAddedAction.class);

private final EccNodesSync myEccNodesSync;
private final DistributedJmxConnectionProvider myJmxConnectionProvider;

private final DistributedNativeConnectionProvider myDistributedNativeConnectionProvider;

private final Node myNode;


public NodeAddedAction(EccNodesSync eccNodesSync, DistributedJmxConnectionProvider jmxConnectionProvider, DistributedNativeConnectionProvider distributedNativeConnectionProvider, Node node)
{
myEccNodesSync = eccNodesSync;
myJmxConnectionProvider = jmxConnectionProvider;
myDistributedNativeConnectionProvider = distributedNativeConnectionProvider;
myNode = node;
}


@Override
public Boolean call()
{
Boolean result = true;

LOG.info("Node Up {}", myNode.getHostId());
myEccNodesSync.verifyAcquireNode(myNode);
try
{
myJmxConnectionProvider.add(myNode);
}
catch (IOException e)
{
LOG.warn("Node {} JMX connection failed", myNode.getHostId());
result = false;
}
myDistributedNativeConnectionProvider.addNode(myNode);

return result;
}
}

This file was deleted.

This file was deleted.

Loading

0 comments on commit 41b3823

Please sign in to comment.