Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add delay for telemetry requests #198

Merged
merged 8 commits into from
Oct 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,22 @@ Returns the tracer-flares in the following json format:

If there was an error parsing the tracer-flare form, that will be recorded under `error`.

### /test/settings (POST)

Allows changing some settings on the fly.
This endpoint takes a POST request with a JSON body listing the keys and values to apply.

```json
{ "key": value }
```

Supported keys:
- `trace_request_delay`: sets a delay, in seconds, to apply to trace and telemetry requests

```
curl -X POST 'http://0.0.0.0:8126/test/settings' -d '{ "trace_request_delay": 5 }'
```

## Development

### Prerequisites
Expand Down
27 changes: 22 additions & 5 deletions ddapm_test_agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
from . import trace_snapshot
from . import tracestats_snapshot
from .apmtelemetry import TelemetryEvent
from .apmtelemetry import v2_decode as v2_apmtelemetry_decode
from .apmtelemetry import v2_decode_request as v2_apmtelemetry_decode_request
from .checks import CheckTrace
from .checks import Checks
from .checks import start_trace
Expand Down Expand Up @@ -332,7 +332,7 @@ async def apmtelemetry(self) -> List[TelemetryEvent]:
_events: List[TelemetryEvent] = []
for req in reversed(self._requests):
if req.match_info.handler == self.handle_v2_apmtelemetry:
_events.append(v2_apmtelemetry_decode(await req.read()))
_events.append(await v2_apmtelemetry_decode_request(req, await req.read()))
return _events

