Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add name for children #205

Merged
merged 2 commits into from
May 11, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file added rustfmt.toml
Empty file.
2 changes: 1 addition & 1 deletion src/bastion-executor/src/load_balancer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ pub fn stats() -> &'static Stats {
#[inline]
pub fn core_retrieval() -> &'static usize {
lazy_static! {
static ref CORE_COUNT: usize = { placement::get_core_ids().unwrap().len() };
static ref CORE_COUNT: usize = placement::get_core_ids().unwrap().len();
}

&*CORE_COUNT
Expand Down
2 changes: 1 addition & 1 deletion src/bastion-executor/src/sleepers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ impl Sleepers {

if !self.notified.swap(false, Ordering::SeqCst) {
*sleep += 1;
self.wake.wait(sleep).unwrap();
let _ = self.wake.wait(sleep).unwrap();
}
}

Expand Down
14 changes: 8 additions & 6 deletions src/bastion/examples/broadcast_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,8 @@ fn response_supervisor(supervisor: Supervisor) -> Supervisor {
}

fn input_group(children: Children) -> Children {
children
.with_redundancy(1)
.with_exec(move |ctx: BastionContext| async move {
children.with_name("input").with_redundancy(1).with_exec(
move |ctx: BastionContext| async move {
println!("[Input] Worker started!");

let data = vec!["A B C", "A C C", "B C C"];
Expand All @@ -61,11 +60,13 @@ fn input_group(children: Children) -> Children {
}

Ok(())
})
},
)
}

