Skip to content

Commit

Permalink
Add support for span events
Browse files Browse the repository at this point in the history
Signed-off-by: Marco Costa <[email protected]>
  • Loading branch information
marcotc committed Mar 7, 2025
1 parent 245d310 commit de7c4df
Show file tree
Hide file tree
Showing 7 changed files with 369 additions and 25 deletions.
1 change: 1 addition & 0 deletions ddapm_test_agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -699,6 +699,7 @@ async def handle_info(self, request: Request) -> web.Response:
"client_drop_p0s": True,
# Just a random selection of some peer_tags to aggregate on for testing, not exhaustive
"peer_tags": ["db.name", "mongodb.db", "messaging.system"],
"span_events": True, # Advertise support for the top-level Span field for Span Events
},
headers={"Datadog-Agent-State": "03e868b3ecdd62a91423cc4c3917d0d151fb9fa486736911ab7f5a0750c63824"},
)
Expand Down
199 changes: 190 additions & 9 deletions ddapm_test_agent/trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,12 @@ class SpanLink(TypedDict):
flags: NotRequired[Optional[int]]


class SpanEvent(TypedDict):
time_unix_nano: int
name: str
attributes: NotRequired[Dict[str, Any]]


class Span(TypedDict):
name: str
span_id: SpanId
Expand All @@ -71,6 +77,7 @@ class Span(TypedDict):
meta: NotRequired[Dict[str, str]]
metrics: NotRequired[Dict[str, MetricType]]
span_links: NotRequired[List[SpanLink]]
span_events: NotRequired[List[SpanEvent]]
meta_struct: NotRequired[Dict[str, Dict[str, Any]]]


