Computer science is no more about computers than astronomy is about telescopes. ——Dijkstra

一、条件变量

条件变量的类型是 cond_t。有两个 API:

  1. Pthread_cond_wait(&cond_t, &mutex_t)
    1. 解锁 mutex
    2. 进入 wait 状态;
    3. 重新开始执行后,争夺 mutex
    4. 返回。
  2. Pthread_cond_signal(&cond_t)
    1. 使任意一个该条件变量对应的 wait 状态的进程就绪。(就绪不意味着立即执行,只是可能被调度。)

其最简单的用法是用来实现父进程对子进程的 join,也就是任务排序问题。

  • wait() 可以阻塞父进程;
  • 子进程执行完任务后,通过 signal() 给父进程发信号,使父进程继续执行。

二、生产者-消费者问题

接下来我们关注如何使用条件变量和锁解决生产者-消费者问题。我们以对一个 int 的 get 和 put 作为例子。

(一)错误的尝试

我们首先给出一种错误的尝试:

int count = 0;
pthread_cond_t c;
pthread_mutex_t m;

void producer(int x) {
    Pthread_mutex_lock(&m);
    count = 1;
    put(x);
    Pthread_cond_signal(&c);    // 这里必须加锁
                                // 虽然不是在所有情况下都必要,但是在signal时加锁是更被建议的行为。
    Pthread_mutex_unlock(&m);
}
void customer() {
    Pthread_mutex_lock(&m);
    while (count == 0) // 这里用 while 的用意是在 wait 后再确认一下条件是否满足,减少 bug
                       // 在多个生产者和消费者的情况下,显然 while 是必要的而非建议的
        Pthread_cond_wait(&c, &m); // wait() 假定你已经获得了锁,在睡前放弃锁,在睡醒后抢夺锁
    int x = get();
    Pthread_mutex_unlock(&m);
}

我们使用了这些东西,它们缺一不可:

  • 条件:就是 count,用来在满足条件时跳过 wait()。
  • 条件变量:
  • 两方在进行条件访问时应当加上锁:防止意外的执行顺序让值被错误修改。

(二)正确的做法

上面的方法在一个生产者、一个消费者时工作的很好。但是在多个消费者和生产者时会出现致命的问题。 我们假定一个生产者 P,两个消费者 C1、C2。有以下的执行队列让大家长睡不醒:

C1(不满足条件,睡眠)
                        C2(不满足条件,睡眠)
                                                P(生产了一个东西)
C1(被选中,就绪)
                                                P(不满足条件,睡眠)
C1(消费)
                        C2(被选中,就绪)
C1(没数据可取,睡眠)
                        C2(没数据可取,睡眠)

核心问题是,我们通过 signal 叫起来的可能不是我们需要的线程。这样我们就有两种方案了:

  • 使用两个条件变量,让生产者叫消费者起床,消费者叫生产者起床;
  • 提供一个把所有线程都喊到就绪状态的函数。

第二个方法就是 pthread_cond_broadcast() 了。它会叫醒所有等待的线程,但是可能导致惊群效应(所有线程被唤醒后,只有一个线程真正能消费)。这并不是高效的做法。

但是,有的情况下使用 broadcast 代替 signal 是一种行得通的办法。如果唤起的消费者不一定能够消费生产者的产物,那唤起所有的消费者是一种可行的解决方法。

作为最后的总结,我们给出正确的代码:

cond_t empty, full;
mutex_t mutex;
int count;

void producer(int loops) {
    for (int i = 0; i < loops; i++) {
        Pthread_mutex_lock(&mutex);
        while (count == MAX) {
            Pthread_cond_wait(&empty, &mutex);
        }
        put(i); // do sth...
        Pthread_cond_signal(&full);
        Pthread_mutex_unlock(&mutex);
    }
} 

void consumer(int loops) {
    for (int i = 0; i < loops; i++) {
        Pthread_mutex_lock(&mutex);
        while (count == 0) {
            Pthread_cond_wait(&full, &mutex);
        }
        int res = get(); // do sth...
        Pthread_cond_signal(&empty);
        Pthread_mutex_unlock(&mutex);
        printf("%d\n", res);
    }
}

