Skip to content

Commit

Permalink
initial refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
wconti27 committed Jan 22, 2025
1 parent 2f52abb commit 5b7673e
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 40 deletions.
38 changes: 37 additions & 1 deletion ddtrace/_trace/trace_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ def _get_parameters_for_new_span_directly_from_context(ctx: core.ExecutionContex
def _start_span(ctx: core.ExecutionContext, call_trace: bool = True, **kwargs) -> "Span":
span_kwargs = _get_parameters_for_new_span_directly_from_context(ctx)
call_trace = ctx.get_item("call_trace", call_trace)
tracer = (ctx.get_item("middleware") or ctx["pin"]).tracer
tracer = ctx.get_item("tracer") or (ctx.get_item("middleware") or ctx["pin"]).tracer
distributed_headers_config = ctx.get_item("distributed_headers_config")
if distributed_headers_config:
trace_utils.activate_distributed_headers(
Expand All @@ -126,6 +126,31 @@ def _start_span(ctx: core.ExecutionContext, call_trace: bool = True, **kwargs) -
return span


def _set_web_frameworks_tags(ctx, span, int_config):
span.set_tag_str(COMPONENT, int_config.integration_name)
span.set_tag_str(SPAN_KIND, SpanKind.SERVER)
span.set_tag(SPAN_MEASURED_KEY)


anayltics_enabled = ctx.get_item('analytics_enabled')
analytics_sample_rate = ctx.get_item('analytics_sample_rate', True)

# Configure trace search sample rate
if (config._analytics_enabled and anayltics_enabled is not False) or anayltics_enabled is True:
span.set_tag(_ANALYTICS_SAMPLE_RATE_KEY, analytics_sample_rate)


def _on_web_framework_request(ctx, int_config):
request_span = ctx.get_item("req_span")
_set_web_frameworks_tags(ctx, request_span, int_config)
# trace_utils.set_http_meta(
# span,
# azure_functions_config,
# status_code=res.status_code if res else None,
# response_headers=res.headers if res else None,
# )


def _on_traced_request_context_started_flask(ctx):
current_span = ctx["pin"].tracer.current_span()
if not ctx["pin"].enabled or not current_span:
Expand Down Expand Up @@ -761,6 +786,9 @@ def listen():
core.on("azure.functions.request_call_modifier", _on_azure_functions_request_span_modifier)
core.on("azure.functions.start_response", _on_azure_functions_start_response)

# web frameworks general handlers
core.on("aiohttp.request", _on_web_framework_request)

core.on("test_visibility.enable", _on_test_visibility_enable)
core.on("test_visibility.disable", _on_test_visibility_disable)
core.on("test_visibility.is_enabled", _on_test_visibility_is_enabled, "is_enabled")
Expand All @@ -769,6 +797,7 @@ def listen():
core.on("rq.queue.enqueue_job", _propagate_context)

for context_name in (
"aiohttp.request",
"flask.call",
"flask.jsonify",
"flask.render_template",
Expand All @@ -795,5 +824,12 @@ def listen():
):
core.on(f"context.started.start_span.{context_name}", _start_span)

# # web framework specific
# breakpoint()
# for context_name in (
# "aiohttp.request"
# ):
# core.on(f"context.started.start_span.{context_name}", _start_span)


listen()
72 changes: 33 additions & 39 deletions ddtrace/contrib/internal/aiohttp/middlewares.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from ddtrace.ext import SpanKind
from ddtrace.ext import SpanTypes
from ddtrace.ext import http
from ddtrace.internal import core
from ddtrace.internal.constants import COMPONENT
from ddtrace.internal.schema import schematize_url_operation
from ddtrace.internal.schema.span_attribute_schema import SpanDirection
Expand All @@ -35,47 +36,40 @@ async def attach_context(request):
# application configs
tracer = app[CONFIG_KEY]["tracer"]
service = app[CONFIG_KEY]["service"]
distributed_tracing = app[CONFIG_KEY]["distributed_tracing_enabled"]
# Create a new context based on the propagated information.
trace_utils.activate_distributed_headers(
tracer,
int_config=config.aiohttp,
request_headers=request.headers,
override=distributed_tracing,
)

# trace the handler
request_span = tracer.trace(
schematize_url_operation("aiohttp.request", protocol="http", direction=SpanDirection.INBOUND),
service=service,
span_type=SpanTypes.WEB,
)
request_span.set_tag(SPAN_MEASURED_KEY)

request_span.set_tag_str(COMPONENT, config.aiohttp.integration_name)

# set span.kind tag equal to type of request
request_span.set_tag_str(SPAN_KIND, SpanKind.SERVER)

# Configure trace search sample rate
# DEV: aiohttp is special case maintains separate configuration from config api
analytics_enabled = app[CONFIG_KEY]["analytics_enabled"]
if (config._analytics_enabled and analytics_enabled is not False) or analytics_enabled is True:
request_span.set_tag(_ANALYTICS_SAMPLE_RATE_KEY, app[CONFIG_KEY].get("analytics_sample_rate", True))

# attach the context and the root span to the request; the Context
# may be freely used by the application code
request[REQUEST_CONTEXT_KEY] = request_span.context
request[REQUEST_SPAN_KEY] = request_span
request[REQUEST_CONFIG_KEY] = app[CONFIG_KEY]
try:
response = await handler(request)
if isinstance(response, web.StreamResponse):
request.task.add_done_callback(lambda _: finish_request_span(request, response))
return response
except Exception:
request_span.set_traceback()
raise
# Create a new context based on the propagated information.

breakpoint()
with core.context_with_data(
"aiohttp.request",
distributed_headers=request.headers,
headers_case_sensitive=True,
span_name=schematize_url_operation("aiohttp.request", protocol="http", direction=SpanDirection.INBOUND),
distributed_headers_config=app[CONFIG_KEY],
span_type=SpanTypes.WEB,
service=service,
tags={COMPONENT: config.aiohttp.integration_name},
tracer=tracer,
analytics_enabled=analytics_enabled,
analytics_sample_rate=app[CONFIG_KEY].get("analytics_sample_rate", True)
) as ctx, ctx.span as req_span:
ctx.set_item("req_span", req_span)
core.dispatch("web.request", (ctx, config.aiohttp))

# attach the context and the root span to the request; the Context
# may be freely used by the application code
request[REQUEST_CONTEXT_KEY] = req_span.context
request[REQUEST_SPAN_KEY] = req_span
request[REQUEST_CONFIG_KEY] = app[CONFIG_KEY]
try:
response = await handler(request)
if isinstance(response, web.StreamResponse):
request.task.add_done_callback(lambda _: finish_request_span(request, response))
return response
except Exception:
req_span.set_traceback()
raise

return attach_context

Expand Down

0 comments on commit 5b7673e

Please sign in to comment.