From 8b2eac7a6d589cfe964a1c983bde17b890bb983f Mon Sep 17 00:00:00 2001 From: Andrew Hobden Date: Fri, 3 Nov 2017 16:09:15 +0100 Subject: [PATCH] Add first draft of futures chapter. --- presentation/chapters/en-US/futures.chapter | 91 +++++++++++++++++++ .../chapters/shared/code/futures/1.rs | 18 ++++ .../chapters/shared/code/futures/2.rs | 20 ++++ .../chapters/shared/code/futures/3.rs | 45 +++++++++ .../chapters/shared/code/futures/4.rs | 59 ++++++++++++ .../chapters/shared/code/futures/5.rs | 22 +++++ presentation/toc/english.html | 1 + presentation/toc/english.md | 1 + 8 files changed, 257 insertions(+) create mode 100644 presentation/chapters/en-US/futures.chapter create mode 100644 presentation/chapters/shared/code/futures/1.rs create mode 100644 presentation/chapters/shared/code/futures/2.rs create mode 100644 presentation/chapters/shared/code/futures/3.rs create mode 100644 presentation/chapters/shared/code/futures/4.rs create mode 100644 presentation/chapters/shared/code/futures/5.rs diff --git a/presentation/chapters/en-US/futures.chapter b/presentation/chapters/en-US/futures.chapter new file mode 100644 index 0000000..0ed48a0 --- /dev/null +++ b/presentation/chapters/en-US/futures.chapter @@ -0,0 +1,91 @@ +# Futures + +[Table of Contents](toc/english.html) + +--- + +The crate [`futures-rs`](https://github.com/alexcrichton/futures-rs) is commonly used to build asyncronous functionality. It provides constructs similar to `Promise`s in Javascript. + +--- + +## Notes on performance and usability + +* Rust does not have an implicit runtime event loop like Node.js. + + Tokio provides an explicit one. +* Futures are a zero-cost abstraction. +* The documentation for `futures-rs` is regarded as 'not good'. +* The async ecosystem is still very young. Be patient! + +--- + +## You've got oneshot + +`futures::sync::oneshot` provides a basic, single use future. + +They feel like a channel to use, even coming with a `tx` and `rx` pair. + +--- + +## You've got oneshot + +
+ +--- + +## You've got oneshot + +What happens if we swap the `rx.wait()` and the `tx.send()`? + +There is **no** implicit threading, calling `rx.wait()` blocks the thread until data is received! + +--- + +## You've got oneshot (threads) + +
+ +--- + +## Multiple oneshots + +
+ +--- + +## 57 channels (and nothing on) + +An `futures::sync::mpsc` represents a channel that will yield a series of futures. + +`mpsc::channel` has a bounded buffer size, and is concerned with back pressure. + +`mpsc::unbounded` has no bounded size, and can grow to fit all of memory. + +--- + +## 57 channels (and nothing on) + +
+ +--- + +## Toes in the CPU pool + +`futures-rs` comes with [`futures_cpupool`](https://docs.rs/futures-cpupool/0.1.7/futures_cpupool/) which provides a simple, easy to use CPU Pool type. + +This allows for us to dispatch arbitrary (heterogeneous!) jobs to a pool without worrying about where (and when) it gets executed. + +--- + +## Toes in the CPU pool + +
+ +--- + +## Interacting with futures + +Most of the times you will not be creating raw futures and sending them around. + +Instead, you'll likely end up interacting with them as part of a crate. + +Worry more about how to handle them, and work with them, than how to create and execute them. diff --git a/presentation/chapters/shared/code/futures/1.rs b/presentation/chapters/shared/code/futures/1.rs new file mode 100644 index 0000000..bf89dcd --- /dev/null +++ b/presentation/chapters/shared/code/futures/1.rs @@ -0,0 +1,18 @@ +fn main() { + // This is a simple future, sort of like a one-time channel. + // You get a (sender, receiver) when you invoke them. + // Sending a value consumes that side of the channel. + let (tx, rx) = oneshot::channel(); + + // This consumes the sender, we can't use it afterwards. + tx.send("Bears").unwrap(); + + // Now we can wait for it to finish + let result = rx.wait() + .unwrap(); + println!("{}", result); +} + +extern crate futures; +use futures::Future; +use futures::sync::oneshot; \ No newline at end of file diff --git a/presentation/chapters/shared/code/futures/2.rs b/presentation/chapters/shared/code/futures/2.rs new file mode 100644 index 0000000..6d861cd --- /dev/null +++ b/presentation/chapters/shared/code/futures/2.rs @@ -0,0 +1,20 @@ +fn main() { + let (tx, rx) = oneshot::channel(); + + thread::spawn(move || { + thread::sleep(Duration::from_millis(500)); + tx.send("Bears").unwrap(); + }); + + let result = rx.wait() + .unwrap(); + println!("{}", result); +} + +extern crate futures; +extern crate rand; + +use std::thread; +use std::time::Duration; +use futures::Future; +use futures::sync::oneshot; \ No newline at end of file diff --git a/presentation/chapters/shared/code/futures/3.rs b/presentation/chapters/shared/code/futures/3.rs new file mode 100644 index 0000000..4252a7b --- /dev/null +++ b/presentation/chapters/shared/code/futures/3.rs @@ -0,0 +1,45 @@ +const NUM_OF_TASKS: usize = 10; + +fn main() { + let mut rx_set = Vec::new(); + + for index in 0..NUM_OF_TASKS { + let (tx, rx) = futures::oneshot(); + rx_set.push(rx); + + thread::spawn(move || { + println!("{} --> START", index); + sleep_a_little_bit(); + tx.send(index).unwrap(); + println!("{} <-- END", index); + }); + } + + // `join_all` lets us join together the set of futures. + let result = join_all(rx_set) + .wait() + .unwrap(); + + println!("{:?}", result); +} + +extern crate rand; +extern crate futures; + +use std::thread; +use futures::Future; +use futures::future::join_all; +use std::time::Duration; +use rand::distributions::{Range, IndependentSample}; + +// This function sleeps for a bit, then returns how long it slept. +pub fn sleep_a_little_bit() -> u64 { + let mut generator = rand::thread_rng(); + let possibilities = Range::new(0, 1000); + + let choice = possibilities.ind_sample(&mut generator); + + let a_little_bit = Duration::from_millis(choice); + thread::sleep(a_little_bit); + choice +} \ No newline at end of file diff --git a/presentation/chapters/shared/code/futures/4.rs b/presentation/chapters/shared/code/futures/4.rs new file mode 100644 index 0000000..1494d88 --- /dev/null +++ b/presentation/chapters/shared/code/futures/4.rs @@ -0,0 +1,59 @@ +const BUFFER_SIZE: usize = 57; + +fn main() { + // We're using a bounded channel here with a limited size. + let (mut tx, rx) = mpsc::channel(BUFFER_SIZE); + + thread::spawn(move || { + for index in 0..(BUFFER_SIZE+2) { + sleep_a_little_bit(); + // When we `send()` a value it consumes the sender. Returning + // a 'new' sender which we have to handle. In this case we just + // re-assign. + match tx.send(index).wait() { + // Why do we need to do this? This is how back pressure is implemented. + // When the buffer is full `wait()` will block. + Ok(new_tx) => tx = new_tx, + Err(_) => panic!("Oh no!"), + } + } + // Here the stream (`tx`) is dropped, completing it. + }); + + // We can `.fold()` like we would an iterator. In fact we can do many + // things like we would an iterator. + let sum = rx.fold(0, |acc, val| { + // Notice when we run that this is happening after each item of + // the stream resolves, like an iterator. + println!("--- FOLDING {} INTO {}", val, acc); + // `ok()` is a simple way to say "Yes this worked." + // `err()` also exists. + ok(acc + val) + }) + .wait() + .unwrap(); + println!("Sum: {}", sum); +} + +extern crate rand; +extern crate futures; + +use std::time::Duration; +use std::thread; +use rand::distributions::{Range, IndependentSample}; +use futures::future::{Future, ok}; +use futures::stream::Stream; +use futures::sync::mpsc; +use futures::Sink; + +// This function sleeps for a bit, then returns how long it slept. +pub fn sleep_a_little_bit() -> u64 { + let mut generator = rand::thread_rng(); + let possibilities = Range::new(0, 100); + + let choice = possibilities.ind_sample(&mut generator); + + let a_little_bit = Duration::from_millis(choice); + thread::sleep(a_little_bit); + choice +} \ No newline at end of file diff --git a/presentation/chapters/shared/code/futures/5.rs b/presentation/chapters/shared/code/futures/5.rs new file mode 100644 index 0000000..a878e9f --- /dev/null +++ b/presentation/chapters/shared/code/futures/5.rs @@ -0,0 +1,22 @@ +fn main() { + // Creates a CpuPool with workers equal to the cores on the machine. + let pool = Builder::new().create(); + + let returns_string = pool.spawn_fn(move || -> Result<_, ()> { + Ok("First") + }); + let returns_integer = pool.spawn_fn(move || -> Result<_, ()> { + Ok(2) + }); + + let resulting_string = returns_string.wait().unwrap(); + let resulting_integer = returns_integer.wait().unwrap(); + + println!("{}, {}", resulting_string, resulting_integer); + // Returns `First, 2` +} + +extern crate futures; +extern crate futures_cpupool; +use futures::future::Future; +use futures_cpupool::Builder; \ No newline at end of file diff --git a/presentation/toc/english.html b/presentation/toc/english.html index b585bde..62fd58c 100644 --- a/presentation/toc/english.html +++ b/presentation/toc/english.html @@ -86,6 +86,7 @@

Unfinished

  • Serde
  • Macros
  • WebAssembly
  • +
  • Futures
  • Inner Mutability
  • diff --git a/presentation/toc/english.md b/presentation/toc/english.md index 8a949f3..961dad5 100644 --- a/presentation/toc/english.md +++ b/presentation/toc/english.md @@ -77,4 +77,5 @@ * [Serde](../index.html?chapter=serde&locale=en-US) * [Macros](../index.html?chapter=macros) * [WebAssembly](../index.html?chapter=wasm&locale=en-US) +* [Futures](../index.html?chapter=futures&locale=en-US) * [Inner Mutability](../index.html?chapter=inner-mutability&locale=en-US)