From 17051002eb4a4e7552f360659cfef7963624c6af Mon Sep 17 00:00:00 2001 From: Chris Lalancette Date: Tue, 2 Jul 2024 21:07:55 +0000 Subject: [PATCH] Feedback from review. Signed-off-by: Chris Lalancette --- rmw_zenoh_cpp/src/detail/rmw_data_types.cpp | 42 +++++++++++++++++---- rmw_zenoh_cpp/src/detail/rmw_data_types.hpp | 5 ++- rmw_zenoh_cpp/src/rmw_zenoh.cpp | 16 +++++--- 3 files changed, 48 insertions(+), 15 deletions(-) diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp index 6ac400d2..4908d667 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp @@ -28,6 +28,7 @@ #include "rcpputils/scope_exit.hpp" #include "rmw/error_handling.h" +#include "rmw/impl/cpp/macros.hpp" #include "attachment_helpers.hpp" #include "rmw_data_types.hpp" @@ -417,6 +418,12 @@ bool rmw_client_data_t::decrement_queries_in_flight_and_is_shutdown(bool & queri return is_shutdown_; } +bool rmw_client_data_t::is_shutdown() const +{ + std::lock_guard lock(in_flight_mutex_); + return is_shutdown_; +} + //============================================================================== void sub_data_handler( const z_sample_t * sample, @@ -558,14 +565,7 @@ void client_data_handler(z_owned_reply_t * reply, void * data) // See the comment about the "num_in_flight" class variable in the rmw_client_data_t class for // why we need to do this. - bool queries_in_flight = false; - bool is_shutdown = client_data->decrement_queries_in_flight_and_is_shutdown(queries_in_flight); - - if (is_shutdown) { - if (!queries_in_flight) { - client_data->context->options.allocator.deallocate( - client_data, client_data->context->options.allocator.state); - } + if (client_data->is_shutdown()) { return; } @@ -594,4 +594,30 @@ void client_data_handler(z_owned_reply_t * reply, void * data) // Since we took ownership of the reply, null it out here *reply = z_reply_null(); } + +void client_data_drop(void * data) +{ + auto client_data = static_cast(data); + if (client_data == nullptr) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to obtain client_data_t " + ); + return; + } + + // See the comment about the "num_in_flight" class variable in the rmw_client_data_t class for + // why we need to do this. + bool queries_in_flight = false; + bool is_shutdown = client_data->decrement_queries_in_flight_and_is_shutdown(queries_in_flight); + + if (is_shutdown) { + if (!queries_in_flight) { + RMW_TRY_DESTRUCTOR(client_data->~rmw_client_data_t(), rmw_client_data_t, ); + client_data->context->options.allocator.deallocate( + client_data, client_data->context->options.allocator.state); + } + } +} + } // namespace rmw_zenoh_cpp diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp index 378ce0b7..60e4bd01 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp @@ -204,6 +204,7 @@ void service_data_handler(const z_query_t * query, void * service_data); ///============================================================================= void client_data_handler(z_owned_reply_t * reply, void * client_data); +void client_data_drop(void * data); ///============================================================================= class ZenohQuery final @@ -337,6 +338,8 @@ class rmw_client_data_t final // See the comment for "num_in_flight" below on the use of this method. bool decrement_queries_in_flight_and_is_shutdown(bool & queries_in_flight); + bool is_shutdown() const; + private: void notify(); @@ -367,7 +370,7 @@ class rmw_client_data_t final // returns, the memory in this structure will never be freed. There isn't much we can do about // that at this time, but we may want to consider changing the timeout so that the memory can // eventually be freed up. - std::mutex in_flight_mutex_; + mutable std::mutex in_flight_mutex_; bool is_shutdown_{false}; size_t num_in_flight_{0}; }; diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index d6fa21e2..3dec87f0 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -2389,11 +2389,11 @@ rmw_destroy_client(rmw_node_t * node, rmw_client_t * client) client_data->response_type_support->~ResponseTypeSupport(), rmw_zenoh_cpp::ResponseTypeSupport, ); allocator->deallocate(client_data->response_type_support, allocator->state); - RMW_TRY_DESTRUCTOR(client_data->~rmw_client_data_t(), rmw_client_data_t, ); // See the comment about the "num_in_flight" class variable in the rmw_client_data_t class for // why we need to do this. if (!client_data->shutdown_and_query_in_flight()) { + RMW_TRY_DESTRUCTOR(client_data->~rmw_client_data_t(), rmw_client_data_t, ); allocator->deallocate(client->data, allocator->state); } @@ -2428,6 +2428,10 @@ rmw_send_request( "Unable to retrieve client_data from client.", RMW_RET_INVALID_ARGUMENT); + if (client_data->is_shutdown()) { + return RMW_RET_ERROR; + } + rmw_context_impl_s * context_impl = static_cast( client_data->context->impl); @@ -2482,6 +2486,10 @@ rmw_send_request( z_bytes_map_drop(z_move(map)); }); + // See the comment about the "num_in_flight" class variable in the rmw_client_data_t class for + // why we need to do this. + client_data->increment_in_flight_callbacks(); + opts.attachment = z_bytes_map_as_attachment(&map); opts.target = Z_QUERY_TARGET_ALL_COMPLETE; @@ -2496,17 +2504,13 @@ rmw_send_request( opts.consolidation = z_query_consolidation_latest(); opts.value.payload = z_bytes_t{data_length, reinterpret_cast(request_bytes)}; z_owned_closure_reply_t zn_closure_reply = - z_closure(rmw_zenoh_cpp::client_data_handler, nullptr, client_data); + z_closure(rmw_zenoh_cpp::client_data_handler, rmw_zenoh_cpp::client_data_drop, client_data); z_get( z_loan(context_impl->session), z_loan(client_data->keyexpr), "", z_move(zn_closure_reply), &opts); - // See the comment about the "num_in_flight" class variable in the rmw_client_data_t class for - // why we need to do this. - client_data->increment_in_flight_callbacks(); - return RMW_RET_OK; }