1 /*******************************************************************************
2  * thrill/common/concurrent_queue.hpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Timo Bingmann <tb@panthema.net>
7  *
8  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
9  ******************************************************************************/
10 
11 #pragma once
12 #ifndef THRILL_COMMON_CONCURRENT_QUEUE_HEADER
13 #define THRILL_COMMON_CONCURRENT_QUEUE_HEADER
14 
15 #include <atomic>
16 #include <deque>
17 #include <mutex>
18 
19 namespace thrill {
20 namespace common {
21 
/*!
 * A FIFO queue with an interface similar to std::queue and
 * tbb::concurrent_queue, which serializes all accesses with a mutex.
 *
 * Every member function holds the internal mutex for its entire duration, so
 * single calls may be issued concurrently from several threads. A result of
 * empty() may already be outdated when the caller inspects it; use the return
 * value of try_pop() to test for and take an element in one step.
 *
 * StyleGuide is violated, because signatures are expected to match those of
 * std::queue.
 *
 * \tparam T          type of the stored elements
 * \tparam Allocator  allocator type passed on to the underlying std::deque
 */
template <typename T, typename Allocator>
class ConcurrentQueue
{
public:
    using value_type = T;
    using reference = T&;
    using const_reference = const T&;
    using size_type = std::size_t;
    using difference_type = std::ptrdiff_t;

private:
    //! the actual data queue
    std::deque<T, Allocator> queue_;

    //! the mutex to lock before accessing the queue; mutable such that const
    //! member functions can lock it as well.
    mutable std::mutex mutex_;

public:
    //! Constructs an empty queue whose storage is obtained from a copy of the
    //! given allocator.
    explicit ConcurrentQueue(const Allocator& alloc = Allocator())
        : queue_(alloc) { }

    //! Pushes a copy of source onto the back of the queue.
    void push(const T& source) {
        std::lock_guard<std::mutex> guard(mutex_);
        queue_.push_back(source);
    }

    //! Pushes the given element onto the back of the queue by utilizing the
    //! element's move constructor.
    void push(T&& elem) {
        std::lock_guard<std::mutex> guard(mutex_);
        queue_.push_back(std::move(elem));
    }

    //! Constructs a new element in place at the back of the queue. The given
    //! arguments are forwarded to the element's constructor.
    template <typename... Arguments>
    void emplace(Arguments&& ... args) {
        std::lock_guard<std::mutex> guard(mutex_);
        queue_.emplace_back(std::forward<Arguments>(args) ...);
    }

    //! Returns: true if queue has no items; false otherwise.
    bool empty() const {
        std::lock_guard<std::mutex> guard(mutex_);
        return queue_.empty();
    }

    //! If a value is available, move-assigns the front item to destination,
    //! removes it from the queue and returns true. Otherwise leaves
    //! destination untouched and returns false.
    bool try_pop(T& destination) {
        std::lock_guard<std::mutex> guard(mutex_);
        if (queue_.empty())
            return false;

        // move out of the front slot first, then destroy the moved-from item.
        destination = std::move(queue_.front());
        queue_.pop_front();
        return true;
    }

    //! Removes all items from the queue.
    void clear() {
        std::lock_guard<std::mutex> guard(mutex_);
        queue_.clear();
    }
};
96 
97 } // namespace common
98 } // namespace thrill
99 
100 #endif // !THRILL_COMMON_CONCURRENT_QUEUE_HEADER
101 
102 /******************************************************************************/
103