Skip to content

Commit

Permalink
Implemented Message trait
Browse files Browse the repository at this point in the history
  • Loading branch information
Relrin committed Jul 4, 2021
1 parent 9db03d1 commit fb29cfb
Show file tree
Hide file tree
Showing 7 changed files with 71 additions and 112 deletions.
6 changes: 3 additions & 3 deletions src/bastion/examples/ping/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@ impl Actor for Ping {

while (pong_struct.counter != 3) {
let message = ctx.recv().await?;
message.handle(ctx).await;

// Do something with the message ...

message.ack().await;
self.send(message.path(), "pong", MessageType::Tell).await?;
pong_struct.counter += 1;

message.ack().await;
}

System::stop();
Expand Down
21 changes: 11 additions & 10 deletions src/bastion/src/actor/context.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use std::sync::Arc;

use async_channel::unbounded;
use async_channel::{unbounded, Sender};

use crate::actor::local_state::LocalState;
use crate::actor::state::ActorState;
use crate::mailbox::traits::TypedMessage;
use crate::mailbox::envelope::Envelope;
use crate::mailbox::message::Message;
use crate::mailbox::Mailbox;
use crate::routing::path::ActorPath;

Expand All @@ -19,28 +20,28 @@ pub struct Context {
/// Path to the actor in the system
path: Arc<ActorPath>,
/// Mailbox of the actor
//mailbox: Mailbox<TypedMessage>,
mailbox: Mailbox<Box<dyn Message>>,
/// Local storage for actor's data
local_state: LocalState,
/// Current execution state of the actor
internal_state: ActorState,
}

impl Context {
// FIXME: Pass the correct system_rx instead of the fake one
pub(crate) fn new(path: ActorPath) -> Self {
//let (_system_tx, system_rx) = unbounded();
// let mailbox = Mailbox::new(system_rx);
pub(crate) fn new(path: ActorPath) -> (Self, Sender<Envelope<impl Message>>) {
let (system_tx, system_rx) = unbounded();

let path = Arc::new(path);
let mailbox = Mailbox::new(system_rx);
let local_state = LocalState::new();
let internal_state = ActorState::new();

Context {
let instance = Context {
path,
//mailbox,
mailbox,
local_state,
internal_state,
}
};
(instance, system_tx)
}
}
24 changes: 16 additions & 8 deletions src/bastion/src/mailbox/envelope.rs
Original file line number Diff line number Diff line change
@@ -1,32 +1,34 @@
use std::cell::RefCell;
use std::fmt::{self, Debug, Formatter};

use crate::actor::actor_ref::ActorRef;
use crate::mailbox::message::MessageType;
use crate::mailbox::traits::TypedMessage;
use crate::mailbox::message::{Message, MessageType};

/// Struct that represents an incoming message in the actor's mailbox.
#[derive(Clone)]
pub struct Envelope<T>
where
T: TypedMessage,
T: Message,
{
/// The sending side of a channel. In actor's world
/// represented is a message sender. Can be used
/// for acking message when it possible.
sender: Option<ActorRef>,
/// An actual data sent by the channel
message: T,
message: RefCell<Option<T>>,
/// Message type that helps to figure out how to deliver message
/// and how to ack it after the processing.
message_type: MessageType,
}

impl<T> Envelope<T>
where
T: TypedMessage,
T: Message,
{
/// Create a message with the given sender and inner data.
pub fn new(sender: Option<ActorRef>, message: T, message_type: MessageType) -> Self {
pub fn new(sender: Option<ActorRef>, data: T, message_type: MessageType) -> Self {
let message = RefCell::new(Some(data));

Envelope {
sender,
message,
Expand All @@ -40,8 +42,14 @@ where
self.message_type.clone()
}

/// Extracts the message data and returns it to the caller. Each further
/// method call will return `None`.
pub fn read(&self) -> Option<T> {
self.message.take()
}

/// Sends a confirmation to the message sender.
pub(crate) async fn ack(&self) {
pub async fn ack(&self) {
match self.message_type {
MessageType::Ack => unimplemented!(),
MessageType::Broadcast => unimplemented!(),
Expand All @@ -52,7 +60,7 @@ where

impl<T> Debug for Envelope<T>
where
T: TypedMessage,
T: Message,
{
fn fmt(&self, fmt: &mut Formatter) -> fmt::Result {
fmt.debug_struct("Message")
Expand Down
12 changes: 12 additions & 0 deletions src/bastion/src/mailbox/message.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,15 @@
use std::fmt::Debug;

/// A trait that message needs to implement for typed actors (it
/// forces message to implement the following traits: [`Any`],
/// [`Send`] and [`Debug`]).
///
/// [`Any`]: https://doc.rust-lang.org/std/any/trait.Any.html
/// [`Send`]: https://doc.rust-lang.org/std/marker/trait.Send.html
/// [`Debug`]: https://doc.rust-lang.org/std/fmt/trait.Debug.html
pub trait Message: Send + Debug + 'static {}
impl<T> Message for T where T: ?Sized + Send + Debug + 'static {}

/// Enum that provides information what type of the message
/// being sent through the channel.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
Expand Down
103 changes: 29 additions & 74 deletions src/bastion/src/mailbox/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
mod envelope;
mod state;
pub(crate) mod envelope;
pub(crate) mod state;

pub mod message;
pub mod traits;

use std::sync::atomic::AtomicBool;
use std::sync::Arc;
Expand All @@ -11,14 +10,14 @@ use async_channel::{unbounded, Receiver, Sender};

use crate::error::{BastionError, Result};
use crate::mailbox::envelope::Envelope;
use crate::mailbox::message::Message;
use crate::mailbox::state::MailboxState;
use crate::mailbox::traits::TypedMessage;

/// Struct that represents a message sender.
#[derive(Clone)]
pub struct MailboxTx<T>
where
T: TypedMessage,
T: Message,
{
/// Indicated the transmitter part of the actor's channel
/// which is using for passing messages.
Expand All @@ -30,7 +29,7 @@ where

impl<T> MailboxTx<T>
where
T: TypedMessage,
T: Message,
{
/// Return a new instance of MailboxTx that indicates sender.
pub(crate) fn new(tx: Sender<Envelope<T>>) -> Self {
Expand All @@ -57,110 +56,66 @@ where
#[derive(Clone)]
pub struct Mailbox<T>
where
T: TypedMessage,
T: Message,
{
/// User guardian sender
user_tx: MailboxTx<T>,
/// User guardian receiver
user_rx: Receiver<Envelope<T>>,
/// Actor guardian sender
actor_tx: MailboxTx<T>,
/// Actor guardian receiver
actor_rx: Receiver<Envelope<T>>,
/// System guardian receiver
system_rx: Receiver<Envelope<T>>,
/// The current processing message, received from the
/// latest call to the user's queue
last_user_message: Option<Envelope<T>>,
/// The current processing message, received from the
/// latest call to the system's queue
last_system_message: Option<Envelope<T>>,
/// Mailbox state machine
state: Arc<MailboxState>,
}

// TODO: Add calls with recv with timeout
impl<T> Mailbox<T>
where
T: TypedMessage,
T: Message,
{
/// Creates a new mailbox for the actor.
pub(crate) fn new(system_rx: Receiver<Envelope<T>>) -> Self {
let (tx, user_rx) = unbounded();
let user_tx = MailboxTx::new(tx);
let last_user_message = None;
let last_system_message = None;
let (tx, actor_rx) = unbounded();
let actor_tx = MailboxTx::new(tx);
let state = Arc::new(MailboxState::new());

Mailbox {
user_tx,
user_rx,
actor_tx,
actor_rx,
system_rx,
last_user_message,
last_system_message,
state,
}
}

/// Forced receive message from user queue
/// Forced receive message from the actor's queue.
pub async fn recv(&mut self) -> Envelope<T> {
let message = self
.user_rx
self.actor_rx
.recv()
.await
.map_err(|e| BastionError::ChanRecv(e.to_string()))
.unwrap();

self.last_user_message = Some(message);
self.last_user_message.clone().unwrap()
.unwrap()
}

/// Try receiving message from user queue
/// Try receiving message from the actor's queue.
pub async fn try_recv(&mut self) -> Result<Envelope<T>> {
if self.last_user_message.is_some() {
return Err(BastionError::UnackedMessage);
}

match self.user_rx.try_recv() {
Ok(message) => {
self.last_user_message = Some(message);
Ok(self.last_user_message.clone().unwrap())
}
Err(e) => Err(BastionError::ChanRecv(e.to_string())),
}
self.actor_rx
.try_recv()
.map_err(|e| BastionError::ChanRecv(e.to_string()))
}

/// Forced receive message from system queue
/// Forced receive message from the internal system queue.
pub async fn sys_recv(&mut self) -> Envelope<T> {
let message = self
.system_rx
self.system_rx
.recv()
.await
.map_err(|e| BastionError::ChanRecv(e.to_string()))
.unwrap();

self.last_system_message = Some(message);
self.last_system_message.clone().unwrap()
.unwrap()
}

/// Try receiving message from system queue
/// Try receiving message from the internal system queue.
pub async fn try_sys_recv(&mut self) -> Result<Envelope<T>> {
if self.last_system_message.is_some() {
return Err(BastionError::UnackedMessage);
}

match self.system_rx.try_recv() {
Ok(message) => {
self.last_system_message = Some(message);
Ok(self.last_system_message.clone().unwrap())
}
Err(e) => Err(BastionError::ChanRecv(e.to_string())),
}
}

/// Returns the last retrieved message from the user channel
pub async fn get_last_user_message(&self) -> Option<Envelope<T>> {
self.last_user_message.clone()
}

/// Returns the last retrieved message from the system channel
pub async fn get_last_system_message(&self) -> Option<Envelope<T>> {
self.last_system_message.clone()
self.system_rx
.try_recv()
.map_err(|e| BastionError::ChanRecv(e.to_string()))
}
}
13 changes: 0 additions & 13 deletions src/bastion/src/mailbox/traits.rs

This file was deleted.

4 changes: 0 additions & 4 deletions src/bastion/src/system/global_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,11 @@
/// transaction memory (or shortly STM) mechanisms to eliminate any
/// potential data races and provide consistency across actors.
use std::any::{Any, TypeId};
use std::ops::Deref;
use std::sync::Arc;

use lever::sync::atomics::AtomicBox;
use lever::table::lotable::LOTable;

use crate::error::{BastionError, Result};
use std::borrow::Borrow;

#[derive(Debug)]
pub struct GlobalState {
table: LOTable<TypeId, Arc<AtomicBox<Box<dyn Any + Send + Sync>>>>,
Expand Down

0 comments on commit fb29cfb

Please sign in to comment.