Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unit tests for composition actions #15

Open
wants to merge 3 commits into
base: rolling
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions test_launch_ros/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
<test_depend>ament_pep257</test_depend>
<test_depend>demo_nodes_py</test_depend>
<test_depend>launch_ros</test_depend>
<test_depend>launch_testing_ros</test_depend>
<test_depend>launch_xml</test_depend>
<test_depend>launch_yaml</test_depend>
<test_depend>python3-pytest</test_depend>
Expand Down
4 changes: 4 additions & 0 deletions test_launch_ros/setup.cfg
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
[develop]
script-dir=$base/lib/test_launch_ros
[install]
install-scripts=$base/lib/test_launch_ros
4 changes: 4 additions & 0 deletions test_launch_ros/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,8 @@
),
license='Apache License, Version 2.0',
tests_require=['pytest'],
entry_points={
'console_scripts': [
'mock-composable-container=test_launch_ros.mock_composable_container:main'],
}
)
175 changes: 175 additions & 0 deletions test_launch_ros/test/rostest/composition.test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
# Copyright 2019 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import unittest

from composition_interfaces.srv import LoadNode
from launch import LaunchDescription
from launch import LaunchService
from launch.actions import ExecuteProcess
from launch.actions import OpaqueFunction
from launch.actions import RegisterEventHandler
from launch.event_handlers.on_process_start import OnProcessStart
from launch_ros import get_default_launch_description
from launch_ros.actions import ComposableNodeContainer
from launch_ros.actions import LoadComposableNodes
from launch_ros.descriptions import ComposableNode
import launch_testing
import launch_testing.asserts
from rcl_interfaces.msg import Parameter
from rcl_interfaces.msg import ParameterType


def generate_test_description(ready_fn):
# Necessary to get real-time stdout from python processes:
proc_env = os.environ.copy()
proc_env['PYTHONUNBUFFERED'] = '1'

launch_description = LaunchDescription()

mock_container = ComposableNodeContainer(
env=proc_env,
node_name='my_container',
node_namespace='/my_ns',
package='test_launch_ros',
node_executable='mock-composable-container',
composable_node_descriptions=[
ComposableNode(package='fake_package', node_plugin='successfully_load'),
ComposableNode(package='fake_package', node_plugin='fail_to_load'),
ComposableNode(
package='fake_package', node_plugin='node_name',
node_name='my_talker'
),
ComposableNode(
package='fake_package', node_plugin='node_namespace',
node_namespace='my_namespace'
),
ComposableNode(
package='fake_package', node_plugin='remap_rules',
remappings=[('~/foo', '/bar')]
),
ComposableNode(
package='fake_package', node_plugin='parameters',
parameters=[{'foo': {'bar': 'baz'}}]
),
ComposableNode(
package='fake_package', node_plugin='extra_arguments',
extra_arguments=[{'ping.pong': 5}]
),
# TODO(sloretz) log level
# ComposableNode(
# package='fake_package', node_plugin='log_level',
# log_level=1
# ),
])

launch_description.add_action(get_default_launch_description())
launch_description.add_action(mock_container)
launch_description.add_action(
RegisterEventHandler(
event_handler=OnProcessStart(
target_action=mock_container,
on_start=[
LoadComposableNodes(
composable_node_descriptions=[
ComposableNode(
package='fake_package', node_plugin='node_name_on_event',
node_name='my_talker_on_event'
),
],
target_container=mock_container
)
]
)
)
)
launch_description.add_action(
OpaqueFunction(function=lambda context: ready_fn())
)

return launch_description, {'container': mock_container}


class TestComposition(unittest.TestCase):

def test_successfully_load(self, container):
request = LoadNode.Request()
request.package_name = 'fake_package'
request.plugin_name = 'successfully_load'
self.proc_output.assertWaitFor(expected_output=repr(request), process=container)

def test_fail_to_load(self, container):
request = LoadNode.Request()
request.package_name = 'fake_package'
request.plugin_name = 'fail_to_load'
self.proc_output.assertWaitFor(expected_output=repr(request), process=container)

def test_custom_node_name(self, container):
request = LoadNode.Request()
request.package_name = 'fake_package'
request.plugin_name = 'node_name'
request.node_name = 'my_talker'
self.proc_output.assertWaitFor(expected_output=repr(request), process=container)

def test_custom_node_name_post_launch(self, container):
request = LoadNode.Request()
request.package_name = 'fake_package'
request.plugin_name = 'node_name_on_event'
request.node_name = 'my_talker_on_event'
self.proc_output.assertWaitFor(expected_output=repr(request), process=container)

def test_custom_node_namespace(self, container):
request = LoadNode.Request()
request.package_name = 'fake_package'
request.plugin_name = 'node_namespace'
request.node_namespace = 'my_namespace'
self.proc_output.assertWaitFor(expected_output=repr(request), process=container)

