Skip to content

Commit

Permalink
🖤 Add Black formatter for Python files. (ros-controls#936)
Browse files Browse the repository at this point in the history
  • Loading branch information
destogl authored Feb 8, 2023
1 parent e7bab63 commit 8ffe7df
Show file tree
Hide file tree
Showing 25 changed files with 666 additions and 483 deletions.
10 changes: 8 additions & 2 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,17 @@ repos:

# PyDocStyle
- repo: https://github.com/PyCQA/pydocstyle
rev: 6.2.2
rev: 6.3.0
hooks:
- id: pydocstyle
args: ["--ignore=D100,D101,D102,D103,D104,D105,D106,D107,D203,D212,D404"]

- repo: https://github.com/psf/black
rev: 23.1.0
hooks:
- id: black
args: ["--line-length=99"]

- repo: https://github.com/pycqa/flake8
rev: 6.0.0
hooks:
Expand Down Expand Up @@ -117,7 +123,7 @@ repos:
exclude: CHANGELOG\.rst$

- repo: https://github.com/pre-commit/pygrep-hooks
rev: v1.9.0
rev: v1.10.0
hooks:
- id: rst-backticks
exclude: CHANGELOG\.rst$
Expand Down
20 changes: 10 additions & 10 deletions controller_manager/controller_manager/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,17 @@
load_controller,
reload_controller_libraries,
switch_controllers,
unload_controller
unload_controller,
)

__all__ = [
'configure_controller',
'list_controller_types',
'list_controllers',
'list_hardware_components',
'list_hardware_interfaces',
'load_controller',
'reload_controller_libraries',
'switch_controllers',
'unload_controller',
"configure_controller",
"list_controller_types",
"list_controllers",
"list_hardware_components",
"list_hardware_interfaces",
"load_controller",
"reload_controller_libraries",
"switch_controllers",
"unload_controller",
]
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from controller_manager_msgs.srv import ConfigureController, \
ListControllers, ListControllerTypes, ListHardwareComponents, ListHardwareInterfaces, \
LoadController, ReloadControllerLibraries, SwitchController, UnloadController
from controller_manager_msgs.srv import (
ConfigureController,
ListControllers,
ListControllerTypes,
ListHardwareComponents,
ListHardwareInterfaces,
LoadController,
ReloadControllerLibraries,
SwitchController,
UnloadController,
)

import rclpy

Expand All @@ -24,68 +32,90 @@ def service_caller(node, service_name, service_type, request, service_timeout=10

if not cli.service_is_ready():
node.get_logger().debug(
f'waiting {service_timeout} seconds for service {service_name} to become available...')
f"waiting {service_timeout} seconds for service {service_name} to become available..."
)
if not cli.wait_for_service(service_timeout):
raise RuntimeError(f'Could not contact service {service_name}')
raise RuntimeError(f"Could not contact service {service_name}")

node.get_logger().debug(f'requester: making request: {request}\n')
node.get_logger().debug(f"requester: making request: {request}\n")
future = cli.call_async(request)
rclpy.spin_until_future_complete(node, future)
if future.result() is not None:
return future.result()
else:
raise RuntimeError(f'Exception while calling service: {future.exception()}')
raise RuntimeError(f"Exception while calling service: {future.exception()}")


def configure_controller(node, controller_manager_name, controller_name):
request = ConfigureController.Request()
request.name = controller_name
return service_caller(node, f'{controller_manager_name}/configure_controller',
ConfigureController, request)
return service_caller(
node, f"{controller_manager_name}/configure_controller", ConfigureController, request
)


def list_controllers(node, controller_manager_name):
request = ListControllers.Request()
return service_caller(node, f'{controller_manager_name}/list_controllers',
ListControllers, request)
return service_caller(
node, f"{controller_manager_name}/list_controllers", ListControllers, request
)


def list_controller_types(node, controller_manager_name):
request = ListControllerTypes.Request()
return service_caller(node,
f'{controller_manager_name}/list_controller_types',
ListControllerTypes, request)
return service_caller(
node, f"{controller_manager_name}/list_controller_types", ListControllerTypes, request
)


def list_hardware_components(node, controller_manager_name):
request = ListHardwareComponents.Request()
return service_caller(node, f'{controller_manager_name}/list_hardware_components',
ListHardwareComponents, request)
return service_caller(
node,
f"{controller_manager_name}/list_hardware_components",
ListHardwareComponents,
request,
)


