diff --git a/ros2topic/ros2topic/verb/pub.py b/ros2topic/ros2topic/verb/pub.py index 99f6a1a05..3b2026385 100644 --- a/ros2topic/ros2topic/verb/pub.py +++ b/ros2topic/ros2topic/verb/pub.py @@ -12,9 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import hashlib -import os -import tempfile import time from typing import List, Optional, Tuple, TypeVar @@ -164,19 +161,14 @@ def main(args) -> Optional[str]: if args.interactive: print('Interactive mode...') - # Read last message that was sent if it exists in temp - content = get_last_message_content(args.message_type) # Show the tui - orig_msg, orig_timestamp_fields = parse_msg(args.message_type, content) - default_msg, default_timestamp_fields = parse_msg(args.message_type) - content = show_interactive_tui(message_to_yaml(orig_msg), message_to_yaml(default_msg)) - # Load msg YAML now to be sure it does not fail and we store a broken message - msg, timestamp_fields = parse_msg(args.message_type, content) - # Store the user input so we are able to load it the next time - store_message_content(args.message_type, content) + default_msg, default_timestamp_fields = parse_msg(args.message_type, args.values) + content = show_interactive_tui(message_to_yaml(default_msg)) else: - # Parse the yaml string and get a message object of the desired type - msg, timestamp_fields = parse_msg(args.message_type, content) + content = args.values + + # Parse the yaml string and get a message object of the desired type + msg, timestamp_fields = parse_msg(args.message_type, content) with DirectNode(args, node_name=args.node_name) as node: return publisher( @@ -193,63 +185,7 @@ def main(args) -> Optional[str]: args.keep_alive) -def get_history_file(msg_type: str) -> str: - """ - Get paths for semi persistent history based on message name. - - :param msg_type: Name of the message type - :returns: The path where a history file would be located if it exists - """ - # Get temporary directory name - msg_history_cache_folder_path = os.path.join( - tempfile.gettempdir(), "ros_interactive_msg_cache") - # Create temporary history dir if needed - os.makedirs(msg_history_cache_folder_path, exist_ok=True) - # Create a file based on the message name - return os.path.join( - msg_history_cache_folder_path, - f'{hashlib.sha224(msg_type.encode()).hexdigest()[:20]}.yaml') - - -def get_last_message_content(msg_type: str) -> str: - """ - Retrieve the last message of the given type that was sent using the tui if it exists. - - :param msg_type: Name of the message type - :returns: The YAML representation containing the last message or an empty dict - """ - content = "{}" - try: - history_path = get_history_file(msg_type) - # Load previous values for that message type - if os.path.exists(history_path): - with open(history_path, 'r') as f: - content = f.read() - except OSError: - print('Unable load history...') - return content - - -def store_message_content(msg_type: str, content: str) -> None: - """ - Store the YAML for the current message in a semi persistent file. - - :param msg_type: Name of the message type - :param content: The YAML entered by the user - """ - try: - history_path = get_history_file(msg_type) - # Clear cache - if os.path.exists(history_path): - os.remove(history_path) - # Store last message in cache - with open(history_path, 'w') as f: - f.write(content) - except OSError: - print('Unable to store history') - - -def show_interactive_tui(msg_str: str, default_msg_str: Optional[str] = None) -> str: +def show_interactive_tui(default_msg_str: str) -> str: """ Show a tui to edit a given message yaml. @@ -263,17 +199,16 @@ def bottom_toolbar(): # Create key bindings for the prompt bindings = KeyBindings() - if default_msg_str is not None: - @bindings.add('c-r') - def _(event): - """Reset the promt to the default message.""" - event.app.current_buffer.text = default_msg_str + @bindings.add('c-r') + def _(event): + """Reset the promt to the default message.""" + event.app.current_buffer.text = default_msg_str # Show prompt to edit the message before sending it return prompt( "> ", multiline=True, - default=msg_str, + default=default_msg_str, lexer=PygmentsLexer(YamlLexer), mouse_support=True, bottom_toolbar=bottom_toolbar, @@ -304,6 +239,7 @@ def parse_msg(msg_type: str, yaml_values: Optional[str] = None) -> Tuple[MsgType raise RuntimeError('The passed value needs to be a dictionary in YAML format') # Set all fields in the message to the provided values try: + # Unfortunately, if you specifi timestamp_fields = set_message_fields( msg, values_dictionary, expand_header_auto=True, expand_time_now=True) except Exception as e: