Possible bug: spin_some only processes one message per topic even if multiple messages are in the queue #2655

Open
RaphLins opened this issue Oct 22, 2024 · 4 comments

@RaphLins

Bug report

Required Info:

  • Operating System:
    • ros:rolling-ros-core docker
  • Installation type:
    • binaries
  • Version or commit hash:
    • 28.3.3-1noble.20240919.221549
  • DDS implementation:
    • Fast-RTPS
  • Client library (if applicable):
    • rclcpp

Steps to reproduce issue

A topic is published at a high frequency while the subscribing node calls rclcpp::spin_some() at a lower frequency.

#include <chrono>
#include <iostream>
#include <memory>
#include <string>
#include <thread>

#include <rclcpp/node.hpp>
#include <rclcpp/executors.hpp>
#include <std_msgs/msg/string.hpp>

int main(int argc, char *argv[]) {
  rclcpp::init(argc, argv);

  rclcpp::Node::SharedPtr pub_node = std::make_shared<rclcpp::Node>("publisher");
  auto pub = pub_node->create_publisher<std_msgs::msg::String>("topic", 100);
  int count = 0;
  // Publish at 10 Hz, well above the 1 Hz rate at which the subscriber spins.
  auto timer_pub = pub_node->create_wall_timer(std::chrono::milliseconds(100), [&]() -> void {
    std_msgs::msg::String msg;
    msg.data = std::to_string(count);
    count++;
    pub->publish(msg);
    std::cout << "Published: " << msg.data << std::endl;
  });
  // Spin the publisher node in a background thread.
  std::thread pub_node_thread([&]() {
    rclcpp::spin(pub_node);
  });

  rclcpp::Node::SharedPtr sub_node = std::make_shared<rclcpp::Node>("subscriber");
  auto sub =
      sub_node->create_subscription<std_msgs::msg::String>("topic", 100, [](std_msgs::msg::String::ConstSharedPtr msg) {
        std::cout << "Received: " << msg->data << std::endl;
      });

  // Call spin_some about once per second; roughly ten messages queue up in between.
  while (rclcpp::ok()) {
    std::cout << "Running spin_some" << std::endl;
    rclcpp::spin_some(sub_node);
    std::this_thread::sleep_for(std::chrono::seconds(1));
  }

  pub_node_thread.join();
  rclcpp::shutdown();

  return 0;
}

Expected behavior

According to its description, rclcpp::spin_some should execute "any immediately available work", so I would expect it to process all the available messages in the subscriber queue.

Running spin_some
Published: 0
Published: 1
Published: 2
Published: 3
Published: 4
Published: 5
Published: 6
Published: 7
Published: 8
Published: 9
Running spin_some
Received: 0
Received: 1
Received: 2
Received: 3
Received: 4
Received: 5
Received: 6
Received: 7
Received: 8
Received: 9
Published: 10
Published: 11
...

Actual behavior

Only one message per subscription is processed per spin_some call, so messages accumulate in the queue.

Running spin_some
Published: 0
Published: 1
Published: 2
Published: 3
Published: 4
Published: 5
Published: 6
Published: 7
Published: 8
Published: 9
Running spin_some
Received: 0
Published: 10
Published: 11
...

Additional information


@fujitatomoya
Collaborator

@RaphLins thanks for creating the issue. It looks like this is not the expected behavior. Could you check whether the same behavior occurs with jazzy and humble as well? If the problem is not observed in humble, that would be useful information, since it would suggest the cause is a change in rolling (or jazzy).

@RaphLins
Author

@fujitatomoya Thank you for the feedback.

If the problem is not observed in humble, that would be useful information, since it would suggest the cause is a change in rolling (or jazzy).

I've checked and can confirm that the problem is also observed in both the latest humble (v16.0.10) and jazzy.

@Barry-Xu-2018
Collaborator

I found that the Iron version has the issue as well, and that it is unrelated to the DDS implementation (it also occurs with Cyclone DDS).

After checking the code, I found the following:

Executor::get_next_ready_executable() calls wait_result_->next_ready_subscription(). If the subscription has messages, it generates a single any_executable, regardless of how many messages are waiting for that subscription.

while (auto subscription = wait_result_->next_ready_subscription()) {
  auto entity_iter = current_collection_.subscriptions.find(
    subscription->get_subscription_handle().get());
  if (entity_iter != current_collection_.subscriptions.end()) {
    auto callback_group = entity_iter->second.callback_group.lock();
    if (!callback_group || !callback_group->can_be_taken_from()) {
      continue;
    }
    any_executable.subscription = subscription;
    any_executable.callback_group = callback_group;
    valid_executable = true;
    break;
  }
}

std::shared_ptr<rclcpp::SubscriptionBase>
next_ready_subscription()
{
  check_wait_result_dirty();
  auto ret = std::shared_ptr<rclcpp::SubscriptionBase>{nullptr};
  if (this->kind() == WaitResultKind::Ready) {
    auto & wait_set = this->get_wait_set();
    auto & rcl_wait_set = wait_set.storage_get_rcl_wait_set();
    for (size_t ii = 0; ii < wait_set.size_of_subscriptions(); ++ii) {
      if (rcl_wait_set.subscriptions[ii] != nullptr) {
        ret = wait_set.subscriptions(ii);
        rcl_wait_set.subscriptions[ii] = nullptr;
        break;
      }
    }
  }
  return ret;
}

Executor::execute_any_executable() calls Executor::execute_subscription(), whose processing code retrieves only one message.
So spin_some ensures that each subscription, client, service, etc. with pending data is processed once, but only one item of data is processed per entity.
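
For illustration, the take-one-message-at-a-time behavior can also be seen from user code through the public Subscription::take() API, which removes at most one message from the subscription queue per call. Below is a minimal sketch (node and topic names are hypothetical) that drains a backlog manually by looping on take():

#include <iostream>
#include <memory>

#include <rclcpp/rclcpp.hpp>
#include <std_msgs/msg/string.hpp>

int main(int argc, char * argv[]) {
  rclcpp::init(argc, argv);
  auto node = std::make_shared<rclcpp::Node>("drainer");
  // The callback is unused here; we call take() ourselves instead of
  // letting an executor dispatch messages one at a time.
  auto sub = node->create_subscription<std_msgs::msg::String>(
      "topic", 100, [](std_msgs::msg::String::ConstSharedPtr) {});

  std_msgs::msg::String msg;
  rclcpp::MessageInfo info;
  // Each take() removes exactly one message; looping until it returns
  // false empties whatever has accumulated in the queue.
  while (sub->take(msg, info)) {
    std::cout << "Drained: " << msg.data << std::endl;
  }

  rclcpp::shutdown();
  return 0;
}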

This is my understanding; please correct me if I'm wrong.

@fujitatomoya
Collaborator

Although this is not the behavior the issue title expects, it is the designed behavior, as @Barry-Xu-2018 explains above. rclcpp::spin_some waits only once to collect executables, executes them once, and then exits. (It has no way of knowing how many items of data are available from that single wait.) I see that this does not match user expectations, but supporting it would probably require a new rmw interface that reports how much data is available after a wait. Right now we do not have the bandwidth for this, so I will move it to the backlog for now.

@RaphLins can you use rclcpp::spin_all instead? I think that is what you are looking for.
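
For reference, here is a minimal sketch of that change applied to the reproduction above, assuming an explicit SingleThreadedExecutor; its spin_all(max_duration) repeatedly collects and executes work until no work is immediately available or the duration elapses:

  rclcpp::executors::SingleThreadedExecutor executor;
  executor.add_node(sub_node);

  while (rclcpp::ok()) {
    std::cout << "Running spin_all" << std::endl;
    // Unlike spin_some, spin_all keeps waiting and executing until no work
    // is immediately available, so the accumulated messages are all
    // processed (bounded here by the 100 ms budget).
    executor.spin_all(std::chrono::milliseconds(100));
    std::this_thread::sleep_for(std::chrono::seconds(1));
  }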
