Skip to content

Commit

Permalink
Thread-safe access to PublisherData (#278)
Browse files Browse the repository at this point in the history
* Create PublisherData within NodeData

Signed-off-by: Yadunund <[email protected]>

* Continue shutting down other nodes if one fails

Signed-off-by: Yadunund <[email protected]>

* includes

Signed-off-by: Yadunund <[email protected]>

* Expose required methods from Entity instead

Signed-off-by: Yadunund <[email protected]>

* Reuse methods and make publish_serialized_message also thread safe

Signed-off-by: Yadunund <[email protected]>

* Address alex's feedback

Signed-off-by: Yadunund <[email protected]>

* Safely copy GID

Signed-off-by: Yadunund <[email protected]>

---------

Signed-off-by: Yadunund <[email protected]>
  • Loading branch information
Yadunund authored Oct 1, 2024
1 parent 824886a commit 9bf5b74
Show file tree
Hide file tree
Showing 16 changed files with 1,047 additions and 533 deletions.
2 changes: 2 additions & 0 deletions rmw_zenoh_cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,14 @@ add_library(rmw_zenoh_cpp SHARED
src/detail/qos.cpp
src/detail/rmw_context_impl_s.cpp
src/detail/rmw_data_types.cpp
src/detail/rmw_publisher_data.cpp
src/detail/rmw_node_data.cpp
src/detail/service_type_support.cpp
src/detail/type_support.cpp
src/detail/type_support_common.cpp
src/detail/zenoh_config.cpp
src/detail/zenoh_router_check.cpp
src/detail/zenoh_utils.cpp
src/rmw_event.cpp
src/rmw_get_network_flow_endpoints.cpp
src/rmw_get_node_info_and_types.cpp
Expand Down
25 changes: 12 additions & 13 deletions rmw_zenoh_cpp/src/detail/graph_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ void GraphCache::take_entities_with_events(const EntityEventMap & entities_with_
for (const auto & [local_entity, event_set] : entities_with_events) {
// Trigger callback set for this entity for the event type.
GraphEventCallbackMap::const_iterator event_callbacks_it =
event_callbacks_.find(local_entity);
event_callbacks_.find(local_entity->guid());
if (event_callbacks_it != event_callbacks_.end()) {
for (const rmw_zenoh_event_type_t & event_type : event_set) {
GraphEventCallbacks::const_iterator callback_it =
Expand Down Expand Up @@ -852,25 +852,24 @@ rmw_ret_t GraphCache::get_topic_names_and_types(

///=============================================================================
rmw_ret_t GraphCache::publisher_count_matched_subscriptions(
const rmw_publisher_t * publisher,
PublisherDataConstPtr pub_data,
size_t * subscription_count)
{
// TODO(Yadunund): Replace this logic by returning a number that is tracked once
// we support matched qos events.
*subscription_count = 0;
GraphNode::TopicMap::const_iterator topic_it = graph_topics_.find(publisher->topic_name);
auto topic_info = pub_data->topic_info();
GraphNode::TopicMap::const_iterator topic_it = graph_topics_.find(topic_info.name_);
if (topic_it != graph_topics_.end()) {
rmw_publisher_data_t * pub_data =
static_cast<rmw_publisher_data_t *>(publisher->data);
GraphNode::TopicTypeMap::const_iterator topic_data_it = topic_it->second.find(
pub_data->type_support->get_name());
topic_info.type_);
if (topic_data_it != topic_it->second.end()) {
for (const auto & [_, topic_data] : topic_data_it->second) {
// If a subscription exists with compatible QoS, update the subscription count.
if (!topic_data->subs_.empty()) {
rmw_qos_compatibility_type_t is_compatible;
rmw_ret_t ret = rmw_qos_profile_check_compatible(
pub_data->adapted_qos_profile,
pub_data->adapted_qos_profile(),
topic_data->info_.qos_,
&is_compatible,
nullptr,
Expand Down Expand Up @@ -1244,7 +1243,7 @@ rmw_ret_t GraphCache::service_server_is_available(

///=============================================================================
void GraphCache::set_qos_event_callback(
liveliness::ConstEntityPtr entity,
std::size_t entity_guid,
const rmw_zenoh_event_type_t & event_type,
GraphCacheEventCallback callback)
{
Expand All @@ -1257,20 +1256,20 @@ void GraphCache::set_qos_event_callback(
return;
}

const GraphEventCallbackMap::iterator event_cb_it = event_callbacks_.find(entity);
const GraphEventCallbackMap::iterator event_cb_it = event_callbacks_.find(entity_guid);
if (event_cb_it == event_callbacks_.end()) {
// First time a callback is being set for this entity.
event_callbacks_[entity] = {std::make_pair(event_type, std::move(callback))};
event_callbacks_[entity_guid] = {std::make_pair(event_type, std::move(callback))};
return;
}
event_cb_it->second[event_type] = std::move(callback);
}

///=============================================================================
void GraphCache::remove_qos_event_callbacks(liveliness::ConstEntityPtr entity)
void GraphCache::remove_qos_event_callbacks(std::size_t entity_guid)
{
std::lock_guard<std::mutex> lock(graph_mutex_);
event_callbacks_.erase(entity);
event_callbacks_.erase(entity_guid);
}

///=============================================================================
Expand Down Expand Up @@ -1348,7 +1347,7 @@ void GraphCache::set_querying_subscriber_callback(
const rmw_subscription_data_t * sub_data,
QueryingSubscriberCallback cb)
{
const std::string keyexpr = sub_data->entity->topic_info()->topic_keyexpr_;
const std::string keyexpr = sub_data->entity->topic_info().value().topic_keyexpr_;
auto cb_it = querying_subs_cbs_.find(keyexpr);
if (cb_it == querying_subs_cbs_.end()) {
querying_subs_cbs_[keyexpr] = std::move(
Expand Down
12 changes: 7 additions & 5 deletions rmw_zenoh_cpp/src/detail/graph_cache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#ifndef DETAIL__GRAPH_CACHE_HPP_
#define DETAIL__GRAPH_CACHE_HPP_

#include <cstddef>
#include <functional>
#include <map>
#include <memory>
Expand All @@ -27,6 +28,7 @@
#include "event.hpp"
#include "liveliness_utils.hpp"
#include "ordered_map.hpp"
#include "rmw_publisher_data.hpp"

#include "rcutils/allocator.h"
#include "rcutils/types.h"
Expand Down Expand Up @@ -133,7 +135,7 @@ class GraphCache final
rmw_names_and_types_t * topic_names_and_types) const;

rmw_ret_t publisher_count_matched_subscriptions(
const rmw_publisher_t * publisher,
PublisherDataConstPtr pub_data,
size_t * subscription_count);

rmw_ret_t subscription_count_matched_publishers(
Expand Down Expand Up @@ -183,12 +185,12 @@ class GraphCache final
/// Set a qos event callback for an entity from the current session.
/// @note The callback will be removed when the entity is removed from the graph.
void set_qos_event_callback(
liveliness::ConstEntityPtr entity,
std::size_t entity_guid,
const rmw_zenoh_event_type_t & event_type,
GraphCacheEventCallback callback);

/// Remove all qos event callbacks for an entity.
void remove_qos_event_callbacks(liveliness::ConstEntityPtr entity);
void remove_qos_event_callbacks(std::size_t entity_guid);

/// Returns true if the entity is a publisher or client. False otherwise.
static bool is_entity_pub(const liveliness::Entity & entity);
Expand Down Expand Up @@ -289,13 +291,13 @@ class GraphCache final
GraphNode::TopicMap graph_services_ = {};

using GraphEventCallbacks = std::unordered_map<rmw_zenoh_event_type_t, GraphCacheEventCallback>;
// Map entity (based on uuid) to a map of event callbacks.
// Map an entity's guid to a map of event callbacks.
// Note: Since we use unordered_map, we will only store a single callback for an
// entity string. So we do not support the case where a node create a duplicate
// pub/sub with the exact same topic, type & QoS but registers a different callback
// for the same event type. We could switch to a multimap here but removing the callback
// will be impossible right now since entities do not have unique IDs.
using GraphEventCallbackMap = std::unordered_map<liveliness::ConstEntityPtr, GraphEventCallbacks>;
using GraphEventCallbackMap = std::unordered_map<std::size_t, GraphEventCallbacks>;
// EventCallbackMap for each type of event we support in rmw_zenoh_cpp.
GraphEventCallbackMap event_callbacks_;
// Map keyexpressions to QueryingSubscriberCallback.
Expand Down
21 changes: 21 additions & 0 deletions rmw_zenoh_cpp/src/detail/liveliness_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
#include "liveliness_utils.hpp"

#include <functional>
#include <limits>
#include <optional>
#include <random>
#include <sstream>
#include <stdexcept>
#include <string>
Expand Down Expand Up @@ -609,6 +611,12 @@ std::string Entity::node_enclave() const
return this->node_info_.enclave_;
}

///=============================================================================
NodeInfo Entity::node_info() const
{
return this->node_info_;
}

///=============================================================================
std::optional<TopicInfo> Entity::topic_info() const
{
Expand Down Expand Up @@ -657,4 +665,17 @@ std::string demangle_name(const std::string & input)
return output;
}
} // namespace liveliness

void
generate_random_gid(uint8_t gid[RMW_GID_STORAGE_SIZE])
{
std::random_device dev;
std::mt19937 rng(dev());
std::uniform_int_distribution<std::mt19937::result_type> dist(
std::numeric_limits<unsigned char>::min(), std::numeric_limits<unsigned char>::max());

for (size_t i = 0; i < RMW_GID_STORAGE_SIZE; ++i) {
gid[i] = dist(rng);
}
}
} // namespace rmw_zenoh_cpp
11 changes: 10 additions & 1 deletion rmw_zenoh_cpp/src/detail/liveliness_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#include <zenoh.h>

#include <cstddef>
#include <cstdint>
#include <functional>
#include <memory>
#include <optional>
Expand Down Expand Up @@ -160,7 +162,10 @@ class Entity

std::string node_enclave() const;

/// Get the topic_info.
/// Get the NodeInfo.
NodeInfo node_info() const;

/// Get the TopicInfo if present.
std::optional<TopicInfo> topic_info() const;

/// Get the liveliness keyexpr for this entity.
Expand Down Expand Up @@ -230,6 +235,10 @@ std::optional<rmw_qos_profile_t> keyexpr_to_qos(const std::string & keyexpr);
/// Convert a Zenoh id to a string.
std::string zid_to_str(const z_id_t & id);
} // namespace liveliness

///=============================================================================
// Helper function to generate a randon GID.
void generate_random_gid(uint8_t gid[RMW_GID_STORAGE_SIZE]);
} // namespace rmw_zenoh_cpp

///=============================================================================
Expand Down
18 changes: 17 additions & 1 deletion rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,22 @@ rmw_ret_t rmw_context_impl_s::Data::subscribe_to_ros_graph()
rmw_ret_t rmw_context_impl_s::Data::shutdown()
{
std::lock_guard<std::recursive_mutex> lock(mutex_);
rmw_ret_t ret = RMW_RET_OK;
if (is_shutdown_) {
return RMW_RET_OK;
return ret;
}

// Shutdown all the nodes in this context.
for (auto node_it = nodes_.begin(); node_it != nodes_.end(); ++node_it) {
ret = node_it->second->shutdown();
if (ret != RMW_RET_OK) {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to shutdown node with id %zu. rmw_ret_t code: %zu.",
node_it->second->id(),
ret
);
}
}

z_undeclare_subscriber(z_move(graph_subscriber_));
Expand All @@ -170,6 +184,7 @@ rmw_ret_t rmw_context_impl_s::Data::shutdown()
rmw_context_impl_s::Data::~Data()
{
auto ret = this->shutdown();
nodes_.clear();
static_cast<void>(ret);
}

Expand Down Expand Up @@ -407,6 +422,7 @@ bool rmw_context_impl_s::create_node_data(
}

auto node_data = rmw_zenoh_cpp::NodeData::make(
node,
this->get_next_entity_id(),
z_loan(data_->session_),
data_->domain_id_,
Expand Down
7 changes: 0 additions & 7 deletions rmw_zenoh_cpp/src/detail/rmw_data_types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,6 @@ saved_msg_data::~saved_msg_data()
z_drop(z_move(payload));
}

///=============================================================================
size_t rmw_publisher_data_t::get_next_sequence_number()
{
std::lock_guard<std::mutex> lock(sequence_number_mutex_);
return sequence_number_++;
}

///=============================================================================
bool rmw_subscription_data_t::queue_has_data_and_attach_condition_if_not(
rmw_wait_set_data_t * wait_set_data)
Expand Down
39 changes: 0 additions & 39 deletions rmw_zenoh_cpp/src/detail/rmw_data_types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,45 +44,6 @@

namespace rmw_zenoh_cpp
{
///=============================================================================
class rmw_publisher_data_t final
{
public:
// The Entity generated for the publisher.
std::shared_ptr<liveliness::Entity> entity;

// An owned publisher.
z_owned_publisher_t pub;

// Optional publication cache when durability is transient_local.
std::optional<ze_owned_publication_cache_t> pub_cache;

// Store the actual QoS profile used to configure this publisher.
rmw_qos_profile_t adapted_qos_profile;

// Liveliness token for the publisher.
zc_owned_liveliness_token_t token;

// Type support fields
const void * type_support_impl;
const char * typesupport_identifier;
const rosidl_type_hash_t * type_hash;
MessageTypeSupport * type_support;

// Context for memory allocation for messages.
rmw_context_t * context;

uint8_t pub_gid[RMW_GID_STORAGE_SIZE];

size_t get_next_sequence_number();

EventsManager events_mgr;

private:
std::mutex sequence_number_mutex_;
size_t sequence_number_{1};
};

///=============================================================================
// z_owned_closure_sample_t
void sub_data_handler(const z_sample_t * sample, void * sub_data);
Expand Down
Loading

0 comments on commit 9bf5b74

Please sign in to comment.