Skip to content

Commit

Permalink
Add count limit option (#18)
Browse files Browse the repository at this point in the history
Add the option to specify a maximum number of messages per topic.
  • Loading branch information
jorgenfb authored Feb 17, 2022
1 parent b71eeee commit c5e453f
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 14 deletions.
15 changes: 12 additions & 3 deletions rosbag_snapshot/include/rosbag_snapshot/snapshotter.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,18 +66,24 @@ struct ROSBAG_DECL SnapshotterTopicOptions
static const ros::Duration NO_DURATION_LIMIT;
// When the value of memory_limit_, do not trunctate the buffer no matter how much memory it consumes (DANGROUS)
static const int32_t NO_MEMORY_LIMIT;
// When the value of count_limit_, do not trunctate the buffer no matter how many message it consumes
static const int32_t NO_COUNT_LIMIT;
// When the value of duration_limit_, inherit the limit from the node's configured default
static const ros::Duration INHERIT_DURATION_LIMIT;
// When the value of memory_limit_, inherit the limit from the node's configured default
static const int32_t INHERIT_MEMORY_LIMIT;
// When the value of count_limit_, inherit the limit from the node's configured default
static const int32_t INHERIT_COUNT_LIMIT;

// Maximum difference in time from newest and oldest message in buffer before older messages are removed
ros::Duration duration_limit_;
// Maximum memory usage of the buffer before older messages are removed
int32_t memory_limit_;
// Maximum number of message in the buffer before older messages are removed
int32_t count_limit_;

SnapshotterTopicOptions(ros::Duration duration_limit = INHERIT_DURATION_LIMIT,
int32_t memory_limit = INHERIT_MEMORY_LIMIT);
int32_t memory_limit = INHERIT_MEMORY_LIMIT, int32_t count_limit = INHERIT_COUNT_LIMIT);
};

