Skip to content

Commit

Permalink
refactor(profiling): migrate to product interface
Browse files Browse the repository at this point in the history
We migrate the profiler product to the product interface for automatic
lifecycle and telemetry management.
  • Loading branch information
P403n1x87 committed Feb 4, 2025
1 parent af9098c commit 913affc
Show file tree
Hide file tree
Showing 14 changed files with 84 additions and 60 deletions.
9 changes: 0 additions & 9 deletions ddtrace/bootstrap/preload.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import os # noqa:I001

from ddtrace import config # noqa:F401
from ddtrace.settings.profiling import config as profiling_config # noqa:F401
from ddtrace.internal.logger import get_logger # noqa:F401
from ddtrace.internal.module import ModuleWatchdog # noqa:F401
from ddtrace.internal.products import manager # noqa:F401
Expand Down Expand Up @@ -59,14 +58,6 @@ def register_post_preload(func: t.Callable) -> None:
except Exception:
log.error("failed to enable crashtracking", exc_info=True)


if profiling_config.enabled:
log.debug("profiler enabled via environment variable")
try:
import ddtrace.profiling.auto # noqa: F401
except Exception:
log.error("failed to enable profiling", exc_info=True)

if config._runtime_metrics_enabled:
RuntimeWorker.enable()

Expand Down
19 changes: 10 additions & 9 deletions ddtrace/profiling/collector/threading.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
from __future__ import absolute_import

import threading
from threading import Thread
import typing # noqa:F401

from ddtrace.internal.datadog.profiling import stack_v2
Expand Down Expand Up @@ -32,19 +30,22 @@ class ThreadingLockCollector(_lock.LockCollector):

PROFILED_LOCK_CLASS = _ProfiledThreadingLock

def _get_patch_target(self):
# type: (...) -> typing.Any
def _get_patch_target(self) -> typing.Any:
import threading

return threading.Lock

def _set_patch_target(
self, value # type: typing.Any
):
# type: (...) -> None
def _set_patch_target(self, value: typing.Any) -> None:
import threading

threading.Lock = value # type: ignore[misc]


# Also patch threading.Thread so echion can track thread lifetimes
def init_stack_v2():
def init_stack_v2() -> None:
import threading
from threading import Thread

if config.stack.v2_enabled and stack_v2.is_available:
_thread_set_native_id = Thread._set_native_id
_thread_bootstrap_inner = Thread._bootstrap_inner
Expand Down
34 changes: 34 additions & 0 deletions ddtrace/profiling/product.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from ddtrace.settings.profiling import config


def post_preload():
pass


def start():
if config.enabled:
import ddtrace.profiling.auto # noqa: F401


def restart(join=False):
if config.enabled:
from ddtrace.profiling import bootstrap

try:
bootstrap.profiler._restart_on_fork()
except AttributeError:
pass


def stop(join=False):
if config.enabled:
from ddtrace.profiling import bootstrap

try:
bootstrap.profiler.stop()
except AttributeError:
pass


def at_exit(join=False):
stop(join=join)
30 changes: 2 additions & 28 deletions ddtrace/profiling/profiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,9 @@

import ddtrace
from ddtrace import config
from ddtrace.internal import atexit
from ddtrace.internal import forksafe
from ddtrace.internal import service
from ddtrace.internal import uwsgi
from ddtrace.internal.datadog.profiling import ddup
from ddtrace.internal.module import ModuleWatchdog
from ddtrace.internal.telemetry import telemetry_writer
from ddtrace.internal.telemetry.constants import TELEMETRY_APM_PRODUCT
from ddtrace.profiling import collector
from ddtrace.profiling import exporter # noqa:F401
from ddtrace.profiling import recorder
Expand Down Expand Up @@ -46,39 +41,18 @@ class Profiler(object):
def __init__(self, *args, **kwargs):
self._profiler = _ProfilerInstance(*args, **kwargs)

def start(self, stop_on_exit=True, profile_children=True):
"""Start the profiler.
:param stop_on_exit: Whether to stop the profiler and flush the profile on exit.
:param profile_children: Whether to start a profiler in child processes.
"""

if profile_children:
try:
uwsgi.check_uwsgi(self._restart_on_fork, atexit=self.stop if stop_on_exit else None)
except uwsgi.uWSGIMasterProcess:
# Do nothing, the start() method will be called in each worker subprocess
return
def start(self):
"""Start the profiler."""

self._profiler.start()

if stop_on_exit:
atexit.register(self.stop)

if profile_children:
forksafe.register(self._restart_on_fork)

