Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Thread-safe access to PublisherData #278

Merged
merged 7 commits into from
Oct 1, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions rmw_zenoh_cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ add_library(rmw_zenoh_cpp SHARED
src/detail/qos.cpp
src/detail/rmw_context_impl_s.cpp
src/detail/rmw_data_types.cpp
src/detail/rmw_publisher_data.cpp
src/detail/rmw_node_data.cpp
src/detail/service_type_support.cpp
src/detail/type_support.cpp
Expand Down
13 changes: 6 additions & 7 deletions rmw_zenoh_cpp/src/detail/graph_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -852,25 +852,24 @@ rmw_ret_t GraphCache::get_topic_names_and_types(

///=============================================================================
rmw_ret_t GraphCache::publisher_count_matched_subscriptions(
const rmw_publisher_t * publisher,
PublisherDataConstPtr pub_data,
size_t * subscription_count)
{
// TODO(Yadunund): Replace this logic by returning a number that is tracked once
// we support matched qos events.
*subscription_count = 0;
GraphNode::TopicMap::const_iterator topic_it = graph_topics_.find(publisher->topic_name);
auto topic_info = pub_data->entity()->topic_info().value();
GraphNode::TopicMap::const_iterator topic_it = graph_topics_.find(topic_info.name_);
if (topic_it != graph_topics_.end()) {
rmw_publisher_data_t * pub_data =
static_cast<rmw_publisher_data_t *>(publisher->data);
GraphNode::TopicTypeMap::const_iterator topic_data_it = topic_it->second.find(
pub_data->type_support->get_name());
topic_info.type_);
if (topic_data_it != topic_it->second.end()) {
for (const auto & [_, topic_data] : topic_data_it->second) {
// If a subscription exists with compatible QoS, update the subscription count.
if (!topic_data->subs_.empty()) {
rmw_qos_compatibility_type_t is_compatible;
rmw_ret_t ret = rmw_qos_profile_check_compatible(
pub_data->adapted_qos_profile,
pub_data->adapted_qos_profile(),
topic_data->info_.qos_,
&is_compatible,
nullptr,
Expand Down Expand Up @@ -1348,7 +1347,7 @@ void GraphCache::set_querying_subscriber_callback(
const rmw_subscription_data_t * sub_data,
QueryingSubscriberCallback cb)
{
const std::string keyexpr = sub_data->entity->topic_info()->topic_keyexpr_;
const std::string keyexpr = sub_data->entity->topic_info().value().topic_keyexpr_;
auto cb_it = querying_subs_cbs_.find(keyexpr);
if (cb_it == querying_subs_cbs_.end()) {
querying_subs_cbs_[keyexpr] = std::move(
Expand Down
3 changes: 2 additions & 1 deletion rmw_zenoh_cpp/src/detail/graph_cache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "event.hpp"
#include "liveliness_utils.hpp"
#include "ordered_map.hpp"
#include "rmw_publisher_data.hpp"

#include "rcutils/allocator.h"
#include "rcutils/types.h"
Expand Down Expand Up @@ -133,7 +134,7 @@ class GraphCache final
rmw_names_and_types_t * topic_names_and_types) const;

rmw_ret_t publisher_count_matched_subscriptions(
const rmw_publisher_t * publisher,
PublisherDataConstPtr pub_data,
size_t * subscription_count);

rmw_ret_t subscription_count_matched_publishers(
Expand Down
21 changes: 21 additions & 0 deletions rmw_zenoh_cpp/src/detail/liveliness_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
#include "liveliness_utils.hpp"

#include <functional>
#include <limits>
#include <optional>
#include <random>
#include <sstream>
#include <stdexcept>
#include <string>
Expand Down Expand Up @@ -609,6 +611,12 @@ std::string Entity::node_enclave() const
return this->node_info_.enclave_;
}

///=============================================================================
NodeInfo Entity::node_info() const
{
return this->node_info_;
}

///=============================================================================
std::optional<TopicInfo> Entity::topic_info() const
{
Expand Down Expand Up @@ -657,4 +665,17 @@ std::string demangle_name(const std::string & input)
return output;
}
} // namespace liveliness

void
generate_random_gid(uint8_t gid[RMW_GID_STORAGE_SIZE])
{
std::random_device dev;
std::mt19937 rng(dev());
std::uniform_int_distribution<std::mt19937::result_type> dist(
std::numeric_limits<unsigned char>::min(), std::numeric_limits<unsigned char>::max());
Yadunund marked this conversation as resolved.
Show resolved Hide resolved

for (size_t i = 0; i < RMW_GID_STORAGE_SIZE; ++i) {
gid[i] = dist(rng);
}
}
} // namespace rmw_zenoh_cpp
10 changes: 9 additions & 1 deletion rmw_zenoh_cpp/src/detail/liveliness_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,10 @@ class Entity

std::string node_enclave() const;

/// Get the topic_info.
/// Get the NodeInfo.
NodeInfo node_info() const;

/// Get the TopicInfo if present.
std::optional<TopicInfo> topic_info() const;

/// Get the liveliness keyexpr for this entity.
Expand Down Expand Up @@ -230,8 +233,13 @@ std::optional<rmw_qos_profile_t> keyexpr_to_qos(const std::string & keyexpr);
/// Convert a Zenoh id to a string.
std::string zid_to_str(const z_id_t & id);
} // namespace liveliness

///=============================================================================
// Helper function to generate a randon GID.
void generate_random_gid(uint8_t gid[RMW_GID_STORAGE_SIZE]);
Yadunund marked this conversation as resolved.
Show resolved Hide resolved
} // namespace rmw_zenoh_cpp


Yadunund marked this conversation as resolved.
Show resolved Hide resolved
///=============================================================================
// Allow Entity to be hashed and used as a key in unordered_maps/sets
namespace std
Expand Down
18 changes: 17 additions & 1 deletion rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,22 @@ rmw_ret_t rmw_context_impl_s::Data::subscribe_to_ros_graph()
rmw_ret_t rmw_context_impl_s::Data::shutdown()
{
std::lock_guard<std::recursive_mutex> lock(mutex_);
rmw_ret_t ret = RMW_RET_OK;
if (is_shutdown_) {
return RMW_RET_OK;
return ret;
}

// Shutdown all the nodes in this context.
for (auto node_it = nodes_.begin(); node_it != nodes_.end(); ++node_it) {
ret = node_it->second->shutdown();
if (ret != RMW_RET_OK) {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to shutdown node with id %zu. rmw_ret_t code: %zu",
Yadunund marked this conversation as resolved.
Show resolved Hide resolved
node_it->second->id(),
ret
);
}
}

z_undeclare_subscriber(z_move(graph_subscriber_));
Expand All @@ -170,6 +184,7 @@ rmw_ret_t rmw_context_impl_s::Data::shutdown()
rmw_context_impl_s::Data::~Data()
{
auto ret = this->shutdown();
nodes_.clear();
static_cast<void>(ret);
}

Expand Down Expand Up @@ -407,6 +422,7 @@ bool rmw_context_impl_s::create_node_data(
}

auto node_data = rmw_zenoh_cpp::NodeData::make(
node,
this->get_next_entity_id(),
z_loan(data_->session_),
data_->domain_id_,
Expand Down
7 changes: 0 additions & 7 deletions rmw_zenoh_cpp/src/detail/rmw_data_types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,6 @@ saved_msg_data::~saved_msg_data()
z_drop(z_move(payload));
}

///=============================================================================
size_t rmw_publisher_data_t::get_next_sequence_number()
{
std::lock_guard<std::mutex> lock(sequence_number_mutex_);
return sequence_number_++;
}

///=============================================================================
bool rmw_subscription_data_t::queue_has_data_and_attach_condition_if_not(
rmw_wait_set_data_t * wait_set_data)
Expand Down
39 changes: 0 additions & 39 deletions rmw_zenoh_cpp/src/detail/rmw_data_types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,45 +44,6 @@

namespace rmw_zenoh_cpp
{
///=============================================================================
class rmw_publisher_data_t final
{
public:
// The Entity generated for the publisher.
std::shared_ptr<liveliness::Entity> entity;

// An owned publisher.
z_owned_publisher_t pub;

// Optional publication cache when durability is transient_local.
std::optional<ze_owned_publication_cache_t> pub_cache;

// Store the actual QoS profile used to configure this publisher.
rmw_qos_profile_t adapted_qos_profile;

// Liveliness token for the publisher.
zc_owned_liveliness_token_t token;

// Type support fields
const void * type_support_impl;
const char * typesupport_identifier;
const rosidl_type_hash_t * type_hash;
MessageTypeSupport * type_support;

// Context for memory allocation for messages.
rmw_context_t * context;

uint8_t pub_gid[RMW_GID_STORAGE_SIZE];

size_t get_next_sequence_number();

EventsManager events_mgr;

private:
std::mutex sequence_number_mutex_;
size_t sequence_number_{1};
};

///=============================================================================
// z_owned_closure_sample_t
void sub_data_handler(const z_sample_t * sample, void * sub_data);
Expand Down
118 changes: 115 additions & 3 deletions rmw_zenoh_cpp/src/detail/rmw_node_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ namespace rmw_zenoh_cpp
{
///=============================================================================
std::shared_ptr<NodeData> NodeData::make(
const rmw_node_t * const node,
std::size_t id,
z_session_t session,
std::size_t domain_id,
Expand Down Expand Up @@ -75,6 +76,7 @@ std::shared_ptr<NodeData> NodeData::make(

return std::shared_ptr<NodeData>(
new NodeData{
node,
id,
std::move(entity),
std::move(token)
Expand All @@ -83,20 +85,31 @@ std::shared_ptr<NodeData> NodeData::make(

///=============================================================================
NodeData::NodeData(
const rmw_node_t * const node,
std::size_t id,
std::shared_ptr<liveliness::Entity> entity,
zc_owned_liveliness_token_t token)
: id_(std::move(id)),
: node_(node),
id_(std::move(id)),
entity_(std::move(entity)),
token_(std::move(token))
token_(std::move(token)),
is_shutdown_(false),
pubs_({})
{
// Do nothing.
}

///=============================================================================
NodeData::~NodeData()
{
zc_liveliness_undeclare_token(z_move(token_));
const rmw_ret_t ret = this->shutdown();
if (ret != RMW_RET_OK) {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Error destructing node /%s.",
entity_->node_name().c_str()
);
}
}

///=============================================================================
Expand All @@ -105,4 +118,103 @@ std::size_t NodeData::id() const
std::lock_guard<std::mutex> lock(mutex_);
return id_;
}

///=============================================================================
bool NodeData::create_pub_data(
const rmw_publisher_t * const publisher,
z_session_t session,
std::size_t id,
const std::string & topic_name,
const rosidl_message_type_support_t * type_support,
const rmw_qos_profile_t * qos_profile)
{
std::lock_guard<std::mutex> lock_guard(mutex_);
if (is_shutdown_) {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to create PublisherData as the NodeData has been shutdown.");
return false;
}

if (pubs_.count(publisher) > 0) {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"PublisherData already exists.");
return false;
}

auto pub_data = PublisherData::make(
std::move(session),
node_,
entity_->node_info(),
id_,
std::move(id),
std::move(topic_name),
type_support,
qos_profile);
if (pub_data == nullptr) {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to make PublisherData.");
return false;
}

auto insertion = pubs_.insert(std::make_pair(publisher, std::move(pub_data)));
if (!insertion.second) {
return false;
}
return true;
}

///=============================================================================
PublisherDataPtr NodeData::get_pub_data(const rmw_publisher_t * const publisher)
{
std::lock_guard<std::mutex> lock_guard(mutex_);
auto it = pubs_.find(publisher);
if (it == pubs_.end()) {
return nullptr;
}

return it->second;
}

///=============================================================================
void NodeData::delete_pub_data(const rmw_publisher_t * const publisher)
{
std::lock_guard<std::mutex> lock_guard(mutex_);
pubs_.erase(publisher);
}

///=============================================================================
rmw_ret_t NodeData::shutdown()
{
std::lock_guard<std::mutex> lock(mutex_);
rmw_ret_t ret = RMW_RET_OK;
if (is_shutdown_) {
return ret;
}

// Shutdown all the entities within this node.
for (auto pub_it = pubs_.begin(); pub_it != pubs_.end(); ++pub_it) {
ret = pub_it->second->shutdown();
if (ret != RMW_RET_OK) {
return ret;
}
}

// Unregister this node from the ROS graph.
zc_liveliness_undeclare_token(z_move(token_));

is_shutdown_ = true;
return RMW_RET_OK;
}

///=============================================================================
// Check if the Node is shutdown.
bool NodeData::is_shutdown() const
{
std::lock_guard<std::mutex> lock(mutex_);
return is_shutdown_;
}

} // namespace rmw_zenoh_cpp
Loading