Skip to content

Commit

Permalink
fix(llmobs): encode llm objects in utf-8 before sending (#11961)
Browse files Browse the repository at this point in the history
This PR resolves an issue in the Python SDK where non-ascii/utf8
characters being annotated on spans resulted in span payloads being
dropped due to encoding errors.

In #11330 we previously added the `ensure_ascii=False` option to our
`safe_json()` helper's use of `json.dumps(...)` in order to keep
non-ascii characters from being encoded multiple times into nonsense (as
we were calling `safe_json()` multiple nested times while building the
span event from the span tags. However this resulted in issues where
non-latin1 characters (which is a subset of utf-8 and apparently the
encoding scheme HTTP library relies on, which we in turn rely on to
submit payloads) broke the encoding at payload submission time.

To fix this, we remove the `ensure_ascii=False` option at the final
write time.

Also note that after #11543 we mostly centralized all of the times a
span event is encoded, which is at write time and when encoding the
span's input/output value fields (which can be a json dictionary
format). Since we need to provide valid json formatting for the IO
fields (which leads to a prettier UI display), we still need to call
`json.dumps(ensure_ascii=False)` to avoid the same problem as fixed by
#11330, i.e. keep the non-ascii characters unencoded until at the very
end (i.e. write time)

This PR also adds minor test fixtures mocking out the LLMObs back end
intake to make assertions on the payloads we should be submitting to
LLMObs, since previous tests were all relying on the span events prior
to encoding/submission and weren't able to cover this scenario.

## Checklist
- [x] PR author has checked that all the criteria below are met
- The PR description includes an overview of the change
- The PR description articulates the motivation for the change
- The change includes tests OR the PR description describes a testing
strategy
- The PR description notes risks associated with the change, if any
- Newly-added code is easy to change
- The change follows the [library release note
guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html)
- The change includes or references documentation updates if necessary
- Backport labels are set (if
[applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting))

## Reviewer Checklist
- [ ] Reviewer has checked that all the criteria below are met
- Title is accurate
- All changes are related to the pull request's stated goal
- Avoids breaking
[API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces)
changes
- Testing strategy adequately addresses listed risks
- Newly-added code is easy to change
- Release note makes sense to a user of the library
- If necessary, author has acknowledged and discussed the performance
implications of this PR as reported in the benchmarks PR comment
- Backport labels are set in a manner that is consistent with the
[release branch maintenance
policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)

---------

Co-authored-by: Kyle Verhoog <[email protected]>
Co-authored-by: Yun Kim <[email protected]>
Co-authored-by: Yun Kim <[email protected]>
(cherry picked from commit e11a0a3)
  • Loading branch information
jjxct authored and github-actions[bot] committed Jan 22, 2025
1 parent 90bdc87 commit 3c1d798
Show file tree
Hide file tree
Showing 9 changed files with 133 additions and 23 deletions.
6 changes: 4 additions & 2 deletions ddtrace/llmobs/_llmobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from ddtrace.internal.utils.formats import asbool
from ddtrace.internal.utils.formats import parse_tags_str
from ddtrace.llmobs import _constants as constants
from ddtrace.llmobs._constants import AGENTLESS_BASE_URL
from ddtrace.llmobs._constants import ANNOTATIONS_CONTEXT_ID
from ddtrace.llmobs._constants import INPUT_DOCUMENTS
from ddtrace.llmobs._constants import INPUT_MESSAGES
Expand Down Expand Up @@ -91,6 +92,7 @@ def __init__(self, tracer=None):
self.tracer = tracer or ddtrace.tracer
self._llmobs_span_writer = LLMObsSpanWriter(
is_agentless=config._llmobs_agentless_enabled,
agentless_url="%s.%s" % (AGENTLESS_BASE_URL, config._dd_site),
interval=float(os.getenv("_DD_LLMOBS_WRITER_INTERVAL", 1.0)),
timeout=float(os.getenv("_DD_LLMOBS_WRITER_TIMEOUT", 5.0)),
)
Expand Down Expand Up @@ -152,13 +154,13 @@ def _llmobs_span_event(cls, span: Span) -> Tuple[Dict[str, Any], bool]:
if span_kind == "llm" and span._get_ctx_item(INPUT_MESSAGES) is not None:
meta["input"]["messages"] = span._get_ctx_item(INPUT_MESSAGES)
if span._get_ctx_item(INPUT_VALUE) is not None:
meta["input"]["value"] = safe_json(span._get_ctx_item(INPUT_VALUE))
meta["input"]["value"] = safe_json(span._get_ctx_item(INPUT_VALUE), ensure_ascii=False)
if span_kind == "llm" and span._get_ctx_item(OUTPUT_MESSAGES) is not None:
meta["output"]["messages"] = span._get_ctx_item(OUTPUT_MESSAGES)
if span_kind == "embedding" and span._get_ctx_item(INPUT_DOCUMENTS) is not None:
meta["input"]["documents"] = span._get_ctx_item(INPUT_DOCUMENTS)
if span._get_ctx_item(OUTPUT_VALUE) is not None:
meta["output"]["value"] = safe_json(span._get_ctx_item(OUTPUT_VALUE))
meta["output"]["value"] = safe_json(span._get_ctx_item(OUTPUT_VALUE), ensure_ascii=False)
if span_kind == "retrieval" and span._get_ctx_item(OUTPUT_DOCUMENTS) is not None:
meta["output"]["documents"] = span._get_ctx_item(OUTPUT_DOCUMENTS)
if span._get_ctx_item(INPUT_PROMPT) is not None:
Expand Down
4 changes: 2 additions & 2 deletions ddtrace/llmobs/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,10 +185,10 @@ def _unserializable_default_repr(obj):
return default_repr


def safe_json(obj):
def safe_json(obj, ensure_ascii=True):
if isinstance(obj, str):
return obj
try:
return json.dumps(obj, ensure_ascii=False, skipkeys=True, default=_unserializable_default_repr)
return json.dumps(obj, ensure_ascii=ensure_ascii, skipkeys=True, default=_unserializable_default_repr)
except Exception:
log.error("Failed to serialize object to JSON.", exc_info=True)
6 changes: 4 additions & 2 deletions ddtrace/llmobs/_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
from ddtrace.internal.periodic import PeriodicService
from ddtrace.internal.writer import HTTPWriter
from ddtrace.internal.writer import WriterClientBase
from ddtrace.llmobs._constants import AGENTLESS_BASE_URL
from ddtrace.llmobs._constants import AGENTLESS_ENDPOINT
from ddtrace.llmobs._constants import DROPPED_IO_COLLECTION_ERROR
from ddtrace.llmobs._constants import DROPPED_VALUE_TEXT
Expand Down Expand Up @@ -243,15 +242,18 @@ def __init__(
interval: float,
timeout: float,
is_agentless: bool = True,
agentless_url: str = "",
dogstatsd=None,
sync_mode=False,
reuse_connections=None,
):
headers = {}
clients = [] # type: List[WriterClientBase]
if is_agentless:
if not agentless_url:
raise ValueError("agentless_url is required for agentless mode")
clients.append(LLMObsAgentlessEventClient())
intake_url = "%s.%s" % (AGENTLESS_BASE_URL, config._dd_site)
intake_url = agentless_url
headers["DD-API-KEY"] = config._dd_api_key
else:
clients.append(LLMObsProxiedEventClient())
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
---
fixes:
- |
LLM Observability: This fix resolves an issue where annotating a span with non latin-1 (but valid utf-8) input/output values resulted in encoding errors.
7 changes: 6 additions & 1 deletion tests/contrib/langgraph/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,15 @@
from ddtrace.contrib.internal.langgraph.patch import patch
from ddtrace.contrib.internal.langgraph.patch import unpatch
from ddtrace.llmobs import LLMObs as llmobs_service
from ddtrace.llmobs._constants import AGENTLESS_BASE_URL
from ddtrace.llmobs._writer import LLMObsSpanWriter
from tests.utils import DummyTracer
from tests.utils import override_global_config


DATADOG_SITE = "datad0g.com"


@pytest.fixture
def mock_tracer():
yield DummyTracer()
Expand Down Expand Up @@ -48,7 +52,8 @@ def enqueue(self, event):

@pytest.fixture
def llmobs_span_writer():
yield TestLLMObsSpanWriter(interval=1.0, timeout=1.0)
agentless_url = "{}.{}".format(AGENTLESS_BASE_URL, DATADOG_SITE)
yield TestLLMObsSpanWriter(is_agentless=True, agentless_url=agentless_url, interval=1.0, timeout=1.0)


@pytest.fixture
Expand Down
79 changes: 74 additions & 5 deletions tests/llmobs/conftest.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
from http.server import BaseHTTPRequestHandler
from http.server import HTTPServer
import json
import os
import threading
import time

import mock
import pytest
Expand Down Expand Up @@ -195,15 +200,79 @@ def llmobs_env():
class TestLLMObsSpanWriter(LLMObsSpanWriter):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.events = []
self._events = []

def enqueue(self, event):
self.events.append(event)
self._events.append(event)
super().enqueue(event)

def events(self):
return self._events


@pytest.fixture
def llmobs_span_writer():
yield TestLLMObsSpanWriter(interval=1.0, timeout=1.0)
def llmobs_span_writer(_llmobs_backend):
url, _ = _llmobs_backend
sw = TestLLMObsSpanWriter(interval=1.0, timeout=1.0, agentless_url=url)
sw._headers["DD-API-KEY"] = "<test-key>"
yield sw


class LLMObsServer(BaseHTTPRequestHandler):
"""A mock server for the LLMObs backend used to capture the requests made by the client.
Python's HTTPRequestHandler is a bit weird and uses a class rather than an instance
for running an HTTP server so the requests are stored in a class variable and reset in the pytest fixture.
"""

requests = []

def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)

