1 /*!
2 * Copyright (c) 2015 by Contributors
3 * \file concurrency.h
4 * \brief thread-safe data structures.
5 * \author Yutian Li
6 */
7 #ifndef DMLC_CONCURRENCY_H_
8 #define DMLC_CONCURRENCY_H_
9 // this code depends on c++11
10 #if DMLC_USE_CXX11
#include <algorithm>
#include <atomic>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <queue>
#include <type_traits>
#include <utility>
#include <vector>
#include "dmlc/base.h"
19
20 namespace dmlc {
21
22 /*!
23 * \brief Simple userspace spinlock implementation.
24 */
25 class Spinlock {
26 public:
27 #ifdef _MSC_VER
Spinlock()28 Spinlock() {
29 lock_.clear();
30 }
31 #else
32 #pragma clang diagnostic push
33 #pragma clang diagnostic ignored "-Wbraced-scalar-init"
34 Spinlock() : lock_(ATOMIC_FLAG_INIT) {
35 }
36 #pragma clang diagnostic pop
37 #endif
38 ~Spinlock() = default;
39 /*!
40 * \brief Acquire lock.
41 */
42 inline void lock() noexcept(true);
43 /*!
44 * \brief Release lock.
45 */
46 inline void unlock() noexcept(true);
47
48 private:
49 std::atomic_flag lock_;
50 /*!
51 * \brief Disable copy and move.
52 */
53 DISALLOW_COPY_AND_ASSIGN(Spinlock);
54 };
55
/*! \brief Selects the ordering policy of a ConcurrentBlockingQueue. */
enum class ConcurrentQueueType {
  /*! \brief First-in, first-out ordering. */
  kFIFO = 0,
  /*! \brief Elements are popped in order of their priority. */
  kPriority = 1
};
63
64 /*!
65 * \brief Cocurrent blocking queue.
66 */
67 template <typename T,
68 ConcurrentQueueType type = ConcurrentQueueType::kFIFO>
69 class ConcurrentBlockingQueue {
70 public:
71 ConcurrentBlockingQueue();
72 ~ConcurrentBlockingQueue() = default;
73 /*!
74 * \brief Push element to the end of the queue.
75 * \param e Element to push into.
76 * \param priority the priority of the element, only used for priority queue.
77 * The higher the priority is, the better.
78 * \tparam E the element type
79 *
80 * It will copy or move the element into the queue, depending on the type of
81 * the parameter.
82 */
83 template <typename E>
84 void Push(E&& e, int priority = 0);
85
86 /*!
87 * \brief Push element to the front of the queue. Only works for FIFO queue.
88 * For priority queue it is the same as Push.
89 * \param e Element to push into.
90 * \param priority the priority of the element, only used for priority queue.
91 * The higher the priority is, the better.
92 * \tparam E the element type
93 *
94 * It will copy or move the element into the queue, depending on the type of
95 * the parameter.
96 */
97 template <typename E>
98 void PushFront(E&& e, int priority = 0);
99 /*!
100 * \brief Pop element from the queue.
101 * \param rv Element popped.
102 * \return On false, the queue is exiting.
103 *
104 * The element will be copied or moved into the object passed in.
105 */
106 bool Pop(T* rv);
107 /*!
108 * \brief Signal the queue for destruction.
109 *
110 * After calling this method, all blocking pop call to the queue will return
111 * false.
112 */
113 void SignalForKill();
114 /*!
115 * \brief Get the size of the queue.
116 * \return The size of the queue.
117 */
118 size_t Size();
119
120 private:
121 struct Entry {
122 T data;
123 int priority;
124 inline bool operator<(const Entry &b) const {
125 return priority < b.priority;
126 }
127 };
128
129 std::mutex mutex_;
130 std::condition_variable cv_;
131 std::atomic<bool> exit_now_;
132 int nwait_consumer_;
133 // a priority queue
134 std::vector<Entry> priority_queue_;
135 // a FIFO queue
136 std::deque<T> fifo_queue_;
137 /*!
138 * \brief Disable copy and move.
139 */
140 DISALLOW_COPY_AND_ASSIGN(ConcurrentBlockingQueue);
141 };
142
lock()143 inline void Spinlock::lock() noexcept(true) {
144 while (lock_.test_and_set(std::memory_order_acquire)) {
145 }
146 }
147
unlock()148 inline void Spinlock::unlock() noexcept(true) {
149 lock_.clear(std::memory_order_release);
150 }
151
152 template <typename T, ConcurrentQueueType type>
ConcurrentBlockingQueue()153 ConcurrentBlockingQueue<T, type>::ConcurrentBlockingQueue()
154 : exit_now_{false}, nwait_consumer_{0} {}
155
156 template <typename T, ConcurrentQueueType type>
157 template <typename E>
Push(E && e,int priority)158 void ConcurrentBlockingQueue<T, type>::Push(E&& e, int priority) {
159 static_assert(std::is_same<typename std::remove_cv<
160 typename std::remove_reference<E>::type>::type,
161 T>::value,
162 "Types must match.");
163 bool notify;
164 {
165 std::lock_guard<std::mutex> lock{mutex_};
166 if (type == ConcurrentQueueType::kFIFO) {
167 fifo_queue_.emplace_back(std::forward<E>(e));
168 notify = nwait_consumer_ != 0;
169 } else {
170 Entry entry;
171 entry.data = std::move(e);
172 entry.priority = priority;
173 priority_queue_.push_back(std::move(entry));
174 std::push_heap(priority_queue_.begin(), priority_queue_.end());
175 notify = nwait_consumer_ != 0;
176 }
177 }
178 if (notify) cv_.notify_one();
179 }
180
181 template <typename T, ConcurrentQueueType type>
182 template <typename E>
PushFront(E && e,int priority)183 void ConcurrentBlockingQueue<T, type>::PushFront(E&& e, int priority) {
184 static_assert(std::is_same<typename std::remove_cv<
185 typename std::remove_reference<E>::type>::type,
186 T>::value,
187 "Types must match.");
188 bool notify;
189 {
190 std::lock_guard<std::mutex> lock{mutex_};
191 if (type == ConcurrentQueueType::kFIFO) {
192 fifo_queue_.emplace_front(std::forward<E>(e));
193 notify = nwait_consumer_ != 0;
194 } else {
195 Entry entry;
196 entry.data = std::move(e);
197 entry.priority = priority;
198 priority_queue_.push_back(std::move(entry));
199 std::push_heap(priority_queue_.begin(), priority_queue_.end());
200 notify = nwait_consumer_ != 0;
201 }
202 }
203 if (notify) cv_.notify_one();
204 }
205
206 template <typename T, ConcurrentQueueType type>
Pop(T * rv)207 bool ConcurrentBlockingQueue<T, type>::Pop(T* rv) {
208 std::unique_lock<std::mutex> lock{mutex_};
209 if (type == ConcurrentQueueType::kFIFO) {
210 ++nwait_consumer_;
211 cv_.wait(lock, [this] {
212 return !fifo_queue_.empty() || exit_now_.load();
213 });
214 --nwait_consumer_;
215 if (!exit_now_.load()) {
216 *rv = std::move(fifo_queue_.front());
217 fifo_queue_.pop_front();
218 return true;
219 } else {
220 return false;
221 }
222 } else {
223 ++nwait_consumer_;
224 cv_.wait(lock, [this] {
225 return !priority_queue_.empty() || exit_now_.load();
226 });
227 --nwait_consumer_;
228 if (!exit_now_.load()) {
229 std::pop_heap(priority_queue_.begin(), priority_queue_.end());
230 *rv = std::move(priority_queue_.back().data);
231 priority_queue_.pop_back();
232 return true;
233 } else {
234 return false;
235 }
236 }
237 }
238
239 template <typename T, ConcurrentQueueType type>
SignalForKill()240 void ConcurrentBlockingQueue<T, type>::SignalForKill() {
241 {
242 std::lock_guard<std::mutex> lock{mutex_};
243 exit_now_.store(true);
244 }
245 cv_.notify_all();
246 }
247
248 template <typename T, ConcurrentQueueType type>
Size()249 size_t ConcurrentBlockingQueue<T, type>::Size() {
250 std::lock_guard<std::mutex> lock{mutex_};
251 if (type == ConcurrentQueueType::kFIFO) {
252 return fifo_queue_.size();
253 } else {
254 return priority_queue_.size();
255 }
256 }
257 } // namespace dmlc
258 #endif // DMLC_USE_CXX11
259 #endif // DMLC_CONCURRENCY_H_
260