深入Phtread(二):线程的同步-Mutex

并行的世界,没有同步,就失去了秩序,就会乱作一团!试想,交通没有红绿灯,生产线产品装配没有一定的顺序… 结果是显而易见的。多个线程也需要同步,否则程序运行起来结果不可预测,这是我们最不能容忍的。交通的同步机制就是红绿灯,Pthread提供了互斥量(mutex)和条件变量(Condition Variables)两种机制去同步线程。

  1. 不变量,临界区和判定条件
  2. 互斥量(Mutex)
  3. 创建和销毁互斥量
  4. 锁定和解锁
  5. 调整mutex大小
  6. 使用多个mutex
  7. 锁定链

1. 不变量,临界区和判定条件

不变量(Invariant):程序所做的一些假设,特别是指变量之间的关系。如:一个queue,有头节点,和其它数据节点,这些元素之间的连接关系就是不变量。当程序里面不变量遭受破坏时,后果往往是很严重的,轻则数据出错,重则程序直接崩溃。

临界区(Critical Section):处理共享数据的一段代码。

判定条件(Predicates):描述不变量状态的逻辑表达式。

2. 互斥量(Mutex)

一般,多个线程之间都会共享一些数据,当多个线程同时访问操作这些共享数据时。问题出来了,一个线程正在修改数据时,另外一个可能也去操作这些数据,结果就会变得不一致了。如(gv=0是共享的数据):

线程A:a = gv; gv = a + 10; 
线程B: b = gv; gv = a + 100;

可能发生A执行完a=gv(0)时,B开始执行b=gv(0); gv=a+100,此时gv=100,然后a执行gv=a+10,最后gv=10。并不是我们要的结果,我们的想法是两个线程并发的给gv加上一个值,期望结果110。^_^ 若这是你银行卡的余额,若没有同步,那就惨了(你往卡里打钱,你有个朋友也同时往你卡里汇钱,很有可能余额只仅加上一方打的)。

互斥量就是为了解决这种问题而设计的,它是Dijkstra信号量的一种特殊形式。它使得线程可以互斥地访问共享数据。如:

互斥量

上图展示了三个线程共享一个互斥量,位于矩形中心线下方的线程锁定了该互斥量;位于中心线上方且在矩形范围内的线程等待该互斥量被解锁,出于阻塞状态,在矩形外面的线程正常运行。刚开始,mutex是解锁的,线程1成功将其锁定,据为己有,因为并没有其它线程拥有它。然后,线程2尝试去锁定,发现被线程1占用,所以阻塞于此,等到线程1解锁了该mutex,线程2立马将mutex锁定。过了会,线程3尝试去锁定mutex,由于mutex被锁定,所以阻塞于此。线程1调用pthread_mutex_trylock尝试去锁定个mutex,发现该mutex被锁定,自己返回继续执行,并没有阻塞。继续线程2解锁,线程3锁定成功,最后线程3完成任务解锁mutex。

3. 创建和销毁互斥量

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
int pthread_mutex_init(pthread_mutex_t* mutex, pthread_mutexattr_t* attr);
int pthread_mutex_destroy(pthread_mutex_t* mutex);

不要尝试去使用复制的的mutex,结果未定义。

静态创建,当mutex以extern或者static存储时,可以用PTHREAD_MUTEX_INITIALIZER初始化,此时该mutex使用默认属性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

#include "error.h"
#include <pthread.h>

typedef struct my_struct_tag
{
pthread_mutex_t mutex;
int value;
} my_struct_t;

my_struct_t data = { PTHREAD_MUTEX_INITIALIZER, 0};

int main()
{
return 0;
}

动态创建,往往使用mutex时,都会将它和共享数据绑在一起,此时就需要pthread_mutex_init去动态初始化了,记得用完后pthread_mutex_destroy。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

#include "error.h"
#include <pthread.h>

typedef struct my_struct_tag
{
pthread_mutex_t mutex;
int value;
} my_struct_t;

int main()
{
my_struct_t* data;
int status;

data = (my_struct_t*)malloc(sizeof(my_struct_t));
status = pthread_mutex_init(&data->mutex, NULL);
if(status != 0)
ERROR_ABORT(status, "pthread_mutex_init");

pthread_mutex_destroy(&data->mutex);
free(data);

return 0;
}

4. 锁定和解锁

原则见上面。

int pthread_mutex_lock(pthread_mutex_t* mutex);
int pthread_mutex_trylock(pthread_mutex_t* mutex);
int pthread_mutex_unlock(pthread_mutex_t* mutex);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89

#include <pthread.h>
#include <sys/types.h>
#include "error.h"
#include <errno.h>

#define SPIN 10000000

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
long counter;
time_t end_time;

void* counter_thread(void* arg)
{
int status;
int spin;

while(time(NULL) < end_time)
{
status = pthread_mutex_lock(&mutex);
if(status != 0)
ERROR_ABORT(status, "Lock mutex");

for(spin = 0; spin < SPIN; spin++)
counter++;

status = pthread_mutex_unlock(&mutex);
if(status != 0)
ERROR_ABORT(status, "Unlock mutex");
sleep(1);
}

printf("Coutner is %#lx/n", counter);

return NULL;
}

void* monitor_thread(void* arg)
{
int status;
int misses = 0;

while(time(NULL) < end_time)
{
sleep(3);

status = pthread_mutex_trylock(&mutex);
if(status != EBUSY)
{
if(status != 0)
ERROR_ABORT(status, "Trylock mutex");

printf("Counter is %ld/n", counter/SPIN);
status = pthread_mutex_unlock(&mutex);
if(status != 0)
ERROR_ABORT(status, "Unlock mutex");
}else
misses++;
}
printf("Monitro thread missed update %d times./n", misses);
return NULL;
}

int main()
{
int status;
pthread_t pid_counter;
pthread_t pid_monitor;

end_time = time(NULL) + 60;

status = pthread_create(&pid_counter, NULL, counter_thread, NULL);
if(status != 0)
ERROR_ABORT(status, "fail to create thread counter");

status = pthread_create(&pid_monitor, NULL, monitor_thread, NULL);
if(status != 0)
ERROR_ABORT(status, "fail to create monitor thread");

status = pthread_join(pid_counter, NULL);
if(status != 0 )
ERROR_ABORT(status, "fail to join counter thread");

status = pthread_join(pid_monitor, NULL);
if(status != 0)
ERROR_ABORT(status, "fail to join monitor thread");

return 0;
}

5. 调整mutex大小

mutex应该多大?这里的大小是相对的,如mutex锁定到解锁之间的代码只有一行,比起有10行的就小了。 原则是:尽可能大,但不要太大(As big as neccessary, but no bigger)。考虑下面的因素:

1> mutex并不是免费的,是有开销的,不要太小了,太小了程序只忙于锁定和解锁了。

2> mutex锁定的区域是线性执行的,若太大了,没有发挥出并发的优越性。

3> 自己掂量1和2,根据实际情况定,或者尝试着去做。

6. 使用多个mutex

使用多个mutex一定要注意,防止死锁(deadlock)发生。下面是一个典型死锁:

线程A:pthread_mutex_lock(&mutex_a); pthread_mutex_lock(&mutex_b); ...
线程B:pthread_mutex_lock(&mutex_b); pthread_mutex_lock(&mutex_a); ...

存在这种可能,线程A执行了第一句,锁定了mutex_a;然后线程开始执行第一句锁定mutex_b;然后他们互相等待解锁mutex,A等mutex_b被解锁,B等mutex_a被解锁,不肯让步,出于死锁状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63

#include <pthread.h>
#include "error.h"
#include <time.h>

pthread_mutex_t mutex_a = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t mutex_b = PTHREAD_MUTEX_INITIALIZER;

void* thread1(void* arg)
{
while(1)
{
/*sleep(1);*/
pthread_mutex_lock(&mutex_a);
pthread_mutex_lock(&mutex_b);

printf("[%lu]thread 1 is running! /n", time(NULL));

pthread_mutex_unlock(&mutex_b);
pthread_mutex_unlock(&mutex_a);
}
return NULL;
}

void* thread2(void* arg)
{
while(1)
{
/*sleep(1);*/

pthread_mutex_lock(&mutex_b);
pthread_mutex_lock(&mutex_a);

printf("[%lu]thread 2 is running! /n",time(NULL));

pthread_mutex_unlock(&mutex_a);
pthread_mutex_unlock(&mutex_b);

}
return NULL;
}

int main()
{
pthread_t tid1, tid2;
int status;

status = pthread_create(&tid1, NULL, thread1, NULL);
if(status != 0)
ERROR_ABORT(status, "thread 1");

status = pthread_create(&tid2, NULL, thread2, NULL);
if(status !=0)
ERROR_ABORT(status, "thread 2");

status = pthread_join(tid1, NULL);
if(status != 0)
ERROR_ABORT(status, "join thread1");

status = pthread_join(tid2, NULL);
if(status != 0)
ERROR_ABORT(status, "join thread2");
}

解决死锁的方法

a. 固定锁定顺序(Fixed locking hierarchy):锁定mutex的顺序固定。

线程A:pthread_mutex_lock(&mutex_a); pthread_mutex_lock(&mutex_b); ...
线程B:pthread_mutex_lock(&mutex_a); pthread_mutex_lock(&mutex_b); ...

b. 尝试和回退(Try and back off): 锁定第一个后,尝试锁定下一个,若锁定成功,继续尝试下一个,若锁定失败,解锁先去锁定的。

解锁顺序不会引起死锁.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157

#include <pthread.h>
#include "error.h"
#include <errno.h>

#define ITERATIONS 100


pthread_mutex_t mutex[3] = {
PTHREAD_MUTEX_INITIALIZER,
PTHREAD_MUTEX_INITIALIZER,
PTHREAD_MUTEX_INITIALIZER
};

int backoff = 1;
int yield_flag = 0;

void* lock_forward(void* arg)
{
int i, iterate, backoffs;
int status;

for(iterate = 0; iterate < ITERATIONS; iterate++)
{
backoffs = 0;
for(i = 0; i < 3; i++){
if(i == 0)
{
status = pthread_mutex_lock(&mutex[i]);
if(status != 0)
ERROR_ABORT(status,"Lock mutex");
}else
{
if(backoff)
status = pthread_mutex_trylock(&mutex[i]);
else
status = pthread_mutex_lock(&mutex[i]);

if(status == EBUSY)
{
backoff++;
printf("forward locker backing off at %d./n", i);
for(; i >= 0; i--)
{
status = pthread_mutex_unlock(&mutex[i]);
if(status != 0)
ERROR_ABORT(status, "Unlock mutex");
}
}else
{
if(status != 0)
ERROR_ABORT(status, "Lock mutex");

printf("forward locker got %d /n", i);
}
}

if(yield_flag){
if(yield_flag > 0)
sched_yield();
else
sleep(1);
}
}

printf("lock forward got all locks , %d backoffs/n", backoffs);

pthread_mutex_unlock(&mutex[2]);
pthread_mutex_unlock(&mutex[1]);
pthread_mutex_unlock(&mutex[0]);
sched_yield();
}

return NULL;
}

void* lock_backward(void* arg)
{
int i, iterate, backoffs;
int status;

for(iterate = 0; iterate < ITERATIONS; iterate++)
{
backoffs = 0;
for(i = 2; i >= 0; i--){
if(i == 2)
{
status = pthread_mutex_lock(&mutex[i]);
if(status != 0)
ERROR_ABORT(status,"Lock mutex");
}else
{
if(backoff)
status = pthread_mutex_trylock(&mutex[i]);
else
status = pthread_mutex_lock(&mutex[i]);

if(status == EBUSY)
{
backoff++;
printf("backward locker backing off at %d./n", i);
for(; i < 3; i++)
{
status = pthread_mutex_unlock(&mutex[i]);
if(status != 0)
ERROR_ABORT(status, "Unlock mutex");
}
}else
{
if(status != 0)
ERROR_ABORT(status, "Lock mutex");

printf("backward locker got %d /n", i);
}
}

if(yield_flag){
if(yield_flag > 0)
sched_yield();
else
sleep(1);
}
}

printf("lock backward got all locks , %d backoffs/n", backoffs);

pthread_mutex_unlock(&mutex[0]);
pthread_mutex_unlock(&mutex[1]);
pthread_mutex_unlock(&mutex[2]);
sched_yield();
}


return NULL;
}

int main(int argc, char* argv[])
{
pthread_t forward, backward;
int status;

if(argc > 1)
backoff = atoi(argv[1]);

if(argc > 2)
yield_flag = atoi(argv[2]);

status = pthread_create(&forward, NULL, lock_forward, NULL);
if(status != 0)
ERROR_ABORT(status, "Create forward");

status = pthread_create(&backward, NULL, lock_backward, NULL);
if(status != 0)
ERROR_ABORT(status, "Create backward");

pthread_exit(NULL);
}

7. 锁定链

一般用于遍历数据结果(树,链表),一个用于锁定指针,一个锁定数据。

形如:

pthread_mutex_lock(&mutex_a); 
pthread_mutex_lock(&mutex_b); 
...
pthread_mutex_unlock(&mutex_a)
...
pthread_mutex_unlock(&mutex_b)

注意,锁定链往往会出现大量的锁定和解锁操作,有时会得不偿失。

David++