/* Configuration for the Snapshotter node. Contains default limits for memory and duration
Expand All @@ -89,6 +95,8 @@ struct ROSBAG_DECL SnapshotterOptions
ros::Duration default_duration_limit_;
// Memory limit to use for a topic's buffer if one is not specified
int32_t default_memory_limit_;
// Count limit to use for a topic's buffer if one is not specified
int32_t default_count_limit_;
// Period between publishing topic status messages. If <= ros::Duration(0), don't publish status
ros::Duration status_period_;
// Flag if all topics should be recorded
Expand All @@ -99,12 +107,13 @@ struct ROSBAG_DECL SnapshotterOptions
topics_t topics_;

SnapshotterOptions(ros::Duration default_duration_limit = ros::Duration(30), int32_t default_memory_limit = -1,
ros::Duration status_period = ros::Duration(1));
int32_t default_count_limit = -1, ros::Duration status_period = ros::Duration(1));

// Add a new topic to the configuration, returns false if the topic was already present
bool addTopic(std::string const& topic,
ros::Duration duration_limit = SnapshotterTopicOptions::INHERIT_DURATION_LIMIT,
int32_t memory_limit = SnapshotterTopicOptions::INHERIT_MEMORY_LIMIT);
int32_t memory_limit = SnapshotterTopicOptions::INHERIT_MEMORY_LIMIT,
int32_t count_limit = SnapshotterTopicOptions::INHERIT_COUNT_LIMIT);
};

/* Stores a buffered message of an ambiguous type and it's associated metadata (time of arrival, connection data),
Expand Down
20 changes: 19 additions & 1 deletion rosbag_snapshot/src/snapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ bool parseOptions(po::variables_map& vm, int argc, char** argv)
("all,a", "Record all topics")
("size,s", po::value<double>()->default_value(-1),
"Maximum memory per topic to use in buffering in MB. Default: no limit")
("count,c", po::value<int32_t>()->default_value(-1),
"Maximum number of messages per topic to use when buffering. Default: no limit")
("duration,d", po::value<double>()->default_value(30.0),
"Maximum difference between newest and oldest buffered message per topic in seconds. Default: 30")
("output-prefix,o", po::value<std::string>()->default_value(""),
Expand Down Expand Up @@ -116,6 +118,7 @@ bool parseVariablesMap(SnapshotterOptions& opts, po::variables_map const& vm)
}
opts.default_memory_limit_ = static_cast<int>(MB_TO_BYTES * vm["size"].as<double>());
opts.default_duration_limit_ = ros::Duration(vm["duration"].as<double>());
opts.default_count_limit_ = vm["count"].as<int32_t>();
opts.all_topics_ = vm.count("all");
return true;
}
Expand Down Expand Up @@ -150,10 +153,13 @@ void appendParamOptions(ros::NodeHandle& nh, SnapshotterOptions& opts)

// Override program options for default limits if the parameters are set.
double tmp;
int32_t default_count;
if (nh.getParam("default_memory_limit", tmp))
opts.default_memory_limit_ = static_cast<int>(MB_TO_BYTES * tmp);
if (nh.getParam("default_duration_limit", tmp))
opts.default_duration_limit_ = ros::Duration(tmp);
if (nh.getParam("default_count_limit", default_count))
opts.default_count_limit_ = default_count;
nh.param("record_all_topics", opts.all_topics_, opts.all_topics_);

if (!nh.getParam("topics", topics))
Expand Down Expand Up @@ -181,8 +187,10 @@ void appendParamOptions(ros::NodeHandle& nh, SnapshotterOptions& opts)

ros::Duration dur = SnapshotterTopicOptions::INHERIT_DURATION_LIMIT;
int64_t mem = SnapshotterTopicOptions::INHERIT_MEMORY_LIMIT;
int32_t cnt = SnapshotterTopicOptions::INHERIT_COUNT_LIMIT;
std::string duration = "duration";
std::string memory = "memory";
std::string count = "count";
if (topic_config.hasMember(duration))
{
XmlRpcValue& dur_limit = topic_config[duration];
Expand Down Expand Up @@ -215,7 +223,17 @@ void appendParamOptions(ros::NodeHandle& nh, SnapshotterOptions& opts)
else
ROS_FATAL("err");
}
opts.addTopic(topic, dur, mem);
if (topic_config.hasMember("count"))
{
XmlRpcValue& cnt_limit = topic_config[count];
if (cnt_limit.getType() == XmlRpcValue::TypeInt)
{
cnt = cnt_limit;
}
else
ROS_FATAL("err");
}
opts.addTopic(topic, dur, mem, cnt);
}
else
ROS_ASSERT_MSG(false, "Parameter invalid for topic %lu", i);
Expand Down
22 changes: 17 additions & 5 deletions rosbag_snapshot/src/snapshotter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,26 +56,30 @@ namespace rosbag_snapshot
{
const ros::Duration SnapshotterTopicOptions::NO_DURATION_LIMIT = ros::Duration(-1);
const int32_t SnapshotterTopicOptions::NO_MEMORY_LIMIT = -1;
const int32_t SnapshotterTopicOptions::NO_COUNT_LIMIT = -1;
const ros::Duration SnapshotterTopicOptions::INHERIT_DURATION_LIMIT = ros::Duration(0);
const int32_t SnapshotterTopicOptions::INHERIT_MEMORY_LIMIT = 0;
const int32_t SnapshotterTopicOptions::INHERIT_COUNT_LIMIT = 0;

SnapshotterTopicOptions::SnapshotterTopicOptions(ros::Duration duration_limit, int32_t memory_limit)
: duration_limit_(duration_limit), memory_limit_(memory_limit)
SnapshotterTopicOptions::SnapshotterTopicOptions(ros::Duration duration_limit, int32_t memory_limit,
int32_t count_limit)
: duration_limit_(duration_limit), memory_limit_(memory_limit), count_limit_(count_limit)
{
}

SnapshotterOptions::SnapshotterOptions(ros::Duration default_duration_limit, int32_t default_memory_limit,
ros::Duration status_period)
int32_t default_count_limit, ros::Duration status_period)
: default_duration_limit_(default_duration_limit)
, default_memory_limit_(default_memory_limit)
, default_count_limit_(default_count_limit)
, status_period_(status_period)
, topics_()
{
}

bool SnapshotterOptions::addTopic(std::string const& topic, ros::Duration duration, int32_t memory)
bool SnapshotterOptions::addTopic(std::string const& topic, ros::Duration duration, int32_t memory, int32_t count)
{
SnapshotterTopicOptions ops(duration, memory);
SnapshotterTopicOptions ops(duration, memory, count);
std::pair<topics_t::iterator, bool> ret;
ret = topics_.insert(topics_t::value_type(topic, ops));
return ret.second;
Expand Down Expand Up @@ -162,6 +166,12 @@ bool MessageQueue::preparePush(int32_t size, ros::Time const& time)
dt = time - queue_.front().time;
}
}

// If count limit is enforced, remove elements from front of queue until the the count is below the limit
if (options_.count_limit_ > SnapshotterTopicOptions::NO_COUNT_LIMIT && queue_.size() != 0)
while (queue_.size() != 0 && queue_.size() >= options_.count_limit_)
_pop();

return true;
}
void MessageQueue::push(SnapshotMessage const& _out)
Expand Down Expand Up @@ -253,6 +263,8 @@ void Snapshotter::fixTopicOptions(SnapshotterTopicOptions& options)
options.duration_limit_ = options_.default_duration_limit_;
if (options.memory_limit_ == SnapshotterTopicOptions::INHERIT_MEMORY_LIMIT)
options.memory_limit_ = options_.default_memory_limit_;
if (options.count_limit_ == SnapshotterTopicOptions::INHERIT_COUNT_LIMIT)
options.count_limit_ = options_.default_memory_limit_;
}

bool Snapshotter::postfixFilename(string& file)
Expand Down
5 changes: 5 additions & 0 deletions rosbag_snapshot/test/snapshot.test
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,23 @@
<rosparam>
default_duration_limit: 1 # Maximum time difference between newest and oldest message, seconds, overrides -d flag
default_memory_limit: 0.1 # Maximum memory used by messages in each topic's buffer, MB, override -s flag
default_count_limit: 1000 # Maximum number of messages in each topic's buffer, override -c flag
topics:
- test1 # Inherit defaults
- test2: # Override duration limit, inherit memory limit
duration: 2
- test3: # Override both limits
duration: -1 # Negative means no duration limit
memory: 0.00
- test4:
duration: -1 # Negative means no duration limit
count: 2
</rosparam>
</node>
<!-- A bunch of fixed frequency publishers, each 64bits to make size calculations easy -->
<node name="test1pub" pkg="rostopic" type="rostopic" args="pub /test1 std_msgs/Time '{data:{ secs: 0, nsecs: 0}}' -r2"/>
<node name="test2pub" pkg="rostopic" type="rostopic" args="pub /test2 std_msgs/Int64 'data: 1337' -r12"/>
<node name="test3pub" pkg="rostopic" type="rostopic" args="pub /test3 std_msgs/Float64 'data: 42.00' -r25" />
<node name="test4pub" pkg="rostopic" type="rostopic" args="pub /test4 std_msgs/Float64 'data: 42.00' -r25" />
<test test-name="snapshot_test" pkg="rosbag_snapshot" type="test_snapshot.py"/>
</launch>
19 changes: 14 additions & 5 deletions rosbag_snapshot/test/test_snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class TestRosbagSnapshot(unittest.TestCase):
Tests the "rosbag_snapshot" command.
Relies on the nodes launched in snapshot.test
'''

def __init__(self, *args):
self.params = rospy.get_param("snapshot")
self._parse_params(self.params)
Expand All @@ -64,19 +65,22 @@ def _parse_params(self, params):
self.topic_limits = {}
self.default_duration_limit = params['default_duration_limit']
self.default_memory_limit = params['default_memory_limit']
self.default_count_limit = params['default_count_limit']
for topic_obj in self.params['topics']:
duration = self.default_duration_limit
memory = self.default_memory_limit
count = self.default_count_limit
if type(topic_obj) == dict:
topic = list(topic_obj.keys())[0]
duration = topic_obj[topic].get('duration', duration)
memory = topic_obj[topic].get('memory', memory)
count = topic_obj[topic].get('count', count)
else:
topic = topic_obj
topic = rospy.resolve_name(topic)
duration = rospy.Duration(duration)
memory = 1E6 * memory
self.topic_limits[topic] = (duration, memory)
self.topic_limits[topic] = (duration, memory, count)

def _status_cb(self, msg):
self.last_status = msg
Expand Down Expand Up @@ -130,12 +134,13 @@ def _assert_write_success(self, topics=[], prefix_mode=False, **kwargs):
self.assertTrue(os.path.isfile(filename))
return filename

def _assert_limits_enforced(self, test_topic, duration, memory):
def _assert_limits_enforced(self, test_topic, duration, memory, count):
'''
Asserts that the measured duration and memory for a topic comply with the launch parameters
@param topic: string
@param duration: rospy.Duration, age of buffer
@param memory: integer, bytes of memory used
@param count: integer, number of messages stored
'''
test_topic = rospy.resolve_name(test_topic)
self.assertIn(test_topic, self.topic_limits)
Expand All @@ -144,6 +149,8 @@ def _assert_limits_enforced(self, test_topic, duration, memory):
self.assertLessEqual(duration, limits[0])
if limits[1] > 0:
self.assertLessEqual(memory, limits[1])
if limits[2] > 0:
self.assertLessEqual(count, limits[2])

def _assert_status_valid(self):
'''
Expand All @@ -159,7 +166,8 @@ def _assert_status_valid(self):
for topic in self.last_status.topics:
duration = topic.window_stop - topic.window_start
memory = topic.traffic
self._assert_limits_enforced(topic.topic, duration, memory)
count = topic.delivered_msgs
self._assert_limits_enforced(topic.topic, duration, memory, count)

def _assert_bag_valid(self, filename, topics=None, start_time=None, stop_time=None):
'''
Expand All @@ -175,6 +183,7 @@ def _assert_bag_valid(self, filename, topics=None, start_time=None, stop_time=No
self.assertTrue(bag_topics.issubset(param_topics))
for topic in topics_dict:
size = topics_dict[topic].message_count * 8 # Calculate stored message size as each message is 8 bytes
count = topics_dict[topic].message_count
gen = bag.read_messages(topics=topic)
_, _, first_time = next(gen)
last_time = first_time # in case the next for loop does not execute
Expand All @@ -185,7 +194,7 @@ def _assert_bag_valid(self, filename, topics=None, start_time=None, stop_time=No
if stop_time:
self.assertLessEqual(last_time, stop_time)
duration = last_time - first_time
self._assert_limits_enforced(topic, duration, size)
self._assert_limits_enforced(topic, duration, size, count)

def test_1_service_connects(self):
'''
Expand Down Expand Up @@ -240,7 +249,7 @@ def test_invalid_topics(self):
Test that an invalid topic or one not subscribed to fails
'''
self._assert_no_data(['_invalid_graph_name'])
self._assert_no_data(['/test4'])
self._assert_no_data(['/test5'])


if __name__ == '__main__':
Expand Down

0 comments on commit c5e453f

Please sign in to comment.