1 /*
2 ** Copyright (C) 2020 Dirk-Jan C. Binnema <djcb@djcbsoftware.nl>
3 **
4 ** This program is free software; you can redistribute it and/or modify it
5 ** under the terms of the GNU General Public License as published by the
6 ** Free Software Foundation; either version 3, or (at your option) any
7 ** later version.
8 **
9 ** This program is distributed in the hope that it will be useful,
10 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
11 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 ** GNU General Public License for more details.
13 **
14 ** You should have received a copy of the GNU General Public License
15 ** along with this program; if not, write to the Free Software Foundation,
16 ** Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
17 **
18 */
19 
20 #ifndef __MU_ASYNC_QUEUE_HH__
21 #define __MU_ASYNC_QUEUE_HH__
22 
23 #include <deque>
24 #include <mutex>
25 #include <chrono>
26 #include <condition_variable>
27 
28 namespace Mu {
29 
30 constexpr std::size_t UnlimitedAsyncQueueSize{0};
31 
32 template <typename    ItemType, /**< the type of Item to queue */
33           std::size_t MaxSize = UnlimitedAsyncQueueSize, /**< maximum size for the queue */
34           typename Allocator  = std::allocator<ItemType>> /**< allocator the items */
35 
36 class AsyncQueue {
37 public:
38         using value_type      = ItemType;
39         using allocator_type  = Allocator;
40         using size_type       = std::size_t;
41         using reference       = value_type&;
42         using const_reference = const value_type&;
43         using pointer         = typename std::allocator_traits<allocator_type>::pointer;
44         using const_pointer   = typename std::allocator_traits<allocator_type>::const_pointer;
45 
46         using Timeout         = std::chrono::steady_clock::duration;
47 
48         #define LOCKED std::unique_lock<std::mutex> lock(m_);
49 
push(const value_type & item,Timeout timeout={})50         bool push (const value_type& item, Timeout timeout = {}) {
51                 return push(std::move(value_type(item)));
52         }
53 
54         /**
55          * Push an item to the end of the queue by moving it
56          *
57          * @param item the item to move to the end of the queue
58          * @param timeout and optional timeout
59          *
60          * @return true if the item was pushed; false otherwise.
61          */
push(value_type && item,Timeout timeout={})62         bool push (value_type&& item, Timeout timeout = {}) {
63 
64                 LOCKED;
65 
66                 if (!unlimited()) {
__anonf686513e0102()67                         const auto rv = cv_full_.wait_for(lock, timeout,[&](){
68                                  return !full_unlocked();}) && !full_unlocked();
69                         if (!rv)
70                                 return false;
71                 }
72 
73                 q_.emplace_back(std::move(item));
74                 lock.unlock();
75 
76                 cv_empty_.notify_one();
77                 return true;
78 
79         }
80 
81         /**
82          * Pop an item from the queue
83          *
84          * @param receives the value if the function returns true
85          * @param timeout optional time to wait for an item to become available
86          *
87          * @return true if an item was popped (into val), false otherwise.
88          */
pop(value_type & val,Timeout timeout={})89         bool pop (value_type& val, Timeout timeout = {}) {
90 
91                 LOCKED;
92 
93                 if (timeout != Timeout{}) {
__anonf686513e0202()94                         const auto rv = cv_empty_.wait_for(lock, timeout,[&](){
95                                  return !q_.empty(); }) && !q_.empty();
96                         if (!rv)
97                                 return false;
98 
99                 } else if (q_.empty())
100                         return false;
101 
102                 val = std::move(q_.front());
103                 q_.pop_front();
104                 lock.unlock();
105                 cv_full_.notify_one();
106 
107                 return true;
108         }
109 
110         /**
111          * Clear the queue
112          *
113          */
clear()114         void clear() {
115                 LOCKED;
116                 q_.clear();
117                 lock.unlock();
118                 cv_full_.notify_one();
119         }
120 
121         /**
122          * Size of the queue
123          *
124          *
125          * @return the size
126          */
size() const127         size_type size() const {
128                 LOCKED;
129                 return q_.size();
130         }
131 
132         /**
133          * Maximum size of the queue if specified through the template
134          * parameter; otherwise the (theoretical) max_size of the inner
135          * container.
136          *
137          * @return the maximum size
138          */
max_size() const139         size_type max_size() const {
140                 if (unlimited())
141                         return q_.max_size();
142                 else
143                         return MaxSize;
144         }
145 
146         /**
147          * Is the queue empty?
148          *
149          * @return true or false
150          */
empty() const151         bool empty() const {
152                 LOCKED;
153                 return q_.empty();
154         }
155 
156         /**
157          * Is the queue full? Returns false unless a maximum size was specified
158          * (as a template argument)
159          *
160          * @return true or false.
161          */
full() const162         bool full() const {
163                 if (unlimited())
164                         return false;
165 
166                 LOCKED;
167                 return full_unlocked();
168         }
169 
170         /**
171          * Is this queue (theoretically) unlimited in size?
172          *
173          * @return true or false
174          */
unlimited()175         constexpr static bool unlimited() {
176                 return MaxSize == UnlimitedAsyncQueueSize;
177         }
178 
179 private:
full_unlocked() const180         bool full_unlocked() const {
181                 return q_.size() >= max_size();
182         }
183 
184         std::deque<ItemType, Allocator> q_;
185         mutable std::mutex              m_;
186         std::condition_variable         cv_full_, cv_empty_;
187 };
188 
189 } // namespace mu
190 
191 #endif /* __MU_ASYNC_QUEUE_HH__ */
192