From 77eeba3a471c92507b2d6cbddc95a8b2e4409c24 Mon Sep 17 00:00:00 2001 From: Andrei Listochkin Date: Mon, 22 Jan 2024 14:23:21 +0000 Subject: [PATCH 1/3] Deconstructing Send, Arc, and Mutex --- training-slides/src/SUMMARY.md | 3 + .../src/deconstructing-send-arc-mutex.md | 400 ++++++++++++++++++ 2 files changed, 403 insertions(+) create mode 100644 training-slides/src/deconstructing-send-arc-mutex.md diff --git a/training-slides/src/SUMMARY.md b/training-slides/src/SUMMARY.md index ee00051..b211664 100644 --- a/training-slides/src/SUMMARY.md +++ b/training-slides/src/SUMMARY.md @@ -56,6 +56,9 @@ Topics that go beyond [Applied Rust](#applied-rust). * [Using Types to encode State](./type-state.md) * [WASM](./wasm.md) +## Under development + +* [Deconstructing Send, Arc, and Mutex](./deconstructing-send-arc-mutex.md) # No-Std Rust Rust for the Linux Kernel and other no-std environments with an pre-existing C API. Requires [Applied Rust](#applied-rust). diff --git a/training-slides/src/deconstructing-send-arc-mutex.md b/training-slides/src/deconstructing-send-arc-mutex.md new file mode 100644 index 0000000..9a37e24 --- /dev/null +++ b/training-slides/src/deconstructing-send-arc-mutex.md @@ -0,0 +1,400 @@ + +# Deconstructing Send, Arc, and Mutex + +## `tread::spawn` Function + +```rust ignore +pub fn spawn(f: F) -> JoinHandle +where + F: FnOnce() -> T, + F: Send + 'static, + T: Send + 'static, +{ + // ... +} +``` + +## Quick Primer on Rust Closures + +* 3 categories of data + * data the closure *closes over* / *captures*: **Upvars** + * convenient compiler terminology + * not represented by closure type signature + * parameters + * returned value + +```rust ignore +let upper_threshold = 20; +let outliers: Vec<_> = data.iter().copied().filter(|n| -> bool { + // `n` is a parameter, `upper_threshold` is an *upvar* + n >= upper_threshold +}).collect(); +``` + +## Spawn closure type + +* `F: FnOnce() -> T` + * closure doesn't accept any parameters + * closure can *consume upvars* ("FnOnce") +* `F: Send + 'static` + * applies to *upvars* +* `T: Send + 'static` + * applies to returned value + +## `T: 'static` + +One of two cases: + +* the type doesn't have any references inside ("Owned data") + * `struct User { name: String }` +* the references inside the type are `'static` + * `struct Db { connection_string: &'static str }` + +## Why `F: 'static` and `T: 'static`? + +* applies to data passed from parent thread to child thread or vise versa +* prevents passing references to local variables + * one thread can finish before the other and such references may become invalid + * `+ 'static` lets borrow checker detect these situations at compile time + +## `T: Send` + +`pub unsafe auto trait Send { }` + +* `auto` means all types get this trait automatically + * opt-out instead of opt-in +* various types in standard library implement `Send` or `!Send` +* `unsafe` means you have to put `unsafe` keyword in front of `impl` when implementing `Send` or `!Send` + * precaution measure + +## Why would one implement `Send` or `!Send` + +* Rust pointers (`*const T`, `*mut T`, `NonNull`) are `!Send` + * Use-case: what if the pointer comes from FFI library that assumes that all functions using this pointer are called from the same thread? +* `Arc` has a `NonNull<..>` inside and becomes `!Send` automatically + * to override this behavior `Arc` explicitly implements `Send` + +## `Send` in `thread::spawn` Function + +`F: Send` and `T: Send` means that all data traveling from the parent thread to child thread has to be marked as `Send` + +* Rust compiler has to inherent knowledge of threads, but the use of marker traits and lifetime annotations let the type / borrow checker prevent data race errors + +## Sharing data between threads + +## Example: Message Log for TCP Echo Server + +```rust ignore +use std::{ + io::{self, BufRead as _, Write as _}, + net, thread, +}; + +fn handle_client(stream: net::TcpStream) -> Result<(), io::Error> { + let mut writer = io::BufWriter::new(&stream); + let reader = io::BufReader::new(&stream); + for line in reader.lines() { + let line = line?; + writeln!(writer, "{}", line)?; + writer.flush()?; + } + Ok(()) +} + +fn main() -> Result<(), io::Error> { + let listener = net::TcpListener::bind("0.0.0.0:7878")?; + + for stream in listener.incoming() { + let stream = stream?; + thread::spawn(|| { + let _ = handle_client(stream); + }); + } + Ok(()) +} +``` + +## Task + +* create a log of lengths of all lines coming from all streams +* `let mut log = Vec::::new();` +* `log.push(line.len());` + +## "Dream" API + +```rust ignore +fn handle_client(stream: net::TcpStream, log: &mut Vec) -> Result<(), io::Error> { + // ... + for line in ... { + log.push(line.len()); + // ... + } + Ok(()) +} + +fn main() -> Result<(), io::Error> { + let mut log = vec![]; + + for stream in listener.incoming() { + // ... + thread::spawn(|| { + let _ = handle_client(stream, &mut log); + }); + } + Ok(()) +} +``` + +## Errors + +
+error[E0373]: closure may outlive the current function, but it borrows `log`,
+which is owned by the current function
+  --> src/main.rs:26:23
+   |
+26 |         thread::spawn(|| {
+   |                       ^^ may outlive borrowed value `log`
+27 |             let _ = handle_client(stream, &mut log);
+   |                                                --- `log` is borrowed here
+   |
+note: function requires argument type to outlive `'static`
+
+ +## Life-time problem + +Problem: + +* local data may be cleaned up prematurely + +Solution: + +* move the decision when to clean the data from compile-time to runtime + * use reference-counting + +## Attempt 1: `Rc` + +* `let mut log = Rc::new(vec![]);` +* `let mut thread_log = log.clone()` now does't clone the data, but simply increases the reference count + * both variables now have *owned* type, and satisfy `F: 'static` requirement + +```text +error[E0277]: `Rc>` cannot be sent between threads safely +``` + +## `Rc` in Rust Standard Library + +* uses `usize` for reference counting +* explicitly marked as `!Send` + +```rust ignore +pub struct Rc { + ptr: NonNull>, +} + +impl !Send for Rc {} + +struct RcBox { + strong: Cell, + weak: Cell, + value: T, +} +``` + +## `Arc` in Rust Standard Library + +* uses `AtomicUsize` for reference counting +* explicitly marked as `Send` + +```rust ignore +pub struct Arc { + ptr: NonNull>, +} + +impl Send for Arc {} + +struct ArcInner { + strong: atomic::AtomicUsize, + weak: atomic::AtomicUsize, + data: T, +} +``` + +## `Rc` vs `Arc` + +* `Arc` uses `AtomicUsize` for reference counting + * slower + * safe to increment / decrement from multiple threads +* With the help of marker trait `Send` and trait bounds on `thread::spawn` Rust compiler *forces* you to use the correct type + +## `Arc` / `Rc` "transparency" + +```rust ignore +let mut log = Arc::new(Vec::new()); +// how does this code work? +log.len(); +// and why doesn't this work? +log.push(1); +``` + +## `Deref` and `DerefMut` traits + +```rust ignore +pub trait Deref { + type Target: ?Sized; + fn deref(&self) -> &Self::Target; +} + +pub trait DerefMut: Deref { + fn deref_mut(&mut self) -> &mut Self::Target; +} +``` + +## `Deref` coercions + +* `Deref` can convert a `&self` reference to a reference of another type + * conversion function call can be inserted by the compiler for you automatically + * in most cases the conversion is a no-op or a fixed pointer offset + * deref functions can be inlined +* `Target` is an associated type + * can't `deref()` into multiple different types +* `DerefMut: Deref` allows the later to reuse the same `Target` type + * read-only and read-write references coerce to the references of the same type + +## `Arc` / `Rc` "transparency" with `Deref` + +```rust ignore +let mut log = Arc::new(Vec::new()); +// Arc implements `Deref` from `&Arc into `&T` +log.len(); +// the same as +Vec::len( as Deref>::deref(&log)); + +// Arc DOES NOT implement `DerefMut` +// log.push(1); + +// the line above would have expanded to: +// Vec::push( as DerefMut>::deref_mut(&mut log), 1); +``` + +## `Arc` and mutability + +* lack of `impl DerefMut for Arc` prevents accidental creation of multiple `&mut` to underlying data +* the solution is to move mutability decision to runtime + +```rust ignore +let log = Arc::new(Mutex::new(Vec::new())); +``` +

 

+ +* `Arc` guarantees availability of data in memory + * prevents memory form being cleaned up prematurely +* `Mutex` guarantees exclusivity of mutable access + * provides a single `&mut` to underlying data simultaneously + +## `Mutex` in Action + +* `log` is passed as `&` and is `deref`-ed from `Arc` by the compiler +* `mut`ability is localized to a local `guard` variable + * `Mutex::lock` method takes `&self` +* `MutexGuard` implements `Deref` *and* `DerefMut`! +* `'_` lifetime annotation is needed only because guard struct has a `&Mutex` inside + +```rust ignore +fn handle_client(..., log: &Mutex>) -> ... { + for line in ... { + let mut guard: MutexGuard<'_, Vec> = log.lock().unwrap(); + guard.push(line.len()); + // line above expands to: + // Vec::push( as DerefMut>::deref_mut(&mut guard), line.len()); + writeln!(writer, "{}", line)?; + writer.flush()?; + } +} +``` + +## `Mutex` locking and unlocking + +* we `lock` the mutex for exclusive access to underlying data at runtime +* old C APIs used a pair of functions to lock and unlock the mutex +* `MutexGuard` does unlocking automatically when is dropped + * time between guard creation and drop is called *critical section* + +## Lock Poisoning + +* `MutexGuard` in its `Drop` implementation checks if it is being dropped normally or during a `panic` unwind + * in later case sets a poison flag on the mutex +* calling `lock().unwrap()` on a poisoned Mutex causes `panic` + * if the mutex is *"popular"* poisoning can cause many application threads to panic, too. +* poisoning API is *problematic* + * `PoisonError` doesn't provide information about the panic that caused the poisoning + * no way to recover and revive the mutex (stays poisoned forever) + * `PoisonError::into_inner` *can* produce a guard even for poisoned mutexes + +## Critical Section "Hygiene" + +* keep it short to reduce the window when mutex is locked +* avoid calling functions that can panic +* using a named variable for Mutex guard helps avoiding unexpected temporary lifetime behavior + +```rust ignore +fn handle_client(..., log: &Mutex>) -> ... { + for line in ... { + { + let mut guard: MutexGuard<'_, Vec> = log.lock().unwrap(); + guard.push(line.len()); + } // critical section ends here, before all the IO + writeln!(writer, "{}", line)?; + writer.flush()?; + } +} +``` +

 

+ +* `drop(guard)` also works, but extra block nicely highlights the critical section + +## Lessons Learned + +* careful use of traits and trait boundaries lets Rust compiler detect problematic multithreading code at compile time +* `Arc` and `Mutex` let the program ensure data availability and exclusive mutability at runtime where the compiler can't predict the behavior of the program +* `Deref` coercions make concurrency primitives virtually invisible and transparent to use +* **Make invalid state unrepresentable** + +## Full Example + +```rust ignore +use std::{ + io::{self, BufRead as _, Write as _}, + net, + sync::{Arc, Mutex}, + thread, +}; + +fn handle_client(stream: net::TcpStream, log: &Mutex>) -> Result<(), io::Error> { + let mut writer = io::BufWriter::new(&stream); + let reader = io::BufReader::new(&stream); + for line in reader.lines() { + let line = line?; + { + let mut guard = log.lock().unwrap(); + guard.push(line.len()); + } + writeln!(writer, "{}", line)?; + writer.flush()?; + } + Ok(()) +} + +fn main() -> Result<(), io::Error> { + let log = Arc::new(Mutex::new(vec![])); + let listener = net::TcpListener::bind("0.0.0.0:7878")?; + + for stream in listener.incoming() { + let stream = stream?; + let thread_log = log.clone(); + thread::spawn(move || { + let _ = handle_client(stream, &thread_log); + }); + } + Ok(()) +} +``` From 737c945d2b547919f72288f1033ce85ed4257eb7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=90=D0=BD=D0=B4=D1=80=D0=B5=D0=B9=20=D0=9B=D0=B8=D1=81?= =?UTF-8?q?=D1=82=D0=BE=D1=87=D0=BA=D0=B8=D0=BD=20=28Andrei=20Listochkin?= =?UTF-8?q?=29?= Date: Mon, 5 Feb 2024 14:49:38 +0000 Subject: [PATCH 2/3] (pr-feedback) Deconstructing Send, Arc, and Mutex Co-authored-by: Jonathan Pallant --- training-slides/src/SUMMARY.md | 4 +--- .../src/deconstructing-send-arc-mutex.md | 16 ++++++++-------- 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/training-slides/src/SUMMARY.md b/training-slides/src/SUMMARY.md index b211664..68e8b4d 100644 --- a/training-slides/src/SUMMARY.md +++ b/training-slides/src/SUMMARY.md @@ -39,6 +39,7 @@ Topics that go beyond [Applied Rust](#applied-rust). * [Advanced Strings](./advanced-strings.md) * [Debugging Rust](./debugging-rust.md) +* [Deconstructing Send, Arc, and Mutex](./deconstructing-send-arc-mutex.md) * [Dependency Management with Cargo](./dependency-management.md) * [Deref Coercions](./deref-coercions.md) * [Design Patterns](./design-patterns.md) @@ -56,9 +57,6 @@ Topics that go beyond [Applied Rust](#applied-rust). * [Using Types to encode State](./type-state.md) * [WASM](./wasm.md) -## Under development - -* [Deconstructing Send, Arc, and Mutex](./deconstructing-send-arc-mutex.md) # No-Std Rust Rust for the Linux Kernel and other no-std environments with an pre-existing C API. Requires [Applied Rust](#applied-rust). diff --git a/training-slides/src/deconstructing-send-arc-mutex.md b/training-slides/src/deconstructing-send-arc-mutex.md index 9a37e24..eb09e71 100644 --- a/training-slides/src/deconstructing-send-arc-mutex.md +++ b/training-slides/src/deconstructing-send-arc-mutex.md @@ -1,7 +1,7 @@ # Deconstructing Send, Arc, and Mutex -## `tread::spawn` Function +## `thread::spawn` Function ```rust ignore pub fn spawn(f: F) -> JoinHandle @@ -43,7 +43,7 @@ let outliers: Vec<_> = data.iter().copied().filter(|n| -> bool { ## `T: 'static` -One of two cases: +Two allowable options: * the type doesn't have any references inside ("Owned data") * `struct User { name: String }` @@ -52,10 +52,10 @@ One of two cases: ## Why `F: 'static` and `T: 'static`? -* applies to data passed from parent thread to child thread or vise versa +* applies to data passed from parent thread to child thread or vice-versa * prevents passing references to local variables * one thread can finish before the other and such references may become invalid - * `+ 'static` lets borrow checker detect these situations at compile time + * `+ 'static` avoids this by ensuring any references point to data that has the static lifetime (i.e. that lives forever) ## `T: Send` @@ -65,7 +65,7 @@ One of two cases: * opt-out instead of opt-in * various types in standard library implement `Send` or `!Send` * `unsafe` means you have to put `unsafe` keyword in front of `impl` when implementing `Send` or `!Send` - * precaution measure + * precautionary measure ## Why would one implement `Send` or `!Send` @@ -78,7 +78,7 @@ One of two cases: `F: Send` and `T: Send` means that all data traveling from the parent thread to child thread has to be marked as `Send` -* Rust compiler has to inherent knowledge of threads, but the use of marker traits and lifetime annotations let the type / borrow checker prevent data race errors +* Rust compiler has no inherent knowledge of threads, but the use of marker traits and lifetime annotations let the type / borrow checker prevent data race errors ## Sharing data between threads @@ -224,7 +224,7 @@ struct ArcInner { * `Arc` uses `AtomicUsize` for reference counting * slower * safe to increment / decrement from multiple threads -* With the help of marker trait `Send` and trait bounds on `thread::spawn` Rust compiler *forces* you to use the correct type +* With the help of marker trait `Send` and trait bounds on `thread::spawn`, the compiler *forces* you to use the correct type ## `Arc` / `Rc` "transparency" @@ -354,7 +354,7 @@ fn handle_client(..., log: &Mutex>) -> ... { ## Lessons Learned -* careful use of traits and trait boundaries lets Rust compiler detect problematic multithreading code at compile time +* careful use of traits and trait boundaries lets the compiler detect problematic multi-threading code at compile time * `Arc` and `Mutex` let the program ensure data availability and exclusive mutability at runtime where the compiler can't predict the behavior of the program * `Deref` coercions make concurrency primitives virtually invisible and transparent to use * **Make invalid state unrepresentable** From d029d6eb2ab18a5a7d7afd08166a7890e68cc772 Mon Sep 17 00:00:00 2001 From: Andrei Listochkin Date: Tue, 3 Sep 2024 16:16:51 +0100 Subject: [PATCH 3/3] (pr-feedback 2) Deconstructing Send, Arc, and Mutex * removed "no way to recover the mutex" note since `Mutex::clear_poison` is stabilized now --- .../src/deconstructing-send-arc-mutex.md | 23 +++++++++---------- 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/training-slides/src/deconstructing-send-arc-mutex.md b/training-slides/src/deconstructing-send-arc-mutex.md index eb09e71..3fb910d 100644 --- a/training-slides/src/deconstructing-send-arc-mutex.md +++ b/training-slides/src/deconstructing-send-arc-mutex.md @@ -43,7 +43,7 @@ let outliers: Vec<_> = data.iter().copied().filter(|n| -> bool { ## `T: 'static` -Two allowable options: +Two options allowed: * the type doesn't have any references inside ("Owned data") * `struct User { name: String }` @@ -160,7 +160,7 @@ which is owned by the current function note: function requires argument type to outlive `'static` -## Life-time problem +## Lifetime problem Problem: @@ -168,13 +168,13 @@ Problem: Solution: -* move the decision when to clean the data from compile-time to runtime +* move the decision when to clean the data from *compile-time* to *run-time* * use reference-counting ## Attempt 1: `Rc` * `let mut log = Rc::new(vec![]);` -* `let mut thread_log = log.clone()` now does't clone the data, but simply increases the reference count +* `let mut thread_log = log.clone()` now doesn't clone the data, but simply increases the reference count * both variables now have *owned* type, and satisfy `F: 'static` requirement ```text @@ -257,7 +257,7 @@ pub trait DerefMut: Deref { * deref functions can be inlined * `Target` is an associated type * can't `deref()` into multiple different types -* `DerefMut: Deref` allows the later to reuse the same `Target` type +* `DerefMut: Deref` allows the `DerefMut` trait to reuse the same `Target` type * read-only and read-write references coerce to the references of the same type ## `Arc` / `Rc` "transparency" with `Deref` @@ -286,10 +286,10 @@ let log = Arc::new(Mutex::new(Vec::new())); ```

 

-* `Arc` guarantees availability of data in memory +* `Arc` guarantees *availability* of data in memory * prevents memory form being cleaned up prematurely -* `Mutex` guarantees exclusivity of mutable access - * provides a single `&mut` to underlying data simultaneously +* `Mutex` guarantees *exclusivity of mutable access* + * provides *only one* `&mut` to underlying data simultaneously ## `Mutex` in Action @@ -325,10 +325,7 @@ fn handle_client(..., log: &Mutex>) -> ... { * in later case sets a poison flag on the mutex * calling `lock().unwrap()` on a poisoned Mutex causes `panic` * if the mutex is *"popular"* poisoning can cause many application threads to panic, too. -* poisoning API is *problematic* - * `PoisonError` doesn't provide information about the panic that caused the poisoning - * no way to recover and revive the mutex (stays poisoned forever) - * `PoisonError::into_inner` *can* produce a guard even for poisoned mutexes +* `PoisonError` doesn't provide information about the panic that caused the poisoning ## Critical Section "Hygiene" @@ -336,6 +333,8 @@ fn handle_client(..., log: &Mutex>) -> ... { * avoid calling functions that can panic * using a named variable for Mutex guard helps avoiding unexpected temporary lifetime behavior +## Critical Section Example + ```rust ignore fn handle_client(..., log: &Mutex>) -> ... { for line in ... {