Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add raw publisher, convert Context to singleton #76

Merged
merged 4 commits into from
Jan 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions r2r/examples/tokio_raw_publisher.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
use r2r::QosProfile;
use r2r::WrappedTypesupport;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    use std::sync::{
        atomic::{AtomicBool, Ordering},
        Arc,
    };

    let ctx = r2r::Context::create()?;
    let mut node = r2r::Node::create(ctx, "testnode", "")?;
    let duration = std::time::Duration::from_millis(2500);

    let mut timer = node.create_wall_timer(duration)?;
    let publisher =
        node.create_publisher_untyped("/topic", "std_msgs/msg/String", QosProfile::default())?;

    // Spin the node on a dedicated blocking thread. The `done` flag lets us
    // stop it when we are finished publishing; without it the spin loop would
    // run forever and `handle.await?` below would never return.
    let done = Arc::new(AtomicBool::new(false));
    let done_spin = done.clone();
    let handle = tokio::task::spawn_blocking(move || {
        while !done_spin.load(Ordering::Relaxed) {
            node.spin_once(std::time::Duration::from_millis(100));
        }
    });

    // Publish 9 messages, one per timer tick.
    for _ in 1..10 {
        timer.tick().await?;
        let msg = r2r::std_msgs::msg::String {
            data: "hello from r2r".to_string(),
        };
        // `to_serialized_bytes` yields the serialized ROS message, which is
        // published as raw bytes without any JSON round-trip.
        publisher.publish_raw(&msg.to_serialized_bytes()?)?;
    }

    // Signal the spin thread to exit, then wait for it so the example
    // terminates cleanly.
    done.store(true, Ordering::Relaxed);
    handle.await?;
    Ok(())
}
97 changes: 56 additions & 41 deletions r2r/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::ffi::CStr;
use std::ffi::CString;
use std::fmt::Debug;
use std::ops::{Deref, DerefMut};
use std::sync::OnceLock;
use std::sync::{Arc, Mutex};

