diff --git a/examples/flows/chat/chat-with-pdf/tests/chat_with_pdf_test.py b/examples/flows/chat/chat-with-pdf/tests/chat_with_pdf_test.py
index 17d303321ff..8d9d36d2e78 100644
--- a/examples/flows/chat/chat-with-pdf/tests/chat_with_pdf_test.py
+++ b/examples/flows/chat/chat-with-pdf/tests/chat_with_pdf_test.py
@@ -2,7 +2,6 @@
 import unittest
 import promptflow
 from base_test import BaseTest
-from promptflow.exceptions import ValidationException
 
 
 class TestChatWithPDF(BaseTest):
@@ -72,28 +71,26 @@ def test_bulk_run_valid_mapping(self):
         self.assertEqual(details.shape[0], 3)
 
     def test_bulk_run_mapping_missing_one_column(self):
-        # in this case, run won't be created because the question column is missed in the data
         data_path = os.path.join(
             self.flow_path, "data/invalid-data-missing-column.jsonl"
         )
-        with self.assertRaises(ValidationException):
-            self.create_chat_run(
-                column_mapping={
-                    "question": "${data.question}",
-                },
-                data=data_path
-            )
+        run = self.create_chat_run(
+            column_mapping={
+                "question": "${data.question}",
+            },
+            data=data_path
+        )
+        self.assertEqual(run.status, "Failed")
 
     def test_bulk_run_invalid_mapping(self):
-        # in this case, run won't be created.
-        with self.assertRaises(ValidationException):
-            self.create_chat_run(
-                column_mapping={
-                    "question": "${data.question_not_exist}",
-                    "pdf_url": "${data.pdf_url}",
-                    "chat_history": "${data.chat_history}",
-                }
-            )
+        run = self.create_chat_run(
+            column_mapping={
+                "question": "${data.question_not_exist}",
+                "pdf_url": "${data.pdf_url}",
+                "chat_history": "${data.chat_history}",
+            }
+        )
+        self.assertEqual(run.status, "Failed")
 
 
 if __name__ == "__main__":
diff --git a/src/promptflow/CHANGELOG.md b/src/promptflow/CHANGELOG.md
index f1329137fbb..7649345ce32 100644
--- a/src/promptflow/CHANGELOG.md
+++ b/src/promptflow/CHANGELOG.md
@@ -1,6 +1,6 @@
 # Release History
 
-## 0.1.0b9 (Upcoming)
+## 1.0.0 (2023.11.09)
 
 ### Features Added
 
@@ -11,6 +11,11 @@
 
 ### Bugs Fixed
 
 - [SDK/CLI] Keep original format in run output.jsonl.
+- [Executor] Fix the bug that raises an error when an aggregation node references a bypassed node.
+
+### Improvements
+
+- [Executor] Set the outputs of bypassed nodes to None.
 
 ## 0.1.0b8 (2023.10.26)
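The two [Executor] entries above describe the contract this release establishes: a bypassed node's outputs are recorded as None, and an aggregation node that references it no longer triggers an error. Below is a minimal, hypothetical sketch of an aggregation tool written with that contract in mind; the function name, inputs, and filtering logic are illustrative and are not the executor's actual implementation.

```python
from typing import List, Optional

from promptflow import tool


@tool
def aggregate_scores(scores: List[Optional[float]]) -> dict:
    # Lines whose upstream node was bypassed now arrive as None (per the
    # changelog entry above), so the aggregation skips them instead of failing.
    valid = [s for s in scores if s is not None]
    return {
        "mean_score": sum(valid) / len(valid) if valid else None,
        "bypassed_lines": len(scores) - len(valid),
    }
```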
diff --git a/src/promptflow/promptflow/_sdk/operations/_local_storage_operations.py b/src/promptflow/promptflow/_sdk/operations/_local_storage_operations.py
index 4709d626be9..bfeb3bbd0cb 100644
--- a/src/promptflow/promptflow/_sdk/operations/_local_storage_operations.py
+++ b/src/promptflow/promptflow/_sdk/operations/_local_storage_operations.py
@@ -261,14 +261,9 @@ def load_outputs(self) -> RunOutputs:
             df = pd.read_json(f, orient="records", lines=True)
             return df.to_dict("list")
 
-        # get total number of line runs from inputs
-        num_line_runs = len(list(self.load_inputs().values())[0])
         with open(self._outputs_path, mode="r", encoding=DEFAULT_ENCODING) as f:
             df = pd.read_json(f, orient="records", lines=True)
-            # if all line runs are failed, no need to fill
             if len(df) > 0:
-                df = self._outputs_padding(df, num_line_runs)
-                df.fillna(value="(Failed)", inplace=True)  # replace nan with explicit prompt
                 df = df.set_index(LINE_NUMBER)
             return df.to_dict("list")
 
@@ -429,14 +424,14 @@ def _prepare_folder(path: Union[str, Path]) -> Path:
         return path
 
     @staticmethod
-    def _outputs_padding(df: pd.DataFrame, expected_rows: int) -> pd.DataFrame:
+    def _outputs_padding(df: pd.DataFrame, inputs_line_numbers: List[int]) -> pd.DataFrame:
+        if len(df) == len(inputs_line_numbers):
+            return df
         missing_lines = []
         lines_set = set(df[LINE_NUMBER].values)
-        for i in range(expected_rows):
+        for i in inputs_line_numbers:
             if i not in lines_set:
                 missing_lines.append({LINE_NUMBER: i})
-        if len(missing_lines) == 0:
-            return df
         df_to_append = pd.DataFrame(missing_lines)
         res = pd.concat([df, df_to_append], ignore_index=True)
         res = res.sort_values(by=LINE_NUMBER, ascending=True)
@@ -452,7 +447,7 @@ def load_inputs_and_outputs(self) -> Tuple[pd.DataFrame, pd.DataFrame]:
             outputs = pd.read_json(f, orient="records", lines=True)
             # if all line runs are failed, no need to fill
             if len(outputs) > 0:
-                outputs = self._outputs_padding(outputs, len(inputs))
+                outputs = self._outputs_padding(outputs, inputs["line_number"].tolist())
                 outputs.fillna(value="(Failed)", inplace=True)  # replace nan with explicit prompt
                 outputs = outputs.set_index(LINE_NUMBER)
         return inputs, outputs
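The `_outputs_padding` change above switches from padding a contiguous `range(expected_rows)` to padding against the exact input line numbers, which matters for evaluation runs whose inputs cover only some lines. The following standalone sketch mirrors that idea with plain pandas; the helper name and the `col` column are illustrative and not part of `LocalStorageOperations`:

```python
import pandas as pd

LINE_NUMBER = "line_number"


def pad_outputs(df: pd.DataFrame, inputs_line_numbers: list) -> pd.DataFrame:
    # Nothing to pad when every input line already produced an output row.
    if len(df) == len(inputs_line_numbers):
        return df
    existing = set(df[LINE_NUMBER].values)
    missing = [{LINE_NUMBER: i} for i in inputs_line_numbers if i not in existing]
    res = pd.concat([df, pd.DataFrame(missing)], ignore_index=True)
    return res.sort_values(by=LINE_NUMBER, ascending=True).reset_index(drop=True)


outputs = pd.DataFrame([{LINE_NUMBER: 1, "col": "a"}, {LINE_NUMBER: 2, "col": "b"}])
# An evaluation run that only received lines 1, 2 and 4 gets line 4 padded with
# NaN, which the caller then replaces with an explicit "(Failed)" marker.
print(pad_outputs(outputs, [1, 2, 4]).fillna("(Failed)"))
```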
diff --git a/src/promptflow/promptflow/_sdk/operations/_run_operations.py b/src/promptflow/promptflow/_sdk/operations/_run_operations.py
index de0c2051f5f..70a91902704 100644
--- a/src/promptflow/promptflow/_sdk/operations/_run_operations.py
+++ b/src/promptflow/promptflow/_sdk/operations/_run_operations.py
@@ -22,7 +22,7 @@
 )
 from promptflow._sdk._errors import InvalidRunStatusError, RunExistsError, RunNotFoundError, RunOperationParameterError
 from promptflow._sdk._orm import RunInfo as ORMRun
-from promptflow._sdk._utils import incremental_print, safe_parse_object_list
+from promptflow._sdk._utils import incremental_print, print_red_error, safe_parse_object_list
 from promptflow._sdk._visualize_functions import dump_html, generate_html_string
 from promptflow._sdk.entities import Run
 from promptflow._sdk.operations._local_storage_operations import LocalStorageOperations
@@ -140,7 +140,10 @@ def stream(self, name: Union[str, Run]) -> Run:
                 available_logs = local_storage.logger.get_logs()
                 incremental_print(available_logs, printed, file_handler)
             self._print_run_summary(run)
-            # won't print error here, put it in run dict
+            # print the error message when the run failed
+            if run.status == RunStatus.FAILED:
+                error_message = local_storage.load_exception()["message"]
+                print_red_error(error_message)
         except KeyboardInterrupt:
             error_message = "The output streaming for the run was interrupted, but the run is still executing."
             print(error_message)
diff --git a/src/promptflow/tests/executor/unittests/_utils/test_thread_utils.py b/src/promptflow/tests/executor/unittests/_utils/test_thread_utils.py
index 45b03092652..4125d1769dd 100644
--- a/src/promptflow/tests/executor/unittests/_utils/test_thread_utils.py
+++ b/src/promptflow/tests/executor/unittests/_utils/test_thread_utils.py
@@ -1,3 +1,4 @@
+import re
 import sys
 import time
 from io import StringIO
@@ -29,12 +30,13 @@ def test_context_manager(self):
             log_message_function=generate_elapsed_time_messages,
             args=("Test", start_time, interval_seconds, None),
         ):
-            time.sleep(10)
+            time.sleep(10.5)
         logs = s.getvalue().split("\n")
-        for i in range(1, 10):
-            assert (
-                logs[i - 1]
-                == f"Test has been running for {i} seconds, "
-                + "thread None cannot be found in sys._current_frames, "
-                + "maybe it has been terminated due to unexpected errors."
-            )
+        logs = [log for log in logs if log]
+        log_pattern = re.compile(
+            r"^Test has been running for [0-9]+ seconds, thread None cannot be found in sys._current_frames, "
+            r"maybe it has been terminated due to unexpected errors.$"
+        )
+        assert logs, "Logs are empty."
+        for log in logs:
+            assert re.match(log_pattern, log), f"The wrong log: {log}"
diff --git a/src/promptflow/tests/executor/unittests/executor/test_tool_resolver.py b/src/promptflow/tests/executor/unittests/executor/test_tool_resolver.py
index 66c89a95313..f0897d524a4 100644
--- a/src/promptflow/tests/executor/unittests/executor/test_tool_resolver.py
+++ b/src/promptflow/tests/executor/unittests/executor/test_tool_resolver.py
@@ -153,6 +153,10 @@ def test_resolve_tool_by_node_with_duplicated_inputs(self, resolver, mocker):
         assert isinstance(exec_info.value.inner_exception, NodeInputValidationError)
         assert "These inputs are duplicated" in exec_info.value.message
 
+    @pytest.mark.skipif(
+        condition=(sys.version_info.major == 3 and sys.version_info.minor == 11),
+        reason="BUG 2709800: known issue on enum in Python 3.11",
+    )
     def test_ensure_node_inputs_type(self):
         # Case 1: conn_name not in connections, should raise conn_name not found error
         tool = Tool(name="mock", type="python", inputs={"conn": InputDefinition(type=["CustomConnection"])})
diff --git a/src/promptflow/tests/executor/unittests/processpool/test_healthy_ensured_process.py b/src/promptflow/tests/executor/unittests/processpool/test_healthy_ensured_process.py
index 498f1bbf7db..42fe462c060 100644
--- a/src/promptflow/tests/executor/unittests/processpool/test_healthy_ensured_process.py
+++ b/src/promptflow/tests/executor/unittests/processpool/test_healthy_ensured_process.py
@@ -27,7 +27,7 @@ def end_process(healthy_ensured_process):
 
 class TestHealthyEnsuredProcess:
     def test_healthy_ensured_process(self):
-        context = get_multiprocessing_context("fork")
+        context = get_multiprocessing_context("spawn")
         healthy_ensured_process = HealthyEnsuredProcess(executor_creation_func, context)
         assert healthy_ensured_process.is_ready is False
         task_queue = Queue()
@@ -38,7 +38,7 @@ def test_healthy_ensured_process(self):
         assert healthy_ensured_process.process.is_alive() is False
 
     def test_unhealthy_process(self):
-        context = get_multiprocessing_context("fork")
+        context = get_multiprocessing_context("spawn")
         healthy_ensured_process = HealthyEnsuredProcess(executor_creation_func_timeout, context)
         assert healthy_ensured_process.is_ready is False
         task_queue = Queue()
@@ -49,7 +49,7 @@ def test_unhealthy_process(self):
         assert healthy_ensured_process.process.is_alive() is False
 
     def test_format_current_process(self):
-        context = get_multiprocessing_context("fork")
+        context = get_multiprocessing_context("spawn")
         healthy_ensured_process = HealthyEnsuredProcess(executor_creation_func, context)
         healthy_ensured_process.process = patch(
             'promptflow.executor._line_execution_process_pool.Process', autospec=True)
@@ -66,7 +66,7 @@ def test_format_current_process(self):
 
     @patch('promptflow.executor._line_execution_process_pool.logger.info', autospec=True)
     def test_format_completed_process(self, mock_logger_info):
-        context = get_multiprocessing_context("fork")
+        context = get_multiprocessing_context("spawn")
         healthy_ensured_process = HealthyEnsuredProcess(executor_creation_func, context)
         healthy_ensured_process.process = patch(
             'promptflow.executor._line_execution_process_pool.Process', autospec=True)
diff --git a/src/promptflow/tests/sdk_cli_test/e2etests/test_cli.py b/src/promptflow/tests/sdk_cli_test/e2etests/test_cli.py
index f7381ab3650..38c9bce75fa 100644
--- a/src/promptflow/tests/sdk_cli_test/e2etests/test_cli.py
+++ b/src/promptflow/tests/sdk_cli_test/e2etests/test_cli.py
@@ -306,13 +306,13 @@ def test_pf_flow_test_with_non_english_input_output(self, capsys):
         stdout, _ = capsys.readouterr()
         output_path = Path(FLOWS_DIR) / "chat_flow" / ".promptflow" / "flow.output.json"
         assert output_path.exists()
-        with open(output_path, "r") as f:
+        with open(output_path, "r", encoding="utf-8") as f:
             outputs = json.load(f)
         assert outputs["answer"] in json.loads(stdout)["answer"]
 
         detail_path = Path(FLOWS_DIR) / "chat_flow" / ".promptflow" / "flow.detail.json"
         assert detail_path.exists()
-        with open(detail_path, "r") as f:
+        with open(detail_path, "r", encoding="utf-8") as f:
             detail = json.load(f)
         assert detail["flow_runs"][0]["inputs"]["question"] == question
         assert detail["flow_runs"][0]["output"]["answer"] == outputs["answer"]
diff --git a/src/promptflow/tests/sdk_cli_test/e2etests/test_flow_run.py b/src/promptflow/tests/sdk_cli_test/e2etests/test_flow_run.py
index 4b83b2a5055..a82f0614cc0 100644
--- a/src/promptflow/tests/sdk_cli_test/e2etests/test_flow_run.py
+++ b/src/promptflow/tests/sdk_cli_test/e2etests/test_flow_run.py
@@ -58,6 +58,23 @@ def create_run_against_run(client, run: Run) -> Run:
     )
 
 
+def assert_run_with_invalid_column_mapping(client: PFClient, run: Run, capfd: pytest.CaptureFixture) -> None:
+    assert run.status == RunStatus.FAILED
+
+    expected_error_message = "The input for batch run is incorrect. Couldn't find these mapping relations"
+
+    client.stream(run.name)
+    out, _ = capfd.readouterr()
+    assert expected_error_message in out
+
+    local_storage = LocalStorageOperations(run)
+    assert os.path.exists(local_storage._exception_path)
+
+    exception = local_storage.load_exception()
+    assert expected_error_message in exception["message"]
+    assert exception["code"] == "BulkRunException"
+
+
 @pytest.mark.usefixtures("use_secrets_config_file", "setup_local_connection", "install_custom_tool_pkg")
 @pytest.mark.sdk_test
 @pytest.mark.e2etest
@@ -321,7 +338,7 @@ def test_run_reference_failed_run(self, pf):
         with pytest.raises(RunNotFoundError):
             pf.runs.get(name=run_name)
 
-    def test_referenced_output_not_exist(self, pf):
+    def test_referenced_output_not_exist(self, pf: PFClient, capfd: pytest.CaptureFixture) -> None:
         # failed run won't generate output
         failed_run = pf.run(
             flow=f"{FLOWS_DIR}/failed_flow",
@@ -336,13 +353,7 @@ def test_referenced_output_not_exist(self, pf):
             flow=f"{FLOWS_DIR}/failed_flow",
             column_mapping={"text": "${run.outputs.text}"},
         )
-
-        local_storage = LocalStorageOperations(run)
-        assert os.path.exists(local_storage._exception_path)
-
-        exception = local_storage.load_exception()
-        assert "The input for batch run is incorrect. Couldn't find these mapping relations" in exception["message"]
-        assert exception["code"] == "BulkRunException"
+        assert_run_with_invalid_column_mapping(pf, run, capfd)
 
     def test_connection_overwrite_file(self, local_client, local_aoai_connection):
         run = create_yaml_run(
@@ -650,7 +661,12 @@ def test_flow_bulk_run_with_additional_includes(self, azure_open_ai_connection:
         additional_includes = _get_additional_includes(snapshot_path / "flow.dag.yaml")
         assert not additional_includes
 
-    def test_input_mapping_source_not_found_error(self, azure_open_ai_connection: AzureOpenAIConnection, pf):
+    def test_input_mapping_source_not_found_error(
+        self,
+        azure_open_ai_connection: AzureOpenAIConnection,
+        pf: PFClient,
+        capfd: pytest.CaptureFixture,
+    ):
         # input_mapping source not found error won't create run
         name = str(uuid.uuid4())
         data_path = f"{DATAS_DIR}/webClassification3.jsonl"
@@ -660,13 +676,7 @@ def test_input_mapping_source_not_found_error(self, azure_open_ai_connection: Az
             column_mapping={"not_exist": "${data.not_exist_key}"},
             name=name,
         )
-
-        local_storage = LocalStorageOperations(run)
-        assert os.path.exists(local_storage._exception_path)
-
-        exception = local_storage.load_exception()
-        assert "The input for batch run is incorrect. Couldn't find these mapping relations" in exception["message"]
-        assert exception["code"] == "BulkRunException"
+        assert_run_with_invalid_column_mapping(pf, run, capfd)
 
     def test_input_mapping_with_dict(self, azure_open_ai_connection: AzureOpenAIConnection, pf):
         data_path = f"{DATAS_DIR}/webClassification3.jsonl"
diff --git a/src/promptflow/tests/sdk_cli_test/unittests/test_local_storage_operations.py b/src/promptflow/tests/sdk_cli_test/unittests/test_local_storage_operations.py
index fbd30e8ca25..78e07b4516e 100644
--- a/src/promptflow/tests/sdk_cli_test/unittests/test_local_storage_operations.py
+++ b/src/promptflow/tests/sdk_cli_test/unittests/test_local_storage_operations.py
@@ -17,12 +17,20 @@ def test_outputs_padding(self) -> None:
             {LINE_NUMBER: 2, "col": "b"},
         ]
         df = pd.DataFrame(data)
-        expected_rows = 5
-        df_with_padding = LocalStorageOperations._outputs_padding(df, expected_rows)
+
+        df_with_padding = LocalStorageOperations._outputs_padding(df, inputs_line_numbers=[0, 1, 2, 3, 4])
         df_with_padding.fillna("", inplace=True)
-        assert len(df_with_padding) == expected_rows
+        assert len(df_with_padding) == 5
         assert df_with_padding.iloc[0].to_dict() == {LINE_NUMBER: 0, "col": ""}
         assert df_with_padding.iloc[1].to_dict() == {LINE_NUMBER: 1, "col": "a"}
         assert df_with_padding.iloc[2].to_dict() == {LINE_NUMBER: 2, "col": "b"}
         assert df_with_padding.iloc[3].to_dict() == {LINE_NUMBER: 3, "col": ""}
         assert df_with_padding.iloc[4].to_dict() == {LINE_NUMBER: 4, "col": ""}
+
+        # in an evaluation run, inputs may not contain all line numbers
+        df_with_padding = LocalStorageOperations._outputs_padding(df, inputs_line_numbers=[1, 2, 4])
+        df_with_padding.fillna("", inplace=True)
+        assert len(df_with_padding) == 3
+        assert df_with_padding.iloc[0].to_dict() == {LINE_NUMBER: 1, "col": "a"}
+        assert df_with_padding.iloc[1].to_dict() == {LINE_NUMBER: 2, "col": "b"}
+        assert df_with_padding.iloc[2].to_dict() == {LINE_NUMBER: 4, "col": ""}
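Taken together, the test changes pin down the behavior this PR standardizes at the SDK level: a batch run with an invalid column mapping is created with status "Failed" instead of raising at submission time, the exception is persisted by LocalStorageOperations, and streaming the run prints the stored error message. A hedged usage sketch follows; the flow and data paths are placeholders, and the quoted console output is illustrative:

```python
from promptflow import PFClient

pf = PFClient()

# Submit a batch run whose column mapping references a key missing from the data.
run = pf.run(
    flow="flows/web_classification",        # placeholder flow path
    data="data/webClassification3.jsonl",   # placeholder data path
    column_mapping={"not_exist": "${data.not_exist_key}"},
)

# A run object is returned instead of an exception being raised;
# its status reflects the failure.
assert run.status == "Failed"

# Streaming a failed run now also prints the stored error message, e.g.
# "The input for batch run is incorrect. Couldn't find these mapping relations ...".
pf.stream(run.name)
```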