1 /*!
2  * Copyright (c) 2015 by Contributors
3  * \file concurrency.h
4  * \brief thread-safe data structures.
5  * \author Yutian Li
6  */
7 #ifndef DMLC_CONCURRENCY_H_
8 #define DMLC_CONCURRENCY_H_
9 // this code depends on c++11
10 #if DMLC_USE_CXX11
11 #include <atomic>
12 #include <deque>
13 #include <queue>
14 #include <mutex>
15 #include <vector>
16 #include <utility>
17 #include <condition_variable>
18 #include "dmlc/base.h"
19 
20 namespace dmlc {
21 
22 /*!
23  * \brief Simple userspace spinlock implementation.
24  */
25 class Spinlock {
26  public:
27 #ifdef _MSC_VER
Spinlock()28   Spinlock() {
29     lock_.clear();
30   }
31 #else
32 #pragma clang diagnostic push
33 #pragma clang diagnostic ignored "-Wbraced-scalar-init"
34   Spinlock() : lock_(ATOMIC_FLAG_INIT) {
35   }
36 #pragma clang diagnostic pop
37 #endif
38   ~Spinlock() = default;
39   /*!
40    * \brief Acquire lock.
41    */
42   inline void lock() noexcept(true);
43   /*!
44    * \brief Release lock.
45    */
46   inline void unlock() noexcept(true);
47 
48  private:
49   std::atomic_flag lock_;
50   /*!
51    * \brief Disable copy and move.
52    */
53   DISALLOW_COPY_AND_ASSIGN(Spinlock);
54 };
55 
/*! \brief ordering policy selected for a ConcurrentBlockingQueue */
enum class ConcurrentQueueType {
  /*! \brief first-in, first-out queue */
  kFIFO = 0,
  /*! \brief queue whose elements carry a priority */
  kPriority = 1
};
63 
64 /*!
65  * \brief Cocurrent blocking queue.
66  */
67 template <typename T,
68           ConcurrentQueueType type = ConcurrentQueueType::kFIFO>
69 class ConcurrentBlockingQueue {
70  public:
71   ConcurrentBlockingQueue();
72   ~ConcurrentBlockingQueue() = default;
73   /*!
74    * \brief Push element to the end of the queue.
75    * \param e Element to push into.
76    * \param priority the priority of the element, only used for priority queue.
77    *            The higher the priority is, the better.
78    * \tparam E the element type
79    *
80    * It will copy or move the element into the queue, depending on the type of
81    * the parameter.
82    */
83   template <typename E>
84   void Push(E&& e, int priority = 0);
85 
86   /*!
87    * \brief Push element to the front of the queue. Only works for FIFO queue.
88    *        For priority queue it is the same as Push.
89    * \param e Element to push into.
90    * \param priority the priority of the element, only used for priority queue.
91    *            The higher the priority is, the better.
92    * \tparam E the element type
93    *
94    * It will copy or move the element into the queue, depending on the type of
95    * the parameter.
96    */
97   template <typename E>
98   void PushFront(E&& e, int priority = 0);
99   /*!
100    * \brief Pop element from the queue.
101    * \param rv Element popped.
102    * \return On false, the queue is exiting.
103    *
104    * The element will be copied or moved into the object passed in.
105    */
106   bool Pop(T* rv);
107   /*!
108    * \brief Signal the queue for destruction.
109    *
110    * After calling this method, all blocking pop call to the queue will return
111    * false.
112    */
113   void SignalForKill();
114   /*!
115    * \brief Get the size of the queue.
116    * \return The size of the queue.
117    */
118   size_t Size();
119 
120  private:
121   struct Entry {
122     T data;
123     int priority;
124     inline bool operator<(const Entry &b) const {
125       return priority < b.priority;
126     }
127   };
128 
129   std::mutex mutex_;
130   std::condition_variable cv_;
131   std::atomic<bool> exit_now_;
132   int nwait_consumer_;
133   // a priority queue
134   std::vector<Entry> priority_queue_;
135   // a FIFO queue
136   std::deque<T> fifo_queue_;
137   /*!
138    * \brief Disable copy and move.
139    */
140   DISALLOW_COPY_AND_ASSIGN(ConcurrentBlockingQueue);
141 };
142 
lock()143 inline void Spinlock::lock() noexcept(true) {
144   while (lock_.test_and_set(std::memory_order_acquire)) {
145   }
146 }
147 
unlock()148 inline void Spinlock::unlock() noexcept(true) {
149   lock_.clear(std::memory_order_release);
150 }
151 
152 template <typename T, ConcurrentQueueType type>
ConcurrentBlockingQueue()153 ConcurrentBlockingQueue<T, type>::ConcurrentBlockingQueue()
154     : exit_now_{false}, nwait_consumer_{0} {}
155 
156 template <typename T, ConcurrentQueueType type>
157 template <typename E>
Push(E && e,int priority)158 void ConcurrentBlockingQueue<T, type>::Push(E&& e, int priority) {
159   static_assert(std::is_same<typename std::remove_cv<
160                                  typename std::remove_reference<E>::type>::type,
161                              T>::value,
162                 "Types must match.");
163   bool notify;
164   {
165     std::lock_guard<std::mutex> lock{mutex_};
166     if (type == ConcurrentQueueType::kFIFO) {
167       fifo_queue_.emplace_back(std::forward<E>(e));
168       notify = nwait_consumer_ != 0;
169     } else {
170       Entry entry;
171       entry.data = std::move(e);
172       entry.priority = priority;
173       priority_queue_.push_back(std::move(entry));
174       std::push_heap(priority_queue_.begin(), priority_queue_.end());
175       notify = nwait_consumer_ != 0;
176     }
177   }
178   if (notify) cv_.notify_one();
179 }
180 
181 template <typename T, ConcurrentQueueType type>
182 template <typename E>
PushFront(E && e,int priority)183 void ConcurrentBlockingQueue<T, type>::PushFront(E&& e, int priority) {
184   static_assert(std::is_same<typename std::remove_cv<
185                                  typename std::remove_reference<E>::type>::type,
186                              T>::value,
187                 "Types must match.");
188   bool notify;
189   {
190     std::lock_guard<std::mutex> lock{mutex_};
191     if (type == ConcurrentQueueType::kFIFO) {
192       fifo_queue_.emplace_front(std::forward<E>(e));
193       notify = nwait_consumer_ != 0;
194     } else {
195       Entry entry;
196       entry.data = std::move(e);
197       entry.priority = priority;
198       priority_queue_.push_back(std::move(entry));
199       std::push_heap(priority_queue_.begin(), priority_queue_.end());
200       notify = nwait_consumer_ != 0;
201     }
202   }
203   if (notify) cv_.notify_one();
204 }
205 
206 template <typename T, ConcurrentQueueType type>
Pop(T * rv)207 bool ConcurrentBlockingQueue<T, type>::Pop(T* rv) {
208   std::unique_lock<std::mutex> lock{mutex_};
209   if (type == ConcurrentQueueType::kFIFO) {
210     ++nwait_consumer_;
211     cv_.wait(lock, [this] {
212         return !fifo_queue_.empty() || exit_now_.load();
213       });
214     --nwait_consumer_;
215     if (!exit_now_.load()) {
216       *rv = std::move(fifo_queue_.front());
217       fifo_queue_.pop_front();
218       return true;
219     } else {
220       return false;
221     }
222   } else {
223     ++nwait_consumer_;
224     cv_.wait(lock, [this] {
225         return !priority_queue_.empty() || exit_now_.load();
226       });
227     --nwait_consumer_;
228     if (!exit_now_.load()) {
229       std::pop_heap(priority_queue_.begin(), priority_queue_.end());
230       *rv = std::move(priority_queue_.back().data);
231       priority_queue_.pop_back();
232       return true;
233     } else {
234       return false;
235     }
236   }
237 }
238 
239 template <typename T, ConcurrentQueueType type>
SignalForKill()240 void ConcurrentBlockingQueue<T, type>::SignalForKill() {
241   {
242     std::lock_guard<std::mutex> lock{mutex_};
243     exit_now_.store(true);
244   }
245   cv_.notify_all();
246 }
247 
248 template <typename T, ConcurrentQueueType type>
Size()249 size_t ConcurrentBlockingQueue<T, type>::Size() {
250   std::lock_guard<std::mutex> lock{mutex_};
251   if (type == ConcurrentQueueType::kFIFO) {
252     return fifo_queue_.size();
253   } else {
254     return priority_queue_.size();
255   }
256 }
257 }  // namespace dmlc
258 #endif  // DMLC_USE_CXX11
259 #endif  // DMLC_CONCURRENCY_H_
260