Skip to content

Commit

Permalink
Added result commands
Browse files Browse the repository at this point in the history
  • Loading branch information
AncientPatata committed Jan 16, 2025
1 parent 34012e6 commit 0311513
Show file tree
Hide file tree
Showing 8 changed files with 671 additions and 5 deletions.
1 change: 1 addition & 0 deletions src/armonik_cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ def cli() -> None:
cli.add_command(commands.sessions)
cli.add_command(commands.tasks)
cli.add_command(commands.partitions)
cli.add_command(commands.results)
3 changes: 2 additions & 1 deletion src/armonik_cli/commands/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from .sessions import sessions
from .tasks import tasks
from .partitions import partitions
from .results import results


__all__ = ["sessions", "tasks", "partitions"]
__all__ = ["sessions", "tasks", "partitions", "results"]
219 changes: 219 additions & 0 deletions src/armonik_cli/commands/results.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
from collections import defaultdict
import grpc
import rich_click as click

from typing import List, Union

from armonik.client.results import ArmoniKResults
from armonik.common import Result, ResultStatus, Direction
from armonik.common.filter import PartitionFilter, Filter

from armonik_cli.core import console, base_command
from armonik_cli.core.options import MutuallyExclusiveOption
from armonik_cli.core.params import FieldParam, FilterParam, ResultParam


RESULT_TABLE_COLS = [
("ID", "ResultId"),
("Status", "Status"),
("CreatedAt", "CreatedAt"),
] # These should be configurable (through Config)


@click.group(name="result")
def results() -> None:
"""Manage results."""
pass


@results.command(name="list")
@click.argument("session-id", required=True, type=str)
@click.option(
"-f",
"--filter",
"filter_with",
type=FilterParam("Partition"),
required=False,
help="An expression to filter the listed results with.",
metavar="FILTER EXPR",
)
@click.option(
"--sort-by",
type=FieldParam("Partition"),
required=False,
help="Attribute of result to sort with.",
)
@click.option(
"--sort-direction",
type=click.Choice(["asc", "desc"], case_sensitive=False),
default="asc",
required=False,
help="Whether to sort by ascending or by descending order.",
)
@click.option(
"--page", default=-1, help="Get a specific page, it defaults to -1 which gets all pages."
)
@click.option("--page-size", default=100, help="Number of elements in each page")
@base_command
def result_list(
endpoint: str,
output: str,
session_id: str,
filter_with: Union[PartitionFilter, None],
sort_by: Filter,
sort_direction: str,
page: int,
page_size: int,
debug: bool,
) -> None:
"""List the results of an ArmoniK cluster given <SESSION-ID>."""
with grpc.insecure_channel(endpoint) as channel:
results_client = ArmoniKResults(channel)
curr_page = page if page > 0 else 0
results_list = []
while True:
total, results = results_client.list_results(
result_filter=(Result.session_id == session_id) & filter_with
if filter_with is not None
else (Result.session_id == session_id),
sort_field=Result.name if sort_by is None else sort_by,
sort_direction=Direction.ASC
if sort_direction.capitalize() == "ASC"
else Direction.DESC,
page=curr_page,
page_size=page_size,
)

results_list += results
if page > 0 or len(results_list) >= total:
break
curr_page += 1

if total > 0:
results = [_clean_up_status(r) for r in results]
console.formatted_print(results, format=output, table_cols=RESULT_TABLE_COLS)


@results.command(name="get")
@click.argument("result-ids", type=str, nargs=-1, required=True)
@base_command
def result_get(endpoint: str, output: str, result_ids: List[str], debug: bool) -> None:
"""Get details about multiple results given their RESULT_IDs."""
with grpc.insecure_channel(endpoint) as channel:
results_client = ArmoniKResults(channel)
results = []
for result_id in result_ids:
result = results_client.get_result(result_id)
result = _clean_up_status(result)
results.append(result)
console.formatted_print(results, format=output, table_cols=RESULT_TABLE_COLS)


@results.command(name="create")
@click.argument("session-id", type=str, required=True)
@click.option(
"-r",
"--result",
"results",
type=ResultParam(),
required=True,
multiple=True,
help=(
"Results to create. You can pass:\n"
"1. --result <result_name> (only metadata is created).\n"
"2. --result '<result_name> bytes <bytes>' (data is provided in bytes).\n"
"3. --result '<result_name> file <filepath>' (data is provided from a file)."
),
)
@base_command
def result_create(
endpoint: str, output: str, results: List[ResultParam.ParamType], session_id: str, debug: bool
) -> None:
"""Create result objects in a session with id SESSION_ID."""
results_with_data = {res[0]: res[2] for res in results if res[1] == "bytes"} # type: ignore
metadata_only = [res[0] for res in results if res[1] == "nodata"]
# Read in remaining data from files
for res in results:
if res[1] == "file":
result_data_filepath = res[2] # type: ignore
try:
with open(result_data_filepath, "rb") as file:
results_with_data[res[0]] = file.read()
except FileNotFoundError:
raise click.FileError(
str(result_data_filepath),
f"File {str(result_data_filepath)} not found for result {res[0]}",
)
channel = grpc.insecure_channel(endpoint)
results_client = ArmoniKResults(channel)
# Create metadata-only results
if len(metadata_only) > 0:
_ = results_client.create_results_metadata(
result_names=metadata_only, session_id=session_id
)
console.print(f"Created metadata for results: {metadata_only}")
# TODO: Should probably print a table mapping result_name to result_id
# Create results with data
if len(results_with_data.keys()) > 0:
_ = results_client.create_results(results_data=results_with_data, session_id=session_id)
result_names = ", ".join(list(results_with_data.keys()))
console.print(f"Created the following results along with their data: {result_names}")


