Skip to content

Latest commit

 

History

History
1283 lines (862 loc) · 71.5 KB

ch19-并发.md

File metadata and controls

1283 lines (862 loc) · 71.5 KB

并发(Concurrency)

从长远来看,用面向机器的语言编写大型并发程序是不可取的,因为这种语言允许不受限制地使用存储位置及其地址.我们无法使这样的程序可靠(即使有复杂的硬件机制的帮助). --Per Brinch Hansen (1977) 通信模式是并行模式. --Whit Morriss

原文

In the long run it is not advisable to write large concurrent programs in machine-oriented languages that permit unrestricted use of store locations and their addresses. There is just no way we will be able to make such programs reliable (even with the help of complicated hardware mechanisms). --Per Brinch Hansen (1977)

Patterns for communication are patterns for parallelism. --Whit Morriss

如果你对并发的态度在你的职业生涯中发生了变化,那么你并不孤单.这是一个常见的故事.

首先,编写并发代码既简单又有趣.这些工具--线程,锁,队列等等--很容易获取和使用.这里面有很多陷阱,这是真的,但幸运的是你知道它们都是什么,并且小心翼翼地避免犯错误.

在某些时候,你必须调试其他人的多线程代码,你不得不得出这样的结论:有些人确实不应该使用这些工具.

然后在某些时候,你必须调试自己的多线程代码.

经验灌输了对所有多线程代码的健康的怀疑,如果不是彻头彻尾的不信任.偶尔有一篇文章解释为什么一些明显正确的多线程习惯用法根本不起作用.(它与"内存模型(the memory model)"有关.)但是你最终会找到一种并发的方法,你认为你可以实际使用它,而不会经常出错.你可以把几乎所有东西都塞进那个习惯用法中,并且(如果你 真的(really) 很优秀的话)你学会对增加复杂性说"不(no)".

当然,还有很多习语.系统程序员通常使用的方法包括以下内容:

  • 一个只有单个工作(job)的 后台线程(background thread) ,它会定期醒来执行.

  • 通用 任务队列(task queues) 与客户端通信的通用 工作池(worker pools) .

  • 数据从一个线程流向下一个线程的 管道(Pipelines) .每个线程做一点工作.

  • 数据并行性(Data parallelism) ,假定(正确或错误地)整个计算机将主要进行一个大型计算,因此将其分成n个部分,并在n个线程上运行,以期将机器的所有n个核心同时工作.

  • 同步对象之海(sea of synchronized objects) ,其中多个线程可以访问相同的数据,并且使用基于低级原语(如互斥锁)的临时锁方案来避免竞争.(Java包含对此模型的内置支持,这在20世纪90年代和21世纪初非常流行.)

  • 原子整数操作(Atomic integer operations) 允许多个核通过一个机器字大小的字段传递信息来进行通信.(这比其他方法更难得到正确的结果,除非交换的数据只是整数值.在实践中,它通常是指针.)

随着时间的推移,你可能会使用其中的几种方法并将它们安全地结合起来.你是这门艺术的大师.如果不允许其他人以任何方式修改系统,事情会很棒.良好地使用线程的程序充满了不成文的规则(unwritten rules).

Rust提供了一种更好的使用并发的方式,而不是强制所有程序采用单一风格(对于系统程序员来说,这根本不是解决方案),而是通过安全地支持多种风格.不成文的规则是写下来的--在代码中--并由编译器强制执行.

你已经听说Rust允许你编写安全,快速,并发的程序.这一章我们会告诉你怎么做.我们将介绍使用Rust线程的三种方法:

  • Fork-join并行性(Fork-join parallelism)

  • 通道(Channels)

  • 共享可变状态(Shared mutable state)

在此过程中,你将使用迄今为止所学到的关于Rust语言的所有内容.在单线程程序中,Rust对引用,可变性和生命周期的关注是有价值的,但是在并发编程中,这些规则的真正意义变得明显.它们可以扩展你的工具箱,快速,正确地破解多种风格的多线程代码--无需怀疑,无需不信任,无需担心.

Fork-join并行性(Fork-Join Parallelism)

当我们有几个完全独立的任务,我们想要同时执行时,最简单的线程用例就会出现.

例如,假设我们正在对大量文档进行自然语言处理.我们可以写一个循环:

fn process_files(filenames: Vec<String>) -> io::Result<()> {
    for document in filenames {
        let text = load(&document)?;  // read source file
        let results = process(text);  // compute statistics
        save(&document, results)?;    // write output file
    }
    Ok(())
}

该程序将如图19-1所示运行.

图19-1. process_files()的单线程执行.

由于每个文档都是单独处理的,因此通过将语料库分成块并在单独的线程上处理每个块来加快此任务相对容易,如图19-2所示.

此模式称为 fork-join并行性(fork-join parallelism) . fork 是为了启动一个新线程,而 join 一个线程就是等待它完成.我们已经看过这种技术:我们用它来加速第2章中的Mandelbrot程序.

图19-2. 使用fork-join方法进行多线程文件处理.

Fork-join并行性很有吸引力,原因如下:

  • 这很简单.Fork-join易于实现,Rust可以很容易地实现.

  • 它避免了瓶颈.fork-join中没有共享资源的锁定.任何线程必须等待另一个线程的唯一时间是在最后.与此同时,每个线程都可以自由运行.这有助于将任务切换(task-switching)开销降低.

  • 性能数学计算很简单.在最好的情况下,通过启动四个线程,我们可以在四分之一的时间内完成我们的工作.图19-2显示了我们不应期望这种理想加速的一个原因:我们可能无法在所有线程上均匀分配工作.另一个需要注意的原因是,有时fork-join程序必须在线程加入后花费一些时间,并 结合(combining) 线程计算的结果.也就是说,完全隔离任务可能会带来一些额外的工作.尽管如此,除了这两件事之外,任何具有独立工作单元的CPU限制程序都可以获得显着提升.

  • 很容易推断出程序的正确性.只要线程确实是独立的,fork-join程序就是 确定性的(deterministic) ,就像Mandelbrot程序中的计算线程一样.无论线程速度如何变化,程序始终都会产生相同的结果.这是一个没有竞争条件的并发模型.

fork-join的主要缺点是它需要独立的工作单元.在本章的后面部分,我们将考虑一些不能如此清晰地分解的问题.

现在,让我们坚持使用自然语言处理示例.我们将展示一些将fork-join模式应用于process_files函数的方法.

spawn和join(spawn and join)

函数std::thread::spawn启动一个新线程.

spawn(|| {
    println!("hello from a child thread");
})

它接受一个参数,一个FnOnce闭包或函数.Rust启动一个新线程来运行该闭包或函数的代码.新线程是一个真正的操作系统线程,它有自己的堆栈,就像C++,C#和Java中的线程一样.

这是一个更实际的例子,使用spawn实现一个之前process_files函数的并行版本:

use std::thread::spawn;

fn process_files_in_parallel(filenames: Vec<String>) -> io::Result<()> {
    // Divide the work into several chunks.
    const NTHREADS: usize = 8;
    let worklists = split_vec_into_chunks(filenames, NTHREADS);

    // Fork: Spawn a thread to handle each chunk.
    let mut thread_handles = vec![];
    for worklist in worklists {
        thread_handles.push(
            spawn(move || process_files(worklist))
        );
    }

    // Join: Wait for all threads to finish.
    for handle in thread_handles {
        handle.join().unwrap()?;
    }

    Ok(())
}

让我们逐行分析这个函数.

