#ifndef __THREAD_PIPE_COOPERATIVE_POOL2_H__
#define __THREAD_PIPE_COOPERATIVE_POOL2_H__

#include <algorithm>
#include <atomic>
#include <cassert>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <iterator>
#include <thread>
#include <type_traits>
#include <vector>

#include <thread_pipe/circular_buffer.hpp>
#include <thread_pipe/traits.hpp>

/// Cooperative pool. Provide a link between many producers and many
/// consumers. It is cooperative in the sense that there is no
/// dedicated threads as producer. When the number of elements in the
/// queue from the producer to the consumer is less than half, then
/// the thread requesting an element attempts to become an additional
/// producer. It stays a producer until the producer to consumer queue
/// is full.
///
/// This class must be subclassed using CRTP. `T` is the type of the
/// element passed around in the queues. The derived class must
/// implement the method `bool produce(uint32_t i, T& e)`. It is
/// called when a thread has become a producer. It must set in `e` the
/// new element, unless there is nothing more to produce. It returns
/// `true` if there is nothing more to produce (and `e` is not used),
/// `false` otherwise.
///
/// The maximum number of producers is specified to the constructor of
/// the class (`max_producers`). The parameter `i` passed to `produce`
/// is in [0, max_producers) and it is guaranteed that at any given
/// time, no two producers have the same `i`.
///
/// The following example will produce the integers `[0, 1000 * max)`,
/// with max producers.
37 /// 38 /// ~~~{.cc} 39 /// class sequence : public cooperative_pool<sequence, int> { 40 /// const uint32_t max_; 41 /// std::vector cur_; 42 /// uint32_t done_; 43 /// public: 44 /// sequence(uint32_t max) : max_(max), cur_(max, 0), done_(0) { } 45 /// bool produce(uint32_t i, int& e) { 46 /// int& cur = cur_[i]; 47 /// if(cur < max_) { 48 /// e = i * max_ + cur++; 49 /// return false; 50 /// } 51 /// return true; 52 /// } 53 /// }; 54 /// ~~~ 55 /// 56 /// To access the elements (or the jobs) of the sequence, instantiate 57 /// a `sequence::job` object and check that it is not empty. If empty, 58 /// the sequence is over. 59 /// 60 /// ~~~{.cc} 61 /// sequence seq; // Sequence, instantiated in main thread 62 /// // In each consumer thread: 63 /// while(true) { 64 /// sequence::job j(seq); 65 /// if(j.is_empty()) 66 /// break; 67 /// // Do computation using *j and j-> 68 /// } 69 /// ~~~ 70 71 namespace thread_pipe { 72 namespace imp { 73 74 // RAII token. 75 template<typename cbT> 76 struct take_token { 77 cbT& tokens_; 78 uint32_t token_; 79 bool drop_; 80 take_tokenthread_pipe::imp::take_token81 take_token(cbT& tokens) : tokens_(tokens), token_(tokens.dequeue()), drop_(false) { } ~take_tokenthread_pipe::imp::take_token82 ~take_token() { 83 if(has_token() && !drop_) { 84 tokens_.enqueue_no_check(token_); 85 // assert(tokens_.enqueue(token_)); 86 } 87 } has_tokenthread_pipe::imp::take_token88 bool has_token() const { return token_ != cbT::guard; } dropthread_pipe::imp::take_token89 void drop() { drop_ = true; } 90 }; 91 92 /// Iterator, almost STL compliant. The iterator MUST be run until 93 /// reaching the end to not "loose" elements in the 94 /// pool. Alternatively, one can call release() reach the end. 
/// Iterator over the elements handed out by a pool.
///
/// `Pool` must provide the types `element_type`, `size_type` and
/// `cbT` (with the sentinel `cbT::guard`), the methods `get_element()`
/// and `release_element()`, and the element array `elts_`. `Category`
/// is the iterator category tag to advertise.
///
/// An iterator holds the index of at most one element of the pool. An
/// iterator holding `cbT::guard` is an end iterator; two iterators
/// compare equal when they hold the same index.
template<typename Pool, typename Category>
class pool_iterator {
  typedef typename Pool::size_type size_type;

  size_type m_i;  // Index of the element held, or Pool::cbT::guard at the end
  Pool*     m_cp; // Pool the element comes from; nullptr when default constructed
public:
  // Spelled out explicitly rather than inherited from std::iterator,
  // which is deprecated since C++17.
  typedef typename Pool::element_type value_type;
  typedef std::ptrdiff_t              difference_type;
  typedef value_type*                 pointer;
  typedef value_type&                 reference;
  typedef Category                    iterator_category;

  /// End iterator: holds no element and is attached to no pool.
  pool_iterator() : m_i(Pool::cbT::guard), m_cp(nullptr) { }
  /// Get an element from `cp`. Holds the guard value if `cp` returned none.
  pool_iterator(Pool& cp) : m_i(cp.get_element()), m_cp(&cp) { }
  pool_iterator(const pool_iterator& rhs) : m_i(rhs.m_i), m_cp(rhs.m_cp) { }
  // ~pool_iterator() { m_cp->release_element(m_i); }

  bool operator==(const pool_iterator& rhs) const { return m_i == rhs.m_i; }
  bool operator!=(const pool_iterator& rhs) const { return m_i != rhs.m_i; }
  value_type& operator*() { return m_cp->elts_[m_i]; }
  value_type* operator->() { return &m_cp->elts_[m_i]; }

  /// Give the element held (if any) back to the pool and become an end
  /// iterator. Calling it again, or calling it on an end iterator, does
  /// nothing, so an element is never handed back twice.
  void release() {
    if(m_i == Pool::cbT::guard)
      return;
    m_cp->release_element(m_i);
    m_i = Pool::cbT::guard;
  }

  /// Give the current element back to the pool and get the next one.
  pool_iterator& operator++() {
    m_cp->release_element(m_i);
    m_i = m_cp->get_element();
    return *this;
  }

  /// Post-increment. The returned copy still holds the index of the
  /// element that was just given back to the pool.
  pool_iterator operator++(int) {
    pool_iterator res(*this);
    ++*this;
    return res;
  }
};


// Call the 'operator()' method on an object of type D, with 1 or 2
// arguments, based on arity. Check that calling types are correct.
136 template<int arity, typename D, typename T> 137 struct delegate { }; 138 template<typename D, typename T> 139 struct delegate<1, D, T> { callthread_pipe::imp::delegate140 static bool call(D* self, uint32_t i, T& e) { return (*self)(e); } 141 }; 142 template<typename D, typename T> 143 struct delegate<2, D, T> { callthread_pipe::imp::delegate144 static bool call(D* self, uint32_t i, T& e) { return (*self)(i, e); } 145 }; 146 147 // Status of the worker 148 enum ACTIVE_STATUS { WORKED, DONE, EXISTS }; 149 150 template<typename T, typename S> 151 class pool_base { 152 protected: 153 typedef S size_type; 154 typedef circular_buffer<size_type> cbT; 155 typedef T element_type; 156 157 size_type size_; 158 element_type* elts_; 159 cbT cons_prod_; // FIFO from Consumers to Producers 160 cbT prod_cons_; // FIFO from Producers to Consumers 161 cbT tokens_; // FIFO with producer tokens 162 const uint32_t max_active_; 163 uint32_t done_; // Number of producer that are done 164 165 // First 16 operations -> no delay. Then exponential back-off up to a second. 
delay(int iteration)166 static void delay(int iteration) { 167 if(iteration < 16) 168 return; 169 std::this_thread::sleep_for(std::chrono::milliseconds(1 << std::min(iteration - 16, 10))); 170 } 171 172 public: pool_base(uint32_t max_active,size_type size)173 pool_base(uint32_t max_active, size_type size) 174 : size_(size) 175 , elts_(new element_type[size_]) 176 , cons_prod_(size_ + 100) 177 , prod_cons_(size_ + 100) 178 , tokens_(max_active + 1) 179 , max_active_(max_active) 180 , done_(0) 181 { 182 // Every element is empty and ready to be filled by the producer 183 for(size_t i = 0; i < size_; ++i) 184 cons_prod_.enqueue_no_check(i); 185 186 // Every producer token is free 187 for(uint32_t i = 0; i < max_active_; ++i) 188 tokens_.enqueue_no_check(i); 189 } 190 ~pool_base()191 ~pool_base() { delete [] elts_; } 192 size() const193 size_type size() const { return size_; } 194 element_begin()195 element_type* element_begin() { return elts_; } element_end()196 element_type* element_end() { return elts_ + size_; } 197 }; 198 199 200 template<typename D, typename T> 201 class producer_pool : public pool_base<T, uint32_t> { 202 typedef pool_base<T, uint32_t> super; 203 public: 204 typedef typename super::size_type size_type; 205 typedef circular_buffer<size_type> cbT; 206 typedef T element_type; 207 typedef pool_iterator<producer_pool, std::input_iterator_tag> iterator; 208 209 friend iterator; 210 211 public: producer_pool(uint32_t max_active=1,size_type size=4* std::thread::hardware_concurrency ())212 producer_pool(uint32_t max_active = 1, size_type size = 4 * std::thread::hardware_concurrency()) 213 : super(max_active, size) 214 { } 215 216 begin()217 iterator begin() { return iterator(*this); } begin() const218 const iterator begin() const { return iterator(*this); } end() const219 const iterator end() const { return iterator(); } 220 221 222 private: get_element()223 size_type get_element() { 224 int iteration = 0; 225 226 while(true) { 227 // If less than half 
full -> try to fill up producer to consumer 228 // queue. Disregard return value: in any case will 229 // attempt to get an element for ourselves 230 if(super::prod_cons_.fill() < super::size() / 2) 231 become_producer(); 232 233 size_type i = super::prod_cons_.dequeue(); 234 if(i != cbT::guard) 235 return i; 236 237 // Try to become producer 238 switch(become_producer()) { 239 case WORKED: 240 iteration = 0; // Produced. Attempt anew to get an element 241 break; 242 case DONE: 243 return super::prod_cons_.dequeue(); 244 case EXISTS: 245 super::delay(iteration++); // Already a producer. Wait a bit it adds things to queue 246 break; 247 } 248 } 249 } 250 release_element(uint32_t i)251 void release_element(uint32_t i) { 252 if(i != cbT::guard) 253 super::cons_prod_.enqueue_no_check(i); 254 } 255 become_producer()256 ACTIVE_STATUS become_producer() { 257 typedef utils::function_traits<decltype(&D::operator())> fun_traits; 258 typedef delegate<fun_traits::arity, D, T> delegate; 259 260 if(super::prod_cons_.is_closed()) 261 return DONE; 262 263 // Mark that we have a produce (myself). If not, return. Token 264 // will be release automatically at end of method. 265 take_token<cbT> producer_token(super::tokens_); 266 if(!producer_token.has_token()) 267 return EXISTS; 268 269 size_type i = cbT::guard; 270 try { 271 while(true) { // Only way out is if produce method is done (returns true or throw an exception) 272 i = super::cons_prod_.dequeue(); 273 if(i == cbT::guard) 274 return WORKED; 275 276 // if(static_cast<D*>(this)->produce(producer_token.token_, elts_[i])) // produce returns true if done 277 if(delegate::call(static_cast<D*>(this), producer_token.token_, super::elts_[i])) 278 break; 279 280 super::prod_cons_.enqueue_no_check(i); 281 } 282 } catch(...) 
{ } // Threw an exception -> same as being done 283 284 // Producing is done for this producer 285 super:: cons_prod_.enqueue_no_check(i); 286 producer_token.drop(); 287 288 uint32_t is_done = ++super::done_; 289 if(is_done < super::max_active_) 290 return WORKED; 291 292 super::prod_cons_.close(); 293 return DONE; 294 } 295 }; 296 297 template<typename D, typename T> 298 class consumer_pool : public pool_base<T, uint32_t> { 299 typedef pool_base<T, uint32_t> super; 300 public: 301 typedef typename super::size_type size_type; 302 typedef circular_buffer<size_type> cbT; 303 typedef T element_type; 304 typedef pool_iterator<consumer_pool, std::output_iterator_tag> iterator; 305 306 friend iterator; 307 308 public: consumer_pool(uint32_t max_active=1,size_type size=4* std::thread::hardware_concurrency ())309 consumer_pool(uint32_t max_active = 1, size_type size = 4 * std::thread::hardware_concurrency()) 310 : super(max_active, size) 311 { } 312 close()313 void close() { 314 while(true) { 315 switch(become_consumer(true)) { 316 case WORKED: // Consumed some, try again until no more 317 break; 318 case DONE: // If done -> quit 319 case EXISTS: // If consumer exists, let it do the closing -> quit 320 return; 321 } 322 } 323 } 324 begin()325 iterator begin() { return iterator(*this); } begin() const326 const iterator begin() const { return iterator(*this); } end() const327 const iterator end() const { return iterator(); } 328 329 330 private: get_element()331 size_type get_element() { 332 int iteration = 0; 333 334 while(true) { 335 // If more than half full -> try to empty consumer to producer 336 // queue. Disregard return value: in any case will attempt to 337 // get an element for ourselves 338 if(super::prod_cons_.fill() > super::size() / 2) 339 become_consumer(); 340 341 size_type i = super::cons_prod_.dequeue(); 342 if(i != cbT::guard) 343 return i; 344 345 // Try to become consumer 346 switch(become_consumer()) { 347 case WORKED: 348 iteration = 0; // Consumed. 
Attempt anew to get an element 349 break; 350 case DONE: 351 return super::cons_prod_.dequeue(); 352 case EXISTS: 353 super::delay(iteration++); // Already a consumer. Wait a bit it adds things to queue 354 break; 355 } 356 } 357 } 358 release_element(size_type i)359 void release_element(size_type i) { 360 if(i != cbT::guard) 361 super::prod_cons_.enqueue_no_check(i); 362 } 363 become_consumer(bool close=false)364 ACTIVE_STATUS become_consumer(bool close = false) { 365 typedef utils::function_traits<decltype(&D::operator())> fun_traits; 366 typedef delegate<fun_traits::arity, D, T> delegate; 367 368 if(super::cons_prod_.is_closed()) 369 return DONE; 370 371 // Mark that we have a consume (myself). If not, return. Token 372 // will be release automatically at end of method. 373 take_token<cbT> consumer_token(super::tokens_); 374 if(!consumer_token.has_token()) 375 return EXISTS; 376 377 size_type i = cbT::guard; 378 try { 379 while(true) { // Only way out is if consume method is done (returns true or throw an exception) 380 i = super::prod_cons_.dequeue(); 381 if(i == cbT::guard) 382 return !close ? WORKED : DONE; 383 384 if(delegate::call(static_cast<D*>(this), consumer_token.token_, super::elts_[i])) 385 break; 386 387 super::cons_prod_.enqueue_no_check(i); 388 } 389 } catch(...) 
{ } // Threw an exception -> same as being done 390 391 // Consumming is done for this consumer 392 super::prod_cons_.enqueue_no_check(i); 393 consumer_token.drop(); 394 395 uint32_t is_done = ++super::done_; 396 if(is_done < super::max_active_) 397 return WORKED; 398 399 super::cons_prod_.close(); 400 return DONE; 401 } 402 }; 403 404 template<typename Pipe> 405 class pipe_input_iterator : public std::iterator<std::input_iterator_tag, typename Pipe::element_type> { 406 typedef std::iterator<std::input_iterator_tag, typename Pipe::element_type> super; 407 typedef pool_iterator<typename Pipe::pool_type, std::input_iterator_tag> iterator; 408 409 iterator m_it; 410 size_t m_off; 411 412 public: 413 typedef typename super::value_type value_type; 414 typedef typename super::difference_type difference_type; 415 typedef typename super::pointer pointer; 416 typedef typename super::reference reference; 417 typedef typename super::iterator_category iterator_category; 418 pipe_input_iterator(iterator it)419 pipe_input_iterator(iterator it) : m_it(it), m_off(0) { } 420 operator ==(const pipe_input_iterator & rhs) const421 bool operator==(const pipe_input_iterator& rhs) const { return m_it == rhs.m_it; } operator !=(const pipe_input_iterator & rhs) const422 bool operator!=(const pipe_input_iterator& rhs) const { return m_it != rhs.m_it; } operator *()423 value_type& operator*() { return m_it->elts[m_off]; } operator ->()424 value_type* operator->() { return &this->operator*(); } 425 operator ++()426 pipe_input_iterator& operator++() { 427 ++m_off; 428 if(m_off >= m_it->filled) { 429 m_it->filled = 0; 430 ++m_it; 431 m_off = 0; 432 } 433 return *this; 434 } 435 operator ++(int)436 pipe_input_iterator operator++(int) { 437 pipe_input_iterator res(*this); 438 ++*this; 439 return res; 440 } 441 }; 442 443 template<typename Pipe> 444 class pipe_output_iterator : public std::iterator<std::output_iterator_tag, typename Pipe::element_type> { 445 typedef 
std::iterator<std::output_iterator_tag, typename Pipe::element_type> super; 446 typedef pool_iterator<typename Pipe::pool_type, std::output_iterator_tag> iterator; 447 448 iterator m_it; 449 450 public: 451 typedef typename super::value_type value_type; 452 typedef typename super::difference_type difference_type; 453 typedef typename super::pointer pointer; 454 typedef typename super::reference reference; 455 typedef typename super::iterator_category iterator_category; 456 pipe_output_iterator(iterator it)457 pipe_output_iterator(iterator it) : m_it(it) { } 458 operator ==(const pipe_output_iterator & rhs) const459 bool operator==(const pipe_output_iterator& rhs) const { return m_it == rhs.m_it; } operator !=(const pipe_output_iterator & rhs) const460 bool operator!=(const pipe_output_iterator& rhs) const { return m_it != rhs.m_it; } operator *()461 value_type& operator*() { 462 auto& x = *m_it; 463 return x.elts[x.filled]; 464 } operator ->()465 value_type* operator->() { return &this->operator*(); } flush()466 void flush() { if(m_it->filled) ++m_it; } done()467 void done() { ++*this; flush(); m_it.release(); } 468 operator ++()469 pipe_output_iterator& operator++() { 470 ++m_it->filled; 471 if(m_it->filled >= m_it->elts.size()) 472 ++m_it; 473 return *this; 474 } 475 operator ++(int)476 pipe_output_iterator operator++(int) { 477 pipe_output_iterator res(*this); 478 ++*this; 479 return res; 480 } 481 }; 482 483 484 template<typename T> 485 struct group { 486 size_t filled; 487 std::vector<T> elts; 488 }; 489 490 template<typename D, typename T> 491 class producer_pipe 492 : public producer_pool<producer_pipe<D, T>, group<T>> 493 { 494 public: 495 typedef producer_pool<producer_pipe, group<T>> pool_type; 496 typedef typename pool_type::size_type size_type; 497 typedef T element_type; 498 typedef pipe_input_iterator<producer_pipe> iterator; 499 producer_pipe(uint32_t max_active=1,uint32_t depth=10,size_type size=4* std::thread::hardware_concurrency ())500 
producer_pipe(uint32_t max_active = 1, uint32_t depth = 10, size_type size = 4 * std::thread::hardware_concurrency()) 501 : pool_type(max_active, size) 502 { 503 for(auto it = pool_type::element_begin(); it != pool_type::element_end(); ++it) { 504 it->filled = 0; 505 it->elts.resize(depth); 506 } 507 } 508 operator ()(uint32_t i,group<T> & g)509 bool operator()(uint32_t i, group<T>& g) { 510 typedef utils::function_traits<decltype(&D::operator())> fun_traits; 511 typedef delegate<fun_traits::arity, D, T> delegate; 512 513 size_t& f = g.filled; 514 for(f = 0; f < g.elts.size(); ++f) { 515 if(delegate::call(static_cast<D*>(this), i, g.elts[f])) 516 break; 517 } 518 return f == 0; // Nothing filled -> done 519 } 520 begin()521 iterator begin() { return iterator(pool_type::begin()); } begin() const522 const iterator begin() const { return iterator(pool_type::begin()); } end() const523 const iterator end() const { return iterator(pool_type::end()); } 524 }; 525 526 template<typename D, typename T> 527 class consumer_pipe 528 : public consumer_pool<consumer_pipe<D, T>, group<T>> 529 { 530 public: 531 typedef consumer_pool<consumer_pipe, group<T>> pool_type; 532 typedef typename pool_type::size_type size_type; 533 typedef T element_type; 534 typedef pipe_output_iterator<consumer_pipe> iterator; 535 consumer_pipe(uint32_t max_active=1,uint32_t depth=10,size_type size=4* std::thread::hardware_concurrency ())536 consumer_pipe(uint32_t max_active = 1, uint32_t depth = 10, size_type size = 4 * std::thread::hardware_concurrency()) 537 : pool_type(max_active, size) 538 { 539 for(auto it = pool_type::element_begin(); it != pool_type::element_end(); ++it) { 540 it->filled = 0; 541 it->elts.resize(depth); 542 } 543 } 544 operator ()(uint32_t i,group<T> & g)545 bool operator()(uint32_t i, group<T>& g) { 546 typedef utils::function_traits<decltype(&D::operator())> fun_traits; 547 typedef delegate<fun_traits::arity, D, T> delegate; 548 549 for(uint32_t f = 0; f < g.filled; ++f) { 550 
if(delegate::call(static_cast<D*>(this), i, g.elts[f])) 551 return true; // Failed -> done 552 } 553 g.filled = 0; 554 return false; 555 } 556 begin()557 iterator begin() { return iterator(pool_type::begin()); } begin() const558 const iterator begin() const { return iterator(pool_type::begin()); } end() const559 const iterator end() const { return iterator(pool_type::end()); } 560 }; 561 562 } // namespace imp 563 } // namespace thread_pipe 564 565 #endif /* __THREAD_PIPE_COOPERATIVE_POOL2_H__ */ 566