Rust基础拾遗--并发和异步编程
Rust基础拾遗
- 前言
- 1.并发
- 1.1 分叉与合并并行
- 1.1.1 启动与联结
- 1.1.2 跨线程错误处理
- 1.1.3 跨线程共享不可变数据
- 1.1.4 rayon
- 1.2 通道
- 1.2.1 发送值
- 1.2.2 接收值
- 1.2.3 运行管道
- 1.2.4 通道的特性与性能
- 1.2.5 线程安全:Send与Sync
- 1.3 共享可变状态
- 2.异步编程
- 2.1 从同步到异步
前言
通过Rust程序设计-第二版
笔记的形式对Rust相关重点知识
进行汇总,读者通读此系列文章就可以轻松的把该语言基础捡起来。
1.并发
-
为什么一些看似正确的多线程惯用法却根本不起作用?
与“内存模型”有关 -
你最终会找到一种自己用起来顺手且不会经常出错的并发惯用法。
-
系统程序员常用的方法包括以下几种。
-
具有单一作业的后台线程,需要定期唤醒执行作业。
-
通过任务队列与客户端通信的通用工作池。
-
管道,数据在其中从一个线程流向下一个线程,每个线程只负责一部分工作。
-
数据并行处理,假设整个计算机只进行一次主要的大型计算,将这次计算分成 n 个部分且在 n 个线程上运行,并期望机器的所有 n 个核心都能立即开始工作。
-
同步复杂对象关系,其中多个线程可以访问相同的数据,并且使用基于互斥锁等底层原语的临时加锁方案避免了竞争。
-
原子化整数操作允许多个核心借助一个机器字大小的字段传递信息来进行通信。(数据通常是指针)
- Rust的3种 线程的方法。
- 分叉与合并(fork-join)并行
- 通道
- 共享可变状态
1.1 分叉与合并并行
对大量文档进行自然语言处理。可以写这样一个循环:
fn process_files(filenames: Vec<String>) -> io::Result<()> {for document in filenames {let text = load(&document)?; // 读取源文件let results = process(text); // 计算统计信息save(&document, results)?; // 写入输出文件}Ok(())
}
由于每个文档都是单独处理的,因此要想加快任务处理速度,可以将语料库分成多个块并在单独的线程上处理每个块,如图所示(使用分叉与合并方法的多线程文件处理)。
这种模式称为分叉与合并并行。fork(分叉)是启动一个新线程,join(合并)是等待线程完成。
优点:
避免了瓶颈。分叉与合并中没有对共享资源的锁定。任何线程只会在最后一步才不得不等待另一个线程。同时,每个线程都可以自由运行。这有助于降低任务切换开销。
这种模式在性能方面的数学模型对程序员来说比较直观。在最好的情况下,通过启动 4 个线程,我们只花 1/4 的时间就能完成原本的工作。图 19-2 展示了不应该期望这种理想加速的一个原因:我们可能无法在所有线程之间平均分配工作。另一个需要注意的原因是,有时分叉与合并程序必须在线程联结后花费一些时间来组合各线程的计算结果。也就是说,完全隔离这些任务可能会产生一些额外的工作。不过,除了这两个原因,任何具有独立工作单元的 CPU 密集型程序都可以获得显著的性能提升。
很容易推断出程序是否正确。只要线程真正隔离了,分叉与合并程序就是确定性的,就像曼德博程序中的计算线程一样。
缺点:
分叉与合并的主要缺点是要求工作单元彼此隔离
1.1.1 启动与联结
函数 std::thread::spawn 会启动一个新线程:
use std::thread;
thread::spawn(|| {println!("hello from a child thread");
});
它会接受一个参数,即一个 FnOnce 闭包或函数型的参数。Rust 会启动一个新线程来运行该闭包或函数的代码。
下面是使用 spawn
实现了之前的 process_files
函数的并行版本:
use std::;
fn process_files_in_parallel(filenames: Vec<String>) -> io::Result<()> {// 把工作拆分成几块const NTHREADS: usize = 8;/*使用了尚未展示过的实用函数 split_vec_into_chunks 来拆分工作。它的返回值 worklists 是由向量组成的向量,其中包含从原始向量 filenames 中均分出来的 8 个部分。*/let worklists = split_vec_into_chunks(filenames, NTHREADS);// 分叉:启动一个线程来处理每一个块let mut thread_handles = vec![];// 为每个 worklist 启动一个线程。for worklist in worklists {// spawn() 会返回一个名为 JoinHandle 的值,稍后会用到。现在,先将所有 JoinHandle 放入一个向量中。thread_handles.push(thread::spawn(move || process_files(worklist)));}// 联结:等待所有线程结束/*我们使用之前收集的 JoinHandle 的 .join() 方法来等待所有 8 个线程完成。联结这些线程对于保证程序的正确性是必要的,因为 Rust 程序会在 main 返回后立即退出,即使其他线程仍在运行。这些线程并不会调用析构器,而是直接被“杀死”了。如果这不是你想要的结果,请确保在从 main 返回之前联结了任何你关心的线程。*/for handle in thread_handles {handle.join().unwrap()?;}/*如果我们通过了这个循环,则意味着所有 8 个子线程都成功完成了。因此,该函数会以返回 Ok(()) 结束。*/Ok(())
}
1.1.2 跨线程错误处理
由于要做错误处理,我们在示例中用于联结子线程的代码比看起来更棘手。再重温一下那行代码:
handle.join().unwrap()?;
.join() 方法为我们做了两件事。
首先,handle.join() 会返回 std::thread::Result,如果子线程出现了 panic,就返回一个错误(Err)。在 Rust 中,panic 是安全且局限于每个线程的。线程之间的边界充当着 panic 的防火墙,panic 不会自动从一个线程传播到依赖它的其他线程。相反,一个线程中的 panic 在其他线程中会报告为错误型 Result。
不过,在本程序中,我们不会尝试任何花哨的 panic 处理,而是会立即在 Result 上使用 .unwrap(),断言它是一个 Ok 结果而不是 Err 结果。如果一个子线程确实发生了 panic,那么这个断言就会失败,所以父线程也会出现 panic。如此一来,我们就显式地将 panic 从子线程传播到了父线程。
其次,handle.join() 会将子线程的返回值传回父线程。我们传给 spawn 的闭包的返回类型是 io::Result<()>,因为它就是 process_files 返回值的类型。此返回值不会被丢弃。当子线程完成时,它的返回值会被保存下来,并且 JoinHandle::join() 会把该值传回父线程。
在这个程序中,handle.join() 返回的完整类型是 std::thread::Result>
。thread::Result 是 spawn/ joinAPI 的一部分,而 io::Result 是我们的应用程序的一部分。在这个例子中,展开(unwrap)thread::Result 之后,我们就用 io::Result 上的 ? 运算符显式地将 I/O 错误从子线程传播到了父线程。
在 Rust 中,错误是 Result 值(数据)而不是异常(控制流)。它们会像其他值一样跨线程传递。每当你使用底层线程 API 时,最终都必须仔细编写错误处理代码,但如果不得不编写错误处理代码,那么 Result 是非常合适的选择。
1.1.3 跨线程共享不可变数据
如果正在进行的分析需要一个大型的英语单词和短语的数据库:
// 之前
fn process_files(filenames: Vec<String>)// 之后
fn process_files(filenames: Vec<String>, glossary: &GigabyteMap)
这个 glossary 会很大,所以要通过引用传递它。
该如何修改 process_files_in_parallel 以便将词汇表传给工作线程呢?
1.1.4 rayon
rayon 库提供了两种运行并发任务的方式:
use rayon::prelude::*;
// “并行做两件事”
let (v1, v2) = rayon::join(fn1, fn2);
// “并行做N件事”
giant_vector.par_iter().for_each(|value| {do_thing_with_value(value);
});
- rayon::join(fn1, fn2) 只是调用这两个函数并返回两个结果。
- .par_iter() 方法会创建 ParallelIterator,这是一个带有 map、filter 和其他方法的值,很像 Rust 的 Iterator。
- 在这两种情况下,rayon 都会用自己的工作线程池来尽可能拆分工作。只要告诉 rayon 哪些任务可以并行完成就可以了,rayon 会管理这些线程并尽其所能地分派工作。
下图展示了对 giant_vector.par_iter().for_each(…) 调用的两种思考方式。(a) rayon 表现得就好像它为向量中的每个元素启动了一个线程。(b) 在幕后,rayon 在每个 CPU 核心上都有一个工作线程,这样效率更高。这个工作线程池由程序中的所有线程共享。当成千上万个任务同时进来时,rayon 会拆分这些工作。
下面是一个使用 rayon 的 process_files_in_parallel 版本和一个接受 Vec 型而非 &str 型参数的 process_file:
use rayon::prelude::*;fn process_files_in_parallel(filenames: Vec<String>, glossary: &GigabyteMap)-> io::Result<()>
{// 首先,用 filenames.par_iter() 创建一个并行迭代器。filenames.par_iter()// 用 .map() 在每个文件名上调用 process_file。这会在一系列 io::Result<()> 型的值上生成一个 ParallelIterator。.map(|filename| process_file(filename, glossary))/*用 .reduce_with() 来合并结果。在这里,我们会保留第一个错误并丢弃其余错误。如果想累积所有的错误或者打印它们,也可以在这里修改。当传递一个能在成功时返回有用值的 .map() 闭包时,.reduce_with() 方法也非常好用。这时可以给 .reduce_with() 传入一个闭包,指定如何组合两个成功结果。*/.reduce_with(|r1, r2| {if r1.is_err() { r1 } else { r2 }})/*reduce_with 只有在 filenames 为空时才会返回一个为 None 的 Option。在这种情况下,我们会用 Option 的 .unwrap_or() 方法来生成结果 Ok(())。*/.unwrap_or(Ok(()))
}
在后台,rayon 使用了一种叫作工作窃取
的技术来动态平衡线程间的工作负载。相比手动预先分配工作的方式,这通常能更好地让所有 CPU 都处于忙碌状态。
另外,rayon 还支持跨线程共享引用。幕后发生的任何并行处理都能确保在 reduce_with 返回时完成。这解释了为什么即使该闭包会在多个线程上调用,也能安全地将 glossary 传给 process_file。
1.2 通道
通道是一种单向管道,用于将值从一个线程发送到另一个线程。换句话说,通道是一个线程安全的队列。
使用通道,线程可以通过彼此传值来进行通信。这是线程协同工作的一种非常简单的方法,无须使用锁或共享内存。
我们一般会认为管道具有灵活性和可组合性,而没有意识到它还具有并发的特性
Rust 通道比 Unix 管道更快。发送值只是移动而不是复制,即使要移动的数据结构包含数兆字节数据速度也很快。
1.2.1 发送值
use std::;
use std::sync::mpsc;
// 先创建一个通道,channel 函数会返回一个值对:发送者和接收者。
let (sender, receiver) = mpsc::channel();
// 使用 std::thread::spawn 来启动一个线程。sender的所有权会通过这个 move 闭包转移给新线程。
let handle = thread::spawn(move || {// 从磁盘读取文件for filename in documents {let text = fs::read_to_string(filename)?;/*成功读取文件后,要将其文本发送到通道中:sender.send(text) 会将 text 值移动到通道中。最终,通道会再次把 text 值转交给接收到该值的任何对象。*/if sender.send(text).is_err() {break;}}Ok(())
});
为便于使用,程序会把所有这些代码都包装在一个函数中,该函数会返回至今尚未用到的 receiver 和新线程的 JoinHandle:
fn start_file_reader_thread(documents: Vec<PathBuf>)-> (mpsc::Receiver<String>, thread::JoinHandle<io::Result<()>>)
{let (sender, receiver) = mpsc::channel();let handle = thread::spawn(move || {...});(receiver, handle)
}
1.2.2 接收值
一个线程运行发送值的循环,接下来可以启动第二个线程来运行调用 receiver.recv() 的循环,但 Receiver 是可迭代的,所以还有如下写法:
for text in receiver {do_something_with(text);
}
控制流到达循环顶部时,只要通道恰好为空,接收线程在其他线程发送值之前都会阻塞。当通道为空且 Sender 已被丢弃时,循环将正常退出。在程序中,当读取线程退出时,循环会自然随即退出。该线程正在运行一个拥有变量 sender 的闭包,当闭包退出时,sender 会被丢弃。
管道的第二阶段代码如下:
fn start_file_indexing_thread(texts: mpsc::Receiver<String>)-> (mpsc::Receiver<InMemoryIndex>, thread::JoinHandle<()>)
{let (sender, receiver) = mpsc::channel();let handle = thread::spawn(move || {for (doc_id, text) in texts.into_iter().enumerate() {let index = InMemoryIndex::from_single_document(doc_id, text);if sender.send(index).is_err() {break;}}});(receiver, handle)
}
该函数会启动一个线程,此线程会从一个通道(texts)接收 String 值并将 InMemoryIndex 值发送给另一个通道(sender/receiver)。这个线程的工作是获取第一阶段加载的每个文件,并将每个文档变成一个小型单文件内存倒排索引。
这个线程的主循环很简单。索引文档的所有工作都是由函数 InMemoryIndex::from_single_document 完成的。它会在单词边界处拆分输入字符串,然后生成从单词到位置列表的映射。
这个阶段不会执行 I/O,所以不必处理各种 io::Error。它会返回 () 而非 io::Result<()>。
1.2.3 运行管道
其余 3 个阶段的设计也是类似的。每个阶段都会使用上一阶段创建的 Receiver。对管道的其余部分,我们设定的目标是将所有小索引合并到磁盘上的单个大索引文件中。最快的方法是将这个任务分为 3 个阶段。
首先,合并内存中的索引(第三阶段):
fn start_in_memory_merge_thread(file_indexes: mpsc::Receiver<InMemoryIndex>)-> (mpsc::Receiver<InMemoryIndex>, thread::JoinHandle<()>)
然后,将这些大型索引写入磁盘(第四阶段):
fn start_index_writer_thread(big_indexes: mpsc::Receiver<InMemoryIndex>,output_dir: &Path)-> (mpsc::Receiver<PathBuf>, thread::JoinHandle<io::Result<()>>)
最后,如果有多个大文件,就用基于文件的合并算法合并它们(第五阶段):
fn merge_index_files(files: mpsc::Receiver<PathBuf>, output_dir: &Path)-> io::Result<()>
最后一个阶段不会返回 Receiver,因为它是此管道的末尾。这个阶段会在磁盘上生成单个输出文件。它也不会返回 JoinHandle,因为我们没有为这个阶段启动线程。这项工作是在调用者的线程上完成的。
现在来看一下启动线程和检查错误的代码:
fn run_pipeline(documents: Vec<PathBuf>, output_dir: PathBuf)-> io::Result<()>
{// 启动管道的所有5个阶段let (texts, h1) = start_file_reader_thread(documents);let (pints, h2) = start_file_indexing_thread(texts);let (gallons, h3) = start_in_memory_merge_thread(pints);let (files, h4) = start_index_writer_thread(gallons, &output_dir);let result = merge_index_files(files, &output_dir);// 等待这些线程结束,保留它们遇到的任何错误let r1 = h1.join().unwrap();h2.join().unwrap();h3.join().unwrap();let r4 = h4.join().unwrap();// 返回遇到的第一个错误(如果有的话)(如你所见,h2和h3// 不会失败,因为这些线程都是纯粹的内存数据处理)r1?;r4?;result
}
和以前一样,使用 .join().unwrap() 显式地将 panic 从子线程传播到主线程。这里的差异点是:我们没有马上使用 ?,而是将 io::Result 值放在一边,直到所有 4 个线程都联结完成。
这个管道比等效的单线程管道快 40%。但与曼德博程序曾获得的 675% 的提升相比就有点儿微不足道了。我们显然没有让系统的 I/O 容量或所有 CPU 核心的工作量饱和。这是怎么回事?
管道就像制造业工厂中的装配流水线,其性能受限于最慢阶段的吞吐量。一条全新的、未调整过的装配线可能和单元化生产一样慢,只有对装配流水线做针对性的调整才能获得回报。在这个例子中,测量表明第二阶段是瓶颈。我们的索引线程使用了 .to_lowercase() 和 .is_alphanumeric(),因此它会花费大量时间在 Unicode 表中查找。对于索引下游的其他阶段,它们大部分时间在 Receiver::recv 中休眠,等待输入。
只要解决了这些瓶颈,并行度就会提高。既然知道了如何使用通道,再加上程序是由孤立的代码片段组成的,那么就很容易找到解决第一个瓶颈的方法。可以手动优化第二阶段的代码,就像优化其他代码一样,将工作拆分成两个或更多阶段,或同时运行多个文件索引线程。
1.2.4 通道的特性与性能
std::sync::mpsc
中的 mpsc 代表多生产者
、单消费者
(multi-producer, single-consumer
),这是对 Rust 通道提供的通信类型的简洁描述。
这个示例程序中的通道会将值从单个发送者传送到单个接收者。这是相当普遍的案例。但是 Rust 通道也支持多个发送者,如果需要的话,你可以用一个线程来处理来自多个客户端线程的请求。
Sender< T>实现了 Clone 特型。要获得具有多个发送者的通道,只需创建一个常规通道并根据需要多次克隆发送者即可。可以将每个 Sender 值转移给不同的线程。
Receiver< T> 不能被克隆,所以如果需要让多个线程从同一个通道接收值,就需要使用 Mutex。
Rust 的通道经过了精心优化。首次创建通道时,Rust 会使用特殊的“一次性”队列实现。如果只通过此通道发送一个对象,那么开销是最低的。如果要发送第二个值,Rust 就会切换到第二种队列实现。实际上,第二种实现就是为长期使用而设计的,它会准备好传输许多值的通道,同时最大限度地降低内存分配开销。如果你克隆了 Sender,那么 Rust 就必须回退到第三种实现,使得多个线程可以安全地同时尝试发送值,这种实现是安全的。当然,即便这 3 种实现中最慢的一种也是无锁队列,所以发送或接收一个值最多就是执行几个原子化操作和堆分配,再加上移动本身。只有当队列为空时才需要系统调用,这时候接收线程就会让自己进入休眠状态。当然,在这种情况下,走这个通道的流量无论如何都不会满载。
尽管进行了所有这些优化工作,但应用程序很容易在通道性能方面犯一个错误:发送值的速度快于接收值和处理值的速度。这会导致通道中积压的值不断增长。例如,在这个程序中,我们发现文件读取线程(第一阶段)加载文件的速度比文件索引线程(第二阶段)更快。结果导致数百兆字节的原始数据从磁盘中读取出来后立即填充到了队列中。
这种不当行为会消耗内存并破坏局部性。更糟糕的是,发送线程还会继续运行,耗尽 CPU 和其他系统资源只是为了发出更多的值,而此时却恰恰是接收端最需要资源来处理它们的时候。这显然不对劲。
Unix 使用了一个优雅的技巧来提供一些背压
,以迫使超速的发送者放慢速度:Unix 系统上的每个管道都有固定的大小,如果进程试图写入暂时已满的管道,那么系统就会简单地阻塞该进程直到管道中有了空间。这在 Rust 中的等效设计称为同步通道
:
use std::sync::mpsc;
let (sender, receiver) = mpsc::sync_channel(1000);
同步通道与常规通道非常像,但在创建时可以指定它能容纳多少个值。对于同步通道,sender.send(value) 可能是一个阻塞操作。毕竟,有时候阻塞也不是坏事。在我们的示例程序中,将 start_file_reader_thread 中的 channel 更改为具有 32 个值空间的 sync_channel 后,可将基准数据集上的内存使用量节省 2/3,却不会降低吞吐量。
1.2.5 线程安全:Send与Sync
迄今为止,我们一直假定所有值都可以在线程之间自由移动和共享。这基本正确,但 Rust 完整的线程安全故事取决于两个内置特型,即 std::marker::Send 和 std::marker::Sync。
- 实现了 Send 的类型可以安全地按值传给另一个线程。它们可以跨线程移动。
- 实现了 Sync 的类型可以安全地将一个值的不可变引用传给另一个线程。它们可以跨线程共享。
这里所说的安全(没有数据竞争和其他未定义行为
)。例如, process_files_in_parallel
示例中,我们使用闭包将 Vec 从父线程传给了每个子线程。虽然我们当时没有指出,但这意味着向量及其字符串会在父线程中分配,但会在子线程中释放。Vec 实现了 Send,这事实上代表一个关于“可以怎么做”的 API 承诺:Vec 和 String 在内部使用的分配器是线程安全的。
(如果要用快速但非线程安全的分配器编写自己的 Vec 类型和 String 类型,就不得不使用非 Send 的类型(如不安全的指针)来实现它们。然后 Rust 就会推断出 NonThreadSafeVec 类型和 NonThreadSafeString 类型没有实现 Send 而将它们限制为在单线程中使用。但需要这么做的情况非常罕见。)
如图所示,大多数类型既实现了 Send 也实现了 Sync。你甚至不必使用 #[derive] 来为程序中的结构体和枚举实现这些特型。Rust 会自动帮你实现。如果结构体或枚举的所有字段都是 Send 的,那它自然是 Send 的;如果结构体或枚举的所有字段都是 Sync 的,那它自然是 Sync 的。
有些类型是 Send 的但不是 Sync 的。这通常是刻意设计的,就像 mpsc::Receiver 一样,它是为了保证 mpsc 通道的接收端一次只能被一个线程使用。
少数既不是 Send 也不是 Sync 的类型大多使用了非线程安全的可变性,比如引用计数智能指针类型 std::rc::Rc。
如果 Rc 是 Sync 的,那么允许线程通过共享引用共享单个 Rc 会发生什么呢?如图 所示,如果两个线程碰巧同时尝试克隆 Rc,就会发生数据竞争,因为两个线程都会增加共享引用计数。结果引用计数可能变得不准确,导致释放后仍在使用(use-after-free)或稍后出现双重释放,这都是未定义行为。
当然,Rust 会阻止这种情况。下面是试图建立这种数据竞争的代码:
use std::thread;
use std::rc::Rc;fn main() {let rc1 = Rc::new("ouch".to_string());let rc2 = rc1.clone();thread::spawn(move || { // 错误rc2.clone();});rc1.clone();
}
Rust 会拒绝编译这段代码,并给出详细的错误消息:
error: `Rc<String>` cannot be sent between threads safely
现在可以看出 Send 和 Sync 如何帮助 Rust 加强线程安全了。对于跨线程边界传输数据的函数,Send 和 Sync 会作为函数类型签名中的限界。当你生成(spawn)一个线程时,传入的闭包必须实现了 Send 特型,这意味着它包含的所有值都必须是 Send 的。同样,如果要通过通道将值发送到另一个线程,则该值必须是 Send 的。
1.3 共享可变状态
- Mutex
在 Rust 中如何实现等待列表。在我们的蕨类帝国游戏服务器中,每个玩家都有一个唯一的 ID:
type PlayerId = u32;
等待列表只是玩家的集合:
const GAME_SIZE: usize = 8;/// 等候列表永远不会超过GAME_SIZE个玩家
type WaitingList = Vec<PlayerId>;
等待列表会被存储为 FernEmpireApp 中的一个字段,这是在服务器启动期间在 Arc 中设置的一个单例。每个线程都有一个 Arc 指向它。它包含我们程序中所需的全部共享配置和其他“零件”,其中大部分是只读的。由于等待列表既是共享的又是可变的,因此必须由 Mutex 提供保护:
use std::sync::Mutex;/// 所有线程都可以共享对这个大型上下文结构体的访问
struct FernEmpireApp {...waiting_list: Mutex<WaitingList>,...
}
在 Rust 中,受保护的数据存储于 Mutex 内部。建立此 Mutex 的代码如下所示:
use std::sync::Arc;let app = Arc::new(FernEmpireApp {...waiting_list: Mutex::new(vec![]),...
});
创建新的 Mutex 看起来就像创建新的 Box 或 Arc,但是 Box 和 Arc 意味着堆分配,而 Mutex 仅与锁操作有关。如果希望在堆中分配 Mutex,则必须明确写出来,就像这里所做的这样:对整个应用程序使用 Arc::new,而仅对受保护的数据使用 Mutex::new。这两个类型经常一起使用,Arc 用于跨线程共享数据,而 Mutex 用于跨线程共享的可变数据。
现在可以实现使用互斥锁的 join_waiting_list 方法了:
impl FernEmpireApp {/// 往下一个游戏的等候列表中添加一个玩家。如果有足够/// 的待进入玩家,则立即启动一个新游戏fn join_waiting_list(&self, player: PlayerId) {// 锁定互斥锁,并授予内部数据的访问权。`guard`的作用域是一个临界区let mut guard = self.waiting_list.lock().unwrap();// 现在开始执行游戏逻辑guard.push(player);if guard.len() == GAME_SIZE {let players = guard.split_off(0);self.start_game(players);}}
}
获取数据的唯一方法就是调用 .lock() 方法:
let mut guard = self.waiting_list.lock().unwrap();
self.waiting_list.lock() 会阻塞,直到获得互斥锁。这个方法调用所返回的 MutexGuard 值是 &mut WaitingList 的浅层包装。多亏了之前讨论过的“隐式解引用”机制,我们可以直接在此守卫上调用 WaitingList 的各种方法:
guard.push(player);
此守卫甚至允许我们借用对底层数据的直接引用。Rust 的生命周期体系会确保这些引用的生命周期不会超出守卫本身。如果不持有锁,就无法访问 Mutex 中的数据。
当 guard 被丢弃时,锁就被释放了。这通常会发生在块的末尾,但也可以手动丢弃。
if guard.len() == GAME_SIZE {let players = guard.split_off(0);drop(guard); // 启动游戏时就不必锁定列表了self.start_game(players);
}
-
mut与互斥锁
在 Rust 中,&mut 表示独占访问。普通 & 表示共享访问。 -
为什么互斥锁不是“银弹” ?
Rust 的 Mutex 设计几乎肯定会让你比以往任何时候都更系统、更明智地使用互斥锁。
思考一下 Rust 的安全保证可以帮你做什么,不能帮你做什么?
使用互斥锁的线程会遇到 Rust 无法为你修复的另一些问题?
-
有效的 Rust 程序不会有数据竞争,但仍然可能有其他竞态条件——程序的行为取决于各线程之间的运行时间长短,因此可能每次运行时都不一样。有些竞态条件是良性的,有些则表现为普遍的不稳定性和难以修复的 bug。以非结构化方式使用互斥锁会引发竞态条件。你需要确保竞态条件是良性的。
-
共享可变状态也会影响程序设计。通道作为代码中的抽象边界,可以轻松地拆出彼此隔离的组件以进行测试,而互斥锁则会鼓励一种“只要再添加一个方法就行了”的工作方式,这可能会导致彼此有联系的代码耦合成一个单体。
注:要尽可能使用更结构化的方法,只在必要时使用 Mutex。
- 死锁
Rust 的借用系统不能保护你免于死锁。最好的保护是保持临界区尽可能小:进入,开始工作,完成后马上离开。
通道也有可能陷入死锁。
-
互斥锁
互斥锁(mutex)或锁(lock)用于强制多个线程在访问某些数据时轮流读写。
互斥锁因 panic 而“中毒”的原因并非害怕出现未定义行为。 -
使用互斥锁的多消费者通道
-
读/写锁(RwLock )
-
条件变量(Condvar)
-
原子化类型
-
全局变量
2.异步编程
可以使用 Rust 异步任务在单个线程或工作线程池中交替执行许多彼此独立的活动。
异步任务类似于线程,但其创建速度更快,在它们之间可以更有效地传递控制权,并且其内存开销比线程少一个数量级。
一般来说,异步 Rust 代码看上去很像普通的多线程代码,但实际上那些可能导致阻塞的操作(如 I/O 或获取互斥锁)会以略有不同的方式处理。通过对这些操作进行特殊处理,Rust 能够获得关于这段代码行为的更多信息以辅助优化,这就是它能提高性能的原因。前面代码的异步版本如下所示:
use async_std::;
// async_std 这个 crate 的网络模块和任务模块,并在可能发生阻塞的调用之后添加了 .await。
let listener = net::TcpListener::bind(address).await?;let mut new_connections = listener.incoming();
while let Some(socket_result) = new_connections.next().await {let socket = socket_result?;let groups = chat_group_table.clone();task::spawn(async {log_error(serve(socket, groups).await);});
}
本章的目标是尽可能详细地展示它的工作原理,以便你可以预知如何在应用程序中执行异步代码以及把它用在哪里最能发挥出其价值。
1)为了展示异步编程的机制,会列举一组涵盖所有核心概念的最小语言特性集:Future
(未来值)、异步函数、await
表达式、任务以及 block_on
执行器和 spawn_local
执行器。
2)然后,介绍异步块和 spawn 执行器。在此过程中,我们会指出你可能会遇到的一些异步编程特有的问题,并解释该如何处理这些问题。
3)通过浏览一遍聊天服务器和客户端的完整代码讲解这些“零件”是如何协同工作的
4)通过spawn_blocking 和 block_on 的简单而实用的实现,阐述原生 Future 和执行器的工作原理
5)最后,我们会解释 Pin 类型,该类型在异步接口中会不时出现,以保证异步函数和异步式 Future 的安全使用。
2.1 从同步到异步
- Future
Rust 支持异步操作的方法是引入特型 std::future::Future
:
trait Future {type Output;// 现在,暂时把`Pin<&mut Self>`当作`&mut Self`fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
enum Poll<T> {Ready(T),Pendin
}
Future 代表一个你可以测试其完成情况的操作。Future 的 poll(轮询)方法从来不会等待操作完成,它总是立即返回。如果操作已完成,则 poll 会返回 Poll::Ready(output),其中 output 是它的最终结果。否则,它会返回 Pending。如果 Future 值得再次轮询,它承诺会通过调用 Context 中提供的回调函数 waker 来通知我们。我们将这种实现方式称为异步编程的“皮纳塔模型”:对于 Future,你唯一能做的就是通过轮询来“敲打”它,直到某个值“掉”出来。
所有现代操作系统都包含其系统调用的一些变体,我们可以使用它们来实现这种轮询接口。例如,在 Unix 和 Windows 上,如果将网络套接字设置为非阻塞模式,那么一旦这些读写发生阻塞,就会返回某种错误。你必须稍后再试。
因此,异步版本的 read_to_string 的签名大致如下所示:
fn read_to_string(&mut self, buf: &mut String)-> impl Future<Output = Result<usize>>;
除了返回类型,这与我们之前展示过的签名基本相同:异步版本会返回携带 Result 的 Future。你需要轮询这个 Future,直到从中获得 Ready(result)。每次轮询时,都会尽可能读取更多的内容。最终 result 会为你提供成功值或错误值,就像普通的 I/O 操作一样。这是一种通用模式:任何函数的异步版本都会接受与其同步版本完全相同的参数,但返回类型包裹在 Future 中。
调用这个版本的 read_to_string 并没有实际读取任何内容,它唯一的职责是构建并返回一个 Future,该 Future 会在轮询时完成其真正的工作。这个 Future 必须包含执行调用请求所需的全部信息。例如,此 read_to_string 返回的 Future 必须记住调用它的输入流,以及附加了传入数据的 String。事实上,由于 Future 包含 self 和 buf 的引用,因此 read_to_string 的正确签名必然是如下形式:
fn read_to_string<'a>(&'a mut self, buf: &'a mut String)-> impl Future<Output = Result<usize>> + 'a;
这增加了生命周期以表明返回的 Future 的生存期只能与 self 和 buf 借用的值一样长。
async-std crate 提供了所有 std 中 I/O 设施的异步版本,包括带有 read_to_string 方法的异步 Read 特型。async-std 选择紧紧跟随 std 的设计,尽可能在它自己的接口中重用 std 的类型,因此 Error、Result、网络地址和大多数其他相关数据在“两个世界”之间是兼容的。熟悉 std 有助于使用 async-std,反之亦然。
Future 特型的一个规则是,一旦 Future 返回了 Poll::Ready,它就会假定自己永远不会再被轮询(poll)。当某些 Future 被过度轮询时,它们只会永远返回 Poll::Pending,而其他 Future 则可能会 panic 或被挂起。(但是,它们绝不会违反内存安全或线程安全规则,或以其他方式导致未定义行为。)Future 特型上的 fuse 适配器方法能把任何 Future 变成被过度轮询时总会返回 Poll::Pending 的 Future。但所有常用的 Future 消耗方式都会遵守这一规则,因此通常不必动用 fuse。
完全没必要一听到轮询就觉得效率低下。Rust 的异步架构是经过精心设计的,只要你正确实现了基本的 I/O 函数(如 read_to_string),就只会在值得尝试时才轮询 Future。每当调用 poll 时,必然有某个地方的某些代码返回了 Ready,或者至少朝着那个目标前进了一步。20.3 节会对此工作原理进行解释。
但使用 Future 似乎很具挑战性:当轮询时,如果得到了 Poll::Pending,**应该做些什么呢?**你必须四处寻找这个线程暂时可以做的其他工作,还不能忘记稍后回到这个 Future 并再次轮询它。整个程序将充斥着辅助性代码,以跟踪谁在等待处理以及一旦就绪应该做些什么之类的事情。cheapo_request 函数的简单性被破坏了。
- 异步函数与await表达式
下面是一个写成异步函数的 cheapo_request 版本:
use async_std::io::prelude::*;
use async_std::net;
// 函数以 async fn 而不是 fn 开头。// 使用 async_std crate 的异步版本的 TcpStream::connect、write_all 和 read_to_string。这些都会返回其结果的 Future。
async fn cheapo_request(host: &str, port: u16, path: &str)-> std::io::Result<String>
{// 每次返回 Future 的调用之后,代码都会 .await。虽然这看起来像是在引用结构体中名为 await 的字段,但它实际上是语言中内置的特殊语法,用于等待 Future 就绪。let mut socket = net::TcpStream::connect((host, port)).await?;let request = format!("GET {} HTTP/1.1\r\nHost: {}\r\n\r\n", path, host);socket.write_all(request.as_bytes()).await?;socket.shutdown(net::Shutdown::Write)?;let mut response = String::new();socket.read_to_string(&mut response).await?;Ok(response)
}
当你调用异步函数时,它会在函数体开始执行之前立即返回。显然,调用的最终返回值还没有计算出来,你得到的只是承载它最终值的 Future。
所以如果执行下面这段代码:
let response = cheapo_request(host, port, path);
那么 response 将是 std::io::Result 型的 Future,而 cheapo_request 的函数体尚未开始执行。你不需要调整异步函数的返回类型,Rust 会自动把 async fn f(…) -> T 函数的返回值视为承载 T 的 Future,而非直接的 T 值。
异步函数返回的 Future 中包含函数体运行时所需的一切信息:函数的参数、局部变量的内存空间等。(就像是把要调用的栈帧捕获成了一个普通的 Rust 值。)所以 response 必须保存传给 host、port 和 path 的值,因为 cheapo_request 的函数体将需要这些值来运行。
Future 的特化类型是由编译器根据函数的主体和参数自动生成的。这种类型没有名字,你只知道它实现了 Future,其中 R 是异步函数的返回类型。从这个意义上说,异步函数的 Future 就像闭包:闭包也有由编译器生成的匿名类型,该类型实现了 FnOnce 特型、Fn 特型和 FnMut 特型。
当你首次轮询 cheapo_request 返回的 Future 时,会从函数体的顶部开始执行,一直运行到 TcpStream::connect 返回的 Future 的第一个 await。await 表达式会轮询 connect 返回的 Future,如果它尚未就绪,则向调用者返回 Poll::Pending:程序不能从这个 await 继续向前运行了,直到对这个 Future 的某次轮询返回了 Poll::Ready。因此,表达式 TcpStream::connect(…).await 大致等价于如下内容:
{
// 注意:这是伪代码,不是有效的Rust
let connect_future = TcpStream::connect(...);
'retry_point:match connect_future.poll(cx) {Poll::Ready(value) => value,Poll::Pending => {// 安排对`cheapo_request`返回的Future进行// 下一次`poll`,以便在'retry_point处恢复执行...return Poll::Pending;}}
}
await 表达式会获取 Future 的所有权,然后轮询它。如果已就绪,那么 Future 的最终值就是 await 表达式的值,然后继续执行。否则,此 Future 返回 Poll::Pending。
但至关重要的是,下一次对 cheapo_request 返回的 Future 进行轮询时不会再从函数的顶部开始,而是会在即将轮询 connect_future 的中途时间点恢复执行函数。直到 Future 就绪之前,我们都不会继续处理异步函数的其余部分。
随着对其返回的 Future 继续进行轮询,cheapo_request 将通过函数体从一个 await 走到下一个,仅当它等待的子 Future 就绪时才会继续。因此,要对 cheapo_request 返回的 Future 进行多少次轮询,既取决于子 Future 的行为,也取决于该函数自己的控制流。cheapo_request 返回的 Future 会跟踪下一次 poll 应该恢复的点,以及恢复该点所需的所有本地状态,比如变量、参数和临时变量。
在函数中间暂停执行稍后再恢复,这种能力是异步函数所独有的。当一个普通函数返回时,它的栈帧就永远消失了。由于 await 表达式依赖于这种恢复能力,因此只能在异步函数中使用它们。
- 从同步代码调用异步函数:block_on
从某种意义上说,异步函数就是在转移责任。的确,在异步函数中很容易获得 Future 的值:只要使用 await 就可以。但是异步函数本身也会返回 Future,所以现在调用者的工作是以某种方式进行轮询。但最终还是得有人实际等待一个值。可以使用 async_std 的 task::block_on 函数从普通的同步函数(如 main)调用 cheapo_request,这会接受一个 Future 并轮询,直到它生成一个值:
fn main() -> std::io::Result<()> {use async_std::task;let response = task::block_on(cheapo_request("example.com", 80, "/"))?;println!("{}", response);Ok(())
}
由于 block_on 是一个会生成异步函数最终值的同步函数,因此可以将其视为从异步世界到同步世界的适配器。但 block_on 的阻塞式特征意味着我们不应该在异步函数中使用它,因为在值被准备好之前它会一直阻塞整个线程。异步函数中请改用 await。
- 启动异步任务
在 Future 的值就绪之前,async_std::task::block_on 函数会一直阻塞。但是把线程完全阻塞在单个 Future 上并不比同步调用好:本章的目标是让线程在等待的同时做其他工作。
为此,可以使用 async_std::task::spawn_local。该函数会接受一个 Future 并将其添加到任务池中,只要正阻塞着 block_on 的 Future 还未就绪,就会尝试轮询。因此,如果你将一堆 Future 传给 spawn_local,然后将 block_on 应用于最终结果的 Future,那么 block_on 就会在可以向前推进时轮询每个启动(spawn)后的 Future,并行执行整个任务池,直到你想要的结果就绪。
在撰写本章时,要想在 async-std 中使用 spawn_local,就必须启用该 crate 的 unstable 特性。为此,需要在 Cargo.toml 中使用下面这行代码去引用 async-std:
async-std = { version = "1", features = ["unstable"] }
spawn_local 函数是标准库的 std::thread::spawn 函数的异步模拟,用于启动线程。
std::thread::spawn© 会接受闭包 c 并启动线程来运行它,然后返回 std::thread::JoinHandle,其中 std::thread::JoinHandle 的 join 方法会等待线程完成并返回 c 中返回的任何内容。
async_std::task::spawn_local(f) 会接受 Future f 并将其添加到当前线程在调用 block_on 时要轮询的池中。spawn_local 会返回自己的 async_std::task::JoinHandle 类型,它本身就是一个 Future,你可以等待(await)它以获取 f 的最终值。
假设我们想同时发出一整套 HTTP 请求。下面是第一次尝试:
pub async fn many_requests(requests: Vec<(String, u16, String)>)-> Vec<std::io::Result<String>>
{use async_std::task;let mut handles = vec![];for (host, port, path) in requests {handles.push(task::spawn_local(cheapo_request(&host, port, &path)));}let mut results = vec![];for handle in handles {results.push(handle.await);}results
}
该函数会在 requests 的每个元素上调用 cheapo_request,并将每个调用返回的 Future 传给 spawn_local。该函数还会将生成的 JoinHandle 收集到一个向量中,然后等待每一个 JoinHandle。可以用任意顺序等待这些 JoinHandle:由于请求已经发出,因此只要此线程调用了 block_on 并且没有更有价值的事情可做,请求的 Future 就会根据需要进行轮询。所有请求都将并行执行。一旦完成操作,many_requests 就会把结果返回给它的调用者。
前面的代码几乎是正确的,但 Rust 的借用检查器报错说它很担心 cheapo_request 返回的 Future 的生命周期:
error: `host` does not live long enough
path 也会出现类似的错误。
自然,如果将引用传给一个异步函数,那么它返回的 Future 就必须持有这些引用,因此,安全起见,Future 的生命周期不能超出它们借来的值。这和任何包含引用的值所受的限制是一样的。
问题是 spawn_local 无法确定你会在 host 和 path 被丢弃之前等待任务完成。事实上,spawn_local 只会接受生命周期为 'static 的 Future,因为你也可以简单地忽略它返回的 JoinHandle,并在程序执行其他部分时让此任务继续运行。这不是异步任务独有的问题:如果尝试使用 std::thread::spawn 启动一个线程,那么该线程的闭包也会捕获对局部变量的引用,并得到类似的错误。
解决此问题的方法是创建另一个接受这些参数的拥有型版本的异步函数:
async fn cheapo_owning_request(host: String, port: u16, path: String)-> std::io::Result<String> {cheapo_request(&host, port, &path).await
}
此函数会接受 String 引用而不是 &str 引用,因此它的 Future 拥有 host 字符串和 path 字符串本身,并且其生命周期为 'static。通过借用检查器可以发现它立即开始等待 cheapo_request 返回的 Future,因此,如果该 Future 被轮询,那么它借用的 host 变量和 path 变量必然仍旧存在。一切顺利。
可以使用 cheapo_owning_request 像下面这样分发所有请求:
for (host, port, path) in requests {
handles.push(task::spawn_local(cheapo_owning_request(host, port, path)));
}
可以借助 block_on 从同步 main 函数中调用 many_requests:
let requests = vec![("example.com".to_string(), 80, "/".to_string()),("www.red-bean.com".to_string(), 80, "/".to_string()),("en.wikipedia.org".to_string(), 80, "/".to_string()),
];let results = async_std::task::block_on(many_requests(requests));
for result in results {match result {Ok(response) => println!("{}", response),Err(err) => eprintln!("error: {}", err),}
}
上述代码会在对 block_on 的调用中同时运行所有 3 个请求。每一个都会在某种时机取得进展,而其他的则会被阻塞,所有这些都发生在调用线程上。
- 异步块
除了异步函数,Rust 还支持异步块。普通的块语句会返回其最后一个表达式的值,而异步块会返回其最后一个表达式值的 Future。可以在异步块中使用 await 表达式。
异步块看起来就像普通的块语句,但其前面有 async 关键字:
let serve_one = async {use async_std::net;// 监听连接并接受其中一个let listener = net::TcpListener::bind("localhost:8087").await?;let (mut socket, _addr) = listener.accept().await?;// 在`socket`上与客户端对话...
};
上述代码会将 serve_one 初始化为一个 Future(当被轮询时),以侦听并处理单个 TCP 连接。直到轮询 serve_one 时才会开始执行代码块的主体,就像直到轮询 Future 时才会开始执行异步函数的主体一样。
如果在异步块中使用 ? 运算符处理错误,那么它只会从块中而不是围绕它的函数中返回。如果前面的 bind 调用返回了错误,则 ? 运算符会将其作为 serve_one 的最终值返回。同样,return 表达式也会从异步块而不是其所在函数中返回。
如果异步块引用了围绕它的代码中定义的变量,那么它的 Future 就会捕获这些变量的值,就像闭包所做的那样。与 move 闭包的用法一样,也可以用 async move 启动该块以获取捕获的值的所有权,而不仅仅持有对它们的引用。
为了将你想要异步运行的那部分代码分离出去,异步块提供了一种简洁的方法。例如,spawn_local 需要一个 'static 的 Future,因此我们定义了包装函数 cheapo_owning_request 来为我们提供一个拥有其参数所有权的 Future。只需从异步块中调用 cheapo_request 即可获得相同的效果,不用花心思去写包装函数:
pub async fn many_requests(requests: Vec<(String, u16, String)>)-> Vec<std::io::Result<String>>
{use async_std::task;let mut handles = vec![];for (host, port, path) in requests {handles.push(task::spawn_local(async move {cheapo_request(&host, port, &path).await}));}...
}
由于这是一个 async move 块,因此它的 Future 获取了 String 值 host 和 path 的所有权,和 move 闭包一样。然后该 Future 会传递对 cheapo_request 的引用。借用检查器可以看到块的 await 表达式接手了 cheapo_request 返回的 Future 的所有权,因此对 host 和 path 的引用的生命周期不能比它们借来的已捕获变量的生命周期长。对于 cheapo_owning_request 所能做的事,async 块也能完成,且使用的样板代码更少。
你可能会遇到的一个棘手问题是,与异步函数不同,没有任何语法可用于指定异步块的返回类型。这在使用 ? 运算符时会导致问题:
let input = async_std::io::stdin();
let future = async {let mut line = String::new();// 这会返回`std::io::Result<usize>`input.read_line(&mut line).await?;println!("Read line: {}", line);Ok(())
};
运行失败并出现以下错误:
error: type annotations needed
Rust 无法判断异步块的返回类型是什么。read_line 方法会返回 Result<(), std::io::Error>,但是因为 ? 运算符会使用 From 特型将手头的错误类型转换为场景要求的任何类型,所以异步块的返回类型 Result<(), E> 中的 E 可以是实现了 From 的任意类型。
Rust 的未来版本中可能会新增相应的语法来指出 async 块的返回类型。目前,可以通过明确写出块的最终 Ok 的类型来解决这个问题:
let future = async {...Ok::<(), std::io::Error>(())
};
由于 Result 是一个希望以成功类型和错误类型作为其参数的泛型类型,因此,如上例所示,可以在使用 Ok 或 Err 时指定这些类型参数。
- 从异步块构建异步函数
异步块为我们提供了另一种实现与异步函数相同效果的方式,并且这种方式更加灵活。例如,可以将我们的 cheapo_request 示例改写为一个普通的同步函数,该函数会返回一个异步块的 Future:
use std::io;
use std::future::Future;fn cheapo_request<'a>(host: &'a str, port: u16, path: &'a str)-> impl Future<Output = io::Result<String>> + 'a
{async move {……函数体……}
}
当你调用这个版本的函数时,它会立即返回异步块返回值的 Future。这会捕获该函数的参数表,并且表现得就像异步函数返回的 Future 一样。由于没有使用 async fn 语法,因此需要在返回类型中写上 impl Future。但就调用者而言,这两个定义是具有相同函数签名的可互换实现。
如果想在调用函数时立即进行一些计算,然后再创建其结果的 Future,那么第二种方法会很有用。例如,另一种让 cheapo_request 和 spawn_local 协同工作的方法是将其变成一个返回 'static Future 的同步函数,这会捕获由其参数完全拥有的副本:
fn cheapo_request(host: &str, port: u16, path: &str)-> impl Future<Output = io::Result<String>> + 'static
{let host = host.to_string();let path = path.to_string();async move {……使用&*host、port和path……}
}
这个版本允许异步块将 host 和 path 捕获为拥有型 String 值,而不是 &str 引用。由于 Future 拥有其运行所需的全部数据,因此它会在整个 'static 生命周期内有效。(在前面所展示的签名中我们明确写出了 + 'static,但 'static 本来就是各种 -> impl 返回类型的默认值,因此将其省略也不会有任何影响。)
由于这个版本的 cheapo_request 返回的是 'static Future,因此可以将它们直接传给 spawn_local。
let join_handle = async_std::task::spawn_local(cheapo_request("areweasyncyet.rs", 80, "/")
);……其他工作……let response = join_handle.await?;
- 在线程池中启动异步任务
迄今为止,我们展示的这些示例把几乎所有时间都花在了等待 I/O 上,但某些工作负载主要是 CPU 任务和阻塞的混合体。当计算量繁重到无法仅靠单个 CPU 满足时,可以使用 async_std::task::spawn 在工作线程池中启动 Future,线程池专门用于轮询那些已准备好向前推进的 Future。
async_std::task::spawn 用起来很像 async_std::task::spawn_local:
use async_std::task;let mut handles = vec![];
for (host, port, path) in requests {handles.push(task::spawn(async move {cheapo_request(&host, port, &path).await}));
}
与 spawn_local 一样,spawn 也会返回一个 JoinHandle 值,你可以等待它,以获得 Future 的最终值。但与 spawn_local 不同,Future 不必等到调用 block_on 才进行轮询。一旦线程池中的某个线程空闲了,该线程就会试着轮询它。
在实践中,spawn 比 spawn_local 用得多。这只是因为人们更希望看到他们的工作负载在机器资源上均匀分配,而不关心工作负载的计算和阻塞是如何混杂的。
使用 spawn 时要记住一点:线程池倾向于保持忙碌。因此无论哪个线程率先得到轮询的机会,都会轮询到你的 Future。异步调用可能在一个线程上开始执行,阻塞在 await 表达式上,然后在另一个线程中恢复。因此,虽然将异步函数调用视为单一的、连续的代码执行是一种合理的简化(实际上,异步函数和 await 表达式的设计目标就是鼓励你以这种方式思考),但实际上可能会通过许多不同的线程来承载此次调用。
如果你正在使用线程本地存储,可能会惊讶地看到你在 await 表达式之前放置的数据后来被换成了完全不同的东西。这是因为你的任务现在正由线程池中的不同线程轮询。如果你觉得这是一个问题,就应该改用任务本地存储,具体请参阅 async-std crate 的 task_local! 宏的详细信息。
- 你的Future实现Send了吗?
spawn 具有 spawn_local 所没有的一项限制。由于 Future 会被发送到另一个线程运行,因此它必须实现标记特型 Send。只有当 Future 包含的所有值都符合 Send 要求时,它自己才符合 Send 要求:所有函数参数、局部变量,甚至匿名临时值都必须安全地转移给另一个线程。
和生命周期方面的限制一样,这项要求也不是异步任务独有的:如果尝试用 std::thread::spawn 启动其闭包以捕获非 Send 值的线程,那么也会遇到类似的错误。不同点在于,虽然传给 std::thread::spawn 的闭包会留在创建并运行它的线程上,但在线程池中启动的 Future 可以在等待期间的任意时刻从一个线程转移给另一个线程。
这项限制很容易意外触发。例如,下面的代码乍看起来没问题:
use async_std::task;
use std::rc::Rc;async fn reluctant() -> String {let string = Rc::new("ref-counted string".to_string());some_asynchronous_thing().await;format!("Your splendid string: {}", string)
}task::spawn(reluctant());
异步函数的 Future 需要保存足够的信息,以便此函数能从 await 表达式继续。在这种情况下,reluctant 返回的 Future 必须在 await 之后使用 string 的值,因此 Future(至少在某些时刻)会包含一个 Rc 值。由于 Rc 指针不能在线程之间安全地共享,因此 Future 本身也不能是 Send 的。因为 spawn 只接受符合 Send 要求的 Future,所以 Rust 不会接受 Rc 指针:
error: future cannot be sent between threads safely
此错误消息很长,包含很多有用的详细信息。
- 解释了为什么 Future 需要符合 Send 的要求:task::spawn 需要它。
- 解释了哪个值不符合 Send 的要求:局部变量 string,其类型是 Rc。
- 解释了为什么 string 会影响 Future:它的作用域跨越了 await。
有两种方法可以解决此问题。一种方法是限制非 Send 值的作用域,使其不跨越任何 await 表达式的作用域,因此也不需要保存在函数的 Future 中:
async fn reluctant() -> String {let return_value = {let string = Rc::new("ref-counted string".to_string());format!("Your splendid string: {}", string)// `Rc<String>`在此离开了作用域……};// ……因此当我们在这里暂停时,它不在周边环境里some_asynchronous_thing().await;return_value
}
另一种方法是简单地使用 std::sync::Arc 而非 Rc。Arc 使用原子更新来管理引用计数,这会让它略慢,但 Arc 指针是符合 Send 要求的。
虽然最终你将学会识别和避免非 Send 类型,但一开始它们可能有点儿令人吃惊。[ 至少,我们(本书作者)曾感到惊讶。] 例如,旧的 Rust 代码有时会使用下面这样的泛型结果类型:
// 别这样做!
type GenericError = Box<dyn std::error::Error>;
type GenericResult<T> = Result<T, GenericError>;
这个 GenericError 类型使用了装箱过的特型对象来保存实现了 std::error::Error 的任意类型的值,但没有对它施加任何进一步的限制:如果有某个非 Send 类型实现了 Error,那么就可以将该类型的装箱值转换为 GenericError。由于这种可能性,GenericError 不符合 Send 要求,并且下面的代码无法工作:
fn some_fallible_thing() -> GenericResult<i32> {...
}// 这个函数的Future不符合`Send`要求……
async fn unfortunate() {// ……因为此调用的值……match some_fallible_thing() {Err(error) => {report_error(error);}Ok(output) => {// ……其生命周期跨越了这个await……use_output(output).await;}}
}// ……因此这个`spawn`会出错
async_std::task::spawn(unfortunate());
与前面的示例一样,编译器的错误消息解释了正在发生的事情,并指出 Result 类型是罪魁祸首。由于 Rust 认为 some_fallible_thing 的结果存在于整个 match 语句(包括 await 表达式)中,所以它确定 unfortunate 返回的 Future 不符合 Send 的要求。对于这个错误,Rust 过于谨慎了:虽然 GenericError 确实不能安全地发送到另一个线程,但 await 只有在结果为 Ok 时才会发生,因此当我们等待 use_output 返回的 Future 时其实并不存在错误值。
理想的解决方案是使用更严格的泛型错误类型,比如 之前提到的错误类型:
type GenericError = Box<dyn std::error::Error + Send + Sync + 'static>;
type GenericResult<T> = Result<T, GenericError>;
这个特型对象会明确要求底层错误类型实现 Send。一切顺利。
即使你的 Future 不符合 Send 要求,而且不容易把它变成符合形式,仍然可以使用 spawn_local 在当前线程上运行它。当然,你需要确保此线程会在某个时刻调用 block_on 以便让它有机会运行,并且你无法受益于跨多个处理器分派工作的能力。
- 长时间运行的计算:yield_now与spawn_blocking
为了让 Future 更好地与其他任务共享线程,它的 poll 方法应该总是尽可能快地返回。但是,如果你正在进行长时间的计算,就可能需要很长时间才能到达下一个 await,从而让其他异步任务等待的时间比你预想的更久些。
避免这种情况的一种方法是偶尔等待某些事情。async_std::task::yield_now 函数会返回一个为此而设计的简单的 Future:
while computation_not_done() {// 完成一个中等规模的计算步骤...async_std::task::yield_now().await;
}
当 yield_now 返回的 Future 第一次被轮询时,它会返回 Poll::Pending,但表示自己很快就值得再次轮询。因此你的异步调用放弃了线程,以使其他任务有机会运行,但很快会再次轮到它。第二次轮询 yield_now 返回的 Future 时,它会返回 Poll::Ready(()),让你的异步函数恢复执行。
然而,这种方法并不总是可行。如果你使用外部 crate 进行长时间运行的计算或者调用 C 或 C++,那么将上述代码更改为异步友好型代码可能并不方便。或者很难确保计算所经过的每条路径一定会时不时地等待一下。
对于这种情况,可以使用 async_std::task::spawn_blocking。该函数会接受一个闭包,开始在独立的线程上运行它,并返回携带其返回值的 Future。异步代码可以等待那个 Future,将其线程让给其他任务,直到本次计算就绪。通过将繁重的工作放在单独的线程上,可以委托给操作系统去负责,让它更友善地分享处理器。
假设我们要根据存储在身份验证数据库中的密码哈希值来检查用户提供的密码。为安全起见,验证密码需要进行大量计算,这样即使攻击者获得了数据库的副本,也无法简单地通过尝试数万亿个可能的密码来查看是否有匹配项。argonautica crate 提供了一个专为存储密码而设计的哈希函数:正确生成的 argonautica 哈希需要相当一部分时间才能验证。可以在异步应用程序中使用 argonautica(0.2 版),如下所示:
async fn verify_password(password: &str, hash: &str, key: &str)-> Result<bool, argonautica::Error>
{// 制作参数的副本,以使闭包的生命周期是'staticlet password = password.to_string();let hash = hash.to_string();let key = key.to_string();async_std::task::spawn_blocking(move || {argonautica::Verifier::default().with_hash(hash).with_password(password).with_secret_key(key).verify()}).await
}
如果 password 与 hash 匹配,则返回 Ok(true),给定的 key 是整个数据库的键。通过在传给 spawn_blocking 的闭包中进行验证,可以将昂贵的计算推给其各自的线程,确保它不会影响我们对其他用户请求的响应。
- 对几种异步设计进行比较
在许多方面,Rust 的异步编程方式与其他语言所采用的方法相似。例如,JavaScript 和 Rust 都有带 await 表达式的异步函数。所有这些语言都有代表未完成计算的值:Rust 中叫作“Future”,JavaScript 中叫作“承诺”(Promise),但它们都代表一种你可能不得不等待的值。
然而,Rust 对轮询的使用独树一帜。在 JavaScript 中,异步函数在调用后会立即开始运行,并且系统库中内置了一个全局事件循环,可在等待的值可用时恢复挂起的异步函数调用。不过,在 Rust 中,异步调用什么都不会做,直到你将它传给 block_on、spawn 或 spawn_local 之类的函数,这些函数将轮询它并驱动此事直到完成。我们称这些函数为执行器,它们承担着与其他语言中全局事件循环类似的职责。
因为 Rust 会让你选择一个执行器来轮询你的 Future,所以它并不需要在系统中内置全局事件循环。async-std crate 提供了迄今为止本章使用过的这些执行器函数,但是 tokio crate自己定义了一组类似的执行器函数。
- 一个真正的异步HTTP客户端
下面是对 many_requests 的重写,它甚至比基于 cheapo_request 的重写更简单,而且会用 surf 同时运行一系列请求:
pub async fn many_requests(urls: &[String])-> Vec<Result<String, surf::Exception>>
{/*使用单个 surf::Client 发出所有请求可以让我们重用 HTTP 连接,并且不需要异步块:因为 recv_string 是一个返回 Send + 'static 型 Future 的异步方法,所以可以将它返回的 Future 直接传给 spawn。*/let client = surf::Client::new();let mut handles = vec![];for url in urls {let request = client.get(&url).recv_string();handles.push(async_std::task::spawn(request));}let mut results = vec![];for handle in handles {results.push(handle.await);}results
}
fn main() {let requests = &["http://example.com".to_string(),"https://www.red-bean.com".to_string(),"https://en.wikipedia.org/wiki/Main_Page".to_string()];let results = async_std::task::block_on(many_requests(requests));for result in results {match result {Ok(response) => println!("*** {}\n", response),Err(err) => eprintln!("error: {}\n", err),}}
}
相关文章:

Rust基础拾遗--并发和异步编程
Rust基础拾遗 前言1.并发1.1 分叉与合并并行1.1.1 启动与联结1.1.2 跨线程错误处理1.1.3 跨线程共享不可变数据1.1.4 rayon 1.2 通道1.2.1 发送值1.2.2 接收值1.2.3 运行管道1.2.4 通道的特性与性能1.2.5 线程安全:Send与Sync 1.3 共享可变状态 2.异步编…...

Javascript怎么输出内容?两种常见方式以及控制台介绍
javascript是一种非常重要的编程语言,在许多网页中它被广泛使用,可以实现许多交互效果和动态效果。输出是javascript中最基本的操作之一,下面将介绍两种常见的输出方式。 一、使用console.log()函数输出 console.log()函数是常用的输出函数…...

机器人路径平滑——线性插值
C++代码 //要实现平滑二维曲线的算法,你可以使用贝塞尔曲线或B样条曲线。下面是一个使用B样条曲线的C++算法的示例:#include <iostream> #include <vector> #include <fstream> #include <iomanip>...

2024-2-21-多线程基础作业
作业: 源代码: #include <myhead.h> #define MAXSIZE 64 //定义要传递的结构体类型 struct Info {const char *src;const char *dest;int len; }; int get_file_len(const char *srcfile, const char *destfile) {//以只读的形式打开源文件int sr…...

MySQL8的ONLY_FULL_GROUP_BY SQL模式兼容问题
文章目录 1. 问题描述2. 解决方法1. 修改查询2. 修改SQL模式3. 使用ANY_VALUE()函数 1. 问题描述 Cause: java.sql.SQLSyntaxErrorException: Expression #1 of SELECT list is not in GROUP BY clause and contains nonaggregated column btc-cloud.t1.id which is not funct…...

Django使用Celery异步
安装包 pip install celerypip install eventlet 1.在项目文件的根目录下创建目录结果 2. 在main.py文件中 # !/usr/bin/env python # -*-coding:utf-8 -*-""" # Author :skyTree # version :python 3.11 # Description&#…...

vue3 + ts + echart 实现柱形图表
首先封装Echart一个文件 代码如下 <script setup lang"ts"> import { ECharts, EChartsOption, init } from echarts; import { ref, watch, onMounted, onBeforeUnmount } from vue;// 定义props interface Props {width?: string;height?: string;optio…...

c语言结构体与共用体
前面我们介绍了基本的数据类型 在c语言中 有一种特殊的数据类型 由程序员来定义类型 目录 一结构体 1.1概述 1.2定义结构体 1.3 结构体变量的初始化 1.4 访问结构体的成员 1.5结构体作为函数的参数 1.6指向结构的指针 1.7结构体大小的计算 二共用体 2.1概述 2.2 访…...

vue系列--vue封装拖拽指令v-drag
1.首先将下面的代码引入代码中 export const initVDrag (Vue) > {Vue.directive("drag", (el) > {const oDiv el // 当前元素const minTop oDiv.getAttribute("drag-min-top")const ifMoveSizeArea 20oDiv.onmousedown (e) > {let target …...

devc++ 使用 winsock 实现 UDP 局域网 WIFI 广播
参考链接 使用UDP发送广播报_udp广播 inaddr_broadcast-CSDN博客 UDP接收端收不到广播的消息问题排查_unity upd广播连接不上是什么情况-CSDN博客 如何禁用自己电脑的虚拟网卡-百度经验 (baidu.com) 但是wifi 会屏蔽255.255.255.255 广播地址,所以 255.255.255.2…...

JS实现根据数组对象的某一属性排序
JS实现根据数组对象的某一属性排序 一、冒泡排序(先了解冒泡排序机制)二、根据数组对象的某一属性排序(引用sort方法排序) 一、冒泡排序(先了解冒泡排序机制) 以从小到大排序为例,冒泡排序的原…...

CSP-J 2023 复赛第2题:公路 ← 贪心算法
【题目来源】https://www.luogu.com.cn/problem/P9749https://www.acwing.com/problem/content/5311/【题目描述】 小苞准备开着车沿着公路自驾。 公路上一共有 n 个站点,编号为从 1 到 n。 其中站点 i 与站点 i1 的距离为 vi 公里。 公路上每个站点都可以加油&…...

【LeetCode打卡】Day23|669. 修剪二叉搜索树、108.将有序数组转换为二叉搜索树、538.把二叉搜索树转换为累加树
学习目标: 669. 修剪二叉搜索树 108.将有序数组转换为二叉搜索树 538.把二叉搜索树转换为累加树 学习内容: 669. 修剪二叉搜索树 题目链接&&文章讲解 给你二叉搜索树的根节点 root ,同时给定最小边界low 和最大边界 high。通过修剪…...

Transformer位置表示(Position Encoding)
为什么需要位置表示 对比CNN、RNN和Self-Attention: CNN处理相邻窗口的内容;RNN天然是序列操作,考虑了位置先后关系;Self-Attention的计算时是无序的,所以需要位置表示来知道Token之间的位置信息。 绝对位置表示 典型如…...

LPDDR6与LPDDR5 State Diagram技术探讨
相对于LPDDR5: 1)去掉DSM 2)idle到per-bank-refresh变成per-2-bank-refresh,LPDDR6下可自由组合任两个bank刷新,以提高性能 3)sref到进入command bus training后可MRR、MRW、CAS、MPC等命令 4)idle power down期间可MRR、MRW、CAS、MPC等命令 5)idle到进入command bus train…...

