Skip to content

Commit

Permalink
Merge pull request #102 from ferrous-systems/async-training
Browse files Browse the repository at this point in the history
Async training material
  • Loading branch information
Urhengulas authored May 28, 2024
2 parents b6be90a + 23de07a commit 6a2f782
Show file tree
Hide file tree
Showing 28 changed files with 1,731 additions and 0 deletions.
3 changes: 3 additions & 0 deletions exercise-book/book.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ title = "Rust Exercises"
[preprocessor.mermaid]
command = "mdbook-mermaid"

[rust]
edition = "2021"

[output.html.playground]
copyable = true # include the copy button for copying code snippets
editable = true # allow the code editor to work
Expand Down
14 changes: 14 additions & 0 deletions exercise-book/src/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -120,4 +120,18 @@
- [Interactive TCP Echo Server](./tcp-server.md)
- [Share data between connections](./tcp-server-log.md)

# Async chat

- [Implementing a chat](./async-chat/index.md)
- [Specification and Getting started](./async-chat/specification.md)
- [Writing an Accept Loop](./async-chat/accept_loop.md)
- [Receiving Messages](./async-chat/receiving_messages.md)
- [Sending Messages](./async-chat/sending_messages.md)
- [A broker as a connection point](./async-chat/connecting_readers_and_writers.md)
- [Glueing all together](./async-chat/all_together.md)
- [Clean Shutdown](./async-chat/clean_shutdown.md)
- [Handling Disconnection](./async-chat/handling_disconnection.md)
- [Final Server Code](./async-chat/final_server_code.md)
- [Implementing a Client](./async-chat/implementing_a_client.md)

# Other Topics
72 changes: 72 additions & 0 deletions exercise-book/src/async-chat/accept_loop.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
## Writing an Accept Loop

Let's implement the scaffold of the server: a loop that binds a TCP socket to an address and starts accepting connections.

First of all, let's add required import boilerplate:

```rust,ignore
# extern crate tokio;
use std::future::Future; // 1
use tokio::{
io::{AsyncBufReadExt, AsyncWriteExt, BufReader}, // 1
net::{tcp::OwnedWriteHalf, TcpListener, TcpStream, ToSocketAddrs}, // 3
sync::{mpsc, oneshot},
task, // 2
};
type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>; // 4
```