Expand All @@ -88,9 +95,20 @@ class Span(TypedDict):
"meta",
"metrics",
"span_links",
"span_events",
"meta_struct",
]
TopLevelSpanValue = Union[None, SpanId, TraceId, int, str, Dict[str, str], Dict[str, MetricType], List[SpanLink]]
TopLevelSpanValue = Union[
None,
SpanId,
TraceId,
int,
str,
Dict[str, str],
Dict[str, MetricType],
List[SpanLink],
List[SpanEvent],
]
Trace = List[Span]
v04TracePayload = List[List[Span]]
TraceMap = OrderedDict[int, Trace]
Expand Down Expand Up @@ -123,9 +141,11 @@ def verify_span(d: Any) -> Span:
assert isinstance(d["error"], int), "Expected error to be of type: 'int', got: " + str(type(d["error"]))
if "meta" in d:
assert isinstance(d["meta"], dict)
for k, v in d["meta"].items():
for k, attr in d["meta"].items():
assert isinstance(k, str), f"Expected key 'meta.{k}' to be of type: 'str', got: {type(k)}"
assert isinstance(v, str), f"Expected value of key 'meta.{k}' to be of type: 'str', got: {type(v)}"
assert isinstance(
attr, str
), f"Expected value of key 'meta.{k}' to be of type: 'str', got: {type(attr)}"
if "meta_struct" in d:
assert isinstance(d["meta_struct"], dict)
for k, val in d["meta_struct"].items():
Expand All @@ -139,11 +159,11 @@ def verify_span(d: Any) -> Span:
), f"Expected key 'meta_struct.{k}.{inner_k}' to be of type: 'str', got: {type(inner_k)}"
if "metrics" in d:
assert isinstance(d["metrics"], dict)
for k, v in d["metrics"].items():
for k, attr in d["metrics"].items():
assert isinstance(k, str), f"Expected key 'metrics.{k}' to be of type: 'str', got: {type(k)}"
assert isinstance(
v, (int, float)
), f"Expected value of key 'metrics.{k}' to be of type: 'float/int', got: {type(v)}"
attr, (int, float)
), f"Expected value of key 'metrics.{k}' to be of type: 'float/int', got: {type(attr)}"
if "span_links" in d:
assert isinstance(d["span_links"], list)
for link in d["span_links"]:
Expand All @@ -163,11 +183,11 @@ def verify_span(d: Any) -> Span:
), "Expected 'trace_id_high' to be of type: 'int', got: " + str(type(link["trace_id_high"]))
if "attributes" in link:
assert isinstance(link["attributes"], dict)
for k, v in link["attributes"].items():
for k, attr in link["attributes"].items():
assert isinstance(k, str), f"Expected key 'attributes.{k}' to be of type: 'str', got: {type(k)}"
assert isinstance(
v, str
), f"Expected value of key 'attributes.{k}' to be of type: 'str', got: {type(v)}"
attr, str
), f"Expected value of key 'attributes.{k}' to be of type: 'str', got: {type(attr)}"
if "tracestate" in link:
assert isinstance(
link["tracestate"], (str, NoneType) # type: ignore
Expand All @@ -176,6 +196,108 @@ def verify_span(d: Any) -> Span:
assert isinstance(link["flags"], int), "Expected flags to be of type: 'int', got: " + str(
type(link["flags"])
)
if "span_events" in d:
assert isinstance(d["span_events"], list)
for event in d["span_events"]:
assert isinstance(event, dict), f"Expected all span_events to be of type: 'dict', got: {type(event)}"
required_attrs = ["time_unix_nano", "name"]
for attr in required_attrs:
assert attr in event, f"'{attr}' required in span event"
assert isinstance(
event["time_unix_nano"], int
), "Expected 'time_unix_nano' to be of type: 'int', got: " + str(type(event["time_unix_nano"]))
assert isinstance(event["name"], str), "Expected 'name' to be of type: 'str', got: " + str(
type(event["name"])
)
if "attributes" in event:
assert isinstance(event["attributes"], dict)
for k, attr in event["attributes"].items():
assert isinstance(k, str), f"Expected key 'attributes.{k}' to be of type: 'str', got: {type(k)}"
assert isinstance(
attr, dict
), f"Expected value 'attributes.{k}={attr}' to be of type: 'dict', got: {type(attr)}"

# for this data struture for attr:
# // AttributeAnyValue is an Object with String keys, defined below.
# // We have to implement a union type manually here because Go's MessagePack generator does not support
# // unions: https://github.com/tinylib/msgp/issues/184
# {
# // Represents the type of the value represented in this attribute entry.
# // For String values: "type": 0
# // For Boolean values: "type": 1
# // For Integer values: "type": 2
# // For Double values: "type": 3
# // For Array values: "type": 4
# "type": Integer,
#
# // Populate with a String value if `type` is 0, otherwise do not include this field.
# "string_value": String,
# // Populate with a Boolean value if `type` is 1, otherwise do not include this field.
# "bool_value": Boolean,
# // Populate with a Integer value if `type` is 2, otherwise do not include this field.
# "int_value": Integer,
# // Populate with a Double value if `type` is 3, otherwise do not include this field.
# "double_value": Double,
# // Populate with a Array value if `type` is 4, otherwise do not include this field.
# "array_value": Array<AttributeArrayValue>,
# }
# assert its values

if attr["type"] == 0:
assert isinstance(
attr["string_value"], str
), f"Expected 'string_value' to be of type: 'str', got: {type(attr['string_value'])}"
elif attr["type"] == 1:
assert isinstance(
attr["bool_value"], bool
), f"Expected 'bool_value' to be of type: 'bool', got: {type(attr['bool_value'])}"
elif attr["type"] == 2:
assert isinstance(
attr["int_value"], int
), f"Expected 'int_value' to be of type: 'int', got: {type(attr['int_value'])}"
elif attr["type"] == 3:
assert isinstance(
attr["double_value"], float
), f"Expected 'double_value' to be of type: 'float', got: {type(attr['double_value'])}"
elif attr["type"] == 4:
assert isinstance(
attr["array_value"], list
), f"Expected 'array_value' to be of type: 'list', got: {type(attr['array_value'])}"
array = attr["array_value"]
if array:
first_type = array[0]["type"]
i = None
assert all(
i["type"] == first_type for i in array
), f"Expected all elements in list to be of the same type: '{first_type}', got: {i['type']}"

for e in array:
assert isinstance(
e, dict
), f"Expected all elements in 'array_value' to be of type: 'dict', got: {type(e)}"
if e["type"] == 0:
assert isinstance(
e["string_value"], str
), f"Expected 'string_value' to be of type: 'str', got: {type(e['string_value'])}"
elif e["type"] == 1:
assert isinstance(
e["bool_value"], bool
), f"Expected 'bool_value' to be of type: 'bool', got: {type(e['bool_value'])}"
elif e["type"] == 2:
assert isinstance(
e["int_value"], int
), f"Expected 'int_value' to be of type: 'int', got: {type(e['int_value'])}"
elif e["type"] == 3:
assert isinstance(
e["double_value"], float
), f"Expected 'double_value' to be of type: 'float', got: {type(e['double_value'])}"
else:
raise ValueError(
f"Unsupported span event attribute type {attr['type']} for: {k}={attr}"
)
else:
raise ValueError(f"Unsupported span event attribute type {attr['type']} for: {k}={attr}")

return cast(Span, d)
except AssertionError as e:
raise TypeError(*e.args) from e
Expand Down Expand Up @@ -304,17 +426,37 @@ def copy_span_links(s: SpanLink) -> SpanLink:
return copy


def copy_span_events(s: SpanEvent) -> SpanEvent:
attributes = s["attributes"].copy() if "attributes" in s else None
copy = s.copy()
if attributes is not None:
# Copy arrays inside attributes
for k, v in attributes.items():
if isinstance(v, dict) and v["type"] == "array_value":
array = v["array_value"]

value = v.copy()
value["array_value"] = array.copy()

attributes[k] = value
copy["attributes"] = attributes
return copy


def copy_span(s: Span) -> Span:
meta = s["meta"].copy() if "meta" in s else None
metrics = s["metrics"].copy() if "metrics" in s else None
links = s["span_links"].copy() if "span_links" in s else None
events = s["span_events"].copy() if "span_events" in s else None
copy = s.copy()
if meta is not None:
copy["meta"] = meta
if metrics is not None:
copy["metrics"] = metrics
if links is not None:
copy["span_links"] = [copy_span_links(link) for link in links]
if events is not None:
copy["span_events"] = [copy_span_events(event) for event in events]
return copy


Expand Down Expand Up @@ -379,6 +521,45 @@ def add_span_link(
return s


def add_span_event(
s: Span, time_unix_nano: int = 1730405656000000000, name: str = "event", attributes: Optional[Dict[str, Any]] = None
) -> Span:
if "span_events" not in s:
s["span_events"] = []
new_event = SpanEvent(time_unix_nano=time_unix_nano, name=name)
if attributes is not None:
# Add protobuf-like structure for attributes
new_attributes = {}
for k, v in attributes.items():
if isinstance(v, str):
new_attributes[k] = {"type": 0, "string_value": v}
elif isinstance(v, bool):
new_attributes[k] = {"type": 1, "bool_value": v}
elif isinstance(v, int):
new_attributes[k] = {"type": 2, "int_value": v}
elif isinstance(v, float):
new_attributes[k] = {"type": 3, "double_value": v}
elif isinstance(v, list):
array_value: List[Dict[str, Any]] = []
new_attributes[k] = {"type": 4, "array_value": array_value}
for i in v:
if isinstance(i, str):
array_value.append({"type": 0, "string_value": i})
elif isinstance(i, bool):
array_value.append({"type": 1, "bool_value": i})
elif isinstance(i, int):
array_value.append({"type": 2, "int_value": i})
elif isinstance(i, float):
array_value.append({"type": 3, "double_value": i})
else:
raise ValueError(f"Unsupported span event attribute type {type(i)} for: {k}={v}")
else:
raise ValueError(f"Unsupported span event attribute type {type(v)} for: {k}={v}")
new_event["attributes"] = new_attributes
s["span_events"].append(new_event)
return s


def _trace_decoder_flexible(json_string: bytes) -> Dict[str, Any]:
"""Parse Trace JSON and accounts for meta that may contain numbers such as ports. Converts these meta correctly to strings.
Also ensures that any valid integers/floats are correctly parsed, to prevent ids from being decoded as strings incorrectly.
Expand Down
Loading

0 comments on commit de7c4df

Please sign in to comment.