Fix for flaky LruClockTest. #146

Merged 5 commits on Mar 18, 2024
8 changes: 8 additions & 0 deletions src/llfs/lru_clock.cpp
@@ -146,6 +146,10 @@ void LRUClock::add_local_counter(LocalCounter& counter) noexcept
{
std::unique_lock<std::mutex> lock{this->mutex_};

+ // Initialize the local counter to the max observed global value.
+ //
+ counter.value.store(this->observed_count_);
+
this->counter_list_.push_back(counter);
}

@@ -155,6 +159,10 @@ void LRUClock::remove_local_counter(LocalCounter& counter) noexcept
{
std::unique_lock<std::mutex> lock{this->mutex_};

+ // Update the global max observed count (last reading from this local counter).
+ //
+ this->observed_count_ = std::max(this->observed_count_, counter.value.load());
+
this->counter_list_.erase(this->counter_list_.iterator_to(counter));
}

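Note: the two hunks above work together to keep the logical clock monotonic across thread churn. A minimal self-contained sketch of the invariant they preserve (the MiniClock class below is illustrative only, not the actual llfs implementation, which uses an intrusive counter list; just observed_count_, counter_list_, and the locking discipline are taken from the diff):

#include <algorithm>
#include <atomic>
#include <cstdint>
#include <list>
#include <mutex>

class MiniClock {
 public:
  struct LocalCounter {
    std::atomic<std::int64_t> value{0};
  };

  void add_local_counter(LocalCounter& counter) {
    std::unique_lock<std::mutex> lock{this->mutex_};
    // Start a joining thread at the max observed global value so it can
    // never hand out a count older than one some thread has already seen.
    counter.value.store(this->observed_count_);
    this->counter_list_.push_back(&counter);
  }

  void remove_local_counter(LocalCounter& counter) {
    std::unique_lock<std::mutex> lock{this->mutex_};
    // Fold a departing thread's last count into the global max so its
    // progress is not forgotten when its counter is destroyed.
    this->observed_count_ =
        std::max<std::int64_t>(this->observed_count_, counter.value.load());
    this->counter_list_.remove(&counter);
  }

 private:
  std::mutex mutex_;
  std::int64_t observed_count_ = 0;
  std::list<LocalCounter*> counter_list_;
};

Without the store() in add_local_counter, a late-starting thread begins counting from zero and can return values far below counts already observed; that appears to be the stale-reading case the old test tolerated (note the removed slow_thread_values[i] == 0 special case further down).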
5 changes: 4 additions & 1 deletion src/llfs/lru_clock.hpp
@@ -72,6 +72,8 @@ class LRUClock

~LocalCounter() noexcept;

+ /** \brief The next unused counter value for the current thread.
+  */
std::atomic<i64> value{0};
};

@@ -131,7 +133,8 @@ class LRUClock
*/
void remove_local_counter(LocalCounter& counter) noexcept;

- /** \brief Returns the maximum count value from the last time sync_local_counters() was called.
+ /** \brief Returns the maximum count value (least upper bound; i.e., the first unused value) from
+  * the last time sync_local_counters() was called.
*/
i64 read_observed_count() noexcept;

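Note: the "least upper bound" wording can be made concrete: counters store the next value to hand out, not the last value handed out. A tiny illustrative model (not the llfs API; the fetch-add behavior is inferred from the updated test below, which expects LRUClock::read_global() to equal the maximum value returned by advance_local() plus one):

#include <atomic>
#include <cassert>
#include <cstdint>

std::atomic<std::int64_t> next_unused{0};  // models LocalCounter::value

std::int64_t advance_local_model() {
  // Returns the count being used; leaves next_unused at the first unused value.
  return next_unused.fetch_add(1);
}

int main() {
  std::int64_t last = -1;
  for (int i = 0; i < 42; ++i) {
    last = advance_local_model();
  }
  assert(last == 41);         // maximum value actually handed out
  assert(next_unused == 42);  // least upper bound / first unused value
  return 0;
}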
109 changes: 78 additions & 31 deletions src/llfs/lru_clock.test.cpp
@@ -42,21 +42,27 @@ using namespace llfs::int_types;
TEST(LruClockTest, PerThreadUpdate)
{
const usize kNumThreads = std::thread::hardware_concurrency();
- const usize kUpdatesPerThread = 1000;
+ const usize kUpdatesPerThread = 25 * 1000;

std::vector<std::vector<i64>> per_thread_values(kNumThreads, std::vector<i64>(kUpdatesPerThread));

+ std::atomic<bool> start{false};
std::vector<std::thread> threads;

for (usize i = 0; i < kNumThreads; ++i) {
- threads.emplace_back([i, &per_thread_values] {
+ threads.emplace_back([i, &start, &per_thread_values] {
+ while (!start.load()) {
+ continue;
+ }
for (usize j = 0; j < kUpdatesPerThread; ++j) {
const i64 value = llfs::LRUClock::advance_local();
per_thread_values[i][j] = value;
}
});
}

+ start.store(true);
+
for (std::thread& t : threads) {
t.join();
}
@@ -72,14 +78,16 @@ TEST(LruClockTest, PerThreadUpdate)
}
}

- usize repeated_values = 0;
- for (const auto& [value, count] : count_per_value) {
- if (count > 1) {
- ++repeated_values;
+ if (kNumThreads > 1) {
+ usize repeated_values = 0;
+ for (const auto& [value, count] : count_per_value) {
+ if (count > 1) {
+ ++repeated_values;
+ }
+ }
+ }

- EXPECT_GT(repeated_values, 0u);
+ EXPECT_GT(repeated_values, 0u);
+ }
}
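Note: two separate de-flaking changes are visible in this test. First, the kNumThreads > 1 guard: a single-threaded run can never produce duplicate counts, so asserting repeated_values > 0 was a guaranteed failure on a one-core machine. Second, the new start flag is a standard start-gate: without it, early threads can finish all 25,000 updates before later threads even launch, so no two threads overlap and no duplicates occur. A generic sketch of the gate pattern (illustrative, not from the diff):

#include <atomic>
#include <thread>
#include <vector>

void run_with_start_gate(unsigned num_threads) {
  std::atomic<bool> start{false};
  std::vector<std::thread> threads;

  for (unsigned i = 0; i < num_threads; ++i) {
    threads.emplace_back([&start] {
      // Spin until every worker has been constructed, so the
      // workers' update loops actually run concurrently.
      while (!start.load()) {
      }
      // ... contended work goes here ...
    });
  }
  start.store(true);  // release all workers at (roughly) the same instant

  for (std::thread& t : threads) {
    t.join();
  }
}

A C++20 std::latch or std::barrier would express the same release point; the spin flag keeps the test dependency-free.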

//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -
@@ -88,17 +96,40 @@
void run_sync_update_test(const usize kNumFastThreads)
{
const usize kUpdatesPerThread = 50 * 1000 * 1000;
- const usize kSlowThreadReads = 20;

- std::vector<i64> slow_thread_values(kSlowThreadReads);
- std::thread slow_thread{[&slow_thread_values] {
- for (usize i = 0; i < kSlowThreadReads; ++i) {
- std::this_thread::sleep_for(
- std::chrono::microseconds(llfs::LRUClock::kMaxSyncDelayUsec * 40));
- slow_thread_values[i] = llfs::LRUClock::read_local();

+ // The maximum number of consecutive attempts on the slow thread to read an increasing value.
+ //
+ const usize kSyncDelayToleranceFactor = 100;
+
+ // The "slow thread" repeatedly sleeps for the maximum sync delay, re-reading its local counter
+ // after each sleep, up to the tolerance factor number of attempts. All observed local counts
+ // are recorded for verification below.
+ //
+ std::atomic<bool> stop_slow_thread{false};
+ std::vector<i64> slow_thread_values;
+ std::thread slow_thread{[&slow_thread_values, &stop_slow_thread] {
+ while (!stop_slow_thread.load()) {
+ const i64 prev_value = slow_thread_values.empty() ? -1 : slow_thread_values.back();
+ slow_thread_values.emplace_back();
+
+ for (usize j = 0; j < kSyncDelayToleranceFactor; ++j) {
+ std::this_thread::sleep_for(std::chrono::microseconds(llfs::LRUClock::kMaxSyncDelayUsec));
+
+ // NOTE: we are only calling `read_local()` here, not `advance_local()`. That means unless
+ // the other (fast) threads are updating the counter via periodic global synchronization,
+ // this value will not change!
+ //
+ slow_thread_values.back() = llfs::LRUClock::read_local();
+ if (slow_thread_values.back() > prev_value || stop_slow_thread.load()) {
+ break;
+ }
+ }
+ }
+ }};

// The "fast threads" just advance their local counters as fast as possible. For each fast
// thread, we keep track of the maximum observed count value, which should eventually skip ahead
// due to global sync operations.
//
std::vector<std::thread> fast_threads;
std::vector<batt::CpuCacheLineIsolated<i64>> max_fast_thread_value(kNumFastThreads);

@@ -113,38 +144,54 @@
});
}

+ // Wait for all threads to finish.
+ //
for (std::thread& t : fast_threads) {
t.join();
}

+ stop_slow_thread.store(true);
slow_thread.join();

+ // Calculate the maximum count value observed from any of the fast threads. Since all threads are
+ // known to have finished, this value will not change.
+ //
i64 max_count = 0;
for (batt::CpuCacheLineIsolated<i64>& count : max_fast_thread_value) {
max_count = std::max(max_count, *count);
}

const i64 max_synced_count = llfs::LRUClock::read_global();
+ EXPECT_EQ(max_synced_count, max_count + 1);

+ // Verify the required properties of the slow thread's count observations...
+ //
+ {
+ i64 prev_value = -1;
+
+ for (usize i = 0; i < slow_thread_values.size(); ++i) {
+ // Once we observe the maximum count, all future observations should be the same (since the
+ // slow thread does not do any updating on its own).
+ //
+ if (slow_thread_values[i] >= max_synced_count) {
+ for (; i < slow_thread_values.size(); ++i) {
+ EXPECT_EQ(slow_thread_values[i], max_synced_count);
+ }
+ break;
+ }
+
- EXPECT_LE(max_synced_count, max_count);
+ // All other values should be strictly increasing.
+ //
+ EXPECT_GT(slow_thread_values[i], prev_value)
+ << BATT_INSPECT(i) << BATT_INSPECT(slow_thread_values[i])
+ << BATT_INSPECT(slow_thread_values[i - 1]) << BATT_INSPECT(max_synced_count)
+ << BATT_INSPECT(max_count) << BATT_INSPECT_RANGE(slow_thread_values);

- for (usize i = 1; i < kSlowThreadReads; ++i) {
- if (slow_thread_values[i] >= max_synced_count) {
- for (; i < kSlowThreadReads; ++i) {
- EXPECT_EQ(slow_thread_values[i], max_synced_count);
- }
- break;
- }
- if (slow_thread_values[i] == 0 && slow_thread_values[i - 1] == 0) {
- continue;
- }
- EXPECT_GT(slow_thread_values[i] - slow_thread_values[i - 1], 50)
- << BATT_INSPECT(i) << BATT_INSPECT(slow_thread_values[i])
- << BATT_INSPECT(slow_thread_values[i - 1]) << BATT_INSPECT(max_synced_count)
- << BATT_INSPECT(max_count) << BATT_INSPECT_RANGE(slow_thread_values);
- }
+ prev_value = slow_thread_values[i];
+ }

- EXPECT_GT(slow_thread_values.back(), kUpdatesPerThread / 10);
+ EXPECT_GT(slow_thread_values.back(), (kUpdatesPerThread * 95) / 100);
+ }
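Note: the heart of the fix in this test is the retry shape. The old slow thread slept a fixed 40x the max sync delay and then asserted that consecutive readings had advanced by more than 50, which races against scheduler jitter. The new version re-reads up to kSyncDelayToleranceFactor times until it actually observes an increase, converting timing variance into extra wall-clock time rather than spurious failures. A stripped-down sketch of that pattern (wait_for_increase is a hypothetical name; kMaxSyncDelayUsec and read_local() are from the diff):

#include <chrono>
#include <cstdint>
#include <thread>

// Re-read until the value exceeds `prev`, sleeping `delay` between attempts
// and giving up after `max_attempts`. Returns the last value read.
template <typename ReadFn>
std::int64_t wait_for_increase(ReadFn read, std::int64_t prev, int max_attempts,
                               std::chrono::microseconds delay) {
  std::int64_t value = prev;
  for (int j = 0; j < max_attempts; ++j) {
    std::this_thread::sleep_for(delay);
    value = read();
    if (value > prev) {
      break;  // progress observed; no need to keep waiting
    }
  }
  return value;
}

The verification loop then only needs the schedule-independent property that successive readings strictly increase (ending in the plateau at max_synced_count), instead of the brittle "advanced by more than 50" threshold.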

TEST(LruClockTest, SyncUpdate1)