From 83f2c995198921b9d9699386cc5b8c5cac662c2c Mon Sep 17 00:00:00 2001 From: Zhengfei Wang <38847871+zhengfeiwang@users.noreply.github.com> Date: Mon, 6 May 2024 17:36:18 +0800 Subject: [PATCH 1/3] [promptflow] Bump pf version in main branch (#3115) # Description Otherwise, when installing from local source, the version will still be reported as 1.10.0. # All Promptflow Contribution checklist: - [x] **The pull request does not introduce [breaking changes].** - [ ] **CHANGELOG is updated for new features, bug fixes or other significant changes.** - [x] **I have read the [contribution guidelines](../CONTRIBUTING.md).** - [ ] **Create an issue and link to the pull request to get dedicated review from promptflow team. Learn more: [suggested workflow](../CONTRIBUTING.md#suggested-workflow).** ## General Guidelines and Best Practices - [x] Title of the pull request is clear and informative. - [x] There are a small number of commits, each of which have an informative message. This means that previously merged commits do not appear in the history of the PR. For more information on cleaning up the commits in your PR, [see this page](https://github.com/Azure/azure-powershell/blob/master/documentation/development-docs/cleaning-up-commits.md). ### Testing Guidelines - [ ] Pull request includes test coverage for the included changes. --- src/promptflow/promptflow/_version.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/promptflow/promptflow/_version.py b/src/promptflow/promptflow/_version.py index 344d35d0a6e..c21091e23dc 100644 --- a/src/promptflow/promptflow/_version.py +++ b/src/promptflow/promptflow/_version.py @@ -2,4 +2,4 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # --------------------------------------------------------- -VERSION = "1.10.0" +VERSION = "1.11.0.dev0" From 57c632a395a4ff58a85e343e5e52fdab09c33748 Mon Sep 17 00:00:00 2001 From: Xiaopeng Wang Date: Mon, 6 May 2024 18:07:03 +0800 Subject: [PATCH 2/3] [pfserving] Improve log and add OTLP exporter aad auth support (#3112) # Description This PR improves serving logs and adds AAD auth support for the OTLP exporters: it replaces the deprecated `logger.warn` with `logger.warning` in the AzureML extension, logs a warning when an optional exporter package is missing instead of silently returning no exporter, lets the OTLP trace/metric exporters attach an AAD bearer token (acquired via `DefaultAzureCredential`) to export requests when `OTEL_EXPORTER_OTLP_AAD_AUTH_ENABLE` is set, with the scope configurable through `OTEL_EXPORTER_OTLP_AAD_AUTH_SCOPE`, and logs the run error instead of the outputs when a flow run does not complete. A minimal configuration sketch follows the checklist below. # All Promptflow Contribution checklist: - [ ] **The pull request does not introduce [breaking changes].** - [ ] **CHANGELOG is updated for new features, bug fixes or other significant changes.** - [ ] **I have read the [contribution guidelines](../CONTRIBUTING.md).** - [ ] **Create an issue and link to the pull request to get dedicated review from promptflow team. Learn more: [suggested workflow](../CONTRIBUTING.md#suggested-workflow).** ## General Guidelines and Best Practices - [ ] Title of the pull request is clear and informative. - [ ] There are a small number of commits, each of which have an informative message. This means that previously merged commits do not appear in the history of the PR. For more information on cleaning up the commits in your PR, [see this page](https://github.com/Azure/azure-powershell/blob/master/documentation/development-docs/cleaning-up-commits.md). ### Testing Guidelines - [ ] Pull request includes test coverage for the included changes.
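A minimal configuration sketch for the new switch, assuming the serving deployment already points the exporters at an OTLP endpoint; the endpoint variable and collector URL below are illustrative assumptions, only the two `OTEL_EXPORTER_OTLP_AAD_AUTH_*` variables come from this PR, and `azure-identity` must be installed for token acquisition to work:

```python
# Hedged sketch, not part of this PR's diff: opting a serving deployment in to
# AAD-authenticated OTLP export. Only the two *_AAD_AUTH_* variables are introduced here;
# the endpoint variable and URL are assumptions about the surrounding deployment.
import os

os.environ["OTEL_EXPORTER_OTLP_ENDPOINT"] = "https://my-collector.example.com"  # hypothetical collector
os.environ["OTEL_EXPORTER_OTLP_AAD_AUTH_ENABLE"] = "true"
# Optional; the exporter falls back to "https://management.azure.com/.default" when unset.
os.environ["OTEL_EXPORTER_OTLP_AAD_AUTH_SCOPE"] = "https://monitor.azure.com/.default"

# With these set, the AADAuthOTLPSpanExporter / AADAuthOTLPMetricExporter added in this PR
# acquire a token via azure.identity.DefaultAzureCredential before each export and send it
# as an "Authorization: Bearer <token>" header; AAD auth stays off by default ("false").
```

Keeping the flag off by default means existing deployments keep exporting exactly as before.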
--------- Co-authored-by: xiaopwan --- .../_serving/extension/azureml_extension.py | 2 +- .../otel_exporter_provider_factory.py | 71 ++++++++++++++++++- .../promptflow/core/_serving/flow_invoker.py | 15 ++-- 3 files changed, 81 insertions(+), 7 deletions(-) diff --git a/src/promptflow-core/promptflow/core/_serving/extension/azureml_extension.py b/src/promptflow-core/promptflow/core/_serving/extension/azureml_extension.py index 3cfb2be4989..ade6ec5222e 100644 --- a/src/promptflow-core/promptflow/core/_serving/extension/azureml_extension.py +++ b/src/promptflow-core/promptflow/core/_serving/extension/azureml_extension.py @@ -97,7 +97,7 @@ def get_override_connections(self, flow: Flow) -> Tuple[dict, dict]: conn = WorkspaceConnectionProvider._convert_to_connection_dict(connection_name, conn_data) connections[connection_name] = conn except Exception as e: - self.logger.warn(f"Failed to convert connection data to connection: {e}") + self.logger.warning(f"Failed to convert connection data to connection: {e}") raise InvalidConnectionData(connection_name) if len(connections_name_overrides) > 0: self.logger.info(f"Connection name overrides: {connections_name_overrides}") diff --git a/src/promptflow-core/promptflow/core/_serving/extension/otel_exporter_provider_factory.py b/src/promptflow-core/promptflow/core/_serving/extension/otel_exporter_provider_factory.py index f463642cbfc..3e6ade8f04a 100644 --- a/src/promptflow-core/promptflow/core/_serving/extension/otel_exporter_provider_factory.py +++ b/src/promptflow-core/promptflow/core/_serving/extension/otel_exporter_provider_factory.py @@ -58,6 +58,10 @@ def get_exporter(self, **kwargs): return AzureMonitorTraceExporter.from_connection_string(self.app_insight_connection_string) except ImportError: + self.logger.warning( + "azure-monitor-opentelemetry-exporter is not installed, \ + AzureMonitorTraceExporter will not be enabled!" + ) return None @@ -82,9 +86,17 @@ def get_exporter(self, **kwargs): return AzureMonitorMetricExporter.from_connection_string(self.app_insight_connection_string) except ImportError: + self.logger.warning( + "azure-monitor-opentelemetry-exporter is not installed, \ + AzureMonitorMetricExporter will not be enabled!" 
+ ) return None +OTEL_EXPORTER_OTLP_AAD_AUTH_ENABLE = "OTEL_EXPORTER_OTLP_AAD_AUTH_ENABLE" +OTEL_EXPORTER_OTLP_AAD_AUTH_SCOPE = "OTEL_EXPORTER_OTLP_AAD_AUTH_SCOPE" + + class OTLPExporterProvider(OTelExporterProvider): def __init__(self, logger, exporter_type: ExporterType) -> None: super().__init__(logger, exporter_type) @@ -103,11 +115,30 @@ def __init__(self, logger) -> None: super().__init__(logger, ExporterType.TRACE) def get_exporter(self, **kwargs): + logger = self.logger try: from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter - return OTLPSpanExporter(endpoint=self.otel_exporter_endpoint) + class AADAuthOTLPSpanExporter(OTLPSpanExporter): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.aad_auth, self.aad_auth_scope, self.credential = try_parse_otlp_aad_auth_info( + logger, "OTLPSpanExporter" + ) + + def _export(self, serialized_data: str): + if self.aad_auth and self.credential: + token = self.credential.get_token(self.aad_auth_scope).token + auth_header = {"Authorization": f"Bearer {token}"} + self._session.headers.update(auth_header) + return super()._export(serialized_data) + + return AADAuthOTLPSpanExporter(endpoint=self.otel_exporter_endpoint) except ImportError: + self.logger.warning( + "opentelemetry-exporter-otlp-proto-http is not installed, \ + OTLPSpanExporter will not be enabled!" + ) return None @@ -116,11 +147,30 @@ def __init__(self, logger) -> None: super().__init__(logger, ExporterType.METRIC) def get_exporter(self, **kwargs): + logger = self.logger try: from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter - return OTLPMetricExporter(endpoint=self.otel_exporter_endpoint) + class AADAuthOTLPMetricExporter(OTLPMetricExporter): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.aad_auth, self.aad_auth_scope, self.credential = try_parse_otlp_aad_auth_info( + logger, "OTLPMetricExporter" + ) + + def _export(self, serialized_data: str): + if self.aad_auth and self.credential: + token = self.credential.get_token(self.aad_auth_scope).token + auth_header = {"Authorization": f"Bearer {token}"} + self._session.headers.update(auth_header) + return super()._export(serialized_data) + + return AADAuthOTLPMetricExporter(endpoint=self.otel_exporter_endpoint) except ImportError: + self.logger.warning( + "opentelemetry-exporter-otlp-proto-http is not installed, \ + OTLPMetricExporter will not be enabled!" + ) return None @@ -166,3 +216,20 @@ def try_get_app_insight_connection_string(): return f"InstrumentationKey={instrumentation_key}" connection_str = os.getenv("APPLICATIONINSIGHTS_CONNECTION_STRING") return connection_str + + +def try_parse_otlp_aad_auth_info(logger, exporter_name): + aad_auth = os.environ.get(OTEL_EXPORTER_OTLP_AAD_AUTH_ENABLE, "false").lower() == "true" + aad_auth_scope = os.environ.get(OTEL_EXPORTER_OTLP_AAD_AUTH_SCOPE, "https://management.azure.com/.default") + credential = None + if aad_auth: + try: + from azure.identity import DefaultAzureCredential + + credential = DefaultAzureCredential() + except ImportError: + logger.warning( + f"azure-identity is not installed, \ + AAD auth for {exporter_name} will not be enabled!" 
+ ) + return aad_auth, aad_auth_scope, credential diff --git a/src/promptflow-core/promptflow/core/_serving/flow_invoker.py b/src/promptflow-core/promptflow/core/_serving/flow_invoker.py index ce4b7dd6d0b..727c3ef4cd0 100644 --- a/src/promptflow-core/promptflow/core/_serving/flow_invoker.py +++ b/src/promptflow-core/promptflow/core/_serving/flow_invoker.py @@ -10,6 +10,7 @@ from promptflow._utils.flow_utils import dump_flow_result, is_executable_chat_flow from promptflow._utils.logger_utils import LoggerFactory from promptflow._utils.multimedia_utils import MultimediaProcessor +from promptflow.contracts.run_info import Status from promptflow.core._connection import _Connection from promptflow.core._connection_provider._connection_provider import ConnectionProvider from promptflow.core._flow import AbstractFlowBase @@ -222,8 +223,11 @@ def invoke(self, data: dict, run_id=None, disable_input_output_logging=False): returned_non_dict_output = False resolved_outputs = self._convert_multimedia_data_to_base64(output_dict) self._dump_invoke_result(result) - log_outputs = "" if disable_input_output_logging else result.output - self.logger.info(f"Flow run result: {log_outputs}") + if result.run_info.status != Status.Completed: + self.logger.error(f"Flow run failed with error: {result.run_info.error}") + else: + log_outputs = "" if disable_input_output_logging else result.output + self.logger.info(f"Flow run result: {log_outputs}") if not self.raise_ex: # If raise_ex is False, we will return the trace flow & node run info. return FlowResult( @@ -266,8 +270,11 @@ async def invoke_async(self, data: dict, run_id=None, disable_input_output_loggi returned_non_dict_output = False resolved_outputs = self._convert_multimedia_data_to_base64(output_dict) self._dump_invoke_result(result) - log_outputs = "" if disable_input_output_logging else result.output - self.logger.info(f"Flow run result: {log_outputs}") + if result.run_info.status != Status.Completed: + self.logger.error(f"Flow run failed with error: {result.run_info.error}") + else: + log_outputs = "" if disable_input_output_logging else result.output + self.logger.info(f"Flow run result: {log_outputs}") if not self.raise_ex: # If raise_ex is False, we will return the trace flow & node run info. return FlowResult( From e53bf7f998e78c155bc95bc3579f94f602e623d2 Mon Sep 17 00:00:00 2001 From: Robben Wang <350053002@qq.com> Date: Mon, 6 May 2024 18:45:11 +0800 Subject: [PATCH 3/3] Add util method to summarize trace telemetry. (#3074) # Description Add a util method to summarize trace count telemetry. For long-term telemetry we will also need span counts. The simplest solution is to create a custom event for each trace id and include the span count as a custom dimension, but that may generate so many custom events that it affects all telemetry. So we only record trace counts first, and will decide how to add span counts later based on the trace count telemetry, perhaps by settling on a reasonable sampling rate. (A usage sketch of the new helper follows the diff below.) # All Promptflow Contribution checklist: - [ ] **The pull request does not introduce [breaking changes].** - [ ] **CHANGELOG is updated for new features, bug fixes or other significant changes.** - [ ] **I have read the [contribution guidelines](../CONTRIBUTING.md).** - [ ] **Create an issue and link to the pull request to get dedicated review from promptflow team. Learn more: [suggested workflow](../CONTRIBUTING.md#suggested-workflow).** ## General Guidelines and Best Practices - [ ] Title of the pull request is clear and informative.
- [ ] There are a small number of commits, each of which have an informative message. This means that previously merged commits do not appear in the history of the PR. For more information on cleaning up the commits in your PR, [see this page](https://github.com/Azure/azure-powershell/blob/master/documentation/development-docs/cleaning-up-commits.md). ### Testing Guidelines - [ ] Pull request includes test coverage for the included changes. --------- Co-authored-by: robbenwang --- src/promptflow-core/promptflow/_constants.py | 3 + .../promptflow/_internal/__init__.py | 1 + .../promptflow/_sdk/_tracing.py | 2 +- .../_sdk/_utilities/tracing_utils.py | 43 ++++++++ .../_sdk/_utilities/test_tracing_utils.py | 101 ++++++++++++++++++ 5 files changed, 149 insertions(+), 1 deletion(-) create mode 100644 src/promptflow-devkit/tests/unittests/_sdk/_utilities/test_tracing_utils.py diff --git a/src/promptflow-core/promptflow/_constants.py b/src/promptflow-core/promptflow/_constants.py index f15d77271e1..32565352926 100644 --- a/src/promptflow-core/promptflow/_constants.py +++ b/src/promptflow-core/promptflow/_constants.py @@ -171,6 +171,9 @@ class SpanAttributeFieldName: COMPLETION_TOKEN_COUNT = "__computed__.cumulative_token_count.completion" PROMPT_TOKEN_COUNT = "__computed__.cumulative_token_count.prompt" TOTAL_TOKEN_COUNT = "__computed__.cumulative_token_count.total" + # Execution target, e.g. prompty, flex, dag, code. + # We may need another field to indicate the language, e.g. python, csharp. + EXECUTION_TARGET = "execution_target" SESSION_ID = "session_id" diff --git a/src/promptflow-devkit/promptflow/_internal/__init__.py b/src/promptflow-devkit/promptflow/_internal/__init__.py index 4f1dc5edceb..2e606c018e7 100644 --- a/src/promptflow-devkit/promptflow/_internal/__init__.py +++ b/src/promptflow-devkit/promptflow/_internal/__init__.py @@ -52,6 +52,7 @@ from promptflow._sdk._service.apis.collector import trace_collector from promptflow._sdk._tracing import process_otlp_trace_request from promptflow._sdk._utilities.general_utils import resolve_flow_language +from promptflow._sdk._utilities.tracing_utils import aggregate_trace_count from promptflow._sdk._version import VERSION from promptflow._utils.context_utils import _change_working_dir, inject_sys_path from promptflow._utils.credential_scrubber import CredentialScrubber diff --git a/src/promptflow-devkit/promptflow/_sdk/_tracing.py b/src/promptflow-devkit/promptflow/_sdk/_tracing.py index 2a1a0b69f37..deddb6acb8f 100644 --- a/src/promptflow-devkit/promptflow/_sdk/_tracing.py +++ b/src/promptflow-devkit/promptflow/_sdk/_tracing.py @@ -615,7 +615,7 @@ def process_otlp_trace_request( args=(all_spans, get_created_by_info_with_cache, logger, get_credential, cloud_trace_only), ).start() - return + return all_spans def _try_write_trace_to_cosmosdb( diff --git a/src/promptflow-devkit/promptflow/_sdk/_utilities/tracing_utils.py b/src/promptflow-devkit/promptflow/_sdk/_utilities/tracing_utils.py index 9e96da8ec4e..8102f57eaf9 100644 --- a/src/promptflow-devkit/promptflow/_sdk/_utilities/tracing_utils.py +++ b/src/promptflow-devkit/promptflow/_sdk/_utilities/tracing_utils.py @@ -6,6 +6,7 @@ import json import logging import typing +from collections import namedtuple from dataclasses import dataclass from pathlib import Path @@ -15,10 +16,13 @@ from opentelemetry.trace.span import format_trace_id as otel_format_trace_id from promptflow._constants import ( + SpanAttributeFieldName, SpanContextFieldName, SpanEventFieldName, SpanFieldName, SpanLinkFieldName, 
+ SpanResourceAttributesFieldName, + SpanResourceFieldName, SpanStatusFieldName, ) from promptflow._sdk._constants import HOME_PROMPT_FLOW_DIR, AzureMLWorkspaceTriad @@ -284,3 +288,42 @@ def append_conditions( expression += f" and session_id == '{session_id}'" logger.debug("final search expression: %s", expression) return expression + + +# SCENARIO: trace count telemetry +TraceCountKey = namedtuple( + "TraceKey", ["subscription_id", "resource_group", "workspace_name", "scenario", "execution_target"] +) + + +def aggregate_trace_count(all_spans: typing.List[Span]) -> typing.Dict[TraceCountKey, int]: + """ + Aggregate the trace count based on workspace info, scenario, and execution target. + """ + trace_count_summary = {} + + if not all_spans: + return trace_count_summary + + # Iterate over all spans + for span in all_spans: + # Only count for root span, ignore span count telemetry for now. + if span.parent_id is None: + resource_attributes = span.resource.get(SpanResourceFieldName.ATTRIBUTES, {}) + subscription_id = resource_attributes.get(SpanResourceAttributesFieldName.SUBSCRIPTION_ID, None) + resource_group = resource_attributes.get(SpanResourceAttributesFieldName.RESOURCE_GROUP_NAME, None) + workspace_name = resource_attributes.get(SpanResourceAttributesFieldName.WORKSPACE_NAME, None) + # We may need another field to indicate the language in the future, e.g. python, csharp. + execution_target = span.attributes.get(SpanAttributeFieldName.EXECUTION_TARGET, "code") + + scenario = "script" + if SpanAttributeFieldName.BATCH_RUN_ID in span.attributes: + scenario = "batch" + elif SpanAttributeFieldName.LINE_RUN_ID in span.attributes: + scenario = "test" + + key = TraceCountKey(subscription_id, resource_group, workspace_name, scenario, execution_target) + + trace_count_summary[key] = trace_count_summary.get(key, 0) + 1 + + return trace_count_summary diff --git a/src/promptflow-devkit/tests/unittests/_sdk/_utilities/test_tracing_utils.py b/src/promptflow-devkit/tests/unittests/_sdk/_utilities/test_tracing_utils.py new file mode 100644 index 00000000000..d3f5ec507da --- /dev/null +++ b/src/promptflow-devkit/tests/unittests/_sdk/_utilities/test_tracing_utils.py @@ -0,0 +1,101 @@ +import pytest +from pydash import partial + +from promptflow._constants import SpanAttributeFieldName, SpanResourceAttributesFieldName, SpanResourceFieldName +from promptflow._sdk._utilities.tracing_utils import aggregate_trace_count +from promptflow._sdk.entities._trace import Span + +# Mock definitions for Span, SpanResourceFieldName, SpanResourceAttributesFieldName, and SpanAttributeFieldName +# These should match the actual implementations you're using in your application. 
+ + +@pytest.mark.unittest +class TestTraceTelemetry: + def test_empty_span_list(self): + """Test with an empty list of spans.""" + result = aggregate_trace_count([]) + assert result == {} + + def test_single_root_span(self): + + resource = { + SpanResourceFieldName.ATTRIBUTES: { + SpanResourceAttributesFieldName.SUBSCRIPTION_ID: "sub", + SpanResourceAttributesFieldName.RESOURCE_GROUP_NAME: "rg", + SpanResourceAttributesFieldName.WORKSPACE_NAME: "ws", + } + } + create_span = partial( + Span, + trace_id=None, + span_id=None, + name=None, + context=None, + kind=None, + start_time=None, + end_time=None, + status=None, + parent_id=None, + resource=resource, + ) + + batch_root_span = create_span( + attributes={ + SpanAttributeFieldName.EXECUTION_TARGET: "code", + SpanAttributeFieldName.BATCH_RUN_ID: "batch_run_id", + }, + ) + line_root_span = create_span( + attributes={ + SpanAttributeFieldName.EXECUTION_TARGET: "code", + SpanAttributeFieldName.LINE_RUN_ID: "line_run_id", + }, + ) + + flex_root_span = create_span( + attributes={ + SpanAttributeFieldName.EXECUTION_TARGET: "flex", + }, + ) + prompty_root_span = create_span( + attributes={ + SpanAttributeFieldName.EXECUTION_TARGET: "prompty", + }, + ) + script_root_span = create_span( + attributes={ + SpanAttributeFieldName.EXECUTION_TARGET: "code", + }, + ) + none_ws_root_span = create_span( + resource={}, + attributes={ + SpanAttributeFieldName.EXECUTION_TARGET: "prompty", + }, + ) + non_root_span = create_span( + parent_id=1, + attributes={ + SpanAttributeFieldName.EXECUTION_TARGET: "code", + }, + ) + result = aggregate_trace_count( + [ + batch_root_span, + line_root_span, + script_root_span, + flex_root_span, + prompty_root_span, + non_root_span, + none_ws_root_span, + ] + ) + expected_result = { + ("sub", "rg", "ws", "batch", "code"): 1, + ("sub", "rg", "ws", "script", "code"): 1, + ("sub", "rg", "ws", "script", "flex"): 1, + ("sub", "rg", "ws", "script", "prompty"): 1, + ("sub", "rg", "ws", "test", "code"): 1, + (None, None, None, "script", "prompty"): 1, + } + assert result == expected_result
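As a rough illustration of the plan described for #3074 above, here is a hedged sketch of how the aggregated counts could be emitted with custom dimensions; the logger name and the empty span list are placeholders, and only `aggregate_trace_count` and the `TraceCountKey` fields come from this patch:

```python
# Hedged usage sketch, not part of the patch above. In practice `all_spans` would be the list
# returned by process_otlp_trace_request; it is left empty here so the snippet stays runnable.
import logging

from promptflow._sdk._utilities.tracing_utils import aggregate_trace_count

telemetry_logger = logging.getLogger("promptflow.telemetry")  # placeholder logger name
all_spans = []  # e.g. the spans collected by process_otlp_trace_request(...)

summary = aggregate_trace_count(all_spans)
for key, count in summary.items():
    # One record per (workspace, scenario, execution_target) key, with the trace count
    # carried as a custom dimension, as proposed in the PR description.
    custom_dimensions = {**key._asdict(), "trace_count": count}
    telemetry_logger.info("trace count summary", extra={"custom_dimensions": custom_dimensions})
```

Whether every record is emitted or only a sampled fraction can then be decided from the volume this reveals, in line with the sampling-rate idea in the description.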