前言

互斥锁在并发编程中经常碰到,因此对其需要有个更加深入的理解。这篇博客将使用信号量实现的互斥锁。

简易互斥锁(SimpleMutex)是一个基于原子变量和信号量的互斥锁实现,用于保护并管理多线程环境下的共享资源访问。它提供了一种简单而有效的方式来确保在多线程并发访问时,只有一个线程可以同时访问受保护的资源,从而避免数据竞争和不一致性。基于 POSIX 标准的信号量库实现,包含 Catch2 单元测试,附带了基于 Catch2 框架的单元测试,用于验证互斥锁的正确性和稳定性,使用bazel编译,google编码规范。

其中涉及C++知识(RAII、信号量、lock_guard、线程安全编程)

内核看信号量

信号量是一个整数值,用于控制对共享资源的访问。信号量有两种主要类型:

  • 计数信号量(Counting Semaphore):用于控制多个资源的访问,信号量值表示可用资源的数量。
  • 二值信号量(Binary Semaphore):类似于互斥锁,信号量值只有0和1,主要用于实现互斥。

信号量的主要操作包括:

  • P(或Wait)操作:请求资源。如果信号量的值大于0,则将其减1,允许线程/进程继续执行;如果信号量的值为0,则阻塞当前线程/进程,直到资源可用。
  • V(或Signal)操作:释放资源。如果有等待的线程/进程,则唤醒一个等待线程/进程,并将信号量的值加1。

信号量在操作系统中的实现通常包括以下几个步骤:

1 内核数据结构

操作系统内核维护一个数据结构来表示信号量。这个数据结构通常包括:

  • 信号量值:表示当前可用的资源数量(对于计数信号量)或信号量的状态(0或1,对于二值信号量)。
  • 等待队列:当信号量的值为0时,线程或进程会被阻塞并放入一个等待队列。这个队列用于管理所有等待信号量的线程/进程。

在Linux内核中,信号量的内部表示如下:

1
2
3
4
5
struct semaphore {
atomic_t count; // 信号量的值
spinlock_t lock; // 保护信号量的操作
wait_queue_head_t wait; // 等待队列
};

2 P(或Wait)操作

P操作(或wait操作)用于请求资源并可能导致线程阻塞。实现步骤如下:

  1. 获取锁:使用自旋锁(或其他同步机制)来保护信号量操作的原子性。
  2. 检查信号量值:如果信号量的值大于0,减少其值并允许线程继续执行。
  3. 阻塞线程:如果信号量的值为0,则将线程添加到等待队列,并使其阻塞。
  4. 释放锁:完成对信号量的操作后,释放自旋锁。
1
2
3
4
5
6
7
8
9
10
11
12
void semaphore_wait(struct semaphore *sem) {
spin_lock(&sem->lock);
while (atomic_read(&sem->count) <= 0) {
// 将线程加入等待队列并阻塞
__add_to_wait_queue(&sem->wait, current);
spin_unlock(&sem->lock);
schedule(); // 调度其他线程
spin_lock(&sem->lock);
}
atomic_dec(&sem->count); // 资源可用,信号量值减1
spin_unlock(&sem->lock);
}

3 V(或Signal)操作

V操作(或signal操作)用于释放资源并可能唤醒一个等待线程。实现步骤如下:

  1. 获取锁:使用自旋锁(或其他同步机制)来保护信号量操作的原子性。
  2. 增加信号量值:增加信号量的值。
  3. 唤醒线程:如果有线程在等待队列中,则从等待队列中移除一个线程并唤醒它。
  4. 释放锁:完成对信号量的操作后,释放自旋锁。
1
2
3
4
5
6
7
8
9
10
11
12
void semaphore_signal(struct semaphore *sem) {
spin_lock(&sem->lock);
atomic_inc(&sem->count); // 增加信号量值
if (atomic_read(&sem->count) > 0) {
// 从等待队列中唤醒一个线程
struct task_struct *task = __remove_from_wait_queue(&sem->wait);
if (task) {
wake_up_process(task);
}
}
spin_unlock(&sem->lock);
}

3. 信号量的高级实现

在实际操作系统中,信号量的实现可能会更加复杂,涉及到:

  • 优先级反转处理:确保高优先级线程不会因为低优先级线程持有锁而被阻塞过久。
  • 死锁检测:防止线程或进程因资源竞争而发生死锁。
  • 优化等待队列:提高性能,例如使用高效的等待队列数据结构。

前置知识

信号量API

  • 信号量的类型:sem_t

  • int sem_init(sem_t *sem, int pshared, unsigned int value);

    • 功能:初始化信号量
    • 参数
      • sem:信号量变量的地址
      • pshared:0 用在线程间 ,非0 用在进程间
      • value :信号量中的值,代表容器大小
  • int sem_destroy(sem_t *sem);

  • 功能:释放资源

  • int sem_wait(sem_t *sem);

  • 功能:对信号量加锁,调用一次对信号量的值-1,如果值为0,就阻塞

  • int sem_trywait(sem_t *sem);

  • int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout);

  • int sem_post(sem_t *sem);

    • 功能:对信号量解锁,调用一次对信号量的值+1
  • int sem_getvalue(sem_t *sem, int *sval);

核心逻辑

Semaphore类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Semaphore(int init_count = 0) {
assert(init_count >= 0);
sem_init(&sema_, 0, init_count);
}

void wait() {
int rc;
do {
rc = sem_wait(&sema_);
} while (rc == 1 && errno == EINTR);
}

void signal() {
sem_post(&sema_);
}

比较核心就是以上函数,需要注意的是,这里信号量初始化设置成0是因为只处理需要等待的线程。

当需要等待的线程进入到wait函数中,sem_wait函数将检查sema是否大于0,如果没有,则等待,等待时该线程为阻塞状态。当其他线程调用post增强sem的值的时候,即大于0的时候,线程将解除阻塞, 解除阻塞后sem值会减去1。

sem_post 则是 用来增加信号量的值。

SimpleMutex 类

lock() 和 unlock()

SimpleMutex 类包含一个名为 count_ 的 std::atomic 变量和一个名为 sema_ 的 Semaphore 对象。

在构造函数中,count_ 被初始化为 0。

  • lock() 函数用于获取互斥锁。它使用 fetch_add 操作和 std::memory_order_acquire 参数对 count_ 进行原子增加,并获取锁。 如果在增加之前 count_ 的值大于 0,说明互斥锁已经被其他线程锁定。在这种情况下,函数调用 sema_.wait() 来阻塞当前线程,直到信号量被发信号,表示互斥锁可用。
  • unlock() 函数用于释放互斥锁。它使用 fetch_sub 操作和 std::memory_order_release 参数对 count_ 进行原子减少,并释放锁。 如果减少之前的 count_ 值仍大于 1,说明其他线程正在等待互斥锁。在这种情况下,函数调用 sema_.signal() 发信号给信号量,允许一个等待的线程获取互斥锁。

通过结合原子变量 count_ 和信号量 sema_,该实现确保等待获取互斥锁的线程能够高效地阻塞,直到当前持有者释放锁。

Note:

代码

代码仓库:https://github.com/Penge666/Mutex.git

1
g++ src/mutex/test.cpp -std=c++11 -Ithird/catch2/ -o test -lpthread
1
./test