From 48bce08a063d40c02dd0a17240a56c333f5afb34 Mon Sep 17 00:00:00 2001 From: sleep Date: Mon, 30 Dec 2024 14:51:50 +0100 Subject: [PATCH] Added result commands --- src/armonik_cli/cli.py | 1 + src/armonik_cli/commands/__init__.py | 3 +- src/armonik_cli/commands/results.py | 226 +++++++++++++++++ src/armonik_cli/core/params.py | 36 +++ src/armonik_cli/core/serialize.py | 4 +- tests/commands/test_results.py | 357 +++++++++++++++++++++++++++ tests/conftest.py | 9 +- 7 files changed, 631 insertions(+), 5 deletions(-) create mode 100644 src/armonik_cli/commands/results.py create mode 100644 tests/commands/test_results.py diff --git a/src/armonik_cli/cli.py b/src/armonik_cli/cli.py index 2816b7c..689162a 100644 --- a/src/armonik_cli/cli.py +++ b/src/armonik_cli/cli.py @@ -15,3 +15,4 @@ def cli() -> None: cli.add_command(commands.sessions) cli.add_command(commands.tasks) cli.add_command(commands.partitions) +cli.add_command(commands.results) diff --git a/src/armonik_cli/commands/__init__.py b/src/armonik_cli/commands/__init__.py index d1c02ab..7a87e19 100644 --- a/src/armonik_cli/commands/__init__.py +++ b/src/armonik_cli/commands/__init__.py @@ -1,6 +1,7 @@ from .sessions import sessions from .tasks import tasks from .partitions import partitions +from .results import results -__all__ = ["sessions", "tasks", "partitions"] +__all__ = ["sessions", "tasks", "partitions", "results"] diff --git a/src/armonik_cli/commands/results.py b/src/armonik_cli/commands/results.py new file mode 100644 index 0000000..d27739b --- /dev/null +++ b/src/armonik_cli/commands/results.py @@ -0,0 +1,226 @@ +from collections import defaultdict +import grpc +import rich_click as click + +from typing import List, Union + +from armonik.client.results import ArmoniKResults +from armonik.common import Result, ResultStatus, Direction +from armonik.common.filter import PartitionFilter, Filter + +from armonik_cli.core import console, base_command +from armonik_cli.core.options import 
from armonik_cli.core.options import MutuallyExclusiveOption
from armonik_cli.core.params import FieldParam, FilterParam, ResultParam


# Columns of the table output, as (column header, serialized field name) pairs.
RESULT_TABLE_COLS = [
    ("ID", "ResultId"),
    ("Status", "Status"),
    ("CreatedAt", "CreatedAt"),
]  # These should be configurable (through Config)


@click.group(name="result")
def results() -> None:
    """Manage results."""


# NOTE(review): the filter/sort parameter types below are built for "Partition"
# although this command lists results -- this looks copied from the partition
# commands. Confirm whether FilterParam/FieldParam should receive "Result".
@results.command(name="list")
@click.argument("session-id", required=True, type=str)
@click.option(
    "-f",
    "--filter",
    "filter_with",
    type=FilterParam("Partition"),
    required=False,
    help="An expression to filter the listed results with.",
    metavar="FILTER EXPR",
)
@click.option(
    "--sort-by",
    type=FieldParam("Partition"),
    required=False,
    help="Attribute of result to sort with.",
)
@click.option(
    "--sort-direction",
    type=click.Choice(["asc", "desc"], case_sensitive=False),
    default="asc",
    required=False,
    help="Whether to sort by ascending or by descending order.",
)
@click.option(
    "--page", default=-1, help="Get a specific page, it defaults to -1 which gets all pages."
)
@click.option("--page-size", default=100, help="Number of elements in each page")
@base_command
def result_list(
    endpoint: str,
    output: str,
    session_id: str,
    filter_with: Union[PartitionFilter, None],
    sort_by: Union[Filter, None],
    sort_direction: str,
    page: int,
    page_size: int,
    debug: bool,
) -> None:
    """List the results of an ArmoniK cluster given a SESSION_ID."""
    session_filter = Result.session_id == session_id
    result_filter = session_filter if filter_with is None else session_filter & filter_with
    # click.Choice is case-insensitive here, so normalise before comparing.
    # (The previous `capitalize() == "ASC"` test could never be true.)
    direction = Direction.ASC if sort_direction.lower() == "asc" else Direction.DESC

    # Pages are 0-indexed (the "all pages" walk starts at 0), so any
    # non-negative value selects exactly one page; a negative value walks all.
    single_page = page >= 0
    curr_page = page if single_page else 0

    with grpc.insecure_channel(endpoint) as channel:
        results_client = ArmoniKResults(channel)
        results_list: List[Result] = []
        while True:
            total, page_results = results_client.list_results(
                result_filter=result_filter,
                sort_field=Result.name if sort_by is None else sort_by,
                sort_direction=direction,
                page=curr_page,
                page_size=page_size,
            )
            results_list += page_results
            # An empty page also ends the walk, so a total that never gets
            # reached cannot cause an endless loop.
            if single_page or not page_results or len(results_list) >= total:
                break
            curr_page += 1

        if total > 0:
            # Print everything that was fetched, not only the last page.
            results_list = [_clean_up_status(r) for r in results_list]
            console.formatted_print(results_list, format=output, table_cols=RESULT_TABLE_COLS)


