Skip to content

Commit

Permalink
chore(llmobs): automatically set span links with decorators (#12255)
Browse files Browse the repository at this point in the history
Re-opening #12043 due to
conflicts with main after 3.x-staging was merged in

Decorators set span links by tracking common objects passed as inputs &
outputs of functions.

This functionality will be gated behind the environment variable
`_DD_LLMOBS_AUTO_SPAN_LINKING_ENABLED`

We maintain a dictionary in the LLMObs service that remembers which
objects are used as the input/output for spans generated by LLM Obs
decorators. This is how we record the **from** direction of span links.

When objects are encountered again as the input/output for another span,
we now know to set the **to** direction for a span link.

In my opinion, this does not need to be gated behind a feature flag
since it only reads app data. A follow-up PR will mutate data
actually used in the user app for enhanced span link inference, and
the features introduced then should be gated by a flag.

Implementation notes:
- Objects are remembered by generating a string object id through the
type + memory location of the object.
- We ignore "input" -> "output" edges. This is not a valid edge.
- Spans can only be linked to other spans belonging to the same trace

Limitations:
- It is very easy for link information to be lost if an object is
mutated or used to create another object. A follow-up PR will implement
a best-effort attempt for objects to inherit link info from other
objects.
- Doesn't work for distributed tracing scenarios.

## Checklist
- [x] PR author has checked that all the criteria below are met
- The PR description includes an overview of the change
- The PR description articulates the motivation for the change
- The change includes tests OR the PR description describes a testing
strategy
- The PR description notes risks associated with the change, if any
- Newly-added code is easy to change
- The change follows the [library release note
guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html)
- The change includes or references documentation updates if necessary
- Backport labels are set (if
[applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting))

## Reviewer Checklist
- [x] Reviewer has checked that all the criteria below are met 
- Title is accurate
- All changes are related to the pull request's stated goal
- Avoids breaking
[API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces)
changes
- Testing strategy adequately addresses listed risks
- Newly-added code is easy to change
- Release note makes sense to a user of the library
- If necessary, author has acknowledged and discussed the performance
implications of this PR as reported in the benchmarks PR comment
- Backport labels are set in a manner that is consistent with the
[release branch maintenance
policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)

---------

Co-authored-by: lievan <[email protected]>
  • Loading branch information
lievan and lievan authored Feb 13, 2025
1 parent 09c9958 commit a0fa9d0
Show file tree
Hide file tree
Showing 8 changed files with 188 additions and 4 deletions.
53 changes: 52 additions & 1 deletion ddtrace/llmobs/_llmobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
from ddtrace.llmobs._context import LLMObsContextProvider
from ddtrace.llmobs._evaluators.runner import EvaluatorRunner
from ddtrace.llmobs._utils import AnnotationContext
from ddtrace.llmobs._utils import LinkTracker
from ddtrace.llmobs._utils import _get_ml_app
from ddtrace.llmobs._utils import _get_session_id
from ddtrace.llmobs._utils import _get_span_name
Expand Down Expand Up @@ -112,6 +113,7 @@ def __init__(self, tracer=None):

forksafe.register(self._child_after_fork)

self._link_tracker = LinkTracker()
self._annotations = []
self._annotation_context_lock = forksafe.RLock()

Expand Down Expand Up @@ -208,7 +210,7 @@ def _llmobs_span_event(cls, span: Span) -> Dict[str, Any]:
llmobs_span_event["tags"] = cls._llmobs_tags(span, ml_app, session_id)

span_links = span._get_ctx_item(SPAN_LINKS)
if isinstance(span_links, list):
if isinstance(span_links, list) and span_links:
llmobs_span_event["span_links"] = span_links

return llmobs_span_event
Expand Down Expand Up @@ -397,6 +399,55 @@ def disable(cls) -> None:

log.debug("%s disabled", cls.__name__)

def _record_object(self, span, obj, input_or_output):
    """Link ``span`` to the spans that previously handled ``obj``, then remember ``span``.

    Every link already tracked for ``obj`` is copied onto ``span`` with its
    ``"to"`` attribute set to ``input_or_output``. Afterwards ``span`` itself is
    registered on ``obj`` with ``input_or_output`` as the ``"from"`` direction, so
    that later spans touching the same object can link back to it.

    :param span: the span that is receiving or producing ``obj``.
    :param obj: the argument or return value to track; ``None`` is ignored.
    :param input_or_output: ``"input"`` or ``"output"``, i.e. how ``span`` uses ``obj``.
    """
    if obj is None:
        return
    span_links = []
    for span_link in self._link_tracker.get_span_links_from_object(obj):
        # The whole read of the tracked link sits inside the ``try`` so that one
        # malformed entry is skipped instead of aborting the caller.
        try:
            link_from = span_link["attributes"]["from"]
            # "input" -> "output" is not a valid edge, so it is never recorded.
            if link_from == "input" and input_or_output == "output":
                continue
            span_links.append(
                {
                    "trace_id": span_link["trace_id"],
                    "span_id": span_link["span_id"],
                    "attributes": {
                        "from": link_from,
                        "to": input_or_output,
                    },
                }
            )
        except KeyError:
            # The ``%s`` placeholder is required: logging formats lazily with ``msg % args``.
            log.debug("failed to read span link: %s", span_link)
            continue
    self._tag_span_links(span, span_links)
    # Export once and reuse the result for both identifiers.
    exported_span = self.export_span(span)
    self._link_tracker.add_span_links_to_object(
        obj,
        [
            {
                "trace_id": exported_span["trace_id"],
                "span_id": exported_span["span_id"],
                "attributes": {
                    "from": input_or_output,
                },
            }
        ],
    )

def _tag_span_links(self, span, span_links):
    """Append ``span_links`` to the span links already stored on ``span``.

    Links that point at ``span`` itself, or at a span whose trace id differs
    from ``span``'s, are dropped before storing.

    :param span: the span whose ``SPAN_LINKS`` context item is updated.
    :param span_links: candidate link dicts carrying ``"span_id"`` and ``"trace_id"``.
    """
    if not span_links:
        return
    # The exported identifiers do not change per link, so compute them once
    # instead of re-exporting the span for every candidate.
    exported_span = LLMObs.export_span(span)
    own_span_id = exported_span["span_id"]
    own_trace_id = exported_span["trace_id"]
    span_links = [
        span_link
        for span_link in span_links
        if span_link["span_id"] != own_span_id and span_link["trace_id"] == own_trace_id
    ]
    current_span_links = span._get_ctx_item(SPAN_LINKS)
    if current_span_links:
        span_links = current_span_links + span_links
    span._set_ctx_item(SPAN_LINKS, span_links)

@classmethod
def annotation_context(
cls, tags: Optional[Dict[str, Any]] = None, prompt: Optional[dict] = None, name: Optional[str] = None
Expand Down
17 changes: 17 additions & 0 deletions ddtrace/llmobs/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,23 @@ def validate_prompt(prompt: dict) -> Dict[str, Union[str, dict, List[str]]]:
return validated_prompt


class LinkTracker:
    """Remember, per object, the span links that have been recorded for it.

    Objects are keyed by ``"<type name>_<id(obj)>"``. ``id()`` is only guaranteed
    to be unique among objects that are alive at the same time, and this class
    never removes entries.
    """

    def __init__(self, object_span_links=None):
        """Create a tracker, optionally backed by an existing mapping.

        :param object_span_links: mapping of object id -> list of span links to
            use as the backing store; a new dict is created when omitted.
        """
        # Compare against None (not truthiness) so that a caller-supplied empty
        # dict is kept as the backing store instead of being swapped for a new one.
        self._object_span_links = {} if object_span_links is None else object_span_links

    def get_object_id(self, obj):
        """Return the string key under which ``obj`` is tracked."""
        return f"{type(obj).__name__}_{id(obj)}"

    def add_span_links_to_object(self, obj, span_links):
        """Append ``span_links`` to the links recorded for ``obj``."""
        obj_id = self.get_object_id(obj)
        self._object_span_links.setdefault(obj_id, []).extend(span_links)

    def get_span_links_from_object(self, obj):
        """Return the links recorded for ``obj``, or an empty list if there are none."""
        return self._object_span_links.get(self.get_object_id(obj), [])


class AnnotationContext:
def __init__(self, _register_annotator, _deregister_annotator):
self._register_annotator = _register_annotator
Expand Down
20 changes: 18 additions & 2 deletions ddtrace/llmobs/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from typing import Callable
from typing import Optional

from ddtrace import config
from ddtrace.internal.compat import iscoroutinefunction
from ddtrace.internal.compat import isgeneratorfunction
from ddtrace.internal.logger import get_logger
Expand Down Expand Up @@ -138,8 +139,16 @@ def wrapper(*args, **kwargs):
name=span_name,
session_id=session_id,
ml_app=ml_app,
):
return func(*args, **kwargs)
) as span:
if config._llmobs_auto_span_linking_enabled:
for arg in args:
LLMObs._instance._record_object(span, arg, "input")
for arg in kwargs.values():
LLMObs._instance._record_object(span, arg, "input")
ret = func(*args, **kwargs)
if config._llmobs_auto_span_linking_enabled:
LLMObs._instance._record_object(span, ret, "output")
return ret

