Besides creating and destroying threads, we also need to control their state: pausing them, resuming them, and so on.
Thread Pools
A thread pool is a set of worker threads created in advance that fetch tasks from a task queue and execute them. This avoids flooding the system with threads: many tasks run only occasionally, and dedicating a separate thread to each of them would consume too many system resources.
A Simple Thread Pool
The number of worker threads is chosen from the number of hardware cores. When a task needs processing it is pushed onto the queue, and the workers take tasks off the queue in turn and execute them.
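Note that `std::thread::hardware_concurrency()` is only a hint and may return 0 when the value is not computable, so a fallback is common. A minimal sketch (the helper name `pick_thread_count` and the fallback value 2 are illustrative choices, not part of the original code):

```cpp
#include <thread>

// Pick a worker count from the hardware, falling back to 2 when the
// runtime cannot determine it (hardware_concurrency() may return 0).
unsigned pick_thread_count() {
    unsigned const hw = std::thread::hardware_concurrency();
    return hw != 0 ? hw : 2;  // fallback value is an arbitrary choice
}
```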
```cpp
class join_threads {
    std::vector<std::thread>& threads;
public:
    explicit join_threads(std::vector<std::thread>& threads_):
        threads(threads_)
    {}
    ~join_threads() {
        for (unsigned long i = 0; i < threads.size(); ++i) {
            if (threads[i].joinable())
                threads[i].join();
        }
    }
};
```
```cpp
class thread_pool {
    std::atomic_bool done;
    threadsafe_queue<std::function<void()>> work_queue;
    std::vector<std::thread> threads;
    join_threads joiner;  // declared last: destroyed first, joining the workers

    void worker_thread() {
        while (!done) {
            std::function<void()> task;
            if (work_queue.try_pop(task)) {
                task();
            } else {
                std::this_thread::yield();  // nothing to do: give up the slice
            }
        }
    }

public:
    thread_pool(): done(false), joiner(threads) {
        unsigned const thread_count = std::thread::hardware_concurrency();
        try {
            for (unsigned i = 0; i < thread_count; ++i) {
                threads.push_back(
                    std::thread(&thread_pool::worker_thread, this));
            }
        } catch (...) {
            done = true;  // stop the threads already started, then rethrow
            throw;
        }
    }

    ~thread_pool() {
        done = true;
    }

    template<typename FunctionType>
    void submit(FunctionType f) {
        work_queue.push(std::function<void()>(f));
    }
};
```
When the pool object is destroyed, joiner's destructor waits for all of the threads to finish and reclaims their resources.
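The RAII guarantee the joiner provides can be demonstrated in isolation. A minimal sketch (the `run_guarded_workers` helper and the worker count of 4 are illustrative, not from the original): the guard's destructor joins every thread when the scope ends, so all work is visible afterwards.

```cpp
#include <atomic>
#include <thread>
#include <vector>

// Same RAII guard as above: joins every joinable thread on destruction.
class join_threads {
    std::vector<std::thread>& threads;
public:
    explicit join_threads(std::vector<std::thread>& threads_):
        threads(threads_)
    {}
    ~join_threads() {
        for (auto& t : threads)
            if (t.joinable()) t.join();
    }
};

std::atomic<int> counter{0};

// Launch a few workers inside a scope; when the scope ends the guard's
// destructor joins them, so counter is fully updated on return.
void run_guarded_workers() {
    std::vector<std::thread> threads;
    join_threads guard(threads);
    for (int i = 0; i < 4; ++i)
        threads.push_back(std::thread([] { counter.fetch_add(1); }));
}   // guard joins all four threads here
```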
Waiting for Task Results
The simple pool is fine for fire-and-forget tasks with no return value, but it needs adjustment if the caller wants the result of a task. The most intuitive approach is std::packaged_task, which ties the function to be executed to a future; the caller then reads the result through that future. However, std::packaged_task is move-only, not copyable, so we first need a callable wrapper class that supports move semantics:
```cpp
class function_wrapper {
    // Type erasure: impl_base exposes call(); impl_type<F> stores the callable.
    struct impl_base {
        virtual void call() = 0;
        virtual ~impl_base() {}
    };
    std::unique_ptr<impl_base> impl;

    template<typename F>
    struct impl_type: impl_base {
        F f;
        impl_type(F&& f_): f(std::move(f_)) {}
        void call() { f(); }
    };

public:
    template<typename F>
    function_wrapper(F&& f):
        impl(new impl_type<F>(std::move(f)))
    {}

    void operator()() { impl->call(); }

    function_wrapper() = default;

    function_wrapper(function_wrapper&& other):
        impl(std::move(other.impl))
    {}
    function_wrapper& operator=(function_wrapper&& other) {
        impl = std::move(other.impl);
        return *this;
    }

    // Movable but not copyable, matching std::packaged_task.
    function_wrapper(const function_wrapper&) = delete;
    function_wrapper(function_wrapper&) = delete;
    function_wrapper& operator=(const function_wrapper&) = delete;
};
```
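Why the wrapper is necessary can be shown directly: a std::packaged_task cannot be stored in std::function (which requires copyable callables), but it moves cleanly into the wrapper. A self-contained sketch (the `run_wrapped_task` helper and the value 42 are illustrative):

```cpp
#include <future>
#include <memory>
#include <utility>

// Type-erasing, move-only wrapper, reproduced so this example stands alone.
class function_wrapper {
    struct impl_base {
        virtual void call() = 0;
        virtual ~impl_base() {}
    };
    std::unique_ptr<impl_base> impl;
    template<typename F>
    struct impl_type: impl_base {
        F f;
        impl_type(F&& f_): f(std::move(f_)) {}
        void call() { f(); }
    };
public:
    template<typename F>
    function_wrapper(F&& f): impl(new impl_type<F>(std::move(f))) {}
    void operator()() { impl->call(); }
    function_wrapper() = default;
    function_wrapper(function_wrapper&& other): impl(std::move(other.impl)) {}
    function_wrapper& operator=(function_wrapper&& other) {
        impl = std::move(other.impl);
        return *this;
    }
    function_wrapper(const function_wrapper&) = delete;
    function_wrapper& operator=(const function_wrapper&) = delete;
};

// A packaged_task is move-only, so std::function<void()> cannot hold it,
// but function_wrapper can: move the task in, run it later, read the future.
int run_wrapped_task() {
    std::packaged_task<int()> task([] { return 42; });
    std::future<int> f = task.get_future();
    function_wrapper wrapped(std::move(task));  // type-erase the move-only task
    wrapped();                                  // execute it (e.g. on a worker)
    return f.get();
}
```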
submit is then reworked so that it returns a future:
```cpp
class thread_pool {
    std::atomic_bool done;
    threadsafe_queue<function_wrapper> work_queue;  // holds move-only tasks
    std::vector<std::thread> threads;
    join_threads joiner;

    void worker_thread() {
        while (!done) {
            function_wrapper task;
            if (work_queue.try_pop(task)) {
                task();
            } else {
                std::this_thread::yield();
            }
        }
    }

public:
    thread_pool(): done(false), joiner(threads) {
        unsigned const thread_count = std::thread::hardware_concurrency();
        try {
            for (unsigned i = 0; i < thread_count; ++i) {
                threads.push_back(
                    std::thread(&thread_pool::worker_thread, this));
            }
        } catch (...) {
            done = true;
            throw;
        }
    }

    ~thread_pool() {
        done = true;
    }

    // Wrap the callable in a packaged_task, hand its future back to the
    // caller, and push the (move-only) task onto the queue.
    template<typename FunctionType>
    std::future<typename std::result_of<FunctionType()>::type>
    submit(FunctionType f) {
        typedef typename std::result_of<FunctionType()>::type result_type;
        std::packaged_task<result_type()> task(std::move(f));
        std::future<result_type> res(task.get_future());
        work_queue.push(std::move(task));
        return res;
    }
};
```
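The core of this future-returning submit can be exercised without the queue machinery. A minimal sketch (the name `submit_detached` is illustrative; a plain detached std::thread stands in for the pool's work queue, which the real version pushes the task onto instead):

```cpp
#include <future>
#include <thread>
#include <type_traits>
#include <utility>

// Essence of the future-returning submit(): wrap the callable in a
// packaged_task, grab its future BEFORE handing the task off, then let a
// worker run it. Here the "worker" is a freshly spawned detached thread.
template<typename FunctionType>
std::future<typename std::result_of<FunctionType()>::type>
submit_detached(FunctionType f) {
    typedef typename std::result_of<FunctionType()>::type result_type;
    std::packaged_task<result_type()> task(std::move(f));
    std::future<result_type> res(task.get_future());
    std::thread(std::move(task)).detach();  // worker executes the task
    return res;                              // caller waits on the future
}
```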
With this pool in hand, parallel computation becomes much simpler:
```cpp
template<typename Iterator, typename T>
T parallel_accumulate(Iterator first, Iterator last, T init) {
    unsigned long const length = std::distance(first, last);
    if (!length)
        return init;

    unsigned long const block_size = 25;
    unsigned long const num_blocks = (length + block_size - 1) / block_size;

    std::vector<std::future<T>> futures(num_blocks - 1);
    thread_pool pool;

    Iterator block_start = first;
    for (unsigned long i = 0; i < (num_blocks - 1); ++i) {
        Iterator block_end = block_start;
        std::advance(block_end, block_size);
        // The lambda must return the block's sum so the future carries it.
        futures[i] = pool.submit([=] {
            return accumulate_block<Iterator, T>()(block_start, block_end);
        });
        block_start = block_end;
    }
    // Handle the final (possibly short) block on this thread.
    T last_result = accumulate_block<Iterator, T>()(block_start, last);

    T result = init;
    for (unsigned long i = 0; i < (num_blocks - 1); ++i) {
        result += futures[i].get();
    }
    result += last_result;
    return result;
}
```
This kind of pool suits the case where the main thread waits on the workers' results.
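The same block partitioning can be sketched with std::async so it runs without the pool or accumulate_block (the name `parallel_sum` and the use of std::async are substitutions for this sketch, not part of the original design):

```cpp
#include <algorithm>
#include <future>
#include <numeric>
#include <vector>

// Same partitioning as parallel_accumulate: split the range into blocks of
// 25, sum each block on its own task, then add up the partial sums.
int parallel_sum(const std::vector<int>& v) {
    std::size_t const block_size = 25;
    std::size_t const length = v.size();
    if (length == 0) return 0;
    std::size_t const num_blocks = (length + block_size - 1) / block_size;

    std::vector<std::future<int>> futures;
    for (std::size_t i = 0; i < num_blocks; ++i) {
        std::size_t const begin = i * block_size;
        std::size_t const end = std::min(begin + block_size, length);
        // Each block's sum is computed asynchronously and carried by a future.
        futures.push_back(std::async(std::launch::async, [&v, begin, end] {
            return std::accumulate(v.begin() + begin, v.begin() + end, 0);
        }));
    }
    int result = 0;
    for (auto& f : futures) result += f.get();  // collect the partial sums
    return result;
}
```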
Thread Control
Below is a simple example of pausing, resuming, and stopping a thread:
```cpp
#include <atomic>
#include <chrono>
#include <thread>

class ControlThread {
public:
    ControlThread() {}
    ~ControlThread() {}
    // Atomics are not copyable, so copying a ControlThread deliberately
    // leaves the new object's flags at their defaults.
    ControlThread(const ControlThread& rhs) {}
    ControlThread& operator=(const ControlThread& rhs) { return *this; }

    void Start(void) {
        if (!running_.load()) {
            running_.store(true);
            std::thread t(&ControlThread::ExecThread, this);
            t.detach();
        }
    }
    void Pause(void) { pause_.store(true); }
    void Continue(void) { pause_.store(false); }
    void Stop(void) { running_.store(false); }

    void ExecThread(void) {
        while (running_.load()) {
            if (pause_.load()) {
                // Paused: sleep briefly, then re-check the flags.
                std::this_thread::sleep_for(std::chrono::milliseconds(50));
                continue;
            }
            // ... do one unit of work here ...
        }
    }

private:
    std::atomic_bool running_{false};  // must be initialized before Start()
    std::atomic_bool pause_{false};
};
```
One remaining question: if the worker thread performs a blocking operation, how can it still be controlled?
The usual approach is to create the condition the thread is waiting for, actively waking it so it runs on to a control point.
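The idea above can be sketched with a condition variable (the class name `BlockingWorker` and its methods are illustrative): a thread blocked in `wait` cannot observe a plain stop flag, so the controller sets the flag under the lock and notifies, driving the thread forward to its control point.

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>

class BlockingWorker {
public:
    void Run() {
        std::unique_lock<std::mutex> lk(m_);
        // Blocks until woken AND the predicate holds (handles spurious wakeups).
        cv_.wait(lk, [this] { return stop_; });
        stopped_ = true;  // reached the control point
    }
    void Stop() {
        {
            std::lock_guard<std::mutex> lk(m_);
            stop_ = true;        // create the condition the thread waits on...
        }
        cv_.notify_one();        // ...and actively wake it
    }
    bool stopped() const { return stopped_; }

private:
    std::mutex m_;
    std::condition_variable cv_;
    bool stop_ = false;
    bool stopped_ = false;
};
```

If `Stop` runs before the worker reaches `wait`, the predicate is already true and `wait` returns immediately, so the wake-up is never lost.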