Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More generic subscriber implementation using NodeInterfaces from rclcpp #113

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
206 changes: 106 additions & 100 deletions include/message_filters/subscriber.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,74 +39,48 @@
#include <type_traits>

#include <rclcpp/rclcpp.hpp>
#include <rclcpp/node_interfaces/node_interfaces.hpp>
#include <rclcpp/create_subscription.hpp>

#include "message_filters/connection.h"
#include "message_filters/simple_filter.h"

namespace message_filters
{

template<class NodeType = rclcpp::Node>
class SubscriberBase
{
public:
typedef std::shared_ptr<NodeType> NodePtr;
using NodeParametersInterface = rclcpp::node_interfaces::NodeParametersInterface;
using NodeTopicsInterface = rclcpp::node_interfaces::NodeTopicsInterface;

virtual ~SubscriberBase() = default;
/**
* \brief Subscribe to a topic.
*
* If this Subscriber is already subscribed to a topic, this function will first unsubscribe.
*
* \param node The rclcpp::Node::SharedPtr to use to subscribe.
* \param topic The topic to subscribe to.
* \param qos (optional) The rmw qos profile to use to subscribe
*/
virtual void subscribe(NodePtr node, const std::string& topic, const rmw_qos_profile_t qos = rmw_qos_profile_default) = 0;
using RequiredInterfaces = rclcpp::node_interfaces::NodeInterfaces<NodeParametersInterface, NodeTopicsInterface>;

virtual ~SubscriberBase() = default;
/**
* \brief Subscribe to a topic.
*
* If this Subscriber is already subscribed to a topic, this function will first unsubscribe.
*
* \param node The rclcpp::Node to use to subscribe.
* \param node The NodeInterfaces to use to subscribe.
* \param topic The topic to subscribe to.
* \param qos (optional) The rmw qos profile to use to subscribe
*/
virtual void subscribe(NodeType * node, const std::string& topic, const rmw_qos_profile_t qos = rmw_qos_profile_default) = 0;
virtual void subscribe(RequiredInterfaces node_interfaces, const std::string& topic,
const rmw_qos_profile_t qos = rmw_qos_profile_default) = 0;

/**
* \brief Subscribe to a topic.
*
* If this Subscriber is already subscribed to a topic, this function will first unsubscribe.
* This override allows SubscriptionOptions to be passed into the class without changing API.
*
* \param node The rclcpp::Node::SharedPtr to use to subscribe.
* \param topic The topic to subscribe to.
* \param qos (optional) The rmw qos profile to use to subscribe.
* \param options The subscription options to use to subscribe.
*/
virtual void subscribe(
NodePtr node,
const std::string& topic,
const rmw_qos_profile_t qos,
rclcpp::SubscriptionOptions options)
{
this->subscribe(node.get(), topic, qos, options);
};

/**
* \brief Subscribe to a topic.
*
* If this Subscriber is already subscribed to a topic, this function will first unsubscribe.
*
* \param node The rclcpp::Node to use to subscribe.
* \param node The NodeInterfaces to use to subscribe.
* \param topic The topic to subscribe to.
* \param qos The rmw qos profile to use to subscribe.
* \param options The subscription options to use to subscribe.
*/
virtual void subscribe(
NodeType * node,
RequiredInterfaces node_interfaces,
const std::string& topic,
const rmw_qos_profile_t qos,
rclcpp::SubscriptionOptions options) = 0;
Expand All @@ -120,8 +94,8 @@ class SubscriberBase
*/
virtual void unsubscribe() = 0;
};
template <typename T>
using SubscriberBasePtr = std::shared_ptr<SubscriberBase<T>>;

using SubscriberBasePtr = std::shared_ptr<SubscriberBase>;

/**
* \brief ROS subscription filter.
Expand Down Expand Up @@ -161,62 +135,93 @@ struct message_type <M, false>
template <typename M>
using message_type_t = typename message_type<M>::type;

template<class M, class NodeType = rclcpp::Node>
template<class M>
class Subscriber
: public SubscriberBase<NodeType>
: public SubscriberBase
, public SimpleFilter<message_type_t<M>>
{
public:
typedef std::shared_ptr<NodeType> NodePtr;
typedef message_type_t<M> MessageType;
typedef MessageEvent<MessageType const> EventType;

/**
* \brief Constructor
*
* See the rclcpp::Node::subscribe() variants for more information on the parameters
* \brief Constructor for rclcpp::Node / rclcpp_lifecycle::LifecycleNode.
*
* \param node The rclcpp::Node::SharedPtr to use to subscribe.
* \param node The NodeInterfaces to use to subscribe.
* \param topic The topic to subscribe to.
* \param qos (optional) The rmw qos profile to use to subscribe
*/
Subscriber(NodePtr node, const std::string& topic, const rmw_qos_profile_t qos = rmw_qos_profile_default)
Subscriber(RequiredInterfaces node_interfaces, const std::string& topic,
const rmw_qos_profile_t qos = rmw_qos_profile_default)
{
subscribe(node, topic, qos);
subscribe(node_interfaces, topic, qos);
}

Subscriber(NodeType * node, const std::string& topic, const rmw_qos_profile_t qos = rmw_qos_profile_default)
{
subscribe(node, topic, qos);
}
/**
* \brief Constructor for rclcpp::Node::SharedPtr / rclcpp_lifecycle::LifecycleNode::SharedPtr.
*
* \param node The NodeT::SharedPtr to use to subscribe.
* \param topic The topic to subscribe to.
* \param qos (optional) The rmw qos profile to use to subscribe
*/
template<typename NodeT>
Subscriber(std::shared_ptr<NodeT> node, const std::string& topic,
const rmw_qos_profile_t qos = rmw_qos_profile_default) : Subscriber(node.get(), topic, qos) {}

/**
* \brief Constructor
* \brief Constructor for raw pointer to rclcpp::Node / rclcpp_lifecycle::LifecycleNode.
*
* See the rclcpp::Node::subscribe() variants for more information on the parameters
* \param node The NodeT raw pointer to use to subscribe.
* \param topic The topic to subscribe to.
* \param qos (optional) The rmw qos profile to use to subscribe
*/
template<typename NodeT>
Subscriber(NodeT * node, const std::string& topic,
const rmw_qos_profile_t qos = rmw_qos_profile_default) : Subscriber(*node, topic, qos) {}

/**
* \brief Constructor for rclcpp::Node / rclcpp_lifecycle::LifecycleNode.
*
* \param node The rclcpp::Node::SharedPtr to use to subscribe.
* \param node_interfaces The NodeInterfaces to use to subscribe.
* \param topic The topic to subscribe to.
* \param qos The rmw qos profile to use to subscribe.
* \param options The subscription options to use to subscribe.
*/
Subscriber(
NodePtr node,
const std::string& topic,
const rmw_qos_profile_t qos,
rclcpp::SubscriptionOptions options)
Subscriber(RequiredInterfaces node_interfaces,
const std::string& topic,
const rmw_qos_profile_t qos,
rclcpp::SubscriptionOptions options)
{
subscribe(node.get(), topic, qos, options);
subscribe(node_interfaces, topic, qos, options);
}

Subscriber(
NodeType * node,
const std::string& topic,
const rmw_qos_profile_t qos,
rclcpp::SubscriptionOptions options)
{
subscribe(node, topic, qos, options);
}
/**
* \brief Constructor for rclcpp::Node::SharedPtr / rclcpp_lifecycle::LifecycleNode::SharedPtr.
*
* \param node The NodeT::SharedPtr to use to subscribe.
* \param topic The topic to subscribe to.
* \param qos The rmw qos profile to use to subscribe.
* \param options The subscription options to use to subscribe.
*/
template<typename NodeT>
Subscriber(std::shared_ptr<NodeT> node,
const std::string& topic,
const rmw_qos_profile_t qos,
rclcpp::SubscriptionOptions options) : Subscriber(node.get(), topic, qos, options) {}

/**
* \brief Constructor for raw pointer to rclcpp::Node / rclcpp_lifecycle::LifecycleNode.
*
* \param node The NodeT raw pointer to use to subscribe.
* \param topic The topic to subscribe to.
* \param qos The rmw qos profile to use to subscribe.
* \param options The subscription options to use to subscribe.
*/
template<typename NodeT>
Subscriber(NodeT * node,
const std::string& topic,
const rmw_qos_profile_t qos,
rclcpp::SubscriptionOptions options) : Subscriber(*node, topic, qos, options) {}

/**
* \brief Empty constructor, use subscribe() to subscribe to a topic
Expand All @@ -233,64 +238,61 @@ class Subscriber
*
* If this Subscriber is already subscribed to a topic, this function will first unsubscribe.
*
* \param node The rclcpp::Node::SharedPtr to use to subscribe.
* \param node_interfaces The NodeInterfaces to use to subscribe.
* \param topic The topic to subscribe to.
* \param qos (optional) The rmw qos profile to use to subscribe
*/
void subscribe(NodePtr node, const std::string& topic, const rmw_qos_profile_t qos = rmw_qos_profile_default) override
// TODO(wjwwood): deprecate in favor of API's that use `rclcpp::QoS` instead.
void subscribe(RequiredInterfaces node_interfaces, const std::string& topic, const rmw_qos_profile_t qos = rmw_qos_profile_default) override
{
subscribe(node.get(), topic, qos, rclcpp::SubscriptionOptions());
subscribe(node_interfaces, topic, qos, rclcpp::SubscriptionOptions());
}

/**
* \brief Subscribe to a topic.
*
* If this Subscriber is already subscribed to a topic, this function will first unsubscribe.
*
* \param node The rclcpp::Node to use to subscribe.
* \param node The NodeT::SharedPtr to use to subscribe.
* \param topic The topic to subscribe to.
* \param qos (optional) The rmw qos profile to use to subscribe
*/
// TODO(wjwwood): deprecate in favor of API's that use `rclcpp::QoS` instead.
void subscribe(NodeType * node, const std::string& topic, const rmw_qos_profile_t qos = rmw_qos_profile_default) override
template<typename NodeT>
void subscribe(std::shared_ptr<NodeT> node, const std::string& topic, const rmw_qos_profile_t qos = rmw_qos_profile_default)
{
subscribe(node, topic, qos, rclcpp::SubscriptionOptions());
subscribe(node.get(), topic, qos, rclcpp::SubscriptionOptions());
}

/**
* \brief Subscribe to a topic.
*
* If this Subscriber is already subscribed to a topic, this function will first unsubscribe.
*
* \param node The rclcpp::Node::SharedPtr to use to subscribe.
* \param node The NodeT raw pointer to use to subscribe.
* \param topic The topic to subscribe to.
* \param qos The rmw qos profile to use to subscribe.
* \param options The subscription options to use to subscribe.
* \param qos (optional) The rmw qos profile to use to subscribe
*/
void subscribe(
NodePtr node,
const std::string& topic,
const rmw_qos_profile_t qos,
rclcpp::SubscriptionOptions options) override
// TODO(wjwwood): deprecate in favor of API's that use `rclcpp::QoS` instead.
template<typename NodeT>
void subscribe(NodeT * node, const std::string& topic, const rmw_qos_profile_t qos = rmw_qos_profile_default)
{
subscribe(node.get(), topic, qos, options);
node_raw_ = nullptr;
node_shared_ = node;
subscribe(*node, topic, qos, rclcpp::SubscriptionOptions());
}

/**
* \brief Subscribe to a topic.
*
* If this Subscriber is already subscribed to a topic, this function will first unsubscribe.
*
* \param node The rclcpp::Node to use to subscribe.
* \param node The NodeInterfaces to use to subscribe.
* \param topic The topic to subscribe to.
* \param qos The rmw qos profile to use to subscribe
* \param options The subscription options to use to subscribe.
*/
// TODO(wjwwood): deprecate in favor of API's that use `rclcpp::QoS` instead.
void subscribe(
NodeType * node,
RequiredInterfaces node_interfaces,
const std::string& topic,
const rmw_qos_profile_t qos,
rclcpp::SubscriptionOptions options) override
Expand All @@ -304,12 +306,21 @@ class Subscriber
rclcpp_qos.get_rmw_qos_profile() = qos;
qos_ = qos;
options_ = options;
sub_ = node->template create_subscription<M>(topic, rclcpp_qos,
[this](const std::shared_ptr<MessageType const> msg) {
this->cb(EventType(msg));
}, options);

node_raw_ = node;
// Note: temporary variables are solely needed because the involved submodules are inconsistent about the passing
// of shared_ptr. While the node_interfaces return by value, the create_subscription function expects the
// shared_ptr to be passed via reference...
auto parameters_interface = node_interfaces.get<NodeParametersInterface>();
auto topics_interface = node_interfaces.get<NodeTopicsInterface>();

sub_ = rclcpp::create_subscription<MessageType>(parameters_interface,
topics_interface,
topic, rclcpp_qos,
[this](const std::shared_ptr<MessageType const> msg) {
this->cb(EventType(msg));
}, options);

node_interfaces_ = node_interfaces;
}
}

