Skip to content

Commit

Permalink
(feat) update logic used to retrieve session traces
Browse files Browse the repository at this point in the history
Does the following:

- Clearing a session now clears traces only, instead of also deleting the session
- Requesting traces for a esssion will return a 404 Session Not found if no traces or session match a non-None session token
- Sessions now return session-less requests only up to the latest session marker
  • Loading branch information
tabgok committed Feb 13, 2024
1 parent 4776d42 commit 815476e
Show file tree
Hide file tree
Showing 5 changed files with 131 additions and 16 deletions.
54 changes: 42 additions & 12 deletions ddapm_test_agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@
from .tracestats import v06StatsPayload


class NoSuchSessionException(Exception):
pass


_Handler = Callable[[Request], Awaitable[web.Response]]


Expand Down Expand Up @@ -358,18 +362,34 @@ def _requests_by_session(self, token: Optional[str]) -> List[Request]:
"""
# Go backwards in the requests received gathering requests until
# the /session-start request for the token is found.
reqs: List[Request] = []
for req in reversed(self._requests):
# Note that this may not return all associated traces, because some
# may be generated before the session-start call
session_reqs: List[Tuple[int, Request]] = []
sessionless_reqs: List[Tuple[int, Request]] = []
matched = token is None

for i, req in enumerate(reversed(self._requests)):
if req.match_info.handler == self.handle_session_start:
if token is None or _session_token(req) == token:
if token is None:
# If no token is specified, then we match the latest session
break
else:
# The requests made were from a different manual session
# so continue.
elif _session_token(req) == token:
# If a token is specified and it matches, we've hit the start of our session
matched = True
break
elif _session_token(req) != token:
# If a token is specified and it doesn't match, we've hit the start of a different session
# So we reset the list of requests
sessionless_reqs = []
continue
if _session_token(req) in [token, None]:
reqs.append(req)
return reqs
if _session_token(req) == token:
session_reqs.append((i, req))
elif _session_token(req) is None:
sessionless_reqs.append((i, req))

if not matched and not session_reqs:
raise NoSuchSessionException(f"No session found for token '{token}'")
return [x[1] for x in sorted(session_reqs + sessionless_reqs, key=lambda x: x[0])]

async def _traces_from_request(self, req: Request) -> List[List[Span]]:
"""Return the trace from a trace request."""
Expand Down Expand Up @@ -814,7 +834,12 @@ async def handle_snapshot(self, request: Request) -> web.Response:

async def handle_session_traces(self, request: Request) -> web.Response:
token = request["session_token"]
traces = await self._traces_by_session(token)
traces = []
try:
traces = await self._traces_by_session(token)
except NoSuchSessionException as e:
return web.HTTPNotFound(reason=str(e))

return web.json_response(traces)

async def handle_session_apmtelemetry(self, request: Request) -> web.Response:
Expand Down Expand Up @@ -908,14 +933,19 @@ async def handle_session_clear(self, request: Request) -> web.Response:
if req.match_info.handler == self.handle_session_start:
if _session_token(req) == session_token:
in_token_sync_session = True
continue # Don't clear the session start
else:
in_token_sync_session = False
if in_token_sync_session:
setattr(req, "__delete", True)

# Filter out all the requests.
# Filter out all requests marked for deletion.
# Keep session starts.
self._requests = [
r for r in self._requests if _session_token(r) != session_token and not hasattr(r, "__delete")
r
for r in self._requests
if (_session_token(r) != session_token or r.match_info.handler == self.handle_session_start)
and not hasattr(r, "__delete")
]
else:
self._requests = []
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
---
features:
- |
Updates the logic for associating requests with a session. Prior to this change:
(1) non-existent tokens were permitted and returned associations with all requests,
(2) existent tokens returned associations with all matching requests + all untokenized requests after the session was created
After this change:
(1) Requests with non-existent tokens return 400 error codes
(2) Requests with existent tokens return all matching requests + all untokenized requests after the session was created but
only up to the next session creation.
Requests without a token continue to return all requests.
4 changes: 2 additions & 2 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,13 +210,13 @@ def do_reference_v04_http_trace(
v04_reference_http_trace_payload_headers,
v04_reference_http_trace_payload_data,
):
def fn(token: Optional[str] = None) -> Awaitable[Response]:
def fn(token: Optional[str] = None, payload_override: Optional[bytes] = None) -> Awaitable[Response]:
params = {"test_session_token": token} if token is not None else {}
return agent.put( # type: ignore
"/v0.4/traces",
params=params,
headers=v04_reference_http_trace_payload_headers,
data=v04_reference_http_trace_payload_data,
data=payload_override or v04_reference_http_trace_payload_data,
)

yield fn
Expand Down
75 changes: 73 additions & 2 deletions tests/test_session.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import base64
from typing import cast

import msgpack
import pytest
Expand Down Expand Up @@ -70,8 +71,7 @@ async def test_concurrent_session(
assert resp.status == 200
for token in ["test_case", "test_case2"]:
resp = await agent.get("/test/session/traces", params={"test_session_token": token})
assert resp.status == 200
assert await resp.json() == []
assert resp.status == 404


async def test_two_sessions(
Expand Down Expand Up @@ -132,3 +132,74 @@ async def test_session_requests(
assert requests[2]["url"].endswith("/v0.6/stats?test_session_token=test_case")
assert requests[3]["method"] == "POST"
assert requests[3]["url"].endswith("/telemetry/proxy/api/v2/apmtelemetry?test_session_token=test_case")


async def test_404_when_session_doesnt_exist(agent):
"""When a session that doesn't exist is requested, we should get an error."""
resp = await agent.get("/test/session/traces", params={"test_session_token": "nonexistent"})
assert resp.status == 404


