1 #ifndef __THREAD_PIPE_COOPERATIVE_POOL2_H__
2 #define __THREAD_PIPE_COOPERATIVE_POOL2_H__
3 
#include <algorithm>
#include <atomic>
#include <cassert>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <iterator>
#include <memory>
#include <thread>
#include <type_traits>
#include <vector>

#include <thread_pipe/circular_buffer.hpp>
#include <thread_pipe/traits.hpp>
13 
/// Cooperative pool. Provides a link between many producers and many
/// consumers. It is cooperative in the sense that there are no
/// dedicated producer threads. When the queue from the producers to
/// the consumers holds fewer than half of the pool's elements, the
/// thread requesting an element attempts to become an additional
/// producer. It stays a producer until there is no empty element left
/// to fill (the producer-to-consumer queue is full) or it has nothing
/// more to produce.
21 ///
/// `producer_pool<D, T>` must be subclassed using CRTP (`D` is the
/// derived class). `T` is the type of the element passed around in
/// the queues. The derived class must implement either
/// `bool operator()(uint32_t i, T& e)` or `bool operator()(T& e)`. It
/// is called when a thread has become a producer. It must store the
/// new element in `e`, unless there is nothing more to produce. It
/// returns `true` if there is nothing more to produce (and `e` is not
/// used), `false` otherwise. Throwing an exception is treated the same
/// as returning `true`.
///
/// The maximum number of producers is specified to the constructor of
/// the class (`max_active`). The parameter `i` passed to `operator()`
/// is in [0, max_active) and it is guaranteed that at any given time,
/// no two producers have the same `i`.
34 ///
/// The following example will produce the integers `[0, max * max)`,
/// with `max` producers.
///
/// ~~~{.cc}
/// class sequence : public producer_pool<sequence, int> {
///   const uint32_t        max_;
///   std::vector<uint32_t> cur_;
/// public:
///   sequence(uint32_t max)
///     : producer_pool<sequence, int>(max), max_(max), cur_(max, 0) { }
///   bool operator()(uint32_t i, int& e) {
///     uint32_t& cur = cur_[i];
///     if(cur < max_) {
///       e = i * max_ + cur++;
///       return false;
///     }
///     return true;
///   }
/// };
/// ~~~
///
/// To access the elements (or the jobs) of the sequence, iterate from
/// `begin()` to `end()`. When the iterator reaches `end()`, the
/// sequence is over. The iterator must be run to the end (or its
/// `release()` method called) so that no element of the pool is lost.
///
/// ~~~{.cc}
/// sequence seq(4); // Sequence, instantiated in main thread
/// // In each consumer thread:
/// for(auto it = seq.begin(); it != seq.end(); ++it) {
///   // Do computation using *it and it->
/// }
/// ~~~
70 
71 namespace thread_pipe {
72 namespace imp {
73 
// RAII token holder. The constructor dequeues one token from `tokens`.
// The destructor enqueues it back, unless no token was available
// (`has_token()` is false) or `drop()` was called.
template<typename cbT>
struct take_token {
  cbT&     tokens_;
  uint32_t token_; // Token taken, or cbT::guard if the queue was empty
  bool     drop_;  // When true, the token is not given back on destruction

  explicit take_token(cbT& tokens) : tokens_(tokens), token_(tokens.dequeue()), drop_(false) { }
  ~take_token() {
    if(has_token() && !drop_)
      tokens_.enqueue_no_check(token_);
  }

  // A copy would give the same token back a second time on destruction.
  take_token(const take_token&)            = delete;
  take_token& operator=(const take_token&) = delete;

  bool has_token() const { return token_ != cbT::guard; }
  void drop() { drop_ = true; }
};
91 
/// Iterator over the elements of a pool, almost STL compliant. An
/// iterator holds at most one pool element at a time: constructing it
/// from a pool takes an element (Pool::get_element()), and incrementing
/// it gives the held element back (Pool::release_element()) before
/// taking the next one. The iterator MUST be run until it reaches the
/// end so as not to "lose" elements of the pool. Alternatively, call
/// release() to give the held element back; the iterator then compares
/// equal to the end iterator.
///
/// NOTE(review): copies refer to the same held element, so only one
/// copy should be incremented or released.
template<typename Pool, typename Category>
class pool_iterator {
  typedef typename Pool::size_type size_type;

  size_type m_i;  // Index of the held element, or Pool::cbT::guard at the end
  Pool*     m_cp; // Pool the element comes from; nullptr for a default-constructed iterator
public:
  // Same member types std::iterator<Category, Pool::element_type> used
  // to provide (that base class is deprecated since C++17).
  typedef typename Pool::element_type value_type;
  typedef std::ptrdiff_t              difference_type;
  typedef value_type*                 pointer;
  typedef value_type&                 reference;
  typedef Category                    iterator_category;

  /// End iterator.
  pool_iterator() : m_i(Pool::cbT::guard), m_cp(nullptr) { }
  /// Iterator holding the element returned by cp.get_element().
  pool_iterator(Pool& cp) : m_i(cp.get_element()), m_cp(&cp) { }

  // Only the held indices are compared, so every iterator at the end
  // compares equal to the default-constructed end iterator.
  bool operator==(const pool_iterator& rhs) const { return m_i == rhs.m_i; }
  bool operator!=(const pool_iterator& rhs) const { return m_i != rhs.m_i; }
  value_type& operator*() { return m_cp->elts_[m_i]; }
  value_type* operator->() { return &m_cp->elts_[m_i]; }

  /// Give the held element (if any) back to the pool and move to the
  /// end. Safe to call on an end iterator and to call more than once.
  void release() {
    if(m_cp)
      m_cp->release_element(m_i);
    m_i = Pool::cbT::guard;
  }

  pool_iterator& operator++() {
    m_cp->release_element(m_i);
    m_i = m_cp->get_element();
    return *this;
  }

  // NOTE(review): the returned copy refers to the element that was just
  // given back to the pool; dereferencing it may race with other threads.
  pool_iterator operator++(int) {
    pool_iterator res(*this);
    ++*this;
    return res;
  }
};
132 
133 
134 // Call the 'operator()' method on an object of type D, with 1 or 2
135 // arguments, based on arity. Check that calling types are correct.
136 template<int arity, typename D, typename T>
137 struct delegate { };
138 template<typename D, typename T>
139 struct delegate<1, D, T> {
callthread_pipe::imp::delegate140   static bool call(D* self, uint32_t i, T& e) { return (*self)(e); }
141 };
142 template<typename D, typename T>
143 struct delegate<2, D, T> {
callthread_pipe::imp::delegate144   static bool call(D* self, uint32_t i, T& e) { return (*self)(i, e); }
145 };
146 
// Result of an attempt by a thread to act as a worker (producer or
// consumer) of a pool.
enum ACTIVE_STATUS {
  WORKED = 0, // The attempt ran; the caller may try again to get an element
  DONE   = 1, // Work is over (queue closed or every worker finished)
  EXISTS = 2  // No worker token was free: other threads are already working
};
149 
150 template<typename T, typename S>
151 class pool_base {
152 protected:
153   typedef S                          size_type;
154   typedef circular_buffer<size_type> cbT;
155   typedef T                          element_type;
156 
157   size_type      size_;
158   element_type*  elts_;
159   cbT            cons_prod_;    // FIFO from Consumers to Producers
160   cbT            prod_cons_;    // FIFO from Producers to Consumers
161   cbT            tokens_;       // FIFO with producer tokens
162   const uint32_t max_active_;
163   uint32_t       done_;         // Number of producer that are done
164 
165   // First 16 operations -> no delay. Then exponential back-off up to a second.
delay(int iteration)166   static void delay(int iteration) {
167     if(iteration < 16)
168       return;
169     std::this_thread::sleep_for(std::chrono::milliseconds(1 << std::min(iteration - 16, 10)));
170   }
171 
172 public:
pool_base(uint32_t max_active,size_type size)173   pool_base(uint32_t max_active, size_type size)
174     : size_(size)
175     , elts_(new element_type[size_])
176     , cons_prod_(size_ + 100)
177     , prod_cons_(size_ + 100)
178     , tokens_(max_active + 1)
179     , max_active_(max_active)
180     , done_(0)
181   {
182     // Every element is empty and ready to be filled by the producer
183     for(size_t i = 0; i < size_; ++i)
184       cons_prod_.enqueue_no_check(i);
185 
186     // Every producer token is free
187     for(uint32_t i = 0; i < max_active_; ++i)
188       tokens_.enqueue_no_check(i);
189   }
190 
~pool_base()191   ~pool_base() { delete [] elts_; }
192 
size() const193   size_type size() const { return size_; }
194 
element_begin()195   element_type* element_begin() { return elts_; }
element_end()196   element_type* element_end() { return elts_ + size_; }
197 };
198 
199 
200 template<typename D, typename T>
201 class producer_pool : public pool_base<T, uint32_t> {
202   typedef pool_base<T, uint32_t> super;
203 public:
204   typedef typename super::size_type                             size_type;
205   typedef circular_buffer<size_type>                            cbT;
206   typedef T                                                     element_type;
207   typedef pool_iterator<producer_pool, std::input_iterator_tag> iterator;
208 
209   friend iterator;
210 
211 public:
producer_pool(uint32_t max_active=1,size_type size=4* std::thread::hardware_concurrency ())212   producer_pool(uint32_t max_active = 1, size_type size = 4 * std::thread::hardware_concurrency())
213     : super(max_active, size)
214   { }
215 
216 
begin()217   iterator begin() { return iterator(*this); }
begin() const218   const iterator begin() const { return iterator(*this); }
end() const219   const iterator end() const { return iterator(); }
220 
221 
222 private:
get_element()223   size_type get_element() {
224     int iteration = 0;
225 
226     while(true) {
227       // If less than half full -> try to fill up producer to consumer
228       // queue. Disregard return value: in any case will
229       // attempt to get an element for ourselves
230       if(super::prod_cons_.fill() < super::size() / 2)
231         become_producer();
232 
233       size_type i = super::prod_cons_.dequeue();
234       if(i != cbT::guard)
235         return i;
236 
237       // Try to become producer
238       switch(become_producer()) {
239       case WORKED:
240         iteration = 0; // Produced. Attempt anew to get an element
241         break;
242       case DONE:
243         return super::prod_cons_.dequeue();
244       case EXISTS:
245         super::delay(iteration++); // Already a producer. Wait a bit it adds things to queue
246         break;
247       }
248     }
249   }
250 
release_element(uint32_t i)251   void release_element(uint32_t i) {
252     if(i != cbT::guard)
253       super::cons_prod_.enqueue_no_check(i);
254   }
255 
become_producer()256   ACTIVE_STATUS become_producer() {
257     typedef utils::function_traits<decltype(&D::operator())> fun_traits;
258     typedef delegate<fun_traits::arity, D, T> delegate;
259 
260     if(super::prod_cons_.is_closed())
261       return DONE;
262 
263     // Mark that we have a produce (myself). If not, return. Token
264     // will be release automatically at end of method.
265     take_token<cbT> producer_token(super::tokens_);
266     if(!producer_token.has_token())
267       return EXISTS;
268 
269     size_type i = cbT::guard;
270     try {
271       while(true) { // Only way out is if produce method is done (returns true or throw an exception)
272         i = super::cons_prod_.dequeue();
273         if(i == cbT::guard)
274           return WORKED;
275 
276         //        if(static_cast<D*>(this)->produce(producer_token.token_, elts_[i])) // produce returns true if done
277         if(delegate::call(static_cast<D*>(this), producer_token.token_, super::elts_[i]))
278           break;
279 
280         super::prod_cons_.enqueue_no_check(i);
281       }
282     } catch(...) { }       // Threw an exception -> same as being done
283 
284     // Producing is done for this producer
285     super::    cons_prod_.enqueue_no_check(i);
286     producer_token.drop();
287 
288     uint32_t is_done = ++super::done_;
289     if(is_done < super::max_active_)
290       return WORKED;
291 
292     super::prod_cons_.close();
293     return DONE;
294   }
295 };
296 
297 template<typename D, typename T>
298 class consumer_pool : public pool_base<T, uint32_t> {
299   typedef pool_base<T, uint32_t> super;
300 public:
301   typedef typename super::size_type                              size_type;
302   typedef circular_buffer<size_type>                             cbT;
303   typedef T                                                      element_type;
304   typedef pool_iterator<consumer_pool, std::output_iterator_tag> iterator;
305 
306   friend iterator;
307 
308 public:
consumer_pool(uint32_t max_active=1,size_type size=4* std::thread::hardware_concurrency ())309   consumer_pool(uint32_t max_active = 1, size_type size = 4 * std::thread::hardware_concurrency())
310     : super(max_active, size)
311   { }
312 
close()313   void close() {
314     while(true) {
315       switch(become_consumer(true)) {
316       case WORKED: // Consumed some, try again until no more
317         break;
318       case DONE: // If done -> quit
319       case EXISTS: // If consumer exists, let it do the closing -> quit
320         return;
321       }
322     }
323   }
324 
begin()325   iterator begin() { return iterator(*this); }
begin() const326   const iterator begin() const { return iterator(*this); }
end() const327   const iterator end() const { return iterator(); }
328 
329 
330 private:
get_element()331   size_type get_element() {
332     int iteration = 0;
333 
334     while(true) {
335       // If more than half full -> try to empty consumer to producer
336       // queue. Disregard return value: in any case will attempt to
337       // get an element for ourselves
338       if(super::prod_cons_.fill() > super::size() / 2)
339         become_consumer();
340 
341       size_type i = super::cons_prod_.dequeue();
342       if(i != cbT::guard)
343         return i;
344 
345       // Try to become consumer
346       switch(become_consumer()) {
347       case WORKED:
348         iteration = 0; // Consumed. Attempt anew to get an element
349         break;
350       case DONE:
351         return super::cons_prod_.dequeue();
352       case EXISTS:
353         super::delay(iteration++); // Already a consumer. Wait a bit it adds things to queue
354         break;
355       }
356     }
357   }
358 
release_element(size_type i)359   void release_element(size_type i) {
360     if(i != cbT::guard)
361       super::prod_cons_.enqueue_no_check(i);
362   }
363 
become_consumer(bool close=false)364   ACTIVE_STATUS become_consumer(bool close = false) {
365     typedef utils::function_traits<decltype(&D::operator())> fun_traits;
366     typedef delegate<fun_traits::arity, D, T> delegate;
367 
368     if(super::cons_prod_.is_closed())
369       return DONE;
370 
371     // Mark that we have a consume (myself). If not, return. Token
372     // will be release automatically at end of method.
373     take_token<cbT> consumer_token(super::tokens_);
374     if(!consumer_token.has_token())
375       return EXISTS;
376 
377     size_type i = cbT::guard;
378     try {
379       while(true) { // Only way out is if consume method is done (returns true or throw an exception)
380         i = super::prod_cons_.dequeue();
381         if(i == cbT::guard)
382           return !close ? WORKED : DONE;
383 
384         if(delegate::call(static_cast<D*>(this), consumer_token.token_, super::elts_[i]))
385           break;
386 
387         super::cons_prod_.enqueue_no_check(i);
388       }
389     } catch(...) { }       // Threw an exception -> same as being done
390 
391     // Consumming is done for this consumer
392     super::prod_cons_.enqueue_no_check(i);
393     consumer_token.drop();
394 
395     uint32_t is_done = ++super::done_;
396     if(is_done < super::max_active_)
397       return WORKED;
398 
399     super::cons_prod_.close();
400     return DONE;
401   }
402 };
403 
404 template<typename Pipe>
405 class pipe_input_iterator : public std::iterator<std::input_iterator_tag, typename Pipe::element_type> {
406   typedef std::iterator<std::input_iterator_tag, typename Pipe::element_type> super;
407   typedef pool_iterator<typename Pipe::pool_type, std::input_iterator_tag>    iterator;
408 
409   iterator m_it;
410   size_t   m_off;
411 
412 public:
413   typedef typename super::value_type        value_type;
414   typedef typename super::difference_type   difference_type;
415   typedef typename super::pointer           pointer;
416   typedef typename super::reference         reference;
417   typedef typename super::iterator_category iterator_category;
418 
pipe_input_iterator(iterator it)419   pipe_input_iterator(iterator it) : m_it(it), m_off(0) { }
420 
operator ==(const pipe_input_iterator & rhs) const421   bool operator==(const pipe_input_iterator& rhs) const { return m_it == rhs.m_it; }
operator !=(const pipe_input_iterator & rhs) const422   bool operator!=(const pipe_input_iterator& rhs) const { return m_it != rhs.m_it; }
operator *()423   value_type& operator*() { return m_it->elts[m_off]; }
operator ->()424   value_type* operator->() { return &this->operator*(); }
425 
operator ++()426   pipe_input_iterator& operator++() {
427     ++m_off;
428     if(m_off >= m_it->filled) {
429       m_it->filled = 0;
430       ++m_it;
431       m_off = 0;
432     }
433     return *this;
434   }
435 
operator ++(int)436   pipe_input_iterator operator++(int) {
437     pipe_input_iterator res(*this);
438     ++*this;
439     return res;
440   }
441 };
442 
443 template<typename Pipe>
444 class pipe_output_iterator : public std::iterator<std::output_iterator_tag, typename Pipe::element_type> {
445   typedef std::iterator<std::output_iterator_tag, typename Pipe::element_type> super;
446   typedef pool_iterator<typename Pipe::pool_type, std::output_iterator_tag>    iterator;
447 
448   iterator m_it;
449 
450 public:
451   typedef typename super::value_type        value_type;
452   typedef typename super::difference_type   difference_type;
453   typedef typename super::pointer           pointer;
454   typedef typename super::reference         reference;
455   typedef typename super::iterator_category iterator_category;
456 
pipe_output_iterator(iterator it)457   pipe_output_iterator(iterator it) : m_it(it) { }
458 
operator ==(const pipe_output_iterator & rhs) const459   bool operator==(const pipe_output_iterator& rhs) const { return m_it == rhs.m_it; }
operator !=(const pipe_output_iterator & rhs) const460   bool operator!=(const pipe_output_iterator& rhs) const { return m_it != rhs.m_it; }
operator *()461   value_type& operator*() {
462     auto& x = *m_it;
463     return x.elts[x.filled];
464   }
operator ->()465   value_type* operator->() { return &this->operator*(); }
flush()466   void flush() { if(m_it->filled) ++m_it; }
done()467   void done() { ++*this; flush(); m_it.release(); }
468 
operator ++()469   pipe_output_iterator& operator++() {
470     ++m_it->filled;
471     if(m_it->filled >= m_it->elts.size())
472       ++m_it;
473     return *this;
474   }
475 
operator ++(int)476   pipe_output_iterator operator++(int) {
477     pipe_output_iterator res(*this);
478     ++*this;
479     return res;
480   }
481 };
482 
483 
/// One pool element of a pipe: a batch of values of type `T`.
template<typename T>
struct group {
  size_t         filled; // Number of entries at the front of `elts` that hold data
  std::vector<T> elts;   // Batch storage; its size() is the batch capacity (pipe depth)
};
489 
490 template<typename D, typename T>
491 class producer_pipe
492   : public producer_pool<producer_pipe<D, T>, group<T>>
493 {
494 public:
495   typedef producer_pool<producer_pipe, group<T>>  pool_type;
496   typedef typename pool_type::size_type           size_type;
497   typedef T                                       element_type;
498   typedef pipe_input_iterator<producer_pipe>      iterator;
499 
producer_pipe(uint32_t max_active=1,uint32_t depth=10,size_type size=4* std::thread::hardware_concurrency ())500   producer_pipe(uint32_t max_active = 1, uint32_t depth = 10, size_type size = 4 * std::thread::hardware_concurrency())
501     : pool_type(max_active, size)
502   {
503     for(auto it = pool_type::element_begin(); it != pool_type::element_end(); ++it) {
504       it->filled = 0;
505       it->elts.resize(depth);
506     }
507   }
508 
operator ()(uint32_t i,group<T> & g)509   bool operator()(uint32_t i, group<T>& g) {
510     typedef utils::function_traits<decltype(&D::operator())> fun_traits;
511     typedef delegate<fun_traits::arity, D, T> delegate;
512 
513     size_t& f = g.filled;
514     for(f = 0; f < g.elts.size(); ++f) {
515       if(delegate::call(static_cast<D*>(this), i, g.elts[f]))
516         break;
517     }
518     return f == 0; // Nothing filled -> done
519   }
520 
begin()521   iterator begin() { return iterator(pool_type::begin()); }
begin() const522   const iterator begin() const { return iterator(pool_type::begin()); }
end() const523   const iterator end() const { return iterator(pool_type::end()); }
524 };
525 
526 template<typename D, typename T>
527 class consumer_pipe
528   : public consumer_pool<consumer_pipe<D, T>, group<T>>
529 {
530  public:
531   typedef consumer_pool<consumer_pipe, group<T>> pool_type;
532   typedef typename pool_type::size_type          size_type;
533   typedef T                                      element_type;
534   typedef pipe_output_iterator<consumer_pipe>    iterator;
535 
consumer_pipe(uint32_t max_active=1,uint32_t depth=10,size_type size=4* std::thread::hardware_concurrency ())536   consumer_pipe(uint32_t max_active = 1, uint32_t depth = 10, size_type size = 4 * std::thread::hardware_concurrency())
537     : pool_type(max_active, size)
538   {
539     for(auto it = pool_type::element_begin(); it != pool_type::element_end(); ++it) {
540       it->filled = 0;
541       it->elts.resize(depth);
542     }
543   }
544 
operator ()(uint32_t i,group<T> & g)545   bool operator()(uint32_t i, group<T>& g) {
546     typedef utils::function_traits<decltype(&D::operator())> fun_traits;
547     typedef delegate<fun_traits::arity, D, T> delegate;
548 
549     for(uint32_t f = 0; f < g.filled; ++f) {
550       if(delegate::call(static_cast<D*>(this), i, g.elts[f]))
551         return true; // Failed -> done
552     }
553     g.filled = 0;
554     return false;
555   }
556 
begin()557   iterator begin() { return iterator(pool_type::begin()); }
begin() const558   const iterator begin() const { return iterator(pool_type::begin()); }
end() const559   const iterator end() const { return iterator(pool_type::end()); }
560 };
561 
562 } // namespace imp
563 } // namespace thread_pipe
564 
565 #endif /* __THREAD_PIPE_COOPERATIVE_POOL2_H__ */
566