Skip to content

Commit

Permalink
feat(utils): implement symlink sandbox for cachi2
Browse files Browse the repository at this point in the history
What/why: implement detection/removal of unsafe symlinks in repos, specifically
covering cachi2 use case: Cachito already does this

How:

- copypasta `_enforce_sandbox()` and related unit tests from Cachito
  ("cachito/cachito/workers/tasks/general.py" and
   "cachito/tests/test_workers/test_tasks/test_general.py", respectively)
- add call to `_enforce_sandbox()`
- add CLI boolean arg `remove-unsafe-symlinks`, which toggles removing all
  symlinks which point to location(s) outside of any cloned repository

Signed-off-by: Ben Alkov <[email protected]>

rh-pre-commit.version: 2.3.2
rh-pre-commit.check-secrets: ENABLED
  • Loading branch information
ben-alkov committed Dec 18, 2024
1 parent 9f2ab9e commit f7064c7
Show file tree
Hide file tree
Showing 3 changed files with 155 additions and 1 deletion.
3 changes: 2 additions & 1 deletion atomic_reactor/plugins/cachi2_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from atomic_reactor.util import map_to_user_params
from atomic_reactor.utils.cachi2 import (
remote_source_to_cachi2, clone_only, validate_paths,
normalize_gomod_pkg_manager
normalize_gomod_pkg_manager, enforce_sandbox,
)


Expand Down Expand Up @@ -135,6 +135,7 @@ def process_remote_sources(self) -> List[Dict[str, Any]]:
remote_source_data["ref"]
)

enforce_sandbox(source_path_app, remove_unsafe_symlinks=False)
validate_paths(source_path_app, remote_source_data.get("packages", {}))

if clone_only(remote_source_data):
Expand Down
52 changes: 52 additions & 0 deletions atomic_reactor/utils/cachi2.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,64 @@
Utils to help to integrate with cachi2 CLI tool
"""

import logging

from typing import Any, Callable, Dict, Optional, Tuple, List
from pathlib import Path
import os.path

from packageurl import PackageURL

logger = logging.getLogger(__name__)


class SymlinkSandboxError(Exception):
"""Found symlink(s) pointing outside the sandbox."""


def enforce_sandbox(repo_root: Path, remove_unsafe_symlinks: bool) -> None:
"""
Check that there are no symlinks that try to leave the cloned repository.
:param (str | Path) repo_root: absolute path to root of cloned repository
:raises OsbsValidationException: if any symlink points outside of cloned repository
"""
for path_to_dir, subdirs, files in os.walk(repo_root):
dirpath = Path(path_to_dir)

for entry in subdirs + files:
# the logic in here actually *requires* f-strings with `!r`. using
# `%r` DOES NOT WORK (tested)
# pylint: disable=logging-fstring-interpolation

# apparently pylint doesn't understand Path
full_path = dirpath / entry # pylint: disable=old-division

try:
real_path = full_path.resolve()
except RuntimeError as e:
if "Symlink loop from " in str(e):
logger.info(f"Symlink loop from {full_path!r}")
continue
logger.exception("RuntimeError encountered")
raise

try:
real_path.relative_to(repo_root)
except ValueError as exc:
# Unlike the real path, the full path is always relative to the root
relative_path = str(full_path.relative_to(repo_root))
if remove_unsafe_symlinks:
full_path.unlink()
logger.warning(
f"The destination of {relative_path!r} is outside of cloned repository. "
"Removing...",
)
else:
raise SymlinkSandboxError(
f"The destination of {relative_path!r} is outside of cloned repository",
) from exc


def validate_paths(repo_path: Path, remote_sources_packages: dict) -> None:
"""Paths must be relative and within cloned repo"""
Expand Down
101 changes: 101 additions & 0 deletions tests/utils/test_cachi2.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,14 @@
of the BSD license. See the LICENSE file for details.
"""

import os
from pathlib import Path
from typing import Union

