Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove recorder from diagnostics test #196

Merged
merged 2 commits into from
Oct 15, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 4 additions & 14 deletions ros2/test/diagnostics/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
load("@com_github_mvukov_rules_ros2//ros2:bag.bzl", "ros2_bag")
load("@com_github_mvukov_rules_ros2//ros2:cc_defs.bzl", "ros2_cpp_binary")
load("@com_github_mvukov_rules_ros2//ros2:test.bzl", "ros2_test")
load("@rules_ros2_pip_deps//:requirements.bzl", "requirement")

ros2_cpp_binary(
name = "diagnostic_publisher",
Expand All @@ -12,27 +10,19 @@ ros2_cpp_binary(
],
)

ros2_bag(
name = "bag",
idl_deps = [
"@ros2_common_interfaces//:diagnostic_msgs",
"@ros2_rcl_interfaces//:rcl_interfaces",
"@ros2_rosbag2//:rosbag2_interfaces",
],
)

ros2_test(
name = "diagnostic_heartbeat_test",
size = "small",
data = [
"aggregator_config.yaml",
":bag",
],
launch_file = "tests.py",
nodes = [
":diagnostic_publisher",
"@ros2_diagnostics//:aggregator_node",
],
tags = ["cpu:2"],
mvukov marked this conversation as resolved.
Show resolved Hide resolved
deps = [requirement("pyyaml")],
deps = [
"@ros2_common_interfaces//:py_diagnostic_msgs",
"@ros2_rclpy//:rclpy",
],
)
11 changes: 3 additions & 8 deletions ros2/test/diagnostics/diagnostic_publisher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <chrono>
#include <memory>

#include "diagnostic_updater/diagnostic_updater.hpp"
Expand All @@ -25,21 +24,17 @@ class Publisher : public rclcpp::Node {
diagnostic_updater_.setHardwareID("none");
diagnostic_updater_.add("", &diagnostic_heartbeat_,
&diagnostic_updater::Heartbeat::run);

auto timer_callback{[this]() {
// do nothing
}};
timer_ = create_wall_timer(std::chrono::milliseconds(20), timer_callback);
mvukov marked this conversation as resolved.
Show resolved Hide resolved
}

private:
diagnostic_updater::Updater diagnostic_updater_;
diagnostic_updater::Heartbeat diagnostic_heartbeat_{};
rclcpp::TimerBase::SharedPtr timer_;
};

int main(int argc, char* argv[]) {
rclcpp::init(argc, argv);
rclcpp::spin(std::make_shared<Publisher>());
return rclcpp::shutdown() ? EXIT_SUCCESS : EXIT_FAILURE;
rclcpp::shutdown(); // will return false if already shutdown due to SIGINT,
// so ignore return value
return EXIT_SUCCESS;
}
123 changes: 64 additions & 59 deletions ros2/test/diagnostics/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,18 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import signal
import unittest
from threading import Event
mvukov marked this conversation as resolved.
Show resolved Hide resolved
from threading import Thread

import launch.actions
import launch_ros.actions
import launch_testing.actions
import launch_testing.asserts
import launch_testing.markers
import yaml

bag_dir = os.path.join(os.environ['TEST_TMPDIR'], 'bag')
import rclpy
from diagnostic_msgs.msg import DiagnosticArray
mvukov marked this conversation as resolved.
Show resolved Hide resolved
from rclpy import node
mvukov marked this conversation as resolved.
Show resolved Hide resolved


@launch_testing.markers.keep_alive
Expand All @@ -39,64 +39,69 @@ def generate_test_description():
parameters=['ros2/test/diagnostics/aggregator_config.yaml'],
)

recorder = launch.actions.ExecuteProcess(
cmd=[
'ros2/test/diagnostics/bag', 'record', '-o', bag_dir,
'/diagnostics', '/diagnostics_agg'
],
output='screen',
log_cmd=True,
)

return (launch.LaunchDescription([
return launch.LaunchDescription([
publisher_node,
aggregator_node,
recorder,
launch.actions.TimerAction(
period=3.0,
actions=[
launch.actions.EmitEvent(
event=launch.events.process.SignalProcess(
signal_number=signal.SIGINT,
process_matcher=lambda proc: proc is recorder))
]),
launch_testing.actions.ReadyToTest(),
]), {
'recorder': recorder,
})
])


class DiagnosticsListener(node.Node):

def __init__(self):
super().__init__('diagnostics_listener')

self.diagnostics_subscription = self.create_subscription(
DiagnosticArray, '/diagnostics', self._on_diagnostics, 10)
self.diagnostics_agg_subscription = self.create_subscription(
DiagnosticArray, '/diagnostics_agg', self._on_diagnostics_agg, 10)
self.messages = {}
self.both_messages_received = Event()

def _on_diagnostics(self, msg):
self.messages['/diagnostics'] = msg
self._check_if_both_messages_received()

def _on_diagnostics_agg(self, msg):
self.messages['/diagnostics_agg'] = msg
self._check_if_both_messages_received()

def _check_if_both_messages_received(self):
if len(self.messages) == 2:
self.both_messages_received.set()


class TestHeartbeatDiagnostic(unittest.TestCase):

def test_record_heartbeat_diagnostics(self, proc_info, recorder):
proc_info.assertWaitForShutdown(process=recorder, timeout=5)
launch_testing.asserts.assertExitCodes(
proc_info,
allowable_exit_codes=[launch_testing.asserts.EXIT_OK],
process=recorder)
bag_metadata_file = os.path.join(bag_dir, 'metadata.yaml')

self.assertTrue(os.path.exists(bag_metadata_file))

with open(bag_metadata_file, encoding='utf-8') as stream:
bag_metadata = yaml.load(
stream, Loader=yaml.Loader)['rosbag2_bagfile_information']

min_num_received_msgs = 1
self.assertGreaterEqual(bag_metadata['message_count'],
min_num_received_msgs)

self.assertEqual(len(bag_metadata['topics_with_message_count']), 2)

# Are any heartbeat messages sent?
diag_topic = bag_metadata['topics_with_message_count'][0]
self.assertEqual(diag_topic['topic_metadata']['name'], '/diagnostics')
self.assertGreaterEqual(diag_topic['message_count'],
min_num_received_msgs)

# Does the aggregator work (i.e. are plugins loaded)?
agg_topic = bag_metadata['topics_with_message_count'][1]
self.assertEqual(agg_topic['topic_metadata']['name'],
'/diagnostics_agg')
self.assertGreaterEqual(agg_topic['message_count'],
min_num_received_msgs)
def test_record_heartbeat_diagnostics(self):
rclpy.init()
try:
# Start listener node and wait for messages to be received.
diagnostics_listener = DiagnosticsListener()
thread = Thread(target=lambda node: rclpy.spin(node),
args=(diagnostics_listener,))
thread.start()
event_triggered = diagnostics_listener.both_messages_received.wait(
timeout=10.0)
self.assertTrue(event_triggered,
'timeout while waiting for messages')

# Basic sanity checks of messages.
diagnostics_msg = diagnostics_listener.messages['/diagnostics']
self.assertEqual(diagnostics_msg.status[0].message, 'Alive')

diagnostics_agg_msg = diagnostics_listener.messages[
'/diagnostics_agg']
status_names = (
status.name for status in diagnostics_agg_msg.status)
self.assertTrue(
any('diagnostic_publisher' in name for name in status_names))
finally:
rclpy.shutdown()


@launch_testing.post_shutdown_test()
class TestHeartbeatDiagnosticShutdown(unittest.TestCase):

def test_exit_codes(self, proc_info):
launch_testing.asserts.assertExitCodes(proc_info)