Skip to content

Commit

Permalink
Feedback from review.
Browse files Browse the repository at this point in the history
Signed-off-by: Chris Lalancette <[email protected]>
  • Loading branch information
clalancette committed Jul 3, 2024
1 parent 362e286 commit 1705100
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 15 deletions.
42 changes: 34 additions & 8 deletions rmw_zenoh_cpp/src/detail/rmw_data_types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "rcpputils/scope_exit.hpp"

#include "rmw/error_handling.h"
#include "rmw/impl/cpp/macros.hpp"

#include "attachment_helpers.hpp"
#include "rmw_data_types.hpp"
Expand Down Expand Up @@ -417,6 +418,12 @@ bool rmw_client_data_t::decrement_queries_in_flight_and_is_shutdown(bool & queri
return is_shutdown_;
}

bool rmw_client_data_t::is_shutdown() const
{
std::lock_guard<std::mutex> lock(in_flight_mutex_);
return is_shutdown_;
}

//==============================================================================
void sub_data_handler(
const z_sample_t * sample,
Expand Down Expand Up @@ -558,14 +565,7 @@ void client_data_handler(z_owned_reply_t * reply, void * data)

// See the comment about the "num_in_flight" class variable in the rmw_client_data_t class for
// why we need to do this.
bool queries_in_flight = false;
bool is_shutdown = client_data->decrement_queries_in_flight_and_is_shutdown(queries_in_flight);

if (is_shutdown) {
if (!queries_in_flight) {
client_data->context->options.allocator.deallocate(
client_data, client_data->context->options.allocator.state);
}
if (client_data->is_shutdown()) {
return;
}

Expand Down Expand Up @@ -594,4 +594,30 @@ void client_data_handler(z_owned_reply_t * reply, void * data)
// Since we took ownership of the reply, null it out here
*reply = z_reply_null();
}

void client_data_drop(void * data)
{
auto client_data = static_cast<rmw_client_data_t *>(data);
if (client_data == nullptr) {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to obtain client_data_t "
);
return;
}

// See the comment about the "num_in_flight" class variable in the rmw_client_data_t class for
// why we need to do this.
bool queries_in_flight = false;
bool is_shutdown = client_data->decrement_queries_in_flight_and_is_shutdown(queries_in_flight);

if (is_shutdown) {
if (!queries_in_flight) {
RMW_TRY_DESTRUCTOR(client_data->~rmw_client_data_t(), rmw_client_data_t, );
client_data->context->options.allocator.deallocate(
client_data, client_data->context->options.allocator.state);
}
}
}

} // namespace rmw_zenoh_cpp
5 changes: 4 additions & 1 deletion rmw_zenoh_cpp/src/detail/rmw_data_types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ void service_data_handler(const z_query_t * query, void * service_data);

///=============================================================================
void client_data_handler(z_owned_reply_t * reply, void * client_data);
void client_data_drop(void * data);

///=============================================================================
class ZenohQuery final
Expand Down Expand Up @@ -337,6 +338,8 @@ class rmw_client_data_t final
// See the comment for "num_in_flight" below on the use of this method.
bool decrement_queries_in_flight_and_is_shutdown(bool & queries_in_flight);

bool is_shutdown() const;

private:
void notify();

Expand Down Expand Up @@ -367,7 +370,7 @@ class rmw_client_data_t final
// returns, the memory in this structure will never be freed. There isn't much we can do about
// that at this time, but we may want to consider changing the timeout so that the memory can
// eventually be freed up.
std::mutex in_flight_mutex_;
mutable std::mutex in_flight_mutex_;
bool is_shutdown_{false};
size_t num_in_flight_{0};
};
Expand Down
16 changes: 10 additions & 6 deletions rmw_zenoh_cpp/src/rmw_zenoh.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2389,11 +2389,11 @@ rmw_destroy_client(rmw_node_t * node, rmw_client_t * client)
client_data->response_type_support->~ResponseTypeSupport(), rmw_zenoh_cpp::ResponseTypeSupport,
);
allocator->deallocate(client_data->response_type_support, allocator->state);
RMW_TRY_DESTRUCTOR(client_data->~rmw_client_data_t(), rmw_client_data_t, );

// See the comment about the "num_in_flight" class variable in the rmw_client_data_t class for
// why we need to do this.
if (!client_data->shutdown_and_query_in_flight()) {
RMW_TRY_DESTRUCTOR(client_data->~rmw_client_data_t(), rmw_client_data_t, );
allocator->deallocate(client->data, allocator->state);
}

Expand Down Expand Up @@ -2428,6 +2428,10 @@ rmw_send_request(
"Unable to retrieve client_data from client.",
RMW_RET_INVALID_ARGUMENT);

if (client_data->is_shutdown()) {
return RMW_RET_ERROR;
}

rmw_context_impl_s * context_impl = static_cast<rmw_context_impl_s *>(
client_data->context->impl);

Expand Down Expand Up @@ -2482,6 +2486,10 @@ rmw_send_request(
z_bytes_map_drop(z_move(map));
});

// See the comment about the "num_in_flight" class variable in the rmw_client_data_t class for
// why we need to do this.
client_data->increment_in_flight_callbacks();

opts.attachment = z_bytes_map_as_attachment(&map);

opts.target = Z_QUERY_TARGET_ALL_COMPLETE;
Expand All @@ -2496,17 +2504,13 @@ rmw_send_request(
opts.consolidation = z_query_consolidation_latest();
opts.value.payload = z_bytes_t{data_length, reinterpret_cast<const uint8_t *>(request_bytes)};
z_owned_closure_reply_t zn_closure_reply =
z_closure(rmw_zenoh_cpp::client_data_handler, nullptr, client_data);
z_closure(rmw_zenoh_cpp::client_data_handler, rmw_zenoh_cpp::client_data_drop, client_data);
z_get(
z_loan(context_impl->session),
z_loan(client_data->keyexpr), "",
z_move(zn_closure_reply),
&opts);

// See the comment about the "num_in_flight" class variable in the rmw_client_data_t class for
// why we need to do this.
client_data->increment_in_flight_callbacks();

return RMW_RET_OK;
}

Expand Down

0 comments on commit 1705100

Please sign in to comment.