use crate::error::*;
Expand Down Expand Up @@ -29,50 +30,64 @@ macro_rules! check_rcl_ret {

unsafe impl Send for Context {}

// Safety: Context is just a Arc<Mutex<..>> wrapper around ContextHandle
// so it should be safe to access from different threads
unsafe impl Sync for Context {}

// Memory corruption (double frees, among other errors) was observed when
// multiple `Context` objects were created in a single thread.
//
// To reproduce, run the tests from `tokio_testing` or `tokio_test_raw`
// without this OnceLock guarding context creation.

static CONTEXT: OnceLock<Result<Context>> = OnceLock::new();

impl Context {
/// Create a ROS context.
pub fn create() -> Result<Context> {
let mut ctx: Box<rcl_context_t> = unsafe { Box::new(rcl_get_zero_initialized_context()) };
// argc/v
let args = std::env::args()
.map(|arg| CString::new(arg).unwrap())
.collect::<Vec<CString>>();
let mut c_args = args
.iter()
.map(|arg| arg.as_ptr())
.collect::<Vec<*const ::std::os::raw::c_char>>();
c_args.push(std::ptr::null());

let is_valid = unsafe {
let allocator = rcutils_get_default_allocator();
let mut init_options = rcl_get_zero_initialized_init_options();
check_rcl_ret!(rcl_init_options_init(&mut init_options, allocator));
check_rcl_ret!(rcl_init(
(c_args.len() - 1) as ::std::os::raw::c_int,
c_args.as_ptr(),
&init_options,
ctx.as_mut(),
));
check_rcl_ret!(rcl_init_options_fini(&mut init_options as *mut _));
rcl_context_is_valid(ctx.as_mut())
};

let logging_ok = unsafe {
let _guard = log_guard();
let ret = rcl_logging_configure(
&ctx.as_ref().global_arguments,
&rcutils_get_default_allocator(),
);
ret == RCL_RET_OK as i32
};

if is_valid && logging_ok {
Ok(Context {
context_handle: Arc::new(Mutex::new(ContextHandle(ctx))),
})
} else {
Err(Error::RCL_RET_ERROR) // TODO
}
CONTEXT.get_or_init(|| {
let mut ctx: Box<rcl_context_t> = unsafe { Box::new(rcl_get_zero_initialized_context()) };
// argc/v
let args = std::env::args()
.map(|arg| CString::new(arg).unwrap())
.collect::<Vec<CString>>();
let mut c_args = args
.iter()
.map(|arg| arg.as_ptr())
.collect::<Vec<*const ::std::os::raw::c_char>>();
c_args.push(std::ptr::null());

let is_valid = unsafe {
let allocator = rcutils_get_default_allocator();
let mut init_options = rcl_get_zero_initialized_init_options();
check_rcl_ret!(rcl_init_options_init(&mut init_options, allocator));
check_rcl_ret!(rcl_init(
(c_args.len() - 1) as ::std::os::raw::c_int,
c_args.as_ptr(),
&init_options,
ctx.as_mut(),
));
check_rcl_ret!(rcl_init_options_fini(&mut init_options as *mut _));
rcl_context_is_valid(ctx.as_mut())
};

let logging_ok = unsafe {
let _guard = log_guard();
let ret = rcl_logging_configure(
&ctx.as_ref().global_arguments,
&rcutils_get_default_allocator(),
);
ret == RCL_RET_OK as i32
};

if is_valid && logging_ok {
Ok(Context {
context_handle: Arc::new(Mutex::new(ContextHandle(ctx))),
})
} else {
Err(Error::RCL_RET_ERROR) // TODO
}
}).clone()
}

/// Check if the ROS context is valid.
Expand Down
2 changes: 1 addition & 1 deletion r2r/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ pub type Result<T> = std::result::Result<T, Error>;
/// These values are mostly copied straight from the RCL headers, but
/// some are specific to r2r, such as `GoalCancelRejected` which does
/// not have an analogue in the rcl.
#[derive(Error, Debug)]
#[derive(Error, Clone, Debug)]
pub enum Error {
#[error("RCL_RET_OK")]
RCL_RET_OK,
Expand Down
4 changes: 3 additions & 1 deletion r2r/src/nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -791,7 +791,9 @@ impl Node {
Ok(p)
}

/// Create a ROS publisher with a type given at runtime.
/// Create a ROS publisher with a type given at runtime. The data may be supplied
/// either as JSON (using the `publish` method) or as a pre-serialized ROS message
/// (i.e. a `&[u8]`, using the `publish_raw` method).
pub fn create_publisher_untyped(
&mut self, topic: &str, topic_type: &str, qos_profile: QosProfile,
) -> Result<PublisherUntyped> {
Expand Down
37 changes: 37 additions & 0 deletions r2r/src/publishers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,43 @@ impl PublisherUntyped {
}
}

/// Publish a pre-serialized ROS message represented by a `&[u8]`.
///
/// It is up to the caller to ensure `data` is a valid serialized ROS message
/// of this publisher's topic type (e.g. produced by
/// `WrappedTypesupport::to_serialized_bytes`); the bytes are handed to rcl
/// without validation.
pub fn publish_raw(&self, data: &[u8]) -> Result<()> {
// TODO should this be an unsafe function? It is unclear what rcl does if the data is malformed ..

// Upgrade the weak handle to a real reference; fails with
// RCL_RET_PUBLISHER_INVALID if the owning node has already been dropped.
let publisher = self
.handle
.upgrade()
.ok_or(Error::RCL_RET_PUBLISHER_INVALID)?;

// Safety: the struct only borrows `data` and is not retained beyond this
// function, so the const-to-mut pointer cast is never used for writes.
let msg_buf = rcl_serialized_message_t {
buffer: data.as_ptr() as *mut u8,
buffer_length: data.len(),
buffer_capacity: data.len(),

// The buffer is read-only here, so the allocator should never be
// invoked; it is only set to satisfy the struct layout.
allocator: unsafe { rcutils_get_default_allocator() }
};

let result =
unsafe { rcl_publish_serialized_message(
&publisher.handle,
&msg_buf as *const rcl_serialized_message_t,
std::ptr::null_mut()
) };

if result == RCL_RET_OK as i32 {
Ok(())
} else {
log::error!("could not publish {}", result);
Err(Error::from_rcl_error(result))
}
}

/// Gets the number of external subscribers (i.e. it doesn't
/// count subscribers from the same process).
pub fn get_inter_process_subscription_count(&self) -> Result<usize> {
Expand Down
102 changes: 59 additions & 43 deletions r2r/tests/threads.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,62 +3,78 @@ use std::time::Duration;

use r2r::QosProfile;

const N_NODE_PER_CONTEXT: usize = 5;
const N_CONCURRENT_ROS_CONTEXT: usize = 2;
const N_TEARDOWN_CYCLES: usize = 2;

#[test]
// Let's create and drop a lot of node and publishers for a while to see that we can cope.
fn doesnt_crash() -> Result<(), Box<dyn std::error::Error>> {
// a global shared context.
let ctx = r2r::Context::create()?;

for c in 0..10 {
let mut ths = Vec::new();
// I have lowered this from 30 to 10 because cyclonedds can only handle a hard-coded number of
// publishers in threads. See
// https://github.com/eclipse-cyclonedds/cyclonedds/blob/cd2136d9321212bd52fdc613f07bbebfddd90dec/src/core/ddsc/src/dds_init.c#L115
for i in 0..10 {
// create concurrent nodes that max out the cpu
let ctx = ctx.clone();
ths.push(thread::spawn(move || {
let mut node = r2r::Node::create(ctx, &format!("testnode{}", i), "").unwrap();
let threads = (0..N_CONCURRENT_ROS_CONTEXT).map(|i_context| std::thread::spawn(move || {

for _i_cycle in 0..N_TEARDOWN_CYCLES {
// a global shared context.
let ctx = r2r::Context::create().unwrap();

// each with 10 publishers
for _j in 0..10 {
let p = node
.create_publisher::<r2r::std_msgs::msg::String>(
&format!("/r2r{}", i),
QosProfile::default(),
)
.unwrap();
let to_send = r2r::std_msgs::msg::String {
data: format!("[node{}]: {}", i, c),
};
for c in 0..10 {
let mut ths = Vec::new();
// I have lowered this from 30 to (10 / N_CONCURRENT_ROS_CONTEXT) because cyclonedds can only handle a hard-coded number of
// publishers in threads. See
// https://github.com/eclipse-cyclonedds/cyclonedds/blob/cd2136d9321212bd52fdc613f07bbebfddd90dec/src/core/ddsc/src/dds_init.c#L115
for i_node in 0..N_NODE_PER_CONTEXT {
// create concurrent nodes that max out the cpu
let ctx = ctx.clone();
ths.push(thread::spawn(move || {
let mut node = r2r::Node::create(ctx, &format!("testnode_{}_{}", i_context, i_node), "").unwrap();

// move publisher to its own thread and publish as fast as we can
thread::spawn(move || loop {
let res = p.publish(&to_send);
thread::sleep(Duration::from_millis(1));
match res {
Ok(_) => (),
Err(_) => {
// println!("publisher died, quitting thread.");
break;
}
// each with 10 publishers
for _j in 0..10 {
let p = node
.create_publisher::<r2r::std_msgs::msg::String>(
&format!("/r2r{}", i_node),
QosProfile::default(),
)
.unwrap();
let to_send = r2r::std_msgs::msg::String {
data: format!("[node{}]: {}", i_node, c),
};

// move publisher to its own thread and publish as fast as we can
thread::spawn(move || loop {
let res = p.publish(&to_send);
thread::sleep(Duration::from_millis(1));
match res {
Ok(_) => (),
Err(_) => {
// println!("publisher died, quitting thread.");
break;
}
}
});
}
});

// spin to simulate some load
for _j in 0..100 {
node.spin_once(Duration::from_millis(10));
}

// println!("all done {}-{}", c, i);
}));
}

// spin to simulate some load
for _j in 0..100 {
node.spin_once(Duration::from_millis(10));
for t in ths {
t.join().unwrap();
}
// println!("all threads done {}", c);

// println!("all done {}-{}", c, i);
}));
}
}

}));

for t in ths {
t.join().unwrap();
}
// println!("all threads done {}", c);
for thread in threads.into_iter() {
thread.join().unwrap();
}

Ok(())
Expand Down
Loading