Python: Update gen_ai traces and logs (#10173)
### Motivation and Context

<!-- Thank you for your contribution to the semantic-kernel repo!
Please help reviewers and future users by providing the following
information:
  1. Why is this change required?
  2. What problem does it solve?
  3. What scenario does it contribute to?
  4. If it fixes an open issue, please link to the issue here.
-->
The gen_ai semantic conventions have gone through several updates since we
started generating telemetry for gen_ai operations. We would like to align
with the latest conventions and, most importantly, allow our users to
visualize gen_ai traces in the Azure AI Foundry Tracing UI, which relies on
those conventions.

### Description

<!-- Describe your changes, the overall approach, the underlying design.
These notes will help reviewers understand how your code works. Thanks! -->

1. Updates the gen_ai telemetry module to align with the latest gen_ai
conventions so that all AI connectors generate telemetry data that can be
visualized on Azure AI Foundry.
2. Adds unit tests covering the update.

> Note that this is a breaking change, which is acceptable because this
feature is still experimental. Anyone relying on the previous gen_ai
conventions will also need to update.
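
For context, here is a minimal sketch of how an application could opt in to the updated telemetry and export it with the OpenTelemetry SDK. The two environment variable names are assumptions based on the SK observability docs (verify them before use); the rest is standard OpenTelemetry SDK setup and is not part of this PR.

```python
# Minimal sketch (not part of this PR): export Semantic Kernel's gen_ai spans and
# per-message log events to the console via the OpenTelemetry SDK.
import logging
import os

from opentelemetry import trace
from opentelemetry._logs import set_logger_provider
from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor, ConsoleLogExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter

# Assumed names of the experimental opt-in switches; set them before importing
# semantic_kernel so the telemetry module picks them up.
os.environ["SEMANTICKERNEL_EXPERIMENTAL_GENAI_ENABLE_OTEL_DIAGNOSTICS"] = "true"
os.environ["SEMANTICKERNEL_EXPERIMENTAL_GENAI_ENABLE_OTEL_DIAGNOSTICS_SENSITIVE"] = "true"

# Spans carry gen_ai.operation.name, gen_ai.request.* and gen_ai.usage.* attributes.
tracer_provider = TracerProvider()
tracer_provider.add_span_processor(BatchSpanProcessor(ConsoleSpanExporter()))
trace.set_tracer_provider(tracer_provider)

# Chat messages and choices are emitted as log records (gen_ai.system.message,
# gen_ai.user.message, gen_ai.choice, ...) through the standard logging module,
# so attach an OpenTelemetry LoggingHandler to the root logger.
logger_provider = LoggerProvider()
logger_provider.add_log_record_processor(BatchLogRecordProcessor(ConsoleLogExporter()))
set_logger_provider(logger_provider)
logging.getLogger().addHandler(LoggingHandler(logger_provider=logger_provider))
```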

### Contribution Checklist

<!-- Before submitting this PR, please make sure: -->

- [x] The code builds clean without any errors or warnings
- [x] The PR follows the [SK Contribution
Guidelines](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md)
and the [pre-submission formatting
script](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md#development-scripts)
raises no violations
- [x] All unit tests pass, and I have added new tests where possible
- [ ] I didn't break anyone 😄
TaoChenOSU authored Jan 14, 2025
1 parent bc3e294 commit bf45719
Showing 6 changed files with 271 additions and 91 deletions.
160 changes: 110 additions & 50 deletions python/semantic_kernel/utils/telemetry/model_diagnostics/decorators.py
@@ -2,9 +2,10 @@

import functools
import json
import logging
from collections.abc import AsyncGenerator, Callable
from functools import reduce
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING, Any, ClassVar

from opentelemetry.trace import Span, StatusCode, get_tracer, use_span

@@ -37,9 +38,36 @@
TEXT_COMPLETION_OPERATION = "text.completions"
TEXT_STREAMING_COMPLETION_OPERATION = "text.streaming_completions"


# We're recording multiple events for the chat history, some of them are emitted within (hundreds of)
# nanoseconds of each other. The default timestamp resolution is not high enough to guarantee unique
# timestamps for each message. Also Azure Monitor truncates resolution to microseconds and some other
# backends truncate to milliseconds.
#
# But we need to give users a way to restore chat message order, so we're incrementing the timestamp
# by 1 microsecond for each message.
#
# This is a workaround, we'll find a generic and better solution - see
# https://github.com/open-telemetry/semantic-conventions/issues/1701
class ChatHistoryMessageTimestampFilter(logging.Filter):
"""A filter to increment the timestamp of INFO logs by 1 microsecond."""

INDEX_KEY: ClassVar[str] = "CHAT_MESSAGE_INDEX"

def filter(self, record: logging.LogRecord) -> bool:
"""Increment the timestamp of INFO logs by 1 microsecond."""
if hasattr(record, self.INDEX_KEY):
idx = getattr(record, self.INDEX_KEY)
record.created += idx * 1e-6
return True


# Creates a tracer from the global tracer provider
tracer = get_tracer(__name__)

logger = logging.getLogger(__name__)
logger.addFilter(ChatHistoryMessageTimestampFilter())
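
# Illustration only (not part of this diff): once the filter is attached, a log
# record that carries the CHAT_MESSAGE_INDEX extra has its timestamp nudged
# forward by that many microseconds, so message order survives backends that
# truncate timestamp resolution. For example:
#
#   logger.info(
#       '{"role": "user", "content": "Hello"}',
#       extra={ChatHistoryMessageTimestampFilter.INDEX_KEY: 2},
#   )
#   # record.created is increased by 2 * 1e-6 seconds before export.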


@experimental_function
def are_model_diagnostics_enabled() -> bool:
Expand Down Expand Up @@ -87,19 +115,19 @@ async def wrapper_decorator(*args: Any, **kwargs: Any) -> list[ChatMessageConten
settings: "PromptExecutionSettings" = kwargs.get("settings") or args[2] # type: ignore

with use_span(
_start_completion_activity(
_get_completion_span(
CHAT_COMPLETION_OPERATION,
completion_service.ai_model_id,
model_provider,
completion_service.service_url(),
chat_history,
settings,
),
end_on_exit=True,
) as current_span:
_set_completion_input(model_provider, chat_history)
try:
completions: list[ChatMessageContent] = await completion_func(*args, **kwargs)
_set_completion_response(current_span, completions)
_set_completion_response(current_span, completions, model_provider)
return completions
except Exception as exception:
_set_completion_error(current_span, exception)
@@ -144,16 +172,16 @@ async def wrapper_decorator(
all_messages: dict[int, list[StreamingChatMessageContent]] = {}

with use_span(
_start_completion_activity(
_get_completion_span(
CHAT_STREAMING_COMPLETION_OPERATION,
completion_service.ai_model_id,
model_provider,
completion_service.service_url(),
chat_history,
settings,
),
end_on_exit=True,
) as current_span:
_set_completion_input(model_provider, chat_history)
try:
async for streaming_chat_message_contents in completion_func(*args, **kwargs):
for streaming_chat_message_content in streaming_chat_message_contents:
@@ -166,7 +194,7 @@ async def wrapper_decorator(
all_messages_flattened = [
reduce(lambda x, y: x + y, messages) for messages in all_messages.values()
]
_set_completion_response(current_span, all_messages_flattened)
_set_completion_response(current_span, all_messages_flattened, model_provider)
except Exception as exception:
_set_completion_error(current_span, exception)
raise
@@ -203,19 +231,19 @@ async def wrapper_decorator(*args: Any, **kwargs: Any) -> list[TextContent]:
settings: "PromptExecutionSettings" = kwargs["settings"] if kwargs.get("settings") is not None else args[2]

with use_span(
_start_completion_activity(
_get_completion_span(
TEXT_COMPLETION_OPERATION,
completion_service.ai_model_id,
model_provider,
completion_service.service_url(),
prompt,
settings,
),
end_on_exit=True,
) as current_span:
_set_completion_input(model_provider, prompt)
try:
completions: list[TextContent] = await completion_func(*args, **kwargs)
_set_completion_response(current_span, completions)
_set_completion_response(current_span, completions, model_provider)
return completions
except Exception as exception:
_set_completion_error(current_span, exception)
@@ -258,16 +286,16 @@ async def wrapper_decorator(*args: Any, **kwargs: Any) -> AsyncGenerator[list["S
all_text_contents: dict[int, list["StreamingTextContent"]] = {}

with use_span(
_start_completion_activity(
_get_completion_span(
TEXT_STREAMING_COMPLETION_OPERATION,
completion_service.ai_model_id,
model_provider,
completion_service.service_url(),
prompt,
settings,
),
end_on_exit=True,
) as current_span:
_set_completion_input(model_provider, prompt)
try:
async for streaming_text_contents in completion_func(*args, **kwargs):
for streaming_text_content in streaming_text_contents:
@@ -280,7 +308,7 @@ async def wrapper_decorator(*args: Any, **kwargs: Any) -> AsyncGenerator[list["S
all_text_contents_flattened = [
reduce(lambda x, y: x + y, messages) for messages in all_text_contents.values()
]
_set_completion_response(current_span, all_text_contents_flattened)
_set_completion_response(current_span, all_text_contents_flattened, model_provider)
except Exception as exception:
_set_completion_error(current_span, exception)
raise
@@ -292,15 +320,18 @@ async def wrapper_decorator(*args: Any, **kwargs: Any) -> AsyncGenerator[list["S
return inner_trace_streaming_text_completion


def _start_completion_activity(
def _get_completion_span(
operation_name: str,
model_name: str,
model_provider: str,
service_url: str | None,
prompt: str | ChatHistory,
execution_settings: "PromptExecutionSettings | None",
) -> Span:
"""Start a text or chat completion activity for a given model."""
"""Start a text or chat completion span for a given model.
Note that `start_span` doesn't make the span the current span.
Use `use_span` to make it the current span as a context manager.
"""
span = tracer.start_span(f"{operation_name} {model_name}")

# Set attributes on the span
@@ -316,24 +347,53 @@ def _start_completion_activity(
# TODO(@glahaye): we'll need to have a way to get these attributes from model
# providers other than OpenAI (for example if the attributes are named differently)
if execution_settings:
attribute = execution_settings.extension_data.get("max_tokens")
if attribute:
span.set_attribute(gen_ai_attributes.MAX_TOKENS, attribute)
attribute_name_map = {
"seed": gen_ai_attributes.SEED,
"encoding_formats": gen_ai_attributes.ENCODING_FORMATS,
"frequency_penalty": gen_ai_attributes.FREQUENCY_PENALTY,
"max_tokens": gen_ai_attributes.MAX_TOKENS,
"stop_sequences": gen_ai_attributes.STOP_SEQUENCES,
"temperature": gen_ai_attributes.TEMPERATURE,
"top_k": gen_ai_attributes.TOP_K,
"top_p": gen_ai_attributes.TOP_P,
}
for attribute_name, attribute_key in attribute_name_map.items():
attribute = execution_settings.extension_data.get(attribute_name)
if attribute:
span.set_attribute(attribute_key, attribute)

return span

attribute = execution_settings.extension_data.get("temperature")
if attribute:
span.set_attribute(gen_ai_attributes.TEMPERATURE, attribute)

attribute = execution_settings.extension_data.get("top_p")
if attribute:
span.set_attribute(gen_ai_attributes.TOP_P, attribute)
def _set_completion_input(
model_provider: str,
prompt: str | ChatHistory,
) -> None:
"""Set the input for a text or chat completion.
The logs will be associated to the current span.
"""
if are_sensitive_events_enabled():
if isinstance(prompt, ChatHistory):
prompt = _messages_to_openai_format(prompt.messages)
span.add_event(gen_ai_attributes.PROMPT_EVENT, {gen_ai_attributes.PROMPT_EVENT_PROMPT: prompt})

return span
for idx, message in enumerate(prompt.messages):
event_name = gen_ai_attributes.ROLE_EVENT_MAP.get(message.role)
if event_name:
logger.info(
json.dumps(message.to_dict()),
extra={
gen_ai_attributes.EVENT_NAME: event_name,
gen_ai_attributes.SYSTEM: model_provider,
ChatHistoryMessageTimestampFilter.INDEX_KEY: idx,
},
)
else:
logger.info(
prompt,
extra={
gen_ai_attributes.EVENT_NAME: gen_ai_attributes.PROMPT,
gen_ai_attributes.SYSTEM: model_provider,
},
)


def _set_completion_response(
@@ -342,8 +402,9 @@ def _set_completion_response(
| list[TextContent]
| list[StreamingChatMessageContent]
| list[StreamingTextContent],
model_provider: str,
) -> None:
"""Set the a text or chat completion response for a given activity."""
"""Set the a text or chat completion response for a given span."""
first_completion = completions[0]

# Set the response ID
@@ -362,33 +423,32 @@ def _set_completion_response(
usage = first_completion.metadata.get("usage", None)
if isinstance(usage, CompletionUsage):
if usage.prompt_tokens:
current_span.set_attribute(gen_ai_attributes.PROMPT_TOKENS, usage.prompt_tokens)
current_span.set_attribute(gen_ai_attributes.INPUT_TOKENS, usage.prompt_tokens)
if usage.completion_tokens:
current_span.set_attribute(gen_ai_attributes.COMPLETION_TOKENS, usage.completion_tokens)
current_span.set_attribute(gen_ai_attributes.OUTPUT_TOKENS, usage.completion_tokens)

# Set the completion event
if are_sensitive_events_enabled():
completion_text: str = _messages_to_openai_format(completions)
current_span.add_event(
gen_ai_attributes.COMPLETION_EVENT, {gen_ai_attributes.COMPLETION_EVENT_COMPLETION: completion_text}
)
for completion in completions:
full_response: dict[str, Any] = {
"message": completion.to_dict(),
}

if hasattr(completion, "finish_reason"):
full_response["finish_reason"] = completion.finish_reason
if hasattr(completion, "choice_index"):
full_response["index"] = completion.choice_index

logger.info(
json.dumps(full_response),
extra={
gen_ai_attributes.EVENT_NAME: gen_ai_attributes.CHOICE,
gen_ai_attributes.SYSTEM: model_provider,
},
)


def _set_completion_error(span: Span, error: Exception) -> None:
"""Set an error for a text or chat completion ."""
span.set_attribute(gen_ai_attributes.ERROR_TYPE, str(type(error)))
span.set_status(StatusCode.ERROR, repr(error))


def _messages_to_openai_format(
messages: list[ChatMessageContent]
| list[StreamingChatMessageContent]
| list[TextContent]
| list[StreamingTextContent],
) -> str:
"""Convert a list of ChatMessageContent to a string in the OpenAI format.
OpenTelemetry recommends formatting the messages in the OpenAI format
regardless of the actual model being used.
"""
return json.dumps([message.to_dict() for message in messages])
@@ -1,32 +1,48 @@
# Copyright (c) Microsoft. All rights reserved.

from semantic_kernel.contents.utils.author_role import AuthorRole

# Constants for tracing activities with semantic conventions.
# Ideally, we should use the attributes from the semcov package.
# However, many of the attributes are not yet available in the package,
# so we define them here for now.

# Activity tags
SYSTEM = "gen_ai.system"
OPERATION = "gen_ai.operation.name"
SYSTEM = "gen_ai.system"
ERROR_TYPE = "error.type"
MODEL = "gen_ai.request.model"
MAX_TOKENS = "gen_ai.request.max_tokens" # nosec
SEED = "gen_ai.request.seed"
PORT = "server.port"
ENCODING_FORMATS = "gen_ai.request.encoding_formats"
FREQUENCY_PENALTY = "gen_ai.request.frequency_penalty"
MAX_TOKENS = "gen_ai.request.max_tokens"
STOP_SEQUENCES = "gen_ai.request.stop_sequences"
TEMPERATURE = "gen_ai.request.temperature"
TOP_K = "gen_ai.request.top_k"
TOP_P = "gen_ai.request.top_p"
RESPONSE_ID = "gen_ai.response.id"
FINISH_REASON = "gen_ai.response.finish_reason"
PROMPT_TOKENS = "gen_ai.response.prompt_tokens" # nosec
COMPLETION_TOKENS = "gen_ai.response.completion_tokens" # nosec
RESPONSE_ID = "gen_ai.response.id"
INPUT_TOKENS = "gen_ai.usage.input_tokens"
OUTPUT_TOKENS = "gen_ai.usage.output_tokens"
ADDRESS = "server.address"
PORT = "server.port"
ERROR_TYPE = "error.type"

# Activity events
PROMPT_EVENT = "gen_ai.content.prompt"
COMPLETION_EVENT = "gen_ai.content.completion"

# Activity event attributes
PROMPT_EVENT_PROMPT = "gen_ai.prompt"
COMPLETION_EVENT_COMPLETION = "gen_ai.completion"
EVENT_NAME = "event.name"
SYSTEM_MESSAGE = "gen_ai.system.message"
USER_MESSAGE = "gen_ai.user.message"
ASSISTANT_MESSAGE = "gen_ai.assistant.message"
TOOL_MESSAGE = "gen_ai.tool.message"
CHOICE = "gen_ai.choice"
PROMPT = "gen_ai.prompt"

# Kernel specific attributes
AVAILABLE_FUNCTIONS = "sk.available_functions"


ROLE_EVENT_MAP = {
AuthorRole.SYSTEM: SYSTEM_MESSAGE,
AuthorRole.USER: USER_MESSAGE,
AuthorRole.ASSISTANT: ASSISTANT_MESSAGE,
AuthorRole.TOOL: TOOL_MESSAGE,
}