telemetry_writer.product_activated(TELEMETRY_APM_PRODUCT.PROFILER, True)

def stop(self, flush=True):
"""Stop the profiler.
:param flush: Flush last profile.
"""
atexit.unregister(self.stop)
try:
self._profiler.stop(flush)
telemetry_writer.product_activated(TELEMETRY_APM_PRODUCT.PROFILER, False)
except service.ServiceStatusError:
# Not a best practice, but for backward API compatibility that allowed to call `stop` multiple times.
pass
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ ddtrace = "ddtrace.contrib.internal.pytest.plugin"
"code-origin-for-spans" = "ddtrace.debugging._products.code_origin.span"
"dynamic-instrumentation" = "ddtrace.debugging._products.dynamic_instrumentation"
"exception-replay" = "ddtrace.debugging._products.exception_replay"
"profiling" = "ddtrace.profiling.product"
"remote-configuration" = "ddtrace.internal.remoteconfig.product"
"symbol-database" = "ddtrace.internal.symbol_db.product"
"appsec" = "ddtrace.internal.appsec.product"
Expand Down
5 changes: 4 additions & 1 deletion tests/profiling/collector/test_threading.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,10 @@ def test_lock_release_events():


@pytest.mark.skipif(not TESTING_GEVENT, reason="only works with gevent")
@pytest.mark.subprocess(ddtrace_run=True, env={"DD_PROFILING_FILE_PATH": __file__})
@pytest.mark.subprocess(
ddtrace_run=True,
env={"DD_PROFILING_ENABLED": "1", "DD_PROFILING_LOCK_ENABLED": "1", "DD_PROFILING_FILE_PATH": __file__},
)
def test_lock_gevent_tasks():
from gevent import monkey # noqa:F401

Expand Down
2 changes: 1 addition & 1 deletion tests/profiling/gevent_fork.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from ddtrace.profiling import profiler # noqa:E402,F401


p = profiler.Profiler().start(profile_children=True)
p = profiler.Profiler().start()

pid = os.fork()
if pid == 0:
Expand Down
2 changes: 1 addition & 1 deletion tests/profiling/recorder_fork.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@


p = Profiler()
p.start(profile_children=False, stop_on_exit=False)
p.start()


e = stack_event.StackSampleEvent()
Expand Down
2 changes: 1 addition & 1 deletion tests/profiling/simple_program_fork.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
import sys
import threading

import ddtrace.auto
from ddtrace.internal import service
import ddtrace.profiling.auto
import ddtrace.profiling.bootstrap
from ddtrace.profiling.collector import stack_event
from ddtrace.profiling.collector import threading as cthreading
Expand Down
8 changes: 3 additions & 5 deletions tests/profiling/test_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,7 @@ def test_call_script_gevent(monkeypatch):


def test_call_script_pprof_output(tmp_path, monkeypatch):
"""This checks if the pprof output and atexit register work correctly.
The script does not run for one minute, so if the `stop_on_exit` flag is broken, this test will fail.
"""
"""This checks if the pprof output and atexit register work correctly."""
filename = str(tmp_path / "pprof")
monkeypatch.setenv("DD_PROFILING_OUTPUT_PPROF", filename)
monkeypatch.setenv("DD_PROFILING_CAPTURE_PCT", "1")
Expand All @@ -61,13 +58,14 @@ def test_call_script_pprof_output(tmp_path, monkeypatch):
@pytest.mark.skipif(sys.platform == "win32", reason="fork only available on Unix")
def test_fork(tmp_path, monkeypatch):
filename = str(tmp_path / "pprof")
monkeypatch.setenv("DD_PROFILING_ENABLED", "1")
monkeypatch.setenv("DD_PROFILING_API_TIMEOUT", "0.1")
monkeypatch.setenv("DD_PROFILING_OUTPUT_PPROF", filename)
monkeypatch.setenv("DD_PROFILING_CAPTURE_PCT", "100")
stdout, stderr, exitcode, pid = call_program(
"python", os.path.join(os.path.dirname(__file__), "simple_program_fork.py")
)
assert exitcode == 0
assert exitcode == 0, stderr
child_pid = stdout.decode().strip()
utils.check_pprof_file(filename + "." + str(pid) + ".1")
utils.check_pprof_file(filename + "." + str(child_pid) + ".1")
Expand Down
19 changes: 16 additions & 3 deletions tests/profiling/test_uwsgi.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,11 @@ def uwsgi(monkeypatch):