@results.command(name="upload_data")
@click.argument("session-id", type=str, required=True)
@click.argument("result-id", type=str, required=True)
@click.option("--from-bytes", type=str, cls=MutuallyExclusiveOption, mutual=["from_file"])
@click.option("--from-file", type=str, cls=MutuallyExclusiveOption, mutual=["from_bytes"])
@base_command
def result_upload_data(
endpoint: str,
output: str,
session_id: str,
result_id: Union[str, None],
from_bytes: Union[str, None],
from_file: str,
debug: bool,
) -> None:
"""Upload data for a result separately"""
with grpc.insecure_channel(endpoint) as channel:
results_client = ArmoniKResults(channel)
if from_bytes is not None:
result_data = bytes(from_bytes, encoding="utf-8")
if from_file is not None:
try:
with open(from_file, "rb") as file:
result_data = file.read()
except FileNotFoundError:
raise click.FileError(
from_file, f"File {from_file} not found for result with id {result_id}"
)

results_client.upload_result_data(result_id, session_id, result_data)


@results.command(name="delete_data")
@click.argument("result-ids", type=str, nargs=-1, required=True)
@click.option("--yes", is_flag=True)
@base_command
def result_delete_data(
endpoint: str, output: str, result_ids: List[str], yes: bool, debug: bool
) -> None:
"""Delete the data of multiple results given their RESULT_IDs."""
with grpc.insecure_channel(endpoint) as channel:
results_client = ArmoniKResults(channel)
session_result_mapping = defaultdict(list)
for result_id in result_ids:
result = results_client.get_result(result_id)
session_result_mapping[result.session_id].append(result_id)
if yes or click.confirm(
f"Are you sure you want to delete the result data of task [{result.owner_task_id}] in session [{result.session_id}]",
abort=False,
):
for session_id, result_ids_for_session in session_result_mapping.items():
results_client.delete_result_data(result_ids_for_session, session_id)


def _clean_up_status(result: Result) -> Result:
result.status = ResultStatus.name_from_value(result.status).split("_")[-1].capitalize()
return result
48 changes: 48 additions & 0 deletions src/armonik_cli/core/options.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
from typing import Any, List, Tuple, Mapping

import rich_click as click


class MutuallyExclusiveOption(click.Option):
"""
A custom Click option class that enforces mutual exclusivity between specified options.
This class allows defining options that cannot be used together with other specific options.
If any of the mutually exclusive options are used simultaneously, a usage error is raised.
Attributes:
mutual: A list of option names that cannot be used together with this option.
"""

def __init__(self, *args, **kwargs):
self.mutual = set(kwargs.pop("mutual", []))
if self.mutual:
kwargs["help"] = (
f"{kwargs.get('help', '')} This option cannot be used together with {' or '.join(self.mutual)}."
)
super().__init__(*args, **kwargs)

def handle_parse_result(
self, ctx: click.Context, opts: Mapping[str, Any], args: List[str]
) -> Tuple[Any, List[str]]:
"""
Handle the parsing of command-line options, enforcing mutual exclusivity.
Args:
ctx: The Click context, which provides information about the command being executed.
opts: A dictionary of the parsed command-line options.
args: The remaining command-line arguments.
Returns:
The result of the superclass's `handle_parse_result` method, if no mutual exclusivity violation occurs.
Raises:
click.UsageError: If this option and any of the mutually exclusive options are used together.
"""
mutex = self.mutual.intersection(opts)
if mutex and self.name in opts:
raise click.UsageError(
f"Illegal usage: `{self.name}` cannot be used together with '{''.join(mutex)}'."
)

return super().handle_parse_result(ctx, opts, args)
35 changes: 35 additions & 0 deletions src/armonik_cli/core/params.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from datetime import timedelta
from typing import cast, Tuple, Union
from pathlib import Path

from armonik import common
from armonik.common import Filter
Expand Down Expand Up @@ -188,3 +189,37 @@ def convert(
)
cls = getattr(common, self.base_struct)
return getattr(cls, value)


class ResultParam(click.ParamType):
name = "result"
ParamType = Union[Tuple[str, str], Tuple[str, str, str], Tuple[str, str, bytes]]

def convert(
self, value: str, param: Union[click.Parameter, None], ctx: Union[click.Context, None]
) -> Union[ParamType, None]:
if not value:
return None

parts = value.split(" ")

# Validate parts
if len(parts) == 1:
return (parts[0], "nodata")
if len(parts) == 3:
if parts[1] == "bytes":
return (parts[0], "bytes", bytes(parts[2], encoding="utf-8"))
elif parts[1] == "file":
# check if file exists
if Path(parts[2]).is_file():
return (parts[0], "file", parts[2])
else:
self.fail(
f"Tried to check for [{parts[0]}]'s results data but couldn't find the file [{parts[2]}]."
)
else:
self.fail(
f"Invalid type '{parts[1]}'. Must be 'bytes' or 'file'",
param,
ctx,
)
4 changes: 2 additions & 2 deletions src/armonik_cli/core/serialize.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from datetime import datetime, timedelta
from typing import Dict, List, Union, Any

from armonik.common import Session, TaskOptions, Task, Partition
from armonik.common import Session, Task, TaskOptions, Partition, Result
from google._upb._message import ScalarMapContainer, RepeatedScalarContainer


Expand All @@ -16,7 +16,7 @@ class CLIJSONEncoder(json.JSONEncoder):
__api_types: The list of ArmoniK API Python objects managed by this encoder.
"""

__api_types = [Session, TaskOptions, Task, Partition]
__api_types = [Session, Task, TaskOptions, Partition, Result]

def default(self, obj: object) -> Union[str, Dict[str, Any], List[Any]]:
"""
Expand Down
Loading

0 comments on commit 0311513

Please sign in to comment.