1 /******************************************************************************* 2 * thrill/common/concurrent_queue.hpp 3 * 4 * Part of Project Thrill - http://project-thrill.org 5 * 6 * Copyright (C) 2015 Timo Bingmann <tb@panthema.net> 7 * 8 * All rights reserved. Published under the BSD-2 license in the LICENSE file. 9 ******************************************************************************/ 10 11 #pragma once 12 #ifndef THRILL_COMMON_CONCURRENT_QUEUE_HEADER 13 #define THRILL_COMMON_CONCURRENT_QUEUE_HEADER 14 15 #include <atomic> 16 #include <deque> 17 #include <mutex> 18 19 namespace thrill { 20 namespace common { 21 22 /*! 23 * This is a queue, similar to std::queue and tbb::concurrent_queue, except that 24 * it uses mutexes for synchronization. 25 * 26 * StyleGuide is violated, because signatures are expected to match those of 27 * std::queue. 28 */ 29 template <typename T, typename Allocator> 30 class ConcurrentQueue 31 { 32 public: 33 using value_type = T; 34 using reference = T&; 35 using const_reference = const T&; 36 using size_type = std::size_t; 37 using difference_type = std::ptrdiff_t; 38 39 private: 40 //! the actual data queue 41 std::deque<T, Allocator> queue_; 42 43 //! the mutex to lock before accessing the queue 44 mutable std::mutex mutex_; 45 46 public: 47 //! Constructor ConcurrentQueue(const Allocator & alloc=Allocator ())48 explicit ConcurrentQueue(const Allocator& alloc = Allocator()) 49 : queue_(alloc) { } 50 51 //! Pushes a copy of source onto back of the queue. push(const T & source)52 void push(const T& source) { 53 std::unique_lock<std::mutex> lock(mutex_); 54 queue_.push_back(source); 55 } 56 57 //! Pushes given element into the queue by utilizing element's move 58 //! constructor push(T && elem)59 void push(T&& elem) { 60 std::unique_lock<std::mutex> lock(mutex_); 61 queue_.push_back(std::move(elem)); 62 } 63 64 //! Pushes a new element into the queue. The element is constructed with 65 //! given arguments. 66 template <typename... Arguments> emplace(Arguments &&...args)67 void emplace(Arguments&& ... args) { 68 std::unique_lock<std::mutex> lock(mutex_); 69 queue_.emplace_back(std::forward<Arguments>(args) ...); 70 } 71 72 //! Returns: true if queue has no items; false otherwise. empty() const73 bool empty() const { 74 std::unique_lock<std::mutex> lock(mutex_); 75 return queue_.empty(); 76 } 77 78 //! If value is available, pops it from the queue, assigns it to 79 //! destination, and destroys the original value. Otherwise does nothing. try_pop(T & destination)80 bool try_pop(T& destination) { 81 std::unique_lock<std::mutex> lock(mutex_); 82 if (queue_.empty()) 83 return false; 84 85 destination = std::move(queue_.front()); 86 queue_.pop_front(); 87 return true; 88 } 89 90 //! Clears the queue. clear()91 void clear() { 92 std::unique_lock<std::mutex> lock(mutex_); 93 queue_.clear(); 94 } 95 }; 96 97 } // namespace common 98 } // namespace thrill 99 100 #endif // !THRILL_COMMON_CONCURRENT_QUEUE_HEADER 101 102 /******************************************************************************/ 103