fn process_files_in_parallel(filenames: Vec<String>) -> io::Result<()> {

我们的新函数与原始process_files具有相同的类型签名,使其成为一个方便的替代品.

// Divide the work into several chunks.
const NTHREADS: usize = 8;
let worklists = split_vec_into_chunks(filenames, NTHREADS);

我们使用一个工具函数split_vec_into_chunks(此处未显示)来划分工作.结果(worklists)是向量的向量.它包含八个大小均匀的原始向量filenames切片.

// Fork: Spawn a thread to handle each chunk.
let mut thread_handles = vec![];
for worklist in worklists {
    thread_handles.push(
        spawn(move || process_files(worklist)
    );
}

我们为每个worklist.spawn()生成一个线程,返回一个名为JoinHandle的值,稍后我们将使用它.

现在,我们将所有JoinHandle放入向量中.

注意我们如何将文件名列表添加到工作线程中:

  • worklist由父线程中的for循环定义和填充.

  • 一旦创建了move闭包,就会将worklist移动到闭包中.

  • spawn然后将闭包(包括worklist向量)移动到新的子线程.

这些移动很便宜.就像我们在第4章中讨论过的Vec<String>移动一样,String没有被克隆.事实上,没有任何东西被分配或释放.移动的唯一数据是Vec本身:三个机器字.

你创建的大多数线程都需要代码和数据才能开始.方便地,Rust闭包包含你想要的任何代码以及你想要的任何数据.

继续:

// Join: Wait for all threads to finish.
for handle in thread_handles {
    handle.join().unwrap()?;
}

我们使用前面收集的JoinHandle.join()方法等待所有八个线程完成.连接线程通常是正确性所必需的,因为一旦main返回,Rust程序就会退出,即使其他线程仍在运行.没有调用析构函数;额外的线程刚被杀死.如果这不是你想要的,请确保在从main返回之前加入你关心的任何线程.

如果我们设法完成此循环,则意味着所有八个子线程都已成功完成.因此,我们的函数以返回Ok(())结束:

    Ok(())
}

跨线程错误处理(Error Handling Across Threads)

由于错误处理,我们在示例中用于连接子线程的代码比它看起来更复杂.让我们重新审视这行代码:

handle.join().unwrap()?;

.join()方法为我们做了两件好事.

第一,handle.join()返回一个std::thread::Result, 如果子线程出现恐慌(if the child thread panicked) ,则返回一个错误.这使得Rust中的线程比C++中的线程更健壮.在C++中,越界数组访问是未定义行为,并且没有保护系统的其余部分免受后果影响.在Rust中,恐慌是安全的,每个线程的.线程之间的界限充当恐慌的防火墙;恐慌不会自动从一个线程传播到依赖它的线程.相反,一个线程中的恐慌被报告为其他线程中的错误Result.整个程序很容易恢复.

但是,在我们的程序中,我们不尝试任何花哨的恐慌处理.相反,我们立即在此Result上使用.unwrap(),断言它是Ok结果而不是Err结果.如果一个子线程 确实(did) 恐慌,那么这个断言就会失败,所以父线程也会出现恐慌.我们显式地将恐慌从子线程传播到父线程.

第二,handle.join()将子线程的返回值传递回父线程.我们传递给spawn的闭包有一个io::Result<()>返回类型.因为这就是process_files返回的内容.不会丢弃此返回值.子线程完成后,保存其返回值,JoinHandle::join()将该值传回父线程.

handle.join()在此程序中返回的完整类型是std::thread::Result<std::io::Result<()>>. thread::Resultspawn/joinAPI的一部分;io::Result是我们应用的一部分.

在我们的例子中,在解开thread::Result之后,我们在io :: Result上使用了?运算符,显式地将I/O错误从子线程传播到父线程.

所有这些看起来都很复杂.但请考虑它只是一行代码,然后将其与其他语言进行比较.Java和C#中的默认行为是将子线程中的异常转储到终端,然后将其遗忘.在C++中,默认是中止进程.在Rust中,错误是Result值(数据)而不是异常(控制流).它们像任何其他值一样跨线程传递.任何时候你使用低级线程API,你最终都必须编写仔细的错误处理代码,但 鉴于你必须编写它(given that you have to write it) ,Result就非常适合使用.

跨线程共享不可变数据(Sharing Immutable Data Across Threads)

假设我们正在进行的分析需要一个庞大的英语单词和短语数据库:

// before
fn process_files(filenames: Vec<String>)

// after
fn process_files(filenames: Vec<String>, glossary: &GigabyteMap)

这个glossary会很大,所以我们通过引用传递它.我们如何更新process_files_in_parallel以将词汇表传递给工作线程?

明显的更改不起作用:

fn process_files_in_parallel(filenames: Vec<String>,
                             glossary: &GigabyteMap)
    -> io::Result<()>
{
    ...
    for worklist in worklists {
        thread_handles.push(
            spawn(move || process_files(worklist, glossary))  // error
        );
    }
    ...
}

我们只是在我们的函数中添加了一个glossary参数,并将其传递给process_files.Rust抱怨:

error[E0477]: the type `[closure@...]` does not fulfill the required lifetime
  --> concurrency_spawn_lifetimes.rs:35:13
   |
35 |             spawn(move || process_files(worklist, glossary))  // error
   |             ^^^^^
   |
   = note: type must satisfy the static lifetime

Rust抱怨我们传递给spawn的闭包的生命周期.

spawn启动独立线程.Rust无法知道子线程将运行多长时间,因此它假设最坏情况:它假设子线程可能会在父线程完成并且父线程中的所有值都消失后继续运行.显然,如果子线程将持续那么长时间,那么它运行的闭包也需要持续那么长时间.但是这个闭包有一个有限的生命周期:它取决于引用glossary,并且引用不会永远持续下去.

请注意,Rust拒绝此代码是正确的!按照我们编写此函数的方式,一个线程 确实(is) 可能遇到I/O错误,导致process_files_in_parallel在其他线程完成之前退出.在主线程释放词汇表之后,子线程可能最终尝试使用它.这将是一场竞赛--如果主线能够获胜的话,它会以未定义行为作为奖项.Rust不允许这样做.

spawn似乎过于开放以支持跨线程共享引用.实际上,我们已经在第306页的"偷窃的闭包(Closures That Steal)"中看到了这样的情况.在那里,我们的解决方案是使用move闭包将数据的所有权转移到新线程.这在这里不起作用,因为我们有许多线程都需要使用相同的数据.一个安全的替代方法是为每个线程clone整个词汇表,但由于它很大,我们希望避免这种情况.幸运的是,标准库提供了另一种方式:原子引用计数(atomic reference counting).

我们在第90页的"Rc和Arc:共享所有权(Rc and Arc:Shared Ownership)"中描述了Arc.现在是时候使用它了:

use std::sync::Arc;

fn process_files_in_parallel(filenames: Vec<String>,
                             glossary: Arc<GigabyteMap>)
    -> io::Result<()>
{
    ...
    for worklist in worklists {
        // This call to .clone() only clones the Arc and bumps the
        // reference count. It does not clone the GigabyteMap.

        let glossary_for_child = glossary.clone();
        thread_handles.push(
            spawn(move || process_files(worklist, &glossary_for_child))
        );
    }
    ...
}

我们更改了glossary的类型:要并行运行分析,调用者必须通过执行Arc::new(giga_map)传入Arc<GigabyteMap>,这是指向已移入堆中的GigabyteMap的智能指针.

当我们调用glossary.clone()时,我们正在制作Arc智能指针的副本,而不是整个GigabyteMap.这相当于递增引用计数.

通过此更改,程序将编译并运行,因为它不再依赖于引用生命周期.只要 任何(any) 线程拥有Arc<GigabyteMap>,即使父线程提前退出,它也会使映射保持活动状态.不会有任何数据争用,因为Arc中的数据是不可变的.

Rayon(Rayon)

标准库的spawn函数是一个重要的原语,但它不是专门为fork-join并行性设计的.更好的fork-join API在它之上构建.例如,在第2章中,我们使用Crossbeam库将一些工作分散在8个线程.Crossbeam的 范围线程(scoped threads) 非常自然地支持fork-join并行性.

Niko Matsakis的Rayon库是另一个例子.它提供了两种并发运行任务的方式:

extern crate rayon;
use rayon::prelude::*;

// "do 2 things in parallel"
let (v1, v2) = rayon::join(fn1, fn2);

// "do N things in parallel"
giant_vector.par_iter().for_each(|value| {
    do_thing_with_value(value);
});

rayon::join(fn1, fn2)简单地调用两个函数并返回两个结果..par_iter()方法创建一个ParallelIterator,一个带map,filter和其他方法的值,就像RustIterator一样.在这两种情况下,Rayon都会使用自己的工作线程池来尽可能地分散工作.你只需告诉Rayon 可以(can) 并行完成哪些任务;Rayon管理线程并尽可能地分发工作.

图19-3中的图表说明了调用huge_vector.par_iter().for_each(...)的两种思考方式.(a) Rayon的行为好像它为在向量中每个元素产生一个线程.(b) 在幕后,Rayon每个CPU核心有一个工作线程,效率更高.这个工作线程池由所有程序的线程共享.当数千个任务同时进入时,Rayon将这项工作分开.

图19-3. Rayon理论与实践.

这是一个使用Rayon的process_files_in_parallel版本:

extern crate rayon;

use rayon::prelude::*;

fn process_files_in_parallel(filenames: Vec<String>, glossary: &GigabyteMap)
    -> io::Result<()>
{
    filenames.par_iter()
        .map(|filename| process_file(filename, glossary))
        .reduce_with(|r1, r2| {
            if r1.is_err() { r1 } else { r2 }
        })
        .unwrap_or(Ok(()))
}

这个代码比使用std::thread::spawn的版本更短,更简洁.让我们逐行看一下:

  • 首先,我们使用filenames.par_iter()来创建并行迭代器.

  • 我们使用.map()在每个文件名上调用process_file.这会生成在一系列io::Result<()>值上的ParallelIterator.

  • 我们使用.reduce_with()来组合结果.在这里,我们保留第一个错误(如果有的话),并丢弃其余的错误.如果我们想积累所有错误或打印它们,我们可以在这里做.

传递.map()闭包时,.reduce_with()方法也很方便,该闭包在成功时返回有用的值.然后你可以传递给.reduce_with()一个知道如何组合两个成功结果的闭包.

  • reduce_with返回Option,仅在filenames为空时,才是None.在这种情况下,我们使用Option.unwrap_or()方法使结果为Ok(()).

在幕后,Rayon使用称为 工作窃取(work-stealing) 的技术,动态地平衡线程间的工作负载.它通常会比我们手动提前划分工作(如第461页的"spawn和join(spawn and join)"),更好地使所有CPU繁忙.

另外,Rayon支持跨线程共享引用.任何在幕后发生的并行处理都保证在reduce_with返回时完成.这就解释了为什么我们能够将glossary传递给process_file,即使该闭包在多个线程上调用.

(顺便说一句,我们使用map方法和reduce方法并不是巧合.由Google和Apache Hadoop推广的MapReduce编程模型与fork-join有很多共同之处.它可以看作是查询分布式数据的fork-join方法.)

再探Mandelbrot集(Revisiting the Mandelbrot Set)

回到第2章,我们使用fork-join并发来渲染Mandelbrot集.这使得渲染速度提高了四倍--令人印象深刻,但是考虑到我们让程序产生了8个工作线程并在8核机器上运行它,没有那么令人印象深刻!

问题是我们没有均匀地分配工作量.计算图像的一个像素相当于运行一个循环(参见第24页的"Mandelbrot Set实际上是什么(What the Mandelbrot Set Actually Is)").事实证明,图像的浅灰色部分(循环快速退出)比黑色部分(其中循环运行完整的255次迭代)渲染要快得多.因此,虽然我们将区域划分为相等大小的水平带,但我们创建了不相等的工作负载,如图19-4所示.

图19-4. Mandelbrot程序中的不均匀工作分配.

使用Rayon很容易解决这个问题.我们可以为输出中的每行像素启动并行任务.这创建了数百个Rayon可以在其线程中分发的任务.由于work-stealing,任务的大小不同并不重要.Rayon将平衡工作.

这里是代码.第一行和最后一行是我们在第35页的"并发Mandelbrot程序(A Concurrent Mandelbrot Program)"中显示的main函数的一部分,但我们已经更改了渲染代码,这是中间的所有内容.

let mut pixels = vec![0; bounds.0 * bounds.1];

// Scope of slicing up `pixels` into horizontal bands.
{
    let bands: Vec<(usize, &mut [u8])> = pixels
        .chunks_mut(bounds.0)
        .enumerate()
        .collect();

    bands.into_par_iter()
        .weight_max()
        .for_each(|(i, band)| {
            let top = i;
            let band_bounds = (bounds.0, 1);
            let band_upper_left = pixel_to_point(bounds, (0, top),
                           upper_left, lower_right);
            let band_lower_right = pixel_to_point(bounds, (bounds.0, top + 1),
                           upper_left, lower_right);
            render(band, band_bounds, band_upper_left, band_lower_right);
        });
}

write_bitmap(&args[1], &pixels, bounds).expect("error writing PNG file");

首先,我们创建bands,我们将传递给Rayon的任务集合.每个任务只是一个类型(usize, &mut [u8])的元组:行号,因为计算需要;和要填充的pixels的切片.我们使用chunks_mut方法将图像缓冲区分解成行,enumerate将行号附加到每一行,collect将所有数字切片对提取到一个向量中.(我们需要一个向量,因为Rayon只使用数组和向量创建并行迭代器.)

接下来,我们将band转换为并行迭代器,调用.weight_max()向Rayon提示,这些任务是非常cpu密集的,然后使用.for_each()方法告诉Rayon我们想要做什么工作.

由于我们使用Rayon,我们必须将这些行添加到 main.rs :

extern crate rayon;
use rayon::prelude::*;

这些行添加到Cargo.toml :

[dependencies]
rayon = "0.4"

通过这些更改,该程序现在在8核计算机上使用大约7.75个核心.它比以前我们手动划分工作时,快75%.而且代码更短,反映了让crate做工作(工作分配)而不是自己做的好处.

通道(Channels)

通道(channel) 是用于将值从一个线程发送到另一个线程的单向管道.换句话说,它是一个线程安全的队列.

图19-5说明了如何使用通道.它们类似于Unix管道:一端用于发送数据,另一端用于接收.两端通常由两个不同的线程拥有.但是,虽然Unix管道用于发送字节,但通道用于发送Rust值.sender.send(item)将单个值放入通道;receiver.recv()删除一个.所有权从发送线程转移到接收线程.如果通道为空,则receiver.recv()将阻塞,直到发送一个值.

图19-5. 字符串的通道.字符串msg的所有权从线程1传输到线程2.

通过通道,线程可以通过将值传递给彼此来进行通信.这是线程在不使用锁或共享内存的情况下协同工作的一种非常简单的方法.

这不是一种新技术.Erlang已经有孤立的进程和消息传递30年了.Unix管道已存在近50年.我们倾向于认为管道提供了灵活性和可组合性,而不是并发性,但事实上,它们完成了上述所有工作.Unix管道的一个例子如图19-6所示.所有三个程序当然可以同时工作.

图19-6. Unix管道的执行.

Rust通道比Unix管道快.发送值会移动而不是复制它,即使移动包含许多兆字节数据的数据结构,移动也很快.

发送值(Sending Values)

在接下来的几节中,我们将使用通道构建一个并发程序,创建一个 倒排索引(inverted index) ,这是搜索引擎的关键组成部分之一.每个搜索引擎都在特定的文档集合上工作.倒排索引是指示哪些单词出现在哪里的数据库.

我们将展示与线程和通道有关的代码部分.完整的程序可以在https://github.com/ProgrammingRust/fingertips找到.它很简短,大约有一千行代码.

我们的程序结构为管道,如图19-7所示.管道只是使用通道的众多方法之一--我们将在稍后讨论其他一些用法--但它们是将并发引入现有单线程程序的简单方法.

图19-7. 索引构建器管道.箭头表示通过通道从一个线程发送到另一个线程的值.磁盘I/O未显示.

我们将使用总共5个线程,每个线程执行不同的任务.每个线程在程序的生命周期内不断产生输出.例如,第一个线程只是将源文档从磁盘一个接一个地读入内存.(我们想要一个线程来执行此操作,因为我们将使用File::openread_to_string编写最简单的代码,它们是阻塞API.我们不希望CPU在磁盘工作时处于空闲状态.)此阶段的输出是每个文档一个长String.因此该线程通过String的通道连接到下一个线程.

我们的程序将通过生成读取文件的线程开始.假设documentVec<PathBuf>,它是文件名的向量.启动我们的文件读取线程的代码如下所示:

use std::fs::File;
use std::io::prelude::*;  // for `Read::read_to_string`
use std::thread::spawn;
use std::sync::mpsc::channel;

let (sender, receiver) = channel();

let handle = spawn(move || {
    for filename in documents {
        let mut f = File::open(filename)?;
        let mut text = String::new();
        f.read_to_string(&mut text)?;

        if sender.send(text).is_err() {
            break;
        }
    }
    Ok(())
});

通道是std::sync::mpsc模块的一部分.我们稍后将解释这个名字意味着什么;首先,让我们看看这段代码是如何工作的.我们首先创建一个通道:

let(sender, receiver) = channel();

channel函数返回一对值:发送者和接收者.底层队列数据结构是标准库不公开的实现细节.

通道是有类型的.我们将使用此通道发送每个文件的文本,因此我们有一个Sender<String>类型的sender和一个Receiver<String>类型的receiver.我们可以通过编写channel::<String>()显式地要一个字符串通道.相反,我们让Rust的类型推断推断出来.

let handle = spawn(move || {

和以前一样,我们使用std::thread::spawn来启动一个线程.sender(而不是receiver)的所有权通过此move闭包转移到新线程.

接下来的几行代码只是从磁盘读取文件:

for filename in documents {
    let mut f = File::open(filename)?;
    let mut text = String::new();
    f.read_to_string(&mut text)?;

成功读取文件后,我们将其文本发送到通道:

    if sender.send(text).is_err() {
        break;
    }
}

sender.send(text)将值text移动到通道中.最终,它将再次移动到接收值的一方.无论text是包含10行文本还是10兆字节,此操作都会复制三个机器字(String的大小),相应的receiver.recv()调用也将复制三个机器字.

sendrecv方法都返回Result,但只有在删除了通道的另一端时,这些方法才会失败.如果Receiver被删除,则send调用将失败,因为否则该值将永远位于通道中:没有Receiver,任何线程都无法接收它.同样,如果没有值在通道中等待并且Sender已被删除,则recv调用将失败,因为否则recv将永远等待:没有Sender,任何线程都无法发送下一个值.删除通道的端是"挂断(hanging up)"的正常方式,当你完成时关闭连接.

在我们的代码中,仅当接收者的线程提前退出时,sender.send(text)才会失败.这是使用通道的代码的典型情况.无论是故意发生还是由于错误,我们的读取器线程安静地关闭自己是可以的.

当发生这种情况,或者线程完成读取所有文档时,它返回Ok(()):

    Ok(())
});

请注意,此闭包返回Result.如果线程遇到I/O错误,它会立即退出,并且错误存储在线程的JoinHandle中.

当然,就像任何其他编程语言一样,Rust在错误处理方面也有许多其他的可能性.当发生错误时,我们可以使用println!将其打印出来,然后继续下一个文件.我们可以通过我们用于数据的相同通道传递错误,使其成为Result的通道--或者创建第二个通道仅用于错误.我们在这里选择的方法既轻量级又负责任:我们可以使用?运算符,所以没有一堆样板代码,甚至是Java中可能看到的显式的try/catch;然而错误不会默默地传递.

为方便起见,我们的程序将所有这些代码包装在一个函数中,该函数返回receiver(我们还没有使用)和新线程的JoinHandle:

fn start_file_reader_thread(documents: Vec<PathBuf>)
    -> (Receiver<String>, JoinHandle<io::Result<()>>)
{
    let (sender, receiver) = channel();

    let handle = spawn(move || {
        ...
    });

    (receiver, handle)
}

请注意,此函数启动新线程并立即返回.我们将为管道的每个阶段编写这样的函数.

接收值(Receiving Values)

现在我们有一个线程运行一个发送值的循环.我们可以生成第二个线程,运行一个调用receiver.recv()的循环:

while let Ok(text) = receiver.recv() {
    do_something_with(text);
}

Receiver是可迭代的,所以有一个更好的方法来写这个:

for text in receiver {
    do_something_with(text);
}

这两个循环是等价的.无论我们以哪种方式编写它,如果当控制到达循环顶部时通道恰好为空,接收线程将阻塞,直到某个其他线程发送一个值.当通道为空且Sender已被丢弃时,循环将正常退出.在我们的程序中,这在读取器线程退出时自然发生.该线程正在运行一个拥有变量sender的闭包;当闭包退出时,sender被删除.

现在我们可以为管道的第二阶段编写代码:

fn start_file_indexing_thread(texts: Receiver<String>)
    -> (Receiver<InMemoryIndex>, JoinHandle<()>)
{
    let (sender, receiver) = channel();
    let handle = spawn(move || {
        for (doc_id, text) in texts.into_iter().enumerate() {
            let index = InMemoryIndex::from_single_document(doc_id, text);
            if sender.send(index).is_err() {
                break;
            }
        }
    });
    (receiver, handle)
}

此函数生成一个线程,该线程从一个通道(texts)接收String值,并将InMemoryIndex值发送到另一个通道(sender/receiver).该线程的工作是获取第一阶段中加载的每个文件,并将每个文档转换为一个单文件,内存中的倒排索引.

这个线程的主循环很简单.索引文档的所有工作都是由函数make_single_file_index完成的.我们不会在这里显示它的源代码,但是这是一个简单的问题,即沿着字边界分割输入字符串,然后生成从单词到位置列表的映射.

此阶段不执行I/O,因此它不必处理io::Error.而是io::Result<()>,它返回().

运行管道(Running the Pipeline)

其余三个阶段的设计相似.每个都消费前一阶段创建的Receiver.我们对管道的其余部分目标是将所有小索引合并到磁盘上的单个大索引文件中.我们发现这样做的最快方法分为三个阶段.我们不会在这里显示代码,只显示这三个函数的类型签名.完整的资源在网上.

首先,我们合并内存中的索引,直到它们变得庞大(第3阶段):

fn start_in_memory_merge_thread(file_indexes: Receiver<InMemoryIndex>)
    -> (Receiver<InMemoryIndex>, JoinHandle<()>)

我们将这些大型索引写入磁盘(第4阶段):

fn start_index_writer_thread(big_indexes: Receiver<InMemoryIndex>,output_dir: &Path)
    -> (Receiver<PathBuf>, JoinHandle<io::Result<()>>)

最后,如果我们有多个大文件,我们使用基于文件的合并算法合并它们(第5阶段):

fn merge_index_files(files: Receiver<PathBuf>, output_dir: &Path)
    -> io::Result<()>

最后一个阶段不会返回Receiver,因为它是该行的结尾.它在磁盘上生成单个输出文件.它不返回JoinHandle,因为我们不打算为这个阶段产生一个线程.这项工作是在调用者的线程上完成的.

现在我们来到启动线程并检查错误的代码:

fn run_pipeline(documents: Vec<PathBuf>, output_dir: PathBuf)
    -> io::Result<()>
{
    // Launch all five stages of the pipeline.
    let (texts,   h1) = start_file_reader_thread(documents);
    let (pints,   h2) = start_file_indexing_thread(texts);
    let (gallons, h3) = start_in_memory_merge_thread(pints);
    let (files,   h4) = start_index_writer_thread(gallons, &output_dir);
    let result = merge_index_files(files, &output_dir);

    // Wait for threads to finish, holding on to any errors that they encounter.
    let r1 = h1.join().unwrap();
    h2.join().unwrap();
    h3.join().unwrap();
    let r4 = h4.join().unwrap();
    // Return the first error encountered, if any.
    // (As it happens, h2 and h3 can't fail: those threads
    // are pure in-memory data processing.)
    r1?;
    r4?;
    result
}

和以前一样,我们使用.join().unwrap()将恐慌从子线程显式传播到主线程.这里唯一不寻常的是,没有马上使用?,我们将io::Result值放在一边,直到我们加入所有四个线程.

此管道比单线程等效产品快40%.这对于一个下午的工作来说并不坏,但是我们在为Mandelbrot程序获得的675%的提升之后,看起来就微不足道.我们显然没有使系统的I/O容量或所有CPU核心饱和.这是怎么回事?

管道就像制造工厂中的装配线:性能受到最慢阶段的吞吐量的限制.一条全新的,未经调优的装配线可能与单位生产一样慢,但装配线可以增强有针对性的调优.在我们的例子中,测量表明第二阶段是瓶颈.我们的索引线程使用.to_lowercase().is_alphanumeric(),所以它在Unicode表中花了很多时间.索引下游的其他阶段花费大部分时间在Receiver::recv中休眠,等待输入.

这意味着我们应该能够更快.在解决瓶颈问题时,并行度将会提高.既然您已经知道如何使用通道,并且我们的程序是由独立的代码片段组成的,那么很容易找到解决第一个瓶颈的方法.我们可以手动优化第二阶段的代码,就像任何其他代码一样;把工作分成两个或更多个阶段;或者同时运行多个文件索引线程.

通道特征和性能(Channel Features and Performance)

std::sync::mpscmpsc部分代表 多生产者,单一消费者(multi-producer, single-consumer) ,简要描述Rust的渠道提供的通信类型.

我们的示例程序中的通道将值从单个发送者传送到单个接收者.这是一个相当普遍的情况.但Rust通道也支持多个发送者,如果你需要,例如,一个处理来自许多客户端线程的请求的线程,如图19-8所示.

图19-8. 单个通道接收来自许多发送者的请求.

Sender<T>实现Clonetrait.要获得具有多个发送者的通道,只需创建常规通道并根据需要多次克隆发送者.你可以将每个Sender值移动到其他线程.

无法克隆Receiver<T>,因此如果需要多个线程从同一通道接收值,则需要Mutex.我们将在本章后面介绍如何执行此操作.

Rust通道经过精心优化.首次创建通道时,Rust使用特殊的"一次性(one-shot)"队列实现.如果你只通过通道发送一个对象,则开销很小.如果发送第二个值,Rust将切换到不同的队列实现.它正在长期稳定下来,实际上,准备通道传输许多值,同时最大限度地减少分配开销.如果你克隆了Sender,那么Rust必须依赖于另一个实现,当多个线程试图同时发送值时,这个实现是安全的.但即使这三个实现中最慢的一个是无锁队列,因此发送或接收一个值最多只需几个原子操作和一个堆分配,再加上移动本身.仅当队列为空且接收线程因此需要将其自身置于休眠状态时才需要系统调用.在这种情况下,当然,通过你的通道的流量无论如何都不会超出.

尽管所有优化工作都存在,但是有一个错误使应用程序很容易围绕通道性能:发送值比接收和处理它们更快.这导致不断增长的积压值在通道中积累.例如,在我们的程序,我们发现文件读取器线程(阶段1)可以加载文件比文件索引线程(阶段2)索引它们快得多.结果是,将从磁盘读取数百兆字节的原始数据,并立即将其填充到队列中.

这种不当行为会花费内存,伤害位置.更糟糕的是,发送线程一直在运行,当接收端最需要这些资源时,它会耗尽CPU和其他系统资源来发送更多的值.

在这里,Rust再次向Unix管道学习.Unix使用一个优雅的技巧来提供一些背压(backpressure),以便快速发送者被迫减速:Unix系统上的每个管道都有一个固定的大小,如果一个进程试图写入一个暂时满的管道,系统就会阻塞该进程直到管道中有空间.Rust等效称为 同步通道(synchronous channel) .

use std::sync::mpsc::sync_channel;

let (sender, receiver) = sync_channel(1000);

同步通道与常规通道完全相同,只是在创建它时,你可以指定它可以容纳多少个值.对于同步通道,sender.send(value)可能是阻塞操作.毕竟,这个想法是阻塞并不总是坏事.在我们的示例程序中,将start_file_reader_thread中的channel更改为具有32个值的空间的sync_channel会将我们的基准数据集上的内存使用量减少三分之二,而不会降低吞吐量.

线程安全:Send和Sync(Thread Safety: Send and Sync)

到目前为止,我们一直表现得好像所有值都可以在线程之间自由移动和共享.这基本上是正确的,但Rust的完整线程安全故事取决于两个内置trait,std::marker::Sendstd::marker::Sync.

  • 实现Send的类型可以安全地通过值传递给另一个线程.它们可以跨线程移动.

  • 实现Sync的类型可以安全地通过非mut引用传递给另一个线程.它们可以跨线程共享.

在这里 安全(safe) ,我们的意思是我们永远的意思:没有数据竞争和其他未定义行为.

例如,在第461页的process_files_in_parallel示例中,我们使用闭包将Vec<String>从父线程传递到每个子线程.我们当时没有指出它,但这意味着向量及其字符串在父线程中分配,但在子线程中释放.Vec<String>实现Send的事实是一个API承诺,这是正常的:VecString内部使用的分配器是线程安全的.

(如果你使用快速但非线程安全的分配器编写自己的VecString类型,则必须使用非Send类型实现它们,例如不安全的指针.Rust会推断出你的NonThreadSafeVecNonThreadSafeString类型不是Send并限制它们单线程使用.但这是一种罕见的情况.)

如图19-9所示,大多数类型都是SendSync.你甚至不必使用#[derive]来获取程序中结构和枚举的这些trait.Rust为你做了.如果结构或枚举的字段为Send,则它为Send;如果其字段为Sync,则它为Sync.

图19-9. Send和Sync类型.

少数不是SnedSync的类型主要是那些以非线程安全的方式使用可变性的类型.例如.考虑std::rc::Rc<T>,即引用计数智能指针的类型.

如果你可以跨线程共享一个Rc<String>会发生什么?如果两个线程碰巧尝试同时克隆Rc,如图19-10所示,我们有一个数据竞争,因为两个线程都会增加共享引用计数.引用计数可能变得不准确,导致后续释放后使用(use-after-free)或双重释放(double free)--未定义行为.

图19-10. 为什么Rc<String>既不是Sync也不是Send.

当然,Rust阻止了这一点.以下是设置此数据竞争的代码:

use std::thread::spawn;
use std::rc::Rc;

fn main() {
    let rc1 = Rc::new("hello threads".to_string());
    let rc2 = rc1.clone();
    spawn(move || {  // error
        rc2.clone();
    });
    rc1.clone();
}

Rust拒绝编译它,给出了详细的错误消息:

error[E0277]: the trait bound `Rc<String>: std::marker::Send` is not satisfied
              in `[closure@...]`
  --> concurrency_send_rc.rs:10:5
   |
10 |     spawn(move || {  // error
   |     ^^^^^ within `[closure@...]`, the trait `std::marker::Send` is not
   |           implemented for `Rc<String>`
   |
   = note: `Rc<String>` cannot be sent between threads safely
   = note: required because it appears within the type `[closure@...]`
   = note: required by `std::thread::spawn`

现在你可以看到SendSync如何帮助Rust强制执行线程安全.它们在诸如spawn之类的函数的类型签名中显示为跨线程边界传输数据的限制.当你spawn一个线程时,你传递的闭包必须是Send,这意味着它包含的所有值必须是Send.同样,如果你尝试通过通道将值发送到另一个线程,则值必须为Send.

将几乎任何迭代器传送到通道(Piping Almost Any Iterator to a Channel)

我们的倒排索引构建器构建为管道.代码很清楚,但它让我们手动设置通道和启动线程.相比之下,我们在第15章中构建的迭代器管道似乎只需要几行代码就可以完成更多的工作.我们可以为线程管道构建类似的东西吗?

事实上,如果我们可以统一迭代器管道和线程管道,那将是很好的.然后我们的索引构建器可以编写为迭代器管道.它可能会像这样开始:

documents.into_iter()
    .map(read_whole_file)
    .errors_to(error_sender)   // filter out error results
    .off_thread()              // spawn a thread for the above work
    .map(make_single_file_index)
    .off_thread()              // spawn another thread for stage 2
    ...

Traits允许我们向标准库类型添加方法,因此我们可以实际执行此操作.我们首先编写一个声明我们想要的方法的trait:

use std::sync::mpsc;

pub trait OffThreadExt: Iterator {
    /// Transform this iterator into an off-thread iterator: the
    /// `next()` calls happen on a separate worker thread, so the
    /// iterator and the body of your loop run concurrently.
    fn off_thread(self) -> mpsc::IntoIter<Self::Item>;
}

然后我们为迭代器类型实现这个特性.它有助于mpsc::Receiver已经可迭代.

use std::thread::spawn;

impl<T> OffThreadExt for T
    where T: Iterator + Send + 'static,
          T::Item: Send + 'static
{
    fn off_thread(self) -> mpsc::IntoIter<Self::Item> {
        // Create a channel to transfer items from the worker thread.
        let (sender, receiver) = mpsc::sync_channel(1024);

        // Move this iterator to a new worker thread and run it there.
        spawn(move || {
            for item in self {
                if sender.send(item).is_err() {
                    break;
                }
            }
        });

        // Return an iterator that pulls values from the channel.
        receiver.into_iter()
    }
}

此代码中的where子句是通过类似于第260页的"逆向工程限制(Reverse-Engineering Bounds)"中所述的过程确定的.起初,我们只是这样:

impl<T: Iterator> OffThreadExt for T

也就是说,我们希望实现适用于所有迭代器.Rust没有.因为我们使用spawn将类型为T的迭代器移动到新线程,所以我们必须指定T: Iterator + Send +'static.因为我们通过通道发回项,所以我们必须指定T::Item:Send +'static.有了这些更改,Rust很满意. 简而言之,这就是Rust的特点:我们可以自由地为语言中的几乎所有迭代器添加一个并发功能工具--但必须首先了解并记录使其安全使用的限制.

超越管道(Beyond Pipelines)

在本节中,我们使用管道作为示例,因为管道是使用通道的一种很好的,明显的方式.每个人都理解他们.它们是具体的,实用的和确定的.然而,通道不仅仅适用于管道.它们也是为同一进程中的其他线程提供任何异步服务的一种快速,简便的方法.

例如,假设你想要在自己的线程上进行日志记录,如图19-8所示.其他线程可以通过通道向日志记录线程发送日志消息;由于你可以克隆通道的Sender,因此许多客户端线程可以拥有将日志消息发送到同一日志记录线程的发送者.

在自己的线程上运行类似于日志记录的服务具有优势.日志记录线程可以在需要时旋转日志文件.它不必与其他线程进行任何花哨的协调.这些线程不会被阻塞.消息将在通道中无害地累积片刻,直到日志记录线程恢复工作.

通道也可用于一个线程向另一个线程发送请求并需要获得某种响应的情况.第一个线程的请求可以是包含Sender的结构或元组,这是一种自寻址的信封,第二个线程使用它发送回复.这并不意味着交互必须是同步的.第一个线程决定是阻塞并等待响应,还是使用.try_recv()方法进行轮询.

到目前为止我们提供的工具--用于高度并行计算的fork-join,用于松散连接组件的通道--足以满足各种应用.但我们还没有结束.

共享可变状态(Shared Mutable State)

自你在第8章中发布fern_simcrate以来的几个月里,你的蕨类模拟软件已经真正起飞.现在,你正在创建一个多人实时战略游戏,其中八个玩家在模拟的侏罗纪景观中比赛种植大部分真实周期的蕨类植物.这个游戏的服务器是一个大规模并行的应用程序,请求涌入许多线程.在八个玩家可用时,这些线程如何协调开始游戏?

这里要解决的问题是,许多线程需要访问等待加入游戏的玩家的共享列表.这些数据必须是可变的,并且在所有线程之间共享.如果Rust没有共享的可变状态,那么我们该怎么办?

你可以通过创建一个新线程来解决这个问题,该线程的全部工作就是管理此列表.其他线程将通过通道与之通信.当然,这要花费一个线程,会有一些操作系统开销.

另一种选择是使用Rust提供的工具来安全地共享可变数据.这样的东西确实存在.它们是低级原语,对于使用线程的任何系统程序员来说都是熟悉的.在本节中,我们将介绍互斥锁(mutexes),读/写锁(read/write locks),条件变量(condition variables)和原子整数(atomic integers).最后,我们将展示如何在Rust中实现全局可变变量.

什么是互斥锁?(What Is a Mutex?)

互斥锁(mutex) (或 锁(lock) )用于在访问某些数据时强制多个线程轮流.我们将在下一节介绍Rust的互斥锁.首先,回顾一下互斥锁在其他语言中的样子是有意义的.在C++中简单使用互斥锁可能如下所示:

// C++ code, not Rust
void FernEngine::JoinWaitingList(PlayerId player) {
    mutex.Acquire();

    waitingList.push_back(player);

    // Start a game if we have enough players waiting.
    if (waitingList.length() >= GAME_SIZE) {
        vector<PlayerId> players;
        waitingList.swap(players);
        StartGame(players);
    }

    mutex.Release();
}

调用mutex.Acquire()mutex.Release()标记此代码中 临界区(critical section) 的开头和结尾.对于程序中的每个mutex,一次只能在一个临界区内运行一个线程.如果一个线程在临界区中,则调用mutex.Acquire()的所有其他线程将阻塞,直到第一个线程到达mutex.Release().

我们说互斥锁 保护(protects) 数据:在这种情况下,mutex保护waitingList.但是,程序员有责任确保每个线程在访问数据之前始终获取互斥锁,然后将其释放.

互斥锁有用有几个原因:

  • 它们可以防止 数据竞争(data races) ,即竞争线程并发地读取和写入相同内存的情况.数据竞争在C++和Go中是未定义行为.像Java和C#这样的托管语言承诺不会崩溃,但数据竞争的结果仍然(总而言之)是无意义的.

  • 即使数据竞争不存在,即使所有读取和写入按程序顺序逐个发生,没有互斥锁,不同线程的操作可能以任意方式交错.想象一下,即使其他线程在运行时修改其数据,也要尝试编写有效的代码.想象一下试图调试它.这就像你的程序闹鬼一样.

  • 互斥锁支持使用 不变量(invariants) 编程,有关受保护数据的规则,当你按每个临界区进行设置和维护时,这些规则是构造为真的.

当然,所有这些都是出于同样的原因:不受控制的竞争条件使得编程变得棘手.互斥锁为混乱带来了一些秩序(虽然没有通道道或fork-join那么有序).

但是,在大多数语言中,互斥锁很容易搞砸.在C++中,与大多数语言一样,数据和锁是独立的对象.理想情况下,注释说明每个线程在访问数据之前都必须获取互斥锁:

class FernEmpireApp {
    ...

private:
    // List of players waiting to join a game. Protected by `mutex`.
    vector<PlayerId> waitingList;

    // Lock to acquire before reading or writing `waitingList`.
    Mutex mutex;
    ...
};

但是,即使有这么好的注释,编译器也无法在这里强制安全访问.当一段代码忽略了获取互斥锁时,我们就会得到未定义行为.实际上,这意味着很难重现和修复的bug.

即使在Java中,对象和互斥锁之间存在一些名义上的关联,这种关系也不会非常深入.编译器不会尝试强制执行它,在实践中,受锁保护的数据很少是关联对象的字段.它通常包含多个对象中的数据.锁方案仍然很棘手.注释仍然是执行它们的主要工具.

Mutex<T>(Mutex<T>)

现在我们将展示Rust中等待列表的实现.在我们的Fern Empire游戏服务器中,每个玩家都有一个唯一的ID:

type PlayerId = u32;

等待列表只是一个玩家的集合:

const GAME_SIZE: usize = 8;

/// A waiting list never grows to more than GAME_SIZE players.
type WaitingList = Vec<PlayerId>;

等待列表存储为FernEmpireApp的字段,这是在服务器启动期间在Arc中设置的单例.每个线程都有一个指向它的Arc.它包含我们程序所需的所有共享配置和其他东西.其中大多数是只读的.由于等待列表既是共享的又是可变的,因此它必须受到Mutex的保护:

use std::sync::Mutex;

/// All threads have shared access to this big context struct.
struct FernEmpireApp {
    ...
    waiting_list: Mutex<WaitingList>,
    ...
}

与C++不同,在Rust中,受保护的数据存储在Mutex 中(inside).设置Mutex看起来像这样:

let app = Arc::new(FernEmpireApp {
    ...
    waiting_list: Mutex::new(vec![]),
    ...
});

创建一个新的Mutex看起来像创建一个新的BoxArc,但BoxArc表示堆分配,Mutex只是关于锁.如果你想在堆中分配你的Mutex,你必须这样说,就像我们在这里所做的那样,整个app使用Arc::new,而受保护数据使用Mutex::new.这些类型通常一起使用:Arc可以方便地跨线程共享内容,而Mutex对于跨线程共享的可变数据非常方便.

现在我们可以实现使用互斥锁的join_waiting_list方法:

impl FernEmpireApp {
    /// Add a player to the waiting list for the next game.
    /// Start a new game immediately if enough players are waiting.
    fn join_waiting_list(&self, player: PlayerId) {
        // Lock the mutex and gain access to the data inside.
        // The scope of `guard` is a critical section.
        let mut guard = self.waiting_list.lock().unwrap();

        // Now do the game logic.
        guard.push(player);
        if guard.len() == GAME_SIZE {
            let players = guard.split_off(0);
            self.start_game(players);
        }
    }
}

获取数据的唯一方法是调用.lock()方法:

let mut guard = self.waiting_list.lock().unwrap();

self.waiting_list.lock()阻塞,直到可以获得互斥锁.此方法调用返回的MutexGuard<WaitingList>值是&mut WaitingList的薄包装器.感谢解引用强制(第289页讨论过),我们可以直接在guard上调用WaitingList方法:

guard.push(player);

guard甚至允许我们直接借用对底层数据的引用.Rust的生命周期系统确保这些引用不会比guard本身活得更久.如果不持有锁,就无法访问Mutex中的数据.

guard时删除,锁被释放.通常情况发生在块的末尾,但你也可以手动删除它:

if guard.len() == GAME_SIZE {
    let players = guard.split_off(0);
    drop(guard);  // don't keep the list locked while starting a game
    self.start_game(players);
}

mut和Mutex(mut and Mutex)

这可能看起来很奇怪--当然,一开始对我们来说也很奇怪--我们的join_waiting_list方法不会通过mut引用来获取self.它的类型签名是:

fn join_waiting_list(&self, player: PlayerId)

当你调用其push方法时,底层集合Vec<PlayerId> 确实(does) 需要mut引用.它的类型签名是:

pub fn push(&mut self, item: T)

然而,这段代码编译并运行良好.这是怎么回事?

在Rust中,mut表示 独占访问(exclusive access) .非mut意味着 共享访问(shared access) .

我们习惯于将mut访问类型从父级传递到子级,从容器传递到内容.你只希望能够在starships[id].engine上调用mut方法,如果你一开始有一个starshipsmut引用(或者你拥有starships,在这种情况下祝贺你成为Elon Musk).这是默认的,因为如果你没有父级的独占访问,Rust通常无法确保你有对子级的独占访问.

但是Mutex确实有办法:锁.实际上,互斥锁只不过是一种方法,可以提供对内部数据的 独占(exclusive) (mut)访问,即使许多线程可能对Mutex本身有 共享(shared) (非mut)访问.

Rust的类型系统告诉我们Mutex的作用.它动态地强制执行独占访问,这通常是在编译时由Rust编译器静态地完成的.

(你可能还记得std::cell::RefCell也是这样做的,只是没有尝试支持多线程.MutexRefCell都是内部可变性的风格,我们在第205页介绍了这一点.)

为什么互斥锁并不总是一个好主意(Why Mutexes Are Not Always a Good Idea)

在我们开始使用互斥锁之前,我们介绍了一些并发方法,如果你来自C++,它们可能看起来非常容易正确地使用.这并非巧合:这些方法旨在为并发编程中最令人困惑的方面提供强有力的保证.专门使用fork-join并行性的程序是确定性的,不会死锁(deadlock).使用通道的程序几乎同样表现良好.那些专门用于流水线操作的通道,比如我们的索引构建器,是确定性的:消息传递的时间可能会有所不同,但不会影响输出.等等.关于多线程程序的保证很好!

Rust的Mutex的设计几乎肯定会让你比以前更系统,更明智地使用互斥锁.但值得停下来思考的是,Rust的安全保证能和不能帮助什么.

安全Rust代码不能触发 数据竞争(data race) ,这是一种特定类型的bug,多个线程并发读取和写入相同的内存,产生无意义的结果.这很好:数据竞争总是bug,在真正的多线程程序中并不罕见.

但是,使用互斥锁的线程会遇到其他一些问题,Rust不会为你修复这些问题:

  • 有效的Rust程序不能有数据竞争,但是它们仍然可以有其他 竞争条件(race conditions) --在这种情况下,程序的行为取决于线程之间的时间,因此可能因运行而异.一些竞争条件是良性的.有些表现为一般的片状和难以置信的难以修复的bug.以非结构化方式使用互斥锁会引发竞争条件.你应该确保它们是良性的.

  • 共享可变状态也会影响程序设计.在通道作为代码中的抽象边界的情况下,可以容易地将独立的组件分开进行测试,互斥锁则鼓励使用"只需添加方法(just-add-a-method)"的工作方式,这可能会导致一团糟的相关联的代码.

  • 最后,互斥锁并不像它们一开始看起来那么简单,接下来的两个部分将会显示这一点.

所有这些问题都是工具固有的.尽可能使用更结构化的方法;必要时使用Mutex.

死锁(Deadlock)

一个线程可以通过尝试获取它已经持有的锁来死锁自己:

let mut guard1 = self.waiting_list.lock().unwrap();
let mut guard2 = self.waiting_list.lock().unwrap();  // deadlock

假设第一个调用self.waiting_list.lock()成功,获取锁.第二个调用看到锁被保持,因此它会阻塞,等待它被释放.它将永远等待.等待线程是持有锁的线程.

换句话说,Mutex中的锁不是递归锁(recursive lock).

这里的bug显而易见.在实际程序中,两个lock()调用可能在两个不同的方法中,其中一个调用另一个方法.单独采用的每个方法的代码看起来都很好.还有其他方法可以实现死锁,涉及多个线程,每个线程一次获取多个互斥锁.Rust的借用系统无法保护你免受死锁.最好的保护措施是保持临界区的小型化:进入,完成工作,然后离开.

它也可能与通道陷入死锁.例如,两个线程可能会阻塞,每个线程都在等待从另一个线程接收消息.然而,再次,良好的程序设计可以让你高度放心,这在实践中不会发生.在管道中,就像我们的反向索引构建器一样,数据流是非循环的.在这样的程序中,死锁与在Unix shell管道中一样不可能.

中毒的互斥锁(Poisoned Mutexes)

Mutex::lock()返回一个Result,原因与JoinHandle::join()相同:如果另一个线程发生恐慌,则优雅地失败.当我们编写handle.join().unwrap()时,我们告诉Rust将恐慌从一个线程传播到另一个线程.习语mutex.lock().unwrap()是类似的.

如果一个线程在持有Mutex时发生恐慌,Rust会将Mutex标记为 中毒(poisoned) .任何后续lock中毒的Mutex的尝试都会收到错误结果.我们的.unwrap()调用告诉Rust如果发生这种情况会引起恐慌,将恐慌从其他线程传播到此线程.

中毒的互斥锁有多糟糕?毒药听起来很致命,但这种情况并不一定是致命的.正如我们在第7章中所说,恐慌是安全的.一个恐慌线程使程序的其余部分处于安全状态.

因此,互斥锁在恐慌中中毒的原因并不是因为害怕未定义行为.相反,关注的是你可能已经使用不变量进行编程.由于你的程序在没有完成它正在做的事情的情况下发生恐慌并从一个临界区退出,可能已经更新了受保护数据的某些字段而不是其他,因此不变量现在可能已被破坏.Rust会使互斥锁中毒,以防止其他线程在无意中陷入这种破碎的情况,并使其变得更糟.你仍然 可以(can) 锁定一个有毒的互斥锁并访问里面的数据,完全执行互斥;请参阅PoisonError::into_inner()的文档.但你不会偶然做到这一点.

使用互斥锁的多生产者通道(Multi-producer Channels Using Mutexes)

我们之前提到,Rust的通道是多生产者,单一消费者.或者更具体地说,一个通道只有一个Receiver.我们不能有一个线程池,其中许多线程使用单个mpsc通道作为共享工作列表.

然而,事实证明,只使用标准库部件有一个非常简单的解决方法.我们可以在Receiver周围添加一个Mutex并分享它.这是一个这样做的模块:

pub mod shared_channel {
    use std::sync::{Arc, Mutex};
    use std::sync::mpsc::{channel, Sender, Receiver};

    /// A thread-safe wrapper around a `Receiver`.
    #[derive(Clone)]
    pub struct SharedReceiver<T>(Arc<Mutex<Receiver<T>>>);

    impl<T> Iterator for SharedReceiver <T> {
        type Item = T;

        /// Get the next item from the wrapped receiver.
        fn next(&mut self) -> Option<T> {
            let guard = self.0.lock().unwrap();
            guard.recv().ok()
        }
    }

    /// Create a new channel whose receiver can be shared across threads.
    /// This returns a sender and a receiver, just like the stdlib's
    /// `channel()`, and sometimes works as a drop-in replacement.
    pub fn shared_channel<T>() -> (Sender<T>, SharedReceiver<T>) {
        let (sender, receiver) = channel();
        (sender, SharedReceiver(Arc::new(Mutex::new(receiver))))
    }
}

我们正在使用Arc<Mutex<Receiver<T>>>.泛型真的堆积如山.这种情况在Rust中比在C++中更常发生.看起来这可能会让人感到困惑,但通常,在这种情况下,只需读取名称即可用简单的英语表达含义:

图19-11.

读/写锁(Read/Write Locks (RwLock<T>))

现在让我们从互斥锁转向Rust的标准库工具包std::sync中提供的其他线程同步工具.我们会迅速采取行动,因为对这些工具的完整讨论超出了本书的范围.

服务器程序通常具有一次加载且很少更改的配置信息.大多数线程只查询配置,但由于配置 可以(can) 更改--可能要求服务器从磁盘重新加载其配置,例如--它必须受到锁的保护.在这种情况下,互斥锁可以工作,但这是一个不必要的瓶颈.如果配置没有改变,线程不应该轮流查询配置.这是 读/写锁(read/write lock)RwLock的情况.

虽然互斥锁具有单个lock方法,但读/写锁定具有两种锁定方法,即readwrite.RwLock::write方法类似于Mutex::lock.它等待独占,对受保护数据的mut访问.RwLock::read方法提供非mut访问,其优点是不太可能必须等待,因为许多线程可以安全地同时读取.使用互斥锁,在任何给定时刻,受保护数据只有一个读取器或写入器(或者没有).使用读/写锁,它可以有一个写入器或许多读取器,就像Rust引用一般.

FernEmpireApp可能有一个配置的结构,受RwLock保护:

use std::sync::RwLock;

struct FernEmpireApp {
    ...
    config: RwLock<AppConfig>,
    ...
}

读取配置的方法将使用RwLock::read():

// True if experimental fungus code should be used.
fn mushrooms_enabled(&self) -> bool {
    let config_guard = self.config.read().unwrap();
    config_guard.mushrooms_enabled
}

重新加载配置的方法将使用RwLock::write():

fn reload_config(&self) -> io::Result<()> {
    let new_config = AppConfig::load()?;
    let mut config_guard = self.config.write().unwrap();
    *config_guard= new_config;
    Ok(())
}

当然,Rust非常适合强制执行RwLock数据的安全规则.单写入器或多读取器(single-writer-or-multiple-reader)概念是Rust借用系统的核心.self.config.read()返回一个守卫(guard),提供对AppConfig的非mut(共享)访问;self.config.write()返回提供mut(独占)访问的不同类型的守卫(guard).

条件变量(CondVar))(Condition Variables (Condvar))

通常,线程需要等待直到某个条件成立为止:

  • 在服务器关闭期间,主线程可能需要等待所有其他线程完成退出.

  • 当工作线程无事做时,需要等待直到有一些数据要处理.

  • 实现分布式共识协议的线程可能需要等待直到法定数量的对等方响应.

有时,对于我们想要等待的确切条件,有一个方便的阻塞API,例如JoinHandle::join用于服务器关闭示例.在其他情况下,没有内置的阻塞API.程序可以使用 条件变量(condition variables) 来构建自己的.在Rust中,std::sync::Condvar类型实现条件变量.Condvar.wait().notify_all()方法;.wait()阻塞,直到其他一些线程调用.notify_all().

除此之外还有更多的内容,因为条件变量总是关于某个特定Mutex保护的数据的特定真或假(true-or-false)条件.因此,MutexCondvar是相关的.完整的解释超出了我们的讨论范围,但是为了以前使用过条件变量的程序员的利益,我们将展示代码的两个关键部分.

当期望的条件成立时,我们调用Condvar::notify_all(或notify_one)来唤醒任何等待的线程:

self.has_data_condvar.notify_all();

要进入睡眠并等待条件成为真,我们使用Condvar::wait():

while !guard.has_data() {
    guard = self.has_data_condvar.wait(guard).unwrap();
}

这个while循环是条件变量的标准习惯用法.然而,Condvar::wait的签名是不寻常的.它通过值获取MutexGuard对象,使用它,并在成功时返回一个新的MutexGuard.这捕获了wait方法释放互斥锁的直觉,然后在返回之前重新获取它.通过值传递MutexGuard是一种说法,"我赐予你,.wait()方法,这是我释放互斥锁的独占权限(I bestow upon you, .wait()method, my exclusive authority to release the mutex)."

原子(Atomics)

std::sync::atomic模块包含用于无锁并发编程的原子类型.这些类型与标准C++原子基本相同:

  • AtomicIsizeAtomicUsize是对应于单线程isizeusize类型的共享整数类型.

  • AtomicBool是共享的bool值.

  • AtomicPtr<T>是不安全指针类型*mut T的共享值.

正确使用原子数据超出了本书的范围.可以说多个线程可以同时读取和写入原子值而不会导致数据竞争.

原子类型不用通常的算术和逻辑运算符,而是公开执行 原子操作(atomic operations) ,单独加载,存储,交换和算术运算的方法,这些方法作为一个单元安全地发生,即使其他线程也在执行触及相同内存位置的原子操作.增加一个名为atomAtomicIsize看起来像这样:

use std::sync::atomic::Ordering;

atom.fetch_add(1, Ordering::SeqCst);

这些方法可以编译为专门的机器语言指令.在x86-64体系结构中,此.fetch_add()调用编译为lock incq指令,其中普通的n += 1可能编译为普通的incq指令或该主题的任意数量的变体.Rust编译器还必须放弃围绕原子操作的一些优化,因为--与正常的加载或存储不同--它可以立即被其他线程合法地观察到.

参数Ordering::SeqCst是一个 内存排序(memory ordering) .内存排序类似于数据库中的事务隔离级别.他们告诉系统你有多关心这些哲学概念,如前面的效果和没有循环的时间,而不是表现.内存排序对程序的正确性至关重要,而且理解和推理它们很棘手.令人高兴的是,选择顺序一致性(最严格的内存排序)的性能损失通常非常低--与将SQL数据库置于SERIALIZABLE模式的性能损失不同.所以如有疑问,请使用Ordering::SeqCst.Rust继承了标准C++原子的几个其他内存排序,对于存在和时间有各种较弱的保证.我们不会在这里讨论它们.

原子的一个简单用法是取消.假设我们有一个正在进行长时间运行计算的线程,比如渲染视频,我们希望能够异步地取消它.问题是与我们希望它关闭的线程进行通信.我们可以通过共享的AtomicBool来做到这一点:

use std::sync::atomic::{AtomicBool, Ordering};

let cancel_flag = Arc::new(AtomicBool::new(false));
let worker_cancel_flag = cancel_flag.clone();

此代码创建两个Arc<AtomicBool>智能指针,指向同一堆分配的AtomicBool,其初始值为false.名为cancel_flag的第一个将保留在主线程中.第二个worker_cancel_flag将被移动到工作线程.

这是工作线程的代码:

let worker_handle = spawn(move || {
    for pixel in animation.pixels_mut() {
        render(pixel); // ray-tracing - this takes a few microseconds
        if worker_cancel_flag.load(Ordering::SeqCst) {
            return None;
        }
    }
    Some(animation)
});

渲染每个像素后,线程通过调用其.load()方法检查标志的值:

worker_cancel_flag.load(Ordering::SeqCst)

如果在主线程中我们决定取消工作线程,我们在AtomicBool中存储true,然后等待线程退出:

// Cancel rendering.
cancel_flag.store(true, Ordering::SeqCst);

// Discard the result, which is probably `None`.
worker_handle.join().unwrap();

当然,还有其他方法可以实现这一点.这里的AtomicBool可以替换为Mutex<bool>或通道.主要区别在于原子具有最小的开销.原子操作从不使用系统调用.加载或存储通常编译为单个CPU指令.

原子是一种内部可变性,类似MutexRwLock,因此它们的方法通过共享(非mut)接受self.这使它们可用作简单的全局变量.

全局变量(Global Variables)

假设我们正在编写网络代码.我们希望有一个全局变量,一个我们在每次提供数据包时递增的计数器:

// Number of packets the server has successfully handled.
static PACKETS_SERVED: usize = 0;

编译良好.只有一个问题.PACKETS_SERVED不可变,所以我们永远不能改变它.

Rust尽其所能阻止全局可变状态.用const声明的常量当然是不可变的.默认情况下,静态变量也是不可变的,因此无法获得对其的mut引用.static可以声明为mut,但是访问它是不安全的.Rust坚持线程安全是所有这些规则的主要原因.

全局可变状态也会产生令人遗憾的软件工程后果:它会使程序的各个部分更紧密地耦合,更难以测试,并且以后更难以更改.尽管如此,在某些情况下,没有合理的替代方案,所以我们最好找到一种安全的方式来声明可变的静态变量.

支持递增PACKETS_SERVED,同时保持线程安全的最简单方法是使其成为原子整数:

use std::sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT};

static PACKETS_SERVED: AtomicUsize = ATOMIC_USIZE_INIT;

常量ATOMIC_USIZE_INIT是一个值为0AtomicUsize.我们使用此常量而不是表达式AtomicUsize::new(0),因为静态的初始值必须是常量;从Rust 1.17开始,不允许进行方法调用.类似地,ATOMIC_ISIZE_INITAtomicIsize0,ATOMIC_BOOL_INIT是一个值为falseAtomicBool.

声明此静态后,递增数据包计数非常简单:

PACKETS_SERVED.fetch_add(1, Ordering::SeqCst);

原子全局变量仅限于简单整数和布尔值.尽管如此,创建任何其他类型的全局变量相当于解决相同的两个问题,这两个问题都很简单:

  • 必须以某种方式使变量成为线程安全的,否则它不能是全局的:为了安全起见,静态变量必须是Sync和非mut.

幸运的是,我们已经看到了解决这个问题的方法.Rust具有用于安全共享更改值的类型:Mutex,RwLock和原子类型.即使声明为非mut,也可以修改这些类型.这就是他们所做的.(参见第488页的"mut和Mutex(mut and Mutex)".)

  • 如上所述,静态初始化无法调用函数.这意味着声明静态Mutex的明显方法不起作用:
static HOSTNAME: Mutex<String> =
    Mutex::new(String::new());// error: function call in static

我们可以使用lazy_staticcrate来解决这个问题.

我们在第426页的"构建正则表达式Lazily(Building Regex Values Lazily)"中介绍了lazy_staticcrate.使用lazy_static!宏定义变量允许你使用任何你喜欢的表达式来初始化它;它在第一次解引用变量时运行,并为所有后续使用保存该值.

我们可以像这样用lazy_static声明一个全局Mutex:

#[macro_use] extern crate lazy_static;

use std::sync::Mutex;

lazy_static! {
    static ref HOSTNAME: Mutex<String> = Mutex::new(String::new());
}

相同的技术适用于RwLockAtomicPtr变量.

使用lazy_static!每次访问静态数据时都会产生很小的性能成本.该实现使用std::sync::Once,这是一个用于一次性初始化的低级同步原语.在幕后,每次访问惰性静态时,程序都会执行原子加载指令来检查是否已经发生了初始化.(Once是特殊目的,所以我们不会在这里详细介绍它.通常使用lazy_static!更方便.但是,它对于初始化非Rust库很方便;例如,请参阅第572页的"libgit2的安全接口(A Safe Interface to libgit2)".)

Rust中的Hacking并发代码是什么样的(What Hacking Concurrent Code in Rust Is Like)

我们已经展示了在Rust中使用线程的三种技术:fork-join并行性,通道和带锁的共享可变状态.我们的目标是为Rust提供的内容提供一个很好的介绍,重点是如何将它们组合成真正的程序.

Rust坚持安全,因此从你决定编写多线程程序的那一刻起,重点就是构建安全,结构化的通信.保持线程大部分是隔离的是一个很好的方式来说服Rust你正在做的事情是安全的.碰巧隔离也是确保你正在做的事情是正确的和可维护的好方法.Rust再次引导你走向优秀的程序.

更重要的是,Rust可以让你结合技术和实验.你可以快速迭代:与编译器争论可以比调试数据竞争更快地启动和运行.