Skip to content

Commit

Permalink
[Internal][Executor] Add line_timeout_sec to the initialization param…
Browse files Browse the repository at this point in the history
…eters of BatchEngine (#2054)

# Description

Add `line_timeout_sec` to the initialization parameters of BatchEngine.

# All Promptflow Contribution checklist:
- [x] **The pull request does not introduce [breaking changes].**
- [ ] **CHANGELOG is updated for new features, bug fixes or other
significant changes.**
- [x] **I have read the [contribution guidelines](../CONTRIBUTING.md).**
- [ ] **Create an issue and link to the pull request to get dedicated
review from promptflow team. Learn more: [suggested
workflow](../CONTRIBUTING.md#suggested-workflow).**

## General Guidelines and Best Practices
- [x] Title of the pull request is clear and informative.
- [x] There are a small number of commits, each of which have an
informative message. This means that previously merged commits do not
appear in the history of the PR. For more information on cleaning up the
commits in your PR, [see this
page](https://github.com/Azure/azure-powershell/blob/master/documentation/development-docs/cleaning-up-commits.md).

### Testing Guidelines
- [x] Pull request includes test coverage for the included changes.
  • Loading branch information
PeiwenGaoMS authored Feb 20, 2024
1 parent e398924 commit 1ff5ac6
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 7 deletions.
5 changes: 4 additions & 1 deletion src/promptflow/promptflow/batch/_batch_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ def __init__(
connections: Optional[dict] = None,
storage: Optional[AbstractRunStorage] = None,
batch_timeout_sec: Optional[int] = None,
line_timeout_sec: Optional[int] = None,
worker_count: Optional[int] = None,
**kwargs,
):
Expand All @@ -94,6 +95,8 @@ def __init__(
:type storage: Optional[~promptflow.storage._run_storage.AbstractRunStorage]
:param batch_timeout: The timeout of batch run in seconds
:type batch_timeout: Optional[int]
:param line_timeout: The timeout of each line in seconds
:type line_timeout: Optional[int]
:param worker_count: The concurrency limit of batch run
:type worker_count: Optional[int]
:param kwargs: The keyword arguments related to creating the executor proxy class
Expand All @@ -114,7 +117,7 @@ def __init__(
self._kwargs = kwargs

self._batch_timeout_sec = batch_timeout_sec or get_int_env_var("PF_BATCH_TIMEOUT_SEC")
self._line_timeout_sec = get_int_env_var("PF_LINE_TIMEOUT_SEC", LINE_TIMEOUT_SEC)
self._line_timeout_sec = line_timeout_sec or get_int_env_var("PF_LINE_TIMEOUT_SEC", LINE_TIMEOUT_SEC)
self._worker_count = worker_count or get_int_env_var("PF_WORKER_COUNT")

# set it to True when the batch run is canceled
Expand Down
12 changes: 6 additions & 6 deletions src/promptflow/tests/executor/e2etests/test_batch_timeout.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,14 @@ class TestBatchTimeout:
)
def test_batch_with_line_timeout(self, flow_folder, dev_connections):
mem_run_storage = MemoryRunStorage()
# set line timeout to 5 seconds for testing
batch_engine = BatchEngine(
get_yaml_file(flow_folder),
get_flow_folder(flow_folder),
connections=dev_connections,
storage=mem_run_storage,
line_timeout_sec=5,
)
# set line timeout to 5 seconds for testing
batch_engine._line_timeout_sec = 5
# prepare input file and output dir
input_dirs = {"data": get_flow_inputs_file(flow_folder, file_name="samples_all_timeout.json")}
output_dir = Path(mkdtemp())
Expand Down Expand Up @@ -79,14 +79,14 @@ def test_batch_with_line_timeout(self, flow_folder, dev_connections):
)
def test_batch_with_one_line_timeout(self, flow_folder, dev_connections):
mem_run_storage = MemoryRunStorage()
# set line timeout to 5 seconds for testing
batch_engine = BatchEngine(
get_yaml_file(flow_folder),
get_flow_folder(flow_folder),
connections=dev_connections,
storage=mem_run_storage,
line_timeout_sec=5,
)
batch_engine._line_timeout_sec = 5
# set line timeout to 5 seconds for testing
# prepare input file and output dir
input_dirs = {"data": get_flow_inputs_file(flow_folder, file_name="samples.json")}
output_dir = Path(mkdtemp())
Expand Down Expand Up @@ -137,9 +137,9 @@ def test_batch_timeout(self, flow_folder, line_timeout_sec, batch_timeout_sec, e
get_flow_folder(flow_folder),
connections={},
storage=mem_run_storage,
line_timeout_sec=line_timeout_sec,
batch_timeout_sec=batch_timeout_sec,
)
batch_engine._line_timeout_sec = line_timeout_sec
batch_engine._batch_timeout_sec = batch_timeout_sec

input_dirs = {"data": get_flow_inputs_file(flow_folder, file_name="samples.json")}
output_dir = Path(mkdtemp())
Expand Down

0 comments on commit 1ff5ac6

Please sign in to comment.