之前在503接触过些多线程编程。现在将其总结下!

每个人在时代中都是一粒沙子~

多线程注意事项

避免使用全局变量

原因:全局变量在多线程环境中是共享的,多个线程可能同时访问或修改它,导致数据竞争。

解决方案

  • 使用 std::mutex 来保护对全局变量的访问。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
#include <iostream>
#include <thread>
#include <mutex>

std::mutex mtx;
int global_counter = 0;

void increment() {
std::lock_guard<std::mutex> lock(mtx);
++global_counter;
}

int main() {
std::thread t1(increment);
std::thread t2(increment);

t1.join();
t2.join();

std::cout << "Global counter: " << global_counter << std::endl;
return 0;
}

进一步思考:全局变量在一个进程中修改,会影响到另一个进程吗?

答案:不会

在操作系统中,每个进程都有自己独立的内存空间。全局变量是定义在函数之外的变量,它们的作用域是整个文件,但在多进程环境中,每个进程都有自己的全局变量副本。因此,一个进程中修改全局变量不会影响到另一个进程。

验证:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#include <stdio.h>
#include <unistd.h>

int global_var = 0;

int main() {
pid_t pid = fork();

if (pid == 0) {
// 子进程
global_var = 1;
printf("Child process: global_var = %d\n", global_var);
} else if (pid > 0) {
// 父进程
sleep(1); // 等待子进程修改全局变量
printf("Parent process: global_var = %d\n", global_var);
} else {
// fork 失败
perror("fork");
return 1;
}

return 0;
}

原子操作

使用:使用 std::atomic 进行原子操作,避免使用锁。

1.自己不会被优化,相当于voliate(经验之谈:当两个线程都访问同一个变量,请使用atomic,防止优化产生逾期错误,因为atomic修饰的变量不会被优化)

2.使用内存序

避免:直接操作共享变量没有加锁或使用不安全的操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#include <iostream>
#include <thread>
#include <atomic>

std::atomic<int> atomic_counter(0);

void increment() {
for (int i = 0; i < 1000; ++i) {
++atomic_counter;
}
}

int main() {
std::thread t1(increment);
std::thread t2(increment);

t1.join();
t2.join();

std::cout << "Atomic counter: " << atomic_counter << std::endl;
return 0;
}

std::atomic 的底层原理

  1. 原子指令
    • CPU 提供了专门的原子指令来保证操作的原子性。例如,Intel 的 x86 架构提供了 LOCK 前缀和 CMPXCHG(比较并交换)指令,ARM 架构提供了 LDREXSTREX 指令。
    • 这些原子指令确保在执行操作时,其他线程或进程不能干扰当前操作,从而避免数据竞争和一致性问题。
  2. 内存屏障
    • 内存屏障(Memory Barriers)或内存顺序(Memory Order)用于确保指令执行的顺序和可见性。std::atomic 提供了不同的内存顺序选项(如 memory_order_relaxedmemory_order_acquirememory_order_releasememory_order_acq_relmemory_order_seq_cst)来控制这些屏障。
    • 内存屏障可以确保在多线程环境下对原子变量的操作按预期顺序执行。例如,memory_order_acquire 确保在读取原子变量后,所有后续操作的读写在该线程中都不会被重排到读取之前。
  3. CAS(比较并交换)
    • std::atomic 中许多原子操作(如 compare_exchange_weakcompare_exchange_strong)基于 CAS 操作。CAS 操作的基本原理是:比较当前值和期望值,如果它们相等,则将变量更新为新值。
    • 这个操作是原子的,确保在更新过程中不会被其他线程的操作干扰。

std::atomic 的常见操作

  1. 原子加载和存储
    • std::atomic 提供了 load()store() 方法【load()和store()是重载函数】来进行原子读取和写入。底层这些操作会利用 CPU 提供的原子指令来保证操作的原子性。
  2. 原子交换
    • std::atomic 提供了 exchange() 方法来进行原子交换操作。它会将原子变量的当前值替换为新值,并返回旧值。
  3. 原子加法和减法
    • std::atomic 提供了 fetch_add()fetch_sub() 方法来进行原子加法和减法操作。这些操作保证了对变量的增加或减少是原子的。
  4. 原子比较和交换
    • std::atomic 提供了 compare_exchange_weak()compare_exchange_strong() 方法来进行原子比较和交换操作。它们允许在某个值等于期望值时进行原子更新。

简单问题:++i和i++是否是原子操作?

答案:不是

无论是 ++i 还是 i++,它们都涉及以下步骤:

  1. 读取变量值:从内存中读取变量的当前值。
  2. 修改值:将读取的值加 1。
  3. 写回变量:将修改后的值写回内存中的变量。

【可以防止被编译器优化】

线程本地存储(TLS)

使用:如果每个线程需要独立的数据副本,可以使用 thread_local 关键字。

避免:共享变量不适用于每个线程需要独立的数据场景。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#include <iostream>
#include <thread>

thread_local int thread_local_variable = 0;

void thread_func()
{
thread_local_variable++;
std::cout << "Thread-local variable: " << thread_local_variable << std::endl;
}

int main()
{
std::thread t1(thread_func);
std::thread t2(thread_func);

t1.join();
t2.join();

return 0;
}

底层原理

  1. 线程本地存储(TLS)
    • thread_local 变量的底层实现通常利用操作系统提供的 TLS 机制。线程本地存储允许线程在其自身的地址空间中拥有私有的数据区域,这些数据区域对其他线程是不可见的。
    • 在 Windows 上,TLS 由线程局部存储的 API(如 TlsAllocTlsSetValueTlsGetValue)提供支持。在 POSIX 系统(如 Linux)上,通常通过线程控制块(Thread Control Block,TCB)或线程局部存储段(Thread-Local Storage Segment)实现。
  2. 编译器支持
    • 编译器为 thread_local 变量生成特定的代码,以确保每个线程都有独立的数据副本。编译器会将 thread_local 变量的数据存储在专门的线程本地存储区域,并使用操作系统提供的机制来分配和管理这些区域。【thread_local 变量通常被存储在线程本地存储区域(Thread-Local Storage, TLS)
    • 在编译时,编译器通常会为每个 thread_local 变量生成额外的代码,以处理变量的初始化、销毁和访问。
  3. 线程局部存储的访问
    • 访问 thread_local 变量时,编译器生成的代码会根据当前线程的标识符(Thread ID)从线程本地存储区域中检索变量的值。这通常涉及到查询线程控制块或其他线程本地数据结构。
    • 线程本地存储机制可能会在变量首次被访问时初始化变量,并在线程结束时销毁变量。

锁的使用

锁粒度优化

使用:在访问共享资源时使用 std::mutexstd::shared_mutex。使用 RAII 的锁类(如 std::lock_guard)以确保锁的正确释放。

避免:使用过多或不恰当的锁导致性能低下或死锁。

读写锁

读写锁

读写锁允许多个线程同时读取数据,但在写数据时会对所有读线程和其他写线程加锁。这种锁适用于读操作远多于写操作的场景,可以显著减少锁竞争。

示例(使用 std::shared_mutex):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
#include <iostream>
#include <thread>
#include <shared_mutex>
#include <vector>

std::shared_mutex rw_lock;
std::vector<int> data;

void read_data() {
std::shared_lock<std::shared_mutex> lock(rw_lock);
std::cout << "Read data: ";
for (const auto& item : data) {
std::cout << item << " ";
}
std::cout << std::endl;
}

void write_data(int value) {
std::unique_lock<std::shared_mutex> lock(rw_lock);
data.push_back(value);
std::cout << "Added value: " << value << std::endl;
}

int main() {
std::thread writer1(write_data, 1);
std::thread writer2(write_data, 2);
std::thread reader1(read_data);
std::thread reader2(read_data);

writer1.join();
writer2.join();
reader1.join();
reader2.join();

return 0;
}

上述代码是如何保证线程安全的呢?

write_data 函数中使用 std::unique_lock 来锁定 std::shared_mutex,确保在写入数据时,其他线程不能持有共享锁(读操作),从而保证写操作的线程安全。具体机制如下:

  1. 写操作的独占锁
    • 当一个线程调用 write_data 函数并尝试获取 std::unique_lock 时,它会尝试获取 std::shared_mutex 的独占锁。
    • 如果存在其他线程持有共享锁或独占锁,当前线程会被阻塞,直到这些锁被释放。
    • 一旦独占锁成功获取,所有其他线程(无论是持有共享锁还是独占锁)都会被阻塞。
  2. 读操作的共享锁
    • 当线程调用 read_data 函数并尝试获取 std::shared_lock 时,它会尝试获取 std::shared_mutex 的共享锁。
    • 如果没有线程持有独占锁,线程可以成功获取共享锁。
    • 如果有线程持有独占锁,获取共享锁的线程会被阻塞,直到独占锁被释放。

我们也可以自己实现下share_mutex【本质:条件变量,锁机制和引用计数来实现!】

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
class SharedMutex {
public:
SharedMutex() : readers(0), writer_waiting(false), writer_active(false) {}

void lock_shared() {
std::unique_lock<std::mutex> lk(mutex);
reader_cv.wait(lk, [this]() {
return !writer_active && !writer_waiting;
});
++readers;
}

void unlock_shared() {
std::unique_lock<std::mutex> lk(mutex);
if (--readers == 0 && writer_waiting) {
writer_cv.notify_one();
}
}

void lock() {
std::unique_lock<std::mutex> lk(mutex);
writer_waiting = true;
writer_cv.wait(lk, [this]() {
return readers == 0 && !writer_active;
});
writer_active = true;
}

void unlock() {
std::unique_lock<std::mutex> lk(mutex);
writer_active = false;
writer_waiting = false;
reader_cv.notify_all();
}

private:
std::mutex mutex;
std::condition_variable reader_cv;
std::condition_variable writer_cv;
int readers;
bool writer_waiting;
bool writer_active;
};