@results.command(name="get")
@click.argument("result-ids", type=str, nargs=-1, required=True)
@base_command
def result_get(endpoint: str, output: str, result_ids: List[str], debug: bool) -> None:
    """Get details about multiple results given their RESULT_IDs."""
    with grpc.insecure_channel(endpoint) as channel:
        results_client = ArmoniKResults(channel)
        fetched = [
            _clean_up_status(results_client.get_result(result_id)) for result_id in result_ids
        ]
        console.formatted_print(fetched, format=output, table_cols=RESULT_TABLE_COLS)


@results.command(name="create")
@click.argument("session-id", type=str, required=True)
@click.option(
    "-r",
    "--result",
    "results",
    type=ResultParam(),
    required=True,
    multiple=True,
    help=(
        "Results to create. You can pass:\n"
        "1. --result RESULT_NAME (only metadata is created).\n"
        "2. --result 'RESULT_NAME bytes DATA' (data is provided in bytes).\n"
        "3. --result 'RESULT_NAME file FILEPATH' (data is provided from a file)."
    ),
)
@base_command
def result_create(
    endpoint: str, output: str, results: List[ResultParam.ParamType], session_id: str, debug: bool
) -> None:
    """Create result objects in a session with id SESSION_ID.

    Each entry of ``results`` is a tuple produced by ``ResultParam``:
    ``(name, "nodata")``, ``(name, "bytes", data)`` or ``(name, "file", path)``.

    Raises:
        click.FileError: If a file given for a result cannot be found.
    """
    metadata_only = [res[0] for res in results if res[1] == "nodata"]
    results_with_data = {res[0]: res[2] for res in results if res[1] == "bytes"}
    # Read in remaining data from files (before opening any connection).
    for res in results:
        if res[1] != "file":
            continue
        result_data_filepath = res[2]
        try:
            with open(result_data_filepath, "rb") as file:
                results_with_data[res[0]] = file.read()
        except FileNotFoundError as err:
            raise click.FileError(
                result_data_filepath,
                f"File {result_data_filepath} not found for result {res[0]}",
            ) from err

    # Use the channel as a context manager so it is always closed.
    with grpc.insecure_channel(endpoint) as channel:
        results_client = ArmoniKResults(channel)
        # Create metadata-only results
        if metadata_only:
            results_client.create_results_metadata(
                result_names=metadata_only, session_id=session_id
            )
            console.print(f"Created metadata for results: {metadata_only}")
            # TODO: Should probably print a table mapping result_name to result_id
        # Create results with data
        if results_with_data:
            results_client.create_results(results_data=results_with_data, session_id=session_id)
            result_names = ", ".join(results_with_data)
            console.print(f"Created the following results along with their data: {result_names}")


@results.command(name="upload_data")
@click.argument("session-id", type=str, required=True)
@click.argument("result-id", type=str, required=True)
@click.option("--from-bytes", type=str, cls=MutuallyExclusiveOption, mutual=["from_file"])
@click.option("--from-file", type=str, cls=MutuallyExclusiveOption, mutual=["from_bytes"])
@base_command
def result_upload_data(
    endpoint: str,
    output: str,
    session_id: str,
    result_id: str,
    from_bytes: Union[str, None],
    from_file: Union[str, None],
    debug: bool,
) -> None:
    """Upload data for a result separately.

    Raises:
        click.UsageError: If neither --from-bytes nor --from-file is given.
        click.FileError: If the file given with --from-file cannot be found.
    """
    result_data = None
    if from_bytes is not None:
        result_data = from_bytes.encode("utf-8")
    if from_file is not None:
        try:
            with open(from_file, "rb") as file:
                result_data = file.read()
        except FileNotFoundError as err:
            raise click.FileError(
                from_file, f"File {from_file} not found for result with id {result_id}"
            ) from err
    if result_data is None:
        # Both options are optional for click, so guard against none being set
        # instead of failing on an unbound variable.
        raise click.UsageError("One of --from-bytes or --from-file must be provided.")

    with grpc.insecure_channel(endpoint) as channel:
        results_client = ArmoniKResults(channel)
        results_client.upload_result_data(result_id, session_id, result_data)


