Skip to content

Commit

Permalink
Remove the storing of previous content.
Browse files Browse the repository at this point in the history
While it is a nice idea, it adds too much to this feature.

Signed-off-by: Chris Lalancette <[email protected]>
  • Loading branch information
clalancette committed Oct 12, 2022
1 parent 64f6544 commit 9c1f112
Showing 1 changed file with 13 additions and 77 deletions.
90 changes: 13 additions & 77 deletions ros2topic/ros2topic/verb/pub.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import hashlib
import os
import tempfile
import time
from typing import List, Optional, Tuple, TypeVar

Expand Down Expand Up @@ -164,19 +161,14 @@ def main(args) -> Optional[str]:

if args.interactive:
print('Interactive mode...')
# Read last message that was sent if it exists in temp
content = get_last_message_content(args.message_type)
# Show the tui
orig_msg, orig_timestamp_fields = parse_msg(args.message_type, content)
default_msg, default_timestamp_fields = parse_msg(args.message_type)
content = show_interactive_tui(message_to_yaml(orig_msg), message_to_yaml(default_msg))
# Load msg YAML now to be sure it does not fail and we store a broken message
msg, timestamp_fields = parse_msg(args.message_type, content)
# Store the user input so we are able to load it the next time
store_message_content(args.message_type, content)
default_msg, default_timestamp_fields = parse_msg(args.message_type, args.values)
content = show_interactive_tui(message_to_yaml(default_msg))
else:
# Parse the yaml string and get a message object of the desired type
msg, timestamp_fields = parse_msg(args.message_type, content)
content = args.values

# Parse the yaml string and get a message object of the desired type
msg, timestamp_fields = parse_msg(args.message_type, content)

with DirectNode(args, node_name=args.node_name) as node:
return publisher(
Expand All @@ -193,63 +185,7 @@ def main(args) -> Optional[str]:
args.keep_alive)


def get_history_file(msg_type: str) -> str:
"""
Get paths for semi persistent history based on message name.
:param msg_type: Name of the message type
:returns: The path where a history file would be located if it exists
"""
# Get temporary directory name
msg_history_cache_folder_path = os.path.join(
tempfile.gettempdir(), "ros_interactive_msg_cache")
# Create temporary history dir if needed
os.makedirs(msg_history_cache_folder_path, exist_ok=True)
# Create a file based on the message name
return os.path.join(
msg_history_cache_folder_path,
f'{hashlib.sha224(msg_type.encode()).hexdigest()[:20]}.yaml')


def get_last_message_content(msg_type: str) -> str:
"""
Retrieve the last message of the given type that was sent using the tui if it exists.
:param msg_type: Name of the message type
:returns: The YAML representation containing the last message or an empty dict
"""
content = "{}"
try:
history_path = get_history_file(msg_type)
# Load previous values for that message type
if os.path.exists(history_path):
with open(history_path, 'r') as f:
content = f.read()
except OSError:
print('Unable load history...')
return content


def store_message_content(msg_type: str, content: str) -> None:
"""
Store the YAML for the current message in a semi persistent file.
:param msg_type: Name of the message type
:param content: The YAML entered by the user
"""
try:
history_path = get_history_file(msg_type)
# Clear cache
if os.path.exists(history_path):
os.remove(history_path)
# Store last message in cache
with open(history_path, 'w') as f:
f.write(content)
except OSError:
print('Unable to store history')


def show_interactive_tui(msg_str: str, default_msg_str: Optional[str] = None) -> str:
def show_interactive_tui(default_msg_str: str) -> str:
"""
Show a tui to edit a given message yaml.
Expand All @@ -263,17 +199,16 @@ def bottom_toolbar():

# Create key bindings for the prompt
bindings = KeyBindings()
if default_msg_str is not None:
@bindings.add('c-r')
def _(event):
"""Reset the promt to the default message."""
event.app.current_buffer.text = default_msg_str
@bindings.add('c-r')
def _(event):
"""Reset the promt to the default message."""
event.app.current_buffer.text = default_msg_str

# Show prompt to edit the message before sending it
return prompt(
"> ",
multiline=True,
default=msg_str,
default=default_msg_str,
lexer=PygmentsLexer(YamlLexer),
mouse_support=True,
bottom_toolbar=bottom_toolbar,
Expand Down Expand Up @@ -304,6 +239,7 @@ def parse_msg(msg_type: str, yaml_values: Optional[str] = None) -> Tuple[MsgType
raise RuntimeError('The passed value needs to be a dictionary in YAML format')
# Set all fields in the message to the provided values
try:
# Unfortunately, if you specifi
timestamp_fields = set_message_fields(
msg, values_dictionary, expand_header_auto=True, expand_time_now=True)
except Exception as e:
Expand Down

0 comments on commit 9c1f112

Please sign in to comment.