From 164cca67f0166c1ef4a87d6fb9538f43f2306a32 Mon Sep 17 00:00:00 2001 From: Alexander Hans Date: Sat, 14 Oct 2023 22:37:02 +0200 Subject: [PATCH] Remove recorder from diagnostics test We robustify this test by not relying on a bag recorder, but instead have a subscriber within the same process as the test. Since we have the messages available without extra effort, we use the chance to also check the contents of the messages. --- ros2/test/diagnostics/BUILD.bazel | 18 +-- ros2/test/diagnostics/diagnostic_publisher.cc | 11 +- ros2/test/diagnostics/tests.py | 123 +++++++++--------- 3 files changed, 72 insertions(+), 80 deletions(-) diff --git a/ros2/test/diagnostics/BUILD.bazel b/ros2/test/diagnostics/BUILD.bazel index de3d208d..b10fc967 100644 --- a/ros2/test/diagnostics/BUILD.bazel +++ b/ros2/test/diagnostics/BUILD.bazel @@ -1,4 +1,3 @@ -load("@com_github_mvukov_rules_ros2//ros2:bag.bzl", "ros2_bag") load("@com_github_mvukov_rules_ros2//ros2:cc_defs.bzl", "ros2_cpp_binary") load("@com_github_mvukov_rules_ros2//ros2:test.bzl", "ros2_test") load("@rules_ros2_pip_deps//:requirements.bzl", "requirement") @@ -12,27 +11,20 @@ ros2_cpp_binary( ], ) -ros2_bag( - name = "bag", - idl_deps = [ - "@ros2_common_interfaces//:diagnostic_msgs", - "@ros2_rcl_interfaces//:rcl_interfaces", - "@ros2_rosbag2//:rosbag2_interfaces", - ], -) - ros2_test( name = "diagnostic_heartbeat_test", size = "small", data = [ "aggregator_config.yaml", - ":bag", ], launch_file = "tests.py", nodes = [ ":diagnostic_publisher", "@ros2_diagnostics//:aggregator_node", ], - tags = ["cpu:2"], - deps = [requirement("pyyaml")], + deps = [ + requirement("pyyaml"), + "@ros2_common_interfaces//:py_diagnostic_msgs", + "@ros2_rclpy//:rclpy", + ], ) diff --git a/ros2/test/diagnostics/diagnostic_publisher.cc b/ros2/test/diagnostics/diagnostic_publisher.cc index 3b0d5d03..4ca6a8e1 100644 --- a/ros2/test/diagnostics/diagnostic_publisher.cc +++ b/ros2/test/diagnostics/diagnostic_publisher.cc @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include #include #include "diagnostic_updater/diagnostic_updater.hpp" @@ -25,21 +24,17 @@ class Publisher : public rclcpp::Node { diagnostic_updater_.setHardwareID("none"); diagnostic_updater_.add("", &diagnostic_heartbeat_, &diagnostic_updater::Heartbeat::run); - - auto timer_callback{[this]() { - // do nothing - }}; - timer_ = create_wall_timer(std::chrono::milliseconds(20), timer_callback); } private: diagnostic_updater::Updater diagnostic_updater_; diagnostic_updater::Heartbeat diagnostic_heartbeat_{}; - rclcpp::TimerBase::SharedPtr timer_; }; int main(int argc, char* argv[]) { rclcpp::init(argc, argv); rclcpp::spin(std::make_shared()); - return rclcpp::shutdown() ? EXIT_SUCCESS : EXIT_FAILURE; + rclcpp::shutdown(); // will return false if already shutdown due to SIGINT, + // so ignore return value + return EXIT_SUCCESS; } diff --git a/ros2/test/diagnostics/tests.py b/ros2/test/diagnostics/tests.py index 64ceebf5..6c19613b 100644 --- a/ros2/test/diagnostics/tests.py +++ b/ros2/test/diagnostics/tests.py @@ -11,18 +11,18 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import os -import signal import unittest +from threading import Event +from threading import Thread import launch.actions import launch_ros.actions import launch_testing.actions import launch_testing.asserts import launch_testing.markers -import yaml - -bag_dir = os.path.join(os.environ['TEST_TMPDIR'], 'bag') +import rclpy +from diagnostic_msgs.msg import DiagnosticArray +from rclpy import node @launch_testing.markers.keep_alive @@ -39,64 +39,69 @@ def generate_test_description(): parameters=['ros2/test/diagnostics/aggregator_config.yaml'], ) - recorder = launch.actions.ExecuteProcess( - cmd=[ - 'ros2/test/diagnostics/bag', 'record', '-o', bag_dir, - '/diagnostics', '/diagnostics_agg' - ], - output='screen', - log_cmd=True, - ) - - return (launch.LaunchDescription([ + return launch.LaunchDescription([ publisher_node, aggregator_node, - recorder, - launch.actions.TimerAction( - period=3.0, - actions=[ - launch.actions.EmitEvent( - event=launch.events.process.SignalProcess( - signal_number=signal.SIGINT, - process_matcher=lambda proc: proc is recorder)) - ]), launch_testing.actions.ReadyToTest(), - ]), { - 'recorder': recorder, - }) + ]) + + +class DiagnosticsListener(node.Node): + + def __init__(self): + super().__init__('diagnostics_listener') + + self.diagnostics_subscription = self.create_subscription( + DiagnosticArray, '/diagnostics', self._on_diagnostics, 10) + self.diagnostics_agg_subscription = self.create_subscription( + DiagnosticArray, '/diagnostics_agg', self._on_diagnostics_agg, 10) + self.messages = {} + self.both_messages_received = Event() + + def _on_diagnostics(self, msg): + self.messages['/diagnostics'] = msg + self._check_if_both_messages_received() + + def _on_diagnostics_agg(self, msg): + self.messages['/diagnostics_agg'] = msg + self._check_if_both_messages_received() + + def _check_if_both_messages_received(self): + if len(self.messages) == 2: + self.both_messages_received.set() class TestHeartbeatDiagnostic(unittest.TestCase): - def test_record_heartbeat_diagnostics(self, proc_info, recorder): - proc_info.assertWaitForShutdown(process=recorder, timeout=5) - launch_testing.asserts.assertExitCodes( - proc_info, - allowable_exit_codes=[launch_testing.asserts.EXIT_OK], - process=recorder) - bag_metadata_file = os.path.join(bag_dir, 'metadata.yaml') - - self.assertTrue(os.path.exists(bag_metadata_file)) - - with open(bag_metadata_file, encoding='utf-8') as stream: - bag_metadata = yaml.load( - stream, Loader=yaml.Loader)['rosbag2_bagfile_information'] - - min_num_received_msgs = 1 - self.assertGreaterEqual(bag_metadata['message_count'], - min_num_received_msgs) - - self.assertEqual(len(bag_metadata['topics_with_message_count']), 2) - - # Are any heartbeat messages sent? - diag_topic = bag_metadata['topics_with_message_count'][0] - self.assertEqual(diag_topic['topic_metadata']['name'], '/diagnostics') - self.assertGreaterEqual(diag_topic['message_count'], - min_num_received_msgs) - - # Does the aggregator work (i.e. are plugins loaded)? - agg_topic = bag_metadata['topics_with_message_count'][1] - self.assertEqual(agg_topic['topic_metadata']['name'], - '/diagnostics_agg') - self.assertGreaterEqual(agg_topic['message_count'], - min_num_received_msgs) + def test_record_heartbeat_diagnostics(self): + rclpy.init() + try: + # Start listener node and wait for messages to be received. + diagnostics_listener = DiagnosticsListener() + thread = Thread(target=lambda node: rclpy.spin(node), + args=(diagnostics_listener,)) + thread.start() + event_triggered = diagnostics_listener.both_messages_received.wait( + timeout=10.0) + self.assertTrue(event_triggered, + 'timeout while waiting for messages') + + # Basic sanity checks of messages. + diagnostics_msg = diagnostics_listener.messages['/diagnostics'] + self.assertEqual(diagnostics_msg.status[0].message, 'Alive') + + diagnostics_agg_msg = diagnostics_listener.messages[ + '/diagnostics_agg'] + status_names = ( + status.name for status in diagnostics_agg_msg.status) + self.assertTrue( + any('diagnostic_publisher' in name for name in status_names)) + finally: + rclpy.shutdown() + + +@launch_testing.post_shutdown_test() +class TestHeartbeatDiagnosticShutdown(unittest.TestCase): + + def test_exit_codes(self, proc_info): + launch_testing.asserts.assertExitCodes(proc_info)