From 3c6be8402a8f817ed3a484459378959ca367b83c Mon Sep 17 00:00:00 2001 From: Shuhao Wu Date: Mon, 22 Apr 2024 00:14:20 -0400 Subject: [PATCH] WIP lockless example --- CMakeLists.txt | 3 +- examples/lockless_example/CMakeLists.txt | 10 ++ examples/lockless_example/main.cc | 145 ++++++++++++++++++ include/cactus_rt/experimental/lockless.h | 8 + .../lockless/spsc/realtime_readable_value.h | 47 ++++++ .../lockless/spsc/realtime_writable_value.h | 8 + 6 files changed, 220 insertions(+), 1 deletion(-) create mode 100644 examples/lockless_example/CMakeLists.txt create mode 100644 examples/lockless_example/main.cc create mode 100644 include/cactus_rt/experimental/lockless.h create mode 100644 include/cactus_rt/experimental/lockless/spsc/realtime_readable_value.h create mode 100644 include/cactus_rt/experimental/lockless/spsc/realtime_writable_value.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 795f9c3..fd58327 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -174,8 +174,9 @@ if(${CMAKE_PROJECT_NAME} STREQUAL ${PROJECT_NAME}) if (ENABLE_EXAMPLES) message(STATUS "Building example programs. Turn it off via ENABLE_EXAMPLES=OFF") - add_subdirectory(examples/message_passing_example) + add_subdirectory(examples/lockless_example) add_subdirectory(examples/logging_example) + add_subdirectory(examples/message_passing_example) add_subdirectory(examples/mutex_example) add_subdirectory(examples/signal_handling_example) add_subdirectory(examples/simple_deadline_example) diff --git a/examples/lockless_example/CMakeLists.txt b/examples/lockless_example/CMakeLists.txt new file mode 100644 index 0000000..90eef58 --- /dev/null +++ b/examples/lockless_example/CMakeLists.txt @@ -0,0 +1,10 @@ +add_executable(rt_lockless_example + main.cc +) + +target_link_libraries(rt_lockless_example + PRIVATE + cactus_rt +) + +setup_cactus_rt_target_options(rt_lockless_example) diff --git a/examples/lockless_example/main.cc b/examples/lockless_example/main.cc new file mode 100644 index 0000000..78e69d7 --- /dev/null +++ b/examples/lockless_example/main.cc @@ -0,0 +1,145 @@ +#include +#include + +#include +#include + +using cactus_rt::App; +using cactus_rt::CyclicThread; +using cactus_rt::Thread; +using cactus_rt::experimental::lockless::AtomicMessage; +using cactus_rt::experimental::lockless::spsc::RealtimeReadableValue; +using namespace std::chrono_literals; + +struct Pose { + // We want default constructed values to have a flag showing it is default + // constructed. This is because the RealtimeReadableValue will default + // construct a value and it can immediately be read. We need to tell the writer + // it is invalid. It may not be necessary to do this in general. + bool valid = false; + double x = 0.0; + double y = 0.0; + double z = 0.0; + double roll = 0.0; + double pitch = 0.0; + double yaw = 0.0; + + Pose() {} + + Pose(double xx, double yy, double zz, double ro, double pi, double ya) : valid(true), + x(xx), + y(yy), + z(zz), + roll(ro), + pitch(pi), + yaw(ya) {} +}; + +bool operator==(const Pose& p1, const Pose& p2) { + return p1.x == p2.x && + p1.y == p2.y && + p1.z == p2.z && + p1.roll == p2.roll && + p1.pitch == p2.pitch && + p1.yaw == p2.yaw; +} + +bool operator!=(const Pose& p1, const Pose& p2) { + return !(p1 == p2); +} + +/** + * A struct that holds all the shared data so it can be passed to both the real-time and non-real-time threads + */ +struct Context { + AtomicMessage done = false; + RealtimeReadableValue target_pose = {}; +}; + +/** + * This is a real-time thread + */ +class RTThread : public CyclicThread { + Context& ctx_; + Pose current_target_pose_ = {}; + + static cactus_rt::CyclicThreadConfig CreateThreadConfig() { + cactus_rt::CyclicThreadConfig thread_config; + thread_config.period_ns = 1'000'000; + thread_config.cpu_affinity = std::vector{2}; + thread_config.SetFifoScheduler(80); + + return thread_config; + } + + public: + RTThread(Context& ctx) : CyclicThread("RTThread", CreateThreadConfig()), ctx_(ctx) {} + + protected: + bool Loop(int64_t /*now*/) noexcept final { + if (ctx_.done.Read()) { + return true; + } + + Pose new_pose = ctx_.target_pose.Read(); + if (!new_pose.valid) { + return false; + } + + if (new_pose != current_target_pose_) { + current_target_pose_ = new_pose; + LOG_INFO( + Logger(), + "detected new pose: {} {} {} {} {} {}", + current_target_pose_.x, + current_target_pose_.y, + current_target_pose_.z, + current_target_pose_.roll, + current_target_pose_.pitch, + current_target_pose_.yaw + ); + } + + return false; + } +}; + +class NonRTThread : public Thread { + Context& ctx_; + + static cactus_rt::ThreadConfig CreateThreadConfig() { + cactus_rt::ThreadConfig thread_config; + thread_config.SetOtherScheduler(); + return thread_config; + } + + public: + NonRTThread(Context& ctx) : Thread("NonRTThread", CreateThreadConfig()), ctx_(ctx) {} + + void Run() final { + ctx_.target_pose.Write(Pose(1.5, 2.5, 3.5, 4.5, 5.5, 6.5)); + std::this_thread::sleep_for(1s); + + // Realistically only one of these values should be visible on the real-time thread. + ctx_.target_pose.Write(Pose(1.5, 2.5, 3.5, 4.5, 5.5, 7.5)); + ctx_.target_pose.Write(Pose(1.5, 2.5, 3.5, 4.5, 5.5, 8.5)); + std::this_thread::sleep_for(1s); + + ctx_.done.Write(true); + } +}; + +int main() { + Context ctx; + auto rt_thread = std::make_shared(ctx); + auto non_rt_thread = std::make_shared(ctx); + + App app; + app.RegisterThread(rt_thread); + app.RegisterThread(non_rt_thread); + + app.Start(); + app.Join(); + + return 0; +} diff --git a/include/cactus_rt/experimental/lockless.h b/include/cactus_rt/experimental/lockless.h new file mode 100644 index 0000000..d1f4bac --- /dev/null +++ b/include/cactus_rt/experimental/lockless.h @@ -0,0 +1,8 @@ +#ifndef CACTUS_RT_EXPERIMENTAL_LOCKLESS_H_ +#define CACTUS_RT_EXPERIMENTAL_LOCKLESS_H_ + +#include "lockless/atomic_message.h" +#include "lockless/spsc/realtime_readable_value.h" +#include "lockless/spsc/realtime_writable_value.h" + +#endif diff --git a/include/cactus_rt/experimental/lockless/spsc/realtime_readable_value.h b/include/cactus_rt/experimental/lockless/spsc/realtime_readable_value.h new file mode 100644 index 0000000..e1ba4ee --- /dev/null +++ b/include/cactus_rt/experimental/lockless/spsc/realtime_readable_value.h @@ -0,0 +1,47 @@ +#ifndef CACTUS_RT_EXPERIMENTAL_LOCKLESS_SPSC_REALTIME_READABLE_VALUE_ +#define CACTUS_RT_EXPERIMENTAL_LOCKLESS_SPSC_REALTIME_READABLE_VALUE_ + +#include +#include + +namespace cactus_rt::experimental::lockless::spsc { + +/** + * This uses the CAS exchange algorithm to allow a single (real-time) thread to + * be able to atomically read a value shared by a different (non-real-time) + * thread. + * + * The reader for this algorithm is wait-free while the writer is lock-free. The + * reader is unable to modify the value and transmit it back to the writer with + * this algorithm. + */ +template +class RealtimeReadableValue { + std::unique_ptr storage_ptr_ = std::make_unique(); + std::atomic atomic_ptr_ = storage_ptr_.get(); + + public: + T Read() { + // TODO: need to figure out the atomic memory order here! + T* data_ptr = atomic_ptr_.exchange(nullptr); + T data = *data_ptr; + atomic_ptr_.store(data_ptr); + return data; + } + + void Write(const T& new_value) { + auto new_ptr = std::make_unique(new_value); + T* expected; + + do { + expected = storage_ptr_.get(); + // TODO: sequential consistency is probably wrong here. Need to understand if acq_rel is sufficient. + } while (!atomic_ptr_.compare_exchange_weak(expected, new_ptr.get())); + + storage_ptr_ = std::move(new_ptr); + } +}; + +} // namespace cactus_rt::experimental::lockless::spsc + +#endif diff --git a/include/cactus_rt/experimental/lockless/spsc/realtime_writable_value.h b/include/cactus_rt/experimental/lockless/spsc/realtime_writable_value.h new file mode 100644 index 0000000..7b9d37f --- /dev/null +++ b/include/cactus_rt/experimental/lockless/spsc/realtime_writable_value.h @@ -0,0 +1,8 @@ +#ifndef CACTUS_RT_EXPERIMENTAL_LOCKLESS_SPSC_REALTIME_WRITABLE_VALUE_ +#define CACTUS_RT_EXPERIMENTAL_LOCKLESS_SPSC_REALTIME_WRITABLE_VALUE_ + +namespace cactus_rt::experimental::lockless::spsc { + +} + +#endif