From 913affcca177a92c2d8573cf0888e51336f20851 Mon Sep 17 00:00:00 2001 From: "Gabriele N. Tornetta" Date: Thu, 30 Jan 2025 13:02:19 +0000 Subject: [PATCH] refactor(profiling): migrate to product interface We migrate the profiler product to the product interface for automatic lifecycle and telemetry management. --- ddtrace/bootstrap/preload.py | 9 ------ ddtrace/profiling/collector/threading.py | 19 ++++++------ ddtrace/profiling/product.py | 34 +++++++++++++++++++++ ddtrace/profiling/profiler.py | 30 ++---------------- pyproject.toml | 1 + tests/profiling/collector/test_threading.py | 5 ++- tests/profiling/gevent_fork.py | 2 +- tests/profiling/recorder_fork.py | 2 +- tests/profiling/simple_program_fork.py | 2 +- tests/profiling/test_main.py | 8 ++--- tests/profiling/test_uwsgi.py | 19 ++++++++++-- tests/profiling/uwsgi-app.py | 10 +++++- tests/profiling_v2/simple_program_fork.py | 2 +- tests/profiling_v2/test_main.py | 1 + 14 files changed, 84 insertions(+), 60 deletions(-) create mode 100644 ddtrace/profiling/product.py diff --git a/ddtrace/bootstrap/preload.py b/ddtrace/bootstrap/preload.py index 95dc8f4cd55..a2591db55ba 100644 --- a/ddtrace/bootstrap/preload.py +++ b/ddtrace/bootstrap/preload.py @@ -6,7 +6,6 @@ import os # noqa:I001 from ddtrace import config # noqa:F401 -from ddtrace.settings.profiling import config as profiling_config # noqa:F401 from ddtrace.internal.logger import get_logger # noqa:F401 from ddtrace.internal.module import ModuleWatchdog # noqa:F401 from ddtrace.internal.products import manager # noqa:F401 @@ -59,14 +58,6 @@ def register_post_preload(func: t.Callable) -> None: except Exception: log.error("failed to enable crashtracking", exc_info=True) - -if profiling_config.enabled: - log.debug("profiler enabled via environment variable") - try: - import ddtrace.profiling.auto # noqa: F401 - except Exception: - log.error("failed to enable profiling", exc_info=True) - if config._runtime_metrics_enabled: RuntimeWorker.enable() diff --git a/ddtrace/profiling/collector/threading.py b/ddtrace/profiling/collector/threading.py index 3700c145312..e74c8dfdd97 100644 --- a/ddtrace/profiling/collector/threading.py +++ b/ddtrace/profiling/collector/threading.py @@ -1,7 +1,5 @@ from __future__ import absolute_import -import threading -from threading import Thread import typing # noqa:F401 from ddtrace.internal.datadog.profiling import stack_v2 @@ -32,19 +30,22 @@ class ThreadingLockCollector(_lock.LockCollector): PROFILED_LOCK_CLASS = _ProfiledThreadingLock - def _get_patch_target(self): - # type: (...) -> typing.Any + def _get_patch_target(self) -> typing.Any: + import threading + return threading.Lock - def _set_patch_target( - self, value # type: typing.Any - ): - # type: (...) -> None + def _set_patch_target(self, value: typing.Any) -> None: + import threading + threading.Lock = value # type: ignore[misc] # Also patch threading.Thread so echion can track thread lifetimes -def init_stack_v2(): +def init_stack_v2() -> None: + import threading + from threading import Thread + if config.stack.v2_enabled and stack_v2.is_available: _thread_set_native_id = Thread._set_native_id _thread_bootstrap_inner = Thread._bootstrap_inner diff --git a/ddtrace/profiling/product.py b/ddtrace/profiling/product.py new file mode 100644 index 00000000000..f09a290eded --- /dev/null +++ b/ddtrace/profiling/product.py @@ -0,0 +1,34 @@ +from ddtrace.settings.profiling import config + + +def post_preload(): + pass + + +def start(): + if config.enabled: + import ddtrace.profiling.auto # noqa: F401 + + +def restart(join=False): + if config.enabled: + from ddtrace.profiling import bootstrap + + try: + bootstrap.profiler._restart_on_fork() + except AttributeError: + pass + + +def stop(join=False): + if config.enabled: + from ddtrace.profiling import bootstrap + + try: + bootstrap.profiler.stop() + except AttributeError: + pass + + +def at_exit(join=False): + stop(join=join) diff --git a/ddtrace/profiling/profiler.py b/ddtrace/profiling/profiler.py index 111c1624fd2..abfa5661d92 100644 --- a/ddtrace/profiling/profiler.py +++ b/ddtrace/profiling/profiler.py @@ -10,14 +10,9 @@ import ddtrace from ddtrace import config -from ddtrace.internal import atexit -from ddtrace.internal import forksafe from ddtrace.internal import service -from ddtrace.internal import uwsgi from ddtrace.internal.datadog.profiling import ddup from ddtrace.internal.module import ModuleWatchdog -from ddtrace.internal.telemetry import telemetry_writer -from ddtrace.internal.telemetry.constants import TELEMETRY_APM_PRODUCT from ddtrace.profiling import collector from ddtrace.profiling import exporter # noqa:F401 from ddtrace.profiling import recorder @@ -46,39 +41,18 @@ class Profiler(object): def __init__(self, *args, **kwargs): self._profiler = _ProfilerInstance(*args, **kwargs) - def start(self, stop_on_exit=True, profile_children=True): - """Start the profiler. - - :param stop_on_exit: Whether to stop the profiler and flush the profile on exit. - :param profile_children: Whether to start a profiler in child processes. - """ - - if profile_children: - try: - uwsgi.check_uwsgi(self._restart_on_fork, atexit=self.stop if stop_on_exit else None) - except uwsgi.uWSGIMasterProcess: - # Do nothing, the start() method will be called in each worker subprocess - return + def start(self): + """Start the profiler.""" self._profiler.start() - if stop_on_exit: - atexit.register(self.stop) - - if profile_children: - forksafe.register(self._restart_on_fork) - - telemetry_writer.product_activated(TELEMETRY_APM_PRODUCT.PROFILER, True) - def stop(self, flush=True): """Stop the profiler. :param flush: Flush last profile. """ - atexit.unregister(self.stop) try: self._profiler.stop(flush) - telemetry_writer.product_activated(TELEMETRY_APM_PRODUCT.PROFILER, False) except service.ServiceStatusError: # Not a best practice, but for backward API compatibility that allowed to call `stop` multiple times. pass diff --git a/pyproject.toml b/pyproject.toml index 0f2523d5fea..a995a153daf 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -63,6 +63,7 @@ ddtrace = "ddtrace.contrib.internal.pytest.plugin" "code-origin-for-spans" = "ddtrace.debugging._products.code_origin.span" "dynamic-instrumentation" = "ddtrace.debugging._products.dynamic_instrumentation" "exception-replay" = "ddtrace.debugging._products.exception_replay" +"profiling" = "ddtrace.profiling.product" "remote-configuration" = "ddtrace.internal.remoteconfig.product" "symbol-database" = "ddtrace.internal.symbol_db.product" "appsec" = "ddtrace.internal.appsec.product" diff --git a/tests/profiling/collector/test_threading.py b/tests/profiling/collector/test_threading.py index ae7e7204a68..c2b32c8cd3f 100644 --- a/tests/profiling/collector/test_threading.py +++ b/tests/profiling/collector/test_threading.py @@ -266,7 +266,10 @@ def test_lock_release_events(): @pytest.mark.skipif(not TESTING_GEVENT, reason="only works with gevent") -@pytest.mark.subprocess(ddtrace_run=True, env={"DD_PROFILING_FILE_PATH": __file__}) +@pytest.mark.subprocess( + ddtrace_run=True, + env={"DD_PROFILING_ENABLED": "1", "DD_PROFILING_LOCK_ENABLED": "1", "DD_PROFILING_FILE_PATH": __file__}, +) def test_lock_gevent_tasks(): from gevent import monkey # noqa:F401 diff --git a/tests/profiling/gevent_fork.py b/tests/profiling/gevent_fork.py index a648ca7f80c..86d681019dd 100644 --- a/tests/profiling/gevent_fork.py +++ b/tests/profiling/gevent_fork.py @@ -9,7 +9,7 @@ from ddtrace.profiling import profiler # noqa:E402,F401 -p = profiler.Profiler().start(profile_children=True) +p = profiler.Profiler().start() pid = os.fork() if pid == 0: diff --git a/tests/profiling/recorder_fork.py b/tests/profiling/recorder_fork.py index 6df24f9d0dd..7ed33736466 100644 --- a/tests/profiling/recorder_fork.py +++ b/tests/profiling/recorder_fork.py @@ -6,7 +6,7 @@ p = Profiler() -p.start(profile_children=False, stop_on_exit=False) +p.start() e = stack_event.StackSampleEvent() diff --git a/tests/profiling/simple_program_fork.py b/tests/profiling/simple_program_fork.py index 5671e0904b0..32f89b83dec 100644 --- a/tests/profiling/simple_program_fork.py +++ b/tests/profiling/simple_program_fork.py @@ -2,8 +2,8 @@ import sys import threading +import ddtrace.auto from ddtrace.internal import service -import ddtrace.profiling.auto import ddtrace.profiling.bootstrap from ddtrace.profiling.collector import stack_event from ddtrace.profiling.collector import threading as cthreading diff --git a/tests/profiling/test_main.py b/tests/profiling/test_main.py index 65d71b1c714..7b65cc7b1cb 100644 --- a/tests/profiling/test_main.py +++ b/tests/profiling/test_main.py @@ -38,10 +38,7 @@ def test_call_script_gevent(monkeypatch): def test_call_script_pprof_output(tmp_path, monkeypatch): - """This checks if the pprof output and atexit register work correctly. - - The script does not run for one minute, so if the `stop_on_exit` flag is broken, this test will fail. - """ + """This checks if the pprof output and atexit register work correctly.""" filename = str(tmp_path / "pprof") monkeypatch.setenv("DD_PROFILING_OUTPUT_PPROF", filename) monkeypatch.setenv("DD_PROFILING_CAPTURE_PCT", "1") @@ -61,13 +58,14 @@ def test_call_script_pprof_output(tmp_path, monkeypatch): @pytest.mark.skipif(sys.platform == "win32", reason="fork only available on Unix") def test_fork(tmp_path, monkeypatch): filename = str(tmp_path / "pprof") + monkeypatch.setenv("DD_PROFILING_ENABLED", "1") monkeypatch.setenv("DD_PROFILING_API_TIMEOUT", "0.1") monkeypatch.setenv("DD_PROFILING_OUTPUT_PPROF", filename) monkeypatch.setenv("DD_PROFILING_CAPTURE_PCT", "100") stdout, stderr, exitcode, pid = call_program( "python", os.path.join(os.path.dirname(__file__), "simple_program_fork.py") ) - assert exitcode == 0 + assert exitcode == 0, stderr child_pid = stdout.decode().strip() utils.check_pprof_file(filename + "." + str(pid) + ".1") utils.check_pprof_file(filename + "." + str(child_pid) + ".1") diff --git a/tests/profiling/test_uwsgi.py b/tests/profiling/test_uwsgi.py index d19a4cf4a1a..33a24f6bbfc 100644 --- a/tests/profiling/test_uwsgi.py +++ b/tests/profiling/test_uwsgi.py @@ -42,8 +42,11 @@ def uwsgi(monkeypatch): def test_uwsgi_threads_disabled(uwsgi): proc = uwsgi() - stdout, _ = proc.communicate() - assert proc.wait() != 0 + try: + stdout, _ = proc.communicate(timeout=1) + except TimeoutExpired: + proc.terminate() + stdout, _ = proc.communicate() assert THREADS_MSG in stdout @@ -59,6 +62,7 @@ def test_uwsgi_threads_number_set(uwsgi): def test_uwsgi_threads_enabled(uwsgi, tmp_path, monkeypatch): filename = str(tmp_path / "uwsgi.pprof") + monkeypatch.setenv("DD_PROFILING_ENABLED", "1") monkeypatch.setenv("DD_PROFILING_OUTPUT_PPROF", filename) proc = uwsgi("--enable-threads") worker_pids = _get_worker_pids(proc.stdout, 1) @@ -71,8 +75,14 @@ def test_uwsgi_threads_enabled(uwsgi, tmp_path, monkeypatch): def test_uwsgi_threads_processes_no_master(uwsgi, monkeypatch): + monkeypatch.setenv("DD_PROFILING_ENABLED", "1") + monkeypatch.setenv("STOP_AFTER_LOAD", "1") proc = uwsgi("--enable-threads", "--processes", "2") - stdout, _ = proc.communicate() + try: + stdout, _ = proc.communicate(timeout=1) + except TimeoutExpired: + proc.terminate() + stdout, _ = proc.communicate() assert ( b"ddtrace.internal.uwsgi.uWSGIConfigError: master option must be enabled when multiple processes are used" in stdout @@ -101,6 +111,7 @@ def _get_worker_pids(stdout, num_worker, num_app_started=1): def test_uwsgi_threads_processes_master(uwsgi, tmp_path, monkeypatch): filename = str(tmp_path / "uwsgi.pprof") + monkeypatch.setenv("DD_PROFILING_ENABLED", "1") monkeypatch.setenv("DD_PROFILING_OUTPUT_PPROF", filename) proc = uwsgi("--enable-threads", "--master", "--py-call-uwsgi-fork-hooks", "--processes", "2") worker_pids = _get_worker_pids(proc.stdout, 2) @@ -114,6 +125,7 @@ def test_uwsgi_threads_processes_master(uwsgi, tmp_path, monkeypatch): def test_uwsgi_threads_processes_master_lazy_apps(uwsgi, tmp_path, monkeypatch): filename = str(tmp_path / "uwsgi.pprof") + monkeypatch.setenv("DD_PROFILING_ENABLED", "1") monkeypatch.setenv("DD_PROFILING_OUTPUT_PPROF", filename) proc = uwsgi("--enable-threads", "--master", "--processes", "2", "--lazy-apps") worker_pids = _get_worker_pids(proc.stdout, 2, 2) @@ -127,6 +139,7 @@ def test_uwsgi_threads_processes_master_lazy_apps(uwsgi, tmp_path, monkeypatch): def test_uwsgi_threads_processes_no_master_lazy_apps(uwsgi, tmp_path, monkeypatch): filename = str(tmp_path / "uwsgi.pprof") + monkeypatch.setenv("DD_PROFILING_ENABLED", "1") monkeypatch.setenv("DD_PROFILING_OUTPUT_PPROF", filename) proc = uwsgi("--enable-threads", "--processes", "2", "--lazy-apps") worker_pids = _get_worker_pids(proc.stdout, 2, 2) diff --git a/tests/profiling/uwsgi-app.py b/tests/profiling/uwsgi-app.py index 6d875c0aca3..7519060c82d 100644 --- a/tests/profiling/uwsgi-app.py +++ b/tests/profiling/uwsgi-app.py @@ -1,5 +1,13 @@ -import ddtrace.profiling.auto # noqa:F401 +import os + +import ddtrace.auto # noqa:F401 def application(): pass + + +if os.getenv("STOP_AFTER_LOAD"): + import sys + + sys.exit(0) diff --git a/tests/profiling_v2/simple_program_fork.py b/tests/profiling_v2/simple_program_fork.py index ad8c0541ccd..947f8cb2fbe 100644 --- a/tests/profiling_v2/simple_program_fork.py +++ b/tests/profiling_v2/simple_program_fork.py @@ -2,8 +2,8 @@ import sys import threading +import ddtrace.auto from ddtrace.internal import service -import ddtrace.profiling.auto import ddtrace.profiling.bootstrap import ddtrace.profiling.profiler diff --git a/tests/profiling_v2/test_main.py b/tests/profiling_v2/test_main.py index 132fc8aa502..bf3607432c7 100644 --- a/tests/profiling_v2/test_main.py +++ b/tests/profiling_v2/test_main.py @@ -79,6 +79,7 @@ def test_fork(tmp_path): filename = str(tmp_path / "pprof") env = os.environ.copy() + env["DD_PROFILING_ENABLED"] = "1" env["DD_PROFILING_OUTPUT_PPROF"] = filename env["DD_PROFILING_CAPTURE_PCT"] = "100" stdout, stderr, exitcode, pid = call_program(