Skip to content

Commit

Permalink
feat: add support for distributed trace chunks
Browse files Browse the repository at this point in the history
  • Loading branch information
majorgreys committed May 27, 2022
1 parent 6330d47 commit f41a756
Show file tree
Hide file tree
Showing 7 changed files with 105 additions and 5 deletions.
21 changes: 20 additions & 1 deletion ddapm_test_agent/trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,11 +232,30 @@ def copy_trace(t: Trace) -> Trace:

def root_span(t: Trace) -> Span:
    """Return the root span of the trace.

    A span is the root when its ``parent_id`` is missing, ``None`` or ``0``.
    If no such span exists, the trace is treated as a chunk of a distributed
    trace: the root is then the span whose parent is not itself a span in
    the chunk.

    Raises:
        ValueError: if the trace is empty, or if the spans with a parent
            outside the trace do not share exactly one parent id.
    """
    # Follow approach used in Datadog Agent: https://github.com/DataDog/datadog-agent/blob/927f9ca9acf7983b72a4bfbdd7a69132e1da8501/pkg/trace/traceutil/trace.go#L53

    if not t:
        raise ValueError("empty trace: %s" % (t,))

    # common case optimization to check for span where parent_id is either not
    # set or set to 0
    for s in t:
        if "parent_id" not in s or s["parent_id"] is None or s["parent_id"] == 0:
            return s

    # collect root spans as those with parents that are not themselves spans in trace
    span_ids = {s["span_id"] for s in t}
    # Keyed by parent id: candidates sharing the same external parent collapse
    # into a single entry (the last one seen wins).
    roots = {
        s["parent_id"]: s
        for s in t
        if "parent_id" in s and s["parent_id"] not in span_ids
    }

    if len(roots) != 1:
        raise ValueError("single root span not found in trace: %s" % (t,))

    # return any root candidate
    return roots.popitem()[1]


def trace_id(t: Trace) -> TraceId:
Expand Down
4 changes: 3 additions & 1 deletion ddapm_test_agent/trace_snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,9 @@ def _normalize_trace(trace: Trace, trace_id: TraceId) -> Trace:
span["span_id"] = span_id
parent_id = span.get("parent_id")
if parent_id:
span["parent_id"] = new_id_map[parent_id]
# If parent_id is not in the map, assume this is a trace chunk with
# a parent not in the trace chunk. Eg: distributed traces.
span["parent_id"] = new_id_map.get(parent_id, parent_id)
else:
# Normalize the parent of root spans to be 0.
span["parent_id"] = 0
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
---
features:
- |
Add support for distributed traces, in which an instrumented service sends a trace chunk whose root span has a parent that is not in the chunk.
29 changes: 29 additions & 0 deletions tests/integration_snapshots/test_trace_distributed_propagated.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
[[
{
"name": "root",
"service": null,
"resource": "root",
"trace_id": 0,
"span_id": 1,
"parent_id": 5678,
"meta": {
"runtime-id": "9118dd9528d447629254178d1bb4dbcf"
},
"metrics": {
"_dd.top_level": 1,
"_dd.tracer_kr": 1.0,
"system.pid": 81934
},
"duration": 171000,
"start": 1653668237165110000
},
{
"name": "child",
"service": null,
"resource": "child",
"trace_id": 0,
"span_id": 2,
"parent_id": 1,
"duration": 6000,
"start": 1653668237165133000
}]]
22 changes: 22 additions & 0 deletions tests/test_snapshot_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from aiohttp.client_exceptions import ClientConnectorError
from aiohttp.client_exceptions import ClientOSError
from ddtrace import Tracer
from ddtrace.propagation.http import HTTPPropagator
from ddtrace.sampler import DatadogSampler
import pytest

Expand Down Expand Up @@ -210,6 +211,27 @@ async def test_trace_distributed_same_payload(testagent, tracer):
assert resp.status == 200