死锁

死锁:通过锁排序和分层锁来避免。

分层锁

分层锁的原理

  • 层次结构:将锁分为多个层次,每个层次的锁都有一个固定的优先级。例如,层次 1 是最基础的锁,层次 2 是中级锁,层次 3 是高级锁。
  • 固定顺序:线程在获取多个锁时,必须按照从低层到高层的顺序获取锁。这可以确保在任意情况下,所有线程都遵循相同的顺序来获取锁,从而避免了循环等待条件。

为什么分层锁可以防止死锁?

死锁发生的四个必要条件是:

  1. 互斥条件:至少有一个资源被一个线程持有,并且其他线程不能获取。
  2. 请求和保持条件:持有资源的线程正在等待其他线程持有的资源。
  3. 不可剥夺条件:已获得的资源不能被强制剥夺。
  4. 循环等待条件:存在一种资源的循环等待关系。

分层锁通过确保线程在获取多个锁时始终按照相同的顺序进行,可以避免循环等待条件,从而防止死锁。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
#include <iostream>
#include <thread>
#include <mutex>

// 定义锁的层次
enum LockLevel {
LEVEL_1,
LEVEL_2,
LEVEL_3
};

// 自定义锁类,包含层次信息
class HierarchicalMutex {
public:
explicit HierarchicalMutex(LockLevel level) : level_(level) {}

void lock() {
// 获取锁时,确保按层次顺序
check_lock_order();
mutex_.lock();
}

void unlock() {
mutex_.unlock();
}

private:
void check_lock_order() const {
// 检查当前锁的层次是否低于之前获取的锁的层次
// 这部分可以根据实际需求实现
}

std::mutex mutex_;
LockLevel level_;
};

// 锁的层次
HierarchicalMutex level1(LEVEL_1);
HierarchicalMutex level2(LEVEL_2);
HierarchicalMutex level3(LEVEL_3);

void thread1() {
level1.lock();
std::this_thread::sleep_for(std::chrono::milliseconds(100));
level2.lock();
std::this_thread::sleep_for(std::chrono::milliseconds(100));
level3.lock();

std::cout << "Thread1 acquired all locks" << std::endl;

level3.unlock();
level2.unlock();
level1.unlock();
}

void thread2() {
level1.lock();
std::this_thread::sleep_for(std::chrono::milliseconds(100));
level3.lock();
std::this_thread::sleep_for(std::chrono::milliseconds(100));
level2.lock();

std::cout << "Thread2 acquired all locks" << std::endl;

level2.unlock();
level3.unlock();
level1.unlock();
}

int main() {
std::thread t1(thread1);
std::thread t2(thread2);

t1.join();
t2.join();

return 0;
}

非常好的学习例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
#include <iostream>
#include <thread>
#include <mutex>
#include <atomic>

// 定义锁的层次
enum LockLevel {
LEVEL_1,
LEVEL_2,
LEVEL_3
};

// 自定义锁类,包含层次信息
class HierarchicalMutex {
public:
explicit HierarchicalMutex(LockLevel level) : level_(level) {}

void lock() {
// 获取锁时,确保按层次顺序
check_lock_order();
mutex_.lock();
// 更新当前线程持有的最大锁层次
max_lock_level = std::max(max_lock_level.load(), level_);
}

void unlock() {
// 释放锁时,更新当前线程持有的最大锁层次
max_lock_level = std::max_element(lock_levels.begin(), lock_levels.end(), [](const auto& a, const auto& b) {
return a < b;
});
mutex_.unlock();
}

private:
void check_lock_order() const {
// 检查当前锁的层次是否低于之前获取的锁的层次
LockLevel held_lock_level = max_lock_level.load();
if (level_ < held_lock_level) {
throw std::runtime_error("Lock order violation detected");
}
}

std::mutex mutex_;
LockLevel level_;
static thread_local std::atomic<LockLevel> max_lock_level;
};

// 初始化线程本地变量
thread_local std::atomic<LockLevel> HierarchicalMutex::max_lock_level(LEVEL_1 - 1);

// 锁的层次
HierarchicalMutex level1(LEVEL_1);
HierarchicalMutex level2(LEVEL_2);
HierarchicalMutex level3(LEVEL_3);

void thread1() {
try {
level1.lock();
std::this_thread::sleep_for(std::chrono::milliseconds(100));
level2.lock();
std::this_thread::sleep_for(std::chrono::milliseconds(100));
level3.lock();

std::cout << "Thread1 acquired all locks" << std::endl;

level3.unlock();
level2.unlock();
level1.unlock();
} catch (const std::runtime_error& e) {
std::cerr << "Thread1: " << e.what() << std::endl;
}
}

void thread2() {
try {
level1.lock();
std::this_thread::sleep_for(std::chrono::milliseconds(100));
level3.lock(); // This will cause a lock order violation
std::this_thread::sleep_for(std::chrono::milliseconds(100));
level2.lock(); // This will also cause a lock order violation

std::cout << "Thread2 acquired all locks" << std::endl;

level2.unlock();
level3.unlock();
level1.unlock();
} catch (const std::runtime_error& e) {
std::cerr << "Thread2: " << e.what() << std::endl;
}
}

int main() {
std::thread t1(thread1);
std::thread t2(thread2);

t1.join();
t2.join();

return 0;
}
scoped_lock

std::scoped_lock 是 C++17 引入的一个锁管理类,用于简化多互斥量的加锁过程并防止死锁。它在构造时会尝试锁定所有提供的互斥量,并在析构时自动解锁,确保了加锁和解锁的正确性和异常安全性。

主要特点

  • 自动加锁和解锁std::scoped_lock 在构造时加锁,在析构时解锁,确保锁的正确管理。
  • 支持多个互斥量:可以同时锁定多个 std::mutex 或其他同步原语,防止死锁。
  • 异常安全:即使在加锁后抛出异常,析构函数也会保证锁被正确释放。

示例代码

下面是一个使用 std::scoped_lock 的示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
#include <iostream>
#include <thread>
#include <mutex>
#include <vector>

// 定义两个互斥量
std::mutex mutex1;
std::mutex mutex2;

// 函数,演示如何使用 std::scoped_lock
void thread_function(int id) {
std::cout << "Thread " << id << " attempting to acquire locks...\n";

// 使用 std::scoped_lock 同时锁定两个互斥量
{
std::scoped_lock lock(mutex1, mutex2); // 自动加锁
std::cout << "Thread " << id << " acquired both locks!\n";

// 模拟工作
std::this_thread::sleep_for(std::chrono::seconds(2));

std::cout << "Thread " << id << " releasing locks...\n";
} // 自动解锁

std::cout << "Thread " << id << " finished execution.\n";
}

int main() {
std::vector<std::thread> threads;

// 启动多个线程
for (int i = 1; i <= 3; ++i) {
threads.emplace_back(thread_function, i);
}

// 等待所有线程完成
for (auto& t : threads) {
t.join();
}

return 0;
}

解释

  1. std::scoped_lock
    • 在构造时,std::scoped_lock 会尝试锁定 mutex1mutex2。它会按照一定的顺序加锁这些互斥量,避免死锁问题。
    • std::scoped_lock 对象离开作用域时,它的析构函数会自动解锁这些互斥量。
  2. 应用场景
    • 多个互斥量的加锁:当一个操作需要同时锁定多个互斥量时,使用 std::scoped_lock 可以简化代码,避免手动管理锁的复杂性。
    • 死锁预防std::scoped_lock 通过确保按照固定的顺序加锁多个互斥量,可以减少死锁的风险。
    • 异常安全std::scoped_lock 确保即使在持锁期间发生异常,锁也能被正确释放。

注意事项

  • 锁顺序std::scoped_lock 会自动处理锁的顺序,但在某些复杂的应用场景下,仍然需要确保设计上的锁顺序一致性。
  • 性能:虽然 std::scoped_lock 简化了锁管理,但在性能要求极高的场景下,仍然需要考虑锁的开销和锁的竞争。

通过使用 std::scoped_lock,可以提高代码的可读性和可靠性,简化多互斥量加锁的管理。

活锁

举个有意思的例子吧

例子:两个自动门

想象一下,两个自动门(A和B)在同一条走廊的两侧,门上装有传感器。如果有人靠近一扇门,门会自动打开。门的设计是这样的:当两扇门同时检测到有人接近时,它们都会尝试打开。假设门的逻辑是这样的:

  • 如果门A检测到门B正在打开,它会等待并重新检测,如果门B打开得足够久,门A就会尝试打开。
  • 如果门B检测到门A正在打开,它会等待,并重新检测,如果门A打开得足够久,门B就会尝试打开。

活锁的发生

  1. 状态检测:门A和门B都检测到对方正在尝试打开。
  2. 状态改变:门A和门B都决定等待并重新检测。
  3. 不断改变:由于它们在不断检测对方的状态并改变自己的状态,它们永远不会真正打开。

实际结果:两个门不断检测对方,尝试解决冲突,但没有任何一个门能够完全打开,因为它们始终在等待对方完成操作,造成了一个典型的活锁情形。

概念:

活锁发生在系统中的线程一直在改变状态,但是系统并没有向前推进,所有线程都在忙碌地试图解决冲突。活锁通常发生在设计上不是很合理的重试逻辑中。

