Commit

Revert "resolve conflicts with main"
This reverts commit 6ac0e53.
0mza987 committed Apr 18, 2024
1 parent 6ac0e53 commit c030d91
Showing 3 changed files with 84 additions and 103 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/promptflow-release-testing-matrix.yml
@@ -323,7 +323,7 @@ jobs:
uses: "./.github/actions/step_generate_configs"
with:
targetFolder: ${{ env.PROMPTFLOW_DIRECTORY }}
- name: install promptflow-azure from wheel
- name: install promptflow-devkit from wheel
# wildcard expansion (*) does not work in Windows, so leverage python to find and install
run: |
poetry run pip install $(python -c "import glob; print(glob.glob('**/promptflow_tracing-*.whl', recursive=True)[0])")
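
The workflow step above relies on Python's glob module because the shell does not expand * on Windows runners. A minimal sketch of the same idea, assuming the wheel is produced somewhere under the current working directory (the package name is taken from the step above):

import glob
import subprocess
import sys

# Find the first wheel matching the pattern anywhere under the current directory.
wheels = glob.glob("**/promptflow_tracing-*.whl", recursive=True)
if not wheels:
    raise FileNotFoundError("no promptflow_tracing wheel found")
# Install it with the same interpreter that is running this script.
subprocess.check_call([sys.executable, "-m", "pip", "install", wheels[0]])
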
80 changes: 44 additions & 36 deletions src/promptflow-devkit/promptflow/_proxy/_csharp_executor_proxy.py
@@ -1,7 +1,6 @@
# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------
import json
import platform
import signal
import socket
@@ -10,10 +9,9 @@
from pathlib import Path
from typing import NoReturn, Optional

from promptflow._sdk._constants import FLOW_META_JSON, PROMPT_FLOW_DIR_NAME, OSType
from promptflow._utils.flow_utils import is_flex_flow as is_flex_flow_func
from promptflow._utils.flow_utils import read_json_content
from promptflow.exceptions import UserErrorException
from promptflow._core._errors import UnexpectedError
from promptflow._sdk._constants import OSType
from promptflow._utils.flow_utils import is_flex_flow
from promptflow.storage._run_storage import AbstractRunStorage

from ._csharp_base_executor_proxy import CSharpBaseExecutorProxy
@@ -29,12 +27,12 @@ def __init__(
process,
port: str,
working_dir: Optional[Path] = None,
chat_output_name: Optional[str] = None,
enable_stream_output: bool = False,
is_flex_flow: bool = False,
):
self._process = process
self._port = port
self._is_flex_flow = is_flex_flow
self._chat_output_name = chat_output_name
super().__init__(
working_dir=working_dir,
enable_stream_output=enable_stream_output,
@@ -48,6 +46,10 @@ def api_endpoint(self) -> str:
def port(self) -> str:
return self._port

@property
def chat_output_name(self) -> Optional[str]:
return self._chat_output_name

@classmethod
def dump_metadata(cls, flow_file: Path, working_dir: Path) -> NoReturn:
"""In csharp, we need to generate metadata based on a dotnet command for now and the metadata will
@@ -68,7 +70,7 @@ def dump_metadata(cls, flow_file: Path, working_dir: Path) -> NoReturn:
cwd=working_dir,
)
except subprocess.CalledProcessError as e:
raise UserErrorException(
raise UnexpectedError(
message_format="Failed to generate flow meta for csharp flow.\n"
"Command: {command}\n"
"Working directory: {working_directory}\n"
@@ -80,14 +82,18 @@ def dump_metadata(cls, flow_file: Path, working_dir: Path) -> NoReturn:
output=e.output,
)

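The dump_metadata error handling above follows a common wrap-and-rethrow pattern for external commands. A minimal, generic sketch of that pattern (RuntimeError stands in for the promptflow exception types; the command itself is whatever dotnet invocation the proxy builds):

import subprocess

def run_external(command, working_dir):
    """Run a command and surface its return code and output on failure."""
    try:
        return subprocess.check_output(command, cwd=working_dir)
    except subprocess.CalledProcessError as e:
        raise RuntimeError(
            f"Command failed: {' '.join(command)}\n"
            f"Working directory: {working_dir}\n"
            f"Return code: {e.returncode}\n"
            f"Output: {e.output}"
        ) from e
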
def _get_interface_definition(self):
if not self._is_flex_flow:
return super()._get_interface_definition()
flow_json_path = self.working_dir / PROMPT_FLOW_DIR_NAME / FLOW_META_JSON
signatures = read_json_content(flow_json_path, "meta of tools")
for key in set(signatures.keys()) - {"inputs", "outputs", "init"}:
signatures.pop(key)
return signatures
@classmethod
def get_outputs_definition(cls, flow_file: Path, working_dir: Path) -> dict:
# TODO: no outputs definition for eager flow for now
if is_flex_flow(flow_path=flow_file, working_dir=working_dir):
return {}

# TODO: get this from self._get_flow_meta for both eager flow and non-eager flow then remove
# dependency on flow_file and working_dir
from promptflow.contracts.flow import Flow as DataplaneFlow

dataplane_flow = DataplaneFlow.from_yaml(flow_file, working_dir=working_dir)
return dataplane_flow.outputs

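The _get_interface_definition variant above loads the generated flow metadata and keeps only the interface sections. A rough equivalent of that filtering step, assuming the metadata is a plain JSON file on disk:

import json
from pathlib import Path

def load_interface_definition(flow_json_path: Path) -> dict:
    # Keep only the sections that describe the flow's interface.
    signatures = json.loads(flow_json_path.read_text())
    return {k: v for k, v in signatures.items() if k in {"inputs", "outputs", "init"}}
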
@classmethod
async def create(
@@ -101,17 +107,11 @@ async def create(
**kwargs,
) -> "CSharpExecutorProxy":
"""Create a new executor"""
# TODO: support init_kwargs in csharp executor
port = kwargs.get("port", None)
log_path = kwargs.get("log_path", "")
target_uuid = str(uuid.uuid4())
init_error_file = Path(working_dir) / f"init_error_{target_uuid}.json"
init_error_file = Path(working_dir) / f"init_error_{str(uuid.uuid4())}.json"
init_error_file.touch()
if init_kwargs:
init_kwargs_path = Path(working_dir) / f"init_kwargs_{target_uuid}.json"
# TODO: complicated init_kwargs handling
init_kwargs_path.write_text(json.dumps(init_kwargs))
else:
init_kwargs_path = None

if port is None:
# if port is not provided, find an available port and start a new execution service
@@ -123,27 +123,33 @@
log_path=log_path,
error_file_path=init_error_file,
yaml_path=flow_file.as_posix(),
init_kwargs_path=init_kwargs_path.absolute().as_posix() if init_kwargs_path else None,
),
creationflags=subprocess.CREATE_NEW_PROCESS_GROUP if platform.system() == OSType.WINDOWS else 0,
)
else:
# if port is provided, assume the execution service is already started
process = None

outputs_definition = cls.get_outputs_definition(flow_file, working_dir=working_dir)
chat_output_name = next(
filter(
lambda key: outputs_definition[key].is_chat_output,
outputs_definition.keys(),
),
None,
)
executor_proxy = cls(
process=process,
port=port,
working_dir=working_dir,
is_flex_flow=is_flex_flow_func(flow_path=flow_file, working_dir=working_dir),
# TODO: remove this from the constructor once it can always be inferred from flow meta?
chat_output_name=chat_output_name,
enable_stream_output=kwargs.get("enable_stream_output", False),
)
try:
await executor_proxy.ensure_executor_startup(init_error_file)
finally:
Path(init_error_file).unlink()
if init_kwargs_path:
init_kwargs_path.unlink()
return executor_proxy

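The chat_output_name lookup in create picks the first output flagged as a chat output, or None when there is none. A small self-contained sketch of that selection (FlowOutputDefinition here is an illustrative stand-in, not the promptflow contract class):

from dataclasses import dataclass
from typing import Dict, Optional

@dataclass
class FlowOutputDefinition:
    is_chat_output: bool = False

def find_chat_output_name(outputs: Dict[str, FlowOutputDefinition]) -> Optional[str]:
    # Return the first output name whose definition is marked as a chat output.
    return next((name for name, d in outputs.items() if d.is_chat_output), None)

outputs = {"score": FlowOutputDefinition(), "answer": FlowOutputDefinition(is_chat_output=True)}
assert find_chat_output_name(outputs) == "answer"
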
async def destroy(self):
@@ -159,6 +165,14 @@ async def destroy(self):
On the other hand, the subprocess for the execution service is not started in detached mode;
it will exit when the parent process exits. So we simply skip the destruction here.
"""

# TODO 3033484: update this after executor service support graceful shutdown
if not await self._all_generators_exhausted():
raise UnexpectedError(
message_format="The executor service is still handling a stream request "
"whose response is not fully consumed yet."
)

# process is not None, it means the executor service is started by the current executor proxy
# and should be terminated when the executor proxy is destroyed if the service is still active
if self._process and self._is_executor_active():
@@ -169,18 +183,12 @@ async def destroy(self):
# for Linux and MacOS, Popen.terminate() will send SIGTERM to the process
self._process.terminate()

# TODO: there is a potential issue that, graceful shutdown won't work for streaming chat flow for now
# because response will not be fully consumed before we destroy the executor proxy
try:
self._process.wait(timeout=5)
except subprocess.TimeoutExpired:
self._process.kill()
# TODO: pf.test won't work for streaming. Response will be fully consumed outside TestSubmitter context.
# We will still kill this process in case this is a true timeout but raise an error to indicate that
# we may meet runtime error when trying to consume the result.
if not await self._all_generators_exhausted():
raise UserErrorException(
message_format="The executor service is still handling a stream request "
"whose response is not fully consumed yet."
)

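The termination logic above amounts to: signal the child process to stop, wait briefly, and force-kill on timeout. A hedged sketch of that pattern, assuming the process was started with CREATE_NEW_PROCESS_GROUP on Windows so CTRL_C_EVENT can reach it:

import platform
import signal
import subprocess

def stop_process(process: subprocess.Popen, timeout: float = 5.0) -> None:
    if platform.system() == "Windows":
        # Only works for processes created with CREATE_NEW_PROCESS_GROUP.
        process.send_signal(signal.CTRL_C_EVENT)
    else:
        # Popen.terminate() sends SIGTERM on Linux and macOS.
        process.terminate()
    try:
        process.wait(timeout=timeout)
    except subprocess.TimeoutExpired:
        # The process did not exit in time; fall back to a hard kill.
        process.kill()
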
def _is_executor_active(self):
"""Check if the process is still running and return False if it has exited"""
105 changes: 39 additions & 66 deletions src/promptflow-devkit/promptflow/_proxy/_csharp_inspector_proxy.py
@@ -1,21 +1,18 @@
# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------
import json
import re
import subprocess
import uuid
import tempfile
from collections import defaultdict
from pathlib import Path
from typing import Dict, List

import pydash

from promptflow._constants import FlowEntryRegex
from promptflow._sdk._constants import ALL_CONNECTION_TYPES, FLOW_META_JSON, FLOW_TOOLS_JSON, PROMPT_FLOW_DIR_NAME
from promptflow._core._errors import UnexpectedError
from promptflow._sdk._constants import ALL_CONNECTION_TYPES, FLOW_TOOLS_JSON, PROMPT_FLOW_DIR_NAME
from promptflow._utils.flow_utils import is_flex_flow, read_json_content
from promptflow._utils.yaml_utils import load_yaml
from promptflow.exceptions import UserErrorException

from ._base_inspector_proxy import AbstractInspectorProxy

@@ -58,71 +55,47 @@ def get_used_connection_names(

def is_flex_flow_entry(self, entry: str) -> bool:
"""Check if the flow is a flex flow entry."""
return isinstance(entry, str) and re.match(FlowEntryRegex.CSharp, entry) is not None
return isinstance(entry, str) and re.match(FlowEntryRegex.CSharp, entry)

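One small difference between the two versions of is_flex_flow_entry above: re.match returns a Match object or None, so the explicit "is not None" keeps the return type a plain bool. For example (the pattern argument is a placeholder, not FlowEntryRegex.CSharp):

import re

def matches_entry(entry, pattern: str) -> bool:
    # re.match yields a Match or None; comparing against None returns a strict bool.
    return isinstance(entry, str) and re.match(pattern, entry) is not None
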
def get_entry_meta(
self,
entry: str,
working_dir: Path,
**kwargs,
) -> Dict[str, str]:
"""In csharp, the metadata will always be dumped at the beginning of each local run."""
target_path = working_dir / PROMPT_FLOW_DIR_NAME / FLOW_META_JSON
"""In csharp, we need to generate metadata based on a dotnet command for now and the metadata will
always be dumped.
"""
# TODO: add tests for this
with tempfile.TemporaryDirectory() as temp_dir:
flow_file = Path(temp_dir) / "flow.dag.yaml"
flow_file.write_text(json.dumps({"entry": entry}))

if target_path.is_file():
entry_meta = read_json_content(target_path, "flow metadata")
for key in ["inputs", "outputs", "init"]:
if key not in entry_meta:
continue
for port_name, port in entry_meta[key].items():
if "type" in port and isinstance(port["type"], list) and len(port["type"]) == 1:
port["type"] = port["type"][0]
entry_meta.pop("framework", None)
return entry_meta
raise UserErrorException("Flow metadata not found.")

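The get_entry_meta variant above normalizes the generated metadata before returning it: single-element "type" lists collapse to a bare string, and the "framework" key is dropped. A sketch of just that normalization step:

def normalize_entry_meta(entry_meta: dict) -> dict:
    for section in ("inputs", "outputs", "init"):
        for port in entry_meta.get(section, {}).values():
            port_type = port.get("type")
            # Collapse single-element type lists to a bare string.
            if isinstance(port_type, list) and len(port_type) == 1:
                port["type"] = port_type[0]
    # Drop the framework marker, mirroring the code above.
    entry_meta.pop("framework", None)
    return entry_meta
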
def prepare_metadata(
self,
flow_file: Path,
working_dir: Path,
**kwargs,
) -> None:
init_kwargs = kwargs.get("init_kwargs", {})
command = [
"dotnet",
EXECUTOR_SERVICE_DLL,
"--flow_meta",
"--yaml_path",
flow_file.absolute().as_posix(),
"--assembly_folder",
".",
]
# csharp depends on init_kwargs to identify the target constructor
if init_kwargs:
temp_init_kwargs_file = working_dir / PROMPT_FLOW_DIR_NAME / f"init-{uuid.uuid4()}.json"
temp_init = {k: None for k in init_kwargs}
temp_init_kwargs_file.write_text(json.dumps(temp_init))
command.extend(["--init", temp_init_kwargs_file.as_posix()])
else:
temp_init_kwargs_file = None

try:
subprocess.check_output(
command,
cwd=working_dir,
)
except subprocess.CalledProcessError as e:
raise UserErrorException(
message_format="Failed to generate flow meta for csharp flow.\n"
"Command: {command}\n"
"Working directory: {working_directory}\n"
"Return code: {return_code}\n"
"Output: {output}",
command=" ".join(command),
working_directory=working_dir.as_posix(),
return_code=e.returncode,
output=e.output,
)
finally:
if temp_init_kwargs_file:
temp_init_kwargs_file.unlink()
# TODO: enable cache?
command = [
"dotnet",
EXECUTOR_SERVICE_DLL,
"--flow_meta",
"--yaml_path",
flow_file.absolute().as_posix(),
"--assembly_folder",
".",
]
try:
subprocess.check_output(
command,
cwd=working_dir,
)
except subprocess.CalledProcessError as e:
raise UnexpectedError(
message_format="Failed to generate flow meta for csharp flow.\n"
"Command: {command}\n"
"Working directory: {working_directory}\n"
"Return code: {return_code}\n"
"Output: {output}",
command=" ".join(command),
working_directory=working_dir.as_posix(),
return_code=e.returncode,
output=e.output,
)
return json.loads((working_dir / PROMPT_FLOW_DIR_NAME / "flow.json").read_text())

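The init_kwargs handling in the longer prepare_metadata variant above boils down to: write a temporary JSON file listing the constructor parameter names (values nulled out), pass its path to the external command, and always clean it up. A generic sketch of that pattern (the --init flag and file naming mirror the code above; the rest is illustrative):

import json
import subprocess
import uuid
from pathlib import Path

def run_meta_command(command: list, working_dir: Path, init_kwargs: dict) -> None:
    temp_init_file = None
    if init_kwargs:
        # Only the parameter names matter here; values are nulled out.
        temp_init_file = working_dir / f"init-{uuid.uuid4()}.json"
        temp_init_file.write_text(json.dumps({k: None for k in init_kwargs}))
        command = command + ["--init", temp_init_file.as_posix()]
    try:
        subprocess.check_output(command, cwd=working_dir)
    finally:
        # Remove the temporary file whether or not the command succeeded.
        if temp_init_file:
            temp_init_file.unlink()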