Skip to content

Commit

Permalink
Merge branch 'main' into mengla/copilot_sample
Browse files Browse the repository at this point in the history
  • Loading branch information
melionel authored May 6, 2024
2 parents 152ed5b + e53bf7f commit f4a43c4
Show file tree
Hide file tree
Showing 9 changed files with 231 additions and 9 deletions.
3 changes: 3 additions & 0 deletions src/promptflow-core/promptflow/_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,9 @@ class SpanAttributeFieldName:
COMPLETION_TOKEN_COUNT = "__computed__.cumulative_token_count.completion"
PROMPT_TOKEN_COUNT = "__computed__.cumulative_token_count.prompt"
TOTAL_TOKEN_COUNT = "__computed__.cumulative_token_count.total"
# Execution target, e.g. prompty, flex, dag, code.
# We may need another field to indicate the language, e.g. python, csharp.
EXECUTION_TARGET = "execution_target"

SESSION_ID = "session_id"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ def get_override_connections(self, flow: Flow) -> Tuple[dict, dict]:
conn = WorkspaceConnectionProvider._convert_to_connection_dict(connection_name, conn_data)
connections[connection_name] = conn
except Exception as e:
self.logger.warn(f"Failed to convert connection data to connection: {e}")
self.logger.warning(f"Failed to convert connection data to connection: {e}")
raise InvalidConnectionData(connection_name)
if len(connections_name_overrides) > 0:
self.logger.info(f"Connection name overrides: {connections_name_overrides}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ def get_exporter(self, **kwargs):

return AzureMonitorTraceExporter.from_connection_string(self.app_insight_connection_string)
except ImportError:
self.logger.warning(
"azure-monitor-opentelemetry-exporter is not installed, \
AzureMonitorTraceExporter will not be enabled!"
)
return None


Expand All @@ -82,9 +86,17 @@ def get_exporter(self, **kwargs):

return AzureMonitorMetricExporter.from_connection_string(self.app_insight_connection_string)
except ImportError:
self.logger.warning(
"azure-monitor-opentelemetry-exporter is not installed, \
AzureMonitorMetricExporter will not be enabled!"
)
return None


# Name of the environment variable that switches on AAD token auth for the OTLP exporters;
# try_parse_otlp_aad_auth_info treats it as enabled only when its value is "true" (case-insensitive).
OTEL_EXPORTER_OTLP_AAD_AUTH_ENABLE = "OTEL_EXPORTER_OTLP_AAD_AUTH_ENABLE"
# Name of the environment variable that holds the token scope; when it is unset,
# try_parse_otlp_aad_auth_info falls back to "https://management.azure.com/.default".
OTEL_EXPORTER_OTLP_AAD_AUTH_SCOPE = "OTEL_EXPORTER_OTLP_AAD_AUTH_SCOPE"


class OTLPExporterProvider(OTelExporterProvider):
def __init__(self, logger, exporter_type: ExporterType) -> None:
super().__init__(logger, exporter_type)
Expand All @@ -103,11 +115,30 @@ def __init__(self, logger) -> None:
super().__init__(logger, ExporterType.TRACE)

def get_exporter(self, **kwargs):
# Build the OTLP/HTTP span exporter for self.otel_exporter_endpoint. Returns None, after
# logging a warning, when the OTLP HTTP exporter package cannot be imported.
# The logger is bound to a local so the nested exporter class below can reach it via closure.
logger = self.logger
try:
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter

# NOTE(review): in this diff view the next line looks like the pre-change return that the
# AAD-aware subclass below replaces - confirm against the actual file.
return OTLPSpanExporter(endpoint=self.otel_exporter_endpoint)
# Subclass that optionally adds an AAD bearer token to the export requests.
class AADAuthOTLPSpanExporter(OTLPSpanExporter):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# The AAD settings (enabled flag, scope, credential) are resolved once, at construction.
self.aad_auth, self.aad_auth_scope, self.credential = try_parse_otlp_aad_auth_info(
logger, "OTLPSpanExporter"
)

def _export(self, serialized_data: str):
# Attach a token only when AAD auth is enabled and a credential object was created.
if self.aad_auth and self.credential:
# A token is requested on every export and written into the session's Authorization header.
token = self.credential.get_token(self.aad_auth_scope).token
auth_header = {"Authorization": f"Bearer {token}"}
self._session.headers.update(auth_header)
# The actual export is delegated to the base exporter.
return super()._export(serialized_data)

return AADAuthOTLPSpanExporter(endpoint=self.otel_exporter_endpoint)
except ImportError:
# Missing exporter package: warn and report "no exporter" instead of raising.
self.logger.warning(
"opentelemetry-exporter-otlp-proto-http is not installed, \
OTLPSpanExporter will not be enabled!"
)
return None


Expand All @@ -116,11 +147,30 @@ def __init__(self, logger) -> None:
super().__init__(logger, ExporterType.METRIC)

def get_exporter(self, **kwargs):
# Build the OTLP/HTTP metric exporter for self.otel_exporter_endpoint. Returns None, after
# logging a warning, when the OTLP HTTP exporter package cannot be imported.
# The logger is bound to a local so the nested exporter class below can reach it via closure.
logger = self.logger
try:
from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter

# NOTE(review): in this diff view the next line looks like the pre-change return that the
# AAD-aware subclass below replaces - confirm against the actual file.
return OTLPMetricExporter(endpoint=self.otel_exporter_endpoint)
# Subclass that optionally adds an AAD bearer token to the export requests.
class AADAuthOTLPMetricExporter(OTLPMetricExporter):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# The AAD settings (enabled flag, scope, credential) are resolved once, at construction.
self.aad_auth, self.aad_auth_scope, self.credential = try_parse_otlp_aad_auth_info(
logger, "OTLPMetricExporter"
)

def _export(self, serialized_data: str):
# Attach a token only when AAD auth is enabled and a credential object was created.
if self.aad_auth and self.credential:
# A token is requested on every export and written into the session's Authorization header.
token = self.credential.get_token(self.aad_auth_scope).token
auth_header = {"Authorization": f"Bearer {token}"}
self._session.headers.update(auth_header)
# The actual export is delegated to the base exporter.
return super()._export(serialized_data)

return AADAuthOTLPMetricExporter(endpoint=self.otel_exporter_endpoint)
except ImportError:
# Missing exporter package: warn and report "no exporter" instead of raising.
self.logger.warning(
"opentelemetry-exporter-otlp-proto-http is not installed, \
OTLPMetricExporter will not be enabled!"
)
return None


Expand Down Expand Up @@ -166,3 +216,20 @@ def try_get_app_insight_connection_string():
return f"InstrumentationKey={instrumentation_key}"
connection_str = os.getenv("APPLICATIONINSIGHTS_CONNECTION_STRING")
return connection_str


def try_parse_otlp_aad_auth_info(logger, exporter_name):
    """Read the AAD auth settings for an OTLP exporter from the environment.

    :param logger: Logger used to warn when ``azure-identity`` cannot be imported.
    :param exporter_name: Name of the exporter; only used in the warning message.
    :return: A ``(aad_auth, aad_auth_scope, credential)`` tuple.
        ``aad_auth`` is True only when ``OTEL_EXPORTER_OTLP_AAD_AUTH_ENABLE`` equals
        "true" (case-insensitive). ``aad_auth_scope`` is the value of
        ``OTEL_EXPORTER_OTLP_AAD_AUTH_SCOPE``, defaulting to
        "https://management.azure.com/.default". ``credential`` is a
        ``DefaultAzureCredential`` instance, or None when AAD auth is disabled or
        ``azure-identity`` is not importable.
    """
    aad_auth = os.environ.get(OTEL_EXPORTER_OTLP_AAD_AUTH_ENABLE, "false").lower() == "true"
    aad_auth_scope = os.environ.get(OTEL_EXPORTER_OTLP_AAD_AUTH_SCOPE, "https://management.azure.com/.default")
    credential = None
    if aad_auth:
        try:
            # Imported lazily so azure-identity stays an optional dependency.
            from azure.identity import DefaultAzureCredential

            credential = DefaultAzureCredential()
        except ImportError:
            # Single literal instead of a backslash continuation inside the string:
            # the continuation form embeds the next line's indentation in the message.
            logger.warning(
                f"azure-identity is not installed, AAD auth for {exporter_name} will not be enabled!"
            )
    return aad_auth, aad_auth_scope, credential
15 changes: 11 additions & 4 deletions src/promptflow-core/promptflow/core/_serving/flow_invoker.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from promptflow._utils.flow_utils import dump_flow_result, is_executable_chat_flow
from promptflow._utils.logger_utils import LoggerFactory
from promptflow._utils.multimedia_utils import MultimediaProcessor
from promptflow.contracts.run_info import Status
from promptflow.core._connection import _Connection
from promptflow.core._connection_provider._connection_provider import ConnectionProvider
from promptflow.core._flow import AbstractFlowBase
Expand Down Expand Up @@ -222,8 +223,11 @@ def invoke(self, data: dict, run_id=None, disable_input_output_logging=False):
returned_non_dict_output = False
resolved_outputs = self._convert_multimedia_data_to_base64(output_dict)
self._dump_invoke_result(result)
log_outputs = "<REDACTED>" if disable_input_output_logging else result.output
self.logger.info(f"Flow run result: {log_outputs}")
if result.run_info.status != Status.Completed:
self.logger.error(f"Flow run failed with error: {result.run_info.error}")
else:
log_outputs = "<REDACTED>" if disable_input_output_logging else result.output
self.logger.info(f"Flow run result: {log_outputs}")
if not self.raise_ex:
# If raise_ex is False, we will return the trace flow & node run info.
return FlowResult(
Expand Down Expand Up @@ -266,8 +270,11 @@ async def invoke_async(self, data: dict, run_id=None, disable_input_output_loggi
returned_non_dict_output = False
resolved_outputs = self._convert_multimedia_data_to_base64(output_dict)
self._dump_invoke_result(result)
log_outputs = "<REDACTED>" if disable_input_output_logging else result.output
self.logger.info(f"Flow run result: {log_outputs}")
if result.run_info.status != Status.Completed:
self.logger.error(f"Flow run failed with error: {result.run_info.error}")
else:
log_outputs = "<REDACTED>" if disable_input_output_logging else result.output
self.logger.info(f"Flow run result: {log_outputs}")
if not self.raise_ex:
# If raise_ex is False, we will return the trace flow & node run info.
return FlowResult(
Expand Down
1 change: 1 addition & 0 deletions src/promptflow-devkit/promptflow/_internal/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
from promptflow._sdk._service.apis.collector import trace_collector
from promptflow._sdk._tracing import process_otlp_trace_request
from promptflow._sdk._utilities.general_utils import resolve_flow_language
from promptflow._sdk._utilities.tracing_utils import aggregate_trace_count
from promptflow._sdk._version import VERSION
from promptflow._utils.context_utils import _change_working_dir, inject_sys_path
from promptflow._utils.credential_scrubber import CredentialScrubber
Expand Down
2 changes: 1 addition & 1 deletion src/promptflow-devkit/promptflow/_sdk/_tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,7 @@ def process_otlp_trace_request(
args=(all_spans, get_created_by_info_with_cache, logger, get_credential, cloud_trace_only),
).start()

return
return all_spans


def _try_write_trace_to_cosmosdb(
Expand Down
43 changes: 43 additions & 0 deletions src/promptflow-devkit/promptflow/_sdk/_utilities/tracing_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import json
import logging
import typing
from collections import namedtuple
from dataclasses import dataclass
from pathlib import Path

Expand All @@ -15,10 +16,13 @@
from opentelemetry.trace.span import format_trace_id as otel_format_trace_id

from promptflow._constants import (
SpanAttributeFieldName,
SpanContextFieldName,
SpanEventFieldName,
SpanFieldName,
SpanLinkFieldName,
SpanResourceAttributesFieldName,
SpanResourceFieldName,
SpanStatusFieldName,
)
from promptflow._sdk._constants import HOME_PROMPT_FLOW_DIR, AzureMLWorkspaceTriad
Expand Down Expand Up @@ -284,3 +288,42 @@ def append_conditions(
expression += f" and session_id == '{session_id}'"
logger.debug("final search expression: %s", expression)
return expression


# SCENARIO: trace count telemetry
# Aggregation key for trace counts: workspace triad, scenario and execution target.
# The typename matches the bound name so repr() is accurate and instances can be pickled.
TraceCountKey = namedtuple(
    "TraceCountKey", ["subscription_id", "resource_group", "workspace_name", "scenario", "execution_target"]
)


def aggregate_trace_count(all_spans: typing.List[Span]) -> typing.Dict[TraceCountKey, int]:
    """
    Count root spans grouped by workspace triad, scenario and execution target.

    Spans that have a parent are skipped. The scenario is "batch" when the span carries
    a batch run id, "test" when it carries a line run id, and "script" otherwise. A missing
    execution target is counted as "code"; missing workspace fields are counted as None.
    """
    counts = {}

    for span in all_spans or ():
        # Only root spans represent a trace; span-level counting is not done for now.
        if span.parent_id is not None:
            continue

        workspace_info = span.resource.get(SpanResourceFieldName.ATTRIBUTES, {})
        subscription_id = workspace_info.get(SpanResourceAttributesFieldName.SUBSCRIPTION_ID)
        resource_group = workspace_info.get(SpanResourceAttributesFieldName.RESOURCE_GROUP_NAME)
        workspace_name = workspace_info.get(SpanResourceAttributesFieldName.WORKSPACE_NAME)

        span_attributes = span.attributes
        # A separate field may be needed later to indicate the language, e.g. python, csharp.
        execution_target = span_attributes.get(SpanAttributeFieldName.EXECUTION_TARGET, "code")

        # A batch run id takes precedence over a line run id.
        if SpanAttributeFieldName.BATCH_RUN_ID in span_attributes:
            scenario = "batch"
        elif SpanAttributeFieldName.LINE_RUN_ID in span_attributes:
            scenario = "test"
        else:
            scenario = "script"

        bucket = TraceCountKey(subscription_id, resource_group, workspace_name, scenario, execution_target)
        counts[bucket] = counts.get(bucket, 0) + 1

    return counts
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
import pytest
from pydash import partial

from promptflow._constants import SpanAttributeFieldName, SpanResourceAttributesFieldName, SpanResourceFieldName
from promptflow._sdk._utilities.tracing_utils import aggregate_trace_count
from promptflow._sdk.entities._trace import Span

# These tests build real Span entities through a partial that pins every field the aggregation
# does not read to None; the field-name constants are imported from promptflow._constants.


@pytest.mark.unittest
class TestTraceTelemetry:
    """Unit tests for ``aggregate_trace_count``."""

    def test_empty_span_list(self):
        """An empty span list aggregates to an empty summary."""
        assert aggregate_trace_count([]) == {}

    def test_single_root_span(self):
        """Root spans are bucketed by workspace, scenario and target; non-root spans are ignored."""
        workspace_resource = {
            SpanResourceFieldName.ATTRIBUTES: {
                SpanResourceAttributesFieldName.SUBSCRIPTION_ID: "sub",
                SpanResourceAttributesFieldName.RESOURCE_GROUP_NAME: "rg",
                SpanResourceAttributesFieldName.WORKSPACE_NAME: "ws",
            }
        }
        # Fields irrelevant to the aggregation are pinned to None; each span below
        # overrides only the keyword arguments it cares about.
        make_span = partial(
            Span,
            trace_id=None,
            span_id=None,
            name=None,
            context=None,
            kind=None,
            start_time=None,
            end_time=None,
            status=None,
            parent_id=None,
            resource=workspace_resource,
        )
        target = SpanAttributeFieldName.EXECUTION_TARGET

        spans = [
            # Root span of a batch run.
            make_span(attributes={target: "code", SpanAttributeFieldName.BATCH_RUN_ID: "batch_run_id"}),
            # Root span of a line (test) run.
            make_span(attributes={target: "code", SpanAttributeFieldName.LINE_RUN_ID: "line_run_id"}),
            # Root spans without any run id, one per execution target.
            make_span(attributes={target: "code"}),
            make_span(attributes={target: "flex"}),
            make_span(attributes={target: "prompty"}),
            # Span with a parent: must not be counted.
            make_span(parent_id=1, attributes={target: "code"}),
            # Root span without workspace information.
            make_span(resource={}, attributes={target: "prompty"}),
        ]

        summary = aggregate_trace_count(spans)

        assert summary == {
            ("sub", "rg", "ws", "batch", "code"): 1,
            ("sub", "rg", "ws", "script", "code"): 1,
            ("sub", "rg", "ws", "script", "flex"): 1,
            ("sub", "rg", "ws", "script", "prompty"): 1,
            ("sub", "rg", "ws", "test", "code"): 1,
            (None, None, None, "script", "prompty"): 1,
        }
2 changes: 1 addition & 1 deletion src/promptflow/promptflow/_version.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------

VERSION = "1.10.0"
VERSION = "1.11.0.dev0"

0 comments on commit f4a43c4

Please sign in to comment.