不要说死亡的烛光何须倾倒

生命依然生长在忧愁的河水上

月光照着月光 月光普照

今夜美丽的月光合在一起流淌~

Lock Manager

为了保证事务们的操作的正确间隔,DBMs使用一个lock manager来控制何时事务被允许访问data items。LM的基本思想是维持一个内部的关于活跃的txns当前持有的locks的数据结构txns在被允许访问一个data item之前向LM发出lock请求,然后LM会选择向请求的txn授予lock或者block txn或者abort txn

在你的实现中,这里会有一个global Lock manager for the entire system(跟bpm一样)TableHeap和Executor classes会在一个事务想要访问/修改一个tuple时使用你的LM来获取locks on tuple records(by RID)

这个任务要求你实现一个tuple级别的LM,其支持三种常见的隔离级别,READ_UNCOMMITED, READ_COMMITTED, and REPEATABLE_READ. LM应该根据一个事务的隔离级别来授予和释放locks。

我们提供给你了一个 Transaction context(include/concurrency/transaction.h) 来处理隔离级别这一属性(i.e., READ_UNCOMMITED, READ_COMMITTED, and REPEATABLE_READ)以及其已经获得的锁的信息。LM需要检查事务的隔离级别并expose correct behavior on lock/unlock requests。任何失败的lock 操作都应该产生一个ABORTED 事务状态(隐式abort)并抛出异常(比如txn处于shrinking阶段时又尝试申请锁,又或者死锁了被aborted)。事务管理器txn manager应该之后捕获这个异常并将该事务已经执行的操作rollback

REQUIREMENTS AND HINTS

这个任务你唯一需要修改的文件就是LockManager class(concurrency/lock_manager.cpp and concurrency/lock_manager.h)。你会需要实现下列函数

  • LockShared(Transaction, RID):txn尝试获取一个shared lock on recoid id rid。这个应该阻塞等待并且在获得锁的授予时返回true。如果txn被回滚(aborts),返回false
  • LockExclusive(Transaction, RID):txn尝试获取一个exclusive lock on record id rid。这个应该阻塞等待并且在获得锁的授予时返回true。如果txn被回滚(aborts),返回false
  • LockUpgrade(Transaction, RID):txn尝试在一个record id rid上将一个shared 升级到exclusive lock。这个应该阻塞等待并且在获得锁的授予时返回true。如果txn被回滚(aborts),返回false。这个应该把txn abort并返回false,如果另一个事务已经在等待升级它们的锁
    之所以要有这个锁,是因为加了读锁以后就加不了写锁了,但是锁中途是不能释放的(2PL)
  • Unlock(Transaction, RID): Unlock txn持有的rid指定的record上的锁

lock manager采取的上锁机制依赖于事务的隔离级别。你应该首先看一下transaction.h和lock_manager.h,熟悉一下提供的api和成员变量。我们同样推荐复习下隔离级别的概念,因为这些函数的实现应该和提出lock/unlock请求的事务的隔离级别相兼容。你可以自由选择往lock_manager.h中增加任何需要的数据结构。你应该参考课本的15.1-15.2以及lecture中覆盖的隔离级别的概念以保证你的实现满足要求

TA的建议

  • 尽管你的lock manager需要使用死锁检测,我们建议先实现没有任何死锁处理的lcok manager,然后在你验证了在没有死锁发生时lock manager上锁和解锁的正确性后再增加死锁检测机制
  • 你会需要一些方式记录哪些txn在等待一个lock。看一下LockRequestQueue class in lock_manager.h.
  • 你需要一些方式来通知等待的txns当它们准备好获取锁时。我们建议使用std::condition_variable
  • 尽管一些隔离级别是通过确保strict 二段锁的性质实现的,你的LM的实现只被要求保证二段锁的性质。strict 2PL的概念是通过你的executors和transaction manager中的逻辑实现的。看一眼Commit和Abort方法(这两个方法会把txn持有的所有锁都释放)
  • 你应该维持事务的状态。比如事务的状态可能从GROWING phase转变为SHRINKING phase,因为unlock操作(hint:看一下transaction.h中的方法)
  • 你同样需要使用 shared_lock_set_ and exclusive_lock_set_ 来记录一个事务获取的shared/exclusive lock,从而当TransactionManager想要commit/abort一个txn时,LM可以适当地释放它们
  • 将一个事务的状态设为ABORTED 即隐式地abort it,但是该事务并没有被显式地abort直到TransactionManager::Abort被调用。你应该通读这个函数来理解它干了什么,以及你的lock manager在abort process中是如何被使用的(会调用lock_manager_->unlock()将txn的所有lock都释放)

