Skip to content

Commit

Permalink
Fix integration test for span cleanup (#1374)
Browse files Browse the repository at this point in the history
  • Loading branch information
nnarayen committed Feb 7, 2025
1 parent 4db445b commit ef2e43b
Showing 1 changed file with 6 additions and 6 deletions.
12 changes: 6 additions & 6 deletions truss/templates/server/model_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -641,7 +641,7 @@ async def _stream_with_background_task(
generator: Union[Generator[bytes, None, None], AsyncGenerator[bytes, None]],
span: trace.Span,
trace_ctx: trace.Context,
release_and_end: Callable[[], None],
cleanup_fn: Callable[[], None],
) -> AsyncGenerator[bytes, None]:
# The streaming read timeout is the amount of time in between streamed chunk
# before a timeout is triggered.
Expand All @@ -661,7 +661,7 @@ async def _stream_with_background_task(
self._write_response_to_queue(response_queue, async_generator, span)
)
# Defer the release of the semaphore until the write_response_to_queue task.
gen_task.add_done_callback(lambda _: release_and_end())
gen_task.add_done_callback(lambda _: cleanup_fn())

# The gap between responses in a stream must be < streaming_read_timeout
# TODO: this whole buffering might be superfluous and sufficiently done by
Expand Down Expand Up @@ -717,7 +717,7 @@ async def _process_model_fn(

if inspect.isgenerator(result) or inspect.isasyncgen(result):
return await self._handle_generator_response(
request, result, fn_span, detached_ctx, release_and_end=lambda: None
request, result, fn_span, detached_ctx
)

return result
Expand All @@ -738,13 +738,13 @@ async def _handle_generator_response(
generator: Union[Generator[bytes, None, None], AsyncGenerator[bytes, None]],
span: trace.Span,
trace_ctx: trace.Context,
release_and_end: Callable[[], None],
get_cleanup_fn: Callable[[], Callable[[], None]] = lambda: lambda: None,
):
if self._should_gather_generator(request):
return await _gather_generator(generator)
else:
return await self._stream_with_background_task(
generator, span, trace_ctx, release_and_end
generator, span, trace_ctx, cleanup_fn=get_cleanup_fn()
)

async def completions(
Expand Down Expand Up @@ -824,7 +824,7 @@ async def __call__(
predict_result,
span_predict,
detached_ctx,
release_and_end=get_defer_fn(),
get_cleanup_fn=get_defer_fn,
)

if isinstance(predict_result, starlette.responses.Response):
Expand Down

0 comments on commit ef2e43b

Please sign in to comment.