避免data race
的底层方法有 3 种: 1.
使用互斥机制,保证访问共享数据的原子性 2. 使用无锁编程 3.
使用事务机制(如同数据库一样,将多个操作做成日志形式,一旦日志被打断则重新开始)
而互斥机制是相对简单易用的方式。
使用互斥量保护临界区
基本使用
c++
提供了std::mutex
来表示一个互斥量,其对应的lock()
和unlock()
则分别代表获取和释放锁。
但如果直接这样使用,容易造成获取的锁忘了释放的情况(比如线程异常退出,还来不及释放锁)。那么下次再来获取锁就会造成死锁。
所以,c++
提供了std::lock_guard
,在构造函数中获取锁,在析构函数中释放锁。这种
RAII 操作大大的降低了程序员的心智负担。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| #include <iostream> #include <mutex> #include <vector> #include <thread>
static int val = 0; static std::mutex val_mtx; static void Task(void) { std::lock_guard<std::mutex> mtx_guard(val_mtx);
for (int i = 0; i < 1000; ++i) { val += 1; } }
int main(void) { std::vector<std::thread> threads; for (int i = 0; i < 10; ++i) { threads.emplace_back(Task); } for (auto &v : threads) { v.join(); }
std::cout << "The value of val is " << val << "\n";
return 0; }
|
实际使用中,一般将临界区数据和互斥量放在类的私有权限中,通过成员函数的方式来进行互斥的操作。
结构化共享数据
结构化共享数据需要注意的是要避免将共享数据的指针或引用传递出去,因为这样会绕过互斥量而造成未定义行为,这就包括以下几种情况:
1. 将共享数据的指针或引用通过函数返回 2.
将共享数据的指针或引用赋值给全局指针 3.
将共享数据的指针或引用传递给其他用户可调用函数,而该函数参数是以指针或引用的方式获取该共享数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| class some_data { int a; std::string b; public: void do_something(); }; class data_wrapper { private: some_data data; std::mutex m; public: template<typename Function> void process_data(Function func) { std::lock_guard<std::mutex> l(m); func(data); } }; some_data* unprotected;
void malicious_function(some_data& protected_data) { unprotected = &protected_data; } data_wrapper x; void foo() { x.process_data(malicious_function); unprotected->do_something(); }
|
注意接口调用所造成的内部竞态
以std::stack
为例,假设它的每个接口都保证是多线程的安全的。但是对其接口的应用却需要特别注意:
1 2 3 4 5 6 7 8 9 10
| std::stack<int> s; if (!s.empty()) { int const value = s.top(); s.pop(); do_something(value); }
|
要解决这个问题,最简单粗暴的方式就是使用互斥量将这整个步骤原子化。但如果
stack
元素所占内存太大时,进行元素拷贝的时间过长占用内存过大,有些类型将会抛出std::bad_alloc
异常。那么这种解决方案在元素占用内存大的情况下就不适用了。
比如在stack
中的元素是vector
,而vector
的内存是从堆中申请的。
在执行top()
操作时,会进行整个vector
拷贝,这意味着需要申请堆内存。
如果vector
很大,则可能由于内存不够或被限制而抛出异常。
所以标准库将top()
和pop()
拆分成两个动作,如果top()
失败,则stack
内的内容仍然没有被改变。
现在使用互斥量将它们捆绑在一起,如果由于内存过大而失败,又继续执行后面的pop()
,则会导致stack
内容被删除了一个元素,而用户又没有获取到该元素的内容。
为了避免大内存抛异常的情况,有以下几个解决方案:
形参传递使用引用
在调用函数前,首先用户申请内存。如果申请失败,也不会导致pop
的问题:
1 2
| std::vector<int> result; some_stack.pop(result);
|
但是这有一定的局限性:有些用户数据是不太容易构建对象的或者是构建成本比较高
使用不会抛出异常的元素
有些类型进行拷贝或移动时并不会抛出异常,那么就可以限制stack
只使用这些类型。
返回元素的地址
另外一个避免拷贝的方式就是返回元素的地址,然后使用std::shared_ptr
来管理元素的内存。
最终版本
上面解决方案的 1 和 3
是比较合理的,最终对std::stack
的封装如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
| #include <exception> #include <stack> #include <mutex> #include <memory> #include <iostream>
struct empty_stack: std::exception { const char* what() const throw() { return "empty stack"; } };
template<typename T> class threadsafe_stack { private: std::stack<T> data; mutable std::mutex m; public: threadsafe_stack(){} threadsafe_stack(const threadsafe_stack& other) { std::lock_guard<std::mutex> lock(other.m); data = other.data; } threadsafe_stack& operator= (const threadsafe_stack&) = delete;
void push(T new_value) { std::lock_guard<std::mutex> lock(m); data.push(new_value); } std::shared_ptr<T> pop() { std::lock_guard<std::mutex> lock(m); if(data.empty()) throw empty_stack(); std::shared_ptr<T> const res(std::make_shared<T>(data.top())); data.pop(); return res; } void pop(T& value) { std::lock_guard<std::mutex> lock(m); if(data.empty()) throw empty_stack(); value = data.top(); data.pop(); } bool empty() const { std::lock_guard<std::mutex> lock(m); return data.empty(); } };
int main() { threadsafe_stack<int> si; si.push(5); if (!si.empty()) { int x; si.pop(x);
std::cout << "The value of x is " << x << "\n"; } }
|
可以看到:
- 使用互斥量将原始的
top()
和pop()
进行原子化以避免竞态
- 返回元素使用引用或地址,以避免拷贝出现异常
- 当元素为空时,抛出异常以提醒用户代码
死锁问题及其解决
死锁问题在编码中经常遇到,其中一个比较简单的解决办法就是:保持获取和释放互斥量的顺序一致。
但是这在实际的操作中并不是那么的容易。
c++
提供了std::lock
用于一次性获取多个锁,以避免死锁:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| class some_big_object; void swap(some_big_object& lhs,some_big_object& rhs); class X { private: some_big_object some_detail; std::mutex m; public: X(some_big_object const& sd):some_detail(sd){} friend void swap(X& lhs, X& rhs) { if(&lhs==&rhs) return; std::lock(lhs.m,rhs.m); std::lock_guard<std::mutex> lock_a(lhs.m,std::adopt_lock); std::lock_guard<std::mutex> lock_b(rhs.m,std::adopt_lock); swap(lhs.some_detail,rhs.some_detail); } };
|
使用std::lock
加上std::lock_guard
的 RAII
机制,就能避免很多场合的死锁情况。
当然,实际编码场景中也会有散布在各处的获取互斥量的操作,这就需要程序员有良好的习惯来避免死锁了。
避免死锁的一些编码准则
除了获取锁会导致死锁以外,两个线程的相互join()
也会导致死锁,这些都是需要在编码过程中尽量避免的。
避免嵌套获取锁
当一个线程已经获取了一个锁并且还未释放的时候,就不要再获取其它的锁了。
如果每个线程都遵守这个规则,那么就不会那么容易造成死锁,因为每个线程都获取单独的锁。
>
即使别的线程已经获取了该锁,那么该线程所做的就是等待。但别的线程不会等待该线程,所以也不会造成死锁。
如果确实需要同时获取到多个锁,那么使用std::lock
和std::lock_guard
来管理这些锁以避免死锁。
获取锁时避免调用用户提供的代码
当获取到一个锁时,要避免调用用户提供的代码。因为并不知道用户提供的代码是否也获取了其它的锁。
按确定的顺序获取锁
有的时候获取锁的代码散布在多处,获取多个锁时不能使用std::lock
这种简单的写法。
这种情况下就需要所有相关线程按照统一的顺序进行锁的获取和释放,以避免死锁。
使用一个获取锁的层级
简单的说就是:为各个锁分一个高低层级,并且获取锁的顺序必须是由高向低层级的方向获取,然后由低到高的方向释放。
那么这里的重点就是要设计一个可以识别出层级高低的锁,核心就在于使用thread_local
变量以共享同一个线程中的层级:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
| class hierarchical_mutex { std::mutex internal_mutex; unsigned long const hierarchy_value; unsigned long previous_hierarchy_value; static thread_local unsigned long this_thread_hierarchy_value; void check_for_hierarchy_violation() { if (this_thread_hierarchy_value <= hierarchy_value) { throw std::logic_error(“mutex hierarchy violated”); } } void update_hierarchy_value() { previous_hierarchy_value = this_thread_hierarchy_value; this_thread_hierarchy_value = hierarchy_value; } public: explicit hierarchical_mutex(unsigned long value): //创建锁的时候,就规定好了它的层级,以后就不能改了 hierarchy_value(value), previous_hierarchy_value(0) { } void lock() { check_for_hierarchy_violation(); internal_mutex.lock(); update_hierarchy_value(); } void unlock() { if (this_thread_hierarchy_value != hierarchy_value) throw std::logic_error(“mutex hierarchy violated”); this_thread_hierarchy_value = previous_hierarchy_value; internal_mutex.unlock(); } bool try_lock() { check_for_hierarchy_violation(); if(!internal_mutex.try_lock()) return false; update_hierarchy_value(); return true; } };
thread_local unsigned long hierarchical_mutex::this_thread_hierarchy_value(ULONG_MAX);
|
那么在使用的时候,只要是按照层级顺序进行获取就可以正常工作,否则就会抛出异常:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| hierarchical_mutex high_level_mutex(10000); hierarchical_mutex low_level_mutex(5000); hierarchical_mutex other_mutex(6000); int do_low_level_stuff(); int low_level_func() { std::lock_guard<hierarchical_mutex> lk(low_level_mutex); return do_low_level_stuff(); } void high_level_stuff(int some_param); void high_level_func() { std::lock_guard<hierarchical_mutex> lk(high_level_mutex); high_level_stuff(low_level_func()); } void thread_a() { high_level_func(); } void do_other_stuff(); void other_stuff() { high_level_func(); do_other_stuff(); } void thread_b() { std::lock_guard<hierarchical_mutex> lk(other_mutex); other_stuff(); }
|
hierarchical_mutex
可以被std::lock_guard
所使用,是因为它提供
lock
,unlock
,try_lock
标准处理函数。
使用std::unique_lock
来灵活的使用锁
std::defer_lock
使得std::unique_lock
只是刚开始获得锁的关联,但并不会lock()
这个锁。
std::unique_lock
可以在后面主动使用lock()
、unlock()
这些方法来主动获取和释放锁。
>
由于可以主动的释放锁,所以在操作完以后立即释放锁,可以提高整个系统的并发性能
这里的应用场景主要是在于灵活的使用锁,前面的 swap
函数还有更好的实现方式:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| class some_big_object; void swap(some_big_object& lhs,some_big_object& rhs); class X { private: some_big_object some_detail; std::mutex m; public: X(some_big_object const& sd):some_detail(sd){} friend void swap(X& lhs, X& rhs) { if(&lhs==&rhs) return; std::unique_lock<std::mutex> lock_a(lhs.m,std::defer_lock); std::unique_lock<std::mutex> lock_b(rhs.m,std::defer_lock); std::lock(lock_a,lock_b); swap(lhs.some_detail,rhs.some_detail); } };
|
如果std::lock
成功获取了锁,则在析构时,std::unique_lock
会释放该锁。反之则不会。
传递互斥量的所有权
当一个函数获取了一个锁,然后需要该函数的使用者来释放该锁,这种情况下就需要传递锁的所有权了。而std::unique_lock
内部私有成员保存了该锁,所以使用它来移动所有权是最合适的。
当传递的锁本就是一个右值,那么这种传递会自动完成(比如返回一个临时的锁)。
当传递的锁是左值时,就需要使用std::move
来显示的指定。
1 2 3 4 5 6 7 8 9 10 11
| std::unique_lock<std::mutex> get_lock() { extern std::mutex some_mutex; std::unique_lock<std::mutex> lk(some_mutex); prepare_data(); return lk; } void process_data() { std::unique_lock<std::mutex> lk(get_lock()); do_something(); }
|
如果多个线程在获取锁之后都需要一个共同的处理过程,那么可以将这个处理过程和std::unique_lock
封装为一个函数:
- 在函数内部先获取锁,然后在处理共有过程 -
将锁的所有权传递出去,以继续各个线程独有的处理流程
锁的粒度
基本原则是:尽量保证临界区的执行时间最短,以保证系统的运行效率。
std::unique_lock
在这种情况下可以比较灵活的使用,因为它既可以主动的获取和释放锁,也可以在其本身被释放时,自动的释放锁。
1 2 3 4 5 6 7 8 9
| void get_and_process_data() { std::unique_lock<std::mutex> my_lock(the_mutex); some_class data_to_process = get_next_data_chunk(); my_lock.unlock(); result_type result = process(data_to_process); my_lock.lock(); write_result(data_to_process,result); }
|
有的时候,为了降低锁保持的时间,可以先将共享的数据在栈上做一次拷贝,然后再进行接下来的操作,这样锁的粒度就只有对共享数据进行拷贝的那一小段。
但有些情况下需要格外注意:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| class Y { private: int some_detail; mutable std::mutex m; int get_detail() const { std::lock_guard<std::mutex> lock_a(m); return some_detail; } public: Y(int sd):some_detail(sd) { } friend bool operator==(Y const& lhs, Y const& rhs) { if(&lhs==&rhs) return true; int const lhs_value=lhs.get_detail(); int const rhs_value=rhs.get_detail(); return lhs_value==rhs_value; } };
|
如果两个对象的值再两个get_detail()
方法之间被改变了,则它们是不等的。
这种情况下需要再将两个操作打包为一个临界区。 >
但很多情况下,即使出现了这种情况也不会有什么影响。这是根据业务逻辑而定的。
>
比如有些业务逻辑已经完全考虑了这种情况,只要保证获取some_detail
对象是原子性的就行了。
保护临界区的其它方法
在共享数据初始化的时候给予保护
有些数据仅仅是在初始化的时候需要考虑并发问题,初始化完毕后,这些数据可能是以只读的形式访问,那么就不会存在并发问题。
假设有一个类的构造所需要花费的时间较长,一般情况下会在需要使用该类的时候才进行构造:
1 2 3 4 5 6 7 8
| std::shared_ptr<some_resource> resource_ptr; void foo() { if (!resource_ptr) { resource_ptr.reset(new some_resource); } resource_ptr->do_something(); }
|
如果简单粗暴的使用互斥量以避免构造时的并发问题的话,就会导致不管类是否已经构造,都需要获取一次锁从而降低了程序的吞吐量:
1 2 3 4 5 6 7 8 9 10
| std::shared_ptr<some_resource> resource_ptr; std::mutex resource_mutex; void foo() { std::unique_lock<std::mutex> lk(resource_mutex); if (!resource_ptr) { resource_ptr.reset(new some_resource); } lk.unlock(); resource_ptr->do_something(); }
|
将上面的方式优化一下,就是使用二次判断的方式:
1 2 3 4 5 6 7 8 9 10
| void undefined_behaviour_with_double_checked_locking() { if(!resource_ptr) { std::lock_guard<std::mutex> lk(resource_mutex); if (!resource_ptr) { resource_ptr.reset(new some_resource); } } resource_ptr->do_something(); }
|
但是这种方式可能会出现未定义行为,因为第一次检查resource_ptr
时并没有互斥。
那么就有可能在另一个线程初始化到一半的时候,当前线程就判定指针已经被初始化了,继而直接去执行do_something
,这就会造成未定义行为。
c++
标准库提供了std::once_flag
和std::call_once
来应对这种需求,它比使用互斥量在性能上有所提升:
1 2 3 4 5 6 7 8 9 10 11
| std::shared_ptr<some_resource> resource_ptr; std::once_flag resource_flag; void init_resource() { resource_ptr.reset(new some_resource); } void foo() { std::call_once(resource_flag,init_resource); resource_ptr->do_something(); }
|
除了初始化类,它们也可以用于类中,对于一个对象的部分操作只执行一次,比如下面这个类,无论是在操作读还是写,都需要且仅需要一次的提前连接:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| class X { private: connection_info connection_details; connection_handle connection; std::once_flag connection_init_flag; void open_connection() { connection = connection_manager.open(connection_details); } public: X(connection_info const& connection_details_): connection_details(connection_details_) {} void send_data(data_packet const& data) { std::call_once(connection_init_flag,&X::open_connection,this); connection.send_data(data); } data_packet receive_data() { std::call_once(connection_init_flag,&X::open_connection,this); return connection.receive_data(); } };
|
保护非频繁更新的数据结构
当一个数据结构更新频率很低,大部分时候都是读操作时。使用读写锁是最为合适的,因为这可以保证读的并发性,从而相比互斥锁有更高的吞吐量。
c++11 并没有提供读写锁,c++14
提供了std::shared_timed_mutex
,c++17 在 14
的基础上还增加了std::shared_mutex
。
std::shared_mutex
是std::shared_timed_mutex
的简易版本,少了一些操作。
它们可以和std::lock_guard
、std::unique_lock
联合使用,但更为合适的是在读线程中使用std::shared_lock
,以使得可以并发的进行读取。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| #include <map> #include <string> #include <mutex> #include <shared_mutex>
class dns_entry; class dns_cache { std::map<std::string,dns_entry> entries; mutable std::shared_mutex entry_mutex; public: dns_entry find_entry(std::string const& domain) const { std::shared_lock<std::shared_mutex> lk(entry_mutex); std::map<std::string,dns_entry>::const_iterator const it = entries.find(domain); return (it == entries.end()) ? dns_entry() : it->second; } void update_or_add_entry(std::string const& domain, dns_entry const& dns_details) { std::lock_guard<std::shared_mutex> lk(entry_mutex); entries[domain] = dns_details; } };
|
递归锁
同一个线程如果递归的获取std::mutex
则可能会导致死锁或其它未定义的行为,c++
标准库为此提供了std::recursive_mutex
以保证可以递归的获取锁。
需要注意的时,递归获取std::recursive_mutex
的次数和释放的次数需要等同,所以使用std::lock_guard<std::recursive_mutex>
和std::unique_lock<std::recursive_mutex>
是明智的做法。
但一般情况下都不建议这么做,如果代码中出现了递归锁,建议还是要重新思考代码逻辑是否可以优化。