explorer

万丈高楼平地起,勿在浮沙筑高台

0%

C++ concurrency:设计可以并发操作的代码

比起设计可并发访问的数据结构,现在需要站在更高的层次来看待并发。

分离为多个线程的方法

这里涉及到的一些需要考虑的方面: 1. 需要设计多少个线程? 2. 是设计通用线程可以处理常用任务,还是设计专用线程处理特定的任务? 3. 新增加的线程会提高整体的并发度吗,与代码的简洁性相比是否划算?

将数据分离到多个线程来处理

当需要使用简单的算法将一大堆数据进行处理时,就可以考虑将该数据分为多个组进行并行处理。

每个线程的处理算法是一样的(也就是说它们是通用线程),只是它们获取到的数据索引是不同的,最终的结果是由主线程来完成合并。

比较好的例子就是对一大堆数据进行求和时,分为多组进行求和,最后又来合并求和结果,示例代码在此

应用这种方法必须需要充分利用 CPU 的硬件核心来完成并行处理。

如果只有一个 CPU 核心,那么这种分离数据到多线程的方法还比不上单线程。因为比起单线程,多线程还多了线程切换的开销。

递归的分解数据

前一种方法由于数据分解规律是已知的,所以可以一次性创建很多线程。

但有些时候数据是在运行到一定阶段才能获得的,这就需要边获取数据边创建线程。

比如之前使用并发方式来处理快速排序

与上面处理方式不同的是,该代码在创建线程时使用了std::async,并没有明确指定其一定要异步运行。这就让标准库来根据当前的环境来决定是否需要真正创建一个线程来运行。这种方式就有了更大的灵活性。

根据任务的类型来分离线程

前面两种是站在数据的角度分离了多个相同任务的线程,现在需要站在功能的角度来分离多个不同任务的线程,每个线程都专注于独立做一件事。

数据分离的相同任务多线程: 1. 由于处理的数据地址都不同,所以不需要互斥操作。 2. 主线程则是等待任务处理完后再来合并结果,所以各个子线程间也不需要同步操作。 3. 并行处理数据需要硬件并行度来加速处理

而以功能来划分的不同任务线程: 1. 由于多个线程可能会处理到相同的数据,则需要互斥操作。 2. 多个线程的处理会有流水线形式的协同方式,所以会有同步机制来完成同步。 3. 即使在单核的情况下,多任务线程也是适用的。因为它将一个复杂的任务分解为了多个子任务,以一定的优先级进行更为科学的处理。所以能提高系统的吞吐性能。

单一职责

以前用合作式调度器来写单片机程序时,整个大循环就是一个单线程,该单线程轮询的调度每个任务。

这些任务有 IO 密集型的(比如获取用户按键,等待网络数据),也有 CPU 密集型的(比如获取到数据后进行一段运算)。

这些任务需要主动的让出 CPU 资源,以让其他任务能够及时的运行。但是当任务太多,或某个任务执行时间过长时,就会有以下问题: 1. 系统的响应速度不够:因为 IO 密集型任务不能被及时的调用,导致用户感觉到卡顿。 2. 系统的吞吐量上不去:因为 CPU 密集型任务总是要主动让出资源来让其他任务可以调度,而很多时候 CPU 是在空转。

这种单线程轮询下,只有当任务较少且相对轻量时可以使用,否则就是两头不讨好。

而使用多线程的情况下: - IO 密集型任务设定为高优先级,一旦有数据输入该线程便能获取到,这就能提高响应速度。 - CPU 密集型任务设定为低优先级,只要 IO 密集型线程没有运行,该线程就从 IO 密集型数据队列中取出数据处理。

这样子,通过多线程就可以得到较高的响应速度和吞吐量。

但是,多线程就需要考虑好互斥和同步的问题。

如果线程间需要共享的数据太多或者是有互相等待的情况,则需要警惕这种线程分割方法是否合理。

流水线

如果数据流向是有顺序的,那么就可以将对该数据处理的任务分解为多个阶段来处理。

假设现在有 20 组数据,并且有 4 个核心来处理这 20 组数据,每组数据需要分为 4 个步骤来完成,且每步需要 3 秒来完成。