async def test_empty_session_ok(agent):
"""When a session exists but has no traces, we should get an OK with an empty list returned"""
resp = await agent.get("/test/session/start", params={"test_session_token": "emptysession"})
assert resp.status == 200
resp = await agent.get("/test/session/traces", params={"test_session_token": "emptysession"})
assert resp.status == 200
assert await resp.json() == []


async def test_session_association_of_untokenized_traces(
agent, do_reference_v04_http_trace, v04_reference_http_trace_payload_data_raw
):
"""Requests sent without a session token are associated with the session that proceeds it"""
# Create session A, trace (no token), then create session B => assert the trace belong to A and not B
resp = await agent.get("/test/session/start", params={"test_session_token": "sessiona"})
assert resp.status == 200
await do_reference_v04_http_trace()
resp = await agent.get("/test/session/start", params={"test_session_token": "sessionb"})
assert resp.status == 200
resp_a = await agent.get("/test/session/traces", params={"test_session_token": "sessiona"})
assert resp_a.status == 200
a_results = await resp_a.json()
first_a_trace_id = a_results[0][0]["trace_id"]
resp_b = await agent.get("/test/session/traces", params={"test_session_token": "sessionb"})
assert resp_b.status == 200
b_results = await resp_b.json()
assert len(a_results) == 1
assert b_results == []

# Create a new trace (no token) and assert that sessions A and B now have a trace
await do_reference_v04_http_trace()
resp_a = await agent.get("/test/session/traces", params={"test_session_token": "sessiona"})
resp_a_results = await resp_a.json()
assert resp_a.status == 200
resp_b = await agent.get("/test/session/traces", params={"test_session_token": "sessionb"})
resp_b_results = await resp_b.json()
assert resp_b.status == 200
assert len(resp_a_results) == 1
assert len(resp_b_results) == 1

# Recreate session A, generate a "new" trace, and show that the trace for A is new
resp = await agent.get("/test/session/start", params={"test_session_token": "sessiona"})
assert resp.status == 200
v04_reference_http_trace_payload_data_raw[0][0]["trace_id"] = 2
v04_modified_payload_bytes = cast(bytes, msgpack.packb(v04_reference_http_trace_payload_data_raw))
await do_reference_v04_http_trace(payload_override=v04_modified_payload_bytes)
second_resp_a = await agent.get("/test/session/traces", params={"test_session_token": "sessiona"})
assert second_resp_a.status == 200
new_a_results = await second_resp_a.json()
assert len(new_a_results) == 1
second_a_trace_id = new_a_results[0][0]["trace_id"]
assert second_a_trace_id == 2
assert first_a_trace_id != second_a_trace_id


async def test_session_results_with_token_but_no_session_start(
agent, do_reference_v04_http_trace, v04_reference_http_trace_payload_data_raw
):
"""Requests sent without a session token are associated with the session that proceeds it"""
await do_reference_v04_http_trace(token="nosessionstart")
resp = await agent.get("/test/session/traces", params={"test_session_token": "nosessionstart"})
assert resp.status == 200
assert await resp.json() == v04_reference_http_trace_payload_data_raw
1 change: 1 addition & 0 deletions tests/test_snapshot_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,7 @@ async def test_tracestats(
do_traces: Callable[[Tracer], None],
fail: bool,
) -> None:
await testagent.get("http://localhost:8126/test/session/start?test_session_token=test_trace_stats")
do_traces(stats_tracer)
stats_tracer.shutdown() # force out the stats
resp = await testagent.get("http://localhost:8126/test/session/snapshot?test_session_token=test_trace_stats")
Expand Down

0 comments on commit 815476e

Please sign in to comment.