async def test_trace_distributed_propagated(testagent, tracer):
    """Snapshot a trace whose root span has a parent propagated via HTTP headers."""
    await testagent.get(
        "http://localhost:8126/test/session/start?test_session_token=test_trace_distributed_propagated"
    )

    # Activate a context extracted from Datadog propagation headers so that
    # the "root" span below is created with parent id 5678, which is not part
    # of the flushed trace chunk.
    propagated = HTTPPropagator.extract(
        {
            "x-datadog-trace-id": "1234",
            "x-datadog-parent-id": "5678",
        }
    )
    tracer.context_provider.activate(propagated)

    with tracer.trace("root"), tracer.trace("child"):
        pass
    tracer.flush()

    snapshot_resp = await testagent.get(
        "http://localhost:8126/test/session/snapshot?test_session_token=test_trace_distributed_propagated"
    )
    assert snapshot_resp.status == 200


async def test_trace_missing_received(testagent, tracer):
resp = await testagent.get(
"http://localhost:8126/test/session/start?test_session_token=test_trace_missing_received"
Expand Down
11 changes: 11 additions & 0 deletions tests/test_trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from ddapm_test_agent.trace import dfs_order
from ddapm_test_agent.trace import root_span

from .trace_utils import random_id
from .trace_utils import random_trace


Expand All @@ -20,6 +21,16 @@ def test_random_trace():
assert bfs_order(t)


def test_trace_chunk():
    """The root of a trace chunk keeps its trace id and its out-of-chunk parent id."""
    expected_trace_id, expected_parent_id = random_id(), random_id()
    chunk = random_trace(
        10, trace_id=expected_trace_id, parent_id=expected_parent_id
    )

    chunk_root = root_span(chunk)

    assert chunk_root
    assert chunk_root.get("trace_id") == expected_trace_id
    assert chunk_root.get("parent_id") == expected_parent_id


@pytest.mark.parametrize(
"content_type, payload",
[
Expand Down
19 changes: 16 additions & 3 deletions tests/trace_utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from random import Random
from typing import Any
from typing import Dict
from typing import Optional

from ddapm_test_agent.trace import Span
from ddapm_test_agent.trace import Trace
Expand Down Expand Up @@ -58,7 +59,7 @@ def span(rnd: Random = _random, **kwargs: Any) -> Span:

for k in ["trace_id", "span_id"]:
if k not in kwargs:
kwargs[k] = rnd.randint(0, 2**64)
kwargs[k] = random_id(rnd)

# Don't assign a parent id by default
if "parent_id" not in kwargs:
Expand Down Expand Up @@ -112,20 +113,32 @@ def _prufers_trace(n: int, rnd: Random = _random) -> Trace:
return list(dfs_order(spans))


def random_trace(
    nspans: int,
    rng: Random = _random,
    trace_id: Optional[int] = None,
    parent_id: Optional[int] = None,
) -> Trace:
    """Generate a random trace of ``nspans`` spans.

    Args:
        nspans: number of spans to generate; must be positive.
        rng: random number generator used for the ids and the trace shape.
        trace_id: trace id assigned to every span. A random id is generated
            when ``None``.
        parent_id: parent id assigned to the root span, to mimic a trace
            chunk whose parent is outside the chunk. The root's parent id is
            left untouched when ``None``.

    Returns:
        The generated trace; every span carries the same ``trace_id``.
    """
    # TODO:
    #   represent arbitrary random services (subtrees in spans)
    #   resource names (should only be on service entry)
    #   smarter type (should only be on service entry)
    #   sampling decisions
    #   dd_origin?
    assert nspans > 0
    # Compare against None rather than truthiness so an explicit id of 0 is
    # honored instead of being silently replaced or ignored.
    if trace_id is None:
        trace_id = random_id(rng)
    t = _prufers_trace(nspans, rng)
    root = root_span(t)
    if parent_id is not None:
        root["parent_id"] = parent_id
    for s in t:
        if s is not root:
            # "type" and "resource" are kept on the root span only.
            del s["type"]
            del s["resource"]
        s["trace_id"] = trace_id
    return t


def random_id(rng: Random = _random) -> int:
    """Return a random id in the unsigned 64-bit range drawn from ``rng``."""
    # randint is inclusive on both ends, so the upper bound must be
    # 2**64 - 1 for the result to fit in an unsigned 64-bit integer.
    return rng.randint(0, 2**64 - 1)

0 comments on commit f41a756

Please sign in to comment.