Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Thread-safe access to SubscriptionData #288

Open
wants to merge 2 commits into
base: rolling
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions rmw_zenoh_cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ add_library(rmw_zenoh_cpp SHARED
src/detail/rmw_data_types.cpp
src/detail/rmw_publisher_data.cpp
src/detail/rmw_node_data.cpp
src/detail/rmw_subscription_data.cpp
src/detail/service_type_support.cpp
src/detail/type_support.cpp
src/detail/type_support_common.cpp
Expand Down
2 changes: 2 additions & 0 deletions rmw_zenoh_cpp/src/detail/attachment_helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

namespace rmw_zenoh_cpp
{
//==============================================================================
bool get_gid_from_attachment(
const z_attachment_t * const attachment, uint8_t gid[RMW_GID_STORAGE_SIZE])
{
Expand All @@ -45,6 +46,7 @@ bool get_gid_from_attachment(
return true;
}

//==============================================================================
int64_t get_int64_from_attachment(
const z_attachment_t * const attachment, const std::string & name)
{
Expand Down
2 changes: 2 additions & 0 deletions rmw_zenoh_cpp/src/detail/attachment_helpers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@

namespace rmw_zenoh_cpp
{
//==============================================================================
bool get_gid_from_attachment(
const z_attachment_t * const attachment, uint8_t gid[RMW_GID_STORAGE_SIZE]);

//==============================================================================
int64_t get_int64_from_attachment(
const z_attachment_t * const attachment, const std::string & name);
} // namespace rmw_zenoh_cpp
Expand Down
41 changes: 19 additions & 22 deletions rmw_zenoh_cpp/src/detail/graph_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -852,24 +852,23 @@ rmw_ret_t GraphCache::get_topic_names_and_types(

///=============================================================================
rmw_ret_t GraphCache::publisher_count_matched_subscriptions(
PublisherDataConstPtr pub_data,
const liveliness::TopicInfo & pub_topic_info,
size_t * subscription_count)
{
// TODO(Yadunund): Replace this logic by returning a number that is tracked once
// we support matched qos events.
*subscription_count = 0;
auto topic_info = pub_data->topic_info();
GraphNode::TopicMap::const_iterator topic_it = graph_topics_.find(topic_info.name_);
GraphNode::TopicMap::const_iterator topic_it = graph_topics_.find(pub_topic_info.name_);
if (topic_it != graph_topics_.end()) {
GraphNode::TopicTypeMap::const_iterator topic_data_it = topic_it->second.find(
topic_info.type_);
pub_topic_info.type_);
if (topic_data_it != topic_it->second.end()) {
for (const auto & [_, topic_data] : topic_data_it->second) {
// If a subscription exists with compatible QoS, update the subscription count.
if (!topic_data->subs_.empty()) {
rmw_qos_compatibility_type_t is_compatible;
rmw_ret_t ret = rmw_qos_profile_check_compatible(
pub_data->adapted_qos_profile(),
pub_topic_info.qos_,
topic_data->info_.qos_,
&is_compatible,
nullptr,
Expand All @@ -887,25 +886,23 @@ rmw_ret_t GraphCache::publisher_count_matched_subscriptions(

///=============================================================================
rmw_ret_t GraphCache::subscription_count_matched_publishers(
const rmw_subscription_t * subscription,
const liveliness::TopicInfo & sub_topic_info,
size_t * publisher_count)
{
// TODO(Yadunund): Replace this logic by returning a number that is tracked once
// we support matched qos events.
*publisher_count = 0;
GraphNode::TopicMap::const_iterator topic_it = graph_topics_.find(subscription->topic_name);
GraphNode::TopicMap::const_iterator topic_it = graph_topics_.find(sub_topic_info.name_);
if (topic_it != graph_topics_.end()) {
rmw_subscription_data_t * sub_data =
static_cast<rmw_subscription_data_t *>(subscription->data);
GraphNode::TopicTypeMap::const_iterator topic_data_it = topic_it->second.find(
sub_data->type_support->get_name());
sub_topic_info.type_);
if (topic_data_it != topic_it->second.end()) {
for (const auto & [_, topic_data] : topic_data_it->second) {
// If a subscription exists with compatible QoS, update the subscription count.
if (!topic_data->pubs_.empty()) {
rmw_qos_compatibility_type_t is_compatible;
rmw_ret_t ret = rmw_qos_profile_check_compatible(
sub_data->adapted_qos_profile,
sub_topic_info.qos_,
topic_data->info_.qos_,
&is_compatible,
nullptr,
Expand Down Expand Up @@ -1344,29 +1341,29 @@ std::unique_ptr<rmw_zenoh_event_status_t> GraphCache::take_event_status(

///=============================================================================
void GraphCache::set_querying_subscriber_callback(
const rmw_subscription_data_t * sub_data,
const std::string & sub_keyexpr,
const std::size_t sub_guid,
QueryingSubscriberCallback cb)
{
const std::string keyexpr = sub_data->entity->topic_info().value().topic_keyexpr_;
auto cb_it = querying_subs_cbs_.find(keyexpr);
auto cb_it = querying_subs_cbs_.find(sub_keyexpr);
if (cb_it == querying_subs_cbs_.end()) {
querying_subs_cbs_[keyexpr] = std::move(
std::unordered_map<const rmw_subscription_data_t *,
QueryingSubscriberCallback>{});
cb_it = querying_subs_cbs_.find(keyexpr);
querying_subs_cbs_[sub_keyexpr] = std::move(
std::unordered_map<std::size_t, QueryingSubscriberCallback>{});
cb_it = querying_subs_cbs_.find(sub_keyexpr);
}
cb_it->second.insert(std::make_pair(sub_data, std::move(cb)));
cb_it->second.insert(std::make_pair(sub_guid, std::move(cb)));
}

///=============================================================================
void GraphCache::remove_querying_subscriber_callback(
const rmw_subscription_data_t * sub_data)
const std::string & sub_keyexpr,
const std::size_t sub_guid)
{
auto cb_map_it = querying_subs_cbs_.find(sub_data->entity->topic_info()->topic_keyexpr_);
auto cb_map_it = querying_subs_cbs_.find(sub_keyexpr);
if (cb_map_it == querying_subs_cbs_.end()) {
return;
}
cb_map_it->second.erase(sub_data);
cb_map_it->second.erase(sub_guid);
}

} // namespace rmw_zenoh_cpp
20 changes: 8 additions & 12 deletions rmw_zenoh_cpp/src/detail/graph_cache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
#include "event.hpp"
#include "liveliness_utils.hpp"
#include "ordered_map.hpp"
#include "rmw_publisher_data.hpp"

#include "rcutils/allocator.h"
#include "rcutils/types.h"
Expand All @@ -40,11 +39,6 @@

namespace rmw_zenoh_cpp
{
// Forward declare to prevent circular dependency.
// TODO(Yadunund): Remove this once we move rmw_subscription_data_t out of
// rmw_data_types.hpp.
class rmw_subscription_data_t;

///=============================================================================
// TODO(Yadunund): Consider changing this to an array of unordered_set where the index of the
// array corresponds to the EntityType enum. This way we don't need to mix
Expand Down Expand Up @@ -135,11 +129,11 @@ class GraphCache final
rmw_names_and_types_t * topic_names_and_types) const;

rmw_ret_t publisher_count_matched_subscriptions(
PublisherDataConstPtr pub_data,
const liveliness::TopicInfo & pub_topic_info,
size_t * subscription_count);

rmw_ret_t subscription_count_matched_publishers(
const rmw_subscription_t * subscription,
const liveliness::TopicInfo & sub_topic_info,
size_t * publisher_count);

rmw_ret_t get_service_names_and_types(
Expand Down Expand Up @@ -196,11 +190,13 @@ class GraphCache final
static bool is_entity_pub(const liveliness::Entity & entity);

void set_querying_subscriber_callback(
const rmw_subscription_data_t * sub_data,
const std::string & sub_keyexpr,
const std::size_t sub_guid,
QueryingSubscriberCallback cb);

void remove_querying_subscriber_callback(
const rmw_subscription_data_t * sub_data);
const std::string & sub_keyexpr,
const std::size_t sub_guid);

private:
// Helper function to convert an Entity into a GraphNode.
Expand Down Expand Up @@ -300,8 +296,8 @@ class GraphCache final
using GraphEventCallbackMap = std::unordered_map<std::size_t, GraphEventCallbacks>;
// EventCallbackMap for each type of event we support in rmw_zenoh_cpp.
GraphEventCallbackMap event_callbacks_;
// Map keyexpressions to QueryingSubscriberCallback.
std::unordered_map<std::string, std::unordered_map<const rmw_subscription_data_t *,
// Map key expressions to a map of sub guid and QueryingSubscriberCallback.
std::unordered_map<std::string, std::unordered_map<std::size_t,
QueryingSubscriberCallback>> querying_subs_cbs_;
// Counters to track changes to event statues for each topic.
std::unordered_map<std::string,
Expand Down
13 changes: 13 additions & 0 deletions rmw_zenoh_cpp/src/detail/liveliness_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,7 @@ std::string demangle_name(const std::string & input)
}
} // namespace liveliness

///=============================================================================
void
generate_random_gid(uint8_t gid[RMW_GID_STORAGE_SIZE])
{
Expand All @@ -678,4 +679,16 @@ generate_random_gid(uint8_t gid[RMW_GID_STORAGE_SIZE])
gid[i] = dist(rng);
}
}

///=============================================================================
size_t hash_gid(const uint8_t * gid)
{
std::stringstream hash_str;
hash_str << std::hex;
size_t i = 0;
for (; i < (RMW_GID_STORAGE_SIZE - 1); i++) {
hash_str << static_cast<int>(gid[i]);
}
return std::hash<std::string>{}(hash_str.str());
}
} // namespace rmw_zenoh_cpp
6 changes: 5 additions & 1 deletion rmw_zenoh_cpp/src/detail/liveliness_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -237,8 +237,12 @@ std::string zid_to_str(const z_id_t & id);
} // namespace liveliness

///=============================================================================
// Helper function to generate a randon GID.
/// Helper function to generate a randon GID.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// Helper function to generate a randon GID.
/// Helper function to generate a random GID.

void generate_random_gid(uint8_t gid[RMW_GID_STORAGE_SIZE]);

///=============================================================================
/// Generate a hash for a given GID.
size_t hash_gid(const uint8_t * gid);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know you are just copying this over, but I'm going to suggest that we change this signature to:

Suggested change
size_t hash_gid(const uint8_t * gid);
size_t hash_gid(const uint8_t gid[RMW_GID_STORAGE_SIZE]);

It won't make much of a practical difference, but it will be clear to the reader exactly what the size of gid is.

} // namespace rmw_zenoh_cpp

///=============================================================================
Expand Down
Loading