Computer science is no more about computers than astronomy is about telescopes. ——Dijkstra
一、条件变量
条件变量的类型是 cond_t
。有两个 API:
Pthread_cond_wait(&cond_t, &mutex_t)
:- 解锁
mutex
; - 进入 wait 状态;
- 重新开始执行后,争夺
mutex
; - 返回。
- 解锁
Pthread_cond_signal(&cond_t)
:- 使任意一个该条件变量对应的 wait 状态的进程就绪。(就绪不意味着立即执行,只是可能被调度。)
其最简单的用法是用来实现父进程对子进程的 join,也就是任务排序问题。
- wait() 可以阻塞父进程;
- 子进程执行完任务后,通过 signal() 给父进程发信号,使父进程继续执行。
二、生产者-消费者问题
接下来我们关注如何使用条件变量和锁解决生产者-消费者问题。我们以对一个 int 的 get 和 put 作为例子。
(一)错误的尝试
我们首先给出一种错误的尝试:
int count = 0;
pthread_cond_t c;
pthread_mutex_t m;
void producer(int x) {
Pthread_mutex_lock(&m);
count = 1;
put(x);
Pthread_cond_signal(&c); // 这里必须加锁
// 虽然不是在所有情况下都必要,但是在signal时加锁是更被建议的行为。
Pthread_mutex_unlock(&m);
}
void customer() {
Pthread_mutex_lock(&m);
while (count == 0) // 这里用 while 的用意是在 wait 后再确认一下条件是否满足,减少 bug
// 在多个生产者和消费者的情况下,显然 while 是必要的而非建议的
Pthread_cond_wait(&c, &m); // wait() 假定你已经获得了锁,在睡前放弃锁,在睡醒后抢夺锁
int x = get();
Pthread_mutex_unlock(&m);
}
我们使用了这些东西,它们缺一不可:
- 条件:就是
count
,用来在满足条件时跳过 wait()。 - 条件变量:
- 两方在进行条件访问时应当加上锁:防止意外的执行顺序让值被错误修改。
(二)正确的做法
上面的方法在一个生产者、一个消费者时工作的很好。但是在多个消费者和生产者时会出现致命的问题。 我们假定一个生产者 P,两个消费者 C1、C2。有以下的执行队列让大家长睡不醒:
C1(不满足条件,睡眠)
C2(不满足条件,睡眠)
P(生产了一个东西)
C1(被选中,就绪)
P(不满足条件,睡眠)
C1(消费)
C2(被选中,就绪)
C1(没数据可取,睡眠)
C2(没数据可取,睡眠)
核心问题是,我们通过 signal 叫起来的可能不是我们需要的线程。这样我们就有两种方案了:
- 使用两个条件变量,让生产者叫消费者起床,消费者叫生产者起床;
- 提供一个把所有线程都喊到就绪状态的函数。
第二个方法就是 pthread_cond_broadcast()
了。它会叫醒所有等待的线程,但是可能导致惊群效应(所有线程被唤醒后,只有一个线程真正能消费)。这并不是高效的做法。
但是,有的情况下使用 broadcast
代替 signal
是一种行得通的办法。如果唤起的消费者不一定能够消费生产者的产物,那唤起所有的消费者是一种可行的解决方法。
作为最后的总结,我们给出正确的代码:
cond_t empty, full;
mutex_t mutex;
int count;
void producer(int loops) {
for (int i = 0; i < loops; i++) {
Pthread_mutex_lock(&mutex);
while (count == MAX) {
Pthread_cond_wait(&empty, &mutex);
}
put(i); // do sth...
Pthread_cond_signal(&full);
Pthread_mutex_unlock(&mutex);
}
}
void consumer(int loops) {
for (int i = 0; i < loops; i++) {
Pthread_mutex_lock(&mutex);
while (count == 0) {
Pthread_cond_wait(&full, &mutex);
}
int res = get(); // do sth...
Pthread_cond_signal(&empty);
Pthread_mutex_unlock(&mutex);
printf("%d\n", res);
}
}
三、信号量
锁和条件变量确实无所不能,可以解决全部的(可解决的)并发问题。但是,它们给出的解决方案不一定是最简洁优雅的,对于一部分问题,信号量是方便的工具。我们给出信号量的 API:
sem_t s;
sem_init(&s, 0, X); // 把 s 的值初始化为 X
sem_wait(sem_t *s); // 把 s 的值减一。如果值是负的那就 wait。
// 这意味着 s 的值如果为负,其绝对值反映了等待的线程数。
sem_post(sem_t *s); // 把 s 的值加一。唤醒一个与 s 关联的 waiting 线程。
(一)信号量实现锁
// 锁的实现是很简单的
sem_init(&m, 0, 1);
sem_wait(&m);
// do sth...
sem_post(&m);
说实话,刚开始我的感受是,fetch-and-add 硬件原语和信号量机制很像。但是:
- 前者没有提供原子性的减少值的方法,
- 而且也没有提供 wait 和唤醒的功能。
因此,它们完全不是一回事。
(二)信号量解决任务排序问题
信号量用来保证执行顺序也很直观,只是初始值应该是 0,这样就能让 wait 卡住,由 post 唤醒。
(三)信号量解决生产者-消费者问题
我们不能再犯以前犯过的错误:
- 访问条件要加锁;
- while 确认是更好的;
- 使用两个条件变量,不然会死锁。 我们写出以下代码:
sem_init(&empty, 0, MAX);
sem_init(&full, 0, 0);
sem_init(&lock, 0, 1);
void producer(int loops) {
for (int i = 0; i < loops; i++) {
sem_wait(&empty);
sem_wait(&lock);
put(i); // do sth...
sem_post(&lock);
sem_post(&full);
}
}
void consumer(int loops) {
for (int i = 0; i < loops; i++) {
sem_wait(&full);
sem_wait(&lock);
int res = get(); // do sth...
sem_post(&lock);
sem_post(&empty);
printf("%d\n", res);
}
}
因为 sem_wait()自带减值和判断功能,所以 while 循环被移除了;因为sem_wait() 不自带加锁和解锁的功能,我们只能把锁放到内部。
很高兴的是,这样做没有什么问题。这就是最终的解决方案。
(四)实现信号量
typedef struct _Zem_t {
int value;
pthread_cond_t* cond;
pthread_mutex_t* lock;
} Zem_t;
void Zem_wait(Zem_t *s) {
Pthread_mutex_lock(s->lock);
while(value < 0)
Pthread_cond_wait(s->cond, s->lock);
value--; // 我们放弃了可能的负数表示
Pthread_mutex_unlock(s->lock);
}
void Zem_post(Zem_t *s) {
Pthread_mutex_lock(s->lock);
value++;
Pthread_cond_post(s->cond);
Pthread_mutex_unlock(s->lock);
}
需要注意的是,因为信号量的 wait 是有条件的,而条件变量的 wait 是无条件的,很难用信号量实现条件变量。
四、死锁
- 产生死锁的四个条件:
- 互斥:需要互斥访问;
- 持有并等待:持有了一部分锁,等待其他的锁;
- 非抢占:不能强制持有他人的锁;
- 循环等待:锁的依赖形成了环路。
- 预防死锁:
规定获取锁的顺序:可以通过比较锁的地址确保以相同的顺序获得锁;
对抢锁的过程上锁:降低一些效率,确保锁被全部获得(这很难做到);
主动让锁:
top: lock(L1); if (trylock(L2) == -1) { unlock(L1); goto top; }
但是,太过谦让也可能导致“活锁”。一种解决方案是增加一些延迟,降低双方的礼让撞在一起的概率。
而且,这样做还是很难封装成函数。
避免互斥: 我们使用一些硬件提供的原子指令,来避免锁造成的问题。