深入Phtread(三):线程的同步-Condition Variables

继续昨天的线程同步,条件变量(Condition Variables)是用于线程间,通信共享数据状态改变的机制。

简介

当线程互斥地访问一些共享的状态时,往往会有些线程需要等到这些状态改变后才应该继续执行。如:有一个共享的队列,一个线程往队列里面插入数据,另一个线程从队列中取数据,当队列为空的时候,后者应该等待队列里面有值才能取数据。而共享数据(队列)应该用mutex来保护,为了检查共享数据的状态(队列是否为空),线程必须先锁定mutex,然后检查,最后解锁mutex。

问题出来了:当另外一个线程B锁定mutex后,往队列里面插入了一个值,B并不知道A在等着它往队列里面放入一个值。,线程A(等待状态改变)一直在运行,线程B可能已经检查过队列是空的,并不知道队列里已经有值了,所以一直阻塞着自己。为了解决这样的问题引入了条件变量机制。线程B等待于一个条件变量,当线程A插入了一个值后,signal或broadcast这个条件变量,通知线程B状态已改变,A发现条件变量被signaled了,就继续执行。就这样,当一个线程改变共享数据状态后,可以及时通知那些等待于该状态的线程。图示下:

条件变量

中间的矩形代表条件变量,当线程线位于矩形内,表示线程等待该条件变量。位于中心线下下方,则表示signal了该条件变量。

开始线程1 signal 了条件变量,由于没有其他线程等待于该条件变量,所以没什么效果。然后,线程1和线程2先后等待该条件变量,过了一会,线程3 signal了条件变量,线程3的信号解除了线程1的阻塞。然后,线程3等待该条件变量。最后线程1 broadcast了该条件变量,同时解除了等待于条件变量的线程1和线程2。

条件变量的创建和销毁

pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
int pthread_cond_init(pthread_cond_t* cond, pthread_condattr_t* condattr);
int pthread_cond_destroy(pthread_cond_t* cond);

和互斥量一样,可以动态创建和静态创建。

静态创建:条件变量声明为extern或static变量时。

例程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

#include <pthread.h>
#include "error.h"

typedef struct my_struct_tag
{
pthread_mutex_t mutex;
pthread_cond_t cond;
int value;
} my_struct_t;

my_struct_t data = {PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, 0};

int main()
{
return 0;
}

动态创建:一般情况下,条件变量要和它的判定条件定义在一起,此时若包含该条件变量的数据动态创建了,则条件变量也需要动态创建,不过记得不用时用pthread_cond_destroy销毁。

例程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

#include <pthread.h>
#include "error.h"

typedef struct my_struct_tag
{
pthread_mutex_t mutex;
pthread_cond_t cond;
int value;
} my_struct_t;

int main()
{
my_struct_t* data;
data = (my_struct_t*)malloc(sizeof(my_struct_t));
if(data == NULL)
ERROR_ABORT(errno,"Allocate structure");

int status;
status = pthread_mutex_init(&data->mutex, NULL);
if(status != 0)
ERROR_ABORT(status, "Initial mutex");
status = pthread_cond_init(&data->cond, NULL);
if(status != 0)
ERROR_ABORT(status, "Initial condition");

/* .... */

status = pthread_cond_destroy(&data->cond);
if(status != 0)
ERROR_ABORT(status, "Destroy cond");
status = pthread_mutex_destroy(&data->mutex);
if(status != 0)
ERROR_ABORT(status, "Destroy mutex");

free(data);

return 0;
}

等待条件变量

int pthread_cond_wait(pthread_cond_t* cond, pthread_mutex_t* mutex);
int pthread_cond_timedwait(pthread_cond_t* cond, pthread_mutex_t* mutex, struct timespec* expiration);

条件变量与互斥量一起使用,调用pthread_cond_wait或pthread_cond_timedwait时,记得在前面锁定mutex,尽可能多的判断判定条件。上面提到的两个等待条件变量的函数,显示解锁mutex,然后阻塞线程等待状态改变,等待的条件变量signaled后,锁定mutex,返回。记着,这两个函数返回时,mutex一定是锁定的。

多个条件变量可以共享一个互斥变量,相反则不成立。

例程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77

#include <pthread.h>
#include <time.h>
#include "error.h"
#include <errno.h>

typedef struct my_struct_tag
{
pthread_mutex_t mutex;
pthread_cond_t cond;
int value;
} my_struct_t;

my_struct_t data = { PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, 0};

int hibernation = 1;

void* wait_thread(void* arg)
{
int status;
sleep(hibernation);

status = pthread_mutex_lock(&data.mutex);
if(status != 0)
ERROR_ABORT(status, "Lock mutex");

data.value = 1;
status = pthread_cond_signal(&data.cond);
if(status != 0)
ERROR_ABORT(status, "Singal cond");

status = pthread_mutex_unlock(&data.mutex);
if(status != 0)
ERROR_ABORT(status, "Unlock mutex");

return NULL;
}

int main(int argc, char* argv[])
{
pthread_t tid;
int status;
struct timespec timeout;

if(argc > 1)
hibernation = atoi(argv[1]);

status = pthread_create(&tid, NULL, wait_thread, NULL);
if(status != 0)
ERROR_ABORT(status, "Create wait thread");

timeout.tv_sec = time(NULL) + 2;
timeout.tv_nsec = 0;

status = pthread_mutex_lock(&data.mutex);
if(status != 0)
ERROR_ABORT(status, "Lock mutex");

while(data.value == 0)
{
status = pthread_cond_timedwait(&data.cond, &data.mutex, &timeout);
if(status == ETIMEDOUT)
{
printf("Condition wait timed out./n");
break;
}else
if(status != 0)
ERROR_ABORT(status, "timewait");
}

if(data.value != 0)
printf("Condition wa signaled!/n");

status = pthread_mutex_unlock(&data.mutex);
if(status != 0)
ERROR_ABORT(status, "Unlock mutex");
}

唤醒等待条件变量的线程

int pthread_cond_signal(pthread_cond_t* cond);
int pthread_cond_broadcast(pthread_cond_t* cond);

一但有线程由于某些判定条件(predicate)没满足,等待条件变量。我们就有必要当条件满足时,发送信号去唤醒这些线程。

注意:broadcast通常很容易被认为是signal的通用版,其实不能这样理解,准确一点应该说,signal是broadcast的优化版。具体区别不大,但signal效率较broadcast高些。但你不确信有几个线程等待条件变量时用broadcast(When in doubt, broadcast!)。

例程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177

#include "error.h"
#include <pthread.h>
#include <time.h>
#include <string.h>
#include <errno.h>

typedef struct alarm_tag
{
struct alarm_tag* link;
int seconds;
time_t time;
char message[64];
} alarm_t;

pthread_mutex_t alarm_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t alarm_cond = PTHREAD_COND_INITIALIZER;
alarm_t* alarm_list = NULL;
time_t current_alarm = 0;

/**
* alarm_mutex need to be locked
*/
void alarm_insert(alarm_t* alarm)
{
int status;

alarm_t* next;
alarm_t** last;
last = &alarm_list;
next = *last;

while(next != NULL)
{
if(next->time >= alarm->time)
{
alarm->link = next;
*last = alarm;
break;
}

last = &next->link;
next = next->link;
}

if(next == NULL){
*last = alarm;
alarm->link = NULL;
}

/*for test: output the list*/
printf("[list: ");
for(next = alarm_list; next != NULL; next = next->link)
{
printf("%d(%d)[/"%s/"] ", next->time, next->time-time(NULL), next->message);
}
printf("]/n");

if(current_alarm ==0 || alarm->time < current_alarm)
{
current_alarm = alarm->time;
status = pthread_cond_signal(&alarm_cond);
if(status != 0)
ERROR_ABORT(status,"Signal cond");
}

}

void* alarm_thread(void* arg)
{
alarm_t* alarm;
int sleep_time;
time_t now;
int status, expired;
struct timespec cond_time;

while(1)
{
status = pthread_mutex_lock(&alarm_mutex);
if(status != 0)
ERROR_ABORT(status, "lock");

current_alarm = 0;

while(alarm_list == NULL)
{
status = pthread_cond_wait(&alarm_cond, &alarm_mutex);
if(status != 0 )
ERROR_ABORT(status, "Wait cond");
}

alarm = alarm_list;
alarm_list = alarm->link;
now = time(NULL);
expired = 0;

if(alarm->time > now)
{
printf("[wating: %d(%d)/"%s/"]/n", alarm->time, alarm->time - time(NULL), alarm->message);

cond_time.tv_sec = alarm->time;
cond_time.tv_nsec = 0;
current_alarm = alarm->time;
while(current_alarm == alarm->time)
{
status = pthread_cond_timedwait(&alarm_cond, &alarm_mutex,&cond_time);
if(status == ETIMEDOUT)
{
expired = 1;
break;
}
}

if(!expired)
alarm_insert(alarm);
}else
expired = 1;

if(expired)
{
printf("(%d) %s/n", alarm->seconds, alarm->message);
free(alarm);
}

status = pthread_mutex_unlock(&alarm_mutex);
if(status != 0)
ERROR_ABORT(status, "Unlock mutex");
}

return 0;
}

int main()
{
pthread_t pid;
int status;
char line[128];

status = pthread_create(&pid, NULL, alarm_thread, NULL);
if(status != 0)
ERROR_ABORT(status, "pthread_create");

while(1)
{
fprintf(stdout, "Alarm>");
fgets(line, sizeof(line), stdin);
if(strlen(line) <= 0)
continue;

alarm_t* alarm = (alarm_t*)malloc(sizeof(alarm_t));
if(alarm == NULL)
ERROR_ABORT(errno,"memory can't allocated!");

if(sscanf(line, "%d %s", &alarm->seconds, alarm->message) != 2)
{
printf("Bad Command/n");
free(alarm);
continue;
}

status = pthread_mutex_lock(&alarm_mutex);
if(status != 0)
ERROR_ABORT(status, "pthread mutex locking..");

alarm->time = time(NULL) + alarm->seconds;

/* insert into list*/

alarm_insert(alarm);

status = pthread_mutex_unlock(&alarm_mutex);
if(status != 0)
ERROR_ABORT(status, "pthread mutex unlocking...");
}

return 0;
}
David++