fix(profiler): update memalloc guard [backport 2.19] (#11878)
Backport 983c84f from #11460 to 2.19.

Previously, the memory allocation profiler would use Python's builtin
thread-local storage interfaces in order to set and get the state of a
thread-local guard.

I've updated a few things here.

* I think get/set idioms are slightly problematic for this type of code,
since they push the responsibility of maintaining clean internal state
up to the parent. A consequence of this is that propagating the
underlying state _by value_ opens the door to race conditions if
execution changes between contexts (unlikely here, but I think
minimizing indirection is still cleaner). Accordingly, I've updated this
to use native thread-local storage.
* Based on @nsrip-dd's observation, I widened the guard over `free()`
operations. I believe this is correct, and if it isn't then the
detriment is performance, not correctness.
* I got rid of the PY37 fallbacks (the `_PY37_AND_LATER` conditionals).
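
To illustrate the first bullet, here is a minimal sketch of a re-entrancy guard built on native thread-local storage. It is not the code from this patch: the `sketch_*` names and the `SKETCH_TLS` macro are hypothetical, and the actual `memalloc_take_guard()` / `memalloc_yield_guard()` implementations live in a header that is not shown here.

```c
#include <assert.h>
#include <stdbool.h>

/* Thread-local storage qualifier (hypothetical macro name). */
#if defined(_MSC_VER)
#define SKETCH_TLS __declspec(thread)
#elif defined(__GNUC__) || defined(__clang__)
#define SKETCH_TLS __thread
#else
#error Unsupported compiler
#endif

/* One flag per thread: true while this thread is inside profiler code. */
static SKETCH_TLS bool sketch_on_thread = false;

/* Try to enter the guarded region; fails if this thread is already inside. */
static inline bool
sketch_take_guard(void)
{
    if (sketch_on_thread)
        return false;
    sketch_on_thread = true;
    return true;
}

/* Leave the guarded region. Must only follow a successful take. */
static inline void
sketch_yield_guard(void)
{
    assert(sketch_on_thread);
    sketch_on_thread = false;
}

/* Models an allocator hook that re-enters itself while collecting a sample.
 * Returns how many nested calls actually sampled. */
static int
sketch_hook(int depth)
{
    if (!sketch_take_guard())
        return 0; /* re-entrant call on this thread: skip sampling */
    int sampled = 1;
    if (depth > 0)
        sampled += sketch_hook(depth - 1); /* the nested call is rejected */
    sketch_yield_guard();
    return sampled;
}
```

Because take and yield are the only operations, the caller never holds a copy of the flag, so there is no stale value to write back after a context change. That is the property the get/set idiom lacked.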


We don't have any reproductions for the defects that prompted this
change, but I've been running a patched library in an environment that
_does_ reproduce the behavior, and I haven't seen any defects.

1. I don't believe this patch is harmful, and if our memory allocation
tests pass then I believe it should be fine.
2. I have reason to believe this fixes a critical defect, which can
cause crashes.


## Checklist
- [x] PR author has checked that all the criteria below are met
- The PR description includes an overview of the change
- The PR description articulates the motivation for the change
- The change includes tests OR the PR description describes a testing
strategy
- The PR description notes risks associated with the change, if any
- Newly-added code is easy to change
- The change follows the [library release note
guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html)
- The change includes or references documentation updates if necessary
- Backport labels are set (if
[applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting))

## Reviewer Checklist
- [x] Reviewer has checked that all the criteria below are met 
- Title is accurate
- All changes are related to the pull request's stated goal
- Avoids breaking
[API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces)
changes
- Testing strategy adequately addresses listed risks
- Newly-added code is easy to change
- Release note makes sense to a user of the library
- If necessary, author has acknowledged and discussed the performance
implications of this PR as reported in the benchmarks PR comment
- Backport labels are set in a manner that is consistent with the
[release branch maintenance
policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)

Co-authored-by: David Sanchez <[email protected]>
Co-authored-by: Taegyun Kim <[email protected]>
3 people authored Jan 15, 2025
1 parent 8fc7d4e commit e24bae1
Showing 8 changed files with 340 additions and 88 deletions.
125 changes: 84 additions & 41 deletions ddtrace/profiling/collector/_memalloc.c
@@ -42,47 +42,95 @@ static PyObject* object_string = NULL;

#define ALLOC_TRACKER_MAX_COUNT UINT64_MAX

// The data coordination primitives in this and related files are related to a crash we started seeing.
// We don't have a precise understanding of the causal factors within the runtime that lead to this condition,
// since the GIL alone was sufficient in the past for preventing this issue.
// We add an option here to _add_ a crash, in order to observe this condition in a future diagnostic iteration.
// **This option is _intended_ to crash the Python process** do not use without a good reason!
static char g_crash_on_mutex_pass_str[] = "_DD_PROFILING_MEMALLOC_CRASH_ON_MUTEX_PASS";
static const char* g_truthy_values[] = { "1", "true", "yes", "on", "enable", "enabled", NULL }; // NB the sentinel NULL
static memlock_t g_memalloc_lock;

static alloc_tracker_t* global_alloc_tracker;

// This is a multiplatform way to define an operation to happen at static initialization time
static void
memalloc_init(void);

#ifdef _MSC_VER
#pragma section(".CRT$XCU", read)
__declspec(allocate(".CRT$XCU")) void (*memalloc_init_func)(void) = memalloc_init;

#elif defined(__GNUC__) || defined(__clang__)
__attribute__((constructor))
#else
#error Unsupported compiler
#endif
static void
memalloc_init()
{
// Check if we should crash the process on mutex pass
char* crash_on_mutex_pass_str = getenv(g_crash_on_mutex_pass_str);
bool crash_on_mutex_pass = false;
if (crash_on_mutex_pass_str) {
for (int i = 0; g_truthy_values[i]; i++) {
if (strcmp(crash_on_mutex_pass_str, g_truthy_values[i]) == 0) {
crash_on_mutex_pass = true;
break;
}
}
}
memlock_init(&g_memalloc_lock, crash_on_mutex_pass);
}

static void
memalloc_add_event(memalloc_context_t* ctx, void* ptr, size_t size)
{
/* Do not overflow; just ignore the new events if we ever reach that point */
if (global_alloc_tracker->alloc_count >= ALLOC_TRACKER_MAX_COUNT)
uint64_t alloc_count = atomic_add_clamped(&global_alloc_tracker->alloc_count, 1, ALLOC_TRACKER_MAX_COUNT);

/* Return if we've reached the maximum number of allocations */
if (alloc_count == 0)
return;

global_alloc_tracker->alloc_count++;
// Return if we can't take the guard
if (!memalloc_take_guard()) {
return;
}

/* Avoid loops */
if (memalloc_get_reentrant())
// In this implementation, the `global_alloc_tracker` isn't intrinsically protected. Before we read or modify,
// take the lock. The count of allocations is already forward-attributed elsewhere, so if we can't take the lock
// there's nothing to do.
if (!memlock_trylock(&g_memalloc_lock)) {
return;
}

/* Determine if we can capture or if we need to sample */
if (global_alloc_tracker->allocs.count < ctx->max_events) {
/* set a barrier so we don't loop as getting a traceback allocates memory */
memalloc_set_reentrant(true);
/* Buffer is not full, fill it */
traceback_t* tb = memalloc_get_traceback(ctx->max_nframe, ptr, size, ctx->domain);
memalloc_set_reentrant(false);
if (tb)
if (tb) {
traceback_array_append(&global_alloc_tracker->allocs, tb);
}
} else {
/* Sampling mode using a reservoir sampling algorithm: replace a random
* traceback with this one */
uint64_t r = random_range(global_alloc_tracker->alloc_count);
uint64_t r = random_range(alloc_count);

if (r < ctx->max_events) {
/* set a barrier so we don't loop as getting a traceback allocates memory */
memalloc_set_reentrant(true);
// In addition to event size, need to check that the tab is in a good state
if (r < ctx->max_events && global_alloc_tracker->allocs.tab != NULL) {
/* Replace a random traceback with this one */
traceback_t* tb = memalloc_get_traceback(ctx->max_nframe, ptr, size, ctx->domain);
memalloc_set_reentrant(false);

// Need to check not only that the tb returned
if (tb) {
traceback_free(global_alloc_tracker->allocs.tab[r]);
global_alloc_tracker->allocs.tab[r] = tb;
}
}
}

memlock_unlock(&g_memalloc_lock);
memalloc_yield_guard();
}
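
The hunk above replaces the check-then-increment on `alloc_count` with a single call to `atomic_add_clamped()`, whose definition is not part of this view. The following is a minimal sketch of how such a helper could behave, assuming it returns the new value on success and 0 once the ceiling would be exceeded (which is how the caller above treats the result). The name `sketch_add_clamped` and the use of C11 `<stdatomic.h>` are assumptions; the real helper may rely on compiler intrinsics instead.

```c
#include <stdatomic.h>
#include <stdint.h>

/* Atomically add `amount` to `*target` unless the sum would exceed `max`.
 * Returns the new value on success, or 0 if the addition was refused. */
static uint64_t
sketch_add_clamped(_Atomic uint64_t* target, uint64_t amount, uint64_t max)
{
    /* An addend larger than the ceiling can never fit. */
    if (amount > max)
        return 0;

    uint64_t old_val = atomic_load(target);
    for (;;) {
        /* Written as a subtraction so the check itself cannot overflow. */
        if (old_val > max - amount)
            return 0;

        uint64_t new_val = old_val + amount;
        /* On failure, old_val is refreshed with the current value; retry. */
        if (atomic_compare_exchange_weak(target, &old_val, new_val))
            return new_val;
    }
}
```

Folding the bounds check and the increment into one compare-and-swap loop means two threads cannot both pass the check and then both increment, which the separate `>=` test followed by `++` allowed.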

static void
@@ -98,12 +146,6 @@ memalloc_free(void* ctx, void* ptr)
alloc->free(alloc->ctx, ptr);
}

#ifdef _PY37_AND_LATER
Py_tss_t memalloc_reentrant_key = Py_tss_NEEDS_INIT;
#else
int memalloc_reentrant_key = -1;
#endif

static void*
memalloc_alloc(int use_calloc, void* ctx, size_t nelem, size_t elsize)
{
@@ -233,7 +275,10 @@ memalloc_start(PyObject* Py_UNUSED(module), PyObject* args)

global_memalloc_ctx.domain = PYMEM_DOMAIN_OBJ;

global_alloc_tracker = alloc_tracker_new();
if (memlock_trylock(&g_memalloc_lock)) {
global_alloc_tracker = alloc_tracker_new();
memlock_unlock(&g_memalloc_lock);
}

PyMem_GetAllocator(PYMEM_DOMAIN_OBJ, &global_memalloc_ctx.pymem_allocator_obj);
PyMem_SetAllocator(PYMEM_DOMAIN_OBJ, &alloc);
@@ -258,8 +303,11 @@ memalloc_stop(PyObject* Py_UNUSED(module), PyObject* Py_UNUSED(args))

PyMem_SetAllocator(PYMEM_DOMAIN_OBJ, &global_memalloc_ctx.pymem_allocator_obj);
memalloc_tb_deinit();
alloc_tracker_free(global_alloc_tracker);
global_alloc_tracker = NULL;
if (memlock_trylock(&g_memalloc_lock)) {
alloc_tracker_free(global_alloc_tracker);
global_alloc_tracker = NULL;
memlock_unlock(&g_memalloc_lock);
}

memalloc_heap_tracker_deinit();

@@ -310,9 +358,15 @@ iterevents_new(PyTypeObject* type, PyObject* Py_UNUSED(args), PyObject* Py_UNUSE
if (!iestate)
return NULL;

iestate->alloc_tracker = global_alloc_tracker;
/* reset the current traceback list */
global_alloc_tracker = alloc_tracker_new();
if (memlock_trylock(&g_memalloc_lock)) {
iestate->alloc_tracker = global_alloc_tracker;
global_alloc_tracker = alloc_tracker_new();
memlock_unlock(&g_memalloc_lock);
} else {
Py_TYPE(iestate)->tp_free(iestate);
return NULL;
}
iestate->seq_index = 0;

PyObject* iter_and_count = PyTuple_New(3);
@@ -326,8 +380,11 @@ iterevents_new(PyTypeObject* type, PyObject* Py_UNUSED(args), PyObject* Py_UNUSE
static void
iterevents_dealloc(IterEventsState* iestate)
{
alloc_tracker_free(iestate->alloc_tracker);
Py_TYPE(iestate)->tp_free(iestate);
if (memlock_trylock(&g_memalloc_lock)) {
alloc_tracker_free(iestate->alloc_tracker);
Py_TYPE(iestate)->tp_free(iestate);
memlock_unlock(&g_memalloc_lock);
}
}

static PyObject*
@@ -442,20 +499,6 @@ PyInit__memalloc(void)
return NULL;
}

#ifdef _PY37_AND_LATER
if (PyThread_tss_create(&memalloc_reentrant_key) != 0) {
#else
memalloc_reentrant_key = PyThread_create_key();
if (memalloc_reentrant_key == -1) {
#endif
#ifdef MS_WINDOWS
PyErr_SetFromWindowsErr(0);
#else
PyErr_SetFromErrno(PyExc_OSError);
#endif
return NULL;
}

if (PyType_Ready(&MemallocIterEvents_Type) < 0)
return NULL;
Py_INCREF((PyObject*)&MemallocIterEvents_Type);
93 changes: 79 additions & 14 deletions ddtrace/profiling/collector/_memalloc_heap.c
@@ -9,13 +9,13 @@
typedef struct
{
/* Granularity of the heap profiler in bytes */
uint32_t sample_size;
uint64_t sample_size;
/* Current sample size of the heap profiler in bytes */
uint32_t current_sample_size;
uint64_t current_sample_size;
/* Tracked allocations */
traceback_array_t allocs;
/* Allocated memory counter in bytes */
uint32_t allocated_memory;
uint64_t allocated_memory;
/* True if the heap tracker is frozen */
bool frozen;
/* Contains the ongoing heap allocation/deallocation while frozen */
@@ -26,8 +26,42 @@ typedef struct
} freezer;
} heap_tracker_t;

static char g_crash_on_mutex_pass_str[] = "_DD_PROFILING_MEMHEAP_CRASH_ON_MUTEX_PASS";
static const char* g_truthy_values[] = { "1", "true", "yes", "on", "enable", "enabled", NULL }; // NB the sentinel NULL
static memlock_t g_memheap_lock;

static heap_tracker_t global_heap_tracker;

// This is a multiplatform way to define an operation to happen at static initialization time
static void
memheap_init(void);

#ifdef _MSC_VER
#pragma section(".CRT$XCU", read)
__declspec(allocate(".CRT$XCU")) void (*memheap_init_func)(void) = memheap_init;

#elif defined(__GNUC__) || defined(__clang__)
__attribute__((constructor))
#else
#error Unsupported compiler
#endif
static void
memheap_init()
{
// Check if we should crash the process on mutex pass
char* crash_on_mutex_pass_str = getenv(g_crash_on_mutex_pass_str);
bool crash_on_mutex_pass = false;
if (crash_on_mutex_pass_str) {
for (int i = 0; g_truthy_values[i]; i++) {
if (strcmp(crash_on_mutex_pass_str, g_truthy_values[i]) == 0) {
crash_on_mutex_pass = true;
break;
}
}
}
memlock_init(&g_memheap_lock, crash_on_mutex_pass);
}

static uint32_t
heap_tracker_next_sample_size(uint32_t sample_size)
{
@@ -119,20 +153,30 @@ heap_tracker_thaw(heap_tracker_t* heap_tracker)
void
memalloc_heap_tracker_init(uint32_t sample_size)
{
heap_tracker_init(&global_heap_tracker);
global_heap_tracker.sample_size = sample_size;
global_heap_tracker.current_sample_size = heap_tracker_next_sample_size(sample_size);

if (memlock_trylock(&g_memheap_lock)) {
heap_tracker_init(&global_heap_tracker);
global_heap_tracker.sample_size = sample_size;
global_heap_tracker.current_sample_size = heap_tracker_next_sample_size(sample_size);
memlock_unlock(&g_memheap_lock);
}
}

void
memalloc_heap_tracker_deinit(void)
{
heap_tracker_wipe(&global_heap_tracker);
if (memlock_trylock(&g_memheap_lock)) {
heap_tracker_wipe(&global_heap_tracker);
memlock_unlock(&g_memheap_lock);
}
}

void
memalloc_heap_untrack(void* ptr)
{
if (!memlock_trylock(&g_memheap_lock)) {
return;
}
if (global_heap_tracker.frozen) {
/* Check that we still have space to store the free. If we don't have
enough space, we ignore the untrack. That's sad as there is a change
@@ -144,6 +188,8 @@ memalloc_heap_untrack(void* ptr)
ptr_array_append(&global_heap_tracker.freezer.frees, ptr);
} else
heap_tracker_untrack_thawed(&global_heap_tracker, ptr);

memlock_unlock(&g_memheap_lock);
}

/* Track a memory allocation in the heap profiler.
@@ -157,26 +203,36 @@ memalloc_heap_track(uint16_t max_nframe, void* ptr, size_t size, PyMemAllocatorD
return false;

/* Check for overflow */
global_heap_tracker.allocated_memory = Py_MIN(global_heap_tracker.allocated_memory + size, MAX_HEAP_SAMPLE_SIZE);
uint64_t res = atomic_add_clamped(&global_heap_tracker.allocated_memory, size, MAX_HEAP_SAMPLE_SIZE);
if (0 == res)
return false;

// Take the lock
if (!memlock_trylock(&g_memheap_lock)) {
return false;
}

/* Check if we have enough sample or not */
if (global_heap_tracker.allocated_memory < global_heap_tracker.current_sample_size)
if (global_heap_tracker.allocated_memory < global_heap_tracker.current_sample_size) {
memlock_unlock(&g_memheap_lock);
return false;
}

/* Check if we can add more samples: the sum of the freezer + alloc tracker
cannot be greater than what the alloc tracker can handle: when the alloc
tracker is thawed, all the allocs in the freezer will be moved there!*/
if ((global_heap_tracker.freezer.allocs.count + global_heap_tracker.allocs.count) >= TRACEBACK_ARRAY_MAX_COUNT)
if (global_heap_tracker.freezer.allocs.count + global_heap_tracker.allocs.count >= TRACEBACK_ARRAY_MAX_COUNT) {
memlock_unlock(&g_memheap_lock);
return false;
}

/* Avoid loops */
if (memalloc_get_reentrant())
if (!memalloc_take_guard()) {
memlock_unlock(&g_memheap_lock);
return false;
}

memalloc_set_reentrant(true);
traceback_t* tb = memalloc_get_traceback(max_nframe, ptr, global_heap_tracker.allocated_memory, domain);
memalloc_set_reentrant(false);

if (tb) {
if (global_heap_tracker.frozen)
traceback_array_append(&global_heap_tracker.freezer.allocs, tb);
@@ -189,15 +245,23 @@ memalloc_heap_track(uint16_t max_nframe, void* ptr, size_t size, PyMemAllocatorD
/* Compute the new target sample size */
global_heap_tracker.current_sample_size = heap_tracker_next_sample_size(global_heap_tracker.sample_size);

memalloc_yield_guard();
memlock_unlock(&g_memheap_lock);
return true;
}

memalloc_yield_guard();
memlock_unlock(&g_memheap_lock);
return false;
}

PyObject*
memalloc_heap()
{
if (!memlock_trylock(&g_memheap_lock)) {
return NULL;
}

heap_tracker_freeze(&global_heap_tracker);

PyObject* heap_list = PyList_New(global_heap_tracker.allocs.count);
@@ -213,5 +277,6 @@ memalloc_heap()

heap_tracker_thaw(&global_heap_tracker);

memlock_unlock(&g_memheap_lock);
return heap_list;
}
3 changes: 3 additions & 0 deletions ddtrace/profiling/collector/_memalloc_reentrant.c
@@ -0,0 +1,3 @@
#include "_memalloc_reentrant.h"

bool _MEMALLOC_ON_THREAD = false;