实现思路

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
/**
* LockManager handles transactions asking for locks on records.
*/
class LockManager {
enum class LockMode { SHARED, EXCLUSIVE };

class LockRequest {
public:
LockRequest(txn_id_t txn_id, LockMode lock_mode) : txn_id_(txn_id), lock_mode_(lock_mode), granted_(false) {}

txn_id_t txn_id_;
LockMode lock_mode_;
bool granted_;
};

class LockRequestQueue {
public:
std::list<LockRequest> request_queue_;
std::condition_variable cv_; // for notifying blocked transactions on this rid
// 保护这个queue的锁
std::mutex queueLatch_;
};

public:

/*
* [LOCK_NOTE]: For all locking functions, we:
* 1. return false if the transaction is aborted; and
* 2. block on wait, return true when the lock request is granted; and
* 3. it is undefined behavior to try locking an already locked RID in the same transaction, i.e. the transaction
* is responsible for keeping track of its current locks.
*/

/**
* Acquire a lock on RID in shared mode. See [LOCK_NOTE] in header file.
* @param txn the transaction requesting the shared lock
* @param rid the RID to be locked in shared mode
* @return true if the lock is granted, false otherwise
*/
bool LockShared(Transaction *txn, const RID &rid);

/**
* Acquire a lock on RID in exclusive mode. See [LOCK_NOTE] in header file.
* @param txn the transaction requesting the exclusive lock
* @param rid the RID to be locked in exclusive mode
* @return true if the lock is granted, false otherwise
*/
bool LockExclusive(Transaction *txn, const RID &rid);

/**
* Upgrade a lock from a shared lock to an exclusive lock.
* @param txn the transaction requesting the lock upgrade
* @param rid the RID that should already be locked in shared mode by the requesting transaction
* @return true if the upgrade is successful, false otherwise
*/
bool LockUpgrade(Transaction *txn, const RID &rid);

/**
* Release the lock held by the transaction.
* @param txn the transaction releasing the lock, it should actually hold the lock
* @param rid the RID that is locked by the transaction
* @return true if the unlock is successful, false otherwise
*/
bool Unlock(Transaction *txn, const RID &rid);


private:
std::mutex latch_;
/** Lock table for lock requests. */
std::unordered_map<RID, LockRequestQueue> lock_table_;
};

从实现角度来看怎么实现的吧~

这是最为关键的数据结构:

对于每个RID,也就是每个tuple,都有一个对应的锁请求队列LockRequestQueue。LockRequestQueue使用一个list< LockRequest > request_queue_来存储当前对该RID对应的tuple的请求。

一个请求LockRequest主要记录有发出该请求的txn_id, 请求的是哪类的锁,以及该请求是否已经被授予。一个LockRequestQueue中可能有一个或多个请求已经被授予锁

1
2
3
4
5
6
7
8
9
/** Structure that holds lock requests for a given table oid */
std::unordered_map<table_oid_t, std::shared_ptr<LockRequestQueue>> table_lock_map_;
/** Coordination */
std::mutex table_lock_map_latch_;

/** Structure that holds lock requests for a given RID */
std::unordered_map<RID, std::shared_ptr<LockRequestQueue>> row_lock_map_;
/** Coordination */
std::mutex row_lock_map_latch_;

LockRequest和LockRequestQueue的数据结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
class LockRequest {
public:
LockRequest(txn_id_t txn_id, LockMode lock_mode, table_oid_t oid) /** Table lock request */
: txn_id_(txn_id), lock_mode_(lock_mode), oid_(oid) {}
LockRequest(txn_id_t txn_id, LockMode lock_mode, table_oid_t oid, RID rid) /** Row lock request */
: txn_id_(txn_id), lock_mode_(lock_mode), oid_(oid), rid_(rid) {}

/** Txn_id of the txn requesting the lock */
txn_id_t txn_id_;
/** Locking mode of the requested lock */
LockMode lock_mode_;
/** Oid of the table for a table lock; oid of the table the row belong to for a row lock */
table_oid_t oid_;
/** Rid of the row for a row lock; unused for table locks */
RID rid_;
/** Whether the lock has been granted or not */
bool granted_{false};
};

class LockRequestQueue {
public:
/** List of lock requests for the same resource (table or row) */
std::list<std::shared_ptr<LockRequest>> request_queue_;
/** For notifying blocked transactions on this rid */
std::condition_variable cv_;
/** txn_id of an upgrading transaction (if any) */
txn_id_t upgrading_ = INVALID_TXN_ID;
/** coordination */
std::mutex latch_;
};

每个LockRequestQueue中我都增加了一个mutex来保护LockRequestQueue,one mutex per tuple的方案

《数据库系统概念》

锁管理器( lock manager)可以实现为一个过程,它从事务接受消息并反馈消息。锁管理器过程针对锁请求消息返回授予锁消息,或者要求事务回滚的消息(发生死锁时)。解锁消息只需要得到一个确认回答,但可能引发为其他等待事务的授予锁消息。

锁管理器使用以下数据结构:锁管理器为目前已加锁的每个数据项维护一个链表,每一个请求为链表中一条记录,按请求到达的顺序排序。它使用一个以数据项名称为索引的散列表来查找链表中的数据项(如果有的话);这个表叫做锁表( locktable)。一个数据项的链表中每一条记录表示由哪个事务提出的请求,以及它请求什么类型的锁,该记录还表示该请求是否已授予锁。

图15-10是一个锁表的示例,该表包含5个不同数据项4、I7、123、44和I912的锁。锁表采用溢出链,因此对于锁表的每一个表项都有一个数据项的链表。每一个数据项都有一个已授予锁或等待授予锁的事务列表,已授予锁的事务用深色阴影方块表示,等待授予锁的事务则用浅色阴影方块表示。为了简化图形我们省略了锁的类型。举个例子,从图15-10中可以看到,T23在数据项1912和7上已被授予锁,并且正在等待在4上加锁。

虽然图15-10上没有标示出来,但锁表还应当维护一个基于事务标识符的索引,这样它可以有效地确定给定事务持有的锁集。

锁管理器这样处理请求:

image-20240828170430457

  • 当一条锁请求消息到达时,如果相应数据项的链表存在,在该链表末尾增加一个记录;否则,新建一个仅包含该请求记录的链表。
    • 在当前没有加锁的数据项上总是授予第一次加锁请求,但当事务向已被加锁的数据项申请加锁时,只有当该请求与当前持有的锁相容,并且所有先前的请求都已授予锁的条件下,锁管理器才为该请求授予锁,否则,该请求只好等待。
  • 当锁管理器收到一个事务的解锁消息时,它将与该事务相对应的数据项链表中的记录删除,然后检查随后的记录,如果有,如前所述,就看该请求能否被授权,如果能,锁管理器授权该请求并处理其后记录,如果还有,类似地一个接一个地处理。
  • 如果一个事务中止,锁管理器删除该事务产生的正在等待加锁的所有请求。一旦数据库系统采取适当动作撤销该事务(见16.3节),该中止事务持有的所有锁将被释放。

这个算法保证了锁请求无饿死现象,因为在先前接收到的请求正在等待加锁时,后来者不可能获得授权。我们稍后将在15.2.2节学习检测和处理死锁。17.2.1节阐述另一个实现——在锁申请/授权上利用共享内存取代消息传递。

同一个RID的多条锁请求按请求的先后次序存入request_queue_中。对于每个请求,只有该请求与当前持有的锁相容且队列中在它之前的所有请求都获取了锁的条件下才能授予该请求锁。这能保证没有饿死现象出现。

image-20240828184123074

OK!有了上述的理论基础!

我们来看一下这个代码应该如何实现

LockTable

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156

