diff --git a/.github/workflows/tox.yml b/.github/workflows/tox.yml index 71a0791b37..81f4c3d50a 100644 --- a/.github/workflows/tox.yml +++ b/.github/workflows/tox.yml @@ -71,7 +71,7 @@ jobs: env: # Number of expected test passes, safety measure for accidental skip of # tests. Update value if you add/remove tests. - PYTEST_REQPASS: 852 + PYTEST_REQPASS: 853 steps: - uses: actions/checkout@v4 with: diff --git a/examples/playbooks/incorrect_module_args.yml b/examples/playbooks/incorrect_module_args.yml new file mode 100644 index 0000000000..9e4dde61ac --- /dev/null +++ b/examples/playbooks/incorrect_module_args.yml @@ -0,0 +1,7 @@ +--- +- name: Demonstrate linting issue. + hosts: all + tasks: + - name: Include a role with the wrong syntax + ansible.builtin.include_role: + role: foo diff --git a/src/ansiblelint/runner.py b/src/ansiblelint/runner.py index 562416e57b..07dedb486b 100644 --- a/src/ansiblelint/runner.py +++ b/src/ansiblelint/runner.py @@ -27,10 +27,6 @@ import ansiblelint.skip_utils import ansiblelint.utils -from ansiblelint._internal.rules import ( - BaseRule, - LoadingFailureRule, -) from ansiblelint.app import App, get_app from ansiblelint.constants import States from ansiblelint.errors import LintWarning, MatchError, WarnSource @@ -40,9 +36,7 @@ from ansiblelint.text import strip_ansi_escape from ansiblelint.utils import ( PLAYBOOK_DIR, - _include_children, - _roles_children, - _taskshandlers_children, + HandleChildren, parse_examples_from_plugin, template, ) @@ -50,6 +44,7 @@ if TYPE_CHECKING: from collections.abc import Callable, Generator + from ansiblelint._internal.rules import BaseRule from ansiblelint.config import Options from ansiblelint.constants import FileType from ansiblelint.rules import RulesCollection @@ -458,7 +453,7 @@ def _emit_matches(self, files: list[Lintable]) -> Generator[MatchError, None, No except MatchError as exc: if not exc.filename: # pragma: no branch exc.filename = str(lintable.path) - exc.rule = LoadingFailureRule() + exc.rule = self.rules["load-failure"] yield exc except AttributeError: yield MatchError(lintable=lintable, rule=self.rules["load-failure"]) @@ -523,22 +518,28 @@ def play_children( ) -> list[Lintable]: """Flatten the traversed play tasks.""" # pylint: disable=unused-argument - delegate_map: dict[str, Callable[[str, Any, Any, FileType], list[Lintable]]] = { - "tasks": _taskshandlers_children, - "pre_tasks": _taskshandlers_children, - "post_tasks": _taskshandlers_children, - "block": _taskshandlers_children, - "include": _include_children, - "ansible.builtin.include": _include_children, - "import_playbook": _include_children, - "ansible.builtin.import_playbook": _include_children, - "roles": _roles_children, - "dependencies": _roles_children, - "handlers": _taskshandlers_children, - "include_tasks": _include_children, - "ansible.builtin.include_tasks": _include_children, - "import_tasks": _include_children, - "ansible.builtin.import_tasks": _include_children, + + handlers = HandleChildren(self.rules) + + delegate_map: dict[ + str, + Callable[[str, Any, Any, FileType], list[Lintable]], + ] = { + "tasks": handlers.taskshandlers_children, + "pre_tasks": handlers.taskshandlers_children, + "post_tasks": handlers.taskshandlers_children, + "block": handlers.taskshandlers_children, + "include": handlers.include_children, + "ansible.builtin.include": handlers.include_children, + "import_playbook": handlers.include_children, + "ansible.builtin.import_playbook": handlers.include_children, + "roles": handlers.roles_children, + "dependencies": handlers.roles_children, + "handlers": handlers.taskshandlers_children, + "include_tasks": handlers.include_children, + "ansible.builtin.include_tasks": handlers.include_children, + "import_tasks": handlers.include_children, + "ansible.builtin.import_tasks": handlers.include_children, } (k, v) = item add_all_plugin_dirs(str(basedir.resolve())) diff --git a/src/ansiblelint/utils.py b/src/ansiblelint/utils.py index 77d2517839..be1fbaec03 100644 --- a/src/ansiblelint/utils.py +++ b/src/ansiblelint/utils.py @@ -32,7 +32,7 @@ from dataclasses import _MISSING_TYPE, dataclass, field from functools import cache, lru_cache from pathlib import Path -from typing import Any +from typing import TYPE_CHECKING, Any import ruamel.yaml.parser import yaml @@ -50,7 +50,10 @@ from yaml.composer import Composer from yaml.representer import RepresenterError -from ansiblelint._internal.rules import AnsibleParserErrorRule, RuntimeErrorRule +from ansiblelint._internal.rules import ( + AnsibleParserErrorRule, + RuntimeErrorRule, +) from ansiblelint.app import get_app from ansiblelint.config import Options, options from ansiblelint.constants import ( @@ -69,6 +72,8 @@ from ansiblelint.skip_utils import is_nested_task from ansiblelint.text import has_jinja, removeprefix +if TYPE_CHECKING: + from ansiblelint.rules import RulesCollection # ansible-lint doesn't need/want to know about encrypted secrets, so we pass a # string as the password to enable such yaml files to be opened and parsed # successfully. @@ -275,110 +280,169 @@ def template( return value -def _include_children( - basedir: str, - k: str, - v: Any, - parent_type: FileType, -) -> list[Lintable]: - # handle special case include_tasks: name=filename.yml - if k in INCLUSION_ACTION_NAMES and isinstance(v, dict) and "file" in v: - v = v["file"] - - # we cannot really parse any jinja2 in includes, so we ignore them - if not v or "{{" in v: - return [] - - if "import_playbook" in k and COLLECTION_PLAY_RE.match(v): - # Any import_playbooks from collections should be ignored as ansible - # own syntax check will handle them. - return [] - - # handle include: filename.yml tags=blah - # pylint: disable=unused-variable - (command, args, kwargs) = tokenize(f"{k}: {v}") +@dataclass +class HandleChildren: + """Parse task, roles and children.""" + + rules: RulesCollection = field(init=True, repr=False) + + def include_children( + self, + basedir: str, + k: str, + v: Any, + parent_type: FileType, + ) -> list[Lintable]: + """Include children.""" + # handle special case include_tasks: name=filename.yml + if k in INCLUSION_ACTION_NAMES and isinstance(v, dict) and "file" in v: + v = v["file"] + + # we cannot really parse any jinja2 in includes, so we ignore them + if not v or "{{" in v: + return [] + + if "import_playbook" in k and COLLECTION_PLAY_RE.match(v): + # Any import_playbooks from collections should be ignored as ansible + # own syntax check will handle them. + return [] + + # handle include: filename.yml tags=blah + # pylint: disable=unused-variable + (command, args, kwargs) = tokenize(f"{k}: {v}") - result = path_dwim(basedir, args[0]) - while basedir not in ["", "/"]: - if os.path.exists(result): - break - basedir = os.path.dirname(basedir) result = path_dwim(basedir, args[0]) - - return [Lintable(result, kind=parent_type)] - - -def _taskshandlers_children( - basedir: str, - k: str, - v: None | Any, - parent_type: FileType, -) -> list[Lintable]: - results: list[Lintable] = [] - if v is None: - raise MatchError( - message="A malformed block was encountered while loading a block.", - rule=RuntimeErrorRule(), - ) - for task_handler in v: - # ignore empty tasks, `-` - if not task_handler: - continue - - with contextlib.suppress(LookupError): - children = _get_task_handler_children_for_tasks_or_playbooks( - task_handler, - basedir, - k, - parent_type, + while basedir not in ["", "/"]: + if os.path.exists(result): + break + basedir = os.path.dirname(basedir) + result = path_dwim(basedir, args[0]) + + return [Lintable(result, kind=parent_type)] + + def taskshandlers_children( + self, + basedir: str, + k: str, + v: None | Any, + parent_type: FileType, + ) -> list[Lintable]: + """TasksHandlers Children.""" + results: list[Lintable] = [] + if v is None: + raise MatchError( + message="A malformed block was encountered while loading a block.", + rule=RuntimeErrorRule(), ) - results.append(children) - continue - - if any(x in task_handler for x in ROLE_IMPORT_ACTION_NAMES): - task_handler = normalize_task_v2(task_handler) - _validate_task_handler_action_for_role(task_handler["action"]) - name = task_handler["action"].get("name") - if has_jinja(name): - # we cannot deal with dynamic imports + for task_handler in v: + # ignore empty tasks, `-` + if not task_handler: continue - results.extend( - _roles_children( + + with contextlib.suppress(LookupError): + children = _get_task_handler_children_for_tasks_or_playbooks( + task_handler, basedir, k, - [name], parent_type, - main=task_handler["action"].get("tasks_from", "main"), - ), - ) - continue + ) + results.append(children) + continue - if "block" not in task_handler: - continue + if any(x in task_handler for x in ROLE_IMPORT_ACTION_NAMES): + task_handler = normalize_task_v2(task_handler) + self._validate_task_handler_action_for_role(task_handler["action"]) + name = task_handler["action"].get("name") + if has_jinja(name): + # we cannot deal with dynamic imports + continue + results.extend( + self.roles_children(basedir, k, [name], parent_type), + ) + continue + + if "block" not in task_handler: + continue - results.extend( - _taskshandlers_children(basedir, k, task_handler["block"], parent_type), - ) - if "rescue" in task_handler: results.extend( - _taskshandlers_children( + self.taskshandlers_children( basedir, k, - task_handler["rescue"], + task_handler["block"], parent_type, ), ) - if "always" in task_handler: - results.extend( - _taskshandlers_children( - basedir, - k, - task_handler["always"], - parent_type, + if "rescue" in task_handler: + results.extend( + self.taskshandlers_children( + basedir, + k, + task_handler["rescue"], + parent_type, + ), + ) + if "always" in task_handler: + results.extend( + self.taskshandlers_children( + basedir, + k, + task_handler["always"], + parent_type, + ), + ) + + return results + + def _validate_task_handler_action_for_role(self, th_action: dict[str, Any]) -> None: + """Verify that the task handler action is valid for role include.""" + module = th_action["__ansible_module__"] + + if "name" not in th_action: + raise MatchError( + message=f"Failed to find required 'name' key in {module!s}", + rule=self.rules.rules[0], + filename=( + self.rules.options.lintables[0] + if self.rules.options.lintables + else "." ), ) - return results + if not isinstance(th_action["name"], str): + raise MatchError( + message=f"Value assigned to 'name' key on '{module!s}' is not a string.", + rule=self.rules.rules[1], + ) + + def roles_children( + self, + basedir: str, + k: str, + v: Sequence[Any], + parent_type: FileType, + ) -> list[Lintable]: + """Roles children.""" + # pylint: disable=unused-argument # parent_type) + results: list[Lintable] = [] + if not v: + # typing does not prevent junk from being passed in + return results + for role in v: + if isinstance(role, dict): + if "role" in role or "name" in role: + if "tags" not in role or "skip_ansible_lint" not in role["tags"]: + results.extend( + _look_for_role_files( + basedir, + role.get("role", role.get("name")), + ), + ) + elif k != "dependencies": + msg = f'role dict {role} does not contain a "role" or "name" key' + raise SystemExit(msg) + else: + results.extend(_look_for_role_files(basedir, role)) + return results def _get_task_handler_children_for_tasks_or_playbooks( @@ -412,50 +476,6 @@ def _get_task_handler_children_for_tasks_or_playbooks( raise LookupError(msg) -def _validate_task_handler_action_for_role(th_action: dict[str, Any]) -> None: - """Verify that the task handler action is valid for role include.""" - module = th_action["__ansible_module__"] - - if "name" not in th_action: - raise MatchError(message=f"Failed to find required 'name' key in {module!s}") - - if not isinstance(th_action["name"], str): - raise MatchError( - message=f"Value assigned to 'name' key on '{module!s}' is not a string.", - ) - - -def _roles_children( - basedir: str, - k: str, - v: Sequence[Any], - parent_type: FileType, # noqa: ARG001 - main: str = "main", -) -> list[Lintable]: - # pylint: disable=unused-argument # parent_type) - results: list[Lintable] = [] - if not v: - # typing does not prevent junk from being passed in - return results - for role in v: - if isinstance(role, dict): - if "role" in role or "name" in role: - if "tags" not in role or "skip_ansible_lint" not in role["tags"]: - results.extend( - _look_for_role_files( - basedir, - role.get("role", role.get("name")), - main=main, - ), - ) - elif k != "dependencies": - msg = f'role dict {role} does not contain a "role" or "name" key' - raise SystemExit(msg) - else: - results.extend(_look_for_role_files(basedir, role, main=main)) - return results - - def _rolepath(basedir: str, role: str) -> str | None: role_path = None @@ -487,12 +507,7 @@ def _rolepath(basedir: str, role: str) -> str | None: return role_path -def _look_for_role_files( - basedir: str, - role: str, - main: str | None = "main", # noqa: ARG001 -) -> list[Lintable]: - # pylint: disable=unused-argument # main +def _look_for_role_files(basedir: str, role: str) -> list[Lintable]: role_path = _rolepath(basedir, role) if not role_path: # pragma: no branch return [] diff --git a/test/test_internal_rules.py b/test/test_internal_rules.py index 19700156b4..e1cc69e923 100644 --- a/test/test_internal_rules.py +++ b/test/test_internal_rules.py @@ -1,9 +1,35 @@ """Tests for internal rules.""" +import pytest + from ansiblelint._internal.rules import BaseRule +from ansiblelint.rules import RulesCollection +from ansiblelint.runner import Runner def test_base_rule_url() -> None: """Test that rule URL is set to expected value.""" rule = BaseRule() assert rule.url == "https://ansible.readthedocs.io/projects/lint/rules/" + + +@pytest.mark.parametrize( + ("path"), + ( + pytest.param( + "examples/playbooks/incorrect_module_args.yml", + id="playbook", + ), + ), +) +def test_incorrect_module_args( + path: str, + default_rules_collection: RulesCollection, +) -> None: + """Check that we fail when file encoding is wrong.""" + runner = Runner(path, rules=default_rules_collection) + matches = runner.run() + assert len(matches) == 1, matches + assert matches[0].rule.id == "load-failure" + assert "Failed to find required 'name' key in include_role" in matches[0].message + assert matches[0].tag == "internal-error"