前言

在上一篇博客中,介绍了通过指定原子变量的操作的内存顺序, 实现线程同步问题。本文看看原子变量和内存顺序的应用 – 无锁队列(Lock-Free Queue)。本文介绍无锁队列的实现,学习无锁队列设计的基本思路。

数组版本

单写单读队列

单写单读的队列比较简单, 这里我们使用循环队列实现. 如下图所示, 队列维护两个指针 headtail, 分别指向队首和队尾. tail 始终指向 dummy 节点, 这样 tail == head 表示队列为空, (tail + 1) % Cap == head 表示队列已满, 不用维护 size 成员.

image-20240507202501356

入队的时候移动 tail 指针, 而出队的时候移动 head 指针, 两个操作并无冲突. 不过, 出队前需要读取 tail 指针, 判断 tail != head 确认队列不为空; 同理入队时也要判断 (tail + 1) % Cap != head 以确认队列不满. 由于存在多个线程读写这两个指针, 因此它们都应该是原子变量.

此外, 由于两个操作在不同线程中执行, 我们还需考虑内存顺序. 如果初始队列为空, 线程 a 先执行入队操作, 线程 b 后执行出队操作, 则线程 a 入队操作的内容要对线程 b 可见.

image-20240507202552069

为了做到这一点, 需要有 a(2) “happens-before” b(3). 而 a(3) 和 b(2) 分别修改了读取了 tail, 所以应该利用原子变量同步, 使得 a(3) “synchronizes-with” b(2). 可以在 a(3) 写入 tail 的操作中使用 release, b(2) 读取 tail 的操作中使用 acquire 实现同步.

同理, 如果初始队列满, 线程 a 先执行出队操作, 线程 b 后执行入队操作, 则线程 a 出队操作的结果要对线程 b 可见. 出队的时候需要调用出队元素的析构函数, 要保证出队元素正常销毁后才能在那个位置写入新元素, 否则会导致内存损坏. 可以在出队写入 head 的操作中使用 release, 入队读取 head 的操作中使用 acquire 实现出队 “synchronizes-with” 入队.

简单来说:

1.线程 a 先执行出队操作, 线程 b 后执行入队操作, 则线程 a 出队操作的结果要对线程 b 可见.

2.线程 a 先执行入队操作, 线程 b 后执行出队操作, 则线程 a 入队操作的结果要对线程 b 可见.

使用memory_order_acquire和memory_order_release实现线程同步。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
template <typename T, size_t Cap>
class spsc : private allocator<T> {
T *data;
atomic<size_t> head{0}, tail{0};
public:
spsc(): data(allocator<T>::allocate(Cap)) {}
spsc(const spsc&) = delete;
spsc &operator=(const spsc&) = delete;
spsc &operator=(const spsc&) volatile = delete;

bool push(const T &val) {
return emplace(val);
}
bool push(T &&val) {
return emplace(std::move(val));
}

template <typename ...Args>
bool emplace(Args && ...args) { // 入队操作
size_t t = tail.load(memory_order_relaxed);
if ((t + 1) % Cap == head.load(memory_order_acquire)) // (1)
return false;
allocator<T>::construct(data + t, std::forward<Args>(args)...);
// (2) synchronizes-with (3)
tail.store((t + 1) % Cap, memory_order_release); // (2)
return true;
}

bool pop(T &val) { // 出队操作
size_t h = head.load(memory_order_relaxed);
if (h == tail.load(memory_order_acquire)) // (3)
return false;
val = std::move(data[h]);
allocator<T>::destroy(data + h);
// (4) synchronizes-with (1)
head.store((h + 1) % Cap, memory_order_release); // (4)
return true;
}
};

这种单写单读的无锁队列的两种操作可以同时执行, 且两种操作都只需要执行确定数量的指令, 因此数据 wait-free 结构, 性能很高.

给出完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
// type __sync_val_compare_and_swap(type *ptr, type oldval type newval, ...)

