Skip to content

Commit

Permalink
Ericsson#699 Reload nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
Paul Chandler committed Oct 14, 2024
1 parent de46adc commit 4aaadf0
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,60 +54,64 @@ List<NodeChangeRecord> compareNodeLists(final List<Node> oldNodes, final List<No
{
if (newNode == null)
{
LOG.info("Node has been removed, Node id: {} ", oldNode.getHostId());
changesList.add(new NodeChangeRecord(oldNode, NodeChangeRecord.NodeChangeType.DELETE));
oldNode = getNode(oldIterator);
oldNode = processNodeRemoved(oldNode, changesList, oldIterator);
}
else
{
if (oldNode.getHostId().equals(newNode.getHostId()))
{
// same host id, now check the ipaddress is still the same
if (!oldNode.getListenAddress().equals(newNode.getListenAddress()))
{
LOG.info("Node id {}, has a different ipaddress, it was {}, it is now {} ", oldNode.getHostId(), oldNode.getListenAddress(), newNode.getListenAddress());
changesList.add(new NodeChangeRecord(oldNode, NodeChangeRecord.NodeChangeType.UPDATE));
}
checkIPAddress(oldNode, newNode, changesList);
oldNode = getNode(oldIterator);
newNode = getNode(newIterator);
}
else
{
if (oldNode.getHostId().compareTo(newNode.getHostId()) == 1)
if (oldNode.getHostId().compareTo(newNode.getHostId()) > 0)
{
LOG.info("Node has been added, Node id: {}", newNode.getHostId());
changesList.add(new NodeChangeRecord(newNode, NodeChangeRecord.NodeChangeType.INSERT));
newNode = getNode(newIterator);
newNode = processNodeAdded(newNode, changesList, newIterator);
}
else
{
LOG.info("Node has been removed, Node id: {}", oldNode.getHostId());
changesList.add(new NodeChangeRecord(oldNode, NodeChangeRecord.NodeChangeType.DELETE));
oldNode = getNode(oldIterator);
oldNode = processNodeRemoved(oldNode, changesList, oldIterator);
}
}
}
}
while (newNode != null)
{
changesList.add(new NodeChangeRecord(newNode, NodeChangeRecord.NodeChangeType.INSERT));
LOG.info("Node has been added, Node id: {}", newNode.getHostId());
newNode = getNode(newIterator);
newNode = processNodeAdded(newNode, changesList, newIterator);
}
return changesList;
}

private Node processNodeRemoved(Node oldNode, List<NodeChangeRecord> changesList, Iterator<Node> oldIterator) {
LOG.info("Node has been removed, Node id: {}", oldNode.getHostId());
changesList.add(new NodeChangeRecord(oldNode, NodeChangeRecord.NodeChangeType.DELETE));
return getNode(oldIterator);
}

private Node processNodeAdded(Node newNode, List<NodeChangeRecord> changesList, Iterator<Node> newIterator) {
LOG.info("Node has been added, Node id: {}", newNode.getHostId());
changesList.add(new NodeChangeRecord(newNode, NodeChangeRecord.NodeChangeType.INSERT));
return getNode(newIterator);
}

private static void checkIPAddress(Node oldNode, Node newNode, List<NodeChangeRecord> changesList) {
// same host id, now check the ipaddress is still the same
if (!oldNode.getListenAddress().equals(newNode.getListenAddress()))
{
LOG.info("Node id {}, has a different ipaddress, it was {}, it is now {} ", oldNode.getHostId(), oldNode.getListenAddress(), newNode.getListenAddress());
changesList.add(new NodeChangeRecord(oldNode, NodeChangeRecord.NodeChangeType.UPDATE));
}
}

private Node getNode(final Iterator<Node> iterator)
{
Node node;
Node node = null;
if (iterator.hasNext())
{
node = iterator.next();
}
else
{
node = null;
}
return node;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ void reloadNodes()
{
List<Node> oldNodes = myDistributedNativeConnectionProvider.getNodes();
List<Node> newNodes = myDistributedNativeConnectionProvider.reloadNodes();
CqlSession cqlSession = myDistributedNativeConnectionProvider.getCqlSession();
List<NodeChangeRecord> nodeChangeList = nodeListComparator.compareNodeLists(oldNodes, newNodes);
if (!nodeChangeList.isEmpty())
{
Expand All @@ -91,32 +90,40 @@ void reloadNodes()
NodeChangeRecord nodeChangeRecord = iterator.next();
if (nodeChangeRecord.getType() == NodeChangeRecord.NodeChangeType.INSERT)
{
myEccNodesSync.verifyAcquireNode(nodeChangeRecord.getNode());
try
{
myJmxConnectionProvider.add(nodeChangeRecord.getNode());
}
catch (IOException e)
{
LOG.info("Node {} JMX connection failed", nodeChangeRecord.getNode().getHostId());
}
processInsertRecord(nodeChangeRecord);
}
if (nodeChangeRecord.getType() == NodeChangeRecord.NodeChangeType.DELETE)
{
myEccNodesSync.deleteNodeStatus(nodeChangeRecord.getNode().getDatacenter(), nodeChangeRecord.getNode().getHostId());
try
{
myJmxConnectionProvider.close(nodeChangeRecord.getNode().getHostId());
}
catch (IOException e)
{
LOG.info("Node {} JMX connection removal failed", nodeChangeRecord.getNode().getHostId());
}
processDeleteRecord(nodeChangeRecord);
}
}
}
}

private void processDeleteRecord(NodeChangeRecord nodeChangeRecord) {
myEccNodesSync.deleteNodeStatus(nodeChangeRecord.getNode().getDatacenter(), nodeChangeRecord.getNode().getHostId());
try
{
myJmxConnectionProvider.close(nodeChangeRecord.getNode().getHostId());
}
catch (IOException e)
{
LOG.info("Node {} JMX connection removal failed", nodeChangeRecord.getNode().getHostId());
}
}

private void processInsertRecord(NodeChangeRecord nodeChangeRecord) {
myEccNodesSync.verifyAcquireNode(nodeChangeRecord.getNode());
try
{
myJmxConnectionProvider.add(nodeChangeRecord.getNode());
}
catch (IOException e)
{
LOG.info("Node {} JMX connection failed", nodeChangeRecord.getNode().getHostId());
}
}

/***
* Shutsdown the scheduler.
*/
Expand Down

0 comments on commit 4aaadf0

Please sign in to comment.