explorer

万丈高楼平地起,勿在浮沙筑高台

0%

[What]Lock-based Concurrent Data Structures

如何在使用锁保证原子性的同时,尽量提高系统性能,而不是让其他的进程/线程被睡眠时间过长。

并发计数器

简单的计数

最简单的数据结构就是一个计数值(一个整数),对于其操作可以封装为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
typedef struct __counter_t {
int value;
} counter_t;

void init(counter_t *c) {
c->value = 0;
}

void increment(counter_t *c) {
c->value++;
}

void decrement(counter_t *c) {
c->value--;
}

int get(counter_t *c) {
return c->value;
}

加上锁以保证原子性:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
typedef struct __counter_t {
int value;
pthread_mutex_t lock;
} counter_t;

void init(counter_t *c) {
c->value = 0;
Pthread_mutex_init(&c->lock, NULL);
}

void increment(counter_t *c) {
Pthread_mutex_lock(&c->lock);
c->value++;
Pthread_mutex_unlock(&c->lock);
}

void decrement(counter_t *c) {
Pthread_mutex_lock(&c->lock);
c->value--;
Pthread_mutex_unlock(&c->lock);
}

int get(counter_t *c) {
Pthread_mutex_lock(&c->lock);
int rc = c->value;
Pthread_mutex_unlock(&c->lock);
return rc;
}

这种方式实现起来简单,但当有多个线程都在多次执行此操作时,相互的等待会导致处理时间被拉长:

simple_count.jpg

上图 Precise 就代表的是简单计数方式随着 CPU 数的增加而消耗的时间。

灵活的计数

灵活计数方法是:为每个 CPU 创建其本地的计数值(此计数值也要加锁)。每隔一段时间, 将本地计数值附加到全局的计数值上(修改全局计数值时也要加锁),然后本地计数值设置 为 0。这样对于各 CPU 来 说,除了更新全局计数值时可能会有相互等待的情况,在更新本地计数值时是可以做到并行 计算的。

scale_count.jpg

如上图所示,一共有 4 个 CPU 对应 4 个线程,每个线程在更新本地的计数值。每隔 5 个 单位时间就将本地的计数值附加给全局计数值,然后自身清零。

上上图中的 Approximate 就代表灵活计数方式所消耗的时间,核心数的增加并不会显著 增加总消耗时间。

而更新周期会显著的影响处理时间:

scale_count_period.jpg

如果更新周期越短,那么相互等待越频繁,全局计数器更新就越及时。 如果更新周期越长,相互等待的次数就越少,但全局计数器的更新延迟就越大。

灵活计数方式示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
typedef struct __counter_t {
int global; // global count
pthread_mutex_t glock; // global lock
int local[NUMCPUS]; // per-CPU count
pthread_mutex_t llock[NUMCPUS]; // ... and locks
int threshold; // update frequency
} counter_t;

// init: record threshold, init locks, init values
// of all local counts and global count
void init(counter_t *c, int threshold) {
c->threshold = threshold;
c->global = 0;
pthread_mutex_init(&c->glock, NULL);
int i;
for (i = 0; i < NUMCPUS; i++) {
c->local[i] = 0;
pthread_mutex_init(&c->llock[i], NULL);
}
}

// update: usually, just grab local lock and update
// local amount; once local count has risen ’threshold’,
// grab global lock and transfer local values to it
void update(counter_t *c, int threadID, int amt) {
int cpu = threadID % NUMCPUS;
pthread_mutex_lock(&c->llock[cpu]);
c->local[cpu] += amt;
if (c->local[cpu] >= c->threshold) {
// transfer to global (assumes amt>0)
pthread_mutex_lock(&c->glock);
c->global += c->local[cpu];
pthread_mutex_unlock(&c->glock);
c->local[cpu] = 0;
}
pthread_mutex_unlock(&c->llock[cpu]);
}

// get: just return global amount (approximate)
int get(counter_t *c) {
pthread_mutex_lock(&c->glock);
int val = c->global;
pthread_mutex_unlock(&c->glock);
return val; // only approximate!
}

并发链表

简易互斥链表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
// basic node structure
typedef struct __node_t {
int key;
struct __node_t *next;
} node_t;

// basic list structure (one used per list)
typedef struct __list_t {
node_t *head;
pthread_mutex_t lock;
} list_t;

void List_Init(list_t *L) {
L->head = NULL;
pthread_mutex_init(&L->lock, NULL);
}

int List_Insert(list_t *L, int key) {
node_t *new = malloc(sizeof(node_t));
if (new == NULL) {
perror("malloc");
return -1; // fail
}
new->key = key;

pthread_mutex_lock(&L->lock);
new->next = L->head;
L->head = new;
pthread_mutex_unlock(&L->lock);
return 0; // success
}

int List_Lookup(list_t *L, int key) {
int rv = -1;
pthread_mutex_lock(&L->lock);
node_t *curr = L->head;
while (curr) {
if (curr->key == key) {
rv = 0;
break;
}
curr = curr->next;
}
pthread_mutex_unlock(&L->lock);
return rv; // failure
}

灵活互斥链表

为了尽量提高性能,可以为每个节点增加一个锁:

  • 当进行遍历时,先获取下一个节点的锁,然后再释放当前节点的锁

并发队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
typedef struct __node_t {
int value;
struct __node_t *next;
} node_t;

typedef struct __queue_t {
node_t *head;
node_t *tail;
pthread_mutex_t head_lock, tail_lock;
}

void Queue_Init(queue_t *q) {
node_t *tmp = malloc(sizeof(node_t));
tmp->next = NULL;
q->head = q->tail = tmp;
pthread_mutex_init(&q->head_lock, NULL);
pthread_mutex_init(&q->tail_lock, NULL);
}

void Queue_Enqueue(queue_t *q, int value) {
node_t *tmp = malloc(sizeof(node_t));
assert(tmp != NULL);
tmp->value = value;
tmp->next = NULL;

pthread_mutex_lock(&q->tail_lock);
q->tail->next = tmp;
q->tail = tmp;
pthread_mutex_unlock(&q->tail_lock);
}

int Queue_Dequeue(queue_t *q, int *value) {
pthread_mutex_lock(&q->head_lock);
node_t *tmp = q->head;
node_t *new_head = tmp->next;
if (new_head == NULL) {
pthread_mutex_unlock(&q->head_lock);
return -1; // queue was empty
}
*value = new_head->value;
q->head = new_head;
pthread_mutex_unlock(&q->head_lock);
free(tmp);
return 0;
}

此队列的首位都添加了互斥锁,是为了能够并发的操作头尾。

  • 为了达到这种效果,在初始化队列时,就需要新建一个空闲的节点。

并发哈希表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#define BUCKETS (101)

typedef struct __hash_t {
list_t lists[BUCKETS];
} hash_t;

void Hash_Init(hash_t *H) {
int i;
for (i = 0; i < BUCKETS; i++)
List_Init(&H->lists[i]);
}

int Hash_Insert(hash_t *H, int key) {
return List_Insert(&H->lists[key % BUCKETS], key);
}

int Hash_Lookup(hash_t *H, int key) {
return List_Lookup(&H->lists[key % BUCKETS], key);
}

上面这个哈希表基于前面的并发链表而建立,由于每个链表都有自己的锁,所以该哈希表的 性能比较优秀。

Last Updated 2020-06-28 Sun 11:59.
Render by hexo-renderer-org with Emacs 26.3 (Org mode 9.3.7)