Skip to content

Commit

Permalink
Add tui (interactive mode) for message creation
Browse files Browse the repository at this point in the history
Signed-off-by: Florian Vahl <[email protected]>
  • Loading branch information
Flova committed Sep 1, 2022
1 parent 409bfdf commit f27a29f
Show file tree
Hide file tree
Showing 2 changed files with 165 additions and 21 deletions.
2 changes: 2 additions & 0 deletions ros2topic/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
<depend>ros2cli</depend>

<exec_depend>python3-numpy</exec_depend>
<exec_depend>python3-prompt-toolkit</exec_depend>
<exec_depend>python3-pygments</exec_depend>
<exec_depend>python3-yaml</exec_depend>
<exec_depend>rclpy</exec_depend>
<exec_depend>rosidl_runtime_py</exec_depend>
Expand Down
184 changes: 163 additions & 21 deletions ros2topic/ros2topic/verb/pub.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import hashlib
import os
import tempfile
import time
from typing import Optional
from typing import TypeVar

from prompt_toolkit import prompt
from prompt_toolkit.formatted_text import HTML
from prompt_toolkit.key_binding import KeyBindings
from prompt_toolkit.lexers import PygmentsLexer
from pygments.lexers.data import YamlLexer
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSDurabilityPolicy
Expand All @@ -28,6 +36,7 @@
from ros2topic.api import TopicTypeCompleter
from ros2topic.verb import VerbExtension
from rosidl_runtime_py import set_message_fields
from rosidl_runtime_py.convert import message_to_yaml
from rosidl_runtime_py.utilities import get_message
import yaml

