1 #include <asio/associated_executor.hpp>
2 #include <asio/bind_executor.hpp>
3 #include <asio/execution_context.hpp>
4 #include <asio/package.hpp>
5 #include <asio/post.hpp>
6 #include <asio/system_executor.hpp>
7 #include <condition_variable>
8 #include <future>
9 #include <memory>
10 #include <mutex>
11 #include <queue>
12 #include <thread>
13 #include <vector>
14
15 using asio::execution_context;
16 using asio::executor_binder;
17 using asio::get_associated_executor;
18 using asio::package;
19 using asio::post;
20 using asio::system_executor;
21 using asio::use_service;
22
23 // An executor that launches a new thread for each function submitted to it.
24 // This class satisfies the Executor requirements.
25 class thread_executor
26 {
27 private:
28 // Service to track all threads started through a thread_executor.
29 class thread_bag : public execution_context::service
30 {
31 public:
32 static execution_context::id id;
33
thread_bag(execution_context & ctx)34 explicit thread_bag(execution_context& ctx)
35 : execution_context::service(ctx)
36 {
37 }
38
add_thread(std::thread && t)39 void add_thread(std::thread&& t)
40 {
41 std::unique_lock<std::mutex> lock(mutex_);
42 threads_.push_back(std::move(t));
43 }
44
45 private:
shutdown()46 virtual void shutdown()
47 {
48 for (auto& t : threads_)
49 t.join();
50 }
51
52 std::mutex mutex_;
53 std::vector<std::thread> threads_;
54 };
55
56 public:
context()57 execution_context& context() noexcept
58 {
59 return system_executor().context();
60 }
61
on_work_started()62 void on_work_started() noexcept
63 {
64 // This executor doesn't count work.
65 }
66
on_work_finished()67 void on_work_finished() noexcept
68 {
69 // This executor doesn't count work.
70 }
71
72 template <class Func, class Alloc>
dispatch(Func && f,const Alloc & a)73 void dispatch(Func&& f, const Alloc& a)
74 {
75 post(std::forward<Func>(f), a);
76 }
77
78 template <class Func, class Alloc>
post(Func f,const Alloc &)79 void post(Func f, const Alloc&)
80 {
81 thread_bag& bag = use_service<thread_bag>(context());
82 bag.add_thread(std::thread(std::move(f)));
83 }
84
85 template <class Func, class Alloc>
defer(Func && f,const Alloc & a)86 void defer(Func&& f, const Alloc& a)
87 {
88 post(std::forward<Func>(f), a);
89 }
90
operator ==(const thread_executor &,const thread_executor &)91 friend bool operator==(const thread_executor&, const thread_executor&)
92 {
93 return true;
94 }
95
operator !=(const thread_executor &,const thread_executor &)96 friend bool operator!=(const thread_executor&, const thread_executor&)
97 {
98 return false;
99 }
100 };
101
102 execution_context::id thread_executor::thread_bag::id;
103
104 namespace asio {
105 template <> struct is_executor<thread_executor> : std::true_type {};
106 }
107
108 // Base class for all thread-safe queue implementations.
109 class queue_impl_base
110 {
111 template <class> friend class queue_front;
112 template <class> friend class queue_back;
113 std::mutex mutex_;
114 std::condition_variable condition_;
115 bool stop_ = false;
116 };
117
118 // Underlying implementation of a thread-safe queue, shared between the
119 // queue_front and queue_back classes.
120 template <class T>
121 class queue_impl : public queue_impl_base
122 {
123 template <class> friend class queue_front;
124 template <class> friend class queue_back;
125 std::queue<T> queue_;
126 };
127
128 // The front end of a queue between consecutive pipeline stages.
129 template <class T>
130 class queue_front
131 {
132 public:
133 typedef T value_type;
134
queue_front(std::shared_ptr<queue_impl<T>> impl)135 explicit queue_front(std::shared_ptr<queue_impl<T>> impl)
136 : impl_(impl)
137 {
138 }
139
push(T t)140 void push(T t)
141 {
142 std::unique_lock<std::mutex> lock(impl_->mutex_);
143 impl_->queue_.push(std::move(t));
144 impl_->condition_.notify_one();
145 }
146
stop()147 void stop()
148 {
149 std::unique_lock<std::mutex> lock(impl_->mutex_);
150 impl_->stop_ = true;
151 impl_->condition_.notify_one();
152 }
153
154 private:
155 std::shared_ptr<queue_impl<T>> impl_;
156 };
157
158 // The back end of a queue between consecutive pipeline stages.
159 template <class T>
160 class queue_back
161 {
162 public:
163 typedef T value_type;
164
queue_back(std::shared_ptr<queue_impl<T>> impl)165 explicit queue_back(std::shared_ptr<queue_impl<T>> impl)
166 : impl_(impl)
167 {
168 }
169
pop(T & t)170 bool pop(T& t)
171 {
172 std::unique_lock<std::mutex> lock(impl_->mutex_);
173 while (impl_->queue_.empty() && !impl_->stop_)
174 impl_->condition_.wait(lock);
175 if (!impl_->queue_.empty())
176 {
177 t = impl_->queue_.front();
178 impl_->queue_.pop();
179 return true;
180 }
181 return false;
182 }
183
184 private:
185 std::shared_ptr<queue_impl<T>> impl_;
186 };
187
188 // Launch the last stage in a pipeline.
189 template <class T, class F>
pipeline(queue_back<T> in,F f)190 std::future<void> pipeline(queue_back<T> in, F f)
191 {
192 // Get the function's associated executor, defaulting to thread_executor.
193 auto ex = get_associated_executor(f, thread_executor());
194
195 // Run the function, and as we're the last stage return a future so that the
196 // caller can wait for the pipeline to finish.
197 return post(ex, package([in, f]() mutable { f(in); }));
198 }
199
200 // Launch an intermediate stage in a pipeline.
201 template <class T, class F, class... Tail>
pipeline(queue_back<T> in,F f,Tail...t)202 std::future<void> pipeline(queue_back<T> in, F f, Tail... t)
203 {
204 // Determine the output queue type.
205 typedef typename executor_binder<F, thread_executor>::second_argument_type::value_type output_value_type;
206
207 // Create the output queue and its implementation.
208 auto out_impl = std::make_shared<queue_impl<output_value_type>>();
209 queue_front<output_value_type> out(out_impl);
210 queue_back<output_value_type> next_in(out_impl);
211
212 // Get the function's associated executor, defaulting to thread_executor.
213 auto ex = get_associated_executor(f, thread_executor());
214
215 // Run the function.
216 post(ex, [in, out, f]() mutable
217 {
218 f(in, out);
219 out.stop();
220 });
221
222 // Launch the rest of the pipeline.
223 return pipeline(next_in, std::move(t)...);
224 }
225
226 // Launch the first stage in a pipeline.
227 template <class F, class... Tail>
pipeline(F f,Tail...t)228 std::future<void> pipeline(F f, Tail... t)
229 {
230 // Determine the output queue type.
231 typedef typename executor_binder<F, thread_executor>::argument_type::value_type output_value_type;
232
233 // Create the output queue and its implementation.
234 auto out_impl = std::make_shared<queue_impl<output_value_type>>();
235 queue_front<output_value_type> out(out_impl);
236 queue_back<output_value_type> next_in(out_impl);
237
238 // Get the function's associated executor, defaulting to thread_executor.
239 auto ex = get_associated_executor(f, thread_executor());
240
241 // Run the function.
242 post(ex, [out, f]() mutable
243 {
244 f(out);
245 out.stop();
246 });
247
248 // Launch the rest of the pipeline.
249 return pipeline(next_in, std::move(t)...);
250 }
251
252 //------------------------------------------------------------------------------
253
254 #include <asio/thread_pool.hpp>
255 #include <iostream>
256 #include <string>
257
258 using asio::bind_executor;
259 using asio::thread_pool;
260
reader(queue_front<std::string> out)261 void reader(queue_front<std::string> out)
262 {
263 std::string line;
264 while (std::getline(std::cin, line))
265 out.push(line);
266 }
267
filter(queue_back<std::string> in,queue_front<std::string> out)268 void filter(queue_back<std::string> in, queue_front<std::string> out)
269 {
270 std::string line;
271 while (in.pop(line))
272 if (line.length() > 5)
273 out.push(line);
274 }
275
upper(queue_back<std::string> in,queue_front<std::string> out)276 void upper(queue_back<std::string> in, queue_front<std::string> out)
277 {
278 std::string line;
279 while (in.pop(line))
280 {
281 std::string new_line;
282 for (char c : line)
283 new_line.push_back(std::toupper(c));
284 out.push(new_line);
285 }
286 }
287
writer(queue_back<std::string> in)288 void writer(queue_back<std::string> in)
289 {
290 std::size_t count = 0;
291 std::string line;
292 while (in.pop(line))
293 std::cout << count++ << ": " << line << std::endl;
294 }
295
main()296 int main()
297 {
298 thread_pool pool;
299
300 auto f = pipeline(reader, filter, bind_executor(pool, upper), writer);
301 f.wait();
302 }
303