Skip to content

Commit

Permalink
Make SubscriptionData thread safe
Browse files Browse the repository at this point in the history
Signed-off-by: Yadunund <[email protected]>
  • Loading branch information
Yadunund committed Oct 2, 2024
1 parent 9bf5b74 commit 619b33f
Show file tree
Hide file tree
Showing 17 changed files with 1,128 additions and 728 deletions.
1 change: 1 addition & 0 deletions rmw_zenoh_cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ add_library(rmw_zenoh_cpp SHARED
src/detail/rmw_data_types.cpp
src/detail/rmw_publisher_data.cpp
src/detail/rmw_node_data.cpp
src/detail/rmw_subscription_data.cpp
src/detail/service_type_support.cpp
src/detail/type_support.cpp
src/detail/type_support_common.cpp
Expand Down
2 changes: 2 additions & 0 deletions rmw_zenoh_cpp/src/detail/attachment_helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

namespace rmw_zenoh_cpp
{
//==============================================================================
bool get_gid_from_attachment(
const z_attachment_t * const attachment, uint8_t gid[RMW_GID_STORAGE_SIZE])
{
Expand All @@ -45,6 +46,7 @@ bool get_gid_from_attachment(
return true;
}

//==============================================================================
int64_t get_int64_from_attachment(
const z_attachment_t * const attachment, const std::string & name)
{
Expand Down
2 changes: 2 additions & 0 deletions rmw_zenoh_cpp/src/detail/attachment_helpers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@

namespace rmw_zenoh_cpp
{
//==============================================================================
bool get_gid_from_attachment(
const z_attachment_t * const attachment, uint8_t gid[RMW_GID_STORAGE_SIZE]);

//==============================================================================
int64_t get_int64_from_attachment(
const z_attachment_t * const attachment, const std::string & name);
} // namespace rmw_zenoh_cpp
Expand Down
41 changes: 19 additions & 22 deletions rmw_zenoh_cpp/src/detail/graph_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -852,24 +852,23 @@ rmw_ret_t GraphCache::get_topic_names_and_types(

///=============================================================================
rmw_ret_t GraphCache::publisher_count_matched_subscriptions(
PublisherDataConstPtr pub_data,
const liveliness::TopicInfo & pub_topic_info,
size_t * subscription_count)
{
// TODO(Yadunund): Replace this logic by returning a number that is tracked once
// we support matched qos events.
*subscription_count = 0;
auto topic_info = pub_data->topic_info();
GraphNode::TopicMap::const_iterator topic_it = graph_topics_.find(topic_info.name_);
GraphNode::TopicMap::const_iterator topic_it = graph_topics_.find(pub_topic_info.name_);
if (topic_it != graph_topics_.end()) {
GraphNode::TopicTypeMap::const_iterator topic_data_it = topic_it->second.find(
topic_info.type_);
pub_topic_info.type_);
if (topic_data_it != topic_it->second.end()) {
for (const auto & [_, topic_data] : topic_data_it->second) {
// If a subscription exists with compatible QoS, update the subscription count.
if (!topic_data->subs_.empty()) {
rmw_qos_compatibility_type_t is_compatible;
rmw_ret_t ret = rmw_qos_profile_check_compatible(
pub_data->adapted_qos_profile(),
pub_topic_info.qos_,
topic_data->info_.qos_,
&is_compatible,
nullptr,
Expand All @@ -887,25 +886,23 @@ rmw_ret_t GraphCache::publisher_count_matched_subscriptions(

///=============================================================================
rmw_ret_t GraphCache::subscription_count_matched_publishers(
const rmw_subscription_t * subscription,
const liveliness::TopicInfo & sub_topic_info,
size_t * publisher_count)
{
// TODO(Yadunund): Replace this logic by returning a number that is tracked once
// we support matched qos events.
*publisher_count = 0;
GraphNode::TopicMap::const_iterator topic_it = graph_topics_.find(subscription->topic_name);
GraphNode::TopicMap::const_iterator topic_it = graph_topics_.find(sub_topic_info.name_);
if (topic_it != graph_topics_.end()) {
rmw_subscription_data_t * sub_data =
static_cast<rmw_subscription_data_t *>(subscription->data);
GraphNode::TopicTypeMap::const_iterator topic_data_it = topic_it->second.find(
sub_data->type_support->get_name());
sub_topic_info.type_);
if (topic_data_it != topic_it->second.end()) {
for (const auto & [_, topic_data] : topic_data_it->second) {
// If a subscription exists with compatible QoS, update the subscription count.
if (!topic_data->pubs_.empty()) {
rmw_qos_compatibility_type_t is_compatible;
rmw_ret_t ret = rmw_qos_profile_check_compatible(
sub_data->adapted_qos_profile,
sub_topic_info.qos_,
topic_data->info_.qos_,
&is_compatible,
nullptr,
Expand Down Expand Up @@ -1344,29 +1341,29 @@ std::unique_ptr<rmw_zenoh_event_status_t> GraphCache::take_event_status(

///=============================================================================
void GraphCache::set_querying_subscriber_callback(
const rmw_subscription_data_t * sub_data,
const std::string & sub_keyexpr,
const std::size_t sub_guid,
QueryingSubscriberCallback cb)
{
const std::string keyexpr = sub_data->entity->topic_info().value().topic_keyexpr_;
auto cb_it = querying_subs_cbs_.find(keyexpr);
auto cb_it = querying_subs_cbs_.find(sub_keyexpr);
if (cb_it == querying_subs_cbs_.end()) {
querying_subs_cbs_[keyexpr] = std::move(
std::unordered_map<const rmw_subscription_data_t *,
QueryingSubscriberCallback>{});
cb_it = querying_subs_cbs_.find(keyexpr);
querying_subs_cbs_[sub_keyexpr] = std::move(
std::unordered_map<std::size_t, QueryingSubscriberCallback>{});
cb_it = querying_subs_cbs_.find(sub_keyexpr);
}
cb_it->second.insert(std::make_pair(sub_data, std::move(cb)));
cb_it->second.insert(std::make_pair(sub_guid, std::move(cb)));
}

///=============================================================================
void GraphCache::remove_querying_subscriber_callback(
const rmw_subscription_data_t * sub_data)
const std::string & sub_keyexpr,
const std::size_t sub_guid)
{
auto cb_map_it = querying_subs_cbs_.find(sub_data->entity->topic_info()->topic_keyexpr_);
auto cb_map_it = querying_subs_cbs_.find(sub_keyexpr);
if (cb_map_it == querying_subs_cbs_.end()) {
return;
}
cb_map_it->second.erase(sub_data);
cb_map_it->second.erase(sub_guid);
}

} // namespace rmw_zenoh_cpp
20 changes: 8 additions & 12 deletions rmw_zenoh_cpp/src/detail/graph_cache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
#include "event.hpp"
#include "liveliness_utils.hpp"
#include "ordered_map.hpp"
#include "rmw_publisher_data.hpp"

#include "rcutils/allocator.h"
#include "rcutils/types.h"
Expand All @@ -40,11 +39,6 @@

namespace rmw_zenoh_cpp
{
// Forward declare to prevent circular dependency.
// TODO(Yadunund): Remove this once we move rmw_subscription_data_t out of
// rmw_data_types.hpp.
class rmw_subscription_data_t;

///=============================================================================
// TODO(Yadunund): Consider changing this to an array of unordered_set where the index of the
// array corresponds to the EntityType enum. This way we don't need to mix
Expand Down Expand Up @@ -135,11 +129,11 @@ class GraphCache final
rmw_names_and_types_t * topic_names_and_types) const;

rmw_ret_t publisher_count_matched_subscriptions(
PublisherDataConstPtr pub_data,
const liveliness::TopicInfo & pub_topic_info,
size_t * subscription_count);

rmw_ret_t subscription_count_matched_publishers(
const rmw_subscription_t * subscription,
const liveliness::TopicInfo & sub_topic_info,
size_t * publisher_count);

rmw_ret_t get_service_names_and_types(
Expand Down Expand Up @@ -196,11 +190,13 @@ class GraphCache final
static bool is_entity_pub(const liveliness::Entity & entity);

void set_querying_subscriber_callback(
const rmw_subscription_data_t * sub_data,
const std::string & sub_keyexpr,
const std::size_t sub_guid,
QueryingSubscriberCallback cb);

void remove_querying_subscriber_callback(
const rmw_subscription_data_t * sub_data);
const std::string & sub_keyexpr,
const std::size_t sub_guid);

private:
// Helper function to convert an Entity into a GraphNode.
Expand Down Expand Up @@ -300,8 +296,8 @@ class GraphCache final
using GraphEventCallbackMap = std::unordered_map<std::size_t, GraphEventCallbacks>;
// EventCallbackMap for each type of event we support in rmw_zenoh_cpp.
GraphEventCallbackMap event_callbacks_;
// Map keyexpressions to QueryingSubscriberCallback.
std::unordered_map<std::string, std::unordered_map<const rmw_subscription_data_t *,
// Map key expressions to a map of sub guid and QueryingSubscriberCallback.
std::unordered_map<std::string, std::unordered_map<std::size_t,
QueryingSubscriberCallback>> querying_subs_cbs_;
// Counters to track changes to event statues for each topic.
std::unordered_map<std::string,
Expand Down
13 changes: 13 additions & 0 deletions rmw_zenoh_cpp/src/detail/liveliness_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,7 @@ std::string demangle_name(const std::string & input)
}
} // namespace liveliness

///=============================================================================
void
generate_random_gid(uint8_t gid[RMW_GID_STORAGE_SIZE])
{
Expand All @@ -678,4 +679,16 @@ generate_random_gid(uint8_t gid[RMW_GID_STORAGE_SIZE])
gid[i] = dist(rng);
}
}

///=============================================================================
size_t hash_gid(const uint8_t * gid)
{
std::stringstream hash_str;
hash_str << std::hex;
size_t i = 0;
for (; i < (RMW_GID_STORAGE_SIZE - 1); i++) {
hash_str << static_cast<int>(gid[i]);
}
return std::hash<std::string>{}(hash_str.str());
}
} // namespace rmw_zenoh_cpp
6 changes: 5 additions & 1 deletion rmw_zenoh_cpp/src/detail/liveliness_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -237,8 +237,12 @@ std::string zid_to_str(const z_id_t & id);
} // namespace liveliness

///=============================================================================
// Helper function to generate a randon GID.
/// Helper function to generate a randon GID.
void generate_random_gid(uint8_t gid[RMW_GID_STORAGE_SIZE]);

///=============================================================================
/// Generate a hash for a given GID.
size_t hash_gid(const uint8_t * gid);
} // namespace rmw_zenoh_cpp

///=============================================================================
Expand Down
Loading

0 comments on commit 619b33f

Please sign in to comment.