Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Output interface #25

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
<exec_depend>controller_manager_msgs</exec_depend>
<exec_depend>diagnostic_msgs</exec_depend>
<exec_depend>ros_graph_parser</exec_depend>
<exec_depend>rospkg</exec_depend>
<exec_depend>python-rospkg</exec_depend>

<export>
</export>
Expand Down
4 changes: 2 additions & 2 deletions scripts/monitor
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#!/usr/bin/env python
#!/usr/bin/env python3

import importlib
import time
Expand Down Expand Up @@ -55,7 +55,7 @@ class ModuleManager(object):
try:
module = self._modules[name]
self._observers[name] = getattr(module, name)(name)
self._observers[name].start()
self._observers[name].start(self._observers[name].perform_output)
except Exception as exc:
print("Could not start {}".format(name))
started = False
Expand Down
54 changes: 24 additions & 30 deletions src/rosgraph_monitor/observer.py
Original file line number Diff line number Diff line change
@@ -1,46 +1,38 @@
import threading
import mutex
import rospy
from diagnostic_msgs.msg import DiagnosticArray, DiagnosticStatus


class Observer(object):
def __init__(self, name, loop_rate_hz=1):
self._name = name
self._rate = rospy.Rate(loop_rate_hz)
self._seq = 1
self._lock = threading.Lock()
self._thread = threading.Thread(
target=self._run)
self._thread.daemon = True
self._stop_event = threading.Event()

self._pub_diag = rospy.Publisher(
'/diagnostics', DiagnosticArray, queue_size=10)

def __del__(self):
if Observer:
print("{} stopped".format(self._name))

# Every derived class needs to override this
def generate_diagnostics(self):
msg = DiagnosticArray()
return msg
def generate_output():
return None

def _run(self):
while not rospy.is_shutdown() and not self._stopped():
diag_msg = DiagnosticArray()
diag_msg.header.stamp = rospy.get_rostime()
msg = self.generate_output()
print(msg)

status_msgs = self.generate_diagnostics()
diag_msg.status.extend(status_msgs)
self._pub_diag.publish(diag_msg)
# perform_output() should be implemented by OutputInterface's sub-class
self._perform_output(msg)

self._seq += 1
self._rate.sleep()

def start(self):
def start(self, func):
print("starting {}...".format(self._name))
self._perform_output = func
self._thread.start()

def stop(self):
Expand Down Expand Up @@ -76,48 +68,50 @@ def start_service(self):
except rospy.ServiceException as exc:
print("Service {} is not running: ".format(self.name) + str(exc))

def generate_diagnostics(self):
def generate_output(self):
try:
resp = self.client.call()
except rospy.ServiceException as exc:
print("Service {} did not process request: ".format(
self.name) + str(exc))
status_msg = self.diagnostics_from_response(resp)
return status_msg
msg = self.msg_from_response(resp)
return msg

# Every derived class needs to override this
def diagnostics_from_response(self, response):
msg = DiagnosticArray()
return msg
def msg_from_response(self, response):
None


class TopicObserver(Observer):
def __init__(self, name, loop_rate_hz, topics):
super(TopicObserver, self).__init__(name, loop_rate_hz)
super(TopicObserver, self).__init__(name=name, loop_rate_hz=loop_rate_hz)
self._topics = topics
self._id = ""
self._num_topics = len(topics)

def is_close(self, a, b, rel_tol=1e-09, abs_tol=0.0):
return abs(a - b) <= max(rel_tol * max(abs(a), abs(b)), abs_tol)

# Every derived class needs to override this
def calculate_attr(self, msgs):
# do calculations
return DiagnosticStatus()
return None

def generate_diagnostics(self):
def generate_output(self):
msgs = []
received_all = True
for topic, topic_type in self._topics:
try:
# Add message synchronization here -- message filter
# Consider "asynchronous" reception as well?
msgs.append(rospy.wait_for_message(topic, topic_type))
except rospy.ROSException as exc:
print("Topic {} is not found: ".format(topic) + str(exc))
received_all = False
break

status_msgs = list()
status_msg = DiagnosticStatus()
msg = None
if received_all:
status_msg = self.calculate_attr(msgs)
status_msgs.append(status_msg)
msg = self.calculate_attr(msgs)

return status_msgs
return msg
52 changes: 52 additions & 0 deletions src/rosgraph_monitor/observers/dummy_diag_observer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
from rosgraph_monitor.observer import TopicObserver
from rosgraph_monitor.output_interface import ObservationOutputPublisher
from std_msgs.msg import Int32, Bool
from diagnostic_msgs.msg import DiagnosticStatus, KeyValue


# example observer to publish result as a DiagnosticStatus message
class DummyDiagnosticObserver(TopicObserver, ObservationOutputPublisher):
def __init__(self, name): #, set_point=None): # reads from param?
topics = [("/speed", Int32), ("/accel", Int32)] # list of pairs
self._set_point = 5.5 #set_point
ObservationPublisher.__init__(self, topic_name="/diagnostics", DiagnosticStatus)
TopicObserver.__init__(self, name=name, loop_rate_hz=10, topics=topics)

def calculate_attr(self, msgs):
status_msg = DiagnosticStatus()

