Skip to content

Commit

Permalink
Set subscription handle to SubScriptionData raw ptr due to rmw_wait l…
Browse files Browse the repository at this point in the history
…imitation

Signed-off-by: Yadunund <[email protected]>
  • Loading branch information
Yadunund committed Oct 2, 2024
1 parent 38b3316 commit 71bbb6a
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 114 deletions.
9 changes: 9 additions & 0 deletions rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ std::shared_ptr<SubscriptionData> SubscriptionData::make(
return nullptr;
}
}
sub_data->graph_cache_ = graph_cache;

auto undeclare_z_sub = rcpputils::make_scope_exit(
[data = sub_data]() {
Expand Down Expand Up @@ -612,4 +613,12 @@ void SubscriptionData::set_on_new_message_callback(
std::lock_guard<std::mutex> lock(mutex_);
data_callback_mgr_.set_callback(user_data, callback);
}

//==============================================================================
std::shared_ptr<GraphCache> SubscriptionData::graph_cache() const
{
std::lock_guard<std::mutex> lock(mutex_);
return graph_cache_;
}

} // namespace rmw_zenoh_cpp
4 changes: 4 additions & 0 deletions rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ class SubscriptionData final
rmw_event_callback_t callback,
const void * user_data);

std::shared_ptr<GraphCache> graph_cache() const;

// Destructor.
~SubscriptionData();