return generator_wrapper if (isgeneratorfunction(func) or isasyncgenfunction(func)) else wrapper

Expand Down Expand Up @@ -231,6 +240,11 @@ def wrapper(*args, **kwargs):
_, span_name = _get_llmobs_span_options(name, None, func)
traced_operation = getattr(LLMObs, operation_kind, LLMObs.workflow)
with traced_operation(name=span_name, session_id=session_id, ml_app=ml_app) as span:
if config._llmobs_auto_span_linking_enabled:
for arg in args:
LLMObs._instance._record_object(span, arg, "input")
for arg in kwargs.values():
LLMObs._instance._record_object(span, arg, "input")
func_signature = signature(func)
bound_args = func_signature.bind_partial(*args, **kwargs)
if _automatic_io_annotation and bound_args.arguments:
Expand All @@ -243,6 +257,8 @@ def wrapper(*args, **kwargs):
and span._get_ctx_item(OUTPUT_VALUE) is None
):
LLMObs.annotate(span=span, output_data=resp)
if config._llmobs_auto_span_linking_enabled:
LLMObs._instance._record_object(span, resp, "output")
return resp

return generator_wrapper if (isgeneratorfunction(func) or isasyncgenfunction(func)) else wrapper
Expand Down
1 change: 1 addition & 0 deletions ddtrace/settings/_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,7 @@ def __init__(self):
self._llmobs_sample_rate = _get_config("DD_LLMOBS_SAMPLE_RATE", 1.0, float)
self._llmobs_ml_app = _get_config("DD_LLMOBS_ML_APP")
self._llmobs_agentless_enabled = _get_config("DD_LLMOBS_AGENTLESS_ENABLED", False, asbool)
self._llmobs_auto_span_linking_enabled = _get_config("_DD_LLMOBS_AUTO_SPAN_LINKING_ENABLED", False, asbool)

