diff --git a/src/promptflow-devkit/promptflow/_sdk/operations/_run_operations.py b/src/promptflow-devkit/promptflow/_sdk/operations/_run_operations.py index ea12d660c35..83c60650189 100644 --- a/src/promptflow-devkit/promptflow/_sdk/operations/_run_operations.py +++ b/src/promptflow-devkit/promptflow/_sdk/operations/_run_operations.py @@ -15,6 +15,7 @@ MAX_SHOW_DETAILS_RESULTS, FlowRunProperties, ListViewType, + LocalStorageFilenames, RunInfoSources, RunMode, RunStatus, @@ -35,7 +36,7 @@ from promptflow._sdk.entities import Run from promptflow._sdk.operations._local_storage_operations import LocalStorageOperations from promptflow._utils.logger_utils import get_cli_sdk_logger -from promptflow._utils.yaml_utils import load_yaml_string +from promptflow._utils.yaml_utils import load_yaml, load_yaml_string from promptflow.contracts._run_management import RunDetail, RunMetadata, RunVisualization, VisualizationConfig from promptflow.exceptions import UserErrorException @@ -424,6 +425,25 @@ def _visualize_with_trace_ui(self, runs: List[Run], html_path: Optional[str] = N html_string = generate_trace_ui_html_string(trace_ui_url) dump_html(html_string, html_path=html_path, open_html=html_path is None) + def _get_run_flow_type(self, run: Run) -> str: + # BUG 3195705: observed `Run._flow_type` returns wrong flow type + # this function is a workaround to get the correct flow type for visualize run scenario + # so please use this function carefully + from promptflow._utils.flow_utils import is_prompty_flow, resolve_flow_path + + # prompty: according to the file extension + if is_prompty_flow(run.flow): + return FlowType.PROMPTY + # DAG vs. flex: "entry" in flow.yaml + # resolve run snapshot, where must exist flow.dag.yaml or flow.flex.yaml + snapshot_path = run._output_path / LocalStorageFilenames.SNAPSHOT_FOLDER + flow_directory, yaml_file = resolve_flow_path(snapshot_path) + yaml_dict = load_yaml(flow_directory / yaml_file) + if isinstance(yaml_dict, dict) and "entry" in yaml_dict: + return FlowType.FLEX_FLOW + else: + return FlowType.DAG_FLOW + @monitor_operation(activity_name="pf.runs.visualize", activity_type=ActivityType.PUBLICAPI) def visualize(self, runs: Union[str, Run, List[str], List[Run]], **kwargs) -> None: """Visualize run(s). @@ -448,7 +468,8 @@ def visualize(self, runs: Union[str, Run, List[str], List[Run]], **kwargs) -> No # for existing run source run, will raise type error when call `_flow_type`, so skip it if run._run_source == RunInfoSources.EXISTING_RUN: continue - if run._flow_type == FlowType.FLEX_FLOW or run._flow_type == FlowType.PROMPTY: + flow_type = self._get_run_flow_type(run) + if flow_type == FlowType.FLEX_FLOW or flow_type == FlowType.PROMPTY: has_flex_or_prompty = True break if has_flex_or_prompty is True: