#include <asio/ts/executor.hpp>
#include <cctype>
#include <condition_variable>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>
9 
10 using asio::execution_context;
11 using asio::executor_binder;
12 using asio::get_associated_executor;
13 using asio::package;
14 using asio::post;
15 using asio::system_executor;
16 using asio::use_service;
17 
18 // An executor that launches a new thread for each function submitted to it.
19 // This class satisfies the Executor requirements.
20 class thread_executor
21 {
22 private:
23   // Service to track all threads started through a thread_executor.
24   class thread_bag : public execution_context::service
25   {
26   public:
27     static execution_context::id id;
28 
thread_bag(execution_context & ctx)29     explicit thread_bag(execution_context& ctx)
30       : execution_context::service(ctx)
31     {
32     }
33 
add_thread(std::thread && t)34     void add_thread(std::thread&& t)
35     {
36       std::unique_lock<std::mutex> lock(mutex_);
37       threads_.push_back(std::move(t));
38     }
39 
40   private:
shutdown()41     virtual void shutdown()
42     {
43       for (auto& t : threads_)
44         t.join();
45     }
46 
47     std::mutex mutex_;
48     std::vector<std::thread> threads_;
49   };
50 
51 public:
context()52   execution_context& context() noexcept
53   {
54     return system_executor().context();
55   }
56 
on_work_started()57   void on_work_started() noexcept
58   {
59     // This executor doesn't count work.
60   }
61 
on_work_finished()62   void on_work_finished() noexcept
63   {
64     // This executor doesn't count work.
65   }
66 
67   template <class Func, class Alloc>
dispatch(Func && f,const Alloc & a)68   void dispatch(Func&& f, const Alloc& a)
69   {
70     post(std::forward<Func>(f), a);
71   }
72 
73   template <class Func, class Alloc>
post(Func f,const Alloc &)74   void post(Func f, const Alloc&)
75   {
76     thread_bag& bag = use_service<thread_bag>(context());
77     bag.add_thread(std::thread(std::move(f)));
78   }
79 
80   template <class Func, class Alloc>
defer(Func && f,const Alloc & a)81   void defer(Func&& f, const Alloc& a)
82   {
83     post(std::forward<Func>(f), a);
84   }
85 
operator ==(const thread_executor &,const thread_executor &)86   friend bool operator==(const thread_executor&, const thread_executor&)
87   {
88     return true;
89   }
90 
operator !=(const thread_executor &,const thread_executor &)91   friend bool operator!=(const thread_executor&, const thread_executor&)
92   {
93     return false;
94   }
95 };
96 
97 execution_context::id thread_executor::thread_bag::id;
98 
99 namespace asio {
100   template <> struct is_executor<thread_executor> : std::true_type {};
101 }
102 
// Element-type-independent state shared by all thread-safe queue
// implementations. Everything here is private: only the queue_front and
// queue_back templates, named as friends, can reach it.
class queue_impl_base
{
private:
  template <class> friend class queue_back;
  template <class> friend class queue_front;

  // Locked by queue_front and queue_back around every access to the queue.
  std::mutex mutex_;

  // Signalled by queue_front when an item is pushed or the queue is stopped.
  std::condition_variable condition_;