Expand All @@ -320,11 +331,7 @@ class Subscriber
{
if (!topic_.empty())
{
if (node_raw_ != nullptr) {
subscribe(node_raw_, topic_, qos_, options_);
} else if (node_shared_ != nullptr) {
subscribe(node_shared_, topic_, qos_, options_);
}
subscribe(node_interfaces_, topic_, qos_, options_);
}
}

Expand Down Expand Up @@ -372,8 +379,7 @@ class Subscriber

typename rclcpp::Subscription<M>::SharedPtr sub_;

NodePtr node_shared_;
NodeType * node_raw_ {nullptr};
RequiredInterfaces node_interfaces_;

std::string topic_;
rmw_qos_profile_t qos_;
Expand Down
2 changes: 1 addition & 1 deletion test/test_subscriber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ TEST(Subscriber, lifecycle)
{
auto node = std::make_shared<rclcpp_lifecycle::LifecycleNode>("test_node");
Helper h;
message_filters::Subscriber<Msg, rclcpp_lifecycle::LifecycleNode> sub(node, "test_topic");
message_filters::Subscriber<Msg> sub(node, "test_topic");
sub.registerCallback(std::bind(&Helper::cb, &h, std::placeholders::_1));
auto pub = node->create_publisher<Msg>("test_topic", 10);
pub->on_activate();
Expand Down