auto LockManager::LockTable(Transaction *txn, LockMode lock_mode, const table_oid_t &oid) -> bool {

// 第一步:判断事务状态[第一步保证了锁请求、事务状态、事务隔离级别的兼容]
if (txn->GetIsolationLevel() == IsolationLevel::READ_UNCOMMITTED) {
if (lock_mode == LockMode::SHARED || lock_mode == LockMode::INTENTION_SHARED ||
lock_mode == LockMode::SHARED_INTENTION_EXCLUSIVE) {
txn->SetState(TransactionState::ABORTED);
throw TransactionAbortException(txn->GetTransactionId(), AbortReason::LOCK_SHARED_ON_READ_UNCOMMITTED);
}
if (txn->GetState() == TransactionState::SHRINKING &&
(lock_mode == LockMode::EXCLUSIVE || lock_mode == LockMode::INTENTION_EXCLUSIVE)) {
txn->SetState(TransactionState::ABORTED);
throw TransactionAbortException(txn->GetTransactionId(), AbortReason::LOCK_ON_SHRINKING);
}
}
if (txn->GetIsolationLevel() == IsolationLevel::READ_COMMITTED) {
if (txn->GetState() == TransactionState::SHRINKING && lock_mode != LockMode::INTENTION_SHARED &&
lock_mode != LockMode::SHARED) {
txn->SetState(TransactionState::ABORTED);
throw TransactionAbortException(txn->GetTransactionId(), AbortReason::LOCK_ON_SHRINKING);
}
}
if (txn->GetIsolationLevel() == IsolationLevel::REPEATABLE_READ) {
if (txn->GetState() == TransactionState::SHRINKING) {
txn->SetState(TransactionState::ABORTED);
throw TransactionAbortException(txn->GetTransactionId(), AbortReason::LOCK_ON_SHRINKING);
}
}

// 第二步,获取 table 对应的 lock request queue。
// 获取共享队列

table_lock_map_latch_.lock();
if (!table_lock_map_[oid]) {
// 创建队列
table_lock_map_[oid] = std::make_shared<LockRequestQueue>();
}
auto lock_request_queue = table_lock_map_.find(oid)->second;
lock_request_queue->latch_.lock();
table_lock_map_latch_.unlock();
// 第三步,检查此锁请求是否为一次锁升级。
// 遍历队列,查看是否有当前事务
bool is_update = false;
// std::cout<< lock_request_queue->request_queue_.size()<<std::endl;
// std::cout << "查看此时指向共享指针的计数"<<lock_request_queue.use_count() << std::endl;
for (auto &iter : lock_request_queue->request_queue_) {
if (iter->txn_id_ == txn->GetTransactionId()) {
// 如果当前事务与请求上锁一致,直接返回
if (iter->lock_mode_ == lock_mode) {
// 解锁
lock_request_queue->latch_.unlock();
return true;
}
// check一下锁升级是否合法
// 1.判断是否有其他事务再升级
// 当前有其他事务在升级,不允许多个事务在同一资源上同时尝试锁升级
if (lock_request_queue->upgrading_ != INVALID_TXN_ID) {
// 队列解锁
lock_request_queue->latch_.unlock();
// 设置事务为中止
txn->SetState(TransactionState::ABORTED);
throw TransactionAbortException(txn->GetTransactionId(), AbortReason::UPGRADE_CONFLICT);
}
// 2.判断锁升级是否符合要求
if (!(iter->lock_mode_ == LockMode::INTENTION_SHARED &&
(lock_mode == LockMode::SHARED || lock_mode == LockMode::EXCLUSIVE ||
lock_mode == LockMode::INTENTION_EXCLUSIVE || lock_mode == LockMode::SHARED_INTENTION_EXCLUSIVE)) &&
!(iter->lock_mode_ == LockMode::SHARED &&
(lock_mode == LockMode::EXCLUSIVE || lock_mode == LockMode::SHARED_INTENTION_EXCLUSIVE)) &&
!(iter->lock_mode_ == LockMode::INTENTION_EXCLUSIVE &&
(lock_mode == LockMode::EXCLUSIVE || lock_mode == LockMode::SHARED_INTENTION_EXCLUSIVE)) &&
!(iter->lock_mode_ == LockMode::SHARED_INTENTION_EXCLUSIVE && (lock_mode == LockMode::EXCLUSIVE))) {
lock_request_queue->latch_.unlock();
txn->SetState(TransactionState::ABORTED);
throw TransactionAbortException(txn->GetTransactionId(), AbortReason::INCOMPATIBLE_UPGRADE);
}
// 通过上述的chenck -> current 符合锁升级条件
// 标记一下当前事务正在升级
lock_request_queue->upgrading_ = txn->GetTransactionId();
is_update = true;
// 1.从事务中对应的锁队列中删除
std::shared_ptr<std::unordered_set<table_oid_t>> cur;
if (iter->lock_mode_ == LockMode::EXCLUSIVE) {
cur = txn->GetExclusiveTableLockSet();
} else if (iter->lock_mode_ == LockMode::INTENTION_EXCLUSIVE) {
cur = txn->GetIntentionExclusiveTableLockSet();
} else if (iter->lock_mode_ == LockMode::INTENTION_SHARED) {
cur = txn->GetIntentionSharedTableLockSet();
} else if (iter->lock_mode_ == LockMode::SHARED) {
cur = txn->GetSharedTableLockSet();
} else if (iter->lock_mode_ == LockMode::SHARED_INTENTION_EXCLUSIVE) {
cur = txn->GetSharedIntentionExclusiveTableLockSet();
}
cur->erase(oid);
// 2.释放当前已经持有的锁,[在request_queue_队列中移除]
lock_request_queue->request_queue_.remove(iter);
break;
}
}

// 第四步,将锁请求加入请求队列 , Note:new一个是否要释放的pro =>改成智能指针的写法
auto lock_request = std::make_shared<LockRequest>(txn->GetTransactionId(), lock_mode, oid);
if (is_update) {
std::list<std::shared_ptr<LockRequest>>::iterator lr_iter;
for (lr_iter = lock_request_queue->request_queue_.begin(); lr_iter != lock_request_queue->request_queue_.end();
lr_iter++) {
if (!(*lr_iter)->granted_) {
break;
}
}
lock_request_queue->request_queue_.insert(lr_iter, lock_request);
} else {
lock_request_queue->request_queue_.push_back(lock_request);
}

// 第五步,尝试获取锁。
// std::cout << "transaction_ID" << txn->GetTransactionId() << std::endl;
std::unique_lock<std::mutex> lock(lock_request_queue->latch_, std::adopt_lock);
while (!GrantLockForTable(lock_request_queue, lock_request)) {
lock_request_queue->cv_.wait(lock);
if (txn->GetState() == TransactionState::ABORTED) {
// 如果当前事务是需要更新的话,就应该将其设置为初始化的值
if (is_update) {
lock_request_queue->upgrading_ = INVALID_TXN_ID;
}
lock_request_queue->request_queue_.remove(lock_request);
lock_request_queue->cv_.notify_all();
return false;
}
}
// lock_request_queue->latch_.unlock();
// 如果更新完成的话,标记一下 grant and upgrade function => GrantLockForTable()
if (is_update) {
lock_request_queue->upgrading_ = INVALID_TXN_ID;
}
lock_request->granted_ = true;
// 更新事务维护的锁集合

if (lock_mode == LockMode::EXCLUSIVE) {
txn->GetExclusiveTableLockSet()->insert(oid);
} else if (lock_mode == LockMode::INTENTION_EXCLUSIVE) {
txn->GetIntentionExclusiveTableLockSet()->insert(oid);
} else if (lock_mode == LockMode::INTENTION_SHARED) {
txn->GetIntentionSharedTableLockSet()->insert(oid);
} else if (lock_mode == LockMode::SHARED) {
txn->GetSharedTableLockSet()->insert(oid);
} else if (lock_mode == LockMode::SHARED_INTENTION_EXCLUSIVE) {
txn->GetSharedIntentionExclusiveTableLockSet()->insert(oid);
}
// table_lock_map_.find(oid)->second = lock_request_queue ;
if (lock_mode != LockMode::EXCLUSIVE) {
lock_request_queue->cv_.notify_all();
}
return true;
}