1. Import some traits required to work with futures and streams.
2. The `task` module roughly corresponds to the `std::thread` module, but tasks are much lighter weight.
A single thread can run many tasks.
3. For the socket type, we use `TcpListener` from `tokio`, which is similar to the sync `std::net::TcpListener`, but is non-blocking and uses `async` API.
4. We will skip implementing detailled error handling in this example.
To propagate the errors, we will use a boxed error trait object.
Do you know that there's `From<&'_ str> for Box<dyn Error>` implementation in stdlib, which allows you to use strings with `?` operator?

Now we can write the server's accept loop:

```rust,ignore
# extern crate tokio;
# use tokio::net::{TcpListener, ToSocketAddrs};
# type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
#
async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> { // 1
let listener = TcpListener::bind(addr).await?; // 2
loop { // 3
let (stream, _) = listener.accept().await?;
// TODO
}
Ok(())
}
```

1. We mark the `accept_loop` function as `async`, which allows us to use `.await` syntax inside.
2. `TcpListener::bind` call returns a future, which we `.await` to extract the `Result`, and then `?` to get a `TcpListener`.
Note how `.await` and `?` work nicely together.
This is exactly how `std::net::TcpListener` works, but with `.await` added.
3. We generally use `loop` and `break` for looping in Futures, that makes things easier down the line.

Finally, let's add main:

```rust,ignore
# extern crate tokio;
# use tokio::net::{ToSocketAddrs};
# type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
# async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> {
# Ok(())
# }
#
#[tokio::main]
pub(crate) async fn main() -> Result<()> {
accept_loop("127.0.0.1:8080").await
}
```

The crucial thing to realise that is in Rust, unlike other languages, calling an async function does **not** run any code.
Async functions only construct futures, which are inert state machines.
To start stepping through the future state-machine in an async function, you should use `.await`.
In a non-async function, a way to execute a future is to hand it to the executor.
165 changes: 165 additions & 0 deletions exercise-book/src/async-chat/all_together.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
## Gluing all together

At this point, we only need to start the broker to get a fully-functioning (in the happy case!) chat.

Scroll past the example find a list of all changes.

```rust,ignore
# extern crate tokio;
use std::{
collections::hash_map::{Entry, HashMap},
future::Future,
};
use tokio::{
io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
net::{tcp::OwnedWriteHalf, TcpListener, TcpStream, ToSocketAddrs},
sync::mpsc,
task,
};
type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
type Sender<T> = mpsc::UnboundedSender<T>;
type Receiver<T> = mpsc::UnboundedReceiver<T>;
#[tokio::main]
pub(crate) async fn main() -> Result<()> {
accept_loop("127.0.0.1:8080").await
}
async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> {
let listener = TcpListener::bind(addr).await?;
let (broker_sender, broker_receiver) = mpsc::unbounded_channel(); // 1
let _broker = task::spawn(broker_loop(broker_receiver));
while let Ok((stream, _socket_addr)) = listener.accept().await {
println!("Accepting from: {}", stream.peer_addr()?);
spawn_and_log_error(connection_loop(broker_sender.clone(), stream));
}
Ok(())
}
async fn connection_loop(broker: Sender<Event>, stream: TcpStream) -> Result<()> { // 2
let (reader, writer) = stream.into_split(); // 3
let reader = BufReader::new(reader);
let mut lines = reader.lines();
let name = match lines.next_line().await {
Ok(Some(line)) => line,
Ok(None) => return Err("peer disconnected immediately".into()),
Err(e) => return Err(Box::new(e)),
};
println!("user {} connected", name);
broker
.send(Event::NewPeer {
name: name.clone(),
stream: writer,
})
.unwrap(); // 5
loop {
if let Some(line) = lines.next_line().await? {
let (dest, msg) = match line.find(':') {
None => continue,
Some(idx) => (&line[..idx], line[idx + 1..].trim()),
};
let dest: Vec<String> = dest
.split(',')
.map(|name| name.trim().to_string())
.collect();
let msg: String = msg.trim().to_string();
broker
.send(Event::Message { // 4
from: name.clone(),
to: dest,
msg,
})
.unwrap();
} else {
break;
}
}
Ok(())
}
async fn connection_writer_loop(
messages: &mut Receiver<String>,
stream: &mut OwnedWriteHalf // 3
) -> Result<()> {
loop {
let msg = messages.recv().await;
match msg {
Some(msg) => stream.write_all(msg.as_bytes()).await?,
None => break,
}
}
Ok(())
}
#[derive(Debug)]
enum Event {
NewPeer {
name: String,
stream: OwnedWriteHalf,
},
Message {
from: String,
to: Vec<String>,
msg: String,
},
}
async fn broker_loop(mut events: Receiver<Event>) {
let mut peers: HashMap<String, Sender<String>> = HashMap::new();
loop {
let event = match events.recv().await {
Some(event) => event,
None => break,
};
match event {
Event::Message { from, to, msg } => {
for addr in to {
if let Some(peer) = peers.get_mut(&addr) {
let msg = format!("from {from}: {msg}\n");
peer.send(msg).unwrap();
}
}
}
Event::NewPeer { name, mut stream } => match peers.entry(name.clone()) {
Entry::Occupied(..) => (),
Entry::Vacant(entry) => {
let (client_sender, mut client_receiver) = mpsc::unbounded_channel();
entry.insert(client_sender);
spawn_and_log_error(async move {
connection_writer_loop(&mut client_receiver, &mut stream).await
});
}
},
}
}
}
fn spawn_and_log_error<F>(fut: F) -> task::JoinHandle<()>
where
F: Future<Output = Result<()>> + Send + 'static,
{
task::spawn(async move {
if let Err(e) = fut.await {
eprintln!("{}", e)
}
})
}
```

1. Inside the `accept_loop`, we create the broker's channel and `task`.
2. We need the connection_loop to accept a handle to the broker.
3. Inside `connection_loop`, we need to split the `TcpStream`, to be able to share it with the `connection_writer_loop`.
4. On login, we notify the broker.
Note that we `.unwrap` on send: broker should outlive all the clients and if that's not the case the broker probably panicked, so we can escalate the panic as well.
5. Similarly, we forward parsed messages to the broker, assuming that it is alive.
Loading

0 comments on commit 6a2f782

Please sign in to comment.