fn process_group(children: Children) -> Children {
children
.with_name("process")
.with_redundancy(3)
.with_dispatcher(
// Declare a dispatcher to use. All instantiated actors will be registered in
Expand Down Expand Up @@ -94,7 +95,7 @@ fn process_group(children: Children) -> Children {
*value += 1;
}

println!("[Processing] Worker #{:?} processed data. Result: `{:?}`", ctx.current().id(), counter);
println!("[Processing] Worker {} #{:?} processed data. Result: `{:?}`", ctx.current().name(), ctx.current().id(), counter);

// Push hashmap with data to the next actor group
let group_name = "Response".to_string();
Expand All @@ -113,6 +114,7 @@ fn process_group(children: Children) -> Children {

fn response_group(children: Children) -> Children {
children
.with_name("response")
.with_redundancy(1)
.with_dispatcher(
// We will re-use the dispatcher to make the example easier to understand
Expand All @@ -137,7 +139,7 @@ fn response_group(children: Children) -> Children {
let message = Arc::try_unwrap(raw_message).unwrap();
msg! { message,
ref data: HashMap<&str, u32> => {
println!("[Response] Worker received `{:?}`", data);
println!("[Response] Worker {} received `{:?}`", ctx.current().name(), data);

for (key, value) in data.iter() {
let current_value = counter.entry(key).or_insert(0);
Expand Down
20 changes: 18 additions & 2 deletions src/bastion/src/child_ref.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,23 @@ use std::sync::Arc;
pub struct ChildRef {
id: BastionId,
sender: Sender,
name: String,
path: Arc<BastionPath>,
}

impl ChildRef {
pub(crate) fn new(id: BastionId, sender: Sender, path: Arc<BastionPath>) -> ChildRef {
ChildRef { id, sender, path }
pub(crate) fn new(
id: BastionId,
sender: Sender,
name: String,
path: Arc<BastionPath>,
) -> ChildRef {
ChildRef {
id,
sender,
name,
path,
}
}

/// Returns the identifier of the children group element this
Expand Down Expand Up @@ -284,6 +295,11 @@ impl ChildRef {
pub fn path(&self) -> &Arc<BastionPath> {
&self.path
}

/// Return the [`name`] of the child
pub fn name(&self) -> &str {
&self.name
}
}

impl PartialEq for ChildRef {
Expand Down
28 changes: 24 additions & 4 deletions src/bastion/src/children.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ pub struct Children {
started: bool,
// List of dispatchers attached to each actor in the group.
dispatchers: Vec<Arc<Box<Dispatcher>>>,
// The name of children
name: Option<String>,
}

impl Children {
Expand All @@ -101,6 +103,7 @@ impl Children {
let pre_start_msgs = Vec::new();
let started = false;
let dispatchers = Vec::new();
let name = None;

Children {
bcast,
Expand All @@ -111,6 +114,7 @@ impl Children {
pre_start_msgs,
started,
dispatchers,
name,
}
}

Expand Down Expand Up @@ -156,6 +160,14 @@ impl Children {
&self.callbacks
}

pub(crate) fn name(&self) -> String {
if let Some(name) = &self.name {
name.clone()
} else {
"__Anonymous__".into()
}
}

pub(crate) fn as_ref(&self) -> ChildrenRef {
trace!(
"Children({}): Creating new ChildrenRef({}).",
Expand All @@ -171,7 +183,7 @@ impl Children {
for (id, (sender, _)) in &self.launched {
trace!("Children({}): Creating new ChildRef({}).", self.id(), id);
// TODO: clone or ref?
let child = ChildRef::new(id.clone(), sender.clone(), path.clone());
let child = ChildRef::new(id.clone(), sender.clone(), self.name(), path.clone());
children.push(child);
}

Expand All @@ -184,6 +196,12 @@ impl Children {
ChildrenRef::new(id, sender, path, children, dispatchers)
}

/// Sets the name of this children group.
pub fn with_name(mut self, name: impl Into<String>) -> Self {
self.name = Some(name.into());
self
}

/// Sets the closure taking a [`BastionContext`] and returning a
/// [`Future`] that will be used by every element of this children
/// group.
Expand Down Expand Up @@ -237,7 +255,7 @@ impl Children {
self
}

/// Sets the number of number of elements this children group will
/// Sets the number of elements this children group will
/// contain. Each element will call the closure passed in
/// [`with_exec`] and run the returned future until it stops,
/// panics or another element in the group stops or panics.
Expand Down Expand Up @@ -456,7 +474,7 @@ impl Children {
let id = bcast.id().clone();
let sender = bcast.sender().clone();
let path = bcast.path().clone();
let child_ref = ChildRef::new(id.clone(), sender.clone(), path);
let child_ref = ChildRef::new(id.clone(), sender.clone(), self.name(), path);

let children = self.as_ref();
let supervisor = self.bcast.parent().clone().into_supervisor();
Expand Down Expand Up @@ -670,6 +688,8 @@ impl Children {

pub(crate) fn launch_elems(&mut self) {
debug!("Children({}): Launching elements.", self.id());

let name = self.name();
for _ in 0..self.redundancy {
let parent = Parent::children(self.as_ref());
let bcast = Broadcast::new(parent, BastionPathElement::Child(BastionId::new()));
Expand All @@ -678,7 +698,7 @@ impl Children {
let id = bcast.id().clone();
let sender = bcast.sender().clone();
let path = bcast.path().clone();
let child_ref = ChildRef::new(id.clone(), sender.clone(), path);
let child_ref = ChildRef::new(id.clone(), sender.clone(), name.clone(), path);

let children = self.as_ref();
let supervisor = self.bcast.parent().clone().into_supervisor();
Expand Down
21 changes: 14 additions & 7 deletions src/bastion/src/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,8 @@ mod tests {
let bastion_id = BastionId::new();
let (sender, _) = mpsc::unbounded();
let path = Arc::new(BastionPath::root());
let child_ref = ChildRef::new(bastion_id, sender, path);
let name = "test_name".to_string();
let child_ref = ChildRef::new(bastion_id, sender, name, path);

assert_eq!(instance.actors.contains_key(&child_ref), false);

Expand All @@ -457,7 +458,8 @@ mod tests {
let bastion_id = BastionId::new();
let (sender, _) = mpsc::unbounded();
let path = Arc::new(BastionPath::root());
let child_ref = ChildRef::new(bastion_id, sender, path);
let name = "test_name".to_string();
let child_ref = ChildRef::new(bastion_id, sender, name, path);

instance.register(&child_ref, "my::test::module".to_string());
assert_eq!(instance.actors.contains_key(&child_ref), true);
Expand All @@ -473,7 +475,8 @@ mod tests {
let bastion_id = BastionId::new();
let (sender, _) = mpsc::unbounded();
let path = Arc::new(BastionPath::root());
let child_ref = ChildRef::new(bastion_id, sender, path);
let name = "test_name".to_string();
let child_ref = ChildRef::new(bastion_id, sender, name, path);

instance.notify(&child_ref, NotificationType::Register);
let handler_was_called = handler.was_called();
Expand Down Expand Up @@ -540,7 +543,8 @@ mod tests {
let bastion_id = BastionId::new();
let (sender, _) = mpsc::unbounded();
let path = Arc::new(BastionPath::root());
let child_ref = ChildRef::new(bastion_id, sender, path);
let name = "test_name".to_string();
let child_ref = ChildRef::new(bastion_id, sender, name, path);

let dispatcher_type = DispatcherType::Named("test".to_string());
let local_dispatcher = Arc::new(Box::new(Dispatcher::with_type(dispatcher_type.clone())));
Expand All @@ -561,7 +565,8 @@ mod tests {
let bastion_id = BastionId::new();
let (sender, _) = mpsc::unbounded();
let path = Arc::new(BastionPath::root());
let child_ref = ChildRef::new(bastion_id, sender, path);
let name = "test_name".to_string();
let child_ref = ChildRef::new(bastion_id, sender, name, path);

let dispatcher_type = DispatcherType::Named("test".to_string());
let local_dispatcher = Arc::new(Box::new(Dispatcher::with_type(dispatcher_type.clone())));
Expand All @@ -583,7 +588,8 @@ mod tests {
let bastion_id = BastionId::new();
let (sender, _) = mpsc::unbounded();
let path = Arc::new(BastionPath::root());
let child_ref = ChildRef::new(bastion_id, sender, path);
let name = "test_name".to_string();
let child_ref = ChildRef::new(bastion_id, sender, name, path);

let dispatcher_type = DispatcherType::Named("test".to_string());
let handler = Box::new(CustomHandler::new(false));
Expand All @@ -607,7 +613,8 @@ mod tests {
let bastion_id = BastionId::new();
let (sender, _) = mpsc::unbounded();
let path = Arc::new(BastionPath::root());
let child_ref = ChildRef::new(bastion_id, sender, path);
let name = "test_name".to_string();
let child_ref = ChildRef::new(bastion_id, sender, name, path);

let dispatcher_type = DispatcherType::Named("test".to_string());
let handler = Box::new(CustomHandler::new(false));
Expand Down