From 106fd585635acb93a93429c85e9d75d85a2de69c Mon Sep 17 00:00:00 2001 From: Ryan Ofsky Date: Fri, 7 Feb 2025 06:34:27 -0500 Subject: [PATCH] multiprocess: Lock CapnpProtocol::m_loop with mutex This change is intended to fix occasional ipc_tests unit tests and rpc_misc.py functional tests errors that happen on mac os, which sometimes fail with error "terminating due to uncaught exception of type std::__1::system_error: mutex lock failed: Invalid argument" as reported https://github.com/chaincodelabs/libmultiprocess/issues/154 This error could happen when `m_loop.reset();` in `CapnpProtocol::startLoop` executes before the `m_loop->removeClient(lock);` call in the `~CapnpProtocol` destructor returns, causing an failure to reacquire the `EventLoop::m_mutex` inside the `removeClient` method. Fix this error by adding a new mutex to guard access to the `CapnpProtocol::m_loop` variable and making sure `m_loop.reset();` cannot be called until after `m_loop->removeClient(lock);` returns. --- src/ipc/capnp/protocol.cpp | 48 +++++++++++++++++++++----------------- 1 file changed, 26 insertions(+), 22 deletions(-) diff --git a/src/ipc/capnp/protocol.cpp b/src/ipc/capnp/protocol.cpp index 691bdf5f9242b..560285639d4ae 100644 --- a/src/ipc/capnp/protocol.cpp +++ b/src/ipc/capnp/protocol.cpp @@ -41,61 +41,65 @@ class CapnpProtocol : public Protocol public: ~CapnpProtocol() noexcept(true) { - if (m_loop) { - std::unique_lock lock(m_loop->m_mutex); - m_loop->removeClient(lock); + { + LOCK(m_loop_mutex); + if (m_loop) { + std::unique_lock lock(m_loop->m_mutex); + m_loop->removeClient(lock); + } } if (m_loop_thread.joinable()) m_loop_thread.join(); - assert(!m_loop); + assert(WITH_LOCK(m_loop_mutex, return !m_loop)); }; - std::unique_ptr connect(int fd, const char* exe_name) override + std::unique_ptr connect(int fd, const char* exe_name) override EXCLUSIVE_LOCKS_REQUIRED(!m_loop_mutex) { startLoop(exe_name); - return mp::ConnectStream(*m_loop, fd); + return mp::ConnectStream(WITH_LOCK(m_loop_mutex, return *m_loop), fd); } - void listen(int listen_fd, const char* exe_name, interfaces::Init& init) override + void listen(int listen_fd, const char* exe_name, interfaces::Init& init) override EXCLUSIVE_LOCKS_REQUIRED(!m_loop_mutex) { startLoop(exe_name); if (::listen(listen_fd, /*backlog=*/5) != 0) { throw std::system_error(errno, std::system_category()); } - mp::ListenConnections(*m_loop, listen_fd, init); + mp::ListenConnections(WITH_LOCK(m_loop_mutex, return *m_loop), listen_fd, init); } - void serve(int fd, const char* exe_name, interfaces::Init& init, const std::function& ready_fn = {}) override + void serve(int fd, const char* exe_name, interfaces::Init& init, const std::function& ready_fn = {}) override EXCLUSIVE_LOCKS_REQUIRED(!m_loop_mutex) { - assert(!m_loop); + assert(WITH_LOCK(m_loop_mutex, return !m_loop)); mp::g_thread_context.thread_name = mp::ThreadName(exe_name); - m_loop.emplace(exe_name, &IpcLogFn, &m_context); + auto& loop{WITH_LOCK(m_loop_mutex, m_loop.emplace(exe_name, &IpcLogFn, &m_context); return *m_loop)}; if (ready_fn) ready_fn(); - mp::ServeStream(*m_loop, fd, init); - m_loop->loop(); - m_loop.reset(); + mp::ServeStream(loop, fd, init); + loop.loop(); + WITH_LOCK(m_loop_mutex, m_loop.reset()); } void addCleanup(std::type_index type, void* iface, std::function cleanup) override { mp::ProxyTypeRegister::types().at(type)(iface).cleanup_fns.emplace_back(std::move(cleanup)); } Context& context() override { return m_context; } - void startLoop(const char* exe_name) + void startLoop(const char* exe_name) EXCLUSIVE_LOCKS_REQUIRED(!m_loop_mutex) { - if (m_loop) return; + if (WITH_LOCK(m_loop_mutex, return m_loop.has_value())) return; std::promise promise; m_loop_thread = std::thread([&] { util::ThreadRename("capnp-loop"); - m_loop.emplace(exe_name, &IpcLogFn, &m_context); + auto& loop{WITH_LOCK(m_loop_mutex, m_loop.emplace(exe_name, &IpcLogFn, &m_context); return *m_loop)}; { - std::unique_lock lock(m_loop->m_mutex); - m_loop->addClient(lock); + std::unique_lock lock(loop.m_mutex); + loop.addClient(lock); } promise.set_value(); - m_loop->loop(); - m_loop.reset(); + loop.loop(); + WITH_LOCK(m_loop_mutex, m_loop.reset()); }); promise.get_future().wait(); } Context m_context; std::thread m_loop_thread; - std::optional m_loop; + Mutex m_loop_mutex; + std::optional m_loop GUARDED_BY(m_loop_mutex); }; } // namespace