attr = msgs[0].data + msgs[1].data
is_close = True
if(self._set_point):
is_close = self.is_close(attr, self._set_point, 0.05)
print("{0} + {1}; is_close={2}".format(msgs[0].data, msgs[1].data, is_close))

status_msg.level = DiagnosticStatus.OK if is_close else DiagnosticStatus.ERROR
status_msg.name = self._id
status_msg.values.append(
KeyValue("enery", str(attr)))
status_msg.message = "QA status"

return status_msg


# example observer to publish result as a Bool message
class DummyBoolObserver(TopicObserver, ObservationPublisher):
def __init__(self, name): #, set_point=None): # reads from param?
topics = [("/speed", Int32), ("/accel", Int32)] # list of pairs
self._set_point = 5.5 #set_point
ObservationPublisher.__init__(self, topic_name="/output_topic", Bool)
TopicObserver.__init__(self, name=name, loop_rate_hz=10, topics=topics)

def calculate_attr(self, msgs):
status_msg = DiagnosticStatus()

attr = msgs[0].data + msgs[1].data
is_close = False
if(self._set_point):
is_close = self.is_close(attr, self._set_point, 0.05)
print("{0} + {1}; is_close={2}".format(msgs[0].data, msgs[1].data, is_close))

msg = Bool()
msg.data = is_close
return msg
18 changes: 11 additions & 7 deletions src/rosgraph_monitor/observers/graph_observer.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import imp
from rosgraph_monitor.observer import ServiceObserver
from rosgraph_monitor.parser import ModelParser
from pyparsing import *
import rospkg
import os.path
import re
from pyparsing import *
import rospkg
from rosgraph_monitor.observer import ServiceObserver
from rosgraph_monitor.output_interface import ObservationOutputPublisher
from rosgraph_monitor.parser import ModelParser


from ros_graph_parser.srv import GetROSModel, GetROSSystemModel
from diagnostic_msgs.msg import DiagnosticArray, DiagnosticStatus, KeyValue
Expand All @@ -14,7 +16,7 @@ def strip_slash(string):
return '{}'.format(string[1:] if string.startswith('/') else string)


class ROSGraphObserver(ServiceObserver):
class ROSGraphObserver(ServiceObserver, ObservationOutputPublisher):
def __init__(self, name):
super(ROSGraphObserver, self).__init__(
name, '/get_rossystem_model', GetROSSystemModel)
Expand All @@ -23,7 +25,8 @@ def __init__(self, name):
self.model_path = os.path.join(rospack.get_path('rosgraph_monitor'), "resources/cob4-25.rossystem")
self._rossystem_parser = ModelParser(self.model_path)

def diagnostics_from_response(self, resp):
def msg_from_response(self, resp):
diag_array = DiagnosticArray()
status_msgs = list()
if resp is None:
return status_msgs
Expand Down Expand Up @@ -69,7 +72,8 @@ def diagnostics_from_response(self, resp):
KeyValue(params[0], str(params[1])))
status_msgs.append(status_msg)

return status_msgs
diag_msg.status.extend(status_msgs)
return diag_msg

# find out missing and additional interfaces
# if both lists are empty, system is running fine
Expand Down
45 changes: 45 additions & 0 deletions src/rosgraph_monitor/output_interface.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import rospy


class OutputInterface(object):
def __init__(self):
pass

def perform_output():
pass


class ObservationOutputPublisher(OutputInterface):
def __init__(self, topic_name, message_type):
super(ObservationPublisher, self).__init__()

self._seq = 0;
self._publisher = rospy.Publisher(
topic_name, message_type, queue_size=10)

def perform_output(self, msg):
try:
msg.header.stamp = rospy.get_rostime()
except Exception as exc:
pass

self._publisher.publish(msg)
self._seq += 1


class ObservationOutputLogger(OutputInterface):
def __init__(self, file_name):
pass

def perform_output(self, msg):
# do logging
pass


class ObservationOutputPlotter(OutputInterface):
def __init__(self, plt_params):
pass

def perform_output(self, msg):
# do plotting
pass
Empty file.
8 changes: 4 additions & 4 deletions src/rosgraph_monitor/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,13 @@ def __init__(self, model, isFile=True):
_action_servers = Keyword("RosActionServers").suppress()
_action_server = Keyword("RosActionServer").suppress(
) | Keyword("RosServer").suppress()
_ref_server = Keyword("RefServer").suppress()
_ref_server = Keyword("RefServer").suppress() | Keyword("RefActionServer").suppress()

# Actio Clients Def
_action_clients = Keyword("RosActionClients").suppress()
_action_client = Keyword("RosActionClient").suppress(
) | Keyword("RosClient").suppress()
_ref_action_client = Keyword("RefClient").suppress()
_ref_action_client = Keyword("RefClient").suppress() | Keyword("RefActionClient").suppress()

# Topic Connections Def
_topic_connections = Keyword("TopicConnections").suppress()
Expand Down Expand Up @@ -196,8 +196,8 @@ def parse(self):
self._parse_from_file()
else:
self._parse_from_string()
except Exception as e:
print(e.args) # Should set a default 'result'?
except ParseException as e:
print(ParseException.explain(e, depth=0)) # Should set a default 'result'?
return self._result


Expand Down