仍然是面试速记。

死锁的 408 八股

考研 408 必背八股,谁都会问,但笔者全都以举例的方式逃过去了,不过还是记一下。

死锁的四大条件与预防死锁的四大方法(一一对应)

  • 资源互斥,最不可能取消的条件,不互斥的资源自然也不会有同步需求,各管各的就好
  • 持有并等待,反义词是检测到死锁就回退并重新获取锁,抛开极端情况不谈(时机恰恰好到无论怎么回退,两个资源争夺方都会以相同的顺序抢夺资源然后失败),这种方式比较好实现
  • 不可剥夺,反义词是按某种规则检测到死锁后就老杀新或者新杀老,这样能保证同一时间一定有一个任务能被推进下去,被杀死的任务自动回退即可
  • 环路等待,反义词是保持资源申请顺序一致,比如数据库给数据加行锁,无论是给哪条数据加,都要从第一条数据开始加,然后给每条数据依次加,不如直接加个大表锁,性能肯定更好,因此实践中其实比较难保证资源申请顺序一致。当然也有 B-tree 的 crabbing 协议这样的精巧的数据结构上锁方式。这个条件是基于持有并等待不可剥夺才能成立的条件,一般来说无论是不满足持有并等待还是不满足不可剥夺环路等待都将不成立。

避免死锁

预防死锁是设计静态的系统规则预防死锁,而避免死锁则是使用安全序列申请资源以避免系统进入死锁状态(换言之使用不安全序列申请资源的话,系统还是会进入死锁状态)。

银行家算法。不介绍,因为这是建立在申请资源信息全知的状态下的算法,银行由于使用某些算法能算出来大家的信用额度和需求所以还能用一下,计算机系统是基本做不到的。这个算法就和 OPT 页面置换算法一样,无法使用在实际系统中。

检测和处理死锁

环路等待 条件的细化处理,反正死锁肯定是有环了,检测出来再破坏掉持有并等待或者不可剥夺条件就好了。

CAS(Compare And Set 或者 Compare And Swap)函数

CAS 锁威名远扬,但首先需要清楚 CAS 函数并非 CAS 锁,CAS 锁与 CAS 函数之间尚且存在一些实现思想需要记一下。

CAS 锁是依托于 Test And Set 等 CPU 指令实现的锁,其核心在于 CAS 函数,以下是 CAS 函数的伪代码:

1
2
3
4
5
6
int compare_and_swap (int* reg, int oldval, int newval)
{
int old_reg_val = *reg;
if (old_reg_val == oldval) *reg = newval;
return old_reg_val;
}

需要将这三行代码看作一个整体,举例有:

使用 CAS 函数实现 lock 与 unlock 函数(最普通的 CAS 锁)

  1. 约定 lock 操作是 while(compare_and_swap(lock, 0, 1)==1),unlock 操作是 lock = 0
  2. compare_and_swap(lock, 0, 1) 意味着 lock 为 0 时函数设置 lock 为 1 并返回 0,lock 为 1 时函数直接返回 1。
  3. 考虑两个线程 A 与 B 同时发出了这条指令:
    • A 线程首先执行,lock 初始化为 0,它将 lock 置为 1 后并返回 0,得以退出 while 循环执行后续代码;
    • 而执行到 B 这条指令时,lock 为 1,因此 B 会不断的执行 while(1=1) 这一逻辑,直到 A unlock。

换言之,只要双方实现约定好两个值,这里的 0 和 1 可以变成任意 x 和 y 变量以达成相同的效果。

从上述过程也能看出 CAS 锁为什么被称为轻量级锁的原因。相较于会导致线程切换(陷入内核,保存线程 context,加入等待序列巴拉巴拉)的锁,CAS 锁只需一条指令检测一次,lock 为 0 了立马就能获得锁,这可太轻量级了。

那么 CAS 锁又在什么时候更高效呢?线程切换需要先将 B 切走然后再切回来,这之间起码几十条指令执行的时间。也就是说,当判断程序临界区只需最多十几条指令(反正小于两次线程切换所需的时间,JVM 的锁升级过程就是先来 10 条 CAS 指令等一等,等不到才换成线程切换的锁)就能执行完成时,CAS 锁理论上一定是会比线程切换更加高效的。

使用 CAS 函数实现的无锁链表

那么难道就只有 compare_and_swap(lock, 0, 1) 这样的固定写死 0 和 1 这两个值的使用方式吗?并非如此,因为 CAS 还能将 0 和 1 换成变量以实现无锁链表。

比如说希望只靠 compare_and_swap 函数实现线程安全的链表插入操作:

1
2
3
4
5
6
7
8
9
10
11
bool insert(Node* prev, Node* newNode) {
Node* next = prev->next;
newNode->next = next;
while(next = compare_and_swap(&prev->next, next, newNode)){
if(newNode->next == next){
break;
}
newNode->next = next;
}
return true;
}

上述操作的逻辑为:

  1. 首先将 newNode->next 设置为 prev->next,该操作是线程安全的,毕竟不会有多个线程同时插入同一个 newNode
  2. 然后试图将 prev->next 设置为 newNode,这里需要考虑多线程同时设置不同的 newNode 会导致某些 newNode 更改丢失的竞态问题,也就是多条 prev->next 赋值语句先后执行会导致以前的值被覆盖
  3. compare_and_swap 怎么解决这个问题的?compare_and_swap 保证了对于同一个 next(即 prev->next 的旧值),有不同 newNode 的多条指令只会成功设置一条。这条指令成功后,其他指令就会失败并返回 prev->next 的新值。
  4. 那么如何分辨谁成功谁失败了?不妨假设 newNodeA 插入成功、newNodeB\C\D 插入失败。这时可以检测 next == newNode->next
    • 相等则意味着 newNode 已经插入成功了,可以跳出 while 循环了;
    • 不等则意味着 newNode 插入失败,这时的 next 实际上已经是更新过的 prev->next 了(即成功插入的 newNodeA)所以需要重新将 newNodeB/C/D->next 设置为新的 next 再次执行 cas 操作。可以预见的是接下来假如 newNodeB 插入成功,又需要将 newNodeC/D->next 指向 B 然后继续 cas,以此类推。

注意到上述代码并没有使用 CAS 实现的 lock 操作(即 CAS 锁),而是直接使用了 CAS 函数,此谓之无锁链表(允许吐槽)。

也可以思考这个无锁链表相较于有锁链表优化了什么地方,比如经典的有锁链表实现:

1
2
3
4
5
6
7
8
bool insert(Node* prev, Node* newNode) {
mutex.lock();
Node* next = prev->next;
newNode->next = next;
prev->next = newNode;
mutex.unlock();
return true;
}

对比可以发现就是使得 newNode->next = next; 这一步可以无锁运行了,但是这也带来了额外的判断 next == newNode->next 的开销以及失败重试导致的多次 newNode->next = next; 赋值开销,因此无锁链表这种微妙的结构在实际应用场景中并不一定就比单纯的 CAS 锁性能高到哪去,相当难以用好。