  // Set once the producer has finished; starts out false.
  bool stop_{false};
};
112 
113 // Underlying implementation of a thread-safe queue, shared between the
114 // queue_front and queue_back classes.
115 template <class T>
116 class queue_impl : public queue_impl_base
117 {
118   template <class> friend class queue_front;
119   template <class> friend class queue_back;
120   std::queue<T> queue_;
121 };
122 
123 // The front end of a queue between consecutive pipeline stages.
124 template <class T>
125 class queue_front
126 {
127 public:
128   typedef T value_type;
129 
queue_front(std::shared_ptr<queue_impl<T>> impl)130   explicit queue_front(std::shared_ptr<queue_impl<T>> impl)
131     : impl_(impl)
132   {
133   }
134 
push(T t)135   void push(T t)
136   {
137     std::unique_lock<std::mutex> lock(impl_->mutex_);
138     impl_->queue_.push(std::move(t));
139     impl_->condition_.notify_one();
140   }
141 
stop()142   void stop()
143   {
144     std::unique_lock<std::mutex> lock(impl_->mutex_);
145     impl_->stop_ = true;
146     impl_->condition_.notify_one();
147   }
148 
149 private:
150   std::shared_ptr<queue_impl<T>> impl_;
151 };
152 
153 // The back end of a queue between consecutive pipeline stages.
154 template <class T>
155 class queue_back
156 {
157 public:
158   typedef T value_type;
159 
queue_back(std::shared_ptr<queue_impl<T>> impl)160   explicit queue_back(std::shared_ptr<queue_impl<T>> impl)
161     : impl_(impl)
162   {
163   }
164 
pop(T & t)165   bool pop(T& t)
166   {
167     std::unique_lock<std::mutex> lock(impl_->mutex_);
168     while (impl_->queue_.empty() && !impl_->stop_)
169       impl_->condition_.wait(lock);
170     if (!impl_->queue_.empty())
171     {
172       t = impl_->queue_.front();
173       impl_->queue_.pop();
174       return true;
175     }
176     return false;
177   }
178 
179 private:
180   std::shared_ptr<queue_impl<T>> impl_;
181 };
182 
183 // Launch the last stage in a pipeline.
184 template <class T, class F>
pipeline(queue_back<T> in,F f)185 std::future<void> pipeline(queue_back<T> in, F f)
186 {
187   // Get the function's associated executor, defaulting to thread_executor.
188   auto ex = get_associated_executor(f, thread_executor());
189 
190   // Run the function, and as we're the last stage return a future so that the
191   // caller can wait for the pipeline to finish.
192   return post(ex, package([in, f = std::move(f)]() mutable { f(in); }));
193 }
194 
195 // Launch an intermediate stage in a pipeline.
196 template <class T, class F, class... Tail>
pipeline(queue_back<T> in,F f,Tail...t)197 std::future<void> pipeline(queue_back<T> in, F f, Tail... t)
198 {
199   // Determine the output queue type.
200   typedef typename executor_binder<F, thread_executor>::second_argument_type::value_type output_value_type;
201 
202   // Create the output queue and its implementation.
203   auto out_impl = std::make_shared<queue_impl<output_value_type>>();
204   queue_front<output_value_type> out(out_impl);
205   queue_back<output_value_type> next_in(out_impl);
206 
207   // Get the function's associated executor, defaulting to thread_executor.
208   auto ex = get_associated_executor(f, thread_executor());
209 
210   // Run the function.
211   post(ex, [in, out, f = std::move(f)]() mutable
212       {
213         f(in, out);
214         out.stop();
215       });
216 
217   // Launch the rest of the pipeline.
218   return pipeline(next_in, std::move(t)...);
219 }
220 
221 // Launch the first stage in a pipeline.
222 template <class F, class... Tail>
pipeline(F f,Tail...t)223 std::future<void> pipeline(F f, Tail... t)
224 {
225   // Determine the output queue type.
226   typedef typename executor_binder<F, thread_executor>::argument_type::value_type output_value_type;
227 
228   // Create the output queue and its implementation.
229   auto out_impl = std::make_shared<queue_impl<output_value_type>>();
230   queue_front<output_value_type> out(out_impl);
231   queue_back<output_value_type> next_in(out_impl);
232 
233   // Get the function's associated executor, defaulting to thread_executor.
234   auto ex = get_associated_executor(f, thread_executor());
235 
236   // Run the function.
237   post(ex, [out, f = std::move(f)]() mutable
238       {
239         f(out);
240         out.stop();
241       });
242 
243   // Launch the rest of the pipeline.
244   return pipeline(next_in, std::move(t)...);
245 }
246 
247 //------------------------------------------------------------------------------
248 
249 #include <asio/ts/thread_pool.hpp>
250 #include <iostream>
251 #include <string>
252 
253 using asio::bind_executor;
254 using asio::thread_pool;
255 
reader(queue_front<std::string> out)256 void reader(queue_front<std::string> out)
257 {
258   std::string line;
259   while (std::getline(std::cin, line))
260     out.push(line);
261 }
262 
filter(queue_back<std::string> in,queue_front<std::string> out)263 void filter(queue_back<std::string> in, queue_front<std::string> out)
264 {
265   std::string line;
266   while (in.pop(line))
267     if (line.length() > 5)
268       out.push(line);
269 }
270 
upper(queue_back<std::string> in,queue_front<std::string> out)271 void upper(queue_back<std::string> in, queue_front<std::string> out)
272 {
273   std::string line;
274   while (in.pop(line))
275   {
276     std::string new_line;
277     for (char c : line)
278       new_line.push_back(std::toupper(c));
279     out.push(new_line);
280   }
281 }
282 
writer(queue_back<std::string> in)283 void writer(queue_back<std::string> in)
284 {
285   std::size_t count = 0;
286   std::string line;
287   while (in.pop(line))
288     std::cout << count++ << ": " << line << std::endl;
289 }
290 
main()291 int main()
292 {
293   thread_pool pool;
294 
295   auto f = pipeline(reader, filter, bind_executor(pool, upper), writer);
296   f.wait();
297 }
298