self._inject_force = _get_config("DD_INJECT_FORCE", False, asbool)
self._lib_was_injected = False
Expand Down
16 changes: 15 additions & 1 deletion tests/llmobs/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,13 @@ def _expected_llmobs_non_llm_span_event(


def _llmobs_base_span_event(
span, span_kind, tags=None, session_id=None, error=None, error_message=None, error_stack=None
span,
span_kind,
tags=None,
session_id=None,
error=None,
error_message=None,
error_stack=None,
):
span_event = {
"trace_id": "{:x}".format(span.trace_id),
Expand Down Expand Up @@ -776,3 +782,11 @@ def _expected_ragas_answer_relevancy_spans(ragas_inputs=None):
"_dd": {"span_id": mock.ANY, "trace_id": mock.ANY},
},
]


def _expected_span_link(span_event, link_from, link_to):
return {
"trace_id": span_event["trace_id"],
"span_id": span_event["span_id"],
"attributes": {"from": link_from, "to": link_to},
}
83 changes: 83 additions & 0 deletions tests/llmobs/test_llmobs_decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,14 @@
from ddtrace.llmobs.decorators import workflow
from tests.llmobs._utils import _expected_llmobs_llm_span_event
from tests.llmobs._utils import _expected_llmobs_non_llm_span_event
from tests.llmobs._utils import _expected_span_link
from tests.utils import override_global_config