活锁:通过合理的重试机制和退出条件来避免。

示例:活锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
#include <iostream>
#include <thread>
#include <atomic>

std::atomic<bool> flag1(false);
std::atomic<bool> flag2(false);

void thread1() {
while (true) {
if (!flag1 && !flag2) {
flag1 = true;
std::this_thread::sleep_for(std::chrono::milliseconds(50)); // 模拟工作
flag2 = false;
break;
}
flag1 = false;
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
std::cout << "Thread1 finished." << std::endl;
}

void thread2() {
while (true) {
if (!flag1 && !flag2) {
flag2 = true;
std::this_thread::sleep_for(std::chrono::milliseconds(50)); // 模拟工作
flag1 = false;
break;
}
flag2 = false;
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
std::cout << "Thread2 finished." << std::endl;
}

int main() {
std::thread t1(thread1);
std::thread t2(thread2);

t1.join();
t2.join();

return 0;
}

问题:两个线程不断地尝试切换 flag1flag2,但由于没有充分的条件,始终在重试而没有进展。

避免活锁

要避免活锁,需要设计合理的重试机制和退出条件,确保线程能够在某些情况下放弃重试,从而避免无限循环。

递归锁

适用场景

  • 递归调用: std::recursive_mutex 允许在同一线程中多次调用 lock() 而不会导致死锁,因此在递归函数中可以安全地锁定和解锁。
  • 适用场景: 当需要在同一线程中递归地访问共享资源时,使用 std::recursive_mutex 可以避免常规互斥量带来的死锁问题。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#include <iostream>
#include <thread>
#include <mutex>
#include <vector>

std::recursive_mutex rMutex; // 声明一个递归互斥量

// 递归函数
void recursiveFunction(int count) {
if (count <= 0) {
return;
}

rMutex.lock(); // 锁定互斥量
std::cout << "Count: " << count << std::endl;

// 递归调用
recursiveFunction(count - 1);

rMutex.unlock(); // 解锁互斥量
}

int main() {
std::thread t1(recursiveFunction, 5);
std::thread t2(recursiveFunction, 5);

t1.join();
t2.join();

return 0;
}

底层原理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
#include <atomic>
#include <thread>
#include <iostream>

class MyRecursiveMutex {
private:
std::atomic<int> count; // 计数器
std::thread::id owner; // 当前持有锁的线程 ID

public:
MyRecursiveMutex() : count(0) {}

void lock() {
std::thread::id this_id = std::this_thread::get_id();

if (owner == this_id) {
// 如果当前线程已经持有锁,增加计数器
count++;
return;
}

// 否则,尝试获取锁
// 这里使用一个简单的自旋锁示例
while (count.load() != 0) {
// 自旋等待
}

// 获取锁
owner = this_id; // 设置当前线程为持有者
count = 1; // 初始化计数器
}

void unlock() {
if (count.load() > 0) {
count--; // 减少计数器
if (count.load() == 0) {
owner = std::thread::id(); // 重置持有者
}
}
}
};

strand库

在Boost.Asio中,strand(即boost::asio::strand)是一种用于同步处理的机制,确保在多线程环境中,提交到同一strand的事件或回调函数不会同时执行。strand的主要作用是避免数据竞争,简化对共享资源的并发访问控制。

strand的作用

  • 串行化事件处理strand 保证了通过它提交的所有事件或回调函数将按提交的顺序被串行执行,即使它们是在不同的线程中执行的。
  • 避免数据竞争strand 允许开发者在不显式使用锁的情况下,安全地访问共享资源。它确保在一个strand上的操作不会与其他并发操作相冲突。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#include <boost/asio.hpp>
#include <iostream>
#include <thread>

void print_message(const std::string& message, int id) {
std::cout << "Thread " << id << ": " << message << std::endl;
}

int main() {
boost::asio::io_context io_context;

// 创建strand对象,关联到io_context
boost::asio::strand<boost::asio::io_context::executor_type> strand(io_context.get_executor());

// 向strand提交两个任务,它们将按顺序执行
strand.post([&](){ print_message("Hello from strand", 1); });
strand.post([&](){ print_message("Strand ensures order", 2); });

// 启动两个线程来处理io_context中的任务
std::thread t1([&](){ io_context.run(); });
std::thread t2([&](){ io_context.run(); });

// 等待线程结束
t1.join();
t2.join();

return 0;
}

解释:

  • io_contextboost::asio::io_context是Asio库中的I/O服务提供者,用于分发事件和执行异步操作。
  • strandboost::asio::strandio_context关联,用于确保提交给它的任务在同一时间只能有一个线程执行。
  • post:使用strand.post()将任务提交到strand中,确保这些任务按照提交顺序执行,即使是在多线程环境中。

再来看一个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
#include <boost/asio.hpp>
#include <iostream>
#include <thread>

void print_message(const std::string &message, int id)
{
std::cout << "Thread " << id << ": " << message << std::endl;
}

int main()
{
boost::asio::io_context io_context;

// 在独立的 strand 中并行执行任务
boost::asio::io_context::strand strand1(io_context);
boost::asio::io_context::strand strand2(io_context);

strand1.post([&]()
{ print_message("Task 1 in strand 1", 1); });
strand1.post([&]()
{ print_message("Task 2 in strand 2", 2); });
strand2.post([&]()
{ print_message("Task 3 in strand 3", 3); });
// 启动两个线程处理 io_context 中的任务
std::thread t1([&]()
{ io_context.run(); });
std::thread t2([&]()
{ io_context.run(); });

// 等待线程结束
t1.join();
t2.join();

return 0;
}

上述代码我的理解是可以将strand1提交的任务看成一个整体,这个整体里面执行的顺序和post顺序是一致的。

io_context可以分成不同的包,每个包类似strand1,strand2这样,就可以起多个线程去执行里面的内容!

补充:上述代码中,我们可以使用1个线程或者3个以及以上的线程同时去运行,都是可以实现正常执行的。

因此,在一个strand里面运行的任务是单线程执行的!

1
2
3
4
5
6
7
8
 penge@penge-virtual-machine  ~/Desktop/test  ./main
Thread 3: Task 3 in strand 3
Thread 1: Task 1 in strand 1
Thread 2: Task 2 in strand 2
penge@penge-virtual-machine  ~/Desktop/test  ./main
Thread 3: Task 3 in strand 3
Thread 1: Task 1 in strand 1
Thread 2: Task 2 in strand 2

asio中strand的post和dispatch的区别

  • post: 将任务添加到 strand 的队列中,异步执行。

  • dispatch: 立即在当前 strand 中执行任务,阻塞当前线程。

  • wrap() 函数用于将一个执行器绑定到一个处理程序上。这样可以确保该处理程序在指定的执行器上执行,而不是在默认的执行器上执行。

    • 在较新版本的 Boost.Asio 中,wrap 函数已经被移除,取而代之的是使用 bind_executor 函数来绑定回调函数到指定的 strand

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      #include <boost/asio.hpp>
      #include <iostream>

      using boost::asio::io_context;
      using boost::asio::strand;

      void print(const std::string &message)
      {
      std::cout << message << std::endl;
      }

      void async_operation(strand<io_context::executor_type> &my_strand, int value)
      {
      boost::asio::post(my_strand, [value]()
      { print("Processing value: " + std::to_string(value)); });
      }

      int main()
      {
      io_context io;
      strand<io_context::executor_type> my_strand(io.get_executor());

      // 使用 bind_executor 函数绑定回调到 strand
      auto bound_callback = boost::asio::bind_executor(my_strand, [&]()
      { print("Bound callback executed"); });

      // 提交绑定的回调到 io_context
      boost::asio::post(io, bound_callback);

      // 提交一些异步操作到 strand
      for (int i = 0; i < 5; ++i)
      {
      async_operation(my_strand, i);
      }

      io.run();

      return 0;
      }

boost asio 介绍 关于strand的介绍不错

线程池

使用:使用线程池来管理线程资源,避免频繁创建和销毁线程。可以使用 std::thread 结合 std::futurestd::promise 实现简单的线程池。

避免:直接使用裸线程增加管理难度和资源泄漏风险。

无锁编程

使用:在高性能场景下,可以考虑使用无锁数据结构(如无锁队列)。

避免:无锁编程复杂且容易出错,仅在必要时使用。

线程安全的数据结构

tbb

C++11的std::call_once

std::call_once 是 C++11 引入的一种机制,用于确保某段代码在多线程环境中只被执行一次。这对于初始化操作或需要保证单一执行的场景特别有用。例如,在单例模式实现中,我们可以使用 std::call_once 来保证初始化代码只执行一次。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#include <iostream>
#include <thread>
#include <mutex>

std::once_flag flag;

void initialize() {
std::cout << "Initialization code executed.\n";
}

void threadFunction() {
std::call_once(flag, initialize);
std::cout << "Thread finished executing.\n";
}

int main() {
std::thread t1(threadFunction);
std::thread t2(threadFunction);
std::thread t3(threadFunction);

t1.join();
t2.join();
t3.join();

return 0;
}

std::once_flag flag: 这是用于控制 std::call_once 的标志,确保初始化代码只会执行一次。

initialize 函数: 这是我们希望只执行一次的初始化代码。在多线程环境中,确保 initialize 只被执行一次是关键。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
#include <iostream>
#include <mutex>
#include <thread>

class Singleton {
public:
// 获取单例实例的静态方法
static Singleton& getInstance() {
std::call_once(initFlag, initSingleton);
return *instance;
}

// 删除拷贝构造函数和赋值操作符,防止拷贝
Singleton(const Singleton&) = delete;
Singleton& operator=(const Singleton&) = delete;

private:
// 私有构造函数,防止外部实例化
Singleton() {
std::cout << "Singleton instance created." << std::endl;
}

// 初始化单例实例的函数
static void initSingleton() {
instance = new Singleton();
}

static std::once_flag initFlag;
static Singleton* instance;
};

// 初始化静态成员变量
std::once_flag Singleton::initFlag;
Singleton* Singleton::instance = nullptr;

void threadFunc() {
Singleton& singleton = Singleton::getInstance();
std::cout << "Thread " << std::this_thread::get_id() << " got the singleton instance." << std::endl;
}

int main() {
std::thread t1(threadFunc);
std::thread t2(threadFunc);

t1.join();
t2.join();

return 0;
}

异常捕获

异常处理:合理处理线程中的异常,避免异常导致线程终止或整个应用崩溃。可以使用 try-catch 块捕获异常,并根据具体情况选择合适的处理方式,例如记录日志、重新抛出异常或进行适当的回滚操作。

异步回调加锁

异步回调(Asynchronous Callback)是指在异步操作完成后,系统或库调用的回调函数。由于异步回调通常在不同的线程或事件循环中执行,因此加锁是为了避免多个回调同时访问共享资源而引发竞态条件。

可以学习陈硕大佬的swap机制,加速以及防止死锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

// 共享资源
struct Stats {
int request_count = 0;
double total_response_time = 0.0;
};

// 共享资源的锁
std::mutex stats_mutex;

// 异步回调函数
void on_response(double response_time, Stats& stats) {
// 加锁,保护共享资源
std::lock_guard<std::mutex> lock(stats_mutex);

// 更新共享资源
stats.request_count++;
stats.total_response_time += response_time;

// 解锁后可以继续处理其他操作
std::cout << "Response handled. Count: " << stats.request_count
<< ", Total Time: " << stats.total_response_time << std::endl;
}

// 模拟异步网络请求
void simulate_request(Stats& stats) {
// 模拟一个网络延迟
std::this_thread::sleep_for(std::chrono::milliseconds(100));

// 假设收到响应,调用回调函数
double simulated_response_time = (rand() % 1000) / 100.0;
on_response(simulated_response_time, stats);
}

int main() {
Stats stats;
std::vector<std::thread> threads;

// 启动多个线程模拟并发请求
for (int i = 0; i < 10; ++i) {
threads.push_back(std::thread(simulate_request, std::ref(stats)));
}

// 等待所有线程完成
for (auto& thread : threads) {
thread.join();
}

// 最终结果
std::cout << "Final Count: " << stats.request_count
<< ", Final Total Time: " << stats.total_response_time << std::endl;

return 0;
}

volatile

在多线程编程中,volatile 关键字的主要用途是告诉编译器该变量可能会被异步修改,从而防止编译器优化该变量的访问。然而,在现代 C++ 编程中,更推荐使用 std::atomic 来处理多线程中的共享数据访问,因为 std::atomic 不仅提供了防止优化的功能,还确保了操作的原子性和线程安全性。

这个示例演示了如何使用 volatile 变量来标记一个线程的停止标志。请注意,这种做法在多线程编程中并不是最佳实践,仅用于演示 volatile 的使用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
#include <iostream>
#include <thread>
#include <chrono>

// 使用 volatile 关键字声明停止标志
volatile bool stop_flag = false;

void thread_function() {
while (!stop_flag) {
// 模拟做一些工作
std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::cout << "Thread working..." << std::endl;
}
std::cout << "Thread stopped." << std::endl;
}

int main() {
// 启动线程
std::thread t(thread_function);

// 主线程等待一段时间,然后设置停止标志
std::this_thread::sleep_for(std::chrono::seconds(2));
stop_flag = true; // 设置标志以停止线程

// 等待线程结束
t.join();

return 0;
}

sleep

Notesleep 函数会将调用它的线程挂起(暂停执行)指定的时间。在这段时间内,线程不会进行任何操作,系统会将 CPU 时间分配给其他线程或进程。

定时任务调度: 使用 sleep 实现周期性任务调度。

模拟长时间运行的任务: 在测试中模拟任务的延迟。

等待外部事件: 使用 sleep 轮询条件,避免忙等待。

限流和节流: 控制请求或任务的处理频率。

模拟延迟网络请求: 测试应用程序对网络延迟的响应能力。

yield

在多线程环境下,如果一个线程在等待某个条件时采取 busy waiting 的方式,这意味着线程会不断地占用 CPU 资源来检查条件是否满足。这种做法的缺点包括:

  • 浪费 CPU 资源:线程不断地循环检查条件,而不做有意义的工作,浪费了宝贵的 CPU 时间。
  • 阻碍其他线程:由于 CPU 被忙等待的线程占用,其他可能有更高优先级或更重要任务的线程无法及时执行。
1
2
3
4
5
6
7
8
9
10
11
12
13
void push(UTaskWrapper&& task) {
while (true) {
if (mutex_.try_lock()) {
// 可能出现⻓期⽆法抢到锁的情况
queue_.emplace_front(std::move(task));
mutex_.unlock();
break;
} else {
// 让出cpu执⾏权
std::this_thread::yield();
}
}
}
  • 场景:多个线程往 queue_ 中 push 任务时,可能遇到 mutex_ 被其他线程占用的情况。
  • 问题:如果线程在无法获取锁的情况下,持续尝试获取锁而不让出 CPU,就会导致 busy waiting。
  • 解决方法:使用 std::this_thread::yield() 函数来避免 busy waiting。

std::this_thread::yield() 的作用

std::this_thread::yield() 是 C++ 标准库中的一个函数,它的作用是在当前线程无法继续执行时,主动让出 CPU 执行权,让操作系统调度其他线程执行。当前线程会被重新放入就绪队列中,等待下一次被调度执行。

在上述代码中:

  • mutex_.try_lock() 失败时,表示锁被其他线程占用,当前线程无法继续执行任务。
  • 这时,调用 std::this_thread::yield(),让当前线程暂时停止执行,给其他线程机会运行。
  • 过一段时间后,当前线程会再次被调度,继续尝试获取锁。

好处

避免 busy waiting 的主要原因是为了提高系统的整体性能和资源利用率

  • 降低 CPU 占用率:让出 CPU 时间片给其他线程执行,可以更高效地利用 CPU 资源。
  • 提高系统响应性:通过合理的线程调度,系统能够更快响应其他重要任务。
  • 减少功耗:忙等待会导致 CPU 高负载运行,增加功耗和温度。

exit

exit() 在多线程环境中是不安全的

exit() 是一个用于终止进程的标准库函数。当你在多线程程序中调用 exit() 时,它会导致整个进程的所有线程都立即终止。这种行为在多线程环境下存在风险,因此被认为是“线程不安全的”。

exit()

  • 终止进程exit() 终止调用它的进程,并释放进程占用的所有资源。这包括关闭打开的文件、释放内存等。
  • 调用清理函数exit() 会调用通过 atexit() 注册的清理函数,例如数据刷新、文件关闭等操作。
  • 返回状态码exit() 接受一个整数参数作为状态码,用于向操作系统返回进程的退出状态。

为什么 exit() 在多线程环境中是不安全的?

理由1. 强制终止所有线程

当一个线程调用 exit() 时,整个进程中的所有线程都会立即停止执行。这意味着:

  • 未完成的操作被中断:如果其他线程正在执行文件写入、数据库操作、锁定资源等关键任务,这些操作将被强制中断,可能导致数据丢失或系统状态不一致。
  • 资源未释放:某些资源可能需要其他线程来释放,比如动态分配的内存、网络连接等。由于这些线程被突然终止,资源可能无法正常释放,导致资源泄露。

理由2. 导致竞态条件

多线程程序中的竞态条件是指多个线程并发执行,且执行的顺序或时机不确定,可能引发不正确的行为。exit() 的突然调用可能导致以下竞态条件:

  • 资源竞争:如果一个线程在使用某个资源(如文件或内存)时,另一个线程调用了 exit(),正在使用的资源可能不会被正确释放或保存。
  • 未完成的事务:如果一个线程在执行一个事务(如数据库操作)时被中断,事务可能无法正常完成,导致数据不一致。

理由3. 清理函数的执行顺序不确定

exit() 会调用通过 atexit() 注册的清理函数,但这些函数是在单线程环境下设计的。在多线程环境中,清理函数的执行顺序可能与预期不一致,特别是当不同线程依赖于这些清理函数时。

举例

假设你有一个多线程程序,其中一个线程负责读取文件,另一个线程负责写入文件。突然调用 exit() 可能导致以下问题:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>

FILE *file;

void* write_thread(void* arg) {
for (int i = 0; i < 5; ++i) {
fprintf(file, "Writing line %d\n", i);
sleep(1); // 模拟写入操作的延迟
}
return NULL;
}

void* read_thread(void* arg) {
sleep(2); // 等待写入线程完成几次操作
exit(0); // 直接调用 exit(),试图终止程序
return NULL;
}

int main() {
file = fopen("output.txt", "w");
if (file == NULL) {
perror("Failed to open file");
return 1;
}

pthread_t writer, reader;
pthread_create(&writer, NULL, write_thread, NULL);
pthread_create(&reader, NULL, read_thread, NULL);

pthread_join(writer, NULL);
pthread_join(reader, NULL);

fclose(file);
return 0;
}

可能结果:

  1. 文件未完全写入write_thread 线程正在写入文件,而 read_thread 线程在 2 秒后调用 exit()。这会导致 write_thread 突然停止,文件未完全写入,结果可能是一个损坏的文件。
  2. 资源未正确释放exit() 会导致进程立即终止,因此文件描述符 file 可能不会被正确关闭,这会导致数据未写入到磁盘,或者文件被锁定。
  3. 不可预测的行为:由于 exit() 强制终止进程,导致程序的终止行为不可预测,可能在不同的运行中,表现出不同的结果。

解决方法

1. 使用 pthread_exit()

如果你只想终止当前线程,可以使用 pthread_exit(),它只会终止调用它的线程,不会影响其他线程。

1
2
3
4
5
6
7
void* thread_func2(void* arg) {
pthread_mutex_lock(&lock);
global_state += 2;
pthread_mutex_unlock(&lock);
pthread_exit(NULL); // 仅终止当前线程
return NULL;
}

2. 使用标志位或条件变量

通过使用标志位或条件变量来控制线程的退出,而不是直接调用 exit()

1
2
3
4
5
6
7
8
9
10
11
#include <stdbool.h>

bool should_exit = false;

void* thread_func2(void* arg) {
pthread_mutex_lock(&lock);
global_state += 2;
should_exit = true; // 设置标志位,通知其他线程退出
pthread_mutex_unlock(&lock);
return NULL;
}

3. 使用信号处理机制

在需要终止进程的场景中,可以通过信号处理机制来安全地终止进程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#include <signal.h>

void signal_handler(int signum) {
// 执行清理操作
exit(signum);
}

int main() {
signal(SIGTERM, signal_handler);
// 程序运行,等待信号处理
while (1) {
// 程序的主要循环
}
return 0;
}

补充

补充:std::thread::hardware_concurrency

std::thread::hardware_concurrency() 是 C++11 标准库中 std::thread 类的一个静态成员函数。这个函数返回一个 unsigned int 类型的值,表示当前系统支持的并发线程数,即硬件线程数或处理器核心数。

这个值可以用来帮助程序员决定在多线程程序中创建多少个线程,以充分利用系统资源,避免创建过多的线程导致资源浪费和上下文切换开销。

补充:CPU核心绑定

在 Linux 上,可以使用 sched_setaffinity 函数来设置进程或线程的 CPU 亲和性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#include <pthread.h>
#include <sched.h>
#include <iostream>
#include <thread>

void threadFunction() {
// 获取当前线程的 ID
pthread_t thread = pthread_self();

// 创建 cpu_set_t 对象
cpu_set_t cpuset;
CPU_ZERO(&cpuset); // 清空 CPU 集合
CPU_SET(0, &cpuset); // 将第一个 CPU 核心添加到集合中

// 设置线程的 CPU 亲和性
pthread_setaffinity_np(thread, sizeof(cpu_set_t), &cpuset);

// 执行线程任务
std::cout << "Thread running on CPU core: " << sched_getcpu() << std::endl;
}

int main() {
std::thread t(threadFunction);
t.join();
return 0;
}

关键点

  1. CPU 亲和性: 通过将线程绑定到特定核心,可以减少上下文切换,提高缓存利用率,从而提高性能。
  2. 适用场景: 在高性能计算、实时系统和需要高一致性的应用中,CPU 绑定可以显著提高效率。
  3. 注意事项: 过度绑定可能会导致 CPU 资源利用不均衡,因此需要根据具体情况进行调整。

经典例题:

程序执行过程中,有60%的部分是串行处理的,其余为并行处理;如果我们将处理器的数量从2个提升到16个,理论上整个程序的执行时间能减少多少(为了简单,直接使用两者的speedUp差值)?

image-20240913151324072

线程安全的模板库

Boost

描述: Boost.Lockfree 提供了无锁的数据结构,如无锁队列、无锁栈等。它们利用原子操作实现了高效的并发访问。

使用场景: 适用于需要高并发且性能敏感的场景,如任务队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
#include <boost/lockfree/queue.hpp>

boost::lockfree::queue<int> queue(100);

void producer(int value) {
queue.push(value);
}

void consumer() {
int value;
if (queue.pop(value)) {
// Process value
}
}

TBB (Threading Building Blocks)

描述: TBB 是一个用于并行编程的模板库,提供了线程安全的容器(如 tbb::concurrent_queuetbb::concurrent_vector),以及并行算法和任务调度器。

使用场景: 适用于需要并行算法、线程安全容器的复杂多线程应用。

1
2
3
4
5
6
7
#include <tbb/concurrent_vector.h>

tbb::concurrent_vector<int> vec;

void addValue(int value) {
vec.push_back(value);
}

了解

tbb::concurrent_queue

  • 描述tbb::concurrent_queue 是一个线程安全的队列,支持多线程的入队和出队操作。
  • 底层机制
    • 锁-free 技术tbb::concurrent_queue 通常使用基于锁-free 的技术来实现线程安全,例如使用 CAS(Compare-And-Swap)操作来避免锁的使用。
    • 分段锁:为了减少锁的竞争,有时会使用分段锁(Segmented Locking),将队列分成多个段,每个段有自己的锁。

tbb::concurrent_vector

  • 描述tbb::concurrent_vector 是一个线程安全的动态数组,支持并发插入和访问。
  • 底层机制
    • 分段锁tbb::concurrent_vector 使用分段锁来保护其内部的数据结构。每个段有自己的锁,这样可以减少线程竞争,允许多个线程同时操作不同的段。
    • 无锁设计:在某些情况下,使用无锁算法(如乐观锁)来进一步减少锁的开销和线程竞争。

tbb::concurrent_hash_map

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
#include <iostream>
#include <vector>
#include <mutex>
#include <thread>

class Segment {
public:
Segment(size_t size) : data(size, 0) {}

// 设置指定位置的值
void set(size_t index, int value) {
std::lock_guard<std::mutex> lock(mutex_);
if (index < data.size()) {
data[index] = value;
}
}

// 获取指定位置的值
int get(size_t index) {
std::lock_guard<std::mutex> lock(mutex_);
if (index < data.size()) {
return data[index];
}
return -1; // 代表无效
}

private:
std::vector<int> data;
std::mutex mutex_;
};

class SegmentedArray {
public:
SegmentedArray(size_t segment_count, size_t segment_size)
: segments(segment_count, Segment(segment_size)) {}

// 设置指定段和位置的值
void set(size_t segment_index, size_t index, int value) {
if (segment_index < segments.size()) {
segments[segment_index].set(index, value);
}
}

// 获取指定段和位置的值
int get(size_t segment_index, size_t index) {
if (segment_index < segments.size()) {
return segments[segment_index].get(index);
}
return -1; // 代表无效
}

private:
std::vector<Segment> segments;
};

// 测试分段锁
void worker(SegmentedArray& array, size_t segment, size_t index, int value) {
array.set(segment, index, value);
std::cout << "Thread " << std::this_thread::get_id() << " set segment " << segment
<< ", index " << index << " to " << value << std::endl;
}

int main() {
const size_t num_segments = 4;
const size_t segment_size = 10;
SegmentedArray array(num_segments, segment_size);

// 创建并启动线程
std::vector<std::thread> threads;
for (size_t i = 0; i < num_segments; ++i) {
for (size_t j = 0; j < segment_size; ++j) {
threads.emplace_back(worker, std::ref(array), i, j, static_cast<int>(i + j));
}
}

// 等待所有线程完成
for (auto& t : threads) {
t.join();
}

return 0;
}

Q:TBB 的 tbb::concurrent_vector 底层实现采用了结合无锁和分段锁的策略来优化性能。是以什么策略结合的呢

  1. 数据分段
  • 分段锁:tbb::concurrent_vector 将内部存储的数据分成多个段(segments),每个段使用独立的锁来保护。这种方法减少了线程间的竞争,因为多个线程可以同时操作不同的段,而不需要竞争同一个锁。
    • 锁粒度:通过将数据分成多个段,每个段有自己独立的锁,从而实现更细粒度的锁控制,减少了锁竞争。
  1. 无锁操作
  • 无锁机制:在某些操作中,如向数组末尾追加元素,tbb::concurrent_vector使用无锁机制来提高性能。例如,可能会使用无锁队列或其他无锁数据结构来管理并发插入操作。
    • 原子操作:在处理简单的并发操作时,使用原子操作(如 CAS,Compare-And-Swap)来管理元素的插入和删除,从而避免传统的锁操作。
  1. 策略结合
  • 分段锁 + 无锁操作tbb::concurrent_vector 的实际实现结合了分段锁和无锁操作,根据不同操作的性质和并发模式来选择合适的策略。
    • 分段锁:主要用于保护和管理内部数据的不同部分,例如在进行扩展容量或删除元素时使用。
    • 无锁操作:用于优化一些简单的并发操作,例如在数组末尾追加元素时使用,以减少锁的开销。

Folly (Facebook Open-source Library)

描述: Folly 提供了高性能的并发数据结构和工具,特别是在高并发的场景下表现出色。例如,folly::ProducerConsumerQueue 是一种高效的单生产者-单消费者队列。

使用场景: 适用于高并发的生产者-消费者模型。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#include <folly/ProducerConsumerQueue.h>

folly::ProducerConsumerQueue<int> pc_queue(100);

void producer(int value) {
while (!pc_queue.write(value)) {
// wait or try again
}
}

void consumer() {
int value;
while (!pc_queue.read(value)) {
// wait or try again
}
// Process value
}

并发编程支持

Folly 提供了一系列高性能的并发工具和数据结构,其中包括 ForkJoinPoolMPMCQueue

folly库安装

Folly,一个强大的C++库

ForkJoinPool

概述

ForkJoinPool 是一个并行执行框架,允许将大任务拆分成更小的子任务,然后并行执行这些子任务。最终,子任务的结果会被合并,形成整体的结果。

特点

  • 任务分解与并行执行: 可以递归地将任务分解成子任务,并将这些子任务分配给多个线程并行执行。
  • 工作窃取机制: 实现了工作窃取机制,空闲线程可以从忙碌线程的任务队列中窃取任务,从而平衡负载,提高性能。
  • 灵活的线程池管理: 可以配置线程池的大小,并灵活地管理线程的生命周期。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
#include <folly/experimental/ThreadedExecutor.h>
#include <folly/experimental/ForkJoinPool.h>
#include <iostream>
#include <thread>

// 一个递归计算斐波那契数列的任务
int fib(int n) {
if (n <= 1) {
return n;
}
return fib(n - 1) + fib(n - 2);
}

int main() {
// 创建一个线程池执行器
folly::ThreadedExecutor executor;

// 创建一个 ForkJoinPool,指定线程池大小为4
folly::ForkJoinPool forkJoinPool(executor, 4);

// 使用 ForkJoinPool 来执行任务
auto result = forkJoinPool.run([](auto&& self, int task) -> int {
// 如果任务足够小,直接计算结果
if (task <= 10) {
return fib(task);
}

// 否则,分割任务并行执行
auto left = self.fork(task - 1);
auto right = self.fork(task - 2);

// 等待子任务完成并合并结果
return self.join(left) + self.join(right);
}, 30); // 计算 Fibonacci(30)

std::cout << "Result: " << result << std::endl;

return 0;
}

代码解释

  1. ForkJoinPool Initialization:
    • folly::ForkJoinPool 被初始化为一个有 4 个线程的池。这意味着最多可以并行执行 4 个任务。
  2. Task Decomposition:
    • forkJoinPool.run 中,我们递归地分解了计算斐波那契数列的任务。如果任务小于或等于 10,直接计算结果;否则,将任务分解为两个子任务,并行执行。
  3. Work Stealing:
    • ForkJoinPool 中,每个线程维护一个任务队列。空闲线程会从其他线程的任务队列中窃取任务,这就是所谓的工作窃取机制。通过这种机制,可以确保所有线程都尽可能保持忙碌,从而提高整体性能。
  4. Fork and Join:
    • self.fork() 用于分解任务并将其放入队列中,而 self.join() 用于等待子任务完成并获取结果。

在这个例子中,我们将 fib(30) 的计算分割为 fib(29)fib(28) 两个子任务。每次分割都递归地创建更多子任务,这些子任务会被分配给线程池中的不同线程执行。

接下来,简单写份代码说明任务窃取的原理!

工作窃取机制简介

ForkJoinPool 中,每个工作线程都维护一个双端队列(deque)。当线程有任务要执行时,它会将任务推送到自己的任务队列中,通常是从双端队列的末端入队。线程从队列的末端取任务执行。如果某个线程的任务队列为空,它会尝试从其他线程的任务队列头部窃取任务来执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
#include <iostream>
#include <thread>
#include <deque>
#include <mutex>
#include <vector>
#include <optional>

class WorkStealingQueue {
public:
// 向队列中添加任务
void push(int task) {
std::lock_guard<std::mutex> lock(mutex_);
deque_.push_back(task);
}

// 从队列中获取任务
std::optional<int> pop() {
std::lock_guard<std::mutex> lock(mutex_);
if (deque_.empty()) {
return std::nullopt;
}
int task = deque_.back();
deque_.pop_back();
return task;
}

// 从队列头部窃取任务
std::optional<int> steal() {
std::lock_guard<std::mutex> lock(mutex_);
if (deque_.empty()) {
return std::nullopt;
}
int task = deque_.front();
deque_.pop_front();
return task;
}

private:
std::deque<int> deque_;
std::mutex mutex_;
};

// 每个线程执行的工作函数
void worker(WorkStealingQueue& myQueue, std::vector<WorkStealingQueue>& allQueues, int threadId) {
while (true) {
// 尝试从自己的队列中获取任务
if (auto task = myQueue.pop()) {
std::cout << "Thread " << threadId << " is processing task " << *task << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 模拟任务执行
} else {
// 窃取其他线程的任务
bool stolen = false;
for (size_t i = 0; i < allQueues.size(); ++i) {
if (&allQueues[i] != &myQueue) {
if (auto stolenTask = allQueues[i].steal()) {
std::cout << "Thread " << threadId << " stole task " << *stolenTask << " from thread " << i << std::endl;
stolen = true;
break;
}
}
}
if (!stolen) {
std::cout << "Thread " << threadId << " has no tasks left and can't steal." << std::endl;
break;
}
}
}
}

int main() {
const int numThreads = 4;
std::vector<WorkStealingQueue> queues(numThreads);
std::vector<std::thread> threads;

// 初始化任务
for (int i = 0; i < 10; ++i) {
queues[i % numThreads].push(i);
}

// 创建并启动线程
for (int i = 0; i < numThreads; ++i) {
threads.emplace_back(worker, std::ref(queues[i]), std::ref(queues), i);
}

// 等待线程结束
for (auto& t : threads) {
t.join();
}

return 0;
}

当创建一个新线程并传递参数时,参数通常是按值复制的。如果你想传递一个对象的引用给线程,可以使用 std::ref

Folly MPMCQueue (Multi-Producer Multi-Consumer Queue)

概述

MPMCQueue 是一个高性能的多生产者多消费者队列,设计用于在多线程环境下进行高效的数据传递。它允许多个线程同时向队列中添加和取出元素,而不会引起数据竞争或锁的争用。

特点

  • 无锁设计: 使用无锁算法实现,高效且线程安全。
  • 高并发支持: 支持多生产者和多消费者同时进行操作,适用于需要高吞吐量的场景。
  • 固定容量: 队列的容量是固定的,可以在初始化时指定。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
#include <folly/MPMCQueue.h>
#include <thread>
#include <iostream>

folly::MPMCQueue<int> queue(1024); // 创建一个容量为1024的MPMC队列

void producer() {
for (int i = 0; i < 1000; ++i) {
queue.write(i); // 向队列中添加元素
}
}

void consumer() {
int value;
while (queue.read(value)) { // 从队列中取出元素
std::cout << "Consumed: " << value << std::endl;
}
}

int main() {
std::thread prod1(producer);
std::thread prod2(producer);
std::thread cons1(consumer);
std::thread cons2(consumer);

prod1.join();
prod2.join();
cons1.join();
cons2.join();

return 0;
}

Moodycamel

MoodyCamel的无锁队列(MoodyCamel’s Lock-Free Queue),也叫做ConcurrentQueue,是一个用于高性能多线程环境的多生产者多消费者(MPMC,Multi-Producer Multi-Consumer)无锁队列。这个队列旨在提供高效且线程安全的并发访问,而不需要传统的锁机制,从而避免了锁竞争带来的性能瓶颈。

主要特点

  1. 无锁设计: 使用无锁算法来实现高效的队列操作,避免了锁竞争,提高了并发性能。
  2. 多生产者多消费者: 支持多个生产者线程同时向队列中插入元素,以及多个消费者线程同时从队列中取出元素。
  3. 高性能: 队列采用了缓存行对齐、无伪共享(false sharing)等技术优化,最大程度减少了CPU缓存冲突和内存访问延迟。
  4. 支持单生产者单消费者优化: 虽然是多生产者多消费者队列,但在只有一个生产者或一个消费者的情况下,队列会自动优化以进一步提高性能。
  5. 有界与无界版本: 提供了有界队列(bounded queue)和无界队列(unbounded queue)的实现。前者在容量满时会阻塞插入操作,而后者会自动扩展容量。

工作原理

  1. 原子操作 (Atomic Operations)
    MoodyCamel 的无锁队列依赖于原子操作,如比较并交换(Compare-And-Swap, CAS),来保证多个线程之间的同步。CAS 是一种硬件级别的原子操作,能够在多个线程试图同时更新某个变量时,确保只有一个线程成功修改了该变量,从而避免竞态条件(Race Condition)。

  2. 分段缓存(Chunk-Based Buffering)
    队列内部使用了一种分段的缓存机制,将队列划分为多个独立的“块”(chunks)。每个块是一个固定大小的环形缓冲区。生产者线程和消费者线程各自操作不同的块,从而减少了线程之间的直接竞争。块的大小通常是固定的,并根据队列的负载动态分配。

  3. 无伪共享 (False Sharing)
    为了避免伪共享,队列通过缓存行对齐(Cache Line Alignment)技术,确保每个线程操作的数据位于不同的缓存行上。伪共享是指多个线程访问同一缓存行中的不同变量时,导致缓存一致性协议频繁触发,从而降低性能。通过缓存行对齐,MoodyCamel 队列最大限度地减少了这种冲突。

Moodycamel 会对队列的关键成员变量进行特殊的内存布局,确保它们被分布在不同的缓存行上.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
struct alignas(CACHE_LINE_SIZE) ProducerIndex {
std::atomic<std::uint32_t> value;
// ... other producer-specific data
};

struct alignas(CACHE_LINE_SIZE) ConsumerIndex {
std::atomic<std::uint32_t> value;
// ... other consumer-specific data
};

struct CacheAlignedPadding {
char padding[CACHE_LINE_SIZE];
};

struct Queue {
ProducerIndex enqueue_index;
CacheAlignedPadding pad0;
ConsumerIndex dequeue_index;
CacheAlignedPadding pad1;
char* volatile buffer;
// ... other queue-specific data
};
  1. 环形缓冲区 (Ring Buffer)
    每个块内部使用环形缓冲区来管理元素的位置。环形缓冲区允许生产者和消费者以固定大小的数组形式存储元素,并且通过原子索引递增操作,确保多个线程可以安全地访问和更新队列。

  2. 无锁入队和出队操作 (Lock-Free Enqueue/Dequeue)
    入队操作 (Enqueue): 生产者线程首先通过原子操作获取一个块中的空闲位置,然后在该位置插入新元素。使用 CAS 操作,确保在高并发情况下多个生产者不会同时插入到同一个位置。

出队操作 (Dequeue): 消费者线程通过原子操作获取一个块中已有元素的位置,然后从该位置取出元素。类似地,使用 CAS 操作确保多个消费者不会同时访问同一个位置。

  1. 进退策略 (Backoff Strategy)
    为了提高性能,当一个线程发现它无法立即完成某个操作时(如因为另一个线程正在执行相关操作),它可能会采取一种“退避策略”。这意味着线程会短暂地放弃 CPU,以减少对其他线程的干扰,然后稍后重试。这种策略避免了忙等待的资源浪费。

  2. 内存模型 (Memory Model)
    MoodyCamel 队列的实现高度依赖于 C++ 内存模型中的内存序(Memory Order)语义,特别是 memory_order_acquire 和 memory_order_release,这些语义用于保证操作的可见性和顺序性。它们帮助在无锁队列中实现线程安全,确保在一个线程发布的数据对另一个线程可见。

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
#include "concurrentqueue.h"
#include <iostream>
#include <thread>
#include <vector>

moodycamel::ConcurrentQueue<int> queue;

void producer(int id) {
for (int i = 0; i < 100; ++i) {
queue.enqueue(i + id * 100);
std::cout << "Producer " << id << " enqueued " << i + id * 100 << std::endl;
}
}

void consumer(int id) {
int item;
for (int i = 0; i < 100; ++i) {
while (!queue.try_dequeue(item)) {
// busy-wait until an item is available
}
std::cout << "Consumer " << id << " dequeued " << item << std::endl;
}
}

int main() {
std::vector<std::thread> producers;
std::vector<std::thread> consumers;

// Create producer threads
for (int i = 0; i < 3; ++i) {
producers.emplace_back(producer, i);
}

// Create consumer threads
for (int i = 0; i < 3; ++i) {
consumers.emplace_back(consumer, i);
}

// Join threads
for (auto& p : producers) {
p.join();
}
for (auto& c : consumers) {
c.join();
}

return 0;
}

无锁并发测试效率:moodycamel > tbb > boost

底层剖析

mutex机制

先来看下linux-mutex底层结构

1
2
3
4
5
6
7
struct mutex {
atomic_long_t owner; // 锁的拥有者
spinlock_t wait_lock; // 保护等待队列的自旋锁
struct list_head wait_list; // 等待线程的队列
unsigned int count; // 锁的状态(锁定为0,未锁定为1)
void *dep_map; // 用于死锁检测的依赖映射
};

通过点开mutex源码我们可以发现,C++ std::mutex是对glibc-nptl库pthread_mutex的封装;而nptl库在用户态完成了futex机制的一部分;最后在以linux为内核的操作系统中,又提供了futex系统调用给glibc-nptl给与底层支撑。

glibc:通过底层OS封装出C库。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
#endif
mutex() noexcept = default;
~mutex() = default;

mutex(const mutex&) = delete;
mutex& operator=(const mutex&) = delete;

void
lock()
{
int __e = __gthread_mutex_lock(&_M_mutex);

// EINVAL, EAGAIN, EBUSY, EINVAL, EDEADLK(may)
if (__e)
__throw_system_error(__e);
}

_GLIBCXX_NODISCARD
bool
try_lock() noexcept
{
// XXX EINVAL, EAGAIN, EBUSY
return !__gthread_mutex_trylock(&_M_mutex);
}

void
unlock()
{
// XXX EINVAL, EAGAIN, EPERM
__gthread_mutex_unlock(&_M_mutex);
}

native_handle_type
native_handle() noexcept
{ return &_M_mutex; }
};

在用户空间,互斥锁通常通过 POSIX 线程库(pthread)提供。典型的互斥锁包含两种情况:

  1. 快速路径:当锁是空闲时,线程可以直接通过原子操作获取锁。
  2. 慢速路径:如果锁已经被其他线程持有,线程会进入内核,通过 futex 等机制等待锁的释放。

以下是 pthread_mutex_lock 的简化伪代码:

1
2
3
4
5
6
7
8
int pthread_mutex_lock(pthread_mutex_t *mutex) {
if (__sync_bool_compare_and_swap(&mutex->state, 0, 1)) {
return 0; // Lock acquired successfully
}

// If the lock is already held, go to the slow path
return __pthread_mutex_lock_slow(mutex);
}

这里的 __sync_bool_compare_and_swap 是一个原子操作,它尝试将 mutex->state0 改为 1,表示尝试获取锁。如果成功,则锁定成功;否则,进入慢速路径。

内核的Futex机制

futex(fast userspace mutex)是一种混合的同步原语,它结合了用户态和内核态的机制来实现高效的线程同步。futex 的主要思想是:尽量在用户态完成锁操作,只有在需要阻塞或唤醒线程时才陷入内核态,从而减少系统调用的开销。

举例说明

多进程下futex机制

futex 用于多进程同步时,多个进程需要协同访问同一个共享内存区域。这种情况下,各个进程中的虚拟地址可能不同,但它们指向的是同一个物理地址。

假设有两个进程 AB,它们通过共享内存区域进行通信和同步。该共享内存区域映射到两个进程不同的虚拟地址上,例如:

  • 在进程 A 中,共享内存区域的起始地址是 0x7fff0000
  • 在进程 B 中,共享内存区域的起始地址是 0x7fff1000

尽管虚拟地址不同,但它们都映射到相同的物理内存,比如物理地址 0x12345000

现在,假设共享内存区域中的某个整数变量 shared_var 的偏移量为 0x0,那么:

  • 在进程 A 中,shared_var 的虚拟地址是 0x7fff0000
  • 在进程 B 中,shared_var 的虚拟地址是 0x7fff1000

但在物理内存中,它们实际指向的是同一个物理地址 0x12345000

futex_waitfutex_wake 的工作机制

  1. 进程 A 调用 futex_wait
    • 进程 A 试图对 shared_var 进行操作,发现它的值不符合预期,于是调用 futex_wait,请求挂起自己。
    • futex_wait 需要传递一个指针(即 uaddr)给内核,这个指针在进程 A 中是 0x7fff0000
    • 内核接收到这个指针后,会通过进程 A 的页表将该虚拟地址 0x7fff0000 转换为物理地址 0x12345000
    • 内核使用这个物理地址作为键,将进程 A 插入到对应的等待队列中。
  2. 进程 B 调用 futex_wake
    • 进程 B 在某个操作后希望唤醒挂起的进程,所以调用 futex_wake
    • 进程 B 传递的 uaddr0x7fff1000,这是它在自己虚拟地址空间中对 shared_var 的地址。
    • 内核将 0x7fff1000 转换为物理地址 0x12345000
    • 内核查找与 0x12345000 相关联的等待队列,将进程 A 唤醒。

多线程下 futex 机制

在多线程环境中,所有线程共享同一个虚拟地址空间。futex 利用这个特性,使得所有线程可以直接使用共享的内存地址(即虚拟地址)来进行同步操作。

futex 基本操作

  • futex_wait(int *uaddr, int val):当前线程检查 *uaddr 是否等于 val,如果相等则将线程挂起,等待其他线程来唤醒它。
  • futex_wake(int *uaddr, int n):唤醒最多 n 个正在 uaddr 上等待的线程。

详细的 futex 流程说明

假设我们有两个线程 Thread AThread B,它们使用一个共享的整数变量 lock_var 来进行同步:

  1. 共享变量 lock_var 的初始化
    • lock_var 是一个全局变量或在堆上分配的变量,所有线程共享它。
    • 初始状态下,lock_var 的值为 0,表示锁是空闲的。
  2. 线程 A 尝试获取锁
    • Thread A 尝试获取锁。它使用原子操作 compare_exchange 来检查 lock_var 是否为 0,如果是,则将 lock_var 的值设置为 1,表示锁已被占用。
    • 如果 Thread A 成功设置 lock_var1,它就进入临界区(critical section)执行任务。
  3. 线程 A 发现锁已被占用
    • 如果 Thread A 发现 lock_var 已经是 1 (即锁被占用),它调用 futex_wait(&lock_var, 1) 来挂起自己。
    • 内核会检查 lock_var 是否仍然是 1。如果是,内核将 Thread A 挂起并将其放入与 lock_var 关联的等待队列中。
  4. 线程 B 释放锁
    • Thread B 持有锁并完成了它的任务,准备释放锁。
    • Thread Block_var 设置为 0,表示锁已空闲。
    • 然后 Thread B 调用 futex_wake(&lock_var, 1),唤醒一个正在等待该锁的线程。
  5. 线程 A 被唤醒
    • 内核唤醒 Thread A,并将它从等待队列中移除。
    • Thread A 被唤醒后,重新尝试获取锁。如果 lock_var 已经被设置为 0Thread A 通过 compare_exchange 成功获取锁,并进入临界区执行任务。
  6. 线程 A 释放锁
    • Thread A 完成任务后,将 lock_var 设置为 0,并调用 futex_wake(&lock_var, 1),以便唤醒其他可能等待的线程。

两个关键函数

futex_wait

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
static int futex_wait(u32 __user *uaddr, unsigned int flags, u32 val,
ktime_t *abs_time, u32 bitset)
{
struct futex_hash_bucket *hb;
struct futex_q q;
int ret;

// 获取 futex 锁,确保在 futex 操作期间内核态的同步
hb = hash_futex(uaddr);
spin_lock(&hb->lock);

// 检查用户态地址 uaddr 所指向的值是否等于 val
if (!futex_atomic_cmpxchg_inatomic(uaddr, val, val)) {
ret = -EAGAIN;
goto out;
}

// 初始化 futex_q 结构体
ret = futex_queue(uaddr, flags, abs_time, bitset, hb, &q);
if (ret)
goto out;

// 将当前线程挂起,等待 futex_wake 唤醒
ret = futex_wait_queue_me(hb, &q, flags, abs_time);

out:
spin_unlock(&hb->lock);
return ret;
}

总结下 futex_wait 流程:

  1. 加自旋锁
  2. 检测*uaddr是否等于val,如果不相等则会立即返回
    • *uaddr:指向用户态中某个共享变量的指针,该变量通常用于表示锁的状态。这个共享变量可以是一个整数类型(比如 int),它的值通常用于表示锁的当前状态(如空闲或被占用)。
  3. 将进程状态设置为 TASK_INTERRUPTIBLE
  4. 将当期进程插入到等待队列中
  5. 释放自旋锁
  6. 创建定时任务:当超过一定时间还没被唤醒时,将进程唤醒
  7. 挂起当前进程

futex_wake

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
static int
futex_wake(u32 __user *uaddr, unsigned int flags, int nr_wake, u32 bitset)
{
struct futex_hash_bucket *hb;
struct futex_q *this, *next;
union futex_key key = FUTEX_KEY_INIT;
int ret;
DEFINE_WAKE_Q(wake_q); // 申明一个wake_up队列,所有满足条件的线程都存放于此
// hash处理得到某个hahs链表
ret = get_futex_key(uaddr, flags & FLAGS_SHARED, &key, VERIFY_READ);
hb = hash_futex(&key);

spin_lock(&hb->lock);
// 遍历链表查找匹配的key
plist_for_each_entry_safe(this, next, &hb->chain, list) {
if (match_futex (&this->key, &key)) {
if (this->pi_state || this->rt_waiter) {
ret = -EINVAL;
break;
}
/* Check if one of the bits is set in both bitsets */
if (!(this->bitset & bitset))
continue;
mark_wake_futex(&wake_q, this); // 将符合条件的task_struct放入wake_up队列
if (++ret >= nr_wake) // 达到唤醒的最大总数
break;
}
}

spin_unlock(&hb->lock);
wake_up_q(&wake_q); // 遍历wake_up队列, 将其中所有的线程唤醒(将他们的进程状态改为TASK_RUNNING)

return ret;
}

futex_wake 流程如下:

  1. 找到uaddr对应的 futex_hash_bucket ,即代码中的hb
  2. 对hb加自旋锁
  3. 遍历fb的链表,找到uaddr对应的节点
  4. 调用 wake_futex 唤起等待的进程
  5. 释放自旋锁

补充一个有意思的问题:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
void lock(int lockval) {
//trylock是用户级的自旋锁
while(!trylock(lockval)) {
wait();//释放cpu,并将当期线程加入等待队列,是系统调用
}
}

boolean trylock(int lockval){
int i=0;
//localval=1代表上锁成功
while(!compareAndSet(lockval,0,1)){
if(++i>10){
return false;
}
}
return true;
}

void unlock(int lockval) {
compareAndSet(lockval,1,0);
notify();
}

上述代码的问题是trylock和wait两个调用之间存在一个窗口:如果一个线程trylock失败,在调用wait时持有锁的线程释放了锁,当前线程还是会调用wait进行等待,但之后就没有人再将该线程唤醒了。

看完上述的内容,其实上述的问题在于wait和判断不是原子操作。那么futex怎么解决这个问题呢?

答案就是底层已经帮助我们实现,通过加自旋锁,保证了判断锁状态和挂起线程原子性。

参考资料:

condition_variable机制

Q1:C++的std::condition_variable实现

condition_variable_t 在其内部实现中通常使用 futex 来实现高效的等待和通知机制。具体来说,std::condition_variable(C++ 标准库中的条件变量)在使用时可能会依赖于 futex,特别是在 Linux 等支持 futex 的系统上。

C++ 条件变量(std::condition_variable)通常依赖于操作系统提供的同步原语,如 POSIX 线程(pthreads)在 Linux 中的实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
typedef struct {
pthread_mutex_t mutex;
pthread_cond_t cond;
thread_queue wait_queue; // 等待队列
} condition_variable_t;

void cv_wait(condition_variable_t *cv, pthread_mutex_t *mtx) {
// 将当前线程加入等待队列
enqueue(&cv->wait_queue, current_thread);

// 解锁互斥量
pthread_mutex_unlock(mtx);

// 阻塞当前线程,等待条件满足
pthread_cond_wait(&cv->cond, mtx);

// 重新获取互斥量
pthread_mutex_lock(mtx);
}

void cv_notify_one(condition_variable_t *cv) {
// 唤醒一个等待的线程
pthread_cond_signal(&cv->cond);
}

void cv_notify_all(condition_variable_t *cv) {
// 唤醒所有等待的线程
pthread_cond_broadcast(&cv->cond);
}

pthread_cond_wait 的实现的简化示例:

1
2
3
4
5
6
7
8
9
10
11
12
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex) {
// 释放互斥锁
pthread_mutex_unlock(mutex);

// 使用 futex 进入等待状态
int result = syscall(SYS_futex, &cond->__data, FUTEX_WAIT, 0, NULL, NULL, 0);

// 重新获取互斥锁
pthread_mutex_lock(mutex);

return result;
}

在这个实现中,SYS_futex 是 Linux 提供的 futex 系统调用,通过它可以在用户态和内核态之间切换,并将线程挂起。

有这个pthread_cond_wait,当然有pthread_cond_signal

1
2
3
4
int pthread_cond_signal(pthread_cond_t *cond) {
// 使用 futex 唤醒一个等待线程
return syscall(SYS_futex, &cond->__data, FUTEX_WAKE, 1, NULL, NULL, 0);
}

用于唤醒等待队列中的线程。

futexfutex 是一种混合锁机制,主要在用户态工作,但在需要时会借助内核态进行线程管理。它的基本思想是:大部分情况下,线程的同步操作都在用户态完成,只有在竞争激烈或需要挂起/唤醒线程时才进入内核态处理。这样可以减少上下文切换,从而提高性能。

Semaphore机制

在 Linux 中,信号量的实现通常依赖于 futex(“fast userspace mutex”)机制。futex 是 Linux 内核提供的一种高效的同步原语,它允许大多数情况下的操作在用户态完成,而只有在需要等待或唤醒线程时才会进入内核态。

linux下Semaphore数据结构

1
2
3
4
5
struct semaphore {
atomic_t count; // 信号量的计数器
raw_spinlock_t wait_lock; // 自旋锁,保护等待队列
struct list_head wait_list; // 等待队列,用于存储等待获取信号量的进程
};