def do_POST(self) -> None:
content_length = int(self.headers["Content-Length"])
body = self.rfile.read(content_length).decode("utf-8")
self.requests.append({"path": self.path, "headers": dict(self.headers), "body": body})
self.send_response(200)
self.end_headers()
self.wfile.write(b"OK")


@pytest.fixture
def _llmobs_backend():
LLMObsServer.requests = []
# Create and start the HTTP server
server = HTTPServer(("localhost", 0), LLMObsServer)
server_thread = threading.Thread(target=server.serve_forever)
server_thread.daemon = True
server_thread.start()

# Provide the server details to the test
server_address = f"http://{server.server_address[0]}:{server.server_address[1]}"

yield server_address, LLMObsServer.requests

# Stop the server after the test
server.shutdown()
server.server_close()


@pytest.fixture
def llmobs_backend(_llmobs_backend):
_, reqs = _llmobs_backend

class _LLMObsBackend:
def wait_for_num_events(self, num, attempts=1000):
for _ in range(attempts):
if len(reqs) == num:
return [json.loads(r["body"]) for r in reqs]
# time.sleep will yield the GIL so the server can process the request
time.sleep(0.001)
else:
raise TimeoutError(f"Expected {num} events, got {len(reqs)}")

return _LLMObsBackend()


