在多线程编程当中,线程安全(thread safety)是非常重要的。借用英文维基的定义:一个函数如果能够被多个线程同时调用或访问而不会导致意外行为、竞态条件或数据损坏,那么这个函数就是线程安全(thread-safe)的。
由于多线程的代码是同时运行的,因此我们无法保证线程间的执行顺序,这会导致一些问题:
Rust 为多线程编程提供了多种解决以上问题的途径,下面请听我娓娓道来。
为了描述某一类型是否是线程安全的,Rust 设计了两个特征——Send 和 Sync。它们是多线程环境下保障线程安全的核心。
这两种特征都是标记特征,也是自动特征。所以它们都没有任何方法,而且编译器会为类型自动实现它们——只要该类型所有字段已经实现。
这两种特征的含义分别为:
Send 的。&T 可以在线程间传递),那么它就是 Sync 的。根据上面的区分,我们可以得出:T 是 Sync 的,与 &T 是 Send 的,是等价的两种说法。
标准库已经为绝大部分类型实现了这两种特征,因此大多数情况下我们所使用的数据类型(例如 bool、i32 和集合类型)都是线程安全的。但是总有例外:
*const T / *mut T)不会自动实现这两种特征;UnsafeCell<T> 不会自动实现 Sync;Rc 不会自动实现这两种特征。因此在讲解智能指针的时候我们特别提到了各种 cell 的线程安全版本以及 Rc 的线程安全版本 Arc。
Arc<T>原子引用计数(Atomically reference counted)Arc——一种线程安全的引用计数指针。它和 Rc 的功能非常类似,最大的不同是它更适合多线程环境。
与 Rc<T> 不同,Arc<T> 使用原子操作进行引用计数,因此它是线程安全的。但代价是原子操作比普通内存访问的开销更大。所以仅在必要时使用 Arc,不需要在线程间共享引用计数时老老实实用 Rc 绝对没问题。
Important
Arc<T> 会在 T 实现 Send 和 Sync 的前提下自动实现 Send 和 Sync。这代表 Arc 只能在内部数据本身是线程安全的前提下为其提供保障,而不能把本不是线程安全的数据变得安全。
一个残酷的事实是像 Arc<RefCell<T>> 这样的用法不可取:RefCell<T> 不是 Sync 的,内部数据本身线程不安全,即使外部套一层安全的保护壳也无济于事。
因此,最终你可能需要使用 std::sync 提供的类型来同 Arc 组合使用。
Rc<T> 与 Arc<T> 同为引用计数指针,它们在诸多方面有着相似的特性,例如共享所有权、解引用、内部可变性;另外在 Rc 中出现的循环引用也可能在 Arc 中出现,解决方法同样是使用 std::sync::Weak 打破循环。
下面是一个在线程间共享不可变数据的例子:
use std::sync::Arc;
use std::thread;
let five = Arc::new(5usize);
for _ in 0..10 {
let five = Arc::clone(&five);
thread::spawn(move || {
println!("{five:?}");
});
}
想在多线程中对共享数据进行更复杂的操作?你也可以像下面这样使用原子类型。
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;
let val = Arc::new(AtomicUsize::new(5));
for _ in 0..10 {
let val = Arc::clone(&val);
thread::spawn(move || {
let v = val.fetch_add(1, Ordering::Relaxed);
println!("{v:?}");
});
}
Mutex在 Rust 当中,互斥锁(Mutual exclusion)Mutex 是一种用于保护共享数据的互斥原语。它确保了同一时刻最多只有一个线程能够访问某些数据。
互斥锁可以通过 new 创建。每个互斥锁都有一个类型参数,代表它所保护的数据。
如果某个线程想访问数据,只能通过 lock 和 try_lock 方法返回的 RAII1 守卫 MutexGuard 来访问,这保证了数据只有在互斥锁被锁定时才能被访问。需要注意的是,如果当前线程已经持有了该锁,再次调用 lock 是很危险的——可能会发生死锁或 panic。
lock 和 try_lock 方法的不同点在于:lock 会阻塞当前线程直到成功获取锁,如果使用不当会在代码中埋下死锁的隐患;而 try_lock 方法则只是进行一次获取锁的尝试,如果无法获取会立即返回 Err 而不阻塞线程。既然它不阻塞线程那就大大降低了死锁的可能,因此 try_lock 有时会用于替代 lock 以避免死锁。
这两种方法返回的 MutexGuard 是一种智能指针。由于它实现了 Deref 特征,被自动解引用后会获得一个指向 Mutex 内部数据的引用。同时它还实现了 Drop 特征,因此当守卫离开其作用域时,将自动解锁该 Mutex,以便其它线程能继续获取锁。
use std::sync::{Arc, Mutex};
use std::thread;
fn main() {
let counter = Arc::new(Mutex::new(0));
let mut handles = vec![];
for _ in 0..10 {
let counter = Arc::clone(&counter);
let handle = thread::spawn(move || {
// 调用 `lock` 来获取互斥锁,这会阻塞当前线程直到成功获取
// 它返回一个 RAII 守卫,负责在作用域内自动管理锁的释放
// 当守卫离开其作用域时,将自动解锁该互斥锁
let mut num = counter.lock().unwrap();
*num += 1;
}); // 进行到这里,线程结束,锁会被守卫自动释放
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("Result: {}", *counter.lock().unwrap());
}
互斥锁有一种称为“中毒”的策略:当互斥锁检测到持有它的线程发生 panic 时,该互斥锁就会“中毒”。一旦互斥锁中毒,默认情况下所有其他线程都将无法访问其中的数据,因为数据可能已被破坏(某些不变量可能不再是不变的)。
当中毒的锁被发生 panic 的线程释放后,另一线程调用 lock 试图获取锁时将返回 Err。此时本应返回的 MutexGuard 会被包含在此 Err 当中,这允许用户对其进行处理来重新恢复访问。
让我们来模拟一下互斥锁中毒的情况,以及中毒时如何恢复对锁的访问:
use std::sync::{Arc, Mutex};
use std::thread;
let mutex = Arc::new(Mutex::new(1));
let c_mutex = Arc::clone(&mutex);
// 互斥锁将在此线程中毒
let _ = thread::spawn(move || {
// 此线程将首先获取互斥锁,但现在锁尚未中毒
// 因此可以安全地对 `lock` 返回值调用 `unwrap`
let mut data = c_mutex.lock().unwrap();
*data = 2;
// 故意 panic 此线程让锁中毒
panic!();
}).join();
// 在主线程试图访问中毒的锁
match mutex.lock() {
Ok(_) => unreachable!(),
Err(p_err) => {
let data = p_err.into_inner();
// 也可以是
// let data = p_err.get_ref();
println!("recovered: {data}"); // recovered: 2
}
};
Mutex 还很贴心地为用户提供了清除中毒状态的方法——clear_poison:
use std::sync::{Arc, Mutex};
use std::thread;
let mutex = Arc::new(Mutex::new(0));
let c_mutex = Arc::clone(&mutex);
let _ = thread::spawn(move || {
let _lock = c_mutex.lock().unwrap();
panic!(); // 锁中毒了
}).join();
assert_eq!(mutex.is_poisoned(), true);
let x = mutex.lock().unwrap_or_else(|mut e| {
**e.get_mut() = 1; // 第一次解引用取得 MutexGuard
// 第二次解引用取得守卫对应内部值的可变引用
mutex.clear_poison(); // “解毒”
e.into_inner() // 消耗掉 PoisonError 来获取守卫
});
assert_eq!(mutex.is_poisoned(), false);
assert_eq!(*x, 1);
Arc 与 Mutex 结合实现内部可变性在谈到智能指针的时候,Rc 和 RefCell 结合可以实现内部可变性,而到多线程的情境下这条路走不通了,新的解决方案就是 Arc 和 Mutex!
Mutex 提供修改内部数据的能力,而 Arc 则共享数据所有权。两者一拍即合。
现在介绍几种常见的死锁情景。通过了解这些潜在的陷阱,能帮助你更好地避免写出带有死锁隐患的代码。
前面讲 lock 方法时提到如果当前线程已经持有了该锁,就禁止重复调用该方法,这可能导致死锁。此现象对应的简单事实是——只要你在另一个锁还未被释放时去申请新的锁,就容易触发死锁。当然,解决方法就是使用前面提到的 try_lock 替代 lock,因为 try_lock 不会阻塞线程。
下面是单线程触发死锁的例子:
use std::sync::Mutex;
// 主线程发生死锁,导致此程序永远无法终止
fn main() {
let data = Mutex::new(0);
let _d1 = data.lock();
let _d2 = data.lock();
}
// 当程序很复杂时
// 死锁并不一定像这样显而易见
还有一种情况是:当我们拥有两个锁,且两个线程各自使用了其中一个锁,然后试图去访问另一个锁,此时就可能发生死锁。
下面这个例子就是创建了两个锁,且两个线程各自先使用了其中一个,并且都想去访问另一线程所使用的另一个锁。结果就是你只会看到屏幕输出每个线程获取了它的第一个锁,却看不到其余信息。
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
fn main() {
let data1 = Arc::new(Mutex::new(42));
let data2 = Arc::new(Mutex::new(true));
let mut handles = vec![];
for id in 0..2 {
if id == 0 {
let data1_1 = Arc::clone(&data1);
let data2_1 = Arc::clone(&data2);
let h1 = thread::spawn(move || {
let _g1 = data1_1.lock().unwrap();
println!("Thread {id} has acquired its 1st mutex.");
thread::sleep(Duration::from_millis(10));
let _g2 = data2_1.lock().unwrap();
println!("Thread {id} has acquired its 2nd mutex.");
println!("Thread {id} ends here.");
});
handles.push(h1);
} else {
let data1_2 = Arc::clone(&data1);
let data2_2 = Arc::clone(&data2);
let h2 = thread::spawn(move || {
let _g2 = data2_2.lock().unwrap();
println!("Thread {id} has acquired its 1st mutex.");
thread::sleep(Duration::from_millis(10));
let _g1 = data1_2.lock().unwrap();
println!("Thread {id} has acquired its 2nd mutex.");
println!("Thread {id} ends here.");
});
handles.push(h2);
}
}
handles.into_iter().for_each(|h| h.join().unwrap());
println!("Deadlock didn't occur.");
println!("Main thread ends here.");
}
解决方法同样是使用 try_lock 替代 lock。
RwLock读写锁(reader-writer lock)RwLock 提供这样一种互斥机制:允许同时有多个读取者,但同一时刻只允许有一个写入者。在某些情况下,这比互斥锁更高效。
对比 Mutex 和 RwLock,前者不区分获取锁的是读取者还是写入者,谁获取了锁,谁就同时拥有了读取和写入的权利,因此会阻塞任何等待锁可用的线程;而 RwLock 只要没有写入者持有锁,就允许任意数量的读取者获取锁(某种意义上这和借用规则很相似:同一作用域只能有一个可变引用或多个不可变引用)。
类型参数 T 表示此锁保护的数据。T 必须满足 Send 特征(以便在线程间共享)和 Sync 特征(以允许通过读取者进行并发访问)。
正如其名,读写锁最为重要的两种方法分别是“读”(read 与 try_read)和“写”(write 与 try_write)。和互斥锁的“上锁”方法类似,没有 try 字样的方法会阻塞线程直至成功读取或写入,而有 try 的方法则立即返回尝试的结果。
下面是在多个线程中进行读取的例子:
use std::sync::{Arc, RwLock};
use std::thread;
let lock = Arc::new(RwLock::new(1));
let c_lock = Arc::clone(&lock);
// 在主线程中读取
let n = lock.read().unwrap();
assert_eq!(*n, 1);
thread::spawn(move || {
// 在此线程中读取
let r = c_lock.read();
assert!(r.is_ok());
}).join().unwrap();
前面提到了,读写锁同时只能有一个写入者,且此时不允许有读取者。故称 RwLock 的写入操作是独占的,此时该锁会被锁定,其它线程无论想读取还是写入都不行。下面是单线程中只写入的例子:
use std::sync::RwLock;
let lock = RwLock::new(1);
// 在主线程中写入
let mut n = lock.write().unwrap();
*n = 2;
// 尝试在同一线程进行读取会失败
assert!(lock.try_read().is_err());
还有一个单线程先后进行读取和写入的例子:
use std::sync::RwLock;
let lock = RwLock::new(5);
{
// 可以同时进行多个读取操作
let r1 = lock.read().unwrap();
let r2 = lock.read().unwrap();
assert_eq!(*r1, 5);
assert_eq!(*r2, 5);
} // 读取者在此处被析构
{
// 同时只允许有一个写入者
let mut w = lock.write().unwrap();
*w += 1;
assert_eq!(*w, 6);
} // 写入者在此处被析构
与 Mutex 一样,RwLock 也会有锁中毒的现象发生。但需要注意的是,只有当前线程的 RwLock 是写入者时线程 panic 才会使得锁中毒,如果是读取者所在线程发生 panic 是不会中毒的。
和 Mutex 类似,RwLock 提供了 clear_poison 方法用于“解毒”,也可以通过处理返回的 Err 来继续使用锁。
文档中举出了一个潜在发生死锁的例子,我稍微改良了一下:
─────────Thread 1────────┬────────Thread 2─────────
let _rg1 = lock.read(); │
│ // 此操作会阻塞线程
│ let _wg = lock.write();
// 可能发生死锁 │░░░░░░░░░░░░░░░░░░░░░░░░░
let _rg2 = lock.read(); │░░░░░░░░blocking░░░░░░░░░
░░░░░░░░░░░░░░░░░░░░░░░░░│░░░░░░░░░░░░░░░░░░░░░░░░░
▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒deadlock▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒
两个线程如果按照从上到下的时序依次执行了对应语句,那么程序最终就可能发生死锁。
通过分析,线程被阻塞会大大增加死锁的可能,我们如果全部用非阻塞的 try 版本替换,可以预见死锁应该会销声匿迹,但代价是必须做大量的错误处理和失败重试机制。
那如何进行权衡?这是一个很难回答的问题,需要编程者去寻找属于自己的答案。
Mutex 还是 RwLock在操作简单性上 Mutex 可谓完胜。RwLock 需要考虑很多问题:读和写不能同时发生、写入者饥饿现象以及相比互斥锁的性能损失……
读写锁的写入者饥饿现象
当读多写少时,写入操作可能会因为一直无法获得锁导致连续多次的失败。此现象称为读写锁的写入者饥饿(writer starvation)。
但 RwLock 确实有其擅长的领域:追求高并发读取,且需要对读到的数据进行较多操作。在这种情况下,RwLock 恐怕是最优解。
Condvar条件变量(condition variable)Condvar 提供了在线程等待某个事件发生时将其阻塞的能力。
前面的 Mutex 和 RwLock 解决了在线程间访问与修改共享数据的安全问题,它们针对“线程安全”交出的答卷分别是:Mutex 把读写权限限定在唯一线程,而 RwLock 保证了读和写不会同时发生。但这两种类型只是保障了共享数据的安全,它们依旧存在着诸多问题,单单依靠它们还会有很多局限。这就是我们还需要其它线程安全类型的原因。
而本节的条件变量,它一定程度上解决了共享数据访问时序的问题。
std::marker::Send 官方文档std::marker::Sync 官方文档Send 和 Sync 的解释std::sync 官方文档即 C++ 中的 Resource Acquisition Is Initialization,资源获取即初始化。表示将资源的生命周期绑定到对象的生命周期上,初始化对象时自动获取资源,对象离开作用域时自动释放资源。在这里是说该锁唯一的读写权被绑定到了该线程的守卫的生命周期上。 ↩