explorer

万丈高楼平地起,勿在浮沙筑高台

0%

[What]Condition Variables

平时很少使用条件变量,正好来学习一下应用场景。

条件变量的正确使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
#include <stdlib.h>

static int done = 0;
static pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t c = PTHREAD_COND_INITIALIZER;

static void Pthread_mutex_lock(pthread_mutex_t *mutex)
{
if(pthread_mutex_lock(mutex))
{
perror("mutex lock failed:");
exit(1);
}
}
static void Pthread_mutex_unlock(pthread_mutex_t *mutex)
{
if(pthread_mutex_unlock(mutex))
{
perror("mutex unlock failed:");
exit(1);
}
}
static void Pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)
{
if(pthread_cond_wait(cond, mutex))
{
perror("cond wait failed:");
exit(1);
}
}
static void Pthread_cond_signal(pthread_cond_t *cond)
{
if(pthread_cond_signal(cond))
{
perror("cond signal failed:");
exit(1);
}
}
static void Pthread_create(pthread_t *thread, const pthread_attr_t *attr,
void *(*start_routine) (void *), void *arg)
{
if(pthread_create(thread, attr, start_routine, arg))
{
perror("pthread create failed:");
exit(1);
}
}
static void con_wait(int val)
{
Pthread_mutex_lock(&m);
while (done != val)
{
Pthread_cond_wait(&c, &m);
printf("wakeup by done: %d, and I need val: %d\n",
done, val);
}

Pthread_mutex_unlock(&m);
}
static void cond_signal(int val) {
Pthread_mutex_lock(&m);
done = val;
Pthread_cond_signal(&c);
Pthread_mutex_unlock(&m);
}

static void *child_wait(void *arg)
{
int val = (int )arg;

con_wait(val);
printf("thread:%lu has been woken up by parent val: %d\n",
pthread_self(), val);
}

int main(int argc, char *argv[])
{
printf("parent: begin\n");
pthread_t p;
Pthread_create(&p, NULL, child_wait, (void *)1);
Pthread_create(&p, NULL, child_wait, (void *)2);
usleep(500000);
cond_signal(1);
sleep(1);
cond_signal(2);
usleep(100000);

return 0;
}

需要理解的是, pthread_cond_wait() 函数在将线程睡眠前,会释放互斥量。在线程被 唤醒后,会获取互斥量后返回。

生产者与消费者模式

示例代码

假设生产者与消费者交互的缓存仅仅是一个整型变量,并通过另外一个变量来标识该缓存是 否为空或为满:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
int buffer;
int count = 0; // initially, empty

void put(int value) {
assert(count == 0);
count = 1;
buffer = value;
}

int get() {
assert(count == 1);
count = 0;
return buffer;
}

可以看到,变量 count 的作用就类似于条件变量,根据条件变量的值来确定是否操作缓 存。那么假设有两个线程分别作为生产者和消费者,并且进行多次的读写操作,就会类似下 面这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void *producer(void *arg) {
int i;
int loops = (int) arg;
for (i = 0; i < loops; i++) {
put(i);
}
}

void *consumer(void *arg) {
while (1) {
int tmp = get();
printf("%d\n", tmp);
}
}

很明显,为了保证能够正确的读写,需要保证两个操作:

  1. 读写不能同时进行
  2. 读写要有先后顺序

如果仅仅使用一个互斥量来原子化临界区,仅仅能保证对缓存的读写不会同时运行。但是却 无法避免对已经写过的还未被读取的缓存,再次执行写操作。

  • 如果使用 assert 来判断,当 assert 为假时,进程就会被杀掉。
  • 也可以在临界区中用 if 判断 count ,但是这样很可能会浪费 CPU 来做无用功。

有缺陷的解决方案

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
int loops; // must initialize somewhere...
cond_t cond;
mutex_t mutex;

void *producer(void *arg) {
int i;
for (i = 0; i < loops; i++) {
Pthread_mutex_lock(&mutex); // p1
if (count == 1) // p2
Pthread_cond_wait(&cond, &mutex); // p3
put(i); // p4
Pthread_cond_signal(&cond); // p5
Pthread_mutex_unlock(&mutex); // p6
}
}

void *consumer(void *arg) {
int i;
for (i = 0; i < loops; i++) {
Pthread_mutex_lock(&mutex); // c1
if (count == 0) // c2
Pthread_cond_wait(&cond, &mutex); // c3
int tmp = get(); // c4
Pthread_cond_signal(&cond); // c5
Pthread_mutex_unlock(&mutex); // c6
printf("%d\n", tmp);
}
}

使用条件变量,通过互斥量来保证临界区的原子性,通过条件变量来保证先后顺序:

  • 当生产者写完缓存后,它需要等待变量为空(缓存被读取)后才进行后面的写操作。对于 消费者来说也是如此。

但是上面这个代码只能在一个线程生产者和一个线程消费者的情况下适用,当有多线程时就 会出问题:对于条件 count 的判断使用的是 if 语句,当下次从 Pthread_cond_wait 被唤醒时,由于没有再次判断 count ,所以一定会执行对应的缓 存区操作。

假设一个生产者线程,两个消费者线程,可能出现下面这种情况:

broken_1.jpg

同理,两个生产者和一个消费者线程,也会出现类似的情况。

解决这个问题的方法也比较简单,将 if 换为 while 来持续判断:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
int loops; // must initialize somewhere...
cond_t cond;
mutex_t mutex;

void *producer(void *arg) {
int i;
for (i = 0; i < loops; i++) {
Pthread_mutex_lock(&mutex); // p1
while (count == 1) // p2
Pthread_cond_wait(&cond, &mutex); // p3
put(i); // p4
Pthread_cond_signal(&cond); // p5
Pthread_mutex_unlock(&mutex); // p6
}
}

void *consumer(void *arg) {
int i;
for (i = 0; i < loops; i++) {
Pthread_mutex_lock(&mutex); // c1
while (count == 0) // c2
Pthread_cond_wait(&cond, &mutex); // c3
int tmp = get(); // c4
Pthread_cond_signal(&cond); // c5
Pthread_mutex_unlock(&mutex); // c6
printf("%d\n", tmp);
}
}

但是上面的代码依然不完美,其根本原因在于:生产者和消费者都在等待同一个条件变量。

当有一个生产者和两个消费者时,很有可能生产者在等待缓存空闲,但当一个消费者发送信号后,唤醒了另外一个消费者线程,这样大家就都永久的处于睡眠状态了。

broken_2.jpg

也就是说,一个消费者不能够唤醒另一个消费者!

成熟的解决方案

基于上面的问题,我们需要使用两个条件变量:

  • 一个变量标识具有空位,由消费者释放,生产者等待。
  • 一个变量标识具有满位,由生产者释放,消费者等待。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
cond_t empty, fill;
mutex_t mutex;

void *producer(void *arg) {
int i;
for (i = 0; i < loops; i++) {
Pthread_mutex_lock(&mutex);
while (count == 1)
Pthread_cond_wait(&empty, &mutex);
put(i);
Pthread_cond_signal(&fill);
Pthread_mutex_unlock(&mutex);
}
}

void *consumer(void *arg) {
int i;
for (i = 0; i < loops; i++) {
Pthread_mutex_lock(&mutex);
while (count == 0)
Pthread_cond_wait(&fill, &mutex);
int tmp = get();
Pthread_cond_signal(&empty);
Pthread_mutex_unlock(&mutex);
printf("%d\n", tmp);
}
}

基于上面这种实现方式,我们可以将缓冲区扩大,以让一个生产者可以唤醒多个消费者,提高并发度。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
int buffer[MAX];
int fill_ptr = 0;
int use_ptr = 0;
int count = 0;

void put(int value) {
buffer[fill_ptr] = value;
fill_ptr = (fill_ptr + 1) % MAX;
count++;
}

int get() {
int tmp = buffer[use_ptr];
use_ptr = (use_ptr + 1) % MAX;
count--;
return tmp;
}
cond_t empty, fill;
mutex_t mutex;

void *producer(void *arg) {
int i;
for (i = 0; i < loops; i++) {
Pthread_mutex_lock(&mutex); // p1
while (count == MAX) // p2
Pthread_cond_wait(&empty, &mutex); // p3
put(i); // p4
Pthread_cond_signal(&fill); // p5
Pthread_mutex_unlock(&mutex); // p6
}
}


void *consumer(void *arg) {
int i;
for (i = 0; i < loops; i++) {
Pthread_mutex_lock(&mutex); // c1
while (count == 0) // c2
Pthread_cond_wait(&fill, &mutex); // c3
int tmp = get(); // c4
Pthread_cond_signal(&empty); // c5
Pthread_mutex_unlock(&mutex); // c6
printf("%d\n", tmp);
}
}
Last Updated 2020-06-28 Sun 11:59.
Render by hexo-renderer-org with Emacs 26.3 (Org mode 9.3.7)