#include <asio/associated_executor.hpp>
#include <asio/bind_executor.hpp>
#include <asio/execution_context.hpp>
#include <asio/package.hpp>
#include <asio/post.hpp>
#include <asio/system_executor.hpp>
#include <cctype>
#include <condition_variable>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>
14 
15 using asio::execution_context;
16 using asio::executor_binder;
17 using asio::get_associated_executor;
18 using asio::package;
19 using asio::post;
20 using asio::system_executor;
21 using asio::use_service;
22 
23 // An executor that launches a new thread for each function submitted to it.
24 // This class satisfies the Executor requirements.
25 class thread_executor
26 {
27 private:
28   // Service to track all threads started through a thread_executor.
29   class thread_bag : public execution_context::service
30   {
31   public:
32     static execution_context::id id;
33 
thread_bag(execution_context & ctx)34     explicit thread_bag(execution_context& ctx)
35       : execution_context::service(ctx)
36     {
37     }
38 
add_thread(std::thread && t)39     void add_thread(std::thread&& t)
40     {
41       std::unique_lock<std::mutex> lock(mutex_);
42       threads_.push_back(std::move(t));
43     }
44 
45   private:
shutdown()46     virtual void shutdown()
47     {
48       for (auto& t : threads_)
49         t.join();
50     }
51 
52     std::mutex mutex_;
53     std::vector<std::thread> threads_;
54   };
55 
56 public:
context()57   execution_context& context() noexcept
58   {
59     return system_executor().context();
60   }
61 
on_work_started()62   void on_work_started() noexcept
63   {
64     // This executor doesn't count work.
65   }
66 
on_work_finished()67   void on_work_finished() noexcept
68   {
69     // This executor doesn't count work.
70   }
71 
72   template <class Func, class Alloc>
dispatch(Func && f,const Alloc & a)73   void dispatch(Func&& f, const Alloc& a)
74   {
75     post(std::forward<Func>(f), a);
76   }
77 
78   template <class Func, class Alloc>
post(Func f,const Alloc &)79   void post(Func f, const Alloc&)
80   {
81     thread_bag& bag = use_service<thread_bag>(context());
82     bag.add_thread(std::thread(std::move(f)));
83   }
84 
85   template <class Func, class Alloc>
defer(Func && f,const Alloc & a)86   void defer(Func&& f, const Alloc& a)
87   {
88     post(std::forward<Func>(f), a);
89   }
90 
operator ==(const thread_executor &,const thread_executor &)91   friend bool operator==(const thread_executor&, const thread_executor&)
92   {
93     return true;
94   }
95 
operator !=(const thread_executor &,const thread_executor &)96   friend bool operator!=(const thread_executor&, const thread_executor&)
97   {
98     return false;
99   }
100 };
101 
102 execution_context::id thread_executor::thread_bag::id;
103 
104 namespace asio {
105   template <> struct is_executor<thread_executor> : std::true_type {};
106 }
107 
// State shared by every thread-safe queue implementation, independent of the
// element type: the mutex, the condition variable and the stop flag.
class queue_impl_base
{
private:
  // Only the two queue endpoints are allowed to touch the members below.
  template <class> friend class queue_back;
  template <class> friend class queue_front;