#include <algorithm>
#include <vector>
#include <string>
#include <unordered_map>
#include <queue>
#include <functional>
#include <stack>
#include <iostream>
#include <unistd.h>
#include <thread>
#include <list>
#include <atomic>
using namespace std;
template <typename T, size_t Cap>
class Queue
{
public:
Queue() : data(new T[Cap]) {}
~Queue() { delete[] data; }

bool push(const T &val)
{
size_t t = tail.load(std::memory_order_relaxed);
if ((t + 1) % Cap == head.load(std::memory_order_acquire))
return false;
cout << "Producer: " << val << endl;
data[t % Cap] = val;
tail.store((t + 1) % Cap, std::memory_order_release);
return true;
}

bool pop(T &val)
{
size_t h = head.load(std::memory_order_relaxed);
if (h == tail.load(std::memory_order_acquire))
return false;
val = data[h % Cap];
head.store((h + 1) % Cap, std::memory_order_release);
return true;
}

private:
T *data;
std::atomic<size_t> head{0}, tail{0};
};

Queue<int, 5> q;

void Consumer()
{
int val;
while (true)
{
if (q.pop(val))
std::cout << "Consumer: " << val << std::endl;
else
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
}

void Producer()
{
while (true)
{
int num = rand() % 100;
if (!q.push(num))
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
}

int main()
{
std::thread consumer(Consumer);
std::thread producer(Producer);
consumer.join();
producer.join();
return 0;
}

CAS 操作

CAS (compare and swap) 是一种原子操作, 在一个不可被中断的过程中执行比较和交换. C++ 的 std::atomic 中有两种 CAS 操作, compare_exchange_weakcompare_exchange_strong

1
2
bool std::atomic<T>::compare_exchange_weak(T &expected, T desired);
bool std::atomic<T>::compare_exchange_strong(T &expected, T desired);

这两种 CAS 操作基本上是相同的:

  • 如果原子变量与 expected 相等, 则将其赋值为 desired 并返回 true;

  • 否则 expected 赋值成原子变量当前的值并返回 false.

下面是 compare_exchange_strong 的一个伪实现

1
2
3
4
5
6
7
8
template <typename T>
bool atomic<T>::compare_exchange_strong(T &expected, T desired) {
std::lock_guard<std::mutex> guard(m_lock);
if (m_val == expected)
return m_val = desired, true;
else
return expected = m_val, false;
}

当然实际的实现不可能是这样的.

compare_exchange_weakcompare_exchange_strong 的区别在于, compare_exchange_weak 有可能在当前值与 expected 相等时仍然不执行交换并返回 false; compare_exchange_strong 则不会有这个问题. weak 版本能让编译器在一些平台下生成一些更优的代码, 在 x86 下是没区别的.

compare_exchange_* 支持指定两个内存顺序: 成功时的内存顺序和失败时的内存顺序.

1
2
3
bool compare_exchange_weak(T& expected, T desired,
std::memory_order success,
std::memory_order failure);

我们可以利用 CAS 操作实现很多无锁数据结构. 下面我们来看如何实现多写多读的队列.

多写多读队列

前面单读单写能否用到执行多写多读呢?答案是不可以的。

1
2
3
4
5
6
7
8
9
bool spsc<T, Cap>::pop(T &val) {
size_t h = head.load(); // (1)
if (h == tail.load())
return false;
val = std::move(data[h]); // (2)
allocator<T>::destroy(data + h);
head.store((h + 1) % Cap); // (3)
return true;
}

假设有两个线程 a 和 b 同时调用 pop, 执行顺序是 a(1), b(1), b(2) a(2). 这种情况下, 线程 a 和线程 b 都读到相同的 head 指针, 存储在变量 h 中. 当 a(2) 尝试读取 data[h] 时, 其中的数据已经在 b(2) 中被 move 走了. 因此这样的队列不允许多个线程同时执行 pop 操作.

简单来说,就是2个线程都调用pop,但是只要一个data[h]可给val.

解决抢占问题

可以看到, 整个 pop 函数是一个非原子过程, 一旦这个过程别其他线程抢占, 就会出问题. 如何解决这个问题呢? 在无锁数据结构中, 一种常用的做法是不断重试. 具体的做法是, 在非原子过程的最后一步设计一个 CAS 操作, 如果过程被其他线程抢占, 则 CAS 操作失败, 并重新执行整个过程. 否则 CAS 操作成功, 完成整个过程的最后一步.

1
2
3
4
5
6
7
8
9
10
bool spsc<T, Cap>::pop(T &val) {
size_t h;
do {
h = head.load(); // (1)
if (h == tail.load())
return false;
val = data[h]; // (2)
} while (!head.compare_exchange_strong(h, (h + 1) % Cap)); // (3)
return true;
}

首先注意到我们不再使用 std::moveallocator::destroy, 而是直接复制, 使得循环体内的操作不会修改队列本身. (3) 是整个过程的最后一步, 也是唯一会修改队列的一步, 我们使用了一个 CAS 操作. 只有当 head 的值等于第 (1) 步获取的值的时候, 才会移动 head 指针, 并且返回 true 跳出循环; 否则就不断重试.【简单来说,就是只能有一个pop,其他的需要进入循环等待.利用CAS进行暴力美学】

这样如果多个线程并发执行 pop, 则只有成功执行 (3) 的线程被视为成功执行了整个过程, 其它的线程都会因为被抢占, 导致执行 (3) 的时候 head 被修改, 因而与局部变量 h 不相等, 导致 CAS 操作失败. 这样它们就要重试整个过程.

类似的思路也可以用在 push 上. 看看如果我们用同样的方式修改 push 会怎样:

1
2
3
4
5
6
7
8
9
10
bool spsc<T, Cap>::push(const T &val) {
size_t t;
do {
t = tail.load(); // (1)
if ((t + 1) % Cap == head.load())
return false;
data[t] = val; // (2)
} while (!tail.compare_exchange_strong(t, (t + 1) % Cap)); // (3)
return true;
}

pop 操作不同, push 操作的第 (2) 步需要对 data[t] 赋值, 导致循环体内的操作会修改队列. 假设 a, b 两个线程的执行顺序是 a(1), a(2), b(1), b(2), a(3). a 可以成功执行到 (3), 但是入队的值却被 b(2) 覆盖掉了.【简单来说,就是2个线程push,但是可能后者线程会覆盖前者线程的值的这样的问题】

我们尝试将赋值操作 data[t] = val 移到循环的外面, 这样循环体内的操作就不会修改队列了. 当循环退出时, 能确保 tail 向后移动了一格, 且 t 指向 tail 移动前的位置. 这样并发的时候就不会有其他线程覆盖我们写入的值.

1
2
3
4
5
6
7
8
9
10
bool spsc<T, Cap>::push(const T &val) {
size_t t;
do {
t = tail.load(); // (1)
if ((t + 1) % Cap == head.load())
return false;
} while (!tail.compare_exchange_strong(t, (t + 1) % Cap)); // (2)
data[t] = val; // (3)
return true;
}

但是这样做的问题是, 我们先移动 tail 指针再对 data[t] 赋值, 会导致 pushpop 并发不正确. 回顾下 pop 的代码:

1
2
3
4
5
6
7
8
9
10
bool spsc<T, Cap>::pop(T &val) {
size_t h;
do {
h = head.load();
if (h == tail.load()) // (4)
return false;
val = data[h]; // (5)
} while (!head.compare_exchange_strong(h, (h + 1) % Cap));
return true;
}

同样假设有两个线程 a 和 b. 假设队列初始为空

  • 线程 a 调用 push , 执行 a(1), a(2). tail 被更新, 然后切换到线程 b
  • 线程 b 调用 pop , 执行 b(4). 因为 tail 被更新, 因此判断队列不为空
  • 执行到 b(5), 会读取到无效的值

image-20240507205821012

【简单来说,就是当满足pop条件,但是没有值的这样问题】

为了实现 pushpop 的并发, pushdata[t] 的写入必须 “happens-before” popdata[h] 的读取. 因此这就要求 push 操作先对 data[t] 赋值, 再移动 tail 指针. 可是前面为了实现 pushpush 的并发我们又让 push 操作先移动 tail 再对 data[t] 赋值. 如何解决这一矛盾呢?

解决办法是引入一个新的指针 write , 用于 pushpop 同步. 它表示 push 操作写到了哪个位置.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
template <typename T, size_t Cap>
class ring_buffer {
T data[Cap];
atomic<size_t> head{0}, tail{0}, write{0};

public:
ring_buffer() = default;
ring_buffer(const ring_buffer&) = delete;
ring_buffer &operator=(const ring_buffer&) = delete;
ring_buffer &operator=(const ring_buffer&) volatile = delete;

bool push(const T &val) {
size_t t, w;
do {
t = tail.load();
if ((t + 1) % Cap == head.load())
return false;
} while (!tail.compare_exchange_weak(t, (t + 1) % Cap)); // (1)
data[t] = val; // (2)
do {
w = t;
} while (!write.compare_exchange_weak(w, (w + 1) % Cap)); // (3), (3) synchronizes-with (4)
return true;
}

bool pop(T &val) {
size_t h;
do {
h = head.load();
if (h == write.load()) // (4) 读 write 的值
return false;
val = data[h]; // (5)
} while (!head.compare_exchange_strong(h, (h + 1) % Cap));
return true;
}
};

还是暴力,多加个变量解决上述问题。

push 操作的基本步骤是:

  1. 移动 tail;
  2. data[t] 赋值, t 等于 tail 移动前的位置;
  3. 移动 write. write 移动后等于 tail.

pop 操作使用 write 指针判断队列中是否有元素. 因为有 (3) “synchronizes-with” (4), 所以 (2) “happens-before” (5), pop 能读到 push 写入的值. 在 push 函数中, 只有在当前的 write 等于 t 时才将 write 移动一格, 能确保最终 write 等于 tail.

这种多写多读的无锁队列的两种操作可以同时执行, 但是每种操作都有可能要重试, 因此属于 lock-free 结构.

考虑内存顺序

前面例子使用默认的内存顺序, 也就是 memory_order_seq_cst . 为了优化性能, 可以使用更宽松的内存顺序. 而要考虑内存顺序, 就要找出其中的 happens-before 的关系.

前面分析了, push 中的赋值操作 data[t] = val 要 “happens-before” pop 中的读取操作 val = data[h], 这是通过 write 原子变量实现的: push 中对 write 的修改要 “synchronizes-with” pop 中对 write 的读取. 因此 push 修改 write 的 CAS 操作应该使用 release, pop 读取 write 时则应使用 acquire.

同理, 当队列初始为满的时候, 先运行 pop 在运行 push, 要保证 pop 中的读取操作 val = data[h] “happens-before” push 中的赋值操作 data[t] = val. 这是通过 head 原子变量实现的: pop 中对 head 的修改要 “synchronizes-with” push 中对 head 的读取. 因此 pop 修改 head 的 CAS 操作应该使用 release, push 读取 head 时则应使用 acquire.

这部分和上面一样,就是

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
bool ring_buffer<T, Cap>::push(const T &val) {
size_t t, w;
do {
t = tail.load(memory_order_relaxed); // (1)
if ((t + 1) % Cap == head.load(memory_order_acquire)) //(2)
return false;
} while (!tail.compare_exchange_weak(t, (t + 1) % Cap, memory_order_relaxed)); // (3)
data[t] = val; // (4), (4) happens-before (8)
do {
w = t;
} while (!write.compare_exchange_weak(w, (w + 1) % Cap,
memory_order_release, memory_order_relaxed)); // (5), (5) synchronizes-with (7)
return true;
}

bool ring_buffer<T, Cap>::pop(T &val) {
size_t h;
do {
h = head.load(memory_order_relaxed); // (6)
if (h == write.load(memory_order_acquire)) // (7)
return false;
val = data[h]; // (8), (8) happens-before (4)
} while (!head.compare_exchange_strong(h, (h + 1) % Cap,
memory_order_release, memory_order_relaxed)); // (9), (9) synchronizes-with (2)
return true;
}

pushpush 并发移动 tail 指针的时候, 只影响到 tail 本身. 因此 (1) 和 (3) 对 tail 读写使用 relaxed 就可以了. 同样 pushpush 并发移动 write 指针时, 也不需要利用它做同步, 因此 (5) 处的做法是

1
2
write.compare_exchange_weak(w, (w + 1) % Cap,
memory_order_release, memory_order_relaxed)

成功时使用 release, 为了与 pop 同步; 而失败时使用 relaxed 就可以了.

同理, poppop 并发移动 head 时, 也影响到 head 本身. 因此 (6) 读取 head 使用 relaxed 即可. 而 (9) 处为了与 push 同步, 成功时要使用 release, 失败时使用 relaxed 即可.

最后,给出完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
// type __sync_val_compare_and_swap(type *ptr, type oldval type newval, ...)

#include <algorithm>
#include <vector>
#include <string>
#include <unordered_map>
#include <queue>
#include <functional>
#include <stack>
#include <iostream>
#include <unistd.h>
#include <thread>
#include <list>
#include <atomic>
using namespace std;
template <typename T, size_t Cap>
class Queue
{
public:
Queue() : data(new T[Cap]) {}
~Queue() { delete[] data; }

bool push(const T &val)
{
size_t t, w;
do
{
t = tail.load(std::memory_order_relaxed);
if ((t + 1) % Cap == head.load(std::memory_order_acquire))
return false;
} while (!tail.compare_exchange_strong(t, (t + 1) % Cap, memory_order_relaxed));
data[t % Cap] = val;
cout << "Producer:" << this_thread::get_id() << " data:" << val << endl;
do
{
w = t;
} while (!write.compare_exchange_strong(w, (w + 1) % Cap, memory_order_release, memory_order_relaxed));
return true;
}

bool pop()
{
T val;
size_t h;
do
{
h = head.load(std::memory_order_relaxed);
if (h == write.load(std::memory_order_acquire))
return false;
val = data[h % Cap];
} while (!head.compare_exchange_strong(h, (h + 1) % Cap,
memory_order_release, memory_order_relaxed));
cout << "Comsumer:" << this_thread::get_id() << " data:" << val << endl;
return true;
}

private:
T *data;
std::atomic<size_t> head{0}, tail{0}, write{0};
};

Queue<int, 5> q;

void Consumer()
{
int val;
while (true)
{
if (!q.pop())
std::this_thread::sleep_for(std::chrono::milliseconds(10));
else
{
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
}
}

void Producer()
{
while (true)
{
int num = rand() % 100;
if (!q.push(num))
std::this_thread::sleep_for(std::chrono::milliseconds(10));
else
{
std::this_thread::sleep_for(std::chrono::milliseconds(2));
}
}
}

int main()
{
vector<thread> p, c;
for (int i = 0; i < 2; i++)
{
p.emplace_back(Producer);
}
for (int i = 0; i < 2; i++)
{
c.emplace_back(Consumer);
}
for (auto &ci : c)
ci.join();
for (auto &pi : p)
pi.join();
return 0;
}

链表版本

废话少说,直接上代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
#include <iostream>
#include <atomic>
#include <thread>
#include <memory>
#include <chrono>
using namespace std;

template <typename T>
class FreeLockQueue
{
private:
struct Node
{
T data;
atomic<Node *> next;
Node(const T &x) : data(x), next(nullptr) {}
};

atomic<Node *> head;
atomic<Node *> tail;

public:
FreeLockQueue()
{
Node *node = new Node(-1);
head.store(node);
tail.store(node);
}

~FreeLockQueue()
{
while (Node *h = head.load())
{
Node *tmp = h->next.load();
delete h;
head.store(tmp);
}
}

// 入队操作,将一个元素添加到队列的尾部
void enqueue(const T &x)
{
Node *cur = new Node(x); // 创建一个新的节点
Node *t = nullptr;
while (true)
{
t = tail.load(); // 读取尾指针
Node *next = t->next.load(); // 读取尾节点的下一个节点

if (t != tail.load()) // 检查尾指针是否被其他线程修改
{
continue; // 如果尾指针被修改,则重新开始循环
}
// 假设现在有两个线程 A 和 B,他们同时尝试对队列进行 enqueue 操作。
if (next != nullptr) // 检查尾节点的下一个节点是否为空
{
tail.compare_exchange_strong(t, next); // 如果不为空,则尝试更新尾指针
continue; // 无论是否更新成功,都继续循环
}

if (t->next.compare_exchange_strong(next, cur)) // 尝试将尾节点的下一个节点设置为新节点
break; // 如果设置成功,则跳出循环
}
tail.compare_exchange_strong(t, cur); // 尝试更新尾指针为新节点
}
T dequeue()
{
Node *h = nullptr; // 创建一个指针 h,用来引用队列头部的节点
T d; // 创建一个变量 d,用来存储被出队元素的数据
while (true) // 循环,直到成功出队一个元素
{
h = head.load(); // 读取头指针
Node *t = tail.load(); // 读取尾指针
Node *next = h->next.load(); // 读取头部节点的下一个节点

if (h != head.load()) // 检查头指针是否被其他线程修改
continue; // 如果头指针被修改,则重新开始循环

if (h == t && next == nullptr) // 检查队列是否为空
{
cout << "Can not dequeue from empty queue" << endl; // 如果队列为空,则输出错误信息
this_thread::sleep_for(chrono::milliseconds(1000)); // 暂停一段时间,然后重新开始循环
continue;
}

if (h == t && next != nullptr) // 检查头部节点是否已经被其他线程移动到尾部
{
tail.compare_exchange_strong(t, next); // 如果是,则尝试更新尾指针
continue; // 无论是否更新成功,都继续循环
}

if (head.compare_exchange_strong(h, next)) // 尝试将头指针更新为下一个节点
{
d = next->data; // 如果更新成功,则读取被出队元素的数据
break; // 然后跳出循环
}
}

delete h; // 删除被出队的节点
return d; // 返回被出队元素的数据
}
};

FreeLockQueue<int> mFQ;

void producer1()
{
for (int i = 0; i < 10; i++)
{
mFQ.enqueue(i);
cout << "Enqueue data: " << i << " successfully!" << endl;
}
}
void producer2()
{
for (int i = 10; i < 20; i++)
{
mFQ.enqueue(i);
cout << "Enqueue data: " << i << " successfully!" << endl;
}
}

void consumer()
{
while (true)
{
int d = mFQ.dequeue();
if (d != -1)
cout << "Get data: " << d << " successfully!" << endl;
}
}

int main()
{
thread p1(producer1);
thread p2(producer2);
thread ca(consumer);
thread cb(consumer);

p1.join();
p2.join();
ca.join();
cb.join();
system("pause");
return 0;
}

比较难以理解的2个点:

1.tail.compare_exchange_strong(t, next);这行代码什么时候为true?

假设现在有两个线程 A 和 B,他们同时尝试对队列进行 enqueue 操作。

  1. 线程 A 执行到 t = tail.load()Node *next = t->next.load(),此时 t 是尾节点,nextnullptr
  2. 线程 B 插入一个新节点,更新了尾指针和原尾节点的 next 指针。
  3. 线程 A 继续执行,此时它认为 t 还是尾节点,且 tnext 应该是 nullptr。但实际上,由于线程 B 的插入操作,tnext 已经不再是 nullptr

所以,在这个地方进行 if (next != nullptr) 的判断,就是为了检查这种情况。如果发现 next 不为空,说明尾节点已经被其他线程修改过,此时应该重新读取尾指针,并尝试将尾指针更新为 next。然后继续循环,直到成功将新节点添加到队列的尾部。

2.tail.compare_exchange_strong(t, next); 这行代码什么时候为true?

假设现在有两个线程 A 和 B,线程 A 正在执行 dequeue 操作,而线程 B 正在执行 enqueue 操作。

  1. 线程 A 读取头指针 h 和尾指针 t,此时 ht 都指向同一个节点,表示队列为空。
  2. 线程 B 在队列的尾部添加了一个新的节点,但尾指针 tail 还没有被更新。
  3. 线程 A 继续执行,此时它读取头部节点(也是尾部节点)的 next 指针,发现 next 不为空,即头部节点的后面有一个新的节点。
  4. 所以,线程 A 满足 if (h == t && next != nullptr) 的条件,然后尝试使用 compare_exchange_strong 方法更新尾指针 tail

ABA问题

问题概述

在多线程编程中,CAS(Compare And Swap)是一种重要的无锁算法。CAS通过原子操作比较内存中的某个值和预期值,如果它们相等,就将内存中的值替换为新的值。但是,CAS也有一个著名的问题,就是ABA问题。

ABA问题是指,在CAS操作中,如果一个变量原来是A,经过其他线程的两次操作,变成了B,然后又变成了A,那么在使用CAS检查时,会发现它的值没有发生变化,但是实际上却发生了变化。这就是所谓的ABA问题。

例如,考虑一个多线程的栈实现。假设有两个线程,线程1和线程2。首先,线程1从栈中取出一个元素A,然后线程1被挂起。此时线程2执行,它从栈中取出元素A,然后压入新的元素B,接着再次压入元素A。当线程1恢复运行时,它会发现栈顶元素仍然是A,因此它认为没有其他线程修改过栈。然而,实际上栈的状态已经发生了改变。

解决方法

解决CAS中的ABA问题常用的方法是使用"带版本号的CAS"或者"原子引用"。这种方法在每次修改变量时,都会增加一个版本号。即使一个变量的值被改变后又被改回来,其版本号也会发生变化,因此可以避免ABA问题。

具体可以通过以下几步来解决ABA问题:

  1. 增加一个版本号的记录。每次修改变量,都让版本号自增。这样,在执行CAS操作时,不仅会检查变量的值,还会检查版本号。
  2. 在执行CAS操作时,将版本号和变量的值一起作为CAS的比较对象。
  3. 如果在执行CAS操作时,版本号或变量的值有任何一个与预期不符,那么CAS操作就会失败。

其中,简单的是直接使用boost库中的boost::lockfree::queue。这是一个线程安全的无锁队列,可以在多线程环境下使用。

参考文献