给出上述GrantLockForTable函数代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
auto LockManager::GrantLockForTable(const std::shared_ptr<LockRequestQueue> &lock_request_queue,
const std::shared_ptr<LockRequest> &lock_request) -> bool {
// std::cout<<"================table lock check start!=============="<<std::endl;
// 判断兼容性
// std::cout << lock_request->txn_id_ << " " << lock_request_queue->request_queue_.size() << std::endl;
for (auto &iter : lock_request_queue->request_queue_) {
if (iter->granted_) {
// std::cout << "current:tid"<<iter->txn_id_<<":check==>"<<(int)(iter->lock_mode_)<<"
// "<<(int)(lock_request->lock_mode_)<<std::endl;
// std::cout<<"current:tid"<<iter->txn_id_<<":check==>"<<(int)(iter->lock_mode_)<<"
// "<<(int)(lock_request->lock_mode_)<<" "; std::cout<<CheckTale(lock_request->lock_mode_,
// iter->lock_mode_)<<std::endl;
if (!CheckTale(lock_request->lock_mode_, iter->lock_mode_)) {
return false;
}
// if (!CheckTale( lock_request->lock_mode_,iter->lock_mode_)) {
// // std::cout << "CheckTaleFail~~~current:tid"<<iter->txn_id_<<":check==>"<<(int)(lock_request->lock_mode_)<<"
// "<<(int)(iter->lock_mode_)<<std::endl; return false;
// }
} else if (iter.get() != lock_request.get() && lock_request_queue->upgrading_ != lock_request->txn_id_) {
return false;
} else {
return true;
}
}
return false;
// std::cout<<"fuck1"<<std::endl;
// 判断优先级
// 存在锁升级
// if (lock_request_queue->upgrading_ != INVALID_TXN_ID) {
// // 并且就是当前锁
// if (lock_request_queue->upgrading_ == lock_request->txn_id_) {
// // 授予锁,将grant标记为true
// // for(auto &iter:lock_request_queue->request_queue_){
// // if(iter->txn_id_ == lock_request->txn_id_){
// // iter->granted_ = true;
// // break;
// // }
// // }
// lock_request->granted_ = true;
// // 由于是锁升级,升级完成置为初始化
// lock_request_queue->upgrading_ = INVALID_TXN_ID;
// return true;
// }
// return false;
// }
// 不存在锁升级
// 判断是否是第一个wait的[unused]
// int num = 1;
// for (auto &iter : lock_request_queue->request_queue_) {
// if (!iter->granted_) {
// if (iter->txn_id_ == lock_request->txn_id_) {
// break;
// }
// num = 0;
// }
// if (num == 0) {
// break;
// }
// }
// if (num == 1) {
// // 授予锁,将grant标记为true
// // for(auto iter:request_queue_){
// // if(iter->txn_id_ == txn->GetTransactionId()){
// // iter->granted_ = true;
// // break;
// // }
// // }

// lock_request->granted_ = true;
// if (lock_request_queue->upgrading_ == lock_request->txn_id_) {
// lock_request_queue->upgrading_ = INVALID_TXN_ID;
// }
// return true;
// }
// return false;
// 判断是否和之前wait的锁是否兼容
// bool flag = true;
// for (auto &iter : lock_request_queue->request_queue_) {
// if (iter->txn_id_ == lock_request->txn_id_) {
// break;
// }
// if (!iter->granted_) {
// if (!CheckTale( iter->lock_mode_,lock_request->lock_mode_)) {
// flag = false;
// break;
// }
// }
// }
// if (flag) {
// // 授予锁,将grant标记为true
// // for(auto &iter:lock_request_queue->request_queue_){
// // if(iter->txn_id_ == lock_request->txn_id_){
// // iter->granted_ = true;
// // break;
// // }
// // }
// lock_request->granted_ = true;
// }
// // std::cout<<"================table lock check end!=============="<<std::endl;

// return flag;
}

1. 事务状态判断与隔离级别检查

首先,代码根据事务的隔离级别和当前状态,对锁请求进行初步的检查,以确保兼容性:

  • READ_UNCOMMITTED: 如果事务的隔离级别是未提交读(READ_UNCOMMITTED),则该事务不能申请共享锁(SHARED)或意向共享锁(INTENTION_SHARED)等。否则,事务会被中止(ABORTED),并抛出异常。
  • SHRINKING 状态检查: 对于处于“收缩”(SHRINKING)状态的事务,不能再申请排他锁(EXCLUSIVE)或意向排他锁(INTENTION_EXCLUSIVE),否则会导致事务中止。
  • READ_COMMITTED 和 REPEATABLE_READ: 对于提交读(READ_COMMITTED)和可重复读(REPEATABLE_READ)隔离级别的事务,代码检查如果事务处于收缩状态,并且请求了不兼容的锁类型,也会中止事务。