@pytest.fixture
Expand Down Expand Up @@ -231,4 +300,4 @@ def llmobs(

@pytest.fixture
def llmobs_events(llmobs, llmobs_span_writer):
return llmobs_span_writer.events
return llmobs_span_writer.events()
27 changes: 27 additions & 0 deletions tests/llmobs/test_llmobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,3 +245,30 @@ def test_only_generate_span_events_from_llmobs_spans(tracer, llmobs_events):
assert len(llmobs_events) == 2
assert llmobs_events[1] == _expected_llmobs_llm_span_event(root_span, "llm")
assert llmobs_events[0] == expected_grandchild_llmobs_span


def test_utf_non_ascii_io(llmobs, llmobs_backend):
with llmobs.workflow() as workflow_span:
with llmobs.llm(model_name="gpt-3.5-turbo-0125") as llm_span:
llmobs.annotate(llm_span, input_data="안녕, 지금 몇 시야?")
llmobs.annotate(workflow_span, input_data="안녕, 지금 몇 시야?")
events = llmobs_backend.wait_for_num_events(num=1)
assert len(events) == 1
assert events[0]["spans"][0]["meta"]["input"]["messages"][0]["content"] == "안녕, 지금 몇 시야?"
assert events[0]["spans"][1]["meta"]["input"]["value"] == "안녕, 지금 몇 시야?"


def test_non_utf8_inputs_outputs(llmobs, llmobs_backend):
"""Test that latin1 encoded inputs and outputs are correctly decoded."""
with llmobs.llm(model_name="gpt-3.5-turbo-0125") as span:
llmobs.annotate(
span,
input_data="The first Super Bowl (aka First AFL–NFL World Championship Game), was played in 1967.",
)

events = llmobs_backend.wait_for_num_events(num=1)
assert len(events) == 1
assert (
events[0]["spans"][0]["meta"]["input"]["messages"][0]["content"]
== "The first Super Bowl (aka First AFL–NFL World Championship Game), was played in 1967."
)
2 changes: 1 addition & 1 deletion tests/llmobs/test_llmobs_span_agent_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def test_flush_queue_when_event_cause_queue_to_exceed_payload_limit(


def test_truncating_oversized_events(mock_writer_logs, mock_http_writer_send_payload_response):
llmobs_span_writer = LLMObsSpanWriter(is_agentless=True, interval=1000, timeout=1)
llmobs_span_writer = LLMObsSpanWriter(is_agentless=False, interval=1000, timeout=1)
llmobs_span_writer.enqueue(_oversized_llm_event())
llmobs_span_writer.enqueue(_oversized_retrieval_event())
llmobs_span_writer.enqueue(_oversized_workflow_event())
Expand Down
21 changes: 11 additions & 10 deletions tests/llmobs/test_llmobs_span_agentless_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@

def test_writer_start(mock_writer_logs):
with override_global_config(dict(_dd_api_key="foobar.baz", _dd_site=DATADOG_SITE)):
llmobs_span_writer = LLMObsSpanWriter(is_agentless=True, interval=1000, timeout=1)
llmobs_span_writer = LLMObsSpanWriter(is_agentless=True, agentless_url=INTAKE_URL, interval=1000, timeout=1)
llmobs_span_writer.start()
mock_writer_logs.debug.assert_has_calls([mock.call("started %r to %r", "LLMObsSpanWriter", INTAKE_URL)])


def test_buffer_limit(mock_writer_logs, mock_http_writer_send_payload_response):
with override_global_config(dict(_dd_api_key="foobar.baz", _dd_site=DATADOG_SITE)):
llmobs_span_writer = LLMObsSpanWriter(is_agentless=True, interval=1000, timeout=1)
llmobs_span_writer = LLMObsSpanWriter(is_agentless=True, agentless_url=INTAKE_URL, interval=1000, timeout=1)
for _ in range(1001):
llmobs_span_writer.enqueue({})
mock_writer_logs.warning.assert_called_with(
Expand All @@ -39,7 +39,7 @@ def test_flush_queue_when_event_cause_queue_to_exceed_payload_limit(
mock_writer_logs, mock_http_writer_send_payload_response
):
with override_global_config(dict(_dd_api_key="foobar.baz", _dd_site=DATADOG_SITE)):
llmobs_span_writer = LLMObsSpanWriter(is_agentless=True, interval=1000, timeout=1)
llmobs_span_writer = LLMObsSpanWriter(is_agentless=True, agentless_url=INTAKE_URL, interval=1000, timeout=1)
llmobs_span_writer.enqueue(_large_event())
llmobs_span_writer.enqueue(_large_event())
llmobs_span_writer.enqueue(_large_event())
Expand All @@ -56,7 +56,7 @@ def test_flush_queue_when_event_cause_queue_to_exceed_payload_limit(

def test_truncating_oversized_events(mock_writer_logs, mock_http_writer_send_payload_response):
with override_global_config(dict(_dd_api_key="foobar.baz", _dd_site=DATADOG_SITE)):
llmobs_span_writer = LLMObsSpanWriter(is_agentless=True, interval=1000, timeout=1)
llmobs_span_writer = LLMObsSpanWriter(is_agentless=True, agentless_url=INTAKE_URL, interval=1000, timeout=1)
llmobs_span_writer.enqueue(_oversized_llm_event())
llmobs_span_writer.enqueue(_oversized_retrieval_event())
llmobs_span_writer.enqueue(_oversized_workflow_event())
Expand All @@ -77,7 +77,7 @@ def test_truncating_oversized_events(mock_writer_logs, mock_http_writer_send_pay

def test_send_completion_event(mock_writer_logs, mock_http_writer_send_payload_response):
with override_global_config(dict(_dd_site=DATADOG_SITE, _dd_api_key="foobar.baz")):
llmobs_span_writer = LLMObsSpanWriter(is_agentless=True, interval=1, timeout=1)
llmobs_span_writer = LLMObsSpanWriter(is_agentless=True, agentless_url=INTAKE_URL, interval=1, timeout=1)
llmobs_span_writer.start()
llmobs_span_writer.enqueue(_completion_event())
llmobs_span_writer.periodic()
Expand All @@ -86,7 +86,7 @@ def test_send_completion_event(mock_writer_logs, mock_http_writer_send_payload_r

def test_send_chat_completion_event(mock_writer_logs, mock_http_writer_send_payload_response):
with override_global_config(dict(_dd_site=DATADOG_SITE, _dd_api_key="foobar.baz")):
llmobs_span_writer = LLMObsSpanWriter(is_agentless=True, interval=1, timeout=1)
llmobs_span_writer = LLMObsSpanWriter(is_agentless=True, agentless_url=INTAKE_URL, interval=1, timeout=1)
llmobs_span_writer.start()
llmobs_span_writer.enqueue(_chat_completion_event())
llmobs_span_writer.periodic()
Expand All @@ -96,7 +96,7 @@ def test_send_chat_completion_event(mock_writer_logs, mock_http_writer_send_payl
@mock.patch("ddtrace.internal.writer.writer.log")
def test_send_completion_bad_api_key(mock_http_writer_logs, mock_http_writer_put_response_forbidden):
with override_global_config(dict(_dd_site=DATADOG_SITE, _dd_api_key="<bad-api-key>")):
llmobs_span_writer = LLMObsSpanWriter(is_agentless=True, interval=1, timeout=1)
llmobs_span_writer = LLMObsSpanWriter(is_agentless=True, agentless_url=INTAKE_URL, interval=1, timeout=1)
llmobs_span_writer.start()
llmobs_span_writer.enqueue(_completion_event())
llmobs_span_writer.periodic()
Expand All @@ -110,7 +110,7 @@ def test_send_completion_bad_api_key(mock_http_writer_logs, mock_http_writer_put

def test_send_timed_events(mock_writer_logs, mock_http_writer_send_payload_response):
with override_global_config(dict(_dd_site=DATADOG_SITE, _dd_api_key="foobar.baz")):
llmobs_span_writer = LLMObsSpanWriter(is_agentless=True, interval=0.01, timeout=1)
llmobs_span_writer = LLMObsSpanWriter(is_agentless=True, agentless_url=INTAKE_URL, interval=0.01, timeout=1)
llmobs_span_writer.start()
mock_writer_logs.reset_mock()

Expand All @@ -125,7 +125,7 @@ def test_send_timed_events(mock_writer_logs, mock_http_writer_send_payload_respo

def test_send_multiple_events(mock_writer_logs, mock_http_writer_send_payload_response):
with override_global_config(dict(_dd_site=DATADOG_SITE, _dd_api_key="foobar.baz")):
llmobs_span_writer = LLMObsSpanWriter(is_agentless=True, interval=0.01, timeout=1)
llmobs_span_writer = LLMObsSpanWriter(is_agentless=True, agentless_url=INTAKE_URL, interval=0.01, timeout=1)
llmobs_span_writer.start()
mock_writer_logs.reset_mock()

Expand Down Expand Up @@ -156,6 +156,7 @@ def test_send_on_exit(run_python_code_in_subprocess):
from ddtrace.internal.utils.http import Response
from ddtrace.llmobs._writer import LLMObsSpanWriter
from tests.llmobs.test_llmobs_span_agentless_writer import INTAKE_URL
from tests.llmobs.test_llmobs_span_agentless_writer import _completion_event
with mock.patch(
Expand All @@ -165,7 +166,7 @@ def test_send_on_exit(run_python_code_in_subprocess):
body="{}",
),
):
llmobs_span_writer = LLMObsSpanWriter(is_agentless=True, interval=0.01, timeout=1)
llmobs_span_writer = LLMObsSpanWriter(is_agentless=True, agentless_url=INTAKE_URL, interval=0.01, timeout=1)
llmobs_span_writer.start()
llmobs_span_writer.enqueue(_completion_event())
""",
Expand Down

0 comments on commit 3c1d798

Please sign in to comment.