@results.command(name="delete_data")
@click.argument("result-ids", type=str, nargs=-1, required=True)
@click.option("--yes", is_flag=True)
@base_command
def result_delete_data(
    endpoint: str, output: str, result_ids: List[str], yes: bool, debug: bool
) -> None:
    """Delete the data of multiple results given their RESULT_IDs."""
    with grpc.insecure_channel(endpoint) as channel:
        results_client = ArmoniKResults(channel)
        # The delete call takes a session id, so group the ids per session.
        session_result_mapping = defaultdict(list)
        for result_id in result_ids:
            result = results_client.get_result(result_id)
            session_result_mapping[result.session_id].append(result_id)

        # Describe everything that is about to be deleted, not just the last
        # result that happened to be looked up.
        sessions = ", ".join(session_result_mapping)
        prompt = (
            f"Are you sure you want to delete the data of {len(result_ids)} result(s) "
            f"in session(s) [{sessions}]"
        )
        if yes or click.confirm(prompt, abort=False):
            for session_id, result_ids_for_session in session_result_mapping.items():
                results_client.delete_result_data(result_ids_for_session, session_id)


def _clean_up_status(result: Result) -> Result:
    """Replace the status of ``result`` with a short display name, in place.

    The name returned by ``ResultStatus.name_from_value`` is reduced to its
    last underscore-separated part and capitalized. The same (mutated) object
    is returned for convenience.
    """
    result.status = ResultStatus.name_from_value(result.status).split("_")[-1].capitalize()
    return result
from pathlib import Path


class ResultParam(click.ParamType):
    """Click parameter type parsing a result specification.

    Accepted forms:

    * ``NAME`` -> ``(NAME, "nodata")``
    * ``NAME bytes DATA`` -> ``(NAME, "bytes", DATA encoded as UTF-8)``
    * ``NAME file PATH`` -> ``(NAME, "file", PATH)``; PATH must be an existing file

    DATA and PATH may contain spaces: only the first two spaces are treated as
    separators.
    """

    name = "result"
    ParamType = Union[Tuple[str, str], Tuple[str, str, str], Tuple[str, str, bytes]]

    def convert(
        self, value: str, param: Union[click.Parameter, None], ctx: Union[click.Context, None]
    ) -> ParamType:
        """Parse ``value`` into a result tuple, failing on any malformed input."""
        if not value:
            # Returning None here would only crash later, when the command
            # indexes into the parsed tuple.
            self.fail("A result specification cannot be empty.", param, ctx)

        # Split at most twice so the payload keeps any spaces it contains.
        parts = value.split(" ", 2)
        result_name = parts[0]

        if len(parts) == 1:
            return (result_name, "nodata")
        if len(parts) == 2:
            self.fail(
                f"Invalid result specification '{value}'. "
                "Expected 'NAME', 'NAME bytes DATA' or 'NAME file PATH'.",
                param,
                ctx,
            )

        kind, payload = parts[1], parts[2]
        if kind == "bytes":
            return (result_name, "bytes", payload.encode("utf-8"))
        if kind == "file":
            # Fail at parse time so the user gets a usage error right away.
            if not Path(payload).is_file():
                self.fail(
                    f"Tried to check for [{result_name}]'s results data but couldn't find the file [{payload}].",
                    param,
                    ctx,
                )
            return (result_name, "file", payload)
        self.fail(
            f"Invalid type '{kind}'. Must be 'bytes' or 'file'",
            param,
            ctx,
        )
"""Tests for the ``result`` command group."""

from copy import deepcopy
from datetime import datetime
from unittest.mock import Mock, mock_open
import pytest

from armonik.client.results import ArmoniKResults
from armonik.common import Result, ResultStatus

from conftest import run_cmd_and_assert_exit_code, reformat_cmd_output

ENDPOINT = "172.17.119.85:5001"

