diff --git a/src/bastion/examples/round_robin_dispatcher.rs b/src/bastion/examples/round_robin_dispatcher.rs index a8a5ff6c..68c597c2 100644 --- a/src/bastion/examples/round_robin_dispatcher.rs +++ b/src/bastion/examples/round_robin_dispatcher.rs @@ -1,5 +1,8 @@ use bastion::prelude::*; +use futures_timer::Delay; use std::sync::Arc; +use std::time::Duration; +use tracing::Level; /// /// Prologue: @@ -22,7 +25,28 @@ use std::sync::Arc; /// 3. We want to use a dispatcher on the second group because we don't want to /// target a particular child in the first to process the message. /// +/// The output looks like: +/// ``` +/// Running `target\debug\examples\round_robin_dispatcher.exe` +/// Aug 20 16:52:19.925 WARN round_robin_dispatcher: sending message +/// Aug 20 16:52:19.926 WARN round_robin_dispatcher: Received data_1 +/// Aug 20 16:52:20.932 WARN round_robin_dispatcher: sending message +/// Aug 20 16:52:20.933 WARN round_robin_dispatcher: Received data_2 +/// Aug 20 16:52:21.939 WARN round_robin_dispatcher: sending message +/// Aug 20 16:52:21.941 WARN round_robin_dispatcher: Received data_3 +/// Aug 20 16:52:22.947 WARN round_robin_dispatcher: sending message +/// Aug 20 16:52:22.948 WARN round_robin_dispatcher: Received data_4 +/// Aug 20 16:52:23.954 WARN round_robin_dispatcher: sending message +/// Aug 20 16:52:23.955 WARN round_robin_dispatcher: Received data_5 +/// ``` fn main() { + // Initialize tracing logger + // so we get nice output on the console. + let subscriber = tracing_subscriber::fmt() + .with_max_level(Level::WARN) + .finish(); + tracing::subscriber::set_global_default(subscriber).unwrap(); + // We need bastion to run our program Bastion::init(); // We create the supervisor and we add both groups on it @@ -57,8 +81,10 @@ fn caller_group(children: Children) -> Children { let target = BroadcastTarget::Group("Receiver".to_string()); // We iterate on each data for data in data_to_send { + Delay::new(Duration::from_secs(1)).await; + tracing::warn!("sending message"); // We broadcast the message containing the data to the defined target - ctx.broadcast_message(target.clone(), data) + ctx.broadcast_message(target.clone(), data); } // We stop bastion here, because we don't have more data to send Bastion::stop(); @@ -70,8 +96,6 @@ fn caller_group(children: Children) -> Children { fn receiver_group(children: Children) -> Children { // We create the second group of children children - // We want to have 5 children in this group - .with_redundancy(5) // We want to have a disptacher named `Receiver` .with_dispatcher(Dispatcher::with_type(DispatcherType::Named( "Receiver".to_string(), @@ -93,7 +117,7 @@ fn receiver_group(children: Children) -> Children { // Because it's a broadcasted message we can use directly the ref ref data: &str => { // And we print it - println!("Received {}", data); + tracing::warn!("Received {}", data); }; _: _ => (); } diff --git a/src/bastion/src/child_ref.rs b/src/bastion/src/child_ref.rs index 56af7149..139c830c 100644 --- a/src/bastion/src/child_ref.rs +++ b/src/bastion/src/child_ref.rs @@ -19,9 +19,28 @@ pub struct ChildRef { sender: Sender, name: String, path: Arc, + // True if the ChildRef references a child that will receive user defined messages. + // use `ChildRef::new_internal` to set it to false, for internal use children, + // such as the heartbeat children for example + is_public: bool, } impl ChildRef { + pub(crate) fn new_internal( + id: BastionId, + sender: Sender, + name: String, + path: Arc, + ) -> ChildRef { + ChildRef { + id, + sender, + name, + path, + is_public: false, + } + } + pub(crate) fn new( id: BastionId, sender: Sender, @@ -33,6 +52,7 @@ impl ChildRef { sender, name, path, + is_public: true, } } @@ -67,6 +87,38 @@ impl ChildRef { &self.id } + /// Returns true if the child this `ChildRef` is referencing is public, + /// Which means it can receive messages. private `ChildRef`s + /// reference bastion internal children, such as the heartbeat child for example. + /// This function comes in handy when implementing your own dispatchers. + /// + /// + /// # Example + /// + /// ```rust + /// # use bastion::prelude::*; + /// # + /// # Bastion::init(); + /// # + /// Bastion::children(|children| { + /// children.with_exec(|ctx| { + /// async move { + /// if ctx.current().is_public() { + /// // ... + /// } + /// # Ok(()) + /// } + /// }) + /// }).expect("Couldn't create the children group."); + /// # + /// # Bastion::start(); + /// # Bastion::stop(); + /// # Bastion::block_until_stopped(); + /// ``` + pub fn is_public(&self) -> bool { + self.is_public + } + /// Sends a message to the child this `ChildRef` is referencing. /// This message is intended to be used outside of Bastion context when /// there is no way for receiver to identify message sender diff --git a/src/bastion/src/children.rs b/src/bastion/src/children.rs index 42747e60..65d62372 100644 --- a/src/bastion/src/children.rs +++ b/src/bastion/src/children.rs @@ -453,26 +453,24 @@ impl Children { /// # use bastion::prelude::*; /// # use std::time::Duration; /// # - /// # fn main() { - /// # Bastion::init(); - /// # + /// # Bastion::init(); + /// # /// Bastion::children(|children| { - /// children - /// .with_heartbeat_tick(Duration::from_secs(5)) - /// .with_exec(|ctx| { - /// // -- Children group started. - /// async move { - /// // ... - /// # Ok(()) - /// } - /// // -- Children group stopped. - /// }) + /// children + /// .with_heartbeat_tick(Duration::from_secs(5)) + /// .with_exec(|ctx| { + /// // -- Children group started. + /// async move { + /// // ... + /// # Ok(()) + /// } + /// // -- Children group stopped. + /// }) /// }).expect("Couldn't create the children group."); - /// # - /// # Bastion::start(); - /// # Bastion::stop(); - /// # Bastion::block_until_stopped(); - /// # } + /// # + /// # Bastion::start(); + /// # Bastion::stop(); + /// # Bastion::block_until_stopped(); /// ``` /// [`std::time::Duration`]: https://doc.rust-lang.org/nightly/core/time/struct.Duration.html pub fn with_heartbeat_tick(mut self, interval: Duration) -> Self { @@ -936,7 +934,7 @@ impl Children { let id = bcast.id().clone(); let sender = bcast.sender().clone(); let path = bcast.path().clone(); - let child_ref = ChildRef::new(id.clone(), sender.clone(), name, path); + let child_ref = ChildRef::new_internal(id.clone(), sender.clone(), name, path); let children = self.as_ref(); let supervisor = self.bcast.parent().clone().into_supervisor(); diff --git a/src/bastion/src/dispatcher.rs b/src/bastion/src/dispatcher.rs index 6bccdf83..7306e9d8 100644 --- a/src/bastion/src/dispatcher.rs +++ b/src/bastion/src/dispatcher.rs @@ -9,10 +9,10 @@ use lever::prelude::*; use std::fmt::{self, Debug}; use std::hash::{Hash, Hasher}; use std::sync::{ - atomic::{AtomicU64, Ordering}, + atomic::{AtomicUsize, Ordering}, Arc, }; -use tracing::{trace, warn}; +use tracing::{debug, trace, warn}; /// Type alias for the concurrency hashmap. Each key-value pair stores /// the Bastion identifier as the key and the module name as the value. @@ -66,7 +66,7 @@ pub type DefaultDispatcherHandler = RoundRobinHandler; /// Dispatcher that will do simple round-robin distribution #[derive(Default, Debug)] pub struct RoundRobinHandler { - index: AtomicU64, + index: AtomicUsize, } impl DispatcherHandler for RoundRobinHandler { @@ -80,25 +80,27 @@ impl DispatcherHandler for RoundRobinHandler { } // Each child in turn will receive a message. fn broadcast_message(&self, entries: &DispatcherMap, message: &Arc) { - if entries.len() == 0 { - return; - } - - let current_index = self.index.load(Ordering::SeqCst) % entries.len() as u64; - - let mut skipped = 0; - for pair in entries.iter() { - if skipped != current_index { - skipped += 1; - continue; - } + let entries = entries + .iter() + .filter(|entry| entry.0.is_public()) + .collect::>(); - let entry = pair.0; - entry.tell_anonymously(message.clone()).unwrap(); - break; + if entries.is_empty() { + debug!("no public children to broadcast message to"); + return; } + let current_index = self.index.load(Ordering::SeqCst) % entries.len(); - self.index.store(current_index + 1, Ordering::SeqCst); + if let Some(entry) = entries.get(current_index) { + warn!( + "sending message to child {}/{} - {}", + current_index + 1, + entries.len(), + entry.0.path() + ); + entry.0.tell_anonymously(message.clone()).unwrap(); + self.index.store(current_index + 1, Ordering::SeqCst); + }; } } /// Generic trait which any custom dispatcher handler must implement for