Skip to content

Commit

Permalink
multiprocess: Lock CapnpProtocol::m_loop with mutex
Browse files Browse the repository at this point in the history
This change is intended to fix occasional ipc_tests unit tests and rpc_misc.py
functional tests errors that happen on mac os, which sometimes fail with error
"terminating due to uncaught exception of type std::__1::system_error: mutex
lock failed: Invalid argument" as reported
chaincodelabs/libmultiprocess#154

This error could happen when `m_loop.reset();` in `CapnpProtocol::startLoop`
executes before the `m_loop->removeClient(lock);` call in the `~CapnpProtocol`
destructor returns, causing an failure to reacquire the `EventLoop::m_mutex`
inside the `removeClient` method.

Fix this error by adding a new mutex to guard access to the
`CapnpProtocol::m_loop` variable and making sure `m_loop.reset();` cannot be
called until after `m_loop->removeClient(lock);` returns.
  • Loading branch information
ryanofsky authored and Sjors committed Feb 7, 2025
1 parent cf56731 commit 106fd58
Showing 1 changed file with 26 additions and 22 deletions.
48 changes: 26 additions & 22 deletions src/ipc/capnp/protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,61 +41,65 @@ class CapnpProtocol : public Protocol
public:
~CapnpProtocol() noexcept(true)
{
if (m_loop) {
std::unique_lock<std::mutex> lock(m_loop->m_mutex);
m_loop->removeClient(lock);
{
LOCK(m_loop_mutex);
if (m_loop) {
std::unique_lock<std::mutex> lock(m_loop->m_mutex);
m_loop->removeClient(lock);
}
}
if (m_loop_thread.joinable()) m_loop_thread.join();
assert(!m_loop);
assert(WITH_LOCK(m_loop_mutex, return !m_loop));
};
std::unique_ptr<interfaces::Init> connect(int fd, const char* exe_name) override
std::unique_ptr<interfaces::Init> connect(int fd, const char* exe_name) override EXCLUSIVE_LOCKS_REQUIRED(!m_loop_mutex)
{
startLoop(exe_name);
return mp::ConnectStream<messages::Init>(*m_loop, fd);
return mp::ConnectStream<messages::Init>(WITH_LOCK(m_loop_mutex, return *m_loop), fd);
}
void listen(int listen_fd, const char* exe_name, interfaces::Init& init) override
void listen(int listen_fd, const char* exe_name, interfaces::Init& init) override EXCLUSIVE_LOCKS_REQUIRED(!m_loop_mutex)
{
startLoop(exe_name);
if (::listen(listen_fd, /*backlog=*/5) != 0) {
throw std::system_error(errno, std::system_category());
}
mp::ListenConnections<messages::Init>(*m_loop, listen_fd, init);
mp::ListenConnections<messages::Init>(WITH_LOCK(m_loop_mutex, return *m_loop), listen_fd, init);
}
void serve(int fd, const char* exe_name, interfaces::Init& init, const std::function<void()>& ready_fn = {}) override
void serve(int fd, const char* exe_name, interfaces::Init& init, const std::function<void()>& ready_fn = {}) override EXCLUSIVE_LOCKS_REQUIRED(!m_loop_mutex)
{
assert(!m_loop);
assert(WITH_LOCK(m_loop_mutex, return !m_loop));
mp::g_thread_context.thread_name = mp::ThreadName(exe_name);
m_loop.emplace(exe_name, &IpcLogFn, &m_context);
auto& loop{WITH_LOCK(m_loop_mutex, m_loop.emplace(exe_name, &IpcLogFn, &m_context); return *m_loop)};
if (ready_fn) ready_fn();
mp::ServeStream<messages::Init>(*m_loop, fd, init);
m_loop->loop();
m_loop.reset();
mp::ServeStream<messages::Init>(loop, fd, init);
loop.loop();
WITH_LOCK(m_loop_mutex, m_loop.reset());
}
void addCleanup(std::type_index type, void* iface, std::function<void()> cleanup) override
{
mp::ProxyTypeRegister::types().at(type)(iface).cleanup_fns.emplace_back(std::move(cleanup));
}
Context& context() override { return m_context; }
void startLoop(const char* exe_name)
void startLoop(const char* exe_name) EXCLUSIVE_LOCKS_REQUIRED(!m_loop_mutex)
{
if (m_loop) return;
if (WITH_LOCK(m_loop_mutex, return m_loop.has_value())) return;
std::promise<void> promise;
m_loop_thread = std::thread([&] {
util::ThreadRename("capnp-loop");
m_loop.emplace(exe_name, &IpcLogFn, &m_context);
auto& loop{WITH_LOCK(m_loop_mutex, m_loop.emplace(exe_name, &IpcLogFn, &m_context); return *m_loop)};
{
std::unique_lock<std::mutex> lock(m_loop->m_mutex);
m_loop->addClient(lock);
std::unique_lock<std::mutex> lock(loop.m_mutex);
loop.addClient(lock);
}
promise.set_value();
m_loop->loop();
m_loop.reset();
loop.loop();
WITH_LOCK(m_loop_mutex, m_loop.reset());
});
promise.get_future().wait();
}
Context m_context;
std::thread m_loop_thread;
std::optional<mp::EventLoop> m_loop;
Mutex m_loop_mutex;
std::optional<mp::EventLoop> m_loop GUARDED_BY(m_loop_mutex);
};
} // namespace

Expand Down

0 comments on commit 106fd58

Please sign in to comment.