Skip to content

Commit

Permalink
fix: thread-safe publisher
Browse files Browse the repository at this point in the history
  • Loading branch information
YuanYuYuan committed Oct 7, 2024
1 parent 37134bb commit 93e733f
Showing 1 changed file with 16 additions and 14 deletions.
30 changes: 16 additions & 14 deletions rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ std::shared_ptr<PublisherData> PublisherData::make(
return nullptr;
}

std::string topic_keyexpr = pub_data->entity_->topic_info()->topic_keyexpr_;
std::string topic_keyexpr = entity->topic_info()->topic_keyexpr_;
z_view_keyexpr_t pub_ke;
if (z_view_keyexpr_from_str(&pub_ke, topic_keyexpr.c_str()) != Z_OK) {
RMW_SET_ERROR_MSG("unable to create zenoh keyexpr.");
Expand All @@ -108,6 +108,12 @@ std::shared_ptr<PublisherData> PublisherData::make(

// Create a Publication Cache if durability is transient_local.
std::optional<ze_owned_publication_cache_t> pub_cache = std::nullopt;
auto undeclare_z_publisher_cache = rcpputils::make_scope_exit(
[&pub_cache]() {
if (pub_cache.has_value()) {
z_drop(z_move(pub_cache.value()));
}
});
if (adapted_qos_profile.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) {
ze_publication_cache_options_t pub_cache_opts;
ze_publication_cache_options_default(&pub_cache_opts);
Expand All @@ -119,26 +125,20 @@ std::shared_ptr<PublisherData> PublisherData::make(
// When such a prefix is added to the PublicationCache, it listens to queries with this extra
// prefix (allowing to be queried in a unique way), but still replies with the original
// publications' key expressions.
std::string queryable_prefix = pub_data->entity_->zid();
std::string queryable_prefix = entity->zid();
z_view_keyexpr_t prefix_ke;
z_view_keyexpr_from_str(&prefix_ke, queryable_prefix.c_str());
pub_cache_opts.queryable_prefix = z_loan(prefix_ke);

ze_owned_publication_cache_t pub_cache;
ze_owned_publication_cache_t pub_cache_;
if (ze_declare_publication_cache(
&pub_cache, session, z_loan(pub_ke), &pub_cache_opts))
&pub_cache_, session, z_loan(pub_ke), &pub_cache_opts))
{
RMW_SET_ERROR_MSG("unable to create zenoh publisher cache");
return nullptr;
}
pub_data->pub_cache_ = pub_cache;
pub_cache = pub_cache_;
}
auto undeclare_z_publisher_cache = rcpputils::make_scope_exit(
[&pub_cache]() {
if (pub_cache.has_value()) {
z_drop(z_move(pub_cache.value()));
}
});

// Set congestion_control to BLOCK if appropriate.
z_publisher_options_t opts;
Expand All @@ -152,27 +152,29 @@ std::shared_ptr<PublisherData> PublisherData::make(
opts.congestion_control = Z_CONGESTION_CONTROL_BLOCK;
}
}
z_owned_publisher_t pub;
// TODO(clalancette): What happens if the key name is a valid but empty string?
auto undeclare_z_publisher = rcpputils::make_scope_exit(
[&pub]() {
z_undeclare_publisher(z_move(pub));
});
if (z_declare_publisher(
&pub_data->pub_, session, z_loan(pub_ke), &opts) != Z_OK)
&pub, session, z_loan(pub_ke), &opts) != Z_OK)
{
RMW_SET_ERROR_MSG("Unable to create Zenoh publisher.");
return nullptr;
}

std::string liveliness_keyexpr = pub_data->entity_->liveliness_keyexpr();
std::string liveliness_keyexpr = entity->liveliness_keyexpr();
z_view_keyexpr_t liveliness_ke;
z_view_keyexpr_from_str(&liveliness_ke, liveliness_keyexpr.c_str());
zc_owned_liveliness_token_t token;
auto free_token = rcpputils::make_scope_exit(
[&token]() {
z_drop(z_move(token));
});
if (zc_liveliness_declare_token(
&pub_data->token_, session, z_loan(liveliness_ke),
&token, session, z_loan(liveliness_ke),
NULL) != Z_OK)
{
RMW_ZENOH_LOG_ERROR_NAMED(
Expand Down

0 comments on commit 93e733f

Please sign in to comment.