我,无畏并发。 ——也许存在于某个平行宇宙的《BanG Dream! Ave Mujica》(但大概率也是烂番)
零、一些基本的讨论
- 锁应当存放在大家共用的内存空间,总之不应该是栈。
- 一个好的锁应该提供正确的互斥、保障线程公平、性能优秀。
零、一些失败的想法
- 关中断:这很容易被滥用,导致系统接收不到中断信息;不适用于多处理器系统。
- Peterson 算法:必须假设访存的原子性,但这一点现代硬件早就打破了。
int flag[2];
int turn;
void init() {
flag[0] = flag[1] = 0;
turn = 0;
}
void lock() {
flag[self] = 1; // 竖起自己的旗子
turn = 1 - self; // 让 turn 等于对面
// 我们假设对面此时也执行了这两句,turn 此时等于 self
while((flag[1 - self] == 1) && (turn == 1 - self))
; // 因为对面改变了turn,那我们先执行了,对面先等着
// 先修改 turn 的才能跳出这个 while,也就是手快有手慢无
}
void unlock() {
flag[self] = 0;
}
一、test-and-set
这种想法基于自旋锁(也就是只通过一个 flag 区分是否上锁),但是不依赖硬件显然不行,因为更精妙的 Peterson 都行不通。
x86 提供了与以下 C 代码类似的原子指令,硬件保证这个过程的原子性:
// not code, but hardware implemented atomic test-and-set operation
int TEST_AND_SET(int *old_ptr, int new) {
int old = *old_ptr;
*old_ptr = new;
return old;
}
// 也就是把值改成新的,返回旧值
这样我们终于能够设计年轻人的第一把锁了:
// this is C code!!!
void init(lock *lock) {
lock->flag = 0;
}
void lock(lock *lock) {
while (TEST_AND_SET(&lock->flag, 1) == 1)
;
}
void unlock(lock *lock) {
lock->flag = 0;
}
// 多么轻松愉快!
先冷静一下,这样做对单处理器非抢占式系统是不行的,因为一旦无法获得锁,那就只能在那里空转,一切都没有办法停下来。但是还好这些系统已经不复存在,在多处理器这套办法工作的不错。
二、compare-and-exchange
先来看与硬件原语近似的代码:
int COMPARE_AND_EXCHANGE(int* ptr, int expected, int new) {
int actual = *ptr;
if (actual == expected) {
*pyr = new;
}
return actual;
}
// 如果和 expected 一样就换成新值,返回旧值
我们只需把上面的代码换个 while 循环就行了:
while (COMPARE_AND_EXCHANGE(&lock->flag, 0, 1) == 1)
;
隐约可以感觉到,这条指令用来实现自旋锁是大材小用。它还有更高级的用法。
三、load-linked & store-conditional
int LOAD_LINKED(int *ptr) {
return *ptr;
}
int STORE_CONDITIONAL(int *ptr, int value) {
if (在调用 LOAD_LINKED(ptr) 后没有人更新这个值) {
*ptr = value;
return 1;
}
return 0;
}
对应的锁的实现会复杂一些:
void lock(lock *lock) {
while (1) {
while (LOAD_LINKED(lock->flag) == 1)
; // 是 1 先转着
if (STORE_CONDITIONAL(lock->flag, 1) == 1)
break; // 多个进程抢改变 flag 的机会
}
}
void unlock(lock *lock) {
lock->flag = 0;
}
四、fetch-and-add
int FETCH_AND_ADD(int* ptr) {
int old = *ptr;
*ptr = old + 1;
return old; // 返回旧值,自增1
// 相当于 (*ptr)++
}
这是我们自己写的版本,它正常工作但还是不够好:
typedef struct {
int a;
int b;
} lock;
void init(lock *lock) {
lock->a = 0;
lock->b = 0;
}
void lock(lock *lock) {
while (FETCH_AND_ADD(ptr->a) > ptr->b)
;
}
void unlock(lock *lock) {
ptr->b = ptr->a;
}
// 我们的版本没有增添“让每个调用lock的线程都会按序被调用”这样的功能
更好的实现是:
int init(lock *lock) {
lock->ticket = 0;
lock->turn = 0;
}
void lock(lock *lock) {
int myturn = FETCH_AND_ADD(ptr->ticket);
while (mytrun != ptr->turn)
;
}
void unlock(lock *lock) {
ptr->turn++;
}
这种实现的好处是,让调用 FETCH_ADD_ADD 的顺序自然地变成了被解锁的顺序,确保线程的公平性。
五、为什么非得自旋呢?
自旋的进程浪费了大量CPU时间。操作系统显然应该给他们提供休眠的系统调用让其让出 CPU,然后在正确的时机再叫醒他们。
下面给出的就是 GNU libc 为 linux 提供的锁的实现了:
void mutex_lock(int *mutex) {
int v;
// Bit 31 was clear, we got the mutex (fastpath)
// 第 31 位是最高位,如果是 0 说明锁是空闲的,直接返回。
if (atomic_bit_test_set(mutex, 31) == 0)
return;
// 递增 mutex,记录等待的线程数
atomic_increment(mutex);
while (1) {
if (atomic_bit_test_set(mutex, 31) == 0) {
atomic_decrement(mutex);
return;
// 如果获得锁,减少等待线程数,返回
}
// Have to waitFirst to make sure futex value
// we are monitoring is negative (locked).
// 如果锁变成正值,那不要睡眠,再次尝试
v = *mutex;
if (v >= 0)
continue;
// 睡了睡了
futex_wait(mutex, v);
}
}
void mutex_unlock(int *mutex) {
// Adding 0x80000000 to counter results in 0 if and
// only if there are not other interested threads
// 清零最高位,如果 mutex 变成 0 那就直接返回
if (atomic_add_zero(mutex, 0x80000000))
return;
// There are other threads waiting for this mutex,
// wake one of them up.
// 不为 0,唤醒一个进程吧。
futex_wake(mutex);
}
六、使用锁
最显然的使用锁的方法被称作“一把大锁保平安”。对核心数据结构的修改和读取应当在加了锁的情况下再去进行。
“一把大锁保平安”足够简单,但如果遇到了性能瓶颈,那应该考虑进行一些性能优化。(过早优化是万恶之源。)
另外,要小心控制流!如果不在特殊控制流(例如错误处理)的时候释放锁,可能造成死锁的情况。
(一)计数器
我们这样设计:给每个 CPU Core 准备一个局部计数器和对应的局部锁,每个 Core 上的计数器修改会在局部计数器上进行。当局部计数器满足一定阈值 S 时,争夺全局锁,把结果加给全局计数器。通过阈值,我们平衡准确度和性能。
(二)队列
我们希望在队列头和队列尾的操作能够并行,那么准备两把锁是个好主意。为了不让头和尾的操作互相干扰,应该添加一个特殊的空白节点。