Commit
Fix for IoRing shutdown hang bug. (#144)
* Fix for IoRing shutdown hang bug.

* Get rid of some duplicate code.

* Fix Conan cache location (to be version-specific)

* script
tonyastolfi authored Mar 15, 2024
1 parent b0743aa commit 4a642e7
Showing 5 changed files with 129 additions and 73 deletions.
2 changes: 1 addition & 1 deletion .gitlab-ci.yml
@@ -16,7 +16,7 @@ default:
     - "test -d /local/gitlab-runner-local-cache || { echo \"FATAL: no local docker cache volume mapped\"; false; }"
     - export XDG_CACHE_HOME=/local/gitlab-runner-local-cache/.cache
     - export CONAN_USER_HOME=/local/gitlab-runner-local-cache
-    - export CONAN_HOME=/local/gitlab-runner-local-cache/.conan2
+    - export CONAN_HOME=/local/gitlab-runner-local-cache/.conan_2.0.16
     - /setup-conan.sh
     - echo "CI_DEPLOY_USER=${CI_DEPLOY_USER}"
     - echo "CI_DEPLOY_PASSWORD=${CI_DEPLOY_PASSWORD}"
2 changes: 1 addition & 1 deletion script
67 changes: 35 additions & 32 deletions src/llfs/ioring.test.cpp
@@ -228,48 +228,51 @@ TEST(IoRingTest, EnqueueManyHandlers)
 {
   constexpr usize kNumThreads = 3;
   constexpr usize kNumHandlers = 50;
+  constexpr usize kNumIterations = 5000;
 
-  StatusOr<IoRing> io = IoRing::make_new(llfs::MaxQueueDepth{64});
-  ASSERT_TRUE(io.ok()) << BATT_INSPECT(io.status());
+  for (usize n = 0; n < kNumIterations; ++n) {
+    StatusOr<IoRing> io = IoRing::make_new(llfs::MaxQueueDepth{64});
+    ASSERT_TRUE(io.ok()) << BATT_INSPECT(io.status());
 
-  io->on_work_started();
+    io->on_work_started();
 
-  std::array<llfs::Status, kNumThreads> status;
-  status.fill(batt::StatusCode::kUnknown);
+    std::array<llfs::Status, kNumThreads> status;
+    status.fill(batt::StatusCode::kUnknown);
 
-  std::atomic<bool> begin{false};
-  std::atomic<i32> counter{0};
+    std::atomic<bool> begin{false};
+    std::atomic<i32> counter{0};
 
-  std::vector<std::thread> helper_threads;
-  for (usize i = 0; i < kNumThreads; ++i) {
-    helper_threads.emplace_back([&io, &begin, &status, i] {
-      while (!begin) {
-        std::this_thread::yield();
-      }
-      status[i] = io->run();
-    });
-  }
+    std::vector<std::thread> helper_threads;
+    for (usize i = 0; i < kNumThreads; ++i) {
+      helper_threads.emplace_back([&io, &begin, &status, i] {
+        while (!begin) {
+          std::this_thread::yield();
+        }
+        status[i] = io->run();
+      });
+    }
 
-  for (usize i = 0; i < kNumHandlers; ++i) {
-    io->post([&counter](llfs::StatusOr<i32>) {
-      counter++;
-    });
-  }
+    for (usize i = 0; i < kNumHandlers; ++i) {
+      io->post([&counter](llfs::StatusOr<i32>) {
+        counter++;
+      });
+    }
 
-  begin = true;
+    begin = true;
 
-  io->on_work_finished();
+    io->on_work_finished();
 
-  {
-    usize i = 0;
-    for (std::thread& t : helper_threads) {
-      t.join();
-      EXPECT_TRUE(status[i].ok()) << BATT_INSPECT(status[i]) << BATT_INSPECT(i);
-      ++i;
-    };
-  }
+    {
+      usize i = 0;
+      for (std::thread& t : helper_threads) {
+        t.join();
+        EXPECT_TRUE(status[i].ok()) << BATT_INSPECT(status[i]) << BATT_INSPECT(i);
+        ++i;
+      };
+    }
 
-  EXPECT_EQ(counter, kNumHandlers);
+    EXPECT_EQ(counter, kNumHandlers);
+  }
 }
 
 #ifdef BATT_PLATFORM_IS_LINUX
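The essential change in this test is the new kNumIterations loop: the shutdown hang was a race between the thread that drops the work count to zero and the threads blocked inside run(), so a single create/run/shutdown cycle only failed intermittently; running 5000 cycles makes a lost wakeup (a hung t.join()) virtually certain to reproduce. For reference, here is a minimal sketch of the lifecycle contract the test exercises — the llfs::IoRing calls are the ones shown in the diff, while the include path and surrounding scaffolding are assumptions:

#include <llfs/ioring.hpp>  // assumed include path

#include <thread>

// Sketch only: one runner thread, one posted handler, then shutdown.
void ioring_lifecycle_sketch()
{
  llfs::StatusOr<llfs::IoRing> io = llfs::IoRing::make_new(llfs::MaxQueueDepth{64});
  BATT_CHECK(io.ok());

  // Raise the work count so run() keeps blocking even when the queue is idle.
  //
  io->on_work_started();

  std::thread runner{[&io] {
    llfs::Status run_status = io->run();  // returns once the work count hits zero
    (void)run_status;
  }};

  io->post([](llfs::StatusOr<i32> /*result*/) {
    // completion handler
  });

  // Drop the work count 1 -> 0; this must wake the runner thread whether it is
  // blocked in eventfd_read or in the state_change_ condition variable -- the
  // bug fixed by this commit was losing that wakeup.
  //
  io->on_work_finished();

  runner.join();
}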
120 changes: 81 additions & 39 deletions src/llfs/ioring_impl.cpp
@@ -139,22 +139,8 @@ void IoRingImpl::on_work_finished() noexcept
 
   const isize prior_count = this->work_count_.fetch_sub(1);
   if (prior_count == 1) {
-    this->state_change_.notify_all();
+    this->wake_all();
   }
-
-  // Submit a no-op to wake the run loop.
-  //
-  this->submit(
-      no_buffers(),
-
-      /*handler=*/
-      [this](StatusOr<i32>) {
-      },
-
-      /*start_op=*/
-      [](struct io_uring_sqe* sqe, auto&& /*op_handler*/) {
-        io_uring_prep_nop(sqe);
-      });
 }
 
 //==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -
@@ -217,6 +203,12 @@ Status IoRingImpl::run() noexcept
       // eventfd_read, UNLESS handler is nullptr.  So in that case, wake up a waiting thread.
       //
       if (handler == nullptr) {
+        {
+          // See comment in IoRingImpl::wake_all() for an explanation of why this seemingly
+          // unproductive statement is necessary.
+          //
+          std::unique_lock<std::mutex> queue_lock{this->queue_mutex_};
+        }
         this->state_change_.notify_one();
       }
     });
@@ -276,10 +268,12 @@ Status IoRingImpl::wait_for_ring_event()
     return batt::status_from_retval(retval);
   }
 
-  // If we are stopping, then write to the eventfd to wake up any other threads which might be
-  // blocked inside eventfd_read.
+  // If we are stopping, either because this->stop() was called or because the work count has gone
+  // to zero, then write to the eventfd to wake up any other threads which might be blocked inside
+  // eventfd_read.  (This propagation of the event essentially turns the call to eventfd_write in
+  // IoRingImpl::wake_all() into a broadcast-to-all-threads.)
   //
-  if (this->needs_reset_) {
+  if (!this->can_run()) {
     eventfd_write(this->event_fd_, v);
   }
 
@@ -290,14 +284,20 @@
 //
 auto IoRingImpl::wait_for_completions() -> StatusOr<CompletionHandler*>
 {
+  const auto is_unblocked = [this] {        // We aren't blocked if any of:
+    return !this->can_run()                 //  - the run loop has been stopped
+           || !this->completions_.empty()   //  - there are completions to execute
+           || !this->event_wait_.load()     //  - no other thread is waiting for events
+        ;
+  };
+
+  //----- --- -- - - - -
+
   std::unique_lock<std::mutex> queue_lock{this->queue_mutex_};
 
-  this->state_change_.wait(queue_lock, [this] {
-    return !this->can_run()                 // block while we `can_run`...
-           || !this->completions_.empty()   // and there are no completions...
-           || !this->event_wait_.load()     // and someone else is in event_wait.
-        ;
-  });
+  while (!is_unblocked()) {
+    this->state_change_.wait(queue_lock);
+  }
 
   return {this->pop_completion_with_lock(queue_lock)};
 }
@@ -438,17 +438,7 @@ StatusOr<usize> IoRingImpl::transfer_completions(CompletionHandler** handler_out
 void IoRingImpl::stop() noexcept
 {
   this->needs_reset_.store(true);
-  this->state_change_.notify_all();
-
-  // Submit a no-op to wake the run loop.
-  //
-  this->submit(
-      no_buffers(),
-      [](StatusOr<i32>) {
-      },
-      [](struct io_uring_sqe* sqe, auto&&) {
-        io_uring_prep_nop(sqe);
-      });
+  this->wake_all();
 }
 
 //==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -
Expand All @@ -464,10 +454,7 @@ void IoRingImpl::invoke_handler(CompletionHandler** handler) noexcept
{
auto on_scope_exit = batt::finally([handler, this] {
*handler = nullptr;
const isize prior_count = this->work_count_.fetch_sub(1);
if (prior_count == 1) {
this->state_change_.notify_all();
}
this->on_work_finished();
});

BATT_CHECK((*handler)->result);
@@ -671,6 +658,61 @@ Status IoRingImpl::unregister_files_with_lock(const std::unique_lock<std::mutex>
   return OkStatus();
 }
 
+//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -
+//
+void IoRingImpl::wake_all() noexcept
+{
+  //+++++++++++-+-+--+----- --- -- - - - -
+  // We must create a barrier so that the condition variable wait in wait_for_completions() will
+  // always see either the changes on the current thread made prior to calling wake_all(), _or_ the
+  // call to notify_all().
+  //
+  // wait_for_completions() is called by all threads who lose the race to call eventfd_read to wait
+  // directly on the io_uring event queue.  Its primary mechanism of operation is a condition
+  // variable wait:
+  //
+  //   std::unique_lock<std::mutex> queue_lock{this->queue_mutex_};
+  //
+  //   while (!is_unblocked()) {
+  //     this->state_change_.wait(queue_lock);
+  //   }
+  //
+  // `is_unblocked()` may become true if, for example, the work counter goes from 1 to 0.  In this
+  // case, without the queue_lock barrier, the following interleaving of events would be possible:
+  //
+  //  1. [Thread A] A1: Lock queue_mutex_
+  //  2. [Thread A] A2: Read work counter, observe non-zero (implies may be blocked)
+  //  3. [Thread B] B1: Decrement work counter, 1 -> 0
+  //  4. [Thread B] B2: Notify state_change_ condition variable
+  //  5. [Thread A] A3: Condition wait on state_change_ (atomic unlock-mutex-and-wait-for-notify)
+  //
+  // Since the call to notify_all (B2) happens strictly before the condition wait (A3), Thread A
+  // will wait indefinitely (BUG).
+  //
+  // With the queue_lock barrier in place (B1.5, between B1 and B2), this becomes impossible since
+  // A1..A3 form a critical section, which by definition must be serialized with all other critical
+  // sections (for this->queue_mutex_).  In other words, either B1.5 happens-before A1 or B1.5
+  // happens-after A3:
+  //
+  //  - If B1.5 happens-before A1, then B1 also happens-before A2, which means A2 can't observe the
+  //    pre-B1 value of the work counter.
+  //  - If B1.5 happens-after A3, then B2 also happens-after A3, which means B2 will interrupt the
+  //    condition wait.
+  //
+  {
+    std::unique_lock<std::mutex> queue_lock{this->queue_mutex_};
+  }
+  //+++++++++++-+-+--+----- --- -- - - - -
+
+  // Wake any threads inside this->wait_for_completions().
+  //
+  this->state_change_.notify_all();
+
+  // Wake any thread inside this->wait_for_ring_event().
+  //
+  eventfd_write(this->event_fd_, 1);
+}
+
 } //namespace llfs
 
 #endif  // LLFS_DISABLE_IO_URING
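The empty-scope lock in wake_all() (step B1.5 in the comment above) is the crux of the fix, and the pattern generalizes to any condition variable whose predicate reads state that is modified outside the mutex. Below is a self-contained sketch of the same barrier in plain standard C++ — illustrative names, independent of llfs:

#include <atomic>
#include <condition_variable>
#include <mutex>
#include <thread>

std::mutex queue_mutex;
std::condition_variable state_change;
std::atomic<int> work_count{1};

// Waiter: the analogue of IoRingImpl::wait_for_completions().
void wait_for_zero()
{
  std::unique_lock<std::mutex> lock{queue_mutex};
  while (work_count.load() != 0) {  // A2: read the predicate under the lock
    state_change.wait(lock);        // A3: atomically unlock and sleep
  }
}

// Notifier: the analogue of on_work_finished() -> wake_all().
void finish_work()
{
  work_count.fetch_sub(1);  // B1: the predicate becomes true

  // B1.5: empty critical section.  Without it, B2 below can run between the
  // waiter's predicate check (A2) and its sleep (A3), and the notify is lost.
  // Taking and dropping the mutex forces this code to run either before A2 or
  // after A3, so the waiter cannot miss the state change.
  //
  {
    std::unique_lock<std::mutex> lock{queue_mutex};
  }

  state_change.notify_all();  // B2
}

int main()
{
  std::thread waiter{wait_for_zero};
  std::thread notifier{finish_work};
  waiter.join();
  notifier.join();
}

The more conventional alternative is to perform the counter update itself while holding the mutex; the barrier form used here keeps the decrement lock-free on the common path and only touches the mutex when the count actually reaches zero and a wakeup is required.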
11 changes: 11 additions & 0 deletions src/llfs/ioring_impl.hpp
@@ -183,6 +183,17 @@ class IoRingImpl
    */
   void invoke_handler(CompletionHandler** handler) noexcept;
 
+  /** \brief Wakes all threads inside a call to this->run().
+   *
+   * There are two blocking mechanisms which this function needs to address:
+   *
+   *  1. eventfd_read(), used to block waiting for the io_uring to signal an event
+   *     (one thread at a time)
+   *  2. this->state_change_.wait(), a condition variable wait used by all threads
+   *     _not_ waiting in eventfd_read(), to receive notification that they can proceed
+   */
+  void wake_all() noexcept;
+
   //+++++++++++-+-+--+----- --- -- - - - -
 
   // Protects access to the io_uring context and associated data (registered_fds_, free_fds_,
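As the header comment notes, eventfd_read() wakes only one thread per eventfd_write(). The change to wait_for_ring_event() above turns that single wake into a broadcast: each woken thread, on observing !can_run(), re-writes the eventfd before returning, chaining the wakeup to the next blocked reader. A stripped-down sketch of that relay pattern — the eventfd calls are the standard Linux API, while the thread scaffolding is illustrative:

#include <sys/eventfd.h>

#include <atomic>
#include <thread>
#include <vector>

int event_fd = -1;
std::atomic<bool> stopping{false};

// The analogue of the wait_for_ring_event() path touched by this commit.
void wait_for_event()
{
  eventfd_t v = 0;
  eventfd_read(event_fd, &v);  // blocks; one write wakes exactly one reader

  // Relay: if we are shutting down, pass the wakeup on to the next reader,
  // turning a single eventfd_write into a broadcast-to-all-threads.
  //
  if (stopping.load()) {
    eventfd_write(event_fd, v);
  }
}

int main()
{
  event_fd = eventfd(/*initval=*/0, /*flags=*/0);

  std::vector<std::thread> threads;
  for (int i = 0; i < 4; ++i) {
    threads.emplace_back(wait_for_event);
  }

  stopping.store(true);
  eventfd_write(event_fd, 1);  // the single write, as in wake_all()

  for (std::thread& t : threads) {
    t.join();
  }
}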
