在多线程编程当中,线程安全(thread safety)是非常重要的。借用英文维基的定义:一个函数如果能够被多个线程同时调用或访问而不会导致意外行为、竞态条件或数据损坏,那么这个函数就是线程安全(thread-safe)的。
由于多线程的代码是同时运行的,因此我们无法保证线程间的执行顺序,这会导致一些问题:
Rust 为多线程编程提供了多种解决以上问题的途径,下面请听我娓娓道来。
为了描述某一类型是否是线程安全的,Rust 设计了两个特征——Send 和 Sync。它们是多线程环境下保障线程安全的核心。
这两种特征都是标记特征,也是自动特征。所以它们都没有任何方法,而且编译器会为类型自动实现它们——只要该类型所有字段已经实现。
这两种特征的含义分别为:
Send 的。&T 可以在线程间传递),那么它就是 Sync 的。根据上面的区分,我们可以得出:T 是 Sync 的,与 &T 是 Send 的,是等价的两种说法。
标准库已经为绝大部分类型实现了这两种特征,因此大多数情况下我们所使用的数据类型(例如 bool、i32 和集合类型)都是线程安全的。但是总有例外:
*const T / *mut T)不会自动实现这两种特征;UnsafeCell<T> 不会自动实现 Sync;Rc 不会自动实现这两种特征。因此在讲解智能指针的时候我们特别提到了各种 cell 的线程安全版本以及 Rc 的线程安全版本 Arc。
Arc<T>原子引用计数(Atomically reference counted)Arc——一种线程安全的引用计数指针。它和 Rc 的功能非常类似,最大的不同是它更适合多线程环境。
与 Rc<T> 不同,Arc<T> 使用原子操作进行引用计数,因此它是线程安全的。但代价是原子操作比普通内存访问的开销更大。所以仅在必要时使用 Arc,不需要在线程间共享引用计数时老老实实用 Rc 绝对没问题。
Important
Arc<T> 会在 T 实现 Send 和 Sync 的前提下自动实现 Send 和 Sync。这代表 Arc 只能在内部数据本身是线程安全的前提下为其提供保障,而不能把本不是线程安全的数据变得安全。
一个残酷的事实是像 Arc<RefCell<T>> 这样的用法不可取:RefCell<T> 不是 Sync 的,内部数据本身线程不安全,即使外部套一层安全的保护壳也无济于事。
因此,最终你可能需要使用 std::sync 提供的类型来同 Arc 组合使用。
Rc<T> 与 Arc<T> 同为引用计数指针,它们在诸多方面有着相似的特性,例如共享所有权、解引用、内部可变性;另外在 Rc 中出现的循环引用也可能在 Arc 中出现,解决方法同样是使用 std::sync::Weak 打破循环。
下面是一个在线程间共享不可变数据的例子:
use std::sync::Arc;
use std::thread;
let five = Arc::new(5usize);
for _ in 0..10 {
let five = Arc::clone(&five);
thread::spawn(move || {
println!("{five:?}");
});
}
想在多线程中对共享数据进行更复杂的操作?你也可以像下面这样使用原子类型。
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;
let val = Arc::new(AtomicUsize::new(5));
for _ in 0..10 {
let val = Arc::clone(&val);
thread::spawn(move || {
let v = val.fetch_add(1, Ordering::Relaxed);
println!("{v:?}");
});
}
Mutex在 Rust 当中,互斥锁(Mutual exclusion)Mutex 是一种用于保护共享数据的互斥原语。它确保了同一时刻最多只有一个线程能够访问某些数据。
互斥锁可以通过 new 创建。每个互斥锁都有一个类型参数,代表它所保护的数据。
如果某个线程想访问数据,只能通过 lock 和 try_lock 方法返回的 RAII1 守卫 MutexGuard 来访问,这保证了数据只有在互斥锁被锁定时才能被访问。需要注意的是,如果当前线程已经持有了该锁,再次调用 lock 是很危险的——可能会发生死锁或 panic。
lock 和 try_lock 方法的不同点在于:lock 会阻塞当前线程直到成功获取锁,如果使用不当会在代码中埋下死锁的隐患;而 try_lock 方法则只是进行一次获取锁的尝试,如果无法获取会立即返回 Err 而不阻塞线程。既然它不阻塞线程那就大大降低了死锁的可能,因此 try_lock 有时会用于替代 lock 以避免死锁。
这两种方法返回的 MutexGuard 是一种智能指针。由于它实现了 Deref 特征,被自动解引用后会获得一个指向 Mutex 内部数据的引用。同时它还实现了 Drop 特征,因此当守卫离开其作用域时,将自动解锁该 Mutex,以便其它线程能继续获取锁。
use std::sync::{Arc, Mutex};
use std::thread;
fn main() {
let counter = Arc::new(Mutex::new(0));
let mut handles = vec![];
for _ in 0..10 {
let counter = Arc::clone(&counter);
let handle = thread::spawn(move || {
// 调用 `lock` 来获取互斥锁,这会阻塞当前线程直到成功获取
// 它返回一个 RAII 守卫,负责在作用域内自动管理锁的释放
// 当守卫离开其作用域时,将自动解锁该互斥锁
let mut num = counter.lock().unwrap();
*num += 1;
}); // 进行到这里,线程结束,锁会被守卫自动释放
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("Result: {}", *counter.lock().unwrap());
}
互斥锁有一种称为“中毒”的策略:当互斥锁检测到持有它的线程发生 panic 时,该互斥锁就会“中毒”。一旦互斥锁中毒,默认情况下所有其他线程都将无法访问其中的数据,因为数据可能已被破坏(某些不变量可能不再是不变的)。
当中毒的锁被发生 panic 的线程释放后,另一线程调用 lock 试图获取锁时将返回 Err。此时本应返回的 MutexGuard 会被包含在此 Err 当中,这允许用户对其进行处理来重新恢复访问。
让我们来模拟一下互斥锁中毒的情况,以及中毒时如何恢复对锁的访问:
use std::sync::{Arc, Mutex};
use std::thread;
let mutex = Arc::new(Mutex::new(1));
let c_mutex = Arc::clone(&mutex);
// 互斥锁将在此线程中毒
let _ = thread::spawn(move || {
// 此线程将首先获取互斥锁,但现在锁尚未中毒
// 因此可以安全地对 `lock` 返回值调用 `unwrap`
let mut data = c_mutex.lock().unwrap();
*data = 2;
// 故意 panic 此线程让锁中毒
panic!();
}).join();
// 在主线程试图访问中毒的锁
match mutex.lock() {
Ok(_) => unreachable!(),
Err(p_err) => {
let data = p_err.into_inner();
// 也可以是
// let data = p_err.get_ref();
println!("recovered: {data}"); // recovered: 2
}
};
Mutex 还很贴心地为用户提供了清除中毒状态的方法——clear_poison:
use std::sync::{Arc, Mutex};
use std::thread;
let mutex = Arc::new(Mutex::new(0));
let c_mutex = Arc::clone(&mutex);
let _ = thread::spawn(move || {
let _lock = c_mutex.lock().unwrap();
panic!(); // 锁中毒了
}).join();
assert_eq!(mutex.is_poisoned(), true);
let x = mutex.lock().unwrap_or_else(|mut e| {
**e.get_mut() = 1; // 第一次解引用取得 MutexGuard
// 第二次解引用取得守卫对应内部值的可变引用
mutex.clear_poison(); // “解毒”
e.into_inner() // 消耗掉 PoisonError 来获取守卫
});
assert_eq!(mutex.is_poisoned(), false);
assert_eq!(*x, 1);
Arc 与 Mutex 结合实现内部可变性在谈到智能指针的时候,Rc 和 RefCell 结合可以实现内部可变性,而到多线程的情境下这条路走不通了,新的解决方案就是 Arc 和 Mutex!
Mutex 提供修改内部数据的能力,而 Arc 则共享数据所有权。两者一拍即合。
现在介绍几种常见的死锁情景。通过了解这些潜在的陷阱,能帮助你更好地避免写出带有死锁隐患的代码。
前面讲 lock 方法时提到如果当前线程已经持有了该锁,就禁止重复调用该方法,这可能导致死锁。此现象对应的简单事实是——只要你在另一个锁还未被释放时去申请新的锁,就容易触发死锁。当然,解决方法就是使用前面提到的 try_lock 替代 lock,因为 try_lock 不会阻塞线程。
下面是单线程触发死锁的例子:
use std::sync::Mutex;
// 主线程发生死锁,导致此程序永远无法终止
fn main() {
let data = Mutex::new(0);
let _d1 = data.lock();
let _d2 = data.lock();
}
// 当程序很复杂时
// 死锁并不一定像这样显而易见
还有一种情况是:当我们拥有两个锁,且两个线程各自使用了其中一个锁,然后试图去访问另一个锁,此时就可能发生死锁。
下面这个例子就是创建了两个锁,且两个线程各自先使用了其中一个,并且都想去访问另一线程所使用的另一个锁。结果就是你只会看到屏幕输出每个线程获取了它的第一个锁,却看不到其余信息。
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
fn main() {
let data1 = Arc::new(Mutex::new(42));
let data2 = Arc::new(Mutex::new(true));
let mut handles = vec![];
for id in 0..2 {
if id == 0 {
let data1_1 = Arc::clone(&data1);
let data2_1 = Arc::clone(&data2);
let h1 = thread::spawn(move || {
let _g1 = data1_1.lock().unwrap();
println!("Thread {id} has acquired its 1st mutex.");
thread::sleep(Duration::from_millis(10));
let _g2 = data2_1.lock().unwrap();
println!("Thread {id} has acquired its 2nd mutex.");
println!("Thread {id} ends here.");
});
handles.push(h1);
} else {
let data1_2 = Arc::clone(&data1);
let data2_2 = Arc::clone(&data2);
let h2 = thread::spawn(move || {
let _g2 = data2_2.lock().unwrap();
println!("Thread {id} has acquired its 1st mutex.");
thread::sleep(Duration::from_millis(10));
let _g1 = data1_2.lock().unwrap();
println!("Thread {id} has acquired its 2nd mutex.");
println!("Thread {id} ends here.");
});
handles.push(h2);
}
}
handles.into_iter().for_each(|h| h.join().unwrap());
println!("Deadlock didn't occur.");
println!("Main thread ends here.");
}
解决方法同样是使用 try_lock 替代 lock。
RwLock读写锁(reader-writer lock)RwLock 提供这样一种互斥机制:允许同时有多个读取者,但同一时刻只允许有一个写入者。在某些情况下,这比互斥锁更高效。
对比 Mutex 和 RwLock,前者不区分获取锁的是读取者还是写入者,谁获取了锁,谁就同时拥有了读取和写入的权利,因此会阻塞任何等待锁可用的线程;而 RwLock 只要没有写入者持有锁,就允许任意数量的读取者获取锁(某种意义上这和借用规则很相似:同一作用域只能有一个可变引用或多个不可变引用)。
类型参数 T 表示此锁保护的数据。T 必须满足 Send 特征(以便在线程间共享)和 Sync 特征(以允许通过读取者进行并发访问)。
正如其名,读写锁最为重要的两种方法分别是“读”(read 与 try_read)和“写”(write 与 try_write)。和互斥锁的“上锁”方法类似,没有 try 字样的方法会阻塞线程直至成功读取或写入,而有 try 的方法则立即返回尝试的结果。
下面是在多个线程中进行读取的例子:
use std::sync::{Arc, RwLock};
use std::thread;
let lock = Arc::new(RwLock::new(1));
let c_lock = Arc::clone(&lock);
// 在主线程中读取
let n = lock.read().unwrap();
assert_eq!(*n, 1);
thread::spawn(move || {
// 在此线程中读取
let r = c_lock.read();
assert!(r.is_ok());
}).join().unwrap();
前面提到了,读写锁同时只能有一个写入者,且此时不允许有读取者。故称 RwLock 的写入操作是独占的,此时该锁会被锁定,其它线程无论想读取还是写入都不行。下面是单线程中只写入的例子:
use std::sync::RwLock;
let lock = RwLock::new(1);
// 在主线程中写入
let mut n = lock.write().unwrap();
*n = 2;
// 尝试在同一线程进行读取会失败
assert!(lock.try_read().is_err());
还有一个单线程先后进行读取和写入的例子:
use std::sync::RwLock;
let lock = RwLock::new(5);
{
// 可以同时进行多个读取操作
let r1 = lock.read().unwrap();
let r2 = lock.read().unwrap();
assert_eq!(*r1, 5);
assert_eq!(*r2, 5);
} // 读取者在此处被析构
{
// 同时只允许有一个写入者
let mut w = lock.write().unwrap();
*w += 1;
assert_eq!(*w, 6);
} // 写入者在此处被析构
与 Mutex 一样,RwLock 也会有锁中毒的现象发生。但需要注意的是,只有当前线程的 RwLock 是写入者时线程 panic 才会使得锁中毒,如果是读取者所在线程发生 panic 是不会中毒的。
和 Mutex 类似,RwLock 提供了 clear_poison 方法用于“解毒”,也可以通过处理返回的 Err 来继续使用锁。
文档中举出了一个潜在发生死锁的例子,我稍微改良了一下:
─────────Thread 1────────┬────────Thread 2─────────
let _rg1 = lock.read(); │
│ // 此操作会阻塞线程
│ let _wg = lock.write();
// 可能发生死锁 │░░░░░░░░░░░░░░░░░░░░░░░░░
let _rg2 = lock.read(); │░░░░░░░░blocking░░░░░░░░░
░░░░░░░░░░░░░░░░░░░░░░░░░│░░░░░░░░░░░░░░░░░░░░░░░░░
▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒deadlock▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒
两个线程如果按照从上到下的时序依次执行了对应语句,那么程序最终就可能发生死锁。
通过分析,线程被阻塞会大大增加死锁的可能,我们如果全部用非阻塞的 try 版本替换,可以预见死锁应该会销声匿迹,但代价是必须做大量的错误处理和失败重试机制。
那如何进行权衡?这是一个很难回答的问题,需要编程者去寻找属于自己的答案。
Mutex 还是 RwLock在操作简单性上 Mutex 可谓完胜。RwLock 需要考虑很多问题:读和写不能同时发生、写入者饥饿现象以及相比互斥锁的性能损失……
读写锁的写入者饥饿现象
当读多写少时,写入操作可能会因为一直无法获得锁导致连续多次的失败。此现象称为读写锁的写入者饥饿(writer starvation)。
但 RwLock 确实有其擅长的领域:追求高并发读取,且需要对读到的数据进行较多操作。在这种情况下,RwLock 恐怕是最优解。
Condvar条件变量(condition variable)Condvar 提供了在线程等待某个事件发生时将其阻塞的能力。
前面的 Mutex 和 RwLock 解决了在线程间访问与修改共享数据的安全问题,它们针对“线程安全”交出的答卷分别是:Mutex 把读写权限限定在唯一线程,而 RwLock 保证了读和写不会同时发生。但这两种类型只是保障了共享数据的安全,它们依旧存在着诸多问题,单单依靠它们还会有很多局限。这就是我们还需要其它线程安全类型的原因。
考虑下面这个场景:某个线程需要访问互斥锁,但是在访问前需要先由其它线程进行一些处理工作,如果其它线程没处理完,该线程就不能进行访问。也就是说此线程需要满足一定的条件才能获取锁。要实现这种逻辑,你可能会想到使用一个线程间共享的变量(比如布尔值),其它线程每当处理完自己的工作后就修改这个共享的变量,然后在需要等待条件成熟的线程中使用一个循环不断检测共享变量是否满足了要求,一旦满足即可获取锁。
听起来好像能行,但实际上这招漏洞百出:首先我们使用循环不间断地轮询条件是否满足,这是一种忙等待(busy-waiting)并且会浪费 100% 的 CPU 资源;其次我们要在线程间使用共享变量就需要考虑其同步问题,如何能一边在循环中不断读取该变量值,一边又在其他线程中试图修改它呢?
或许我们需要一种线程间的通信机制。而本节的条件变量,它就能应对以上这种需要“条件”的场景,并且一定程度上解决了共享数据访问时序的问题。
Mutex 搭配使用由于 Condvar 自身没有锁那样储存数据的能力,所以都会将其与 Mutex 绑定使用。关于条件变量的操作,最重要的概念是“等待”和“通知”。
Condvar 的 wait 方法会让当前线程先“等待”一下,先不要获取锁(可能因为条件尚不满足),等其它线程“通知”该线程说条件已经满足的时候,再结束阻塞继续使用锁。该方法会接收一个 MutexGuard,然后暂时性地解锁该互斥锁,并且当前线程会被阻塞。我们知道,获得了 MutexGuard 的线程就是当前持有该锁的线程,而将其传入 wait 会使锁解锁,并让其它线程能够进行获取锁。
Condvar 的 notify_one 和 notify_all 方法则用于“通知”。它们会通知某一或全部的因等待而阻塞的线程,并将之唤醒。而被唤醒线程的 wait 则会重新获取锁并返回守卫。
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
fn main() {
// Mutex 内部的 bool 表示当前的条件
// 为 false 表示主线程尚未具备访问锁的条件
// 为 true 表示主线程可以访问锁了
let pair = Arc::new((Mutex::new(false), Condvar::new()));
let pair2 = Arc::clone(&pair);
// 派生线程
thread::spawn(move || {
println!("Spawned thread starts running.");
let (lock, cvar) = &*pair2;
// 获取锁的守卫
let mut started = lock.lock().unwrap();
println!("Spawned thread has acquired the mutex.");
// 将条件置为 true,表示主线程已经可以访问锁了
*started = true;
// 通知因条件不满足而被阻塞的主线程
cvar.notify_one();
println!("Spawned thread notifies the main thread.");
});
// 主线程作为等待线程
let (lock, cvar) = &*pair;
// 获取锁的守卫
let mut started = lock.lock().unwrap();
// 当条件还是 false 时循环等待
while !*started {
println!("Main thread is waiting for the mutex.");
// 一旦条件满足,主线程会被唤醒
// 此时 wait 会自动重新获取锁并返回锁的守卫
started = cvar.wait(started).unwrap();
println!("Main thread has reacquired the mutex.");
}
println!("Main thread has quitted the loop.");
// 现在可以开始使用锁了
assert!(*started);
}
实际上,你并不需要每次使用条件变量的时候都写 while 和 wait。标准库提供了 wait_while 方法,一次性传入守卫和循环条件即可。用 wait_while 重写上面的例子就是这样:
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
fn main() {
let pair = Arc::new((Mutex::new(false), Condvar::new()));
let pair2 = Arc::clone(&pair);
// 派生线程
thread::spawn(move || {
let (lock, cvar) = &*pair2;
// 获取锁的守卫
let mut started = lock.lock().unwrap();
// 将条件置为 true,表示主线程已经可以访问锁了
*started = true;
// 通知因条件不满足而被阻塞的主线程
cvar.notify_one();
});
// 主线程作为等待线程
let (lock, cvar) = &*pair;
// 当条件还是 false 时循环等待
// 一旦条件为 true,主线程会被唤醒
// 此时 wait_while 会自动重新获取锁并返回锁的守卫
let guard = cvar
// 传入锁的守卫 // 传入循环等待条件
.wait_while(lock.lock().unwrap(), |started| !*started)
.unwrap();
// 现在可以开始使用锁了
assert!(*guard);
}
看了上面的讲解,你对条件变量可能依旧是一头雾水。下面我会进一步展开,同时引出一个在条件变量当中广泛存在的问题。
条件变量实质上由一个“条件”和一个“队列”组成。许多线程(一般被称为消费者)想要等待某个条件,但此时尚不具备这样的条件,于是这些线程就被放进队列中进行阻塞等待。在未来的某个时刻,当另一个线程(一般被称为生产者)创造了这样的条件,它就会通知等待队列中的一个或多个线程起来工作。这就是条件变量的基本工作方式。
形式上,条件变量必须与另外两种外部对象配合使用:一个是“条件”(可被视为一个 Predicate),另一个是互斥锁(用于同步对“条件”的访问)。这两种外部对象对于条件变量的正确性来说是必要的,缺少其中任何一个都可能会导致信号丢失。条件变量的最佳——也是唯一正确的——实践是所谓“循环等待”模式:
wait,它会将互斥锁解锁,然后阻塞消费者线程来等待条件成熟。notify_one)或全部(notify_all)的等待线程。wait 被唤醒时,它会自动重新获取锁然后返回守卫。注意这里的第 5 步,为什么消费者被唤醒后还需要重新检查条件?答案是因为此时条件可能已经不满足要求了。当消费者被 notify 后,如果还没来得及获取到锁,其他某个线程就抢先获取了锁并修改了条件,那么消费者再获取到锁的时候很可能条件就不成立。这种现象称为虚假唤醒(spurious wakeup)。它和底层操作系统的线程调度机制密切相关,在很多现代 OS 的实现中,这种现象是可以容忍的,并建议将之交由上层进行处理。
Rust 在实际使用中,等待线程需要使用循环(比如例子的 while 以及 wait_while)来检查条件是否满足,而不是 if。这正是为了防范虚假唤醒现象发生:通过永远在循环中检查条件谓词(Predicate),而不是依赖单次 wait 返回就认为条件已满足。
std::marker::Send 官方文档std::marker::Sync 官方文档Send 和 Sync 的解释std::sync 官方文档即 C++ 中的 Resource Acquisition Is Initialization,资源获取即初始化。表示将资源的生命周期绑定到对象的生命周期上,初始化对象时自动获取资源,对象离开作用域时自动释放资源。在这里是说该锁唯一的读写权被绑定到了该线程的守卫的生命周期上。 ↩