diff --git a/ddtrace/llmobs/_llmobs.py b/ddtrace/llmobs/_llmobs.py index 66339cbb751..50a8e9d4255 100644 --- a/ddtrace/llmobs/_llmobs.py +++ b/ddtrace/llmobs/_llmobs.py @@ -31,6 +31,7 @@ from ddtrace.internal.utils.formats import asbool from ddtrace.internal.utils.formats import parse_tags_str from ddtrace.llmobs import _constants as constants +from ddtrace.llmobs._constants import AGENTLESS_BASE_URL from ddtrace.llmobs._constants import ANNOTATIONS_CONTEXT_ID from ddtrace.llmobs._constants import INPUT_DOCUMENTS from ddtrace.llmobs._constants import INPUT_MESSAGES @@ -91,6 +92,7 @@ def __init__(self, tracer=None): self.tracer = tracer or ddtrace.tracer self._llmobs_span_writer = LLMObsSpanWriter( is_agentless=config._llmobs_agentless_enabled, + agentless_url="%s.%s" % (AGENTLESS_BASE_URL, config._dd_site), interval=float(os.getenv("_DD_LLMOBS_WRITER_INTERVAL", 1.0)), timeout=float(os.getenv("_DD_LLMOBS_WRITER_TIMEOUT", 5.0)), ) @@ -152,13 +154,13 @@ def _llmobs_span_event(cls, span: Span) -> Tuple[Dict[str, Any], bool]: if span_kind == "llm" and span._get_ctx_item(INPUT_MESSAGES) is not None: meta["input"]["messages"] = span._get_ctx_item(INPUT_MESSAGES) if span._get_ctx_item(INPUT_VALUE) is not None: - meta["input"]["value"] = safe_json(span._get_ctx_item(INPUT_VALUE)) + meta["input"]["value"] = safe_json(span._get_ctx_item(INPUT_VALUE), ensure_ascii=False) if span_kind == "llm" and span._get_ctx_item(OUTPUT_MESSAGES) is not None: meta["output"]["messages"] = span._get_ctx_item(OUTPUT_MESSAGES) if span_kind == "embedding" and span._get_ctx_item(INPUT_DOCUMENTS) is not None: meta["input"]["documents"] = span._get_ctx_item(INPUT_DOCUMENTS) if span._get_ctx_item(OUTPUT_VALUE) is not None: - meta["output"]["value"] = safe_json(span._get_ctx_item(OUTPUT_VALUE)) + meta["output"]["value"] = safe_json(span._get_ctx_item(OUTPUT_VALUE), ensure_ascii=False) if span_kind == "retrieval" and span._get_ctx_item(OUTPUT_DOCUMENTS) is not None: meta["output"]["documents"] = span._get_ctx_item(OUTPUT_DOCUMENTS) if span._get_ctx_item(INPUT_PROMPT) is not None: diff --git a/ddtrace/llmobs/_utils.py b/ddtrace/llmobs/_utils.py index 1799eb5548d..b30ef4c969e 100644 --- a/ddtrace/llmobs/_utils.py +++ b/ddtrace/llmobs/_utils.py @@ -185,10 +185,10 @@ def _unserializable_default_repr(obj): return default_repr -def safe_json(obj): +def safe_json(obj, ensure_ascii=True): if isinstance(obj, str): return obj try: - return json.dumps(obj, ensure_ascii=False, skipkeys=True, default=_unserializable_default_repr) + return json.dumps(obj, ensure_ascii=ensure_ascii, skipkeys=True, default=_unserializable_default_repr) except Exception: log.error("Failed to serialize object to JSON.", exc_info=True) diff --git a/ddtrace/llmobs/_writer.py b/ddtrace/llmobs/_writer.py index 5880019d67f..c172c9adba9 100644 --- a/ddtrace/llmobs/_writer.py +++ b/ddtrace/llmobs/_writer.py @@ -22,7 +22,6 @@ from ddtrace.internal.periodic import PeriodicService from ddtrace.internal.writer import HTTPWriter from ddtrace.internal.writer import WriterClientBase -from ddtrace.llmobs._constants import AGENTLESS_BASE_URL from ddtrace.llmobs._constants import AGENTLESS_ENDPOINT from ddtrace.llmobs._constants import DROPPED_IO_COLLECTION_ERROR from ddtrace.llmobs._constants import DROPPED_VALUE_TEXT @@ -243,6 +242,7 @@ def __init__( interval: float, timeout: float, is_agentless: bool = True, + agentless_url: str = "", dogstatsd=None, sync_mode=False, reuse_connections=None, @@ -250,8 +250,10 @@ def __init__( headers = {} clients = [] # type: List[WriterClientBase] if is_agentless: + if not agentless_url: + raise ValueError("agentless_url is required for agentless mode") clients.append(LLMObsAgentlessEventClient()) - intake_url = "%s.%s" % (AGENTLESS_BASE_URL, config._dd_site) + intake_url = agentless_url headers["DD-API-KEY"] = config._dd_api_key else: clients.append(LLMObsProxiedEventClient()) diff --git a/releasenotes/notes/fix-llmobs-json-encoding-ascii-f8da77867a910de1.yaml b/releasenotes/notes/fix-llmobs-json-encoding-ascii-f8da77867a910de1.yaml new file mode 100644 index 00000000000..352ffefc369 --- /dev/null +++ b/releasenotes/notes/fix-llmobs-json-encoding-ascii-f8da77867a910de1.yaml @@ -0,0 +1,4 @@ +--- +fixes: + - | + LLM Observability: This fix resolves an issue where annotating a span with non latin-1 (but valid utf-8) input/output values resulted in encoding errors. diff --git a/tests/contrib/langgraph/conftest.py b/tests/contrib/langgraph/conftest.py index 19d01c018aa..a521ff367fb 100644 --- a/tests/contrib/langgraph/conftest.py +++ b/tests/contrib/langgraph/conftest.py @@ -11,11 +11,15 @@ from ddtrace.contrib.internal.langgraph.patch import patch from ddtrace.contrib.internal.langgraph.patch import unpatch from ddtrace.llmobs import LLMObs as llmobs_service +from ddtrace.llmobs._constants import AGENTLESS_BASE_URL from ddtrace.llmobs._writer import LLMObsSpanWriter from tests.utils import DummyTracer from tests.utils import override_global_config +DATADOG_SITE = "datad0g.com" + + @pytest.fixture def mock_tracer(): yield DummyTracer() @@ -48,7 +52,8 @@ def enqueue(self, event): @pytest.fixture def llmobs_span_writer(): - yield TestLLMObsSpanWriter(interval=1.0, timeout=1.0) + agentless_url = "{}.{}".format(AGENTLESS_BASE_URL, DATADOG_SITE) + yield TestLLMObsSpanWriter(is_agentless=True, agentless_url=agentless_url, interval=1.0, timeout=1.0) @pytest.fixture diff --git a/tests/llmobs/conftest.py b/tests/llmobs/conftest.py index 61a028e5caf..e47595f0973 100644 --- a/tests/llmobs/conftest.py +++ b/tests/llmobs/conftest.py @@ -1,4 +1,9 @@ +from http.server import BaseHTTPRequestHandler +from http.server import HTTPServer +import json import os +import threading +import time import mock import pytest @@ -195,15 +200,79 @@ def llmobs_env(): class TestLLMObsSpanWriter(LLMObsSpanWriter): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) - self.events = [] + self._events = [] def enqueue(self, event): - self.events.append(event) + self._events.append(event) + super().enqueue(event) + + def events(self): + return self._events @pytest.fixture -def llmobs_span_writer(): - yield TestLLMObsSpanWriter(interval=1.0, timeout=1.0) +def llmobs_span_writer(_llmobs_backend): + url, _ = _llmobs_backend + sw = TestLLMObsSpanWriter(interval=1.0, timeout=1.0, agentless_url=url) + sw._headers["DD-API-KEY"] = "" + yield sw + + +class LLMObsServer(BaseHTTPRequestHandler): + """A mock server for the LLMObs backend used to capture the requests made by the client. + + Python's HTTPRequestHandler is a bit weird and uses a class rather than an instance + for running an HTTP server so the requests are stored in a class variable and reset in the pytest fixture. + """ + + requests = [] + + def __init__(self, *args, **kwargs) -> None: + super().__init__(*args, **kwargs) + + def do_POST(self) -> None: + content_length = int(self.headers["Content-Length"]) + body = self.rfile.read(content_length).decode("utf-8") + self.requests.append({"path": self.path, "headers": dict(self.headers), "body": body}) + self.send_response(200) + self.end_headers() + self.wfile.write(b"OK") + + +@pytest.fixture +def _llmobs_backend(): + LLMObsServer.requests = [] + # Create and start the HTTP server + server = HTTPServer(("localhost", 0), LLMObsServer) + server_thread = threading.Thread(target=server.serve_forever) + server_thread.daemon = True + server_thread.start() + + # Provide the server details to the test + server_address = f"http://{server.server_address[0]}:{server.server_address[1]}" + + yield server_address, LLMObsServer.requests + + # Stop the server after the test + server.shutdown() + server.server_close() + + +@pytest.fixture +def llmobs_backend(_llmobs_backend): + _, reqs = _llmobs_backend + + class _LLMObsBackend: + def wait_for_num_events(self, num, attempts=1000): + for _ in range(attempts): + if len(reqs) == num: + return [json.loads(r["body"]) for r in reqs] + # time.sleep will yield the GIL so the server can process the request + time.sleep(0.001) + else: + raise TimeoutError(f"Expected {num} events, got {len(reqs)}") + + return _LLMObsBackend() @pytest.fixture @@ -231,4 +300,4 @@ def llmobs( @pytest.fixture def llmobs_events(llmobs, llmobs_span_writer): - return llmobs_span_writer.events + return llmobs_span_writer.events() diff --git a/tests/llmobs/test_llmobs.py b/tests/llmobs/test_llmobs.py index 6cf19fc3e2c..004b77b5764 100644 --- a/tests/llmobs/test_llmobs.py +++ b/tests/llmobs/test_llmobs.py @@ -245,3 +245,30 @@ def test_only_generate_span_events_from_llmobs_spans(tracer, llmobs_events): assert len(llmobs_events) == 2 assert llmobs_events[1] == _expected_llmobs_llm_span_event(root_span, "llm") assert llmobs_events[0] == expected_grandchild_llmobs_span + + +def test_utf_non_ascii_io(llmobs, llmobs_backend): + with llmobs.workflow() as workflow_span: + with llmobs.llm(model_name="gpt-3.5-turbo-0125") as llm_span: + llmobs.annotate(llm_span, input_data="안녕, 지금 몇 시야?") + llmobs.annotate(workflow_span, input_data="안녕, 지금 몇 시야?") + events = llmobs_backend.wait_for_num_events(num=1) + assert len(events) == 1 + assert events[0]["spans"][0]["meta"]["input"]["messages"][0]["content"] == "안녕, 지금 몇 시야?" + assert events[0]["spans"][1]["meta"]["input"]["value"] == "안녕, 지금 몇 시야?" + + +def test_non_utf8_inputs_outputs(llmobs, llmobs_backend): + """Test that latin1 encoded inputs and outputs are correctly decoded.""" + with llmobs.llm(model_name="gpt-3.5-turbo-0125") as span: + llmobs.annotate( + span, + input_data="The first Super Bowl (aka First AFL–NFL World Championship Game), was played in 1967.", + ) + + events = llmobs_backend.wait_for_num_events(num=1) + assert len(events) == 1 + assert ( + events[0]["spans"][0]["meta"]["input"]["messages"][0]["content"] + == "The first Super Bowl (aka First AFL–NFL World Championship Game), was played in 1967." + ) diff --git a/tests/llmobs/test_llmobs_span_agent_writer.py b/tests/llmobs/test_llmobs_span_agent_writer.py index d16bb9f0e2c..55f0a56e4d5 100644 --- a/tests/llmobs/test_llmobs_span_agent_writer.py +++ b/tests/llmobs/test_llmobs_span_agent_writer.py @@ -50,7 +50,7 @@ def test_flush_queue_when_event_cause_queue_to_exceed_payload_limit( def test_truncating_oversized_events(mock_writer_logs, mock_http_writer_send_payload_response): - llmobs_span_writer = LLMObsSpanWriter(is_agentless=True, interval=1000, timeout=1) + llmobs_span_writer = LLMObsSpanWriter(is_agentless=False, interval=1000, timeout=1) llmobs_span_writer.enqueue(_oversized_llm_event()) llmobs_span_writer.enqueue(_oversized_retrieval_event()) llmobs_span_writer.enqueue(_oversized_workflow_event()) diff --git a/tests/llmobs/test_llmobs_span_agentless_writer.py b/tests/llmobs/test_llmobs_span_agentless_writer.py index cac0d926a74..8a1a0697752 100644 --- a/tests/llmobs/test_llmobs_span_agentless_writer.py +++ b/tests/llmobs/test_llmobs_span_agentless_writer.py @@ -20,14 +20,14 @@ def test_writer_start(mock_writer_logs): with override_global_config(dict(_dd_api_key="foobar.baz", _dd_site=DATADOG_SITE)): - llmobs_span_writer = LLMObsSpanWriter(is_agentless=True, interval=1000, timeout=1) + llmobs_span_writer = LLMObsSpanWriter(is_agentless=True, agentless_url=INTAKE_URL, interval=1000, timeout=1) llmobs_span_writer.start() mock_writer_logs.debug.assert_has_calls([mock.call("started %r to %r", "LLMObsSpanWriter", INTAKE_URL)]) def test_buffer_limit(mock_writer_logs, mock_http_writer_send_payload_response): with override_global_config(dict(_dd_api_key="foobar.baz", _dd_site=DATADOG_SITE)): - llmobs_span_writer = LLMObsSpanWriter(is_agentless=True, interval=1000, timeout=1) + llmobs_span_writer = LLMObsSpanWriter(is_agentless=True, agentless_url=INTAKE_URL, interval=1000, timeout=1) for _ in range(1001): llmobs_span_writer.enqueue({}) mock_writer_logs.warning.assert_called_with( @@ -39,7 +39,7 @@ def test_flush_queue_when_event_cause_queue_to_exceed_payload_limit( mock_writer_logs, mock_http_writer_send_payload_response ): with override_global_config(dict(_dd_api_key="foobar.baz", _dd_site=DATADOG_SITE)): - llmobs_span_writer = LLMObsSpanWriter(is_agentless=True, interval=1000, timeout=1) + llmobs_span_writer = LLMObsSpanWriter(is_agentless=True, agentless_url=INTAKE_URL, interval=1000, timeout=1) llmobs_span_writer.enqueue(_large_event()) llmobs_span_writer.enqueue(_large_event()) llmobs_span_writer.enqueue(_large_event()) @@ -56,7 +56,7 @@ def test_flush_queue_when_event_cause_queue_to_exceed_payload_limit( def test_truncating_oversized_events(mock_writer_logs, mock_http_writer_send_payload_response): with override_global_config(dict(_dd_api_key="foobar.baz", _dd_site=DATADOG_SITE)): - llmobs_span_writer = LLMObsSpanWriter(is_agentless=True, interval=1000, timeout=1) + llmobs_span_writer = LLMObsSpanWriter(is_agentless=True, agentless_url=INTAKE_URL, interval=1000, timeout=1) llmobs_span_writer.enqueue(_oversized_llm_event()) llmobs_span_writer.enqueue(_oversized_retrieval_event()) llmobs_span_writer.enqueue(_oversized_workflow_event()) @@ -77,7 +77,7 @@ def test_truncating_oversized_events(mock_writer_logs, mock_http_writer_send_pay def test_send_completion_event(mock_writer_logs, mock_http_writer_send_payload_response): with override_global_config(dict(_dd_site=DATADOG_SITE, _dd_api_key="foobar.baz")): - llmobs_span_writer = LLMObsSpanWriter(is_agentless=True, interval=1, timeout=1) + llmobs_span_writer = LLMObsSpanWriter(is_agentless=True, agentless_url=INTAKE_URL, interval=1, timeout=1) llmobs_span_writer.start() llmobs_span_writer.enqueue(_completion_event()) llmobs_span_writer.periodic() @@ -86,7 +86,7 @@ def test_send_completion_event(mock_writer_logs, mock_http_writer_send_payload_r def test_send_chat_completion_event(mock_writer_logs, mock_http_writer_send_payload_response): with override_global_config(dict(_dd_site=DATADOG_SITE, _dd_api_key="foobar.baz")): - llmobs_span_writer = LLMObsSpanWriter(is_agentless=True, interval=1, timeout=1) + llmobs_span_writer = LLMObsSpanWriter(is_agentless=True, agentless_url=INTAKE_URL, interval=1, timeout=1) llmobs_span_writer.start() llmobs_span_writer.enqueue(_chat_completion_event()) llmobs_span_writer.periodic() @@ -96,7 +96,7 @@ def test_send_chat_completion_event(mock_writer_logs, mock_http_writer_send_payl @mock.patch("ddtrace.internal.writer.writer.log") def test_send_completion_bad_api_key(mock_http_writer_logs, mock_http_writer_put_response_forbidden): with override_global_config(dict(_dd_site=DATADOG_SITE, _dd_api_key="")): - llmobs_span_writer = LLMObsSpanWriter(is_agentless=True, interval=1, timeout=1) + llmobs_span_writer = LLMObsSpanWriter(is_agentless=True, agentless_url=INTAKE_URL, interval=1, timeout=1) llmobs_span_writer.start() llmobs_span_writer.enqueue(_completion_event()) llmobs_span_writer.periodic() @@ -110,7 +110,7 @@ def test_send_completion_bad_api_key(mock_http_writer_logs, mock_http_writer_put def test_send_timed_events(mock_writer_logs, mock_http_writer_send_payload_response): with override_global_config(dict(_dd_site=DATADOG_SITE, _dd_api_key="foobar.baz")): - llmobs_span_writer = LLMObsSpanWriter(is_agentless=True, interval=0.01, timeout=1) + llmobs_span_writer = LLMObsSpanWriter(is_agentless=True, agentless_url=INTAKE_URL, interval=0.01, timeout=1) llmobs_span_writer.start() mock_writer_logs.reset_mock() @@ -125,7 +125,7 @@ def test_send_timed_events(mock_writer_logs, mock_http_writer_send_payload_respo def test_send_multiple_events(mock_writer_logs, mock_http_writer_send_payload_response): with override_global_config(dict(_dd_site=DATADOG_SITE, _dd_api_key="foobar.baz")): - llmobs_span_writer = LLMObsSpanWriter(is_agentless=True, interval=0.01, timeout=1) + llmobs_span_writer = LLMObsSpanWriter(is_agentless=True, agentless_url=INTAKE_URL, interval=0.01, timeout=1) llmobs_span_writer.start() mock_writer_logs.reset_mock() @@ -156,6 +156,7 @@ def test_send_on_exit(run_python_code_in_subprocess): from ddtrace.internal.utils.http import Response from ddtrace.llmobs._writer import LLMObsSpanWriter +from tests.llmobs.test_llmobs_span_agentless_writer import INTAKE_URL from tests.llmobs.test_llmobs_span_agentless_writer import _completion_event with mock.patch( @@ -165,7 +166,7 @@ def test_send_on_exit(run_python_code_in_subprocess): body="{}", ), ): - llmobs_span_writer = LLMObsSpanWriter(is_agentless=True, interval=0.01, timeout=1) + llmobs_span_writer = LLMObsSpanWriter(is_agentless=True, agentless_url=INTAKE_URL, interval=0.01, timeout=1) llmobs_span_writer.start() llmobs_span_writer.enqueue(_completion_event()) """,