三、信号量

锁和条件变量确实无所不能,可以解决全部的(可解决的)并发问题。但是,它们给出的解决方案不一定是最简洁优雅的,对于一部分问题,信号量是方便的工具。我们给出信号量的 API:

sem_t s;
sem_init(&s, 0, X); // 把 s 的值初始化为 X
sem_wait(sem_t *s); // 把 s 的值减一。如果值是负的那就 wait。
                    // 这意味着 s 的值如果为负,其绝对值反映了等待的线程数。
sem_post(sem_t *s); // 把 s 的值加一。唤醒一个与 s 关联的 waiting 线程。

(一)信号量实现锁

// 锁的实现是很简单的
sem_init(&m, 0, 1);
sem_wait(&m);
// do sth...
sem_post(&m);

说实话,刚开始我的感受是,fetch-and-add 硬件原语和信号量机制很像。但是:

  • 前者没有提供原子性的减少值的方法,
  • 而且也没有提供 wait 和唤醒的功能。

因此,它们完全不是一回事。

(二)信号量解决任务排序问题

信号量用来保证执行顺序也很直观,只是初始值应该是 0,这样就能让 wait 卡住,由 post 唤醒。

(三)信号量解决生产者-消费者问题

我们不能再犯以前犯过的错误:

  • 访问条件要加锁;
  • while 确认是更好的;
  • 使用两个条件变量,不然会死锁。 我们写出以下代码:
sem_init(&empty, 0, MAX);
sem_init(&full, 0, 0);
sem_init(&lock, 0, 1);

void producer(int loops) {
    for (int i = 0; i < loops; i++) {
        sem_wait(&empty);
        sem_wait(&lock);
        put(i); // do sth...
        sem_post(&lock);
        sem_post(&full);
    }
} 

void consumer(int loops) {
    for (int i = 0; i < loops; i++) {
        sem_wait(&full);
        sem_wait(&lock);
        int res = get(); // do sth...
        sem_post(&lock);
        sem_post(&empty);
        printf("%d\n", res);
    }
}

因为 sem_wait()自带减值和判断功能,所以 while 循环被移除了;因为sem_wait() 不自带加锁和解锁的功能,我们只能把锁放到内部。

很高兴的是,这样做没有什么问题。这就是最终的解决方案。

(四)实现信号量

typedef struct _Zem_t {
    int value;
    pthread_cond_t* cond;
    pthread_mutex_t* lock;
} Zem_t;

void Zem_wait(Zem_t *s) {
    Pthread_mutex_lock(s->lock);
    while(value < 0) 
        Pthread_cond_wait(s->cond, s->lock);
    value--; // 我们放弃了可能的负数表示
    Pthread_mutex_unlock(s->lock);
}

void Zem_post(Zem_t *s) {
    Pthread_mutex_lock(s->lock);
    value++;
    Pthread_cond_post(s->cond);
    Pthread_mutex_unlock(s->lock);
}

需要注意的是,因为信号量的 wait 是有条件的,而条件变量的 wait 是无条件的,很难用信号量实现条件变量。

四、死锁

  1. 产生死锁的四个条件:
  • 互斥:需要互斥访问;
  • 持有并等待:持有了一部分锁,等待其他的锁;
  • 非抢占:不能强制持有他人的锁;
  • 循环等待:锁的依赖形成了环路。
  1. 预防死锁:
  • 规定获取锁的顺序:可以通过比较锁的地址确保以相同的顺序获得锁;

  • 对抢锁的过程上锁:降低一些效率,确保锁被全部获得(这很难做到);

  • 主动让锁:

    top:
        lock(L1);
        if (trylock(L2) == -1) {
            unlock(L1);
            goto top;
        }
    

    但是,太过谦让也可能导致“活锁”。一种解决方案是增加一些延迟,降低双方的礼让撞在一起的概率。

    而且,这样做还是很难封装成函数。

  • 避免互斥: 我们使用一些硬件提供的原子指令,来避免锁造成的问题。