要不是…腾讯起手无锁编程:(,笔者也想…

Java volatile、原子变量与内存序

多线程代码示例

3 个线程交替打印 1~100

使用条件变量(Condition Variable)来协调线程的执行顺序。

C++

C++ 代码示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>

std::mutex mtx;
std::condition_variable cv;
int current = 1; // 当前要打印的数字
const int maxNum = 100;

void printNumbers(int threadID, int mod) {
while (current <= maxNum) {
std::unique_lock<std::mutex> lock(mtx);
cv.wait(lock, [mod] { return (current - 1) % 3 == mod; }); // 确保当前线程是正确的轮次
if(current <= maxNum){
std::cout << "Thread " << threadID << ": " << current++ << std::endl;
}
lock.unlock();
cv.notify_all(); // 通知其他等待的线程
}
}

int main() {
std::thread threads[3];
for (int i = 0; i < 3; ++i) {
threads[i] = std::thread(printNumbers, i + 1, i);
}
for (auto &th : threads) th.join();
return 0;
}

Java

Java 代码示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class AlternatePrinting {

private static final Object lock = new Object();
private static int current = 1;
private static final int maxNum = 100;

public static void main(String[] args) {
Thread[] threads = new Thread[3];
for (int i = 0; i < 3; i++) {
final int index = i;
threads[i] = new Thread(() -> {
while (current <= maxNum) {
synchronized (lock) {
while ((current - 1) % 3 != index) {
try {
lock.wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
if(current <= maxNum){
System.out.println("Thread " + (index + 1) + ": " + current++);
}
lock.notifyAll();
}
}
});
threads[i].start();
}
for (Thread th : threads) {
try {
th.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}

读写锁

C++

C++ 读优先 - 读写锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>

class SimpleRWLock {
private:
std::mutex mutex_;
std::condition_variable cv_;
int readers_ = 0; // 当前活跃的读者数量
bool writer_active_ = false; // 是否有写者在工作

public:
void lock_read() {
std::unique_lock<std::mutex> lock(mutex_);
cv_.wait(lock, [this]() { return !writer_active_; });
++readers_;
}

void unlock_read() {
std::lock_guard<std::mutex> lock(mutex_);
--readers_;
if (readers_ == 0) {
cv_.notify_all();
}
}

void lock_write() {
std::unique_lock<std::mutex> lock(mutex_);
cv_.wait(lock, [this]() { return !writer_active_ && readers_ == 0; });
writer_active_ = true;
}

void unlock_write() {
std::lock_guard<std::mutex> lock(mutex_);
writer_active_ = false;
cv_.notify_all();
}
};

SimpleRWLock rwlock;

void reader(int id) {
rwlock.lock_read();
std::cout << "Reader " << id << " is reading.\n";
std::this_thread::sleep_for(std::chrono::milliseconds(100));
rwlock.unlock_read();
}

void writer(int id) {
rwlock.lock_write();
std::cout << "Writer " << id << " is writing.\n";
std::this_thread::sleep_for(std::chrono::milliseconds(300));
rwlock.unlock_write();
}

int main() {
std::vector<std::thread> threads;
for (int i = 1; i <= 5; ++i) {
threads.emplace_back(reader, i);
threads.emplace_back(writer, i);
}
for (auto &th : threads) th.join();
return 0;
}
C++ 写优先 - 读写锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>

class WriterPreferenceRWLock {
private:
std::mutex mutex_;
std::condition_variable cv_;
int readers_ = 0; // 当前活跃的读者数量
bool writer_active_ = false; // 是否有写者在工作
bool writers_waiting_ = false; // 是否有写者在等待

public:
void lock_read() {
std::unique_lock<std::mutex> lock(mutex_);
cv_.wait(lock, [this]() { return !writer_active_ && !writers_waiting_; });
++readers_;
}

void unlock_read() {
std::lock_guard<std::mutex> lock(mutex_);
--readers_;
if (readers_ == 0) {
cv_.notify_all();
}
}

void lock_write() {
std::unique_lock<std::mutex> lock(mutex_);
writers_waiting_ = true;
cv_.wait(lock, [this]() { return !writer_active_ && readers_ == 0; });
writer_active_ = true;
writers_waiting_ = false;
}

void unlock_write() {
std::lock_guard<std::mutex> lock(mutex_);
writer_active_ = false;
cv_.notify_all();
}
};

// 示例函数略...
C++ 公平锁 - 读写锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <chrono>

class FairRWLock {
private:
std::mutex mutex_;
std::condition_variable cv_;
bool writer_active_ = false; // 是否有写者在工作
int readers_ = 0; // 当前活跃的读者数量
std::queue<std::pair<bool, std::promise<void>>> waiters_; // 请求队列,bool表示是读者(false)还是写者(true)

public:
void lock_read() {
std::unique_lock<std::mutex> lock(mutex_);
auto pr = std::promise<void>();
auto f = pr.get_future();
waiters_.emplace(false, std::move(pr));
cv_.notify_one(); // 唤醒可能的等待线程
f.wait(); // 等待信号

if (waiters_.front().first == false) { // 如果头部是我们自己的请求
++readers_;
waiters_.pop();
}
}

void unlock_read() {
std::lock_guard<std::mutex> lock(mutex_);
--readers_;
if (readers_ == 0) {
cv_.notify_all(); // 可能有一个写者在等待
}
}

void lock_write() {
std::unique_lock<std::mutex> lock(mutex_);
auto pr = std::promise<void>();
auto f = pr.get_future();
waiters_.emplace(true, std::move(pr));
cv_.notify_one(); // 唤醒可能的等待线程
f.wait(); // 等待信号

if (waiters_.front().first == true) { // 如果头部是我们自己的请求
writer_active_ = true;
waiters_.pop();
}
}

void unlock_write() {
std::lock_guard<std::mutex> lock(mutex_);
writer_active_ = false;
cv_.notify_all(); // 唤醒下一个等待者(可能是读者或写者)
}

void check_and_signal() {
std::unique_lock<std::mutex> lock(mutex_);
while (!waiters_.empty()) {
auto& front = waiters_.front();
if (front.first == false && !writer_active_) { // 如果是读者且无写者活动
front.second.set_value(); // 允许读者继续
} else if (front.first == true && readers_ == 0 && !writer_active_) { // 如果是写者且无读者和写者活动
front.second.set_value(); // 允许写者继续
} else {
break; // 没有符合条件的等待者,退出循环
}
}
}

void signal_waiters() {
std::unique_lock<std::mutex> lock(mutex_);
cv_.wait(lock, [this]() { return !waiters_.empty(); });
check_and_signal();
}
};

// 示例函数略...