/*
** Copyright (C) 2020 Dirk-Jan C. Binnema <djcb@djcbsoftware.nl>
**
** This program is free software; you can redistribute it and/or modify it
** under the terms of the GNU General Public License as published by the
** Free Software Foundation; either version 3, or (at your option) any
** later version.
**
** This program is distributed in the hope that it will be useful,
** but WITHOUT ANY WARRANTY; without even the implied warranty of
** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
** GNU General Public License for more details.
**
** You should have received a copy of the GNU General Public License
** along with this program; if not, write to the Free Software Foundation,
** Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
**
*/

#ifndef __MU_ASYNC_QUEUE_HH__
#define __MU_ASYNC_QUEUE_HH__

#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <memory>
#include <mutex>
#include <utility>

namespace Mu {

/** MaxSize value that means "no upper bound on the number of queued items" */
constexpr std::size_t UnlimitedAsyncQueueSize{0};

/**
 * A FIFO queue backed by a std::deque whose accessors serialize on an
 * internal mutex. push() and pop() can optionally wait (with a timeout) on
 * condition variables until there is room / an item available.
 */
template <typename ItemType,                                     /**< the type of Item to queue */
	  std::size_t MaxSize = UnlimitedAsyncQueueSize,         /**< maximum size for the queue */
	  typename Allocator = std::allocator<ItemType>>         /**< allocator for the items */
class AsyncQueue {
public:
	using value_type      = ItemType;
	using allocator_type  = Allocator;
	using size_type       = std::size_t;
	using reference       = value_type&;
	using const_reference = const value_type&;
	using pointer         = typename std::allocator_traits<allocator_type>::pointer;
	using const_pointer   = typename std::allocator_traits<allocator_type>::const_pointer;

	using Timeout = std::chrono::steady_clock::duration;

	/* NOTE(review): no longer used in this header; the definition is
	 * retained only in case an includer relies on it -- confirm and drop. */
#define LOCKED std::unique_lock<std::mutex> lock(m_);

	/**
	 * Push a copy of an item to the end of the queue
	 *
	 * @param item the item to copy to the end of the queue
	 * @param timeout an optional timeout; see the moving overload
	 *
	 * @return true if the item was pushed; false otherwise.
	 */
	bool push(const value_type& item, Timeout timeout = {}) {
		// copy first, then delegate to the moving overload; the timeout
		// must be forwarded, or a bounded queue would never wait.
		return push(value_type(item), timeout);
	}

	/**
	 * Push an item to the end of the queue by moving it
	 *
	 * @param item the item to move to the end of the queue
	 * @param timeout an optional timeout: for a size-limited queue that
	 * is full, the maximum time to wait for room to become available. Not
	 * used for unlimited queues.
	 *
	 * @return true if the item was pushed; false otherwise.
	 */
	bool push(value_type&& item, Timeout timeout = {}) {

		std::unique_lock<std::mutex> lock{m_};

		if (!unlimited()) {
			// wait_for returns the predicate's final value, i.e.
			// false if the queue is still full after the timeout.
			const auto has_room = cv_full_.wait_for(
				lock, timeout, [&]() { return !full_unlocked(); });
			if (!has_room)
				return false;
		}

		q_.emplace_back(std::move(item));
		lock.unlock(); // don't hold the mutex while waking a consumer

		cv_empty_.notify_one();
		return true;
	}

	/**
	 * Pop an item from the front of the queue
	 *
	 * @param val receives the value if the function returns true;
	 * it is left untouched otherwise
	 * @param timeout optional time to wait for an item to become
	 * available; with the default (zero) timeout, there is no waiting.
	 *
	 * @return true if an item was popped (into val), false otherwise.
	 */
	bool pop(value_type& val, Timeout timeout = {}) {

		std::unique_lock<std::mutex> lock{m_};

		if (timeout != Timeout{}) {
			// wait_for returns the predicate's final value, i.e.
			// false if the queue is still empty after the timeout.
			const auto has_item = cv_empty_.wait_for(
				lock, timeout, [&]() { return !q_.empty(); });
			if (!has_item)
				return false;

		} else if (q_.empty())
			return false;

		val = std::move(q_.front());
		q_.pop_front();
		lock.unlock(); // don't hold the mutex while waking a producer

		cv_full_.notify_one();

		return true;
	}

	/**
	 * Clear the queue
	 *
	 */
	void clear() {
		std::unique_lock<std::mutex> lock{m_};
		q_.clear();
		lock.unlock();

		// all slots are free again, so wake up every producer waiting
		// for room, not just one of them.
		cv_full_.notify_all();
	}

	/**
	 * Size of the queue
	 *
	 * @return the number of items currently in the queue
	 */
	size_type size() const {
		std::unique_lock<std::mutex> lock{m_};
		return q_.size();
	}

	/**
	 * Maximum size of the queue if specified through the template
	 * parameter; otherwise the (theoretical) max_size of the inner
	 * container.
	 *
	 * @return the maximum size
	 */
	size_type max_size() const {
		if (unlimited())
			return q_.max_size();
		else
			return MaxSize;
	}

	/**
	 * Is the queue empty?
	 *
	 * @return true or false
	 */
	bool empty() const {
		std::unique_lock<std::mutex> lock{m_};
		return q_.empty();
	}

	/**
	 * Is the queue full? Returns false unless a maximum size was specified
	 * (as a template argument)
	 *
	 * @return true or false.
	 */
	bool full() const {
		if (unlimited())
			return false;

		std::unique_lock<std::mutex> lock{m_};
		return full_unlocked();
	}

	/**
	 * Is this queue (theoretically) unlimited in size?
	 *
	 * @return true or false
	 */
	constexpr static bool unlimited() {
		return MaxSize == UnlimitedAsyncQueueSize;
	}

private:
	/**
	 * Has the queue reached max_size()? Does not take the mutex itself;
	 * the callers in this class invoke it with m_ already locked.
	 *
	 * @return true or false
	 */
	bool full_unlocked() const {
		return q_.size() >= max_size();
	}

	std::deque<ItemType, Allocator> q_;       /**< the queued items, oldest first */
	mutable std::mutex              m_;       /**< guards q_ */
	std::condition_variable         cv_full_; /**< notified when items are removed */
	std::condition_variable         cv_empty_; /**< notified when an item is added */
};

} // namespace Mu

#endif /* __MU_ASYNC_QUEUE_HH__ */