Expand Down Expand Up @@ -153,6 +155,8 @@ class SubscriptionData final
std::shared_ptr<EventsManager> events_mgr_;
// Shutdown flag.
bool is_shutdown_;
// The graph cache.
std::shared_ptr<GraphCache> graph_cache_;
};
using SubscriptionDataPtr = std::shared_ptr<SubscriptionData>;
using SubscriptionDataConstPtr = std::shared_ptr<const SubscriptionData>;
Expand Down
19 changes: 6 additions & 13 deletions rmw_zenoh_cpp/src/rmw_event.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,15 +101,8 @@ rmw_subscription_event_init(
RMW_CHECK_ARGUMENT_FOR_NULL(subscription, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_ARGUMENT_FOR_NULL(subscription->implementation_identifier, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_ARGUMENT_FOR_NULL(subscription->data, RMW_RET_INVALID_ARGUMENT);
rmw_node_t * node =
static_cast<rmw_node_t *>(subscription->data);
RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT);
rmw_context_impl_s * context_impl =
static_cast<rmw_context_impl_s *>(node->context->impl);
RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT);
auto node_data = context_impl->get_node_data(node);
RMW_CHECK_ARGUMENT_FOR_NULL(node_data, RMW_RET_INVALID_ARGUMENT);
auto sub_data = node_data->get_sub_data(subscription);
rmw_zenoh_cpp::SubscriptionData * sub_data =
static_cast<rmw_zenoh_cpp::SubscriptionData *>(subscription->data);
RMW_CHECK_ARGUMENT_FOR_NULL(sub_data, RMW_RET_INVALID_ARGUMENT);
if (subscription->implementation_identifier != rmw_zenoh_cpp::rmw_zenoh_identifier) {
RMW_SET_ERROR_MSG(
Expand All @@ -135,14 +128,14 @@ rmw_subscription_event_init(
return RMW_RET_OK;
}

std::weak_ptr<rmw_zenoh_cpp::SubscriptionData> data_wp = sub_data;
context_impl->graph_cache()->set_qos_event_callback(
// std::weak_ptr<rmw_zenoh_cpp::SubscriptionData> data_wp = sub_data;
sub_data->graph_cache()->set_qos_event_callback(
sub_data->guid(),
zenoh_event_type,
[data_wp,
[sub_data,
zenoh_event_type](std::unique_ptr<rmw_zenoh_cpp::rmw_zenoh_event_status_t> zenoh_event)
{
auto sub_data = data_wp.lock();
// auto sub_data = data_wp.lock();
if (sub_data == nullptr) {
return;
}
Expand Down
125 changes: 24 additions & 101 deletions rmw_zenoh_cpp/src/rmw_zenoh.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -972,7 +972,11 @@ rmw_create_subscription(

// Store type erased node in rmw_subscription->data so that the
// Subscription can be safely accessed.
rmw_subscription->data = reinterpret_cast<void *>(const_cast<rmw_node_t *>(node));
// TODO(Yadunund): We cannot store the rmw_node_t * here since this type erased
// subscription handle will be returned in the rmw_subscriptions_t in rmw_wait
// from which we cannot obtain SubscriptionData.
// rmw_subscription->data = reinterpret_cast<void *>(const_cast<rmw_node_t *>(node));
rmw_subscription->data = static_cast<void *>(node_data->get_sub_data(rmw_subscription).get());
rmw_subscription->implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier;
rmw_subscription->options = *subscription_options;
rmw_subscription->can_loan_messages = false;
Expand Down Expand Up @@ -1058,18 +1062,11 @@ rmw_subscription_count_matched_publishers(
rmw_zenoh_cpp::rmw_zenoh_identifier,
return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);
RMW_CHECK_ARGUMENT_FOR_NULL(publisher_count, RMW_RET_INVALID_ARGUMENT);
rmw_node_t * node =
static_cast<rmw_node_t *>(subscription->data);
RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT);
rmw_context_impl_s * context_impl =
static_cast<rmw_context_impl_s *>(node->context->impl);
RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT);
auto node_data = context_impl->get_node_data(node);
RMW_CHECK_ARGUMENT_FOR_NULL(node_data, RMW_RET_INVALID_ARGUMENT);
auto sub_data = node_data->get_sub_data(subscription);
rmw_zenoh_cpp::SubscriptionData * sub_data =
static_cast<rmw_zenoh_cpp::SubscriptionData *>(subscription->data);
RMW_CHECK_ARGUMENT_FOR_NULL(sub_data, RMW_RET_INVALID_ARGUMENT);

return context_impl->graph_cache()->subscription_count_matched_publishers(
return sub_data->graph_cache()->subscription_count_matched_publishers(
sub_data->topic_info(), publisher_count);
}

Expand All @@ -1087,15 +1084,8 @@ rmw_subscription_get_actual_qos(
rmw_zenoh_cpp::rmw_zenoh_identifier,
return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);
RMW_CHECK_ARGUMENT_FOR_NULL(qos, RMW_RET_INVALID_ARGUMENT);
rmw_node_t * node =
static_cast<rmw_node_t *>(subscription->data);
RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT);
rmw_context_impl_s * context_impl =
static_cast<rmw_context_impl_s *>(node->context->impl);
RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT);
auto node_data = context_impl->get_node_data(node);
RMW_CHECK_ARGUMENT_FOR_NULL(node_data, RMW_RET_INVALID_ARGUMENT);
auto sub_data = node_data->get_sub_data(subscription);
rmw_zenoh_cpp::SubscriptionData * sub_data =
static_cast<rmw_zenoh_cpp::SubscriptionData *>(subscription->data);
RMW_CHECK_ARGUMENT_FOR_NULL(sub_data, RMW_RET_INVALID_ARGUMENT);

*qos = sub_data->adapted_qos_profile();
Expand Down Expand Up @@ -1152,15 +1142,8 @@ rmw_take(
subscription handle,
subscription->implementation_identifier, rmw_zenoh_cpp::rmw_zenoh_identifier,
return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);
rmw_node_t * node =
static_cast<rmw_node_t *>(subscription->data);
RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT);
rmw_context_impl_s * context_impl =
static_cast<rmw_context_impl_s *>(node->context->impl);
RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT);
auto node_data = context_impl->get_node_data(node);
RMW_CHECK_ARGUMENT_FOR_NULL(node_data, RMW_RET_INVALID_ARGUMENT);
auto sub_data = node_data->get_sub_data(subscription);
rmw_zenoh_cpp::SubscriptionData * sub_data =
static_cast<rmw_zenoh_cpp::SubscriptionData *>(subscription->data);
RMW_CHECK_ARGUMENT_FOR_NULL(sub_data, RMW_RET_INVALID_ARGUMENT);

return sub_data->take_one_message(ros_message, nullptr, taken);
Expand Down Expand Up @@ -1188,15 +1171,8 @@ rmw_take_with_info(
subscription handle,
subscription->implementation_identifier, rmw_zenoh_cpp::rmw_zenoh_identifier,
return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);
rmw_node_t * node =
static_cast<rmw_node_t *>(subscription->data);
RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT);
rmw_context_impl_s * context_impl =
static_cast<rmw_context_impl_s *>(node->context->impl);
RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT);
auto node_data = context_impl->get_node_data(node);
RMW_CHECK_ARGUMENT_FOR_NULL(node_data, RMW_RET_INVALID_ARGUMENT);
auto sub_data = node_data->get_sub_data(subscription);
rmw_zenoh_cpp::SubscriptionData * sub_data =
static_cast<rmw_zenoh_cpp::SubscriptionData *>(subscription->data);
RMW_CHECK_ARGUMENT_FOR_NULL(sub_data, RMW_RET_INVALID_ARGUMENT);

