diff --git a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp index 9c54d2e5..393fb818 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp @@ -48,9 +48,6 @@ std::shared_ptr PublisherData::make( const rosidl_message_type_support_t * type_support, const rmw_qos_profile_t * qos_profile) { - auto pub_data = std::shared_ptr(new PublisherData{}); - pub_data->rmw_node_ = node; - generate_random_gid(pub_data->gid_); rmw_qos_profile_t adapted_qos_profile = *qos_profile; rmw_ret_t ret = QoS::get().best_available_qos( node, topic_name.c_str(), &adapted_qos_profile, rmw_get_subscriptions_info_by_topic); @@ -61,9 +58,8 @@ std::shared_ptr PublisherData::make( rcutils_allocator_t * allocator = &node->context->options.allocator; const rosidl_type_hash_t * type_hash = type_support->get_type_hash_func(type_support); - pub_data->type_support_impl_ = type_support->data; auto callbacks = static_cast(type_support->data); - pub_data->type_support_ = std::make_unique(callbacks); + auto message_type_support = std::make_unique(callbacks); // Convert the type hash to a string so that it can be included in // the keyexpr. @@ -82,7 +78,7 @@ std::shared_ptr PublisherData::make( }); std::size_t domain_id = node_info.domain_id_; - pub_data->entity_ = liveliness::Entity::make( + auto entity = liveliness::Entity::make( z_info_zid(session), std::to_string(node_id), std::to_string(publisher_id), @@ -91,11 +87,11 @@ std::shared_ptr PublisherData::make( liveliness::TopicInfo{ std::move(domain_id), topic_name, - pub_data->type_support_->get_name(), + message_type_support->get_name(), type_hash_c_str, adapted_qos_profile} ); - if (pub_data->entity_ == nullptr) { + if (entity == nullptr) { RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "Unable to generate keyexpr for liveliness token for the publisher %s.", @@ -103,7 +99,7 @@ std::shared_ptr PublisherData::make( return nullptr; } z_owned_keyexpr_t keyexpr = z_keyexpr_new( - pub_data->entity_->topic_info()->topic_keyexpr_.c_str()); + entity->topic_info()->topic_keyexpr_.c_str()); auto always_free_ros_keyexpr = rcpputils::make_scope_exit( [&keyexpr]() { z_keyexpr_drop(z_move(keyexpr)); @@ -114,6 +110,7 @@ std::shared_ptr PublisherData::make( } // Create a Publication Cache if durability is transient_local. + std::optional pub_cache = std::nullopt; if (adapted_qos_profile.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) { ze_publication_cache_options_t pub_cache_opts = ze_publication_cache_options_default(); pub_cache_opts.history = adapted_qos_profile.depth; @@ -124,26 +121,26 @@ std::shared_ptr PublisherData::make( // When such a prefix is added to the PublicationCache, it listens to queries with this extra // prefix (allowing to be queried in a unique way), but still replies with the original // publications' key expressions. - z_owned_keyexpr_t queryable_prefix = z_keyexpr_new(pub_data->entity_->zid().c_str()); + z_owned_keyexpr_t queryable_prefix = z_keyexpr_new(entity->zid().c_str()); auto always_free_queryable_prefix = rcpputils::make_scope_exit( [&queryable_prefix]() { z_keyexpr_drop(z_move(queryable_prefix)); }); pub_cache_opts.queryable_prefix = z_loan(queryable_prefix); - pub_data->pub_cache_ = ze_declare_publication_cache( + pub_cache = ze_declare_publication_cache( session, z_loan(keyexpr), &pub_cache_opts ); - if (!pub_data->pub_cache_.has_value() || !z_check(pub_data->pub_cache_.value())) { + if (!pub_cache.has_value() || !z_check(pub_cache.value())) { RMW_SET_ERROR_MSG("unable to create zenoh publisher cache"); return nullptr; } } auto undeclare_z_publisher_cache = rcpputils::make_scope_exit( - [pub_data]() { - if (pub_data && pub_data->pub_cache_.has_value()) { - z_drop(z_move(pub_data->pub_cache_.value())); + [&pub_cache]() { + if (pub_cache.has_value()) { + z_drop(z_move(pub_cache.value())); } }); @@ -156,54 +153,73 @@ std::shared_ptr PublisherData::make( opts.congestion_control = Z_CONGESTION_CONTROL_BLOCK; } // TODO(clalancette): What happens if the key name is a valid but empty string? - pub_data->pub_ = z_declare_publisher( + z_owned_publisher_t pub = z_declare_publisher( session, z_loan(keyexpr), &opts ); - if (!z_check(pub_data->pub_)) { + if (!z_check(pub)) { RMW_SET_ERROR_MSG("Unable to create Zenoh publisher."); return nullptr; } auto undeclare_z_publisher = rcpputils::make_scope_exit( - [pub_data]() { - z_undeclare_publisher(z_move(pub_data->pub_)); + [&pub]() { + z_undeclare_publisher(z_move(pub)); }); - pub_data->token_ = zc_liveliness_declare_token( + zc_owned_liveliness_token_t token = zc_liveliness_declare_token( session, - z_keyexpr(pub_data->entity_->liveliness_keyexpr().c_str()), + z_keyexpr(entity->liveliness_keyexpr().c_str()), NULL ); auto free_token = rcpputils::make_scope_exit( - [pub_data]() { - if (pub_data != nullptr) { - z_drop(z_move(pub_data->token_)); - } + [&token]() { + z_drop(z_move(token)); }); - if (!z_check(pub_data->token_)) { + if (!z_check(token)) { RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", "Unable to create liveliness token for the publisher."); return nullptr; } - // Initialize the events manager. - pub_data->events_mgr_ = std::make_shared(); - free_token.cancel(); undeclare_z_publisher_cache.cancel(); undeclare_z_publisher.cancel(); - return pub_data; + return std::shared_ptr( + new PublisherData{ + node, + std::move(entity), + std::move(pub), + std::move(pub_cache), + std::move(token), + type_support->data, + std::move(message_type_support) + }); } ///============================================================================= -PublisherData::PublisherData() -: sequence_number_(1), +PublisherData::PublisherData( + const rmw_node_t * rmw_node, + std::shared_ptr entity, + z_owned_publisher_t pub, + std::optional pub_cache, + zc_owned_liveliness_token_t token, + const void * type_support_impl, + std::unique_ptr type_support) +: rmw_node_(rmw_node), + entity_(std::move(entity)), + pub_(std::move(pub)), + pub_cache_(std::move(pub_cache)), + token_(std::move(token)), + type_support_impl_(type_support_impl), + type_support_(std::move(type_support)), + sequence_number_(1), is_shutdown_(false) { - // Do nothing. + generate_random_gid(gid_); + events_mgr_ = std::make_shared(); } ///============================================================================= diff --git a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp index 51b364cc..7737deeb 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp @@ -90,7 +90,14 @@ class PublisherData final private: // Constructor. - PublisherData(); + PublisherData( + const rmw_node_t * rmw_node, + std::shared_ptr entity, + z_owned_publisher_t pub, + std::optional pub_cache, + zc_owned_liveliness_token_t token, + const void * type_support_impl, + std::unique_ptr type_support); // Internal mutex. mutable std::mutex mutex_;