同理, 如果初始队列满, 线程 a 先执行出队操作, 线程 b 后执行入队操作, 则线程 a 出队操作的结果要对线程 b 可见. 出队的时候需要调用出队元素的析构函数, 要保证出队元素正常销毁后才能在那个位置写入新元素, 否则会导致内存损坏. 可以在出队写入 head 的操作中使用 release, 入队读取 head 的操作中使用 acquire 实现出队 “synchronizes-with” 入队.
简单来说:
1.线程 a 先执行出队操作, 线程 b 后执行入队操作, 则线程 a 出队操作的结果要对线程 b 可见.
2.线程 a 先执行入队操作, 线程 b 后执行出队操作, 则线程 a 入队操作的结果要对线程 b 可见.
假设有两个线程 a 和 b 同时调用 pop, 执行顺序是 a(1), b(1), b(2) a(2). 这种情况下, 线程 a 和线程 b 都读到相同的 head 指针, 存储在变量 h 中. 当 a(2) 尝试读取 data[h] 时, 其中的数据已经在 b(2) 中被 move 走了. 因此这样的队列不允许多个线程同时执行 pop 操作.
简单来说,就是2个线程都调用pop,但是只要一个data[h]可给val.
解决抢占问题
可以看到, 整个 pop 函数是一个非原子过程, 一旦这个过程别其他线程抢占, 就会出问题. 如何解决这个问题呢? 在无锁数据结构中, 一种常用的做法是不断重试. 具体的做法是, 在非原子过程的最后一步设计一个 CAS 操作, 如果过程被其他线程抢占, 则 CAS 操作失败, 并重新执行整个过程. 否则 CAS 操作成功, 完成整个过程的最后一步.
1 2 3 4 5 6 7 8 9 10
bool spsc<T, Cap>::pop(T &val) { size_t h; do { h = head.load(); // (1) if (h == tail.load()) returnfalse; val = data[h]; // (2) } while (!head.compare_exchange_strong(h, (h + 1) % Cap)); // (3) returntrue; }
首先注意到我们不再使用 std::move 和 allocator::destroy, 而是直接复制, 使得循环体内的操作不会修改队列本身. (3) 是整个过程的最后一步, 也是唯一会修改队列的一步, 我们使用了一个 CAS 操作. 只有当 head 的值等于第 (1) 步获取的值的时候, 才会移动 head 指针, 并且返回 true 跳出循环; 否则就不断重试.【简单来说,就是只能有一个pop,其他的需要进入循环等待.利用CAS进行暴力美学】
这样如果多个线程并发执行 pop, 则只有成功执行 (3) 的线程被视为成功执行了整个过程, 其它的线程都会因为被抢占, 导致执行 (3) 的时候 head 被修改, 因而与局部变量 h 不相等, 导致 CAS 操作失败. 这样它们就要重试整个过程.
类似的思路也可以用在 push 上. 看看如果我们用同样的方式修改 push 会怎样:
1 2 3 4 5 6 7 8 9 10
bool spsc<T, Cap>::push(const T &val) { size_t t; do { t = tail.load(); // (1) if ((t + 1) % Cap == head.load()) returnfalse; data[t] = val; // (2) } while (!tail.compare_exchange_strong(t, (t + 1) % Cap)); // (3) returntrue; }
与 pop 操作不同, push 操作的第 (2) 步需要对 data[t] 赋值, 导致循环体内的操作会修改队列. 假设 a, b 两个线程的执行顺序是 a(1), a(2), b(1), b(2), a(3). a 可以成功执行到 (3), 但是入队的值却被 b(2) 覆盖掉了.【简单来说,就是2个线程push,但是可能后者线程会覆盖前者线程的值的这样的问题】
我们尝试将赋值操作 data[t] = val 移到循环的外面, 这样循环体内的操作就不会修改队列了. 当循环退出时, 能确保 tail 向后移动了一格, 且 t 指向 tail 移动前的位置. 这样并发的时候就不会有其他线程覆盖我们写入的值.
1 2 3 4 5 6 7 8 9 10
bool spsc<T, Cap>::push(const T &val) { size_t t; do { t = tail.load(); // (1) if ((t + 1) % Cap == head.load()) returnfalse; } while (!tail.compare_exchange_strong(t, (t + 1) % Cap)); // (2) data[t] = val; // (3) returntrue; }
但是这样做的问题是, 我们先移动 tail 指针再对 data[t] 赋值, 会导致 push 与 pop 并发不正确. 回顾下 pop 的代码:
1 2 3 4 5 6 7 8 9 10
bool spsc<T, Cap>::pop(T &val) { size_t h; do { h = head.load(); if (h == tail.load()) // (4) returnfalse; val = data[h]; // (5) } while (!head.compare_exchange_strong(h, (h + 1) % Cap)); returntrue; }
前面分析了, push 中的赋值操作 data[t] = val 要 “happens-before” pop 中的读取操作 val = data[h], 这是通过 write 原子变量实现的: push 中对 write 的修改要 “synchronizes-with” pop 中对 write 的读取. 因此 push 修改 write 的 CAS 操作应该使用 release, pop 读取 write 时则应使用 acquire.
同理, 当队列初始为满的时候, 先运行 pop 在运行 push, 要保证 pop 中的读取操作 val = data[h] “happens-before” push 中的赋值操作 data[t] = val. 这是通过 head 原子变量实现的: pop 中对 head 的修改要 “synchronizes-with” push 中对 head 的读取. 因此 pop 修改 head 的 CAS 操作应该使用 release, push 读取 head 时则应使用 acquire.
boolpush(const T &val) { size_t t, w; do { t = tail.load(std::memory_order_relaxed); if ((t + 1) % Cap == head.load(std::memory_order_acquire)) returnfalse; } while (!tail.compare_exchange_strong(t, (t + 1) % Cap, memory_order_relaxed)); data[t % Cap] = val; cout << "Producer:" << this_thread::get_id() << " data:" << val << endl; do { w = t; } while (!write.compare_exchange_strong(w, (w + 1) % Cap, memory_order_release, memory_order_relaxed)); returntrue; }
boolpop() { T val; size_t h; do { h = head.load(std::memory_order_relaxed); if (h == write.load(std::memory_order_acquire)) returnfalse; val = data[h % Cap]; } while (!head.compare_exchange_strong(h, (h + 1) % Cap, memory_order_release, memory_order_relaxed)); cout << "Comsumer:" << this_thread::get_id() << " data:" << val << endl; returntrue; }
private: T *data; std::atomic<size_t> head{0}, tail{0}, write{0}; };
Queue<int, 5> q;
voidConsumer() { int val; while (true) { if (!q.pop()) std::this_thread::sleep_for(std::chrono::milliseconds(10)); else { std::this_thread::sleep_for(std::chrono::milliseconds(1)); } } }
voidProducer() { while (true) { int num = rand() % 100; if (!q.push(num)) std::this_thread::sleep_for(std::chrono::milliseconds(10)); else { std::this_thread::sleep_for(std::chrono::milliseconds(2)); } } }
intmain() { vector<thread> p, c; for (int i = 0; i < 2; i++) { p.emplace_back(Producer); } for (int i = 0; i < 2; i++) { c.emplace_back(Consumer); } for (auto &ci : c) ci.join(); for (auto &pi : p) pi.join(); return0; }