2. 获取表对应的锁请求队列

  • 锁请求队列的获取: 使用全局锁(table_lock_map_latch_)保护对table_lock_map_的访问。table_lock_map_用于存储表OID(对象ID)与锁请求队列之间的映射。如果对应的表还没有锁请求队列,则为其创建一个。
  • 队列锁定: 在获取或创建锁请求队列后,代码会锁定该队列(latch_),以防止其他事务在这个过程中修改队列。

3. 检查锁升级请求

  • 锁升级检查: 代码会检查当前事务是否已经持有锁,如果持有,则检查该锁是否与新的锁请求相同。如果相同,则直接返回true,表示锁请求成功。
  • 锁升级冲突处理: 如果当前有其他事务正在尝试升级同一资源上的锁,当前事务会被中止,因为不允许多个事务同时尝试升级锁。
  • 锁升级合法性检查: 检查锁升级是否符合规定,比如从共享锁(SHARED)升级到排他锁(EXCLUSIVE)是否允许等。如果不符合规定,事务也会被中止。
  • 锁升级流程: 如果检查通过,当前事务将被标记为正在升级锁,并移除事务原来持有的锁。

4. 将锁请求加入请求队列

  • 锁请求的创建和插入: 根据是否为锁升级,代码会将新的锁请求插入到锁请求队列中**。对于锁升级请求,新的锁请求会被插入到未被授予的锁请求之前;对于非升级请求,则直接加入队列末尾。**

5. 尝试获取锁

  • 锁的授予: 通过GrantLockForTable函数尝试授予锁。如果锁不能立即授予,当前线程会等待(使用条件变量cv_)。
  • 锁请求失败处理: 如果事务在等待期间被中止,代码会清除相应的锁请求,并唤醒等待中的其他事务。
  • 锁授予成功后: 如果锁请求成功,锁请求会被标记为已授予(granted_),并且事务对应的锁集合(根据锁类型)会被更新。

6. 通知其他等待中的事务

  • 通知机制: 如果当前锁请求是非排他锁,代码会在成功授予锁后通知其他等待中的事务(cv_.notify_all()),让它们有机会获取锁。

UnlockTable

UnlockTable 用于在事务中解锁表锁。详细步骤如下,有兴趣的·可以结合思路自己实现。

  1. 获取锁请求队列
    • 通过 table_lock_map_latch_ 锁住 table_lock_map_,以防止其他线程同时访问和修改这个锁映射表。
    • 代码中的 table_lock_map_ 是一个包含表锁请求队列的映射,它将表的 OID 映射到相应的锁请求队列。
  2. 检查表中的行锁状态
    • 首先检查事务的 SharedRowLockSetExclusiveRowLockSet,确定是否存在行锁未被释放的情况。
    • 如果事务中有尚未释放的行锁(即行锁集合中包含表 OID 或者集合非空),则解锁映射表,并将事务状态设置为 ABORTED(终止状态),然后抛出 TransactionAbortException 异常,原因是尝试在行锁解锁前解锁表锁。
  3. 检查表锁请求队列是否存在
    • 如果 table_lock_map_ 中并未找到对应的表锁请求队列(即没有表锁的记录),则同样解锁映射表,并将事务状态设置为 ABORTED,抛出 TransactionAbortException 异常,原因是尝试解锁时没有持有该表的锁。
  4. 遍历锁请求队列
    • 找到表的锁请求队列后,获取队列的锁 latch_,然后解锁映射表 table_lock_map_latch_,以便其他线程访问。
    • 遍历锁请求队列中的每个请求 iter,找到与当前事务 ID 相匹配的锁请求。
  5. 检查锁是否已授予
    • 如果找到匹配的锁请求,且该锁请求尚未被授予(即 granted_false),则解锁队列并将事务状态设置为 ABORTED,然后抛出 TransactionAbortException 异常,原因是试图解锁时没有持有该锁。
  6. 检查事务的隔离级别并更新事务状态
    • 根据事务的隔离级别(如 REPEATABLE_READREAD_COMMITTEDREAD_UNCOMMITTED)以及锁模式(如共享锁 SHARED 或排他锁 EXCLUSIVE),更新事务状态为 SHRINKING。这通常意味着事务已进入减少锁范围的阶段,在某些隔离级别下解锁表时,事务会进入这个状态。
  7. 从锁集合和请求队列中删除锁
    • 根据锁模式,将表 OID 从事务的相应表锁集合中删除(如 ExclusiveTableLockSetSharedTableLockSet 等)。
    • 然后从锁请求队列中删除该锁请求 iter
  8. 唤醒等待线程
    • 删除锁请求后,调用 notify_all() 唤醒等待该表锁的所有线程,以便其他事务可以尝试获取表锁。
  9. 解锁并返回
    • 最后,解锁请求队列的锁 latch_ 并返回 true,表示表锁成功解锁。
  10. 处理不存在锁的情况
  • 如果遍历锁请求队列后,发现没有与当前事务 ID 匹配的锁请求,则解锁队列,将事务状态设置为 ABORTED,并抛出 TransactionAbortException 异常,原因是试图解锁时没有持有该锁。

