From 6da274623ef52d52fa31674b2ea71d8a5f04a722 Mon Sep 17 00:00:00 2001 From: Yadunund Date: Wed, 7 Aug 2024 01:11:58 +0800 Subject: [PATCH 01/12] Make rmw_context_impl_s an RAII class Signed-off-by: Yadunund --- rmw_zenoh_cpp/CMakeLists.txt | 1 + .../src/detail/rmw_context_impl_s.cpp | 451 ++++++++++++++++++ .../src/detail/rmw_context_impl_s.hpp | 197 ++++++++ rmw_zenoh_cpp/src/detail/rmw_data_types.cpp | 8 - rmw_zenoh_cpp/src/detail/rmw_data_types.hpp | 32 -- rmw_zenoh_cpp/src/rmw_event.cpp | 9 +- .../src/rmw_get_node_info_and_types.cpp | 18 +- .../src/rmw_get_service_names_and_types.cpp | 6 +- .../src/rmw_get_topic_endpoint_info.cpp | 10 +- .../src/rmw_get_topic_names_and_types.cpp | 6 +- rmw_zenoh_cpp/src/rmw_init.cpp | 267 +++-------- rmw_zenoh_cpp/src/rmw_zenoh.cpp | 112 ++--- 12 files changed, 791 insertions(+), 326 deletions(-) create mode 100644 rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp create mode 100644 rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp diff --git a/rmw_zenoh_cpp/CMakeLists.txt b/rmw_zenoh_cpp/CMakeLists.txt index 4ecfe360..c548a6a7 100644 --- a/rmw_zenoh_cpp/CMakeLists.txt +++ b/rmw_zenoh_cpp/CMakeLists.txt @@ -38,6 +38,7 @@ add_library(rmw_zenoh_cpp SHARED src/detail/logging.cpp src/detail/message_type_support.cpp src/detail/qos.cpp + src/detail/rmw_context_impl_s.cpp src/detail/rmw_data_types.cpp src/detail/service_type_support.cpp src/detail/type_support.cpp diff --git a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp new file mode 100644 index 00000000..e30bf902 --- /dev/null +++ b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp @@ -0,0 +1,451 @@ +// Copyright 2024 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "rmw_context_impl_s.hpp" + +#include + +#include "guard_condition.hpp" +#include "liveliness_utils.hpp" +#include "logging_macros.hpp" +#include "zenoh_config.hpp" + +#include "rcpputils/scope_exit.hpp" +#include "rmw/error_handling.h" +#include "rmw/impl/cpp/macros.hpp" + +///============================================================================= +void rmw_context_impl_s::graph_sub_data_handler(const z_sample_t * sample, void * data) +{ + z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr); + auto free_keystr = rcpputils::make_scope_exit( + [&keystr]() { + z_drop(z_move(keystr)); + }); + + auto data_ptr = static_cast(data); + if (data_ptr == nullptr) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "[graph_sub_data_handler] Unable to lock data_wp." + ); + return; + } + + // Update the graph cache. + std::lock_guard lock(data_ptr->mutex_); + if (data_ptr->is_shutdown_) { + return; + } + switch (sample->kind) { + case z_sample_kind_t::Z_SAMPLE_KIND_PUT: + data_ptr->graph_cache_->parse_put(keystr._cstr); + break; + case z_sample_kind_t::Z_SAMPLE_KIND_DELETE: + data_ptr->graph_cache_->parse_del(keystr._cstr); + default: + return; + } + + // Trigger the ROS graph guard condition. 
+ rmw_ret_t rmw_ret = rmw_trigger_guard_condition(data_ptr->graph_guard_condition_); + if (RMW_RET_OK != rmw_ret) { + RMW_ZENOH_LOG_WARN_NAMED( + "rmw_zenoh_cpp", + "[graph_sub_data_handler] Unable to trigger graph guard condition." + ); + } +} + +///============================================================================= +rmw_context_impl_s::Data::Data( + const rcutils_allocator_t * allocator, + const std::size_t domain_id, + const std::string & enclave, + z_owned_session_t session, + std::optional shm_manager, + rmw_guard_condition_t * graph_guard_condition) +: allocator_(allocator), + enclave_(std::move(enclave)), + session_(std::move(session)), + shm_manager_(std::move(shm_manager)), + graph_guard_condition_(graph_guard_condition), + is_shutdown_(false), + next_entity_id_(0), + is_initialized_(false) +{ + z_id_t zid = z_info_zid(z_loan(session_)); + graph_cache_ = std::make_unique(std::move(zid)); + // Setup liveliness subscriptions for discovery. + liveliness_str_ = rmw_zenoh_cpp::liveliness::subscription_token( + domain_id); + + // Query router/liveliness participants to get graph information before this session was started. + // We create a blocking channel that is unbounded, ie. `bound` = 0, to receive + // replies for the zc_liveliness_get() call. This is necessary as if the `bound` + // is too low, the channel may starve the zenoh executor of its threads which + // would lead to deadlocks when trying to receive replies and block the + // execution here. + // The blocking channel will return when the sender end is closed which is + // the moment the query finishes. + // The non-blocking fifo exists only for the use case where we don't want to + // block the thread between responses (including the request termination response). + // In general, unless we want to cooperatively schedule other tasks on the same + // thread as reading the fifo, the blocking fifo will be more appropriate as + // the code will be simpler, and if we're just going to spin over the non-blocking + // reads until we obtain responses, we'll just be hogging CPU time by convincing + // the OS that we're doing actual work when it could instead park the thread. + z_owned_reply_channel_t channel = zc_reply_fifo_new(0); + zc_liveliness_get( + z_loan(session_), z_keyexpr(liveliness_str_.c_str()), + z_move(channel.send), NULL); + z_owned_reply_t reply = z_reply_null(); + for (bool call_success = z_call(channel.recv, &reply); !call_success || z_check(reply); + call_success = z_call(channel.recv, &reply)) + { + if (!call_success) { + continue; + } + if (z_reply_is_ok(&reply)) { + z_sample_t sample = z_reply_ok(&reply); + z_owned_str_t keystr = z_keyexpr_to_string(sample.keyexpr); + // Ignore tokens from the same session to avoid race conditions from this + // query and the liveliness subscription. + graph_cache_->parse_put(z_loan(keystr), true); + z_drop(z_move(keystr)); + } else { + RMW_ZENOH_LOG_DEBUG_NAMED( + "rmw_zenoh_cpp", "[rmw_context_impl_s] z_call received an invalid reply\n"); + } + } + z_drop(z_move(reply)); + z_drop(z_move(channel)); +} + +///============================================================================= +rmw_ret_t rmw_context_impl_s::Data::subscribe() +{ + if (is_initialized_) { + return RMW_RET_OK; + } + // Setup the liveliness subscriber to receives updates from the ROS graph + // and update the graph cache. 
+ auto sub_options = zc_liveliness_subscriber_options_null(); + z_owned_closure_sample_t callback = z_closure( + rmw_context_impl_s::graph_sub_data_handler, nullptr, + this->shared_from_this().get()); + graph_subscriber_ = zc_liveliness_declare_subscriber( + z_loan(session_), + z_keyexpr(liveliness_str_.c_str()), + z_move(callback), + &sub_options); + zc_liveliness_subscriber_options_drop(z_move(sub_options)); + auto undeclare_z_sub = rcpputils::make_scope_exit( + [this]() { + z_undeclare_subscriber(z_move(this->graph_subscriber_)); + }); + if (!z_check(graph_subscriber_)) { + RMW_SET_ERROR_MSG("unable to create zenoh subscription"); + return RMW_RET_ERROR; + } + + undeclare_z_sub.cancel(); + is_initialized_ = true; + return RMW_RET_OK; +} + +///============================================================================= +rmw_ret_t rmw_context_impl_s::Data::shutdown() +{ + std::lock_guard lock(mutex_); + if (is_shutdown_) { + return RMW_RET_OK; + } + + z_undeclare_subscriber(z_move(graph_subscriber_)); + if (shm_manager_.has_value()) { + z_drop(z_move(shm_manager_.value())); + } + // Close the zenoh session + if (z_close(z_move(session_)) < 0) { + RMW_SET_ERROR_MSG("Error while closing zenoh session"); + return RMW_RET_ERROR; + } + is_shutdown_ = true; + return RMW_RET_OK; +} + +///============================================================================= +rmw_context_impl_s::Data::~Data() +{ + RMW_TRY_DESTRUCTOR( + static_cast( + graph_guard_condition_->data)->~GuardCondition(), + rmw_zenoh_cpp::GuardCondition, ); + if (rcutils_allocator_is_valid(allocator_)) { + allocator_->deallocate(graph_guard_condition_->data, allocator_->state); + allocator_->deallocate(graph_guard_condition_, allocator_->state); + graph_guard_condition_ = nullptr; + } + + auto ret = this->shutdown(); + static_cast(ret); +} + +///============================================================================= +rmw_context_impl_s::rmw_context_impl_s( + const rcutils_allocator_t * allocator, + const std::size_t domain_id, + const std::string & enclave, + z_owned_session_t session, + std::optional shm_manager, + rmw_guard_condition_t * graph_guard_condition) +{ + data_ = std::make_shared( + allocator, + std::move(domain_id), + std::move(enclave), + std::move(session), + std::move(shm_manager), + graph_guard_condition); + + // TODO(Yadunund): Consider switching to a make() pattern to avoid throwing + // errors. 
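// (Illustrative sketch only, not part of this patch: the make() pattern referred to
// in the TODO above would move the fallible subscribe() call out of the constructor
// and into a static factory, so callers check for nullptr instead of catching an
// exception. The signature and body below are hypothetical.
//
//   static std::shared_ptr<rmw_context_impl_s> make(/* same arguments as the constructor */)
//   {
//     auto impl = std::shared_ptr<rmw_context_impl_s>(new rmw_context_impl_s(/* ... */));
//     if (impl->data_->subscribe() != RMW_RET_OK) {
//       return nullptr;  // report failure through the return value, no throw needed
//     }
//     return impl;
//   }
// )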
+ auto ret = data_->subscribe(); + if (ret != RMW_RET_OK) { + throw std::runtime_error("Unable to subscribe to ROS Graph updates."); + } +} + +///============================================================================= +std::string rmw_context_impl_s::enclave() const +{ + std::lock_guard lock(data_->mutex_); + return data_->enclave_; +} + +///============================================================================= +z_session_t rmw_context_impl_s::session() const +{ + std::lock_guard lock(data_->mutex_); + return z_loan(data_->session_); +} + +///============================================================================= +std::optional & rmw_context_impl_s::shm_manager() +{ + std::lock_guard lock(data_->mutex_); + return data_->shm_manager_; +} + +///============================================================================= +rmw_guard_condition_t * rmw_context_impl_s::graph_guard_condition() +{ + std::lock_guard lock(data_->mutex_); + return data_->graph_guard_condition_; +} + +///============================================================================= +size_t rmw_context_impl_s::get_next_entity_id() +{ + std::lock_guard lock(data_->mutex_); + return data_->next_entity_id_++; +} + +///============================================================================= +rmw_ret_t rmw_context_impl_s::shutdown() +{ + return data_->shutdown(); +} + +///============================================================================= +bool rmw_context_impl_s::is_shutdown() const +{ + std::lock_guard lock(data_->mutex_); + return data_->is_shutdown_; +} + +///============================================================================= +bool rmw_context_impl_s::session_is_valid() const +{ + std::lock_guard lock(data_->mutex_); + return z_check(data_->session_); +} + +///============================================================================= +rmw_ret_t rmw_context_impl_s::get_node_names( + rcutils_string_array_t * node_names, + rcutils_string_array_t * node_namespaces, + rcutils_string_array_t * enclaves, + rcutils_allocator_t * allocator) const +{ + std::lock_guard lock(data_->mutex_); + return data_->graph_cache_->get_node_names( + node_names, + node_namespaces, + enclaves, + allocator); +} + +///============================================================================= +rmw_ret_t rmw_context_impl_s::get_topic_names_and_types( + rcutils_allocator_t * allocator, + bool no_demangle, + rmw_names_and_types_t * topic_names_and_types) const +{ + std::lock_guard lock(data_->mutex_); + return data_->graph_cache_->get_topic_names_and_types( + allocator, + no_demangle, + topic_names_and_types); +} + +///============================================================================= +rmw_ret_t rmw_context_impl_s::publisher_count_matched_subscriptions( + const rmw_publisher_t * publisher, + size_t * subscription_count) +{ + std::lock_guard lock(data_->mutex_); + return data_->graph_cache_->publisher_count_matched_subscriptions( + publisher, + subscription_count); +} + +///============================================================================= +rmw_ret_t rmw_context_impl_s::subscription_count_matched_publishers( + const rmw_subscription_t * subscription, + size_t * publisher_count) +{ + std::lock_guard lock(data_->mutex_); + return data_->graph_cache_->subscription_count_matched_publishers( + subscription, + publisher_count); +} + +///============================================================================= +rmw_ret_t rmw_context_impl_s::get_service_names_and_types( + 
rcutils_allocator_t * allocator, + rmw_names_and_types_t * service_names_and_types) const +{ + std::lock_guard lock(data_->mutex_); + return data_->graph_cache_->get_service_names_and_types( + allocator, + service_names_and_types); +} + +///============================================================================= +rmw_ret_t rmw_context_impl_s::count_publishers( + const char * topic_name, + size_t * count) const +{ + std::lock_guard lock(data_->mutex_); + return data_->graph_cache_->count_publishers( + topic_name, + count); +} + +///============================================================================= +rmw_ret_t rmw_context_impl_s::count_subscriptions( + const char * topic_name, + size_t * count) const +{ + std::lock_guard lock(data_->mutex_); + return data_->graph_cache_->count_subscriptions( + topic_name, + count); +} + +///============================================================================= +rmw_ret_t rmw_context_impl_s::count_services( + const char * service_name, + size_t * count) const +{ + std::lock_guard lock(data_->mutex_); + return data_->graph_cache_->count_services( + service_name, + count); +} + +///============================================================================= +rmw_ret_t rmw_context_impl_s::count_clients( + const char * service_name, + size_t * count) const +{ + std::lock_guard lock(data_->mutex_); + return data_->graph_cache_->count_clients( + service_name, + count); +} + +///============================================================================= +rmw_ret_t rmw_context_impl_s::get_entity_names_and_types_by_node( + rmw_zenoh_cpp::liveliness::EntityType entity_type, + rcutils_allocator_t * allocator, + const char * node_name, + const char * node_namespace, + bool no_demangle, + rmw_names_and_types_t * names_and_types) const +{ + std::lock_guard lock(data_->mutex_); + return data_->graph_cache_->get_entity_names_and_types_by_node( + entity_type, + allocator, + node_name, + node_namespace, + no_demangle, + names_and_types); +} + +///============================================================================= +rmw_ret_t rmw_context_impl_s::get_entities_info_by_topic( + rmw_zenoh_cpp::liveliness::EntityType entity_type, + rcutils_allocator_t * allocator, + const char * topic_name, + bool no_demangle, + rmw_topic_endpoint_info_array_t * endpoints_info) const +{ + std::lock_guard lock(data_->mutex_); + return data_->graph_cache_->get_entities_info_by_topic( + entity_type, + allocator, + topic_name, + no_demangle, + endpoints_info); +} + +///============================================================================= +rmw_ret_t rmw_context_impl_s::service_server_is_available( + const char * service_name, + const char * service_type, + bool * is_available) const +{ + std::lock_guard lock(data_->mutex_); + return data_->graph_cache_->service_server_is_available( + service_name, + service_type, + is_available); +} + +///============================================================================= +void rmw_context_impl_s::set_qos_event_callback( + rmw_zenoh_cpp::liveliness::ConstEntityPtr entity, + const rmw_zenoh_cpp::rmw_zenoh_event_type_t & event_type, + GraphCacheEventCallback callback) +{ + std::lock_guard lock(data_->mutex_); + return data_->graph_cache_->set_qos_event_callback( + std::move(entity), + event_type, + std::move(callback)); +} diff --git a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp new file mode 100644 index 00000000..15f3a632 --- /dev/null +++ 
b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp @@ -0,0 +1,197 @@ +// Copyright 2024 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef DETAIL__RMW_CONTEXT_IMPL_S_HPP_ +#define DETAIL__RMW_CONTEXT_IMPL_S_HPP_ + +#include + +#include +#include +#include +#include + +#include "graph_cache.hpp" +#include "liveliness_utils.hpp" + +#include "rcutils/types.h" +#include "rmw/rmw.h" + +///============================================================================= +class rmw_context_impl_s final +{ +public: + using GraphCacheEventCallback = rmw_zenoh_cpp::GraphCache::GraphCacheEventCallback; + + // Constructor. + // Once constructed, the context_impl instanced will manage the lifetime + // of these arguments. + rmw_context_impl_s( + const rcutils_allocator_t * allocator, + const std::size_t domain_id, + const std::string & enclave, + z_owned_session_t session, + std::optional shm_manager, + rmw_guard_condition_t * graph_guard_condition); + + // Get a copy of the enclave. + std::string enclave() const; + + // Loan the Zenoh session. + z_session_t session() const; + + // Get a reference to the shm_manager. + // Note: This is not thread-safe. + // TODO(Yadunund): Remove this API and instead include a publish() API + // that handles the shm_manager once the context manages publishers. + std::optional & shm_manager(); + + // Get the graph guard condition. + rmw_guard_condition_t * graph_guard_condition(); + + // Get a unique id for a new entity. + size_t get_next_entity_id(); + + // Shutdown the Zenoh session. + rmw_ret_t shutdown(); + + // Check if the Zenoh session is shutdown. + bool is_shutdown() const; + + // Returns true if the Zenoh session is valid. 
+ bool session_is_valid() const; + + rmw_ret_t get_node_names( + rcutils_string_array_t * node_names, + rcutils_string_array_t * node_namespaces, + rcutils_string_array_t * enclaves, + rcutils_allocator_t * allocator) const; + + rmw_ret_t get_topic_names_and_types( + rcutils_allocator_t * allocator, + bool no_demangle, + rmw_names_and_types_t * topic_names_and_types) const; + + rmw_ret_t publisher_count_matched_subscriptions( + const rmw_publisher_t * publisher, + size_t * subscription_count); + + rmw_ret_t subscription_count_matched_publishers( + const rmw_subscription_t * subscription, + size_t * publisher_count); + + rmw_ret_t get_service_names_and_types( + rcutils_allocator_t * allocator, + rmw_names_and_types_t * service_names_and_types) const; + + rmw_ret_t count_publishers( + const char * topic_name, + size_t * count) const; + + rmw_ret_t count_subscriptions( + const char * topic_name, + size_t * count) const; + + rmw_ret_t count_services( + const char * service_name, + size_t * count) const; + + rmw_ret_t count_clients( + const char * service_name, + size_t * count) const; + + rmw_ret_t get_entity_names_and_types_by_node( + rmw_zenoh_cpp::liveliness::EntityType entity_type, + rcutils_allocator_t * allocator, + const char * node_name, + const char * node_namespace, + bool no_demangle, + rmw_names_and_types_t * names_and_types) const; + + rmw_ret_t get_entities_info_by_topic( + rmw_zenoh_cpp::liveliness::EntityType entity_type, + rcutils_allocator_t * allocator, + const char * topic_name, + bool no_demangle, + rmw_topic_endpoint_info_array_t * endpoints_info) const; + + rmw_ret_t service_server_is_available( + const char * service_name, + const char * service_type, + bool * is_available) const; + + void set_qos_event_callback( + rmw_zenoh_cpp::liveliness::ConstEntityPtr entity, + const rmw_zenoh_cpp::rmw_zenoh_event_type_t & event_type, + GraphCacheEventCallback callback); + +private: + // Bundle all class members into a data struct which can be passed as a + // weak ptr to various threads for thread-safe access without capturing + // "this" ptr by reference. + struct Data : public std::enable_shared_from_this + { + // Constructor. + Data( + const rcutils_allocator_t * allocator, + const std::size_t domain_id, + const std::string & enclave, + z_owned_session_t session, + std::optional shm_manager, + rmw_guard_condition_t * graph_guard_condition); + + // Subscribe to the ROS graph. + rmw_ret_t subscribe(); + + // Shutdown the Zenoh session. + rmw_ret_t shutdown(); + + // Destructor. + ~Data(); + + // Mutex to lock when accessing members. + mutable std::mutex mutex_; + // RMW allocator. + const rcutils_allocator_t * allocator_; + // Enclave, name used to find security artifacts in a sros2 keystore. + std::string enclave_; + // An owned session. + z_owned_session_t session_; + // An optional SHM manager that is initialized of SHM is enabled in the + // zenoh session config. + std::optional shm_manager_; + // Liveliness keyexpr string to subscribe to for ROS graph changes. + std::string liveliness_str_; + // ROS graph liveliness subscriber. + z_owned_subscriber_t graph_subscriber_; + // Equivalent to rmw_dds_common::Context's guard condition + /// Guard condition that should be triggered when the graph changes. + rmw_guard_condition_t * graph_guard_condition_; + /// Shutdown flag. + bool is_shutdown_; + // A counter to assign a local id for every entity created in this session. + size_t next_entity_id_; + // Graph cache. 
+ std::unique_ptr graph_cache_; + // True once graph subscriber is initialized. + bool is_initialized_; + }; + + std::shared_ptr data_{nullptr}; + + static void graph_sub_data_handler(const z_sample_t * sample, void * data); +}; + + +#endif // DETAIL__RMW_CONTEXT_IMPL_S_HPP_ diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp index 51f84b33..5beb6edd 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include - #include #include #include @@ -54,12 +52,6 @@ size_t hash_gid(const rmw_request_id_t & request_id) } } // namespace -///============================================================================= -size_t rmw_context_impl_s::get_next_entity_id() -{ - return next_entity_id_++; -} - namespace rmw_zenoh_cpp { ///============================================================================= diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp index 60e4bd01..5ed480e7 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp @@ -42,38 +42,6 @@ /// Structs for various type erased data fields. -///============================================================================= -class rmw_context_impl_s final -{ -public: - // Enclave, name used to find security artifacts in a sros2 keystore. - char * enclave; - - // An owned session. - z_owned_session_t session; - - // An optional SHM manager that is initialized of SHM is enabled in the - // zenoh session config. - std::optional shm_manager; - - z_owned_subscriber_t graph_subscriber; - - /// Shutdown flag. - bool is_shutdown; - - // Equivalent to rmw_dds_common::Context's guard condition - /// Guard condition that should be triggered when the graph changes. - rmw_guard_condition_t * graph_guard_condition; - - std::unique_ptr graph_cache; - - size_t get_next_entity_id(); - -private: - // A counter to assign a local id for every entity created in this session. - size_t next_entity_id_{0}; -}; - namespace rmw_zenoh_cpp { ///============================================================================= diff --git a/rmw_zenoh_cpp/src/rmw_event.cpp b/rmw_zenoh_cpp/src/rmw_event.cpp index 964118ee..17280be7 100644 --- a/rmw_zenoh_cpp/src/rmw_event.cpp +++ b/rmw_zenoh_cpp/src/rmw_event.cpp @@ -20,6 +20,7 @@ #include "detail/event.hpp" #include "detail/graph_cache.hpp" #include "detail/identifier.hpp" +#include "detail/rmw_context_impl_s.hpp" #include "detail/rmw_data_types.hpp" @@ -41,6 +42,8 @@ rmw_publisher_event_init( static_cast(publisher->data); RMW_CHECK_ARGUMENT_FOR_NULL(pub_data, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(pub_data->context, RMW_RET_INVALID_ARGUMENT); + rmw_context_impl_t * context_impl = static_cast(pub_data->context->impl); + RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(pub_data->entity, RMW_RET_INVALID_ARGUMENT); if (publisher->implementation_identifier != rmw_zenoh_cpp::rmw_zenoh_identifier) { @@ -61,7 +64,7 @@ rmw_publisher_event_init( rmw_event->event_type = event_type; // Register the event with graph cache. 
- pub_data->context->impl->graph_cache->set_qos_event_callback( + context_impl->set_qos_event_callback( pub_data->entity, zenoh_event_type, [pub_data, @@ -95,6 +98,8 @@ rmw_subscription_event_init( static_cast(subscription->data); RMW_CHECK_ARGUMENT_FOR_NULL(sub_data, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(sub_data->context, RMW_RET_INVALID_ARGUMENT); + rmw_context_impl_t * context_impl = static_cast(sub_data->context->impl); + RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(sub_data->entity, RMW_RET_INVALID_ARGUMENT); if (subscription->implementation_identifier != rmw_zenoh_cpp::rmw_zenoh_identifier) { @@ -121,7 +126,7 @@ rmw_subscription_event_init( return RMW_RET_OK; } - sub_data->context->impl->graph_cache->set_qos_event_callback( + sub_data->context->impl->set_qos_event_callback( sub_data->entity, zenoh_event_type, [sub_data, diff --git a/rmw_zenoh_cpp/src/rmw_get_node_info_and_types.cpp b/rmw_zenoh_cpp/src/rmw_get_node_info_and_types.cpp index c42295b3..4aadc4d2 100644 --- a/rmw_zenoh_cpp/src/rmw_get_node_info_and_types.cpp +++ b/rmw_zenoh_cpp/src/rmw_get_node_info_and_types.cpp @@ -14,7 +14,7 @@ #include "detail/identifier.hpp" #include "detail/liveliness_utils.hpp" -#include "detail/rmw_data_types.hpp" +#include "detail/rmw_context_impl_s.hpp" #include "rcutils/allocator.h" @@ -43,7 +43,9 @@ rmw_get_subscriber_names_and_types_by_node( return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); RMW_CHECK_ARGUMENT_FOR_NULL(node->context, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(node->context->impl, RMW_RET_INVALID_ARGUMENT); - return node->context->impl->graph_cache->get_entity_names_and_types_by_node( + rmw_context_impl_t * context_impl = static_cast(node->context->impl); + RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT); + return context_impl->get_entity_names_and_types_by_node( rmw_zenoh_cpp::liveliness::EntityType::Subscription, allocator, node_name, @@ -71,7 +73,9 @@ rmw_get_publisher_names_and_types_by_node( return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); RMW_CHECK_ARGUMENT_FOR_NULL(node->context, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(node->context->impl, RMW_RET_INVALID_ARGUMENT); - return node->context->impl->graph_cache->get_entity_names_and_types_by_node( + rmw_context_impl_t * context_impl = static_cast(node->context->impl); + RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT); + return context_impl->get_entity_names_and_types_by_node( rmw_zenoh_cpp::liveliness::EntityType::Publisher, allocator, node_name, @@ -98,7 +102,9 @@ rmw_get_service_names_and_types_by_node( return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); RMW_CHECK_ARGUMENT_FOR_NULL(node->context, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(node->context->impl, RMW_RET_INVALID_ARGUMENT); - return node->context->impl->graph_cache->get_entity_names_and_types_by_node( + rmw_context_impl_t * context_impl = static_cast(node->context->impl); + RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT); + return context_impl->get_entity_names_and_types_by_node( rmw_zenoh_cpp::liveliness::EntityType::Service, allocator, node_name, @@ -125,7 +131,9 @@ rmw_get_client_names_and_types_by_node( return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); RMW_CHECK_ARGUMENT_FOR_NULL(node->context, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(node->context->impl, RMW_RET_INVALID_ARGUMENT); - return node->context->impl->graph_cache->get_entity_names_and_types_by_node( + rmw_context_impl_t * context_impl 
= static_cast(node->context->impl); + RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT); + return context_impl->get_entity_names_and_types_by_node( rmw_zenoh_cpp::liveliness::EntityType::Client, allocator, node_name, diff --git a/rmw_zenoh_cpp/src/rmw_get_service_names_and_types.cpp b/rmw_zenoh_cpp/src/rmw_get_service_names_and_types.cpp index 12415ebe..5dc7cdf3 100644 --- a/rmw_zenoh_cpp/src/rmw_get_service_names_and_types.cpp +++ b/rmw_zenoh_cpp/src/rmw_get_service_names_and_types.cpp @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include "detail/rmw_data_types.hpp" +#include "detail/rmw_context_impl_s.hpp" #include "rcutils/allocator.h" @@ -35,8 +35,10 @@ rmw_get_service_names_and_types( RMW_CHECK_ARGUMENT_FOR_NULL(node->context->impl, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(allocator, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(service_names_and_types, RMW_RET_INVALID_ARGUMENT); + rmw_context_impl_t * context_impl = static_cast(node->context->impl); + RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT); - return node->context->impl->graph_cache->get_service_names_and_types( + return context_impl->get_service_names_and_types( allocator, service_names_and_types); } } // extern "C" diff --git a/rmw_zenoh_cpp/src/rmw_get_topic_endpoint_info.cpp b/rmw_zenoh_cpp/src/rmw_get_topic_endpoint_info.cpp index a4c4878d..d70ce0c7 100644 --- a/rmw_zenoh_cpp/src/rmw_get_topic_endpoint_info.cpp +++ b/rmw_zenoh_cpp/src/rmw_get_topic_endpoint_info.cpp @@ -15,7 +15,7 @@ #include "detail/identifier.hpp" #include "detail/liveliness_utils.hpp" -#include "detail/rmw_data_types.hpp" +#include "detail/rmw_context_impl_s.hpp" #include "rcutils/allocator.h" @@ -43,7 +43,9 @@ rmw_get_publishers_info_by_topic( return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); RMW_CHECK_ARGUMENT_FOR_NULL(node->context, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(node->context->impl, RMW_RET_INVALID_ARGUMENT); - return node->context->impl->graph_cache->get_entities_info_by_topic( + rmw_context_impl_t * context_impl = static_cast(node->context->impl); + RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT); + return context_impl->get_entities_info_by_topic( rmw_zenoh_cpp::liveliness::EntityType::Publisher, allocator, topic_name, @@ -69,7 +71,9 @@ rmw_get_subscriptions_info_by_topic( return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); RMW_CHECK_ARGUMENT_FOR_NULL(node->context, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(node->context->impl, RMW_RET_INVALID_ARGUMENT); - return node->context->impl->graph_cache->get_entities_info_by_topic( + rmw_context_impl_t * context_impl = static_cast(node->context->impl); + RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT); + return context_impl->get_entities_info_by_topic( rmw_zenoh_cpp::liveliness::EntityType::Subscription, allocator, topic_name, diff --git a/rmw_zenoh_cpp/src/rmw_get_topic_names_and_types.cpp b/rmw_zenoh_cpp/src/rmw_get_topic_names_and_types.cpp index db7d972c..477dc7d9 100644 --- a/rmw_zenoh_cpp/src/rmw_get_topic_names_and_types.cpp +++ b/rmw_zenoh_cpp/src/rmw_get_topic_names_and_types.cpp @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-#include "detail/rmw_data_types.hpp" +#include "detail/rmw_context_impl_s.hpp" #include "rcutils/allocator.h" @@ -37,7 +37,9 @@ rmw_get_topic_names_and_types( RMW_CHECK_ARGUMENT_FOR_NULL(allocator, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(topic_names_and_types, RMW_RET_INVALID_ARGUMENT); - return node->context->impl->graph_cache->get_topic_names_and_types( + rmw_context_impl_t * context_impl = static_cast(node->context->impl); + RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT); + return context_impl->get_topic_names_and_types( allocator, no_demangle, topic_names_and_types); } } // extern "C" diff --git a/rmw_zenoh_cpp/src/rmw_init.cpp b/rmw_zenoh_cpp/src/rmw_init.cpp index 99c6f3ad..2556c718 100644 --- a/rmw_zenoh_cpp/src/rmw_init.cpp +++ b/rmw_zenoh_cpp/src/rmw_init.cpp @@ -21,6 +21,7 @@ #include "detail/guard_condition.hpp" #include "detail/identifier.hpp" #include "detail/liveliness_utils.hpp" +#include "detail/rmw_context_impl_s.hpp" #include "detail/rmw_data_types.hpp" #include "detail/zenoh_config.hpp" #include "detail/zenoh_router_check.hpp" @@ -42,51 +43,6 @@ extern "C" // TODO(clalancette): Make this configurable, or get it from the configuration #define SHM_BUFFER_SIZE_MB 10 -namespace -{ -void -graph_sub_data_handler(const z_sample_t * sample, void * data) -{ - static_cast(data); - - z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr); - auto free_keystr = rcpputils::make_scope_exit( - [&keystr]() { - z_drop(z_move(keystr)); - }); - - // Get the context impl from data. - rmw_context_impl_s * context_impl = static_cast( - data); - if (context_impl == nullptr) { - RMW_ZENOH_LOG_WARN_NAMED( - "rmw_zenoh_cpp", - "[graph_sub_data_handler] Unable to convert data into context_impl" - ); - return; - } - - switch (sample->kind) { - case z_sample_kind_t::Z_SAMPLE_KIND_PUT: - context_impl->graph_cache->parse_put(keystr._cstr); - break; - case z_sample_kind_t::Z_SAMPLE_KIND_DELETE: - context_impl->graph_cache->parse_del(keystr._cstr); - break; - default: - return; - } - - rmw_ret_t rmw_ret = rmw_trigger_guard_condition(context_impl->graph_guard_condition); - if (RMW_RET_OK != rmw_ret) { - RMW_ZENOH_LOG_WARN_NAMED( - "rmw_zenoh_cpp", - "[graph_sub_data_handler] Unable to trigger graph guard condition" - ); - } -} -} // namespace - //============================================================================== /// Initialize the middleware with the given options, and yielding an context. rmw_ret_t @@ -123,24 +79,6 @@ rmw_init(const rmw_init_options_t * options, rmw_context_t * context) const rcutils_allocator_t * allocator = &options->allocator; - context->impl = static_cast( - allocator->zero_allocate(1, sizeof(rmw_context_impl_t), allocator->state)); - RMW_CHECK_FOR_NULL_WITH_MSG( - context->impl, - "failed to allocate context impl", - return RMW_RET_BAD_ALLOC); - auto free_impl = rcpputils::make_scope_exit( - [context, allocator]() { - allocator->deallocate(context->impl, allocator->state); - }); - - RMW_TRY_PLACEMENT_NEW(context->impl, context->impl, return RMW_RET_BAD_ALLOC, rmw_context_impl_t); - auto impl_destructor = rcpputils::make_scope_exit( - [context] { - RMW_TRY_DESTRUCTOR_FROM_WITHIN_FAILURE( - context->impl->~rmw_context_impl_t(), rmw_context_impl_t *); - }); - rmw_ret_t ret; if ((ret = rmw_init_options_copy(options, &context->options)) != RMW_RET_OK) { // error already set @@ -154,20 +92,6 @@ rmw_init(const rmw_init_options_t * options, rmw_context_t * context) } }); - // Set the enclave. 
- context->impl->enclave = rcutils_strdup(options->enclave, *allocator); - RMW_CHECK_FOR_NULL_WITH_MSG( - context->impl->enclave, - "failed to allocate enclave", - return RMW_RET_BAD_ALLOC); - auto free_enclave = rcpputils::make_scope_exit( - [context, allocator]() { - allocator->deallocate(context->impl->enclave, allocator->state); - }); - - // Initialize context's implementation - context->impl->is_shutdown = false; - // If not already defined, set the logging environment variable for Zenoh sessions // to warning level by default. // TODO(Yadunund): Switch to rcutils_get_env once it supports not overwriting values. @@ -189,26 +113,23 @@ rmw_init(const rmw_init_options_t * options, rmw_context_t * context) // Check if shm is enabled. z_owned_str_t shm_enabled = zc_config_get(z_loan(config), "transport/shared_memory/enabled"); - auto free_shm_enabled = rcpputils::make_scope_exit( + auto always_free_shm_enabled = rcpputils::make_scope_exit( [&shm_enabled]() { z_drop(z_move(shm_enabled)); }); // Initialize the zenoh session. - context->impl->session = z_open(z_move(config)); - if (!z_session_check(&context->impl->session)) { + z_owned_session_t session = z_open(z_move(config)); + if (!z_session_check(&session)) { RMW_SET_ERROR_MSG("Error setting up zenoh session"); return RMW_RET_ERROR; } auto close_session = rcpputils::make_scope_exit( - [context]() { - z_close(z_move(context->impl->session)); + [&session]() { + z_close(z_move(session)); }); - /// Initialize the graph cache. - z_id_t zid = z_info_zid(z_loan(context->impl->session)); - context->impl->graph_cache = std::make_unique(zid); - + // TODO(Yadunund) Move this check into a separate thread. // Verify if the zenoh router is running if configured. const std::optional configured_connection_attempts = rmw_zenoh_cpp::zenoh_router_check_attempts(); @@ -217,7 +138,7 @@ rmw_init(const rmw_init_options_t * options, rmw_context_t * context) uint64_t connection_attempts = 0; // Retry until the connection is successful. while (ret != RMW_RET_OK && connection_attempts < configured_connection_attempts.value()) { - if ((ret = rmw_zenoh_cpp::zenoh_router_check(z_loan(context->impl->session))) != RMW_RET_OK) { + if ((ret = rmw_zenoh_cpp::zenoh_router_check(z_loan(session))) != RMW_RET_OK) { ++connection_attempts; } std::this_thread::sleep_for(std::chrono::seconds(1)); @@ -231,6 +152,8 @@ rmw_init(const rmw_init_options_t * options, rmw_context_t * context) } // Initialize the shm manager if shared_memory is enabled in the config. + std::optional shm_manager = std::nullopt; + const z_id_t zid = z_info_zid(z_loan(session)); if (shm_enabled._cstr != nullptr && strcmp(shm_enabled._cstr, "true") == 0) { @@ -242,148 +165,94 @@ rmw_init(const rmw_init_options_t * options, rmw_context_t * context) idstr[sizeof(zid.id) * 2] = '\0'; // TODO(yadunund): Can we get the size of the shm from the config even though it's not // a standard parameter? 
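// (Illustrative sketch only, not part of this patch: the TODO above could hypothetically
// be addressed by reading a size value with zc_config_get() before the config is moved
// into z_open(), the same way "transport/shared_memory/enabled" is read earlier in this
// function. "transport/shared_memory/size" is an assumed key here, not a documented
// zenoh parameter.
//
//   z_owned_str_t shm_size = zc_config_get(z_loan(config), "transport/shared_memory/size");
//   const size_t buffer_size_bytes = (shm_size._cstr != nullptr) ?
//     std::strtoull(shm_size._cstr, nullptr, 10) :
//     SHM_BUFFER_SIZE_MB * 1024 * 1024;  // fall back to the compiled-in default
//   z_drop(z_move(shm_size));
// )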
- context->impl->shm_manager = + shm_manager = zc_shm_manager_new( - z_loan(context->impl->session), + z_loan(session), idstr, SHM_BUFFER_SIZE_MB * 1024 * 1024); - if (!context->impl->shm_manager.has_value() || - !zc_shm_manager_check(&context->impl->shm_manager.value())) + if (!shm_manager.has_value() || + !zc_shm_manager_check(&shm_manager.value())) { RMW_SET_ERROR_MSG("Unable to create shm manager."); return RMW_RET_ERROR; } } auto free_shm_manager = rcpputils::make_scope_exit( - [context]() { - if (context->impl->shm_manager.has_value()) { - z_drop(z_move(context->impl->shm_manager.value())); + [&shm_manager]() { + if (shm_manager.has_value()) { + z_drop(z_move(shm_manager.value())); } }); // Initialize the guard condition. - context->impl->graph_guard_condition = + rmw_guard_condition_t * graph_guard_condition = static_cast(allocator->zero_allocate( 1, sizeof(rmw_guard_condition_t), allocator->state)); RMW_CHECK_FOR_NULL_WITH_MSG( - context->impl->graph_guard_condition, + graph_guard_condition, "failed to allocate graph guard condition", return RMW_RET_BAD_ALLOC); auto free_guard_condition = rcpputils::make_scope_exit( - [context, allocator]() { - allocator->deallocate(context->impl->graph_guard_condition, allocator->state); + [graph_guard_condition, allocator]() { + allocator->deallocate(graph_guard_condition, allocator->state); }); - - context->impl->graph_guard_condition->implementation_identifier = + graph_guard_condition->implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier; - - context->impl->graph_guard_condition->data = + graph_guard_condition->data = allocator->zero_allocate(1, sizeof(rmw_zenoh_cpp::GuardCondition), allocator->state); RMW_CHECK_FOR_NULL_WITH_MSG( - context->impl->graph_guard_condition->data, + graph_guard_condition->data, "failed to allocate graph guard condition data", return RMW_RET_BAD_ALLOC); auto free_guard_condition_data = rcpputils::make_scope_exit( - [context, allocator]() { - allocator->deallocate(context->impl->graph_guard_condition->data, allocator->state); + [graph_guard_condition, allocator]() { + allocator->deallocate(graph_guard_condition->data, allocator->state); }); - RMW_TRY_PLACEMENT_NEW( - context->impl->graph_guard_condition->data, - context->impl->graph_guard_condition->data, + graph_guard_condition->data, + graph_guard_condition->data, return RMW_RET_BAD_ALLOC, rmw_zenoh_cpp::GuardCondition); auto destruct_guard_condition_data = rcpputils::make_scope_exit( - [context]() { + [graph_guard_condition]() { auto gc_data = - static_cast(context->impl->graph_guard_condition->data); + static_cast(graph_guard_condition->data); RMW_TRY_DESTRUCTOR_FROM_WITHIN_FAILURE( gc_data->~GuardCondition(), rmw_zenoh_cpp::GuardCondition); }); - // Setup liveliness subscriptions for discovery. - const std::string liveliness_str = rmw_zenoh_cpp::liveliness::subscription_token( - context->actual_domain_id); - - // Query router/liveliness participants to get graph information before this session was started. - - // We create a blocking channel that is unbounded, ie. `bound` = 0, to receive - // replies for the zc_liveliness_get() call. This is necessary as if the `bound` - // is too low, the channel may starve the zenoh executor of its threads which - // would lead to deadlocks when trying to receive replies and block the - // execution here. - // The blocking channel will return when the sender end is closed which is - // the moment the query finishes. 
- // The non-blocking fifo exists only for the use case where we don't want to - // block the thread between responses (including the request termination response). - // In general, unless we want to cooperatively schedule other tasks on the same - // thread as reading the fifo, the blocking fifo will be more appropriate as - // the code will be simpler, and if we're just going to spin over the non-blocking - // reads until we obtain responses, we'll just be hogging CPU time by convincing - // the OS that we're doing actual work when it could instead park the thread. - z_owned_reply_channel_t channel = zc_reply_fifo_new(0); - zc_liveliness_get( - z_loan(context->impl->session), z_keyexpr(liveliness_str.c_str()), - z_move(channel.send), NULL); - z_owned_reply_t reply = z_reply_null(); - for (bool call_success = z_call(channel.recv, &reply); !call_success || z_check(reply); - call_success = z_call(channel.recv, &reply)) - { - if (!call_success) { - continue; - } - if (z_reply_is_ok(&reply)) { - z_sample_t sample = z_reply_ok(&reply); - z_owned_str_t keystr = z_keyexpr_to_string(sample.keyexpr); - // Ignore tokens from the same session to avoid race conditions from this - // query and the liveliness subscription. - context->impl->graph_cache->parse_put(z_loan(keystr), true); - z_drop(z_move(keystr)); - } else { - printf("[discovery] Received an error\n"); - } - } - z_drop(z_move(reply)); - z_drop(z_move(channel)); - - // TODO(Yadunund): Switch this to a liveliness subscriptions once the API is available. - - // Uncomment and rely on #if #endif blocks to enable this feature when building with - // zenoh-pico since liveliness is only available in zenoh-c. - // auto sub_options = z_subscriber_options_default(); - // sub_options.reliability = Z_RELIABILITY_RELIABLE; - // context->impl->graph_subscriber = z_declare_subscriber( - // z_loan(context->impl->session), - // z_keyexpr(liveliness_str.c_str()), - // z_move(callback), - // &sub_options); - auto sub_options = zc_liveliness_subscriber_options_null(); - z_owned_closure_sample_t callback = z_closure(graph_sub_data_handler, nullptr, context->impl); - context->impl->graph_subscriber = zc_liveliness_declare_subscriber( - z_loan(context->impl->session), - z_keyexpr(liveliness_str.c_str()), - z_move(callback), - &sub_options); - zc_liveliness_subscriber_options_drop(z_move(sub_options)); - auto undeclare_z_sub = rcpputils::make_scope_exit( - [context]() { - z_undeclare_subscriber(z_move(context->impl->graph_subscriber)); + + // Create the context impl. 
+ context->impl = static_cast( + allocator->zero_allocate(1, sizeof(rmw_context_impl_t), allocator->state)); + RMW_CHECK_FOR_NULL_WITH_MSG( + context->impl, + "failed to allocate context impl", + return RMW_RET_BAD_ALLOC); + auto free_impl = rcpputils::make_scope_exit( + [context, allocator]() { + allocator->deallocate(context->impl, allocator->state); }); - if (!z_check(context->impl->graph_subscriber)) { - RMW_SET_ERROR_MSG("unable to create zenoh subscription"); - return RMW_RET_ERROR; - } - undeclare_z_sub.cancel(); + RMW_TRY_PLACEMENT_NEW( + context->impl, + context->impl, + return RMW_RET_BAD_ALLOC, + rmw_context_impl_t, + allocator, + context->actual_domain_id, + std::string(options->enclave), + std::move(session), + std::move(shm_manager), + graph_guard_condition + ); + close_session.cancel(); destruct_guard_condition_data.cancel(); - impl_destructor.cancel(); free_guard_condition_data.cancel(); free_guard_condition.cancel(); - free_enclave.cancel(); free_options.cancel(); - impl_destructor.cancel(); free_impl.cancel(); free_shm_manager.cancel(); restore_context.cancel(); @@ -407,19 +276,10 @@ rmw_shutdown(rmw_context_t * context) rmw_zenoh_cpp::rmw_zenoh_identifier, return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); - z_undeclare_subscriber(z_move(context->impl->graph_subscriber)); - if (context->impl->shm_manager.has_value()) { - z_drop(z_move(context->impl->shm_manager.value())); - } - // Close the zenoh session - if (z_close(z_move(context->impl->session)) < 0) { - RMW_SET_ERROR_MSG("Error while closing zenoh session"); - return RMW_RET_ERROR; - } - - context->impl->is_shutdown = true; + rmw_context_impl_t * context_impl = static_cast(context->impl); + RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT); - return RMW_RET_OK; + return context_impl->shutdown(); } //============================================================================== @@ -437,24 +297,13 @@ rmw_context_fini(rmw_context_t * context) context->implementation_identifier, rmw_zenoh_cpp::rmw_zenoh_identifier, return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); - if (!context->impl->is_shutdown) { + if (!context->impl->is_shutdown()) { RCUTILS_SET_ERROR_MSG("context has not been shutdown"); return RMW_RET_INVALID_ARGUMENT; } const rcutils_allocator_t * allocator = &context->options.allocator; - RMW_TRY_DESTRUCTOR( - static_cast( - context->impl->graph_guard_condition->data)->~GuardCondition(), - rmw_zenoh_cpp::GuardCondition, ); - allocator->deallocate(context->impl->graph_guard_condition->data, allocator->state); - - allocator->deallocate(context->impl->graph_guard_condition, allocator->state); - context->impl->graph_guard_condition = nullptr; - - allocator->deallocate(context->impl->enclave, allocator->state); - RMW_TRY_DESTRUCTOR(context->impl->~rmw_context_impl_t(), rmw_context_impl_t *, ); allocator->deallocate(context->impl, allocator->state); diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 3de285a5..4678ff08 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -36,6 +36,7 @@ #include "detail/logging_macros.hpp" #include "detail/message_type_support.hpp" #include "detail/qos.hpp" +#include "detail/rmw_context_impl_s.hpp" #include "detail/rmw_data_types.hpp" #include "detail/serialization_format.hpp" #include "detail/type_support_common.hpp" @@ -210,11 +211,7 @@ rmw_create_node( context->impl, "expected initialized context", return nullptr); - RMW_CHECK_FOR_NULL_WITH_MSG( - context->impl->enclave, - "expected initialized enclave", - 
return nullptr); - if (context->impl->is_shutdown) { + if (context->impl->is_shutdown()) { RCUTILS_SET_ERROR_MSG("context has been shutdown"); return nullptr; } @@ -297,13 +294,14 @@ rmw_create_node( // Initialize liveliness token for the node to advertise that a new node is in town. node_data->id = context->impl->get_next_entity_id(); + z_session_t session = context->impl->session(); node_data->entity = rmw_zenoh_cpp::liveliness::Entity::make( - z_info_zid(z_loan(context->impl->session)), + z_info_zid(session), std::to_string(node_data->id), std::to_string(node_data->id), rmw_zenoh_cpp::liveliness::EntityType::Node, rmw_zenoh_cpp::liveliness::NodeInfo{context->actual_domain_id, namespace_, name, - context->impl->enclave}); + context->impl->enclave()}); if (node_data->entity == nullptr) { RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", @@ -311,7 +309,7 @@ rmw_create_node( return nullptr; } node_data->token = zc_liveliness_declare_token( - z_loan(context->impl->session), + session, z_keyexpr(node_data->entity->keyexpr().c_str()), NULL ); @@ -388,7 +386,7 @@ rmw_node_get_graph_guard_condition(const rmw_node_t * node) RMW_CHECK_ARGUMENT_FOR_NULL(node->context, nullptr); RMW_CHECK_ARGUMENT_FOR_NULL(node->context->impl, nullptr); - return node->context->impl->graph_guard_condition; + return node->context->impl->graph_guard_condition(); } //============================================================================== @@ -501,11 +499,7 @@ rmw_create_publisher( context_impl, "unable to get rmw_context_impl_s", return nullptr); - RMW_CHECK_FOR_NULL_WITH_MSG( - context_impl->enclave, - "expected initialized enclave", - return nullptr); - if (!z_check(context_impl->session)) { + if (!context_impl->session_is_valid()) { RMW_SET_ERROR_MSG("zenoh session is invalid"); return nullptr; } @@ -628,13 +622,14 @@ rmw_create_publisher( return nullptr; } + z_session_t session = context_impl->session(); // Create a Publication Cache if durability is transient_local. if (publisher_data->adapted_qos_profile.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) { ze_publication_cache_options_t pub_cache_opts = ze_publication_cache_options_default(); pub_cache_opts.history = publisher_data->adapted_qos_profile.depth; pub_cache_opts.queryable_complete = true; publisher_data->pub_cache = ze_declare_publication_cache( - z_loan(context_impl->session), + session, z_loan(keyexpr), &pub_cache_opts ); @@ -660,7 +655,7 @@ rmw_create_publisher( } // TODO(clalancette): What happens if the key name is a valid but empty string? 
publisher_data->pub = z_declare_publisher( - z_loan(context_impl->session), + session, z_loan(keyexpr), &opts ); @@ -674,13 +669,13 @@ rmw_create_publisher( }); publisher_data->entity = rmw_zenoh_cpp::liveliness::Entity::make( - z_info_zid(z_loan(node->context->impl->session)), + z_info_zid(session), std::to_string(node_data->id), std::to_string( context_impl->get_next_entity_id()), rmw_zenoh_cpp::liveliness::EntityType::Publisher, rmw_zenoh_cpp::liveliness::NodeInfo{ - node->context->actual_domain_id, node->namespace_, node->name, context_impl->enclave}, + node->context->actual_domain_id, node->namespace_, node->name, context_impl->enclave()}, rmw_zenoh_cpp::liveliness::TopicInfo{ rmw_publisher->topic_name, publisher_data->type_support->get_name(), @@ -694,7 +689,7 @@ rmw_create_publisher( return nullptr; } publisher_data->token = zc_liveliness_declare_token( - z_loan(node->context->impl->session), + session, z_keyexpr(publisher_data->entity->keyexpr().c_str()), NULL ); @@ -932,15 +927,15 @@ rmw_publish( }); // Get memory from SHM buffer if available. - if (publisher_data->context->impl->shm_manager.has_value() && - zc_shm_manager_check(&publisher_data->context->impl->shm_manager.value())) + if (publisher_data->context->impl->shm_manager().has_value() && + zc_shm_manager_check(&publisher_data->context->impl->shm_manager().value())) { shmbuf = zc_shm_alloc( - &publisher_data->context->impl->shm_manager.value(), + &publisher_data->context->impl->shm_manager().value(), max_data_length); if (!z_check(shmbuf.value())) { - zc_shm_gc(&publisher_data->context->impl->shm_manager.value()); - shmbuf = zc_shm_alloc(&publisher_data->context->impl->shm_manager.value(), max_data_length); + zc_shm_gc(&publisher_data->context->impl->shm_manager().value()); + shmbuf = zc_shm_alloc(&publisher_data->context->impl->shm_manager().value(), max_data_length); if (!z_check(shmbuf.value())) { // TODO(Yadunund): Should we revert to regular allocation and not return an error? RMW_SET_ERROR_MSG("Failed to allocate a SHM buffer, even after GCing"); @@ -1048,7 +1043,7 @@ rmw_publisher_count_matched_subscriptions( rmw_context_impl_t * context_impl = static_cast(pub_data->context->impl); RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT); - return context_impl->graph_cache->publisher_count_matched_subscriptions( + return context_impl->publisher_count_matched_subscriptions( publisher, subscription_count); } @@ -1332,11 +1327,7 @@ rmw_create_subscription( context_impl, "unable to get rmw_context_impl_s", return nullptr); - RMW_CHECK_FOR_NULL_WITH_MSG( - context_impl->enclave, - "expected initialized enclave", - return nullptr); - if (!z_check(context_impl->session)) { + if (!context_impl->session_is_valid()) { RMW_SET_ERROR_MSG("zenoh session is invalid"); return nullptr; } @@ -1465,6 +1456,7 @@ rmw_create_subscription( // adapted_qos_profile. // TODO(Yadunund): Rely on a separate function to return the sub // as we start supporting more qos settings. 
+ z_session_t session = context_impl->session(); z_owned_str_t owned_key_str = z_keyexpr_to_string(z_loan(keyexpr)); auto always_drop_keystr = rcpputils::make_scope_exit( [&owned_key_str]() { @@ -1483,7 +1475,7 @@ rmw_create_subscription( sub_options.reliability = Z_RELIABILITY_RELIABLE; } sub_data->sub = ze_declare_querying_subscriber( - z_loan(context_impl->session), + session, z_loan(keyexpr), z_move(callback), &sub_options @@ -1499,7 +1491,7 @@ rmw_create_subscription( sub_options.reliability = Z_RELIABILITY_RELIABLE; } sub_data->sub = z_declare_subscriber( - z_loan(context_impl->session), + session, z_loan(keyexpr), z_move(callback), &sub_options @@ -1526,13 +1518,13 @@ rmw_create_subscription( // Publish to the graph that a new subscription is in town sub_data->entity = rmw_zenoh_cpp::liveliness::Entity::make( - z_info_zid(z_loan(node->context->impl->session)), + z_info_zid(session), std::to_string(node_data->id), std::to_string( context_impl->get_next_entity_id()), rmw_zenoh_cpp::liveliness::EntityType::Subscription, rmw_zenoh_cpp::liveliness::NodeInfo{ - node->context->actual_domain_id, node->namespace_, node->name, context_impl->enclave}, + node->context->actual_domain_id, node->namespace_, node->name, context_impl->enclave()}, rmw_zenoh_cpp::liveliness::TopicInfo{ rmw_subscription->topic_name, sub_data->type_support->get_name(), @@ -1546,7 +1538,7 @@ rmw_create_subscription( return nullptr; } sub_data->token = zc_liveliness_declare_token( - z_loan(context_impl->session), + session, z_keyexpr(sub_data->entity->keyexpr().c_str()), NULL ); @@ -1654,7 +1646,7 @@ rmw_subscription_count_matched_publishers( rmw_context_impl_t * context_impl = static_cast(sub_data->context->impl); RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT); - return context_impl->graph_cache->subscription_count_matched_publishers( + return context_impl->subscription_count_matched_publishers( subscription, publisher_count); } @@ -1867,7 +1859,7 @@ rmw_take_sequence( auto sub_data = static_cast(subscription->data); RMW_CHECK_ARGUMENT_FOR_NULL(sub_data, RMW_RET_INVALID_ARGUMENT); - if (sub_data->context->impl->is_shutdown) { + if (sub_data->context->impl->is_shutdown()) { return RMW_RET_OK; } @@ -1927,7 +1919,7 @@ __rmw_take_serialized( auto sub_data = static_cast(subscription->data); RMW_CHECK_ARGUMENT_FOR_NULL(sub_data, RMW_RET_INVALID_ARGUMENT); - if (sub_data->context->impl->is_shutdown) { + if (sub_data->context->impl->is_shutdown()) { return RMW_RET_OK; } @@ -2082,11 +2074,7 @@ rmw_create_client( context_impl, "unable to get rmw_context_impl_s", return nullptr); - RMW_CHECK_FOR_NULL_WITH_MSG( - context_impl->enclave, - "expected initialized enclave", - return nullptr); - if (!z_check(context_impl->session)) { + if (!context_impl->session_is_valid()) { RMW_SET_ERROR_MSG("zenoh session is invalid"); return nullptr; } @@ -2285,14 +2273,15 @@ rmw_create_client( return nullptr; } + z_session_t session = context_impl->session(); client_data->entity = rmw_zenoh_cpp::liveliness::Entity::make( - z_info_zid(z_loan(node->context->impl->session)), + z_info_zid(session), std::to_string(node_data->id), std::to_string( context_impl->get_next_entity_id()), rmw_zenoh_cpp::liveliness::EntityType::Client, rmw_zenoh_cpp::liveliness::NodeInfo{ - node->context->actual_domain_id, node->namespace_, node->name, context_impl->enclave}, + node->context->actual_domain_id, node->namespace_, node->name, context_impl->enclave()}, rmw_zenoh_cpp::liveliness::TopicInfo{ rmw_client->service_name, std::move(service_type), @@ 
-2306,7 +2295,7 @@ rmw_create_client( return nullptr; } client_data->token = zc_liveliness_declare_token( - z_loan(node->context->impl->session), + session, z_keyexpr(client_data->entity->keyexpr().c_str()), NULL ); @@ -2497,7 +2486,7 @@ rmw_send_request( z_owned_closure_reply_t zn_closure_reply = z_closure(rmw_zenoh_cpp::client_data_handler, rmw_zenoh_cpp::client_data_drop, client_data); z_get( - z_loan(context_impl->session), + context_impl->session(), z_loan(client_data->keyexpr), "", z_move(zn_closure_reply), &opts); @@ -2686,11 +2675,7 @@ rmw_create_service( context_impl, "unable to get rmw_context_impl_s", return nullptr); - RMW_CHECK_FOR_NULL_WITH_MSG( - context_impl->enclave, - "expected initialized enclave", - return nullptr); - if (!z_check(context_impl->session)) { + if (!context_impl->session_is_valid()) { RMW_SET_ERROR_MSG("zenoh session is invalid"); return nullptr; } @@ -2865,6 +2850,7 @@ rmw_create_service( return nullptr; } + z_session_t session = context_impl->session(); z_owned_closure_query_t callback = z_closure( rmw_zenoh_cpp::service_data_handler, nullptr, service_data); @@ -2872,7 +2858,7 @@ rmw_create_service( z_queryable_options_t qable_options = z_queryable_options_default(); qable_options.complete = true; service_data->qable = z_declare_queryable( - z_loan(context_impl->session), + session, z_loan(service_data->keyexpr), z_move(callback), &qable_options); @@ -2887,13 +2873,13 @@ rmw_create_service( }); service_data->entity = rmw_zenoh_cpp::liveliness::Entity::make( - z_info_zid(z_loan(node->context->impl->session)), + z_info_zid(session), std::to_string(node_data->id), std::to_string( context_impl->get_next_entity_id()), rmw_zenoh_cpp::liveliness::EntityType::Service, rmw_zenoh_cpp::liveliness::NodeInfo{ - node->context->actual_domain_id, node->namespace_, node->name, context_impl->enclave}, + node->context->actual_domain_id, node->namespace_, node->name, context_impl->enclave()}, rmw_zenoh_cpp::liveliness::TopicInfo{ rmw_service->service_name, std::move(service_type), @@ -2907,7 +2893,7 @@ rmw_create_service( return nullptr; } service_data->token = zc_liveliness_declare_token( - z_loan(node->context->impl->session), + session, z_keyexpr(service_data->entity->keyexpr().c_str()), NULL ); @@ -3658,7 +3644,7 @@ rmw_get_node_names( rcutils_allocator_t * allocator = &node->context->options.allocator; RMW_CHECK_ARGUMENT_FOR_NULL(allocator, RMW_RET_INVALID_ARGUMENT); - return node->context->impl->graph_cache->get_node_names( + return node->context->impl->get_node_names( node_names, node_namespaces, nullptr, allocator); } @@ -3681,7 +3667,7 @@ rmw_get_node_names_with_enclaves( rcutils_allocator_t * allocator = &node->context->options.allocator; RMW_CHECK_ARGUMENT_FOR_NULL(allocator, RMW_RET_INVALID_ARGUMENT); - return node->context->impl->graph_cache->get_node_names( + return node->context->impl->get_node_names( node_names, node_namespaces, enclaves, allocator); } @@ -3712,7 +3698,7 @@ rmw_count_publishers( } RMW_CHECK_ARGUMENT_FOR_NULL(count, RMW_RET_INVALID_ARGUMENT); - return node->context->impl->graph_cache->count_publishers(topic_name, count); + return node->context->impl->count_publishers(topic_name, count); } //============================================================================== @@ -3742,7 +3728,7 @@ rmw_count_subscribers( } RMW_CHECK_ARGUMENT_FOR_NULL(count, RMW_RET_INVALID_ARGUMENT); - return node->context->impl->graph_cache->count_subscriptions(topic_name, count); + return node->context->impl->count_subscriptions(topic_name, count); } 
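The graph introspection entry points above and below (node names, publisher/subscription/client/service counts) no longer reach into graph_cache directly; they call same-named wrappers on rmw_context_impl_s. The wrapper bodies become visible later in this series, when patch 07 deletes them again, and they all follow the same lock-and-forward pattern, e.g.:

    rmw_ret_t rmw_context_impl_s::count_publishers(
      const char * topic_name,
      size_t * count) const
    {
      // Serialize graph-cache access with every other context operation.
      std::lock_guard lock(data_->mutex_);
      return data_->graph_cache_->count_publishers(topic_name, count);
    }
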
//============================================================================== @@ -3772,7 +3758,7 @@ rmw_count_clients( } RMW_CHECK_ARGUMENT_FOR_NULL(count, RMW_RET_INVALID_ARGUMENT); - return node->context->impl->graph_cache->count_clients(service_name, count); + return node->context->impl->count_clients(service_name, count); } //============================================================================== @@ -3802,7 +3788,7 @@ rmw_count_services( } RMW_CHECK_ARGUMENT_FOR_NULL(count, RMW_RET_INVALID_ARGUMENT); - return node->context->impl->graph_cache->count_services(service_name, count); + return node->context->impl->count_services(service_name, count); } //============================================================================== @@ -3902,7 +3888,7 @@ rmw_service_server_is_available( return RMW_RET_INVALID_ARGUMENT; } - return node->context->impl->graph_cache->service_server_is_available( + return node->context->impl->service_server_is_available( client->service_name, service_type.c_str(), is_available); } From 6d94301ac8727b27fa059e633e5846eff4f52fcf Mon Sep 17 00:00:00 2001 From: Yadunund Date: Wed, 7 Aug 2024 06:35:17 +0800 Subject: [PATCH 02/12] fix regression with graph_guard_condition not triggering when entity is removed Signed-off-by: Yadunund --- rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp index e30bf902..e1559ab3 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp @@ -54,6 +54,7 @@ void rmw_context_impl_s::graph_sub_data_handler(const z_sample_t * sample, void break; case z_sample_kind_t::Z_SAMPLE_KIND_DELETE: data_ptr->graph_cache_->parse_del(keystr._cstr); + break; default: return; } From 5bbe6fbc134b06c3e9b1697ab4bdbb8b286caa9e Mon Sep 17 00:00:00 2001 From: Yadunund Date: Thu, 8 Aug 2024 04:06:58 +0800 Subject: [PATCH 03/12] Have the context create the zenoh artifacts Signed-off-by: Yadunund --- .../src/detail/rmw_context_impl_s.cpp | 242 ++++++++++++++---- .../src/detail/rmw_context_impl_s.hpp | 21 +- rmw_zenoh_cpp/src/rmw_init.cpp | 138 +--------- 3 files changed, 199 insertions(+), 202 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp index e1559ab3..d906a8b3 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp @@ -15,16 +15,23 @@ #include "rmw_context_impl_s.hpp" #include +#include #include "guard_condition.hpp" +#include "identifier.hpp" #include "liveliness_utils.hpp" #include "logging_macros.hpp" #include "zenoh_config.hpp" +#include "zenoh_router_check.hpp" #include "rcpputils/scope_exit.hpp" #include "rmw/error_handling.h" #include "rmw/impl/cpp/macros.hpp" +// Megabytes of SHM to reserve. 
+// TODO(clalancette): Make this configurable, or get it from the configuration +#define SHM_BUFFER_SIZE_MB 10 + ///============================================================================= void rmw_context_impl_s::graph_sub_data_handler(const z_sample_t * sample, void * data) { @@ -72,66 +79,24 @@ void rmw_context_impl_s::graph_sub_data_handler(const z_sample_t * sample, void ///============================================================================= rmw_context_impl_s::Data::Data( const rcutils_allocator_t * allocator, - const std::size_t domain_id, const std::string & enclave, z_owned_session_t session, std::optional shm_manager, + const std::string & liveliness_str, + std::unique_ptr graph_cache, rmw_guard_condition_t * graph_guard_condition) : allocator_(allocator), enclave_(std::move(enclave)), session_(std::move(session)), shm_manager_(std::move(shm_manager)), + liveliness_str_(std::move(liveliness_str)), + graph_cache_(std::move(graph_cache)), graph_guard_condition_(graph_guard_condition), is_shutdown_(false), next_entity_id_(0), is_initialized_(false) { - z_id_t zid = z_info_zid(z_loan(session_)); - graph_cache_ = std::make_unique(std::move(zid)); - // Setup liveliness subscriptions for discovery. - liveliness_str_ = rmw_zenoh_cpp::liveliness::subscription_token( - domain_id); - - // Query router/liveliness participants to get graph information before this session was started. - // We create a blocking channel that is unbounded, ie. `bound` = 0, to receive - // replies for the zc_liveliness_get() call. This is necessary as if the `bound` - // is too low, the channel may starve the zenoh executor of its threads which - // would lead to deadlocks when trying to receive replies and block the - // execution here. - // The blocking channel will return when the sender end is closed which is - // the moment the query finishes. - // The non-blocking fifo exists only for the use case where we don't want to - // block the thread between responses (including the request termination response). - // In general, unless we want to cooperatively schedule other tasks on the same - // thread as reading the fifo, the blocking fifo will be more appropriate as - // the code will be simpler, and if we're just going to spin over the non-blocking - // reads until we obtain responses, we'll just be hogging CPU time by convincing - // the OS that we're doing actual work when it could instead park the thread. - z_owned_reply_channel_t channel = zc_reply_fifo_new(0); - zc_liveliness_get( - z_loan(session_), z_keyexpr(liveliness_str_.c_str()), - z_move(channel.send), NULL); - z_owned_reply_t reply = z_reply_null(); - for (bool call_success = z_call(channel.recv, &reply); !call_success || z_check(reply); - call_success = z_call(channel.recv, &reply)) - { - if (!call_success) { - continue; - } - if (z_reply_is_ok(&reply)) { - z_sample_t sample = z_reply_ok(&reply); - z_owned_str_t keystr = z_keyexpr_to_string(sample.keyexpr); - // Ignore tokens from the same session to avoid race conditions from this - // query and the liveliness subscription. - graph_cache_->parse_put(z_loan(keystr), true); - z_drop(z_move(keystr)); - } else { - RMW_ZENOH_LOG_DEBUG_NAMED( - "rmw_zenoh_cpp", "[rmw_context_impl_s] z_call received an invalid reply\n"); - } - } - z_drop(z_move(reply)); - z_drop(z_move(channel)); + // Do nothing. 
} ///============================================================================= @@ -208,22 +173,189 @@ rmw_context_impl_s::Data::~Data() rmw_context_impl_s::rmw_context_impl_s( const rcutils_allocator_t * allocator, const std::size_t domain_id, - const std::string & enclave, - z_owned_session_t session, - std::optional shm_manager, - rmw_guard_condition_t * graph_guard_condition) + const std::string & enclave) { + // Initialize the zenoh configuration. + z_owned_config_t config; + rmw_ret_t ret; + if ((ret = + rmw_zenoh_cpp::get_z_config( + rmw_zenoh_cpp::ConfigurableEntity::Session, + &config)) != RMW_RET_OK) + { + throw std::runtime_error("Error configuring Zenoh session."); + } + + // Check if shm is enabled. + z_owned_str_t shm_enabled = zc_config_get(z_loan(config), "transport/shared_memory/enabled"); + auto always_free_shm_enabled = rcpputils::make_scope_exit( + [&shm_enabled]() { + z_drop(z_move(shm_enabled)); + }); + + // Initialize the zenoh session. + z_owned_session_t session = z_open(z_move(config)); + if (!z_session_check(&session)) { + throw std::runtime_error("Error setting up zenoh session"); + } + auto close_session = rcpputils::make_scope_exit( + [&session]() { + z_close(z_move(session)); + }); + + // TODO(Yadunund) Move this check into a separate thread. + // Verify if the zenoh router is running if configured. + const std::optional configured_connection_attempts = + rmw_zenoh_cpp::zenoh_router_check_attempts(); + if (configured_connection_attempts.has_value()) { + ret = RMW_RET_ERROR; + uint64_t connection_attempts = 0; + // Retry until the connection is successful. + while (ret != RMW_RET_OK && connection_attempts < configured_connection_attempts.value()) { + if ((ret = rmw_zenoh_cpp::zenoh_router_check(z_loan(session))) != RMW_RET_OK) { + ++connection_attempts; + } + std::this_thread::sleep_for(std::chrono::seconds(1)); + } + if (ret != RMW_RET_OK) { + throw std::runtime_error( + "Unable to connect to a Zenoh router after " + + std::to_string(configured_connection_attempts.value()) + + " retries."); + } + } + + // Initialize the graph cache. + const z_id_t zid = z_info_zid(z_loan(session)); + auto graph_cache = std::make_unique(zid); + // Setup liveliness subscriptions for discovery. + std::string liveliness_str = rmw_zenoh_cpp::liveliness::subscription_token( + domain_id); + + // Query router/liveliness participants to get graph information before this session was started. + // We create a blocking channel that is unbounded, ie. `bound` = 0, to receive + // replies for the zc_liveliness_get() call. This is necessary as if the `bound` + // is too low, the channel may starve the zenoh executor of its threads which + // would lead to deadlocks when trying to receive replies and block the + // execution here. + // The blocking channel will return when the sender end is closed which is + // the moment the query finishes. + // The non-blocking fifo exists only for the use case where we don't want to + // block the thread between responses (including the request termination response). + // In general, unless we want to cooperatively schedule other tasks on the same + // thread as reading the fifo, the blocking fifo will be more appropriate as + // the code will be simpler, and if we're just going to spin over the non-blocking + // reads until we obtain responses, we'll just be hogging CPU time by convincing + // the OS that we're doing actual work when it could instead park the thread. 
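The comment above motivates the choice of channel; the code below is its direct translation. As a condensed restatement with the rationale inlined as comments (same local names as in this constructor):

    // bound = 0: unbounded channel, so recv blocks instead of spinning.
    z_owned_reply_channel_t channel = zc_reply_fifo_new(0);
    // channel.send is handed to the query and is closed when the query
    // finishes, which is what eventually terminates the loop below.
    zc_liveliness_get(
      z_loan(session), z_keyexpr(liveliness_str.c_str()),
      z_move(channel.send), NULL);
    z_owned_reply_t reply = z_reply_null();
    for (bool ok = z_call(channel.recv, &reply); !ok || z_check(reply);
      ok = z_call(channel.recv, &reply))
    {
      if (!ok) {
        continue;  // mirror the loop below: keep receiving
      }
      // Feed each OK reply (one liveliness token) into the graph cache here.
    }
    z_drop(z_move(reply));
    z_drop(z_move(channel));
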
+ z_owned_reply_channel_t channel = zc_reply_fifo_new(0); + zc_liveliness_get( + z_loan(session), z_keyexpr(liveliness_str.c_str()), + z_move(channel.send), NULL); + z_owned_reply_t reply = z_reply_null(); + for (bool call_success = z_call(channel.recv, &reply); !call_success || z_check(reply); + call_success = z_call(channel.recv, &reply)) + { + if (!call_success) { + continue; + } + if (z_reply_is_ok(&reply)) { + z_sample_t sample = z_reply_ok(&reply); + z_owned_str_t keystr = z_keyexpr_to_string(sample.keyexpr); + // Ignore tokens from the same session to avoid race conditions from this + // query and the liveliness subscription. + graph_cache->parse_put(z_loan(keystr), true); + z_drop(z_move(keystr)); + } else { + RMW_ZENOH_LOG_DEBUG_NAMED( + "rmw_zenoh_cpp", "[rmw_context_impl_s] z_call received an invalid reply\n"); + } + } + z_drop(z_move(reply)); + z_drop(z_move(channel)); + + // Initialize the shm manager if shared_memory is enabled in the config. + std::optional shm_manager = std::nullopt; + if (shm_enabled._cstr != nullptr && + strcmp(shm_enabled._cstr, "true") == 0) + { + char idstr[sizeof(zid.id) * 2 + 1]; // 2 bytes for each byte of the id, plus the trailing \0 + static constexpr size_t max_size_of_each = 3; // 2 for each byte, plus the trailing \0 + for (size_t i = 0; i < sizeof(zid.id); ++i) { + snprintf(idstr + 2 * i, max_size_of_each, "%02x", zid.id[i]); + } + idstr[sizeof(zid.id) * 2] = '\0'; + // TODO(yadunund): Can we get the size of the shm from the config even though it's not + // a standard parameter? + shm_manager = + zc_shm_manager_new( + z_loan(session), + idstr, + SHM_BUFFER_SIZE_MB * 1024 * 1024); + if (!shm_manager.has_value() || + !zc_shm_manager_check(&shm_manager.value())) + { + throw std::runtime_error("Unable to create shm manager."); + } + } + auto free_shm_manager = rcpputils::make_scope_exit( + [&shm_manager]() { + if (shm_manager.has_value()) { + z_drop(z_move(shm_manager.value())); + } + }); + + // Initialize the guard condition. 
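Returning to the SHM block above: the buffer arithmetic there is worth spelling out. For Zenoh's z_id_t (a 16-byte id) the conversion produces 32 hex characters plus one terminating '\0', hence sizeof(zid.id) * 2 + 1; each snprintf() call writes two hex digits plus a '\0' (3 bytes, the max_size_of_each), and every subsequent call overwrites the previous terminator, so only the final one survives. The same idiom in isolation, as a minimal sketch:

    #include <cstddef>
    #include <cstdint>
    #include <cstdio>

    // Writes 2 * len hex characters plus a trailing '\0' into `out`,
    // which must be at least len * 2 + 1 bytes long.
    void bytes_to_hex(const uint8_t * bytes, size_t len, char * out)
    {
      for (size_t i = 0; i < len; ++i) {
        snprintf(out + 2 * i, 3, "%02x", static_cast<unsigned int>(bytes[i]));
      }
      out[2 * len] = '\0';  // only needed when len == 0; later writes already placed it
    }
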
+ rmw_guard_condition_t * graph_guard_condition = + static_cast(allocator->zero_allocate( + 1, sizeof(rmw_guard_condition_t), allocator->state)); + if (graph_guard_condition == NULL) { + throw std::runtime_error( "failed to allocate graph guard condition"); + } + auto free_guard_condition = rcpputils::make_scope_exit( + [graph_guard_condition, allocator]() { + allocator->deallocate(graph_guard_condition, allocator->state); + }); + graph_guard_condition->implementation_identifier = + rmw_zenoh_cpp::rmw_zenoh_identifier; + graph_guard_condition->data = + allocator->zero_allocate(1, sizeof(rmw_zenoh_cpp::GuardCondition), allocator->state); + if (graph_guard_condition->data == NULL) { + throw std::runtime_error("failed to allocate graph guard condition data"); + } + auto free_guard_condition_data = rcpputils::make_scope_exit( + [graph_guard_condition, allocator]() { + allocator->deallocate(graph_guard_condition->data, allocator->state); + }); + RMW_TRY_PLACEMENT_NEW( + graph_guard_condition->data, + graph_guard_condition->data, + throw std::runtime_error("failed to initialize graph_guard_condition->data."), + rmw_zenoh_cpp::GuardCondition); + auto destruct_guard_condition_data = rcpputils::make_scope_exit( + [graph_guard_condition]() { + auto gc_data = + static_cast(graph_guard_condition->data); + RMW_TRY_DESTRUCTOR_FROM_WITHIN_FAILURE( + gc_data->~GuardCondition(), + rmw_zenoh_cpp::GuardCondition); + }); + + close_session.cancel(); + free_shm_manager.cancel(); + destruct_guard_condition_data.cancel(); + free_guard_condition_data.cancel(); + free_guard_condition.cancel(); + data_ = std::make_shared( allocator, - std::move(domain_id), std::move(enclave), std::move(session), std::move(shm_manager), + std::move(liveliness_str), + std::move(graph_cache), graph_guard_condition); - // TODO(Yadunund): Consider switching to a make() pattern to avoid throwing - // errors. - auto ret = data_->subscribe(); + ret = data_->subscribe(); if (ret != RMW_RET_OK) { throw std::runtime_error("Unable to subscribe to ROS Graph updates."); } diff --git a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp index 15f3a632..faf533b3 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp @@ -34,16 +34,16 @@ class rmw_context_impl_s final public: using GraphCacheEventCallback = rmw_zenoh_cpp::GraphCache::GraphCacheEventCallback; - // Constructor. - // Once constructed, the context_impl instanced will manage the lifetime - // of these arguments. + // Constructor that internally initializees the Zenoh session and other artifacts. + // Throws an std::runtime_error if any of the initializations fail. + // The construction will block until a Zenoh router is detected. + // TODO(Yadunund): Make this a non-blocking call by checking for the Zenoh + // router in a separate thread. Instead block when creating a node if router + // check has not succeeded. rmw_context_impl_s( const rcutils_allocator_t * allocator, const std::size_t domain_id, - const std::string & enclave, - z_owned_session_t session, - std::optional shm_manager, - rmw_guard_condition_t * graph_guard_condition); + const std::string & enclave); // Get a copy of the enclave. std::string enclave() const; @@ -145,10 +145,11 @@ class rmw_context_impl_s final // Constructor. 
Data( const rcutils_allocator_t * allocator, - const std::size_t domain_id, const std::string & enclave, z_owned_session_t session, std::optional shm_manager, + const std::string & liveliness_str, + std::unique_ptr graph_cache, rmw_guard_condition_t * graph_guard_condition); // Subscribe to the ROS graph. @@ -173,6 +174,8 @@ class rmw_context_impl_s final std::optional shm_manager_; // Liveliness keyexpr string to subscribe to for ROS graph changes. std::string liveliness_str_; + // Graph cache. + std::unique_ptr graph_cache_; // ROS graph liveliness subscriber. z_owned_subscriber_t graph_subscriber_; // Equivalent to rmw_dds_common::Context's guard condition @@ -182,8 +185,6 @@ class rmw_context_impl_s final bool is_shutdown_; // A counter to assign a local id for every entity created in this session. size_t next_entity_id_; - // Graph cache. - std::unique_ptr graph_cache_; // True once graph subscriber is initialized. bool is_initialized_; }; diff --git a/rmw_zenoh_cpp/src/rmw_init.cpp b/rmw_zenoh_cpp/src/rmw_init.cpp index 2556c718..0f067264 100644 --- a/rmw_zenoh_cpp/src/rmw_init.cpp +++ b/rmw_zenoh_cpp/src/rmw_init.cpp @@ -24,7 +24,6 @@ #include "detail/rmw_context_impl_s.hpp" #include "detail/rmw_data_types.hpp" #include "detail/zenoh_config.hpp" -#include "detail/zenoh_router_check.hpp" #include "rcutils/env.h" #include "detail/logging_macros.hpp" @@ -39,10 +38,6 @@ extern "C" { -// Megabytes of SHM to reserve. -// TODO(clalancette): Make this configurable, or get it from the configuration -#define SHM_BUFFER_SIZE_MB 10 - //============================================================================== /// Initialize the middleware with the given options, and yielding an context. rmw_ret_t @@ -100,129 +95,6 @@ rmw_init(const rmw_init_options_t * options, rmw_context_t * context) return RMW_RET_ERROR; } - // Initialize the zenoh configuration. - z_owned_config_t config; - if ((ret = - rmw_zenoh_cpp::get_z_config( - rmw_zenoh_cpp::ConfigurableEntity::Session, - &config)) != RMW_RET_OK) - { - RMW_SET_ERROR_MSG("Error configuring Zenoh session."); - return ret; - } - - // Check if shm is enabled. - z_owned_str_t shm_enabled = zc_config_get(z_loan(config), "transport/shared_memory/enabled"); - auto always_free_shm_enabled = rcpputils::make_scope_exit( - [&shm_enabled]() { - z_drop(z_move(shm_enabled)); - }); - - // Initialize the zenoh session. - z_owned_session_t session = z_open(z_move(config)); - if (!z_session_check(&session)) { - RMW_SET_ERROR_MSG("Error setting up zenoh session"); - return RMW_RET_ERROR; - } - auto close_session = rcpputils::make_scope_exit( - [&session]() { - z_close(z_move(session)); - }); - - // TODO(Yadunund) Move this check into a separate thread. - // Verify if the zenoh router is running if configured. - const std::optional configured_connection_attempts = - rmw_zenoh_cpp::zenoh_router_check_attempts(); - if (configured_connection_attempts.has_value()) { - ret = RMW_RET_ERROR; - uint64_t connection_attempts = 0; - // Retry until the connection is successful. 
- while (ret != RMW_RET_OK && connection_attempts < configured_connection_attempts.value()) { - if ((ret = rmw_zenoh_cpp::zenoh_router_check(z_loan(session))) != RMW_RET_OK) { - ++connection_attempts; - } - std::this_thread::sleep_for(std::chrono::seconds(1)); - } - if (ret != RMW_RET_OK) { - RMW_SET_ERROR_MSG_WITH_FORMAT_STRING( - "Unable to connect to a Zenoh router after %zu retries.", - configured_connection_attempts.value()); - return ret; - } - } - - // Initialize the shm manager if shared_memory is enabled in the config. - std::optional shm_manager = std::nullopt; - const z_id_t zid = z_info_zid(z_loan(session)); - if (shm_enabled._cstr != nullptr && - strcmp(shm_enabled._cstr, "true") == 0) - { - char idstr[sizeof(zid.id) * 2 + 1]; // 2 bytes for each byte of the id, plus the trailing \0 - static constexpr size_t max_size_of_each = 3; // 2 for each byte, plus the trailing \0 - for (size_t i = 0; i < sizeof(zid.id); ++i) { - snprintf(idstr + 2 * i, max_size_of_each, "%02x", zid.id[i]); - } - idstr[sizeof(zid.id) * 2] = '\0'; - // TODO(yadunund): Can we get the size of the shm from the config even though it's not - // a standard parameter? - shm_manager = - zc_shm_manager_new( - z_loan(session), - idstr, - SHM_BUFFER_SIZE_MB * 1024 * 1024); - if (!shm_manager.has_value() || - !zc_shm_manager_check(&shm_manager.value())) - { - RMW_SET_ERROR_MSG("Unable to create shm manager."); - return RMW_RET_ERROR; - } - } - auto free_shm_manager = rcpputils::make_scope_exit( - [&shm_manager]() { - if (shm_manager.has_value()) { - z_drop(z_move(shm_manager.value())); - } - }); - - // Initialize the guard condition. - rmw_guard_condition_t * graph_guard_condition = - static_cast(allocator->zero_allocate( - 1, sizeof(rmw_guard_condition_t), allocator->state)); - RMW_CHECK_FOR_NULL_WITH_MSG( - graph_guard_condition, - "failed to allocate graph guard condition", - return RMW_RET_BAD_ALLOC); - auto free_guard_condition = rcpputils::make_scope_exit( - [graph_guard_condition, allocator]() { - allocator->deallocate(graph_guard_condition, allocator->state); - }); - graph_guard_condition->implementation_identifier = - rmw_zenoh_cpp::rmw_zenoh_identifier; - graph_guard_condition->data = - allocator->zero_allocate(1, sizeof(rmw_zenoh_cpp::GuardCondition), allocator->state); - RMW_CHECK_FOR_NULL_WITH_MSG( - graph_guard_condition->data, - "failed to allocate graph guard condition data", - return RMW_RET_BAD_ALLOC); - auto free_guard_condition_data = rcpputils::make_scope_exit( - [graph_guard_condition, allocator]() { - allocator->deallocate(graph_guard_condition->data, allocator->state); - }); - RMW_TRY_PLACEMENT_NEW( - graph_guard_condition->data, - graph_guard_condition->data, - return RMW_RET_BAD_ALLOC, - rmw_zenoh_cpp::GuardCondition); - auto destruct_guard_condition_data = rcpputils::make_scope_exit( - [graph_guard_condition]() { - auto gc_data = - static_cast(graph_guard_condition->data); - RMW_TRY_DESTRUCTOR_FROM_WITHIN_FAILURE( - gc_data->~GuardCondition(), - rmw_zenoh_cpp::GuardCondition); - }); - - // Create the context impl. 
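Both the block being removed from rmw_init() here and its replacement inside the constructor lean on the same rcpputils idiom for partial-construction safety: acquire a resource, immediately register a scope_exit cleanup for it, and cancel() every cleanup only once ownership has been safely handed off. In isolation the idiom looks roughly like this (a sketch, not code from the patch):

    #include "rcpputils/scope_exit.hpp"

    void acquire_two_resources()
    {
      int * first = new int(1);
      auto free_first = rcpputils::make_scope_exit([first]() {delete first;});

      // If this allocation (or anything later) throws or returns early,
      // free_first runs automatically on scope exit.
      int * second = new int(2);
      auto free_second = rcpputils::make_scope_exit([second]() {delete second;});

      // ... further steps that may throw ...

      // Success: ownership moves elsewhere (stored, returned, etc.),
      // so disarm the cleanups.
      free_second.cancel();
      free_first.cancel();
    }
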
context->impl = static_cast( allocator->zero_allocate(1, sizeof(rmw_context_impl_t), allocator->state)); @@ -242,19 +114,11 @@ rmw_init(const rmw_init_options_t * options, rmw_context_t * context) rmw_context_impl_t, allocator, context->actual_domain_id, - std::string(options->enclave), - std::move(session), - std::move(shm_manager), - graph_guard_condition + std::string(options->enclave) ); - close_session.cancel(); - destruct_guard_condition_data.cancel(); - free_guard_condition_data.cancel(); - free_guard_condition.cancel(); free_options.cancel(); free_impl.cancel(); - free_shm_manager.cancel(); restore_context.cancel(); return RMW_RET_OK; From e7b5ff46759a9936a58ec5825142360b93cce148 Mon Sep 17 00:00:00 2001 From: Yadunund Date: Thu, 8 Aug 2024 04:09:42 +0800 Subject: [PATCH 04/12] Add comment for session() api Signed-off-by: Yadunund --- rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp index faf533b3..be14ce0b 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp @@ -49,6 +49,8 @@ class rmw_context_impl_s final std::string enclave() const; // Loan the Zenoh session. + // TODO(Yadunund): Remove this API once rmw_context_impl_s is updated to + // create other Zenoh objects. z_session_t session() const; // Get a reference to the shm_manager. From f106c170571878d218714072ba3786be25b06737 Mon Sep 17 00:00:00 2001 From: Yadunund Date: Thu, 8 Aug 2024 04:11:45 +0800 Subject: [PATCH 05/12] Style Signed-off-by: Yadunund --- rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp index d906a8b3..894b0a78 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp @@ -219,9 +219,9 @@ rmw_context_impl_s::rmw_context_impl_s( } if (ret != RMW_RET_OK) { throw std::runtime_error( - "Unable to connect to a Zenoh router after " + - std::to_string(configured_connection_attempts.value()) + - " retries."); + "Unable to connect to a Zenoh router after " + + std::to_string(configured_connection_attempts.value()) + + " retries."); } } @@ -309,7 +309,7 @@ rmw_context_impl_s::rmw_context_impl_s( static_cast(allocator->zero_allocate( 1, sizeof(rmw_guard_condition_t), allocator->state)); if (graph_guard_condition == NULL) { - throw std::runtime_error( "failed to allocate graph guard condition"); + throw std::runtime_error("failed to allocate graph guard condition"); } auto free_guard_condition = rcpputils::make_scope_exit( [graph_guard_condition, allocator]() { From 15911295a3aeb341132f038849477e2e6f5657ba Mon Sep 17 00:00:00 2001 From: Yadunund Date: Tue, 3 Sep 2024 17:03:45 +0800 Subject: [PATCH 06/12] Add api to register querying_sub cb in rmw_context_impl_s Signed-off-by: Yadunund --- rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp | 11 +++++++++++ rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp | 7 ++++++- rmw_zenoh_cpp/src/rmw_zenoh.cpp | 4 ++-- 3 files changed, 19 insertions(+), 3 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp index 894b0a78..c4941450 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp @@ -582,3 +582,14 @@ void 
rmw_context_impl_s::set_qos_event_callback( event_type, std::move(callback)); } + +///============================================================================= +void rmw_context_impl_s::set_querying_subscriber_callback( + const std::string & keyexpr, + QueryingSubscriberCallback cb) +{ + std::lock_guard lock(data_->mutex_); + return data_->graph_cache_->set_querying_subscriber_callback( + std::move(keyexpr), + std::move(cb)); +} diff --git a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp index be14ce0b..0d1296f9 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp @@ -33,7 +33,8 @@ class rmw_context_impl_s final { public: using GraphCacheEventCallback = rmw_zenoh_cpp::GraphCache::GraphCacheEventCallback; - + using QueryingSubscriberCallback = + rmw_zenoh_cpp::GraphCache::QueryingSubscriberCallback; // Constructor that internally initializees the Zenoh session and other artifacts. // Throws an std::runtime_error if any of the initializations fail. // The construction will block until a Zenoh router is detected. @@ -138,6 +139,10 @@ class rmw_context_impl_s final const rmw_zenoh_cpp::rmw_zenoh_event_type_t & event_type, GraphCacheEventCallback callback); + void set_querying_subscriber_callback( + const std::string & keyexpr, + QueryingSubscriberCallback cb); + private: // Bundle all class members into a data struct which can be passed as a // weak ptr to various threads for thread-safe access without capturing diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 5374a099..6b68b4bb 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -1485,7 +1485,7 @@ rmw_create_subscription( } // Register the querying subscriber with the graph cache to get latest // messages from publishers that were discovered after their first publication. 
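The new set_querying_subscriber_callback() wrapper above locks the context and forwards to the graph cache; the change just below updates its call site in rmw_create_subscription(). The registration there is keyed on the subscription's topic key expression and passes a lambda whose body is truncated in this excerpt; its shape is roughly the following, with the body comment being an assumption rather than patch code:

    context_impl->set_querying_subscriber_callback(
      sub_data->entity->topic_info()->topic_keyexpr_,
      [sub_data](const std::string & queryable_prefix) -> void {
        // Presumably: combine queryable_prefix with the topic keyexpr into a
        // selector and issue a query on sub_data->sub so that messages published
        // before this late-discovered match are still delivered.
      });
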
- context_impl->graph_cache->set_querying_subscriber_callback( + context_impl->set_querying_subscriber_callback( sub_data->entity->topic_info()->topic_keyexpr_, [sub_data](const std::string & queryable_prefix) -> void { @@ -2849,7 +2849,7 @@ rmw_create_service( context_impl->get_next_entity_id()), rmw_zenoh_cpp::liveliness::EntityType::Service, rmw_zenoh_cpp::liveliness::NodeInfo{ - node->context->actual_domain_id, node->namespace_, node->name, context_impl->enclave}, + node->context->actual_domain_id, node->namespace_, node->name, context_impl->enclave()}, rmw_zenoh_cpp::liveliness::TopicInfo{ node->context->actual_domain_id, rmw_service->service_name, From b3a8ef1c6e482a77429c702bdd25773ab3b95d67 Mon Sep 17 00:00:00 2001 From: Yadunund Date: Tue, 3 Sep 2024 19:08:47 +0800 Subject: [PATCH 07/12] Have rmw_context_impl_s return a shared_ptr to GraphCache Signed-off-by: Yadunund --- .../src/detail/rmw_context_impl_s.cpp | 181 +----------------- .../src/detail/rmw_context_impl_s.hpp | 76 +------- rmw_zenoh_cpp/src/rmw_event.cpp | 4 +- .../src/rmw_get_node_info_and_types.cpp | 8 +- .../src/rmw_get_service_names_and_types.cpp | 2 +- .../src/rmw_get_topic_endpoint_info.cpp | 4 +- .../src/rmw_get_topic_names_and_types.cpp | 2 +- rmw_zenoh_cpp/src/rmw_zenoh.cpp | 20 +- 8 files changed, 29 insertions(+), 268 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp index c4941450..f49ed2df 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp @@ -45,7 +45,7 @@ void rmw_context_impl_s::graph_sub_data_handler(const z_sample_t * sample, void if (data_ptr == nullptr) { RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", - "[graph_sub_data_handler] Unable to lock data_wp." + "[graph_sub_data_handler] Invalid data_ptr." ); return; } @@ -83,7 +83,7 @@ rmw_context_impl_s::Data::Data( z_owned_session_t session, std::optional shm_manager, const std::string & liveliness_str, - std::unique_ptr graph_cache, + std::shared_ptr graph_cache, rmw_guard_condition_t * graph_guard_condition) : allocator_(allocator), enclave_(std::move(enclave)), @@ -227,7 +227,7 @@ rmw_context_impl_s::rmw_context_impl_s( // Initialize the graph cache. const z_id_t zid = z_info_zid(z_loan(session)); - auto graph_cache = std::make_unique(zid); + auto graph_cache = std::make_shared(zid); // Setup liveliness subscriptions for discovery. 
std::string liveliness_str = rmw_zenoh_cpp::liveliness::subscription_token( domain_id); @@ -417,179 +417,8 @@ bool rmw_context_impl_s::session_is_valid() const } ///============================================================================= -rmw_ret_t rmw_context_impl_s::get_node_names( - rcutils_string_array_t * node_names, - rcutils_string_array_t * node_namespaces, - rcutils_string_array_t * enclaves, - rcutils_allocator_t * allocator) const +std::shared_ptr rmw_context_impl_s::graph_cache() { std::lock_guard lock(data_->mutex_); - return data_->graph_cache_->get_node_names( - node_names, - node_namespaces, - enclaves, - allocator); -} - -///============================================================================= -rmw_ret_t rmw_context_impl_s::get_topic_names_and_types( - rcutils_allocator_t * allocator, - bool no_demangle, - rmw_names_and_types_t * topic_names_and_types) const -{ - std::lock_guard lock(data_->mutex_); - return data_->graph_cache_->get_topic_names_and_types( - allocator, - no_demangle, - topic_names_and_types); -} - -///============================================================================= -rmw_ret_t rmw_context_impl_s::publisher_count_matched_subscriptions( - const rmw_publisher_t * publisher, - size_t * subscription_count) -{ - std::lock_guard lock(data_->mutex_); - return data_->graph_cache_->publisher_count_matched_subscriptions( - publisher, - subscription_count); -} - -///============================================================================= -rmw_ret_t rmw_context_impl_s::subscription_count_matched_publishers( - const rmw_subscription_t * subscription, - size_t * publisher_count) -{ - std::lock_guard lock(data_->mutex_); - return data_->graph_cache_->subscription_count_matched_publishers( - subscription, - publisher_count); -} - -///============================================================================= -rmw_ret_t rmw_context_impl_s::get_service_names_and_types( - rcutils_allocator_t * allocator, - rmw_names_and_types_t * service_names_and_types) const -{ - std::lock_guard lock(data_->mutex_); - return data_->graph_cache_->get_service_names_and_types( - allocator, - service_names_and_types); -} - -///============================================================================= -rmw_ret_t rmw_context_impl_s::count_publishers( - const char * topic_name, - size_t * count) const -{ - std::lock_guard lock(data_->mutex_); - return data_->graph_cache_->count_publishers( - topic_name, - count); -} - -///============================================================================= -rmw_ret_t rmw_context_impl_s::count_subscriptions( - const char * topic_name, - size_t * count) const -{ - std::lock_guard lock(data_->mutex_); - return data_->graph_cache_->count_subscriptions( - topic_name, - count); -} - -///============================================================================= -rmw_ret_t rmw_context_impl_s::count_services( - const char * service_name, - size_t * count) const -{ - std::lock_guard lock(data_->mutex_); - return data_->graph_cache_->count_services( - service_name, - count); -} - -///============================================================================= -rmw_ret_t rmw_context_impl_s::count_clients( - const char * service_name, - size_t * count) const -{ - std::lock_guard lock(data_->mutex_); - return data_->graph_cache_->count_clients( - service_name, - count); -} - -///============================================================================= -rmw_ret_t rmw_context_impl_s::get_entity_names_and_types_by_node( - 
rmw_zenoh_cpp::liveliness::EntityType entity_type, - rcutils_allocator_t * allocator, - const char * node_name, - const char * node_namespace, - bool no_demangle, - rmw_names_and_types_t * names_and_types) const -{ - std::lock_guard lock(data_->mutex_); - return data_->graph_cache_->get_entity_names_and_types_by_node( - entity_type, - allocator, - node_name, - node_namespace, - no_demangle, - names_and_types); -} - -///============================================================================= -rmw_ret_t rmw_context_impl_s::get_entities_info_by_topic( - rmw_zenoh_cpp::liveliness::EntityType entity_type, - rcutils_allocator_t * allocator, - const char * topic_name, - bool no_demangle, - rmw_topic_endpoint_info_array_t * endpoints_info) const -{ - std::lock_guard lock(data_->mutex_); - return data_->graph_cache_->get_entities_info_by_topic( - entity_type, - allocator, - topic_name, - no_demangle, - endpoints_info); -} - -///============================================================================= -rmw_ret_t rmw_context_impl_s::service_server_is_available( - const char * service_name, - const char * service_type, - bool * is_available) const -{ - std::lock_guard lock(data_->mutex_); - return data_->graph_cache_->service_server_is_available( - service_name, - service_type, - is_available); -} - -///============================================================================= -void rmw_context_impl_s::set_qos_event_callback( - rmw_zenoh_cpp::liveliness::ConstEntityPtr entity, - const rmw_zenoh_cpp::rmw_zenoh_event_type_t & event_type, - GraphCacheEventCallback callback) -{ - std::lock_guard lock(data_->mutex_); - return data_->graph_cache_->set_qos_event_callback( - std::move(entity), - event_type, - std::move(callback)); -} - -///============================================================================= -void rmw_context_impl_s::set_querying_subscriber_callback( - const std::string & keyexpr, - QueryingSubscriberCallback cb) -{ - std::lock_guard lock(data_->mutex_); - return data_->graph_cache_->set_querying_subscriber_callback( - std::move(keyexpr), - std::move(cb)); + return data_->graph_cache_; } diff --git a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp index 0d1296f9..aaa64827 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp @@ -32,9 +32,6 @@ class rmw_context_impl_s final { public: - using GraphCacheEventCallback = rmw_zenoh_cpp::GraphCache::GraphCacheEventCallback; - using QueryingSubscriberCallback = - rmw_zenoh_cpp::GraphCache::QueryingSubscriberCallback; // Constructor that internally initializees the Zenoh session and other artifacts. // Throws an std::runtime_error if any of the initializations fail. // The construction will block until a Zenoh router is detected. @@ -75,73 +72,8 @@ class rmw_context_impl_s final // Returns true if the Zenoh session is valid. 
bool session_is_valid() const; - rmw_ret_t get_node_names( - rcutils_string_array_t * node_names, - rcutils_string_array_t * node_namespaces, - rcutils_string_array_t * enclaves, - rcutils_allocator_t * allocator) const; - - rmw_ret_t get_topic_names_and_types( - rcutils_allocator_t * allocator, - bool no_demangle, - rmw_names_and_types_t * topic_names_and_types) const; - - rmw_ret_t publisher_count_matched_subscriptions( - const rmw_publisher_t * publisher, - size_t * subscription_count); - - rmw_ret_t subscription_count_matched_publishers( - const rmw_subscription_t * subscription, - size_t * publisher_count); - - rmw_ret_t get_service_names_and_types( - rcutils_allocator_t * allocator, - rmw_names_and_types_t * service_names_and_types) const; - - rmw_ret_t count_publishers( - const char * topic_name, - size_t * count) const; - - rmw_ret_t count_subscriptions( - const char * topic_name, - size_t * count) const; - - rmw_ret_t count_services( - const char * service_name, - size_t * count) const; - - rmw_ret_t count_clients( - const char * service_name, - size_t * count) const; - - rmw_ret_t get_entity_names_and_types_by_node( - rmw_zenoh_cpp::liveliness::EntityType entity_type, - rcutils_allocator_t * allocator, - const char * node_name, - const char * node_namespace, - bool no_demangle, - rmw_names_and_types_t * names_and_types) const; - - rmw_ret_t get_entities_info_by_topic( - rmw_zenoh_cpp::liveliness::EntityType entity_type, - rcutils_allocator_t * allocator, - const char * topic_name, - bool no_demangle, - rmw_topic_endpoint_info_array_t * endpoints_info) const; - - rmw_ret_t service_server_is_available( - const char * service_name, - const char * service_type, - bool * is_available) const; - - void set_qos_event_callback( - rmw_zenoh_cpp::liveliness::ConstEntityPtr entity, - const rmw_zenoh_cpp::rmw_zenoh_event_type_t & event_type, - GraphCacheEventCallback callback); - - void set_querying_subscriber_callback( - const std::string & keyexpr, - QueryingSubscriberCallback cb); + /// Return a shared_ptr to the GraphCache stored in this context. + std::shared_ptr graph_cache(); private: // Bundle all class members into a data struct which can be passed as a @@ -156,7 +88,7 @@ class rmw_context_impl_s final z_owned_session_t session, std::optional shm_manager, const std::string & liveliness_str, - std::unique_ptr graph_cache, + std::shared_ptr graph_cache, rmw_guard_condition_t * graph_guard_condition); // Subscribe to the ROS graph. @@ -182,7 +114,7 @@ class rmw_context_impl_s final // Liveliness keyexpr string to subscribe to for ROS graph changes. std::string liveliness_str_; // Graph cache. - std::unique_ptr graph_cache_; + std::shared_ptr graph_cache_; // ROS graph liveliness subscriber. z_owned_subscriber_t graph_subscriber_; // Equivalent to rmw_dds_common::Context's guard condition diff --git a/rmw_zenoh_cpp/src/rmw_event.cpp b/rmw_zenoh_cpp/src/rmw_event.cpp index 17280be7..6b1856c9 100644 --- a/rmw_zenoh_cpp/src/rmw_event.cpp +++ b/rmw_zenoh_cpp/src/rmw_event.cpp @@ -64,7 +64,7 @@ rmw_publisher_event_init( rmw_event->event_type = event_type; // Register the event with graph cache. 
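With the delegation methods gone, call sites obtain the shared GraphCache handle once via graph_cache() and use it directly, as the event and introspection changes below show. Because the accessor returns a std::shared_ptr, the handle can be held across several queries without re-entering the context's lock, e.g. (assuming a topic_name C string in scope):

    std::shared_ptr<rmw_zenoh_cpp::GraphCache> gc = context_impl->graph_cache();
    size_t pub_count = 0;
    size_t sub_count = 0;
    gc->count_publishers(topic_name, &pub_count);
    gc->count_subscriptions(topic_name, &sub_count);
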
- context_impl->set_qos_event_callback( + context_impl->graph_cache()->set_qos_event_callback( pub_data->entity, zenoh_event_type, [pub_data, @@ -126,7 +126,7 @@ rmw_subscription_event_init( return RMW_RET_OK; } - sub_data->context->impl->set_qos_event_callback( + context_impl->graph_cache()->set_qos_event_callback( sub_data->entity, zenoh_event_type, [sub_data, diff --git a/rmw_zenoh_cpp/src/rmw_get_node_info_and_types.cpp b/rmw_zenoh_cpp/src/rmw_get_node_info_and_types.cpp index 4aadc4d2..9e838aec 100644 --- a/rmw_zenoh_cpp/src/rmw_get_node_info_and_types.cpp +++ b/rmw_zenoh_cpp/src/rmw_get_node_info_and_types.cpp @@ -45,7 +45,7 @@ rmw_get_subscriber_names_and_types_by_node( RMW_CHECK_ARGUMENT_FOR_NULL(node->context->impl, RMW_RET_INVALID_ARGUMENT); rmw_context_impl_t * context_impl = static_cast(node->context->impl); RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT); - return context_impl->get_entity_names_and_types_by_node( + return context_impl->graph_cache()->get_entity_names_and_types_by_node( rmw_zenoh_cpp::liveliness::EntityType::Subscription, allocator, node_name, @@ -75,7 +75,7 @@ rmw_get_publisher_names_and_types_by_node( RMW_CHECK_ARGUMENT_FOR_NULL(node->context->impl, RMW_RET_INVALID_ARGUMENT); rmw_context_impl_t * context_impl = static_cast(node->context->impl); RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT); - return context_impl->get_entity_names_and_types_by_node( + return context_impl->graph_cache()->get_entity_names_and_types_by_node( rmw_zenoh_cpp::liveliness::EntityType::Publisher, allocator, node_name, @@ -104,7 +104,7 @@ rmw_get_service_names_and_types_by_node( RMW_CHECK_ARGUMENT_FOR_NULL(node->context->impl, RMW_RET_INVALID_ARGUMENT); rmw_context_impl_t * context_impl = static_cast(node->context->impl); RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT); - return context_impl->get_entity_names_and_types_by_node( + return context_impl->graph_cache()->get_entity_names_and_types_by_node( rmw_zenoh_cpp::liveliness::EntityType::Service, allocator, node_name, @@ -133,7 +133,7 @@ rmw_get_client_names_and_types_by_node( RMW_CHECK_ARGUMENT_FOR_NULL(node->context->impl, RMW_RET_INVALID_ARGUMENT); rmw_context_impl_t * context_impl = static_cast(node->context->impl); RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT); - return context_impl->get_entity_names_and_types_by_node( + return context_impl->graph_cache()->get_entity_names_and_types_by_node( rmw_zenoh_cpp::liveliness::EntityType::Client, allocator, node_name, diff --git a/rmw_zenoh_cpp/src/rmw_get_service_names_and_types.cpp b/rmw_zenoh_cpp/src/rmw_get_service_names_and_types.cpp index 5dc7cdf3..681b8ceb 100644 --- a/rmw_zenoh_cpp/src/rmw_get_service_names_and_types.cpp +++ b/rmw_zenoh_cpp/src/rmw_get_service_names_and_types.cpp @@ -38,7 +38,7 @@ rmw_get_service_names_and_types( rmw_context_impl_t * context_impl = static_cast(node->context->impl); RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT); - return context_impl->get_service_names_and_types( + return context_impl->graph_cache()->get_service_names_and_types( allocator, service_names_and_types); } } // extern "C" diff --git a/rmw_zenoh_cpp/src/rmw_get_topic_endpoint_info.cpp b/rmw_zenoh_cpp/src/rmw_get_topic_endpoint_info.cpp index d70ce0c7..3d7570cc 100644 --- a/rmw_zenoh_cpp/src/rmw_get_topic_endpoint_info.cpp +++ b/rmw_zenoh_cpp/src/rmw_get_topic_endpoint_info.cpp @@ -45,7 +45,7 @@ rmw_get_publishers_info_by_topic( RMW_CHECK_ARGUMENT_FOR_NULL(node->context->impl, 
RMW_RET_INVALID_ARGUMENT); rmw_context_impl_t * context_impl = static_cast(node->context->impl); RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT); - return context_impl->get_entities_info_by_topic( + return context_impl->graph_cache()->get_entities_info_by_topic( rmw_zenoh_cpp::liveliness::EntityType::Publisher, allocator, topic_name, @@ -73,7 +73,7 @@ rmw_get_subscriptions_info_by_topic( RMW_CHECK_ARGUMENT_FOR_NULL(node->context->impl, RMW_RET_INVALID_ARGUMENT); rmw_context_impl_t * context_impl = static_cast(node->context->impl); RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT); - return context_impl->get_entities_info_by_topic( + return context_impl->graph_cache()->get_entities_info_by_topic( rmw_zenoh_cpp::liveliness::EntityType::Subscription, allocator, topic_name, diff --git a/rmw_zenoh_cpp/src/rmw_get_topic_names_and_types.cpp b/rmw_zenoh_cpp/src/rmw_get_topic_names_and_types.cpp index 477dc7d9..9a3fb6c2 100644 --- a/rmw_zenoh_cpp/src/rmw_get_topic_names_and_types.cpp +++ b/rmw_zenoh_cpp/src/rmw_get_topic_names_and_types.cpp @@ -39,7 +39,7 @@ rmw_get_topic_names_and_types( rmw_context_impl_t * context_impl = static_cast(node->context->impl); RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT); - return context_impl->get_topic_names_and_types( + return context_impl->graph_cache()->get_topic_names_and_types( allocator, no_demangle, topic_names_and_types); } } // extern "C" diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index 6b68b4bb..4e568317 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -1013,7 +1013,7 @@ rmw_publisher_count_matched_subscriptions( rmw_context_impl_t * context_impl = static_cast(pub_data->context->impl); RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT); - return context_impl->publisher_count_matched_subscriptions( + return context_impl->graph_cache()->publisher_count_matched_subscriptions( publisher, subscription_count); } @@ -1485,7 +1485,7 @@ rmw_create_subscription( } // Register the querying subscriber with the graph cache to get latest // messages from publishers that were discovered after their first publication. 
- context_impl->set_querying_subscriber_callback( + context_impl->graph_cache()->set_querying_subscriber_callback( sub_data->entity->topic_info()->topic_keyexpr_, [sub_data](const std::string & queryable_prefix) -> void { @@ -1653,7 +1653,7 @@ rmw_subscription_count_matched_publishers( rmw_context_impl_t * context_impl = static_cast(sub_data->context->impl); RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT); - return context_impl->subscription_count_matched_publishers( + return context_impl->graph_cache()->subscription_count_matched_publishers( subscription, publisher_count); } @@ -3649,7 +3649,7 @@ rmw_get_node_names( rcutils_allocator_t * allocator = &node->context->options.allocator; RMW_CHECK_ARGUMENT_FOR_NULL(allocator, RMW_RET_INVALID_ARGUMENT); - return node->context->impl->get_node_names( + return node->context->impl->graph_cache()->get_node_names( node_names, node_namespaces, nullptr, allocator); } @@ -3672,7 +3672,7 @@ rmw_get_node_names_with_enclaves( rcutils_allocator_t * allocator = &node->context->options.allocator; RMW_CHECK_ARGUMENT_FOR_NULL(allocator, RMW_RET_INVALID_ARGUMENT); - return node->context->impl->get_node_names( + return node->context->impl->graph_cache()->get_node_names( node_names, node_namespaces, enclaves, allocator); } @@ -3703,7 +3703,7 @@ rmw_count_publishers( } RMW_CHECK_ARGUMENT_FOR_NULL(count, RMW_RET_INVALID_ARGUMENT); - return node->context->impl->count_publishers(topic_name, count); + return node->context->impl->graph_cache()->count_publishers(topic_name, count); } //============================================================================== @@ -3733,7 +3733,7 @@ rmw_count_subscribers( } RMW_CHECK_ARGUMENT_FOR_NULL(count, RMW_RET_INVALID_ARGUMENT); - return node->context->impl->count_subscriptions(topic_name, count); + return node->context->impl->graph_cache()->count_subscriptions(topic_name, count); } //============================================================================== @@ -3763,7 +3763,7 @@ rmw_count_clients( } RMW_CHECK_ARGUMENT_FOR_NULL(count, RMW_RET_INVALID_ARGUMENT); - return node->context->impl->count_clients(service_name, count); + return node->context->impl->graph_cache()->count_clients(service_name, count); } //============================================================================== @@ -3793,7 +3793,7 @@ rmw_count_services( } RMW_CHECK_ARGUMENT_FOR_NULL(count, RMW_RET_INVALID_ARGUMENT); - return node->context->impl->count_services(service_name, count); + return node->context->impl->graph_cache()->count_services(service_name, count); } //============================================================================== @@ -3893,7 +3893,7 @@ rmw_service_server_is_available( return RMW_RET_INVALID_ARGUMENT; } - return node->context->impl->service_server_is_available( + return node->context->impl->graph_cache()->service_server_is_available( client->service_name, service_type.c_str(), is_available); } From e773bd4d2368728778552ca628f5c22117780990 Mon Sep 17 00:00:00 2001 From: Yadunund Date: Wed, 4 Sep 2024 20:23:59 +0800 Subject: [PATCH 08/12] Add todo on thread safety Signed-off-by: Yadunund --- rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp index f49ed2df..84762749 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp @@ -107,6 +107,16 @@ rmw_ret_t rmw_context_impl_s::Data::subscribe() } 
// Setup the liveliness subscriber to receives updates from the ROS graph // and update the graph cache. + // TODO(Yadunund): This closure is still not 100% thread safe as we are + // passing Data* as the type erased argument to z_closure. Thus during + // the execution of graph_sub_data_handler, the rawptr may be freed/reset + // by a different thread. When we switch to zenoh-cpp we can replace z_closure + // with a lambda that captures a weak_ptr by copy. The lambda and caputed + // weak_ptr will have the same lifetime as the subscriber. Then within + // graph_sub_data_handler, we would first lock to weak_ptr to check if the + // shared_ptr exits. If it does, then even if a different thread calls + // rmw_context_fini() to destroy rmw_context_impl_s, the locked + // shared_ptr would live on until the graph_sub_data_handler callback. auto sub_options = zc_liveliness_subscriber_options_null(); z_owned_closure_sample_t callback = z_closure( rmw_context_impl_s::graph_sub_data_handler, nullptr, From ec4b0366f73ea04d1583a67b2ed6c711d8c13feb Mon Sep 17 00:00:00 2001 From: Yadu Date: Fri, 6 Sep 2024 09:39:24 +0530 Subject: [PATCH 09/12] Update rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp Co-authored-by: Chris Lalancette Signed-off-by: Yadu --- rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp index 84762749..fb2f6850 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp @@ -14,8 +14,9 @@ #include "rmw_context_impl_s.hpp" -#include +#include #include +#include #include "guard_condition.hpp" #include "identifier.hpp" From 6ad087c383d5b164869b7a5a3f1f52247722e5a4 Mon Sep 17 00:00:00 2001 From: Yadunund Date: Fri, 6 Sep 2024 14:33:15 +0800 Subject: [PATCH 10/12] Address feedback Signed-off-by: Yadunund --- rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp | 15 +++++++++++---- rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp | 2 +- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp index fb2f6850..68dc999a 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp @@ -15,6 +15,8 @@ #include "rmw_context_impl_s.hpp" #include +#include +#include #include #include @@ -101,8 +103,9 @@ rmw_context_impl_s::Data::Data( } ///============================================================================= -rmw_ret_t rmw_context_impl_s::Data::subscribe() +rmw_ret_t rmw_context_impl_s::Data::subscribe_to_ros_graph() { + std::lock_guard lock(mutex_); if (is_initialized_) { return RMW_RET_OK; } @@ -120,8 +123,7 @@ rmw_ret_t rmw_context_impl_s::Data::subscribe() // shared_ptr would live on until the graph_sub_data_handler callback. auto sub_options = zc_liveliness_subscriber_options_null(); z_owned_closure_sample_t callback = z_closure( - rmw_context_impl_s::graph_sub_data_handler, nullptr, - this->shared_from_this().get()); + rmw_context_impl_s::graph_sub_data_handler, nullptr, this); graph_subscriber_ = zc_liveliness_declare_subscriber( z_loan(session_), z_keyexpr(liveliness_str_.c_str()), @@ -186,6 +188,11 @@ rmw_context_impl_s::rmw_context_impl_s( const std::size_t domain_id, const std::string & enclave) { + // Check if allocator is valid. 
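The thread-safety TODO just above describes the eventual fix in prose; in code it is the usual capture-a-weak_ptr pattern. A sketch of what it could look like once a zenoh-cpp style callback API is available (declare_liveliness_subscriber is a placeholder, not an existing function; the weak_ptr capture and lock are the point), written from inside Data::subscribe_to_ros_graph():

    std::weak_ptr<Data> data_wp = shared_from_this();
    auto graph_subscriber = declare_liveliness_subscriber(
      liveliness_str_,
      [data_wp](const std::string & keyexpr, bool is_delete) {
        std::shared_ptr<Data> data = data_wp.lock();
        if (data == nullptr) {
          return;  // The context was already destroyed; drop the update.
        }
        // `data` keeps Data alive for the rest of this callback, even if
        // another thread runs rmw_context_fini() concurrently.
        std::lock_guard lock(data->mutex_);
        if (is_delete) {
          data->graph_cache_->parse_del(keyexpr);
        } else {
          data->graph_cache_->parse_put(keyexpr);
        }
      });
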
+ if (!rcutils_allocator_is_valid(allocator)) { + throw std::runtime_error("Invalid allocator passed to rmw_context_impl_s."); + } + // Initialize the zenoh configuration. z_owned_config_t config; rmw_ret_t ret; @@ -366,7 +373,7 @@ rmw_context_impl_s::rmw_context_impl_s( std::move(graph_cache), graph_guard_condition); - ret = data_->subscribe(); + ret = data_->subscribe_to_ros_graph(); if (ret != RMW_RET_OK) { throw std::runtime_error("Unable to subscribe to ROS Graph updates."); } diff --git a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp index aaa64827..f2f88bf8 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp @@ -92,7 +92,7 @@ class rmw_context_impl_s final rmw_guard_condition_t * graph_guard_condition); // Subscribe to the ROS graph. - rmw_ret_t subscribe(); + rmw_ret_t subscribe_to_ros_graph(); // Shutdown the Zenoh session. rmw_ret_t shutdown(); From 5be2532930a7ccefcf761811a601a3c3297a065f Mon Sep 17 00:00:00 2001 From: Yadunund Date: Sat, 14 Sep 2024 08:37:31 +0800 Subject: [PATCH 11/12] Do not use allocator for creating graph_guard_condition Signed-off-by: Yadunund --- .../src/detail/rmw_context_impl_s.cpp | 75 ++----------------- .../src/detail/rmw_context_impl_s.hpp | 10 +-- rmw_zenoh_cpp/src/rmw_init.cpp | 1 - 3 files changed, 13 insertions(+), 73 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp index 68dc999a..98684a7c 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp @@ -70,7 +70,7 @@ void rmw_context_impl_s::graph_sub_data_handler(const z_sample_t * sample, void } // Trigger the ROS graph guard condition. - rmw_ret_t rmw_ret = rmw_trigger_guard_condition(data_ptr->graph_guard_condition_); + rmw_ret_t rmw_ret = rmw_trigger_guard_condition(data_ptr->graph_guard_condition_.get()); if (RMW_RET_OK != rmw_ret) { RMW_ZENOH_LOG_WARN_NAMED( "rmw_zenoh_cpp", @@ -81,25 +81,23 @@ void rmw_context_impl_s::graph_sub_data_handler(const z_sample_t * sample, void ///============================================================================= rmw_context_impl_s::Data::Data( - const rcutils_allocator_t * allocator, const std::string & enclave, z_owned_session_t session, std::optional shm_manager, const std::string & liveliness_str, - std::shared_ptr graph_cache, - rmw_guard_condition_t * graph_guard_condition) -: allocator_(allocator), - enclave_(std::move(enclave)), + std::shared_ptr graph_cache) +: enclave_(std::move(enclave)), session_(std::move(session)), shm_manager_(std::move(shm_manager)), liveliness_str_(std::move(liveliness_str)), graph_cache_(std::move(graph_cache)), - graph_guard_condition_(graph_guard_condition), is_shutdown_(false), next_entity_id_(0), is_initialized_(false) { - // Do nothing. 
+  graph_guard_condition_ = std::make_unique();
+  graph_guard_condition_->implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier;
+  graph_guard_condition_->data = &guard_condition_data_;
 }
 
 ///=============================================================================
@@ -168,31 +166,15 @@ rmw_ret_t rmw_context_impl_s::Data::shutdown()
 ///=============================================================================
 rmw_context_impl_s::Data::~Data()
 {
-  RMW_TRY_DESTRUCTOR(
-    static_cast(
-      graph_guard_condition_->data)->~GuardCondition(),
-    rmw_zenoh_cpp::GuardCondition, );
-  if (rcutils_allocator_is_valid(allocator_)) {
-    allocator_->deallocate(graph_guard_condition_->data, allocator_->state);
-    allocator_->deallocate(graph_guard_condition_, allocator_->state);
-    graph_guard_condition_ = nullptr;
-  }
-
   auto ret = this->shutdown();
   static_cast(ret);
 }
 
 ///=============================================================================
 rmw_context_impl_s::rmw_context_impl_s(
-  const rcutils_allocator_t * allocator,
   const std::size_t domain_id,
   const std::string & enclave)
 {
-  // Check if allocator is valid.
-  if (!rcutils_allocator_is_valid(allocator)) {
-    throw std::runtime_error("Invalid allocator passed to rmw_context_impl_s.");
-  }
-
   // Initialize the zenoh configuration.
   z_owned_config_t config;
   rmw_ret_t ret;
@@ -322,56 +304,15 @@ rmw_context_impl_s::rmw_context_impl_s(
     }
   });
 
-  // Initialize the guard condition.
-  rmw_guard_condition_t * graph_guard_condition =
-    static_cast(allocator->zero_allocate(
-      1, sizeof(rmw_guard_condition_t), allocator->state));
-  if (graph_guard_condition == NULL) {
-    throw std::runtime_error("failed to allocate graph guard condition");
-  }
-  auto free_guard_condition = rcpputils::make_scope_exit(
-    [graph_guard_condition, allocator]() {
-      allocator->deallocate(graph_guard_condition, allocator->state);
-    });
-  graph_guard_condition->implementation_identifier =
-    rmw_zenoh_cpp::rmw_zenoh_identifier;
-  graph_guard_condition->data =
-    allocator->zero_allocate(1, sizeof(rmw_zenoh_cpp::GuardCondition), allocator->state);
-  if (graph_guard_condition->data == NULL) {
-    throw std::runtime_error("failed to allocate graph guard condition data");
-  }
-  auto free_guard_condition_data = rcpputils::make_scope_exit(
-    [graph_guard_condition, allocator]() {
-      allocator->deallocate(graph_guard_condition->data, allocator->state);
-    });
-  RMW_TRY_PLACEMENT_NEW(
-    graph_guard_condition->data,
-    graph_guard_condition->data,
-    throw std::runtime_error("failed to initialize graph_guard_condition->data."),
-    rmw_zenoh_cpp::GuardCondition);
-  auto destruct_guard_condition_data = rcpputils::make_scope_exit(
-    [graph_guard_condition]() {
-      auto gc_data =
-        static_cast(graph_guard_condition->data);
-      RMW_TRY_DESTRUCTOR_FROM_WITHIN_FAILURE(
-        gc_data->~GuardCondition(),
-        rmw_zenoh_cpp::GuardCondition);
-    });
-
   close_session.cancel();
   free_shm_manager.cancel();
-  destruct_guard_condition_data.cancel();
-  free_guard_condition_data.cancel();
-  free_guard_condition.cancel();
 
   data_ = std::make_shared(
-    allocator,
     std::move(enclave),
     std::move(session),
     std::move(shm_manager),
     std::move(liveliness_str),
-    std::move(graph_cache),
-    graph_guard_condition);
+    std::move(graph_cache));
 
   ret = data_->subscribe_to_ros_graph();
   if (ret != RMW_RET_OK) {
     throw std::runtime_error("Unable to subscribe to ROS Graph updates.");
   }
@@ -404,7 +345,7 @@ std::optional & rmw_context_impl_s::shm_manager()
 rmw_guard_condition_t * rmw_context_impl_s::graph_guard_condition()
 {
   std::lock_guard lock(data_->mutex_);
-  return data_->graph_guard_condition_;
+  return data_->graph_guard_condition_.get();
 }
 
 ///=============================================================================
diff --git a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp
index f2f88bf8..926d0989 100644
--- a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp
+++ b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp
@@ -23,6 +23,7 @@
 #include 
 
 #include "graph_cache.hpp"
+#include "guard_condition.hpp"
 #include "liveliness_utils.hpp"
 
 #include "rcutils/types.h"
@@ -39,7 +40,6 @@ class rmw_context_impl_s final
   // router in a separate thread. Instead block when creating a node if router
   // check has not succeeded.
   rmw_context_impl_s(
-    const rcutils_allocator_t * allocator,
     const std::size_t domain_id,
     const std::string & enclave);
 
@@ -83,13 +83,11 @@ class rmw_context_impl_s final
   {
     // Constructor.
     Data(
-      const rcutils_allocator_t * allocator,
      const std::string & enclave,
      z_owned_session_t session,
      std::optional shm_manager,
      const std::string & liveliness_str,
-      std::shared_ptr graph_cache,
-      rmw_guard_condition_t * graph_guard_condition);
+      std::shared_ptr graph_cache);
 
     // Subscribe to the ROS graph.
     rmw_ret_t subscribe_to_ros_graph();
@@ -119,7 +117,9 @@ class rmw_context_impl_s final
   z_owned_subscriber_t graph_subscriber_;
   // Equivalent to rmw_dds_common::Context's guard condition
   /// Guard condition that should be triggered when the graph changes.
-  rmw_guard_condition_t * graph_guard_condition_;
+  std::unique_ptr graph_guard_condition_;
+  // The GuardCondition data structure.
+  rmw_zenoh_cpp::GuardCondition guard_condition_data_;
   /// Shutdown flag.
   bool is_shutdown_;
   // A counter to assign a local id for every entity created in this session.
diff --git a/rmw_zenoh_cpp/src/rmw_init.cpp b/rmw_zenoh_cpp/src/rmw_init.cpp
index 0f067264..4c6b2f20 100644
--- a/rmw_zenoh_cpp/src/rmw_init.cpp
+++ b/rmw_zenoh_cpp/src/rmw_init.cpp
@@ -112,7 +112,6 @@ rmw_init(const rmw_init_options_t * options, rmw_context_t * context)
     context->impl,
     return RMW_RET_BAD_ALLOC,
     rmw_context_impl_t,
-    allocator,
     context->actual_domain_id,
     std::string(options->enclave)
   );

From 5eb967c63f95d5bb436fac2e4321478a4439c24a Mon Sep 17 00:00:00 2001
From: Yadunund
Date: Sat, 28 Sep 2024 01:05:47 +0800
Subject: [PATCH 12/12] Address feedback

Signed-off-by: Yadunund
---
 rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp | 4 ++--
 rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp | 7 ++++---
 2 files changed, 6 insertions(+), 5 deletions(-)

diff --git a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp
index 96fc1df3..d60bece8 100644
--- a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp
+++ b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp
@@ -196,7 +196,7 @@ rmw_context_impl_s::rmw_context_impl_s(
   // Initialize the zenoh session.
   z_owned_session_t session = z_open(z_move(config));
   if (!z_session_check(&session)) {
-    throw std::runtime_error("Error setting up zenoh session");
+    throw std::runtime_error("Error setting up zenoh session.");
   }
   auto close_session = rcpputils::make_scope_exit(
     [&session]() {
@@ -267,7 +267,7 @@ rmw_context_impl_s::rmw_context_impl_s(
       z_drop(z_move(keystr));
     } else {
       RMW_ZENOH_LOG_DEBUG_NAMED(
-        "rmw_zenoh_cpp", "[rmw_context_impl_s] z_call received an invalid reply\n");
+        "rmw_zenoh_cpp", "[rmw_context_impl_s] z_call received an invalid reply.\n");
     }
   }
   z_drop(z_move(reply));
diff --git a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp
index 926d0989..3cb6f474 100644
--- a/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp
+++ b/rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp
@@ -17,6 +17,7 @@
 
 #include 
+#include 
 #include 
 #include 
 #include 
 
@@ -115,12 +116,12 @@ class rmw_context_impl_s final
   std::shared_ptr graph_cache_;
   // ROS graph liveliness subscriber.
   z_owned_subscriber_t graph_subscriber_;
-  // Equivalent to rmw_dds_common::Context's guard condition
-  /// Guard condition that should be triggered when the graph changes.
+  // Equivalent to rmw_dds_common::Context's guard condition.
+  // Guard condition that should be triggered when the graph changes.
   std::unique_ptr graph_guard_condition_;
   // The GuardCondition data structure.
   rmw_zenoh_cpp::GuardCondition guard_condition_data_;
-  /// Shutdown flag.
+  // Shutdown flag.
   bool is_shutdown_;
   // A counter to assign a local id for every entity created in this session.
   size_t next_entity_id_;
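
Note on the TODO left in Data::subscribe_to_ros_graph(): once the code moves to zenoh-cpp, the idea described there is to replace the type-erased z_closure with a lambda that captures a std::weak_ptr to Data and to lock that weak_ptr at the top of the callback. The sketch below is only a minimal illustration of that ownership pattern under stated assumptions; it does not use the real zenoh-cpp subscriber API, and the Data members, the callback signature, and the helper name make_graph_callback are placeholders for the example.

  #include <memory>
  #include <mutex>

  // Stand-in for rmw_context_impl_s::Data; only the members the callback
  // touches in this sketch are shown.
  struct Data
  {
    std::recursive_mutex mutex_;
    bool is_shutdown_ = false;
  };

  // The subscriber would own the returned callback, so the captured weak_ptr
  // lives exactly as long as the subscription itself.
  inline auto make_graph_callback(const std::shared_ptr<Data> & data)
  {
    std::weak_ptr<Data> data_wp = data;
    return [data_wp](const char * keyexpr) {
        // Lock the weak_ptr first: if rmw_context_fini() has already destroyed
        // the context, the shared_ptr is gone and the callback simply returns.
        std::shared_ptr<Data> data_ptr = data_wp.lock();
        if (data_ptr == nullptr) {
          return;
        }
        std::lock_guard<std::recursive_mutex> lock(data_ptr->mutex_);
        if (data_ptr->is_shutdown_) {
          return;
        }
        // Parse the liveliness token in keyexpr and update the graph cache here.
        (void)keyexpr;
      };
  }

Because the locked shared_ptr is held for the duration of the callback body, the context's Data outlives any in-flight graph update even if another thread tears the context down concurrently, which is exactly the guarantee the TODO is after.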