return sub_data->take_one_message(ros_message, message_info, taken);
Expand Down Expand Up @@ -1225,15 +1201,8 @@ rmw_take_sequence(
subscription handle,
subscription->implementation_identifier, rmw_zenoh_cpp::rmw_zenoh_identifier,
return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);
rmw_node_t * node =
static_cast<rmw_node_t *>(subscription->data);
RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT);
rmw_context_impl_s * context_impl =
static_cast<rmw_context_impl_s *>(node->context->impl);
RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT);
auto node_data = context_impl->get_node_data(node);
RMW_CHECK_ARGUMENT_FOR_NULL(node_data, RMW_RET_INVALID_ARGUMENT);
auto sub_data = node_data->get_sub_data(subscription);
rmw_zenoh_cpp::SubscriptionData * sub_data =
static_cast<rmw_zenoh_cpp::SubscriptionData *>(subscription->data);
RMW_CHECK_ARGUMENT_FOR_NULL(sub_data, RMW_RET_INVALID_ARGUMENT);

if (0u == count) {
Expand Down Expand Up @@ -1308,15 +1277,8 @@ __rmw_take_serialized(
subscription handle,
subscription->implementation_identifier, rmw_zenoh_cpp::rmw_zenoh_identifier,
return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);
rmw_node_t * node =
static_cast<rmw_node_t *>(subscription->data);
RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT);
rmw_context_impl_s * context_impl =
static_cast<rmw_context_impl_s *>(node->context->impl);
RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT);
auto node_data = context_impl->get_node_data(node);
RMW_CHECK_ARGUMENT_FOR_NULL(node_data, RMW_RET_INVALID_ARGUMENT);
auto sub_data = node_data->get_sub_data(subscription);
rmw_zenoh_cpp::SubscriptionData * sub_data =
static_cast<rmw_zenoh_cpp::SubscriptionData *>(subscription->data);
RMW_CHECK_ARGUMENT_FOR_NULL(sub_data, RMW_RET_INVALID_ARGUMENT);

return sub_data->take_serialized_message(
Expand Down Expand Up @@ -2795,24 +2757,8 @@ check_and_attach_condition(

if (subscriptions) {
for (size_t i = 0; i < subscriptions->subscriber_count; ++i) {
rmw_node_t * node =
static_cast<rmw_node_t *>(subscriptions->subscribers[i]);
if (node == nullptr) {
continue;
}
rmw_context_impl_s * context_impl =
static_cast<rmw_context_impl_s *>(node->context->impl);
if (context_impl == nullptr) {
continue;
}
auto node_data = context_impl->get_node_data(node);
if (node_data == nullptr) {
continue;
}
auto sub_data = node_data->get_sub_data(subscriptions->subscribers[i]);
if (sub_data == nullptr) {
continue;
}
rmw_zenoh_cpp::SubscriptionData * sub_data =
static_cast<rmw_zenoh_cpp::SubscriptionData *>(subscriptions->subscribers[i]);
if (sub_data->queue_has_data_and_attach_condition_if_not(wait_set_data)) {
return true;
}
Expand Down Expand Up @@ -2962,24 +2908,8 @@ rmw_wait(

if (subscriptions) {
for (size_t i = 0; i < subscriptions->subscriber_count; ++i) {
rmw_node_t * node =
static_cast<rmw_node_t *>(subscriptions->subscribers[i]);
if (node == nullptr) {
continue;
}
rmw_context_impl_s * context_impl =
static_cast<rmw_context_impl_s *>(node->context->impl);
if (context_impl == nullptr) {
continue;
}
auto node_data = context_impl->get_node_data(node);
if (node_data == nullptr) {
continue;
}
auto sub_data = node_data->get_sub_data(subscriptions->subscribers[i]);
if (sub_data == nullptr) {
continue;
}
rmw_zenoh_cpp::SubscriptionData * sub_data =
static_cast<rmw_zenoh_cpp::SubscriptionData *>(subscriptions->subscribers[i]);
if (sub_data == nullptr) {
continue;
}
Expand Down Expand Up @@ -3335,15 +3265,8 @@ rmw_subscription_set_on_new_message_callback(
const void * user_data)
{
RMW_CHECK_ARGUMENT_FOR_NULL(subscription, RMW_RET_INVALID_ARGUMENT);
rmw_node_t * node =
static_cast<rmw_node_t *>(subscription->data);
RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT);
rmw_context_impl_s * context_impl =
static_cast<rmw_context_impl_s *>(node->context->impl);
RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT);
auto node_data = context_impl->get_node_data(node);
RMW_CHECK_ARGUMENT_FOR_NULL(node_data, RMW_RET_INVALID_ARGUMENT);
auto sub_data = node_data->get_sub_data(subscription);
rmw_zenoh_cpp::SubscriptionData * sub_data =
static_cast<rmw_zenoh_cpp::SubscriptionData *>(subscription->data);
RMW_CHECK_ARGUMENT_FOR_NULL(sub_data, RMW_RET_INVALID_ARGUMENT);

sub_data->set_on_new_message_callback(callback, user_data);
Expand Down

0 comments on commit 71bbb6a

Please sign in to comment.