3.20 Rust并发编程
3.20.1 使用线程
3.20.1.1 相关概念
- 进程是资源分配的最小单位,线程是CPU调度的最小单位。
- 在使用多线程时,经常会遇到如下一些问题:
- 竞争状态:多个线程以不一致的顺序访问数据或资源;
- 死锁:两个线程相互等待对方停止使用其所拥有的资源,造成两者都永久等待;
- 只会发生在特定情况下且难以稳定重现和修复的bug。
- 编程语言提供的线程叫做绿色线程,如go语言,在底层实现了M:N的模型,即M个绿色线程对应N个OS线程。但是,Rust标准库只提供1:1的线程模型的实现,即一个Rust线程对应一个Os线程。
3.20.1.2. 创建线程
创建一个新线程需要调用thread::spawn函数并传递一个闭包,示例如下:
use std::thread; use std::time::Duration; fn main() { thread::spawn(|| { // 创建一个线程 for i in 1..10 { println!("hi number {} from the spawned thread!", i); thread::sleep(Duration::from_millis(1)); } }); for i in 1..5 { println!("hi number {} from the main thread!", i); thread::sleep(Duration::from_millis(1)); } }
3.20.1.3. 等待线程结束
前面的例子并不能保证子线程执行完所有的打印,因为主线程有可能在子线程执行完成前结束从而退出程序,所以需要在主线程中等待子线程结束。等待子线程结束需要调用join方法,示例如下:
use std::thread; use std::time::Duration; fn main() { let handle = thread::spawn(|| { for i in 1..10 { println!("hi number {} from the spawned thread!", i); thread::sleep(Duration::from_millis(1)); } }); for i in 1..5 { println!("hi number {} from the main thread!", i); thread::sleep(Duration::from_millis(1)); } handle.join().unwrap(); // 等待子线程结束 }
3.20.1.4. 线程与move闭包
move关键字可用于传递给thread::spawn的闭包,获取环境中的值的所有权,从而达到将值的所有权从一个线程传送到另一个线程的目的。示例如下:
use std::thread; fn main() { let v = vec![1, 2, 3]; let handle = thread::spawn(move || { // 将v移动进了闭包,完成了值从主线程到子线程的过程 println!("Here's a vector: {:?}", v); }); handle.join().unwrap(); }
3.20.2 传递消息
3.20.2.1. 通道简单介绍
Rust中实现消息传递并发的主要工具是通道。通道由发送者和接受者两部分组成:
- 发送者用来发送消息;
- 接收者用来接收消息;
- 发送者或者接收者任一被丢弃时就认为通道被关闭了。 Rust标准库中提供的通道叫做mpsc,是多个生产者,单个消费者的通道,其使用示例如下:
use std::sync::mpsc; use std::thread; fn main() { let (tx, rx) = mpsc::channel(); //创建channel,返回发送者、接收者 thread::spawn(move || { let val = String::from("hi"); tx.send(val).unwrap(); //使用发送者通过channel发送 }); let received = rx.recv().unwrap(); //使用接收者通过channel接收 println!("Got: {}", received); }
关于mpsc通道的使用,有以下几点说明:
- 发送者的send方法返回一个Result类型,如果接收端已经被丢弃了,将没有发送值的目标,所以发送操作将返回错误;
- 接收者的recv方法也返回Result类型,当通道发送端关闭时,将返回一个错误值表明不会再由新的值到来了;
- 接收还可以使用try_recv方法,recv方法会阻塞到一直等待到消息到来,而try_recv不会阻塞,它会立即返回,Ok值标识包含可用信息,而Err则代表此时没有任何信息。
3.20.2.2. 通道和所有权
在使用通道时,send 函数会获取参数的所有权并移动这个值归接收者所有。例如下面的代码将会编译错误:
use std::sync::mpsc; use std::thread; fn main() { let (tx, rx) = mpsc::channel(); thread::spawn(move || { let val = String::from("hi"); tx.send(val).unwrap(); println!("val is {}", val); //错误,此处不能使用val,因为val的所有权已经move到通道里面去了 }); let received = rx.recv().unwrap(); println!("Got: {}", received); }
3.20.2.3. 发送多个值示例
利用通道发送多个值的示例如下:
use std::sync::mpsc; use std::thread; use std::time::Duration; fn main() { let (tx, rx) = mpsc::channel(); thread::spawn(move || { let vals = vec![ String::from("hi"), String::from("from"), String::from("the"), String::from("thread"), ]; for val in vals { tx.send(val).unwrap(); thread::sleep(Duration::from_secs(1)); } }); for received in rx { println!("Got: {}", received); } }
3.20.2.4. 多个生产者示例
mpsc是multiple producer, single consumer的缩写,此通道可以有多个生产者。相对应的,spmc通道则可以有单个生产者,多个消费者。下面的示例演示多个生产者、单个消费者一起工作:
use std::sync::mpsc; use std::thread; use std::time::Duration; fn main() { let (tx, rx) = mpsc::channel(); let tx1 = mpsc::Sender::clone(&tx); //通过clone来使用 thread::spawn(move || { //第一个发送者线程 let vals = vec![ String::from("hi"), String::from("from"), String::from("the"), String::from("thread"), ]; for val in vals { tx1.send(val).unwrap(); thread::sleep(Duration::from_secs(1)); } }); thread::spawn(move || { //第二个发送者线程 let vals = vec![ String::from("hi"), String::from("from"), String::from("the"), String::from("thread"), ]; for val in vals { tx.send(val).unwrap(); thread::sleep(Duration::from_secs(1)); } }); for received in rx { println!("Got: {}", received); } }
上面代码中,有两个发送者发送数据,一个接收者结束数据。
3.20.3 共享内存
某些情况下,多个线程之间需要共享内存,即多个线程都访问某个变量。如何在多个线程间安全的共享内存,就需要用到锁、原子操作等机制,下面主要介绍Rust中锁的使用。
3.20.3.1. 互斥锁(Mutex)
在任意时刻,互斥锁只允许一个线程访问数据;在使用数据前需获取锁,使用完成后需要释放锁。关于Mutex的用法示例如下:
use std::sync::Mutex; fn main() { let m = Mutex::new(5); { let mut num = m.lock().unwrap(); *num = 6; // 可以对Mutex内部的值进行修改,可见Mutex和RefCell类似,提供内部可变性 } // 离开作用域,Mutex<T>的锁会自动释放 println!("m = {:?}", m); }
Mutex本质上是一个智能指针,lock()返回一个叫做MutexGuard的智能指针,其内部提供了drop方法,所以当MutexGuard离开作用域时会自动释放锁。Mutex获取到锁后,可以对其内部的值进行修改,可见它和RefCell类似,也提供了内部可变性。
3.20.3.2. 多线程与多所有权
有了Mutex之后,能对共享数据进行保护;但是由于Rust的所有权机制,每个值都有且仅有一个所有者。下面的代码将无法通过编译:
use std::sync::Mutex; use std::thread; fn main() { let counter = Mutex::new(0); let mut handles = vec![]; for _ in 0..10 { let handle = thread::spawn(move || { // 不能将counter移到多个线程中 let mut num = counter.lock().unwrap(); *num += 1; }); handles.push(handle); } for handle in handles { handle.join().unwrap(); } println!("Result: {}", *counter.lock().unwrap()); }
因此要在多个线程间共享内存,还需要有一种机制能让多个线程都能访问Mutex保护的数据。根据本书目前学到的知识,可以想到Rc智能指针。上面的代码使用Rc智能指针后的示例如下:
use std::rc::Rc; use std::sync::Mutex; use std::thread; fn main() { let counter = Rc::new(Mutex::new(0)); let mut handles = vec![]; for _ in 0..10 { let counter = Rc::clone(&counter); let handle = thread::spawn(move || { let mut num = counter.lock().unwrap(); *num += 1; }); handles.push(handle); } for handle in handles { handle.join().unwrap(); } println!("Result: {}", *counter.lock().unwrap()); }
上面的代码仍将编译错误,因为Rc是非线程安全的,此时需要使用Arc类型。Arc是一个类似于Rc并可以安全的用于并发环境的类型,使用Arc的示例如下:
use std::sync::{Arc, Mutex}; use std::thread; fn main() { let counter = Arc::new(Mutex::new(0)); // 使用Arc共享 let mut handles = vec![]; for _ in 0..10 { let counter = Arc::clone(&counter); let handle = thread::spawn(move || { let mut num = counter.lock().unwrap(); *num += 1; }); handles.push(handle); } for handle in handles { handle.join().unwrap(); } println!("Result: {}", *counter.lock().unwrap()); }
3.20.3.3. 读写锁
读变量并不会改变变量的状态,互斥锁Mutex每次读写都会加锁,当在有大量读、少量写的场景时使用Mutex就会效率不高,此时可以使用读写锁RwLock,示例如下:
use std::sync::{Arc, RwLock}; use std::thread; fn main() { let shared_data = Arc::new(RwLock::new(vec![1, 2, 3])); // 创建一个共享的 vector 数据 let mut threads = vec![]; // 创建 5 个读线程 for i in 0..5 { let shared_data = shared_data.clone(); threads.push(thread::spawn(move || { // 获取读锁 let shared_data = shared_data.read().unwrap(); println!("Thread {} read data: {:?}", i, *shared_data); })); } // 创建 2 个写线程 for i in 0..2 { let shared_data = shared_data.clone(); threads.push(thread::spawn(move || { // 获取写锁 let mut shared_data = shared_data.write().unwrap(); println!("Thread {} write data", i); shared_data.push(i); // 向 vector 中添加数据 })); } // 等待所有线程完成 for thread in threads { thread.join().unwrap(); } }
3.20.3.4. 小结
关于Mutex和RwLock的选择:
- 追求高并发读取时,使用RwLock,因为Mutex一次只允许一个线程读取;
- 如果要保证写操作的成功性,使用Mutex;
- 不知道哪个合适,统一使用Mutex。
关于Mutex和Arc类型:
- Mutex和RefCell一样具有内部可变性,不过Mutex是线程安全的,而RefCell不是线程安全的;
- Arc和Rc功能类似,但是Arc是线程安全的,而Rc不是线程安全的。
3.20.4 Send trait和Sync trait
3.20.4.1. Send和Sync
Send trait和Sync trait是Rust语言中的两个标记trait(即未定义任何行为,但是可以标记一个类型),其作用如下:
- 实现了Send的类型可以在线程间安全的传递其所有权;
- 实现了Sync的类型可以在线程间通过引用安全的共享。
实现Sync的类型是通过引用在线程间共享的,因此一个类型要在线程间安全的共享,那么它的应用必须能安全的在线程间进行传递。所以可以有结论:若&T满足Send,那么T满足Sync。
3.20.4.2. 实现了Send和Sync的类型
Rust中几乎所有类型都默认实现了Send和Sync。Send和Sync也是可自动派生的trait,因此一个复合类型,如果其内部成员都实现了Send或Sync,那么它就自动实现了Send或Sync。
Rust中绝大多数类型都实现了Send和Sync,但是下面几个是没有实现Send或者Sync的:
- 裸指针既没有实现Send也没有实现Sync,因为它本身就没有任何安全保证的;
- UnsafeCell没有实现Sync,Cell和RefCell也没有实现Sync(但是它们实现了Send);
- Rc既没有实现Send也没有实现Sync。
通常情况下不需要为某个类型手动实现Send和Sync trait,手动实现这些标记trait 涉及到编写不安全的Rust代码,本书在后面unsafe编程部分介绍。