Expand Down Expand Up @@ -84,6 +93,9 @@ def add_arguments(self, parser, cli_name):
parser.add_argument(
'-p', '--print', metavar='N', type=int, default=1,
help='Only print every N-th published message (default: 1)')
parser.add_argument(
'-i', '--interactive', action='store_true',
help='Interactively edit and send the message')
group = parser.add_mutually_exclusive_group()
group.add_argument(
'-1', '--once', action='store_true',
Expand Down Expand Up @@ -135,7 +147,7 @@ def main(self, *, args):
return main(args)


def main(args):
def main(args) -> Optional[str]:
qos_profile = get_pub_qos_profile()

qos_profile_name = args.qos_profile
Expand All @@ -149,12 +161,29 @@ def main(args):
if args.once:
times = 1

if args.interactive:
print('Interactive mode...')
# Read last message that was send if it exists in temp
content = get_last_message_content(args.message_type)
# Show the tui
content = show_interactive_tui(
message_to_yaml(parse_msg(args.message_type, content)),
message_to_yaml(parse_msg(args.message_type)))
# Load msg YAML just to be sure it does not fail and we store a broken message
parse_msg(args.message_type, content)
# Store the user input so we are able to load it the next time
store_message_content(args.message_type, content)
else:
content = args.values

# Parse the yaml string and get a message ofbject of the desired type
msg = parse_msg(args.message_type, content)

with DirectNode(args, node_name=args.node_name) as node:
return publisher(
node.node,
args.message_type,
args.topic_name,
args.values,
msg,
1. / args.rate,
args.print,
times,
Expand All @@ -164,28 +193,147 @@ def main(args):
args.keep_alive)


def get_history_file(msg_type: str) -> str:
"""
Get paths for semi persistent history based on message name.
:param msg_type: Name of the message type
:returns: The path where a history file would be located if it exists
"""
# Get temporary directory name
msg_history_cache_folder_path = os.path.join(
tempfile.gettempdir(), "ros_interactive_msg_cache")
# Create temporary history dir if needed
os.makedirs(msg_history_cache_folder_path, exist_ok=True)
# Create a file based on the message name
return os.path.join(
msg_history_cache_folder_path,
f'{hashlib.sha224(msg_type.encode()).hexdigest()[:20]}.yaml')


def get_last_message_content(msg_type: str) -> str:
"""
Retrive the last message of the given type that was send using the tui if it exists.
:param msg_type: Name of the message type
:returns: The YAML representation containing the last message or an empty dict
"""
content = "{}"
try:
history_path = get_history_file(msg_type)
# Load previous values for that message type
if os.path.exists(history_path):
with open(history_path, 'r') as f:
content = f.read()
except OSError:
print('Unable load history...')
return content


def store_message_content(msg_type: str, content: str) -> None:
"""
Store the YAML for the current message in a semi persistent file.
:param msg_type: Name of the message type
:param content: The YAML entered by the user
"""
try:
history_path = get_history_file(msg_type)
# Clear cache
if os.path.exists(history_path):
os.remove(history_path)
# Store last message in cache
with open(history_path, 'w') as f:
f.write(content)
except OSError:
print('Unable to store history')


def show_interactive_tui(msg_str: str, default_msg_str: Optional[str] = None) -> str:
"""
Show a tui to edit a given message yaml.
:param msg_str: Mesage yaml string which is initially presented to the user
:param default_msg_str: Mesage yaml string with default values for the given message
:return: The mesage yaml string edited by the user
"""
# Create the bottom bar to pressent the options to the user
def bottom_toolbar():
return HTML(' Continue: <b>alt+enter</b> | Exit: <b>ctrl+c</b> | Reset: <b>ctrl+r</b>')

# Create key bindings for the prompt
bindings = KeyBindings()
if default_msg_str is not None:
@bindings.add('c-r')
def _(event):
"""Reset the promt to the default message."""
event.app.current_buffer.text = default_msg_str

# Show prompt to edit the message before sending it
return prompt(
"> ",
multiline=True,
default=msg_str,
lexer=PygmentsLexer(YamlLexer),
mouse_support=True,
bottom_toolbar=bottom_toolbar,
key_bindings=bindings)


def parse_msg(msg_type: str, yaml_values: Optional[str] = None) -> MsgType:
"""
Parse the name and contents of a given message.
:param msg_type: Name of the message as a string (e.g. std_msgs/msg/Header)
:param yaml_values: Contents of the message as a string in YAML layout
:returns: An constructed instance of the message type
"""
# Get the message type from the name string
try:
msg_module = get_message(msg_type)
except (AttributeError, ModuleNotFoundError, ValueError):
raise RuntimeError('The passed message type is invalid')
# Create a default instance of the message with the given name
msg = msg_module()
# Check if we want to add values to the message
if yaml_values is not None:
# Load the user provided fields of the message
values_dictionary = yaml.safe_load(yaml_values)
if not isinstance(values_dictionary, dict):
raise RuntimeError('The passed value needs to be a dictionary in YAML format')
# Set all fields in the message to the provided values
try:
set_message_fields(msg, values_dictionary)
except Exception as e:
raise RuntimeError(f'Failed to populate field: {e}')
return msg


def publisher(
node: Node,
message_type: MsgType,
topic_name: str,
values: dict,
msg: MsgType,
period: float,
print_nth: int,
times: int,
wait_matching_subscriptions: int,
qos_profile: QoSProfile,
keep_alive: float,
) -> Optional[str]:
"""Initialize a node with a single publisher and run its publish loop (maybe only once)."""
try:
msg_module = get_message(message_type)
except (AttributeError, ModuleNotFoundError, ValueError):
raise RuntimeError('The passed message type is invalid')
values_dictionary = yaml.safe_load(values)
if not isinstance(values_dictionary, dict):
return 'The passed value needs to be a dictionary in YAML format'
) -> None:
"""
Initialize a node with a single publisher and run its publish loop (maybe only once).
pub = node.create_publisher(msg_module, topic_name, qos_profile)
:param node: The given node used for publishing the given message
:param topic_name: The topic on which the the message is published
:param msg: The message that is published
:param period: Period after which the msg is published again
:param print_nth: Interval in which the message is printed
:param times: Number of times the message is published
:param wait_matching_subscriptions: Wait until there is a certain number of subscribtions
:param qos_profile: QOS profile
:param keep_alive: Time the node is kept alive after the message was send
"""
pub = node.create_publisher(type(msg), topic_name, qos_profile)

times_since_last_log = 0
while pub.get_subscription_count() < wait_matching_subscriptions:
Expand All @@ -196,12 +344,6 @@ def publisher(
times_since_last_log = (times_since_last_log + 1) % 10
time.sleep(0.1)

msg = msg_module()
try:
set_message_fields(msg, values_dictionary)
except Exception as e:
return 'Failed to populate field: {0}'.format(e)

print('publisher: beginning loop')
count = 0

Expand Down

0 comments on commit f27a29f

Please sign in to comment.