async def _trace_by_trace_id(self, trace_id: int) -> Trace:
Expand Down Expand Up @@ -435,7 +435,7 @@ async def _apmtelemetry_by_session(self, token: Optional[str]) -> List[Telemetry
events: List[TelemetryEvent] = []
for req in self._requests_by_session(token):
if req.match_info.handler == self.handle_v2_apmtelemetry:
events.append(v2_apmtelemetry_decode(await req.read()))
events.append(await v2_apmtelemetry_decode_request(req, await req.read()))

# TODO: Sort the events?
return events
Expand Down Expand Up @@ -590,7 +590,7 @@ async def handle_v07_remoteconfig_put(self, request: Request) -> web.Response:
return web.HTTPAccepted()

async def handle_v2_apmtelemetry(self, request: Request) -> web.Response:
v2_apmtelemetry_decode(self._request_data(request))
await v2_apmtelemetry_decode_request(request, self._request_data(request))
# TODO: Validation
# TODO: Snapshots
return web.HTTPOk()
Expand Down Expand Up @@ -661,6 +661,22 @@ async def handle_get_tested_integrations(self, request: Request) -> web.Response
aggregated_text = ",".join(text_headers) + "\n" + aggregated_text
return web.Response(body=aggregated_text, content_type="text/plain", headers=req_headers)

async def handle_settings(self, request: Request) -> web.Response:
    """Change test agent settings on the fly.

    The request body must be a JSON object mapping setting names to their
    new values, e.g. ``{ "trace_request_delay": 5 }``. Only keys that are
    already present in ``request.app`` are accepted; the update is
    all-or-nothing, so a single unknown key rejects the whole request.

    Returns:
        202 Accepted when every key was applied.
        400 Bad Request when the body is not valid JSON.
        422 Unprocessable Entity when the body is not a JSON object or
        contains a key that is not present in ``request.app``.
    """
    raw_data = await request.read()
    try:
        data = json.loads(raw_data)
    except ValueError as e:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueError
        # subclasses; report them as a client error instead of a 500.
        return web.HTTPBadRequest(text=f"Invalid JSON body: {e}")

    if not isinstance(data, dict):
        # A list/number/string body would otherwise be iterated (or fail)
        # as if its elements were setting names.
        return web.HTTPUnprocessableEntity(text="Settings must be a JSON object of keys and values")

    # First pass to validate the data, so nothing is applied on failure
    for key in data:
        if key not in request.app:
            return web.HTTPUnprocessableEntity(text=f"Unknown key: '{key}'")

    # Second pass to apply the config
    # NOTE(review): values are stored as-is without type validation — confirm
    # that readers of these settings tolerate whatever JSON type is sent.
    for key, value in data.items():
        request.app[key] = value

    return web.HTTPAccepted()

async def handle_info(self, request: Request) -> web.Response:
return web.json_response(
{
Expand Down Expand Up @@ -1142,6 +1158,7 @@ def make_app(
web.get("/test/trace_check/clear", agent.clear_trace_check_failures),
web.get("/test/trace_check/summary", agent.get_trace_check_summary),
web.get("/test/integrations/tested_versions", agent.handle_get_tested_integrations),
web.post("/test/settings", agent.handle_settings),
]
)
checks = Checks(
Expand Down Expand Up @@ -1254,7 +1271,7 @@ def main(args: Optional[List[str]] = None) -> None:
"--trace-request-delay",
type=float,
default=os.environ.get("DD_TEST_STALL_REQUEST_SECONDS", 0.0),
help=("Will stall trace requests for specified amount of time"),
help=("Will stall trace and telemetry requests for specified amount of time"),
)
parser.add_argument(
"--suppress-trace-parse-errors",
Expand Down
18 changes: 18 additions & 0 deletions ddapm_test_agent/apmtelemetry.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,30 @@
import asyncio
import json
import logging
from typing import Any
from typing import Dict
from typing import cast

from aiohttp.web import Request


log = logging.getLogger(__name__)
TelemetryEvent = Dict[str, Any]


async def v2_decode_request(request: Request, data: bytes) -> TelemetryEvent:
    """Decode a v2 apm telemetry payload, optionally stalling beforehand.

    The stall duration, in seconds, is read from the
    ``X-Datadog-Test-Stall-Seconds`` request header when it is present and
    from ``request.app["trace_request_delay"]`` otherwise. When the duration
    is positive the coroutine sleeps for that long before decoding ``data``
    with ``v2_decode``.
    """
    stall_header = "X-Datadog-Test-Stall-Seconds"
    duration = (
        float(request.headers[stall_header])
        if stall_header in request.headers
        else request.app["trace_request_delay"]
    )

    if duration > 0:
        log.info("Stalling for %r seconds.", duration)
        await asyncio.sleep(duration)

    return v2_decode(data)


def v2_decode(data: bytes) -> TelemetryEvent:
"""Decode v2 apm telemetry request data as a dict"""
# TODO: Handle decoding into a telemetry payload object
Expand Down
7 changes: 7 additions & 0 deletions releasenotes/notes/Settings-endpoint-928ae5b6e8ddc625.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
features:
- |
Added a /test/settings endpoint to change the settings on the fly.
Copy link
Member

@Kyle-Verhoog Kyle-Verhoog Oct 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

little bit iffy about exposing the raw config values direct to users. This'll probably result in accidental breakages to downstream test suites that depend on config values that we might accidentally change in the test agent.

If we do wanna go this route then I'll want to ensure we have tests covering the config names we want to be stable. Otherwise can we hold off on adding this endpoint for now?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in faf6994

It takes a json encoded list of key/values, that will be applied to the app dict.
Example:
{ "trace_request_delay": 5 }
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
features:
- |
--trace-request-delay now also affects telemetry requests.
The delay can be set independently for each request by setting the X-Datadog-Test-Stall-Seconds header.
32 changes: 32 additions & 0 deletions tests/test_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -400,3 +400,35 @@ def test_uds(tmp_path, available_port):
pass
else:
raise Exception("Test agent failed to start")


async def test_post_known_settings(agent):
    """Posting a known setting is accepted (202) and updates the app value."""
    cases = [
        ('{ "trace_request_delay": 5 }', 5),
        ('{ "trace_request_delay": 0 }', 0),
    ]

    for payload, expected in cases:
        resp = await agent.post("/test/settings", data=payload)

        assert resp.status == 202, await resp.text()
        assert agent.app["trace_request_delay"] == expected


async def test_post_unknown_settings(agent):
    """Posting an unknown setting is rejected (422) and is not added to the app."""
    payload = '{ "dummy_setting": 5 }'
    resp = await agent.post("/test/settings", data=payload)

    assert resp.status == 422
    body = await resp.text()
    assert body == "Unknown key: 'dummy_setting'"
    assert "dummy_setting" not in agent.app
Loading