那么有两种方式来处理: 1. 将数据分离到 4 个通用线程来处理 - 每个核心都依次处理一组数据,那么每个核心将要处理 5 组数据。 - 由于每个核心需要依次 4 步处理来完成一组数据,那么最终是每隔 12 秒可以生成 4 个最终数据 - 所有的数据处理完需要 60 秒 2. 将 4 个线程以流水线的形式排列,每个线程处理数据单独的一步 - 第一组数据首先进入线程 1 ,处理第一步,然后再进入线程 2 ,处理第二步,这样第一组数据需要等 12 秒才可以生成最终结果 - 在第一组数据进入到线程 2 时,第二组数据就可以进入到线程 1,处理第一步了。那么就是说接下来的数据是每隔 3 秒就可以生成一个最终数据 - 所有的数据处理完需要 12 + 19 * 3 = 69 秒

虽然流水线方式比通用线程方式多花了 9 秒来完成所有数据,但是流水线可以以更高的频率来输出最终结果,这在有些场合是必须的。

比如在播放音频时,有个重要的指标叫做音频延迟: - 以通用线程的方式每次可以获取到一大块音频数据,但是大块数据之间是有延迟的,这就会造成了音频延迟(这个延迟是相当于一开始就要收集到一大块数据后,才开始处理而造成的延迟) - 而以流水线的方式来处理音频数据,可以以更高频率每次产生小块数据,音频延迟就会很小,用户体验就好

这就相当于以更小的颗粒度来完成数据处理。

影响多线程性能的因素

虽然说用到的多线程,但并不代表其就一定高效。这需要理解有哪些因素会影响到多线程的性能。

硬件核心的数量

线程的数量参考硬件核心的数量: - 如果线程的数量小于硬件核心数,则没有充分的利用 CPU 资源 - 如果线程的数量太过大于硬件核心数,则会多出很多上下文切换而降低 CPU 做有用功的效率(这称之为over-subscription

使用std::thread::hardware_concurrency()可以获取硬件的核心数量,在一个应用程序使用该函数应该要避免其他线程同时也在使用,不然大家都创建相当多的线程数就会极大影响系统效率。

std::async则是由标准库来根据当前核心数来判断是否应该创建新线程,这是一种比较优雅的做法。

数据竞争和 cache 乒乓

当两个线程都分别运行在两个核心上时,如果它们仅仅是读同一份数据,那不会有任何影响。

但如果其中一个线程修改了数据,那么另一个核心的 cache 就需要被同步,这个过程就需要一些时间。在同步完成之前,另一个线程只有等待。

比如下面这个对原子整数的修改函数:

1
2
3
4
5
6
std::atomic<unsigned long> counter(0);
void processing_loop() {
while (counter.fetch_add(1, std::memory_order_relaxed) < 100000000) {
do_something();
}
}
它对原子整数执行读-修改-写的过程,这里面就包括: 1. 从其他核心同步cache 2. 修改cache

如果在多个核心的多个线程都在调用此函数,那么就会造成各自 cache 的频繁同步(cache 乒乓),相当于当前线程之间在相互等待从而造成性能下降。

这种操作就相当于在一个循环中,多个线程都在争抢互斥量一样:

1
2
3
4
5
6
7
8
std::mutex m;
my_data data;
void processing_loop_with_mutex() {
while(true) {
std::lock_guard<std::mutex> lk(m);
if (done_processing(data)) break;
}
}
但是这二者是不同的: - 原子变量的互斥是在处理器级别的互斥,当需要 cache 同步时,当前核心上的所有线程只能等待,无法上下文切换 - 互斥量的互斥是在操作系统层面的互斥,当一个线程阻塞等待时,操作系统会切换到其他线程运行

上面两个情况都有一个简单的解法:使用在循环外使用互斥量来原子化整个循环,虽然临界区变大了,但可以避免频繁的切换。

1
2
3
4
5
6
7
8
std::mutex m;
my_data data;
void processing_loop_with_mutex() {
std::lock_guard<std::mutex> lk(m);
while(true) {
if (done_processing(data)) break;
}
}
1
2
3
4
5
6
7
8
unsigned long counter(0);
std::mutex m;
void processing_loop() {
std::lock_guard<std::mutex> lk(m);
while (++counter < 100000000) {
do_something();
}
}