futex 与信号量的关系

信号量的实现可以通过 futex 来管理等待和唤醒操作。futex 的核心思想是:在用户态通过共享内存中的一个整数变量进行快速的检查和更新,当需要阻塞或唤醒线程时,才通过 futex 系统调用进入内核态处理。

典型的信号量实现流程

  1. 初始化

    • 信号量通常包含一个计数器变量count,表示当前可用资源的数量。
    • count 的初始值由创建信号量时指定。
  2. P 操作waitdown):

    • 尝试将 count 减 1。如果减 1 后 count >= 0,表示有可用资源,线程可以继续执行。
    • 如果减 1 后 count < 0,表示没有可用资源。此时,线程需要进入等待状态,等待其他线程释放资源。此时,futex_wait 系统调用会被用来将线程挂起。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    void sem_wait(sem_t *sem) {
    while (true) {
    int val = sem->count;
    if (val > 0) {
    if (__sync_bool_compare_and_swap(&sem->count, val, val - 1)) {
    break; // 资源获取成功
    }
    } else {
    // 资源不可用,使用 futex 挂起线程
    syscall(SYS_futex, &sem->count, FUTEX_WAIT, val, NULL, NULL, 0);
    }
    }
    }

    在上面的代码中:

    • __sync_bool_compare_and_swap 是一个原子操作,尝试将 sem->countval 更新到 val - 1。如果 count 值在操作期间没有改变,则更新成功,线程获取资源。
    • 如果 count 小于或等于 0,线程会调用 futex_wait 将自己挂起,直到信号量的值被其他线程增加并唤醒它。
  3. V 操作signalup):

    • 将信号量的 count 加 1。
    • 如果 count 加 1 后结果大于 0 且有线程因为等待资源而阻塞,则需要唤醒一个等待的线程。此时,futex_wake 系统调用会被用来唤醒等待的线程。
    1
    2
    3
    4
    5
    6
    7
    void sem_post(sem_t *sem) {
    int old_value = __sync_fetch_and_add(&sem->count, 1);
    if (old_value < 0) {
    // 唤醒挂起的线程
    syscall(SYS_futex, &sem->count, FUTEX_WAKE, 1, NULL, NULL, 0);
    }
    }

    在上面的代码中:

    • __sync_fetch_and_add 是一个原子操作,它将 sem->count 增加 1,并返回增加前的值。
    • 如果增加前的 count 小于 0(表示有线程在等待),则调用 futex_wake 唤醒一个等待的线程。