Skip to content

Commit

Permalink
chore(telemetry): fix flaky tests part 2 (#11052)
Browse files Browse the repository at this point in the history
Improves the reliability of instrumentation telemetry tests by:

- Providing a mechanism to filter out telemetry events with unexpected
runtime IDs in TelemetryTestSession.get_events(..): passing
`subprocess=True` drops events whose runtime ID matches the current
process. This enhances control over which telemetry events are captured
in tests.
- Fixing the flaky `test_update_dependencies_event_when_disabled` by
ensuring DD_TELEMETRY_DEPENDENCY_COLLECTION_ENABLED=False is set.
Previously, this test was run with default environment variables, which
introduced flakiness.
- Fixing the flaky `test_unhandled_integration_error` by avoiding
assertions on the runtime ID of captured telemetry events. This
assertion was a significant source of flakiness and was the main reason
the subprocess parameter was added to
TelemetryTestSession.get_events(..).


Although slightly unrelated, this PR also refactors
`test_app_started_event_configuration_override` to use
_DD_INSTRUMENTATION_TELEMETRY_TESTS_FORCE_APP_STARTED instead of
manually starting the telemetry writer.
- Note: Importing from ddtrace.internal.telemetry previously created the
symbol_db and dynamic_instrumentation objects as a side effect. Since we
have removed this side effect, we must create these configurations in
the test.
- I can move this to another PR, but it is a minor test change and
splitting it out does not seem worth the effort.


## Checklist
- [x] PR author has checked that all the criteria below are met
- The PR description includes an overview of the change
- The PR description articulates the motivation for the change
- The change includes tests OR the PR description describes a testing
strategy
- The PR description notes risks associated with the change, if any
- Newly-added code is easy to change
- The change follows the [library release note
guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html)
- The change includes or references documentation updates if necessary
- Backport labels are set (if
[applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting))

## Reviewer Checklist
- [ ] Reviewer has checked that all the criteria below are met 
- Title is accurate
- All changes are related to the pull request's stated goal
- Avoids breaking
[API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces)
changes
- Testing strategy adequately addresses listed risks
- Newly-added code is easy to change
- Release note makes sense to a user of the library
- If necessary, author has acknowledged and discussed the performance
implications of this PR as reported in the benchmarks PR comment
- Backport labels are set in a manner that is consistent with the
[release branch maintenance
policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)
  • Loading branch information
mabdinur authored Oct 22, 2024
1 parent f5314ed commit a046898
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 41 deletions.
7 changes: 6 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from ddtrace.internal.compat import parse
from ddtrace.internal.remoteconfig.client import RemoteConfigClient
from ddtrace.internal.remoteconfig.worker import remoteconfig_poller
from ddtrace.internal.runtime import get_runtime_id
from ddtrace.internal.service import ServiceStatus
from ddtrace.internal.service import ServiceStatusError
from ddtrace.internal.telemetry import TelemetryWriter
Expand Down Expand Up @@ -550,12 +551,16 @@ def get_requests(self, request_type=None, filter_heartbeats=True):

return sorted(requests, key=lambda r: r["body"]["seq_id"], reverse=True)

def get_events(self, event_type=None, filter_heartbeats=True):
def get_events(self, event_type=None, filter_heartbeats=True, subprocess=False):
"""Get a list of the event payloads sent to the test agent
Results are in reverse order by ``seq_id``
"""
requests = self.get_requests(event_type, filter_heartbeats)
if subprocess:
# Use get_runtime_id to filter telemetry events generated in the current process
runtime_id = get_runtime_id()
requests = [req for req in requests if req["body"]["runtime_id"] != runtime_id]
return [req["body"] for req in requests]

def get_metrics(self, name=None):
Expand Down
10 changes: 5 additions & 5 deletions tests/integration/test_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def test_setting_origin_environment(test_agent_session, run_python_code_in_subpr
)
assert status == 0, err

events = test_agent_session.get_events()
events = test_agent_session.get_events(subprocess=True)
events_trace_sample_rate = _get_telemetry_config_items(events, "DD_TRACE_SAMPLE_RATE")

assert {
Expand Down Expand Up @@ -95,7 +95,7 @@ def test_setting_origin_code(test_agent_session, run_python_code_in_subprocess):
)
assert status == 0, err

events = test_agent_session.get_events()
events = test_agent_session.get_events(subprocess=True)
events_trace_sample_rate = _get_telemetry_config_items(events, "DD_TRACE_SAMPLE_RATE")
assert {
"name": "DD_TRACE_SAMPLE_RATE",
Expand Down Expand Up @@ -173,7 +173,7 @@ def test_remoteconfig_sampling_rate_default(test_agent_session, run_python_code_
)
assert status == 0, err

events = test_agent_session.get_events()
events = test_agent_session.get_events(subprocess=True)
events_trace_sample_rate = _get_telemetry_config_items(events, "DD_TRACE_SAMPLE_RATE")
assert {"name": "DD_TRACE_SAMPLE_RATE", "value": 1.0, "origin": "default"} in events_trace_sample_rate

Expand All @@ -200,7 +200,7 @@ def test_remoteconfig_sampling_rate_telemetry(test_agent_session, run_python_cod
)
assert status == 0, err

events = test_agent_session.get_events()
events = test_agent_session.get_events(subprocess=True)
events_trace_sample_rate = _get_telemetry_config_items(events, "DD_TRACE_SAMPLE_RATE")
assert {"name": "DD_TRACE_SAMPLE_RATE", "value": 0.5, "origin": "remote_config"} in events_trace_sample_rate

Expand Down Expand Up @@ -237,7 +237,7 @@ def test_remoteconfig_header_tags_telemetry(test_agent_session, run_python_code_
)
assert status == 0, err

events = test_agent_session.get_events()
events = test_agent_session.get_events(subprocess=True)
events_trace_header_tags = _get_telemetry_config_items(events, "DD_TRACE_HEADER_TAGS")
assert {
"name": "DD_TRACE_HEADER_TAGS",
Expand Down
4 changes: 2 additions & 2 deletions tests/runtime/test_runtime_metrics_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def find_telemetry_event(events, request_type):
_, stderr, status, _ = run_python_code_in_subprocess(code)
assert status == 0, stderr

events = test_agent_session.get_events()
events = test_agent_session.get_events(subprocess=True)
# app-started, app-closing, app-client-configuration-change, app-dependencies-loaded
assert len(events) == 4

Expand Down Expand Up @@ -127,7 +127,7 @@ def find_telemetry_event(events, request_type):
_, stderr, status, _ = ddtrace_run_python_code_in_subprocess(code, env=env)
assert status == 0, stderr

events = test_agent_session.get_events()
events = test_agent_session.get_events(subprocess=True)
# app-started, app-closing, app-client-configuration-change, app-integrations-change, app-dependencies-loaded
assert len(events) == 5

Expand Down
30 changes: 10 additions & 20 deletions tests/telemetry/test_telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,12 @@ def test_enable_fork(test_agent_session, run_python_code_in_subprocess):
runtime_id = stdout.strip().decode("utf-8")

# Validate that one app-closing event was sent and it was queued in the parent process
app_closing = test_agent_session.get_events("app-closing")
app_closing = test_agent_session.get_events("app-closing", subprocess=True)
assert len(app_closing) == 1
assert app_closing[0]["runtime_id"] == runtime_id

# Validate that one app-started event was sent and it was queued in the parent process
app_started = test_agent_session.get_events("app-started")
app_started = test_agent_session.get_events("app-started", subprocess=True)
assert len(app_started) == 1
assert app_started[0]["runtime_id"] == runtime_id

Expand Down Expand Up @@ -97,7 +97,7 @@ def test_enable_fork_heartbeat(test_agent_session, run_python_code_in_subprocess
runtime_id = stdout.strip().decode("utf-8")

# Allow test agent session to capture all heartbeat events
app_heartbeats = test_agent_session.get_events("app-heartbeat", filter_heartbeats=False)
app_heartbeats = test_agent_session.get_events("app-heartbeat", filter_heartbeats=False, subprocess=True)
assert len(app_heartbeats) > 0
for hb in app_heartbeats:
assert hb["runtime_id"] == runtime_id
Expand Down Expand Up @@ -177,7 +177,7 @@ def process_trace(self, trace):
assert status == 0, stderr
assert b"Exception raised in trace filter" in stderr

events = test_agent_session.get_events("app-started")
events = test_agent_session.get_events("app-started", subprocess=True)

assert len(events) == 1

Expand Down Expand Up @@ -221,7 +221,7 @@ def pre_ddtrace_exc_hook(exctype, value, traceback):
# Regression test for invalid number of arguments in wrapped exception hook
assert b"3 positional arguments but 4 were given" not in stderr

app_starteds = test_agent_session.get_events("app-started")
app_starteds = test_agent_session.get_events("app-started", subprocess=True)
assert len(app_starteds) == 1
# app-started captures unhandled exceptions raised in application code
assert app_starteds[0]["payload"]["error"]["code"] == 1
Expand Down Expand Up @@ -252,7 +252,7 @@ def test_handled_integration_error(test_agent_session, run_python_code_in_subpro
expected_stderr = b"failed to import"
assert expected_stderr in stderr

integrations_events = test_agent_session.get_events("app-integrations-change")
integrations_events = test_agent_session.get_events("app-integrations-change", subprocess=True)
assert len(integrations_events) == 1
assert (
integrations_events[0]["payload"]["integrations"][0]["error"]
Expand All @@ -270,9 +270,6 @@ def test_handled_integration_error(test_agent_session, run_python_code_in_subpro

def test_unhandled_integration_error(test_agent_session, ddtrace_run_python_code_in_subprocess):
code = """
import logging
logging.basicConfig()
import flask
f = flask.Flask("hi")
Expand All @@ -286,20 +283,13 @@ def test_unhandled_integration_error(test_agent_session, ddtrace_run_python_code

assert b"not enough values to unpack (expected 2, got 0)" in stderr, stderr

events = test_agent_session.get_events()
assert len(events) > 0
# Same runtime id is used
first_runtimeid = events[0]["runtime_id"]
for event in events:
assert event["runtime_id"] == first_runtimeid

app_started_event = [event for event in events if event["request_type"] == "app-started"]
app_started_event = test_agent_session.get_events("app-started", subprocess=True)
assert len(app_started_event) == 1
assert app_started_event[0]["payload"]["error"]["code"] == 1
assert "ddtrace/contrib/internal/flask/patch.py" in app_started_event[0]["payload"]["error"]["message"]
assert "not enough values to unpack (expected 2, got 0)" in app_started_event[0]["payload"]["error"]["message"]

integration_events = [event for event in events if event["request_type"] == "app-integrations-change"]
integration_events = test_agent_session.get_events("app-integrations-change", subprocess=True)
integrations = integration_events[0]["payload"]["integrations"]

(flask_integration,) = [integration for integration in integrations if integration["name"] == "flask"]
Expand Down Expand Up @@ -332,7 +322,7 @@ def test_app_started_with_install_metrics(test_agent_session, run_python_code_in
_, stderr, status, _ = run_python_code_in_subprocess("import ddtrace", env=env)
assert status == 0, stderr

app_started_event = test_agent_session.get_events("app-started")
app_started_event = test_agent_session.get_events("app-started", subprocess=True)
assert len(app_started_event) == 1
assert app_started_event[0]["payload"]["install_signature"] == {
"install_id": "68e75c48-57ca-4a12-adfc-575c4b05fcbe",
Expand All @@ -355,7 +345,7 @@ def test_instrumentation_telemetry_disabled(test_agent_session, run_python_code_
"""
_, stderr, status, _ = run_python_code_in_subprocess(code, env=env)

events = test_agent_session.get_events()
events = test_agent_session.get_events(subprocess=True)
assert len(events) == 0

assert status == 0, stderr
Expand Down
25 changes: 12 additions & 13 deletions tests/telemetry/test_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,16 +204,12 @@ def test_app_started_event_configuration_override(test_agent_session, run_python
which is then sent by periodic()
"""
code = """
import logging
logging.basicConfig()
# most configurations are reported when ddtrace.auto is imported
import ddtrace.auto
# By default telemetry collection is enabled after 10 seconds, so we either need to
# sleep for 10 seconds or manually call _app_started() to generate the app started event.
# This delay allows us to collect start up errors and dynamic configurations
import ddtrace
ddtrace.internal.telemetry.telemetry_writer._app_started()
# report configurations not used by ddtrace.auto
import ddtrace.settings.symbol_db
import ddtrace.settings.dynamic_instrumentation
import ddtrace.settings.exception_replay
"""

env = os.environ.copy()
Expand Down Expand Up @@ -271,6 +267,10 @@ def test_app_started_event_configuration_override(test_agent_session, run_python
env["DD_TRACE_PARTIAL_FLUSH_ENABLED"] = "false"
env["DD_TRACE_PARTIAL_FLUSH_MIN_SPANS"] = "3"
env["DD_SITE"] = "datadoghq.com"
# By default telemetry collection is enabled after 10 seconds, so we either need to
# sleep for 10 seconds or manually call _app_started() to generate the app started event.
# This delay allows us to collect start up errors and dynamic configurations
env["_DD_INSTRUMENTATION_TELEMETRY_TESTS_FORCE_APP_STARTED"] = "true"

_, stderr, status, _ = run_python_code_in_subprocess(code, env=env)
assert status == 0, stderr
Expand Down Expand Up @@ -398,7 +398,6 @@ def test_app_started_event_configuration_override(test_agent_session, run_python
{"name": "DD_PROFILING__FORCE_LEGACY_EXPORTER", "origin": "env_var", "value": True},
{"name": "DD_REMOTE_CONFIGURATION_ENABLED", "origin": "env_var", "value": True},
{"name": "DD_REMOTE_CONFIG_POLL_INTERVAL_SECONDS", "origin": "env_var", "value": 1.0},
{"name": "DD_RUNTIME_METRICS_ENABLED", "origin": "unknown", "value": True},
{"name": "DD_RUNTIME_METRICS_ENABLED", "origin": "unknown", "value": False},
{"name": "DD_SERVICE", "origin": "default", "value": "unnamed-python-service"},
{"name": "DD_SERVICE_MAPPING", "origin": "env_var", "value": "default_dd_service:remapped_dd_service"},
Expand Down Expand Up @@ -495,8 +494,8 @@ def test_update_dependencies_event_when_disabled(test_agent_session, ddtrace_run

# Import httppretty after ddtrace is imported, this ensures that the module is sent in a dependencies event
# Imports httpretty twice and ensures only one dependency entry is sent
_, stderr, status, _ = ddtrace_run_python_code_in_subprocess("import xmltodict")
events = test_agent_session.get_events("app-dependencies-loaded")
_, stderr, status, _ = ddtrace_run_python_code_in_subprocess("import xmltodict", env=env)
events = test_agent_session.get_events("app-dependencies-loaded", subprocess=True)
assert len(events) == 0, events


Expand Down Expand Up @@ -647,7 +646,7 @@ def test_app_heartbeat_event_periodic(mock_time, telemetry_writer, test_agent_se
# Assert next flush contains app-heartbeat event
for _ in range(telemetry_writer._periodic_threshold):
telemetry_writer.periodic()
assert test_agent_session.get_events("app-heartbeat") == []
assert test_agent_session.get_events("app-heartbeat", filter_heartbeats=False) == []

telemetry_writer.periodic()
heartbeat_events = test_agent_session.get_events("app-heartbeat", filter_heartbeats=False)
Expand Down

0 comments on commit a046898

Please sign in to comment.