Skip to content

Commit

Permalink
Expose required methods from Entity instead
Browse files Browse the repository at this point in the history
Signed-off-by: Yadunund <[email protected]>
  • Loading branch information
Yadunund committed Sep 30, 2024
1 parent 665616f commit 0712429
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 20 deletions.
14 changes: 7 additions & 7 deletions rmw_zenoh_cpp/src/detail/graph_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ void GraphCache::take_entities_with_events(const EntityEventMap & entities_with_
for (const auto & [local_entity, event_set] : entities_with_events) {
// Trigger callback set for this entity for the event type.
GraphEventCallbackMap::const_iterator event_callbacks_it =
event_callbacks_.find(local_entity);
event_callbacks_.find(local_entity->guid());
if (event_callbacks_it != event_callbacks_.end()) {
for (const rmw_zenoh_event_type_t & event_type : event_set) {
GraphEventCallbacks::const_iterator callback_it =
Expand Down Expand Up @@ -858,7 +858,7 @@ rmw_ret_t GraphCache::publisher_count_matched_subscriptions(
// TODO(Yadunund): Replace this logic by returning a number that is tracked once
// we support matched qos events.
*subscription_count = 0;
auto topic_info = pub_data->entity()->topic_info().value();
auto topic_info = pub_data->topic_info();
GraphNode::TopicMap::const_iterator topic_it = graph_topics_.find(topic_info.name_);
if (topic_it != graph_topics_.end()) {
GraphNode::TopicTypeMap::const_iterator topic_data_it = topic_it->second.find(
Expand Down Expand Up @@ -1243,7 +1243,7 @@ rmw_ret_t GraphCache::service_server_is_available(

///=============================================================================
void GraphCache::set_qos_event_callback(
liveliness::ConstEntityPtr entity,
std::size_t entity_guid,
const rmw_zenoh_event_type_t & event_type,
GraphCacheEventCallback callback)
{
Expand All @@ -1256,20 +1256,20 @@ void GraphCache::set_qos_event_callback(
return;
}

const GraphEventCallbackMap::iterator event_cb_it = event_callbacks_.find(entity);
const GraphEventCallbackMap::iterator event_cb_it = event_callbacks_.find(entity_guid);
if (event_cb_it == event_callbacks_.end()) {
// First time a callback is being set for this entity.
event_callbacks_[entity] = {std::make_pair(event_type, std::move(callback))};
event_callbacks_[entity_guid] = {std::make_pair(event_type, std::move(callback))};
return;
}
event_cb_it->second[event_type] = std::move(callback);
}

///=============================================================================
void GraphCache::remove_qos_event_callbacks(liveliness::ConstEntityPtr entity)
void GraphCache::remove_qos_event_callbacks(std::size_t entity_guid)
{
std::lock_guard<std::mutex> lock(graph_mutex_);
event_callbacks_.erase(entity);
event_callbacks_.erase(entity_guid);
}

///=============================================================================
Expand Down
8 changes: 4 additions & 4 deletions rmw_zenoh_cpp/src/detail/graph_cache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,12 +184,12 @@ class GraphCache final
/// Set a qos event callback for an entity from the current session.
/// @note The callback will be removed when the entity is removed from the graph.
void set_qos_event_callback(
liveliness::ConstEntityPtr entity,
std::size_t entity_guid,
const rmw_zenoh_event_type_t & event_type,
GraphCacheEventCallback callback);

/// Remove all qos event callbacks for an entity.
void remove_qos_event_callbacks(liveliness::ConstEntityPtr entity);
void remove_qos_event_callbacks(std::size_t entity_guid);

/// Returns true if the entity is a publisher or client. False otherwise.
static bool is_entity_pub(const liveliness::Entity & entity);
Expand Down Expand Up @@ -290,13 +290,13 @@ class GraphCache final
GraphNode::TopicMap graph_services_ = {};

using GraphEventCallbacks = std::unordered_map<rmw_zenoh_event_type_t, GraphCacheEventCallback>;
// Map entity (based on uuid) to a map of event callbacks.
// Map an entity's guid to a map of event callbacks.
// Note: Since we use unordered_map, we will only store a single callback for an
// entity string. So we do not support the case where a node create a duplicate
// pub/sub with the exact same topic, type & QoS but registers a different callback
// for the same event type. We could switch to a multimap here but removing the callback
// will be impossible right now since entities do not have unique IDs.
using GraphEventCallbackMap = std::unordered_map<liveliness::ConstEntityPtr, GraphEventCallbacks>;
using GraphEventCallbackMap = std::unordered_map<std::size_t, GraphEventCallbacks>;
// EventCallbackMap for each type of event we support in rmw_zenoh_cpp.
GraphEventCallbackMap event_callbacks_;
// Map keyexpressions to QueryingSubscriberCallback.
Expand Down
11 changes: 9 additions & 2 deletions rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -373,10 +373,17 @@ rmw_ret_t PublisherData::publish(
}

///=============================================================================
std::shared_ptr<const liveliness::Entity> PublisherData::entity() const
std::size_t PublisherData::guid() const
{
std::lock_guard<std::mutex> lock(mutex_);
return entity_;
return entity_->guid();
}

///=============================================================================
liveliness::TopicInfo PublisherData::topic_info() const
{
std::lock_guard<std::mutex> lock(mutex_);
return entity_->topic_info().value();
}

///=============================================================================
Expand Down
9 changes: 6 additions & 3 deletions rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,15 @@ class PublisherData final
const void * ros_message,
std::optional<zc_owned_shm_manager_t> & shm_manager);

// Get the entity from this PublisherData.
std::shared_ptr<const liveliness::Entity> entity() const;

// Get the next sequence number.
size_t get_next_sequence_number();

// Get a copy of the GUID of this PublisherData's liveliness::Entity.
std::size_t guid() const;

// Get a copy of the TopicInfo of this PublisherData.
liveliness::TopicInfo topic_info() const;

// Get the GID of this PublisherData.
const uint8_t * gid() const;

Expand Down
4 changes: 2 additions & 2 deletions rmw_zenoh_cpp/src/rmw_event.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ rmw_publisher_event_init(

// Register the event with graph cache.
context_impl->graph_cache()->set_qos_event_callback(
pub_data->entity(),
pub_data->guid(),
zenoh_event_type,
[pub_data,
zenoh_event_type](std::unique_ptr<rmw_zenoh_cpp::rmw_zenoh_event_status_t> zenoh_event)
Expand Down Expand Up @@ -132,7 +132,7 @@ rmw_subscription_event_init(
}

context_impl->graph_cache()->set_qos_event_callback(
sub_data->entity,
sub_data->entity->guid(),
zenoh_event_type,
[sub_data,
zenoh_event_type](std::unique_ptr<rmw_zenoh_cpp::rmw_zenoh_event_status_t> zenoh_event)
Expand Down
4 changes: 2 additions & 2 deletions rmw_zenoh_cpp/src/rmw_zenoh.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,7 @@ rmw_destroy_publisher(rmw_node_t * node, rmw_publisher_t * publisher)
return RMW_RET_INVALID_ARGUMENT;
}
// Remove any event callbacks registered to this publisher.
context_impl->graph_cache()->remove_qos_event_callbacks(pub_data->entity());
context_impl->graph_cache()->remove_qos_event_callbacks(pub_data->guid());
// Remove the PublisherData from NodeData.
node_data->delete_pub_data(publisher);

Expand Down Expand Up @@ -1326,7 +1326,7 @@ rmw_destroy_subscription(rmw_node_t * node, rmw_subscription_t * subscription)
}

// Remove any event callbacks registered to this subscription.
context_impl->graph_cache()->remove_qos_event_callbacks(sub_data->entity);
context_impl->graph_cache()->remove_qos_event_callbacks(sub_data->entity->guid());

RMW_TRY_DESTRUCTOR(sub_data->~rmw_subscription_data_t(), rmw_subscription_data_t, );
allocator->deallocate(sub_data, allocator->state);
Expand Down

0 comments on commit 0712429

Please sign in to comment.