raw_results = [
    Result(
        session_id="5f723ff6-56e7-4329-a5d1-5e1f7ba6c7ef",
        name="",
        created_by="37b15f70-cacc-4ffd-8cbd-e5986f7edbb4",
        owner_task_id="",
        status=ResultStatus.DELETED,
        created_at=datetime(year=2025, month=1, day=15),
        completed_at=None,
        result_id="e9cb3a53-e742-4ea5-b66b-8a0f9eb1f49d",
        size=20910,
    ),
    Result(
        session_id="5f723ff6-56e7-4329-a5d1-5e1f7ba6c7ef",
        name="",
        created_by="37b15f70-cacc-4ffd-8cbd-e5986f7edbb4",
        owner_task_id="",
        status=ResultStatus.DELETED,
        created_at=datetime(year=2025, month=1, day=15),
        completed_at=None,
        result_id="cb0c7a4a-d7e7-4bea-bb0e-66c3628f9ac3",
        size=20942,
    ),
]

serialized_results = [
    {
        "SessionId": "5f723ff6-56e7-4329-a5d1-5e1f7ba6c7ef",
        "Name": "",
        "CreatedBy": "37b15f70-cacc-4ffd-8cbd-e5986f7edbb4",
        "OwnerTaskId": "",
        "Status": "Deleted",
        "CreatedAt": "2025-01-15 00:00:00",
        "CompletedAt": None,
        "ResultId": "e9cb3a53-e742-4ea5-b66b-8a0f9eb1f49d",
        "Size": 20910,
    },
    {
        "SessionId": "5f723ff6-56e7-4329-a5d1-5e1f7ba6c7ef",
        "Name": "",
        "CreatedBy": "37b15f70-cacc-4ffd-8cbd-e5986f7edbb4",
        "OwnerTaskId": "",
        "Status": "Deleted",
        "CreatedAt": "2025-01-15 00:00:00",
        "CompletedAt": None,
        "ResultId": "cb0c7a4a-d7e7-4bea-bb0e-66c3628f9ac3",
        "Size": 20942,
    },
]

SESSION_ID = serialized_results[0]["SessionId"]
RESULT_ID_0 = serialized_results[0]["ResultId"]
RESULT_ID_1 = serialized_results[1]["ResultId"]


def _patch_grpc_channel(mocker) -> Mock:
    """Replace ``grpc.insecure_channel`` with a mock usable as a context manager."""
    mock_channel = Mock()
    mock_channel.__enter__ = Mock(return_value=mock_channel)
    mock_channel.__exit__ = Mock(return_value=None)
    mocker.patch("grpc.insecure_channel", return_value=mock_channel)
    return mock_channel


def _get_result_side_effect(result_id):
    """Return a fresh copy of the raw result with the given id."""
    for raw in raw_results:
        if raw.result_id == result_id:
            # Copy because the commands rewrite the status in place.
            return deepcopy(raw)
    raise ValueError(f"Unexpected result_id: {result_id}")


@pytest.mark.parametrize("cmd", [f"result list id -e {ENDPOINT}"])
def test_result_list(mocker, cmd):
    mocker.patch.object(ArmoniKResults, "list_results", return_value=(2, deepcopy(raw_results)))
    result = run_cmd_and_assert_exit_code(cmd)
    assert reformat_cmd_output(result.output, deserialize=True) == serialized_results


@pytest.mark.parametrize(
    "cmd, expected_output",
    [
        (
            f"result get --endpoint {ENDPOINT} {RESULT_ID_0}",
            [serialized_results[0]],
        ),
        (
            f"result get --endpoint {ENDPOINT} {RESULT_ID_0} {RESULT_ID_1}",
            [serialized_results[0], serialized_results[1]],
        ),
    ],
)
def test_result_get(mocker, cmd, expected_output):
    mocker.patch.object(ArmoniKResults, "get_result", side_effect=_get_result_side_effect)
    result = run_cmd_and_assert_exit_code(cmd)
    assert reformat_cmd_output(result.output, deserialize=True) == expected_output


@pytest.mark.parametrize(
    "cmd, expected_result_ids",
    [
        (
            f"result delete_data --endpoint {ENDPOINT} {RESULT_ID_0} --yes --debug",
            [RESULT_ID_0],
        ),
        (
            f"result delete_data --endpoint {ENDPOINT} {RESULT_ID_0} {RESULT_ID_1} --yes --debug",
            [RESULT_ID_0, RESULT_ID_1],
        ),
    ],
)
def test_result_delete_data(mocker, cmd, expected_result_ids):
    _patch_grpc_channel(mocker)
    mocker.patch.object(ArmoniKResults, "get_result", side_effect=_get_result_side_effect)
    mocker.patch.object(ArmoniKResults, "delete_result_data")

    run_cmd_and_assert_exit_code(cmd)

    # Both results belong to the same session, so a single call is expected.
    ArmoniKResults.delete_result_data.assert_called_once_with(expected_result_ids, SESSION_ID)


