线程除了创建和销毁外,还需要控制其暂停、继续运行等状态。
线程池
线程池就相当于提前创建好的多个工作线程,从任务队列上获取任务并执行。这种方式可以避免系统中的线程过多,因为很多任务都是隔较长时间才会执行一次,如果为每个任务都单独分配一个线程则会消耗过多的系统资源。
简易的线程池
根据当前硬件核心数确定线程数,当有任务需要处理时,便将任务放入队列,工作线程依次从队列中取出任务执行。
1 2 3 4 5 6 7 8 9 10 11 12 13
   | class join_threads {     std::vector<std::thread>& threads; public:     explicit join_threads(std::vector<std::thread>& threads_):         threads(threads_)     {}     ~join_threads() {         for (unsigned long i = 0; i < threads.size(); ++i) {             if (threads[i].joinable())                 threads[i].join();         }     } };
  | 
 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
   | class thread_pool {     std::atomic_bool done;          threadsafe_queue<std::function<void()> > work_queue;          std::vector<std::thread> threads;                  join_threads joiner;                      void worker_thread() {         while (!done) {             std::function<void()> task;                          if (work_queue.try_pop(task)) {                 task();                 } else { 				                 std::this_thread::yield();                 }         }     } public:     thread_pool():         done(false),joiner(threads) {                  unsigned const thread_count = std::thread::hardware_concurrency();          try {             for (unsigned i = 0; i< thread_count; ++i) {                              threads.push_back(                     std::thread(&thread_pool::worker_thread, this));                }         } catch(...) {             done = true;                 throw;         }     }          ~thread_pool() {         done = true;        }          template<typename FunctionType>     void submit(FunctionType f) {         work_queue.push(std::function<void()>(f));         } };
  | 
 
当该线程池对象被销毁时,joiner在析构中会等待所有的线程完成并回收它们的资源。
等待任务处理的结果
简易方式的线程池适合处理无返回的简单任务,但如果需要获取到任务处理后的结果,则还需要调整。
最符合直觉的便是使用std::packaged_task将要执行函数以及future绑定,用户通过获取到的future来获取执行的结果。
但由于std::packaged_task是不能拷贝,只能移动的,所以需要构建一个支持移动语义的可执行类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
   | class function_wrapper {     struct impl_base {         virtual void call() = 0;         virtual ~impl_base() {}     };     std::unique_ptr<impl_base> impl;     template<typename F>     struct impl_type: impl_base {         F f;         impl_type(F&& f_): f(std::move(f_)) {}                  void call() { f(); }     }; public:     template<typename F>     function_wrapper(F&& f):         impl(new impl_type<F>(std::move(f)))     {}     void operator()() { impl->call(); }     function_wrapper() = default;     function_wrapper(function_wrapper&& other):         impl(std::move(other.impl))     {}         function_wrapper& operator = (function_wrapper&& other) {         impl = std::move(other.impl);         return *this;     }          function_wrapper(const function_wrapper&) = delete;     function_wrapper(function_wrapper&) = delete;     function_wrapper& operator=(const function_wrapper&) = delete; };
  | 
 
然后优化submit函数,可以获取到future:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
   | class thread_pool {     std::atomic_bool done;     thread_safe_queue<function_wrapper> work_queue;           std::vector<std::thread> threads;                  join_threads joiner;             void worker_thread() {         while(!done) {             function_wrapper task;                                 if (work_queue.try_pop(task)) {                 task();             } else {                 std::this_thread::yield();             }         }     } public:     thread_pool():         done(false),joiner(threads) {                  unsigned const thread_count = std::thread::hardware_concurrency();          try {             for (unsigned i = 0; i< thread_count; ++i) {                              threads.push_back(                     std::thread(&thread_pool::worker_thread, this));                }         } catch(...) {             done = true;                 throw;         }     }          ~thread_pool() {         done = true;        }     template<typename FunctionType>     std::future<typename std::result_of<FunctionType()>::type>            submit(FunctionType f) {                  typedef typename std::result_of<FunctionType()>::type             result_type;                  std::packaged_task<result_type()> task(std::move(f));             std::future<result_type> res(task.get_future());              work_queue.push(std::move(task));                           return res;        } };
  | 
 
有了这种线程池,来处理并行计算就更加简单了:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
   | template<typename Iterator,typename T> T parallel_accumulate(Iterator first,Iterator last,T init) {     unsigned long const length = std::distance(first,last);     if(!length)         return init;     unsigned long const block_size = 25;     unsigned long const num_blocks = (length + block_size - 1) / block_size;        std::vector<std::future<T> > futures(num_blocks - 1);     thread_pool pool;     Iterator block_start = first;     for (unsigned long i = 0;i < (num_blocks - 1); ++i) {         Iterator block_end = block_start;         std::advance(block_end, block_size);                  futures[i] = pool.submit([=]{             accumulate_block<Iterator,T>()(block_start,block_end);         });            block_start = block_end;     }     T last_result = accumulate_block<Iterator,T>()(block_start,last);     T result = init;          for (unsigned long i = 0;i < (num_blocks - 1); ++i) {         result += futures[i].get();     }     result += last_result;     return result; }
   | 
 
这种线程池适合用于主线程在等待工作线程的结果。
线程控制
下面是一个简易的控制线程暂停、继续、停止的示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
   | #include <atomic> #include <complex> #include <utility>
 
  class ControlThread { public:     ControlThread() {
      }
      ~ControlThread() {
      }
      ControlThread(const ControlThread& rhs) {
      }
      ControlThread& operator = (const ControlThread& rhs) {         return *this;     }
      void Start(void) {         if (!running_.load()) {             running_.store(true);             std::thread t(&ControlThread::ExecThread, this);
              t.detach();         }     }
      void Pause(void) {         pause_.store(true);     }
      void Continue(void) {         pause_.store(false);     }          void Stop(void) {         running_.store(false);     }
      void ExecThread(void) {         while (running_.load()) {             if (pause_.load()) {                 std::this_thread::sleep_for(std::chrono::milliseconds(50));                 continue;             }
                       }     }    
  private:     std::atomic_bool running_;     std::atomic_bool pause_; };
   | 
 
需要考虑的是:如果在执行线程中有阻塞的操作,该如何控制该线程?
一般的思路就是,创造条件来主动的唤醒该线程继续执行到控制点。