  std::mutex mutex_;
  std::condition_variable condition_;
  bool stop_{false};
};
117 
118 // Underlying implementation of a thread-safe queue, shared between the
119 // queue_front and queue_back classes.
120 template <class T>
121 class queue_impl : public queue_impl_base
122 {
123   template <class> friend class queue_front;
124   template <class> friend class queue_back;
125   std::queue<T> queue_;
126 };
127 
128 // The front end of a queue between consecutive pipeline stages.
129 template <class T>
130 class queue_front
131 {
132 public:
133   typedef T value_type;
134 
queue_front(std::shared_ptr<queue_impl<T>> impl)135   explicit queue_front(std::shared_ptr<queue_impl<T>> impl)
136     : impl_(impl)
137   {
138   }
139 
push(T t)140   void push(T t)
141   {
142     std::unique_lock<std::mutex> lock(impl_->mutex_);
143     impl_->queue_.push(std::move(t));
144     impl_->condition_.notify_one();
145   }
146 
stop()147   void stop()
148   {
149     std::unique_lock<std::mutex> lock(impl_->mutex_);
150     impl_->stop_ = true;
151     impl_->condition_.notify_one();
152   }
153 
154 private:
155   std::shared_ptr<queue_impl<T>> impl_;
156 };
157 
158 // The back end of a queue between consecutive pipeline stages.
159 template <class T>
160 class queue_back
161 {
162 public:
163   typedef T value_type;
164 
queue_back(std::shared_ptr<queue_impl<T>> impl)165   explicit queue_back(std::shared_ptr<queue_impl<T>> impl)
166     : impl_(impl)
167   {
168   }
169 
pop(T & t)170   bool pop(T& t)
171   {
172     std::unique_lock<std::mutex> lock(impl_->mutex_);
173     while (impl_->queue_.empty() && !impl_->stop_)
174       impl_->condition_.wait(lock);
175     if (!impl_->queue_.empty())
176     {
177       t = impl_->queue_.front();
178       impl_->queue_.pop();
179       return true;
180     }
181     return false;
182   }
183 
184 private:
185   std::shared_ptr<queue_impl<T>> impl_;
186 };
187 
188 // Launch the last stage in a pipeline.
189 template <class T, class F>
pipeline(queue_back<T> in,F f)190 std::future<void> pipeline(queue_back<T> in, F f)
191 {
192   // Get the function's associated executor, defaulting to thread_executor.
193   auto ex = get_associated_executor(f, thread_executor());
194 
195   // Run the function, and as we're the last stage return a future so that the
196   // caller can wait for the pipeline to finish.
197   return post(ex, package([in, f]() mutable { f(in); }));
198 }
199 
200 // Launch an intermediate stage in a pipeline.
201 template <class T, class F, class... Tail>
pipeline(queue_back<T> in,F f,Tail...t)202 std::future<void> pipeline(queue_back<T> in, F f, Tail... t)
203 {
204   // Determine the output queue type.
205   typedef typename executor_binder<F, thread_executor>::second_argument_type::value_type output_value_type;
206 
207   // Create the output queue and its implementation.
208   auto out_impl = std::make_shared<queue_impl<output_value_type>>();
209   queue_front<output_value_type> out(out_impl);
210   queue_back<output_value_type> next_in(out_impl);
211 
212   // Get the function's associated executor, defaulting to thread_executor.
213   auto ex = get_associated_executor(f, thread_executor());
214 
215   // Run the function.
216   post(ex, [in, out, f]() mutable
217       {
218         f(in, out);
219         out.stop();
220       });
221 
222   // Launch the rest of the pipeline.
223   return pipeline(next_in, std::move(t)...);
224 }
225 
226 // Launch the first stage in a pipeline.
227 template <class F, class... Tail>
pipeline(F f,Tail...t)228 std::future<void> pipeline(F f, Tail... t)
229 {
230   // Determine the output queue type.
231   typedef typename executor_binder<F, thread_executor>::argument_type::value_type output_value_type;
232 
233   // Create the output queue and its implementation.
234   auto out_impl = std::make_shared<queue_impl<output_value_type>>();
235   queue_front<output_value_type> out(out_impl);
236   queue_back<output_value_type> next_in(out_impl);
237 
238   // Get the function's associated executor, defaulting to thread_executor.
239   auto ex = get_associated_executor(f, thread_executor());
240 
241   // Run the function.
242   post(ex, [out, f]() mutable
243       {
244         f(out);
245         out.stop();
246       });
247 
248   // Launch the rest of the pipeline.
249   return pipeline(next_in, std::move(t)...);
250 }
251 
252 //------------------------------------------------------------------------------
253 
254 #include <asio/thread_pool.hpp>
255 #include <iostream>
256 #include <string>
257 
258 using asio::bind_executor;
259 using asio::thread_pool;
260 
reader(queue_front<std::string> out)261 void reader(queue_front<std::string> out)
262 {
263   std::string line;
264   while (std::getline(std::cin, line))
265     out.push(line);
266 }
267 
filter(queue_back<std::string> in,queue_front<std::string> out)268 void filter(queue_back<std::string> in, queue_front<std::string> out)
269 {
270   std::string line;
271   while (in.pop(line))
272     if (line.length() > 5)
273       out.push(line);
274 }
275 
upper(queue_back<std::string> in,queue_front<std::string> out)276 void upper(queue_back<std::string> in, queue_front<std::string> out)
277 {
278   std::string line;
279   while (in.pop(line))
280   {
281     std::string new_line;
282     for (char c : line)
283       new_line.push_back(std::toupper(c));
284     out.push(new_line);
285   }
286 }
287 
writer(queue_back<std::string> in)288 void writer(queue_back<std::string> in)
289 {
290   std::size_t count = 0;
291   std::string line;
292   while (in.pop(line))
293     std::cout << count++ << ": " << line << std::endl;
294 }
295 
main()296 int main()
297 {
298   thread_pool pool;
299 
300   auto f = pipeline(reader, filter, bind_executor(pool, upper), writer);
301   f.wait();
302 }
303