Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(llmobs): llmobs-specific context manager #12236

Merged
merged 1 commit into from
Feb 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ddtrace/contrib/internal/trace_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -595,6 +595,7 @@ def activate_distributed_headers(tracer, int_config=None, request_headers=None,
# We have parsed a trace id from headers, and we do not already
# have a context with the same trace id active
tracer.context_provider.activate(context)
core.dispatch("http.activate_distributed_headers", (request_headers, context))


def _flatten(
Expand Down
2 changes: 2 additions & 0 deletions ddtrace/llmobs/_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
DROPPED_IO_COLLECTION_ERROR = "dropped_io"
DROPPED_VALUE_TEXT = "[This value has been dropped because this span's size exceeds the 1MB size limit.]"

# Parent ID value for spans that have no LLMObs parent.
# NOTE(review): integrations appear to use this as the fallback when no parent ID is stored on a span — confirm.
ROOT_PARENT_ID = "undefined"

# Set for traces of evaluator integrations e.g. `runner.integration:ragas`.
# Used to differentiate traces of Datadog-run operations vs user-application operations.
RUNNER_IS_INTEGRATION_SPAN_TAG = "runner.integration"
Expand Down
59 changes: 59 additions & 0 deletions ddtrace/llmobs/_context.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import contextvars
from typing import Optional
from typing import Union

from ddtrace._trace.context import Context
from ddtrace._trace.provider import DefaultContextProvider
from ddtrace._trace.span import Span
from ddtrace.ext import SpanTypes


ContextTypeValue = Optional[Union[Context, Span]]


# Context variable holding the active LLMObs item (a Context, a Span, or None) for the
# current execution context. It defaults to None, i.e. nothing is active.
_DD_LLMOBS_CONTEXTVAR: contextvars.ContextVar[ContextTypeValue] = contextvars.ContextVar(
    "datadog_llmobs_contextvar",
    default=None,
)


class LLMObsContextProvider(DefaultContextProvider):
    """Context provider that retrieves contexts from a context variable.

    The active LLMObs span or context is stored in ``_DD_LLMOBS_CONTEXTVAR``.
    It is suitable for synchronous programming and for asynchronous executors
    that support contextvars.
    """

    def __init__(self) -> None:
        # The two-argument super() starts the MRO lookup *after* DefaultContextProvider,
        # so DefaultContextProvider.__init__ itself is skipped.
        # NOTE(review): presumably this avoids touching the parent's own context
        # variable — confirm against DefaultContextProvider.
        super(DefaultContextProvider, self).__init__()
        _DD_LLMOBS_CONTEXTVAR.set(None)

    def _has_active_context(self) -> bool:
        """Returns whether there is an active context in the current execution."""
        ctx = _DD_LLMOBS_CONTEXTVAR.get()
        return ctx is not None

    def _update_active(self, span: Span) -> Optional[Span]:
        """Updates the active LLMObs span.

        If ``span`` is still unfinished it stays active and is returned unchanged.
        Otherwise the active span is updated to be the span's closest unfinished
        LLMObs ancestor span (``span_type == SpanTypes.LLM``), or ``None`` when no
        such ancestor exists.

        :param span: The span currently stored as the active LLMObs item.
        :returns: The span that is active after the update, or ``None``.
        """
        if not span.finished:
            return span
        new_active: Optional[Span] = span._parent
        # Skip ancestors that are finished *or* are not LLMObs spans. Stopping on the
        # first unfinished ancestor regardless of type would activate non-LLMObs spans.
        while new_active and (new_active.finished or new_active.span_type != SpanTypes.LLM):
            new_active = new_active._parent
        self.activate(new_active)
        return new_active

    def activate(self, ctx: ContextTypeValue) -> None:
        """Makes the given context active in the current execution."""
        _DD_LLMOBS_CONTEXTVAR.set(ctx)
        # As in __init__, the lookup starts after DefaultContextProvider, so only the
        # implementation further up the MRO runs.
        super(DefaultContextProvider, self).activate(ctx)

    def active(self) -> ContextTypeValue:
        """Returns the active span or context for the current execution.

        When the stored item is a Span, it is first refreshed via ``_update_active``
        so that a finished span is never reported as active.
        """
        item = _DD_LLMOBS_CONTEXTVAR.get()
        if isinstance(item, Span):
            return self._update_active(item)
        return item
28 changes: 10 additions & 18 deletions ddtrace/llmobs/_integrations/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,8 @@
from ddtrace.internal.telemetry import telemetry_writer
from ddtrace.internal.telemetry.constants import TELEMETRY_NAMESPACE
from ddtrace.internal.utils.formats import asbool
from ddtrace.llmobs._constants import PARENT_ID_KEY
from ddtrace.llmobs._constants import PROPAGATED_PARENT_ID_KEY
from ddtrace.llmobs._llmobs import LLMObs
from ddtrace.llmobs._log_writer import V2LogWriter
from ddtrace.llmobs._utils import _get_llmobs_parent_id
from ddtrace.settings import IntegrationConfig
from ddtrace.trace import Pin
from ddtrace.trace import Span
Expand Down Expand Up @@ -132,21 +129,16 @@ def trace(self, pin: Pin, operation_id: str, submit_to_llmobs: bool = False, **k
span.set_tag(_SPAN_MEASURED_KEY)
self._set_base_span_tags(span, **kwargs)
if submit_to_llmobs and self.llmobs_enabled:
if span.get_tag(PROPAGATED_PARENT_ID_KEY) is None:
# For non-distributed traces or spans in the first service of a distributed trace,
# The LLMObs parent ID tag is not set at span start time. We need to manually set the parent ID tag now
# in these cases to avoid conflicting with the later propagated tags.
parent_id = _get_llmobs_parent_id(span) or "undefined"
span._set_ctx_item(PARENT_ID_KEY, str(parent_id))
telemetry_writer.add_count_metric(
namespace=TELEMETRY_NAMESPACE.MLOBS,
name="span.start",
value=1,
tags=(
("integration", self._integration_name),
("autoinstrumented", "true"),
),
)
LLMObs._instance._activate_llmobs_span(span)
telemetry_writer.add_count_metric(
namespace=TELEMETRY_NAMESPACE.MLOBS,
name="span.start",
value=1,
tags=(
("integration", self._integration_name),
("autoinstrumented", "true"),
),
)
return span

@classmethod
Expand Down
8 changes: 2 additions & 6 deletions ddtrace/llmobs/_integrations/bedrock.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,16 @@
from typing import Optional

from ddtrace.internal.logger import get_logger
from ddtrace.llmobs import LLMObs
from ddtrace.llmobs._constants import INPUT_MESSAGES
from ddtrace.llmobs._constants import METADATA
from ddtrace.llmobs._constants import METRICS
from ddtrace.llmobs._constants import MODEL_NAME
from ddtrace.llmobs._constants import MODEL_PROVIDER
from ddtrace.llmobs._constants import OUTPUT_MESSAGES
from ddtrace.llmobs._constants import PARENT_ID_KEY
from ddtrace.llmobs._constants import PROPAGATED_PARENT_ID_KEY
from ddtrace.llmobs._constants import SPAN_KIND
from ddtrace.llmobs._integrations import BaseLLMIntegration
from ddtrace.llmobs._integrations.utils import get_llmobs_metrics_tags
from ddtrace.llmobs._utils import _get_llmobs_parent_id
from ddtrace.trace import Span


Expand All @@ -34,9 +32,7 @@ def _llmobs_set_tags(
operation: str = "",
) -> None:
"""Extract prompt/response tags from a completion and set them as temporary "_ml_obs.*" tags."""
if span.get_tag(PROPAGATED_PARENT_ID_KEY) is None:
parent_id = _get_llmobs_parent_id(span) or "undefined"
span._set_ctx_item(PARENT_ID_KEY, parent_id)
LLMObs._instance._activate_llmobs_span(span)
parameters = {}
if span.get_tag("bedrock.request.temperature"):
parameters["temperature"] = float(span.get_tag("bedrock.request.temperature") or 0.0)
Expand Down
5 changes: 3 additions & 2 deletions ddtrace/llmobs/_integrations/langgraph.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@
from ddtrace.llmobs._constants import INPUT_VALUE
from ddtrace.llmobs._constants import NAME
from ddtrace.llmobs._constants import OUTPUT_VALUE
from ddtrace.llmobs._constants import PARENT_ID_KEY
from ddtrace.llmobs._constants import ROOT_PARENT_ID
from ddtrace.llmobs._constants import SPAN_KIND
from ddtrace.llmobs._constants import SPAN_LINKS
from ddtrace.llmobs._integrations.base import BaseLLMIntegration
from ddtrace.llmobs._integrations.utils import format_langchain_io
from ddtrace.llmobs._utils import _get_attr
from ddtrace.llmobs._utils import _get_llmobs_parent_id
from ddtrace.llmobs._utils import _get_nearest_llmobs_ancestor
from ddtrace.trace import Span
from ddtrace.trace import tracer
Expand Down Expand Up @@ -154,7 +155,7 @@ def _default_span_link(span: Span):
the span is linked to its parent's input.
"""
return {
"span_id": str(_get_llmobs_parent_id(span)) or "undefined",
"span_id": span._get_ctx_item(PARENT_ID_KEY) or ROOT_PARENT_ID,
"trace_id": "{:x}".format(span.trace_id),
"attributes": {"from": "input", "to": "input"},
}
Expand Down
Loading
Loading