def test_result_create_file_not_found(mocker):
    _patch_grpc_channel(mocker)
    mocker.patch("builtins.open", side_effect=FileNotFoundError)

    # The command line is passed as a list because one argument contains spaces.
    run_cmd_and_assert_exit_code(
        [
            "result",
            "create",
            "my-session-id",
            "-r",
            "res file nonexistent",
            "--endpoint",
            ENDPOINT,
            "--debug",
        ],
        split=False,
        exit_code=2,
    )


def test_result_create_metadata_only(mocker):
    _patch_grpc_channel(mocker)
    mocker.patch.object(ArmoniKResults, "create_results_metadata", return_value=[])
    mocker.patch.object(ArmoniKResults, "create_results", return_value=[])

    cmd = [
        "result",
        "create",
        "my-session-id",
        "--result",
        "result1",
        "--result",
        "result2",
        "--endpoint",
        ENDPOINT,
    ]
    run_cmd_and_assert_exit_code(cmd, split=False)

    ArmoniKResults.create_results_metadata.assert_called_once_with(
        session_id="my-session-id", result_names=["result1", "result2"]
    )
    ArmoniKResults.create_results.assert_not_called()


def test_result_create_with_bytes(mocker):
    _patch_grpc_channel(mocker)
    mocker.patch.object(ArmoniKResults, "create_results_metadata", return_value=[])
    mocker.patch.object(ArmoniKResults, "create_results", return_value=[])

    cmd = [
        "result",
        "create",
        "my-session-id",
        "--result",
        "result1 bytes hello",
        "--result",
        "result2 bytes world",
        "--endpoint",
        ENDPOINT,
    ]
    run_cmd_and_assert_exit_code(cmd, split=False)

    ArmoniKResults.create_results_metadata.assert_not_called()
    ArmoniKResults.create_results.assert_called_once_with(
        session_id="my-session-id", results_data={"result1": b"hello", "result2": b"world"}
    )


def test_result_create_mixed(mocker):
    _patch_grpc_channel(mocker)
    mocker.patch.object(ArmoniKResults, "create_results_metadata", return_value=[])
    mocker.patch.object(ArmoniKResults, "create_results", return_value=[])

    cmd = [
        "result",
        "create",
        "my-session-id",
        "--result",
        "result1",
        "--result",
        "result2 bytes hello",
        "--endpoint",
        ENDPOINT,
    ]
    run_cmd_and_assert_exit_code(cmd, split=False)

    ArmoniKResults.create_results_metadata.assert_called_once_with(
        session_id="my-session-id", result_names=["result1"]
    )
    ArmoniKResults.create_results.assert_called_once_with(
        session_id="my-session-id", results_data={"result2": b"hello"}
    )


def test_result_upload_data_from_bytes(mocker):
    _patch_grpc_channel(mocker)
    mocker.patch.object(ArmoniKResults, "upload_result_data", return_value=None)

    cmd = [
        "result",
        "upload_data",
        "my-session-id",
        "result-id",
        "--from-bytes",
        "hello",
        "--endpoint",
        ENDPOINT,
    ]
    run_cmd_and_assert_exit_code(cmd, split=False)

    ArmoniKResults.upload_result_data.assert_called_once_with(
        "result-id", "my-session-id", b"hello"
    )


def test_result_upload_data_from_file(mocker):
    _patch_grpc_channel(mocker)
    mocker.patch.object(ArmoniKResults, "upload_result_data", return_value=None)

    mock_file_content = b"file content"
    mocker.patch("builtins.open", mock_open(read_data=mock_file_content))

    cmd = [
        "result",
        "upload_data",
        "my-session-id",
        "result-id",
        "--from-file",
        "test.txt",
        "--endpoint",
        ENDPOINT,
    ]
    run_cmd_and_assert_exit_code(cmd, split=False)

    ArmoniKResults.upload_result_data.assert_called_once_with(
        "result-id", "my-session-id", mock_file_content
    )


def test_result_upload_data_file_not_found(mocker):
    _patch_grpc_channel(mocker)
    mocker.patch("builtins.open", side_effect=FileNotFoundError)

    cmd = [
        "result",
        "upload_data",
        "my-session-id",
        "result-id",
        "--from-file",
        "nonexistent.txt",
        "--endpoint",
        ENDPOINT,
    ]
    result = run_cmd_and_assert_exit_code(cmd, split=False, exit_code=1)

    assert "Could not open file" in str(result.output)