我,无畏并发。 ——也许存在于某个平行宇宙的《BanG Dream! Ave Mujica》(但大概率也是烂番)

零、一些基本的讨论

  • 锁应当存放在大家共用的内存空间,总之不应该是栈。
  • 一个好的锁应该提供正确的互斥、保障线程公平、性能优秀。

零、一些失败的想法

  1. 关中断:这很容易被滥用,导致系统接收不到中断信息;不适用于多处理器系统。
  2. Peterson 算法:必须假设访存的原子性,但这一点现代硬件早就打破了。
int flag[2];
int turn;
void init() {
    flag[0] = flag[1] = 0;
    turn = 0;
}
void lock() {
    flag[self] = 1; // 竖起自己的旗子
    turn = 1 - self;  // 让 turn 等于对面
    // 我们假设对面此时也执行了这两句,turn 此时等于 self
    while((flag[1 - self] == 1) && (turn == 1 - self))
        ;      // 因为对面改变了turn,那我们先执行了,对面先等着
               // 先修改 turn 的才能跳出这个 while,也就是手快有手慢无
}
void unlock() {
    flag[self] = 0;
}

一、test-and-set

这种想法基于自旋锁(也就是只通过一个 flag 区分是否上锁),但是不依赖硬件显然不行,因为更精妙的 Peterson 都行不通。

x86 提供了与以下 C 代码类似的原子指令,硬件保证这个过程的原子性:

// not code, but hardware implemented atomic test-and-set operation
int TEST_AND_SET(int *old_ptr, int new) {
    int old = *old_ptr;
    *old_ptr = new;
    return old;
}
// 也就是把值改成新的,返回旧值

这样我们终于能够设计年轻人的第一把锁了:

// this is C code!!!
void init(lock *lock) {
    lock->flag = 0;
}
void lock(lock *lock) {
    while (TEST_AND_SET(&lock->flag, 1) == 1)
        ;
}
void unlock(lock *lock) {
    lock->flag = 0;
}
// 多么轻松愉快!

先冷静一下,这样做对单处理器非抢占式系统是不行的,因为一旦无法获得锁,那就只能在那里空转,一切都没有办法停下来。但是还好这些系统已经不复存在,在多处理器这套办法工作的不错。

二、compare-and-exchange

先来看与硬件原语近似的代码:

int COMPARE_AND_EXCHANGE(int* ptr, int expected, int new) {
    int actual = *ptr;
    if (actual == expected) {
        *pyr = new;
    }
    return actual;
}
// 如果和 expected 一样就换成新值,返回旧值

我们只需把上面的代码换个 while 循环就行了:

while (COMPARE_AND_EXCHANGE(&lock->flag, 0, 1) == 1)
    ;

隐约可以感觉到,这条指令用来实现自旋锁是大材小用。它还有更高级的用法。

三、load-linked & store-conditional

int LOAD_LINKED(int *ptr) {
    return *ptr;
}

int STORE_CONDITIONAL(int *ptr, int value) {
    if (在调用 LOAD_LINKED(ptr) 后没有人更新这个值) {
        *ptr = value;
        return 1;
    }
    return 0;
} 

对应的锁的实现会复杂一些:

void lock(lock *lock) {
    while (1) {
        while (LOAD_LINKED(lock->flag) == 1)
            ; // 是 1 先转着
        if (STORE_CONDITIONAL(lock->flag, 1) == 1)
            break; // 多个进程抢改变 flag 的机会
    }
}
void unlock(lock *lock) {
    lock->flag = 0;
}

四、fetch-and-add

int FETCH_AND_ADD(int* ptr) {
    int old = *ptr;
    *ptr = old + 1;
    return old; // 返回旧值,自增1
                // 相当于 (*ptr)++
}

这是我们自己写的版本,它正常工作但还是不够好:

typedef struct {
    int a;
    int b;
} lock;
void init(lock *lock) {
    lock->a = 0;
    lock->b = 0;
}
void lock(lock *lock) {
    while (FETCH_AND_ADD(ptr->a) > ptr->b)
        ;
}
void unlock(lock *lock) {
    ptr->b = ptr->a;
}
// 我们的版本没有增添“让每个调用lock的线程都会按序被调用”这样的功能

更好的实现是:

int init(lock *lock) {
    lock->ticket = 0;
    lock->turn = 0;
}
void lock(lock *lock) {
    int myturn = FETCH_AND_ADD(ptr->ticket);
    while (mytrun != ptr->turn)
        ;
}
void unlock(lock *lock) {
    ptr->turn++;
}

这种实现的好处是,让调用 FETCH_ADD_ADD 的顺序自然地变成了被解锁的顺序,确保线程的公平性。

五、为什么非得自旋呢?

自旋的进程浪费了大量CPU时间。操作系统显然应该给他们提供休眠的系统调用让其让出 CPU,然后在正确的时机再叫醒他们。

下面给出的就是 GNU libc 为 linux 提供的锁的实现了:

void mutex_lock(int *mutex) {
    int v;
    // Bit 31 was clear, we got the mutex (fastpath)
    // 第 31 位是最高位,如果是 0 说明锁是空闲的,直接返回。
    if (atomic_bit_test_set(mutex, 31) == 0)
        return;
    // 递增 mutex,记录等待的线程数
    atomic_increment(mutex);
    while (1) {
        if (atomic_bit_test_set(mutex, 31) == 0) {
            atomic_decrement(mutex);
            return;
            // 如果获得锁,减少等待线程数,返回
        }
        // Have to waitFirst to make sure futex value
        // we are monitoring is negative (locked).
        // 如果锁变成正值,那不要睡眠,再次尝试
        v = *mutex;
        if (v >= 0)
            continue;
        // 睡了睡了
        futex_wait(mutex, v);
    }
}
void mutex_unlock(int *mutex) {
    // Adding 0x80000000 to counter results in 0 if and
    // only if there are not other interested threads
    // 清零最高位,如果 mutex 变成 0 那就直接返回
    if (atomic_add_zero(mutex, 0x80000000))
        return;
    // There are other threads waiting for this mutex,
    // wake one of them up.
    // 不为 0,唤醒一个进程吧。
    futex_wake(mutex);
}

六、使用锁

最显然的使用锁的方法被称作“一把大锁保平安”。对核心数据结构的修改和读取应当在加了锁的情况下再去进行。

“一把大锁保平安”足够简单,但如果遇到了性能瓶颈,那应该考虑进行一些性能优化。(过早优化是万恶之源。)

另外,要小心控制流!如果不在特殊控制流(例如错误处理)的时候释放锁,可能造成死锁的情况。

(一)计数器

我们这样设计:给每个 CPU Core 准备一个局部计数器和对应的局部锁,每个 Core 上的计数器修改会在局部计数器上进行。当局部计数器满足一定阈值 S 时,争夺全局锁,把结果加给全局计数器。通过阈值,我们平衡准确度和性能。

(二)队列

我们希望在队列头和队列尾的操作能够并行,那么准备两把锁是个好主意。为了不让头和尾的操作互相干扰,应该添加一个特殊的空白节点。