基于生产者-消费者模型的线程池。
整体设计
线程池由两部分组成 存放线程的容器(ThreadPool) 和 一个任务队列(Tasks queue)。
容器中的线程都做相同的事情:
这些线程会一直重复这两个动作。
例如, 图中 id 为 1,4 的线程已经获得了一个任务,接下来它们要执行这个任务, 然后继续从任务队列中获取任务…
id 为 2, 3, 5 的线程正在从任务队列获得一个任务…
- 会有多个线程访问任务队列,因此任务队列必须要支持并发。
- 每个线程会一直运行直到线程池关闭,从任务队列获取一个任务,执行,不断重复这个过程,如果不能获取一个任务(任务队列中可能没有任务了),就要阻塞在获取任务这一步,直到获取了任务或线程池关闭。
支持并发的队列
1 2 3 4 5 6 7 8 9 10
| template <typename T> class SafeQueue { private: std::queue<T> que_; std::shared_mutex mutex_;
public: void push(T &t); T pop(); size_t size(); };
|
使用读写锁对 std::queue
做了封装。
push
1 2 3 4 5
| template<typename T> void SafeQueue<T>::push(T &t) { std::unique_lock lock(mutex_); que_.emplace(t); }
|
pop
1 2 3 4 5 6 7
| template<typename T> T SafeQueue<T>::pop() { std::unique_lock lock(mutex_); T ret = que_.front(); que_.pop(); return ret; }
|
size
1 2 3 4 5
| template<typename T> size_t SafeQueue<T>::size() { std::shared_lock lock(mutex_); return que_.size(); }
|
线程容器
线程从任务队列获取任务后执行的过程的模型是生产者-消费者。
因此需要一个条件变量 std::condition_variable
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| class ThreadPool { private: class ThreadWorker { private: ThreadPool *pool_; size_t thread_id_;
public: ThreadWorker(ThreadPool *pool, size_t thread_id_) : pool_(pool), thread_id_(thread_id_) {} void operator()(); };
size_t threads_; bool shutdown_; SafeQueue<std::function<void()>> tasks_; std::vector<std::thread> works_; std::condition_variable condition_; std::mutex mutex_;
public: ThreadPool(size_t threads = 2) { init(threads); }; ThreadPool(const ThreadPool &other) = delete; ThreadPool(ThreadPool &&other) = delete; ThreadPool &operator=(const ThreadPool &other) = delete; ThreadPool &operator=(ThreadPool &&other) = delete; ~ThreadPool() { shutdown(); };
void init(size_t threads); void shutdown(); template <typename F, typename... Arg> auto submit(F &&f, Arg &&...args) -> std::future<decltype(f(args...))>; };
|
初始化
在构造函数中调用了 init
来提升易用性。
初始化时就把需要使用到的线程数量都启动起来。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| void ThreadPool::init(size_t threads) { threads_ = threads; shutdown_ = false; while (tasks_.size() != 0) { tasks_.pop(); } { std::vector<std::thread> tmp_works{}; works_.swap(tmp_works); } works_.reserve(threads_); works_.resize(threads_); for (size_t i = 0; i < threads_; ++i) { works_[i] = std::thread([this] { std::function<void()> func; for (;;) { { std::unique_lock lock(mutex_); condition_.wait(lock, [this] { return shutdown_ || tasks_.size() > 0; }); if (shutdown_ && tasks_.size() == 0) { return; } func = std::move(tasks_.pop()); } func(); } }); } }
|
关闭线程池
关闭线程池时,要等待线程做完事(队列中的任务被全部完成, 在 ThreadWorker 中作判断)。
在析构函数中调用了 shutdown()
, 利用 RAII 来实现自动关闭。
1 2 3 4 5 6 7 8 9 10 11 12 13
| void ThreadPool::shutdown() { if (shutdown_) { return; } shutdown_ = true; condition_.notify_all(); for (size_t i = 0; i < threads_; ++i) { if (works_[i].joinable()) { works_[i].join(); } } threads_ = 0; }
|
提交任务
任务是一个函数,可以是任意的返回类型,任意的参数。
使用可变参数模板来实现,支持函数,仿函数,Lambda, 以及任意多的参数。
有的任务在被执行过后会有返回值,需要使用 std::future<>
来获得这个值, 任务的返回值类型是多种多样的,所以使用 decltype()
来推导返回值的类型。
任务的多样性,没有办法直接确定一个类型来表示它们,对任务进行封装,使所有的任务都是 void()
类型。
任务进入队列后,要唤醒一个线程(告诉在阻塞的线程,队列中有了新的任务)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| template <typename F, typename... Arg> auto ThreadPool::submit(F &&f, Arg &&...args) -> std::future<decltype(f(args...))> { std::function<decltype(f(args...))()> func = std::bind(std::forward<F>(f), std::forward<Arg>(args)...);
auto task_ptr = std::make_shared<std::packaged_task<decltype(f(args...))()>>(func);
std::function<void()> wrap_task = [task_ptr] { (*task_ptr)(); };
tasks_.push(wrap_task);
condition_.notify_one();
return task_ptr->get_future(); }
|
参考
C++ 并发编程 (从C++11到C++17)
C++的右值引用、移动和值类别系统,你所需要的一切
基于C++11实现的线程池
99行线程池