From 44e0bb7f19d03a54a2fea44f72c4dab5d8d12663 Mon Sep 17 00:00:00 2001
From: Paolo Sacconier
Date: Thu, 16 Mar 2023 19:33:57 +0100
Subject: [PATCH] fetch_and_run in python supporting multiple custom actions
 (#1895)

* fetch_and_run in python supporting multiple custom actions

* python fetch_and_run integration in cookbook processing

* fix system test local execution by adding a mock cluster config

Signed-off-by: Paolo Sacconier
---
 .dockerignore                                 |   1 +
 .../custom_action_executor.py                 | 518 ++++++++++++++++++
 .../recipes/custom_actions_setup.rb           |  31 ++
 .../recipes/init.rb                           |   9 +-
 .../templates/default/init/cfnconfig.erb      |  12 +-
 .../templates/default/init/fetch_and_run.erb  | 139 ++---
 system_tests/Dockerfile.centos7               |   3 +
 system_tests/Dockerfile.ubuntu                |   3 +
 system_tests/cluster-config.yaml              |  21 +
 .../test_custom_action_executor.py            | 414 ++++++++++++++
 tox.ini                                       |   3 +
 11 files changed, 1034 insertions(+), 120 deletions(-)
 create mode 100644 cookbooks/aws-parallelcluster-config/files/default/custom_action_executor/custom_action_executor.py
 create mode 100644 cookbooks/aws-parallelcluster-config/recipes/custom_actions_setup.rb
 create mode 100644 system_tests/cluster-config.yaml
 create mode 100644 test/unit/custom_action_executor/test_custom_action_executor.py

diff --git a/.dockerignore b/.dockerignore
index 164b7b0ee..d5b5f69cf 100644
--- a/.dockerignore
+++ b/.dockerignore
@@ -1 +1,2 @@
 ./.github
+./.tox
diff --git a/cookbooks/aws-parallelcluster-config/files/default/custom_action_executor/custom_action_executor.py b/cookbooks/aws-parallelcluster-config/files/default/custom_action_executor/custom_action_executor.py
new file mode 100644
index 000000000..9f99172fe
--- /dev/null
+++ b/cookbooks/aws-parallelcluster-config/files/default/custom_action_executor/custom_action_executor.py
@@ -0,0 +1,518 @@
+# Copyright 2023 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License").
+# You may not use this file except in compliance with the
+# License. A copy of the License is located at
+#
+# http://aws.amazon.com/apache2.0/
+#
+# or in the "LICENSE.txt" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
+# OR CONDITIONS OF ANY KIND, express or implied. See the License for the specific language governing permissions and
+# limitations under the License.
+
+import argparse
+import asyncio
+import logging
+import os
+import subprocess  # nosec B404
+import tempfile
+from builtins import RuntimeError
+from dataclasses import dataclass
+from enum import Enum
+from urllib.parse import urlparse
+
+import boto3
+import botocore
+import requests
+import yaml
+from botocore.exceptions import ClientError, NoCredentialsError
+
+BOOSTRAP_ERROR_FILE = "/var/log/parallelcluster/bootstrap_error_msg"
+
+ERROR_MSG_SUFFIX = (
+    "Please check /var/log/cfn-init.log in the head node, or check the cfn-init.log in CloudWatch logs. "
+    "Please refer to https://docs.aws.amazon.com/parallelcluster/latest/ug/troubleshooting-v3.html"
+    "#troubleshooting-v3-get-logs for more details on ParallelCluster logs."
+)
+
+SCRIPT_LOG_NAME_FETCH_AND_RUN = "fetch_and_run"
+
+DOWNLOAD_SCRIPT_HTTP_TIMEOUT_SECONDS = 60
+
+
+@dataclass
+class ScriptDefinition:
+    """Script definition for custom actions."""
+
+    url: str
+    args: list  # type list[str]
+
+
+@dataclass
+class ExecutableScript(ScriptDefinition):
+    """Executable script for custom actions."""
+
+    step_num: int
+    path: str
+
+
+class ScriptRunner:
+    """Performs download and execution of scripts."""
+
+    def __init__(self, event_name):
+        self.event_name = event_name
+
+    async def download_and_execute_scripts(self, scripts):
+        """
+        Download and execute scripts.
+
+        :param scripts:
+        :return:
+        """
+        downloaded_scripts = await asyncio.gather(
+            *[self._download_script(script, idx) for idx, script in enumerate(scripts, 1)]
+        )
+        for script in downloaded_scripts:
+            await self._execute_script(script)
+            os.unlink(script.path)
+
+    async def _download_script(self, script: ScriptDefinition, step_num=0) -> ExecutableScript:
+        exe_script = self._build_exe_script(script, step_num, None)
+        if self._is_s3_url(script.url):
+            return await self._download_s3_script(exe_script)
+        if self._is_https_url(script.url):
+            return await self._download_http_script(exe_script)
+
+        raise DownloadRunError(
+            f"Failed to download {self.event_name} script {step_num} {script.url}, URL must be an s3 or HTTPs.",
+            f"Failed to download {self.event_name} script {step_num}, URL must be an s3 or HTTPs.",
+        )
+
+    @staticmethod
+    def _build_exe_script(script, step_num, path):
+        return ExecutableScript(script.url, script.args, step_num, path)
+
+    async def _download_s3_script(self, exe_script: ExecutableScript):
+        s3_client = boto3.resource("s3")
+        bucket_name, key = self._parse_s3_url(exe_script.url)
+        with tempfile.NamedTemporaryFile(delete=False) as file:
+            try:
+                s3_client.Bucket(bucket_name).download_file(key, file.name)
+            except (NoCredentialsError, botocore.exceptions.ClientError) as err:
+                os.unlink(file.name)
+                raise DownloadRunError(
+                    f"Failed to download {self.event_name} script {exe_script.step_num} {exe_script.url}"
+                    f" using aws s3, cause: {err}.",
+                    f"Failed to download {self.event_name} script {exe_script.step_num} using aws s3.",
+                ) from err
+        exe_script.path = file.name
+        return exe_script
+
+    async def _download_http_script(self, exe_script: ExecutableScript):
+        url = exe_script.url
+        response = requests.get(url, timeout=DOWNLOAD_SCRIPT_HTTP_TIMEOUT_SECONDS)
+        if response.status_code != 200:
+            raise DownloadRunError(
+                f"Failed to download {self.event_name} script {exe_script.step_num} {url}, "
+                f"HTTP status code {response.status_code}.",
+                f"Failed to download {self.event_name} script {exe_script.step_num} via HTTP.",
+            )
+        with tempfile.NamedTemporaryFile(delete=False) as file:
+            file.write(response.content)
+        exe_script.path = file.name
+        return exe_script
+
+    async def _execute_script(self, exe_script: ExecutableScript):
+        # preserving error case for making the script executable
+        try:
+            subprocess.run(["chmod", "+x", exe_script.path], check=True)  # nosec - trusted input
+        except subprocess.CalledProcessError as err:
+            raise DownloadRunError(
+                f"Failed to run {self.event_name} script {exe_script.step_num} {exe_script.url} "
+                f"due to a failure in making the file executable, return code: {err.returncode}.",
+                f"Failed to run {self.event_name} script {exe_script.step_num} "
+                f"due to a failure in making the file executable, return code: {err.returncode}.",
+            ) from err
+
+        # execute script with its args
+        try:
+            # arguments are provided by the user who has the privilege to create/update the cluster
+            subprocess.run([exe_script.path] + (exe_script.args or []), check=True)  # nosec - trusted input
+        except subprocess.CalledProcessError as err:
+            raise DownloadRunError(
+                f"Failed to execute {self.event_name} script {exe_script.step_num} {exe_script.url},"
+                f" return code: {err.returncode}.",
+                f"Failed to execute {self.event_name} script {exe_script.step_num}, return code: {err.returncode}.",
+            ) from err
+
+    @staticmethod
+    def _is_https_url(url):
+        return urlparse(url).scheme == "https"
+
+    @staticmethod
+    def _is_s3_url(url):
+        return urlparse(url).scheme == "s3"
+
+    @staticmethod
+    def _parse_s3_url(url):
+        parsed_url = urlparse(url)
+        return parsed_url.netloc, parsed_url.path.lstrip("/")
+
+
+class LegacyEventName(Enum):
+    """Maps legacy event names to avoid changing the script contract."""
+
+    ON_NODE_START = "preinstall"
+    ON_NODE_CONFIGURED = "postinstall"
+    ON_NODE_UPDATED = "postupdate"
+
+    def map_to_current_name(self):
+        """Return the current event name value as it's configured in the cluster config."""
+        if self == LegacyEventName.ON_NODE_START:
+            result = "OnNodeStart"
+        elif self == LegacyEventName.ON_NODE_CONFIGURED:
+            result = "OnNodeConfigured"
+        elif self == LegacyEventName.ON_NODE_UPDATED:
+            result = "OnNodeUpdated"
+        else:
+            raise ValueError(f"Unknown legacy event name: {self.value}")
+
+        return result
+
+    def __str__(self):
+        """Return the legacy event name value."""
+        return self.value
+
+
+@dataclass
+class CustomActionsConfig:
+    """
+    Encapsulates custom actions configuration.
+
+    Contains all the configuration relevant to custom actions execution.
+    """
+
+    stack_name: str
+    region_name: str
+    node_type: str
+    queue_name: str
+    event_name: str
+    legacy_event: LegacyEventName
+    can_execute: bool
+    dry_run: bool
+    script_sequence: list  # type list[ScriptDefinition]
+
+
+class CustomLogger:
+    """
+    Logs using the same logic as the legacy bash script.
+
+    Could be changed to a standard logger when error signaling is more testable.
+    """
+
+    def __init__(self, conf: CustomActionsConfig):
+        self.conf = conf
+
+    def error_exit_with_bootstrap_error(self, msg: str, msg_without_url: str = None):
+        """
+        Log error message and exit with a bootstrap error.
+
+        :param msg: error message
+        :param msg_without_url: alternate error message with the URL masked
+        """
+        self._log_message(msg)
+        self._write_bootstrap_error(msg_without_url if msg_without_url else msg)
+        raise SystemExit(1)
+
+    def _write_bootstrap_error(self, message):
+        if self.conf.dry_run:
+            print(f"Would write to {BOOSTRAP_ERROR_FILE}, message: {message}")
+            return
+
+        os.makedirs(os.path.dirname(BOOSTRAP_ERROR_FILE), exist_ok=True)
+        with open(BOOSTRAP_ERROR_FILE, "w", encoding="utf-8") as f:
+            f.write(message)
+
+    def error_exit(self, msg: str):
+        """
+        Log error message and exit.
+
+        :param msg: error message
+        """
+        self._log_message(msg)
+        raise SystemExit(1)
+
+    def _log_message(self, msg: str):
+        complete_message = f"{SCRIPT_LOG_NAME_FETCH_AND_RUN} - {msg} {ERROR_MSG_SUFFIX}"
+        print(complete_message)
+        if not self.conf.dry_run:
+            subprocess.run(["logger", "-t", "parallelcluster", complete_message], check=True)  # nosec - trusted input
+
+
+class ConfigLoader:
+    """
+    Encapsulates configuration handling logic.
+
+    Loads the configuration relevant to custom actions from the provided cluster configuration file,
+    according to the node type, event and queue.
+ """ + + @staticmethod + def _load_cluster_config(input_file_path): + """Load cluster config file.""" + with open(input_file_path, encoding="utf-8") as input_file: + return yaml.load(input_file, Loader=yaml.SafeLoader) + + def load_configuration(self, args) -> CustomActionsConfig: + """ + Load configuration. + + :param args: command line arguments + :return: configuration object + """ + legacy_event = None + for event in LegacyEventName: + if getattr(args, event.value): + legacy_event = event + break + event_name = legacy_event.map_to_current_name() + cluster_config = self._load_cluster_config(args.cluster_configuration) + + logging.debug(cluster_config) + + try: + if args.node_type == "HeadNode": + data = cluster_config["HeadNode"]["CustomActions"][event_name] + else: + data = next( + ( + q + for q in next(v for v in cluster_config["Scheduling"].values() if isinstance(v, list)) + if q["Name"] == args.queue_name and q["CustomActions"] + ), + None, + )["CustomActions"][event_name] + + sequence = self._extract_script_sequence(data) + except (KeyError, TypeError) as err: + logging.debug("Ignoring missing %s in configuration, cause: %s", event_name, err) + sequence = [] + + conf = CustomActionsConfig( + legacy_event=legacy_event, + node_type=args.node_type, + queue_name=args.queue_name, + event_name=event_name, + region_name=args.region, + stack_name=args.stack_name, + script_sequence=sequence, + dry_run=args.dry_run, + can_execute=len(sequence) > 0, + ) + + logging.debug(conf) + + return conf + + @staticmethod + def _extract_script_sequence(data): + sequence = [] + if not data: + pass + elif "Script" in data: + sequence = [data] + elif "Sequence" in data and isinstance(data["Sequence"], list): + sequence = data["Sequence"] + return [ScriptDefinition(url=s["Script"], args=s["Args"]) for s in sequence] + + +class DownloadRunError(Exception): + """Error in script execution supporting masking of script urls in the logs.""" + + def __init__(self, msg, msg_with_url): + self.msg = msg + self.msg_with_url = msg_with_url + + +class ActionRunner: + """Encapsulates the logic to map configured supported events to executable scripts.""" + + def __init__(self, conf: CustomActionsConfig, custom_logger: CustomLogger): + self.conf = conf + self.custom_logger = custom_logger + + def run(self): + """Execute the custom action scripts configured for the event.""" + actions = { + LegacyEventName.ON_NODE_START: self._on_node_start, + LegacyEventName.ON_NODE_CONFIGURED: self._on_node_configured, + LegacyEventName.ON_NODE_UPDATED: self._on_node_updated, + } + actions.get(self.conf.legacy_event, self._unknown_action)() + + def _on_node_bootstrap_event(self): + if self.conf.can_execute: + try: + self._download_run() + except DownloadRunError as e: + self.custom_logger.error_exit_with_bootstrap_error(msg=e.msg, msg_without_url=e.msg_with_url) + except RuntimeError as e: + logging.debug(e) + self.custom_logger.error_exit_with_bootstrap_error(f"Failed to run {self.conf.event_name} script.") + + def _on_node_start(self): + self._on_node_bootstrap_event() + + def _on_node_configured(self): + self._on_node_bootstrap_event() + + def _on_node_updated(self): + if self.conf.can_execute and self._is_stack_update_in_progress(): + try: + self._download_run() + except DownloadRunError as e: + self.custom_logger.error_exit(msg=e.msg) + except RuntimeError as e: + logging.debug(e) + self.custom_logger.error_exit("Failed to run post update hook") + + def _download_run(self): + if self.conf.dry_run: + print("Dry run, would download and 
execute scripts:") + for script in self.conf.script_sequence: + print(f" {script.url} with args {script.args}") + else: + asyncio.run(ScriptRunner(self.conf.event_name).download_and_execute_scripts(self.conf.script_sequence)) + + def _get_stack_status(self) -> str: + stack_status = "UNKNOWN" + try: + cloudformation = boto3.client("cloudformation", region_name=self.conf.region_name) + response = cloudformation.describe_stacks(StackName=self.conf.stack_name) + stack_status = response["Stacks"][0]["StackStatus"] + except (KeyError, ClientError, botocore.exceptions.ParamValidationError) as e: + logging.debug(e) + self.custom_logger.error_exit( + "Failed to get the stack status, check the HeadNode instance profile's IAM policies" + ) + return stack_status + + def _is_stack_update_in_progress(self): + stack_status = self._get_stack_status() + if stack_status != "UPDATE_IN_PROGRESS": + print(f"Post update hook called with CFN stack in state {stack_status}, doing nothing") + result = False + else: + result = True + return result + + @staticmethod + def _unknown_action(): + print("Unknown action. Exit gracefully") + raise SystemExit(1) + + +def _parse_cli_args(): + parser = argparse.ArgumentParser( + description="Execute action scripts attached to a node lifecycle event", exit_on_error=False + ) + + event_group = parser.add_mutually_exclusive_group(required=True) + + for event in LegacyEventName: + event_group.add_argument( + f"-{event.value}", + action="store_true", + help=f"selects the {event.value} event in the node lifecycle to execute", + ) + + parser.add_argument( + "-r", + "--region", + type=str, + default=os.getenv("AWS_REGION", None), + required=False, + help="the cluster AWS region, defaults to AWS_REGION env variable", + ) + parser.add_argument( + "-s", + "--stack-name", + type=str, + default=os.getenv("PCLUSTER_STACK_NAME", None), + required=False, + help="the parallelcluster cloudformation stack name," " defaults to PCLUSTER_STACK_NAME env variable", + ) + parser.add_argument( + "-n", + "--node-type", + type=str, + default=os.getenv("PCLUSTER_NODE_TYPE", None), + required=False, + help="the node type, defaults to PCLUSTER_NODE_TYPE env variable", + ) + parser.add_argument( + "-q", + "--queue-name", + type=str, + default=os.getenv("PCLUSTER_SCHEDULER_QUEUE_NAME", None), + required=False, + help="the scheduler queue name, defaults to PCLUSTER_SCHEDULER_QUEUE_NAME env variable", + ) + parser.add_argument( + "-c", + "--cluster-configuration", + type=str, + default="/opt/parallelcluster/shared/cluster-config.yaml", + required=False, + help="the cluster config file, defaults to " "/opt/parallelcluster/shared/cluster-config.yaml", + ) + parser.add_argument("--verbose", "-v", action="store_true", help="enable verbose logging") + parser.add_argument("--dry-run", "-d", action="store_true", help="enable dry run") + parser.add_argument("--execute-via-cfnconfig", "-e", action="store_true", help="execute via cfnconfig") + + try: + args = parser.parse_args() + except SystemExit as e: + e.code = 1 + raise e + + return args + + +def main(): + try: + args = _parse_cli_args() + + if args.verbose: + logging.basicConfig(level=logging.DEBUG) + + if args.execute_via_cfnconfig: + logging.warning( + "Execution via cfnconfig env variables is discouraged! 
+                "cfn_preinstall=%s\ncfn_preinstall_args=%s\ncfn_postinstall=%s\ncfn_postinstall_args=%s\n"
+                "cfn_postupdate=%s\ncfn_postupdate_args=%s\n"
+                "Please mind that the cfn_* env variables will be ignored and the action will be executed "
+                "using the cluster configuration available in %s.\n",
+                os.getenv("cfn_preinstall", ""),
+                os.getenv("cfn_preinstall_args", ""),
+                os.getenv("cfn_postinstall", ""),
+                os.getenv("cfn_postinstall_args", ""),
+                os.getenv("cfn_postupdate", ""),
+                os.getenv("cfn_postupdate_args", ""),
+                args.cluster_configuration,
+            )
+
+        conf = ConfigLoader().load_configuration(args)
+        ActionRunner(conf, CustomLogger(conf)).run()
+
+    except Exception as err:
+        logging.exception(err)
+        print(f"ERROR: Unexpected exception: {err}")
+        raise SystemExit(1) from err
+
+    logging.debug("Completed with success.")
+    raise SystemExit(0)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/cookbooks/aws-parallelcluster-config/recipes/custom_actions_setup.rb b/cookbooks/aws-parallelcluster-config/recipes/custom_actions_setup.rb
new file mode 100644
index 000000000..95be3deae
--- /dev/null
+++ b/cookbooks/aws-parallelcluster-config/recipes/custom_actions_setup.rb
@@ -0,0 +1,31 @@
+# frozen_string_literal: true
+
+#
+# Cookbook:: aws-parallelcluster-config
+# Recipe:: custom_actions_setup
+#
+# Copyright:: 2013-2023 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance with the
+# License. A copy of the License is located at
+#
+# http://aws.amazon.com/apache2.0/
+#
+# or in the "LICENSE.txt" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
+# OR CONDITIONS OF ANY KIND, express or implied. See the License for the specific language governing permissions and
+# limitations under the License.
+
+template "#{node['cluster']['scripts_dir']}/fetch_and_run" do
+  source 'init/fetch_and_run.erb'
+  owner "root"
+  group "root"
+  mode "0755"
+end
+
+cookbook_file "#{node['cluster']['scripts_dir']}/custom_action_executor.py" do
+  source 'custom_action_executor/custom_action_executor.py'
+  owner 'root'
+  group 'root'
+  mode '0755'
+  not_if { ::File.exist?("#{node['cluster']['scripts_dir']}/custom_action_executor.py") }
+end
diff --git a/cookbooks/aws-parallelcluster-config/recipes/init.rb b/cookbooks/aws-parallelcluster-config/recipes/init.rb
index 8379d6cfd..76156721e 100644
--- a/cookbooks/aws-parallelcluster-config/recipes/init.rb
+++ b/cookbooks/aws-parallelcluster-config/recipes/init.rb
@@ -23,13 +23,6 @@
 
 include_recipe "aws-parallelcluster-config::cfnconfig_mixed"
 
-template "/opt/parallelcluster/scripts/fetch_and_run" do
-  source 'init/fetch_and_run.erb'
-  owner "root"
-  group "root"
-  mode "0755"
-end
-
 include_recipe "aws-parallelcluster-config::mount_shared" if node['cluster']['node_type'] == "ComputeFleet"
 
 fetch_config 'Fetch and load cluster configs'
@@ -40,6 +33,8 @@
 # ParallelCluster log rotation configuration
 include_recipe "aws-parallelcluster-config::log_rotation"
 
+include_recipe "aws-parallelcluster-config::custom_actions_setup"
+
 # Configure additional Networking Interfaces (if present)
 include_recipe "aws-parallelcluster-config::network_interfaces" unless virtualized?
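
With the recipe above in place, both the wrapper and the executor land in node['cluster']['scripts_dir']. A quick way to smoke-test the plumbing without actually running any script is the executor's dry-run mode; the invocation below is only an illustrative sketch, assuming the default scripts dir of /opt/parallelcluster/scripts and made-up region/stack values:

    /opt/parallelcluster/scripts/fetch_and_run -postinstall --dry-run \
      --node-type HeadNode \
      --region us-east-1 \
      --stack-name my-cluster \
      --cluster-configuration /opt/parallelcluster/shared/cluster-config.yaml

Since fetch_and_run forwards "$@" after the flags it renders from node attributes, explicit flags like these simply override the rendered defaults (argparse keeps the last occurrence of an option).
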
diff --git a/cookbooks/aws-parallelcluster-config/templates/default/init/cfnconfig.erb b/cookbooks/aws-parallelcluster-config/templates/default/init/cfnconfig.erb
index 5fcab2809..a861db994 100644
--- a/cookbooks/aws-parallelcluster-config/templates/default/init/cfnconfig.erb
+++ b/cookbooks/aws-parallelcluster-config/templates/default/init/cfnconfig.erb
@@ -1,10 +1,10 @@
 stack_name=<%= node['cluster']['stack_name'] %>
-cfn_preinstall=<%= node['cluster']['preinstall'] %>
-cfn_preinstall_args=(<%= node['cluster']['preinstall_args'] %>)
-cfn_postinstall=<%= node['cluster']['postinstall'] %>
-cfn_postinstall_args=(<%= node['cluster']['postinstall_args'] %>)
-cfn_postupdate=<%= node['cluster']['postupdate'] %>
-cfn_postupdate_args=(<%= node['cluster']['postupdate_args'] %>)
+cfn_preinstall="<%= node['cluster']['scripts_dir'] %>/fetch_and_run --execute-via-cfnconfig -preinstall"
+cfn_preinstall_args=()
+cfn_postinstall="<%= node['cluster']['scripts_dir'] %>/fetch_and_run --execute-via-cfnconfig -postinstall"
+cfn_postinstall_args=()
+cfn_postupdate="<%= node['cluster']['scripts_dir'] %>/fetch_and_run --execute-via-cfnconfig -postupdate"
+cfn_postupdate_args=()
 cfn_region=<%= node['cluster']['region'] %>
 cfn_scheduler=<%= node['cluster']['scheduler'] %>
 cfn_scheduler_slots=<%= node['cluster']['scheduler_slots'] %>
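
For anything that still sources /etc/parallelcluster/cfnconfig, the contract is preserved: the cfn_* hooks now expand to fetch_and_run invocations instead of user script URLs. A minimal sketch of the legacy call pattern that keeps working (unquoted expansion of the command string is what such callers already rely on; the resolved path shown in the comment is illustrative):

    . /etc/parallelcluster/cfnconfig
    ${cfn_postinstall} "${cfn_postinstall_args[@]}"
    # resolves to e.g.: /opt/parallelcluster/scripts/fetch_and_run --execute-via-cfnconfig -postinstall
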
diff --git a/cookbooks/aws-parallelcluster-config/templates/default/init/fetch_and_run.erb b/cookbooks/aws-parallelcluster-config/templates/default/init/fetch_and_run.erb
index 151b083dc..4b3209694 100644
--- a/cookbooks/aws-parallelcluster-config/templates/default/init/fetch_and_run.erb
+++ b/cookbooks/aws-parallelcluster-config/templates/default/init/fetch_and_run.erb
@@ -1,111 +1,36 @@
 #!/bin/bash
+# Copyright 2023 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License").
+# You may not use this file except in compliance with the
+# License. A copy of the License is located at
+#
+# http://aws.amazon.com/apache2.0/
+#
+# or in the "LICENSE.txt" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
+# OR CONDITIONS OF ANY KIND, express or implied. See the License for the specific language governing permissions and
+# limitations under the License.
 
 set -euo pipefail
 
-cfnconfig_file="/etc/parallelcluster/cfnconfig"
-. ${cfnconfig_file}
-
-# Check expected variables from cfnconfig file
-function check_params () {
-    if [ -z "${cfn_region}" ] || \
-       [ -z "${cfn_preinstall}" ] || [ -z "${cfn_preinstall_args}" ] || \
-       [ -z "${cfn_postinstall}" ] || [ -z "${cfn_postinstall_args}" ] || \
-       [ -z "${cfn_postupdate}" ] || [ -z "${cfn_postupdate_args}" ]
-    then
-        error_exit "One or more required variables from ${cfnconfig_file} file are undefined"
-    fi
-}
-
-# Error exit function
-function error_exit () {
-    script=`basename $0`
-    echo "parallelcluster: ${script} - $1"
-    logger -t parallelcluster "${script} - $1"
-    case $custom_script in
-      OnNodeStart|OnNodeConfigured)
-        if [ -z "$2" ]; then
-          echo "$1 Please check /var/log/cfn-init.log in the head node, or check the cfn-init.log in CloudWatch logs. Please refer to https://docs.aws.amazon.com/parallelcluster/latest/ug/troubleshooting-v3.html#troubleshooting-v3-get-logs for more details on ParallelCluster logs." > /var/log/parallelcluster/bootstrap_error_msg
-        else
-          echo "$2 Please check /var/log/cfn-init.log in the head node, or check the cfn-init.log in CloudWatch logs. Please refer to https://docs.aws.amazon.com/parallelcluster/latest/ug/troubleshooting-v3.html#troubleshooting-v3-get-logs for more details on ParallelCluster logs." > /var/log/parallelcluster/bootstrap_error_msg
-        fi
-        ;;
-    esac
-    exit 1
-}
-
-function download_run (){
-    url=$1
-    shift
-    scheme=$(echo "${url}"| cut -d: -f1)
-    tmpfile=$(mktemp)
-    trap "/bin/rm -f $tmpfile" RETURN
-    if [ "${scheme}" == "s3" ]; then
-        <%= node['cluster']['cookbook_virtualenv_path'] %>/bin/aws --region ${cfn_region} s3 cp ${url} - > $tmpfile || error_exit "Failed to download ${custom_script} script ${url} using aws s3, return code: $?." "Failed to download ${custom_script} script using aws s3, return code: $?."
-    else
-        wget -qO- ${url} > $tmpfile || error_exit "Failed to download ${custom_script} script ${url} using wget, return code: $?." "Failed to download ${custom_script} script using wget, return code: $?."
-    fi
-    chmod +x $tmpfile || error_exit "Failed to run ${custom_script} script due to a failure in making the file executable, return code: $?."
-    $tmpfile "$@" || error_exit "Failed to execute ${custom_script} script ${url}, return code: $?." "Failed to execute ${custom_script} script, return code: $?."
-}
-
-function get_stack_status () {
-    region=$(sed -n 's/cfn_region=//p' /etc/parallelcluster/cfnconfig | xargs) || return 1
-    stack_name=$(sed -n 's/stack_name=//p' /etc/parallelcluster/cfnconfig | xargs) || return 1
-    stack_status=$(aws cloudformation describe-stacks --region "${region}" --stack-name "${stack_name}" --query Stacks[0].StackStatus | xargs) || return 1
-    echo ${stack_status}
-}
-
-function run_preinstall () {
-    custom_script="OnNodeStart"
-    if [ "${cfn_preinstall}" != "NONE" ]; then
-        if [ "${cfn_preinstall_args}" != "NONE" ]; then
-            download_run "${cfn_preinstall}" "${cfn_preinstall_args[@]}"
-        else
-            download_run "${cfn_preinstall}"
-        fi
-    fi || error_exit "Failed to run ${custom_script} script."
-}
-
-function run_postinstall () {
-    custom_script="OnNodeConfigured"
-    if [ "${cfn_postinstall}" != "NONE" ]; then
-        if [ "${cfn_postinstall_args}" != "NONE" ]; then
-            download_run "${cfn_postinstall}" "${cfn_postinstall_args[@]}"
-        else
-            download_run "${cfn_postinstall}"
-        fi
-    fi || error_exit "Failed to run ${custom_script} script."
-}
-
-function run_postupdate () {
-    custom_script="OnNodeUpdated"
-    if [ "${cfn_postupdate}" != "NONE" ]; then
-        stack_status=$(get_stack_status) || error_exit "Failed to get the stack status, check the HeadNode instance profile's IAM policies"
-
-        if [ "${stack_status}" != "UPDATE_IN_PROGRESS" ]; then
-            echo "Post update hook called with CFN stack in state ${stack_status}, doing nothing"
-        elif [ "${cfn_postupdate_args}" != "NONE" ]; then
-            download_run "${cfn_postupdate}" "${cfn_postupdate_args[@]}"
-        else
-            download_run "${cfn_postupdate}"
-        fi
-    fi || error_exit "Failed to run post update hook"
-}
-
-custom_script=""
-check_params
-ACTION=${1#?}
-case ${ACTION} in
-  preinstall)
-    run_preinstall
-    ;;
-  postinstall)
-    run_postinstall
-    ;;
-  postupdate)
-    run_postupdate
-    ;;
-  *)
-    echo "Unknown action. Exit gracefully"
Exit gracefully" - exit 0 -esac +# Disclaimer: this env setup is for *internal use only*, custom actions scripts SHOULD NOT rely on it + +# please note that this env setup has potential independent but relevant duplication with: +# build_env in cookbooks/aws-parallelcluster-scheduler-plugin/resources/execute_event_handler.rb +PCLUSTER_COOKBOOK_VIRTUALENV_PATH="<%= node['cluster']['cookbook_virtualenv_path'] %>" +PCLUSTER_SCRIPTS_DIR="<%= node['cluster']['scripts_dir'] %>" +PCLUSTER_CONFIG_PATH="<%= node['cluster']['cluster_config_path'] %>" +PCLUSTER_NODE_TYPE="<%= node['cluster']['node_type'] %>" +PCLUSTER_SCHEDULER_QUEUE_NAME="<%= node['cluster']['scheduler_queue_name'] %>" +PCLUSTER_STACK_NAME="<%= node['cluster']['stack_name'] %>" +PCLUSTER_REGION="<%= node['cluster']['region'] %>" + + +"${PCLUSTER_COOKBOOK_VIRTUALENV_PATH}"/bin/python \ + "${PCLUSTER_SCRIPTS_DIR}/custom_action_executor.py" \ + --region "${PCLUSTER_REGION}" \ + --stack-name "${PCLUSTER_STACK_NAME}" \ + --node-type "${PCLUSTER_NODE_TYPE}" \ + --queue-name "${PCLUSTER_SCHEDULER_QUEUE_NAME}" \ + --cluster-configuration "${PCLUSTER_CONFIG_PATH}" \ + "$@" diff --git a/system_tests/Dockerfile.centos7 b/system_tests/Dockerfile.centos7 index ed856372a..d9c167fa9 100644 --- a/system_tests/Dockerfile.centos7 +++ b/system_tests/Dockerfile.centos7 @@ -38,6 +38,9 @@ RUN yum install -y vim ksh tcsh zsh libssl-dev net-tools \ COPY system_tests/install_cinc.sh /build/install_cinc.sh RUN /build/install_cinc.sh +# mock fetch_config that is virtualized +COPY system_tests/cluster-config.yaml /opt/parallelcluster/shared/cluster-config.yaml + # customization for build for docker environment COPY . /tmp/cookbooks diff --git a/system_tests/Dockerfile.ubuntu b/system_tests/Dockerfile.ubuntu index 1bc60a9d0..6aedff7f8 100644 --- a/system_tests/Dockerfile.ubuntu +++ b/system_tests/Dockerfile.ubuntu @@ -42,6 +42,9 @@ WORKDIR build COPY system_tests/install_cinc.sh install_cinc.sh RUN ./install_cinc.sh +# mock fetch_config that is virtualized +COPY system_tests/cluster-config.yaml /opt/parallelcluster/shared/cluster-config.yaml + # customization for build for docker environment COPY . /tmp/cookbooks diff --git a/system_tests/cluster-config.yaml b/system_tests/cluster-config.yaml new file mode 100644 index 000000000..97a583188 --- /dev/null +++ b/system_tests/cluster-config.yaml @@ -0,0 +1,21 @@ +# this configuration can't currently contain any custom action in a system test env +HeadNode: + Iam: + InstanceRole: arn:aws:iam::1234567:role/role_iam + S3Access: + - BucketName: amibucket11223 + - BucketName: test + EnableWriteAccess: true + KeyName: hello/* + InstanceType: c5.xlarge +Image: + CustomAmi: ami-12345 + Os: alinux2 +Region: us-west-1 +Scheduling: + Scheduler: slurm +Tags: + - Key: key + Value: value + - Key: key2 + Value: value2 diff --git a/test/unit/custom_action_executor/test_custom_action_executor.py b/test/unit/custom_action_executor/test_custom_action_executor.py new file mode 100644 index 000000000..db46f7993 --- /dev/null +++ b/test/unit/custom_action_executor/test_custom_action_executor.py @@ -0,0 +1,414 @@ +# Copyright 2023 Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). +# You may not use this file except in compliance with the +# License. A copy of the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "LICENSE.txt" file accompanying this file. 
diff --git a/test/unit/custom_action_executor/test_custom_action_executor.py b/test/unit/custom_action_executor/test_custom_action_executor.py
new file mode 100644
index 000000000..db46f7993
--- /dev/null
+++ b/test/unit/custom_action_executor/test_custom_action_executor.py
@@ -0,0 +1,414 @@
+# Copyright 2023 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License").
+# You may not use this file except in compliance with the
+# License. A copy of the License is located at
+#
+# http://aws.amazon.com/apache2.0/
+#
+# or in the "LICENSE.txt" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
+# OR CONDITIONS OF ANY KIND, express or implied. See the License for the specific language governing permissions and
+# limitations under the License.
+
+# pylint: disable=redefined-outer-name
+# pylint: disable=protected-access
+
+import os
+import subprocess  # nosec B404
+import tempfile
+from unittest.mock import MagicMock, call
+
+import botocore
+import pytest
+from assertpy import assert_that
+from custom_action_executor import (
+    SCRIPT_LOG_NAME_FETCH_AND_RUN,
+    ActionRunner,
+    ConfigLoader,
+    CustomLogger,
+    DownloadRunError,
+    ExecutableScript,
+    LegacyEventName,
+    ScriptDefinition,
+    ScriptRunner,
+    main,
+)
+from mock.mock import AsyncMock
+
+
+@pytest.fixture
+def script_runner():
+    return ScriptRunner("OnMockTestEvent")
+
+
+@pytest.fixture
+def s3_script():
+    return ScriptDefinition(url="s3://bucket/script.sh", args=["arg1", "arg2"])
+
+
+@pytest.fixture
+def http_script():
+    return ScriptDefinition(url="http://example.com/script.sh", args=["arg1", "arg2"])
+
+
+@pytest.fixture
+def https_script():
+    return ScriptDefinition(url="https://example.com/script.sh", args=["arg1", "arg2"])
+
+
+def test_is_s3_url(script_runner):
+    assert_that(script_runner._is_s3_url("s3://bucket/script.sh")).is_true()
+    assert_that(script_runner._is_s3_url("http://example.com/script.sh")).is_false()
+
+
+def test_parse_s3_url(script_runner):
+    assert_that(script_runner._parse_s3_url("s3://bucket/script.sh")).is_equal_to(("bucket", "script.sh"))
+    assert_that(script_runner._parse_s3_url("s3://bucket/dir/script.sh")).is_equal_to(("bucket", "dir/script.sh"))
+    assert_that(script_runner._parse_s3_url("s3://bucket/path/to/script.sh")).is_equal_to(
+        ("bucket", "path/to/script.sh")
+    )
+
+
+def write_to_file(filename, file_contents: bytes):
+    with open(filename, "w", encoding="utf-8") as file:
+        file.write(file_contents.decode())
+
+
+@pytest.mark.asyncio
+async def test_download_s3_script(script_runner, s3_script, mocker):
+    # Mock the s3 download of file_contents via s3 resource mocking
+    file_contents: bytes = b"#!/bin/bash\n"
+    mock_resource = MagicMock()
+    mock_resource.download_file = MagicMock(side_effect=(lambda key, filename: write_to_file(filename, file_contents)))
+
+    mocker.patch("boto3.resource").return_value.Bucket.return_value = mock_resource
+
+    # Act
+    exe_script = await script_runner._download_script(s3_script)
+    downloaded_file_path = exe_script.path
+
+    # Assert
+    assert_that(downloaded_file_path).is_not_equal_to(s3_script.url)
+    with open(downloaded_file_path, encoding="utf-8") as downloaded_file:
+        assert_that(downloaded_file.read()).is_equal_to("#!/bin/bash\n")
+
+
+@pytest.mark.asyncio
+async def test_download_https_script(script_runner, https_script, mocker):
+    response_mock = MagicMock()
+    response_mock.status_code = 200
+    content = b"#!/bin/bash\n"
+    response_mock.content = content
+    mocker.patch("requests.get", return_value=response_mock)
+
+    result: ExecutableScript = await script_runner._download_script(https_script)
+
+    assert_that(result.path).is_instance_of(str)
+    with open(result.path, encoding="utf-8") as f:
+        assert_that(f.read()).is_equal_to("#!/bin/bash\n")
+
+
+@pytest.mark.asyncio
+async def test_download_http_script_not_allowed(script_runner, http_script):
+    with pytest.raises(
+        DownloadRunError,
+        match="Failed to download OnMockTestEvent script 0 http://example.com/script.sh, "
+        "URL must be an s3 or HTTPs.",
+    ):
+        await script_runner._download_script(http_script)
+
+
+@pytest.mark.asyncio
+async def test_download_s3_script_error(script_runner, mocker, s3_script):
+    mock_resource = MagicMock()
+    mock_resource.download_file = MagicMock(
+        side_effect=botocore.exceptions.ClientError({"Error": {"Code": 403}}, "test error")
+    )
+    mocker.patch("boto3.resource").return_value.Bucket.return_value = mock_resource
+
+    with pytest.raises(
+        DownloadRunError,
+        match="Failed to download OnMockTestEvent script 0 s3://bucket/script.sh "
+        r"using aws s3, cause: An error occurred \(403\).*",
+    ):
+        await script_runner._download_script(s3_script)
+
+
+@pytest.mark.asyncio
+async def test_download_https_script_error(script_runner, mocker, https_script):
+    response_mock = MagicMock()
+    response_mock.status_code = 403
+    response_mock.content = b"test error"
+    mocker.patch("requests.get", return_value=response_mock)
+
+    with pytest.raises(
+        DownloadRunError,
+        match="Failed to download OnMockTestEvent script 0 https://example.com/script.sh, HTTP status code 403",
+    ):
+        await script_runner._download_script(https_script)
+
+
+def build_exe_script(args=None):
+    return ExecutableScript(url="s3://bucket/script.sh", step_num=0, path="/this/is/a/path/to/script.sh", args=args)
+
+
+@pytest.mark.parametrize("args", [None, [], ["arg1", "arg2"], ["arg1", "arg2", "arg3"]])
+@pytest.mark.asyncio
+async def test_execute_script(script_runner, mocker, args):
+    # mock process execution
+    process_mock = MagicMock()
+    process_mock.returncode = 0
+    subprocess_mock = mocker.patch("subprocess.run", return_value=process_mock)
+
+    exe_script = build_exe_script(args)
+    await script_runner._execute_script(exe_script)
+
+    # assert that subprocess_mock is called twice
+    subprocess_mock.assert_has_calls(
+        [
+            call(["chmod", "+x", exe_script.path], check=True),
+            call([exe_script.path] + (exe_script.args or []), check=True),
+        ]
+    )
+
+
+@pytest.mark.asyncio
+async def test_execute_script_error_not_executable(script_runner):
+    with pytest.raises(
+        DownloadRunError,
+        match="Failed to run OnMockTestEvent script 0 s3://bucket/script.sh due "
+        "to a failure in making the file executable, return code: 1.",
+    ):
+        await script_runner._execute_script(build_exe_script())
+
+
+@pytest.mark.asyncio
+async def test_execute_script_error_in_execution(script_runner, mocker):
+    # mock process execution
+    process_mock = MagicMock()
+    process_mock.returncode = 0
+    # patch subprocess.run: first call succeeds, second call fails with a non-zero return code
+    mocker.patch("subprocess.run", side_effect=[process_mock, subprocess.CalledProcessError(1, "test error")])
+
+    with pytest.raises(
+        DownloadRunError,
+        match="Failed to execute OnMockTestEvent script 0 s3://bucket/script.sh, return code: 1.",
+    ):
+        await script_runner._execute_script(build_exe_script())
+
+
+def create_persistent_tmp_file(additional_content: str = "") -> str:
+    with tempfile.NamedTemporaryFile(delete=False) as f:
+        f.write(b"#!/bin/bash\n")
+        f.write(additional_content.encode())
+        return f.name
+
+
+@pytest.mark.asyncio
+async def test_download_and_execute_scripts(script_runner, mocker, s3_script, https_script):
+    tmp_file1 = create_persistent_tmp_file()
+    tmp_file2 = create_persistent_tmp_file()
+
+    exe_script1 = script_runner._build_exe_script(s3_script, 1, tmp_file1)
+    exe_script2 = script_runner._build_exe_script(https_script, 2, tmp_file2)
+
+    download_script_mock = AsyncMock(side_effect=[exe_script1, exe_script2])
+    mocker.patch.object(script_runner, "_download_script", download_script_mock)
+    execute_script_mock = AsyncMock()
+    mocker.patch.object(script_runner, "_execute_script", execute_script_mock)
+    unlink_mock = MagicMock()
+    mocker.patch.object(os, "unlink", unlink_mock)
+
+    await script_runner.download_and_execute_scripts([s3_script, https_script])
+
+    assert_that(unlink_mock.call_count).is_equal_to(2)
+
+    assert_that(execute_script_mock.await_count).is_equal_to(2)
+    execute_script_mock.assert_has_awaits([call(exe_script1), call(exe_script2)])
+
+    mocker.stopall()
+    os.unlink(tmp_file1)
+    os.unlink(tmp_file2)
+
+
+@pytest.mark.parametrize(
+    "legacy_event_name",
+    [LegacyEventName.ON_NODE_START, LegacyEventName.ON_NODE_CONFIGURED, LegacyEventName.ON_NODE_UPDATED],
+)
+@pytest.mark.asyncio
+async def test_action_runner_run_event(mocker, legacy_event_name, https_script):
+    conf_mock = MagicMock()
+    conf_mock.legacy_event = legacy_event_name
+    conf_mock.can_execute = True
+    conf_mock.dry_run = False
+    script_sequence = [https_script, https_script]
+    conf_mock.script_sequence = script_sequence
+
+    download_and_execute_scripts_mock = AsyncMock()
+    mocker.patch(
+        "custom_action_executor.ScriptRunner.download_and_execute_scripts",
+        side_effect=download_and_execute_scripts_mock,
+    )
+    mocker.patch("custom_action_executor.ActionRunner._get_stack_status", return_value="UPDATE_IN_PROGRESS")
+    asyncio_run_mock = mocker.patch("asyncio.run")
+
+    ActionRunner(conf_mock, MagicMock()).run()
+
+    await asyncio_run_mock.call_args[0][0]
+
+    download_and_execute_scripts_mock.assert_awaited_once_with(script_sequence)
+
+
+def test_action_runner_run_on_node_updated_stack_not_in_progress(mocker, https_script):
+    conf_mock = MagicMock()
+    conf_mock.legacy_event = LegacyEventName.ON_NODE_UPDATED
+    conf_mock.can_execute = True
+    script_sequence = [https_script, https_script]
+    conf_mock.script_sequence = script_sequence
+
+    script_runner_mock = mocker.patch("custom_action_executor.ScriptRunner.download_and_execute_scripts")
+    mocker.patch("custom_action_executor.ActionRunner._get_stack_status", return_value="UPDATE_COMPLETE")
+
+    mock_print = mocker.patch("builtins.print")
+
+    ActionRunner(conf_mock, MagicMock()).run()
+
+    mock_print.assert_called_once_with("Post update hook called with CFN stack in state UPDATE_COMPLETE, doing nothing")
+    script_runner_mock.assert_not_called()
+
+
+def test_log_without_url(mocker):
+    mock_conf = MagicMock()
+    mock_conf.dry_run = True
+
+    process_mock = MagicMock()
+    process_mock.returncode = 0
+
+    mock_print = mocker.patch("builtins.print")
+
+    with pytest.raises(SystemExit) as err:
+        CustomLogger(mock_conf).error_exit_with_bootstrap_error("test message", "test_url")
+
+    assert_that(err.value.code).is_equal_to(1)
+    assert_that(mock_print.call_count).is_equal_to(2)
+    assert_that(mock_print.call_args_list[0][0][0]).matches(r".*test message.*")
+    assert_that(mock_print.call_args_list[1][0][0]).matches(r"Would write to .* test_url.*")
+
+
+@pytest.mark.parametrize(
+    "args, conf_file_content, scripts_sequence",
+    [
+        (
+            {
+                LegacyEventName.ON_NODE_START.value: True,
+                "node_type": "HeadNode",
+            },
+            {
+                "HeadNode": {
+                    "CustomActions": {
+                        "OnNodeStart": {"Script": "https://example.com/script.sh", "Args": ["arg1", "arg2"]}
+                    }
+                }
+            },
+            [ScriptDefinition(url="https://example.com/script.sh", args=["arg1", "arg2"])],
+        ),
+        (
+            {
+                LegacyEventName.ON_NODE_START.value: True,
+                "node_type": "NotReallyAHeadNode",
+                "queue_name": "happyqueue1",
+            },
+            {
+                "Scheduling": {
+                    "SlurmQueues": [
+                        {
+                            "CustomActions": {
+                                "OnNodeStart": {"Script": "https://example.com/script.sh", "Args": ["arg1", "arg2"]}
+                            },
+ "Name": "happyqueue1", + } + ] + } + }, + [ScriptDefinition(url="https://example.com/script.sh", args=["arg1", "arg2"])], + ), + ( + { + LegacyEventName.ON_NODE_CONFIGURED.value: True, + "node_type": "NotReallyAHeadNode", + "queue_name": "happyqueue2", + }, + { + "Scheduling": { + "Wathever": [ + { + "CustomActions": { + "OnNodeConfigure": {"Script": "https://example.com/script.sh", "Args": ["arg1", "arg2"]} + }, + "Name": "happyqueue1", + }, + { + "CustomActions": { + "OnNodeConfigured": { + "Script": "https://example.com/happy2/script.sh", + "Args": ["arg1", "arg2"], + } + }, + "Name": "happyqueue2", + }, + ] + } + }, + [ScriptDefinition(url="https://example.com/happy2/script.sh", args=["arg1", "arg2"])], + ), + ( + { + LegacyEventName.ON_NODE_UPDATED.value: True, + "node_type": "HeadNode", + }, + { + "HeadNode": { + "CustomActions": { + "OnNodeUpdated": { + "Sequence": [ + {"Script": "https://example.com/script1.sh", "Args": ["arg1", "arg2"]}, + {"Script": "https://example.com/script2.sh", "Args": ["arg1", "arg2", "args3"]}, + ] + } + } + } + }, + [ + ScriptDefinition(url="https://example.com/script1.sh", args=["arg1", "arg2"]), + ScriptDefinition(url="https://example.com/script2.sh", args=["arg1", "arg2", "args3"]), + ], + ), + ], +) +def test_config_loader(mocker, args, conf_file_content, scripts_sequence): + mocker.patch("custom_action_executor.ConfigLoader._load_cluster_config", return_value=conf_file_content) + + for legacy_name in LegacyEventName: + if legacy_name.value not in args: + args[legacy_name.value] = False + mock_args = MagicMock(**args) + + conf = ConfigLoader().load_configuration(mock_args) + + assert_that(conf.script_sequence).contains_sequence(*scripts_sequence) + + +def test_config_loader_config_file_not_found(): + with pytest.raises(FileNotFoundError) as err: + ConfigLoader().load_configuration(MagicMock(cluster_configuration="not_found.yaml")) + assert_that(err.value.filename).is_equal_to("not_found.yaml") + + +@pytest.mark.parametrize("args", [[], ["error"], ["-h"], ["-v", "-e", "postinstall"]]) +def test_main_execution_with_arguments(mocker, args): + mocker.patch("sys.argv", [SCRIPT_LOG_NAME_FETCH_AND_RUN, *args]) + + with pytest.raises(SystemExit) as err: + main() + + assert_that(err.value.code).is_equal_to(1) diff --git a/tox.ini b/tox.ini index 01b993234..9cc5b8c8b 100644 --- a/tox.ini +++ b/tox.ini @@ -12,11 +12,13 @@ passenv = GITHUB_* deps = -rtest/unit/requirements.txt + py{37,38}: pytest-asyncio cov: codecov setenv = PYTHONPATH = \ {toxinidir}/cookbooks/aws-parallelcluster-config/files/default/cloudwatch_agent:\ {toxinidir}/cookbooks/aws-parallelcluster-config/files/default/compute_fleet_status:\ + {toxinidir}/cookbooks/aws-parallelcluster-config/files/default/custom_action_executor:\ {toxinidir}/cookbooks/aws-parallelcluster-config/files/default/:\ {toxinidir}/cookbooks/aws-parallelcluster-install/files/default/cloudwatch_agent:\ {toxinidir}/cookbooks/aws-parallelcluster-install/files/default/clusterstatusmgtd:\ @@ -32,6 +34,7 @@ commands = src_dirs = {toxinidir}/cookbooks/aws-parallelcluster-config/files/default/cloudwatch_agent \ {toxinidir}/cookbooks/aws-parallelcluster-config/files/default/compute_fleet_status \ + {toxinidir}/cookbooks/aws-parallelcluster-config/files/default/custom_action_executor \ {toxinidir}/cookbooks/aws-parallelcluster-config/files/default/ \ {toxinidir}/cookbooks/aws-parallelcluster-install/files/default/cloudwatch_agent \ {toxinidir}/cookbooks/aws-parallelcluster-install/files/default/clusterstatusmgtd \