diff --git a/README.md b/README.md index d91bf54e..3dba4338 100644 --- a/README.md +++ b/README.md @@ -114,6 +114,7 @@ The traces are normalized and output in JSON to a file. The following transforma - Trace ids are overwritten to match the order in which the traces were received. - Span ids are overwritten to be the DFS order of the spans in the trace tree. +- Parent ids are overwritten using the normalized span ids. However, if the parent is not a span in the trace, the parent id is not overwritten. This is necessary for handling distributed traces where all spans are not sent to the same agent. - Span attributes are ordered to be more human-readable, with the important attributes being listed first. - Span attributes are otherwise ordered alphanumerically. - The span meta and metrics maps if empty are excluded. diff --git a/ddapm_test_agent/trace.py b/ddapm_test_agent/trace.py index 0bf55f82..be1b5544 100644 --- a/ddapm_test_agent/trace.py +++ b/ddapm_test_agent/trace.py @@ -232,11 +232,30 @@ def copy_trace(t: Trace) -> Trace: def root_span(t: Trace) -> Span: """Return the root span of the trace.""" + # Follow approach used in Datadog Agent: https://github.com/DataDog/datadog-agent/blob/927f9ca9acf7983b72a4bfbdd7a69132e1da8501/pkg/trace/traceutil/trace.go#L53 + + if len(t) == 0: + raise ValueError("empty trace: %s" % t) + + # common case optimization to check for span where parent_id is either not + # set or set to 0 for s in t: if "parent_id" not in s or s["parent_id"] is None or s["parent_id"] == 0: return s - raise ValueError("root span not found in trace: %s" % t) + # collect root spans as those with parents that are not themselves spans in trace + span_ids = set(s["span_id"] for s in t) + roots = { + s["parent_id"]: s + for s in t + if "parent_id" in s and s["parent_id"] not in span_ids + } + + if len(roots) != 1: + raise ValueError("single root span not found in trace (n=%d): %s" % (len(t), t)) + + # return any root candidate + return roots.popitem()[1] def trace_id(t: Trace) -> TraceId: diff --git a/ddapm_test_agent/trace_snapshot.py b/ddapm_test_agent/trace_snapshot.py index 94bcdb86..eccb1503 100644 --- a/ddapm_test_agent/trace_snapshot.py +++ b/ddapm_test_agent/trace_snapshot.py @@ -96,7 +96,9 @@ def _normalize_trace(trace: Trace, trace_id: TraceId) -> Trace: span["span_id"] = span_id parent_id = span.get("parent_id") if parent_id: - span["parent_id"] = new_id_map[parent_id] + # If parent_id is not in the map, assume this is a trace chunk with + # a parent not in the trace chunk. Eg: distributed traces. + span["parent_id"] = new_id_map.get(parent_id, parent_id) else: # Normalize the parent of root spans to be 0. span["parent_id"] = 0 diff --git a/releasenotes/notes/add-distributed-trace-c35a314698a3b966.yaml b/releasenotes/notes/add-distributed-trace-c35a314698a3b966.yaml new file mode 100644 index 00000000..c2d3111b --- /dev/null +++ b/releasenotes/notes/add-distributed-trace-c35a314698a3b966.yaml @@ -0,0 +1,4 @@ +--- +features: + - | + Add support for distributed traces where an instrumented service sends a trace chunk where the root span has a parent not in the trace chunk. diff --git a/tests/integration_snapshots/test_trace_distributed_propagated.json b/tests/integration_snapshots/test_trace_distributed_propagated.json new file mode 100644 index 00000000..e33522bb --- /dev/null +++ b/tests/integration_snapshots/test_trace_distributed_propagated.json @@ -0,0 +1,29 @@ +[[ + { + "name": "root", + "service": null, + "resource": "root", + "trace_id": 0, + "span_id": 1, + "parent_id": 5678, + "meta": { + "runtime-id": "9118dd9528d447629254178d1bb4dbcf" + }, + "metrics": { + "_dd.top_level": 1, + "_dd.tracer_kr": 1.0, + "system.pid": 81934 + }, + "duration": 171000, + "start": 1653668237165110000 + }, + { + "name": "child", + "service": null, + "resource": "child", + "trace_id": 0, + "span_id": 2, + "parent_id": 1, + "duration": 6000, + "start": 1653668237165133000 + }]] diff --git a/tests/test_snapshot_integration.py b/tests/test_snapshot_integration.py index 4aa16153..138ae3ff 100644 --- a/tests/test_snapshot_integration.py +++ b/tests/test_snapshot_integration.py @@ -12,6 +12,7 @@ from aiohttp.client_exceptions import ClientConnectorError from aiohttp.client_exceptions import ClientOSError from ddtrace import Tracer +from ddtrace.propagation.http import HTTPPropagator from ddtrace.sampler import DatadogSampler import pytest @@ -217,6 +218,27 @@ async def test_trace_distributed_same_payload(testagent, tracer): assert resp.status == 200 +async def test_trace_distributed_propagated(testagent, tracer): + await testagent.get( + "http://localhost:8126/test/session/start?test_session_token=test_trace_distributed_propagated" + ) + headers = { + "x-datadog-trace-id": "1234", + "x-datadog-parent-id": "5678", + } + context = HTTPPropagator.extract(headers) + tracer.context_provider.activate(context) + + with tracer.trace("root"): + with tracer.trace("child"): + pass + tracer.flush() + resp = await testagent.get( + "http://localhost:8126/test/session/snapshot?test_session_token=test_trace_distributed_propagated" + ) + assert resp.status == 200 + + async def test_trace_missing_received(testagent, tracer): resp = await testagent.get( "http://localhost:8126/test/session/start?test_session_token=test_trace_missing_received" diff --git a/tests/test_trace.py b/tests/test_trace.py index c5e88bfc..2899ac58 100644 --- a/tests/test_trace.py +++ b/tests/test_trace.py @@ -8,6 +8,7 @@ from ddapm_test_agent.trace import dfs_order from ddapm_test_agent.trace import root_span +from .trace_utils import random_id from .trace_utils import random_trace @@ -20,6 +21,16 @@ def test_random_trace(): assert bfs_order(t) +def test_trace_chunk(): + trace_id = random_id() + parent_id = random_id() + t = random_trace(10, trace_id=trace_id, parent_id=parent_id) + root = root_span(t) + assert root + assert root.get("trace_id") == trace_id + assert root.get("parent_id") == parent_id + + @pytest.mark.parametrize( "content_type, payload", [ diff --git a/tests/trace_utils.py b/tests/trace_utils.py index d25d0ff4..3006f759 100644 --- a/tests/trace_utils.py +++ b/tests/trace_utils.py @@ -1,6 +1,7 @@ from random import Random from typing import Any from typing import Dict +from typing import Optional from ddapm_test_agent.trace import Span from ddapm_test_agent.trace import Trace @@ -58,7 +59,7 @@ def span(rnd: Random = _random, **kwargs: Any) -> Span: for k in ["trace_id", "span_id"]: if k not in kwargs: - kwargs[k] = rnd.randint(0, 2**64) + kwargs[k] = random_id(rnd) # Don't assign a parent id by default if "parent_id" not in kwargs: @@ -112,7 +113,12 @@ def _prufers_trace(n: int, rnd: Random = _random) -> Trace: return list(dfs_order(spans)) -def random_trace(nspans: int, rng: Random = _random) -> Trace: +def random_trace( + nspans: int, + rng: Random = _random, + trace_id: Optional[int] = None, + parent_id: Optional[int] = None, +) -> Trace: # TODO: # represent arbitrary random services (subtrees in spans) # resource names (should only be on service entry) @@ -120,12 +126,19 @@ def random_trace(nspans: int, rng: Random = _random) -> Trace: # sampling decisions # dd_origin? assert nspans > 0 - trace_id = rng.randint(0, 2**64) + if not trace_id: + trace_id = random_id(rng) t = _prufers_trace(nspans, rng) root = root_span(t) + if parent_id: + root["parent_id"] = parent_id for s in t: if s is not root: del s["type"] del s["resource"] s["trace_id"] = trace_id return t + + +def random_id(rng: Random = _random) -> int: + return rng.randint(0, 2**64)