diff --git a/rmw_fastrtps_cpp/src/rmw_publisher.cpp b/rmw_fastrtps_cpp/src/rmw_publisher.cpp index 3d1cfe8f5..10fd5e3b2 100644 --- a/rmw_fastrtps_cpp/src/rmw_publisher.cpp +++ b/rmw_fastrtps_cpp/src/rmw_publisher.cpp @@ -150,6 +150,23 @@ rmw_publisher_count_matched_subscriptions( publisher, subscription_count); } +rmw_ret_t +rmw_publisher_count_non_local_matched_subscriptions( + const rmw_publisher_t * publisher, + size_t * non_local_subscription_count) +{ + RMW_CHECK_ARGUMENT_FOR_NULL(publisher, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + publisher, + publisher->implementation_identifier, + eprosima_fastrtps_identifier, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + RMW_CHECK_ARGUMENT_FOR_NULL(non_local_subscription_count, RMW_RET_INVALID_ARGUMENT); + + return rmw_fastrtps_shared_cpp::__rmw_publisher_count_non_local_matched_subscriptions( + publisher, non_local_subscription_count); +} + rmw_ret_t rmw_publisher_assert_liveliness(const rmw_publisher_t * publisher) { diff --git a/rmw_fastrtps_dynamic_cpp/src/rmw_publisher.cpp b/rmw_fastrtps_dynamic_cpp/src/rmw_publisher.cpp index c5b1ca459..d7b6b9cc8 100644 --- a/rmw_fastrtps_dynamic_cpp/src/rmw_publisher.cpp +++ b/rmw_fastrtps_dynamic_cpp/src/rmw_publisher.cpp @@ -150,6 +150,23 @@ rmw_publisher_count_matched_subscriptions( publisher, subscription_count); } +rmw_ret_t +rmw_publisher_count_non_local_matched_subscriptions( + const rmw_publisher_t * publisher, + size_t * non_local_subscription_count) +{ + RMW_CHECK_ARGUMENT_FOR_NULL(publisher, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + publisher, + publisher->implementation_identifier, + eprosima_fastrtps_identifier, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + RMW_CHECK_ARGUMENT_FOR_NULL(non_local_subscription_count, RMW_RET_INVALID_ARGUMENT); + + return rmw_fastrtps_shared_cpp::__rmw_publisher_count_non_local_matched_subscriptions( + publisher, non_local_subscription_count); +} + rmw_ret_t rmw_publisher_assert_liveliness(const rmw_publisher_t * publisher) { diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_publisher_info.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_publisher_info.hpp index f3023f72b..cf339ef66 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_publisher_info.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_publisher_info.hpp @@ -120,9 +120,10 @@ class RMWPublisherEvent final : public EventListenerInterface * user calls rmw_count_subscribers(). * * \param[in] guid The GUID of the newly-matched subscription to track. + * \param[in] is_local Whether \c guid belongs to the same participant as this publisher. */ RMW_FASTRTPS_SHARED_CPP_PUBLIC - void track_unique_subscription(eprosima::fastrtps::rtps::GUID_t guid); + void track_unique_subscription(eprosima::fastrtps::rtps::GUID_t guid, bool is_local); /// Remove a GUID from the internal set of unique subscriptions matched to this publisher. /** @@ -130,9 +131,10 @@ class RMWPublisherEvent final : public EventListenerInterface * user calls rmw_count_subscribers(). * * \param[in] guid The GUID of the newly-unmatched subscription to track. + * \param[in] is_local Whether \c guid belongs to the same participant as this publisher. */ RMW_FASTRTPS_SHARED_CPP_PUBLIC - void untrack_unique_subscription(eprosima::fastrtps::rtps::GUID_t guid); + void untrack_unique_subscription(eprosima::fastrtps::rtps::GUID_t guid, bool is_local); /// Return the number of unique subscriptions matched to this publisher. /** @@ -141,6 +143,13 @@ class RMWPublisherEvent final : public EventListenerInterface RMW_FASTRTPS_SHARED_CPP_PUBLIC size_t subscription_count() const; + /// Return the number of unique non-local subscriptions matched to this publisher. + /** + * \return Number of unique non-local subscriptions matched to this publisher. + */ + RMW_FASTRTPS_SHARED_CPP_PUBLIC + size_t non_local_subscription_count() const; + RMW_FASTRTPS_SHARED_CPP_PUBLIC void update_deadline(uint32_t total_count, uint32_t total_count_change); @@ -165,6 +174,9 @@ class RMWPublisherEvent final : public EventListenerInterface std::set subscriptions_ RCPPUTILS_TSA_GUARDED_BY(subscriptions_mutex_); + std::set non_local_subscriptions_ + RCPPUTILS_TSA_GUARDED_BY(subscriptions_mutex_); + mutable std::mutex subscriptions_mutex_; bool deadline_changed_ diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/rmw_common.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/rmw_common.hpp index 2994c108f..b693bc5fa 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/rmw_common.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/rmw_common.hpp @@ -217,6 +217,12 @@ __rmw_publisher_count_matched_subscriptions( const rmw_publisher_t * publisher, size_t * subscription_count); +RMW_FASTRTPS_SHARED_CPP_PUBLIC +rmw_ret_t +__rmw_publisher_count_non_local_matched_subscriptions( + const rmw_publisher_t * publisher, + size_t * non_local_subscription_count); + RMW_FASTRTPS_SHARED_CPP_PUBLIC rmw_ret_t __rmw_publisher_get_actual_qos( diff --git a/rmw_fastrtps_shared_cpp/src/custom_publisher_info.cpp b/rmw_fastrtps_shared_cpp/src/custom_publisher_info.cpp index f6d4cfbe8..36c53c4c0 100644 --- a/rmw_fastrtps_shared_cpp/src/custom_publisher_info.cpp +++ b/rmw_fastrtps_shared_cpp/src/custom_publisher_info.cpp @@ -38,14 +38,14 @@ void CustomDataWriterListener::on_publication_matched( eprosima::fastdds::dds::DataWriter * writer, const eprosima::fastdds::dds::PublicationMatchedStatus & status) { - (void)writer; + eprosima::fastrtps::rtps::GUID_t subscription_guid = + eprosima::fastrtps::rtps::iHandle2GUID(status.last_subscription_handle); + bool is_local = writer->guid().guidPrefix == subscription_guid.guidPrefix; if (status.current_count_change == 1) { - publisher_event_->track_unique_subscription( - eprosima::fastrtps::rtps::iHandle2GUID(status.last_subscription_handle)); + publisher_event_->track_unique_subscription(subscription_guid, is_local); } else if (status.current_count_change == -1) { - publisher_event_->untrack_unique_subscription( - eprosima::fastrtps::rtps::iHandle2GUID(status.last_subscription_handle)); + publisher_event_->untrack_unique_subscription(subscription_guid, is_local); } else { return; } @@ -277,16 +277,26 @@ void RMWPublisherEvent::set_on_new_event_callback( publisher_info_->data_writer_->set_listener(publisher_info_->data_writer_listener_, status_mask); } -void RMWPublisherEvent::track_unique_subscription(eprosima::fastrtps::rtps::GUID_t guid) +void RMWPublisherEvent::track_unique_subscription( + eprosima::fastrtps::rtps::GUID_t guid, + bool is_local) { std::lock_guard lock(subscriptions_mutex_); subscriptions_.insert(guid); + if (!is_local) { + non_local_subscriptions_.insert(guid); + } } -void RMWPublisherEvent::untrack_unique_subscription(eprosima::fastrtps::rtps::GUID_t guid) +void RMWPublisherEvent::untrack_unique_subscription( + eprosima::fastrtps::rtps::GUID_t guid, + bool is_local) { std::lock_guard lock(subscriptions_mutex_); subscriptions_.erase(guid); + if (!is_local) { + non_local_subscriptions_.erase(guid); + } } size_t RMWPublisherEvent::subscription_count() const @@ -295,6 +305,12 @@ size_t RMWPublisherEvent::subscription_count() const return subscriptions_.size(); } +size_t RMWPublisherEvent::non_local_subscription_count() const +{ + std::lock_guard lock(subscriptions_mutex_); + return non_local_subscriptions_.size(); +} + void RMWPublisherEvent::update_deadline(uint32_t total_count, uint32_t total_count_change) { rcpputils::unique_lock lock_mutex(on_new_event_m_); diff --git a/rmw_fastrtps_shared_cpp/src/rmw_publisher.cpp b/rmw_fastrtps_shared_cpp/src/rmw_publisher.cpp index 8ad697aba..e08c56018 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_publisher.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_publisher.cpp @@ -99,6 +99,18 @@ __rmw_publisher_count_matched_subscriptions( return RMW_RET_OK; } +rmw_ret_t +__rmw_publisher_count_non_local_matched_subscriptions( + const rmw_publisher_t * publisher, + size_t * non_local_subscription_count) +{ + auto info = static_cast(publisher->data); + + *non_local_subscription_count = info->publisher_event_->non_local_subscription_count(); + + return RMW_RET_OK; +} + rmw_ret_t __rmw_publisher_assert_liveliness( const char * identifier,