@pytest.fixture
def auto_linking_enabled():
    """Override ``_llmobs_auto_span_linking_enabled`` to ``True`` for the duration of a test."""
    overrides = {"_llmobs_auto_span_linking_enabled": True}
    with override_global_config(overrides):
        yield


@pytest.fixture
Expand Down Expand Up @@ -828,3 +836,78 @@ def get_next_element(alist):
error_message=span.get_tag("error.message"),
error_stack=span.get_tag("error.stack"),
)


def test_decorator_records_span_links(llmobs, llmobs_events, auto_linking_enabled):
    """A value returned by one decorated function and passed through another links the two spans."""

    @workflow
    def one(inp):
        return 1

    @task
    def two(inp):
        return inp

    with llmobs.agent("dummy_trace"):
        intermediate = one("test_input")
        two(intermediate)

    first_event = llmobs_events[0]
    second_event = llmobs_events[1]

    assert "span_links" not in first_event
    # `two` both receives and returns the value produced by `one`, hence two links.
    assert second_event["span_links"] == [
        _expected_span_link(first_event, "output", "input"),
        _expected_span_link(first_event, "output", "output"),
    ]


def test_decorator_records_span_links_for_multi_input_functions(llmobs, llmobs_events, auto_linking_enabled):
    """Each positional argument produced by a decorated function yields its own input link."""

    @agent
    def some_agent(a, b):
        pass

    @workflow
    def one():
        return 1

    @task
    def two():
        return 2

    with llmobs.agent("dummy_trace"):
        first_value = one()
        second_value = two()
        some_agent(first_value, second_value)

    one_event = llmobs_events[0]
    two_event = llmobs_events[1]
    agent_event = llmobs_events[2]

    for producer_event in (one_event, two_event):
        assert "span_links" not in producer_event
    assert agent_event["span_links"] == [
        _expected_span_link(one_event, "output", "input"),
        _expected_span_link(two_event, "output", "input"),
    ]


def test_decorator_records_span_links_via_kwargs(llmobs, llmobs_events, auto_linking_enabled):
    """Values passed as keyword arguments are linked back to the spans that produced them."""

    @agent
    def some_agent(a=None, b=None):
        pass

    @workflow
    def one():
        return 1

    @task
    def two():
        return 2

    with llmobs.agent("dummy_trace"):
        # Pass by keyword so the decorator's ``kwargs.values()`` path is exercised;
        # a positional call would only repeat the multi-input test.
        some_agent(a=one(), b=two())

    one_span = llmobs_events[0]
    two_span = llmobs_events[1]
    three_span = llmobs_events[2]

    assert "span_links" not in one_span
    assert "span_links" not in two_span
    assert len(three_span["span_links"]) == 2
    assert three_span["span_links"][0] == _expected_span_link(one_span, "output", "input")
    assert three_span["span_links"][1] == _expected_span_link(two_span, "output", "input")
assert three_span["span_links"][1] == _expected_span_link(two_span, "output", "input")
1 change: 1 addition & 0 deletions tests/telemetry/test_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,7 @@ def test_app_started_event_configuration_override(test_agent_session, run_python
{"name": "_DD_APPSEC_DEDUPLICATION_ENABLED", "origin": "default", "value": True},
{"name": "_DD_IAST_LAZY_TAINT", "origin": "default", "value": False},
{"name": "_DD_INJECT_WAS_ATTEMPTED", "origin": "default", "value": False},
{"name": "_DD_LLMOBS_AUTO_SPAN_LINKING_ENABLED", "origin": "default", "value": False},
{"name": "_DD_TRACE_WRITER_LOG_ERROR_PAYLOADS", "origin": "default", "value": False},
{"name": "ddtrace_auto_used", "origin": "unknown", "value": True},
{"name": "ddtrace_bootstrapped", "origin": "unknown", "value": True},
Expand Down
1 change: 1 addition & 0 deletions tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ def override_global_config(values):
"_llmobs_sample_rate",
"_llmobs_ml_app",
"_llmobs_agentless_enabled",
"_llmobs_auto_span_linking_enabled",
"_data_streams_enabled",
"_inferred_proxy_services_enabled",
]
Expand Down

0 comments on commit a0fa9d0

Please sign in to comment.