AliLinux的使用Docker初始化服务(详细)
AliLinux的使用Docker初始化服务(详细) AliLinux是基于CentOS的。 1、java 环境 2、mysql环境 3、kafka环境 4、flink环境 5、dinky环境 这些环境,本想直接dnf安装在宿主机上,思来想去,还是用docker方便学习&…...

docker环境常用容器安装
目录 1.安装partainer 2.安装myql 3.安装redis 4.安装Minio 5.安装zibkin 6.安装nacos 7.安装RabbitMq 8.安装RocketMq 8.1启动service 8.2修改对应配置 8.3启动broker 8.4启动控制台 9.安装sentinel 10.安装elasticsearch 11.安装Kibana 12.安装logstash/file…...

【论文阅读|基于 YOLO 的红外小目标检测的逆向范例】
基于 YOLO 的红外小目标检测的逆向范例 摘要1 引言2 相关工作2.1 逆向推理2.2 物体检测方法 3 方法3.1 总体架构3.2 逆向标准的可微分积分 4 实验4.1 数据集和指标4.2 实验环境4.4 OL-NFA 为少样本环境带来稳健性 5 结论 论文题目: A Contrario Paradigm for YOLO-b…...

【presto权威指南】常用操作
shell ./bin/launcher start ./bin/launcher status ./bin/launcher stop /home/work/presto/bin/presto --server hadoop2:8443 --catalog hive --schema defult --debug --user ‘sdfyypt_2_0_eywa_admin’ //指定用户 presto -f 可以指定执行sql文件 presto -execute 可以…...