def test_uwsgi_threads_disabled(uwsgi):
proc = uwsgi()
stdout, _ = proc.communicate()
assert proc.wait() != 0
try:
stdout, _ = proc.communicate(timeout=1)
except TimeoutExpired:
proc.terminate()
stdout, _ = proc.communicate()
assert THREADS_MSG in stdout


Expand All @@ -59,6 +62,7 @@ def test_uwsgi_threads_number_set(uwsgi):

def test_uwsgi_threads_enabled(uwsgi, tmp_path, monkeypatch):
filename = str(tmp_path / "uwsgi.pprof")
monkeypatch.setenv("DD_PROFILING_ENABLED", "1")
monkeypatch.setenv("DD_PROFILING_OUTPUT_PPROF", filename)
proc = uwsgi("--enable-threads")
worker_pids = _get_worker_pids(proc.stdout, 1)
Expand All @@ -71,8 +75,14 @@ def test_uwsgi_threads_enabled(uwsgi, tmp_path, monkeypatch):


def test_uwsgi_threads_processes_no_master(uwsgi, monkeypatch):
monkeypatch.setenv("DD_PROFILING_ENABLED", "1")
monkeypatch.setenv("STOP_AFTER_LOAD", "1")
proc = uwsgi("--enable-threads", "--processes", "2")
stdout, _ = proc.communicate()
try:
stdout, _ = proc.communicate(timeout=1)
except TimeoutExpired:
proc.terminate()
stdout, _ = proc.communicate()
assert (
b"ddtrace.internal.uwsgi.uWSGIConfigError: master option must be enabled when multiple processes are used"
in stdout
Expand Down Expand Up @@ -101,6 +111,7 @@ def _get_worker_pids(stdout, num_worker, num_app_started=1):

def test_uwsgi_threads_processes_master(uwsgi, tmp_path, monkeypatch):
filename = str(tmp_path / "uwsgi.pprof")
monkeypatch.setenv("DD_PROFILING_ENABLED", "1")
monkeypatch.setenv("DD_PROFILING_OUTPUT_PPROF", filename)
proc = uwsgi("--enable-threads", "--master", "--py-call-uwsgi-fork-hooks", "--processes", "2")
worker_pids = _get_worker_pids(proc.stdout, 2)
Expand All @@ -114,6 +125,7 @@ def test_uwsgi_threads_processes_master(uwsgi, tmp_path, monkeypatch):

def test_uwsgi_threads_processes_master_lazy_apps(uwsgi, tmp_path, monkeypatch):
filename = str(tmp_path / "uwsgi.pprof")
monkeypatch.setenv("DD_PROFILING_ENABLED", "1")
monkeypatch.setenv("DD_PROFILING_OUTPUT_PPROF", filename)
proc = uwsgi("--enable-threads", "--master", "--processes", "2", "--lazy-apps")
worker_pids = _get_worker_pids(proc.stdout, 2, 2)
Expand All @@ -127,6 +139,7 @@ def test_uwsgi_threads_processes_master_lazy_apps(uwsgi, tmp_path, monkeypatch):

def test_uwsgi_threads_processes_no_master_lazy_apps(uwsgi, tmp_path, monkeypatch):
filename = str(tmp_path / "uwsgi.pprof")
monkeypatch.setenv("DD_PROFILING_ENABLED", "1")
monkeypatch.setenv("DD_PROFILING_OUTPUT_PPROF", filename)
proc = uwsgi("--enable-threads", "--processes", "2", "--lazy-apps")
worker_pids = _get_worker_pids(proc.stdout, 2, 2)
Expand Down
10 changes: 9 additions & 1 deletion tests/profiling/uwsgi-app.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
import ddtrace.profiling.auto # noqa:F401
import os

import ddtrace.auto # noqa:F401


def application():
pass


if os.getenv("STOP_AFTER_LOAD"):
import sys

sys.exit(0)
2 changes: 1 addition & 1 deletion tests/profiling_v2/simple_program_fork.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
import sys
import threading

import ddtrace.auto
from ddtrace.internal import service
import ddtrace.profiling.auto
import ddtrace.profiling.bootstrap
import ddtrace.profiling.profiler

Expand Down
1 change: 1 addition & 0 deletions tests/profiling_v2/test_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ def test_fork(tmp_path):

filename = str(tmp_path / "pprof")
env = os.environ.copy()
env["DD_PROFILING_ENABLED"] = "1"
env["DD_PROFILING_OUTPUT_PPROF"] = filename
env["DD_PROFILING_CAPTURE_PCT"] = "100"
stdout, stderr, exitcode, pid = call_program(
Expand Down

0 comments on commit 913affc

Please sign in to comment.