from atomic_reactor.utils.cachi2 import (
SymlinkSandboxError,
convert_SBOM_to_ICM,
enforce_sandbox,
remote_source_to_cachi2,
gen_dependency_from_sbom_component,
generate_request_json,
Expand All @@ -19,6 +23,8 @@

import pytest

from unittest import mock


@pytest.mark.parametrize(('input_remote_source', 'expected_cachi2'), [
pytest.param(
Expand Down Expand Up @@ -580,3 +586,98 @@ def test_generate_request_json():
def test_clone_only(remote_source, expected):
"""Test if clone_only is evaluate correctly only from empty list of pkg_managers"""
assert clone_only(remote_source) == expected


class Symlink(str):
"""
Use this to create symlinks via write_file_tree().
The value of a Symlink instance is the target path (path to make a symlink to).
"""


def write_file_tree(tree_def: dict, rooted_at: Union[str, Path], *, exist_dirs_ok: bool = False):
"""
Write a file tree to disk.
:param tree_def: Definition of file tree, see usage for intuitive examples
:param rooted_at: Root of file tree, must be an existing directory
:param exist_dirs_ok: If True, existing directories will not cause this function to fail
"""
root = Path(rooted_at)
for entry, value in tree_def.items():
entry_path = root / entry
if isinstance(value, Symlink):
os.symlink(value, entry_path)
elif isinstance(value, str):
entry_path.write_text(value)
else:
entry_path.mkdir(exist_ok=exist_dirs_ok)
write_file_tree(value, entry_path)


@pytest.mark.parametrize(
"file_tree,bad_symlink",
[
# good
pytest.param({}, None, id="empty-no-symlink"),
pytest.param({"symlink_to_self": Symlink(".")}, None, id="self-symlink-ok"),
pytest.param(
{"subdir": {"symlink_to_parent": Symlink("..")}}, None, id="parent-symlink-ok"
),
pytest.param(
{"symlink_to_subdir": Symlink("subdir/some_file"), "subdir": {"some_file": "foo"}},
None,
id="subdir-symlink-ok",
),
# bad
pytest.param(
{"symlink_to_parent": Symlink("..")}, "symlink_to_parent", id="parent-symlink-bad"
),
pytest.param({"symlink_to_root": Symlink("/")}, "symlink_to_root", id="root-symlink-bad"),
pytest.param(
{"subdir": {"symlink_to_parent_parent": Symlink("../..")}},
"subdir/symlink_to_parent_parent",
id="parent-parent-symlink-bad",
),
pytest.param(
{"subdir": {"symlink_to_root": Symlink("/")}},
"subdir/symlink_to_root",
id="subdir-root-symlink-bad",
),
],
)
def test_enforce_sandbox(file_tree, bad_symlink, tmp_path):
write_file_tree(file_tree, tmp_path)
if bad_symlink:
error = f"The destination of {bad_symlink!r} is outside of cloned repository"
with pytest.raises(SymlinkSandboxError, match=error):
enforce_sandbox(tmp_path, remove_unsafe_symlinks=False)
assert Path(tmp_path / bad_symlink).exists()
enforce_sandbox(tmp_path, remove_unsafe_symlinks=True)
assert not Path(tmp_path / bad_symlink).exists()
else:
enforce_sandbox(tmp_path, remove_unsafe_symlinks=False)
enforce_sandbox(tmp_path, remove_unsafe_symlinks=True)


def test_enforce_sandbox_symlink_loop(tmp_path, caplog):
file_tree = {"foo_b": Symlink("foo_a"), "foo_a": Symlink("foo_b")}
write_file_tree(file_tree, tmp_path)
enforce_sandbox(tmp_path, remove_unsafe_symlinks=True)
assert "Symlink loop from " in caplog.text


@mock.patch("pathlib.Path.resolve")
def test_enforce_sandbox_runtime_error(mock_resolve, tmp_path):
error = "RuntimeError is triggered"

def side_effect():
raise RuntimeError(error)

mock_resolve.side_effect = side_effect

file_tree = {"foo_b": Symlink("foo_a"), "foo_a": Symlink("foo_b")}
write_file_tree(file_tree, tmp_path)
with pytest.raises(RuntimeError, match=error):
enforce_sandbox(tmp_path, remove_unsafe_symlinks=True)

0 comments on commit f7064c7

Please sign in to comment.