
Feature request: allow subscription message pointer to be changed between callbacks #186

Open
BrettRD opened this issue Aug 16, 2021 · 15 comments


@BrettRD
Contributor

BrettRD commented Aug 16, 2021

I'm having a hard time getting micro-ROS to go fast enough for my application.
I've implemented a queue of pre-allocated ROS messages for the publisher to send on a separate thread from my data-acquisition loop, and the API supports that fine, especially with the multi-threaded rmw (thanks pablogs9!)

It would be nice for _rclc_take_new_data()/rcl_take() to write directly into my queue to avoid copying a large buffer during the callback.

Changing handle->data is always safe inside that handle's subscription callback as long as the new message is valid and initialised.

This feature involves a new call to the executor that changes handle->data, perhaps leveraging _rclc_executor_find_handle() and sanity-checking handle->type.

I can get by with #include "rclc/executor.h" and hacking around directly, but I imagine other people will want a similar performance boost.

I'm happy to implement and test this. Is there anything I'm missing, and should something similar be written for the other handle types?

@JanStaschulat
Contributor

Sounds very interesting to me. Sure, go ahead.
How about providing a pointer to the queue directly when you configure the subscription with the executor, e.g.:

rcl_ret_t
rclc_executor_add_subscription_dataqueue(
  rclc_executor_t * executor,
  rcl_subscription_t * subscription,
  void * ptr_to_data_queue,
  rclc_subscription_callback_t callback,
  rclc_executor_handle_invocation_t invocation);

@BrettRD
Contributor Author

BrettRD commented Aug 16, 2021

Passing the queue on subscription declaration would need someone to establish a convention for accessing a queue (a wrapper of platform-agnostic function calls), and then the queue accessor would need to be called during _rclc_take_new_data().

If the scope is limited to a function call that exchanges a handle->data pointer, then assumptions about the nature of the queue can be avoided, and the same code will apply to both existing types of subscriptions, and all client and service handles.

In practice, the user-side code isn't too bad.

void subscription_callback(const void * msgin)
{
  // process or enqueue the message
  msg_ring_write(msg_ring, msgin);            // return or free the message
  void * newmsg = msg_ring_borrow(msg_ring);  // borrow or allocate a new message
  rclc_executor_swap_msg(executor, /*rcl_handle,*/ msgin, newmsg);  // give the new message to the executor
}

I'm undecided if it should search by message pointer or by rcl_handle.

In cases with very large numbers of handles, it would make sense to skip this step and allow the executor to advance the queue, but that starts to look like a whole separate polling API.

I'd actually like to see a reduction in the number of rclc_executor_add_* calls.
After the last refactor, rclc_executor_add_subscription can almost be replaced by a call to rclc_executor_add_subscription_with_context carrying a NULL context; it just needs to adjust handle->type afterwards so the correct number of arguments is passed to the callback.

@JanStaschulat
Contributor

JanStaschulat commented Aug 16, 2021

Okay, understood. Then go ahead with the option that limits the scope to exchanging the handle->data pointer. I would start with a simple solution. On a micro-controller, do you foresee a large number of handles? Also, if timing is relevant, you can use the order in which the handles are added to the executor as a priority level: a handle will be found faster if it is added to the executor earlier.

@BrettRD
Contributor Author

BrettRD commented Aug 16, 2021

I haven't had any need for more than four or five handles on my esp32 executors personally, but I'm about to start using more guard-conditions to synchronise code.

If search performance becomes important, we can allow the user to cache the results:

rcl_ret_t
rclc_executor_swap_message(
  rclc_executor_t * executor,
  void * old_message,
  void * new_message,
  rclc_executor_handle_t ** handle_ptr_cache)
{
  rclc_executor_handle_t * matching_handle = NULL;
  // test handle_ptr_cache first:
  if ((NULL != handle_ptr_cache) && (NULL != *handle_ptr_cache) &&
      (old_message == (*handle_ptr_cache)->data)) {
    matching_handle = *handle_ptr_cache;
  } else {
    // then search the executor's other handles
    for (size_t i = 0; i < executor->max_handles; i++) {
      ...
    }
  }
  ...

  // then cache the result
  if (NULL != handle_ptr_cache) {
    *handle_ptr_cache = matching_handle;
  }
}

This will only search more than once if an earlier handle is removed, and will accept NULL as an always-search option for stateless callbacks.
If run-time searching is never OK, the user can exchange a message with itself on startup.

Searching based on message pointer should allow multiple context-free subscriptions to use the same callback and still rotate separate message queues, I'll try that first.
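Taken out of the executor, the cache-then-search logic can be exercised on its own. This is only an illustrative sketch: fake_handle_t and find_handle_by_message are hypothetical stand-ins for the real rclc handle bookkeeping, not actual rclc types.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for the executor's handle struct;
 * only the data field (mirroring handle->data) matters here. */
typedef struct {
  void * data;
} fake_handle_t;

/* Check the caller-supplied cache first, fall back to a linear
 * scan of the handle array, then refresh the cache with the match.
 * Passing NULL for handle_ptr_cache always scans (stateless use). */
static fake_handle_t * find_handle_by_message(
  fake_handle_t * handles, size_t n,
  void * old_message, fake_handle_t ** handle_ptr_cache)
{
  fake_handle_t * match = NULL;
  if ((handle_ptr_cache != NULL) && (*handle_ptr_cache != NULL) &&
      ((*handle_ptr_cache)->data == old_message)) {
    match = *handle_ptr_cache;            /* cache hit: no scan */
  } else {
    for (size_t i = 0; i < n; i++) {      /* cache miss: linear scan */
      if (handles[i].data == old_message) {
        match = &handles[i];
        break;
      }
    }
  }
  if ((handle_ptr_cache != NULL) && (match != NULL)) {
    *handle_ptr_cache = match;            /* remember for next time */
  }
  return match;
}
```

After the first lookup, repeated swaps on the same handle hit the cache and skip the scan entirely, which is the property that matters once the handle count grows.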

@BrettRD
Contributor Author

BrettRD commented Aug 17, 2021

Looking at other ways of doing this:

I might get better performance by using rcl_take_loaned_message and configuring a large rmw memory pool, but subscription borrow is not yet supported for micro-ros

or almost the same performance by passing messages with a zeroed variable-sized part and a specialised allocator which manages the queues.

Only message swapping and custom allocators will allow a borrow concept to be combined with device-specific allocation, and even then only downstream of the rmw (the ESP32 has multiple RAM devices with wildly different speeds).

@JanStaschulat
Contributor

JanStaschulat commented Aug 17, 2021

but I'm about to start using more guard-conditions to synchronise code.

Regarding guard-conditions: How do you want to use guard-conditions? Is timing critical?

Note that a triggered guard-condition does not cause rcl_wait to return immediately. If a guard-condition was triggered, rcl_wait returns only after the timeout is reached or after some other event (a timer is ready, a message for a subscription is available) has happened. We wanted to use hardware interrupts to signal a guard-condition, and by this mechanism synchronize with non-ROS functions (in other threads) in micro-ROS. However, this is currently not possible. There will always be a time delay, at most the length of the rcl_wait timeout.

@pablogs9 How much effort would it be to change the behavior of rcl_wait, so that it would return, if a guard-condition was ready?

@BrettRD
Contributor Author

BrettRD commented Aug 18, 2021

A triggered guard-condition does not lead to an immediate return of rcl_wait.

Thanks for the heads up! That will save me at least a day of debugging.

@JanStaschulat
Contributor

JanStaschulat commented Aug 19, 2021

I have a concern about data corruption:

  • new data d1 for subscription s1 is available at the DDS middleware
  • rcl_take takes that data and assigns handle->data = d1
  • with your new approach, you put the pointer to d1 in your queue (which is accessed by some other thread)
  • in the subscription callback you do not process d1 (handle->data) and control is given back to the Executor
  • at the same time, data d1 is processed by a different thread accessing the queue
  • if spin_some is called again by the Executor and new data d2 is available for subscription s1, rcl_take would assign handle->data = d2 while the other thread is still processing d1. In this case the input data will be corrupted.

How do you solve this? Did I misunderstand your approach?

I have implemented a similar approach in the "budget-enabled real-time executor", in which every subscription callback is processed in a different thread and all accesses to the middleware (i.e. rcl_wait, rcl_take, rcl_publish) are managed in one specific Executor thread. It ensures that new data for a subscription is only taken from the DDS middleware with rcl_take after the thread has finished processing the subscription callback with the previous data.

Publication: https://arxiv.org/abs/2105.05590
Pull request: #87

@BrettRD
Contributor Author

BrettRD commented Aug 19, 2021

Your concerns about data corruption are exactly why I chose to build this feature.
What I've implemented is similar in intent to your paper, but the timing behaviour of my worker thread is totally decoupled from the cadence of the executor unless the process queue becomes empty.
I'm short on CPU cycles in my application, so I can't afford to copy the messages during the subscription callback but I needed a guarantee that the executor won't corrupt the old data with the new. I have a pre-allocated message pool (four messages including a large pre-allocated ros sequence), and I use the above feature to control which threads own which parts of the message pool.

I'm using FreeRTOS queues to ensure thread safety of the message pool itself: a queue of unused messages and a queue of messages ready for processing. The queues only hold pointers to the messages.
I'm controlling access to the messages by ensuring only one thread can hold a message pointer at a time, and there are limited times when it is safe to change ownership of a message. When a message pointer is passed between threads, it is held in a FreeRTOS queue, which is internally protected by semaphores.

The application starts with one pre-allocated message (M1) held by the executor, handle->data is the only instance of that message pointer anywhere in the application. The only way to access that message is to be passed the message pointer during subscription callback.

During the callback, a new pre-allocated message (M2) is borrowed from the pool and handle->data is replaced by M2; M1, which was passed in as an argument, is given to the message queue to wait for processing. The only copy of the M1 pointer is now in the processing queue, and the only copy of the M2 pointer is in the executor.
At the end of the callback, handle->data points to a fresh empty message (M2) that no other thread can reach, and the executor has no reference to M1, so it cannot corrupt downstream data. According to both schedulers available in Foxy, handle->data is not used again during spin_some, so it is not required to hold recent data.

The next time spin_some is called, the executor passes M2 to rcl_take to write into. No other thread has written to M2 since the callback because M2 does not exist anywhere else, and the executor has not written to M1 because M1 only exists in the queue or downstream from it.

The only times it is safe to use the message-swap feature are during the callback that uses the message being swapped, while spin_some is not running, or in callbacks that execute later than the handle that holds the message being swapped. This is mentioned in the PR docstrings.

I'll have a go at an example, but it'll be a bit simplistic on posix.
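As a standalone illustration of the M1/M2 hand-off described above: the sketch below is a single-threaded simulation only, with plain arrays standing in for the FreeRTOS queues. Every name here (ptr_queue_t, app_t, on_message) is hypothetical, not rclc or FreeRTOS API.

```c
#include <assert.h>
#include <stddef.h>

/* Simulation of the two-queue ownership scheme. */
typedef struct { int payload; } msg_t;

/* Tiny fixed-size ring of message pointers, standing in for a
 * FreeRTOS queue (which would add the semaphore protection). */
typedef struct {
  msg_t * slots[4];
  size_t head, tail, count;
} ptr_queue_t;

static void q_push(ptr_queue_t * q, msg_t * m) {
  assert(q->count < 4);
  q->slots[q->head] = m;
  q->head = (q->head + 1) % 4;
  q->count++;
}

static msg_t * q_pop(ptr_queue_t * q) {
  if (q->count == 0) return NULL;
  msg_t * m = q->slots[q->tail];
  q->tail = (q->tail + 1) % 4;
  q->count--;
  return m;
}

/* The "executor" owns exactly one message pointer at a time
 * (mirroring handle->data); the callback swaps it for a fresh one. */
typedef struct {
  ptr_queue_t free_pool;   /* unused messages                */
  ptr_queue_t ready;       /* awaiting the worker thread     */
  msg_t * handle_data;     /* the executor's current message */
} app_t;

/* Subscription callback: hand the filled message to the worker,
 * borrow a fresh one so the executor never overwrites in-flight data. */
static void on_message(app_t * app) {
  q_push(&app->ready, app->handle_data);
  app->handle_data = q_pop(&app->free_pool);
}
```

The invariant to notice: after on_message runs, the M1 pointer exists only in the ready queue and the M2 pointer only in handle_data, so neither thread can touch the other's message.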

@JanStaschulat
Contributor

JanStaschulat commented Aug 19, 2021

Great, thanks for the explanation. Changing the pointer of handle->data in the subscription callback to an unused message does the trick to preserve data consistency. Very nice!

In the paper about the multi-threaded Executor, I intentionally did not use a queue, which is indeed a typical way to organize multi-threading, because a user can define quality of service (QoS) in DDS (e.g. buffer size = 1). If the executor now has its own queue to store new messages, the QoS parameters will be shadowed. You would also need to manually check the length of the queue against the execution time of the "real" subscription callback (the one that actually does something with the data) and the topic/timer frequency. The required queue size might therefore change, e.g. when you port the application to different hardware, when a software update publishes the topic at a higher frequency, or when the algorithm becomes more complex and the execution time of the message callback grows. Then messages will be missed, or you need a blocking wait in the subscription callback that returns once a free message in the queue becomes available.

This was the reason, why in the general concept of the multi-threaded executor, I decided against an Executor internal queue and use the feedback mechanism, that only fetches new data, if a corresponding processing thread is READY to process new messages.

But I agree with you: copying data on a micro-controller should be avoided as much as possible, as it is unnecessary and only costs time. Maybe QoS is not such an issue for you, so choosing the queue size wisely will work fine for your use-case.

If you like, you are very much invited to present this new functionality and possibly an evaluation of the performance improvement in an Embedded Working Group Meeting later on.

@JanStaschulat
Contributor

JanStaschulat commented Aug 19, 2021

I don't know your application, but what about using multiple executors, each in its own thread, so that each received subscription message is processed in a different thread? The message is not copied if you process it in the subscription callback. Would that help you?

@BrettRD
Contributor Author

BrettRD commented Aug 20, 2021

I'm trying to write an esp-adf (audio development framework) component that takes messages from my ros-gst-bridge package. I'm currently able to stream 16-bit 48 kHz stereo audio through RCL publish, with the publisher and executor in the two lowest-priority threads.

I'm introducing queues and threads to make sure downstream elements stay fed and synchronised even if I have to inject a block of zeroes. The high priority threads are performing time-stamping so they need to idle as much as possible. Any call to RCL (and wifi) has a call duration that is much too variable to be allowed in the fast loop.

My latency is dominated by transport jitter, and the process queues are mostly empty unless the wifi chokes. In ros-gst-bridge, latency is configured for a whole pipeline and messages have a presentation-time-stamp. In esp-adf, every element in the pipeline has a thread and a ring buffer. With this much glue code, DDS QoS is just one of many dials I need to adjust, but excessive buffers aren't a problem.

rclc assumes I can collect the message in a callback in its thread; esp-adf assumes I can return the message contents during a time-sensitive callback in its thread. The two design patterns require a thread-safe queue between them.
The only other viable approaches would be to present a whole new rclc polling API, or to use rcl_take directly in the adf callback.

@JanStaschulat
Contributor

JanStaschulat commented Aug 20, 2021

Hi @BrettRD, I got a rough understanding of your application use-case:

  • there are time-sensitive functions, which send 16-bit data at 48 kHz using RCL publish from the uC
  • there are other time-sensitive functions, which receive data on the uC from ros-gst-bridge via subscriptions, and you want to process those messages in a pipelined fashion with multiple threads without copying them.

That is, in your application you want to pass the pointer of the received message from one thread to another thread in the audio processing pipeline, while the rclc Executor assumes single-threaded processing of the subscription callback; consequently, after the subscription callback has finished, handle->data can be re-used for the next incoming message by rcl_take. In your application you want to use handle->data beyond the scope of the subscription callback, in multiple threads. Only after the entire pipeline has processed the message and is done with it can the message be re-used.

Essentially, in your proposed approach you manage your own message pool by swapping the message in handle->data in the subscription callback. Then rcl_take always writes new data into an unused pre-allocated message, and when this message is no longer needed by the processing pipeline, it is returned to the message pool and can be re-used in a later invocation of the subscription callback.

That makes sense to me and is a reasonable solution given the current implementation of the rclc Executor.

How about a message pool inside the rclc-Executor:

take_new_msg():
// the executor maintains a message data pool. Each call to rcl_take uses as the message pointer a
// new pre-allocated message from this pool, e.g. msg_i, and calls the subscription callback with
// the parameter msgin = msg_i

void subscriptionA_callback_with_context(const void * msgin, void * context)
{
  my_msg_t * my_msg = (my_msg_t *) msgin;
  my_context_t * my_context = (my_context_t *) context;
  // put the msg into a queue which is part of the context:
  queue_add(my_context->queue, my_msg);
}

in some other thread:
void pipeline_function_A(void * queue_A, void * queue_B)
{
  // process msg from queue_A
  // put msg into queue_B
}

in some other thread:
void pipeline_function_Z(void * queue_Z)
{
  // release message msg_i for subscription A
  rclc_executor_release_message(&subA, queue_Z->msg);
}

@BrettRD would that be a solution or does adding to the queue etc need some additional stuff ?

@pablogs9 With rcl_take_loaned_message we could easily implement this feature without any further memory pool directly in the rclc executor. What do you think — how much effort would it be to implement these functions for micro-ROS?

@BrettRD
Contributor Author

BrettRD commented Aug 20, 2021

A borrow API from the middleware memory pool would be ideal, especially if I can specify the memory device the whole rmw pool is allocated into.

A release-message call, and a way of making the subscription or the whole executor operate in borrow mode would be the only rclc API changes necessary. (And I think the release-message call is already provided by RCL)

My pool has a borrow and return method, and an upstream queue for available messages. Those would all disappear, leaving me with a vanilla FreeRTOS queue object and no user-side allocations.

How would you manage the case of memory exhaustion when all messages are on loan?
In my queue, I can force deletion of old data, but the executor can't force the application to release a message.

@JanStaschulat
Contributor

JanStaschulat commented Aug 20, 2021

How would you manage the case of memory exhaustion when all messages are on loan?
In my queue, I can force deletion of old data, but the executor can't force the application to release a message.

That's always the great question :)

In this case, I would only take a new message from DDS with rcl_take if a message in the pool is available. If memory is exhausted, the QoS configuration of DDS will kick in and manage message loss (e.g. depending on the size of the DDS/middleware buffers).
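That gating could look something like the following sketch. All names here are hypothetical (fake_take stands in for rcl_take, and msg_pool_t is not a real rclc type); the point is only that the take is skipped while the pool is empty, leaving undelivered data to the middleware's QoS policy.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Illustrative pool bookkeeping: a count of pre-allocated
 * messages that are not currently on loan to the application. */
typedef struct {
  size_t available;
} msg_pool_t;

/* Stand-in for rcl_take: pretend new data is always waiting. */
static bool fake_take(void) { return true; }

/* Returns true if a message was taken, false if the pool was empty
 * and the take was deliberately skipped (data stays in DDS buffers,
 * where the QoS history depth decides what is kept or dropped). */
static bool take_if_pool_available(msg_pool_t * pool) {
  if (pool->available == 0) {
    return false;            /* all messages on loan: skip the take */
  }
  if (fake_take()) {
    pool->available--;       /* message now on loan to the application */
    return true;
  }
  return false;
}

/* Called by the pipeline when it is finished with a message. */
static void release_message(msg_pool_t * pool) {
  pool->available++;         /* message re-usable by the next take */
}
```

This keeps back-pressure in the middleware rather than in the executor, so the DDS QoS configuration is not shadowed by an executor-side queue.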
