前言
在上一篇博客中,介绍了通过指定原子变量的操作的内存顺序, 实现线程同步问题。本文看看原子变量和内存顺序的应用 – 无锁队列(Lock-Free Queue)。本文介绍无锁队列的实现,学习无锁队列设计的基本思路。
感兴趣的小伙伴可以看看这篇论文:《Lock-Free Data Structures》
强推:A common used C++ DAG framework
数组版本
单写单读队列
单写单读的队列比较简单, 这里我们使用循环队列实现. 如下图所示, 队列维护两个指针 head
和 tail
, 分别指向队首和队尾. tail
始终指向 dummy 节点, 这样 tail == head
表示队列为空, (tail + 1) % Cap == head
表示队列已满, 不用维护 size
成员.
入队的时候移动 tail
指针, 而出队的时候移动 head
指针, 两个操作并无冲突. 不过, 出队前需要读取 tail
指针, 判断 tail != head
确认队列不为空; 同理入队时也要判断 (tail + 1) % Cap != head
以确认队列不满. 由于存在多个线程读写这两个指针, 因此它们都应该是原子变量.
此外, 由于两个操作在不同线程中执行, 我们还需考虑内存顺序. 如果初始队列为空, 线程 a 先执行入队操作, 线程 b 后执行出队操作, 则线程 a 入队操作的内容要对线程 b 可见.
为了做到这一点, 需要有 a(2) “happens-before” b(3). 而 a(3) 和 b(2) 分别修改了读取了 tail
, 所以应该利用原子变量同步, 使得 a(3) “synchronizes-with” b(2). 可以在 a(3) 写入 tail
的操作中使用 release, b(2) 读取 tail
的操作中使用 acquire 实现同步.
同理, 如果初始队列满, 线程 a 先执行出队操作, 线程 b 后执行入队操作, 则线程 a 出队操作的结果要对线程 b 可见. 出队的时候需要调用出队元素的析构函数, 要保证出队元素正常销毁后才能在那个位置写入新元素, 否则会导致内存损坏. 可以在出队写入 head
的操作中使用 release, 入队读取 head
的操作中使用 acquire 实现出队 “synchronizes-with” 入队.
简单来说:
1.线程 a 先执行出队操作, 线程 b 后执行入队操作, 则线程 a 出队操作的结果要对线程 b 可见.
2.线程 a 先执行入队操作, 线程 b 后执行出队操作, 则线程 a 入队操作的结果要对线程 b 可见.
使用memory_order_acquire和memory_order_release实现线程同步。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 template <typename T, size_t Cap>class spsc : private allocator<T> { T *data; atomic<size_t > head{0 }, tail{0 }; public : spsc (): data (allocator<T>::allocate (Cap)) {} spsc (const spsc&) = delete ; spsc &operator =(const spsc&) = delete ; spsc &operator =(const spsc&) volatile = delete ; bool push (const T &val) { return emplace (val); } bool push (T &&val) { return emplace (std::move (val)); } template <typename ...Args> bool emplace (Args && ...args) { size_t t = tail.load (memory_order_relaxed); if ((t + 1 ) % Cap == head.load (memory_order_acquire)) return false ; allocator<T>::construct (data + t, std::forward<Args>(args)...); tail.store ((t + 1 ) % Cap, memory_order_release); return true ; } bool pop (T &val) { size_t h = head.load (memory_order_relaxed); if (h == tail.load (memory_order_acquire)) return false ; val = std::move (data[h]); allocator<T>::destroy (data + h); head.store ((h + 1 ) % Cap, memory_order_release); return true ; } };
这种单写单读的无锁队列的两种操作可以同时执行, 且两种操作都只需要执行确定数量的指令, 因此数据 wait-free 结构, 性能很高.
给出完整代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 #include <algorithm> #include <vector> #include <string> #include <unordered_map> #include <queue> #include <functional> #include <stack> #include <iostream> #include <unistd.h> #include <thread> #include <list> #include <atomic> using namespace std;template <typename T, size_t Cap>class Queue { public : Queue () : data (new T[Cap]) {} ~Queue () { delete [] data; } bool push (const T &val) { size_t t = tail.load (std::memory_order_relaxed); if ((t + 1 ) % Cap == head.load (std::memory_order_acquire)) return false ; cout << "Producer: " << val << endl; data[t % Cap] = val; tail.store ((t + 1 ) % Cap, std::memory_order_release); return true ; } bool pop (T &val) { size_t h = head.load (std::memory_order_relaxed); if (h == tail.load (std::memory_order_acquire)) return false ; val = data[h % Cap]; head.store ((h + 1 ) % Cap, std::memory_order_release); return true ; } private : T *data; std::atomic<size_t > head{0 }, tail{0 }; }; Queue<int , 5 > q; void Consumer () { int val; while (true ) { if (q.pop (val)) std::cout << "Consumer: " << val << std::endl; else std::this_thread::sleep_for (std::chrono::milliseconds (10 )); } } void Producer () { while (true ) { int num = rand () % 100 ; if (!q.push (num)) std::this_thread::sleep_for (std::chrono::milliseconds (10 )); } } int main () { std::thread consumer (Consumer) ; std::thread producer (Producer) ; consumer.join (); producer.join (); return 0 ; }
CAS 操作
CAS (compare and swap) 是一种原子操作, 在一个不可被中断的过程中执行比较和交换. C++ 的 std::atomic
中有两种 CAS 操作, compare_exchange_weak
和 compare_exchange_strong
。
1 2 bool std::atomic<T>::compare_exchange_weak (T &expected, T desired);bool std::atomic<T>::compare_exchange_strong (T &expected, T desired);
这两种 CAS 操作基本上是相同的:
下面是 compare_exchange_strong
的一个伪实现
1 2 3 4 5 6 7 8 template <typename T>bool atomic<T>::compare_exchange_strong (T &expected, T desired) { std::lock_guard<std::mutex> guard (m_lock) ; if (m_val == expected) return m_val = desired, true ; else return expected = m_val, false ; }
当然实际的实现不可能是这样的.
compare_exchange_weak
和 compare_exchange_strong
的区别在于, compare_exchange_weak
有可能在当前值与 expected
相等时仍然不执行交换并返回 false
; compare_exchange_strong
则不会有这个问题. weak 版本能让编译器在一些平台下生成一些更优的代码, 在 x86 下是没区别的.
compare_exchange_*
支持指定两个内存顺序: 成功时的内存顺序和失败时的内存顺序.
1 2 3 bool compare_exchange_weak (T& expected, T desired, std::memory_order success, std::memory_order failure) ;
我们可以利用 CAS 操作实现很多无锁数据结构. 下面我们来看如何实现多写多读的队列.
多写多读队列
前面单读单写能否用到执行多写多读呢?答案是不可以的。
1 2 3 4 5 6 7 8 9 bool spsc<T, Cap>::pop (T &val) { size_t h = head.load (); if (h == tail.load ()) return false ; val = std::move (data[h]); allocator<T>::destroy (data + h); head.store ((h + 1 ) % Cap); return true ; }
假设有两个线程 a 和 b 同时调用 pop
, 执行顺序是 a(1), b(1), b(2) a(2). 这种情况下, 线程 a 和线程 b 都读到相同的 head
指针, 存储在变量 h
中. 当 a(2) 尝试读取 data[h]
时, 其中的数据已经在 b(2) 中被 move 走了. 因此这样的队列不允许多个线程同时执行 pop 操作.
简单来说,就是2个线程都调用pop,但是只要一个data[h]可给val.
解决抢占问题
可以看到, 整个 pop
函数是一个非原子过程, 一旦这个过程别其他线程抢占, 就会出问题. 如何解决这个问题呢? 在无锁数据结构中, 一种常用的做法是不断重试 . 具体的做法是, 在非原子过程的最后一步设计一个 CAS 操作, 如果过程被其他线程抢占, 则 CAS 操作失败, 并重新执行整个过程. 否则 CAS 操作成功, 完成整个过程的最后一步.
1 2 3 4 5 6 7 8 9 10 bool spsc<T, Cap>::pop (T &val) { size_t h; do { h = head.load (); if (h == tail.load ()) return false ; val = data[h]; } while (!head.compare_exchange_strong (h, (h + 1 ) % Cap)); return true ; }
首先注意到我们不再使用 std::move
和 allocator::destroy
, 而是直接复制, 使得循环体内的操作不会修改队列本身. (3) 是整个过程的最后一步, 也是唯一会修改队列的一步, 我们使用了一个 CAS 操作. 只有当 head
的值等于第 (1) 步获取的值的时候, 才会移动 head
指针, 并且返回 true
跳出循环; 否则就不断重试.【简单来说,就是只能有一个pop,其他的需要进入循环等待.利用CAS进行暴力美学】
这样如果多个线程并发执行 pop
, 则只有成功执行 (3) 的线程被视为成功执行了整个过程, 其它的线程都会因为被抢占, 导致执行 (3) 的时候 head
被修改, 因而与局部变量 h
不相等, 导致 CAS 操作失败. 这样它们就要重试整个过程.
类似的思路也可以用在 push
上. 看看如果我们用同样的方式修改 push
会怎样:
1 2 3 4 5 6 7 8 9 10 bool spsc<T, Cap>::push (const T &val) { size_t t; do { t = tail.load (); if ((t + 1 ) % Cap == head.load ()) return false ; data[t] = val; } while (!tail.compare_exchange_strong (t, (t + 1 ) % Cap)); return true ; }
与 pop
操作不同, push
操作的第 (2) 步需要对 data[t]
赋值, 导致循环体内的操作会修改队列. 假设 a, b 两个线程的执行顺序是 a(1), a(2), b(1), b(2), a(3). a 可以成功执行到 (3), 但是入队的值却被 b(2) 覆盖掉了.【简单来说,就是2个线程push,但是可能后者线程会覆盖前者线程的值的这样的问题】
我们尝试将赋值操作 data[t] = val
移到循环的外面, 这样循环体内的操作就不会修改队列了. 当循环退出时, 能确保 tail
向后移动了一格, 且 t
指向 tail
移动前的位置. 这样并发的时候就不会有其他线程覆盖我们写入的值.
1 2 3 4 5 6 7 8 9 10 bool spsc<T, Cap>::push (const T &val) { size_t t; do { t = tail.load (); if ((t + 1 ) % Cap == head.load ()) return false ; } while (!tail.compare_exchange_strong (t, (t + 1 ) % Cap)); data[t] = val; return true ; }
但是这样做的问题是, 我们先移动 tail
指针再对 data[t]
赋值, 会导致 push
与 pop
并发不正确. 回顾下 pop
的代码:
1 2 3 4 5 6 7 8 9 10 bool spsc<T, Cap>::pop (T &val) { size_t h; do { h = head.load (); if (h == tail.load ()) return false ; val = data[h]; } while (!head.compare_exchange_strong (h, (h + 1 ) % Cap)); return true ; }
同样假设有两个线程 a 和 b. 假设队列初始为空
线程 a 调用 push
, 执行 a(1), a(2). tail
被更新, 然后切换到线程 b
线程 b 调用 pop
, 执行 b(4). 因为 tail
被更新, 因此判断队列不为空
执行到 b(5), 会读取到无效的值
【简单来说,就是当满足pop条件,但是没有值的这样问题】
为了实现 push
与 pop
的并发, push
对 data[t]
的写入必须 “happens-before” pop
对 data[h]
的读取. 因此这就要求 push
操作先对 data[t]
赋值, 再移动 tail
指针. 可是前面为了实现 push
与 push
的并发我们又让 push
操作先移动 tail
再对 data[t]
赋值. 如何解决这一矛盾呢?
解决办法是引入一个新的指针 write
, 用于 push
与 pop
同步. 它表示 push
操作写到了哪个位置 .
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 template <typename T, size_t Cap>class ring_buffer { T data[Cap]; atomic<size_t > head{0 }, tail{0 }, write{0 }; public : ring_buffer () = default ; ring_buffer (const ring_buffer&) = delete ; ring_buffer &operator =(const ring_buffer&) = delete ; ring_buffer &operator =(const ring_buffer&) volatile = delete ; bool push (const T &val) { size_t t, w; do { t = tail.load (); if ((t + 1 ) % Cap == head.load ()) return false ; } while (!tail.compare_exchange_weak (t, (t + 1 ) % Cap)); data[t] = val; do { w = t; } while (!write.compare_exchange_weak (w, (w + 1 ) % Cap)); return true ; } bool pop (T &val) { size_t h; do { h = head.load (); if (h == write.load ()) return false ; val = data[h]; } while (!head.compare_exchange_strong (h, (h + 1 ) % Cap)); return true ; } };
还是暴力,多加个变量解决上述问题。
push
操作的基本步骤是:
移动 tail
;
对 data[t]
赋值, t
等于 tail
移动前的位置;
移动 write
. write
移动后等于 tail
.
而 pop
操作使用 write
指针判断队列中是否有元素. 因为有 (3) “synchronizes-with” (4), 所以 (2) “happens-before” (5), pop
能读到 push
写入的值. 在 push
函数中, 只有在当前的 write
等于 t
时才将 write
移动一格, 能确保最终 write
等于 tail
.
这种多写多读的无锁队列的两种操作可以同时执行, 但是每种操作都有可能要重试, 因此属于 lock-free 结构.
考虑内存顺序
前面例子使用默认的内存顺序, 也就是 memory_order_seq_cst . 为了优化性能, 可以使用更宽松的内存顺序. 而要考虑内存顺序, 就要找出其中的 happens-before 的关系.
前面分析了, push
中的赋值操作 data[t] = val
要 “happens-before” pop
中的读取操作 val= data[h]
, 这是通过 write
原子变量实现的: push
中对 write
的修改要 “synchronizes-with” pop
中对 write
的读取. 因此 push
修改 write
的 CAS 操作应该使用 release, pop
读取 write
时则应使用 acquire.
同理, 当队列初始为满的时候, 先运行 pop
在运行 push
, 要保证 pop
中的读取操作 val = data[h]
“happens-before” push
中的赋值操作 data[t] = val
. 这是通过 head
原子变量实现的: pop
中对 head
的修改要 “synchronizes-with” push
中对 head
的读取. 因此 pop
修改 head
的 CAS 操作应该使用 release, push
读取 head
时则应使用 acquire.
这部分和上面一样,就是
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 bool ring_buffer<T, Cap>::push (const T &val) { size_t t, w; do { t = tail.load (memory_order_relaxed); if ((t + 1 ) % Cap == head.load (memory_order_acquire)) return false ; } while (!tail.compare_exchange_weak (t, (t + 1 ) % Cap, memory_order_relaxed)); data[t] = val; do { w = t; } while (!write.compare_exchange_weak (w, (w + 1 ) % Cap, memory_order_release, memory_order_relaxed)); return true ; } bool ring_buffer<T, Cap>::pop (T &val) { size_t h; do { h = head.load (memory_order_relaxed); if (h == write.load (memory_order_acquire)) return false ; val = data[h]; } while (!head.compare_exchange_strong (h, (h + 1 ) % Cap, memory_order_release, memory_order_relaxed)); return true ; }
push
与 push
并发移动 tail
指针的时候, 只影响到 tail
本身. 因此 (1) 和 (3) 对 tail
读写使用 relaxed 就可以了. 同样 push
与 push
并发移动 write
指针时, 也不需要利用它做同步, 因此 (5) 处的做法是
1 2 write.compare_exchange_weak (w, (w + 1 ) % Cap, memory_order_release, memory_order_relaxed)
成功时使用 release, 为了与 pop
同步; 而失败时使用 relaxed 就可以了.
同理, pop
与 pop
并发移动 head
时, 也影响到 head
本身. 因此 (6) 读取 head
使用 relaxed 即可. 而 (9) 处为了与 push
同步, 成功时要使用 release, 失败时使用 relaxed 即可.
最后,给出完整代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 #include <algorithm> #include <vector> #include <string> #include <unordered_map> #include <queue> #include <functional> #include <stack> #include <iostream> #include <unistd.h> #include <thread> #include <list> #include <atomic> using namespace std;template <typename T, size_t Cap>class Queue { public : Queue () : data (new T[Cap]) {} ~Queue () { delete [] data; } bool push (const T &val) { size_t t, w; do { t = tail.load (std::memory_order_relaxed); if ((t + 1 ) % Cap == head.load (std::memory_order_acquire)) return false ; } while (!tail.compare_exchange_strong (t, (t + 1 ) % Cap, memory_order_relaxed)); data[t % Cap] = val; cout << "Producer:" << this_thread::get_id () << " data:" << val << endl; do { w = t; } while (!write.compare_exchange_strong (w, (w + 1 ) % Cap, memory_order_release, memory_order_relaxed)); return true ; } bool pop () { T val; size_t h; do { h = head.load (std::memory_order_relaxed); if (h == write.load (std::memory_order_acquire)) return false ; val = data[h % Cap]; } while (!head.compare_exchange_strong (h, (h + 1 ) % Cap, memory_order_release, memory_order_relaxed)); cout << "Comsumer:" << this_thread::get_id () << " data:" << val << endl; return true ; } private : T *data; std::atomic<size_t > head{0 }, tail{0 }, write{0 }; }; Queue<int , 5 > q; void Consumer () { int val; while (true ) { if (!q.pop ()) std::this_thread::sleep_for (std::chrono::milliseconds (10 )); else { std::this_thread::sleep_for (std::chrono::milliseconds (1 )); } } } void Producer () { while (true ) { int num = rand () % 100 ; if (!q.push (num)) std::this_thread::sleep_for (std::chrono::milliseconds (10 )); else { std::this_thread::sleep_for (std::chrono::milliseconds (2 )); } } } int main () { vector<thread> p, c; for (int i = 0 ; i < 2 ; i++) { p.emplace_back (Producer); } for (int i = 0 ; i < 2 ; i++) { c.emplace_back (Consumer); } for (auto &ci : c) ci.join (); for (auto &pi : p) pi.join (); return 0 ; }
链表版本
废话少说,直接上代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 #include <iostream> #include <atomic> #include <thread> #include <memory> #include <chrono> using namespace std;template <typename T>class FreeLockQueue { private : struct Node { T data; atomic<Node *> next; Node (const T &x) : data (x), next (nullptr ) {} }; atomic<Node *> head; atomic<Node *> tail; public : FreeLockQueue () { Node *node = new Node (-1 ); head.store (node); tail.store (node); } ~FreeLockQueue () { while (Node *h = head.load ()) { Node *tmp = h->next.load (); delete h; head.store (tmp); } } void enqueue (const T &x) { Node *cur = new Node (x); Node *t = nullptr ; while (true ) { t = tail.load (); Node *next = t->next.load (); if (t != tail.load ()) { continue ; } if (next != nullptr ) { tail.compare_exchange_strong (t, next); continue ; } if (t->next.compare_exchange_strong (next, cur)) break ; } tail.compare_exchange_strong (t, cur); } T dequeue () { Node *h = nullptr ; T d; while (true ) { h = head.load (); Node *t = tail.load (); Node *next = h->next.load (); if (h != head.load ()) continue ; if (h == t && next == nullptr ) { cout << "Can not dequeue from empty queue" << endl; this_thread::sleep_for (chrono::milliseconds (1000 )); continue ; } if (h == t && next != nullptr ) { tail.compare_exchange_strong (t, next); continue ; } if (head.compare_exchange_strong (h, next)) { d = next->data; break ; } } delete h; return d; } }; FreeLockQueue<int > mFQ; void producer1 () { for (int i = 0 ; i < 10 ; i++) { mFQ.enqueue (i); cout << "Enqueue data: " << i << " successfully!" << endl; } } void producer2 () { for (int i = 10 ; i < 20 ; i++) { mFQ.enqueue (i); cout << "Enqueue data: " << i << " successfully!" << endl; } } void consumer () { while (true ) { int d = mFQ.dequeue (); if (d != -1 ) cout << "Get data: " << d << " successfully!" << endl; } } int main () { thread p1 (producer1) ; thread p2 (producer2) ; thread ca (consumer) ; thread cb (consumer) ; p1.join (); p2.join (); ca.join (); cb.join (); system ("pause" ); return 0 ; }
比较难以理解的2个点:
1.tail.compare_exchange_strong(t, next);这行代码什么时候为true?
假设现在有两个线程 A 和 B,他们同时尝试对队列进行 enqueue
操作。
线程 A 执行到 t = tail.load()
和 Node *next = t->next.load()
,此时 t
是尾节点,next
是 nullptr
。
线程 B 插入一个新节点,更新了尾指针和原尾节点的 next
指针。
线程 A 继续执行,此时它认为 t
还是尾节点,且 t
的 next
应该是 nullptr
。但实际上,由于线程 B 的插入操作,t
的 next
已经不再是 nullptr
。
所以,在这个地方进行 if (next != nullptr)
的判断,就是为了检查这种情况。如果发现 next
不为空,说明尾节点已经被其他线程修改过,此时应该重新读取尾指针,并尝试将尾指针更新为 next
。然后继续循环,直到成功将新节点添加到队列的尾部。
2.tail.compare_exchange_strong(t, next); 这行代码什么时候为true?
假设现在有两个线程 A 和 B,线程 A 正在执行 dequeue
操作,而线程 B 正在执行 enqueue
操作。
线程 A 读取头指针 h
和尾指针 t
,此时 h
和 t
都指向同一个节点,表示队列为空。
线程 B 在队列的尾部添加了一个新的节点,但尾指针 tail
还没有被更新。
线程 A 继续执行,此时它读取头部节点(也是尾部节点)的 next
指针,发现 next
不为空,即头部节点的后面有一个新的节点。
所以,线程 A 满足 if (h == t && next != nullptr)
的条件,然后尝试使用 compare_exchange_strong
方法更新尾指针 tail
。
无锁栈
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 #include <atomic> #include <iostream> template <typename T>class lock_free_stack { private : struct node { T data; node* next; node (const T& data) : data (data), next (nullptr ) {} }; std::atomic<node*> head; public : lock_free_stack (): head (nullptr ) {} void push (const T& data) { node* new_node = new node (data); do { new_node->next = head.load (); }while (!head.compare_exchange_strong (new_node->next, new_node)); } T pop () { node* node; do { node = head.load (); }while (node && !head.compare_exchange_strong (node, node->next)); if (node) return node->data; } }; int main () { lock_free_stack<int > s; s.push (1 ); s.push (2 ); s.push (3 ); std::cout << s.pop () << std::endl; std::cout << s.pop () << std::endl; getchar (); return 0 ; }
ABA问题
问题概述
在多线程编程中,CAS(Compare And Swap)是一种重要的无锁算法。CAS通过原子操作比较内存中的某个值和预期值,如果它们相等,就将内存中的值替换为新的值。但是,CAS也有一个著名的问题,就是ABA问题。
ABA问题是指,在CAS操作中,如果一个变量原来是A,经过其他线程的两次操作,变成了B,然后又变成了A,那么在使用CAS检查时,会发现它的值没有发生变化,但是实际上却发生了变化。这就是所谓的ABA问题。
例如,考虑一个多线程的栈实现。假设有两个线程,线程1和线程2。首先,线程1从栈中取出一个元素A,然后线程1被挂起。此时线程2执行,它从栈中取出元素A,然后压入新的元素B,接着再次压入元素A。当线程1恢复运行时,它会发现栈顶元素仍然是A,因此它认为没有其他线程修改过栈。然而,实际上栈的状态已经发生了改变。
解决方法1
解决CAS中的ABA问题常用的方法是使用"带版本号的CAS"或者"原子引用"。这种方法在每次修改变量时,都会增加一个版本号。即使一个变量的值被改变后又被改回来,其版本号也会发生变化,因此可以避免ABA问题。
具体可以通过以下几步来解决ABA问题:
增加一个版本号的记录。每次修改变量,都让版本号自增。这样,在执行CAS操作时,不仅会检查变量的值,还会检查版本号。
在执行CAS操作时,将版本号和变量的值一起作为CAS的比较对象。
如果在执行CAS操作时,版本号或变量的值有任何一个与预期不符,那么CAS操作就会失败。
其中,简单的是直接使用boost库中的boost::lockfree::queue
。这是一个线程安全的无锁队列,可以在多线程环境下使用。
解决方法2
为了解决ABA问题,可以采取两种方法:环形缓冲区 (Ring Buffer)和Double CAS 。
环形缓冲区
环形缓冲区通过预先分配固定大小的内存区域,并使用下标来代替指针,这样可以有效避免内存的重复分配和释放,减少了ABA问题发生的可能性。下面是一个简化的示例:
示例:
假设我们有一个环形缓冲区,大小为N
,有两个变量head
和tail
分别表示缓冲区的头和尾。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 const int N = 1024 ; int buffer[N]; std ::atomic<int > head (0 ) ; std ::atomic<int > tail (0 ) ; bool enqueue (int value) { int currentTail = tail.load(); int nextTail = (currentTail + 1 ) % N; if (nextTail == head.load()) { return false ; } buffer[currentTail] = value; tail.store(nextTail); return true ; } bool dequeue (int &value) { int currentHead = head.load(); if (currentHead == tail.load()) { return false ; } value = buffer[currentHead]; head.store((currentHead + 1 ) % N); return true ; }
解释:
在这个实现中,head
和tail
都使用了数组下标,并且环形缓冲区通过数组实现。这样做的一个好处是,head
和tail
永远在0
到N-1
之间循环,这样可以避免指针的频繁分配和释放,从而减少ABA问题的发生。
由于下标是一个连续增长的整数,而不是指针,所以在这种设计中,即使下标重复了,ABA问题也不会影响CAS操作的正确性。
举例:
假设我们有一个共享的 int 型变量 x,初始值为 0。有两个线程 Thread1 和 Thread2 在并发地操作 x。我们使用 std::atomic_int 来确保操作的原子性。
不使用环形缓冲区的情况:
Thread1 读取 x 的值为 0。
Thread2 将 x 的值修改为 1。
Thread2 将 x 的值再次修改为 0。
Thread1 将 x 的值再次修改为 1。
在这种情况下,Thread1 观察到 x 的值从 0 变为 1,然后又变回 0。这就是 ABA 问题。
使用环形缓冲区的情况:
创建一个大小为 3 的环形缓冲区,初始情况下缓冲区中的值为 [0, 0, 0]。
Thread1 读取缓冲区中下标为 0 的元素,得到值 0。
Thread2 将缓冲区中下标为 0 的元素修改为 1。
Thread2 将缓冲区中下标为 1 的元素修改为 0。
Thread1 再次读取缓冲区中下标为 0 的元素,发现值已经变为 1。这时 Thread1 就知道缓冲区中的值发生了变化,从而可以避免 ABA 问题。
在这个例子中,即使 x 的值经历了 0 -> 1 -> 0 的变化,但是由于使用了环形缓冲区,Thread1 仍然能够检测到值的变化,从而避免了 ABA 问题的发生。
Double CAS
Double CAS 是一种通过增加版本号来解决ABA问题的方法。它通过在共享变量上附加一个版本号,使得每次CAS操作不仅要比较变量的值,还要比较版本号。即使值恢复为原值,由于版本号的变化,CAS操作也能检测到差异,从而避免ABA问题。
假设我们有一个指针ptr
,指向某个内存位置,我们可以通过一个结构体包含指针和版本号来实现Double CAS。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 struct PointerWithVersion { void * ptr; int version; }; std ::atomic<PointerWithVersion> sharedPtr;bool compareAndSwap (void * expectedPtr, void * newPtr) { PointerWithVersion oldVal = sharedPtr.load(); if (oldVal.ptr == expectedPtr) { PointerWithVersion newVal = {newPtr, oldVal.version + 1 }; return sharedPtr.compare_exchange_strong(oldVal, newVal); } return false ; }
扩展
补充1:RCU (Read-Copy-Update)
RCU (Read-Copy-Update) 是一种并发编程机制,它允许读取操作与写入操作并发执行,从而提高整体的并发性能。下面我们使用 C++ 来详细说明 RCU 的机制:
基本思想 :
RCU 的核心思想是,当需要更新共享数据时,不会直接修改原始数据,而是创建一个新的数据副本,在新副本上进行修改,然后在适当的时候将新副本替换原始数据。在新副本准备就绪之前,读取操作仍然可以安全地访问原始数据。
RCU 的实现 :
在 C++ 中,我们可以使用 std::atomic<>
和 std::shared_ptr<>
来实现一个基本的 RCU 机制。下面是一个示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 #include <atomic> #include <memory> template <typename T>class RCUObject {public : std::shared_ptr<T> read () { return m_data.load (std::memory_order_acquire); } void update (std::function<void (T&)> update_func) { auto new_data = std::make_shared <T>(*read ()); update_func (*new_data); m_data.store (new_data, std::memory_order_release); } private : std::atomic<std::shared_ptr<T>> m_data; };
上述代码存在释放问题。
在这个实现中:
read()
方法使用 std::atomic<>::load()
原子操作来读取当前的数据副本,确保读取操作的安全性。
update()
方法首先创建一个新的数据副本,然后在新副本上执行更新操作,最后使用 std::atomic<>::store()
原子操作来替换原始数据。
通过使用 std::shared_ptr<>
,我们可以确保新旧数据副本的生命周期得到正确管理。
参考文献