LockRow

给出LockRow的实现思路,详细步骤如下,有兴趣的·可以结合思路自己实现。

1. 初步检查事务状态和锁模式

  • 首先,代码对事务的状态进行检查。如果事务已经处于ABORTED(中止)或COMMITTED(提交)状态,抛出一个异常,表示逻辑错误。
  • 接着,代码检查行级别的锁类型。行级别的锁只支持共享锁(SHARED)和排他锁(EXCLUSIVE),如果请求的锁类型不符合条件,事务会被设置为ABORTED状态,并抛出一个TransactionAbortException异常,表示尝试在行级别申请无效的锁。

2. 检查表级别的锁是否存在

  • 如果申请的是共享锁(SHARED),需要检查事务是否在表级别持有适当的锁(如EXCLUSIVEINTENTION_EXCLUSIVEINTENTION_SHARED等)。如果没有持有相应的表锁,事务将被中止,并抛出TransactionAbortException异常。
  • 对于排他锁(EXCLUSIVE),要求事务在表级别至少持有EXCLUSIVEINTENTION_EXCLUSIVESHARED_INTENTION_EXCLUSIVE锁,否则事务也会被中止并抛出异常。

3. 根据隔离级别判断事务状态

  • 对于READ_UNCOMMITTED隔离级别,事务不能申请共享锁,且在处于“收缩阶段”(SHRINKING)时不能申请排他锁或意向锁。
  • 对于READ_COMMITTED隔离级别,在“收缩阶段”只能申请共享锁或意向共享锁,其他锁申请会导致事务中止。
  • 对于REPEATABLE_READ隔离级别,事务在“收缩阶段”不能申请任何锁,否则会导致事务中止。

4. 获取行的锁请求队列

  • 使用一个全局互斥锁row_lock_map_latch_来保护row_lock_map_,确保在访问和修改行锁请求队列时的线程安全。
  • 如果指定的行rid还没有锁请求队列,则为其创建一个新的锁请求队列,并将其插入到row_lock_map_中。
  • 之后,锁定该行的锁请求队列以进一步处理锁请求。

5. 检查锁升级情况

  • 遍历锁请求队列,检查当前事务是否已经在该行上申请了锁。如果发现已有锁:
    • 如果现有锁模式与请求的锁模式一致,则直接返回true,表示锁请求成功。
    • 如果需要进行锁升级,则需要确保锁升级是合法的。例如,检查当前是否已有其他事务正在进行锁升级,如果有则中止当前事务。
    • 锁升级还需要检查当前锁与目标锁模式的兼容性。如果不兼容,事务将被中止并抛出异常。

6. 处理锁升级

  • 如果需要进行锁升级,标记当前事务正在进行升级操作,并从事务的行锁集合中移除旧的锁。
  • 在锁请求队列中删除旧的锁请求,并为事务创建新的锁请求,将其插入锁请求队列的合适位置。

7. 尝试获取锁

  • 将新创建的锁请求添加到锁请求队列中后,尝试为事务授予锁。如果未能立即获取锁,则等待条件变量(cv_)的通知,直到锁被授予或事务被中止。
  • 如果事务在等待过程中被中止,锁请求将被从队列中移除,通知其他等待的事务重新尝试获取锁,并返回false表示锁申请失败。

8. 更新事务的行锁集合

  • 锁成功授予后,将锁请求标记为granted_,并更新事务管理的行锁集合:
    • 如果是排他锁(EXCLUSIVE),将行ID添加到事务的排他锁集合中。
    • 如果是共享锁(SHARED),将行ID添加到事务的共享锁集合中。

9. 释放锁和通知

  • 如果成功授予的锁不是排他锁,通知其他等待该行锁的事务,尝试获取锁。

最终,函数返回true表示锁申请成功。

由于在上述的过程中可能会出现死锁。死锁其实就是事务间彼此依赖成环,他们都在等待对面释放他们锁需要的锁。

补充:MySQL中的间隙锁

只是精简化的版本

InnoDB 的锁管理器会根据操作的类型决定是加行锁、间隙锁还是 Next-Key Lock。为了锁定区间,InnoDB 需要在 B+ 树索引中找到要锁定的前后记录,并在这两个记录之间加锁。

3.1 lock_rec_lock() 函数

InnoDB 中的 lock_rec_lock() 函数负责为指定的记录或区间加锁。这个函数会根据锁的类型(如 Gap Lock)决定锁定单个记录还是锁定区间。

