Skip to content

Commit

Permalink
Fix static initialization order fiasco in velox (facebookincubator#12204
Browse files Browse the repository at this point in the history
)

Summary:
X-link: prestodb/presto#24458


Replace global statics with local to avoid non-determinism
https://en.cppreference.com/w/cpp/language/siof

Differential Revision: D68857074
  • Loading branch information
mkatsevVR authored and facebook-github-bot committed Jan 31, 2025
1 parent dcafd32 commit 836c36e
Show file tree
Hide file tree
Showing 20 changed files with 68 additions and 64 deletions.
8 changes: 4 additions & 4 deletions velox/common/base/VeloxException.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,16 +136,16 @@ bool isStackTraceEnabled(VeloxException::Type type) {
using namespace std::literals::chrono_literals;
const bool isSysException = type == VeloxException::Type::kSystem;
if ((isSysException &&
!config::globalConfig.exceptionSystemStacktraceEnabled) ||
!config::globalConfig().exceptionSystemStacktraceEnabled) ||
(!isSysException &&
!config::globalConfig.exceptionUserStacktraceEnabled)) {
!config::globalConfig().exceptionUserStacktraceEnabled)) {
// VeloxException stacktraces are disabled.
return false;
}

const int32_t rateLimitMs = isSysException
? config::globalConfig.exceptionSystemStacktraceRateLimitMs
: config::globalConfig.exceptionUserStacktraceRateLimitMs;
? config::globalConfig().exceptionSystemStacktraceRateLimitMs
: config::globalConfig().exceptionUserStacktraceRateLimitMs;
// not static so the global config can be manipulated at runtime
if (0 == rateLimitMs) {
// VeloxException stacktraces are not rate-limited
Expand Down
8 changes: 4 additions & 4 deletions velox/common/base/tests/ExceptionTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@ void testExceptionTraceCollectionControl(bool userException, bool enabled) {
SCOPED_TRACE(fmt::format(
"enabled: {}, user flag: {}, sys flag: {}",
enabled,
config::globalConfig.exceptionUserStacktraceEnabled,
config::globalConfig.exceptionSystemStacktraceEnabled));
config::globalConfig().exceptionUserStacktraceEnabled,
config::globalConfig().exceptionSystemStacktraceEnabled));
ASSERT_EQ(userException, e.exceptionType() == VeloxException::Type::kUser);
ASSERT_EQ(enabled, e.stackTrace() != nullptr);
}
Expand Down Expand Up @@ -179,8 +179,8 @@ void testExceptionTraceCollectionRateControl(
"userException: {}, hasRateLimit: {}, user limit: {}ms, sys limit: {}ms",
userException,
hasRateLimit,
config::globalConfig.exceptionUserStacktraceRateLimitMs,
config::globalConfig.exceptionSystemStacktraceRateLimitMs));
config::globalConfig().exceptionUserStacktraceRateLimitMs,
config::globalConfig().exceptionSystemStacktraceRateLimitMs));
ASSERT_EQ(
userException, e.exceptionType() == VeloxException::Type::kUser);
ASSERT_EQ(!hasRateLimit || ((iter % 2) == 0), e.stackTrace() != nullptr);
Expand Down
8 changes: 4 additions & 4 deletions velox/common/caching/tests/AsyncDataCacheTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -718,7 +718,7 @@ TEST_P(AsyncDataCacheTest, pin) {

TEST_P(AsyncDataCacheTest, replace) {
constexpr int64_t kMaxBytes = 64 << 20;
config::globalConfig.exceptionUserStacktraceEnabled = false;
config::globalConfig().exceptionUserStacktraceEnabled = false;
initializeCache(kMaxBytes);
// Load 10x the max size, inject an error every 21 batches.
loadLoop(0, kMaxBytes * 10, 21);
Expand All @@ -736,7 +736,7 @@ TEST_P(AsyncDataCacheTest, replace) {

TEST_P(AsyncDataCacheTest, evictAccounting) {
constexpr int64_t kMaxBytes = 64 << 20;
config::globalConfig.exceptionUserStacktraceEnabled = false;
config::globalConfig().exceptionUserStacktraceEnabled = false;
initializeCache(kMaxBytes);
auto pool = manager_->addLeafPool("test");

Expand All @@ -760,7 +760,7 @@ TEST_P(AsyncDataCacheTest, evictAccounting) {
TEST_P(AsyncDataCacheTest, largeEvict) {
constexpr int64_t kMaxBytes = 256 << 20;
constexpr int32_t kNumThreads = 24;
config::globalConfig.exceptionUserStacktraceEnabled = false;
config::globalConfig().exceptionUserStacktraceEnabled = false;
initializeCache(kMaxBytes);
// Load 10x the max size, inject an allocation of 1/8 the capacity every 4
// batches.
Expand Down Expand Up @@ -839,7 +839,7 @@ TEST_P(AsyncDataCacheTest, DISABLED_ssd) {
constexpr uint64_t kRamBytes = 32 << 20;
constexpr uint64_t kSsdBytes = 512UL << 20;
#endif
config::globalConfig.exceptionUserStacktraceEnabled = false;
config::globalConfig().exceptionUserStacktraceEnabled = false;
initializeCache(kRamBytes, kSsdBytes);
cache_->setVerifyHook(
[&](const AsyncDataCacheEntry& entry) { checkContents(entry); });
Expand Down
5 changes: 4 additions & 1 deletion velox/common/config/GlobalConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@

namespace facebook::velox::config {

GlobalConfiguration globalConfig;
GlobalConfiguration& globalConfig() {
static GlobalConfiguration config;
return config;
}

} // namespace facebook::velox::config
2 changes: 1 addition & 1 deletion velox/common/config/GlobalConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,6 @@ struct GlobalConfiguration {
std::string saveInputOnExpressionSystemFailurePath;
};

extern GlobalConfiguration globalConfig;
GlobalConfiguration& globalConfig();

} // namespace facebook::velox::config
2 changes: 1 addition & 1 deletion velox/common/memory/MallocAllocator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ MallocAllocator::MallocAllocator(size_t capacity, uint32_t reservationByteLimit)

MallocAllocator::~MallocAllocator() {
// TODO: Remove the check when memory leak issue is resolved.
if (config::globalConfig.memoryLeakCheckEnabled) {
if (config::globalConfig().memoryLeakCheckEnabled) {
VELOX_CHECK(
((allocatedBytes_ - reservations_.read()) == 0) &&
(numAllocated_ == 0) && (numMapped_ == 0),
Expand Down
4 changes: 2 additions & 2 deletions velox/common/memory/Memory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ std::vector<std::shared_ptr<MemoryPool>> createSharedLeafMemoryPools(
VELOX_CHECK_EQ(sysPool.name(), kSysRootName);
std::vector<std::shared_ptr<MemoryPool>> leafPools;
const size_t numSharedPools =
std::max(1, config::globalConfig.memoryNumSharedLeafPools);
std::max(1, config::globalConfig().memoryNumSharedLeafPools);
leafPools.reserve(numSharedPools);
for (size_t i = 0; i < numSharedPools; ++i) {
leafPools.emplace_back(
Expand Down Expand Up @@ -129,7 +129,7 @@ MemoryManager::MemoryManager(const MemoryManagerOptions& options)
sysRoot_->name());
VELOX_CHECK_EQ(
sharedLeafPools_.size(),
std::max(1, config::globalConfig.memoryNumSharedLeafPools));
std::max(1, config::globalConfig().memoryNumSharedLeafPools));
}

MemoryManager::~MemoryManager() {
Expand Down
6 changes: 3 additions & 3 deletions velox/common/memory/Memory.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,18 +61,18 @@ struct MemoryManagerOptions {

/// If true, enable memory usage tracking in the default memory pool.
bool trackDefaultUsage{
config::globalConfig.enableMemoryUsageTrackInDefaultMemoryPool};
config::globalConfig().enableMemoryUsageTrackInDefaultMemoryPool};

/// If true, check the memory pool and usage leaks on destruction.
///
/// TODO: deprecate this flag after all the existing memory leak use cases
/// have been fixed.
bool checkUsageLeak{config::globalConfig.memoryLeakCheckEnabled};
bool checkUsageLeak{config::globalConfig().memoryLeakCheckEnabled};

/// If true, the memory pool will be running in debug mode to track the
/// allocation and free call stacks to detect the source of memory leak for
/// testing purpose.
bool debugEnabled{config::globalConfig.memoryPoolDebugEnabled};
bool debugEnabled{config::globalConfig().memoryPoolDebugEnabled};

/// Terminates the process and generates a core file on an allocation failure
bool coreOnAllocationFailureEnabled{false};
Expand Down
2 changes: 1 addition & 1 deletion velox/common/memory/MemoryAllocator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ void MemoryAllocator::useHugePages(
const ContiguousAllocation& data,
bool enable) {
#ifdef linux
if (!config::globalConfig.memoryUseHugepages) {
if (!config::globalConfig().memoryUseHugepages) {
return;
}
auto maybeRange = data.hugePageRange();
Expand Down
4 changes: 2 additions & 2 deletions velox/common/memory/MemoryAllocator.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ struct Stats {

template <typename Op>
void recordAllocate(int64_t bytes, int32_t count, Op op) {
if (config::globalConfig.timeAllocations) {
if (config::globalConfig().timeAllocations) {
auto index = sizeIndex(bytes);
velox::ClockTimer timer(sizes[index].allocateClocks);
op();
Expand All @@ -107,7 +107,7 @@ struct Stats {

template <typename Op>
void recordFree(int64_t bytes, Op op) {
if (config::globalConfig.timeAllocations) {
if (config::globalConfig().timeAllocations) {
auto index = sizeIndex(bytes);
ClockTimer timer(sizes[index].freeClocks);
op();
Expand Down
2 changes: 1 addition & 1 deletion velox/common/memory/MemoryPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -964,7 +964,7 @@ std::string MemoryPoolImpl::treeMemoryUsage(bool skipEmptyPool) const {
if (parent_ != nullptr) {
return parent_->treeMemoryUsage(skipEmptyPool);
}
if (config::globalConfig.suppressMemoryCapacityExceedingErrorMessage) {
if (config::globalConfig().suppressMemoryCapacityExceedingErrorMessage) {
return "";
}
std::stringstream out;
Expand Down
2 changes: 1 addition & 1 deletion velox/common/memory/MemoryPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ class MemoryPool : public std::enable_shared_from_this<MemoryPool> {

/// If true, tracks the allocation and free call stacks to detect the source
/// of memory leak for testing purpose.
bool debugEnabled{config::globalConfig.memoryPoolDebugEnabled};
bool debugEnabled{config::globalConfig().memoryPoolDebugEnabled};

/// Terminates the process and generates a core file on an allocation
/// failure
Expand Down
2 changes: 1 addition & 1 deletion velox/common/memory/MmapAllocator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ MachinePageCount MmapAllocator::freeNonContiguousInternal(
ClockTimer timer(clocks);
pages = sizeClass->free(allocation);
}
if ((pages > 0) && config::globalConfig.timeAllocations) {
if ((pages > 0) && config::globalConfig().timeAllocations) {
// Increment the free time only if the allocation contained
// pages in the class. Note that size class indices in the
// allocator are not necessarily the same as in the stats.
Expand Down
13 changes: 7 additions & 6 deletions velox/dwio/common/tests/LoggedExceptionTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,14 @@ namespace {
void testTraceCollectionSwitchControl(bool enabled) {
// Logged exception is system type of velox exception.
// Disable rate control in the test.
config::globalConfig.exceptionUserStacktraceRateLimitMs = 0;
config::globalConfig.exceptionSystemStacktraceRateLimitMs = 0;
config::globalConfig().exceptionUserStacktraceRateLimitMs = 0;
config::globalConfig().exceptionSystemStacktraceRateLimitMs = 0;

// NOTE: the user config should not affect the tracing behavior of system type
// of exception collection.
config::globalConfig.exceptionUserStacktraceEnabled = folly::Random::oneIn(2);
config::globalConfig.exceptionSystemStacktraceEnabled =
config::globalConfig().exceptionUserStacktraceEnabled =
folly::Random::oneIn(2);
config::globalConfig().exceptionSystemStacktraceEnabled =
enabled ? true : false;

try {
Expand All @@ -40,8 +41,8 @@ void testTraceCollectionSwitchControl(bool enabled) {
SCOPED_TRACE(fmt::format(
"enabled: {}, user flag: {}, sys flag: {}",
enabled,
config::globalConfig.exceptionUserStacktraceEnabled,
config::globalConfig.exceptionSystemStacktraceEnabled));
config::globalConfig().exceptionUserStacktraceEnabled,
config::globalConfig().exceptionSystemStacktraceEnabled));
ASSERT_TRUE(e.exceptionType() == VeloxException::Type::kSystem);
ASSERT_EQ(enabled, e.stackTrace() != nullptr);
}
Expand Down
4 changes: 2 additions & 2 deletions velox/exec/Operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -650,7 +650,7 @@ void Operator::MemoryReclaimer::enterArbitration() {
}

Driver* const runningDriver = driverThreadCtx->driverCtx()->driver;
if (!config::globalConfig.memoryPoolCapacityTransferAcrossTasks) {
if (!config::globalConfig().memoryPoolCapacityTransferAcrossTasks) {
if (auto opDriver = ensureDriver()) {
// NOTE: the current running driver might not be the driver of the
// operator that requests memory arbitration. The reason is that an
Expand Down Expand Up @@ -680,7 +680,7 @@ void Operator::MemoryReclaimer::leaveArbitration() noexcept {
return;
}
Driver* const runningDriver = driverThreadCtx->driverCtx()->driver;
if (!config::globalConfig.memoryPoolCapacityTransferAcrossTasks) {
if (!config::globalConfig().memoryPoolCapacityTransferAcrossTasks) {
if (auto opDriver = ensureDriver()) {
VELOX_CHECK_EQ(
runningDriver->task()->taskId(),
Expand Down
4 changes: 2 additions & 2 deletions velox/exec/fuzzer/FuzzerUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -342,8 +342,8 @@ void setupMemory(
int64_t allocatorCapacity,
int64_t arbitratorCapacity,
bool enableGlobalArbitration) {
config::globalConfig.enableMemoryUsageTrackInDefaultMemoryPool = true;
config::globalConfig.memoryLeakCheckEnabled = true;
config::globalConfig().enableMemoryUsageTrackInDefaultMemoryPool = true;
config::globalConfig().memoryLeakCheckEnabled = true;
facebook::velox::memory::SharedArbitrator::registerFactory();
facebook::velox::memory::MemoryManagerOptions options;
options.allocatorCapacity = allocatorCapacity;
Expand Down
12 changes: 6 additions & 6 deletions velox/expression/Expr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -660,10 +660,10 @@ std::string onTopLevelException(VeloxException::Type exceptionType, void* arg) {
auto* context = static_cast<ExprExceptionContext*>(arg);

const char* basePath =
config::globalConfig.saveInputOnExpressionAnyFailurePath.c_str();
config::globalConfig().saveInputOnExpressionAnyFailurePath.c_str();
if (strlen(basePath) == 0 && exceptionType == VeloxException::Type::kSystem) {
basePath =
config::globalConfig.saveInputOnExpressionSystemFailurePath.c_str();
config::globalConfig().saveInputOnExpressionSystemFailurePath.c_str();
}
if (strlen(basePath) == 0) {
return fmt::format("Top-level Expression: {}", context->expr()->toString());
Expand Down Expand Up @@ -1850,10 +1850,10 @@ void printInputAndExprs(
const BaseVector* vector,
const std::vector<std::shared_ptr<Expr>>& exprs) {
const char* basePath =
config::globalConfig.saveInputOnExpressionAnyFailurePath.c_str();
config::globalConfig().saveInputOnExpressionAnyFailurePath.c_str();
if (strlen(basePath) == 0) {
basePath =
config::globalConfig.saveInputOnExpressionSystemFailurePath.c_str();
config::globalConfig().saveInputOnExpressionSystemFailurePath.c_str();
}
if (strlen(basePath) == 0) {
return;
Expand Down Expand Up @@ -1913,7 +1913,7 @@ void ExprSet::eval(
context.ensureFieldLoaded(field->index(context), rows);
}

if (config::globalConfig.experimentalSaveInputOnFatalSignal) {
if (config::globalConfig().experimentalSaveInputOnFatalSignal) {
auto other = process::GetThreadDebugInfo();
process::ThreadDebugInfo debugInfo;
if (other) {
Expand Down Expand Up @@ -1977,7 +1977,7 @@ std::unique_ptr<ExprSet> makeExprSetFromFlag(
std::vector<core::TypedExprPtr>&& source,
core::ExecCtx* execCtx) {
if (execCtx->queryCtx()->queryConfig().exprEvalSimplified() ||
config::globalConfig.forceEvalSimplified) {
config::globalConfig().forceEvalSimplified) {
return std::make_unique<ExprSetSimplified>(std::move(source), execCtx);
}
return std::make_unique<ExprSet>(std::move(source), execCtx);
Expand Down
2 changes: 1 addition & 1 deletion velox/expression/tests/ExprEncodingsTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ class ExprEncodingsTest

void SetUp() override {
// This test throws a lot of exceptions, so turn off stack trace capturing.
config::globalConfig.exceptionUserStacktraceEnabled = false;
config::globalConfig().exceptionUserStacktraceEnabled = false;

functions::prestosql::registerAllScalarFunctions();
parse::registerTypeResolver();
Expand Down
10 changes: 5 additions & 5 deletions velox/expression/tests/ExprTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2408,8 +2408,8 @@ TEST_P(ParameterizedExprTest, exceptionContext) {
registerFunction<TestingAlwaysThrowsFunction, int32_t, int32_t>(
{"always_throws"});

config::globalConfig.saveInputOnExpressionAnyFailurePath = "";
config::globalConfig.saveInputOnExpressionSystemFailurePath = "";
config::globalConfig().saveInputOnExpressionAnyFailurePath = "";
config::globalConfig().saveInputOnExpressionSystemFailurePath = "";

try {
evaluate("always_throws(c0) + c1", data);
Expand Down Expand Up @@ -2443,7 +2443,7 @@ TEST_P(ParameterizedExprTest, exceptionContext) {

// Enable saving vector and expression SQL for system errors only.
auto tempDirectory = exec::test::TempDirectoryPath::create();
config::globalConfig.saveInputOnExpressionSystemFailurePath =
config::globalConfig().saveInputOnExpressionSystemFailurePath =
tempDirectory->getPath();

try {
Expand Down Expand Up @@ -2479,9 +2479,9 @@ TEST_P(ParameterizedExprTest, exceptionContext) {
}

// Enable saving vector and expression SQL for all errors.
config::globalConfig.saveInputOnExpressionAnyFailurePath =
config::globalConfig().saveInputOnExpressionAnyFailurePath =
tempDirectory->getPath();
config::globalConfig.saveInputOnExpressionSystemFailurePath = "";
config::globalConfig().saveInputOnExpressionSystemFailurePath = "";

try {
evaluate("always_throws(c0) + c1", data);
Expand Down
32 changes: 16 additions & 16 deletions velox/flag_definitions/flags.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,34 +135,34 @@ DEFINE_int32(

namespace facebook::velox {
void translateFlagsToGlobalConfig() {
config::globalConfig.memoryNumSharedLeafPools =
config::globalConfig().memoryNumSharedLeafPools =
FLAGS_velox_memory_num_shared_leaf_pools;
config::globalConfig.memoryLeakCheckEnabled =
config::globalConfig().memoryLeakCheckEnabled =
FLAGS_velox_memory_leak_check_enabled;
config::globalConfig.memoryPoolDebugEnabled =
config::globalConfig().memoryPoolDebugEnabled =
FLAGS_velox_memory_pool_debug_enabled;
config::globalConfig.enableMemoryUsageTrackInDefaultMemoryPool =
config::globalConfig().enableMemoryUsageTrackInDefaultMemoryPool =
FLAGS_velox_enable_memory_usage_track_in_default_memory_pool;
config::globalConfig.timeAllocations = FLAGS_velox_time_allocations;
config::globalConfig.memoryUseHugepages = FLAGS_velox_memory_use_hugepages;
config::globalConfig.suppressMemoryCapacityExceedingErrorMessage =
config::globalConfig().timeAllocations = FLAGS_velox_time_allocations;
config::globalConfig().memoryUseHugepages = FLAGS_velox_memory_use_hugepages;
config::globalConfig().suppressMemoryCapacityExceedingErrorMessage =
FLAGS_velox_suppress_memory_capacity_exceeding_error_message;
config::globalConfig.memoryPoolCapacityTransferAcrossTasks =
config::globalConfig().memoryPoolCapacityTransferAcrossTasks =
FLAGS_velox_memory_pool_capacity_transfer_across_tasks;
config::globalConfig.exceptionSystemStacktraceEnabled =
config::globalConfig().exceptionSystemStacktraceEnabled =
FLAGS_velox_exception_system_stacktrace_enabled;
config::globalConfig.exceptionSystemStacktraceRateLimitMs =
config::globalConfig().exceptionSystemStacktraceRateLimitMs =
FLAGS_velox_exception_system_stacktrace_rate_limit_ms;
config::globalConfig.exceptionUserStacktraceEnabled =
config::globalConfig().exceptionUserStacktraceEnabled =
FLAGS_velox_exception_user_stacktrace_enabled;
config::globalConfig.exceptionUserStacktraceRateLimitMs =
config::globalConfig().exceptionUserStacktraceRateLimitMs =
FLAGS_velox_exception_user_stacktrace_rate_limit_ms;
config::globalConfig.forceEvalSimplified = FLAGS_force_eval_simplified;
config::globalConfig.experimentalSaveInputOnFatalSignal =
config::globalConfig().forceEvalSimplified = FLAGS_force_eval_simplified;
config::globalConfig().experimentalSaveInputOnFatalSignal =
FLAGS_velox_experimental_save_input_on_fatal_signal;
config::globalConfig.saveInputOnExpressionAnyFailurePath =
config::globalConfig().saveInputOnExpressionAnyFailurePath =
FLAGS_velox_save_input_on_expression_any_failure_path;
config::globalConfig.saveInputOnExpressionSystemFailurePath =
config::globalConfig().saveInputOnExpressionSystemFailurePath =
FLAGS_velox_save_input_on_expression_system_failure_path;
}
} // namespace facebook::velox

0 comments on commit 836c36e

Please sign in to comment.