1 #ifndef BOOST_THREAD_CONCURRENT_QUEUES_SYNC_BOUNDED_QUEUE_HPP 2 #define BOOST_THREAD_CONCURRENT_QUEUES_SYNC_BOUNDED_QUEUE_HPP 3 4 ////////////////////////////////////////////////////////////////////////////// 5 // 6 // (C) Copyright Vicente J. Botet Escriba 2013-2014. Distributed under the Boost 7 // Software License, Version 1.0. (See accompanying file 8 // LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 9 // 10 // See http://www.boost.org/libs/thread for documentation. 11 // 12 ////////////////////////////////////////////////////////////////////////////// 13 14 #include <boost/thread/detail/config.hpp> 15 #include <boost/thread/condition_variable.hpp> 16 #include <boost/thread/mutex.hpp> 17 #include <boost/thread/detail/move.hpp> 18 #include <boost/throw_exception.hpp> 19 #include <boost/thread/concurrent_queues/queue_op_status.hpp> 20 21 #ifndef BOOST_THREAD_QUEUE_DEPRECATE_OLD 22 #include <boost/smart_ptr/shared_ptr.hpp> 23 #include <boost/smart_ptr/make_shared.hpp> 24 #endif 25 #include <boost/config/abi_prefix.hpp> 26 27 namespace boost 28 { 29 namespace concurrent 30 { 31 template <typename ValueType> 32 class sync_bounded_queue 33 { 34 public: 35 typedef ValueType value_type; 36 typedef std::size_t size_type; 37 38 // Constructors/Assignment/Destructors 39 BOOST_THREAD_NO_COPYABLE(sync_bounded_queue) 40 explicit sync_bounded_queue(size_type max_elems); 41 template <typename Range> 42 sync_bounded_queue(size_type max_elems, Range range); 43 ~sync_bounded_queue(); 44 45 // Observers 46 inline bool empty() const; 47 inline bool full() const; 48 inline size_type capacity() const; 49 inline size_type size() const; 50 inline bool closed() const; 51 52 // Modifiers 53 inline void close(); 54 55 #ifndef BOOST_THREAD_QUEUE_DEPRECATE_OLD 56 inline void push(const value_type& x); 57 inline void push(BOOST_THREAD_RV_REF(value_type) x); 58 inline bool try_push(const value_type& x); 59 inline bool try_push(BOOST_THREAD_RV_REF(value_type) x); 60 inline bool try_push(no_block_tag, const value_type& x); 61 inline bool try_push(no_block_tag, BOOST_THREAD_RV_REF(value_type) x); 62 #endif 63 inline void push_back(const value_type& x); 64 inline void push_back(BOOST_THREAD_RV_REF(value_type) x); 65 inline queue_op_status try_push_back(const value_type& x); 66 inline queue_op_status try_push_back(BOOST_THREAD_RV_REF(value_type) x); 67 inline queue_op_status nonblocking_push_back(const value_type& x); 68 inline queue_op_status nonblocking_push_back(BOOST_THREAD_RV_REF(value_type) x); 69 inline queue_op_status wait_push_back(const value_type& x); 70 inline queue_op_status wait_push_back(BOOST_THREAD_RV_REF(value_type) x); 71 72 // Observers/Modifiers 73 #ifndef BOOST_THREAD_QUEUE_DEPRECATE_OLD 74 inline void pull(value_type&); 75 // enable_if is_nothrow_copy_movable<value_type> 76 inline value_type pull(); 77 inline shared_ptr<ValueType> ptr_pull(); 78 inline bool try_pull(value_type&); 79 inline bool try_pull(no_block_tag,value_type&); 80 inline shared_ptr<ValueType> try_pull(); 81 #endif 82 inline void pull_front(value_type&); 83 // enable_if is_nothrow_copy_movable<value_type> 84 inline value_type pull_front(); 85 inline queue_op_status try_pull_front(value_type&); 86 inline queue_op_status nonblocking_pull_front(value_type&); 87 88 inline queue_op_status wait_pull_front(ValueType& elem); 89 90 private: 91 mutable mutex mtx_; 92 condition_variable not_empty_; 93 condition_variable not_full_; 94 size_type waiting_full_; 95 size_type waiting_empty_; 96 value_type* data_; 97 size_type in_; 98 size_type out_; 99 size_type capacity_; 100 bool closed_; 101 inc(size_type idx) const102 inline size_type inc(size_type idx) const BOOST_NOEXCEPT 103 { 104 return (idx + 1) % capacity_; 105 } 106 empty(unique_lock<mutex> &) const107 inline bool empty(unique_lock<mutex>& ) const BOOST_NOEXCEPT 108 { 109 return in_ == out_; 110 } empty(lock_guard<mutex> &) const111 inline bool empty(lock_guard<mutex>& ) const BOOST_NOEXCEPT 112 { 113 return in_ == out_; 114 } full(unique_lock<mutex> &) const115 inline bool full(unique_lock<mutex>& ) const BOOST_NOEXCEPT 116 { 117 return (inc(in_) == out_); 118 } full(lock_guard<mutex> &) const119 inline bool full(lock_guard<mutex>& ) const BOOST_NOEXCEPT 120 { 121 return (inc(in_) == out_); 122 } capacity(lock_guard<mutex> &) const123 inline size_type capacity(lock_guard<mutex>& ) const BOOST_NOEXCEPT 124 { 125 return capacity_-1; 126 } size(lock_guard<mutex> & lk) const127 inline size_type size(lock_guard<mutex>& lk) const BOOST_NOEXCEPT 128 { 129 if (full(lk)) return capacity(lk); 130 return ((in_+capacity(lk)-out_) % capacity(lk)); 131 } 132 133 inline void throw_if_closed(unique_lock<mutex>&); 134 inline bool closed(unique_lock<mutex>&) const; 135 136 #ifndef BOOST_THREAD_QUEUE_DEPRECATE_OLD 137 inline bool try_pull(value_type& x, unique_lock<mutex>& lk); 138 inline shared_ptr<value_type> try_pull(unique_lock<mutex>& lk); 139 inline bool try_push(const value_type& x, unique_lock<mutex>& lk); 140 inline bool try_push(BOOST_THREAD_RV_REF(value_type) x, unique_lock<mutex>& lk); 141 #endif 142 inline queue_op_status try_pull_front(value_type& x, unique_lock<mutex>& lk); 143 inline queue_op_status try_push_back(const value_type& x, unique_lock<mutex>& lk); 144 inline queue_op_status try_push_back(BOOST_THREAD_RV_REF(value_type) x, unique_lock<mutex>& lk); 145 146 inline queue_op_status wait_pull_front(value_type& x, unique_lock<mutex>& lk); 147 inline queue_op_status wait_push_back(const value_type& x, unique_lock<mutex>& lk); 148 inline queue_op_status wait_push_back(BOOST_THREAD_RV_REF(value_type) x, unique_lock<mutex>& lk); 149 150 inline void wait_until_not_empty(unique_lock<mutex>& lk); 151 inline void wait_until_not_empty(unique_lock<mutex>& lk, bool&); 152 inline size_type wait_until_not_full(unique_lock<mutex>& lk); 153 inline size_type wait_until_not_full(unique_lock<mutex>& lk, bool&); 154 155 notify_not_empty_if_needed(unique_lock<mutex> & lk)156 inline void notify_not_empty_if_needed(unique_lock<mutex>& lk) 157 { 158 if (waiting_empty_ > 0) 159 { 160 --waiting_empty_; 161 lk.unlock(); 162 not_empty_.notify_one(); 163 } 164 } notify_not_full_if_needed(unique_lock<mutex> & lk)165 inline void notify_not_full_if_needed(unique_lock<mutex>& lk) 166 { 167 if (waiting_full_ > 0) 168 { 169 --waiting_full_; 170 lk.unlock(); 171 not_full_.notify_one(); 172 } 173 } 174 175 #ifndef BOOST_THREAD_QUEUE_DEPRECATE_OLD pull(value_type & elem,unique_lock<mutex> & lk)176 inline void pull(value_type& elem, unique_lock<mutex>& lk) 177 { 178 elem = boost::move(data_[out_]); 179 out_ = inc(out_); 180 notify_not_full_if_needed(lk); 181 } pull(unique_lock<mutex> & lk)182 inline value_type pull(unique_lock<mutex>& lk) 183 { 184 value_type elem = boost::move(data_[out_]); 185 out_ = inc(out_); 186 notify_not_full_if_needed(lk); 187 return boost::move(elem); 188 } ptr_pull(unique_lock<mutex> & lk)189 inline boost::shared_ptr<value_type> ptr_pull(unique_lock<mutex>& lk) 190 { 191 shared_ptr<value_type> res = make_shared<value_type>(boost::move(data_[out_])); 192 out_ = inc(out_); 193 notify_not_full_if_needed(lk); 194 return res; 195 } 196 #endif pull_front(value_type & elem,unique_lock<mutex> & lk)197 inline void pull_front(value_type& elem, unique_lock<mutex>& lk) 198 { 199 elem = boost::move(data_[out_]); 200 out_ = inc(out_); 201 notify_not_full_if_needed(lk); 202 } pull_front(unique_lock<mutex> & lk)203 inline value_type pull_front(unique_lock<mutex>& lk) 204 { 205 value_type elem = boost::move(data_[out_]); 206 out_ = inc(out_); 207 notify_not_full_if_needed(lk); 208 return boost::move(elem); 209 } 210 set_in(size_type in,unique_lock<mutex> & lk)211 inline void set_in(size_type in, unique_lock<mutex>& lk) 212 { 213 in_ = in; 214 notify_not_empty_if_needed(lk); 215 } 216 push_at(const value_type & elem,size_type in_p_1,unique_lock<mutex> & lk)217 inline void push_at(const value_type& elem, size_type in_p_1, unique_lock<mutex>& lk) 218 { 219 data_[in_] = elem; 220 set_in(in_p_1, lk); 221 } 222 push_at(BOOST_THREAD_RV_REF (value_type)elem,size_type in_p_1,unique_lock<mutex> & lk)223 inline void push_at(BOOST_THREAD_RV_REF(value_type) elem, size_type in_p_1, unique_lock<mutex>& lk) 224 { 225 data_[in_] = boost::move(elem); 226 set_in(in_p_1, lk); 227 } 228 }; 229 230 template <typename ValueType> sync_bounded_queue(typename sync_bounded_queue<ValueType>::size_type max_elems)231 sync_bounded_queue<ValueType>::sync_bounded_queue(typename sync_bounded_queue<ValueType>::size_type max_elems) : 232 waiting_full_(0), waiting_empty_(0), data_(new value_type[max_elems + 1]), in_(0), out_(0), capacity_(max_elems + 1), 233 closed_(false) 234 { 235 BOOST_ASSERT_MSG(max_elems >= 1, "number of elements must be > 1"); 236 } 237 238 // template <typename ValueType> 239 // template <typename Range> 240 // sync_bounded_queue<ValueType>::sync_bounded_queue(size_type max_elems, Range range) : 241 // waiting_full_(0), waiting_empty_(0), data_(new value_type[max_elems + 1]), in_(0), out_(0), capacity_(max_elems + 1), 242 // closed_(false) 243 // { 244 // BOOST_ASSERT_MSG(max_elems >= 1, "number of elements must be > 1"); 245 // BOOST_ASSERT_MSG(max_elems == size(range), "number of elements must match range's size"); 246 // try 247 // { 248 // typedef typename Range::iterator iterator_t; 249 // iterator_t first = boost::begin(range); 250 // iterator_t end = boost::end(range); 251 // size_type in = 0; 252 // for (iterator_t cur = first; cur != end; ++cur, ++in) 253 // { 254 // data_[in] = *cur; 255 // } 256 // set_in(in); 257 // } 258 // catch (...) 259 // { 260 // delete[] data_; 261 // } 262 // } 263 264 template <typename ValueType> ~sync_bounded_queue()265 sync_bounded_queue<ValueType>::~sync_bounded_queue() 266 { 267 delete[] data_; 268 } 269 270 template <typename ValueType> close()271 void sync_bounded_queue<ValueType>::close() 272 { 273 { 274 lock_guard<mutex> lk(mtx_); 275 closed_ = true; 276 } 277 not_empty_.notify_all(); 278 not_full_.notify_all(); 279 } 280 281 template <typename ValueType> closed() const282 bool sync_bounded_queue<ValueType>::closed() const 283 { 284 lock_guard<mutex> lk(mtx_); 285 return closed_; 286 } 287 template <typename ValueType> closed(unique_lock<mutex> &) const288 bool sync_bounded_queue<ValueType>::closed(unique_lock<mutex>& ) const 289 { 290 return closed_; 291 } 292 293 template <typename ValueType> empty() const294 bool sync_bounded_queue<ValueType>::empty() const 295 { 296 lock_guard<mutex> lk(mtx_); 297 return empty(lk); 298 } 299 template <typename ValueType> full() const300 bool sync_bounded_queue<ValueType>::full() const 301 { 302 lock_guard<mutex> lk(mtx_); 303 return full(lk); 304 } 305 306 template <typename ValueType> capacity() const307 typename sync_bounded_queue<ValueType>::size_type sync_bounded_queue<ValueType>::capacity() const 308 { 309 lock_guard<mutex> lk(mtx_); 310 return capacity(lk); 311 } 312 313 template <typename ValueType> size() const314 typename sync_bounded_queue<ValueType>::size_type sync_bounded_queue<ValueType>::size() const 315 { 316 lock_guard<mutex> lk(mtx_); 317 return size(lk); 318 } 319 320 #ifndef BOOST_THREAD_QUEUE_DEPRECATE_OLD 321 template <typename ValueType> try_pull(ValueType & elem,unique_lock<mutex> & lk)322 bool sync_bounded_queue<ValueType>::try_pull(ValueType& elem, unique_lock<mutex>& lk) 323 { 324 if (empty(lk)) 325 { 326 throw_if_closed(lk); 327 return false; 328 } 329 pull(elem, lk); 330 return true; 331 } 332 template <typename ValueType> try_pull(unique_lock<mutex> & lk)333 shared_ptr<ValueType> sync_bounded_queue<ValueType>::try_pull(unique_lock<mutex>& lk) 334 { 335 if (empty(lk)) 336 { 337 throw_if_closed(lk); 338 return shared_ptr<ValueType>(); 339 } 340 return ptr_pull(lk); 341 } 342 template <typename ValueType> try_pull(ValueType & elem)343 bool sync_bounded_queue<ValueType>::try_pull(ValueType& elem) 344 { 345 unique_lock<mutex> lk(mtx_); 346 return try_pull(elem, lk); 347 } 348 #endif 349 350 template <typename ValueType> try_pull_front(ValueType & elem,unique_lock<mutex> & lk)351 queue_op_status sync_bounded_queue<ValueType>::try_pull_front(ValueType& elem, unique_lock<mutex>& lk) 352 { 353 if (empty(lk)) 354 { 355 if (closed(lk)) return queue_op_status::closed; 356 return queue_op_status::empty; 357 } 358 pull_front(elem, lk); 359 return queue_op_status::success; 360 } 361 362 template <typename ValueType> try_pull_front(ValueType & elem)363 queue_op_status sync_bounded_queue<ValueType>::try_pull_front(ValueType& elem) 364 { 365 unique_lock<mutex> lk(mtx_); 366 return try_pull_front(elem, lk); 367 } 368 369 #ifndef BOOST_THREAD_QUEUE_DEPRECATE_OLD 370 template <typename ValueType> try_pull(no_block_tag,ValueType & elem)371 bool sync_bounded_queue<ValueType>::try_pull(no_block_tag,ValueType& elem) 372 { 373 unique_lock<mutex> lk(mtx_, try_to_lock); 374 if (!lk.owns_lock()) 375 { 376 return false; 377 } 378 return try_pull(elem, lk); 379 } 380 template <typename ValueType> try_pull()381 boost::shared_ptr<ValueType> sync_bounded_queue<ValueType>::try_pull() 382 { 383 unique_lock<mutex> lk(mtx_); 384 return try_pull(lk); 385 } 386 #endif 387 388 template <typename ValueType> nonblocking_pull_front(ValueType & elem)389 queue_op_status sync_bounded_queue<ValueType>::nonblocking_pull_front(ValueType& elem) 390 { 391 unique_lock<mutex> lk(mtx_, try_to_lock); 392 if (!lk.owns_lock()) 393 { 394 return queue_op_status::busy; 395 } 396 return try_pull_front(elem, lk); 397 } 398 399 template <typename ValueType> throw_if_closed(unique_lock<mutex> &)400 void sync_bounded_queue<ValueType>::throw_if_closed(unique_lock<mutex>&) 401 { 402 if (closed_) 403 { 404 BOOST_THROW_EXCEPTION( sync_queue_is_closed() ); 405 } 406 } 407 408 template <typename ValueType> wait_until_not_empty(unique_lock<mutex> & lk)409 void sync_bounded_queue<ValueType>::wait_until_not_empty(unique_lock<mutex>& lk) 410 { 411 for (;;) 412 { 413 if (out_ != in_) break; 414 throw_if_closed(lk); 415 ++waiting_empty_; 416 not_empty_.wait(lk); 417 } 418 } 419 template <typename ValueType> wait_until_not_empty(unique_lock<mutex> & lk,bool & closed)420 void sync_bounded_queue<ValueType>::wait_until_not_empty(unique_lock<mutex>& lk, bool & closed) 421 { 422 for (;;) 423 { 424 if (out_ != in_) break; 425 if (closed_) {closed=true; return;} 426 ++waiting_empty_; 427 not_empty_.wait(lk); 428 } 429 } 430 431 #ifndef BOOST_THREAD_QUEUE_DEPRECATE_OLD 432 template <typename ValueType> pull(ValueType & elem)433 void sync_bounded_queue<ValueType>::pull(ValueType& elem) 434 { 435 unique_lock<mutex> lk(mtx_); 436 wait_until_not_empty(lk); 437 pull(elem, lk); 438 } 439 // template <typename ValueType> 440 // void sync_bounded_queue<ValueType>::pull(ValueType& elem, bool & closed) 441 // { 442 // unique_lock<mutex> lk(mtx_); 443 // wait_until_not_empty(lk, closed); 444 // if (closed) {return;} 445 // pull(elem, lk); 446 // } 447 448 // enable if ValueType is nothrow movable 449 template <typename ValueType> pull()450 ValueType sync_bounded_queue<ValueType>::pull() 451 { 452 unique_lock<mutex> lk(mtx_); 453 wait_until_not_empty(lk); 454 return pull(lk); 455 } 456 template <typename ValueType> ptr_pull()457 boost::shared_ptr<ValueType> sync_bounded_queue<ValueType>::ptr_pull() 458 { 459 unique_lock<mutex> lk(mtx_); 460 wait_until_not_empty(lk); 461 return ptr_pull(lk); 462 } 463 464 #endif 465 466 template <typename ValueType> pull_front(ValueType & elem)467 void sync_bounded_queue<ValueType>::pull_front(ValueType& elem) 468 { 469 unique_lock<mutex> lk(mtx_); 470 wait_until_not_empty(lk); 471 pull_front(elem, lk); 472 } 473 474 // enable if ValueType is nothrow movable 475 template <typename ValueType> pull_front()476 ValueType sync_bounded_queue<ValueType>::pull_front() 477 { 478 unique_lock<mutex> lk(mtx_); 479 wait_until_not_empty(lk); 480 return pull_front(lk); 481 } 482 483 template <typename ValueType> wait_pull_front(ValueType & elem,unique_lock<mutex> & lk)484 queue_op_status sync_bounded_queue<ValueType>::wait_pull_front(ValueType& elem, unique_lock<mutex>& lk) 485 { 486 if (empty(lk) && closed(lk)) {return queue_op_status::closed;} 487 bool is_closed = false; 488 wait_until_not_empty(lk, is_closed); 489 if (is_closed) {return queue_op_status::closed;} 490 pull_front(elem, lk); 491 return queue_op_status::success; 492 } 493 template <typename ValueType> wait_pull_front(ValueType & elem)494 queue_op_status sync_bounded_queue<ValueType>::wait_pull_front(ValueType& elem) 495 { 496 unique_lock<mutex> lk(mtx_); 497 return wait_pull_front(elem, lk); 498 } 499 500 #ifndef BOOST_THREAD_QUEUE_DEPRECATE_OLD 501 template <typename ValueType> try_push(const ValueType & elem,unique_lock<mutex> & lk)502 bool sync_bounded_queue<ValueType>::try_push(const ValueType& elem, unique_lock<mutex>& lk) 503 { 504 throw_if_closed(lk); 505 size_type in_p_1 = inc(in_); 506 if (in_p_1 == out_) // full() 507 { 508 return false; 509 } 510 push_at(elem, in_p_1, lk); 511 return true; 512 } 513 template <typename ValueType> try_push(const ValueType & elem)514 bool sync_bounded_queue<ValueType>::try_push(const ValueType& elem) 515 { 516 unique_lock<mutex> lk(mtx_); 517 return try_push(elem, lk); 518 } 519 520 #endif 521 522 template <typename ValueType> try_push_back(const ValueType & elem,unique_lock<mutex> & lk)523 queue_op_status sync_bounded_queue<ValueType>::try_push_back(const ValueType& elem, unique_lock<mutex>& lk) 524 { 525 if (closed(lk)) return queue_op_status::closed; 526 size_type in_p_1 = inc(in_); 527 if (in_p_1 == out_) // full() 528 { 529 return queue_op_status::full; 530 } 531 push_at(elem, in_p_1, lk); 532 return queue_op_status::success; 533 } 534 535 template <typename ValueType> try_push_back(const ValueType & elem)536 queue_op_status sync_bounded_queue<ValueType>::try_push_back(const ValueType& elem) 537 { 538 unique_lock<mutex> lk(mtx_); 539 return try_push_back(elem, lk); 540 } 541 542 template <typename ValueType> wait_push_back(const ValueType & elem,unique_lock<mutex> & lk)543 queue_op_status sync_bounded_queue<ValueType>::wait_push_back(const ValueType& elem, unique_lock<mutex>& lk) 544 { 545 if (closed(lk)) return queue_op_status::closed; 546 push_at(elem, wait_until_not_full(lk), lk); 547 return queue_op_status::success; 548 } 549 template <typename ValueType> wait_push_back(const ValueType & elem)550 queue_op_status sync_bounded_queue<ValueType>::wait_push_back(const ValueType& elem) 551 { 552 unique_lock<mutex> lk(mtx_); 553 return wait_push_back(elem, lk); 554 } 555 556 557 #ifndef BOOST_THREAD_QUEUE_DEPRECATE_OLD 558 template <typename ValueType> try_push(no_block_tag,const ValueType & elem)559 bool sync_bounded_queue<ValueType>::try_push(no_block_tag, const ValueType& elem) 560 { 561 unique_lock<mutex> lk(mtx_, try_to_lock); 562 if (!lk.owns_lock()) return false; 563 return try_push(elem, lk); 564 } 565 #endif 566 567 template <typename ValueType> nonblocking_push_back(const ValueType & elem)568 queue_op_status sync_bounded_queue<ValueType>::nonblocking_push_back(const ValueType& elem) 569 { 570 unique_lock<mutex> lk(mtx_, try_to_lock); 571 if (!lk.owns_lock()) return queue_op_status::busy; 572 return try_push_back(elem, lk); 573 } 574 575 template <typename ValueType> wait_until_not_full(unique_lock<mutex> & lk)576 typename sync_bounded_queue<ValueType>::size_type sync_bounded_queue<ValueType>::wait_until_not_full(unique_lock<mutex>& lk) 577 { 578 for (;;) 579 { 580 throw_if_closed(lk); 581 size_type in_p_1 = inc(in_); 582 if (in_p_1 != out_) // ! full() 583 { 584 return in_p_1; 585 } 586 ++waiting_full_; 587 not_full_.wait(lk); 588 } 589 } 590 591 #ifndef BOOST_THREAD_QUEUE_DEPRECATE_OLD 592 template <typename ValueType> push(const ValueType & elem)593 void sync_bounded_queue<ValueType>::push(const ValueType& elem) 594 { 595 unique_lock<mutex> lk(mtx_); 596 push_at(elem, wait_until_not_full(lk), lk); 597 } 598 #endif 599 template <typename ValueType> push_back(const ValueType & elem)600 void sync_bounded_queue<ValueType>::push_back(const ValueType& elem) 601 { 602 unique_lock<mutex> lk(mtx_); 603 push_at(elem, wait_until_not_full(lk), lk); 604 } 605 606 #ifndef BOOST_THREAD_QUEUE_DEPRECATE_OLD 607 template <typename ValueType> try_push(BOOST_THREAD_RV_REF (ValueType)elem,unique_lock<mutex> & lk)608 bool sync_bounded_queue<ValueType>::try_push(BOOST_THREAD_RV_REF(ValueType) elem, unique_lock<mutex>& lk) 609 { 610 throw_if_closed(lk); 611 size_type in_p_1 = inc(in_); 612 if (in_p_1 == out_) // full() 613 { 614 return false; 615 } 616 push_at(boost::move(elem), in_p_1, lk); 617 return true; 618 } 619 620 template <typename ValueType> try_push(BOOST_THREAD_RV_REF (ValueType)elem)621 bool sync_bounded_queue<ValueType>::try_push(BOOST_THREAD_RV_REF(ValueType) elem) 622 { 623 unique_lock<mutex> lk(mtx_); 624 return try_push(boost::move(elem), lk); 625 } 626 #endif 627 628 template <typename ValueType> try_push_back(BOOST_THREAD_RV_REF (ValueType)elem,unique_lock<mutex> & lk)629 queue_op_status sync_bounded_queue<ValueType>::try_push_back(BOOST_THREAD_RV_REF(ValueType) elem, unique_lock<mutex>& lk) 630 { 631 if (closed(lk)) return queue_op_status::closed; 632 size_type in_p_1 = inc(in_); 633 if (in_p_1 == out_) // full() 634 { 635 return queue_op_status::full; 636 } 637 push_at(boost::move(elem), in_p_1, lk); 638 return queue_op_status::success; 639 } 640 template <typename ValueType> try_push_back(BOOST_THREAD_RV_REF (ValueType)elem)641 queue_op_status sync_bounded_queue<ValueType>::try_push_back(BOOST_THREAD_RV_REF(ValueType) elem) 642 { 643 unique_lock<mutex> lk(mtx_); 644 return try_push_back(boost::move(elem), lk); 645 } 646 647 template <typename ValueType> wait_push_back(BOOST_THREAD_RV_REF (ValueType)elem,unique_lock<mutex> & lk)648 queue_op_status sync_bounded_queue<ValueType>::wait_push_back(BOOST_THREAD_RV_REF(ValueType) elem, unique_lock<mutex>& lk) 649 { 650 if (closed(lk)) return queue_op_status::closed; 651 push_at(boost::move(elem), wait_until_not_full(lk), lk); 652 return queue_op_status::success; 653 } 654 template <typename ValueType> wait_push_back(BOOST_THREAD_RV_REF (ValueType)elem)655 queue_op_status sync_bounded_queue<ValueType>::wait_push_back(BOOST_THREAD_RV_REF(ValueType) elem) 656 { 657 unique_lock<mutex> lk(mtx_); 658 return try_push_back(boost::move(elem), lk); 659 } 660 661 662 #ifndef BOOST_THREAD_QUEUE_DEPRECATE_OLD 663 template <typename ValueType> try_push(no_block_tag,BOOST_THREAD_RV_REF (ValueType)elem)664 bool sync_bounded_queue<ValueType>::try_push(no_block_tag, BOOST_THREAD_RV_REF(ValueType) elem) 665 { 666 unique_lock<mutex> lk(mtx_, try_to_lock); 667 if (!lk.owns_lock()) 668 { 669 return false; 670 } 671 return try_push(boost::move(elem), lk); 672 } 673 #endif 674 template <typename ValueType> nonblocking_push_back(BOOST_THREAD_RV_REF (ValueType)elem)675 queue_op_status sync_bounded_queue<ValueType>::nonblocking_push_back(BOOST_THREAD_RV_REF(ValueType) elem) 676 { 677 unique_lock<mutex> lk(mtx_, try_to_lock); 678 if (!lk.owns_lock()) 679 { 680 return queue_op_status::busy; 681 } 682 return try_push_back(boost::move(elem), lk); 683 } 684 685 #ifndef BOOST_THREAD_QUEUE_DEPRECATE_OLD 686 template <typename ValueType> push(BOOST_THREAD_RV_REF (ValueType)elem)687 void sync_bounded_queue<ValueType>::push(BOOST_THREAD_RV_REF(ValueType) elem) 688 { 689 unique_lock<mutex> lk(mtx_); 690 push_at(boost::move(elem), wait_until_not_full(lk), lk); 691 } 692 #endif 693 template <typename ValueType> push_back(BOOST_THREAD_RV_REF (ValueType)elem)694 void sync_bounded_queue<ValueType>::push_back(BOOST_THREAD_RV_REF(ValueType) elem) 695 { 696 unique_lock<mutex> lk(mtx_); 697 push_at(boost::move(elem), wait_until_not_full(lk), lk); 698 } 699 700 template <typename ValueType> operator <<(sync_bounded_queue<ValueType> & sbq,BOOST_THREAD_RV_REF (ValueType)elem)701 sync_bounded_queue<ValueType>& operator<<(sync_bounded_queue<ValueType>& sbq, BOOST_THREAD_RV_REF(ValueType) elem) 702 { 703 sbq.push_back(boost::move(elem)); 704 return sbq; 705 } 706 707 template <typename ValueType> operator <<(sync_bounded_queue<ValueType> & sbq,ValueType const & elem)708 sync_bounded_queue<ValueType>& operator<<(sync_bounded_queue<ValueType>& sbq, ValueType const&elem) 709 { 710 sbq.push_back(elem); 711 return sbq; 712 } 713 714 template <typename ValueType> operator >>(sync_bounded_queue<ValueType> & sbq,ValueType & elem)715 sync_bounded_queue<ValueType>& operator>>(sync_bounded_queue<ValueType>& sbq, ValueType &elem) 716 { 717 sbq.pull_front(elem); 718 return sbq; 719 } 720 } 721 using concurrent::sync_bounded_queue; 722 723 } 724 725 #include <boost/config/abi_suffix.hpp> 726 727 #endif 728