
Commit 967bddd: merge main
lievan committed Jan 9, 2025, 2 parents 521700d + 04ee68f
Showing 31 changed files with 1,407 additions and 1,707 deletions.
32 changes: 32 additions & 0 deletions .riot/requirements/16562eb.txt
@@ -0,0 +1,32 @@
#
# This file is autogenerated by pip-compile with Python 3.7
# by the following command:
#
# pip-compile --allow-unsafe --config=pyproject.toml --no-annotate --resolver=backtracking .riot/requirements/16562eb.in
#
attrs==24.2.0
coverage[toml]==7.2.7
exceptiongroup==1.2.2
hypothesis==6.45.0
idna==3.10
importlib-metadata==6.7.0
iniconfig==2.0.0
mock==5.1.0
multidict==6.0.5
opentracing==2.4.0
packaging==24.0
pluggy==1.2.0
pytest==7.4.4
pytest-asyncio==0.21.1
pytest-cov==4.1.0
pytest-mock==3.11.1
pyyaml==6.0.1
six==1.17.0
sortedcontainers==2.4.0
tomli==2.0.1
typing-extensions==4.7.1
urllib3==1.26.20
vcrpy==4.4.0
wrapt==1.16.0
yarl==1.9.4
zipp==3.15.0
5 changes: 4 additions & 1 deletion ddtrace/_trace/tracer.py
Original file line number Diff line number Diff line change
@@ -41,6 +41,7 @@
 from ddtrace.internal.atexit import register_on_exit_signal
 from ddtrace.internal.constants import SAMPLING_DECISION_TRACE_TAG_KEY
 from ddtrace.internal.constants import SPAN_API_DATADOG
+from ddtrace.internal.core import dispatch
 from ddtrace.internal.dogstatsd import get_dogstatsd_client
 from ddtrace.internal.logger import get_logger
 from ddtrace.internal.peer_service.processor import PeerServiceProcessor
@@ -849,7 +850,7 @@ def _start_span(
         for p in chain(self._span_processors, SpanProcessor.__processors__, self._deferred_processors):
             p.on_span_start(span)
         self._hooks.emit(self.__class__.start_span, span)
-
+        dispatch("trace.span_start", (span,))
         return span

     start_span = _start_span
@@ -866,6 +867,8 @@ def _on_span_finish(self, span: Span) -> None:
         for p in chain(self._span_processors, SpanProcessor.__processors__, self._deferred_processors):
             p.on_span_finish(span)

+        dispatch("trace.span_finish", (span,))
+
         if log.isEnabledFor(logging.DEBUG):
             log.debug("finishing span %s (enabled:%s)", span._pprint(), self.enabled)
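The two `dispatch()` calls give internal consumers a span lifecycle feed without subclassing `SpanProcessor`. A minimal sketch of a listener, assuming `ddtrace.internal.core` exposes an `on()` registration helper mirroring `dispatch()` and that listeners receive the dispatched args unpacked; this is internal, unstable API:

```python
from ddtrace.internal import core


def _on_span_start(span):
    # Invoked after all span processors and start-span hooks have run.
    print("span started:", span.name)


def _on_span_finish(span):
    # Invoked after span processors run, before the tracer's debug log line.
    print("span finished:", span.name)


core.on("trace.span_start", _on_span_start)
core.on("trace.span_finish", _on_span_finish)
```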
8 changes: 8 additions & 0 deletions ddtrace/contrib/internal/openai/_endpoint_hooks.py
@@ -255,6 +255,14 @@ def _record_request(self, pin, integration, span, args, kwargs):
                 span.set_tag_str("openai.request.messages.%d.content" % idx, integration.trunc(str(content)))
                 span.set_tag_str("openai.request.messages.%d.role" % idx, str(role))
                 span.set_tag_str("openai.request.messages.%d.name" % idx, str(name))
+        if parse_version(OPENAI_VERSION) >= (1, 26) and kwargs.get("stream"):
+            if kwargs.get("stream_options", {}).get("include_usage", None) is not None:
+                # Only perform token chunk auto-extraction if this option is not explicitly set
+                return
+            span._set_ctx_item("_dd.auto_extract_token_chunk", True)
+            stream_options = kwargs.get("stream_options", {})
+            stream_options["include_usage"] = True
+            kwargs["stream_options"] = stream_options

     def _record_response(self, pin, integration, span, args, kwargs, resp, error):
         resp = super()._record_response(pin, integration, span, args, kwargs, resp, error)
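Distilled, the hook's decision is: only force `include_usage` when the caller streams without expressing a preference. A standalone sketch of that logic (the function name is illustrative, not a ddtrace internal):

```python
def ensure_include_usage(kwargs):
    """Turn on stream_options.include_usage for a streamed request unless the
    caller already set it; return True when we set it ourselves (the signal to
    also auto-consume the trailing usage chunk later)."""
    if not kwargs.get("stream"):
        return False
    if kwargs.get("stream_options", {}).get("include_usage") is not None:
        return False  # explicit user choice, True or False, is respected
    stream_options = kwargs.get("stream_options", {})
    stream_options["include_usage"] = True
    kwargs["stream_options"] = stream_options
    return True


kwargs = {"stream": True}
assert ensure_include_usage(kwargs)
assert kwargs["stream_options"] == {"include_usage": True}

kwargs = {"stream": True, "stream_options": {"include_usage": False}}
assert not ensure_include_usage(kwargs)  # opt-out left untouched
```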
69 changes: 66 additions & 3 deletions ddtrace/contrib/internal/openai/utils.py
@@ -48,11 +48,28 @@ def __exit__(self, exc_type, exc_val, exc_tb):
         self.__wrapped__.__exit__(exc_type, exc_val, exc_tb)

     def __iter__(self):
-        return self
+        exception_raised = False
+        try:
+            for chunk in self.__wrapped__:
+                self._extract_token_chunk(chunk)
+                yield chunk
+                _loop_handler(self._dd_span, chunk, self._streamed_chunks)
+        except Exception:
+            self._dd_span.set_exc_info(*sys.exc_info())
+            exception_raised = True
+            raise
+        finally:
+            if not exception_raised:
+                _process_finished_stream(
+                    self._dd_integration, self._dd_span, self._kwargs, self._streamed_chunks, self._is_completion
+                )
+            self._dd_span.finish()
+            self._dd_integration.metric(self._dd_span, "dist", "request.duration", self._dd_span.duration_ns)

     def __next__(self):
         try:
             chunk = self.__wrapped__.__next__()
+            self._extract_token_chunk(chunk)
             _loop_handler(self._dd_span, chunk, self._streamed_chunks)
             return chunk
         except StopIteration:
@@ -68,6 +85,22 @@ def __next__(self):
             self._dd_integration.metric(self._dd_span, "dist", "request.duration", self._dd_span.duration_ns)
             raise

+    def _extract_token_chunk(self, chunk):
+        """Attempt to extract the token chunk (the last chunk in the stream) from the streamed response."""
+        if not self._dd_span._get_ctx_item("_dd.auto_extract_token_chunk"):
+            return
+        choice = getattr(chunk, "choices", [None])[0]
+        if not getattr(choice, "finish_reason", None):
+            # With token usage enabled, only the second-to-last chunk in the stream has finish_reason set
+            return
+        try:
+            # The user isn't expecting the trailing usage chunk, since it's not part of the default
+            # streamed response, so we consume it and extract the token usage metadata before it
+            # reaches the user.
+            usage_chunk = self.__wrapped__.__next__()
+            self._streamed_chunks[0].insert(0, usage_chunk)
+        except (StopIteration, GeneratorExit):
+            return
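The peek-ahead trick generalizes: once a chunk carries `finish_reason`, the next chunk holds only usage metadata the caller never asked to see. A dependency-free sketch of the pattern (the class and callback names are illustrative, not ddtrace APIs):

```python
from types import SimpleNamespace as Chunk


class PeekingStream:
    """Wrap a chunk iterator; record every chunk, hide the usage-only tail."""

    def __init__(self, wrapped, on_chunk, on_done):
        self._wrapped = iter(wrapped)
        self._on_chunk = on_chunk  # called for every chunk, even the hidden one
        self._on_done = on_done    # called exactly once, success or failure

    def __iter__(self):
        try:
            for chunk in self._wrapped:
                self._peek_usage(chunk)
                yield chunk
                self._on_chunk(chunk)
        finally:
            # Runs on exhaustion, on error, and on early close (GeneratorExit),
            # mirroring the span-finish guarantee in the traced stream above.
            self._on_done()

    def _peek_usage(self, chunk):
        # finish_reason marks the second-to-last chunk: consume the final
        # usage-only chunk now so the caller's loop never sees it.
        if getattr(chunk, "finish_reason", None):
            try:
                self._on_chunk(next(self._wrapped))
            except StopIteration:
                pass


chunks = [
    Chunk(finish_reason=None, text="he"),
    Chunk(finish_reason="stop", text="llo"),
    Chunk(finish_reason=None, usage={"total_tokens": 5}),  # hidden tail
]
seen = []
stream = PeekingStream(chunks, on_chunk=seen.append, on_done=lambda: None)
print("".join(c.text for c in stream))  # "hello"; usage chunk never surfaces
```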


class TracedOpenAIAsyncStream(BaseTracedOpenAIStream):
    async def __aenter__(self):
@@ -77,12 +110,29 @@ async def __aenter__(self):
     async def __aexit__(self, exc_type, exc_val, exc_tb):
         await self.__wrapped__.__aexit__(exc_type, exc_val, exc_tb)

-    def __aiter__(self):
-        return self
+    async def __aiter__(self):
+        exception_raised = False
+        try:
+            async for chunk in self.__wrapped__:
+                await self._extract_token_chunk(chunk)
+                yield chunk
+                _loop_handler(self._dd_span, chunk, self._streamed_chunks)
+        except Exception:
+            self._dd_span.set_exc_info(*sys.exc_info())
+            exception_raised = True
+            raise
+        finally:
+            if not exception_raised:
+                _process_finished_stream(
+                    self._dd_integration, self._dd_span, self._kwargs, self._streamed_chunks, self._is_completion
+                )
+            self._dd_span.finish()
+            self._dd_integration.metric(self._dd_span, "dist", "request.duration", self._dd_span.duration_ns)

     async def __anext__(self):
         try:
             chunk = await self.__wrapped__.__anext__()
+            await self._extract_token_chunk(chunk)
             _loop_handler(self._dd_span, chunk, self._streamed_chunks)
             return chunk
         except StopAsyncIteration:
@@ -98,6 +148,19 @@ async def __anext__(self):
             self._dd_integration.metric(self._dd_span, "dist", "request.duration", self._dd_span.duration_ns)
             raise

+    async def _extract_token_chunk(self, chunk):
+        """Attempt to extract the token chunk (the last chunk in the stream) from the streamed response."""
+        if not self._dd_span._get_ctx_item("_dd.auto_extract_token_chunk"):
+            return
+        choice = getattr(chunk, "choices", [None])[0]
+        if not getattr(choice, "finish_reason", None):
+            return
+        try:
+            usage_chunk = await self.__wrapped__.__anext__()
+            self._streamed_chunks[0].insert(0, usage_chunk)
+        except (StopAsyncIteration, GeneratorExit):
+            return


def _compute_token_count(content, model):
# type: (Union[str, List[int]], Optional[str]) -> Tuple[bool, int]
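Taken together, the endpoint hook and the stream wrappers mean a traced caller gets token metrics on streamed completions without changing code. A hedged end-to-end sketch, assuming the standard `openai>=1.26` client and ddtrace's `patch()` entry point (the model name is illustrative):

```python
import openai
from ddtrace import patch

patch(openai=True)  # install the OpenAI integration

client = openai.OpenAI()
stream = client.chat.completions.create(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "hello"}],
    stream=True,  # no stream_options passed: ddtrace injects include_usage=True
)
for chunk in stream:
    # The trailing usage-only chunk is consumed for token metrics before it
    # reaches this loop, so the visible stream is unchanged.
    print(chunk.choices[0].delta.content or "", end="")
```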
4 changes: 2 additions & 2 deletions ddtrace/internal/debug.py
@@ -117,8 +117,8 @@ def collect(tracer):
     from ddtrace._trace.tracer import log

     return dict(
-        # Timestamp UTC ISO 8601
-        date=datetime.datetime.utcnow().isoformat(),
+        # Timestamp UTC ISO 8601 with the trailing +00:00 removed
+        date=datetime.datetime.now(datetime.timezone.utc).isoformat()[0:-6],
         # eg. "Linux", "Darwin"
         os_name=platform.system(),
         # eg. 12.5.0
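The slice keeps the reported format identical across the migration away from the deprecated `utcnow()`: an aware UTC datetime's `isoformat()` ends with a six-character `+00:00` offset that the naive `utcnow()` never produced. A quick check:

```python
import datetime

aware = datetime.datetime.now(datetime.timezone.utc).isoformat()
print(aware)        # e.g. 2025-01-09T17:03:21.123456+00:00
print(aware[0:-6])  # e.g. 2025-01-09T17:03:21.123456, same shape as utcnow()
```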