Skip to content

Commit

Permalink
[PFS] Start pfs in foreground and log in terminal and file in debug m…
Browse files Browse the repository at this point in the history
…ode (#2756)

# Description
- pf service start --debug will start pfs in forground and print log in
both terminal and pfs.log

![image](https://github.com/microsoft/promptflow/assets/26239730/584c7cd1-f70f-44f1-87fe-3c7369313357)

# All Promptflow Contribution checklist:
- [ ] **The pull request does not introduce [breaking changes].**
- [ ] **CHANGELOG is updated for new features, bug fixes or other
significant changes.**
- [ ] **I have read the [contribution guidelines](../CONTRIBUTING.md).**
- [ ] **Create an issue and link to the pull request to get dedicated
review from promptflow team. Learn more: [suggested
workflow](../CONTRIBUTING.md#suggested-workflow).**

## General Guidelines and Best Practices
- [ ] Title of the pull request is clear and informative.
- [ ] There are a small number of commits, each of which have an
informative message. This means that previously merged commits do not
appear in the history of the PR. For more information on cleaning up the
commits in your PR, [see this
page](https://github.com/Azure/azure-powershell/blob/master/documentation/development-docs/cleaning-up-commits.md).

### Testing Guidelines
- [ ] Pull request includes test coverage for the included changes.

---------

Co-authored-by: Ying Chen <[email protected]>
  • Loading branch information
YingChen1996 and Ying Chen authored Apr 12, 2024
1 parent 5873b30 commit ac82e2e
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 64 deletions.
109 changes: 62 additions & 47 deletions src/promptflow-devkit/promptflow/_cli/_pf/_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

import waitress

from promptflow._cli._params import base_params
from promptflow._cli._params import add_param_ua, add_param_verbose, base_params
from promptflow._cli._utils import activate_action
from promptflow._constants import PF_NO_INTERACTIVE_LOGIN
from promptflow._sdk._constants import (
Expand Down Expand Up @@ -74,7 +74,7 @@ def dispatch_service_commands(args: argparse.Namespace):
start_service(args)
elif args.sub_action == "stop":
stop_service()
elif args.sub_action == "show-status":
elif args.sub_action == "status":
show_service()


Expand All @@ -85,28 +85,35 @@ def add_parser_start_service(subparsers):
# Start prompt flow service:
pf service start
# Force restart promptflow service:
# Force restart prompt flow service:
pf service start --force
# Start promptflow service with specific port:
# Start prompt flow service with specific port:
pf service start --port 65553
""" # noqa: E501
add_param_port = lambda parser: parser.add_argument( # noqa: E731
"-p", "--port", type=int, help="port of the promptflow service"
"-p", "--port", type=int, help="port of the prompt flow service"
)
add_param_force = lambda parser: parser.add_argument( # noqa: E731
"--force",
action="store_true",
help="If the port is used, the existing service will be terminated and restart a new service.",
)
add_param_debug = lambda parser: parser.add_argument( # noqa: E731
"-d",
"--debug",
action="store_true",
help="Start the prompt flow service in foreground, displaying debug level logs directly in the terminal.",
)
activate_action(
name="start",
description="Start prompt flow service.",
epilog=epilog,
add_params=[
add_param_port,
add_param_force,
add_param_debug,
]
+ base_params,
+ [add_param_ua, add_param_verbose],
subparsers=subparsers,
help_message="Start prompt flow service.",
action_param_name="sub_action",
Expand Down Expand Up @@ -138,10 +145,10 @@ def add_parser_show_service(subparsers):
Examples:
# Display the started prompt flow service info.:
pf service show-status
pf service status
""" # noqa: E501
activate_action(
name="show-status",
name="status",
description="Show the started prompt flow service status.",
epilog=epilog,
add_params=base_params,
Expand All @@ -157,42 +164,35 @@ def start_service(args):
port = args.port
if args.debug:
os.environ[PF_SERVICE_DEBUG] = "true"

if is_run_from_built_binary():
# For msi installer/executable, use sdk api to start pfs since it's not supported to invoke waitress by cli
# directly after packaged by Pyinstaller.
parent_dir = os.path.dirname(sys.executable)
output_path = os.path.join(parent_dir, "output.txt")
with redirect_stdout_to_file(output_path):
port = validate_port(port, args.force)
global app
if app is None:
app, _ = create_app()
if os.environ.get(PF_SERVICE_DEBUG) == "true":
app.logger.setLevel(logging.DEBUG)
else:
app.logger.setLevel(logging.INFO)
message = f"Starting Prompt Flow Service on {port}, version: {get_pfs_version()}."
app.logger.info(message)
print(message)
sys.stdout.flush()
if not is_run_from_built_binary():
add_executable_script_to_env_path()
port = _prepare_app_for_foreground_service(port, args.force)
waitress.serve(app, host="127.0.0.1", port=port, threads=PF_SERVICE_WORKER_NUM)
else:
port = validate_port(port, args.force)
add_executable_script_to_env_path()
# Start a pfs process using detach mode. It will start a new process and create a new app. So we use environment
# variable to pass the debug mode, since it will inherit parent process environment variable.
if platform.system() == "Windows":
_start_background_service_on_windows(port)
if is_run_from_built_binary():
# For msi installer/executable, use sdk api to start pfs since it's not supported to invoke waitress by cli
# directly after packaged by Pyinstaller.
parent_dir = os.path.dirname(sys.executable)
output_path = os.path.join(parent_dir, "output.txt")
with redirect_stdout_to_file(output_path):
port = _prepare_app_for_foreground_service(port, args.force)
waitress.serve(app, host="127.0.0.1", port=port, threads=PF_SERVICE_WORKER_NUM)
else:
_start_background_service_on_unix(port)
is_healthy = check_pfs_service_status(port)
if is_healthy:
message = f"Start Promptflow Service on port {port}, version: {get_pfs_version()}."
print(message)
logger.info(message)
else:
logger.warning(f"Promptflow service start failed in {port}. {hint_stop_before_upgrade}")
port = validate_port(port, args.force)
add_executable_script_to_env_path()
# Start a pfs process using detach mode. It will start a new process and create a new app. So we use
# environment variable to pass the debug mode, since it will inherit parent process environment variable.
if platform.system() == "Windows":
_start_background_service_on_windows(port)
else:
_start_background_service_on_unix(port)
is_healthy = check_pfs_service_status(port)
if is_healthy:
message = f"Start prompt flow service on port {port}, version: {get_pfs_version()}."
print(message)
logger.info(message)
else:
logger.warning(f"Prompt flow service start failed in {port}. {hint_stop_before_upgrade}")


def validate_port(port, force_start):
Expand Down Expand Up @@ -238,6 +238,21 @@ def redirect_stdout_to_file(path):
sys.stderr = old_stderr


def _prepare_app_for_foreground_service(port, force_start):
port = validate_port(port, force_start)
global app
if app is None:
app, _ = create_app()
if os.environ.get(PF_SERVICE_DEBUG) == "true":
app.logger.setLevel(logging.DEBUG)
else:
app.logger.setLevel(logging.INFO)
message = f"Starting prompt flow Service on {port}, version: {get_pfs_version()}."
app.logger.info(message)
print(message)
return port


def _start_background_service_on_windows(port):
try:
import win32api
Expand All @@ -252,7 +267,7 @@ def _start_background_service_on_windows(port):
f"waitress-serve --listen=127.0.0.1:{port} --threads={PF_SERVICE_WORKER_NUM} "
"promptflow._cli._pf._service:get_app"
)
logger.debug(f"Start Promptflow Service in Windows: {command}")
logger.debug(f"Start prompt flow service in Windows: {command}")
startupinfo = win32process.STARTUPINFO()
startupinfo.dwFlags |= win32process.STARTF_USESHOWWINDOW
startupinfo.wShowWindow = win32con.SW_HIDE
Expand Down Expand Up @@ -286,17 +301,17 @@ def _start_background_service_on_unix(port):
f"--threads={PF_SERVICE_WORKER_NUM}",
"promptflow._cli._pf._service:get_app",
]
logger.debug(f"Start Promptflow Service in Unix: {cmd}")
logger.debug(f"Start prompt flow service in Unix: {cmd}")
subprocess.Popen(cmd, stdout=subprocess.DEVNULL, start_new_session=True)


def stop_service():
port = get_port_from_config()
if port is not None and is_port_in_use(port):
kill_exist_service(port)
message = f"Promptflow service stop in {port}."
message = f"Prompt flow service stop in {port}."
else:
message = "Promptflow service is not started."
message = "Prompt flow service is not started."
logger.debug(message)
print(message)

Expand All @@ -315,7 +330,7 @@ def show_service():
return
else:
logger.warning(
f"Promptflow service is not started. The promptflow service log is located at {log_file.as_posix()} "
f"and promptflow version is {get_pfs_version()}."
f"Prompt flow service is not started. The prompt flow service log is located at {log_file.as_posix()} "
f"and prompt flow version is {get_pfs_version()}."
)
sys.exit(1)
13 changes: 13 additions & 0 deletions src/promptflow-devkit/promptflow/_sdk/_service/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------
import logging
import os
import threading
import time
from datetime import datetime, timedelta
Expand All @@ -14,6 +15,7 @@

from promptflow._sdk._constants import (
HOME_PROMPT_FLOW_DIR,
PF_SERVICE_DEBUG,
PF_SERVICE_HOUR_TIMEOUT,
PF_SERVICE_LOG_FILE,
PF_SERVICE_MONITOR_SECOND,
Expand Down Expand Up @@ -101,8 +103,19 @@ def create_app():
handler = RotatingFileHandler(filename=log_file, maxBytes=1_000_000, backupCount=1)
formatter = logging.Formatter("[%(asctime)s][%(name)s][%(levelname)s] - %(message)s")
handler.setFormatter(formatter)

# Create a stream handler to output logs to the terminal
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(formatter)

# Set app logger to the only one RotatingFileHandler to avoid duplicate logs
app.logger.handlers = [handler]
if os.environ.get(PF_SERVICE_DEBUG) == "true":
# Set app logger to use both the rotating file handler and the stream handler in debug mode
app.logger.handlers.append(stream_handler)

# Prevent logs from being handled by the root logger
app.logger.propagate = False

# Basic error handler
@api.errorhandler(Exception)
Expand Down
24 changes: 12 additions & 12 deletions src/promptflow-devkit/promptflow/_sdk/_service/utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,17 @@
logger = get_cli_sdk_logger()

hint_stop_message = (
f"You can stop the prompt flow tracing server with the following command:'\033[1mpf service stop\033[0m'.\n"
f"You can stop the prompt flow service with the following command:'\033[1mpf service stop\033[0m'.\n"
f"Alternatively, if no requests are made within {PF_SERVICE_HOUR_TIMEOUT} "
f"hours, it will automatically stop."
)
hint_stop_before_upgrade = (
"Kindly reminder: If you have previously upgraded the promptflow package , please "
"double-confirm that you have run '\033[1mpf service stop\033[0m' to stop the promptflow"
"Kindly reminder: If you have previously upgraded the prompt flow package , please "
"double-confirm that you have run '\033[1mpf service stop\033[0m' to stop the prompt flow"
"service before proceeding with the upgrade. Otherwise, you may encounter unexpected "
"environmental issues or inconsistencies between the version of running promptflow service "
"and the local promptflow version. Alternatively, you can use the "
"'\033[1mpf upgrade\033[0m' command to proceed with the upgrade process for the promptflow "
"environmental issues or inconsistencies between the version of running prompt flow service "
"and the local prompt flow version. Alternatively, you can use the "
"'\033[1mpf upgrade\033[0m' command to proceed with the upgrade process for the prompt flow "
"package."
)

Expand Down Expand Up @@ -176,7 +176,7 @@ def make_response_no_content():


def get_pfs_version():
"""Promptflow service show promptflow version if installed from root, else devkit version"""
"""Prompt flow service show promptflow version if installed from root, else devkit version"""
version_promptflow = get_promptflow_sdk_version()
if version_promptflow:
return version_promptflow
Expand All @@ -190,20 +190,20 @@ def is_pfs_service_healthy(pfs_port) -> bool:
try:
response = requests.get("http://localhost:{}/heartbeat".format(pfs_port))
if response.status_code == 200:
logger.debug(f"Promptflow service is already running on port {pfs_port}, {response.text}")
logger.debug(f"Prompt flow service is already running on port {pfs_port}, {response.text}")
match = re.search(r'"promptflow":"(.*?)"', response.text)
if match:
version = match.group(1)
local_version = get_pfs_version()
is_healthy = version == local_version
if not is_healthy:
logger.warning(
f"Promptflow service is running on port {pfs_port}, but the version is not the same as "
f"Prompt flow service is running on port {pfs_port}, but the version is not the same as "
f"local sdk version {local_version}. The service version is {version}."
)
else:
is_healthy = False
logger.warning("/heartbeat response doesn't contain current promptflow service version.")
logger.warning("/heartbeat response doesn't contain current prompt flow service version.")
return is_healthy
except Exception: # pylint: disable=broad-except
pass
Expand All @@ -217,7 +217,7 @@ def check_pfs_service_status(pfs_port, time_delay=1, count_threshold=10) -> bool
is_healthy = is_pfs_service_healthy(pfs_port)
while is_healthy is False and count_threshold > cnt:
message = (
f"Waiting for the Promptflow service status to become healthy... It has been tried for {cnt} times, will "
f"Waiting for the prompt flow service status to become healthy... It has been tried for {cnt} times, will "
f"try at most {count_threshold} times."
)
if cnt >= 3:
Expand Down Expand Up @@ -308,7 +308,7 @@ def get_client_from_request(*, connection_provider=None) -> "PFClient":

def is_run_from_built_binary():
"""
Use this function to trigger behavior difference between calling from promptflow sdk/cli and built binary.
Use this function to trigger behavior difference between calling from prompt flow sdk/cli and built binary.
Allow customer to use environment variable to control the triggering.
"""
Expand Down
5 changes: 2 additions & 3 deletions src/promptflow-devkit/promptflow/_sdk/_tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,12 +95,11 @@ def _invoke_pf_svc() -> str:
cmd_args.append("--force")
logger.debug("Prompt flow service is not healthy, force to start...")
else:
print("Prompt flow Tracing Server has started...")
print(hint_stop_message)
print("Prompt flow service has started...")
return port

add_executable_script_to_env_path()
print("Starting prompt flow Tracing Server...")
print("Starting prompt flow service...")
start_pfs = None
try:
start_pfs = subprocess.Popen(cmd_args, shell=platform.system() == "Windows", stderr=subprocess.PIPE)
Expand Down
4 changes: 2 additions & 2 deletions src/promptflow-devkit/tests/sdk_pfs_test/e2etests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,12 @@ def test_start_service(self):

def test_show_service_status(self, capsys):
with pytest.raises(SystemExit):
self._run_pfs_command("show-status")
self._run_pfs_command("status")
start_pfs = subprocess.Popen("pf service start", shell=True)
# Wait for service to be started
start_pfs.wait()
assert self._is_service_healthy()
self._run_pfs_command("show-status")
self._run_pfs_command("status")
output, _ = capsys.readouterr()
assert str(get_port_from_config()) in output
self._run_pfs_command("stop")
Expand Down

0 comments on commit ac82e2e

Please sign in to comment.