Skip to content

Commit

Permalink
[trace] Move trace feature out of internal flag & improve trace exper…
Browse files Browse the repository at this point in the history
…ience (#2767)

# Description

This pull request targets to improve trace related experience:

- Move `start_trace` in test/run submitter out of internal flag
- Support `trace.provider=none` to disable trace feature, as we make it
public
- Fix bug that we will still print trace url even if we fail to invoke
prompt flow service
- Fix wrong portal urls for cloud trace
- Fix wrong flow name retrieval

# All Promptflow Contribution checklist:
- [x] **The pull request does not introduce [breaking changes].**
- [ ] **CHANGELOG is updated for new features, bug fixes or other
significant changes.**
- [x] **I have read the [contribution guidelines](../CONTRIBUTING.md).**
- [ ] **Create an issue and link to the pull request to get dedicated
review from promptflow team. Learn more: [suggested
workflow](../CONTRIBUTING.md#suggested-workflow).**

## General Guidelines and Best Practices
- [x] Title of the pull request is clear and informative.
- [x] There are a small number of commits, each of which have an
informative message. This means that previously merged commits do not
appear in the history of the PR. For more information on cleaning up the
commits in your PR, [see this
page](https://github.com/Azure/azure-powershell/blob/master/documentation/development-docs/cleaning-up-commits.md).

### Testing Guidelines
- [x] Pull request includes test coverage for the included changes.
  • Loading branch information
zhengfeiwang authored Apr 12, 2024
1 parent fab2cc0 commit a122d20
Show file tree
Hide file tree
Showing 7 changed files with 84 additions and 60 deletions.
4 changes: 4 additions & 0 deletions src/promptflow-devkit/promptflow/_sdk/_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
SERVICE_CONFIG_FILE,
)
from promptflow._sdk._errors import MissingAzurePackage
from promptflow._sdk._tracing import PF_CONFIG_TRACE_FEATURE_DISABLE
from promptflow._sdk._utils import call_from_extension, gen_uuid_by_compute_info, read_write_by_user
from promptflow._utils.logger_utils import get_cli_sdk_logger
from promptflow._utils.yaml_utils import dump_yaml, load_yaml
Expand Down Expand Up @@ -219,6 +220,9 @@ def _validate(key: str, value: str) -> None:
"please use its child folder, e.g. '${flow_directory}/.runs'."
)
elif key == Configuration.TRACE_PROVIDER:
# disable trace feature, no need to validate
if value.lower() == PF_CONFIG_TRACE_FEATURE_DISABLE:
return
try:
from promptflow.azure._utils._tracing import validate_trace_provider

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from promptflow.contracts.run_mode import RunMode
from promptflow.exceptions import UserErrorException, ValidationException
from promptflow.tracing._operation_context import OperationContext
from promptflow.tracing._start_trace import is_collection_writeable, start_trace

from .._configuration import Configuration
from .._load_functions import load_flow
Expand Down Expand Up @@ -83,20 +84,17 @@ def _run_bulk(self, run: Run, stream=False, **kwargs):
if run._resume_from is not None:
logger.debug(f"Resume from run {run._resume_from!r}...")
run._resume_from = self._ensure_run_completed(run._resume_from)
# Start trace
if self._config.is_internal_features_enabled():
from promptflow.tracing._start_trace import is_collection_writeable, start_trace

logger.debug("start trace for flow run...")
if is_collection_writeable():
logger.debug("trace collection is writeable, will use flow name as collection...")
collection_for_run = run._flow_name
logger.debug("collection for run: %s", collection_for_run)
# pass with internal parameter `_collection`
start_trace(attributes=attributes, run=run.name, _collection=collection_for_run)
else:
logger.debug("trace collection is protected, will honor existing collection.")
start_trace(attributes=attributes, run=run.name)
# start trace
logger.debug("start trace for flow run...")
if is_collection_writeable():
logger.debug("trace collection is writeable, will use flow name as collection...")
collection_for_run = run._flow_name
logger.debug("collection for run: %s", collection_for_run)
# pass with internal parameter `_collection`
start_trace(attributes=attributes, run=run.name, _collection=collection_for_run)
else:
logger.debug("trace collection is protected, will honor existing collection.")
start_trace(attributes=attributes, run=run.name)

self._validate_inputs(run=run)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
from promptflow.storage._run_storage import DefaultRunStorage
from promptflow.tracing._start_trace import is_collection_writeable, start_trace

from .._configuration import Configuration
from ..entities._flows import FlexFlow
from .utils import (
SubmitterHelper,
Expand Down Expand Up @@ -244,7 +243,7 @@ def init(
)

# do not enable trace when test single node, as we have not determined this behavior
if target_node is None and Configuration(overrides=self._client._config).is_internal_features_enabled():
if target_node is None:
logger.debug("start trace for flow test...")
if collection is not None:
logger.debug("collection is user specified: %s, will use it...", collection)
Expand Down
42 changes: 35 additions & 7 deletions src/promptflow-devkit/promptflow/_sdk/_tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,13 @@
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

from promptflow._cli._utils import get_credentials_for_cli
from promptflow._constants import (
OTEL_RESOURCE_SERVICE_NAME,
AzureWorkspaceKind,
SpanAttributeFieldName,
SpanResourceAttributesFieldName,
TraceEnvironmentVariableName,
)
from promptflow._sdk._configuration import Configuration
from promptflow._sdk._constants import (
PF_TRACE_CONTEXT,
PF_TRACE_CONTEXT_ATTR,
Expand All @@ -48,9 +46,20 @@

_logger = get_cli_sdk_logger()

PF_CONFIG_TRACE_FEATURE_DISABLE = "none"
TRACER_PROVIDER_PFS_EXPORTER_SET_ATTR = "_pfs_exporter_set"


def is_trace_feature_disabled() -> bool:
from promptflow._sdk._configuration import Configuration

trace_provider = Configuration.get_instance().get_trace_provider()
if isinstance(trace_provider, str):
return Configuration.get_instance().get_trace_provider().lower() == PF_CONFIG_TRACE_FEATURE_DISABLE
else:
return False


def _is_azure_ext_installed() -> bool:
try:
importlib.metadata.version("promptflow-azure")
Expand All @@ -63,6 +72,7 @@ def _get_collection_id_for_azure(collection: str) -> str:
"""{collection}_{object_id}"""
import jwt

from promptflow._cli._utils import get_credentials_for_cli
from promptflow.azure._utils.general import get_arm_token

token = get_arm_token(credential=get_credentials_for_cli())
Expand Down Expand Up @@ -93,7 +103,7 @@ def _invoke_pf_svc() -> str:
if is_port_in_use(int(port)):
if not is_pfs_service_healthy(port):
cmd_args.append("--force")
logger.debug("Prompt flow service is not healthy, force to start...")
_logger.debug("Prompt flow service is not healthy, force to start...")
else:
print("Prompt flow service has started...")
return port
Expand Down Expand Up @@ -127,6 +137,8 @@ def _invoke_pf_svc() -> str:


def _get_ws_triad_from_pf_config() -> typing.Optional[AzureMLWorkspaceTriad]:
from promptflow._sdk._configuration import Configuration

ws_arm_id = Configuration.get_instance().get_trace_provider()
return extract_workspace_triad_from_trace_provider(ws_arm_id) if ws_arm_id is not None else None

Expand Down Expand Up @@ -183,15 +195,15 @@ def _print_tracing_url_from_azure_portal(
if AzureWorkspaceKind.is_workspace(workspace):
_logger.debug(f"{ws_triad.workspace_name!r} is an Azure ML workspace.")
if run is None:
query = f"trace/collection/{collection_id}"
query = f"trace/collection/{collection_id}/list"
else:
query = f"prompts/trace/run/{run}"
query = f"prompts/trace/run/{run}/details"
elif AzureWorkspaceKind.is_project(workspace):
_logger.debug(f"{ws_triad.workspace_name!r} is an Azure AI project.")
if run is None:
query = f"projecttrace/collection/{collection_id}"
query = f"projecttrace/collection/{collection_id}/list"
else:
query = f"projectflows/trace/run/{run}"
query = f"projectflows/trace/run/{run}/details"
else:
_logger.error(f"the workspace type of {ws_triad.workspace_name!r} is not supported.")
return
Expand Down Expand Up @@ -250,6 +262,10 @@ def _create_res(


def start_trace_with_devkit(collection: str, **kwargs: typing.Any) -> None:
if is_trace_feature_disabled():
_logger.info("trace feature is disabled in config, skip setup exporter to PFS.")
return

_logger.debug("collection: %s", collection)
_logger.debug("kwargs: %s", kwargs)
attrs = kwargs.get("attributes", None)
Expand Down Expand Up @@ -291,6 +307,14 @@ def start_trace_with_devkit(collection: str, **kwargs: typing.Any) -> None:

# invoke prompt flow service
pfs_port = _invoke_pf_svc()
is_pfs_healthy = is_pfs_service_healthy(pfs_port)
if not is_pfs_healthy:
warning_msg = (
"Prompt flow service is not healthy, please check the logs for more details; "
"traces might not be exported correctly."
)
_logger.warning(warning_msg)
return

_inject_res_attrs_to_environ(pfs_port=pfs_port, collection=collection, exp=exp, ws_triad=ws_triad)
# instrument openai and setup exporter to pfs here for flex mode
Expand All @@ -303,6 +327,10 @@ def start_trace_with_devkit(collection: str, **kwargs: typing.Any) -> None:


def setup_exporter_to_pfs() -> None:
if is_trace_feature_disabled():
_logger.info("trace feature is disabled in config, skip setup exporter to PFS.")
return

_logger.debug("start setup exporter to prompt flow service...")
# get resource attributes from environment
# For local trace, collection is the only identifier for name and id
Expand Down
4 changes: 3 additions & 1 deletion src/promptflow-devkit/promptflow/_sdk/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -1077,7 +1077,9 @@ def is_flex_run(run: "Run") -> bool:
update_dict_value_with_connections = update_dict_value_with_connections


def get_flow_name(flow: FlowBase) -> str:
def get_flow_name(flow: Union[FlowBase, Path]) -> str:
if isinstance(flow, Path):
return flow.resolve().name
if isinstance(flow, DAGFlow):
return flow.name
# others: flex flow, prompty, etc.
Expand Down
32 changes: 17 additions & 15 deletions src/promptflow-devkit/tests/sdk_cli_test/e2etests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -359,21 +359,23 @@ def test_flow_with_aad_connection(self):
assert output["result"] == "meid_token"

def test_pf_flow_test_with_non_english_input_output(self, capsys):
question = "什么是 chat gpt"
run_pf_command("flow", "test", "--flow", f"{FLOWS_DIR}/chat_flow", "--inputs", f'question="{question}"')
stdout, _ = capsys.readouterr()
output_path = Path(FLOWS_DIR) / "chat_flow" / ".promptflow" / "flow.output.json"
assert output_path.exists()
with open(output_path, "r", encoding="utf-8") as f:
outputs = json.load(f)
assert outputs["answer"] in json.loads(stdout)["answer"]

detail_path = Path(FLOWS_DIR) / "chat_flow" / ".promptflow" / "flow.detail.json"
assert detail_path.exists()
with open(detail_path, "r", encoding="utf-8") as f:
detail = json.load(f)
assert detail["flow_runs"][0]["inputs"]["question"] == question
assert detail["flow_runs"][0]["output"]["answer"] == outputs["answer"]
# disable trace to not invoke prompt flow service, which will print unexpected content to stdout
with mock.patch("promptflow._sdk._tracing.is_trace_feature_disabled", return_value=True):
question = "什么是 chat gpt"
run_pf_command("flow", "test", "--flow", f"{FLOWS_DIR}/chat_flow", "--inputs", f'question="{question}"')
stdout, _ = capsys.readouterr()
output_path = Path(FLOWS_DIR) / "chat_flow" / ".promptflow" / "flow.output.json"
assert output_path.exists()
with open(output_path, "r", encoding="utf-8") as f:
outputs = json.load(f)
assert outputs["answer"] in json.loads(stdout)["answer"]

detail_path = Path(FLOWS_DIR) / "chat_flow" / ".promptflow" / "flow.detail.json"
assert detail_path.exists()
with open(detail_path, "r", encoding="utf-8") as f:
detail = json.load(f)
assert detail["flow_runs"][0]["inputs"]["question"] == question
assert detail["flow_runs"][0]["output"]["answer"] == outputs["answer"]

def test_pf_flow_with_variant(self, capsys):
with tempfile.TemporaryDirectory() as temp_dir:
Expand Down
33 changes: 12 additions & 21 deletions src/promptflow-devkit/tests/sdk_cli_test/unittests/test_trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
from promptflow.client import PFClient
from promptflow.exceptions import UserErrorException
from promptflow.tracing._operation_context import OperationContext
from promptflow.tracing._start_trace import setup_exporter_from_environ, start_trace
from promptflow.tracing._start_trace import setup_exporter_from_environ

MOCK_PROMPTFLOW_SERVICE_PORT = "23333"

Expand Down Expand Up @@ -64,8 +64,7 @@ def mock_resource() -> Dict:
@pytest.fixture
def mock_promptflow_service_invocation():
"""Mock `_invoke_pf_svc` as we don't expect to invoke PFS during unit test."""
with mock.patch("promptflow._sdk._tracing._invoke_pf_svc") as mock_func:
mock_func.return_value = MOCK_PROMPTFLOW_SERVICE_PORT
with mock.patch("promptflow._sdk._tracing._invoke_pf_svc", return_value=MOCK_PROMPTFLOW_SERVICE_PORT):
yield


Expand Down Expand Up @@ -148,7 +147,8 @@ def test_trace_without_attributes_collection(self, mock_resource: Dict) -> None:
assert isinstance(span.attributes, dict)
assert len(span.attributes) == 0

def test_experiment_test_lineage(self, monkeypatch: pytest.MonkeyPatch, mock_promptflow_service_invocation) -> None:
@pytest.mark.usefixtures("mock_promptflow_service_invocation")
def test_experiment_test_lineage(self, monkeypatch: pytest.MonkeyPatch) -> None:
# experiment orchestrator will help set this context in environment
referenced_line_run_id = str(uuid.uuid4())
ctx = {PF_TRACE_CONTEXT_ATTR: {ContextAttributeKey.REFERENCED_LINE_RUN_ID: referenced_line_run_id}}
Expand All @@ -160,9 +160,8 @@ def test_experiment_test_lineage(self, monkeypatch: pytest.MonkeyPatch, mock_pro
otel_attrs = op_ctx._get_otel_attributes()
assert otel_attrs[SpanAttributeFieldName.REFERENCED_LINE_RUN_ID] == referenced_line_run_id

def test_experiment_test_lineage_cleanup(
self, monkeypatch: pytest.MonkeyPatch, mock_promptflow_service_invocation
) -> None:
@pytest.mark.usefixtures("mock_promptflow_service_invocation")
def test_experiment_test_lineage_cleanup(self, monkeypatch: pytest.MonkeyPatch) -> None:
# in previous code, context may be set with lineage
op_ctx = OperationContext.get_instance()
op_ctx._add_otel_attributes(SpanAttributeFieldName.REFERENCED_LINE_RUN_ID, str(uuid.uuid4()))
Expand All @@ -182,20 +181,12 @@ def test_setup_exporter_in_executor(self, monkeypatch: pytest.MonkeyPatch):
# Assert the provider without exporter is not the one with exporter
assert original_proivder == new_provider

def test_setup_exporter_in_executor_with_preview_flag(
self, reset_tracer_provider, mock_promptflow_service_invocation
):
with mock.patch("promptflow._sdk._configuration.Configuration.is_internal_features_enabled") as mock_func:
mock_func.return_value = True

start_trace()
setup_exporter_from_environ()
tracer_provider: TracerProvider = trace.get_tracer_provider()
assert len(tracer_provider._active_span_processor._span_processors) == 1
assert (
tracer_provider._active_span_processor._span_processors[0].span_exporter._endpoint
== f"http://localhost:{MOCK_PROMPTFLOW_SERVICE_PORT}/v1/traces"
)
def test_pfs_invocation_failed_in_start_trace(self):
with mock.patch("promptflow._sdk._tracing._invoke_pf_svc"), mock.patch(
"promptflow._sdk._tracing.is_pfs_service_healthy", return_value=False
), mock.patch("promptflow._sdk._tracing._inject_res_attrs_to_environ") as monitor_func:
start_trace_with_devkit(collection=str(uuid.uuid4()))
assert monitor_func.call_count == 0


@pytest.mark.unittest
Expand Down

0 comments on commit a122d20

Please sign in to comment.