def test_custom_remap_rules(self, container):
request = LoadNode.Request()
request.package_name = 'fake_package'
request.plugin_name = 'remap_rules'
request.remap_rules = ['~/foo:=/bar']
self.proc_output.assertWaitFor(expected_output=repr(request), process=container)

def test_custom_parameters(self, container):
request = LoadNode.Request()
request.package_name = 'fake_package'
request.plugin_name = 'parameters'
p = Parameter()
p.name = 'foo.bar'
p.value.string_value = 'baz'
p.value.type = ParameterType.PARAMETER_STRING
request.parameters = [p]
self.proc_output.assertWaitFor(expected_output=repr(request), process=container)

def test_custom_extra_arguments(self, container):
request = LoadNode.Request()
request.package_name = 'fake_package'
request.plugin_name = 'extra_arguments'
p = Parameter()
p.name = 'ping.pong'
p.value.integer_value = 5
p.value.type = ParameterType.PARAMETER_INTEGER
request.extra_arguments = [p]
self.proc_output.assertWaitFor(expected_output=repr(request), process=container)

# TODO(sloretz) log level
# def test_custom_log_level(self, container):
# request = LoadNode.Request()
# request.package_name = 'fake_package'
# request.plugin_name = 'log_level'
# request.log_level = 1
# self.proc_output.assertWaitFor(expected_output=repr(request), process=container)
Empty file.
133 changes: 133 additions & 0 deletions test_launch_ros/test_launch_ros/mock_composable_container.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
# Copyright 2019 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import sys

from composition_interfaces.srv import LoadNode
import rclpy
from rclpy.node import Node


class MockComposableNodeContainer(Node):

def __enter__(self):
self.__load_service = self.create_service(
LoadNode, '~/_container/load_node', self._on_load_node)
return self

def __exit__(self, type_, value, traceback):
self.destroy_service(self.__load_service)
self.destroy_node()

def __init__(self, name, namespace):
"""
Initialize a mock container process.

:param load_node_responses: responses to a load node request for a given package/plugin
:type load_node_responses: {(str, str): composition_interfaces.LoadNode.Response, ...}
"""
super().__init__(name, namespace=namespace)

# Canned responses to load service
fail_to_load = LoadNode.Response()
fail_to_load.success = False
fail_to_load.error_message = 'intentional failure'

successfully_load = LoadNode.Response()
successfully_load.success = True
successfully_load.full_node_name = '/a_nodename'
successfully_load.unique_id = 2

node_name = LoadNode.Response()
node_name.success = True
node_name.full_node_name = '/my_talker'
node_name.unique_id = 4

node_namespace = LoadNode.Response()
node_namespace.success = True
node_namespace.full_node_name = '/my_namespace/my_talker'
node_namespace.unique_id = 8

log_level = LoadNode.Response()
log_level.success = True
log_level.full_node_name = '/a_nodename'
log_level.unique_id = 16

remap_rules = LoadNode.Response()
remap_rules.success = True
remap_rules.full_node_name = '/a_nodename'
remap_rules.unique_id = 32

parameters = LoadNode.Response()
parameters.success = True
parameters.full_node_name = '/a_nodename'
parameters.unique_id = 64

extra_arguments = LoadNode.Response()
extra_arguments.success = True
extra_arguments.full_node_name = '/a_nodename'
extra_arguments.unique_id = 128

node_name_on_event = LoadNode.Response()
node_name_on_event.success = True
node_name_on_event.full_node_name = '/my_talker_on_event'
node_name_on_event.unique_id = 256

self.__load_node_responses = {
('fake_package', 'fail_to_load'): fail_to_load,
('fake_package', 'successfully_load'): successfully_load,
('fake_package', 'node_name'): node_name,
('fake_package', 'node_namespace'): node_namespace,
('fake_package', 'log_level'): log_level,
('fake_package', 'remap_rules'): remap_rules,
('fake_package', 'parameters'): parameters,
('fake_package', 'extra_arguments'): extra_arguments,
('fake_package', 'node_name_on_event'): node_name_on_event,
}

self.unexpected_request = False

def _on_load_node(self, request, response):
key = (request.package_name, request.plugin_name)
if key not in self.__load_node_responses:
self.unexpected_request = True
unexpected_load = LoadNode.Response()
unexpected_load.success = False
unexpected_load.error_message = 'unexpected load request'
return response
else:
print(repr(request))
return self.__load_node_responses[key]
return response


def main():
rclpy.init()
container = MockComposableNodeContainer(name='mock_container', namespace='/')
with container:
try:
rclpy.spin(container)
except KeyboardInterrupt:
print('Got SIGINT, shutting down')
except:
import traceback
traceback.print_exc()
if container.unexpected_request:
sys.stderr.write('failing due to unexpected request\n')
sys.exit(1)
rclpy.shutdown()


if __name__ == '__main__':
main()