以下是关键代码片段,展示了如何锁定区间:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
ulint lock_rec_lock(
ulint mode, /* 锁模式:LOCK_S, LOCK_X, ... */
ulint type, /* 锁类型:LOCK_REC, LOCK_GAP, ... */
dict_index_t* index, /* 要加锁的索引 */
ulint heap_no, /* 记录在页中的位置 */
page_t* page, /* 记录所在的页 */
trx_t* trx) /* 请求锁的事务 */
{
lock_t* lock;

mutex_enter(&kernel_mutex); // 加锁以保护全局数据结构

/* 创建一个新的锁对象 */
lock = lock_rec_create(mode, type, index, heap_no, trx);

/* 如果是间隙锁,则锁定前后记录之间的区间 */
if (type & LOCK_GAP) {
rec_t* prev_rec = btr_cur_prev_rec(index, page, heap_no);
rec_t* next_rec = btr_cur_next_rec(index, page, heap_no);

/* 锁定区间 [prev_rec, next_rec) */
lock_gap(prev_rec, next_rec, lock);
}

/* 将锁添加到事务的锁列表和全局锁表中 */
lock_rec_add_to_trx_and_hash(lock, trx);

mutex_exit(&kernel_mutex); // 释放全局锁

return(DB_SUCCESS);
}
  • mode:指定锁的模式,如共享锁 (LOCK_S) 或排他锁 (LOCK_X)。
  • type:指定锁的类型,如间隙锁 (LOCK_GAP)。
  • heap_no:表示记录在页中的位置。
  • page:记录所在的页,B+ 树索引记录存储在页中。
  • trx:请求加锁的事务。

3.2 btr_cur_prev_rec()btr_cur_next_rec() 函数

这两个函数用于获取当前记录的前后记录。这对于锁定区间(间隙)非常重要,InnoDB 需要知道前后记录的位置,才能锁定它们之间的区间。

  • btr_cur_prev_rec():返回当前记录的前一个记录。
  • btr_cur_next_rec():返回当前记录的后一个记录。
1
2
3
4
5
6
7
8
9
10
rec_t* btr_cur_prev_rec(dict_index_t* index, page_t* page, ulint heap_no) {
// 查找前一个记录(逻辑上)
// 实际代码会考虑页结构和 B+ 树局部性
return page_get_prev_rec(index, page, heap_no);
}

rec_t* btr_cur_next_rec(dict_index_t* index, page_t* page, ulint heap_no) {
// 查找后一个记录(逻辑上)
return page_get_next_rec(index, page, heap_no);
}

3.3 lock_gap() 函数

lock_gap 函数负责在前后记录之间的间隙加锁,以防止其他事务在该区间内插入新记录。

1
2
3
4
5
6
7
8
9
10
11
void lock_gap(rec_t *prev_rec, rec_t *next_rec, lock_t *lock) {
/* 锁定区间 [prev_rec, next_rec) */
lock->rec_lock_gap_start = prev_rec;
lock->rec_lock_gap_end = next_rec;

/* 设置锁的类型为间隙锁 */
lock->type_mode |= LOCK_GAP;

/* 将该锁信息加入到全局锁哈希表中 */
lock_add_to_hash(lock);
}
  • prev_recnext_rec:分别表示前一个记录和下一个记录。
  • lock_gap():在这两个记录之间锁定区间。

死锁的产生需要满足四个必要条件

  • 互斥条件:一个资源每次只能被一个进程使用。

  • 请求与保持条件:一个进程因请求资源而阻塞时,对已获得的资源保持不放。

  • 不剥夺条件:已分配给线程的资源,在未使用完之前,不能强行剥夺。

  • 循环等待条件:在发生死锁时必然存在一个进程等待队列,其中每个进程都在等待下一个进程所占有的资源,形成一个进程等待环路。

核心就是如果事物Ti在等待事物Tj释放锁。则画一条从i–>j的边。如果检测完所有的冲突事务。如果出现环则表示出现了死锁。如果没有环则表示没有死锁。

image-20240828184742148

这样形成了一个环就发生了死锁,这个时候就需要abort。

终止哪些事务呢!

有以下四种策略判定:

1. By Age (Lowest Timestamp)

  • 描述: 选择拥有最早时间戳的事务。这意味着越早开始的事务越有可能获得优先权。
  • 优点: 避免长期运行的事务被饿死,可以确保事务按启动顺序完成。

2. By Progress (Least/Most Queries Executed)

  • 描述: 根据事务已执行的查询数量来决定优先顺序。可以选择执行最少或最多查询的事务。
  • 优点
    • Least Executed: 优先处理刚开始的事务,减少资源浪费。
    • Most Executed: 优先完成几乎完成的事务,避免浪费已用的计算资源。

3. By the Number of Items Already Locked

  • 描述: 根据事务已经锁定的资源数量来决定优先级。通常,锁定资源较少的事务更容易处理。
  • 优点: 减少锁争用,可能提高系统整体吞吐量。

4. By the Number of Transactions That We Have to Rollback With It

  • 描述: 考虑到如果回滚当前事务,需要回滚的其他事务数量。优先处理影响较小的事务。
  • 优点: 减少连锁回滚的影响,降低系统恢复的复杂性。

tip

MySQL 发现死锁就立即报错,所以使用的是死锁避免方法。

Postgresql 发现死锁需要等一会儿,所以进行的死锁检测。

参考资料: