explorer

万丈高楼平地起,勿在浮沙筑高台

0%

[What]Concurrency: An Introduction

同一个进程的多个线程共享一个 address space,相比较单进程下的单线程主要具有以下两点不同:

  • 在进行切换时,进程切换需要切换页表而线程的切换则不需要,这样切换效率会高很多
  • 单线程的 address space 和 多线程的 adress space 具有一些不同:
    • 多线程下每个线程都拥有自己的栈且它们共享堆
single_more.jpg

以上各个线程有自己独立的栈,最直观的感受是通过 gdb 来切换到各个 frame 下看栈。

为什么需要线程

使用线程有以下几个好处:

  • 可以在逻辑上并行的处理任务,当有多核支持时,代码的运行效率就会更高。
  • 将 IO 密集型任务与 CPU 密集型任务分离,提高 CPU 的利用率。
  • 由于多线程共享内存空间,所以线程间通信比较简单。
  • 多线程切换效率较高。

一般需要在逻辑上区分不同任务时,才以进程作为单位分别执行。

线程的创建

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#include <stdio.h>
#include <assert.h>
#include <pthread.h>
#include "common.h"
#include "common_threads.h"
void *mythread(void *arg) {
printf("%s\n", (char *) arg);
return NULL;
}

int
main(int argc, char *argv[]) {

pthread_t p1, p2;
int rc;
printf("main: begin\n");
Pthread_create(&p1, NULL, mythread, "A");
Pthread_create(&p2, NULL, mythread, "B");
// join waits for the threads to finish
Pthread_join(p1, NULL);
Pthread_join(p2, NULL);
printf("main: end\n");
return 0;
}

如上面这种简单的创建两个线程的代码,在实际执行的顺序有可能是以下几种:

thread_create_1.jpg thread_create_2.jpg

可以看出来,当这些相同优先级的线程没有相互的同步或互斥机制时,在某个时刻到底是谁在执行是无法提前预知的,并发编程是门学问……

多线程的临界区问题

临界区问题的本质在于线程的调度是无法预知的,一个线程在临界区内访问共享资源有可能被另一个线程打断,这造成的共享资源的破坏,所以临界区需要互斥。

所以解决方法就是将临界区的操作原子化。

硬件支持的原子操作实现自旋锁

Test-And-Set

获取以前的值,并为之设置新值。其伪代码如下(此实现是硬件来保证其原子性的):

1
2
3
4
5
int TestAndSet(int *old_ptr, int new) {
int old = *old_ptr; // fetch old value at old_ptr
*old_ptr = new; // store ’new’ into old_ptr
return old; // return the old value
}

使用此函数作为原子操作的自旋锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
typedef struct __lock_t {
int flag;
} lock_t;

void init(lock_t *lock) {
// 0: lock is available, 1: lock is held
lock->flag = 0;
}

void lock(lock_t *lock) {
while (TestAndSet(&lock->flag, 1) == 1)
; // spin-wait (do nothing)
}

void unlock(lock_t *lock) {
lock->flag = 0;
}

Compare-and-swap

比较旧值和期望值相等时,才被赋予新值,其伪代码如下:

1
2
3
4
5
6
int CompareAndSwap(int *ptr, int expected, int new) {
int original = *ptr;
if (original == expected)
*ptr = new;
return original;
}

使用它作为自旋锁:

1
2
3
4
void lock(lock_t *lock) {
while (CompareAndSwap(&lock->flag, 0, 1) == 1)
; // spin
}

Load-linked And Store-conditional

当被检测的地址值被改变后,才为其赋予新值,其伪代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
int LoadLinked(int *ptr) {
return *ptr;
}

int StoreConditional(int *ptr, int value) {
if (no update to *ptr since LoadLinked to this address) {
*ptr = value;
return 1; // success!
} else {
return 0; // failed to update
}
}

使用它作为自旋锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
void lock(lock_t *lock) {
while (1) {
while (LoadLinked(&lock->flag) == 1)
; // spin until it’s zero
if (StoreConditional(&lock->flag, 1) == 1)
return; // if set-it-to-1 was a success: all done
// otherwise: try it all over again
}
}

void unlock(lock_t *lock) {
lock->flag = 0;
}

Fetch-And-Add

返回旧值,并将原来内存值加 1,其伪代码如下:

1
2
3
4
5
int FetchAndAdd(int *ptr) {
int old = *ptr;
*ptr = old + 1;
return old;
}

使用其作为自旋锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
typedef struct __lock_t {
int ticket;
int turn;
} lock_t;

void lock_init(lock_t *lock) {
lock->ticket = 0;
lock->turn = 0;
}

void lock(lock_t *lock) {
int myturn = FetchAndAdd(&lock->ticket);
while (lock->turn != myturn)
; // spin
}

void unlock(lock_t *lock) {
lock->turn = lock->turn + 1;
}

比较

以上基于硬件,可以用少量几行的代码就实现了自旋锁,但是自旋锁会让等待资源的线程白白的消耗 CPU 资源。 如果临界区代码工作很少,那这些等待还可以忍受,但是如果临界区工作量很大,那么这种浪费是无法忍受的。

上面的 4 种方法中,只有 Fetch-And-Add 方法,可以让等待自旋锁的线程按照先后顺序被唤醒(因为其计数器的累加), 而其他 3 种方法并不能保证依次唤醒。这就会有一种极端的情况:有可能最开始等待自旋锁的线程会被唤醒于其他线程之后。

避免自旋

线程主动的让出 CPU

当一个线程需要获取锁,而锁被其他线程占用时,它可以主动的让出 CPU:

1
2
3
4
5
6
7
8
9
10
11
12
void init() {
flag = 0;
}

void lock() {
while (TestAndSet(&flag, 1) == 1)
yield(); // give up the CPU
}

void unlock() {
flag = 0;
}

这样可以避免一个线程在自旋时浪费 CPU 的资源,但是依然会有以下问题:

  1. yield() 是由程序员来主动调用,这需要程序员具备良好的编码素养
  2. 当有很多个线程在获取一个锁时,其他的线程都会调用 yield() ,这回导致一些线程 会频繁的被唤醒,然后又调用 yield() 这也会在调度上浪费 CPU 的资源。

将线程挂在队列上

为了避免等待的线程被随机频繁的唤醒,我们需要将线程按照先后顺序睡眠后挂在一个队列 上:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
typedef struct __lock_t {
int flag;
int guard;
queue_t *q;
} lock_t;

void lock_init(lock_t *m) {
m->flag = 0;
m->guard = 0;
queue_init(m->q);
}

void lock(lock_t *m) {
while (TestAndSet(&m->guard, 1) == 1)
; //acquire guard lock by spinning
if (m->flag == 0) {
m->flag = 1; // lock is acquired
m->guard = 0;
} else {
queue_add(m->q, gettid());
m->guard = 0;
park();
}
}

void unlock(lock_t *m) {
while (TestAndSet(&m->guard, 1) == 1)
; //acquire guard lock by spinning
if (queue_empty(m->q))
m->flag = 0; // let go of lock; no one wants it
else
unpark(queue_remove(m->q)); // hold lock
// (for next thread!)
m->guard = 0;
}

上述代码中, guard 是为了让获取临界区锁和加入队列的操作原子化,避免在操作期间 被其他的线程所抢占。

Last Updated 2020-06-28 Sun 11:59.
Render by hexo-renderer-org with Emacs 26.3 (Org mode 9.3.7)