From 2b78c85b342e86cdb4e2550a9da4db8c4e04d85f Mon Sep 17 00:00:00 2001 From: LuLu Zuo Date: Wed, 22 Nov 2023 18:38:47 +0800 Subject: [PATCH 1/6] add async.get() --- .../executor/_line_execution_process_pool.py | 1 + .../test_line_execution_process_pool.py | 35 +++++++++++++++++++ 2 files changed, 36 insertions(+) diff --git a/src/promptflow/promptflow/executor/_line_execution_process_pool.py b/src/promptflow/promptflow/executor/_line_execution_process_pool.py index 0669d1e88e7..ec69a40a995 100644 --- a/src/promptflow/promptflow/executor/_line_execution_process_pool.py +++ b/src/promptflow/promptflow/executor/_line_execution_process_pool.py @@ -379,6 +379,7 @@ def run(self, batch_inputs): while not async_result.ready(): # Check every 1 second async_result.wait(1) + async_result.get() except KeyboardInterrupt: raise except PromptflowException: diff --git a/src/promptflow/tests/executor/unittests/processpool/test_line_execution_process_pool.py b/src/promptflow/tests/executor/unittests/processpool/test_line_execution_process_pool.py index efc345959ac..86524e03c21 100644 --- a/src/promptflow/tests/executor/unittests/processpool/test_line_execution_process_pool.py +++ b/src/promptflow/tests/executor/unittests/processpool/test_line_execution_process_pool.py @@ -224,3 +224,38 @@ def test_get_multiprocessing_context(self): # Not set start method context = get_multiprocessing_context() assert context.get_start_method() == multiprocessing.get_start_method() + + @pytest.mark.parametrize( + "flow_folder", + [ + SAMPLE_FLOW, + ], + ) + def test_process_pool_run_with_exception(self, flow_folder, dev_connections, mocker: MockFixture): + # mock process pool run execution raise error + test_error_msg = "Test user error" + mocker.patch( + "promptflow.executor._line_execution_process_pool.LineExecutionProcessPool." "_timeout_process_wrapper", + side_effect=UserErrorException(message=test_error_msg, target=ErrorTarget.AZURE_RUN_STORAGE), + ) + executor = FlowExecutor.create( + get_yaml_file(flow_folder), + dev_connections, + ) + run_id = str(uuid.uuid4()) + bulk_inputs = self.get_bulk_inputs() + nlines = len(bulk_inputs) + with LineExecutionProcessPool( + executor, + nlines, + run_id, + "", + False, + None, + ) as pool: + with pytest.raises(UserErrorException) as e: + pool.run(zip(range(nlines), bulk_inputs)) + print(f"eee:{e.value}") + assert e.value.message == test_error_msg + assert e.value.target == ErrorTarget.AZURE_RUN_STORAGE + assert e.value.error_codes[0] == "UserError" From 2dd0228614cc302350ed5205602c2c1059a9a195 Mon Sep 17 00:00:00 2001 From: LuLu Zuo Date: Wed, 22 Nov 2023 18:46:58 +0800 Subject: [PATCH 2/6] Add some comments --- .../promptflow/executor/_line_execution_process_pool.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/promptflow/promptflow/executor/_line_execution_process_pool.py b/src/promptflow/promptflow/executor/_line_execution_process_pool.py index ec69a40a995..0a41981095d 100644 --- a/src/promptflow/promptflow/executor/_line_execution_process_pool.py +++ b/src/promptflow/promptflow/executor/_line_execution_process_pool.py @@ -379,6 +379,7 @@ def run(self, batch_inputs): while not async_result.ready(): # Check every 1 second async_result.wait(1) + # Raise exceptions if they are raised. async_result.get() except KeyboardInterrupt: raise From a57d43246cce62ff70a0a1edcad2ee13841e0d41 Mon Sep 17 00:00:00 2001 From: LuLu Zuo Date: Wed, 22 Nov 2023 19:05:55 +0800 Subject: [PATCH 3/6] Add comments --- .../promptflow/executor/_line_execution_process_pool.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/promptflow/promptflow/executor/_line_execution_process_pool.py b/src/promptflow/promptflow/executor/_line_execution_process_pool.py index 0a41981095d..c445d146091 100644 --- a/src/promptflow/promptflow/executor/_line_execution_process_pool.py +++ b/src/promptflow/promptflow/executor/_line_execution_process_pool.py @@ -379,7 +379,8 @@ def run(self, batch_inputs): while not async_result.ready(): # Check every 1 second async_result.wait(1) - # Raise exceptions if they are raised. + # If the remote call raised an exception then that exception will be reraised by get(). + # Related link: https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.AsyncResult async_result.get() except KeyboardInterrupt: raise From e1f6855591a907c0c2bbc4f4922bbdd9421a359a Mon Sep 17 00:00:00 2001 From: LuLu Zuo Date: Wed, 22 Nov 2023 19:08:13 +0800 Subject: [PATCH 4/6] Add some comments --- .../promptflow/executor/_line_execution_process_pool.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/promptflow/promptflow/executor/_line_execution_process_pool.py b/src/promptflow/promptflow/executor/_line_execution_process_pool.py index c445d146091..8fe45e6a340 100644 --- a/src/promptflow/promptflow/executor/_line_execution_process_pool.py +++ b/src/promptflow/promptflow/executor/_line_execution_process_pool.py @@ -380,7 +380,8 @@ def run(self, batch_inputs): # Check every 1 second async_result.wait(1) # If the remote call raised an exception then that exception will be reraised by get(). - # Related link: https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.AsyncResult + # Related link: + # https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.AsyncResult async_result.get() except KeyboardInterrupt: raise From d85d47c0e9a8b00e6e539b21defff25ddf8c1c65 Mon Sep 17 00:00:00 2001 From: LuLu Zuo Date: Wed, 22 Nov 2023 19:21:17 +0800 Subject: [PATCH 5/6] Add some comments --- .../promptflow/executor/_line_execution_process_pool.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/promptflow/promptflow/executor/_line_execution_process_pool.py b/src/promptflow/promptflow/executor/_line_execution_process_pool.py index 8fe45e6a340..24341c54a13 100644 --- a/src/promptflow/promptflow/executor/_line_execution_process_pool.py +++ b/src/promptflow/promptflow/executor/_line_execution_process_pool.py @@ -379,9 +379,10 @@ def run(self, batch_inputs): while not async_result.ready(): # Check every 1 second async_result.wait(1) - # If the remote call raised an exception then that exception will be reraised by get(). - # Related link: - # https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.AsyncResult + # To ensure exceptions in thread-pool calls are propagated to the main process for proper handling + # The exceptions raised will be re-raised by the get() method. + # Related link: + # https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.AsyncResult async_result.get() except KeyboardInterrupt: raise From aef128e9eb161cecc3ef7779f04f09572db1da7f Mon Sep 17 00:00:00 2001 From: LuLu Zuo Date: Wed, 22 Nov 2023 19:27:21 +0800 Subject: [PATCH 6/6] delete useless code --- .../unittests/processpool/test_line_execution_process_pool.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/promptflow/tests/executor/unittests/processpool/test_line_execution_process_pool.py b/src/promptflow/tests/executor/unittests/processpool/test_line_execution_process_pool.py index 86524e03c21..d7a4b6a2c0a 100644 --- a/src/promptflow/tests/executor/unittests/processpool/test_line_execution_process_pool.py +++ b/src/promptflow/tests/executor/unittests/processpool/test_line_execution_process_pool.py @@ -255,7 +255,6 @@ def test_process_pool_run_with_exception(self, flow_folder, dev_connections, moc ) as pool: with pytest.raises(UserErrorException) as e: pool.run(zip(range(nlines), bulk_inputs)) - print(f"eee:{e.value}") assert e.value.message == test_error_msg assert e.value.target == ErrorTarget.AZURE_RUN_STORAGE assert e.value.error_codes[0] == "UserError"