Skip to content

Commit

Permalink
Add unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
ilia1243 committed Jan 24, 2024
1 parent a990934 commit d0a8a55
Show file tree
Hide file tree
Showing 14 changed files with 1,763 additions and 379 deletions.
9 changes: 6 additions & 3 deletions kubemarine/core/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,9 @@ def create_connection_pool(self, hosts: List[str]) -> ConnectionPool:
def _create_connection_pool(self, nodes: Dict[str, dict], gateway_nodes: Dict[str, dict], hosts: List[str]) -> ConnectionPool:
return ConnectionPool(nodes, gateway_nodes, hosts)

def _create_cluster_storage(self) -> utils.ClusterStorage:
return utils.ClusterStorage(self)

@property
def log(self) -> log.EnhancedLogger:
return self._logger
Expand Down Expand Up @@ -783,10 +786,10 @@ def make_finalized_inventory(self, finalization_functions: List[Callable[['Kuber

def preserve_inventory(self) -> None:
self.log.debug("Start preserving of the information about the procedure.")
cluster_storage = utils.ClusterStorage(self)
cluster_storage = self._create_cluster_storage()
cluster_storage.make_dir()
if self.context.get('initial_procedure') == 'add_node':
cluster_storage.upload_info_new_control_planes()
cluster_storage.collect_procedure_info()
cluster_storage.compress_and_upload_archive()
cluster_storage.rotation_file()
cluster_storage.compress_archive()
cluster_storage.upload_and_rotate()
5 changes: 1 addition & 4 deletions kubemarine/core/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,9 @@ def run_flow(self, context: Union[dict, res.DynamicResources], print_summary: bo
resources = res.RESOURCES_FACTORY(context)

context = resources.context
args: dict = context['execution_arguments']

try:
if not args['disable_dump']:
utils.prepare_dump_directory(args['dump_location'],
reset_directory=not args['disable_dump_cleanup'])
utils.prepare_dump_directory(context)
resources.logger()
self._run(resources)
except Exception as exc:
Expand Down
45 changes: 28 additions & 17 deletions kubemarine/core/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,13 @@ def get_elapsed_string(start: float, end: float) -> str:
return '{:02}h {:02}m {:02}s'.format(int(hours), int(minutes), int(seconds))


def prepare_dump_directory(location: str, reset_directory: bool = True) -> None:
def prepare_dump_directory(context: dict) -> None:
args: dict = context['execution_arguments']
if args['disable_dump']:
return

location = args['dump_location']
reset_directory = not args['disable_dump_cleanup']
dumpdir = os.path.join(location, 'dump')
if reset_directory and os.path.exists(dumpdir) and os.path.isdir(dumpdir):
shutil.rmtree(dumpdir)
Expand Down Expand Up @@ -595,30 +601,40 @@ def __init__(self, cluster: object):
from kubemarine.core.cluster import KubernetesCluster
self.cluster = cast(KubernetesCluster, cluster)
self.dir_path = "/etc/kubemarine/procedures/"
self.dir_name = ''
self.dir_location = ''
self.local_archive_path = ''

def make_dir(self) -> None:
"""
This method creates a directory in which logs about operations on the cluster will be stored.
"""
readable_timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
initial_procedure = self.cluster.context["initial_procedure"]
self.dir_name = readable_timestamp + "_" + initial_procedure + "/"
self.dir_location = self.dir_path + self.dir_name
dir_name = readable_timestamp + "_" + initial_procedure + "/"
self.dir_location = self.dir_path + dir_name
self.cluster.nodes['control-plane'].sudo(f"mkdir -p {self.dir_location} ; sudo rm {self.dir_path + 'latest_dump'} ;"
f" sudo ln -s {self.dir_location} {self.dir_path + 'latest_dump'}")

def rotation_file(self) -> None:
def upload_and_rotate(self) -> None:
"""
This method packs files with logs and maintains a structured storage of logs on the cluster.
This method uploads and unpacks the archive,
then packs files with logs and maintains a structured storage of logs on the cluster.
"""
control_planes = self.cluster.nodes["control-plane"]

self.cluster.log.debug('Uploading archive with preserved information about the procedure.')
remote_archive = self.dir_location + "local.tar.gz"
control_planes.put(self.local_archive_path, remote_archive, sudo=True)
control_planes.sudo(
f'tar -C {self.dir_location} -xzv --no-same-owner -f {remote_archive} && '
f'sudo rm -f {remote_archive} ')

not_pack_file = self.cluster.inventory['procedure_history']['archive_threshold']
delete_old = self.cluster.inventory['procedure_history']['delete_threshold']

command = f'ls {self.dir_path} | grep -v latest_dump'
node_group_results = self.cluster.nodes["control-plane"].sudo(command)
with node_group_results.get_group().new_executor() as exe:
node_group_results = control_planes.sudo(command)
with control_planes.new_executor() as exe:
for control_plane in exe.group.get_ordered_members_list():
result = node_group_results[control_plane.get_host()]
files = result.stdout.split()
Expand All @@ -632,25 +648,20 @@ def rotation_file(self) -> None:
elif i >= delete_old:
control_plane.sudo(f'rm -rf {self.dir_path + file}')

def compress_and_upload_archive(self) -> None:
def compress_archive(self) -> None:
"""
This method compose dump files and sends the collected files to the nodes.
This method compose dump files in the local archive.
"""
context = self.cluster.context
archive = get_dump_filepath(context, "local.tar.gz")
with tarfile.open(archive, "w:gz") as tar:
self.local_archive_path = get_dump_filepath(context, "local.tar.gz")
with tarfile.open(self.local_archive_path, "w:gz") as tar:
for name in ClusterStorage.PRESERVED_DUMP_FILES:
source = get_dump_filepath(context, name)
if os.path.exists(source):
tar.add(source, 'dump/' + name)
tar.add(context['execution_arguments']['config'], 'cluster.yaml')
tar.add(get_version_filepath(), 'version')

self.cluster.log.debug('Uploading archive with preserved information about the procedure.')
self.cluster.nodes['control-plane'].put(archive, self.dir_location + 'local.tar.gz', sudo=True)
self.cluster.nodes['control-plane'].sudo(f'tar -C {self.dir_location} -xzv --no-same-owner -f {self.dir_location + "local.tar.gz"} && '
f'sudo rm -f {self.dir_location + "local.tar.gz"} ')

def collect_procedure_info(self) -> None:
"""
This method collects information about the type of procedure and the version of the tool we are working with.
Expand Down
48 changes: 42 additions & 6 deletions kubemarine/demo.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,17 @@
import time
from abc import ABC
from copy import deepcopy
from typing import List, Dict, Union, Any, Optional, Mapping, Iterable, IO, Tuple, cast
from typing import List, Dict, Union, Any, Optional, Mapping, Iterable, IO, Tuple, cast, Callable

import fabric # type: ignore[import-untyped]
import invoke
import yaml

from kubemarine import system, procedures
from kubemarine.core.cluster import KubernetesCluster, _AnyConnectionTypes, EnrichmentStage
from kubemarine.core import connections, static, errors
from kubemarine.core.cluster import (
KubernetesCluster, _AnyConnectionTypes, EnrichmentStage, EnrichmentFunction, enrichment
)
from kubemarine.core import connections, static, errors, utils
from kubemarine.core.connections import ConnectionPool
from kubemarine.core.executor import RunnersResult, GenericResult, Token, CommandTimedOut
from kubemarine.core.group import (
Expand Down Expand Up @@ -170,18 +172,37 @@ def read_all(self, hosts: List[str], filename: str) -> Dict[str, Optional[str]]:
return result


class FakeClusterStorage(utils.ClusterStorage):
def __init__(self, cluster: object):
super().__init__(cluster)
self.uploaded_archives: List[str] = []

def make_dir(self) -> None:
pass

def upload_and_rotate(self) -> None:
self.uploaded_archives.append(self.local_archive_path)

def upload_info_new_control_planes(self) -> None:
pass


class FakeKubernetesCluster(KubernetesCluster):

def __init__(self, *args: Any, **kwargs: Any):
self.resources: FakeResources = kwargs.pop("resources")
self.fake_shell = self.resources.fake_shell
self.fake_fs = self.resources.fake_fs
self.fake_nodes_context = self.resources.fake_nodes_context
self.cluster_storage = FakeClusterStorage(self)
super().__init__(*args, **kwargs)

def _create_connection_pool(self, nodes: Dict[str, dict], gateway_nodes: Dict[str, dict], hosts: List[str]) -> ConnectionPool:
return FakeConnectionPool(nodes, gateway_nodes, hosts, self.fake_shell, self.fake_fs)

def _create_cluster_storage(self) -> utils.ClusterStorage:
return self.cluster_storage

def make_group(self, ips: Iterable[_AnyConnectionTypes]) -> FakeNodeGroup:
return FakeNodeGroup(ips, self)

Expand All @@ -191,9 +212,6 @@ def _detect_nodes_context(self) -> None:
else:
super()._detect_nodes_context()

def preserve_inventory(self) -> None:
return


class FakeResources(DynamicResources):
def __init__(self, context: dict, inventory: dict, procedure_inventory: dict = None,
Expand All @@ -214,6 +232,8 @@ def __init__(self, context: dict, inventory: dict, procedure_inventory: dict = N
self.finalized_inventory: dict = {}
self.make_finalized_inventory = make_finalized_inventory

self._enrichment_functions = super().enrichment_functions()

def _store_inventory(self, inventory: dict) -> None:
pass

Expand Down Expand Up @@ -246,6 +266,22 @@ def _new_cluster_instance(self, context: dict) -> FakeKubernetesCluster:
resources=self,
)

def insert_enrichment_function(self, near: EnrichmentFunction, stages: EnrichmentStage,
fn: Callable[[KubernetesCluster], Optional[dict]],
*,
procedure: str = None,
after: bool = False) -> None:

enrichment_fn = enrichment(stages, procedures=None if procedure is None else [procedure])(fn)

idx = self._enrichment_functions.index(near)
if after:
idx += 1
self._enrichment_functions.insert(idx, enrichment_fn)

def enrichment_functions(self) -> List[EnrichmentFunction]:
return self._enrichment_functions


class FakeConnection(fabric.connection.Connection): # type: ignore[misc]

Expand Down
43 changes: 38 additions & 5 deletions test/unit/core/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import unittest

from kubemarine import demo
Expand All @@ -26,8 +24,6 @@ def get_os_family(cluster: FakeKubernetesCluster):

class KubernetesClusterTest(unittest.TestCase):

# TODO: add more tests

def setUp(self):
self.cluster = demo.new_cluster(demo.generate_inventory(**demo.FULLHA))

Expand Down Expand Up @@ -90,7 +86,44 @@ def test_remove_node_different_os_get_os_family_single(self):
cluster = demo.new_cluster(inventory, procedure_inventory=remove_node, context=context,
nodes_context=nodes_context)
self.assertEqual('debian', get_os_family(cluster),
msg="One node has different OS family and thus global OS family should be 'multiple'")
msg="The only node with different OS family is removed, "
"and global OS family should the specific remained")

def test_remove_node_different_os_get_package_associations(self):
inventory = demo.generate_inventory(**demo.MINIHA_KEEPALIVED)
context = demo.create_silent_context(['fake.yaml'], procedure='remove_node')
host_different_os = inventory['nodes'][0]['address']
remained_host = inventory['nodes'][1]['address']
nodes_context = self._nodes_context_one_different_os(inventory, host_different_os)
remove_node = demo.generate_procedure_inventory('remove_node')
remove_node['nodes'] = [{"name": inventory["nodes"][0]["name"]}]
cluster = demo.new_cluster(inventory, procedure_inventory=remove_node, context=context,
nodes_context=nodes_context)
self.assertEqual('conntrack-tools',
cluster.get_package_association_for_node(host_different_os, 'conntrack', 'package_name'),
msg="Unexpected package associations of node to remove")
self.assertEqual('conntrack',
cluster.get_package_association_for_node(remained_host, 'conntrack', 'package_name'),
msg="Unexpected package associations of remained node")

def test_upgrade_get_redefined_package_associations(self):
context = demo.create_silent_context(['fake.yaml'], procedure='upgrade')
context['upgrade_step'] = 0

inventory = demo.generate_inventory(**demo.MINIHA_KEEPALIVED)
inventory['services']['kubeadm'] = {
'kubernetesVersion': 'v1.27.8'
}
upgrade = demo.generate_procedure_inventory('upgrade')
upgrade['upgrade_plan'] = ['v1.28.4']
upgrade.setdefault('v1.28.4', {})['packages'] = {
'associations': {'containerd': {'package_name': 'containerd_new'}}
}
cluster = demo.new_cluster(inventory, procedure_inventory=upgrade, context=context)
self.assertEqual('containerd_new',
cluster.get_package_association_for_node(cluster.nodes['all'].get_any_member().get_host(),
'containerd', 'package_name'),
msg="Package associations are not redefined")

def _nodes_context_one_different_os(self, inventory, host_different_os):
nodes_context = demo.generate_nodes_context(inventory, os_name='ubuntu', os_version='20.04')
Expand Down
Loading

0 comments on commit d0a8a55

Please sign in to comment.