Skip to content
This repository has been archived by the owner on Mar 23, 2020. It is now read-only.

Add first draft of futures chapter. #105

Merged
merged 1 commit into from
May 13, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 91 additions & 0 deletions presentation/chapters/en-US/futures.chapter
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
# Futures

[Table of Contents](toc/english.html)

---

The crate [`futures-rs`](https://github.com/alexcrichton/futures-rs) is commonly used to build asyncronous functionality. It provides constructs similar to `Promise`s in Javascript.

---

## Notes on performance and usability

* Rust does not have an implicit runtime event loop like Node.js.
+ Tokio provides an explicit one.
* Futures are a zero-cost abstraction.
* The documentation for `futures-rs` is regarded as 'not good'.
* The async ecosystem is still very young. Be patient!

---

## You've got oneshot

`futures::sync::oneshot` provides a basic, single use future.

They feel like a channel to use, even coming with a `tx` and `rx` pair.

---

## You've got oneshot

<pre><code data-source="chapters/shared/code/futures/1.rs" data-trim="hljs rust" class="lang-rust"></code></pre>

---

## You've got oneshot

What happens if we swap the `rx.wait()` and the `tx.send()`?

There is **no** implicit threading, calling `rx.wait()` blocks the thread until data is received!

---

## You've got oneshot (threads)

<pre><code data-source="chapters/shared/code/futures/2.rs" data-trim="hljs rust" class="lang-rust"></code></pre>

---

## Multiple oneshots

<pre><code data-source="chapters/shared/code/futures/3.rs" data-trim="hljs rust" class="lang-rust"></code></pre>

---

## 57 channels (and nothing on)

An `futures::sync::mpsc` represents a channel that will yield a series of futures.

`mpsc::channel` has a bounded buffer size, and is concerned with back pressure.

`mpsc::unbounded` has no bounded size, and can grow to fit all of memory.

---

## 57 channels (and nothing on)

<pre><code data-source="chapters/shared/code/futures/4.rs" data-trim="hljs rust" class="lang-rust"></code></pre>

---

## Toes in the CPU pool

`futures-rs` comes with [`futures_cpupool`](https://docs.rs/futures-cpupool/0.1.7/futures_cpupool/) which provides a simple, easy to use CPU Pool type.

This allows for us to dispatch arbitrary (heterogeneous!) jobs to a pool without worrying about where (and when) it gets executed.

---

## Toes in the CPU pool

<pre><code data-source="chapters/shared/code/futures/5.rs" data-trim="hljs rust" class="lang-rust"></code></pre>

---

## Interacting with futures

Most of the times you will not be creating raw futures and sending them around.

Instead, you'll likely end up interacting with them as part of a crate.

Worry more about how to handle them, and work with them, than how to create and execute them.
18 changes: 18 additions & 0 deletions presentation/chapters/shared/code/futures/1.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
fn main() {
// This is a simple future, sort of like a one-time channel.
// You get a (sender, receiver) when you invoke them.
// Sending a value consumes that side of the channel.
let (tx, rx) = oneshot::channel();

// This consumes the sender, we can't use it afterwards.
tx.send("Bears").unwrap();

// Now we can wait for it to finish
let result = rx.wait()
.unwrap();
println!("{}", result);
}

extern crate futures;
use futures::Future;
use futures::sync::oneshot;
20 changes: 20 additions & 0 deletions presentation/chapters/shared/code/futures/2.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
fn main() {
let (tx, rx) = oneshot::channel();

thread::spawn(move || {
thread::sleep(Duration::from_millis(500));
tx.send("Bears").unwrap();
});

let result = rx.wait()
.unwrap();
println!("{}", result);
}

extern crate futures;
extern crate rand;

use std::thread;
use std::time::Duration;
use futures::Future;
use futures::sync::oneshot;
45 changes: 45 additions & 0 deletions presentation/chapters/shared/code/futures/3.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
const NUM_OF_TASKS: usize = 10;

fn main() {
let mut rx_set = Vec::new();

for index in 0..NUM_OF_TASKS {
let (tx, rx) = futures::oneshot();
rx_set.push(rx);

thread::spawn(move || {
println!("{} --> START", index);
sleep_a_little_bit();
tx.send(index).unwrap();
println!("{} <-- END", index);
});
}

// `join_all` lets us join together the set of futures.
let result = join_all(rx_set)
.wait()
.unwrap();

println!("{:?}", result);
}

extern crate rand;
extern crate futures;

use std::thread;
use futures::Future;
use futures::future::join_all;
use std::time::Duration;
use rand::distributions::{Range, IndependentSample};

// This function sleeps for a bit, then returns how long it slept.
pub fn sleep_a_little_bit() -> u64 {
let mut generator = rand::thread_rng();
let possibilities = Range::new(0, 1000);

let choice = possibilities.ind_sample(&mut generator);

let a_little_bit = Duration::from_millis(choice);
thread::sleep(a_little_bit);
choice
}
59 changes: 59 additions & 0 deletions presentation/chapters/shared/code/futures/4.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
const BUFFER_SIZE: usize = 57;

fn main() {
// We're using a bounded channel here with a limited size.
let (mut tx, rx) = mpsc::channel(BUFFER_SIZE);

thread::spawn(move || {
for index in 0..(BUFFER_SIZE+2) {
sleep_a_little_bit();
// When we `send()` a value it consumes the sender. Returning
// a 'new' sender which we have to handle. In this case we just
// re-assign.
match tx.send(index).wait() {
// Why do we need to do this? This is how back pressure is implemented.
// When the buffer is full `wait()` will block.
Ok(new_tx) => tx = new_tx,
Err(_) => panic!("Oh no!"),
}
}
// Here the stream (`tx`) is dropped, completing it.
});

// We can `.fold()` like we would an iterator. In fact we can do many
// things like we would an iterator.
let sum = rx.fold(0, |acc, val| {
// Notice when we run that this is happening after each item of
// the stream resolves, like an iterator.
println!("--- FOLDING {} INTO {}", val, acc);
// `ok()` is a simple way to say "Yes this worked."
// `err()` also exists.
ok(acc + val)
})
.wait()
.unwrap();
println!("Sum: {}", sum);
}

extern crate rand;
extern crate futures;

use std::time::Duration;
use std::thread;
use rand::distributions::{Range, IndependentSample};
use futures::future::{Future, ok};
use futures::stream::Stream;
use futures::sync::mpsc;
use futures::Sink;

// This function sleeps for a bit, then returns how long it slept.
pub fn sleep_a_little_bit() -> u64 {
let mut generator = rand::thread_rng();
let possibilities = Range::new(0, 100);

let choice = possibilities.ind_sample(&mut generator);

let a_little_bit = Duration::from_millis(choice);
thread::sleep(a_little_bit);
choice
}
22 changes: 22 additions & 0 deletions presentation/chapters/shared/code/futures/5.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
fn main() {
// Creates a CpuPool with workers equal to the cores on the machine.
let pool = Builder::new().create();

let returns_string = pool.spawn_fn(move || -> Result<_, ()> {
Ok("First")
});
let returns_integer = pool.spawn_fn(move || -> Result<_, ()> {
Ok(2)
});

let resulting_string = returns_string.wait().unwrap();
let resulting_integer = returns_integer.wait().unwrap();

println!("{}, {}", resulting_string, resulting_integer);
// Returns `First, 2`
}

extern crate futures;
extern crate futures_cpupool;
use futures::future::Future;
use futures_cpupool::Builder;
1 change: 1 addition & 0 deletions presentation/toc/english.html
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ <h2 id="unfinished">Unfinished</h2>
<li><a href="../index.html?chapter=serde&amp;locale=en-US">Serde</a></li>
<li><a href="../index.html?chapter=macros">Macros</a></li>
<li><a href="../index.html?chapter=wasm&amp;locale=en-US">WebAssembly</a></li>
<li><a href="../index.html?chapter=futures&amp;locale=en-US">Futures</a></li>
<li><a href="../index.html?chapter=inner-mutability&amp;locale=en-US">Inner Mutability</a></li>
</ul>
</body>
Expand Down
1 change: 1 addition & 0 deletions presentation/toc/english.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,5 @@
* [Serde](../index.html?chapter=serde&locale=en-US)
* [Macros](../index.html?chapter=macros)
* [WebAssembly](../index.html?chapter=wasm&locale=en-US)
* [Futures](../index.html?chapter=futures&locale=en-US)
* [Inner Mutability](../index.html?chapter=inner-mutability&locale=en-US)