def list_hardware_interfaces(node, controller_manager_name):
request = ListHardwareInterfaces.Request()
return service_caller(node, f'{controller_manager_name}/list_hardware_interfaces',
ListHardwareInterfaces, request)
return service_caller(
node,
f"{controller_manager_name}/list_hardware_interfaces",
ListHardwareInterfaces,
request,
)


def load_controller(node, controller_manager_name, controller_name):
request = LoadController.Request()
request.name = controller_name
return service_caller(node, f'{controller_manager_name}/load_controller',
LoadController, request)
return service_caller(
node, f"{controller_manager_name}/load_controller", LoadController, request
)


def reload_controller_libraries(node, controller_manager_name, force_kill):
request = ReloadControllerLibraries.Request()
request.force_kill = force_kill
return service_caller(node,
f'{controller_manager_name}/reload_controller_libraries',
ReloadControllerLibraries, request)


def switch_controllers(node, controller_manager_name, deactivate_controllers,
activate_controllers, strict, activate_asap, timeout):
return service_caller(
node,
f"{controller_manager_name}/reload_controller_libraries",
ReloadControllerLibraries,
request,
)


def switch_controllers(
node,
controller_manager_name,
deactivate_controllers,
activate_controllers,
strict,
activate_asap,
timeout,
):
request = SwitchController.Request()
request.activate_controllers = activate_controllers
request.deactivate_controllers = deactivate_controllers
Expand All @@ -95,12 +125,14 @@ def switch_controllers(node, controller_manager_name, deactivate_controllers,
request.strictness = SwitchController.Request.BEST_EFFORT
request.activate_asap = activate_asap
request.timeout = rclpy.duration.Duration(seconds=timeout).to_msg()
return service_caller(node, f'{controller_manager_name}/switch_controller',
SwitchController, request)
return service_caller(
node, f"{controller_manager_name}/switch_controller", SwitchController, request
)


def unload_controller(node, controller_manager_name, controller_name):
request = UnloadController.Request()
request.name = controller_name
return service_caller(node, f'{controller_manager_name}/unload_controller',
UnloadController, request)
return service_caller(
node, f"{controller_manager_name}/unload_controller", UnloadController, request
)
59 changes: 37 additions & 22 deletions controller_manager/controller_manager/launch_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
from launch_ros.actions import Node


def generate_load_controller_launch_description(controller_name,
controller_type=None,
controller_params_file=None):
def generate_load_controller_launch_description(
controller_name, controller_type=None, controller_params_file=None
):
"""
Generate launch description for loading a controller using spawner.
Expand All @@ -44,38 +44,53 @@ def generate_load_controller_launch_description(controller_name,
"""
declare_controller_mgr_name = DeclareLaunchArgument(
'controller_manager_name', default_value='controller_manager',
description='Controller manager node name'
"controller_manager_name",
default_value="controller_manager",
description="Controller manager node name",
)
declare_unload_on_kill = DeclareLaunchArgument(
'unload_on_kill', default_value='false',
description='Wait until the node is interrupted and then unload controller'
"unload_on_kill",
default_value="false",
description="Wait until the node is interrupted and then unload controller",
)

spawner_arguments = [
controller_name,
'--controller-manager',
LaunchConfiguration('controller_manager_name')
"--controller-manager",
LaunchConfiguration("controller_manager_name"),
]

if controller_type:
spawner_arguments += ['--controller-type', controller_type]
spawner_arguments += ["--controller-type", controller_type]

if controller_params_file:
spawner_arguments += ['--param-file', controller_params_file]
spawner_arguments += ["--param-file", controller_params_file]

# Setting --unload-on-kill if launch arg unload_on_kill is "true"
# See https://github.com/ros2/launch/issues/290
spawner_arguments += [PythonExpression(
['"--unload-on-kill"', ' if "true" == "',
LaunchConfiguration('unload_on_kill'), '" else ""']
)]
spawner_arguments += [
PythonExpression(
[
'"--unload-on-kill"',
' if "true" == "',
LaunchConfiguration("unload_on_kill"),
'" else ""',
]
)
]

spawner = Node(package='controller_manager', executable='spawner',
arguments=spawner_arguments, shell=True, output='screen')
spawner = Node(
package="controller_manager",
executable="spawner",
arguments=spawner_arguments,
shell=True,
output="screen",
)

return LaunchDescription([
declare_controller_mgr_name,
declare_unload_on_kill,
spawner,
])
return LaunchDescription(
[
declare_controller_mgr_name,
declare_unload_on_kill,
spawner,
]
)
Loading

0 comments on commit 8ffe7df

Please sign in to comment.