Skip to content

Commit

Permalink
fix(llmobs): encode llm objects in utf-8 before sending [backport #11961
Browse files Browse the repository at this point in the history
 to 2.19] (#12033)

Backports #11961 to 2.19.
**Note that due to non-existent test files/conftest utilities in the
2.19 branch, this backport avoids backporting over the entire diff of
#11961 and instead just backports over the fix implementation.**

This PR resolves an issue in the Python SDK where non-ascii/utf8
characters being annotated on spans resulted in span payloads being
dropped due to encoding errors.

In #11330 we previously added the `ensure_ascii=False` option to our
`safe_json()` helper's use of `json.dumps(...)` in order to keep
non-ascii characters from being encoded multiple times into nonsense (as
we were calling `safe_json()` multiple nested times while building the
span event from the span tags. However this resulted in issues where
non-latin1 characters (which is a subset of utf-8 and apparently the
encoding scheme HTTP library relies on, which we in turn rely on to
submit payloads) broke the encoding at payload submission time.

To fix this, we remove the `ensure_ascii=False` option at the final
write time.

Also note that after #11543 we mostly centralized all of the times a
span event is encoded, which is at write time and when encoding the
span's input/output value fields (which can be a json dictionary
format). Since we need to provide valid json formatting for the IO
fields (which leads to a prettier UI display), we still need to call
`json.dumps(ensure_ascii=False)` to avoid the same problem as fixed by
end (i.e. write time)

This PR also adds minor test fixtures mocking out the LLMObs back end
intake to make assertions on the payloads we should be submitting to
LLMObs, since previous tests were all relying on the span events prior
to encoding/submission and weren't able to cover this scenario.

---------

## Checklist
- [x] PR author has checked that all the criteria below are met
- The PR description includes an overview of the change
- The PR description articulates the motivation for the change
- The change includes tests OR the PR description describes a testing
strategy
- The PR description notes risks associated with the change, if any
- Newly-added code is easy to change
- The change follows the [library release note
guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html)
- The change includes or references documentation updates if necessary
- Backport labels are set (if
[applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting))

## Reviewer Checklist
- [x] Reviewer has checked that all the criteria below are met 
- Title is accurate
- All changes are related to the pull request's stated goal
- Avoids breaking
[API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces)
changes
- Testing strategy adequately addresses listed risks
- Newly-added code is easy to change
- Release note makes sense to a user of the library
- If necessary, author has acknowledged and discussed the performance
implications of this PR as reported in the benchmarks PR comment
- Backport labels are set in a manner that is consistent with the
[release branch maintenance
policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)

Co-authored-by: Jonathan Chavez <[email protected]>
Co-authored-by: Kyle Verhoog <[email protected]>
  • Loading branch information
3 people authored Jan 22, 2025
1 parent 7a1d6b9 commit 88bc85b
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 16 deletions.
4 changes: 3 additions & 1 deletion ddtrace/llmobs/_llmobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from ddtrace.internal.telemetry.constants import TELEMETRY_APM_PRODUCT
from ddtrace.internal.utils.formats import asbool
from ddtrace.internal.utils.formats import parse_tags_str
from ddtrace.llmobs._constants import AGENTLESS_BASE_URL
from ddtrace.llmobs._constants import ANNOTATIONS_CONTEXT_ID
from ddtrace.llmobs._constants import INPUT_DOCUMENTS
from ddtrace.llmobs._constants import INPUT_MESSAGES
Expand Down Expand Up @@ -85,6 +86,7 @@ def __init__(self, tracer=None):

self._llmobs_span_writer = LLMObsSpanWriter(
is_agentless=config._llmobs_agentless_enabled,
agentless_url="%s.%s" % (AGENTLESS_BASE_URL, config._dd_site),
interval=float(os.getenv("_DD_LLMOBS_WRITER_INTERVAL", 1.0)),
timeout=float(os.getenv("_DD_LLMOBS_WRITER_TIMEOUT", 5.0)),
)
Expand All @@ -108,7 +110,7 @@ def __init__(self, tracer=None):
self._annotation_context_lock = forksafe.RLock()
self.tracer.on_start_span(self._do_annotations)

def _do_annotations(self, span):
def _do_annotations(self, span: Span) -> None:
# get the current span context
# only do the annotations if it matches the context
if span.span_type != SpanTypes.LLM: # do this check to avoid the warning log in `annotate`
Expand Down
4 changes: 2 additions & 2 deletions ddtrace/llmobs/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,10 +178,10 @@ def _unserializable_default_repr(obj):
return default_repr


def safe_json(obj):
def safe_json(obj, ensure_ascii=True):
if isinstance(obj, str):
return obj
try:
return json.dumps(obj, ensure_ascii=False, skipkeys=True, default=_unserializable_default_repr)
return json.dumps(obj, ensure_ascii=ensure_ascii, skipkeys=True, default=_unserializable_default_repr)
except Exception:
log.error("Failed to serialize object to JSON.", exc_info=True)
6 changes: 4 additions & 2 deletions ddtrace/llmobs/_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
from ddtrace.internal.periodic import PeriodicService
from ddtrace.internal.writer import HTTPWriter
from ddtrace.internal.writer import WriterClientBase
from ddtrace.llmobs._constants import AGENTLESS_BASE_URL
from ddtrace.llmobs._constants import AGENTLESS_ENDPOINT
from ddtrace.llmobs._constants import DROPPED_IO_COLLECTION_ERROR
from ddtrace.llmobs._constants import DROPPED_VALUE_TEXT
Expand Down Expand Up @@ -237,15 +236,18 @@ def __init__(
interval: float,
timeout: float,
is_agentless: bool = True,
agentless_url: str = "",
dogstatsd=None,
sync_mode=False,
reuse_connections=None,
):
headers = {}
clients = [] # type: List[WriterClientBase]
if is_agentless:
if not agentless_url:
raise ValueError("agentless_url is required for agentless mode")
clients.append(LLMObsAgentlessEventClient())
intake_url = "%s.%s" % (AGENTLESS_BASE_URL, config._dd_site)
intake_url = agentless_url
headers["DD-API-KEY"] = config._dd_api_key
else:
clients.append(LLMObsProxiedEventClient())
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
---
fixes:
- |
LLM Observability: This fix resolves an issue where annotating a span with non latin-1 (but valid utf-8) input/output values resulted in encoding errors.
2 changes: 1 addition & 1 deletion tests/llmobs/test_llmobs_span_agent_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def test_flush_queue_when_event_cause_queue_to_exceed_payload_limit(


def test_truncating_oversized_events(mock_writer_logs, mock_http_writer_send_payload_response):
llmobs_span_writer = LLMObsSpanWriter(is_agentless=True, interval=1000, timeout=1)
llmobs_span_writer = LLMObsSpanWriter(is_agentless=False, interval=1000, timeout=1)
llmobs_span_writer.enqueue(_oversized_llm_event())
llmobs_span_writer.enqueue(_oversized_retrieval_event())
llmobs_span_writer.enqueue(_oversized_workflow_event())
Expand Down
21 changes: 11 additions & 10 deletions tests/llmobs/test_llmobs_span_agentless_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@

def test_writer_start(mock_writer_logs):
with override_global_config(dict(_dd_api_key="foobar.baz", _dd_site=DATADOG_SITE)):
llmobs_span_writer = LLMObsSpanWriter(is_agentless=True, interval=1000, timeout=1)
llmobs_span_writer = LLMObsSpanWriter(is_agentless=True, agentless_url=INTAKE_URL, interval=1000, timeout=1)
llmobs_span_writer.start()
mock_writer_logs.debug.assert_has_calls([mock.call("started %r to %r", "LLMObsSpanWriter", INTAKE_URL)])


def test_buffer_limit(mock_writer_logs, mock_http_writer_send_payload_response):
with override_global_config(dict(_dd_api_key="foobar.baz", _dd_site=DATADOG_SITE)):
llmobs_span_writer = LLMObsSpanWriter(is_agentless=True, interval=1000, timeout=1)
llmobs_span_writer = LLMObsSpanWriter(is_agentless=True, agentless_url=INTAKE_URL, interval=1000, timeout=1)
for _ in range(1001):
llmobs_span_writer.enqueue({})
mock_writer_logs.warning.assert_called_with(
Expand All @@ -39,7 +39,7 @@ def test_flush_queue_when_event_cause_queue_to_exceed_payload_limit(
mock_writer_logs, mock_http_writer_send_payload_response
):
with override_global_config(dict(_dd_api_key="foobar.baz", _dd_site=DATADOG_SITE)):
llmobs_span_writer = LLMObsSpanWriter(is_agentless=True, interval=1000, timeout=1)
llmobs_span_writer = LLMObsSpanWriter(is_agentless=True, agentless_url=INTAKE_URL, interval=1000, timeout=1)
llmobs_span_writer.enqueue(_large_event())
llmobs_span_writer.enqueue(_large_event())
llmobs_span_writer.enqueue(_large_event())
Expand All @@ -56,7 +56,7 @@ def test_flush_queue_when_event_cause_queue_to_exceed_payload_limit(

def test_truncating_oversized_events(mock_writer_logs, mock_http_writer_send_payload_response):
with override_global_config(dict(_dd_api_key="foobar.baz", _dd_site=DATADOG_SITE)):
llmobs_span_writer = LLMObsSpanWriter(is_agentless=True, interval=1000, timeout=1)
llmobs_span_writer = LLMObsSpanWriter(is_agentless=True, agentless_url=INTAKE_URL, interval=1000, timeout=1)
llmobs_span_writer.enqueue(_oversized_llm_event())
llmobs_span_writer.enqueue(_oversized_retrieval_event())
llmobs_span_writer.enqueue(_oversized_workflow_event())
Expand All @@ -77,7 +77,7 @@ def test_truncating_oversized_events(mock_writer_logs, mock_http_writer_send_pay

def test_send_completion_event(mock_writer_logs, mock_http_writer_logs, mock_http_writer_send_payload_response):
with override_global_config(dict(_dd_site=DATADOG_SITE, _dd_api_key="foobar.baz")):
llmobs_span_writer = LLMObsSpanWriter(is_agentless=True, interval=1, timeout=1)
llmobs_span_writer = LLMObsSpanWriter(is_agentless=True, agentless_url=INTAKE_URL, interval=1, timeout=1)
llmobs_span_writer.start()
llmobs_span_writer.enqueue(_completion_event())
llmobs_span_writer.periodic()
Expand All @@ -87,7 +87,7 @@ def test_send_completion_event(mock_writer_logs, mock_http_writer_logs, mock_htt

def test_send_chat_completion_event(mock_writer_logs, mock_http_writer_logs, mock_http_writer_send_payload_response):
with override_global_config(dict(_dd_site=DATADOG_SITE, _dd_api_key="foobar.baz")):
llmobs_span_writer = LLMObsSpanWriter(is_agentless=True, interval=1, timeout=1)
llmobs_span_writer = LLMObsSpanWriter(is_agentless=True, agentless_url=INTAKE_URL, interval=1, timeout=1)
llmobs_span_writer.start()
llmobs_span_writer.enqueue(_chat_completion_event())
llmobs_span_writer.periodic()
Expand All @@ -97,7 +97,7 @@ def test_send_chat_completion_event(mock_writer_logs, mock_http_writer_logs, moc

def test_send_completion_bad_api_key(mock_http_writer_logs, mock_http_writer_put_response_forbidden):
with override_global_config(dict(_dd_site=DATADOG_SITE, _dd_api_key="<bad-api-key>")):
llmobs_span_writer = LLMObsSpanWriter(is_agentless=True, interval=1, timeout=1)
llmobs_span_writer = LLMObsSpanWriter(is_agentless=True, agentless_url=INTAKE_URL, interval=1, timeout=1)
llmobs_span_writer.start()
llmobs_span_writer.enqueue(_completion_event())
llmobs_span_writer.periodic()
Expand All @@ -111,7 +111,7 @@ def test_send_completion_bad_api_key(mock_http_writer_logs, mock_http_writer_put

def test_send_timed_events(mock_writer_logs, mock_http_writer_logs, mock_http_writer_send_payload_response):
with override_global_config(dict(_dd_site=DATADOG_SITE, _dd_api_key="foobar.baz")):
llmobs_span_writer = LLMObsSpanWriter(is_agentless=True, interval=0.01, timeout=1)
llmobs_span_writer = LLMObsSpanWriter(is_agentless=True, agentless_url=INTAKE_URL, interval=0.01, timeout=1)
llmobs_span_writer.start()
mock_writer_logs.reset_mock()

Expand All @@ -127,7 +127,7 @@ def test_send_timed_events(mock_writer_logs, mock_http_writer_logs, mock_http_wr

def test_send_multiple_events(mock_writer_logs, mock_http_writer_logs, mock_http_writer_send_payload_response):
with override_global_config(dict(_dd_site=DATADOG_SITE, _dd_api_key="foobar.baz")):
llmobs_span_writer = LLMObsSpanWriter(is_agentless=True, interval=0.01, timeout=1)
llmobs_span_writer = LLMObsSpanWriter(is_agentless=True, agentless_url=INTAKE_URL, interval=0.01, timeout=1)
llmobs_span_writer.start()
mock_writer_logs.reset_mock()

Expand Down Expand Up @@ -159,6 +159,7 @@ def test_send_on_exit(mock_writer_logs, run_python_code_in_subprocess):
from ddtrace.internal.utils.http import Response
from ddtrace.llmobs._writer import LLMObsSpanWriter
from tests.llmobs.test_llmobs_span_agentless_writer import INTAKE_URL
from tests.llmobs.test_llmobs_span_agentless_writer import _completion_event
with mock.patch(
Expand All @@ -168,7 +169,7 @@ def test_send_on_exit(mock_writer_logs, run_python_code_in_subprocess):
body="{}",
),
):
llmobs_span_writer = LLMObsSpanWriter(is_agentless=True, interval=0.01, timeout=1)
llmobs_span_writer = LLMObsSpanWriter(is_agentless=True, agentless_url=INTAKE_URL, interval=0.01, timeout=1)
llmobs_span_writer.start()
llmobs_span_writer.enqueue(_completion_event())
""",
Expand Down

0 comments on commit 88bc85b

Please sign in to comment.