Skip to content

Commit

Permalink
Remove recorder from diagnostics test
Browse files Browse the repository at this point in the history
We robustify this test by not relying on a bag recorder, but instead
have a subscriber within the same process as the test. Since we have the
messages available without extra effort, we use the chance to also check
the contents of the messages.
  • Loading branch information
ahans committed Oct 14, 2023
1 parent b823fbb commit 164cca6
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 80 deletions.
18 changes: 5 additions & 13 deletions ros2/test/diagnostics/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
load("@com_github_mvukov_rules_ros2//ros2:bag.bzl", "ros2_bag")
load("@com_github_mvukov_rules_ros2//ros2:cc_defs.bzl", "ros2_cpp_binary")
load("@com_github_mvukov_rules_ros2//ros2:test.bzl", "ros2_test")
load("@rules_ros2_pip_deps//:requirements.bzl", "requirement")
Expand All @@ -12,27 +11,20 @@ ros2_cpp_binary(
],
)

ros2_bag(
name = "bag",
idl_deps = [
"@ros2_common_interfaces//:diagnostic_msgs",
"@ros2_rcl_interfaces//:rcl_interfaces",
"@ros2_rosbag2//:rosbag2_interfaces",
],
)

ros2_test(
name = "diagnostic_heartbeat_test",
size = "small",
data = [
"aggregator_config.yaml",
":bag",
],
launch_file = "tests.py",
nodes = [
":diagnostic_publisher",
"@ros2_diagnostics//:aggregator_node",
],
tags = ["cpu:2"],
deps = [requirement("pyyaml")],
deps = [
requirement("pyyaml"),
"@ros2_common_interfaces//:py_diagnostic_msgs",
"@ros2_rclpy//:rclpy",
],
)
11 changes: 3 additions & 8 deletions ros2/test/diagnostics/diagnostic_publisher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <chrono>
#include <memory>

#include "diagnostic_updater/diagnostic_updater.hpp"
Expand All @@ -25,21 +24,17 @@ class Publisher : public rclcpp::Node {
diagnostic_updater_.setHardwareID("none");
diagnostic_updater_.add("", &diagnostic_heartbeat_,
&diagnostic_updater::Heartbeat::run);

auto timer_callback{[this]() {
// do nothing
}};
timer_ = create_wall_timer(std::chrono::milliseconds(20), timer_callback);
}

private:
diagnostic_updater::Updater diagnostic_updater_;
diagnostic_updater::Heartbeat diagnostic_heartbeat_{};
rclcpp::TimerBase::SharedPtr timer_;
};

int main(int argc, char* argv[]) {
rclcpp::init(argc, argv);
rclcpp::spin(std::make_shared<Publisher>());
return rclcpp::shutdown() ? EXIT_SUCCESS : EXIT_FAILURE;
rclcpp::shutdown(); // will return false if already shutdown due to SIGINT,
// so ignore return value
return EXIT_SUCCESS;
}
123 changes: 64 additions & 59 deletions ros2/test/diagnostics/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,18 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import signal
import unittest
from threading import Event
from threading import Thread

import launch.actions
import launch_ros.actions
import launch_testing.actions
import launch_testing.asserts
import launch_testing.markers
import yaml

bag_dir = os.path.join(os.environ['TEST_TMPDIR'], 'bag')
import rclpy
from diagnostic_msgs.msg import DiagnosticArray
from rclpy import node


@launch_testing.markers.keep_alive
Expand All @@ -39,64 +39,69 @@ def generate_test_description():
parameters=['ros2/test/diagnostics/aggregator_config.yaml'],
)

recorder = launch.actions.ExecuteProcess(
cmd=[
'ros2/test/diagnostics/bag', 'record', '-o', bag_dir,
'/diagnostics', '/diagnostics_agg'
],
output='screen',
log_cmd=True,
)

return (launch.LaunchDescription([
return launch.LaunchDescription([
publisher_node,
aggregator_node,
recorder,
launch.actions.TimerAction(
period=3.0,
actions=[
launch.actions.EmitEvent(
event=launch.events.process.SignalProcess(
signal_number=signal.SIGINT,
process_matcher=lambda proc: proc is recorder))
]),
launch_testing.actions.ReadyToTest(),
]), {
'recorder': recorder,
})
])


class DiagnosticsListener(node.Node):

def __init__(self):
super().__init__('diagnostics_listener')

self.diagnostics_subscription = self.create_subscription(
DiagnosticArray, '/diagnostics', self._on_diagnostics, 10)
self.diagnostics_agg_subscription = self.create_subscription(
DiagnosticArray, '/diagnostics_agg', self._on_diagnostics_agg, 10)
self.messages = {}
self.both_messages_received = Event()

def _on_diagnostics(self, msg):
self.messages['/diagnostics'] = msg
self._check_if_both_messages_received()

def _on_diagnostics_agg(self, msg):
self.messages['/diagnostics_agg'] = msg
self._check_if_both_messages_received()

def _check_if_both_messages_received(self):
if len(self.messages) == 2:
self.both_messages_received.set()


class TestHeartbeatDiagnostic(unittest.TestCase):

def test_record_heartbeat_diagnostics(self, proc_info, recorder):
proc_info.assertWaitForShutdown(process=recorder, timeout=5)
launch_testing.asserts.assertExitCodes(
proc_info,
allowable_exit_codes=[launch_testing.asserts.EXIT_OK],
process=recorder)
bag_metadata_file = os.path.join(bag_dir, 'metadata.yaml')

self.assertTrue(os.path.exists(bag_metadata_file))

with open(bag_metadata_file, encoding='utf-8') as stream:
bag_metadata = yaml.load(
stream, Loader=yaml.Loader)['rosbag2_bagfile_information']

min_num_received_msgs = 1
self.assertGreaterEqual(bag_metadata['message_count'],
min_num_received_msgs)

self.assertEqual(len(bag_metadata['topics_with_message_count']), 2)

# Are any heartbeat messages sent?
diag_topic = bag_metadata['topics_with_message_count'][0]
self.assertEqual(diag_topic['topic_metadata']['name'], '/diagnostics')
self.assertGreaterEqual(diag_topic['message_count'],
min_num_received_msgs)

# Does the aggregator work (i.e. are plugins loaded)?
agg_topic = bag_metadata['topics_with_message_count'][1]
self.assertEqual(agg_topic['topic_metadata']['name'],
'/diagnostics_agg')
self.assertGreaterEqual(agg_topic['message_count'],
min_num_received_msgs)
def test_record_heartbeat_diagnostics(self):
rclpy.init()
try:
# Start listener node and wait for messages to be received.
diagnostics_listener = DiagnosticsListener()
thread = Thread(target=lambda node: rclpy.spin(node),
args=(diagnostics_listener,))
thread.start()
event_triggered = diagnostics_listener.both_messages_received.wait(
timeout=10.0)
self.assertTrue(event_triggered,
'timeout while waiting for messages')

# Basic sanity checks of messages.
diagnostics_msg = diagnostics_listener.messages['/diagnostics']
self.assertEqual(diagnostics_msg.status[0].message, 'Alive')

diagnostics_agg_msg = diagnostics_listener.messages[
'/diagnostics_agg']
status_names = (
status.name for status in diagnostics_agg_msg.status)
self.assertTrue(
any('diagnostic_publisher' in name for name in status_names))
finally:
rclpy.shutdown()


@launch_testing.post_shutdown_test()
class TestHeartbeatDiagnosticShutdown(unittest.TestCase):

def test_exit_codes(self, proc_info):
launch_testing.asserts.assertExitCodes(proc_info)

0 comments on commit 164cca6

Please sign in to comment.