Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Thread-safe access to PublisherData #278

Merged
merged 7 commits into from
Oct 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions rmw_zenoh_cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,14 @@ add_library(rmw_zenoh_cpp SHARED
src/detail/qos.cpp
src/detail/rmw_context_impl_s.cpp
src/detail/rmw_data_types.cpp
src/detail/rmw_publisher_data.cpp
src/detail/rmw_node_data.cpp
src/detail/service_type_support.cpp
src/detail/type_support.cpp
src/detail/type_support_common.cpp
src/detail/zenoh_config.cpp
src/detail/zenoh_router_check.cpp
src/detail/zenoh_utils.cpp
src/rmw_event.cpp
src/rmw_get_network_flow_endpoints.cpp
src/rmw_get_node_info_and_types.cpp
Expand Down
25 changes: 12 additions & 13 deletions rmw_zenoh_cpp/src/detail/graph_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ void GraphCache::take_entities_with_events(const EntityEventMap & entities_with_
for (const auto & [local_entity, event_set] : entities_with_events) {
// Trigger callback set for this entity for the event type.
GraphEventCallbackMap::const_iterator event_callbacks_it =
event_callbacks_.find(local_entity);
event_callbacks_.find(local_entity->guid());
if (event_callbacks_it != event_callbacks_.end()) {
for (const rmw_zenoh_event_type_t & event_type : event_set) {
GraphEventCallbacks::const_iterator callback_it =
Expand Down Expand Up @@ -852,25 +852,24 @@ rmw_ret_t GraphCache::get_topic_names_and_types(

///=============================================================================
rmw_ret_t GraphCache::publisher_count_matched_subscriptions(
const rmw_publisher_t * publisher,
PublisherDataConstPtr pub_data,
size_t * subscription_count)
{
// TODO(Yadunund): Replace this logic by returning a number that is tracked once
// we support matched qos events.
*subscription_count = 0;
GraphNode::TopicMap::const_iterator topic_it = graph_topics_.find(publisher->topic_name);
auto topic_info = pub_data->topic_info();
GraphNode::TopicMap::const_iterator topic_it = graph_topics_.find(topic_info.name_);
if (topic_it != graph_topics_.end()) {
rmw_publisher_data_t * pub_data =
static_cast<rmw_publisher_data_t *>(publisher->data);
GraphNode::TopicTypeMap::const_iterator topic_data_it = topic_it->second.find(
pub_data->type_support->get_name());
topic_info.type_);
if (topic_data_it != topic_it->second.end()) {
for (const auto & [_, topic_data] : topic_data_it->second) {
// If a subscription exists with compatible QoS, update the subscription count.
if (!topic_data->subs_.empty()) {
rmw_qos_compatibility_type_t is_compatible;
rmw_ret_t ret = rmw_qos_profile_check_compatible(
pub_data->adapted_qos_profile,
pub_data->adapted_qos_profile(),
topic_data->info_.qos_,
&is_compatible,
nullptr,
Expand Down Expand Up @@ -1244,7 +1243,7 @@ rmw_ret_t GraphCache::service_server_is_available(

///=============================================================================
void GraphCache::set_qos_event_callback(
liveliness::ConstEntityPtr entity,
std::size_t entity_guid,
const rmw_zenoh_event_type_t & event_type,
GraphCacheEventCallback callback)
{
Expand All @@ -1257,20 +1256,20 @@ void GraphCache::set_qos_event_callback(
return;
}

const GraphEventCallbackMap::iterator event_cb_it = event_callbacks_.find(entity);
const GraphEventCallbackMap::iterator event_cb_it = event_callbacks_.find(entity_guid);
if (event_cb_it == event_callbacks_.end()) {
// First time a callback is being set for this entity.
event_callbacks_[entity] = {std::make_pair(event_type, std::move(callback))};
event_callbacks_[entity_guid] = {std::make_pair(event_type, std::move(callback))};
return;
}
event_cb_it->second[event_type] = std::move(callback);
}

///=============================================================================
void GraphCache::remove_qos_event_callbacks(liveliness::ConstEntityPtr entity)
void GraphCache::remove_qos_event_callbacks(std::size_t entity_guid)
{
std::lock_guard<std::mutex> lock(graph_mutex_);
event_callbacks_.erase(entity);
event_callbacks_.erase(entity_guid);
}

///=============================================================================
Expand Down Expand Up @@ -1348,7 +1347,7 @@ void GraphCache::set_querying_subscriber_callback(
const rmw_subscription_data_t * sub_data,
QueryingSubscriberCallback cb)
{
const std::string keyexpr = sub_data->entity->topic_info()->topic_keyexpr_;
const std::string keyexpr = sub_data->entity->topic_info().value().topic_keyexpr_;
auto cb_it = querying_subs_cbs_.find(keyexpr);
if (cb_it == querying_subs_cbs_.end()) {
querying_subs_cbs_[keyexpr] = std::move(
Expand Down
12 changes: 7 additions & 5 deletions rmw_zenoh_cpp/src/detail/graph_cache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#ifndef DETAIL__GRAPH_CACHE_HPP_
#define DETAIL__GRAPH_CACHE_HPP_

#include <cstddef>
#include <functional>
#include <map>
#include <memory>
Expand All @@ -27,6 +28,7 @@
#include "event.hpp"
#include "liveliness_utils.hpp"
#include "ordered_map.hpp"
#include "rmw_publisher_data.hpp"

#include "rcutils/allocator.h"
#include "rcutils/types.h"
Expand Down Expand Up @@ -133,7 +135,7 @@ class GraphCache final
rmw_names_and_types_t * topic_names_and_types) const;

rmw_ret_t publisher_count_matched_subscriptions(
const rmw_publisher_t * publisher,
PublisherDataConstPtr pub_data,
size_t * subscription_count);

rmw_ret_t subscription_count_matched_publishers(
Expand Down Expand Up @@ -183,12 +185,12 @@ class GraphCache final
/// Set a qos event callback for an entity from the current session.
/// @note The callback will be removed when the entity is removed from the graph.
void set_qos_event_callback(
liveliness::ConstEntityPtr entity,
std::size_t entity_guid,
const rmw_zenoh_event_type_t & event_type,
GraphCacheEventCallback callback);

/// Remove all qos event callbacks for an entity.
void remove_qos_event_callbacks(liveliness::ConstEntityPtr entity);
void remove_qos_event_callbacks(std::size_t entity_guid);

/// Returns true if the entity is a publisher or client. False otherwise.
static bool is_entity_pub(const liveliness::Entity & entity);
Expand Down Expand Up @@ -289,13 +291,13 @@ class GraphCache final
GraphNode::TopicMap graph_services_ = {};

using GraphEventCallbacks = std::unordered_map<rmw_zenoh_event_type_t, GraphCacheEventCallback>;
// Map entity (based on uuid) to a map of event callbacks.
// Map an entity's guid to a map of event callbacks.
// Note: Since we use unordered_map, we will only store a single callback for an
// entity string. So we do not support the case where a node create a duplicate
// pub/sub with the exact same topic, type & QoS but registers a different callback
// for the same event type. We could switch to a multimap here but removing the callback
// will be impossible right now since entities do not have unique IDs.
using GraphEventCallbackMap = std::unordered_map<liveliness::ConstEntityPtr, GraphEventCallbacks>;
using GraphEventCallbackMap = std::unordered_map<std::size_t, GraphEventCallbacks>;
Yadunund marked this conversation as resolved.
Show resolved Hide resolved
// EventCallbackMap for each type of event we support in rmw_zenoh_cpp.
GraphEventCallbackMap event_callbacks_;
// Map keyexpressions to QueryingSubscriberCallback.
Expand Down
21 changes: 21 additions & 0 deletions rmw_zenoh_cpp/src/detail/liveliness_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
#include "liveliness_utils.hpp"

#include <functional>
#include <limits>
#include <optional>
#include <random>
#include <sstream>
#include <stdexcept>
#include <string>
Expand Down Expand Up @@ -609,6 +611,12 @@ std::string Entity::node_enclave() const
return this->node_info_.enclave_;
}

///=============================================================================
NodeInfo Entity::node_info() const
{
return this->node_info_;
}

///=============================================================================
std::optional<TopicInfo> Entity::topic_info() const
{
Expand Down Expand Up @@ -657,4 +665,17 @@ std::string demangle_name(const std::string & input)
return output;
}
} // namespace liveliness

void
generate_random_gid(uint8_t gid[RMW_GID_STORAGE_SIZE])
{
std::random_device dev;
std::mt19937 rng(dev());
std::uniform_int_distribution<std::mt19937::result_type> dist(
std::numeric_limits<unsigned char>::min(), std::numeric_limits<unsigned char>::max());
Yadunund marked this conversation as resolved.
Show resolved Hide resolved

for (size_t i = 0; i < RMW_GID_STORAGE_SIZE; ++i) {
gid[i] = dist(rng);
}
}
} // namespace rmw_zenoh_cpp
11 changes: 10 additions & 1 deletion rmw_zenoh_cpp/src/detail/liveliness_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#include <zenoh.h>

#include <cstddef>
#include <cstdint>
#include <functional>
#include <memory>
#include <optional>
Expand Down Expand Up @@ -160,7 +162,10 @@ class Entity

std::string node_enclave() const;

/// Get the topic_info.
/// Get the NodeInfo.
NodeInfo node_info() const;

/// Get the TopicInfo if present.
std::optional<TopicInfo> topic_info() const;

/// Get the liveliness keyexpr for this entity.
Expand Down Expand Up @@ -230,6 +235,10 @@ std::optional<rmw_qos_profile_t> keyexpr_to_qos(const std::string & keyexpr);
/// Convert a Zenoh id to a string.
std::string zid_to_str(const z_id_t & id);
} // namespace liveliness

///=============================================================================
// Helper function to generate a randon GID.
void generate_random_gid(uint8_t gid[RMW_GID_STORAGE_SIZE]);
Yadunund marked this conversation as resolved.
Show resolved Hide resolved
} // namespace rmw_zenoh_cpp

///=============================================================================
Expand Down
18 changes: 17 additions & 1 deletion rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,22 @@ rmw_ret_t rmw_context_impl_s::Data::subscribe_to_ros_graph()
rmw_ret_t rmw_context_impl_s::Data::shutdown()
{
std::lock_guard<std::recursive_mutex> lock(mutex_);
rmw_ret_t ret = RMW_RET_OK;
if (is_shutdown_) {
return RMW_RET_OK;
return ret;
}

// Shutdown all the nodes in this context.
for (auto node_it = nodes_.begin(); node_it != nodes_.end(); ++node_it) {
ret = node_it->second->shutdown();
if (ret != RMW_RET_OK) {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to shutdown node with id %zu. rmw_ret_t code: %zu.",
node_it->second->id(),
ret
);
}
}

z_undeclare_subscriber(z_move(graph_subscriber_));
Expand All @@ -170,6 +184,7 @@ rmw_ret_t rmw_context_impl_s::Data::shutdown()
rmw_context_impl_s::Data::~Data()
{
auto ret = this->shutdown();
nodes_.clear();
static_cast<void>(ret);
}

Expand Down Expand Up @@ -407,6 +422,7 @@ bool rmw_context_impl_s::create_node_data(
}

auto node_data = rmw_zenoh_cpp::NodeData::make(
node,
this->get_next_entity_id(),
z_loan(data_->session_),
data_->domain_id_,
Expand Down
7 changes: 0 additions & 7 deletions rmw_zenoh_cpp/src/detail/rmw_data_types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,6 @@ saved_msg_data::~saved_msg_data()
z_drop(z_move(payload));
}

///=============================================================================
size_t rmw_publisher_data_t::get_next_sequence_number()
{
std::lock_guard<std::mutex> lock(sequence_number_mutex_);
return sequence_number_++;
}

///=============================================================================
bool rmw_subscription_data_t::queue_has_data_and_attach_condition_if_not(
rmw_wait_set_data_t * wait_set_data)
Expand Down
39 changes: 0 additions & 39 deletions rmw_zenoh_cpp/src/detail/rmw_data_types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,45 +44,6 @@

namespace rmw_zenoh_cpp
{
///=============================================================================
class rmw_publisher_data_t final
{
public:
// The Entity generated for the publisher.
std::shared_ptr<liveliness::Entity> entity;

// An owned publisher.
z_owned_publisher_t pub;

// Optional publication cache when durability is transient_local.
std::optional<ze_owned_publication_cache_t> pub_cache;

// Store the actual QoS profile used to configure this publisher.
rmw_qos_profile_t adapted_qos_profile;

// Liveliness token for the publisher.
zc_owned_liveliness_token_t token;

// Type support fields
const void * type_support_impl;
const char * typesupport_identifier;
const rosidl_type_hash_t * type_hash;
MessageTypeSupport * type_support;

// Context for memory allocation for messages.
rmw_context_t * context;

uint8_t pub_gid[RMW_GID_STORAGE_SIZE];

size_t get_next_sequence_number();

EventsManager events_mgr;

private:
std::mutex sequence_number_mutex_;
size_t sequence_number_{1};
};

///=============================================================================
// z_owned_closure_sample_t
void sub_data_handler(const z_sample_t * sample, void * sub_data);
Expand Down
Loading