在多线程编程当中,线程安全(thread safety)是非常重要的。借用英文维基的定义:一个函数如果能够被多个线程同时调用或访问而不会导致意外行为、竞态条件或数据损坏,那么这个函数就是线程安全(thread-safe)的。
由于多线程的代码是同时运行的,因此我们无法保证线程间的执行顺序,这会导致一些问题:
Rust 为多线程编程提供了多种解决以上问题的途径,下面请听我娓娓道来。
Arc<T>原子引用计数(Atomically reference counted)Arc——一种线程安全的引用计数指针。它和 Rc 的功能非常类似,最大的不同是它更适合多线程环境。
与 Rc<T> 不同,Arc<T> 使用原子操作进行引用计数,因此它是线程安全的。但代价是原子操作比普通内存访问的开销更大。所以仅在必要时使用 Arc,不需要在线程间共享引用计数时老老实实用 Rc 绝对没问题。
Important
Arc<T> 会在 T 实现 Send 和 Sync 的前提下自动实现 Send 和 Sync。这代表 Arc 只能在内部数据本身是线程安全的前提下为其提供保障,而不能把本不是线程安全的数据变得安全。
>
一个残酷的事实是像
Arc<RefCell<T>>这样的用法不可取:RefCell<T>不是Sync的,内部数据本身线程不安全,即使外部套一层安全的保护壳也无济于事。
>
因此,最终你可能需要使用
std::sync提供的类型来同Arc组合使用。
Rc<T> 与 Arc<T> 同为引用计数指针,它们在诸多方面有着相似的特性,例如共享所有权、解引用、内部可变性;另外在 Rc 中出现的循环引用也可能在 Arc 中出现,解决方法同样是使用 std::sync::Weak 打破循环。
下面是一个在线程间共享不可变数据的例子:
use std::sync::Arc;
use std::thread;
let five = Arc::new(5usize);
for _ in 0..10 {
let five = Arc::clone(&five);
thread::spawn(move || {
println!("{five:?}");
});
}
想在多线程中对共享数据进行更复杂的操作?你也可以像下面这样使用原子类型。
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;
let val = Arc::new(AtomicUsize::new(5));
for _ in 0..10 {
let val = Arc::clone(&val);
thread::spawn(move || {
let v = val.fetch_add(1, Ordering::Relaxed);
println!("{v:?}");
});
}
Mutex在 Rust 当中,互斥锁(Mutual exclusion)Mutex 是一种用于保护共享数据的互斥原语。它确保了同一时刻最多只有一个线程能够访问某些数据。
互斥锁可以通过 new 创建。每个互斥锁都有一个类型参数,代表它所保护的数据。
如果某个线程想访问数据,只能通过 lock 和 try_lock 方法返回的 RAII1 守卫 MutexGuard 来访问,这保证了数据只有在互斥锁被锁定时才能被访问。需要注意的是,如果当前线程已经持有了该锁,再次调用 lock 是很危险的——可能会发生死锁或 panic。
lock 和 try_lock 方法的不同点在于:lock 会阻塞当前线程直到成功获取锁,如果使用不当会在代码中埋下死锁的隐患;而 try_lock 方法则只是进行一次获取锁的尝试,如果无法获取会立即返回 Err 而不阻塞线程。既然它不阻塞线程那就大大降低了死锁的可能,因此 try_lock 有时会用于替代 lock 以避免死锁。
这两种方法返回的 MutexGuard 是一种智能指针。由于它实现了 Deref 特征,被自动解引用后会获得一个指向 Mutex 内部数据的引用。同时它还实现了 Drop 特征,因此当守卫离开其作用域时,将自动解锁该 Mutex,以便其它线程能继续获取锁。
use std::sync::{Arc, Mutex};
use std::thread;
fn main() {
let counter = Arc::new(Mutex::new(0));
let mut handles = vec![];
for _ in 0..10 {
let counter = Arc::clone(&counter);
let handle = thread::spawn(move || {
// 调用 `lock` 来获取互斥锁,这会阻塞当前线程直到成功获取
// 它返回一个 RAII 守卫,负责在作用域内自动管理锁的释放
// 当守卫离开其作用域时,将自动解锁该互斥锁
let mut num = counter.lock().unwrap();
*num += 1;
}); // 进行到这里,线程结束,锁会被守卫自动释放
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("Result: {}", *counter.lock().unwrap());
}
互斥锁有一种称为“中毒”的策略:当互斥锁检测到持有它的线程发生 panic 时,该互斥锁就会“中毒”。一旦互斥锁中毒,默认情况下所有其他线程都将无法访问其中的数据,因为数据可能已被破坏(某些不变量可能不再是不变的)。
当中毒的锁被发生 panic 的线程释放后,另一线程调用 lock 试图获取锁时将返回 Err。此时本应返回的 MutexGuard 会被包含在此 Err 当中,这允许用户对其进行处理来重新恢复访问。
让我们来模拟一下互斥锁中毒的情况,以及中毒时如何恢复对锁的访问:
use std::sync::{Arc, Mutex};
use std::thread;
let mutex = Arc::new(Mutex::new(1));
let c_mutex = Arc::clone(&mutex);
// 互斥锁将在此线程中毒
let _ = thread::spawn(move || {
// 此线程将首先获取互斥锁,但现在锁尚未中毒
// 因此可以安全地对 `lock` 返回值调用 `unwrap`
let mut data = c_mutex.lock().unwrap();
*data = 2;
// 故意 panic 此线程让锁中毒
panic!();
}).join();
// 在主线程试图访问中毒的锁
match mutex.lock() {
Ok(_) => unreachable!(),
Err(p_err) => {
let data = p_err.into_inner();
// 也可以是
// let data = p_err.get_ref();
println!("recovered: {data}"); // recovered: 2
}
};
Mutex 还很贴心地为用户提供了清除中毒状态的方法——clear_poison:
use std::sync::{Arc, Mutex};
use std::thread;
let mutex = Arc::new(Mutex::new(0));
let c_mutex = Arc::clone(&mutex);
let _ = thread::spawn(move || {
let _lock = c_mutex.lock().unwrap();
panic!(); // 锁中毒了
}).join();
assert_eq!(mutex.is_poisoned(), true);
let x = mutex.lock().unwrap_or_else(|mut e| {
**e.get_mut() = 1; // 第一次解引用取得 MutexGuard
// 第二次解引用取得守卫对应内部值的可变引用
mutex.clear_poison(); // “解毒”
e.into_inner() // 消耗掉 PoisonError 来获取守卫
});
assert_eq!(mutex.is_poisoned(), false);
assert_eq!(*x, 1);
Arc 与 Mutex 结合实现内部可变性在谈到智能指针的时候,Rc 和 RefCell 结合可以实现内部可变性,而到多线程的情境下这条路走不通了,新的解决方案就是 Arc 和 Mutex!
Mutex 提供修改内部数据的能力,而 Arc 则共享数据所有权。两者一拍即合。
现在介绍几种常见的死锁情景。通过了解这些潜在的陷阱,能帮助你更好地避免写出带有死锁隐患的代码。
前面讲 lock 方法时提到如果当前线程已经持有了该锁,就禁止重复调用该方法,这可能导致死锁。此现象对应的简单事实是——只要你在另一个锁还未被释放时去申请新的锁,就容易触发死锁。当然,解决方法就是使用前面提到的 try_lock 替代 lock,因为 try_lock 不会阻塞线程。
下面是单线程触发死锁的例子:
use std::sync::Mutex;
// 主线程发生死锁,导致此程序永远无法终止
fn main() {
let data = Mutex::new(0);
let _d1 = data.lock();
let _d2 = data.lock();
}
// 当程序很复杂时
// 死锁并不一定像这样显而易见
还有一种情况是:当我们拥有两个锁,且两个线程各自使用了其中一个锁,然后试图去访问另一个锁,此时就可能发生死锁。
下面这个例子就是创建了两个锁,且两个线程各自先使用了其中一个,并且都想去访问另一线程所使用的另一个锁。结果就是你只会看到屏幕输出每个线程获取了它的第一个锁,却看不到其余信息。
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
fn main() {
let data1 = Arc::new(Mutex::new(42));
let data2 = Arc::new(Mutex::new(true));
let mut handles = vec![];
for id in 0..2 {
if id == 0 {
let data1_1 = Arc::clone(&data1);
let data2_1 = Arc::clone(&data2);
let h1 = thread::spawn(move || {
let _g1 = data1_1.lock().unwrap();
println!("Thread {id} has acquired its 1st mutex.");
thread::sleep(Duration::from_millis(10));
let _g2 = data2_1.lock().unwrap();
println!("Thread {id} has acquired its 2nd mutex.");
println!("Thread {id} ends here.");
});
handles.push(h1);
} else {
let data1_2 = Arc::clone(&data1);
let data2_2 = Arc::clone(&data2);
let h2 = thread::spawn(move || {
let _g2 = data2_2.lock().unwrap();
println!("Thread {id} has acquired its 1st mutex.");
thread::sleep(Duration::from_millis(10));
let _g1 = data1_2.lock().unwrap();
println!("Thread {id} has acquired its 2nd mutex.");
println!("Thread {id} ends here.");
});
handles.push(h2);
}
}
handles.into_iter().for_each(|h| h.join().unwrap());
println!("Deadlock didn't occur.");
println!("Main thread ends here.");
}
解决方法同样是使用 try_lock 替代 lock。
RwLock读写锁(reader-writer lock)RwLock 提供这样一种互斥机制:允许同时有多个读取者,但同一时刻只允许有一个写入者。在某些情况下,这比互斥锁更高效。
即 C++ 中的 Resource Acquisition Is Initialization,资源获取即初始化。表示将资源的生命周期绑定到对象的生命周期上,初始化对象时自动获取资源,对象离开作用域时自动释放资源。在这里是说该锁唯一的读写权被绑定到了该线程的守卫的生命周期上。 ↩