Python程序员面试准备:八股文题目与解答思路
目录 描述一下Python中的列表推导式(List Comprehension)及其用法。 代码示例: 解答思路: 解释一下Python中的装饰器(Decorator)及其作用。 代码示例: 输出: 解答思路: 谈谈Python中的GIL(Global Interprete…...

如何系统地自学Python?
如何系统地自学Python? 如何系统地自学Python?1.了解编程基础2.学习Python基础语法3.学习Python库和框架4.练习编写代码5.参与开源项目6.加入Python社区7.利用资源学习8.制定学习计划9.持之以恒总结 如何系统地自学Python? 作为一个Python语…...

mysql 2-21
约束的分类 添加约束 查看表约束 非空约束 唯一性约束 复合的唯一性约束 只要有一个字段不重复,就可以添加成功 主键约束 自增列 mysql 8.0具有持久化,重启服务器会继续自增 外键约束 创建外键 关联必须有唯一性约束,或者是主键 约束等级 …...

【C#】List泛型数据集如何循环移动,最后一位移动到第一位,以此类推
欢迎来到《小5讲堂》 大家好,我是全栈小5。 这是《C#》系列文章,每篇文章将以博主理解的角度展开讲解, 特别是针对知识点的概念进行叙说,大部分文章将会对这些概念进行实际例子验证,以此达到加深对知识点的理解和掌握。…...

LeetCode23.合并K个升序链表
题目 给你一个链表数组,每个链表都已经按升序排列。 请你将所有链表合并到一个升序链表中,返回合并后的链表。 示例 : 输入:lists [[1,4,5],[1,3,4],[2,6]] 输出:[1,1,2,3,4,4,5,6] 解释:链表数组如下&…...

(01)Hive的相关概念——架构、数据存储、读写文件机制
目录 一、架构及组件介绍 1.1 Hive整体架构 1.2 Hive组件 1.3 Hive数据模型(Data Model) 1.3.1 Databases 1.3.2 Tables 1.3.3 Partitions 1.3.4 Buckets 二、Hive读写文件机制 2.1 SerDe 作用 2.2 Hive读写文件流程 2.2.1 读取文件的过程 …...

二维码扫码登录原理,其实比你想的要简单的多
二维码,大家再熟悉不过了 购物扫个码,吃饭扫个码,坐公交也扫个码 在扫码的过程中,大家可能会有疑问:这二维码安全吗? 会不会泄漏我的个人信息? 更深度的用户还会考虑:我的系统是不…...

Java 实现 Awaitable(多线程并行等待,类似 AutoEventReset 的作用)
AutoEventReset、ManualEventReset,是我们在多线程并行编程之中常常需要涉及的,但是 ManualEventReset 可能用的并没有那么多,这个多用于实现读写锁的,当然 Java 自己库提供了官方实现,就没必要自己去整了。 C/C 里面…...

AI之Sora:Sora(文本指令生成视频的里程碑模型)的简介(能力/安全性/技术细节)、使用方法、案例应用之详细攻略
AI之Sora:Sora(文本指令生成视频的里程碑模型)的简介(能力/安全性/技术细节)、使用方法、案例应用之详细攻略 导读:Sora 是OpenAI研发的一个可以根据文字描述生成视频的AI模型。它的主要特性、功能以及OpenAI在安全和应用方面的策略的核心要点如下所示&a…...

IListManger feeds流
目的:将feeds的分页加载和下拉刷新,与网络请求关联起来 ListLibRecyclerViewProxy 在this.getRecyclerView().addOnScrollListener中记录事件 recyclerView.computeVerticalScrollOffset() // 已经向下滚动的距离,为0时表示已处于顶部。 recyclerView.computeVerticalScro…...

视频推拉流EasyDSS视频直播点播平台授权出现激活码无效并报错400是什么原因?
视频推拉流EasyDSS视频直播点播平台集视频直播、点播、转码、管理、录像、检索、时移回看等功能于一体,可提供音视频采集、视频推拉流、播放H.265编码视频、存储、分发等视频能力服务,在应用场景上,平台可以运用在互联网教育、在线课堂、游戏…...