1 #include <asio/ts/executor.hpp>
2 #include <condition_variable>
3 #include <future>
4 #include <memory>
5 #include <mutex>
6 #include <queue>
7 #include <thread>
8 #include <vector>
9
10 using asio::execution_context;
11 using asio::executor_binder;
12 using asio::get_associated_executor;
13 using asio::package;
14 using asio::post;
15 using asio::system_executor;
16 using asio::use_service;
17
18 // An executor that launches a new thread for each function submitted to it.
19 // This class satisfies the Executor requirements.
20 class thread_executor
21 {
22 private:
23 // Service to track all threads started through a thread_executor.
24 class thread_bag : public execution_context::service
25 {
26 public:
27 static execution_context::id id;
28
thread_bag(execution_context & ctx)29 explicit thread_bag(execution_context& ctx)
30 : execution_context::service(ctx)
31 {
32 }
33
add_thread(std::thread && t)34 void add_thread(std::thread&& t)
35 {
36 std::unique_lock<std::mutex> lock(mutex_);
37 threads_.push_back(std::move(t));
38 }
39
40 private:
shutdown()41 virtual void shutdown()
42 {
43 for (auto& t : threads_)
44 t.join();
45 }
46
47 std::mutex mutex_;
48 std::vector<std::thread> threads_;
49 };
50
51 public:
context()52 execution_context& context() noexcept
53 {
54 return system_executor().context();
55 }
56
on_work_started()57 void on_work_started() noexcept
58 {
59 // This executor doesn't count work.
60 }
61
on_work_finished()62 void on_work_finished() noexcept
63 {
64 // This executor doesn't count work.
65 }
66
67 template <class Func, class Alloc>
dispatch(Func && f,const Alloc & a)68 void dispatch(Func&& f, const Alloc& a)
69 {
70 post(std::forward<Func>(f), a);
71 }
72
73 template <class Func, class Alloc>
post(Func f,const Alloc &)74 void post(Func f, const Alloc&)
75 {
76 thread_bag& bag = use_service<thread_bag>(context());
77 bag.add_thread(std::thread(std::move(f)));
78 }
79
80 template <class Func, class Alloc>
defer(Func && f,const Alloc & a)81 void defer(Func&& f, const Alloc& a)
82 {
83 post(std::forward<Func>(f), a);
84 }
85
operator ==(const thread_executor &,const thread_executor &)86 friend bool operator==(const thread_executor&, const thread_executor&)
87 {
88 return true;
89 }
90
operator !=(const thread_executor &,const thread_executor &)91 friend bool operator!=(const thread_executor&, const thread_executor&)
92 {
93 return false;
94 }
95 };
96
97 execution_context::id thread_executor::thread_bag::id;
98
99 namespace asio {
100 template <> struct is_executor<thread_executor> : std::true_type {};
101 }
102
103 // Base class for all thread-safe queue implementations.
104 class queue_impl_base
105 {
106 template <class> friend class queue_front;
107 template <class> friend class queue_back;
108 std::mutex mutex_;
109 std::condition_variable condition_;
110 bool stop_ = false;
111 };
112
113 // Underlying implementation of a thread-safe queue, shared between the
114 // queue_front and queue_back classes.
115 template <class T>
116 class queue_impl : public queue_impl_base
117 {
118 template <class> friend class queue_front;
119 template <class> friend class queue_back;
120 std::queue<T> queue_;
121 };
122
123 // The front end of a queue between consecutive pipeline stages.
124 template <class T>
125 class queue_front
126 {
127 public:
128 typedef T value_type;
129
queue_front(std::shared_ptr<queue_impl<T>> impl)130 explicit queue_front(std::shared_ptr<queue_impl<T>> impl)
131 : impl_(impl)
132 {
133 }
134
push(T t)135 void push(T t)
136 {
137 std::unique_lock<std::mutex> lock(impl_->mutex_);
138 impl_->queue_.push(std::move(t));
139 impl_->condition_.notify_one();
140 }
141
stop()142 void stop()
143 {
144 std::unique_lock<std::mutex> lock(impl_->mutex_);
145 impl_->stop_ = true;
146 impl_->condition_.notify_one();
147 }
148
149 private:
150 std::shared_ptr<queue_impl<T>> impl_;
151 };
152
153 // The back end of a queue between consecutive pipeline stages.
154 template <class T>
155 class queue_back
156 {
157 public:
158 typedef T value_type;
159
queue_back(std::shared_ptr<queue_impl<T>> impl)160 explicit queue_back(std::shared_ptr<queue_impl<T>> impl)
161 : impl_(impl)
162 {
163 }
164
pop(T & t)165 bool pop(T& t)
166 {
167 std::unique_lock<std::mutex> lock(impl_->mutex_);
168 while (impl_->queue_.empty() && !impl_->stop_)
169 impl_->condition_.wait(lock);
170 if (!impl_->queue_.empty())
171 {
172 t = impl_->queue_.front();
173 impl_->queue_.pop();
174 return true;
175 }
176 return false;
177 }
178
179 private:
180 std::shared_ptr<queue_impl<T>> impl_;
181 };
182
183 // Launch the last stage in a pipeline.
184 template <class T, class F>
pipeline(queue_back<T> in,F f)185 std::future<void> pipeline(queue_back<T> in, F f)
186 {
187 // Get the function's associated executor, defaulting to thread_executor.
188 auto ex = get_associated_executor(f, thread_executor());
189
190 // Run the function, and as we're the last stage return a future so that the
191 // caller can wait for the pipeline to finish.
192 return post(ex, package([in, f = std::move(f)]() mutable { f(in); }));
193 }
194
195 // Launch an intermediate stage in a pipeline.
196 template <class T, class F, class... Tail>
pipeline(queue_back<T> in,F f,Tail...t)197 std::future<void> pipeline(queue_back<T> in, F f, Tail... t)
198 {
199 // Determine the output queue type.
200 typedef typename executor_binder<F, thread_executor>::second_argument_type::value_type output_value_type;
201
202 // Create the output queue and its implementation.
203 auto out_impl = std::make_shared<queue_impl<output_value_type>>();
204 queue_front<output_value_type> out(out_impl);
205 queue_back<output_value_type> next_in(out_impl);
206
207 // Get the function's associated executor, defaulting to thread_executor.
208 auto ex = get_associated_executor(f, thread_executor());
209
210 // Run the function.
211 post(ex, [in, out, f = std::move(f)]() mutable
212 {
213 f(in, out);
214 out.stop();
215 });
216
217 // Launch the rest of the pipeline.
218 return pipeline(next_in, std::move(t)...);
219 }
220
221 // Launch the first stage in a pipeline.
222 template <class F, class... Tail>
pipeline(F f,Tail...t)223 std::future<void> pipeline(F f, Tail... t)
224 {
225 // Determine the output queue type.
226 typedef typename executor_binder<F, thread_executor>::argument_type::value_type output_value_type;
227
228 // Create the output queue and its implementation.
229 auto out_impl = std::make_shared<queue_impl<output_value_type>>();
230 queue_front<output_value_type> out(out_impl);
231 queue_back<output_value_type> next_in(out_impl);
232
233 // Get the function's associated executor, defaulting to thread_executor.
234 auto ex = get_associated_executor(f, thread_executor());
235
236 // Run the function.
237 post(ex, [out, f = std::move(f)]() mutable
238 {
239 f(out);
240 out.stop();
241 });
242
243 // Launch the rest of the pipeline.
244 return pipeline(next_in, std::move(t)...);
245 }
246
247 //------------------------------------------------------------------------------
248
249 #include <asio/ts/thread_pool.hpp>
250 #include <iostream>
251 #include <string>
252
253 using asio::bind_executor;
254 using asio::thread_pool;
255
reader(queue_front<std::string> out)256 void reader(queue_front<std::string> out)
257 {
258 std::string line;
259 while (std::getline(std::cin, line))
260 out.push(line);
261 }
262
filter(queue_back<std::string> in,queue_front<std::string> out)263 void filter(queue_back<std::string> in, queue_front<std::string> out)
264 {
265 std::string line;
266 while (in.pop(line))
267 if (line.length() > 5)
268 out.push(line);
269 }
270
upper(queue_back<std::string> in,queue_front<std::string> out)271 void upper(queue_back<std::string> in, queue_front<std::string> out)
272 {
273 std::string line;
274 while (in.pop(line))
275 {
276 std::string new_line;
277 for (char c : line)
278 new_line.push_back(std::toupper(c));
279 out.push(new_line);
280 }
281 }
282
writer(queue_back<std::string> in)283 void writer(queue_back<std::string> in)
284 {
285 std::size_t count = 0;
286 std::string line;
287 while (in.pop(line))
288 std::cout << count++ << ": " << line << std::endl;
289 }
290
main()291 int main()
292 {
293 thread_pool pool;
294
295 auto f = pipeline(reader, filter, bind_executor(pool, upper), writer);
296 f.wait();
297 }
298