Prepare ClusterStorage for tests
ilia1243 committed Jan 17, 2024
1 parent c636753 commit 7d35c86
Showing 5 changed files with 52 additions and 28 deletions.
9 changes: 6 additions & 3 deletions kubemarine/core/cluster.py
@@ -462,6 +462,9 @@ def create_connection_pool(self, hosts: List[str]) -> ConnectionPool:
def _create_connection_pool(self, nodes: Dict[str, dict], gateway_nodes: Dict[str, dict], hosts: List[str]) -> ConnectionPool:
return ConnectionPool(nodes, gateway_nodes, hosts)

def _create_cluster_storage(self) -> utils.ClusterStorage:
return utils.ClusterStorage(self)

@property
def log(self) -> log.EnhancedLogger:
return self._logger
@@ -769,10 +772,10 @@ def make_finalized_inventory(self, finalization_functions: List[Callable[['Kuber

def preserve_inventory(self) -> None:
self.log.debug("Start preserving the information about the procedure.")
cluster_storage = utils.ClusterStorage(self)
cluster_storage = self._create_cluster_storage()
cluster_storage.make_dir()
if self.context.get('initial_procedure') == 'add_node':
cluster_storage.upload_info_new_control_planes()
cluster_storage.collect_procedure_info()
cluster_storage.compress_and_upload_archive()
cluster_storage.rotation_file()
cluster_storage.compress_archive()
cluster_storage.upload_and_rotate()
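
The new _create_cluster_storage() hook mirrors the existing _create_connection_pool() factory: production code still builds the real utils.ClusterStorage, while a test subclass can return a stub. A minimal, self-contained sketch of that seam (Storage, Cluster, RecordingStorage and FakeCluster below are simplified stand-ins for illustration, not the actual kubemarine classes):

    class Storage:
        def __init__(self, cluster: object) -> None:
            self.cluster = cluster

        def upload_and_rotate(self) -> None:
            raise RuntimeError("would touch real control-plane nodes")

    class Cluster:
        def _create_cluster_storage(self) -> Storage:
            # Single construction point: the rest of the class only uses this hook.
            return Storage(self)

        def preserve_inventory(self) -> None:
            self._create_cluster_storage().upload_and_rotate()

    class RecordingStorage(Storage):
        def __init__(self, cluster: object) -> None:
            super().__init__(cluster)
            self.uploads = 0

        def upload_and_rotate(self) -> None:
            self.uploads += 1  # record the call instead of uploading anything

    class FakeCluster(Cluster):
        def __init__(self) -> None:
            self.storage = RecordingStorage(self)

        def _create_cluster_storage(self) -> Storage:
            return self.storage

    cluster = FakeCluster()
    cluster.preserve_inventory()
    assert cluster.storage.uploads == 1
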
9 changes: 5 additions & 4 deletions kubemarine/core/flow.py
@@ -149,15 +149,16 @@ def _post_process_actions_group(resources: res.DynamicResources, successfully_pe
if recreate_inventory:
resources.recreate_inventory()

if (resources.is_cluster_initialized()
and resources.context['preserve_inventory']
and not resources.context['execution_arguments'].get('without_act', False)):
if resources.is_cluster_initialized():
# If the cluster is initialized, it is initialized to at least LIGHT state.
# Below code only switches it to that state without enrichment.
cluster = resources.cluster(c.EnrichmentStage.LIGHT)
cluster.context['successfully_performed'] = successfully_performed
cluster.context['status'] = 'failed' if failed else 'successful'
cluster.preserve_inventory()

if (resources.context['preserve_inventory']
and not resources.context['execution_arguments'].get('without_act', False)):
cluster.preserve_inventory()


def run_tasks(resources: res.DynamicResources, tasks: dict, cumulative_points: dict = None,
37 changes: 21 additions & 16 deletions kubemarine/core/utils.py
@@ -591,30 +591,40 @@ def __init__(self, cluster: object):
from kubemarine.core.cluster import KubernetesCluster
self.cluster = cast(KubernetesCluster, cluster)
self.dir_path = "/etc/kubemarine/procedures/"
self.dir_name = ''
self.dir_location = ''
self.local_archive_path = ''

def make_dir(self) -> None:
"""
This method creates a directory in which logs about operations on the cluster will be stored.
"""
readable_timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
initial_procedure = self.cluster.context["initial_procedure"]
self.dir_name = readable_timestamp + "_" + initial_procedure + "/"
self.dir_location = self.dir_path + self.dir_name
dir_name = readable_timestamp + "_" + initial_procedure + "/"
self.dir_location = self.dir_path + dir_name
self.cluster.nodes['control-plane'].sudo(f"mkdir -p {self.dir_location} ; sudo rm {self.dir_path + 'latest_dump'} ;"
f" sudo ln -s {self.dir_location} {self.dir_path + 'latest_dump'}")

def rotation_file(self) -> None:
def upload_and_rotate(self) -> None:
"""
This method packs files with logs and maintains a structured storage of logs on the cluster.
This method uploads and unpacks the archive,
then packs files with logs and maintains a structured storage of logs on the cluster.
"""
control_planes = self.cluster.nodes["control-plane"]

self.cluster.log.debug('Uploading archive with preserved information about the procedure.')
remote_archive = self.dir_location + "local.tar.gz"
control_planes.put(self.local_archive_path, remote_archive, sudo=True)
control_planes.sudo(
f'tar -C {self.dir_location} -xzv --no-same-owner -f {remote_archive} && '
f'sudo rm -f {remote_archive} ')

not_pack_file = self.cluster.inventory['procedure_history']['archive_threshold']
delete_old = self.cluster.inventory['procedure_history']['delete_threshold']

command = f'ls {self.dir_path} | grep -v latest_dump'
node_group_results = self.cluster.nodes["control-plane"].sudo(command)
with node_group_results.get_group().new_executor() as exe:
node_group_results = control_planes.sudo(command)
with control_planes.new_executor() as exe:
for control_plane in exe.group.get_ordered_members_list():
result = node_group_results[control_plane.get_host()]
files = result.stdout.split()
@@ -628,25 +638,20 @@ def rotation_file(self) -> None:
elif i >= delete_old:
control_plane.sudo(f'rm -rf {self.dir_path + file}')

def compress_and_upload_archive(self) -> None:
def compress_archive(self) -> None:
"""
This method compose dump files and sends the collected files to the nodes.
This method composes the dump files into the local archive.
"""
context = self.cluster.context
archive = get_dump_filepath(context, "local.tar.gz")
with tarfile.open(archive, "w:gz") as tar:
self.local_archive_path = get_dump_filepath(context, "local.tar.gz")
with tarfile.open(self.local_archive_path, "w:gz") as tar:
for name in ClusterStorage.PRESERVED_DUMP_FILES:
source = get_dump_filepath(context, name)
if os.path.exists(source):
tar.add(source, 'dump/' + name)
tar.add(context['execution_arguments']['config'], 'cluster.yaml')
tar.add(get_version_filepath(), 'version')

self.cluster.log.debug('Uploading archive with preserved information about the procedure.')
self.cluster.nodes['control-plane'].put(archive, self.dir_location + 'local.tar.gz', sudo=True)
self.cluster.nodes['control-plane'].sudo(f'tar -C {self.dir_location} -xzv --no-same-owner -f {self.dir_location + "local.tar.gz"} && '
f'sudo rm -f {self.dir_location + "local.tar.gz"} ')

def collect_procedure_info(self) -> None:
"""
This method collects information about the type of procedure and the version of the tool we are working with.
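
After this split, compress_archive() only builds the local tarball and remembers its path in self.local_archive_path, while upload_and_rotate() consumes that path. Since packing no longer touches the nodes, a test can let the real compress_archive() run and then inspect the produced archive locally. A hedged sketch of such a check (the helper name and the commented usage are illustrative, not part of this commit):

    import tarfile
    from typing import List

    def preserved_archive_members(local_archive_path: str) -> List[str]:
        # List what compress_archive() packed, without uploading anything.
        with tarfile.open(local_archive_path, "r:gz") as tar:
            return tar.getnames()

    # e.g. in a test, using the path recorded by the fake storage (see demo.py below):
    # assert 'cluster.yaml' in preserved_archive_members(storage.uploaded_archives[0])
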
24 changes: 20 additions & 4 deletions kubemarine/demo.py
@@ -28,7 +28,7 @@

from kubemarine import system, procedures
from kubemarine.core.cluster import KubernetesCluster, _AnyConnectionTypes, EnrichmentStage
from kubemarine.core import connections, static, errors
from kubemarine.core import connections, static, errors, utils
from kubemarine.core.connections import ConnectionPool
from kubemarine.core.executor import RunnersResult, GenericResult, Token, CommandTimedOut
from kubemarine.core.group import (
@@ -169,18 +169,37 @@ def read_all(self, hosts: List[str], filename: str) -> Dict[str, Optional[str]]:
return result


class FakeClusterStorage(utils.ClusterStorage):
def __init__(self, cluster: object):
super().__init__(cluster)
self.uploaded_archives: List[str] = []

def make_dir(self) -> None:
pass

def upload_and_rotate(self) -> None:
self.uploaded_archives.append(self.local_archive_path)

def upload_info_new_control_planes(self) -> None:
pass


class FakeKubernetesCluster(KubernetesCluster):

def __init__(self, *args: Any, **kwargs: Any):
self.resources: FakeResources = kwargs.pop("resources")
self.fake_shell = self.resources.fake_shell
self.fake_fs = self.resources.fake_fs
self.fake_nodes_context = self.resources.fake_nodes_context
self.cluster_storage = FakeClusterStorage(self)
super().__init__(*args, **kwargs)

def _create_connection_pool(self, nodes: Dict[str, dict], gateway_nodes: Dict[str, dict], hosts: List[str]) -> ConnectionPool:
return FakeConnectionPool(nodes, gateway_nodes, hosts, self.fake_shell, self.fake_fs)

def _create_cluster_storage(self) -> utils.ClusterStorage:
return self.cluster_storage

def make_group(self, ips: Iterable[_AnyConnectionTypes]) -> FakeNodeGroup:
return FakeNodeGroup(ips, self)

@@ -190,9 +209,6 @@ def _detect_nodes_context(self) -> None:
else:
super()._detect_nodes_context()

def preserve_inventory(self) -> None:
return


class FakeResources(DynamicResources):
def __init__(self, context: dict, inventory: dict, procedure_inventory: dict = None,
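
With FakeClusterStorage wired in through _create_cluster_storage(), a unit test can assert that preserve_inventory() really produced and "uploaded" an archive instead of being skipped entirely. A hypothetical assertion helper (fake_cluster stands for whatever FakeKubernetesCluster a test builds via the demo helpers; this helper is not part of the commit):

    def assert_inventory_preserved(fake_cluster) -> None:
        storage = fake_cluster.cluster_storage        # the FakeClusterStorage instance
        assert len(storage.uploaded_archives) == 1    # upload_and_rotate() was reached
        assert storage.uploaded_archives[0].endswith("local.tar.gz")
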
1 change: 0 additions & 1 deletion test/unit/core/test_run_actions.py
@@ -25,7 +25,6 @@
class RunActionsTest(unittest.TestCase):
def setUp(self) -> None:
self.context = demo.create_silent_context()
self.context['preserve_inventory'] = True
self.inventory = demo.generate_inventory(**demo.FULLHA)

def test_patch_inventory(self):