1 #ifndef BOOST_NETWORK_UTILS_THREAD_POOL_HPP_20101020
2 #define BOOST_NETWORK_UTILS_THREAD_POOL_HPP_20101020
3
4 // Copyright 2010 Dean Michael Berris.
5 // Distributed under the Boost Software License, Version 1.0.
6 // (See accompanying file LICENSE_1_0.txt or copy at
7 // http://www.boost.org/LICENSE_1_0.txt)
8
9 #include <cstddef>
10 #include <memory>
11 #include <functional>
12 #include <boost/asio/io_service.hpp>
13 #include <boost/network/tags.hpp>
14 #include <boost/scope_exit.hpp>
15 #include <boost/network/utils/thread_group.hpp>
16
17 namespace boost {
18 namespace network {
19 namespace utils {
20
21 typedef std::shared_ptr<boost::asio::io_service> io_service_ptr;
22 typedef std::shared_ptr<utils::thread_group> worker_threads_ptr;
23 typedef std::shared_ptr<boost::asio::io_service::work> sentinel_ptr;
24
25 template <class Tag>
26 struct basic_thread_pool {
27 basic_thread_pool(basic_thread_pool const &) = delete;
28 basic_thread_pool &operator=(basic_thread_pool) = delete;
29 basic_thread_pool(basic_thread_pool&&) noexcept = default;
30 basic_thread_pool &operator=(basic_thread_pool&&) = default;
31
basic_thread_poolboost::network::utils::basic_thread_pool32 basic_thread_pool() : basic_thread_pool(1) {}
33
basic_thread_poolboost::network::utils::basic_thread_pool34 explicit basic_thread_pool(std::size_t threads,
35 io_service_ptr io_service = io_service_ptr(),
36 worker_threads_ptr worker_threads = worker_threads_ptr())
37 : threads_(threads),
38 io_service_(std::move(io_service)),
39 worker_threads_(std::move(worker_threads)),
40 sentinel_() {
41 bool commit = false;
42 BOOST_SCOPE_EXIT_TPL(
43 (&commit)(&io_service_)(&worker_threads_)(&sentinel_)) {
44 if (!commit) {
45 sentinel_.reset();
46 io_service_.reset();
47 if (worker_threads_.get()) {
48 // worker_threads_->interrupt_all();
49 worker_threads_->join_all();
50 }
51 worker_threads_.reset();
52 }
53 }
54 BOOST_SCOPE_EXIT_END
55
56 if (!io_service_.get()) {
57 io_service_.reset(new boost::asio::io_service);
58 }
59
60 if (!worker_threads_.get()) {
61 worker_threads_.reset(new utils::thread_group);
62 }
63
64 if (!sentinel_.get()) {
65 sentinel_.reset(new boost::asio::io_service::work(*io_service_));
66 }
67
68 for (std::size_t counter = 0; counter < threads_; ++counter) {
69 worker_threads_->create_thread([=] () { io_service_->run(); });
70 }
71
72 commit = true;
73 }
74
thread_countboost::network::utils::basic_thread_pool75 std::size_t thread_count() const { return threads_; }
76
postboost::network::utils::basic_thread_pool77 void post(std::function<void()> f) { io_service_->post(f); }
78
~basic_thread_poolboost::network::utils::basic_thread_pool79 ~basic_thread_pool() throw() {
80 sentinel_.reset();
81 try {
82 worker_threads_->join_all();
83 }
84 catch (...) {
85 BOOST_ASSERT(false &&
86 "A handler was not supposed to throw, but one did.");
87 }
88 }
89
swapboost::network::utils::basic_thread_pool90 void swap(basic_thread_pool &other) {
91 using std::swap;
92 swap(other.threads_, threads_);
93 swap(other.io_service_, io_service_);
94 swap(other.worker_threads_, worker_threads_);
95 swap(other.sentinel_, sentinel_);
96 }
97
98 protected:
99 std::size_t threads_;
100 io_service_ptr io_service_;
101 worker_threads_ptr worker_threads_;
102 sentinel_ptr sentinel_;
103
104 };
105
106 template <class T>
swap(basic_thread_pool<T> & a,basic_thread_pool<T> & b)107 void swap(basic_thread_pool<T> &a, basic_thread_pool<T> &b) {
108 a.swap(b);
109 }
110
111 typedef basic_thread_pool<tags::default_> thread_pool;
112
113 } // namespace utils
114 } // namespace network
115 } // namespace boost
116
117 #endif /* BOOST_NETWORK_UTILS_THREAD_POOL_HPP_20101020 */
118